Skip to main content

fxfs/
object_store.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod allocator;
6pub mod caching_object_handle;
7pub mod data_object_handle;
8pub mod directory;
9pub mod extent;
10mod extent_mapping_iterator;
11mod extent_record;
12mod flush;
13pub mod graveyard;
14mod install;
15pub mod journal;
16mod key_manager;
17pub(crate) mod merge;
18pub mod object_manager;
19pub mod object_record;
20pub mod project_id;
21mod store_object_handle;
22pub mod transaction;
23mod tree;
24mod tree_cache;
25pub mod volume;
26
27pub use data_object_handle::{
28    DataObjectHandle, DirectWriter, FileExtent, FsverityState, FsverityStateInner, RangeType,
29};
30pub use directory::Directory;
31pub use object_record::{ChildValue, DirType, ObjectDescriptor, PosixAttributes, Timestamp};
32pub use store_object_handle::{SetExtendedAttributeMode, StoreObjectHandle};
33
34use crate::errors::FxfsError;
35use crate::filesystem::{
36    ApplyContext, ApplyMode, FxFilesystem, JournalingObject, MAX_FILE_SIZE, SyncOptions,
37    TruncateGuard, TxnGuard,
38};
39use crate::log::*;
40use crate::lsm_tree::cache::{NullCache, ObjectCache};
41use crate::lsm_tree::types::{Existence, Item, ItemRef, LayerIterator};
42use crate::lsm_tree::{LSMTree, Query};
43use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ObjectProperties, ReadObjectHandle};
44use crate::object_store::allocator::Allocator;
45use crate::object_store::graveyard::Graveyard;
46use crate::object_store::journal::{JournalCheckpoint, JournalCheckpointV32, JournaledTransaction};
47use crate::object_store::key_manager::KeyManager;
48use crate::object_store::transaction::{
49    AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation, Options, Transaction,
50    lock_keys,
51};
52use crate::range::RangeExt;
53use crate::round::round_up;
54use crate::serialized_types::{Version, Versioned, VersionedLatest};
55use anyhow::{Context, Error, anyhow, bail, ensure};
56use async_trait::async_trait;
57use fidl_fuchsia_io as fio;
58use fprint::TypeFingerprint;
59use fuchsia_sync::Mutex;
60use fxfs_crypto::ff1::Ff1;
61use fxfs_crypto::{
62    CipherHolder, Crypt, KeyPurpose, ObjectType, StreamCipher, UnwrappedKey, WrappingKeyId,
63    key_to_cipher,
64};
65use fxfs_macros::{Migrate, migrate_to_version};
66use rand::RngCore;
67use scopeguard::ScopeGuard;
68use serde::{Deserialize, Serialize};
69use std::collections::HashSet;
70use std::fmt;
71use std::num::NonZero;
72use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
73use std::sync::{Arc, OnceLock, Weak};
74use storage_device::Device;
75use uuid::Uuid;
76
77pub use extent::Extent;
78pub use extent_record::{ExtentMode, ExtentValue};
79pub use object_record::{
80    AttributeId, AttributeKey, EncryptionKey, EncryptionKeys, ExtendedAttributeValue,
81    FsverityMetadata, FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectAttributes, ObjectKey, ObjectKeyData,
82    ObjectKind, ObjectValue, ProjectProperty, RootDigest,
83};
84pub use project_id::{ProjectId, ProjectIdExt};
85pub use transaction::Mutation;
86
// For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel
// attacks more difficult. This mask selects the upper 32 bits (the "hi" part) of an object ID.
const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000;

// The mutation-count threshold for a single transaction.  At time of writing, this threshold
// limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;

// Encrypted files and directories use the fscrypt key (identified by `FSCRYPT_KEY_ID`) to encrypt
// file contents and filenames respectively. All non-fscrypt encrypted files otherwise default to
// using the `VOLUME_DATA_KEY_ID` key. Note, the filesystem always uses the `VOLUME_DATA_KEY_ID`
// key to encrypt large extended attributes. Thus, encrypted files and directories with large
// xattrs will have both an fscrypt and volume data key.
pub const VOLUME_DATA_KEY_ID: u64 = 0;
pub const FSCRYPT_KEY_ID: u64 = 1;
101
/// A constant that can be used where an owner is expected of type `Weak<dyn StoreOwner>` but no
/// owner is required.  It is an empty `Weak<()>` created with `Weak::new()`, so upgrading it
/// always yields `None`.
pub const NO_OWNER: Weak<()> = Weak::new();
// `()` implements `StoreOwner` (relying entirely on the trait's default methods) so that
// `NO_OWNER` can be used where a `Weak<dyn StoreOwner>` is expected.
impl StoreOwner for () {}
106
107#[async_trait]
108pub trait StoreOwner: Send + Sync {
109    /// Forcibly lock the store.  This exists to give the StoreOwner an opportunity to clean up
110    /// tasks which might access the store before locking it, because ObjectStore::unlock can only
111    /// be called when the store is not in use.
112    async fn force_lock(self: Arc<Self>, _store: &ObjectStore) -> Result<(), Error> {
113        Err(anyhow!(FxfsError::Internal))
114    }
115}
116
/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
/// back to an ObjectStore (through the `AsRef<ObjectStore>` supertrait).  The trait declares no
/// methods of its own.
pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}
120
/// StoreInfo stores information about the object store.  This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
///
/// Currently an alias for `StoreInfoV52`.
pub type StoreInfo = StoreInfoV52;

/// Version 52 of the serialized store information.  Compared with `StoreInfoV49`, the plain
/// `last_object_id` counter and the optional `object_id_key` are combined into a single
/// `LastObjectIdInfo` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV52 {
    /// The globally unique identifier for the associated object store. If unset, will be all zero.
    guid: [u8; 16],

    /// The last used object ID.  Note that this field is not accurate in memory; ObjectStore's
    /// last_object_id field is the one to use in that case.  Technically, this might not be the
    /// last object ID used for the latest transaction that created an object because we use this at
    /// the point of creating the object but before we commit the transaction.  Transactions can
    /// then get committed in an arbitrary order (or not at all).
    last_object_id: LastObjectIdInfo,

    /// Object ids for layers.  TODO(https://fxbug.dev/42178036): need a layer of indirection here
    /// so we can support snapshots.
    pub layers: Vec<u64>,

    /// The object ID for the root directory.
    root_directory_object_id: u64,

    /// The object ID for the graveyard.
    graveyard_directory_object_id: u64,

    /// The number of live objects in the store.  This should *not* be trusted; it can be invalid
    /// due to filesystem inconsistencies.
    object_count: u64,

    /// The (wrapped) key that encrypted mutations should use.
    mutations_key: Option<FxfsKeyV49>,

    /// Mutations for the store are encrypted using a stream cipher.  To decrypt the mutations, we
    /// need to know the offset in the cipher stream to start it.
    mutations_cipher_offset: u64,

    /// If we have to flush the store whilst we do not have the key, we need to write the encrypted
    /// mutations to an object. This is the object ID of that file if it exists; otherwise it is
    /// INVALID_OBJECT_ID.
    pub encrypted_mutations_object_id: u64,

    /// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
    /// when the directory doesn't yet exist.
    internal_directory_object_id: u64,
}
166
/// The serialized description of the last object ID that is kept in `StoreInfoV52`.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
enum LastObjectIdInfo {
    /// Object IDs are not encrypted; `id` is the last object ID.
    Unencrypted {
        id: u64,
    },
    /// The lower 32 bits of object IDs are encrypted with `key`.
    Encrypted {
        /// The *unencrypted* value of the last object ID.
        id: u64,

        /// Object IDs are encrypted to reduce the amount of information that sequential object IDs
        /// reveal (such as the number of files in the system and the ordering of their creation in
        /// time).  Only the bottom 32 bits of the object ID are encrypted whilst the top 32 bits
        /// will increment after 2^32 object IDs have been used and this allows us to roll the key.
        key: FxfsKeyV49,
    },
    /// Object IDs are restricted to 32 bits (see `NewChildStoreOptions::low_32_bit_object_ids`).
    /// No last object ID is recorded for this variant.
    Low32Bit,
}
184
185impl Default for LastObjectIdInfo {
186    fn default() -> Self {
187        LastObjectIdInfo::Unencrypted { id: 0 }
188    }
189}
190
/// Version 49 of the serialized store information.  It is upgraded to `StoreInfoV52` via the
/// `From<StoreInfoV49>` implementation, which folds `last_object_id` and `object_id_key` into a
/// single `LastObjectIdInfo`.
#[derive(Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV49 {
    guid: [u8; 16],
    // The last object ID (stored unencrypted, even when `object_id_key` is set).
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV49>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    // When present, the store uses encrypted object IDs with this key.
    object_id_key: Option<FxfsKeyV49>,
    internal_directory_object_id: u64,
}
205
206impl From<StoreInfoV49> for StoreInfoV52 {
207    fn from(value: StoreInfoV49) -> Self {
208        Self {
209            guid: value.guid,
210            last_object_id: if let Some(key) = value.object_id_key {
211                LastObjectIdInfo::Encrypted { id: value.last_object_id, key: key }
212            } else {
213                LastObjectIdInfo::Unencrypted { id: value.last_object_id }
214            },
215            layers: value.layers,
216            root_directory_object_id: value.root_directory_object_id,
217            graveyard_directory_object_id: value.graveyard_directory_object_id,
218            object_count: value.object_count,
219            mutations_key: value.mutations_key,
220            mutations_cipher_offset: value.mutations_cipher_offset,
221            encrypted_mutations_object_id: value.encrypted_mutations_object_id,
222            internal_directory_object_id: value.internal_directory_object_id,
223        }
224    }
225}
226
/// Version 40 of the serialized store information.  It has the same fields as `StoreInfoV49`
/// except that the keys are `FxfsKeyV40`; the upgrade to `StoreInfoV49` is generated by the
/// `Migrate` derive.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV49)]
pub struct StoreInfoV40 {
    guid: [u8; 16],
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV40>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    object_id_key: Option<FxfsKeyV40>,
    internal_directory_object_id: u64,
}
242
243impl StoreInfo {
244    /// Returns the parent objects for this store.
245    pub fn parent_objects(&self) -> Vec<u64> {
246        // We should not include the ID of the store itself, since that should be referred to in the
247        // volume directory.
248        let mut objects = self.layers.to_vec();
249        if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
250            objects.push(self.encrypted_mutations_object_id);
251        }
252        objects
253    }
254}
255
// The maximum size, in bytes, of a serialized StoreInfo.
// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit
// isn't exceeded.  It will likely involve placing limits on the maximum number of layers.
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072;

// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
// in the journal but hasn't yet been written to layer files) for a store.  We set a limit because
// we want to limit the amount of memory use in the case the filesystem is corrupt or under attack.
pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;
264
/// Options that adjust the behaviour of an object handle.  The derived `Default` leaves every
/// flag `false`.
#[derive(Default)]
pub struct HandleOptions {
    /// If true, transactions used by this handle will skip journal space checks.
    pub skip_journal_checks: bool,
    /// If true, data written to any attribute of this handle will not have per-block checksums
    /// computed.
    pub skip_checksums: bool,
    /// If true, any files using fsverity will not attempt to perform any verification. This is
    /// useful to open an object without the correct encryption keys to look at the metadata.
    pub skip_fsverity: bool,
}
276
/// Parameters for encrypting a newly created object.
pub struct ObjectEncryptionOptions {
    /// If set, the keys are treated as permanent and never evicted from the KeyManager cache.
    /// This is necessary when keys are managed by another store; for example, the layer files
    /// of a child store are objects in the root store, but they are encrypted with keys from the
    /// child store.  Generally, most objects should have this set to `false`.
    pub permanent: bool,
    /// The ID of the key.  NOTE(review): presumably one of `VOLUME_DATA_KEY_ID` or
    /// `FSCRYPT_KEY_ID` -- confirm against callers.
    pub key_id: u64,
    /// The key as an `EncryptionKey`.  NOTE(review): presumably the wrapped form that gets
    /// persisted -- confirm.
    pub key: EncryptionKey,
    /// The unwrapped form of the key.
    pub unwrapped_key: UnwrappedKey,
}
288
/// Options used when creating or opening a store.  The `Default` implementation uses `NO_OWNER`
/// and no crypt.
pub struct StoreOptions {
    /// The owner of the store.
    pub owner: Weak<dyn StoreOwner>,

    /// The crypt service to use for the store.  The store is unencrypted if `crypt` is `None`.
    pub crypt: Option<Arc<dyn Crypt>>,
}
296
297impl Default for StoreOptions {
298    fn default() -> Self {
299        Self { owner: NO_OWNER, crypt: None }
300    }
301}
302
/// Options for `ObjectStore::new_child_store`.
///
/// `reserve_32bit_object_ids` and `low_32_bit_object_ids` are mutually exclusive;
/// `new_child_store` fails with `FxfsError::InvalidArgs` if both are set.
#[derive(Default)]
pub struct NewChildStoreOptions {
    /// The owner and crypt settings for the new store.
    pub options: StoreOptions,

    /// Specifies the object ID in the root store to be used for the store.  If set to
    /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
    pub object_id: u64,

    /// If true, reserve all 32 bit object_ids.  All new objects will start with IDs exceeding
    /// 0x1_0000_0000.
    pub reserve_32bit_object_ids: bool,

    /// Object IDs will be restricted to 32 bits.  This involves a less performant algorithm and so
    /// should not be used unless necessary.
    pub low_32_bit_object_ids: bool,

    /// If set, use this GUID for the new store.
    pub guid: Option<[u8; 16]>,
}
322
/// Currently an alias for `EncryptedMutationsV49`.
pub type EncryptedMutations = EncryptedMutationsV49;

/// A batch of still-encrypted mutations for a store, together with the bookkeeping needed to
/// decrypt and apply them later.
#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV49 {
    // Information about the mutations are held here, but the actual encrypted data is held within
    // data.  For each transaction, we record the checkpoint and the count of mutations within the
    // transaction.  The checkpoint is required for the log file offset (which we need to apply the
    // mutations), and the version so that we can correctly decode the mutation after it has been
    // decrypted. The count specifies the number of serialized mutations encoded in |data|.
    transactions: Vec<(JournalCheckpointV32, u64)>,

    // The encrypted mutations, concatenated in the order they were pushed.
    #[serde(with = "crate::zerocopy_serialization")]
    data: Vec<u8>,

    // If the mutations key was rolled, this holds the offset in `data` where the new key should
    // apply.
    mutations_key_roll: Vec<(usize, FxfsKeyV49)>,
}
342
343impl std::fmt::Debug for EncryptedMutations {
344    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
345        f.debug_struct("EncryptedMutations")
346            .field("transactions", &self.transactions)
347            .field("len", &self.data.len())
348            .field(
349                "mutations_key_roll",
350                &self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
351            )
352            .finish()
353    }
354}
355
356impl Versioned for EncryptedMutations {
357    fn max_serialized_size() -> Option<u64> {
358        Some(MAX_ENCRYPTED_MUTATIONS_SIZE as u64)
359    }
360}
361
362impl From<EncryptedMutationsV40> for EncryptedMutationsV49 {
363    fn from(value: EncryptedMutationsV40) -> Self {
364        EncryptedMutationsV49 {
365            transactions: value.transactions,
366            data: value.data,
367            mutations_key_roll: value
368                .mutations_key_roll
369                .into_iter()
370                .map(|(offset, key)| (offset, key.into()))
371                .collect(),
372        }
373    }
374}
375
/// Version 40 of the encrypted mutations record.  It has the same fields as
/// `EncryptedMutationsV49`, but the rolled keys are `FxfsKeyV40` and `data` uses the default
/// serde encoding rather than `crate::zerocopy_serialization`.
#[derive(Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV40 {
    transactions: Vec<(JournalCheckpointV32, u64)>,
    data: Vec<u8>,
    mutations_key_roll: Vec<(usize, FxfsKeyV40)>,
}

impl Versioned for EncryptedMutationsV40 {
    // Same limit as the current version: MAX_ENCRYPTED_MUTATIONS_SIZE bytes.
    fn max_serialized_size() -> Option<u64> {
        Some(MAX_ENCRYPTED_MUTATIONS_SIZE as u64)
    }
}
388
389impl EncryptedMutations {
390    fn from_replayed_mutations(
391        store_object_id: u64,
392        transactions: Vec<JournaledTransaction>,
393    ) -> Self {
394        let mut this = Self::default();
395        for JournaledTransaction { checkpoint, non_root_mutations, .. } in transactions {
396            for (object_id, mutation) in non_root_mutations {
397                if store_object_id == object_id {
398                    if let Mutation::EncryptedObjectStore(data) = mutation {
399                        this.push(&checkpoint, data);
400                    } else if let Mutation::UpdateMutationsKey(key) = mutation {
401                        this.mutations_key_roll.push((this.data.len(), key.into()));
402                    }
403                }
404            }
405        }
406        this
407    }
408
409    fn extend(&mut self, other: &EncryptedMutations) {
410        self.transactions.extend_from_slice(&other.transactions[..]);
411        self.mutations_key_roll.extend(
412            other
413                .mutations_key_roll
414                .iter()
415                .map(|(offset, key)| (offset + self.data.len(), key.clone())),
416        );
417        self.data.extend_from_slice(&other.data[..]);
418    }
419
420    fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
421        self.data.append(&mut data.into());
422        // If the checkpoint is the same as the last mutation we pushed, increment the count.
423        if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
424            if last_checkpoint.file_offset == checkpoint.file_offset {
425                *count += 1;
426                return;
427            }
428        }
429        self.transactions.push((checkpoint.clone(), 1));
430    }
431}
432
/// The lock state of an `ObjectStore`.
pub enum LockState {
    // The store is locked.
    Locked,

    // The store is not encrypted.
    Unencrypted,

    // The store is unlocked; `owner` is the (weakly held) store owner and `crypt` is the crypt
    // instance in use.
    Unlocked { owner: Weak<dyn StoreOwner>, crypt: Arc<dyn Crypt> },

    // The store is unlocked, but in a read-only state, and no flushes or other operations will be
    // performed on the store.
    UnlockedReadOnly(Arc<dyn Crypt>),

    // The store is encrypted but is now in an unusable state (due to a failure to sync the journal
    // after locking the store).  The store cannot be unlocked.
    Invalid,

    // Before we've read the StoreInfo we might not know whether the store is Locked or Unencrypted.
    // This can happen when lazily opening stores (ObjectManager::lazy_open_store).
    Unknown,

    // The store is in the process of being locked.  Whilst the store is being locked, the store
    // isn't usable; assertions will trip if any mutations are applied.
    Locking,

    // Whilst we're unlocking, we will replay encrypted mutations.  The store isn't usable until
    // it's in the Unlocked state.
    Unlocking,

    // The store has been deleted.
    Deleted,
}
461
462impl LockState {
463    fn owner(&self) -> Option<Arc<dyn StoreOwner>> {
464        if let Self::Unlocked { owner, .. } = self { owner.upgrade() } else { None }
465    }
466}
467
468impl fmt::Debug for LockState {
469    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
470        formatter.write_str(match self {
471            LockState::Locked => "Locked",
472            LockState::Unencrypted => "Unencrypted",
473            LockState::Unlocked { .. } => "Unlocked",
474            LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
475            LockState::Invalid => "Invalid",
476            LockState::Unknown => "Unknown",
477            LockState::Locking => "Locking",
478            LockState::Unlocking => "Unlocking",
479            LockState::Deleted => "Deleted",
480        })
481    }
482}
483
// The in-memory state used to generate new object IDs for a store.
enum LastObjectId {
    // This is used when the store is encrypted, but the key and ID isn't yet available.
    Pending,

    // Object IDs are not encrypted; `id` is the last object ID.
    Unencrypted {
        id: u64,
    },

    Encrypted {
        // The *unencrypted* value of the last object ID.
        id: u64,

        // Encrypted stores will use a cipher to obfuscate the object ID.
        cipher: Box<Ff1>,
    },

    // Object IDs are restricted to 32 bits.
    Low32Bit {
        // IDs that are currently reserved.
        reserved: HashSet<u32>,
        // IDs that have been unreserved but not yet removed from `reserved`; see
        // `drain_unreserved`.
        unreserved: Vec<u32>,
    },
}
505
506impl LastObjectId {
507    /// Tries to get the next object ID.  Returns None if a new cipher is required because all
508    /// object IDs that can be generated with the current cipher have been exhausted, or if only
509    /// using the lower 32 bits which requires an async algorithm.
510    fn try_get_next(&mut self) -> Option<NonZero<u64>> {
511        match self {
512            LastObjectId::Unencrypted { id } => {
513                NonZero::new(id.wrapping_add(1)).inspect(|next| *id = next.get())
514            }
515            LastObjectId::Encrypted { id, cipher } => {
516                let mut next = *id;
517                let hi = next & OBJECT_ID_HI_MASK;
518                loop {
519                    if next as u32 == u32::MAX {
520                        return None;
521                    }
522                    next += 1;
523                    let candidate = hi | cipher.encrypt(next as u32) as u64;
524                    if let Some(candidate) = NonZero::new(candidate) {
525                        *id = next;
526                        return Some(candidate);
527                    }
528                }
529            }
530            _ => None,
531        }
532    }
533
534    /// Returns INVALID_OBJECT_ID if it's not possible to peek at the next object ID.
535    fn peek_next(&self) -> u64 {
536        match self {
537            LastObjectId::Unencrypted { id } => id.wrapping_add(1),
538            LastObjectId::Encrypted { id, cipher } => {
539                let mut next = *id;
540                let hi = next & OBJECT_ID_HI_MASK;
541                loop {
542                    if next as u32 == u32::MAX {
543                        return INVALID_OBJECT_ID;
544                    }
545                    next += 1;
546                    let candidate = hi | cipher.encrypt(next as u32) as u64;
547                    if candidate != INVALID_OBJECT_ID {
548                        return candidate;
549                    }
550                }
551            }
552            _ => INVALID_OBJECT_ID,
553        }
554    }
555
556    /// Returns INVALID_OBJECT_ID for algorithms that don't use the last ID.
557    fn id(&self) -> u64 {
558        match self {
559            LastObjectId::Unencrypted { id } | LastObjectId::Encrypted { id, .. } => *id,
560            _ => INVALID_OBJECT_ID,
561        }
562    }
563
564    /// Returns true if `id` is reserved (it must be 32 bits).
565    fn is_reserved(&self, id: u64) -> bool {
566        match self {
567            LastObjectId::Low32Bit { reserved, .. } => {
568                if let Ok(id) = id.try_into() {
569                    reserved.contains(&id)
570                } else {
571                    false
572                }
573            }
574            _ => false,
575        }
576    }
577
578    /// Reserves `id`.
579    fn reserve(&mut self, id: u64) {
580        match self {
581            LastObjectId::Low32Bit { reserved, .. } => {
582                assert!(reserved.insert(id.try_into().unwrap()))
583            }
584            _ => unreachable!(),
585        }
586    }
587
588    /// Unreserves `id`.
589    fn unreserve(&mut self, id: u64) {
590        match self {
591            LastObjectId::Low32Bit { unreserved, .. } => {
592                // To avoid races, where a reserved ID transitions from being reserved to being
593                // actually used in a committed transaction, we delay updating `reserved` until a
594                // suitable point.
595                //
596                // On thread A, we might have:
597                //
598                //   A1. Commit transaction (insert a record into the LSM tree that uses ID)
599                //   A2. `unreserve`
600                //
601                // And on another thread B, we might have:
602                //
603                //   B1. Drain `unreserved`.
604                //   B2. Check tree and `reserved` to see if ID is used.
605                //
606                // B2 will involve calling `LsmTree::layer_set` which should be thought of as a
607                // snapshot, so the change A1 might not be visible to thread B, but it won't matter
608                // because `reserved` will still include the ID.  So long as each thread does the
609                // operations in this order, it should be safe.
610                unreserved.push(id.try_into().unwrap())
611            }
612            _ => {}
613        }
614    }
615
616    /// Removes `unreserved` IDs from the `reserved` list.
617    fn drain_unreserved(&mut self) {
618        match self {
619            LastObjectId::Low32Bit { reserved, unreserved } => {
620                for u in unreserved.drain(..) {
621                    assert!(reserved.remove(&u));
622                }
623            }
624            _ => {}
625        }
626    }
627}
628
629pub struct ReservedId<'a>(&'a ObjectStore, NonZero<u64>);
630
631impl<'a> ReservedId<'a> {
632    fn new(store: &'a ObjectStore, id: NonZero<u64>) -> Self {
633        Self(store, id)
634    }
635
636    pub fn get(&self) -> u64 {
637        self.1.get()
638    }
639
640    /// The caller takes responsibility for this id.
641    #[must_use]
642    pub fn release(self) -> u64 {
643        let id = self.1.get();
644        std::mem::forget(self);
645        id
646    }
647}
648
649impl Drop for ReservedId<'_> {
650    fn drop(&mut self) {
651        self.0.last_object_id.lock().unreserve(self.1.get());
652    }
653}
654
/// An object store supports a file like interface for objects.  Objects are keyed by a 64 bit
/// identifier.  An object store has to be backed by a parent object store (which stores metadata
/// for the object store).  The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
pub struct ObjectStore {
    // The parent store; `None` for the root parent store.
    parent_store: Option<Arc<ObjectStore>>,
    // The object ID that identifies this store.
    store_object_id: u64,
    // The device and block size (taken from the filesystem, except for the root parent store
    // where they are passed in directly).
    device: Arc<dyn Device>,
    block_size: u64,
    // Weak reference to the owning filesystem.  For the root parent store this starts out empty
    // and is set later by `attach_filesystem`.
    filesystem: Weak<FxFilesystem>,
    // Lock ordering: This must be taken before `lock_state`.
    store_info: Mutex<Option<StoreInfo>>,
    // The LSM tree holding the store's object records.
    tree: LSMTree<ObjectKey, ObjectValue>,

    // When replaying the journal, the store cannot read StoreInfo until the whole journal
    // has been replayed, so during that time, store_info_handle will be None and records
    // just get sent to the tree. Once the journal has been replayed, we can open the store
    // and load all the other layer information.
    store_info_handle: OnceLock<DataObjectHandle<ObjectStore>>,

    // The cipher to use for encrypted mutations, if this store is encrypted.
    mutations_cipher: Mutex<Option<StreamCipher>>,

    // Current lock state of the store.
    // Lock ordering: This must be taken after `store_info`.
    lock_state: Mutex<LockState>,
    pub key_manager: KeyManager,

    // Enable/disable tracing.
    trace: AtomicBool,

    // Informational counters for events occurring within the store.
    counters: Mutex<ObjectStoreCounters>,

    // These are updated in performance-sensitive code paths so we use atomics instead of counters.
    device_read_ops: AtomicU64,
    device_write_ops: AtomicU64,
    logical_read_ops: AtomicU64,
    logical_write_ops: AtomicU64,
    graveyard_entries: AtomicU64,

    // Contains the last object ID and, optionally, a cipher to be used when generating new object
    // IDs.
    last_object_id: Mutex<LastObjectId>,

    // An optional callback to be invoked each time the ObjectStore flushes.  The callback is
    // invoked at the end of flush, while the write lock is still held.
    flush_callback: Mutex<Option<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>>,
}
704
// Informational counters for an ObjectStore (held in `ObjectStore::counters`).  All counters
// start at zero and `last_flush_time` starts as `None`.
#[derive(Clone, Default)]
struct ObjectStoreCounters {
    // NOTE(review): presumably the number of mutations applied to / dropped by the store --
    // the code that updates these is outside this view; confirm.
    mutations_applied: u64,
    mutations_dropped: u64,
    // NOTE(review): presumably the number of flushes and the time of the last one -- confirm
    // against the flush code.
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}
712
713impl ObjectStore {
714    fn new(
715        parent_store: Option<Arc<ObjectStore>>,
716        store_object_id: u64,
717        filesystem: Arc<FxFilesystem>,
718        store_info: Option<StoreInfo>,
719        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
720        mutations_cipher: Option<StreamCipher>,
721        lock_state: LockState,
722        last_object_id: LastObjectId,
723    ) -> Arc<ObjectStore> {
724        let device = filesystem.device();
725        let block_size = filesystem.block_size();
726        Arc::new(ObjectStore {
727            parent_store,
728            store_object_id,
729            device,
730            block_size,
731            filesystem: Arc::downgrade(&filesystem),
732            store_info: Mutex::new(store_info),
733            tree: LSMTree::new(merge::merge, object_cache),
734            store_info_handle: OnceLock::new(),
735            mutations_cipher: Mutex::new(mutations_cipher),
736            lock_state: Mutex::new(lock_state),
737            key_manager: KeyManager::new(),
738            trace: AtomicBool::new(false),
739            counters: Mutex::new(ObjectStoreCounters::default()),
740            device_read_ops: AtomicU64::new(0),
741            device_write_ops: AtomicU64::new(0),
742            logical_read_ops: AtomicU64::new(0),
743            logical_write_ops: AtomicU64::new(0),
744            graveyard_entries: AtomicU64::new(0),
745            last_object_id: Mutex::new(last_object_id),
746            flush_callback: Mutex::new(None),
747        })
748    }
749
750    fn new_empty(
751        parent_store: Option<Arc<ObjectStore>>,
752        store_object_id: u64,
753        filesystem: Arc<FxFilesystem>,
754        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
755    ) -> Arc<Self> {
756        Self::new(
757            parent_store,
758            store_object_id,
759            filesystem,
760            Some(StoreInfo::default()),
761            object_cache,
762            None,
763            LockState::Unencrypted,
764            LastObjectId::Unencrypted { id: 0 },
765        )
766    }
767
768    /// Cycle breaker constructor that returns an ObjectStore without a filesystem.
769    /// This should only be used from super block code.
770    pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
771        ObjectStore {
772            parent_store: None,
773            store_object_id,
774            device,
775            block_size,
776            filesystem: Weak::<FxFilesystem>::new(),
777            store_info: Mutex::new(Some(StoreInfo::default())),
778            tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
779            store_info_handle: OnceLock::new(),
780            mutations_cipher: Mutex::new(None),
781            lock_state: Mutex::new(LockState::Unencrypted),
782            key_manager: KeyManager::new(),
783            trace: AtomicBool::new(false),
784            counters: Mutex::new(ObjectStoreCounters::default()),
785            device_read_ops: AtomicU64::new(0),
786            device_write_ops: AtomicU64::new(0),
787            logical_read_ops: AtomicU64::new(0),
788            logical_write_ops: AtomicU64::new(0),
789            graveyard_entries: AtomicU64::new(0),
790            last_object_id: Mutex::new(LastObjectId::Unencrypted { id: 0 }),
791            flush_callback: Mutex::new(None),
792        }
793    }
794
    /// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has
    /// been created.
    ///
    /// Takes the store by value, replaces its filesystem reference with a weak reference to
    /// `filesystem`, and hands the store back.
    pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
        // Only a weak reference is stored; the store does not keep the filesystem alive.
        this.filesystem = Arc::downgrade(&filesystem);
        this
    }
801
802    /// Create a child store. It is a multi-step process:
803    ///
804    ///   1. Call `ObjectStore::new_child_store`.
805    ///   2. Register the store with the object-manager.
806    ///   3. Call `ObjectStore::create` to write the store-info.
807    ///
808    /// If the procedure fails, care must be taken to unregister store with the object-manager.
809    ///
810    /// The steps have to be separate because of lifetime issues when working with a transaction.
811    async fn new_child_store(
812        self: &Arc<Self>,
813        transaction: &mut Transaction<'_>,
814        options: NewChildStoreOptions,
815        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
816    ) -> Result<Arc<Self>, Error> {
817        ensure!(
818            !options.reserve_32bit_object_ids || !options.low_32_bit_object_ids,
819            FxfsError::InvalidArgs
820        );
821        let handle = if let Some(object_id) = NonZero::new(options.object_id) {
822            self.update_last_object_id(object_id.get());
823            let handle = ObjectStore::create_object_with_id(
824                self,
825                transaction,
826                ReservedId::new(self, object_id),
827                HandleOptions::default(),
828                None,
829            )?;
830            handle
831        } else {
832            ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await?
833        };
834        let filesystem = self.filesystem();
835        let id = if options.reserve_32bit_object_ids { 0x1_0000_0000 } else { 0 };
836        let (last_object_id, last_object_id_in_memory) = if options.low_32_bit_object_ids {
837            (
838                LastObjectIdInfo::Low32Bit,
839                LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() },
840            )
841        } else if let Some(crypt) = &options.options.crypt {
842            let (object_id_wrapped, object_id_unwrapped) =
843                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
844            (
845                LastObjectIdInfo::Encrypted { id, key: object_id_wrapped },
846                LastObjectId::Encrypted { id, cipher: Box::new(Ff1::new(&object_id_unwrapped)) },
847            )
848        } else {
849            (LastObjectIdInfo::Unencrypted { id }, LastObjectId::Unencrypted { id })
850        };
851        let store = if let Some(crypt) = options.options.crypt {
852            let (wrapped_key, unwrapped_key) =
853                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
854            Self::new(
855                Some(self.clone()),
856                handle.object_id(),
857                filesystem.clone(),
858                Some(StoreInfo {
859                    mutations_key: Some(wrapped_key),
860                    last_object_id,
861                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
862                    ..Default::default()
863                }),
864                object_cache,
865                Some(StreamCipher::new(&unwrapped_key, 0)),
866                LockState::Unlocked { owner: options.options.owner, crypt },
867                last_object_id_in_memory,
868            )
869        } else {
870            Self::new(
871                Some(self.clone()),
872                handle.object_id(),
873                filesystem.clone(),
874                Some(StoreInfo {
875                    last_object_id,
876                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
877                    ..Default::default()
878                }),
879                object_cache,
880                None,
881                LockState::Unencrypted,
882                last_object_id_in_memory,
883            )
884        };
885        assert!(store.store_info_handle.set(handle).is_ok());
886        Ok(store)
887    }
888
889    /// Actually creates the store in a transaction.  This will also create a root directory and
890    /// graveyard directory for the store.  See `new_child_store` above.
891    async fn create<'a>(
892        self: &'a Arc<Self>,
893        transaction: &mut Transaction<'a>,
894    ) -> Result<(), Error> {
895        let buf = {
896            // Create a root directory and graveyard directory.
897            let graveyard_directory_object_id = Graveyard::create(transaction, &self).await?;
898            let root_directory = Directory::create(transaction, &self, None).await?;
899
900            let serialized_info = {
901                let mut store_info = self.store_info.lock();
902                let store_info = store_info.as_mut().unwrap();
903
904                store_info.graveyard_directory_object_id = graveyard_directory_object_id;
905                store_info.root_directory_object_id = root_directory.object_id();
906
907                let mut serialized_info = Vec::new();
908                store_info.serialize_with_version(&mut serialized_info)?;
909                serialized_info
910            };
911            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
912            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
913            buf
914        };
915
916        if self.filesystem().options().image_builder_mode.is_some() {
917            // If we're in image builder mode, we want to avoid writing to disk unless explicitly
918            // asked to. New object stores will have their StoreInfo written when we compact in
919            // FxFilesystem::finalize().
920            Ok(())
921        } else {
922            self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
923        }
924    }
925
926    pub fn set_trace(&self, trace: bool) {
927        let old_value = self.trace.swap(trace, Ordering::Relaxed);
928        if trace != old_value {
929            info!(store_id = self.store_object_id(), trace; "OS: trace",);
930        }
931    }
932
933    /// Sets a callback to be invoked each time the ObjectStore flushes.  The callback is invoked at
934    /// the end of flush, while the write lock is still held.
935    pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
936        let mut flush_callback = self.flush_callback.lock();
937        *flush_callback = Some(Box::new(callback));
938    }
939
940    pub fn is_root(&self) -> bool {
941        if let Some(parent) = &self.parent_store {
942            parent.parent_store.is_none()
943        } else {
944            // The root parent store isn't the root store.
945            false
946        }
947    }
948
949    /// Populates an inspect node with store statistics.
950    pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
951        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
952        let counters = self.counters.lock();
953        if let Some(store_info) = self.store_info() {
954            root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
955        };
956        root.record_uint("store_object_id", self.store_object_id);
957        root.record_uint("mutations_applied", counters.mutations_applied);
958        root.record_uint("mutations_dropped", counters.mutations_dropped);
959        root.record_uint("num_flushes", counters.num_flushes);
960        if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
961            root.record_uint(
962                "last_flush_time_ms",
963                last_flush_time
964                    .duration_since(std::time::UNIX_EPOCH)
965                    .unwrap_or(std::time::Duration::ZERO)
966                    .as_millis()
967                    .try_into()
968                    .unwrap_or(0u64),
969            );
970        }
971        root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
972        root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
973        root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
974        root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));
975        root.record_uint("graveyard_entries", self.graveyard_entries.load(Ordering::Relaxed));
976        {
977            let last_object_id = self.last_object_id.lock();
978            root.record_uint("object_id_hi", last_object_id.id() >> 32);
979            root.record_bool(
980                "low_32_bit_object_ids",
981                matches!(&*last_object_id, LastObjectId::Low32Bit { .. }),
982            );
983        }
984
985        let this = self.clone();
986        root.record_child("lsm_tree", move |node| this.tree().record_inspect_data(node));
987    }
988
    /// Returns a reference to the device this store holds.
    pub fn device(&self) -> &Arc<dyn Device> {
        &self.device
    }
992
    /// Returns the block size recorded for this store.
    pub fn block_size(&self) -> u64 {
        self.block_size
    }
996
997    pub fn filesystem(&self) -> Arc<FxFilesystem> {
998        self.filesystem.upgrade().unwrap()
999    }
1000
    /// Returns the object id of this store.
    pub fn store_object_id(&self) -> u64 {
        self.store_object_id
    }
1004
    /// Returns a reference to the LSM tree held by this store.
    pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
        &self.tree
    }
1008
1009    pub fn root_directory_object_id(&self) -> u64 {
1010        self.store_info.lock().as_ref().unwrap().root_directory_object_id
1011    }
1012
1013    pub fn guid(&self) -> [u8; 16] {
1014        self.store_info.lock().as_ref().unwrap().guid
1015    }
1016
1017    pub fn graveyard_directory_object_id(&self) -> u64 {
1018        self.store_info.lock().as_ref().unwrap().graveyard_directory_object_id
1019    }
1020
1021    fn set_graveyard_directory_object_id(&self, oid: u64) {
1022        assert_eq!(
1023            std::mem::replace(
1024                &mut self.store_info.lock().as_mut().unwrap().graveyard_directory_object_id,
1025                oid
1026            ),
1027            INVALID_OBJECT_ID
1028        );
1029    }
1030
1031    pub fn object_count(&self) -> u64 {
1032        self.store_info.lock().as_ref().unwrap().object_count
1033    }
1034
    /// Returns the id reported by the in-memory last-object-id state.
    ///
    /// Returns INVALID_OBJECT_ID for algorithms that don't use the last ID.
    pub(crate) fn unencrypted_last_object_id(&self) -> u64 {
        self.last_object_id.lock().id()
    }
1039
    /// Returns a reference to this store's key manager.
    pub fn key_manager(&self) -> &KeyManager {
        &self.key_manager
    }
1043
    /// Returns this store's parent store, or `None` if it has no parent.
    pub fn parent_store(&self) -> Option<&Arc<ObjectStore>> {
        self.parent_store.as_ref()
    }
1047
1048    /// Returns the crypt object for the store.  Returns None if the store is unencrypted.
1049    pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
1050        match &*self.lock_state.lock() {
1051            LockState::Locked => panic!("Store is locked"),
1052            LockState::Invalid
1053            | LockState::Unencrypted
1054            | LockState::Locking
1055            | LockState::Unlocking
1056            | LockState::Deleted => None,
1057            LockState::Unlocked { crypt, .. } => Some(crypt.clone()),
1058            LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
1059            LockState::Unknown => {
1060                panic!("Store is of unknown lock state; has the journal been replayed yet?")
1061            }
1062        }
1063    }
1064
1065    /// Returns the id of the internal directory. Returns a NotFound error if this has not been
1066    /// initialized.
1067    pub fn get_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
1068        if let Some(store_info) = self.store_info.lock().as_ref() {
1069            if store_info.internal_directory_object_id == INVALID_OBJECT_ID {
1070                Err(FxfsError::NotFound.into())
1071            } else {
1072                Ok(store_info.internal_directory_object_id)
1073            }
1074        } else {
1075            Err(FxfsError::Unavailable.into())
1076        }
1077    }
1078
1079    pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
1080        // Create the transaction first to use the object store lock.
1081        let mut transaction = self
1082            .filesystem()
1083            .new_transaction(
1084                lock_keys![LockKey::InternalDirectory { store_object_id: self.store_object_id }],
1085                Options::default(),
1086            )
1087            .await?;
1088        let obj_id = self.store_info.lock().as_ref().unwrap().internal_directory_object_id;
1089        if obj_id != INVALID_OBJECT_ID {
1090            return Ok(obj_id);
1091        }
1092
1093        // Need to create an internal directory.
1094        let directory = Directory::create(&mut transaction, self, None).await?;
1095
1096        transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
1097        transaction.commit().await?;
1098        Ok(directory.object_id())
1099    }
1100
1101    /// Returns the file size for the object without opening the object.
1102    async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
1103        let item = self
1104            .tree
1105            .find(&ObjectKey::attribute(object_id, AttributeId::DATA, AttributeKey::Attribute))
1106            .await?
1107            .ok_or(FxfsError::NotFound)?;
1108        if let ObjectValue::Attribute { size, .. } = item.value {
1109            Ok(size)
1110        } else {
1111            bail!(FxfsError::NotFile);
1112        }
1113    }
1114
    /// Returns the id reported by the in-memory last-object-id state.  Only compiled when the
    /// `migration` feature is enabled.
    #[cfg(feature = "migration")]
    pub fn last_object_id(&self) -> u64 {
        self.last_object_id.lock().id()
    }
1119
    /// Provides access to the allocator to mark a specific region of the device as allocated.
    ///
    /// This forwards `transaction`, `store_object_id` and `device_range` unchanged to the
    /// allocator's `mark_allocated` and returns its result.  Only compiled when the `migration`
    /// feature is enabled.
    #[cfg(feature = "migration")]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        store_object_id: u64,
        device_range: std::ops::Range<u64>,
    ) -> Result<(), Error> {
        self.allocator().mark_allocated(transaction, store_object_id, device_range)
    }
1130
1131    /// `crypt` can be provided if the crypt service should be different to the default; see the
1132    /// comment on create_object.  Users should avoid having more than one handle open for the same
1133    /// object at the same time because they might get out-of-sync; there is no code that will
1134    /// prevent this.  One example where this can cause an issue is if the object ends up using a
1135    /// permanent key (which is the case if a value is passed for `crypt`), the permanent key is
1136    /// dropped when a handle is dropped, which will impact any other handles for the same object.
1137    pub async fn open_object<S: HandleOwner>(
1138        owner: &Arc<S>,
1139        obj_id: u64,
1140        options: HandleOptions,
1141        crypt: Option<Arc<dyn Crypt>>,
1142    ) -> Result<DataObjectHandle<S>, Error> {
1143        let store = owner.as_ref().as_ref();
1144        let mut fsverity_descriptor = None;
1145        let mut overwrite_ranges = Vec::new();
1146        let item = store
1147            .tree
1148            .find(&ObjectKey::attribute(obj_id, AttributeId::DATA, AttributeKey::Attribute))
1149            .await?
1150            .ok_or(FxfsError::NotFound)?;
1151
1152        let (size, track_overwrite_extents) = match item.value {
1153            ObjectValue::Attribute { size, has_overwrite_extents } => (size, has_overwrite_extents),
1154            ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
1155                if !options.skip_fsverity {
1156                    fsverity_descriptor = Some(fsverity_metadata);
1157                }
1158                // We only track the overwrite extents in memory for writes, reads handle them
1159                // implicitly, which means verified files (where the data won't change anymore)
1160                // don't need to track them.
1161                (size, false)
1162            }
1163            _ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attibute")),
1164        };
1165
1166        ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);
1167
1168        if track_overwrite_extents {
1169            let layer_set = store.tree.layer_set();
1170            let mut merger = layer_set.merger();
1171            let mut iter = merger
1172                .query(Query::FullRange(&ObjectKey::attribute(
1173                    obj_id,
1174                    AttributeId::DATA,
1175                    AttributeKey::Extent(Extent::search_key_from_offset(0)),
1176                )))
1177                .await?;
1178            loop {
1179                match iter.get() {
1180                    Some(ItemRef {
1181                        key:
1182                            ObjectKey {
1183                                object_id,
1184                                data:
1185                                    ObjectKeyData::Attribute(
1186                                        AttributeId::DATA,
1187                                        AttributeKey::Extent(extent),
1188                                    ),
1189                            },
1190                        value,
1191                        ..
1192                    }) if *object_id == obj_id => {
1193                        match value {
1194                            ObjectValue::Extent(ExtentValue::None)
1195                            | ObjectValue::Extent(ExtentValue::Some {
1196                                mode: ExtentMode::Raw,
1197                                ..
1198                            })
1199                            | ObjectValue::Extent(ExtentValue::Some {
1200                                mode: ExtentMode::Cow(_),
1201                                ..
1202                            }) => (),
1203                            ObjectValue::Extent(ExtentValue::Some {
1204                                mode: ExtentMode::OverwritePartial(_),
1205                                ..
1206                            })
1207                            | ObjectValue::Extent(ExtentValue::Some {
1208                                mode: ExtentMode::Overwrite,
1209                                ..
1210                            }) => overwrite_ranges.push(extent.clone().into()),
1211                            _ => bail!(
1212                                anyhow!(FxfsError::Inconsistent)
1213                                    .context("open_object: Expected extent")
1214                            ),
1215                        }
1216                        iter.advance().await?;
1217                    }
1218                    _ => break,
1219                }
1220            }
1221        }
1222
1223        // If a crypt service has been specified, it needs to be a permanent key because cached
1224        // keys can only use the store's crypt service.
1225        let permanent = if let Some(crypt) = crypt {
1226            store
1227                .key_manager
1228                .get_keys(
1229                    obj_id,
1230                    crypt.as_ref(),
1231                    &mut Some(async || store.get_keys(obj_id).await),
1232                    /* permanent= */ true,
1233                    /* force= */ false,
1234                )
1235                .await?;
1236            true
1237        } else {
1238            false
1239        };
1240        let data_object_handle = DataObjectHandle::new(
1241            owner.clone(),
1242            obj_id,
1243            permanent,
1244            AttributeId::DATA,
1245            size,
1246            FsverityState::None,
1247            options,
1248            false,
1249            &overwrite_ranges,
1250        );
1251        if let Some(descriptor) = fsverity_descriptor {
1252            data_object_handle
1253                .set_fsverity_state_some(descriptor)
1254                .await
1255                .context("Invalid or mismatched merkle tree")?;
1256        }
1257        Ok(data_object_handle)
1258    }
1259
1260    pub fn create_object_with_id<S: HandleOwner>(
1261        owner: &Arc<S>,
1262        transaction: &mut Transaction<'_>,
1263        reserved_object_id: ReservedId<'_>,
1264        options: HandleOptions,
1265        encryption_options: Option<ObjectEncryptionOptions>,
1266    ) -> Result<DataObjectHandle<S>, Error> {
1267        let store = owner.as_ref().as_ref();
1268        // Don't permit creating unencrypted objects in an encrypted store.  The converse is OK.
1269        debug_assert!(store.crypt().is_none() || encryption_options.is_some());
1270        let now = Timestamp::now();
1271        let object_id = reserved_object_id.get();
1272        assert!(
1273            transaction
1274                .add(
1275                    store.store_object_id(),
1276                    Mutation::insert_object(
1277                        ObjectKey::object(reserved_object_id.release()),
1278                        ObjectValue::file(
1279                            1,
1280                            0,
1281                            now.clone(),
1282                            now.clone(),
1283                            now.clone(),
1284                            now,
1285                            None,
1286                            None
1287                        ),
1288                    ),
1289                )
1290                .is_none()
1291        );
1292        let mut permanent_keys = false;
1293        if let Some(ObjectEncryptionOptions { permanent, key_id, key, unwrapped_key }) =
1294            encryption_options
1295        {
1296            permanent_keys = permanent;
1297            let cipher = key_to_cipher(&key, &unwrapped_key)?;
1298            transaction.add(
1299                store.store_object_id(),
1300                Mutation::insert_object(
1301                    ObjectKey::keys(object_id),
1302                    ObjectValue::keys(vec![(key_id, key)].into()),
1303                ),
1304            );
1305            store.key_manager.insert(
1306                object_id,
1307                Arc::new(vec![(key_id, CipherHolder::Cipher(cipher))].into()),
1308                permanent,
1309            );
1310        }
1311        transaction.add(
1312            store.store_object_id(),
1313            Mutation::insert_object(
1314                ObjectKey::attribute(object_id, AttributeId::DATA, AttributeKey::Attribute),
1315                // This is a new object so nothing has pre-allocated overwrite extents yet.
1316                ObjectValue::attribute(0, false),
1317            ),
1318        );
1319        Ok(DataObjectHandle::new(
1320            owner.clone(),
1321            object_id,
1322            permanent_keys,
1323            AttributeId::DATA,
1324            0,
1325            FsverityState::None,
1326            options,
1327            false,
1328            &[],
1329        ))
1330    }
1331
1332    /// Creates an object in the store.
1333    ///
1334    /// If the store is encrypted, the object will be automatically encrypted as well.
1335    /// If `wrapping_key_id` is set, the new keys will be wrapped with that specific key, and
1336    /// otherwise the default data key is used.
1337    pub async fn create_object<S: HandleOwner>(
1338        owner: &Arc<S>,
1339        mut transaction: &mut Transaction<'_>,
1340        options: HandleOptions,
1341        wrapping_key_id: Option<WrappingKeyId>,
1342    ) -> Result<DataObjectHandle<S>, Error> {
1343        let store = owner.as_ref().as_ref();
1344        let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
1345        let crypt = store.crypt();
1346        let encryption_options = if let Some(crypt) = crypt {
1347            let key_id =
1348                if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };
1349            let (key, unwrapped_key) = if let Some(wrapping_key_id) = wrapping_key_id {
1350                crypt.create_key_with_id(object_id.get(), wrapping_key_id, ObjectType::File).await?
1351            } else {
1352                let (fxfs_key, unwrapped_key) =
1353                    crypt.create_key(object_id.get(), KeyPurpose::Data).await?;
1354                (EncryptionKey::Fxfs(fxfs_key), unwrapped_key)
1355            };
1356            Some(ObjectEncryptionOptions { permanent: false, key_id, key, unwrapped_key })
1357        } else {
1358            None
1359        };
1360        ObjectStore::create_object_with_id(
1361            owner,
1362            &mut transaction,
1363            object_id,
1364            options,
1365            encryption_options,
1366        )
1367    }
1368
1369    /// Creates an object using explicitly provided keys.
1370    ///
1371    /// There are some cases where an encrypted object needs to be created in an unencrypted store.
1372    /// For example, when layer files for a child store are created in the root store, but they must
1373    /// be encrypted using the child store's keys.  This method exists for that purpose.
1374    pub(crate) async fn create_object_with_key<S: HandleOwner>(
1375        owner: &Arc<S>,
1376        mut transaction: &mut Transaction<'_>,
1377        object_id: ReservedId<'_>,
1378        options: HandleOptions,
1379        key: EncryptionKey,
1380        unwrapped_key: UnwrappedKey,
1381    ) -> Result<DataObjectHandle<S>, Error> {
1382        ObjectStore::create_object_with_id(
1383            owner,
1384            &mut transaction,
1385            object_id,
1386            options,
1387            Some(ObjectEncryptionOptions {
1388                permanent: true,
1389                key_id: VOLUME_DATA_KEY_ID,
1390                key,
1391                unwrapped_key,
1392            }),
1393        )
1394    }
1395
1396    /// Adjusts the reference count for a given object.  If the reference count reaches zero, the
1397    /// object is moved into the graveyard and true is returned.
1398    pub async fn adjust_refs(
1399        &self,
1400        transaction: &mut Transaction<'_>,
1401        object_id: u64,
1402        delta: i64,
1403    ) -> Result<bool, Error> {
1404        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1405        let refs = if let ObjectValue::Object {
1406            kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
1407            ..
1408        } = &mut mutation.item.value
1409        {
1410            *refs =
1411                refs.checked_add_signed(delta).ok_or_else(|| anyhow!("refs underflow/overflow"))?;
1412            refs
1413        } else {
1414            bail!(FxfsError::NotFile);
1415        };
1416        if *refs == 0 {
1417            self.add_to_graveyard(transaction, object_id);
1418
1419            // We might still need to adjust the reference count if delta was something other than
1420            // -1.
1421            if delta != -1 {
1422                *refs = 1;
1423                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1424            }
1425            // Otherwise, we don't commit the mutation as we want to keep reference count as 1 for
1426            // objects in graveyard.
1427            Ok(true)
1428        } else {
1429            transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1430            Ok(false)
1431        }
1432    }
1433
1434    // Purges an object that is in the graveyard.
1435    pub async fn tombstone_object(
1436        &self,
1437        object_id: u64,
1438        txn_options: Options<'_>,
1439    ) -> Result<(), Error> {
1440        debug_assert!(
1441            self.tree.find(&ObjectKey::object(object_id)).await?.is_some(),
1442            "Tombstoning missing object"
1443        );
1444        debug_assert!(
1445            self.tree
1446                .find(&ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id))
1447                .await?
1448                .is_some(),
1449            "Tombstoning object not in graveyard"
1450        );
1451        self.key_manager.remove(object_id).await;
1452        let fs = self.filesystem();
1453        let truncate_guard = fs.truncate_guard(self.store_object_id, object_id).await;
1454        self.trim_or_tombstone(object_id, true, txn_options, &truncate_guard).await
1455    }
1456
1457    /// Trim extents beyond the end of a file for all attributes.  This will remove the entry from
1458    /// the graveyard when done.
1459    pub async fn trim(
1460        &self,
1461        object_id: u64,
1462        truncate_guard: &TruncateGuard<'_>,
1463    ) -> Result<(), Error> {
1464        // For the root and root parent store, we would need to use the metadata reservation which
1465        // we don't currently support, so assert that we're not those stores.
1466        assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());
1467
1468        self.trim_or_tombstone(
1469            object_id,
1470            false,
1471            Options { borrow_metadata_space: true, ..Default::default() },
1472            truncate_guard,
1473        )
1474        .await
1475    }
1476
1477    /// Trims or tombstones an object.
1478    async fn trim_or_tombstone(
1479        &self,
1480        object_id: u64,
1481        for_tombstone: bool,
1482        txn_options: Options<'_>,
1483        _truncate_guard: &TruncateGuard<'_>,
1484    ) -> Result<(), Error> {
1485        let fs = self.filesystem();
1486        let mut next_attribute = Some(AttributeId::SORTED_START);
1487        while let Some(attribute_id) = next_attribute.take() {
1488            let mut transaction = fs
1489                .clone()
1490                .new_transaction(
1491                    lock_keys![
1492                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
1493                        LockKey::object(self.store_object_id, object_id),
1494                    ],
1495                    txn_options,
1496                )
1497                .await?;
1498
1499            match self
1500                .trim_some(
1501                    &mut transaction,
1502                    object_id,
1503                    attribute_id,
1504                    if for_tombstone {
1505                        TrimMode::Tombstone(TombstoneMode::Object)
1506                    } else {
1507                        TrimMode::UseSize
1508                    },
1509                )
1510                .await?
1511            {
1512                TrimResult::Incomplete => next_attribute = Some(attribute_id),
1513                TrimResult::Done(None) => {
1514                    if for_tombstone
1515                        || matches!(
1516                            self.tree
1517                                .find(&ObjectKey::graveyard_entry(
1518                                    self.graveyard_directory_object_id(),
1519                                    object_id,
1520                                ))
1521                                .await?,
1522                            Some(Item { value: ObjectValue::Trim, .. })
1523                        )
1524                    {
1525                        self.remove_from_graveyard(&mut transaction, object_id);
1526                    }
1527                    // The last attribute was not the default attribute, it may have been added to
1528                    // the graveyard alongside the object.
1529                    if for_tombstone && attribute_id != AttributeId::DATA {
1530                        self.remove_attribute_from_graveyard(
1531                            &mut transaction,
1532                            object_id,
1533                            attribute_id,
1534                        );
1535                    }
1536                }
1537                TrimResult::Done(id) => {
1538                    // Moved to the next attribute. This one is finished and it may have been
1539                    // added to the graveyard alongside the object.
1540                    if for_tombstone && attribute_id != AttributeId::DATA {
1541                        self.remove_attribute_from_graveyard(
1542                            &mut transaction,
1543                            object_id,
1544                            attribute_id,
1545                        );
1546                    }
1547                    next_attribute = id;
1548                }
1549            }
1550
1551            if !transaction.mutations().is_empty() {
1552                transaction.commit().await?;
1553            }
1554        }
1555        Ok(())
1556    }
1557
1558    // Purges an object's attribute that is in the graveyard.
1559    pub async fn tombstone_attribute(
1560        &self,
1561        object_id: u64,
1562        attribute_id: AttributeId,
1563        txn_options: Options<'_>,
1564    ) -> Result<(), Error> {
1565        // Ensure that we don't double-delete things, it should still exist and be in the graveyard.
1566        debug_assert!(
1567            self.tree
1568                .find(&ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute))
1569                .await?
1570                .is_some(),
1571            "Tombstoning missing attribute"
1572        );
1573        debug_assert!(
1574            self.tree
1575                .find(&ObjectKey::graveyard_attribute_entry(
1576                    self.graveyard_directory_object_id(),
1577                    object_id,
1578                    attribute_id
1579                ))
1580                .await?
1581                .is_some(),
1582            "Tombstoning attribute not in graveyard"
1583        );
1584        let fs = self.filesystem();
1585        let mut trim_result = TrimResult::Incomplete;
1586        while matches!(trim_result, TrimResult::Incomplete) {
1587            let mut transaction = fs
1588                .clone()
1589                .new_transaction(
1590                    lock_keys![
1591                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
1592                        LockKey::object(self.store_object_id, object_id),
1593                    ],
1594                    txn_options,
1595                )
1596                .await?;
1597            trim_result = self
1598                .trim_some(
1599                    &mut transaction,
1600                    object_id,
1601                    attribute_id,
1602                    TrimMode::Tombstone(TombstoneMode::Attribute),
1603                )
1604                .await?;
1605            if let TrimResult::Done(..) = trim_result {
1606                self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
1607            }
1608            if !transaction.mutations().is_empty() {
1609                transaction.commit().await?;
1610            }
1611        }
1612        Ok(())
1613    }
1614
1615    /// Deletes extents for attribute `attribute_id` in object `object_id`.  Also see the comments
1616    /// for TrimMode and TrimResult. Should hold a lock on the attribute, and the object as it
1617    /// performs a read-modify-write on the sizes.
1618    pub async fn trim_some(
1619        &self,
1620        transaction: &mut Transaction<'_>,
1621        object_id: u64,
1622        attribute_id: AttributeId,
1623        mode: TrimMode,
1624    ) -> Result<TrimResult, Error> {
1625        let layer_set = self.tree.layer_set();
1626        let mut merger = layer_set.merger();
1627
1628        let aligned_offset = match mode {
1629            TrimMode::FromOffset(offset) => {
1630                round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
1631            }
1632            TrimMode::Tombstone(..) => 0,
1633            TrimMode::UseSize => {
1634                let iter = merger
1635                    .query(Query::FullRange(&ObjectKey::attribute(
1636                        object_id,
1637                        attribute_id,
1638                        AttributeKey::Attribute,
1639                    )))
1640                    .await?;
1641                if let Some(item_ref) = iter.get() {
1642                    if item_ref.key.object_id != object_id {
1643                        return Ok(TrimResult::Done(None));
1644                    }
1645
1646                    if let ItemRef {
1647                        key:
1648                            ObjectKey {
1649                                data:
1650                                    ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
1651                                ..
1652                            },
1653                        value: ObjectValue::Attribute { size, .. },
1654                        ..
1655                    } = item_ref
1656                    {
1657                        // If we found a different attribute_id, return so we can get the
1658                        // right lock.
1659                        if *size_attribute_id != attribute_id {
1660                            return Ok(TrimResult::Done(Some(*size_attribute_id)));
1661                        }
1662                        round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
1663                    } else {
1664                        // At time of writing, we should always see a size record or None here, but
1665                        // asserting here would be brittle so just skip to the the next attribute
1666                        // instead.
1667                        return Ok(TrimResult::Done(Some(attribute_id.next())));
1668                    }
1669                } else {
1670                    // End of the tree.
1671                    return Ok(TrimResult::Done(None));
1672                }
1673            }
1674        };
1675
1676        // Loop over the extents and deallocate them.
1677        let mut iter = merger
1678            .query(Query::FullRange(&ObjectKey::from_extent(
1679                object_id,
1680                attribute_id,
1681                Extent::search_key_from_offset(aligned_offset),
1682            )))
1683            .await?;
1684        let mut end = 0;
1685        let allocator = self.allocator();
1686        let mut result = TrimResult::Done(None);
1687        let mut deallocated = 0;
1688        let block_size = self.block_size;
1689
1690        while let Some(item_ref) = iter.get() {
1691            if item_ref.key.object_id != object_id {
1692                break;
1693            }
1694            if let ObjectKey {
1695                data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
1696                ..
1697            } = item_ref.key
1698            {
1699                if *extent_attribute_id != attribute_id {
1700                    result = TrimResult::Done(Some(*extent_attribute_id));
1701                    break;
1702                }
1703                if let (
1704                    AttributeKey::Extent(extent),
1705                    ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1706                ) = (attribute_key, item_ref.value)
1707                {
1708                    let start = std::cmp::max(extent.start, aligned_offset);
1709                    ensure!(start < extent.end, FxfsError::Inconsistent);
1710                    let device_offset = device_offset
1711                        .checked_add(start - extent.start)
1712                        .ok_or(FxfsError::Inconsistent)?;
1713                    end = extent.end;
1714                    let len = end - start;
1715                    let device_range = device_offset..device_offset + len;
1716                    ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
1717                    allocator.deallocate(transaction, self.store_object_id, device_range).await?;
1718                    deallocated += len;
1719                    // Stop if the transaction is getting too big.
1720                    if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
1721                        result = TrimResult::Incomplete;
1722                        break;
1723                    }
1724                }
1725            }
1726            iter.advance().await?;
1727        }
1728
1729        let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
1730            && matches!(result, TrimResult::Done(None));
1731        let finished_tombstone_attribute =
1732            matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
1733                && !matches!(result, TrimResult::Incomplete);
1734        let mut object_mutation = None;
1735        let nodes = if finished_tombstone_object { -1 } else { 0 };
1736        if nodes != 0 || deallocated != 0 {
1737            let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1738            if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
1739                mutation.item.value
1740            {
1741                if let Some(project_id) = project_id {
1742                    transaction.add(
1743                        self.store_object_id,
1744                        Mutation::merge_object(
1745                            ObjectKey::project_usage(self.root_directory_object_id(), project_id),
1746                            ObjectValue::BytesAndNodes {
1747                                bytes: -i64::try_from(deallocated).unwrap(),
1748                                nodes,
1749                            },
1750                        ),
1751                    );
1752                }
1753                object_mutation = Some(mutation);
1754            } else {
1755                panic!("Inconsistent object type.");
1756            }
1757        }
1758
1759        // Deletion marker records *must* be merged so as to consume all other records for the
1760        // object.
1761        if finished_tombstone_object {
1762            transaction.add(
1763                self.store_object_id,
1764                Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
1765            );
1766        } else {
1767            if finished_tombstone_attribute {
1768                transaction.add(
1769                    self.store_object_id,
1770                    Mutation::merge_object(
1771                        ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
1772                        ObjectValue::None,
1773                    ),
1774                );
1775            }
1776            if deallocated > 0 {
1777                let mut mutation = match object_mutation {
1778                    Some(mutation) => mutation,
1779                    None => self.txn_get_object_mutation(transaction, object_id).await?,
1780                };
1781                transaction.add(
1782                    self.store_object_id,
1783                    Mutation::merge_object(
1784                        ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
1785                        ObjectValue::deleted_extent(),
1786                    ),
1787                );
1788                // Update allocated size.
1789                if let ObjectValue::Object {
1790                    attributes: ObjectAttributes { allocated_size, .. },
1791                    ..
1792                } = &mut mutation.item.value
1793                {
1794                    // The only way for these to fail are if the volume is inconsistent.
1795                    *allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
1796                        anyhow!(FxfsError::Inconsistent).context("Allocated size overflow")
1797                    })?;
1798                } else {
1799                    panic!("Unexpected object value");
1800                }
1801                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1802            }
1803        }
1804        Ok(result)
1805    }
1806
1807    /// Returns all objects that exist in the parent store that pertain to this object store.
1808    /// Note that this doesn't include the object_id of the store itself which is generally
1809    /// referenced externally.
1810    pub fn parent_objects(&self) -> Vec<u64> {
1811        assert!(self.store_info_handle.get().is_some());
1812        self.store_info.lock().as_ref().unwrap().parent_objects()
1813    }
1814
1815    /// Returns root objects for this store.
1816    pub fn root_objects(&self) -> Vec<u64> {
1817        let mut objects = Vec::new();
1818        let store_info = self.store_info.lock();
1819        let info = store_info.as_ref().unwrap();
1820        if info.root_directory_object_id != INVALID_OBJECT_ID {
1821            objects.push(info.root_directory_object_id);
1822        }
1823        if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
1824            objects.push(info.graveyard_directory_object_id);
1825        }
1826        if info.internal_directory_object_id != INVALID_OBJECT_ID {
1827            objects.push(info.internal_directory_object_id);
1828        }
1829        objects
1830    }
1831
1832    pub fn store_info(&self) -> Option<StoreInfo> {
1833        self.store_info.lock().as_ref().cloned()
1834    }
1835
1836    /// Returns None if called during journal replay.
1837    pub fn store_info_handle_object_id(&self) -> Option<u64> {
1838        self.store_info_handle.get().map(|h| h.object_id())
1839    }
1840
1841    pub fn graveyard_count(&self) -> u64 {
1842        self.graveyard_entries.load(Ordering::Relaxed)
1843    }
1844
1845    /// Called to open a store, before replay of this store's mutations.
1846    async fn open(
1847        parent_store: &Arc<ObjectStore>,
1848        store_object_id: u64,
1849        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
1850    ) -> Result<Arc<ObjectStore>, Error> {
1851        let handle =
1852            ObjectStore::open_object(parent_store, store_object_id, HandleOptions::default(), None)
1853                .await?;
1854
1855        let info = load_store_info(parent_store, store_object_id).await?;
1856        let is_encrypted = info.mutations_key.is_some();
1857
1858        let mut total_layer_size = 0;
1859        let last_object_id;
1860
1861        // TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.
1862
1863        // If the store is encrypted, we can't open the object tree layers now, but we need to
1864        // compute the size of the layers.
1865        if is_encrypted {
1866            for &oid in &info.layers {
1867                total_layer_size += parent_store.get_file_size(oid).await?;
1868            }
1869            if info.encrypted_mutations_object_id != INVALID_OBJECT_ID {
1870                total_layer_size += layer_size_from_encrypted_mutations_size(
1871                    parent_store.get_file_size(info.encrypted_mutations_object_id).await?,
1872                );
1873            }
1874            last_object_id = LastObjectId::Pending;
1875            ensure!(
1876                matches!(
1877                    info.last_object_id,
1878                    LastObjectIdInfo::Encrypted { .. } | LastObjectIdInfo::Low32Bit { .. }
1879                ),
1880                FxfsError::Inconsistent
1881            );
1882        } else {
1883            last_object_id = match info.last_object_id {
1884                LastObjectIdInfo::Unencrypted { id } => LastObjectId::Unencrypted { id },
1885                LastObjectIdInfo::Low32Bit => {
1886                    LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() }
1887                }
1888                _ => bail!(FxfsError::Inconsistent),
1889            };
1890        }
1891
1892        let fs = parent_store.filesystem();
1893
1894        let store = ObjectStore::new(
1895            Some(parent_store.clone()),
1896            store_object_id,
1897            fs.clone(),
1898            if is_encrypted { None } else { Some(info) },
1899            object_cache,
1900            None,
1901            if is_encrypted { LockState::Locked } else { LockState::Unencrypted },
1902            last_object_id,
1903        );
1904
1905        assert!(store.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");
1906
1907        if !is_encrypted {
1908            let object_tree_layer_object_ids =
1909                store.store_info.lock().as_ref().unwrap().layers.clone();
1910            let object_layers = store.open_layers(object_tree_layer_object_ids, None).await?;
1911            total_layer_size = object_layers.iter().map(|h| h.get_size()).sum();
1912            store
1913                .tree
1914                .append_layers(object_layers)
1915                .await
1916                .context("Failed to read object store layers")?;
1917        }
1918
1919        fs.object_manager().update_reservation(
1920            store_object_id,
1921            tree::reservation_amount_from_layer_size(total_layer_size),
1922        );
1923
1924        Ok(store)
1925    }
1926
1927    async fn load_store_info(&self) -> Result<StoreInfo, Error> {
1928        load_store_info_from_handle(self.store_info_handle.get().unwrap()).await
1929    }
1930
1931    async fn open_layers(
1932        &self,
1933        object_ids: impl std::iter::IntoIterator<Item = u64>,
1934        crypt: Option<Arc<dyn Crypt>>,
1935    ) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
1936        let parent_store = self.parent_store.as_ref().unwrap();
1937        let mut handles = Vec::new();
1938        for object_id in object_ids {
1939            let handle = ObjectStore::open_object(
1940                &parent_store,
1941                object_id,
1942                HandleOptions::default(),
1943                crypt.clone(),
1944            )
1945            .await
1946            .with_context(|| format!("Failed to open layer file {}", object_id))?;
1947            handles.push(handle);
1948        }
1949        Ok(handles)
1950    }
1951
1952    /// Unlocks a store so that it is ready to be used.
1953    /// This is not thread-safe.
1954    pub async fn unlock(
1955        self: &Arc<Self>,
1956        owner: Weak<dyn StoreOwner>,
1957        crypt: Arc<dyn Crypt>,
1958    ) -> Result<(), Error> {
1959        self.unlock_inner(owner, crypt, /*read_only=*/ false).await
1960    }
1961
1962    /// Unlocks a store so that it is ready to be read from.
1963    /// The store will generally behave like it is still locked: when flushed, the store will
1964    /// write out its mutations into the encrypted mutations file, rather than directly updating
1965    /// the layer files of the object store.
1966    /// Re-locking the store (which *must* be done with `Self::lock_read_only` will not trigger a
1967    /// flush, although the store might still be flushed during other operations.
1968    /// This is not thread-safe.
1969    pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
1970        self.unlock_inner(NO_OWNER, crypt, /*read_only=*/ true).await
1971    }
1972
1973    async fn unlock_inner(
1974        self: &Arc<Self>,
1975        owner: Weak<dyn StoreOwner>,
1976        crypt: Arc<dyn Crypt>,
1977        read_only: bool,
1978    ) -> Result<(), Error> {
1979        // Unless we are unlocking the store as read-only, the filesystem must not be read-only.
1980        assert!(read_only || !self.filesystem().options().read_only);
1981        match &*self.lock_state.lock() {
1982            LockState::Locked => {}
1983            LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
1984            LockState::Invalid | LockState::Deleted => bail!(FxfsError::Internal),
1985            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) => {
1986                bail!(FxfsError::AlreadyBound)
1987            }
1988            LockState::Unknown => panic!("Store was unlocked before replay"),
1989            LockState::Locking => panic!("Store is being locked"),
1990            LockState::Unlocking => panic!("Store is being unlocked"),
1991        }
1992        // We must lock flushing since that can modify store_info and the encrypted mutations file.
1993        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1994        let fs = self.filesystem();
1995        let guard = fs.lock_manager().write_lock(keys).await;
1996
1997        let store_info = self.load_store_info().await?;
1998
1999        self.tree
2000            .append_layers(
2001                self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
2002            )
2003            .await
2004            .context("Failed to read object tree layer file contents")?;
2005
2006        let wrapped_key =
2007            fxfs_crypto::WrappedKey::Fxfs(store_info.mutations_key.clone().unwrap().into());
2008        let unwrapped_key = crypt
2009            .unwrap_key(&wrapped_key, self.store_object_id)
2010            .await
2011            .context("Failed to unwrap mutations keys")?;
2012        // The ChaCha20 stream cipher we use supports up to 64 GiB.  By default we'll roll the key
2013        // after every 128 MiB.  Here we just need to pick a number that won't cause issues if it
2014        // wraps, so we just use u32::MAX (the offset is u64).
2015        ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
2016        let mut mutations_cipher =
2017            StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);
2018
2019        match &store_info.last_object_id {
2020            LastObjectIdInfo::Encrypted { id, key } => {
2021                let wrapped_key = fxfs_crypto::WrappedKey::Fxfs(key.clone().into());
2022                *self.last_object_id.lock() = LastObjectId::Encrypted {
2023                    id: *id,
2024                    cipher: Box::new(Ff1::new(
2025                        &crypt.unwrap_key(&wrapped_key, self.store_object_id).await?,
2026                    )),
2027                };
2028            }
2029            LastObjectIdInfo::Low32Bit => {
2030                *self.last_object_id.lock() = LastObjectId::Low32Bit {
2031                    reserved: Default::default(),
2032                    unreserved: Default::default(),
2033                }
2034            }
2035            _ => unreachable!(),
2036        }
2037
2038        // Apply the encrypted mutations.
2039        let mut mutations = {
2040            if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
2041                EncryptedMutations::default()
2042            } else {
2043                let parent_store = self.parent_store.as_ref().unwrap();
2044                let handle = ObjectStore::open_object(
2045                    &parent_store,
2046                    store_info.encrypted_mutations_object_id,
2047                    HandleOptions::default(),
2048                    None,
2049                )
2050                .await?;
2051                let mut cursor = std::io::Cursor::new(
2052                    handle
2053                        .contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
2054                        .await
2055                        .context(FxfsError::Inconsistent)?,
2056                );
2057                let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
2058                    .context("Failed to deserialize EncryptedMutations")?
2059                    .0;
2060                let len = cursor.get_ref().len() as u64;
2061                while cursor.position() < len {
2062                    mutations.extend(
2063                        &EncryptedMutations::deserialize_with_version(&mut cursor)
2064                            .context("Failed to deserialize EncryptedMutations")?
2065                            .0,
2066                    );
2067                }
2068                mutations
2069            }
2070        };
2071
2072        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
2073        let journaled = EncryptedMutations::from_replayed_mutations(
2074            self.store_object_id,
2075            fs.journal()
2076                .read_transactions_for_object(self.store_object_id)
2077                .await
2078                .context("Failed to read encrypted mutations from journal")?,
2079        );
2080        mutations.extend(&journaled);
2081
2082        let _ = std::mem::replace(&mut *self.lock_state.lock(), LockState::Unlocking);
2083        *self.store_info.lock() = Some(store_info);
2084
2085        // If we fail, clean up.
2086        let clean_up = scopeguard::guard((), |_| {
2087            *self.lock_state.lock() = LockState::Locked;
2088            *self.store_info.lock() = None;
2089            // Make sure we don't leave unencrypted data lying around in memory.
2090            self.tree.reset();
2091        });
2092
2093        let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;
2094
2095        let mut slice = &mut data[..];
2096        let mut last_offset = 0;
2097        for (offset, key) in mutations_key_roll {
2098            let split_offset = offset
2099                .checked_sub(last_offset)
2100                .ok_or(FxfsError::Inconsistent)
2101                .context("Invalid mutation key roll offset")?;
2102            last_offset = offset;
2103            ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
2104            let (old, new) = slice.split_at_mut(split_offset);
2105            mutations_cipher.decrypt(old);
2106            let unwrapped_key = crypt
2107                .unwrap_key(&fxfs_crypto::WrappedKey::Fxfs(key.into()), self.store_object_id)
2108                .await
2109                .context("Failed to unwrap mutations keys")?;
2110            mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
2111            slice = new;
2112        }
2113        mutations_cipher.decrypt(slice);
2114
2115        // Always roll the mutations key when we unlock which guarantees we won't reuse a
2116        // previous key and nonce.
2117        self.roll_mutations_key(crypt.as_ref()).await?;
2118
2119        let mut cursor = std::io::Cursor::new(data);
2120        for (checkpoint, count) in transactions {
2121            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
2122            for _ in 0..count {
2123                let mutation =
2124                    Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
2125                        .context("failed to deserialize encrypted mutation")?;
2126                self.apply_mutation(mutation, &context, AssocObj::None)
2127                    .context("failed to apply encrypted mutation")?;
2128            }
2129        }
2130
2131        *self.lock_state.lock() = if read_only {
2132            LockState::UnlockedReadOnly(crypt)
2133        } else {
2134            LockState::Unlocked { owner, crypt }
2135        };
2136
2137        // To avoid unbounded memory growth, we should flush the encrypted mutations now. Otherwise
2138        // it's possible for more writes to be queued and for the store to be locked before we can
2139        // flush anything and that can repeat.
2140        std::mem::drop(guard);
2141
2142        if !read_only && !self.filesystem().options().read_only {
2143            self.flush_with_reason(flush::Reason::Unlock).await?;
2144
2145            // Reap purged files within this store.
2146            let _ = self.filesystem().graveyard().initial_reap(&self).await?;
2147        }
2148
2149        // Return and cancel the clean up.
2150        Ok(ScopeGuard::into_inner(clean_up))
2151    }
2152
2153    pub fn is_locked(&self) -> bool {
2154        matches!(
2155            *self.lock_state.lock(),
2156            LockState::Locked | LockState::Locking | LockState::Unknown
2157        )
2158    }
2159
2160    /// NB: This is not the converse of `is_locked`, as there are lock states where neither are
2161    /// true.
2162    pub fn is_unlocked(&self) -> bool {
2163        matches!(
2164            *self.lock_state.lock(),
2165            LockState::Unlocked { .. } | LockState::UnlockedReadOnly { .. } | LockState::Unlocking
2166        )
2167    }
2168
2169    pub fn is_unknown(&self) -> bool {
2170        matches!(*self.lock_state.lock(), LockState::Unknown)
2171    }
2172
2173    pub fn is_encrypted(&self) -> bool {
2174        self.store_info.lock().as_ref().unwrap().mutations_key.is_some()
2175    }
2176
2177    // Locks a store.
2178    // This operation will take a flush lock on the store, in case any flushes are ongoing.  Any
2179    // ongoing store accesses might be interrupted by this.  See `Self::crypt`.
2180    // Whilst this can return an error, the store will be placed into an unusable but safe state
2181    // (i.e. no lingering unencrypted data) if an error is encountered.
2182    pub async fn lock(&self) -> Result<(), Error> {
2183        // We must lock flushing since it is not safe for that to be happening whilst we are locking
2184        // the store.
2185        let keys = lock_keys![LockKey::flush(self.store_object_id())];
2186        let fs = self.filesystem();
2187        let _guard = fs.lock_manager().write_lock(keys).await;
2188
2189        {
2190            let mut lock_state = self.lock_state.lock();
2191            if let LockState::Unlocked { .. } = &*lock_state {
2192                *lock_state = LockState::Locking;
2193            } else {
2194                panic!("Unexpected lock state: {:?}", *lock_state);
2195            }
2196        }
2197
2198        // Sync the journal now to ensure that any buffered mutations for this store make it out to
2199        // disk.  This is necessary to be able to unlock the store again.
2200        // We need to establish a barrier at this point (so that the journaled writes are observable
2201        // by any future attempts to unlock the store), hence the flush_device.
2202        let sync_result =
2203            self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;
2204
2205        *self.lock_state.lock() = if let Err(error) = &sync_result {
2206            error!(error:?; "Failed to sync journal; store will no longer be usable");
2207            LockState::Invalid
2208        } else {
2209            LockState::Locked
2210        };
2211        self.key_manager.clear();
2212        *self.store_info.lock() = None;
2213        self.tree.reset();
2214
2215        sync_result
2216    }
2217
2218    // Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`).  Data
2219    // is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
2220    // and will be replayed next time the store is unlocked.
2221    pub fn lock_read_only(&self) {
2222        *self.lock_state.lock() = LockState::Locked;
2223        *self.store_info.lock() = None;
2224        self.tree.reset();
2225    }
2226
2227    // Returns None if the object ID cipher needs to be created or rolled, or a more expensive
2228    // algorithm needs to be used.
2229    fn maybe_get_next_object_id(&self) -> Option<ReservedId<'_>> {
2230        self.last_object_id.lock().try_get_next().map(|id| ReservedId::new(self, id))
2231    }
2232
    /// Returns a new object ID that can be used.  This will create an object ID cipher if needed.
    ///
    /// If the object ID key needs to be rolled, a new transaction will be created and committed.
    /// This transaction does not take the filesystem lock, hence `txn_guard`.
    ///
    /// The function proceeds in three stages:
    ///
    ///   1. A fast path that simply asks `last_object_id` for the next ID.
    ///   2. If that fails, a transaction locking the store's object in the parent store is
    ///      created and the fast path is retried (something else may have raced).
    ///   3. Depending on the kind of `last_object_id`, either a random free 32-bit ID is searched
    ///      for (`Low32Bit`), or a new object ID key is created and persisted via `StoreInfo`
    ///      (`Encrypted`).
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::Inconsistent` if the fast path fails for an unencrypted store or if
    /// the high part of the object ID would overflow.  Errors from creating the transaction,
    /// creating the key, reading/writing `StoreInfo`, tree lookups and committing are propagated.
    ///
    /// # Panics
    ///
    /// Panics if there is no parent store, or (when rolling the key) if `crypt()` returns `None`
    /// or the in-memory `StoreInfo` is absent when the commit callback runs.
    pub(super) async fn get_next_object_id(
        &self,
        txn_guard: &TxnGuard<'_>,
    ) -> Result<ReservedId<'_>, Error> {
        // Stage 1: fast path.  The lock is scoped so that it is released before any await below.
        {
            let mut last_object_id = self.last_object_id.lock();
            if let Some(id) = last_object_id.try_get_next() {
                return Ok(ReservedId::new(self, id));
            }
            // The slow paths below only handle the `Encrypted` and `Low32Bit` cases, so failing
            // to get an ID from an unencrypted store is reported as an inconsistency.
            ensure!(
                !matches!(&*last_object_id, LastObjectId::Unencrypted { .. }),
                FxfsError::Inconsistent
            );
        }

        let parent_store = self.parent_store().unwrap();

        // Create a transaction (which has a lock) and then check again.
        //
        // NOTE: Since this can be a nested transaction, we must take care to avoid deadlocks; no
        // more locks should be taken whilst we hold this lock.
        let mut transaction = self
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(parent_store.store_object_id, self.store_object_id)],
                Options {
                    // We must skip journal checks because this transaction might be needed to
                    // compact.
                    skip_journal_checks: true,
                    borrow_metadata_space: true,
                    txn_guard: Some(txn_guard),
                    ..Default::default()
                },
            )
            .await?;

        // Only meaningful for the `Encrypted` case: the new high part of the object ID that will
        // be used once the key has been rolled.
        let mut next_id_hi = 0;

        // Stage 2: retry under the transaction lock and work out which slow path is required.
        let is_low_32_bit = {
            let mut last_object_id = self.last_object_id.lock();
            if let Some(id) = last_object_id.try_get_next() {
                // Something else raced and created/rolled the cipher.
                return Ok(ReservedId::new(self, id));
            }

            match &*last_object_id {
                LastObjectId::Encrypted { id, .. } => {
                    // It shouldn't be possible for last_object_id to wrap within our lifetime, so
                    // if this happens, it's most likely due to corruption.
                    next_id_hi =
                        id.checked_add(1 << 32).ok_or(FxfsError::Inconsistent)? & OBJECT_ID_HI_MASK;

                    info!(store_id = self.store_object_id; "Rolling object ID key");

                    false
                }
                LastObjectId::Low32Bit { .. } => true,
                // `Unencrypted` was rejected above; any other variant is treated as impossible
                // here.
                _ => unreachable!(),
            }
        };

        if is_low_32_bit {
            // Keep picking an object ID at random until we find one free.

            // To avoid races, this must be before we capture the layer set.
            self.last_object_id.lock().drain_unreserved();

            let layer_set = self.tree.layer_set();
            let mut key = ObjectKey::object(0);
            loop {
                let next_id = rand::rng().next_u32() as u64;
                // Zero cannot be represented by `NonZero`, so draw again.
                let Some(next_id) = NonZero::new(next_id) else { continue };
                // Skip IDs that are currently reserved.
                if self.last_object_id.lock().is_reserved(next_id.get()) {
                    continue;
                }
                key.object_id = next_id.get();
                // The candidate is only usable if no object record exists for it in the captured
                // layer set.
                if layer_set.key_exists(&key).await? == Existence::Missing {
                    self.last_object_id.lock().reserve(next_id.get());
                    return Ok(ReservedId::new(self, next_id));
                }
            }
        } else {
            // Create a key.
            let (object_id_wrapped, object_id_unwrapped) = self
                .crypt()
                .unwrap()
                .create_key(self.store_object_id, KeyPurpose::Metadata)
                .await?;

            // Normally we would use a mutation to note the updated key, but that would complicate
            // replay.  During replay, we need to keep track of the highest used object ID and this
            // is done by watching mutations to see when we create objects, and then decrypting
            // the object ID.  This relies on the unwrapped key being available, so as soon as
            // we detect the key has changed, we would need to immediately unwrap the key via the
            // crypt service.  Currently, this isn't easy to do during replay.  An option we could
            // consider would be to include the unencrypted object ID when we create objects, which
            // would avoid us having to decrypt the object ID during replay.
            //
            // For now and for historical reasons, the approach we take is to just write a new
            // version of StoreInfo here.  We must take care that we only update the key and not any
            // other information contained within StoreInfo because other information should only be
            // updated when we flush.  We are holding the lock on the StoreInfo file, so this will
            // prevent potential races with flushing.  To make sure we only change the key, we read
            // StoreInfo from storage rather than using our in-memory copy.  This won't be
            // performant, but rolling the object ID key will be extremely rare.
            let new_store_info = StoreInfo {
                last_object_id: LastObjectIdInfo::Encrypted {
                    id: next_id_hi,
                    key: object_id_wrapped.clone(),
                },
                ..self.load_store_info().await?
            };

            self.write_store_info(&mut transaction, &new_store_info).await?;

            // The in-memory state (StoreInfo copy, cipher and last ID) is switched over inside the
            // commit callback, and the first ID under the new key is produced there too.
            transaction
                .commit_with_callback(|_| {
                    self.store_info.lock().as_mut().unwrap().last_object_id =
                        new_store_info.last_object_id;
                    match &mut *self.last_object_id.lock() {
                        LastObjectId::Encrypted { id, cipher } => {
                            **cipher = Ff1::new(&object_id_unwrapped);
                            *id = next_id_hi;
                            // The returned ID combines the new high part with the encryption of
                            // low value 0 under the new cipher.
                            ReservedId::new(
                                self,
                                NonZero::new(next_id_hi | cipher.encrypt(0) as u64).unwrap(),
                            )
                        }
                        _ => unreachable!(),
                    }
                })
                .await
        }
    }
2371
    /// Query the next object ID that will be used. Intended for use when checking filesystem
    /// consistency. Prefer [`Self::get_next_object_id()`] for general use.
    ///
    /// This only peeks at `last_object_id` (via `peek_next`); unlike
    /// [`Self::get_next_object_id()`] it does not return a `ReservedId`.
    pub(crate) fn query_next_object_id(&self) -> u64 {
        self.last_object_id.lock().peek_next()
    }
2377
    /// Convenience accessor for the allocator of the filesystem this store belongs to.
    fn allocator(&self) -> Arc<Allocator> {
        self.filesystem().allocator()
    }
2381
2382    // If |transaction| has an impending mutation for the underlying object, returns that.
2383    // Otherwise, looks up the object from the tree and returns a suitable mutation for it.  The
2384    // mutation is returned here rather than the item because the mutation includes the operation
2385    // which has significance: inserting an object implies it's the first of its kind unlike
2386    // replacing an object.
2387    async fn txn_get_object_mutation(
2388        &self,
2389        transaction: &Transaction<'_>,
2390        object_id: u64,
2391    ) -> Result<ObjectStoreMutation, Error> {
2392        if let Some(mutation) =
2393            transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
2394        {
2395            Ok(mutation.clone())
2396        } else {
2397            Ok(ObjectStoreMutation {
2398                item: self
2399                    .tree
2400                    .find(&ObjectKey::object(object_id))
2401                    .await?
2402                    .ok_or(FxfsError::Inconsistent)
2403                    .context("Object id missing")?,
2404                op: Operation::ReplaceOrInsert,
2405            })
2406        }
2407    }
2408
    /// Like txn_get_object_mutation but with expanded visibility.
    /// Only available in migration code.
    ///
    /// This is a straight delegation: arguments, result and errors are exactly those of
    /// `txn_get_object_mutation`.
    #[cfg(feature = "migration")]
    pub async fn get_object_mutation(
        &self,
        transaction: &Transaction<'_>,
        object_id: u64,
    ) -> Result<ObjectStoreMutation, Error> {
        self.txn_get_object_mutation(transaction, object_id).await
    }
2419
2420    fn update_last_object_id(&self, object_id: u64) {
2421        let mut last_object_id = self.last_object_id.lock();
2422        match &mut *last_object_id {
2423            LastObjectId::Pending => unreachable!(),
2424            LastObjectId::Unencrypted { id } => {
2425                if object_id > *id {
2426                    *id = object_id
2427                }
2428            }
2429            LastObjectId::Encrypted { id, cipher } => {
2430                // For encrypted stores, object_id will be encrypted here, so we must decrypt first.
2431
2432                // If the object ID cipher has been rolled, then it's possible we might see object
2433                // IDs that were generated using a different cipher so the decrypt here will return
2434                // the wrong value, but that won't matter because the hi part of the object ID
2435                // should still discriminate.
2436                let object_id =
2437                    object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
2438                if object_id > *id {
2439                    *id = object_id;
2440                }
2441            }
2442            LastObjectId::Low32Bit { .. } => {}
2443        }
2444    }
2445
2446    /// If possible, converts the given object ID to its unencrypted value.  Returns None if it is
2447    /// not possible to convert to its unencrypted value because the key is unavailable.
2448    pub fn to_unencrypted_object_id(&self, object_id: u64) -> Option<u64> {
2449        let last_object_id = self.last_object_id.lock();
2450        match &*last_object_id {
2451            LastObjectId::Pending => None,
2452            LastObjectId::Unencrypted { .. } | LastObjectId::Low32Bit { .. } => Some(object_id),
2453            LastObjectId::Encrypted { id, cipher } => {
2454                if id & OBJECT_ID_HI_MASK != object_id & OBJECT_ID_HI_MASK {
2455                    None
2456                } else {
2457                    Some(object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64)
2458                }
2459            }
2460        }
2461    }
2462
2463    /// Adds the specified object to the graveyard.
2464    pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2465        let graveyard_id = self.graveyard_directory_object_id();
2466        assert_ne!(graveyard_id, INVALID_OBJECT_ID);
2467        transaction.add(
2468            self.store_object_id,
2469            Mutation::replace_or_insert_object(
2470                ObjectKey::graveyard_entry(graveyard_id, object_id),
2471                ObjectValue::Some,
2472            ),
2473        );
2474    }
2475
2476    /// Removes the specified object from the graveyard.  NB: Care should be taken when calling
2477    /// this because graveyard entries are used for purging deleted files *and* for trimming
2478    /// extents.  For example, consider the following sequence:
2479    ///
2480    ///     1. Add Trim graveyard entry.
2481    ///     2. Replace with Some graveyard entry (see above).
2482    ///     3. Remove graveyard entry.
2483    ///
2484    /// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
2485    /// actually be:
2486    ///
2487    ///     3. Replace with Trim graveyard entry.
2488    pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2489        transaction.add(
2490            self.store_object_id,
2491            Mutation::replace_or_insert_object(
2492                ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
2493                ObjectValue::None,
2494            ),
2495        );
2496    }
2497
2498    /// Removes the specified attribute from the graveyard. Unlike object graveyard entries,
2499    /// attribute graveyard entries only have one functionality (i.e. to purge deleted attributes)
2500    /// so the caller does not need to be concerned about replacing the graveyard attribute entry
2501    /// with its prior state when cancelling it. See comment on `remove_from_graveyard()`.
2502    pub fn remove_attribute_from_graveyard(
2503        &self,
2504        transaction: &mut Transaction<'_>,
2505        object_id: u64,
2506        attribute_id: AttributeId,
2507    ) {
2508        transaction.add(
2509            self.store_object_id,
2510            Mutation::replace_or_insert_object(
2511                ObjectKey::graveyard_attribute_entry(
2512                    self.graveyard_directory_object_id(),
2513                    object_id,
2514                    attribute_id,
2515                ),
2516                ObjectValue::None,
2517            ),
2518        );
2519    }
2520
2521    // Roll the mutations key.  The new key will be written for the next encrypted mutation.
2522    async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
2523        let (wrapped_key, unwrapped_key) =
2524            crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
2525
2526        // The mutations_cipher lock must be held for the duration so that mutations_cipher and
2527        // store_info are updated atomically.  Otherwise, write_mutation could find a new cipher but
2528        // end up writing the wrong wrapped key.
2529        let mut cipher = self.mutations_cipher.lock();
2530        *cipher = Some(StreamCipher::new(&unwrapped_key, 0));
2531        self.store_info.lock().as_mut().unwrap().mutations_key = Some(wrapped_key);
2532        // mutations_cipher_offset is updated by flush.
2533        Ok(())
2534    }
2535
2536    // When the symlink is unlocked, this function decrypts `link` and returns a bag of bytes that
2537    // is identical to that which was passed in as the target on `create_symlink`.
2538    // If the symlink is locked, this function hashes the encrypted `link` with Sha256 in order to
2539    // get a standard length and then base64 encodes the hash and returns that to the caller.
2540    pub async fn read_encrypted_symlink(
2541        &self,
2542        object_id: u64,
2543        link: Vec<u8>,
2544    ) -> Result<Vec<u8>, Error> {
2545        let mut link = link;
2546        let key = self
2547            .key_manager()
2548            .get_fscrypt_key(object_id, self.crypt().unwrap().as_ref(), async || {
2549                self.get_keys(object_id).await
2550            })
2551            .await?;
2552        if let Some(key) = key.into_cipher() {
2553            key.decrypt_symlink(object_id, &mut link)?;
2554            Ok(link)
2555        } else {
2556            // Locked symlinks are encoded using a hash_code of 0.
2557            let proxy_filename =
2558                fscrypt::proxy_filename::ProxyFilename::new_with_hash_code(0, &link);
2559            let proxy_filename_str: String = proxy_filename.into();
2560            Ok(proxy_filename_str.into_bytes())
2561        }
2562    }
2563
2564    /// Returns the link of a symlink object.
2565    pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
2566        match self.tree.find(&ObjectKey::object(object_id)).await? {
2567            None => bail!(FxfsError::NotFound),
2568            Some(Item {
2569                value: ObjectValue::Object { kind: ObjectKind::EncryptedSymlink { link, .. }, .. },
2570                ..
2571            }) => self.read_encrypted_symlink(object_id, link.to_vec()).await,
2572            Some(Item {
2573                value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
2574                ..
2575            }) => Ok(link.to_vec()),
2576            Some(item) => Err(anyhow!(FxfsError::Inconsistent)
2577                .context(format!("Unexpected item in lookup: {item:?}"))),
2578        }
2579    }
2580
2581    /// Retrieves the wrapped keys for the given object.  The keys *should* be known to exist and it
2582    /// will be considered an inconsistency if they don't.
2583    pub async fn get_keys(&self, object_id: u64) -> Result<EncryptionKeys, Error> {
2584        match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::Inconsistent)? {
2585            Item { value: ObjectValue::Keys(keys), .. } => Ok(keys),
2586            _ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
2587        }
2588    }
2589
2590    pub async fn update_attributes<'a>(
2591        &self,
2592        transaction: &mut Transaction<'a>,
2593        object_id: u64,
2594        node_attributes: Option<&fio::MutableNodeAttributes>,
2595        change_time: Option<Timestamp>,
2596    ) -> Result<(), Error> {
2597        if change_time.is_none() {
2598            if let Some(attributes) = node_attributes {
2599                let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
2600                if *attributes == empty_attributes {
2601                    return Ok(());
2602                }
2603            } else {
2604                return Ok(());
2605            }
2606        }
2607        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
2608        if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
2609            if let Some(time) = change_time {
2610                attributes.change_time = time;
2611            }
2612            if let Some(node_attributes) = node_attributes {
2613                if let Some(time) = node_attributes.creation_time {
2614                    attributes.creation_time = Timestamp::from_nanos(time);
2615                }
2616                if let Some(time) = node_attributes.modification_time {
2617                    attributes.modification_time = Timestamp::from_nanos(time);
2618                }
2619                if let Some(time) = node_attributes.access_time {
2620                    attributes.access_time = Timestamp::from_nanos(time);
2621                }
2622                if node_attributes.mode.is_some()
2623                    || node_attributes.uid.is_some()
2624                    || node_attributes.gid.is_some()
2625                    || node_attributes.rdev.is_some()
2626                {
2627                    if let Some(a) = &mut attributes.posix_attributes {
2628                        if let Some(mode) = node_attributes.mode {
2629                            a.mode = mode;
2630                        }
2631                        if let Some(uid) = node_attributes.uid {
2632                            a.uid = uid;
2633                        }
2634                        if let Some(gid) = node_attributes.gid {
2635                            a.gid = gid;
2636                        }
2637                        if let Some(rdev) = node_attributes.rdev {
2638                            a.rdev = rdev;
2639                        }
2640                    } else {
2641                        attributes.posix_attributes = Some(PosixAttributes {
2642                            mode: node_attributes.mode.unwrap_or_default(),
2643                            uid: node_attributes.uid.unwrap_or_default(),
2644                            gid: node_attributes.gid.unwrap_or_default(),
2645                            rdev: node_attributes.rdev.unwrap_or_default(),
2646                        });
2647                    }
2648                }
2649            }
2650        } else {
2651            bail!(
2652                anyhow!(FxfsError::Inconsistent)
2653                    .context("ObjectStore.update_attributes: Expected object value")
2654            );
2655        };
2656        transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
2657        Ok(())
2658    }
2659
2660    // Updates and commits the changes to access time in ObjectProperties. The update matches
2661    // Linux's RELATIME. That is, access time is updated to the current time if access time is less
2662    // than or equal to the last modification or status change, or if it has been more than a day
2663    // since the last access.  `precondition` is a condition to be checked *after* taking the lock
2664    // on the object.  If `precondition` returns false, no update will be performed.
2665    pub async fn update_access_time(
2666        &self,
2667        object_id: u64,
2668        props: &mut ObjectProperties,
2669        precondition: impl FnOnce() -> bool,
2670    ) -> Result<(), Error> {
2671        let access_time = props.access_time.as_nanos();
2672        let modification_time = props.modification_time.as_nanos();
2673        let change_time = props.change_time.as_nanos();
2674        let now = Timestamp::now();
2675        if access_time <= modification_time
2676            || access_time <= change_time
2677            || access_time
2678                < now.as_nanos()
2679                    - Timestamp::from(std::time::Duration::from_secs(24 * 60 * 60)).as_nanos()
2680        {
2681            let mut transaction = self
2682                .filesystem()
2683                .clone()
2684                .new_transaction(
2685                    lock_keys![LockKey::object(self.store_object_id, object_id,)],
2686                    Options { borrow_metadata_space: true, ..Default::default() },
2687                )
2688                .await?;
2689            if precondition() {
2690                self.update_attributes(
2691                    &mut transaction,
2692                    object_id,
2693                    Some(&fio::MutableNodeAttributes {
2694                        access_time: Some(now.as_nanos()),
2695                        ..Default::default()
2696                    }),
2697                    None,
2698                )
2699                .await?;
2700                transaction.commit().await?;
2701                props.access_time = now;
2702            }
2703        }
2704        Ok(())
2705    }
2706
2707    async fn write_store_info<'a>(
2708        &'a self,
2709        transaction: &mut Transaction<'a>,
2710        info: &StoreInfo,
2711    ) -> Result<(), Error> {
2712        let mut serialized_info = Vec::new();
2713        info.serialize_with_version(&mut serialized_info)?;
2714        let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
2715        buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
2716        self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
2717    }
2718
    /// Marks this store as deleted by setting its lock state to `LockState::Deleted`.
    pub fn mark_deleted(&self) {
        *self.lock_state.lock() = LockState::Deleted;
    }
2722
    /// Test-only helper that overwrites the last object ID of an encrypted store.
    ///
    /// # Panics
    ///
    /// Panics if `last_object_id` is not in the `Encrypted` state.
    #[cfg(test)]
    pub(crate) fn test_set_last_object_id(&self, object_id: u64) {
        match &mut *self.last_object_id.lock() {
            LastObjectId::Encrypted { id, .. } => *id = object_id,
            _ => unreachable!(),
        }
    }
2730
2731    /// Looks up the size of the attribute. Returns an error if either the object or attribute
2732    /// doesn't exist.
2733    pub async fn get_attribute_size(
2734        &self,
2735        object_id: u64,
2736        attribute_id: AttributeId,
2737    ) -> Result<u64, Error> {
2738        let item = self
2739            .tree
2740            .find(&ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute))
2741            .await?
2742            .ok_or(FxfsError::NotFound)?;
2743        let size = match item.value {
2744            ObjectValue::Attribute { size, .. } => size,
2745            ObjectValue::VerifiedAttribute { size, .. } => size,
2746            _ => bail!(FxfsError::Inconsistent),
2747        };
2748        Ok(size)
2749    }
2750}
2751
2752#[async_trait]
2753impl JournalingObject for ObjectStore {
2754    fn apply_mutation(
2755        &self,
2756        mutation: Mutation,
2757        context: &ApplyContext<'_, '_>,
2758        _assoc_obj: AssocObj<'_>,
2759    ) -> Result<(), Error> {
2760        match &*self.lock_state.lock() {
2761            LockState::Locked | LockState::Locking => {
2762                ensure!(
2763                    matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
2764                        || matches!(
2765                            mutation,
2766                            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_)
2767                                if context.mode.is_replay()
2768                        ),
2769                    anyhow!(FxfsError::Inconsistent)
2770                        .context(format!("Unexpected mutation for encrypted store: {mutation:?}"))
2771                );
2772            }
2773            LockState::Invalid
2774            | LockState::Unlocking
2775            | LockState::Unencrypted
2776            | LockState::Unlocked { .. }
2777            | LockState::UnlockedReadOnly(..)
2778            | LockState::Deleted => {}
2779            lock_state @ _ => panic!("Unexpected lock state: {lock_state:?}"),
2780        }
2781        match mutation {
2782            Mutation::ObjectStore(ObjectStoreMutation { item, op }) => {
2783                match op {
2784                    Operation::Insert => {
2785                        let mut unreserve_id = INVALID_OBJECT_ID;
2786                        // If we are inserting an object record for the first time, it signifies the
2787                        // birth of the object so we need to adjust the object count.
2788                        if matches!(item.value, ObjectValue::Object { .. }) {
2789                            {
2790                                let info = &mut self.store_info.lock();
2791                                let object_count = &mut info.as_mut().unwrap().object_count;
2792                                *object_count = object_count.saturating_add(1);
2793                            }
2794                            if context.mode.is_replay() {
2795                                self.update_last_object_id(item.key.object_id);
2796                            } else {
2797                                unreserve_id = item.key.object_id;
2798                            }
2799                        } else if !context.mode.is_replay()
2800                            && matches!(
2801                                item.key.data,
2802                                ObjectKeyData::GraveyardEntry { .. }
2803                                    | ObjectKeyData::GraveyardAttributeEntry { .. }
2804                            )
2805                        {
2806                            if matches!(item.value, ObjectValue::Some | ObjectValue::Trim) {
2807                                self.graveyard_entries.fetch_add(1, Ordering::Relaxed);
2808                            } else if matches!(item.value, ObjectValue::None) {
2809                                self.graveyard_entries.fetch_sub(1, Ordering::Relaxed);
2810                            }
2811                        }
2812                        self.tree.insert(item)?;
2813                        if unreserve_id != INVALID_OBJECT_ID {
2814                            // To avoid races, this *must* be after the `tree.insert(..)` above.
2815                            self.last_object_id.lock().unreserve(unreserve_id);
2816                        }
2817                    }
2818                    Operation::ReplaceOrInsert => {
2819                        if !context.mode.is_replay()
2820                            && matches!(
2821                                item.key.data,
2822                                ObjectKeyData::GraveyardEntry { .. }
2823                                    | ObjectKeyData::GraveyardAttributeEntry { .. }
2824                            )
2825                        {
2826                            if matches!(item.value, ObjectValue::Some | ObjectValue::Trim) {
2827                                self.graveyard_entries.fetch_add(1, Ordering::Relaxed);
2828                            } else if matches!(item.value, ObjectValue::None) {
2829                                self.graveyard_entries.fetch_sub(1, Ordering::Relaxed);
2830                            }
2831                        }
2832                        self.tree.replace_or_insert(item);
2833                    }
2834                    Operation::Merge => {
2835                        if item.is_tombstone() {
2836                            let info = &mut self.store_info.lock();
2837                            let object_count = &mut info.as_mut().unwrap().object_count;
2838                            *object_count = object_count.saturating_sub(1);
2839                        }
2840                        if !context.mode.is_replay()
2841                            && matches!(
2842                                item.key.data,
2843                                ObjectKeyData::GraveyardEntry { .. }
2844                                    | ObjectKeyData::GraveyardAttributeEntry { .. }
2845                            )
2846                        {
2847                            if matches!(item.value, ObjectValue::Some | ObjectValue::Trim) {
2848                                self.graveyard_entries.fetch_add(1, Ordering::Relaxed);
2849                            } else if matches!(item.value, ObjectValue::None) {
2850                                self.graveyard_entries.fetch_sub(1, Ordering::Relaxed);
2851                            }
2852                        }
2853                        let lower_bound = item.key.key_for_merge_into();
2854                        self.tree.merge_into(item, &lower_bound);
2855                    }
2856                }
2857            }
2858            Mutation::BeginFlush => {
2859                ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
2860                self.tree.seal();
2861            }
2862            Mutation::EndFlush => ensure!(self.parent_store.is_some(), FxfsError::Inconsistent),
2863            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2864                // We will process these during Self::unlock.
2865                ensure!(
2866                    !matches!(&*self.lock_state.lock(), LockState::Unencrypted),
2867                    FxfsError::Inconsistent
2868                );
2869            }
2870            Mutation::CreateInternalDir(object_id) => {
2871                ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
2872                self.store_info.lock().as_mut().unwrap().internal_directory_object_id = object_id;
2873            }
2874            _ => bail!("unexpected mutation: {:?}", mutation),
2875        }
2876        self.counters.lock().mutations_applied += 1;
2877        Ok(())
2878    }
2879
2880    fn drop_mutation(&self, mutation: Mutation, _transaction: &Transaction<'_>) {
2881        self.counters.lock().mutations_dropped += 1;
2882        if let Mutation::ObjectStore(ObjectStoreMutation {
2883            item: Item { key: ObjectKey { object_id, .. }, value: ObjectValue::Object { .. }, .. },
2884            op: Operation::Insert,
2885        }) = mutation
2886        {
2887            self.last_object_id.lock().unreserve(object_id);
2888        }
2889    }
2890
2891    /// Push all in-memory structures to the device. This is not necessary for sync since the
2892    /// journal will take care of it.  This is supposed to be called when there is either memory or
2893    /// space pressure (flushing the store will persist in-memory data and allow the journal file to
2894    /// be trimmed).
2895    ///
2896    /// Also returns the earliest version of a struct in the filesystem (when known).
2897    async fn flush(&self) -> Result<Version, Error> {
2898        self.flush_with_reason(flush::Reason::Journal).await
2899    }
2900
2901    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
2902        // Intentionally enumerating all variants to force a decision on any new variants. Encrypt
2903        // all mutations that could affect an encrypted object store contents or the `StoreInfo` of
2904        // the encrypted object store. During `unlock()` any mutations which haven't been encrypted
2905        // won't be replayed after reading `StoreInfo`.
2906        match mutation {
2907            // Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted, we
2908            // still choose to encrypt the mutation because it makes it easier to deal with replay.
2909            // When we replay mutations for an encrypted store, the only thing we keep in memory are
2910            // the encrypted mutations; we don't keep `StoreInfo` or changes to it in memory. So, by
2911            // encrypting the CreateInternalDir mutation here, it means we don't have to track both
2912            // encrypted mutations bound for the LSM tree and unencrypted mutations for `StoreInfo`
2913            // to use in `unlock()`. It'll just bundle CreateInternalDir mutations with the other
2914            // encrypted mutations and handled them all in sequence during `unlock()`.
2915            Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
2916                let mut cipher = self.mutations_cipher.lock();
2917                if let Some(cipher) = cipher.as_mut() {
2918                    // If this is the first time we've used this key, we must write the key out.
2919                    if cipher.offset() == 0 {
2920                        writer.write(Mutation::update_mutations_key(
2921                            self.store_info
2922                                .lock()
2923                                .as_ref()
2924                                .unwrap()
2925                                .mutations_key
2926                                .as_ref()
2927                                .unwrap()
2928                                .clone(),
2929                        ));
2930                    }
2931                    let mut buffer = Vec::new();
2932                    mutation.serialize_into(&mut buffer).unwrap();
2933                    cipher.encrypt(&mut buffer);
2934                    writer.write(Mutation::EncryptedObjectStore(buffer.into()));
2935                    return;
2936                }
2937            }
2938            // `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
2939            // encrypted object stores, but are either the encrypted mutation data itself or
2940            // metadata governing how the data will be encrypted. They should only be produced here.
2941            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2942                debug_assert!(false, "Only this method should generate encrypted mutations");
2943            }
2944            // `BeginFlush` and `EndFlush` are not needed during `unlock()` and are needed during
2945            // the initial journal replay, so should not be encrypted. `Allocator`, `DeleteVolume`,
2946            // `UpdateBorrowed` mutations are never associated with an encrypted store as we do not
2947            // encrypt the allocator or root/root-parent stores so we can avoid the locking.
2948            Mutation::Allocator(_)
2949            | Mutation::BeginFlush
2950            | Mutation::EndFlush
2951            | Mutation::DeleteVolume
2952            | Mutation::UpdateBorrowed(_) => {}
2953        }
2954        writer.write(mutation.clone());
2955    }
2956}
2957
2958impl Drop for ObjectStore {
2959    fn drop(&mut self) {
2960        let mut last_object_id = self.last_object_id.lock();
2961        last_object_id.drain_unreserved();
2962        match &*last_object_id {
2963            LastObjectId::Low32Bit { reserved, .. } => debug_assert!(reserved.is_empty()),
2964            _ => {}
2965        }
2966    }
2967}
2968
// `ObjectStore` implements `HandleOwner` without overriding any of the trait's items.
impl HandleOwner for ObjectStore {}
2970
2971impl AsRef<ObjectStore> for ObjectStore {
2972    fn as_ref(&self) -> &ObjectStore {
2973        self
2974    }
2975}
2976
/// Returns a worst-case estimate of the layer-file size needed to hold `size` bytes of encrypted
/// mutations.
///
/// The estimate is three times `size`, saturating at `u64::MAX` so that an extreme input can
/// neither panic (debug builds) nor wrap around to a misleadingly small value (release builds).
fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
    // This is similar to reserved_space_from_journal_usage. It needs to be a worst case estimate of
    // the amount of metadata space that might need to be reserved to allow the encrypted mutations
    // to be written to layer files.  It needs to be at least what
    // reservation_amount_from_layer_size will return once the data has been written to layer
    // files, and at most what reserved_space_from_journal_usage would use.  We can't just use
    // reserved_space_from_journal_usage because the encrypted mutations file includes some extra
    // data (it includes the checkpoints) that isn't written in the same way to the journal.
    size.saturating_mul(3)
}
2987
// `ObjectStore` implements `AssociatedObject` without overriding any of the trait's items.
impl AssociatedObject for ObjectStore {}
2989
/// Argument to the trim_some method; selects what should be trimmed.
#[derive(Debug)]
pub enum TrimMode {
    /// Trim extents beyond the current size.
    UseSize,

    /// Trim extents beyond the supplied offset.
    FromOffset(u64),

    /// Remove the object (or attribute) from the store once it is fully trimmed.  The
    /// `TombstoneMode` selects whether that applies to the object or to the attribute.
    Tombstone(TombstoneMode),
}
3002
/// Sets the mode for tombstoning (either at the object or attribute level).
#[derive(Debug)]
pub enum TombstoneMode {
    /// Tombstone at the object level.
    Object,
    /// Tombstone at the attribute level.
    Attribute,
}
3009
/// Result of the trim_some method.
#[derive(Debug)]
pub enum TrimResult {
    /// We reached the limit of the transaction and more extents might follow.
    Incomplete,

    /// We finished this attribute.  Returns the ID of the next attribute for the same object if
    /// there is one (`None` otherwise).
    Done(Option<AttributeId>),
}
3020
3021/// Loads store info.
3022pub async fn load_store_info(
3023    parent: &Arc<ObjectStore>,
3024    store_object_id: u64,
3025) -> Result<StoreInfo, Error> {
3026    load_store_info_from_handle(
3027        &ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?,
3028    )
3029    .await
3030}
3031
3032async fn load_store_info_from_handle(
3033    handle: &DataObjectHandle<impl HandleOwner>,
3034) -> Result<StoreInfo, Error> {
3035    Ok(if handle.get_size() > 0 {
3036        let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
3037        let mut cursor = std::io::Cursor::new(serialized_info);
3038        let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
3039            .context("Failed to deserialize StoreInfo")?;
3040        store_info
3041    } else {
3042        // The store_info will be absent for a newly created and empty object store.
3043        StoreInfo::default()
3044    })
3045}
3046
3047#[cfg(test)]
3048mod tests {
3049    use super::{
3050        AttributeId, FsverityMetadata, HandleOptions, LastObjectId, LastObjectIdInfo, LockKey,
3051        MAX_STORE_INFO_SERIALIZED_SIZE, Mutation, NO_OWNER, NewChildStoreOptions,
3052        OBJECT_ID_HI_MASK, ObjectStore, RootDigest, StoreInfo, StoreOptions, StoreOwner,
3053    };
3054    use crate::errors::FxfsError;
3055    use crate::filesystem::{FxFilesystem, JournalingObject, OpenFxFilesystem};
3056    use crate::fsck::{fsck, fsck_volume};
3057    use crate::lsm_tree::Query;
3058    use crate::lsm_tree::types::{ItemRef, LayerIterator};
3059    use crate::object_handle::{
3060        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
3061    };
3062    use crate::object_store::directory::Directory;
3063    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue};
3064    use crate::object_store::transaction::{Options, lock_keys};
3065    use crate::object_store::volume::root_volume;
3066    use crate::serialized_types::VersionedLatest;
3067    use crate::testing;
3068    use assert_matches::assert_matches;
3069    use async_trait::async_trait;
3070    use fuchsia_async as fasync;
3071    use fuchsia_sync::Mutex;
3072    use futures::join;
3073    use fxfs_crypto::ff1::Ff1;
3074    use fxfs_crypto::{
3075        Crypt, FXFS_KEY_SIZE, FXFS_WRAPPED_KEY_SIZE, FxfsKey, UnwrappedKey, WrappedKeyBytes,
3076    };
3077    use fxfs_insecure_crypto::new_insecure_crypt;
3078    use std::sync::Arc;
3079    use std::time::Duration;
3080    use storage_device::DeviceHolder;
3081    use storage_device::fake_device::FakeDevice;
3082    use test_case::test_case;
3083
3084    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
3085
3086    async fn test_filesystem() -> OpenFxFilesystem {
3087        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
3088        FxFilesystem::new_empty(device).await.expect("new_empty failed")
3089    }
3090
3091    #[fuchsia::test]
3092    async fn test_verified_file_with_verified_attribute() {
3093        let fs: OpenFxFilesystem = test_filesystem().await;
3094        let mut transaction = fs
3095            .clone()
3096            .new_transaction(lock_keys![], Options::default())
3097            .await
3098            .expect("new_transaction failed");
3099        let store = fs.root_store();
3100        let object = Arc::new(
3101            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3102                .await
3103                .expect("create_object failed"),
3104        );
3105
3106        transaction.add(
3107            store.store_object_id(),
3108            Mutation::replace_or_insert_object(
3109                ObjectKey::attribute(
3110                    object.object_id(),
3111                    AttributeId::DATA,
3112                    AttributeKey::Attribute,
3113                ),
3114                ObjectValue::verified_attribute(
3115                    0,
3116                    FsverityMetadata::Internal(RootDigest::Sha256([0; 32]), vec![]),
3117                ),
3118            ),
3119        );
3120
3121        transaction.add(
3122            store.store_object_id(),
3123            Mutation::replace_or_insert_object(
3124                ObjectKey::attribute(
3125                    object.object_id(),
3126                    AttributeId::FSVERITY_MERKLE,
3127                    AttributeKey::Attribute,
3128                ),
3129                ObjectValue::attribute(0, false),
3130            ),
3131        );
3132
3133        transaction.commit().await.unwrap();
3134
3135        let handle =
3136            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
3137                .await
3138                .expect("open_object failed");
3139
3140        assert!(handle.is_verified_file());
3141
3142        fs.close().await.expect("Close failed");
3143    }
3144
3145    #[fuchsia::test]
3146    async fn test_verified_file_without_verified_attribute() {
3147        let fs: OpenFxFilesystem = test_filesystem().await;
3148        let mut transaction = fs
3149            .clone()
3150            .new_transaction(lock_keys![], Options::default())
3151            .await
3152            .expect("new_transaction failed");
3153        let store = fs.root_store();
3154        let object = Arc::new(
3155            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3156                .await
3157                .expect("create_object failed"),
3158        );
3159
3160        transaction.commit().await.unwrap();
3161
3162        let handle =
3163            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
3164                .await
3165                .expect("open_object failed");
3166
3167        assert!(!handle.is_verified_file());
3168
3169        fs.close().await.expect("Close failed");
3170    }
3171
3172    #[fuchsia::test]
3173    async fn test_create_and_open_store() {
3174        let fs = test_filesystem().await;
3175        let store_id = {
3176            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3177            root_volume
3178                .new_volume(
3179                    "test",
3180                    NewChildStoreOptions {
3181                        options: StoreOptions {
3182                            owner: NO_OWNER,
3183                            crypt: Some(Arc::new(new_insecure_crypt())),
3184                        },
3185                        ..Default::default()
3186                    },
3187                )
3188                .await
3189                .expect("new_volume failed")
3190                .store_object_id()
3191        };
3192
3193        fs.close().await.expect("close failed");
3194        let device = fs.take_device().await;
3195        device.reopen(false);
3196        let fs = FxFilesystem::open(device).await.expect("open failed");
3197
3198        {
3199            let store = fs.object_manager().store(store_id).expect("store not found");
3200            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
3201        }
3202        fs.close().await.expect("Close failed");
3203    }
3204
3205    #[fuchsia::test]
3206    async fn test_create_and_open_internal_dir() {
3207        let fs = test_filesystem().await;
3208        let dir_id;
3209        let store_id;
3210        {
3211            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3212            let store = root_volume
3213                .new_volume(
3214                    "test",
3215                    NewChildStoreOptions {
3216                        options: StoreOptions {
3217                            owner: NO_OWNER,
3218                            crypt: Some(Arc::new(new_insecure_crypt())),
3219                        },
3220                        ..Default::default()
3221                    },
3222                )
3223                .await
3224                .expect("new_volume failed");
3225            dir_id =
3226                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
3227            store_id = store.store_object_id();
3228        }
3229
3230        fs.close().await.expect("close failed");
3231        let device = fs.take_device().await;
3232        device.reopen(false);
3233        let fs = FxFilesystem::open(device).await.expect("open failed");
3234
3235        {
3236            let store = fs.object_manager().store(store_id).expect("store not found");
3237            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
3238            assert_eq!(
3239                dir_id,
3240                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
3241            );
3242            let obj = store
3243                .tree()
3244                .find(&ObjectKey::object(dir_id))
3245                .await
3246                .expect("Searching tree for dir")
3247                .unwrap();
3248            assert_matches!(
3249                obj.value,
3250                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
3251            );
3252        }
3253        fs.close().await.expect("Close failed");
3254    }
3255
3256    #[fuchsia::test]
3257    async fn test_create_and_open_internal_dir_unencrypted() {
3258        let fs = test_filesystem().await;
3259        let dir_id;
3260        let store_id;
3261        {
3262            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3263            let store = root_volume
3264                .new_volume("test", NewChildStoreOptions::default())
3265                .await
3266                .expect("new_volume failed");
3267            dir_id =
3268                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
3269            store_id = store.store_object_id();
3270        }
3271
3272        fs.close().await.expect("close failed");
3273        let device = fs.take_device().await;
3274        device.reopen(false);
3275        let fs = FxFilesystem::open(device).await.expect("open failed");
3276
3277        {
3278            let store = fs.object_manager().store(store_id).expect("store not found");
3279            assert_eq!(
3280                dir_id,
3281                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
3282            );
3283            let obj = store
3284                .tree()
3285                .find(&ObjectKey::object(dir_id))
3286                .await
3287                .expect("Searching tree for dir")
3288                .unwrap();
3289            assert_matches!(
3290                obj.value,
3291                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
3292            );
3293        }
3294        fs.close().await.expect("Close failed");
3295    }
3296
    /// Checks that a layer file which was still referenced by a layer set while a flush ran can
    /// no longer be opened in the parent store once that flush has completed.
    ///
    /// The test relies on timing: the second flush is expected not to finish until the layer set
    /// obtained beforehand has been dropped, which the other branch does after a one second sleep.
    #[fuchsia::test(threads = 10)]
    async fn test_old_layers_are_purged() {
        let fs = test_filesystem().await;

        let store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");

        // First flush, performed before the layer set below is taken.
        store.flush().await.expect("flush failed");

        // Write some data after the first flush.
        let mut buf = object.allocate_buffer(5).await;
        buf.as_mut_slice().copy_from_slice(b"hello");
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // Getting the layer-set should cause the flush to stall.
        let layer_set = store.tree().layer_set();

        // `done` is set by the second branch just before it drops the layer set; the first branch
        // asserts it is set once its flush returns.
        let done = Mutex::new(false);
        // Filled in with the object ID of the last layer in `layer_set`.
        let mut object_id = 0;

        join!(
            async {
                store.flush().await.expect("flush failed");
                assert!(*done.lock());
            },
            async {
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_secs(1)).await;
                *done.lock() = true;
                object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
                std::mem::drop(layer_set);
            }
        );

        // Opening the old layer's object in the parent store must now fail with NotFound.
        if let Err(e) = ObjectStore::open_object(
            &store.parent_store.as_ref().unwrap(),
            object_id,
            HandleOptions::default(),
            store.crypt(),
        )
        .await
        {
            assert!(FxfsError::NotFound.matches(&e));
        } else {
            panic!("open_object succeeded");
        }
    }
3353
3354    #[fuchsia::test]
3355    async fn test_tombstone_deletes_data() {
3356        let fs = test_filesystem().await;
3357        let root_store = fs.root_store();
3358        let child_id = {
3359            let mut transaction = fs
3360                .clone()
3361                .new_transaction(lock_keys![], Options::default())
3362                .await
3363                .expect("new_transaction failed");
3364            let child = ObjectStore::create_object(
3365                &root_store,
3366                &mut transaction,
3367                HandleOptions::default(),
3368                None,
3369            )
3370            .await
3371            .expect("create_object failed");
3372            root_store.add_to_graveyard(&mut transaction, child.object_id());
3373            transaction.commit().await.expect("commit failed");
3374
3375            // Allocate an extent in the file.
3376            let mut buffer = child.allocate_buffer(8192).await;
3377            buffer.as_mut_slice().fill(0xaa);
3378            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
3379
3380            child.object_id()
3381        };
3382
3383        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
3384
3385        // Let fsck check allocations.
3386        fsck(fs.clone()).await.expect("fsck failed");
3387    }
3388
3389    #[fuchsia::test]
3390    async fn test_tombstone_purges_keys() {
3391        let fs = test_filesystem().await;
3392        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3393        let store = root_volume
3394            .new_volume(
3395                "test",
3396                NewChildStoreOptions {
3397                    options: StoreOptions {
3398                        crypt: Some(Arc::new(new_insecure_crypt())),
3399                        ..StoreOptions::default()
3400                    },
3401                    ..NewChildStoreOptions::default()
3402                },
3403            )
3404            .await
3405            .expect("new_volume failed");
3406        let mut transaction = fs
3407            .clone()
3408            .new_transaction(lock_keys![], Options::default())
3409            .await
3410            .expect("new_transaction failed");
3411        let child =
3412            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3413                .await
3414                .expect("create_object failed");
3415        store.add_to_graveyard(&mut transaction, child.object_id());
3416        transaction.commit().await.expect("commit failed");
3417        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_some());
3418        store
3419            .tombstone_object(child.object_id(), Options::default())
3420            .await
3421            .expect("tombstone_object failed");
3422        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_none());
3423        fs.close().await.expect("close failed");
3424    }
3425
3426    #[fuchsia::test]
3427    async fn test_major_compaction_discards_unnecessary_records() {
3428        let fs = test_filesystem().await;
3429        let root_store = fs.root_store();
3430        let child_id = {
3431            let mut transaction = fs
3432                .clone()
3433                .new_transaction(lock_keys![], Options::default())
3434                .await
3435                .expect("new_transaction failed");
3436            let child = ObjectStore::create_object(
3437                &root_store,
3438                &mut transaction,
3439                HandleOptions::default(),
3440                None,
3441            )
3442            .await
3443            .expect("create_object failed");
3444            root_store.add_to_graveyard(&mut transaction, child.object_id());
3445            transaction.commit().await.expect("commit failed");
3446
3447            // Allocate an extent in the file.
3448            let mut buffer = child.allocate_buffer(8192).await;
3449            buffer.as_mut_slice().fill(0xaa);
3450            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
3451
3452            child.object_id()
3453        };
3454
3455        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
3456        {
3457            let layers = root_store.tree.layer_set();
3458            let mut merger = layers.merger();
3459            let iter = merger
3460                .query(Query::FullRange(&ObjectKey::object(child_id)))
3461                .await
3462                .expect("seek failed");
3463            // Find at least one object still in the tree.
3464            match iter.get() {
3465                Some(ItemRef { key: ObjectKey { object_id, .. }, .. })
3466                    if *object_id == child_id => {}
3467                _ => panic!("Objects should still be in the tree."),
3468            }
3469        }
3470        root_store.flush().await.expect("flush failed");
3471
3472        // There should be no records for the object.
3473        let layers = root_store.tree.layer_set();
3474        let mut merger = layers.merger();
3475        let iter = merger
3476            .query(Query::FullRange(&ObjectKey::object(child_id)))
3477            .await
3478            .expect("seek failed");
3479        match iter.get() {
3480            None => {}
3481            Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
3482                assert_ne!(*object_id, child_id)
3483            }
3484        }
3485    }
3486
3487    #[fuchsia::test]
3488    async fn test_overlapping_extents_in_different_layers() {
3489        let fs = test_filesystem().await;
3490        let store = fs.root_store();
3491
3492        let mut transaction = fs
3493            .clone()
3494            .new_transaction(
3495                lock_keys![LockKey::object(
3496                    store.store_object_id(),
3497                    store.root_directory_object_id()
3498                )],
3499                Options::default(),
3500            )
3501            .await
3502            .expect("new_transaction failed");
3503        let root_directory =
3504            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3505        let object = root_directory
3506            .create_child_file(&mut transaction, "test")
3507            .await
3508            .expect("create_child_file failed");
3509        transaction.commit().await.expect("commit failed");
3510
3511        let buf = object.allocate_buffer(16384).await;
3512        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3513
3514        store.flush().await.expect("flush failed");
3515
3516        object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");
3517
3518        // At this point, we should have an extent for 0..16384 in a layer that has been flushed,
3519        // and an extent for 0..4096 that partially overwrites it.  Writing to 0..16384 should
3520        // overwrite both of those extents.
3521        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3522
3523        fsck(fs.clone()).await.expect("fsck failed");
3524    }
3525
    /// Exercises encrypted mutations across repeated close/reopen cycles of a volume that has a
    /// crypt: data written before a reopen must be readable afterwards, a flush performed while
    /// the volume is locked must leave an encrypted mutations object behind, and that object must
    /// be gone again once the volume has been opened and read from.
    #[fuchsia::test(threads = 10)]
    async fn test_encrypted_mutations() {
        /// Runs one write / reopen / verify cycle against the "test" volume and returns the
        /// filesystem as reopened at the end of the cycle.  `iteration` is used to give the file
        /// created in this cycle a unique name.
        async fn one_iteration(
            fs: OpenFxFilesystem,
            crypt: Arc<dyn Crypt>,
            iteration: u64,
        ) -> OpenFxFilesystem {
            /// Closes `fs` and opens a filesystem again on the same device.
            async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
                fs.close().await.expect("Close failed");
                let device = fs.take_device().await;
                device.reopen(false);
                FxFilesystem::open(device).await.expect("FS open failed")
            }

            let fs = reopen(fs).await;

            // Create a file in the volume's root directory and write 1000 bytes of a known
            // pattern to it.
            let (store_object_id, object_id) = {
                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_volume
                    .volume(
                        "test",
                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    )
                    .await
                    .expect("volume failed");

                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let root_directory = Directory::open(&store, store.root_directory_object_id())
                    .await
                    .expect("open failed");
                let object = root_directory
                    .create_child_file(&mut transaction, &format!("test {}", iteration))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                let mut buf = object.allocate_buffer(1000).await;
                for i in 0..buf.len() {
                    // The pattern is the byte index truncated to u8.
                    buf.as_mut_slice()[i] = i as u8;
                }
                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

                (store.store_object_id(), object.object_id())
            };

            let fs = reopen(fs).await;

            // Opens the "test" volume with the crypt, reads the file written above and checks
            // that it holds the expected 1000-byte pattern.
            let check_object = |fs: Arc<FxFilesystem>| {
                let crypt = crypt.clone();
                async move {
                    let root_volume = root_volume(fs).await.expect("root_volume failed");
                    let volume = root_volume
                        .volume(
                            "test",
                            StoreOptions { crypt: Some(crypt), ..StoreOptions::default() },
                        )
                        .await
                        .expect("volume failed");

                    let object = ObjectStore::open_object(
                        &volume,
                        object_id,
                        HandleOptions::default(),
                        None,
                    )
                    .await
                    .expect("open_object failed");
                    let mut buf = object.allocate_buffer(1000).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
                    for i in 0..buf.len() {
                        assert_eq!(buf.as_slice()[i], i as u8);
                    }
                }
            };

            check_object(fs.clone()).await;

            let fs = reopen(fs).await;

            // At this point the "test" volume is locked.  Before checking the object, flush the
            // filesystem.  This should leave a file with encrypted mutations.
            fs.object_manager().flush().await.expect("flush failed");

            assert_ne!(
                fs.object_manager()
                    .store(store_object_id)
                    .unwrap()
                    .load_store_info()
                    .await
                    .expect("load_store_info failed")
                    .encrypted_mutations_object_id,
                INVALID_OBJECT_ID
            );

            check_object(fs.clone()).await;

            // Checking the object should have triggered a flush and so now there should be no
            // encrypted mutations object.
            assert_eq!(
                fs.object_manager()
                    .store(store_object_id)
                    .unwrap()
                    .load_store_info()
                    .await
                    .expect("load_store_info failed")
                    .encrypted_mutations_object_id,
                INVALID_OBJECT_ID
            );

            let fs = reopen(fs).await;

            fsck(fs.clone()).await.expect("fsck failed");

            let fs = reopen(fs).await;

            // The data must still be readable after fsck and one more reopen.
            check_object(fs.clone()).await;

            fs
        }

        let mut fs = test_filesystem().await;
        let crypt = Arc::new(new_insecure_crypt());

        // Create the "test" volume that every iteration operates on.
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let _store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
        }

        // Run a few iterations so that we test changes with the stream cipher offset.
        for i in 0..5 {
            fs = one_iteration(fs, crypt.clone(), i).await;
        }
    }
3681
3682    #[test_case(true; "with a flush")]
3683    #[test_case(false; "without a flush")]
3684    #[fuchsia::test(threads = 10)]
3685    async fn test_object_id_cipher_roll(with_flush: bool) {
3686        let fs = test_filesystem().await;
3687        let crypt = Arc::new(new_insecure_crypt());
3688
3689        let expected_key = {
3690            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3691            let store = root_volume
3692                .new_volume(
3693                    "test",
3694                    NewChildStoreOptions {
3695                        options: StoreOptions {
3696                            crypt: Some(crypt.clone()),
3697                            ..StoreOptions::default()
3698                        },
3699                        ..Default::default()
3700                    },
3701                )
3702                .await
3703                .expect("new_volume failed");
3704
3705            // Create some files so that our in-memory copy of StoreInfo has changes (the object
3706            // count) pending a flush.
3707            let root_dir_id = store.root_directory_object_id();
3708            let root_dir =
3709                Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3710            let mut transaction = fs
3711                .clone()
3712                .new_transaction(
3713                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
3714                    Options::default(),
3715                )
3716                .await
3717                .expect("new_transaction failed");
3718            for i in 0..10 {
3719                root_dir.create_child_file(&mut transaction, &format!("file {i}")).await.unwrap();
3720            }
3721            transaction.commit().await.expect("commit failed");
3722
3723            let orig_store_info = store.store_info().unwrap();
3724
3725            // Hack the last object ID to force a roll of the object ID cipher.
3726            {
3727                let mut last_object_id = store.last_object_id.lock();
3728                match &mut *last_object_id {
3729                    LastObjectId::Encrypted { id, .. } => {
3730                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
3731                        *id |= 0xffffffff;
3732                    }
3733                    _ => unreachable!(),
3734                }
3735            }
3736
3737            let mut transaction = fs
3738                .clone()
3739                .new_transaction(
3740                    lock_keys![LockKey::object(
3741                        store.store_object_id(),
3742                        store.root_directory_object_id()
3743                    )],
3744                    Options::default(),
3745                )
3746                .await
3747                .expect("new_transaction failed");
3748            let root_directory = Directory::open(&store, store.root_directory_object_id())
3749                .await
3750                .expect("open failed");
3751            let object = root_directory
3752                .create_child_file(&mut transaction, "test")
3753                .await
3754                .expect("create_child_file failed");
3755            transaction.commit().await.expect("commit failed");
3756
3757            assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 1u64 << 32);
3758
3759            // Check that the key has been changed.
3760            let key = match (
3761                store.store_info().unwrap().last_object_id,
3762                orig_store_info.last_object_id,
3763            ) {
3764                (
3765                    LastObjectIdInfo::Encrypted { key, id },
3766                    LastObjectIdInfo::Encrypted { key: orig_key, .. },
3767                ) => {
3768                    assert_ne!(key, orig_key);
3769                    assert_eq!(id, 1u64 << 32);
3770                    key
3771                }
3772                _ => unreachable!(),
3773            };
3774
3775            if with_flush {
3776                fs.journal().force_compact().await.unwrap();
3777            }
3778
3779            let last_object_id = store.last_object_id.lock();
3780            assert_eq!(last_object_id.id(), 1u64 << 32);
3781            key
3782        };
3783
3784        fs.close().await.expect("Close failed");
3785        let device = fs.take_device().await;
3786        device.reopen(false);
3787        let fs = FxFilesystem::open(device).await.expect("open failed");
3788        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3789        let store = root_volume
3790            .volume("test", StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() })
3791            .await
3792            .expect("volume failed");
3793
3794        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Encrypted { key, .. } if key == expected_key);
3795        assert_eq!(store.last_object_id.lock().id(), 1u64 << 32);
3796
3797        fsck(fs.clone()).await.expect("fsck failed");
3798        fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
3799    }
3800
3801    #[fuchsia::test(threads = 2)]
3802    async fn test_race_object_id_cipher_roll_and_flush() {
3803        let fs = test_filesystem().await;
3804        let crypt = Arc::new(new_insecure_crypt());
3805
3806        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3807        let store = root_volume
3808            .new_volume(
3809                "test",
3810                NewChildStoreOptions {
3811                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3812                    ..Default::default()
3813                },
3814            )
3815            .await
3816            .expect("new_volume failed");
3817
3818        assert!(matches!(&*store.last_object_id.lock(), LastObjectId::Encrypted { .. }));
3819
3820        // Create some files so that our in-memory copy of StoreInfo has changes (the object
3821        // count) pending a flush.
3822        let root_dir_id = store.root_directory_object_id();
3823        let root_dir = Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3824
3825        let _executor_tasks = testing::force_executor_threads_to_run(2).await;
3826
3827        for j in 0..100 {
3828            let mut transaction = fs
3829                .clone()
3830                .new_transaction(
3831                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
3832                    Options::default(),
3833                )
3834                .await
3835                .expect("new_transaction failed");
3836            root_dir.create_child_file(&mut transaction, &format!("file {j}")).await.unwrap();
3837            transaction.commit().await.expect("commit failed");
3838
3839            let task = {
3840                let fs = fs.clone();
3841                fasync::Task::spawn(async move {
3842                    fs.journal().force_compact().await.unwrap();
3843                })
3844            };
3845
3846            // Hack the last object ID to force a roll of the object ID cipher.
3847            {
3848                let mut last_object_id = store.last_object_id.lock();
3849                let LastObjectId::Encrypted { id, .. } = &mut *last_object_id else {
3850                    unreachable!()
3851                };
3852                assert_eq!(*id >> 32, j);
3853                *id |= 0xffffffff;
3854            }
3855
3856            let mut transaction = fs
3857                .clone()
3858                .new_transaction(
3859                    lock_keys![LockKey::object(
3860                        store.store_object_id(),
3861                        store.root_directory_object_id()
3862                    )],
3863                    Options::default(),
3864                )
3865                .await
3866                .expect("new_transaction failed");
3867            let root_directory = Directory::open(&store, store.root_directory_object_id())
3868                .await
3869                .expect("open failed");
3870            root_directory
3871                .create_child_file(&mut transaction, "test {j}")
3872                .await
3873                .expect("create_child_file failed");
3874            transaction.commit().await.expect("commit failed");
3875
3876            task.await;
3877
3878            // Check that the key has been changed.
3879            let new_store_info = store.load_store_info().await.unwrap();
3880
3881            let LastObjectIdInfo::Encrypted { id, key } = new_store_info.last_object_id else {
3882                unreachable!()
3883            };
3884            assert_eq!(id >> 32, j + 1);
3885            let LastObjectIdInfo::Encrypted { key: in_memory_key, .. } =
3886                store.store_info().unwrap().last_object_id
3887            else {
3888                unreachable!()
3889            };
3890            assert_eq!(key, in_memory_key);
3891        }
3892
3893        fs.close().await.expect("Close failed");
3894    }
3895
3896    #[fuchsia::test]
3897    async fn test_object_id_no_roll_for_unencrypted_store() {
3898        let fs = test_filesystem().await;
3899
3900        {
3901            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3902            let store = root_volume
3903                .new_volume("test", NewChildStoreOptions::default())
3904                .await
3905                .expect("new_volume failed");
3906
3907            // Hack the last object ID.
3908            {
3909                let mut last_object_id = store.last_object_id.lock();
3910                match &mut *last_object_id {
3911                    LastObjectId::Unencrypted { id } => {
3912                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
3913                        *id |= 0xffffffff;
3914                    }
3915                    _ => unreachable!(),
3916                }
3917            }
3918
3919            let mut transaction = fs
3920                .clone()
3921                .new_transaction(
3922                    lock_keys![LockKey::object(
3923                        store.store_object_id(),
3924                        store.root_directory_object_id()
3925                    )],
3926                    Options::default(),
3927                )
3928                .await
3929                .expect("new_transaction failed");
3930            let root_directory = Directory::open(&store, store.root_directory_object_id())
3931                .await
3932                .expect("open failed");
3933            let object = root_directory
3934                .create_child_file(&mut transaction, "test")
3935                .await
3936                .expect("create_child_file failed");
3937            transaction.commit().await.expect("commit failed");
3938
3939            assert_eq!(object.object_id(), 0x1_0000_0000);
3940
3941            // Check that there is still no key.
3942            assert_matches!(
3943                store.store_info().unwrap().last_object_id,
3944                LastObjectIdInfo::Unencrypted { .. }
3945            );
3946
3947            assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
3948        };
3949
3950        fs.close().await.expect("Close failed");
3951        let device = fs.take_device().await;
3952        device.reopen(false);
3953        let fs = FxFilesystem::open(device).await.expect("open failed");
3954        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3955        let store =
3956            root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
3957
3958        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
3959    }
3960
3961    #[fuchsia::test]
3962    fn test_object_id_is_not_invalid_object_id() {
3963        let key = UnwrappedKey::new(vec![0; FXFS_KEY_SIZE]);
3964        // 1106634048 results in INVALID_OBJECT_ID with this key.
3965        let mut last_object_id =
3966            LastObjectId::Encrypted { id: 1106634047, cipher: Box::new(Ff1::new(&key)) };
3967        assert!(last_object_id.try_get_next().is_some());
3968        assert!(last_object_id.try_get_next().is_some());
3969    }
3970
3971    #[fuchsia::test]
3972    async fn test_last_object_id_is_correct_after_unlock() {
3973        let fs = test_filesystem().await;
3974        let crypt = Arc::new(new_insecure_crypt());
3975
3976        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3977        let store = root_volume
3978            .new_volume(
3979                "test",
3980                NewChildStoreOptions {
3981                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3982                    ..Default::default()
3983                },
3984            )
3985            .await
3986            .expect("new_volume failed");
3987
3988        let mut transaction = fs
3989            .clone()
3990            .new_transaction(
3991                lock_keys![LockKey::object(
3992                    store.store_object_id(),
3993                    store.root_directory_object_id()
3994                )],
3995                Options::default(),
3996            )
3997            .await
3998            .expect("new_transaction failed");
3999        let root_directory =
4000            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4001        root_directory
4002            .create_child_file(&mut transaction, "test")
4003            .await
4004            .expect("create_child_file failed");
4005        transaction.commit().await.expect("commit failed");
4006
4007        // Compact so that StoreInfo is written.
4008        fs.journal().force_compact().await.unwrap();
4009
4010        let last_object_id = store.last_object_id.lock().id();
4011
4012        store.lock().await.unwrap();
4013        store.unlock(NO_OWNER, crypt.clone()).await.unwrap();
4014
4015        assert_eq!(store.last_object_id.lock().id(), last_object_id);
4016    }
4017
4018    #[fuchsia::test(threads = 20)]
4019    async fn test_race_when_rolling_last_object_id_cipher() {
4020        // NOTE: This test is trying to test a race, so if it fails, it might be flaky.
4021
4022        const NUM_THREADS: usize = 20;
4023
4024        let fs = test_filesystem().await;
4025        let crypt = Arc::new(new_insecure_crypt());
4026
4027        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4028        let store = root_volume
4029            .new_volume(
4030                "test",
4031                NewChildStoreOptions {
4032                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4033                    ..Default::default()
4034                },
4035            )
4036            .await
4037            .expect("new_volume failed");
4038
4039        let store_id = store.store_object_id();
4040        let root_dir_id = store.root_directory_object_id();
4041
4042        let root_directory =
4043            Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
4044
4045        // Create directories.
4046        let mut directories = Vec::new();
4047        for _ in 0..NUM_THREADS {
4048            let mut transaction = fs
4049                .clone()
4050                .new_transaction(
4051                    lock_keys![LockKey::object(store_id, root_dir_id,)],
4052                    Options::default(),
4053                )
4054                .await
4055                .expect("new_transaction failed");
4056            directories.push(
4057                root_directory
4058                    .create_child_dir(&mut transaction, "test")
4059                    .await
4060                    .expect("create_child_file failed"),
4061            );
4062            transaction.commit().await.expect("commit failed");
4063        }
4064
4065        // Hack the last object ID so that the next ID will require a roll.
4066        match &mut *store.last_object_id.lock() {
4067            LastObjectId::Encrypted { id, .. } => *id |= 0xffff_ffff,
4068            _ => unreachable!(),
4069        }
4070
4071        let scope = fasync::Scope::new();
4072
4073        let _executor_tasks = testing::force_executor_threads_to_run(NUM_THREADS).await;
4074
4075        for dir in directories {
4076            let fs = fs.clone();
4077            scope.spawn(async move {
4078                let mut transaction = fs
4079                    .clone()
4080                    .new_transaction(
4081                        lock_keys![LockKey::object(store_id, dir.object_id(),)],
4082                        Options::default(),
4083                    )
4084                    .await
4085                    .expect("new_transaction failed");
4086                dir.create_child_file(&mut transaction, "test")
4087                    .await
4088                    .expect("create_child_file failed");
4089                transaction.commit().await.expect("commit failed");
4090            });
4091        }
4092
4093        scope.on_no_tasks().await;
4094
4095        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000 + NUM_THREADS as u64 - 1);
4096    }
4097
4098    #[fuchsia::test(threads = 10)]
4099    async fn test_lock_store() {
4100        let fs = test_filesystem().await;
4101        let crypt = Arc::new(new_insecure_crypt());
4102
4103        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4104        let store = root_volume
4105            .new_volume(
4106                "test",
4107                NewChildStoreOptions {
4108                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4109                    ..NewChildStoreOptions::default()
4110                },
4111            )
4112            .await
4113            .expect("new_volume failed");
4114        let mut transaction = fs
4115            .clone()
4116            .new_transaction(
4117                lock_keys![LockKey::object(
4118                    store.store_object_id(),
4119                    store.root_directory_object_id()
4120                )],
4121                Options::default(),
4122            )
4123            .await
4124            .expect("new_transaction failed");
4125        let root_directory =
4126            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4127        root_directory
4128            .create_child_file(&mut transaction, "test")
4129            .await
4130            .expect("create_child_file failed");
4131        transaction.commit().await.expect("commit failed");
4132        store.lock().await.expect("lock failed");
4133
4134        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
4135        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4136    }
4137
4138    #[fuchsia::test(threads = 10)]
4139    async fn test_unlock_read_only() {
4140        let fs = test_filesystem().await;
4141        let crypt = Arc::new(new_insecure_crypt());
4142
4143        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4144        let store = root_volume
4145            .new_volume(
4146                "test",
4147                NewChildStoreOptions {
4148                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4149                    ..NewChildStoreOptions::default()
4150                },
4151            )
4152            .await
4153            .expect("new_volume failed");
4154        let mut transaction = fs
4155            .clone()
4156            .new_transaction(
4157                lock_keys![LockKey::object(
4158                    store.store_object_id(),
4159                    store.root_directory_object_id()
4160                )],
4161                Options::default(),
4162            )
4163            .await
4164            .expect("new_transaction failed");
4165        let root_directory =
4166            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4167        root_directory
4168            .create_child_file(&mut transaction, "test")
4169            .await
4170            .expect("create_child_file failed");
4171        transaction.commit().await.expect("commit failed");
4172        store.lock().await.expect("lock failed");
4173
4174        store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
4175        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4176        store.lock_read_only();
4177        store.unlock_read_only(crypt).await.expect("unlock failed");
4178        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4179    }
4180
4181    #[fuchsia::test(threads = 10)]
4182    async fn test_key_rolled_when_unlocked() {
4183        let fs = test_filesystem().await;
4184        let crypt = Arc::new(new_insecure_crypt());
4185
4186        let object_id;
4187        {
4188            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4189            let store = root_volume
4190                .new_volume(
4191                    "test",
4192                    NewChildStoreOptions {
4193                        options: StoreOptions {
4194                            crypt: Some(crypt.clone()),
4195                            ..StoreOptions::default()
4196                        },
4197                        ..Default::default()
4198                    },
4199                )
4200                .await
4201                .expect("new_volume failed");
4202            let mut transaction = fs
4203                .clone()
4204                .new_transaction(
4205                    lock_keys![LockKey::object(
4206                        store.store_object_id(),
4207                        store.root_directory_object_id()
4208                    )],
4209                    Options::default(),
4210                )
4211                .await
4212                .expect("new_transaction failed");
4213            let root_directory = Directory::open(&store, store.root_directory_object_id())
4214                .await
4215                .expect("open failed");
4216            object_id = root_directory
4217                .create_child_file(&mut transaction, "test")
4218                .await
4219                .expect("create_child_file failed")
4220                .object_id();
4221            transaction.commit().await.expect("commit failed");
4222        }
4223
4224        fs.close().await.expect("Close failed");
4225        let mut device = fs.take_device().await;
4226
4227        // Repeatedly remount so that we can be sure that we can remount when there are many
4228        // mutations keys.
4229        for _ in 0..100 {
4230            device.reopen(false);
4231            let fs = FxFilesystem::open(device).await.expect("open failed");
4232            {
4233                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4234                let store = root_volume
4235                    .volume(
4236                        "test",
4237                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4238                    )
4239                    .await
4240                    .expect("open_volume failed");
4241
4242                // The key should get rolled every time we unlock.
4243                assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);
4244
4245                // Make sure there's an encrypted mutation.
4246                let handle =
4247                    ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
4248                        .await
4249                        .expect("open_object failed");
4250                let buffer = handle.allocate_buffer(100).await;
4251                handle
4252                    .write_or_append(Some(0), buffer.as_ref())
4253                    .await
4254                    .expect("write_or_append failed");
4255            }
4256            fs.close().await.expect("Close failed");
4257            device = fs.take_device().await;
4258        }
4259    }
4260
4261    #[test]
4262    fn test_store_info_max_serialized_size() {
4263        let info = StoreInfo {
4264            guid: [0xff; 16],
4265            last_object_id: LastObjectIdInfo::Encrypted {
4266                id: 0x1234567812345678,
4267                key: FxfsKey {
4268                    wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4269                    key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4270                },
4271            },
4272            // Worst case, each layer should be 3/4 the size of the layer below it (because of the
4273            // compaction policy we're using).  If the smallest layer is 8,192 bytes, then 120
4274            // layers would take up a size that exceeds a 64 bit unsigned integer, so if this fits,
4275            // any size should fit.
4276            layers: vec![0x1234567812345678; 120],
4277            root_directory_object_id: 0x1234567812345678,
4278            graveyard_directory_object_id: 0x1234567812345678,
4279            object_count: 0x1234567812345678,
4280            mutations_key: Some(FxfsKey {
4281                wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4282                key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4283            }),
4284            mutations_cipher_offset: 0x1234567812345678,
4285            encrypted_mutations_object_id: 0x1234567812345678,
4286            internal_directory_object_id: INVALID_OBJECT_ID,
4287        };
4288        let mut serialized_info = Vec::new();
4289        info.serialize_with_version(&mut serialized_info).unwrap();
4290        assert!(
4291            serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
4292            "{}",
4293            serialized_info.len()
4294        );
4295    }
4296
4297    async fn reopen_after_crypt_failure_inner(read_only: bool) {
4298        let fs = test_filesystem().await;
4299        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4300
4301        let store = {
4302            let crypt = Arc::new(new_insecure_crypt());
4303            let store = root_volume
4304                .new_volume(
4305                    "vol",
4306                    NewChildStoreOptions {
4307                        options: StoreOptions {
4308                            crypt: Some(crypt.clone()),
4309                            ..StoreOptions::default()
4310                        },
4311                        ..Default::default()
4312                    },
4313                )
4314                .await
4315                .expect("new_volume failed");
4316            let root_directory = Directory::open(&store, store.root_directory_object_id())
4317                .await
4318                .expect("open failed");
4319            let mut transaction = fs
4320                .clone()
4321                .new_transaction(
4322                    lock_keys![LockKey::object(
4323                        store.store_object_id(),
4324                        root_directory.object_id()
4325                    )],
4326                    Options::default(),
4327                )
4328                .await
4329                .expect("new_transaction failed");
4330            root_directory
4331                .create_child_file(&mut transaction, "test")
4332                .await
4333                .expect("create_child_file failed");
4334            transaction.commit().await.expect("commit failed");
4335
4336            crypt.shutdown();
4337            let mut transaction = fs
4338                .clone()
4339                .new_transaction(
4340                    lock_keys![LockKey::object(
4341                        store.store_object_id(),
4342                        root_directory.object_id()
4343                    )],
4344                    Options::default(),
4345                )
4346                .await
4347                .expect("new_transaction failed");
4348            root_directory
4349                .create_child_file(&mut transaction, "test2")
4350                .await
4351                .map(|_| ())
4352                .expect_err("create_child_file should fail");
4353            store.lock().await.expect("lock failed");
4354            store
4355        };
4356
4357        let crypt = Arc::new(new_insecure_crypt());
4358        if read_only {
4359            store.unlock_read_only(crypt).await.expect("unlock failed");
4360        } else {
4361            store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
4362        }
4363        let root_directory =
4364            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4365        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4366    }
4367
4368    #[fuchsia::test(threads = 10)]
4369    async fn test_reopen_after_crypt_failure() {
4370        reopen_after_crypt_failure_inner(false).await;
4371    }
4372
4373    #[fuchsia::test(threads = 10)]
4374    async fn test_reopen_read_only_after_crypt_failure() {
4375        reopen_after_crypt_failure_inner(true).await;
4376    }
4377
4378    #[fuchsia::test(threads = 10)]
4379    #[should_panic(expected = "Insufficient reservation space")]
4380    #[cfg(debug_assertions)]
4381    async fn large_transaction_causes_panic_in_debug_builds() {
4382        let fs = test_filesystem().await;
4383        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4384        let store = root_volume
4385            .new_volume("vol", NewChildStoreOptions::default())
4386            .await
4387            .expect("new_volume failed");
4388        let root_directory =
4389            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4390        let mut transaction = fs
4391            .clone()
4392            .new_transaction(
4393                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
4394                Options::default(),
4395            )
4396            .await
4397            .expect("transaction");
4398        for i in 0..500 {
4399            root_directory
4400                .create_symlink(&mut transaction, b"link", &format!("{}", i))
4401                .await
4402                .expect("symlink");
4403        }
4404        assert_eq!(transaction.commit().await.expect("commit"), 0);
4405    }
4406
4407    #[fuchsia::test]
4408    async fn test_crypt_failure_does_not_fuse_journal() {
4409        let fs = test_filesystem().await;
4410
4411        struct Owner;
4412        #[async_trait]
4413        impl StoreOwner for Owner {
4414            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
4415                store.lock().await
4416            }
4417        }
4418        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
4419
4420        {
4421            // Create two stores and a record for each store, so the journal will need to flush them
4422            // both later.
4423            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4424            let store1 = root_volume
4425                .new_volume(
4426                    "vol1",
4427                    NewChildStoreOptions {
4428                        options: StoreOptions {
4429                            crypt: Some(Arc::new(new_insecure_crypt())),
4430                            ..StoreOptions::default()
4431                        },
4432                        ..Default::default()
4433                    },
4434                )
4435                .await
4436                .expect("new_volume failed");
4437            let crypt = Arc::new(new_insecure_crypt());
4438            let store2 = root_volume
4439                .new_volume(
4440                    "vol2",
4441                    NewChildStoreOptions {
4442                        options: StoreOptions {
4443                            owner: Arc::downgrade(&owner),
4444                            crypt: Some(crypt.clone()),
4445                        },
4446                        ..Default::default()
4447                    },
4448                )
4449                .await
4450                .expect("new_volume failed");
4451            for store in [&store1, &store2] {
4452                let root_directory = Directory::open(store, store.root_directory_object_id())
4453                    .await
4454                    .expect("open failed");
4455                let mut transaction = fs
4456                    .clone()
4457                    .new_transaction(
4458                        lock_keys![LockKey::object(
4459                            store.store_object_id(),
4460                            root_directory.object_id()
4461                        )],
4462                        Options::default(),
4463                    )
4464                    .await
4465                    .expect("new_transaction failed");
4466                root_directory
4467                    .create_child_file(&mut transaction, "test")
4468                    .await
4469                    .expect("create_child_file failed");
4470                transaction.commit().await.expect("commit failed");
4471            }
4472            // Shut down the crypt instance for store2, and then compact.  Compaction should not
4473            // fail, and the store should become locked.
4474            crypt.shutdown();
4475            fs.journal().force_compact().await.expect("compact failed");
4476            // The store should now be locked.
4477            assert!(store2.is_locked());
4478        }
4479
4480        // Even though the store wasn't flushed, the mutation to store2 will still be valid as it is
4481        // held in the journal.
4482        fs.close().await.expect("close failed");
4483        let device = fs.take_device().await;
4484        device.reopen(false);
4485        let fs = FxFilesystem::open(device).await.expect("open failed");
4486        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4487
4488        for volume_name in ["vol1", "vol2"] {
4489            let store = root_volume
4490                .volume(
4491                    volume_name,
4492                    StoreOptions {
4493                        crypt: Some(Arc::new(new_insecure_crypt())),
4494                        ..StoreOptions::default()
4495                    },
4496                )
4497                .await
4498                .expect("open volume failed");
4499            let root_directory = Directory::open(&store, store.root_directory_object_id())
4500                .await
4501                .expect("open failed");
4502            assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
4503        }
4504
4505        fs.close().await.expect("close failed");
4506    }
4507
4508    #[fuchsia::test]
4509    async fn test_crypt_failure_during_unlock_race() {
4510        let fs = test_filesystem().await;
4511
4512        struct Owner;
4513        #[async_trait]
4514        impl StoreOwner for Owner {
4515            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
4516                store.lock().await
4517            }
4518        }
4519        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
4520
4521        let store_object_id = {
4522            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4523            let store = root_volume
4524                .new_volume(
4525                    "vol",
4526                    NewChildStoreOptions {
4527                        options: StoreOptions {
4528                            owner: Arc::downgrade(&owner),
4529                            crypt: Some(Arc::new(new_insecure_crypt())),
4530                        },
4531                        ..Default::default()
4532                    },
4533                )
4534                .await
4535                .expect("new_volume failed");
4536            let root_directory = Directory::open(&store, store.root_directory_object_id())
4537                .await
4538                .expect("open failed");
4539            let mut transaction = fs
4540                .clone()
4541                .new_transaction(
4542                    lock_keys![LockKey::object(
4543                        store.store_object_id(),
4544                        root_directory.object_id()
4545                    )],
4546                    Options::default(),
4547                )
4548                .await
4549                .expect("new_transaction failed");
4550            root_directory
4551                .create_child_file(&mut transaction, "test")
4552                .await
4553                .expect("create_child_file failed");
4554            transaction.commit().await.expect("commit failed");
4555            store.store_object_id()
4556        };
4557
4558        fs.close().await.expect("close failed");
4559        let device = fs.take_device().await;
4560        device.reopen(false);
4561
4562        let fs = FxFilesystem::open(device).await.expect("open failed");
4563        {
4564            let fs_clone = fs.clone();
4565            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4566
4567            let crypt = Arc::new(new_insecure_crypt());
4568            let crypt_clone = crypt.clone();
4569            join!(
4570                async move {
4571                    // Unlock might fail, so ignore errors.
4572                    let _ = root_volume
4573                        .volume(
4574                            "vol",
4575                            StoreOptions {
4576                                owner: Arc::downgrade(&owner),
4577                                crypt: Some(crypt_clone),
4578                            },
4579                        )
4580                        .await;
4581                },
4582                async move {
4583                    // Block until unlock is finished but before flushing due to unlock is finished, to
4584                    // maximize the chances of weirdness.
4585                    let keys = lock_keys![LockKey::flush(store_object_id)];
4586                    let _ = fs_clone.lock_manager().write_lock(keys).await;
4587                    crypt.shutdown();
4588                }
4589            );
4590        }
4591
4592        fs.close().await.expect("close failed");
4593        let device = fs.take_device().await;
4594        device.reopen(false);
4595
4596        let fs = FxFilesystem::open(device).await.expect("open failed");
4597        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4598        let store = root_volume
4599            .volume(
4600                "vol",
4601                StoreOptions {
4602                    crypt: Some(Arc::new(new_insecure_crypt())),
4603                    ..StoreOptions::default()
4604                },
4605            )
4606            .await
4607            .expect("open volume failed");
4608        let root_directory =
4609            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4610        assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
4611
4612        fs.close().await.expect("close failed");
4613    }
4614
4615    #[fuchsia::test]
4616    async fn test_low_32_bit_object_ids() {
4617        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
4618        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
4619
4620        {
4621            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4622
4623            let store = root_vol
4624                .new_volume(
4625                    "test",
4626                    NewChildStoreOptions { low_32_bit_object_ids: true, ..Default::default() },
4627                )
4628                .await
4629                .expect("new_volume failed");
4630
4631            let root_dir = Directory::open(&store, store.root_directory_object_id())
4632                .await
4633                .expect("open failed");
4634
4635            let mut ids = std::collections::HashSet::new();
4636
4637            for i in 0..100 {
4638                let mut transaction = fs
4639                    .clone()
4640                    .new_transaction(
4641                        lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4642                        Options::default(),
4643                    )
4644                    .await
4645                    .expect("new_transaction failed");
4646
4647                for j in 0..100 {
4648                    let object = root_dir
4649                        .create_child_dir(&mut transaction, &format!("{i}.{j}"))
4650                        .await
4651                        .expect("create_child_file failed");
4652
4653                    assert!(object.object_id() < 1 << 32);
4654                    assert_ne!(object.object_id(), INVALID_OBJECT_ID);
4655                    assert!(ids.insert(object.object_id()));
4656                }
4657
4658                transaction.commit().await.expect("commit failed");
4659            }
4660
4661            assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4662
4663            fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
4664        }
4665
4666        // Verify persistence
4667        fs.close().await.expect("Close failed");
4668        let device = fs.take_device().await;
4669        device.reopen(false);
4670        let fs = FxFilesystem::open(device).await.expect("open failed");
4671        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4672        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");
4673
4674        // Check that we can still create files and they have low 32-bit IDs.
4675        let root_dir =
4676            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4677        let mut transaction = fs
4678            .clone()
4679            .new_transaction(
4680                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4681                Options::default(),
4682            )
4683            .await
4684            .expect("new_transaction failed");
4685
4686        let object = root_dir
4687            .create_child_file(&mut transaction, "persistence_check")
4688            .await
4689            .expect("create_child_file failed");
4690        assert!(object.object_id() < 1 << 32);
4691
4692        transaction.commit().await.expect("commit failed");
4693
4694        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4695    }
4696}