// fxfs/object_store/journal.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! The journal is implemented as an ever extending file which contains variable length records
6//! that describe mutations to be applied to various objects.  The journal file consists of
7//! blocks, with a checksum at the end of each block, but otherwise it can be considered a
8//! continuous stream.
9//!
10//! The checksum is seeded with the checksum from the previous block.  To free space in the
11//! journal, records are replaced with sparse extents when it is known they are no longer
12//! needed to mount.
13//!
14//! At mount time, the journal is replayed: the mutations are applied into memory.
15//! Eventually, a checksum failure will indicate no more records exist to be replayed,
16//! at which point the mount can continue and the journal will be extended from that point with
17//! further mutations as required.
18
19mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::errors::FxfsError;
27use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
28use crate::log::*;
29use crate::lsm_tree::cache::NullCache;
30use crate::lsm_tree::types::Layer;
31use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
32use crate::object_store::allocator::Allocator;
33use crate::object_store::data_object_handle::OverwriteOptions;
34use crate::object_store::extent_record::{
35    DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
36};
37use crate::object_store::graveyard::Graveyard;
38use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
39use crate::object_store::journal::checksum_list::ChecksumList;
40use crate::object_store::journal::reader::{JournalReader, ReadResult};
41use crate::object_store::journal::super_block::{
42    SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
43};
44use crate::object_store::journal::writer::JournalWriter;
45use crate::object_store::object_manager::ObjectManager;
46use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
47use crate::object_store::transaction::{
48    AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
49    MutationV47, MutationV49, MutationV50, MutationV54, ObjectStoreMutation, Options,
50    TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
51};
52use crate::object_store::{
53    AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
54    NewChildStoreOptions, ObjectStore, ReservedId,
55};
56use crate::range::RangeExt;
57use crate::round::{round_div, round_down};
58use crate::serialized_types::{
59    LATEST_VERSION, Migrate, Version, Versioned, migrate_nodefault, migrate_to_version,
60};
61use anyhow::{Context, Error, anyhow, bail, ensure};
62use event_listener::Event;
63use fprint::TypeFingerprint;
64use fuchsia_inspect::NumericProperty;
65use fuchsia_sync::Mutex;
66use futures::FutureExt as _;
67use futures::future::poll_fn;
68use rustc_hash::FxHashMap as HashMap;
69use serde::{Deserialize, Serialize};
70use static_assertions::const_assert;
71use std::clone::Clone;
72use std::collections::HashSet;
73use std::num::NonZero;
74use std::ops::{Bound, Range};
75use std::sync::atomic::{AtomicBool, Ordering};
76use std::sync::{Arc, OnceLock};
77use std::task::{Poll, Waker};
78use storage_device::Device;
79
/// The journal file is written to in blocks of this size.
pub const BLOCK_SIZE: u64 = 4096;

// The journal file is extended by this amount when necessary.
const CHUNK_SIZE: u64 = 131_072;
// A chunk must be able to hold at least one maximum-sized transaction.
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

/// See the comment for the `reclaim_size` member of Inner.
pub const DEFAULT_RECLAIM_SIZE: u64 = 524_288;

/// Temporary space that should be reserved for the journal.  For example: space that is currently
/// used in the journal file but cannot be deallocated yet because we are flushing.
pub const RESERVED_SPACE: u64 = 1_048_576;

// Whenever the journal is replayed (i.e. the system is unmounted and remounted), we reset the
// journal stream, at which point any half-complete transactions are discarded.  We indicate a
// journal reset by XORing the previous block's checksum with this mask, and using that value as a
// seed for the next journal block.
const RESET_XOR: u64 = 0xffffffffffffffff;
99
/// To keep track of offsets within a journal file, we need both the file offset and the check-sum
/// of the preceding block, since the check-sum of the preceding block is an input to the check-sum
/// of every block.
pub type JournalCheckpoint = JournalCheckpointV32;
104
/// Serialized form (version 32) of a journal checkpoint; see the [`JournalCheckpoint`] alias.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    // Byte offset into the journal file.
    pub file_offset: u64,

    // Starting check-sum for block that contains file_offset i.e. the checksum for the previous
    // block.
    pub checksum: Checksum,

    // If versioned, the version of elements stored in the journal. e.g. JournalRecord version.
    // This can change across reset events so we store it along with the offset and checksum to
    // know which version to deserialize.
    pub version: Version,
}
118
119pub type JournalRecord = JournalRecordV54;
120
/// Journal record wire format, version 54 — the latest (aliased as [`JournalRecord`]).
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV54 {
    /// Indicates no more records in this block.
    EndBlock,
    /// Mutation for a particular object.  object_id here is for the collection i.e. the store or
    /// allocator.
    Mutation { object_id: u64, mutation: MutationV54 },
    /// Commits records in the transaction.
    Commit,
    /// Discard all mutations with offsets greater than or equal to the given offset.
    Discard(u64),
    /// Indicates the device was flushed at the given journal offset.
    /// Note that this really means that at this point in the journal offset, we can be certain that
    /// there's no remaining buffered data in the block device; the buffers and the disk contents are
    /// consistent.
    /// We insert one of these records *after* a flush along with the *next* transaction to go
    /// through.  If that never comes (either due to graceful or hard shutdown), the journal reset
    /// on the next mount will serve the same purpose and count as a flush, although it is necessary
    /// to defensively flush the device before replaying the journal (if possible, i.e. not
    /// read-only) in case the block device connection was reused.
    DidFlushDevice(u64),
    /// Checksums for a data range written by this transaction. A transaction is only valid if these
    /// checksums are right. The range is the device offset the checksums are for.
    ///
    /// A boolean indicates whether this range is being written to for the first time. For overwrite
    /// extents, we only check the checksums for a block if it has been written to for the first
    /// time since the last flush, because otherwise we can't roll it back anyway so it doesn't
    /// matter. For copy-on-write extents, the bool is always true.
    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
}
153
/// Journal record wire format, version 50; migrates forward to [`JournalRecordV54`].
/// See [`JournalRecordV54`] for variant documentation.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV54)]
#[migrate_nodefault]
pub enum JournalRecordV50 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV50 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
166
/// Journal record wire format, version 49; migrates forward to [`JournalRecordV50`].
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
178
/// Journal record wire format, version 47; migrates forward to [`JournalRecordV49`].
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
190
/// Journal record wire format, version 46; migrates forward to [`JournalRecordV47`].
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
202
/// Journal record wire format, version 43; migrates forward to [`JournalRecordV46`].
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
214
/// Journal record wire format, version 42; migrates forward to [`JournalRecordV43`].
/// Relative to V41, this version only added the `first_write` bool to `DataChecksums`, so the
/// mutation payload is still [`MutationV41`].
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
225
/// Journal record wire format, version 41.  `DataChecksums` has no `first_write` flag here, so
/// the migration to V42 is hand-written (see the `From` impl below) rather than derived.
#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
235
236impl From<JournalRecordV41> for JournalRecordV42 {
237    fn from(record: JournalRecordV41) -> Self {
238        match record {
239            JournalRecordV41::EndBlock => Self::EndBlock,
240            JournalRecordV41::Mutation { object_id, mutation } => {
241                Self::Mutation { object_id, mutation: mutation.into() }
242            }
243            JournalRecordV41::Commit => Self::Commit,
244            JournalRecordV41::Discard(offset) => Self::Discard(offset),
245            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
246            JournalRecordV41::DataChecksums(range, sums) => {
247                // At the time of writing the only extents written by real systems are CoW extents
248                // so the new bool is always true.
249                Self::DataChecksums(range, sums, true)
250            }
251        }
252    }
253}
254
/// Journal record wire format, version 40; migrates forward to [`JournalRecordV41`].
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
265
266pub(super) fn journal_handle_options() -> HandleOptions {
267    HandleOptions { skip_journal_checks: true, ..Default::default() }
268}
269
/// The journal records a stream of mutations that are to be applied to other objects.  At mount
/// time, these records can be replayed into memory.  It provides a way to quickly persist changes
/// without having to make a large number of writes; they can be deferred to a later time (e.g.
/// when a sufficient number have been queued).  It also provides support for transactions, the
/// ability to have mutations that are to be applied atomically together.
pub struct Journal {
    objects: Arc<ObjectManager>,
    // Handle to the journal file; set at most once (OnceLock).
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    super_block_manager: SuperBlockManager,
    // Mutable state; see `Inner` for field documentation.
    inner: Mutex<Inner>,
    // Mutex held around journal writes.  NOTE(review): exact scope of what it guards is
    // established by code outside this chunk.
    writer_mutex: Mutex<()>,
    // Async mutex held across sync operations (async so it can be held over await points).
    sync_mutex: futures::lock::Mutex<()>,
    // When true, enables trace logging (see `set_trace`).
    trace: AtomicBool,

    // This event is used when we are waiting for a compaction to free up journal space.
    reclaim_event: Event,
}
287
/// Mutable journal state, guarded by `Journal::inner`.
struct Inner {
    super_block_header: SuperBlockHeader,

    // The offset that we can zero the journal up to now that it is no longer needed.
    zero_offset: Option<u64>,

    // The journal offset that we most recently flushed to the device.
    device_flushed_offset: u64,

    // If true, indicates a DidFlushDevice record is pending.
    needs_did_flush_device: bool,

    // The writer for the journal.
    writer: JournalWriter,

    // Set when a reset is encountered during a read.
    // Used at write pre_commit() time to ensure we write a version first thing after a reset.
    output_reset_version: bool,

    // Waker for the flush task.
    flush_waker: Option<Waker>,

    // Indicates the journal has been terminated.
    terminate: bool,

    // Latched error indicating reason for journal termination if not graceful.
    terminate_reason: Option<Error>,

    // Disable compactions.
    disable_compactions: bool,

    // True if compactions are running.
    compaction_running: bool,

    // Waker for the sync task for when it's waiting for the flush task to finish.
    sync_waker: Option<Waker>,

    // The last offset we flushed to the journal file.
    flushed_offset: u64,

    // The last offset that should be considered valid in the journal file.  Most of the time, this
    // will be the same as `flushed_offset` but at mount time, this could be less and will only be
    // up to the end of the last valid transaction; it won't include transactions that follow that
    // have been discarded.
    valid_to: u64,

    // If, after replaying, we have to discard a number of mutations (because they don't validate),
    // this offset specifies where we need to discard back to.  This is so that when we next replay,
    // we ignore those mutations and continue with new good mutations.
    discard_offset: Option<u64>,

    // In the steady state, the journal should fluctuate between being approximately half of this
    // number and this number.  New super-blocks will be written every time about half of this
    // amount is written to the journal.
    reclaim_size: u64,

    image_builder_mode: Option<SuperBlockInstance>,

    // If true and `needs_barrier`, issue a pre-barrier on the first device write of each journal
    // write (which happens in multiples of `BLOCK_SIZE`). This ensures that all the corresponding
    // data writes make it to disk before the journal gets written to.
    barriers_enabled: bool,

    // If true, indicates that data write requests have been made to the device since the last
    // journal write.
    needs_barrier: bool,

    // True if a compaction is being forced for reasons other than the journal being full.
    forced_compaction: bool,
}
358
359impl Inner {
360    fn terminate(&mut self, reason: Option<Error>) {
361        self.terminate = true;
362
363        if let Some(err) = reason {
364            error!(error:? = err; "Terminating journal");
365            // Log previous error if one was already set, otherwise latch the error.
366            if let Some(prev_err) = self.terminate_reason.as_ref() {
367                error!(error:? = prev_err; "Journal previously terminated");
368            } else {
369                self.terminate_reason = Some(err);
370            }
371        }
372
373        if let Some(waker) = self.flush_waker.take() {
374            waker.wake();
375        }
376        if let Some(waker) = self.sync_waker.take() {
377            waker.wake();
378        }
379    }
380}
381
/// Tuning options for the journal, supplied at construction time (see [`Journal::new`]).
pub struct JournalOptions {
    /// In the steady state, the journal should fluctuate between being approximately half of this
    /// number and this number.  New super-blocks will be written every time about half of this
    /// amount is written to the journal.
    pub reclaim_size: u64,

    /// If true, issue a pre-barrier on the first device write of each journal write (which happens
    /// in multiples of `BLOCK_SIZE`). This ensures that all the corresponding data writes make it
    /// to disk before the journal gets written to.
    pub barriers_enabled: bool,
}
393
394impl Default for JournalOptions {
395    fn default() -> Self {
396        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
397    }
398}
399
/// Result of reading the journal stream during replay (see `read_transactions`).
struct JournaledTransactions {
    // The journaled transactions, in journal order.
    transactions: Vec<JournaledTransaction>,
    // The journal offset up to which the device is known to have been flushed.
    device_flushed_offset: u64,
}
404
/// A single transaction decoded from the journal, split by the store its mutations target.
#[derive(Debug, Default)]
pub struct JournaledTransaction {
    /// Checkpoint at which this transaction starts in the journal.
    pub checkpoint: JournalCheckpoint,
    /// Mutations targeting the root parent store.
    pub root_parent_mutations: Vec<Mutation>,
    /// Mutations targeting the root store.
    pub root_mutations: Vec<Mutation>,
    /// List of (store_object_id, mutation).
    pub non_root_mutations: Vec<(u64, Mutation)>,
    /// Journal offset at which this transaction ends.
    pub end_offset: u64,
    /// Data checksums journaled with this transaction (see `JournalRecord::DataChecksums`).
    pub checksums: Vec<JournaledChecksums>,

    /// Records offset + 1 of the matching begin_flush transaction. The +1 is because we want to
    /// ignore the begin flush transaction; we don't need or want to replay it.
    pub end_flush: Option<(/* store_id: */ u64, /* begin offset: */ u64)>,

    /// The volume which was deleted in this transaction, if any.
    pub volume_deleted: Option</* store_id: */ u64>,
}
422
423impl JournaledTransaction {
424    fn new(checkpoint: JournalCheckpoint) -> Self {
425        Self { checkpoint, ..Default::default() }
426    }
427}
428
429const VOLUME_DELETED: u64 = u64::MAX;
430
/// Data checksums recorded in the journal for a transaction (decoded from
/// `JournalRecord::DataChecksums`).
#[derive(Debug)]
pub struct JournaledChecksums {
    /// Device byte range the checksums cover.
    pub device_range: Range<u64>,
    pub checksums: Checksums,
    /// True if the range was written for the first time since the last flush; always true for
    /// copy-on-write extents.  See `JournalRecordV54::DataChecksums`.
    pub first_write: bool,
}
437
/// Handles for journal-like objects have some additional functionality to manage their extents,
/// since during replay we need to add extents as we find them.
pub trait JournalHandle: ReadObjectHandle {
    /// The end offset of the last extent in the JournalHandle.  Used only for validating extents
    /// (which will be skipped if None is returned).
    /// Note this is equivalent in value to ReadObjectHandle::get_size, when present.
    fn end_offset(&self) -> Option<u64>;
    /// Adds an extent to the current end of the journal stream.  `device_range` is the device
    /// byte range backing the extent.
    /// `added_offset` is the offset into the journal of the transaction which added this extent,
    /// used for discard_extents.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Discards all extents which were added in a transaction at offset >= |discard_offset|.
    fn discard_extents(&mut self, discard_offset: u64);
}
452
// Provide a stub implementation for DataObjectHandle so we can use it in
// Journal::read_transactions.  Manual extent management is a NOP (which is OK since presumably the
// DataObjectHandle already knows where its extents live).
impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
    // Returning None disables extent validation for this handle type.
    fn end_offset(&self) -> Option<u64> {
        None
    }
    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
        // NOP
    }
    fn discard_extents(&mut self, _discard_offset: u64) {
        // NOP
    }
}
467
468#[fxfs_trace::trace]
469impl Journal {
470    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
471        let starting_checksum = rand::random_range(1..u64::MAX);
472        Journal {
473            objects: objects,
474            handle: OnceLock::new(),
475            super_block_manager: SuperBlockManager::new(),
476            inner: Mutex::new(Inner {
477                super_block_header: SuperBlockHeader::default(),
478                zero_offset: None,
479                device_flushed_offset: 0,
480                needs_did_flush_device: false,
481                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
482                output_reset_version: false,
483                flush_waker: None,
484                terminate: false,
485                terminate_reason: None,
486                disable_compactions: false,
487                compaction_running: false,
488                sync_waker: None,
489                flushed_offset: 0,
490                valid_to: 0,
491                discard_offset: None,
492                reclaim_size: options.reclaim_size,
493                image_builder_mode: None,
494                barriers_enabled: options.barriers_enabled,
495                needs_barrier: false,
496                forced_compaction: false,
497            }),
498            writer_mutex: Mutex::new(()),
499            sync_mutex: futures::lock::Mutex::new(()),
500            trace: AtomicBool::new(false),
501            reclaim_event: Event::new(),
502        }
503    }
504
505    pub fn set_trace(&self, trace: bool) {
506        let old_value = self.trace.swap(trace, Ordering::Relaxed);
507        if trace != old_value {
508            info!(trace; "J: trace");
509        }
510    }
511
512    pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
513        self.inner.lock().image_builder_mode = mode;
514        if let Some(instance) = mode {
515            *self.super_block_manager.next_instance.lock() = instance;
516        }
517    }
518
519    pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
520        self.inner.lock().image_builder_mode
521    }
522
523    #[cfg(feature = "migration")]
524    pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
525        ensure!(
526            self.inner.lock().image_builder_mode.is_some(),
527            "Can only set filesystem uuid in image builder mode."
528        );
529        self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
530        Ok(())
531    }
532
    /// Reads the super-blocks from `device`, returning the chosen super-block header and the
    /// root parent store it describes.  Thin delegate to `SuperBlockManager::load`.
    pub(crate) async fn read_superblocks(
        &self,
        device: Arc<dyn Device>,
        block_size: u64,
    ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
        self.super_block_manager.load(device, block_size).await
    }
540
    /// Used during replay to validate a mutation.  This should return false if the mutation is not
    /// valid and should not be applied.  This could be for benign reasons: e.g. the device flushed
    /// data out-of-order, or because of a malicious actor.
    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
        match mutation {
            // Extent records: check the logical range, checksum granularity and device bounds.
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    Item {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(
                                        _,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                                ..
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
                        ..
                    },
                ..
            }) => {
                // The logical range must be non-empty and block-aligned.
                if range.is_empty() || !range.is_aligned(block_size) {
                    return false;
                }
                // Safe: `is_empty()` above guarantees the range is well-formed.
                let len = range.length().unwrap();
                if let ExtentMode::Cow(checksums) = mode {
                    if checksums.len() > 0 {
                        // Each checksum must cover an equal, whole number of blocks.
                        if len % checksums.len() as u64 != 0 {
                            return false;
                        }
                        if (len / checksums.len() as u64) % block_size != 0 {
                            return false;
                        }
                    }
                }
                // The device range must be block-aligned and lie entirely within the device.
                if *device_offset % block_size != 0
                    || *device_offset >= device_size
                    || device_size - *device_offset < len
                {
                    return false;
                }
            }
            Mutation::ObjectStore(_) => {}
            Mutation::EncryptedObjectStore(_) => {}
            // (De)allocations must be non-empty, within the device, and owned by a real object.
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            // The remaining variants carry nothing that can be validated here.
            Mutation::BeginFlush => {}
            Mutation::EndFlush => {}
            Mutation::DeleteVolume => {}
            Mutation::UpdateBorrowed(_) => {}
            Mutation::UpdateMutationsKey(_) => {}
            Mutation::CreateInternalDir(owner_object_id) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
        }
        true
    }
616
617    // Assumes that `mutation` has been validated.
618    fn update_checksum_list(
619        &self,
620        journal_offset: u64,
621        mutation: &Mutation,
622        checksum_list: &mut ChecksumList,
623    ) -> Result<(), Error> {
624        match mutation {
625            Mutation::ObjectStore(_) => {}
626            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
627                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
628            }
629            _ => {}
630        }
631        Ok(())
632    }
633
634    /// Reads the latest super-block, and then replays journaled records.
635    #[trace]
636    pub async fn replay(
637        &self,
638        filesystem: Arc<FxFilesystem>,
639        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
640    ) -> Result<(), Error> {
641        let block_size = filesystem.block_size();
642
643        let (super_block, root_parent) =
644            self.super_block_manager.load(filesystem.device(), block_size).await?;
645
646        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));
647
648        self.objects.set_root_parent_store(root_parent.clone());
649        let allocator =
650            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
651        if let Some(on_new_allocator) = on_new_allocator {
652            on_new_allocator(allocator.clone());
653        }
654        self.objects.set_allocator(allocator.clone());
655        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
656        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
657        {
658            let mut inner = self.inner.lock();
659            inner.super_block_header = super_block.clone();
660        }
661
662        let device = filesystem.device();
663
664        let mut handle;
665        {
666            let root_parent_layer = root_parent.tree().mutable_layer();
667            let mut iter = root_parent_layer
668                .seek(Bound::Included(&ObjectKey::attribute(
669                    super_block.journal_object_id,
670                    DEFAULT_DATA_ATTRIBUTE_ID,
671                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
672                        super_block.journal_checkpoint.file_offset,
673                        BLOCK_SIZE,
674                    ))),
675                )))
676                .await
677                .context("Failed to seek root parent store")?;
678            let start_offset = if let Some(ItemRef {
679                key:
680                    ObjectKey {
681                        data:
682                            ObjectKeyData::Attribute(
683                                DEFAULT_DATA_ATTRIBUTE_ID,
684                                AttributeKey::Extent(ExtentKey { range }),
685                            ),
686                        ..
687                    },
688                ..
689            }) = iter.get()
690            {
691                range.start
692            } else {
693                0
694            };
695            handle = BootstrapObjectHandle::new_with_start_offset(
696                super_block.journal_object_id,
697                device.clone(),
698                start_offset,
699            );
700            while let Some(item) = iter.get() {
701                if !match item.into() {
702                    Some((
703                        object_id,
704                        DEFAULT_DATA_ATTRIBUTE_ID,
705                        ExtentKey { range },
706                        ExtentValue::Some { device_offset, .. },
707                    )) if object_id == super_block.journal_object_id => {
708                        if let Some(end_offset) = handle.end_offset() {
709                            if range.start != end_offset {
710                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
711                                    "Unexpected journal extent {:?}, expected start: {}",
712                                    item, end_offset
713                                )));
714                            }
715                        }
716                        handle.push_extent(
717                            0, // We never discard extents from the root parent store.
718                            *device_offset
719                                ..*device_offset + range.length().context("Invalid extent")?,
720                        );
721                        true
722                    }
723                    _ => false,
724                } {
725                    break;
726                }
727                iter.advance().await.context("Failed to advance root parent store iterator")?;
728            }
729        }
730
731        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
732        let JournaledTransactions { mut transactions, device_flushed_offset } = self
733            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
734            .await
735            .context("Reading transactions for replay")?;
736
737        // Validate all the mutations.
738        let mut checksum_list = ChecksumList::new(device_flushed_offset);
739        let mut valid_to = reader.journal_file_checkpoint().file_offset;
740        let device_size = device.size();
741        'bad_replay: for JournaledTransaction {
742            checkpoint,
743            root_parent_mutations,
744            root_mutations,
745            non_root_mutations,
746            checksums,
747            ..
748        } in &transactions
749        {
750            for JournaledChecksums { device_range, checksums, first_write } in checksums {
751                checksum_list
752                    .push(
753                        checkpoint.file_offset,
754                        device_range.clone(),
755                        checksums.maybe_as_ref().context("Malformed checksums")?,
756                        *first_write,
757                    )
758                    .context("Pushing journal checksum records to checksum list")?;
759            }
760            for mutation in root_parent_mutations
761                .iter()
762                .chain(root_mutations)
763                .chain(non_root_mutations.iter().map(|(_, m)| m))
764            {
765                if !self.validate_mutation(mutation, block_size, device_size) {
766                    info!(mutation:?; "Stopping replay at bad mutation");
767                    valid_to = checkpoint.file_offset;
768                    break 'bad_replay;
769                }
770                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
771            }
772        }
773
774        // Validate the checksums. Note if barriers are enabled, there will be no checksums in
775        // practice to verify.
776        let valid_to = checksum_list
777            .verify(device.as_ref(), valid_to)
778            .await
779            .context("Failed to validate checksums")?;
780
781        // Apply the mutations...
782
783        let mut last_checkpoint = reader.journal_file_checkpoint();
784        let mut journal_offsets = super_block.journal_file_offsets.clone();
785
786        // Start with the root-parent mutations, and also determine the journal offsets for all
787        // other objects.
788        for (
789            index,
790            JournaledTransaction {
791                checkpoint,
792                root_parent_mutations,
793                end_flush,
794                volume_deleted,
795                ..
796            },
797        ) in transactions.iter_mut().enumerate()
798        {
799            if checkpoint.file_offset >= valid_to {
800                last_checkpoint = checkpoint.clone();
801
802                // Truncate the transactions so we don't need to worry about them on the next pass.
803                transactions.truncate(index);
804                break;
805            }
806
807            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
808            for mutation in root_parent_mutations.drain(..) {
809                self.objects
810                    .apply_mutation(
811                        super_block.root_parent_store_object_id,
812                        mutation,
813                        &context,
814                        AssocObj::None,
815                    )
816                    .context("Failed to replay root parent store mutations")?;
817            }
818
819            if let Some((object_id, journal_offset)) = end_flush {
820                journal_offsets.insert(*object_id, *journal_offset);
821            }
822
823            if let Some(object_id) = volume_deleted {
824                journal_offsets.insert(*object_id, VOLUME_DELETED);
825            }
826        }
827
828        // Now we can open the root store.
829        let root_store = ObjectStore::open(
830            &root_parent,
831            super_block.root_store_object_id,
832            Box::new(NullCache {}),
833        )
834        .await
835        .context("Unable to open root store")?;
836
837        ensure!(
838            !root_store.is_encrypted(),
839            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
840        );
841        self.objects.set_root_store(root_store);
842
843        let root_store_offset =
844            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);
845
846        // Now replay the root store mutations.
847        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
848            if checkpoint.file_offset < root_store_offset {
849                continue;
850            }
851
852            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
853            for mutation in root_mutations.drain(..) {
854                self.objects
855                    .apply_mutation(
856                        super_block.root_store_object_id,
857                        mutation,
858                        &context,
859                        AssocObj::None,
860                    )
861                    .context("Failed to replay root store mutations")?;
862            }
863        }
864
865        // Now we can open the allocator.
866        allocator.open().await.context("Failed to open allocator")?;
867
868        // Now replay all other mutations.
869        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
870        {
871            self.objects
872                .replay_mutations(
873                    non_root_mutations,
874                    &journal_offsets,
875                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
876                    end_offset,
877                )
878                .await
879                .context("Failed to replay mutations")?;
880        }
881
882        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;
883
884        let discarded_to =
885            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
886                Some(reader.journal_file_checkpoint().file_offset)
887            } else {
888                None
889            };
890
891        // Configure the journal writer so that we can continue.
892        {
893            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
894                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
895                    "journal replay cut short; journal finishes at {}, but super-block was \
896                     written at {}",
897                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
898                )));
899            }
900            let handle = ObjectStore::open_object(
901                &root_parent,
902                super_block.journal_object_id,
903                journal_handle_options(),
904                None,
905            )
906            .await
907            .with_context(|| {
908                format!(
909                    "Failed to open journal file (object id: {})",
910                    super_block.journal_object_id
911                )
912            })?;
913            let _ = self.handle.set(handle);
914            let mut inner = self.inner.lock();
915            reader.skip_to_end_of_block();
916            let mut writer_checkpoint = reader.journal_file_checkpoint();
917
918            // Make sure we don't accidentally use the reader from now onwards.
919            std::mem::drop(reader);
920
921            // Reset the stream to indicate that we've remounted the journal.
922            writer_checkpoint.checksum ^= RESET_XOR;
923            writer_checkpoint.version = LATEST_VERSION;
924            inner.flushed_offset = writer_checkpoint.file_offset;
925
926            // When we open the filesystem as writable, we flush the device.
927            inner.device_flushed_offset = inner.flushed_offset;
928
929            inner.writer.seek(writer_checkpoint);
930            inner.output_reset_version = true;
931            inner.valid_to = last_checkpoint.file_offset;
932            if last_checkpoint.file_offset < inner.flushed_offset {
933                inner.discard_offset = Some(last_checkpoint.file_offset);
934            }
935        }
936
937        self.objects
938            .on_replay_complete()
939            .await
940            .context("Failed to complete replay for object manager")?;
941
942        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
943        Ok(())
944    }
945
    /// Reads journal records from `reader` and groups them into committed transactions.
    ///
    /// Reading stops at `end_offset` if one is provided, and otherwise at the first checksum
    /// mismatch, which marks the end of the journal stream.  If `object_id_filter` is
    /// INVALID_OBJECT_ID, mutations for all objects are retained; otherwise only mutations for
    /// the given object are retained (all records are still processed for their side effects,
    /// e.g. flush tracking and journal-extent snooping).
    ///
    /// Returns the accumulated transactions along with the journal offset up to which the
    /// device is known to have been flushed.
    async fn read_transactions(
        &self,
        reader: &mut JournalReader,
        end_offset: Option<u64>,
        object_id_filter: u64,
    ) -> Result<JournaledTransactions, Error> {
        let mut transactions = Vec::new();
        // Snapshot the super-block fields we need up-front so we don't hold the inner lock
        // whilst reading.
        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
            let super_block = &self.inner.lock().super_block_header;
            (
                super_block.super_block_journal_file_offset,
                super_block.root_parent_store_object_id,
                super_block.root_store_object_id,
            )
        };
        // The transaction we are currently accumulating records into; it is finalised when we
        // encounter a Commit record.
        let mut current_transaction = None;
        // Maps object ID -> journal offset of the most recent BeginFlush seen for that object.
        let mut begin_flush_offsets = HashMap::default();
        // Object IDs of stores for which a DeleteVolume record has been seen; any further
        // mutation for such a store indicates corruption.
        let mut stores_deleted = HashSet::new();
        loop {
            // Cache the checkpoint before we deserialize a record.
            let checkpoint = reader.journal_file_checkpoint();
            if let Some(end_offset) = end_offset {
                if checkpoint.file_offset >= end_offset {
                    break;
                }
            }
            let result =
                reader.deserialize().await.context("Failed to deserialize journal record")?;
            match result {
                ReadResult::Reset(_) => {
                    // A reset means the journal was remounted at this point; any transaction in
                    // progress was never committed, so drop it.
                    if current_transaction.is_some() {
                        current_transaction = None;
                        transactions.pop();
                    }
                    // Opening the filesystem as writable flushes the device (see replay), so
                    // everything prior to a reset is known to have reached the device.
                    let offset = reader.journal_file_checkpoint().file_offset;
                    if offset > device_flushed_offset {
                        device_flushed_offset = offset;
                    }
                }
                ReadResult::Some(record) => {
                    match record {
                        JournalRecord::EndBlock => {
                            reader.skip_to_end_of_block();
                        }
                        JournalRecord::Mutation { object_id, mutation } => {
                            // Start a new transaction at this checkpoint if we're not already
                            // inside one.
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };

                            if stores_deleted.contains(&object_id) {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Encountered mutations for deleted store")
                                );
                            }

                            match &mutation {
                                Mutation::BeginFlush => {
                                    begin_flush_offsets.insert(
                                        object_id,
                                        current_transaction.checkpoint.file_offset,
                                    );
                                }
                                Mutation::EndFlush => {
                                    // An EndFlush is only meaningful if we also saw the
                                    // matching BeginFlush in the stream.
                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
                                        if let Some(deleted_volume) =
                                            &current_transaction.volume_deleted
                                        {
                                            if *deleted_volume == object_id {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                                ));
                                            }
                                        }
                                        // The +1 is because we don't want to replay the transaction
                                        // containing the begin flush; we don't need or want to
                                        // replay it.
                                        if current_transaction
                                            .end_flush
                                            .replace((object_id, offset + 1))
                                            .is_some()
                                        {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush mutations in a \
                                                 single transaction"
                                            ));
                                        }
                                    }
                                }
                                Mutation::DeleteVolume => {
                                    if let Some((flushed_object, _)) =
                                        &current_transaction.end_flush
                                    {
                                        if *flushed_object == object_id {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                            ));
                                        }
                                    }
                                    if current_transaction
                                        .volume_deleted
                                        .replace(object_id)
                                        .is_some()
                                    {
                                        bail!(anyhow!(FxfsError::Inconsistent).context(
                                            "Multiple DeleteVolume mutations in a single \
                                             transaction"
                                        ));
                                    }
                                    stores_deleted.insert(object_id);
                                }
                                _ => {}
                            }

                            // If this mutation doesn't need to be applied, don't bother adding it
                            // to the transaction.
                            if (object_id_filter == INVALID_OBJECT_ID
                                || object_id_filter == object_id)
                                && self.should_apply(object_id, &current_transaction.checkpoint)
                            {
                                // Bucket the mutation by store, since the root parent store,
                                // root store and remaining stores are replayed in separate
                                // passes.
                                if object_id == root_parent_store_object_id {
                                    current_transaction.root_parent_mutations.push(mutation);
                                } else if object_id == root_store_object_id {
                                    current_transaction.root_mutations.push(mutation);
                                } else {
                                    current_transaction
                                        .non_root_mutations
                                        .push((object_id, mutation));
                                }
                            }
                        }
                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
                            // Checksums can also open a transaction if one isn't in progress.
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };
                            current_transaction.checksums.push(JournaledChecksums {
                                device_range,
                                checksums,
                                first_write,
                            });
                        }
                        JournalRecord::Commit => {
                            if let Some(&mut JournaledTransaction {
                                ref checkpoint,
                                ref root_parent_mutations,
                                ref mut end_offset,
                                ..
                            }) = current_transaction.take()
                            {
                                for mutation in root_parent_mutations {
                                    // Snoop the mutations for any that might apply to the journal
                                    // file so that we can pass them to the reader so that it can
                                    // read the journal file.
                                    if let Mutation::ObjectStore(ObjectStoreMutation {
                                        item:
                                            Item {
                                                key:
                                                    ObjectKey {
                                                        object_id,
                                                        data:
                                                            ObjectKeyData::Attribute(
                                                                DEFAULT_DATA_ATTRIBUTE_ID,
                                                                AttributeKey::Extent(ExtentKey {
                                                                    range,
                                                                }),
                                                            ),
                                                        ..
                                                    },
                                                value:
                                                    ObjectValue::Extent(ExtentValue::Some {
                                                        device_offset,
                                                        ..
                                                    }),
                                                ..
                                            },
                                        ..
                                    }) = mutation
                                    {
                                        // Add the journal extents we find on the way to our
                                        // reader.
                                        let handle = reader.handle();
                                        if *object_id != handle.object_id() {
                                            continue;
                                        }
                                        // Journal extents must be contiguous in file offset;
                                        // each new extent has to start where the last ended.
                                        if let Some(end_offset) = handle.end_offset() {
                                            if range.start != end_offset {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    format!(
                                                        "Unexpected journal extent {:?} -> {}, \
                                                           expected start: {}",
                                                        range, device_offset, end_offset,
                                                    )
                                                ));
                                            }
                                        }
                                        handle.push_extent(
                                            checkpoint.file_offset,
                                            *device_offset
                                                ..*device_offset
                                                    + range.length().context("Invalid extent")?,
                                        );
                                    }
                                }
                                // Record where the committed transaction ends in the stream.
                                *end_offset = reader.journal_file_checkpoint().file_offset;
                            }
                        }
                        JournalRecord::Discard(offset) => {
                            if offset == 0 {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Invalid offset for Discard")
                                );
                            }
                            if let Some(transaction) = current_transaction.as_ref() {
                                if transaction.checkpoint.file_offset < offset {
                                    // Odd, but OK: the in-progress transaction started before
                                    // the discard point, so it is unaffected.
                                    continue;
                                }
                            }
                            // Drop the in-progress transaction plus any committed transactions
                            // that start at or beyond the discard offset.
                            current_transaction = None;
                            while let Some(transaction) = transactions.last() {
                                if transaction.checkpoint.file_offset < offset {
                                    break;
                                }
                                transactions.pop();
                            }
                            reader.handle().discard_extents(offset);
                        }
                        JournalRecord::DidFlushDevice(offset) => {
                            if offset > device_flushed_offset {
                                device_flushed_offset = offset;
                            }
                        }
                    }
                }
                // This is expected when we reach the end of the journal stream.
                ReadResult::ChecksumMismatch => break,
            }
        }

        // Discard any uncommitted transaction.
        if current_transaction.is_some() {
            transactions.pop();
        }

        Ok(JournaledTransactions { transactions, device_flushed_offset })
    }
1205
    /// Creates an empty filesystem with the minimum viable objects (including a root parent and
    /// root store but no further child stores).
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        // The following constants are only used at format time. When mounting, the recorded values
        // in the superblock should be used.  The root parent store does not have a parent, but
        // needs an object ID to be registered with ObjectManager, so it cannot collide (i.e. have
        // the same object ID) with any objects in the root store that use the journal to track
        // mutations.
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        // Capture the current writer position, stamped with the latest on-disk version.
        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        let mut current_generation = 1;
        if filesystem.options().image_builder_mode.is_some() {
            // Note that in non-image_builder_mode we write both superblocks when we format
            // (in FxFilesystemBuilder::open). In image_builder_mode we only write once at the end
            // as part of finalize(), which is why we must make sure the generation we write is
            // newer than any existing generation.

            // Note: This is the *filesystem* block size, not the device block size, which
            // is currently always 4096 (https://fxbug.dev/42063349)
            let block_size = filesystem.block_size();
            match self.read_superblocks(filesystem.device(), block_size).await {
                Ok((super_block, _)) => {
                    log::info!(
                        "Found existing superblock with generation {}. Bumping by 1.",
                        super_block.generation
                    );
                    current_generation = super_block.generation.wrapping_add(1);
                }
                Err(_) => {
                    // TODO(https://fxbug.dev/463757813): It's not unusual to fail to read
                    // superblocks when we're formatting a new filesystem but we should probably
                    // fail the format if we get an IO error.
                }
            }
        }

        // Create the root parent store and register it with the object manager.
        let root_parent = ObjectStore::new_empty(
            None,
            INIT_ROOT_PARENT_STORE_OBJECT_ID,
            filesystem.clone(),
            Box::new(NullCache {}),
        );
        self.objects.set_root_parent_store(root_parent.clone());

        // Create and register the allocator, and set up the metadata reservation.
        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
        self.objects.set_allocator(allocator.clone());
        self.objects.init_metadata_reservation()?;

        let journal_handle;
        let super_block_a_handle;
        let super_block_b_handle;
        let root_store;
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        // The root store is a child of the root parent store.
        root_store = root_parent
            .new_child_store(
                &mut transaction,
                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
                Box::new(NullCache {}),
            )
            .await
            .context("new_child_store")?;
        self.objects.set_root_store(root_store.clone());

        allocator.create(&mut transaction).await?;

        // Create the super-block objects...
        super_block_a_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::A.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
        super_block_a_handle
            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
            .await
            .context("extend super block")?;
        super_block_b_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::B.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
        super_block_b_handle
            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
            .await
            .context("extend super block")?;

        // the journal object...
        journal_handle = ObjectStore::create_object(
            &root_parent,
            &mut transaction,
            journal_handle_options(),
            None,
        )
        .await
        .context("create journal")?;
        // In image_builder_mode the journal is allocated last via allocate_journal() instead.
        if self.inner.lock().image_builder_mode.is_none() {
            let mut file_range = 0..self.chunk_size();
            journal_handle
                .preallocate_range(&mut transaction, &mut file_range)
                .await
                .context("preallocate journal")?;
            // preallocate_range advances the range's start; anything left means it couldn't
            // allocate the full chunk.
            if file_range.start < file_range.end {
                bail!("preallocate_range returned too little space");
            }
        }

        // Write the root store object info.
        root_store.create(&mut transaction).await?;

        // The root parent graveyard.
        root_parent.set_graveyard_directory_object_id(
            Graveyard::create(&mut transaction, &root_parent).await?,
        );

        transaction.commit().await?;

        // Record the freshly minted object IDs in the in-memory super-block header.
        self.inner.lock().super_block_header = SuperBlockHeader::new(
            current_generation,
            root_parent.store_object_id(),
            root_parent.graveyard_directory_object_id(),
            root_store.store_object_id(),
            allocator.object_id(),
            journal_handle.object_id(),
            checkpoint,
            /* earliest_version: */ LATEST_VERSION,
        );

        // Initialize the journal writer.
        let _ = self.handle.set(journal_handle);
        Ok(())
    }
1359
1360    /// Normally we allocate the journal when creating the filesystem.
1361    /// This is used image_builder_mode when journal allocation is done last.
1362    pub async fn allocate_journal(&self) -> Result<(), Error> {
1363        let handle = self.handle.get().unwrap();
1364        let filesystem = handle.store().filesystem();
1365        let mut transaction = filesystem
1366            .clone()
1367            .new_transaction(
1368                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1369                Options { skip_journal_checks: true, ..Default::default() },
1370            )
1371            .await?;
1372        let mut file_range = 0..self.chunk_size();
1373        self.handle
1374            .get()
1375            .unwrap()
1376            .preallocate_range(&mut transaction, &mut file_range)
1377            .await
1378            .context("preallocate journal")?;
1379        if file_range.start < file_range.end {
1380            bail!("preallocate_range returned too little space");
1381        }
1382        transaction.commit().await?;
1383        Ok(())
1384    }
1385
1386    pub async fn init_superblocks(&self) -> Result<(), Error> {
1387        // Overwrite both superblocks.
1388        for _ in 0..2 {
1389            self.write_super_block().await?;
1390        }
1391        Ok(())
1392    }
1393
1394    /// Takes a snapshot of all journaled transactions which affect |object_id| since its last
1395    /// flush.
1396    /// The caller is responsible for locking; it must ensure that the journal is not trimmed during
1397    /// this call.  For example, a Flush lock could be held on the object in question (assuming that
1398    /// object has data to flush and is registered with ObjectManager).
1399    pub async fn read_transactions_for_object(
1400        &self,
1401        object_id: u64,
1402    ) -> Result<Vec<JournaledTransaction>, Error> {
1403        let handle = self.handle.get().expect("No journal handle");
1404        // Reopen the handle since JournalReader needs an owned handle.
1405        let handle = ObjectStore::open_object(
1406            handle.owner(),
1407            handle.object_id(),
1408            journal_handle_options(),
1409            None,
1410        )
1411        .await?;
1412
1413        let checkpoint = match self.objects.journal_checkpoint(object_id) {
1414            Some(checkpoint) => checkpoint,
1415            None => return Ok(vec![]),
1416        };
1417        let mut reader = JournalReader::new(handle, &checkpoint);
1418        // Record the current end offset and only read to there, so we don't accidentally read any
1419        // partially flushed blocks.
1420        let end_offset = self.inner.lock().valid_to;
1421        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1422    }
1423
1424    /// Commits a transaction.  This is not thread safe; the caller must take appropriate locks.
1425    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1426        if transaction.is_empty() {
1427            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1428        }
1429
1430        self.pre_commit(transaction).await?;
1431        Ok(self.write_and_apply_mutations(transaction))
1432    }
1433
    // Before we commit, we might need to extend the journal or write pending records to the
    // journal.  `size` below is Some when the journal file needs to grow by another chunk, and
    // `zero_offset` is Some when previously freed journal space still needs zeroing.
    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
        let handle;

        let (size, zero_offset) = {
            let mut inner = self.inner.lock();

            // If this is the first write after a RESET, we need to output version first.
            if std::mem::take(&mut inner.output_reset_version) {
                LATEST_VERSION.serialize_into(&mut inner.writer)?;
            }

            // Emit any pending Discard record before the transaction's own records.
            if let Some(discard_offset) = inner.discard_offset {
                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
                inner.discard_offset = None;
            }

            // Emit a queued DidFlushDevice record (see `flush_device`, which defers it).
            if inner.needs_did_flush_device {
                let offset = inner.device_flushed_offset;
                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
                inner.needs_did_flush_device = false;
            }

            handle = match self.handle.get() {
                // No journal handle yet, so there is no file to extend or zero.
                None => return Ok(()),
                Some(x) => x,
            };

            let file_offset = inner.writer.journal_file_checkpoint().file_offset;

            // Grow the journal file if the next write would come within a chunk of its current
            // end; `size` holds the old size which doubles as the extension start offset.
            let size = handle.get_size();
            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };

            if size.is_none()
                && inner.zero_offset.is_none()
                && !self.objects.needs_borrow_for_journal(file_offset)
            {
                // Nothing to do, so avoid creating a transaction at all.
                return Ok(());
            }

            (size, inner.zero_offset)
        };

        let mut transaction = handle
            .new_transaction_with_options(Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.objects.metadata_reservation()),
                txn_guard: Some(transaction.txn_guard()),
                ..Default::default()
            })
            .await?;
        if let Some(size) = size {
            // Extend the journal file by one chunk starting at the old end.
            handle
                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
                .await?;
        }
        if let Some(zero_offset) = zero_offset {
            // Zero (sparsify) journal data that is no longer needed for replay.
            handle.zero(&mut transaction, 0..zero_offset).await?;
        }

        // We can't use regular transaction commit, because that can cause re-entrancy issues, so
        // instead we just apply the transaction directly here.
        self.write_and_apply_mutations(&mut transaction);

        let mut inner = self.inner.lock();

        // Make sure the transaction to extend the journal made it to the journal within the old
        // size, since otherwise, it won't be possible to replay.
        if let Some(size) = size {
            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
        }

        // Only clear `zero_offset` if it hasn't changed whilst the lock was released; otherwise
        // the newer value still needs to be processed.
        if inner.zero_offset == zero_offset {
            inner.zero_offset = None;
        }

        Ok(())
    }
1514
1515    // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
1516    // all records should be applied because the object store or allocator might already contain the
1517    // mutation.  After replay, that obviously isn't the case and we want to apply all mutations.
1518    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1519        let super_block_header = &self.inner.lock().super_block_header;
1520        let offset = super_block_header
1521            .journal_file_offsets
1522            .get(&object_id)
1523            .cloned()
1524            .unwrap_or(super_block_header.super_block_journal_file_offset);
1525        journal_file_checkpoint.file_offset >= offset
1526    }
1527
    /// Flushes previous writes to the device and then writes out a new super-block.
    /// Callers must ensure that we do not make concurrent calls.
    async fn write_super_block(&self) -> Result<(), Error> {
        let root_parent_store = self.objects.root_parent_store();

        // We need to flush previous writes to the device since the new super-block we are writing
        // relies on written data being observable, and we also need to lock the root parent store
        // so that no new entries are written to it whilst we are writing the super-block, and for
        // that we use the write lock.
        let old_layers;
        let old_super_block_offset;
        let mut new_super_block_header;
        let checkpoint;
        let borrowed;

        {
            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
            {
                let _write_guard = self.writer_mutex.lock();
                // Pad to a block boundary and snapshot the root parent store's layers whilst no
                // other writer can interleave records.
                (checkpoint, borrowed) = self.pad_to_block()?;
                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
            }
            self.flush_device(checkpoint.file_offset)
                .await
                .context("flush failed when writing superblock")?;
        }

        new_super_block_header = self.inner.lock().super_block_header.clone();

        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;

        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();

        // Bump the generation and point the header at the new checkpoints.  The journal
        // checkpoint recorded is the minimum reported by the object manager (the oldest point
        // required for replay), falling back to the just-padded checkpoint when none is
        // reported.
        new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
        new_super_block_header.journal_file_offsets = journal_file_offsets;
        new_super_block_header.borrowed_metadata_space = borrowed;

        self.super_block_manager
            .save(
                new_super_block_header.clone(),
                self.objects.root_parent_store().filesystem(),
                old_layers,
            )
            .await?;
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = new_super_block_header;
            // Journal data preceding the old super-block's checkpoint is no longer needed for
            // replay; queue it up to be zeroed (see `pre_commit`).
            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
        }

        Ok(())
    }
1583
1584    /// Flushes any buffered journal data to the device.  Note that this does not flush the device
1585    /// unless the flush_device option is set, in which case data should have been persisted to
1586    /// lower layers.  If a precondition is supplied, it is evaluated and the sync will be skipped
1587    /// if it returns false.  This allows callers to check a condition whilst a lock is held.  If a
1588    /// sync is performed, this function returns the checkpoint that was flushed and the amount of
1589    /// borrowed metadata space at the point it was flushed.
1590    pub async fn sync(
1591        &self,
1592        options: SyncOptions<'_>,
1593    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1594        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1595
1596        let (checkpoint, borrowed) = {
1597            if let Some(precondition) = options.precondition {
1598                if !precondition() {
1599                    return Ok(None);
1600                }
1601            }
1602
1603            // This guard is required so that we don't insert an EndBlock record in the middle of a
1604            // transaction.
1605            let _guard = self.writer_mutex.lock();
1606
1607            self.pad_to_block()?
1608        };
1609
1610        if options.flush_device {
1611            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1612        }
1613
1614        Ok(Some((checkpoint, borrowed)))
1615    }
1616
1617    // Returns the checkpoint as it was prior to padding.  This is done because the super block
1618    // needs to record where the last transaction ends and it's the next transaction that pays the
1619    // price of the padding.
1620    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1621        let mut inner = self.inner.lock();
1622        let checkpoint = inner.writer.journal_file_checkpoint();
1623        if checkpoint.file_offset % BLOCK_SIZE != 0 {
1624            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1625            inner.writer.pad_to_block()?;
1626            if let Some(waker) = inner.flush_waker.take() {
1627                waker.wake();
1628            }
1629        }
1630        Ok((checkpoint, self.objects.borrowed_metadata_space()))
1631    }
1632
    // Waits until journal data up to `checkpoint_offset` has been written out, then flushes the
    // device itself if it hasn't already been flushed that far, queueing up a DidFlushDevice
    // record and notifying the allocator afterwards.
    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
        assert!(
            self.inner.lock().image_builder_mode.is_none(),
            "flush_device called in image builder mode"
        );
        // Wait for the flush task to write journal data out to `checkpoint_offset`, failing if
        // the journal gets terminated in the meantime.  The waker is registered whilst the lock
        // is held so a concurrent flush completion cannot be missed.
        debug_assert_not_too_long!(poll_fn(|ctx| {
            let mut inner = self.inner.lock();
            if inner.flushed_offset >= checkpoint_offset {
                Poll::Ready(Ok(()))
            } else if inner.terminate {
                let context = inner
                    .terminate_reason
                    .as_ref()
                    .map(|e| format!("Journal closed with error: {:?}", e))
                    .unwrap_or_else(|| "Journal closed".to_string());
                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
            } else {
                inner.sync_waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }))?;

        // Skip the device flush if a previous call already flushed at least this far.
        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
        if needs_flush {
            let trace = self.trace.load(Ordering::Relaxed);
            if trace {
                info!("J: start flush device");
            }
            self.handle.get().unwrap().flush_device().await?;
            if trace {
                info!("J: end flush device");
            }

            // We need to write a DidFlushDevice record at some point, but if we are in the
            // process of shutting down the filesystem, we want to leave the journal clean to
            // avoid there being log messages complaining about unwritten journal data, so we
            // queue it up so that the next transaction will trigger this record to be written.
            // If we are shutting down, that will never happen but since the DidFlushDevice
            // message is purely advisory (it reduces the number of checksums we have to verify
            // during replay), it doesn't matter if it isn't written.
            {
                let mut inner = self.inner.lock();
                inner.device_flushed_offset = checkpoint_offset;
                inner.needs_did_flush_device = true;
            }

            // Tell the allocator that we flushed the device so that it can now start using
            // space that was deallocated.
            self.objects.allocator().did_flush_device(checkpoint_offset);
            if trace {
                info!("J: did flush device");
            }
        }

        Ok(())
    }
1689
1690    /// Returns a copy of the super-block header.
1691    pub fn super_block_header(&self) -> SuperBlockHeader {
1692        self.inner.lock().super_block_header.clone()
1693    }
1694
    /// Waits for there to be sufficient space in the journal.
    pub async fn check_journal_space(&self) -> Result<(), Error> {
        loop {
            debug_assert_not_too_long!({
                let inner = self.inner.lock();
                if inner.terminate {
                    // If the flush error is set, this will never make progress, since we can't
                    // extend the journal any more.
                    let context = inner
                        .terminate_reason
                        .as_ref()
                        .map(|e| format!("Journal closed with error: {:?}", e))
                        .unwrap_or_else(|| "Journal closed".to_string());
                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
                }
                // There is sufficient space when the outstanding journal data (not yet covered
                // by a super-block) is below the reclaim threshold.
                if self.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset
                    < inner.reclaim_size
                {
                    break Ok(());
                }
                // In image builder mode compactions never run (see `flush_task`), so don't wait.
                if inner.image_builder_mode.is_some() {
                    break Ok(());
                }
                if inner.disable_compactions {
                    break Err(
                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
                    );
                }
                // NB: the listener must be created whilst the lock is held so that a
                // notification between releasing the lock and waiting cannot be missed.
                self.reclaim_event.listen()
            });
        }
    }
1728
    // The size by which the journal file is extended/preallocated at a time (see `pre_commit`
    // and `allocate_journal`).
    fn chunk_size(&self) -> u64 {
        CHUNK_SIZE
    }
1732
    // Writes the transaction's records to the journal and applies its mutations in memory,
    // returning the journal file offset at which the transaction begins.  Everything happens
    // under the writer mutex so records from concurrent transactions cannot interleave.
    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
        let checkpoint_before;
        let checkpoint_after;
        {
            let _guard = self.writer_mutex.lock();
            checkpoint_before = {
                let mut inner = self.inner.lock();
                // A transaction containing data writes requires a barrier on the next journal
                // write (see `flush`).
                if transaction.includes_write() {
                    inner.needs_barrier = true;
                }
                let checkpoint = inner.writer.journal_file_checkpoint();
                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
                    self.objects.write_mutation(
                        *object_id,
                        mutation,
                        Writer(*object_id, &mut inner.writer),
                    );
                }
                checkpoint
            };
            let maybe_mutation =
                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
                    "apply_transaction should not fail in live mode; \
                     filesystem will be in an inconsistent state",
                );
            checkpoint_after = {
                let mut inner = self.inner.lock();
                // Applying the transaction can produce an additional mutation (journaled with
                // object_id 0) which must also be recorded.
                if let Some(mutation) = maybe_mutation {
                    inner
                        .writer
                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
                        .unwrap();
                }
                // Record data checksums taken from the transaction so replay can verify them.
                for (device_range, checksums, first_write) in
                    transaction.take_checksums().into_iter()
                {
                    inner
                        .writer
                        .write_record(&JournalRecord::DataChecksums(
                            device_range,
                            Checksums::fletcher(checksums),
                            first_write,
                        ))
                        .unwrap();
                }
                // The Commit record terminates the transaction in the journal stream.
                inner.writer.write_record(&JournalRecord::Commit).unwrap();

                inner.writer.journal_file_checkpoint()
            };
        }
        self.objects.did_commit_transaction(
            transaction,
            &checkpoint_before,
            checkpoint_after.file_offset,
        );

        // Wake the flush task so the newly written records get flushed to the journal file.
        if let Some(waker) = self.inner.lock().flush_waker.take() {
            waker.wake();
        }

        checkpoint_before.file_offset
    }
1795
    /// This task will flush journal data to the device when there is data that needs flushing, and
    /// trigger compactions when short of journal space.  It will return after the terminate method
    /// has been called, or an error is encountered with either flushing or compaction.
    pub async fn flush_task(self: Arc<Self>) {
        let mut flush_fut = None;
        let mut compact_fut = None;
        let mut flush_error = false;
        poll_fn(|ctx| {
            loop {
                {
                    let mut inner = self.inner.lock();
                    // Kick off a flush if there is flushable data, no flush is already running
                    // and the previous flush did not fail.
                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
                        let flushable = inner.writer.flushable_bytes();
                        if flushable > 0 {
                            flush_fut = Some(Box::pin(self.flush(flushable)));
                        }
                    }
                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
                        return Poll::Ready(());
                    }
                    // The / 2 is here because after compacting, we cannot reclaim the space
                    // until the _next_ time we flush the device since the super-block is not
                    // guaranteed to persist until then.
                    if compact_fut.is_none()
                        && !inner.terminate
                        && !inner.disable_compactions
                        && inner.image_builder_mode.is_none()
                        && self.objects.last_end_offset()
                            - inner.super_block_header.journal_checkpoint.file_offset
                            > inner.reclaim_size / 2
                    {
                        compact_fut = Some(Box::pin(self.compact()));
                        inner.compaction_running = true;
                    }
                    // Register for wake-ups whilst the lock is held so none can be missed.
                    inner.flush_waker = Some(ctx.waker().clone());
                }
                let mut pending = true;
                if let Some(fut) = flush_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        if let Err(e) = result {
                            self.inner.lock().terminate(Some(e.context("Flush error")));
                            self.reclaim_event.notify(usize::MAX);
                            flush_error = true;
                        }
                        flush_fut = None;
                        // Loop again: completing a flush may have unblocked more work.
                        pending = false;
                    }
                }
                if let Some(fut) = compact_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        let mut inner = self.inner.lock();
                        if let Err(e) = result {
                            inner.terminate(Some(e.context("Compaction error")));
                        }
                        compact_fut = None;
                        inner.compaction_running = false;
                        // Wake anything waiting for journal space or for compactions to stop.
                        self.reclaim_event.notify(usize::MAX);
                        pending = false;
                    }
                }
                if pending {
                    return Poll::Pending;
                }
            }
        })
        .await;
    }
1863
    /// Returns a yielder that can be used for compactions.  See [`CompactionYielder`] for how
    /// yielding is decided.
    pub fn get_compaction_yielder(&self) -> CompactionYielder<'_> {
        CompactionYielder::new(self)
    }
1868
1869    async fn flush(&self, amount: usize) -> Result<(), Error> {
1870        let handle = self.handle.get().unwrap();
1871        let mut buf = handle.allocate_buffer(amount).await;
1872        let (offset, len, barrier_on_first_write) = {
1873            let mut inner = self.inner.lock();
1874            let offset = inner.writer.take_flushable(buf.as_mut());
1875            let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1876            // Reset `needs_barrier` before instead of after the overwrite in case a txn commit
1877            // that contains data happens during the overwrite.
1878            inner.needs_barrier = false;
1879            (offset, buf.len() as u64, barrier_on_first_write)
1880        };
1881        self.handle
1882            .get()
1883            .unwrap()
1884            .overwrite(
1885                offset,
1886                buf.as_mut(),
1887                OverwriteOptions { barrier_on_first_write, ..Default::default() },
1888            )
1889            .await?;
1890
1891        let mut inner = self.inner.lock();
1892        if let Some(waker) = inner.sync_waker.take() {
1893            waker.wake();
1894        }
1895        inner.flushed_offset = offset + len;
1896        inner.valid_to = inner.flushed_offset;
1897        Ok(())
1898    }
1899
1900    #[trace]
1901    async fn compact(&self) -> Result<(), Error> {
1902        assert!(
1903            self.inner.lock().image_builder_mode.is_none(),
1904            "compact called in image builder mode"
1905        );
1906        let bytes_before = self.objects.compaction_bytes_written();
1907        let _measure = crate::metrics::DurationMeasureScope::new(
1908            &crate::metrics::lsm_tree_metrics().journal_compaction_time,
1909        );
1910        crate::metrics::lsm_tree_metrics().journal_compactions_total.add(1);
1911        let trace = self.trace.load(Ordering::Relaxed);
1912        debug!("Compaction starting");
1913        if trace {
1914            info!("J: start compaction");
1915        }
1916        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1917        self.inner.lock().super_block_header.earliest_version = earliest_version;
1918        self.write_super_block().await.context("Failed to write superblock")?;
1919        if trace {
1920            info!("J: end compaction");
1921        }
1922        debug!("Compaction finished");
1923        let bytes_after = self.objects.compaction_bytes_written();
1924        crate::metrics::lsm_tree_metrics()
1925            .journal_compaction_bytes_written
1926            .add(bytes_after.saturating_sub(bytes_before));
1927        Ok(())
1928    }
1929
1930    /// This should generally NOT be called externally. It is public to allow use by FIDL service
1931    /// fxfs.Debug.
1932    pub async fn force_compact(&self) -> Result<(), Error> {
1933        self.inner.lock().forced_compaction = true;
1934        scopeguard::defer! { self.inner.lock().forced_compaction = false; }
1935        self.compact().await
1936    }
1937
    /// Disables compactions and waits for any in-flight compaction to finish.
    pub async fn stop_compactions(&self) {
        loop {
            debug_assert_not_too_long!({
                let mut inner = self.inner.lock();
                inner.disable_compactions = true;
                if !inner.compaction_running {
                    return;
                }
                // Create the listener whilst holding the lock so the notification sent when the
                // running compaction completes cannot be missed.
                self.reclaim_event.listen()
            });
        }
    }
1950
    /// Creates a lazy inspect node named `str` under `parent` which will yield statistics for the
    /// journal when queried.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        // Hold only a weak reference so inspect does not keep the journal alive.
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    // Snapshot everything we need in one short lock scope.
                    let (journal_min, journal_max, journal_reclaim_size) = {
                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
                        let inner = this.inner.lock();
                        (
                            round_down(
                                inner.super_block_header.journal_checkpoint.file_offset,
                                BLOCK_SIZE,
                            ),
                            inner.flushed_offset,
                            inner.reclaim_size,
                        )
                    };
                    let root = inspector.root();
                    root.record_uint("journal_min_offset", journal_min);
                    root.record_uint("journal_max_offset", journal_max);
                    root.record_uint("journal_size", journal_max - journal_min);
                    root.record_uint("journal_reclaim_size", journal_reclaim_size);

                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
                    // round_div returns None when the divisor is zero, in which case the
                    // percentage property is simply omitted.
                    if let Some(x) = round_div(
                        100 * (journal_max - journal_min),
                        this.objects.allocator().get_disk_bytes(),
                    ) {
                        root.record_uint("journal_size_to_disk_size_percent", x);
                    }
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
1991
    /// Terminate all journal activity.
    pub fn terminate(&self) {
        // Mark the journal terminated (with no error reason) and wake anything waiting for
        // journal space so it can observe the termination.
        self.inner.lock().terminate(/*reason*/ None);
        self.reclaim_event.notify(usize::MAX);
    }
1997}
1998
1999/// Wrapper to allow records to be written to the journal.
2000pub struct Writer<'a>(u64, &'a mut JournalWriter);
2001
2002impl Writer<'_> {
2003    pub fn write(&mut self, mutation: Mutation) {
2004        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
2005    }
2006}
2007
#[cfg(target_os = "fuchsia")]
mod yielder {
    use super::Journal;
    use crate::lsm_tree::Yielder;
    use fuchsia_async as fasync;

    /// CompactionYielder uses fuchsia-async to yield if other tasks are being polled, which
    /// should be a proxy for how busy the system is.  We can afford to delay compactions for a
    /// small amount of time but not so long that we end up blocking new transactions.
    pub struct CompactionYielder<'a> {
        journal: &'a Journal,
        low_priority_task: Option<fasync::LowPriorityTask>,
    }

    impl<'a> CompactionYielder<'a> {
        pub fn new(journal: &'a Journal) -> Self {
            CompactionYielder { journal, low_priority_task: None }
        }
    }

    impl Yielder for CompactionYielder<'_> {
        async fn yield_now(&mut self) {
            // Wait for the executor to be idle for 4ms, but never longer than 16ms in total.
            // The cap is needed because compaction might have become urgent, in which case
            // continuing to yield could end up blocking new transactions.
            const IDLE_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(4);
            const MAX_YIELD_DURATION: zx::MonotonicDuration =
                zx::MonotonicDuration::from_millis(16);

            {
                let inner = self.journal.inner.lock();
                // Forced compactions (see `force_compact`) never yield.
                if inner.forced_compaction {
                    return;
                }
                let outstanding = self.journal.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset;
                let half_reclaim_size = inner.reclaim_size / 2;
                // Once 3/4 of the reclaim size has accumulated in the journal, stop delaying:
                // yielding further would eventually block new transactions.  checked_sub keeps
                // the comparison overflow-safe.
                match outstanding.checked_sub(half_reclaim_size) {
                    Some(excess) if excess >= half_reclaim_size / 2 => {
                        self.low_priority_task = None;
                        return;
                    }
                    _ => {}
                }
            }

            self.low_priority_task
                .get_or_insert_with(fasync::LowPriorityTask::new)
                .wait_until_idle_for(
                    IDLE_PERIOD,
                    fasync::MonotonicInstant::after(MAX_YIELD_DURATION),
                )
                .await;
        }
    }
}
2067
2068#[cfg(not(target_os = "fuchsia"))]
2069mod yielder {
2070    use super::Journal;
2071    use crate::lsm_tree::Yielder;
2072
2073    #[expect(dead_code)]
2074    pub struct CompactionYielder<'a>(&'a Journal);
2075
2076    impl<'a> CompactionYielder<'a> {
2077        pub fn new(journal: &'a Journal) -> Self {
2078            Self(journal)
2079        }
2080    }
2081
2082    impl Yielder for CompactionYielder<'_> {
2083        async fn yield_now(&mut self) {}
2084    }
2085}
2086
2087pub use yielder::*;
2088
2089#[cfg(test)]
2090mod tests {
2091    use super::SuperBlockInstance;
2092    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
2093    use crate::fsck::fsck;
2094    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
2095    use crate::object_store::directory::Directory;
2096    use crate::object_store::transaction::Options;
2097    use crate::object_store::volume::root_volume;
2098    use crate::object_store::{
2099        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
2100    };
2101    use fuchsia_async as fasync;
2102    use fuchsia_async::MonotonicDuration;
2103    use storage_device::DeviceHolder;
2104    use storage_device::fake_device::FakeDevice;
2105
    // Block size used for the `FakeDevice` in most of the tests below.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2107
2108    #[fuchsia::test]
2109    async fn test_replay() {
2110        const TEST_DATA: &[u8] = b"hello";
2111
2112        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2113
2114        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2115
2116        let object_id = {
2117            let root_store = fs.root_store();
2118            let root_directory =
2119                Directory::open(&root_store, root_store.root_directory_object_id())
2120                    .await
2121                    .expect("open failed");
2122            let mut transaction = fs
2123                .clone()
2124                .new_transaction(
2125                    lock_keys![LockKey::object(
2126                        root_store.store_object_id(),
2127                        root_store.root_directory_object_id(),
2128                    )],
2129                    Options::default(),
2130                )
2131                .await
2132                .expect("new_transaction failed");
2133            let handle = root_directory
2134                .create_child_file(&mut transaction, "test")
2135                .await
2136                .expect("create_child_file failed");
2137
2138            transaction.commit().await.expect("commit failed");
2139            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2140            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2141            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2142            // As this is the first sync, this will actually trigger a new super-block, but normally
2143            // this would not be the case.
2144            fs.sync(SyncOptions::default()).await.expect("sync failed");
2145            handle.object_id()
2146        };
2147
2148        {
2149            fs.close().await.expect("Close failed");
2150            let device = fs.take_device().await;
2151            device.reopen(false);
2152            let fs = FxFilesystem::open(device).await.expect("open failed");
2153            let handle = ObjectStore::open_object(
2154                &fs.root_store(),
2155                object_id,
2156                HandleOptions::default(),
2157                None,
2158            )
2159            .await
2160            .expect("open_object failed");
2161            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2162            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
2163            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2164            fsck(fs.clone()).await.expect("fsck failed");
2165            fs.close().await.expect("Close failed");
2166        }
2167    }
2168
    // Tests journal reset: after an unclean shutdown mid-way through a stream of transactions,
    // replay stops at the incomplete tail, the filesystem remains usable, and data written both
    // before and after the reset survives a subsequent remount.
    #[fuchsia::test]
    async fn test_reset() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));

        let mut object_ids = Vec::new();

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());

            // Create a lot of objects but don't sync at the end. This should leave the filesystem
            // with a half finished transaction that cannot be replayed.
            for i in 0..1000 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let handle = root_directory
                    .create_child_file(&mut transaction, &format!("{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                object_ids.push(handle.object_id());
            }
        }
        fs.close().await.expect("fs close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        {
            let root_store = fs.root_store();
            // Check the first object, which was synced before the unclean shutdown and so must
            // exist after replay.  (The 1000 unsynced objects may or may not have survived.)
            for &object_id in &object_ids[0..1] {
                let handle = ObjectStore::open_object(
                    &root_store,
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .expect("open_object failed");
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }

            // Write one more object and sync.
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test2")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        {
            fsck(fs.clone()).await.expect("fsck failed");

            // Check the first and the last objects (both were synced, so both must exist).
            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
                let handle = ObjectStore::open_object(
                    &fs.root_store(),
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .unwrap_or_else(|e| {
                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
                });
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }
        }
        fs.close().await.expect("close failed");
    }
2313
2314    #[fuchsia::test]
2315    async fn test_discard() {
2316        let device = {
2317            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2318            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2319            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2320
2321            let store = root_volume
2322                .new_volume("test", NewChildStoreOptions::default())
2323                .await
2324                .expect("new_volume failed");
2325            let root_directory = Directory::open(&store, store.root_directory_object_id())
2326                .await
2327                .expect("open failed");
2328
2329            // Create enough data so that another journal extent is used.
2330            let mut i = 0;
2331            loop {
2332                let mut transaction = fs
2333                    .clone()
2334                    .new_transaction(
2335                        lock_keys![LockKey::object(
2336                            store.store_object_id(),
2337                            store.root_directory_object_id()
2338                        )],
2339                        Options::default(),
2340                    )
2341                    .await
2342                    .expect("new_transaction failed");
2343                root_directory
2344                    .create_child_file(&mut transaction, &format!("a {i}"))
2345                    .await
2346                    .expect("create_child_file failed");
2347                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2348                    break;
2349                }
2350                i += 1;
2351            }
2352
2353            // Compact and then disable compactions.
2354            fs.journal().force_compact().await.expect("compact failed");
2355            fs.journal().stop_compactions().await;
2356
2357            // Keep going until we need another journal extent.
2358            let mut i = 0;
2359            loop {
2360                let mut transaction = fs
2361                    .clone()
2362                    .new_transaction(
2363                        lock_keys![LockKey::object(
2364                            store.store_object_id(),
2365                            store.root_directory_object_id()
2366                        )],
2367                        Options::default(),
2368                    )
2369                    .await
2370                    .expect("new_transaction failed");
2371                root_directory
2372                    .create_child_file(&mut transaction, &format!("b {i}"))
2373                    .await
2374                    .expect("create_child_file failed");
2375                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2376                    break;
2377                }
2378                i += 1;
2379            }
2380
2381            // Allow the journal to flush, but we don't want to sync.
2382            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2383            // Because we're not gracefully closing the filesystem, a Discard record will be
2384            // emitted.
2385            fs.device().snapshot().expect("snapshot failed")
2386        };
2387
2388        let fs = FxFilesystem::open(device).await.expect("open failed");
2389
2390        {
2391            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2392
2393            let store =
2394                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2395
2396            let root_directory = Directory::open(&store, store.root_directory_object_id())
2397                .await
2398                .expect("open failed");
2399
2400            // Write one more transaction.
2401            let mut transaction = fs
2402                .clone()
2403                .new_transaction(
2404                    lock_keys![LockKey::object(
2405                        store.store_object_id(),
2406                        store.root_directory_object_id()
2407                    )],
2408                    Options::default(),
2409                )
2410                .await
2411                .expect("new_transaction failed");
2412            root_directory
2413                .create_child_file(&mut transaction, &format!("d"))
2414                .await
2415                .expect("create_child_file failed");
2416            transaction.commit().await.expect("commit failed");
2417        }
2418
2419        fs.close().await.expect("close failed");
2420        let device = fs.take_device().await;
2421        device.reopen(false);
2422
2423        let fs = FxFilesystem::open(device).await.expect("open failed");
2424        fsck(fs.clone()).await.expect("fsck failed");
2425        fs.close().await.expect("close failed");
2426    }
2427
2428    #[fuchsia::test]
2429    async fn test_use_existing_generation() {
2430        let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2431
2432        // First format should be generation 1.
2433        let fs = FxFilesystemBuilder::new()
2434            .format(true)
2435            .image_builder_mode(Some(SuperBlockInstance::A))
2436            .open(device)
2437            .await
2438            .expect("open failed");
2439        fs.enable_allocations();
2440        let generation0 = fs.super_block_header().generation;
2441        assert_eq!(generation0, 1);
2442        fs.close().await.expect("close failed");
2443        let device = fs.take_device().await;
2444        device.reopen(false);
2445
2446        // Format the device normally (again, generation 1).
2447        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2448        let generation1 = fs.super_block_header().generation;
2449        {
2450            let root_volume = crate::object_store::volume::root_volume(fs.clone())
2451                .await
2452                .expect("root_volume failed");
2453            root_volume
2454                .new_volume("test", crate::object_store::NewChildStoreOptions::default())
2455                .await
2456                .expect("new_volume failed");
2457        }
2458        fs.close().await.expect("close failed");
2459        let device = fs.take_device().await;
2460        device.reopen(false);
2461
2462        // Format again with image_builder_mode.
2463        let fs = FxFilesystemBuilder::new()
2464            .format(true)
2465            .image_builder_mode(Some(SuperBlockInstance::A))
2466            .open(device)
2467            .await
2468            .expect("open failed");
2469        fs.enable_allocations();
2470        let generation2 = fs.super_block_header().generation;
2471        assert!(
2472            generation2 > generation1,
2473            "generation2 ({}) should be greater than generation1 ({})",
2474            generation2,
2475            generation1
2476        );
2477        fs.close().await.expect("close failed");
2478    }
2479
2480    #[fuchsia::test]
2481    async fn test_image_builder_mode_generation_bump_512_byte_block() {
2482        let device = DeviceHolder::new(FakeDevice::new(16384, 512));
2483
2484        // Format initial filesystem (generation 1)
2485        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2486        let generation1 = fs.super_block_header().generation;
2487        fs.close().await.expect("close failed");
2488        let device = fs.take_device().await;
2489        device.reopen(false);
2490
2491        // Format again with image_builder_mode (should bump generation)
2492        let fs = FxFilesystemBuilder::new()
2493            .format(true)
2494            .image_builder_mode(Some(SuperBlockInstance::A))
2495            .open(device)
2496            .await
2497            .expect("open failed");
2498
2499        fs.enable_allocations();
2500        let generation2 = fs.super_block_header().generation;
2501        assert!(
2502            generation2 > generation1,
2503            "Expected generation bump, got {} vs {}",
2504            generation2,
2505            generation1
2506        );
2507        fs.close().await.expect("close failed");
2508    }
2509
    // Verifies that journal compaction runs as a low-priority task: while another task polls
    // every 1ms, compaction keeps yielding, and it only completes once the executor has been
    // idle for the idle period.
    #[fuchsia::test(allow_stalls = false)]
    #[cfg(target_os = "fuchsia")]
    async fn test_low_priority_compaction() {
        use fuchsia_async::TestExecutor;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        // Use a small reclaim_size so that compaction triggers quickly.
        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .journal_options(super::JournalOptions { reclaim_size: 65536, ..Default::default() })
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        // NOTE(review): presumably this makes the executor treat low-priority tasks specially
        // for the duration of the test — confirm against fuchsia-async docs.
        let _low = fasync::LowPriorityTask::new();

        // Add some data to the tree.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..100 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        // Spawn a task that polls every 1ms.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
            }
        });

        // Trigger journal compaction.
        // We can do this by writing more data until outstanding > reclaim_size / 2.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                if fs.journal().inner.lock().compaction_running {
                    break;
                }
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                i += 1;
            }
        }

        // Compaction should now be running. Because of our 1ms poller, it should be yielding.
        // It will yield for 16ms at each point.
        for _ in 0..10 {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;
            assert!(fs.journal().inner.lock().compaction_running);
        }

        // Stop the normal task.
        stop.store(true, Ordering::Relaxed);
        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
            1,
        )))
        .await;

        // Compaction should still be running because it hasn't been 4 ms since the normal task
        // finished.
        assert!(fs.journal().inner.lock().compaction_running);

        // For the next 3ms, compaction should still be running.
        for _ in 0..3 {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;
            assert!(fs.journal().inner.lock().compaction_running);
        }

        // 1 more ms and compaction should be unblocked.
        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
            1,
        )))
        .await;

        // When the executor next stalls, compaction should be done.
        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
        assert!(!fs.journal().inner.lock().compaction_running);

        fs.close().await.expect("Close failed");
    }
2646
    // Verifies that a yielding compaction still makes forward progress via its deadline: with a
    // task polling every 3ms the executor never looks idle, but each 20ms time advance lets
    // compaction past one yield point (MAX_YIELD_DURATION is 16ms), so it eventually finishes.
    #[fuchsia::test(allow_stalls = false)]
    #[cfg(target_os = "fuchsia")]
    async fn test_low_priority_compaction_deadline() {
        use fuchsia_async::TestExecutor;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .journal_options(super::JournalOptions { reclaim_size: 65536, ..Default::default() })
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        let _low = fasync::LowPriorityTask::new();

        // Add some data to the tree.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..10 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        // Spawn a task that polls every 3ms.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(3),
                ))
                .await;
            }
        });

        // Trigger journal compaction.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                if fs.journal().inner.lock().compaction_running {
                    break;
                }
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                i += 1;
            }
        }

        // Advance time in 20ms increments. Each increment should allow compaction to make progress
        // on one item (since MAX_YIELD_DURATION is 16ms).
        let mut count = 0;
        for _ in 0..1000 {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(20),
            ))
            .await;
            if !fs.journal().inner.lock().compaction_running {
                break;
            }
            count += 1;
        }
        assert!(!fs.journal().inner.lock().compaction_running);

        // Make sure it took a few iterations to complete.  It's difficult to know what the exact
        // number should be.
        assert!(count > 200);

        stop.store(true, Ordering::Relaxed);
        fs.close().await.expect("Close failed");
    }
2762
    // Verifies that compaction stops yielding when the journal is nearly full: once the
    // outstanding journal data approaches reclaim_size (the test fills past 3/4), compaction
    // runs to completion without waiting for idle periods, so it cannot block new transactions.
    #[fuchsia::test(allow_stalls = false)]
    #[cfg(target_os = "fuchsia")]
    async fn test_low_priority_compaction_no_yielding_when_full() {
        use fuchsia_async::TestExecutor;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let reclaim_size = 65536;
        let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .journal_options(super::JournalOptions { reclaim_size, ..Default::default() })
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        let _low = fasync::LowPriorityTask::new();

        // Add some data to the tree.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..10 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        // Spawn a task that polls every 3ms.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(3),
                ))
                .await;
            }
        });

        // Trigger journal compaction, but this time we fill it up to 3/4 full.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                // Outstanding journal data = journal end offset minus the super-block's
                // journal checkpoint.
                let outstanding = {
                    let inner = fs.journal().inner.lock();
                    fs.journal().objects.last_end_offset()
                        - inner.super_block_header.journal_checkpoint.file_offset
                };
                if outstanding >= reclaim_size * 7 / 8 {
                    break;
                }
                // We don't advance time here to try and reach 3/4 before compaction can yield.
                i += 1;
            }
        }

        // Advancing by 4ms should be enough to wake compaction up.
        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
            4,
        )))
        .await;

        // When the executor next stalls, compaction should be done.
        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
        assert!(!fs.journal().inner.lock().compaction_running);

        stop.store(true, Ordering::Relaxed);
        fs.close().await.expect("Close failed");
    }
2872}
2873
#[cfg(fuzz)]
mod fuzz {
    use crate::filesystem::FxFilesystem;
    use fuchsia_async as fasync;
    use fuzz::fuzz;
    use std::io::Write;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    /// Fuzzes journal replay with arbitrary raw bytes: the fuzz input is
    /// spliced directly into the journal stream of a freshly-created
    /// filesystem, which is then closed and remounted so that replay runs
    /// over the (potentially corrupt) image.
    #[fuzz]
    fn fuzz_journal_bytes(input: Vec<u8>) {
        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let holder = DeviceHolder::new(FakeDevice::new(32768, 512));
            let filesystem = FxFilesystem::new_empty(holder).await.expect("new_empty failed");
            // Inject the raw fuzzer bytes straight into the journal writer.
            filesystem.journal().inner.lock().writer.write_all(&input).expect("write failed");
            filesystem.close().await.expect("close failed");
            let holder = filesystem.take_device().await;
            holder.reopen(false);
            if let Ok(reopened) = FxFilesystem::open(holder).await {
                // `close()` can fail if there were objects to be tombstoned. If the said object is
                // corrupted, there will be an error when we compact the journal.
                let _ = reopened.close().await;
            }
        });
    }

    /// Fuzzes journal replay with structured records: each fuzzer-generated
    /// `JournalRecord` is written through the journal writer (per-record
    /// failures ignored), then the filesystem is remounted to exercise replay.
    #[fuzz]
    fn fuzz_journal(input: Vec<super::JournalRecord>) {
        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let holder = DeviceHolder::new(FakeDevice::new(32768, 512));
            let filesystem = FxFilesystem::new_empty(holder).await.expect("new_empty failed");
            {
                // Hold the journal lock only while streaming the records in.
                let mut inner = filesystem.journal().inner.lock();
                input.iter().for_each(|record| {
                    let _ = inner.writer.write_record(record);
                });
            }
            filesystem.close().await.expect("close failed");
            let holder = filesystem.take_device().await;
            holder.reopen(false);
            if let Ok(reopened) = FxFilesystem::open(holder).await {
                // `close()` can fail if there were objects to be tombstoned. If the said object is
                // corrupted, there will be an error when we compact the journal.
                let _ = reopened.close().await;
            }
        });
    }
}