fxfs/object_store/journal.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! The journal is implemented as an ever-extending file which contains variable-length records
//! that describe mutations to be applied to various objects.  The journal file consists of
//! blocks, with a checksum at the end of each block, but otherwise it can be considered a
//! continuous stream.
//!
//! The checksum is seeded with the checksum from the previous block.  To free space in the
//! journal, records are replaced with sparse extents when it is known they are no longer
//! needed for mount.
//!
//! At mount time, the journal is replayed: the mutations are applied into memory.
//! Eventually, a checksum failure will indicate no more records exist to be replayed,
//! at which point the mount can continue and the journal will be extended from that point with
//! further mutations as required.

mod bootstrap_handle;
mod checksum_list;
mod reader;
pub mod super_block;
mod writer;

use crate::checksum::{Checksum, Checksums, ChecksumsV38};
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::types::Layer;
use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
use crate::object_store::allocator::Allocator;
use crate::object_store::data_object_handle::OverwriteOptions;
use crate::object_store::extent_record::{
    DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
};
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
use crate::object_store::journal::checksum_list::ChecksumList;
use crate::object_store::journal::reader::{JournalReader, ReadResult};
use crate::object_store::journal::super_block::{
    SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
};
use crate::object_store::journal::writer::JournalWriter;
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
use crate::object_store::transaction::{
    AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
    MutationV47, MutationV49, ObjectStoreMutation, Options, TRANSACTION_MAX_JOURNAL_USAGE,
    Transaction, TxnMutation, lock_keys,
};
use crate::object_store::{
    AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
    NewChildStoreOptions, ObjectStore,
};
use crate::range::RangeExt;
use crate::round::{round_div, round_down};
use crate::serialized_types::{LATEST_VERSION, Migrate, Version, Versioned, migrate_to_version};
use anyhow::{Context, Error, anyhow, bail, ensure};
use event_listener::Event;
use fprint::TypeFingerprint;
use fuchsia_sync::Mutex;
use futures::FutureExt as _;
use futures::future::poll_fn;
use once_cell::sync::OnceCell;
use rustc_hash::FxHashMap as HashMap;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::clone::Clone;
use std::collections::HashSet;
use std::ops::{Bound, Range};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Poll, Waker};

// The journal file is written to in blocks of this size.
pub const BLOCK_SIZE: u64 = 4096;

// The journal file is extended by this amount when necessary.
const CHUNK_SIZE: u64 = 131_072;
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

// See the comment for the `reclaim_size` member of Inner.
pub const DEFAULT_RECLAIM_SIZE: u64 = 262_144;

// Temporary space that should be reserved for the journal.  For example: space that is currently
// used in the journal file but cannot be deallocated yet because we are flushing.
pub const RESERVED_SPACE: u64 = 1_048_576;

// Whenever the journal is replayed (i.e. the system is unmounted and remounted), we reset the
// journal stream, at which point any half-complete transactions are discarded.  We indicate a
// journal reset by XORing the previous block's checksum with this mask, and using that value as a
// seed for the next journal block.
const RESET_XOR: u64 = 0xffffffffffffffff;
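
// An illustrative sketch of the reset mechanism described above (the seed value is arbitrary,
// chosen for demonstration only): XORing a seed with `RESET_XOR` always yields a different value,
// and XORing again restores the original, so a reset can be both applied to and recognized in the
// chain of block checksums.
#[cfg(test)]
mod reset_xor_sketch {
    use super::*;

    #[test]
    fn reset_xor_flips_and_restores_the_seed() {
        let seed: u64 = 0x1234_5678_9abc_def0; // Arbitrary example seed.
        let reset_seed = seed ^ RESET_XOR;
        assert_ne!(reset_seed, seed);
        assert_eq!(reset_seed ^ RESET_XOR, seed);
    }
}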

// To keep track of offsets within a journal file, we need both the file offset and the checksum
// of the preceding block, since the checksum of the preceding block is an input to the checksum
// of every block.
pub type JournalCheckpoint = JournalCheckpointV32;

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    pub file_offset: u64,

    // Starting checksum for the block that contains file_offset, i.e. the checksum for the
    // previous block.
    pub checksum: Checksum,

    // If versioned, the version of elements stored in the journal, e.g. the JournalRecord version.
    // This can change across reset events so we store it along with the offset and checksum to
    // know which version to deserialize.
    pub version: Version,
}
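
// An illustrative sketch of how a checkpoint is interpreted (the offset is arbitrary, for
// demonstration only): `file_offset` is a byte offset into the journal file, and simple
// arithmetic on `BLOCK_SIZE` recovers the containing block, whose starting seed is `checksum`.
#[cfg(test)]
mod journal_checkpoint_sketch {
    use super::*;

    #[test]
    fn checkpoint_identifies_a_block() {
        let checkpoint =
            JournalCheckpoint { file_offset: 3 * BLOCK_SIZE + 17, ..Default::default() };
        let block_start = checkpoint.file_offset - checkpoint.file_offset % BLOCK_SIZE;
        assert_eq!(block_start, 3 * BLOCK_SIZE);
    }
}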

pub type JournalRecord = JournalRecordV49;

#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV49 {
    // Indicates no more records in this block.
    EndBlock,
    // Mutation for a particular object.  object_id here is for the collection, i.e. the store or
    // allocator.
    Mutation { object_id: u64, mutation: MutationV49 },
    // Commits records in the transaction.
    Commit,
    // Discard all mutations with offsets greater than or equal to the given offset.
    Discard(u64),
    // Indicates the device was flushed at the given journal offset.
    // Note that this really means that at this journal offset, we can be certain that there's no
    // remaining buffered data in the block device; the buffers and the disk contents are
    // consistent.
    // We insert one of these records *after* a flush along with the *next* transaction to go
    // through.  If that never comes (either due to graceful or hard shutdown), the journal reset
    // on the next mount will serve the same purpose and count as a flush, although it is necessary
    // to defensively flush the device before replaying the journal (if possible, i.e. not
    // read-only) in case the block device connection was reused.
    DidFlushDevice(u64),
    // Checksums for a data range written by this transaction.  A transaction is only valid if
    // these checksums are correct.  The range is the device range the checksums apply to.
    //
    // The boolean indicates whether this range is being written to for the first time.  For
    // overwrite extents, we only check the checksums for a block if it has been written to for the
    // first time since the last flush, because otherwise we can't roll it back anyway, so it
    // doesn't matter.  For copy-on-write extents, the bool is always true.
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

impl From<JournalRecordV41> for JournalRecordV42 {
    fn from(record: JournalRecordV41) -> Self {
        match record {
            JournalRecordV41::EndBlock => Self::EndBlock,
            JournalRecordV41::Mutation { object_id, mutation } => {
                Self::Mutation { object_id, mutation: mutation.into() }
            }
            JournalRecordV41::Commit => Self::Commit,
            JournalRecordV41::Discard(offset) => Self::Discard(offset),
            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
            JournalRecordV41::DataChecksums(range, sums) => {
                // At the time of writing, the only extents written by real systems are CoW
                // extents, so the new bool is always true.
                Self::DataChecksums(range, sums, true)
            }
        }
    }
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

pub(super) fn journal_handle_options() -> HandleOptions {
    HandleOptions { skip_journal_checks: true, ..Default::default() }
}
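
// An illustrative sketch (for demonstration only): handles created with `journal_handle_options`
// skip the usual journal-space checks, presumably so that writes to the journal file itself are
// not gated on journal space.
#[cfg(test)]
mod journal_handle_options_sketch {
    use super::*;

    #[test]
    fn journal_handles_skip_journal_checks() {
        assert!(journal_handle_options().skip_journal_checks);
    }
}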

/// The journal records a stream of mutations that are to be applied to other objects.  At mount
/// time, these records can be replayed into memory.  The journal provides a way to quickly
/// persist changes without having to make a large number of writes; they can be deferred to a
/// later time (e.g. when a sufficient number have been queued).  It also provides support for
/// transactions, i.e. the ability to apply a set of mutations atomically.
pub struct Journal {
    objects: Arc<ObjectManager>,
    handle: OnceCell<DataObjectHandle<ObjectStore>>,
    super_block_manager: SuperBlockManager,
    inner: Mutex<Inner>,
    writer_mutex: Mutex<()>,
    sync_mutex: futures::lock::Mutex<()>,
    trace: AtomicBool,

    // This event is used when we are waiting for a compaction to free up journal space.
    reclaim_event: Event,
}

struct Inner {
    super_block_header: SuperBlockHeader,

    // The offset that we can zero the journal up to now that it is no longer needed.
    zero_offset: Option<u64>,

    // The journal offset that we most recently flushed to the device.
    device_flushed_offset: u64,

    // If true, indicates a DidFlushDevice record is pending.
    needs_did_flush_device: bool,

    // The writer for the journal.
    writer: JournalWriter,

    // Set when a reset is encountered during a read.
    // Used at write pre_commit() time to ensure we write a version first thing after a reset.
    output_reset_version: bool,

    // Waker for the flush task.
    flush_waker: Option<Waker>,

    // Indicates the journal has been terminated.
    terminate: bool,

    // Latched error indicating reason for journal termination if not graceful.
    terminate_reason: Option<Error>,

    // Disable compactions.
    disable_compactions: bool,

    // True if compactions are running.
    compaction_running: bool,

    // Waker for the sync task for when it's waiting for the flush task to finish.
    sync_waker: Option<Waker>,

    // The last offset we flushed to the journal file.
    flushed_offset: u64,

    // The last offset that should be considered valid in the journal file.  Most of the time, this
    // will be the same as `flushed_offset` but at mount time, this could be less and will only be
    // up to the end of the last valid transaction; it won't include transactions that follow that
    // have been discarded.
    valid_to: u64,

    // If, after replaying, we have to discard a number of mutations (because they don't validate),
    // this offset specifies where we need to discard back to.  This is so that when we next replay,
    // we ignore those mutations and continue with new good mutations.
    discard_offset: Option<u64>,

    // In the steady state, the journal should fluctuate between being approximately half of this
    // number and this number.  New super-blocks will be written every time about half of this
    // amount is written to the journal.
    reclaim_size: u64,

    image_builder_mode: Option<SuperBlockInstance>,

    // If true and `needs_barrier`, issue a pre-barrier on the first device write of each journal
    // write (which happens in multiples of `BLOCK_SIZE`). This ensures that all the corresponding
    // data writes make it to disk before the journal gets written to.
    barriers_enabled: bool,

    // If true, indicates that data write requests have been made to the device since the last
    // journal write.
    needs_barrier: bool,
}

impl Inner {
    fn terminate(&mut self, reason: Option<Error>) {
        self.terminate = true;

        if let Some(err) = reason {
            error!(error:? = err; "Terminating journal");
            // Log previous error if one was already set, otherwise latch the error.
            if let Some(prev_err) = self.terminate_reason.as_ref() {
                error!(error:? = prev_err; "Journal previously terminated");
            } else {
                self.terminate_reason = Some(err);
            }
        }

        if let Some(waker) = self.flush_waker.take() {
            waker.wake();
        }
        if let Some(waker) = self.sync_waker.take() {
            waker.wake();
        }
    }
}

pub struct JournalOptions {
    /// In the steady state, the journal should fluctuate between being approximately half of this
    /// number and this number.  New super-blocks will be written every time about half of this
    /// amount is written to the journal.
    pub reclaim_size: u64,

    // If true, issue a pre-barrier on the first device write of each journal write (which happens
    // in multiples of `BLOCK_SIZE`). This ensures that all the corresponding data writes make it
    // to disk before the journal gets written to.
    pub barriers_enabled: bool,
}

impl Default for JournalOptions {
    fn default() -> Self {
        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
    }
}
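
// An illustrative sketch of overriding the defaults (the value is arbitrary, for demonstration
// only): per the doc comment above, a larger `reclaim_size` means more journal is written before
// a new super-block, and `barriers_enabled` stays off unless explicitly set.
#[cfg(test)]
mod journal_options_sketch {
    use super::*;

    #[test]
    fn overriding_reclaim_size() {
        let options =
            JournalOptions { reclaim_size: 8 * DEFAULT_RECLAIM_SIZE, ..Default::default() };
        assert_eq!(options.reclaim_size, 8 * DEFAULT_RECLAIM_SIZE);
        assert!(!options.barriers_enabled);
    }
}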

struct JournaledTransactions {
    transactions: Vec<JournaledTransaction>,
    device_flushed_offset: u64,
}

#[derive(Debug, Default)]
pub struct JournaledTransaction {
    pub checkpoint: JournalCheckpoint,
    pub root_parent_mutations: Vec<Mutation>,
    pub root_mutations: Vec<Mutation>,
    /// List of (store_object_id, mutation).
    pub non_root_mutations: Vec<(u64, Mutation)>,
    pub end_offset: u64,
    pub checksums: Vec<JournaledChecksums>,

    /// Records offset + 1 of the matching begin_flush transaction.  The +1 is because we want to
    /// ignore the begin flush transaction; we don't need or want to replay it.
    pub end_flush: Option<(/* store_id: */ u64, /* begin offset: */ u64)>,

    /// The volume which was deleted in this transaction, if any.
    pub volume_deleted: Option</* store_id: */ u64>,
}

impl JournaledTransaction {
    fn new(checkpoint: JournalCheckpoint) -> Self {
        Self { checkpoint, ..Default::default() }
    }
}

const VOLUME_DELETED: u64 = u64::MAX;

#[derive(Debug)]
pub struct JournaledChecksums {
    pub device_range: Range<u64>,
    pub checksums: Checksums,
    pub first_write: bool,
}

/// Handles for journal-like objects have some additional functionality to manage their extents,
/// since during replay we need to add extents as we find them.
pub trait JournalHandle: ReadObjectHandle {
    /// The end offset of the last extent in the JournalHandle.  Used only for validating extents
    /// (which will be skipped if None is returned).
    /// Note this is equivalent in value to ReadObjectHandle::get_size, when present.
    fn end_offset(&self) -> Option<u64>;
    /// Adds an extent to the current end of the journal stream.
    /// `added_offset` is the offset into the journal of the transaction which added this extent,
    /// used for discard_extents.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Discards all extents which were added in a transaction at offset >= |discard_offset|.
    fn discard_extents(&mut self, discard_offset: u64);
}

// Provide a stub implementation for DataObjectHandle so we can use it in
// Journal::read_transactions.  Manual extent management is a NOP (which is OK since presumably the
// DataObjectHandle already knows where its extents live).
impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
    fn end_offset(&self) -> Option<u64> {
        None
    }
    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
        // NOP
    }
    fn discard_extents(&mut self, _discard_offset: u64) {
        // NOP
    }
}

#[fxfs_trace::trace]
impl Journal {
    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
        let starting_checksum = rand::random_range(1..u64::MAX);
        Journal {
            objects,
            handle: OnceCell::new(),
            super_block_manager: SuperBlockManager::new(),
            inner: Mutex::new(Inner {
                super_block_header: SuperBlockHeader::default(),
                zero_offset: None,
                device_flushed_offset: 0,
                needs_did_flush_device: false,
                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
                output_reset_version: false,
                flush_waker: None,
                terminate: false,
                terminate_reason: None,
                disable_compactions: false,
                compaction_running: false,
                sync_waker: None,
                flushed_offset: 0,
                valid_to: 0,
                discard_offset: None,
                reclaim_size: options.reclaim_size,
                image_builder_mode: None,
                barriers_enabled: options.barriers_enabled,
                needs_barrier: false,
            }),
            writer_mutex: Mutex::new(()),
            sync_mutex: futures::lock::Mutex::new(()),
            trace: AtomicBool::new(false),
            reclaim_event: Event::new(),
        }
    }

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(trace; "J: trace");
        }
    }

    pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
        self.inner.lock().image_builder_mode = mode;
        if let Some(instance) = mode {
            *self.super_block_manager.next_instance.lock() = instance;
        }
    }

    pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
        self.inner.lock().image_builder_mode
    }

    #[cfg(feature = "migration")]
    pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
        ensure!(
            self.inner.lock().image_builder_mode.is_some(),
            "Can only set filesystem uuid in image builder mode."
        );
        self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
        Ok(())
    }

    /// Used during replay to validate a mutation.  This should return false if the mutation is not
    /// valid and should not be applied.  This could be for benign reasons: e.g. the device flushed
    /// data out-of-order, or because of a malicious actor.
    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
        match mutation {
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    Item {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(
                                        _,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                                ..
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
                        ..
                    },
                ..
            }) => {
                if range.is_empty() || !range.is_aligned(block_size) {
                    return false;
                }
                let len = range.length().unwrap();
                if let ExtentMode::Cow(checksums) = mode {
                    if checksums.len() > 0 {
                        if len % checksums.len() as u64 != 0 {
                            return false;
                        }
                        if (len / checksums.len() as u64) % block_size != 0 {
                            return false;
                        }
                    }
                }
                if *device_offset % block_size != 0
                    || *device_offset >= device_size
                    || device_size - *device_offset < len
                {
                    return false;
                }
            }
            Mutation::ObjectStore(_) => {}
            Mutation::EncryptedObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::BeginFlush => {}
            Mutation::EndFlush => {}
            Mutation::DeleteVolume => {}
            Mutation::UpdateBorrowed(_) => {}
            Mutation::UpdateMutationsKey(_) => {}
            Mutation::CreateInternalDir(owner_object_id) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
        }
        true
    }

    // Assumes that `mutation` has been validated.
    fn update_checksum_list(
        &self,
        journal_offset: u64,
        mutation: &Mutation,
        checksum_list: &mut ChecksumList,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::ObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
            }
            _ => {}
        }
        Ok(())
    }

    /// Reads the latest super-block, and then replays journaled records.
    #[trace]
    pub async fn replay(
        &self,
        filesystem: Arc<FxFilesystem>,
        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    ) -> Result<(), Error> {
        let block_size = filesystem.block_size();

        let (super_block, root_parent) =
            self.super_block_manager.load(filesystem.device(), block_size).await?;

        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

        self.objects.set_root_parent_store(root_parent.clone());
        let allocator =
            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
        if let Some(on_new_allocator) = on_new_allocator {
            on_new_allocator(allocator.clone());
        }
        self.objects.set_allocator(allocator.clone());
        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = super_block.clone();
        }

        let device = filesystem.device();

        let mut handle;
        {
            let root_parent_layer = root_parent.tree().mutable_layer();
            let mut iter = root_parent_layer
                .seek(Bound::Included(&ObjectKey::attribute(
                    super_block.journal_object_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
                        super_block.journal_checkpoint.file_offset,
                        BLOCK_SIZE,
                    ))),
                )))
                .await
                .context("Failed to seek root parent store")?;
            let start_offset = if let Some(ItemRef {
                key:
                    ObjectKey {
                        data:
                            ObjectKeyData::Attribute(
                                DEFAULT_DATA_ATTRIBUTE_ID,
                                AttributeKey::Extent(ExtentKey { range }),
                            ),
                        ..
                    },
                ..
            }) = iter.get()
            {
                range.start
            } else {
                0
            };
            handle = BootstrapObjectHandle::new_with_start_offset(
                super_block.journal_object_id,
                device.clone(),
                start_offset,
            );
            while let Some(item) = iter.get() {
                if !match item.into() {
                    Some((
                        object_id,
                        DEFAULT_DATA_ATTRIBUTE_ID,
                        ExtentKey { range },
                        ExtentValue::Some { device_offset, .. },
                    )) if object_id == super_block.journal_object_id => {
                        if let Some(end_offset) = handle.end_offset() {
                            if range.start != end_offset {
                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                    "Unexpected journal extent {:?}, expected start: {}",
                                    item, end_offset
                                )));
                            }
                        }
                        handle.push_extent(
                            0, // We never discard extents from the root parent store.
                            *device_offset
                                ..*device_offset + range.length().context("Invalid extent")?,
                        );
                        true
                    }
                    _ => false,
                } {
                    break;
                }
                iter.advance().await.context("Failed to advance root parent store iterator")?;
            }
        }

        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
        let JournaledTransactions { mut transactions, device_flushed_offset } = self
            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
            .await
            .context("Reading transactions for replay")?;

        // Validate all the mutations.
        let mut checksum_list = ChecksumList::new(device_flushed_offset);
        let mut valid_to = reader.journal_file_checkpoint().file_offset;
        let device_size = device.size();
        'bad_replay: for JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            root_mutations,
            non_root_mutations,
            checksums,
            ..
        } in &transactions
        {
            for JournaledChecksums { device_range, checksums, first_write } in checksums {
                checksum_list
                    .push(
                        checkpoint.file_offset,
                        device_range.clone(),
                        checksums.maybe_as_ref().context("Malformed checksums")?,
                        *first_write,
                    )
                    .context("Pushing journal checksum records to checksum list")?;
            }
            for mutation in root_parent_mutations
                .iter()
                .chain(root_mutations)
                .chain(non_root_mutations.iter().map(|(_, m)| m))
            {
                if !self.validate_mutation(mutation, block_size, device_size) {
                    info!(mutation:?; "Stopping replay at bad mutation");
                    valid_to = checkpoint.file_offset;
                    break 'bad_replay;
                }
                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
            }
        }

        // Validate the checksums. Note if barriers are enabled, there will be no checksums in
        // practice to verify.
        let valid_to = checksum_list
            .verify(device.as_ref(), valid_to)
            .await
            .context("Failed to validate checksums")?;

        // Apply the mutations...

        let mut last_checkpoint = reader.journal_file_checkpoint();
        let mut journal_offsets = super_block.journal_file_offsets.clone();

        // Start with the root-parent mutations, and also determine the journal offsets for all
        // other objects.
        for (
            index,
            JournaledTransaction {
                checkpoint,
                root_parent_mutations,
                end_flush,
                volume_deleted,
                ..
            },
        ) in transactions.iter_mut().enumerate()
        {
            if checkpoint.file_offset >= valid_to {
                last_checkpoint = checkpoint.clone();

                // Truncate the transactions so we don't need to worry about them on the next pass.
                transactions.truncate(index);
                break;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_parent_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_parent_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root parent store mutations")?;
            }

            if let Some((object_id, journal_offset)) = end_flush {
                journal_offsets.insert(*object_id, *journal_offset);
            }

            if let Some(object_id) = volume_deleted {
                journal_offsets.insert(*object_id, VOLUME_DELETED);
            }
        }

        // Now we can open the root store.
        let root_store = ObjectStore::open(
            &root_parent,
            super_block.root_store_object_id,
            Box::new(NullCache {}),
        )
        .await
        .context("Unable to open root store")?;

        ensure!(
            !root_store.is_encrypted(),
            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
        );
        self.objects.set_root_store(root_store);

        let root_store_offset =
            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

        // Now replay the root store mutations.
        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
            if checkpoint.file_offset < root_store_offset {
                continue;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root store mutations")?;
            }
        }

        // Now we can open the allocator.
        allocator.open().await.context("Failed to open allocator")?;

        // Now replay all other mutations.
        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
        {
            self.objects
                .replay_mutations(
                    non_root_mutations,
                    &journal_offsets,
                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                    end_offset,
                )
                .await
                .context("Failed to replay mutations")?;
        }

        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

        let discarded_to =
            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
                Some(reader.journal_file_checkpoint().file_offset)
            } else {
                None
            };

        // Configure the journal writer so that we can continue.
        {
            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                    "journal replay cut short; journal finishes at {}, but super-block was \
                     written at {}",
                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
                )));
            }
            let handle = ObjectStore::open_object(
                &root_parent,
                super_block.journal_object_id,
                journal_handle_options(),
                None,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to open journal file (object id: {})",
                    super_block.journal_object_id
                )
            })?;
            let _ = self.handle.set(handle);
            let mut inner = self.inner.lock();
            reader.skip_to_end_of_block();
            let mut writer_checkpoint = reader.journal_file_checkpoint();

            // Make sure we don't accidentally use the reader from now onwards.
            std::mem::drop(reader);

            // Reset the stream to indicate that we've remounted the journal.
            writer_checkpoint.checksum ^= RESET_XOR;
            writer_checkpoint.version = LATEST_VERSION;
            inner.flushed_offset = writer_checkpoint.file_offset;

            // When we open the filesystem as writable, we flush the device.
            inner.device_flushed_offset = inner.flushed_offset;

            inner.writer.seek(writer_checkpoint);
            inner.output_reset_version = true;
            inner.valid_to = last_checkpoint.file_offset;
            if last_checkpoint.file_offset < inner.flushed_offset {
                inner.discard_offset = Some(last_checkpoint.file_offset);
            }
        }

        self.objects
            .on_replay_complete()
            .await
            .context("Failed to complete replay for object manager")?;

        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
        Ok(())
    }

    async fn read_transactions(
        &self,
        reader: &mut JournalReader,
        end_offset: Option<u64>,
        object_id_filter: u64,
    ) -> Result<JournaledTransactions, Error> {
        let mut transactions = Vec::new();
        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
            let super_block = &self.inner.lock().super_block_header;
            (
                super_block.super_block_journal_file_offset,
                super_block.root_parent_store_object_id,
                super_block.root_store_object_id,
            )
        };
        let mut current_transaction = None;
        let mut begin_flush_offsets = HashMap::default();
        let mut stores_deleted = HashSet::new();
        loop {
            // Cache the checkpoint before we deserialize a record.
            let checkpoint = reader.journal_file_checkpoint();
            if let Some(end_offset) = end_offset {
                if checkpoint.file_offset >= end_offset {
                    break;
                }
            }
            let result =
                reader.deserialize().await.context("Failed to deserialize journal record")?;
            match result {
                ReadResult::Reset(_) => {
                    if current_transaction.is_some() {
                        current_transaction = None;
                        transactions.pop();
                    }
                    let offset = reader.journal_file_checkpoint().file_offset;
                    if offset > device_flushed_offset {
                        device_flushed_offset = offset;
                    }
                }
                ReadResult::Some(record) => {
                    match record {
                        JournalRecord::EndBlock => {
                            reader.skip_to_end_of_block();
                        }
                        JournalRecord::Mutation { object_id, mutation } => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };

                            if stores_deleted.contains(&object_id) {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Encountered mutations for deleted store")
                                );
                            }

                            match &mutation {
                                Mutation::BeginFlush => {
                                    begin_flush_offsets.insert(
                                        object_id,
                                        current_transaction.checkpoint.file_offset,
                                    );
                                }
                                Mutation::EndFlush => {
                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
                                        if let Some(deleted_volume) =
                                            &current_transaction.volume_deleted
                                        {
                                            if *deleted_volume == object_id {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                                ));
                                            }
                                        }
                                        // The +1 is because we don't want to replay the transaction
                                        // containing the begin flush.
                                        if current_transaction
                                            .end_flush
                                            .replace((object_id, offset + 1))
                                            .is_some()
                                        {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush mutations in a \
                                                 single transaction"
                                            ));
                                        }
                                    }
                                }
                                Mutation::DeleteVolume => {
                                    if let Some((flushed_object, _)) =
                                        &current_transaction.end_flush
                                    {
                                        if *flushed_object == object_id {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                            ));
                                        }
                                    }
                                    if current_transaction
                                        .volume_deleted
                                        .replace(object_id)
                                        .is_some()
                                    {
                                        bail!(anyhow!(FxfsError::Inconsistent).context(
                                            "Multiple DeleteVolume mutations in a single \
                                             transaction"
                                        ));
                                    }
                                    stores_deleted.insert(object_id);
                                }
                                _ => {}
                            }

                            // If this mutation doesn't need to be applied, don't bother adding it
                            // to the transaction.
                            if (object_id_filter == INVALID_OBJECT_ID
                                || object_id_filter == object_id)
                                && self.should_apply(object_id, &current_transaction.checkpoint)
                            {
                                if object_id == root_parent_store_object_id {
                                    current_transaction.root_parent_mutations.push(mutation);
                                } else if object_id == root_store_object_id {
                                    current_transaction.root_mutations.push(mutation);
                                } else {
                                    current_transaction
                                        .non_root_mutations
                                        .push((object_id, mutation));
                                }
                            }
                        }
                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };
                            current_transaction.checksums.push(JournaledChecksums {
                                device_range,
                                checksums,
                                first_write,
                            });
                        }
                        JournalRecord::Commit => {
                            if let Some(&mut JournaledTransaction {
                                ref checkpoint,
                                ref root_parent_mutations,
                                ref mut end_offset,
                                ..
                            }) = current_transaction.take()
                            {
                                for mutation in root_parent_mutations {
                                    // Snoop the mutations for any that might apply to the journal
                                    // file so that we can pass them to the reader so that it can
                                    // read the journal file.
                                    if let Mutation::ObjectStore(ObjectStoreMutation {
                                        item:
                                            Item {
                                                key:
                                                    ObjectKey {
                                                        object_id,
                                                        data:
                                                            ObjectKeyData::Attribute(
                                                                DEFAULT_DATA_ATTRIBUTE_ID,
                                                                AttributeKey::Extent(ExtentKey {
                                                                    range,
                                                                }),
                                                            ),
                                                        ..
                                                    },
                                                value:
                                                    ObjectValue::Extent(ExtentValue::Some {
                                                        device_offset,
                                                        ..
                                                    }),
                                                ..
                                            },
                                        ..
                                    }) = mutation
                                    {
                                        // Add the journal extents we find on the way to our
                                        // reader.
                                        let handle = reader.handle();
                                        if *object_id != handle.object_id() {
                                            continue;
                                        }
                                        if let Some(end_offset) = handle.end_offset() {
                                            if range.start != end_offset {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    format!(
                                                        "Unexpected journal extent {:?} -> {}, \
                                                           expected start: {}",
                                                        range, device_offset, end_offset,
                                                    )
                                                ));
                                            }
                                        }
                                        handle.push_extent(
                                            checkpoint.file_offset,
                                            *device_offset
                                                ..*device_offset
                                                    + range.length().context("Invalid extent")?,
                                        );
                                    }
                                }
                                *end_offset = reader.journal_file_checkpoint().file_offset;
                            }
                        }
                        JournalRecord::Discard(offset) => {
                            if offset == 0 {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Invalid offset for Discard")
                                );
                            }
                            if let Some(transaction) = current_transaction.as_ref() {
                                if transaction.checkpoint.file_offset < offset {
                                    // Odd, but OK.
                                    continue;
                                }
                            }
                            current_transaction = None;
                            while let Some(transaction) = transactions.last() {
                                if transaction.checkpoint.file_offset < offset {
                                    break;
                                }
                                transactions.pop();
                            }
                            reader.handle().discard_extents(offset);
                        }
                        JournalRecord::DidFlushDevice(offset) => {
                            if offset > device_flushed_offset {
                                device_flushed_offset = offset;
                            }
                        }
                    }
                }
                // This is expected when we reach the end of the journal stream.
                ReadResult::ChecksumMismatch => break,
            }
        }

        // Discard any uncommitted transaction.
        if current_transaction.is_some() {
            transactions.pop();
        }

        Ok(JournaledTransactions { transactions, device_flushed_offset })
    }

    /// Creates an empty filesystem with the minimum viable objects (including a root parent and
    /// root store but no further child stores).
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        // The following constants are only used at format time. When mounting, the recorded values
        // in the superblock should be used.  The root parent store does not have a parent, but
        // needs an object ID to be registered with ObjectManager, so it cannot collide (i.e. have
        // the same object ID) with any objects in the root store that use the journal to track
        // mutations.
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        let root_parent = ObjectStore::new_empty(
            None,
            INIT_ROOT_PARENT_STORE_OBJECT_ID,
            filesystem.clone(),
            Box::new(NullCache {}),
        );
        self.objects.set_root_parent_store(root_parent.clone());

        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
        self.objects.set_allocator(allocator.clone());
        self.objects.init_metadata_reservation()?;

        let journal_handle;
        let super_block_a_handle;
        let super_block_b_handle;
        let root_store;
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        root_store = root_parent
            .new_child_store(
                &mut transaction,
                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
                Box::new(NullCache {}),
            )
            .await
            .context("new_child_store")?;
        self.objects.set_root_store(root_store.clone());

        allocator.create(&mut transaction).await?;

        // Create the super-block objects...
        super_block_a_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            SuperBlockInstance::A.object_id(),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
        super_block_a_handle
            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
            .await
            .context("extend super block")?;
        super_block_b_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            SuperBlockInstance::B.object_id(),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
        super_block_b_handle
            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
            .await
            .context("extend super block")?;

        // the journal object...
        journal_handle = ObjectStore::create_object(
            &root_parent,
            &mut transaction,
            journal_handle_options(),
            None,
        )
        .await
        .context("create journal")?;
        if self.inner.lock().image_builder_mode.is_none() {
            let mut file_range = 0..self.chunk_size();
            journal_handle
                .preallocate_range(&mut transaction, &mut file_range)
                .await
                .context("preallocate journal")?;
            if file_range.start < file_range.end {
                bail!("preallocate_range returned too little space");
            }
        }

        // Write the root store object info.
        root_store.create(&mut transaction).await?;

        // The root parent graveyard.
        root_parent
            .set_graveyard_directory_object_id(Graveyard::create(&mut transaction, &root_parent));

        transaction.commit().await?;

        self.inner.lock().super_block_header = SuperBlockHeader::new(
            root_parent.store_object_id(),
            root_parent.graveyard_directory_object_id(),
            root_store.store_object_id(),
            allocator.object_id(),
            journal_handle.object_id(),
            checkpoint,
            /* earliest_version: */ LATEST_VERSION,
        );

        // Initialize the journal writer.
        let _ = self.handle.set(journal_handle);
        Ok(())
    }
1290
1291    /// Normally we allocate the journal when creating the filesystem.
1292    /// This is used in image_builder_mode, where journal allocation is done last.
1293    pub async fn allocate_journal(&self) -> Result<(), Error> {
1294        let handle = self.handle.get().unwrap();
1295        let filesystem = handle.store().filesystem();
1296        let mut transaction = filesystem
1297            .clone()
1298            .new_transaction(
1299                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1300                Options { skip_journal_checks: true, ..Default::default() },
1301            )
1302            .await?;
1303        let mut file_range = 0..self.chunk_size();
1304        self.handle
1305            .get()
1306            .unwrap()
1307            .preallocate_range(&mut transaction, &mut file_range)
1308            .await
1309            .context("preallocate journal")?;
1310        if file_range.start < file_range.end {
1311            bail!("preallocate_range returned too little space");
1312        }
1313        transaction.commit().await?;
1314        Ok(())
1315    }
1316
1317    pub async fn init_superblocks(&self) -> Result<(), Error> {
1318        // Overwrite both superblocks.
1319        for _ in 0..2 {
1320            self.write_super_block().await?;
1321        }
1322        Ok(())
1323    }
1324
1325    /// Takes a snapshot of all journaled transactions which affect |object_id| since its last
1326    /// flush.
1327    /// The caller is responsible for locking; it must ensure that the journal is not trimmed during
1328    /// this call.  For example, a Flush lock could be held on the object in question (assuming that
1329    /// object has data to flush and is registered with ObjectManager).
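    ///
    /// A minimal usage sketch (illustrative only; `store_object_id` is assumed to identify an
    /// object registered with ObjectManager, and the caller holds a suitable flush lock):
    /// ```ignore
    /// let transactions = journal.read_transactions_for_object(store_object_id).await?;
    /// // Each entry describes one journaled transaction affecting the object since its last flush.
    /// ```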
1330    pub async fn read_transactions_for_object(
1331        &self,
1332        object_id: u64,
1333    ) -> Result<Vec<JournaledTransaction>, Error> {
1334        let handle = self.handle.get().expect("No journal handle");
1335        // Reopen the handle since JournalReader needs an owned handle.
1336        let handle = ObjectStore::open_object(
1337            handle.owner(),
1338            handle.object_id(),
1339            journal_handle_options(),
1340            None,
1341        )
1342        .await?;
1343
1344        let checkpoint = match self.objects.journal_checkpoint(object_id) {
1345            Some(checkpoint) => checkpoint,
1346            None => return Ok(vec![]),
1347        };
1348        let mut reader = JournalReader::new(handle, &checkpoint);
1349        // Record the current end offset and only read to there, so we don't accidentally read any
1350        // partially flushed blocks.
1351        let end_offset = self.inner.lock().valid_to;
1352        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1353    }
1354
1355    /// Commits a transaction.  This is not thread safe; the caller must take appropriate locks.
1356    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1357        if transaction.is_empty() {
1358            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1359        }
1360
1361        self.pre_commit(transaction).await?;
1362        Ok(self.write_and_apply_mutations(transaction))
1363    }
1364
1365    // Before we commit, we might need to extend the journal or write pending records (such as
1366    // Discard or DidFlushDevice records) to it.
1367    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
1368        let handle;
1369
1370        let (size, zero_offset) = {
1371            let mut inner = self.inner.lock();
1372
1373            // If this is the first write after a RESET, we need to output version first.
1374            if std::mem::take(&mut inner.output_reset_version) {
1375                LATEST_VERSION.serialize_into(&mut inner.writer)?;
1376            }
1377
1378            if let Some(discard_offset) = inner.discard_offset {
1379                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
1380                inner.discard_offset = None;
1381            }
1382
1383            if inner.needs_did_flush_device {
1384                let offset = inner.device_flushed_offset;
1385                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
1386                inner.needs_did_flush_device = false;
1387            }
1388
1389            handle = match self.handle.get() {
1390                None => return Ok(()),
1391                Some(x) => x,
1392            };
1393
1394            let file_offset = inner.writer.journal_file_checkpoint().file_offset;
1395
1396            let size = handle.get_size();
1397            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };
1398
1399            if size.is_none()
1400                && inner.zero_offset.is_none()
1401                && !self.objects.needs_borrow_for_journal(file_offset)
1402            {
1403                return Ok(());
1404            }
1405
1406            (size, inner.zero_offset)
1407        };
1408
1409        let mut transaction = handle
1410            .new_transaction_with_options(Options {
1411                skip_journal_checks: true,
1412                borrow_metadata_space: true,
1413                allocator_reservation: Some(self.objects.metadata_reservation()),
1414                txn_guard: Some(transaction.txn_guard()),
1415                ..Default::default()
1416            })
1417            .await?;
1418        if let Some(size) = size {
1419            handle
1420                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
1421                .await?;
1422        }
1423        if let Some(zero_offset) = zero_offset {
1424            handle.zero(&mut transaction, 0..zero_offset).await?;
1425        }
1426
1427        // We can't use regular transaction commit, because that can cause re-entrancy issues, so
1428        // instead we just apply the transaction directly here.
1429        self.write_and_apply_mutations(&mut transaction);
1430
1431        let mut inner = self.inner.lock();
1432
1433        // Make sure the transaction to extend the journal made it to the journal within the old
1434        // size, since otherwise it won't be possible to replay it.
1435        if let Some(size) = size {
1436            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
1437        }
1438
1439        if inner.zero_offset == zero_offset {
1440            inner.zero_offset = None;
1441        }
1442
1443        Ok(())
1444    }
1445
1446    // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
1447    // all records should be applied because the object store or allocator might already contain the
1448    // mutation.  After replay, that obviously isn't the case and we want to apply all mutations.
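    // For example (hypothetical offsets): if the super-block records a flush offset of 16384 for a
    // store, a mutation journaled at offset 8192 was already captured by that store's last flush
    // and is skipped, whereas one journaled at offset 24576 must still be applied.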
1449    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1450        let super_block_header = &self.inner.lock().super_block_header;
1451        let offset = super_block_header
1452            .journal_file_offsets
1453            .get(&object_id)
1454            .cloned()
1455            .unwrap_or(super_block_header.super_block_journal_file_offset);
1456        journal_file_checkpoint.file_offset >= offset
1457    }
1458
1459    /// Flushes previous writes to the device and then writes out a new super-block.
1460    /// Callers must ensure that this is not called concurrently.
1461    async fn write_super_block(&self) -> Result<(), Error> {
1462        let root_parent_store = self.objects.root_parent_store();
1463
1464        // We need to flush previous writes to the device since the new super-block we are writing
1465        // relies on written data being observable, and we also need to lock the root parent store
1466        // so that no new entries are written to it whilst we are writing the super-block, and for
1467        // that we use the write lock.
1468        let old_layers;
1469        let old_super_block_offset;
1470        let mut new_super_block_header;
1471        let checkpoint;
1472        let borrowed;
1473
1474        {
1475            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1476            {
1477                let _write_guard = self.writer_mutex.lock();
1478                (checkpoint, borrowed) = self.pad_to_block()?;
1479                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
1480            }
1481            self.flush_device(checkpoint.file_offset)
1482                .await
1483                .context("flush failed when writing superblock")?;
1484        }
1485
1486        new_super_block_header = self.inner.lock().super_block_header.clone();
1487
1488        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;
1489
1490        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();
1491
1492        new_super_block_header.generation =
1493            new_super_block_header.generation.checked_add(1).ok_or(FxfsError::Inconsistent)?;
1494        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
1495        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
1496        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
1497        new_super_block_header.journal_file_offsets = journal_file_offsets;
1498        new_super_block_header.borrowed_metadata_space = borrowed;
1499
1500        self.super_block_manager
1501            .save(
1502                new_super_block_header.clone(),
1503                self.objects.root_parent_store().filesystem(),
1504                old_layers,
1505            )
1506            .await?;
1507        {
1508            let mut inner = self.inner.lock();
1509            inner.super_block_header = new_super_block_header;
1510            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
1511        }
1512
1513        Ok(())
1514    }
1515
1516    /// Flushes any buffered journal data to the device.  Note that this does not flush the device
1517    /// unless the flush_device option is set, in which case data should have been persisted to
1518    /// lower layers.  If a precondition is supplied, it is evaluated and the sync will be skipped
1519    /// if it returns false.  This allows callers to check a condition whilst a lock is held.  If a
1520    /// sync is performed, this function returns the checkpoint that was flushed and the amount of
1521    /// borrowed metadata space at the point it was flushed.
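    ///
    /// A minimal usage sketch (illustrative; only `flush_device` is set here and the remaining
    /// options are left at their defaults):
    /// ```ignore
    /// if let Some((checkpoint, borrowed)) =
    ///     journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await?
    /// {
    ///     // `checkpoint` is what was flushed; `borrowed` is the borrowed metadata space at that
    ///     // point.
    /// }
    /// ```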
1522    pub async fn sync(
1523        &self,
1524        options: SyncOptions<'_>,
1525    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1526        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1527
1528        let (checkpoint, borrowed) = {
1529            if let Some(precondition) = options.precondition {
1530                if !precondition() {
1531                    return Ok(None);
1532                }
1533            }
1534
1535            // This guard is required so that we don't insert an EndBlock record in the middle of a
1536            // transaction.
1537            let _guard = self.writer_mutex.lock();
1538
1539            self.pad_to_block()?
1540        };
1541
1542        if options.flush_device {
1543            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1544        }
1545
1546        Ok(Some((checkpoint, borrowed)))
1547    }
1548
1549    // Returns the checkpoint as it was prior to padding.  This is done because the super-block
1550    // needs to record where the last transaction ends and it's the next transaction that pays the
1551    // price of the padding.
1552    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1553        let mut inner = self.inner.lock();
1554        let checkpoint = inner.writer.journal_file_checkpoint();
1555        if checkpoint.file_offset % BLOCK_SIZE != 0 {
1556            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1557            inner.writer.pad_to_block()?;
1558            if let Some(waker) = inner.flush_waker.take() {
1559                waker.wake();
1560            }
1561        }
1562        Ok((checkpoint, self.objects.borrowed_metadata_space()))
1563    }
1564
1565    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
1566        debug_assert_not_too_long!(poll_fn(|ctx| {
1567            let mut inner = self.inner.lock();
1568            if inner.flushed_offset >= checkpoint_offset {
1569                Poll::Ready(Ok(()))
1570            } else if inner.terminate {
1571                let context = inner
1572                    .terminate_reason
1573                    .as_ref()
1574                    .map(|e| format!("Journal closed with error: {:?}", e))
1575                    .unwrap_or_else(|| "Journal closed".to_string());
1576                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
1577            } else {
1578                inner.sync_waker = Some(ctx.waker().clone());
1579                Poll::Pending
1580            }
1581        }))?;
1582
1583        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
1584        if needs_flush {
1585            let trace = self.trace.load(Ordering::Relaxed);
1586            if trace {
1587                info!("J: start flush device");
1588            }
1589            self.handle.get().unwrap().flush_device().await?;
1590            if trace {
1591                info!("J: end flush device");
1592            }
1593
1594            // We need to write a DidFlushDevice record at some point, but if we are in the
1595            // process of shutting down the filesystem, we want to leave the journal clean to
1596            // avoid there being log messages complaining about unwritten journal data, so we
1597            // queue it up so that the next transaction will trigger this record to be written.
1598            // If we are shutting down, that will never happen but since the DidFlushDevice
1599            // message is purely advisory (it reduces the number of checksums we have to verify
1600            // during replay), it doesn't matter if it isn't written.
1601            {
1602                let mut inner = self.inner.lock();
1603                inner.device_flushed_offset = checkpoint_offset;
1604                inner.needs_did_flush_device = true;
1605            }
1606
1607            // Tell the allocator that we flushed the device so that it can now start using
1608            // space that was deallocated.
1609            self.objects.allocator().did_flush_device(checkpoint_offset);
1610            if trace {
1611                info!("J: did flush device");
1612            }
1613        }
1614
1615        Ok(())
1616    }
1617
1618    /// Returns a copy of the super-block header.
1619    pub fn super_block_header(&self) -> SuperBlockHeader {
1620        self.inner.lock().super_block_header.clone()
1621    }
1622
1623    /// Waits for there to be sufficient space in the journal.
1624    pub async fn check_journal_space(&self) -> Result<(), Error> {
1625        loop {
1626            debug_assert_not_too_long!({
1627                let inner = self.inner.lock();
1628                if inner.terminate {
1629                    // If the flush error is set, this will never make progress, since we can't
1630                    // extend the journal any more.
1631                    let context = inner
1632                        .terminate_reason
1633                        .as_ref()
1634                        .map(|e| format!("Journal closed with error: {:?}", e))
1635                        .unwrap_or_else(|| "Journal closed".to_string());
1636                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
1637                }
1638                if self.objects.last_end_offset()
1639                    - inner.super_block_header.journal_checkpoint.file_offset
1640                    < inner.reclaim_size
1641                {
1642                    break Ok(());
1643                }
1644                if inner.image_builder_mode.is_some() {
1645                    break Ok(());
1646                }
1647                if inner.disable_compactions {
1648                    break Err(
1649                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
1650                    );
1651                }
1652                self.reclaim_event.listen()
1653            });
1654        }
1655    }
1656
1657    fn chunk_size(&self) -> u64 {
1658        CHUNK_SIZE
1659    }
1660
1661    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
1662        let checkpoint_before;
1663        let checkpoint_after;
1664        {
1665            let _guard = self.writer_mutex.lock();
1666            checkpoint_before = {
1667                let mut inner = self.inner.lock();
1668                if transaction.includes_write() {
1669                    inner.needs_barrier = true;
1670                }
1671                let checkpoint = inner.writer.journal_file_checkpoint();
1672                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
1673                    self.objects.write_mutation(
1674                        *object_id,
1675                        mutation,
1676                        Writer(*object_id, &mut inner.writer),
1677                    );
1678                }
1679                checkpoint
1680            };
1681            let maybe_mutation =
1682                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
1683                    "apply_transaction should not fail in live mode; \
1684                     filesystem will be in an inconsistent state",
1685                );
1686            checkpoint_after = {
1687                let mut inner = self.inner.lock();
1688                if let Some(mutation) = maybe_mutation {
1689                    inner
1690                        .writer
1691                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
1692                        .unwrap();
1693                }
1694                for (device_range, checksums, first_write) in
1695                    transaction.take_checksums().into_iter()
1696                {
1697                    inner
1698                        .writer
1699                        .write_record(&JournalRecord::DataChecksums(
1700                            device_range,
1701                            Checksums::fletcher(checksums),
1702                            first_write,
1703                        ))
1704                        .unwrap();
1705                }
1706                inner.writer.write_record(&JournalRecord::Commit).unwrap();
1707
1708                inner.writer.journal_file_checkpoint()
1709            };
1710        }
1711        self.objects.did_commit_transaction(
1712            transaction,
1713            &checkpoint_before,
1714            checkpoint_after.file_offset,
1715        );
1716
1717        if let Some(waker) = self.inner.lock().flush_waker.take() {
1718            waker.wake();
1719        }
1720
1721        checkpoint_before.file_offset
1722    }
1723
1724    /// This task will flush journal data to the device when there is data that needs flushing, and
1725    /// trigger compactions when short of journal space.  It will return after the terminate method
1726    /// has been called, or an error is encountered with either flushing or compaction.
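    ///
    /// A sketch of how this might be driven (illustrative; the filesystem normally spawns and owns
    /// this task itself):
    /// ```ignore
    /// let flush_task = fasync::Task::spawn(journal.clone().flush_task());
    /// ```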
1727    pub async fn flush_task(self: Arc<Self>) {
1728        let mut flush_fut = None;
1729        let mut compact_fut = None;
1730        let mut flush_error = false;
1731        poll_fn(|ctx| {
1732            loop {
1733                {
1734                    let mut inner = self.inner.lock();
1735                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
1736                        let flushable = inner.writer.flushable_bytes();
1737                        if flushable > 0 {
1738                            flush_fut = Some(self.flush(flushable).boxed());
1739                        }
1740                    }
1741                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
1742                        return Poll::Ready(());
1743                    }
1744                    // The / 2 is here because after compacting, we cannot reclaim the space until the
1745                    // _next_ time we flush the device since the super-block is not guaranteed to
1746                    // persist until then.
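                    // (Hypothetical numbers: with a reclaim_size of 256 MiB, a compaction would be
                    // kicked off once more than 128 MiB of journal lies beyond the super-block's
                    // journal checkpoint.)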
1747                    if compact_fut.is_none()
1748                        && !inner.terminate
1749                        && !inner.disable_compactions
1750                        && inner.image_builder_mode.is_none()
1751                        && self.objects.last_end_offset()
1752                            - inner.super_block_header.journal_checkpoint.file_offset
1753                            > inner.reclaim_size / 2
1754                    {
1755                        compact_fut = Some(self.compact().boxed());
1756                        inner.compaction_running = true;
1757                    }
1758                    inner.flush_waker = Some(ctx.waker().clone());
1759                }
1760                let mut pending = true;
1761                if let Some(fut) = flush_fut.as_mut() {
1762                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1763                        if let Err(e) = result {
1764                            self.inner.lock().terminate(Some(e.context("Flush error")));
1765                            self.reclaim_event.notify(usize::MAX);
1766                            flush_error = true;
1767                        }
1768                        flush_fut = None;
1769                        pending = false;
1770                    }
1771                }
1772                if let Some(fut) = compact_fut.as_mut() {
1773                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1774                        let mut inner = self.inner.lock();
1775                        if let Err(e) = result {
1776                            inner.terminate(Some(e.context("Compaction error")));
1777                        }
1778                        compact_fut = None;
1779                        inner.compaction_running = false;
1780                        self.reclaim_event.notify(usize::MAX);
1781                        pending = false;
1782                    }
1783                }
1784                if pending {
1785                    return Poll::Pending;
1786                }
1787            }
1788        })
1789        .await;
1790    }
1791
1792    async fn flush(&self, amount: usize) -> Result<(), Error> {
1793        let handle = self.handle.get().unwrap();
1794        let mut buf = handle.allocate_buffer(amount).await;
1795        let (offset, len, barrier_on_first_write) = {
1796            let mut inner = self.inner.lock();
1797            let offset = inner.writer.take_flushable(buf.as_mut());
1798            let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1799            // Reset `needs_barrier` before the overwrite (rather than after it) in case a txn
1800            // commit that contains data happens while the overwrite is in flight.
1801            inner.needs_barrier = false;
1802            (offset, buf.len() as u64, barrier_on_first_write)
1803        };
1804        self.handle
1805            .get()
1806            .unwrap()
1807            .overwrite(
1808                offset,
1809                buf.as_mut(),
1810                OverwriteOptions { barrier_on_first_write, ..Default::default() },
1811            )
1812            .await?;
1813
1814        let mut inner = self.inner.lock();
1815        if let Some(waker) = inner.sync_waker.take() {
1816            waker.wake();
1817        }
1818        inner.flushed_offset = offset + len;
1819        inner.valid_to = inner.flushed_offset;
1820        Ok(())
1821    }
1822
1823    /// This should generally NOT be called externally. It is public to allow use by the FIDL
1824    /// service fxfs.Debug.
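    ///
    /// A minimal usage sketch (illustrative; mirrors how the tests below drive a compaction):
    /// ```ignore
    /// fs.journal().compact().await?;
    /// ```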
1825    #[trace]
1826    pub async fn compact(&self) -> Result<(), Error> {
1827        let trace = self.trace.load(Ordering::Relaxed);
1828        debug!("Compaction starting");
1829        if trace {
1830            info!("J: start compaction");
1831        }
1832        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1833        self.inner.lock().super_block_header.earliest_version = earliest_version;
1834        self.write_super_block().await.context("Failed to write superblock")?;
1835        if trace {
1836            info!("J: end compaction");
1837        }
1838        debug!("Compaction finished");
1839        Ok(())
1840    }
1841
1842    pub async fn stop_compactions(&self) {
1843        loop {
1844            debug_assert_not_too_long!({
1845                let mut inner = self.inner.lock();
1846                inner.disable_compactions = true;
1847                if !inner.compaction_running {
1848                    return;
1849                }
1850                self.reclaim_event.listen()
1851            });
1852        }
1853    }
1854
1855    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
1856    /// journal when queried.
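    ///
    /// A minimal usage sketch (illustrative; the parent node would normally come from the
    /// component's inspect hierarchy):
    /// ```ignore
    /// let inspector = fuchsia_inspect::Inspector::default();
    /// journal.track_statistics(inspector.root(), "journal");
    /// ```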
1857    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1858        let this = Arc::downgrade(self);
1859        parent.record_lazy_child(name, move || {
1860            let this_clone = this.clone();
1861            async move {
1862                let inspector = fuchsia_inspect::Inspector::default();
1863                if let Some(this) = this_clone.upgrade() {
1864                    let (journal_min, journal_max, journal_reclaim_size) = {
1865                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1866                        let inner = this.inner.lock();
1867                        (
1868                            round_down(
1869                                inner.super_block_header.journal_checkpoint.file_offset,
1870                                BLOCK_SIZE,
1871                            ),
1872                            inner.flushed_offset,
1873                            inner.reclaim_size,
1874                        )
1875                    };
1876                    let root = inspector.root();
1877                    root.record_uint("journal_min_offset", journal_min);
1878                    root.record_uint("journal_max_offset", journal_max);
1879                    root.record_uint("journal_size", journal_max - journal_min);
1880                    root.record_uint("journal_reclaim_size", journal_reclaim_size);
1881
1882                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
1883                    if let Some(x) = round_div(
1884                        100 * (journal_max - journal_min),
1885                        this.objects.allocator().get_disk_bytes(),
1886                    ) {
1887                        root.record_uint("journal_size_to_disk_size_percent", x);
1888                    }
1889                }
1890                Ok(inspector)
1891            }
1892            .boxed()
1893        });
1894    }
1895
1896    /// Terminate all journal activity.
1897    pub fn terminate(&self) {
1898        self.inner.lock().terminate(/*reason*/ None);
1899        self.reclaim_event.notify(usize::MAX);
1900    }
1901}
1902
1903/// Wrapper to allow records to be written to the journal.
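/// A sketch of how it is used (see `write_and_apply_mutations` above): for each mutation in a
/// transaction, the `ObjectManager` is handed a `Writer` scoped to that mutation's object id and
/// calls `write` to emit the corresponding journal record.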
1904pub struct Writer<'a>(u64, &'a mut JournalWriter);
1905
1906impl Writer<'_> {
1907    pub fn write(&mut self, mutation: Mutation) {
1908        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
1909    }
1910}
1911
1912#[cfg(test)]
1913mod tests {
1914    use crate::filesystem::{FxFilesystem, SyncOptions};
1915    use crate::fsck::fsck;
1916    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
1917    use crate::object_store::directory::Directory;
1918    use crate::object_store::transaction::Options;
1919    use crate::object_store::volume::root_volume;
1920    use crate::object_store::{
1921        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
1922    };
1923    use fuchsia_async as fasync;
1924    use fuchsia_async::MonotonicDuration;
1925    use storage_device::DeviceHolder;
1926    use storage_device::fake_device::FakeDevice;
1927
1928    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1929
1930    #[fuchsia::test]
1931    async fn test_replay() {
1932        const TEST_DATA: &[u8] = b"hello";
1933
1934        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1935
1936        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1937
1938        let object_id = {
1939            let root_store = fs.root_store();
1940            let root_directory =
1941                Directory::open(&root_store, root_store.root_directory_object_id())
1942                    .await
1943                    .expect("open failed");
1944            let mut transaction = fs
1945                .clone()
1946                .new_transaction(
1947                    lock_keys![LockKey::object(
1948                        root_store.store_object_id(),
1949                        root_store.root_directory_object_id(),
1950                    )],
1951                    Options::default(),
1952                )
1953                .await
1954                .expect("new_transaction failed");
1955            let handle = root_directory
1956                .create_child_file(&mut transaction, "test")
1957                .await
1958                .expect("create_child_file failed");
1959
1960            transaction.commit().await.expect("commit failed");
1961            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1962            buf.as_mut_slice().copy_from_slice(TEST_DATA);
1963            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1964            // As this is the first sync, this will actually trigger a new super-block, but normally
1965            // this would not be the case.
1966            fs.sync(SyncOptions::default()).await.expect("sync failed");
1967            handle.object_id()
1968        };
1969
1970        {
1971            fs.close().await.expect("Close failed");
1972            let device = fs.take_device().await;
1973            device.reopen(false);
1974            let fs = FxFilesystem::open(device).await.expect("open failed");
1975            let handle = ObjectStore::open_object(
1976                &fs.root_store(),
1977                object_id,
1978                HandleOptions::default(),
1979                None,
1980            )
1981            .await
1982            .expect("open_object failed");
1983            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
1984            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
1985            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
1986            fsck(fs.clone()).await.expect("fsck failed");
1987            fs.close().await.expect("Close failed");
1988        }
1989    }
1990
1991    #[fuchsia::test]
1992    async fn test_reset() {
1993        const TEST_DATA: &[u8] = b"hello";
1994
1995        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
1996
1997        let mut object_ids = Vec::new();
1998
1999        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2000        {
2001            let root_store = fs.root_store();
2002            let root_directory =
2003                Directory::open(&root_store, root_store.root_directory_object_id())
2004                    .await
2005                    .expect("open failed");
2006            let mut transaction = fs
2007                .clone()
2008                .new_transaction(
2009                    lock_keys![LockKey::object(
2010                        root_store.store_object_id(),
2011                        root_store.root_directory_object_id(),
2012                    )],
2013                    Options::default(),
2014                )
2015                .await
2016                .expect("new_transaction failed");
2017            let handle = root_directory
2018                .create_child_file(&mut transaction, "test")
2019                .await
2020                .expect("create_child_file failed");
2021            transaction.commit().await.expect("commit failed");
2022            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2023            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2024            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2025            fs.sync(SyncOptions::default()).await.expect("sync failed");
2026            object_ids.push(handle.object_id());
2027
2028            // Create a lot of objects but don't sync at the end. This should leave the filesystem
2029            // with a half-finished transaction that cannot be replayed.
2030            for i in 0..1000 {
2031                let mut transaction = fs
2032                    .clone()
2033                    .new_transaction(
2034                        lock_keys![LockKey::object(
2035                            root_store.store_object_id(),
2036                            root_store.root_directory_object_id(),
2037                        )],
2038                        Options::default(),
2039                    )
2040                    .await
2041                    .expect("new_transaction failed");
2042                let handle = root_directory
2043                    .create_child_file(&mut transaction, &format!("{}", i))
2044                    .await
2045                    .expect("create_child_file failed");
2046                transaction.commit().await.expect("commit failed");
2047                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2048                buf.as_mut_slice().copy_from_slice(TEST_DATA);
2049                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2050                object_ids.push(handle.object_id());
2051            }
2052        }
2053        fs.close().await.expect("fs close failed");
2054        let device = fs.take_device().await;
2055        device.reopen(false);
2056        let fs = FxFilesystem::open(device).await.expect("open failed");
2057        fsck(fs.clone()).await.expect("fsck failed");
2058        {
2059            let root_store = fs.root_store();
2060            // Check the first two objects which should exist.
2061            for &object_id in &object_ids[0..1] {
2062                let handle = ObjectStore::open_object(
2063                    &root_store,
2064                    object_id,
2065                    HandleOptions::default(),
2066                    None,
2067                )
2068                .await
2069                .expect("open_object failed");
2070                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2071                assert_eq!(
2072                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2073                    TEST_DATA.len()
2074                );
2075                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2076            }
2077
2078            // Write one more object and sync.
2079            let root_directory =
2080                Directory::open(&root_store, root_store.root_directory_object_id())
2081                    .await
2082                    .expect("open failed");
2083            let mut transaction = fs
2084                .clone()
2085                .new_transaction(
2086                    lock_keys![LockKey::object(
2087                        root_store.store_object_id(),
2088                        root_store.root_directory_object_id(),
2089                    )],
2090                    Options::default(),
2091                )
2092                .await
2093                .expect("new_transaction failed");
2094            let handle = root_directory
2095                .create_child_file(&mut transaction, "test2")
2096                .await
2097                .expect("create_child_file failed");
2098            transaction.commit().await.expect("commit failed");
2099            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2100            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2101            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2102            fs.sync(SyncOptions::default()).await.expect("sync failed");
2103            object_ids.push(handle.object_id());
2104        }
2105
2106        fs.close().await.expect("close failed");
2107        let device = fs.take_device().await;
2108        device.reopen(false);
2109        let fs = FxFilesystem::open(device).await.expect("open failed");
2110        {
2111            fsck(fs.clone()).await.expect("fsck failed");
2112
2113            // Check the first and the last objects.
2114            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
2115                let handle = ObjectStore::open_object(
2116                    &fs.root_store(),
2117                    object_id,
2118                    HandleOptions::default(),
2119                    None,
2120                )
2121                .await
2122                .unwrap_or_else(|e| {
2123                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2124                });
2125                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2126                assert_eq!(
2127                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2128                    TEST_DATA.len()
2129                );
2130                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2131            }
2132        }
2133        fs.close().await.expect("close failed");
2134    }
2135
2136    #[fuchsia::test]
2137    async fn test_discard() {
2138        let device = {
2139            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2140            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2141            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2142
2143            let store = root_volume
2144                .new_volume("test", NewChildStoreOptions::default())
2145                .await
2146                .expect("new_volume failed");
2147            let root_directory = Directory::open(&store, store.root_directory_object_id())
2148                .await
2149                .expect("open failed");
2150
2151            // Create enough data so that another journal extent is used.
2152            let mut i = 0;
2153            loop {
2154                let mut transaction = fs
2155                    .clone()
2156                    .new_transaction(
2157                        lock_keys![LockKey::object(
2158                            store.store_object_id(),
2159                            store.root_directory_object_id()
2160                        )],
2161                        Options::default(),
2162                    )
2163                    .await
2164                    .expect("new_transaction failed");
2165                root_directory
2166                    .create_child_file(&mut transaction, &format!("a {i}"))
2167                    .await
2168                    .expect("create_child_file failed");
2169                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2170                    break;
2171                }
2172                i += 1;
2173            }
2174
2175            // Compact and then disable compactions.
2176            fs.journal().compact().await.expect("compact failed");
2177            fs.journal().stop_compactions().await;
2178
2179            // Keep going until we need another journal extent.
2180            let mut i = 0;
2181            loop {
2182                let mut transaction = fs
2183                    .clone()
2184                    .new_transaction(
2185                        lock_keys![LockKey::object(
2186                            store.store_object_id(),
2187                            store.root_directory_object_id()
2188                        )],
2189                        Options::default(),
2190                    )
2191                    .await
2192                    .expect("new_transaction failed");
2193                root_directory
2194                    .create_child_file(&mut transaction, &format!("b {i}"))
2195                    .await
2196                    .expect("create_child_file failed");
2197                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2198                    break;
2199                }
2200                i += 1;
2201            }
2202
2203            // Allow the journal to flush, but we don't want to sync.
2204            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2205            // Because we're not gracefully closing the filesystem, a Discard record will be
2206            // emitted.
2207            fs.device().snapshot().expect("snapshot failed")
2208        };
2209
2210        let fs = FxFilesystem::open(device).await.expect("open failed");
2211
2212        {
2213            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2214
2215            let store =
2216                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2217
2218            let root_directory = Directory::open(&store, store.root_directory_object_id())
2219                .await
2220                .expect("open failed");
2221
2222            // Write one more transaction.
2223            let mut transaction = fs
2224                .clone()
2225                .new_transaction(
2226                    lock_keys![LockKey::object(
2227                        store.store_object_id(),
2228                        store.root_directory_object_id()
2229                    )],
2230                    Options::default(),
2231                )
2232                .await
2233                .expect("new_transaction failed");
2234            root_directory
2235                .create_child_file(&mut transaction, "d")
2236                .await
2237                .expect("create_child_file failed");
2238            transaction.commit().await.expect("commit failed");
2239        }
2240
2241        fs.close().await.expect("close failed");
2242        let device = fs.take_device().await;
2243        device.reopen(false);
2244
2245        let fs = FxFilesystem::open(device).await.expect("open failed");
2246        fsck(fs.clone()).await.expect("fsck failed");
2247        fs.close().await.expect("close failed");
2248    }
2249}
2250
2251#[cfg(fuzz)]
2252mod fuzz {
2253    use fuzz::fuzz;
2254
2255    #[fuzz]
2256    fn fuzz_journal_bytes(input: Vec<u8>) {
2257        use crate::filesystem::FxFilesystem;
2258        use fuchsia_async as fasync;
2259        use std::io::Write;
2260        use storage_device::DeviceHolder;
2261        use storage_device::fake_device::FakeDevice;
2262
2263        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2264            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2265            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2266            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
2267            fs.close().await.expect("close failed");
2268            let device = fs.take_device().await;
2269            device.reopen(false);
2270            if let Ok(fs) = FxFilesystem::open(device).await {
2271                // `close()` can fail if there were objects to be tombstoned. If such an object is
2272                // corrupted, there will be an error when we compact the journal.
2273                let _ = fs.close().await;
2274            }
2275        });
2276    }
2277
2278    #[fuzz]
2279    fn fuzz_journal(input: Vec<super::JournalRecord>) {
2280        use crate::filesystem::FxFilesystem;
2281        use fuchsia_async as fasync;
2282        use storage_device::DeviceHolder;
2283        use storage_device::fake_device::FakeDevice;
2284
2285        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2286            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2287            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2288            {
2289                let mut inner = fs.journal().inner.lock();
2290                for record in &input {
2291                    let _ = inner.writer.write_record(record);
2292                }
2293            }
2294            fs.close().await.expect("close failed");
2295            let device = fs.take_device().await;
2296            device.reopen(false);
2297            if let Ok(fs) = FxFilesystem::open(device).await {
2298                // `close()` can fail if there were objects to be tombstoned. If such an object is
2299                // corrupted, there will be an error when we compact the journal.
2300                let _ = fs.close().await;
2301            }
2302        });
2303    }
2304}