Skip to main content

fxfs/object_store/
journal.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
//! The journal is implemented as an ever-extending file which contains variable-length records
//! that describe mutations to be applied to various objects.  The journal file consists of
//! blocks, with a checksum at the end of each block, but otherwise it can be considered a
//! continuous stream.
//!
//! The checksum is seeded with the checksum from the previous block.  To free space in the
//! journal, records are replaced with sparse extents once it is known they are no longer
//! needed to mount.
//!
//! At mount time, the journal is replayed: the mutations are applied into memory.
//! Eventually, a checksum failure will indicate that no more records exist to be replayed,
//! at which point the mount can continue and the journal will be extended from that point with
//! further mutations as required.
18
19mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::errors::FxfsError;
27use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
28use crate::log::*;
29use crate::lsm_tree::cache::NullCache;
30use crate::lsm_tree::types::Layer;
31use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
32use crate::object_store::allocator::Allocator;
33use crate::object_store::data_object_handle::OverwriteOptions;
34use crate::object_store::extent_record::{
35    DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
36};
37use crate::object_store::graveyard::Graveyard;
38use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
39use crate::object_store::journal::checksum_list::ChecksumList;
40use crate::object_store::journal::reader::{JournalReader, ReadResult};
41use crate::object_store::journal::super_block::{
42    SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
43};
44use crate::object_store::journal::writer::JournalWriter;
45use crate::object_store::object_manager::ObjectManager;
46use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
47use crate::object_store::transaction::{
48    AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
49    MutationV47, MutationV49, MutationV50, MutationV54, MutationV55, ObjectStoreMutation, Options,
50    TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
51};
52use crate::object_store::{
53    AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
54    NewChildStoreOptions, ObjectStore, ReservedId,
55};
56use crate::range::RangeExt;
57use crate::round::{round_div, round_down};
58use crate::serialized_types::{
59    LATEST_VERSION, Migrate, Version, Versioned, migrate_nodefault, migrate_to_version,
60};
61use anyhow::{Context, Error, anyhow, bail, ensure};
62use event_listener::Event;
63use fprint::TypeFingerprint;
64use fuchsia_inspect::NumericProperty;
65use fuchsia_sync::Mutex;
66use futures::FutureExt as _;
67use futures::future::poll_fn;
68use rustc_hash::FxHashMap as HashMap;
69use serde::{Deserialize, Serialize};
70use static_assertions::const_assert;
71use std::clone::Clone;
72use std::collections::HashSet;
73use std::num::NonZero;
74use std::ops::{Bound, Range};
75use std::sync::atomic::{AtomicBool, Ordering};
76use std::sync::{Arc, OnceLock};
77use std::task::{Poll, Waker};
78use storage_device::Device;
79
80// The journal file is written to in blocks of this size.
81pub const BLOCK_SIZE: u64 = 4096;
82
83// The journal file is extended by this amount when necessary.
84const CHUNK_SIZE: u64 = 131_072;
85const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);
86
87// See the comment for the `reclaim_size` member of Inner.
88pub const DEFAULT_RECLAIM_SIZE: u64 = 524_288;
89
90// Temporary space that should be reserved for the journal.  For example: space that is currently
91// used in the journal file but cannot be deallocated yet because we are flushing.
92pub const RESERVED_SPACE: u64 = 1_048_576;
93
94// Whenever the journal is replayed (i.e. the system is unmounted and remounted), we reset the
95// journal stream, at which point any half-complete transactions are discarded.  We indicate a
96// journal reset by XORing the previous block's checksum with this mask, and using that value as a
97// seed for the next journal block.
98const RESET_XOR: u64 = 0xffffffffffffffff;
99
// To keep track of offsets within a journal file, we need both the file offset and the checksum of
// the preceding block, since the checksum of the preceding block is an input to the checksum of
// every block.
pub type JournalCheckpoint = JournalCheckpointV32;

/// A position in the journal stream: a file offset together with the chained checksum (and record
/// version) needed to resume reading from that offset.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    // Offset within the journal file.
    pub file_offset: u64,

    // Starting checksum for the block that contains file_offset, i.e. the checksum of the previous
    // block.
    pub checksum: Checksum,

    // If versioned, the version of elements stored in the journal, e.g. the JournalRecord version.
    // This can change across reset events, so we store it along with the offset and checksum to
    // know which version to deserialize.
    pub version: Version,
}
118
/// The current journal record format.
pub type JournalRecord = JournalRecordV55;

/// A single record in the journal stream.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV55 {
    // NOTE(review): presumably marks the end of the useful data in the current journal block —
    // confirm against the reader/writer.
    EndBlock,
    /// A mutation belonging to the current transaction; `object_id` identifies the object it
    /// targets.
    Mutation {
        object_id: u64,
        mutation: MutationV55,
    },
    /// Commits records in the transaction.
    Commit,
    /// Discard all mutations with offsets greater than or equal to the given offset.
    Discard(u64),
    /// Indicates the device was flushed at the given journal offset.
    /// Note that this really means that at this point in the journal offset, we can be certain that
    /// there's no remaining buffered data in the block device; the buffers and the disk contents are
    /// consistent.
    /// We insert one of these records *after* a flush along with the *next* transaction to go
    /// through.  If that never comes (either due to graceful or hard shutdown), the journal reset
    /// on the next mount will serve the same purpose and count as a flush, although it is necessary
    /// to defensively flush the device before replaying the journal (if possible, i.e. not
    /// read-only) in case the block device connection was reused.
    DidFlushDevice(u64),
    /// Checksums for a data range written by this transaction. A transaction is only valid if these
    /// checksums are right. The range is the device offset the checksums are for.
    ///
    /// A boolean indicates whether this range is being written to for the first time. For overwrite
    /// extents, we only check the checksums for a block if it has been written to for the first
    /// time since the last flush, because otherwise we can't roll it back anyway so it doesn't
    /// matter. For copy-on-write extents, the bool is always true.
    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
}
153
// Legacy record format: same shape as `JournalRecordV55` except that `Mutation` carries a
// `MutationV54`.  Migrated forward to `JournalRecordV55` (see `migrate_to_version`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV55)]
#[migrate_nodefault]
pub enum JournalRecordV54 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV54 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
}
166
// Legacy record format: same shape as `JournalRecordV54` except that `Mutation` carries a
// `MutationV50`.  Migrated forward to `JournalRecordV54` (see `migrate_to_version`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV54)]
#[migrate_nodefault]
pub enum JournalRecordV50 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV50 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
179
// Legacy record format: same shape as `JournalRecordV50` except that `Mutation` carries a
// `MutationV49`.  Migrated forward to `JournalRecordV50` (see `migrate_to_version`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
191
// Legacy record format: same shape as `JournalRecordV49` except that `Mutation` carries a
// `MutationV47`.  Migrated forward to `JournalRecordV49` (see `migrate_to_version`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
203
// Legacy record format: same shape as `JournalRecordV47` except that `Mutation` carries a
// `MutationV46`.  Migrated forward to `JournalRecordV47` (see `migrate_to_version`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
215
// Legacy record format: same shape as `JournalRecordV46` except that `Mutation` carries a
// `MutationV43`.  Migrated forward to `JournalRecordV46` (see `migrate_to_version`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
227
// Legacy record format: same shape as `JournalRecordV43` except that `Mutation` carries a
// `MutationV41`.  Migrated forward to `JournalRecordV43`; it is produced from `JournalRecordV41`
// by the hand-written `From` impl in this file.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
238
// Legacy record format.  Unlike later versions, `DataChecksums` has no first-write flag, so this
// type has no `Migrate` derive; it is upgraded to `JournalRecordV42` by a hand-written `From`
// impl instead.
#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
248
249impl From<JournalRecordV41> for JournalRecordV42 {
250    fn from(record: JournalRecordV41) -> Self {
251        match record {
252            JournalRecordV41::EndBlock => Self::EndBlock,
253            JournalRecordV41::Mutation { object_id, mutation } => {
254                Self::Mutation { object_id, mutation: mutation.into() }
255            }
256            JournalRecordV41::Commit => Self::Commit,
257            JournalRecordV41::Discard(offset) => Self::Discard(offset),
258            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
259            JournalRecordV41::DataChecksums(range, sums) => {
260                // At the time of writing the only extents written by real systems are CoW extents
261                // so the new bool is always true.
262                Self::DataChecksums(range, sums, true)
263            }
264        }
265    }
266}
267
// Legacy record format: same shape as `JournalRecordV41` except that `Mutation` carries a
// `MutationV40`.  Migrated forward to `JournalRecordV41` (see `migrate_to_version`).
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
278
279pub(super) fn journal_handle_options() -> HandleOptions {
280    HandleOptions { skip_journal_checks: true, ..Default::default() }
281}
282
/// The journal records a stream of mutations that are to be applied to other objects.  At mount
/// time, these records can be replayed into memory.  It provides a way to quickly persist changes
/// without having to make a large number of writes; they can be deferred to a later time (e.g.
/// when a sufficient number have been queued).  It also provides support for transactions, the
/// ability to have mutations that are to be applied atomically together.
pub struct Journal {
    // Shared object manager; `replay` installs the root parent store and the allocator into it.
    objects: Arc<ObjectManager>,
    // Handle to the journal file.  Being a `OnceLock`, it can only be set once — presumably once
    // the journal object has been opened or created (TODO confirm at the setter).
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    // Loads super-blocks (used by `read_superblocks` and `replay`) and holds `next_instance`,
    // which `set_image_builder_mode` can override.
    super_block_manager: SuperBlockManager,
    // Mutable journal state; see `Inner`.
    inner: Mutex<Inner>,
    // NOTE(review): presumably serializes access to the journal writer — confirm at use sites.
    writer_mutex: Mutex<()>,
    // Async mutex (may be held across `.await`); presumably serializes sync operations — confirm.
    sync_mutex: futures::lock::Mutex<()>,
    // Whether journal tracing is enabled; toggled by `set_trace`.
    trace: AtomicBool,

    // This event is used when we are waiting for a compaction to free up journal space.
    reclaim_event: Event,
}
300
/// Mutable journal state, guarded by `Journal::inner`.
struct Inner {
    super_block_header: SuperBlockHeader,

    // The offset that we can zero the journal up to now that it is no longer needed.
    zero_offset: Option<u64>,

    // The journal offset that we most recently flushed to the device.
    device_flushed_offset: u64,

    // If true, indicates a DidFlushDevice record is pending.
    needs_did_flush_device: bool,

    // The writer for the journal.
    writer: JournalWriter,

    // Set when a reset is encountered during a read.
    // Used at write pre_commit() time to ensure we write a version first thing after a reset.
    output_reset_version: bool,

    // Waker for the flush task.
    flush_waker: Option<Waker>,

    // Indicates the journal has been terminated.
    terminate: bool,

    // Latched error indicating reason for journal termination if not graceful.
    terminate_reason: Option<Error>,

    // Disable compactions.
    disable_compactions: bool,

    // True if compactions are running.
    compaction_running: bool,

    // Waker for the sync task for when it's waiting for the flush task to finish.
    sync_waker: Option<Waker>,

    // The last offset we flushed to the journal file.
    flushed_offset: u64,

    // The last offset that should be considered valid in the journal file.  Most of the time, this
    // will be the same as `flushed_offset` but at mount time, this could be less and will only be
    // up to the end of the last valid transaction; it won't include transactions that follow that
    // have been discarded.
    valid_to: u64,

    // If, after replaying, we have to discard a number of mutations (because they don't validate),
    // this offset specifies where we need to discard back to.  This is so that when we next replay,
    // we ignore those mutations and continue with new good mutations.
    discard_offset: Option<u64>,

    // In the steady state, the journal should fluctuate between being approximately half of this
    // number and this number.  New super-blocks will be written every time about half of this
    // amount is written to the journal.
    reclaim_size: u64,

    // The super-block instance passed to `Journal::set_image_builder_mode`, or None (the initial
    // value).  Setting it to Some also overrides the super-block manager's `next_instance`.
    image_builder_mode: Option<SuperBlockInstance>,

    // If true and `needs_barrier`, issue a pre-barrier on the first device write of each journal
    // write (which happens in multiples of `BLOCK_SIZE`). This ensures that all the corresponding
    // data writes make it to disk before the journal gets written to.
    barriers_enabled: bool,

    // If true, indicates that data write requests have been made to the device since the last
    // journal write.
    needs_barrier: bool,

    // True if a compaction is being forced for reasons other than the journal being full.
    forced_compaction: bool,
}
371
372impl Inner {
373    fn terminate(&mut self, reason: Option<Error>) {
374        self.terminate = true;
375
376        if let Some(err) = reason {
377            error!(error:? = err; "Terminating journal");
378            // Log previous error if one was already set, otherwise latch the error.
379            if let Some(prev_err) = self.terminate_reason.as_ref() {
380                error!(error:? = prev_err; "Journal previously terminated");
381            } else {
382                self.terminate_reason = Some(err);
383            }
384        }
385
386        if let Some(waker) = self.flush_waker.take() {
387            waker.wake();
388        }
389        if let Some(waker) = self.sync_waker.take() {
390            waker.wake();
391        }
392    }
393}
394
/// Tunables supplied to `Journal::new`.
pub struct JournalOptions {
    /// In the steady state, the journal should fluctuate between being approximately half of this
    /// number and this number.  New super-blocks will be written every time about half of this
    /// amount is written to the journal.
    pub reclaim_size: u64,

    /// If true, issue a pre-barrier on the first device write of each journal write (which happens
    /// in multiples of `BLOCK_SIZE`). This ensures that all the corresponding data writes make it
    /// to disk before the journal gets written to.
    pub barriers_enabled: bool,
}
406
407impl Default for JournalOptions {
408    fn default() -> Self {
409        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
410    }
411}
412
// The result of reading transactions back from the journal.
struct JournaledTransactions {
    // Transactions in the order they were read.
    transactions: Vec<JournaledTransaction>,
    // Journal offset of the most recent device flush seen while reading (presumably — confirm in
    // `read_transactions`); `replay` seeds its `ChecksumList` with it.
    device_flushed_offset: u64,
}
417
/// A transaction read back from the journal, with its mutations grouped by the store they target.
#[derive(Debug, Default)]
pub struct JournaledTransaction {
    /// Journal checkpoint for this transaction.  During replay its `file_offset` identifies the
    /// transaction and is where replay stops if the transaction fails validation.
    pub checkpoint: JournalCheckpoint,
    /// Mutations for the root parent store.
    pub root_parent_mutations: Vec<Mutation>,
    /// Mutations for the root store.
    pub root_mutations: Vec<Mutation>,
    /// List of (store_object_id, mutation).
    pub non_root_mutations: Vec<(u64, Mutation)>,
    // NOTE(review): presumably the journal offset just past this transaction — confirm in
    // `read_transactions`.
    pub end_offset: u64,
    /// Data checksums recorded by this transaction; replay verifies them before trusting it.
    pub checksums: Vec<JournaledChecksums>,

    /// Records offset + 1 of the matching begin_flush transaction. The +1 is because we want to
    /// ignore the begin flush transaction; we don't need or want to replay it.
    pub end_flush: Option<(/* store_id: */ u64, /* begin offset: */ u64)>,

    /// The volume which was deleted in this transaction, if any.
    pub volume_deleted: Option</* store_id: */ u64>,
}
435
436impl JournaledTransaction {
437    fn new(checkpoint: JournalCheckpoint) -> Self {
438        Self { checkpoint, ..Default::default() }
439    }
440}
441
// Sentinel value (all ones).  NOTE(review): its consumers are not visible here — presumably it
// marks a deleted volume during replay; confirm at the use sites before relying on this.
const VOLUME_DELETED: u64 = u64::MAX;
443
/// Data checksums recorded by a journaled transaction for one device range (presumably read from
/// a `DataChecksums` record — confirm in `read_transactions`).
#[derive(Debug)]
pub struct JournaledChecksums {
    /// Device byte range the checksums cover.
    pub device_range: Range<u64>,
    /// The checksums; replay fails with "Malformed checksums" if they cannot be viewed via
    /// `maybe_as_ref`.
    pub checksums: Checksums,
    /// Whether this range is being written for the first time (see `JournalRecord::DataChecksums`).
    pub first_write: bool,
}
450
/// Handles for journal-like objects have some additional functionality to manage their extents,
/// since during replay we need to add extents as we find them.
pub trait JournalHandle: ReadObjectHandle {
    /// The end offset of the last extent in the JournalHandle.  Used only for validating extents
    /// (which will be skipped if None is returned).
    /// Note this is equivalent in value to ReadObjectHandle::get_size, when present.
    fn end_offset(&self) -> Option<u64>;

    /// Adds an extent to the current end of the journal stream.
    /// `added_offset` is the offset into the journal of the transaction which added this extent,
    /// used for discard_extents.  `device_range` is the range of device bytes backing the extent.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);

    /// Discards all extents which were added in a transaction at offset >= `discard_offset`.
    fn discard_extents(&mut self, discard_offset: u64);
}
465
466// Provide a stub implementation for DataObjectHandle so we can use it in
467// Journal::read_transactions.  Manual extent management is a NOP (which is OK since presumably the
468// DataObjectHandle already knows where its extents live).
469impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
470    fn end_offset(&self) -> Option<u64> {
471        None
472    }
473    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
474        // NOP
475    }
476    fn discard_extents(&mut self, _discard_offset: u64) {
477        // NOP
478    }
479}
480
481#[fxfs_trace::trace]
482impl Journal {
483    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
484        let starting_checksum = rand::random_range(1..u64::MAX);
485        Journal {
486            objects: objects,
487            handle: OnceLock::new(),
488            super_block_manager: SuperBlockManager::new(),
489            inner: Mutex::new(Inner {
490                super_block_header: SuperBlockHeader::default(),
491                zero_offset: None,
492                device_flushed_offset: 0,
493                needs_did_flush_device: false,
494                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
495                output_reset_version: false,
496                flush_waker: None,
497                terminate: false,
498                terminate_reason: None,
499                disable_compactions: false,
500                compaction_running: false,
501                sync_waker: None,
502                flushed_offset: 0,
503                valid_to: 0,
504                discard_offset: None,
505                reclaim_size: options.reclaim_size,
506                image_builder_mode: None,
507                barriers_enabled: options.barriers_enabled,
508                needs_barrier: false,
509                forced_compaction: false,
510            }),
511            writer_mutex: Mutex::new(()),
512            sync_mutex: futures::lock::Mutex::new(()),
513            trace: AtomicBool::new(false),
514            reclaim_event: Event::new(),
515        }
516    }
517
518    pub fn set_trace(&self, trace: bool) {
519        let old_value = self.trace.swap(trace, Ordering::Relaxed);
520        if trace != old_value {
521            info!(trace; "J: trace");
522        }
523    }
524
525    pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
526        self.inner.lock().image_builder_mode = mode;
527        if let Some(instance) = mode {
528            *self.super_block_manager.next_instance.lock() = instance;
529        }
530    }
531
532    pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
533        self.inner.lock().image_builder_mode
534    }
535
536    #[cfg(feature = "migration")]
537    pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
538        ensure!(
539            self.inner.lock().image_builder_mode.is_some(),
540            "Can only set filesystem uuid in image builder mode."
541        );
542        self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
543        Ok(())
544    }
545
546    pub(crate) async fn read_superblocks(
547        &self,
548        device: Arc<dyn Device>,
549        block_size: u64,
550    ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
551        self.super_block_manager.load(device, block_size).await
552    }
553
554    /// Used during replay to validate a mutation.  This should return false if the mutation is not
555    /// valid and should not be applied.  This could be for benign reasons: e.g. the device flushed
556    /// data out-of-order, or because of a malicious actor.
557    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
558        match mutation {
559            Mutation::ObjectStore(ObjectStoreMutation {
560                item:
561                    Item {
562                        key:
563                            ObjectKey {
564                                data:
565                                    ObjectKeyData::Attribute(
566                                        _,
567                                        AttributeKey::Extent(ExtentKey { range }),
568                                    ),
569                                ..
570                            },
571                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
572                        ..
573                    },
574                ..
575            }) => {
576                if range.is_empty() || !range.is_aligned(block_size) {
577                    return false;
578                }
579                let len = range.length().unwrap();
580                if let ExtentMode::Cow(checksums) = mode {
581                    if checksums.len() > 0 {
582                        if len % checksums.len() as u64 != 0 {
583                            return false;
584                        }
585                        if (len / checksums.len() as u64) % block_size != 0 {
586                            return false;
587                        }
588                    }
589                }
590                if *device_offset % block_size != 0
591                    || *device_offset >= device_size
592                    || device_size - *device_offset < len
593                {
594                    return false;
595                }
596            }
597            Mutation::ObjectStore(_) => {}
598            Mutation::EncryptedObjectStore(_) => {}
599            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
600                return !device_range.is_empty()
601                    && *owner_object_id != INVALID_OBJECT_ID
602                    && device_range.end <= device_size;
603            }
604            Mutation::Allocator(AllocatorMutation::Deallocate {
605                device_range,
606                owner_object_id,
607            }) => {
608                return !device_range.is_empty()
609                    && *owner_object_id != INVALID_OBJECT_ID
610                    && device_range.end <= device_size;
611            }
612            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
613                return *owner_object_id != INVALID_OBJECT_ID;
614            }
615            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
616                return *owner_object_id != INVALID_OBJECT_ID;
617            }
618            Mutation::BeginFlush => {}
619            Mutation::EndFlush => {}
620            Mutation::DeleteVolume => {}
621            Mutation::UpdateBorrowed(_) => {}
622            Mutation::UpdateMutationsKey(_) => {}
623            Mutation::CreateInternalDir(owner_object_id) => {
624                return *owner_object_id != INVALID_OBJECT_ID;
625            }
626        }
627        true
628    }
629
630    // Assumes that `mutation` has been validated.
631    fn update_checksum_list(
632        &self,
633        journal_offset: u64,
634        mutation: &Mutation,
635        checksum_list: &mut ChecksumList,
636    ) -> Result<(), Error> {
637        match mutation {
638            Mutation::ObjectStore(_) => {}
639            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
640                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
641            }
642            _ => {}
643        }
644        Ok(())
645    }
646
647    /// Reads the latest super-block, and then replays journaled records.
648    #[trace]
649    pub async fn replay(
650        &self,
651        filesystem: Arc<FxFilesystem>,
652        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
653    ) -> Result<(), Error> {
654        let block_size = filesystem.block_size();
655
656        let (super_block, root_parent) =
657            self.super_block_manager.load(filesystem.device(), block_size).await?;
658
659        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));
660
661        self.objects.set_root_parent_store(root_parent.clone());
662        let allocator =
663            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
664        if let Some(on_new_allocator) = on_new_allocator {
665            on_new_allocator(allocator.clone());
666        }
667        self.objects.set_allocator(allocator.clone());
668        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
669        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
670        {
671            let mut inner = self.inner.lock();
672            inner.super_block_header = super_block.clone();
673        }
674
675        let device = filesystem.device();
676
677        let mut handle;
678        {
679            let root_parent_layer = root_parent.tree().mutable_layer();
680            let mut iter = root_parent_layer
681                .seek(Bound::Included(&ObjectKey::attribute(
682                    super_block.journal_object_id,
683                    DEFAULT_DATA_ATTRIBUTE_ID,
684                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
685                        super_block.journal_checkpoint.file_offset,
686                        BLOCK_SIZE,
687                    ))),
688                )))
689                .await
690                .context("Failed to seek root parent store")?;
691            let start_offset = if let Some(ItemRef {
692                key:
693                    ObjectKey {
694                        data:
695                            ObjectKeyData::Attribute(
696                                DEFAULT_DATA_ATTRIBUTE_ID,
697                                AttributeKey::Extent(ExtentKey { range }),
698                            ),
699                        ..
700                    },
701                ..
702            }) = iter.get()
703            {
704                range.start
705            } else {
706                0
707            };
708            handle = BootstrapObjectHandle::new_with_start_offset(
709                super_block.journal_object_id,
710                device.clone(),
711                start_offset,
712            );
713            while let Some(item) = iter.get() {
714                if !match item.into() {
715                    Some((
716                        object_id,
717                        DEFAULT_DATA_ATTRIBUTE_ID,
718                        ExtentKey { range },
719                        ExtentValue::Some { device_offset, .. },
720                    )) if object_id == super_block.journal_object_id => {
721                        if let Some(end_offset) = handle.end_offset() {
722                            if range.start != end_offset {
723                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
724                                    "Unexpected journal extent {:?}, expected start: {}",
725                                    item, end_offset
726                                )));
727                            }
728                        }
729                        handle.push_extent(
730                            0, // We never discard extents from the root parent store.
731                            *device_offset
732                                ..*device_offset + range.length().context("Invalid extent")?,
733                        );
734                        true
735                    }
736                    _ => false,
737                } {
738                    break;
739                }
740                iter.advance().await.context("Failed to advance root parent store iterator")?;
741            }
742        }
743
744        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
745        let JournaledTransactions { mut transactions, device_flushed_offset } = self
746            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
747            .await
748            .context("Reading transactions for replay")?;
749
750        // Validate all the mutations.
751        let mut checksum_list = ChecksumList::new(device_flushed_offset);
752        let mut valid_to = reader.journal_file_checkpoint().file_offset;
753        let device_size = device.size();
754        'bad_replay: for JournaledTransaction {
755            checkpoint,
756            root_parent_mutations,
757            root_mutations,
758            non_root_mutations,
759            checksums,
760            ..
761        } in &transactions
762        {
763            for JournaledChecksums { device_range, checksums, first_write } in checksums {
764                checksum_list
765                    .push(
766                        checkpoint.file_offset,
767                        device_range.clone(),
768                        checksums.maybe_as_ref().context("Malformed checksums")?,
769                        *first_write,
770                    )
771                    .context("Pushing journal checksum records to checksum list")?;
772            }
773            for mutation in root_parent_mutations
774                .iter()
775                .chain(root_mutations)
776                .chain(non_root_mutations.iter().map(|(_, m)| m))
777            {
778                if !self.validate_mutation(mutation, block_size, device_size) {
779                    info!(mutation:?; "Stopping replay at bad mutation");
780                    valid_to = checkpoint.file_offset;
781                    break 'bad_replay;
782                }
783                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
784            }
785        }
786
787        // Validate the checksums. Note if barriers are enabled, there will be no checksums in
788        // practice to verify.
789        let valid_to = checksum_list
790            .verify(device.as_ref(), valid_to)
791            .await
792            .context("Failed to validate checksums")?;
793
794        // Apply the mutations...
795
796        let mut last_checkpoint = reader.journal_file_checkpoint();
797        let mut journal_offsets = super_block.journal_file_offsets.clone();
798
799        // Start with the root-parent mutations, and also determine the journal offsets for all
800        // other objects.
801        for (
802            index,
803            JournaledTransaction {
804                checkpoint,
805                root_parent_mutations,
806                end_flush,
807                volume_deleted,
808                ..
809            },
810        ) in transactions.iter_mut().enumerate()
811        {
812            if checkpoint.file_offset >= valid_to {
813                last_checkpoint = checkpoint.clone();
814
815                // Truncate the transactions so we don't need to worry about them on the next pass.
816                transactions.truncate(index);
817                break;
818            }
819
820            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
821            for mutation in root_parent_mutations.drain(..) {
822                self.objects
823                    .apply_mutation(
824                        super_block.root_parent_store_object_id,
825                        mutation,
826                        &context,
827                        AssocObj::None,
828                    )
829                    .context("Failed to replay root parent store mutations")?;
830            }
831
832            if let Some((object_id, journal_offset)) = end_flush {
833                journal_offsets.insert(*object_id, *journal_offset);
834            }
835
836            if let Some(object_id) = volume_deleted {
837                journal_offsets.insert(*object_id, VOLUME_DELETED);
838            }
839        }
840
841        // Now we can open the root store.
842        let root_store = ObjectStore::open(
843            &root_parent,
844            super_block.root_store_object_id,
845            Box::new(NullCache {}),
846        )
847        .await
848        .context("Unable to open root store")?;
849
850        ensure!(
851            !root_store.is_encrypted(),
852            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
853        );
854        self.objects.set_root_store(root_store);
855
856        let root_store_offset =
857            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);
858
859        // Now replay the root store mutations.
860        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
861            if checkpoint.file_offset < root_store_offset {
862                continue;
863            }
864
865            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
866            for mutation in root_mutations.drain(..) {
867                self.objects
868                    .apply_mutation(
869                        super_block.root_store_object_id,
870                        mutation,
871                        &context,
872                        AssocObj::None,
873                    )
874                    .context("Failed to replay root store mutations")?;
875            }
876        }
877
878        // Now we can open the allocator.
879        allocator.open().await.context("Failed to open allocator")?;
880
881        // Now replay all other mutations.
882        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
883        {
884            self.objects
885                .replay_mutations(
886                    non_root_mutations,
887                    &journal_offsets,
888                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
889                    end_offset,
890                )
891                .await
892                .context("Failed to replay mutations")?;
893        }
894
895        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;
896
897        let discarded_to =
898            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
899                Some(reader.journal_file_checkpoint().file_offset)
900            } else {
901                None
902            };
903
904        // Configure the journal writer so that we can continue.
905        {
906            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
907                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
908                    "journal replay cut short; journal finishes at {}, but super-block was \
909                     written at {}",
910                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
911                )));
912            }
913            let handle = ObjectStore::open_object(
914                &root_parent,
915                super_block.journal_object_id,
916                journal_handle_options(),
917                None,
918            )
919            .await
920            .with_context(|| {
921                format!(
922                    "Failed to open journal file (object id: {})",
923                    super_block.journal_object_id
924                )
925            })?;
926            let _ = self.handle.set(handle);
927            let mut inner = self.inner.lock();
928            reader.skip_to_end_of_block();
929            let mut writer_checkpoint = reader.journal_file_checkpoint();
930
931            // Make sure we don't accidentally use the reader from now onwards.
932            std::mem::drop(reader);
933
934            // Reset the stream to indicate that we've remounted the journal.
935            writer_checkpoint.checksum ^= RESET_XOR;
936            writer_checkpoint.version = LATEST_VERSION;
937            inner.flushed_offset = writer_checkpoint.file_offset;
938
939            // When we open the filesystem as writable, we flush the device.
940            inner.device_flushed_offset = inner.flushed_offset;
941
942            inner.writer.seek(writer_checkpoint);
943            inner.output_reset_version = true;
944            inner.valid_to = last_checkpoint.file_offset;
945            if last_checkpoint.file_offset < inner.flushed_offset {
946                inner.discard_offset = Some(last_checkpoint.file_offset);
947            }
948        }
949
950        self.objects
951            .on_replay_complete()
952            .await
953            .context("Failed to complete replay for object manager")?;
954
955        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
956        Ok(())
957    }
958
    /// Reads journal records from `reader`, starting at the reader's current checkpoint, and groups
    /// them into committed transactions.
    ///
    /// Reading stops when the reader reports a checksum mismatch (the expected end of the journal
    /// stream) or, if `end_offset` is `Some`, once the reader's file offset reaches `end_offset`.
    /// A transaction that has not seen a `Commit` record by then is dropped.
    ///
    /// `object_id_filter`: when it is `INVALID_OBJECT_ID`, mutations for every object are kept;
    /// otherwise only mutations for that object are kept.  Mutations are also dropped when
    /// `should_apply` returns false for the object at the transaction's checkpoint.  The
    /// transaction itself, its data checksums, and the BeginFlush/EndFlush/DeleteVolume bookkeeping
    /// are recorded regardless of the filter.
    ///
    /// Returns the committed transactions together with `device_flushed_offset`, which starts at
    /// the super-block's journal file offset and is only ever raised, by `Reset` and
    /// `DidFlushDevice` records.
    ///
    /// # Errors
    ///
    /// Fails if a record cannot be deserialized.  Fails with `FxfsError::Inconsistent` on any of:
    /// - mutations for a store that was deleted earlier in this read;
    /// - conflicting `EndFlush`/`DeleteVolume` mutations within a single transaction;
    /// - a `Discard` of offset 0;
    /// - a journal extent that does not start where the previous one ended.
    async fn read_transactions(
        &self,
        reader: &mut JournalReader,
        end_offset: Option<u64>,
        object_id_filter: u64,
    ) -> Result<JournaledTransactions, Error> {
        let mut transactions = Vec::new();
        // Copy the needed super-block fields out in a nested scope so that the `inner` lock is
        // released before the read loop below (which awaits).
        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
            let super_block = &self.inner.lock().super_block_header;
            (
                super_block.super_block_journal_file_offset,
                super_block.root_parent_store_object_id,
                super_block.root_store_object_id,
            )
        };
        // The in-progress (not yet committed) transaction; it always refers to the last entry of
        // `transactions` when set.
        let mut current_transaction = None;
        // object id -> file offset of the transaction that contained its BeginFlush.
        let mut begin_flush_offsets = HashMap::default();
        // Stores for which a DeleteVolume mutation has been seen during this read.
        let mut stores_deleted = HashSet::new();
        loop {
            // Cache the checkpoint before we deserialize a record.
            let checkpoint = reader.journal_file_checkpoint();
            if let Some(end_offset) = end_offset {
                if checkpoint.file_offset >= end_offset {
                    break;
                }
            }
            let result =
                reader.deserialize().await.context("Failed to deserialize journal record")?;
            match result {
                // A reset abandons any in-progress transaction, and the offset following it is
                // counted as device-flushed.
                ReadResult::Reset(_) => {
                    if current_transaction.is_some() {
                        current_transaction = None;
                        transactions.pop();
                    }
                    let offset = reader.journal_file_checkpoint().file_offset;
                    if offset > device_flushed_offset {
                        device_flushed_offset = offset;
                    }
                }
                ReadResult::Some(record) => {
                    match record {
                        JournalRecord::EndBlock => {
                            reader.skip_to_end_of_block();
                        }
                        JournalRecord::Mutation { object_id, mutation } => {
                            // Start a new transaction at this record's checkpoint if none is in
                            // progress.
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };

                            if stores_deleted.contains(&object_id) {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Encountered mutations for deleted store")
                                );
                            }

                            match &mutation {
                                Mutation::BeginFlush => {
                                    begin_flush_offsets.insert(
                                        object_id,
                                        current_transaction.checkpoint.file_offset,
                                    );
                                }
                                // An EndFlush whose BeginFlush was not seen in this read is
                                // ignored.
                                Mutation::EndFlush => {
                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
                                        if let Some(deleted_volume) =
                                            &current_transaction.volume_deleted
                                        {
                                            if *deleted_volume == object_id {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                                ));
                                            }
                                        }
                                        // The +1 is because we don't want to replay the transaction
                                        // containing the begin flush; we don't need or want to
                                        // replay it.
                                        if current_transaction
                                            .end_flush
                                            .replace((object_id, offset + 1))
                                            .is_some()
                                        {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush mutations in a \
                                                 single transaction"
                                            ));
                                        }
                                    }
                                }
                                Mutation::DeleteVolume => {
                                    if let Some((flushed_object, _)) =
                                        &current_transaction.end_flush
                                    {
                                        if *flushed_object == object_id {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                            ));
                                        }
                                    }
                                    if current_transaction
                                        .volume_deleted
                                        .replace(object_id)
                                        .is_some()
                                    {
                                        bail!(anyhow!(FxfsError::Inconsistent).context(
                                            "Multiple DeleteVolume mutations in a single \
                                             transaction"
                                        ));
                                    }
                                    // Any later mutation for this store is an inconsistency.
                                    stores_deleted.insert(object_id);
                                }
                                _ => {}
                            }

                            // If this mutation doesn't need to be applied, don't bother adding it
                            // to the transaction.
                            if (object_id_filter == INVALID_OBJECT_ID
                                || object_id_filter == object_id)
                                && self.should_apply(object_id, &current_transaction.checkpoint)
                            {
                                // Bucket by store so the caller can apply the root parent store,
                                // the root store, and everything else in separate passes.
                                if object_id == root_parent_store_object_id {
                                    current_transaction.root_parent_mutations.push(mutation);
                                } else if object_id == root_store_object_id {
                                    current_transaction.root_mutations.push(mutation);
                                } else {
                                    current_transaction
                                        .non_root_mutations
                                        .push((object_id, mutation));
                                }
                            }
                        }
                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
                            // Checksums belong to the in-progress transaction, starting one if
                            // needed.
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };
                            current_transaction.checksums.push(JournaledChecksums {
                                device_range,
                                checksums,
                                first_write,
                            });
                        }
                        // Commit closes the in-progress transaction (if any); it stays in
                        // `transactions`.
                        JournalRecord::Commit => {
                            if let Some(&mut JournaledTransaction {
                                ref checkpoint,
                                ref root_parent_mutations,
                                ref mut end_offset,
                                ..
                            }) = current_transaction.take()
                            {
                                for mutation in root_parent_mutations {
                                    // Snoop the mutations for any that might apply to the journal
                                    // file so that we can pass them to the reader so that it can
                                    // read the journal file.
                                    if let Mutation::ObjectStore(ObjectStoreMutation {
                                        item:
                                            Item {
                                                key:
                                                    ObjectKey {
                                                        object_id,
                                                        data:
                                                            ObjectKeyData::Attribute(
                                                                DEFAULT_DATA_ATTRIBUTE_ID,
                                                                AttributeKey::Extent(ExtentKey {
                                                                    range,
                                                                }),
                                                            ),
                                                        ..
                                                    },
                                                value:
                                                    ObjectValue::Extent(ExtentValue::Some {
                                                        device_offset,
                                                        ..
                                                    }),
                                                ..
                                            },
                                        ..
                                    }) = mutation
                                    {
                                        // Add the journal extents we find on the way to our
                                        // reader.
                                        let handle = reader.handle();
                                        if *object_id != handle.object_id() {
                                            continue;
                                        }
                                        // Journal extents must be contiguous in file offset.
                                        if let Some(end_offset) = handle.end_offset() {
                                            if range.start != end_offset {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    format!(
                                                        "Unexpected journal extent {:?} -> {}, \
                                                           expected start: {}",
                                                        range, device_offset, end_offset,
                                                    )
                                                ));
                                            }
                                        }
                                        handle.push_extent(
                                            checkpoint.file_offset,
                                            *device_offset
                                                ..*device_offset
                                                    + range.length().context("Invalid extent")?,
                                        );
                                    }
                                }
                                // The committed transaction ends just after its Commit record.
                                *end_offset = reader.journal_file_checkpoint().file_offset;
                            }
                        }
                        // Discard(offset) drops every transaction that starts at or after
                        // `offset` and discards the reader's journal extents from `offset`.
                        JournalRecord::Discard(offset) => {
                            if offset == 0 {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Invalid offset for Discard")
                                );
                            }
                            // If the in-progress transaction began before `offset`, the record is
                            // ignored entirely (this `continue` restarts the outer read loop).
                            if let Some(transaction) = current_transaction.as_ref() {
                                if transaction.checkpoint.file_offset < offset {
                                    // Odd, but OK.
                                    continue;
                                }
                            }
                            current_transaction = None;
                            while let Some(transaction) = transactions.last() {
                                if transaction.checkpoint.file_offset < offset {
                                    break;
                                }
                                transactions.pop();
                            }
                            reader.handle().discard_extents(offset);
                        }
                        JournalRecord::DidFlushDevice(offset) => {
                            if offset > device_flushed_offset {
                                device_flushed_offset = offset;
                            }
                        }
                    }
                }
                // This is expected when we reach the end of the journal stream.
                ReadResult::ChecksumMismatch => break,
            }
        }

        // Discard any uncommitted transaction.
        if current_transaction.is_some() {
            transactions.pop();
        }

        Ok(JournaledTransactions { transactions, device_flushed_offset })
    }
1218
    /// Creates an empty filesystem with the minimum viable objects (including a root parent and
    /// root store but no further child stores).
    ///
    /// In a single transaction this creates:
    /// - the root store (a child of the root parent store) and the allocator;
    /// - super-block objects A and B in the root store;
    /// - the journal object in the root parent store, preallocated unless in image_builder_mode;
    /// - the root store's object info and the root parent's graveyard.
    ///
    /// After committing, it fills in the in-memory super-block header and installs the journal
    /// handle.  In image_builder_mode, any existing super-block on the device is read first so that
    /// the new generation is one higher than the existing one.
    ///
    /// # Errors
    ///
    /// Propagates failures from creating or committing the transaction or any of the objects.
    /// Also fails if preallocating the journal yields less than `chunk_size()` bytes.
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        // The following constants are only used at format time. When mounting, the recorded values
        // in the superblock should be used.  The root parent store does not have a parent, but
        // needs an object ID to be registered with ObjectManager, so it cannot collide (i.e. have
        // the same object ID) with any objects in the root store that use the journal to track
        // mutations.
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        // The journal checkpoint recorded in the new super-block header: the writer's current
        // position, stamped with the latest version.
        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        let mut current_generation = 1;
        if filesystem.options().image_builder_mode.is_some() {
            // Note that in non-image_builder_mode we write both superblocks when we format
            // (in FxFilesystemBuilder::open). In image_builder_mode we only write once at the end
            // as part of finalize(), which is why we must make sure the generation we write is
            // newer than any existing generation.

            // Note: This is the *filesystem* block size, not the device block size which
            // is currently always 4096 (https://fxbug.dev/42063349)
            let block_size = filesystem.block_size();
            match self.read_superblocks(filesystem.device(), block_size).await {
                Ok((super_block, _)) => {
                    log::info!(
                        "Found existing superblock with generation {}. Bumping by 1.",
                        super_block.generation
                    );
                    current_generation = super_block.generation.wrapping_add(1);
                }
                Err(_) => {
                    // TODO(https://fxbug.dev/463757813): It's not unusual to fail to read
                    // superblocks when we're formatting a new filesystem but we should probably
                    // fail the format if we get an IO error.
                }
            }
        }

        // The root parent store is created without a parent store of its own.
        let root_parent = ObjectStore::new_empty(
            None,
            INIT_ROOT_PARENT_STORE_OBJECT_ID,
            filesystem.clone(),
            Box::new(NullCache {}),
        );
        self.objects.set_root_parent_store(root_parent.clone());

        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
        self.objects.set_allocator(allocator.clone());
        self.objects.init_metadata_reservation()?;

        // Everything from here until `transaction.commit()` is created in this one transaction.
        let journal_handle;
        let super_block_a_handle;
        let super_block_b_handle;
        let root_store;
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        root_store = root_parent
            .new_child_store(
                &mut transaction,
                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
                Box::new(NullCache {}),
            )
            .await
            .context("new_child_store")?;
        self.objects.set_root_store(root_store.clone());

        allocator.create(&mut transaction).await?;

        // Create the super-block objects...
        // Each uses the fixed object id of its SuperBlockInstance, and the root store's last object
        // id is bumped past it.
        super_block_a_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::A.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
        super_block_a_handle
            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
            .await
            .context("extend super block")?;
        super_block_b_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::B.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
        super_block_b_handle
            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
            .await
            .context("extend super block")?;

        // the journal object...
        journal_handle = ObjectStore::create_object(
            &root_parent,
            &mut transaction,
            journal_handle_options(),
            None,
        )
        .await
        .context("create journal")?;
        // In image_builder_mode the journal is not preallocated here; `allocate_journal` does it
        // later.
        if self.inner.lock().image_builder_mode.is_none() {
            let mut file_range = 0..self.chunk_size();
            journal_handle
                .preallocate_range(&mut transaction, &mut file_range)
                .await
                .context("preallocate journal")?;
            // Any part of the requested range left over means we got less than a full chunk.
            if file_range.start < file_range.end {
                bail!("preallocate_range returned too little space");
            }
        }

        // Write the root store object info.
        root_store.create(&mut transaction).await?;

        // The root parent graveyard.
        root_parent.set_graveyard_directory_object_id(
            Graveyard::create(&mut transaction, &root_parent).await?,
        );

        transaction.commit().await?;

        // Record the ids of everything created above in the in-memory super-block header.
        self.inner.lock().super_block_header = SuperBlockHeader::new(
            current_generation,
            root_parent.store_object_id(),
            root_parent.graveyard_directory_object_id(),
            root_store.store_object_id(),
            allocator.object_id(),
            journal_handle.object_id(),
            checkpoint,
            /* earliest_version: */ LATEST_VERSION,
        );

        // Initialize the journal writer by installing the journal handle.  The result of `set` is
        // ignored; presumably an already-installed handle is simply kept (NOTE(review): confirm).
        let _ = self.handle.set(journal_handle);
        Ok(())
    }
1372
1373    /// Normally we allocate the journal when creating the filesystem.
1374    /// This is used image_builder_mode when journal allocation is done last.
1375    pub async fn allocate_journal(&self) -> Result<(), Error> {
1376        let handle = self.handle.get().unwrap();
1377        let filesystem = handle.store().filesystem();
1378        let mut transaction = filesystem
1379            .clone()
1380            .new_transaction(
1381                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1382                Options { skip_journal_checks: true, ..Default::default() },
1383            )
1384            .await?;
1385        let mut file_range = 0..self.chunk_size();
1386        self.handle
1387            .get()
1388            .unwrap()
1389            .preallocate_range(&mut transaction, &mut file_range)
1390            .await
1391            .context("preallocate journal")?;
1392        if file_range.start < file_range.end {
1393            bail!("preallocate_range returned too little space");
1394        }
1395        transaction.commit().await?;
1396        Ok(())
1397    }
1398
1399    pub async fn init_superblocks(&self) -> Result<(), Error> {
1400        // Overwrite both superblocks.
1401        for _ in 0..2 {
1402            self.write_super_block().await?;
1403        }
1404        Ok(())
1405    }
1406
1407    /// Takes a snapshot of all journaled transactions which affect |object_id| since its last
1408    /// flush.
1409    /// The caller is responsible for locking; it must ensure that the journal is not trimmed during
1410    /// this call.  For example, a Flush lock could be held on the object in question (assuming that
1411    /// object has data to flush and is registered with ObjectManager).
1412    pub async fn read_transactions_for_object(
1413        &self,
1414        object_id: u64,
1415    ) -> Result<Vec<JournaledTransaction>, Error> {
1416        let handle = self.handle.get().expect("No journal handle");
1417        // Reopen the handle since JournalReader needs an owned handle.
1418        let handle = ObjectStore::open_object(
1419            handle.owner(),
1420            handle.object_id(),
1421            journal_handle_options(),
1422            None,
1423        )
1424        .await?;
1425
1426        let checkpoint = match self.objects.journal_checkpoint(object_id) {
1427            Some(checkpoint) => checkpoint,
1428            None => return Ok(vec![]),
1429        };
1430        let mut reader = JournalReader::new(handle, &checkpoint);
1431        // Record the current end offset and only read to there, so we don't accidentally read any
1432        // partially flushed blocks.
1433        let end_offset = self.inner.lock().valid_to;
1434        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1435    }
1436
1437    /// Commits a transaction.  This is not thread safe; the caller must take appropriate locks.
1438    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1439        if transaction.is_empty() {
1440            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1441        }
1442
1443        self.pre_commit(transaction).await?;
1444        Ok(self.write_and_apply_mutations(transaction))
1445    }
1446
    /// Prepares the journal for committing `transaction`.
    ///
    /// Before we commit, we might need to extend the journal or write pending records to the
    /// journal. Under the `inner` lock this writes:
    /// - the journal version, if this is the first write after a RESET;
    /// - a pending `Discard` record, if any;
    /// - a pending `DidFlushDevice` record, if `flush_device` queued one.
    ///
    /// Then, if a journal handle exists, it may build an internal transaction. That transaction
    /// preallocates another `chunk_size()` bytes at the end of the journal file when the current
    /// write offset plus one chunk would exceed the file size. It also zeroes `0..zero_offset`
    /// when `write_super_block` has requested that. The internal transaction is applied directly
    /// rather than through the normal commit path.
    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
        let handle;

        let (size, zero_offset) = {
            let mut inner = self.inner.lock();

            // If this is the first write after a RESET, we need to output version first.
            if std::mem::take(&mut inner.output_reset_version) {
                LATEST_VERSION.serialize_into(&mut inner.writer)?;
            }

            if let Some(discard_offset) = inner.discard_offset {
                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
                inner.discard_offset = None;
            }

            // `flush_device` only queues this record; it is written here with the next
            // transaction.
            if inner.needs_did_flush_device {
                let offset = inner.device_flushed_offset;
                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
                inner.needs_did_flush_device = false;
            }

            // Without a journal handle there is nothing to extend or zero.
            handle = match self.handle.get() {
                None => return Ok(()),
                Some(x) => x,
            };

            let file_offset = inner.writer.journal_file_checkpoint().file_offset;

            // `size` is `Some(current file size)` when there is less than one chunk of room
            // past the current write offset, i.e. when the journal must be extended.
            let size = handle.get_size();
            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };

            // Fast path: nothing to extend, nothing to zero, and `needs_borrow_for_journal`
            // reports false, so no internal transaction is needed.
            if size.is_none()
                && inner.zero_offset.is_none()
                && !self.objects.needs_borrow_for_journal(file_offset)
            {
                return Ok(());
            }

            (size, inner.zero_offset)
        };

        // The internal transaction borrows metadata space, draws on the metadata reservation, and
        // reuses the caller's transaction guard.
        let mut transaction = handle
            .new_transaction_with_options(Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.objects.metadata_reservation()),
                txn_guard: Some(transaction.txn_guard()),
                ..Default::default()
            })
            .await?;
        if let Some(size) = size {
            handle
                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
                .await?;
        }
        if let Some(zero_offset) = zero_offset {
            handle.zero(&mut transaction, 0..zero_offset).await?;
        }

        // We can't use regular transaction commit, because that can cause re-entrancy issues, so
        // instead we just apply the transaction directly here.
        self.write_and_apply_mutations(&mut transaction);

        let mut inner = self.inner.lock();

        // Make sure the transaction to extend the journal made it to the journal within the old
        // size, since otherwise, it won't be possible to replay.
        if let Some(size) = size {
            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
        }

        // The `inner` lock was released while the transaction above was built, so
        // `write_super_block` may have installed a newer `zero_offset` in the meantime. Only clear
        // it if it is still the value we just zeroed.
        if inner.zero_offset == zero_offset {
            inner.zero_offset = None;
        }

        Ok(())
    }
1527
1528    // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
1529    // all records should be applied because the object store or allocator might already contain the
1530    // mutation.  After replay, that obviously isn't the case and we want to apply all mutations.
1531    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1532        let super_block_header = &self.inner.lock().super_block_header;
1533        let offset = super_block_header
1534            .journal_file_offsets
1535            .get(&object_id)
1536            .cloned()
1537            .unwrap_or(super_block_header.super_block_journal_file_offset);
1538        journal_file_checkpoint.file_offset >= offset
1539    }
1540
    /// Flushes previous writes to the device and then writes out a new super-block.
    /// Callers must ensure that we do not make concurrent calls.
    ///
    /// The new header copies the current one, then:
    /// - bumps the generation (wrapping);
    /// - records the pre-padding journal checkpoint as `super_block_journal_file_offset`;
    /// - sets `journal_checkpoint` to the minimum per-object checkpoint, or to that same
    ///   checkpoint if there is none;
    /// - records the per-object journal offsets and the borrowed metadata space.
    ///
    /// After a successful save, `zero_offset` is set so that the next `pre_commit` zeroes the
    /// journal up to the block containing the previous super-block's journal checkpoint.
    async fn write_super_block(&self) -> Result<(), Error> {
        let root_parent_store = self.objects.root_parent_store();

        // We need to flush previous writes to the device since the new super-block we are writing
        // relies on written data being observable, and we also need to lock the root parent store
        // so that no new entries are written to it whilst we are writing the super-block, and for
        // that we use the write lock.
        let old_layers;
        let old_super_block_offset;
        let mut new_super_block_header;
        let checkpoint;
        let borrowed;

        {
            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
            {
                // Padding and the root-parent compaction happen together under the writer lock.
                // `old_layers` is whatever `compact_root_parent` returns; it is handed to the
                // super-block manager below.
                let _write_guard = self.writer_mutex.lock();
                (checkpoint, borrowed) = self.pad_to_block()?;
                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
            }
            self.flush_device(checkpoint.file_offset)
                .await
                .context("flush failed when writing superblock")?;
        }

        new_super_block_header = self.inner.lock().super_block_header.clone();

        // Remember where the previous super-block's replay started; used for `zero_offset` below.
        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;

        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();

        new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
        new_super_block_header.journal_file_offsets = journal_file_offsets;
        new_super_block_header.borrowed_metadata_space = borrowed;

        self.super_block_manager
            .save(
                new_super_block_header.clone(),
                self.objects.root_parent_store().filesystem(),
                old_layers,
            )
            .await?;
        {
            // Presumably journal data before the old super-block's checkpoint is no longer needed
            // by either super-block copy, so ask `pre_commit` to zero it (TODO confirm).
            let mut inner = self.inner.lock();
            inner.super_block_header = new_super_block_header;
            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
        }

        Ok(())
    }
1596
1597    /// Flushes any buffered journal data to the device.  Note that this does not flush the device
1598    /// unless the flush_device option is set, in which case data should have been persisted to
1599    /// lower layers.  If a precondition is supplied, it is evaluated and the sync will be skipped
1600    /// if it returns false.  This allows callers to check a condition whilst a lock is held.  If a
1601    /// sync is performed, this function returns the checkpoint that was flushed and the amount of
1602    /// borrowed metadata space at the point it was flushed.
1603    pub async fn sync(
1604        &self,
1605        options: SyncOptions<'_>,
1606    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1607        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1608
1609        let (checkpoint, borrowed) = {
1610            if let Some(precondition) = options.precondition {
1611                if !precondition() {
1612                    return Ok(None);
1613                }
1614            }
1615
1616            // This guard is required so that we don't insert an EndBlock record in the middle of a
1617            // transaction.
1618            let _guard = self.writer_mutex.lock();
1619
1620            self.pad_to_block()?
1621        };
1622
1623        if options.flush_device {
1624            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1625        }
1626
1627        Ok(Some((checkpoint, borrowed)))
1628    }
1629
1630    // Returns the checkpoint as it was prior to padding.  This is done because the super block
1631    // needs to record where the last transaction ends and it's the next transaction that pays the
1632    // price of the padding.
1633    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1634        let mut inner = self.inner.lock();
1635        let checkpoint = inner.writer.journal_file_checkpoint();
1636        if checkpoint.file_offset % BLOCK_SIZE != 0 {
1637            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1638            inner.writer.pad_to_block()?;
1639            if let Some(waker) = inner.flush_waker.take() {
1640                waker.wake();
1641            }
1642        }
1643        Ok((checkpoint, self.objects.borrowed_metadata_space()))
1644    }
1645
    /// Waits until the flush task has written journal data up to `checkpoint_offset`. Then, if
    /// the device has not already been flushed to at least that offset, flushes the device.
    ///
    /// After a device flush, a `DidFlushDevice` record is queued (the next `pre_commit` writes
    /// it), and the allocator is notified via `did_flush_device`.
    ///
    /// # Errors
    ///
    /// Returns an `FxfsError::JournalFlushError` if the journal is terminated before the data has
    /// been written. Also propagates any error from flushing the device.
    ///
    /// # Panics
    ///
    /// Panics if called in image-builder mode.
    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
        assert!(
            self.inner.lock().image_builder_mode.is_none(),
            "flush_device called in image builder mode"
        );
        debug_assert_not_too_long!(poll_fn(|ctx| {
            let mut inner = self.inner.lock();
            if inner.flushed_offset >= checkpoint_offset {
                Poll::Ready(Ok(()))
            } else if inner.terminate {
                let context = inner
                    .terminate_reason
                    .as_ref()
                    .map(|e| format!("Journal closed with error: {:?}", e))
                    .unwrap_or_else(|| "Journal closed".to_string());
                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
            } else {
                // `flush` wakes this waker after it advances `flushed_offset`.
                inner.sync_waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }))?;

        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
        if needs_flush {
            let trace = self.trace.load(Ordering::Relaxed);
            if trace {
                info!("J: start flush device");
            }
            self.handle.get().unwrap().flush_device().await?;
            if trace {
                info!("J: end flush device");
            }

            // We need to write a DidFlushDevice record at some point, but if we are in the
            // process of shutting down the filesystem, we want to leave the journal clean to
            // avoid there being log messages complaining about unwritten journal data, so we
            // queue it up so that the next transaction will trigger this record to be written.
            // If we are shutting down, that will never happen but since the DidFlushDevice
            // message is purely advisory (it reduces the number of checksums we have to verify
            // during replay), it doesn't matter if it isn't written.
            {
                let mut inner = self.inner.lock();
                inner.device_flushed_offset = checkpoint_offset;
                inner.needs_did_flush_device = true;
            }

            // Tell the allocator that we flushed the device so that it can now start using
            // space that was deallocated.
            self.objects.allocator().did_flush_device(checkpoint_offset);
            if trace {
                info!("J: did flush device");
            }
        }

        Ok(())
    }
1702
1703    /// Returns a copy of the super-block header.
1704    pub fn super_block_header(&self) -> SuperBlockHeader {
1705        self.inner.lock().super_block_header.clone()
1706    }
1707
    /// Waits for there to be sufficient space in the journal.
    ///
    /// On each iteration the checks run in this order:
    /// - If the journal has been terminated, returns a `JournalFlushError`.
    /// - If the bytes between the super-block's journal checkpoint and the last end offset are
    ///   below `reclaim_size`, returns `Ok(())`.
    /// - In image-builder mode, returns `Ok(())`.
    /// - If compactions are disabled, returns a `JournalFlushError`, because space would never be
    ///   reclaimed.
    ///
    /// Otherwise it waits on `reclaim_event` and tries again. That event is notified when a
    /// compaction finishes, on a flush error, and when the journal is terminated.
    pub async fn check_journal_space(&self) -> Result<(), Error> {
        loop {
            debug_assert_not_too_long!({
                let inner = self.inner.lock();
                if inner.terminate {
                    // If the flush error is set, this will never make progress, since we can't
                    // extend the journal any more.
                    let context = inner
                        .terminate_reason
                        .as_ref()
                        .map(|e| format!("Journal closed with error: {:?}", e))
                        .unwrap_or_else(|| "Journal closed".to_string());
                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
                }
                if self.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset
                    < inner.reclaim_size
                {
                    break Ok(());
                }
                if inner.image_builder_mode.is_some() {
                    break Ok(());
                }
                if inner.disable_compactions {
                    break Err(
                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
                    );
                }
                // The listener is created while `inner` is still locked and only awaited after
                // the block ends. Presumably this avoids missing a notification that arrives
                // between the checks above and the wait.
                self.reclaim_event.listen()
            });
        }
    }
1741
    /// Returns the granularity, in bytes, by which the journal file is preallocated and extended
    /// (see `allocate_journal` and `pre_commit`). Always `CHUNK_SIZE`.
    fn chunk_size(&self) -> u64 {
        CHUNK_SIZE
    }
1745
    /// Serializes `transaction`'s mutations into the journal and applies them to the in-memory
    /// objects. It then writes any extra mutation returned by the apply step, the transaction's
    /// data checksums, and a terminating `Commit` record.
    ///
    /// All journal writes happen under `writer_mutex`. `sync` and `write_super_block` take that
    /// same lock, so their padding cannot land in the middle of this transaction's records.
    ///
    /// Returns the journal file offset at which this transaction's records begin.
    ///
    /// # Panics
    ///
    /// Panics if `apply_transaction` fails, or if a record cannot be written to the journal
    /// writer.
    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
        let checkpoint_before;
        let checkpoint_after;
        {
            let _guard = self.writer_mutex.lock();
            checkpoint_before = {
                let mut inner = self.inner.lock();
                // `flush` consumes this flag to request a barrier on its next write.
                if transaction.includes_write() {
                    inner.needs_barrier = true;
                }
                let checkpoint = inner.writer.journal_file_checkpoint();
                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
                    self.objects.write_mutation(
                        *object_id,
                        mutation,
                        Writer(*object_id, &mut inner.writer),
                    );
                }
                checkpoint
            };
            // Applied outside the `inner` lock but still under the writer lock. An optional extra
            // mutation may be returned; it is journaled below against object ID 0.
            let maybe_mutation =
                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
                    "apply_transaction should not fail in live mode; \
                     filesystem will be in an inconsistent state",
                );
            checkpoint_after = {
                let mut inner = self.inner.lock();
                if let Some(mutation) = maybe_mutation {
                    inner
                        .writer
                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
                        .unwrap();
                }
                for (device_range, checksums, first_write) in
                    transaction.take_checksums().into_iter()
                {
                    inner
                        .writer
                        .write_record(&JournalRecord::DataChecksums(
                            device_range,
                            Checksums::fletcher(checksums),
                            first_write,
                        ))
                        .unwrap();
                }
                inner.writer.write_record(&JournalRecord::Commit).unwrap();

                inner.writer.journal_file_checkpoint()
            };
        }
        self.objects.did_commit_transaction(
            transaction,
            &checkpoint_before,
            checkpoint_after.file_offset,
        );

        // New journal bytes are buffered; let the flush task know.
        if let Some(waker) = self.inner.lock().flush_waker.take() {
            waker.wake();
        }

        checkpoint_before.file_offset
    }
1808
    /// This task will flush journal data to the device when there is data that needs flushing, and
    /// trigger compactions when short of journal space.  It will return after the terminate method
    /// has been called, or an error is encountered with either flushing or compaction.
    ///
    /// The task is a single `poll_fn` loop that drives at most one flush future and one
    /// compaction future at a time. It registers itself as `flush_waker`, which is woken when
    /// transactions are committed or the journal is padded. It loops again whenever either
    /// future completes.
    ///
    /// Flush and compaction errors both terminate the journal, with the error as the reason.
    /// After a flush error, no further flushes are started.
    pub async fn flush_task(self: Arc<Self>) {
        let mut flush_fut = None;
        let mut compact_fut = None;
        let mut flush_error = false;
        poll_fn(|ctx| {
            loop {
                {
                    let mut inner = self.inner.lock();
                    // Start a flush of whatever the writer has buffered. This only happens if
                    // no flush is in flight, no flush has failed, and the handle exists.
                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
                        let flushable = inner.writer.flushable_bytes();
                        if flushable > 0 {
                            flush_fut = Some(Box::pin(self.flush(flushable)));
                        }
                    }
                    // Only exit once both in-flight futures have finished.
                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
                        return Poll::Ready(());
                    }
                    // `journal_bytes` refers to bytes in the journal that haven't yet been flushed
                    // to the layer files. It increases with each transaction and decreases when
                    // compactions complete. The flush task is woken whenever a transaction is
                    // committed and we should see this metric updated regularly.
                    let journal_bytes = self.objects.last_end_offset()
                        - inner.super_block_header.journal_checkpoint.file_offset;
                    fxfs_trace::counter!("journal-bytes", 0, "total" => journal_bytes);
                    // The / 2 is here because after compacting, we cannot reclaim the space until
                    // the _next_ time we flush the device since the super-block is not guaranteed
                    // to persist until then.
                    if compact_fut.is_none()
                        && !inner.terminate
                        && !inner.disable_compactions
                        && inner.image_builder_mode.is_none()
                        && journal_bytes > inner.reclaim_size / 2
                    {
                        compact_fut = Some(Box::pin(self.compact()));
                        // `stop_compactions` waits for this flag to clear.
                        inner.compaction_running = true;
                    }
                    inner.flush_waker = Some(ctx.waker().clone());
                }
                // Stays true unless a future completed this iteration. A completion may allow new
                // work to be started, so in that case go round the loop again.
                let mut pending = true;
                if let Some(fut) = flush_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        if let Err(e) = result {
                            self.inner.lock().terminate(Some(e.context("Flush error")));
                            self.reclaim_event.notify(usize::MAX);
                            flush_error = true;
                        }
                        flush_fut = None;
                        pending = false;
                    }
                }
                if let Some(fut) = compact_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        let mut inner = self.inner.lock();
                        if let Err(e) = result {
                            inner.terminate(Some(e.context("Compaction error")));
                        }
                        compact_fut = None;
                        inner.compaction_running = false;
                        self.reclaim_event.notify(usize::MAX);
                        pending = false;
                        fxfs_trace::counter!(
                            "journal-bytes",
                            0,
                            "total" => self.objects.last_end_offset()
                                - inner.super_block_header.journal_checkpoint.file_offset
                        );
                    }
                }
                if pending {
                    return Poll::Pending;
                }
            }
        })
        .await;
    }
1887
    /// Returns a yielder that can be used for compactions.
    ///
    /// On Fuchsia the yielder may delay compaction work while the executor is busy. On other
    /// targets it never yields (see the `yielder` modules in this file).
    pub fn get_compaction_yielder(&self) -> CompactionYielder<'_> {
        CompactionYielder::new(self)
    }
1892
1893    async fn flush(&self, amount: usize) -> Result<(), Error> {
1894        let handle = self.handle.get().unwrap();
1895        let mut buf = handle.allocate_buffer(amount).await;
1896        let (offset, len, barrier_on_first_write) = {
1897            let mut inner = self.inner.lock();
1898            let offset = inner.writer.take_flushable(buf.as_mut());
1899            let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1900            // Reset `needs_barrier` before instead of after the overwrite in case a txn commit
1901            // that contains data happens during the overwrite.
1902            inner.needs_barrier = false;
1903            (offset, buf.len() as u64, barrier_on_first_write)
1904        };
1905        self.handle
1906            .get()
1907            .unwrap()
1908            .overwrite(
1909                offset,
1910                buf.as_mut(),
1911                OverwriteOptions { barrier_on_first_write, ..Default::default() },
1912            )
1913            .await?;
1914
1915        let mut inner = self.inner.lock();
1916        if let Some(waker) = inner.sync_waker.take() {
1917            waker.wake();
1918        }
1919        inner.flushed_offset = offset + len;
1920        inner.valid_to = inner.flushed_offset;
1921        Ok(())
1922    }
1923
    /// Compacts the journal. All objects are flushed, the earliest version they report is
    /// recorded in the super-block header, and a new super-block is written. The journal space
    /// before the new checkpoint can be reclaimed later (see `write_super_block`).
    ///
    /// Also updates the journal compaction count, duration and bytes-written metrics.
    ///
    /// # Panics
    ///
    /// Panics if called in image-builder mode.
    #[trace]
    async fn compact(&self) -> Result<(), Error> {
        assert!(
            self.inner.lock().image_builder_mode.is_none(),
            "compact called in image builder mode"
        );
        let bytes_before = self.objects.compaction_bytes_written();
        // Records the compaction duration when dropped at the end of this function.
        let _measure = crate::metrics::DurationMeasureScope::new(
            &crate::metrics::lsm_tree_metrics().journal_compaction_time,
        );
        crate::metrics::lsm_tree_metrics().journal_compactions_total.add(1);
        let trace = self.trace.load(Ordering::Relaxed);
        debug!("Compaction starting");
        if trace {
            info!("J: start compaction");
        }
        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
        self.inner.lock().super_block_header.earliest_version = earliest_version;
        self.write_super_block().await.context("Failed to write superblock")?;
        if trace {
            info!("J: end compaction");
        }
        debug!("Compaction finished");
        let bytes_after = self.objects.compaction_bytes_written();
        crate::metrics::lsm_tree_metrics()
            .journal_compaction_bytes_written
            .add(bytes_after.saturating_sub(bytes_before));
        Ok(())
    }
1953
1954    /// This should generally NOT be called externally. It is public to allow use by FIDL service
1955    /// fxfs.Debug.
1956    pub async fn force_compact(&self) -> Result<(), Error> {
1957        self.inner.lock().forced_compaction = true;
1958        scopeguard::defer! { self.inner.lock().forced_compaction = false; }
1959        self.compact().await
1960    }
1961
    /// Disables compactions and waits for any in-progress compaction to finish.
    ///
    /// While `disable_compactions` is set, `flush_task` will not start a new compaction. Also,
    /// `check_journal_space` returns an error instead of waiting when the journal is short of
    /// space.
    pub async fn stop_compactions(&self) {
        loop {
            debug_assert_not_too_long!({
                let mut inner = self.inner.lock();
                inner.disable_compactions = true;
                if !inner.compaction_running {
                    return;
                }
                // `flush_task` notifies `reclaim_event` after it clears `compaction_running`.
                self.reclaim_event.listen()
            });
        }
    }
1974
1975    /// Creates a lazy inspect node named `str` under `parent` which will yield statistics for the
1976    /// journal when queried.
1977    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1978        let this = Arc::downgrade(self);
1979        parent.record_lazy_child(name, move || {
1980            let this_clone = this.clone();
1981            async move {
1982                let inspector = fuchsia_inspect::Inspector::default();
1983                if let Some(this) = this_clone.upgrade() {
1984                    let (journal_min, journal_max, journal_reclaim_size) = {
1985                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1986                        let inner = this.inner.lock();
1987                        (
1988                            round_down(
1989                                inner.super_block_header.journal_checkpoint.file_offset,
1990                                BLOCK_SIZE,
1991                            ),
1992                            inner.flushed_offset,
1993                            inner.reclaim_size,
1994                        )
1995                    };
1996                    let root = inspector.root();
1997                    root.record_uint("journal_min_offset", journal_min);
1998                    root.record_uint("journal_max_offset", journal_max);
1999                    root.record_uint("journal_size", journal_max - journal_min);
2000                    root.record_uint("journal_reclaim_size", journal_reclaim_size);
2001
2002                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
2003                    if let Some(x) = round_div(
2004                        100 * (journal_max - journal_min),
2005                        this.objects.allocator().get_disk_bytes(),
2006                    ) {
2007                        root.record_uint("journal_size_to_disk_size_percent", x);
2008                    }
2009                }
2010                Ok(inspector)
2011            }
2012            .boxed()
2013        });
2014    }
2015
2016    /// Terminate all journal activity.
2017    pub fn terminate(&self) {
2018        self.inner.lock().terminate(/*reason*/ None);
2019        self.reclaim_event.notify(usize::MAX);
2020    }
2021}
2022
2023/// Wrapper to allow records to be written to the journal.
2024pub struct Writer<'a>(u64, &'a mut JournalWriter);
2025
2026impl Writer<'_> {
2027    pub fn write(&mut self, mutation: Mutation) {
2028        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
2029    }
2030}
2031
2032#[cfg(target_os = "fuchsia")]
2033mod yielder {
2034    use super::Journal;
2035    use crate::lsm_tree::Yielder;
2036    use fuchsia_async as fasync;
2037
2038    /// CompactionYielder uses fuchsia-async to yield if other tasks are being polled, which should
2039    /// be a proxy for how busy the system is.  We can afford to delay compactions for a small
2040    /// amount of time but not so long that we end up blocking new transactions.
2041    pub struct CompactionYielder<'a> {
2042        journal: &'a Journal,
2043        low_priority_task: Option<fasync::LowPriorityTask>,
2044    }
2045
2046    impl<'a> CompactionYielder<'a> {
2047        pub fn new(journal: &'a Journal) -> Self {
2048            Self { journal, low_priority_task: None }
2049        }
2050    }
2051
2052    impl Yielder for CompactionYielder<'_> {
2053        async fn yield_now(&mut self) {
2054            // We will wait for the executor to be idle for 4ms, but no longer than 16ms.  We need
2055            // to cap the maximum amount of time we wait in case we've reached a point where
2056            // compaction is now urgent or else we could block new transactions.
2057            const IDLE_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(4);
2058            const MAX_YIELD_DURATION: zx::MonotonicDuration =
2059                zx::MonotonicDuration::from_millis(16);
2060
2061            {
2062                let inner = self.journal.inner.lock();
2063                if inner.forced_compaction {
2064                    return;
2065                }
2066                let outstanding = self.journal.objects.last_end_offset()
2067                    - inner.super_block_header.journal_checkpoint.file_offset;
2068                let half_reclaim_size = inner.reclaim_size / 2;
2069                if outstanding
2070                    .checked_sub(half_reclaim_size)
2071                    .is_some_and(|x| x >= half_reclaim_size / 2)
2072                {
2073                    // If we have got to the point where we have used up 3/4 of reclaim size in the
2074                    // journal, do not delay any further.  If we continue to yield we will get to
2075                    // the point where we block new transactions.
2076                    self.low_priority_task = None;
2077                    return;
2078                }
2079            }
2080
2081            self.low_priority_task
2082                .get_or_insert_with(|| fasync::LowPriorityTask::new())
2083                .wait_until_idle_for(
2084                    IDLE_PERIOD,
2085                    fasync::MonotonicInstant::after(MAX_YIELD_DURATION),
2086                )
2087                .await;
2088        }
2089    }
2090}
2091
2092#[cfg(not(target_os = "fuchsia"))]
2093mod yielder {
2094    use super::Journal;
2095    use crate::lsm_tree::Yielder;
2096
2097    #[expect(dead_code)]
2098    pub struct CompactionYielder<'a>(&'a Journal);
2099
2100    impl<'a> CompactionYielder<'a> {
2101        pub fn new(journal: &'a Journal) -> Self {
2102            Self(journal)
2103        }
2104    }
2105
2106    impl Yielder for CompactionYielder<'_> {
2107        async fn yield_now(&mut self) {}
2108    }
2109}
2110
2111pub use yielder::*;
2112
2113#[cfg(test)]
2114mod tests {
2115    use super::SuperBlockInstance;
2116    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
2117    use crate::fsck::fsck;
2118    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
2119    use crate::object_store::directory::Directory;
2120    use crate::object_store::transaction::Options;
2121    use crate::object_store::volume::root_volume;
2122    use crate::object_store::{
2123        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
2124    };
2125    #[cfg(target_os = "fuchsia")]
2126    use fuchsia_async::TestExecutor;
2127    use fuchsia_async::{self as fasync, MonotonicDuration};
2128    use storage_device::DeviceHolder;
2129    use storage_device::fake_device::FakeDevice;
2130
2131    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2132
2133    #[fuchsia::test]
2134    async fn test_replay() {
2135        const TEST_DATA: &[u8] = b"hello";
2136
2137        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2138
2139        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2140
2141        let object_id = {
2142            let root_store = fs.root_store();
2143            let root_directory =
2144                Directory::open(&root_store, root_store.root_directory_object_id())
2145                    .await
2146                    .expect("open failed");
2147            let mut transaction = fs
2148                .clone()
2149                .new_transaction(
2150                    lock_keys![LockKey::object(
2151                        root_store.store_object_id(),
2152                        root_store.root_directory_object_id(),
2153                    )],
2154                    Options::default(),
2155                )
2156                .await
2157                .expect("new_transaction failed");
2158            let handle = root_directory
2159                .create_child_file(&mut transaction, "test")
2160                .await
2161                .expect("create_child_file failed");
2162
2163            transaction.commit().await.expect("commit failed");
2164            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2165            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2166            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2167            // As this is the first sync, this will actually trigger a new super-block, but normally
2168            // this would not be the case.
2169            fs.sync(SyncOptions::default()).await.expect("sync failed");
2170            handle.object_id()
2171        };
2172
2173        {
2174            fs.close().await.expect("Close failed");
2175            let device = fs.take_device().await;
2176            device.reopen(false);
2177            let fs = FxFilesystem::open(device).await.expect("open failed");
2178            let handle = ObjectStore::open_object(
2179                &fs.root_store(),
2180                object_id,
2181                HandleOptions::default(),
2182                None,
2183            )
2184            .await
2185            .expect("open_object failed");
2186            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2187            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
2188            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2189            fsck(fs.clone()).await.expect("fsck failed");
2190            fs.close().await.expect("Close failed");
2191        }
2192    }
2193
2194    #[fuchsia::test]
2195    async fn test_reset() {
2196        const TEST_DATA: &[u8] = b"hello";
2197
2198        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
2199
2200        let mut object_ids = Vec::new();
2201
2202        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2203        {
2204            let root_store = fs.root_store();
2205            let root_directory =
2206                Directory::open(&root_store, root_store.root_directory_object_id())
2207                    .await
2208                    .expect("open failed");
2209            let mut transaction = fs
2210                .clone()
2211                .new_transaction(
2212                    lock_keys![LockKey::object(
2213                        root_store.store_object_id(),
2214                        root_store.root_directory_object_id(),
2215                    )],
2216                    Options::default(),
2217                )
2218                .await
2219                .expect("new_transaction failed");
2220            let handle = root_directory
2221                .create_child_file(&mut transaction, "test")
2222                .await
2223                .expect("create_child_file failed");
2224            transaction.commit().await.expect("commit failed");
2225            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2226            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2227            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2228            fs.sync(SyncOptions::default()).await.expect("sync failed");
2229            object_ids.push(handle.object_id());
2230
2231            // Create a lot of objects but don't sync at the end. This should leave the filesystem
2232            // with a half finished transaction that cannot be replayed.
2233            for i in 0..1000 {
2234                let mut transaction = fs
2235                    .clone()
2236                    .new_transaction(
2237                        lock_keys![LockKey::object(
2238                            root_store.store_object_id(),
2239                            root_store.root_directory_object_id(),
2240                        )],
2241                        Options::default(),
2242                    )
2243                    .await
2244                    .expect("new_transaction failed");
2245                let handle = root_directory
2246                    .create_child_file(&mut transaction, &format!("{}", i))
2247                    .await
2248                    .expect("create_child_file failed");
2249                transaction.commit().await.expect("commit failed");
2250                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2251                buf.as_mut_slice().copy_from_slice(TEST_DATA);
2252                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2253                object_ids.push(handle.object_id());
2254            }
2255        }
2256        fs.close().await.expect("fs close failed");
2257        let device = fs.take_device().await;
2258        device.reopen(false);
2259        let fs = FxFilesystem::open(device).await.expect("open failed");
2260        fsck(fs.clone()).await.expect("fsck failed");
2261        {
2262            let root_store = fs.root_store();
2263            // Check the first two objects which should exist.
2264            for &object_id in &object_ids[0..1] {
2265                let handle = ObjectStore::open_object(
2266                    &root_store,
2267                    object_id,
2268                    HandleOptions::default(),
2269                    None,
2270                )
2271                .await
2272                .expect("open_object failed");
2273                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2274                assert_eq!(
2275                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2276                    TEST_DATA.len()
2277                );
2278                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2279            }
2280
2281            // Write one more object and sync.
2282            let root_directory =
2283                Directory::open(&root_store, root_store.root_directory_object_id())
2284                    .await
2285                    .expect("open failed");
2286            let mut transaction = fs
2287                .clone()
2288                .new_transaction(
2289                    lock_keys![LockKey::object(
2290                        root_store.store_object_id(),
2291                        root_store.root_directory_object_id(),
2292                    )],
2293                    Options::default(),
2294                )
2295                .await
2296                .expect("new_transaction failed");
2297            let handle = root_directory
2298                .create_child_file(&mut transaction, "test2")
2299                .await
2300                .expect("create_child_file failed");
2301            transaction.commit().await.expect("commit failed");
2302            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2303            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2304            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2305            fs.sync(SyncOptions::default()).await.expect("sync failed");
2306            object_ids.push(handle.object_id());
2307        }
2308
2309        fs.close().await.expect("close failed");
2310        let device = fs.take_device().await;
2311        device.reopen(false);
2312        let fs = FxFilesystem::open(device).await.expect("open failed");
2313        {
2314            fsck(fs.clone()).await.expect("fsck failed");
2315
2316            // Check the first two and the last objects.
2317            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
2318                let handle = ObjectStore::open_object(
2319                    &fs.root_store(),
2320                    object_id,
2321                    HandleOptions::default(),
2322                    None,
2323                )
2324                .await
2325                .unwrap_or_else(|e| {
2326                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2327                });
2328                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2329                assert_eq!(
2330                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2331                    TEST_DATA.len()
2332                );
2333                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2334            }
2335        }
2336        fs.close().await.expect("close failed");
2337    }
2338
2339    #[fuchsia::test]
2340    async fn test_discard() {
2341        let device = {
2342            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2343            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2344            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2345
2346            let store = root_volume
2347                .new_volume("test", NewChildStoreOptions::default())
2348                .await
2349                .expect("new_volume failed");
2350            let root_directory = Directory::open(&store, store.root_directory_object_id())
2351                .await
2352                .expect("open failed");
2353
2354            // Create enough data so that another journal extent is used.
2355            let mut i = 0;
2356            loop {
2357                let mut transaction = fs
2358                    .clone()
2359                    .new_transaction(
2360                        lock_keys![LockKey::object(
2361                            store.store_object_id(),
2362                            store.root_directory_object_id()
2363                        )],
2364                        Options::default(),
2365                    )
2366                    .await
2367                    .expect("new_transaction failed");
2368                root_directory
2369                    .create_child_file(&mut transaction, &format!("a {i}"))
2370                    .await
2371                    .expect("create_child_file failed");
2372                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2373                    break;
2374                }
2375                i += 1;
2376            }
2377
2378            // Compact and then disable compactions.
2379            fs.journal().force_compact().await.expect("compact failed");
2380            fs.journal().stop_compactions().await;
2381
2382            // Keep going until we need another journal extent.
2383            let mut i = 0;
2384            loop {
2385                let mut transaction = fs
2386                    .clone()
2387                    .new_transaction(
2388                        lock_keys![LockKey::object(
2389                            store.store_object_id(),
2390                            store.root_directory_object_id()
2391                        )],
2392                        Options::default(),
2393                    )
2394                    .await
2395                    .expect("new_transaction failed");
2396                root_directory
2397                    .create_child_file(&mut transaction, &format!("b {i}"))
2398                    .await
2399                    .expect("create_child_file failed");
2400                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2401                    break;
2402                }
2403                i += 1;
2404            }
2405
2406            // Allow the journal to flush, but we don't want to sync.
2407            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2408            // Because we're not gracefully closing the filesystem, a Discard record will be
2409            // emitted.
2410            fs.device().snapshot().expect("snapshot failed")
2411        };
2412
2413        let fs = FxFilesystem::open(device).await.expect("open failed");
2414
2415        {
2416            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2417
2418            let store =
2419                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2420
2421            let root_directory = Directory::open(&store, store.root_directory_object_id())
2422                .await
2423                .expect("open failed");
2424
2425            // Write one more transaction.
2426            let mut transaction = fs
2427                .clone()
2428                .new_transaction(
2429                    lock_keys![LockKey::object(
2430                        store.store_object_id(),
2431                        store.root_directory_object_id()
2432                    )],
2433                    Options::default(),
2434                )
2435                .await
2436                .expect("new_transaction failed");
2437            root_directory
2438                .create_child_file(&mut transaction, &format!("d"))
2439                .await
2440                .expect("create_child_file failed");
2441            transaction.commit().await.expect("commit failed");
2442        }
2443
2444        fs.close().await.expect("close failed");
2445        let device = fs.take_device().await;
2446        device.reopen(false);
2447
2448        let fs = FxFilesystem::open(device).await.expect("open failed");
2449        fsck(fs.clone()).await.expect("fsck failed");
2450        fs.close().await.expect("close failed");
2451    }
2452
2453    #[fuchsia::test]
2454    async fn test_use_existing_generation() {
2455        let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2456
2457        // First format should be generation 1.
2458        let fs = FxFilesystemBuilder::new()
2459            .format(true)
2460            .image_builder_mode(Some(SuperBlockInstance::A))
2461            .open(device)
2462            .await
2463            .expect("open failed");
2464        fs.enable_allocations();
2465        let generation0 = fs.super_block_header().generation;
2466        assert_eq!(generation0, 1);
2467        fs.close().await.expect("close failed");
2468        let device = fs.take_device().await;
2469        device.reopen(false);
2470
2471        // Format the device normally (again, generation 1).
2472        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2473        let generation1 = fs.super_block_header().generation;
2474        {
2475            let root_volume = crate::object_store::volume::root_volume(fs.clone())
2476                .await
2477                .expect("root_volume failed");
2478            root_volume
2479                .new_volume("test", crate::object_store::NewChildStoreOptions::default())
2480                .await
2481                .expect("new_volume failed");
2482        }
2483        fs.close().await.expect("close failed");
2484        let device = fs.take_device().await;
2485        device.reopen(false);
2486
2487        // Format again with image_builder_mode.
2488        let fs = FxFilesystemBuilder::new()
2489            .format(true)
2490            .image_builder_mode(Some(SuperBlockInstance::A))
2491            .open(device)
2492            .await
2493            .expect("open failed");
2494        fs.enable_allocations();
2495        let generation2 = fs.super_block_header().generation;
2496        assert!(
2497            generation2 > generation1,
2498            "generation2 ({}) should be greater than generation1 ({})",
2499            generation2,
2500            generation1
2501        );
2502        fs.close().await.expect("close failed");
2503    }
2504
2505    #[fuchsia::test]
2506    async fn test_image_builder_mode_generation_bump_512_byte_block() {
2507        let device = DeviceHolder::new(FakeDevice::new(16384, 512));
2508
2509        // Format initial filesystem (generation 1)
2510        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2511        let generation1 = fs.super_block_header().generation;
2512        fs.close().await.expect("close failed");
2513        let device = fs.take_device().await;
2514        device.reopen(false);
2515
2516        // Format again with image_builder_mode (should bump generation)
2517        let fs = FxFilesystemBuilder::new()
2518            .format(true)
2519            .image_builder_mode(Some(SuperBlockInstance::A))
2520            .open(device)
2521            .await
2522            .expect("open failed");
2523
2524        fs.enable_allocations();
2525        let generation2 = fs.super_block_header().generation;
2526        assert!(
2527            generation2 > generation1,
2528            "Expected generation bump, got {} vs {}",
2529            generation2,
2530            generation1
2531        );
2532        fs.close().await.expect("close failed");
2533    }
2534
    // Checks that journal compaction defers to other executor work.  While a task wakes every 1ms,
    // compaction keeps running (it keeps yielding).  Once that task stops, compaction stays
    // running until the executor has been idle for 4ms, and then it finishes.
    #[fuchsia::test]
    #[cfg(target_os = "fuchsia")]
    fn test_low_priority_compaction() {
        // Fake time: the test moves the clock explicitly with `TestExecutor::advance_to`.
        let mut executor = TestExecutor::new_with_fake_time();
        let mut fut = std::pin::pin!(async {
            use std::sync::Arc;
            use std::sync::atomic::{AtomicBool, Ordering};

            let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
            let fs = FxFilesystemBuilder::new()
                .journal_options(super::JournalOptions {
                    reclaim_size: 65536,
                    ..Default::default()
                })
                .format(true)
                .open(device)
                .await
                .expect("open failed");

            // NOTE(review): bound to `_low` (not `_`), so it lives for the whole test; presumably
            // required for the executor's low-priority/idle tracking — confirm.
            let _low = fasync::LowPriorityTask::new();

            // Add some data to the tree.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                for i in 0..100 {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("test{}", i))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");
                }
            }

            // Spawn a task that polls every 1ms.  This keeps the executor from ever being idle for
            // the compaction yielder's 4ms idle period while it runs.
            let stop = Arc::new(AtomicBool::new(false));
            let stop_clone = stop.clone();
            let _normal_task = fasync::Task::spawn(async move {
                while !stop_clone.load(Ordering::Relaxed) {
                    fasync::Timer::new(fasync::MonotonicInstant::after(
                        MonotonicDuration::from_millis(1),
                    ))
                    .await;
                }
            });

            // Trigger journal compaction.
            // We can do this by writing more data until outstanding > reclaim_size / 2.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                let mut i = 0;
                loop {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("trigger{i}"))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");

                    if fs.journal().inner.lock().compaction_running {
                        break;
                    }
                    TestExecutor::advance_to(fasync::MonotonicInstant::after(
                        MonotonicDuration::from_millis(1),
                    ))
                    .await;
                    i += 1;
                }
            }

            // Compaction should now be running. Because of our 1ms poller, it should be yielding.
            // It will yield for 16ms at each point.
            for _ in 0..10 {
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                assert!(fs.journal().inner.lock().compaction_running);
            }

            // Stop the normal task.  It observes the flag after its next 1ms timer fires.
            stop.store(true, Ordering::Relaxed);
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;

            // Compaction should still be running because it hasn't been 4 ms since the normal task
            // finished.
            assert!(fs.journal().inner.lock().compaction_running);

            // For the next 3ms, compaction should still be running.
            for _ in 0..3 {
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                assert!(fs.journal().inner.lock().compaction_running);
            }

            // 1 more ms and compaction should be unblocked.
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;

            // When the executor next stalls, compaction should be done.
            let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
            assert!(!fs.journal().inner.lock().compaction_running);

            fs.close().await.expect("Close failed");
        });
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }
2677
    // Checks that low-priority compaction still makes progress when the executor never goes idle.
    // A task wakes every 3ms, which is shorter than the yielder's 4ms idle period, so each yield
    // can only end at its 16ms deadline.  Compaction must eventually finish, but only after many
    // 20ms clock steps.
    #[fuchsia::test]
    #[cfg(target_os = "fuchsia")]
    fn test_low_priority_compaction_deadline() {
        // Fake time: the test moves the clock explicitly with `TestExecutor::advance_to`.
        let mut executor = TestExecutor::new_with_fake_time();
        let mut fut = std::pin::pin!(async {
            use std::sync::Arc;
            use std::sync::atomic::{AtomicBool, Ordering};

            // NOTE(review): 216384 blocks is much larger than the 16384 used in
            // test_low_priority_compaction; confirm this size is intentional.
            let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
            let fs = FxFilesystemBuilder::new()
                .journal_options(super::JournalOptions {
                    reclaim_size: 65536,
                    ..Default::default()
                })
                .format(true)
                .open(device)
                .await
                .expect("open failed");

            // NOTE(review): bound to `_low` (not `_`), so it lives for the whole test; presumably
            // required for the executor's low-priority/idle tracking — confirm.
            let _low = fasync::LowPriorityTask::new();

            // Add some data to the tree.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                for i in 0..10 {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("test{}", i))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");
                }
            }

            // Spawn a task that polls every 3ms.  That is shorter than the 4ms idle period, so the
            // executor never looks idle to the compaction yielder.
            let stop = Arc::new(AtomicBool::new(false));
            let stop_clone = stop.clone();
            let _normal_task = fasync::Task::spawn(async move {
                while !stop_clone.load(Ordering::Relaxed) {
                    fasync::Timer::new(fasync::MonotonicInstant::after(
                        MonotonicDuration::from_millis(3),
                    ))
                    .await;
                }
            });

            // Trigger journal compaction by committing transactions until it starts.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                let mut i = 0;
                loop {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("trigger{i}"))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");

                    if fs.journal().inner.lock().compaction_running {
                        break;
                    }
                    TestExecutor::advance_to(fasync::MonotonicInstant::after(
                        MonotonicDuration::from_millis(1),
                    ))
                    .await;
                    i += 1;
                }
            }

            // Advance time in 20ms increments. Each increment should allow compaction to make progress
            // on one item (since MAX_YIELD_DURATION is 16ms).
            let mut count = 0;
            for _ in 0..1000 {
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(20),
                ))
                .await;
                if !fs.journal().inner.lock().compaction_running {
                    break;
                }
                count += 1;
            }
            assert!(!fs.journal().inner.lock().compaction_running);

            // Make sure it took a few iterations to complete.  It's difficult to know what the exact
            // number should be.
            assert!(count > 200);

            stop.store(true, Ordering::Relaxed);
            fs.close().await.expect("Close failed");
        });
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }
2799
2800    #[fuchsia::test]
2801    #[cfg(target_os = "fuchsia")]
2802    fn test_low_priority_compaction_no_yielding_when_full() {
2803        let mut executor = TestExecutor::new_with_fake_time();
2804        let mut fut = std::pin::pin!(async {
2805            use std::sync::Arc;
2806            use std::sync::atomic::{AtomicBool, Ordering};
2807
2808            let reclaim_size = 65536;
2809            let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
2810            let fs = FxFilesystemBuilder::new()
2811                .journal_options(super::JournalOptions { reclaim_size, ..Default::default() })
2812                .format(true)
2813                .open(device)
2814                .await
2815                .expect("open failed");
2816
2817            let _low = fasync::LowPriorityTask::new();
2818
2819            // Add some data to the tree.
2820            {
2821                let root_store = fs.root_store();
2822                let root_directory =
2823                    Directory::open(&root_store, root_store.root_directory_object_id())
2824                        .await
2825                        .expect("open failed");
2826                for i in 0..10 {
2827                    let mut transaction = fs
2828                        .clone()
2829                        .new_transaction(
2830                            lock_keys![LockKey::object(
2831                                root_store.store_object_id(),
2832                                root_store.root_directory_object_id(),
2833                            )],
2834                            Options::default(),
2835                        )
2836                        .await
2837                        .expect("new_transaction failed");
2838                    root_directory
2839                        .create_child_file(&mut transaction, &format!("test{}", i))
2840                        .await
2841                        .expect("create_child_file failed");
2842                    transaction.commit().await.expect("commit failed");
2843                }
2844            }
2845
2846            // Spawn a task that polls every 3ms.
2847            let stop = Arc::new(AtomicBool::new(false));
2848            let stop_clone = stop.clone();
2849            let _normal_task = fasync::Task::spawn(async move {
2850                while !stop_clone.load(Ordering::Relaxed) {
2851                    fasync::Timer::new(fasync::MonotonicInstant::after(
2852                        MonotonicDuration::from_millis(3),
2853                    ))
2854                    .await;
2855                }
2856            });
2857
2858            // Trigger journal compaction, but this time we fill it up to 3/4 full.
2859            {
2860                let root_store = fs.root_store();
2861                let root_directory =
2862                    Directory::open(&root_store, root_store.root_directory_object_id())
2863                        .await
2864                        .expect("open failed");
2865                let mut i = 0;
2866                loop {
2867                    let mut transaction = fs
2868                        .clone()
2869                        .new_transaction(
2870                            lock_keys![LockKey::object(
2871                                root_store.store_object_id(),
2872                                root_store.root_directory_object_id(),
2873                            )],
2874                            Options::default(),
2875                        )
2876                        .await
2877                        .expect("new_transaction failed");
2878                    root_directory
2879                        .create_child_file(&mut transaction, &format!("trigger{i}"))
2880                        .await
2881                        .expect("create_child_file failed");
2882                    transaction.commit().await.expect("commit failed");
2883
2884                    let outstanding = {
2885                        let inner = fs.journal().inner.lock();
2886                        fs.journal().objects.last_end_offset()
2887                            - inner.super_block_header.journal_checkpoint.file_offset
2888                    };
2889                    if outstanding >= reclaim_size * 7 / 8 {
2890                        break;
2891                    }
2892                    // We don't advance time here to try and reach 3/4 before compaction can yield.
2893                    i += 1;
2894                }
2895            }
2896
2897            // Advancing by 4ms should be enough to wake compaction up.
2898            TestExecutor::advance_to(fasync::MonotonicInstant::after(
2899                MonotonicDuration::from_millis(4),
2900            ))
2901            .await;
2902
2903            // When the executor next stalls, compaction should be done.
2904            let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
2905            assert!(!fs.journal().inner.lock().compaction_running);
2906
2907            stop.store(true, Ordering::Relaxed);
2908            fs.close().await.expect("Close failed");
2909        });
2910        assert!(executor.run_until_stalled(&mut fut).is_ready());
2911    }
2912}
2913
// Fuzz targets that inject arbitrary content into the journal of a fresh filesystem, close it, and
// then try to reopen it. Reopening is what runs journal replay over the injected content.
#[cfg(fuzz)]
mod fuzz {
    use fuzz::fuzz;

    // Appends `input` verbatim to the journal writer as raw bytes, closes the filesystem, and
    // then attempts a remount. A remount error is an acceptable outcome; a panic is not.
    #[fuzz]
    fn fuzz_journal_bytes(input: Vec<u8>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use std::io::Write;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let fake = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(fake).await.expect("new_empty failed");
            {
                // Hold the journal lock only for the write; it is released before `close()`.
                let mut journal = fs.journal().inner.lock();
                journal.writer.write_all(&input).expect("write failed");
            }
            fs.close().await.expect("close failed");

            let reused = fs.take_device().await;
            reused.reopen(false);
            let Ok(remounted) = FxFilesystem::open(reused).await else {
                return;
            };
            // `close()` is allowed to fail here: if objects were queued for tombstoning and one
            // of them is corrupt, the resulting journal compaction reports an error.
            let _ = remounted.close().await;
        });
    }

    // Serializes each fuzzer-supplied `JournalRecord` into the journal (ignoring per-record
    // write errors), closes the filesystem, and then attempts a remount.
    #[fuzz]
    fn fuzz_journal(input: Vec<super::JournalRecord>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let fake = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(fake).await.expect("new_empty failed");
            {
                let mut journal = fs.journal().inner.lock();
                // Best effort: a record that fails to write is simply skipped.
                input.iter().for_each(|record| {
                    let _ = journal.writer.write_record(record);
                });
            }
            fs.close().await.expect("close failed");

            let reused = fs.take_device().await;
            reused.reopen(false);
            let Ok(remounted) = FxFilesystem::open(reused).await else {
                return;
            };
            // `close()` is allowed to fail here: if objects were queued for tombstoning and one
            // of them is corrupt, the resulting journal compaction reports an error.
            let _ = remounted.close().await;
        });
    }
}