Skip to main content

fxfs/object_store/
journal.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
//! The journal is implemented as an ever-extending file which contains variable-length records
//! that describe mutations to be applied to various objects.  The journal file consists of
//! blocks, with a checksum at the end of each block, but otherwise it can be considered a
//! continuous stream.
//!
//! The checksum is seeded with the checksum from the previous block.  To free space in the
//! journal, records are replaced with sparse extents when it is known they are no longer
//! needed to mount.
//!
//! At mount time, the journal is replayed: the mutations are applied into memory.
//! Eventually, a checksum failure will indicate no more records exist to be replayed,
//! at which point the mount can continue and the journal will be extended from that point with
//! further mutations as required.
18
19mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::errors::FxfsError;
27use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
28use crate::log::*;
29use crate::lsm_tree::cache::NullCache;
30use crate::lsm_tree::types::Layer;
31use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
32use crate::object_store::allocator::Allocator;
33use crate::object_store::data_object_handle::OverwriteOptions;
34use crate::object_store::extent_record::{ExtentMode, ExtentValue};
35use crate::object_store::graveyard::Graveyard;
36use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
37use crate::object_store::journal::checksum_list::ChecksumList;
38use crate::object_store::journal::reader::{JournalReader, ReadResult};
39use crate::object_store::journal::super_block::{
40    SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
41};
42use crate::object_store::journal::writer::JournalWriter;
43use crate::object_store::object_manager::ObjectManager;
44use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
45use crate::object_store::transaction::{
46    AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
47    MutationV47, MutationV49, MutationV50, MutationV54, MutationV55, ObjectStoreMutation, Options,
48    TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
49};
50use crate::object_store::{
51    AssocObj, AttributeId, DataObjectHandle, Extent, HandleOptions, HandleOwner, INVALID_OBJECT_ID,
52    Item, ItemRef, NewChildStoreOptions, ObjectStore, ReservedId,
53};
54use crate::range::RangeExt;
55use crate::round::{round_div, round_down};
56use crate::serialized_types::{
57    LATEST_VERSION, Migrate, Version, Versioned, migrate_nodefault, migrate_to_version,
58};
59use anyhow::{Context, Error, anyhow, bail, ensure};
60use event_listener::Event;
61use fprint::TypeFingerprint;
62use fuchsia_inspect::NumericProperty;
63use fuchsia_sync::Mutex;
64use futures::FutureExt as _;
65use futures::future::poll_fn;
66use rustc_hash::FxHashMap as HashMap;
67use serde::{Deserialize, Serialize};
68use static_assertions::const_assert;
69use std::clone::Clone;
70use std::collections::HashSet;
71use std::num::NonZero;
72use std::ops::{Bound, Range};
73use std::sync::atomic::{AtomicBool, Ordering};
74use std::sync::{Arc, OnceLock};
75use std::task::{Poll, Waker};
76use storage_device::Device;
77
78// The journal file is written to in blocks of this size.
79pub const BLOCK_SIZE: u64 = 4096;
80
81// The journal file is extended by this amount when necessary.
82const CHUNK_SIZE: u64 = 131_072;
83const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);
84
85// See the comment for the `reclaim_size` member of Inner.
86pub const DEFAULT_RECLAIM_SIZE: u64 = 524_288;
87
88// Temporary space that should be reserved for the journal.  For example: space that is currently
89// used in the journal file but cannot be deallocated yet because we are flushing.
90pub const RESERVED_SPACE: u64 = 1_048_576;
91
92// Whenever the journal is replayed (i.e. the system is unmounted and remounted), we reset the
93// journal stream, at which point any half-complete transactions are discarded.  We indicate a
94// journal reset by XORing the previous block's checksum with this mask, and using that value as a
95// seed for the next journal block.
96const RESET_XOR: u64 = 0xffffffffffffffff;
97
98// To keep track of offsets within a journal file, we need both the file offset and the check-sum of
99// the preceding block, since the check-sum of the preceding block is an input to the check-sum of
100// every block.
101pub type JournalCheckpoint = JournalCheckpointV32;
102
103#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
104pub struct JournalCheckpointV32 {
105    pub file_offset: u64,
106
107    // Starting check-sum for block that contains file_offset i.e. the checksum for the previous
108    // block.
109    pub checksum: Checksum,
110
111    // If versioned, the version of elements stored in the journal. e.g. JournalRecord version.
112    // This can change across reset events so we store it along with the offset and checksum to
113    // know which version to deserialize.
114    pub version: Version,
115}
116
117pub type JournalRecord = JournalRecordV55;
118
119#[allow(clippy::large_enum_variant)]
120#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
121#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
122pub enum JournalRecordV55 {
123    EndBlock,
124    Mutation {
125        object_id: u64,
126        mutation: MutationV55,
127    },
128    /// Commits records in the transaction.
129    Commit,
130    /// Discard all mutations with offsets greater than or equal to the given offset.
131    Discard(u64),
132    /// Indicates the device was flushed at the given journal offset.
133    /// Note that this really means that at this point in the journal offset, we can be certain that
134    /// there's no remaining buffered data in the block device; the buffers and the disk contents are
135    /// consistent.
136    /// We insert one of these records *after* a flush along with the *next* transaction to go
137    /// through.  If that never comes (either due to graceful or hard shutdown), the journal reset
138    /// on the next mount will serve the same purpose and count as a flush, although it is necessary
139    /// to defensively flush the device before replaying the journal (if possible, i.e. not
140    /// read-only) in case the block device connection was reused.
141    DidFlushDevice(u64),
142    /// Checksums for a data range written by this transaction. A transaction is only valid if these
143    /// checksums are right. The range is the device offset the checksums are for.
144    ///
145    /// A boolean indicates whether this range is being written to for the first time. For overwrite
146    /// extents, we only check the checksums for a block if it has been written to for the first
147    /// time since the last flush, because otherwise we can't roll it back anyway so it doesn't
148    /// matter. For copy-on-write extents, the bool is always true.
149    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
150}
151
152#[allow(clippy::large_enum_variant)]
153#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
154#[migrate_to_version(JournalRecordV55)]
155#[migrate_nodefault]
156pub enum JournalRecordV54 {
157    EndBlock,
158    Mutation { object_id: u64, mutation: MutationV54 },
159    Commit,
160    Discard(u64),
161    DidFlushDevice(u64),
162    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
163}
164
165#[allow(clippy::large_enum_variant)]
166#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
167#[migrate_to_version(JournalRecordV54)]
168#[migrate_nodefault]
169pub enum JournalRecordV50 {
170    EndBlock,
171    Mutation { object_id: u64, mutation: MutationV50 },
172    Commit,
173    Discard(u64),
174    DidFlushDevice(u64),
175    DataChecksums(Range<u64>, ChecksumsV38, bool),
176}
177
178#[allow(clippy::large_enum_variant)]
179#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
180#[migrate_to_version(JournalRecordV50)]
181pub enum JournalRecordV49 {
182    EndBlock,
183    Mutation { object_id: u64, mutation: MutationV49 },
184    Commit,
185    Discard(u64),
186    DidFlushDevice(u64),
187    DataChecksums(Range<u64>, ChecksumsV38, bool),
188}
189
190#[allow(clippy::large_enum_variant)]
191#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
192#[migrate_to_version(JournalRecordV49)]
193pub enum JournalRecordV47 {
194    EndBlock,
195    Mutation { object_id: u64, mutation: MutationV47 },
196    Commit,
197    Discard(u64),
198    DidFlushDevice(u64),
199    DataChecksums(Range<u64>, ChecksumsV38, bool),
200}
201
202#[allow(clippy::large_enum_variant)]
203#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
204#[migrate_to_version(JournalRecordV47)]
205pub enum JournalRecordV46 {
206    EndBlock,
207    Mutation { object_id: u64, mutation: MutationV46 },
208    Commit,
209    Discard(u64),
210    DidFlushDevice(u64),
211    DataChecksums(Range<u64>, ChecksumsV38, bool),
212}
213
214#[allow(clippy::large_enum_variant)]
215#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
216#[migrate_to_version(JournalRecordV46)]
217pub enum JournalRecordV43 {
218    EndBlock,
219    Mutation { object_id: u64, mutation: MutationV43 },
220    Commit,
221    Discard(u64),
222    DidFlushDevice(u64),
223    DataChecksums(Range<u64>, ChecksumsV38, bool),
224}
225
226#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
227#[migrate_to_version(JournalRecordV43)]
228pub enum JournalRecordV42 {
229    EndBlock,
230    Mutation { object_id: u64, mutation: MutationV41 },
231    Commit,
232    Discard(u64),
233    DidFlushDevice(u64),
234    DataChecksums(Range<u64>, ChecksumsV38, bool),
235}
236
237#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
238pub enum JournalRecordV41 {
239    EndBlock,
240    Mutation { object_id: u64, mutation: MutationV41 },
241    Commit,
242    Discard(u64),
243    DidFlushDevice(u64),
244    DataChecksums(Range<u64>, ChecksumsV38),
245}
246
247impl From<JournalRecordV41> for JournalRecordV42 {
248    fn from(record: JournalRecordV41) -> Self {
249        match record {
250            JournalRecordV41::EndBlock => Self::EndBlock,
251            JournalRecordV41::Mutation { object_id, mutation } => {
252                Self::Mutation { object_id, mutation: mutation.into() }
253            }
254            JournalRecordV41::Commit => Self::Commit,
255            JournalRecordV41::Discard(offset) => Self::Discard(offset),
256            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
257            JournalRecordV41::DataChecksums(range, sums) => {
258                // At the time of writing the only extents written by real systems are CoW extents
259                // so the new bool is always true.
260                Self::DataChecksums(range, sums, true)
261            }
262        }
263    }
264}
265
266#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
267#[migrate_to_version(JournalRecordV41)]
268pub enum JournalRecordV40 {
269    EndBlock,
270    Mutation { object_id: u64, mutation: MutationV40 },
271    Commit,
272    Discard(u64),
273    DidFlushDevice(u64),
274    DataChecksums(Range<u64>, ChecksumsV38),
275}
276
277pub(super) fn journal_handle_options() -> HandleOptions {
278    HandleOptions { skip_journal_checks: true, ..Default::default() }
279}
280
281/// The journal records a stream of mutations that are to be applied to other objects.  At mount
282/// time, these records can be replayed into memory.  It provides a way to quickly persist changes
283/// without having to make a large number of writes; they can be deferred to a later time (e.g.
284/// when a sufficient number have been queued).  It also provides support for transactions, the
285/// ability to have mutations that are to be applied atomically together.
286pub struct Journal {
287    objects: Arc<ObjectManager>,
288    handle: OnceLock<DataObjectHandle<ObjectStore>>,
289    super_block_manager: SuperBlockManager,
290    inner: Mutex<Inner>,
291    writer_mutex: Mutex<()>,
292    sync_mutex: futures::lock::Mutex<()>,
293    trace: AtomicBool,
294
295    // This event is used when we are waiting for a compaction to free up journal space.
296    reclaim_event: Event,
297}
298
299struct Inner {
300    super_block_header: SuperBlockHeader,
301
302    // The offset that we can zero the journal up to now that it is no longer needed.
303    zero_offset: Option<u64>,
304
305    // The journal offset that we most recently flushed to the device.
306    device_flushed_offset: u64,
307
308    // If true, indicates a DidFlushDevice record is pending.
309    needs_did_flush_device: bool,
310
311    // The writer for the journal.
312    writer: JournalWriter,
313
314    // Set when a reset is encountered during a read.
315    // Used at write pre_commit() time to ensure we write a version first thing after a reset.
316    output_reset_version: bool,
317
318    // Waker for the flush task.
319    flush_waker: Option<Waker>,
320
321    // Indicates the journal has been terminated.
322    terminate: bool,
323
324    // Latched error indicating reason for journal termination if not graceful.
325    terminate_reason: Option<Error>,
326
327    // Disable compactions.
328    disable_compactions: bool,
329
330    // True if compactions are running.
331    compaction_running: bool,
332
333    // Waker for the sync task for when it's waiting for the flush task to finish.
334    sync_waker: Option<Waker>,
335
336    // The last offset we flushed to the journal file.
337    flushed_offset: u64,
338
339    // The last offset that should be considered valid in the journal file.  Most of the time, this
340    // will be the same as `flushed_offset` but at mount time, this could be less and will only be
341    // up to the end of the last valid transaction; it won't include transactions that follow that
342    // have been discarded.
343    valid_to: u64,
344
345    // If, after replaying, we have to discard a number of mutations (because they don't validate),
346    // this offset specifies where we need to discard back to.  This is so that when we next replay,
347    // we ignore those mutations and continue with new good mutations.
348    discard_offset: Option<u64>,
349
350    // In the steady state, the journal should fluctuate between being approximately half of this
351    // number and this number.  New super-blocks will be written every time about half of this
352    // amount is written to the journal.
353    reclaim_size: u64,
354
355    image_builder_mode: Option<SuperBlockInstance>,
356
357    // If true and `needs_barrier`, issue a pre-barrier on the first device write of each journal
358    // write (which happens in multiples of `BLOCK_SIZE`). This ensures that all the corresponding
359    // data writes make it to disk before the journal gets written to.
360    barriers_enabled: bool,
361
362    // If true, indicates that data write requests have been made to the device since the last
363    // journal write.
364    needs_barrier: bool,
365
366    // True if a compaction is being forced for reasons other than the journal being full.
367    forced_compaction: bool,
368}
369
370impl Inner {
371    fn terminate(&mut self, reason: Option<Error>) {
372        self.terminate = true;
373
374        if let Some(err) = reason {
375            error!(error:? = err; "Terminating journal");
376            // Log previous error if one was already set, otherwise latch the error.
377            if let Some(prev_err) = self.terminate_reason.as_ref() {
378                error!(error:? = prev_err; "Journal previously terminated");
379            } else {
380                self.terminate_reason = Some(err);
381            }
382        }
383
384        if let Some(waker) = self.flush_waker.take() {
385            waker.wake();
386        }
387        if let Some(waker) = self.sync_waker.take() {
388            waker.wake();
389        }
390    }
391}
392
/// Tunables supplied when constructing a [`Journal`].
pub struct JournalOptions {
    /// In the steady state, the journal should fluctuate between being approximately half of this
    /// number and this number.  New super-blocks will be written every time about half of this
    /// amount is written to the journal.
    pub reclaim_size: u64,

    /// If true, issue a pre-barrier on the first device write of each journal write (which happens
    /// in multiples of `BLOCK_SIZE`). This ensures that all the corresponding data writes make it
    /// to disk before the journal gets written to.
    pub barriers_enabled: bool,
}
404
405impl Default for JournalOptions {
406    fn default() -> Self {
407        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
408    }
409}
410
411struct JournaledTransactions {
412    transactions: Vec<JournaledTransaction>,
413    device_flushed_offset: u64,
414}
415
416#[derive(Debug, Default)]
417pub struct JournaledTransaction {
418    pub checkpoint: JournalCheckpoint,
419    pub root_parent_mutations: Vec<Mutation>,
420    pub root_mutations: Vec<Mutation>,
421    /// List of (store_object_id, mutation).
422    pub non_root_mutations: Vec<(u64, Mutation)>,
423    pub end_offset: u64,
424    pub checksums: Vec<JournaledChecksums>,
425
426    /// Records offset + 1 of the matching begin_flush transaction. The +1 is because we want to
427    /// ignore the begin flush transaction; we don't need or want to replay it.
428    pub end_flush: Option<(/* store_id: */ u64, /* begin offset: */ u64)>,
429
430    /// The volume which was deleted in this transaction, if any.
431    pub volume_deleted: Option</* store_id: */ u64>,
432}
433
434impl JournaledTransaction {
435    fn new(checkpoint: JournalCheckpoint) -> Self {
436        Self { checkpoint, ..Default::default() }
437    }
438}
439
// Sentinel value (`u64::MAX`).
// NOTE(review): presumably stored in place of a journal offset for a store whose volume has
// been deleted -- confirm against the replay code that uses it.
const VOLUME_DELETED: u64 = u64::MAX;
441
442#[derive(Debug)]
443pub struct JournaledChecksums {
444    pub device_range: Range<u64>,
445    pub checksums: Checksums,
446    pub first_write: bool,
447}
448
449/// Handles for journal-like objects have some additional functionality to manage their extents,
450/// since during replay we need to add extents as we find them.
451pub trait JournalHandle: ReadObjectHandle {
452    /// The end offset of the last extent in the JournalHandle.  Used only for validating extents
453    /// (which will be skipped if None is returned).
454    /// Note this is equivalent in value to ReadObjectHandle::get_size, when present.
455    fn end_offset(&self) -> Option<u64>;
456    /// Adds an extent to the current end of the journal stream.
457    /// `added_offset` is the offset into the journal of the transaction which added this extent,
458    /// used for discard_extents.
459    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
460    /// Discards all extents which were added in a transaction at offset >= |discard_offset|.
461    fn discard_extents(&mut self, discard_offset: u64);
462}
463
464// Provide a stub implementation for DataObjectHandle so we can use it in
465// Journal::read_transactions.  Manual extent management is a NOP (which is OK since presumably the
466// DataObjectHandle already knows where its extents live).
467impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
468    fn end_offset(&self) -> Option<u64> {
469        None
470    }
471    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
472        // NOP
473    }
474    fn discard_extents(&mut self, _discard_offset: u64) {
475        // NOP
476    }
477}
478
479#[fxfs_trace::trace]
480impl Journal {
481    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
482        let starting_checksum = rand::random_range(1..u64::MAX);
483        Journal {
484            objects: objects,
485            handle: OnceLock::new(),
486            super_block_manager: SuperBlockManager::new(),
487            inner: Mutex::new(Inner {
488                super_block_header: SuperBlockHeader::default(),
489                zero_offset: None,
490                device_flushed_offset: 0,
491                needs_did_flush_device: false,
492                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
493                output_reset_version: false,
494                flush_waker: None,
495                terminate: false,
496                terminate_reason: None,
497                disable_compactions: false,
498                compaction_running: false,
499                sync_waker: None,
500                flushed_offset: 0,
501                valid_to: 0,
502                discard_offset: None,
503                reclaim_size: options.reclaim_size,
504                image_builder_mode: None,
505                barriers_enabled: options.barriers_enabled,
506                needs_barrier: false,
507                forced_compaction: false,
508            }),
509            writer_mutex: Mutex::new(()),
510            sync_mutex: futures::lock::Mutex::new(()),
511            trace: AtomicBool::new(false),
512            reclaim_event: Event::new(),
513        }
514    }
515
516    pub fn set_trace(&self, trace: bool) {
517        let old_value = self.trace.swap(trace, Ordering::Relaxed);
518        if trace != old_value {
519            info!(trace; "J: trace");
520        }
521    }
522
523    pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
524        self.inner.lock().image_builder_mode = mode;
525        if let Some(instance) = mode {
526            *self.super_block_manager.next_instance.lock() = instance;
527        }
528    }
529
530    pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
531        self.inner.lock().image_builder_mode
532    }
533
534    #[cfg(feature = "migration")]
535    pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
536        ensure!(
537            self.inner.lock().image_builder_mode.is_some(),
538            "Can only set filesystem uuid in image builder mode."
539        );
540        self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
541        Ok(())
542    }
543
544    pub(crate) async fn read_superblocks(
545        &self,
546        device: Arc<dyn Device>,
547        block_size: u64,
548    ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
549        self.super_block_manager.load(device, block_size).await
550    }
551
552    /// Used during replay to validate a mutation.  This should return false if the mutation is not
553    /// valid and should not be applied.  This could be for benign reasons: e.g. the device flushed
554    /// data out-of-order, or because of a malicious actor.
555    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
556        match mutation {
557            Mutation::ObjectStore(ObjectStoreMutation {
558                item:
559                    Item {
560                        key:
561                            ObjectKey {
562                                data: ObjectKeyData::Attribute(_, AttributeKey::Extent(extent)),
563                                ..
564                            },
565                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
566                        ..
567                    },
568                ..
569            }) => {
570                if extent.is_empty() || !extent.is_aligned(block_size) {
571                    return false;
572                }
573                let len = extent.length().unwrap();
574                if let ExtentMode::Cow(checksums) = mode {
575                    if checksums.len() > 0 {
576                        if len % checksums.len() as u64 != 0 {
577                            return false;
578                        }
579                        if (len / checksums.len() as u64) % block_size != 0 {
580                            return false;
581                        }
582                    }
583                }
584                if *device_offset % block_size != 0
585                    || *device_offset >= device_size
586                    || device_size - *device_offset < len
587                {
588                    return false;
589                }
590            }
591            Mutation::ObjectStore(_) => {}
592            Mutation::EncryptedObjectStore(_) => {}
593            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
594                return !device_range.is_empty()
595                    && *owner_object_id != INVALID_OBJECT_ID
596                    && device_range.end <= device_size;
597            }
598            Mutation::Allocator(AllocatorMutation::Deallocate {
599                device_range,
600                owner_object_id,
601            }) => {
602                return !device_range.is_empty()
603                    && *owner_object_id != INVALID_OBJECT_ID
604                    && device_range.end <= device_size;
605            }
606            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
607                return *owner_object_id != INVALID_OBJECT_ID;
608            }
609            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
610                return *owner_object_id != INVALID_OBJECT_ID;
611            }
612            Mutation::BeginFlush => {}
613            Mutation::EndFlush => {}
614            Mutation::DeleteVolume => {}
615            Mutation::UpdateBorrowed(_) => {}
616            Mutation::UpdateMutationsKey(_) => {}
617            Mutation::CreateInternalDir(owner_object_id) => {
618                return *owner_object_id != INVALID_OBJECT_ID;
619            }
620        }
621        true
622    }
623
624    // Assumes that `mutation` has been validated.
625    fn update_checksum_list(
626        &self,
627        journal_offset: u64,
628        mutation: &Mutation,
629        checksum_list: &mut ChecksumList,
630    ) -> Result<(), Error> {
631        match mutation {
632            Mutation::ObjectStore(_) => {}
633            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
634                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
635            }
636            _ => {}
637        }
638        Ok(())
639    }
640
641    /// Reads the latest super-block, and then replays journaled records.
642    #[trace]
643    pub async fn replay(
644        &self,
645        filesystem: Arc<FxFilesystem>,
646        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
647    ) -> Result<(), Error> {
648        let block_size = filesystem.block_size();
649
650        let (super_block, root_parent) =
651            self.super_block_manager.load(filesystem.device(), block_size).await?;
652
653        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));
654
655        self.objects.set_root_parent_store(root_parent.clone());
656        let allocator =
657            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
658        if let Some(on_new_allocator) = on_new_allocator {
659            on_new_allocator(allocator.clone());
660        }
661        self.objects.set_allocator(allocator.clone());
662        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
663        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
664        {
665            let mut inner = self.inner.lock();
666            inner.super_block_header = super_block.clone();
667        }
668
669        let device = filesystem.device();
670
671        let mut handle;
672        {
673            let root_parent_layer = root_parent.tree().mutable_layer();
674            let mut iter = root_parent_layer
675                .seek(Bound::Included(&ObjectKey::attribute(
676                    super_block.journal_object_id,
677                    AttributeId::DATA,
678                    AttributeKey::Extent(Extent::search_key_from_offset(round_down(
679                        super_block.journal_checkpoint.file_offset,
680                        BLOCK_SIZE,
681                    ))),
682                )))
683                .await
684                .context("Failed to seek root parent store")?;
685            let start_offset = if let Some(ItemRef {
686                key:
687                    ObjectKey {
688                        data:
689                            ObjectKeyData::Attribute(AttributeId::DATA, AttributeKey::Extent(extent)),
690                        ..
691                    },
692                ..
693            }) = iter.get()
694            {
695                extent.start
696            } else {
697                0
698            };
699            handle = BootstrapObjectHandle::new_with_start_offset(
700                super_block.journal_object_id,
701                device.clone(),
702                start_offset,
703            );
704            while let Some(item) = iter.get() {
705                if !match item.into() {
706                    Some((
707                        object_id,
708                        AttributeId::DATA,
709                        extent,
710                        ExtentValue::Some { device_offset, .. },
711                    )) if object_id == super_block.journal_object_id => {
712                        if let Some(end_offset) = handle.end_offset() {
713                            if extent.start != end_offset {
714                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
715                                    "Unexpected journal extent {:?}, expected start: {}",
716                                    item, end_offset
717                                )));
718                            }
719                        }
720                        handle.push_extent(
721                            0, // We never discard extents from the root parent store.
722                            *device_offset
723                                ..*device_offset + extent.length().context("Invalid extent")?,
724                        );
725                        true
726                    }
727                    _ => false,
728                } {
729                    break;
730                }
731                iter.advance().await.context("Failed to advance root parent store iterator")?;
732            }
733        }
734
735        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
736        let JournaledTransactions { mut transactions, device_flushed_offset } = self
737            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
738            .await
739            .context("Reading transactions for replay")?;
740
741        // Validate all the mutations.
742        let mut checksum_list = ChecksumList::new(device_flushed_offset);
743        let mut valid_to = reader.journal_file_checkpoint().file_offset;
744        let device_size = device.size();
745        'bad_replay: for JournaledTransaction {
746            checkpoint,
747            root_parent_mutations,
748            root_mutations,
749            non_root_mutations,
750            checksums,
751            ..
752        } in &transactions
753        {
754            for JournaledChecksums { device_range, checksums, first_write } in checksums {
755                checksum_list
756                    .push(
757                        checkpoint.file_offset,
758                        device_range.clone(),
759                        checksums.maybe_as_ref().context("Malformed checksums")?,
760                        *first_write,
761                    )
762                    .context("Pushing journal checksum records to checksum list")?;
763            }
764            for mutation in root_parent_mutations
765                .iter()
766                .chain(root_mutations)
767                .chain(non_root_mutations.iter().map(|(_, m)| m))
768            {
769                if !self.validate_mutation(mutation, block_size, device_size) {
770                    info!(mutation:?; "Stopping replay at bad mutation");
771                    valid_to = checkpoint.file_offset;
772                    break 'bad_replay;
773                }
774                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
775            }
776        }
777
778        // Validate the checksums. Note if barriers are enabled, there will be no checksums in
779        // practice to verify.
780        let valid_to = checksum_list
781            .verify(device.as_ref(), valid_to)
782            .await
783            .context("Failed to validate checksums")?;
784
785        // Apply the mutations...
786
787        let mut last_checkpoint = reader.journal_file_checkpoint();
788        let mut journal_offsets = super_block.journal_file_offsets.clone();
789
790        // Start with the root-parent mutations, and also determine the journal offsets for all
791        // other objects.
792        for (
793            index,
794            JournaledTransaction {
795                checkpoint,
796                root_parent_mutations,
797                end_flush,
798                volume_deleted,
799                ..
800            },
801        ) in transactions.iter_mut().enumerate()
802        {
803            if checkpoint.file_offset >= valid_to {
804                last_checkpoint = checkpoint.clone();
805
806                // Truncate the transactions so we don't need to worry about them on the next pass.
807                transactions.truncate(index);
808                break;
809            }
810
811            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
812            for mutation in root_parent_mutations.drain(..) {
813                self.objects
814                    .apply_mutation(
815                        super_block.root_parent_store_object_id,
816                        mutation,
817                        &context,
818                        AssocObj::None,
819                    )
820                    .context("Failed to replay root parent store mutations")?;
821            }
822
823            if let Some((object_id, journal_offset)) = end_flush {
824                journal_offsets.insert(*object_id, *journal_offset);
825            }
826
827            if let Some(object_id) = volume_deleted {
828                journal_offsets.insert(*object_id, VOLUME_DELETED);
829            }
830        }
831
832        // Now we can open the root store.
833        let root_store = ObjectStore::open(
834            &root_parent,
835            super_block.root_store_object_id,
836            Box::new(NullCache {}),
837        )
838        .await
839        .context("Unable to open root store")?;
840
841        ensure!(
842            !root_store.is_encrypted(),
843            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
844        );
845        self.objects.set_root_store(root_store);
846
847        let root_store_offset =
848            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);
849
850        // Now replay the root store mutations.
851        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
852            if checkpoint.file_offset < root_store_offset {
853                continue;
854            }
855
856            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
857            for mutation in root_mutations.drain(..) {
858                self.objects
859                    .apply_mutation(
860                        super_block.root_store_object_id,
861                        mutation,
862                        &context,
863                        AssocObj::None,
864                    )
865                    .context("Failed to replay root store mutations")?;
866            }
867        }
868
869        // Now we can open the allocator.
870        allocator.open().await.context("Failed to open allocator")?;
871
872        // Now replay all other mutations.
873        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
874        {
875            self.objects
876                .replay_mutations(
877                    non_root_mutations,
878                    &journal_offsets,
879                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
880                    end_offset,
881                )
882                .await
883                .context("Failed to replay mutations")?;
884        }
885
886        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;
887
888        let discarded_to =
889            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
890                Some(reader.journal_file_checkpoint().file_offset)
891            } else {
892                None
893            };
894
895        // Configure the journal writer so that we can continue.
896        {
897            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
898                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
899                    "journal replay cut short; journal finishes at {}, but super-block was \
900                     written at {}",
901                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
902                )));
903            }
904            let handle = ObjectStore::open_object(
905                &root_parent,
906                super_block.journal_object_id,
907                journal_handle_options(),
908                None,
909            )
910            .await
911            .with_context(|| {
912                format!(
913                    "Failed to open journal file (object id: {})",
914                    super_block.journal_object_id
915                )
916            })?;
917            let _ = self.handle.set(handle);
918            let mut inner = self.inner.lock();
919            reader.skip_to_end_of_block();
920            let mut writer_checkpoint = reader.journal_file_checkpoint();
921
922            // Make sure we don't accidentally use the reader from now onwards.
923            std::mem::drop(reader);
924
925            // Reset the stream to indicate that we've remounted the journal.
926            writer_checkpoint.checksum ^= RESET_XOR;
927            writer_checkpoint.version = LATEST_VERSION;
928            inner.flushed_offset = writer_checkpoint.file_offset;
929
930            // When we open the filesystem as writable, we flush the device.
931            inner.device_flushed_offset = inner.flushed_offset;
932
933            inner.writer.seek(writer_checkpoint);
934            inner.output_reset_version = true;
935            inner.valid_to = last_checkpoint.file_offset;
936            if last_checkpoint.file_offset < inner.flushed_offset {
937                inner.discard_offset = Some(last_checkpoint.file_offset);
938            }
939        }
940
941        self.objects
942            .on_replay_complete()
943            .await
944            .context("Failed to complete replay for object manager")?;
945
946        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
947        Ok(())
948    }
949
950    async fn read_transactions(
951        &self,
952        reader: &mut JournalReader,
953        end_offset: Option<u64>,
954        object_id_filter: u64,
955    ) -> Result<JournaledTransactions, Error> {
956        let mut transactions = Vec::new();
957        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
958            let super_block = &self.inner.lock().super_block_header;
959            (
960                super_block.super_block_journal_file_offset,
961                super_block.root_parent_store_object_id,
962                super_block.root_store_object_id,
963            )
964        };
965        let mut current_transaction = None;
966        let mut begin_flush_offsets = HashMap::default();
967        let mut stores_deleted = HashSet::new();
968        loop {
969            // Cache the checkpoint before we deserialize a record.
970            let checkpoint = reader.journal_file_checkpoint();
971            if let Some(end_offset) = end_offset {
972                if checkpoint.file_offset >= end_offset {
973                    break;
974                }
975            }
976            let result =
977                reader.deserialize().await.context("Failed to deserialize journal record")?;
978            match result {
979                ReadResult::Reset(_) => {
980                    if current_transaction.is_some() {
981                        current_transaction = None;
982                        transactions.pop();
983                    }
984                    let offset = reader.journal_file_checkpoint().file_offset;
985                    if offset > device_flushed_offset {
986                        device_flushed_offset = offset;
987                    }
988                }
989                ReadResult::Some(record) => {
990                    match record {
991                        JournalRecord::EndBlock => {
992                            reader.skip_to_end_of_block();
993                        }
994                        JournalRecord::Mutation { object_id, mutation } => {
995                            let current_transaction = match current_transaction.as_mut() {
996                                None => {
997                                    transactions.push(JournaledTransaction::new(checkpoint));
998                                    current_transaction = transactions.last_mut();
999                                    current_transaction.as_mut().unwrap()
1000                                }
1001                                Some(transaction) => transaction,
1002                            };
1003
1004                            if stores_deleted.contains(&object_id) {
1005                                bail!(
1006                                    anyhow!(FxfsError::Inconsistent)
1007                                        .context("Encountered mutations for deleted store")
1008                                );
1009                            }
1010
1011                            match &mutation {
1012                                Mutation::BeginFlush => {
1013                                    begin_flush_offsets.insert(
1014                                        object_id,
1015                                        current_transaction.checkpoint.file_offset,
1016                                    );
1017                                }
1018                                Mutation::EndFlush => {
1019                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
1020                                        if let Some(deleted_volume) =
1021                                            &current_transaction.volume_deleted
1022                                        {
1023                                            if *deleted_volume == object_id {
1024                                                bail!(anyhow!(FxfsError::Inconsistent).context(
1025                                                    "Multiple EndFlush/DeleteVolume mutations in a \
1026                                                    single transaction for the same object"
1027                                                ));
1028                                            }
1029                                        }
1030                                        // The +1 is because we don't want to replay the transaction
1031                                        // containing the begin flush; we don't need or want to
1032                                        // replay it.
1033                                        if current_transaction
1034                                            .end_flush
1035                                            .replace((object_id, offset + 1))
1036                                            .is_some()
1037                                        {
1038                                            bail!(anyhow!(FxfsError::Inconsistent).context(
1039                                                "Multiple EndFlush mutations in a \
1040                                                 single transaction"
1041                                            ));
1042                                        }
1043                                    }
1044                                }
1045                                Mutation::DeleteVolume => {
1046                                    if let Some((flushed_object, _)) =
1047                                        &current_transaction.end_flush
1048                                    {
1049                                        if *flushed_object == object_id {
1050                                            bail!(anyhow!(FxfsError::Inconsistent).context(
1051                                                "Multiple EndFlush/DeleteVolume mutations in a \
1052                                                    single transaction for the same object"
1053                                            ));
1054                                        }
1055                                    }
1056                                    if current_transaction
1057                                        .volume_deleted
1058                                        .replace(object_id)
1059                                        .is_some()
1060                                    {
1061                                        bail!(anyhow!(FxfsError::Inconsistent).context(
1062                                            "Multiple DeleteVolume mutations in a single \
1063                                             transaction"
1064                                        ));
1065                                    }
1066                                    stores_deleted.insert(object_id);
1067                                }
1068                                _ => {}
1069                            }
1070
1071                            // If this mutation doesn't need to be applied, don't bother adding it
1072                            // to the transaction.
1073                            if (object_id_filter == INVALID_OBJECT_ID
1074                                || object_id_filter == object_id)
1075                                && self.should_apply(object_id, &current_transaction.checkpoint)
1076                            {
1077                                if object_id == root_parent_store_object_id {
1078                                    current_transaction.root_parent_mutations.push(mutation);
1079                                } else if object_id == root_store_object_id {
1080                                    current_transaction.root_mutations.push(mutation);
1081                                } else {
1082                                    current_transaction
1083                                        .non_root_mutations
1084                                        .push((object_id, mutation));
1085                                }
1086                            }
1087                        }
1088                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
1089                            let current_transaction = match current_transaction.as_mut() {
1090                                None => {
1091                                    transactions.push(JournaledTransaction::new(checkpoint));
1092                                    current_transaction = transactions.last_mut();
1093                                    current_transaction.as_mut().unwrap()
1094                                }
1095                                Some(transaction) => transaction,
1096                            };
1097                            current_transaction.checksums.push(JournaledChecksums {
1098                                device_range,
1099                                checksums,
1100                                first_write,
1101                            });
1102                        }
1103                        JournalRecord::Commit => {
1104                            if let Some(&mut JournaledTransaction {
1105                                ref checkpoint,
1106                                ref root_parent_mutations,
1107                                ref mut end_offset,
1108                                ..
1109                            }) = current_transaction.take()
1110                            {
1111                                for mutation in root_parent_mutations {
1112                                    // Snoop the mutations for any that might apply to the journal
1113                                    // file so that we can pass them to the reader so that it can
1114                                    // read the journal file.
1115                                    if let Mutation::ObjectStore(ObjectStoreMutation {
1116                                        item:
1117                                            Item {
1118                                                key:
1119                                                    ObjectKey {
1120                                                        object_id,
1121                                                        data:
1122                                                            ObjectKeyData::Attribute(
1123                                                                AttributeId::DATA,
1124                                                                AttributeKey::Extent(extent),
1125                                                            ),
1126                                                        ..
1127                                                    },
1128                                                value:
1129                                                    ObjectValue::Extent(ExtentValue::Some {
1130                                                        device_offset,
1131                                                        ..
1132                                                    }),
1133                                                ..
1134                                            },
1135                                        ..
1136                                    }) = mutation
1137                                    {
1138                                        // Add the journal extents we find on the way to our
1139                                        // reader.
1140                                        let handle = reader.handle();
1141                                        if *object_id != handle.object_id() {
1142                                            continue;
1143                                        }
1144                                        if let Some(end_offset) = handle.end_offset() {
1145                                            if extent.start != end_offset {
1146                                                bail!(anyhow!(FxfsError::Inconsistent).context(
1147                                                    format!(
1148                                                        "Unexpected journal extent {:?} -> {}, \
1149                                                           expected start: {}",
1150                                                        *extent, device_offset, end_offset,
1151                                                    )
1152                                                ));
1153                                            }
1154                                        }
1155                                        handle.push_extent(
1156                                            checkpoint.file_offset,
1157                                            *device_offset
1158                                                ..*device_offset
1159                                                    + extent.length().context("Invalid extent")?,
1160                                        );
1161                                    }
1162                                }
1163                                *end_offset = reader.journal_file_checkpoint().file_offset;
1164                            }
1165                        }
1166                        JournalRecord::Discard(offset) => {
1167                            if offset == 0 {
1168                                bail!(
1169                                    anyhow!(FxfsError::Inconsistent)
1170                                        .context("Invalid offset for Discard")
1171                                );
1172                            }
1173                            if let Some(transaction) = current_transaction.as_ref() {
1174                                if transaction.checkpoint.file_offset < offset {
1175                                    // Odd, but OK.
1176                                    continue;
1177                                }
1178                            }
1179                            current_transaction = None;
1180                            while let Some(transaction) = transactions.last() {
1181                                if transaction.checkpoint.file_offset < offset {
1182                                    break;
1183                                }
1184                                transactions.pop();
1185                            }
1186                            reader.handle().discard_extents(offset);
1187                        }
1188                        JournalRecord::DidFlushDevice(offset) => {
1189                            if offset > device_flushed_offset {
1190                                device_flushed_offset = offset;
1191                            }
1192                        }
1193                    }
1194                }
1195                // This is expected when we reach the end of the journal stream.
1196                ReadResult::ChecksumMismatch => break,
1197            }
1198        }
1199
1200        // Discard any uncommitted transaction.
1201        if current_transaction.is_some() {
1202            transactions.pop();
1203        }
1204
1205        Ok(JournaledTransactions { transactions, device_flushed_offset })
1206    }
1207
1208    /// Creates an empty filesystem with the minimum viable objects (including a root parent and
1209    /// root store but no further child stores).
1210    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
1211        // The following constants are only used at format time. When mounting, the recorded values
1212        // in the superblock should be used.  The root parent store does not have a parent, but
1213        // needs an object ID to be registered with ObjectManager, so it cannot collide (i.e. have
1214        // the same object ID) with any objects in the root store that use the journal to track
1215        // mutations.
1216        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
1217        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
1218        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;
1219
1220        info!(device_size = filesystem.device().size(); "Formatting");
1221
1222        let checkpoint = JournalCheckpoint {
1223            version: LATEST_VERSION,
1224            ..self.inner.lock().writer.journal_file_checkpoint()
1225        };
1226
1227        let mut current_generation = 1;
1228        if filesystem.options().image_builder_mode.is_some() {
1229            // Note that in non-image_builder_mode we write both superblocks when we format
1230            // (in FxFilesystemBuilder::open). In image_builder_mode we only write once at the end
1231            // as part of finalize(), which is why we must make sure the generation we write is
1232            // newer than any existing generation.
1233
1234            // Note: This should is the *filesystem* block size, not the device block size which
1235            // is currently always 4096 (https://fxbug.dev/42063349)
1236            let block_size = filesystem.block_size();
1237            match self.read_superblocks(filesystem.device(), block_size).await {
1238                Ok((super_block, _)) => {
1239                    log::info!(
1240                        "Found existing superblock with generation {}. Bumping by 1.",
1241                        super_block.generation
1242                    );
1243                    current_generation = super_block.generation.wrapping_add(1);
1244                }
1245                Err(_) => {
1246                    // TODO(https://fxbug.dev/463757813): It's not unusual to fail to read
1247                    // superblocks when we're formatting a new filesystem but we should probably
1248                    // fail the format if we get an IO error.
1249                }
1250            }
1251        }
1252
1253        let root_parent = ObjectStore::new_empty(
1254            None,
1255            INIT_ROOT_PARENT_STORE_OBJECT_ID,
1256            filesystem.clone(),
1257            Box::new(NullCache {}),
1258        );
1259        self.objects.set_root_parent_store(root_parent.clone());
1260
1261        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
1262        self.objects.set_allocator(allocator.clone());
1263        self.objects.init_metadata_reservation()?;
1264
1265        let journal_handle;
1266        let super_block_a_handle;
1267        let super_block_b_handle;
1268        let root_store;
1269        let mut transaction = filesystem
1270            .clone()
1271            .new_transaction(
1272                lock_keys![],
1273                Options { skip_journal_checks: true, ..Default::default() },
1274            )
1275            .await?;
1276        root_store = root_parent
1277            .new_child_store(
1278                &mut transaction,
1279                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
1280                Box::new(NullCache {}),
1281            )
1282            .await
1283            .context("new_child_store")?;
1284        self.objects.set_root_store(root_store.clone());
1285
1286        allocator.create(&mut transaction).await?;
1287
1288        // Create the super-block objects...
1289        super_block_a_handle = ObjectStore::create_object_with_id(
1290            &root_store,
1291            &mut transaction,
1292            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::A.object_id()).unwrap()),
1293            HandleOptions::default(),
1294            None,
1295        )
1296        .context("create super block")?;
1297        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
1298        super_block_a_handle
1299            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
1300            .await
1301            .context("extend super block")?;
1302        super_block_b_handle = ObjectStore::create_object_with_id(
1303            &root_store,
1304            &mut transaction,
1305            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::B.object_id()).unwrap()),
1306            HandleOptions::default(),
1307            None,
1308        )
1309        .context("create super block")?;
1310        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
1311        super_block_b_handle
1312            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
1313            .await
1314            .context("extend super block")?;
1315
1316        // the journal object...
1317        journal_handle = ObjectStore::create_object(
1318            &root_parent,
1319            &mut transaction,
1320            journal_handle_options(),
1321            None,
1322        )
1323        .await
1324        .context("create journal")?;
1325        if self.inner.lock().image_builder_mode.is_none() {
1326            let mut file_range = 0..self.chunk_size();
1327            journal_handle
1328                .preallocate_range(&mut transaction, &mut file_range)
1329                .await
1330                .context("preallocate journal")?;
1331            if file_range.start < file_range.end {
1332                bail!("preallocate_range returned too little space");
1333            }
1334        }
1335
1336        // Write the root store object info.
1337        root_store.create(&mut transaction).await?;
1338
1339        // The root parent graveyard.
1340        root_parent.set_graveyard_directory_object_id(
1341            Graveyard::create(&mut transaction, &root_parent).await?,
1342        );
1343
1344        transaction.commit().await?;
1345
1346        self.inner.lock().super_block_header = SuperBlockHeader::new(
1347            current_generation,
1348            root_parent.store_object_id(),
1349            root_parent.graveyard_directory_object_id(),
1350            root_store.store_object_id(),
1351            allocator.object_id(),
1352            journal_handle.object_id(),
1353            checkpoint,
1354            /* earliest_version: */ LATEST_VERSION,
1355        );
1356
1357        // Initialize the journal writer.
1358        let _ = self.handle.set(journal_handle);
1359        Ok(())
1360    }
1361
1362    /// Normally we allocate the journal when creating the filesystem.
1363    /// This is used image_builder_mode when journal allocation is done last.
1364    pub async fn allocate_journal(&self) -> Result<(), Error> {
1365        let handle = self.handle.get().unwrap();
1366        let filesystem = handle.store().filesystem();
1367        let mut transaction = filesystem
1368            .clone()
1369            .new_transaction(
1370                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1371                Options { skip_journal_checks: true, ..Default::default() },
1372            )
1373            .await?;
1374        let mut file_range = 0..self.chunk_size();
1375        self.handle
1376            .get()
1377            .unwrap()
1378            .preallocate_range(&mut transaction, &mut file_range)
1379            .await
1380            .context("preallocate journal")?;
1381        if file_range.start < file_range.end {
1382            bail!("preallocate_range returned too little space");
1383        }
1384        transaction.commit().await?;
1385        Ok(())
1386    }
1387
1388    pub async fn init_superblocks(&self) -> Result<(), Error> {
1389        // Overwrite both superblocks.
1390        for _ in 0..2 {
1391            self.write_super_block().await?;
1392        }
1393        Ok(())
1394    }
1395
1396    /// Takes a snapshot of all journaled transactions which affect |object_id| since its last
1397    /// flush.
1398    /// The caller is responsible for locking; it must ensure that the journal is not trimmed during
1399    /// this call.  For example, a Flush lock could be held on the object in question (assuming that
1400    /// object has data to flush and is registered with ObjectManager).
1401    pub async fn read_transactions_for_object(
1402        &self,
1403        object_id: u64,
1404    ) -> Result<Vec<JournaledTransaction>, Error> {
1405        let handle = self.handle.get().expect("No journal handle");
1406        // Reopen the handle since JournalReader needs an owned handle.
1407        let handle = ObjectStore::open_object(
1408            handle.owner(),
1409            handle.object_id(),
1410            journal_handle_options(),
1411            None,
1412        )
1413        .await?;
1414
1415        let checkpoint = match self.objects.journal_checkpoint(object_id) {
1416            Some(checkpoint) => checkpoint,
1417            None => return Ok(vec![]),
1418        };
1419        let mut reader = JournalReader::new(handle, &checkpoint);
1420        // Record the current end offset and only read to there, so we don't accidentally read any
1421        // partially flushed blocks.
1422        let end_offset = self.inner.lock().valid_to;
1423        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1424    }
1425
1426    /// Commits a transaction.  This is not thread safe; the caller must take appropriate locks.
1427    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1428        if transaction.is_empty() {
1429            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1430        }
1431
1432        self.pre_commit(transaction).await?;
1433        Ok(self.write_and_apply_mutations(transaction))
1434    }
1435
1436    // Before we commit, we might need to extend the journal or write pending records to the
1437    // journal.
1438    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
1439        let handle;
1440
1441        let (size, zero_offset) = {
1442            let mut inner = self.inner.lock();
1443
1444            // If this is the first write after a RESET, we need to output version first.
1445            if std::mem::take(&mut inner.output_reset_version) {
1446                LATEST_VERSION.serialize_into(&mut inner.writer)?;
1447            }
1448
1449            if let Some(discard_offset) = inner.discard_offset {
1450                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
1451                inner.discard_offset = None;
1452            }
1453
1454            if inner.needs_did_flush_device {
1455                let offset = inner.device_flushed_offset;
1456                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
1457                inner.needs_did_flush_device = false;
1458            }
1459
1460            handle = match self.handle.get() {
1461                None => return Ok(()),
1462                Some(x) => x,
1463            };
1464
1465            let file_offset = inner.writer.journal_file_checkpoint().file_offset;
1466
1467            let size = handle.get_size();
1468            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };
1469
1470            if size.is_none()
1471                && inner.zero_offset.is_none()
1472                && !self.objects.needs_borrow_for_journal(file_offset)
1473            {
1474                return Ok(());
1475            }
1476
1477            (size, inner.zero_offset)
1478        };
1479
1480        let mut transaction = handle
1481            .new_transaction_with_options(Options {
1482                skip_journal_checks: true,
1483                borrow_metadata_space: true,
1484                allocator_reservation: Some(self.objects.metadata_reservation()),
1485                txn_guard: Some(transaction.txn_guard()),
1486                ..Default::default()
1487            })
1488            .await?;
1489        if let Some(size) = size {
1490            handle
1491                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
1492                .await?;
1493        }
1494        if let Some(zero_offset) = zero_offset {
1495            handle.zero(&mut transaction, 0..zero_offset).await?;
1496        }
1497
1498        // We can't use regular transaction commit, because that can cause re-entrancy issues, so
1499        // instead we just apply the transaction directly here.
1500        self.write_and_apply_mutations(&mut transaction);
1501
1502        let mut inner = self.inner.lock();
1503
1504        // Make sure the transaction to extend the journal made it to the journal within the old
1505        // size, since otherwise, it won't be possible to replay.
1506        if let Some(size) = size {
1507            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
1508        }
1509
1510        if inner.zero_offset == zero_offset {
1511            inner.zero_offset = None;
1512        }
1513
1514        Ok(())
1515    }
1516
1517    // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
1518    // all records should be applied because the object store or allocator might already contain the
1519    // mutation.  After replay, that obviously isn't the case and we want to apply all mutations.
1520    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1521        let super_block_header = &self.inner.lock().super_block_header;
1522        let offset = super_block_header
1523            .journal_file_offsets
1524            .get(&object_id)
1525            .cloned()
1526            .unwrap_or(super_block_header.super_block_journal_file_offset);
1527        journal_file_checkpoint.file_offset >= offset
1528    }
1529
1530    /// Flushes previous writes to the device and then writes out a new super-block.
1531    /// Callers must ensure that we do not make concurrent calls.
1532    async fn write_super_block(&self) -> Result<(), Error> {
1533        let root_parent_store = self.objects.root_parent_store();
1534
1535        // We need to flush previous writes to the device since the new super-block we are writing
1536        // relies on written data being observable, and we also need to lock the root parent store
1537        // so that no new entries are written to it whilst we are writing the super-block, and for
1538        // that we use the write lock.
1539        let old_layers;
1540        let old_super_block_offset;
1541        let mut new_super_block_header;
1542        let checkpoint;
1543        let borrowed;
1544
1545        {
1546            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1547            {
1548                let _write_guard = self.writer_mutex.lock();
1549                (checkpoint, borrowed) = self.pad_to_block()?;
1550                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
1551            }
1552            self.flush_device(checkpoint.file_offset)
1553                .await
1554                .context("flush failed when writing superblock")?;
1555        }
1556
1557        new_super_block_header = self.inner.lock().super_block_header.clone();
1558
1559        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;
1560
1561        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();
1562
1563        new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
1564        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
1565        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
1566        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
1567        new_super_block_header.journal_file_offsets = journal_file_offsets;
1568        new_super_block_header.borrowed_metadata_space = borrowed;
1569
1570        self.super_block_manager
1571            .save(
1572                new_super_block_header.clone(),
1573                self.objects.root_parent_store().filesystem(),
1574                old_layers,
1575            )
1576            .await?;
1577        {
1578            let mut inner = self.inner.lock();
1579            inner.super_block_header = new_super_block_header;
1580            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
1581        }
1582
1583        Ok(())
1584    }
1585
1586    /// Flushes any buffered journal data to the device.  Note that this does not flush the device
1587    /// unless the flush_device option is set, in which case data should have been persisted to
1588    /// lower layers.  If a precondition is supplied, it is evaluated and the sync will be skipped
1589    /// if it returns false.  This allows callers to check a condition whilst a lock is held.  If a
1590    /// sync is performed, this function returns the checkpoint that was flushed and the amount of
1591    /// borrowed metadata space at the point it was flushed.
1592    pub async fn sync(
1593        &self,
1594        options: SyncOptions<'_>,
1595    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1596        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1597
1598        let (checkpoint, borrowed) = {
1599            if let Some(precondition) = options.precondition {
1600                if !precondition() {
1601                    return Ok(None);
1602                }
1603            }
1604
1605            // This guard is required so that we don't insert an EndBlock record in the middle of a
1606            // transaction.
1607            let _guard = self.writer_mutex.lock();
1608
1609            self.pad_to_block()?
1610        };
1611
1612        if options.flush_device {
1613            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1614        }
1615
1616        Ok(Some((checkpoint, borrowed)))
1617    }
1618
1619    // Returns the checkpoint as it was prior to padding.  This is done because the super block
1620    // needs to record where the last transaction ends and it's the next transaction that pays the
1621    // price of the padding.
1622    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1623        let mut inner = self.inner.lock();
1624        let checkpoint = inner.writer.journal_file_checkpoint();
1625        if checkpoint.file_offset % BLOCK_SIZE != 0 {
1626            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1627            inner.writer.pad_to_block()?;
1628            if let Some(waker) = inner.flush_waker.take() {
1629                waker.wake();
1630            }
1631        }
1632        Ok((checkpoint, self.objects.borrowed_metadata_space()))
1633    }
1634
1635    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
1636        assert!(
1637            self.inner.lock().image_builder_mode.is_none(),
1638            "flush_device called in image builder mode"
1639        );
1640        debug_assert_not_too_long!(poll_fn(|ctx| {
1641            let mut inner = self.inner.lock();
1642            if inner.flushed_offset >= checkpoint_offset {
1643                Poll::Ready(Ok(()))
1644            } else if inner.terminate {
1645                let context = inner
1646                    .terminate_reason
1647                    .as_ref()
1648                    .map(|e| format!("Journal closed with error: {:?}", e))
1649                    .unwrap_or_else(|| "Journal closed".to_string());
1650                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
1651            } else {
1652                inner.sync_waker = Some(ctx.waker().clone());
1653                Poll::Pending
1654            }
1655        }))?;
1656
1657        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
1658        if needs_flush {
1659            let trace = self.trace.load(Ordering::Relaxed);
1660            if trace {
1661                info!("J: start flush device");
1662            }
1663            self.handle.get().unwrap().flush_device().await?;
1664            if trace {
1665                info!("J: end flush device");
1666            }
1667
1668            // We need to write a DidFlushDevice record at some point, but if we are in the
1669            // process of shutting down the filesystem, we want to leave the journal clean to
1670            // avoid there being log messages complaining about unwritten journal data, so we
1671            // queue it up so that the next transaction will trigger this record to be written.
1672            // If we are shutting down, that will never happen but since the DidFlushDevice
1673            // message is purely advisory (it reduces the number of checksums we have to verify
1674            // during replay), it doesn't matter if it isn't written.
1675            {
1676                let mut inner = self.inner.lock();
1677                inner.device_flushed_offset = checkpoint_offset;
1678                inner.needs_did_flush_device = true;
1679            }
1680
1681            // Tell the allocator that we flushed the device so that it can now start using
1682            // space that was deallocated.
1683            self.objects.allocator().did_flush_device(checkpoint_offset);
1684            if trace {
1685                info!("J: did flush device");
1686            }
1687        }
1688
1689        Ok(())
1690    }
1691
1692    /// Returns a copy of the super-block header.
1693    pub fn super_block_header(&self) -> SuperBlockHeader {
1694        self.inner.lock().super_block_header.clone()
1695    }
1696
1697    /// Waits for there to be sufficient space in the journal.
1698    pub async fn check_journal_space(&self) -> Result<(), Error> {
1699        loop {
1700            debug_assert_not_too_long!({
1701                let inner = self.inner.lock();
1702                if inner.terminate {
1703                    // If the flush error is set, this will never make progress, since we can't
1704                    // extend the journal any more.
1705                    let context = inner
1706                        .terminate_reason
1707                        .as_ref()
1708                        .map(|e| format!("Journal closed with error: {:?}", e))
1709                        .unwrap_or_else(|| "Journal closed".to_string());
1710                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
1711                }
1712                if self.objects.last_end_offset()
1713                    - inner.super_block_header.journal_checkpoint.file_offset
1714                    < inner.reclaim_size
1715                {
1716                    break Ok(());
1717                }
1718                if inner.image_builder_mode.is_some() {
1719                    break Ok(());
1720                }
1721                if inner.disable_compactions {
1722                    break Err(
1723                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
1724                    );
1725                }
1726                self.reclaim_event.listen()
1727            });
1728        }
1729    }
1730
1731    fn chunk_size(&self) -> u64 {
1732        CHUNK_SIZE
1733    }
1734
1735    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
1736        let checkpoint_before;
1737        let checkpoint_after;
1738        {
1739            let _guard = self.writer_mutex.lock();
1740            checkpoint_before = {
1741                let mut inner = self.inner.lock();
1742                if transaction.includes_write() {
1743                    inner.needs_barrier = true;
1744                }
1745                let checkpoint = inner.writer.journal_file_checkpoint();
1746                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
1747                    self.objects.write_mutation(
1748                        *object_id,
1749                        mutation,
1750                        Writer(*object_id, &mut inner.writer),
1751                    );
1752                }
1753                checkpoint
1754            };
1755            let maybe_mutation =
1756                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
1757                    "apply_transaction should not fail in live mode; \
1758                     filesystem will be in an inconsistent state",
1759                );
1760            checkpoint_after = {
1761                let mut inner = self.inner.lock();
1762                if let Some(mutation) = maybe_mutation {
1763                    inner
1764                        .writer
1765                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
1766                        .unwrap();
1767                }
1768                for (device_range, checksums, first_write) in
1769                    transaction.take_checksums().into_iter()
1770                {
1771                    inner
1772                        .writer
1773                        .write_record(&JournalRecord::DataChecksums(
1774                            device_range,
1775                            Checksums::fletcher(checksums),
1776                            first_write,
1777                        ))
1778                        .unwrap();
1779                }
1780                inner.writer.write_record(&JournalRecord::Commit).unwrap();
1781
1782                inner.writer.journal_file_checkpoint()
1783            };
1784        }
1785        self.objects.did_commit_transaction(
1786            transaction,
1787            &checkpoint_before,
1788            checkpoint_after.file_offset,
1789        );
1790
1791        if let Some(waker) = self.inner.lock().flush_waker.take() {
1792            waker.wake();
1793        }
1794
1795        checkpoint_before.file_offset
1796    }
1797
1798    /// This task will flush journal data to the device when there is data that needs flushing, and
1799    /// trigger compactions when short of journal space.  It will return after the terminate method
1800    /// has been called, or an error is encountered with either flushing or compaction.
1801    pub async fn flush_task(self: Arc<Self>) {
1802        let mut flush_fut = None;
1803        let mut compact_fut = None;
1804        let mut flush_error = false;
1805        poll_fn(|ctx| {
1806            loop {
1807                {
1808                    let mut inner = self.inner.lock();
1809                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
1810                        let flushable = inner.writer.flushable_bytes();
1811                        if flushable > 0 {
1812                            flush_fut = Some(Box::pin(self.flush(flushable)));
1813                        }
1814                    }
1815                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
1816                        return Poll::Ready(());
1817                    }
1818                    // `journal_bytes` refers to bytes in the journal that haven't yet been flushed
1819                    // to the layer files. It increases with each transaction and decreases when
1820                    // compactions complete. The flush task is woken whenever a transaction is
1821                    // committed and we should see this metric updated regularly.
1822                    let journal_bytes = self.objects.last_end_offset()
1823                        - inner.super_block_header.journal_checkpoint.file_offset;
1824                    fxfs_trace::counter!("journal-bytes", 0, "total" => journal_bytes);
1825                    // The / 2 is here because after compacting, we cannot reclaim the space until
1826                    // the _next_ time we flush the device since the super-block is not guaranteed
1827                    // to persist until then.
1828                    if compact_fut.is_none()
1829                        && !inner.terminate
1830                        && !inner.disable_compactions
1831                        && inner.image_builder_mode.is_none()
1832                        && journal_bytes > inner.reclaim_size / 2
1833                    {
1834                        compact_fut = Some(Box::pin(self.compact()));
1835                        inner.compaction_running = true;
1836                    }
1837                    inner.flush_waker = Some(ctx.waker().clone());
1838                }
1839                let mut pending = true;
1840                if let Some(fut) = flush_fut.as_mut() {
1841                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1842                        if let Err(e) = result {
1843                            self.inner.lock().terminate(Some(e.context("Flush error")));
1844                            self.reclaim_event.notify(usize::MAX);
1845                            flush_error = true;
1846                        }
1847                        flush_fut = None;
1848                        pending = false;
1849                    }
1850                }
1851                if let Some(fut) = compact_fut.as_mut() {
1852                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1853                        let mut inner = self.inner.lock();
1854                        if let Err(e) = result {
1855                            inner.terminate(Some(e.context("Compaction error")));
1856                        }
1857                        compact_fut = None;
1858                        inner.compaction_running = false;
1859                        self.reclaim_event.notify(usize::MAX);
1860                        pending = false;
1861                        fxfs_trace::counter!(
1862                            "journal-bytes",
1863                            0,
1864                            "total" => self.objects.last_end_offset()
1865                                - inner.super_block_header.journal_checkpoint.file_offset
1866                        );
1867                    }
1868                }
1869                if pending {
1870                    return Poll::Pending;
1871                }
1872            }
1873        })
1874        .await;
1875    }
1876
1877    /// Returns a yielder that can be used for compactions.
1878    pub fn get_compaction_yielder(&self) -> CompactionYielder<'_> {
1879        CompactionYielder::new(self)
1880    }
1881
1882    async fn flush(&self, amount: usize) -> Result<(), Error> {
1883        let handle = self.handle.get().unwrap();
1884        let mut buf = handle.allocate_buffer(amount).await;
1885        let (offset, len, barrier_on_first_write) = {
1886            let mut inner = self.inner.lock();
1887            let offset = inner.writer.take_flushable(buf.as_mut());
1888            let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1889            // Reset `needs_barrier` before instead of after the overwrite in case a txn commit
1890            // that contains data happens during the overwrite.
1891            inner.needs_barrier = false;
1892            (offset, buf.len() as u64, barrier_on_first_write)
1893        };
1894        self.handle
1895            .get()
1896            .unwrap()
1897            .overwrite(
1898                offset,
1899                buf.as_mut(),
1900                OverwriteOptions { barrier_on_first_write, ..Default::default() },
1901            )
1902            .await?;
1903
1904        let mut inner = self.inner.lock();
1905        if let Some(waker) = inner.sync_waker.take() {
1906            waker.wake();
1907        }
1908        inner.flushed_offset = offset + len;
1909        inner.valid_to = inner.flushed_offset;
1910        Ok(())
1911    }
1912
1913    #[trace]
1914    async fn compact(&self) -> Result<(), Error> {
1915        assert!(
1916            self.inner.lock().image_builder_mode.is_none(),
1917            "compact called in image builder mode"
1918        );
1919        let bytes_before = self.objects.compaction_bytes_written();
1920        let _measure = crate::metrics::DurationMeasureScope::new(
1921            &crate::metrics::lsm_tree_metrics().journal_compaction_time,
1922        );
1923        crate::metrics::lsm_tree_metrics().journal_compactions_total.add(1);
1924        let trace = self.trace.load(Ordering::Relaxed);
1925        debug!("Compaction starting");
1926        if trace {
1927            info!("J: start compaction");
1928        }
1929        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1930        self.inner.lock().super_block_header.earliest_version = earliest_version;
1931        self.write_super_block().await.context("Failed to write superblock")?;
1932        if trace {
1933            info!("J: end compaction");
1934        }
1935        debug!("Compaction finished");
1936        let bytes_after = self.objects.compaction_bytes_written();
1937        crate::metrics::lsm_tree_metrics()
1938            .journal_compaction_bytes_written
1939            .add(bytes_after.saturating_sub(bytes_before));
1940        Ok(())
1941    }
1942
1943    /// This should generally NOT be called externally. It is public to allow use by FIDL service
1944    /// fxfs.Debug.
1945    pub async fn force_compact(&self) -> Result<(), Error> {
1946        self.inner.lock().forced_compaction = true;
1947        scopeguard::defer! { self.inner.lock().forced_compaction = false; }
1948        self.compact().await
1949    }
1950
1951    pub async fn stop_compactions(&self) {
1952        loop {
1953            debug_assert_not_too_long!({
1954                let mut inner = self.inner.lock();
1955                inner.disable_compactions = true;
1956                if !inner.compaction_running {
1957                    return;
1958                }
1959                self.reclaim_event.listen()
1960            });
1961        }
1962    }
1963
1964    /// Creates a lazy inspect node named `str` under `parent` which will yield statistics for the
1965    /// journal when queried.
1966    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1967        let this = Arc::downgrade(self);
1968        parent.record_lazy_child(name, move || {
1969            let this_clone = this.clone();
1970            async move {
1971                let inspector = fuchsia_inspect::Inspector::default();
1972                if let Some(this) = this_clone.upgrade() {
1973                    let (journal_min, journal_max, journal_reclaim_size) = {
1974                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1975                        let inner = this.inner.lock();
1976                        (
1977                            round_down(
1978                                inner.super_block_header.journal_checkpoint.file_offset,
1979                                BLOCK_SIZE,
1980                            ),
1981                            inner.flushed_offset,
1982                            inner.reclaim_size,
1983                        )
1984                    };
1985                    let root = inspector.root();
1986                    root.record_uint("journal_min_offset", journal_min);
1987                    root.record_uint("journal_max_offset", journal_max);
1988                    root.record_uint("journal_size", journal_max - journal_min);
1989                    root.record_uint("journal_reclaim_size", journal_reclaim_size);
1990
1991                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
1992                    if let Some(x) = round_div(
1993                        100 * (journal_max - journal_min),
1994                        this.objects.allocator().get_disk_bytes(),
1995                    ) {
1996                        root.record_uint("journal_size_to_disk_size_percent", x);
1997                    }
1998                }
1999                Ok(inspector)
2000            }
2001            .boxed()
2002        });
2003    }
2004
2005    /// Terminate all journal activity.
2006    pub fn terminate(&self) {
2007        self.inner.lock().terminate(/*reason*/ None);
2008        self.reclaim_event.notify(usize::MAX);
2009    }
2010}
2011
2012/// Wrapper to allow records to be written to the journal.
2013pub struct Writer<'a>(u64, &'a mut JournalWriter);
2014
2015impl Writer<'_> {
2016    pub fn write(&mut self, mutation: Mutation) {
2017        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
2018    }
2019}
2020
#[cfg(target_os = "fuchsia")]
mod yielder {
    use super::Journal;
    use crate::lsm_tree::Yielder;
    use fuchsia_async as fasync;

    /// CompactionYielder yields, via fuchsia-async, whenever other tasks are being polled, which
    /// should be a proxy for how busy the system is.  Compactions can be delayed for a small
    /// amount of time, but not for so long that new transactions end up being blocked.
    pub struct CompactionYielder<'a> {
        journal: &'a Journal,
        low_priority_task: Option<fasync::LowPriorityTask>,
    }

    impl<'a> CompactionYielder<'a> {
        /// Creates a yielder for `journal`.  The low priority task is only created on the first
        /// yield that actually waits.
        pub fn new(journal: &'a Journal) -> Self {
            Self { journal, low_priority_task: None }
        }
    }

    impl Yielder for CompactionYielder<'_> {
        async fn yield_now(&mut self) {
            // Wait for the executor to have been idle for 4ms, but never for longer than 16ms in
            // total.  The cap is needed in case compaction has become urgent, since otherwise
            // new transactions could end up blocked.
            const IDLE_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(4);
            const MAX_YIELD_DURATION: zx::MonotonicDuration =
                zx::MonotonicDuration::from_millis(16);

            {
                let inner = self.journal.inner.lock();
                // Forced compactions never yield.
                if inner.forced_compaction {
                    return;
                }
                let outstanding = self.journal.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset;
                let half_reclaim_size = inner.reclaim_size / 2;
                let urgent = matches!(
                    outstanding.checked_sub(half_reclaim_size),
                    Some(excess) if excess >= half_reclaim_size / 2
                );
                if urgent {
                    // 3/4 of the reclaim size has been used up in the journal, so do not delay
                    // any further.  Continuing to yield would get to the point where new
                    // transactions are blocked.
                    self.low_priority_task = None;
                    return;
                }
            }

            let task = self.low_priority_task.get_or_insert_with(fasync::LowPriorityTask::new);
            task.wait_until_idle_for(
                IDLE_PERIOD,
                fasync::MonotonicInstant::after(MAX_YIELD_DURATION),
            )
            .await;
        }
    }
}
2080
2081#[cfg(not(target_os = "fuchsia"))]
2082mod yielder {
2083    use super::Journal;
2084    use crate::lsm_tree::Yielder;
2085
2086    #[expect(dead_code)]
2087    pub struct CompactionYielder<'a>(&'a Journal);
2088
2089    impl<'a> CompactionYielder<'a> {
2090        pub fn new(journal: &'a Journal) -> Self {
2091            Self(journal)
2092        }
2093    }
2094
2095    impl Yielder for CompactionYielder<'_> {
2096        async fn yield_now(&mut self) {}
2097    }
2098}
2099
2100pub use yielder::*;
2101
2102#[cfg(test)]
2103mod tests {
2104    use super::SuperBlockInstance;
2105    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
2106    use crate::fsck::fsck;
2107    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
2108    use crate::object_store::directory::Directory;
2109    use crate::object_store::transaction::Options;
2110    use crate::object_store::volume::root_volume;
2111    use crate::object_store::{
2112        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
2113    };
2114    #[cfg(target_os = "fuchsia")]
2115    use fuchsia_async::TestExecutor;
2116    use fuchsia_async::{self as fasync, MonotonicDuration};
2117    use storage_device::DeviceHolder;
2118    use storage_device::fake_device::FakeDevice;
2119
2120    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2121
2122    #[fuchsia::test]
2123    async fn test_replay() {
2124        const TEST_DATA: &[u8] = b"hello";
2125
2126        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2127
2128        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2129
2130        let object_id = {
2131            let root_store = fs.root_store();
2132            let root_directory =
2133                Directory::open(&root_store, root_store.root_directory_object_id())
2134                    .await
2135                    .expect("open failed");
2136            let mut transaction = fs
2137                .clone()
2138                .new_transaction(
2139                    lock_keys![LockKey::object(
2140                        root_store.store_object_id(),
2141                        root_store.root_directory_object_id(),
2142                    )],
2143                    Options::default(),
2144                )
2145                .await
2146                .expect("new_transaction failed");
2147            let handle = root_directory
2148                .create_child_file(&mut transaction, "test")
2149                .await
2150                .expect("create_child_file failed");
2151
2152            transaction.commit().await.expect("commit failed");
2153            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2154            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2155            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2156            // As this is the first sync, this will actually trigger a new super-block, but normally
2157            // this would not be the case.
2158            fs.sync(SyncOptions::default()).await.expect("sync failed");
2159            handle.object_id()
2160        };
2161
2162        {
2163            fs.close().await.expect("Close failed");
2164            let device = fs.take_device().await;
2165            device.reopen(false);
2166            let fs = FxFilesystem::open(device).await.expect("open failed");
2167            let handle = ObjectStore::open_object(
2168                &fs.root_store(),
2169                object_id,
2170                HandleOptions::default(),
2171                None,
2172            )
2173            .await
2174            .expect("open_object failed");
2175            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2176            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
2177            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2178            fsck(fs.clone()).await.expect("fsck failed");
2179            fs.close().await.expect("Close failed");
2180        }
2181    }
2182
2183    #[fuchsia::test]
2184    async fn test_reset() {
2185        const TEST_DATA: &[u8] = b"hello";
2186
2187        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
2188
2189        let mut object_ids = Vec::new();
2190
2191        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2192        {
2193            let root_store = fs.root_store();
2194            let root_directory =
2195                Directory::open(&root_store, root_store.root_directory_object_id())
2196                    .await
2197                    .expect("open failed");
2198            let mut transaction = fs
2199                .clone()
2200                .new_transaction(
2201                    lock_keys![LockKey::object(
2202                        root_store.store_object_id(),
2203                        root_store.root_directory_object_id(),
2204                    )],
2205                    Options::default(),
2206                )
2207                .await
2208                .expect("new_transaction failed");
2209            let handle = root_directory
2210                .create_child_file(&mut transaction, "test")
2211                .await
2212                .expect("create_child_file failed");
2213            transaction.commit().await.expect("commit failed");
2214            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2215            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2216            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2217            fs.sync(SyncOptions::default()).await.expect("sync failed");
2218            object_ids.push(handle.object_id());
2219
2220            // Create a lot of objects but don't sync at the end. This should leave the filesystem
2221            // with a half finished transaction that cannot be replayed.
2222            for i in 0..1000 {
2223                let mut transaction = fs
2224                    .clone()
2225                    .new_transaction(
2226                        lock_keys![LockKey::object(
2227                            root_store.store_object_id(),
2228                            root_store.root_directory_object_id(),
2229                        )],
2230                        Options::default(),
2231                    )
2232                    .await
2233                    .expect("new_transaction failed");
2234                let handle = root_directory
2235                    .create_child_file(&mut transaction, &format!("{}", i))
2236                    .await
2237                    .expect("create_child_file failed");
2238                transaction.commit().await.expect("commit failed");
2239                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2240                buf.as_mut_slice().copy_from_slice(TEST_DATA);
2241                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2242                object_ids.push(handle.object_id());
2243            }
2244        }
2245        fs.close().await.expect("fs close failed");
2246        let device = fs.take_device().await;
2247        device.reopen(false);
2248        let fs = FxFilesystem::open(device).await.expect("open failed");
2249        fsck(fs.clone()).await.expect("fsck failed");
2250        {
2251            let root_store = fs.root_store();
2252            // Check the first two objects which should exist.
2253            for &object_id in &object_ids[0..1] {
2254                let handle = ObjectStore::open_object(
2255                    &root_store,
2256                    object_id,
2257                    HandleOptions::default(),
2258                    None,
2259                )
2260                .await
2261                .expect("open_object failed");
2262                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2263                assert_eq!(
2264                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2265                    TEST_DATA.len()
2266                );
2267                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2268            }
2269
2270            // Write one more object and sync.
2271            let root_directory =
2272                Directory::open(&root_store, root_store.root_directory_object_id())
2273                    .await
2274                    .expect("open failed");
2275            let mut transaction = fs
2276                .clone()
2277                .new_transaction(
2278                    lock_keys![LockKey::object(
2279                        root_store.store_object_id(),
2280                        root_store.root_directory_object_id(),
2281                    )],
2282                    Options::default(),
2283                )
2284                .await
2285                .expect("new_transaction failed");
2286            let handle = root_directory
2287                .create_child_file(&mut transaction, "test2")
2288                .await
2289                .expect("create_child_file failed");
2290            transaction.commit().await.expect("commit failed");
2291            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2292            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2293            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2294            fs.sync(SyncOptions::default()).await.expect("sync failed");
2295            object_ids.push(handle.object_id());
2296        }
2297
2298        fs.close().await.expect("close failed");
2299        let device = fs.take_device().await;
2300        device.reopen(false);
2301        let fs = FxFilesystem::open(device).await.expect("open failed");
2302        {
2303            fsck(fs.clone()).await.expect("fsck failed");
2304
2305            // Check the first two and the last objects.
2306            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
2307                let handle = ObjectStore::open_object(
2308                    &fs.root_store(),
2309                    object_id,
2310                    HandleOptions::default(),
2311                    None,
2312                )
2313                .await
2314                .unwrap_or_else(|e| {
2315                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2316                });
2317                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2318                assert_eq!(
2319                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2320                    TEST_DATA.len()
2321                );
2322                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2323            }
2324        }
2325        fs.close().await.expect("close failed");
2326    }
2327
2328    #[fuchsia::test]
2329    async fn test_discard() {
2330        let device = {
2331            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2332            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2333            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2334
2335            let store = root_volume
2336                .new_volume("test", NewChildStoreOptions::default())
2337                .await
2338                .expect("new_volume failed");
2339            let root_directory = Directory::open(&store, store.root_directory_object_id())
2340                .await
2341                .expect("open failed");
2342
2343            // Create enough data so that another journal extent is used.
2344            let mut i = 0;
2345            loop {
2346                let mut transaction = fs
2347                    .clone()
2348                    .new_transaction(
2349                        lock_keys![LockKey::object(
2350                            store.store_object_id(),
2351                            store.root_directory_object_id()
2352                        )],
2353                        Options::default(),
2354                    )
2355                    .await
2356                    .expect("new_transaction failed");
2357                root_directory
2358                    .create_child_file(&mut transaction, &format!("a {i}"))
2359                    .await
2360                    .expect("create_child_file failed");
2361                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2362                    break;
2363                }
2364                i += 1;
2365            }
2366
2367            // Compact and then disable compactions.
2368            fs.journal().force_compact().await.expect("compact failed");
2369            fs.journal().stop_compactions().await;
2370
2371            // Keep going until we need another journal extent.
2372            let mut i = 0;
2373            loop {
2374                let mut transaction = fs
2375                    .clone()
2376                    .new_transaction(
2377                        lock_keys![LockKey::object(
2378                            store.store_object_id(),
2379                            store.root_directory_object_id()
2380                        )],
2381                        Options::default(),
2382                    )
2383                    .await
2384                    .expect("new_transaction failed");
2385                root_directory
2386                    .create_child_file(&mut transaction, &format!("b {i}"))
2387                    .await
2388                    .expect("create_child_file failed");
2389                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2390                    break;
2391                }
2392                i += 1;
2393            }
2394
2395            // Allow the journal to flush, but we don't want to sync.
2396            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2397            // Because we're not gracefully closing the filesystem, a Discard record will be
2398            // emitted.
2399            fs.device().snapshot().expect("snapshot failed")
2400        };
2401
2402        let fs = FxFilesystem::open(device).await.expect("open failed");
2403
2404        {
2405            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2406
2407            let store =
2408                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2409
2410            let root_directory = Directory::open(&store, store.root_directory_object_id())
2411                .await
2412                .expect("open failed");
2413
2414            // Write one more transaction.
2415            let mut transaction = fs
2416                .clone()
2417                .new_transaction(
2418                    lock_keys![LockKey::object(
2419                        store.store_object_id(),
2420                        store.root_directory_object_id()
2421                    )],
2422                    Options::default(),
2423                )
2424                .await
2425                .expect("new_transaction failed");
2426            root_directory
2427                .create_child_file(&mut transaction, &format!("d"))
2428                .await
2429                .expect("create_child_file failed");
2430            transaction.commit().await.expect("commit failed");
2431        }
2432
2433        fs.close().await.expect("close failed");
2434        let device = fs.take_device().await;
2435        device.reopen(false);
2436
2437        let fs = FxFilesystem::open(device).await.expect("open failed");
2438        fsck(fs.clone()).await.expect("fsck failed");
2439        fs.close().await.expect("close failed");
2440    }
2441
2442    #[fuchsia::test]
2443    async fn test_use_existing_generation() {
2444        let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2445
2446        // First format should be generation 1.
2447        let fs = FxFilesystemBuilder::new()
2448            .format(true)
2449            .image_builder_mode(Some(SuperBlockInstance::A))
2450            .open(device)
2451            .await
2452            .expect("open failed");
2453        fs.enable_allocations();
2454        let generation0 = fs.super_block_header().generation;
2455        assert_eq!(generation0, 1);
2456        fs.close().await.expect("close failed");
2457        let device = fs.take_device().await;
2458        device.reopen(false);
2459
2460        // Format the device normally (again, generation 1).
2461        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2462        let generation1 = fs.super_block_header().generation;
2463        {
2464            let root_volume = crate::object_store::volume::root_volume(fs.clone())
2465                .await
2466                .expect("root_volume failed");
2467            root_volume
2468                .new_volume("test", crate::object_store::NewChildStoreOptions::default())
2469                .await
2470                .expect("new_volume failed");
2471        }
2472        fs.close().await.expect("close failed");
2473        let device = fs.take_device().await;
2474        device.reopen(false);
2475
2476        // Format again with image_builder_mode.
2477        let fs = FxFilesystemBuilder::new()
2478            .format(true)
2479            .image_builder_mode(Some(SuperBlockInstance::A))
2480            .open(device)
2481            .await
2482            .expect("open failed");
2483        fs.enable_allocations();
2484        let generation2 = fs.super_block_header().generation;
2485        assert!(
2486            generation2 > generation1,
2487            "generation2 ({}) should be greater than generation1 ({})",
2488            generation2,
2489            generation1
2490        );
2491        fs.close().await.expect("close failed");
2492    }
2493
2494    #[fuchsia::test]
2495    async fn test_image_builder_mode_generation_bump_512_byte_block() {
2496        let device = DeviceHolder::new(FakeDevice::new(16384, 512));
2497
2498        // Format initial filesystem (generation 1)
2499        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2500        let generation1 = fs.super_block_header().generation;
2501        fs.close().await.expect("close failed");
2502        let device = fs.take_device().await;
2503        device.reopen(false);
2504
2505        // Format again with image_builder_mode (should bump generation)
2506        let fs = FxFilesystemBuilder::new()
2507            .format(true)
2508            .image_builder_mode(Some(SuperBlockInstance::A))
2509            .open(device)
2510            .await
2511            .expect("open failed");
2512
2513        fs.enable_allocations();
2514        let generation2 = fs.super_block_header().generation;
2515        assert!(
2516            generation2 > generation1,
2517            "Expected generation bump, got {} vs {}",
2518            generation2,
2519            generation1
2520        );
2521        fs.close().await.expect("close failed");
2522    }
2523
2524    #[fuchsia::test]
2525    #[cfg(target_os = "fuchsia")]
2526    fn test_low_priority_compaction() {
2527        let mut executor = TestExecutor::new_with_fake_time();
2528        let mut fut = std::pin::pin!(async {
2529            use std::sync::Arc;
2530            use std::sync::atomic::{AtomicBool, Ordering};
2531
2532            let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
2533            let fs = FxFilesystemBuilder::new()
2534                .journal_options(super::JournalOptions {
2535                    reclaim_size: 65536,
2536                    ..Default::default()
2537                })
2538                .format(true)
2539                .open(device)
2540                .await
2541                .expect("open failed");
2542
2543            let _low = fasync::LowPriorityTask::new();
2544
2545            // Add some data to the tree.
2546            {
2547                let root_store = fs.root_store();
2548                let root_directory =
2549                    Directory::open(&root_store, root_store.root_directory_object_id())
2550                        .await
2551                        .expect("open failed");
2552                for i in 0..100 {
2553                    let mut transaction = fs
2554                        .clone()
2555                        .new_transaction(
2556                            lock_keys![LockKey::object(
2557                                root_store.store_object_id(),
2558                                root_store.root_directory_object_id(),
2559                            )],
2560                            Options::default(),
2561                        )
2562                        .await
2563                        .expect("new_transaction failed");
2564                    root_directory
2565                        .create_child_file(&mut transaction, &format!("test{}", i))
2566                        .await
2567                        .expect("create_child_file failed");
2568                    transaction.commit().await.expect("commit failed");
2569                }
2570            }
2571
2572            // Spawn a task that polls every 1ms.
2573            let stop = Arc::new(AtomicBool::new(false));
2574            let stop_clone = stop.clone();
2575            let _normal_task = fasync::Task::spawn(async move {
2576                while !stop_clone.load(Ordering::Relaxed) {
2577                    fasync::Timer::new(fasync::MonotonicInstant::after(
2578                        MonotonicDuration::from_millis(1),
2579                    ))
2580                    .await;
2581                }
2582            });
2583
2584            // Trigger journal compaction.
2585            // We can do this by writing more data until outstanding > reclaim_size / 2.
2586            {
2587                let root_store = fs.root_store();
2588                let root_directory =
2589                    Directory::open(&root_store, root_store.root_directory_object_id())
2590                        .await
2591                        .expect("open failed");
2592                let mut i = 0;
2593                loop {
2594                    let mut transaction = fs
2595                        .clone()
2596                        .new_transaction(
2597                            lock_keys![LockKey::object(
2598                                root_store.store_object_id(),
2599                                root_store.root_directory_object_id(),
2600                            )],
2601                            Options::default(),
2602                        )
2603                        .await
2604                        .expect("new_transaction failed");
2605                    root_directory
2606                        .create_child_file(&mut transaction, &format!("trigger{i}"))
2607                        .await
2608                        .expect("create_child_file failed");
2609                    transaction.commit().await.expect("commit failed");
2610
2611                    if fs.journal().inner.lock().compaction_running {
2612                        break;
2613                    }
2614                    TestExecutor::advance_to(fasync::MonotonicInstant::after(
2615                        MonotonicDuration::from_millis(1),
2616                    ))
2617                    .await;
2618                    i += 1;
2619                }
2620            }
2621
2622            // Compaction should now be running. Because of our 1ms poller, it should be yielding.
2623            // It will yield for 16ms at each point.
2624            for _ in 0..10 {
2625                TestExecutor::advance_to(fasync::MonotonicInstant::after(
2626                    MonotonicDuration::from_millis(1),
2627                ))
2628                .await;
2629                assert!(fs.journal().inner.lock().compaction_running);
2630            }
2631
2632            // Stop the normal task.
2633            stop.store(true, Ordering::Relaxed);
2634            TestExecutor::advance_to(fasync::MonotonicInstant::after(
2635                MonotonicDuration::from_millis(1),
2636            ))
2637            .await;
2638
2639            // Compaction should still be running because it hasn't been 4 ms since the normal task
2640            // finished.
2641            assert!(fs.journal().inner.lock().compaction_running);
2642
2643            // For the next 3ms, compaction should still be running.
2644            for _ in 0..3 {
2645                TestExecutor::advance_to(fasync::MonotonicInstant::after(
2646                    MonotonicDuration::from_millis(1),
2647                ))
2648                .await;
2649                assert!(fs.journal().inner.lock().compaction_running);
2650            }
2651
2652            // 1 more ms and compaction should be unblocked.
2653            TestExecutor::advance_to(fasync::MonotonicInstant::after(
2654                MonotonicDuration::from_millis(1),
2655            ))
2656            .await;
2657
2658            // When the executor next stalls, compaction should be done.
2659            let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
2660            assert!(!fs.journal().inner.lock().compaction_running);
2661
2662            fs.close().await.expect("Close failed");
2663        });
2664        assert!(executor.run_until_stalled(&mut fut).is_ready());
2665    }
2666
2667    #[fuchsia::test]
2668    #[cfg(target_os = "fuchsia")]
2669    fn test_low_priority_compaction_deadline() {
2670        let mut executor = TestExecutor::new_with_fake_time();
2671        let mut fut = std::pin::pin!(async {
2672            use std::sync::Arc;
2673            use std::sync::atomic::{AtomicBool, Ordering};
2674
2675            let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
2676            let fs = FxFilesystemBuilder::new()
2677                .journal_options(super::JournalOptions {
2678                    reclaim_size: 65536,
2679                    ..Default::default()
2680                })
2681                .format(true)
2682                .open(device)
2683                .await
2684                .expect("open failed");
2685
2686            let _low = fasync::LowPriorityTask::new();
2687
2688            // Add some data to the tree.
2689            {
2690                let root_store = fs.root_store();
2691                let root_directory =
2692                    Directory::open(&root_store, root_store.root_directory_object_id())
2693                        .await
2694                        .expect("open failed");
2695                for i in 0..10 {
2696                    let mut transaction = fs
2697                        .clone()
2698                        .new_transaction(
2699                            lock_keys![LockKey::object(
2700                                root_store.store_object_id(),
2701                                root_store.root_directory_object_id(),
2702                            )],
2703                            Options::default(),
2704                        )
2705                        .await
2706                        .expect("new_transaction failed");
2707                    root_directory
2708                        .create_child_file(&mut transaction, &format!("test{}", i))
2709                        .await
2710                        .expect("create_child_file failed");
2711                    transaction.commit().await.expect("commit failed");
2712                }
2713            }
2714
2715            // Spawn a task that polls every 3ms.
2716            let stop = Arc::new(AtomicBool::new(false));
2717            let stop_clone = stop.clone();
2718            let _normal_task = fasync::Task::spawn(async move {
2719                while !stop_clone.load(Ordering::Relaxed) {
2720                    fasync::Timer::new(fasync::MonotonicInstant::after(
2721                        MonotonicDuration::from_millis(3),
2722                    ))
2723                    .await;
2724                }
2725            });
2726
2727            // Trigger journal compaction.
2728            {
2729                let root_store = fs.root_store();
2730                let root_directory =
2731                    Directory::open(&root_store, root_store.root_directory_object_id())
2732                        .await
2733                        .expect("open failed");
2734                let mut i = 0;
2735                loop {
2736                    let mut transaction = fs
2737                        .clone()
2738                        .new_transaction(
2739                            lock_keys![LockKey::object(
2740                                root_store.store_object_id(),
2741                                root_store.root_directory_object_id(),
2742                            )],
2743                            Options::default(),
2744                        )
2745                        .await
2746                        .expect("new_transaction failed");
2747                    root_directory
2748                        .create_child_file(&mut transaction, &format!("trigger{i}"))
2749                        .await
2750                        .expect("create_child_file failed");
2751                    transaction.commit().await.expect("commit failed");
2752
2753                    if fs.journal().inner.lock().compaction_running {
2754                        break;
2755                    }
2756                    TestExecutor::advance_to(fasync::MonotonicInstant::after(
2757                        MonotonicDuration::from_millis(1),
2758                    ))
2759                    .await;
2760                    i += 1;
2761                }
2762            }
2763
2764            // Advance time in 20ms increments. Each increment should allow compaction to make progress
2765            // on one item (since MAX_YIELD_DURATION is 16ms).
2766            let mut count = 0;
2767            for _ in 0..1000 {
2768                TestExecutor::advance_to(fasync::MonotonicInstant::after(
2769                    MonotonicDuration::from_millis(20),
2770                ))
2771                .await;
2772                if !fs.journal().inner.lock().compaction_running {
2773                    break;
2774                }
2775                count += 1;
2776            }
2777            assert!(!fs.journal().inner.lock().compaction_running);
2778
2779            // Make sure it took a few iterations to complete.  It's difficult to know what the exact
2780            // number should be.
2781            assert!(count > 200);
2782
2783            stop.store(true, Ordering::Relaxed);
2784            fs.close().await.expect("Close failed");
2785        });
2786        assert!(executor.run_until_stalled(&mut fut).is_ready());
2787    }
2788
2789    #[fuchsia::test]
2790    #[cfg(target_os = "fuchsia")]
2791    fn test_low_priority_compaction_no_yielding_when_full() {
2792        let mut executor = TestExecutor::new_with_fake_time();
2793        let mut fut = std::pin::pin!(async {
2794            use std::sync::Arc;
2795            use std::sync::atomic::{AtomicBool, Ordering};
2796
2797            let reclaim_size = 65536;
2798            let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
2799            let fs = FxFilesystemBuilder::new()
2800                .journal_options(super::JournalOptions { reclaim_size, ..Default::default() })
2801                .format(true)
2802                .open(device)
2803                .await
2804                .expect("open failed");
2805
2806            let _low = fasync::LowPriorityTask::new();
2807
2808            // Add some data to the tree.
2809            {
2810                let root_store = fs.root_store();
2811                let root_directory =
2812                    Directory::open(&root_store, root_store.root_directory_object_id())
2813                        .await
2814                        .expect("open failed");
2815                for i in 0..10 {
2816                    let mut transaction = fs
2817                        .clone()
2818                        .new_transaction(
2819                            lock_keys![LockKey::object(
2820                                root_store.store_object_id(),
2821                                root_store.root_directory_object_id(),
2822                            )],
2823                            Options::default(),
2824                        )
2825                        .await
2826                        .expect("new_transaction failed");
2827                    root_directory
2828                        .create_child_file(&mut transaction, &format!("test{}", i))
2829                        .await
2830                        .expect("create_child_file failed");
2831                    transaction.commit().await.expect("commit failed");
2832                }
2833            }
2834
2835            // Spawn a task that polls every 3ms.
2836            let stop = Arc::new(AtomicBool::new(false));
2837            let stop_clone = stop.clone();
2838            let _normal_task = fasync::Task::spawn(async move {
2839                while !stop_clone.load(Ordering::Relaxed) {
2840                    fasync::Timer::new(fasync::MonotonicInstant::after(
2841                        MonotonicDuration::from_millis(3),
2842                    ))
2843                    .await;
2844                }
2845            });
2846
2847            // Trigger journal compaction, but this time we fill it up to 3/4 full.
2848            {
2849                let root_store = fs.root_store();
2850                let root_directory =
2851                    Directory::open(&root_store, root_store.root_directory_object_id())
2852                        .await
2853                        .expect("open failed");
2854                let mut i = 0;
2855                loop {
2856                    let mut transaction = fs
2857                        .clone()
2858                        .new_transaction(
2859                            lock_keys![LockKey::object(
2860                                root_store.store_object_id(),
2861                                root_store.root_directory_object_id(),
2862                            )],
2863                            Options::default(),
2864                        )
2865                        .await
2866                        .expect("new_transaction failed");
2867                    root_directory
2868                        .create_child_file(&mut transaction, &format!("trigger{i}"))
2869                        .await
2870                        .expect("create_child_file failed");
2871                    transaction.commit().await.expect("commit failed");
2872
2873                    let outstanding = {
2874                        let inner = fs.journal().inner.lock();
2875                        fs.journal().objects.last_end_offset()
2876                            - inner.super_block_header.journal_checkpoint.file_offset
2877                    };
2878                    if outstanding >= reclaim_size * 7 / 8 {
2879                        break;
2880                    }
2881                    // We don't advance time here to try and reach 3/4 before compaction can yield.
2882                    i += 1;
2883                }
2884            }
2885
2886            // Advancing by 4ms should be enough to wake compaction up.
2887            TestExecutor::advance_to(fasync::MonotonicInstant::after(
2888                MonotonicDuration::from_millis(4),
2889            ))
2890            .await;
2891
2892            // When the executor next stalls, compaction should be done.
2893            let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
2894            assert!(!fs.journal().inner.lock().compaction_running);
2895
2896            stop.store(true, Ordering::Relaxed);
2897            fs.close().await.expect("Close failed");
2898        });
2899        assert!(executor.run_until_stalled(&mut fut).is_ready());
2900    }
2901}
2902
#[cfg(fuzz)]
mod fuzz {
    use fuzz::fuzz;

    // Appends the raw fuzzer bytes to the journal writer of a freshly created filesystem, closes
    // it, and then tries to open the same device again.  A failure to reopen is tolerated.
    #[fuzz]
    fn fuzz_journal_bytes(input: Vec<u8>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use std::io::Write;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let fresh_device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let filesystem = FxFilesystem::new_empty(fresh_device).await.expect("new_empty failed");
            {
                // Scope the journal lock so that it is released before the next await.
                let mut journal_inner = filesystem.journal().inner.lock();
                journal_inner.writer.write_all(&input).expect("write failed");
            }
            filesystem.close().await.expect("close failed");

            let device = filesystem.take_device().await;
            device.reopen(false);
            let Ok(reopened) = FxFilesystem::open(device).await else {
                return;
            };
            // `close()` can fail if there were objects to be tombstoned. If the said object is
            // corrupted, there will be an error when we compact the journal.
            let _ = reopened.close().await;
        });
    }

    // Same flow as `fuzz_journal_bytes`, except that the fuzzer supplies structured journal
    // records, which are written one at a time through `write_record`.
    #[fuzz]
    fn fuzz_journal(input: Vec<super::JournalRecord>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let fresh_device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let filesystem = FxFilesystem::new_empty(fresh_device).await.expect("new_empty failed");
            {
                // Scope the journal lock so that it is released before the next await.
                let mut journal_inner = filesystem.journal().inner.lock();
                input.iter().for_each(|record| {
                    // The result of writing an individual record is deliberately discarded.
                    let _ = journal_inner.writer.write_record(record);
                });
            }
            filesystem.close().await.expect("close failed");

            let device = filesystem.take_device().await;
            device.reopen(false);
            let Ok(reopened) = FxFilesystem::open(device).await else {
                return;
            };
            // `close()` can fail if there were objects to be tombstoned. If the said object is
            // corrupted, there will be an error when we compact the journal.
            let _ = reopened.close().await;
        });
    }
}