
fxfs/object_store/journal.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! The journal is implemented as an ever-extending file which contains variable-length records
//! that describe mutations to be applied to various objects.  The journal file consists of
//! blocks, with a checksum at the end of each block, but otherwise it can be considered a
//! continuous stream.
//!
//! The checksum is seeded with the checksum from the previous block.  To free space in the
//! journal, records are replaced with sparse extents when it is known they are no longer
//! needed to mount.
//!
//! At mount time, the journal is replayed: the mutations are applied into memory.
//! Eventually, a checksum failure will indicate no more records exist to be replayed,
//! at which point the mount can continue and the journal will be extended from that point with
//! further mutations as required.
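//!
//! As an illustrative sketch (the writer and reader modules define the exact on-disk format),
//! each block's trailing checksum is seeded by the checksum of the block before it:
//!
//! ```text
//! block N:   [ records ... ][ checksum_N   = checksum(block N contents,   seed = checksum_N-1) ]
//! block N+1: [ records ... ][ checksum_N+1 = checksum(block N+1 contents, seed = checksum_N)   ]
//! ```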

mod bootstrap_handle;
mod checksum_list;
mod reader;
pub mod super_block;
mod writer;

use crate::checksum::{Checksum, Checksums, ChecksumsV38};
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::types::Layer;
use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
use crate::object_store::allocator::Allocator;
use crate::object_store::data_object_handle::OverwriteOptions;
use crate::object_store::extent_record::{
    DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
};
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
use crate::object_store::journal::checksum_list::ChecksumList;
use crate::object_store::journal::reader::{JournalReader, ReadResult};
use crate::object_store::journal::super_block::{
    SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
};
use crate::object_store::journal::writer::JournalWriter;
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
use crate::object_store::transaction::{
    AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
    MutationV47, MutationV49, MutationV50, MutationV54, MutationV55, ObjectStoreMutation, Options,
    TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
};
use crate::object_store::{
    AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
    NewChildStoreOptions, ObjectStore, ReservedId,
};
use crate::range::RangeExt;
use crate::round::{round_div, round_down};
use crate::serialized_types::{
    LATEST_VERSION, Migrate, Version, Versioned, migrate_nodefault, migrate_to_version,
};
use anyhow::{Context, Error, anyhow, bail, ensure};
use event_listener::Event;
use fprint::TypeFingerprint;
use fuchsia_inspect::NumericProperty;
use fuchsia_sync::Mutex;
use futures::FutureExt as _;
use futures::future::poll_fn;
use rustc_hash::FxHashMap as HashMap;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::clone::Clone;
use std::collections::HashSet;
use std::num::NonZero;
use std::ops::{Bound, Range};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use std::task::{Poll, Waker};
use storage_device::Device;

// The journal file is written to in blocks of this size.
pub const BLOCK_SIZE: u64 = 4096;

// The journal file is extended by this amount when necessary.
const CHUNK_SIZE: u64 = 131_072;
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

// See the comment for the `reclaim_size` member of Inner.
pub const DEFAULT_RECLAIM_SIZE: u64 = 524_288;

// Temporary space that should be reserved for the journal.  For example: space that is currently
// used in the journal file but cannot be deallocated yet because we are flushing.
pub const RESERVED_SPACE: u64 = 1_048_576;

// Whenever the journal is replayed (i.e. the system is unmounted and remounted), we reset the
// journal stream, at which point any half-complete transactions are discarded.  We indicate a
// journal reset by XORing the previous block's checksum with this mask, and using that value as a
// seed for the next journal block.
const RESET_XOR: u64 = 0xffffffffffffffff;
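
// As an illustrative sketch (not a helper defined in this module), the seed for the first block
// written after a reset is derived from the previous block's checksum as described above:
//
//     let reset_seed = previous_block_checksum ^ RESET_XOR;
//
// `Journal::replay` below does exactly this when it re-seeds the writer after replay
// (`writer_checkpoint.checksum ^= RESET_XOR`).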

// To keep track of offsets within a journal file, we need both the file offset and the checksum
// of the preceding block, since the checksum of the preceding block is an input to the checksum
// of every block.
pub type JournalCheckpoint = JournalCheckpointV32;

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    pub file_offset: u64,

    // Starting checksum for the block that contains file_offset, i.e. the checksum of the
    // previous block.
    pub checksum: Checksum,

    // If versioned, the version of elements stored in the journal, e.g. the JournalRecord
    // version.  This can change across reset events so we store it along with the offset and
    // checksum to know which version to deserialize.
    pub version: Version,
}
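
// A rough usage sketch: replay resumes from a checkpoint by constructing a reader over the
// journal handle (see `Journal::replay` below):
//
//     let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
//
// The reader then verifies block checksums starting from `checkpoint.checksum` and deserializes
// records using `checkpoint.version`.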

pub type JournalRecord = JournalRecordV55;

#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV55 {
    EndBlock,
    Mutation {
        object_id: u64,
        mutation: MutationV55,
    },
    /// Commits records in the transaction.
    Commit,
    /// Discard all mutations with offsets greater than or equal to the given offset.
    Discard(u64),
    /// Indicates the device was flushed at the given journal offset.
    /// Note that this really means that, at this point in the journal, we can be certain that
    /// there's no remaining buffered data in the block device; the buffers and the disk contents
    /// are consistent.
    /// We insert one of these records *after* a flush, along with the *next* transaction to go
    /// through.  If that never comes (either due to graceful or hard shutdown), the journal reset
    /// on the next mount will serve the same purpose and count as a flush, although it is
    /// necessary to defensively flush the device before replaying the journal (if possible, i.e.
    /// not read-only) in case the block device connection was reused.
    DidFlushDevice(u64),
    /// Checksums for a data range written by this transaction.  A transaction is only valid if
    /// these checksums are correct.  The range is the device range the checksums cover.
    ///
    /// A boolean indicates whether this range is being written to for the first time.  For
    /// overwrite extents, we only check the checksums for a block if it has been written to for
    /// the first time since the last flush, because otherwise we can't roll it back anyway so it
    /// doesn't matter.  For copy-on-write extents, the bool is always true.
    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
}
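
// For orientation, a committed transaction appears in the journal stream roughly as (a sketch,
// not the exact serialized form):
//
//     Mutation { object_id, mutation }         // repeated, one per mutation
//     DataChecksums(device_range, sums, bool)  // zero or more
//     Commit
//
// with `EndBlock` used to pad out the remainder of the current journal block.  This is the shape
// that `Journal::read_transactions` reconstructs during replay.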

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV55)]
#[migrate_nodefault]
pub enum JournalRecordV54 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV54 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV54)]
#[migrate_nodefault]
pub enum JournalRecordV50 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV50 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

impl From<JournalRecordV41> for JournalRecordV42 {
    fn from(record: JournalRecordV41) -> Self {
        match record {
            JournalRecordV41::EndBlock => Self::EndBlock,
            JournalRecordV41::Mutation { object_id, mutation } => {
                Self::Mutation { object_id, mutation: mutation.into() }
            }
            JournalRecordV41::Commit => Self::Commit,
            JournalRecordV41::Discard(offset) => Self::Discard(offset),
            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
            JournalRecordV41::DataChecksums(range, sums) => {
                // At the time of writing the only extents written by real systems are CoW extents
                // so the new bool is always true.
                Self::DataChecksums(range, sums, true)
            }
        }
    }
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

pub(super) fn journal_handle_options() -> HandleOptions {
    HandleOptions { skip_journal_checks: true, ..Default::default() }
}

/// The journal records a stream of mutations that are to be applied to other objects.  At mount
/// time, these records can be replayed into memory.  It provides a way to quickly persist changes
/// without having to make a large number of writes; they can be deferred to a later time (e.g.
/// when a sufficient number have been queued).  It also provides support for transactions, the
/// ability to have mutations that are to be applied atomically together.
pub struct Journal {
    objects: Arc<ObjectManager>,
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    super_block_manager: SuperBlockManager,
    inner: Mutex<Inner>,
    writer_mutex: Mutex<()>,
    sync_mutex: futures::lock::Mutex<()>,
    trace: AtomicBool,

    // This event is used when we are waiting for a compaction to free up journal space.
    reclaim_event: Event,
}
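
// A rough lifecycle sketch (assuming an `Arc<ObjectManager>` and `Arc<FxFilesystem>` have already
// been constructed, as in the filesystem's own mount/format paths):
//
//     let journal = Journal::new(objects.clone(), JournalOptions::default());
//     // Either replay an existing image...
//     journal.replay(filesystem.clone(), None).await?;
//     // ...or format an empty one.
//     journal.init_empty(filesystem.clone()).await?;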

struct Inner {
    super_block_header: SuperBlockHeader,

    // The offset that we can zero the journal up to now that it is no longer needed.
    zero_offset: Option<u64>,

    // The journal offset that we most recently flushed to the device.
    device_flushed_offset: u64,

    // If true, indicates a DidFlushDevice record is pending.
    needs_did_flush_device: bool,

    // The writer for the journal.
    writer: JournalWriter,

    // Set when a reset is encountered during a read.
    // Used at write pre_commit() time to ensure we write a version first thing after a reset.
    output_reset_version: bool,

    // Waker for the flush task.
    flush_waker: Option<Waker>,

    // Indicates the journal has been terminated.
    terminate: bool,

    // Latched error indicating reason for journal termination if not graceful.
    terminate_reason: Option<Error>,

    // Disable compactions.
    disable_compactions: bool,

    // True if compactions are running.
    compaction_running: bool,

    // Waker for the sync task for when it's waiting for the flush task to finish.
    sync_waker: Option<Waker>,

    // The last offset we flushed to the journal file.
    flushed_offset: u64,

    // The last offset that should be considered valid in the journal file.  Most of the time, this
    // will be the same as `flushed_offset` but at mount time, this could be less and will only be
    // up to the end of the last valid transaction; it won't include transactions that follow that
    // have been discarded.
    valid_to: u64,

    // If, after replaying, we have to discard a number of mutations (because they don't validate),
    // this offset specifies where we need to discard back to.  This is so that when we next replay,
    // we ignore those mutations and continue with new good mutations.
    discard_offset: Option<u64>,

    // In the steady state, the journal should fluctuate between being approximately half of this
    // number and this number.  New super-blocks will be written every time about half of this
    // amount is written to the journal.
    reclaim_size: u64,

    image_builder_mode: Option<SuperBlockInstance>,

    // If true and `needs_barrier`, issue a pre-barrier on the first device write of each journal
    // write (which happens in multiples of `BLOCK_SIZE`). This ensures that all the corresponding
    // data writes make it to disk before the journal gets written to.
    barriers_enabled: bool,

    // If true, indicates that data write requests have been made to the device since the last
    // journal write.
    needs_barrier: bool,

    // True if a compaction is being forced for reasons other than the journal being full.
    forced_compaction: bool,
}

impl Inner {
    fn terminate(&mut self, reason: Option<Error>) {
        self.terminate = true;

        if let Some(err) = reason {
            error!(error:? = err; "Terminating journal");
            // Log previous error if one was already set, otherwise latch the error.
            if let Some(prev_err) = self.terminate_reason.as_ref() {
                error!(error:? = prev_err; "Journal previously terminated");
            } else {
                self.terminate_reason = Some(err);
            }
        }

        if let Some(waker) = self.flush_waker.take() {
            waker.wake();
        }
        if let Some(waker) = self.sync_waker.take() {
            waker.wake();
        }
    }
}

pub struct JournalOptions {
    /// In the steady state, the journal should fluctuate between being approximately half of this
    /// number and this number.  New super-blocks will be written every time about half of this
    /// amount is written to the journal.
    pub reclaim_size: u64,

    // If true, issue a pre-barrier on the first device write of each journal write (which happens
    // in multiples of `BLOCK_SIZE`). This ensures that all the corresponding data writes make it
    // to disk before the journal gets written to.
    pub barriers_enabled: bool,
}

impl Default for JournalOptions {
    fn default() -> Self {
        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
    }
}
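
// Usage sketch: callers that want super-blocks written (and hence journal space reclaimed) more
// frequently than the default can shrink `reclaim_size`; the value here is illustrative only:
//
//     let options = JournalOptions { reclaim_size: 256 * 1024, ..Default::default() };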

struct JournaledTransactions {
    transactions: Vec<JournaledTransaction>,
    device_flushed_offset: u64,
}

#[derive(Debug, Default)]
pub struct JournaledTransaction {
    pub checkpoint: JournalCheckpoint,
    pub root_parent_mutations: Vec<Mutation>,
    pub root_mutations: Vec<Mutation>,
    /// List of (store_object_id, mutation).
    pub non_root_mutations: Vec<(u64, Mutation)>,
    pub end_offset: u64,
    pub checksums: Vec<JournaledChecksums>,

    /// Records offset + 1 of the matching begin_flush transaction. The +1 is because we want to
    /// ignore the begin flush transaction; we don't need or want to replay it.
    pub end_flush: Option<(/* store_id: */ u64, /* begin offset: */ u64)>,

    /// The volume which was deleted in this transaction, if any.
    pub volume_deleted: Option</* store_id: */ u64>,
}

impl JournaledTransaction {
    fn new(checkpoint: JournalCheckpoint) -> Self {
        Self { checkpoint, ..Default::default() }
    }
}

const VOLUME_DELETED: u64 = u64::MAX;

#[derive(Debug)]
pub struct JournaledChecksums {
    pub device_range: Range<u64>,
    pub checksums: Checksums,
    pub first_write: bool,
}

/// Handles for journal-like objects have some additional functionality to manage their extents,
/// since during replay we need to add extents as we find them.
pub trait JournalHandle: ReadObjectHandle {
    /// The end offset of the last extent in the JournalHandle.  Used only for validating extents
    /// (which will be skipped if None is returned).
    /// Note this is equivalent in value to ReadObjectHandle::get_size, when present.
    fn end_offset(&self) -> Option<u64>;
    /// Adds an extent to the current end of the journal stream.
    /// `added_offset` is the offset into the journal of the transaction which added this extent,
    /// used for discard_extents.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Discards all extents which were added in a transaction at offset >= |discard_offset|.
    fn discard_extents(&mut self, discard_offset: u64);
}

// Provide a stub implementation for DataObjectHandle so we can use it in
// Journal::read_transactions.  Manual extent management is a NOP (which is OK since presumably the
// DataObjectHandle already knows where its extents live).
impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
    fn end_offset(&self) -> Option<u64> {
        None
    }
    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
        // NOP
    }
    fn discard_extents(&mut self, _discard_offset: u64) {
        // NOP
    }
}

#[fxfs_trace::trace]
impl Journal {
    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
        let starting_checksum = rand::random_range(1..u64::MAX);
        Journal {
            objects,
            handle: OnceLock::new(),
            super_block_manager: SuperBlockManager::new(),
            inner: Mutex::new(Inner {
                super_block_header: SuperBlockHeader::default(),
                zero_offset: None,
                device_flushed_offset: 0,
                needs_did_flush_device: false,
                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
                output_reset_version: false,
                flush_waker: None,
                terminate: false,
                terminate_reason: None,
                disable_compactions: false,
                compaction_running: false,
                sync_waker: None,
                flushed_offset: 0,
                valid_to: 0,
                discard_offset: None,
                reclaim_size: options.reclaim_size,
                image_builder_mode: None,
                barriers_enabled: options.barriers_enabled,
                needs_barrier: false,
                forced_compaction: false,
            }),
            writer_mutex: Mutex::new(()),
            sync_mutex: futures::lock::Mutex::new(()),
            trace: AtomicBool::new(false),
            reclaim_event: Event::new(),
        }
    }

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(trace; "J: trace");
        }
    }

    pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
        self.inner.lock().image_builder_mode = mode;
        if let Some(instance) = mode {
            *self.super_block_manager.next_instance.lock() = instance;
        }
    }

    pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
        self.inner.lock().image_builder_mode
    }

    #[cfg(feature = "migration")]
    pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
        ensure!(
            self.inner.lock().image_builder_mode.is_some(),
            "Can only set filesystem uuid in image builder mode."
        );
        self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
        Ok(())
    }

    pub(crate) async fn read_superblocks(
        &self,
        device: Arc<dyn Device>,
        block_size: u64,
    ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
        self.super_block_manager.load(device, block_size).await
    }

    /// Used during replay to validate a mutation.  This should return false if the mutation is
    /// not valid and should not be applied.  This could be for benign reasons (e.g. the device
    /// flushed data out-of-order) or because of a malicious actor.
    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
        match mutation {
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    Item {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(
                                        _,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                                ..
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
                        ..
                    },
                ..
            }) => {
                if range.is_empty() || !range.is_aligned(block_size) {
                    return false;
                }
                let len = range.length().unwrap();
                if let ExtentMode::Cow(checksums) = mode {
                    if checksums.len() > 0 {
                        if len % checksums.len() as u64 != 0 {
                            return false;
                        }
                        if (len / checksums.len() as u64) % block_size != 0 {
                            return false;
                        }
                    }
                }
                if *device_offset % block_size != 0
                    || *device_offset >= device_size
                    || device_size - *device_offset < len
                {
                    return false;
                }
            }
            Mutation::ObjectStore(_) => {}
            Mutation::EncryptedObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::BeginFlush => {}
            Mutation::EndFlush => {}
            Mutation::DeleteVolume => {}
            Mutation::UpdateBorrowed(_) => {}
            Mutation::UpdateMutationsKey(_) => {}
            Mutation::CreateInternalDir(owner_object_id) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
        }
        true
    }
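
    // A worked illustration of the checks above (hypothetical numbers): with `block_size` = 4096
    // and `device_size` = 1 MiB, an extent mutation with `range` = 0..8192 and `device_offset` =
    // 1_044_480 is rejected because `device_size - device_offset` (4096) is smaller than the
    // extent length (8192), and replay stops at the transaction that contained it.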

    // Assumes that `mutation` has been validated.
    fn update_checksum_list(
        &self,
        journal_offset: u64,
        mutation: &Mutation,
        checksum_list: &mut ChecksumList,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::ObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
            }
            _ => {}
        }
        Ok(())
    }

    /// Reads the latest super-block, and then replays journaled records.
    #[trace]
    pub async fn replay(
        &self,
        filesystem: Arc<FxFilesystem>,
        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    ) -> Result<(), Error> {
        let block_size = filesystem.block_size();

        let (super_block, root_parent) =
            self.super_block_manager.load(filesystem.device(), block_size).await?;

        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

        self.objects.set_root_parent_store(root_parent.clone());
        let allocator =
            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
        if let Some(on_new_allocator) = on_new_allocator {
            on_new_allocator(allocator.clone());
        }
        self.objects.set_allocator(allocator.clone());
        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = super_block.clone();
        }

        let device = filesystem.device();

        let mut handle;
        {
            let root_parent_layer = root_parent.tree().mutable_layer();
            let mut iter = root_parent_layer
                .seek(Bound::Included(&ObjectKey::attribute(
                    super_block.journal_object_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
                        super_block.journal_checkpoint.file_offset,
                        BLOCK_SIZE,
                    ))),
                )))
                .await
                .context("Failed to seek root parent store")?;
            let start_offset = if let Some(ItemRef {
                key:
                    ObjectKey {
                        data:
                            ObjectKeyData::Attribute(
                                DEFAULT_DATA_ATTRIBUTE_ID,
                                AttributeKey::Extent(ExtentKey { range }),
                            ),
                        ..
                    },
                ..
            }) = iter.get()
            {
                range.start
            } else {
                0
            };
            handle = BootstrapObjectHandle::new_with_start_offset(
                super_block.journal_object_id,
                device.clone(),
                start_offset,
            );
            while let Some(item) = iter.get() {
                if !match item.into() {
                    Some((
                        object_id,
                        DEFAULT_DATA_ATTRIBUTE_ID,
                        ExtentKey { range },
                        ExtentValue::Some { device_offset, .. },
                    )) if object_id == super_block.journal_object_id => {
                        if let Some(end_offset) = handle.end_offset() {
                            if range.start != end_offset {
                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                    "Unexpected journal extent {:?}, expected start: {}",
                                    item, end_offset
                                )));
                            }
                        }
                        handle.push_extent(
                            0, // We never discard extents from the root parent store.
                            *device_offset
                                ..*device_offset + range.length().context("Invalid extent")?,
                        );
                        true
                    }
                    _ => false,
                } {
                    break;
                }
                iter.advance().await.context("Failed to advance root parent store iterator")?;
            }
        }

        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
        let JournaledTransactions { mut transactions, device_flushed_offset } = self
            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
            .await
            .context("Reading transactions for replay")?;

        // Validate all the mutations.
        let mut checksum_list = ChecksumList::new(device_flushed_offset);
        let mut valid_to = reader.journal_file_checkpoint().file_offset;
        let device_size = device.size();
        'bad_replay: for JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            root_mutations,
            non_root_mutations,
            checksums,
            ..
        } in &transactions
        {
            for JournaledChecksums { device_range, checksums, first_write } in checksums {
                checksum_list
                    .push(
                        checkpoint.file_offset,
                        device_range.clone(),
                        checksums.maybe_as_ref().context("Malformed checksums")?,
                        *first_write,
                    )
                    .context("Pushing journal checksum records to checksum list")?;
            }
            for mutation in root_parent_mutations
                .iter()
                .chain(root_mutations)
                .chain(non_root_mutations.iter().map(|(_, m)| m))
            {
                if !self.validate_mutation(mutation, block_size, device_size) {
                    info!(mutation:?; "Stopping replay at bad mutation");
                    valid_to = checkpoint.file_offset;
                    break 'bad_replay;
                }
                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
            }
        }

        // Validate the checksums. Note if barriers are enabled, there will be no checksums in
        // practice to verify.
        let valid_to = checksum_list
            .verify(device.as_ref(), valid_to)
            .await
            .context("Failed to validate checksums")?;

        // Apply the mutations...

        let mut last_checkpoint = reader.journal_file_checkpoint();
        let mut journal_offsets = super_block.journal_file_offsets.clone();

        // Start with the root-parent mutations, and also determine the journal offsets for all
        // other objects.
        for (
            index,
            JournaledTransaction {
                checkpoint,
                root_parent_mutations,
                end_flush,
                volume_deleted,
                ..
            },
        ) in transactions.iter_mut().enumerate()
        {
            if checkpoint.file_offset >= valid_to {
                last_checkpoint = checkpoint.clone();

                // Truncate the transactions so we don't need to worry about them on the next pass.
                transactions.truncate(index);
                break;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_parent_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_parent_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root parent store mutations")?;
            }

            if let Some((object_id, journal_offset)) = end_flush {
                journal_offsets.insert(*object_id, *journal_offset);
            }

            if let Some(object_id) = volume_deleted {
                journal_offsets.insert(*object_id, VOLUME_DELETED);
            }
        }

        // Now we can open the root store.
        let root_store = ObjectStore::open(
            &root_parent,
            super_block.root_store_object_id,
            Box::new(NullCache {}),
        )
        .await
        .context("Unable to open root store")?;

        ensure!(
            !root_store.is_encrypted(),
            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
        );
        self.objects.set_root_store(root_store);

        let root_store_offset =
            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

        // Now replay the root store mutations.
        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
            if checkpoint.file_offset < root_store_offset {
                continue;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root store mutations")?;
            }
        }

        // Now we can open the allocator.
        allocator.open().await.context("Failed to open allocator")?;

        // Now replay all other mutations.
        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
        {
            self.objects
                .replay_mutations(
                    non_root_mutations,
                    &journal_offsets,
                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                    end_offset,
                )
                .await
                .context("Failed to replay mutations")?;
        }

        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

        let discarded_to =
            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
                Some(reader.journal_file_checkpoint().file_offset)
            } else {
                None
            };

        // Configure the journal writer so that we can continue.
        {
            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                    "journal replay cut short; journal finishes at {}, but super-block was \
                     written at {}",
                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
                )));
            }
            let handle = ObjectStore::open_object(
                &root_parent,
                super_block.journal_object_id,
                journal_handle_options(),
                None,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to open journal file (object id: {})",
                    super_block.journal_object_id
                )
            })?;
            let _ = self.handle.set(handle);
            let mut inner = self.inner.lock();
            reader.skip_to_end_of_block();
            let mut writer_checkpoint = reader.journal_file_checkpoint();

            // Make sure we don't accidentally use the reader from now onwards.
            std::mem::drop(reader);

            // Reset the stream to indicate that we've remounted the journal.
            writer_checkpoint.checksum ^= RESET_XOR;
            writer_checkpoint.version = LATEST_VERSION;
            inner.flushed_offset = writer_checkpoint.file_offset;

            // When we open the filesystem as writable, we flush the device.
            inner.device_flushed_offset = inner.flushed_offset;

            inner.writer.seek(writer_checkpoint);
            inner.output_reset_version = true;
            inner.valid_to = last_checkpoint.file_offset;
            if last_checkpoint.file_offset < inner.flushed_offset {
                inner.discard_offset = Some(last_checkpoint.file_offset);
            }
        }

        self.objects
            .on_replay_complete()
            .await
            .context("Failed to complete replay for object manager")?;

        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
        Ok(())
    }

    async fn read_transactions(
        &self,
        reader: &mut JournalReader,
        end_offset: Option<u64>,
        object_id_filter: u64,
    ) -> Result<JournaledTransactions, Error> {
        let mut transactions = Vec::new();
        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
            let super_block = &self.inner.lock().super_block_header;
            (
                super_block.super_block_journal_file_offset,
                super_block.root_parent_store_object_id,
                super_block.root_store_object_id,
            )
        };
        let mut current_transaction = None;
        let mut begin_flush_offsets = HashMap::default();
        let mut stores_deleted = HashSet::new();
        loop {
            // Cache the checkpoint before we deserialize a record.
            let checkpoint = reader.journal_file_checkpoint();
            if let Some(end_offset) = end_offset {
                if checkpoint.file_offset >= end_offset {
                    break;
                }
            }
            let result =
                reader.deserialize().await.context("Failed to deserialize journal record")?;
            match result {
                ReadResult::Reset(_) => {
                    if current_transaction.is_some() {
                        current_transaction = None;
                        transactions.pop();
                    }
                    let offset = reader.journal_file_checkpoint().file_offset;
                    if offset > device_flushed_offset {
                        device_flushed_offset = offset;
                    }
                }
                ReadResult::Some(record) => {
                    match record {
                        JournalRecord::EndBlock => {
                            reader.skip_to_end_of_block();
                        }
                        JournalRecord::Mutation { object_id, mutation } => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };

                            if stores_deleted.contains(&object_id) {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Encountered mutations for deleted store")
                                );
                            }

                            match &mutation {
                                Mutation::BeginFlush => {
                                    begin_flush_offsets.insert(
                                        object_id,
                                        current_transaction.checkpoint.file_offset,
                                    );
                                }
                                Mutation::EndFlush => {
                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
                                        if let Some(deleted_volume) =
                                            &current_transaction.volume_deleted
                                        {
                                            if *deleted_volume == object_id {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                                ));
                                            }
                                        }
                                        // The +1 is because we don't want to replay the transaction
                                        // containing the begin flush; we don't need or want to
                                        // replay it.
                                        if current_transaction
                                            .end_flush
                                            .replace((object_id, offset + 1))
                                            .is_some()
                                        {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush mutations in a \
                                                 single transaction"
                                            ));
                                        }
                                    }
                                }
                                Mutation::DeleteVolume => {
                                    if let Some((flushed_object, _)) =
                                        &current_transaction.end_flush
                                    {
                                        if *flushed_object == object_id {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                            ));
                                        }
                                    }
                                    if current_transaction
                                        .volume_deleted
                                        .replace(object_id)
                                        .is_some()
                                    {
                                        bail!(anyhow!(FxfsError::Inconsistent).context(
                                            "Multiple DeleteVolume mutations in a single \
                                             transaction"
                                        ));
                                    }
                                    stores_deleted.insert(object_id);
                                }
                                _ => {}
                            }

                            // If this mutation doesn't need to be applied, don't bother adding it
                            // to the transaction.
                            if (object_id_filter == INVALID_OBJECT_ID
                                || object_id_filter == object_id)
                                && self.should_apply(object_id, &current_transaction.checkpoint)
                            {
                                if object_id == root_parent_store_object_id {
                                    current_transaction.root_parent_mutations.push(mutation);
                                } else if object_id == root_store_object_id {
                                    current_transaction.root_mutations.push(mutation);
                                } else {
                                    current_transaction
                                        .non_root_mutations
                                        .push((object_id, mutation));
                                }
                            }
                        }
                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };
                            current_transaction.checksums.push(JournaledChecksums {
                                device_range,
                                checksums,
                                first_write,
                            });
                        }
                        JournalRecord::Commit => {
                            if let Some(&mut JournaledTransaction {
                                ref checkpoint,
                                ref root_parent_mutations,
                                ref mut end_offset,
                                ..
                            }) = current_transaction.take()
                            {
                                for mutation in root_parent_mutations {
                                    // Snoop the mutations for any that might apply to the journal
                                    // file so that we can pass them to the reader so that it can
                                    // read the journal file.
                                    if let Mutation::ObjectStore(ObjectStoreMutation {
                                        item:
                                            Item {
                                                key:
                                                    ObjectKey {
                                                        object_id,
                                                        data:
                                                            ObjectKeyData::Attribute(
                                                                DEFAULT_DATA_ATTRIBUTE_ID,
                                                                AttributeKey::Extent(ExtentKey {
                                                                    range,
                                                                }),
                                                            ),
                                                        ..
                                                    },
                                                value:
                                                    ObjectValue::Extent(ExtentValue::Some {
                                                        device_offset,
                                                        ..
                                                    }),
                                                ..
                                            },
                                        ..
                                    }) = mutation
                                    {
                                        // Add the journal extents we find on the way to our
                                        // reader.
                                        let handle = reader.handle();
                                        if *object_id != handle.object_id() {
                                            continue;
                                        }
                                        if let Some(end_offset) = handle.end_offset() {
                                            if range.start != end_offset {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    format!(
                                                        "Unexpected journal extent {:?} -> {}, \
                                                           expected start: {}",
                                                        range, device_offset, end_offset,
                                                    )
                                                ));
                                            }
                                        }
                                        handle.push_extent(
                                            checkpoint.file_offset,
                                            *device_offset
                                                ..*device_offset
                                                    + range.length().context("Invalid extent")?,
                                        );
                                    }
                                }
                                *end_offset = reader.journal_file_checkpoint().file_offset;
                            }
                        }
                        JournalRecord::Discard(offset) => {
                            if offset == 0 {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Invalid offset for Discard")
                                );
                            }
                            if let Some(transaction) = current_transaction.as_ref() {
                                if transaction.checkpoint.file_offset < offset {
                                    // Odd, but OK.
                                    continue;
                                }
                            }
                            current_transaction = None;
                            while let Some(transaction) = transactions.last() {
                                if transaction.checkpoint.file_offset < offset {
                                    break;
                                }
                                transactions.pop();
                            }
                            reader.handle().discard_extents(offset);
                        }
                        JournalRecord::DidFlushDevice(offset) => {
                            if offset > device_flushed_offset {
                                device_flushed_offset = offset;
                            }
                        }
                    }
                }
                // This is expected when we reach the end of the journal stream.
                ReadResult::ChecksumMismatch => break,
            }
        }

        // Discard any uncommitted transaction.
        if current_transaction.is_some() {
            transactions.pop();
        }

        Ok(JournaledTransactions { transactions, device_flushed_offset })
    }

    /// Creates an empty filesystem with the minimum viable objects (including a root parent and
    /// root store but no further child stores).
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        // The following constants are only used at format time. When mounting, the recorded values
        // in the superblock should be used.  The root parent store does not have a parent, but
        // needs an object ID to be registered with ObjectManager, so it cannot collide (i.e. have
        // the same object ID) with any objects in the root store that use the journal to track
        // mutations.
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        let mut current_generation = 1;
        if filesystem.options().image_builder_mode.is_some() {
            // Note that in non-image_builder_mode we write both superblocks when we format
            // (in FxFilesystemBuilder::open). In image_builder_mode we only write once at the end
            // as part of finalize(), which is why we must make sure the generation we write is
            // newer than any existing generation.
1244
1245            // Note: This is the *filesystem* block size, not the device block size, which
1246            // is currently always 4096 (https://fxbug.dev/42063349).
1247            let block_size = filesystem.block_size();
1248            match self.read_superblocks(filesystem.device(), block_size).await {
1249                Ok((super_block, _)) => {
1250                    log::info!(
1251                        "Found existing superblock with generation {}. Bumping by 1.",
1252                        super_block.generation
1253                    );
1254                    current_generation = super_block.generation.wrapping_add(1);
1255                }
1256                Err(_) => {
1257                    // TODO(https://fxbug.dev/463757813): It's not unusual to fail to read
1258                    // superblocks when we're formatting a new filesystem, but we should probably
1259                    // fail the format if we get an IO error.
1260                }
1261            }
1262        }
1263
1264        let root_parent = ObjectStore::new_empty(
1265            None,
1266            INIT_ROOT_PARENT_STORE_OBJECT_ID,
1267            filesystem.clone(),
1268            Box::new(NullCache {}),
1269        );
1270        self.objects.set_root_parent_store(root_parent.clone());
1271
1272        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
1273        self.objects.set_allocator(allocator.clone());
1274        self.objects.init_metadata_reservation()?;
1275
1276        let journal_handle;
1277        let super_block_a_handle;
1278        let super_block_b_handle;
1279        let root_store;
1280        let mut transaction = filesystem
1281            .clone()
1282            .new_transaction(
1283                lock_keys![],
1284                Options { skip_journal_checks: true, ..Default::default() },
1285            )
1286            .await?;
1287        root_store = root_parent
1288            .new_child_store(
1289                &mut transaction,
1290                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
1291                Box::new(NullCache {}),
1292            )
1293            .await
1294            .context("new_child_store")?;
1295        self.objects.set_root_store(root_store.clone());
1296
1297        allocator.create(&mut transaction).await?;
1298
1299        // Create the super-block objects...
1300        super_block_a_handle = ObjectStore::create_object_with_id(
1301            &root_store,
1302            &mut transaction,
1303            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::A.object_id()).unwrap()),
1304            HandleOptions::default(),
1305            None,
1306        )
1307        .context("create super block")?;
1308        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
1309        super_block_a_handle
1310            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
1311            .await
1312            .context("extend super block")?;
1313        super_block_b_handle = ObjectStore::create_object_with_id(
1314            &root_store,
1315            &mut transaction,
1316            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::B.object_id()).unwrap()),
1317            HandleOptions::default(),
1318            None,
1319        )
1320        .context("create super block")?;
1321        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
1322        super_block_b_handle
1323            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
1324            .await
1325            .context("extend super block")?;
1326
1327        // the journal object...
1328        journal_handle = ObjectStore::create_object(
1329            &root_parent,
1330            &mut transaction,
1331            journal_handle_options(),
1332            None,
1333        )
1334        .await
1335        .context("create journal")?;
1336        if self.inner.lock().image_builder_mode.is_none() {
1337            let mut file_range = 0..self.chunk_size();
1338            journal_handle
1339                .preallocate_range(&mut transaction, &mut file_range)
1340                .await
1341                .context("preallocate journal")?;
1342            if file_range.start < file_range.end {
1343                bail!("preallocate_range returned too little space");
1344            }
1345        }
1346
1347        // Write the root store object info.
1348        root_store.create(&mut transaction).await?;
1349
1350        // The root parent graveyard.
1351        root_parent.set_graveyard_directory_object_id(
1352            Graveyard::create(&mut transaction, &root_parent).await?,
1353        );
1354
1355        transaction.commit().await?;
1356
1357        self.inner.lock().super_block_header = SuperBlockHeader::new(
1358            current_generation,
1359            root_parent.store_object_id(),
1360            root_parent.graveyard_directory_object_id(),
1361            root_store.store_object_id(),
1362            allocator.object_id(),
1363            journal_handle.object_id(),
1364            checkpoint,
1365            /* earliest_version: */ LATEST_VERSION,
1366        );
1367
1368        // Initialize the journal writer.
1369        let _ = self.handle.set(journal_handle);
1370        Ok(())
1371    }
1372
1373    /// Normally we allocate the journal when creating the filesystem.
1374    /// This is used in image_builder_mode, where journal allocation is done last.
1375    pub async fn allocate_journal(&self) -> Result<(), Error> {
1376        let handle = self.handle.get().unwrap();
1377        let filesystem = handle.store().filesystem();
1378        let mut transaction = filesystem
1379            .clone()
1380            .new_transaction(
1381                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1382                Options { skip_journal_checks: true, ..Default::default() },
1383            )
1384            .await?;
1385        let mut file_range = 0..self.chunk_size();
1386        self.handle
1387            .get()
1388            .unwrap()
1389            .preallocate_range(&mut transaction, &mut file_range)
1390            .await
1391            .context("preallocate journal")?;
1392        if file_range.start < file_range.end {
1393            bail!("preallocate_range returned too little space");
1394        }
1395        transaction.commit().await?;
1396        Ok(())
1397    }
1398
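    /// Writes out the super-block twice so that both copies get written; used when formatting
    /// the filesystem.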
1399    pub async fn init_superblocks(&self) -> Result<(), Error> {
1400        // Overwrite both superblocks.
1401        for _ in 0..2 {
1402            self.write_super_block().await?;
1403        }
1404        Ok(())
1405    }
1406
1407    /// Takes a snapshot of all journaled transactions which affect |object_id| since its last
1408    /// flush.
1409    /// The caller is responsible for locking; it must ensure that the journal is not trimmed during
1410    /// this call.  For example, a Flush lock could be held on the object in question (assuming that
1411    /// object has data to flush and is registered with ObjectManager).
1412    pub async fn read_transactions_for_object(
1413        &self,
1414        object_id: u64,
1415    ) -> Result<Vec<JournaledTransaction>, Error> {
1416        let handle = self.handle.get().expect("No journal handle");
1417        // Reopen the handle since JournalReader needs an owned handle.
1418        let handle = ObjectStore::open_object(
1419            handle.owner(),
1420            handle.object_id(),
1421            journal_handle_options(),
1422            None,
1423        )
1424        .await?;
1425
1426        let checkpoint = match self.objects.journal_checkpoint(object_id) {
1427            Some(checkpoint) => checkpoint,
1428            None => return Ok(vec![]),
1429        };
1430        let mut reader = JournalReader::new(handle, &checkpoint);
1431        // Record the current end offset and only read to there, so we don't accidentally read any
1432        // partially flushed blocks.
1433        let end_offset = self.inner.lock().valid_to;
1434        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1435    }
1436
1437    /// Commits a transaction.  This is not thread safe; the caller must take appropriate locks.
1438    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1439        if transaction.is_empty() {
1440            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1441        }
1442
1443        self.pre_commit(transaction).await?;
1444        Ok(self.write_and_apply_mutations(transaction))
1445    }
1446
1447    // Before we commit, we might need to extend the journal or write pending records to the
1448    // journal.
1449    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
1450        let handle;
1451
1452        let (size, zero_offset) = {
1453            let mut inner = self.inner.lock();
1454
1455            // If this is the first write after a RESET, we need to output version first.
1456            if std::mem::take(&mut inner.output_reset_version) {
1457                LATEST_VERSION.serialize_into(&mut inner.writer)?;
1458            }
1459
1460            if let Some(discard_offset) = inner.discard_offset {
1461                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
1462                inner.discard_offset = None;
1463            }
1464
1465            if inner.needs_did_flush_device {
1466                let offset = inner.device_flushed_offset;
1467                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
1468                inner.needs_did_flush_device = false;
1469            }
1470
1471            handle = match self.handle.get() {
1472                None => return Ok(()),
1473                Some(x) => x,
1474            };
1475
1476            let file_offset = inner.writer.journal_file_checkpoint().file_offset;
1477
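            // If the next chunk would extend beyond the current end of the journal file, remember
            // the current size so that more space is preallocated below.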
1478            let size = handle.get_size();
1479            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };
1480
1481            if size.is_none()
1482                && inner.zero_offset.is_none()
1483                && !self.objects.needs_borrow_for_journal(file_offset)
1484            {
1485                return Ok(());
1486            }
1487
1488            (size, inner.zero_offset)
1489        };
1490
1491        let mut transaction = handle
1492            .new_transaction_with_options(Options {
1493                skip_journal_checks: true,
1494                borrow_metadata_space: true,
1495                allocator_reservation: Some(self.objects.metadata_reservation()),
1496                txn_guard: Some(transaction.txn_guard()),
1497                ..Default::default()
1498            })
1499            .await?;
1500        if let Some(size) = size {
1501            handle
1502                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
1503                .await?;
1504        }
1505        if let Some(zero_offset) = zero_offset {
1506            handle.zero(&mut transaction, 0..zero_offset).await?;
1507        }
1508
1509        // We can't use regular transaction commit, because that can cause re-entrancy issues, so
1510        // instead we just apply the transaction directly here.
1511        self.write_and_apply_mutations(&mut transaction);
1512
1513        let mut inner = self.inner.lock();
1514
1515        // Make sure the transaction to extend the journal made it to the journal within the old
1516        // size, since otherwise, it won't be possible to replay.
1517        if let Some(size) = size {
1518            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
1519        }
1520
1521        if inner.zero_offset == zero_offset {
1522            inner.zero_offset = None;
1523        }
1524
1525        Ok(())
1526    }
1527
1528    // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
1529    // all records should be applied because the object store or allocator might already contain the
1530    // mutation.  After replay, that obviously isn't the case and we want to apply all mutations.
1531    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1532        let super_block_header = &self.inner.lock().super_block_header;
1533        let offset = super_block_header
1534            .journal_file_offsets
1535            .get(&object_id)
1536            .cloned()
1537            .unwrap_or(super_block_header.super_block_journal_file_offset);
1538        journal_file_checkpoint.file_offset >= offset
1539    }
1540
1541    /// Flushes previous writes to the device and then writes out a new super-block.
1542    /// Callers must ensure that this is not called concurrently.
1543    async fn write_super_block(&self) -> Result<(), Error> {
1544        let root_parent_store = self.objects.root_parent_store();
1545
1546        // We need to flush previous writes to the device since the new super-block we are writing
1547        // relies on written data being observable, and we also need to lock the root parent store
1548        // so that no new entries are written to it whilst we are writing the super-block, and for
1549        // that we use the write lock.
1550        let old_layers;
1551        let old_super_block_offset;
1552        let mut new_super_block_header;
1553        let checkpoint;
1554        let borrowed;
1555
1556        {
1557            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1558            {
1559                let _write_guard = self.writer_mutex.lock();
1560                (checkpoint, borrowed) = self.pad_to_block()?;
1561                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
1562            }
1563            self.flush_device(checkpoint.file_offset)
1564                .await
1565                .context("flush failed when writing superblock")?;
1566        }
1567
1568        new_super_block_header = self.inner.lock().super_block_header.clone();
1569
1570        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;
1571
1572        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();
1573
1574        new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
1575        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
1576        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
1577        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
1578        new_super_block_header.journal_file_offsets = journal_file_offsets;
1579        new_super_block_header.borrowed_metadata_space = borrowed;
1580
1581        self.super_block_manager
1582            .save(
1583                new_super_block_header.clone(),
1584                self.objects.root_parent_store().filesystem(),
1585                old_layers,
1586            )
1587            .await?;
1588        {
1589            let mut inner = self.inner.lock();
1590            inner.super_block_header = new_super_block_header;
1591            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
1592        }
1593
1594        Ok(())
1595    }
1596
1597    /// Flushes any buffered journal data to the device.  Note that this does not flush the device
1598    /// unless the flush_device option is set, in which case data should have been persisted to
1599    /// lower layers.  If a precondition is supplied, it is evaluated and the sync will be skipped
1600    /// if it returns false.  This allows callers to check a condition whilst a lock is held.  If a
1601    /// sync is performed, this function returns the checkpoint that was flushed and the amount of
1602    /// borrowed metadata space at the point it was flushed.
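    ///
    /// A minimal sketch of a typical call (assuming `journal` is a reference to this `Journal`):
    /// ```ignore
    /// // Flush buffered journal data and then flush the device itself.
    /// journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await?;
    /// ```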
1603    pub async fn sync(
1604        &self,
1605        options: SyncOptions<'_>,
1606    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1607        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1608
1609        let (checkpoint, borrowed) = {
1610            if let Some(precondition) = options.precondition {
1611                if !precondition() {
1612                    return Ok(None);
1613                }
1614            }
1615
1616            // This guard is required so that we don't insert an EndBlock record in the middle of a
1617            // transaction.
1618            let _guard = self.writer_mutex.lock();
1619
1620            self.pad_to_block()?
1621        };
1622
1623        if options.flush_device {
1624            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1625        }
1626
1627        Ok(Some((checkpoint, borrowed)))
1628    }
1629
1630    // Returns the checkpoint as it was prior to padding.  This is done because the super block
1631    // needs to record where the last transaction ends and it's the next transaction that pays the
1632    // price of the padding.
1633    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1634        let mut inner = self.inner.lock();
1635        let checkpoint = inner.writer.journal_file_checkpoint();
1636        if checkpoint.file_offset % BLOCK_SIZE != 0 {
1637            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1638            inner.writer.pad_to_block()?;
1639            if let Some(waker) = inner.flush_waker.take() {
1640                waker.wake();
1641            }
1642        }
1643        Ok((checkpoint, self.objects.borrowed_metadata_space()))
1644    }
1645
1646    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
1647        assert!(
1648            self.inner.lock().image_builder_mode.is_none(),
1649            "flush_device called in image builder mode"
1650        );
1651        debug_assert_not_too_long!(poll_fn(|ctx| {
1652            let mut inner = self.inner.lock();
1653            if inner.flushed_offset >= checkpoint_offset {
1654                Poll::Ready(Ok(()))
1655            } else if inner.terminate {
1656                let context = inner
1657                    .terminate_reason
1658                    .as_ref()
1659                    .map(|e| format!("Journal closed with error: {:?}", e))
1660                    .unwrap_or_else(|| "Journal closed".to_string());
1661                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
1662            } else {
1663                inner.sync_waker = Some(ctx.waker().clone());
1664                Poll::Pending
1665            }
1666        }))?;
1667
1668        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
1669        if needs_flush {
1670            let trace = self.trace.load(Ordering::Relaxed);
1671            if trace {
1672                info!("J: start flush device");
1673            }
1674            self.handle.get().unwrap().flush_device().await?;
1675            if trace {
1676                info!("J: end flush device");
1677            }
1678
1679            // We need to write a DidFlushDevice record at some point, but if we are in the
1680            // process of shutting down the filesystem, we want to leave the journal clean to
1681            // avoid there being log messages complaining about unwritten journal data, so we
1682            // queue it up so that the next transaction will trigger this record to be written.
1683            // If we are shutting down, that will never happen but since the DidFlushDevice
1684            // message is purely advisory (it reduces the number of checksums we have to verify
1685            // during replay), it doesn't matter if it isn't written.
1686            {
1687                let mut inner = self.inner.lock();
1688                inner.device_flushed_offset = checkpoint_offset;
1689                inner.needs_did_flush_device = true;
1690            }
1691
1692            // Tell the allocator that we flushed the device so that it can now start using
1693            // space that was deallocated.
1694            self.objects.allocator().did_flush_device(checkpoint_offset);
1695            if trace {
1696                info!("J: did flush device");
1697            }
1698        }
1699
1700        Ok(())
1701    }
1702
1703    /// Returns a copy of the super-block header.
1704    pub fn super_block_header(&self) -> SuperBlockHeader {
1705        self.inner.lock().super_block_header.clone()
1706    }
1707
1708    /// Waits for there to be sufficient space in the journal.
1709    pub async fn check_journal_space(&self) -> Result<(), Error> {
1710        loop {
1711            debug_assert_not_too_long!({
1712                let inner = self.inner.lock();
1713                if inner.terminate {
1714                    // If the flush error is set, this will never make progress, since we can't
1715                    // extend the journal any more.
1716                    let context = inner
1717                        .terminate_reason
1718                        .as_ref()
1719                        .map(|e| format!("Journal closed with error: {:?}", e))
1720                        .unwrap_or_else(|| "Journal closed".to_string());
1721                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
1722                }
1723                if self.objects.last_end_offset()
1724                    - inner.super_block_header.journal_checkpoint.file_offset
1725                    < inner.reclaim_size
1726                {
1727                    break Ok(());
1728                }
1729                if inner.image_builder_mode.is_some() {
1730                    break Ok(());
1731                }
1732                if inner.disable_compactions {
1733                    break Err(
1734                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
1735                    );
1736                }
1737                self.reclaim_event.listen()
1738            });
1739        }
1740    }
1741
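    // The granularity, in bytes, by which the journal file is extended when more space is needed.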
1742    fn chunk_size(&self) -> u64 {
1743        CHUNK_SIZE
1744    }
1745
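    // Serializes the transaction's mutations into the journal stream and applies them in memory
    // under the writer lock, followed by any data checksums and a Commit record.  Returns the
    // journal file offset at which the transaction was written.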
1746    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
1747        let checkpoint_before;
1748        let checkpoint_after;
1749        {
1750            let _guard = self.writer_mutex.lock();
1751            checkpoint_before = {
1752                let mut inner = self.inner.lock();
1753                if transaction.includes_write() {
1754                    inner.needs_barrier = true;
1755                }
1756                let checkpoint = inner.writer.journal_file_checkpoint();
1757                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
1758                    self.objects.write_mutation(
1759                        *object_id,
1760                        mutation,
1761                        Writer(*object_id, &mut inner.writer),
1762                    );
1763                }
1764                checkpoint
1765            };
1766            let maybe_mutation =
1767                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
1768                    "apply_transaction should not fail in live mode; \
1769                     filesystem will be in an inconsistent state",
1770                );
1771            checkpoint_after = {
1772                let mut inner = self.inner.lock();
1773                if let Some(mutation) = maybe_mutation {
1774                    inner
1775                        .writer
1776                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
1777                        .unwrap();
1778                }
1779                for (device_range, checksums, first_write) in
1780                    transaction.take_checksums().into_iter()
1781                {
1782                    inner
1783                        .writer
1784                        .write_record(&JournalRecord::DataChecksums(
1785                            device_range,
1786                            Checksums::fletcher(checksums),
1787                            first_write,
1788                        ))
1789                        .unwrap();
1790                }
1791                inner.writer.write_record(&JournalRecord::Commit).unwrap();
1792
1793                inner.writer.journal_file_checkpoint()
1794            };
1795        }
1796        self.objects.did_commit_transaction(
1797            transaction,
1798            &checkpoint_before,
1799            checkpoint_after.file_offset,
1800        );
1801
1802        if let Some(waker) = self.inner.lock().flush_waker.take() {
1803            waker.wake();
1804        }
1805
1806        checkpoint_before.file_offset
1807    }
1808
1809    /// This task will flush journal data to the device when there is data that needs flushing, and
1810    /// trigger compactions when short of journal space.  It will return after the terminate method
1811    /// has been called, or after an error is encountered during either flushing or compaction.
1812    pub async fn flush_task(self: Arc<Self>) {
1813        let mut flush_fut = None;
1814        let mut compact_fut = None;
1815        let mut flush_error = false;
1816        poll_fn(|ctx| {
1817            loop {
1818                {
1819                    let mut inner = self.inner.lock();
1820                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
1821                        let flushable = inner.writer.flushable_bytes();
1822                        if flushable > 0 {
1823                            flush_fut = Some(Box::pin(self.flush(flushable)));
1824                        }
1825                    }
1826                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
1827                        return Poll::Ready(());
1828                    }
1829                    // The / 2 is here because after compacting, we cannot reclaim the space until the
1830                    // _next_ time we flush the device since the super-block is not guaranteed to
1831                    // persist until then.
1832                    if compact_fut.is_none()
1833                        && !inner.terminate
1834                        && !inner.disable_compactions
1835                        && inner.image_builder_mode.is_none()
1836                        && self.objects.last_end_offset()
1837                            - inner.super_block_header.journal_checkpoint.file_offset
1838                            > inner.reclaim_size / 2
1839                    {
1840                        compact_fut = Some(Box::pin(self.compact()));
1841                        inner.compaction_running = true;
1842                    }
1843                    inner.flush_waker = Some(ctx.waker().clone());
1844                }
1845                let mut pending = true;
1846                if let Some(fut) = flush_fut.as_mut() {
1847                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1848                        if let Err(e) = result {
1849                            self.inner.lock().terminate(Some(e.context("Flush error")));
1850                            self.reclaim_event.notify(usize::MAX);
1851                            flush_error = true;
1852                        }
1853                        flush_fut = None;
1854                        pending = false;
1855                    }
1856                }
1857                if let Some(fut) = compact_fut.as_mut() {
1858                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1859                        let mut inner = self.inner.lock();
1860                        if let Err(e) = result {
1861                            inner.terminate(Some(e.context("Compaction error")));
1862                        }
1863                        compact_fut = None;
1864                        inner.compaction_running = false;
1865                        self.reclaim_event.notify(usize::MAX);
1866                        pending = false;
1867                    }
1868                }
1869                if pending {
1870                    return Poll::Pending;
1871                }
1872            }
1873        })
1874        .await;
1875    }
1876
1877    /// Returns a yielder that can be used for compactions.
1878    pub fn get_compaction_yielder(&self) -> CompactionYielder<'_> {
1879        CompactionYielder::new(self)
1880    }
1881
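    // Writes `amount` buffered bytes from the journal writer out to the journal file, requesting a
    // barrier on the first write if barriers are enabled and a preceding transaction included data
    // writes.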
1882    async fn flush(&self, amount: usize) -> Result<(), Error> {
1883        let handle = self.handle.get().unwrap();
1884        let mut buf = handle.allocate_buffer(amount).await;
1885        let (offset, len, barrier_on_first_write) = {
1886            let mut inner = self.inner.lock();
1887            let offset = inner.writer.take_flushable(buf.as_mut());
1888            let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1889            // Reset `needs_barrier` before the overwrite rather than after it so that a txn commit
1890            // containing data that happens during the overwrite is not missed.
1891            inner.needs_barrier = false;
1892            (offset, buf.len() as u64, barrier_on_first_write)
1893        };
1894        self.handle
1895            .get()
1896            .unwrap()
1897            .overwrite(
1898                offset,
1899                buf.as_mut(),
1900                OverwriteOptions { barrier_on_first_write, ..Default::default() },
1901            )
1902            .await?;
1903
1904        let mut inner = self.inner.lock();
1905        if let Some(waker) = inner.sync_waker.take() {
1906            waker.wake();
1907        }
1908        inner.flushed_offset = offset + len;
1909        inner.valid_to = inner.flushed_offset;
1910        Ok(())
1911    }
1912
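    // Flushes in-memory metadata for all registered objects and then writes a new super-block,
    // which allows older journal data to eventually be reclaimed.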
1913    #[trace]
1914    async fn compact(&self) -> Result<(), Error> {
1915        assert!(
1916            self.inner.lock().image_builder_mode.is_none(),
1917            "compact called in image builder mode"
1918        );
1919        let bytes_before = self.objects.compaction_bytes_written();
1920        let _measure = crate::metrics::DurationMeasureScope::new(
1921            &crate::metrics::lsm_tree_metrics().journal_compaction_time,
1922        );
1923        crate::metrics::lsm_tree_metrics().journal_compactions_total.add(1);
1924        let trace = self.trace.load(Ordering::Relaxed);
1925        debug!("Compaction starting");
1926        if trace {
1927            info!("J: start compaction");
1928        }
1929        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1930        self.inner.lock().super_block_header.earliest_version = earliest_version;
1931        self.write_super_block().await.context("Failed to write superblock")?;
1932        if trace {
1933            info!("J: end compaction");
1934        }
1935        debug!("Compaction finished");
1936        let bytes_after = self.objects.compaction_bytes_written();
1937        crate::metrics::lsm_tree_metrics()
1938            .journal_compaction_bytes_written
1939            .add(bytes_after.saturating_sub(bytes_before));
1940        Ok(())
1941    }
1942
1943    /// This should generally NOT be called externally. It is public to allow use by FIDL service
1944    /// fxfs.Debug.
1945    pub async fn force_compact(&self) -> Result<(), Error> {
1946        self.inner.lock().forced_compaction = true;
1947        scopeguard::defer! { self.inner.lock().forced_compaction = false; }
1948        self.compact().await
1949    }
1950
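    /// Disables compactions and waits for any compaction that is currently running to finish.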
1951    pub async fn stop_compactions(&self) {
1952        loop {
1953            debug_assert_not_too_long!({
1954                let mut inner = self.inner.lock();
1955                inner.disable_compactions = true;
1956                if !inner.compaction_running {
1957                    return;
1958                }
1959                self.reclaim_event.listen()
1960            });
1961        }
1962    }
1963
1964    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
1965    /// journal when queried.
1966    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1967        let this = Arc::downgrade(self);
1968        parent.record_lazy_child(name, move || {
1969            let this_clone = this.clone();
1970            async move {
1971                let inspector = fuchsia_inspect::Inspector::default();
1972                if let Some(this) = this_clone.upgrade() {
1973                    let (journal_min, journal_max, journal_reclaim_size) = {
1974                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1975                        let inner = this.inner.lock();
1976                        (
1977                            round_down(
1978                                inner.super_block_header.journal_checkpoint.file_offset,
1979                                BLOCK_SIZE,
1980                            ),
1981                            inner.flushed_offset,
1982                            inner.reclaim_size,
1983                        )
1984                    };
1985                    let root = inspector.root();
1986                    root.record_uint("journal_min_offset", journal_min);
1987                    root.record_uint("journal_max_offset", journal_max);
1988                    root.record_uint("journal_size", journal_max - journal_min);
1989                    root.record_uint("journal_reclaim_size", journal_reclaim_size);
1990
1991                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
1992                    if let Some(x) = round_div(
1993                        100 * (journal_max - journal_min),
1994                        this.objects.allocator().get_disk_bytes(),
1995                    ) {
1996                        root.record_uint("journal_size_to_disk_size_percent", x);
1997                    }
1998                }
1999                Ok(inspector)
2000            }
2001            .boxed()
2002        });
2003    }
2004
2005    /// Terminate all journal activity.
2006    pub fn terminate(&self) {
2007        self.inner.lock().terminate(/*reason*/ None);
2008        self.reclaim_event.notify(usize::MAX);
2009    }
2010}
2011
2012/// Wrapper to allow records to be written to the journal.
2013pub struct Writer<'a>(u64, &'a mut JournalWriter);
2014
2015impl Writer<'_> {
2016    pub fn write(&mut self, mutation: Mutation) {
2017        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
2018    }
2019}
2020
2021#[cfg(target_os = "fuchsia")]
2022mod yielder {
2023    use super::Journal;
2024    use crate::lsm_tree::Yielder;
2025    use fuchsia_async as fasync;
2026
2027    /// CompactionYielder uses fuchsia-async to yield if other tasks are being polled, which should
2028    /// be a proxy for how busy the system is.  We can afford to delay compactions for a small
2029    /// amount of time but not so long that we end up blocking new transactions.
2030    pub struct CompactionYielder<'a> {
2031        journal: &'a Journal,
2032        low_priority_task: Option<fasync::LowPriorityTask>,
2033    }
2034
2035    impl<'a> CompactionYielder<'a> {
2036        pub fn new(journal: &'a Journal) -> Self {
2037            Self { journal, low_priority_task: None }
2038        }
2039    }
2040
2041    impl Yielder for CompactionYielder<'_> {
2042        async fn yield_now(&mut self) {
2043            // We will wait for the executor to be idle for 4ms, but no longer than 16ms.  We need
2044            // to cap the maximum amount of time we wait in case we've reached a point where
2045            // compaction is now urgent or else we could block new transactions.
2046            const IDLE_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(4);
2047            const MAX_YIELD_DURATION: zx::MonotonicDuration =
2048                zx::MonotonicDuration::from_millis(16);
2049
2050            {
2051                let inner = self.journal.inner.lock();
2052                if inner.forced_compaction {
2053                    return;
2054                }
2055                let outstanding = self.journal.objects.last_end_offset()
2056                    - inner.super_block_header.journal_checkpoint.file_offset;
2057                let half_reclaim_size = inner.reclaim_size / 2;
2058                if outstanding
2059                    .checked_sub(half_reclaim_size)
2060                    .is_some_and(|x| x >= half_reclaim_size / 2)
2061                {
2062                    // If we have got to the point where we have used up 3/4 of reclaim size in the
2063                    // journal, do not delay any further.  If we continue to yield we will get to
2064                    // the point where we block new transactions.
2065                    self.low_priority_task = None;
2066                    return;
2067                }
2068            }
2069
2070            self.low_priority_task
2071                .get_or_insert_with(|| fasync::LowPriorityTask::new())
2072                .wait_until_idle_for(
2073                    IDLE_PERIOD,
2074                    fasync::MonotonicInstant::after(MAX_YIELD_DURATION),
2075                )
2076                .await;
2077        }
2078    }
2079}
2080
2081#[cfg(not(target_os = "fuchsia"))]
2082mod yielder {
2083    use super::Journal;
2084    use crate::lsm_tree::Yielder;
2085
2086    #[expect(dead_code)]
2087    pub struct CompactionYielder<'a>(&'a Journal);
2088
2089    impl<'a> CompactionYielder<'a> {
2090        pub fn new(journal: &'a Journal) -> Self {
2091            Self(journal)
2092        }
2093    }
2094
2095    impl Yielder for CompactionYielder<'_> {
2096        async fn yield_now(&mut self) {}
2097    }
2098}
2099
2100pub use yielder::*;
2101
2102#[cfg(test)]
2103mod tests {
2104    use super::SuperBlockInstance;
2105    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
2106    use crate::fsck::fsck;
2107    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
2108    use crate::object_store::directory::Directory;
2109    use crate::object_store::transaction::Options;
2110    use crate::object_store::volume::root_volume;
2111    use crate::object_store::{
2112        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
2113    };
2114    use fuchsia_async as fasync;
2115    use fuchsia_async::MonotonicDuration;
2116    use storage_device::DeviceHolder;
2117    use storage_device::fake_device::FakeDevice;
2118
2119    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2120
2121    #[fuchsia::test]
2122    async fn test_replay() {
2123        const TEST_DATA: &[u8] = b"hello";
2124
2125        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2126
2127        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2128
2129        let object_id = {
2130            let root_store = fs.root_store();
2131            let root_directory =
2132                Directory::open(&root_store, root_store.root_directory_object_id())
2133                    .await
2134                    .expect("open failed");
2135            let mut transaction = fs
2136                .clone()
2137                .new_transaction(
2138                    lock_keys![LockKey::object(
2139                        root_store.store_object_id(),
2140                        root_store.root_directory_object_id(),
2141                    )],
2142                    Options::default(),
2143                )
2144                .await
2145                .expect("new_transaction failed");
2146            let handle = root_directory
2147                .create_child_file(&mut transaction, "test")
2148                .await
2149                .expect("create_child_file failed");
2150
2151            transaction.commit().await.expect("commit failed");
2152            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2153            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2154            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2155            // As this is the first sync, this will actually trigger a new super-block, but normally
2156            // this would not be the case.
2157            fs.sync(SyncOptions::default()).await.expect("sync failed");
2158            handle.object_id()
2159        };
2160
2161        {
2162            fs.close().await.expect("Close failed");
2163            let device = fs.take_device().await;
2164            device.reopen(false);
2165            let fs = FxFilesystem::open(device).await.expect("open failed");
2166            let handle = ObjectStore::open_object(
2167                &fs.root_store(),
2168                object_id,
2169                HandleOptions::default(),
2170                None,
2171            )
2172            .await
2173            .expect("open_object failed");
2174            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2175            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
2176            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2177            fsck(fs.clone()).await.expect("fsck failed");
2178            fs.close().await.expect("Close failed");
2179        }
2180    }
2181
2182    #[fuchsia::test]
2183    async fn test_reset() {
2184        const TEST_DATA: &[u8] = b"hello";
2185
2186        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
2187
2188        let mut object_ids = Vec::new();
2189
2190        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2191        {
2192            let root_store = fs.root_store();
2193            let root_directory =
2194                Directory::open(&root_store, root_store.root_directory_object_id())
2195                    .await
2196                    .expect("open failed");
2197            let mut transaction = fs
2198                .clone()
2199                .new_transaction(
2200                    lock_keys![LockKey::object(
2201                        root_store.store_object_id(),
2202                        root_store.root_directory_object_id(),
2203                    )],
2204                    Options::default(),
2205                )
2206                .await
2207                .expect("new_transaction failed");
2208            let handle = root_directory
2209                .create_child_file(&mut transaction, "test")
2210                .await
2211                .expect("create_child_file failed");
2212            transaction.commit().await.expect("commit failed");
2213            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2214            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2215            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2216            fs.sync(SyncOptions::default()).await.expect("sync failed");
2217            object_ids.push(handle.object_id());
2218
2219            // Create a lot of objects but don't sync at the end. This should leave the filesystem
2220            // with a half finished transaction that cannot be replayed.
2221            for i in 0..1000 {
2222                let mut transaction = fs
2223                    .clone()
2224                    .new_transaction(
2225                        lock_keys![LockKey::object(
2226                            root_store.store_object_id(),
2227                            root_store.root_directory_object_id(),
2228                        )],
2229                        Options::default(),
2230                    )
2231                    .await
2232                    .expect("new_transaction failed");
2233                let handle = root_directory
2234                    .create_child_file(&mut transaction, &format!("{}", i))
2235                    .await
2236                    .expect("create_child_file failed");
2237                transaction.commit().await.expect("commit failed");
2238                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2239                buf.as_mut_slice().copy_from_slice(TEST_DATA);
2240                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2241                object_ids.push(handle.object_id());
2242            }
2243        }
2244        fs.close().await.expect("fs close failed");
2245        let device = fs.take_device().await;
2246        device.reopen(false);
2247        let fs = FxFilesystem::open(device).await.expect("open failed");
2248        fsck(fs.clone()).await.expect("fsck failed");
2249        {
2250            let root_store = fs.root_store();
2251            // Check the first object, which should exist.
2252            for &object_id in &object_ids[0..1] {
2253                let handle = ObjectStore::open_object(
2254                    &root_store,
2255                    object_id,
2256                    HandleOptions::default(),
2257                    None,
2258                )
2259                .await
2260                .expect("open_object failed");
2261                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2262                assert_eq!(
2263                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2264                    TEST_DATA.len()
2265                );
2266                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2267            }
2268
2269            // Write one more object and sync.
2270            let root_directory =
2271                Directory::open(&root_store, root_store.root_directory_object_id())
2272                    .await
2273                    .expect("open failed");
2274            let mut transaction = fs
2275                .clone()
2276                .new_transaction(
2277                    lock_keys![LockKey::object(
2278                        root_store.store_object_id(),
2279                        root_store.root_directory_object_id(),
2280                    )],
2281                    Options::default(),
2282                )
2283                .await
2284                .expect("new_transaction failed");
2285            let handle = root_directory
2286                .create_child_file(&mut transaction, "test2")
2287                .await
2288                .expect("create_child_file failed");
2289            transaction.commit().await.expect("commit failed");
2290            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2291            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2292            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2293            fs.sync(SyncOptions::default()).await.expect("sync failed");
2294            object_ids.push(handle.object_id());
2295        }
2296
2297        fs.close().await.expect("close failed");
2298        let device = fs.take_device().await;
2299        device.reopen(false);
2300        let fs = FxFilesystem::open(device).await.expect("open failed");
2301        {
2302            fsck(fs.clone()).await.expect("fsck failed");
2303
2304            // Check the first and the last objects.
2305            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
2306                let handle = ObjectStore::open_object(
2307                    &fs.root_store(),
2308                    object_id,
2309                    HandleOptions::default(),
2310                    None,
2311                )
2312                .await
2313                .unwrap_or_else(|e| {
2314                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2315                });
2316                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2317                assert_eq!(
2318                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2319                    TEST_DATA.len()
2320                );
2321                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2322            }
2323        }
2324        fs.close().await.expect("close failed");
2325    }
2326
2327    #[fuchsia::test]
2328    async fn test_discard() {
2329        let device = {
2330            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2331            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2332            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2333
2334            let store = root_volume
2335                .new_volume("test", NewChildStoreOptions::default())
2336                .await
2337                .expect("new_volume failed");
2338            let root_directory = Directory::open(&store, store.root_directory_object_id())
2339                .await
2340                .expect("open failed");
2341
2342            // Create enough data so that another journal extent is used.
2343            let mut i = 0;
2344            loop {
2345                let mut transaction = fs
2346                    .clone()
2347                    .new_transaction(
2348                        lock_keys![LockKey::object(
2349                            store.store_object_id(),
2350                            store.root_directory_object_id()
2351                        )],
2352                        Options::default(),
2353                    )
2354                    .await
2355                    .expect("new_transaction failed");
2356                root_directory
2357                    .create_child_file(&mut transaction, &format!("a {i}"))
2358                    .await
2359                    .expect("create_child_file failed");
2360                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2361                    break;
2362                }
2363                i += 1;
2364            }
2365
2366            // Compact and then disable compactions.
2367            fs.journal().force_compact().await.expect("compact failed");
2368            fs.journal().stop_compactions().await;
2369
2370            // Keep going until we need another journal extent.
2371            let mut i = 0;
2372            loop {
2373                let mut transaction = fs
2374                    .clone()
2375                    .new_transaction(
2376                        lock_keys![LockKey::object(
2377                            store.store_object_id(),
2378                            store.root_directory_object_id()
2379                        )],
2380                        Options::default(),
2381                    )
2382                    .await
2383                    .expect("new_transaction failed");
2384                root_directory
2385                    .create_child_file(&mut transaction, &format!("b {i}"))
2386                    .await
2387                    .expect("create_child_file failed");
2388                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2389                    break;
2390                }
2391                i += 1;
2392            }
2393
2394            // Allow the journal to flush, but we don't want to sync.
2395            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2396            // Because we're not gracefully closing the filesystem, a Discard record will be
2397            // emitted.
2398            fs.device().snapshot().expect("snapshot failed")
2399        };
2400
2401        let fs = FxFilesystem::open(device).await.expect("open failed");
2402
2403        {
2404            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2405
2406            let store =
2407                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2408
2409            let root_directory = Directory::open(&store, store.root_directory_object_id())
2410                .await
2411                .expect("open failed");
2412
2413            // Write one more transaction.
2414            let mut transaction = fs
2415                .clone()
2416                .new_transaction(
2417                    lock_keys![LockKey::object(
2418                        store.store_object_id(),
2419                        store.root_directory_object_id()
2420                    )],
2421                    Options::default(),
2422                )
2423                .await
2424                .expect("new_transaction failed");
2425            root_directory
2426                .create_child_file(&mut transaction, "d")
2427                .await
2428                .expect("create_child_file failed");
2429            transaction.commit().await.expect("commit failed");
2430        }
2431
2432        fs.close().await.expect("close failed");
2433        let device = fs.take_device().await;
2434        device.reopen(false);
2435
2436        let fs = FxFilesystem::open(device).await.expect("open failed");
2437        fsck(fs.clone()).await.expect("fsck failed");
2438        fs.close().await.expect("close failed");
2439    }
2440
2441    #[fuchsia::test]
2442    async fn test_use_existing_generation() {
2443        let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2444
2445        // First format should be generation 1.
2446        let fs = FxFilesystemBuilder::new()
2447            .format(true)
2448            .image_builder_mode(Some(SuperBlockInstance::A))
2449            .open(device)
2450            .await
2451            .expect("open failed");
2452        fs.enable_allocations();
2453        let generation0 = fs.super_block_header().generation;
2454        assert_eq!(generation0, 1);
2455        fs.close().await.expect("close failed");
2456        let device = fs.take_device().await;
2457        device.reopen(false);
2458
2459        // Format the device normally (again, generation 1).
2460        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2461        let generation1 = fs.super_block_header().generation;
2462        {
2463            let root_volume = crate::object_store::volume::root_volume(fs.clone())
2464                .await
2465                .expect("root_volume failed");
2466            root_volume
2467                .new_volume("test", crate::object_store::NewChildStoreOptions::default())
2468                .await
2469                .expect("new_volume failed");
2470        }
2471        fs.close().await.expect("close failed");
2472        let device = fs.take_device().await;
2473        device.reopen(false);
2474
2475        // Format again with image_builder_mode.
2476        let fs = FxFilesystemBuilder::new()
2477            .format(true)
2478            .image_builder_mode(Some(SuperBlockInstance::A))
2479            .open(device)
2480            .await
2481            .expect("open failed");
2482        fs.enable_allocations();
2483        let generation2 = fs.super_block_header().generation;
2484        assert!(
2485            generation2 > generation1,
2486            "generation2 ({}) should be greater than generation1 ({})",
2487            generation2,
2488            generation1
2489        );
2490        fs.close().await.expect("close failed");
2491    }
2492
2493    #[fuchsia::test]
2494    async fn test_image_builder_mode_generation_bump_512_byte_block() {
2495        let device = DeviceHolder::new(FakeDevice::new(16384, 512));
2496
2497        // Format initial filesystem (generation 1)
2498        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2499        let generation1 = fs.super_block_header().generation;
2500        fs.close().await.expect("close failed");
2501        let device = fs.take_device().await;
2502        device.reopen(false);
2503
2504        // Format again with image_builder_mode (this should bump the generation).
2505        let fs = FxFilesystemBuilder::new()
2506            .format(true)
2507            .image_builder_mode(Some(SuperBlockInstance::A))
2508            .open(device)
2509            .await
2510            .expect("open failed");
2511
2512        fs.enable_allocations();
2513        let generation2 = fs.super_block_header().generation;
2514        assert!(
2515            generation2 > generation1,
2516            "Expected generation bump, got {} vs {}",
2517            generation2,
2518            generation1
2519        );
2520        fs.close().await.expect("close failed");
2521    }
2522
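    // With a normal-priority task polling every 1ms, low-priority journal compaction should keep
    // yielding and only finish once the normal-priority work stops.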
2523    #[fuchsia::test(allow_stalls = false)]
2524    #[cfg(target_os = "fuchsia")]
2525    async fn test_low_priority_compaction() {
2526        use fuchsia_async::TestExecutor;
2527        use std::sync::Arc;
2528        use std::sync::atomic::{AtomicBool, Ordering};
2529
2530        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
2531        let fs = FxFilesystemBuilder::new()
2532            .journal_options(super::JournalOptions { reclaim_size: 65536, ..Default::default() })
2533            .format(true)
2534            .open(device)
2535            .await
2536            .expect("open failed");
2537
2538        let _low = fasync::LowPriorityTask::new();
2539
2540        // Add some data to the tree.
2541        {
2542            let root_store = fs.root_store();
2543            let root_directory =
2544                Directory::open(&root_store, root_store.root_directory_object_id())
2545                    .await
2546                    .expect("open failed");
2547            for i in 0..100 {
2548                let mut transaction = fs
2549                    .clone()
2550                    .new_transaction(
2551                        lock_keys![LockKey::object(
2552                            root_store.store_object_id(),
2553                            root_store.root_directory_object_id(),
2554                        )],
2555                        Options::default(),
2556                    )
2557                    .await
2558                    .expect("new_transaction failed");
2559                root_directory
2560                    .create_child_file(&mut transaction, &format!("test{}", i))
2561                    .await
2562                    .expect("create_child_file failed");
2563                transaction.commit().await.expect("commit failed");
2564            }
2565        }
2566
2567        // Spawn a task that polls every 1ms.
2568        let stop = Arc::new(AtomicBool::new(false));
2569        let stop_clone = stop.clone();
2570        let _normal_task = fasync::Task::spawn(async move {
2571            while !stop_clone.load(Ordering::Relaxed) {
2572                fasync::Timer::new(fasync::MonotonicInstant::after(
2573                    MonotonicDuration::from_millis(1),
2574                ))
2575                .await;
2576            }
2577        });
2578
2579        // Trigger journal compaction.
2580        // We can do this by writing more data until outstanding > reclaim_size / 2.
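        // (The loop below doesn't measure "outstanding" directly; it just keeps committing
        // transactions until the journal reports that compaction has started.)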
2581        {
2582            let root_store = fs.root_store();
2583            let root_directory =
2584                Directory::open(&root_store, root_store.root_directory_object_id())
2585                    .await
2586                    .expect("open failed");
2587            let mut i = 0;
2588            loop {
2589                let mut transaction = fs
2590                    .clone()
2591                    .new_transaction(
2592                        lock_keys![LockKey::object(
2593                            root_store.store_object_id(),
2594                            root_store.root_directory_object_id(),
2595                        )],
2596                        Options::default(),
2597                    )
2598                    .await
2599                    .expect("new_transaction failed");
2600                root_directory
2601                    .create_child_file(&mut transaction, &format!("trigger{i}"))
2602                    .await
2603                    .expect("create_child_file failed");
2604                transaction.commit().await.expect("commit failed");
2605
2606                if fs.journal().inner.lock().compaction_running {
2607                    break;
2608                }
2609                TestExecutor::advance_to(fasync::MonotonicInstant::after(
2610                    MonotonicDuration::from_millis(1),
2611                ))
2612                .await;
2613                i += 1;
2614            }
2615        }
2616
2617        // Compaction should now be running. Because of our 1ms poller, it should be yielding
2618        // for up to 16ms at each yield point.
2619        for _ in 0..10 {
2620            TestExecutor::advance_to(fasync::MonotonicInstant::after(
2621                MonotonicDuration::from_millis(1),
2622            ))
2623            .await;
2624            assert!(fs.journal().inner.lock().compaction_running);
2625        }
2626
2627        // Stop the normal task.
2628        stop.store(true, Ordering::Relaxed);
2629        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
2630            1,
2631        )))
2632        .await;
2633
2634        // Compaction should still be running because 4ms have not yet elapsed since the normal
2635        // task finished.
2636        assert!(fs.journal().inner.lock().compaction_running);
2637
2638        // For the next 3ms, compaction should still be running.
2639        for _ in 0..3 {
2640            TestExecutor::advance_to(fasync::MonotonicInstant::after(
2641                MonotonicDuration::from_millis(1),
2642            ))
2643            .await;
2644            assert!(fs.journal().inner.lock().compaction_running);
2645        }
2646
2647        // 1 more ms and compaction should be unblocked.
2648        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
2649            1,
2650        )))
2651        .await;
2652
2653        // When the executor next stalls, compaction should be done.
2654        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
2655        assert!(!fs.journal().inner.lock().compaction_running);
2656
2657        fs.close().await.expect("Close failed");
2658    }
2659
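    // Even with a normal-priority task polling continuously, compaction should still make forward
    // progress because each yield is bounded (see MAX_YIELD_DURATION in the comments below).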
2660    #[fuchsia::test(allow_stalls = false)]
2661    #[cfg(target_os = "fuchsia")]
2662    async fn test_low_priority_compaction_deadline() {
2663        use fuchsia_async::TestExecutor;
2664        use std::sync::Arc;
2665        use std::sync::atomic::{AtomicBool, Ordering};
2666
2667        let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
2668        let fs = FxFilesystemBuilder::new()
2669            .journal_options(super::JournalOptions { reclaim_size: 65536, ..Default::default() })
2670            .format(true)
2671            .open(device)
2672            .await
2673            .expect("open failed");
2674
2675        let _low = fasync::LowPriorityTask::new();
2676
2677        // Add some data to the tree.
2678        {
2679            let root_store = fs.root_store();
2680            let root_directory =
2681                Directory::open(&root_store, root_store.root_directory_object_id())
2682                    .await
2683                    .expect("open failed");
2684            for i in 0..10 {
2685                let mut transaction = fs
2686                    .clone()
2687                    .new_transaction(
2688                        lock_keys![LockKey::object(
2689                            root_store.store_object_id(),
2690                            root_store.root_directory_object_id(),
2691                        )],
2692                        Options::default(),
2693                    )
2694                    .await
2695                    .expect("new_transaction failed");
2696                root_directory
2697                    .create_child_file(&mut transaction, &format!("test{}", i))
2698                    .await
2699                    .expect("create_child_file failed");
2700                transaction.commit().await.expect("commit failed");
2701            }
2702        }
2703
2704        // Spawn a task that polls every 3ms.
2705        let stop = Arc::new(AtomicBool::new(false));
2706        let stop_clone = stop.clone();
2707        let _normal_task = fasync::Task::spawn(async move {
2708            while !stop_clone.load(Ordering::Relaxed) {
2709                fasync::Timer::new(fasync::MonotonicInstant::after(
2710                    MonotonicDuration::from_millis(3),
2711                ))
2712                .await;
2713            }
2714        });
2715
2716        // Trigger journal compaction.
2717        {
2718            let root_store = fs.root_store();
2719            let root_directory =
2720                Directory::open(&root_store, root_store.root_directory_object_id())
2721                    .await
2722                    .expect("open failed");
2723            let mut i = 0;
2724            loop {
2725                let mut transaction = fs
2726                    .clone()
2727                    .new_transaction(
2728                        lock_keys![LockKey::object(
2729                            root_store.store_object_id(),
2730                            root_store.root_directory_object_id(),
2731                        )],
2732                        Options::default(),
2733                    )
2734                    .await
2735                    .expect("new_transaction failed");
2736                root_directory
2737                    .create_child_file(&mut transaction, &format!("trigger{i}"))
2738                    .await
2739                    .expect("create_child_file failed");
2740                transaction.commit().await.expect("commit failed");
2741
2742                if fs.journal().inner.lock().compaction_running {
2743                    break;
2744                }
2745                TestExecutor::advance_to(fasync::MonotonicInstant::after(
2746                    MonotonicDuration::from_millis(1),
2747                ))
2748                .await;
2749                i += 1;
2750            }
2751        }
2752
2753        // Advance time in 20ms increments. Each increment should allow compaction to make progress
2754        // on one item (since MAX_YIELD_DURATION is 16ms).
2755        let mut count = 0;
2756        for _ in 0..1000 {
2757            TestExecutor::advance_to(fasync::MonotonicInstant::after(
2758                MonotonicDuration::from_millis(20),
2759            ))
2760            .await;
2761            if !fs.journal().inner.lock().compaction_running {
2762                break;
2763            }
2764            count += 1;
2765        }
2766        assert!(!fs.journal().inner.lock().compaction_running);
2767
2768        // Make sure it took many iterations to complete.  It's difficult to know what the exact
2769        // number should be.
2770        assert!(count > 200);
2771
2772        stop.store(true, Ordering::Relaxed);
2773        fs.close().await.expect("Close failed");
2774    }
2775
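    // When the journal is close to full, compaction should stop yielding to normal-priority work,
    // presumably so that space can be reclaimed promptly.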
2776    #[fuchsia::test(allow_stalls = false)]
2777    #[cfg(target_os = "fuchsia")]
2778    async fn test_low_priority_compaction_no_yielding_when_full() {
2779        use fuchsia_async::TestExecutor;
2780        use std::sync::Arc;
2781        use std::sync::atomic::{AtomicBool, Ordering};
2782
2783        let reclaim_size = 65536;
2784        let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
2785        let fs = FxFilesystemBuilder::new()
2786            .journal_options(super::JournalOptions { reclaim_size, ..Default::default() })
2787            .format(true)
2788            .open(device)
2789            .await
2790            .expect("open failed");
2791
2792        let _low = fasync::LowPriorityTask::new();
2793
2794        // Add some data to the tree.
2795        {
2796            let root_store = fs.root_store();
2797            let root_directory =
2798                Directory::open(&root_store, root_store.root_directory_object_id())
2799                    .await
2800                    .expect("open failed");
2801            for i in 0..10 {
2802                let mut transaction = fs
2803                    .clone()
2804                    .new_transaction(
2805                        lock_keys![LockKey::object(
2806                            root_store.store_object_id(),
2807                            root_store.root_directory_object_id(),
2808                        )],
2809                        Options::default(),
2810                    )
2811                    .await
2812                    .expect("new_transaction failed");
2813                root_directory
2814                    .create_child_file(&mut transaction, &format!("test{}", i))
2815                    .await
2816                    .expect("create_child_file failed");
2817                transaction.commit().await.expect("commit failed");
2818            }
2819        }
2820
2821        // Spawn a task that polls every 3ms.
2822        let stop = Arc::new(AtomicBool::new(false));
2823        let stop_clone = stop.clone();
2824        let _normal_task = fasync::Task::spawn(async move {
2825            while !stop_clone.load(Ordering::Relaxed) {
2826                fasync::Timer::new(fasync::MonotonicInstant::after(
2827                    MonotonicDuration::from_millis(3),
2828                ))
2829                .await;
2830            }
2831        });
2832
2833        // Trigger journal compaction, but this time fill the journal up to 7/8 of reclaim_size.
2834        {
2835            let root_store = fs.root_store();
2836            let root_directory =
2837                Directory::open(&root_store, root_store.root_directory_object_id())
2838                    .await
2839                    .expect("open failed");
2840            let mut i = 0;
2841            loop {
2842                let mut transaction = fs
2843                    .clone()
2844                    .new_transaction(
2845                        lock_keys![LockKey::object(
2846                            root_store.store_object_id(),
2847                            root_store.root_directory_object_id(),
2848                        )],
2849                        Options::default(),
2850                    )
2851                    .await
2852                    .expect("new_transaction failed");
2853                root_directory
2854                    .create_child_file(&mut transaction, &format!("trigger{i}"))
2855                    .await
2856                    .expect("create_child_file failed");
2857                transaction.commit().await.expect("commit failed");
2858
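                // "Outstanding" journal data: everything written past the checkpoint recorded in
                // the current super-block header (i.e. the data a replay would still process).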
2859                let outstanding = {
2860                    let inner = fs.journal().inner.lock();
2861                    fs.journal().objects.last_end_offset()
2862                        - inner.super_block_header.journal_checkpoint.file_offset
2863                };
2864                if outstanding >= reclaim_size * 7 / 8 {
2865                    break;
2866                }
2867                // We don't advance time here so that we reach 7/8 before compaction can yield.
2868                i += 1;
2869            }
2870        }
2871
2872        // Advancing by 4ms should be enough to wake compaction up.
2873        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
2874            4,
2875        )))
2876        .await;
2877
2878        // When the executor next stalls, compaction should be done.
2879        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
2880        assert!(!fs.journal().inner.lock().compaction_running);
2881
2882        stop.store(true, Ordering::Relaxed);
2883        fs.close().await.expect("Close failed");
2884    }
2885}
2886
2887#[cfg(fuzz)]
2888mod fuzz {
2889    use fuzz::fuzz;
2890
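    // Appends arbitrary bytes to the journal of a freshly formatted filesystem and then reopens
    // the device; the property under test is that journal replay handles the garbage without
    // panicking.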
2891    #[fuzz]
2892    fn fuzz_journal_bytes(input: Vec<u8>) {
2893        use crate::filesystem::FxFilesystem;
2894        use fuchsia_async as fasync;
2895        use std::io::Write;
2896        use storage_device::DeviceHolder;
2897        use storage_device::fake_device::FakeDevice;
2898
2899        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2900            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2901            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2902            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
2903            fs.close().await.expect("close failed");
2904            let device = fs.take_device().await;
2905            device.reopen(false);
2906            if let Ok(fs) = FxFilesystem::open(device).await {
2907                // `close()` can fail if there were objects to be tombstoned. If such an object
2908                // is corrupted, compacting the journal will return an error.
2909                let _ = fs.close().await;
2910            }
2911        });
2912    }
2913
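    // As above, but feeds structured `JournalRecord`s through the writer instead of raw bytes.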
2914    #[fuzz]
2915    fn fuzz_journal(input: Vec<super::JournalRecord>) {
2916        use crate::filesystem::FxFilesystem;
2917        use fuchsia_async as fasync;
2918        use storage_device::DeviceHolder;
2919        use storage_device::fake_device::FakeDevice;
2920
2921        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2922            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2923            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2924            {
2925                let mut inner = fs.journal().inner.lock();
2926                for record in &input {
2927                    let _ = inner.writer.write_record(record);
2928                }
2929            }
2930            fs.close().await.expect("close failed");
2931            let device = fs.take_device().await;
2932            device.reopen(false);
2933            if let Ok(fs) = FxFilesystem::open(device).await {
2934                // `close()` can fail if there were objects to be tombstoned. If such an object
2935                // is corrupted, compacting the journal will return an error.
2936                let _ = fs.close().await;
2937            }
2938        });
2939    }
2940}