fxfs/object_store/journal/
super_block.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! We currently store two of these super-blocks (A/B) starting at offset 0 and 512kB.
6//!
7//! Immediately following the serialized `SuperBlockHeader` structure below is a stream of
8//! serialized operations that are replayed into the root parent `ObjectStore`. Note that the root
9//! parent object store exists entirely in RAM until serialized back into the super-block.
10//!
11//! Super-blocks are updated alternately with a monotonically increasing generation number.
12//! At mount time, the super-block used is the valid `SuperBlock` with the highest generation
13//! number.
14//!
15//! Note the asymmetry here regarding load/save:
16//!   * We load a superblock from a Device/SuperBlockInstance and return a
17//!     (SuperBlockHeader, ObjectStore) pair. The ObjectStore is populated directly from device.
18//!   * We save a superblock from a (SuperBlockHeader, Vec<ObjectItem>) pair to a WriteObjectHandle.
19//!
20//! This asymmetry is required for consistency.
21//! The Vec<ObjectItem> is produced by scanning the root_parent_store. This is the responsibility
22//! of the journal code, which must hold a lock to avoid concurrent updates. However, this lock
23//! must NOT be held when saving the superblock as additional extents may need to be allocated as
24//! part of the save process.
25use crate::errors::FxfsError;
26use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject};
27use crate::log::*;
28use crate::lsm_tree::types::LayerIterator;
29use crate::lsm_tree::{LSMTree, LayerSet, Query};
30use crate::metrics;
31use crate::object_handle::ObjectHandle as _;
32use crate::object_store::allocator::Reservation;
33use crate::object_store::data_object_handle::{FileExtent, OverwriteOptions};
34use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
35use crate::object_store::journal::reader::{JournalReader, ReadResult};
36use crate::object_store::journal::writer::JournalWriter;
37use crate::object_store::journal::{BLOCK_SIZE, JournalCheckpoint, JournalCheckpointV32};
38use crate::object_store::object_record::{
39    ObjectItem, ObjectItemV40, ObjectItemV41, ObjectItemV43, ObjectItemV46, ObjectItemV47,
40    ObjectItemV49, ObjectItemV50,
41};
42use crate::object_store::transaction::{AssocObj, Options};
43use crate::object_store::tree::MajorCompactable;
44use crate::object_store::{
45    DataObjectHandle, HandleOptions, HandleOwner, Mutation, ObjectKey, ObjectStore, ObjectValue,
46};
47use crate::range::RangeExt;
48use crate::serialized_types::{
49    EARLIEST_SUPPORTED_VERSION, FIRST_EXTENT_IN_SUPERBLOCK_VERSION, Migrate,
50    SMALL_SUPERBLOCK_VERSION, Version, Versioned, VersionedLatest, migrate_to_version,
51};
52use anyhow::{Context, Error, bail, ensure};
53use fprint::TypeFingerprint;
54use fuchsia_inspect::{Property as _, UintProperty};
55use fuchsia_sync::Mutex;
56use futures::FutureExt;
57use rustc_hash::FxHashMap as HashMap;
58use serde::{Deserialize, Serialize};
59use std::collections::VecDeque;
60use std::fmt;
61use std::io::{Read, Write};
62use std::ops::Range;
63use std::sync::Arc;
64use std::time::SystemTime;
65use storage_device::Device;
66use uuid::Uuid;
67
68// These only exist in the root store.
69const SUPER_BLOCK_A_OBJECT_ID: u64 = 1;
70const SUPER_BLOCK_B_OBJECT_ID: u64 = 2;
71
72/// The superblock is extended in units of `SUPER_BLOCK_CHUNK_SIZE` as required.
73pub const SUPER_BLOCK_CHUNK_SIZE: u64 = 65536;
74
75/// Each superblock is one block but may contain records that extend its own length.
76pub(crate) const MIN_SUPER_BLOCK_SIZE: u64 = 4096;
77/// The first 2 * 512 KiB on the disk used to be reserved for two A/B super-blocks.
78const LEGACY_MIN_SUPER_BLOCK_SIZE: u64 = 524_288;
79
80/// All superblocks start with the magic bytes "FxfsSupr".
81const SUPER_BLOCK_MAGIC: &[u8; 8] = b"FxfsSupr";
82
83/// An enum representing one of our super-block instances.
84///
85/// This provides hard-coded constants related to the location and properties of the super-blocks
86/// that are required to bootstrap the filesystem.
87#[derive(Copy, Clone, Debug)]
88pub enum SuperBlockInstance {
89    A,
90    B,
91}
92
93impl SuperBlockInstance {
94    /// Returns the next [SuperBlockInstance] for use in round-robining writes across super-blocks.
95    pub fn next(&self) -> SuperBlockInstance {
96        match self {
97            SuperBlockInstance::A => SuperBlockInstance::B,
98            SuperBlockInstance::B => SuperBlockInstance::A,
99        }
100    }
101
102    pub fn object_id(&self) -> u64 {
103        match self {
104            SuperBlockInstance::A => SUPER_BLOCK_A_OBJECT_ID,
105            SuperBlockInstance::B => SUPER_BLOCK_B_OBJECT_ID,
106        }
107    }
108
109    /// Returns the byte range where the first extent of the [SuperBlockInstance] is stored.
110    /// (Note that a [SuperBlockInstance] may still have multiple extents.)
111    pub fn first_extent(&self) -> Range<u64> {
112        match self {
113            SuperBlockInstance::A => 0..MIN_SUPER_BLOCK_SIZE,
114            SuperBlockInstance::B => 524288..524288 + MIN_SUPER_BLOCK_SIZE,
115        }
116    }
117
118    /// We used to allocate 512kB to superblocks but this was almost always more than needed.
119    pub fn legacy_first_extent(&self) -> Range<u64> {
120        match self {
121            SuperBlockInstance::A => 0..LEGACY_MIN_SUPER_BLOCK_SIZE,
122            SuperBlockInstance::B => LEGACY_MIN_SUPER_BLOCK_SIZE..2 * LEGACY_MIN_SUPER_BLOCK_SIZE,
123        }
124    }
125}
126
127pub type SuperBlockHeader = SuperBlockHeaderV32;
128
129#[derive(
130    Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, TypeFingerprint, Versioned,
131)]
132pub struct SuperBlockHeaderV32 {
133    /// The globally unique identifier for the filesystem.
134    pub guid: UuidWrapperV32,
135
136    /// There are two super-blocks which are used in an A/B configuration. The super-block with the
137    /// greatest generation number is what is used when mounting an Fxfs image; the other is
138    /// discarded.
139    pub generation: u64,
140
141    /// The root parent store is an in-memory only store and serves as the backing store for the
142    /// root store and the journal.  The records for this store are serialized into the super-block
143    /// and mutations are also recorded in the journal.
144    pub root_parent_store_object_id: u64,
145
146    /// The root parent needs a graveyard and there's nowhere else to store it other than in the
147    /// super-block.
148    pub root_parent_graveyard_directory_object_id: u64,
149
150    /// The root object store contains all other metadata objects (including the allocator, the
151    /// journal and the super-blocks) and is the parent for all other object stores.
152    pub root_store_object_id: u64,
153
154    /// This is in the root object store.
155    pub allocator_object_id: u64,
156
157    /// This is in the root parent object store.
158    pub journal_object_id: u64,
159
160    /// Start checkpoint for the journal file.
161    pub journal_checkpoint: JournalCheckpointV32,
162
163    /// Offset of the journal file when the super-block was written.  If no entry is present in
164    /// journal_file_offsets for a particular object, then an object might have dependencies on the
165    /// journal from super_block_journal_file_offset onwards, but not earlier.
166    pub super_block_journal_file_offset: u64,
167
168    /// object id -> journal file offset. Indicates where each object has been flushed to.
169    pub journal_file_offsets: HashMap<u64, u64>,
170
171    /// Records the amount of borrowed metadata space as applicable at
172    /// `super_block_journal_file_offset`.
173    pub borrowed_metadata_space: u64,
174
175    /// The earliest version of Fxfs used to create any still-existing struct in the filesystem.
176    ///
177    /// Note: structs in the filesystem may had been made with various different versions of Fxfs.
178    pub earliest_version: Version,
179}
180
181type UuidWrapper = UuidWrapperV32;
182#[derive(Clone, Default, Eq, PartialEq)]
183pub struct UuidWrapperV32(pub Uuid);
184
185impl UuidWrapper {
186    fn new() -> Self {
187        Self(Uuid::new_v4())
188    }
189    #[cfg(test)]
190    fn nil() -> Self {
191        Self(Uuid::nil())
192    }
193}
194
195impl fmt::Debug for UuidWrapper {
196    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197        // The UUID uniquely identifies the filesystem, so we should redact it so that we don't leak
198        // it in logs.
199        f.write_str("<redacted>")
200    }
201}
202
203impl TypeFingerprint for UuidWrapper {
204    fn fingerprint() -> String {
205        "<[u8;16]>".to_owned()
206    }
207}
208
209// Uuid serializes like a slice, but SuperBlockHeader used to contain [u8; 16] and we want to remain
210// compatible.
211impl Serialize for UuidWrapper {
212    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
213        self.0.as_bytes().serialize(serializer)
214    }
215}
216
217impl<'de> Deserialize<'de> for UuidWrapper {
218    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
219        <[u8; 16]>::deserialize(deserializer).map(|bytes| UuidWrapperV32(Uuid::from_bytes(bytes)))
220    }
221}
222
223pub type SuperBlockRecord = SuperBlockRecordV50;
224
225#[allow(clippy::large_enum_variant)]
226#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
227pub enum SuperBlockRecordV50 {
228    // When reading the super-block we know the initial extent, but not subsequent extents, so these
229    // records need to exist to allow us to completely read the super-block.
230    Extent(Range<u64>),
231
232    // Following the super-block header are ObjectItem records that are to be replayed into the root
233    // parent object store.
234    ObjectItem(ObjectItemV50),
235
236    // Marks the end of the full super-block.
237    End,
238}
239
240#[allow(clippy::large_enum_variant)]
241#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
242#[migrate_to_version(SuperBlockRecordV50)]
243pub enum SuperBlockRecordV49 {
244    Extent(Range<u64>),
245    ObjectItem(ObjectItemV49),
246    End,
247}
248
249#[allow(clippy::large_enum_variant)]
250#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
251#[migrate_to_version(SuperBlockRecordV49)]
252pub enum SuperBlockRecordV47 {
253    Extent(Range<u64>),
254    ObjectItem(ObjectItemV47),
255    End,
256}
257
258#[allow(clippy::large_enum_variant)]
259#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
260#[migrate_to_version(SuperBlockRecordV47)]
261pub enum SuperBlockRecordV46 {
262    Extent(Range<u64>),
263    ObjectItem(ObjectItemV46),
264    End,
265}
266
267#[allow(clippy::large_enum_variant)]
268#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
269#[migrate_to_version(SuperBlockRecordV46)]
270pub enum SuperBlockRecordV43 {
271    Extent(Range<u64>),
272    ObjectItem(ObjectItemV43),
273    End,
274}
275
276#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
277#[migrate_to_version(SuperBlockRecordV43)]
278pub enum SuperBlockRecordV41 {
279    Extent(Range<u64>),
280    ObjectItem(ObjectItemV41),
281    End,
282}
283
284#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
285#[migrate_to_version(SuperBlockRecordV41)]
286pub enum SuperBlockRecordV40 {
287    Extent(Range<u64>),
288    ObjectItem(ObjectItemV40),
289    End,
290}
291
292struct SuperBlockMetrics {
293    /// Time we wrote the most recent superblock in milliseconds since [`std::time::UNIX_EPOCH`].
294    /// Uses [`std::time::SystemTime`] as the clock source.
295    last_super_block_update_time_ms: UintProperty,
296
297    /// Offset of the most recent superblock we wrote in the journal.
298    last_super_block_offset: UintProperty,
299}
300
301impl Default for SuperBlockMetrics {
302    fn default() -> Self {
303        SuperBlockMetrics {
304            last_super_block_update_time_ms: metrics::detail()
305                .create_uint("last_super_block_update_time_ms", 0),
306            last_super_block_offset: metrics::detail().create_uint("last_super_block_offset", 0),
307        }
308    }
309}
310
311/// Reads an individual (A/B) super-block instance and root_parent_store from device.
312/// Users should use SuperBlockManager::load() instead.
313async fn read(
314    device: Arc<dyn Device>,
315    block_size: u64,
316    instance: SuperBlockInstance,
317) -> Result<(SuperBlockHeader, SuperBlockInstance, ObjectStore), Error> {
318    let (super_block_header, mut reader) = SuperBlockHeader::read_header(device.clone(), instance)
319        .await
320        .context("failed to read superblock")?;
321    let root_parent = ObjectStore::new_root_parent(
322        device,
323        block_size,
324        super_block_header.root_parent_store_object_id,
325    );
326    root_parent.set_graveyard_directory_object_id(
327        super_block_header.root_parent_graveyard_directory_object_id,
328    );
329
330    loop {
331        // TODO: Flatten a layer and move reader here?
332        let (mutation, sequence) = match reader.next_item().await? {
333            // RecordReader should filter out extent records.
334            SuperBlockRecord::Extent(_) => bail!("Unexpected extent record"),
335            SuperBlockRecord::ObjectItem(item) => {
336                (Mutation::insert_object(item.key, item.value), item.sequence)
337            }
338            SuperBlockRecord::End => break,
339        };
340        root_parent.apply_mutation(
341            mutation,
342            &ApplyContext {
343                mode: ApplyMode::Replay,
344                checkpoint: JournalCheckpoint { file_offset: sequence, ..Default::default() },
345            },
346            AssocObj::None,
347        )?;
348    }
349    Ok((super_block_header, instance, root_parent))
350}
351
352/// Write a super-block to the given file handle.
353/// Requires that the filesystem is fully loaded and writable as this may require allocation.
354async fn write<S: HandleOwner>(
355    super_block_header: &SuperBlockHeader,
356    items: LayerSet<ObjectKey, ObjectValue>,
357    handle: DataObjectHandle<S>,
358) -> Result<(), Error> {
359    let object_manager = handle.store().filesystem().object_manager().clone();
360    // TODO(https://fxbug.dev/42177407): Don't use the same code here for Journal and SuperBlock. They
361    // aren't the same things and it is already getting convoluted. e.g of diff stream content:
362    //   Superblock:  (Magic, Ver, Header(Ver), Extent(Ver)*, SuperBlockRecord(Ver)*, ...)
363    //   Journal:     (Ver, JournalRecord(Ver)*, RESET, Ver2, JournalRecord(Ver2)*, ...)
364    // We should abstract away the checksum code and implement these separately.
365
366    let mut writer =
367        SuperBlockWriter::new(handle, super_block_header, object_manager.metadata_reservation())
368            .await?;
369    let mut merger = items.merger();
370    let mut iter = LSMTree::major_iter(merger.query(Query::FullScan).await?).await?;
371    while let Some(item) = iter.get() {
372        writer.write_root_parent_item(item.cloned()).await?;
373        iter.advance().await?;
374    }
375    writer.finalize().await
376}
377
378// Compacts and returns the *old* snapshot of the root_parent store.
379// Must be performed whilst holding a writer lock.
380pub fn compact_root_parent(
381    root_parent_store: &ObjectStore,
382) -> Result<LayerSet<ObjectKey, ObjectValue>, Error> {
383    // The root parent always uses in-memory layers which shouldn't be async, so we can use
384    // `now_or_never`.
385    let tree = root_parent_store.tree();
386    let layer_set = tree.layer_set();
387    {
388        let mut merger = layer_set.merger();
389        let mut iter = LSMTree::major_iter(merger.query(Query::FullScan).now_or_never().unwrap()?)
390            .now_or_never()
391            .unwrap()?;
392        let new_layer = LSMTree::new_mutable_layer();
393        while let Some(item_ref) = iter.get() {
394            new_layer.insert(item_ref.cloned())?;
395            iter.advance().now_or_never().unwrap()?;
396        }
397        tree.set_mutable_layer(new_layer);
398    }
399    Ok(layer_set)
400}
401
402/// This encapsulates the A/B alternating super-block logic.
403/// All super-block load/save operations should be via the methods on this type.
404pub(super) struct SuperBlockManager {
405    pub next_instance: Mutex<SuperBlockInstance>,
406    metrics: SuperBlockMetrics,
407}
408
409impl SuperBlockManager {
410    pub fn new() -> Self {
411        Self { next_instance: Mutex::new(SuperBlockInstance::A), metrics: Default::default() }
412    }
413
414    /// Loads both A/B super-blocks and root_parent ObjectStores and and returns the newest valid
415    /// pair. Also ensures the next superblock updated via |save| will be the other instance.
416    pub async fn load(
417        &self,
418        device: Arc<dyn Device>,
419        block_size: u64,
420    ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
421        // Superblocks consume a minimum of one block. We currently hard code the length of
422        // this first extent. It should work with larger block sizes, but has not been tested.
423        // TODO(https://fxbug.dev/42063349): Consider relaxing this.
424        debug_assert!(MIN_SUPER_BLOCK_SIZE == block_size);
425
426        let (super_block, current_super_block, root_parent) = match futures::join!(
427            read(device.clone(), block_size, SuperBlockInstance::A),
428            read(device.clone(), block_size, SuperBlockInstance::B)
429        ) {
430            (Err(e1), Err(e2)) => {
431                bail!("Failed to load both superblocks due to {:?}\nand\n{:?}", e1, e2)
432            }
433            (Ok(result), Err(_)) => result,
434            (Err(_), Ok(result)) => result,
435            (Ok(result1), Ok(result2)) => {
436                // Break the tie by taking the super-block with the greatest generation.
437                if (result2.0.generation as i64).wrapping_sub(result1.0.generation as i64) > 0 {
438                    result2
439                } else {
440                    result1
441                }
442            }
443        };
444        info!(super_block:?, current_super_block:?; "loaded super-block");
445        *self.next_instance.lock() = current_super_block.next();
446        Ok((super_block, root_parent))
447    }
448
449    /// Writes the provided superblock and root_parent ObjectStore to the device.
450    /// Requires that the filesystem is fully loaded and writable as this may require allocation.
451    pub async fn save(
452        &self,
453        super_block_header: SuperBlockHeader,
454        filesystem: Arc<FxFilesystem>,
455        root_parent: LayerSet<ObjectKey, ObjectValue>,
456    ) -> Result<(), Error> {
457        let root_store = filesystem.root_store();
458        let object_id = {
459            let mut next_instance = self.next_instance.lock();
460            let object_id = next_instance.object_id();
461            *next_instance = next_instance.next();
462            object_id
463        };
464        let handle = ObjectStore::open_object(
465            &root_store,
466            object_id,
467            HandleOptions { skip_journal_checks: true, ..Default::default() },
468            None,
469        )
470        .await
471        .context("Failed to open superblock object")?;
472        write(&super_block_header, root_parent, handle).await?;
473        self.metrics
474            .last_super_block_offset
475            .set(super_block_header.super_block_journal_file_offset);
476        self.metrics.last_super_block_update_time_ms.set(
477            SystemTime::now()
478                .duration_since(SystemTime::UNIX_EPOCH)
479                .unwrap()
480                .as_millis()
481                .try_into()
482                .unwrap_or(0u64),
483        );
484        Ok(())
485    }
486}
487
488impl SuperBlockHeader {
489    /// Creates a new instance with random GUID.
490    pub fn new(
491        generation: u64,
492        root_parent_store_object_id: u64,
493        root_parent_graveyard_directory_object_id: u64,
494        root_store_object_id: u64,
495        allocator_object_id: u64,
496        journal_object_id: u64,
497        journal_checkpoint: JournalCheckpoint,
498        earliest_version: Version,
499    ) -> Self {
500        SuperBlockHeader {
501            guid: UuidWrapper::new(),
502            generation,
503            root_parent_store_object_id,
504            root_parent_graveyard_directory_object_id,
505            root_store_object_id,
506            allocator_object_id,
507            journal_object_id,
508            journal_checkpoint,
509            earliest_version,
510            ..Default::default()
511        }
512    }
513
514    /// Read the super-block header, and return it and a reader that produces the records that are
515    /// to be replayed in to the root parent object store.
516    async fn read_header(
517        device: Arc<dyn Device>,
518        target_super_block: SuperBlockInstance,
519    ) -> Result<(SuperBlockHeader, RecordReader), Error> {
520        let handle = BootstrapObjectHandle::new(
521            target_super_block.object_id(),
522            device,
523            target_super_block.first_extent(),
524        );
525        let mut reader = JournalReader::new(handle, &JournalCheckpoint::default());
526        reader.set_eof_ok();
527
528        reader.fill_buf().await?;
529
530        let mut super_block_header;
531        let super_block_version;
532        reader.consume({
533            let mut cursor = std::io::Cursor::new(reader.buffer());
534            // Validate magic bytes.
535            let mut magic_bytes: [u8; 8] = [0; 8];
536            cursor.read_exact(&mut magic_bytes)?;
537            if magic_bytes.as_slice() != SUPER_BLOCK_MAGIC.as_slice() {
538                bail!("Invalid magic: {:?}", magic_bytes);
539            }
540            (super_block_header, super_block_version) =
541                SuperBlockHeader::deserialize_with_version(&mut cursor)?;
542
543            if super_block_version < EARLIEST_SUPPORTED_VERSION {
544                bail!("Unsupported SuperBlock version: {:?}", super_block_version);
545            }
546
547            // NOTE: It is possible that data was written to the journal with an old version
548            // but no compaction ever happened, so the journal version could potentially be older
549            // than the layer file versions.
550            if super_block_header.journal_checkpoint.version < EARLIEST_SUPPORTED_VERSION {
551                bail!(
552                    "Unsupported JournalCheckpoint version: {:?}",
553                    super_block_header.journal_checkpoint.version
554                );
555            }
556
557            if super_block_header.earliest_version < EARLIEST_SUPPORTED_VERSION {
558                bail!(
559                    "Filesystem contains struct with unsupported version: {:?}",
560                    super_block_header.earliest_version
561                );
562            }
563
564            cursor.position() as usize
565        });
566
567        // From version 45 superblocks describe their own extents (a noop here).
568        // At version 44, superblocks assume a 4kb first extent.
569        // Prior to version 44, superblocks assume a 512kb first extent.
570        if super_block_version < SMALL_SUPERBLOCK_VERSION {
571            reader.handle().push_extent(0, target_super_block.legacy_first_extent());
572        } else if super_block_version < FIRST_EXTENT_IN_SUPERBLOCK_VERSION {
573            reader.handle().push_extent(0, target_super_block.first_extent())
574        }
575
576        // If guid is zeroed (e.g. in a newly imaged system), assign one randomly.
577        if super_block_header.guid.0.is_nil() {
578            super_block_header.guid = UuidWrapper::new();
579        }
580        reader.set_version(super_block_version);
581        Ok((super_block_header, RecordReader { reader }))
582    }
583}
584
585struct SuperBlockWriter<'a, S: HandleOwner> {
586    handle: DataObjectHandle<S>,
587    writer: JournalWriter,
588    existing_extents: VecDeque<FileExtent>,
589    size: u64,
590    reservation: &'a Reservation,
591}
592
593impl<'a, S: HandleOwner> SuperBlockWriter<'a, S> {
594    /// Create a new writer, outputs FXFS magic, version and SuperBlockHeader.
595    /// On success, the writer is ready to accept root parent store mutations.
596    pub async fn new(
597        handle: DataObjectHandle<S>,
598        super_block_header: &SuperBlockHeader,
599        reservation: &'a Reservation,
600    ) -> Result<Self, Error> {
601        let existing_extents = handle.device_extents().await?;
602        let mut this = Self {
603            handle,
604            writer: JournalWriter::new(BLOCK_SIZE as usize, 0),
605            existing_extents: existing_extents.into_iter().collect(),
606            size: 0,
607            reservation,
608        };
609        this.writer.write_all(SUPER_BLOCK_MAGIC)?;
610        super_block_header.serialize_with_version(&mut this.writer)?;
611        Ok(this)
612    }
613
614    /// Internal helper function to pull ranges from a list of existing extents and tack
615    /// corresponding extent records onto the journal.
616    fn try_extend_existing(&mut self, target_size: u64) -> Result<(), Error> {
617        while self.size < target_size {
618            if let Some(extent) = self.existing_extents.pop_front() {
619                ensure!(
620                    extent.logical_range().start == self.size,
621                    "superblock file contains a hole."
622                );
623                self.size += extent.length();
624                SuperBlockRecord::Extent(extent.device_range().clone())
625                    .serialize_into(&mut self.writer)?;
626            } else {
627                break;
628            }
629        }
630        Ok(())
631    }
632
633    pub async fn write_root_parent_item(&mut self, record: ObjectItem) -> Result<(), Error> {
634        let min_len = self.writer.journal_file_checkpoint().file_offset + SUPER_BLOCK_CHUNK_SIZE;
635        self.try_extend_existing(min_len)?;
636        if min_len > self.size {
637            // Need to allocate some more space.
638            let mut transaction = self
639                .handle
640                .new_transaction_with_options(Options {
641                    skip_journal_checks: true,
642                    borrow_metadata_space: true,
643                    allocator_reservation: Some(self.reservation),
644                    ..Default::default()
645                })
646                .await?;
647            let mut file_range = self.size..self.size + SUPER_BLOCK_CHUNK_SIZE;
648            let allocated = self
649                .handle
650                .preallocate_range(&mut transaction, &mut file_range)
651                .await
652                .context("preallocate superblock")?;
653            if file_range.start < file_range.end {
654                bail!("preallocate_range returned too little space");
655            }
656            transaction.commit().await?;
657            for device_range in allocated {
658                self.size += device_range.end - device_range.start;
659                SuperBlockRecord::Extent(device_range).serialize_into(&mut self.writer)?;
660            }
661        }
662        SuperBlockRecord::ObjectItem(record).serialize_into(&mut self.writer)?;
663        Ok(())
664    }
665
666    pub async fn finalize(mut self) -> Result<(), Error> {
667        SuperBlockRecord::End.serialize_into(&mut self.writer)?;
668        self.writer.pad_to_block()?;
669        let mut buf = self.handle.allocate_buffer(self.writer.flushable_bytes()).await;
670        let offset = self.writer.take_flushable(buf.as_mut());
671        self.handle.overwrite(offset, buf.as_mut(), OverwriteOptions::default()).await?;
672        let len =
673            std::cmp::max(MIN_SUPER_BLOCK_SIZE, self.writer.journal_file_checkpoint().file_offset)
674                + SUPER_BLOCK_CHUNK_SIZE;
675        self.handle
676            .truncate_with_options(
677                Options {
678                    skip_journal_checks: true,
679                    borrow_metadata_space: true,
680                    ..Default::default()
681                },
682                len,
683            )
684            .await?;
685        Ok(())
686    }
687}
688
689pub struct RecordReader {
690    reader: JournalReader,
691}
692
693impl RecordReader {
694    pub async fn next_item(&mut self) -> Result<SuperBlockRecord, Error> {
695        loop {
696            match self.reader.deserialize().await? {
697                ReadResult::Reset(_) => bail!("Unexpected reset"),
698                ReadResult::ChecksumMismatch => bail!("Checksum mismatch"),
699                ReadResult::Some(SuperBlockRecord::Extent(extent)) => {
700                    ensure!(extent.is_valid(), FxfsError::Inconsistent);
701                    self.reader.handle().push_extent(0, extent)
702                }
703                ReadResult::Some(x) => return Ok(x),
704            }
705        }
706    }
707}
708
709#[cfg(test)]
710mod tests {
711    use super::{
712        MIN_SUPER_BLOCK_SIZE, SUPER_BLOCK_CHUNK_SIZE, SUPER_BLOCK_MAGIC, SuperBlockHeader,
713        SuperBlockInstance, SuperBlockManager, SuperBlockRecord, UuidWrapper, compact_root_parent,
714        write,
715    };
716    use crate::filesystem::{FxFilesystem, OpenFxFilesystem, SyncOptions};
717    use crate::object_handle::ReadObjectHandle;
718    use crate::object_store::journal::JournalCheckpoint;
719    use crate::object_store::journal::writer::JournalWriter;
720    use crate::object_store::transaction::{Options, lock_keys};
721    use crate::object_store::{
722        DataObjectHandle, HandleOptions, ObjectHandle, ObjectKey, ObjectStore,
723    };
724    use crate::serialized_types::{LATEST_VERSION, Versioned, VersionedLatest};
725    use std::io::Write;
726    use storage_device::DeviceHolder;
727    use storage_device::fake_device::FakeDevice;
728
729    // We require 512kiB each for A/B super-blocks, 256kiB for the journal (128kiB before flush)
730    // and compactions require double the layer size to complete.
731    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
732    const TEST_DEVICE_BLOCK_COUNT: u64 = 16384;
733
734    async fn filesystem_and_super_block_handles()
735    -> (OpenFxFilesystem, DataObjectHandle<ObjectStore>, DataObjectHandle<ObjectStore>) {
736        let device =
737            DeviceHolder::new(FakeDevice::new(TEST_DEVICE_BLOCK_COUNT, TEST_DEVICE_BLOCK_SIZE));
738        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
739        fs.close().await.expect("Close failed");
740        let device = fs.take_device().await;
741        device.reopen(false);
742        let fs = FxFilesystem::open(device).await.expect("open failed");
743
744        let handle_a = ObjectStore::open_object(
745            &fs.object_manager().root_store(),
746            SuperBlockInstance::A.object_id(),
747            HandleOptions::default(),
748            None,
749        )
750        .await
751        .expect("open superblock failed");
752
753        let handle_b = ObjectStore::open_object(
754            &fs.object_manager().root_store(),
755            SuperBlockInstance::B.object_id(),
756            HandleOptions::default(),
757            None,
758        )
759        .await
760        .expect("open superblock failed");
761        (fs, handle_a, handle_b)
762    }
763
764    #[fuchsia::test]
765    async fn test_read_written_super_block() {
766        let (fs, _handle_a, _handle_b) = filesystem_and_super_block_handles().await;
767        const JOURNAL_OBJECT_ID: u64 = 5;
768
769        // Confirm that the (first) super-block is expected size.
770        // It should be MIN_SUPER_BLOCK_SIZE + SUPER_BLOCK_CHUNK_SIZE.
771        assert_eq!(
772            ObjectStore::open_object(
773                &fs.root_store(),
774                SuperBlockInstance::A.object_id(),
775                HandleOptions::default(),
776                None,
777            )
778            .await
779            .expect("open_object failed")
780            .get_size(),
781            MIN_SUPER_BLOCK_SIZE + SUPER_BLOCK_CHUNK_SIZE
782        );
783
784        // Create a large number of objects in the root parent store so that we test growing
785        // of the super-block file, requiring us to add extents.
786        let mut created_object_ids = vec![];
787        const NUM_ENTRIES: u64 = 16384;
788        for _ in 0..NUM_ENTRIES {
789            let mut transaction = fs
790                .clone()
791                .new_transaction(lock_keys![], Options::default())
792                .await
793                .expect("new_transaction failed");
794            created_object_ids.push(
795                ObjectStore::create_object(
796                    &fs.object_manager().root_parent_store(),
797                    &mut transaction,
798                    HandleOptions::default(),
799                    None,
800                )
801                .await
802                .expect("create_object failed")
803                .object_id(),
804            );
805            transaction.commit().await.expect("commit failed");
806        }
807
808        // Note here that DataObjectHandle caches the size given to it at construction.
809        // If we want to know the true size after a super-block has been written, we need
810        // a new handle.
811        assert!(
812            ObjectStore::open_object(
813                &fs.root_store(),
814                SuperBlockInstance::A.object_id(),
815                HandleOptions::default(),
816                None,
817            )
818            .await
819            .expect("open_object failed")
820            .get_size()
821                > MIN_SUPER_BLOCK_SIZE + SUPER_BLOCK_CHUNK_SIZE
822        );
823
824        let written_super_block_a =
825            SuperBlockHeader::read_header(fs.device(), SuperBlockInstance::A)
826                .await
827                .expect("read failed");
828        let written_super_block_b =
829            SuperBlockHeader::read_header(fs.device(), SuperBlockInstance::B)
830                .await
831                .expect("read failed");
832
833        // Check that a non-zero GUID has been assigned.
834        assert!(!written_super_block_a.0.guid.0.is_nil());
835
836        // Depending on specific offsets is fragile so we just validate the fields we believe
837        // to be stable.
838        assert_eq!(written_super_block_a.0.guid, written_super_block_b.0.guid);
839        assert_eq!(written_super_block_a.0.guid, written_super_block_b.0.guid);
840        assert!(written_super_block_a.0.generation != written_super_block_b.0.generation);
841        assert_eq!(
842            written_super_block_a.0.root_parent_store_object_id,
843            written_super_block_b.0.root_parent_store_object_id
844        );
845        assert_eq!(
846            written_super_block_a.0.root_parent_graveyard_directory_object_id,
847            written_super_block_b.0.root_parent_graveyard_directory_object_id
848        );
849        assert_eq!(written_super_block_a.0.root_store_object_id, fs.root_store().store_object_id());
850        assert_eq!(
851            written_super_block_a.0.root_store_object_id,
852            written_super_block_b.0.root_store_object_id
853        );
854        assert_eq!(written_super_block_a.0.allocator_object_id, fs.allocator().object_id());
855        assert_eq!(
856            written_super_block_a.0.allocator_object_id,
857            written_super_block_b.0.allocator_object_id
858        );
859        assert_eq!(written_super_block_a.0.journal_object_id, JOURNAL_OBJECT_ID);
860        assert_eq!(
861            written_super_block_a.0.journal_object_id,
862            written_super_block_b.0.journal_object_id
863        );
864        assert!(
865            written_super_block_a.0.journal_checkpoint.file_offset
866                != written_super_block_b.0.journal_checkpoint.file_offset
867        );
868        assert!(
869            written_super_block_a.0.super_block_journal_file_offset
870                != written_super_block_b.0.super_block_journal_file_offset
871        );
872        // Nb: We skip journal_file_offsets and borrowed metadata space checks.
873        assert_eq!(written_super_block_a.0.earliest_version, LATEST_VERSION);
874        assert_eq!(
875            written_super_block_a.0.earliest_version,
876            written_super_block_b.0.earliest_version
877        );
878
879        // Nb: Skip comparison of root_parent store contents because we have no way of anticipating
880        // the extent offsets and it is reasonable that a/b differ.
881
882        // Delete all the objects we just made.
883        for object_id in created_object_ids {
884            let mut transaction = fs
885                .clone()
886                .new_transaction(lock_keys![], Options::default())
887                .await
888                .expect("new_transaction failed");
889            fs.object_manager()
890                .root_parent_store()
891                .adjust_refs(&mut transaction, object_id, -1)
892                .await
893                .expect("adjust_refs failed");
894            transaction.commit().await.expect("commit failed");
895            fs.object_manager()
896                .root_parent_store()
897                .tombstone_object(object_id, Options::default())
898                .await
899                .expect("tombstone failed");
900        }
901        // Write some stuff to the root store to ensure we rotate the journal and produce new
902        // super blocks.
903        for _ in 0..NUM_ENTRIES {
904            let mut transaction = fs
905                .clone()
906                .new_transaction(lock_keys![], Options::default())
907                .await
908                .expect("new_transaction failed");
909            ObjectStore::create_object(
910                &fs.object_manager().root_store(),
911                &mut transaction,
912                HandleOptions::default(),
913                None,
914            )
915            .await
916            .expect("create_object failed");
917            transaction.commit().await.expect("commit failed");
918        }
919
920        assert_eq!(
921            ObjectStore::open_object(
922                &fs.root_store(),
923                SuperBlockInstance::A.object_id(),
924                HandleOptions::default(),
925                None,
926            )
927            .await
928            .expect("open_object failed")
929            .get_size(),
930            MIN_SUPER_BLOCK_SIZE + SUPER_BLOCK_CHUNK_SIZE
931        );
932    }
933
934    #[fuchsia::test]
935    async fn test_generation_comparison_wrapping() {
936        let device = DeviceHolder::new(FakeDevice::new(
937            TEST_DEVICE_BLOCK_COUNT,
938            MIN_SUPER_BLOCK_SIZE as u32,
939        ));
940        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
941        fs.close().await.expect("close");
942        let device = fs.take_device().await;
943        device.reopen(false);
944
945        // Helper to write a superblock with a specific generation to a specific instance.
946        // We need to clone the inner Arc to pass to the closure.
947        let device_arc = (*device).clone();
948        let write_sb = |instance: SuperBlockInstance, generation: u64| {
949            let device = device_arc.clone();
950            async move {
951                let mut super_block_header = SuperBlockHeader::new(
952                    1, // generation
953                    3, // root_parent_store_object_id
954                    4, // root_parent_graveyard_directory_object_id
955                    5, // root_store_object_id
956                    6, // allocator_object_id
957                    7, // journal_object_id
958                    JournalCheckpoint::default(),
959                    LATEST_VERSION,
960                );
961                super_block_header.generation = generation;
962                super_block_header.journal_checkpoint.version = LATEST_VERSION;
963
964                let mut writer = JournalWriter::new(MIN_SUPER_BLOCK_SIZE as usize, 0);
965                writer.write_all(SUPER_BLOCK_MAGIC).unwrap();
966                super_block_header.serialize_with_version(&mut writer).unwrap();
967                SuperBlockRecord::End.serialize_into(&mut writer).unwrap();
968                writer.pad_to_block().unwrap();
969
970                let mut buf = device.allocate_buffer(writer.flushable_bytes()).await;
971                writer.take_flushable(buf.as_mut());
972                device
973                    .write(instance.first_extent().start, buf.as_ref())
974                    .await
975                    .expect("write failed");
976            }
977        };
978
979        // Case 1: A has MAX, B has 0. B should be selected.
980        write_sb(SuperBlockInstance::A, u64::MAX).await;
981        write_sb(SuperBlockInstance::B, 0).await;
982        let manager = SuperBlockManager::new();
983        let (header, _) = manager
984            .load((*device).clone(), MIN_SUPER_BLOCK_SIZE as u64)
985            .await
986            .expect("load failed");
987        assert_eq!(header.generation, 0);
988
989        // Case 2: A has 0, B has MAX. A should be selected.
990        write_sb(SuperBlockInstance::A, 0).await;
991        write_sb(SuperBlockInstance::B, u64::MAX).await;
992        let manager = SuperBlockManager::new();
993        let (header, _) = manager
994            .load((*device).clone(), MIN_SUPER_BLOCK_SIZE as u64)
995            .await
996            .expect("load failed");
997        assert_eq!(header.generation, 0);
998
999        // Case 3: A has 100, B has 200. B should be selected.
1000        write_sb(SuperBlockInstance::A, 100).await;
1001        write_sb(SuperBlockInstance::B, 200).await;
1002        let manager = SuperBlockManager::new();
1003        let (header, _) = manager
1004            .load((*device).clone(), MIN_SUPER_BLOCK_SIZE as u64)
1005            .await
1006            .expect("load failed");
1007        assert_eq!(header.generation, 200);
1008    }
1009
1010    #[fuchsia::test]
1011    async fn test_generation_wrapping_on_flush() {
1012        let block_size = 4096;
1013        let mut device =
1014            DeviceHolder::new(FakeDevice::new(TEST_DEVICE_BLOCK_COUNT, block_size as u32));
1015        {
1016            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1017            let root_store = fs.root_store();
1018            let mut transaction = fs
1019                .clone()
1020                .new_transaction(lock_keys![], Options::default())
1021                .await
1022                .expect("new_transaction failed");
1023            ObjectStore::create_object(
1024                &root_store,
1025                &mut transaction,
1026                HandleOptions::default(),
1027                None,
1028            )
1029            .await
1030            .expect("create_object failed");
1031            transaction.commit().await.expect("commit failed");
1032            fs.sync(SyncOptions::default()).await.expect("sync failed");
1033            fs.close().await.expect("close failed");
1034            device = fs.take_device().await;
1035        }
1036        device.reopen(false);
1037
1038        let manager = SuperBlockManager::new();
1039        let (mut header, _) =
1040            manager.load((*device).clone(), block_size as u64).await.expect("load failed");
1041
1042        {
1043            let fs = FxFilesystem::open(device).await.expect("open failed");
1044            // To test wrapping, we need to get into a state where the current generation is
1045            // u64::MAX. Since we have A and B, and wrapping comparison is used, we need to set them
1046            // up carefully. We will set A to u64::MAX - 1, and B to u64::MAX.
1047            // Then the next write will be to A, and should be 0.
1048            header.generation = u64::MAX - 1;
1049            manager
1050                .save(header.clone(), (*fs).clone(), fs.root_parent_store().tree().layer_set())
1051                .await
1052                .expect("save 1 failed");
1053            header.generation = u64::MAX;
1054            manager
1055                .save(header, (*fs).clone(), fs.root_parent_store().tree().layer_set())
1056                .await
1057                .expect("save 2 failed");
1058            fs.close().await.expect("close failed");
1059            device = fs.take_device().await;
1060            device.reopen(false);
1061
1062            let fs = FxFilesystem::open(device).await.expect("open failed");
1063
1064            let root_store = fs.root_store();
1065            for _ in 0..6000 {
1066                let mut transaction = fs
1067                    .clone()
1068                    .new_transaction(lock_keys![], Options::default())
1069                    .await
1070                    .expect("new_transaction failed");
1071                ObjectStore::create_object(
1072                    &root_store,
1073                    &mut transaction,
1074                    HandleOptions::default(),
1075                    None,
1076                )
1077                .await
1078                .expect("create_object failed");
1079                transaction.commit().await.expect("commit failed");
1080            }
1081            fs.sync(SyncOptions::default()).await.expect("sync failed");
1082            fs.close().await.expect("close failed");
1083            device = fs.take_device().await;
1084        }
1085        device.reopen(false);
1086
1087        let (header, _) =
1088            manager.load((*device).clone(), block_size as u64).await.expect("load failed");
1089        assert!(header.generation < 10);
1090    }
1091
1092    #[fuchsia::test]
1093    async fn test_guid_assign_on_read() {
1094        let (fs, handle_a, _handle_b) = filesystem_and_super_block_handles().await;
1095        const JOURNAL_OBJECT_ID: u64 = 5;
1096        let mut super_block_header_a = SuperBlockHeader::new(
1097            1,
1098            fs.object_manager().root_parent_store().store_object_id(),
1099            /* root_parent_graveyard_directory_object_id: */ 1000,
1100            fs.root_store().store_object_id(),
1101            fs.allocator().object_id(),
1102            JOURNAL_OBJECT_ID,
1103            JournalCheckpoint { file_offset: 1234, checksum: 5678, version: LATEST_VERSION },
1104            /* earliest_version: */ LATEST_VERSION,
1105        );
1106        // Ensure the superblock has no set GUID.
1107        super_block_header_a.guid = UuidWrapper::nil();
1108        write(
1109            &super_block_header_a,
1110            compact_root_parent(fs.object_manager().root_parent_store().as_ref())
1111                .expect("scan failed"),
1112            handle_a,
1113        )
1114        .await
1115        .expect("write failed");
1116        let super_block_header = SuperBlockHeader::read_header(fs.device(), SuperBlockInstance::A)
1117            .await
1118            .expect("read failed");
1119        // Ensure a GUID has been assigned.
1120        assert!(!super_block_header.0.guid.0.is_nil());
1121    }
1122
1123    #[fuchsia::test]
1124    async fn test_init_wipes_superblocks() {
1125        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1126
1127        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1128        let root_store = fs.root_store();
1129        // Generate enough work to induce a journal flush and thus a new superblock being written.
1130        for _ in 0..6000 {
1131            let mut transaction = fs
1132                .clone()
1133                .new_transaction(lock_keys![], Options::default())
1134                .await
1135                .expect("new_transaction failed");
1136            ObjectStore::create_object(
1137                &root_store,
1138                &mut transaction,
1139                HandleOptions::default(),
1140                None,
1141            )
1142            .await
1143            .expect("create_object failed");
1144            transaction.commit().await.expect("commit failed");
1145        }
1146        fs.close().await.expect("Close failed");
1147        let device = fs.take_device().await;
1148        device.reopen(false);
1149
1150        SuperBlockHeader::read_header(device.clone(), SuperBlockInstance::A)
1151            .await
1152            .expect("read failed");
1153        let header = SuperBlockHeader::read_header(device.clone(), SuperBlockInstance::B)
1154            .await
1155            .expect("read failed");
1156
1157        let old_guid = header.0.guid;
1158
1159        // Re-initialize the filesystem.  The A and B blocks should be for the new FS.
1160        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1161        fs.close().await.expect("Close failed");
1162        let device = fs.take_device().await;
1163        device.reopen(false);
1164
1165        let a = SuperBlockHeader::read_header(device.clone(), SuperBlockInstance::A)
1166            .await
1167            .expect("read failed");
1168        let b = SuperBlockHeader::read_header(device.clone(), SuperBlockInstance::B)
1169            .await
1170            .expect("read failed");
1171
1172        assert_eq!(a.0.guid, b.0.guid);
1173        assert_ne!(old_guid, a.0.guid);
1174    }
1175
1176    #[fuchsia::test]
1177    async fn test_alternating_super_blocks() {
1178        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1179
1180        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1181        fs.close().await.expect("Close failed");
1182        let device = fs.take_device().await;
1183        device.reopen(false);
1184
1185        let (super_block_header_a, _) =
1186            SuperBlockHeader::read_header(device.clone(), SuperBlockInstance::A)
1187                .await
1188                .expect("read failed");
1189
1190        // The second super-block won't be valid at this time so there's no point reading it.
1191
1192        let fs = FxFilesystem::open(device).await.expect("open failed");
1193        let root_store = fs.root_store();
1194        // Generate enough work to induce a journal flush.
1195        for _ in 0..6000 {
1196            let mut transaction = fs
1197                .clone()
1198                .new_transaction(lock_keys![], Options::default())
1199                .await
1200                .expect("new_transaction failed");
1201            ObjectStore::create_object(
1202                &root_store,
1203                &mut transaction,
1204                HandleOptions::default(),
1205                None,
1206            )
1207            .await
1208            .expect("create_object failed");
1209            transaction.commit().await.expect("commit failed");
1210        }
1211        fs.close().await.expect("Close failed");
1212        let device = fs.take_device().await;
1213        device.reopen(false);
1214
1215        let (super_block_header_a_after, _) =
1216            SuperBlockHeader::read_header(device.clone(), SuperBlockInstance::A)
1217                .await
1218                .expect("read failed");
1219        let (super_block_header_b_after, _) =
1220            SuperBlockHeader::read_header(device.clone(), SuperBlockInstance::B)
1221                .await
1222                .expect("read failed");
1223
1224        // It's possible that multiple super-blocks were written, so cater for that.
1225
1226        // The sequence numbers should be one apart.
1227        assert_eq!(
1228            (super_block_header_b_after.generation as i64
1229                - super_block_header_a_after.generation as i64)
1230                .abs(),
1231            1
1232        );
1233
1234        // At least one super-block should have been written.
1235        assert!(
1236            std::cmp::max(
1237                super_block_header_a_after.generation,
1238                super_block_header_b_after.generation
1239            ) > super_block_header_a.generation
1240        );
1241
1242        // They should have the same oddness.
1243        assert_eq!(super_block_header_a_after.generation & 1, super_block_header_a.generation & 1);
1244    }
1245
1246    #[fuchsia::test]
1247    async fn test_root_parent_is_compacted() {
1248        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1249
1250        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1251
1252        let mut transaction = fs
1253            .clone()
1254            .new_transaction(lock_keys![], Options::default())
1255            .await
1256            .expect("new_transaction failed");
1257        let store = fs.root_parent_store();
1258        let handle =
1259            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
1260                .await
1261                .expect("create_object failed");
1262        transaction.commit().await.expect("commit failed");
1263
1264        store
1265            .tombstone_object(handle.object_id(), Options::default())
1266            .await
1267            .expect("tombstone failed");
1268
1269        // Generate enough work to induce a journal flush.
1270        let root_store = fs.root_store();
1271        for _ in 0..6000 {
1272            let mut transaction = fs
1273                .clone()
1274                .new_transaction(lock_keys![], Options::default())
1275                .await
1276                .expect("new_transaction failed");
1277            ObjectStore::create_object(
1278                &root_store,
1279                &mut transaction,
1280                HandleOptions::default(),
1281                None,
1282            )
1283            .await
1284            .expect("create_object failed");
1285            transaction.commit().await.expect("commit failed");
1286        }
1287
1288        // The root parent store should have been compacted, so we shouldn't be able to find any
1289        // record referring to the object we tombstoned.
1290        assert_eq!(
1291            store.tree().find(&ObjectKey::object(handle.object_id())).await.expect("find failed"),
1292            None
1293        );
1294    }
1295}