fxfs/object_store/journal.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! The journal is implemented as an ever extending file which contains variable length records
6//! that describe mutations to be applied to various objects.  The journal file consists of
7//! blocks, with a checksum at the end of each block, but otherwise it can be considered a
8//! continuous stream.
9//!
10//! The checksum is seeded with the checksum from the previous block.  To free space in the
11//! journal, records are replaced with sparse extents when it is known they are no longer
12//! needed to mount.
13//!
14//! At mount time, the journal is replayed: the mutations are applied into memory.
15//! Eventually, a checksum failure will indicate no more records exist to be replayed,
16//! at which point the mount can continue and the journal will be extended from that point with
17//! further mutations as required.
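//!
//! As a rough sketch of how the per-block checksums chain together (illustrative only; the real
//! reading logic lives in the `reader` module):
//!
//! ```ignore
//! // Walks 4 KiB blocks until the checksum chain breaks; that point is the end of the valid
//! // journal stream.  `fletcher64` stands in for whatever seedable checksum the crate uses,
//! // the u64 checksum is assumed to occupy the last eight bytes of each block, and `seed`
//! // starts as the checksum recorded in the super-block's journal checkpoint.
//! fn find_journal_end(blocks: &[[u8; BLOCK_SIZE as usize]], mut seed: u64) -> u64 {
//!     let mut offset = 0;
//!     for block in blocks {
//!         let (payload, trailer) = block.split_at(block.len() - 8);
//!         let stored = u64::from_le_bytes(trailer.try_into().unwrap());
//!         let computed = fletcher64(payload, seed); // seeded by the previous block's checksum
//!         if computed != stored {
//!             break;
//!         }
//!         seed = computed;
//!         offset += BLOCK_SIZE;
//!     }
//!     offset
//! }
//! ```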
18
19mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::errors::FxfsError;
27use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
28use crate::log::*;
29use crate::lsm_tree::cache::NullCache;
30use crate::lsm_tree::types::Layer;
31use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
32use crate::object_store::allocator::Allocator;
33use crate::object_store::data_object_handle::OverwriteOptions;
34use crate::object_store::extent_record::{
35    DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
36};
37use crate::object_store::graveyard::Graveyard;
38use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
39use crate::object_store::journal::checksum_list::ChecksumList;
40use crate::object_store::journal::reader::{JournalReader, ReadResult};
41use crate::object_store::journal::super_block::{
42    MIN_SUPER_BLOCK_SIZE, SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
43};
44use crate::object_store::journal::writer::JournalWriter;
45use crate::object_store::object_manager::ObjectManager;
46use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
47use crate::object_store::transaction::{
48    AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
49    MutationV47, MutationV49, MutationV50, ObjectStoreMutation, Options,
50    TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
51};
52use crate::object_store::{
53    AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
54    NewChildStoreOptions, ObjectStore,
55};
56use crate::range::RangeExt;
57use crate::round::{round_div, round_down};
58use crate::serialized_types::{LATEST_VERSION, Migrate, Version, Versioned, migrate_to_version};
59use anyhow::{Context, Error, anyhow, bail, ensure};
60use event_listener::Event;
61use fprint::TypeFingerprint;
62use fuchsia_sync::Mutex;
63use futures::FutureExt as _;
64use futures::future::poll_fn;
65use rustc_hash::FxHashMap as HashMap;
66use serde::{Deserialize, Serialize};
67use static_assertions::const_assert;
68use std::clone::Clone;
69use std::collections::HashSet;
70use std::ops::{Bound, Range};
71use std::sync::atomic::{AtomicBool, Ordering};
72use std::sync::{Arc, OnceLock};
73use std::task::{Poll, Waker};
74use storage_device::Device;
75
76// The journal file is written to in blocks of this size.
77pub const BLOCK_SIZE: u64 = 4096;
78
79// The journal file is extended by this amount when necessary.
80const CHUNK_SIZE: u64 = 131_072;
81const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);
82
83// See the comment for the `reclaim_size` member of Inner.
84pub const DEFAULT_RECLAIM_SIZE: u64 = 262_144;
85
86// Temporary space that should be reserved for the journal.  For example: space that is currently
87// used in the journal file but cannot be deallocated yet because we are flushing.
88pub const RESERVED_SPACE: u64 = 1_048_576;
89
90// Whenever the journal is replayed (i.e. the system is unmounted and remounted), we reset the
91// journal stream, at which point any half-complete transactions are discarded.  We indicate a
92// journal reset by XORing the previous block's checksum with this mask, and using that value as a
93// seed for the next journal block.
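// For example, if the checksum of the last block written before unmounting was `c`, the first
// block written after remounting is seeded with `c ^ RESET_XOR`, i.e. `!c` (the mask is all
// ones), which distinguishes post-reset blocks from any stale blocks that were written with the
// old seed.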
94const RESET_XOR: u64 = 0xffffffffffffffff;
95
96// To keep track of offsets within a journal file, we need both the file offset and the checksum
97// of the preceding block, since each block's checksum is seeded with the checksum of the block
98// before it.
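// In practice this means that to resume reading from a checkpoint, a reader starts at the
// beginning of the block containing `file_offset`, seeds that block's checksum computation with
// `checksum`, and deserializes records using `version`.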
99pub type JournalCheckpoint = JournalCheckpointV32;
100
101#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
102pub struct JournalCheckpointV32 {
103    pub file_offset: u64,
104
105    // Starting checksum for the block that contains file_offset, i.e. the checksum of the
106    // previous block.
107    pub checksum: Checksum,
108
109    // If versioned, the version of elements stored in the journal. e.g. JournalRecord version.
110    // This can change across reset events so we store it along with the offset and checksum to
111    // know which version to deserialize.
112    pub version: Version,
113}
114
115pub type JournalRecord = JournalRecordV50;
116
117#[allow(clippy::large_enum_variant)]
118#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
119#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
120pub enum JournalRecordV50 {
121    // Indicates no more records in this block.
122    EndBlock,
123    // Mutation for a particular object.  object_id here is for the collection i.e. the store or
124    // allocator.
125    Mutation { object_id: u64, mutation: MutationV50 },
126    // Commits records in the transaction.
127    Commit,
128    // Discard all mutations with offsets greater than or equal to the given offset.
129    Discard(u64),
130    // Indicates the device was flushed at the given journal offset.
131    // Note that this really means that at this point in the journal offset, we can be certain that
132    // there's no remaining buffered data in the block device; the buffers and the disk contents are
133    // consistent.
134    // We insert one of these records *after* a flush along with the *next* transaction to go
135    // through.  If that never comes (either due to graceful or hard shutdown), the journal reset
136    // on the next mount will serve the same purpose and count as a flush, although it is necessary
137    // to defensively flush the device before replaying the journal (if possible, i.e. not
138    // read-only) in case the block device connection was reused.
139    DidFlushDevice(u64),
140    // Checksums for a data range written by this transaction. A transaction is only valid if these
141    // checksums are right. The range gives the device offsets that the checksums cover.
142    //
143    // A boolean indicates whether this range is being written to for the first time. For overwrite
144    // extents, we only check the checksums for a block if it has been written to for the first
145    // time since the last flush, because otherwise we can't roll it back anyway so it doesn't
146    // matter. For copy-on-write extents, the bool is always true.
147    DataChecksums(Range<u64>, ChecksumsV38, bool),
148}
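
// A committed transaction shows up in the journal as a run of records terminated by a `Commit`.
// As an illustrative (not exhaustive) example, a single transaction might serialize as:
//
//     Mutation { object_id, mutation }       // repeated, one per mutation in the transaction
//     DataChecksums(device_range, sums, ..)  // only for data writes that carry checksums
//     Commit                                 // makes the preceding records take effect
//
// with `EndBlock` appearing wherever a block is padded out.  Records that are not followed by a
// `Commit` belong to a half-complete transaction and are discarded during replay.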
149
150#[allow(clippy::large_enum_variant)]
151#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
152#[migrate_to_version(JournalRecordV50)]
153pub enum JournalRecordV49 {
154    EndBlock,
155    Mutation { object_id: u64, mutation: MutationV49 },
156    Commit,
157    Discard(u64),
158    DidFlushDevice(u64),
159    DataChecksums(Range<u64>, ChecksumsV38, bool),
160}
161
162#[allow(clippy::large_enum_variant)]
163#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
164#[migrate_to_version(JournalRecordV49)]
165pub enum JournalRecordV47 {
166    EndBlock,
167    Mutation { object_id: u64, mutation: MutationV47 },
168    Commit,
169    Discard(u64),
170    DidFlushDevice(u64),
171    DataChecksums(Range<u64>, ChecksumsV38, bool),
172}
173
174#[allow(clippy::large_enum_variant)]
175#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
176#[migrate_to_version(JournalRecordV47)]
177pub enum JournalRecordV46 {
178    EndBlock,
179    Mutation { object_id: u64, mutation: MutationV46 },
180    Commit,
181    Discard(u64),
182    DidFlushDevice(u64),
183    DataChecksums(Range<u64>, ChecksumsV38, bool),
184}
185
186#[allow(clippy::large_enum_variant)]
187#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
188#[migrate_to_version(JournalRecordV46)]
189pub enum JournalRecordV43 {
190    EndBlock,
191    Mutation { object_id: u64, mutation: MutationV43 },
192    Commit,
193    Discard(u64),
194    DidFlushDevice(u64),
195    DataChecksums(Range<u64>, ChecksumsV38, bool),
196}
197
198#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
199#[migrate_to_version(JournalRecordV43)]
200pub enum JournalRecordV42 {
201    EndBlock,
202    Mutation { object_id: u64, mutation: MutationV41 },
203    Commit,
204    Discard(u64),
205    DidFlushDevice(u64),
206    DataChecksums(Range<u64>, ChecksumsV38, bool),
207}
208
209#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
210pub enum JournalRecordV41 {
211    EndBlock,
212    Mutation { object_id: u64, mutation: MutationV41 },
213    Commit,
214    Discard(u64),
215    DidFlushDevice(u64),
216    DataChecksums(Range<u64>, ChecksumsV38),
217}
218
219impl From<JournalRecordV41> for JournalRecordV42 {
220    fn from(record: JournalRecordV41) -> Self {
221        match record {
222            JournalRecordV41::EndBlock => Self::EndBlock,
223            JournalRecordV41::Mutation { object_id, mutation } => {
224                Self::Mutation { object_id, mutation: mutation.into() }
225            }
226            JournalRecordV41::Commit => Self::Commit,
227            JournalRecordV41::Discard(offset) => Self::Discard(offset),
228            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
229            JournalRecordV41::DataChecksums(range, sums) => {
230                // At the time of writing the only extents written by real systems are CoW extents
231                // so the new bool is always true.
232                Self::DataChecksums(range, sums, true)
233            }
234        }
235    }
236}
237
238#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
239#[migrate_to_version(JournalRecordV41)]
240pub enum JournalRecordV40 {
241    EndBlock,
242    Mutation { object_id: u64, mutation: MutationV40 },
243    Commit,
244    Discard(u64),
245    DidFlushDevice(u64),
246    DataChecksums(Range<u64>, ChecksumsV38),
247}
248
249pub(super) fn journal_handle_options() -> HandleOptions {
250    HandleOptions { skip_journal_checks: true, ..Default::default() }
251}
252
253/// The journal records a stream of mutations that are to be applied to other objects.  At mount
254/// time, these records can be replayed into memory.  It provides a way to quickly persist changes
255/// without having to make a large number of writes; they can be deferred to a later time (e.g.
256/// when a sufficient number have been queued).  It also provides support for transactions, the
257/// ability to have mutations that are to be applied atomically together.
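///
/// A rough sketch of the mount-time flow (the locals here are illustrative; the surrounding
/// filesystem code is what actually drives this):
///
/// ```ignore
/// let journal = Journal::new(object_manager.clone(), JournalOptions::default());
/// // Replaying applies the journaled mutations into memory for an existing device...
/// journal.replay(filesystem.clone(), None).await?;
/// // ...whereas a freshly formatted device would be initialized with `init_empty` instead.
/// ```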
258pub struct Journal {
259    objects: Arc<ObjectManager>,
260    handle: OnceLock<DataObjectHandle<ObjectStore>>,
261    super_block_manager: SuperBlockManager,
262    inner: Mutex<Inner>,
263    writer_mutex: Mutex<()>,
264    sync_mutex: futures::lock::Mutex<()>,
265    trace: AtomicBool,
266
267    // This event is used when we are waiting for a compaction to free up journal space.
268    reclaim_event: Event,
269}
270
271struct Inner {
272    super_block_header: SuperBlockHeader,
273
274    // The offset that we can zero the journal up to now that it is no longer needed.
275    zero_offset: Option<u64>,
276
277    // The journal offset that we most recently flushed to the device.
278    device_flushed_offset: u64,
279
280    // If true, indicates a DidFlushDevice record is pending.
281    needs_did_flush_device: bool,
282
283    // The writer for the journal.
284    writer: JournalWriter,
285
286    // Set when a reset is encountered during a read.
287    // Used at write pre_commit() time to ensure we write a version first thing after a reset.
288    output_reset_version: bool,
289
290    // Waker for the flush task.
291    flush_waker: Option<Waker>,
292
293    // Indicates the journal has been terminated.
294    terminate: bool,
295
296    // Latched error indicating reason for journal termination if not graceful.
297    terminate_reason: Option<Error>,
298
299    // Disable compactions.
300    disable_compactions: bool,
301
302    // True if compactions are running.
303    compaction_running: bool,
304
305    // Waker for the sync task for when it's waiting for the flush task to finish.
306    sync_waker: Option<Waker>,
307
308    // The last offset we flushed to the journal file.
309    flushed_offset: u64,
310
311    // The last offset that should be considered valid in the journal file.  Most of the time, this
312    // will be the same as `flushed_offset` but at mount time, this could be less and will only be
313    // up to the end of the last valid transaction; it won't include transactions that follow that
314    // have been discarded.
315    valid_to: u64,
316
317    // If, after replaying, we have to discard a number of mutations (because they don't validate),
318    // this offset specifies where we need to discard back to.  This is so that when we next replay,
319    // we ignore those mutations and continue with new good mutations.
320    discard_offset: Option<u64>,
321
322    // In the steady state, the journal should fluctuate between being approximately half of this
323    // number and this number.  New super-blocks will be written every time about half of this
324    // amount is written to the journal.
325    reclaim_size: u64,
326
327    image_builder_mode: Option<SuperBlockInstance>,
328
329    // If true and `needs_barrier`, issue a pre-barrier on the first device write of each journal
330    // write (which happens in multiples of `BLOCK_SIZE`). This ensures that all the corresponding
331    // data writes make it to disk before the journal gets written to.
332    barriers_enabled: bool,
333
334    // If true, indicates that data write requests have been made to the device since the last
335    // journal write.
336    needs_barrier: bool,
337}
338
339impl Inner {
340    fn terminate(&mut self, reason: Option<Error>) {
341        self.terminate = true;
342
343        if let Some(err) = reason {
344            error!(error:? = err; "Terminating journal");
345            // Log previous error if one was already set, otherwise latch the error.
346            if let Some(prev_err) = self.terminate_reason.as_ref() {
347                error!(error:? = prev_err; "Journal previously terminated");
348            } else {
349                self.terminate_reason = Some(err);
350            }
351        }
352
353        if let Some(waker) = self.flush_waker.take() {
354            waker.wake();
355        }
356        if let Some(waker) = self.sync_waker.take() {
357            waker.wake();
358        }
359    }
360}
361
362pub struct JournalOptions {
363    /// In the steady state, the journal should fluctuate between being approximately half of this
364    /// number and this number.  New super-blocks will be written every time about half of this
365    /// amount is written to the journal.
366    pub reclaim_size: u64,
367
368    // If true, issue a pre-barrier on the first device write of each journal write (which happens
369    // in multiples of `BLOCK_SIZE`). This ensures that all the corresponding data writes make it
370    // to disk before the journal gets written to.
371    pub barriers_enabled: bool,
372}
373
374impl Default for JournalOptions {
375    fn default() -> Self {
376        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
377    }
378}
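
// To illustrate the `reclaim_size` arithmetic: with the default of 262,144 bytes, a new
// super-block is written roughly every 128 KiB of journal data, so the live portion of the
// journal fluctuates between roughly 128 KiB and 256 KiB.  Callers that want different behavior
// can override individual fields with struct-update syntax, e.g. (illustrative):
//
//     JournalOptions { barriers_enabled: true, ..Default::default() }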
379
380struct JournaledTransactions {
381    transactions: Vec<JournaledTransaction>,
382    device_flushed_offset: u64,
383}
384
385#[derive(Debug, Default)]
386pub struct JournaledTransaction {
387    pub checkpoint: JournalCheckpoint,
388    pub root_parent_mutations: Vec<Mutation>,
389    pub root_mutations: Vec<Mutation>,
390    /// List of (store_object_id, mutation).
391    pub non_root_mutations: Vec<(u64, Mutation)>,
392    pub end_offset: u64,
393    pub checksums: Vec<JournaledChecksums>,
394
395    /// Records offset + 1 of the matching begin_flush transaction. The +1 is because we want to
396    /// ignore the begin_flush transaction itself; we don't need or want to replay it because the
397    /// flush it began has already completed.
398    pub end_flush: Option<(/* store_id: */ u64, /* begin offset: */ u64)>,
399
400    /// The volume which was deleted in this transaction, if any.
401    pub volume_deleted: Option</* store_id: */ u64>,
402}
403
404impl JournaledTransaction {
405    fn new(checkpoint: JournalCheckpoint) -> Self {
406        Self { checkpoint, ..Default::default() }
407    }
408}
409
410const VOLUME_DELETED: u64 = u64::MAX;
411
412#[derive(Debug)]
413pub struct JournaledChecksums {
414    pub device_range: Range<u64>,
415    pub checksums: Checksums,
416    pub first_write: bool,
417}
418
419/// Handles for journal-like objects have some additional functionality to manage their extents,
420/// since during replay we need to add extents as we find them.
421pub trait JournalHandle: ReadObjectHandle {
422    /// The end offset of the last extent in the JournalHandle.  Used only for validating extents
423    /// (which will be skipped if None is returned).
424    /// Note this is equivalent in value to ReadObjectHandle::get_size, when present.
425    fn end_offset(&self) -> Option<u64>;
426    /// Adds an extent to the current end of the journal stream.
427    /// `added_offset` is the offset into the journal of the transaction which added this extent,
428    /// used for discard_extents.
429    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
430    /// Discards all extents which were added in a transaction at offset >= |discard_offset|.
431    fn discard_extents(&mut self, discard_offset: u64);
432}
433
434// Provide a stub implementation for DataObjectHandle so we can use it in
435// Journal::read_transactions.  Manual extent management is a NOP (which is OK since presumably the
436// DataObjectHandle already knows where its extents live).
437impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
438    fn end_offset(&self) -> Option<u64> {
439        None
440    }
441    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
442        // NOP
443    }
444    fn discard_extents(&mut self, _discard_offset: u64) {
445        // NOP
446    }
447}
448
449#[fxfs_trace::trace]
450impl Journal {
451    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
452        let starting_checksum = rand::random_range(1..u64::MAX);
453        Journal {
454            objects,
455            handle: OnceLock::new(),
456            super_block_manager: SuperBlockManager::new(),
457            inner: Mutex::new(Inner {
458                super_block_header: SuperBlockHeader::default(),
459                zero_offset: None,
460                device_flushed_offset: 0,
461                needs_did_flush_device: false,
462                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
463                output_reset_version: false,
464                flush_waker: None,
465                terminate: false,
466                terminate_reason: None,
467                disable_compactions: false,
468                compaction_running: false,
469                sync_waker: None,
470                flushed_offset: 0,
471                valid_to: 0,
472                discard_offset: None,
473                reclaim_size: options.reclaim_size,
474                image_builder_mode: None,
475                barriers_enabled: options.barriers_enabled,
476                needs_barrier: false,
477            }),
478            writer_mutex: Mutex::new(()),
479            sync_mutex: futures::lock::Mutex::new(()),
480            trace: AtomicBool::new(false),
481            reclaim_event: Event::new(),
482        }
483    }
484
485    pub fn set_trace(&self, trace: bool) {
486        let old_value = self.trace.swap(trace, Ordering::Relaxed);
487        if trace != old_value {
488            info!(trace; "J: trace");
489        }
490    }
491
492    pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
493        self.inner.lock().image_builder_mode = mode;
494        if let Some(instance) = mode {
495            *self.super_block_manager.next_instance.lock() = instance;
496        }
497    }
498
499    pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
500        self.inner.lock().image_builder_mode
501    }
502
503    #[cfg(feature = "migration")]
504    pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
505        ensure!(
506            self.inner.lock().image_builder_mode.is_some(),
507            "Can only set filesystem uuid in image builder mode."
508        );
509        self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
510        Ok(())
511    }
512
513    pub(crate) async fn read_superblocks(
514        &self,
515        device: Arc<dyn Device>,
516        block_size: u64,
517    ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
518        self.super_block_manager.load(device, block_size).await
519    }
520
521    /// Used during replay to validate a mutation.  This should return false if the mutation is not
522    /// valid and should not be applied.  This could be for benign reasons (e.g. the device flushed
523    /// data out-of-order) or because of a malicious actor.
524    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
525        match mutation {
526            Mutation::ObjectStore(ObjectStoreMutation {
527                item:
528                    Item {
529                        key:
530                            ObjectKey {
531                                data:
532                                    ObjectKeyData::Attribute(
533                                        _,
534                                        AttributeKey::Extent(ExtentKey { range }),
535                                    ),
536                                ..
537                            },
538                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
539                        ..
540                    },
541                ..
542            }) => {
543                if range.is_empty() || !range.is_aligned(block_size) {
544                    return false;
545                }
546                let len = range.length().unwrap();
547                if let ExtentMode::Cow(checksums) = mode {
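                    // The checksums must evenly divide the extent and each checksum must cover a
                    // whole number of blocks.  For example, with a 4 KiB block size, a 16 KiB
                    // extent carrying four checksums is fine (4096 bytes per checksum), whereas
                    // three checksums would be rejected.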
548                    if checksums.len() > 0 {
549                        if len % checksums.len() as u64 != 0 {
550                            return false;
551                        }
552                        if (len / checksums.len() as u64) % block_size != 0 {
553                            return false;
554                        }
555                    }
556                }
557                if *device_offset % block_size != 0
558                    || *device_offset >= device_size
559                    || device_size - *device_offset < len
560                {
561                    return false;
562                }
563            }
564            Mutation::ObjectStore(_) => {}
565            Mutation::EncryptedObjectStore(_) => {}
566            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
567                return !device_range.is_empty()
568                    && *owner_object_id != INVALID_OBJECT_ID
569                    && device_range.end <= device_size;
570            }
571            Mutation::Allocator(AllocatorMutation::Deallocate {
572                device_range,
573                owner_object_id,
574            }) => {
575                return !device_range.is_empty()
576                    && *owner_object_id != INVALID_OBJECT_ID
577                    && device_range.end <= device_size;
578            }
579            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
580                return *owner_object_id != INVALID_OBJECT_ID;
581            }
582            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
583                return *owner_object_id != INVALID_OBJECT_ID;
584            }
585            Mutation::BeginFlush => {}
586            Mutation::EndFlush => {}
587            Mutation::DeleteVolume => {}
588            Mutation::UpdateBorrowed(_) => {}
589            Mutation::UpdateMutationsKey(_) => {}
590            Mutation::CreateInternalDir(owner_object_id) => {
591                return *owner_object_id != INVALID_OBJECT_ID;
592            }
593        }
594        true
595    }
596
597    // Assumes that `mutation` has been validated.
598    fn update_checksum_list(
599        &self,
600        journal_offset: u64,
601        mutation: &Mutation,
602        checksum_list: &mut ChecksumList,
603    ) -> Result<(), Error> {
604        match mutation {
605            Mutation::ObjectStore(_) => {}
606            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
607                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
608            }
609            _ => {}
610        }
611        Ok(())
612    }
613
614    /// Reads the latest super-block, and then replays journaled records.
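    ///
    /// Replay proceeds in phases: the journaled transactions are read and validated (mutation
    /// sanity checks plus data checksum verification), root parent store mutations are applied,
    /// the root store and allocator are opened, the remaining mutations are applied, and finally
    /// the journal writer is positioned just past the last valid transaction so that new
    /// mutations extend the stream from there.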
615    #[trace]
616    pub async fn replay(
617        &self,
618        filesystem: Arc<FxFilesystem>,
619        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
620    ) -> Result<(), Error> {
621        let block_size = filesystem.block_size();
622
623        let (super_block, root_parent) =
624            self.super_block_manager.load(filesystem.device(), block_size).await?;
625
626        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));
627
628        self.objects.set_root_parent_store(root_parent.clone());
629        let allocator =
630            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
631        if let Some(on_new_allocator) = on_new_allocator {
632            on_new_allocator(allocator.clone());
633        }
634        self.objects.set_allocator(allocator.clone());
635        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
636        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
637        {
638            let mut inner = self.inner.lock();
639            inner.super_block_header = super_block.clone();
640        }
641
642        let device = filesystem.device();
643
644        let mut handle;
645        {
646            let root_parent_layer = root_parent.tree().mutable_layer();
647            let mut iter = root_parent_layer
648                .seek(Bound::Included(&ObjectKey::attribute(
649                    super_block.journal_object_id,
650                    DEFAULT_DATA_ATTRIBUTE_ID,
651                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
652                        super_block.journal_checkpoint.file_offset,
653                        BLOCK_SIZE,
654                    ))),
655                )))
656                .await
657                .context("Failed to seek root parent store")?;
658            let start_offset = if let Some(ItemRef {
659                key:
660                    ObjectKey {
661                        data:
662                            ObjectKeyData::Attribute(
663                                DEFAULT_DATA_ATTRIBUTE_ID,
664                                AttributeKey::Extent(ExtentKey { range }),
665                            ),
666                        ..
667                    },
668                ..
669            }) = iter.get()
670            {
671                range.start
672            } else {
673                0
674            };
675            handle = BootstrapObjectHandle::new_with_start_offset(
676                super_block.journal_object_id,
677                device.clone(),
678                start_offset,
679            );
680            while let Some(item) = iter.get() {
681                if !match item.into() {
682                    Some((
683                        object_id,
684                        DEFAULT_DATA_ATTRIBUTE_ID,
685                        ExtentKey { range },
686                        ExtentValue::Some { device_offset, .. },
687                    )) if object_id == super_block.journal_object_id => {
688                        if let Some(end_offset) = handle.end_offset() {
689                            if range.start != end_offset {
690                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
691                                    "Unexpected journal extent {:?}, expected start: {}",
692                                    item, end_offset
693                                )));
694                            }
695                        }
696                        handle.push_extent(
697                            0, // We never discard extents from the root parent store.
698                            *device_offset
699                                ..*device_offset + range.length().context("Invalid extent")?,
700                        );
701                        true
702                    }
703                    _ => false,
704                } {
705                    break;
706                }
707                iter.advance().await.context("Failed to advance root parent store iterator")?;
708            }
709        }
710
711        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
712        let JournaledTransactions { mut transactions, device_flushed_offset } = self
713            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
714            .await
715            .context("Reading transactions for replay")?;
716
717        // Validate all the mutations.
718        let mut checksum_list = ChecksumList::new(device_flushed_offset);
719        let mut valid_to = reader.journal_file_checkpoint().file_offset;
720        let device_size = device.size();
721        'bad_replay: for JournaledTransaction {
722            checkpoint,
723            root_parent_mutations,
724            root_mutations,
725            non_root_mutations,
726            checksums,
727            ..
728        } in &transactions
729        {
730            for JournaledChecksums { device_range, checksums, first_write } in checksums {
731                checksum_list
732                    .push(
733                        checkpoint.file_offset,
734                        device_range.clone(),
735                        checksums.maybe_as_ref().context("Malformed checksums")?,
736                        *first_write,
737                    )
738                    .context("Pushing journal checksum records to checksum list")?;
739            }
740            for mutation in root_parent_mutations
741                .iter()
742                .chain(root_mutations)
743                .chain(non_root_mutations.iter().map(|(_, m)| m))
744            {
745                if !self.validate_mutation(mutation, block_size, device_size) {
746                    info!(mutation:?; "Stopping replay at bad mutation");
747                    valid_to = checkpoint.file_offset;
748                    break 'bad_replay;
749                }
750                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
751            }
752        }
753
754        // Validate the checksums. Note that if barriers are enabled, there will in practice be no
755        // checksums to verify.
756        let valid_to = checksum_list
757            .verify(device.as_ref(), valid_to)
758            .await
759            .context("Failed to validate checksums")?;
760
761        // Apply the mutations...
762
763        let mut last_checkpoint = reader.journal_file_checkpoint();
764        let mut journal_offsets = super_block.journal_file_offsets.clone();
765
766        // Start with the root-parent mutations, and also determine the journal offsets for all
767        // other objects.
768        for (
769            index,
770            JournaledTransaction {
771                checkpoint,
772                root_parent_mutations,
773                end_flush,
774                volume_deleted,
775                ..
776            },
777        ) in transactions.iter_mut().enumerate()
778        {
779            if checkpoint.file_offset >= valid_to {
780                last_checkpoint = checkpoint.clone();
781
782                // Truncate the transactions so we don't need to worry about them on the next pass.
783                transactions.truncate(index);
784                break;
785            }
786
787            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
788            for mutation in root_parent_mutations.drain(..) {
789                self.objects
790                    .apply_mutation(
791                        super_block.root_parent_store_object_id,
792                        mutation,
793                        &context,
794                        AssocObj::None,
795                    )
796                    .context("Failed to replay root parent store mutations")?;
797            }
798
799            if let Some((object_id, journal_offset)) = end_flush {
800                journal_offsets.insert(*object_id, *journal_offset);
801            }
802
803            if let Some(object_id) = volume_deleted {
804                journal_offsets.insert(*object_id, VOLUME_DELETED);
805            }
806        }
807
808        // Now we can open the root store.
809        let root_store = ObjectStore::open(
810            &root_parent,
811            super_block.root_store_object_id,
812            Box::new(NullCache {}),
813        )
814        .await
815        .context("Unable to open root store")?;
816
817        ensure!(
818            !root_store.is_encrypted(),
819            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
820        );
821        self.objects.set_root_store(root_store);
822
823        let root_store_offset =
824            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);
825
826        // Now replay the root store mutations.
827        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
828            if checkpoint.file_offset < root_store_offset {
829                continue;
830            }
831
832            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
833            for mutation in root_mutations.drain(..) {
834                self.objects
835                    .apply_mutation(
836                        super_block.root_store_object_id,
837                        mutation,
838                        &context,
839                        AssocObj::None,
840                    )
841                    .context("Failed to replay root store mutations")?;
842            }
843        }
844
845        // Now we can open the allocator.
846        allocator.open().await.context("Failed to open allocator")?;
847
848        // Now replay all other mutations.
849        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
850        {
851            self.objects
852                .replay_mutations(
853                    non_root_mutations,
854                    &journal_offsets,
855                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
856                    end_offset,
857                )
858                .await
859                .context("Failed to replay mutations")?;
860        }
861
862        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;
863
864        let discarded_to =
865            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
866                Some(reader.journal_file_checkpoint().file_offset)
867            } else {
868                None
869            };
870
871        // Configure the journal writer so that we can continue.
872        {
873            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
874                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
875                    "journal replay cut short; journal finishes at {}, but super-block was \
876                     written at {}",
877                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
878                )));
879            }
880            let handle = ObjectStore::open_object(
881                &root_parent,
882                super_block.journal_object_id,
883                journal_handle_options(),
884                None,
885            )
886            .await
887            .with_context(|| {
888                format!(
889                    "Failed to open journal file (object id: {})",
890                    super_block.journal_object_id
891                )
892            })?;
893            let _ = self.handle.set(handle);
894            let mut inner = self.inner.lock();
895            reader.skip_to_end_of_block();
896            let mut writer_checkpoint = reader.journal_file_checkpoint();
897
898            // Make sure we don't accidentally use the reader from now onwards.
899            std::mem::drop(reader);
900
901            // Reset the stream to indicate that we've remounted the journal.
902            writer_checkpoint.checksum ^= RESET_XOR;
903            writer_checkpoint.version = LATEST_VERSION;
904            inner.flushed_offset = writer_checkpoint.file_offset;
905
906            // When we open the filesystem as writable, we flush the device.
907            inner.device_flushed_offset = inner.flushed_offset;
908
909            inner.writer.seek(writer_checkpoint);
910            inner.output_reset_version = true;
911            inner.valid_to = last_checkpoint.file_offset;
912            if last_checkpoint.file_offset < inner.flushed_offset {
913                inner.discard_offset = Some(last_checkpoint.file_offset);
914            }
915        }
916
917        self.objects
918            .on_replay_complete()
919            .await
920            .context("Failed to complete replay for object manager")?;
921
922        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
923        Ok(())
924    }
925
926    async fn read_transactions(
927        &self,
928        reader: &mut JournalReader,
929        end_offset: Option<u64>,
930        object_id_filter: u64,
931    ) -> Result<JournaledTransactions, Error> {
932        let mut transactions = Vec::new();
933        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
934            let super_block = &self.inner.lock().super_block_header;
935            (
936                super_block.super_block_journal_file_offset,
937                super_block.root_parent_store_object_id,
938                super_block.root_store_object_id,
939            )
940        };
941        let mut current_transaction = None;
942        let mut begin_flush_offsets = HashMap::default();
943        let mut stores_deleted = HashSet::new();
944        loop {
945            // Cache the checkpoint before we deserialize a record.
946            let checkpoint = reader.journal_file_checkpoint();
947            if let Some(end_offset) = end_offset {
948                if checkpoint.file_offset >= end_offset {
949                    break;
950                }
951            }
952            let result =
953                reader.deserialize().await.context("Failed to deserialize journal record")?;
954            match result {
955                ReadResult::Reset(_) => {
956                    if current_transaction.is_some() {
957                        current_transaction = None;
958                        transactions.pop();
959                    }
960                    let offset = reader.journal_file_checkpoint().file_offset;
961                    if offset > device_flushed_offset {
962                        device_flushed_offset = offset;
963                    }
964                }
965                ReadResult::Some(record) => {
966                    match record {
967                        JournalRecord::EndBlock => {
968                            reader.skip_to_end_of_block();
969                        }
970                        JournalRecord::Mutation { object_id, mutation } => {
971                            let current_transaction = match current_transaction.as_mut() {
972                                None => {
973                                    transactions.push(JournaledTransaction::new(checkpoint));
974                                    current_transaction = transactions.last_mut();
975                                    current_transaction.as_mut().unwrap()
976                                }
977                                Some(transaction) => transaction,
978                            };
979
980                            if stores_deleted.contains(&object_id) {
981                                bail!(
982                                    anyhow!(FxfsError::Inconsistent)
983                                        .context("Encountered mutations for deleted store")
984                                );
985                            }
986
987                            match &mutation {
988                                Mutation::BeginFlush => {
989                                    begin_flush_offsets.insert(
990                                        object_id,
991                                        current_transaction.checkpoint.file_offset,
992                                    );
993                                }
994                                Mutation::EndFlush => {
995                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
996                                        if let Some(deleted_volume) =
997                                            &current_transaction.volume_deleted
998                                        {
999                                            if *deleted_volume == object_id {
1000                                                bail!(anyhow!(FxfsError::Inconsistent).context(
1001                                                    "Multiple EndFlush/DeleteVolume mutations in a \
1002                                                    single transaction for the same object"
1003                                                ));
1004                                            }
1005                                        }
1006                                        // The +1 is because we don't want to replay the transaction
1007                                        // containing the begin flush.
1008                                        if current_transaction
1009                                            .end_flush
1010                                            .replace((object_id, offset + 1))
1011                                            .is_some()
1012                                        {
1013                                            bail!(anyhow!(FxfsError::Inconsistent).context(
1014                                                "Multiple EndFlush mutations in a \
1015                                                 single transaction"
1016                                            ));
1017                                        }
1018                                    }
1019                                }
1020                                Mutation::DeleteVolume => {
1021                                    if let Some((flushed_object, _)) =
1022                                        &current_transaction.end_flush
1023                                    {
1024                                        if *flushed_object == object_id {
1025                                            bail!(anyhow!(FxfsError::Inconsistent).context(
1026                                                "Multiple EndFlush/DeleteVolume mutations in a \
1027                                                    single transaction for the same object"
1028                                            ));
1029                                        }
1030                                    }
1031                                    if current_transaction
1032                                        .volume_deleted
1033                                        .replace(object_id)
1034                                        .is_some()
1035                                    {
1036                                        bail!(anyhow!(FxfsError::Inconsistent).context(
1037                                            "Multiple DeleteVolume mutations in a single \
1038                                             transaction"
1039                                        ));
1040                                    }
1041                                    stores_deleted.insert(object_id);
1042                                }
1043                                _ => {}
1044                            }
1045
1046                            // If this mutation doesn't need to be applied, don't bother adding it
1047                            // to the transaction.
1048                            if (object_id_filter == INVALID_OBJECT_ID
1049                                || object_id_filter == object_id)
1050                                && self.should_apply(object_id, &current_transaction.checkpoint)
1051                            {
1052                                if object_id == root_parent_store_object_id {
1053                                    current_transaction.root_parent_mutations.push(mutation);
1054                                } else if object_id == root_store_object_id {
1055                                    current_transaction.root_mutations.push(mutation);
1056                                } else {
1057                                    current_transaction
1058                                        .non_root_mutations
1059                                        .push((object_id, mutation));
1060                                }
1061                            }
1062                        }
1063                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
1064                            let current_transaction = match current_transaction.as_mut() {
1065                                None => {
1066                                    transactions.push(JournaledTransaction::new(checkpoint));
1067                                    current_transaction = transactions.last_mut();
1068                                    current_transaction.as_mut().unwrap()
1069                                }
1070                                Some(transaction) => transaction,
1071                            };
1072                            current_transaction.checksums.push(JournaledChecksums {
1073                                device_range,
1074                                checksums,
1075                                first_write,
1076                            });
1077                        }
1078                        JournalRecord::Commit => {
1079                            if let Some(&mut JournaledTransaction {
1080                                ref checkpoint,
1081                                ref root_parent_mutations,
1082                                ref mut end_offset,
1083                                ..
1084                            }) = current_transaction.take()
1085                            {
1086                                for mutation in root_parent_mutations {
1087                                    // Snoop the mutations for any that might apply to the journal
1088                                    // file and pass them to the reader so that it can continue
1089                                    // reading the journal file.
1090                                    if let Mutation::ObjectStore(ObjectStoreMutation {
1091                                        item:
1092                                            Item {
1093                                                key:
1094                                                    ObjectKey {
1095                                                        object_id,
1096                                                        data:
1097                                                            ObjectKeyData::Attribute(
1098                                                                DEFAULT_DATA_ATTRIBUTE_ID,
1099                                                                AttributeKey::Extent(ExtentKey {
1100                                                                    range,
1101                                                                }),
1102                                                            ),
1103                                                        ..
1104                                                    },
1105                                                value:
1106                                                    ObjectValue::Extent(ExtentValue::Some {
1107                                                        device_offset,
1108                                                        ..
1109                                                    }),
1110                                                ..
1111                                            },
1112                                        ..
1113                                    }) = mutation
1114                                    {
1115                                        // Add the journal extents we find on the way to our
1116                                        // reader.
1117                                        let handle = reader.handle();
1118                                        if *object_id != handle.object_id() {
1119                                            continue;
1120                                        }
1121                                        if let Some(end_offset) = handle.end_offset() {
1122                                            if range.start != end_offset {
1123                                                bail!(anyhow!(FxfsError::Inconsistent).context(
1124                                                    format!(
1125                                                        "Unexpected journal extent {:?} -> {}, \
1126                                                           expected start: {}",
1127                                                        range, device_offset, end_offset,
1128                                                    )
1129                                                ));
1130                                            }
1131                                        }
1132                                        handle.push_extent(
1133                                            checkpoint.file_offset,
1134                                            *device_offset
1135                                                ..*device_offset
1136                                                    + range.length().context("Invalid extent")?,
1137                                        );
1138                                    }
1139                                }
1140                                *end_offset = reader.journal_file_checkpoint().file_offset;
1141                            }
1142                        }
1143                        JournalRecord::Discard(offset) => {
1144                            if offset == 0 {
1145                                bail!(
1146                                    anyhow!(FxfsError::Inconsistent)
1147                                        .context("Invalid offset for Discard")
1148                                );
1149                            }
1150                            if let Some(transaction) = current_transaction.as_ref() {
1151                                if transaction.checkpoint.file_offset < offset {
1152                                    // Odd, but OK.
1153                                    continue;
1154                                }
1155                            }
1156                            current_transaction = None;
1157                            while let Some(transaction) = transactions.last() {
1158                                if transaction.checkpoint.file_offset < offset {
1159                                    break;
1160                                }
1161                                transactions.pop();
1162                            }
1163                            reader.handle().discard_extents(offset);
1164                        }
1165                        JournalRecord::DidFlushDevice(offset) => {
1166                            if offset > device_flushed_offset {
1167                                device_flushed_offset = offset;
1168                            }
1169                        }
1170                    }
1171                }
1172                // This is expected when we reach the end of the journal stream.
1173                ReadResult::ChecksumMismatch => break,
1174            }
1175        }
1176
1177        // Discard any uncommitted transaction.
1178        if current_transaction.is_some() {
1179            transactions.pop();
1180        }
1181
1182        Ok(JournaledTransactions { transactions, device_flushed_offset })
1183    }
1184
1185    /// Creates an empty filesystem with the minimum viable objects (including a root parent and
1186    /// root store but no further child stores).
1187    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
1188        // The following constants are only used at format time. When mounting, the recorded values
1189        // in the superblock should be used.  The root parent store does not have a parent, but
1190        // needs an object ID to be registered with ObjectManager, so it cannot collide (i.e. have
1191        // the same object ID) with any objects in the root store that use the journal to track
1192        // mutations.
1193        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
1194        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
1195        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;
1196
1197        info!(device_size = filesystem.device().size(); "Formatting");
1198
1199        let checkpoint = JournalCheckpoint {
1200            version: LATEST_VERSION,
1201            ..self.inner.lock().writer.journal_file_checkpoint()
1202        };
1203
1204        let mut current_generation = 1;
1205        if filesystem.options().image_builder_mode.is_some() {
1206            // Note that in non-image_builder_mode we write both superblocks when we format
1207            // (in FxFilesystemBuilder::open). In image_builder_mode we only write once at the end
1208            // as part of finalize(), which is why we must make sure the generation we write is newer
1209            // than any existing generation.
1210            let block_size = filesystem.device().block_size();
1211            if block_size as u64 == MIN_SUPER_BLOCK_SIZE {
1212                match self.read_superblocks(filesystem.device(), block_size.into()).await {
1213                    Ok((super_block, _)) => {
1214                        log::info!(
1215                            "Found existing superblock with generation {}. Bumping by 1.",
1216                            super_block.generation
1217                        );
1218                        current_generation = super_block.generation.wrapping_add(1);
1219                    }
1220                    Err(_) => {
1221                        // TODO(https://fxbug.dev/463757813): It's not unusual to fail to read
1222                        // superblocks when we're formatting a new filesystem, but we should
1223                        // probably fail the format if we get an I/O error.
1224                    }
1225                }
1226            }
1227        }
1228
1229        let root_parent = ObjectStore::new_empty(
1230            None,
1231            INIT_ROOT_PARENT_STORE_OBJECT_ID,
1232            filesystem.clone(),
1233            Box::new(NullCache {}),
1234        );
1235        self.objects.set_root_parent_store(root_parent.clone());
1236
1237        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
1238        self.objects.set_allocator(allocator.clone());
1239        self.objects.init_metadata_reservation()?;
1240
1241        let journal_handle;
1242        let super_block_a_handle;
1243        let super_block_b_handle;
1244        let root_store;
1245        let mut transaction = filesystem
1246            .clone()
1247            .new_transaction(
1248                lock_keys![],
1249                Options { skip_journal_checks: true, ..Default::default() },
1250            )
1251            .await?;
1252        root_store = root_parent
1253            .new_child_store(
1254                &mut transaction,
1255                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
1256                Box::new(NullCache {}),
1257            )
1258            .await
1259            .context("new_child_store")?;
1260        self.objects.set_root_store(root_store.clone());
1261
1262        allocator.create(&mut transaction).await?;
1263
1264        // Create the super-block objects...
1265        super_block_a_handle = ObjectStore::create_object_with_id(
1266            &root_store,
1267            &mut transaction,
1268            SuperBlockInstance::A.object_id(),
1269            HandleOptions::default(),
1270            None,
1271        )
1272        .context("create super block")?;
1273        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
1274        super_block_a_handle
1275            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
1276            .await
1277            .context("extend super block")?;
1278        super_block_b_handle = ObjectStore::create_object_with_id(
1279            &root_store,
1280            &mut transaction,
1281            SuperBlockInstance::B.object_id(),
1282            HandleOptions::default(),
1283            None,
1284        )
1285        .context("create super block")?;
1286        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
1287        super_block_b_handle
1288            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
1289            .await
1290            .context("extend super block")?;
1291
1292        // the journal object...
1293        journal_handle = ObjectStore::create_object(
1294            &root_parent,
1295            &mut transaction,
1296            journal_handle_options(),
1297            None,
1298        )
1299        .await
1300        .context("create journal")?;
1301        if self.inner.lock().image_builder_mode.is_none() {
1302            let mut file_range = 0..self.chunk_size();
1303            journal_handle
1304                .preallocate_range(&mut transaction, &mut file_range)
1305                .await
1306                .context("preallocate journal")?;
1307            if file_range.start < file_range.end {
1308                bail!("preallocate_range returned too little space");
1309            }
1310        }
1311
1312        // Write the root store object info.
1313        root_store.create(&mut transaction).await?;
1314
1315        // The root parent graveyard.
1316        root_parent
1317            .set_graveyard_directory_object_id(Graveyard::create(&mut transaction, &root_parent));
1318
1319        transaction.commit().await?;
1320
1321        self.inner.lock().super_block_header = SuperBlockHeader::new(
1322            current_generation,
1323            root_parent.store_object_id(),
1324            root_parent.graveyard_directory_object_id(),
1325            root_store.store_object_id(),
1326            allocator.object_id(),
1327            journal_handle.object_id(),
1328            checkpoint,
1329            /* earliest_version: */ LATEST_VERSION,
1330        );
1331
1332        // Initialize the journal writer.
1333        let _ = self.handle.set(journal_handle);
1334        Ok(())
1335    }
1336
1337    /// Normally we allocate the journal when creating the filesystem.
1338    /// This is used in image_builder_mode, where journal allocation is done last.
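        /// A minimal usage sketch (illustrative assumption: `fs.journal()` provides the journal,
        /// as in the tests below, and this runs during image-builder finalization):
        ///
        /// ```ignore
        /// fs.journal().allocate_journal().await?;
        /// ```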
1339    pub async fn allocate_journal(&self) -> Result<(), Error> {
1340        let handle = self.handle.get().unwrap();
1341        let filesystem = handle.store().filesystem();
1342        let mut transaction = filesystem
1343            .clone()
1344            .new_transaction(
1345                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1346                Options { skip_journal_checks: true, ..Default::default() },
1347            )
1348            .await?;
1349        let mut file_range = 0..self.chunk_size();
1350        self.handle
1351            .get()
1352            .unwrap()
1353            .preallocate_range(&mut transaction, &mut file_range)
1354            .await
1355            .context("preallocate journal")?;
1356        if file_range.start < file_range.end {
1357            bail!("preallocate_range returned too little space");
1358        }
1359        transaction.commit().await?;
1360        Ok(())
1361    }
1362
1363    pub async fn init_superblocks(&self) -> Result<(), Error> {
1364        // Overwrite both superblocks.
1365        for _ in 0..2 {
1366            self.write_super_block().await?;
1367        }
1368        Ok(())
1369    }
1370
1371    /// Takes a snapshot of all journaled transactions which affect |object_id| since its last
1372    /// flush.
1373    /// The caller is responsible for locking; it must ensure that the journal is not trimmed during
1374    /// this call.  For example, a Flush lock could be held on the object in question (assuming that
1375    /// object has data to flush and is registered with ObjectManager).
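        /// A sketch of intended use, assuming the caller already holds an appropriate flush lock
        /// and has a filesystem handle `fs` (names here are illustrative only):
        ///
        /// ```ignore
        /// let transactions = fs.journal().read_transactions_for_object(object_id).await?;
        /// for txn in &transactions {
        ///     // Each entry carries the journal checkpoint at which it was committed.
        ///     let _offset = txn.checkpoint.file_offset;
        /// }
        /// ```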
1376    pub async fn read_transactions_for_object(
1377        &self,
1378        object_id: u64,
1379    ) -> Result<Vec<JournaledTransaction>, Error> {
1380        let handle = self.handle.get().expect("No journal handle");
1381        // Reopen the handle since JournalReader needs an owned handle.
1382        let handle = ObjectStore::open_object(
1383            handle.owner(),
1384            handle.object_id(),
1385            journal_handle_options(),
1386            None,
1387        )
1388        .await?;
1389
1390        let checkpoint = match self.objects.journal_checkpoint(object_id) {
1391            Some(checkpoint) => checkpoint,
1392            None => return Ok(vec![]),
1393        };
1394        let mut reader = JournalReader::new(handle, &checkpoint);
1395        // Record the current end offset and only read to there, so we don't accidentally read any
1396        // partially flushed blocks.
1397        let end_offset = self.inner.lock().valid_to;
1398        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1399    }
1400
1401    /// Commits a transaction.  This is not thread safe; the caller must take appropriate locks.
1402    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1403        if transaction.is_empty() {
1404            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1405        }
1406
1407        self.pre_commit(transaction).await?;
1408        Ok(self.write_and_apply_mutations(transaction))
1409    }
1410
1411    // Before we commit, we might need to extend the journal or write pending records to the
1412    // journal.
1413    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
1414        let handle;
1415
1416        let (size, zero_offset) = {
1417            let mut inner = self.inner.lock();
1418
1419            // If this is the first write after a RESET, we need to output the version first.
1420            if std::mem::take(&mut inner.output_reset_version) {
1421                LATEST_VERSION.serialize_into(&mut inner.writer)?;
1422            }
1423
1424            if let Some(discard_offset) = inner.discard_offset {
1425                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
1426                inner.discard_offset = None;
1427            }
1428
1429            if inner.needs_did_flush_device {
1430                let offset = inner.device_flushed_offset;
1431                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
1432                inner.needs_did_flush_device = false;
1433            }
1434
1435            handle = match self.handle.get() {
1436                None => return Ok(()),
1437                Some(x) => x,
1438            };
1439
1440            let file_offset = inner.writer.journal_file_checkpoint().file_offset;
1441
1442            let size = handle.get_size();
1443            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };
1444
1445            if size.is_none()
1446                && inner.zero_offset.is_none()
1447                && !self.objects.needs_borrow_for_journal(file_offset)
1448            {
1449                return Ok(());
1450            }
1451
1452            (size, inner.zero_offset)
1453        };
1454
1455        let mut transaction = handle
1456            .new_transaction_with_options(Options {
1457                skip_journal_checks: true,
1458                borrow_metadata_space: true,
1459                allocator_reservation: Some(self.objects.metadata_reservation()),
1460                txn_guard: Some(transaction.txn_guard()),
1461                ..Default::default()
1462            })
1463            .await?;
1464        if let Some(size) = size {
1465            handle
1466                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
1467                .await?;
1468        }
1469        if let Some(zero_offset) = zero_offset {
1470            handle.zero(&mut transaction, 0..zero_offset).await?;
1471        }
1472
1473        // We can't use regular transaction commit, because that can cause re-entrancy issues, so
1474        // instead we just apply the transaction directly here.
1475        self.write_and_apply_mutations(&mut transaction);
1476
1477        let mut inner = self.inner.lock();
1478
1479        // Make sure the transaction to extend the journal made it to the journal within the old
1480        // size, since otherwise, it won't be possible to replay.
1481        if let Some(size) = size {
1482            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
1483        }
1484
1485        if inner.zero_offset == zero_offset {
1486            inner.zero_offset = None;
1487        }
1488
1489        Ok(())
1490    }
1491
1492    // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
1493    // all records should be applied because the object store or allocator might already contain the
1494    // mutation.  After replay, that obviously isn't the case and we want to apply all mutations.
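        // Worked example (hypothetical numbers): if the super-block says object 5 was last
        // flushed at journal offset 8192, a replayed mutation for object 5 with a checkpoint at
        // offset 4096 is skipped (its effects are already in the flushed layers), while one at
        // offset 12288 is applied.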
1495    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1496        let super_block_header = &self.inner.lock().super_block_header;
1497        let offset = super_block_header
1498            .journal_file_offsets
1499            .get(&object_id)
1500            .cloned()
1501            .unwrap_or(super_block_header.super_block_journal_file_offset);
1502        journal_file_checkpoint.file_offset >= offset
1503    }
1504
1505    /// Flushes previous writes to the device and then writes out a new super-block.
1506    /// Callers must ensure that this is not called concurrently.
1507    async fn write_super_block(&self) -> Result<(), Error> {
1508        let root_parent_store = self.objects.root_parent_store();
1509
1510        // We need to flush previous writes to the device since the new super-block we are writing
1511        // relies on written data being observable, and we also need to lock the root parent store
1512        // so that no new entries are written to it whilst we are writing the super-block, and for
1513        // that we use the write lock.
1514        let old_layers;
1515        let old_super_block_offset;
1516        let mut new_super_block_header;
1517        let checkpoint;
1518        let borrowed;
1519
1520        {
1521            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1522            {
1523                let _write_guard = self.writer_mutex.lock();
1524                (checkpoint, borrowed) = self.pad_to_block()?;
1525                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
1526            }
1527            self.flush_device(checkpoint.file_offset)
1528                .await
1529                .context("flush failed when writing superblock")?;
1530        }
1531
1532        new_super_block_header = self.inner.lock().super_block_header.clone();
1533
1534        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;
1535
1536        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();
1537
1538        new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
1539        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
1540        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
1541        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
1542        new_super_block_header.journal_file_offsets = journal_file_offsets;
1543        new_super_block_header.borrowed_metadata_space = borrowed;
1544
1545        self.super_block_manager
1546            .save(
1547                new_super_block_header.clone(),
1548                self.objects.root_parent_store().filesystem(),
1549                old_layers,
1550            )
1551            .await?;
1552        {
1553            let mut inner = self.inner.lock();
1554            inner.super_block_header = new_super_block_header;
1555            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
1556        }
1557
1558        Ok(())
1559    }
1560
1561    /// Flushes any buffered journal data to the device.  Note that this does not flush the device
1562    /// unless the flush_device option is set, in which case data should have been persisted to
1563    /// lower layers.  If a precondition is supplied, it is evaluated and the sync will be skipped
1564    /// if it returns false.  This allows callers to check a condition whilst a lock is held.  If a
1565    /// sync is performed, this function returns the checkpoint that was flushed and the amount of
1566    /// borrowed metadata space at the point it was flushed.
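        /// A minimal sketch (an illustrative assumption; callers normally reach this via
        /// `FxFilesystem::sync`, as the tests in this module do):
        ///
        /// ```ignore
        /// // Pad the journal to a block boundary and flush it all the way to the device.
        /// fs.sync(SyncOptions { flush_device: true, ..Default::default() }).await?;
        /// ```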
1567    pub async fn sync(
1568        &self,
1569        options: SyncOptions<'_>,
1570    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1571        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1572
1573        let (checkpoint, borrowed) = {
1574            if let Some(precondition) = options.precondition {
1575                if !precondition() {
1576                    return Ok(None);
1577                }
1578            }
1579
1580            // This guard is required so that we don't insert an EndBlock record in the middle of a
1581            // transaction.
1582            let _guard = self.writer_mutex.lock();
1583
1584            self.pad_to_block()?
1585        };
1586
1587        if options.flush_device {
1588            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1589        }
1590
1591        Ok(Some((checkpoint, borrowed)))
1592    }
1593
1594    // Returns the checkpoint as it was prior to padding.  This is done because the super block
1595    // needs to record where the last transaction ends and it's the next transaction that pays the
1596    // price of the padding.
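        // Worked example (hypothetical offsets, assuming a 4 KiB journal block): a checkpoint at
        // offset 4196 gets an EndBlock record plus padding out to 8192, but 4196 is what gets
        // returned; the next transaction starts at 8192 and so absorbs the padding cost.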
1597    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1598        let mut inner = self.inner.lock();
1599        let checkpoint = inner.writer.journal_file_checkpoint();
1600        if checkpoint.file_offset % BLOCK_SIZE != 0 {
1601            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1602            inner.writer.pad_to_block()?;
1603            if let Some(waker) = inner.flush_waker.take() {
1604                waker.wake();
1605            }
1606        }
1607        Ok((checkpoint, self.objects.borrowed_metadata_space()))
1608    }
1609
1610    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
1611        debug_assert_not_too_long!(poll_fn(|ctx| {
1612            let mut inner = self.inner.lock();
1613            if inner.flushed_offset >= checkpoint_offset {
1614                Poll::Ready(Ok(()))
1615            } else if inner.terminate {
1616                let context = inner
1617                    .terminate_reason
1618                    .as_ref()
1619                    .map(|e| format!("Journal closed with error: {:?}", e))
1620                    .unwrap_or_else(|| "Journal closed".to_string());
1621                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
1622            } else {
1623                inner.sync_waker = Some(ctx.waker().clone());
1624                Poll::Pending
1625            }
1626        }))?;
1627
1628        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
1629        if needs_flush {
1630            let trace = self.trace.load(Ordering::Relaxed);
1631            if trace {
1632                info!("J: start flush device");
1633            }
1634            self.handle.get().unwrap().flush_device().await?;
1635            if trace {
1636                info!("J: end flush device");
1637            }
1638
1639            // We need to write a DidFlushDevice record at some point, but if we are in the
1640            // process of shutting down the filesystem, we want to leave the journal clean to
1641            // avoid there being log messages complaining about unwritten journal data, so we
1642            // queue it up so that the next transaction will trigger this record to be written.
1643            // If we are shutting down, that will never happen but since the DidFlushDevice
1644            // message is purely advisory (it reduces the number of checksums we have to verify
1645            // during replay), it doesn't matter if it isn't written.
1646            {
1647                let mut inner = self.inner.lock();
1648                inner.device_flushed_offset = checkpoint_offset;
1649                inner.needs_did_flush_device = true;
1650            }
1651
1652            // Tell the allocator that we flushed the device so that it can now start using
1653            // space that was deallocated.
1654            self.objects.allocator().did_flush_device(checkpoint_offset);
1655            if trace {
1656                info!("J: did flush device");
1657            }
1658        }
1659
1660        Ok(())
1661    }
1662
1663    /// Returns a copy of the super-block header.
1664    pub fn super_block_header(&self) -> SuperBlockHeader {
1665        self.inner.lock().super_block_header.clone()
1666    }
1667
1668    /// Waits for there to be sufficient space in the journal.
1669    pub async fn check_journal_space(&self) -> Result<(), Error> {
1670        loop {
1671            debug_assert_not_too_long!({
1672                let inner = self.inner.lock();
1673                if inner.terminate {
1674                    // If the flush error is set, this will never make progress, since we can't
1675                    // extend the journal any more.
1676                    let context = inner
1677                        .terminate_reason
1678                        .as_ref()
1679                        .map(|e| format!("Journal closed with error: {:?}", e))
1680                        .unwrap_or_else(|| "Journal closed".to_string());
1681                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
1682                }
1683                if self.objects.last_end_offset()
1684                    - inner.super_block_header.journal_checkpoint.file_offset
1685                    < inner.reclaim_size
1686                {
1687                    break Ok(());
1688                }
1689                if inner.image_builder_mode.is_some() {
1690                    break Ok(());
1691                }
1692                if inner.disable_compactions {
1693                    break Err(
1694                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
1695                    );
1696                }
1697                self.reclaim_event.listen()
1698            });
1699        }
1700    }
1701
1702    fn chunk_size(&self) -> u64 {
1703        CHUNK_SIZE
1704    }
1705
1706    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
1707        let checkpoint_before;
1708        let checkpoint_after;
1709        {
1710            let _guard = self.writer_mutex.lock();
1711            checkpoint_before = {
1712                let mut inner = self.inner.lock();
1713                if transaction.includes_write() {
1714                    inner.needs_barrier = true;
1715                }
1716                let checkpoint = inner.writer.journal_file_checkpoint();
1717                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
1718                    self.objects.write_mutation(
1719                        *object_id,
1720                        mutation,
1721                        Writer(*object_id, &mut inner.writer),
1722                    );
1723                }
1724                checkpoint
1725            };
1726            let maybe_mutation =
1727                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
1728                    "apply_transaction should not fail in live mode; \
1729                     filesystem will be in an inconsistent state",
1730                );
1731            checkpoint_after = {
1732                let mut inner = self.inner.lock();
1733                if let Some(mutation) = maybe_mutation {
1734                    inner
1735                        .writer
1736                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
1737                        .unwrap();
1738                }
1739                for (device_range, checksums, first_write) in
1740                    transaction.take_checksums().into_iter()
1741                {
1742                    inner
1743                        .writer
1744                        .write_record(&JournalRecord::DataChecksums(
1745                            device_range,
1746                            Checksums::fletcher(checksums),
1747                            first_write,
1748                        ))
1749                        .unwrap();
1750                }
1751                inner.writer.write_record(&JournalRecord::Commit).unwrap();
1752
1753                inner.writer.journal_file_checkpoint()
1754            };
1755        }
1756        self.objects.did_commit_transaction(
1757            transaction,
1758            &checkpoint_before,
1759            checkpoint_after.file_offset,
1760        );
1761
1762        if let Some(waker) = self.inner.lock().flush_waker.take() {
1763            waker.wake();
1764        }
1765
1766        checkpoint_before.file_offset
1767    }
1768
1769    /// This task will flush journal data to the device when there is data that needs flushing, and
1770    /// trigger compactions when short of journal space.  It will return after the terminate method
1771    /// has been called, or an error is encountered with either flushing or compaction.
1772    pub async fn flush_task(self: Arc<Self>) {
1773        let mut flush_fut = None;
1774        let mut compact_fut = None;
1775        let mut flush_error = false;
1776        poll_fn(|ctx| {
1777            loop {
1778                {
1779                    let mut inner = self.inner.lock();
1780                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
1781                        let flushable = inner.writer.flushable_bytes();
1782                        if flushable > 0 {
1783                            flush_fut = Some(Box::pin(self.flush(flushable)));
1784                        }
1785                    }
1786                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
1787                        return Poll::Ready(());
1788                    }
1789                    // The / 2 is here because after compacting, we cannot reclaim the space until the
1790                    // _next_ time we flush the device since the super-block is not guaranteed to
1791                    // persist until then.
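                        // Worked example (hypothetical size): with reclaim_size = 256 MiB, a
                        // compaction is started once more than 128 MiB of journal has built up
                        // since the last super-block, leaving room for the extra device flush
                        // needed before that space can actually be reclaimed.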
1792                    if compact_fut.is_none()
1793                        && !inner.terminate
1794                        && !inner.disable_compactions
1795                        && inner.image_builder_mode.is_none()
1796                        && self.objects.last_end_offset()
1797                            - inner.super_block_header.journal_checkpoint.file_offset
1798                            > inner.reclaim_size / 2
1799                    {
1800                        compact_fut = Some(Box::pin(self.compact()));
1801                        inner.compaction_running = true;
1802                    }
1803                    inner.flush_waker = Some(ctx.waker().clone());
1804                }
1805                let mut pending = true;
1806                if let Some(fut) = flush_fut.as_mut() {
1807                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1808                        if let Err(e) = result {
1809                            self.inner.lock().terminate(Some(e.context("Flush error")));
1810                            self.reclaim_event.notify(usize::MAX);
1811                            flush_error = true;
1812                        }
1813                        flush_fut = None;
1814                        pending = false;
1815                    }
1816                }
1817                if let Some(fut) = compact_fut.as_mut() {
1818                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1819                        let mut inner = self.inner.lock();
1820                        if let Err(e) = result {
1821                            inner.terminate(Some(e.context("Compaction error")));
1822                        }
1823                        compact_fut = None;
1824                        inner.compaction_running = false;
1825                        self.reclaim_event.notify(usize::MAX);
1826                        pending = false;
1827                    }
1828                }
1829                if pending {
1830                    return Poll::Pending;
1831                }
1832            }
1833        })
1834        .await;
1835    }
1836
1837    async fn flush(&self, amount: usize) -> Result<(), Error> {
1838        let handle = self.handle.get().unwrap();
1839        let mut buf = handle.allocate_buffer(amount).await;
1840        let (offset, len, barrier_on_first_write) = {
1841            let mut inner = self.inner.lock();
1842            let offset = inner.writer.take_flushable(buf.as_mut());
1843            let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1844            // Reset `needs_barrier` before the overwrite rather than after it, in case a txn
1845            // commit that contains data happens while the overwrite is in progress.
1846            inner.needs_barrier = false;
1847            (offset, buf.len() as u64, barrier_on_first_write)
1848        };
1849        self.handle
1850            .get()
1851            .unwrap()
1852            .overwrite(
1853                offset,
1854                buf.as_mut(),
1855                OverwriteOptions { barrier_on_first_write, ..Default::default() },
1856            )
1857            .await?;
1858
1859        let mut inner = self.inner.lock();
1860        if let Some(waker) = inner.sync_waker.take() {
1861            waker.wake();
1862        }
1863        inner.flushed_offset = offset + len;
1864        inner.valid_to = inner.flushed_offset;
1865        Ok(())
1866    }
1867
1868    /// This should generally NOT be called externally. It is public only to allow use by the
1869    /// fxfs.Debug FIDL service.
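        /// Example (mirroring how the tests below drive it):
        ///
        /// ```ignore
        /// fs.journal().compact().await.expect("compact failed");
        /// ```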
1870    #[trace]
1871    pub async fn compact(&self) -> Result<(), Error> {
1872        let trace = self.trace.load(Ordering::Relaxed);
1873        debug!("Compaction starting");
1874        if trace {
1875            info!("J: start compaction");
1876        }
1877        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1878        self.inner.lock().super_block_header.earliest_version = earliest_version;
1879        self.write_super_block().await.context("Failed to write superblock")?;
1880        if trace {
1881            info!("J: end compaction");
1882        }
1883        debug!("Compaction finished");
1884        Ok(())
1885    }
1886
1887    pub async fn stop_compactions(&self) {
1888        loop {
1889            debug_assert_not_too_long!({
1890                let mut inner = self.inner.lock();
1891                inner.disable_compactions = true;
1892                if !inner.compaction_running {
1893                    return;
1894                }
1895                self.reclaim_event.listen()
1896            });
1897        }
1898    }
1899
1900    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
1901    /// journal when queried.
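        /// A minimal sketch (assuming the caller owns a `fuchsia_inspect` hierarchy; names are
        /// illustrative):
        ///
        /// ```ignore
        /// let inspector = fuchsia_inspect::Inspector::default();
        /// fs.journal().track_statistics(inspector.root(), "journal");
        /// ```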
1902    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1903        let this = Arc::downgrade(self);
1904        parent.record_lazy_child(name, move || {
1905            let this_clone = this.clone();
1906            async move {
1907                let inspector = fuchsia_inspect::Inspector::default();
1908                if let Some(this) = this_clone.upgrade() {
1909                    let (journal_min, journal_max, journal_reclaim_size) = {
1910                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1911                        let inner = this.inner.lock();
1912                        (
1913                            round_down(
1914                                inner.super_block_header.journal_checkpoint.file_offset,
1915                                BLOCK_SIZE,
1916                            ),
1917                            inner.flushed_offset,
1918                            inner.reclaim_size,
1919                        )
1920                    };
1921                    let root = inspector.root();
1922                    root.record_uint("journal_min_offset", journal_min);
1923                    root.record_uint("journal_max_offset", journal_max);
1924                    root.record_uint("journal_size", journal_max - journal_min);
1925                    root.record_uint("journal_reclaim_size", journal_reclaim_size);
1926
1927                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
1928                    if let Some(x) = round_div(
1929                        100 * (journal_max - journal_min),
1930                        this.objects.allocator().get_disk_bytes(),
1931                    ) {
1932                        root.record_uint("journal_size_to_disk_size_percent", x);
1933                    }
1934                }
1935                Ok(inspector)
1936            }
1937            .boxed()
1938        });
1939    }
1940
1941    /// Terminate all journal activity.
1942    pub fn terminate(&self) {
1943        self.inner.lock().terminate(/*reason*/ None);
1944        self.reclaim_event.notify(usize::MAX);
1945    }
1946}
1947
1948/// Wrapper to allow records to be written to the journal.
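    /// (Constructed internally by `write_and_apply_mutations` and handed to
    /// `ObjectManager::write_mutation`; the `u64` is the object ID the written record is tagged
    /// with.)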
1949pub struct Writer<'a>(u64, &'a mut JournalWriter);
1950
1951impl Writer<'_> {
1952    pub fn write(&mut self, mutation: Mutation) {
1953        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
1954    }
1955}
1956
1957#[cfg(test)]
1958mod tests {
1959    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
1960    use crate::fsck::fsck;
1961    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
1962    use crate::object_store::directory::Directory;
1963    use crate::object_store::journal::SuperBlockInstance;
1964    use crate::object_store::transaction::Options;
1965    use crate::object_store::volume::root_volume;
1966    use crate::object_store::{
1967        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
1968    };
1969    use fuchsia_async as fasync;
1970    use fuchsia_async::MonotonicDuration;
1971    use storage_device::DeviceHolder;
1972    use storage_device::fake_device::FakeDevice;
1973
1974    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1975
1976    #[fuchsia::test]
1977    async fn test_replay() {
1978        const TEST_DATA: &[u8] = b"hello";
1979
1980        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1981
1982        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1983
1984        let object_id = {
1985            let root_store = fs.root_store();
1986            let root_directory =
1987                Directory::open(&root_store, root_store.root_directory_object_id())
1988                    .await
1989                    .expect("open failed");
1990            let mut transaction = fs
1991                .clone()
1992                .new_transaction(
1993                    lock_keys![LockKey::object(
1994                        root_store.store_object_id(),
1995                        root_store.root_directory_object_id(),
1996                    )],
1997                    Options::default(),
1998                )
1999                .await
2000                .expect("new_transaction failed");
2001            let handle = root_directory
2002                .create_child_file(&mut transaction, "test")
2003                .await
2004                .expect("create_child_file failed");
2005
2006            transaction.commit().await.expect("commit failed");
2007            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2008            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2009            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2010            // As this is the first sync, this will actually trigger a new super-block, but normally
2011            // this would not be the case.
2012            fs.sync(SyncOptions::default()).await.expect("sync failed");
2013            handle.object_id()
2014        };
2015
2016        {
2017            fs.close().await.expect("Close failed");
2018            let device = fs.take_device().await;
2019            device.reopen(false);
2020            let fs = FxFilesystem::open(device).await.expect("open failed");
2021            let handle = ObjectStore::open_object(
2022                &fs.root_store(),
2023                object_id,
2024                HandleOptions::default(),
2025                None,
2026            )
2027            .await
2028            .expect("open_object failed");
2029            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2030            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
2031            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2032            fsck(fs.clone()).await.expect("fsck failed");
2033            fs.close().await.expect("Close failed");
2034        }
2035    }
2036
2037    #[fuchsia::test]
2038    async fn test_reset() {
2039        const TEST_DATA: &[u8] = b"hello";
2040
2041        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
2042
2043        let mut object_ids = Vec::new();
2044
2045        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2046        {
2047            let root_store = fs.root_store();
2048            let root_directory =
2049                Directory::open(&root_store, root_store.root_directory_object_id())
2050                    .await
2051                    .expect("open failed");
2052            let mut transaction = fs
2053                .clone()
2054                .new_transaction(
2055                    lock_keys![LockKey::object(
2056                        root_store.store_object_id(),
2057                        root_store.root_directory_object_id(),
2058                    )],
2059                    Options::default(),
2060                )
2061                .await
2062                .expect("new_transaction failed");
2063            let handle = root_directory
2064                .create_child_file(&mut transaction, "test")
2065                .await
2066                .expect("create_child_file failed");
2067            transaction.commit().await.expect("commit failed");
2068            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2069            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2070            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2071            fs.sync(SyncOptions::default()).await.expect("sync failed");
2072            object_ids.push(handle.object_id());
2073
2074            // Create a lot of objects but don't sync at the end. This should leave the filesystem
2075            // with a half-finished transaction that cannot be replayed.
2076            for i in 0..1000 {
2077                let mut transaction = fs
2078                    .clone()
2079                    .new_transaction(
2080                        lock_keys![LockKey::object(
2081                            root_store.store_object_id(),
2082                            root_store.root_directory_object_id(),
2083                        )],
2084                        Options::default(),
2085                    )
2086                    .await
2087                    .expect("new_transaction failed");
2088                let handle = root_directory
2089                    .create_child_file(&mut transaction, &format!("{}", i))
2090                    .await
2091                    .expect("create_child_file failed");
2092                transaction.commit().await.expect("commit failed");
2093                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2094                buf.as_mut_slice().copy_from_slice(TEST_DATA);
2095                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2096                object_ids.push(handle.object_id());
2097            }
2098        }
2099        fs.close().await.expect("fs close failed");
2100        let device = fs.take_device().await;
2101        device.reopen(false);
2102        let fs = FxFilesystem::open(device).await.expect("open failed");
2103        fsck(fs.clone()).await.expect("fsck failed");
2104        {
2105            let root_store = fs.root_store();
2106            // Check the first object, which should exist.
2107            for &object_id in &object_ids[0..1] {
2108                let handle = ObjectStore::open_object(
2109                    &root_store,
2110                    object_id,
2111                    HandleOptions::default(),
2112                    None,
2113                )
2114                .await
2115                .expect("open_object failed");
2116                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2117                assert_eq!(
2118                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2119                    TEST_DATA.len()
2120                );
2121                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2122            }
2123
2124            // Write one more object and sync.
2125            let root_directory =
2126                Directory::open(&root_store, root_store.root_directory_object_id())
2127                    .await
2128                    .expect("open failed");
2129            let mut transaction = fs
2130                .clone()
2131                .new_transaction(
2132                    lock_keys![LockKey::object(
2133                        root_store.store_object_id(),
2134                        root_store.root_directory_object_id(),
2135                    )],
2136                    Options::default(),
2137                )
2138                .await
2139                .expect("new_transaction failed");
2140            let handle = root_directory
2141                .create_child_file(&mut transaction, "test2")
2142                .await
2143                .expect("create_child_file failed");
2144            transaction.commit().await.expect("commit failed");
2145            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2146            buf.as_mut_slice().copy_from_slice(TEST_DATA);
2147            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2148            fs.sync(SyncOptions::default()).await.expect("sync failed");
2149            object_ids.push(handle.object_id());
2150        }
2151
2152        fs.close().await.expect("close failed");
2153        let device = fs.take_device().await;
2154        device.reopen(false);
2155        let fs = FxFilesystem::open(device).await.expect("open failed");
2156        {
2157            fsck(fs.clone()).await.expect("fsck failed");
2158
2159            // Check the first and the last objects.
2160            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
2161                let handle = ObjectStore::open_object(
2162                    &fs.root_store(),
2163                    object_id,
2164                    HandleOptions::default(),
2165                    None,
2166                )
2167                .await
2168                .unwrap_or_else(|e| {
2169                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2170                });
2171                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2172                assert_eq!(
2173                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2174                    TEST_DATA.len()
2175                );
2176                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2177            }
2178        }
2179        fs.close().await.expect("close failed");
2180    }
2181
2182    #[fuchsia::test]
2183    async fn test_discard() {
2184        let device = {
2185            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2186            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2187            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2188
2189            let store = root_volume
2190                .new_volume("test", NewChildStoreOptions::default())
2191                .await
2192                .expect("new_volume failed");
2193            let root_directory = Directory::open(&store, store.root_directory_object_id())
2194                .await
2195                .expect("open failed");
2196
2197            // Create enough data so that another journal extent is used.
2198            let mut i = 0;
2199            loop {
2200                let mut transaction = fs
2201                    .clone()
2202                    .new_transaction(
2203                        lock_keys![LockKey::object(
2204                            store.store_object_id(),
2205                            store.root_directory_object_id()
2206                        )],
2207                        Options::default(),
2208                    )
2209                    .await
2210                    .expect("new_transaction failed");
2211                root_directory
2212                    .create_child_file(&mut transaction, &format!("a {i}"))
2213                    .await
2214                    .expect("create_child_file failed");
2215                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2216                    break;
2217                }
2218                i += 1;
2219            }
2220
2221            // Compact and then disable compactions.
2222            fs.journal().compact().await.expect("compact failed");
2223            fs.journal().stop_compactions().await;
2224
2225            // Keep going until we need another journal extent.
2226            let mut i = 0;
2227            loop {
2228                let mut transaction = fs
2229                    .clone()
2230                    .new_transaction(
2231                        lock_keys![LockKey::object(
2232                            store.store_object_id(),
2233                            store.root_directory_object_id()
2234                        )],
2235                        Options::default(),
2236                    )
2237                    .await
2238                    .expect("new_transaction failed");
2239                root_directory
2240                    .create_child_file(&mut transaction, &format!("b {i}"))
2241                    .await
2242                    .expect("create_child_file failed");
2243                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2244                    break;
2245                }
2246                i += 1;
2247            }
2248
2249            // Allow the journal to flush, but we don't want to sync.
2250            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2251            // Because we're not gracefully closing the filesystem, a Discard record will be
2252            // emitted.
2253            fs.device().snapshot().expect("snapshot failed")
2254        };
2255
2256        let fs = FxFilesystem::open(device).await.expect("open failed");
2257
2258        {
2259            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2260
2261            let store =
2262                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2263
2264            let root_directory = Directory::open(&store, store.root_directory_object_id())
2265                .await
2266                .expect("open failed");
2267
2268            // Write one more transaction.
2269            let mut transaction = fs
2270                .clone()
2271                .new_transaction(
2272                    lock_keys![LockKey::object(
2273                        store.store_object_id(),
2274                        store.root_directory_object_id()
2275                    )],
2276                    Options::default(),
2277                )
2278                .await
2279                .expect("new_transaction failed");
2280            root_directory
2281                .create_child_file(&mut transaction, "d")
2282                .await
2283                .expect("create_child_file failed");
2284            transaction.commit().await.expect("commit failed");
2285        }
2286
2287        fs.close().await.expect("close failed");
2288        let device = fs.take_device().await;
2289        device.reopen(false);
2290
2291        let fs = FxFilesystem::open(device).await.expect("open failed");
2292        fsck(fs.clone()).await.expect("fsck failed");
2293        fs.close().await.expect("close failed");
2294    }
2295
2296    #[fuchsia::test]
2297    async fn test_use_existing_generation() {
2298        let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2299
2300        // First format should be generation 1.
2301        let fs = FxFilesystemBuilder::new()
2302            .format(true)
2303            .image_builder_mode(Some(SuperBlockInstance::A))
2304            .open(device)
2305            .await
2306            .expect("open failed");
2307        let generation0 = fs.super_block_header().generation;
2308        assert_eq!(generation0, 1);
2309        fs.close().await.expect("close failed");
2310        let device = fs.take_device().await;
2311        device.reopen(false);
2312
2313        // Format the device normally (again, generation 1).
2314        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2315        let generation1 = fs.super_block_header().generation;
2316        {
2317            let root_volume = crate::object_store::volume::root_volume(fs.clone())
2318                .await
2319                .expect("root_volume failed");
2320            root_volume
2321                .new_volume("test", crate::object_store::NewChildStoreOptions::default())
2322                .await
2323                .expect("new_volume failed");
2324        }
2325        fs.close().await.expect("close failed");
2326        let device = fs.take_device().await;
2327        device.reopen(false);
2328
2329        // Format again with image_builder_mode.
2330        let fs = FxFilesystemBuilder::new()
2331            .format(true)
2332            .image_builder_mode(Some(SuperBlockInstance::A))
2333            .open(device)
2334            .await
2335            .expect("open failed");
2336        let generation2 = fs.super_block_header().generation;
2337        assert!(
2338            generation2 > generation1,
2339            "generation2 ({}) should be greater than generation1 ({})",
2340            generation2,
2341            generation1
2342        );
2343        fs.close().await.expect("close failed");
2344    }
2345}
2346
2347#[cfg(fuzz)]
2348mod fuzz {
2349    use fuzz::fuzz;
2350
2351    #[fuzz]
2352    fn fuzz_journal_bytes(input: Vec<u8>) {
2353        use crate::filesystem::FxFilesystem;
2354        use fuchsia_async as fasync;
2355        use std::io::Write;
2356        use storage_device::DeviceHolder;
2357        use storage_device::fake_device::FakeDevice;
2358
2359        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2360            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2361            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2362            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
2363            fs.close().await.expect("close failed");
2364            let device = fs.take_device().await;
2365            device.reopen(false);
2366            if let Ok(fs) = FxFilesystem::open(device).await {
2367                // `close()` can fail if there were objects to be tombstoned. If such an object is
2368                // corrupted, there will be an error when we compact the journal.
2369                let _ = fs.close().await;
2370            }
2371        });
2372    }
2373
2374    #[fuzz]
2375    fn fuzz_journal(input: Vec<super::JournalRecord>) {
2376        use crate::filesystem::FxFilesystem;
2377        use fuchsia_async as fasync;
2378        use storage_device::DeviceHolder;
2379        use storage_device::fake_device::FakeDevice;
2380
2381        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2382            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2383            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2384            {
2385                let mut inner = fs.journal().inner.lock();
2386                for record in &input {
2387                    let _ = inner.writer.write_record(record);
2388                }
2389            }
2390            fs.close().await.expect("close failed");
2391            let device = fs.take_device().await;
2392            device.reopen(false);
2393            if let Ok(fs) = FxFilesystem::open(device).await {
2394                // `close()` can fail if there were objects to be tombstoned. If such an object is
2395                // corrupted, there will be an error when we compact the journal.
2396                let _ = fs.close().await;
2397            }
2398        });
2399    }
2400}