Skip to main content

fxfs/
object_store.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod allocator;
6pub mod caching_object_handle;
7pub mod data_object_handle;
8pub mod directory;
9mod extent_mapping_iterator;
10mod extent_record;
11mod flush;
12pub mod graveyard;
13mod install;
14pub mod journal;
15mod key_manager;
16pub(crate) mod merge;
17pub mod object_manager;
18pub mod object_record;
19pub mod project_id;
20mod store_object_handle;
21pub mod transaction;
22mod tree;
23mod tree_cache;
24pub mod volume;
25
26pub use data_object_handle::{
27    DataObjectHandle, DirectWriter, FileExtent, FsverityState, FsverityStateInner, RangeType,
28};
29pub use directory::Directory;
30pub use object_record::{ChildValue, DirType, ObjectDescriptor, PosixAttributes, Timestamp};
31pub use store_object_handle::{
32    EXTENDED_ATTRIBUTE_RANGE_END, EXTENDED_ATTRIBUTE_RANGE_START, SetExtendedAttributeMode,
33    StoreObjectHandle,
34};
35
36use crate::errors::FxfsError;
37use crate::filesystem::{
38    ApplyContext, ApplyMode, FxFilesystem, JournalingObject, MAX_FILE_SIZE, SyncOptions,
39    TruncateGuard, TxnGuard,
40};
41use crate::log::*;
42use crate::lsm_tree::cache::{NullCache, ObjectCache};
43use crate::lsm_tree::types::{Existence, Item, ItemRef, LayerIterator};
44use crate::lsm_tree::{LSMTree, Query};
45use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ObjectProperties, ReadObjectHandle};
46use crate::object_store::allocator::Allocator;
47use crate::object_store::graveyard::Graveyard;
48use crate::object_store::journal::{JournalCheckpoint, JournalCheckpointV32, JournaledTransaction};
49use crate::object_store::key_manager::KeyManager;
50use crate::object_store::transaction::{
51    AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation, Options, Transaction,
52    lock_keys,
53};
54use crate::range::RangeExt;
55use crate::round::round_up;
56use crate::serialized_types::{Version, Versioned, VersionedLatest};
57use anyhow::{Context, Error, anyhow, bail, ensure};
58use async_trait::async_trait;
59use fidl_fuchsia_io as fio;
60use fprint::TypeFingerprint;
61use fuchsia_sync::Mutex;
62use fxfs_crypto::ff1::Ff1;
63use fxfs_crypto::{
64    CipherHolder, Crypt, KeyPurpose, ObjectType, StreamCipher, UnwrappedKey, WrappingKeyId,
65    key_to_cipher,
66};
67use fxfs_macros::{Migrate, migrate_to_version};
68use rand::RngCore;
69use scopeguard::ScopeGuard;
70use serde::{Deserialize, Serialize};
71use std::collections::HashSet;
72use std::fmt;
73use std::num::NonZero;
74use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
75use std::sync::{Arc, OnceLock, Weak};
76use storage_device::Device;
77use uuid::Uuid;
78
79pub use extent_record::{
80    BLOB_MERKLE_ATTRIBUTE_ID, BLOB_METADATA_ATTRIBUTE_ID, DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey,
81    ExtentMode, ExtentValue, FSVERITY_MERKLE_ATTRIBUTE_ID,
82};
83pub use object_record::{
84    AttributeKey, EncryptionKey, EncryptionKeys, ExtendedAttributeValue, FsverityMetadata, FxfsKey,
85    FxfsKeyV40, FxfsKeyV49, ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue,
86    ProjectProperty, RootDigest,
87};
88pub use transaction::Mutation;
89
// For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel
// attacks more difficult. This mask can be used to extract the hi part of the object ID (the upper
// 32 bits, which are not passed through the cipher).
const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000;

// At time of writing, this threshold limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;

// Encrypted files and directories use the fscrypt key (identified by `FSCRYPT_KEY_ID`) to encrypt
// file contents and filenames respectively. All non-fscrypt encrypted files otherwise default to
// using the `VOLUME_DATA_KEY_ID` key. Note, the filesystem always uses the `VOLUME_DATA_KEY_ID`
// key to encrypt large extended attributes. Thus, encrypted files and directories with large
// xattrs will have both an fscrypt and volume data key.
pub const VOLUME_DATA_KEY_ID: u64 = 0;
pub const FSCRYPT_KEY_ID: u64 = 1;
104
/// A constant that can be used where an owner is expected of type `Weak<dyn StoreOwner>` but no
/// owner is required.  It is created with `Weak::new()`, so upgrading it always yields `None`.
pub const NO_OWNER: Weak<()> = Weak::new();
// `()` overrides nothing, so it relies on the trait's default method implementations.
impl StoreOwner for () {}
109
110#[async_trait]
111pub trait StoreOwner: Send + Sync {
112    /// Forcibly lock the store.  This exists to give the StoreOwner an opportunity to clean up
113    /// tasks which might access the store before locking it, because ObjectStore::unlock can only
114    /// be called when the store is not in use.
115    async fn force_lock(self: Arc<Self>, _store: &ObjectStore) -> Result<(), Error> {
116        Err(anyhow!(FxfsError::Internal))
117    }
118}
119
/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
/// back to an ObjectStore (via the `AsRef<ObjectStore>` supertrait).  The trait itself declares no
/// methods.
pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}
123
/// StoreInfo stores information about the object store.  This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
pub type StoreInfo = StoreInfoV52;

/// The current serialized form of `StoreInfo`.  Compared with `StoreInfoV49`, the last object ID
/// and the optional object ID key are folded into a single `LastObjectIdInfo` value.
#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV52 {
    /// The globally unique identifier for the associated object store. If unset, will be all zero.
    guid: [u8; 16],

    /// The last used object ID.  Note that this field is not accurate in memory; ObjectStore's
    /// last_object_id field is the one to use in that case.  Technically, this might not be the
    /// last object ID used for the latest transaction that created an object because we use this at
    /// the point of creating the object but before we commit the transaction.  Transactions can
    /// then get committed in an arbitrary order (or not at all).
    last_object_id: LastObjectIdInfo,

    /// Object ids for layers.  TODO(https://fxbug.dev/42178036): need a layer of indirection here
    /// so we can support snapshots.
    pub layers: Vec<u64>,

    /// The object ID for the root directory.
    root_directory_object_id: u64,

    /// The object ID for the graveyard.
    graveyard_directory_object_id: u64,

    /// The number of live objects in the store.  This should *not* be trusted; it can be invalid
    /// due to filesystem inconsistencies.
    object_count: u64,

    /// The (wrapped) key that encrypted mutations should use.
    mutations_key: Option<FxfsKeyV49>,

    /// Mutations for the store are encrypted using a stream cipher.  To decrypt the mutations, we
    /// need to know the offset in the cipher stream to start it.
    mutations_cipher_offset: u64,

    /// If we have to flush the store whilst we do not have the key, we need to write the encrypted
    /// mutations to an object. This is the object ID of that file if it exists.
    pub encrypted_mutations_object_id: u64,

    /// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
    /// when the directory doesn't yet exist.
    internal_directory_object_id: u64,
}
169
/// The persisted description of how a store tracks its last object ID (see
/// `StoreInfoV52::last_object_id`).
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
enum LastObjectIdInfo {
    /// Object IDs are not encrypted; this is the `Default` variant (with `id: 0`).
    Unencrypted {
        /// The last object ID.
        id: u64,
    },
    Encrypted {
        /// The *unencrypted* value of the last object ID.
        id: u64,

        /// Object IDs are encrypted to reduce the amount of information that sequential object IDs
        /// reveal (such as the number of files in the system and the ordering of their creation in
        /// time).  Only the bottom 32 bits of the object ID are encrypted whilst the top 32 bits
        /// will increment after 2^32 object IDs have been used and this allows us to roll the key.
        key: FxfsKeyV49,
    },
    /// No last object ID is recorded for this variant.
    /// NOTE(review): presumably used by stores whose object IDs are restricted to 32 bits (see
    /// `NewChildStoreOptions::low_32_bit_object_ids`) -- confirm.
    Low32Bit,
}
187
188impl Default for LastObjectIdInfo {
189    fn default() -> Self {
190        LastObjectIdInfo::Unencrypted { id: 0 }
191    }
192}
193
/// An older serialized form of the store info.  It records the last object ID as a plain `u64`
/// alongside an optional `object_id_key`; `From<StoreInfoV49> for StoreInfoV52` converts it to the
/// current form.
#[derive(Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV49 {
    guid: [u8; 16],
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV49>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    // When present, the V52 conversion maps this (with `last_object_id`) to
    // `LastObjectIdInfo::Encrypted`; otherwise to `LastObjectIdInfo::Unencrypted`.
    object_id_key: Option<FxfsKeyV49>,
    internal_directory_object_id: u64,
}
208
209impl From<StoreInfoV49> for StoreInfoV52 {
210    fn from(value: StoreInfoV49) -> Self {
211        Self {
212            guid: value.guid,
213            last_object_id: if let Some(key) = value.object_id_key {
214                LastObjectIdInfo::Encrypted { id: value.last_object_id, key: key }
215            } else {
216                LastObjectIdInfo::Unencrypted { id: value.last_object_id }
217            },
218            layers: value.layers,
219            root_directory_object_id: value.root_directory_object_id,
220            graveyard_directory_object_id: value.graveyard_directory_object_id,
221            object_count: value.object_count,
222            mutations_key: value.mutations_key,
223            mutations_cipher_offset: value.mutations_cipher_offset,
224            encrypted_mutations_object_id: value.encrypted_mutations_object_id,
225            internal_directory_object_id: value.internal_directory_object_id,
226        }
227    }
228}
229
/// An older serialized form of the store info that uses `FxfsKeyV40` keys.  The `Migrate` derive
/// together with `migrate_to_version` targets `StoreInfoV49` as the next version.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV49)]
pub struct StoreInfoV40 {
    guid: [u8; 16],
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV40>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    object_id_key: Option<FxfsKeyV40>,
    internal_directory_object_id: u64,
}
245
246impl StoreInfo {
247    /// Returns the parent objects for this store.
248    pub fn parent_objects(&self) -> Vec<u64> {
249        // We should not include the ID of the store itself, since that should be referred to in the
250        // volume directory.
251        let mut objects = self.layers.to_vec();
252        if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
253            objects.push(self.encrypted_mutations_object_id);
254        }
255        objects
256    }
257}
258
// The upper bound, in bytes, on the serialized size of a store's info.
// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit
// isn't exceeded.  It will likely involve placing limits on the maximum number of layers.
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072;

// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
// in the journal but hasn't yet been written to layer files) for a store.  We set a limit because
// we want to limit the amount of memory use in the case the filesystem is corrupt or under attack.
pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;
267
/// Options that adjust the behaviour of an object handle.  Every flag defaults to `false`.
#[derive(Default)]
pub struct HandleOptions {
    /// If true, transactions used by this handle will skip journal space checks.
    pub skip_journal_checks: bool,
    /// If true, data written to any attribute of this handle will not have per-block checksums
    /// computed.
    pub skip_checksums: bool,
    /// If true, any files using fsverity will not attempt to perform any verification. This is
    /// useful to open an object without the correct encryption keys to look at the metadata.
    pub skip_fsverity: bool,
}
279
/// Parameters for encrypting a newly created object.
pub struct ObjectEncryptionOptions {
    /// If set, the keys are treated as permanent and never evicted from the KeyManager cache.
    /// This is necessary when keys are managed by another store; for example, the layer files
    /// of a child store are objects in the root store, but they are encrypted with keys from the
    /// child store.  Generally, most objects should have this set to `false`.
    pub permanent: bool,
    /// The identifier of the key.
    /// NOTE(review): presumably `VOLUME_DATA_KEY_ID` or `FSCRYPT_KEY_ID` -- confirm.
    pub key_id: u64,
    /// NOTE(review): presumably the form of the key that gets persisted with the object -- confirm.
    pub key: EncryptionKey,
    /// NOTE(review): presumably the unwrapped counterpart of `key` -- confirm.
    pub unwrapped_key: UnwrappedKey,
}
291
/// Options describing who owns a store and how (if at all) it is encrypted.  The `Default`
/// implementation yields an unowned (`NO_OWNER`), unencrypted store.
pub struct StoreOptions {
    /// The owner of the store.
    pub owner: Weak<dyn StoreOwner>,

    /// The store is unencrypted if `crypt` is `None`.
    pub crypt: Option<Arc<dyn Crypt>>,
}
299
300impl Default for StoreOptions {
301    fn default() -> Self {
302        Self { owner: NO_OWNER, crypt: None }
303    }
304}
305
/// Options for creating a new child store.  `reserve_32bit_object_ids` and `low_32_bit_object_ids`
/// are mutually exclusive: `ObjectStore::new_child_store` fails with `FxfsError::InvalidArgs` when
/// both are set.
#[derive(Default)]
pub struct NewChildStoreOptions {
    pub options: StoreOptions,

    /// Specifies the object ID in the root store to be used for the store.  If set to
    /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
    pub object_id: u64,

    /// If true, reserve all 32 bit object_ids.  All new objects will start with IDs exceeding
    /// 0x1_0000_0000.
    pub reserve_32bit_object_ids: bool,

    /// Object IDs will be restricted to 32 bits.  This involves a less performant algorithm and so
    /// should not be used unless necessary.
    pub low_32_bit_object_ids: bool,

    /// If set, use this GUID for the new store.
    pub guid: Option<[u8; 16]>,
}
325
/// The current version of the encrypted mutations record.
pub type EncryptedMutations = EncryptedMutationsV49;

/// A batch of still-encrypted mutations for a store, together with the bookkeeping (per-transaction
/// checkpoints and key-roll offsets) described on the fields below.
#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV49 {
    // Information about the mutations are held here, but the actual encrypted data is held within
    // data.  For each transaction, we record the checkpoint and the count of mutations within the
    // transaction.  The checkpoint is required for the log file offset (which we need to apply the
    // mutations), and the version so that we can correctly decode the mutation after it has been
    // decrypted. The count specifies the number of serialized mutations encoded in |data|.
    transactions: Vec<(JournalCheckpointV32, u64)>,

    // The encrypted mutations.
    #[serde(with = "crate::zerocopy_serialization")]
    data: Vec<u8>,

    // If the mutations key was rolled, this holds the offset in `data` where the new key should
    // apply.
    mutations_key_roll: Vec<(usize, FxfsKeyV49)>,
}
345
346impl std::fmt::Debug for EncryptedMutations {
347    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
348        f.debug_struct("EncryptedMutations")
349            .field("transactions", &self.transactions)
350            .field("len", &self.data.len())
351            .field(
352                "mutations_key_roll",
353                &self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
354            )
355            .finish()
356    }
357}
358
359impl Versioned for EncryptedMutations {
360    fn max_serialized_size() -> Option<u64> {
361        Some(MAX_ENCRYPTED_MUTATIONS_SIZE as u64)
362    }
363}
364
365impl From<EncryptedMutationsV40> for EncryptedMutationsV49 {
366    fn from(value: EncryptedMutationsV40) -> Self {
367        EncryptedMutationsV49 {
368            transactions: value.transactions,
369            data: value.data,
370            mutations_key_roll: value
371                .mutations_key_roll
372                .into_iter()
373                .map(|(offset, key)| (offset, key.into()))
374                .collect(),
375        }
376    }
377}
378
/// An older serialized form of the encrypted mutations record, using `FxfsKeyV40` keys.  It is
/// converted to `EncryptedMutationsV49` via `From`.
#[derive(Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV40 {
    transactions: Vec<(JournalCheckpointV32, u64)>,
    data: Vec<u8>,
    mutations_key_roll: Vec<(usize, FxfsKeyV40)>,
}
385
386impl Versioned for EncryptedMutationsV40 {
387    fn max_serialized_size() -> Option<u64> {
388        Some(MAX_ENCRYPTED_MUTATIONS_SIZE as u64)
389    }
390}
391
392impl EncryptedMutations {
393    fn from_replayed_mutations(
394        store_object_id: u64,
395        transactions: Vec<JournaledTransaction>,
396    ) -> Self {
397        let mut this = Self::default();
398        for JournaledTransaction { checkpoint, non_root_mutations, .. } in transactions {
399            for (object_id, mutation) in non_root_mutations {
400                if store_object_id == object_id {
401                    if let Mutation::EncryptedObjectStore(data) = mutation {
402                        this.push(&checkpoint, data);
403                    } else if let Mutation::UpdateMutationsKey(key) = mutation {
404                        this.mutations_key_roll.push((this.data.len(), key.into()));
405                    }
406                }
407            }
408        }
409        this
410    }
411
412    fn extend(&mut self, other: &EncryptedMutations) {
413        self.transactions.extend_from_slice(&other.transactions[..]);
414        self.mutations_key_roll.extend(
415            other
416                .mutations_key_roll
417                .iter()
418                .map(|(offset, key)| (offset + self.data.len(), key.clone())),
419        );
420        self.data.extend_from_slice(&other.data[..]);
421    }
422
423    fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
424        self.data.append(&mut data.into());
425        // If the checkpoint is the same as the last mutation we pushed, increment the count.
426        if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
427            if last_checkpoint.file_offset == checkpoint.file_offset {
428                *count += 1;
429                return;
430            }
431        }
432        self.transactions.push((checkpoint.clone(), 1));
433    }
434}
435
/// The lock state of an `ObjectStore` (held in its `lock_state` field).
pub enum LockState {
    Locked,
    Unencrypted,
    // The store is unlocked; `owner` and `crypt` are the owner and crypt service in use.
    Unlocked { owner: Weak<dyn StoreOwner>, crypt: Arc<dyn Crypt> },

    // The store is unlocked, but in a read-only state, and no flushes or other operations will be
    // performed on the store.
    UnlockedReadOnly(Arc<dyn Crypt>),

    // The store is encrypted but is now in an unusable state (due to a failure to sync the journal
    // after locking the store).  The store cannot be unlocked.
    Invalid,

    // Before we've read the StoreInfo we might not know whether the store is Locked or Unencrypted.
    // This can happen when lazily opening stores (ObjectManager::lazy_open_store).
    Unknown,

    // The store is in the process of being locked.  Whilst the store is being locked, the store
    // isn't usable; assertions will trip if any mutations are applied.
    Locking,

    // Whilst we're unlocking, we will replay encrypted mutations.  The store isn't usable until
    // it's in the Unlocked state.
    Unlocking,

    // The store has been deleted.
    Deleted,
}
464
465impl LockState {
466    fn owner(&self) -> Option<Arc<dyn StoreOwner>> {
467        if let Self::Unlocked { owner, .. } = self { owner.upgrade() } else { None }
468    }
469}
470
471impl fmt::Debug for LockState {
472    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
473        formatter.write_str(match self {
474            LockState::Locked => "Locked",
475            LockState::Unencrypted => "Unencrypted",
476            LockState::Unlocked { .. } => "Unlocked",
477            LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
478            LockState::Invalid => "Invalid",
479            LockState::Unknown => "Unknown",
480            LockState::Locking => "Locking",
481            LockState::Unlocking => "Unlocking",
482            LockState::Deleted => "Deleted",
483        })
484    }
485}
486
// The in-memory state used to hand out new object IDs for a store.
enum LastObjectId {
    // This is used when the store is encrypted, but the key and ID aren't yet available.
    Pending,

    Unencrypted {
        // The last object ID; the next one is obtained by adding one.
        id: u64,
    },

    Encrypted {
        // The *unencrypted* value of the last object ID.
        id: u64,

        // Encrypted stores will use a cipher to obfuscate the object ID.
        cipher: Box<Ff1>,
    },

    Low32Bit {
        // IDs that are currently reserved (see `reserve` and `is_reserved`).
        reserved: HashSet<u32>,
        // IDs that have been unreserved but not yet removed from `reserved`; `drain_unreserved`
        // performs the removal.
        unreserved: Vec<u32>,
    },
}
508
509impl LastObjectId {
510    /// Tries to get the next object ID.  Returns None if a new cipher is required because all
511    /// object IDs that can be generated with the current cipher have been exhausted, or if only
512    /// using the lower 32 bits which requires an async algorithm.
513    fn try_get_next(&mut self) -> Option<NonZero<u64>> {
514        match self {
515            LastObjectId::Unencrypted { id } => {
516                NonZero::new(id.wrapping_add(1)).inspect(|next| *id = next.get())
517            }
518            LastObjectId::Encrypted { id, cipher } => {
519                let mut next = *id;
520                let hi = next & OBJECT_ID_HI_MASK;
521                loop {
522                    if next as u32 == u32::MAX {
523                        return None;
524                    }
525                    next += 1;
526                    let candidate = hi | cipher.encrypt(next as u32) as u64;
527                    if let Some(candidate) = NonZero::new(candidate) {
528                        *id = next;
529                        return Some(candidate);
530                    }
531                }
532            }
533            _ => None,
534        }
535    }
536
537    /// Returns INVALID_OBJECT_ID if it's not possible to peek at the next object ID.
538    fn peek_next(&self) -> u64 {
539        match self {
540            LastObjectId::Unencrypted { id } => id.wrapping_add(1),
541            LastObjectId::Encrypted { id, cipher } => {
542                let mut next = *id;
543                let hi = next & OBJECT_ID_HI_MASK;
544                loop {
545                    if next as u32 == u32::MAX {
546                        return INVALID_OBJECT_ID;
547                    }
548                    next += 1;
549                    let candidate = hi | cipher.encrypt(next as u32) as u64;
550                    if candidate != INVALID_OBJECT_ID {
551                        return candidate;
552                    }
553                }
554            }
555            _ => INVALID_OBJECT_ID,
556        }
557    }
558
559    /// Returns INVALID_OBJECT_ID for algorithms that don't use the last ID.
560    fn id(&self) -> u64 {
561        match self {
562            LastObjectId::Unencrypted { id } | LastObjectId::Encrypted { id, .. } => *id,
563            _ => INVALID_OBJECT_ID,
564        }
565    }
566
567    /// Returns true if `id` is reserved (it must be 32 bits).
568    fn is_reserved(&self, id: u64) -> bool {
569        match self {
570            LastObjectId::Low32Bit { reserved, .. } => {
571                if let Ok(id) = id.try_into() {
572                    reserved.contains(&id)
573                } else {
574                    false
575                }
576            }
577            _ => false,
578        }
579    }
580
581    /// Reserves `id`.
582    fn reserve(&mut self, id: u64) {
583        match self {
584            LastObjectId::Low32Bit { reserved, .. } => {
585                assert!(reserved.insert(id.try_into().unwrap()))
586            }
587            _ => unreachable!(),
588        }
589    }
590
591    /// Unreserves `id`.
592    fn unreserve(&mut self, id: u64) {
593        match self {
594            LastObjectId::Low32Bit { unreserved, .. } => {
595                // To avoid races, where a reserved ID transitions from being reserved to being
596                // actually used in a committed transaction, we delay updating `reserved` until a
597                // suitable point.
598                //
599                // On thread A, we might have:
600                //
601                //   A1. Commit transaction (insert a record into the LSM tree that uses ID)
602                //   A2. `unreserve`
603                //
604                // And on another thread B, we might have:
605                //
606                //   B1. Drain `unreserved`.
607                //   B2. Check tree and `reserved` to see if ID is used.
608                //
609                // B2 will involve calling `LsmTree::layer_set` which should be thought of as a
610                // snapshot, so the change A1 might not be visible to thread B, but it won't matter
611                // because `reserved` will still include the ID.  So long as each thread does the
612                // operations in this order, it should be safe.
613                unreserved.push(id.try_into().unwrap())
614            }
615            _ => {}
616        }
617    }
618
619    /// Removes `unreserved` IDs from the `reserved` list.
620    fn drain_unreserved(&mut self) {
621        match self {
622            LastObjectId::Low32Bit { reserved, unreserved } => {
623                for u in unreserved.drain(..) {
624                    assert!(reserved.remove(&u));
625                }
626            }
627            _ => {}
628        }
629    }
630}
631
/// An object ID paired with the store it belongs to.  Dropping the value unreserves the ID with the
/// store's `last_object_id`, unless `release` was used to take the ID out first.
pub struct ReservedId<'a>(&'a ObjectStore, NonZero<u64>);
633
634impl<'a> ReservedId<'a> {
635    fn new(store: &'a ObjectStore, id: NonZero<u64>) -> Self {
636        Self(store, id)
637    }
638
639    pub fn get(&self) -> u64 {
640        self.1.get()
641    }
642
643    /// The caller takes responsibility for this id.
644    #[must_use]
645    pub fn release(self) -> u64 {
646        let id = self.1.get();
647        std::mem::forget(self);
648        id
649    }
650}
651
652impl Drop for ReservedId<'_> {
653    fn drop(&mut self) {
654        self.0.last_object_id.lock().unreserve(self.1.get());
655    }
656}
657
/// An object store supports a file like interface for objects.  Objects are keyed by a 64 bit
/// identifier.  An object store has to be backed by a parent object store (which stores metadata
/// for the object store).  The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
pub struct ObjectStore {
    // The parent store; `None` for a root parent store (see `new_root_parent`).
    parent_store: Option<Arc<ObjectStore>>,
    store_object_id: u64,
    device: Arc<dyn Device>,
    block_size: u64,
    // Held weakly.  A root parent store starts with an empty reference that `attach_filesystem`
    // fills in later.
    filesystem: Weak<FxFilesystem>,
    // Lock ordering: This must be taken before `lock_state`.
    store_info: Mutex<Option<StoreInfo>>,
    tree: LSMTree<ObjectKey, ObjectValue>,

    // When replaying the journal, the store cannot read StoreInfo until the whole journal
    // has been replayed, so during that time, store_info_handle will be None and records
    // just get sent to the tree. Once the journal has been replayed, we can open the store
    // and load all the other layer information.
    store_info_handle: OnceLock<DataObjectHandle<ObjectStore>>,

    // The cipher to use for encrypted mutations, if this store is encrypted.
    mutations_cipher: Mutex<Option<StreamCipher>>,

    // Current lock state of the store.
    // Lock ordering: This must be taken after `store_info`.
    lock_state: Mutex<LockState>,
    pub key_manager: KeyManager,

    // Enable/disable tracing.
    trace: AtomicBool,

    // Informational counters for events occurring within the store.
    counters: Mutex<ObjectStoreCounters>,

    // These are updated in performance-sensitive code paths so we use atomics instead of counters.
    device_read_ops: AtomicU64,
    device_write_ops: AtomicU64,
    logical_read_ops: AtomicU64,
    logical_write_ops: AtomicU64,
    graveyard_entries: AtomicU64,

    // Contains the last object ID and, optionally, a cipher to be used when generating new object
    // IDs.
    last_object_id: Mutex<LastObjectId>,

    // An optional callback to be invoked each time the ObjectStore flushes.  The callback is
    // invoked at the end of flush, while the write lock is still held.
    flush_callback: Mutex<Option<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>>,
}
707
// Informational counters for events occurring within a store (held in `ObjectStore::counters`).
// The derived `Default` starts every count at zero with no last flush time.
#[derive(Clone, Default)]
struct ObjectStoreCounters {
    mutations_applied: u64,
    mutations_dropped: u64,
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}
715
716impl ObjectStore {
717    fn new(
718        parent_store: Option<Arc<ObjectStore>>,
719        store_object_id: u64,
720        filesystem: Arc<FxFilesystem>,
721        store_info: Option<StoreInfo>,
722        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
723        mutations_cipher: Option<StreamCipher>,
724        lock_state: LockState,
725        last_object_id: LastObjectId,
726    ) -> Arc<ObjectStore> {
727        let device = filesystem.device();
728        let block_size = filesystem.block_size();
729        Arc::new(ObjectStore {
730            parent_store,
731            store_object_id,
732            device,
733            block_size,
734            filesystem: Arc::downgrade(&filesystem),
735            store_info: Mutex::new(store_info),
736            tree: LSMTree::new(merge::merge, object_cache),
737            store_info_handle: OnceLock::new(),
738            mutations_cipher: Mutex::new(mutations_cipher),
739            lock_state: Mutex::new(lock_state),
740            key_manager: KeyManager::new(),
741            trace: AtomicBool::new(false),
742            counters: Mutex::new(ObjectStoreCounters::default()),
743            device_read_ops: AtomicU64::new(0),
744            device_write_ops: AtomicU64::new(0),
745            logical_read_ops: AtomicU64::new(0),
746            logical_write_ops: AtomicU64::new(0),
747            graveyard_entries: AtomicU64::new(0),
748            last_object_id: Mutex::new(last_object_id),
749            flush_callback: Mutex::new(None),
750        })
751    }
752
753    fn new_empty(
754        parent_store: Option<Arc<ObjectStore>>,
755        store_object_id: u64,
756        filesystem: Arc<FxFilesystem>,
757        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
758    ) -> Arc<Self> {
759        Self::new(
760            parent_store,
761            store_object_id,
762            filesystem,
763            Some(StoreInfo::default()),
764            object_cache,
765            None,
766            LockState::Unencrypted,
767            LastObjectId::Unencrypted { id: 0 },
768        )
769    }
770
771    /// Cycle breaker constructor that returns an ObjectStore without a filesystem.
772    /// This should only be used from super block code.
773    pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
774        ObjectStore {
775            parent_store: None,
776            store_object_id,
777            device,
778            block_size,
779            filesystem: Weak::<FxFilesystem>::new(),
780            store_info: Mutex::new(Some(StoreInfo::default())),
781            tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
782            store_info_handle: OnceLock::new(),
783            mutations_cipher: Mutex::new(None),
784            lock_state: Mutex::new(LockState::Unencrypted),
785            key_manager: KeyManager::new(),
786            trace: AtomicBool::new(false),
787            counters: Mutex::new(ObjectStoreCounters::default()),
788            device_read_ops: AtomicU64::new(0),
789            device_write_ops: AtomicU64::new(0),
790            logical_read_ops: AtomicU64::new(0),
791            logical_write_ops: AtomicU64::new(0),
792            graveyard_entries: AtomicU64::new(0),
793            last_object_id: Mutex::new(LastObjectId::Unencrypted { id: 0 }),
794            flush_callback: Mutex::new(None),
795        }
796    }
797
798    /// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has
799    /// been created.
800    pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
801        this.filesystem = Arc::downgrade(&filesystem);
802        this
803    }
804
805    /// Create a child store. It is a multi-step process:
806    ///
807    ///   1. Call `ObjectStore::new_child_store`.
808    ///   2. Register the store with the object-manager.
809    ///   3. Call `ObjectStore::create` to write the store-info.
810    ///
811    /// If the procedure fails, care must be taken to unregister store with the object-manager.
812    ///
813    /// The steps have to be separate because of lifetime issues when working with a transaction.
814    async fn new_child_store(
815        self: &Arc<Self>,
816        transaction: &mut Transaction<'_>,
817        options: NewChildStoreOptions,
818        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
819    ) -> Result<Arc<Self>, Error> {
820        ensure!(
821            !options.reserve_32bit_object_ids || !options.low_32_bit_object_ids,
822            FxfsError::InvalidArgs
823        );
824        let handle = if let Some(object_id) = NonZero::new(options.object_id) {
825            self.update_last_object_id(object_id.get());
826            let handle = ObjectStore::create_object_with_id(
827                self,
828                transaction,
829                ReservedId::new(self, object_id),
830                HandleOptions::default(),
831                None,
832            )?;
833            handle
834        } else {
835            ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await?
836        };
837        let filesystem = self.filesystem();
838        let id = if options.reserve_32bit_object_ids { 0x1_0000_0000 } else { 0 };
839        let (last_object_id, last_object_id_in_memory) = if options.low_32_bit_object_ids {
840            (
841                LastObjectIdInfo::Low32Bit,
842                LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() },
843            )
844        } else if let Some(crypt) = &options.options.crypt {
845            let (object_id_wrapped, object_id_unwrapped) =
846                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
847            (
848                LastObjectIdInfo::Encrypted { id, key: object_id_wrapped },
849                LastObjectId::Encrypted { id, cipher: Box::new(Ff1::new(&object_id_unwrapped)) },
850            )
851        } else {
852            (LastObjectIdInfo::Unencrypted { id }, LastObjectId::Unencrypted { id })
853        };
854        let store = if let Some(crypt) = options.options.crypt {
855            let (wrapped_key, unwrapped_key) =
856                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
857            Self::new(
858                Some(self.clone()),
859                handle.object_id(),
860                filesystem.clone(),
861                Some(StoreInfo {
862                    mutations_key: Some(wrapped_key),
863                    last_object_id,
864                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
865                    ..Default::default()
866                }),
867                object_cache,
868                Some(StreamCipher::new(&unwrapped_key, 0)),
869                LockState::Unlocked { owner: options.options.owner, crypt },
870                last_object_id_in_memory,
871            )
872        } else {
873            Self::new(
874                Some(self.clone()),
875                handle.object_id(),
876                filesystem.clone(),
877                Some(StoreInfo {
878                    last_object_id,
879                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
880                    ..Default::default()
881                }),
882                object_cache,
883                None,
884                LockState::Unencrypted,
885                last_object_id_in_memory,
886            )
887        };
888        assert!(store.store_info_handle.set(handle).is_ok());
889        Ok(store)
890    }
891
892    /// Actually creates the store in a transaction.  This will also create a root directory and
893    /// graveyard directory for the store.  See `new_child_store` above.
894    async fn create<'a>(
895        self: &'a Arc<Self>,
896        transaction: &mut Transaction<'a>,
897    ) -> Result<(), Error> {
898        let buf = {
899            // Create a root directory and graveyard directory.
900            let graveyard_directory_object_id = Graveyard::create(transaction, &self).await?;
901            let root_directory = Directory::create(transaction, &self, None).await?;
902
903            let serialized_info = {
904                let mut store_info = self.store_info.lock();
905                let store_info = store_info.as_mut().unwrap();
906
907                store_info.graveyard_directory_object_id = graveyard_directory_object_id;
908                store_info.root_directory_object_id = root_directory.object_id();
909
910                let mut serialized_info = Vec::new();
911                store_info.serialize_with_version(&mut serialized_info)?;
912                serialized_info
913            };
914            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
915            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
916            buf
917        };
918
919        if self.filesystem().options().image_builder_mode.is_some() {
920            // If we're in image builder mode, we want to avoid writing to disk unless explicitly
921            // asked to. New object stores will have their StoreInfo written when we compact in
922            // FxFilesystem::finalize().
923            Ok(())
924        } else {
925            self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
926        }
927    }
928
929    pub fn set_trace(&self, trace: bool) {
930        let old_value = self.trace.swap(trace, Ordering::Relaxed);
931        if trace != old_value {
932            info!(store_id = self.store_object_id(), trace; "OS: trace",);
933        }
934    }
935
936    /// Sets a callback to be invoked each time the ObjectStore flushes.  The callback is invoked at
937    /// the end of flush, while the write lock is still held.
938    pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
939        let mut flush_callback = self.flush_callback.lock();
940        *flush_callback = Some(Box::new(callback));
941    }
942
943    pub fn is_root(&self) -> bool {
944        if let Some(parent) = &self.parent_store {
945            parent.parent_store.is_none()
946        } else {
947            // The root parent store isn't the root store.
948            false
949        }
950    }
951
952    /// Populates an inspect node with store statistics.
953    pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
954        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
955        let counters = self.counters.lock();
956        if let Some(store_info) = self.store_info() {
957            root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
958        };
959        root.record_uint("store_object_id", self.store_object_id);
960        root.record_uint("mutations_applied", counters.mutations_applied);
961        root.record_uint("mutations_dropped", counters.mutations_dropped);
962        root.record_uint("num_flushes", counters.num_flushes);
963        if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
964            root.record_uint(
965                "last_flush_time_ms",
966                last_flush_time
967                    .duration_since(std::time::UNIX_EPOCH)
968                    .unwrap_or(std::time::Duration::ZERO)
969                    .as_millis()
970                    .try_into()
971                    .unwrap_or(0u64),
972            );
973        }
974        root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
975        root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
976        root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
977        root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));
978        root.record_uint("graveyard_entries", self.graveyard_entries.load(Ordering::Relaxed));
979        {
980            let last_object_id = self.last_object_id.lock();
981            root.record_uint("object_id_hi", last_object_id.id() >> 32);
982            root.record_bool(
983                "low_32_bit_object_ids",
984                matches!(&*last_object_id, LastObjectId::Low32Bit { .. }),
985            );
986        }
987
988        let this = self.clone();
989        root.record_child("lsm_tree", move |node| this.tree().record_inspect_data(node));
990    }
991
992    pub fn device(&self) -> &Arc<dyn Device> {
993        &self.device
994    }
995
996    pub fn block_size(&self) -> u64 {
997        self.block_size
998    }
999
1000    pub fn filesystem(&self) -> Arc<FxFilesystem> {
1001        self.filesystem.upgrade().unwrap()
1002    }
1003
1004    pub fn store_object_id(&self) -> u64 {
1005        self.store_object_id
1006    }
1007
1008    pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
1009        &self.tree
1010    }
1011
1012    pub fn root_directory_object_id(&self) -> u64 {
1013        self.store_info.lock().as_ref().unwrap().root_directory_object_id
1014    }
1015
1016    pub fn guid(&self) -> [u8; 16] {
1017        self.store_info.lock().as_ref().unwrap().guid
1018    }
1019
1020    pub fn graveyard_directory_object_id(&self) -> u64 {
1021        self.store_info.lock().as_ref().unwrap().graveyard_directory_object_id
1022    }
1023
1024    fn set_graveyard_directory_object_id(&self, oid: u64) {
1025        assert_eq!(
1026            std::mem::replace(
1027                &mut self.store_info.lock().as_mut().unwrap().graveyard_directory_object_id,
1028                oid
1029            ),
1030            INVALID_OBJECT_ID
1031        );
1032    }
1033
1034    pub fn object_count(&self) -> u64 {
1035        self.store_info.lock().as_ref().unwrap().object_count
1036    }
1037
1038    /// Returns INVALID_OBJECT_ID for algorithms that don't use the last ID.
1039    pub(crate) fn unencrypted_last_object_id(&self) -> u64 {
1040        self.last_object_id.lock().id()
1041    }
1042
1043    pub fn key_manager(&self) -> &KeyManager {
1044        &self.key_manager
1045    }
1046
1047    pub fn parent_store(&self) -> Option<&Arc<ObjectStore>> {
1048        self.parent_store.as_ref()
1049    }
1050
1051    /// Returns the crypt object for the store.  Returns None if the store is unencrypted.
1052    pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
1053        match &*self.lock_state.lock() {
1054            LockState::Locked => panic!("Store is locked"),
1055            LockState::Invalid
1056            | LockState::Unencrypted
1057            | LockState::Locking
1058            | LockState::Unlocking
1059            | LockState::Deleted => None,
1060            LockState::Unlocked { crypt, .. } => Some(crypt.clone()),
1061            LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
1062            LockState::Unknown => {
1063                panic!("Store is of unknown lock state; has the journal been replayed yet?")
1064            }
1065        }
1066    }
1067
1068    /// Returns the id of the internal directory. Returns a NotFound error if this has not been
1069    /// initialized.
1070    pub fn get_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
1071        if let Some(store_info) = self.store_info.lock().as_ref() {
1072            if store_info.internal_directory_object_id == INVALID_OBJECT_ID {
1073                Err(FxfsError::NotFound.into())
1074            } else {
1075                Ok(store_info.internal_directory_object_id)
1076            }
1077        } else {
1078            Err(FxfsError::Unavailable.into())
1079        }
1080    }
1081
1082    pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
1083        // Create the transaction first to use the object store lock.
1084        let mut transaction = self
1085            .filesystem()
1086            .new_transaction(
1087                lock_keys![LockKey::InternalDirectory { store_object_id: self.store_object_id }],
1088                Options::default(),
1089            )
1090            .await?;
1091        let obj_id = self.store_info.lock().as_ref().unwrap().internal_directory_object_id;
1092        if obj_id != INVALID_OBJECT_ID {
1093            return Ok(obj_id);
1094        }
1095
1096        // Need to create an internal directory.
1097        let directory = Directory::create(&mut transaction, self, None).await?;
1098
1099        transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
1100        transaction.commit().await?;
1101        Ok(directory.object_id())
1102    }
1103
1104    /// Returns the file size for the object without opening the object.
1105    async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
1106        let item = self
1107            .tree
1108            .find(&ObjectKey::attribute(
1109                object_id,
1110                DEFAULT_DATA_ATTRIBUTE_ID,
1111                AttributeKey::Attribute,
1112            ))
1113            .await?
1114            .ok_or(FxfsError::NotFound)?;
1115        if let ObjectValue::Attribute { size, .. } = item.value {
1116            Ok(size)
1117        } else {
1118            bail!(FxfsError::NotFile);
1119        }
1120    }
1121
1122    #[cfg(feature = "migration")]
1123    pub fn last_object_id(&self) -> u64 {
1124        self.last_object_id.lock().id()
1125    }
1126
1127    /// Provides access to the allocator to mark a specific region of the device as allocated.
1128    #[cfg(feature = "migration")]
1129    pub fn mark_allocated(
1130        &self,
1131        transaction: &mut Transaction<'_>,
1132        store_object_id: u64,
1133        device_range: std::ops::Range<u64>,
1134    ) -> Result<(), Error> {
1135        self.allocator().mark_allocated(transaction, store_object_id, device_range)
1136    }
1137
1138    /// `crypt` can be provided if the crypt service should be different to the default; see the
1139    /// comment on create_object.  Users should avoid having more than one handle open for the same
1140    /// object at the same time because they might get out-of-sync; there is no code that will
1141    /// prevent this.  One example where this can cause an issue is if the object ends up using a
1142    /// permanent key (which is the case if a value is passed for `crypt`), the permanent key is
1143    /// dropped when a handle is dropped, which will impact any other handles for the same object.
1144    pub async fn open_object<S: HandleOwner>(
1145        owner: &Arc<S>,
1146        obj_id: u64,
1147        options: HandleOptions,
1148        crypt: Option<Arc<dyn Crypt>>,
1149    ) -> Result<DataObjectHandle<S>, Error> {
1150        let store = owner.as_ref().as_ref();
1151        let mut fsverity_descriptor = None;
1152        let mut overwrite_ranges = Vec::new();
1153        let item = store
1154            .tree
1155            .find(&ObjectKey::attribute(obj_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute))
1156            .await?
1157            .ok_or(FxfsError::NotFound)?;
1158
1159        let (size, track_overwrite_extents) = match item.value {
1160            ObjectValue::Attribute { size, has_overwrite_extents } => (size, has_overwrite_extents),
1161            ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
1162                if !options.skip_fsverity {
1163                    fsverity_descriptor = Some(fsverity_metadata);
1164                }
1165                // We only track the overwrite extents in memory for writes, reads handle them
1166                // implicitly, which means verified files (where the data won't change anymore)
1167                // don't need to track them.
1168                (size, false)
1169            }
1170            _ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attibute")),
1171        };
1172
1173        ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);
1174
1175        if track_overwrite_extents {
1176            let layer_set = store.tree.layer_set();
1177            let mut merger = layer_set.merger();
1178            let mut iter = merger
1179                .query(Query::FullRange(&ObjectKey::attribute(
1180                    obj_id,
1181                    DEFAULT_DATA_ATTRIBUTE_ID,
1182                    AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
1183                )))
1184                .await?;
1185            loop {
1186                match iter.get() {
1187                    Some(ItemRef {
1188                        key:
1189                            ObjectKey {
1190                                object_id,
1191                                data:
1192                                    ObjectKeyData::Attribute(
1193                                        attribute_id,
1194                                        AttributeKey::Extent(ExtentKey { range }),
1195                                    ),
1196                            },
1197                        value,
1198                        ..
1199                    }) if *object_id == obj_id && *attribute_id == DEFAULT_DATA_ATTRIBUTE_ID => {
1200                        match value {
1201                            ObjectValue::Extent(ExtentValue::None)
1202                            | ObjectValue::Extent(ExtentValue::Some {
1203                                mode: ExtentMode::Raw,
1204                                ..
1205                            })
1206                            | ObjectValue::Extent(ExtentValue::Some {
1207                                mode: ExtentMode::Cow(_),
1208                                ..
1209                            }) => (),
1210                            ObjectValue::Extent(ExtentValue::Some {
1211                                mode: ExtentMode::OverwritePartial(_),
1212                                ..
1213                            })
1214                            | ObjectValue::Extent(ExtentValue::Some {
1215                                mode: ExtentMode::Overwrite,
1216                                ..
1217                            }) => overwrite_ranges.push(range.clone()),
1218                            _ => bail!(
1219                                anyhow!(FxfsError::Inconsistent)
1220                                    .context("open_object: Expected extent")
1221                            ),
1222                        }
1223                        iter.advance().await?;
1224                    }
1225                    _ => break,
1226                }
1227            }
1228        }
1229
1230        // If a crypt service has been specified, it needs to be a permanent key because cached
1231        // keys can only use the store's crypt service.
1232        let permanent = if let Some(crypt) = crypt {
1233            store
1234                .key_manager
1235                .get_keys(
1236                    obj_id,
1237                    crypt.as_ref(),
1238                    &mut Some(async || store.get_keys(obj_id).await),
1239                    /* permanent= */ true,
1240                    /* force= */ false,
1241                )
1242                .await?;
1243            true
1244        } else {
1245            false
1246        };
1247        let data_object_handle = DataObjectHandle::new(
1248            owner.clone(),
1249            obj_id,
1250            permanent,
1251            DEFAULT_DATA_ATTRIBUTE_ID,
1252            size,
1253            FsverityState::None,
1254            options,
1255            false,
1256            &overwrite_ranges,
1257        );
1258        if let Some(descriptor) = fsverity_descriptor {
1259            data_object_handle
1260                .set_fsverity_state_some(descriptor)
1261                .await
1262                .context("Invalid or mismatched merkle tree")?;
1263        }
1264        Ok(data_object_handle)
1265    }
1266
1267    pub fn create_object_with_id<S: HandleOwner>(
1268        owner: &Arc<S>,
1269        transaction: &mut Transaction<'_>,
1270        reserved_object_id: ReservedId<'_>,
1271        options: HandleOptions,
1272        encryption_options: Option<ObjectEncryptionOptions>,
1273    ) -> Result<DataObjectHandle<S>, Error> {
1274        let store = owner.as_ref().as_ref();
1275        // Don't permit creating unencrypted objects in an encrypted store.  The converse is OK.
1276        debug_assert!(store.crypt().is_none() || encryption_options.is_some());
1277        let now = Timestamp::now();
1278        let object_id = reserved_object_id.get();
1279        assert!(
1280            transaction
1281                .add(
1282                    store.store_object_id(),
1283                    Mutation::insert_object(
1284                        ObjectKey::object(reserved_object_id.release()),
1285                        ObjectValue::file(
1286                            1,
1287                            0,
1288                            now.clone(),
1289                            now.clone(),
1290                            now.clone(),
1291                            now,
1292                            0,
1293                            None
1294                        ),
1295                    ),
1296                )
1297                .is_none()
1298        );
1299        let mut permanent_keys = false;
1300        if let Some(ObjectEncryptionOptions { permanent, key_id, key, unwrapped_key }) =
1301            encryption_options
1302        {
1303            permanent_keys = permanent;
1304            let cipher = key_to_cipher(&key, &unwrapped_key)?;
1305            transaction.add(
1306                store.store_object_id(),
1307                Mutation::insert_object(
1308                    ObjectKey::keys(object_id),
1309                    ObjectValue::keys(vec![(key_id, key)].into()),
1310                ),
1311            );
1312            store.key_manager.insert(
1313                object_id,
1314                Arc::new(vec![(key_id, CipherHolder::Cipher(cipher))].into()),
1315                permanent,
1316            );
1317        }
1318        transaction.add(
1319            store.store_object_id(),
1320            Mutation::insert_object(
1321                ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute),
1322                // This is a new object so nothing has pre-allocated overwrite extents yet.
1323                ObjectValue::attribute(0, false),
1324            ),
1325        );
1326        Ok(DataObjectHandle::new(
1327            owner.clone(),
1328            object_id,
1329            permanent_keys,
1330            DEFAULT_DATA_ATTRIBUTE_ID,
1331            0,
1332            FsverityState::None,
1333            options,
1334            false,
1335            &[],
1336        ))
1337    }
1338
1339    /// Creates an object in the store.
1340    ///
1341    /// If the store is encrypted, the object will be automatically encrypted as well.
1342    /// If `wrapping_key_id` is set, the new keys will be wrapped with that specific key, and
1343    /// otherwise the default data key is used.
1344    pub async fn create_object<S: HandleOwner>(
1345        owner: &Arc<S>,
1346        mut transaction: &mut Transaction<'_>,
1347        options: HandleOptions,
1348        wrapping_key_id: Option<WrappingKeyId>,
1349    ) -> Result<DataObjectHandle<S>, Error> {
1350        let store = owner.as_ref().as_ref();
1351        let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
1352        let crypt = store.crypt();
1353        let encryption_options = if let Some(crypt) = crypt {
1354            let key_id =
1355                if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };
1356            let (key, unwrapped_key) = if let Some(wrapping_key_id) = wrapping_key_id {
1357                crypt.create_key_with_id(object_id.get(), wrapping_key_id, ObjectType::File).await?
1358            } else {
1359                let (fxfs_key, unwrapped_key) =
1360                    crypt.create_key(object_id.get(), KeyPurpose::Data).await?;
1361                (EncryptionKey::Fxfs(fxfs_key), unwrapped_key)
1362            };
1363            Some(ObjectEncryptionOptions { permanent: false, key_id, key, unwrapped_key })
1364        } else {
1365            None
1366        };
1367        ObjectStore::create_object_with_id(
1368            owner,
1369            &mut transaction,
1370            object_id,
1371            options,
1372            encryption_options,
1373        )
1374    }
1375
1376    /// Creates an object using explicitly provided keys.
1377    ///
1378    /// There are some cases where an encrypted object needs to be created in an unencrypted store.
1379    /// For example, when layer files for a child store are created in the root store, but they must
1380    /// be encrypted using the child store's keys.  This method exists for that purpose.
1381    pub(crate) async fn create_object_with_key<S: HandleOwner>(
1382        owner: &Arc<S>,
1383        mut transaction: &mut Transaction<'_>,
1384        object_id: ReservedId<'_>,
1385        options: HandleOptions,
1386        key: EncryptionKey,
1387        unwrapped_key: UnwrappedKey,
1388    ) -> Result<DataObjectHandle<S>, Error> {
1389        ObjectStore::create_object_with_id(
1390            owner,
1391            &mut transaction,
1392            object_id,
1393            options,
1394            Some(ObjectEncryptionOptions {
1395                permanent: true,
1396                key_id: VOLUME_DATA_KEY_ID,
1397                key,
1398                unwrapped_key,
1399            }),
1400        )
1401    }
1402
1403    /// Adjusts the reference count for a given object.  If the reference count reaches zero, the
1404    /// object is moved into the graveyard and true is returned.
1405    pub async fn adjust_refs(
1406        &self,
1407        transaction: &mut Transaction<'_>,
1408        object_id: u64,
1409        delta: i64,
1410    ) -> Result<bool, Error> {
1411        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1412        let refs = if let ObjectValue::Object {
1413            kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
1414            ..
1415        } = &mut mutation.item.value
1416        {
1417            *refs =
1418                refs.checked_add_signed(delta).ok_or_else(|| anyhow!("refs underflow/overflow"))?;
1419            refs
1420        } else {
1421            bail!(FxfsError::NotFile);
1422        };
1423        if *refs == 0 {
1424            self.add_to_graveyard(transaction, object_id);
1425
1426            // We might still need to adjust the reference count if delta was something other than
1427            // -1.
1428            if delta != -1 {
1429                *refs = 1;
1430                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1431            }
1432            // Otherwise, we don't commit the mutation as we want to keep reference count as 1 for
1433            // objects in graveyard.
1434            Ok(true)
1435        } else {
1436            transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1437            Ok(false)
1438        }
1439    }
1440
1441    // Purges an object that is in the graveyard.
1442    pub async fn tombstone_object(
1443        &self,
1444        object_id: u64,
1445        txn_options: Options<'_>,
1446    ) -> Result<(), Error> {
1447        debug_assert!(
1448            self.tree.find(&ObjectKey::object(object_id)).await?.is_some(),
1449            "Tombstoning missing object"
1450        );
1451        debug_assert!(
1452            self.tree
1453                .find(&ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id))
1454                .await?
1455                .is_some(),
1456            "Tombstoning object not in graveyard"
1457        );
1458        self.key_manager.remove(object_id).await;
1459        let fs = self.filesystem();
1460        let truncate_guard = fs.truncate_guard(self.store_object_id, object_id).await;
1461        self.trim_or_tombstone(object_id, true, txn_options, &truncate_guard).await
1462    }
1463
1464    /// Trim extents beyond the end of a file for all attributes.  This will remove the entry from
1465    /// the graveyard when done.
1466    pub async fn trim(
1467        &self,
1468        object_id: u64,
1469        truncate_guard: &TruncateGuard<'_>,
1470    ) -> Result<(), Error> {
1471        // For the root and root parent store, we would need to use the metadata reservation which
1472        // we don't currently support, so assert that we're not those stores.
1473        assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());
1474
1475        self.trim_or_tombstone(
1476            object_id,
1477            false,
1478            Options { borrow_metadata_space: true, ..Default::default() },
1479            truncate_guard,
1480        )
1481        .await
1482    }
1483
1484    /// Trims or tombstones an object.
1485    async fn trim_or_tombstone(
1486        &self,
1487        object_id: u64,
1488        for_tombstone: bool,
1489        txn_options: Options<'_>,
1490        _truncate_guard: &TruncateGuard<'_>,
1491    ) -> Result<(), Error> {
1492        let fs = self.filesystem();
1493        let mut next_attribute = Some(0);
1494        while let Some(attribute_id) = next_attribute.take() {
1495            let mut transaction = fs
1496                .clone()
1497                .new_transaction(
1498                    lock_keys![
1499                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
1500                        LockKey::object(self.store_object_id, object_id),
1501                    ],
1502                    txn_options,
1503                )
1504                .await?;
1505
1506            match self
1507                .trim_some(
1508                    &mut transaction,
1509                    object_id,
1510                    attribute_id,
1511                    if for_tombstone {
1512                        TrimMode::Tombstone(TombstoneMode::Object)
1513                    } else {
1514                        TrimMode::UseSize
1515                    },
1516                )
1517                .await?
1518            {
1519                TrimResult::Incomplete => next_attribute = Some(attribute_id),
1520                TrimResult::Done(None) => {
1521                    if for_tombstone
1522                        || matches!(
1523                            self.tree
1524                                .find(&ObjectKey::graveyard_entry(
1525                                    self.graveyard_directory_object_id(),
1526                                    object_id,
1527                                ))
1528                                .await?,
1529                            Some(Item { value: ObjectValue::Trim, .. })
1530                        )
1531                    {
1532                        self.remove_from_graveyard(&mut transaction, object_id);
1533                    }
1534                    // The last attribute was not the default attribute, it may have been added to
1535                    // the graveyard alongside the object.
1536                    if for_tombstone && attribute_id != DEFAULT_DATA_ATTRIBUTE_ID {
1537                        self.remove_attribute_from_graveyard(
1538                            &mut transaction,
1539                            object_id,
1540                            attribute_id,
1541                        );
1542                    }
1543                }
1544                TrimResult::Done(id) => {
1545                    // Moved to the next attribute. This one is finished and it may have been
1546                    // added to the graveyard alongside the object.
1547                    if for_tombstone && attribute_id != DEFAULT_DATA_ATTRIBUTE_ID {
1548                        self.remove_attribute_from_graveyard(
1549                            &mut transaction,
1550                            object_id,
1551                            attribute_id,
1552                        );
1553                    }
1554                    next_attribute = id;
1555                }
1556            }
1557
1558            if !transaction.mutations().is_empty() {
1559                transaction.commit().await?;
1560            }
1561        }
1562        Ok(())
1563    }
1564
1565    // Purges an object's attribute that is in the graveyard.
1566    pub async fn tombstone_attribute(
1567        &self,
1568        object_id: u64,
1569        attribute_id: u64,
1570        txn_options: Options<'_>,
1571    ) -> Result<(), Error> {
1572        // Ensure that we don't double-delete things, it should still exist and be in the graveyard.
1573        debug_assert!(
1574            self.tree
1575                .find(&ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute))
1576                .await?
1577                .is_some(),
1578            "Tombstoning missing attribute"
1579        );
1580        debug_assert!(
1581            self.tree
1582                .find(&ObjectKey::graveyard_attribute_entry(
1583                    self.graveyard_directory_object_id(),
1584                    object_id,
1585                    attribute_id
1586                ))
1587                .await?
1588                .is_some(),
1589            "Tombstoning attribute not in graveyard"
1590        );
1591        let fs = self.filesystem();
1592        let mut trim_result = TrimResult::Incomplete;
1593        while matches!(trim_result, TrimResult::Incomplete) {
1594            let mut transaction = fs
1595                .clone()
1596                .new_transaction(
1597                    lock_keys![
1598                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
1599                        LockKey::object(self.store_object_id, object_id),
1600                    ],
1601                    txn_options,
1602                )
1603                .await?;
1604            trim_result = self
1605                .trim_some(
1606                    &mut transaction,
1607                    object_id,
1608                    attribute_id,
1609                    TrimMode::Tombstone(TombstoneMode::Attribute),
1610                )
1611                .await?;
1612            if let TrimResult::Done(..) = trim_result {
1613                self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
1614            }
1615            if !transaction.mutations().is_empty() {
1616                transaction.commit().await?;
1617            }
1618        }
1619        Ok(())
1620    }
1621
1622    /// Deletes extents for attribute `attribute_id` in object `object_id`.  Also see the comments
1623    /// for TrimMode and TrimResult. Should hold a lock on the attribute, and the object as it
1624    /// performs a read-modify-write on the sizes.
1625    pub async fn trim_some(
1626        &self,
1627        transaction: &mut Transaction<'_>,
1628        object_id: u64,
1629        attribute_id: u64,
1630        mode: TrimMode,
1631    ) -> Result<TrimResult, Error> {
1632        let layer_set = self.tree.layer_set();
1633        let mut merger = layer_set.merger();
1634
1635        let aligned_offset = match mode {
1636            TrimMode::FromOffset(offset) => {
1637                round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
1638            }
1639            TrimMode::Tombstone(..) => 0,
1640            TrimMode::UseSize => {
1641                let iter = merger
1642                    .query(Query::FullRange(&ObjectKey::attribute(
1643                        object_id,
1644                        attribute_id,
1645                        AttributeKey::Attribute,
1646                    )))
1647                    .await?;
1648                if let Some(item_ref) = iter.get() {
1649                    if item_ref.key.object_id != object_id {
1650                        return Ok(TrimResult::Done(None));
1651                    }
1652
1653                    if let ItemRef {
1654                        key:
1655                            ObjectKey {
1656                                data:
1657                                    ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
1658                                ..
1659                            },
1660                        value: ObjectValue::Attribute { size, .. },
1661                        ..
1662                    } = item_ref
1663                    {
1664                        // If we found a different attribute_id, return so we can get the
1665                        // right lock.
1666                        if *size_attribute_id != attribute_id {
1667                            return Ok(TrimResult::Done(Some(*size_attribute_id)));
1668                        }
1669                        round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
1670                    } else {
1671                        // At time of writing, we should always see a size record or None here, but
1672                        // asserting here would be brittle so just skip to the the next attribute
1673                        // instead.
1674                        return Ok(TrimResult::Done(Some(attribute_id + 1)));
1675                    }
1676                } else {
1677                    // End of the tree.
1678                    return Ok(TrimResult::Done(None));
1679                }
1680            }
1681        };
1682
1683        // Loop over the extents and deallocate them.
1684        let mut iter = merger
1685            .query(Query::FullRange(&ObjectKey::from_extent(
1686                object_id,
1687                attribute_id,
1688                ExtentKey::search_key_from_offset(aligned_offset),
1689            )))
1690            .await?;
1691        let mut end = 0;
1692        let allocator = self.allocator();
1693        let mut result = TrimResult::Done(None);
1694        let mut deallocated = 0;
1695        let block_size = self.block_size;
1696
1697        while let Some(item_ref) = iter.get() {
1698            if item_ref.key.object_id != object_id {
1699                break;
1700            }
1701            if let ObjectKey {
1702                data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
1703                ..
1704            } = item_ref.key
1705            {
1706                if *extent_attribute_id != attribute_id {
1707                    result = TrimResult::Done(Some(*extent_attribute_id));
1708                    break;
1709                }
1710                if let (
1711                    AttributeKey::Extent(ExtentKey { range }),
1712                    ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1713                ) = (attribute_key, item_ref.value)
1714                {
1715                    let start = std::cmp::max(range.start, aligned_offset);
1716                    ensure!(start < range.end, FxfsError::Inconsistent);
1717                    let device_offset = device_offset
1718                        .checked_add(start - range.start)
1719                        .ok_or(FxfsError::Inconsistent)?;
1720                    end = range.end;
1721                    let len = end - start;
1722                    let device_range = device_offset..device_offset + len;
1723                    ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
1724                    allocator.deallocate(transaction, self.store_object_id, device_range).await?;
1725                    deallocated += len;
1726                    // Stop if the transaction is getting too big.
1727                    if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
1728                        result = TrimResult::Incomplete;
1729                        break;
1730                    }
1731                }
1732            }
1733            iter.advance().await?;
1734        }
1735
1736        let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
1737            && matches!(result, TrimResult::Done(None));
1738        let finished_tombstone_attribute =
1739            matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
1740                && !matches!(result, TrimResult::Incomplete);
1741        let mut object_mutation = None;
1742        let nodes = if finished_tombstone_object { -1 } else { 0 };
1743        if nodes != 0 || deallocated != 0 {
1744            let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1745            if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
1746                mutation.item.value
1747            {
1748                if project_id != 0 {
1749                    transaction.add(
1750                        self.store_object_id,
1751                        Mutation::merge_object(
1752                            ObjectKey::project_usage(self.root_directory_object_id(), project_id),
1753                            ObjectValue::BytesAndNodes {
1754                                bytes: -i64::try_from(deallocated).unwrap(),
1755                                nodes,
1756                            },
1757                        ),
1758                    );
1759                }
1760                object_mutation = Some(mutation);
1761            } else {
1762                panic!("Inconsistent object type.");
1763            }
1764        }
1765
1766        // Deletion marker records *must* be merged so as to consume all other records for the
1767        // object.
1768        if finished_tombstone_object {
1769            transaction.add(
1770                self.store_object_id,
1771                Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
1772            );
1773        } else {
1774            if finished_tombstone_attribute {
1775                transaction.add(
1776                    self.store_object_id,
1777                    Mutation::merge_object(
1778                        ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
1779                        ObjectValue::None,
1780                    ),
1781                );
1782            }
1783            if deallocated > 0 {
1784                let mut mutation = match object_mutation {
1785                    Some(mutation) => mutation,
1786                    None => self.txn_get_object_mutation(transaction, object_id).await?,
1787                };
1788                transaction.add(
1789                    self.store_object_id,
1790                    Mutation::merge_object(
1791                        ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
1792                        ObjectValue::deleted_extent(),
1793                    ),
1794                );
1795                // Update allocated size.
1796                if let ObjectValue::Object {
1797                    attributes: ObjectAttributes { allocated_size, .. },
1798                    ..
1799                } = &mut mutation.item.value
1800                {
1801                    // The only way for these to fail are if the volume is inconsistent.
1802                    *allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
1803                        anyhow!(FxfsError::Inconsistent).context("Allocated size overflow")
1804                    })?;
1805                } else {
1806                    panic!("Unexpected object value");
1807                }
1808                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1809            }
1810        }
1811        Ok(result)
1812    }
1813
1814    /// Returns all objects that exist in the parent store that pertain to this object store.
1815    /// Note that this doesn't include the object_id of the store itself which is generally
1816    /// referenced externally.
1817    pub fn parent_objects(&self) -> Vec<u64> {
1818        assert!(self.store_info_handle.get().is_some());
1819        self.store_info.lock().as_ref().unwrap().parent_objects()
1820    }
1821
1822    /// Returns root objects for this store.
1823    pub fn root_objects(&self) -> Vec<u64> {
1824        let mut objects = Vec::new();
1825        let store_info = self.store_info.lock();
1826        let info = store_info.as_ref().unwrap();
1827        if info.root_directory_object_id != INVALID_OBJECT_ID {
1828            objects.push(info.root_directory_object_id);
1829        }
1830        if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
1831            objects.push(info.graveyard_directory_object_id);
1832        }
1833        if info.internal_directory_object_id != INVALID_OBJECT_ID {
1834            objects.push(info.internal_directory_object_id);
1835        }
1836        objects
1837    }
1838
1839    pub fn store_info(&self) -> Option<StoreInfo> {
1840        self.store_info.lock().as_ref().cloned()
1841    }
1842
1843    /// Returns None if called during journal replay.
1844    pub fn store_info_handle_object_id(&self) -> Option<u64> {
1845        self.store_info_handle.get().map(|h| h.object_id())
1846    }
1847
1848    pub fn graveyard_count(&self) -> u64 {
1849        self.graveyard_entries.load(Ordering::Relaxed)
1850    }
1851
1852    /// Called to open a store, before replay of this store's mutations.
1853    async fn open(
1854        parent_store: &Arc<ObjectStore>,
1855        store_object_id: u64,
1856        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
1857    ) -> Result<Arc<ObjectStore>, Error> {
1858        let handle =
1859            ObjectStore::open_object(parent_store, store_object_id, HandleOptions::default(), None)
1860                .await?;
1861
1862        let info = load_store_info(parent_store, store_object_id).await?;
1863        let is_encrypted = info.mutations_key.is_some();
1864
1865        let mut total_layer_size = 0;
1866        let last_object_id;
1867
1868        // TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.
1869
1870        // If the store is encrypted, we can't open the object tree layers now, but we need to
1871        // compute the size of the layers.
1872        if is_encrypted {
1873            for &oid in &info.layers {
1874                total_layer_size += parent_store.get_file_size(oid).await?;
1875            }
1876            if info.encrypted_mutations_object_id != INVALID_OBJECT_ID {
1877                total_layer_size += layer_size_from_encrypted_mutations_size(
1878                    parent_store.get_file_size(info.encrypted_mutations_object_id).await?,
1879                );
1880            }
1881            last_object_id = LastObjectId::Pending;
1882            ensure!(
1883                matches!(
1884                    info.last_object_id,
1885                    LastObjectIdInfo::Encrypted { .. } | LastObjectIdInfo::Low32Bit { .. }
1886                ),
1887                FxfsError::Inconsistent
1888            );
1889        } else {
1890            last_object_id = match info.last_object_id {
1891                LastObjectIdInfo::Unencrypted { id } => LastObjectId::Unencrypted { id },
1892                LastObjectIdInfo::Low32Bit => {
1893                    LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() }
1894                }
1895                _ => bail!(FxfsError::Inconsistent),
1896            };
1897        }
1898
1899        let fs = parent_store.filesystem();
1900
1901        let store = ObjectStore::new(
1902            Some(parent_store.clone()),
1903            store_object_id,
1904            fs.clone(),
1905            if is_encrypted { None } else { Some(info) },
1906            object_cache,
1907            None,
1908            if is_encrypted { LockState::Locked } else { LockState::Unencrypted },
1909            last_object_id,
1910        );
1911
1912        assert!(store.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");
1913
1914        if !is_encrypted {
1915            let object_tree_layer_object_ids =
1916                store.store_info.lock().as_ref().unwrap().layers.clone();
1917            let object_layers = store.open_layers(object_tree_layer_object_ids, None).await?;
1918            total_layer_size = object_layers.iter().map(|h| h.get_size()).sum();
1919            store
1920                .tree
1921                .append_layers(object_layers)
1922                .await
1923                .context("Failed to read object store layers")?;
1924        }
1925
1926        fs.object_manager().update_reservation(
1927            store_object_id,
1928            tree::reservation_amount_from_layer_size(total_layer_size),
1929        );
1930
1931        Ok(store)
1932    }
1933
1934    async fn load_store_info(&self) -> Result<StoreInfo, Error> {
1935        load_store_info_from_handle(self.store_info_handle.get().unwrap()).await
1936    }
1937
1938    async fn open_layers(
1939        &self,
1940        object_ids: impl std::iter::IntoIterator<Item = u64>,
1941        crypt: Option<Arc<dyn Crypt>>,
1942    ) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
1943        let parent_store = self.parent_store.as_ref().unwrap();
1944        let mut handles = Vec::new();
1945        for object_id in object_ids {
1946            let handle = ObjectStore::open_object(
1947                &parent_store,
1948                object_id,
1949                HandleOptions::default(),
1950                crypt.clone(),
1951            )
1952            .await
1953            .with_context(|| format!("Failed to open layer file {}", object_id))?;
1954            handles.push(handle);
1955        }
1956        Ok(handles)
1957    }
1958
1959    /// Unlocks a store so that it is ready to be used.
1960    /// This is not thread-safe.
1961    pub async fn unlock(
1962        self: &Arc<Self>,
1963        owner: Weak<dyn StoreOwner>,
1964        crypt: Arc<dyn Crypt>,
1965    ) -> Result<(), Error> {
1966        self.unlock_inner(owner, crypt, /*read_only=*/ false).await
1967    }
1968
1969    /// Unlocks a store so that it is ready to be read from.
1970    /// The store will generally behave like it is still locked: when flushed, the store will
1971    /// write out its mutations into the encrypted mutations file, rather than directly updating
1972    /// the layer files of the object store.
1973    /// Re-locking the store (which *must* be done with `Self::lock_read_only` will not trigger a
1974    /// flush, although the store might still be flushed during other operations.
1975    /// This is not thread-safe.
1976    pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
1977        self.unlock_inner(NO_OWNER, crypt, /*read_only=*/ true).await
1978    }
1979
1980    async fn unlock_inner(
1981        self: &Arc<Self>,
1982        owner: Weak<dyn StoreOwner>,
1983        crypt: Arc<dyn Crypt>,
1984        read_only: bool,
1985    ) -> Result<(), Error> {
1986        // Unless we are unlocking the store as read-only, the filesystem must not be read-only.
1987        assert!(read_only || !self.filesystem().options().read_only);
1988        match &*self.lock_state.lock() {
1989            LockState::Locked => {}
1990            LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
1991            LockState::Invalid | LockState::Deleted => bail!(FxfsError::Internal),
1992            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) => {
1993                bail!(FxfsError::AlreadyBound)
1994            }
1995            LockState::Unknown => panic!("Store was unlocked before replay"),
1996            LockState::Locking => panic!("Store is being locked"),
1997            LockState::Unlocking => panic!("Store is being unlocked"),
1998        }
1999        // We must lock flushing since that can modify store_info and the encrypted mutations file.
2000        let keys = lock_keys![LockKey::flush(self.store_object_id())];
2001        let fs = self.filesystem();
2002        let guard = fs.lock_manager().write_lock(keys).await;
2003
2004        let store_info = self.load_store_info().await?;
2005
2006        self.tree
2007            .append_layers(
2008                self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
2009            )
2010            .await
2011            .context("Failed to read object tree layer file contents")?;
2012
2013        let wrapped_key =
2014            fxfs_crypto::WrappedKey::Fxfs(store_info.mutations_key.clone().unwrap().into());
2015        let unwrapped_key = crypt
2016            .unwrap_key(&wrapped_key, self.store_object_id)
2017            .await
2018            .context("Failed to unwrap mutations keys")?;
2019        // The ChaCha20 stream cipher we use supports up to 64 GiB.  By default we'll roll the key
2020        // after every 128 MiB.  Here we just need to pick a number that won't cause issues if it
2021        // wraps, so we just use u32::MAX (the offset is u64).
2022        ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
2023        let mut mutations_cipher =
2024            StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);
2025
2026        match &store_info.last_object_id {
2027            LastObjectIdInfo::Encrypted { id, key } => {
2028                let wrapped_key = fxfs_crypto::WrappedKey::Fxfs(key.clone().into());
2029                *self.last_object_id.lock() = LastObjectId::Encrypted {
2030                    id: *id,
2031                    cipher: Box::new(Ff1::new(
2032                        &crypt.unwrap_key(&wrapped_key, self.store_object_id).await?,
2033                    )),
2034                };
2035            }
2036            LastObjectIdInfo::Low32Bit => {
2037                *self.last_object_id.lock() = LastObjectId::Low32Bit {
2038                    reserved: Default::default(),
2039                    unreserved: Default::default(),
2040                }
2041            }
2042            _ => unreachable!(),
2043        }
2044
2045        // Apply the encrypted mutations.
2046        let mut mutations = {
2047            if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
2048                EncryptedMutations::default()
2049            } else {
2050                let parent_store = self.parent_store.as_ref().unwrap();
2051                let handle = ObjectStore::open_object(
2052                    &parent_store,
2053                    store_info.encrypted_mutations_object_id,
2054                    HandleOptions::default(),
2055                    None,
2056                )
2057                .await?;
2058                let mut cursor = std::io::Cursor::new(
2059                    handle
2060                        .contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
2061                        .await
2062                        .context(FxfsError::Inconsistent)?,
2063                );
2064                let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
2065                    .context("Failed to deserialize EncryptedMutations")?
2066                    .0;
2067                let len = cursor.get_ref().len() as u64;
2068                while cursor.position() < len {
2069                    mutations.extend(
2070                        &EncryptedMutations::deserialize_with_version(&mut cursor)
2071                            .context("Failed to deserialize EncryptedMutations")?
2072                            .0,
2073                    );
2074                }
2075                mutations
2076            }
2077        };
2078
2079        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
2080        let journaled = EncryptedMutations::from_replayed_mutations(
2081            self.store_object_id,
2082            fs.journal()
2083                .read_transactions_for_object(self.store_object_id)
2084                .await
2085                .context("Failed to read encrypted mutations from journal")?,
2086        );
2087        mutations.extend(&journaled);
2088
2089        let _ = std::mem::replace(&mut *self.lock_state.lock(), LockState::Unlocking);
2090        *self.store_info.lock() = Some(store_info);
2091
2092        // If we fail, clean up.
2093        let clean_up = scopeguard::guard((), |_| {
2094            *self.lock_state.lock() = LockState::Locked;
2095            *self.store_info.lock() = None;
2096            // Make sure we don't leave unencrypted data lying around in memory.
2097            self.tree.reset();
2098        });
2099
2100        let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;
2101
2102        let mut slice = &mut data[..];
2103        let mut last_offset = 0;
2104        for (offset, key) in mutations_key_roll {
2105            let split_offset = offset
2106                .checked_sub(last_offset)
2107                .ok_or(FxfsError::Inconsistent)
2108                .context("Invalid mutation key roll offset")?;
2109            last_offset = offset;
2110            ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
2111            let (old, new) = slice.split_at_mut(split_offset);
2112            mutations_cipher.decrypt(old);
2113            let unwrapped_key = crypt
2114                .unwrap_key(&fxfs_crypto::WrappedKey::Fxfs(key.into()), self.store_object_id)
2115                .await
2116                .context("Failed to unwrap mutations keys")?;
2117            mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
2118            slice = new;
2119        }
2120        mutations_cipher.decrypt(slice);
2121
2122        // Always roll the mutations key when we unlock which guarantees we won't reuse a
2123        // previous key and nonce.
2124        self.roll_mutations_key(crypt.as_ref()).await?;
2125
2126        let mut cursor = std::io::Cursor::new(data);
2127        for (checkpoint, count) in transactions {
2128            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
2129            for _ in 0..count {
2130                let mutation =
2131                    Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
2132                        .context("failed to deserialize encrypted mutation")?;
2133                self.apply_mutation(mutation, &context, AssocObj::None)
2134                    .context("failed to apply encrypted mutation")?;
2135            }
2136        }
2137
2138        *self.lock_state.lock() = if read_only {
2139            LockState::UnlockedReadOnly(crypt)
2140        } else {
2141            LockState::Unlocked { owner, crypt }
2142        };
2143
2144        // To avoid unbounded memory growth, we should flush the encrypted mutations now. Otherwise
2145        // it's possible for more writes to be queued and for the store to be locked before we can
2146        // flush anything and that can repeat.
2147        std::mem::drop(guard);
2148
2149        if !read_only && !self.filesystem().options().read_only {
2150            self.flush_with_reason(flush::Reason::Unlock).await?;
2151
2152            // Reap purged files within this store.
2153            let _ = self.filesystem().graveyard().initial_reap(&self).await?;
2154        }
2155
2156        // Return and cancel the clean up.
2157        Ok(ScopeGuard::into_inner(clean_up))
2158    }
2159
2160    pub fn is_locked(&self) -> bool {
2161        matches!(
2162            *self.lock_state.lock(),
2163            LockState::Locked | LockState::Locking | LockState::Unknown
2164        )
2165    }
2166
2167    /// NB: This is not the converse of `is_locked`, as there are lock states where neither are
2168    /// true.
2169    pub fn is_unlocked(&self) -> bool {
2170        matches!(
2171            *self.lock_state.lock(),
2172            LockState::Unlocked { .. } | LockState::UnlockedReadOnly { .. } | LockState::Unlocking
2173        )
2174    }
2175
2176    pub fn is_unknown(&self) -> bool {
2177        matches!(*self.lock_state.lock(), LockState::Unknown)
2178    }
2179
2180    pub fn is_encrypted(&self) -> bool {
2181        self.store_info.lock().as_ref().unwrap().mutations_key.is_some()
2182    }
2183
2184    // Locks a store.
2185    // This operation will take a flush lock on the store, in case any flushes are ongoing.  Any
2186    // ongoing store accesses might be interrupted by this.  See `Self::crypt`.
2187    // Whilst this can return an error, the store will be placed into an unusable but safe state
2188    // (i.e. no lingering unencrypted data) if an error is encountered.
2189    pub async fn lock(&self) -> Result<(), Error> {
2190        // We must lock flushing since it is not safe for that to be happening whilst we are locking
2191        // the store.
2192        let keys = lock_keys![LockKey::flush(self.store_object_id())];
2193        let fs = self.filesystem();
2194        let _guard = fs.lock_manager().write_lock(keys).await;
2195
2196        {
2197            let mut lock_state = self.lock_state.lock();
2198            if let LockState::Unlocked { .. } = &*lock_state {
2199                *lock_state = LockState::Locking;
2200            } else {
2201                panic!("Unexpected lock state: {:?}", *lock_state);
2202            }
2203        }
2204
2205        // Sync the journal now to ensure that any buffered mutations for this store make it out to
2206        // disk.  This is necessary to be able to unlock the store again.
2207        // We need to establish a barrier at this point (so that the journaled writes are observable
2208        // by any future attempts to unlock the store), hence the flush_device.
2209        let sync_result =
2210            self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;
2211
2212        *self.lock_state.lock() = if let Err(error) = &sync_result {
2213            error!(error:?; "Failed to sync journal; store will no longer be usable");
2214            LockState::Invalid
2215        } else {
2216            LockState::Locked
2217        };
2218        self.key_manager.clear();
2219        *self.store_info.lock() = None;
2220        self.tree.reset();
2221
2222        sync_result
2223    }
2224
2225    // Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`).  Data
2226    // is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
2227    // and will be replayed next time the store is unlocked.
2228    pub fn lock_read_only(&self) {
2229        *self.lock_state.lock() = LockState::Locked;
2230        *self.store_info.lock() = None;
2231        self.tree.reset();
2232    }
2233
2234    // Returns None if the object ID cipher needs to be created or rolled, or a more expensive
2235    // algorithm needs to be used.
2236    fn maybe_get_next_object_id(&self) -> Option<ReservedId<'_>> {
2237        self.last_object_id.lock().try_get_next().map(|id| ReservedId::new(self, id))
2238    }
2239
2240    /// Returns a new object ID that can be used.  This will create an object ID cipher if needed.
2241    ///
2242    /// If the object ID key needs to be rolled, a new transaction will be created and committed.
2243    /// This transaction does not take the filesystem lock, hence `txn_guard`.
2244    pub(super) async fn get_next_object_id(
2245        &self,
2246        txn_guard: &TxnGuard<'_>,
2247    ) -> Result<ReservedId<'_>, Error> {
2248        {
2249            let mut last_object_id = self.last_object_id.lock();
2250            if let Some(id) = last_object_id.try_get_next() {
2251                return Ok(ReservedId::new(self, id));
2252            }
2253            ensure!(
2254                !matches!(&*last_object_id, LastObjectId::Unencrypted { .. }),
2255                FxfsError::Inconsistent
2256            );
2257        }
2258
2259        let parent_store = self.parent_store().unwrap();
2260
2261        // Create a transaction (which has a lock) and then check again.
2262        //
2263        // NOTE: Since this can be a nested transaction, we must take care to avoid deadlocks; no
2264        // more locks should be taken whilst we hold this lock.
2265        let mut transaction = self
2266            .filesystem()
2267            .new_transaction(
2268                lock_keys![LockKey::object(parent_store.store_object_id, self.store_object_id)],
2269                Options {
2270                    // We must skip journal checks because this transaction might be needed to
2271                    // compact.
2272                    skip_journal_checks: true,
2273                    borrow_metadata_space: true,
2274                    txn_guard: Some(txn_guard),
2275                    ..Default::default()
2276                },
2277            )
2278            .await?;
2279
2280        let mut next_id_hi = 0;
2281
2282        let is_low_32_bit = {
2283            let mut last_object_id = self.last_object_id.lock();
2284            if let Some(id) = last_object_id.try_get_next() {
2285                // Something else raced and created/rolled the cipher.
2286                return Ok(ReservedId::new(self, id));
2287            }
2288
2289            match &*last_object_id {
2290                LastObjectId::Encrypted { id, .. } => {
2291                    // It shouldn't be possible for last_object_id to wrap within our lifetime, so
2292                    // if this happens, it's most likely due to corruption.
2293                    next_id_hi =
2294                        id.checked_add(1 << 32).ok_or(FxfsError::Inconsistent)? & OBJECT_ID_HI_MASK;
2295
2296                    info!(store_id = self.store_object_id; "Rolling object ID key");
2297
2298                    false
2299                }
2300                LastObjectId::Low32Bit { .. } => true,
2301                _ => unreachable!(),
2302            }
2303        };
2304
2305        if is_low_32_bit {
2306            // Keep picking an object ID at random until we find one free.
2307
2308            // To avoid races, this must be before we capture the layer set.
2309            self.last_object_id.lock().drain_unreserved();
2310
2311            let layer_set = self.tree.layer_set();
2312            let mut key = ObjectKey::object(0);
2313            loop {
2314                let next_id = rand::rng().next_u32() as u64;
2315                let Some(next_id) = NonZero::new(next_id) else { continue };
2316                if self.last_object_id.lock().is_reserved(next_id.get()) {
2317                    continue;
2318                }
2319                key.object_id = next_id.get();
2320                if layer_set.key_exists(&key).await? == Existence::Missing {
2321                    self.last_object_id.lock().reserve(next_id.get());
2322                    return Ok(ReservedId::new(self, next_id));
2323                }
2324            }
2325        } else {
2326            // Create a key.
2327            let (object_id_wrapped, object_id_unwrapped) = self
2328                .crypt()
2329                .unwrap()
2330                .create_key(self.store_object_id, KeyPurpose::Metadata)
2331                .await?;
2332
2333            // Normally we would use a mutation to note the updated key, but that would complicate
2334            // replay.  During replay, we need to keep track of the highest used object ID and this
2335            // is done by watching mutations to see when we create objects, and then decrypting
2336            // the object ID.  This relies on the unwrapped key being available, so as soon as
2337            // we detect the key has changed, we would need to immediately unwrap the key via the
2338            // crypt service.  Currently, this isn't easy to do during replay.  An option we could
2339            // consider would be to include the unencrypted object ID when we create objects, which
2340            // would avoid us having to decrypt the object ID during replay.
2341            //
2342            // For now and for historical reasons, the approach we take is to just write a new
2343            // version of StoreInfo here.  We must take care that we only update the key and not any
2344            // other information contained within StoreInfo because other information should only be
2345            // updated when we flush.  We are holding the lock on the StoreInfo file, so this will
2346            // prevent potential races with flushing.  To make sure we only change the key, we read
2347            // StoreInfo from storage rather than using our in-memory copy.  This won't be
2348            // performant, but rolling the object ID key will be extremely rare.
2349            let new_store_info = StoreInfo {
2350                last_object_id: LastObjectIdInfo::Encrypted {
2351                    id: next_id_hi,
2352                    key: object_id_wrapped.clone(),
2353                },
2354                ..self.load_store_info().await?
2355            };
2356
2357            self.write_store_info(&mut transaction, &new_store_info).await?;
2358
2359            transaction
2360                .commit_with_callback(|_| {
2361                    self.store_info.lock().as_mut().unwrap().last_object_id =
2362                        new_store_info.last_object_id;
2363                    match &mut *self.last_object_id.lock() {
2364                        LastObjectId::Encrypted { id, cipher } => {
2365                            **cipher = Ff1::new(&object_id_unwrapped);
2366                            *id = next_id_hi;
2367                            ReservedId::new(
2368                                self,
2369                                NonZero::new(next_id_hi | cipher.encrypt(0) as u64).unwrap(),
2370                            )
2371                        }
2372                        _ => unreachable!(),
2373                    }
2374                })
2375                .await
2376        }
2377    }
2378
2379    /// Query the next object ID that will be used. Intended for use when checking filesystem
2380    /// consistency. Prefer [`Self::get_next_object_id()`] for general use.
2381    pub(crate) fn query_next_object_id(&self) -> u64 {
2382        self.last_object_id.lock().peek_next()
2383    }
2384
2385    fn allocator(&self) -> Arc<Allocator> {
2386        self.filesystem().allocator()
2387    }
2388
2389    // If |transaction| has an impending mutation for the underlying object, returns that.
2390    // Otherwise, looks up the object from the tree and returns a suitable mutation for it.  The
2391    // mutation is returned here rather than the item because the mutation includes the operation
2392    // which has significance: inserting an object implies it's the first of its kind unlike
2393    // replacing an object.
2394    async fn txn_get_object_mutation(
2395        &self,
2396        transaction: &Transaction<'_>,
2397        object_id: u64,
2398    ) -> Result<ObjectStoreMutation, Error> {
2399        if let Some(mutation) =
2400            transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
2401        {
2402            Ok(mutation.clone())
2403        } else {
2404            Ok(ObjectStoreMutation {
2405                item: self
2406                    .tree
2407                    .find(&ObjectKey::object(object_id))
2408                    .await?
2409                    .ok_or(FxfsError::Inconsistent)
2410                    .context("Object id missing")?,
2411                op: Operation::ReplaceOrInsert,
2412            })
2413        }
2414    }
2415
/// Publicly visible variant of `txn_get_object_mutation`.
/// Only available in migration code.
#[cfg(feature = "migration")]
pub async fn get_object_mutation(
    &self,
    transaction: &Transaction<'_>,
    object_id: u64,
) -> Result<ObjectStoreMutation, Error> {
    let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
    Ok(mutation)
}
2426
2427    fn update_last_object_id(&self, object_id: u64) {
2428        let mut last_object_id = self.last_object_id.lock();
2429        match &mut *last_object_id {
2430            LastObjectId::Pending => unreachable!(),
2431            LastObjectId::Unencrypted { id } => {
2432                if object_id > *id {
2433                    *id = object_id
2434                }
2435            }
2436            LastObjectId::Encrypted { id, cipher } => {
2437                // For encrypted stores, object_id will be encrypted here, so we must decrypt first.
2438
2439                // If the object ID cipher has been rolled, then it's possible we might see object
2440                // IDs that were generated using a different cipher so the decrypt here will return
2441                // the wrong value, but that won't matter because the hi part of the object ID
2442                // should still discriminate.
2443                let object_id =
2444                    object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
2445                if object_id > *id {
2446                    *id = object_id;
2447                }
2448            }
2449            LastObjectId::Low32Bit { .. } => {}
2450        }
2451    }
2452
2453    /// If possible, converts the given object ID to its unencrypted value.  Returns None if it is
2454    /// not possible to convert to its unencrypted value because the key is unavailable.
2455    pub fn to_unencrypted_object_id(&self, object_id: u64) -> Option<u64> {
2456        let last_object_id = self.last_object_id.lock();
2457        match &*last_object_id {
2458            LastObjectId::Pending => None,
2459            LastObjectId::Unencrypted { .. } | LastObjectId::Low32Bit { .. } => Some(object_id),
2460            LastObjectId::Encrypted { id, cipher } => {
2461                if id & OBJECT_ID_HI_MASK != object_id & OBJECT_ID_HI_MASK {
2462                    None
2463                } else {
2464                    Some(object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64)
2465                }
2466            }
2467        }
2468    }
2469
2470    /// Adds the specified object to the graveyard.
2471    pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2472        let graveyard_id = self.graveyard_directory_object_id();
2473        assert_ne!(graveyard_id, INVALID_OBJECT_ID);
2474        transaction.add(
2475            self.store_object_id,
2476            Mutation::replace_or_insert_object(
2477                ObjectKey::graveyard_entry(graveyard_id, object_id),
2478                ObjectValue::Some,
2479            ),
2480        );
2481    }
2482
2483    /// Removes the specified object from the graveyard.  NB: Care should be taken when calling
2484    /// this because graveyard entries are used for purging deleted files *and* for trimming
2485    /// extents.  For example, consider the following sequence:
2486    ///
2487    ///     1. Add Trim graveyard entry.
2488    ///     2. Replace with Some graveyard entry (see above).
2489    ///     3. Remove graveyard entry.
2490    ///
2491    /// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
2492    /// actually be:
2493    ///
2494    ///     3. Replace with Trim graveyard entry.
2495    pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2496        transaction.add(
2497            self.store_object_id,
2498            Mutation::replace_or_insert_object(
2499                ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
2500                ObjectValue::None,
2501            ),
2502        );
2503    }
2504
2505    /// Removes the specified attribute from the graveyard. Unlike object graveyard entries,
2506    /// attribute graveyard entries only have one functionality (i.e. to purge deleted attributes)
2507    /// so the caller does not need to be concerned about replacing the graveyard attribute entry
2508    /// with its prior state when cancelling it. See comment on `remove_from_graveyard()`.
2509    pub fn remove_attribute_from_graveyard(
2510        &self,
2511        transaction: &mut Transaction<'_>,
2512        object_id: u64,
2513        attribute_id: u64,
2514    ) {
2515        transaction.add(
2516            self.store_object_id,
2517            Mutation::replace_or_insert_object(
2518                ObjectKey::graveyard_attribute_entry(
2519                    self.graveyard_directory_object_id(),
2520                    object_id,
2521                    attribute_id,
2522                ),
2523                ObjectValue::None,
2524            ),
2525        );
2526    }
2527
2528    // Roll the mutations key.  The new key will be written for the next encrypted mutation.
2529    async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
2530        let (wrapped_key, unwrapped_key) =
2531            crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
2532
2533        // The mutations_cipher lock must be held for the duration so that mutations_cipher and
2534        // store_info are updated atomically.  Otherwise, write_mutation could find a new cipher but
2535        // end up writing the wrong wrapped key.
2536        let mut cipher = self.mutations_cipher.lock();
2537        *cipher = Some(StreamCipher::new(&unwrapped_key, 0));
2538        self.store_info.lock().as_mut().unwrap().mutations_key = Some(wrapped_key);
2539        // mutations_cipher_offset is updated by flush.
2540        Ok(())
2541    }
2542
2543    // When the symlink is unlocked, this function decrypts `link` and returns a bag of bytes that
2544    // is identical to that which was passed in as the target on `create_symlink`.
2545    // If the symlink is locked, this function hashes the encrypted `link` with Sha256 in order to
2546    // get a standard length and then base64 encodes the hash and returns that to the caller.
2547    pub async fn read_encrypted_symlink(
2548        &self,
2549        object_id: u64,
2550        link: Vec<u8>,
2551    ) -> Result<Vec<u8>, Error> {
2552        let mut link = link;
2553        let key = self
2554            .key_manager()
2555            .get_fscrypt_key(object_id, self.crypt().unwrap().as_ref(), async || {
2556                self.get_keys(object_id).await
2557            })
2558            .await?;
2559        if let Some(key) = key.into_cipher() {
2560            key.decrypt_symlink(object_id, &mut link)?;
2561            Ok(link)
2562        } else {
2563            // Locked symlinks are encoded using a hash_code of 0.
2564            let proxy_filename =
2565                fscrypt::proxy_filename::ProxyFilename::new_with_hash_code(0, &link);
2566            let proxy_filename_str: String = proxy_filename.into();
2567            Ok(proxy_filename_str.into_bytes())
2568        }
2569    }
2570
2571    /// Returns the link of a symlink object.
2572    pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
2573        match self.tree.find(&ObjectKey::object(object_id)).await? {
2574            None => bail!(FxfsError::NotFound),
2575            Some(Item {
2576                value: ObjectValue::Object { kind: ObjectKind::EncryptedSymlink { link, .. }, .. },
2577                ..
2578            }) => self.read_encrypted_symlink(object_id, link.to_vec()).await,
2579            Some(Item {
2580                value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
2581                ..
2582            }) => Ok(link.to_vec()),
2583            Some(item) => Err(anyhow!(FxfsError::Inconsistent)
2584                .context(format!("Unexpected item in lookup: {item:?}"))),
2585        }
2586    }
2587
2588    /// Retrieves the wrapped keys for the given object.  The keys *should* be known to exist and it
2589    /// will be considered an inconsistency if they don't.
2590    pub async fn get_keys(&self, object_id: u64) -> Result<EncryptionKeys, Error> {
2591        match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::Inconsistent)? {
2592            Item { value: ObjectValue::Keys(keys), .. } => Ok(keys),
2593            _ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
2594        }
2595    }
2596
2597    pub async fn update_attributes<'a>(
2598        &self,
2599        transaction: &mut Transaction<'a>,
2600        object_id: u64,
2601        node_attributes: Option<&fio::MutableNodeAttributes>,
2602        change_time: Option<Timestamp>,
2603    ) -> Result<(), Error> {
2604        if change_time.is_none() {
2605            if let Some(attributes) = node_attributes {
2606                let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
2607                if *attributes == empty_attributes {
2608                    return Ok(());
2609                }
2610            } else {
2611                return Ok(());
2612            }
2613        }
2614        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
2615        if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
2616            if let Some(time) = change_time {
2617                attributes.change_time = time;
2618            }
2619            if let Some(node_attributes) = node_attributes {
2620                if let Some(time) = node_attributes.creation_time {
2621                    attributes.creation_time = Timestamp::from_nanos(time);
2622                }
2623                if let Some(time) = node_attributes.modification_time {
2624                    attributes.modification_time = Timestamp::from_nanos(time);
2625                }
2626                if let Some(time) = node_attributes.access_time {
2627                    attributes.access_time = Timestamp::from_nanos(time);
2628                }
2629                if node_attributes.mode.is_some()
2630                    || node_attributes.uid.is_some()
2631                    || node_attributes.gid.is_some()
2632                    || node_attributes.rdev.is_some()
2633                {
2634                    if let Some(a) = &mut attributes.posix_attributes {
2635                        if let Some(mode) = node_attributes.mode {
2636                            a.mode = mode;
2637                        }
2638                        if let Some(uid) = node_attributes.uid {
2639                            a.uid = uid;
2640                        }
2641                        if let Some(gid) = node_attributes.gid {
2642                            a.gid = gid;
2643                        }
2644                        if let Some(rdev) = node_attributes.rdev {
2645                            a.rdev = rdev;
2646                        }
2647                    } else {
2648                        attributes.posix_attributes = Some(PosixAttributes {
2649                            mode: node_attributes.mode.unwrap_or_default(),
2650                            uid: node_attributes.uid.unwrap_or_default(),
2651                            gid: node_attributes.gid.unwrap_or_default(),
2652                            rdev: node_attributes.rdev.unwrap_or_default(),
2653                        });
2654                    }
2655                }
2656            }
2657        } else {
2658            bail!(
2659                anyhow!(FxfsError::Inconsistent)
2660                    .context("ObjectStore.update_attributes: Expected object value")
2661            );
2662        };
2663        transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
2664        Ok(())
2665    }
2666
2667    // Updates and commits the changes to access time in ObjectProperties. The update matches
2668    // Linux's RELATIME. That is, access time is updated to the current time if access time is less
2669    // than or equal to the last modification or status change, or if it has been more than a day
2670    // since the last access.  `precondition` is a condition to be checked *after* taking the lock
2671    // on the object.  If `precondition` returns false, no update will be performed.
2672    pub async fn update_access_time(
2673        &self,
2674        object_id: u64,
2675        props: &mut ObjectProperties,
2676        precondition: impl FnOnce() -> bool,
2677    ) -> Result<(), Error> {
2678        let access_time = props.access_time.as_nanos();
2679        let modification_time = props.modification_time.as_nanos();
2680        let change_time = props.change_time.as_nanos();
2681        let now = Timestamp::now();
2682        if access_time <= modification_time
2683            || access_time <= change_time
2684            || access_time
2685                < now.as_nanos()
2686                    - Timestamp::from(std::time::Duration::from_secs(24 * 60 * 60)).as_nanos()
2687        {
2688            let mut transaction = self
2689                .filesystem()
2690                .clone()
2691                .new_transaction(
2692                    lock_keys![LockKey::object(self.store_object_id, object_id,)],
2693                    Options { borrow_metadata_space: true, ..Default::default() },
2694                )
2695                .await?;
2696            if precondition() {
2697                self.update_attributes(
2698                    &mut transaction,
2699                    object_id,
2700                    Some(&fio::MutableNodeAttributes {
2701                        access_time: Some(now.as_nanos()),
2702                        ..Default::default()
2703                    }),
2704                    None,
2705                )
2706                .await?;
2707                transaction.commit().await?;
2708                props.access_time = now;
2709            }
2710        }
2711        Ok(())
2712    }
2713
2714    async fn write_store_info<'a>(
2715        &'a self,
2716        transaction: &mut Transaction<'a>,
2717        info: &StoreInfo,
2718    ) -> Result<(), Error> {
2719        let mut serialized_info = Vec::new();
2720        info.serialize_with_version(&mut serialized_info)?;
2721        let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
2722        buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
2723        self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
2724    }
2725
2726    pub fn mark_deleted(&self) {
2727        *self.lock_state.lock() = LockState::Deleted;
2728    }
2729
/// Test helper: overwrites the last object ID of an encrypted store.
///
/// Panics if the store's last object ID is not in the `Encrypted` state.
#[cfg(test)]
pub(crate) fn test_set_last_object_id(&self, object_id: u64) {
    let mut guard = self.last_object_id.lock();
    let LastObjectId::Encrypted { id, .. } = &mut *guard else { unreachable!() };
    *id = object_id;
}
2737}
2738
2739#[async_trait]
2740impl JournalingObject for ObjectStore {
2741    fn apply_mutation(
2742        &self,
2743        mutation: Mutation,
2744        context: &ApplyContext<'_, '_>,
2745        _assoc_obj: AssocObj<'_>,
2746    ) -> Result<(), Error> {
2747        match &*self.lock_state.lock() {
2748            LockState::Locked | LockState::Locking => {
2749                ensure!(
2750                    matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
2751                        || matches!(
2752                            mutation,
2753                            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_)
2754                                if context.mode.is_replay()
2755                        ),
2756                    anyhow!(FxfsError::Inconsistent)
2757                        .context(format!("Unexpected mutation for encrypted store: {mutation:?}"))
2758                );
2759            }
2760            LockState::Invalid
2761            | LockState::Unlocking
2762            | LockState::Unencrypted
2763            | LockState::Unlocked { .. }
2764            | LockState::UnlockedReadOnly(..)
2765            | LockState::Deleted => {}
2766            lock_state @ _ => panic!("Unexpected lock state: {lock_state:?}"),
2767        }
2768        match mutation {
2769            Mutation::ObjectStore(ObjectStoreMutation { item, op }) => {
2770                match op {
2771                    Operation::Insert => {
2772                        let mut unreserve_id = INVALID_OBJECT_ID;
2773                        // If we are inserting an object record for the first time, it signifies the
2774                        // birth of the object so we need to adjust the object count.
2775                        if matches!(item.value, ObjectValue::Object { .. }) {
2776                            {
2777                                let info = &mut self.store_info.lock();
2778                                let object_count = &mut info.as_mut().unwrap().object_count;
2779                                *object_count = object_count.saturating_add(1);
2780                            }
2781                            if context.mode.is_replay() {
2782                                self.update_last_object_id(item.key.object_id);
2783                            } else {
2784                                unreserve_id = item.key.object_id;
2785                            }
2786                        } else if !context.mode.is_replay()
2787                            && matches!(
2788                                item.key.data,
2789                                ObjectKeyData::GraveyardEntry { .. }
2790                                    | ObjectKeyData::GraveyardAttributeEntry { .. }
2791                            )
2792                        {
2793                            if matches!(item.value, ObjectValue::Some | ObjectValue::Trim) {
2794                                self.graveyard_entries.fetch_add(1, Ordering::Relaxed);
2795                            } else if matches!(item.value, ObjectValue::None) {
2796                                self.graveyard_entries.fetch_sub(1, Ordering::Relaxed);
2797                            }
2798                        }
2799                        self.tree.insert(item)?;
2800                        if unreserve_id != INVALID_OBJECT_ID {
2801                            // To avoid races, this *must* be after the `tree.insert(..)` above.
2802                            self.last_object_id.lock().unreserve(unreserve_id);
2803                        }
2804                    }
2805                    Operation::ReplaceOrInsert => {
2806                        if !context.mode.is_replay()
2807                            && matches!(
2808                                item.key.data,
2809                                ObjectKeyData::GraveyardEntry { .. }
2810                                    | ObjectKeyData::GraveyardAttributeEntry { .. }
2811                            )
2812                        {
2813                            if matches!(item.value, ObjectValue::Some | ObjectValue::Trim) {
2814                                self.graveyard_entries.fetch_add(1, Ordering::Relaxed);
2815                            } else if matches!(item.value, ObjectValue::None) {
2816                                self.graveyard_entries.fetch_sub(1, Ordering::Relaxed);
2817                            }
2818                        }
2819                        self.tree.replace_or_insert(item);
2820                    }
2821                    Operation::Merge => {
2822                        if item.is_tombstone() {
2823                            let info = &mut self.store_info.lock();
2824                            let object_count = &mut info.as_mut().unwrap().object_count;
2825                            *object_count = object_count.saturating_sub(1);
2826                        }
2827                        if !context.mode.is_replay()
2828                            && matches!(
2829                                item.key.data,
2830                                ObjectKeyData::GraveyardEntry { .. }
2831                                    | ObjectKeyData::GraveyardAttributeEntry { .. }
2832                            )
2833                        {
2834                            if matches!(item.value, ObjectValue::Some | ObjectValue::Trim) {
2835                                self.graveyard_entries.fetch_add(1, Ordering::Relaxed);
2836                            } else if matches!(item.value, ObjectValue::None) {
2837                                self.graveyard_entries.fetch_sub(1, Ordering::Relaxed);
2838                            }
2839                        }
2840                        let lower_bound = item.key.key_for_merge_into();
2841                        self.tree.merge_into(item, &lower_bound);
2842                    }
2843                }
2844            }
2845            Mutation::BeginFlush => {
2846                ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
2847                self.tree.seal();
2848            }
2849            Mutation::EndFlush => ensure!(self.parent_store.is_some(), FxfsError::Inconsistent),
2850            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2851                // We will process these during Self::unlock.
2852                ensure!(
2853                    !matches!(&*self.lock_state.lock(), LockState::Unencrypted),
2854                    FxfsError::Inconsistent
2855                );
2856            }
2857            Mutation::CreateInternalDir(object_id) => {
2858                ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
2859                self.store_info.lock().as_mut().unwrap().internal_directory_object_id = object_id;
2860            }
2861            _ => bail!("unexpected mutation: {:?}", mutation),
2862        }
2863        self.counters.lock().mutations_applied += 1;
2864        Ok(())
2865    }
2866
2867    fn drop_mutation(&self, mutation: Mutation, _transaction: &Transaction<'_>) {
2868        self.counters.lock().mutations_dropped += 1;
2869        if let Mutation::ObjectStore(ObjectStoreMutation {
2870            item: Item { key: ObjectKey { object_id, .. }, value: ObjectValue::Object { .. }, .. },
2871            op: Operation::Insert,
2872        }) = mutation
2873        {
2874            self.last_object_id.lock().unreserve(object_id);
2875        }
2876    }
2877
2878    /// Push all in-memory structures to the device. This is not necessary for sync since the
2879    /// journal will take care of it.  This is supposed to be called when there is either memory or
2880    /// space pressure (flushing the store will persist in-memory data and allow the journal file to
2881    /// be trimmed).
2882    ///
2883    /// Also returns the earliest version of a struct in the filesystem (when known).
2884    async fn flush(&self) -> Result<Version, Error> {
2885        self.flush_with_reason(flush::Reason::Journal).await
2886    }
2887
    /// Writes `mutation` to `writer`, encrypting it first where required.
    ///
    /// `ObjectStore` and `CreateInternalDir` mutations are serialized, encrypted with the
    /// mutations cipher and written as `EncryptedObjectStore` when a cipher is present; if the
    /// cipher's offset is still zero, an `UpdateMutationsKey` mutation carrying the store's
    /// mutations key is written first.  In every other case a clone of `mutation` is written
    /// unchanged.
    ///
    /// Passing `EncryptedObjectStore` or `UpdateMutationsKey` trips a debug assertion.  Panics if
    /// a cipher is present at offset zero while `store_info` or its `mutations_key` is unset, or
    /// if serializing the mutation fails.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        // Intentionally enumerating all variants to force a decision on any new variants. Encrypt
        // all mutations that could affect an encrypted object store contents or the `StoreInfo` of
        // the encrypted object store. During `unlock()` any mutations which haven't been encrypted
        // won't be replayed after reading `StoreInfo`.
        match mutation {
            // Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted, we
            // still choose to encrypt the mutation because it makes it easier to deal with replay.
            // When we replay mutations for an encrypted store, the only thing we keep in memory are
            // the encrypted mutations; we don't keep `StoreInfo` or changes to it in memory. So, by
            // encrypting the CreateInternalDir mutation here, it means we don't have to track both
            // encrypted mutations bound for the LSM tree and unencrypted mutations for `StoreInfo`
            // to use in `unlock()`. It'll just bundle CreateInternalDir mutations with the other
            // encrypted mutations and handle them all in sequence during `unlock()`.
            Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
                let mut cipher = self.mutations_cipher.lock();
                if let Some(cipher) = cipher.as_mut() {
                    // If this is the first time we've used this key, we must write the key out.
                    if cipher.offset() == 0 {
                        writer.write(Mutation::update_mutations_key(
                            self.store_info
                                .lock()
                                .as_ref()
                                .unwrap()
                                .mutations_key
                                .as_ref()
                                .unwrap()
                                .clone(),
                        ));
                    }
                    let mut buffer = Vec::new();
                    mutation.serialize_into(&mut buffer).unwrap();
                    cipher.encrypt(&mut buffer);
                    writer.write(Mutation::EncryptedObjectStore(buffer.into()));
                    // The encrypted form has been written; skip the plain write at the bottom.
                    return;
                }
                // No cipher: fall through and write the mutation unencrypted.
            }
            // `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
            // encrypted object stores, but are either the encrypted mutation data itself or
            // metadata governing how the data will be encrypted. They should only be produced here.
            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
                debug_assert!(false, "Only this method should generate encrypted mutations");
            }
            // `BeginFlush` and `EndFlush` are not needed during `unlock()` and are needed during
            // the initial journal replay, so should not be encrypted. `Allocator`, `DeleteVolume`,
            // `UpdateBorrowed` mutations are never associated with an encrypted store as we do not
            // encrypt the allocator or root/root-parent stores so we can avoid the locking.
            Mutation::Allocator(_)
            | Mutation::BeginFlush
            | Mutation::EndFlush
            | Mutation::DeleteVolume
            | Mutation::UpdateBorrowed(_) => {}
        }
        writer.write(mutation.clone());
    }
2943}
2944
2945impl Drop for ObjectStore {
2946    fn drop(&mut self) {
2947        let mut last_object_id = self.last_object_id.lock();
2948        last_object_id.drain_unreserved();
2949        match &*last_object_id {
2950            LastObjectId::Low32Bit { reserved, .. } => debug_assert!(reserved.is_empty()),
2951            _ => {}
2952        }
2953    }
2954}
2955
// `ObjectStore` implements `HandleOwner` with an empty body, i.e. it supplies no items of its own
// for the trait.
impl HandleOwner for ObjectStore {}
2957
2958impl AsRef<ObjectStore> for ObjectStore {
2959    fn as_ref(&self) -> &ObjectStore {
2960        self
2961    }
2962}
2963
/// Returns a worst-case estimate of the layer-file space needed for `size` bytes of encrypted
/// mutations.
///
/// The estimate is three times `size`, saturating at `u64::MAX` rather than overflowing: a
/// wrapped product would silently turn a worst-case estimate into a small one, and a checked
/// build would panic instead.
fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
    // This is similar to reserved_space_from_journal_usage. It needs to be a worst case estimate of
    // the amount of metadata space that might need to be reserved to allow the encrypted mutations
    // to be written to layer files.  It needs to be >= what reservation_amount_from_layer_size
    // will return once the data has been written to layer files and <= what
    // reserved_space_from_journal_usage would use.  We can't just use
    // reserved_space_from_journal_usage because the encrypted mutations file includes some extra
    // data (it includes the checkpoints) that isn't written in the same way to the journal.
    size.saturating_mul(3)
}
2974
// `ObjectStore` implements `AssociatedObject` with an empty body, i.e. it supplies no items of
// its own for the trait.
impl AssociatedObject for ObjectStore {}
2976
/// Argument to the trim_some method, selecting what should be trimmed.
#[derive(Debug)]
pub enum TrimMode {
    /// Trim extents beyond the current size.
    UseSize,

    /// Trim extents beyond the supplied offset.
    FromOffset(u64),

    /// Remove the object (or attribute) from the store once it is fully trimmed.  The
    /// `TombstoneMode` selects between the object and the attribute level.
    Tombstone(TombstoneMode),
}
2989
/// Sets the mode for tombstoning (either at the object or attribute level).
#[derive(Debug)]
pub enum TombstoneMode {
    /// Tombstone at the object level.
    Object,
    /// Tombstone at the attribute level.
    Attribute,
}
2996
/// Result of the trim_some method.
#[derive(Debug)]
pub enum TrimResult {
    /// We reached the limit of the transaction and more extents might follow.
    Incomplete,

    /// We finished this attribute.  Carries the ID of the next attribute for the same object if
    /// there is one, and `None` otherwise.
    Done(Option<u64>),
}
3007
3008/// Loads store info.
3009pub async fn load_store_info(
3010    parent: &Arc<ObjectStore>,
3011    store_object_id: u64,
3012) -> Result<StoreInfo, Error> {
3013    load_store_info_from_handle(
3014        &ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?,
3015    )
3016    .await
3017}
3018
3019async fn load_store_info_from_handle(
3020    handle: &DataObjectHandle<impl HandleOwner>,
3021) -> Result<StoreInfo, Error> {
3022    Ok(if handle.get_size() > 0 {
3023        let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
3024        let mut cursor = std::io::Cursor::new(serialized_info);
3025        let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
3026            .context("Failed to deserialize StoreInfo")?;
3027        store_info
3028    } else {
3029        // The store_info will be absent for a newly created and empty object store.
3030        StoreInfo::default()
3031    })
3032}
3033
3034#[cfg(test)]
3035mod tests {
3036    use super::{
3037        DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, FsverityMetadata, HandleOptions,
3038        LastObjectId, LastObjectIdInfo, LockKey, MAX_STORE_INFO_SERIALIZED_SIZE, Mutation,
3039        NO_OWNER, NewChildStoreOptions, OBJECT_ID_HI_MASK, ObjectStore, RootDigest, StoreInfo,
3040        StoreOptions, StoreOwner,
3041    };
3042    use crate::errors::FxfsError;
3043    use crate::filesystem::{FxFilesystem, JournalingObject, OpenFxFilesystem};
3044    use crate::fsck::{fsck, fsck_volume};
3045    use crate::lsm_tree::Query;
3046    use crate::lsm_tree::types::{ItemRef, LayerIterator};
3047    use crate::object_handle::{
3048        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
3049    };
3050    use crate::object_store::directory::Directory;
3051    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue};
3052    use crate::object_store::transaction::{Options, lock_keys};
3053    use crate::object_store::volume::root_volume;
3054    use crate::serialized_types::VersionedLatest;
3055    use crate::testing;
3056    use assert_matches::assert_matches;
3057    use async_trait::async_trait;
3058    use fuchsia_async as fasync;
3059    use fuchsia_sync::Mutex;
3060    use futures::join;
3061    use fxfs_crypto::ff1::Ff1;
3062    use fxfs_crypto::{
3063        Crypt, FXFS_KEY_SIZE, FXFS_WRAPPED_KEY_SIZE, FxfsKey, UnwrappedKey, WrappedKeyBytes,
3064    };
3065    use fxfs_insecure_crypto::new_insecure_crypt;
3066
3067    use std::sync::Arc;
3068    use std::time::Duration;
3069    use storage_device::DeviceHolder;
3070    use storage_device::fake_device::FakeDevice;
3071    use test_case::test_case;
3072
3073    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
3074
3075    async fn test_filesystem() -> OpenFxFilesystem {
3076        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
3077        FxFilesystem::new_empty(device).await.expect("new_empty failed")
3078    }
3079
3080    #[fuchsia::test]
3081    async fn test_verified_file_with_verified_attribute() {
3082        let fs: OpenFxFilesystem = test_filesystem().await;
3083        let mut transaction = fs
3084            .clone()
3085            .new_transaction(lock_keys![], Options::default())
3086            .await
3087            .expect("new_transaction failed");
3088        let store = fs.root_store();
3089        let object = Arc::new(
3090            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3091                .await
3092                .expect("create_object failed"),
3093        );
3094
3095        transaction.add(
3096            store.store_object_id(),
3097            Mutation::replace_or_insert_object(
3098                ObjectKey::attribute(
3099                    object.object_id(),
3100                    DEFAULT_DATA_ATTRIBUTE_ID,
3101                    AttributeKey::Attribute,
3102                ),
3103                ObjectValue::verified_attribute(
3104                    0,
3105                    FsverityMetadata::Internal(RootDigest::Sha256([0; 32]), vec![]),
3106                ),
3107            ),
3108        );
3109
3110        transaction.add(
3111            store.store_object_id(),
3112            Mutation::replace_or_insert_object(
3113                ObjectKey::attribute(
3114                    object.object_id(),
3115                    FSVERITY_MERKLE_ATTRIBUTE_ID,
3116                    AttributeKey::Attribute,
3117                ),
3118                ObjectValue::attribute(0, false),
3119            ),
3120        );
3121
3122        transaction.commit().await.unwrap();
3123
3124        let handle =
3125            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
3126                .await
3127                .expect("open_object failed");
3128
3129        assert!(handle.is_verified_file());
3130
3131        fs.close().await.expect("Close failed");
3132    }
3133
3134    #[fuchsia::test]
3135    async fn test_verified_file_without_verified_attribute() {
3136        let fs: OpenFxFilesystem = test_filesystem().await;
3137        let mut transaction = fs
3138            .clone()
3139            .new_transaction(lock_keys![], Options::default())
3140            .await
3141            .expect("new_transaction failed");
3142        let store = fs.root_store();
3143        let object = Arc::new(
3144            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3145                .await
3146                .expect("create_object failed"),
3147        );
3148
3149        transaction.commit().await.unwrap();
3150
3151        let handle =
3152            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
3153                .await
3154                .expect("open_object failed");
3155
3156        assert!(!handle.is_verified_file());
3157
3158        fs.close().await.expect("Close failed");
3159    }
3160
3161    #[fuchsia::test]
3162    async fn test_create_and_open_store() {
3163        let fs = test_filesystem().await;
3164        let store_id = {
3165            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3166            root_volume
3167                .new_volume(
3168                    "test",
3169                    NewChildStoreOptions {
3170                        options: StoreOptions {
3171                            owner: NO_OWNER,
3172                            crypt: Some(Arc::new(new_insecure_crypt())),
3173                        },
3174                        ..Default::default()
3175                    },
3176                )
3177                .await
3178                .expect("new_volume failed")
3179                .store_object_id()
3180        };
3181
3182        fs.close().await.expect("close failed");
3183        let device = fs.take_device().await;
3184        device.reopen(false);
3185        let fs = FxFilesystem::open(device).await.expect("open failed");
3186
3187        {
3188            let store = fs.object_manager().store(store_id).expect("store not found");
3189            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
3190        }
3191        fs.close().await.expect("Close failed");
3192    }
3193
3194    #[fuchsia::test]
3195    async fn test_create_and_open_internal_dir() {
3196        let fs = test_filesystem().await;
3197        let dir_id;
3198        let store_id;
3199        {
3200            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3201            let store = root_volume
3202                .new_volume(
3203                    "test",
3204                    NewChildStoreOptions {
3205                        options: StoreOptions {
3206                            owner: NO_OWNER,
3207                            crypt: Some(Arc::new(new_insecure_crypt())),
3208                        },
3209                        ..Default::default()
3210                    },
3211                )
3212                .await
3213                .expect("new_volume failed");
3214            dir_id =
3215                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
3216            store_id = store.store_object_id();
3217        }
3218
3219        fs.close().await.expect("close failed");
3220        let device = fs.take_device().await;
3221        device.reopen(false);
3222        let fs = FxFilesystem::open(device).await.expect("open failed");
3223
3224        {
3225            let store = fs.object_manager().store(store_id).expect("store not found");
3226            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
3227            assert_eq!(
3228                dir_id,
3229                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
3230            );
3231            let obj = store
3232                .tree()
3233                .find(&ObjectKey::object(dir_id))
3234                .await
3235                .expect("Searching tree for dir")
3236                .unwrap();
3237            assert_matches!(
3238                obj.value,
3239                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
3240            );
3241        }
3242        fs.close().await.expect("Close failed");
3243    }
3244
3245    #[fuchsia::test]
3246    async fn test_create_and_open_internal_dir_unencrypted() {
3247        let fs = test_filesystem().await;
3248        let dir_id;
3249        let store_id;
3250        {
3251            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3252            let store = root_volume
3253                .new_volume("test", NewChildStoreOptions::default())
3254                .await
3255                .expect("new_volume failed");
3256            dir_id =
3257                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
3258            store_id = store.store_object_id();
3259        }
3260
3261        fs.close().await.expect("close failed");
3262        let device = fs.take_device().await;
3263        device.reopen(false);
3264        let fs = FxFilesystem::open(device).await.expect("open failed");
3265
3266        {
3267            let store = fs.object_manager().store(store_id).expect("store not found");
3268            assert_eq!(
3269                dir_id,
3270                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
3271            );
3272            let obj = store
3273                .tree()
3274                .find(&ObjectKey::object(dir_id))
3275                .await
3276                .expect("Searching tree for dir")
3277                .unwrap();
3278            assert_matches!(
3279                obj.value,
3280                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
3281            );
3282        }
3283        fs.close().await.expect("Close failed");
3284    }
3285
    /// Checks that a flush does not finish while a previously obtained layer set is still held,
    /// and that once it has finished, the layer object recorded from that layer set can no longer
    /// be opened in the parent store (`NotFound`).
    #[fuchsia::test(threads = 10)]
    async fn test_old_layers_are_purged() {
        let fs = test_filesystem().await;

        let store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");

        // First flush, performed before the layer set is captured below.
        store.flush().await.expect("flush failed");

        // Write some data so that the second flush has something to do.
        let mut buf = object.allocate_buffer(5).await;
        buf.as_mut_slice().copy_from_slice(b"hello");
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // Getting the layer-set should cause the flush to stall.
        let layer_set = store.tree().layer_set();

        // `done` is set just before the layer set is released; the flush must not finish earlier.
        let done = Mutex::new(false);
        let mut object_id = 0;

        join!(
            async {
                store.flush().await.expect("flush failed");
                assert!(*done.lock());
            },
            async {
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_secs(1)).await;
                *done.lock() = true;
                // Remember the object ID of the last layer before letting go of the layer set.
                object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
                std::mem::drop(layer_set);
            }
        );

        // The remembered layer object must be gone from the parent store by now.
        if let Err(e) = ObjectStore::open_object(
            &store.parent_store.as_ref().unwrap(),
            object_id,
            HandleOptions::default(),
            store.crypt(),
        )
        .await
        {
            assert!(FxfsError::NotFound.matches(&e));
        } else {
            panic!("open_object succeeded");
        }
    }
3342
3343    #[fuchsia::test]
3344    async fn test_tombstone_deletes_data() {
3345        let fs = test_filesystem().await;
3346        let root_store = fs.root_store();
3347        let child_id = {
3348            let mut transaction = fs
3349                .clone()
3350                .new_transaction(lock_keys![], Options::default())
3351                .await
3352                .expect("new_transaction failed");
3353            let child = ObjectStore::create_object(
3354                &root_store,
3355                &mut transaction,
3356                HandleOptions::default(),
3357                None,
3358            )
3359            .await
3360            .expect("create_object failed");
3361            root_store.add_to_graveyard(&mut transaction, child.object_id());
3362            transaction.commit().await.expect("commit failed");
3363
3364            // Allocate an extent in the file.
3365            let mut buffer = child.allocate_buffer(8192).await;
3366            buffer.as_mut_slice().fill(0xaa);
3367            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
3368
3369            child.object_id()
3370        };
3371
3372        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
3373
3374        // Let fsck check allocations.
3375        fsck(fs.clone()).await.expect("fsck failed");
3376    }
3377
3378    #[fuchsia::test]
3379    async fn test_tombstone_purges_keys() {
3380        let fs = test_filesystem().await;
3381        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3382        let store = root_volume
3383            .new_volume(
3384                "test",
3385                NewChildStoreOptions {
3386                    options: StoreOptions {
3387                        crypt: Some(Arc::new(new_insecure_crypt())),
3388                        ..StoreOptions::default()
3389                    },
3390                    ..NewChildStoreOptions::default()
3391                },
3392            )
3393            .await
3394            .expect("new_volume failed");
3395        let mut transaction = fs
3396            .clone()
3397            .new_transaction(lock_keys![], Options::default())
3398            .await
3399            .expect("new_transaction failed");
3400        let child =
3401            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3402                .await
3403                .expect("create_object failed");
3404        store.add_to_graveyard(&mut transaction, child.object_id());
3405        transaction.commit().await.expect("commit failed");
3406        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_some());
3407        store
3408            .tombstone_object(child.object_id(), Options::default())
3409            .await
3410            .expect("tombstone_object failed");
3411        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_none());
3412        fs.close().await.expect("close failed");
3413    }
3414
3415    #[fuchsia::test]
3416    async fn test_major_compaction_discards_unnecessary_records() {
3417        let fs = test_filesystem().await;
3418        let root_store = fs.root_store();
3419        let child_id = {
3420            let mut transaction = fs
3421                .clone()
3422                .new_transaction(lock_keys![], Options::default())
3423                .await
3424                .expect("new_transaction failed");
3425            let child = ObjectStore::create_object(
3426                &root_store,
3427                &mut transaction,
3428                HandleOptions::default(),
3429                None,
3430            )
3431            .await
3432            .expect("create_object failed");
3433            root_store.add_to_graveyard(&mut transaction, child.object_id());
3434            transaction.commit().await.expect("commit failed");
3435
3436            // Allocate an extent in the file.
3437            let mut buffer = child.allocate_buffer(8192).await;
3438            buffer.as_mut_slice().fill(0xaa);
3439            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
3440
3441            child.object_id()
3442        };
3443
3444        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
3445        {
3446            let layers = root_store.tree.layer_set();
3447            let mut merger = layers.merger();
3448            let iter = merger
3449                .query(Query::FullRange(&ObjectKey::object(child_id)))
3450                .await
3451                .expect("seek failed");
3452            // Find at least one object still in the tree.
3453            match iter.get() {
3454                Some(ItemRef { key: ObjectKey { object_id, .. }, .. })
3455                    if *object_id == child_id => {}
3456                _ => panic!("Objects should still be in the tree."),
3457            }
3458        }
3459        root_store.flush().await.expect("flush failed");
3460
3461        // There should be no records for the object.
3462        let layers = root_store.tree.layer_set();
3463        let mut merger = layers.merger();
3464        let iter = merger
3465            .query(Query::FullRange(&ObjectKey::object(child_id)))
3466            .await
3467            .expect("seek failed");
3468        match iter.get() {
3469            None => {}
3470            Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
3471                assert_ne!(*object_id, child_id)
3472            }
3473        }
3474    }
3475
3476    #[fuchsia::test]
3477    async fn test_overlapping_extents_in_different_layers() {
3478        let fs = test_filesystem().await;
3479        let store = fs.root_store();
3480
3481        let mut transaction = fs
3482            .clone()
3483            .new_transaction(
3484                lock_keys![LockKey::object(
3485                    store.store_object_id(),
3486                    store.root_directory_object_id()
3487                )],
3488                Options::default(),
3489            )
3490            .await
3491            .expect("new_transaction failed");
3492        let root_directory =
3493            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3494        let object = root_directory
3495            .create_child_file(&mut transaction, "test")
3496            .await
3497            .expect("create_child_file failed");
3498        transaction.commit().await.expect("commit failed");
3499
3500        let buf = object.allocate_buffer(16384).await;
3501        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3502
3503        store.flush().await.expect("flush failed");
3504
3505        object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");
3506
3507        // At this point, we should have an extent for 0..16384 in a layer that has been flushed,
3508        // and an extent for 0..4096 that partially overwrites it.  Writing to 0..16384 should
3509        // overwrite both of those extents.
3510        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3511
3512        fsck(fs.clone()).await.expect("fsck failed");
3513    }
3514
    /// Exercises encrypted mutations across repeated close/reopen cycles of an encrypted volume.
    ///
    /// Each iteration writes a new file, reopens the filesystem, reads the file back, flushes
    /// while the volume is locked (which must leave an encrypted mutations object), reads the file
    /// again (after which that object must be gone), and finally runs fsck.
    #[fuchsia::test(threads = 10)]
    async fn test_encrypted_mutations() {
        // Runs one write/reopen/verify cycle and hands back the (reopened) filesystem for the
        // next iteration.  `iteration` only feeds into the name of the file that is created.
        async fn one_iteration(
            fs: OpenFxFilesystem,
            crypt: Arc<dyn Crypt>,
            iteration: u64,
        ) -> OpenFxFilesystem {
            // Closes `fs`, takes its device, and opens a filesystem on it again.
            async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
                fs.close().await.expect("Close failed");
                let device = fs.take_device().await;
                device.reopen(false);
                FxFilesystem::open(device).await.expect("FS open failed")
            }

            let fs = reopen(fs).await;

            let (store_object_id, object_id) = {
                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_volume
                    .volume(
                        "test",
                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    )
                    .await
                    .expect("volume failed");

                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let root_directory = Directory::open(&store, store.root_directory_object_id())
                    .await
                    .expect("open failed");
                let object = root_directory
                    .create_child_file(&mut transaction, &format!("test {}", iteration))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                // Fill the file with a recognizable pattern: byte i holds i truncated to u8.
                let mut buf = object.allocate_buffer(1000).await;
                for i in 0..buf.len() {
                    buf.as_mut_slice()[i] = i as u8;
                }
                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

                (store.store_object_id(), object.object_id())
            };

            let fs = reopen(fs).await;

            // Opens the "test" volume with `crypt`, reads the file written above and checks that
            // it still holds the pattern.
            let check_object = |fs: Arc<FxFilesystem>| {
                let crypt = crypt.clone();
                async move {
                    let root_volume = root_volume(fs).await.expect("root_volume failed");
                    let volume = root_volume
                        .volume(
                            "test",
                            StoreOptions { crypt: Some(crypt), ..StoreOptions::default() },
                        )
                        .await
                        .expect("volume failed");

                    let object = ObjectStore::open_object(
                        &volume,
                        object_id,
                        HandleOptions::default(),
                        None,
                    )
                    .await
                    .expect("open_object failed");
                    let mut buf = object.allocate_buffer(1000).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
                    for i in 0..buf.len() {
                        assert_eq!(buf.as_slice()[i], i as u8);
                    }
                }
            };

            check_object(fs.clone()).await;

            let fs = reopen(fs).await;

            // At this point the "test" volume is locked.  Before checking the object, flush the
            // filesystem.  This should leave a file with encrypted mutations.
            fs.object_manager().flush().await.expect("flush failed");

            assert_ne!(
                fs.object_manager()
                    .store(store_object_id)
                    .unwrap()
                    .load_store_info()
                    .await
                    .expect("load_store_info failed")
                    .encrypted_mutations_object_id,
                INVALID_OBJECT_ID
            );

            check_object(fs.clone()).await;

            // Checking the object should have triggered a flush and so now there should be no
            // encrypted mutations object.
            assert_eq!(
                fs.object_manager()
                    .store(store_object_id)
                    .unwrap()
                    .load_store_info()
                    .await
                    .expect("load_store_info failed")
                    .encrypted_mutations_object_id,
                INVALID_OBJECT_ID
            );

            let fs = reopen(fs).await;

            fsck(fs.clone()).await.expect("fsck failed");

            let fs = reopen(fs).await;

            // The data must still be readable after fsck and one more reopen.
            check_object(fs.clone()).await;

            fs
        }

        let mut fs = test_filesystem().await;
        let crypt = Arc::new(new_insecure_crypt());

        // Create the encrypted "test" volume that every iteration opens.
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let _store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
        }

        // Run a few iterations so that we test changes with the stream cipher offset.
        for i in 0..5 {
            fs = one_iteration(fs, crypt.clone(), i).await;
        }
    }
3670
3671    #[test_case(true; "with a flush")]
3672    #[test_case(false; "without a flush")]
3673    #[fuchsia::test(threads = 10)]
3674    async fn test_object_id_cipher_roll(with_flush: bool) {
3675        let fs = test_filesystem().await;
3676        let crypt = Arc::new(new_insecure_crypt());
3677
3678        let expected_key = {
3679            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3680            let store = root_volume
3681                .new_volume(
3682                    "test",
3683                    NewChildStoreOptions {
3684                        options: StoreOptions {
3685                            crypt: Some(crypt.clone()),
3686                            ..StoreOptions::default()
3687                        },
3688                        ..Default::default()
3689                    },
3690                )
3691                .await
3692                .expect("new_volume failed");
3693
3694            // Create some files so that our in-memory copy of StoreInfo has changes (the object
3695            // count) pending a flush.
3696            let root_dir_id = store.root_directory_object_id();
3697            let root_dir =
3698                Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3699            let mut transaction = fs
3700                .clone()
3701                .new_transaction(
3702                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
3703                    Options::default(),
3704                )
3705                .await
3706                .expect("new_transaction failed");
3707            for i in 0..10 {
3708                root_dir.create_child_file(&mut transaction, &format!("file {i}")).await.unwrap();
3709            }
3710            transaction.commit().await.expect("commit failed");
3711
3712            let orig_store_info = store.store_info().unwrap();
3713
3714            // Hack the last object ID to force a roll of the object ID cipher.
3715            {
3716                let mut last_object_id = store.last_object_id.lock();
3717                match &mut *last_object_id {
3718                    LastObjectId::Encrypted { id, .. } => {
3719                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
3720                        *id |= 0xffffffff;
3721                    }
3722                    _ => unreachable!(),
3723                }
3724            }
3725
3726            let mut transaction = fs
3727                .clone()
3728                .new_transaction(
3729                    lock_keys![LockKey::object(
3730                        store.store_object_id(),
3731                        store.root_directory_object_id()
3732                    )],
3733                    Options::default(),
3734                )
3735                .await
3736                .expect("new_transaction failed");
3737            let root_directory = Directory::open(&store, store.root_directory_object_id())
3738                .await
3739                .expect("open failed");
3740            let object = root_directory
3741                .create_child_file(&mut transaction, "test")
3742                .await
3743                .expect("create_child_file failed");
3744            transaction.commit().await.expect("commit failed");
3745
3746            assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 1u64 << 32);
3747
3748            // Check that the key has been changed.
3749            let key = match (
3750                store.store_info().unwrap().last_object_id,
3751                orig_store_info.last_object_id,
3752            ) {
3753                (
3754                    LastObjectIdInfo::Encrypted { key, id },
3755                    LastObjectIdInfo::Encrypted { key: orig_key, .. },
3756                ) => {
3757                    assert_ne!(key, orig_key);
3758                    assert_eq!(id, 1u64 << 32);
3759                    key
3760                }
3761                _ => unreachable!(),
3762            };
3763
3764            if with_flush {
3765                fs.journal().force_compact().await.unwrap();
3766            }
3767
3768            let last_object_id = store.last_object_id.lock();
3769            assert_eq!(last_object_id.id(), 1u64 << 32);
3770            key
3771        };
3772
3773        fs.close().await.expect("Close failed");
3774        let device = fs.take_device().await;
3775        device.reopen(false);
3776        let fs = FxFilesystem::open(device).await.expect("open failed");
3777        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3778        let store = root_volume
3779            .volume("test", StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() })
3780            .await
3781            .expect("volume failed");
3782
3783        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Encrypted { key, .. } if key == expected_key);
3784        assert_eq!(store.last_object_id.lock().id(), 1u64 << 32);
3785
3786        fsck(fs.clone()).await.expect("fsck failed");
3787        fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
3788    }
3789
3790    #[fuchsia::test(threads = 2)]
3791    async fn test_race_object_id_cipher_roll_and_flush() {
3792        let fs = test_filesystem().await;
3793        let crypt = Arc::new(new_insecure_crypt());
3794
3795        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3796        let store = root_volume
3797            .new_volume(
3798                "test",
3799                NewChildStoreOptions {
3800                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3801                    ..Default::default()
3802                },
3803            )
3804            .await
3805            .expect("new_volume failed");
3806
3807        assert!(matches!(&*store.last_object_id.lock(), LastObjectId::Encrypted { .. }));
3808
3809        // Create some files so that our in-memory copy of StoreInfo has changes (the object
3810        // count) pending a flush.
3811        let root_dir_id = store.root_directory_object_id();
3812        let root_dir = Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3813
3814        let _executor_tasks = testing::force_executor_threads_to_run(2).await;
3815
3816        for j in 0..100 {
3817            let mut transaction = fs
3818                .clone()
3819                .new_transaction(
3820                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
3821                    Options::default(),
3822                )
3823                .await
3824                .expect("new_transaction failed");
3825            root_dir.create_child_file(&mut transaction, &format!("file {j}")).await.unwrap();
3826            transaction.commit().await.expect("commit failed");
3827
3828            let task = {
3829                let fs = fs.clone();
3830                fasync::Task::spawn(async move {
3831                    fs.journal().force_compact().await.unwrap();
3832                })
3833            };
3834
3835            // Hack the last object ID to force a roll of the object ID cipher.
3836            {
3837                let mut last_object_id = store.last_object_id.lock();
3838                let LastObjectId::Encrypted { id, .. } = &mut *last_object_id else {
3839                    unreachable!()
3840                };
3841                assert_eq!(*id >> 32, j);
3842                *id |= 0xffffffff;
3843            }
3844
3845            let mut transaction = fs
3846                .clone()
3847                .new_transaction(
3848                    lock_keys![LockKey::object(
3849                        store.store_object_id(),
3850                        store.root_directory_object_id()
3851                    )],
3852                    Options::default(),
3853                )
3854                .await
3855                .expect("new_transaction failed");
3856            let root_directory = Directory::open(&store, store.root_directory_object_id())
3857                .await
3858                .expect("open failed");
3859            root_directory
3860                .create_child_file(&mut transaction, "test {j}")
3861                .await
3862                .expect("create_child_file failed");
3863            transaction.commit().await.expect("commit failed");
3864
3865            task.await;
3866
3867            // Check that the key has been changed.
3868            let new_store_info = store.load_store_info().await.unwrap();
3869
3870            let LastObjectIdInfo::Encrypted { id, key } = new_store_info.last_object_id else {
3871                unreachable!()
3872            };
3873            assert_eq!(id >> 32, j + 1);
3874            let LastObjectIdInfo::Encrypted { key: in_memory_key, .. } =
3875                store.store_info().unwrap().last_object_id
3876            else {
3877                unreachable!()
3878            };
3879            assert_eq!(key, in_memory_key);
3880        }
3881
3882        fs.close().await.expect("Close failed");
3883    }
3884
3885    #[fuchsia::test]
3886    async fn test_object_id_no_roll_for_unencrypted_store() {
3887        let fs = test_filesystem().await;
3888
3889        {
3890            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3891            let store = root_volume
3892                .new_volume("test", NewChildStoreOptions::default())
3893                .await
3894                .expect("new_volume failed");
3895
3896            // Hack the last object ID.
3897            {
3898                let mut last_object_id = store.last_object_id.lock();
3899                match &mut *last_object_id {
3900                    LastObjectId::Unencrypted { id } => {
3901                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
3902                        *id |= 0xffffffff;
3903                    }
3904                    _ => unreachable!(),
3905                }
3906            }
3907
3908            let mut transaction = fs
3909                .clone()
3910                .new_transaction(
3911                    lock_keys![LockKey::object(
3912                        store.store_object_id(),
3913                        store.root_directory_object_id()
3914                    )],
3915                    Options::default(),
3916                )
3917                .await
3918                .expect("new_transaction failed");
3919            let root_directory = Directory::open(&store, store.root_directory_object_id())
3920                .await
3921                .expect("open failed");
3922            let object = root_directory
3923                .create_child_file(&mut transaction, "test")
3924                .await
3925                .expect("create_child_file failed");
3926            transaction.commit().await.expect("commit failed");
3927
3928            assert_eq!(object.object_id(), 0x1_0000_0000);
3929
3930            // Check that there is still no key.
3931            assert_matches!(
3932                store.store_info().unwrap().last_object_id,
3933                LastObjectIdInfo::Unencrypted { .. }
3934            );
3935
3936            assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
3937        };
3938
3939        fs.close().await.expect("Close failed");
3940        let device = fs.take_device().await;
3941        device.reopen(false);
3942        let fs = FxFilesystem::open(device).await.expect("open failed");
3943        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3944        let store =
3945            root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
3946
3947        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
3948    }
3949
3950    #[fuchsia::test]
3951    fn test_object_id_is_not_invalid_object_id() {
3952        let key = UnwrappedKey::new(vec![0; FXFS_KEY_SIZE]);
3953        // 1106634048 results in INVALID_OBJECT_ID with this key.
3954        let mut last_object_id =
3955            LastObjectId::Encrypted { id: 1106634047, cipher: Box::new(Ff1::new(&key)) };
3956        assert!(last_object_id.try_get_next().is_some());
3957        assert!(last_object_id.try_get_next().is_some());
3958    }
3959
3960    #[fuchsia::test]
3961    async fn test_last_object_id_is_correct_after_unlock() {
3962        let fs = test_filesystem().await;
3963        let crypt = Arc::new(new_insecure_crypt());
3964
3965        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3966        let store = root_volume
3967            .new_volume(
3968                "test",
3969                NewChildStoreOptions {
3970                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3971                    ..Default::default()
3972                },
3973            )
3974            .await
3975            .expect("new_volume failed");
3976
3977        let mut transaction = fs
3978            .clone()
3979            .new_transaction(
3980                lock_keys![LockKey::object(
3981                    store.store_object_id(),
3982                    store.root_directory_object_id()
3983                )],
3984                Options::default(),
3985            )
3986            .await
3987            .expect("new_transaction failed");
3988        let root_directory =
3989            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3990        root_directory
3991            .create_child_file(&mut transaction, "test")
3992            .await
3993            .expect("create_child_file failed");
3994        transaction.commit().await.expect("commit failed");
3995
3996        // Compact so that StoreInfo is written.
3997        fs.journal().force_compact().await.unwrap();
3998
3999        let last_object_id = store.last_object_id.lock().id();
4000
4001        store.lock().await.unwrap();
4002        store.unlock(NO_OWNER, crypt.clone()).await.unwrap();
4003
4004        assert_eq!(store.last_object_id.lock().id(), last_object_id);
4005    }
4006
4007    #[fuchsia::test(threads = 20)]
4008    async fn test_race_when_rolling_last_object_id_cipher() {
4009        // NOTE: This test is trying to test a race, so if it fails, it might be flaky.
4010
4011        const NUM_THREADS: usize = 20;
4012
4013        let fs = test_filesystem().await;
4014        let crypt = Arc::new(new_insecure_crypt());
4015
4016        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4017        let store = root_volume
4018            .new_volume(
4019                "test",
4020                NewChildStoreOptions {
4021                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4022                    ..Default::default()
4023                },
4024            )
4025            .await
4026            .expect("new_volume failed");
4027
4028        let store_id = store.store_object_id();
4029        let root_dir_id = store.root_directory_object_id();
4030
4031        let root_directory =
4032            Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
4033
4034        // Create directories.
4035        let mut directories = Vec::new();
4036        for _ in 0..NUM_THREADS {
4037            let mut transaction = fs
4038                .clone()
4039                .new_transaction(
4040                    lock_keys![LockKey::object(store_id, root_dir_id,)],
4041                    Options::default(),
4042                )
4043                .await
4044                .expect("new_transaction failed");
4045            directories.push(
4046                root_directory
4047                    .create_child_dir(&mut transaction, "test")
4048                    .await
4049                    .expect("create_child_file failed"),
4050            );
4051            transaction.commit().await.expect("commit failed");
4052        }
4053
4054        // Hack the last object ID so that the next ID will require a roll.
4055        match &mut *store.last_object_id.lock() {
4056            LastObjectId::Encrypted { id, .. } => *id |= 0xffff_ffff,
4057            _ => unreachable!(),
4058        }
4059
4060        let scope = fasync::Scope::new();
4061
4062        let _executor_tasks = testing::force_executor_threads_to_run(NUM_THREADS).await;
4063
4064        for dir in directories {
4065            let fs = fs.clone();
4066            scope.spawn(async move {
4067                let mut transaction = fs
4068                    .clone()
4069                    .new_transaction(
4070                        lock_keys![LockKey::object(store_id, dir.object_id(),)],
4071                        Options::default(),
4072                    )
4073                    .await
4074                    .expect("new_transaction failed");
4075                dir.create_child_file(&mut transaction, "test")
4076                    .await
4077                    .expect("create_child_file failed");
4078                transaction.commit().await.expect("commit failed");
4079            });
4080        }
4081
4082        scope.on_no_tasks().await;
4083
4084        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000 + NUM_THREADS as u64 - 1);
4085    }
4086
4087    #[fuchsia::test(threads = 10)]
4088    async fn test_lock_store() {
4089        let fs = test_filesystem().await;
4090        let crypt = Arc::new(new_insecure_crypt());
4091
4092        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4093        let store = root_volume
4094            .new_volume(
4095                "test",
4096                NewChildStoreOptions {
4097                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4098                    ..NewChildStoreOptions::default()
4099                },
4100            )
4101            .await
4102            .expect("new_volume failed");
4103        let mut transaction = fs
4104            .clone()
4105            .new_transaction(
4106                lock_keys![LockKey::object(
4107                    store.store_object_id(),
4108                    store.root_directory_object_id()
4109                )],
4110                Options::default(),
4111            )
4112            .await
4113            .expect("new_transaction failed");
4114        let root_directory =
4115            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4116        root_directory
4117            .create_child_file(&mut transaction, "test")
4118            .await
4119            .expect("create_child_file failed");
4120        transaction.commit().await.expect("commit failed");
4121        store.lock().await.expect("lock failed");
4122
4123        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
4124        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4125    }
4126
4127    #[fuchsia::test(threads = 10)]
4128    async fn test_unlock_read_only() {
4129        let fs = test_filesystem().await;
4130        let crypt = Arc::new(new_insecure_crypt());
4131
4132        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4133        let store = root_volume
4134            .new_volume(
4135                "test",
4136                NewChildStoreOptions {
4137                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4138                    ..NewChildStoreOptions::default()
4139                },
4140            )
4141            .await
4142            .expect("new_volume failed");
4143        let mut transaction = fs
4144            .clone()
4145            .new_transaction(
4146                lock_keys![LockKey::object(
4147                    store.store_object_id(),
4148                    store.root_directory_object_id()
4149                )],
4150                Options::default(),
4151            )
4152            .await
4153            .expect("new_transaction failed");
4154        let root_directory =
4155            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4156        root_directory
4157            .create_child_file(&mut transaction, "test")
4158            .await
4159            .expect("create_child_file failed");
4160        transaction.commit().await.expect("commit failed");
4161        store.lock().await.expect("lock failed");
4162
4163        store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
4164        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4165        store.lock_read_only();
4166        store.unlock_read_only(crypt).await.expect("unlock failed");
4167        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4168    }
4169
4170    #[fuchsia::test(threads = 10)]
4171    async fn test_key_rolled_when_unlocked() {
4172        let fs = test_filesystem().await;
4173        let crypt = Arc::new(new_insecure_crypt());
4174
4175        let object_id;
4176        {
4177            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4178            let store = root_volume
4179                .new_volume(
4180                    "test",
4181                    NewChildStoreOptions {
4182                        options: StoreOptions {
4183                            crypt: Some(crypt.clone()),
4184                            ..StoreOptions::default()
4185                        },
4186                        ..Default::default()
4187                    },
4188                )
4189                .await
4190                .expect("new_volume failed");
4191            let mut transaction = fs
4192                .clone()
4193                .new_transaction(
4194                    lock_keys![LockKey::object(
4195                        store.store_object_id(),
4196                        store.root_directory_object_id()
4197                    )],
4198                    Options::default(),
4199                )
4200                .await
4201                .expect("new_transaction failed");
4202            let root_directory = Directory::open(&store, store.root_directory_object_id())
4203                .await
4204                .expect("open failed");
4205            object_id = root_directory
4206                .create_child_file(&mut transaction, "test")
4207                .await
4208                .expect("create_child_file failed")
4209                .object_id();
4210            transaction.commit().await.expect("commit failed");
4211        }
4212
4213        fs.close().await.expect("Close failed");
4214        let mut device = fs.take_device().await;
4215
4216        // Repeatedly remount so that we can be sure that we can remount when there are many
4217        // mutations keys.
4218        for _ in 0..100 {
4219            device.reopen(false);
4220            let fs = FxFilesystem::open(device).await.expect("open failed");
4221            {
4222                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4223                let store = root_volume
4224                    .volume(
4225                        "test",
4226                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4227                    )
4228                    .await
4229                    .expect("open_volume failed");
4230
4231                // The key should get rolled every time we unlock.
4232                assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);
4233
4234                // Make sure there's an encrypted mutation.
4235                let handle =
4236                    ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
4237                        .await
4238                        .expect("open_object failed");
4239                let buffer = handle.allocate_buffer(100).await;
4240                handle
4241                    .write_or_append(Some(0), buffer.as_ref())
4242                    .await
4243                    .expect("write_or_append failed");
4244            }
4245            fs.close().await.expect("Close failed");
4246            device = fs.take_device().await;
4247        }
4248    }
4249
4250    #[test]
4251    fn test_store_info_max_serialized_size() {
4252        let info = StoreInfo {
4253            guid: [0xff; 16],
4254            last_object_id: LastObjectIdInfo::Encrypted {
4255                id: 0x1234567812345678,
4256                key: FxfsKey {
4257                    wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4258                    key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4259                },
4260            },
4261            // Worst case, each layer should be 3/4 the size of the layer below it (because of the
4262            // compaction policy we're using).  If the smallest layer is 8,192 bytes, then 120
4263            // layers would take up a size that exceeds a 64 bit unsigned integer, so if this fits,
4264            // any size should fit.
4265            layers: vec![0x1234567812345678; 120],
4266            root_directory_object_id: 0x1234567812345678,
4267            graveyard_directory_object_id: 0x1234567812345678,
4268            object_count: 0x1234567812345678,
4269            mutations_key: Some(FxfsKey {
4270                wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4271                key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4272            }),
4273            mutations_cipher_offset: 0x1234567812345678,
4274            encrypted_mutations_object_id: 0x1234567812345678,
4275            internal_directory_object_id: INVALID_OBJECT_ID,
4276        };
4277        let mut serialized_info = Vec::new();
4278        info.serialize_with_version(&mut serialized_info).unwrap();
4279        assert!(
4280            serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
4281            "{}",
4282            serialized_info.len()
4283        );
4284    }
4285
4286    async fn reopen_after_crypt_failure_inner(read_only: bool) {
4287        let fs = test_filesystem().await;
4288        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4289
4290        let store = {
4291            let crypt = Arc::new(new_insecure_crypt());
4292            let store = root_volume
4293                .new_volume(
4294                    "vol",
4295                    NewChildStoreOptions {
4296                        options: StoreOptions {
4297                            crypt: Some(crypt.clone()),
4298                            ..StoreOptions::default()
4299                        },
4300                        ..Default::default()
4301                    },
4302                )
4303                .await
4304                .expect("new_volume failed");
4305            let root_directory = Directory::open(&store, store.root_directory_object_id())
4306                .await
4307                .expect("open failed");
4308            let mut transaction = fs
4309                .clone()
4310                .new_transaction(
4311                    lock_keys![LockKey::object(
4312                        store.store_object_id(),
4313                        root_directory.object_id()
4314                    )],
4315                    Options::default(),
4316                )
4317                .await
4318                .expect("new_transaction failed");
4319            root_directory
4320                .create_child_file(&mut transaction, "test")
4321                .await
4322                .expect("create_child_file failed");
4323            transaction.commit().await.expect("commit failed");
4324
4325            crypt.shutdown();
4326            let mut transaction = fs
4327                .clone()
4328                .new_transaction(
4329                    lock_keys![LockKey::object(
4330                        store.store_object_id(),
4331                        root_directory.object_id()
4332                    )],
4333                    Options::default(),
4334                )
4335                .await
4336                .expect("new_transaction failed");
4337            root_directory
4338                .create_child_file(&mut transaction, "test2")
4339                .await
4340                .map(|_| ())
4341                .expect_err("create_child_file should fail");
4342            store.lock().await.expect("lock failed");
4343            store
4344        };
4345
4346        let crypt = Arc::new(new_insecure_crypt());
4347        if read_only {
4348            store.unlock_read_only(crypt).await.expect("unlock failed");
4349        } else {
4350            store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
4351        }
4352        let root_directory =
4353            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4354        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4355    }
4356
4357    #[fuchsia::test(threads = 10)]
4358    async fn test_reopen_after_crypt_failure() {
4359        reopen_after_crypt_failure_inner(false).await;
4360    }
4361
4362    #[fuchsia::test(threads = 10)]
4363    async fn test_reopen_read_only_after_crypt_failure() {
4364        reopen_after_crypt_failure_inner(true).await;
4365    }
4366
4367    #[fuchsia::test(threads = 10)]
4368    #[should_panic(expected = "Insufficient reservation space")]
4369    #[cfg(debug_assertions)]
4370    async fn large_transaction_causes_panic_in_debug_builds() {
4371        let fs = test_filesystem().await;
4372        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4373        let store = root_volume
4374            .new_volume("vol", NewChildStoreOptions::default())
4375            .await
4376            .expect("new_volume failed");
4377        let root_directory =
4378            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4379        let mut transaction = fs
4380            .clone()
4381            .new_transaction(
4382                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
4383                Options::default(),
4384            )
4385            .await
4386            .expect("transaction");
4387        for i in 0..500 {
4388            root_directory
4389                .create_symlink(&mut transaction, b"link", &format!("{}", i))
4390                .await
4391                .expect("symlink");
4392        }
4393        assert_eq!(transaction.commit().await.expect("commit"), 0);
4394    }
4395
4396    #[fuchsia::test]
4397    async fn test_crypt_failure_does_not_fuse_journal() {
4398        let fs = test_filesystem().await;
4399
4400        struct Owner;
4401        #[async_trait]
4402        impl StoreOwner for Owner {
4403            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
4404                store.lock().await
4405            }
4406        }
4407        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
4408
4409        {
4410            // Create two stores and a record for each store, so the journal will need to flush them
4411            // both later.
4412            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4413            let store1 = root_volume
4414                .new_volume(
4415                    "vol1",
4416                    NewChildStoreOptions {
4417                        options: StoreOptions {
4418                            crypt: Some(Arc::new(new_insecure_crypt())),
4419                            ..StoreOptions::default()
4420                        },
4421                        ..Default::default()
4422                    },
4423                )
4424                .await
4425                .expect("new_volume failed");
4426            let crypt = Arc::new(new_insecure_crypt());
4427            let store2 = root_volume
4428                .new_volume(
4429                    "vol2",
4430                    NewChildStoreOptions {
4431                        options: StoreOptions {
4432                            owner: Arc::downgrade(&owner),
4433                            crypt: Some(crypt.clone()),
4434                        },
4435                        ..Default::default()
4436                    },
4437                )
4438                .await
4439                .expect("new_volume failed");
4440            for store in [&store1, &store2] {
4441                let root_directory = Directory::open(store, store.root_directory_object_id())
4442                    .await
4443                    .expect("open failed");
4444                let mut transaction = fs
4445                    .clone()
4446                    .new_transaction(
4447                        lock_keys![LockKey::object(
4448                            store.store_object_id(),
4449                            root_directory.object_id()
4450                        )],
4451                        Options::default(),
4452                    )
4453                    .await
4454                    .expect("new_transaction failed");
4455                root_directory
4456                    .create_child_file(&mut transaction, "test")
4457                    .await
4458                    .expect("create_child_file failed");
4459                transaction.commit().await.expect("commit failed");
4460            }
4461            // Shut down the crypt instance for store2, and then compact.  Compaction should not
4462            // fail, and the store should become locked.
4463            crypt.shutdown();
4464            fs.journal().force_compact().await.expect("compact failed");
4465            // The store should now be locked.
4466            assert!(store2.is_locked());
4467        }
4468
4469        // Even though the store wasn't flushed, the mutation to store2 will still be valid as it is
4470        // held in the journal.
4471        fs.close().await.expect("close failed");
4472        let device = fs.take_device().await;
4473        device.reopen(false);
4474        let fs = FxFilesystem::open(device).await.expect("open failed");
4475        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4476
4477        for volume_name in ["vol1", "vol2"] {
4478            let store = root_volume
4479                .volume(
4480                    volume_name,
4481                    StoreOptions {
4482                        crypt: Some(Arc::new(new_insecure_crypt())),
4483                        ..StoreOptions::default()
4484                    },
4485                )
4486                .await
4487                .expect("open volume failed");
4488            let root_directory = Directory::open(&store, store.root_directory_object_id())
4489                .await
4490                .expect("open failed");
4491            assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
4492        }
4493
4494        fs.close().await.expect("close failed");
4495    }
4496
4497    #[fuchsia::test]
4498    async fn test_crypt_failure_during_unlock_race() {
4499        let fs = test_filesystem().await;
4500
4501        struct Owner;
4502        #[async_trait]
4503        impl StoreOwner for Owner {
4504            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
4505                store.lock().await
4506            }
4507        }
4508        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
4509
4510        let store_object_id = {
4511            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4512            let store = root_volume
4513                .new_volume(
4514                    "vol",
4515                    NewChildStoreOptions {
4516                        options: StoreOptions {
4517                            owner: Arc::downgrade(&owner),
4518                            crypt: Some(Arc::new(new_insecure_crypt())),
4519                        },
4520                        ..Default::default()
4521                    },
4522                )
4523                .await
4524                .expect("new_volume failed");
4525            let root_directory = Directory::open(&store, store.root_directory_object_id())
4526                .await
4527                .expect("open failed");
4528            let mut transaction = fs
4529                .clone()
4530                .new_transaction(
4531                    lock_keys![LockKey::object(
4532                        store.store_object_id(),
4533                        root_directory.object_id()
4534                    )],
4535                    Options::default(),
4536                )
4537                .await
4538                .expect("new_transaction failed");
4539            root_directory
4540                .create_child_file(&mut transaction, "test")
4541                .await
4542                .expect("create_child_file failed");
4543            transaction.commit().await.expect("commit failed");
4544            store.store_object_id()
4545        };
4546
4547        fs.close().await.expect("close failed");
4548        let device = fs.take_device().await;
4549        device.reopen(false);
4550
4551        let fs = FxFilesystem::open(device).await.expect("open failed");
4552        {
4553            let fs_clone = fs.clone();
4554            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4555
4556            let crypt = Arc::new(new_insecure_crypt());
4557            let crypt_clone = crypt.clone();
4558            join!(
4559                async move {
4560                    // Unlock might fail, so ignore errors.
4561                    let _ = root_volume
4562                        .volume(
4563                            "vol",
4564                            StoreOptions {
4565                                owner: Arc::downgrade(&owner),
4566                                crypt: Some(crypt_clone),
4567                            },
4568                        )
4569                        .await;
4570                },
4571                async move {
4572                    // Block until unlock is finished but before flushing due to unlock is finished, to
4573                    // maximize the chances of weirdness.
4574                    let keys = lock_keys![LockKey::flush(store_object_id)];
4575                    let _ = fs_clone.lock_manager().write_lock(keys).await;
4576                    crypt.shutdown();
4577                }
4578            );
4579        }
4580
4581        fs.close().await.expect("close failed");
4582        let device = fs.take_device().await;
4583        device.reopen(false);
4584
4585        let fs = FxFilesystem::open(device).await.expect("open failed");
4586        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4587        let store = root_volume
4588            .volume(
4589                "vol",
4590                StoreOptions {
4591                    crypt: Some(Arc::new(new_insecure_crypt())),
4592                    ..StoreOptions::default()
4593                },
4594            )
4595            .await
4596            .expect("open volume failed");
4597        let root_directory =
4598            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4599        assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
4600
4601        fs.close().await.expect("close failed");
4602    }
4603
4604    #[fuchsia::test]
4605    async fn test_low_32_bit_object_ids() {
4606        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
4607        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
4608
4609        {
4610            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4611
4612            let store = root_vol
4613                .new_volume(
4614                    "test",
4615                    NewChildStoreOptions { low_32_bit_object_ids: true, ..Default::default() },
4616                )
4617                .await
4618                .expect("new_volume failed");
4619
4620            let root_dir = Directory::open(&store, store.root_directory_object_id())
4621                .await
4622                .expect("open failed");
4623
4624            let mut ids = std::collections::HashSet::new();
4625
4626            for i in 0..100 {
4627                let mut transaction = fs
4628                    .clone()
4629                    .new_transaction(
4630                        lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4631                        Options::default(),
4632                    )
4633                    .await
4634                    .expect("new_transaction failed");
4635
4636                for j in 0..100 {
4637                    let object = root_dir
4638                        .create_child_dir(&mut transaction, &format!("{i}.{j}"))
4639                        .await
4640                        .expect("create_child_file failed");
4641
4642                    assert!(object.object_id() < 1 << 32);
4643                    assert_ne!(object.object_id(), INVALID_OBJECT_ID);
4644                    assert!(ids.insert(object.object_id()));
4645                }
4646
4647                transaction.commit().await.expect("commit failed");
4648            }
4649
4650            assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4651
4652            fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
4653        }
4654
4655        // Verify persistence
4656        fs.close().await.expect("Close failed");
4657        let device = fs.take_device().await;
4658        device.reopen(false);
4659        let fs = FxFilesystem::open(device).await.expect("open failed");
4660        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4661        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");
4662
4663        // Check that we can still create files and they have low 32-bit IDs.
4664        let root_dir =
4665            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4666        let mut transaction = fs
4667            .clone()
4668            .new_transaction(
4669                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4670                Options::default(),
4671            )
4672            .await
4673            .expect("new_transaction failed");
4674
4675        let object = root_dir
4676            .create_child_file(&mut transaction, "persistence_check")
4677            .await
4678            .expect("create_child_file failed");
4679        assert!(object.object_id() < 1 << 32);
4680
4681        transaction.commit().await.expect("commit failed");
4682
4683        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4684    }
4685}