fxfs/object_store/journal.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! The journal is implemented as an ever-extending file which contains variable-length records
//! that describe mutations to be applied to various objects.  The journal file consists of
//! blocks, with a checksum at the end of each block, but otherwise it can be considered a
//! continuous stream.
//!
//! The checksum is seeded with the checksum from the previous block.  To free space in the
//! journal, records are replaced with sparse extents when it is known they are no longer
//! needed to mount.
//!
//! At mount time, the journal is replayed: the mutations are applied into memory.
//! Eventually, a checksum failure will indicate no more records exist to be replayed,
//! at which point the mount can continue and the journal will be extended from that point with
//! further mutations as required.
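//!
//! A rough sketch of how replay walks the stream (illustrative only; `read_block`, `split_block`,
//! `compute_checksum` and `apply_records` are hypothetical helpers, not items in this crate):
//!
//! ```text
//! let mut seed = checkpoint.checksum;       // seed from the starting checkpoint
//! let mut offset = checkpoint.file_offset;
//! loop {
//!     let block = read_block(offset);                // BLOCK_SIZE bytes of the journal file
//!     let (payload, stored) = split_block(&block);   // the checksum is stored at the block end
//!     if compute_checksum(payload, seed) != stored {
//!         break;                                     // no more valid records; replay stops here
//!     }
//!     apply_records(payload);
//!     seed = stored;                                 // seeds the next block's checksum
//!     offset += BLOCK_SIZE;
//! }
//! ```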

mod bootstrap_handle;
mod checksum_list;
mod reader;
pub mod super_block;
mod writer;

use crate::checksum::{Checksum, Checksums, ChecksumsV38};
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::types::Layer;
use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
use crate::object_store::allocator::Allocator;
use crate::object_store::data_object_handle::OverwriteOptions;
use crate::object_store::extent_record::{
    DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
};
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
use crate::object_store::journal::checksum_list::ChecksumList;
use crate::object_store::journal::reader::{JournalReader, ReadResult};
use crate::object_store::journal::super_block::{
    SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
};
use crate::object_store::journal::writer::JournalWriter;
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
use crate::object_store::transaction::{
    AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
    MutationV47, MutationV49, MutationV50, ObjectStoreMutation, Options,
    TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
};
use crate::object_store::{
    AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
    NewChildStoreOptions, ObjectStore, ReservedId,
};
use crate::range::RangeExt;
use crate::round::{round_div, round_down};
use crate::serialized_types::{LATEST_VERSION, Migrate, Version, Versioned, migrate_to_version};
use anyhow::{Context, Error, anyhow, bail, ensure};
use event_listener::Event;
use fprint::TypeFingerprint;
use fuchsia_sync::Mutex;
use futures::FutureExt as _;
use futures::future::poll_fn;
use rustc_hash::FxHashMap as HashMap;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::clone::Clone;
use std::collections::HashSet;
use std::num::NonZero;
use std::ops::{Bound, Range};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use std::task::{Poll, Waker};
use storage_device::Device;

// The journal file is written to in blocks of this size.
pub const BLOCK_SIZE: u64 = 4096;

// The journal file is extended by this amount when necessary.
const CHUNK_SIZE: u64 = 131_072;
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

// See the comment for the `reclaim_size` member of Inner.
pub const DEFAULT_RECLAIM_SIZE: u64 = 262_144;

// Temporary space that should be reserved for the journal.  For example: space that is currently
// used in the journal file but cannot be deallocated yet because we are flushing.
pub const RESERVED_SPACE: u64 = 1_048_576;

// Whenever the journal is replayed (i.e. the system is unmounted and remounted), we reset the
// journal stream, at which point any half-complete transactions are discarded.  We indicate a
// journal reset by XORing the previous block's checksum with this mask, and using that value as a
// seed for the next journal block.
const RESET_XOR: u64 = 0xffffffffffffffff;
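
// Illustrative example: `replay` below applies this mask before seeking the writer, e.g.
// `writer_checkpoint.checksum ^= RESET_XOR;`, so that the first block written after a remount
// starts a recognizably new stream.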

// To keep track of offsets within a journal file, we need both the file offset and the checksum
// of the preceding block, since the checksum of the preceding block is an input to the checksum
// of every block.
pub type JournalCheckpoint = JournalCheckpointV32;

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    pub file_offset: u64,

    // Starting checksum for the block that contains file_offset, i.e. the checksum for the
    // previous block.
    pub checksum: Checksum,

    // If versioned, the version of elements stored in the journal, e.g. the JournalRecord version.
    // This can change across reset events so we store it along with the offset and checksum to
    // know which version to deserialize.
    pub version: Version,
}
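
// Illustrative only (this is essentially what `replay` below does): a reader is resumed from the
// checkpoint recorded in the super-block, e.g.
//
//   let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);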

pub type JournalRecord = JournalRecordV50;

#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV50 {
    // Indicates no more records in this block.
    EndBlock,
    // Mutation for a particular object.  object_id here is for the collection, i.e. the store or
    // allocator.
    Mutation { object_id: u64, mutation: MutationV50 },
    // Commits records in the transaction.
    Commit,
    // Discard all mutations with offsets greater than or equal to the given offset.
    Discard(u64),
    // Indicates the device was flushed at the given journal offset.
    // Note that this really means that at this journal offset, we can be certain that
    // there's no remaining buffered data in the block device; the buffers and the disk contents are
    // consistent.
    // We insert one of these records *after* a flush along with the *next* transaction to go
    // through.  If that never comes (either due to graceful or hard shutdown), the journal reset
    // on the next mount will serve the same purpose and count as a flush, although it is necessary
    // to defensively flush the device before replaying the journal (if possible, i.e. not
    // read-only) in case the block device connection was reused.
    DidFlushDevice(u64),
    // Checksums for a data range written by this transaction. A transaction is only valid if these
    // checksums are right. The range is the device range the checksums are for.
    //
    // A boolean indicates whether this range is being written to for the first time. For overwrite
    // extents, we only check the checksums for a block if it has been written to for the first
    // time since the last flush, because otherwise we can't roll it back anyway so it doesn't
    // matter. For copy-on-write extents, the bool is always true.
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
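
// Illustrative only (the values below are hypothetical): a committed transaction appears in the
// stream as a run of `Mutation` (and possibly `DataChecksums`) records followed by a single
// `Commit`, and `EndBlock` marks the point in a block after which no further records should be
// read, e.g.
//
//   Mutation { object_id: 4, mutation: .. }
//   DataChecksums(8192..12288, .., true)
//   Mutation { object_id: 4, mutation: .. }
//   Commit
//   EndBlock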

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

impl From<JournalRecordV41> for JournalRecordV42 {
    fn from(record: JournalRecordV41) -> Self {
        match record {
            JournalRecordV41::EndBlock => Self::EndBlock,
            JournalRecordV41::Mutation { object_id, mutation } => {
                Self::Mutation { object_id, mutation: mutation.into() }
            }
            JournalRecordV41::Commit => Self::Commit,
            JournalRecordV41::Discard(offset) => Self::Discard(offset),
            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
            JournalRecordV41::DataChecksums(range, sums) => {
                // At the time of writing the only extents written by real systems are CoW extents
                // so the new bool is always true.
                Self::DataChecksums(range, sums, true)
            }
        }
    }
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

pub(super) fn journal_handle_options() -> HandleOptions {
    HandleOptions { skip_journal_checks: true, ..Default::default() }
}

/// The journal records a stream of mutations that are to be applied to other objects.  At mount
/// time, these records can be replayed into memory.  It provides a way to quickly persist changes
/// without having to make a large number of writes; they can be deferred to a later time (e.g.
/// when a sufficient number have been queued).  It also provides support for transactions, the
/// ability to have mutations that are to be applied atomically together.
pub struct Journal {
    objects: Arc<ObjectManager>,
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    super_block_manager: SuperBlockManager,
    inner: Mutex<Inner>,
    writer_mutex: Mutex<()>,
    sync_mutex: futures::lock::Mutex<()>,
    trace: AtomicBool,

    // This event is used when we are waiting for a compaction to free up journal space.
    reclaim_event: Event,
}

struct Inner {
    super_block_header: SuperBlockHeader,

    // The offset that we can zero the journal up to, now that it is no longer needed.
    zero_offset: Option<u64>,

    // The journal offset that we most recently flushed to the device.
    device_flushed_offset: u64,

    // If true, indicates a DidFlushDevice record is pending.
    needs_did_flush_device: bool,

    // The writer for the journal.
    writer: JournalWriter,

    // Set when a reset is encountered during a read.
    // Used at write pre_commit() time to ensure we write a version first thing after a reset.
    output_reset_version: bool,

    // Waker for the flush task.
    flush_waker: Option<Waker>,

    // Indicates the journal has been terminated.
    terminate: bool,

    // Latched error indicating reason for journal termination if not graceful.
    terminate_reason: Option<Error>,

    // Disable compactions.
    disable_compactions: bool,

    // True if compactions are running.
    compaction_running: bool,

    // Waker for the sync task for when it's waiting for the flush task to finish.
    sync_waker: Option<Waker>,

    // The last offset we flushed to the journal file.
    flushed_offset: u64,

    // The last offset that should be considered valid in the journal file.  Most of the time, this
    // will be the same as `flushed_offset` but at mount time, this could be less and will only be
    // up to the end of the last valid transaction; it won't include transactions that follow that
    // have been discarded.
    valid_to: u64,

    // If, after replaying, we have to discard a number of mutations (because they don't validate),
    // this offset specifies where we need to discard back to.  This is so that when we next replay,
    // we ignore those mutations and continue with new good mutations.
    discard_offset: Option<u64>,

    // In the steady state, the journal should fluctuate between being approximately half of this
    // number and this number.  New super-blocks will be written every time about half of this
    // amount is written to the journal.
    reclaim_size: u64,

    image_builder_mode: Option<SuperBlockInstance>,

    // If true and `needs_barrier`, issue a pre-barrier on the first device write of each journal
    // write (which happens in multiples of `BLOCK_SIZE`). This ensures that all the corresponding
    // data writes make it to disk before the journal gets written to.
    barriers_enabled: bool,

    // If true, indicates that data write requests have been made to the device since the last
    // journal write.
    needs_barrier: bool,
}

impl Inner {
    fn terminate(&mut self, reason: Option<Error>) {
        self.terminate = true;

        if let Some(err) = reason {
            error!(error:? = err; "Terminating journal");
            // Log previous error if one was already set, otherwise latch the error.
            if let Some(prev_err) = self.terminate_reason.as_ref() {
                error!(error:? = prev_err; "Journal previously terminated");
            } else {
                self.terminate_reason = Some(err);
            }
        }

        if let Some(waker) = self.flush_waker.take() {
            waker.wake();
        }
        if let Some(waker) = self.sync_waker.take() {
            waker.wake();
        }
    }
}

pub struct JournalOptions {
    /// In the steady state, the journal should fluctuate between being approximately half of this
    /// number and this number.  New super-blocks will be written every time about half of this
    /// amount is written to the journal.
    pub reclaim_size: u64,

    // If true, issue a pre-barrier on the first device write of each journal write (which happens
    // in multiples of `BLOCK_SIZE`). This ensures that all the corresponding data writes make it
    // to disk before the journal gets written to.
    pub barriers_enabled: bool,
}

impl Default for JournalOptions {
    fn default() -> Self {
        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
    }
}
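
// Example (illustrative; the value is arbitrary): callers can tune journal reclamation while
// keeping the other defaults, e.g.
//
//   let options = JournalOptions { reclaim_size: 2 * DEFAULT_RECLAIM_SIZE, ..Default::default() };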

struct JournaledTransactions {
    transactions: Vec<JournaledTransaction>,
    device_flushed_offset: u64,
}

#[derive(Debug, Default)]
pub struct JournaledTransaction {
    pub checkpoint: JournalCheckpoint,
    pub root_parent_mutations: Vec<Mutation>,
    pub root_mutations: Vec<Mutation>,
    /// List of (store_object_id, mutation).
    pub non_root_mutations: Vec<(u64, Mutation)>,
    pub end_offset: u64,
    pub checksums: Vec<JournaledChecksums>,

    /// Records offset + 1 of the matching begin_flush transaction. The +1 is because we want to
    /// ignore the begin_flush transaction; we don't need or want to replay it.
    pub end_flush: Option<(/* store_id: */ u64, /* begin offset: */ u64)>,

    /// The volume which was deleted in this transaction, if any.
    pub volume_deleted: Option</* store_id: */ u64>,
}

impl JournaledTransaction {
    fn new(checkpoint: JournalCheckpoint) -> Self {
        Self { checkpoint, ..Default::default() }
    }
}

const VOLUME_DELETED: u64 = u64::MAX;

#[derive(Debug)]
pub struct JournaledChecksums {
    pub device_range: Range<u64>,
    pub checksums: Checksums,
    pub first_write: bool,
}

/// Handles for journal-like objects have some additional functionality to manage their extents,
/// since during replay we need to add extents as we find them.
pub trait JournalHandle: ReadObjectHandle {
    /// The end offset of the last extent in the JournalHandle.  Used only for validating extents
    /// (which will be skipped if None is returned).
    /// Note this is equivalent in value to ReadObjectHandle::get_size, when present.
    fn end_offset(&self) -> Option<u64>;
    /// Adds an extent to the current end of the journal stream.
    /// `added_offset` is the offset into the journal of the transaction which added this extent,
    /// used for discard_extents.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Discards all extents which were added in a transaction at offset >= |discard_offset|.
    fn discard_extents(&mut self, discard_offset: u64);
}
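
// Illustrative usage (a sketch of what `replay` and `read_transactions` below do): as journal
// extents are discovered during replay they are appended to the bootstrap handle, e.g.
//
//   handle.push_extent(checkpoint.file_offset, device_offset..device_offset + len);
//
// and if transactions later have to be discarded, `handle.discard_extents(discard_offset)`
// removes any extents those transactions added.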

// Provide a stub implementation for DataObjectHandle so we can use it in
// Journal::read_transactions.  Manual extent management is a NOP (which is OK since presumably the
// DataObjectHandle already knows where its extents live).
impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
    fn end_offset(&self) -> Option<u64> {
        None
    }
    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
        // NOP
    }
    fn discard_extents(&mut self, _discard_offset: u64) {
        // NOP
    }
}

#[fxfs_trace::trace]
impl Journal {
    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
        let starting_checksum = rand::random_range(1..u64::MAX);
        Journal {
            objects,
            handle: OnceLock::new(),
            super_block_manager: SuperBlockManager::new(),
            inner: Mutex::new(Inner {
                super_block_header: SuperBlockHeader::default(),
                zero_offset: None,
                device_flushed_offset: 0,
                needs_did_flush_device: false,
                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
                output_reset_version: false,
                flush_waker: None,
                terminate: false,
                terminate_reason: None,
                disable_compactions: false,
                compaction_running: false,
                sync_waker: None,
                flushed_offset: 0,
                valid_to: 0,
                discard_offset: None,
                reclaim_size: options.reclaim_size,
                image_builder_mode: None,
                barriers_enabled: options.barriers_enabled,
                needs_barrier: false,
            }),
            writer_mutex: Mutex::new(()),
            sync_mutex: futures::lock::Mutex::new(()),
            trace: AtomicBool::new(false),
            reclaim_event: Event::new(),
        }
    }

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(trace; "J: trace");
        }
    }

    pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
        self.inner.lock().image_builder_mode = mode;
        if let Some(instance) = mode {
            *self.super_block_manager.next_instance.lock() = instance;
        }
    }

    pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
        self.inner.lock().image_builder_mode
    }

    #[cfg(feature = "migration")]
    pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
        ensure!(
            self.inner.lock().image_builder_mode.is_some(),
            "Can only set filesystem uuid in image builder mode."
        );
        self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
        Ok(())
    }

    pub(crate) async fn read_superblocks(
        &self,
        device: Arc<dyn Device>,
        block_size: u64,
    ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
        self.super_block_manager.load(device, block_size).await
    }

    /// Used during replay to validate a mutation.  This should return false if the mutation is not
    /// valid and should not be applied.  This could be for benign reasons: e.g. the device flushed
    /// data out-of-order, or because of a malicious actor.
    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
        match mutation {
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    Item {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(
                                        _,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                                ..
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
                        ..
                    },
                ..
            }) => {
                if range.is_empty() || !range.is_aligned(block_size) {
                    return false;
                }
                let len = range.length().unwrap();
                if let ExtentMode::Cow(checksums) = mode {
                    if checksums.len() > 0 {
                        if len % checksums.len() as u64 != 0 {
                            return false;
                        }
                        if (len / checksums.len() as u64) % block_size != 0 {
                            return false;
                        }
                    }
                }
                if *device_offset % block_size != 0
                    || *device_offset >= device_size
                    || device_size - *device_offset < len
                {
                    return false;
                }
            }
            Mutation::ObjectStore(_) => {}
            Mutation::EncryptedObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::BeginFlush => {}
            Mutation::EndFlush => {}
            Mutation::DeleteVolume => {}
            Mutation::UpdateBorrowed(_) => {}
            Mutation::UpdateMutationsKey(_) => {}
            Mutation::CreateInternalDir(owner_object_id) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
        }
        true
    }

    // Assumes that `mutation` has been validated.
    fn update_checksum_list(
        &self,
        journal_offset: u64,
        mutation: &Mutation,
        checksum_list: &mut ChecksumList,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::ObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
            }
            _ => {}
        }
        Ok(())
    }

    /// Reads the latest super-block, and then replays journaled records.
    #[trace]
    pub async fn replay(
        &self,
        filesystem: Arc<FxFilesystem>,
        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    ) -> Result<(), Error> {
        let block_size = filesystem.block_size();

        let (super_block, root_parent) =
            self.super_block_manager.load(filesystem.device(), block_size).await?;

        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

        self.objects.set_root_parent_store(root_parent.clone());
        let allocator =
            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
        if let Some(on_new_allocator) = on_new_allocator {
            on_new_allocator(allocator.clone());
        }
        self.objects.set_allocator(allocator.clone());
        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = super_block.clone();
        }

        let device = filesystem.device();

        let mut handle;
        {
            let root_parent_layer = root_parent.tree().mutable_layer();
            let mut iter = root_parent_layer
                .seek(Bound::Included(&ObjectKey::attribute(
                    super_block.journal_object_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
                        super_block.journal_checkpoint.file_offset,
                        BLOCK_SIZE,
                    ))),
                )))
                .await
                .context("Failed to seek root parent store")?;
            let start_offset = if let Some(ItemRef {
                key:
                    ObjectKey {
                        data:
                            ObjectKeyData::Attribute(
                                DEFAULT_DATA_ATTRIBUTE_ID,
                                AttributeKey::Extent(ExtentKey { range }),
                            ),
                        ..
                    },
                ..
            }) = iter.get()
            {
                range.start
            } else {
                0
            };
            handle = BootstrapObjectHandle::new_with_start_offset(
                super_block.journal_object_id,
                device.clone(),
                start_offset,
            );
            while let Some(item) = iter.get() {
                if !match item.into() {
                    Some((
                        object_id,
                        DEFAULT_DATA_ATTRIBUTE_ID,
                        ExtentKey { range },
                        ExtentValue::Some { device_offset, .. },
                    )) if object_id == super_block.journal_object_id => {
                        if let Some(end_offset) = handle.end_offset() {
                            if range.start != end_offset {
                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                    "Unexpected journal extent {:?}, expected start: {}",
                                    item, end_offset
                                )));
                            }
                        }
                        handle.push_extent(
                            0, // We never discard extents from the root parent store.
                            *device_offset
                                ..*device_offset + range.length().context("Invalid extent")?,
                        );
                        true
                    }
                    _ => false,
                } {
                    break;
                }
                iter.advance().await.context("Failed to advance root parent store iterator")?;
            }
        }

        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
        let JournaledTransactions { mut transactions, device_flushed_offset } = self
            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
            .await
            .context("Reading transactions for replay")?;

        // Validate all the mutations.
        let mut checksum_list = ChecksumList::new(device_flushed_offset);
        let mut valid_to = reader.journal_file_checkpoint().file_offset;
        let device_size = device.size();
        'bad_replay: for JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            root_mutations,
            non_root_mutations,
            checksums,
            ..
        } in &transactions
        {
            for JournaledChecksums { device_range, checksums, first_write } in checksums {
                checksum_list
                    .push(
                        checkpoint.file_offset,
                        device_range.clone(),
                        checksums.maybe_as_ref().context("Malformed checksums")?,
                        *first_write,
                    )
                    .context("Pushing journal checksum records to checksum list")?;
            }
            for mutation in root_parent_mutations
                .iter()
                .chain(root_mutations)
                .chain(non_root_mutations.iter().map(|(_, m)| m))
            {
                if !self.validate_mutation(mutation, block_size, device_size) {
                    info!(mutation:?; "Stopping replay at bad mutation");
                    valid_to = checkpoint.file_offset;
                    break 'bad_replay;
                }
                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
            }
        }

        // Validate the checksums. Note if barriers are enabled, there will be no checksums in
        // practice to verify.
        let valid_to = checksum_list
            .verify(device.as_ref(), valid_to)
            .await
            .context("Failed to validate checksums")?;

        // Apply the mutations...

        let mut last_checkpoint = reader.journal_file_checkpoint();
        let mut journal_offsets = super_block.journal_file_offsets.clone();

        // Start with the root-parent mutations, and also determine the journal offsets for all
        // other objects.
        for (
            index,
            JournaledTransaction {
                checkpoint,
                root_parent_mutations,
                end_flush,
                volume_deleted,
                ..
            },
        ) in transactions.iter_mut().enumerate()
        {
            if checkpoint.file_offset >= valid_to {
                last_checkpoint = checkpoint.clone();

                // Truncate the transactions so we don't need to worry about them on the next pass.
                transactions.truncate(index);
                break;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_parent_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_parent_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root parent store mutations")?;
            }

            if let Some((object_id, journal_offset)) = end_flush {
                journal_offsets.insert(*object_id, *journal_offset);
            }

            if let Some(object_id) = volume_deleted {
                journal_offsets.insert(*object_id, VOLUME_DELETED);
            }
        }

        // Now we can open the root store.
        let root_store = ObjectStore::open(
            &root_parent,
            super_block.root_store_object_id,
            Box::new(NullCache {}),
        )
        .await
        .context("Unable to open root store")?;

        ensure!(
            !root_store.is_encrypted(),
            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
        );
        self.objects.set_root_store(root_store);

        let root_store_offset =
            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

        // Now replay the root store mutations.
        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
            if checkpoint.file_offset < root_store_offset {
                continue;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root store mutations")?;
            }
        }

        // Now we can open the allocator.
        allocator.open().await.context("Failed to open allocator")?;

        // Now replay all other mutations.
        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
        {
            self.objects
                .replay_mutations(
                    non_root_mutations,
                    &journal_offsets,
                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                    end_offset,
                )
                .await
                .context("Failed to replay mutations")?;
        }

        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

        let discarded_to =
            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
                Some(reader.journal_file_checkpoint().file_offset)
            } else {
                None
            };

        // Configure the journal writer so that we can continue.
        {
            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                    "journal replay cut short; journal finishes at {}, but super-block was \
                     written at {}",
                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
                )));
            }
            let handle = ObjectStore::open_object(
                &root_parent,
                super_block.journal_object_id,
                journal_handle_options(),
                None,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to open journal file (object id: {})",
                    super_block.journal_object_id
                )
            })?;
            let _ = self.handle.set(handle);
            let mut inner = self.inner.lock();
            reader.skip_to_end_of_block();
            let mut writer_checkpoint = reader.journal_file_checkpoint();

            // Make sure we don't accidentally use the reader from now onwards.
            std::mem::drop(reader);

            // Reset the stream to indicate that we've remounted the journal.
            writer_checkpoint.checksum ^= RESET_XOR;
            writer_checkpoint.version = LATEST_VERSION;
            inner.flushed_offset = writer_checkpoint.file_offset;

            // When we open the filesystem as writable, we flush the device.
            inner.device_flushed_offset = inner.flushed_offset;

            inner.writer.seek(writer_checkpoint);
            inner.output_reset_version = true;
            inner.valid_to = last_checkpoint.file_offset;
            if last_checkpoint.file_offset < inner.flushed_offset {
                inner.discard_offset = Some(last_checkpoint.file_offset);
            }
        }

        self.objects
            .on_replay_complete()
            .await
            .context("Failed to complete replay for object manager")?;

        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
        Ok(())
    }

    async fn read_transactions(
        &self,
        reader: &mut JournalReader,
        end_offset: Option<u64>,
        object_id_filter: u64,
    ) -> Result<JournaledTransactions, Error> {
        let mut transactions = Vec::new();
        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
            let super_block = &self.inner.lock().super_block_header;
            (
                super_block.super_block_journal_file_offset,
                super_block.root_parent_store_object_id,
                super_block.root_store_object_id,
            )
        };
        let mut current_transaction = None;
        let mut begin_flush_offsets = HashMap::default();
        let mut stores_deleted = HashSet::new();
        loop {
            // Cache the checkpoint before we deserialize a record.
            let checkpoint = reader.journal_file_checkpoint();
            if let Some(end_offset) = end_offset {
                if checkpoint.file_offset >= end_offset {
                    break;
                }
            }
            let result =
                reader.deserialize().await.context("Failed to deserialize journal record")?;
            match result {
                ReadResult::Reset(_) => {
                    if current_transaction.is_some() {
                        current_transaction = None;
                        transactions.pop();
                    }
                    let offset = reader.journal_file_checkpoint().file_offset;
                    if offset > device_flushed_offset {
                        device_flushed_offset = offset;
                    }
                }
                ReadResult::Some(record) => {
                    match record {
                        JournalRecord::EndBlock => {
                            reader.skip_to_end_of_block();
                        }
                        JournalRecord::Mutation { object_id, mutation } => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };

                            if stores_deleted.contains(&object_id) {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Encountered mutations for deleted store")
                                );
                            }

                            match &mutation {
                                Mutation::BeginFlush => {
                                    begin_flush_offsets.insert(
                                        object_id,
                                        current_transaction.checkpoint.file_offset,
                                    );
                                }
                                Mutation::EndFlush => {
                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
                                        if let Some(deleted_volume) =
                                            &current_transaction.volume_deleted
                                        {
                                            if *deleted_volume == object_id {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                                ));
                                            }
                                        }
                                        // The +1 is because we don't want to replay the transaction
                                        // containing the begin flush.
                                        if current_transaction
                                            .end_flush
                                            .replace((object_id, offset + 1))
                                            .is_some()
                                        {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush mutations in a \
                                                 single transaction"
                                            ));
                                        }
                                    }
                                }
                                Mutation::DeleteVolume => {
                                    if let Some((flushed_object, _)) =
                                        &current_transaction.end_flush
                                    {
                                        if *flushed_object == object_id {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                            ));
                                        }
                                    }
                                    if current_transaction
                                        .volume_deleted
                                        .replace(object_id)
                                        .is_some()
                                    {
                                        bail!(anyhow!(FxfsError::Inconsistent).context(
                                            "Multiple DeleteVolume mutations in a single \
                                             transaction"
                                        ));
                                    }
                                    stores_deleted.insert(object_id);
                                }
                                _ => {}
                            }

                            // If this mutation doesn't need to be applied, don't bother adding it
                            // to the transaction.
                            if (object_id_filter == INVALID_OBJECT_ID
                                || object_id_filter == object_id)
                                && self.should_apply(object_id, &current_transaction.checkpoint)
                            {
                                if object_id == root_parent_store_object_id {
                                    current_transaction.root_parent_mutations.push(mutation);
                                } else if object_id == root_store_object_id {
                                    current_transaction.root_mutations.push(mutation);
                                } else {
                                    current_transaction
                                        .non_root_mutations
                                        .push((object_id, mutation));
                                }
                            }
                        }
                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };
                            current_transaction.checksums.push(JournaledChecksums {
                                device_range,
                                checksums,
                                first_write,
                            });
                        }
                        JournalRecord::Commit => {
                            if let Some(&mut JournaledTransaction {
                                ref checkpoint,
                                ref root_parent_mutations,
                                ref mut end_offset,
                                ..
                            }) = current_transaction.take()
                            {
                                for mutation in root_parent_mutations {
                                    // Snoop the mutations for any that might apply to the journal
                                    // file so that we can pass them to the reader, allowing it to
                                    // read the rest of the journal file.
                                    if let Mutation::ObjectStore(ObjectStoreMutation {
                                        item:
                                            Item {
                                                key:
                                                    ObjectKey {
                                                        object_id,
                                                        data:
                                                            ObjectKeyData::Attribute(
                                                                DEFAULT_DATA_ATTRIBUTE_ID,
                                                                AttributeKey::Extent(ExtentKey {
                                                                    range,
                                                                }),
                                                            ),
                                                        ..
                                                    },
                                                value:
                                                    ObjectValue::Extent(ExtentValue::Some {
                                                        device_offset,
                                                        ..
                                                    }),
                                                ..
                                            },
                                        ..
                                    }) = mutation
                                    {
                                        // Add the journal extents we find on the way to our
                                        // reader.
                                        let handle = reader.handle();
                                        if *object_id != handle.object_id() {
                                            continue;
                                        }
                                        if let Some(end_offset) = handle.end_offset() {
                                            if range.start != end_offset {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    format!(
                                                        "Unexpected journal extent {:?} -> {}, \
                                                           expected start: {}",
                                                        range, device_offset, end_offset,
                                                    )
                                                ));
                                            }
                                        }
                                        handle.push_extent(
                                            checkpoint.file_offset,
                                            *device_offset
                                                ..*device_offset
                                                    + range.length().context("Invalid extent")?,
                                        );
                                    }
                                }
                                *end_offset = reader.journal_file_checkpoint().file_offset;
                            }
                        }
                        JournalRecord::Discard(offset) => {
                            if offset == 0 {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Invalid offset for Discard")
                                );
                            }
                            if let Some(transaction) = current_transaction.as_ref() {
                                if transaction.checkpoint.file_offset < offset {
                                    // Odd, but OK.
                                    continue;
                                }
                            }
                            current_transaction = None;
                            while let Some(transaction) = transactions.last() {
                                if transaction.checkpoint.file_offset < offset {
                                    break;
                                }
                                transactions.pop();
                            }
                            reader.handle().discard_extents(offset);
                        }
                        JournalRecord::DidFlushDevice(offset) => {
                            if offset > device_flushed_offset {
                                device_flushed_offset = offset;
                            }
                        }
                    }
                }
                // This is expected when we reach the end of the journal stream.
                ReadResult::ChecksumMismatch => break,
            }
        }

        // Discard any uncommitted transaction.
        if current_transaction.is_some() {
            transactions.pop();
        }

        Ok(JournaledTransactions { transactions, device_flushed_offset })
    }
1185
1186    /// Creates an empty filesystem with the minimum viable objects (including a root parent and
1187    /// root store but no further child stores).
1188    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
1189        // The following constants are only used at format time. When mounting, the recorded values
1190        // in the superblock should be used.  The root parent store does not have a parent, but
1191        // needs an object ID to be registered with ObjectManager, so it cannot collide (i.e. have
1192        // the same object ID) with any objects in the root store that use the journal to track
1193        // mutations.
1194        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
1195        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
1196        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;
1197
1198        info!(device_size = filesystem.device().size(); "Formatting");
1199
1200        let checkpoint = JournalCheckpoint {
1201            version: LATEST_VERSION,
1202            ..self.inner.lock().writer.journal_file_checkpoint()
1203        };
1204
1205        let mut current_generation = 1;
1206        if filesystem.options().image_builder_mode.is_some() {
1207            // Note that in non-image_builder_mode we write both superblocks when we format
1208            // (in FxFilesystemBuilder::open). In image_builder_mode we only write once at the end
1209            // as part of finalize(), which is why we must make sure the generation we write is
1210            // newer than any existing generation.
1211
1212            // Note: This should is the *filesystem* block size, not the device block size which
1213            // is currently always 4096 (https://fxbug.dev/42063349)
1214            let block_size = filesystem.block_size();
1215            match self.read_superblocks(filesystem.device(), block_size).await {
1216                Ok((super_block, _)) => {
1217                    log::info!(
1218                        "Found existing superblock with generation {}. Bumping by 1.",
1219                        super_block.generation
1220                    );
1221                    current_generation = super_block.generation.wrapping_add(1);
1222                }
1223                Err(_) => {
1224                    // TODO(https://fxbug.dev/463757813): It's not unusual to fail to read
1225                    // superblocks when we're formatting a new filesystem, but we should probably
1226                    // fail the format if we get an IO error.
1227                }
1228            }
1229        }
1230
1231        let root_parent = ObjectStore::new_empty(
1232            None,
1233            INIT_ROOT_PARENT_STORE_OBJECT_ID,
1234            filesystem.clone(),
1235            Box::new(NullCache {}),
1236        );
1237        self.objects.set_root_parent_store(root_parent.clone());
1238
1239        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
1240        self.objects.set_allocator(allocator.clone());
1241        self.objects.init_metadata_reservation()?;
1242
1243        let journal_handle;
1244        let super_block_a_handle;
1245        let super_block_b_handle;
1246        let root_store;
1247        let mut transaction = filesystem
1248            .clone()
1249            .new_transaction(
1250                lock_keys![],
1251                Options { skip_journal_checks: true, ..Default::default() },
1252            )
1253            .await?;
1254        root_store = root_parent
1255            .new_child_store(
1256                &mut transaction,
1257                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
1258                Box::new(NullCache {}),
1259            )
1260            .await
1261            .context("new_child_store")?;
1262        self.objects.set_root_store(root_store.clone());
1263
1264        allocator.create(&mut transaction).await?;
1265
1266        // Create the super-block objects...
1267        super_block_a_handle = ObjectStore::create_object_with_id(
1268            &root_store,
1269            &mut transaction,
1270            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::A.object_id()).unwrap()),
1271            HandleOptions::default(),
1272            None,
1273        )
1274        .context("create super block")?;
1275        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
1276        super_block_a_handle
1277            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
1278            .await
1279            .context("extend super block")?;
1280        super_block_b_handle = ObjectStore::create_object_with_id(
1281            &root_store,
1282            &mut transaction,
1283            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::B.object_id()).unwrap()),
1284            HandleOptions::default(),
1285            None,
1286        )
1287        .context("create super block")?;
1288        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
1289        super_block_b_handle
1290            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
1291            .await
1292            .context("extend super block")?;
1293
1294        // the journal object...
1295        journal_handle = ObjectStore::create_object(
1296            &root_parent,
1297            &mut transaction,
1298            journal_handle_options(),
1299            None,
1300        )
1301        .await
1302        .context("create journal")?;
1303        if self.inner.lock().image_builder_mode.is_none() {
1304            let mut file_range = 0..self.chunk_size();
1305            journal_handle
1306                .preallocate_range(&mut transaction, &mut file_range)
1307                .await
1308                .context("preallocate journal")?;
1309            if file_range.start < file_range.end {
1310                bail!("preallocate_range returned too little space");
1311            }
1312        }
1313
1314        // Write the root store object info.
1315        root_store.create(&mut transaction).await?;
1316
1317        // The root parent graveyard.
1318        root_parent.set_graveyard_directory_object_id(
1319            Graveyard::create(&mut transaction, &root_parent).await?,
1320        );
1321
1322        transaction.commit().await?;
1323
1324        self.inner.lock().super_block_header = SuperBlockHeader::new(
1325            current_generation,
1326            root_parent.store_object_id(),
1327            root_parent.graveyard_directory_object_id(),
1328            root_store.store_object_id(),
1329            allocator.object_id(),
1330            journal_handle.object_id(),
1331            checkpoint,
1332            /* earliest_version: */ LATEST_VERSION,
1333        );
1334
1335        // Initialize the journal writer.
1336        let _ = self.handle.set(journal_handle);
1337        Ok(())
1338    }
1339
1340    /// Normally we allocate the journal when creating the filesystem.
1341    /// This is used in image_builder_mode, where journal allocation is done last.
1342    pub async fn allocate_journal(&self) -> Result<(), Error> {
1343        let handle = self.handle.get().unwrap();
1344        let filesystem = handle.store().filesystem();
1345        let mut transaction = filesystem
1346            .clone()
1347            .new_transaction(
1348                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1349                Options { skip_journal_checks: true, ..Default::default() },
1350            )
1351            .await?;
1352        let mut file_range = 0..self.chunk_size();
1353        self.handle
1354            .get()
1355            .unwrap()
1356            .preallocate_range(&mut transaction, &mut file_range)
1357            .await
1358            .context("preallocate journal")?;
1359        if file_range.start < file_range.end {
1360            bail!("preallocate_range returned too little space");
1361        }
1362        transaction.commit().await?;
1363        Ok(())
1364    }
1365
1366    pub async fn init_superblocks(&self) -> Result<(), Error> {
1367        // Overwrite both superblocks.
1368        for _ in 0..2 {
1369            self.write_super_block().await?;
1370        }
1371        Ok(())
1372    }
1373
1374    /// Takes a snapshot of all journaled transactions which affect |object_id| since its last
1375    /// flush.
1376    /// The caller is responsible for locking; it must ensure that the journal is not trimmed during
1377    /// this call.  For example, a Flush lock could be held on the object in question (assuming that
1378    /// object has data to flush and is registered with ObjectManager).
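    ///
    /// A minimal usage sketch (not compiled as a doctest), assuming an appropriate flush lock is
    /// already held for the store identified by `store_object_id`:
    ///
    /// ```ignore
    /// let transactions = journal.read_transactions_for_object(store_object_id).await?;
    /// for txn in &transactions {
    ///     // Inspect the mutations journaled for this object since its last flush.
    /// }
    /// ```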
1379    pub async fn read_transactions_for_object(
1380        &self,
1381        object_id: u64,
1382    ) -> Result<Vec<JournaledTransaction>, Error> {
1383        let handle = self.handle.get().expect("No journal handle");
1384        // Reopen the handle since JournalReader needs an owned handle.
1385        let handle = ObjectStore::open_object(
1386            handle.owner(),
1387            handle.object_id(),
1388            journal_handle_options(),
1389            None,
1390        )
1391        .await?;
1392
1393        let checkpoint = match self.objects.journal_checkpoint(object_id) {
1394            Some(checkpoint) => checkpoint,
1395            None => return Ok(vec![]),
1396        };
1397        let mut reader = JournalReader::new(handle, &checkpoint);
1398        // Record the current end offset and only read to there, so we don't accidentally read any
1399        // partially flushed blocks.
1400        let end_offset = self.inner.lock().valid_to;
1401        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1402    }
1403
1404    /// Commits a transaction.  This is not thread safe; the caller must take appropriate locks.
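    ///
    /// Transactions are normally committed via `Transaction::commit` (as in the tests below); a
    /// direct call looks roughly like this (not compiled as a doctest), assuming an in-flight
    /// `transaction` and a `journal` reference:
    ///
    /// ```ignore
    /// // Returns the journal file offset at which the transaction was written.
    /// let offset = journal.commit(&mut transaction).await?;
    /// ```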
1405    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1406        if transaction.is_empty() {
1407            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1408        }
1409
1410        self.pre_commit(transaction).await?;
1411        Ok(self.write_and_apply_mutations(transaction))
1412    }
1413
1414    // Before we commit, we might need to extend the journal or write pending records to the
1415    // journal.
1416    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
1417        let handle;
1418
1419        let (size, zero_offset) = {
1420            let mut inner = self.inner.lock();
1421
1422            // If this is the first write after a RESET, we need to output the version first.
1423            if std::mem::take(&mut inner.output_reset_version) {
1424                LATEST_VERSION.serialize_into(&mut inner.writer)?;
1425            }
1426
1427            if let Some(discard_offset) = inner.discard_offset {
1428                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
1429                inner.discard_offset = None;
1430            }
1431
1432            if inner.needs_did_flush_device {
1433                let offset = inner.device_flushed_offset;
1434                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
1435                inner.needs_did_flush_device = false;
1436            }
1437
1438            handle = match self.handle.get() {
1439                None => return Ok(()),
1440                Some(x) => x,
1441            };
1442
1443            let file_offset = inner.writer.journal_file_checkpoint().file_offset;
1444
1445            let size = handle.get_size();
1446            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };
1447
1448            if size.is_none()
1449                && inner.zero_offset.is_none()
1450                && !self.objects.needs_borrow_for_journal(file_offset)
1451            {
1452                return Ok(());
1453            }
1454
1455            (size, inner.zero_offset)
1456        };
1457
1458        let mut transaction = handle
1459            .new_transaction_with_options(Options {
1460                skip_journal_checks: true,
1461                borrow_metadata_space: true,
1462                allocator_reservation: Some(self.objects.metadata_reservation()),
1463                txn_guard: Some(transaction.txn_guard()),
1464                ..Default::default()
1465            })
1466            .await?;
1467        if let Some(size) = size {
1468            handle
1469                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
1470                .await?;
1471        }
1472        if let Some(zero_offset) = zero_offset {
1473            handle.zero(&mut transaction, 0..zero_offset).await?;
1474        }
1475
1476        // We can't use regular transaction commit, because that can cause re-entrancy issues, so
1477        // instead we just apply the transaction directly here.
1478        self.write_and_apply_mutations(&mut transaction);
1479
1480        let mut inner = self.inner.lock();
1481
1482        // Make sure the transaction to extend the journal made it to the journal within the old
1483        // size, since otherwise, it won't be possible to replay.
1484        if let Some(size) = size {
1485            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
1486        }
1487
1488        if inner.zero_offset == zero_offset {
1489            inner.zero_offset = None;
1490        }
1491
1492        Ok(())
1493    }
1494
1495    // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
1496    // all records should be applied because the object store or allocator might already contain the
1497    // mutation.  After replay, that obviously isn't the case and we want to apply all mutations.
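    //
    // As an illustrative example: if the super-block records that an object's mutations have been
    // flushed up to journal offset 1000, a mutation for that object journaled at offset 900 is
    // already reflected in its persisted layers and is skipped, while one journaled at offset 1200
    // is applied.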
1498    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1499        let super_block_header = &self.inner.lock().super_block_header;
1500        let offset = super_block_header
1501            .journal_file_offsets
1502            .get(&object_id)
1503            .cloned()
1504            .unwrap_or(super_block_header.super_block_journal_file_offset);
1505        journal_file_checkpoint.file_offset >= offset
1506    }
1507
1508    /// Flushes previous writes to the device and then writes out a new super-block.
1509    /// Callers must ensure that this is not called concurrently.
1510    async fn write_super_block(&self) -> Result<(), Error> {
1511        let root_parent_store = self.objects.root_parent_store();
1512
1513        // We need to flush previous writes to the device since the new super-block we are writing
1514        // relies on written data being observable, and we also need to lock the root parent store
1515        // so that no new entries are written to it whilst we are writing the super-block, and for
1516        // that we use the write lock.
1517        let old_layers;
1518        let old_super_block_offset;
1519        let mut new_super_block_header;
1520        let checkpoint;
1521        let borrowed;
1522
1523        {
1524            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1525            {
1526                let _write_guard = self.writer_mutex.lock();
1527                (checkpoint, borrowed) = self.pad_to_block()?;
1528                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
1529            }
1530            self.flush_device(checkpoint.file_offset)
1531                .await
1532                .context("flush failed when writing superblock")?;
1533        }
1534
1535        new_super_block_header = self.inner.lock().super_block_header.clone();
1536
1537        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;
1538
1539        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();
1540
1541        new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
1542        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
1543        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
1544        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
1545        new_super_block_header.journal_file_offsets = journal_file_offsets;
1546        new_super_block_header.borrowed_metadata_space = borrowed;
1547
1548        self.super_block_manager
1549            .save(
1550                new_super_block_header.clone(),
1551                self.objects.root_parent_store().filesystem(),
1552                old_layers,
1553            )
1554            .await?;
1555        {
1556            let mut inner = self.inner.lock();
1557            inner.super_block_header = new_super_block_header;
1558            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
1559        }
1560
1561        Ok(())
1562    }
1563
1564    /// Flushes any buffered journal data to the device.  Note that this does not flush the device
1565    /// unless the flush_device option is set, in which case data should have been persisted to
1566    /// lower layers.  If a precondition is supplied, it is evaluated and the sync will be skipped
1567    /// if it returns false.  This allows callers to check a condition whilst a lock is held.  If a
1568    /// sync is performed, this function returns the checkpoint that was flushed and the amount of
1569    /// borrowed metadata space at the point it was flushed.
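    ///
    /// A minimal sketch (not compiled as a doctest), assuming a `journal` reference:
    ///
    /// ```ignore
    /// // Ensure everything journaled so far is persisted all the way to the device.
    /// if let Some((checkpoint, borrowed)) =
    ///     journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await?
    /// {
    ///     // `checkpoint` is the flushed journal position; `borrowed` is the amount of borrowed
    ///     // metadata space at that point.
    /// }
    /// ```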
1570    pub async fn sync(
1571        &self,
1572        options: SyncOptions<'_>,
1573    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1574        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1575
1576        let (checkpoint, borrowed) = {
1577            if let Some(precondition) = options.precondition {
1578                if !precondition() {
1579                    return Ok(None);
1580                }
1581            }
1582
1583            // This guard is required so that we don't insert an EndBlock record in the middle of a
1584            // transaction.
1585            let _guard = self.writer_mutex.lock();
1586
1587            self.pad_to_block()?
1588        };
1589
1590        if options.flush_device {
1591            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1592        }
1593
1594        Ok(Some((checkpoint, borrowed)))
1595    }
1596
1597    // Returns the checkpoint as it was prior to padding.  This is done because the super block
1598    // needs to record where the last transaction ends and it's the next transaction that pays the
1599    // price of the padding.
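    //
    // For example, if a transaction ends partway through a journal block, the checkpoint returned
    // here records that mid-block offset, while the EndBlock record plus padding advance the
    // writer to the next block boundary; the following transaction absorbs the padding cost.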
1600    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1601        let mut inner = self.inner.lock();
1602        let checkpoint = inner.writer.journal_file_checkpoint();
1603        if checkpoint.file_offset % BLOCK_SIZE != 0 {
1604            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1605            inner.writer.pad_to_block()?;
1606            if let Some(waker) = inner.flush_waker.take() {
1607                waker.wake();
1608            }
1609        }
1610        Ok((checkpoint, self.objects.borrowed_metadata_space()))
1611    }
1612
1613    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
1614        assert!(
1615            self.inner.lock().image_builder_mode.is_none(),
1616            "flush_device called in image builder mode"
1617        );
1618        debug_assert_not_too_long!(poll_fn(|ctx| {
1619            let mut inner = self.inner.lock();
1620            if inner.flushed_offset >= checkpoint_offset {
1621                Poll::Ready(Ok(()))
1622            } else if inner.terminate {
1623                let context = inner
1624                    .terminate_reason
1625                    .as_ref()
1626                    .map(|e| format!("Journal closed with error: {:?}", e))
1627                    .unwrap_or_else(|| "Journal closed".to_string());
1628                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
1629            } else {
1630                inner.sync_waker = Some(ctx.waker().clone());
1631                Poll::Pending
1632            }
1633        }))?;
1634
1635        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
1636        if needs_flush {
1637            let trace = self.trace.load(Ordering::Relaxed);
1638            if trace {
1639                info!("J: start flush device");
1640            }
1641            self.handle.get().unwrap().flush_device().await?;
1642            if trace {
1643                info!("J: end flush device");
1644            }
1645
1646            // We need to write a DidFlushDevice record at some point, but if we are in the
1647            // process of shutting down the filesystem, we want to leave the journal clean to
1648            // avoid there being log messages complaining about unwritten journal data, so we
1649            // queue it up so that the next transaction will trigger this record to be written.
1650            // If we are shutting down, that will never happen but since the DidFlushDevice
1651            // message is purely advisory (it reduces the number of checksums we have to verify
1652            // during replay), it doesn't matter if it isn't written.
1653            {
1654                let mut inner = self.inner.lock();
1655                inner.device_flushed_offset = checkpoint_offset;
1656                inner.needs_did_flush_device = true;
1657            }
1658
1659            // Tell the allocator that we flushed the device so that it can now start using
1660            // space that was deallocated.
1661            self.objects.allocator().did_flush_device(checkpoint_offset);
1662            if trace {
1663                info!("J: did flush device");
1664            }
1665        }
1666
1667        Ok(())
1668    }
1669
1670    /// Returns a copy of the super-block header.
1671    pub fn super_block_header(&self) -> SuperBlockHeader {
1672        self.inner.lock().super_block_header.clone()
1673    }
1674
1675    /// Waits for there to be sufficient space in the journal.
1676    pub async fn check_journal_space(&self) -> Result<(), Error> {
1677        loop {
1678            debug_assert_not_too_long!({
1679                let inner = self.inner.lock();
1680                if inner.terminate {
1681                    // If the flush error is set, this will never make progress, since we can't
1682                    // extend the journal any more.
1683                    let context = inner
1684                        .terminate_reason
1685                        .as_ref()
1686                        .map(|e| format!("Journal closed with error: {:?}", e))
1687                        .unwrap_or_else(|| "Journal closed".to_string());
1688                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
1689                }
1690                if self.objects.last_end_offset()
1691                    - inner.super_block_header.journal_checkpoint.file_offset
1692                    < inner.reclaim_size
1693                {
1694                    break Ok(());
1695                }
1696                if inner.image_builder_mode.is_some() {
1697                    break Ok(());
1698                }
1699                if inner.disable_compactions {
1700                    break Err(
1701                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
1702                    );
1703                }
1704                self.reclaim_event.listen()
1705            });
1706        }
1707    }
1708
1709    fn chunk_size(&self) -> u64 {
1710        CHUNK_SIZE
1711    }
1712
1713    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
1714        let checkpoint_before;
1715        let checkpoint_after;
1716        {
1717            let _guard = self.writer_mutex.lock();
1718            checkpoint_before = {
1719                let mut inner = self.inner.lock();
1720                if transaction.includes_write() {
1721                    inner.needs_barrier = true;
1722                }
1723                let checkpoint = inner.writer.journal_file_checkpoint();
1724                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
1725                    self.objects.write_mutation(
1726                        *object_id,
1727                        mutation,
1728                        Writer(*object_id, &mut inner.writer),
1729                    );
1730                }
1731                checkpoint
1732            };
1733            let maybe_mutation =
1734                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
1735                    "apply_transaction should not fail in live mode; \
1736                     filesystem will be in an inconsistent state",
1737                );
1738            checkpoint_after = {
1739                let mut inner = self.inner.lock();
1740                if let Some(mutation) = maybe_mutation {
1741                    inner
1742                        .writer
1743                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
1744                        .unwrap();
1745                }
1746                for (device_range, checksums, first_write) in
1747                    transaction.take_checksums().into_iter()
1748                {
1749                    inner
1750                        .writer
1751                        .write_record(&JournalRecord::DataChecksums(
1752                            device_range,
1753                            Checksums::fletcher(checksums),
1754                            first_write,
1755                        ))
1756                        .unwrap();
1757                }
1758                inner.writer.write_record(&JournalRecord::Commit).unwrap();
1759
1760                inner.writer.journal_file_checkpoint()
1761            };
1762        }
1763        self.objects.did_commit_transaction(
1764            transaction,
1765            &checkpoint_before,
1766            checkpoint_after.file_offset,
1767        );
1768
1769        if let Some(waker) = self.inner.lock().flush_waker.take() {
1770            waker.wake();
1771        }
1772
1773        checkpoint_before.file_offset
1774    }
1775
1776    /// This task will flush journal data to the device when there is data that needs flushing, and
1777    /// trigger compactions when short of journal space.  It will return after the terminate method
1778    /// has been called, or when an error is encountered during either flushing or compaction.
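    ///
    /// A rough sketch of how this might be driven (not compiled as a doctest); the real wiring
    /// lives in the filesystem setup code, and the task handling shown here is illustrative only:
    ///
    /// ```ignore
    /// let task = fasync::Task::spawn(journal.clone().flush_task());
    /// // ... later, during shutdown:
    /// journal.terminate();
    /// task.await;
    /// ```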
1779    pub async fn flush_task(self: Arc<Self>) {
1780        let mut flush_fut = None;
1781        let mut compact_fut = None;
1782        let mut flush_error = false;
1783        poll_fn(|ctx| {
1784            loop {
1785                {
1786                    let mut inner = self.inner.lock();
1787                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
1788                        let flushable = inner.writer.flushable_bytes();
1789                        if flushable > 0 {
1790                            flush_fut = Some(Box::pin(self.flush(flushable)));
1791                        }
1792                    }
1793                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
1794                        return Poll::Ready(());
1795                    }
1796                    // The / 2 is here because after compacting, we cannot reclaim the space until the
1797                    // _next_ time we flush the device since the super-block is not guaranteed to
1798                    // persist until then.
1799                    if compact_fut.is_none()
1800                        && !inner.terminate
1801                        && !inner.disable_compactions
1802                        && inner.image_builder_mode.is_none()
1803                        && self.objects.last_end_offset()
1804                            - inner.super_block_header.journal_checkpoint.file_offset
1805                            > inner.reclaim_size / 2
1806                    {
1807                        compact_fut = Some(Box::pin(self.compact()));
1808                        inner.compaction_running = true;
1809                    }
1810                    inner.flush_waker = Some(ctx.waker().clone());
1811                }
1812                let mut pending = true;
1813                if let Some(fut) = flush_fut.as_mut() {
1814                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1815                        if let Err(e) = result {
1816                            self.inner.lock().terminate(Some(e.context("Flush error")));
1817                            self.reclaim_event.notify(usize::MAX);
1818                            flush_error = true;
1819                        }
1820                        flush_fut = None;
1821                        pending = false;
1822                    }
1823                }
1824                if let Some(fut) = compact_fut.as_mut() {
1825                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1826                        let mut inner = self.inner.lock();
1827                        if let Err(e) = result {
1828                            inner.terminate(Some(e.context("Compaction error")));
1829                        }
1830                        compact_fut = None;
1831                        inner.compaction_running = false;
1832                        self.reclaim_event.notify(usize::MAX);
1833                        pending = false;
1834                    }
1835                }
1836                if pending {
1837                    return Poll::Pending;
1838                }
1839            }
1840        })
1841        .await;
1842    }
1843
1844    async fn flush(&self, amount: usize) -> Result<(), Error> {
1845        let handle = self.handle.get().unwrap();
1846        let mut buf = handle.allocate_buffer(amount).await;
1847        let (offset, len, barrier_on_first_write) = {
1848            let mut inner = self.inner.lock();
1849            let offset = inner.writer.take_flushable(buf.as_mut());
1850            let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1851            // Reset `needs_barrier` before the overwrite, rather than after it, in case a txn
1852            // commit that contains data happens during the overwrite.
1853            inner.needs_barrier = false;
1854            (offset, buf.len() as u64, barrier_on_first_write)
1855        };
1856        self.handle
1857            .get()
1858            .unwrap()
1859            .overwrite(
1860                offset,
1861                buf.as_mut(),
1862                OverwriteOptions { barrier_on_first_write, ..Default::default() },
1863            )
1864            .await?;
1865
1866        let mut inner = self.inner.lock();
1867        if let Some(waker) = inner.sync_waker.take() {
1868            waker.wake();
1869        }
1870        inner.flushed_offset = offset + len;
1871        inner.valid_to = inner.flushed_offset;
1872        Ok(())
1873    }
1874
1875    /// This should generally NOT be called externally. It is public to allow use by FIDL service
1876    /// fxfs.Debug.
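    ///
    /// A minimal sketch (not compiled as a doctest), mirroring how the tests below drive it:
    ///
    /// ```ignore
    /// fs.journal().compact().await?;
    /// ```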
1877    #[trace]
1878    pub async fn compact(&self) -> Result<(), Error> {
1879        assert!(
1880            self.inner.lock().image_builder_mode.is_none(),
1881            "compact called in image builder mode"
1882        );
1883        let trace = self.trace.load(Ordering::Relaxed);
1884        debug!("Compaction starting");
1885        if trace {
1886            info!("J: start compaction");
1887        }
1888        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1889        self.inner.lock().super_block_header.earliest_version = earliest_version;
1890        self.write_super_block().await.context("Failed to write superblock")?;
1891        if trace {
1892            info!("J: end compaction");
1893        }
1894        debug!("Compaction finished");
1895        Ok(())
1896    }
1897
1898    pub async fn stop_compactions(&self) {
1899        loop {
1900            debug_assert_not_too_long!({
1901                let mut inner = self.inner.lock();
1902                inner.disable_compactions = true;
1903                if !inner.compaction_running {
1904                    return;
1905                }
1906                self.reclaim_event.listen()
1907            });
1908        }
1909    }
1910
1911    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
1912    /// journal when queried.
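    ///
    /// A minimal sketch (not compiled as a doctest), assuming an `Arc<Journal>` named `journal`:
    ///
    /// ```ignore
    /// let inspector = fuchsia_inspect::Inspector::default();
    /// journal.track_statistics(inspector.root(), "journal");
    /// ```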
1913    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1914        let this = Arc::downgrade(self);
1915        parent.record_lazy_child(name, move || {
1916            let this_clone = this.clone();
1917            async move {
1918                let inspector = fuchsia_inspect::Inspector::default();
1919                if let Some(this) = this_clone.upgrade() {
1920                    let (journal_min, journal_max, journal_reclaim_size) = {
1921                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1922                        let inner = this.inner.lock();
1923                        (
1924                            round_down(
1925                                inner.super_block_header.journal_checkpoint.file_offset,
1926                                BLOCK_SIZE,
1927                            ),
1928                            inner.flushed_offset,
1929                            inner.reclaim_size,
1930                        )
1931                    };
1932                    let root = inspector.root();
1933                    root.record_uint("journal_min_offset", journal_min);
1934                    root.record_uint("journal_max_offset", journal_max);
1935                    root.record_uint("journal_size", journal_max - journal_min);
1936                    root.record_uint("journal_reclaim_size", journal_reclaim_size);
1937
1938                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
1939                    if let Some(x) = round_div(
1940                        100 * (journal_max - journal_min),
1941                        this.objects.allocator().get_disk_bytes(),
1942                    ) {
1943                        root.record_uint("journal_size_to_disk_size_percent", x);
1944                    }
1945                }
1946                Ok(inspector)
1947            }
1948            .boxed()
1949        });
1950    }
1951
1952    /// Terminate all journal activity.
1953    pub fn terminate(&self) {
1954        self.inner.lock().terminate(/*reason*/ None);
1955        self.reclaim_event.notify(usize::MAX);
1956    }
1957}
1958
1959/// Wrapper to allow records to be written to the journal.
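///
/// A `Writer` is constructed by the journal (see `write_and_apply_mutations`) and handed to
/// `ObjectManager::write_mutation`; `write` emits a `JournalRecord::Mutation` tagged with the
/// wrapped object ID.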
1960pub struct Writer<'a>(u64, &'a mut JournalWriter);
1961
1962impl Writer<'_> {
1963    pub fn write(&mut self, mutation: Mutation) {
1964        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
1965    }
1966}
1967
1968#[cfg(test)]
1969mod tests {
1970    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
1971    use crate::fsck::fsck;
1972    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
1973    use crate::object_store::directory::Directory;
1974    use crate::object_store::journal::SuperBlockInstance;
1975    use crate::object_store::transaction::Options;
1976    use crate::object_store::volume::root_volume;
1977    use crate::object_store::{
1978        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
1979    };
1980    use fuchsia_async as fasync;
1981    use fuchsia_async::MonotonicDuration;
1982    use storage_device::DeviceHolder;
1983    use storage_device::fake_device::FakeDevice;
1984
1985    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1986
1987    #[fuchsia::test]
1988    async fn test_replay() {
1989        const TEST_DATA: &[u8] = b"hello";
1990
1991        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1992
1993        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1994
1995        let object_id = {
1996            let root_store = fs.root_store();
1997            let root_directory =
1998                Directory::open(&root_store, root_store.root_directory_object_id())
1999                    .await
2000                    .expect("open failed");
2001            let mut transaction = fs
2002                .clone()
2003                .new_transaction(
2004                    lock_keys![LockKey::object(
2005                        root_store.store_object_id(),
2006                        root_store.root_directory_object_id(),
2007                    )],
2008                    Options::default(),
2009                )
2010                .await
2011                .expect("new_transaction failed");
2012            let handle = root_directory
2013                .create_child_file(&mut transaction, "test")
2014                .await
2015                .expect("create_child_file failed");
2016
2017            transaction.commit().await.expect("commit failed");
2018            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2019            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2020            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2021            // As this is the first sync, this will actually trigger a new super-block, but normally
2022            // this would not be the case.
2023            fs.sync(SyncOptions::default()).await.expect("sync failed");
2024            handle.object_id()
2025        };
2026
2027        {
2028            fs.close().await.expect("Close failed");
2029            let device = fs.take_device().await;
2030            device.reopen(false);
2031            let fs = FxFilesystem::open(device).await.expect("open failed");
2032            let handle = ObjectStore::open_object(
2033                &fs.root_store(),
2034                object_id,
2035                HandleOptions::default(),
2036                None,
2037            )
2038            .await
2039            .expect("open_object failed");
2040            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2041            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
2042            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2043            fsck(fs.clone()).await.expect("fsck failed");
2044            fs.close().await.expect("Close failed");
2045        }
2046    }
2047
2048    #[fuchsia::test]
2049    async fn test_reset() {
2050        const TEST_DATA: &[u8] = b"hello";
2051
2052        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
2053
2054        let mut object_ids = Vec::new();
2055
2056        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2057        {
2058            let root_store = fs.root_store();
2059            let root_directory =
2060                Directory::open(&root_store, root_store.root_directory_object_id())
2061                    .await
2062                    .expect("open failed");
2063            let mut transaction = fs
2064                .clone()
2065                .new_transaction(
2066                    lock_keys![LockKey::object(
2067                        root_store.store_object_id(),
2068                        root_store.root_directory_object_id(),
2069                    )],
2070                    Options::default(),
2071                )
2072                .await
2073                .expect("new_transaction failed");
2074            let handle = root_directory
2075                .create_child_file(&mut transaction, "test")
2076                .await
2077                .expect("create_child_file failed");
2078            transaction.commit().await.expect("commit failed");
2079            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2080            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2081            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2082            fs.sync(SyncOptions::default()).await.expect("sync failed");
2083            object_ids.push(handle.object_id());
2084
2085            // Create a lot of objects but don't sync at the end. This should leave the filesystem
2086            // with a half-finished transaction that cannot be replayed.
2087            for i in 0..1000 {
2088                let mut transaction = fs
2089                    .clone()
2090                    .new_transaction(
2091                        lock_keys![LockKey::object(
2092                            root_store.store_object_id(),
2093                            root_store.root_directory_object_id(),
2094                        )],
2095                        Options::default(),
2096                    )
2097                    .await
2098                    .expect("new_transaction failed");
2099                let handle = root_directory
2100                    .create_child_file(&mut transaction, &format!("{}", i))
2101                    .await
2102                    .expect("create_child_file failed");
2103                transaction.commit().await.expect("commit failed");
2104                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2105                buf.as_mut_slice().copy_from_slice(TEST_DATA);
2106                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2107                object_ids.push(handle.object_id());
2108            }
2109        }
2110        fs.close().await.expect("fs close failed");
2111        let device = fs.take_device().await;
2112        device.reopen(false);
2113        let fs = FxFilesystem::open(device).await.expect("open failed");
2114        fsck(fs.clone()).await.expect("fsck failed");
2115        {
2116            let root_store = fs.root_store();
2117            // Check the first object, which should exist.
2118            for &object_id in &object_ids[0..1] {
2119                let handle = ObjectStore::open_object(
2120                    &root_store,
2121                    object_id,
2122                    HandleOptions::default(),
2123                    None,
2124                )
2125                .await
2126                .expect("open_object failed");
2127                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2128                assert_eq!(
2129                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2130                    TEST_DATA.len()
2131                );
2132                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2133            }
2134
2135            // Write one more object and sync.
2136            let root_directory =
2137                Directory::open(&root_store, root_store.root_directory_object_id())
2138                    .await
2139                    .expect("open failed");
2140            let mut transaction = fs
2141                .clone()
2142                .new_transaction(
2143                    lock_keys![LockKey::object(
2144                        root_store.store_object_id(),
2145                        root_store.root_directory_object_id(),
2146                    )],
2147                    Options::default(),
2148                )
2149                .await
2150                .expect("new_transaction failed");
2151            let handle = root_directory
2152                .create_child_file(&mut transaction, "test2")
2153                .await
2154                .expect("create_child_file failed");
2155            transaction.commit().await.expect("commit failed");
2156            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2157            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2158            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2159            fs.sync(SyncOptions::default()).await.expect("sync failed");
2160            object_ids.push(handle.object_id());
2161        }
2162
2163        fs.close().await.expect("close failed");
2164        let device = fs.take_device().await;
2165        device.reopen(false);
2166        let fs = FxFilesystem::open(device).await.expect("open failed");
2167        {
2168            fsck(fs.clone()).await.expect("fsck failed");
2169
2170            // Check the first and the last objects.
2171            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
2172                let handle = ObjectStore::open_object(
2173                    &fs.root_store(),
2174                    object_id,
2175                    HandleOptions::default(),
2176                    None,
2177                )
2178                .await
2179                .unwrap_or_else(|e| {
2180                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2181                });
2182                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2183                assert_eq!(
2184                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2185                    TEST_DATA.len()
2186                );
2187                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2188            }
2189        }
2190        fs.close().await.expect("close failed");
2191    }
2192
2193    #[fuchsia::test]
2194    async fn test_discard() {
2195        let device = {
2196            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2197            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2198            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2199
2200            let store = root_volume
2201                .new_volume("test", NewChildStoreOptions::default())
2202                .await
2203                .expect("new_volume failed");
2204            let root_directory = Directory::open(&store, store.root_directory_object_id())
2205                .await
2206                .expect("open failed");
2207
2208            // Create enough data so that another journal extent is used.
2209            let mut i = 0;
2210            loop {
2211                let mut transaction = fs
2212                    .clone()
2213                    .new_transaction(
2214                        lock_keys![LockKey::object(
2215                            store.store_object_id(),
2216                            store.root_directory_object_id()
2217                        )],
2218                        Options::default(),
2219                    )
2220                    .await
2221                    .expect("new_transaction failed");
2222                root_directory
2223                    .create_child_file(&mut transaction, &format!("a {i}"))
2224                    .await
2225                    .expect("create_child_file failed");
2226                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2227                    break;
2228                }
2229                i += 1;
2230            }
2231
2232            // Compact and then disable compactions.
2233            fs.journal().compact().await.expect("compact failed");
2234            fs.journal().stop_compactions().await;
2235
2236            // Keep going until we need another journal extent.
2237            let mut i = 0;
2238            loop {
2239                let mut transaction = fs
2240                    .clone()
2241                    .new_transaction(
2242                        lock_keys![LockKey::object(
2243                            store.store_object_id(),
2244                            store.root_directory_object_id()
2245                        )],
2246                        Options::default(),
2247                    )
2248                    .await
2249                    .expect("new_transaction failed");
2250                root_directory
2251                    .create_child_file(&mut transaction, &format!("b {i}"))
2252                    .await
2253                    .expect("create_child_file failed");
2254                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2255                    break;
2256                }
2257                i += 1;
2258            }
2259
2260            // Allow the journal to flush, but we don't want to sync.
2261            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2262            // Because we're not gracefully closing the filesystem, a Discard record will be
2263            // emitted.
2264            fs.device().snapshot().expect("snapshot failed")
2265        };
2266
2267        let fs = FxFilesystem::open(device).await.expect("open failed");
2268
2269        {
2270            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2271
2272            let store =
2273                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2274
2275            let root_directory = Directory::open(&store, store.root_directory_object_id())
2276                .await
2277                .expect("open failed");
2278
2279            // Write one more transaction.
2280            let mut transaction = fs
2281                .clone()
2282                .new_transaction(
2283                    lock_keys![LockKey::object(
2284                        store.store_object_id(),
2285                        store.root_directory_object_id()
2286                    )],
2287                    Options::default(),
2288                )
2289                .await
2290                .expect("new_transaction failed");
2291            root_directory
2292                .create_child_file(&mut transaction, "d")
2293                .await
2294                .expect("create_child_file failed");
2295            transaction.commit().await.expect("commit failed");
2296        }
2297
2298        fs.close().await.expect("close failed");
2299        let device = fs.take_device().await;
2300        device.reopen(false);
2301
2302        let fs = FxFilesystem::open(device).await.expect("open failed");
2303        fsck(fs.clone()).await.expect("fsck failed");
2304        fs.close().await.expect("close failed");
2305    }
2306
2307    #[fuchsia::test]
2308    async fn test_use_existing_generation() {
2309        let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2310
2311        // First format should be generation 1.
2312        let fs = FxFilesystemBuilder::new()
2313            .format(true)
2314            .image_builder_mode(Some(SuperBlockInstance::A))
2315            .open(device)
2316            .await
2317            .expect("open failed");
2318        fs.enable_allocations();
2319        let generation0 = fs.super_block_header().generation;
2320        assert_eq!(generation0, 1);
2321        fs.close().await.expect("close failed");
2322        let device = fs.take_device().await;
2323        device.reopen(false);
2324
2325        // Format the device normally (again, generation 1).
2326        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2327        let generation1 = fs.super_block_header().generation;
2328        {
2329            let root_volume = crate::object_store::volume::root_volume(fs.clone())
2330                .await
2331                .expect("root_volume failed");
2332            root_volume
2333                .new_volume("test", crate::object_store::NewChildStoreOptions::default())
2334                .await
2335                .expect("new_volume failed");
2336        }
2337        fs.close().await.expect("close failed");
2338        let device = fs.take_device().await;
2339        device.reopen(false);
2340
2341        // Format again with image_builder_mode.
2342        let fs = FxFilesystemBuilder::new()
2343            .format(true)
2344            .image_builder_mode(Some(SuperBlockInstance::A))
2345            .open(device)
2346            .await
2347            .expect("open failed");
2348        fs.enable_allocations();
2349        let generation2 = fs.super_block_header().generation;
2350        assert!(
2351            generation2 > generation1,
2352            "generation2 ({}) should be greater than generation1 ({})",
2353            generation2,
2354            generation1
2355        );
2356        fs.close().await.expect("close failed");
2357    }
2358
2359    #[fuchsia::test]
2360    async fn test_image_builder_mode_generation_bump_512_byte_block() {
2361        let device = DeviceHolder::new(FakeDevice::new(16384, 512));
2362
2363        // Format initial filesystem (generation 1)
2364        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2365        let generation1 = fs.super_block_header().generation;
2366        fs.close().await.expect("close failed");
2367        let device = fs.take_device().await;
2368        device.reopen(false);
2369
2370        // Format again with image_builder_mode (should bump generation)
2371        let fs = FxFilesystemBuilder::new()
2372            .format(true)
2373            .image_builder_mode(Some(SuperBlockInstance::A))
2374            .open(device)
2375            .await
2376            .expect("open failed");
2377
2378        fs.enable_allocations();
2379        let generation2 = fs.super_block_header().generation;
2380        assert!(
2381            generation2 > generation1,
2382            "Expected generation bump, got {} vs {}",
2383            generation2,
2384            generation1
2385        );
2386        fs.close().await.expect("close failed");
2387    }
2388}
2389
2390#[cfg(fuzz)]
2391mod fuzz {
2392    use fuzz::fuzz;
2393
2394    #[fuzz]
2395    fn fuzz_journal_bytes(input: Vec<u8>) {
2396        use crate::filesystem::FxFilesystem;
2397        use fuchsia_async as fasync;
2398        use std::io::Write;
2399        use storage_device::DeviceHolder;
2400        use storage_device::fake_device::FakeDevice;
2401
2402        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2403            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2404            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2405            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
2406            fs.close().await.expect("close failed");
2407            let device = fs.take_device().await;
2408            device.reopen(false);
2409            if let Ok(fs) = FxFilesystem::open(device).await {
2410                // `close()` can fail if there were objects to be tombstoned. If such an object is
2411                // corrupted, there will be an error when we compact the journal.
2412                let _ = fs.close().await;
2413            }
2414        });
2415    }
2416
2417    #[fuzz]
2418    fn fuzz_journal(input: Vec<super::JournalRecord>) {
2419        use crate::filesystem::FxFilesystem;
2420        use fuchsia_async as fasync;
2421        use storage_device::DeviceHolder;
2422        use storage_device::fake_device::FakeDevice;
2423
2424        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2425            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2426            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2427            {
2428                let mut inner = fs.journal().inner.lock();
2429                for record in &input {
2430                    let _ = inner.writer.write_record(record);
2431                }
2432            }
2433            fs.close().await.expect("close failed");
2434            let device = fs.take_device().await;
2435            device.reopen(false);
2436            if let Ok(fs) = FxFilesystem::open(device).await {
2437                // `close()` can fail if there were objects to be tombstoned. If such an object is
2438                // corrupted, there will be an error when we compact the journal.
2439                let _ = fs.close().await;
2440            }
2441        });
2442    }
2443}