fxfs/object_store/journal.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! The journal is implemented as an ever-extending file which contains variable length records
//! that describe mutations to be applied to various objects.  The journal file consists of
//! blocks, with a checksum at the end of each block, but otherwise it can be considered a
//! continuous stream.
//!
//! The checksum is seeded with the checksum from the previous block.  To free space in the
//! journal, records are replaced with sparse extents when it is known they are no longer
//! needed to mount.
//!
//! At mount time, the journal is replayed: the mutations are applied into memory.
//! Eventually, a checksum failure will indicate no more records exist to be replayed,
//! at which point the mount can continue and the journal will be extended from that point with
//! further mutations as required.
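//!
//! Conceptually, replay validates the stream block by block, with each block's checksum seeded
//! by the checksum of the block before it.  A minimal sketch (the helper names here are
//! hypothetical; the real logic lives in the `reader` and `writer` sub-modules and uses a
//! fletcher64-style checksum):
//!
//! ```ignore
//! let mut seed = checkpoint.checksum;        // Checksum of the block preceding the start point.
//! for block in blocks {
//!     let recorded = checksum_stored_at_end_of(block);
//!     if compute_checksum(seed, payload_of(block)) != recorded {
//!         break;                             // End of the valid journal stream.
//!     }
//!     // ... deserialize and apply the records in `block` ...
//!     seed = recorded;                       // Seeds validation of the next block.
//! }
//! ```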

mod bootstrap_handle;
mod checksum_list;
mod reader;
pub mod super_block;
mod writer;

use crate::checksum::{Checksum, Checksums, ChecksumsV38};
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::types::Layer;
use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
use crate::object_store::allocator::Allocator;
use crate::object_store::data_object_handle::OverwriteOptions;
use crate::object_store::extent_record::{
    DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
};
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
use crate::object_store::journal::checksum_list::ChecksumList;
use crate::object_store::journal::reader::{JournalReader, ReadResult};
use crate::object_store::journal::super_block::{
    SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
};
use crate::object_store::journal::writer::JournalWriter;
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
use crate::object_store::transaction::{
    AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
    MutationV47, MutationV49, MutationV50, ObjectStoreMutation, Options,
    TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
};
use crate::object_store::{
    AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
    NewChildStoreOptions, ObjectStore,
};
use crate::range::RangeExt;
use crate::round::{round_div, round_down};
use crate::serialized_types::{LATEST_VERSION, Migrate, Version, Versioned, migrate_to_version};
use anyhow::{Context, Error, anyhow, bail, ensure};
use event_listener::Event;
use fprint::TypeFingerprint;
use fuchsia_sync::Mutex;
use futures::FutureExt as _;
use futures::future::poll_fn;
use rustc_hash::FxHashMap as HashMap;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::clone::Clone;
use std::collections::HashSet;
use std::ops::{Bound, Range};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use std::task::{Poll, Waker};

// The journal file is written to in blocks of this size.
pub const BLOCK_SIZE: u64 = 4096;

// The journal file is extended by this amount when necessary.
const CHUNK_SIZE: u64 = 131_072;
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

// See the comment for the `reclaim_size` member of Inner.
pub const DEFAULT_RECLAIM_SIZE: u64 = 262_144;

// Temporary space that should be reserved for the journal.  For example: space that is currently
// used in the journal file but cannot be deallocated yet because we are flushing.
pub const RESERVED_SPACE: u64 = 1_048_576;

// Whenever the journal is replayed (i.e. the system is unmounted and remounted), we reset the
// journal stream, at which point any half-complete transactions are discarded.  We indicate a
// journal reset by XORing the previous block's checksum with this mask, and using that value as a
// seed for the next journal block.
const RESET_XOR: u64 = 0xffffffffffffffff;
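
// Illustrative sketch (not the actual writer code): the seed used for the first block written
// after a reset is derived from the previous block's checksum as
//
//     let seed_after_reset = previous_block_checksum ^ RESET_XOR;
//
// which mirrors what `replay` does below when it re-seeds the writer after a remount.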

// To keep track of offsets within a journal file, we need both the file offset and the checksum
// of the preceding block, since the checksum of the preceding block is an input to the checksum
// of every block.
pub type JournalCheckpoint = JournalCheckpointV32;

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    pub file_offset: u64,

    // Starting checksum for the block that contains file_offset, i.e. the checksum of the
    // previous block.
    pub checksum: Checksum,

    // If versioned, the version of elements stored in the journal, e.g. the JournalRecord
    // version.  This can change across reset events so we store it along with the offset and
    // checksum to know which version to deserialize.
    pub version: Version,
}
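
// Illustrative sketch: resuming from a checkpoint means handing it to a reader, which starts
// validating at the block containing `file_offset`, seeded with `checksum`, as `replay` does
// below:
//
//     let mut reader = JournalReader::new(handle, &checkpoint);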

pub type JournalRecord = JournalRecordV50;

#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV50 {
    // Indicates no more records in this block.
    EndBlock,
    // Mutation for a particular object.  object_id here is for the collection i.e. the store or
    // allocator.
    Mutation { object_id: u64, mutation: MutationV50 },
    // Commits records in the transaction.
    Commit,
    // Discard all mutations with offsets greater than or equal to the given offset.
    Discard(u64),
    // Indicates the device was flushed at the given journal offset.
    // Note that this really means that, as of this journal offset, we can be certain that there's
    // no remaining buffered data in the block device; the buffers and the disk contents are
    // consistent.
    // We insert one of these records *after* a flush along with the *next* transaction to go
    // through.  If that never comes (either due to graceful or hard shutdown), the journal reset
    // on the next mount will serve the same purpose and count as a flush, although it is necessary
    // to defensively flush the device before replaying the journal (if possible, i.e. not
    // read-only) in case the block device connection was reused.
    DidFlushDevice(u64),
    // Checksums for a data range written by this transaction.  A transaction is only valid if
    // these checksums are right.  The range is the device range that the checksums cover.
    //
    // A boolean indicates whether this range is being written to for the first time.  For
    // overwrite extents, we only check the checksums for a block if it has been written to for the
    // first time since the last flush, because otherwise we can't roll it back anyway so it
    // doesn't matter.  For copy-on-write extents, the bool is always true.
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
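
// A committed transaction appears in the stream as a run of `Mutation` (and possibly
// `DataChecksums`) records followed by a single `Commit`; `EndBlock` tells the reader to skip to
// the next block boundary.  For example (illustrative only):
//
//     Mutation { object_id: store_id, mutation: .. }
//     DataChecksums(device_range, checksums, /* first_write: */ true)
//     Mutation { object_id: store_id, mutation: .. }
//     Commit
//     EndBlock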

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

impl From<JournalRecordV41> for JournalRecordV42 {
    fn from(record: JournalRecordV41) -> Self {
        match record {
            JournalRecordV41::EndBlock => Self::EndBlock,
            JournalRecordV41::Mutation { object_id, mutation } => {
                Self::Mutation { object_id, mutation: mutation.into() }
            }
            JournalRecordV41::Commit => Self::Commit,
            JournalRecordV41::Discard(offset) => Self::Discard(offset),
            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
            JournalRecordV41::DataChecksums(range, sums) => {
                // At the time of writing the only extents written by real systems are CoW extents
                // so the new bool is always true.
                Self::DataChecksums(range, sums, true)
            }
        }
    }
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

pub(super) fn journal_handle_options() -> HandleOptions {
    HandleOptions { skip_journal_checks: true, ..Default::default() }
}

/// The journal records a stream of mutations that are to be applied to other objects.  At mount
/// time, these records can be replayed into memory.  It provides a way to quickly persist changes
/// without having to make a large number of writes; they can be deferred to a later time (e.g.
/// when a sufficient number have been queued).  It also provides support for transactions: the
/// ability to have a set of mutations applied atomically together.
pub struct Journal {
    objects: Arc<ObjectManager>,
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    super_block_manager: SuperBlockManager,
    inner: Mutex<Inner>,
    writer_mutex: Mutex<()>,
    sync_mutex: futures::lock::Mutex<()>,
    trace: AtomicBool,

    // This event is used when we are waiting for a compaction to free up journal space.
    reclaim_event: Event,
}

struct Inner {
    super_block_header: SuperBlockHeader,

    // The offset up to which the journal can now be zeroed, since it is no longer needed.
    zero_offset: Option<u64>,

    // The journal offset that we most recently flushed to the device.
    device_flushed_offset: u64,

    // If true, indicates a DidFlushDevice record is pending.
    needs_did_flush_device: bool,

    // The writer for the journal.
    writer: JournalWriter,

    // Set when a reset is encountered during a read.
    // Used at write pre_commit() time to ensure we write a version first thing after a reset.
    output_reset_version: bool,

    // Waker for the flush task.
    flush_waker: Option<Waker>,

    // Indicates the journal has been terminated.
    terminate: bool,

    // Latched error indicating reason for journal termination if not graceful.
    terminate_reason: Option<Error>,

    // Disable compactions.
    disable_compactions: bool,

    // True if compactions are running.
    compaction_running: bool,

    // Waker for the sync task for when it's waiting for the flush task to finish.
    sync_waker: Option<Waker>,

    // The last offset we flushed to the journal file.
    flushed_offset: u64,

    // The last offset that should be considered valid in the journal file.  Most of the time, this
    // will be the same as `flushed_offset` but at mount time, this could be less and will only be
    // up to the end of the last valid transaction; it won't include transactions that follow that
    // have been discarded.
    valid_to: u64,

    // If, after replaying, we have to discard a number of mutations (because they don't validate),
    // this offset specifies where we need to discard back to.  This is so that when we next replay,
    // we ignore those mutations and continue with new good mutations.
    discard_offset: Option<u64>,

    // In the steady state, the journal should fluctuate between being approximately half of this
    // number and this number.  New super-blocks will be written every time about half of this
    // amount is written to the journal.
    reclaim_size: u64,

    image_builder_mode: Option<SuperBlockInstance>,

    // If true and `needs_barrier`, issue a pre-barrier on the first device write of each journal
    // write (which happens in multiples of `BLOCK_SIZE`). This ensures that all the corresponding
    // data writes make it to disk before the journal gets written to.
    barriers_enabled: bool,

    // If true, indicates that data write requests have been made to the device since the last
    // journal write.
    needs_barrier: bool,
}

impl Inner {
    fn terminate(&mut self, reason: Option<Error>) {
        self.terminate = true;

        if let Some(err) = reason {
            error!(error:? = err; "Terminating journal");
            // Log previous error if one was already set, otherwise latch the error.
            if let Some(prev_err) = self.terminate_reason.as_ref() {
                error!(error:? = prev_err; "Journal previously terminated");
            } else {
                self.terminate_reason = Some(err);
            }
        }

        if let Some(waker) = self.flush_waker.take() {
            waker.wake();
        }
        if let Some(waker) = self.sync_waker.take() {
            waker.wake();
        }
    }
}

pub struct JournalOptions {
    /// In the steady state, the journal should fluctuate between being approximately half of this
    /// number and this number.  New super-blocks will be written every time about half of this
    /// amount is written to the journal.
    pub reclaim_size: u64,

    // If true, issue a pre-barrier on the first device write of each journal write (which happens
    // in multiples of `BLOCK_SIZE`). This ensures that all the corresponding data writes make it
    // to disk before the journal gets written to.
    pub barriers_enabled: bool,
}

impl Default for JournalOptions {
    fn default() -> Self {
        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
    }
}
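
// For example, a filesystem that wants a larger journal before compaction kicks in, together with
// barrier-based write ordering, could be configured with (illustrative only):
//
//     let options = JournalOptions { reclaim_size: 4 * DEFAULT_RECLAIM_SIZE, barriers_enabled: true };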

struct JournaledTransactions {
    transactions: Vec<JournaledTransaction>,
    device_flushed_offset: u64,
}

#[derive(Debug, Default)]
pub struct JournaledTransaction {
    pub checkpoint: JournalCheckpoint,
    pub root_parent_mutations: Vec<Mutation>,
    pub root_mutations: Vec<Mutation>,
    /// List of (store_object_id, mutation).
    pub non_root_mutations: Vec<(u64, Mutation)>,
    pub end_offset: u64,
    pub checksums: Vec<JournaledChecksums>,

    /// Records offset + 1 of the matching begin_flush transaction.  The +1 is because we want to
    /// ignore the begin_flush transaction; we don't need or want to replay it.
    pub end_flush: Option<(/* store_id: */ u64, /* begin offset: */ u64)>,

    /// The volume which was deleted in this transaction, if any.
    pub volume_deleted: Option</* store_id: */ u64>,
}

impl JournaledTransaction {
    fn new(checkpoint: JournalCheckpoint) -> Self {
        Self { checkpoint, ..Default::default() }
    }
}

const VOLUME_DELETED: u64 = u64::MAX;

#[derive(Debug)]
pub struct JournaledChecksums {
    pub device_range: Range<u64>,
    pub checksums: Checksums,
    pub first_write: bool,
}

/// Handles for journal-like objects have some additional functionality to manage their extents,
/// since during replay we need to add extents as we find them.
pub trait JournalHandle: ReadObjectHandle {
    /// The end offset of the last extent in the JournalHandle.  Used only for validating extents
    /// (which will be skipped if None is returned).
    /// Note this is equivalent in value to ReadObjectHandle::get_size, when present.
    fn end_offset(&self) -> Option<u64>;
    /// Adds an extent to the current end of the journal stream.
    /// `added_offset` is the offset into the journal of the transaction which added this extent,
    /// used for discard_extents.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Discards all extents which were added in a transaction at offset >= |discard_offset|.
    fn discard_extents(&mut self, discard_offset: u64);
}

// Provide a stub implementation for DataObjectHandle so we can use it in
// Journal::read_transactions.  Manual extent management is a NOP (which is OK since presumably the
// DataObjectHandle already knows where its extents live).
impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
    fn end_offset(&self) -> Option<u64> {
        None
    }
    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
        // NOP
    }
    fn discard_extents(&mut self, _discard_offset: u64) {
        // NOP
    }
}

#[fxfs_trace::trace]
impl Journal {
    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
        let starting_checksum = rand::random_range(1..u64::MAX);
        Journal {
            objects,
            handle: OnceLock::new(),
            super_block_manager: SuperBlockManager::new(),
            inner: Mutex::new(Inner {
                super_block_header: SuperBlockHeader::default(),
                zero_offset: None,
                device_flushed_offset: 0,
                needs_did_flush_device: false,
                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
                output_reset_version: false,
                flush_waker: None,
                terminate: false,
                terminate_reason: None,
                disable_compactions: false,
                compaction_running: false,
                sync_waker: None,
                flushed_offset: 0,
                valid_to: 0,
                discard_offset: None,
                reclaim_size: options.reclaim_size,
                image_builder_mode: None,
                barriers_enabled: options.barriers_enabled,
                needs_barrier: false,
            }),
            writer_mutex: Mutex::new(()),
            sync_mutex: futures::lock::Mutex::new(()),
            trace: AtomicBool::new(false),
            reclaim_event: Event::new(),
        }
    }

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(trace; "J: trace");
        }
    }

    pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
        self.inner.lock().image_builder_mode = mode;
        if let Some(instance) = mode {
            *self.super_block_manager.next_instance.lock() = instance;
        }
    }

    pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
        self.inner.lock().image_builder_mode
    }

    #[cfg(feature = "migration")]
    pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
        ensure!(
            self.inner.lock().image_builder_mode.is_some(),
            "Can only set filesystem uuid in image builder mode."
        );
        self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
        Ok(())
    }

    /// Used during replay to validate a mutation.  This should return false if the mutation is not
    /// valid and should not be applied.  This could be for benign reasons: e.g. the device flushed
    /// data out-of-order, or because of a malicious actor.
    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
        match mutation {
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    Item {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(
                                        _,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                                ..
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
                        ..
                    },
                ..
            }) => {
                if range.is_empty() || !range.is_aligned(block_size) {
                    return false;
                }
                let len = range.length().unwrap();
                if let ExtentMode::Cow(checksums) = mode {
                    if checksums.len() > 0 {
                        if len % checksums.len() as u64 != 0 {
                            return false;
                        }
                        if (len / checksums.len() as u64) % block_size != 0 {
                            return false;
                        }
                    }
                }
                if *device_offset % block_size != 0
                    || *device_offset >= device_size
                    || device_size - *device_offset < len
                {
                    return false;
                }
            }
            Mutation::ObjectStore(_) => {}
            Mutation::EncryptedObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::BeginFlush => {}
            Mutation::EndFlush => {}
            Mutation::DeleteVolume => {}
            Mutation::UpdateBorrowed(_) => {}
            Mutation::UpdateMutationsKey(_) => {}
            Mutation::CreateInternalDir(owner_object_id) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
        }
        true
    }

    // Assumes that `mutation` has been validated.
    fn update_checksum_list(
        &self,
        journal_offset: u64,
        mutation: &Mutation,
        checksum_list: &mut ChecksumList,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::ObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
            }
            _ => {}
        }
        Ok(())
    }

    /// Reads the latest super-block, and then replays journaled records.
    #[trace]
    pub async fn replay(
        &self,
        filesystem: Arc<FxFilesystem>,
        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    ) -> Result<(), Error> {
        let block_size = filesystem.block_size();

        let (super_block, root_parent) =
            self.super_block_manager.load(filesystem.device(), block_size).await?;

        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

        self.objects.set_root_parent_store(root_parent.clone());
        let allocator =
            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
        if let Some(on_new_allocator) = on_new_allocator {
            on_new_allocator(allocator.clone());
        }
        self.objects.set_allocator(allocator.clone());
        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = super_block.clone();
        }

        let device = filesystem.device();

        let mut handle;
        {
            let root_parent_layer = root_parent.tree().mutable_layer();
            let mut iter = root_parent_layer
                .seek(Bound::Included(&ObjectKey::attribute(
                    super_block.journal_object_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
                        super_block.journal_checkpoint.file_offset,
                        BLOCK_SIZE,
                    ))),
                )))
                .await
                .context("Failed to seek root parent store")?;
            let start_offset = if let Some(ItemRef {
                key:
                    ObjectKey {
                        data:
                            ObjectKeyData::Attribute(
                                DEFAULT_DATA_ATTRIBUTE_ID,
                                AttributeKey::Extent(ExtentKey { range }),
                            ),
                        ..
                    },
                ..
            }) = iter.get()
            {
                range.start
            } else {
                0
            };
            handle = BootstrapObjectHandle::new_with_start_offset(
                super_block.journal_object_id,
                device.clone(),
                start_offset,
            );
            while let Some(item) = iter.get() {
                if !match item.into() {
                    Some((
                        object_id,
                        DEFAULT_DATA_ATTRIBUTE_ID,
                        ExtentKey { range },
                        ExtentValue::Some { device_offset, .. },
                    )) if object_id == super_block.journal_object_id => {
                        if let Some(end_offset) = handle.end_offset() {
                            if range.start != end_offset {
                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                    "Unexpected journal extent {:?}, expected start: {}",
                                    item, end_offset
                                )));
                            }
                        }
                        handle.push_extent(
                            0, // We never discard extents from the root parent store.
                            *device_offset
                                ..*device_offset + range.length().context("Invalid extent")?,
                        );
                        true
                    }
                    _ => false,
                } {
                    break;
                }
                iter.advance().await.context("Failed to advance root parent store iterator")?;
            }
        }

        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
        let JournaledTransactions { mut transactions, device_flushed_offset } = self
            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
            .await
            .context("Reading transactions for replay")?;

        // Validate all the mutations.
        let mut checksum_list = ChecksumList::new(device_flushed_offset);
        let mut valid_to = reader.journal_file_checkpoint().file_offset;
        let device_size = device.size();
        'bad_replay: for JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            root_mutations,
            non_root_mutations,
            checksums,
            ..
        } in &transactions
        {
            for JournaledChecksums { device_range, checksums, first_write } in checksums {
                checksum_list
                    .push(
                        checkpoint.file_offset,
                        device_range.clone(),
                        checksums.maybe_as_ref().context("Malformed checksums")?,
                        *first_write,
                    )
                    .context("Pushing journal checksum records to checksum list")?;
            }
            for mutation in root_parent_mutations
                .iter()
                .chain(root_mutations)
                .chain(non_root_mutations.iter().map(|(_, m)| m))
            {
                if !self.validate_mutation(mutation, block_size, device_size) {
                    info!(mutation:?; "Stopping replay at bad mutation");
                    valid_to = checkpoint.file_offset;
                    break 'bad_replay;
                }
                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
            }
        }

        // Validate the checksums. Note if barriers are enabled, there will be no checksums in
        // practice to verify.
        let valid_to = checksum_list
            .verify(device.as_ref(), valid_to)
            .await
            .context("Failed to validate checksums")?;

        // Apply the mutations...

        let mut last_checkpoint = reader.journal_file_checkpoint();
        let mut journal_offsets = super_block.journal_file_offsets.clone();

        // Start with the root-parent mutations, and also determine the journal offsets for all
        // other objects.
        for (
            index,
            JournaledTransaction {
                checkpoint,
                root_parent_mutations,
                end_flush,
                volume_deleted,
                ..
            },
        ) in transactions.iter_mut().enumerate()
        {
            if checkpoint.file_offset >= valid_to {
                last_checkpoint = checkpoint.clone();

                // Truncate the transactions so we don't need to worry about them on the next pass.
                transactions.truncate(index);
                break;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_parent_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_parent_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root parent store mutations")?;
            }

            if let Some((object_id, journal_offset)) = end_flush {
                journal_offsets.insert(*object_id, *journal_offset);
            }

            if let Some(object_id) = volume_deleted {
                journal_offsets.insert(*object_id, VOLUME_DELETED);
            }
        }

        // Now we can open the root store.
        let root_store = ObjectStore::open(
            &root_parent,
            super_block.root_store_object_id,
            Box::new(NullCache {}),
        )
        .await
        .context("Unable to open root store")?;

        ensure!(
            !root_store.is_encrypted(),
            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
        );
        self.objects.set_root_store(root_store);

        let root_store_offset =
            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

        // Now replay the root store mutations.
        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
            if checkpoint.file_offset < root_store_offset {
                continue;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root store mutations")?;
            }
        }

        // Now we can open the allocator.
        allocator.open().await.context("Failed to open allocator")?;

        // Now replay all other mutations.
        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
        {
            self.objects
                .replay_mutations(
                    non_root_mutations,
                    &journal_offsets,
                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                    end_offset,
                )
                .await
                .context("Failed to replay mutations")?;
        }

        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

        let discarded_to =
            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
                Some(reader.journal_file_checkpoint().file_offset)
            } else {
                None
            };

        // Configure the journal writer so that we can continue.
        {
            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                    "journal replay cut short; journal finishes at {}, but super-block was \
                     written at {}",
                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
                )));
            }
            let handle = ObjectStore::open_object(
                &root_parent,
                super_block.journal_object_id,
                journal_handle_options(),
                None,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to open journal file (object id: {})",
                    super_block.journal_object_id
                )
            })?;
            let _ = self.handle.set(handle);
            let mut inner = self.inner.lock();
            reader.skip_to_end_of_block();
            let mut writer_checkpoint = reader.journal_file_checkpoint();

            // Make sure we don't accidentally use the reader from now onwards.
            std::mem::drop(reader);

            // Reset the stream to indicate that we've remounted the journal.
            writer_checkpoint.checksum ^= RESET_XOR;
            writer_checkpoint.version = LATEST_VERSION;
            inner.flushed_offset = writer_checkpoint.file_offset;

            // When we open the filesystem as writable, we flush the device.
            inner.device_flushed_offset = inner.flushed_offset;

            inner.writer.seek(writer_checkpoint);
            inner.output_reset_version = true;
            inner.valid_to = last_checkpoint.file_offset;
            if last_checkpoint.file_offset < inner.flushed_offset {
                inner.discard_offset = Some(last_checkpoint.file_offset);
            }
        }

        self.objects
            .on_replay_complete()
            .await
            .context("Failed to complete replay for object manager")?;

        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
        Ok(())
    }

    async fn read_transactions(
        &self,
        reader: &mut JournalReader,
        end_offset: Option<u64>,
        object_id_filter: u64,
    ) -> Result<JournaledTransactions, Error> {
        let mut transactions = Vec::new();
        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
            let super_block = &self.inner.lock().super_block_header;
            (
                super_block.super_block_journal_file_offset,
                super_block.root_parent_store_object_id,
                super_block.root_store_object_id,
            )
        };
        let mut current_transaction = None;
        let mut begin_flush_offsets = HashMap::default();
        let mut stores_deleted = HashSet::new();
        loop {
            // Cache the checkpoint before we deserialize a record.
            let checkpoint = reader.journal_file_checkpoint();
            if let Some(end_offset) = end_offset {
                if checkpoint.file_offset >= end_offset {
                    break;
                }
            }
            let result =
                reader.deserialize().await.context("Failed to deserialize journal record")?;
            match result {
                ReadResult::Reset(_) => {
                    if current_transaction.is_some() {
                        current_transaction = None;
                        transactions.pop();
                    }
                    let offset = reader.journal_file_checkpoint().file_offset;
                    if offset > device_flushed_offset {
                        device_flushed_offset = offset;
                    }
                }
                ReadResult::Some(record) => {
                    match record {
                        JournalRecord::EndBlock => {
                            reader.skip_to_end_of_block();
                        }
                        JournalRecord::Mutation { object_id, mutation } => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };

                            if stores_deleted.contains(&object_id) {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Encountered mutations for deleted store")
                                );
                            }

                            match &mutation {
                                Mutation::BeginFlush => {
                                    begin_flush_offsets.insert(
                                        object_id,
                                        current_transaction.checkpoint.file_offset,
                                    );
                                }
                                Mutation::EndFlush => {
                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
                                        if let Some(deleted_volume) =
                                            &current_transaction.volume_deleted
                                        {
                                            if *deleted_volume == object_id {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                                ));
                                            }
                                        }
                                        // The +1 is because we don't want to replay the transaction
                                        // containing the begin flush.
                                        if current_transaction
                                            .end_flush
                                            .replace((object_id, offset + 1))
                                            .is_some()
                                        {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush mutations in a \
                                                 single transaction"
                                            ));
                                        }
                                    }
                                }
                                Mutation::DeleteVolume => {
                                    if let Some((flushed_object, _)) =
                                        &current_transaction.end_flush
                                    {
                                        if *flushed_object == object_id {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush/DeleteVolume mutations in a \
                                                    single transaction for the same object"
                                            ));
                                        }
                                    }
                                    if current_transaction
                                        .volume_deleted
                                        .replace(object_id)
                                        .is_some()
                                    {
                                        bail!(anyhow!(FxfsError::Inconsistent).context(
                                            "Multiple DeleteVolume mutations in a single \
                                             transaction"
                                        ));
                                    }
                                    stores_deleted.insert(object_id);
                                }
                                _ => {}
                            }

                            // If this mutation doesn't need to be applied, don't bother adding it
                            // to the transaction.
                            if (object_id_filter == INVALID_OBJECT_ID
                                || object_id_filter == object_id)
                                && self.should_apply(object_id, &current_transaction.checkpoint)
                            {
                                if object_id == root_parent_store_object_id {
                                    current_transaction.root_parent_mutations.push(mutation);
                                } else if object_id == root_store_object_id {
                                    current_transaction.root_mutations.push(mutation);
                                } else {
                                    current_transaction
                                        .non_root_mutations
                                        .push((object_id, mutation));
                                }
                            }
                        }
                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };
                            current_transaction.checksums.push(JournaledChecksums {
                                device_range,
                                checksums,
                                first_write,
                            });
                        }
                        JournalRecord::Commit => {
                            if let Some(&mut JournaledTransaction {
                                ref checkpoint,
                                ref root_parent_mutations,
                                ref mut end_offset,
                                ..
                            }) = current_transaction.take()
                            {
                                for mutation in root_parent_mutations {
                                    // Snoop the mutations for any that might apply to the journal
                                    // file so that we can pass them to the reader so that it can
                                    // read the journal file.
                                    if let Mutation::ObjectStore(ObjectStoreMutation {
                                        item:
                                            Item {
                                                key:
                                                    ObjectKey {
                                                        object_id,
                                                        data:
                                                            ObjectKeyData::Attribute(
                                                                DEFAULT_DATA_ATTRIBUTE_ID,
                                                                AttributeKey::Extent(ExtentKey {
                                                                    range,
                                                                }),
                                                            ),
                                                        ..
                                                    },
                                                value:
                                                    ObjectValue::Extent(ExtentValue::Some {
                                                        device_offset,
                                                        ..
                                                    }),
                                                ..
                                            },
                                        ..
                                    }) = mutation
                                    {
                                        // Add the journal extents we find on the way to our
                                        // reader.
                                        let handle = reader.handle();
                                        if *object_id != handle.object_id() {
                                            continue;
                                        }
                                        if let Some(end_offset) = handle.end_offset() {
                                            if range.start != end_offset {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    format!(
                                                        "Unexpected journal extent {:?} -> {}, \
                                                           expected start: {}",
                                                        range, device_offset, end_offset,
                                                    )
                                                ));
                                            }
                                        }
                                        handle.push_extent(
                                            checkpoint.file_offset,
                                            *device_offset
                                                ..*device_offset
                                                    + range.length().context("Invalid extent")?,
                                        );
                                    }
                                }
                                *end_offset = reader.journal_file_checkpoint().file_offset;
                            }
                        }
                        JournalRecord::Discard(offset) => {
                            if offset == 0 {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Invalid offset for Discard")
                                );
                            }
                            if let Some(transaction) = current_transaction.as_ref() {
                                if transaction.checkpoint.file_offset < offset {
                                    // Odd, but OK.
                                    continue;
                                }
                            }
                            current_transaction = None;
                            while let Some(transaction) = transactions.last() {
                                if transaction.checkpoint.file_offset < offset {
                                    break;
                                }
                                transactions.pop();
                            }
                            reader.handle().discard_extents(offset);
                        }
                        JournalRecord::DidFlushDevice(offset) => {
                            if offset > device_flushed_offset {
                                device_flushed_offset = offset;
                            }
                        }
                    }
                }
                // This is expected when we reach the end of the journal stream.
                ReadResult::ChecksumMismatch => break,
            }
        }

        // Discard any uncommitted transaction.
        if current_transaction.is_some() {
            transactions.pop();
        }

        Ok(JournaledTransactions { transactions, device_flushed_offset })
    }

    /// Creates an empty filesystem with the minimum viable objects (including a root parent and
    /// root store but no further child stores).
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        // The following constants are only used at format time. When mounting, the recorded values
        // in the superblock should be used.  The root parent store does not have a parent, but
        // needs an object ID to be registered with ObjectManager, so it cannot collide (i.e. have
        // the same object ID) with any objects in the root store that use the journal to track
        // mutations.
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        let root_parent = ObjectStore::new_empty(
            None,
            INIT_ROOT_PARENT_STORE_OBJECT_ID,
            filesystem.clone(),
            Box::new(NullCache {}),
        );
        self.objects.set_root_parent_store(root_parent.clone());

        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
        self.objects.set_allocator(allocator.clone());
        self.objects.init_metadata_reservation()?;

        let journal_handle;
        let super_block_a_handle;
        let super_block_b_handle;
        let root_store;
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        root_store = root_parent
            .new_child_store(
                &mut transaction,
                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
                Box::new(NullCache {}),
            )
            .await
            .context("new_child_store")?;
        self.objects.set_root_store(root_store.clone());

        allocator.create(&mut transaction).await?;

        // Create the super-block objects...
        super_block_a_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            SuperBlockInstance::A.object_id(),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
        super_block_a_handle
            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
            .await
            .context("extend super block")?;
        super_block_b_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            SuperBlockInstance::B.object_id(),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
        super_block_b_handle
            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
            .await
            .context("extend super block")?;

        // the journal object...
        journal_handle = ObjectStore::create_object(
            &root_parent,
            &mut transaction,
            journal_handle_options(),
            None,
        )
        .await
        .context("create journal")?;
        if self.inner.lock().image_builder_mode.is_none() {
            let mut file_range = 0..self.chunk_size();
            journal_handle
                .preallocate_range(&mut transaction, &mut file_range)
                .await
                .context("preallocate journal")?;
            if file_range.start < file_range.end {
                bail!("preallocate_range returned too little space");
            }
        }

        // Write the root store object info.
        root_store.create(&mut transaction).await?;

        // The root parent graveyard.
        root_parent
            .set_graveyard_directory_object_id(Graveyard::create(&mut transaction, &root_parent));

        transaction.commit().await?;

        self.inner.lock().super_block_header = SuperBlockHeader::new(
            root_parent.store_object_id(),
            root_parent.graveyard_directory_object_id(),
            root_store.store_object_id(),
            allocator.object_id(),
            journal_handle.object_id(),
            checkpoint,
            /* earliest_version: */ LATEST_VERSION,
        );

        // Initialize the journal writer.
        let _ = self.handle.set(journal_handle);
        Ok(())
    }
1301
1302    /// Normally we allocate the journal when creating the filesystem.
1303    /// This is used in image_builder_mode, where journal allocation is deferred until last.
1304    pub async fn allocate_journal(&self) -> Result<(), Error> {
1305        let handle = self.handle.get().unwrap();
1306        let filesystem = handle.store().filesystem();
1307        let mut transaction = filesystem
1308            .clone()
1309            .new_transaction(
1310                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1311                Options { skip_journal_checks: true, ..Default::default() },
1312            )
1313            .await?;
1314        let mut file_range = 0..self.chunk_size();
1315        self.handle
1316            .get()
1317            .unwrap()
1318            .preallocate_range(&mut transaction, &mut file_range)
1319            .await
1320            .context("preallocate journal")?;
1321        if file_range.start < file_range.end {
1322            bail!("preallocate_range returned too little space");
1323        }
1324        transaction.commit().await?;
1325        Ok(())
1326    }
1327
1328    pub async fn init_superblocks(&self) -> Result<(), Error> {
1329        // Overwrite both superblocks.
1330        for _ in 0..2 {
1331            self.write_super_block().await?;
1332        }
1333        Ok(())
1334    }
1335
1336    /// Takes a snapshot of all journaled transactions which affect |object_id| since its last
1337    /// flush.
1338    /// The caller is responsible for locking; it must ensure that the journal is not trimmed during
1339    /// this call.  For example, a Flush lock could be held on the object in question (assuming that
1340    /// object has data to flush and is registered with ObjectManager).
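    ///
    /// A minimal usage sketch (not compiled as a doctest; assumes the caller already holds a
    /// lock that prevents the journal from being trimmed, as described above):
    ///
    /// ```ignore
    /// let transactions = journal.read_transactions_for_object(store_object_id).await?;
    /// // Each entry is a JournaledTransaction describing mutations journaled for the object
    /// // since its last flush.
    /// ```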
1341    pub async fn read_transactions_for_object(
1342        &self,
1343        object_id: u64,
1344    ) -> Result<Vec<JournaledTransaction>, Error> {
1345        let handle = self.handle.get().expect("No journal handle");
1346        // Reopen the handle since JournalReader needs an owned handle.
1347        let handle = ObjectStore::open_object(
1348            handle.owner(),
1349            handle.object_id(),
1350            journal_handle_options(),
1351            None,
1352        )
1353        .await?;
1354
1355        let checkpoint = match self.objects.journal_checkpoint(object_id) {
1356            Some(checkpoint) => checkpoint,
1357            None => return Ok(vec![]),
1358        };
1359        let mut reader = JournalReader::new(handle, &checkpoint);
1360        // Record the current end offset and only read to there, so we don't accidentally read any
1361        // partially flushed blocks.
1362        let end_offset = self.inner.lock().valid_to;
1363        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1364    }
1365
1366    /// Commits a transaction.  This is not thread safe; the caller must take appropriate locks.
1367    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1368        if transaction.is_empty() {
1369            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1370        }
1371
1372        self.pre_commit(transaction).await?;
1373        Ok(self.write_and_apply_mutations(transaction))
1374    }
1375
1376    // Before we commit, we might need to extend the journal or write pending records to the
1377    // journal.
1378    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
1379        let handle;
1380
1381        let (size, zero_offset) = {
1382            let mut inner = self.inner.lock();
1383
1384            // If this is the first write after a RESET, we need to output version first.
1385            if std::mem::take(&mut inner.output_reset_version) {
1386                LATEST_VERSION.serialize_into(&mut inner.writer)?;
1387            }
1388
1389            if let Some(discard_offset) = inner.discard_offset {
1390                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
1391                inner.discard_offset = None;
1392            }
1393
1394            if inner.needs_did_flush_device {
1395                let offset = inner.device_flushed_offset;
1396                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
1397                inner.needs_did_flush_device = false;
1398            }
1399
1400            handle = match self.handle.get() {
1401                None => return Ok(()),
1402                Some(x) => x,
1403            };
1404
1405            let file_offset = inner.writer.journal_file_checkpoint().file_offset;
1406
1407            let size = handle.get_size();
1408            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };
1409
1410            if size.is_none()
1411                && inner.zero_offset.is_none()
1412                && !self.objects.needs_borrow_for_journal(file_offset)
1413            {
1414                return Ok(());
1415            }
1416
1417            (size, inner.zero_offset)
1418        };
1419
1420        let mut transaction = handle
1421            .new_transaction_with_options(Options {
1422                skip_journal_checks: true,
1423                borrow_metadata_space: true,
1424                allocator_reservation: Some(self.objects.metadata_reservation()),
1425                txn_guard: Some(transaction.txn_guard()),
1426                ..Default::default()
1427            })
1428            .await?;
1429        if let Some(size) = size {
1430            handle
1431                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
1432                .await?;
1433        }
1434        if let Some(zero_offset) = zero_offset {
1435            handle.zero(&mut transaction, 0..zero_offset).await?;
1436        }
1437
1438        // We can't use regular transaction commit, because that can cause re-entrancy issues, so
1439        // instead we just apply the transaction directly here.
1440        self.write_and_apply_mutations(&mut transaction);
1441
1442        let mut inner = self.inner.lock();
1443
1444        // Make sure the transaction to extend the journal made it to the journal within the old
1445        // size, since otherwise it won't be possible to replay it.
1446        if let Some(size) = size {
1447            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
1448        }
1449
1450        if inner.zero_offset == zero_offset {
1451            inner.zero_offset = None;
1452        }
1453
1454        Ok(())
1455    }
1456
1457    // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
1458    // all records should be applied because the object store or allocator might already contain the
1459    // mutation.  After replay, that obviously isn't the case and we want to apply all mutations.
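    //
    // A worked example with hypothetical offsets: if the super-block records a journal file
    // offset of 8192 for object 17, then during replay a mutation for object 17 journaled at
    // offset 4096 is skipped (it was already captured by that object's last flush), whereas one
    // journaled at offset 12288 is applied.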
1460    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1461        let super_block_header = &self.inner.lock().super_block_header;
1462        let offset = super_block_header
1463            .journal_file_offsets
1464            .get(&object_id)
1465            .cloned()
1466            .unwrap_or(super_block_header.super_block_journal_file_offset);
1467        journal_file_checkpoint.file_offset >= offset
1468    }
1469
1470    /// Flushes previous writes to the device and then writes out a new super-block.
1471    /// Callers must ensure that we do not make concurrent calls.
1472    async fn write_super_block(&self) -> Result<(), Error> {
1473        let root_parent_store = self.objects.root_parent_store();
1474
1475        // We need to flush previous writes to the device since the new super-block we are writing
1476        // relies on written data being observable.  We also need to lock the root parent store so
1477        // that no new entries are written to it whilst we are writing the super-block; for that we
1478        // use the write lock.
1479        let old_layers;
1480        let old_super_block_offset;
1481        let mut new_super_block_header;
1482        let checkpoint;
1483        let borrowed;
1484
1485        {
1486            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1487            {
1488                let _write_guard = self.writer_mutex.lock();
1489                (checkpoint, borrowed) = self.pad_to_block()?;
1490                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
1491            }
1492            self.flush_device(checkpoint.file_offset)
1493                .await
1494                .context("flush failed when writing superblock")?;
1495        }
1496
1497        new_super_block_header = self.inner.lock().super_block_header.clone();
1498
1499        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;
1500
1501        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();
1502
1503        new_super_block_header.generation =
1504            new_super_block_header.generation.checked_add(1).ok_or(FxfsError::Inconsistent)?;
1505        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
1506        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
1507        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
1508        new_super_block_header.journal_file_offsets = journal_file_offsets;
1509        new_super_block_header.borrowed_metadata_space = borrowed;
1510
1511        self.super_block_manager
1512            .save(
1513                new_super_block_header.clone(),
1514                self.objects.root_parent_store().filesystem(),
1515                old_layers,
1516            )
1517            .await?;
1518        {
1519            let mut inner = self.inner.lock();
1520            inner.super_block_header = new_super_block_header;
1521            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
1522        }
1523
1524        Ok(())
1525    }
1526
1527    /// Flushes any buffered journal data to the device.  Note that this does not flush the device
1528    /// unless the flush_device option is set, in which case data should have been persisted to
1529    /// lower layers.  If a precondition is supplied, it is evaluated and the sync will be skipped
1530    /// if it returns false.  This allows callers to check a condition whilst a lock is held.  If a
1531    /// sync is performed, this function returns the checkpoint that was flushed and the amount of
1532    /// borrowed metadata space at the point it was flushed.
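    ///
    /// A usage sketch (not compiled as a doctest; `journal` is assumed to be a `&Journal`):
    ///
    /// ```ignore
    /// if let Some((checkpoint, borrowed)) =
    ///     journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await?
    /// {
    ///     // `checkpoint` is the checkpoint that was flushed and `borrowed` is the amount of
    ///     // borrowed metadata space at that point.
    /// }
    /// ```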
1533    pub async fn sync(
1534        &self,
1535        options: SyncOptions<'_>,
1536    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1537        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1538
1539        let (checkpoint, borrowed) = {
1540            if let Some(precondition) = options.precondition {
1541                if !precondition() {
1542                    return Ok(None);
1543                }
1544            }
1545
1546            // This guard is required so that we don't insert an EndBlock record in the middle of a
1547            // transaction.
1548            let _guard = self.writer_mutex.lock();
1549
1550            self.pad_to_block()?
1551        };
1552
1553        if options.flush_device {
1554            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1555        }
1556
1557        Ok(Some((checkpoint, borrowed)))
1558    }
1559
1560    // Returns the checkpoint as it was prior to padding.  This is done because the super-block
1561    // needs to record where the last transaction ends, and it's the next transaction that pays the
1562    // price of the padding.
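    //
    // For example, with hypothetical offsets: if the writer is at file offset 4500 when the
    // super-block is being written, the stream is padded out to 8192 (the next BLOCK_SIZE
    // boundary), but the returned checkpoint still records 4500, so the super-block points at the
    // true end of the last transaction and the padding is charged to whatever comes next.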
1563    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1564        let mut inner = self.inner.lock();
1565        let checkpoint = inner.writer.journal_file_checkpoint();
1566        if checkpoint.file_offset % BLOCK_SIZE != 0 {
1567            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1568            inner.writer.pad_to_block()?;
1569            if let Some(waker) = inner.flush_waker.take() {
1570                waker.wake();
1571            }
1572        }
1573        Ok((checkpoint, self.objects.borrowed_metadata_space()))
1574    }
1575
1576    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
1577        debug_assert_not_too_long!(poll_fn(|ctx| {
1578            let mut inner = self.inner.lock();
1579            if inner.flushed_offset >= checkpoint_offset {
1580                Poll::Ready(Ok(()))
1581            } else if inner.terminate {
1582                let context = inner
1583                    .terminate_reason
1584                    .as_ref()
1585                    .map(|e| format!("Journal closed with error: {:?}", e))
1586                    .unwrap_or_else(|| "Journal closed".to_string());
1587                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
1588            } else {
1589                inner.sync_waker = Some(ctx.waker().clone());
1590                Poll::Pending
1591            }
1592        }))?;
1593
1594        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
1595        if needs_flush {
1596            let trace = self.trace.load(Ordering::Relaxed);
1597            if trace {
1598                info!("J: start flush device");
1599            }
1600            self.handle.get().unwrap().flush_device().await?;
1601            if trace {
1602                info!("J: end flush device");
1603            }
1604
1605            // We need to write a DidFlushDevice record at some point, but if we are in the
1606            // process of shutting down the filesystem, we want to leave the journal clean to
1607            // avoid log messages complaining about unwritten journal data, so we queue the
1608            // record up and let the next transaction trigger it to be written.  If we are
1609            // shutting down, that will never happen, but since the DidFlushDevice record is
1610            // purely advisory (it reduces the number of checksums we have to verify during
1611            // replay), it doesn't matter if it is never written.
1612            {
1613                let mut inner = self.inner.lock();
1614                inner.device_flushed_offset = checkpoint_offset;
1615                inner.needs_did_flush_device = true;
1616            }
1617
1618            // Tell the allocator that we flushed the device so that it can now start using
1619            // space that was deallocated.
1620            self.objects.allocator().did_flush_device(checkpoint_offset);
1621            if trace {
1622                info!("J: did flush device");
1623            }
1624        }
1625
1626        Ok(())
1627    }
1628
1629    /// Returns a copy of the super-block header.
1630    pub fn super_block_header(&self) -> SuperBlockHeader {
1631        self.inner.lock().super_block_header.clone()
1632    }
1633
1634    /// Waits for there to be sufficient space in the journal.
1635    pub async fn check_journal_space(&self) -> Result<(), Error> {
1636        loop {
1637            debug_assert_not_too_long!({
1638                let inner = self.inner.lock();
1639                if inner.terminate {
1640                    // If the flush error is set, this will never make progress, since we can't
1641                    // extend the journal any more.
1642                    let context = inner
1643                        .terminate_reason
1644                        .as_ref()
1645                        .map(|e| format!("Journal closed with error: {:?}", e))
1646                        .unwrap_or_else(|| "Journal closed".to_string());
1647                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
1648                }
1649                if self.objects.last_end_offset()
1650                    - inner.super_block_header.journal_checkpoint.file_offset
1651                    < inner.reclaim_size
1652                {
1653                    break Ok(());
1654                }
1655                if inner.image_builder_mode.is_some() {
1656                    break Ok(());
1657                }
1658                if inner.disable_compactions {
1659                    break Err(
1660                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
1661                    );
1662                }
1663                self.reclaim_event.listen()
1664            });
1665        }
1666    }
1667
1668    fn chunk_size(&self) -> u64 {
1669        CHUNK_SIZE
1670    }
1671
1672    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
1673        let checkpoint_before;
1674        let checkpoint_after;
1675        {
1676            let _guard = self.writer_mutex.lock();
1677            checkpoint_before = {
1678                let mut inner = self.inner.lock();
1679                if transaction.includes_write() {
1680                    inner.needs_barrier = true;
1681                }
1682                let checkpoint = inner.writer.journal_file_checkpoint();
1683                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
1684                    self.objects.write_mutation(
1685                        *object_id,
1686                        mutation,
1687                        Writer(*object_id, &mut inner.writer),
1688                    );
1689                }
1690                checkpoint
1691            };
1692            let maybe_mutation =
1693                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
1694                    "apply_transaction should not fail in live mode; \
1695                     filesystem will be in an inconsistent state",
1696                );
1697            checkpoint_after = {
1698                let mut inner = self.inner.lock();
1699                if let Some(mutation) = maybe_mutation {
1700                    inner
1701                        .writer
1702                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
1703                        .unwrap();
1704                }
1705                for (device_range, checksums, first_write) in
1706                    transaction.take_checksums().into_iter()
1707                {
1708                    inner
1709                        .writer
1710                        .write_record(&JournalRecord::DataChecksums(
1711                            device_range,
1712                            Checksums::fletcher(checksums),
1713                            first_write,
1714                        ))
1715                        .unwrap();
1716                }
1717                inner.writer.write_record(&JournalRecord::Commit).unwrap();
1718
1719                inner.writer.journal_file_checkpoint()
1720            };
1721        }
1722        self.objects.did_commit_transaction(
1723            transaction,
1724            &checkpoint_before,
1725            checkpoint_after.file_offset,
1726        );
1727
1728        if let Some(waker) = self.inner.lock().flush_waker.take() {
1729            waker.wake();
1730        }
1731
1732        checkpoint_before.file_offset
1733    }
1734
1735    /// This task will flush journal data to the device when there is data that needs flushing, and
1736    /// trigger compactions when short of journal space.  It will return after the terminate method
1737    /// has been called, or an error is encountered with either flushing or compaction.
1738    pub async fn flush_task(self: Arc<Self>) {
1739        let mut flush_fut = None;
1740        let mut compact_fut = None;
1741        let mut flush_error = false;
1742        poll_fn(|ctx| {
1743            loop {
1744                {
1745                    let mut inner = self.inner.lock();
1746                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
1747                        let flushable = inner.writer.flushable_bytes();
1748                        if flushable > 0 {
1749                            flush_fut = Some(self.flush(flushable).boxed());
1750                        }
1751                    }
1752                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
1753                        return Poll::Ready(());
1754                    }
1755                    // The / 2 is here because after compacting, we cannot reclaim the space until the
1756                    // _next_ time we flush the device since the super-block is not guaranteed to
1757                    // persist until then.
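                    // For example, with a hypothetical reclaim_size of 256 MiB, compaction is
                    // kicked off once more than 128 MiB of journal has accumulated beyond the
                    // checkpoint recorded in the super-block.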
1758                    if compact_fut.is_none()
1759                        && !inner.terminate
1760                        && !inner.disable_compactions
1761                        && inner.image_builder_mode.is_none()
1762                        && self.objects.last_end_offset()
1763                            - inner.super_block_header.journal_checkpoint.file_offset
1764                            > inner.reclaim_size / 2
1765                    {
1766                        compact_fut = Some(self.compact().boxed());
1767                        inner.compaction_running = true;
1768                    }
1769                    inner.flush_waker = Some(ctx.waker().clone());
1770                }
1771                let mut pending = true;
1772                if let Some(fut) = flush_fut.as_mut() {
1773                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1774                        if let Err(e) = result {
1775                            self.inner.lock().terminate(Some(e.context("Flush error")));
1776                            self.reclaim_event.notify(usize::MAX);
1777                            flush_error = true;
1778                        }
1779                        flush_fut = None;
1780                        pending = false;
1781                    }
1782                }
1783                if let Some(fut) = compact_fut.as_mut() {
1784                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1785                        let mut inner = self.inner.lock();
1786                        if let Err(e) = result {
1787                            inner.terminate(Some(e.context("Compaction error")));
1788                        }
1789                        compact_fut = None;
1790                        inner.compaction_running = false;
1791                        self.reclaim_event.notify(usize::MAX);
1792                        pending = false;
1793                    }
1794                }
1795                if pending {
1796                    return Poll::Pending;
1797                }
1798            }
1799        })
1800        .await;
1801    }
1802
1803    async fn flush(&self, amount: usize) -> Result<(), Error> {
1804        let handle = self.handle.get().unwrap();
1805        let mut buf = handle.allocate_buffer(amount).await;
1806        let (offset, len, barrier_on_first_write) = {
1807            let mut inner = self.inner.lock();
1808            let offset = inner.writer.take_flushable(buf.as_mut());
1809            let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1810            // Reset `needs_barrier` before the overwrite rather than after it, in case a
1811            // transaction commit that contains data happens while the overwrite is in flight.
1812            inner.needs_barrier = false;
1813            (offset, buf.len() as u64, barrier_on_first_write)
1814        };
1815        self.handle
1816            .get()
1817            .unwrap()
1818            .overwrite(
1819                offset,
1820                buf.as_mut(),
1821                OverwriteOptions { barrier_on_first_write, ..Default::default() },
1822            )
1823            .await?;
1824
1825        let mut inner = self.inner.lock();
1826        if let Some(waker) = inner.sync_waker.take() {
1827            waker.wake();
1828        }
1829        inner.flushed_offset = offset + len;
1830        inner.valid_to = inner.flushed_offset;
1831        Ok(())
1832    }
1833
1834    /// This should generally NOT be called externally. It is public to allow use by FIDL service
1835    /// fxfs.Debug.
1836    #[trace]
1837    pub async fn compact(&self) -> Result<(), Error> {
1838        let trace = self.trace.load(Ordering::Relaxed);
1839        debug!("Compaction starting");
1840        if trace {
1841            info!("J: start compaction");
1842        }
1843        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1844        self.inner.lock().super_block_header.earliest_version = earliest_version;
1845        self.write_super_block().await.context("Failed to write superblock")?;
1846        if trace {
1847            info!("J: end compaction");
1848        }
1849        debug!("Compaction finished");
1850        Ok(())
1851    }
1852
1853    pub async fn stop_compactions(&self) {
1854        loop {
1855            debug_assert_not_too_long!({
1856                let mut inner = self.inner.lock();
1857                inner.disable_compactions = true;
1858                if !inner.compaction_running {
1859                    return;
1860                }
1861                self.reclaim_event.listen()
1862            });
1863        }
1864    }
1865
1866    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
1867    /// journal when queried.
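    ///
    /// A usage sketch (not compiled as a doctest; assumes a `fuchsia_inspect::Inspector` is
    /// available):
    ///
    /// ```ignore
    /// journal.track_statistics(inspector.root(), "journal");
    /// ```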
1868    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1869        let this = Arc::downgrade(self);
1870        parent.record_lazy_child(name, move || {
1871            let this_clone = this.clone();
1872            async move {
1873                let inspector = fuchsia_inspect::Inspector::default();
1874                if let Some(this) = this_clone.upgrade() {
1875                    let (journal_min, journal_max, journal_reclaim_size) = {
1876                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1877                        let inner = this.inner.lock();
1878                        (
1879                            round_down(
1880                                inner.super_block_header.journal_checkpoint.file_offset,
1881                                BLOCK_SIZE,
1882                            ),
1883                            inner.flushed_offset,
1884                            inner.reclaim_size,
1885                        )
1886                    };
1887                    let root = inspector.root();
1888                    root.record_uint("journal_min_offset", journal_min);
1889                    root.record_uint("journal_max_offset", journal_max);
1890                    root.record_uint("journal_size", journal_max - journal_min);
1891                    root.record_uint("journal_reclaim_size", journal_reclaim_size);
1892
1893                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
1894                    if let Some(x) = round_div(
1895                        100 * (journal_max - journal_min),
1896                        this.objects.allocator().get_disk_bytes(),
1897                    ) {
1898                        root.record_uint("journal_size_to_disk_size_percent", x);
1899                    }
1900                }
1901                Ok(inspector)
1902            }
1903            .boxed()
1904        });
1905    }
1906
1907    /// Terminate all journal activity.
1908    pub fn terminate(&self) {
1909        self.inner.lock().terminate(/*reason*/ None);
1910        self.reclaim_event.notify(usize::MAX);
1911    }
1912}
1913
1914/// Wrapper to allow records to be written to the journal.
1915pub struct Writer<'a>(u64, &'a mut JournalWriter);
1916
1917impl Writer<'_> {
1918    pub fn write(&mut self, mutation: Mutation) {
1919        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
1920    }
1921}
1922
1923#[cfg(test)]
1924mod tests {
1925    use crate::filesystem::{FxFilesystem, SyncOptions};
1926    use crate::fsck::fsck;
1927    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
1928    use crate::object_store::directory::Directory;
1929    use crate::object_store::transaction::Options;
1930    use crate::object_store::volume::root_volume;
1931    use crate::object_store::{
1932        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
1933    };
1934    use fuchsia_async as fasync;
1935    use fuchsia_async::MonotonicDuration;
1936    use storage_device::DeviceHolder;
1937    use storage_device::fake_device::FakeDevice;
1938
1939    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1940
1941    #[fuchsia::test]
1942    async fn test_replay() {
1943        const TEST_DATA: &[u8] = b"hello";
1944
1945        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1946
1947        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1948
1949        let object_id = {
1950            let root_store = fs.root_store();
1951            let root_directory =
1952                Directory::open(&root_store, root_store.root_directory_object_id())
1953                    .await
1954                    .expect("open failed");
1955            let mut transaction = fs
1956                .clone()
1957                .new_transaction(
1958                    lock_keys![LockKey::object(
1959                        root_store.store_object_id(),
1960                        root_store.root_directory_object_id(),
1961                    )],
1962                    Options::default(),
1963                )
1964                .await
1965                .expect("new_transaction failed");
1966            let handle = root_directory
1967                .create_child_file(&mut transaction, "test")
1968                .await
1969                .expect("create_child_file failed");
1970
1971            transaction.commit().await.expect("commit failed");
1972            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1973            buf.as_mut_slice().copy_from_slice(TEST_DATA);
1974            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1975            // As this is the first sync, this will actually trigger a new super-block, but normally
1976            // this would not be the case.
1977            fs.sync(SyncOptions::default()).await.expect("sync failed");
1978            handle.object_id()
1979        };
1980
1981        {
1982            fs.close().await.expect("Close failed");
1983            let device = fs.take_device().await;
1984            device.reopen(false);
1985            let fs = FxFilesystem::open(device).await.expect("open failed");
1986            let handle = ObjectStore::open_object(
1987                &fs.root_store(),
1988                object_id,
1989                HandleOptions::default(),
1990                None,
1991            )
1992            .await
1993            .expect("open_object failed");
1994            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
1995            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
1996            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
1997            fsck(fs.clone()).await.expect("fsck failed");
1998            fs.close().await.expect("Close failed");
1999        }
2000    }
2001
2002    #[fuchsia::test]
2003    async fn test_reset() {
2004        const TEST_DATA: &[u8] = b"hello";
2005
2006        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
2007
2008        let mut object_ids = Vec::new();
2009
2010        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2011        {
2012            let root_store = fs.root_store();
2013            let root_directory =
2014                Directory::open(&root_store, root_store.root_directory_object_id())
2015                    .await
2016                    .expect("open failed");
2017            let mut transaction = fs
2018                .clone()
2019                .new_transaction(
2020                    lock_keys![LockKey::object(
2021                        root_store.store_object_id(),
2022                        root_store.root_directory_object_id(),
2023                    )],
2024                    Options::default(),
2025                )
2026                .await
2027                .expect("new_transaction failed");
2028            let handle = root_directory
2029                .create_child_file(&mut transaction, "test")
2030                .await
2031                .expect("create_child_file failed");
2032            transaction.commit().await.expect("commit failed");
2033            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2034            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2035            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2036            fs.sync(SyncOptions::default()).await.expect("sync failed");
2037            object_ids.push(handle.object_id());
2038
2039            // Create a lot of objects but don't sync at the end. This should leave the filesystem
2040            // with a half-finished transaction that cannot be replayed.
2041            for i in 0..1000 {
2042                let mut transaction = fs
2043                    .clone()
2044                    .new_transaction(
2045                        lock_keys![LockKey::object(
2046                            root_store.store_object_id(),
2047                            root_store.root_directory_object_id(),
2048                        )],
2049                        Options::default(),
2050                    )
2051                    .await
2052                    .expect("new_transaction failed");
2053                let handle = root_directory
2054                    .create_child_file(&mut transaction, &format!("{}", i))
2055                    .await
2056                    .expect("create_child_file failed");
2057                transaction.commit().await.expect("commit failed");
2058                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2059                buf.as_mut_slice().copy_from_slice(TEST_DATA);
2060                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2061                object_ids.push(handle.object_id());
2062            }
2063        }
2064        fs.close().await.expect("fs close failed");
2065        let device = fs.take_device().await;
2066        device.reopen(false);
2067        let fs = FxFilesystem::open(device).await.expect("open failed");
2068        fsck(fs.clone()).await.expect("fsck failed");
2069        {
2070            let root_store = fs.root_store();
2071            // Check the first object, which should exist.
2072            for &object_id in &object_ids[0..1] {
2073                let handle = ObjectStore::open_object(
2074                    &root_store,
2075                    object_id,
2076                    HandleOptions::default(),
2077                    None,
2078                )
2079                .await
2080                .expect("open_object failed");
2081                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2082                assert_eq!(
2083                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2084                    TEST_DATA.len()
2085                );
2086                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2087            }
2088
2089            // Write one more object and sync.
2090            let root_directory =
2091                Directory::open(&root_store, root_store.root_directory_object_id())
2092                    .await
2093                    .expect("open failed");
2094            let mut transaction = fs
2095                .clone()
2096                .new_transaction(
2097                    lock_keys![LockKey::object(
2098                        root_store.store_object_id(),
2099                        root_store.root_directory_object_id(),
2100                    )],
2101                    Options::default(),
2102                )
2103                .await
2104                .expect("new_transaction failed");
2105            let handle = root_directory
2106                .create_child_file(&mut transaction, "test2")
2107                .await
2108                .expect("create_child_file failed");
2109            transaction.commit().await.expect("commit failed");
2110            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2111            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2112            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2113            fs.sync(SyncOptions::default()).await.expect("sync failed");
2114            object_ids.push(handle.object_id());
2115        }
2116
2117        fs.close().await.expect("close failed");
2118        let device = fs.take_device().await;
2119        device.reopen(false);
2120        let fs = FxFilesystem::open(device).await.expect("open failed");
2121        {
2122            fsck(fs.clone()).await.expect("fsck failed");
2123
2124            // Check the first and the last objects.
2125            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
2126                let handle = ObjectStore::open_object(
2127                    &fs.root_store(),
2128                    object_id,
2129                    HandleOptions::default(),
2130                    None,
2131                )
2132                .await
2133                .unwrap_or_else(|e| {
2134                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2135                });
2136                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2137                assert_eq!(
2138                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2139                    TEST_DATA.len()
2140                );
2141                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2142            }
2143        }
2144        fs.close().await.expect("close failed");
2145    }
2146
2147    #[fuchsia::test]
2148    async fn test_discard() {
2149        let device = {
2150            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2151            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2152            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2153
2154            let store = root_volume
2155                .new_volume("test", NewChildStoreOptions::default())
2156                .await
2157                .expect("new_volume failed");
2158            let root_directory = Directory::open(&store, store.root_directory_object_id())
2159                .await
2160                .expect("open failed");
2161
2162            // Create enough data so that another journal extent is used.
2163            let mut i = 0;
2164            loop {
2165                let mut transaction = fs
2166                    .clone()
2167                    .new_transaction(
2168                        lock_keys![LockKey::object(
2169                            store.store_object_id(),
2170                            store.root_directory_object_id()
2171                        )],
2172                        Options::default(),
2173                    )
2174                    .await
2175                    .expect("new_transaction failed");
2176                root_directory
2177                    .create_child_file(&mut transaction, &format!("a {i}"))
2178                    .await
2179                    .expect("create_child_file failed");
2180                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2181                    break;
2182                }
2183                i += 1;
2184            }
2185
2186            // Compact and then disable compactions.
2187            fs.journal().compact().await.expect("compact failed");
2188            fs.journal().stop_compactions().await;
2189
2190            // Keep going until we need another journal extent.
2191            let mut i = 0;
2192            loop {
2193                let mut transaction = fs
2194                    .clone()
2195                    .new_transaction(
2196                        lock_keys![LockKey::object(
2197                            store.store_object_id(),
2198                            store.root_directory_object_id()
2199                        )],
2200                        Options::default(),
2201                    )
2202                    .await
2203                    .expect("new_transaction failed");
2204                root_directory
2205                    .create_child_file(&mut transaction, &format!("b {i}"))
2206                    .await
2207                    .expect("create_child_file failed");
2208                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2209                    break;
2210                }
2211                i += 1;
2212            }
2213
2214            // Allow the journal to flush, but we don't want to sync.
2215            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2216            // Because we're not gracefully closing the filesystem, a Discard record will be
2217            // emitted.
2218            fs.device().snapshot().expect("snapshot failed")
2219        };
2220
2221        let fs = FxFilesystem::open(device).await.expect("open failed");
2222
2223        {
2224            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2225
2226            let store =
2227                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2228
2229            let root_directory = Directory::open(&store, store.root_directory_object_id())
2230                .await
2231                .expect("open failed");
2232
2233            // Write one more transaction.
2234            let mut transaction = fs
2235                .clone()
2236                .new_transaction(
2237                    lock_keys![LockKey::object(
2238                        store.store_object_id(),
2239                        store.root_directory_object_id()
2240                    )],
2241                    Options::default(),
2242                )
2243                .await
2244                .expect("new_transaction failed");
2245            root_directory
2246                .create_child_file(&mut transaction, "d")
2247                .await
2248                .expect("create_child_file failed");
2249            transaction.commit().await.expect("commit failed");
2250        }
2251
2252        fs.close().await.expect("close failed");
2253        let device = fs.take_device().await;
2254        device.reopen(false);
2255
2256        let fs = FxFilesystem::open(device).await.expect("open failed");
2257        fsck(fs.clone()).await.expect("fsck failed");
2258        fs.close().await.expect("close failed");
2259    }
2260}
2261
2262#[cfg(fuzz)]
2263mod fuzz {
2264    use fuzz::fuzz;
2265
2266    #[fuzz]
2267    fn fuzz_journal_bytes(input: Vec<u8>) {
2268        use crate::filesystem::FxFilesystem;
2269        use fuchsia_async as fasync;
2270        use std::io::Write;
2271        use storage_device::DeviceHolder;
2272        use storage_device::fake_device::FakeDevice;
2273
2274        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2275            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2276            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2277            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
2278            fs.close().await.expect("close failed");
2279            let device = fs.take_device().await;
2280            device.reopen(false);
2281            if let Ok(fs) = FxFilesystem::open(device).await {
2282                // `close()` can fail if there were objects to be tombstoned. If such an object is
2283                // corrupted, compacting the journal will return an error.
2284                let _ = fs.close().await;
2285            }
2286        });
2287    }
2288
2289    #[fuzz]
2290    fn fuzz_journal(input: Vec<super::JournalRecord>) {
2291        use crate::filesystem::FxFilesystem;
2292        use fuchsia_async as fasync;
2293        use storage_device::DeviceHolder;
2294        use storage_device::fake_device::FakeDevice;
2295
2296        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2297            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2298            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2299            {
2300                let mut inner = fs.journal().inner.lock();
2301                for record in &input {
2302                    let _ = inner.writer.write_record(record);
2303                }
2304            }
2305            fs.close().await.expect("close failed");
2306            let device = fs.take_device().await;
2307            device.reopen(false);
2308            if let Ok(fs) = FxFilesystem::open(device).await {
2309                // `close()` can fail if there were objects to be tombstoned. If such an object is
2310                // corrupted, compacting the journal will return an error.
2311                let _ = fs.close().await;
2312            }
2313        });
2314    }
2315}