fxfs/object_store.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

pub mod allocator;
pub mod caching_object_handle;
pub mod data_object_handle;
pub mod directory;
mod extent_mapping_iterator;
mod extent_record;
mod flush;
pub mod graveyard;
mod install;
pub mod journal;
mod key_manager;
pub(crate) mod merge;
pub mod object_manager;
pub mod object_record;
pub mod project_id;
mod store_object_handle;
pub mod transaction;
mod tree;
mod tree_cache;
pub mod volume;

pub use data_object_handle::{
    DataObjectHandle, DirectWriter, FileExtent, FsverityState, FsverityStateInner, RangeType,
};
pub use directory::Directory;
pub use object_record::{ChildValue, ObjectDescriptor, PosixAttributes, Timestamp};
pub use store_object_handle::{
    EXTENDED_ATTRIBUTE_RANGE_END, EXTENDED_ATTRIBUTE_RANGE_START, SetExtendedAttributeMode,
    StoreObjectHandle,
};

use crate::errors::FxfsError;
use crate::filesystem::{
    ApplyContext, ApplyMode, FxFilesystem, JournalingObject, MAX_FILE_SIZE, SyncOptions,
    TruncateGuard, TxnGuard,
};
use crate::log::*;
use crate::lsm_tree::cache::{NullCache, ObjectCache};
use crate::lsm_tree::types::{Existence, Item, ItemRef, LayerIterator};
use crate::lsm_tree::{LSMTree, Query};
use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ObjectProperties, ReadObjectHandle};
use crate::object_store::allocator::Allocator;
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::{JournalCheckpoint, JournalCheckpointV32, JournaledTransaction};
use crate::object_store::key_manager::KeyManager;
use crate::object_store::transaction::{
    AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation, Options, Transaction,
    lock_keys,
};
use crate::range::RangeExt;
use crate::round::round_up;
use crate::serialized_types::{Version, Versioned, VersionedLatest};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use fidl_fuchsia_io as fio;
use fprint::TypeFingerprint;
use fuchsia_sync::Mutex;
use fxfs_crypto::ff1::Ff1;
use fxfs_crypto::{
    Cipher, CipherHolder, Crypt, FxfsCipher, KeyPurpose, ObjectType, StreamCipher, UnwrappedKey,
    WrappingKeyId,
};
use fxfs_macros::{Migrate, migrate_to_version};
use rand::RngCore;
use scopeguard::ScopeGuard;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fmt;
use std::num::NonZero;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, OnceLock, Weak};
use storage_device::Device;
use uuid::Uuid;

pub use extent_record::{
    BLOB_MERKLE_ATTRIBUTE_ID, DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
    FSVERITY_MERKLE_ATTRIBUTE_ID,
};
pub use object_record::{
    AttributeKey, EncryptionKey, EncryptionKeys, ExtendedAttributeValue, FsverityMetadata, FxfsKey,
    FxfsKeyV40, FxfsKeyV49, ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue,
    ProjectProperty, RootDigest,
};
pub use transaction::Mutation;

// For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel
// attacks more difficult. This mask can be used to extract the hi part of the object ID.
const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000;
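//
// For example, `object_id & OBJECT_ID_HI_MASK` yields the key-generation (high) half of an ID,
// while the low half is the part that `LastObjectId::Encrypted` obfuscates with an Ff1 cipher.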

// At time of writing, this threshold limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;

// Encrypted files and directories use the fscrypt key (identified by `FSCRYPT_KEY_ID`) to encrypt
// file contents and filenames respectively. All non-fscrypt encrypted files otherwise default to
// using the `VOLUME_DATA_KEY_ID` key. Note, the filesystem always uses the `VOLUME_DATA_KEY_ID`
// key to encrypt large extended attributes. Thus, encrypted files and directories with large
// xattrs will have both an fscrypt and volume data key.
pub const VOLUME_DATA_KEY_ID: u64 = 0;
pub const FSCRYPT_KEY_ID: u64 = 1;

/// A constant that can be used where an owner is expected of type `Weak<dyn StoreOwner>` but no
/// owner is required.
pub const NO_OWNER: Weak<()> = Weak::new();
impl StoreOwner for () {}

#[async_trait]
pub trait StoreOwner: Send + Sync {
    /// Forcibly lock the store.  This exists to give the StoreOwner an opportunity to clean up
    /// tasks which might access the store before locking it, because ObjectStore::unlock can only
    /// be called when the store is not in use.
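    ///
    /// A hedged sketch of an implementation (`MyVolume` and its helpers are hypothetical):
    ///
    /// ```ignore
    /// #[async_trait]
    /// impl StoreOwner for MyVolume {
    ///     async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), Error> {
    ///         self.shut_down_tasks().await; // Stop anything still using the store.
    ///         store.lock().await            // Hypothetical; the real locking entry point may differ.
    ///     }
    /// }
    /// ```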
    async fn force_lock(self: Arc<Self>, _store: &ObjectStore) -> Result<(), Error> {
        Err(anyhow!(FxfsError::Internal))
    }
}

/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
/// back to an ObjectStore.
pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}

/// StoreInfo stores information about the object store.  This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
pub type StoreInfo = StoreInfoV52;

#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV52 {
    /// The globally unique identifier for the associated object store. If unset, will be all zero.
    guid: [u8; 16],

    /// The last used object ID.  Note that this field is not accurate in memory; ObjectStore's
    /// last_object_id field is the one to use in that case.  Technically, this might not be the
    /// last object ID used for the latest transaction that created an object because we use this at
    /// the point of creating the object but before we commit the transaction.  Transactions can
    /// then get committed in an arbitrary order (or not at all).
    last_object_id: LastObjectIdInfo,

    /// Object ids for layers.  TODO(https://fxbug.dev/42178036): need a layer of indirection here
    /// so we can support snapshots.
    pub layers: Vec<u64>,

    /// The object ID for the root directory.
    root_directory_object_id: u64,

    /// The object ID for the graveyard.
    graveyard_directory_object_id: u64,

    /// The number of live objects in the store.  This should *not* be trusted; it can be invalid
    /// due to filesystem inconsistencies.
    object_count: u64,

    /// The (wrapped) key that encrypted mutations should use.
    mutations_key: Option<FxfsKeyV49>,

    /// Mutations for the store are encrypted using a stream cipher.  To decrypt the mutations, we
    /// need to know the offset in the cipher stream to start it.
    mutations_cipher_offset: u64,

    /// If we have to flush the store whilst we do not have the key, we need to write the encrypted
    /// mutations to an object. This is the object ID of that file if it exists.
    pub encrypted_mutations_object_id: u64,

    /// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
    /// when the directory doesn't yet exist.
    internal_directory_object_id: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
enum LastObjectIdInfo {
    Unencrypted {
        id: u64,
    },
    Encrypted {
        /// The *unencrypted* value of the last object ID.
        id: u64,

        /// Object IDs are encrypted to reduce the amount of information that sequential object IDs
        /// reveal (such as the number of files in the system and the ordering of their creation in
        /// time).  Only the bottom 32 bits of the object ID are encrypted, whilst the top 32 bits
        /// increment after 2^32 object IDs have been used, which allows us to roll the key.
        key: FxfsKeyV49,
    },
    Low32Bit,
}

impl Default for LastObjectIdInfo {
    fn default() -> Self {
        LastObjectIdInfo::Unencrypted { id: 0 }
    }
}

#[derive(Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV49 {
    guid: [u8; 16],
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV49>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    object_id_key: Option<FxfsKeyV49>,
    internal_directory_object_id: u64,
}

impl From<StoreInfoV49> for StoreInfoV52 {
    fn from(value: StoreInfoV49) -> Self {
        Self {
            guid: value.guid,
            last_object_id: if let Some(key) = value.object_id_key {
                LastObjectIdInfo::Encrypted { id: value.last_object_id, key: key }
            } else {
                LastObjectIdInfo::Unencrypted { id: value.last_object_id }
            },
            layers: value.layers,
            root_directory_object_id: value.root_directory_object_id,
            graveyard_directory_object_id: value.graveyard_directory_object_id,
            object_count: value.object_count,
            mutations_key: value.mutations_key,
            mutations_cipher_offset: value.mutations_cipher_offset,
            encrypted_mutations_object_id: value.encrypted_mutations_object_id,
            internal_directory_object_id: value.internal_directory_object_id,
        }
    }
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV49)]
pub struct StoreInfoV40 {
    guid: [u8; 16],
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV40>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    object_id_key: Option<FxfsKeyV40>,
    internal_directory_object_id: u64,
}

impl StoreInfo {
    /// Returns the parent objects for this store.
    pub fn parent_objects(&self) -> Vec<u64> {
        // We should not include the ID of the store itself, since that should be referred to in the
        // volume directory.
        let mut objects = self.layers.to_vec();
        if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
            objects.push(self.encrypted_mutations_object_id);
        }
        objects
    }
}

// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit isn't exceeded.
// It will likely involve placing limits on the maximum number of layers.
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072;

// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
// in the journal but hasn't yet been written to layer files) for a store.  We set a limit because
// we want to limit the amount of memory use in the case the filesystem is corrupt or under attack.
pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;

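/// Options affecting the behaviour of object handles.  `HandleOptions` derives `Default`, so
/// struct-update syntax is the usual way to set a single flag, e.g. (a minimal sketch):
///
/// ```ignore
/// let options = HandleOptions { skip_journal_checks: true, ..Default::default() };
/// ```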
#[derive(Default)]
pub struct HandleOptions {
    /// If true, transactions used by this handle will skip journal space checks.
    pub skip_journal_checks: bool,
    /// If true, data written to any attribute of this handle will not have per-block checksums
    /// computed.
    pub skip_checksums: bool,
    /// If true, any files using fsverity will not attempt to perform any verification. This is
    /// useful to open an object without the correct encryption keys to look at the metadata.
    pub skip_fsverity: bool,
}

/// Parameters for encrypting a newly created object.
pub struct ObjectEncryptionOptions {
    /// If set, the keys are treated as permanent and never evicted from the KeyManager cache.
    /// This is necessary when keys are managed by another store; for example, the layer files
    /// of a child store are objects in the root store, but they are encrypted with keys from the
    /// child store.  Generally, most objects should have this set to `false`.
    pub permanent: bool,
    pub key_id: u64,
    pub key: EncryptionKey,
    pub unwrapped_key: UnwrappedKey,
}

pub struct StoreOptions {
    /// The owner of the store.
    pub owner: Weak<dyn StoreOwner>,

    /// The store is unencrypted if `crypt` is `None`.
    pub crypt: Option<Arc<dyn Crypt>>,
}

impl Default for StoreOptions {
    fn default() -> Self {
        Self { owner: NO_OWNER, crypt: None }
    }
}

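/// Options for creating a child store.  A hedged construction sketch: all defaults except that
/// the new store is encrypted with `crypt`:
///
/// ```ignore
/// let opts = NewChildStoreOptions {
///     options: StoreOptions { crypt: Some(crypt), ..Default::default() },
///     ..Default::default()
/// };
/// ```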
#[derive(Default)]
pub struct NewChildStoreOptions {
    pub options: StoreOptions,

    /// Specifies the object ID in the root store to be used for the store.  If set to
    /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
    pub object_id: u64,

    /// If true, reserve all 32 bit object_ids.  All new objects will start with IDs exceeding
    /// 0x1_0000_0000.
    pub reserve_32bit_object_ids: bool,

    /// Object IDs will be restricted to 32 bits.  This involves a less performant algorithm and so
    /// should not be used unless necessary.
    pub low_32_bit_object_ids: bool,

    /// If set, use this GUID for the new store.
    pub guid: Option<[u8; 16]>,
}

pub type EncryptedMutations = EncryptedMutationsV49;

#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV49 {
    // Information about the mutations is held here, but the actual encrypted data is held within
    // |data|.  For each transaction, we record the checkpoint and the count of mutations within the
    // transaction.  The checkpoint is required for the log file offset (which we need to apply the
    // mutations), and the version so that we can correctly decode the mutation after it has been
    // decrypted. The count specifies the number of serialized mutations encoded in |data|.
    transactions: Vec<(JournalCheckpointV32, u64)>,

    // The encrypted mutations.
    data: Vec<u8>,

    // If the mutations key was rolled, this holds the offset in `data` where the new key should
    // apply.
    mutations_key_roll: Vec<(usize, FxfsKeyV49)>,
}

impl std::fmt::Debug for EncryptedMutations {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("EncryptedMutations")
            .field("transactions", &self.transactions)
            .field("len", &self.data.len())
            .field(
                "mutations_key_roll",
                &self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
            )
            .finish()
    }
}

impl Versioned for EncryptedMutations {
    fn max_serialized_size() -> u64 {
        MAX_ENCRYPTED_MUTATIONS_SIZE as u64
    }
}

impl From<EncryptedMutationsV40> for EncryptedMutationsV49 {
    fn from(value: EncryptedMutationsV40) -> Self {
        EncryptedMutationsV49 {
            transactions: value.transactions,
            data: value.data,
            mutations_key_roll: value
                .mutations_key_roll
                .into_iter()
                .map(|(offset, key)| (offset, key.into()))
                .collect(),
        }
    }
}

#[derive(Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV40 {
    transactions: Vec<(JournalCheckpointV32, u64)>,
    data: Vec<u8>,
    mutations_key_roll: Vec<(usize, FxfsKeyV40)>,
}

impl Versioned for EncryptedMutationsV40 {
    fn max_serialized_size() -> u64 {
        MAX_ENCRYPTED_MUTATIONS_SIZE as u64
    }
}

impl EncryptedMutations {
    fn from_replayed_mutations(
        store_object_id: u64,
        transactions: Vec<JournaledTransaction>,
    ) -> Self {
        let mut this = Self::default();
        for JournaledTransaction { checkpoint, non_root_mutations, .. } in transactions {
            for (object_id, mutation) in non_root_mutations {
                if store_object_id == object_id {
                    if let Mutation::EncryptedObjectStore(data) = mutation {
                        this.push(&checkpoint, data);
                    } else if let Mutation::UpdateMutationsKey(key) = mutation {
                        this.mutations_key_roll.push((this.data.len(), key.into()));
                    }
                }
            }
        }
        this
    }

    fn extend(&mut self, other: &EncryptedMutations) {
        self.transactions.extend_from_slice(&other.transactions[..]);
        self.mutations_key_roll.extend(
            other
                .mutations_key_roll
                .iter()
                .map(|(offset, key)| (offset + self.data.len(), key.clone())),
        );
        self.data.extend_from_slice(&other.data[..]);
    }

    fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
        self.data.append(&mut data.into());
        // If the checkpoint is the same as the last mutation we pushed, increment the count.
        if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
            if last_checkpoint.file_offset == checkpoint.file_offset {
                *count += 1;
                return;
            }
        }
        self.transactions.push((checkpoint.clone(), 1));
    }
}

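// Typical lifecycle of an encrypted store (a sketch; not every legal transition is shown):
//
//   Unknown -> Locked -> Unlocking -> Unlocked -> Locking -> Locked -> ... -> Deleted
//
// Unencrypted stores simply sit in the Unencrypted state.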
pub enum LockState {
    Locked,
    Unencrypted,
    Unlocked { owner: Weak<dyn StoreOwner>, crypt: Arc<dyn Crypt> },

    // The store is unlocked, but in a read-only state, and no flushes or other operations will be
    // performed on the store.
    UnlockedReadOnly(Arc<dyn Crypt>),

    // The store is encrypted but is now in an unusable state (due to a failure to sync the journal
    // after locking the store).  The store cannot be unlocked.
    Invalid,

    // Before we've read the StoreInfo we might not know whether the store is Locked or Unencrypted.
    // This can happen when lazily opening stores (ObjectManager::lazy_open_store).
    Unknown,

    // The store is in the process of being locked.  Whilst the store is being locked, the store
    // isn't usable; assertions will trip if any mutations are applied.
    Locking,

    // Whilst we're unlocking, we will replay encrypted mutations.  The store isn't usable until
    // it's in the Unlocked state.
    Unlocking,

    // The store has been deleted.
    Deleted,
}

impl LockState {
    fn owner(&self) -> Option<Arc<dyn StoreOwner>> {
        if let Self::Unlocked { owner, .. } = self { owner.upgrade() } else { None }
    }
}

impl fmt::Debug for LockState {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(match self {
            LockState::Locked => "Locked",
            LockState::Unencrypted => "Unencrypted",
            LockState::Unlocked { .. } => "Unlocked",
            LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
            LockState::Invalid => "Invalid",
            LockState::Unknown => "Unknown",
            LockState::Locking => "Locking",
            LockState::Unlocking => "Unlocking",
            LockState::Deleted => "Deleted",
        })
    }
}

enum LastObjectId {
    // This is used when the store is encrypted, but the key and ID aren't yet available.
    Pending,

    Unencrypted {
        id: u64,
    },
    Encrypted {
        // The *unencrypted* value of the last object ID.
        id: u64,

        // Encrypted stores will use a cipher to obfuscate the object ID.
        cipher: Box<Ff1>,
    },

    Low32Bit {
        reserved: HashSet<u32>,
        unreserved: Vec<u32>,
    },
}

impl LastObjectId {
    /// Tries to get the next object ID.  Returns None if a new cipher is required because all
    /// object IDs that can be generated with the current cipher have been exhausted, or if only
    /// using the lower 32 bits which requires an async algorithm.
    fn try_get_next(&mut self) -> Option<NonZero<u64>> {
        match self {
            LastObjectId::Unencrypted { id } => {
                NonZero::new(id.wrapping_add(1)).inspect(|next| *id = next.get())
            }
            LastObjectId::Encrypted { id, cipher } => {
                let mut next = *id;
                let hi = next & OBJECT_ID_HI_MASK;
                loop {
                    if next as u32 == u32::MAX {
                        return None;
                    }
                    next += 1;
                    let candidate = hi | cipher.encrypt(next as u32) as u64;
                    if let Some(candidate) = NonZero::new(candidate) {
                        *id = next;
                        return Some(candidate);
                    }
                }
            }
            _ => None,
        }
    }

    /// Returns INVALID_OBJECT_ID if it's not possible to peek at the next object ID.
    fn peek_next(&self) -> u64 {
        match self {
            LastObjectId::Unencrypted { id } => id.wrapping_add(1),
            LastObjectId::Encrypted { id, cipher } => {
                let mut next = *id;
                let hi = next & OBJECT_ID_HI_MASK;
                loop {
                    if next as u32 == u32::MAX {
                        return INVALID_OBJECT_ID;
                    }
                    next += 1;
                    let candidate = hi | cipher.encrypt(next as u32) as u64;
                    if candidate != INVALID_OBJECT_ID {
                        return candidate;
                    }
                }
            }
            _ => INVALID_OBJECT_ID,
        }
    }

    /// Returns INVALID_OBJECT_ID for algorithms that don't use the last ID.
    fn id(&self) -> u64 {
        match self {
            LastObjectId::Unencrypted { id } | LastObjectId::Encrypted { id, .. } => *id,
            _ => INVALID_OBJECT_ID,
        }
    }

    /// Returns true if `id` is reserved (it must be 32 bits).
    fn is_reserved(&self, id: u64) -> bool {
        match self {
            LastObjectId::Low32Bit { reserved, .. } => {
                if let Ok(id) = id.try_into() {
                    reserved.contains(&id)
                } else {
                    false
                }
            }
            _ => false,
        }
    }

    /// Reserves `id`.
    fn reserve(&mut self, id: u64) {
        match self {
            LastObjectId::Low32Bit { reserved, .. } => {
                assert!(reserved.insert(id.try_into().unwrap()))
            }
            _ => unreachable!(),
        }
    }

    /// Unreserves `id`.
    fn unreserve(&mut self, id: u64) {
        match self {
            LastObjectId::Low32Bit { unreserved, .. } => {
                // To avoid races, where a reserved ID transitions from being reserved to being
                // actually used in a committed transaction, we delay updating `reserved` until a
                // suitable point.
                //
                // On thread A, we might have:
                //
                //   A1. Commit transaction (insert a record into the LSM tree that uses ID)
                //   A2. `unreserve`
                //
                // And on another thread B, we might have:
                //
                //   B1. Drain `unreserved`.
                //   B2. Check tree and `reserved` to see if ID is used.
                //
                // B2 will involve calling `LsmTree::layer_set` which should be thought of as a
                // snapshot, so the change A1 might not be visible to thread B, but it won't matter
                // because `reserved` will still include the ID.  So long as each thread does the
                // operations in this order, it should be safe.
                unreserved.push(id.try_into().unwrap())
            }
            _ => {}
        }
    }

    /// Removes `unreserved` IDs from the `reserved` list.
    fn drain_unreserved(&mut self) {
        match self {
            LastObjectId::Low32Bit { reserved, unreserved } => {
                for u in unreserved.drain(..) {
                    assert!(reserved.remove(&u));
                }
            }
            _ => {}
        }
    }
}

pub struct ReservedId<'a>(&'a ObjectStore, NonZero<u64>);

impl<'a> ReservedId<'a> {
    fn new(store: &'a ObjectStore, id: NonZero<u64>) -> Self {
        Self(store, id)
    }

    pub fn get(&self) -> u64 {
        self.1.get()
    }

    /// The caller takes responsibility for this id.
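    ///
    /// A hedged sketch of the intended pattern: release the ID only once a committed mutation
    /// owns it; otherwise the guard's `Drop` impl unreserves it automatically.
    ///
    /// ```ignore
    /// let reserved = store.get_next_object_id(transaction.txn_guard()).await?;
    /// transaction.add(
    ///     store.store_object_id(),
    ///     Mutation::insert_object(ObjectKey::object(reserved.release()), value),
    /// );
    /// ```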
    #[must_use]
    pub fn release(self) -> u64 {
        let id = self.1.get();
        std::mem::forget(self);
        id
    }
}

impl Drop for ReservedId<'_> {
    fn drop(&mut self) {
        self.0.last_object_id.lock().unreserve(self.1.get());
    }
}

/// An object store supports a file-like interface for objects.  Objects are keyed by a 64 bit
/// identifier.  An object store has to be backed by a parent object store (which stores metadata
/// for the object store).  The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
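///
/// A sketch of the resulting hierarchy (names are illustrative):
///
/// ```text
/// root parent store (in-memory) -> root store -> child (volume) stores
/// ```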
pub struct ObjectStore {
    parent_store: Option<Arc<ObjectStore>>,
    store_object_id: u64,
    device: Arc<dyn Device>,
    block_size: u64,
    filesystem: Weak<FxFilesystem>,
    // Lock ordering: This must be taken before `lock_state`.
    store_info: Mutex<Option<StoreInfo>>,
    tree: LSMTree<ObjectKey, ObjectValue>,

    // When replaying the journal, the store cannot read StoreInfo until the whole journal
    // has been replayed, so during that time, store_info_handle will be None and records
    // just get sent to the tree. Once the journal has been replayed, we can open the store
    // and load all the other layer information.
    store_info_handle: OnceLock<DataObjectHandle<ObjectStore>>,

    // The cipher to use for encrypted mutations, if this store is encrypted.
    mutations_cipher: Mutex<Option<StreamCipher>>,

    // Current lock state of the store.
    // Lock ordering: This must be taken after `store_info`.
    lock_state: Mutex<LockState>,
    pub key_manager: KeyManager,

    // Enable/disable tracing.
    trace: AtomicBool,

    // Informational counters for events occurring within the store.
    counters: Mutex<ObjectStoreCounters>,

    // These are updated in performance-sensitive code paths so we use atomics instead of counters.
    device_read_ops: AtomicU64,
    device_write_ops: AtomicU64,
    logical_read_ops: AtomicU64,
    logical_write_ops: AtomicU64,

    // Contains the last object ID and, optionally, a cipher to be used when generating new object
    // IDs.
    last_object_id: Mutex<LastObjectId>,

    // An optional callback to be invoked each time the ObjectStore flushes.  The callback is
    // invoked at the end of flush, while the write lock is still held.
    flush_callback: Mutex<Option<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>>,
}

#[derive(Clone, Default)]
struct ObjectStoreCounters {
    mutations_applied: u64,
    mutations_dropped: u64,
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}

impl ObjectStore {
    fn new(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        store_info: Option<StoreInfo>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
        mutations_cipher: Option<StreamCipher>,
        lock_state: LockState,
        last_object_id: LastObjectId,
    ) -> Arc<ObjectStore> {
        let device = filesystem.device();
        let block_size = filesystem.block_size();
        Arc::new(ObjectStore {
            parent_store,
            store_object_id,
            device,
            block_size,
            filesystem: Arc::downgrade(&filesystem),
            store_info: Mutex::new(store_info),
            tree: LSMTree::new(merge::merge, object_cache),
            store_info_handle: OnceLock::new(),
            mutations_cipher: Mutex::new(mutations_cipher),
            lock_state: Mutex::new(lock_state),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(last_object_id),
            flush_callback: Mutex::new(None),
        })
    }

    fn new_empty(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Arc<Self> {
        Self::new(
            parent_store,
            store_object_id,
            filesystem,
            Some(StoreInfo::default()),
            object_cache,
            None,
            LockState::Unencrypted,
            LastObjectId::Unencrypted { id: 0 },
        )
    }

    /// Cycle breaker constructor that returns an ObjectStore without a filesystem.
    /// This should only be used from super block code.
    pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
        ObjectStore {
            parent_store: None,
            store_object_id,
            device,
            block_size,
            filesystem: Weak::<FxFilesystem>::new(),
            store_info: Mutex::new(Some(StoreInfo::default())),
            tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
            store_info_handle: OnceLock::new(),
            mutations_cipher: Mutex::new(None),
            lock_state: Mutex::new(LockState::Unencrypted),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(LastObjectId::Unencrypted { id: 0 }),
            flush_callback: Mutex::new(None),
        }
    }

    /// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has
    /// been created.
    pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
        this.filesystem = Arc::downgrade(&filesystem);
        this
    }

    /// Creates a child store. It is a multi-step process:
    ///
    ///   1. Call `ObjectStore::new_child_store`.
    ///   2. Register the store with the object-manager.
    ///   3. Call `ObjectStore::create` to write the store-info.
    ///
    /// If the procedure fails, care must be taken to unregister the store with the object-manager.
    ///
    /// The steps have to be separate because of lifetime issues when working with a transaction.
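    ///
    /// A hedged sketch of the sequence (error handling and unregistration omitted;
    /// `object_manager.add_store` is a hypothetical registration call):
    ///
    /// ```ignore
    /// let store = root_store.new_child_store(&mut transaction, options, cache).await?;
    /// object_manager.add_store(store.clone());
    /// store.create(&mut transaction).await?;
    /// transaction.commit().await?;
    /// ```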
    async fn new_child_store(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        options: NewChildStoreOptions,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Result<Arc<Self>, Error> {
        ensure!(
            !options.reserve_32bit_object_ids || !options.low_32_bit_object_ids,
            FxfsError::InvalidArgs
        );
        let handle = if let Some(object_id) = NonZero::new(options.object_id) {
            self.update_last_object_id(object_id.get());
            let handle = ObjectStore::create_object_with_id(
                self,
                transaction,
                ReservedId::new(self, object_id),
                HandleOptions::default(),
                None,
            )?;
            handle
        } else {
            ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await?
        };
        let filesystem = self.filesystem();
        let id = if options.reserve_32bit_object_ids { 0x1_0000_0000 } else { 0 };
        let (last_object_id, last_object_id_in_memory) = if options.low_32_bit_object_ids {
            (
                LastObjectIdInfo::Low32Bit,
                LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() },
            )
        } else if let Some(crypt) = &options.options.crypt {
            let (object_id_wrapped, object_id_unwrapped) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            (
                LastObjectIdInfo::Encrypted { id, key: object_id_wrapped },
                LastObjectId::Encrypted { id, cipher: Box::new(Ff1::new(&object_id_unwrapped)) },
            )
        } else {
            (LastObjectIdInfo::Unencrypted { id }, LastObjectId::Unencrypted { id })
        };
        let store = if let Some(crypt) = options.options.crypt {
            let (wrapped_key, unwrapped_key) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    mutations_key: Some(wrapped_key),
                    last_object_id,
                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
                    ..Default::default()
                }),
                object_cache,
                Some(StreamCipher::new(&unwrapped_key, 0)),
                LockState::Unlocked { owner: options.options.owner, crypt },
                last_object_id_in_memory,
            )
        } else {
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    last_object_id,
                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
                    ..Default::default()
                }),
                object_cache,
                None,
                LockState::Unencrypted,
                last_object_id_in_memory,
            )
        };
        assert!(store.store_info_handle.set(handle).is_ok());
        Ok(store)
    }

    /// Actually creates the store in a transaction.  This will also create a root directory and
    /// graveyard directory for the store.  See `new_child_store` above.
    async fn create<'a>(
        self: &'a Arc<Self>,
        transaction: &mut Transaction<'a>,
    ) -> Result<(), Error> {
        let buf = {
            // Create a root directory and graveyard directory.
            let graveyard_directory_object_id = Graveyard::create(transaction, &self).await?;
            let root_directory = Directory::create(transaction, &self, None).await?;

            let serialized_info = {
                let mut store_info = self.store_info.lock();
                let store_info = store_info.as_mut().unwrap();

                store_info.graveyard_directory_object_id = graveyard_directory_object_id;
                store_info.root_directory_object_id = root_directory.object_id();

                let mut serialized_info = Vec::new();
                store_info.serialize_with_version(&mut serialized_info)?;
                serialized_info
            };
            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
            buf
        };

        if self.filesystem().options().image_builder_mode.is_some() {
            // If we're in image builder mode, we want to avoid writing to disk unless explicitly
            // asked to. New object stores will have their StoreInfo written when we compact in
            // FxFilesystem::finalize().
            Ok(())
        } else {
            self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
        }
    }

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(store_id = self.store_object_id(), trace; "OS: trace",);
        }
    }

    /// Sets a callback to be invoked each time the ObjectStore flushes.  The callback is invoked at
    /// the end of flush, while the write lock is still held.
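    ///
    /// A minimal usage sketch:
    ///
    /// ```ignore
    /// store.set_flush_callback(|store| {
    ///     // Runs at the end of every flush, while the write lock is still held, so keep it short.
    ///     eprintln!("store {} flushed", store.store_object_id());
    /// });
    /// ```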
    pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
        let mut flush_callback = self.flush_callback.lock();
        *flush_callback = Some(Box::new(callback));
    }

    pub fn is_root(&self) -> bool {
        if let Some(parent) = &self.parent_store {
            parent.parent_store.is_none()
        } else {
            // The root parent store isn't the root store.
            false
        }
    }

    /// Populates an inspect node with store statistics.
    pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
        let counters = self.counters.lock();
        if let Some(store_info) = self.store_info() {
            root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
        };
        root.record_uint("store_object_id", self.store_object_id);
        root.record_uint("mutations_applied", counters.mutations_applied);
        root.record_uint("mutations_dropped", counters.mutations_dropped);
        root.record_uint("num_flushes", counters.num_flushes);
        if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
            root.record_uint(
                "last_flush_time_ms",
                last_flush_time
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or(std::time::Duration::ZERO)
                    .as_millis()
                    .try_into()
                    .unwrap_or(0u64),
            );
        }
        root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
        root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
        root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
        root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));
        {
            let last_object_id = self.last_object_id.lock();
            root.record_uint("object_id_hi", last_object_id.id() >> 32);
            root.record_bool(
                "low_32_bit_object_ids",
                matches!(&*last_object_id, LastObjectId::Low32Bit { .. }),
            );
        }

        let this = self.clone();
        root.record_child("lsm_tree", move |node| this.tree().record_inspect_data(node));
    }

    pub fn device(&self) -> &Arc<dyn Device> {
        &self.device
    }

    pub fn block_size(&self) -> u64 {
        self.block_size
    }

    pub fn filesystem(&self) -> Arc<FxFilesystem> {
        self.filesystem.upgrade().unwrap()
    }

    pub fn store_object_id(&self) -> u64 {
        self.store_object_id
    }

    pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
        &self.tree
    }

    pub fn root_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().root_directory_object_id
    }

    pub fn guid(&self) -> [u8; 16] {
        self.store_info.lock().as_ref().unwrap().guid
    }

    pub fn graveyard_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().graveyard_directory_object_id
    }

    fn set_graveyard_directory_object_id(&self, oid: u64) {
        assert_eq!(
            std::mem::replace(
                &mut self.store_info.lock().as_mut().unwrap().graveyard_directory_object_id,
                oid
            ),
            INVALID_OBJECT_ID
        );
    }

    pub fn object_count(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().object_count
    }

    pub fn key_manager(&self) -> &KeyManager {
        &self.key_manager
    }

    pub fn parent_store(&self) -> Option<&Arc<ObjectStore>> {
        self.parent_store.as_ref()
    }

    /// Returns the crypt object for the store.  Returns None if the store is unencrypted.
    pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
        match &*self.lock_state.lock() {
            LockState::Locked => panic!("Store is locked"),
            LockState::Invalid
            | LockState::Unencrypted
            | LockState::Locking
            | LockState::Unlocking
            | LockState::Deleted => None,
            LockState::Unlocked { crypt, .. } => Some(crypt.clone()),
            LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
            LockState::Unknown => {
                panic!("Store is of unknown lock state; has the journal been replayed yet?")
            }
        }
    }

    pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
        // Create the transaction first to use the object store lock.
        let mut transaction = self
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::InternalDirectory { store_object_id: self.store_object_id }],
                Options::default(),
            )
            .await?;
        let obj_id = self.store_info.lock().as_ref().unwrap().internal_directory_object_id;
        if obj_id != INVALID_OBJECT_ID {
            return Ok(obj_id);
        }

        // Need to create an internal directory.
        let directory = Directory::create(&mut transaction, self, None).await?;

        transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
        transaction.commit().await?;
        Ok(directory.object_id())
    }

    /// Returns the file size for the object without opening the object.
    async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
        let item = self
            .tree
            .find(&ObjectKey::attribute(
                object_id,
                DEFAULT_DATA_ATTRIBUTE_ID,
                AttributeKey::Attribute,
            ))
            .await?
            .ok_or(FxfsError::NotFound)?;
        if let ObjectValue::Attribute { size, .. } = item.value {
            Ok(size)
        } else {
            bail!(FxfsError::NotFile);
        }
    }

    #[cfg(feature = "migration")]
    pub fn last_object_id(&self) -> u64 {
        self.last_object_id.lock().id()
    }

    /// Provides access to the allocator to mark a specific region of the device as allocated.
    #[cfg(feature = "migration")]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        store_object_id: u64,
        device_range: std::ops::Range<u64>,
    ) -> Result<(), Error> {
        self.allocator().mark_allocated(transaction, store_object_id, device_range)
    }

    /// `crypt` can be provided if the crypt service should be different to the default; see the
    /// comment on create_object.  Users should avoid having more than one handle open for the same
    /// object at the same time because they might get out-of-sync; there is no code that will
    /// prevent this.  One example where this can cause an issue: if the object ends up using a
    /// permanent key (which is the case if a value is passed for `crypt`), the permanent key is
    /// dropped when a handle is dropped, which will impact any other handles for the same object.
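    ///
    /// A hedged sketch of opening an object with the store's own crypt service (hence `None`):
    ///
    /// ```ignore
    /// let handle =
    ///     ObjectStore::open_object(&owner, object_id, HandleOptions::default(), None).await?;
    /// ```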
    pub async fn open_object<S: HandleOwner>(
        owner: &Arc<S>,
        obj_id: u64,
        options: HandleOptions,
        crypt: Option<Arc<dyn Crypt>>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let mut fsverity_descriptor = None;
        let mut overwrite_ranges = Vec::new();
        let item = store
            .tree
            .find(&ObjectKey::attribute(obj_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute))
            .await?
            .ok_or(FxfsError::NotFound)?;

        let (size, track_overwrite_extents) = match item.value {
            ObjectValue::Attribute { size, has_overwrite_extents } => (size, has_overwrite_extents),
            ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
                if !options.skip_fsverity {
                    fsverity_descriptor = Some(fsverity_metadata);
                }
                // We only track the overwrite extents in memory for writes; reads handle them
                // implicitly, so verified files (where the data won't change anymore) don't need
                // to track them.
                (size, false)
            }
            _ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attribute")),
        };

        ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);

        if track_overwrite_extents {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    obj_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
                )))
                .await?;
            loop {
                match iter.get() {
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == obj_id && *attribute_id == DEFAULT_DATA_ATTRIBUTE_ID => {
                        match value {
                            ObjectValue::Extent(ExtentValue::None)
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Raw,
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Cow(_),
                                ..
                            }) => (),
                            ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::OverwritePartial(_),
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Overwrite,
                                ..
                            }) => overwrite_ranges.push(range.clone()),
                            _ => bail!(
                                anyhow!(FxfsError::Inconsistent)
                                    .context("open_object: Expected extent")
                            ),
                        }
                        iter.advance().await?;
                    }
                    _ => break,
                }
            }
        }

        // If a crypt service has been specified, it needs to be a permanent key because cached
        // keys can only use the store's crypt service.
        let permanent = if let Some(crypt) = crypt {
            store
                .key_manager
                .get_keys(
                    obj_id,
                    crypt.as_ref(),
                    &mut Some(async || store.get_keys(obj_id).await),
                    /* permanent= */ true,
                    /* force= */ false,
                )
                .await?;
            true
        } else {
            false
        };
        let data_object_handle = DataObjectHandle::new(
            owner.clone(),
            obj_id,
            permanent,
            DEFAULT_DATA_ATTRIBUTE_ID,
            size,
            FsverityState::None,
            options,
            false,
            &overwrite_ranges,
        );
        if let Some(descriptor) = fsverity_descriptor {
            data_object_handle
                .set_fsverity_state_some(descriptor)
                .await
                .context("Invalid or mismatched merkle tree")?;
        }
        Ok(data_object_handle)
    }

    pub fn create_object_with_id<S: HandleOwner>(
        owner: &Arc<S>,
        transaction: &mut Transaction<'_>,
        reserved_object_id: ReservedId<'_>,
        options: HandleOptions,
        encryption_options: Option<ObjectEncryptionOptions>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        // Don't permit creating unencrypted objects in an encrypted store.  The converse is OK.
        debug_assert!(store.crypt().is_none() || encryption_options.is_some());
        let now = Timestamp::now();
        let object_id = reserved_object_id.get();
        assert!(
            transaction
                .add(
                    store.store_object_id(),
                    Mutation::insert_object(
                        ObjectKey::object(reserved_object_id.release()),
                        ObjectValue::file(
                            1,
                            0,
                            now.clone(),
                            now.clone(),
                            now.clone(),
                            now,
                            0,
                            None
                        ),
                    ),
                )
                .is_none()
        );
        let mut permanent_keys = false;
        if let Some(ObjectEncryptionOptions { permanent, key_id, key, unwrapped_key }) =
            encryption_options
        {
            permanent_keys = permanent;
            transaction.add(
                store.store_object_id(),
                Mutation::insert_object(
                    ObjectKey::keys(object_id),
                    ObjectValue::keys(vec![(key_id, key)].into()),
                ),
            );
            let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));
            store.key_manager.insert(
                object_id,
                Arc::new(vec![(key_id, CipherHolder::Cipher(cipher))].into()),
                permanent,
            );
        }
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute),
                // This is a new object so nothing has pre-allocated overwrite extents yet.
                ObjectValue::attribute(0, false),
            ),
        );
        Ok(DataObjectHandle::new(
            owner.clone(),
            object_id,
            permanent_keys,
            DEFAULT_DATA_ATTRIBUTE_ID,
            0,
            FsverityState::None,
            options,
            false,
            &[],
        ))
    }

    /// Creates an object in the store.
    ///
    /// If the store is encrypted, the object will be automatically encrypted as well.
    /// If `wrapping_key_id` is set, the new keys will be wrapped with that specific key;
    /// otherwise the default data key is used.
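    ///
    /// A hedged sketch (passing `None` so that, if the store is encrypted, the default data key
    /// is used):
    ///
    /// ```ignore
    /// let handle =
    ///     ObjectStore::create_object(&owner, &mut transaction, HandleOptions::default(), None)
    ///         .await?;
    /// ```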
    pub async fn create_object<S: HandleOwner>(
        owner: &Arc<S>,
        mut transaction: &mut Transaction<'_>,
        options: HandleOptions,
        wrapping_key_id: Option<WrappingKeyId>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
        let crypt = store.crypt();
        let encryption_options = if let Some(crypt) = crypt {
            let key_id =
                if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };
            let (key, unwrapped_key) = if let Some(wrapping_key_id) = wrapping_key_id {
                crypt.create_key_with_id(object_id.get(), wrapping_key_id, ObjectType::File).await?
            } else {
                let (fxfs_key, unwrapped_key) =
                    crypt.create_key(object_id.get(), KeyPurpose::Data).await?;
                (EncryptionKey::Fxfs(fxfs_key), unwrapped_key)
            };
            Some(ObjectEncryptionOptions { permanent: false, key_id, key, unwrapped_key })
        } else {
            None
        };
        ObjectStore::create_object_with_id(
            owner,
            &mut transaction,
            object_id,
            options,
            encryption_options,
        )
    }

    /// Creates an object using explicitly provided keys.
    ///
    /// There are some cases where an encrypted object needs to be created in an unencrypted store:
    /// for example, layer files for a child store are created in the root store, but they must
    /// be encrypted using the child store's keys.  This method exists for that purpose.
1356    pub(crate) async fn create_object_with_key<S: HandleOwner>(
1357        owner: &Arc<S>,
1358        mut transaction: &mut Transaction<'_>,
1359        object_id: ReservedId<'_>,
1360        options: HandleOptions,
1361        key: EncryptionKey,
1362        unwrapped_key: UnwrappedKey,
1363    ) -> Result<DataObjectHandle<S>, Error> {
1364        ObjectStore::create_object_with_id(
1365            owner,
1366            &mut transaction,
1367            object_id,
1368            options,
1369            Some(ObjectEncryptionOptions {
1370                permanent: true,
1371                key_id: VOLUME_DATA_KEY_ID,
1372                key,
1373                unwrapped_key,
1374            }),
1375        )
1376    }
1377
1378    /// Adjusts the reference count for a given object.  If the reference count reaches zero, the
1379    /// object is moved into the graveyard and true is returned.
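    ///
    /// For example (a sketch; assumes an open `transaction`), unlinking a file typically drops
    /// one reference:
    ///
    /// ```ignore
    /// if store.adjust_refs(&mut transaction, object_id, -1).await? {
    ///     // The object is now in the graveyard and can later be purged via `tombstone_object`.
    /// }
    /// ```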
1380    pub async fn adjust_refs(
1381        &self,
1382        transaction: &mut Transaction<'_>,
1383        object_id: u64,
1384        delta: i64,
1385    ) -> Result<bool, Error> {
1386        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1387        let refs = if let ObjectValue::Object {
1388            kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
1389            ..
1390        } = &mut mutation.item.value
1391        {
1392            *refs =
1393                refs.checked_add_signed(delta).ok_or_else(|| anyhow!("refs underflow/overflow"))?;
1394            refs
1395        } else {
1396            bail!(FxfsError::NotFile);
1397        };
1398        if *refs == 0 {
1399            self.add_to_graveyard(transaction, object_id);
1400
1401            // We might still need to adjust the reference count if delta was something other than
1402            // -1.
1403            if delta != -1 {
1404                *refs = 1;
1405                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1406            }
1407            // Otherwise, we don't commit the mutation, as we want to keep the reference count
1408            // at 1 for objects in the graveyard.
1409            Ok(true)
1410        } else {
1411            transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1412            Ok(false)
1413        }
1414    }
1415
1416    // Purges an object that is in the graveyard.
1417    pub async fn tombstone_object(
1418        &self,
1419        object_id: u64,
1420        txn_options: Options<'_>,
1421    ) -> Result<(), Error> {
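        // Evict any cached unwrapped keys for this object first; they're no longer needed once
        // the object is being purged.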
1422        self.key_manager.remove(object_id).await;
1423        let fs = self.filesystem();
1424        let truncate_guard = fs.truncate_guard(self.store_object_id, object_id).await;
1425        self.trim_or_tombstone(object_id, true, txn_options, &truncate_guard).await
1426    }
1427
1428    /// Trims extents beyond the end of a file for all attributes.  This will remove the entry
1429    /// from the graveyard when done.
1430    pub async fn trim(
1431        &self,
1432        object_id: u64,
1433        truncate_guard: &TruncateGuard<'_>,
1434    ) -> Result<(), Error> {
1435        // For the root and root parent store, we would need to use the metadata reservation,
1436        // which we don't currently support, so assert that we're not those stores.
1437        assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());
1438
1439        self.trim_or_tombstone(
1440            object_id,
1441            false,
1442            Options { borrow_metadata_space: true, ..Default::default() },
1443            truncate_guard,
1444        )
1445        .await
1446    }
1447
1448    /// Trims or tombstones an object.
1449    async fn trim_or_tombstone(
1450        &self,
1451        object_id: u64,
1452        for_tombstone: bool,
1453        txn_options: Options<'_>,
1454        _truncate_guard: &TruncateGuard<'_>,
1455    ) -> Result<(), Error> {
1456        let fs = self.filesystem();
1457        let mut next_attribute = Some(0);
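        // Process one attribute (or part of one) per transaction so that each transaction stays
        // bounded in size; `trim_some` indicates via `TrimResult` whether more work remains.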
1458        while let Some(attribute_id) = next_attribute.take() {
1459            let mut transaction = fs
1460                .clone()
1461                .new_transaction(
1462                    lock_keys![
1463                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
1464                        LockKey::object(self.store_object_id, object_id),
1465                    ],
1466                    txn_options,
1467                )
1468                .await?;
1469
1470            match self
1471                .trim_some(
1472                    &mut transaction,
1473                    object_id,
1474                    attribute_id,
1475                    if for_tombstone {
1476                        TrimMode::Tombstone(TombstoneMode::Object)
1477                    } else {
1478                        TrimMode::UseSize
1479                    },
1480                )
1481                .await?
1482            {
1483                TrimResult::Incomplete => next_attribute = Some(attribute_id),
1484                TrimResult::Done(None) => {
1485                    if for_tombstone
1486                        || matches!(
1487                            self.tree
1488                                .find(&ObjectKey::graveyard_entry(
1489                                    self.graveyard_directory_object_id(),
1490                                    object_id,
1491                                ))
1492                                .await?,
1493                            Some(Item { value: ObjectValue::Trim, .. })
1494                        )
1495                    {
1496                        self.remove_from_graveyard(&mut transaction, object_id);
1497                    }
1498                }
1499                TrimResult::Done(id) => next_attribute = id,
1500            }
1501
1502            if !transaction.mutations().is_empty() {
1503                transaction.commit().await?;
1504            }
1505        }
1506        Ok(())
1507    }
1508
1509    // Purges an object's attribute that is in the graveyard.
1510    pub async fn tombstone_attribute(
1511        &self,
1512        object_id: u64,
1513        attribute_id: u64,
1514        txn_options: Options<'_>,
1515    ) -> Result<(), Error> {
1516        let fs = self.filesystem();
1517        let mut trim_result = TrimResult::Incomplete;
1518        while matches!(trim_result, TrimResult::Incomplete) {
1519            let mut transaction = fs
1520                .clone()
1521                .new_transaction(
1522                    lock_keys![
1523                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
1524                        LockKey::object(self.store_object_id, object_id),
1525                    ],
1526                    txn_options,
1527                )
1528                .await?;
1529            trim_result = self
1530                .trim_some(
1531                    &mut transaction,
1532                    object_id,
1533                    attribute_id,
1534                    TrimMode::Tombstone(TombstoneMode::Attribute),
1535                )
1536                .await?;
1537            if let TrimResult::Done(..) = trim_result {
1538                self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
1539            }
1540            if !transaction.mutations().is_empty() {
1541                transaction.commit().await?;
1542            }
1543        }
1544        Ok(())
1545    }
1546
1547    /// Deletes extents for attribute `attribute_id` in object `object_id`.  Also see the comments
1548    /// for TrimMode and TrimResult.  The caller should hold locks on both the attribute and
1549    /// the object, since this performs a read-modify-write on the sizes.
1550    pub async fn trim_some(
1551        &self,
1552        transaction: &mut Transaction<'_>,
1553        object_id: u64,
1554        attribute_id: u64,
1555        mode: TrimMode,
1556    ) -> Result<TrimResult, Error> {
1557        let layer_set = self.tree.layer_set();
1558        let mut merger = layer_set.merger();
1559
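        // Work out the first offset to trim from: tombstoning trims everything from offset 0,
        // TrimMode::FromOffset trims from the given offset (rounded up to a block boundary), and
        // TrimMode::UseSize trims extents beyond the attribute's recorded size.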
1560        let aligned_offset = match mode {
1561            TrimMode::FromOffset(offset) => {
1562                round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
1563            }
1564            TrimMode::Tombstone(..) => 0,
1565            TrimMode::UseSize => {
1566                let iter = merger
1567                    .query(Query::FullRange(&ObjectKey::attribute(
1568                        object_id,
1569                        attribute_id,
1570                        AttributeKey::Attribute,
1571                    )))
1572                    .await?;
1573                if let Some(item_ref) = iter.get() {
1574                    if item_ref.key.object_id != object_id {
1575                        return Ok(TrimResult::Done(None));
1576                    }
1577
1578                    if let ItemRef {
1579                        key:
1580                            ObjectKey {
1581                                data:
1582                                    ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
1583                                ..
1584                            },
1585                        value: ObjectValue::Attribute { size, .. },
1586                        ..
1587                    } = item_ref
1588                    {
1589                        // If we found a different attribute_id, return so we can get the
1590                        // right lock.
1591                        if *size_attribute_id != attribute_id {
1592                            return Ok(TrimResult::Done(Some(*size_attribute_id)));
1593                        }
1594                        round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
1595                    } else {
1596                        // At time of writing, we should always see a size record or None here,
1597                        // but asserting here would be brittle, so just skip to the next
1598                        // attribute instead.
1599                        return Ok(TrimResult::Done(Some(attribute_id + 1)));
1600                    }
1601                } else {
1602                    // End of the tree.
1603                    return Ok(TrimResult::Done(None));
1604                }
1605            }
1606        };
1607
1608        // Loop over the extents and deallocate them.
1609        let mut iter = merger
1610            .query(Query::FullRange(&ObjectKey::from_extent(
1611                object_id,
1612                attribute_id,
1613                ExtentKey::search_key_from_offset(aligned_offset),
1614            )))
1615            .await?;
1616        let mut end = 0;
1617        let allocator = self.allocator();
1618        let mut result = TrimResult::Done(None);
1619        let mut deallocated = 0;
1620        let block_size = self.block_size;
1621
1622        while let Some(item_ref) = iter.get() {
1623            if item_ref.key.object_id != object_id {
1624                break;
1625            }
1626            if let ObjectKey {
1627                data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
1628                ..
1629            } = item_ref.key
1630            {
1631                if *extent_attribute_id != attribute_id {
1632                    result = TrimResult::Done(Some(*extent_attribute_id));
1633                    break;
1634                }
1635                if let (
1636                    AttributeKey::Extent(ExtentKey { range }),
1637                    ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1638                ) = (attribute_key, item_ref.value)
1639                {
1640                    let start = std::cmp::max(range.start, aligned_offset);
1641                    ensure!(start < range.end, FxfsError::Inconsistent);
1642                    let device_offset = device_offset
1643                        .checked_add(start - range.start)
1644                        .ok_or(FxfsError::Inconsistent)?;
1645                    end = range.end;
1646                    let len = end - start;
1647                    let device_range = device_offset..device_offset + len;
1648                    ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
1649                    allocator.deallocate(transaction, self.store_object_id, device_range).await?;
1650                    deallocated += len;
1651                    // Stop if the transaction is getting too big.
1652                    if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
1653                        result = TrimResult::Incomplete;
1654                        break;
1655                    }
1656                }
1657            }
1658            iter.advance().await?;
1659        }
1660
1661        let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
1662            && matches!(result, TrimResult::Done(None));
1663        let finished_tombstone_attribute =
1664            matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
1665                && !matches!(result, TrimResult::Incomplete);
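        // Tombstoning a whole object is only finished once no attributes remain, whereas
        // tombstoning a single attribute is finished as soon as its extents are fully trimmed.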
1666        let mut object_mutation = None;
1667        let nodes = if finished_tombstone_object { -1 } else { 0 };
1668        if nodes != 0 || deallocated != 0 {
1669            let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1670            if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
1671                mutation.item.value
1672            {
1673                if project_id != 0 {
1674                    transaction.add(
1675                        self.store_object_id,
1676                        Mutation::merge_object(
1677                            ObjectKey::project_usage(self.root_directory_object_id(), project_id),
1678                            ObjectValue::BytesAndNodes {
1679                                bytes: -i64::try_from(deallocated).unwrap(),
1680                                nodes,
1681                            },
1682                        ),
1683                    );
1684                }
1685                object_mutation = Some(mutation);
1686            } else {
1687                panic!("Inconsistent object type.");
1688            }
1689        }
1690
1691        // Deletion marker records *must* be merged so as to consume all other records for the
1692        // object.
1693        if finished_tombstone_object {
1694            transaction.add(
1695                self.store_object_id,
1696                Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
1697            );
1698        } else {
1699            if finished_tombstone_attribute {
1700                transaction.add(
1701                    self.store_object_id,
1702                    Mutation::merge_object(
1703                        ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
1704                        ObjectValue::None,
1705                    ),
1706                );
1707            }
1708            if deallocated > 0 {
1709                let mut mutation = match object_mutation {
1710                    Some(mutation) => mutation,
1711                    None => self.txn_get_object_mutation(transaction, object_id).await?,
1712                };
1713                transaction.add(
1714                    self.store_object_id,
1715                    Mutation::merge_object(
1716                        ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
1717                        ObjectValue::deleted_extent(),
1718                    ),
1719                );
1720                // Update allocated size.
1721                if let ObjectValue::Object {
1722                    attributes: ObjectAttributes { allocated_size, .. },
1723                    ..
1724                } = &mut mutation.item.value
1725                {
1726                    // The only way for this to fail is if the volume is inconsistent.
1727                    *allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
1728                        anyhow!(FxfsError::Inconsistent).context("Allocated size overflow")
1729                    })?;
1730                } else {
1731                    panic!("Unexpected object value");
1732                }
1733                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1734            }
1735        }
1736        Ok(result)
1737    }
1738
1739    /// Returns all objects that exist in the parent store that pertain to this object store.
1740    /// Note that this doesn't include the object_id of the store itself, which is generally
1741    /// referenced externally.
1742    pub fn parent_objects(&self) -> Vec<u64> {
1743        assert!(self.store_info_handle.get().is_some());
1744        self.store_info.lock().as_ref().unwrap().parent_objects()
1745    }
1746
1747    /// Returns root objects for this store.
1748    pub fn root_objects(&self) -> Vec<u64> {
1749        let mut objects = Vec::new();
1750        let store_info = self.store_info.lock();
1751        let info = store_info.as_ref().unwrap();
1752        if info.root_directory_object_id != INVALID_OBJECT_ID {
1753            objects.push(info.root_directory_object_id);
1754        }
1755        if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
1756            objects.push(info.graveyard_directory_object_id);
1757        }
1758        if info.internal_directory_object_id != INVALID_OBJECT_ID {
1759            objects.push(info.internal_directory_object_id);
1760        }
1761        objects
1762    }
1763
1764    pub fn store_info(&self) -> Option<StoreInfo> {
1765        self.store_info.lock().as_ref().cloned()
1766    }
1767
1768    /// Returns None if called during journal replay.
1769    pub fn store_info_handle_object_id(&self) -> Option<u64> {
1770        self.store_info_handle.get().map(|h| h.object_id())
1771    }
1772
1773    /// Called to open a store, before replay of this store's mutations.
1774    async fn open(
1775        parent_store: &Arc<ObjectStore>,
1776        store_object_id: u64,
1777        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
1778    ) -> Result<Arc<ObjectStore>, Error> {
1779        let handle =
1780            ObjectStore::open_object(parent_store, store_object_id, HandleOptions::default(), None)
1781                .await?;
1782
1783        let info = load_store_info(parent_store, store_object_id).await?;
1784        let is_encrypted = info.mutations_key.is_some();
1785
1786        let mut total_layer_size = 0;
1787        let last_object_id;
1788
1789        // TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.
1790
1791        // If the store is encrypted, we can't open the object tree layers now, but we need to
1792        // compute the size of the layers.
1793        if is_encrypted {
1794            for &oid in &info.layers {
1795                total_layer_size += parent_store.get_file_size(oid).await?;
1796            }
1797            if info.encrypted_mutations_object_id != INVALID_OBJECT_ID {
1798                total_layer_size += layer_size_from_encrypted_mutations_size(
1799                    parent_store.get_file_size(info.encrypted_mutations_object_id).await?,
1800                );
1801            }
1802            last_object_id = LastObjectId::Pending;
1803            ensure!(
1804                matches!(
1805                    info.last_object_id,
1806                    LastObjectIdInfo::Encrypted { .. } | LastObjectIdInfo::Low32Bit { .. }
1807                ),
1808                FxfsError::Inconsistent
1809            );
1810        } else {
1811            last_object_id = match info.last_object_id {
1812                LastObjectIdInfo::Unencrypted { id } => LastObjectId::Unencrypted { id },
1813                LastObjectIdInfo::Low32Bit => {
1814                    LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() }
1815                }
1816                _ => bail!(FxfsError::Inconsistent),
1817            };
1818        }
1819
1820        let fs = parent_store.filesystem();
1821
1822        let store = ObjectStore::new(
1823            Some(parent_store.clone()),
1824            store_object_id,
1825            fs.clone(),
1826            if is_encrypted { None } else { Some(info) },
1827            object_cache,
1828            None,
1829            if is_encrypted { LockState::Locked } else { LockState::Unencrypted },
1830            last_object_id,
1831        );
1832
1833        assert!(store.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");
1834
1835        if !is_encrypted {
1836            let object_tree_layer_object_ids =
1837                store.store_info.lock().as_ref().unwrap().layers.clone();
1838            let object_layers = store.open_layers(object_tree_layer_object_ids, None).await?;
1839            total_layer_size = object_layers.iter().map(|h| h.get_size()).sum();
1840            store
1841                .tree
1842                .append_layers(object_layers)
1843                .await
1844                .context("Failed to read object store layers")?;
1845        }
1846
1847        fs.object_manager().update_reservation(
1848            store_object_id,
1849            tree::reservation_amount_from_layer_size(total_layer_size),
1850        );
1851
1852        Ok(store)
1853    }
1854
1855    async fn load_store_info(&self) -> Result<StoreInfo, Error> {
1856        load_store_info_from_handle(self.store_info_handle.get().unwrap()).await
1857    }
1858
1859    async fn open_layers(
1860        &self,
1861        object_ids: impl std::iter::IntoIterator<Item = u64>,
1862        crypt: Option<Arc<dyn Crypt>>,
1863    ) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
1864        let parent_store = self.parent_store.as_ref().unwrap();
1865        let mut handles = Vec::new();
1866        for object_id in object_ids {
1867            let handle = ObjectStore::open_object(
1868                &parent_store,
1869                object_id,
1870                HandleOptions::default(),
1871                crypt.clone(),
1872            )
1873            .await
1874            .with_context(|| format!("Failed to open layer file {}", object_id))?;
1875            handles.push(handle);
1876        }
1877        Ok(handles)
1878    }
1879
1880    /// Unlocks a store so that it is ready to be used.
1881    /// This is not thread-safe.
1882    pub async fn unlock(
1883        self: &Arc<Self>,
1884        owner: Weak<dyn StoreOwner>,
1885        crypt: Arc<dyn Crypt>,
1886    ) -> Result<(), Error> {
1887        self.unlock_inner(owner, crypt, /*read_only=*/ false).await
1888    }
1889
1890    /// Unlocks a store so that it is ready to be read from.
1891    /// The store will generally behave like it is still locked: when flushed, the store will
1892    /// write out its mutations into the encrypted mutations file, rather than directly updating
1893    /// the layer files of the object store.
1894    /// Re-locking the store (which *must* be done with `Self::lock_read_only`) will not trigger
1895    /// a flush, although the store might still be flushed during other operations.
1896    /// This is not thread-safe.
1897    pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
1898        self.unlock_inner(NO_OWNER, crypt, /*read_only=*/ true).await
1899    }
1900
1901    async fn unlock_inner(
1902        self: &Arc<Self>,
1903        owner: Weak<dyn StoreOwner>,
1904        crypt: Arc<dyn Crypt>,
1905        read_only: bool,
1906    ) -> Result<(), Error> {
1907        // Unless we are unlocking the store as read-only, the filesystem must not be read-only.
1908        assert!(read_only || !self.filesystem().options().read_only);
1909        match &*self.lock_state.lock() {
1910            LockState::Locked => {}
1911            LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
1912            LockState::Invalid | LockState::Deleted => bail!(FxfsError::Internal),
1913            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) => {
1914                bail!(FxfsError::AlreadyBound)
1915            }
1916            LockState::Unknown => panic!("Store was unlocked before replay"),
1917            LockState::Locking => panic!("Store is being locked"),
1918            LockState::Unlocking => panic!("Store is being unlocked"),
1919        }
1920        // We must lock flushing since that can modify store_info and the encrypted mutations file.
1921        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1922        let fs = self.filesystem();
1923        let guard = fs.lock_manager().write_lock(keys).await;
1924
1925        let store_info = self.load_store_info().await?;
1926
1927        self.tree
1928            .append_layers(
1929                self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
1930            )
1931            .await
1932            .context("Failed to read object tree layer file contents")?;
1933
1934        let wrapped_key =
1935            fxfs_crypto::WrappedKey::Fxfs(store_info.mutations_key.clone().unwrap().into());
1936        let unwrapped_key = crypt
1937            .unwrap_key(&wrapped_key, self.store_object_id)
1938            .await
1939            .context("Failed to unwrap mutations keys")?;
1940        // The ChaCha20 stream cipher we use supports up to 64 GiB.  By default we'll roll the key
1941        // after every 128 MiB.  Here we just need to pick a number that won't cause issues if it
1942        // wraps, so we just use u32::MAX (the offset is u64).
1943        ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
1944        let mut mutations_cipher =
1945            StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);
1946
1947        match &store_info.last_object_id {
1948            LastObjectIdInfo::Encrypted { id, key } => {
1949                let wrapped_key = fxfs_crypto::WrappedKey::Fxfs(key.clone().into());
1950                *self.last_object_id.lock() = LastObjectId::Encrypted {
1951                    id: *id,
1952                    cipher: Box::new(Ff1::new(
1953                        &crypt.unwrap_key(&wrapped_key, self.store_object_id).await?,
1954                    )),
1955                };
1956            }
1957            LastObjectIdInfo::Low32Bit => {
1958                *self.last_object_id.lock() = LastObjectId::Low32Bit {
1959                    reserved: Default::default(),
1960                    unreserved: Default::default(),
1961                }
1962            }
1963            _ => unreachable!(),
1964        }
1965
1966        // Apply the encrypted mutations.
1967        let mut mutations = {
1968            if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
1969                EncryptedMutations::default()
1970            } else {
1971                let parent_store = self.parent_store.as_ref().unwrap();
1972                let handle = ObjectStore::open_object(
1973                    &parent_store,
1974                    store_info.encrypted_mutations_object_id,
1975                    HandleOptions::default(),
1976                    None,
1977                )
1978                .await?;
1979                let mut cursor = std::io::Cursor::new(
1980                    handle
1981                        .contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
1982                        .await
1983                        .context(FxfsError::Inconsistent)?,
1984                );
1985                let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
1986                    .context("Failed to deserialize EncryptedMutations")?
1987                    .0;
1988                let len = cursor.get_ref().len() as u64;
1989                while cursor.position() < len {
1990                    mutations.extend(
1991                        &EncryptedMutations::deserialize_with_version(&mut cursor)
1992                            .context("Failed to deserialize EncryptedMutations")?
1993                            .0,
1994                    );
1995                }
1996                mutations
1997            }
1998        };
1999
2000        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
2001        let journaled = EncryptedMutations::from_replayed_mutations(
2002            self.store_object_id,
2003            fs.journal()
2004                .read_transactions_for_object(self.store_object_id)
2005                .await
2006                .context("Failed to read encrypted mutations from journal")?,
2007        );
2008        mutations.extend(&journaled);
2009
2010        let _ = std::mem::replace(&mut *self.lock_state.lock(), LockState::Unlocking);
2011        *self.store_info.lock() = Some(store_info);
2012
2013        // If we fail, clean up.
2014        let clean_up = scopeguard::guard((), |_| {
2015            *self.lock_state.lock() = LockState::Locked;
2016            *self.store_info.lock() = None;
2017            // Make sure we don't leave unencrypted data lying around in memory.
2018            self.tree.reset();
2019        });
2020
2021        let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;
2022
2023        let mut slice = &mut data[..];
2024        let mut last_offset = 0;
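        // `mutations_key_roll` records the offsets at which the mutations key was rolled: each
        // span is decrypted with the cipher that was current when it was written, switching keys
        // at every roll point.  The final span is decrypted after the loop.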
2025        for (offset, key) in mutations_key_roll {
2026            let split_offset = offset
2027                .checked_sub(last_offset)
2028                .ok_or(FxfsError::Inconsistent)
2029                .context("Invalid mutation key roll offset")?;
2030            last_offset = offset;
2031            ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
2032            let (old, new) = slice.split_at_mut(split_offset);
2033            mutations_cipher.decrypt(old);
2034            let unwrapped_key = crypt
2035                .unwrap_key(&fxfs_crypto::WrappedKey::Fxfs(key.into()), self.store_object_id)
2036                .await
2037                .context("Failed to unwrap mutations keys")?;
2038            mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
2039            slice = new;
2040        }
2041        mutations_cipher.decrypt(slice);
2042
2043        // Always roll the mutations key when we unlock, which guarantees we won't reuse a
2044        // previous key and nonce.
2045        self.roll_mutations_key(crypt.as_ref()).await?;
2046
2047        let mut cursor = std::io::Cursor::new(data);
2048        for (checkpoint, count) in transactions {
2049            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
2050            for _ in 0..count {
2051                let mutation =
2052                    Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
2053                        .context("failed to deserialize encrypted mutation")?;
2054                self.apply_mutation(mutation, &context, AssocObj::None)
2055                    .context("failed to apply encrypted mutation")?;
2056            }
2057        }
2058
2059        *self.lock_state.lock() = if read_only {
2060            LockState::UnlockedReadOnly(crypt)
2061        } else {
2062            LockState::Unlocked { owner, crypt }
2063        };
2064
2065        // To avoid unbounded memory growth, we should flush the encrypted mutations now.
2066        // Otherwise, more writes could be queued and the store locked again before we can
2067        // flush anything, and that can repeat.
2068        std::mem::drop(guard);
2069
2070        if !read_only && !self.filesystem().options().read_only {
2071            self.flush_with_reason(flush::Reason::Unlock).await?;
2072
2073            // Reap purged files within this store.
2074            let _ = self.filesystem().graveyard().initial_reap(&self).await?;
2075        }
2076
2077        // Return and cancel the clean up.
2078        Ok(ScopeGuard::into_inner(clean_up))
2079    }
2080
2081    pub fn is_locked(&self) -> bool {
2082        matches!(
2083            *self.lock_state.lock(),
2084            LockState::Locked | LockState::Locking | LockState::Unknown
2085        )
2086    }
2087
2088    /// NB: This is not the converse of `is_locked`, as there are lock states where neither is
2089    /// true.
2090    pub fn is_unlocked(&self) -> bool {
2091        matches!(
2092            *self.lock_state.lock(),
2093            LockState::Unlocked { .. } | LockState::UnlockedReadOnly { .. } | LockState::Unlocking
2094        )
2095    }
2096
2097    pub fn is_unknown(&self) -> bool {
2098        matches!(*self.lock_state.lock(), LockState::Unknown)
2099    }
2100
2101    pub fn is_encrypted(&self) -> bool {
2102        self.store_info.lock().as_ref().unwrap().mutations_key.is_some()
2103    }
2104
2105    // Locks a store.
2106    // This operation will take a flush lock on the store, in case any flushes are ongoing.  Any
2107    // ongoing store accesses might be interrupted by this.  See `Self::crypt`.
2108    // Whilst this can return an error, the store will be placed into an unusable but safe state
2109    // (i.e. no lingering unencrypted data) if an error is encountered.
2110    pub async fn lock(&self) -> Result<(), Error> {
2111        // We must lock flushing since it is not safe for that to be happening whilst we are locking
2112        // the store.
2113        let keys = lock_keys![LockKey::flush(self.store_object_id())];
2114        let fs = self.filesystem();
2115        let _guard = fs.lock_manager().write_lock(keys).await;
2116
2117        {
2118            let mut lock_state = self.lock_state.lock();
2119            if let LockState::Unlocked { .. } = &*lock_state {
2120                *lock_state = LockState::Locking;
2121            } else {
2122                panic!("Unexpected lock state: {:?}", &*lock_state);
2123            }
2124        }
2125
2126        // Sync the journal now to ensure that any buffered mutations for this store make it out to
2127        // disk.  This is necessary to be able to unlock the store again.
2128        // We need to establish a barrier at this point (so that the journaled writes are observable
2129        // by any future attempts to unlock the store), hence the flush_device.
2130        let sync_result =
2131            self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;
2132
2133        *self.lock_state.lock() = if let Err(error) = &sync_result {
2134            error!(error:?; "Failed to sync journal; store will no longer be usable");
2135            LockState::Invalid
2136        } else {
2137            LockState::Locked
2138        };
2139        self.key_manager.clear();
2140        *self.store_info.lock() = None;
2141        self.tree.reset();
2142
2143        sync_result
2144    }
2145
2146    // Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`).  Data
2147    // is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
2148    // and will be replayed next time the store is unlocked.
2149    pub fn lock_read_only(&self) {
2150        *self.lock_state.lock() = LockState::Locked;
2151        *self.store_info.lock() = None;
2152        self.tree.reset();
2153    }
2154
2155    // Returns None if the object ID cipher needs to be created or rolled, or a more expensive
2156    // algorithm needs to be used.
2157    fn maybe_get_next_object_id(&self) -> Option<ReservedId<'_>> {
2158        self.last_object_id.lock().try_get_next().map(|id| ReservedId::new(self, id))
2159    }
2160
2161    /// Returns a new object ID that can be used.  This will create an object ID cipher if needed.
2162    ///
2163    /// If the object ID key needs to be rolled, a new transaction will be created and committed.
2164    /// This transaction does not take the filesystem lock, hence `txn_guard`.
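    ///
    /// A sketch of the typical call pattern (illustrative; assumes an open `transaction`):
    ///
    /// ```ignore
    /// let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
    /// ```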
2165    pub(super) async fn get_next_object_id(
2166        &self,
2167        txn_guard: &TxnGuard<'_>,
2168    ) -> Result<ReservedId<'_>, Error> {
2169        {
2170            let mut last_object_id = self.last_object_id.lock();
2171            if let Some(id) = last_object_id.try_get_next() {
2172                return Ok(ReservedId::new(self, id));
2173            }
2174            ensure!(
2175                !matches!(&*last_object_id, LastObjectId::Unencrypted { .. }),
2176                FxfsError::Inconsistent
2177            );
2178        }
2179
2180        let parent_store = self.parent_store().unwrap();
2181
2182        // Create a transaction (which has a lock) and then check again.
2183        //
2184        // NOTE: Since this can be a nested transaction, we must take care to avoid deadlocks; no
2185        // more locks should be taken whilst we hold this lock.
2186        let mut transaction = self
2187            .filesystem()
2188            .new_transaction(
2189                lock_keys![LockKey::object(parent_store.store_object_id, self.store_object_id)],
2190                Options {
2191                    // We must skip journal checks because this transaction might be needed to
2192                    // compact.
2193                    skip_journal_checks: true,
2194                    borrow_metadata_space: true,
2195                    txn_guard: Some(txn_guard),
2196                    ..Default::default()
2197                },
2198            )
2199            .await?;
2200
2201        let mut next_id_hi = 0;
2202
2203        let is_low_32_bit = {
2204            let mut last_object_id = self.last_object_id.lock();
2205            if let Some(id) = last_object_id.try_get_next() {
2206                // Something else raced and created/rolled the cipher.
2207                return Ok(ReservedId::new(self, id));
2208            }
2209
2210            match &*last_object_id {
2211                LastObjectId::Encrypted { id, .. } => {
2212                    // It shouldn't be possible for last_object_id to wrap within our lifetime, so
2213                    // if this happens, it's most likely due to corruption.
2214                    next_id_hi =
2215                        id.checked_add(1 << 32).ok_or(FxfsError::Inconsistent)? & OBJECT_ID_HI_MASK;
2216
2217                    info!(store_id = self.store_object_id; "Rolling object ID key");
2218
2219                    false
2220                }
2221                LastObjectId::Low32Bit { .. } => true,
2222                _ => unreachable!(),
2223            }
2224        };
2225
2226        if is_low_32_bit {
2227            // Keep picking an object ID at random until we find a free one.
2228
2229            // To avoid races, this must be before we capture the layer set.
2230            self.last_object_id.lock().drain_unreserved();
2231
2232            let layer_set = self.tree.layer_set();
2233            let mut key = ObjectKey::object(0);
2234            loop {
2235                let next_id = rand::rng().next_u32() as u64;
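                // `ReservedId` holds a `NonZero`, so zero can never be used as an object ID;
                // skip it and try again.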
2236                let Some(next_id) = NonZero::new(next_id) else { continue };
2237                if self.last_object_id.lock().is_reserved(next_id.get()) {
2238                    continue;
2239                }
2240                key.object_id = next_id.get();
2241                if layer_set.key_exists(&key).await? == Existence::Missing {
2242                    self.last_object_id.lock().reserve(next_id.get());
2243                    return Ok(ReservedId::new(self, next_id));
2244                }
2245            }
2246        } else {
2247            // Create a key.
2248            let (object_id_wrapped, object_id_unwrapped) = self
2249                .crypt()
2250                .unwrap()
2251                .create_key(self.store_object_id, KeyPurpose::Metadata)
2252                .await?;
2253
2254            // Normally we would use a mutation to note the updated key, but that would complicate
2255            // replay.  During replay, we need to keep track of the highest used object ID and this
2256            // is done by watching mutations to see when we create objects, and then decrypting
2257            // the object ID.  This relies on the unwrapped key being available, so as soon as
2258            // we detect the key has changed, we would need to immediately unwrap the key via the
2259            // crypt service.  Currently, this isn't easy to do during replay.  An option we could
2260            // consider would be to include the unencrypted object ID when we create objects, which
2261            // would avoid us having to decrypt the object ID during replay.
2262            //
2263            // For now and for historical reasons, the approach we take is to just write a new
2264            // version of StoreInfo here.  We must take care that we only update the key and not any
2265            // other information contained within StoreInfo because other information should only be
2266            // updated when we flush.  We are holding the lock on the StoreInfo file, so this will
2267            // prevent potential races with flushing.  To make sure we only change the key, we read
2268            // StoreInfo from storage rather than using our in-memory copy.  This won't be
2269            // performant, but rolling the object ID key will be extremely rare.
2270            let new_store_info = StoreInfo {
2271                last_object_id: LastObjectIdInfo::Encrypted {
2272                    id: next_id_hi,
2273                    key: object_id_wrapped.clone(),
2274                },
2275                ..self.load_store_info().await?
2276            };
2277
2278            self.write_store_info(&mut transaction, &new_store_info).await?;
2279
2280            transaction
2281                .commit_with_callback(|_| {
2282                    self.store_info.lock().as_mut().unwrap().last_object_id =
2283                        new_store_info.last_object_id;
2284                    match &mut *self.last_object_id.lock() {
2285                        LastObjectId::Encrypted { id, cipher } => {
2286                            **cipher = Ff1::new(&object_id_unwrapped);
2287                            *id = next_id_hi;
2288                            ReservedId::new(
2289                                self,
2290                                NonZero::new(next_id_hi | cipher.encrypt(0) as u64).unwrap(),
2291                            )
2292                        }
2293                        _ => unreachable!(),
2294                    }
2295                })
2296                .await
2297        }
2298    }
2299
2300    /// Query the next object ID that will be used. Intended for use when checking filesystem
2301    /// consistency. Prefer [`Self::get_next_object_id()`] for general use.
2302    pub(crate) fn query_next_object_id(&self) -> u64 {
2303        self.last_object_id.lock().peek_next()
2304    }
2305
2306    fn allocator(&self) -> Arc<Allocator> {
2307        self.filesystem().allocator()
2308    }
2309
2310    // If |transaction| has a pending mutation for the underlying object, returns that.
2311    // Otherwise, looks up the object from the tree and returns a suitable mutation for it.  The
2312    // mutation is returned here rather than the item because the mutation includes the
2313    // operation, which is significant: inserting an object implies it's the first of its kind,
2314    // unlike replacing an object.
2315    async fn txn_get_object_mutation(
2316        &self,
2317        transaction: &Transaction<'_>,
2318        object_id: u64,
2319    ) -> Result<ObjectStoreMutation, Error> {
2320        if let Some(mutation) =
2321            transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
2322        {
2323            Ok(mutation.clone())
2324        } else {
2325            Ok(ObjectStoreMutation {
2326                item: self
2327                    .tree
2328                    .find(&ObjectKey::object(object_id))
2329                    .await?
2330                    .ok_or(FxfsError::Inconsistent)
2331                    .context("Object id missing")?,
2332                op: Operation::ReplaceOrInsert,
2333            })
2334        }
2335    }
2336
2337    /// Like txn_get_object_mutation but with expanded visibility.
2338    /// Only available in migration code.
2339    #[cfg(feature = "migration")]
2340    pub async fn get_object_mutation(
2341        &self,
2342        transaction: &Transaction<'_>,
2343        object_id: u64,
2344    ) -> Result<ObjectStoreMutation, Error> {
2345        self.txn_get_object_mutation(transaction, object_id).await
2346    }
2347
2348    fn update_last_object_id(&self, object_id: u64) {
2349        let mut last_object_id = self.last_object_id.lock();
2350        match &mut *last_object_id {
2351            LastObjectId::Pending => unreachable!(),
2352            LastObjectId::Unencrypted { id } => {
2353                if object_id > *id {
2354                    *id = object_id
2355                }
2356            }
2357            LastObjectId::Encrypted { id, cipher } => {
2358                // For encrypted stores, object_id will be encrypted here, so we must decrypt first.
2359
2360                // If the object ID cipher has been rolled, then it's possible we might see object
2361                // IDs that were generated using a different cipher so the decrypt here will return
2362                // the wrong value, but that won't matter because the hi part of the object ID
2363                // should still discriminate.
2364                let object_id =
2365                    object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
2366                if object_id > *id {
2367                    *id = object_id;
2368                }
2369            }
2370            LastObjectId::Low32Bit { .. } => {}
2371        }
2372    }
2373
2374    /// Adds the specified object to the graveyard.
2375    pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2376        let graveyard_id = self.graveyard_directory_object_id();
2377        assert_ne!(graveyard_id, INVALID_OBJECT_ID);
2378        transaction.add(
2379            self.store_object_id,
2380            Mutation::replace_or_insert_object(
2381                ObjectKey::graveyard_entry(graveyard_id, object_id),
2382                ObjectValue::Some,
2383            ),
2384        );
2385    }
2386
2387    /// Removes the specified object from the graveyard.  NB: Care should be taken when calling
2388    /// this because graveyard entries are used for purging deleted files *and* for trimming
2389    /// extents.  For example, consider the following sequence:
2390    ///
2391    ///     1. Add Trim graveyard entry.
2392    ///     2. Replace with Some graveyard entry (see above).
2393    ///     3. Remove graveyard entry.
2394    ///
2395    /// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
2396    /// actually be:
2397    ///
2398    ///     3. Replace with Trim graveyard entry.
2399    pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2400        transaction.add(
2401            self.store_object_id,
2402            Mutation::replace_or_insert_object(
2403                ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
2404                ObjectValue::None,
2405            ),
2406        );
2407    }
2408
2409    /// Removes the specified attribute from the graveyard. Unlike object graveyard entries,
2410    /// attribute graveyard entries serve only one purpose (purging deleted attributes), so the
2411    /// caller does not need to be concerned about replacing the graveyard attribute entry
2412    /// with its prior state when cancelling it. See comment on `remove_from_graveyard()`.
2413    pub fn remove_attribute_from_graveyard(
2414        &self,
2415        transaction: &mut Transaction<'_>,
2416        object_id: u64,
2417        attribute_id: u64,
2418    ) {
2419        transaction.add(
2420            self.store_object_id,
2421            Mutation::replace_or_insert_object(
2422                ObjectKey::graveyard_attribute_entry(
2423                    self.graveyard_directory_object_id(),
2424                    object_id,
2425                    attribute_id,
2426                ),
2427                ObjectValue::None,
2428            ),
2429        );
2430    }
2431
2432    // Roll the mutations key.  The new key will be written for the next encrypted mutation.
2433    async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
2434        let (wrapped_key, unwrapped_key) =
2435            crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
2436
2437        // The mutations_cipher lock must be held for the duration so that mutations_cipher and
2438        // store_info are updated atomically.  Otherwise, write_mutation could find a new cipher but
2439        // end up writing the wrong wrapped key.
2440        let mut cipher = self.mutations_cipher.lock();
2441        *cipher = Some(StreamCipher::new(&unwrapped_key, 0));
2442        self.store_info.lock().as_mut().unwrap().mutations_key = Some(wrapped_key);
2443        // mutations_cipher_offset is updated by flush.
2444        Ok(())
2445    }
2446
2447    // When the symlink is unlocked, this function decrypts `link` and returns a bag of bytes that
2448    // is identical to that which was passed in as the target on `create_symlink`.
2449    // If the symlink is locked, this function hashes the encrypted `link` with Sha256 in order to
2450    // get a standard length and then base64 encodes the hash and returns that to the caller.
2451    pub async fn read_encrypted_symlink(
2452        &self,
2453        object_id: u64,
2454        link: Vec<u8>,
2455    ) -> Result<Vec<u8>, Error> {
2456        let mut link = link;
2457        let key = self
2458            .key_manager()
2459            .get_fscrypt_key(object_id, self.crypt().unwrap().as_ref(), async || {
2460                self.get_keys(object_id).await
2461            })
2462            .await?;
2463        if let Some(key) = key.into_cipher() {
2464            key.decrypt_symlink(object_id, &mut link)?;
2465            Ok(link)
2466        } else {
2467            // Locked symlinks are encoded using a hash_code of 0.
2468            let proxy_filename =
2469                fscrypt::proxy_filename::ProxyFilename::new_with_hash_code(0, &link);
2470            let proxy_filename_str: String = proxy_filename.into();
2471            Ok(proxy_filename_str.as_bytes().to_vec())
2472        }
2473    }
2474
2475    /// Returns the link of a symlink object.
2476    pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
2477        match self.tree.find(&ObjectKey::object(object_id)).await? {
2478            None => bail!(FxfsError::NotFound),
2479            Some(Item {
2480                value: ObjectValue::Object { kind: ObjectKind::EncryptedSymlink { link, .. }, .. },
2481                ..
2482            }) => self.read_encrypted_symlink(object_id, link.to_vec()).await,
2483            Some(Item {
2484                value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
2485                ..
2486            }) => Ok(link.to_vec()),
2487            Some(item) => Err(anyhow!(FxfsError::Inconsistent)
2488                .context(format!("Unexpected item in lookup: {item:?}"))),
2489        }
2490    }
2491
2492    /// Retrieves the wrapped keys for the given object.  The keys *should* be known to exist,
2493    /// and it will be considered an inconsistency if they don't.
2494    pub async fn get_keys(&self, object_id: u64) -> Result<EncryptionKeys, Error> {
2495        match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::Inconsistent)? {
2496            Item { value: ObjectValue::Keys(keys), .. } => Ok(keys),
2497            _ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
2498        }
2499    }
2500
2501    pub async fn update_attributes<'a>(
2502        &self,
2503        transaction: &mut Transaction<'a>,
2504        object_id: u64,
2505        node_attributes: Option<&fio::MutableNodeAttributes>,
2506        change_time: Option<Timestamp>,
2507    ) -> Result<(), Error> {
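        // There's nothing to do if no change_time was supplied and node_attributes is absent or
        // empty.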
2508        if change_time.is_none() {
2509            if let Some(attributes) = node_attributes {
2510                let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
2511                if *attributes == empty_attributes {
2512                    return Ok(());
2513                }
2514            } else {
2515                return Ok(());
2516            }
2517        }
2518        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
2519        if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
2520            if let Some(time) = change_time {
2521                attributes.change_time = time;
2522            }
2523            if let Some(node_attributes) = node_attributes {
2524                if let Some(time) = node_attributes.creation_time {
2525                    attributes.creation_time = Timestamp::from_nanos(time);
2526                }
2527                if let Some(time) = node_attributes.modification_time {
2528                    attributes.modification_time = Timestamp::from_nanos(time);
2529                }
2530                if let Some(time) = node_attributes.access_time {
2531                    attributes.access_time = Timestamp::from_nanos(time);
2532                }
2533                if node_attributes.mode.is_some()
2534                    || node_attributes.uid.is_some()
2535                    || node_attributes.gid.is_some()
2536                    || node_attributes.rdev.is_some()
2537                {
2538                    if let Some(a) = &mut attributes.posix_attributes {
2539                        if let Some(mode) = node_attributes.mode {
2540                            a.mode = mode;
2541                        }
2542                        if let Some(uid) = node_attributes.uid {
2543                            a.uid = uid;
2544                        }
2545                        if let Some(gid) = node_attributes.gid {
2546                            a.gid = gid;
2547                        }
2548                        if let Some(rdev) = node_attributes.rdev {
2549                            a.rdev = rdev;
2550                        }
2551                    } else {
2552                        attributes.posix_attributes = Some(PosixAttributes {
2553                            mode: node_attributes.mode.unwrap_or_default(),
2554                            uid: node_attributes.uid.unwrap_or_default(),
2555                            gid: node_attributes.gid.unwrap_or_default(),
2556                            rdev: node_attributes.rdev.unwrap_or_default(),
2557                        });
2558                    }
2559                }
2560            }
2561        } else {
2562            bail!(
2563                anyhow!(FxfsError::Inconsistent)
2564                    .context("ObjectStore.update_attributes: Expected object value")
2565            );
2566        };
2567        transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
2568        Ok(())
2569    }
2570
2571    // Updates and commits the changes to access time in ObjectProperties. The update matches
2572    // Linux's RELATIME. That is, access time is updated to the current time if access time is less
2573    // than or equal to the last modification or status change, or if it has been more than a day
2574    // since the last access.
2575    pub async fn update_access_time(
2576        &self,
2577        object_id: u64,
2578        props: &mut ObjectProperties,
2579    ) -> Result<(), Error> {
2580        let access_time = props.access_time.as_nanos();
2581        let modification_time = props.modification_time.as_nanos();
2582        let change_time = props.change_time.as_nanos();
2583        let now = Timestamp::now();
2584        if access_time <= modification_time
2585            || access_time <= change_time
2586            || access_time
2587                < now.as_nanos()
2588                    - Timestamp::from(std::time::Duration::from_secs(24 * 60 * 60)).as_nanos()
2589        {
2590            let mut transaction = self
2591                .filesystem()
2592                .clone()
2593                .new_transaction(
2594                    lock_keys![LockKey::object(self.store_object_id, object_id,)],
2595                    Options { borrow_metadata_space: true, ..Default::default() },
2596                )
2597                .await?;
2598            self.update_attributes(
2599                &mut transaction,
2600                object_id,
2601                Some(&fio::MutableNodeAttributes {
2602                    access_time: Some(now.as_nanos()),
2603                    ..Default::default()
2604                }),
2605                None,
2606            )
2607            .await?;
2608            transaction.commit().await?;
2609            props.access_time = now;
2610        }
2611        Ok(())
2612    }
2613
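    /// Serializes `info` (versioned) and writes it at offset 0 of the store's StoreInfo handle
    /// as part of `transaction`.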
2614    async fn write_store_info<'a>(
2615        &'a self,
2616        transaction: &mut Transaction<'a>,
2617        info: &StoreInfo,
2618    ) -> Result<(), Error> {
2619        let mut serialized_info = Vec::new();
2620        info.serialize_with_version(&mut serialized_info)?;
2621        let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
2622        buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
2623        self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
2624    }
2625
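    /// Marks this store as deleted by setting its lock state to `LockState::Deleted`.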
2626    pub fn mark_deleted(&self) {
2627        *self.lock_state.lock() = LockState::Deleted;
2628    }
2629}
2630
2631#[async_trait]
2632impl JournalingObject for ObjectStore {
2633    fn apply_mutation(
2634        &self,
2635        mutation: Mutation,
2636        context: &ApplyContext<'_, '_>,
2637        _assoc_obj: AssocObj<'_>,
2638    ) -> Result<(), Error> {
2639        match &*self.lock_state.lock() {
2640            LockState::Locked | LockState::Locking => {
2641                ensure!(
2642                    matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
2643                        || matches!(
2644                            mutation,
2645                            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_)
2646                                if context.mode.is_replay()
2647                        ),
2648                    anyhow!(FxfsError::Inconsistent)
2649                        .context(format!("Unexpected mutation for encrypted store: {mutation:?}"))
2650                );
2651            }
2652            LockState::Invalid
2653            | LockState::Unlocking
2654            | LockState::Unencrypted
2655            | LockState::Unlocked { .. }
2656            | LockState::UnlockedReadOnly(..)
2657            | LockState::Deleted => {}
2658            lock_state => panic!("Unexpected lock state: {lock_state:?}"),
2659        }
2660        match mutation {
2661            Mutation::ObjectStore(ObjectStoreMutation { mut item, op }) => {
2662                item.sequence = context.checkpoint.file_offset;
2663                match op {
2664                    Operation::Insert => {
2665                        let mut unreserve_id = INVALID_OBJECT_ID;
2666                        // If we are inserting an object record for the first time, it marks
2667                        // the birth of the object, so we need to adjust the object count.
2668                        if matches!(item.value, ObjectValue::Object { .. }) {
2669                            {
2670                                let info = &mut self.store_info.lock();
2671                                let object_count = &mut info.as_mut().unwrap().object_count;
2672                                *object_count = object_count.saturating_add(1);
2673                            }
2674                            if context.mode.is_replay() {
2675                                self.update_last_object_id(item.key.object_id);
2676                            } else {
2677                                unreserve_id = item.key.object_id;
2678                            }
2679                        }
2680                        self.tree.insert(item)?;
2681                        if unreserve_id != INVALID_OBJECT_ID {
2682                            // To avoid races, this *must* be after the `tree.insert(..)` above.
2683                            self.last_object_id.lock().unreserve(unreserve_id);
2684                        }
2685                    }
2686                    Operation::ReplaceOrInsert => {
2687                        self.tree.replace_or_insert(item);
2688                    }
2689                    Operation::Merge => {
2690                        if item.is_tombstone() {
2691                            let info = &mut self.store_info.lock();
2692                            let object_count = &mut info.as_mut().unwrap().object_count;
2693                            *object_count = object_count.saturating_sub(1);
2694                        }
2695                        let lower_bound = item.key.key_for_merge_into();
2696                        self.tree.merge_into(item, &lower_bound);
2697                    }
2698                }
2699            }
2700            Mutation::BeginFlush => {
2701                ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
2702                self.tree.seal();
2703            }
2704            Mutation::EndFlush => ensure!(self.parent_store.is_some(), FxfsError::Inconsistent),
2705            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2706                // We will process these during Self::unlock.
2707                ensure!(
2708                    !matches!(&*self.lock_state.lock(), LockState::Unencrypted),
2709                    FxfsError::Inconsistent
2710                );
2711            }
2712            Mutation::CreateInternalDir(object_id) => {
2713                ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
2714                self.store_info.lock().as_mut().unwrap().internal_directory_object_id = object_id;
2715            }
2716            _ => bail!("unexpected mutation: {mutation:?}"),
2717        }
2718        self.counters.lock().mutations_applied += 1;
2719        Ok(())
2720    }
2721
2722    fn drop_mutation(&self, mutation: Mutation, _transaction: &Transaction<'_>) {
2723        self.counters.lock().mutations_dropped += 1;
2724        if let Mutation::ObjectStore(ObjectStoreMutation {
2725            item: Item { key: ObjectKey { object_id, .. }, value: ObjectValue::Object { .. }, .. },
2726            op: Operation::Insert,
2727        }) = mutation
2728        {
2729            self.last_object_id.lock().unreserve(object_id);
2730        }
2731    }
2732
2733    /// Push all in-memory structures to the device. This is not necessary for sync since the
2734    /// journal will take care of it.  This should be called when there is either memory or
2735    /// space pressure (flushing the store will persist in-memory data and allow the journal
2736    /// file to be trimmed).
2737    ///
2738    /// Also returns the earliest version of a struct in the filesystem (when known).
2739    async fn flush(&self) -> Result<Version, Error> {
2740        self.flush_with_reason(flush::Reason::Journal).await
2741    }
2742
2743    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
2744        // Intentionally enumerating all variants to force a decision on any new variants.
2745        // Encrypt all mutations that could affect an encrypted object store's contents or its
2746        // `StoreInfo`. During `unlock()`, any mutations which haven't been encrypted won't be
2747        // replayed after reading `StoreInfo`.
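        // For an encrypted store this means each such mutation lands in the journal as an
        // `EncryptedObjectStore(..)` record, preceded by an `UpdateMutationsKey(..)` record the
        // first time a given mutations key is used.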
2748        match mutation {
2749            // Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted,
2750            // we still choose to encrypt the mutation because it makes replay easier to deal
2751            // with. When we replay mutations for an encrypted store, the only things we keep in
2752            // memory are the encrypted mutations; we don't keep `StoreInfo` or changes to it in
2753            // memory. So, by encrypting the CreateInternalDir mutation here, we don't have to
2754            // track both encrypted mutations bound for the LSM tree and unencrypted mutations
2755            // for `StoreInfo` for use in `unlock()`. We just bundle CreateInternalDir mutations
2756            // with the other encrypted mutations and handle them all in sequence during unlock.
2757            Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
2758                let mut cipher = self.mutations_cipher.lock();
2759                if let Some(cipher) = cipher.as_mut() {
2760                    // If this is the first time we've used this key, we must write the key out.
2761                    if cipher.offset() == 0 {
2762                        writer.write(Mutation::update_mutations_key(
2763                            self.store_info
2764                                .lock()
2765                                .as_ref()
2766                                .unwrap()
2767                                .mutations_key
2768                                .as_ref()
2769                                .unwrap()
2770                                .clone(),
2771                        ));
2772                    }
2773                    let mut buffer = Vec::new();
2774                    mutation.serialize_into(&mut buffer).unwrap();
2775                    cipher.encrypt(&mut buffer);
2776                    writer.write(Mutation::EncryptedObjectStore(buffer.into()));
2777                    return;
2778                }
2779            }
2780            // `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
2781            // encrypted object stores, but are either the encrypted mutation data itself or
2782            // metadata governing how the data will be encrypted. They should only be produced here.
2783            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2784                debug_assert!(false, "Only this method should generate encrypted mutations");
2785            }
2786            // `BeginFlush` and `EndFlush` are not needed during `unlock()` but are needed
2787            // during initial journal replay, so they must not be encrypted.  `Allocator`,
2788            // `DeleteVolume` and `UpdateBorrowed` mutations never touch an encrypted store (we
2789            // don't encrypt the allocator or root/root-parent stores), so we can skip the lock.
2790            Mutation::Allocator(_)
2791            | Mutation::BeginFlush
2792            | Mutation::EndFlush
2793            | Mutation::DeleteVolume
2794            | Mutation::UpdateBorrowed(_) => {}
2795        }
2796        writer.write(mutation.clone());
2797    }
2798}
2799
2800impl Drop for ObjectStore {
2801    fn drop(&mut self) {
2802        let mut last_object_id = self.last_object_id.lock();
2803        last_object_id.drain_unreserved();
2804        // Every reserved object ID should have been unreserved by now.
2805        if let LastObjectId::Low32Bit { reserved, .. } = &*last_object_id {
2806            debug_assert!(reserved.is_empty());
2807        }
2808    }
2809}
2810
2811impl HandleOwner for ObjectStore {}
2812
2813impl AsRef<ObjectStore> for ObjectStore {
2814    fn as_ref(&self) -> &ObjectStore {
2815        self
2816    }
2817}
2818
2819fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
2820    // This is similar to reserved_space_from_journal_usage. It needs to be a worst-case
2821    // estimate of the amount of metadata space that might need to be reserved to allow the
2822    // encrypted mutations to be written to layer files.  It needs to be >= what
2823    // reservation_amount_from_layer_size will return once the data has been written to layer
2824    // files, and <= what reserved_space_from_journal_usage would use.  We can't just use
2825    // reserved_space_from_journal_usage because the encrypted mutations file includes some extra
2826    // data (it includes the checkpoints) that isn't written in the same way to the journal.
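    // For example, 1 MiB of encrypted mutations reserves 3 MiB of metadata space.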
2827    size * 3
2828}
2829
2830impl AssociatedObject for ObjectStore {}
2831
2832/// Argument to the trim_some method.
2833#[derive(Debug)]
2834pub enum TrimMode {
2835    /// Trim extents beyond the current size.
2836    UseSize,
2837
2838    /// Trim extents beyond the supplied offset.
2839    FromOffset(u64),
2840
2841    /// Remove the object (or attribute) from the store once it is fully trimmed.
2842    Tombstone(TombstoneMode),
2843}
2844
2845/// Sets the mode for tombstoning (either at the object or attribute level).
2846#[derive(Debug)]
2847pub enum TombstoneMode {
2848    Object,
2849    Attribute,
2850}
2851
2852/// Result of the trim_some method.
2853#[derive(Debug)]
2854pub enum TrimResult {
2855    /// We reached the limit of the transaction and more extents might follow.
2856    Incomplete,
2857
2858    /// We finished this attribute.  Returns the ID of the next attribute for the same object if
2859    /// there is one.
2860    Done(Option<u64>),
2861}
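
// A sketch (not part of the API) of how a caller might interpret a `TrimResult`, assuming a
// hypothetical `result` returned by `trim_some`:
//
//     match result {
//         TrimResult::Incomplete => { /* commit this transaction and trim some more */ }
//         TrimResult::Done(Some(attribute_id)) => { /* continue with the next attribute */ }
//         TrimResult::Done(None) => { /* this attribute is fully trimmed */ }
//     }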
2862
2863/// Loads store info.
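///
/// A minimal usage sketch (not compiled; assumes an unlocked parent store):
/// ```ignore
/// let info = load_store_info(&parent_store, store_object_id).await?;
/// println!("objects: {}", info.object_count);
/// ```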
2864pub async fn load_store_info(
2865    parent: &Arc<ObjectStore>,
2866    store_object_id: u64,
2867) -> Result<StoreInfo, Error> {
2868    load_store_info_from_handle(
2869        &ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?,
2870    )
2871    .await
2872}
2873
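/// Reads and deserializes `StoreInfo` from `handle`, returning `StoreInfo::default()` when the
/// handle is empty (as it is for a newly created store).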
2874async fn load_store_info_from_handle(
2875    handle: &DataObjectHandle<impl HandleOwner>,
2876) -> Result<StoreInfo, Error> {
2877    Ok(if handle.get_size() > 0 {
2878        let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
2879        let mut cursor = std::io::Cursor::new(serialized_info);
2880        let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
2881            .context("Failed to deserialize StoreInfo")?;
2882        store_info
2883    } else {
2884        // The store_info will be absent for a newly created and empty object store.
2885        StoreInfo::default()
2886    })
2887}
2888
2889#[cfg(test)]
2890mod tests {
2891    use super::{
2892        DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, FsverityMetadata, HandleOptions,
2893        LastObjectId, LastObjectIdInfo, LockKey, MAX_STORE_INFO_SERIALIZED_SIZE, Mutation,
2894        NO_OWNER, NewChildStoreOptions, OBJECT_ID_HI_MASK, ObjectStore, RootDigest, StoreInfo,
2895        StoreOptions, StoreOwner,
2896    };
2897    use crate::errors::FxfsError;
2898    use crate::filesystem::{FxFilesystem, JournalingObject, OpenFxFilesystem, SyncOptions};
2899    use crate::fsck::{fsck, fsck_volume};
2900    use crate::lsm_tree::Query;
2901    use crate::lsm_tree::types::{ItemRef, LayerIterator};
2902    use crate::object_handle::{
2903        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
2904    };
2905    use crate::object_store::directory::Directory;
2906    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue};
2907    use crate::object_store::transaction::{Options, lock_keys};
2908    use crate::object_store::volume::root_volume;
2909    use crate::serialized_types::VersionedLatest;
2910    use crate::testing;
2911    use assert_matches::assert_matches;
2912    use async_trait::async_trait;
2913    use fuchsia_async as fasync;
2914    use fuchsia_sync::Mutex;
2915    use futures::join;
2916    use fxfs_crypto::ff1::Ff1;
2917    use fxfs_crypto::{
2918        Crypt, FXFS_KEY_SIZE, FXFS_WRAPPED_KEY_SIZE, FxfsKey, UnwrappedKey, WrappedKeyBytes,
2919    };
2920    use fxfs_insecure_crypto::new_insecure_crypt;
2921
2922    use std::sync::Arc;
2923    use std::time::Duration;
2924    use storage_device::DeviceHolder;
2925    use storage_device::fake_device::FakeDevice;
2926    use test_case::test_case;
2927
2928    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2929
2930    async fn test_filesystem() -> OpenFxFilesystem {
2931        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2932        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2933    }
2934
2935    #[fuchsia::test]
2936    async fn test_item_sequences() {
2937        let fs = test_filesystem().await;
2938        let object1;
2939        let object2;
2940        let object3;
2941        let mut transaction = fs
2942            .clone()
2943            .new_transaction(lock_keys![], Options::default())
2944            .await
2945            .expect("new_transaction failed");
2946        let store = fs.root_store();
2947        object1 = Arc::new(
2948            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2949                .await
2950                .expect("create_object failed"),
2951        );
2952        transaction.commit().await.expect("commit failed");
2953        let mut transaction = fs
2954            .clone()
2955            .new_transaction(lock_keys![], Options::default())
2956            .await
2957            .expect("new_transaction failed");
2958        object2 = Arc::new(
2959            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2960                .await
2961                .expect("create_object failed"),
2962        );
2963        transaction.commit().await.expect("commit failed");
2964
2965        fs.sync(SyncOptions::default()).await.expect("sync failed");
2966
2967        let mut transaction = fs
2968            .clone()
2969            .new_transaction(lock_keys![], Options::default())
2970            .await
2971            .expect("new_transaction failed");
2972        object3 = Arc::new(
2973            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2974                .await
2975                .expect("create_object failed"),
2976        );
2977        transaction.commit().await.expect("commit failed");
2978
2979        let layer_set = store.tree.layer_set();
2980        let mut merger = layer_set.merger();
2981        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
2982        let mut sequences = [0u64; 3];
2983        while let Some(ItemRef { key: ObjectKey { object_id, .. }, sequence, .. }) = iter.get() {
2984            if *object_id == object1.object_id() {
2985                sequences[0] = sequence;
2986            } else if *object_id == object2.object_id() {
2987                sequences[1] = sequence;
2988            } else if *object_id == object3.object_id() {
2989                sequences[2] = sequence;
2990            }
2991            iter.advance().await.expect("advance failed");
2992        }
2993
2994        assert!(sequences[0] <= sequences[1], "sequences: {:?}", sequences);
2995        // The last item came after a sync, so should be strictly greater.
2996        assert!(sequences[1] < sequences[2], "sequences: {:?}", sequences);
2997        fs.close().await.expect("Close failed");
2998    }
2999
3000    #[fuchsia::test]
3001    async fn test_verified_file_with_verified_attribute() {
3002        let fs: OpenFxFilesystem = test_filesystem().await;
3003        let mut transaction = fs
3004            .clone()
3005            .new_transaction(lock_keys![], Options::default())
3006            .await
3007            .expect("new_transaction failed");
3008        let store = fs.root_store();
3009        let object = Arc::new(
3010            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3011                .await
3012                .expect("create_object failed"),
3013        );
3014
3015        transaction.add(
3016            store.store_object_id(),
3017            Mutation::replace_or_insert_object(
3018                ObjectKey::attribute(
3019                    object.object_id(),
3020                    DEFAULT_DATA_ATTRIBUTE_ID,
3021                    AttributeKey::Attribute,
3022                ),
3023                ObjectValue::verified_attribute(
3024                    0,
3025                    FsverityMetadata::Internal(RootDigest::Sha256([0; 32]), vec![]),
3026                ),
3027            ),
3028        );
3029
3030        transaction.add(
3031            store.store_object_id(),
3032            Mutation::replace_or_insert_object(
3033                ObjectKey::attribute(
3034                    object.object_id(),
3035                    FSVERITY_MERKLE_ATTRIBUTE_ID,
3036                    AttributeKey::Attribute,
3037                ),
3038                ObjectValue::attribute(0, false),
3039            ),
3040        );
3041
3042        transaction.commit().await.unwrap();
3043
3044        let handle =
3045            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
3046                .await
3047                .expect("open_object failed");
3048
3049        assert!(handle.is_verified_file());
3050
3051        fs.close().await.expect("Close failed");
3052    }
3053
3054    #[fuchsia::test]
3055    async fn test_verified_file_without_verified_attribute() {
3056        let fs: OpenFxFilesystem = test_filesystem().await;
3057        let mut transaction = fs
3058            .clone()
3059            .new_transaction(lock_keys![], Options::default())
3060            .await
3061            .expect("new_transaction failed");
3062        let store = fs.root_store();
3063        let object = Arc::new(
3064            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3065                .await
3066                .expect("create_object failed"),
3067        );
3068
3069        transaction.commit().await.unwrap();
3070
3071        let handle =
3072            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
3073                .await
3074                .expect("open_object failed");
3075
3076        assert!(!handle.is_verified_file());
3077
3078        fs.close().await.expect("Close failed");
3079    }
3080
3081    #[fuchsia::test]
3082    async fn test_create_and_open_store() {
3083        let fs = test_filesystem().await;
3084        let store_id = {
3085            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3086            root_volume
3087                .new_volume(
3088                    "test",
3089                    NewChildStoreOptions {
3090                        options: StoreOptions {
3091                            owner: NO_OWNER,
3092                            crypt: Some(Arc::new(new_insecure_crypt())),
3093                        },
3094                        ..Default::default()
3095                    },
3096                )
3097                .await
3098                .expect("new_volume failed")
3099                .store_object_id()
3100        };
3101
3102        fs.close().await.expect("close failed");
3103        let device = fs.take_device().await;
3104        device.reopen(false);
3105        let fs = FxFilesystem::open(device).await.expect("open failed");
3106
3107        {
3108            let store = fs.object_manager().store(store_id).expect("store not found");
3109            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
3110        }
3111        fs.close().await.expect("Close failed");
3112    }
3113
3114    #[fuchsia::test]
3115    async fn test_create_and_open_internal_dir() {
3116        let fs = test_filesystem().await;
3117        let dir_id;
3118        let store_id;
3119        {
3120            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3121            let store = root_volume
3122                .new_volume(
3123                    "test",
3124                    NewChildStoreOptions {
3125                        options: StoreOptions {
3126                            owner: NO_OWNER,
3127                            crypt: Some(Arc::new(new_insecure_crypt())),
3128                        },
3129                        ..Default::default()
3130                    },
3131                )
3132                .await
3133                .expect("new_volume failed");
3134            dir_id =
3135                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
3136            store_id = store.store_object_id();
3137        }
3138
3139        fs.close().await.expect("close failed");
3140        let device = fs.take_device().await;
3141        device.reopen(false);
3142        let fs = FxFilesystem::open(device).await.expect("open failed");
3143
3144        {
3145            let store = fs.object_manager().store(store_id).expect("store not found");
3146            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
3147            assert_eq!(
3148                dir_id,
3149                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
3150            );
3151            let obj = store
3152                .tree()
3153                .find(&ObjectKey::object(dir_id))
3154                .await
3155                .expect("Searching tree for dir")
3156                .unwrap();
3157            assert_matches!(
3158                obj.value,
3159                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
3160            );
3161        }
3162        fs.close().await.expect("Close failed");
3163    }
3164
3165    #[fuchsia::test]
3166    async fn test_create_and_open_internal_dir_unencrypted() {
3167        let fs = test_filesystem().await;
3168        let dir_id;
3169        let store_id;
3170        {
3171            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3172            let store = root_volume
3173                .new_volume("test", NewChildStoreOptions::default())
3174                .await
3175                .expect("new_volume failed");
3176            dir_id =
3177                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
3178            store_id = store.store_object_id();
3179        }
3180
3181        fs.close().await.expect("close failed");
3182        let device = fs.take_device().await;
3183        device.reopen(false);
3184        let fs = FxFilesystem::open(device).await.expect("open failed");
3185
3186        {
3187            let store = fs.object_manager().store(store_id).expect("store not found");
3188            assert_eq!(
3189                dir_id,
3190                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
3191            );
3192            let obj = store
3193                .tree()
3194                .find(&ObjectKey::object(dir_id))
3195                .await
3196                .expect("Searching tree for dir")
3197                .unwrap();
3198            assert_matches!(
3199                obj.value,
3200                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
3201            );
3202        }
3203        fs.close().await.expect("Close failed");
3204    }
3205
3206    #[fuchsia::test(threads = 10)]
3207    async fn test_old_layers_are_purged() {
3208        let fs = test_filesystem().await;
3209
3210        let store = fs.root_store();
3211        let mut transaction = fs
3212            .clone()
3213            .new_transaction(lock_keys![], Options::default())
3214            .await
3215            .expect("new_transaction failed");
3216        let object = Arc::new(
3217            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3218                .await
3219                .expect("create_object failed"),
3220        );
3221        transaction.commit().await.expect("commit failed");
3222
3223        store.flush().await.expect("flush failed");
3224
3225        let mut buf = object.allocate_buffer(5).await;
3226        buf.as_mut_slice().copy_from_slice(b"hello");
3227        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3228
3229        // Getting the layer-set should cause the flush to stall.
3230        let layer_set = store.tree().layer_set();
3231
3232        let done = Mutex::new(false);
3233        let mut object_id = 0;
3234
3235        join!(
3236            async {
3237                store.flush().await.expect("flush failed");
3238                assert!(*done.lock());
3239            },
3240            async {
3241                // This is a halting problem so all we can do is sleep.
3242                fasync::Timer::new(Duration::from_secs(1)).await;
3243                *done.lock() = true;
3244                object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
3245                std::mem::drop(layer_set);
3246            }
3247        );
3248
3249        if let Err(e) = ObjectStore::open_object(
3250            &store.parent_store.as_ref().unwrap(),
3251            object_id,
3252            HandleOptions::default(),
3253            store.crypt(),
3254        )
3255        .await
3256        {
3257            assert!(FxfsError::NotFound.matches(&e));
3258        } else {
3259            panic!("open_object succeeded");
3260        }
3261    }
3262
3263    #[fuchsia::test]
3264    async fn test_tombstone_deletes_data() {
3265        let fs = test_filesystem().await;
3266        let root_store = fs.root_store();
3267        let child_id = {
3268            let mut transaction = fs
3269                .clone()
3270                .new_transaction(lock_keys![], Options::default())
3271                .await
3272                .expect("new_transaction failed");
3273            let child = ObjectStore::create_object(
3274                &root_store,
3275                &mut transaction,
3276                HandleOptions::default(),
3277                None,
3278            )
3279            .await
3280            .expect("create_object failed");
3281            transaction.commit().await.expect("commit failed");
3282
3283            // Allocate an extent in the file.
3284            let mut buffer = child.allocate_buffer(8192).await;
3285            buffer.as_mut_slice().fill(0xaa);
3286            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
3287
3288            child.object_id()
3289        };
3290
3291        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
3292
3293        // Let fsck check allocations.
3294        fsck(fs.clone()).await.expect("fsck failed");
3295    }
3296
3297    #[fuchsia::test]
3298    async fn test_tombstone_purges_keys() {
3299        let fs = test_filesystem().await;
3300        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3301        let store = root_volume
3302            .new_volume(
3303                "test",
3304                NewChildStoreOptions {
3305                    options: StoreOptions {
3306                        crypt: Some(Arc::new(new_insecure_crypt())),
3307                        ..StoreOptions::default()
3308                    },
3309                    ..NewChildStoreOptions::default()
3310                },
3311            )
3312            .await
3313            .expect("new_volume failed");
3314        let mut transaction = fs
3315            .clone()
3316            .new_transaction(lock_keys![], Options::default())
3317            .await
3318            .expect("new_transaction failed");
3319        let child =
3320            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3321                .await
3322                .expect("create_object failed");
3323        transaction.commit().await.expect("commit failed");
3324        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_some());
3325        store
3326            .tombstone_object(child.object_id(), Options::default())
3327            .await
3328            .expect("tombstone_object failed");
3329        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_none());
3330        fs.close().await.expect("close failed");
3331    }
3332
3333    #[fuchsia::test]
3334    async fn test_major_compaction_discards_unnecessary_records() {
3335        let fs = test_filesystem().await;
3336        let root_store = fs.root_store();
3337        let child_id = {
3338            let mut transaction = fs
3339                .clone()
3340                .new_transaction(lock_keys![], Options::default())
3341                .await
3342                .expect("new_transaction failed");
3343            let child = ObjectStore::create_object(
3344                &root_store,
3345                &mut transaction,
3346                HandleOptions::default(),
3347                None,
3348            )
3349            .await
3350            .expect("create_object failed");
3351            transaction.commit().await.expect("commit failed");
3352
3353            // Allocate an extent in the file.
3354            let mut buffer = child.allocate_buffer(8192).await;
3355            buffer.as_mut_slice().fill(0xaa);
3356            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
3357
3358            child.object_id()
3359        };
3360
3361        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
3362        {
3363            let layers = root_store.tree.layer_set();
3364            let mut merger = layers.merger();
3365            let iter = merger
3366                .query(Query::FullRange(&ObjectKey::object(child_id)))
3367                .await
3368                .expect("seek failed");
3369            // There should still be at least one record for the object in the tree.
3370            match iter.get() {
3371                Some(ItemRef { key: ObjectKey { object_id, .. }, .. })
3372                    if *object_id == child_id => {}
3373                _ => panic!("Objects should still be in the tree."),
3374            }
3375        }
3376        root_store.flush().await.expect("flush failed");
3377
3378        // There should be no records for the object.
3379        let layers = root_store.tree.layer_set();
3380        let mut merger = layers.merger();
3381        let iter = merger
3382            .query(Query::FullRange(&ObjectKey::object(child_id)))
3383            .await
3384            .expect("seek failed");
3385        match iter.get() {
3386            None => {}
3387            Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
3388                assert_ne!(*object_id, child_id)
3389            }
3390        }
3391    }
3392
3393    #[fuchsia::test]
3394    async fn test_overlapping_extents_in_different_layers() {
3395        let fs = test_filesystem().await;
3396        let store = fs.root_store();
3397
3398        let mut transaction = fs
3399            .clone()
3400            .new_transaction(
3401                lock_keys![LockKey::object(
3402                    store.store_object_id(),
3403                    store.root_directory_object_id()
3404                )],
3405                Options::default(),
3406            )
3407            .await
3408            .expect("new_transaction failed");
3409        let root_directory =
3410            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3411        let object = root_directory
3412            .create_child_file(&mut transaction, "test")
3413            .await
3414            .expect("create_child_file failed");
3415        transaction.commit().await.expect("commit failed");
3416
3417        let buf = object.allocate_buffer(16384).await;
3418        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3419
3420        store.flush().await.expect("flush failed");
3421
3422        object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");
3423
3424        // At this point, we should have an extent for 0..16384 in a layer that has been flushed,
3425        // and an extent for 0..4096 that partially overwrites it.  Writing to 0..16384 should
3426        // overwrite both of those extents.
3427        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3428
3429        fsck(fs.clone()).await.expect("fsck failed");
3430    }
3431
3432    #[fuchsia::test(threads = 10)]
3433    async fn test_encrypted_mutations() {
3434        async fn one_iteration(
3435            fs: OpenFxFilesystem,
3436            crypt: Arc<dyn Crypt>,
3437            iteration: u64,
3438        ) -> OpenFxFilesystem {
3439            async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
3440                fs.close().await.expect("Close failed");
3441                let device = fs.take_device().await;
3442                device.reopen(false);
3443                FxFilesystem::open(device).await.expect("FS open failed")
3444            }
3445
3446            let fs = reopen(fs).await;
3447
3448            let (store_object_id, object_id) = {
3449                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3450                let store = root_volume
3451                    .volume(
3452                        "test",
3453                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3454                    )
3455                    .await
3456                    .expect("volume failed");
3457
3458                let mut transaction = fs
3459                    .clone()
3460                    .new_transaction(
3461                        lock_keys![LockKey::object(
3462                            store.store_object_id(),
3463                            store.root_directory_object_id(),
3464                        )],
3465                        Options::default(),
3466                    )
3467                    .await
3468                    .expect("new_transaction failed");
3469                let root_directory = Directory::open(&store, store.root_directory_object_id())
3470                    .await
3471                    .expect("open failed");
3472                let object = root_directory
3473                    .create_child_file(&mut transaction, &format!("test {}", iteration))
3474                    .await
3475                    .expect("create_child_file failed");
3476                transaction.commit().await.expect("commit failed");
3477
3478                let mut buf = object.allocate_buffer(1000).await;
3479                for i in 0..buf.len() {
3480                    buf.as_mut_slice()[i] = i as u8;
3481                }
3482                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3483
3484                (store.store_object_id(), object.object_id())
3485            };
3486
3487            let fs = reopen(fs).await;
3488
3489            let check_object = |fs: Arc<FxFilesystem>| {
3490                let crypt = crypt.clone();
3491                async move {
3492                    let root_volume = root_volume(fs).await.expect("root_volume failed");
3493                    let volume = root_volume
3494                        .volume(
3495                            "test",
3496                            StoreOptions { crypt: Some(crypt), ..StoreOptions::default() },
3497                        )
3498                        .await
3499                        .expect("volume failed");
3500
3501                    let object = ObjectStore::open_object(
3502                        &volume,
3503                        object_id,
3504                        HandleOptions::default(),
3505                        None,
3506                    )
3507                    .await
3508                    .expect("open_object failed");
3509                    let mut buf = object.allocate_buffer(1000).await;
3510                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
3511                    for i in 0..buf.len() {
3512                        assert_eq!(buf.as_slice()[i], i as u8);
3513                    }
3514                }
3515            };
3516
3517            check_object(fs.clone()).await;
3518
3519            let fs = reopen(fs).await;
3520
3521            // At this point the "test" volume is locked.  Before checking the object, flush the
3522            // filesystem.  This should leave a file with encrypted mutations.
3523            fs.object_manager().flush().await.expect("flush failed");
3524
3525            assert_ne!(
3526                fs.object_manager()
3527                    .store(store_object_id)
3528                    .unwrap()
3529                    .load_store_info()
3530                    .await
3531                    .expect("load_store_info failed")
3532                    .encrypted_mutations_object_id,
3533                INVALID_OBJECT_ID
3534            );
3535
3536            check_object(fs.clone()).await;
3537
3538            // Checking the object should have triggered a flush and so now there should be no
3539            // encrypted mutations object.
3540            assert_eq!(
3541                fs.object_manager()
3542                    .store(store_object_id)
3543                    .unwrap()
3544                    .load_store_info()
3545                    .await
3546                    .expect("load_store_info failed")
3547                    .encrypted_mutations_object_id,
3548                INVALID_OBJECT_ID
3549            );
3550
3551            let fs = reopen(fs).await;
3552
3553            fsck(fs.clone()).await.expect("fsck failed");
3554
3555            let fs = reopen(fs).await;
3556
3557            check_object(fs.clone()).await;
3558
3559            fs
3560        }
3561
3562        let mut fs = test_filesystem().await;
3563        let crypt = Arc::new(new_insecure_crypt());
3564
3565        {
3566            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3567            let _store = root_volume
3568                .new_volume(
3569                    "test",
3570                    NewChildStoreOptions {
3571                        options: StoreOptions {
3572                            crypt: Some(crypt.clone()),
3573                            ..StoreOptions::default()
3574                        },
3575                        ..Default::default()
3576                    },
3577                )
3578                .await
3579                .expect("new_volume failed");
3580        }
3581
3582        // Run a few iterations so that we test with changing stream cipher offsets.
3583        for i in 0..5 {
3584            fs = one_iteration(fs, crypt.clone(), i).await;
3585        }
3586    }
3587
3588    #[test_case(true; "with a flush")]
3589    #[test_case(false; "without a flush")]
3590    #[fuchsia::test(threads = 10)]
3591    async fn test_object_id_cipher_roll(with_flush: bool) {
3592        let fs = test_filesystem().await;
3593        let crypt = Arc::new(new_insecure_crypt());
3594
3595        let expected_key = {
3596            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3597            let store = root_volume
3598                .new_volume(
3599                    "test",
3600                    NewChildStoreOptions {
3601                        options: StoreOptions {
3602                            crypt: Some(crypt.clone()),
3603                            ..StoreOptions::default()
3604                        },
3605                        ..Default::default()
3606                    },
3607                )
3608                .await
3609                .expect("new_volume failed");
3610
3611            // Create some files so that our in-memory copy of StoreInfo has changes (the object
3612            // count) pending a flush.
3613            let root_dir_id = store.root_directory_object_id();
3614            let root_dir =
3615                Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3616            let mut transaction = fs
3617                .clone()
3618                .new_transaction(
3619                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
3620                    Options::default(),
3621                )
3622                .await
3623                .expect("new_transaction failed");
3624            for i in 0..10 {
3625                root_dir.create_child_file(&mut transaction, &format!("file {i}")).await.unwrap();
3626            }
3627            transaction.commit().await.expect("commit failed");
3628
3629            let orig_store_info = store.store_info().unwrap();
3630
3631            // Hack the last object ID to force a roll of the object ID cipher.
3632            {
3633                let mut last_object_id = store.last_object_id.lock();
3634                match &mut *last_object_id {
3635                    LastObjectId::Encrypted { id, .. } => {
3636                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
3637                        *id |= 0xffffffff;
3638                    }
3639                    _ => unreachable!(),
3640                }
3641            }
3642
3643            let mut transaction = fs
3644                .clone()
3645                .new_transaction(
3646                    lock_keys![LockKey::object(
3647                        store.store_object_id(),
3648                        store.root_directory_object_id()
3649                    )],
3650                    Options::default(),
3651                )
3652                .await
3653                .expect("new_transaction failed");
3654            let root_directory = Directory::open(&store, store.root_directory_object_id())
3655                .await
3656                .expect("open failed");
3657            let object = root_directory
3658                .create_child_file(&mut transaction, "test")
3659                .await
3660                .expect("create_child_file failed");
3661            transaction.commit().await.expect("commit failed");
3662
3663            assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 1u64 << 32);
3664
3665            // Check that the key has been changed.
3666            let key = match (
3667                store.store_info().unwrap().last_object_id,
3668                orig_store_info.last_object_id,
3669            ) {
3670                (
3671                    LastObjectIdInfo::Encrypted { key, id },
3672                    LastObjectIdInfo::Encrypted { key: orig_key, .. },
3673                ) => {
3674                    assert_ne!(key, orig_key);
3675                    assert_eq!(id, 1u64 << 32);
3676                    key
3677                }
3678                _ => unreachable!(),
3679            };
3680
3681            if with_flush {
3682                fs.journal().compact().await.unwrap();
3683            }
3684
3685            let last_object_id = store.last_object_id.lock();
3686            assert_eq!(last_object_id.id(), 1u64 << 32);
3687            key
3688        };
3689
3690        fs.close().await.expect("Close failed");
3691        let device = fs.take_device().await;
3692        device.reopen(false);
3693        let fs = FxFilesystem::open(device).await.expect("open failed");
3694        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3695        let store = root_volume
3696            .volume("test", StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() })
3697            .await
3698            .expect("volume failed");
3699
3700        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Encrypted { key, .. } if key == expected_key);
3701        assert_eq!(store.last_object_id.lock().id(), 1u64 << 32);
3702
3703        fsck(fs.clone()).await.expect("fsck failed");
3704        fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
3705    }
3706
3707    #[fuchsia::test(threads = 2)]
3708    async fn test_race_object_id_cipher_roll_and_flush() {
3709        let fs = test_filesystem().await;
3710        let crypt = Arc::new(new_insecure_crypt());
3711
3712        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3713        let store = root_volume
3714            .new_volume(
3715                "test",
3716                NewChildStoreOptions {
3717                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3718                    ..Default::default()
3719                },
3720            )
3721            .await
3722            .expect("new_volume failed");
3723
3724        assert!(matches!(&*store.last_object_id.lock(), LastObjectId::Encrypted { .. }));
3725
3726        // Create some files so that our in-memory copy of StoreInfo has changes (the object
3727        // count) pending a flush.
3728        let root_dir_id = store.root_directory_object_id();
3729        let root_dir = Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3730
3731        let _executor_tasks = testing::force_executor_threads_to_run(2).await;
3732
3733        for j in 0..100 {
3734            let mut transaction = fs
3735                .clone()
3736                .new_transaction(
3737                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
3738                    Options::default(),
3739                )
3740                .await
3741                .expect("new_transaction failed");
3742            root_dir.create_child_file(&mut transaction, &format!("file {j}")).await.unwrap();
3743            transaction.commit().await.expect("commit failed");
3744
3745            let task = {
3746                let fs = fs.clone();
3747                fasync::Task::spawn(async move {
3748                    fs.journal().compact().await.unwrap();
3749                })
3750            };
3751
3752            // Hack the last object ID to force a roll of the object ID cipher.
3753            {
3754                let mut last_object_id = store.last_object_id.lock();
3755                let LastObjectId::Encrypted { id, .. } = &mut *last_object_id else {
3756                    unreachable!()
3757                };
3758                assert_eq!(*id >> 32, j);
3759                *id |= 0xffffffff;
3760            }
3761
3762            let mut transaction = fs
3763                .clone()
3764                .new_transaction(
3765                    lock_keys![LockKey::object(
3766                        store.store_object_id(),
3767                        store.root_directory_object_id()
3768                    )],
3769                    Options::default(),
3770                )
3771                .await
3772                .expect("new_transaction failed");
3773            let root_directory = Directory::open(&store, store.root_directory_object_id())
3774                .await
3775                .expect("open failed");
3776            root_directory
3777                .create_child_file(&mut transaction, &format!("test {j}"))
3778                .await
3779                .expect("create_child_file failed");
3780            transaction.commit().await.expect("commit failed");
3781
3782            task.await;
3783
3784            // Check that the key has been changed.
3785            let new_store_info = store.load_store_info().await.unwrap();
3786
3787            let LastObjectIdInfo::Encrypted { id, key } = new_store_info.last_object_id else {
3788                unreachable!()
3789            };
3790            assert_eq!(id >> 32, j + 1);
3791            let LastObjectIdInfo::Encrypted { key: in_memory_key, .. } =
3792                store.store_info().unwrap().last_object_id
3793            else {
3794                unreachable!()
3795            };
3796            assert_eq!(key, in_memory_key);
3797        }
3798
3799        fs.close().await.expect("Close failed");
3800    }
3801
3802    #[fuchsia::test]
3803    async fn test_object_id_no_roll_for_unencrypted_store() {
3804        let fs = test_filesystem().await;
3805
3806        {
3807            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3808            let store = root_volume
3809                .new_volume("test", NewChildStoreOptions::default())
3810                .await
3811                .expect("new_volume failed");
3812
3813            // Hack the last object ID.
3814            {
3815                let mut last_object_id = store.last_object_id.lock();
3816                match &mut *last_object_id {
3817                    LastObjectId::Unencrypted { id } => {
3818                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
3819                        *id |= 0xffffffff;
3820                    }
3821                    _ => unreachable!(),
3822                }
3823            }
3824
3825            let mut transaction = fs
3826                .clone()
3827                .new_transaction(
3828                    lock_keys![LockKey::object(
3829                        store.store_object_id(),
3830                        store.root_directory_object_id()
3831                    )],
3832                    Options::default(),
3833                )
3834                .await
3835                .expect("new_transaction failed");
3836            let root_directory = Directory::open(&store, store.root_directory_object_id())
3837                .await
3838                .expect("open failed");
3839            let object = root_directory
3840                .create_child_file(&mut transaction, "test")
3841                .await
3842                .expect("create_child_file failed");
3843            transaction.commit().await.expect("commit failed");
3844
3845            assert_eq!(object.object_id(), 0x1_0000_0000);
3846
3847            // Check that there is still no key.
3848            assert_matches!(
3849                store.store_info().unwrap().last_object_id,
3850                LastObjectIdInfo::Unencrypted { .. }
3851            );
3852
3853            assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
3854        };
3855
3856        fs.close().await.expect("Close failed");
3857        let device = fs.take_device().await;
3858        device.reopen(false);
3859        let fs = FxFilesystem::open(device).await.expect("open failed");
3860        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3861        let store =
3862            root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
3863
3864        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
3865    }
3866
3867    #[fuchsia::test]
3868    fn test_object_id_is_not_invalid_object_id() {
3869        let key = UnwrappedKey::new(vec![0; FXFS_KEY_SIZE]);
3870        // 1106634048 results in INVALID_OBJECT_ID with this key.
3871        let mut last_object_id =
3872            LastObjectId::Encrypted { id: 1106634047, cipher: Box::new(Ff1::new(&key)) };
3873        assert!(last_object_id.try_get_next().is_some());
3874        assert!(last_object_id.try_get_next().is_some());
3875    }
3876
3877    #[fuchsia::test]
3878    async fn test_last_object_id_is_correct_after_unlock() {
3879        let fs = test_filesystem().await;
3880        let crypt = Arc::new(new_insecure_crypt());
3881
3882        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3883        let store = root_volume
3884            .new_volume(
3885                "test",
3886                NewChildStoreOptions {
3887                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3888                    ..Default::default()
3889                },
3890            )
3891            .await
3892            .expect("new_volume failed");
3893
3894        let mut transaction = fs
3895            .clone()
3896            .new_transaction(
3897                lock_keys![LockKey::object(
3898                    store.store_object_id(),
3899                    store.root_directory_object_id()
3900                )],
3901                Options::default(),
3902            )
3903            .await
3904            .expect("new_transaction failed");
3905        let root_directory =
3906            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3907        root_directory
3908            .create_child_file(&mut transaction, "test")
3909            .await
3910            .expect("create_child_file failed");
3911        transaction.commit().await.expect("commit failed");
3912
3913        // Compact so that StoreInfo is written.
3914        fs.journal().compact().await.unwrap();
3915
3916        let last_object_id = store.last_object_id.lock().id();
3917
3918        store.lock().await.unwrap();
3919        store.unlock(NO_OWNER, crypt.clone()).await.unwrap();
3920
3921        assert_eq!(store.last_object_id.lock().id(), last_object_id);
3922    }
3923
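    // Artificially exhausts the low 32 bits of the object ID space and then creates objects
    // from many threads at once, to check that rolling the object ID cipher is free of races.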
3924    #[fuchsia::test(threads = 20)]
3925    async fn test_race_when_rolling_last_object_id_cipher() {
3926        // NOTE: This test exercises a race, so any failure may be flaky.
3927
3928        const NUM_THREADS: usize = 20;
3929
3930        let fs = test_filesystem().await;
3931        let crypt = Arc::new(new_insecure_crypt());
3932
3933        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3934        let store = root_volume
3935            .new_volume(
3936                "test",
3937                NewChildStoreOptions {
3938                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3939                    ..Default::default()
3940                },
3941            )
3942            .await
3943            .expect("new_volume failed");
3944
3945        let store_id = store.store_object_id();
3946        let root_dir_id = store.root_directory_object_id();
3947
3948        let root_directory =
3949            Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3950
3951        // Create directories.
3952        let mut directories = Vec::new();
3953        for _ in 0..NUM_THREADS {
3954            let mut transaction = fs
3955                .clone()
3956                .new_transaction(
3957                    lock_keys![LockKey::object(store_id, root_dir_id)],
3958                    Options::default(),
3959                )
3960                .await
3961                .expect("new_transaction failed");
3962            directories.push(
3963                root_directory
3964                    .create_child_dir(&mut transaction, "test")
3965                    .await
3966                    .expect("create_child_dir failed"),
3967            );
3968            transaction.commit().await.expect("commit failed");
3969        }
3970
3971        // Hack the last object ID so that the next ID will require a roll.
3972        match &mut *store.last_object_id.lock() {
3973            LastObjectId::Encrypted { id, .. } => *id |= 0xffff_ffff,
3974            _ => unreachable!(),
3975        }
3976
3977        let scope = fasync::Scope::new();
3978
3979        let _executor_tasks = testing::force_executor_threads_to_run(NUM_THREADS).await;
3980
3981        for dir in directories {
3982            let fs = fs.clone();
3983            scope.spawn(async move {
3984                let mut transaction = fs
3985                    .clone()
3986                    .new_transaction(
3987                        lock_keys![LockKey::object(store_id, dir.object_id())],
3988                        Options::default(),
3989                    )
3990                    .await
3991                    .expect("new_transaction failed");
3992                dir.create_child_file(&mut transaction, "test")
3993                    .await
3994                    .expect("create_child_file failed");
3995                transaction.commit().await.expect("commit failed");
3996            });
3997        }
3998
3999        scope.on_no_tasks().await;
4000
4001        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000 + NUM_THREADS as u64 - 1);
4002    }
4003
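    // Checks that a store can be locked and then unlocked again, with its contents still
    // accessible afterwards.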
4004    #[fuchsia::test(threads = 10)]
4005    async fn test_lock_store() {
4006        let fs = test_filesystem().await;
4007        let crypt = Arc::new(new_insecure_crypt());
4008
4009        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4010        let store = root_volume
4011            .new_volume(
4012                "test",
4013                NewChildStoreOptions {
4014                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4015                    ..NewChildStoreOptions::default()
4016                },
4017            )
4018            .await
4019            .expect("new_volume failed");
4020        let mut transaction = fs
4021            .clone()
4022            .new_transaction(
4023                lock_keys![LockKey::object(
4024                    store.store_object_id(),
4025                    store.root_directory_object_id()
4026                )],
4027                Options::default(),
4028            )
4029            .await
4030            .expect("new_transaction failed");
4031        let root_directory =
4032            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4033        root_directory
4034            .create_child_file(&mut transaction, "test")
4035            .await
4036            .expect("create_child_file failed");
4037        transaction.commit().await.expect("commit failed");
4038        store.lock().await.expect("lock failed");
4039
4040        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
4041        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4042    }
4043
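    // Checks that a locked store can be unlocked read-only, locked again with lock_read_only,
    // and unlocked once more, with lookups succeeding after each unlock.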
4044    #[fuchsia::test(threads = 10)]
4045    async fn test_unlock_read_only() {
4046        let fs = test_filesystem().await;
4047        let crypt = Arc::new(new_insecure_crypt());
4048
4049        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4050        let store = root_volume
4051            .new_volume(
4052                "test",
4053                NewChildStoreOptions {
4054                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4055                    ..NewChildStoreOptions::default()
4056                },
4057            )
4058            .await
4059            .expect("new_volume failed");
4060        let mut transaction = fs
4061            .clone()
4062            .new_transaction(
4063                lock_keys![LockKey::object(
4064                    store.store_object_id(),
4065                    store.root_directory_object_id()
4066                )],
4067                Options::default(),
4068            )
4069            .await
4070            .expect("new_transaction failed");
4071        let root_directory =
4072            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4073        root_directory
4074            .create_child_file(&mut transaction, "test")
4075            .await
4076            .expect("create_child_file failed");
4077        transaction.commit().await.expect("commit failed");
4078        store.lock().await.expect("lock failed");
4079
4080        store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
4081        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4082        store.lock_read_only();
4083        store.unlock_read_only(crypt).await.expect("unlock failed");
4084        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4085    }
4086
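    // Checks that the mutations key is rolled on every unlock (the cipher offset resets to
    // zero) and that the volume still mounts after many such rolls.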
4087    #[fuchsia::test(threads = 10)]
4088    async fn test_key_rolled_when_unlocked() {
4089        let fs = test_filesystem().await;
4090        let crypt = Arc::new(new_insecure_crypt());
4091
4092        let object_id;
4093        {
4094            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4095            let store = root_volume
4096                .new_volume(
4097                    "test",
4098                    NewChildStoreOptions {
4099                        options: StoreOptions {
4100                            crypt: Some(crypt.clone()),
4101                            ..StoreOptions::default()
4102                        },
4103                        ..Default::default()
4104                    },
4105                )
4106                .await
4107                .expect("new_volume failed");
4108            let mut transaction = fs
4109                .clone()
4110                .new_transaction(
4111                    lock_keys![LockKey::object(
4112                        store.store_object_id(),
4113                        store.root_directory_object_id()
4114                    )],
4115                    Options::default(),
4116                )
4117                .await
4118                .expect("new_transaction failed");
4119            let root_directory = Directory::open(&store, store.root_directory_object_id())
4120                .await
4121                .expect("open failed");
4122            object_id = root_directory
4123                .create_child_file(&mut transaction, "test")
4124                .await
4125                .expect("create_child_file failed")
4126                .object_id();
4127            transaction.commit().await.expect("commit failed");
4128        }
4129
4130        fs.close().await.expect("Close failed");
4131        let mut device = fs.take_device().await;
4132
4133        // Remount repeatedly so that we can be sure we can still remount after the mutations
4134        // key has been rolled many times.
4135        for _ in 0..100 {
4136            device.reopen(false);
4137            let fs = FxFilesystem::open(device).await.expect("open failed");
4138            {
4139                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4140                let store = root_volume
4141                    .volume(
4142                        "test",
4143                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4144                    )
4145                    .await
4146                    .expect("open_volume failed");
4147
4148                // The key should get rolled every time we unlock.
4149                assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);
4150
4151                // Make sure there's an encrypted mutation.
4152                let handle =
4153                    ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
4154                        .await
4155                        .expect("open_object failed");
4156                let buffer = handle.allocate_buffer(100).await;
4157                handle
4158                    .write_or_append(Some(0), buffer.as_ref())
4159                    .await
4160                    .expect("write_or_append failed");
4161            }
4162            fs.close().await.expect("Close failed");
4163            device = fs.take_device().await;
4164        }
4165    }
4166
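    // Serializes a worst-case StoreInfo and checks that it fits within
    // MAX_STORE_INFO_SERIALIZED_SIZE.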
4167    #[test]
4168    fn test_store_info_max_serialized_size() {
4169        let info = StoreInfo {
4170            guid: [0xff; 16],
4171            last_object_id: LastObjectIdInfo::Encrypted {
4172                id: 0x1234567812345678,
4173                key: FxfsKey {
4174                    wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4175                    key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4176                },
4177            },
4178            // In the worst case, each layer is 3/4 the size of the layer below it (because of
4179            // the compaction policy we're using).  If the smallest layer is 8,192 bytes, then the
4180            // combined size of 120 layers overflows a 64-bit unsigned integer, so if this fits,
4181            // any size should fit.
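            // (Rough arithmetic: the layer sizes form a geometric series 8192 * (4/3)^k for
            // k = 0..119, which sums to about 2.4e19 bytes, just above u64::MAX of ~1.8e19.)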
4182            layers: vec![0x1234567812345678; 120],
4183            root_directory_object_id: 0x1234567812345678,
4184            graveyard_directory_object_id: 0x1234567812345678,
4185            object_count: 0x1234567812345678,
4186            mutations_key: Some(FxfsKey {
4187                wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4188                key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4189            }),
4190            mutations_cipher_offset: 0x1234567812345678,
4191            encrypted_mutations_object_id: 0x1234567812345678,
4192            internal_directory_object_id: INVALID_OBJECT_ID,
4193        };
4194        let mut serialized_info = Vec::new();
4195        info.serialize_with_version(&mut serialized_info).unwrap();
4196        assert!(
4197            serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
4198            "{}",
4199            serialized_info.len()
4200        );
4201    }
4202
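    // Shared body for the two tests below: provokes a crypt failure, locks the store, and then
    // checks that the store can be unlocked again (read-only or read-write) with a fresh crypt
    // instance.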
4203    async fn reopen_after_crypt_failure_inner(read_only: bool) {
4204        let fs = test_filesystem().await;
4205        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4206
4207        let store = {
4208            let crypt = Arc::new(new_insecure_crypt());
4209            let store = root_volume
4210                .new_volume(
4211                    "vol",
4212                    NewChildStoreOptions {
4213                        options: StoreOptions {
4214                            crypt: Some(crypt.clone()),
4215                            ..StoreOptions::default()
4216                        },
4217                        ..Default::default()
4218                    },
4219                )
4220                .await
4221                .expect("new_volume failed");
4222            let root_directory = Directory::open(&store, store.root_directory_object_id())
4223                .await
4224                .expect("open failed");
4225            let mut transaction = fs
4226                .clone()
4227                .new_transaction(
4228                    lock_keys![LockKey::object(
4229                        store.store_object_id(),
4230                        root_directory.object_id()
4231                    )],
4232                    Options::default(),
4233                )
4234                .await
4235                .expect("new_transaction failed");
4236            root_directory
4237                .create_child_file(&mut transaction, "test")
4238                .await
4239                .expect("create_child_file failed");
4240            transaction.commit().await.expect("commit failed");
4241
4242            crypt.shutdown();
4243            let mut transaction = fs
4244                .clone()
4245                .new_transaction(
4246                    lock_keys![LockKey::object(
4247                        store.store_object_id(),
4248                        root_directory.object_id()
4249                    )],
4250                    Options::default(),
4251                )
4252                .await
4253                .expect("new_transaction failed");
4254            root_directory
4255                .create_child_file(&mut transaction, "test2")
4256                .await
4257                .map(|_| ())
4258                .expect_err("create_child_file should fail");
4259            store.lock().await.expect("lock failed");
4260            store
4261        };
4262
4263        let crypt = Arc::new(new_insecure_crypt());
4264        if read_only {
4265            store.unlock_read_only(crypt).await.expect("unlock failed");
4266        } else {
4267            store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
4268        }
4269        let root_directory =
4270            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4271        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4272    }
4273
4274    #[fuchsia::test(threads = 10)]
4275    async fn test_reopen_after_crypt_failure() {
4276        reopen_after_crypt_failure_inner(false).await;
4277    }
4278
4279    #[fuchsia::test(threads = 10)]
4280    async fn test_reopen_read_only_after_crypt_failure() {
4281        reopen_after_crypt_failure_inner(true).await;
4282    }
4283
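    // Checks that, in debug builds, committing a transaction that exceeds its journal
    // reservation panics rather than silently overrunning the reservation.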
4284    #[fuchsia::test(threads = 10)]
4285    #[should_panic(expected = "Insufficient reservation space")]
4286    #[cfg(debug_assertions)]
4287    async fn large_transaction_causes_panic_in_debug_builds() {
4288        let fs = test_filesystem().await;
4289        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4290        let store = root_volume
4291            .new_volume("vol", NewChildStoreOptions::default())
4292            .await
4293            .expect("new_volume failed");
4294        let root_directory =
4295            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4296        let mut transaction = fs
4297            .clone()
4298            .new_transaction(
4299                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
4300                Options::default(),
4301            )
4302            .await
4303            .expect("transaction");
4304        for i in 0..500 {
4305            root_directory
4306                .create_symlink(&mut transaction, b"link", &format!("{}", i))
4307                .await
4308                .expect("symlink");
4309        }
4310        assert_eq!(transaction.commit().await.expect("commit"), 0);
4311    }
4312
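    // Checks that a crypt failure during compaction force-locks the affected store (via
    // StoreOwner::force_lock) without wedging the journal: compaction still succeeds, and the
    // store's unflushed mutations remain replayable from the journal after a remount.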
4313    #[fuchsia::test]
4314    async fn test_crypt_failure_does_not_fuse_journal() {
4315        let fs = test_filesystem().await;
4316
4317        struct Owner;
4318        #[async_trait]
4319        impl StoreOwner for Owner {
4320            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
4321                store.lock().await
4322            }
4323        }
4324        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
4325
4326        {
4327            // Create two stores and a record for each store, so the journal will need to flush them
4328            // both later.
4329            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4330            let store1 = root_volume
4331                .new_volume(
4332                    "vol1",
4333                    NewChildStoreOptions {
4334                        options: StoreOptions {
4335                            crypt: Some(Arc::new(new_insecure_crypt())),
4336                            ..StoreOptions::default()
4337                        },
4338                        ..Default::default()
4339                    },
4340                )
4341                .await
4342                .expect("new_volume failed");
4343            let crypt = Arc::new(new_insecure_crypt());
4344            let store2 = root_volume
4345                .new_volume(
4346                    "vol2",
4347                    NewChildStoreOptions {
4348                        options: StoreOptions {
4349                            owner: Arc::downgrade(&owner),
4350                            crypt: Some(crypt.clone()),
4351                        },
4352                        ..Default::default()
4353                    },
4354                )
4355                .await
4356                .expect("new_volume failed");
4357            for store in [&store1, &store2] {
4358                let root_directory = Directory::open(store, store.root_directory_object_id())
4359                    .await
4360                    .expect("open failed");
4361                let mut transaction = fs
4362                    .clone()
4363                    .new_transaction(
4364                        lock_keys![LockKey::object(
4365                            store.store_object_id(),
4366                            root_directory.object_id()
4367                        )],
4368                        Options::default(),
4369                    )
4370                    .await
4371                    .expect("new_transaction failed");
4372                root_directory
4373                    .create_child_file(&mut transaction, "test")
4374                    .await
4375                    .expect("create_child_file failed");
4376                transaction.commit().await.expect("commit failed");
4377            }
4378            // Shut down the crypt instance for store2, and then compact.  Compaction should not
4379            // fail, and the store should become locked.
4380            crypt.shutdown();
4381            fs.journal().compact().await.expect("compact failed");
4382            // The store should now be locked.
4383            assert!(store2.is_locked());
4384        }
4385
4386        // Even though the store wasn't flushed, the mutation to store2 will still be valid as it is
4387        // held in the journal.
4388        fs.close().await.expect("close failed");
4389        let device = fs.take_device().await;
4390        device.reopen(false);
4391        let fs = FxFilesystem::open(device).await.expect("open failed");
4392        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4393
4394        for volume_name in ["vol1", "vol2"] {
4395            let store = root_volume
4396                .volume(
4397                    volume_name,
4398                    StoreOptions {
4399                        crypt: Some(Arc::new(new_insecure_crypt())),
4400                        ..StoreOptions::default()
4401                    },
4402                )
4403                .await
4404                .expect("open volume failed");
4405            let root_directory = Directory::open(&store, store.root_directory_object_id())
4406                .await
4407                .expect("open failed");
4408            assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
4409        }
4410
4411        fs.close().await.expect("close failed");
4412    }
4413
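    // Races a crypt shutdown against unlocking a volume (and the flush that follows the
    // unlock), then checks that the filesystem still mounts and the store's contents are
    // intact.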
4414    #[fuchsia::test]
4415    async fn test_crypt_failure_during_unlock_race() {
4416        let fs = test_filesystem().await;
4417
4418        struct Owner;
4419        #[async_trait]
4420        impl StoreOwner for Owner {
4421            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
4422                store.lock().await
4423            }
4424        }
4425        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
4426
4427        let store_object_id = {
4428            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4429            let store = root_volume
4430                .new_volume(
4431                    "vol",
4432                    NewChildStoreOptions {
4433                        options: StoreOptions {
4434                            owner: Arc::downgrade(&owner),
4435                            crypt: Some(Arc::new(new_insecure_crypt())),
4436                        },
4437                        ..Default::default()
4438                    },
4439                )
4440                .await
4441                .expect("new_volume failed");
4442            let root_directory = Directory::open(&store, store.root_directory_object_id())
4443                .await
4444                .expect("open failed");
4445            let mut transaction = fs
4446                .clone()
4447                .new_transaction(
4448                    lock_keys![LockKey::object(
4449                        store.store_object_id(),
4450                        root_directory.object_id()
4451                    )],
4452                    Options::default(),
4453                )
4454                .await
4455                .expect("new_transaction failed");
4456            root_directory
4457                .create_child_file(&mut transaction, "test")
4458                .await
4459                .expect("create_child_file failed");
4460            transaction.commit().await.expect("commit failed");
4461            store.store_object_id()
4462        };
4463
4464        fs.close().await.expect("close failed");
4465        let device = fs.take_device().await;
4466        device.reopen(false);
4467
4468        let fs = FxFilesystem::open(device).await.expect("open failed");
4469        {
4470            let fs_clone = fs.clone();
4471            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4472
4473            let crypt = Arc::new(new_insecure_crypt());
4474            let crypt_clone = crypt.clone();
4475            join!(
4476                async move {
4477                    // Unlock might fail, so ignore errors.
4478                    let _ = root_volume
4479                        .volume(
4480                            "vol",
4481                            StoreOptions {
4482                                owner: Arc::downgrade(&owner),
4483                                crypt: Some(crypt_clone),
4484                            },
4485                        )
4486                        .await;
4487                },
4488                async move {
4489                    // Block until the unlock has finished, but before the flush that it
4490                    // triggers completes, to maximize the chances of hitting the race.
4491                    let keys = lock_keys![LockKey::flush(store_object_id)];
4492                    let _ = fs_clone.lock_manager().write_lock(keys).await;
4493                    crypt.shutdown();
4494                }
4495            );
4496        }
4497
4498        fs.close().await.expect("close failed");
4499        let device = fs.take_device().await;
4500        device.reopen(false);
4501
4502        let fs = FxFilesystem::open(device).await.expect("open failed");
4503        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4504        let store = root_volume
4505            .volume(
4506                "vol",
4507                StoreOptions {
4508                    crypt: Some(Arc::new(new_insecure_crypt())),
4509                    ..StoreOptions::default()
4510                },
4511            )
4512            .await
4513            .expect("open volume failed");
4514        let root_directory =
4515            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4516        assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
4517
4518        fs.close().await.expect("close failed");
4519    }
4520
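    // Checks that a store created with low_32_bit_object_ids only hands out object IDs below
    // 2^32, never INVALID_OBJECT_ID and never a duplicate, and that the setting persists
    // across a remount.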
4521    #[fuchsia::test]
4522    async fn test_low_32_bit_object_ids() {
4523        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
4524        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
4525
4526        {
4527            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4528
4529            let store = root_vol
4530                .new_volume(
4531                    "test",
4532                    NewChildStoreOptions { low_32_bit_object_ids: true, ..Default::default() },
4533                )
4534                .await
4535                .expect("new_volume failed");
4536
4537            let root_dir = Directory::open(&store, store.root_directory_object_id())
4538                .await
4539                .expect("open failed");
4540
4541            let mut ids = std::collections::HashSet::new();
4542
4543            for i in 0..100 {
4544                let mut transaction = fs
4545                    .clone()
4546                    .new_transaction(
4547                        lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4548                        Options::default(),
4549                    )
4550                    .await
4551                    .expect("new_transaction failed");
4552
4553                for j in 0..100 {
4554                    let object = root_dir
4555                        .create_child_dir(&mut transaction, &format!("{i}.{j}"))
4556                        .await
4557                        .expect("create_child_dir failed");
4558
4559                    assert!(object.object_id() < 1 << 32);
4560                    assert_ne!(object.object_id(), INVALID_OBJECT_ID);
4561                    assert!(ids.insert(object.object_id()));
4562                }
4563
4564                transaction.commit().await.expect("commit failed");
4565            }
4566
4567            assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4568
4569            fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
4570        }
4571
4572        // Verify persistence
4573        fs.close().await.expect("Close failed");
4574        let device = fs.take_device().await;
4575        device.reopen(false);
4576        let fs = FxFilesystem::open(device).await.expect("open failed");
4577        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4578        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");
4579
4580        // Check that we can still create files and they have low 32-bit IDs.
4581        let root_dir =
4582            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4583        let mut transaction = fs
4584            .clone()
4585            .new_transaction(
4586                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4587                Options::default(),
4588            )
4589            .await
4590            .expect("new_transaction failed");
4591
4592        let object = root_dir
4593            .create_child_file(&mut transaction, "persistence_check")
4594            .await
4595            .expect("create_child_file failed");
4596        assert!(object.object_id() < 1 << 32);
4597
4598        transaction.commit().await.expect("commit failed");
4599
4600        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4601    }
4602}