Skip to main content

fxfs/object_store/
journal.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! The journal is implemented as an ever extending file which contains variable length records
6//! that describe mutations to be applied to various objects.  The journal file consists of
7//! blocks, with a checksum at the end of each block, but otherwise it can be considered a
8//! continuous stream.
9//!
10//! The checksum is seeded with the checksum from the previous block.  To free space in the
11//! journal, records are replaced with sparse extents when it is known they are no longer
12//! needed to mount.
13//!
14//! At mount time, the journal is replayed: the mutations are applied into memory.
15//! Eventually, a checksum failure will indicate no more records exist to be replayed,
16//! at which point the mount can continue and the journal will be extended from that point with
17//! further mutations as required.
18
19mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::errors::FxfsError;
27use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
28use crate::log::*;
29use crate::lsm_tree::cache::NullCache;
30use crate::lsm_tree::types::Layer;
31use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
32use crate::object_store::allocator::Allocator;
33use crate::object_store::data_object_handle::OverwriteOptions;
34use crate::object_store::extent_record::{ExtentMode, ExtentValue};
35use crate::object_store::graveyard::Graveyard;
36use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
37use crate::object_store::journal::checksum_list::ChecksumList;
38use crate::object_store::journal::reader::{JournalReader, ReadResult};
39use crate::object_store::journal::super_block::{
40    SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
41};
42use crate::object_store::journal::writer::JournalWriter;
43use crate::object_store::object_manager::ObjectManager;
44use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
45use crate::object_store::transaction::{
46    AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
47    MutationV47, MutationV49, MutationV50, MutationV54, MutationV55, ObjectStoreMutation, Options,
48    TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
49};
50use crate::object_store::{
51    AssocObj, AttributeId, DataObjectHandle, Extent, HandleOptions, HandleOwner, INVALID_OBJECT_ID,
52    Item, ItemRef, NewChildStoreOptions, ObjectStore, ReservedId,
53};
54use crate::range::RangeExt;
55use crate::round::{round_div, round_down};
56use crate::serialized_types::{
57    LATEST_VERSION, Migrate, Version, Versioned, migrate_nodefault, migrate_to_version,
58};
59use anyhow::{Context, Error, anyhow, bail, ensure};
60use event_listener::Event;
61use fprint::TypeFingerprint;
62use fuchsia_inspect::NumericProperty;
63use fuchsia_sync::Mutex;
64use futures::FutureExt as _;
65use futures::future::poll_fn;
66use rustc_hash::FxHashMap as HashMap;
67use serde::{Deserialize, Serialize};
68use static_assertions::const_assert;
69use std::clone::Clone;
70use std::collections::HashSet;
71use std::num::NonZero;
72use std::ops::{Bound, Range};
73use std::sync::atomic::{AtomicBool, Ordering};
74use std::sync::{Arc, OnceLock};
75use std::task::{Poll, Waker};
76use storage_device::Device;
77
// The journal file is written to in blocks of this size (4 KiB).
pub const BLOCK_SIZE: u64 = 4096;

// The journal file is extended by this amount (128 KiB) when necessary.
const CHUNK_SIZE: u64 = 131_072;
// Compile-time guard: a chunk must be strictly larger than the maximum journal usage of a single
// transaction.  NOTE(review): presumably so that one extension is always enough to hold a
// transaction -- confirm against the code that extends the journal.
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

// Default value (512 KiB) for `JournalOptions::reclaim_size`.  See the comment for the
// `reclaim_size` member of Inner.
pub const DEFAULT_RECLAIM_SIZE: u64 = 524_288;

// Temporary space (1 MiB) that should be reserved for the journal.  For example: space that is
// currently used in the journal file but cannot be deallocated yet because we are flushing.
pub const RESERVED_SPACE: u64 = 1_048_576;

// Whenever the journal is replayed (i.e. the system is unmounted and remounted), we reset the
// journal stream, at which point any half-complete transactions are discarded.  We indicate a
// journal reset by XORing the previous block's checksum with this mask, and using that value as a
// seed for the next journal block.  The mask has every bit set, so the XOR inverts the checksum.
const RESET_XOR: u64 = 0xffffffffffffffff;
97
98// To keep track of offsets within a journal file, we need both the file offset and the check-sum of
99// the preceding block, since the check-sum of the preceding block is an input to the check-sum of
100// every block.
// Unversioned alias for the checkpoint layout currently in use.
pub type JournalCheckpoint = JournalCheckpointV32;

// A position in the journal stream: a file offset plus the checksum and record version needed to
// resume reading from that offset.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    // Byte offset within the journal file.
    pub file_offset: u64,

    // Starting check-sum for block that contains file_offset i.e. the checksum for the previous
    // block.
    pub checksum: Checksum,

    // If versioned, the version of elements stored in the journal. e.g. JournalRecord version.
    // This can change across reset events so we store it along with the offset and checksum to
    // know which version to deserialize.
    pub version: Version,
}
116
/// Unversioned alias for the journal record layout currently in use.
pub type JournalRecord = JournalRecordV55;

/// A single record in the journal stream.
///
/// The variant order is part of the derived serialized representation, so new variants must not
/// be inserted between existing ones.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV55 {
    /// NOTE(review): presumably indicates that no further records follow in the current journal
    /// block -- the reader that interprets this is not visible here; confirm.
    EndBlock,
    /// A single mutation, together with the `object_id` it is associated with.
    Mutation {
        object_id: u64,
        mutation: MutationV55,
    },
    /// Commits records in the transaction.
    Commit,
    /// Discard all mutations with offsets greater than or equal to the given offset.
    Discard(u64),
    /// Indicates the device was flushed at the given journal offset.
    /// Note that this really means that at this point in the journal offset, we can be certain that
    /// there's no remaining buffered data in the block device; the buffers and the disk contents are
    /// consistent.
    /// We insert one of these records *after* a flush along with the *next* transaction to go
    /// through.  If that never comes (either due to graceful or hard shutdown), the journal reset
    /// on the next mount will serve the same purpose and count as a flush, although it is necessary
    /// to defensively flush the device before replaying the journal (if possible, i.e. not
    /// read-only) in case the block device connection was reused.
    DidFlushDevice(u64),
    /// Checksums for a data range written by this transaction. A transaction is only valid if these
    /// checksums are right. The range is the device offset the checksums are for.
    ///
    /// A boolean indicates whether this range is being written to for the first time. For overwrite
    /// extents, we only check the checksums for a block if it has been written to for the first
    /// time since the last flush, because otherwise we can't roll it back anyway so it doesn't
    /// matter. For copy-on-write extents, the bool is always true.
    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
}
151
/// Version-54 layout of the journal record.
///
/// `#[migrate_to_version]` names [`JournalRecordV55`] as the next layout in the upgrade chain.
/// The only field type that differs from that layout is the mutation (`MutationV54`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV55)]
#[migrate_nodefault]
pub enum JournalRecordV54 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV54 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
}
164
/// Version-50 layout of the journal record.
///
/// `#[migrate_to_version]` names [`JournalRecordV54`] as the next layout in the upgrade chain.
/// The only field type that differs from that layout is the mutation (`MutationV50`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV54)]
#[migrate_nodefault]
pub enum JournalRecordV50 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV50 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
177
/// Version-49 layout of the journal record.
///
/// `#[migrate_to_version]` names [`JournalRecordV50`] as the next layout in the upgrade chain.
/// The only field type that differs from that layout is the mutation (`MutationV49`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
189
/// Version-47 layout of the journal record.
///
/// `#[migrate_to_version]` names [`JournalRecordV49`] as the next layout in the upgrade chain.
/// The only field type that differs from that layout is the mutation (`MutationV47`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
201
/// Version-46 layout of the journal record.
///
/// `#[migrate_to_version]` names [`JournalRecordV47`] as the next layout in the upgrade chain.
/// The only field type that differs from that layout is the mutation (`MutationV46`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
213
/// Version-43 layout of the journal record.
///
/// `#[migrate_to_version]` names [`JournalRecordV46`] as the next layout in the upgrade chain.
/// The only field type that differs from that layout is the mutation (`MutationV43`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
225
/// Version-42 layout of the journal record.
///
/// `#[migrate_to_version]` names [`JournalRecordV43`] as the next layout in the upgrade chain.
/// This layout still carries `MutationV41`; what it adds over [`JournalRecordV41`] is the `bool`
/// in `DataChecksums`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
236
/// Version-41 layout of the journal record.
///
/// Unlike the neighbouring versions this type does not derive `Migrate`; it is upgraded by the
/// hand-written `From<JournalRecordV41> for JournalRecordV42` implementation, because
/// `DataChecksums` here has no `bool` component.
#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
246
247impl From<JournalRecordV41> for JournalRecordV42 {
248    fn from(record: JournalRecordV41) -> Self {
249        match record {
250            JournalRecordV41::EndBlock => Self::EndBlock,
251            JournalRecordV41::Mutation { object_id, mutation } => {
252                Self::Mutation { object_id, mutation: mutation.into() }
253            }
254            JournalRecordV41::Commit => Self::Commit,
255            JournalRecordV41::Discard(offset) => Self::Discard(offset),
256            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
257            JournalRecordV41::DataChecksums(range, sums) => {
258                // At the time of writing the only extents written by real systems are CoW extents
259                // so the new bool is always true.
260                Self::DataChecksums(range, sums, true)
261            }
262        }
263    }
264}
265
/// Version-40 layout of the journal record.
///
/// `#[migrate_to_version]` names [`JournalRecordV41`] as the next layout in the upgrade chain.
/// The only field type that differs from that layout is the mutation (`MutationV40`).
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
276
277pub(super) fn journal_handle_options() -> HandleOptions {
278    HandleOptions { skip_journal_checks: true, ..Default::default() }
279}
280
/// The journal records a stream of mutations that are to be applied to other objects.  At mount
/// time, these records can be replayed into memory.  It provides a way to quickly persist changes
/// without having to make a large number of writes; they can be deferred to a later time (e.g.
/// when a sufficient number have been queued).  It also provides support for transactions, the
/// ability to have mutations that are to be applied atomically together.
pub struct Journal {
    // The object manager; replay registers the root parent store and the allocator with it.
    objects: Arc<ObjectManager>,
    // Handle to the journal file.  Starts out unset (see `Journal::new`) and can be set once.
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    // Loads the super-blocks (see `read_superblocks` and `replay`).
    super_block_manager: SuperBlockManager,
    // Mutable journal state; see `Inner`.
    inner: Mutex<Inner>,
    // NOTE(review): guards no data of its own; presumably serialises writers -- its users are not
    // visible in this part of the file.
    writer_mutex: Mutex<()>,
    // NOTE(review): async mutex guarding no data of its own; presumably serialises sync
    // operations -- its users are not visible in this part of the file.
    sync_mutex: futures::lock::Mutex<()>,
    // Tracing flag, toggled by `set_trace`.
    trace: AtomicBool,

    // This event is used when we are waiting for a compaction to free up journal space.
    reclaim_event: Event,
}
298
// Mutable state of the journal.  `Journal` keeps this behind its `inner` mutex.
struct Inner {
    // In-memory copy of the super-block header; replay stores the loaded header here.
    super_block_header: SuperBlockHeader,

    // The offset that we can zero the journal up to now that it is no longer needed.
    zero_offset: Option<u64>,

    // The journal offset that we most recently flushed to the device.
    device_flushed_offset: u64,

    // If true, indicates a DidFlushDevice record is pending.
    needs_did_flush_device: bool,

    // The writer for the journal.
    writer: JournalWriter,

    // Set when a reset is encountered during a read.
    // Used at write pre_commit() time to ensure we write a version first thing after a reset.
    output_reset_version: bool,

    // Waker for the flush task.
    flush_waker: Option<Waker>,

    // Indicates the journal has been terminated.
    terminate: bool,

    // Latched error indicating reason for journal termination if not graceful.
    terminate_reason: Option<Error>,

    // Disable compactions.
    disable_compactions: bool,

    // True if compactions are running.
    compaction_running: bool,

    // Waker for the sync task for when it's waiting for the flush task to finish.
    sync_waker: Option<Waker>,

    // The last offset we flushed to the journal file.
    flushed_offset: u64,

    // The last offset that should be considered valid in the journal file.  Most of the time, this
    // will be the same as `flushed_offset` but at mount time, this could be less and will only be
    // up to the end of the last valid transaction; it won't include transactions that follow that
    // have been discarded.
    valid_to: u64,

    // If, after replaying, we have to discard a number of mutations (because they don't validate),
    // this offset specifies where we need to discard back to.  This is so that when we next replay,
    // we ignore those mutations and continue with new good mutations.
    discard_offset: Option<u64>,

    // In the steady state, the journal should fluctuate between being approximately half of this
    // number and this number.  New super-blocks will be written every time about half of this
    // amount is written to the journal.
    reclaim_size: u64,

    // `Some` while the journal is in image builder mode.  `Journal::set_image_builder_mode` stores
    // the value here and, when it is `Some`, also makes it the super-block manager's next instance.
    image_builder_mode: Option<SuperBlockInstance>,

    // If true and `needs_barrier`, issue a pre-barrier on the first device write of each journal
    // write (which happens in multiples of `BLOCK_SIZE`). This ensures that all the corresponding
    // data writes make it to disk before the journal gets written to.
    barriers_enabled: bool,

    // If true, indicates that data write requests have been made to the device since the last
    // journal write.
    needs_barrier: bool,

    // True if a compaction is being forced for reasons other than the journal being full.
    forced_compaction: bool,
}
369
370impl Inner {
371    fn terminate(&mut self, reason: Option<Error>) {
372        self.terminate = true;
373
374        if let Some(err) = reason {
375            error!(error:? = err; "Terminating journal");
376            // Log previous error if one was already set, otherwise latch the error.
377            if let Some(prev_err) = self.terminate_reason.as_ref() {
378                error!(error:? = prev_err; "Journal previously terminated");
379            } else {
380                self.terminate_reason = Some(err);
381            }
382        }
383
384        if let Some(waker) = self.flush_waker.take() {
385            waker.wake();
386        }
387        if let Some(waker) = self.sync_waker.take() {
388            waker.wake();
389        }
390    }
391}
392
/// Tunables supplied to `Journal::new`.  The `Default` implementation uses
/// `DEFAULT_RECLAIM_SIZE` and leaves barriers disabled.
pub struct JournalOptions {
    /// In the steady state, the journal should fluctuate between being approximately half of this
    /// number and this number.  New super-blocks will be written every time about half of this
    /// amount is written to the journal.
    pub reclaim_size: u64,

    /// If true, issue a pre-barrier on the first device write of each journal write (which happens
    /// in multiples of `BLOCK_SIZE`). This ensures that all the corresponding data writes make it
    /// to disk before the journal gets written to.
    pub barriers_enabled: bool,
}
404
405impl Default for JournalOptions {
406    fn default() -> Self {
407        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
408    }
409}
410
// Result of reading transactions from the journal (replay obtains one from `read_transactions`).
struct JournaledTransactions {
    // The transactions that were read.
    transactions: Vec<JournaledTransaction>,
    // Replay seeds its `ChecksumList` with this offset.  NOTE(review): presumably the offset of
    // the most recent DidFlushDevice record encountered -- confirm in `read_transactions`.
    device_flushed_offset: u64,
}
415
/// A single transaction as read back from the journal.
#[derive(Debug, Default)]
pub struct JournaledTransaction {
    /// Checkpoint associated with the transaction; replay uses its `file_offset` as the
    /// transaction's journal offset.
    pub checkpoint: JournalCheckpoint,
    /// Mutations for the root parent store.
    pub root_parent_mutations: Vec<Mutation>,
    /// Mutations for the root store.
    pub root_mutations: Vec<Mutation>,
    /// List of (store_object_id, mutation).
    pub non_root_mutations: Vec<(u64, Mutation)>,
    /// NOTE(review): presumably the journal offset at which this transaction ends -- confirm in
    /// `read_transactions`.
    pub end_offset: u64,
    /// Data checksums recorded with this transaction; replay pushes them onto its checksum list
    /// for verification.
    pub checksums: Vec<JournaledChecksums>,

    /// Records offset + 1 of the matching begin_flush transaction. The +1 is because we want to
    /// ignore the begin flush transaction; we don't need or want to replay it.
    pub end_flush: Option<(/* store_id: */ u64, /* begin offset: */ u64)>,

    /// The volume which was deleted in this transaction, if any.
    pub volume_deleted: Option</* store_id: */ u64>,
}
433
434impl JournaledTransaction {
435    fn new(checkpoint: JournalCheckpoint) -> Self {
436        Self { checkpoint, ..Default::default() }
437    }
438}
439
// NOTE(review): sentinel equal to `u64::MAX`.  Its uses are not visible in this part of the file;
// presumably it is stored in place of a journal offset to mark a deleted volume -- confirm
// against the code that reads it.
const VOLUME_DELETED: u64 = u64::MAX;
441
/// Checksums journaled for a range of the device, as carried by a `DataChecksums` journal record.
#[derive(Debug)]
pub struct JournaledChecksums {
    /// The device range the checksums are for.
    pub device_range: Range<u64>,
    /// The checksums themselves.
    pub checksums: Checksums,
    /// Whether the range is being written to for the first time (always true for copy-on-write
    /// extents, per the `DataChecksums` record documentation).
    pub first_write: bool,
}
448
/// Handles for journal-like objects have some additional functionality to manage their extents,
/// since during replay we need to add extents as we find them.
pub trait JournalHandle: ReadObjectHandle {
    /// The end offset of the last extent in the JournalHandle.  Used only for validating extents
    /// (which will be skipped if None is returned).
    /// Note this is equivalent in value to ReadObjectHandle::get_size, when present.
    fn end_offset(&self) -> Option<u64>;
    /// Adds an extent to the current end of the journal stream.
    /// `added_offset` is the offset into the journal of the transaction which added this extent,
    /// used for discard_extents.
    /// `device_range` is the range on the device that backs the new extent.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Discards all extents which were added in a transaction at offset >= |discard_offset|.
    fn discard_extents(&mut self, discard_offset: u64);
}
463
464// Provide a stub implementation for DataObjectHandle so we can use it in
465// Journal::read_transactions.  Manual extent management is a NOP (which is OK since presumably the
466// DataObjectHandle already knows where its extents live).
467impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
468    fn end_offset(&self) -> Option<u64> {
469        None
470    }
471    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
472        // NOP
473    }
474    fn discard_extents(&mut self, _discard_offset: u64) {
475        // NOP
476    }
477}
478
479#[fxfs_trace::trace]
480impl Journal {
481    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
482        let starting_checksum = rand::random_range(1..u64::MAX);
483        Journal {
484            objects: objects,
485            handle: OnceLock::new(),
486            super_block_manager: SuperBlockManager::new(),
487            inner: Mutex::new(Inner {
488                super_block_header: SuperBlockHeader::default(),
489                zero_offset: None,
490                device_flushed_offset: 0,
491                needs_did_flush_device: false,
492                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
493                output_reset_version: false,
494                flush_waker: None,
495                terminate: false,
496                terminate_reason: None,
497                disable_compactions: false,
498                compaction_running: false,
499                sync_waker: None,
500                flushed_offset: 0,
501                valid_to: 0,
502                discard_offset: None,
503                reclaim_size: options.reclaim_size,
504                image_builder_mode: None,
505                barriers_enabled: options.barriers_enabled,
506                needs_barrier: false,
507                forced_compaction: false,
508            }),
509            writer_mutex: Mutex::new(()),
510            sync_mutex: futures::lock::Mutex::new(()),
511            trace: AtomicBool::new(false),
512            reclaim_event: Event::new(),
513        }
514    }
515
516    pub fn set_trace(&self, trace: bool) {
517        let old_value = self.trace.swap(trace, Ordering::Relaxed);
518        if trace != old_value {
519            info!(trace; "J: trace");
520        }
521    }
522
523    pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
524        self.inner.lock().image_builder_mode = mode;
525        if let Some(instance) = mode {
526            *self.super_block_manager.next_instance.lock() = instance;
527        }
528    }
529
530    pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
531        self.inner.lock().image_builder_mode
532    }
533
534    #[cfg(feature = "migration")]
535    pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
536        ensure!(
537            self.inner.lock().image_builder_mode.is_some(),
538            "Can only set filesystem uuid in image builder mode."
539        );
540        self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
541        Ok(())
542    }
543
544    pub(crate) async fn read_superblocks(
545        &self,
546        device: Arc<dyn Device>,
547        block_size: u64,
548    ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
549        self.super_block_manager.load(device, block_size).await
550    }
551
552    /// Used during replay to validate a mutation.  This should return false if the mutation is not
553    /// valid and should not be applied.  This could be for benign reasons: e.g. the device flushed
554    /// data out-of-order, or because of a malicious actor.
555    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
556        match mutation {
557            Mutation::ObjectStore(ObjectStoreMutation {
558                item:
559                    Item {
560                        key:
561                            ObjectKey {
562                                data: ObjectKeyData::Attribute(_, AttributeKey::Extent(extent)),
563                                ..
564                            },
565                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
566                        ..
567                    },
568                ..
569            }) => {
570                if extent.is_empty() || !extent.is_aligned(block_size) {
571                    return false;
572                }
573                let len = extent.length().unwrap();
574                if let ExtentMode::Cow(checksums) = mode {
575                    if checksums.len() > 0 {
576                        if len % checksums.len() as u64 != 0 {
577                            return false;
578                        }
579                        if (len / checksums.len() as u64) % block_size != 0 {
580                            return false;
581                        }
582                    }
583                }
584                if *device_offset % block_size != 0
585                    || *device_offset >= device_size
586                    || device_size - *device_offset < len
587                {
588                    return false;
589                }
590            }
591            Mutation::ObjectStore(_) => {}
592            Mutation::EncryptedObjectStore(_) => {}
593            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
594                return !device_range.is_empty()
595                    && *owner_object_id != INVALID_OBJECT_ID
596                    && device_range.end <= device_size;
597            }
598            Mutation::Allocator(AllocatorMutation::Deallocate {
599                device_range,
600                owner_object_id,
601            }) => {
602                return !device_range.is_empty()
603                    && *owner_object_id != INVALID_OBJECT_ID
604                    && device_range.end <= device_size;
605            }
606            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
607                return *owner_object_id != INVALID_OBJECT_ID;
608            }
609            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
610                return *owner_object_id != INVALID_OBJECT_ID;
611            }
612            Mutation::BeginFlush => {}
613            Mutation::EndFlush => {}
614            Mutation::DeleteVolume => {}
615            Mutation::UpdateBorrowed(_) => {}
616            Mutation::UpdateMutationsKey(_) => {}
617            Mutation::CreateInternalDir(owner_object_id) => {
618                return *owner_object_id != INVALID_OBJECT_ID;
619            }
620        }
621        true
622    }
623
624    // Assumes that `mutation` has been validated.
625    fn update_checksum_list(
626        &self,
627        journal_offset: u64,
628        mutation: &Mutation,
629        checksum_list: &mut ChecksumList,
630    ) -> Result<(), Error> {
631        match mutation {
632            Mutation::ObjectStore(_) => {}
633            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
634                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
635            }
636            _ => {}
637        }
638        Ok(())
639    }
640
641    /// Reads the latest super-block, and then replays journaled records.
642    #[trace]
643    pub async fn replay(
644        &self,
645        filesystem: Arc<FxFilesystem>,
646        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
647    ) -> Result<(), Error> {
648        let block_size = filesystem.block_size();
649
650        let (super_block, root_parent) =
651            self.super_block_manager.load(filesystem.device(), block_size).await?;
652
653        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));
654
655        self.objects.set_root_parent_store(root_parent.clone());
656        let allocator =
657            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
658        if let Some(on_new_allocator) = on_new_allocator {
659            on_new_allocator(allocator.clone());
660        }
661        self.objects.set_allocator(allocator.clone());
662        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
663        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
664        {
665            let mut inner = self.inner.lock();
666            inner.super_block_header = super_block.clone();
667        }
668
669        let device = filesystem.device();
670
671        let mut handle;
672        {
673            let root_parent_layer = root_parent.tree().mutable_layer();
674            let mut iter = root_parent_layer
675                .seek(Bound::Included(&ObjectKey::attribute(
676                    super_block.journal_object_id,
677                    AttributeId::DATA,
678                    AttributeKey::Extent(Extent::search_key_from_offset(round_down(
679                        super_block.journal_checkpoint.file_offset,
680                        BLOCK_SIZE,
681                    ))),
682                )))
683                .await
684                .context("Failed to seek root parent store")?;
685            let start_offset = if let Some(ItemRef {
686                key:
687                    ObjectKey {
688                        data:
689                            ObjectKeyData::Attribute(AttributeId::DATA, AttributeKey::Extent(extent)),
690                        ..
691                    },
692                ..
693            }) = iter.get()
694            {
695                extent.start
696            } else {
697                0
698            };
699            handle = BootstrapObjectHandle::new_with_start_offset(
700                super_block.journal_object_id,
701                device.clone(),
702                start_offset,
703            );
704            while let Some(item) = iter.get() {
705                if !match item.into() {
706                    Some((
707                        object_id,
708                        AttributeId::DATA,
709                        extent,
710                        ExtentValue::Some { device_offset, .. },
711                    )) if object_id == super_block.journal_object_id => {
712                        if let Some(end_offset) = handle.end_offset() {
713                            if extent.start != end_offset {
714                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
715                                    "Unexpected journal extent {:?}, expected start: {}",
716                                    item, end_offset
717                                )));
718                            }
719                        }
720                        handle.push_extent(
721                            0, // We never discard extents from the root parent store.
722                            *device_offset
723                                ..*device_offset + extent.length().context("Invalid extent")?,
724                        );
725                        true
726                    }
727                    _ => false,
728                } {
729                    break;
730                }
731                iter.advance().await.context("Failed to advance root parent store iterator")?;
732            }
733        }
734
735        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
736        let JournaledTransactions { mut transactions, device_flushed_offset } = self
737            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
738            .await
739            .context("Reading transactions for replay")?;
740
741        // Validate all the mutations.
742        let mut checksum_list = ChecksumList::new(device_flushed_offset);
743        let mut valid_to = reader.journal_file_checkpoint().file_offset;
744        let device_size = device.size();
745        'bad_replay: for JournaledTransaction {
746            checkpoint,
747            root_parent_mutations,
748            root_mutations,
749            non_root_mutations,
750            checksums,
751            ..
752        } in &transactions
753        {
754            for JournaledChecksums { device_range, checksums, first_write } in checksums {
755                checksum_list
756                    .push(
757                        checkpoint.file_offset,
758                        device_range.clone(),
759                        checksums.maybe_as_ref().context("Malformed checksums")?,
760                        *first_write,
761                    )
762                    .context("Pushing journal checksum records to checksum list")?;
763            }
764            for mutation in root_parent_mutations
765                .iter()
766                .chain(root_mutations)
767                .chain(non_root_mutations.iter().map(|(_, m)| m))
768            {
769                if !self.validate_mutation(mutation, block_size, device_size) {
770                    info!(mutation:?; "Stopping replay at bad mutation");
771                    valid_to = checkpoint.file_offset;
772                    break 'bad_replay;
773                }
774                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
775            }
776        }
777
778        // Validate the checksums. Note if barriers are enabled, there will be no checksums in
779        // practice to verify.
780        let valid_to = checksum_list
781            .verify(device.as_ref(), valid_to)
782            .await
783            .context("Failed to validate checksums")?;
784
785        // Apply the mutations...
786
787        let mut last_checkpoint = reader.journal_file_checkpoint();
788        let mut journal_offsets = super_block.journal_file_offsets.clone();
789
790        // Start with the root-parent mutations, and also determine the journal offsets for all
791        // other objects.
792        for (
793            index,
794            JournaledTransaction {
795                checkpoint,
796                root_parent_mutations,
797                end_flush,
798                volume_deleted,
799                ..
800            },
801        ) in transactions.iter_mut().enumerate()
802        {
803            if checkpoint.file_offset >= valid_to {
804                last_checkpoint = checkpoint.clone();
805
806                // Truncate the transactions so we don't need to worry about them on the next pass.
807                transactions.truncate(index);
808                break;
809            }
810
811            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
812            for mutation in root_parent_mutations.drain(..) {
813                self.objects
814                    .apply_mutation(
815                        super_block.root_parent_store_object_id,
816                        mutation,
817                        &context,
818                        AssocObj::None,
819                    )
820                    .context("Failed to replay root parent store mutations")?;
821            }
822
823            if let Some((object_id, journal_offset)) = end_flush {
824                journal_offsets.insert(*object_id, *journal_offset);
825            }
826
827            if let Some(object_id) = volume_deleted {
828                journal_offsets.insert(*object_id, VOLUME_DELETED);
829            }
830        }
831
832        // Now we can open the root store.
833        let root_store = ObjectStore::open(
834            &root_parent,
835            super_block.root_store_object_id,
836            Box::new(NullCache {}),
837        )
838        .await
839        .context("Unable to open root store")?;
840
841        ensure!(
842            !root_store.is_encrypted(),
843            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
844        );
845        self.objects.set_root_store(root_store);
846
847        let root_store_offset =
848            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);
849
850        // Now replay the root store mutations.
851        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
852            if checkpoint.file_offset < root_store_offset {
853                continue;
854            }
855
856            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
857            for mutation in root_mutations.drain(..) {
858                self.objects
859                    .apply_mutation(
860                        super_block.root_store_object_id,
861                        mutation,
862                        &context,
863                        AssocObj::None,
864                    )
865                    .context("Failed to replay root store mutations")?;
866            }
867        }
868
869        // Now we can open the allocator.
870        allocator.open().await.context("Failed to open allocator")?;
871
872        // Now replay all other mutations.
873        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
874        {
875            self.objects
876                .replay_mutations(
877                    non_root_mutations,
878                    &journal_offsets,
879                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
880                    end_offset,
881                )
882                .await
883                .context("Failed to replay mutations")?;
884        }
885
886        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;
887
888        let discarded_to =
889            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
890                Some(reader.journal_file_checkpoint().file_offset)
891            } else {
892                None
893            };
894
895        // Configure the journal writer so that we can continue.
896        {
897            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
898                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
899                    "journal replay cut short; journal finishes at {}, but super-block was \
900                     written at {}",
901                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
902                )));
903            }
904            let handle = ObjectStore::open_object(
905                &root_parent,
906                super_block.journal_object_id,
907                journal_handle_options(),
908                None,
909            )
910            .await
911            .with_context(|| {
912                format!(
913                    "Failed to open journal file (object id: {})",
914                    super_block.journal_object_id
915                )
916            })?;
917            let _ = self.handle.set(handle);
918            let mut inner = self.inner.lock();
919            reader.skip_to_end_of_block();
920            let mut writer_checkpoint = reader.journal_file_checkpoint();
921
922            // Make sure we don't accidentally use the reader from now onwards.
923            std::mem::drop(reader);
924
925            // Reset the stream to indicate that we've remounted the journal.
926            writer_checkpoint.checksum ^= RESET_XOR;
927            writer_checkpoint.version = LATEST_VERSION;
928            inner.flushed_offset = writer_checkpoint.file_offset;
929
930            // When we open the filesystem as writable, we flush the device.
931            inner.device_flushed_offset = inner.flushed_offset;
932
933            inner.writer.seek(writer_checkpoint);
934            inner.output_reset_version = true;
935            inner.valid_to = last_checkpoint.file_offset;
936            if last_checkpoint.file_offset < inner.flushed_offset {
937                inner.discard_offset = Some(last_checkpoint.file_offset);
938            }
939        }
940
941        self.objects
942            .on_replay_complete()
943            .await
944            .context("Failed to complete replay for object manager")?;
945
946        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
947        Ok(())
948    }
949
    /// Reads journal records from `reader` and groups them into transactions.
    ///
    /// Records are consumed until the reader's checkpoint reaches `end_offset` (when one is
    /// supplied) or a checksum mismatch is seen, which is the expected way for the journal
    /// stream to end.  `Mutation` and `DataChecksums` records are accumulated into the
    /// transaction currently being built and a `Commit` record finishes it.  A transaction that
    /// is still open when reading stops is dropped.
    ///
    /// `object_id_filter` restricts which mutations are retained: `INVALID_OBJECT_ID` keeps
    /// mutations for every object, any other value keeps only mutations for that object.  In
    /// both cases a mutation is only retained if `should_apply` accepts it.  The flush and
    /// volume-deletion bookkeeping (`end_flush`, `volume_deleted`) and the data checksums are
    /// recorded regardless of the filter.
    ///
    /// Returns the transactions together with `device_flushed_offset`, which starts at the
    /// super-block's `super_block_journal_file_offset` and is only ever raised, by `Reset` and
    /// `DidFlushDevice` records.
    ///
    /// # Errors
    ///
    /// Fails if a record cannot be deserialized or an extent has an invalid length, and with
    /// `FxfsError::Inconsistent` if: a mutation targets a store that an earlier record deleted; a
    /// transaction contains more than one `EndFlush` or `DeleteVolume`, or both for the same
    /// object; a journal extent does not start where the previous one ended; or a `Discard`
    /// record has offset zero.
    async fn read_transactions(
        &self,
        reader: &mut JournalReader,
        end_offset: Option<u64>,
        object_id_filter: u64,
    ) -> Result<JournaledTransactions, Error> {
        let mut transactions = Vec::new();
        // Copy what we need out of the super-block header so that the lock is released before
        // the loop below awaits.
        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
            let super_block = &self.inner.lock().super_block_header;
            (
                super_block.super_block_journal_file_offset,
                super_block.root_parent_store_object_id,
                super_block.root_store_object_id,
            )
        };
        // When set, this refers to the last element of `transactions`, i.e. the transaction that
        // has been started but not yet committed.
        let mut current_transaction = None;
        // Maps an object ID to the offset of the transaction containing its most recent
        // `BeginFlush` that has not yet been matched by an `EndFlush`.
        let mut begin_flush_offsets = HashMap::default();
        // Object IDs for which a `DeleteVolume` mutation has been seen.
        let mut stores_deleted = HashSet::new();
        loop {
            // Cache the checkpoint before we deserialize a record.
            let checkpoint = reader.journal_file_checkpoint();
            if let Some(end_offset) = end_offset {
                if checkpoint.file_offset >= end_offset {
                    break;
                }
            }
            let result =
                reader.deserialize().await.context("Failed to deserialize journal record")?;
            match result {
                ReadResult::Reset(_) => {
                    // A reset abandons any transaction that was started but never committed.
                    if current_transaction.is_some() {
                        current_transaction = None;
                        transactions.pop();
                    }
                    // NOTE(review): presumably a reset implies the device was flushed up to this
                    // point (replay flushes when remounting as writable) -- confirm.
                    let offset = reader.journal_file_checkpoint().file_offset;
                    if offset > device_flushed_offset {
                        device_flushed_offset = offset;
                    }
                }
                ReadResult::Some(record) => {
                    match record {
                        JournalRecord::EndBlock => {
                            reader.skip_to_end_of_block();
                        }
                        JournalRecord::Mutation { object_id, mutation } => {
                            // Start a new transaction at the cached checkpoint if one isn't
                            // already in progress.
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };

                            if stores_deleted.contains(&object_id) {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Encountered mutations for deleted store")
                                );
                            }

                            match &mutation {
                                Mutation::BeginFlush => {
                                    begin_flush_offsets.insert(
                                        object_id,
                                        current_transaction.checkpoint.file_offset,
                                    );
                                }
                                Mutation::EndFlush => {
                                    // An EndFlush with no matching BeginFlush is ignored here.
                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
                                        if let Some(deleted_volume) =
                                            &current_transaction.volume_deleted
                                        {
                                            if *deleted_volume == object_id {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                                ));
                                            }
                                        }
                                        // The +1 is because we don't want to replay the transaction
                                        // containing the begin flush; we don't need or want to
                                        // replay it.
                                        if current_transaction
                                            .end_flush
                                            .replace((object_id, offset + 1))
                                            .is_some()
                                        {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush mutations in a \
                                                 single transaction"
                                            ));
                                        }
                                    }
                                }
                                Mutation::DeleteVolume => {
                                    if let Some((flushed_object, _)) =
                                        &current_transaction.end_flush
                                    {
                                        if *flushed_object == object_id {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                            ));
                                        }
                                    }
                                    if current_transaction
                                        .volume_deleted
                                        .replace(object_id)
                                        .is_some()
                                    {
                                        bail!(anyhow!(FxfsError::Inconsistent).context(
                                            "Multiple DeleteVolume mutations in a single \
                                             transaction"
                                        ));
                                    }
                                    // Any later mutation for this store is treated as an
                                    // inconsistency (see the check above).
                                    stores_deleted.insert(object_id);
                                }
                                _ => {}
                            }

                            // If this mutation doesn't need to be applied, don't bother adding it
                            // to the transaction.
                            if (object_id_filter == INVALID_OBJECT_ID
                                || object_id_filter == object_id)
                                && self.should_apply(object_id, &current_transaction.checkpoint)
                            {
                                if object_id == root_parent_store_object_id {
                                    current_transaction.root_parent_mutations.push(mutation);
                                } else if object_id == root_store_object_id {
                                    current_transaction.root_mutations.push(mutation);
                                } else {
                                    current_transaction
                                        .non_root_mutations
                                        .push((object_id, mutation));
                                }
                            }
                        }
                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
                            // Like a mutation, a checksum record may be the first record of a
                            // transaction.
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };
                            current_transaction.checksums.push(JournaledChecksums {
                                device_range,
                                checksums,
                                first_write,
                            });
                        }
                        JournalRecord::Commit => {
                            // `take()` closes the transaction; it stays in `transactions`.  A
                            // Commit with no transaction in progress does nothing.
                            if let Some(&mut JournaledTransaction {
                                ref checkpoint,
                                ref root_parent_mutations,
                                ref mut end_offset,
                                ..
                            }) = current_transaction.take()
                            {
                                for mutation in root_parent_mutations {
                                    // Snoop the mutations for any that might apply to the journal
                                    // file so that we can pass them to the reader so that it can
                                    // read the journal file.
                                    if let Mutation::ObjectStore(ObjectStoreMutation {
                                        item:
                                            Item {
                                                key:
                                                    ObjectKey {
                                                        object_id,
                                                        data:
                                                            ObjectKeyData::Attribute(
                                                                AttributeId::DATA,
                                                                AttributeKey::Extent(extent),
                                                            ),
                                                        ..
                                                    },
                                                value:
                                                    ObjectValue::Extent(ExtentValue::Some {
                                                        device_offset,
                                                        ..
                                                    }),
                                                ..
                                            },
                                        ..
                                    }) = mutation
                                    {
                                        // Add the journal extents we find on the way to our
                                        // reader.
                                        let handle = reader.handle();
                                        if *object_id != handle.object_id() {
                                            continue;
                                        }
                                        // Journal extents must be contiguous in file-offset
                                        // terms: each one has to start where the handle's
                                        // current extents end.
                                        if let Some(end_offset) = handle.end_offset() {
                                            if extent.start != end_offset {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    format!(
                                                        "Unexpected journal extent {:?} -> {}, \
                                                           expected start: {}",
                                                        *extent, device_offset, end_offset,
                                                    )
                                                ));
                                            }
                                        }
                                        // The extent is tagged with the offset of the transaction
                                        // that added it, which is what `discard_extents` is given
                                        // when a Discard record is processed.
                                        handle.push_extent(
                                            checkpoint.file_offset,
                                            *device_offset
                                                ..*device_offset
                                                    + extent.length().context("Invalid extent")?,
                                        );
                                    }
                                }
                                // The reader is now positioned just past the Commit record.
                                *end_offset = reader.journal_file_checkpoint().file_offset;
                            }
                        }
                        JournalRecord::Discard(offset) => {
                            if offset == 0 {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Invalid offset for Discard")
                                );
                            }
                            // If the in-progress transaction starts before the discard offset,
                            // the record is ignored entirely.
                            if let Some(transaction) = current_transaction.as_ref() {
                                if transaction.checkpoint.file_offset < offset {
                                    // Odd, but OK.
                                    continue;
                                }
                            }
                            current_transaction = None;
                            // Drop every transaction that starts at or after `offset`.
                            while let Some(transaction) = transactions.last() {
                                if transaction.checkpoint.file_offset < offset {
                                    break;
                                }
                                transactions.pop();
                            }
                            reader.handle().discard_extents(offset);
                        }
                        JournalRecord::DidFlushDevice(offset) => {
                            if offset > device_flushed_offset {
                                device_flushed_offset = offset;
                            }
                        }
                    }
                }
                // This is expected when we reach the end of the journal stream.
                ReadResult::ChecksumMismatch => break,
            }
        }

        // Discard any uncommitted transaction.
        if current_transaction.is_some() {
            transactions.pop();
        }

        Ok(JournaledTransactions { transactions, device_flushed_offset })
    }
1207
    /// Creates an empty filesystem with the minimum viable objects (including a root parent and
    /// root store but no further child stores).
    ///
    /// In a single transaction this creates the root store as a child of the root parent store,
    /// the allocator, the two super-block objects (each extended to its first extent), the
    /// journal object and the root parent's graveyard.  The journal's first chunk is
    /// preallocated here unless image builder mode is set.  After the transaction commits, a new
    /// `SuperBlockHeader` is stored in `self.inner` and the journal handle is installed in
    /// `self.handle`.  The super-blocks themselves are not written by this function.
    ///
    /// # Errors
    ///
    /// Returns any error produced while creating the objects or committing the transaction, and
    /// fails if `preallocate_range` could not cover the whole requested journal range.
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        // The following constants are only used at format time. When mounting, the recorded values
        // in the superblock should be used.  The root parent store does not have a parent, but
        // needs an object ID to be registered with ObjectManager, so it cannot collide (i.e. have
        // the same object ID) with any objects in the root store that use the journal to track
        // mutations.
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        // Capture the writer's current position before anything is written, stamped with the
        // latest version; this becomes the journal checkpoint stored in the super-block header.
        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        let mut current_generation = 1;
        if filesystem.options().image_builder_mode.is_some() {
            // Note that in non-image_builder_mode we write both superblocks when we format
            // (in FxFilesystemBuilder::open). In image_builder_mode we only write once at the end
            // as part of finalize(), which is why we must make sure the generation we write is
            // newer than any existing generation.

            // Note: This is the *filesystem* block size, not the device block size which
            // is currently always 4096 (https://fxbug.dev/42063349)
            let block_size = filesystem.block_size();
            match self.read_superblocks(filesystem.device(), block_size).await {
                Ok((super_block, _)) => {
                    log::info!(
                        "Found existing superblock with generation {}. Bumping by 1.",
                        super_block.generation
                    );
                    current_generation = super_block.generation.wrapping_add(1);
                }
                Err(_) => {
                    // TODO(https://fxbug.dev/463757813): It's not unusual to fail to read
                    // superblocks when we're formatting a new filesystem but we should probably
                    // fail the format if we get an IO error.
                }
            }
        }

        let root_parent = ObjectStore::new_empty(
            None,
            INIT_ROOT_PARENT_STORE_OBJECT_ID,
            filesystem.clone(),
            Box::new(NullCache {}),
        );
        self.objects.set_root_parent_store(root_parent.clone());

        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
        self.objects.set_allocator(allocator.clone());
        self.objects.init_metadata_reservation()?;

        // NOTE(review): these are declared ahead of `transaction`, presumably so that they
        // outlive it (locals are dropped in reverse declaration order) -- confirm before
        // folding the declarations into the assignments below.
        let journal_handle;
        let super_block_a_handle;
        let super_block_b_handle;
        let root_store;
        let mut transaction = root_parent
            .new_transaction(
                lock_keys![],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        root_store = root_parent
            .new_child_store(
                &mut transaction,
                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
                Box::new(NullCache {}),
            )
            .await
            .context("new_child_store")?;
        self.objects.set_root_store(root_store.clone());

        allocator.create(&mut transaction).await?;

        // Create the super-block objects...
        super_block_a_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::A.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
        super_block_a_handle
            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
            .await
            .context("extend super block")?;
        super_block_b_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::B.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
        super_block_b_handle
            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
            .await
            .context("extend super block")?;

        // the journal object...
        journal_handle = ObjectStore::create_object(
            &root_parent,
            &mut transaction,
            journal_handle_options(),
            None,
        )
        .await
        .context("create journal")?;
        // In image builder mode the journal is not preallocated here; `allocate_journal` exists
        // for that case.
        if self.inner.lock().image_builder_mode.is_none() {
            let mut file_range = 0..self.chunk_size();
            journal_handle
                .preallocate_range(&mut transaction, &mut file_range)
                .await
                .context("preallocate journal")?;
            // NOTE(review): `preallocate_range` appears to advance `file_range.start` past what
            // it allocated, so a non-empty range means part of the request was left
            // unallocated -- confirm.
            if file_range.start < file_range.end {
                bail!("preallocate_range returned too little space");
            }
        }

        // Write the root store object info.
        root_store.create(&mut transaction).await?;

        // The root parent graveyard.
        root_parent.set_graveyard_directory_object_id(
            Graveyard::create(&mut transaction, &root_parent).await?,
        );

        transaction.commit().await?;

        self.inner.lock().super_block_header = SuperBlockHeader::new(
            current_generation,
            root_parent.store_object_id(),
            root_parent.graveyard_directory_object_id(),
            root_store.store_object_id(),
            allocator.object_id(),
            journal_handle.object_id(),
            checkpoint,
            /* earliest_version: */ LATEST_VERSION,
        );

        // Initialize the journal writer.
        // The result of `set` is deliberately ignored.
        let _ = self.handle.set(journal_handle);
        Ok(())
    }
1360
1361    /// Normally we allocate the journal when creating the filesystem.
1362    /// This is used image_builder_mode when journal allocation is done last.
1363    pub async fn allocate_journal(&self) -> Result<(), Error> {
1364        let handle = self.handle.get().unwrap();
1365        let mut transaction = handle
1366            .store()
1367            .new_transaction(
1368                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1369                Options { skip_journal_checks: true, ..Default::default() },
1370            )
1371            .await?;
1372        let mut file_range = 0..self.chunk_size();
1373        self.handle
1374            .get()
1375            .unwrap()
1376            .preallocate_range(&mut transaction, &mut file_range)
1377            .await
1378            .context("preallocate journal")?;
1379        if file_range.start < file_range.end {
1380            bail!("preallocate_range returned too little space");
1381        }
1382        transaction.commit().await?;
1383        Ok(())
1384    }
1385
1386    pub async fn init_superblocks(&self) -> Result<(), Error> {
1387        // Overwrite both superblocks.
1388        for _ in 0..2 {
1389            self.write_super_block().await?;
1390        }
1391        Ok(())
1392    }
1393
1394    /// Takes a snapshot of all journaled transactions which affect |object_id| since its last
1395    /// flush.
1396    /// The caller is responsible for locking; it must ensure that the journal is not trimmed during
1397    /// this call.  For example, a Flush lock could be held on the object in question (assuming that
1398    /// object has data to flush and is registered with ObjectManager).
1399    pub async fn read_transactions_for_object(
1400        &self,
1401        object_id: u64,
1402    ) -> Result<Vec<JournaledTransaction>, Error> {
1403        let handle = self.handle.get().expect("No journal handle");
1404        // Reopen the handle since JournalReader needs an owned handle.
1405        let handle = ObjectStore::open_object(
1406            handle.owner(),
1407            handle.object_id(),
1408            journal_handle_options(),
1409            None,
1410        )
1411        .await?;
1412
1413        let checkpoint = match self.objects.journal_checkpoint(object_id) {
1414            Some(checkpoint) => checkpoint,
1415            None => return Ok(vec![]),
1416        };
1417        let mut reader = JournalReader::new(handle, &checkpoint);
1418        // Record the current end offset and only read to there, so we don't accidentally read any
1419        // partially flushed blocks.
1420        let end_offset = self.inner.lock().valid_to;
1421        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1422    }
1423
1424    /// Commits a transaction.  This is not thread safe; the caller must take appropriate locks.
1425    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1426        if transaction.is_empty() {
1427            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1428        }
1429
1430        self.pre_commit(transaction).await?;
1431        Ok(self.write_and_apply_mutations(transaction))
1432    }
1433
    // Before we commit, we might need to extend the journal or write pending records to the
    // journal.
    //
    // In order, this:
    //   1. Writes any queued records to the journal writer: the version (first write after a
    //      reset), a Discard record and a DidFlushDevice record.
    //   2. Returns early if there is no journal handle yet, or if the journal needs neither
    //      extending nor zeroing and `needs_borrow_for_journal` returns false.
    //   3. Otherwise builds a transaction that preallocates another chunk and/or zeroes
    //      `0..zero_offset`, and applies it directly with `write_and_apply_mutations`.
    //
    // `_transaction` (the transaction that is about to be committed) is not used.
    async fn pre_commit(&self, _transaction: &Transaction<'_>) -> Result<(), Error> {
        // Declared out here so that the reference obtained whilst the lock is held can be used
        // after the lock has been released.
        let handle;

        let (size, zero_offset) = {
            let mut inner = self.inner.lock();

            // If this is the first write after a RESET, we need to output version first.
            // `take` clears the flag, so the version is output at most once per reset.
            if std::mem::take(&mut inner.output_reset_version) {
                LATEST_VERSION.serialize_into(&mut inner.writer)?;
            }

            // Write out a pending Discard record, if there is one.
            if let Some(discard_offset) = inner.discard_offset {
                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
                inner.discard_offset = None;
            }

            // Write out the DidFlushDevice record that `flush_device` queued up.
            if inner.needs_did_flush_device {
                let offset = inner.device_flushed_offset;
                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
                inner.needs_did_flush_device = false;
            }

            // Without a journal handle there is nothing to extend or zero.
            handle = match self.handle.get() {
                None => return Ok(()),
                Some(x) => x,
            };

            let file_offset = inner.writer.journal_file_checkpoint().file_offset;

            // `size` ends up as Some(current file size) when less than a chunk remains between
            // the writer's offset and the end of the journal file, i.e. the file needs extending.
            let size = handle.get_size();
            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };

            // Nothing to extend, nothing to zero and `needs_borrow_for_journal` is false: done.
            if size.is_none()
                && inner.zero_offset.is_none()
                && !self.objects.needs_borrow_for_journal(file_offset)
            {
                return Ok(());
            }

            (size, inner.zero_offset)
        };

        let mut transaction = handle
            .new_transaction_with_options(Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.objects.metadata_reservation()),
                ..Default::default()
            })
            .await?;
        // Extend the journal file by one chunk, starting at its current size.
        if let Some(size) = size {
            handle
                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
                .await?;
        }
        // Zero everything before `zero_offset` (set by `write_super_block`).
        if let Some(zero_offset) = zero_offset {
            handle.zero(&mut transaction, 0..zero_offset).await?;
        }

        // We can't use regular transaction commit, because that can cause re-entrancy issues, so
        // instead we just apply the transaction directly here.
        self.write_and_apply_mutations(&mut transaction);

        let mut inner = self.inner.lock();

        // Make sure the transaction to extend the journal made it to the journal within the old
        // size, since otherwise, it won't be possible to replay.
        if let Some(size) = size {
            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
        }

        // The lock was released after `zero_offset` was sampled, so a different value might have
        // been stored since then; only clear it if it is still the value that was just handled.
        if inner.zero_offset == zero_offset {
            inner.zero_offset = None;
        }

        Ok(())
    }
1513
1514    // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
1515    // all records should be applied because the object store or allocator might already contain the
1516    // mutation.  After replay, that obviously isn't the case and we want to apply all mutations.
1517    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1518        let super_block_header = &self.inner.lock().super_block_header;
1519        let offset = super_block_header
1520            .journal_file_offsets
1521            .get(&object_id)
1522            .cloned()
1523            .unwrap_or(super_block_header.super_block_journal_file_offset);
1524        journal_file_checkpoint.file_offset >= offset
1525    }
1526
    /// Flushes previous writes to the device and then writes out a new super-block.
    /// Callers must ensure that we do not make concurrent calls.
    ///
    /// The new header is a copy of the current one with the generation bumped, the journal
    /// offsets/checkpoint refreshed and the borrowed metadata space recorded.  Once it has been
    /// saved, it becomes the in-memory header and the journal before the previous header's journal
    /// checkpoint (rounded down to a block) is queued to be zeroed by `pre_commit`.
    ///
    /// Returns an error if padding, compacting the root parent store, flushing, or saving the
    /// super-block fails; in that case the in-memory header is left untouched.
    async fn write_super_block(&self) -> Result<(), Error> {
        let root_parent_store = self.objects.root_parent_store();

        // We need to flush previous writes to the device since the new super-block we are writing
        // relies on written data being observable, and we also need to lock the root parent store
        // so that no new entries are written to it whilst we are writing the super-block, and for
        // that we use the write lock.
        let old_layers;
        let old_super_block_offset;
        let mut new_super_block_header;
        let checkpoint;
        let borrowed;

        {
            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
            {
                // The writer mutex is only held whilst padding and compacting the root parent
                // store; it is released before waiting for the flush.
                let _write_guard = self.writer_mutex.lock();
                (checkpoint, borrowed) = self.pad_to_block()?;
                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
            }
            self.flush_device(checkpoint.file_offset)
                .await
                .context("flush failed when writing superblock")?;
        }

        // Start from a copy of the current header and update the fields that change.
        new_super_block_header = self.inner.lock().super_block_header.clone();

        // Remember where the previous super-block's journal checkpoint was; it is used below to
        // work out how much of the journal can be zeroed.
        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;

        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();

        new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
        // Use the minimum checkpoint reported by `journal_file_offsets` if there is one, otherwise
        // the checkpoint taken above.
        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
        new_super_block_header.journal_file_offsets = journal_file_offsets;
        new_super_block_header.borrowed_metadata_space = borrowed;

        self.super_block_manager
            .save(
                new_super_block_header.clone(),
                self.objects.root_parent_store().filesystem(),
                old_layers,
            )
            .await?;
        {
            // Only now that the super-block has been saved do we adopt the new header.
            let mut inner = self.inner.lock();
            inner.super_block_header = new_super_block_header;
            // Picked up by `pre_commit`, which zeroes the journal from 0 up to this offset.
            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
        }

        Ok(())
    }
1582
1583    /// Flushes any buffered journal data to the device.  Note that this does not flush the device
1584    /// unless the flush_device option is set, in which case data should have been persisted to
1585    /// lower layers.  If a precondition is supplied, it is evaluated and the sync will be skipped
1586    /// if it returns false.  This allows callers to check a condition whilst a lock is held.  If a
1587    /// sync is performed, this function returns the checkpoint that was flushed and the amount of
1588    /// borrowed metadata space at the point it was flushed.
1589    pub async fn sync(
1590        &self,
1591        options: SyncOptions<'_>,
1592    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1593        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1594
1595        let (checkpoint, borrowed) = {
1596            if let Some(precondition) = options.precondition {
1597                if !precondition() {
1598                    return Ok(None);
1599                }
1600            }
1601
1602            // This guard is required so that we don't insert an EndBlock record in the middle of a
1603            // transaction.
1604            let _guard = self.writer_mutex.lock();
1605
1606            self.pad_to_block()?
1607        };
1608
1609        if options.flush_device {
1610            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1611        }
1612
1613        Ok(Some((checkpoint, borrowed)))
1614    }
1615
1616    // Returns the checkpoint as it was prior to padding.  This is done because the super block
1617    // needs to record where the last transaction ends and it's the next transaction that pays the
1618    // price of the padding.
1619    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1620        let mut inner = self.inner.lock();
1621        let checkpoint = inner.writer.journal_file_checkpoint();
1622        if checkpoint.file_offset % BLOCK_SIZE != 0 {
1623            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1624            inner.writer.pad_to_block()?;
1625            if let Some(waker) = inner.flush_waker.take() {
1626                waker.wake();
1627            }
1628        }
1629        Ok((checkpoint, self.objects.borrowed_metadata_space()))
1630    }
1631
    /// Waits until the journal has been written out at least as far as `checkpoint_offset` and
    /// then, unless the device has already been flushed that far, flushes the device.
    ///
    /// After flushing the device, the flushed offset is recorded, a DidFlushDevice record is
    /// queued (written by the next `pre_commit`) and the allocator is notified.
    ///
    /// Returns a `JournalFlushError` if the journal is terminated before enough has been written,
    /// or the error from flushing the device.  Panics if called in image builder mode.
    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
        assert!(
            self.inner.lock().image_builder_mode.is_none(),
            "flush_device called in image builder mode"
        );
        // Wait for `flushed_offset` (advanced by `flush`) to reach the requested offset.
        debug_assert_not_too_long!(poll_fn(|ctx| {
            let mut inner = self.inner.lock();
            if inner.flushed_offset >= checkpoint_offset {
                Poll::Ready(Ok(()))
            } else if inner.terminate {
                // The journal has been terminated; report why, if a reason was recorded.
                let context = inner
                    .terminate_reason
                    .as_ref()
                    .map(|e| format!("Journal closed with error: {:?}", e))
                    .unwrap_or_else(|| "Journal closed".to_string());
                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
            } else {
                // Register to be woken by `flush` once it has written more of the journal.
                inner.sync_waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }))?;

        // Skip the device flush if a previous flush already covers this offset.
        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
        if needs_flush {
            let trace = self.trace.load(Ordering::Relaxed);
            if trace {
                info!("J: start flush device");
            }
            self.handle.get().unwrap().flush_device().await?;
            if trace {
                info!("J: end flush device");
            }

            // We need to write a DidFlushDevice record at some point, but if we are in the
            // process of shutting down the filesystem, we want to leave the journal clean to
            // avoid there being log messages complaining about unwritten journal data, so we
            // queue it up so that the next transaction will trigger this record to be written.
            // If we are shutting down, that will never happen but since the DidFlushDevice
            // message is purely advisory (it reduces the number of checksums we have to verify
            // during replay), it doesn't matter if it isn't written.
            {
                let mut inner = self.inner.lock();
                inner.device_flushed_offset = checkpoint_offset;
                inner.needs_did_flush_device = true;
            }

            // Tell the allocator that we flushed the device so that it can now start using
            // space that was deallocated.
            self.objects.allocator().did_flush_device(checkpoint_offset);
            if trace {
                info!("J: did flush device");
            }
        }

        Ok(())
    }
1688
1689    /// Returns a copy of the super-block header.
1690    pub fn super_block_header(&self) -> SuperBlockHeader {
1691        self.inner.lock().super_block_header.clone()
1692    }
1693
1694    /// Waits for there to be sufficient space in the journal.
1695    pub async fn check_journal_space(&self) -> Result<(), Error> {
1696        loop {
1697            debug_assert_not_too_long!({
1698                let inner = self.inner.lock();
1699                if inner.terminate {
1700                    // If the flush error is set, this will never make progress, since we can't
1701                    // extend the journal any more.
1702                    let context = inner
1703                        .terminate_reason
1704                        .as_ref()
1705                        .map(|e| format!("Journal closed with error: {:?}", e))
1706                        .unwrap_or_else(|| "Journal closed".to_string());
1707                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
1708                }
1709                if self.objects.last_end_offset()
1710                    - inner.super_block_header.journal_checkpoint.file_offset
1711                    < inner.reclaim_size
1712                {
1713                    break Ok(());
1714                }
1715                if inner.image_builder_mode.is_some() {
1716                    break Ok(());
1717                }
1718                if inner.disable_compactions {
1719                    break Err(
1720                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
1721                    );
1722                }
1723                self.reclaim_event.listen()
1724            });
1725        }
1726    }
1727
1728    fn chunk_size(&self) -> u64 {
1729        CHUNK_SIZE
1730    }
1731
    /// Writes the records for `transaction` to the journal writer and applies the transaction.
    ///
    /// Whilst holding the writer mutex, this writes a record for each mutation, applies the
    /// transaction, writes any mutation returned by `apply_transaction`, writes a DataChecksums
    /// record for each checksum entry taken from the transaction, and finishes with a Commit
    /// record.  It then notifies the object manager and wakes the flush task.
    ///
    /// Returns the journal file offset as it was before any of the transaction's records were
    /// written.
    ///
    /// Panics if applying the transaction or writing a record fails.
    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
        let checkpoint_before;
        let checkpoint_after;
        {
            // Held for the whole sequence of records so that `sync` and `write_super_block`
            // cannot pad (and insert an EndBlock record) in the middle of a transaction.
            let _guard = self.writer_mutex.lock();
            checkpoint_before = {
                let mut inner = self.inner.lock();
                // Consumed by `flush` to decide whether to request a barrier on its next write.
                if transaction.includes_write() {
                    inner.needs_barrier = true;
                }
                // Sample the checkpoint before anything for this transaction has been written.
                let checkpoint = inner.writer.journal_file_checkpoint();
                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
                    self.objects.write_mutation(
                        *object_id,
                        mutation,
                        Writer(*object_id, &mut inner.writer),
                    );
                }
                checkpoint
            };
            // `inner` is not locked whilst the transaction is applied.
            let maybe_mutation =
                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
                    "apply_transaction should not fail in live mode; \
                     filesystem will be in an inconsistent state",
                );
            checkpoint_after = {
                let mut inner = self.inner.lock();
                // Applying the transaction can yield one more mutation; it is journaled with an
                // object ID of 0.
                if let Some(mutation) = maybe_mutation {
                    inner
                        .writer
                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
                        .unwrap();
                }
                for (device_range, checksums, first_write) in
                    transaction.take_checksums().into_iter()
                {
                    inner
                        .writer
                        .write_record(&JournalRecord::DataChecksums(
                            device_range,
                            Checksums::fletcher(checksums),
                            first_write,
                        ))
                        .unwrap();
                }
                // The Commit record marks the end of the transaction's records.
                inner.writer.write_record(&JournalRecord::Commit).unwrap();

                inner.writer.journal_file_checkpoint()
            };
        }
        // The writer mutex has been released by this point.
        self.objects.did_commit_transaction(
            transaction,
            &checkpoint_before,
            checkpoint_after.file_offset,
        );

        // Let the flush task know that there is new journal data.
        if let Some(waker) = self.inner.lock().flush_waker.take() {
            waker.wake();
        }

        checkpoint_before.file_offset
    }
1794
    /// This task will flush journal data to the device when there is data that needs flushing, and
    /// trigger compactions when short of journal space.  It will return after the terminate method
    /// has been called, or an error is encountered with either flushing or compaction.
    ///
    /// At most one flush and one compaction are in flight at any time.  An error from either
    /// terminates the journal, and the task then returns once nothing is in flight.
    pub async fn flush_task(self: Arc<Self>) {
        // The in-flight flush and compaction futures, if any.
        let mut flush_fut = None;
        let mut compact_fut = None;
        // Set after a flush fails; no further flushes are started once set.
        let mut flush_error = false;
        poll_fn(|ctx| {
            loop {
                {
                    let mut inner = self.inner.lock();
                    // Start a flush if none is in flight, no flush has failed, the journal handle
                    // has been set and the writer has bytes ready to flush.
                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
                        let flushable = inner.writer.flushable_bytes();
                        if flushable > 0 {
                            flush_fut = Some(Box::pin(self.flush(flushable)));
                        }
                    }
                    // Finish once terminated and nothing is in flight.
                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
                        return Poll::Ready(());
                    }
                    // `journal_bytes` refers to bytes in the journal that haven't yet been flushed
                    // to the layer files. It increases with each transaction and decreases when
                    // compactions complete. The flush task is woken whenever a transaction is
                    // committed and we should see this metric updated regularly.
                    let journal_bytes = self.objects.last_end_offset()
                        - inner.super_block_header.journal_checkpoint.file_offset;
                    fxfs_trace::counter!("journal-bytes", 0, "total" => journal_bytes);
                    // The / 2 is here because after compacting, we cannot reclaim the space until
                    // the _next_ time we flush the device since the super-block is not guaranteed
                    // to persist until then.
                    if compact_fut.is_none()
                        && !inner.terminate
                        && !inner.disable_compactions
                        && inner.image_builder_mode.is_none()
                        && journal_bytes > inner.reclaim_size / 2
                    {
                        compact_fut = Some(Box::pin(self.compact()));
                        inner.compaction_running = true;
                    }
                    // Register to be woken when a transaction is written or the journal is padded
                    // (both take and wake `flush_waker`).
                    inner.flush_waker = Some(ctx.waker().clone());
                }
                // Stays true unless one of the futures completes during this pass.
                let mut pending = true;
                if let Some(fut) = flush_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        if let Err(e) = result {
                            // A failed flush terminates the journal; wake everything listening on
                            // the reclaim event so that waiters can observe the termination.
                            self.inner.lock().terminate(Some(e.context("Flush error")));
                            self.reclaim_event.notify(usize::MAX);
                            flush_error = true;
                        }
                        flush_fut = None;
                        pending = false;
                    }
                }
                if let Some(fut) = compact_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        let mut inner = self.inner.lock();
                        if let Err(e) = result {
                            inner.terminate(Some(e.context("Compaction error")));
                        }
                        compact_fut = None;
                        inner.compaction_running = false;
                        // Wake `check_journal_space` and `stop_compactions` waiters.
                        self.reclaim_event.notify(usize::MAX);
                        pending = false;
                        fxfs_trace::counter!(
                            "journal-bytes",
                            0,
                            "total" => self.objects.last_end_offset()
                                - inner.super_block_header.journal_checkpoint.file_offset
                        );
                    }
                }
                // If something completed, go round again to re-evaluate; otherwise wait for a
                // wake-up.
                if pending {
                    return Poll::Pending;
                }
            }
        })
        .await;
    }
1873
1874    /// Returns a yielder that can be used for compactions.
1875    pub fn get_compaction_yielder(&self) -> CompactionYielder<'_> {
1876        CompactionYielder::new(self)
1877    }
1878
1879    async fn flush(&self, amount: usize) -> Result<(), Error> {
1880        let handle = self.handle.get().unwrap();
1881        let mut buf = handle.allocate_buffer(amount).await;
1882        let (offset, len, barrier_on_first_write) = {
1883            let mut inner = self.inner.lock();
1884            let offset = inner.writer.take_flushable(buf.as_mut());
1885            let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1886            // Reset `needs_barrier` before instead of after the overwrite in case a txn commit
1887            // that contains data happens during the overwrite.
1888            inner.needs_barrier = false;
1889            (offset, buf.len() as u64, barrier_on_first_write)
1890        };
1891        self.handle
1892            .get()
1893            .unwrap()
1894            .overwrite(
1895                offset,
1896                buf.as_mut(),
1897                OverwriteOptions { barrier_on_first_write, ..Default::default() },
1898            )
1899            .await?;
1900
1901        let mut inner = self.inner.lock();
1902        if let Some(waker) = inner.sync_waker.take() {
1903            waker.wake();
1904        }
1905        inner.flushed_offset = offset + len;
1906        inner.valid_to = inner.flushed_offset;
1907        Ok(())
1908    }
1909
1910    #[trace]
1911    async fn compact(&self) -> Result<(), Error> {
1912        assert!(
1913            self.inner.lock().image_builder_mode.is_none(),
1914            "compact called in image builder mode"
1915        );
1916        let bytes_before = self.objects.compaction_bytes_written();
1917        let _measure = crate::metrics::DurationMeasureScope::new(
1918            &crate::metrics::lsm_tree_metrics().journal_compaction_time,
1919        );
1920        crate::metrics::lsm_tree_metrics().journal_compactions_total.add(1);
1921        let trace = self.trace.load(Ordering::Relaxed);
1922        debug!("Compaction starting");
1923        if trace {
1924            info!("J: start compaction");
1925        }
1926        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1927        self.inner.lock().super_block_header.earliest_version = earliest_version;
1928        self.write_super_block().await.context("Failed to write superblock")?;
1929        if trace {
1930            info!("J: end compaction");
1931        }
1932        debug!("Compaction finished");
1933        let bytes_after = self.objects.compaction_bytes_written();
1934        crate::metrics::lsm_tree_metrics()
1935            .journal_compaction_bytes_written
1936            .add(bytes_after.saturating_sub(bytes_before));
1937        Ok(())
1938    }
1939
1940    /// This should generally NOT be called externally. It is public to allow use by FIDL service
1941    /// fxfs.Debug.
1942    pub async fn force_compact(&self) -> Result<(), Error> {
1943        self.inner.lock().forced_compaction = true;
1944        scopeguard::defer! { self.inner.lock().forced_compaction = false; }
1945        self.compact().await
1946    }
1947
1948    pub async fn stop_compactions(&self) {
1949        loop {
1950            debug_assert_not_too_long!({
1951                let mut inner = self.inner.lock();
1952                inner.disable_compactions = true;
1953                if !inner.compaction_running {
1954                    return;
1955                }
1956                self.reclaim_event.listen()
1957            });
1958        }
1959    }
1960
1961    /// Creates a lazy inspect node named `str` under `parent` which will yield statistics for the
1962    /// journal when queried.
1963    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1964        let this = Arc::downgrade(self);
1965        parent.record_lazy_child(name, move || {
1966            let this_clone = this.clone();
1967            async move {
1968                let inspector = fuchsia_inspect::Inspector::default();
1969                if let Some(this) = this_clone.upgrade() {
1970                    let (journal_min, journal_max, journal_reclaim_size) = {
1971                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1972                        let inner = this.inner.lock();
1973                        (
1974                            round_down(
1975                                inner.super_block_header.journal_checkpoint.file_offset,
1976                                BLOCK_SIZE,
1977                            ),
1978                            inner.flushed_offset,
1979                            inner.reclaim_size,
1980                        )
1981                    };
1982                    let root = inspector.root();
1983                    root.record_uint("journal_min_offset", journal_min);
1984                    root.record_uint("journal_max_offset", journal_max);
1985                    root.record_uint("journal_size", journal_max - journal_min);
1986                    root.record_uint("journal_reclaim_size", journal_reclaim_size);
1987
1988                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
1989                    if let Some(x) = round_div(
1990                        100 * (journal_max - journal_min),
1991                        this.objects.allocator().get_disk_bytes(),
1992                    ) {
1993                        root.record_uint("journal_size_to_disk_size_percent", x);
1994                    }
1995                }
1996                Ok(inspector)
1997            }
1998            .boxed()
1999        });
2000    }
2001
2002    /// Terminate all journal activity.
2003    pub fn terminate(&self) {
2004        self.inner.lock().terminate(/*reason*/ None);
2005        self.reclaim_event.notify(usize::MAX);
2006    }
2007}
2008
2009/// Wrapper to allow records to be written to the journal.
2010pub struct Writer<'a>(u64, &'a mut JournalWriter);
2011
2012impl Writer<'_> {
2013    pub fn write(&mut self, mutation: Mutation) {
2014        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
2015    }
2016}
2017
#[cfg(target_os = "fuchsia")]
mod yielder {
    use super::Journal;
    use crate::lsm_tree::Yielder;
    use fuchsia_async as fasync;

    /// A yielder for compactions that uses fuchsia-async to yield whilst other tasks are being
    /// polled, which should be a proxy for how busy the system is.  Compactions can be delayed for
    /// a short while, but not for so long that new transactions end up blocked.
    pub struct CompactionYielder<'a> {
        journal: &'a Journal,
        low_priority_task: Option<fasync::LowPriorityTask>,
    }

    impl<'a> CompactionYielder<'a> {
        /// Creates a yielder for `journal`.  The low priority task is only created the first time
        /// the yielder actually waits.
        pub fn new(journal: &'a Journal) -> Self {
            CompactionYielder { journal, low_priority_task: None }
        }
    }

    impl Yielder for CompactionYielder<'_> {
        async fn yield_now(&mut self) {
            // Wait for the executor to have been idle for 4ms, but never wait for more than 16ms.
            // The cap is needed in case compaction has become urgent, or else we could block new
            // transactions.
            const IDLE_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(4);
            const MAX_YIELD_DURATION: zx::MonotonicDuration =
                zx::MonotonicDuration::from_millis(16);

            {
                let inner = self.journal.inner.lock();
                // Forced compactions never yield.
                if inner.forced_compaction {
                    return;
                }
                let outstanding = self.journal.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset;
                let half_reclaim_size = inner.reclaim_size / 2;
                match outstanding.checked_sub(half_reclaim_size) {
                    Some(excess) if excess >= half_reclaim_size / 2 => {
                        // 3/4 of the reclaim size has been used up in the journal, so do not delay
                        // any further.  Continuing to yield would get us to the point where new
                        // transactions are blocked.
                        self.low_priority_task = None;
                        return;
                    }
                    _ => {}
                }
            }

            self.low_priority_task
                .get_or_insert_with(|| fasync::LowPriorityTask::new())
                .wait_until_idle_for(
                    IDLE_PERIOD,
                    fasync::MonotonicInstant::after(MAX_YIELD_DURATION),
                )
                .await;
        }
    }
}
2077
2078#[cfg(not(target_os = "fuchsia"))]
2079mod yielder {
2080    use super::Journal;
2081    use crate::lsm_tree::Yielder;
2082
2083    #[expect(dead_code)]
2084    pub struct CompactionYielder<'a>(&'a Journal);
2085
2086    impl<'a> CompactionYielder<'a> {
2087        pub fn new(journal: &'a Journal) -> Self {
2088            Self(journal)
2089        }
2090    }
2091
2092    impl Yielder for CompactionYielder<'_> {
2093        async fn yield_now(&mut self) {}
2094    }
2095}
2096
2097pub use yielder::*;
2098
2099#[cfg(test)]
2100mod tests {
2101    use super::SuperBlockInstance;
2102    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
2103    use crate::fsck::fsck;
2104    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
2105    use crate::object_store::directory::Directory;
2106    use crate::object_store::transaction::Options;
2107    use crate::object_store::volume::root_volume;
2108    use crate::object_store::{
2109        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
2110    };
2111    #[cfg(target_os = "fuchsia")]
2112    use fuchsia_async::TestExecutor;
2113    use fuchsia_async::{self as fasync, MonotonicDuration};
2114    use storage_device::DeviceHolder;
2115    use storage_device::fake_device::FakeDevice;
2116
    /// Block size, in bytes, passed to `FakeDevice::new` by most tests in this module.  It is
    /// also used as the size of the buffers the tests read file contents into.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2118
2119    #[fuchsia::test]
2120    async fn test_replay() {
2121        const TEST_DATA: &[u8] = b"hello";
2122
2123        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2124
2125        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2126
2127        let object_id = {
2128            let root_store = fs.root_store();
2129            let root_directory =
2130                Directory::open(&root_store, root_store.root_directory_object_id())
2131                    .await
2132                    .expect("open failed");
2133            let mut transaction = fs
2134                .root_store()
2135                .new_transaction(
2136                    lock_keys![LockKey::object(
2137                        root_store.store_object_id(),
2138                        root_store.root_directory_object_id(),
2139                    )],
2140                    Options::default(),
2141                )
2142                .await
2143                .expect("new_transaction failed");
2144            let handle = root_directory
2145                .create_child_file(&mut transaction, "test")
2146                .await
2147                .expect("create_child_file failed");
2148
2149            transaction.commit().await.expect("commit failed");
2150            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2151            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2152            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2153            // As this is the first sync, this will actually trigger a new super-block, but normally
2154            // this would not be the case.
2155            fs.sync(SyncOptions::default()).await.expect("sync failed");
2156            handle.object_id()
2157        };
2158
2159        {
2160            fs.close().await.expect("Close failed");
2161            let device = fs.take_device().await;
2162            device.reopen(false);
2163            let fs = FxFilesystem::open(device).await.expect("open failed");
2164            let handle = ObjectStore::open_object(
2165                &fs.root_store(),
2166                object_id,
2167                HandleOptions::default(),
2168                None,
2169            )
2170            .await
2171            .expect("open_object failed");
2172            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2173            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
2174            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2175            fsck(fs.clone()).await.expect("fsck failed");
2176            fs.close().await.expect("Close failed");
2177        }
2178    }
2179
2180    #[fuchsia::test]
2181    async fn test_reset() {
2182        const TEST_DATA: &[u8] = b"hello";
2183
2184        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
2185
2186        let mut object_ids = Vec::new();
2187
2188        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2189        {
2190            let root_store = fs.root_store();
2191            let root_directory =
2192                Directory::open(&root_store, root_store.root_directory_object_id())
2193                    .await
2194                    .expect("open failed");
2195            let mut transaction = fs
2196                .root_store()
2197                .new_transaction(
2198                    lock_keys![LockKey::object(
2199                        root_store.store_object_id(),
2200                        root_store.root_directory_object_id(),
2201                    )],
2202                    Options::default(),
2203                )
2204                .await
2205                .expect("new_transaction failed");
2206            let handle = root_directory
2207                .create_child_file(&mut transaction, "test")
2208                .await
2209                .expect("create_child_file failed");
2210            transaction.commit().await.expect("commit failed");
2211            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2212            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2213            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2214            fs.sync(SyncOptions::default()).await.expect("sync failed");
2215            object_ids.push(handle.object_id());
2216
2217            // Create a lot of objects but don't sync at the end. This should leave the filesystem
2218            // with a half finished transaction that cannot be replayed.
2219            for i in 0..1000 {
2220                let mut transaction = fs
2221                    .root_store()
2222                    .new_transaction(
2223                        lock_keys![LockKey::object(
2224                            root_store.store_object_id(),
2225                            root_store.root_directory_object_id(),
2226                        )],
2227                        Options::default(),
2228                    )
2229                    .await
2230                    .expect("new_transaction failed");
2231                let handle = root_directory
2232                    .create_child_file(&mut transaction, &format!("{}", i))
2233                    .await
2234                    .expect("create_child_file failed");
2235                transaction.commit().await.expect("commit failed");
2236                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2237                buf.as_mut_slice().copy_from_slice(TEST_DATA);
2238                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2239                object_ids.push(handle.object_id());
2240            }
2241        }
2242        fs.close().await.expect("fs close failed");
2243        let device = fs.take_device().await;
2244        device.reopen(false);
2245        let fs = FxFilesystem::open(device).await.expect("open failed");
2246        fsck(fs.clone()).await.expect("fsck failed");
2247        {
2248            let root_store = fs.root_store();
2249            // Check the first two objects which should exist.
2250            for &object_id in &object_ids[0..1] {
2251                let handle = ObjectStore::open_object(
2252                    &root_store,
2253                    object_id,
2254                    HandleOptions::default(),
2255                    None,
2256                )
2257                .await
2258                .expect("open_object failed");
2259                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2260                assert_eq!(
2261                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2262                    TEST_DATA.len()
2263                );
2264                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2265            }
2266
2267            // Write one more object and sync.
2268            let root_directory =
2269                Directory::open(&root_store, root_store.root_directory_object_id())
2270                    .await
2271                    .expect("open failed");
2272            let mut transaction = fs
2273                .root_store()
2274                .new_transaction(
2275                    lock_keys![LockKey::object(
2276                        root_store.store_object_id(),
2277                        root_store.root_directory_object_id(),
2278                    )],
2279                    Options::default(),
2280                )
2281                .await
2282                .expect("new_transaction failed");
2283            let handle = root_directory
2284                .create_child_file(&mut transaction, "test2")
2285                .await
2286                .expect("create_child_file failed");
2287            transaction.commit().await.expect("commit failed");
2288            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2289            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2290            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2291            fs.sync(SyncOptions::default()).await.expect("sync failed");
2292            object_ids.push(handle.object_id());
2293        }
2294
2295        fs.close().await.expect("close failed");
2296        let device = fs.take_device().await;
2297        device.reopen(false);
2298        let fs = FxFilesystem::open(device).await.expect("open failed");
2299        {
2300            fsck(fs.clone()).await.expect("fsck failed");
2301
2302            // Check the first two and the last objects.
2303            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
2304                let handle = ObjectStore::open_object(
2305                    &fs.root_store(),
2306                    object_id,
2307                    HandleOptions::default(),
2308                    None,
2309                )
2310                .await
2311                .unwrap_or_else(|e| {
2312                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2313                });
2314                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2315                assert_eq!(
2316                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2317                    TEST_DATA.len()
2318                );
2319                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2320            }
2321        }
2322        fs.close().await.expect("close failed");
2323    }
2324
2325    #[fuchsia::test]
2326    async fn test_discard() {
2327        let device = {
2328            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2329            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2330            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2331
2332            let store = root_volume
2333                .new_volume("test", NewChildStoreOptions::default())
2334                .await
2335                .expect("new_volume failed");
2336            let root_directory = Directory::open(&store, store.root_directory_object_id())
2337                .await
2338                .expect("open failed");
2339
2340            // Create enough data so that another journal extent is used.
2341            let mut i = 0;
2342            loop {
2343                let mut transaction = fs
2344                    .root_store()
2345                    .new_transaction(
2346                        lock_keys![LockKey::object(
2347                            store.store_object_id(),
2348                            store.root_directory_object_id()
2349                        )],
2350                        Options::default(),
2351                    )
2352                    .await
2353                    .expect("new_transaction failed");
2354                root_directory
2355                    .create_child_file(&mut transaction, &format!("a {i}"))
2356                    .await
2357                    .expect("create_child_file failed");
2358                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2359                    break;
2360                }
2361                i += 1;
2362            }
2363
2364            // Compact and then disable compactions.
2365            fs.journal().force_compact().await.expect("compact failed");
2366            fs.journal().stop_compactions().await;
2367
2368            // Keep going until we need another journal extent.
2369            let mut i = 0;
2370            loop {
2371                let mut transaction = fs
2372                    .root_store()
2373                    .new_transaction(
2374                        lock_keys![LockKey::object(
2375                            store.store_object_id(),
2376                            store.root_directory_object_id()
2377                        )],
2378                        Options::default(),
2379                    )
2380                    .await
2381                    .expect("new_transaction failed");
2382                root_directory
2383                    .create_child_file(&mut transaction, &format!("b {i}"))
2384                    .await
2385                    .expect("create_child_file failed");
2386                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2387                    break;
2388                }
2389                i += 1;
2390            }
2391
2392            // Allow the journal to flush, but we don't want to sync.
2393            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2394            // Because we're not gracefully closing the filesystem, a Discard record will be
2395            // emitted.
2396            fs.device().snapshot().expect("snapshot failed")
2397        };
2398
2399        let fs = FxFilesystem::open(device).await.expect("open failed");
2400
2401        {
2402            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2403
2404            let store =
2405                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2406
2407            let root_directory = Directory::open(&store, store.root_directory_object_id())
2408                .await
2409                .expect("open failed");
2410
2411            // Write one more transaction.
2412            let mut transaction = fs
2413                .root_store()
2414                .new_transaction(
2415                    lock_keys![LockKey::object(
2416                        store.store_object_id(),
2417                        store.root_directory_object_id()
2418                    )],
2419                    Options::default(),
2420                )
2421                .await
2422                .expect("new_transaction failed");
2423            root_directory
2424                .create_child_file(&mut transaction, &format!("d"))
2425                .await
2426                .expect("create_child_file failed");
2427            transaction.commit().await.expect("commit failed");
2428        }
2429
2430        fs.close().await.expect("close failed");
2431        let device = fs.take_device().await;
2432        device.reopen(false);
2433
2434        let fs = FxFilesystem::open(device).await.expect("open failed");
2435        fsck(fs.clone()).await.expect("fsck failed");
2436        fs.close().await.expect("close failed");
2437    }
2438
2439    #[fuchsia::test]
2440    async fn test_use_existing_generation() {
2441        let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2442
2443        // First format should be generation 1.
2444        let fs = FxFilesystemBuilder::new()
2445            .format(true)
2446            .image_builder_mode(Some(SuperBlockInstance::A))
2447            .open(device)
2448            .await
2449            .expect("open failed");
2450        fs.enable_allocations();
2451        let generation0 = fs.super_block_header().generation;
2452        assert_eq!(generation0, 1);
2453        fs.close().await.expect("close failed");
2454        let device = fs.take_device().await;
2455        device.reopen(false);
2456
2457        // Format the device normally (again, generation 1).
2458        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2459        let generation1 = fs.super_block_header().generation;
2460        {
2461            let root_volume = crate::object_store::volume::root_volume(fs.clone())
2462                .await
2463                .expect("root_volume failed");
2464            root_volume
2465                .new_volume("test", crate::object_store::NewChildStoreOptions::default())
2466                .await
2467                .expect("new_volume failed");
2468        }
2469        fs.close().await.expect("close failed");
2470        let device = fs.take_device().await;
2471        device.reopen(false);
2472
2473        // Format again with image_builder_mode.
2474        let fs = FxFilesystemBuilder::new()
2475            .format(true)
2476            .image_builder_mode(Some(SuperBlockInstance::A))
2477            .open(device)
2478            .await
2479            .expect("open failed");
2480        fs.enable_allocations();
2481        let generation2 = fs.super_block_header().generation;
2482        assert!(
2483            generation2 > generation1,
2484            "generation2 ({}) should be greater than generation1 ({})",
2485            generation2,
2486            generation1
2487        );
2488        fs.close().await.expect("close failed");
2489    }
2490
2491    #[fuchsia::test]
2492    async fn test_image_builder_mode_generation_bump_512_byte_block() {
2493        let device = DeviceHolder::new(FakeDevice::new(16384, 512));
2494
2495        // Format initial filesystem (generation 1)
2496        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2497        let generation1 = fs.super_block_header().generation;
2498        fs.close().await.expect("close failed");
2499        let device = fs.take_device().await;
2500        device.reopen(false);
2501
2502        // Format again with image_builder_mode (should bump generation)
2503        let fs = FxFilesystemBuilder::new()
2504            .format(true)
2505            .image_builder_mode(Some(SuperBlockInstance::A))
2506            .open(device)
2507            .await
2508            .expect("open failed");
2509
2510        fs.enable_allocations();
2511        let generation2 = fs.super_block_header().generation;
2512        assert!(
2513            generation2 > generation1,
2514            "Expected generation bump, got {} vs {}",
2515            generation2,
2516            generation1
2517        );
2518        fs.close().await.expect("close failed");
2519    }
2520
    /// Checks, under fake time, that journal compaction stays in progress while a normal task
    /// keeps waking every 1ms, and that it finishes once that task has stopped and enough fake
    /// time has elapsed.
    ///
    /// The whole test body runs as a single future that must complete without the executor
    /// stalling (see the final assertion).
    #[fuchsia::test]
    #[cfg(target_os = "fuchsia")]
    fn test_low_priority_compaction() {
        let mut executor = TestExecutor::new_with_fake_time();
        let mut fut = std::pin::pin!(async {
            use std::sync::Arc;
            use std::sync::atomic::{AtomicBool, Ordering};

            let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
            let fs = FxFilesystemBuilder::new()
                .journal_options(super::JournalOptions {
                    reclaim_size: 65536,
                    ..Default::default()
                })
                .format(true)
                .open(device)
                .await
                .expect("open failed");

            // Kept alive (but otherwise unused) for the rest of the test.
            // NOTE(review): presumably this is what marks the test's own work as low priority
            // so that only `_normal_task` counts as normal activity — confirm.
            let _low = fasync::LowPriorityTask::new();

            // Add some data to the tree.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                for i in 0..100 {
                    let mut transaction = fs
                        .root_store()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("test{}", i))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");
                }
            }

            // Spawn a task that polls every 1ms.
            let stop = Arc::new(AtomicBool::new(false));
            let stop_clone = stop.clone();
            let _normal_task = fasync::Task::spawn(async move {
                while !stop_clone.load(Ordering::Relaxed) {
                    fasync::Timer::new(fasync::MonotonicInstant::after(
                        MonotonicDuration::from_millis(1),
                    ))
                    .await;
                }
            });

            // Trigger journal compaction.
            // We can do this by writing more data until outstanding > reclaim_size / 2.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                let mut i = 0;
                loop {
                    let mut transaction = fs
                        .root_store()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("trigger{i}"))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");

                    // Stop writing as soon as the journal reports that a compaction has started.
                    if fs.journal().inner.lock().compaction_running {
                        break;
                    }
                    // Otherwise step fake time forward by 1ms before the next write.
                    TestExecutor::advance_to(fasync::MonotonicInstant::after(
                        MonotonicDuration::from_millis(1),
                    ))
                    .await;
                    i += 1;
                }
            }

            // Compaction should now be running. Because of our 1ms poller, it should be yielding.
            // It will yield for 16ms at each point.
            for _ in 0..10 {
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                assert!(fs.journal().inner.lock().compaction_running);
            }

            // Stop the normal task.
            stop.store(true, Ordering::Relaxed);
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;

            // Compaction should still be running because it hasn't been 4 ms since the normal task
            // finished.
            assert!(fs.journal().inner.lock().compaction_running);

            // For the next 3ms, compaction should still be running.
            for _ in 0..3 {
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                assert!(fs.journal().inner.lock().compaction_running);
            }

            // 1 more ms and compaction should be unblocked.
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;

            // When the executor next stalls, compaction should be done.
            let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
            assert!(!fs.journal().inner.lock().compaction_running);

            fs.close().await.expect("Close failed");
        });
        // The test future must run to completion; a stall here would mean something never woke.
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }
2663
    /// Checks, under fake time, that journal compaction still completes while a normal task keeps
    /// waking every 3ms, and that completing takes many 20ms steps of fake time (more than 200)
    /// rather than happening straight away.
    #[fuchsia::test]
    #[cfg(target_os = "fuchsia")]
    fn test_low_priority_compaction_deadline() {
        let mut executor = TestExecutor::new_with_fake_time();
        let mut fut = std::pin::pin!(async {
            use std::sync::Arc;
            use std::sync::atomic::{AtomicBool, Ordering};

            let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
            let fs = FxFilesystemBuilder::new()
                .journal_options(super::JournalOptions {
                    reclaim_size: 65536,
                    ..Default::default()
                })
                .format(true)
                .open(device)
                .await
                .expect("open failed");

            // Kept alive (but otherwise unused) for the rest of the test.
            // NOTE(review): presumably this is what marks the test's own work as low priority
            // so that only `_normal_task` counts as normal activity — confirm.
            let _low = fasync::LowPriorityTask::new();

            // Add some data to the tree.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                for i in 0..10 {
                    let mut transaction = fs
                        .root_store()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("test{}", i))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");
                }
            }

            // Spawn a task that polls every 3ms.
            let stop = Arc::new(AtomicBool::new(false));
            let stop_clone = stop.clone();
            let _normal_task = fasync::Task::spawn(async move {
                while !stop_clone.load(Ordering::Relaxed) {
                    fasync::Timer::new(fasync::MonotonicInstant::after(
                        MonotonicDuration::from_millis(3),
                    ))
                    .await;
                }
            });

            // Trigger journal compaction.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                let mut i = 0;
                loop {
                    let mut transaction = fs
                        .root_store()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("trigger{i}"))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");

                    // Stop writing as soon as the journal reports that a compaction has started.
                    if fs.journal().inner.lock().compaction_running {
                        break;
                    }
                    // Otherwise step fake time forward by 1ms before the next write.
                    TestExecutor::advance_to(fasync::MonotonicInstant::after(
                        MonotonicDuration::from_millis(1),
                    ))
                    .await;
                    i += 1;
                }
            }

            // Advance time in 20ms increments. Each increment should allow compaction to make progress
            // on one item (since MAX_YIELD_DURATION is 16ms).
            // `count` records how many increments elapsed while compaction was still running; the
            // loop is capped at 1000 increments so a stuck compaction fails the assertion below.
            let mut count = 0;
            for _ in 0..1000 {
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(20),
                ))
                .await;
                if !fs.journal().inner.lock().compaction_running {
                    break;
                }
                count += 1;
            }
            assert!(!fs.journal().inner.lock().compaction_running);

            // Make sure it took many iterations to complete, i.e. compaction really was being
            // held back.  It's difficult to know what the exact number should be.
            assert!(count > 200);

            stop.store(true, Ordering::Relaxed);
            fs.close().await.expect("Close failed");
        });
        // The test future must run to completion; a stall here would mean something never woke.
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }
2785
2786    #[fuchsia::test]
2787    #[cfg(target_os = "fuchsia")]
2788    fn test_low_priority_compaction_no_yielding_when_full() {
2789        let mut executor = TestExecutor::new_with_fake_time();
2790        let mut fut = std::pin::pin!(async {
2791            use std::sync::Arc;
2792            use std::sync::atomic::{AtomicBool, Ordering};
2793
2794            let reclaim_size = 65536;
2795            let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
2796            let fs = FxFilesystemBuilder::new()
2797                .journal_options(super::JournalOptions { reclaim_size, ..Default::default() })
2798                .format(true)
2799                .open(device)
2800                .await
2801                .expect("open failed");
2802
2803            let _low = fasync::LowPriorityTask::new();
2804
2805            // Add some data to the tree.
2806            {
2807                let root_store = fs.root_store();
2808                let root_directory =
2809                    Directory::open(&root_store, root_store.root_directory_object_id())
2810                        .await
2811                        .expect("open failed");
2812                for i in 0..10 {
2813                    let mut transaction = fs
2814                        .root_store()
2815                        .new_transaction(
2816                            lock_keys![LockKey::object(
2817                                root_store.store_object_id(),
2818                                root_store.root_directory_object_id(),
2819                            )],
2820                            Options::default(),
2821                        )
2822                        .await
2823                        .expect("new_transaction failed");
2824                    root_directory
2825                        .create_child_file(&mut transaction, &format!("test{}", i))
2826                        .await
2827                        .expect("create_child_file failed");
2828                    transaction.commit().await.expect("commit failed");
2829                }
2830            }
2831
2832            // Spawn a task that polls every 3ms.
2833            let stop = Arc::new(AtomicBool::new(false));
2834            let stop_clone = stop.clone();
2835            let _normal_task = fasync::Task::spawn(async move {
2836                while !stop_clone.load(Ordering::Relaxed) {
2837                    fasync::Timer::new(fasync::MonotonicInstant::after(
2838                        MonotonicDuration::from_millis(3),
2839                    ))
2840                    .await;
2841                }
2842            });
2843
2844            // Trigger journal compaction, but this time we fill it up to 3/4 full.
2845            {
2846                let root_store = fs.root_store();
2847                let root_directory =
2848                    Directory::open(&root_store, root_store.root_directory_object_id())
2849                        .await
2850                        .expect("open failed");
2851                let mut i = 0;
2852                loop {
2853                    let mut transaction = fs
2854                        .root_store()
2855                        .new_transaction(
2856                            lock_keys![LockKey::object(
2857                                root_store.store_object_id(),
2858                                root_store.root_directory_object_id(),
2859                            )],
2860                            Options::default(),
2861                        )
2862                        .await
2863                        .expect("new_transaction failed");
2864                    root_directory
2865                        .create_child_file(&mut transaction, &format!("trigger{i}"))
2866                        .await
2867                        .expect("create_child_file failed");
2868                    transaction.commit().await.expect("commit failed");
2869
2870                    let outstanding = {
2871                        let inner = fs.journal().inner.lock();
2872                        fs.journal().objects.last_end_offset()
2873                            - inner.super_block_header.journal_checkpoint.file_offset
2874                    };
2875                    if outstanding >= reclaim_size * 7 / 8 {
2876                        break;
2877                    }
2878                    // We don't advance time here to try and reach 3/4 before compaction can yield.
2879                    i += 1;
2880                }
2881            }
2882
2883            // Advancing by 4ms should be enough to wake compaction up.
2884            TestExecutor::advance_to(fasync::MonotonicInstant::after(
2885                MonotonicDuration::from_millis(4),
2886            ))
2887            .await;
2888
2889            // When the executor next stalls, compaction should be done.
2890            let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
2891            assert!(!fs.journal().inner.lock().compaction_running);
2892
2893            stop.store(true, Ordering::Relaxed);
2894            fs.close().await.expect("Close failed");
2895        });
2896        assert!(executor.run_until_stalled(&mut fut).is_ready());
2897    }
2898}
2899
// Fuzz targets for the journal. Both targets follow the same shape: create an empty filesystem
// on a fake device, push fuzzer-controlled data into the journal writer, close, reopen the
// device and attempt to open the filesystem again.
#[cfg(fuzz)]
mod fuzz {
    use fuzz::fuzz;

    // Feeds raw fuzzer bytes straight into the journal writer.
    #[fuzz]
    fn fuzz_journal_bytes(input: Vec<u8>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use std::io::Write;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        let scenario = async move {
            let fresh_device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(fresh_device).await.expect("new_empty failed");

            // Scope the lock guard so it is released before the next await point.
            {
                let mut inner = fs.journal().inner.lock();
                inner.writer.write_all(&input).expect("write failed");
            }
            fs.close().await.expect("close failed");

            let reopened = fs.take_device().await;
            reopened.reopen(false);

            // Failing to open the filesystem is an acceptable outcome; there is nothing
            // further to do in that case.
            let Ok(fs) = FxFilesystem::open(reopened).await else {
                return;
            };
            // `close()` can fail if there were objects to be tombstoned. If the said object is
            // corrupted, there will be an error when we compact the journal.
            let _ = fs.close().await;
        };

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(scenario);
    }

    // Feeds structured, fuzzer-generated journal records into the journal writer.
    #[fuzz]
    fn fuzz_journal(input: Vec<super::JournalRecord>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        let scenario = async move {
            let fresh_device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(fresh_device).await.expect("new_empty failed");

            // Write every record while holding the journal lock; the result of each
            // `write_record` call is intentionally discarded.
            {
                let mut inner = fs.journal().inner.lock();
                input.iter().for_each(|record| {
                    let _ = inner.writer.write_record(record);
                });
            }
            fs.close().await.expect("close failed");

            let reopened = fs.take_device().await;
            reopened.reopen(false);

            // Failing to open the filesystem is an acceptable outcome; there is nothing
            // further to do in that case.
            let Ok(fs) = FxFilesystem::open(reopened).await else {
                return;
            };
            // `close()` can fail if there were objects to be tombstoned. If the said object is
            // corrupted, there will be an error when we compact the journal.
            let _ = fs.close().await;
        };

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(scenario);
    }
}