fxfs/object_store/

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! The journal is implemented as an ever-extending file which contains variable-length records
6//! that describe mutations to be applied to various objects.  The journal file consists of
7//! blocks, with a checksum at the end of each block, but otherwise it can be considered a
8//! continuous stream.
9//!
10//! The checksum is seeded with the checksum from the previous block.  To free space in the
11//! journal, records are replaced with sparse extents once they are known to be no longer
12//! needed for mounting.
13//!
14//! At mount time, the journal is replayed: the mutations are applied in memory.
15//! Eventually, a checksum failure indicates that no more records exist to be replayed, at
16//! which point the mount can continue and the journal will be extended from that point with
17//! further mutations as required.
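//!
//! As a rough sketch (assuming a `checksum_block` helper standing in for the real block
//! checksum used by the reader, and ignoring versioning and reset handling), replay
//! conceptually walks the blocks like this, stopping at the first block whose trailing
//! checksum does not match:
//!
//! ```ignore
//! fn valid_journal_bytes(blocks: &[&[u8]], mut seed: u64) -> u64 {
//!     let mut valid = 0;
//!     for block in blocks {
//!         // The last 8 bytes of each block hold its checksum.
//!         let (payload, trailer) = block.split_at(block.len() - 8);
//!         let stored = u64::from_le_bytes(trailer.try_into().unwrap());
//!         // Each block's checksum is seeded with the previous block's checksum.
//!         let computed = checksum_block(payload, seed);
//!         if computed != stored {
//!             break;
//!         }
//!         seed = computed;
//!         valid += block.len() as u64;
//!     }
//!     valid
//! }
//! ```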
18
19mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::debug_assert_not_too_long;
27use crate::errors::FxfsError;
28use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
29use crate::log::*;
30use crate::lsm_tree::cache::NullCache;
31use crate::lsm_tree::types::Layer;
32use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
33use crate::object_store::allocator::Allocator;
34use crate::object_store::extent_record::{
35    ExtentKey, ExtentMode, ExtentValue, DEFAULT_DATA_ATTRIBUTE_ID,
36};
37use crate::object_store::graveyard::Graveyard;
38use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
39use crate::object_store::journal::checksum_list::ChecksumList;
40use crate::object_store::journal::reader::{JournalReader, ReadResult};
41use crate::object_store::journal::super_block::{
42    SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
43};
44use crate::object_store::journal::writer::JournalWriter;
45use crate::object_store::object_manager::ObjectManager;
46use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
47use crate::object_store::transaction::{
48    lock_keys, AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43,
49    ObjectStoreMutation, Options, Transaction, TxnMutation, TRANSACTION_MAX_JOURNAL_USAGE,
50};
51use crate::object_store::{
52    AssocObj, DataObjectHandle, HandleOptions, HandleOwner, Item, ItemRef, NewChildStoreOptions,
53    ObjectStore, INVALID_OBJECT_ID,
54};
55use crate::range::RangeExt;
56use crate::round::{round_div, round_down};
57use crate::serialized_types::{migrate_to_version, Migrate, Version, Versioned, LATEST_VERSION};
58use anyhow::{anyhow, bail, ensure, Context, Error};
59use event_listener::Event;
60use fprint::TypeFingerprint;
61use fuchsia_sync::Mutex;
62use futures::future::poll_fn;
63use futures::FutureExt as _;
64use once_cell::sync::OnceCell;
65use rand::Rng;
66use rustc_hash::FxHashMap as HashMap;
67use serde::{Deserialize, Serialize};
68use static_assertions::const_assert;
69use std::clone::Clone;
70use std::collections::HashSet;
71use std::ops::{Bound, Range};
72use std::sync::atomic::{AtomicBool, Ordering};
73use std::sync::Arc;
74use std::task::{Poll, Waker};
75
76// The journal file is written to in blocks of this size.
77pub const BLOCK_SIZE: u64 = 4096;
78
79// The journal file is extended by this amount when necessary.
80const CHUNK_SIZE: u64 = 131_072;
81const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);
82
83// See the comment for the `reclaim_size` member of Inner.
84pub const DEFAULT_RECLAIM_SIZE: u64 = 262_144;
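// With this default (262,144 bytes) and BLOCK_SIZE = 4096, a new super-block ends up being
// written roughly every 32 journal blocks (131,072 bytes) in the steady state.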
85
86// Temporary space that should be reserved for the journal.  For example: space that is currently
87// used in the journal file but cannot be deallocated yet because we are flushing.
88pub const RESERVED_SPACE: u64 = 1_048_576;
89
90// Whenever the journal is replayed (i.e. the system is unmounted and remounted), we reset the
91// journal stream, at which point any half-complete transactions are discarded.  We indicate a
92// journal reset by XORing the previous block's checksum with this mask, and using that value as a
93// seed for the next journal block.
94const RESET_XOR: u64 = 0xffffffffffffffff;
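// Since the mask is all ones, the reset seed is simply the bitwise NOT of the previous block's
// checksum.  Illustrative sketch only (the real handling lives in the reader and writer): a
// block whose checksum chain only validates when seeded with `previous_checksum ^ RESET_XOR`
// marks the start of a new journal stream, and any half-complete transaction before it is
// discarded.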
95
96// To keep track of offsets within a journal file, we need both the file offset and the checksum of
97// the preceding block, since that checksum is an input to the checksum of every subsequent
98// block.
99pub type JournalCheckpoint = JournalCheckpointV32;
100
101#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
102pub struct JournalCheckpointV32 {
103    pub file_offset: u64,
104
105    // Starting checksum for the block that contains file_offset, i.e. the checksum of the
106    // previous block.
107    pub checksum: Checksum,
108
109    // If versioned, the version of elements stored in the journal, e.g. the JournalRecord
110    // version.  This can change across reset events, so we store it along with the offset and
111    // checksum to know which version to deserialize.
112    pub version: Version,
113}
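// For example (illustrative values only): a record that starts 100 bytes into the third journal
// block has `file_offset == 2 * BLOCK_SIZE + 100`, and `checksum` holds the value that
// terminated the second block.  All checkpoints within the same block therefore share the same
// `checksum`.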
114
115pub type JournalRecord = JournalRecordV43;
116
117#[allow(clippy::large_enum_variant)]
118#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
119#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
120pub enum JournalRecordV43 {
121    // Indicates no more records in this block.
122    EndBlock,
123    // Mutation for a particular object.  object_id here is for the collection i.e. the store or
124    // allocator.
125    Mutation { object_id: u64, mutation: MutationV43 },
126    // Commits records in the transaction.
127    Commit,
128    // Discard all mutations with offsets greater than or equal to the given offset.
129    Discard(u64),
130    // Indicates the device was flushed at the given journal offset.
132    // Note that this really means that, as of this journal offset, we can be certain that
132    // there's no remaining buffered data in the block device; the buffers and the disk contents are
133    // consistent.
134    // We insert one of these records *after* a flush along with the *next* transaction to go
135    // through.  If that never comes (either due to graceful or hard shutdown), the journal reset
136    // on the next mount will serve the same purpose and count as a flush, although it is necessary
137    // to defensively flush the device before replaying the journal (if possible, i.e. not
138    // read-only) in case the block device connection was reused.
139    DidFlushDevice(u64),
140    // Checksums for a data range written by this transaction. A transaction is only valid if these
141    // checksums are correct. The range is the device range that the checksums cover.
142    //
143    // A boolean indicates whether this range is being written to for the first time. For overwrite
144    // extents, we only check the checksums for a block if it has been written to for the first
145    // time since the last flush, because otherwise we can't roll it back anyway so it doesn't
146    // matter. For copy-on-write extents, the bool is always true.
147    DataChecksums(Range<u64>, ChecksumsV38, bool),
148}
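// Illustrative only: a committed transaction appears in the journal as a run of records along
// the lines of
//
//     Mutation { object_id, mutation }          (one per mutation)
//     DataChecksums(device_range, checksums, first_write)   (if data was written)
//     Commit
//
// with EndBlock marking that no further records follow in the current block (the reader then
// skips to the next block boundary).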
149
150#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
151#[migrate_to_version(JournalRecordV43)]
152pub enum JournalRecordV42 {
153    EndBlock,
154    Mutation { object_id: u64, mutation: MutationV41 },
155    Commit,
156    Discard(u64),
157    DidFlushDevice(u64),
158    DataChecksums(Range<u64>, ChecksumsV38, bool),
159}
160
161#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
162pub enum JournalRecordV41 {
163    EndBlock,
164    Mutation { object_id: u64, mutation: MutationV41 },
165    Commit,
166    Discard(u64),
167    DidFlushDevice(u64),
168    DataChecksums(Range<u64>, ChecksumsV38),
169}
170
171impl From<JournalRecordV41> for JournalRecordV42 {
172    fn from(record: JournalRecordV41) -> Self {
173        match record {
174            JournalRecordV41::EndBlock => Self::EndBlock,
175            JournalRecordV41::Mutation { object_id, mutation } => {
176                Self::Mutation { object_id, mutation: mutation.into() }
177            }
178            JournalRecordV41::Commit => Self::Commit,
179            JournalRecordV41::Discard(offset) => Self::Discard(offset),
180            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
181            JournalRecordV41::DataChecksums(range, sums) => {
182                // At the time of writing the only extents written by real systems are CoW extents
183                // so the new bool is always true.
184                Self::DataChecksums(range, sums, true)
185            }
186        }
187    }
188}
189
190#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
191#[migrate_to_version(JournalRecordV41)]
192pub enum JournalRecordV40 {
193    EndBlock,
194    Mutation { object_id: u64, mutation: MutationV40 },
195    Commit,
196    Discard(u64),
197    DidFlushDevice(u64),
198    DataChecksums(Range<u64>, ChecksumsV38),
199}
200
201pub(super) fn journal_handle_options() -> HandleOptions {
202    HandleOptions { skip_journal_checks: true, ..Default::default() }
203}
204
205/// The journal records a stream of mutations that are to be applied to other objects.  At mount
206/// time, these records can be replayed into memory.  It provides a way to quickly persist changes
207/// without having to make a large number of writes; they can be deferred to a later time (e.g.
208/// when a sufficient number have been queued).  It also provides support for transactions: the
209/// ability to have a set of mutations that are applied atomically together.
210pub struct Journal {
211    objects: Arc<ObjectManager>,
212    handle: OnceCell<DataObjectHandle<ObjectStore>>,
213    super_block_manager: SuperBlockManager,
214    inner: Mutex<Inner>,
215    writer_mutex: Mutex<()>,
216    sync_mutex: futures::lock::Mutex<()>,
217    trace: AtomicBool,
218
219    // This event is used when we are waiting for a compaction to free up journal space.
220    reclaim_event: Event,
221}
222
223struct Inner {
224    super_block_header: SuperBlockHeader,
225
226    // The offset up to which we can zero the journal, now that it is no longer needed.
227    zero_offset: Option<u64>,
228
229    // The journal offset that we most recently flushed to the device.
230    device_flushed_offset: u64,
231
232    // If true, indicates a DidFlushDevice record is pending.
233    needs_did_flush_device: bool,
234
235    // The writer for the journal.
236    writer: JournalWriter,
237
238    // Set when a reset is encountered during a read.
239    // Used at write pre_commit() time to ensure we write a version first thing after a reset.
240    output_reset_version: bool,
241
242    // Waker for the flush task.
243    flush_waker: Option<Waker>,
244
245    // Indicates the journal has been terminated.
246    terminate: bool,
247
248    // Latched error indicating reason for journal termination if not graceful.
249    terminate_reason: Option<Error>,
250
251    // Disable compactions.
252    disable_compactions: bool,
253
254    // True if compactions are running.
255    compaction_running: bool,
256
257    // Waker for the sync task for when it's waiting for the flush task to finish.
258    sync_waker: Option<Waker>,
259
260    // The last offset we flushed to the journal file.
261    flushed_offset: u64,
262
263    // The last offset that should be considered valid in the journal file.  Most of the time, this
264    // will be the same as `flushed_offset` but at mount time, this could be less and will only be
265    // up to the end of the last valid transaction; it won't include transactions that follow that
266    // have been discarded.
267    valid_to: u64,
268
269    // If, after replaying, we have to discard a number of mutations (because they don't validate),
270    // this offset specifies where we need to discard back to.  This is so that when we next replay,
271    // we ignore those mutations and continue with new good mutations.
272    discard_offset: Option<u64>,
273
274    // In the steady state, the journal should fluctuate between being approximately half of this
275    // number and this number.  New super-blocks will be written every time about half of this
276    // amount is written to the journal.
277    reclaim_size: u64,
278
279    image_builder_mode: bool,
280}
281
282impl Inner {
283    fn terminate(&mut self, reason: Option<Error>) {
284        self.terminate = true;
285
286        if let Some(err) = reason {
287            error!(error:? = err; "Terminating journal");
288            // Log previous error if one was already set, otherwise latch the error.
289            if let Some(prev_err) = self.terminate_reason.as_ref() {
290                error!(error:? = prev_err; "Journal previously terminated");
291            } else {
292                self.terminate_reason = Some(err);
293            }
294        }
295
296        if let Some(waker) = self.flush_waker.take() {
297            waker.wake();
298        }
299        if let Some(waker) = self.sync_waker.take() {
300            waker.wake();
301        }
302    }
303}
304
305pub struct JournalOptions {
306    /// In the steady state, the journal should fluctuate between being approximately half of this
307    /// number and this number.  New super-blocks will be written every time about half of this
308    /// amount is written to the journal.
309    pub reclaim_size: u64,
310}
311
312impl Default for JournalOptions {
313    fn default() -> Self {
314        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE }
315    }
316}
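// For example, a test that wants compactions to trigger more frequently might construct the
// journal with a smaller reclaim size (a sketch; `objects` is assumed to be an
// `Arc<ObjectManager>` created elsewhere):
//
//     let journal = Journal::new(objects, JournalOptions { reclaim_size: 65_536 });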
317
318struct JournaledTransactions {
319    transactions: Vec<JournaledTransaction>,
320    device_flushed_offset: u64,
321}
322
323#[derive(Debug, Default)]
324pub struct JournaledTransaction {
325    pub checkpoint: JournalCheckpoint,
326    pub root_parent_mutations: Vec<Mutation>,
327    pub root_mutations: Vec<Mutation>,
328    // List of (store_object_id, mutation).
329    pub non_root_mutations: Vec<(u64, Mutation)>,
330    pub end_offset: u64,
331    pub checksums: Vec<JournaledChecksums>,
332
333    // Records offset + 1 of the matching begin_flush transaction.  If the object is deleted, the
334    // offset will be STORE_DELETED.  The +1 is because we want to ignore the begin flush
335    // transaction; we don't need or want to replay it.
336    pub end_flush: Option<(/* store_id: */ u64, /* begin offset: */ u64)>,
337}
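// For example (illustrative offsets): if a store's BeginFlush was journaled in the transaction
// at offset 10_000, the matching EndFlush transaction records
// `end_flush == Some((store_id, 10_001))`, so replay resumes that store's mutations strictly
// after the begin_flush transaction.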
338
339impl JournaledTransaction {
340    fn new(checkpoint: JournalCheckpoint) -> Self {
341        Self { checkpoint, ..Default::default() }
342    }
343}
344
345const STORE_DELETED: u64 = u64::MAX;
346
347#[derive(Debug)]
348pub struct JournaledChecksums {
349    pub device_range: Range<u64>,
350    pub checksums: Checksums,
351    pub first_write: bool,
352}
353
354/// Handles for journal-like objects have some additional functionality to manage their extents,
355/// since during replay we need to add extents as we find them.
356pub trait JournalHandle: ReadObjectHandle {
357    /// The end offset of the last extent in the JournalHandle.  Used only for validating extents
358    /// (which will be skipped if None is returned).
359    /// Note this is equivalent in value to ReadObjectHandle::get_size, when present.
360    fn end_offset(&self) -> Option<u64>;
361    /// Adds an extent to the current end of the journal stream.
362    /// `added_offset` is the offset into the journal of the transaction which added this extent,
363    /// used for discard_extents.
364    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
365    /// Discards all extents which were added in a transaction at offset >= |discard_offset|.
366    fn discard_extents(&mut self, discard_offset: u64);
367}
368
369// Provide a stub implementation for DataObjectHandle so we can use it in
370// Journal::read_transactions.  Manual extent management is a NOP (which is OK since presumably the
371// DataObjectHandle already knows where its extents live).
372impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
373    fn end_offset(&self) -> Option<u64> {
374        None
375    }
376    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
377        // NOP
378    }
379    fn discard_extents(&mut self, _discard_offset: u64) {
380        // NOP
381    }
382}
383
384#[fxfs_trace::trace]
385impl Journal {
386    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
387        let starting_checksum = rand::thread_rng().gen_range(1..u64::MAX);
388        Journal {
389            objects,
390            handle: OnceCell::new(),
391            super_block_manager: SuperBlockManager::new(),
392            inner: Mutex::new(Inner {
393                super_block_header: SuperBlockHeader::default(),
394                zero_offset: None,
395                device_flushed_offset: 0,
396                needs_did_flush_device: false,
397                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
398                output_reset_version: false,
399                flush_waker: None,
400                terminate: false,
401                terminate_reason: None,
402                disable_compactions: false,
403                compaction_running: false,
404                sync_waker: None,
405                flushed_offset: 0,
406                valid_to: 0,
407                discard_offset: None,
408                reclaim_size: options.reclaim_size,
409                image_builder_mode: false,
410            }),
411            writer_mutex: Mutex::new(()),
412            sync_mutex: futures::lock::Mutex::new(()),
413            trace: AtomicBool::new(false),
414            reclaim_event: Event::new(),
415        }
416    }
417
418    pub fn set_trace(&self, trace: bool) {
419        let old_value = self.trace.swap(trace, Ordering::Relaxed);
420        if trace != old_value {
421            info!(trace; "J: trace");
422        }
423    }
424
425    pub fn set_image_builder_mode(&self, enabled: bool) {
426        self.inner.lock().image_builder_mode = enabled;
427    }
428
429    pub fn image_builder_mode(&self) -> bool {
430        self.inner.lock().image_builder_mode
431    }
432
433    /// Used during replay to validate a mutation.  This should return false if the mutation is not
434    /// valid and should not be applied.  This could be for benign reasons (e.g. the device flushed
435    /// data out-of-order) or because of a malicious actor.
436    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
437        match mutation {
438            Mutation::ObjectStore(ObjectStoreMutation {
439                item:
440                    Item {
441                        key:
442                            ObjectKey {
443                                data:
444                                    ObjectKeyData::Attribute(
445                                        _,
446                                        AttributeKey::Extent(ExtentKey { range }),
447                                    ),
448                                ..
449                            },
450                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
451                        ..
452                    },
453                ..
454            }) => {
455                if range.is_empty() || !range.is_aligned(block_size) {
456                    return false;
457                }
458                let len = range.length().unwrap();
459                if let ExtentMode::Cow(checksums) = mode {
460                    if checksums.len() > 0 {
461                        if len % checksums.len() as u64 != 0 {
462                            return false;
463                        }
464                        if (len / checksums.len() as u64) % block_size != 0 {
465                            return false;
466                        }
467                    }
468                }
469                if *device_offset % block_size != 0
470                    || *device_offset >= device_size
471                    || device_size - *device_offset < len
472                {
473                    return false;
474                }
475            }
476            Mutation::ObjectStore(_) => {}
477            Mutation::EncryptedObjectStore(_) => {}
478            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
479                return !device_range.is_empty()
480                    && *owner_object_id != INVALID_OBJECT_ID
481                    && device_range.end <= device_size;
482            }
483            Mutation::Allocator(AllocatorMutation::Deallocate {
484                device_range,
485                owner_object_id,
486            }) => {
487                return !device_range.is_empty()
488                    && *owner_object_id != INVALID_OBJECT_ID
489                    && device_range.end <= device_size;
490            }
491            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
492                return *owner_object_id != INVALID_OBJECT_ID;
493            }
494            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
495                return *owner_object_id != INVALID_OBJECT_ID;
496            }
497            Mutation::BeginFlush => {}
498            Mutation::EndFlush => {}
499            Mutation::DeleteVolume => {}
500            Mutation::UpdateBorrowed(_) => {}
501            Mutation::UpdateMutationsKey(_) => {}
502            Mutation::CreateInternalDir(owner_object_id) => {
503                return *owner_object_id != INVALID_OBJECT_ID;
504            }
505        }
506        true
507    }
508
509    // Assumes that `mutation` has been validated.
510    fn update_checksum_list(
511        &self,
512        journal_offset: u64,
513        mutation: &Mutation,
514        checksum_list: &mut ChecksumList,
515    ) -> Result<(), Error> {
516        match mutation {
517            Mutation::ObjectStore(_) => {}
518            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
519                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
520            }
521            _ => {}
522        }
523        Ok(())
524    }
525
526    /// Reads the latest super-block, and then replays journaled records.
527    #[trace]
528    pub async fn replay(
529        &self,
530        filesystem: Arc<FxFilesystem>,
531        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
532    ) -> Result<(), Error> {
533        let block_size = filesystem.block_size();
534
535        let (super_block, root_parent) =
536            self.super_block_manager.load(filesystem.device(), block_size).await?;
537
538        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));
539
540        self.objects.set_root_parent_store(root_parent.clone());
541        let allocator =
542            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
543        if let Some(on_new_allocator) = on_new_allocator {
544            on_new_allocator(allocator.clone());
545        }
546        self.objects.set_allocator(allocator.clone());
547        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
548        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
549        {
550            let mut inner = self.inner.lock();
551            inner.super_block_header = super_block.clone();
552        }
553
554        let device = filesystem.device();
555
556        let mut handle;
557        {
558            let root_parent_layer = root_parent.tree().mutable_layer();
559            let mut iter = root_parent_layer
560                .seek(Bound::Included(&ObjectKey::attribute(
561                    super_block.journal_object_id,
562                    DEFAULT_DATA_ATTRIBUTE_ID,
563                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
564                        super_block.journal_checkpoint.file_offset,
565                        BLOCK_SIZE,
566                    ))),
567                )))
568                .await
569                .context("Failed to seek root parent store")?;
570            let start_offset = if let Some(ItemRef {
571                key:
572                    ObjectKey {
573                        data:
574                            ObjectKeyData::Attribute(
575                                DEFAULT_DATA_ATTRIBUTE_ID,
576                                AttributeKey::Extent(ExtentKey { range }),
577                            ),
578                        ..
579                    },
580                ..
581            }) = iter.get()
582            {
583                range.start
584            } else {
585                0
586            };
587            handle = BootstrapObjectHandle::new_with_start_offset(
588                super_block.journal_object_id,
589                device.clone(),
590                start_offset,
591            );
592            while let Some(item) = iter.get() {
593                if !match item.into() {
594                    Some((
595                        object_id,
596                        DEFAULT_DATA_ATTRIBUTE_ID,
597                        ExtentKey { range },
598                        ExtentValue::Some { device_offset, .. },
599                    )) if object_id == super_block.journal_object_id => {
600                        if let Some(end_offset) = handle.end_offset() {
601                            if range.start != end_offset {
602                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
603                                    "Unexpected journal extent {:?}, expected start: {}",
604                                    item, end_offset
605                                )));
606                            }
607                        }
608                        handle.push_extent(
609                            0, // We never discard extents from the root parent store.
610                            *device_offset
611                                ..*device_offset + range.length().context("Invalid extent")?,
612                        );
613                        true
614                    }
615                    _ => false,
616                } {
617                    break;
618                }
619                iter.advance().await.context("Failed to advance root parent store iterator")?;
620            }
621        }
622
623        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
624        let JournaledTransactions { mut transactions, device_flushed_offset } = self
625            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
626            .await
627            .context("Reading transactions for replay")?;
628
629        // Validate all the mutations.
630        let mut checksum_list = ChecksumList::new(device_flushed_offset);
631        let mut valid_to = reader.journal_file_checkpoint().file_offset;
632        let device_size = device.size();
633        'bad_replay: for JournaledTransaction {
634            checkpoint,
635            root_parent_mutations,
636            root_mutations,
637            non_root_mutations,
638            checksums,
639            ..
640        } in &transactions
641        {
642            for JournaledChecksums { device_range, checksums, first_write } in checksums {
643                checksum_list
644                    .push(
645                        checkpoint.file_offset,
646                        device_range.clone(),
647                        checksums.maybe_as_ref().context("Malformed checksums")?,
648                        *first_write,
649                    )
650                    .context("Pushing journal checksum records to checksum list")?;
651            }
652            for mutation in root_parent_mutations
653                .iter()
654                .chain(root_mutations)
655                .chain(non_root_mutations.iter().map(|(_, m)| m))
656            {
657                if !self.validate_mutation(mutation, block_size, device_size) {
658                    info!(mutation:?; "Stopping replay at bad mutation");
659                    valid_to = checkpoint.file_offset;
660                    break 'bad_replay;
661                }
662                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
663            }
664        }
665
666        // Validate the checksums.
667        let valid_to = checksum_list
668            .verify(device.as_ref(), valid_to)
669            .await
670            .context("Failed to validate checksums")?;
671
672        // Apply the mutations...
673
674        let mut last_checkpoint = reader.journal_file_checkpoint();
675        let mut journal_offsets = super_block.journal_file_offsets.clone();
676
677        // Start with the root-parent mutations, and also determine the journal offsets for all
678        // other objects.
679        for (index, JournaledTransaction { checkpoint, root_parent_mutations, end_flush, .. }) in
680            transactions.iter_mut().enumerate()
681        {
682            if checkpoint.file_offset >= valid_to {
683                last_checkpoint = checkpoint.clone();
684
685                // Truncate the transactions so we don't need to worry about them on the next pass.
686                transactions.truncate(index);
687                break;
688            }
689
690            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
691            for mutation in root_parent_mutations.drain(..) {
692                self.objects
693                    .apply_mutation(
694                        super_block.root_parent_store_object_id,
695                        mutation,
696                        &context,
697                        AssocObj::None,
698                    )
699                    .context("Failed to replay root parent store mutations")?;
700            }
701
702            if let Some((object_id, journal_offset)) = end_flush {
703                journal_offsets.insert(*object_id, *journal_offset);
704            }
705        }
706
707        // Now we can open the root store.
708        let root_store = ObjectStore::open(
709            &root_parent,
710            super_block.root_store_object_id,
711            Box::new(NullCache {}),
712        )
713        .await
714        .context("Unable to open root store")?;
715
716        ensure!(
717            !root_store.is_encrypted(),
718            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
719        );
720        self.objects.set_root_store(root_store);
721
722        let root_store_offset =
723            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);
724
725        // Now replay the root store mutations.
726        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
727            if checkpoint.file_offset < root_store_offset {
728                continue;
729            }
730
731            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
732            for mutation in root_mutations.drain(..) {
733                self.objects
734                    .apply_mutation(
735                        super_block.root_store_object_id,
736                        mutation,
737                        &context,
738                        AssocObj::None,
739                    )
740                    .context("Failed to replay root store mutations")?;
741            }
742        }
743
744        // Now we can open the allocator.
745        allocator.open().await.context("Failed to open allocator")?;
746
747        // Now replay all other mutations.
748        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
749        {
750            self.objects
751                .replay_mutations(
752                    non_root_mutations,
753                    &journal_offsets,
754                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
755                    end_offset,
756                )
757                .await
758                .context("Failed to replay mutations")?;
759        }
760
761        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;
762
763        let discarded_to =
764            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
765                Some(reader.journal_file_checkpoint().file_offset)
766            } else {
767                None
768            };
769
770        // Configure the journal writer so that we can continue.
771        {
772            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
773                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
774                    "journal replay cut short; journal finishes at {}, but super-block was \
775                     written at {}",
776                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
777                )));
778            }
779            let handle = ObjectStore::open_object(
780                &root_parent,
781                super_block.journal_object_id,
782                journal_handle_options(),
783                None,
784            )
785            .await
786            .with_context(|| {
787                format!(
788                    "Failed to open journal file (object id: {})",
789                    super_block.journal_object_id
790                )
791            })?;
792            let _ = self.handle.set(handle);
793            let mut inner = self.inner.lock();
794            reader.skip_to_end_of_block();
795            let mut writer_checkpoint = reader.journal_file_checkpoint();
796
797            // Make sure we don't accidentally use the reader from now onwards.
798            std::mem::drop(reader);
799
800            // Reset the stream to indicate that we've remounted the journal.
801            writer_checkpoint.checksum ^= RESET_XOR;
802            writer_checkpoint.version = LATEST_VERSION;
803            inner.flushed_offset = writer_checkpoint.file_offset;
804
805            // When we open the filesystem as writable, we flush the device.
806            inner.device_flushed_offset = inner.flushed_offset;
807
808            inner.writer.seek(writer_checkpoint);
809            inner.output_reset_version = true;
810            inner.valid_to = last_checkpoint.file_offset;
811            if last_checkpoint.file_offset < inner.flushed_offset {
812                inner.discard_offset = Some(last_checkpoint.file_offset);
813            }
814        }
815
816        self.objects
817            .on_replay_complete()
818            .await
819            .context("Failed to complete replay for object manager")?;
820
821        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
822        Ok(())
823    }
824
825    async fn read_transactions(
826        &self,
827        reader: &mut JournalReader,
828        end_offset: Option<u64>,
829        object_id_filter: u64,
830    ) -> Result<JournaledTransactions, Error> {
831        let mut transactions = Vec::new();
832        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
833            let super_block = &self.inner.lock().super_block_header;
834            (
835                super_block.super_block_journal_file_offset,
836                super_block.root_parent_store_object_id,
837                super_block.root_store_object_id,
838            )
839        };
840        let mut current_transaction = None;
841        let mut begin_flush_offsets = HashMap::default();
842        let mut stores_deleted = HashSet::new();
843        loop {
844            // Cache the checkpoint before we deserialize a record.
845            let checkpoint = reader.journal_file_checkpoint();
846            if let Some(end_offset) = end_offset {
847                if checkpoint.file_offset >= end_offset {
848                    break;
849                }
850            }
851            let result =
852                reader.deserialize().await.context("Failed to deserialize journal record")?;
853            match result {
854                ReadResult::Reset(_) => {
855                    if current_transaction.is_some() {
856                        current_transaction = None;
857                        transactions.pop();
858                    }
859                    let offset = reader.journal_file_checkpoint().file_offset;
860                    if offset > device_flushed_offset {
861                        device_flushed_offset = offset;
862                    }
863                }
864                ReadResult::Some(record) => {
865                    match record {
866                        JournalRecord::EndBlock => {
867                            reader.skip_to_end_of_block();
868                        }
869                        JournalRecord::Mutation { object_id, mutation } => {
870                            let current_transaction = match current_transaction.as_mut() {
871                                None => {
872                                    transactions.push(JournaledTransaction::new(checkpoint));
873                                    current_transaction = transactions.last_mut();
874                                    current_transaction.as_mut().unwrap()
875                                }
876                                Some(transaction) => transaction,
877                            };
878
879                            if stores_deleted.contains(&object_id) {
880                                bail!(anyhow!(FxfsError::Inconsistent)
881                                    .context("Encountered mutations for deleted store"));
882                            }
883
884                            match &mutation {
885                                Mutation::BeginFlush => {
886                                    begin_flush_offsets.insert(
887                                        object_id,
888                                        current_transaction.checkpoint.file_offset,
889                                    );
890                                }
891                                Mutation::EndFlush => {
892                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
893                                        // The +1 is because we don't want to replay the transaction
894                                        // containing the begin flush.
895                                        if current_transaction
896                                            .end_flush
897                                            .replace((object_id, offset + 1))
898                                            .is_some()
899                                        {
900                                            bail!(anyhow!(FxfsError::Inconsistent).context(
901                                                "Multiple EndFlush/DeleteVolume mutations in a \
902                                                 single transaction"
903                                            ));
904                                        }
905                                    }
906                                }
907                                Mutation::DeleteVolume => {
908                                    if current_transaction
909                                        .end_flush
910                                        .replace((object_id, STORE_DELETED))
911                                        .is_some()
912                                    {
913                                        bail!(anyhow!(FxfsError::Inconsistent).context(
914                                            "Multiple EndFlush/DeleteVolume mutations in a single \
915                                             transaction"
916                                        ));
917                                    }
918                                    stores_deleted.insert(object_id);
919                                }
920                                _ => {}
921                            }
922
923                            // If this mutation doesn't need to be applied, don't bother adding it
924                            // to the transaction.
925                            if (object_id_filter == INVALID_OBJECT_ID
926                                || object_id_filter == object_id)
927                                && self.should_apply(object_id, &current_transaction.checkpoint)
928                            {
929                                if object_id == root_parent_store_object_id {
930                                    current_transaction.root_parent_mutations.push(mutation);
931                                } else if object_id == root_store_object_id {
932                                    current_transaction.root_mutations.push(mutation);
933                                } else {
934                                    current_transaction
935                                        .non_root_mutations
936                                        .push((object_id, mutation));
937                                }
938                            }
939                        }
940                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
941                            let current_transaction = match current_transaction.as_mut() {
942                                None => {
943                                    transactions.push(JournaledTransaction::new(checkpoint));
944                                    current_transaction = transactions.last_mut();
945                                    current_transaction.as_mut().unwrap()
946                                }
947                                Some(transaction) => transaction,
948                            };
949                            current_transaction.checksums.push(JournaledChecksums {
950                                device_range,
951                                checksums,
952                                first_write,
953                            });
954                        }
955                        JournalRecord::Commit => {
956                            if let Some(JournaledTransaction {
957                                ref checkpoint,
958                                ref root_parent_mutations,
959                                ref mut end_offset,
960                                ..
961                            }) = current_transaction.take()
962                            {
963                                for mutation in root_parent_mutations {
964                                    // Snoop the mutations for any that might apply to the journal
965                                    // file itself, and pass those extents to the reader so that it
966                                    // can keep reading the journal file.
967                                    if let Mutation::ObjectStore(ObjectStoreMutation {
968                                        item:
969                                            Item {
970                                                key:
971                                                    ObjectKey {
972                                                        object_id,
973                                                        data:
974                                                            ObjectKeyData::Attribute(
975                                                                DEFAULT_DATA_ATTRIBUTE_ID,
976                                                                AttributeKey::Extent(ExtentKey {
977                                                                    range,
978                                                                }),
979                                                            ),
980                                                        ..
981                                                    },
982                                                value:
983                                                    ObjectValue::Extent(ExtentValue::Some {
984                                                        device_offset,
985                                                        ..
986                                                    }),
987                                                ..
988                                            },
989                                        ..
990                                    }) = mutation
991                                    {
992                                        // Add the journal extents we find on the way to our
993                                        // reader.
994                                        let handle = reader.handle();
995                                        if *object_id != handle.object_id() {
996                                            continue;
997                                        }
998                                        if let Some(end_offset) = handle.end_offset() {
999                                            if range.start != end_offset {
1000                                                bail!(anyhow!(FxfsError::Inconsistent).context(
1001                                                    format!(
1002                                                        "Unexpected journal extent {:?} -> {}, \
1003                                                           expected start: {}",
1004                                                        range, device_offset, end_offset,
1005                                                    )
1006                                                ));
1007                                            }
1008                                        }
1009                                        handle.push_extent(
1010                                            checkpoint.file_offset,
1011                                            *device_offset
1012                                                ..*device_offset
1013                                                    + range.length().context("Invalid extent")?,
1014                                        );
1015                                    }
1016                                }
1017                                *end_offset = reader.journal_file_checkpoint().file_offset;
1018                            }
1019                        }
1020                        JournalRecord::Discard(offset) => {
1021                            if offset == 0 {
1022                                bail!(anyhow!(FxfsError::Inconsistent)
1023                                    .context("Invalid offset for Discard"));
1024                            }
1025                            if let Some(transaction) = current_transaction.as_ref() {
1026                                if transaction.checkpoint.file_offset < offset {
1027                                    // Odd, but OK.
1028                                    continue;
1029                                }
1030                            }
1031                            current_transaction = None;
1032                            while let Some(transaction) = transactions.last() {
1033                                if transaction.checkpoint.file_offset < offset {
1034                                    break;
1035                                }
1036                                transactions.pop();
1037                            }
1038                            reader.handle().discard_extents(offset);
1039                        }
1040                        JournalRecord::DidFlushDevice(offset) => {
1041                            if offset > device_flushed_offset {
1042                                device_flushed_offset = offset;
1043                            }
1044                        }
1045                    }
1046                }
1047                // This is expected when we reach the end of the journal stream.
1048                ReadResult::ChecksumMismatch => break,
1049            }
1050        }
1051
1052        // Discard any uncommitted transaction.
1053        if current_transaction.is_some() {
1054            transactions.pop();
1055        }
1056
1057        Ok(JournaledTransactions { transactions, device_flushed_offset })
1058    }
1059
1060    /// Creates an empty filesystem with the minimum viable objects (including a root parent and
1061    /// root store but no further child stores).
1062    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
1063        // The following constants are only used at format time. When mounting, the recorded values
1064        // in the superblock should be used.  The root parent store does not have a parent, but
1065        // needs an object ID to be registered with ObjectManager, so it cannot collide (i.e. have
1066        // the same object ID) with any objects in the root store that use the journal to track
1067        // mutations.
1068        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
1069        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
1070        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;
1071
1072        info!(device_size = filesystem.device().size(); "Formatting");
1073
1074        let checkpoint = JournalCheckpoint {
1075            version: LATEST_VERSION,
1076            ..self.inner.lock().writer.journal_file_checkpoint()
1077        };
1078
1079        let root_parent = ObjectStore::new_empty(
1080            None,
1081            INIT_ROOT_PARENT_STORE_OBJECT_ID,
1082            filesystem.clone(),
1083            Box::new(NullCache {}),
1084        );
1085        self.objects.set_root_parent_store(root_parent.clone());
1086
1087        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
1088        self.objects.set_allocator(allocator.clone());
1089        self.objects.init_metadata_reservation()?;
1090
1091        let journal_handle;
1092        let super_block_a_handle;
1093        let super_block_b_handle;
1094        let root_store;
1095        let mut transaction = filesystem
1096            .clone()
1097            .new_transaction(
1098                lock_keys![],
1099                Options { skip_journal_checks: true, ..Default::default() },
1100            )
1101            .await?;
1102        root_store = root_parent
1103            .new_child_store(
1104                &mut transaction,
1105                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
1106                Box::new(NullCache {}),
1107            )
1108            .await
1109            .context("new_child_store")?;
1110        self.objects.set_root_store(root_store.clone());
1111
1112        allocator.create(&mut transaction).await?;
1113
1114        // Create the super-block objects...
1115        super_block_a_handle = ObjectStore::create_object_with_id(
1116            &root_store,
1117            &mut transaction,
1118            SuperBlockInstance::A.object_id(),
1119            HandleOptions::default(),
1120            None,
1121        )
1122        .await
1123        .context("create super block")?;
1124        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
1125        super_block_a_handle
1126            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
1127            .await
1128            .context("extend super block")?;
1129        super_block_b_handle = ObjectStore::create_object_with_id(
1130            &root_store,
1131            &mut transaction,
1132            SuperBlockInstance::B.object_id(),
1133            HandleOptions::default(),
1134            None,
1135        )
1136        .await
1137        .context("create super block")?;
1138        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
1139        super_block_b_handle
1140            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
1141            .await
1142            .context("extend super block")?;
1143
1144        // the journal object...
1145        journal_handle = ObjectStore::create_object(
1146            &root_parent,
1147            &mut transaction,
1148            journal_handle_options(),
1149            None,
1150        )
1151        .await
1152        .context("create journal")?;
1153        if !self.inner.lock().image_builder_mode {
1154            let mut file_range = 0..self.chunk_size();
1155            journal_handle
1156                .preallocate_range(&mut transaction, &mut file_range)
1157                .await
1158                .context("preallocate journal")?;
1159            if file_range.start < file_range.end {
1160                bail!("preallocate_range returned too little space");
1161            }
1162        }
1163
1164        // Write the root store object info.
1165        root_store.create(&mut transaction).await?;
1166
1167        // The root parent graveyard.
1168        root_parent
1169            .set_graveyard_directory_object_id(Graveyard::create(&mut transaction, &root_parent));
1170
1171        transaction.commit().await?;
1172
1173        self.inner.lock().super_block_header = SuperBlockHeader::new(
1174            root_parent.store_object_id(),
1175            root_parent.graveyard_directory_object_id(),
1176            root_store.store_object_id(),
1177            allocator.object_id(),
1178            journal_handle.object_id(),
1179            checkpoint,
1180            /* earliest_version: */ LATEST_VERSION,
1181        );
1182
1183        // Initialize the journal writer.
1184        let _ = self.handle.set(journal_handle);
1185        Ok(())
1186    }
1187
1188    /// Normally we allocate the journal when creating the filesystem.
1189    /// This is used in image_builder_mode, where journal allocation is done last.
1190    pub async fn allocate_journal(&self) -> Result<(), Error> {
1191        let handle = self.handle.get().unwrap();
1192        let filesystem = handle.store().filesystem();
1193        let mut transaction = filesystem
1194            .clone()
1195            .new_transaction(
1196                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1197                Options { skip_journal_checks: true, ..Default::default() },
1198            )
1199            .await?;
1200        let mut file_range = 0..self.chunk_size();
1201        self.handle
1202            .get()
1203            .unwrap()
1204            .preallocate_range(&mut transaction, &mut file_range)
1205            .await
1206            .context("preallocate journal")?;
1207        if file_range.start < file_range.end {
1208            bail!("preallocate_range returned too little space");
1209        }
1210        transaction.commit().await?;
1211        Ok(())
1212    }
1213
1214    pub async fn init_superblocks(&self) -> Result<(), Error> {
1215        // Overwrite both superblocks.
1216        for _ in 0..2 {
1217            self.write_super_block().await?;
1218        }
1219        Ok(())
1220    }
1221
1222    /// Takes a snapshot of all journaled transactions which affect |object_id| since its last
1223    /// flush.
1224    /// The caller is responsible for locking; it must ensure that the journal is not trimmed during
1225    /// this call.  For example, a Flush lock could be held on the object in question (assuming that
1226    /// object has data to flush and is registered with ObjectManager).
1227    pub async fn read_transactions_for_object(
1228        &self,
1229        object_id: u64,
1230    ) -> Result<Vec<JournaledTransaction>, Error> {
1231        let handle = self.handle.get().expect("No journal handle");
1232        // Reopen the handle since JournalReader needs an owned handle.
1233        let handle = ObjectStore::open_object(
1234            handle.owner(),
1235            handle.object_id(),
1236            journal_handle_options(),
1237            None,
1238        )
1239        .await?;
1240
1241        let checkpoint = match self.objects.journal_checkpoint(object_id) {
1242            Some(checkpoint) => checkpoint,
1243            None => return Ok(vec![]),
1244        };
1245        let mut reader = JournalReader::new(handle, &checkpoint);
1246        // Record the current end offset and only read to there, so we don't accidentally read any
1247        // partially flushed blocks.
1248        let end_offset = self.inner.lock().valid_to;
1249        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1250    }
1251
1252    /// Commits a transaction.  This is not thread safe; the caller must take appropriate locks.
1253    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1254        if transaction.is_empty() {
1255            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1256        }
1257
1258        self.pre_commit(transaction).await?;
1259        Ok(self.write_and_apply_mutations(transaction))
1260    }
1261
1262    // Before we commit, we might need to extend the journal or write pending records to the
1263    // journal.
1264    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
1265        let handle;
1266
1267        let (size, zero_offset) = {
1268            let mut inner = self.inner.lock();
1269
1270            // If this is the first write after a RESET, we need to output version first.
1271            if std::mem::take(&mut inner.output_reset_version) {
1272                LATEST_VERSION.serialize_into(&mut inner.writer)?;
1273            }
1274
1275            if let Some(discard_offset) = inner.discard_offset {
1276                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
1277                inner.discard_offset = None;
1278            }
1279
1280            if inner.needs_did_flush_device {
1281                let offset = inner.device_flushed_offset;
1282                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
1283                inner.needs_did_flush_device = false;
1284            }
1285
1286            handle = match self.handle.get() {
1287                None => return Ok(()),
1288                Some(x) => x,
1289            };
1290
1291            let file_offset = inner.writer.journal_file_checkpoint().file_offset;
1292
1293            let size = handle.get_size();
1294            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };
1295
1296            if size.is_none()
1297                && inner.zero_offset.is_none()
1298                && !self.objects.needs_borrow_for_journal(file_offset)
1299            {
1300                return Ok(());
1301            }
1302
1303            (size, inner.zero_offset)
1304        };
1305
1306        let mut transaction = handle
1307            .new_transaction_with_options(Options {
1308                skip_journal_checks: true,
1309                borrow_metadata_space: true,
1310                allocator_reservation: Some(self.objects.metadata_reservation()),
1311                txn_guard: Some(transaction.txn_guard()),
1312                ..Default::default()
1313            })
1314            .await?;
1315        if let Some(size) = size {
1316            handle
1317                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
1318                .await?;
1319        }
1320        if let Some(zero_offset) = zero_offset {
1321            handle.zero(&mut transaction, 0..zero_offset).await?;
1322        }
1323
1324        // We can't use regular transaction commit, because that can cause re-entrancy issues, so
1325        // instead we just apply the transaction directly here.
1326        self.write_and_apply_mutations(&mut transaction);
1327
1328        let mut inner = self.inner.lock();
1329
1330        // Make sure the transaction to extend the journal made it to the journal within the old
1331        // size, since otherwise, it won't be possible to replay.
1332        if let Some(size) = size {
1333            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
1334        }
1335
1336        if inner.zero_offset == zero_offset {
1337            inner.zero_offset = None;
1338        }
1339
1340        Ok(())
1341    }
1342
1343    // Determines whether a mutation at the given checkpoint should be applied.  During replay, not
1344    // all records should be applied because the object store or allocator might already contain the
1345    // mutation.  After replay, that obviously isn't the case and we want to apply all mutations.
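    // As an illustrative example (hypothetical offsets): if the super-block records that a
    // store's mutations are persisted up to journal offset 65536, a mutation for that store
    // journaled at offset 32768 is skipped during replay (it is already reflected in the
    // persisted layers), whereas one journaled at offset 131072 is applied.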
1346    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1347        let super_block_header = &self.inner.lock().super_block_header;
1348        let offset = super_block_header
1349            .journal_file_offsets
1350            .get(&object_id)
1351            .cloned()
1352            .unwrap_or(super_block_header.super_block_journal_file_offset);
1353        journal_file_checkpoint.file_offset >= offset
1354    }
1355
1356    /// Flushes previous writes to the device and then writes out a new super-block.
1357    /// Callers must ensure that we do not make concurrent calls.
1358    async fn write_super_block(&self) -> Result<(), Error> {
1359        let root_parent_store = self.objects.root_parent_store();
1360
1361        // We need to flush previous writes to the device since the new super-block we are writing
1362        // relies on written data being observable, and we also need to lock the root parent store
1363        // so that no new entries are written to it whilst we are writing the super-block, and for
1364        // that we use the write lock.
1365        let old_layers;
1366        let old_super_block_offset;
1367        let mut new_super_block_header;
1368        let checkpoint;
1369        let borrowed;
1370
1371        {
1372            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1373            {
1374                let _write_guard = self.writer_mutex.lock();
1375                (checkpoint, borrowed) = self.pad_to_block()?;
1376                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
1377            }
1378            self.flush_device(checkpoint.file_offset)
1379                .await
1380                .context("flush failed when writing superblock")?;
1381        }
1382
1383        new_super_block_header = self.inner.lock().super_block_header.clone();
1384
1385        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;
1386
1387        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();
1388
1389        new_super_block_header.generation =
1390            new_super_block_header.generation.checked_add(1).ok_or(FxfsError::Inconsistent)?;
1391        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
1392        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
1393        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
1394        new_super_block_header.journal_file_offsets = journal_file_offsets;
1395        new_super_block_header.borrowed_metadata_space = borrowed;
1396
1397        self.super_block_manager
1398            .save(
1399                new_super_block_header.clone(),
1400                self.objects.root_parent_store().filesystem(),
1401                old_layers,
1402            )
1403            .await?;
1404        {
1405            let mut inner = self.inner.lock();
1406            inner.super_block_header = new_super_block_header;
1407            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
1408        }
1409
1410        Ok(())
1411    }
1412
1413    /// Flushes any buffered journal data to the device.  Note that this does not flush the device
1414    /// unless the flush_device option is set, in which case data should have been persisted to
1415    /// lower layers.  If a precondition is supplied, it is evaluated and the sync will be skipped
1416    /// if it returns false.  This allows callers to check a condition whilst a lock is held.  If a
1417    /// sync is performed, this function returns the checkpoint that was flushed and the amount of
1418    /// borrowed metadata space at the point it was flushed.
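    ///
    /// A minimal usage sketch (not a doctest; assumes a `journal: &Journal` in scope and that the
    /// remaining `SyncOptions` fields keep their defaults):
    /// ```ignore
    /// // Pad the current block, flush buffered journal data and then flush the device.
    /// if let Some((checkpoint, borrowed)) =
    ///     journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await?
    /// {
    ///     // `checkpoint` is the journal checkpoint that is now persistent; `borrowed` is the
    ///     // amount of borrowed metadata space at that point.
    /// }
    /// ```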
1419    pub async fn sync(
1420        &self,
1421        options: SyncOptions<'_>,
1422    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1423        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1424
1425        let (checkpoint, borrowed) = {
1426            if let Some(precondition) = options.precondition {
1427                if !precondition() {
1428                    return Ok(None);
1429                }
1430            }
1431
1432            // This guard is required so that we don't insert an EndBlock record in the middle of a
1433            // transaction.
1434            let _guard = self.writer_mutex.lock();
1435
1436            self.pad_to_block()?
1437        };
1438
1439        if options.flush_device {
1440            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1441        }
1442
1443        Ok(Some((checkpoint, borrowed)))
1444    }
1445
1446    // Returns the checkpoint as it was prior to padding.  This is done because the super block
1447    // needs to record where the last transaction ends and it's the next transaction that pays the
1448    // price of the padding.
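    // For example (hypothetical offsets, 4096-byte journal blocks): if the writer is at file
    // offset 12488 when a sync pads the block, the returned checkpoint records 12488, and the
    // padding up to the next block boundary at 16384 is charged to whatever is written next.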
1449    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1450        let mut inner = self.inner.lock();
1451        let checkpoint = inner.writer.journal_file_checkpoint();
1452        if checkpoint.file_offset % BLOCK_SIZE != 0 {
1453            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1454            inner.writer.pad_to_block()?;
1455            if let Some(waker) = inner.flush_waker.take() {
1456                waker.wake();
1457            }
1458        }
1459        Ok((checkpoint, self.objects.borrowed_metadata_space()))
1460    }
1461
1462    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
1463        debug_assert_not_too_long!(poll_fn(|ctx| {
1464            let mut inner = self.inner.lock();
1465            if inner.flushed_offset >= checkpoint_offset {
1466                Poll::Ready(Ok(()))
1467            } else if inner.terminate {
1468                let context = inner
1469                    .terminate_reason
1470                    .as_ref()
1471                    .map(|e| format!("Journal closed with error: {:?}", e))
1472                    .unwrap_or_else(|| "Journal closed".to_string());
1473                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
1474            } else {
1475                inner.sync_waker = Some(ctx.waker().clone());
1476                Poll::Pending
1477            }
1478        }))?;
1479
1480        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
1481        if needs_flush {
1482            let trace = self.trace.load(Ordering::Relaxed);
1483            if trace {
1484                info!("J: start flush device");
1485            }
1486            self.handle.get().unwrap().flush_device().await?;
1487            if trace {
1488                info!("J: end flush device");
1489            }
1490
1491            // We need to write a DidFlushDevice record at some point, but if we are in the
1492            // process of shutting down the filesystem, we want to leave the journal clean to
1493            // avoid there being log messages complaining about unwritten journal data, so we
1494            // queue it up so that the next transaction will trigger this record to be written.
1495            // If we are shutting down, that will never happen but since the DidFlushDevice
1496            // message is purely advisory (it reduces the number of checksums we have to verify
1497            // during replay), it doesn't matter if it isn't written.
1498            {
1499                let mut inner = self.inner.lock();
1500                inner.device_flushed_offset = checkpoint_offset;
1501                inner.needs_did_flush_device = true;
1502            }
1503
1504            // Tell the allocator that we flushed the device so that it can now start using
1505            // space that was deallocated.
1506            self.objects.allocator().did_flush_device(checkpoint_offset).await;
1507            if trace {
1508                info!("J: did flush device");
1509            }
1510        }
1511
1512        Ok(())
1513    }
1514
1515    /// Returns a copy of the super-block header.
1516    pub fn super_block_header(&self) -> SuperBlockHeader {
1517        self.inner.lock().super_block_header.clone()
1518    }
1519
1520    /// Waits for there to be sufficient space in the journal.
1521    pub async fn check_journal_space(&self) -> Result<(), Error> {
1522        loop {
1523            debug_assert_not_too_long!({
1524                let inner = self.inner.lock();
1525                if inner.terminate {
1526                    // If the flush error is set, this will never make progress, since we can't
1527                    // extend the journal any more.
1528                    let context = inner
1529                        .terminate_reason
1530                        .as_ref()
1531                        .map(|e| format!("Journal closed with error: {:?}", e))
1532                        .unwrap_or_else(|| "Journal closed".to_string());
1533                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
1534                }
1535                if self.objects.last_end_offset()
1536                    - inner.super_block_header.journal_checkpoint.file_offset
1537                    < inner.reclaim_size
1538                {
1539                    break Ok(());
1540                }
1541                if inner.image_builder_mode {
1542                    break Ok(());
1543                }
1544                if inner.disable_compactions {
1545                    break Err(
1546                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
1547                    );
1548                }
1549                self.reclaim_event.listen()
1550            });
1551        }
1552    }
1553
1554    fn chunk_size(&self) -> u64 {
1555        CHUNK_SIZE
1556    }
1557
1558    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
1559        let checkpoint_before;
1560        let checkpoint_after;
1561        {
1562            let _guard = self.writer_mutex.lock();
1563            checkpoint_before = {
1564                let mut inner = self.inner.lock();
1565                let checkpoint = inner.writer.journal_file_checkpoint();
1566                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
1567                    self.objects.write_mutation(
1568                        *object_id,
1569                        mutation,
1570                        Writer(*object_id, &mut inner.writer),
1571                    );
1572                }
1573                checkpoint
1574            };
1575            let maybe_mutation =
1576                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
1577                    "apply_transaction should not fail in live mode; \
1578                     filesystem will be in an inconsistent state",
1579                );
1580            checkpoint_after = {
1581                let mut inner = self.inner.lock();
1582                if let Some(mutation) = maybe_mutation {
1583                    inner
1584                        .writer
1585                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
1586                        .unwrap();
1587                }
1588                for (device_range, checksums, first_write) in
1589                    transaction.take_checksums().into_iter()
1590                {
1591                    inner
1592                        .writer
1593                        .write_record(&JournalRecord::DataChecksums(
1594                            device_range,
1595                            Checksums::fletcher(checksums),
1596                            first_write,
1597                        ))
1598                        .unwrap();
1599                }
1600                inner.writer.write_record(&JournalRecord::Commit).unwrap();
1601
1602                inner.writer.journal_file_checkpoint()
1603            };
1604        }
1605        self.objects.did_commit_transaction(
1606            transaction,
1607            &checkpoint_before,
1608            checkpoint_after.file_offset,
1609        );
1610
1611        if let Some(waker) = self.inner.lock().flush_waker.take() {
1612            waker.wake();
1613        }
1614
1615        checkpoint_before.file_offset
1616    }
1617
1618    /// This task will flush journal data to the device when there is data that needs flushing, and
1619    /// trigger compactions when short of journal space.  It will return after the terminate method
1620    /// has been called, or an error is encountered with either flushing or compaction.
1621    pub async fn flush_task(self: Arc<Self>) {
1622        let mut flush_fut = None;
1623        let mut compact_fut = None;
1624        let mut flush_error = false;
1625        poll_fn(|ctx| loop {
1626            {
1627                let mut inner = self.inner.lock();
1628                if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
1629                    let flushable = inner.writer.flushable_bytes();
1630                    if flushable > 0 {
1631                        flush_fut = Some(self.flush(flushable).boxed());
1632                    }
1633                }
1634                if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
1635                    return Poll::Ready(());
1636                }
1637                // The / 2 is here because after compacting, we cannot reclaim the space until the
1638                // _next_ time we flush the device since the super-block is not guaranteed to
1639                // persist until then.
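                // As a hypothetical example: with a reclaim_size of 256 MiB, a compaction is
                // kicked off once more than 128 MiB of journal has accumulated beyond the
                // super-block's journal checkpoint.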
1640                if compact_fut.is_none()
1641                    && !inner.terminate
1642                    && !inner.disable_compactions
1643                    && !inner.image_builder_mode
1644                    && self.objects.last_end_offset()
1645                        - inner.super_block_header.journal_checkpoint.file_offset
1646                        > inner.reclaim_size / 2
1647                {
1648                    compact_fut = Some(self.compact().boxed());
1649                    inner.compaction_running = true;
1650                }
1651                inner.flush_waker = Some(ctx.waker().clone());
1652            }
1653            let mut pending = true;
1654            if let Some(fut) = flush_fut.as_mut() {
1655                if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1656                    if let Err(e) = result {
1657                        self.inner.lock().terminate(Some(e.context("Flush error")));
1658                        self.reclaim_event.notify(usize::MAX);
1659                        flush_error = true;
1660                    }
1661                    flush_fut = None;
1662                    pending = false;
1663                }
1664            }
1665            if let Some(fut) = compact_fut.as_mut() {
1666                if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1667                    let mut inner = self.inner.lock();
1668                    if let Err(e) = result {
1669                        inner.terminate(Some(e.context("Compaction error")));
1670                    }
1671                    compact_fut = None;
1672                    inner.compaction_running = false;
1673                    self.reclaim_event.notify(usize::MAX);
1674                    pending = false;
1675                }
1676            }
1677            if pending {
1678                return Poll::Pending;
1679            }
1680        })
1681        .await;
1682    }
1683
1684    async fn flush(&self, amount: usize) -> Result<(), Error> {
1685        let handle = self.handle.get().unwrap();
1686        let mut buf = handle.allocate_buffer(amount).await;
1687        let offset = self.inner.lock().writer.take_flushable(buf.as_mut());
1688        let len = buf.len() as u64;
1689        self.handle.get().unwrap().overwrite(offset, buf.as_mut(), false).await?;
1690        let mut inner = self.inner.lock();
1691        if let Some(waker) = inner.sync_waker.take() {
1692            waker.wake();
1693        }
1694        inner.flushed_offset = offset + len;
1695        inner.valid_to = inner.flushed_offset;
1696        Ok(())
1697    }
1698
1699    /// This should generally NOT be called externally. It is public to allow use by the
1700    /// fxfs.Debug FIDL service.
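    ///
    /// A minimal sketch of triggering a compaction by hand (mirrors the usage in the tests below;
    /// assumes an `FxFilesystem` instance `fs`):
    /// ```ignore
    /// fs.journal().compact().await?;
    /// ```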
1701    #[trace]
1702    pub async fn compact(&self) -> Result<(), Error> {
1703        let trace = self.trace.load(Ordering::Relaxed);
1704        debug!("Compaction starting");
1705        if trace {
1706            info!("J: start compaction");
1707        }
1708        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1709        self.inner.lock().super_block_header.earliest_version = earliest_version;
1710        self.write_super_block().await.context("Failed to write superblock")?;
1711        if trace {
1712            info!("J: end compaction");
1713        }
1714        debug!("Compaction finished");
1715        Ok(())
1716    }
1717
1718    pub async fn stop_compactions(&self) {
1719        loop {
1720            debug_assert_not_too_long!({
1721                let mut inner = self.inner.lock();
1722                inner.disable_compactions = true;
1723                if !inner.compaction_running {
1724                    return;
1725                }
1726                self.reclaim_event.listen()
1727            });
1728        }
1729    }
1730
1731    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
1732    /// journal when queried.
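    ///
    /// A minimal sketch (hypothetical setup; assumes an `Arc<Journal>` named `journal` and an
    /// `Inspector` owned by the caller):
    /// ```ignore
    /// let inspector = fuchsia_inspect::Inspector::default();
    /// journal.track_statistics(inspector.root(), "journal");
    /// ```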
1733    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1734        let this = Arc::downgrade(self);
1735        parent.record_lazy_child(name, move || {
1736            let this_clone = this.clone();
1737            async move {
1738                let inspector = fuchsia_inspect::Inspector::default();
1739                if let Some(this) = this_clone.upgrade() {
1740                    let (journal_min, journal_max, journal_reclaim_size) = {
1741                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1742                        let inner = this.inner.lock();
1743                        (
1744                            round_down(
1745                                inner.super_block_header.journal_checkpoint.file_offset,
1746                                BLOCK_SIZE,
1747                            ),
1748                            inner.flushed_offset,
1749                            inner.reclaim_size,
1750                        )
1751                    };
1752                    let root = inspector.root();
1753                    root.record_uint("journal_min_offset", journal_min);
1754                    root.record_uint("journal_max_offset", journal_max);
1755                    root.record_uint("journal_size", journal_max - journal_min);
1756                    root.record_uint("journal_reclaim_size", journal_reclaim_size);
1757
1758                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
1759                    if let Some(x) = round_div(
1760                        100 * (journal_max - journal_min),
1761                        this.objects.allocator().get_disk_bytes(),
1762                    ) {
1763                        root.record_uint("journal_size_to_disk_size_percent", x);
1764                    }
1765                }
1766                Ok(inspector)
1767            }
1768            .boxed()
1769        });
1770    }
1771
1772    /// Terminate all journal activity.
1773    pub fn terminate(&self) {
1774        self.inner.lock().terminate(/*reason*/ None);
1775        self.reclaim_event.notify(usize::MAX);
1776    }
1777}
1778
1779/// Wrapper to allow records to be written to the journal.
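///
/// A minimal sketch of how this is used (mirrors `write_and_apply_mutations` above): the journal
/// constructs a `Writer` for each mutation in a transaction and hands it to
/// `ObjectManager::write_mutation`, which can call [`Writer::write`] to emit the record:
/// ```ignore
/// self.objects.write_mutation(*object_id, mutation, Writer(*object_id, &mut inner.writer));
/// ```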
1780pub struct Writer<'a>(u64, &'a mut JournalWriter);
1781
1782impl Writer<'_> {
1783    pub fn write(&mut self, mutation: Mutation) {
1784        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
1785    }
1786}
1787
1788#[cfg(test)]
1789mod tests {
1790    use crate::filesystem::{FxFilesystem, SyncOptions};
1791    use crate::fsck::fsck;
1792    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
1793    use crate::object_store::directory::Directory;
1794    use crate::object_store::transaction::Options;
1795    use crate::object_store::volume::root_volume;
1796    use crate::object_store::{lock_keys, HandleOptions, LockKey, ObjectStore, NO_OWNER};
1797    use fuchsia_async as fasync;
1798    use fuchsia_async::MonotonicDuration;
1799    use storage_device::fake_device::FakeDevice;
1800    use storage_device::DeviceHolder;
1801
1802    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1803
1804    #[fuchsia::test]
1805    async fn test_replay() {
1806        const TEST_DATA: &[u8] = b"hello";
1807
1808        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1809
1810        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1811
1812        let object_id = {
1813            let root_store = fs.root_store();
1814            let root_directory =
1815                Directory::open(&root_store, root_store.root_directory_object_id())
1816                    .await
1817                    .expect("open failed");
1818            let mut transaction = fs
1819                .clone()
1820                .new_transaction(
1821                    lock_keys![LockKey::object(
1822                        root_store.store_object_id(),
1823                        root_store.root_directory_object_id(),
1824                    )],
1825                    Options::default(),
1826                )
1827                .await
1828                .expect("new_transaction failed");
1829            let handle = root_directory
1830                .create_child_file(&mut transaction, "test")
1831                .await
1832                .expect("create_child_file failed");
1833
1834            transaction.commit().await.expect("commit failed");
1835            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1836            buf.as_mut_slice().copy_from_slice(TEST_DATA);
1837            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1838            // As this is the first sync, this will actually trigger a new super-block, but normally
1839            // this would not be the case.
1840            fs.sync(SyncOptions::default()).await.expect("sync failed");
1841            handle.object_id()
1842        };
1843
1844        {
1845            fs.close().await.expect("Close failed");
1846            let device = fs.take_device().await;
1847            device.reopen(false);
1848            let fs = FxFilesystem::open(device).await.expect("open failed");
1849            let handle = ObjectStore::open_object(
1850                &fs.root_store(),
1851                object_id,
1852                HandleOptions::default(),
1853                None,
1854            )
1855            .await
1856            .expect("open_object failed");
1857            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
1858            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
1859            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
1860            fsck(fs.clone()).await.expect("fsck failed");
1861            fs.close().await.expect("Close failed");
1862        }
1863    }
1864
1865    #[fuchsia::test]
1866    async fn test_reset() {
1867        const TEST_DATA: &[u8] = b"hello";
1868
1869        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
1870
1871        let mut object_ids = Vec::new();
1872
1873        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1874        {
1875            let root_store = fs.root_store();
1876            let root_directory =
1877                Directory::open(&root_store, root_store.root_directory_object_id())
1878                    .await
1879                    .expect("open failed");
1880            let mut transaction = fs
1881                .clone()
1882                .new_transaction(
1883                    lock_keys![LockKey::object(
1884                        root_store.store_object_id(),
1885                        root_store.root_directory_object_id(),
1886                    )],
1887                    Options::default(),
1888                )
1889                .await
1890                .expect("new_transaction failed");
1891            let handle = root_directory
1892                .create_child_file(&mut transaction, "test")
1893                .await
1894                .expect("create_child_file failed");
1895            transaction.commit().await.expect("commit failed");
1896            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1897            buf.as_mut_slice().copy_from_slice(TEST_DATA);
1898            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1899            fs.sync(SyncOptions::default()).await.expect("sync failed");
1900            object_ids.push(handle.object_id());
1901
1902            // Create a lot of objects but don't sync at the end. This should leave the filesystem
1903            // with a half-finished transaction that cannot be replayed.
1904            for i in 0..1000 {
1905                let mut transaction = fs
1906                    .clone()
1907                    .new_transaction(
1908                        lock_keys![LockKey::object(
1909                            root_store.store_object_id(),
1910                            root_store.root_directory_object_id(),
1911                        )],
1912                        Options::default(),
1913                    )
1914                    .await
1915                    .expect("new_transaction failed");
1916                let handle = root_directory
1917                    .create_child_file(&mut transaction, &format!("{}", i))
1918                    .await
1919                    .expect("create_child_file failed");
1920                transaction.commit().await.expect("commit failed");
1921                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1922                buf.as_mut_slice().copy_from_slice(TEST_DATA);
1923                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1924                object_ids.push(handle.object_id());
1925            }
1926        }
1927        fs.close().await.expect("fs close failed");
1928        let device = fs.take_device().await;
1929        device.reopen(false);
1930        let fs = FxFilesystem::open(device).await.expect("open failed");
1931        fsck(fs.clone()).await.expect("fsck failed");
1932        {
1933            let root_store = fs.root_store();
1934            // Check the first object, which should exist.
1935            for &object_id in &object_ids[0..1] {
1936                let handle = ObjectStore::open_object(
1937                    &root_store,
1938                    object_id,
1939                    HandleOptions::default(),
1940                    None,
1941                )
1942                .await
1943                .expect("open_object failed");
1944                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
1945                assert_eq!(
1946                    handle.read(0, buf.as_mut()).await.expect("read failed"),
1947                    TEST_DATA.len()
1948                );
1949                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
1950            }
1951
1952            // Write one more object and sync.
1953            let root_directory =
1954                Directory::open(&root_store, root_store.root_directory_object_id())
1955                    .await
1956                    .expect("open failed");
1957            let mut transaction = fs
1958                .clone()
1959                .new_transaction(
1960                    lock_keys![LockKey::object(
1961                        root_store.store_object_id(),
1962                        root_store.root_directory_object_id(),
1963                    )],
1964                    Options::default(),
1965                )
1966                .await
1967                .expect("new_transaction failed");
1968            let handle = root_directory
1969                .create_child_file(&mut transaction, "test2")
1970                .await
1971                .expect("create_child_file failed");
1972            transaction.commit().await.expect("commit failed");
1973            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1974            buf.as_mut_slice().copy_from_slice(TEST_DATA);
1975            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1976            fs.sync(SyncOptions::default()).await.expect("sync failed");
1977            object_ids.push(handle.object_id());
1978        }
1979
1980        fs.close().await.expect("close failed");
1981        let device = fs.take_device().await;
1982        device.reopen(false);
1983        let fs = FxFilesystem::open(device).await.expect("open failed");
1984        {
1985            fsck(fs.clone()).await.expect("fsck failed");
1986
1987            // Check the first and the last objects.
1988            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
1989                let handle = ObjectStore::open_object(
1990                    &fs.root_store(),
1991                    object_id,
1992                    HandleOptions::default(),
1993                    None,
1994                )
1995                .await
1996                .unwrap_or_else(|e| {
1997                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
1998                });
1999                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2000                assert_eq!(
2001                    handle.read(0, buf.as_mut()).await.expect("read failed"),
2002                    TEST_DATA.len()
2003                );
2004                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2005            }
2006        }
2007        fs.close().await.expect("close failed");
2008    }
2009
2010    #[fuchsia::test]
2011    async fn test_discard() {
2012        let device = {
2013            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2014            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2015            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2016
2017            let store =
2018                root_volume.new_volume("test", NO_OWNER, None).await.expect("new_volume failed");
2019            let root_directory = Directory::open(&store, store.root_directory_object_id())
2020                .await
2021                .expect("open failed");
2022
2023            // Create enough data so that another journal extent is used.
2024            let mut i = 0;
2025            loop {
2026                let mut transaction = fs
2027                    .clone()
2028                    .new_transaction(
2029                        lock_keys![LockKey::object(
2030                            store.store_object_id(),
2031                            store.root_directory_object_id()
2032                        )],
2033                        Options::default(),
2034                    )
2035                    .await
2036                    .expect("new_transaction failed");
2037                root_directory
2038                    .create_child_file(&mut transaction, &format!("a {i}"))
2039                    .await
2040                    .expect("create_child_file failed");
2041                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2042                    break;
2043                }
2044                i += 1;
2045            }
2046
2047            // Compact and then disable compactions.
2048            fs.journal().compact().await.expect("compact failed");
2049            fs.journal().stop_compactions().await;
2050
2051            // Keep going until we need another journal extent.
2052            let mut i = 0;
2053            loop {
2054                let mut transaction = fs
2055                    .clone()
2056                    .new_transaction(
2057                        lock_keys![LockKey::object(
2058                            store.store_object_id(),
2059                            store.root_directory_object_id()
2060                        )],
2061                        Options::default(),
2062                    )
2063                    .await
2064                    .expect("new_transaction failed");
2065                root_directory
2066                    .create_child_file(&mut transaction, &format!("b {i}"))
2067                    .await
2068                    .expect("create_child_file failed");
2069                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2070                    break;
2071                }
2072                i += 1;
2073            }
2074
2075            // Allow the journal to flush, but we don't want to sync.
2076            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2077            // Because we're not gracefully closing the filesystem, a Discard record will be
2078            // emitted.
2079            fs.device().snapshot().expect("snapshot failed")
2080        };
2081
2082        let fs = FxFilesystem::open(device).await.expect("open failed");
2083
2084        {
2085            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2086
2087            let store = root_volume.volume("test", NO_OWNER, None).await.expect("volume failed");
2088
2089            let root_directory = Directory::open(&store, store.root_directory_object_id())
2090                .await
2091                .expect("open failed");
2092
2093            // Write one more transaction.
2094            let mut transaction = fs
2095                .clone()
2096                .new_transaction(
2097                    lock_keys![LockKey::object(
2098                        store.store_object_id(),
2099                        store.root_directory_object_id()
2100                    )],
2101                    Options::default(),
2102                )
2103                .await
2104                .expect("new_transaction failed");
2105            root_directory
2106                .create_child_file(&mut transaction, "d")
2107                .await
2108                .expect("create_child_file failed");
2109            transaction.commit().await.expect("commit failed");
2110        }
2111
2112        fs.close().await.expect("close failed");
2113        let device = fs.take_device().await;
2114        device.reopen(false);
2115
2116        let fs = FxFilesystem::open(device).await.expect("open failed");
2117        fsck(fs.clone()).await.expect("fsck failed");
2118        fs.close().await.expect("close failed");
2119    }
2120}
2121
2122#[cfg(fuzz)]
2123mod fuzz {
2124    use fuzz::fuzz;
2125
2126    #[fuzz]
2127    fn fuzz_journal_bytes(input: Vec<u8>) {
2128        use crate::filesystem::FxFilesystem;
2129        use fuchsia_async as fasync;
2130        use std::io::Write;
2131        use storage_device::fake_device::FakeDevice;
2132        use storage_device::DeviceHolder;
2133
2134        fasync::SendExecutor::new(4).run(async move {
2135            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2136            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2137            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
2138            fs.close().await.expect("close failed");
2139            let device = fs.take_device().await;
2140            device.reopen(false);
2141            if let Ok(fs) = FxFilesystem::open(device).await {
2142                // `close()` can fail if there were objects to be tombstoned. If such an object is
2143                // corrupted, there will be an error when we compact the journal.
2144                let _ = fs.close().await;
2145            }
2146        });
2147    }
2148
2149    #[fuzz]
2150    fn fuzz_journal(input: Vec<super::JournalRecord>) {
2151        use crate::filesystem::FxFilesystem;
2152        use fuchsia_async as fasync;
2153        use storage_device::fake_device::FakeDevice;
2154        use storage_device::DeviceHolder;
2155
2156        fasync::SendExecutor::new(4).run(async move {
2157            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2158            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2159            {
2160                let mut inner = fs.journal().inner.lock();
2161                for record in &input {
2162                    let _ = inner.writer.write_record(record);
2163                }
2164            }
2165            fs.close().await.expect("close failed");
2166            let device = fs.take_device().await;
2167            device.reopen(false);
2168            if let Ok(fs) = FxFilesystem::open(device).await {
2169                // `close()` can fail if there were objects to be tombstoned. If such an object is
2170                // corrupted, there will be an error when we compact the journal.
2171                let _ = fs.close().await;
2172            }
2173        });
2174    }
2175}