// Source: fxfs/object_store/journal.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! The journal is implemented as an ever extending file which contains variable length records
6//! that describe mutations to be applied to various objects.  The journal file consists of
7//! blocks, with a checksum at the end of each block, but otherwise it can be considered a
8//! continuous stream.
9//!
10//! The checksum is seeded with the checksum from the previous block.  To free space in the
11//! journal, records are replaced with sparse extents when it is known they are no longer
12//! needed to mount.
13//!
14//! At mount time, the journal is replayed: the mutations are applied into memory.
15//! Eventually, a checksum failure will indicate no more records exist to be replayed,
16//! at which point the mount can continue and the journal will be extended from that point with
17//! further mutations as required.
18
19mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::errors::FxfsError;
27use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
28use crate::log::*;
29use crate::lsm_tree::cache::NullCache;
30use crate::lsm_tree::types::Layer;
31use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
32use crate::object_store::allocator::Allocator;
33use crate::object_store::data_object_handle::OverwriteOptions;
34use crate::object_store::extent_record::{
35    DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
36};
37use crate::object_store::graveyard::Graveyard;
38use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
39use crate::object_store::journal::checksum_list::ChecksumList;
40use crate::object_store::journal::reader::{JournalReader, ReadResult};
41use crate::object_store::journal::super_block::{
42    SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
43};
44use crate::object_store::journal::writer::JournalWriter;
45use crate::object_store::object_manager::ObjectManager;
46use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
47use crate::object_store::transaction::{
48    AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
49    MutationV47, MutationV49, MutationV50, ObjectStoreMutation, Options,
50    TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
51};
52use crate::object_store::{
53    AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
54    NewChildStoreOptions, ObjectStore, ReservedId,
55};
56use crate::range::RangeExt;
57use crate::round::{round_div, round_down};
58use crate::serialized_types::{LATEST_VERSION, Migrate, Version, Versioned, migrate_to_version};
59use anyhow::{Context, Error, anyhow, bail, ensure};
60use event_listener::Event;
61use fprint::TypeFingerprint;
62use fuchsia_inspect::NumericProperty;
63use fuchsia_sync::Mutex;
64use futures::FutureExt as _;
65use futures::future::poll_fn;
66use rustc_hash::FxHashMap as HashMap;
67use serde::{Deserialize, Serialize};
68use static_assertions::const_assert;
69use std::clone::Clone;
70use std::collections::HashSet;
71use std::num::NonZero;
72use std::ops::{Bound, Range};
73use std::sync::atomic::{AtomicBool, Ordering};
74use std::sync::{Arc, OnceLock};
75use std::task::{Poll, Waker};
76use storage_device::Device;
77
78// The journal file is written to in blocks of this size.
79pub const BLOCK_SIZE: u64 = 4096;
80
81// The journal file is extended by this amount when necessary.
82const CHUNK_SIZE: u64 = 131_072;
83const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);
84
85// See the comment for the `reclaim_size` member of Inner.
86pub const DEFAULT_RECLAIM_SIZE: u64 = 524_288;
87
88// Temporary space that should be reserved for the journal.  For example: space that is currently
89// used in the journal file but cannot be deallocated yet because we are flushing.
90pub const RESERVED_SPACE: u64 = 1_048_576;
91
92// Whenever the journal is replayed (i.e. the system is unmounted and remounted), we reset the
93// journal stream, at which point any half-complete transactions are discarded.  We indicate a
94// journal reset by XORing the previous block's checksum with this mask, and using that value as a
95// seed for the next journal block.
96const RESET_XOR: u64 = 0xffffffffffffffff;
97
98// To keep track of offsets within a journal file, we need both the file offset and the check-sum of
99// the preceding block, since the check-sum of the preceding block is an input to the check-sum of
100// every block.
pub type JournalCheckpoint = JournalCheckpointV32;

/// A position within the journal stream from which reading can resume: the file offset plus the
/// checksum seed and serialization version needed to decode from that point.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    // Byte offset within the journal file.
    pub file_offset: u64,

    // Starting check-sum for block that contains file_offset i.e. the checksum for the previous
    // block.
    pub checksum: Checksum,

    // If versioned, the version of elements stored in the journal. e.g. JournalRecord version.
    // This can change across reset events so we store it along with the offset and checksum to
    // know which version to deserialize.
    pub version: Version,
}
116
pub type JournalRecord = JournalRecordV50;

/// The latest on-disk journal record format.  Records are serialized back-to-back within
/// checksummed journal blocks.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV50 {
    // Indicates no more records in this block.
    EndBlock,
    // Mutation for a particular object.  object_id here is for the collection i.e. the store or
    // allocator.
    Mutation { object_id: u64, mutation: MutationV50 },
    // Commits records in the transaction.
    Commit,
    // Discard all mutations with offsets greater than or equal to the given offset.
    Discard(u64),
    // Indicates the device was flushed at the given journal offset.
    // Note that this really means that at this point in the journal offset, we can be certain that
    // there's no remaining buffered data in the block device; the buffers and the disk contents are
    // consistent.
    // We insert one of these records *after* a flush along with the *next* transaction to go
    // through.  If that never comes (either due to graceful or hard shutdown), the journal reset
    // on the next mount will serve the same purpose and count as a flush, although it is necessary
    // to defensively flush the device before replaying the journal (if possible, i.e. not
    // read-only) in case the block device connection was reused.
    DidFlushDevice(u64),
    // Checksums for a data range written by this transaction. A transaction is only valid if these
    // checksums are right. The range is the device offset the checksums are for.
    //
    // A boolean indicates whether this range is being written to for the first time. For overwrite
    // extents, we only check the checksums for a block if it has been written to for the first
    // time since the last flush, because otherwise we can't roll it back anyway so it doesn't
    // matter. For copy-on-write extents, the bool is always true.
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
151
/// Journal record format at serialization version 49, kept so older journals can still be
/// replayed.  Variants mirror [`JournalRecordV50`]; see that type for per-variant documentation.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
163
/// Journal record format at serialization version 47; migrates to [`JournalRecordV49`].
/// Variants mirror [`JournalRecordV50`]; see that type for per-variant documentation.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
175
/// Journal record format at serialization version 46; migrates to [`JournalRecordV47`].
/// Variants mirror [`JournalRecordV50`]; see that type for per-variant documentation.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
187
/// Journal record format at serialization version 43; migrates to [`JournalRecordV46`].
/// Variants mirror [`JournalRecordV50`]; see that type for per-variant documentation.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
199
/// Journal record format at serialization version 42; migrates to [`JournalRecordV43`].
/// This is the first version carrying the `first_write` bool on `DataChecksums`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
210
/// Journal record format at serialization version 41.  `DataChecksums` lacks the `first_write`
/// flag added in V42; the manual `From` impl below supplies it during migration.
#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
220
221impl From<JournalRecordV41> for JournalRecordV42 {
222    fn from(record: JournalRecordV41) -> Self {
223        match record {
224            JournalRecordV41::EndBlock => Self::EndBlock,
225            JournalRecordV41::Mutation { object_id, mutation } => {
226                Self::Mutation { object_id, mutation: mutation.into() }
227            }
228            JournalRecordV41::Commit => Self::Commit,
229            JournalRecordV41::Discard(offset) => Self::Discard(offset),
230            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
231            JournalRecordV41::DataChecksums(range, sums) => {
232                // At the time of writing the only extents written by real systems are CoW extents
233                // so the new bool is always true.
234                Self::DataChecksums(range, sums, true)
235            }
236        }
237    }
238}
239
/// Journal record format at serialization version 40; migrates to [`JournalRecordV41`].
/// Variants mirror [`JournalRecordV41`].
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
250
251pub(super) fn journal_handle_options() -> HandleOptions {
252    HandleOptions { skip_journal_checks: true, ..Default::default() }
253}
254
/// The journal records a stream of mutations that are to be applied to other objects.  At mount
/// time, these records can be replayed into memory.  It provides a way to quickly persist changes
/// without having to make a large number of writes; they can be deferred to a later time (e.g.
/// when a sufficient number have been queued).  It also provides support for transactions, the
/// ability to have mutations that are to be applied atomically together.
pub struct Journal {
    // Object manager through which mutations are applied/replayed (see `replay`).
    objects: Arc<ObjectManager>,
    // Handle to the journal file itself.  Set at most once (OnceLock).
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    // Loads and tracks super-blocks (see `read_superblocks`).
    super_block_manager: SuperBlockManager,
    // Mutable state; see `Inner`.
    inner: Mutex<Inner>,
    // NOTE(review): presumably serializes journal writes; the acquiring code is outside
    // this excerpt — confirm lock ordering relative to `inner`.
    writer_mutex: Mutex<()>,
    // Async mutex; presumably serializes sync operations — confirm against callers.
    sync_mutex: futures::lock::Mutex<()>,
    // When true, extra journal tracing is logged (see `set_trace`).
    trace: AtomicBool,

    // This event is used when we are waiting for a compaction to free up journal space.
    reclaim_event: Event,
}
272
/// Mutable state for [`Journal`], protected by its `inner` mutex.
struct Inner {
    super_block_header: SuperBlockHeader,

    // The offset that we can zero the journal up to now that it is no longer needed.
    zero_offset: Option<u64>,

    // The journal offset that we most recently flushed to the device.
    device_flushed_offset: u64,

    // If true, indicates a DidFlushDevice record is pending.
    needs_did_flush_device: bool,

    // The writer for the journal.
    writer: JournalWriter,

    // Set when a reset is encountered during a read.
    // Used at write pre_commit() time to ensure we write a version first thing after a reset.
    output_reset_version: bool,

    // Waker for the flush task.
    flush_waker: Option<Waker>,

    // Indicates the journal has been terminated.
    terminate: bool,

    // Latched error indicating reason for journal termination if not graceful.
    terminate_reason: Option<Error>,

    // Disable compactions.
    disable_compactions: bool,

    // True if compactions are running.
    compaction_running: bool,

    // Waker for the sync task for when it's waiting for the flush task to finish.
    sync_waker: Option<Waker>,

    // The last offset we flushed to the journal file.
    flushed_offset: u64,

    // The last offset that should be considered valid in the journal file.  Most of the time, this
    // will be the same as `flushed_offset` but at mount time, this could be less and will only be
    // up to the end of the last valid transaction; it won't include transactions that follow that
    // have been discarded.
    valid_to: u64,

    // If, after replaying, we have to discard a number of mutations (because they don't validate),
    // this offset specifies where we need to discard back to.  This is so that when we next replay,
    // we ignore those mutations and continue with new good mutations.
    discard_offset: Option<u64>,

    // In the steady state, the journal should fluctuate between being approximately half of this
    // number and this number.  New super-blocks will be written every time about half of this
    // amount is written to the journal.
    reclaim_size: u64,

    // When set, the journal is being used to build an image and this selects which super-block
    // instance will be written (see `set_image_builder_mode`).
    image_builder_mode: Option<SuperBlockInstance>,

    // If true and `needs_barrier`, issue a pre-barrier on the first device write of each journal
    // write (which happens in multiples of `BLOCK_SIZE`). This ensures that all the corresponding
    // data writes make it to disk before the journal gets written to.
    barriers_enabled: bool,

    // If true, indicates that data write requests have been made to the device since the last
    // journal write.
    needs_barrier: bool,

    // True if a compaction is being forced for reasons other than the journal being full.
    forced_compaction: bool,
}
343
344impl Inner {
345    fn terminate(&mut self, reason: Option<Error>) {
346        self.terminate = true;
347
348        if let Some(err) = reason {
349            error!(error:? = err; "Terminating journal");
350            // Log previous error if one was already set, otherwise latch the error.
351            if let Some(prev_err) = self.terminate_reason.as_ref() {
352                error!(error:? = prev_err; "Journal previously terminated");
353            } else {
354                self.terminate_reason = Some(err);
355            }
356        }
357
358        if let Some(waker) = self.flush_waker.take() {
359            waker.wake();
360        }
361        if let Some(waker) = self.sync_waker.take() {
362            waker.wake();
363        }
364    }
365}
366
/// Tuning options used when constructing a [`Journal`].
pub struct JournalOptions {
    /// In the steady state, the journal should fluctuate between being approximately half of this
    /// number and this number.  New super-blocks will be written every time about half of this
    /// amount is written to the journal.
    pub reclaim_size: u64,

    // If true, issue a pre-barrier on the first device write of each journal write (which happens
    // in multiples of `BLOCK_SIZE`). This ensures that all the corresponding data writes make it
    // to disk before the journal gets written to.
    pub barriers_enabled: bool,
}
378
379impl Default for JournalOptions {
380    fn default() -> Self {
381        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
382    }
383}
384
/// Result of reading back the journal: the decoded transactions, plus the journal offset at
/// which the device was last known to have been flushed (from `DidFlushDevice` records).
struct JournaledTransactions {
    transactions: Vec<JournaledTransaction>,
    device_flushed_offset: u64,
}
389
/// A single transaction decoded from the journal during replay, with its mutations split by the
/// store they apply to.
#[derive(Debug, Default)]
pub struct JournaledTransaction {
    // Journal position at which this transaction begins.
    pub checkpoint: JournalCheckpoint,
    // Mutations destined for the root parent store.
    pub root_parent_mutations: Vec<Mutation>,
    // Mutations destined for the root store.
    pub root_mutations: Vec<Mutation>,
    /// List of (store_object_id, mutation).
    pub non_root_mutations: Vec<(u64, Mutation)>,
    // Journal offset at which this transaction ends.
    pub end_offset: u64,
    // Data checksums journaled with this transaction (see `JournalRecord::DataChecksums`).
    pub checksums: Vec<JournaledChecksums>,

    /// Records offset + 1 of the matching begin_flush transaction. The +1 is because we want to
    /// ignore the begin flush transaction; we don't need or want to replay it.
    pub end_flush: Option<(/* store_id: */ u64, /* begin offset: */ u64)>,

    /// The volume which was deleted in this transaction, if any.
    pub volume_deleted: Option</* store_id: */ u64>,
}
407
408impl JournaledTransaction {
409    fn new(checkpoint: JournalCheckpoint) -> Self {
410        Self { checkpoint, ..Default::default() }
411    }
412}
413
// Sentinel store id (u64::MAX) used while decoding journaled transactions to tag a deleted
// volume.  NOTE(review): the consuming code (read_transactions) is outside this excerpt —
// confirm exact usage there.
const VOLUME_DELETED: u64 = u64::MAX;
415
/// Data checksums journaled as part of a transaction (decoded form of
/// `JournalRecord::DataChecksums`).
#[derive(Debug)]
pub struct JournaledChecksums {
    // Device byte range the checksums cover.
    pub device_range: Range<u64>,
    pub checksums: Checksums,
    // Whether the range was written for the first time since the last flush; always true for
    // copy-on-write extents (see `JournalRecord::DataChecksums`).
    pub first_write: bool,
}
422
/// Handles for journal-like objects have some additional functionality to manage their extents,
/// since during replay we need to add extents as we find them.
pub trait JournalHandle: ReadObjectHandle {
    /// The end offset of the last extent in the JournalHandle.  Used only for validating extents
    /// (which will be skipped if None is returned).
    /// Note this is equivalent in value to ReadObjectHandle::get_size, when present.
    fn end_offset(&self) -> Option<u64>;
    /// Adds an extent to the current end of the journal stream.
    /// `added_offset` is the offset into the journal of the transaction which added this extent,
    /// used for discard_extents.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Discards all extents which were added in a transaction at offset >= |discard_offset|.
    fn discard_extents(&mut self, discard_offset: u64);
}
437
438// Provide a stub implementation for DataObjectHandle so we can use it in
439// Journal::read_transactions.  Manual extent management is a NOP (which is OK since presumably the
440// DataObjectHandle already knows where its extents live).
441impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
442    fn end_offset(&self) -> Option<u64> {
443        None
444    }
445    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
446        // NOP
447    }
448    fn discard_extents(&mut self, _discard_offset: u64) {
449        // NOP
450    }
451}
452
453#[fxfs_trace::trace]
454impl Journal {
455    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
456        let starting_checksum = rand::random_range(1..u64::MAX);
457        Journal {
458            objects: objects,
459            handle: OnceLock::new(),
460            super_block_manager: SuperBlockManager::new(),
461            inner: Mutex::new(Inner {
462                super_block_header: SuperBlockHeader::default(),
463                zero_offset: None,
464                device_flushed_offset: 0,
465                needs_did_flush_device: false,
466                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
467                output_reset_version: false,
468                flush_waker: None,
469                terminate: false,
470                terminate_reason: None,
471                disable_compactions: false,
472                compaction_running: false,
473                sync_waker: None,
474                flushed_offset: 0,
475                valid_to: 0,
476                discard_offset: None,
477                reclaim_size: options.reclaim_size,
478                image_builder_mode: None,
479                barriers_enabled: options.barriers_enabled,
480                needs_barrier: false,
481                forced_compaction: false,
482            }),
483            writer_mutex: Mutex::new(()),
484            sync_mutex: futures::lock::Mutex::new(()),
485            trace: AtomicBool::new(false),
486            reclaim_event: Event::new(),
487        }
488    }
489
490    pub fn set_trace(&self, trace: bool) {
491        let old_value = self.trace.swap(trace, Ordering::Relaxed);
492        if trace != old_value {
493            info!(trace; "J: trace");
494        }
495    }
496
497    pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
498        self.inner.lock().image_builder_mode = mode;
499        if let Some(instance) = mode {
500            *self.super_block_manager.next_instance.lock() = instance;
501        }
502    }
503
504    pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
505        self.inner.lock().image_builder_mode
506    }
507
508    #[cfg(feature = "migration")]
509    pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
510        ensure!(
511            self.inner.lock().image_builder_mode.is_some(),
512            "Can only set filesystem uuid in image builder mode."
513        );
514        self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
515        Ok(())
516    }
517
    /// Reads the most recent valid super-block from `device`, returning its header together with
    /// the root parent store deserialized from it.  Thin delegation to the super-block manager.
    pub(crate) async fn read_superblocks(
        &self,
        device: Arc<dyn Device>,
        block_size: u64,
    ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
        self.super_block_manager.load(device, block_size).await
    }
525
    /// Used during replay to validate a mutation.  This should return false if the mutation is not
    /// valid and should not be applied.  This could be for benign reasons: e.g. the device flushed
    /// data out-of-order, or because of a malicious actor.
    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
        match mutation {
            // Extent mutations: check range alignment, checksum granularity, and device bounds.
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    Item {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(
                                        _,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                                ..
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
                        ..
                    },
                ..
            }) => {
                // The file range must be non-empty and block-aligned.
                if range.is_empty() || !range.is_aligned(block_size) {
                    return false;
                }
                let len = range.length().unwrap();
                if let ExtentMode::Cow(checksums) = mode {
                    // Each checksum must cover an equal, block-multiple slice of the extent.
                    if checksums.len() > 0 {
                        if len % checksums.len() as u64 != 0 {
                            return false;
                        }
                        if (len / checksums.len() as u64) % block_size != 0 {
                            return false;
                        }
                    }
                }
                // The device range must be block-aligned and lie entirely within the device.
                if *device_offset % block_size != 0
                    || *device_offset >= device_size
                    || device_size - *device_offset < len
                {
                    return false;
                }
            }
            // Non-extent object-store mutations carry nothing to range-check here.
            Mutation::ObjectStore(_) => {}
            Mutation::EncryptedObjectStore(_) => {}
            // Allocator (de)allocations must be non-empty, have a valid owner, and fit the device.
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            // The remaining mutation kinds have no fields that can be validated here.
            Mutation::BeginFlush => {}
            Mutation::EndFlush => {}
            Mutation::DeleteVolume => {}
            Mutation::UpdateBorrowed(_) => {}
            Mutation::UpdateMutationsKey(_) => {}
            Mutation::CreateInternalDir(owner_object_id) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
        }
        true
    }
601
602    // Assumes that `mutation` has been validated.
603    fn update_checksum_list(
604        &self,
605        journal_offset: u64,
606        mutation: &Mutation,
607        checksum_list: &mut ChecksumList,
608    ) -> Result<(), Error> {
609        match mutation {
610            Mutation::ObjectStore(_) => {}
611            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
612                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
613            }
614            _ => {}
615        }
616        Ok(())
617    }
618
619    /// Reads the latest super-block, and then replays journaled records.
620    #[trace]
621    pub async fn replay(
622        &self,
623        filesystem: Arc<FxFilesystem>,
624        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
625    ) -> Result<(), Error> {
626        let block_size = filesystem.block_size();
627
628        let (super_block, root_parent) =
629            self.super_block_manager.load(filesystem.device(), block_size).await?;
630
631        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));
632
633        self.objects.set_root_parent_store(root_parent.clone());
634        let allocator =
635            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
636        if let Some(on_new_allocator) = on_new_allocator {
637            on_new_allocator(allocator.clone());
638        }
639        self.objects.set_allocator(allocator.clone());
640        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
641        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
642        {
643            let mut inner = self.inner.lock();
644            inner.super_block_header = super_block.clone();
645        }
646
647        let device = filesystem.device();
648
649        let mut handle;
650        {
651            let root_parent_layer = root_parent.tree().mutable_layer();
652            let mut iter = root_parent_layer
653                .seek(Bound::Included(&ObjectKey::attribute(
654                    super_block.journal_object_id,
655                    DEFAULT_DATA_ATTRIBUTE_ID,
656                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
657                        super_block.journal_checkpoint.file_offset,
658                        BLOCK_SIZE,
659                    ))),
660                )))
661                .await
662                .context("Failed to seek root parent store")?;
663            let start_offset = if let Some(ItemRef {
664                key:
665                    ObjectKey {
666                        data:
667                            ObjectKeyData::Attribute(
668                                DEFAULT_DATA_ATTRIBUTE_ID,
669                                AttributeKey::Extent(ExtentKey { range }),
670                            ),
671                        ..
672                    },
673                ..
674            }) = iter.get()
675            {
676                range.start
677            } else {
678                0
679            };
680            handle = BootstrapObjectHandle::new_with_start_offset(
681                super_block.journal_object_id,
682                device.clone(),
683                start_offset,
684            );
685            while let Some(item) = iter.get() {
686                if !match item.into() {
687                    Some((
688                        object_id,
689                        DEFAULT_DATA_ATTRIBUTE_ID,
690                        ExtentKey { range },
691                        ExtentValue::Some { device_offset, .. },
692                    )) if object_id == super_block.journal_object_id => {
693                        if let Some(end_offset) = handle.end_offset() {
694                            if range.start != end_offset {
695                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
696                                    "Unexpected journal extent {:?}, expected start: {}",
697                                    item, end_offset
698                                )));
699                            }
700                        }
701                        handle.push_extent(
702                            0, // We never discard extents from the root parent store.
703                            *device_offset
704                                ..*device_offset + range.length().context("Invalid extent")?,
705                        );
706                        true
707                    }
708                    _ => false,
709                } {
710                    break;
711                }
712                iter.advance().await.context("Failed to advance root parent store iterator")?;
713            }
714        }
715
716        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
717        let JournaledTransactions { mut transactions, device_flushed_offset } = self
718            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
719            .await
720            .context("Reading transactions for replay")?;
721
722        // Validate all the mutations.
723        let mut checksum_list = ChecksumList::new(device_flushed_offset);
724        let mut valid_to = reader.journal_file_checkpoint().file_offset;
725        let device_size = device.size();
726        'bad_replay: for JournaledTransaction {
727            checkpoint,
728            root_parent_mutations,
729            root_mutations,
730            non_root_mutations,
731            checksums,
732            ..
733        } in &transactions
734        {
735            for JournaledChecksums { device_range, checksums, first_write } in checksums {
736                checksum_list
737                    .push(
738                        checkpoint.file_offset,
739                        device_range.clone(),
740                        checksums.maybe_as_ref().context("Malformed checksums")?,
741                        *first_write,
742                    )
743                    .context("Pushing journal checksum records to checksum list")?;
744            }
745            for mutation in root_parent_mutations
746                .iter()
747                .chain(root_mutations)
748                .chain(non_root_mutations.iter().map(|(_, m)| m))
749            {
750                if !self.validate_mutation(mutation, block_size, device_size) {
751                    info!(mutation:?; "Stopping replay at bad mutation");
752                    valid_to = checkpoint.file_offset;
753                    break 'bad_replay;
754                }
755                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
756            }
757        }
758
759        // Validate the checksums. Note if barriers are enabled, there will be no checksums in
760        // practice to verify.
761        let valid_to = checksum_list
762            .verify(device.as_ref(), valid_to)
763            .await
764            .context("Failed to validate checksums")?;
765
766        // Apply the mutations...
767
768        let mut last_checkpoint = reader.journal_file_checkpoint();
769        let mut journal_offsets = super_block.journal_file_offsets.clone();
770
771        // Start with the root-parent mutations, and also determine the journal offsets for all
772        // other objects.
773        for (
774            index,
775            JournaledTransaction {
776                checkpoint,
777                root_parent_mutations,
778                end_flush,
779                volume_deleted,
780                ..
781            },
782        ) in transactions.iter_mut().enumerate()
783        {
784            if checkpoint.file_offset >= valid_to {
785                last_checkpoint = checkpoint.clone();
786
787                // Truncate the transactions so we don't need to worry about them on the next pass.
788                transactions.truncate(index);
789                break;
790            }
791
792            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
793            for mutation in root_parent_mutations.drain(..) {
794                self.objects
795                    .apply_mutation(
796                        super_block.root_parent_store_object_id,
797                        mutation,
798                        &context,
799                        AssocObj::None,
800                    )
801                    .context("Failed to replay root parent store mutations")?;
802            }
803
804            if let Some((object_id, journal_offset)) = end_flush {
805                journal_offsets.insert(*object_id, *journal_offset);
806            }
807
808            if let Some(object_id) = volume_deleted {
809                journal_offsets.insert(*object_id, VOLUME_DELETED);
810            }
811        }
812
813        // Now we can open the root store.
814        let root_store = ObjectStore::open(
815            &root_parent,
816            super_block.root_store_object_id,
817            Box::new(NullCache {}),
818        )
819        .await
820        .context("Unable to open root store")?;
821
822        ensure!(
823            !root_store.is_encrypted(),
824            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
825        );
826        self.objects.set_root_store(root_store);
827
828        let root_store_offset =
829            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);
830
831        // Now replay the root store mutations.
832        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
833            if checkpoint.file_offset < root_store_offset {
834                continue;
835            }
836
837            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
838            for mutation in root_mutations.drain(..) {
839                self.objects
840                    .apply_mutation(
841                        super_block.root_store_object_id,
842                        mutation,
843                        &context,
844                        AssocObj::None,
845                    )
846                    .context("Failed to replay root store mutations")?;
847            }
848        }
849
850        // Now we can open the allocator.
851        allocator.open().await.context("Failed to open allocator")?;
852
853        // Now replay all other mutations.
854        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
855        {
856            self.objects
857                .replay_mutations(
858                    non_root_mutations,
859                    &journal_offsets,
860                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
861                    end_offset,
862                )
863                .await
864                .context("Failed to replay mutations")?;
865        }
866
867        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;
868
869        let discarded_to =
870            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
871                Some(reader.journal_file_checkpoint().file_offset)
872            } else {
873                None
874            };
875
876        // Configure the journal writer so that we can continue.
877        {
878            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
879                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
880                    "journal replay cut short; journal finishes at {}, but super-block was \
881                     written at {}",
882                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
883                )));
884            }
885            let handle = ObjectStore::open_object(
886                &root_parent,
887                super_block.journal_object_id,
888                journal_handle_options(),
889                None,
890            )
891            .await
892            .with_context(|| {
893                format!(
894                    "Failed to open journal file (object id: {})",
895                    super_block.journal_object_id
896                )
897            })?;
898            let _ = self.handle.set(handle);
899            let mut inner = self.inner.lock();
900            reader.skip_to_end_of_block();
901            let mut writer_checkpoint = reader.journal_file_checkpoint();
902
903            // Make sure we don't accidentally use the reader from now onwards.
904            std::mem::drop(reader);
905
906            // Reset the stream to indicate that we've remounted the journal.
907            writer_checkpoint.checksum ^= RESET_XOR;
908            writer_checkpoint.version = LATEST_VERSION;
909            inner.flushed_offset = writer_checkpoint.file_offset;
910
911            // When we open the filesystem as writable, we flush the device.
912            inner.device_flushed_offset = inner.flushed_offset;
913
914            inner.writer.seek(writer_checkpoint);
915            inner.output_reset_version = true;
916            inner.valid_to = last_checkpoint.file_offset;
917            if last_checkpoint.file_offset < inner.flushed_offset {
918                inner.discard_offset = Some(last_checkpoint.file_offset);
919            }
920        }
921
922        self.objects
923            .on_replay_complete()
924            .await
925            .context("Failed to complete replay for object manager")?;
926
927        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
928        Ok(())
929    }
930
    /// Reads journal records from `reader` (starting at its current checkpoint) and groups them
    /// into committed transactions.
    ///
    /// Reading stops when `end_offset` (if provided) is reached, or at a checksum mismatch, which
    /// is the normal end-of-journal condition.  A malformed record is an error.
    ///
    /// If `object_id_filter` is not `INVALID_OBJECT_ID`, only mutations for that object are
    /// retained in the returned transactions, although flush/delete bookkeeping is still
    /// performed for all objects.  Also returns the greatest known device-flushed offset, seeded
    /// from the super-block and advanced by `Reset` and `DidFlushDevice` records.
    async fn read_transactions(
        &self,
        reader: &mut JournalReader,
        end_offset: Option<u64>,
        object_id_filter: u64,
    ) -> Result<JournaledTransactions, Error> {
        let mut transactions = Vec::new();
        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
            let super_block = &self.inner.lock().super_block_header;
            (
                super_block.super_block_journal_file_offset,
                super_block.root_parent_store_object_id,
                super_block.root_store_object_id,
            )
        };
        // The transaction currently being accumulated, if any; holds `&mut` into `transactions`.
        let mut current_transaction = None;
        // Maps object_id -> file offset of the BeginFlush seen for it (consumed by EndFlush).
        let mut begin_flush_offsets = HashMap::default();
        // Stores for which a DeleteVolume has been seen; later mutations for them are invalid.
        let mut stores_deleted = HashSet::new();
        loop {
            // Cache the checkpoint before we deserialize a record.
            let checkpoint = reader.journal_file_checkpoint();
            if let Some(end_offset) = end_offset {
                if checkpoint.file_offset >= end_offset {
                    break;
                }
            }
            let result =
                reader.deserialize().await.context("Failed to deserialize journal record")?;
            match result {
                ReadResult::Reset(_) => {
                    // A reset invalidates any partially-read transaction.
                    if current_transaction.is_some() {
                        current_transaction = None;
                        transactions.pop();
                    }
                    let offset = reader.journal_file_checkpoint().file_offset;
                    if offset > device_flushed_offset {
                        device_flushed_offset = offset;
                    }
                }
                ReadResult::Some(record) => {
                    match record {
                        JournalRecord::EndBlock => {
                            reader.skip_to_end_of_block();
                        }
                        JournalRecord::Mutation { object_id, mutation } => {
                            // Lazily start a new transaction at the first record after a Commit.
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };

                            if stores_deleted.contains(&object_id) {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Encountered mutations for deleted store")
                                );
                            }

                            match &mutation {
                                Mutation::BeginFlush => {
                                    begin_flush_offsets.insert(
                                        object_id,
                                        current_transaction.checkpoint.file_offset,
                                    );
                                }
                                Mutation::EndFlush => {
                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
                                        // An EndFlush and a DeleteVolume for the same object in
                                        // one transaction is contradictory.
                                        if let Some(deleted_volume) =
                                            &current_transaction.volume_deleted
                                        {
                                            if *deleted_volume == object_id {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                                ));
                                            }
                                        }
                                        // The +1 is because we don't want to replay the transaction
                                        // containing the begin flush; we don't need or want to
                                        // replay it.
                                        if current_transaction
                                            .end_flush
                                            .replace((object_id, offset + 1))
                                            .is_some()
                                        {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush mutations in a \
                                                 single transaction"
                                            ));
                                        }
                                    }
                                }
                                Mutation::DeleteVolume => {
                                    // Mirror of the EndFlush check above: same-object
                                    // EndFlush + DeleteVolume in one transaction is invalid.
                                    if let Some((flushed_object, _)) =
                                        &current_transaction.end_flush
                                    {
                                        if *flushed_object == object_id {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                            ));
                                        }
                                    }
                                    if current_transaction
                                        .volume_deleted
                                        .replace(object_id)
                                        .is_some()
                                    {
                                        bail!(anyhow!(FxfsError::Inconsistent).context(
                                            "Multiple DeleteVolume mutations in a single \
                                             transaction"
                                        ));
                                    }
                                    stores_deleted.insert(object_id);
                                }
                                _ => {}
                            }

                            // If this mutation doesn't need to be applied, don't bother adding it
                            // to the transaction.
                            if (object_id_filter == INVALID_OBJECT_ID
                                || object_id_filter == object_id)
                                && self.should_apply(object_id, &current_transaction.checkpoint)
                            {
                                // Bucket by store so replay can apply root-parent, root, and
                                // other-store mutations in separate passes.
                                if object_id == root_parent_store_object_id {
                                    current_transaction.root_parent_mutations.push(mutation);
                                } else if object_id == root_store_object_id {
                                    current_transaction.root_mutations.push(mutation);
                                } else {
                                    current_transaction
                                        .non_root_mutations
                                        .push((object_id, mutation));
                                }
                            }
                        }
                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
                            // Checksums may also open a new transaction, same as Mutation above.
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };
                            current_transaction.checksums.push(JournaledChecksums {
                                device_range,
                                checksums,
                                first_write,
                            });
                        }
                        JournalRecord::Commit => {
                            // Seal the current transaction (take() leaves None for the next one).
                            if let Some(&mut JournaledTransaction {
                                ref checkpoint,
                                ref root_parent_mutations,
                                ref mut end_offset,
                                ..
                            }) = current_transaction.take()
                            {
                                for mutation in root_parent_mutations {
                                    // Snoop the mutations for any that might apply to the journal
                                    // file so that we can pass them to the reader so that it can
                                    // read the journal file.
                                    if let Mutation::ObjectStore(ObjectStoreMutation {
                                        item:
                                            Item {
                                                key:
                                                    ObjectKey {
                                                        object_id,
                                                        data:
                                                            ObjectKeyData::Attribute(
                                                                DEFAULT_DATA_ATTRIBUTE_ID,
                                                                AttributeKey::Extent(ExtentKey {
                                                                    range,
                                                                }),
                                                            ),
                                                        ..
                                                    },
                                                value:
                                                    ObjectValue::Extent(ExtentValue::Some {
                                                        device_offset,
                                                        ..
                                                    }),
                                                ..
                                            },
                                        ..
                                    }) = mutation
                                    {
                                        // Add the journal extents we find on the way to our
                                        // reader.
                                        let handle = reader.handle();
                                        if *object_id != handle.object_id() {
                                            continue;
                                        }
                                        // Journal extents must be contiguous in file space.
                                        if let Some(end_offset) = handle.end_offset() {
                                            if range.start != end_offset {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    format!(
                                                        "Unexpected journal extent {:?} -> {}, \
                                                           expected start: {}",
                                                        range, device_offset, end_offset,
                                                    )
                                                ));
                                            }
                                        }
                                        handle.push_extent(
                                            checkpoint.file_offset,
                                            *device_offset
                                                ..*device_offset
                                                    + range.length().context("Invalid extent")?,
                                        );
                                    }
                                }
                                *end_offset = reader.journal_file_checkpoint().file_offset;
                            }
                        }
                        JournalRecord::Discard(offset) => {
                            if offset == 0 {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Invalid offset for Discard")
                                );
                            }
                            if let Some(transaction) = current_transaction.as_ref() {
                                if transaction.checkpoint.file_offset < offset {
                                    // Odd, but OK.
                                    continue;
                                }
                            }
                            // Drop the in-progress transaction and every completed transaction
                            // at or beyond the discard offset.
                            current_transaction = None;
                            while let Some(transaction) = transactions.last() {
                                if transaction.checkpoint.file_offset < offset {
                                    break;
                                }
                                transactions.pop();
                            }
                            reader.handle().discard_extents(offset);
                        }
                        JournalRecord::DidFlushDevice(offset) => {
                            if offset > device_flushed_offset {
                                device_flushed_offset = offset;
                            }
                        }
                    }
                }
                // This is expected when we reach the end of the journal stream.
                ReadResult::ChecksumMismatch => break,
            }
        }

        // Discard any uncommitted transaction.
        if current_transaction.is_some() {
            transactions.pop();
        }

        Ok(JournaledTransactions { transactions, device_flushed_offset })
    }
1190
    /// Creates an empty filesystem with the minimum viable objects (including a root parent and
    /// root store but no further child stores).
    ///
    /// This performs the whole format in a single transaction: the root store, the allocator,
    /// both super-block objects (with their fixed first extents), the journal object, and the
    /// root parent graveyard.  On success the super-block header and journal handle are
    /// initialized, but the super-blocks themselves are not written here.
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        // The following constants are only used at format time. When mounting, the recorded values
        // in the superblock should be used.  The root parent store does not have a parent, but
        // needs an object ID to be registered with ObjectManager, so it cannot collide (i.e. have
        // the same object ID) with any objects in the root store that use the journal to track
        // mutations.
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        // Snapshot the journal writer's current position, stamped with the latest version; this
        // is recorded in the super-block header at the end of this function.
        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        let mut current_generation = 1;
        if filesystem.options().image_builder_mode.is_some() {
            // Note that in non-image_builder_mode we write both superblocks when we format
            // (in FxFilesystemBuilder::open). In image_builder_mode we only write once at the end
            // as part of finalize(), which is why we must make sure the generation we write is
            // newer than any existing generation.

            // Note: This is the *filesystem* block size, not the device block size which
            // is currently always 4096 (https://fxbug.dev/42063349)
            let block_size = filesystem.block_size();
            match self.read_superblocks(filesystem.device(), block_size).await {
                Ok((super_block, _)) => {
                    log::info!(
                        "Found existing superblock with generation {}. Bumping by 1.",
                        super_block.generation
                    );
                    // wrapping_add so a generation of u64::MAX doesn't panic.
                    current_generation = super_block.generation.wrapping_add(1);
                }
                Err(_) => {
                    // TODO(https://fxbug.dev/463757813): It's not unusual to fail to read
                    // superblocks when we're formatting a new filesystem but we should probably
                    // fail the format if we get an IO error.
                }
            }
        }

        let root_parent = ObjectStore::new_empty(
            None,
            INIT_ROOT_PARENT_STORE_OBJECT_ID,
            filesystem.clone(),
            Box::new(NullCache {}),
        );
        self.objects.set_root_parent_store(root_parent.clone());

        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
        self.objects.set_allocator(allocator.clone());
        self.objects.init_metadata_reservation()?;

        let journal_handle;
        let super_block_a_handle;
        let super_block_b_handle;
        let root_store;
        // All format-time objects are created within a single transaction.
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        root_store = root_parent
            .new_child_store(
                &mut transaction,
                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
                Box::new(NullCache {}),
            )
            .await
            .context("new_child_store")?;
        self.objects.set_root_store(root_store.clone());

        allocator.create(&mut transaction).await?;

        // Create the super-block objects...
        super_block_a_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::A.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
        // Each super-block instance lives at a fixed location on the device.
        super_block_a_handle
            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
            .await
            .context("extend super block")?;
        super_block_b_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::B.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
        super_block_b_handle
            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
            .await
            .context("extend super block")?;

        // the journal object...
        journal_handle = ObjectStore::create_object(
            &root_parent,
            &mut transaction,
            journal_handle_options(),
            None,
        )
        .await
        .context("create journal")?;
        // In image_builder_mode the journal is allocated last via `allocate_journal`.
        if self.inner.lock().image_builder_mode.is_none() {
            let mut file_range = 0..self.chunk_size();
            journal_handle
                .preallocate_range(&mut transaction, &mut file_range)
                .await
                .context("preallocate journal")?;
            if file_range.start < file_range.end {
                bail!("preallocate_range returned too little space");
            }
        }

        // Write the root store object info.
        root_store.create(&mut transaction).await?;

        // The root parent graveyard.
        root_parent.set_graveyard_directory_object_id(
            Graveyard::create(&mut transaction, &root_parent).await?,
        );

        transaction.commit().await?;

        self.inner.lock().super_block_header = SuperBlockHeader::new(
            current_generation,
            root_parent.store_object_id(),
            root_parent.graveyard_directory_object_id(),
            root_store.store_object_id(),
            allocator.object_id(),
            journal_handle.object_id(),
            checkpoint,
            /* earliest_version: */ LATEST_VERSION,
        );

        // Initialize the journal writer.
        let _ = self.handle.set(journal_handle);
        Ok(())
    }
1344
1345    /// Normally we allocate the journal when creating the filesystem.
1346    /// This is used image_builder_mode when journal allocation is done last.
1347    pub async fn allocate_journal(&self) -> Result<(), Error> {
1348        let handle = self.handle.get().unwrap();
1349        let filesystem = handle.store().filesystem();
1350        let mut transaction = filesystem
1351            .clone()
1352            .new_transaction(
1353                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1354                Options { skip_journal_checks: true, ..Default::default() },
1355            )
1356            .await?;
1357        let mut file_range = 0..self.chunk_size();
1358        self.handle
1359            .get()
1360            .unwrap()
1361            .preallocate_range(&mut transaction, &mut file_range)
1362            .await
1363            .context("preallocate journal")?;
1364        if file_range.start < file_range.end {
1365            bail!("preallocate_range returned too little space");
1366        }
1367        transaction.commit().await?;
1368        Ok(())
1369    }
1370
1371    pub async fn init_superblocks(&self) -> Result<(), Error> {
1372        // Overwrite both superblocks.
1373        for _ in 0..2 {
1374            self.write_super_block().await?;
1375        }
1376        Ok(())
1377    }
1378
1379    /// Takes a snapshot of all journaled transactions which affect |object_id| since its last
1380    /// flush.
1381    /// The caller is responsible for locking; it must ensure that the journal is not trimmed during
1382    /// this call.  For example, a Flush lock could be held on the object in question (assuming that
1383    /// object has data to flush and is registered with ObjectManager).
1384    pub async fn read_transactions_for_object(
1385        &self,
1386        object_id: u64,
1387    ) -> Result<Vec<JournaledTransaction>, Error> {
1388        let handle = self.handle.get().expect("No journal handle");
1389        // Reopen the handle since JournalReader needs an owned handle.
1390        let handle = ObjectStore::open_object(
1391            handle.owner(),
1392            handle.object_id(),
1393            journal_handle_options(),
1394            None,
1395        )
1396        .await?;
1397
1398        let checkpoint = match self.objects.journal_checkpoint(object_id) {
1399            Some(checkpoint) => checkpoint,
1400            None => return Ok(vec![]),
1401        };
1402        let mut reader = JournalReader::new(handle, &checkpoint);
1403        // Record the current end offset and only read to there, so we don't accidentally read any
1404        // partially flushed blocks.
1405        let end_offset = self.inner.lock().valid_to;
1406        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1407    }
1408
1409    /// Commits a transaction.  This is not thread safe; the caller must take appropriate locks.
1410    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1411        if transaction.is_empty() {
1412            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1413        }
1414
1415        self.pre_commit(transaction).await?;
1416        Ok(self.write_and_apply_mutations(transaction))
1417    }
1418
    // Before we commit, we might need to extend the journal or write pending records to the
    // journal.
    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
        let handle;

        let (size, zero_offset) = {
            let mut inner = self.inner.lock();

            // If this is the first write after a RESET, we need to output version first.
            if std::mem::take(&mut inner.output_reset_version) {
                LATEST_VERSION.serialize_into(&mut inner.writer)?;
            }

            // Emit any pending Discard record ahead of the transaction's own records.
            if let Some(discard_offset) = inner.discard_offset {
                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
                inner.discard_offset = None;
            }

            // Emit a DidFlushDevice record if one was queued (see flush_device).
            if inner.needs_did_flush_device {
                let offset = inner.device_flushed_offset;
                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
                inner.needs_did_flush_device = false;
            }

            // No journal handle means there's no journal file to extend or zero.
            handle = match self.handle.get() {
                None => return Ok(()),
                Some(x) => x,
            };

            let file_offset = inner.writer.journal_file_checkpoint().file_offset;

            // If the next write would land within a chunk of the end of the journal file, we need
            // to extend it; `size` is Some(current size) when extension is needed.
            let size = handle.get_size();
            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };

            // Fast path: nothing to extend, nothing to zero and no metadata space to borrow.
            if size.is_none()
                && inner.zero_offset.is_none()
                && !self.objects.needs_borrow_for_journal(file_offset)
            {
                return Ok(());
            }

            (size, inner.zero_offset)
        };

        let mut transaction = handle
            .new_transaction_with_options(Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.objects.metadata_reservation()),
                txn_guard: Some(transaction.txn_guard()),
                ..Default::default()
            })
            .await?;
        if let Some(size) = size {
            // Preallocate the next chunk of the journal file.
            handle
                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
                .await?;
        }
        if let Some(zero_offset) = zero_offset {
            // Zero journal data preceding the last super-block (zero_offset is set in
            // write_super_block).
            handle.zero(&mut transaction, 0..zero_offset).await?;
        }

        // We can't use regular transaction commit, because that can cause re-entrancy issues, so
        // instead we just apply the transaction directly here.
        self.write_and_apply_mutations(&mut transaction);

        let mut inner = self.inner.lock();

        // Make sure the transaction to extend the journal made it to the journal within the old
        // size, since otherwise, it won't be possible to replay.
        if let Some(size) = size {
            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
        }

        // Only clear zero_offset if it's still the value we acted on; a new one may have been set
        // whilst the lock was released.
        if inner.zero_offset == zero_offset {
            inner.zero_offset = None;
        }

        Ok(())
    }
1499
1500    // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
1501    // all records should be applied because the object store or allocator might already contain the
1502    // mutation.  After replay, that obviously isn't the case and we want to apply all mutations.
1503    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1504        let super_block_header = &self.inner.lock().super_block_header;
1505        let offset = super_block_header
1506            .journal_file_offsets
1507            .get(&object_id)
1508            .cloned()
1509            .unwrap_or(super_block_header.super_block_journal_file_offset);
1510        journal_file_checkpoint.file_offset >= offset
1511    }
1512
    /// Flushes previous writes to the device and then writes out a new super-block.
    /// Callers must ensure that we do not make concurrent calls.
    async fn write_super_block(&self) -> Result<(), Error> {
        let root_parent_store = self.objects.root_parent_store();

        // We need to flush previous writes to the device since the new super-block we are writing
        // relies on written data being observable, and we also need to lock the root parent store
        // so that no new entries are written to it whilst we are writing the super-block, and for
        // that we use the write lock.
        let old_layers;
        let old_super_block_offset;
        let mut new_super_block_header;
        let checkpoint;
        let borrowed;

        {
            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
            {
                // Whilst the writer mutex is held, pad the journal to a block boundary and snapshot
                // the root parent store, so both correspond to the same journal offset.
                let _write_guard = self.writer_mutex.lock();
                (checkpoint, borrowed) = self.pad_to_block()?;
                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
            }
            self.flush_device(checkpoint.file_offset)
                .await
                .context("flush failed when writing superblock")?;
        }

        new_super_block_header = self.inner.lock().super_block_header.clone();

        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;

        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();

        // Build the new header: bump the generation, record where this super-block lands in the
        // journal, and record the earliest checkpoint replay must start from.
        new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
        new_super_block_header.journal_file_offsets = journal_file_offsets;
        new_super_block_header.borrowed_metadata_space = borrowed;

        self.super_block_manager
            .save(
                new_super_block_header.clone(),
                self.objects.root_parent_store().filesystem(),
                old_layers,
            )
            .await?;
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = new_super_block_header;
            // Journal data preceding the old super-block is no longer needed for replay; queue it
            // to be zeroed (handled in pre_commit).
            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
        }

        Ok(())
    }
1568
    /// Flushes any buffered journal data to the device.  Note that this does not flush the device
    /// unless the flush_device option is set, in which case data should have been persisted to
    /// lower layers.  If a precondition is supplied, it is evaluated and the sync will be skipped
    /// if it returns false.  This allows callers to check a condition whilst a lock is held.  If a
    /// sync is performed, this function returns the checkpoint that was flushed and the amount of
    /// borrowed metadata space at the point it was flushed.
    pub async fn sync(
        &self,
        options: SyncOptions<'_>,
    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
        // Serialise with other sync and write_super_block callers.
        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());

        let (checkpoint, borrowed) = {
            // The precondition is evaluated under the sync mutex, before any padding is written.
            if let Some(precondition) = options.precondition {
                if !precondition() {
                    return Ok(None);
                }
            }

            // This guard is required so that we don't insert an EndBlock record in the middle of a
            // transaction.
            let _guard = self.writer_mutex.lock();

            self.pad_to_block()?
        };

        if options.flush_device {
            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
        }

        Ok(Some((checkpoint, borrowed)))
    }
1601
1602    // Returns the checkpoint as it was prior to padding.  This is done because the super block
1603    // needs to record where the last transaction ends and it's the next transaction that pays the
1604    // price of the padding.
1605    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1606        let mut inner = self.inner.lock();
1607        let checkpoint = inner.writer.journal_file_checkpoint();
1608        if checkpoint.file_offset % BLOCK_SIZE != 0 {
1609            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1610            inner.writer.pad_to_block()?;
1611            if let Some(waker) = inner.flush_waker.take() {
1612                waker.wake();
1613            }
1614        }
1615        Ok((checkpoint, self.objects.borrowed_metadata_space()))
1616    }
1617
    // Waits until the journal file has been flushed up to `checkpoint_offset` and then, if the
    // device hasn't already been flushed past that offset, flushes the device itself.
    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
        assert!(
            self.inner.lock().image_builder_mode.is_none(),
            "flush_device called in image builder mode"
        );
        // Wait for the flush task to write the journal file up to `checkpoint_offset`, or for the
        // journal to terminate, whichever happens first.
        debug_assert_not_too_long!(poll_fn(|ctx| {
            let mut inner = self.inner.lock();
            if inner.flushed_offset >= checkpoint_offset {
                Poll::Ready(Ok(()))
            } else if inner.terminate {
                let context = inner
                    .terminate_reason
                    .as_ref()
                    .map(|e| format!("Journal closed with error: {:?}", e))
                    .unwrap_or_else(|| "Journal closed".to_string());
                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
            } else {
                // Not flushed far enough yet; `flush` wakes this waker as data is written out.
                inner.sync_waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }))?;

        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
        if needs_flush {
            let trace = self.trace.load(Ordering::Relaxed);
            if trace {
                info!("J: start flush device");
            }
            self.handle.get().unwrap().flush_device().await?;
            if trace {
                info!("J: end flush device");
            }

            // We need to write a DidFlushDevice record at some point, but if we are in the
            // process of shutting down the filesystem, we want to leave the journal clean to
            // avoid there being log messages complaining about unwritten journal data, so we
            // queue it up so that the next transaction will trigger this record to be written.
            // If we are shutting down, that will never happen but since the DidFlushDevice
            // message is purely advisory (it reduces the number of checksums we have to verify
            // during replay), it doesn't matter if it isn't written.
            {
                let mut inner = self.inner.lock();
                inner.device_flushed_offset = checkpoint_offset;
                inner.needs_did_flush_device = true;
            }

            // Tell the allocator that we flushed the device so that it can now start using
            // space that was deallocated.
            self.objects.allocator().did_flush_device(checkpoint_offset);
            if trace {
                info!("J: did flush device");
            }
        }

        Ok(())
    }
1674
1675    /// Returns a copy of the super-block header.
1676    pub fn super_block_header(&self) -> SuperBlockHeader {
1677        self.inner.lock().super_block_header.clone()
1678    }
1679
    /// Waits for there to be sufficient space in the journal.
    pub async fn check_journal_space(&self) -> Result<(), Error> {
        loop {
            debug_assert_not_too_long!({
                let inner = self.inner.lock();
                if inner.terminate {
                    // If the flush error is set, this will never make progress, since we can't
                    // extend the journal any more.
                    let context = inner
                        .terminate_reason
                        .as_ref()
                        .map(|e| format!("Journal closed with error: {:?}", e))
                        .unwrap_or_else(|| "Journal closed".to_string());
                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
                }
                // There is space when the amount of journal data outstanding since the last
                // super-block is below the reclaim threshold.
                if self.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset
                    < inner.reclaim_size
                {
                    break Ok(());
                }
                // Compactions never run in image builder mode (see flush_task), so waiting for one
                // to free space would block forever.
                if inner.image_builder_mode.is_some() {
                    break Ok(());
                }
                if inner.disable_compactions {
                    break Err(
                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
                    );
                }
                // Wait for a compaction to reclaim space and then re-check from the top.
                self.reclaim_event.listen()
            });
        }
    }
1713
    // The granularity at which the journal file is extended (see allocate_journal and pre_commit).
    fn chunk_size(&self) -> u64 {
        CHUNK_SIZE
    }
1717
    // Writes the transaction's mutations to the journal and applies them in memory, returning the
    // journal file offset at which the transaction starts.
    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
        let checkpoint_before;
        let checkpoint_after;
        {
            // The writer mutex prevents this transaction's records from being interleaved with
            // another transaction's records or with EndBlock padding.
            let _guard = self.writer_mutex.lock();
            checkpoint_before = {
                let mut inner = self.inner.lock();
                // If the transaction includes data writes, the next journal flush will request a
                // barrier on its first write (see `flush`).
                if transaction.includes_write() {
                    inner.needs_barrier = true;
                }
                let checkpoint = inner.writer.journal_file_checkpoint();
                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
                    self.objects.write_mutation(
                        *object_id,
                        mutation,
                        Writer(*object_id, &mut inner.writer),
                    );
                }
                checkpoint
            };
            // Applying the transaction can itself yield an extra mutation which must also be
            // journaled.
            let maybe_mutation =
                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
                    "apply_transaction should not fail in live mode; \
                     filesystem will be in an inconsistent state",
                );
            checkpoint_after = {
                let mut inner = self.inner.lock();
                if let Some(mutation) = maybe_mutation {
                    inner
                        .writer
                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
                        .unwrap();
                }
                // Record checksums for data written by this transaction so they can be verified
                // during replay.
                for (device_range, checksums, first_write) in
                    transaction.take_checksums().into_iter()
                {
                    inner
                        .writer
                        .write_record(&JournalRecord::DataChecksums(
                            device_range,
                            Checksums::fletcher(checksums),
                            first_write,
                        ))
                        .unwrap();
                }
                inner.writer.write_record(&JournalRecord::Commit).unwrap();

                inner.writer.journal_file_checkpoint()
            };
        }
        self.objects.did_commit_transaction(
            transaction,
            &checkpoint_before,
            checkpoint_after.file_offset,
        );

        // Wake the flush task so the new records get written out.
        if let Some(waker) = self.inner.lock().flush_waker.take() {
            waker.wake();
        }

        checkpoint_before.file_offset
    }
1780
    /// This task will flush journal data to the device when there is data that needs flushing, and
    /// trigger compactions when short of journal space.  It will return after the terminate method
    /// has been called, or an error is encountered with either flushing or compaction.
    pub async fn flush_task(self: Arc<Self>) {
        // At most one flush and one compaction are in flight at any time; their futures are kept
        // across polls.
        let mut flush_fut = None;
        let mut compact_fut = None;
        // Set after a flush failure; no further flushes are issued once the journal terminates.
        let mut flush_error = false;
        poll_fn(|ctx| {
            loop {
                {
                    let mut inner = self.inner.lock();
                    // Start a flush if there's flushable data and we have a journal handle.
                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
                        let flushable = inner.writer.flushable_bytes();
                        if flushable > 0 {
                            flush_fut = Some(Box::pin(self.flush(flushable)));
                        }
                    }
                    // Only exit once any in-flight flush and compaction have completed.
                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
                        return Poll::Ready(());
                    }
                    // The / 2 is here because after compacting, we cannot reclaim the space until
                    // the _next_ time we flush the device since the super-block is not guaranteed
                    // to persist until then.
                    if compact_fut.is_none()
                        && !inner.terminate
                        && !inner.disable_compactions
                        && inner.image_builder_mode.is_none()
                        && self.objects.last_end_offset()
                            - inner.super_block_header.journal_checkpoint.file_offset
                            > inner.reclaim_size / 2
                    {
                        compact_fut = Some(Box::pin(self.compact()));
                        inner.compaction_running = true;
                    }
                    // Register to be woken when new flushable data arrives (see pad_to_block and
                    // write_and_apply_mutations).
                    inner.flush_waker = Some(ctx.waker().clone());
                }
                let mut pending = true;
                if let Some(fut) = flush_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        if let Err(e) = result {
                            // A flush failure terminates the journal; wake anything waiting for
                            // journal space so it can observe the error.
                            self.inner.lock().terminate(Some(e.context("Flush error")));
                            self.reclaim_event.notify(usize::MAX);
                            flush_error = true;
                        }
                        flush_fut = None;
                        pending = false;
                    }
                }
                if let Some(fut) = compact_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        let mut inner = self.inner.lock();
                        if let Err(e) = result {
                            inner.terminate(Some(e.context("Compaction error")));
                        }
                        compact_fut = None;
                        inner.compaction_running = false;
                        // Wake tasks blocked in check_journal_space/stop_compactions.
                        self.reclaim_event.notify(usize::MAX);
                        pending = false;
                    }
                }
                if pending {
                    return Poll::Pending;
                }
            }
        })
        .await;
    }
1848
    /// Returns a yielder that can be used for compactions.
    ///
    /// On Fuchsia the yielder delays compaction whilst the executor is busy; on host builds it is
    /// a no-op (see the `yielder` module).
    pub fn get_compaction_yielder(&self) -> CompactionYielder<'_> {
        CompactionYielder::new(self)
    }
1853
1854    async fn flush(&self, amount: usize) -> Result<(), Error> {
1855        let handle = self.handle.get().unwrap();
1856        let mut buf = handle.allocate_buffer(amount).await;
1857        let (offset, len, barrier_on_first_write) = {
1858            let mut inner = self.inner.lock();
1859            let offset = inner.writer.take_flushable(buf.as_mut());
1860            let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1861            // Reset `needs_barrier` before instead of after the overwrite in case a txn commit
1862            // that contains data happens during the overwrite.
1863            inner.needs_barrier = false;
1864            (offset, buf.len() as u64, barrier_on_first_write)
1865        };
1866        self.handle
1867            .get()
1868            .unwrap()
1869            .overwrite(
1870                offset,
1871                buf.as_mut(),
1872                OverwriteOptions { barrier_on_first_write, ..Default::default() },
1873            )
1874            .await?;
1875
1876        let mut inner = self.inner.lock();
1877        if let Some(waker) = inner.sync_waker.take() {
1878            waker.wake();
1879        }
1880        inner.flushed_offset = offset + len;
1881        inner.valid_to = inner.flushed_offset;
1882        Ok(())
1883    }
1884
    // Flushes the object stores and then writes a super-block reflecting the flushed state, which
    // allows journal space preceding the new super-block to be reclaimed.
    #[trace]
    async fn compact(&self) -> Result<(), Error> {
        assert!(
            self.inner.lock().image_builder_mode.is_none(),
            "compact called in image builder mode"
        );
        // Snapshot the counter so we can report how many bytes this compaction wrote.
        let bytes_before = self.objects.compaction_bytes_written();
        let _measure = crate::metrics::DurationMeasureScope::new(
            &crate::metrics::lsm_tree_metrics().journal_compaction_time,
        );
        crate::metrics::lsm_tree_metrics().journal_compactions_total.add(1);
        let trace = self.trace.load(Ordering::Relaxed);
        debug!("Compaction starting");
        if trace {
            info!("J: start compaction");
        }
        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
        self.inner.lock().super_block_header.earliest_version = earliest_version;
        self.write_super_block().await.context("Failed to write superblock")?;
        if trace {
            info!("J: end compaction");
        }
        debug!("Compaction finished");
        let bytes_after = self.objects.compaction_bytes_written();
        crate::metrics::lsm_tree_metrics()
            .journal_compaction_bytes_written
            .add(bytes_after.saturating_sub(bytes_before));
        Ok(())
    }
1914
    /// This should generally NOT be called externally. It is public to allow use by FIDL service
    /// fxfs.Debug.
    pub async fn force_compact(&self) -> Result<(), Error> {
        // `forced_compaction` stops the compaction yielder from delaying this compaction (see
        // CompactionYielder::yield_now); the scopeguard clears the flag even on early error return.
        self.inner.lock().forced_compaction = true;
        scopeguard::defer! { self.inner.lock().forced_compaction = false; }
        self.compact().await
    }
1922
    /// Disables compactions and waits for any in-flight compaction to finish.
    pub async fn stop_compactions(&self) {
        loop {
            debug_assert_not_too_long!({
                let mut inner = self.inner.lock();
                inner.disable_compactions = true;
                if !inner.compaction_running {
                    return;
                }
                // The flush task notifies `reclaim_event` when a compaction completes.
                self.reclaim_event.listen()
            });
        }
    }
1935
    /// Creates a lazy inspect node named `str` under `parent` which will yield statistics for the
    /// journal when queried.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        // Hold a weak reference so the inspect tree doesn't keep the journal alive.
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    // Take all values under one lock acquisition so they are consistent.
                    let (journal_min, journal_max, journal_reclaim_size) = {
                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
                        let inner = this.inner.lock();
                        (
                            round_down(
                                inner.super_block_header.journal_checkpoint.file_offset,
                                BLOCK_SIZE,
                            ),
                            inner.flushed_offset,
                            inner.reclaim_size,
                        )
                    };
                    let root = inspector.root();
                    root.record_uint("journal_min_offset", journal_min);
                    root.record_uint("journal_max_offset", journal_max);
                    root.record_uint("journal_size", journal_max - journal_min);
                    root.record_uint("journal_reclaim_size", journal_reclaim_size);

                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
                    if let Some(x) = round_div(
                        100 * (journal_max - journal_min),
                        this.objects.allocator().get_disk_bytes(),
                    ) {
                        root.record_uint("journal_size_to_disk_size_percent", x);
                    }
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
1976
1977    /// Terminate all journal activity.
1978    pub fn terminate(&self) {
1979        self.inner.lock().terminate(/*reason*/ None);
1980        self.reclaim_event.notify(usize::MAX);
1981    }
1982}
1983
1984/// Wrapper to allow records to be written to the journal.
1985pub struct Writer<'a>(u64, &'a mut JournalWriter);
1986
1987impl Writer<'_> {
1988    pub fn write(&mut self, mutation: Mutation) {
1989        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
1990    }
1991}
1992
#[cfg(target_os = "fuchsia")]
mod yielder {
    use super::Journal;
    use crate::lsm_tree::Yielder;
    use fuchsia_async as fasync;

    /// CompactionYielder uses fuchsia-async to yield if other tasks are being polled, which should
    /// be a proxy for how busy the system is.  We can afford to delay compactions for a small
    /// amount of time but not so long that we end up blocking new transactions.
    pub struct CompactionYielder<'a> {
        journal: &'a Journal,
        // Created lazily on the first yield and reused on subsequent yields.
        low_priority_task: Option<fasync::LowPriorityTask>,
    }

    impl<'a> CompactionYielder<'a> {
        pub fn new(journal: &'a Journal) -> Self {
            Self { journal, low_priority_task: None }
        }
    }

    impl Yielder for CompactionYielder<'_> {
        async fn yield_now(&mut self) {
            // We will wait for the executor to be idle for 4ms, but no longer than 16ms.  We need
            // to cap the maximum amount of time we wait in case we've reached a point where
            // compaction is now urgent or else we could block new transactions.
            const IDLE_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(4);
            const MAX_YIELD_DURATION: zx::MonotonicDuration =
                zx::MonotonicDuration::from_millis(16);

            {
                let inner = self.journal.inner.lock();
                // An explicitly forced compaction (see Journal::force_compact) is never delayed.
                if inner.forced_compaction {
                    return;
                }
                let outstanding = self.journal.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset;
                let half_reclaim_size = inner.reclaim_size / 2;
                // `outstanding - half >= half / 2` is (modulo integer division) equivalent to
                // `outstanding >= 3/4 * reclaim_size`; checked_sub avoids underflow.
                if outstanding
                    .checked_sub(half_reclaim_size)
                    .is_some_and(|x| x >= half_reclaim_size / 2)
                {
                    // If we have got to the point where we have used up 3/4 of reclaim size in the
                    // journal, do not delay any further.  If we continue to yield we will get to
                    // the point where we block new transactions.
                    self.low_priority_task = None;
                    return;
                }
            }

            self.low_priority_task
                .get_or_insert_with(|| fasync::LowPriorityTask::new())
                .wait_until_idle_for(
                    IDLE_PERIOD,
                    fasync::MonotonicInstant::after(MAX_YIELD_DURATION),
                )
                .await;
        }
    }
}
2052
#[cfg(not(target_os = "fuchsia"))]
mod yielder {
    use super::Journal;
    use crate::lsm_tree::Yielder;

    /// Host-side stand-in for the Fuchsia CompactionYielder; yielding is a no-op.
    #[expect(dead_code)]
    pub struct CompactionYielder<'a>(&'a Journal);

    impl<'a> CompactionYielder<'a> {
        pub fn new(journal: &'a Journal) -> Self {
            Self(journal)
        }
    }

    impl Yielder for CompactionYielder<'_> {
        // Host builds never delay compactions.
        async fn yield_now(&mut self) {}
    }
}
2071
2072pub use yielder::*;
2073
2074#[cfg(test)]
2075mod tests {
2076    use super::SuperBlockInstance;
2077    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
2078    use crate::fsck::fsck;
2079    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
2080    use crate::object_store::directory::Directory;
2081    use crate::object_store::transaction::Options;
2082    use crate::object_store::volume::root_volume;
2083    use crate::object_store::{
2084        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
2085    };
2086    use fuchsia_async as fasync;
2087    use fuchsia_async::MonotonicDuration;
2088    use storage_device::DeviceHolder;
2089    use storage_device::fake_device::FakeDevice;
2090
2091    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2092
    // Checks basic journal replay: a file created and written in one filesystem
    // instance (with an explicit sync) must be intact after the device is
    // reopened and the journal is replayed.
    #[fuchsia::test]
    async fn test_replay() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        // Create a file in the root store, write TEST_DATA to it and sync, keeping
        // its object ID so it can be reopened after the remount below.
        let object_id = {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");

            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            // As this is the first sync, this will actually trigger a new super-block, but normally
            // this would not be the case.
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            handle.object_id()
        };

        {
            // Reopen the device and mount again, which replays the journal.
            fs.close().await.expect("Close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            let fs = FxFilesystem::open(device).await.expect("open failed");
            let handle = ObjectStore::open_object(
                &fs.root_store(),
                object_id,
                HandleOptions::default(),
                None,
            )
            .await
            .expect("open_object failed");
            // The file's contents must have survived the remount.
            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            fsck(fs.clone()).await.expect("fsck failed");
            fs.close().await.expect("Close failed");
        }
    }
2153
    // Checks recovery when the journal ends with transactions that were never
    // synced: replay should recover at least everything up to the last sync
    // point, and the filesystem should then accept and persist new writes.
    #[fuchsia::test]
    async fn test_reset() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));

        let mut object_ids = Vec::new();

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            // Sync so this first object is guaranteed to survive replay.
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());

            // Create a lot of objects but don't sync at the end. This should leave the filesystem
            // with a half finished transaction that cannot be replayed.
            for i in 0..1000 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let handle = root_directory
                    .create_child_file(&mut transaction, &format!("{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                object_ids.push(handle.object_id());
            }
        }
        fs.close().await.expect("fs close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        {
            let root_store = fs.root_store();
            // Check the first object, which was written before the sync and so must
            // exist.  (The slice covers only object_ids[0].)
            for &object_id in &object_ids[0..1] {
                let handle = ObjectStore::open_object(
                    &root_store,
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .expect("open_object failed");
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }

            // Write one more object and sync.
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test2")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());
        }

        // Remount once more and verify both the pre-reset object and the object
        // written after the reset are readable.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        {
            fsck(fs.clone()).await.expect("fsck failed");

            // Check the first and the last objects.
            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
                let handle = ObjectStore::open_object(
                    &fs.root_store(),
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .unwrap_or_else(|e| {
                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
                });
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }
        }
        fs.close().await.expect("close failed");
    }
2298
2299    #[fuchsia::test]
2300    async fn test_discard() {
2301        let device = {
2302            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2303            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2304            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2305
2306            let store = root_volume
2307                .new_volume("test", NewChildStoreOptions::default())
2308                .await
2309                .expect("new_volume failed");
2310            let root_directory = Directory::open(&store, store.root_directory_object_id())
2311                .await
2312                .expect("open failed");
2313
2314            // Create enough data so that another journal extent is used.
2315            let mut i = 0;
2316            loop {
2317                let mut transaction = fs
2318                    .clone()
2319                    .new_transaction(
2320                        lock_keys![LockKey::object(
2321                            store.store_object_id(),
2322                            store.root_directory_object_id()
2323                        )],
2324                        Options::default(),
2325                    )
2326                    .await
2327                    .expect("new_transaction failed");
2328                root_directory
2329                    .create_child_file(&mut transaction, &format!("a {i}"))
2330                    .await
2331                    .expect("create_child_file failed");
2332                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2333                    break;
2334                }
2335                i += 1;
2336            }
2337
2338            // Compact and then disable compactions.
2339            fs.journal().force_compact().await.expect("compact failed");
2340            fs.journal().stop_compactions().await;
2341
2342            // Keep going until we need another journal extent.
2343            let mut i = 0;
2344            loop {
2345                let mut transaction = fs
2346                    .clone()
2347                    .new_transaction(
2348                        lock_keys![LockKey::object(
2349                            store.store_object_id(),
2350                            store.root_directory_object_id()
2351                        )],
2352                        Options::default(),
2353                    )
2354                    .await
2355                    .expect("new_transaction failed");
2356                root_directory
2357                    .create_child_file(&mut transaction, &format!("b {i}"))
2358                    .await
2359                    .expect("create_child_file failed");
2360                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2361                    break;
2362                }
2363                i += 1;
2364            }
2365
2366            // Allow the journal to flush, but we don't want to sync.
2367            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2368            // Because we're not gracefully closing the filesystem, a Discard record will be
2369            // emitted.
2370            fs.device().snapshot().expect("snapshot failed")
2371        };
2372
2373        let fs = FxFilesystem::open(device).await.expect("open failed");
2374
2375        {
2376            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2377
2378            let store =
2379                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2380
2381            let root_directory = Directory::open(&store, store.root_directory_object_id())
2382                .await
2383                .expect("open failed");
2384
2385            // Write one more transaction.
2386            let mut transaction = fs
2387                .clone()
2388                .new_transaction(
2389                    lock_keys![LockKey::object(
2390                        store.store_object_id(),
2391                        store.root_directory_object_id()
2392                    )],
2393                    Options::default(),
2394                )
2395                .await
2396                .expect("new_transaction failed");
2397            root_directory
2398                .create_child_file(&mut transaction, &format!("d"))
2399                .await
2400                .expect("create_child_file failed");
2401            transaction.commit().await.expect("commit failed");
2402        }
2403
2404        fs.close().await.expect("close failed");
2405        let device = fs.take_device().await;
2406        device.reopen(false);
2407
2408        let fs = FxFilesystem::open(device).await.expect("open failed");
2409        fsck(fs.clone()).await.expect("fsck failed");
2410        fs.close().await.expect("close failed");
2411    }
2412
2413    #[fuchsia::test]
2414    async fn test_use_existing_generation() {
2415        let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2416
2417        // First format should be generation 1.
2418        let fs = FxFilesystemBuilder::new()
2419            .format(true)
2420            .image_builder_mode(Some(SuperBlockInstance::A))
2421            .open(device)
2422            .await
2423            .expect("open failed");
2424        fs.enable_allocations();
2425        let generation0 = fs.super_block_header().generation;
2426        assert_eq!(generation0, 1);
2427        fs.close().await.expect("close failed");
2428        let device = fs.take_device().await;
2429        device.reopen(false);
2430
2431        // Format the device normally (again, generation 1).
2432        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2433        let generation1 = fs.super_block_header().generation;
2434        {
2435            let root_volume = crate::object_store::volume::root_volume(fs.clone())
2436                .await
2437                .expect("root_volume failed");
2438            root_volume
2439                .new_volume("test", crate::object_store::NewChildStoreOptions::default())
2440                .await
2441                .expect("new_volume failed");
2442        }
2443        fs.close().await.expect("close failed");
2444        let device = fs.take_device().await;
2445        device.reopen(false);
2446
2447        // Format again with image_builder_mode.
2448        let fs = FxFilesystemBuilder::new()
2449            .format(true)
2450            .image_builder_mode(Some(SuperBlockInstance::A))
2451            .open(device)
2452            .await
2453            .expect("open failed");
2454        fs.enable_allocations();
2455        let generation2 = fs.super_block_header().generation;
2456        assert!(
2457            generation2 > generation1,
2458            "generation2 ({}) should be greater than generation1 ({})",
2459            generation2,
2460            generation1
2461        );
2462        fs.close().await.expect("close failed");
2463    }
2464
2465    #[fuchsia::test]
2466    async fn test_image_builder_mode_generation_bump_512_byte_block() {
2467        let device = DeviceHolder::new(FakeDevice::new(16384, 512));
2468
2469        // Format initial filesystem (generation 1)
2470        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2471        let generation1 = fs.super_block_header().generation;
2472        fs.close().await.expect("close failed");
2473        let device = fs.take_device().await;
2474        device.reopen(false);
2475
2476        // Format again with image_builder_mode (should bump generation)
2477        let fs = FxFilesystemBuilder::new()
2478            .format(true)
2479            .image_builder_mode(Some(SuperBlockInstance::A))
2480            .open(device)
2481            .await
2482            .expect("open failed");
2483
2484        fs.enable_allocations();
2485        let generation2 = fs.super_block_header().generation;
2486        assert!(
2487            generation2 > generation1,
2488            "Expected generation bump, got {} vs {}",
2489            generation2,
2490            generation1
2491        );
2492        fs.close().await.expect("close failed");
2493    }
2494
    // Checks that journal compaction behaves as a low-priority task: while a
    // normal-priority task polls every 1ms, compaction keeps yielding, and it only
    // runs to completion once the executor goes idle long enough.
    #[fuchsia::test(allow_stalls = false)]
    #[cfg(target_os = "fuchsia")]
    async fn test_low_priority_compaction() {
        use fuchsia_async::TestExecutor;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        // Small reclaim_size so that a modest amount of traffic triggers compaction.
        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .journal_options(super::JournalOptions { reclaim_size: 65536, ..Default::default() })
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        // NOTE(review): presumably this sets up the low-priority task context the
        // yielding logic depends on — confirm against fasync::LowPriorityTask docs.
        let _low = fasync::LowPriorityTask::new();

        // Add some data to the tree.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..100 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        // Spawn a task that polls every 1ms.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
            }
        });

        // Trigger journal compaction.
        // We can do this by writing more data until outstanding > reclaim_size / 2.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                if fs.journal().inner.lock().compaction_running {
                    break;
                }
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                i += 1;
            }
        }

        // Compaction should now be running. Because of our 1ms poller, it should be yielding.
        // It will yield for 16ms at each point.
        for _ in 0..10 {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;
            assert!(fs.journal().inner.lock().compaction_running);
        }

        // Stop the normal task.
        stop.store(true, Ordering::Relaxed);
        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
            1,
        )))
        .await;

        // Compaction should still be running because it hasn't been 4 ms since the normal task
        // finished.
        assert!(fs.journal().inner.lock().compaction_running);

        // For the next 3ms, compaction should still be running.
        for _ in 0..3 {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;
            assert!(fs.journal().inner.lock().compaction_running);
        }

        // 1 more ms and compaction should be unblocked.
        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
            1,
        )))
        .await;

        // When the executor next stalls, compaction should be done.
        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
        assert!(!fs.journal().inner.lock().compaction_running);

        fs.close().await.expect("Close failed");
    }
2631
    // Checks that a yielding compaction still makes forward progress when the
    // system never goes fully idle: a 3ms poller keeps the executor busy, yet
    // compaction completes after many bounded yields rather than stalling forever.
    #[fuchsia::test(allow_stalls = false)]
    #[cfg(target_os = "fuchsia")]
    async fn test_low_priority_compaction_deadline() {
        use fuchsia_async::TestExecutor;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .journal_options(super::JournalOptions { reclaim_size: 65536, ..Default::default() })
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        let _low = fasync::LowPriorityTask::new();

        // Add some data to the tree.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..10 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        // Spawn a task that polls every 3ms.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(3),
                ))
                .await;
            }
        });

        // Trigger journal compaction.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                if fs.journal().inner.lock().compaction_running {
                    break;
                }
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                i += 1;
            }
        }

        // Advance time in 20ms increments. Each increment should allow compaction to make progress
        // on one item (since MAX_YIELD_DURATION is 16ms).
        let mut count = 0;
        for _ in 0..1000 {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(20),
            ))
            .await;
            if !fs.journal().inner.lock().compaction_running {
                break;
            }
            count += 1;
        }
        assert!(!fs.journal().inner.lock().compaction_running);

        // Make sure it took a few iterations to complete.  It's difficult to know what the exact
        // number should be.
        assert!(count > 200);

        stop.store(true, Ordering::Relaxed);
        fs.close().await.expect("Close failed");
    }
2747
    // Checks that compaction stops yielding once the journal is close to full:
    // after outstanding journal data reaches 7/8 of reclaim_size, compaction
    // should run to completion promptly rather than deferring to other tasks.
    #[fuchsia::test(allow_stalls = false)]
    #[cfg(target_os = "fuchsia")]
    async fn test_low_priority_compaction_no_yielding_when_full() {
        use fuchsia_async::TestExecutor;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let reclaim_size = 65536;
        let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .journal_options(super::JournalOptions { reclaim_size, ..Default::default() })
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        let _low = fasync::LowPriorityTask::new();

        // Add some data to the tree.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..10 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        // Spawn a task that polls every 3ms.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(3),
                ))
                .await;
            }
        });

        // Trigger journal compaction, but this time fill the journal to 7/8 of
        // reclaim_size — past the point where compaction stops yielding.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                // Outstanding journal data: bytes committed beyond the journal
                // checkpoint recorded in the super-block.
                let outstanding = {
                    let inner = fs.journal().inner.lock();
                    fs.journal().objects.last_end_offset()
                        - inner.super_block_header.journal_checkpoint.file_offset
                };
                if outstanding >= reclaim_size * 7 / 8 {
                    break;
                }
                // We don't advance time here so we reach the threshold before compaction can
                // yield.
                i += 1;
            }
        }

        // Advancing by 4ms should be enough to wake compaction up.
        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
            4,
        )))
        .await;

        // When the executor next stalls, compaction should be done.
        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
        assert!(!fs.journal().inner.lock().compaction_running);

        stop.store(true, Ordering::Relaxed);
        fs.close().await.expect("Close failed");
    }
2857}
2858
#[cfg(fuzz)]
mod fuzz {
    use fuzz::fuzz;

    /// Fuzz target: appends arbitrary raw bytes to the journal writer of a
    /// freshly formatted filesystem, then reopens the device so that replay of
    /// the (most likely corrupt) journal is exercised. The target passes as
    /// long as nothing panics; `open` and the second `close` are allowed to
    /// fail gracefully.
    #[fuzz]
    fn fuzz_journal_bytes(input: Vec<u8>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use std::io::Write;
        use storage_device::fake_device::FakeDevice;
        use storage_device::DeviceHolder;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let holder = DeviceHolder::new(FakeDevice::new(32768, 512));
            let filesystem = FxFilesystem::new_empty(holder).await.expect("new_empty failed");
            // Inject the fuzz input directly into the journal stream.
            filesystem
                .journal()
                .inner
                .lock()
                .writer
                .write_all(&input)
                .expect("write failed");
            filesystem.close().await.expect("close failed");
            let reopened_device = filesystem.take_device().await;
            reopened_device.reopen(false);
            if let Ok(reopened_fs) = FxFilesystem::open(reopened_device).await {
                // `close()` can fail if there were objects to be tombstoned. If the said object is
                // corrupted, there will be an error when we compact the journal.
                let _ = reopened_fs.close().await;
            }
        });
    }

    /// Fuzz target: like `fuzz_journal_bytes`, but feeds structured
    /// `JournalRecord`s through the journal writer instead of raw bytes, then
    /// verifies that replaying them on reopen never panics.
    #[fuzz]
    fn fuzz_journal(input: Vec<super::JournalRecord>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use storage_device::fake_device::FakeDevice;
        use storage_device::DeviceHolder;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let holder = DeviceHolder::new(FakeDevice::new(32768, 512));
            let filesystem = FxFilesystem::new_empty(holder).await.expect("new_empty failed");
            {
                // Hold the journal lock once for the whole batch; individual
                // record writes are best-effort since arbitrary records may
                // fail to serialize.
                let mut guard = filesystem.journal().inner.lock();
                for record in &input {
                    let _ = guard.writer.write_record(record);
                }
            }
            filesystem.close().await.expect("close failed");
            let reopened_device = filesystem.take_device().await;
            reopened_device.reopen(false);
            if let Ok(reopened_fs) = FxFilesystem::open(reopened_device).await {
                // `close()` can fail if there were objects to be tombstoned. If the said object is
                // corrupted, there will be an error when we compact the journal.
                let _ = reopened_fs.close().await;
            }
        });
    }
}