fxfs/object_store.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

pub mod allocator;
pub mod caching_object_handle;
pub mod data_object_handle;
pub mod directory;
mod extent_mapping_iterator;
mod extent_record;
mod flush;
pub mod graveyard;
mod install;
pub mod journal;
mod key_manager;
pub(crate) mod merge;
pub mod object_manager;
pub mod object_record;
pub mod project_id;
mod store_object_handle;
pub mod transaction;
mod tree;
mod tree_cache;
pub mod volume;

pub use data_object_handle::{
    DataObjectHandle, DirectWriter, FileExtent, FsverityState, FsverityStateInner, RangeType,
};
pub use directory::Directory;
pub use object_record::{ChildValue, ObjectDescriptor, PosixAttributes, Timestamp};
pub use store_object_handle::{
    EXTENDED_ATTRIBUTE_RANGE_END, EXTENDED_ATTRIBUTE_RANGE_START, SetExtendedAttributeMode,
    StoreObjectHandle,
};

use crate::errors::FxfsError;
use crate::filesystem::{
    ApplyContext, ApplyMode, FxFilesystem, JournalingObject, MAX_FILE_SIZE, SyncOptions,
    TruncateGuard, TxnGuard,
};
use crate::log::*;
use crate::lsm_tree::cache::{NullCache, ObjectCache};
use crate::lsm_tree::types::{Existence, Item, ItemRef, LayerIterator};
use crate::lsm_tree::{LSMTree, Query};
use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ObjectProperties, ReadObjectHandle};
use crate::object_store::allocator::Allocator;
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::{JournalCheckpoint, JournalCheckpointV32, JournaledTransaction};
use crate::object_store::key_manager::KeyManager;
use crate::object_store::transaction::{
    AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation, Options, Transaction,
    lock_keys,
};
use crate::range::RangeExt;
use crate::round::round_up;
use crate::serialized_types::{Version, Versioned, VersionedLatest};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use fidl_fuchsia_io as fio;
use fprint::TypeFingerprint;
use fuchsia_sync::Mutex;
use fxfs_crypto::ff1::Ff1;
use fxfs_crypto::{
    CipherHolder, Crypt, KeyPurpose, ObjectType, StreamCipher, UnwrappedKey, WrappingKeyId,
    key_to_cipher,
};
use fxfs_macros::{Migrate, migrate_to_version};
use rand::RngCore;
use scopeguard::ScopeGuard;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fmt;
use std::num::NonZero;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, OnceLock, Weak};
use storage_device::Device;
use uuid::Uuid;

pub use extent_record::{
    BLOB_MERKLE_ATTRIBUTE_ID, DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
    FSVERITY_MERKLE_ATTRIBUTE_ID,
};
pub use object_record::{
    AttributeKey, EncryptionKey, EncryptionKeys, ExtendedAttributeValue, FsverityMetadata, FxfsKey,
    FxfsKeyV40, FxfsKeyV49, ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue,
    ProjectProperty, RootDigest,
};
pub use transaction::Mutation;

// For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel
// attacks more difficult. This mask can be used to extract the hi part of the object ID.
const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000;

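// Illustrative sketch (not part of the original source): how the mask splits an
// object ID into its two halves.  `oid` is a hypothetical value.
//
//     let oid: u64 = 0x0000_0001_0000_002a;
//     let hi = oid & OBJECT_ID_HI_MASK; // 0x0000_0001_0000_0000 (plaintext half)
//     let lo = oid as u32;              // 0x0000_002a (the encrypted half)
//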
// At time of writing, this threshold limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;

// Encrypted files and directories use the fscrypt key (identified by `FSCRYPT_KEY_ID`) to encrypt
// file contents and filenames respectively. All other encrypted files default to using the
// `VOLUME_DATA_KEY_ID` key. Note that the filesystem always uses the `VOLUME_DATA_KEY_ID` key to
// encrypt large extended attributes, so encrypted files and directories with large xattrs will
// have both an fscrypt key and a volume data key.
pub const VOLUME_DATA_KEY_ID: u64 = 0;
pub const FSCRYPT_KEY_ID: u64 = 1;

/// A constant that can be used where an owner is expected of type `Weak<dyn StoreOwner>` but no
/// owner is required.
pub const NO_OWNER: Weak<()> = Weak::new();
impl StoreOwner for () {}

#[async_trait]
pub trait StoreOwner: Send + Sync {
    /// Forcibly lock the store.  This exists to give the StoreOwner an opportunity to clean up
    /// tasks which might access the store before locking it, because ObjectStore::lock can only
    /// be called when the store is not in use.
    async fn force_lock(self: Arc<Self>, _store: &ObjectStore) -> Result<(), Error> {
        Err(anyhow!(FxfsError::Internal))
    }
}

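// Illustrative sketch (not part of the original source): an owner that quiesces
// its own tasks before locking.  `MyVolume` and `TaskGroup` are hypothetical,
// and `ObjectStore::lock` is assumed to behave as described above.
//
//     struct MyVolume { tasks: TaskGroup }
//
//     #[async_trait]
//     impl StoreOwner for MyVolume {
//         async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), Error> {
//             self.tasks.shutdown().await; // Stop anything that might touch the store.
//             store.lock().await           // Now the store is guaranteed to be unused.
//         }
//     }
//
// Callers that need no owner can pass `NO_OWNER` (note that `()` implements
// `StoreOwner` above, inheriting the default erroring `force_lock`).
//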
/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
/// back to an ObjectStore.
pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}

/// StoreInfo stores information about the object store.  This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
pub type StoreInfo = StoreInfoV52;

#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV52 {
    /// The globally unique identifier for the associated object store. If unset, will be all zero.
    guid: [u8; 16],

    /// The last used object ID.  Note that this field is not accurate in memory; ObjectStore's
    /// last_object_id field is the one to use in that case.  Technically, this might not be the
    /// last object ID used for the latest transaction that created an object because we use this at
    /// the point of creating the object but before we commit the transaction.  Transactions can
    /// then get committed in an arbitrary order (or not at all).
    last_object_id: LastObjectIdInfo,

    /// Object ids for layers.  TODO(https://fxbug.dev/42178036): need a layer of indirection here
    /// so we can support snapshots.
    pub layers: Vec<u64>,

    /// The object ID for the root directory.
    root_directory_object_id: u64,

    /// The object ID for the graveyard.
    graveyard_directory_object_id: u64,

    /// The number of live objects in the store.  This should *not* be trusted; it can be invalid
    /// due to filesystem inconsistencies.
    object_count: u64,

    /// The (wrapped) key that encrypted mutations should use.
    mutations_key: Option<FxfsKeyV49>,

    /// Mutations for the store are encrypted using a stream cipher.  To decrypt the mutations, we
    /// need to know the offset in the cipher stream to start it.
    mutations_cipher_offset: u64,

    /// If we have to flush the store whilst we do not have the key, we need to write the encrypted
    /// mutations to an object. This is the object ID of that file if it exists.
    pub encrypted_mutations_object_id: u64,

    /// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
    /// when the directory doesn't yet exist.
    internal_directory_object_id: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
enum LastObjectIdInfo {
    Unencrypted {
        id: u64,
    },
    Encrypted {
        /// The *unencrypted* value of the last object ID.
        id: u64,

        /// Object IDs are encrypted to reduce the amount of information that sequential object IDs
        /// reveal (such as the number of files in the system and the ordering of their creation in
        /// time).  Only the bottom 32 bits of the object ID are encrypted; the top 32 bits
        /// increment after 2^32 object IDs have been used, which allows us to roll the key.
        key: FxfsKeyV49,
    },
    Low32Bit,
}

impl Default for LastObjectIdInfo {
    fn default() -> Self {
        LastObjectIdInfo::Unencrypted { id: 0 }
    }
}

#[derive(Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV49 {
    guid: [u8; 16],
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV49>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    object_id_key: Option<FxfsKeyV49>,
    internal_directory_object_id: u64,
}

impl From<StoreInfoV49> for StoreInfoV52 {
    fn from(value: StoreInfoV49) -> Self {
        Self {
            guid: value.guid,
            last_object_id: if let Some(key) = value.object_id_key {
                LastObjectIdInfo::Encrypted { id: value.last_object_id, key }
            } else {
                LastObjectIdInfo::Unencrypted { id: value.last_object_id }
            },
            layers: value.layers,
            root_directory_object_id: value.root_directory_object_id,
            graveyard_directory_object_id: value.graveyard_directory_object_id,
            object_count: value.object_count,
            mutations_key: value.mutations_key,
            mutations_cipher_offset: value.mutations_cipher_offset,
            encrypted_mutations_object_id: value.encrypted_mutations_object_id,
            internal_directory_object_id: value.internal_directory_object_id,
        }
    }
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV49)]
pub struct StoreInfoV40 {
    guid: [u8; 16],
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV40>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    object_id_key: Option<FxfsKeyV40>,
    internal_directory_object_id: u64,
}

impl StoreInfo {
    /// Returns the parent objects for this store.
    pub fn parent_objects(&self) -> Vec<u64> {
        // We should not include the ID of the store itself, since that should be referred to in the
        // volume directory.
        let mut objects = self.layers.to_vec();
        if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
            objects.push(self.encrypted_mutations_object_id);
        }
        objects
    }
}

// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit isn't exceeded.
// It will likely involve placing limits on the maximum number of layers.
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072;

// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
// in the journal but hasn't yet been written to layer files) for a store.  We set a limit because
// we want to limit the amount of memory use in the case the filesystem is corrupt or under attack.
pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;

#[derive(Default)]
pub struct HandleOptions {
    /// If true, transactions used by this handle will skip journal space checks.
    pub skip_journal_checks: bool,
    /// If true, data written to any attribute of this handle will not have per-block checksums
    /// computed.
    pub skip_checksums: bool,
    /// If true, any files using fsverity will not attempt to perform any verification. This is
    /// useful to open an object without the correct encryption keys to look at the metadata.
    pub skip_fsverity: bool,
}

/// Parameters for encrypting a newly created object.
pub struct ObjectEncryptionOptions {
    /// If set, the keys are treated as permanent and never evicted from the KeyManager cache.
    /// This is necessary when keys are managed by another store; for example, the layer files
    /// of a child store are objects in the root store, but they are encrypted with keys from the
    /// child store.  Generally, most objects should have this set to `false`.
    pub permanent: bool,
    pub key_id: u64,
    pub key: EncryptionKey,
    pub unwrapped_key: UnwrappedKey,
}

pub struct StoreOptions {
    /// The owner of the store.
    pub owner: Weak<dyn StoreOwner>,

    /// The store is unencrypted if `crypt` is `None`.
    pub crypt: Option<Arc<dyn Crypt>>,
}

impl Default for StoreOptions {
    fn default() -> Self {
        Self { owner: NO_OWNER, crypt: None }
    }
}

#[derive(Default)]
pub struct NewChildStoreOptions {
    pub options: StoreOptions,

    /// Specifies the object ID in the root store to be used for the store.  If set to
    /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
    pub object_id: u64,

    /// If true, reserve all 32 bit object_ids.  All new objects will start with IDs exceeding
    /// 0x1_0000_0000.
    pub reserve_32bit_object_ids: bool,

    /// Object IDs will be restricted to 32 bits.  This involves a less performant algorithm and so
    /// should not be used unless necessary.
    pub low_32_bit_object_ids: bool,

    /// If set, use this GUID for the new store.
    pub guid: Option<[u8; 16]>,
}

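// Illustrative sketch (not part of the original source): options for an
// encrypted child store, relying on `Default` for the remaining fields.
// `my_crypt` is a hypothetical `Arc<dyn Crypt>`.
//
//     let opts = NewChildStoreOptions {
//         options: StoreOptions { owner: NO_OWNER, crypt: Some(my_crypt) },
//         ..Default::default()
//     };
//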
pub type EncryptedMutations = EncryptedMutationsV49;

#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV49 {
    // Information about the mutations is held here, but the actual encrypted data is held within
    // data.  For each transaction, we record the checkpoint and the count of mutations within the
    // transaction.  The checkpoint is required for the log file offset (which we need to apply the
    // mutations) and the version (so that we can correctly decode the mutation after it has been
    // decrypted).  The count specifies the number of serialized mutations encoded in |data|.
    transactions: Vec<(JournalCheckpointV32, u64)>,

    // The encrypted mutations.
    data: Vec<u8>,

    // If the mutations key was rolled, this holds the offset in `data` where the new key should
    // apply.
    mutations_key_roll: Vec<(usize, FxfsKeyV49)>,
}

impl std::fmt::Debug for EncryptedMutations {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("EncryptedMutations")
            .field("transactions", &self.transactions)
            .field("len", &self.data.len())
            .field(
                "mutations_key_roll",
                &self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
            )
            .finish()
    }
}

impl Versioned for EncryptedMutations {
    fn max_serialized_size() -> u64 {
        MAX_ENCRYPTED_MUTATIONS_SIZE as u64
    }
}

impl From<EncryptedMutationsV40> for EncryptedMutationsV49 {
    fn from(value: EncryptedMutationsV40) -> Self {
        EncryptedMutationsV49 {
            transactions: value.transactions,
            data: value.data,
            mutations_key_roll: value
                .mutations_key_roll
                .into_iter()
                .map(|(offset, key)| (offset, key.into()))
                .collect(),
        }
    }
}

#[derive(Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV40 {
    transactions: Vec<(JournalCheckpointV32, u64)>,
    data: Vec<u8>,
    mutations_key_roll: Vec<(usize, FxfsKeyV40)>,
}

impl Versioned for EncryptedMutationsV40 {
    fn max_serialized_size() -> u64 {
        MAX_ENCRYPTED_MUTATIONS_SIZE as u64
    }
}

impl EncryptedMutations {
    fn from_replayed_mutations(
        store_object_id: u64,
        transactions: Vec<JournaledTransaction>,
    ) -> Self {
        let mut this = Self::default();
        for JournaledTransaction { checkpoint, non_root_mutations, .. } in transactions {
            for (object_id, mutation) in non_root_mutations {
                if store_object_id == object_id {
                    if let Mutation::EncryptedObjectStore(data) = mutation {
                        this.push(&checkpoint, data);
                    } else if let Mutation::UpdateMutationsKey(key) = mutation {
                        this.mutations_key_roll.push((this.data.len(), key.into()));
                    }
                }
            }
        }
        this
    }

    fn extend(&mut self, other: &EncryptedMutations) {
        self.transactions.extend_from_slice(&other.transactions[..]);
        self.mutations_key_roll.extend(
            other
                .mutations_key_roll
                .iter()
                .map(|(offset, key)| (offset + self.data.len(), key.clone())),
        );
        self.data.extend_from_slice(&other.data[..]);
    }

    fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
        self.data.append(&mut data.into());
        // If the checkpoint is the same as that of the last mutation we pushed, increment the
        // count.
        if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
            if last_checkpoint.file_offset == checkpoint.file_offset {
                *count += 1;
                return;
            }
        }
        self.transactions.push((checkpoint.clone(), 1));
    }
}

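// Illustrative sketch (not part of the original source): `push` coalesces
// mutations that share a journal checkpoint into a single `(checkpoint, count)`
// entry, while a new file offset starts a fresh entry.  `cp1` and `cp2` are
// hypothetical checkpoints with different `file_offset`s.
//
//     let mut m = EncryptedMutations::default();
//     m.push(&cp1, data_a); // transactions == [(cp1, 1)]
//     m.push(&cp1, data_b); // transactions == [(cp1, 2)] -- same file_offset
//     m.push(&cp2, data_c); // transactions == [(cp1, 2), (cp2, 1)]
//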
pub enum LockState {
    Locked,
    Unencrypted,
    Unlocked { owner: Weak<dyn StoreOwner>, crypt: Arc<dyn Crypt> },

    // The store is unlocked, but in a read-only state, and no flushes or other operations will be
    // performed on the store.
    UnlockedReadOnly(Arc<dyn Crypt>),

    // The store is encrypted but is now in an unusable state (due to a failure to sync the journal
    // after locking the store).  The store cannot be unlocked.
    Invalid,

    // Before we've read the StoreInfo, we might not know whether the store is Locked or
    // Unencrypted.  This can happen when lazily opening stores (ObjectManager::lazy_open_store).
    Unknown,

    // The store is in the process of being locked.  Whilst the store is being locked, the store
    // isn't usable; assertions will trip if any mutations are applied.
    Locking,

    // Whilst we're unlocking, we will replay encrypted mutations.  The store isn't usable until
    // it's in the Unlocked state.
    Unlocking,

    // The store has been deleted.
    Deleted,
}

impl LockState {
    fn owner(&self) -> Option<Arc<dyn StoreOwner>> {
        if let Self::Unlocked { owner, .. } = self { owner.upgrade() } else { None }
    }
}

impl fmt::Debug for LockState {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(match self {
            LockState::Locked => "Locked",
            LockState::Unencrypted => "Unencrypted",
            LockState::Unlocked { .. } => "Unlocked",
            LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
            LockState::Invalid => "Invalid",
            LockState::Unknown => "Unknown",
            LockState::Locking => "Locking",
            LockState::Unlocking => "Unlocking",
            LockState::Deleted => "Deleted",
        })
    }
}

enum LastObjectId {
    // This is used when the store is encrypted, but the key and ID aren't yet available.
    Pending,

    Unencrypted {
        id: u64,
    },

    Encrypted {
        // The *unencrypted* value of the last object ID.
        id: u64,

        // Encrypted stores will use a cipher to obfuscate the object ID.
        cipher: Box<Ff1>,
    },

    Low32Bit {
        reserved: HashSet<u32>,
        unreserved: Vec<u32>,
    },
}

impl LastObjectId {
    /// Tries to get the next object ID.  Returns None if a new cipher is required (because all
    /// object IDs that can be generated with the current cipher have been exhausted), or if the
    /// store only uses the lower 32 bits, which requires an async algorithm.
    fn try_get_next(&mut self) -> Option<NonZero<u64>> {
        match self {
            LastObjectId::Unencrypted { id } => {
                NonZero::new(id.wrapping_add(1)).inspect(|next| *id = next.get())
            }
            LastObjectId::Encrypted { id, cipher } => {
                let mut next = *id;
                let hi = next & OBJECT_ID_HI_MASK;
                loop {
                    if next as u32 == u32::MAX {
                        return None;
                    }
                    next += 1;
                    let candidate = hi | cipher.encrypt(next as u32) as u64;
                    if let Some(candidate) = NonZero::new(candidate) {
                        *id = next;
                        return Some(candidate);
                    }
                }
            }
            _ => None,
        }
    }

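    // Illustrative sketch (not part of the original source) of the scheme above:
    // the low 32 bits of the counter are run through the Ff1 cipher, so generated
    // IDs come out in a pseudo-random order; a candidate is rejected only when
    // the whole ID would be zero, which the `NonZero` check filters out.
    //
    //     let mut last = LastObjectId::Encrypted { id: 0, cipher };
    //     let a = last.try_get_next(); // hi bits 0, low bits == cipher.encrypt(1)
    //     let b = last.try_get_next(); // low bits == cipher.encrypt(2); unrelated to `a`
    //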
    /// Returns INVALID_OBJECT_ID if it's not possible to peek at the next object ID.
    fn peek_next(&self) -> u64 {
        match self {
            LastObjectId::Unencrypted { id } => id.wrapping_add(1),
            LastObjectId::Encrypted { id, cipher } => {
                let mut next = *id;
                let hi = next & OBJECT_ID_HI_MASK;
                loop {
                    if next as u32 == u32::MAX {
                        return INVALID_OBJECT_ID;
                    }
                    next += 1;
                    let candidate = hi | cipher.encrypt(next as u32) as u64;
                    if candidate != INVALID_OBJECT_ID {
                        return candidate;
                    }
                }
            }
            _ => INVALID_OBJECT_ID,
        }
    }

    /// Returns INVALID_OBJECT_ID for algorithms that don't use the last ID.
    fn id(&self) -> u64 {
        match self {
            LastObjectId::Unencrypted { id } | LastObjectId::Encrypted { id, .. } => *id,
            _ => INVALID_OBJECT_ID,
        }
    }

    /// Returns true if `id` is reserved.  IDs that don't fit in 32 bits are never reserved.
    fn is_reserved(&self, id: u64) -> bool {
        match self {
            LastObjectId::Low32Bit { reserved, .. } => {
                if let Ok(id) = id.try_into() {
                    reserved.contains(&id)
                } else {
                    false
                }
            }
            _ => false,
        }
    }

    /// Reserves `id`.
    fn reserve(&mut self, id: u64) {
        match self {
            LastObjectId::Low32Bit { reserved, .. } => {
                assert!(reserved.insert(id.try_into().unwrap()))
            }
            _ => unreachable!(),
        }
    }

    /// Unreserves `id`.
    fn unreserve(&mut self, id: u64) {
        match self {
            LastObjectId::Low32Bit { unreserved, .. } => {
                // To avoid races where a reserved ID transitions from being reserved to being
                // actually used in a committed transaction, we delay updating `reserved` until a
                // suitable point.
                //
                // On thread A, we might have:
                //
                //   A1. Commit transaction (insert a record into the LSM tree that uses ID)
                //   A2. `unreserve`
                //
                // And on another thread B, we might have:
                //
                //   B1. Drain `unreserved`.
                //   B2. Check tree and `reserved` to see if ID is used.
                //
                // B2 will involve calling `LsmTree::layer_set` which should be thought of as a
                // snapshot, so the change A1 might not be visible to thread B, but it won't matter
                // because `reserved` will still include the ID.  So long as each thread does the
                // operations in this order, it should be safe.
                unreserved.push(id.try_into().unwrap())
            }
            _ => {}
        }
    }

    /// Removes `unreserved` IDs from the `reserved` list.
    fn drain_unreserved(&mut self) {
        match self {
            LastObjectId::Low32Bit { reserved, unreserved } => {
                for u in unreserved.drain(..) {
                    assert!(reserved.remove(&u));
                }
            }
            _ => {}
        }
    }
}

pub struct ReservedId<'a>(&'a ObjectStore, NonZero<u64>);

impl<'a> ReservedId<'a> {
    fn new(store: &'a ObjectStore, id: NonZero<u64>) -> Self {
        Self(store, id)
    }

    pub fn get(&self) -> u64 {
        self.1.get()
    }

    /// The caller takes responsibility for this ID.
    #[must_use]
    pub fn release(self) -> u64 {
        let id = self.1.get();
        std::mem::forget(self);
        id
    }
}

impl Drop for ReservedId<'_> {
    fn drop(&mut self) {
        self.0.last_object_id.lock().unreserve(self.1.get());
    }
}

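// Illustrative sketch (not part of the original source): `ReservedId` is an
// RAII guard.  Dropping it returns the ID via `unreserve`, while `release`
// transfers ownership, e.g. once the ID has been committed in a transaction.
// `store` and `txn_guard` are hypothetical.
//
//     let reserved = store.get_next_object_id(txn_guard).await?;
//     let oid = reserved.get();     // peek at the ID; the guard still owns it
//     let oid = reserved.release(); // caller now owns it; Drop won't unreserve
//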
/// An object store supports a file-like interface for objects.  Objects are keyed by a 64 bit
/// identifier.  An object store has to be backed by a parent object store (which stores metadata
/// for the object store).  The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
pub struct ObjectStore {
    parent_store: Option<Arc<ObjectStore>>,
    store_object_id: u64,
    device: Arc<dyn Device>,
    block_size: u64,
    filesystem: Weak<FxFilesystem>,
    // Lock ordering: This must be taken before `lock_state`.
    store_info: Mutex<Option<StoreInfo>>,
    tree: LSMTree<ObjectKey, ObjectValue>,

    // When replaying the journal, the store cannot read StoreInfo until the whole journal
    // has been replayed, so during that time, store_info_handle will be None and records
    // just get sent to the tree. Once the journal has been replayed, we can open the store
    // and load all the other layer information.
    store_info_handle: OnceLock<DataObjectHandle<ObjectStore>>,

    // The cipher to use for encrypted mutations, if this store is encrypted.
    mutations_cipher: Mutex<Option<StreamCipher>>,

    // Current lock state of the store.
    // Lock ordering: This must be taken after `store_info`.
    lock_state: Mutex<LockState>,
    pub key_manager: KeyManager,

    // Enable/disable tracing.
    trace: AtomicBool,

    // Informational counters for events occurring within the store.
    counters: Mutex<ObjectStoreCounters>,

    // These are updated in performance-sensitive code paths, so we use atomics instead of adding
    // them to `counters`.
    device_read_ops: AtomicU64,
    device_write_ops: AtomicU64,
    logical_read_ops: AtomicU64,
    logical_write_ops: AtomicU64,

    // Contains the last object ID and, optionally, a cipher to be used when generating new object
    // IDs.
    last_object_id: Mutex<LastObjectId>,

    // An optional callback to be invoked each time the ObjectStore flushes.  The callback is
    // invoked at the end of flush, while the write lock is still held.
    flush_callback: Mutex<Option<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>>,
}

#[derive(Clone, Default)]
struct ObjectStoreCounters {
    mutations_applied: u64,
    mutations_dropped: u64,
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}

impl ObjectStore {
    fn new(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        store_info: Option<StoreInfo>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
        mutations_cipher: Option<StreamCipher>,
        lock_state: LockState,
        last_object_id: LastObjectId,
    ) -> Arc<ObjectStore> {
        let device = filesystem.device();
        let block_size = filesystem.block_size();
        Arc::new(ObjectStore {
            parent_store,
            store_object_id,
            device,
            block_size,
            filesystem: Arc::downgrade(&filesystem),
            store_info: Mutex::new(store_info),
            tree: LSMTree::new(merge::merge, object_cache),
            store_info_handle: OnceLock::new(),
            mutations_cipher: Mutex::new(mutations_cipher),
            lock_state: Mutex::new(lock_state),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(last_object_id),
            flush_callback: Mutex::new(None),
        })
    }

    fn new_empty(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Arc<Self> {
        Self::new(
            parent_store,
            store_object_id,
            filesystem,
            Some(StoreInfo::default()),
            object_cache,
            None,
            LockState::Unencrypted,
            LastObjectId::Unencrypted { id: 0 },
        )
    }

    /// Cycle breaker constructor that returns an ObjectStore without a filesystem.
    /// This should only be used from super block code.
    pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
        ObjectStore {
            parent_store: None,
            store_object_id,
            device,
            block_size,
            filesystem: Weak::<FxFilesystem>::new(),
            store_info: Mutex::new(Some(StoreInfo::default())),
            tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
            store_info_handle: OnceLock::new(),
            mutations_cipher: Mutex::new(None),
            lock_state: Mutex::new(LockState::Unencrypted),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(LastObjectId::Unencrypted { id: 0 }),
            flush_callback: Mutex::new(None),
        }
    }

    /// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has
    /// been created.
    pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
        this.filesystem = Arc::downgrade(&filesystem);
        this
    }

    /// Create a child store. It is a multi-step process:
    ///
    ///   1. Call `ObjectStore::new_child_store`.
    ///   2. Register the store with the object-manager.
    ///   3. Call `ObjectStore::create` to write the store-info.
    ///
    /// If the procedure fails, care must be taken to unregister the store with the object-manager.
    ///
    /// The steps have to be separate because of lifetime issues when working with a transaction.
    /// (An illustrative sketch follows this method.)
    async fn new_child_store(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        options: NewChildStoreOptions,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Result<Arc<Self>, Error> {
        ensure!(
            !options.reserve_32bit_object_ids || !options.low_32_bit_object_ids,
            FxfsError::InvalidArgs
        );
        let handle = if let Some(object_id) = NonZero::new(options.object_id) {
            self.update_last_object_id(object_id.get());
            let handle = ObjectStore::create_object_with_id(
                self,
                transaction,
                ReservedId::new(self, object_id),
                HandleOptions::default(),
                None,
            )?;
            handle
        } else {
            ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await?
        };
        let filesystem = self.filesystem();
        let id = if options.reserve_32bit_object_ids { 0x1_0000_0000 } else { 0 };
        let (last_object_id, last_object_id_in_memory) = if options.low_32_bit_object_ids {
            (
                LastObjectIdInfo::Low32Bit,
                LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() },
            )
        } else if let Some(crypt) = &options.options.crypt {
            let (object_id_wrapped, object_id_unwrapped) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            (
                LastObjectIdInfo::Encrypted { id, key: object_id_wrapped },
                LastObjectId::Encrypted { id, cipher: Box::new(Ff1::new(&object_id_unwrapped)) },
            )
        } else {
            (LastObjectIdInfo::Unencrypted { id }, LastObjectId::Unencrypted { id })
        };
        let store = if let Some(crypt) = options.options.crypt {
            let (wrapped_key, unwrapped_key) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    mutations_key: Some(wrapped_key),
                    last_object_id,
                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
                    ..Default::default()
                }),
                object_cache,
                Some(StreamCipher::new(&unwrapped_key, 0)),
                LockState::Unlocked { owner: options.options.owner, crypt },
                last_object_id_in_memory,
            )
        } else {
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    last_object_id,
                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
                    ..Default::default()
                }),
                object_cache,
                None,
                LockState::Unencrypted,
                last_object_id_in_memory,
            )
        };
        assert!(store.store_info_handle.set(handle).is_ok());
        Ok(store)
    }

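    // Illustrative sketch (not part of the original source) of the three-step
    // protocol described above; `object_manager.add_store` stands in for the
    // real registration call, and `cache` is hypothetical.
    //
    //     let store = root_store
    //         .new_child_store(&mut transaction, NewChildStoreOptions::default(), cache)
    //         .await?;
    //     object_manager.add_store(store.clone()); // hypothetical registration
    //     store.create(&mut transaction).await?;   // writes the store-info
    //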
    /// Actually creates the store in a transaction.  This will also create a root directory and
    /// graveyard directory for the store.  See `new_child_store` above.
    async fn create<'a>(
        self: &'a Arc<Self>,
        transaction: &mut Transaction<'a>,
    ) -> Result<(), Error> {
        let buf = {
            // Create a root directory and graveyard directory.
            let graveyard_directory_object_id = Graveyard::create(transaction, &self).await?;
            let root_directory = Directory::create(transaction, &self, None).await?;

            let serialized_info = {
                let mut store_info = self.store_info.lock();
                let store_info = store_info.as_mut().unwrap();

                store_info.graveyard_directory_object_id = graveyard_directory_object_id;
                store_info.root_directory_object_id = root_directory.object_id();

                let mut serialized_info = Vec::new();
                store_info.serialize_with_version(&mut serialized_info)?;
                serialized_info
            };
            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
            buf
        };

        if self.filesystem().options().image_builder_mode.is_some() {
            // If we're in image builder mode, we want to avoid writing to disk unless explicitly
            // asked to. New object stores will have their StoreInfo written when we compact in
            // FxFilesystem::finalize().
            Ok(())
        } else {
            self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
        }
    }

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(store_id = self.store_object_id(), trace; "OS: trace",);
        }
    }

    /// Sets a callback to be invoked each time the ObjectStore flushes.  The callback is invoked at
    /// the end of flush, while the write lock is still held.
    pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
        let mut flush_callback = self.flush_callback.lock();
        *flush_callback = Some(Box::new(callback));
    }

    pub fn is_root(&self) -> bool {
        if let Some(parent) = &self.parent_store {
            parent.parent_store.is_none()
        } else {
            // The root parent store isn't the root store.
            false
        }
    }

    /// Populates an inspect node with store statistics.
    pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
        let counters = self.counters.lock();
        if let Some(store_info) = self.store_info() {
            root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
        };
        root.record_uint("store_object_id", self.store_object_id);
        root.record_uint("mutations_applied", counters.mutations_applied);
        root.record_uint("mutations_dropped", counters.mutations_dropped);
        root.record_uint("num_flushes", counters.num_flushes);
        if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
            root.record_uint(
                "last_flush_time_ms",
                last_flush_time
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or(std::time::Duration::ZERO)
                    .as_millis()
                    .try_into()
                    .unwrap_or(0u64),
            );
        }
        root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
        root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
        root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
        root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));
        {
            let last_object_id = self.last_object_id.lock();
            root.record_uint("object_id_hi", last_object_id.id() >> 32);
            root.record_bool(
                "low_32_bit_object_ids",
                matches!(&*last_object_id, LastObjectId::Low32Bit { .. }),
            );
        }

        let this = self.clone();
        root.record_child("lsm_tree", move |node| this.tree().record_inspect_data(node));
    }

    pub fn device(&self) -> &Arc<dyn Device> {
        &self.device
    }

    pub fn block_size(&self) -> u64 {
        self.block_size
    }

    pub fn filesystem(&self) -> Arc<FxFilesystem> {
        self.filesystem.upgrade().unwrap()
    }

    pub fn store_object_id(&self) -> u64 {
        self.store_object_id
    }

    pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
        &self.tree
    }

    pub fn root_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().root_directory_object_id
    }

    pub fn guid(&self) -> [u8; 16] {
        self.store_info.lock().as_ref().unwrap().guid
    }

    pub fn graveyard_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().graveyard_directory_object_id
    }

    fn set_graveyard_directory_object_id(&self, oid: u64) {
        assert_eq!(
            std::mem::replace(
                &mut self.store_info.lock().as_mut().unwrap().graveyard_directory_object_id,
                oid
            ),
            INVALID_OBJECT_ID
        );
    }

    pub fn object_count(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().object_count
    }

    /// Returns INVALID_OBJECT_ID for algorithms that don't use the last ID.
    pub(crate) fn unencrypted_last_object_id(&self) -> u64 {
        self.last_object_id.lock().id()
    }

    pub fn key_manager(&self) -> &KeyManager {
        &self.key_manager
    }

    pub fn parent_store(&self) -> Option<&Arc<ObjectStore>> {
        self.parent_store.as_ref()
    }

    /// Returns the crypt object for the store.  Returns None if the store is unencrypted.
    pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
        match &*self.lock_state.lock() {
            LockState::Locked => panic!("Store is locked"),
            LockState::Invalid
            | LockState::Unencrypted
            | LockState::Locking
            | LockState::Unlocking
            | LockState::Deleted => None,
            LockState::Unlocked { crypt, .. } => Some(crypt.clone()),
            LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
            LockState::Unknown => {
                panic!("Store is of unknown lock state; has the journal been replayed yet?")
            }
        }
    }

    pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
        // Create the transaction first to use the object store lock.
        let mut transaction = self
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::InternalDirectory { store_object_id: self.store_object_id }],
                Options::default(),
            )
            .await?;
        let obj_id = self.store_info.lock().as_ref().unwrap().internal_directory_object_id;
        if obj_id != INVALID_OBJECT_ID {
            return Ok(obj_id);
        }

        // Need to create an internal directory.
        let directory = Directory::create(&mut transaction, self, None).await?;

        transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
        transaction.commit().await?;
        Ok(directory.object_id())
    }

    /// Returns the file size for the object without opening the object.
    async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
        let item = self
            .tree
            .find(&ObjectKey::attribute(
                object_id,
                DEFAULT_DATA_ATTRIBUTE_ID,
                AttributeKey::Attribute,
            ))
            .await?
            .ok_or(FxfsError::NotFound)?;
        if let ObjectValue::Attribute { size, .. } = item.value {
            Ok(size)
        } else {
            bail!(FxfsError::NotFile);
        }
    }

    #[cfg(feature = "migration")]
    pub fn last_object_id(&self) -> u64 {
        self.last_object_id.lock().id()
    }

    /// Provides access to the allocator to mark a specific region of the device as allocated.
    #[cfg(feature = "migration")]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        store_object_id: u64,
        device_range: std::ops::Range<u64>,
    ) -> Result<(), Error> {
        self.allocator().mark_allocated(transaction, store_object_id, device_range)
    }

    /// `crypt` can be provided if the crypt service should be different from the default; see the
    /// comment on create_object.  Users should avoid having more than one handle open for the same
    /// object at the same time because they might get out of sync; there is no code that will
    /// prevent this.  One example where this can cause an issue: if the object ends up using a
    /// permanent key (which is the case if a value is passed for `crypt`), the permanent key is
    /// dropped when a handle is dropped, which will impact any other handles for the same object.
    pub async fn open_object<S: HandleOwner>(
        owner: &Arc<S>,
        obj_id: u64,
        options: HandleOptions,
        crypt: Option<Arc<dyn Crypt>>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let mut fsverity_descriptor = None;
        let mut overwrite_ranges = Vec::new();
        let item = store
            .tree
            .find(&ObjectKey::attribute(obj_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute))
            .await?
            .ok_or(FxfsError::NotFound)?;

        let (size, track_overwrite_extents) = match item.value {
            ObjectValue::Attribute { size, has_overwrite_extents } => (size, has_overwrite_extents),
            ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
                if !options.skip_fsverity {
                    fsverity_descriptor = Some(fsverity_metadata);
                }
                // We only track the overwrite extents in memory for writes; reads handle them
                // implicitly, so verified files (where the data won't change anymore) don't need
                // to track them.
                (size, false)
            }
            _ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attribute")),
        };

        ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);

        if track_overwrite_extents {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    obj_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
                )))
                .await?;
            loop {
                match iter.get() {
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == obj_id && *attribute_id == DEFAULT_DATA_ATTRIBUTE_ID => {
                        match value {
                            ObjectValue::Extent(ExtentValue::None)
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Raw,
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Cow(_),
                                ..
                            }) => (),
                            ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::OverwritePartial(_),
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Overwrite,
                                ..
                            }) => overwrite_ranges.push(range.clone()),
                            _ => bail!(
                                anyhow!(FxfsError::Inconsistent)
                                    .context("open_object: Expected extent")
                            ),
                        }
                        iter.advance().await?;
                    }
                    _ => break,
                }
            }
        }

        // If a crypt service has been specified, it needs to be a permanent key because cached
        // keys can only use the store's crypt service.
        let permanent = if let Some(crypt) = crypt {
            store
                .key_manager
                .get_keys(
                    obj_id,
                    crypt.as_ref(),
                    &mut Some(async || store.get_keys(obj_id).await),
                    /* permanent= */ true,
                    /* force= */ false,
                )
                .await?;
            true
        } else {
            false
        };
        let data_object_handle = DataObjectHandle::new(
            owner.clone(),
            obj_id,
            permanent,
            DEFAULT_DATA_ATTRIBUTE_ID,
            size,
            FsverityState::None,
            options,
            false,
            &overwrite_ranges,
        );
        if let Some(descriptor) = fsverity_descriptor {
            data_object_handle
                .set_fsverity_state_some(descriptor)
                .await
                .context("Invalid or mismatched merkle tree")?;
        }
        Ok(data_object_handle)
    }

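    // Illustrative sketch (not part of the original source): opening an object
    // with the store's own crypt service, skipping fsverity so the metadata can
    // be inspected without verification.  `owner` and `oid` are hypothetical.
    //
    //     let handle = ObjectStore::open_object(
    //         &owner,
    //         oid,
    //         HandleOptions { skip_fsverity: true, ..Default::default() },
    //         None, // None => use the store's default crypt service
    //     )
    //     .await?;
    //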
    pub fn create_object_with_id<S: HandleOwner>(
        owner: &Arc<S>,
        transaction: &mut Transaction<'_>,
        reserved_object_id: ReservedId<'_>,
        options: HandleOptions,
        encryption_options: Option<ObjectEncryptionOptions>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        // Don't permit creating unencrypted objects in an encrypted store.  The converse is OK.
        debug_assert!(store.crypt().is_none() || encryption_options.is_some());
        let now = Timestamp::now();
        let object_id = reserved_object_id.get();
        assert!(
            transaction
                .add(
                    store.store_object_id(),
                    Mutation::insert_object(
                        ObjectKey::object(reserved_object_id.release()),
                        ObjectValue::file(
                            1,
                            0,
                            now.clone(),
                            now.clone(),
                            now.clone(),
                            now,
                            0,
                            None
                        ),
                    ),
                )
                .is_none()
        );
        let mut permanent_keys = false;
        if let Some(ObjectEncryptionOptions { permanent, key_id, key, unwrapped_key }) =
            encryption_options
        {
            permanent_keys = permanent;
            let cipher = key_to_cipher(&key, &unwrapped_key)?;
            transaction.add(
                store.store_object_id(),
                Mutation::insert_object(
                    ObjectKey::keys(object_id),
                    ObjectValue::keys(vec![(key_id, key)].into()),
                ),
            );
            store.key_manager.insert(
                object_id,
                Arc::new(vec![(key_id, CipherHolder::Cipher(cipher))].into()),
                permanent,
            );
        }
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute),
                // This is a new object so nothing has pre-allocated overwrite extents yet.
                ObjectValue::attribute(0, false),
            ),
        );
        Ok(DataObjectHandle::new(
            owner.clone(),
            object_id,
            permanent_keys,
            DEFAULT_DATA_ATTRIBUTE_ID,
            0,
            FsverityState::None,
            options,
            false,
            &[],
        ))
    }

    /// Creates an object in the store.
    ///
    /// If the store is encrypted, the object will be automatically encrypted as well.
    /// If `wrapping_key_id` is set, the new keys will be wrapped with that specific key;
    /// otherwise, the default data key is used.
    pub async fn create_object<S: HandleOwner>(
        owner: &Arc<S>,
        mut transaction: &mut Transaction<'_>,
        options: HandleOptions,
        wrapping_key_id: Option<WrappingKeyId>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
        let crypt = store.crypt();
        let encryption_options = if let Some(crypt) = crypt {
            let key_id =
                if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };
            let (key, unwrapped_key) = if let Some(wrapping_key_id) = wrapping_key_id {
                crypt.create_key_with_id(object_id.get(), wrapping_key_id, ObjectType::File).await?
            } else {
                let (fxfs_key, unwrapped_key) =
                    crypt.create_key(object_id.get(), KeyPurpose::Data).await?;
                (EncryptionKey::Fxfs(fxfs_key), unwrapped_key)
            };
            Some(ObjectEncryptionOptions { permanent: false, key_id, key, unwrapped_key })
        } else {
            None
        };
        ObjectStore::create_object_with_id(
            owner,
            &mut transaction,
            object_id,
            options,
            encryption_options,
        )
    }

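    // Illustrative sketch (not part of the original source): creating a file
    // with the default volume data key (no fscrypt wrapping key).  `owner` and
    // `txn` are hypothetical.
    //
    //     let handle =
    //         ObjectStore::create_object(&owner, &mut txn, HandleOptions::default(), None)
    //             .await?;
    //     txn.commit().await?; // the object exists once the transaction commits
    //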
1357    /// Creates an object using explicitly provided keys.
1358    ///
1359    /// There are some cases where an encrypted object needs to be created in an unencrypted store.
1360    /// For example, when layer files for a child store are created in the root store, but they must
1361    /// be encrypted using the child store's keys.  This method exists for that purpose.
1362    pub(crate) async fn create_object_with_key<S: HandleOwner>(
1363        owner: &Arc<S>,
1364        mut transaction: &mut Transaction<'_>,
1365        object_id: ReservedId<'_>,
1366        options: HandleOptions,
1367        key: EncryptionKey,
1368        unwrapped_key: UnwrappedKey,
1369    ) -> Result<DataObjectHandle<S>, Error> {
1370        ObjectStore::create_object_with_id(
1371            owner,
1372            transaction,
1373            object_id,
1374            options,
1375            Some(ObjectEncryptionOptions {
1376                permanent: true,
1377                key_id: VOLUME_DATA_KEY_ID,
1378                key,
1379                unwrapped_key,
1380            }),
1381        )
1382    }
1383
1384    /// Adjusts the reference count for a given object.  If the reference count reaches zero, the
1385    /// object is moved into the graveyard and true is returned.
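    ///
    /// A minimal sketch (not compiled as a doc-test), assuming `store`, `transaction` and
    /// `object_id`:
    ///
    /// ```ignore
    /// if store.adjust_refs(&mut transaction, object_id, -1).await? {
    ///     // The last reference is gone: the object is now in the graveyard and can be purged
    ///     // (e.g. via `tombstone_object`) once the transaction commits.
    /// }
    /// transaction.commit().await?;
    /// ```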
1386    pub async fn adjust_refs(
1387        &self,
1388        transaction: &mut Transaction<'_>,
1389        object_id: u64,
1390        delta: i64,
1391    ) -> Result<bool, Error> {
1392        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1393        let refs = if let ObjectValue::Object {
1394            kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
1395            ..
1396        } = &mut mutation.item.value
1397        {
1398            *refs =
1399                refs.checked_add_signed(delta).ok_or_else(|| anyhow!("refs underflow/overflow"))?;
1400            refs
1401        } else {
1402            bail!(FxfsError::NotFile);
1403        };
1404        if *refs == 0 {
1405            self.add_to_graveyard(transaction, object_id);
1406
1407            // We might still need to adjust the reference count if delta was something other than
1408            // -1.
1409            if delta != -1 {
1410                *refs = 1;
1411                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1412            }
1413            // Otherwise, we don't commit the mutation since we want to keep the reference count
1414            // at 1 for objects in the graveyard.
1415            Ok(true)
1416        } else {
1417            transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1418            Ok(false)
1419        }
1420    }
1421
1422    // Purges an object that is in the graveyard.
1423    pub async fn tombstone_object(
1424        &self,
1425        object_id: u64,
1426        txn_options: Options<'_>,
1427    ) -> Result<(), Error> {
1428        self.key_manager.remove(object_id).await;
1429        let fs = self.filesystem();
1430        let truncate_guard = fs.truncate_guard(self.store_object_id, object_id).await;
1431        self.trim_or_tombstone(object_id, true, txn_options, &truncate_guard).await
1432    }
1433
1434    /// Trims extents beyond the end of a file for all attributes.  This will remove the entry from
1435    /// the graveyard when done.
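    ///
    /// A minimal sketch (not compiled as a doc-test); the truncate guard is taken from the
    /// filesystem first:
    ///
    /// ```ignore
    /// let truncate_guard = fs.truncate_guard(store.store_object_id(), object_id).await;
    /// store.trim(object_id, &truncate_guard).await?;
    /// ```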
1436    pub async fn trim(
1437        &self,
1438        object_id: u64,
1439        truncate_guard: &TruncateGuard<'_>,
1440    ) -> Result<(), Error> {
1441        // For the root and root parent store, we would need to use the metadata reservation,
1442        // which we don't currently support, so assert that we're not those stores.
1443        assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());
1444
1445        self.trim_or_tombstone(
1446            object_id,
1447            false,
1448            Options { borrow_metadata_space: true, ..Default::default() },
1449            truncate_guard,
1450        )
1451        .await
1452    }
1453
1454    /// Trims or tombstones an object.
1455    async fn trim_or_tombstone(
1456        &self,
1457        object_id: u64,
1458        for_tombstone: bool,
1459        txn_options: Options<'_>,
1460        _truncate_guard: &TruncateGuard<'_>,
1461    ) -> Result<(), Error> {
1462        let fs = self.filesystem();
1463        let mut next_attribute = Some(0);
1464        while let Some(attribute_id) = next_attribute.take() {
1465            let mut transaction = fs
1466                .clone()
1467                .new_transaction(
1468                    lock_keys![
1469                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
1470                        LockKey::object(self.store_object_id, object_id),
1471                    ],
1472                    txn_options,
1473                )
1474                .await?;
1475
1476            match self
1477                .trim_some(
1478                    &mut transaction,
1479                    object_id,
1480                    attribute_id,
1481                    if for_tombstone {
1482                        TrimMode::Tombstone(TombstoneMode::Object)
1483                    } else {
1484                        TrimMode::UseSize
1485                    },
1486                )
1487                .await?
1488            {
1489                TrimResult::Incomplete => next_attribute = Some(attribute_id),
1490                TrimResult::Done(None) => {
1491                    if for_tombstone
1492                        || matches!(
1493                            self.tree
1494                                .find(&ObjectKey::graveyard_entry(
1495                                    self.graveyard_directory_object_id(),
1496                                    object_id,
1497                                ))
1498                                .await?,
1499                            Some(Item { value: ObjectValue::Trim, .. })
1500                        )
1501                    {
1502                        self.remove_from_graveyard(&mut transaction, object_id);
1503                    }
1504                }
1505                TrimResult::Done(id) => next_attribute = id,
1506            }
1507
1508            if !transaction.mutations().is_empty() {
1509                transaction.commit().await?;
1510            }
1511        }
1512        Ok(())
1513    }
1514
1515    // Purges an object's attribute that is in the graveyard.
1516    pub async fn tombstone_attribute(
1517        &self,
1518        object_id: u64,
1519        attribute_id: u64,
1520        txn_options: Options<'_>,
1521    ) -> Result<(), Error> {
1522        let fs = self.filesystem();
1523        let mut trim_result = TrimResult::Incomplete;
1524        while matches!(trim_result, TrimResult::Incomplete) {
1525            let mut transaction = fs
1526                .clone()
1527                .new_transaction(
1528                    lock_keys![
1529                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
1530                        LockKey::object(self.store_object_id, object_id),
1531                    ],
1532                    txn_options,
1533                )
1534                .await?;
1535            trim_result = self
1536                .trim_some(
1537                    &mut transaction,
1538                    object_id,
1539                    attribute_id,
1540                    TrimMode::Tombstone(TombstoneMode::Attribute),
1541                )
1542                .await?;
1543            if let TrimResult::Done(..) = trim_result {
1544                self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
1545            }
1546            if !transaction.mutations().is_empty() {
1547                transaction.commit().await?;
1548            }
1549        }
1550        Ok(())
1551    }
1552
1553    /// Deletes extents for attribute `attribute_id` in object `object_id`.  Also see the comments
1554    /// for TrimMode and TrimResult.  The caller should hold locks on the attribute and on the
1555    /// object, since this performs a read-modify-write on the sizes.
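    ///
    /// Since each transaction is kept bounded, callers typically loop, committing between rounds.
    /// A minimal sketch (not compiled as a doc-test) that trims a single attribute, with `fs`,
    /// `store`, `store_id`, `object_id` and `attribute_id` assumed:
    ///
    /// ```ignore
    /// loop {
    ///     let mut transaction = fs
    ///         .clone()
    ///         .new_transaction(
    ///             lock_keys![
    ///                 LockKey::object_attribute(store_id, object_id, attribute_id),
    ///                 LockKey::object(store_id, object_id),
    ///             ],
    ///             Options { borrow_metadata_space: true, ..Default::default() },
    ///         )
    ///         .await?;
    ///     let result = store
    ///         .trim_some(&mut transaction, object_id, attribute_id, TrimMode::UseSize)
    ///         .await?;
    ///     if !transaction.mutations().is_empty() {
    ///         transaction.commit().await?;
    ///     }
    ///     if !matches!(result, TrimResult::Incomplete) {
    ///         break;
    ///     }
    /// }
    /// ```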
1556    pub async fn trim_some(
1557        &self,
1558        transaction: &mut Transaction<'_>,
1559        object_id: u64,
1560        attribute_id: u64,
1561        mode: TrimMode,
1562    ) -> Result<TrimResult, Error> {
1563        let layer_set = self.tree.layer_set();
1564        let mut merger = layer_set.merger();
1565
1566        let aligned_offset = match mode {
1567            TrimMode::FromOffset(offset) => {
1568                round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
1569            }
1570            TrimMode::Tombstone(..) => 0,
1571            TrimMode::UseSize => {
1572                let iter = merger
1573                    .query(Query::FullRange(&ObjectKey::attribute(
1574                        object_id,
1575                        attribute_id,
1576                        AttributeKey::Attribute,
1577                    )))
1578                    .await?;
1579                if let Some(item_ref) = iter.get() {
1580                    if item_ref.key.object_id != object_id {
1581                        return Ok(TrimResult::Done(None));
1582                    }
1583
1584                    if let ItemRef {
1585                        key:
1586                            ObjectKey {
1587                                data:
1588                                    ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
1589                                ..
1590                            },
1591                        value: ObjectValue::Attribute { size, .. },
1592                        ..
1593                    } = item_ref
1594                    {
1595                        // If we found a different attribute_id, return so we can get the
1596                        // right lock.
1597                        if *size_attribute_id != attribute_id {
1598                            return Ok(TrimResult::Done(Some(*size_attribute_id)));
1599                        }
1600                        round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
1601                    } else {
1602                        // At time of writing, we should always see a size record or None here, but
1603                        // asserting here would be brittle, so just skip to the next attribute
1604                        // instead.
1605                        return Ok(TrimResult::Done(Some(attribute_id + 1)));
1606                    }
1607                } else {
1608                    // End of the tree.
1609                    return Ok(TrimResult::Done(None));
1610                }
1611            }
1612        };
1613
1614        // Loop over the extents and deallocate them.
1615        let mut iter = merger
1616            .query(Query::FullRange(&ObjectKey::from_extent(
1617                object_id,
1618                attribute_id,
1619                ExtentKey::search_key_from_offset(aligned_offset),
1620            )))
1621            .await?;
1622        let mut end = 0;
1623        let allocator = self.allocator();
1624        let mut result = TrimResult::Done(None);
1625        let mut deallocated = 0;
1626        let block_size = self.block_size;
1627
1628        while let Some(item_ref) = iter.get() {
1629            if item_ref.key.object_id != object_id {
1630                break;
1631            }
1632            if let ObjectKey {
1633                data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
1634                ..
1635            } = item_ref.key
1636            {
1637                if *extent_attribute_id != attribute_id {
1638                    result = TrimResult::Done(Some(*extent_attribute_id));
1639                    break;
1640                }
1641                if let (
1642                    AttributeKey::Extent(ExtentKey { range }),
1643                    ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1644                ) = (attribute_key, item_ref.value)
1645                {
1646                    let start = std::cmp::max(range.start, aligned_offset);
1647                    ensure!(start < range.end, FxfsError::Inconsistent);
1648                    let device_offset = device_offset
1649                        .checked_add(start - range.start)
1650                        .ok_or(FxfsError::Inconsistent)?;
1651                    end = range.end;
1652                    let len = end - start;
1653                    let device_range = device_offset..device_offset + len;
1654                    ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
1655                    allocator.deallocate(transaction, self.store_object_id, device_range).await?;
1656                    deallocated += len;
1657                    // Stop if the transaction is getting too big.
1658                    if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
1659                        result = TrimResult::Incomplete;
1660                        break;
1661                    }
1662                }
1663            }
1664            iter.advance().await?;
1665        }
1666
1667        let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
1668            && matches!(result, TrimResult::Done(None));
1669        let finished_tombstone_attribute =
1670            matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
1671                && !matches!(result, TrimResult::Incomplete);
1672        let mut object_mutation = None;
1673        let nodes = if finished_tombstone_object { -1 } else { 0 };
1674        if nodes != 0 || deallocated != 0 {
1675            let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1676            if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
1677                mutation.item.value
1678            {
1679                if project_id != 0 {
1680                    transaction.add(
1681                        self.store_object_id,
1682                        Mutation::merge_object(
1683                            ObjectKey::project_usage(self.root_directory_object_id(), project_id),
1684                            ObjectValue::BytesAndNodes {
1685                                bytes: -i64::try_from(deallocated).unwrap(),
1686                                nodes,
1687                            },
1688                        ),
1689                    );
1690                }
1691                object_mutation = Some(mutation);
1692            } else {
1693                panic!("Inconsistent object type.");
1694            }
1695        }
1696
1697        // Deletion marker records *must* be merged so as to consume all other records for the
1698        // object.
1699        if finished_tombstone_object {
1700            transaction.add(
1701                self.store_object_id,
1702                Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
1703            );
1704        } else {
1705            if finished_tombstone_attribute {
1706                transaction.add(
1707                    self.store_object_id,
1708                    Mutation::merge_object(
1709                        ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
1710                        ObjectValue::None,
1711                    ),
1712                );
1713            }
1714            if deallocated > 0 {
1715                let mut mutation = match object_mutation {
1716                    Some(mutation) => mutation,
1717                    None => self.txn_get_object_mutation(transaction, object_id).await?,
1718                };
1719                transaction.add(
1720                    self.store_object_id,
1721                    Mutation::merge_object(
1722                        ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
1723                        ObjectValue::deleted_extent(),
1724                    ),
1725                );
1726                // Update allocated size.
1727                if let ObjectValue::Object {
1728                    attributes: ObjectAttributes { allocated_size, .. },
1729                    ..
1730                } = &mut mutation.item.value
1731                {
1732                    // The only way for this to fail is if the volume is inconsistent.
1733                    *allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
1734                        anyhow!(FxfsError::Inconsistent).context("Allocated size overflow")
1735                    })?;
1736                } else {
1737                    panic!("Unexpected object value");
1738                }
1739                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1740            }
1741        }
1742        Ok(result)
1743    }
1744
1745    /// Returns all objects that exist in the parent store that pertain to this object store.
1746    /// Note that this doesn't include the object_id of the store itself, which is generally
1747    /// referenced externally.
1748    pub fn parent_objects(&self) -> Vec<u64> {
1749        assert!(self.store_info_handle.get().is_some());
1750        self.store_info.lock().as_ref().unwrap().parent_objects()
1751    }
1752
1753    /// Returns root objects for this store.
1754    pub fn root_objects(&self) -> Vec<u64> {
1755        let mut objects = Vec::new();
1756        let store_info = self.store_info.lock();
1757        let info = store_info.as_ref().unwrap();
1758        if info.root_directory_object_id != INVALID_OBJECT_ID {
1759            objects.push(info.root_directory_object_id);
1760        }
1761        if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
1762            objects.push(info.graveyard_directory_object_id);
1763        }
1764        if info.internal_directory_object_id != INVALID_OBJECT_ID {
1765            objects.push(info.internal_directory_object_id);
1766        }
1767        objects
1768    }
1769
1770    pub fn store_info(&self) -> Option<StoreInfo> {
1771        self.store_info.lock().as_ref().cloned()
1772    }
1773
1774    /// Returns None if called during journal replay.
1775    pub fn store_info_handle_object_id(&self) -> Option<u64> {
1776        self.store_info_handle.get().map(|h| h.object_id())
1777    }
1778
1779    /// Called to open a store, before replay of this store's mutations.
1780    async fn open(
1781        parent_store: &Arc<ObjectStore>,
1782        store_object_id: u64,
1783        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
1784    ) -> Result<Arc<ObjectStore>, Error> {
1785        let handle =
1786            ObjectStore::open_object(parent_store, store_object_id, HandleOptions::default(), None)
1787                .await?;
1788
1789        let info = load_store_info(parent_store, store_object_id).await?;
1790        let is_encrypted = info.mutations_key.is_some();
1791
1792        let mut total_layer_size = 0;
1793        let last_object_id;
1794
1795        // TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.
1796
1797        // If the store is encrypted, we can't open the object tree layers now, but we need to
1798        // compute the size of the layers.
1799        if is_encrypted {
1800            for &oid in &info.layers {
1801                total_layer_size += parent_store.get_file_size(oid).await?;
1802            }
1803            if info.encrypted_mutations_object_id != INVALID_OBJECT_ID {
1804                total_layer_size += layer_size_from_encrypted_mutations_size(
1805                    parent_store.get_file_size(info.encrypted_mutations_object_id).await?,
1806                );
1807            }
1808            last_object_id = LastObjectId::Pending;
1809            ensure!(
1810                matches!(
1811                    info.last_object_id,
1812                    LastObjectIdInfo::Encrypted { .. } | LastObjectIdInfo::Low32Bit { .. }
1813                ),
1814                FxfsError::Inconsistent
1815            );
1816        } else {
1817            last_object_id = match info.last_object_id {
1818                LastObjectIdInfo::Unencrypted { id } => LastObjectId::Unencrypted { id },
1819                LastObjectIdInfo::Low32Bit => {
1820                    LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() }
1821                }
1822                _ => bail!(FxfsError::Inconsistent),
1823            };
1824        }
1825
1826        let fs = parent_store.filesystem();
1827
1828        let store = ObjectStore::new(
1829            Some(parent_store.clone()),
1830            store_object_id,
1831            fs.clone(),
1832            if is_encrypted { None } else { Some(info) },
1833            object_cache,
1834            None,
1835            if is_encrypted { LockState::Locked } else { LockState::Unencrypted },
1836            last_object_id,
1837        );
1838
1839        assert!(store.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");
1840
1841        if !is_encrypted {
1842            let object_tree_layer_object_ids =
1843                store.store_info.lock().as_ref().unwrap().layers.clone();
1844            let object_layers = store.open_layers(object_tree_layer_object_ids, None).await?;
1845            total_layer_size = object_layers.iter().map(|h| h.get_size()).sum();
1846            store
1847                .tree
1848                .append_layers(object_layers)
1849                .await
1850                .context("Failed to read object store layers")?;
1851        }
1852
1853        fs.object_manager().update_reservation(
1854            store_object_id,
1855            tree::reservation_amount_from_layer_size(total_layer_size),
1856        );
1857
1858        Ok(store)
1859    }
1860
1861    async fn load_store_info(&self) -> Result<StoreInfo, Error> {
1862        load_store_info_from_handle(self.store_info_handle.get().unwrap()).await
1863    }
1864
1865    async fn open_layers(
1866        &self,
1867        object_ids: impl std::iter::IntoIterator<Item = u64>,
1868        crypt: Option<Arc<dyn Crypt>>,
1869    ) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
1870        let parent_store = self.parent_store.as_ref().unwrap();
1871        let mut handles = Vec::new();
1872        for object_id in object_ids {
1873            let handle = ObjectStore::open_object(
1874                &parent_store,
1875                object_id,
1876                HandleOptions::default(),
1877                crypt.clone(),
1878            )
1879            .await
1880            .with_context(|| format!("Failed to open layer file {}", object_id))?;
1881            handles.push(handle);
1882        }
1883        Ok(handles)
1884    }
1885
1886    /// Unlocks a store so that it is ready to be used.
1887    /// This is not thread-safe.
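    ///
    /// A minimal sketch (not compiled as a doc-test), assuming a locked `store`, a `crypt`
    /// service, and that no owner is required:
    ///
    /// ```ignore
    /// store.unlock(NO_OWNER, crypt.clone()).await?;
    /// // ... use the store ...
    /// store.lock().await?;
    /// ```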
1888    pub async fn unlock(
1889        self: &Arc<Self>,
1890        owner: Weak<dyn StoreOwner>,
1891        crypt: Arc<dyn Crypt>,
1892    ) -> Result<(), Error> {
1893        self.unlock_inner(owner, crypt, /*read_only=*/ false).await
1894    }
1895
1896    /// Unlocks a store so that it is ready to be read from.
1897    /// The store will generally behave like it is still locked: when flushed, the store will
1898    /// write out its mutations into the encrypted mutations file, rather than directly updating
1899    /// the layer files of the object store.
1900    /// Re-locking the store (which *must* be done with `Self::lock_read_only`) will not trigger
1901    /// a flush, although the store might still be flushed during other operations.
1902    /// This is not thread-safe.
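    ///
    /// A minimal sketch (not compiled as a doc-test) of a read-only inspection pass:
    ///
    /// ```ignore
    /// store.unlock_read_only(crypt.clone()).await?;
    /// let target = store.read_symlink(object_id).await?;
    /// store.lock_read_only();
    /// ```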
1903    pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
1904        self.unlock_inner(NO_OWNER, crypt, /*read_only=*/ true).await
1905    }
1906
1907    async fn unlock_inner(
1908        self: &Arc<Self>,
1909        owner: Weak<dyn StoreOwner>,
1910        crypt: Arc<dyn Crypt>,
1911        read_only: bool,
1912    ) -> Result<(), Error> {
1913        // Unless we are unlocking the store as read-only, the filesystem must not be read-only.
1914        assert!(read_only || !self.filesystem().options().read_only);
1915        match &*self.lock_state.lock() {
1916            LockState::Locked => {}
1917            LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
1918            LockState::Invalid | LockState::Deleted => bail!(FxfsError::Internal),
1919            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) => {
1920                bail!(FxfsError::AlreadyBound)
1921            }
1922            LockState::Unknown => panic!("Store was unlocked before replay"),
1923            LockState::Locking => panic!("Store is being locked"),
1924            LockState::Unlocking => panic!("Store is being unlocked"),
1925        }
1926        // We must lock flushing since that can modify store_info and the encrypted mutations file.
1927        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1928        let fs = self.filesystem();
1929        let guard = fs.lock_manager().write_lock(keys).await;
1930
1931        let store_info = self.load_store_info().await?;
1932
1933        self.tree
1934            .append_layers(
1935                self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
1936            )
1937            .await
1938            .context("Failed to read object tree layer file contents")?;
1939
1940        let wrapped_key =
1941            fxfs_crypto::WrappedKey::Fxfs(store_info.mutations_key.clone().unwrap().into());
1942        let unwrapped_key = crypt
1943            .unwrap_key(&wrapped_key, self.store_object_id)
1944            .await
1945            .context("Failed to unwrap mutations keys")?;
1946        // The ChaCha20 stream cipher we use supports up to 64 GiB.  By default we'll roll the key
1947        // after every 128 MiB.  Here we just need a sanity bound on the stored offset, comfortably
1948        // below any wrapping concerns, so we use u32::MAX (the offset is u64).
1949        ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
1950        let mut mutations_cipher =
1951            StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);
1952
1953        match &store_info.last_object_id {
1954            LastObjectIdInfo::Encrypted { id, key } => {
1955                let wrapped_key = fxfs_crypto::WrappedKey::Fxfs(key.clone().into());
1956                *self.last_object_id.lock() = LastObjectId::Encrypted {
1957                    id: *id,
1958                    cipher: Box::new(Ff1::new(
1959                        &crypt.unwrap_key(&wrapped_key, self.store_object_id).await?,
1960                    )),
1961                };
1962            }
1963            LastObjectIdInfo::Low32Bit => {
1964                *self.last_object_id.lock() = LastObjectId::Low32Bit {
1965                    reserved: Default::default(),
1966                    unreserved: Default::default(),
1967                }
1968            }
1969            _ => unreachable!(),
1970        }
1971
1972        // Apply the encrypted mutations.
1973        let mut mutations = {
1974            if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
1975                EncryptedMutations::default()
1976            } else {
1977                let parent_store = self.parent_store.as_ref().unwrap();
1978                let handle = ObjectStore::open_object(
1979                    &parent_store,
1980                    store_info.encrypted_mutations_object_id,
1981                    HandleOptions::default(),
1982                    None,
1983                )
1984                .await?;
1985                let mut cursor = std::io::Cursor::new(
1986                    handle
1987                        .contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
1988                        .await
1989                        .context(FxfsError::Inconsistent)?,
1990                );
1991                let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
1992                    .context("Failed to deserialize EncryptedMutations")?
1993                    .0;
1994                let len = cursor.get_ref().len() as u64;
1995                while cursor.position() < len {
1996                    mutations.extend(
1997                        &EncryptedMutations::deserialize_with_version(&mut cursor)
1998                            .context("Failed to deserialize EncryptedMutations")?
1999                            .0,
2000                    );
2001                }
2002                mutations
2003            }
2004        };
2005
2006        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
2007        let journaled = EncryptedMutations::from_replayed_mutations(
2008            self.store_object_id,
2009            fs.journal()
2010                .read_transactions_for_object(self.store_object_id)
2011                .await
2012                .context("Failed to read encrypted mutations from journal")?,
2013        );
2014        mutations.extend(&journaled);
2015
2016        let _ = std::mem::replace(&mut *self.lock_state.lock(), LockState::Unlocking);
2017        *self.store_info.lock() = Some(store_info);
2018
2019        // If we fail, clean up.
2020        let clean_up = scopeguard::guard((), |_| {
2021            *self.lock_state.lock() = LockState::Locked;
2022            *self.store_info.lock() = None;
2023            // Make sure we don't leave unencrypted data lying around in memory.
2024            self.tree.reset();
2025        });
2026
2027        let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;
2028
2029        let mut slice = &mut data[..];
2030        let mut last_offset = 0;
2031        for (offset, key) in mutations_key_roll {
2032            let split_offset = offset
2033                .checked_sub(last_offset)
2034                .ok_or(FxfsError::Inconsistent)
2035                .context("Invalid mutation key roll offset")?;
2036            last_offset = offset;
2037            ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
2038            let (old, new) = slice.split_at_mut(split_offset);
2039            mutations_cipher.decrypt(old);
2040            let unwrapped_key = crypt
2041                .unwrap_key(&fxfs_crypto::WrappedKey::Fxfs(key.into()), self.store_object_id)
2042                .await
2043                .context("Failed to unwrap mutations keys")?;
2044            mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
2045            slice = new;
2046        }
2047        mutations_cipher.decrypt(slice);
2048
2049        // Always roll the mutations key when we unlock, which guarantees we won't reuse a
2050        // previous key and nonce.
2051        self.roll_mutations_key(crypt.as_ref()).await?;
2052
2053        let mut cursor = std::io::Cursor::new(data);
2054        for (checkpoint, count) in transactions {
2055            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
2056            for _ in 0..count {
2057                let mutation =
2058                    Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
2059                        .context("failed to deserialize encrypted mutation")?;
2060                self.apply_mutation(mutation, &context, AssocObj::None)
2061                    .context("failed to apply encrypted mutation")?;
2062            }
2063        }
2064
2065        *self.lock_state.lock() = if read_only {
2066            LockState::UnlockedReadOnly(crypt)
2067        } else {
2068            LockState::Unlocked { owner, crypt }
2069        };
2070
2071        // To avoid unbounded memory growth, we should flush the encrypted mutations now.
2072        // Otherwise, more writes could be queued and the store locked again before we get a
2073        // chance to flush anything, and that can repeat.
2074        std::mem::drop(guard);
2075
2076        if !read_only && !self.filesystem().options().read_only {
2077            self.flush_with_reason(flush::Reason::Unlock).await?;
2078
2079            // Reap purged files within this store.
2080            let _ = self.filesystem().graveyard().initial_reap(&self).await?;
2081        }
2082
2083        // Return and cancel the clean up.
2084        Ok(ScopeGuard::into_inner(clean_up))
2085    }
2086
2087    pub fn is_locked(&self) -> bool {
2088        matches!(
2089            *self.lock_state.lock(),
2090            LockState::Locked | LockState::Locking | LockState::Unknown
2091        )
2092    }
2093
2094    /// NB: This is not the converse of `is_locked`, as there are lock states where neither is
2095    /// true.
2096    pub fn is_unlocked(&self) -> bool {
2097        matches!(
2098            *self.lock_state.lock(),
2099            LockState::Unlocked { .. } | LockState::UnlockedReadOnly { .. } | LockState::Unlocking
2100        )
2101    }
2102
2103    pub fn is_unknown(&self) -> bool {
2104        matches!(*self.lock_state.lock(), LockState::Unknown)
2105    }
2106
2107    pub fn is_encrypted(&self) -> bool {
2108        self.store_info.lock().as_ref().unwrap().mutations_key.is_some()
2109    }
2110
2111    // Locks a store.
2112    // This operation will take a flush lock on the store, in case any flushes are ongoing.  Any
2113    // ongoing store accesses might be interrupted by this.  See `Self::crypt`.
2114    // Whilst this can return an error, the store will be placed into an unusable but safe state
2115    // (i.e. no lingering unencrypted data) if an error is encountered.
2116    pub async fn lock(&self) -> Result<(), Error> {
2117        // We must lock flushing since it is not safe for that to be happening whilst we are locking
2118        // the store.
2119        let keys = lock_keys![LockKey::flush(self.store_object_id())];
2120        let fs = self.filesystem();
2121        let _guard = fs.lock_manager().write_lock(keys).await;
2122
2123        {
2124            let mut lock_state = self.lock_state.lock();
2125            if let LockState::Unlocked { .. } = &*lock_state {
2126                *lock_state = LockState::Locking;
2127            } else {
2128                panic!("Unexpected lock state: {:?}", &*lock_state);
2129            }
2130        }
2131
2132        // Sync the journal now to ensure that any buffered mutations for this store make it out to
2133        // disk.  This is necessary to be able to unlock the store again.
2134        // We need to establish a barrier at this point (so that the journaled writes are observable
2135        // by any future attempts to unlock the store), hence the flush_device.
2136        let sync_result =
2137            self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;
2138
2139        *self.lock_state.lock() = if let Err(error) = &sync_result {
2140            error!(error:?; "Failed to sync journal; store will no longer be usable");
2141            LockState::Invalid
2142        } else {
2143            LockState::Locked
2144        };
2145        self.key_manager.clear();
2146        *self.store_info.lock() = None;
2147        self.tree.reset();
2148
2149        sync_result
2150    }
2151
2152    // Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`).  Data
2153    // is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
2154    // and will be replayed next time the store is unlocked.
2155    pub fn lock_read_only(&self) {
2156        *self.lock_state.lock() = LockState::Locked;
2157        *self.store_info.lock() = None;
2158        self.tree.reset();
2159    }
2160
2161    // Returns None if the object ID cipher needs to be created or rolled, or a more expensive
2162    // algorithm needs to be used.
2163    fn maybe_get_next_object_id(&self) -> Option<ReservedId<'_>> {
2164        self.last_object_id.lock().try_get_next().map(|id| ReservedId::new(self, id))
2165    }
2166
2167    /// Returns a new object ID that can be used.  This will create an object ID cipher if needed.
2168    ///
2169    /// If the object ID key needs to be rolled, a new transaction will be created and committed.
2170    /// This transaction does not take the filesystem lock, hence `txn_guard`.
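    ///
    /// A minimal sketch (not compiled as a doc-test), reserving an ID inside an existing
    /// transaction:
    ///
    /// ```ignore
    /// let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
    /// ```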
2171    pub(super) async fn get_next_object_id(
2172        &self,
2173        txn_guard: &TxnGuard<'_>,
2174    ) -> Result<ReservedId<'_>, Error> {
2175        {
2176            let mut last_object_id = self.last_object_id.lock();
2177            if let Some(id) = last_object_id.try_get_next() {
2178                return Ok(ReservedId::new(self, id));
2179            }
2180            ensure!(
2181                !matches!(&*last_object_id, LastObjectId::Unencrypted { .. }),
2182                FxfsError::Inconsistent
2183            );
2184        }
2185
2186        let parent_store = self.parent_store().unwrap();
2187
2188        // Create a transaction (which has a lock) and then check again.
2189        //
2190        // NOTE: Since this can be a nested transaction, we must take care to avoid deadlocks; no
2191        // more locks should be taken whilst we hold this lock.
2192        let mut transaction = self
2193            .filesystem()
2194            .new_transaction(
2195                lock_keys![LockKey::object(parent_store.store_object_id, self.store_object_id)],
2196                Options {
2197                    // We must skip journal checks because this transaction might be needed to
2198                    // compact.
2199                    skip_journal_checks: true,
2200                    borrow_metadata_space: true,
2201                    txn_guard: Some(txn_guard),
2202                    ..Default::default()
2203                },
2204            )
2205            .await?;
2206
2207        let mut next_id_hi = 0;
2208
2209        let is_low_32_bit = {
2210            let mut last_object_id = self.last_object_id.lock();
2211            if let Some(id) = last_object_id.try_get_next() {
2212                // Something else raced and created/rolled the cipher.
2213                return Ok(ReservedId::new(self, id));
2214            }
2215
2216            match &*last_object_id {
2217                LastObjectId::Encrypted { id, .. } => {
2218                    // It shouldn't be possible for last_object_id to wrap within our lifetime, so
2219                    // if this happens, it's most likely due to corruption.
2220                    next_id_hi =
2221                        id.checked_add(1 << 32).ok_or(FxfsError::Inconsistent)? & OBJECT_ID_HI_MASK;
2222
2223                    info!(store_id = self.store_object_id; "Rolling object ID key");
2224
2225                    false
2226                }
2227                LastObjectId::Low32Bit { .. } => true,
2228                _ => unreachable!(),
2229            }
2230        };
2231
2232        if is_low_32_bit {
2233            // Keep picking an object ID at random until we find one free.
2234
2235            // To avoid races, this must be before we capture the layer set.
2236            self.last_object_id.lock().drain_unreserved();
2237
2238            let layer_set = self.tree.layer_set();
2239            let mut key = ObjectKey::object(0);
2240            loop {
2241                let next_id = rand::rng().next_u32() as u64;
2242                let Some(next_id) = NonZero::new(next_id) else { continue };
2243                if self.last_object_id.lock().is_reserved(next_id.get()) {
2244                    continue;
2245                }
2246                key.object_id = next_id.get();
2247                if layer_set.key_exists(&key).await? == Existence::Missing {
2248                    self.last_object_id.lock().reserve(next_id.get());
2249                    return Ok(ReservedId::new(self, next_id));
2250                }
2251            }
2252        } else {
2253            // Create a key.
2254            let (object_id_wrapped, object_id_unwrapped) = self
2255                .crypt()
2256                .unwrap()
2257                .create_key(self.store_object_id, KeyPurpose::Metadata)
2258                .await?;
2259
2260            // Normally we would use a mutation to note the updated key, but that would complicate
2261            // replay.  During replay, we need to keep track of the highest used object ID and this
2262            // is done by watching mutations to see when we create objects, and then decrypting
2263            // the object ID.  This relies on the unwrapped key being available, so as soon as
2264            // we detect the key has changed, we would need to immediately unwrap the key via the
2265            // crypt service.  Currently, this isn't easy to do during replay.  An option we could
2266            // consider would be to include the unencrypted object ID when we create objects, which
2267            // would avoid us having to decrypt the object ID during replay.
2268            //
2269            // For now and for historical reasons, the approach we take is to just write a new
2270            // version of StoreInfo here.  We must take care that we only update the key and not any
2271            // other information contained within StoreInfo because other information should only be
2272            // updated when we flush.  We are holding the lock on the StoreInfo file, so this will
2273            // prevent potential races with flushing.  To make sure we only change the key, we read
2274            // StoreInfo from storage rather than using our in-memory copy.  This won't be
2275            // performant, but rolling the object ID key will be extremely rare.
2276            let new_store_info = StoreInfo {
2277                last_object_id: LastObjectIdInfo::Encrypted {
2278                    id: next_id_hi,
2279                    key: object_id_wrapped.clone(),
2280                },
2281                ..self.load_store_info().await?
2282            };
2283
2284            self.write_store_info(&mut transaction, &new_store_info).await?;
2285
2286            transaction
2287                .commit_with_callback(|_| {
2288                    self.store_info.lock().as_mut().unwrap().last_object_id =
2289                        new_store_info.last_object_id;
2290                    match &mut *self.last_object_id.lock() {
2291                        LastObjectId::Encrypted { id, cipher } => {
2292                            **cipher = Ff1::new(&object_id_unwrapped);
2293                            *id = next_id_hi;
2294                            ReservedId::new(
2295                                self,
2296                                NonZero::new(next_id_hi | cipher.encrypt(0) as u64).unwrap(),
2297                            )
2298                        }
2299                        _ => unreachable!(),
2300                    }
2301                })
2302                .await
2303        }
2304    }
2305
2306    /// Query the next object ID that will be used. Intended for use when checking filesystem
2307    /// consistency. Prefer [`Self::get_next_object_id()`] for general use.
2308    pub(crate) fn query_next_object_id(&self) -> u64 {
2309        self.last_object_id.lock().peek_next()
2310    }
2311
2312    fn allocator(&self) -> Arc<Allocator> {
2313        self.filesystem().allocator()
2314    }
2315
2316    // If |transaction| has a pending mutation for the underlying object, returns that.
2317    // Otherwise, looks up the object from the tree and returns a suitable mutation for it.  The
2318    // mutation is returned here rather than the item because the mutation includes the operation
2319    // which has significance: inserting an object implies it's the first of its kind unlike
2320    // replacing an object.
2321    async fn txn_get_object_mutation(
2322        &self,
2323        transaction: &Transaction<'_>,
2324        object_id: u64,
2325    ) -> Result<ObjectStoreMutation, Error> {
2326        if let Some(mutation) =
2327            transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
2328        {
2329            Ok(mutation.clone())
2330        } else {
2331            Ok(ObjectStoreMutation {
2332                item: self
2333                    .tree
2334                    .find(&ObjectKey::object(object_id))
2335                    .await?
2336                    .ok_or(FxfsError::Inconsistent)
2337                    .context("Object id missing")?,
2338                op: Operation::ReplaceOrInsert,
2339            })
2340        }
2341    }
2342
2343    /// Like txn_get_object_mutation but with expanded visibility.
2344    /// Only available in migration code.
2345    #[cfg(feature = "migration")]
2346    pub async fn get_object_mutation(
2347        &self,
2348        transaction: &Transaction<'_>,
2349        object_id: u64,
2350    ) -> Result<ObjectStoreMutation, Error> {
2351        self.txn_get_object_mutation(transaction, object_id).await
2352    }
2353
2354    fn update_last_object_id(&self, object_id: u64) {
2355        let mut last_object_id = self.last_object_id.lock();
2356        match &mut *last_object_id {
2357            LastObjectId::Pending => unreachable!(),
2358            LastObjectId::Unencrypted { id } => {
2359                if object_id > *id {
2360                    *id = object_id
2361                }
2362            }
2363            LastObjectId::Encrypted { id, cipher } => {
2364                // For encrypted stores, object_id will be encrypted here, so we must decrypt first.
2365
2366                // If the object ID cipher has been rolled, then it's possible we might see object
2367                // IDs that were generated using a different cipher so the decrypt here will return
2368                // the wrong value, but that won't matter because the hi part of the object ID
2369                // should still discriminate.
2370                let object_id =
2371                    object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
2372                if object_id > *id {
2373                    *id = object_id;
2374                }
2375            }
2376            LastObjectId::Low32Bit { .. } => {}
2377        }
2378    }
2379
2380    /// If possible, converts the given object ID to its unencrypted value.  Returns None if it is
2381    /// not possible to convert to its unencrypted value because the key is unavailable.
2382    pub fn to_unencrypted_object_id(&self, object_id: u64) -> Option<u64> {
2383        let last_object_id = self.last_object_id.lock();
2384        match &*last_object_id {
2385            LastObjectId::Pending => None,
2386            LastObjectId::Unencrypted { .. } | LastObjectId::Low32Bit { .. } => Some(object_id),
2387            LastObjectId::Encrypted { id, cipher } => {
2388                if id & OBJECT_ID_HI_MASK != object_id & OBJECT_ID_HI_MASK {
2389                    None
2390                } else {
2391                    Some(object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64)
2392                }
2393            }
2394        }
2395    }
2396
2397    /// Adds the specified object to the graveyard.
2398    pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2399        let graveyard_id = self.graveyard_directory_object_id();
2400        assert_ne!(graveyard_id, INVALID_OBJECT_ID);
2401        transaction.add(
2402            self.store_object_id,
2403            Mutation::replace_or_insert_object(
2404                ObjectKey::graveyard_entry(graveyard_id, object_id),
2405                ObjectValue::Some,
2406            ),
2407        );
2408    }
2409
2410    /// Removes the specified object from the graveyard.  NB: Care should be taken when calling
2411    /// this because graveyard entries are used for purging deleted files *and* for trimming
2412    /// extents.  For example, consider the following sequence:
2413    ///
2414    ///     1. Add Trim graveyard entry.
2415    ///     2. Replace with Some graveyard entry (see above).
2416    ///     3. Remove graveyard entry.
2417    ///
2418    /// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
2419    /// actually be:
2420    ///
2421    ///     3. Replace with Trim graveyard entry.
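    ///
    /// A minimal sketch (not compiled as a doc-test) of that replacement, i.e. step #3 above:
    ///
    /// ```ignore
    /// transaction.add(
    ///     store.store_object_id(),
    ///     Mutation::replace_or_insert_object(
    ///         ObjectKey::graveyard_entry(store.graveyard_directory_object_id(), object_id),
    ///         ObjectValue::Trim,
    ///     ),
    /// );
    /// ```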
2422    pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2423        transaction.add(
2424            self.store_object_id,
2425            Mutation::replace_or_insert_object(
2426                ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
2427                ObjectValue::None,
2428            ),
2429        );
2430    }
2431
2432    /// Removes the specified attribute from the graveyard.  Unlike object graveyard entries,
2433    /// attribute graveyard entries serve a single purpose (purging deleted attributes), so the
2434    /// caller does not need to be concerned about restoring the entry's prior state when
2435    /// cancelling it.  See the comment on `remove_from_graveyard()`.
2436    pub fn remove_attribute_from_graveyard(
2437        &self,
2438        transaction: &mut Transaction<'_>,
2439        object_id: u64,
2440        attribute_id: u64,
2441    ) {
2442        transaction.add(
2443            self.store_object_id,
2444            Mutation::replace_or_insert_object(
2445                ObjectKey::graveyard_attribute_entry(
2446                    self.graveyard_directory_object_id(),
2447                    object_id,
2448                    attribute_id,
2449                ),
2450                ObjectValue::None,
2451            ),
2452        );
2453    }
2454
2455    // Rolls the mutations key.  The new key will be written for the next encrypted mutation.
2456    async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
2457        let (wrapped_key, unwrapped_key) =
2458            crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
2459
2460        // The mutations_cipher lock must be held for the duration so that mutations_cipher and
2461        // store_info are updated atomically.  Otherwise, write_mutation could find a new cipher but
2462        // end up writing the wrong wrapped key.
2463        let mut cipher = self.mutations_cipher.lock();
2464        *cipher = Some(StreamCipher::new(&unwrapped_key, 0));
2465        self.store_info.lock().as_mut().unwrap().mutations_key = Some(wrapped_key);
2466        // mutations_cipher_offset is updated by flush.
2467        Ok(())
2468    }
2469
2470    // When the symlink is unlocked, this function decrypts `link` and returns bytes identical to
2471    // those passed in as the target on `create_symlink`.
2472    // If the symlink is locked, this function hashes the encrypted `link` with Sha256 to obtain a
2473    // standard length, then base64 encodes the hash and returns that to the caller.
2474    pub async fn read_encrypted_symlink(
2475        &self,
2476        object_id: u64,
2477        link: Vec<u8>,
2478    ) -> Result<Vec<u8>, Error> {
2479        let mut link = link;
2480        let key = self
2481            .key_manager()
2482            .get_fscrypt_key(object_id, self.crypt().unwrap().as_ref(), async || {
2483                self.get_keys(object_id).await
2484            })
2485            .await?;
2486        if let Some(key) = key.into_cipher() {
2487            key.decrypt_symlink(object_id, &mut link)?;
2488            Ok(link)
2489        } else {
2490            // Locked symlinks are encoded using a hash_code of 0.
2491            let proxy_filename =
2492                fscrypt::proxy_filename::ProxyFilename::new_with_hash_code(0, &link);
2493            let proxy_filename_str: String = proxy_filename.into();
2494            Ok(proxy_filename_str.as_bytes().to_vec())
2495        }
2496    }
2497
2498    /// Returns the link of a symlink object.
2499    pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
2500        match self.tree.find(&ObjectKey::object(object_id)).await? {
2501            None => bail!(FxfsError::NotFound),
2502            Some(Item {
2503                value: ObjectValue::Object { kind: ObjectKind::EncryptedSymlink { link, .. }, .. },
2504                ..
2505            }) => self.read_encrypted_symlink(object_id, link.to_vec()).await,
2506            Some(Item {
2507                value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
2508                ..
2509            }) => Ok(link.to_vec()),
2510            Some(item) => Err(anyhow!(FxfsError::Inconsistent)
2511                .context(format!("Unexpected item in lookup: {item:?}"))),
2512        }
2513    }
2514
2515    /// Retrieves the wrapped keys for the given object.  The keys *should* be known to exist and it
2516    /// will be considered an inconsistency if they don't.
2517    pub async fn get_keys(&self, object_id: u64) -> Result<EncryptionKeys, Error> {
2518        match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::Inconsistent)? {
2519            Item { value: ObjectValue::Keys(keys), .. } => Ok(keys),
2520            _ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
2521        }
2522    }
2523
2524    pub async fn update_attributes<'a>(
2525        &self,
2526        transaction: &mut Transaction<'a>,
2527        object_id: u64,
2528        node_attributes: Option<&fio::MutableNodeAttributes>,
2529        change_time: Option<Timestamp>,
2530    ) -> Result<(), Error> {
2531        if change_time.is_none() {
2532            if let Some(attributes) = node_attributes {
2533                let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
2534                if *attributes == empty_attributes {
2535                    return Ok(());
2536                }
2537            } else {
2538                return Ok(());
2539            }
2540        }
2541        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
2542        if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
2543            if let Some(time) = change_time {
2544                attributes.change_time = time;
2545            }
2546            if let Some(node_attributes) = node_attributes {
2547                if let Some(time) = node_attributes.creation_time {
2548                    attributes.creation_time = Timestamp::from_nanos(time);
2549                }
2550                if let Some(time) = node_attributes.modification_time {
2551                    attributes.modification_time = Timestamp::from_nanos(time);
2552                }
2553                if let Some(time) = node_attributes.access_time {
2554                    attributes.access_time = Timestamp::from_nanos(time);
2555                }
2556                if node_attributes.mode.is_some()
2557                    || node_attributes.uid.is_some()
2558                    || node_attributes.gid.is_some()
2559                    || node_attributes.rdev.is_some()
2560                {
2561                    if let Some(a) = &mut attributes.posix_attributes {
2562                        if let Some(mode) = node_attributes.mode {
2563                            a.mode = mode;
2564                        }
2565                        if let Some(uid) = node_attributes.uid {
2566                            a.uid = uid;
2567                        }
2568                        if let Some(gid) = node_attributes.gid {
2569                            a.gid = gid;
2570                        }
2571                        if let Some(rdev) = node_attributes.rdev {
2572                            a.rdev = rdev;
2573                        }
2574                    } else {
2575                        attributes.posix_attributes = Some(PosixAttributes {
2576                            mode: node_attributes.mode.unwrap_or_default(),
2577                            uid: node_attributes.uid.unwrap_or_default(),
2578                            gid: node_attributes.gid.unwrap_or_default(),
2579                            rdev: node_attributes.rdev.unwrap_or_default(),
2580                        });
2581                    }
2582                }
2583            }
2584        } else {
2585            bail!(
2586                anyhow!(FxfsError::Inconsistent)
2587                    .context("ObjectStore.update_attributes: Expected object value")
2588            );
2589        };
2590        transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
2591        Ok(())
2592    }
2593
2594    // Updates and commits the changes to access time in ObjectProperties. The update matches
2595    // Linux's RELATIME. That is, access time is updated to the current time if access time is less
2596    // than or equal to the last modification or status change, or if it has been more than a day
2597    // since the last access.
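        // For example, a file that is only ever read has its access time refreshed at most once
        // per day, whereas a modification causes the next read to update the access time again.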
2598    pub async fn update_access_time(
2599        &self,
2600        object_id: u64,
2601        props: &mut ObjectProperties,
2602    ) -> Result<(), Error> {
2603        let access_time = props.access_time.as_nanos();
2604        let modification_time = props.modification_time.as_nanos();
2605        let change_time = props.change_time.as_nanos();
2606        let now = Timestamp::now();
2607        if access_time <= modification_time
2608            || access_time <= change_time
2609            || access_time
2610                < now.as_nanos()
2611                    - Timestamp::from(std::time::Duration::from_secs(24 * 60 * 60)).as_nanos()
2612        {
2613            let mut transaction = self
2614                .filesystem()
2615                .clone()
2616                .new_transaction(
2617                    lock_keys![LockKey::object(self.store_object_id, object_id,)],
2618                    Options { borrow_metadata_space: true, ..Default::default() },
2619                )
2620                .await?;
2621            self.update_attributes(
2622                &mut transaction,
2623                object_id,
2624                Some(&fio::MutableNodeAttributes {
2625                    access_time: Some(now.as_nanos()),
2626                    ..Default::default()
2627                }),
2628                None,
2629            )
2630            .await?;
2631            transaction.commit().await?;
2632            props.access_time = now;
2633        }
2634        Ok(())
2635    }
2636
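        // Serializes `info` and writes it at offset 0 of the store-info handle as part of
        // `transaction`.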
2637    async fn write_store_info<'a>(
2638        &'a self,
2639        transaction: &mut Transaction<'a>,
2640        info: &StoreInfo,
2641    ) -> Result<(), Error> {
2642        let mut serialized_info = Vec::new();
2643        info.serialize_with_version(&mut serialized_info)?;
2644        let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
2645        buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
2646        self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
2647    }
2648
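        /// Marks this store as deleted.  This only changes the in-memory `LockState`; it does not
        /// remove anything on disk.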
2649    pub fn mark_deleted(&self) {
2650        *self.lock_state.lock() = LockState::Deleted;
2651    }
2652
2653    #[cfg(test)]
2654    pub(crate) fn test_set_last_object_id(&self, object_id: u64) {
2655        match &mut *self.last_object_id.lock() {
2656            LastObjectId::Encrypted { id, .. } => *id = object_id,
2657            _ => unreachable!(),
2658        }
2659    }
2660}
2661
2662#[async_trait]
2663impl JournalingObject for ObjectStore {
2664    fn apply_mutation(
2665        &self,
2666        mutation: Mutation,
2667        context: &ApplyContext<'_, '_>,
2668        _assoc_obj: AssocObj<'_>,
2669    ) -> Result<(), Error> {
2670        match &*self.lock_state.lock() {
2671            LockState::Locked | LockState::Locking => {
2672                ensure!(
2673                    matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
2674                        || matches!(
2675                            mutation,
2676                            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_)
2677                                if context.mode.is_replay()
2678                        ),
2679                    anyhow!(FxfsError::Inconsistent)
2680                        .context(format!("Unexpected mutation for encrypted store: {mutation:?}"))
2681                );
2682            }
2683            LockState::Invalid
2684            | LockState::Unlocking
2685            | LockState::Unencrypted
2686            | LockState::Unlocked { .. }
2687            | LockState::UnlockedReadOnly(..)
2688            | LockState::Deleted => {}
2689            lock_state @ _ => panic!("Unexpected lock state: {lock_state:?}"),
2690        }
2691        match mutation {
2692            Mutation::ObjectStore(ObjectStoreMutation { mut item, op }) => {
2693                item.sequence = context.checkpoint.file_offset;
2694                match op {
2695                    Operation::Insert => {
2696                        let mut unreserve_id = INVALID_OBJECT_ID;
2697                        // If we are inserting an object record for the first time, it signifies the
2698                        // birth of the object, so we need to adjust the object count.
2699                        if matches!(item.value, ObjectValue::Object { .. }) {
2700                            {
2701                                let info = &mut self.store_info.lock();
2702                                let object_count = &mut info.as_mut().unwrap().object_count;
2703                                *object_count = object_count.saturating_add(1);
2704                            }
2705                            if context.mode.is_replay() {
2706                                self.update_last_object_id(item.key.object_id);
2707                            } else {
2708                                unreserve_id = item.key.object_id;
2709                            }
2710                        }
2711                        self.tree.insert(item)?;
2712                        if unreserve_id != INVALID_OBJECT_ID {
2713                            // To avoid races, this *must* be after the `tree.insert(..)` above.
2714                            self.last_object_id.lock().unreserve(unreserve_id);
2715                        }
2716                    }
2717                    Operation::ReplaceOrInsert => {
2718                        self.tree.replace_or_insert(item);
2719                    }
2720                    Operation::Merge => {
2721                        if item.is_tombstone() {
2722                            let info = &mut self.store_info.lock();
2723                            let object_count = &mut info.as_mut().unwrap().object_count;
2724                            *object_count = object_count.saturating_sub(1);
2725                        }
2726                        let lower_bound = item.key.key_for_merge_into();
2727                        self.tree.merge_into(item, &lower_bound);
2728                    }
2729                }
2730            }
2731            Mutation::BeginFlush => {
2732                ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
2733                self.tree.seal();
2734            }
2735            Mutation::EndFlush => ensure!(self.parent_store.is_some(), FxfsError::Inconsistent),
2736            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2737                // We will process these during Self::unlock.
2738                ensure!(
2739                    !matches!(&*self.lock_state.lock(), LockState::Unencrypted),
2740                    FxfsError::Inconsistent
2741                );
2742            }
2743            Mutation::CreateInternalDir(object_id) => {
2744                ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
2745                self.store_info.lock().as_mut().unwrap().internal_directory_object_id = object_id;
2746            }
2747            _ => bail!("unexpected mutation: {:?}", mutation),
2748        }
2749        self.counters.lock().mutations_applied += 1;
2750        Ok(())
2751    }
2752
2753    fn drop_mutation(&self, mutation: Mutation, _transaction: &Transaction<'_>) {
2754        self.counters.lock().mutations_dropped += 1;
2755        if let Mutation::ObjectStore(ObjectStoreMutation {
2756            item: Item { key: ObjectKey { object_id, .. }, value: ObjectValue::Object { .. }, .. },
2757            op: Operation::Insert,
2758        }) = mutation
2759        {
2760            self.last_object_id.lock().unreserve(object_id);
2761        }
2762    }
2763
2764    /// Push all in-memory structures to the device. This is not necessary for sync since the
2765    /// journal will take care of it.  This is supposed to be called when there is either memory or
2766    /// space pressure (flushing the store will persist in-memory data and allow the journal file to
2767    /// be trimmed).
2768    ///
2769    /// Also returns the earliest version of a struct in the filesystem (when known).
2770    async fn flush(&self) -> Result<Version, Error> {
2771        self.flush_with_reason(flush::Reason::Journal).await
2772    }
2773
2774    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
2775        // Intentionally enumerating all variants to force a decision on any new variants. Encrypt
2776        // all mutations that could affect an encrypted object store's contents or its `StoreInfo`.
2777        // During `unlock()`, any mutations which haven't been encrypted won't be replayed after
2778        // reading `StoreInfo`.
2779        match mutation {
2780            // Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted, we
2781            // still choose to encrypt the mutation because it makes it easier to deal with replay.
2782            // When we replay mutations for an encrypted store, the only thing we keep in memory are
2783            // the encrypted mutations; we don't keep `StoreInfo` or changes to it in memory. So, by
2784            // encrypting the CreateInternalDir mutation here, it means we don't have to track both
2785            // encrypted mutations bound for the LSM tree and unencrypted mutations for `StoreInfo`
2786            // to use in `unlock()`. CreateInternalDir mutations are simply bundled with the other
2787            // encrypted mutations and handled in sequence during `unlock()`.
2788            Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
2789                let mut cipher = self.mutations_cipher.lock();
2790                if let Some(cipher) = cipher.as_mut() {
2791                    // If this is the first time we've used this key, we must write the key out.
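                        // Replay and `unlock()` need to see `UpdateMutationsKey` before any
                        // mutations encrypted with that key.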
2792                    if cipher.offset() == 0 {
2793                        writer.write(Mutation::update_mutations_key(
2794                            self.store_info
2795                                .lock()
2796                                .as_ref()
2797                                .unwrap()
2798                                .mutations_key
2799                                .as_ref()
2800                                .unwrap()
2801                                .clone(),
2802                        ));
2803                    }
2804                    let mut buffer = Vec::new();
2805                    mutation.serialize_into(&mut buffer).unwrap();
2806                    cipher.encrypt(&mut buffer);
2807                    writer.write(Mutation::EncryptedObjectStore(buffer.into()));
2808                    return;
2809                }
2810            }
2811            // `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
2812            // encrypted object stores, but are either the encrypted mutation data itself or
2813            // metadata governing how the data will be encrypted. They should only be produced here.
2814            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2815                debug_assert!(false, "Only this method should generate encrypted mutations");
2816            }
2817            // `BeginFlush` and `EndFlush` are not needed during `unlock()` but are needed during
2818            // the initial journal replay, so they should not be encrypted. `Allocator`, `DeleteVolume`
2819            // and `UpdateBorrowed` mutations are never associated with an encrypted store, as we do
2820            // not encrypt the allocator or the root/root-parent stores, so we can avoid the locking.
2821            Mutation::Allocator(_)
2822            | Mutation::BeginFlush
2823            | Mutation::EndFlush
2824            | Mutation::DeleteVolume
2825            | Mutation::UpdateBorrowed(_) => {}
2826        }
2827        writer.write(mutation.clone());
2828    }
2829}
2830
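    // Dropping the store releases any object IDs that were reserved but never committed; debug
    // builds also verify that no reservations leaked in the `Low32Bit` case.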
2831impl Drop for ObjectStore {
2832    fn drop(&mut self) {
2833        let mut last_object_id = self.last_object_id.lock();
2834        last_object_id.drain_unreserved();
2835        match &*last_object_id {
2836            LastObjectId::Low32Bit { reserved, .. } => debug_assert!(reserved.is_empty()),
2837            _ => {}
2838        }
2839    }
2840}
2841
2842impl HandleOwner for ObjectStore {}
2843
2844impl AsRef<ObjectStore> for ObjectStore {
2845    fn as_ref(&self) -> &ObjectStore {
2846        self
2847    }
2848}
2849
2850fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
2851    // This is similar to reserved_space_from_journal_usage. It needs to be a worst-case estimate of
2852    // the amount of metadata space that might need to be reserved to allow the encrypted mutations
2853    // to be written to layer files.  It needs to be >= what reservation_amount_from_layer_size will
2854    // return once the data has been written to layer files, and <= what
2855    // reserved_space_from_journal_usage would use.  We can't just use
2856    // reserved_space_from_journal_usage because the encrypted mutations file includes some extra
2857    // data (it includes the checkpoints) that isn't written in the same way to the journal.
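    // For example, 1 MiB of encrypted mutations reserves 3 MiB of metadata space.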
2858    size * 3
2859}
2860
2861impl AssociatedObject for ObjectStore {}
2862
2863/// Argument to the trim_some method.
2864#[derive(Debug)]
2865pub enum TrimMode {
2866    /// Trim extents beyond the current size.
2867    UseSize,
2868
2869    /// Trim extents beyond the supplied offset.
2870    FromOffset(u64),
2871
2872    /// Remove the object (or attribute) from the store once it is fully trimmed.
2873    Tombstone(TombstoneMode),
2874}
2875
2876/// Sets the mode for tombstoning (either at the object or attribute level).
2877#[derive(Debug)]
2878pub enum TombstoneMode {
2879    Object,
2880    Attribute,
2881}
2882
2883/// Result of the trim_some method.
2884#[derive(Debug)]
2885pub enum TrimResult {
2886    /// We reached the limit of the transaction and more extents might follow.
2887    Incomplete,
2888
2889    /// We finished this attribute.  Returns the ID of the next attribute for the same object if
2890    /// there is one.
2891    Done(Option<u64>),
2892}
2893
2894/// Loads store info.
2895pub async fn load_store_info(
2896    parent: &Arc<ObjectStore>,
2897    store_object_id: u64,
2898) -> Result<StoreInfo, Error> {
2899    load_store_info_from_handle(
2900        &ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?,
2901    )
2902    .await
2903}
2904
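    /// Loads store info from an existing handle.  An empty handle yields the default `StoreInfo`
    /// (the store has never been flushed).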
2905async fn load_store_info_from_handle(
2906    handle: &DataObjectHandle<impl HandleOwner>,
2907) -> Result<StoreInfo, Error> {
2908    Ok(if handle.get_size() > 0 {
2909        let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
2910        let mut cursor = std::io::Cursor::new(serialized_info);
2911        let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
2912            .context("Failed to deserialize StoreInfo")?;
2913        store_info
2914    } else {
2915        // The store_info will be absent for a newly created and empty object store.
2916        StoreInfo::default()
2917    })
2918}
2919
2920#[cfg(test)]
2921mod tests {
2922    use super::{
2923        DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, FsverityMetadata, HandleOptions,
2924        LastObjectId, LastObjectIdInfo, LockKey, MAX_STORE_INFO_SERIALIZED_SIZE, Mutation,
2925        NO_OWNER, NewChildStoreOptions, OBJECT_ID_HI_MASK, ObjectStore, RootDigest, StoreInfo,
2926        StoreOptions, StoreOwner,
2927    };
2928    use crate::errors::FxfsError;
2929    use crate::filesystem::{FxFilesystem, JournalingObject, OpenFxFilesystem, SyncOptions};
2930    use crate::fsck::{fsck, fsck_volume};
2931    use crate::lsm_tree::Query;
2932    use crate::lsm_tree::types::{ItemRef, LayerIterator};
2933    use crate::object_handle::{
2934        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
2935    };
2936    use crate::object_store::directory::Directory;
2937    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue};
2938    use crate::object_store::transaction::{Options, lock_keys};
2939    use crate::object_store::volume::root_volume;
2940    use crate::serialized_types::VersionedLatest;
2941    use crate::testing;
2942    use assert_matches::assert_matches;
2943    use async_trait::async_trait;
2944    use fuchsia_async as fasync;
2945    use fuchsia_sync::Mutex;
2946    use futures::join;
2947    use fxfs_crypto::ff1::Ff1;
2948    use fxfs_crypto::{
2949        Crypt, FXFS_KEY_SIZE, FXFS_WRAPPED_KEY_SIZE, FxfsKey, UnwrappedKey, WrappedKeyBytes,
2950    };
2951    use fxfs_insecure_crypto::new_insecure_crypt;
2952
2953    use std::sync::Arc;
2954    use std::time::Duration;
2955    use storage_device::DeviceHolder;
2956    use storage_device::fake_device::FakeDevice;
2957    use test_case::test_case;
2958
2959    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2960
2961    async fn test_filesystem() -> OpenFxFilesystem {
2962        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2963        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2964    }
2965
2966    #[fuchsia::test]
2967    async fn test_item_sequences() {
2968        let fs = test_filesystem().await;
2969        let object1;
2970        let object2;
2971        let object3;
2972        let mut transaction = fs
2973            .clone()
2974            .new_transaction(lock_keys![], Options::default())
2975            .await
2976            .expect("new_transaction failed");
2977        let store = fs.root_store();
2978        object1 = Arc::new(
2979            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2980                .await
2981                .expect("create_object failed"),
2982        );
2983        transaction.commit().await.expect("commit failed");
2984        let mut transaction = fs
2985            .clone()
2986            .new_transaction(lock_keys![], Options::default())
2987            .await
2988            .expect("new_transaction failed");
2989        object2 = Arc::new(
2990            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2991                .await
2992                .expect("create_object failed"),
2993        );
2994        transaction.commit().await.expect("commit failed");
2995
2996        fs.sync(SyncOptions::default()).await.expect("sync failed");
2997
2998        let mut transaction = fs
2999            .clone()
3000            .new_transaction(lock_keys![], Options::default())
3001            .await
3002            .expect("new_transaction failed");
3003        object3 = Arc::new(
3004            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3005                .await
3006                .expect("create_object failed"),
3007        );
3008        transaction.commit().await.expect("commit failed");
3009
3010        let layer_set = store.tree.layer_set();
3011        let mut merger = layer_set.merger();
3012        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
3013        let mut sequences = [0u64; 3];
3014        while let Some(ItemRef { key: ObjectKey { object_id, .. }, sequence, .. }) = iter.get() {
3015            if *object_id == object1.object_id() {
3016                sequences[0] = sequence;
3017            } else if *object_id == object2.object_id() {
3018                sequences[1] = sequence;
3019            } else if *object_id == object3.object_id() {
3020                sequences[2] = sequence;
3021            }
3022            iter.advance().await.expect("advance failed");
3023        }
3024
3025        assert!(sequences[0] <= sequences[1], "sequences: {:?}", sequences);
3026        // The last item came after a sync, so should be strictly greater.
3027        assert!(sequences[1] < sequences[2], "sequences: {:?}", sequences);
3028        fs.close().await.expect("Close failed");
3029    }
3030
3031    #[fuchsia::test]
3032    async fn test_verified_file_with_verified_attribute() {
3033        let fs: OpenFxFilesystem = test_filesystem().await;
3034        let mut transaction = fs
3035            .clone()
3036            .new_transaction(lock_keys![], Options::default())
3037            .await
3038            .expect("new_transaction failed");
3039        let store = fs.root_store();
3040        let object = Arc::new(
3041            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3042                .await
3043                .expect("create_object failed"),
3044        );
3045
3046        transaction.add(
3047            store.store_object_id(),
3048            Mutation::replace_or_insert_object(
3049                ObjectKey::attribute(
3050                    object.object_id(),
3051                    DEFAULT_DATA_ATTRIBUTE_ID,
3052                    AttributeKey::Attribute,
3053                ),
3054                ObjectValue::verified_attribute(
3055                    0,
3056                    FsverityMetadata::Internal(RootDigest::Sha256([0; 32]), vec![]),
3057                ),
3058            ),
3059        );
3060
3061        transaction.add(
3062            store.store_object_id(),
3063            Mutation::replace_or_insert_object(
3064                ObjectKey::attribute(
3065                    object.object_id(),
3066                    FSVERITY_MERKLE_ATTRIBUTE_ID,
3067                    AttributeKey::Attribute,
3068                ),
3069                ObjectValue::attribute(0, false),
3070            ),
3071        );
3072
3073        transaction.commit().await.unwrap();
3074
3075        let handle =
3076            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
3077                .await
3078                .expect("open_object failed");
3079
3080        assert!(handle.is_verified_file());
3081
3082        fs.close().await.expect("Close failed");
3083    }
3084
3085    #[fuchsia::test]
3086    async fn test_verified_file_without_verified_attribute() {
3087        let fs: OpenFxFilesystem = test_filesystem().await;
3088        let mut transaction = fs
3089            .clone()
3090            .new_transaction(lock_keys![], Options::default())
3091            .await
3092            .expect("new_transaction failed");
3093        let store = fs.root_store();
3094        let object = Arc::new(
3095            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3096                .await
3097                .expect("create_object failed"),
3098        );
3099
3100        transaction.commit().await.unwrap();
3101
3102        let handle =
3103            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
3104                .await
3105                .expect("open_object failed");
3106
3107        assert!(!handle.is_verified_file());
3108
3109        fs.close().await.expect("Close failed");
3110    }
3111
3112    #[fuchsia::test]
3113    async fn test_create_and_open_store() {
3114        let fs = test_filesystem().await;
3115        let store_id = {
3116            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3117            root_volume
3118                .new_volume(
3119                    "test",
3120                    NewChildStoreOptions {
3121                        options: StoreOptions {
3122                            owner: NO_OWNER,
3123                            crypt: Some(Arc::new(new_insecure_crypt())),
3124                        },
3125                        ..Default::default()
3126                    },
3127                )
3128                .await
3129                .expect("new_volume failed")
3130                .store_object_id()
3131        };
3132
3133        fs.close().await.expect("close failed");
3134        let device = fs.take_device().await;
3135        device.reopen(false);
3136        let fs = FxFilesystem::open(device).await.expect("open failed");
3137
3138        {
3139            let store = fs.object_manager().store(store_id).expect("store not found");
3140            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
3141        }
3142        fs.close().await.expect("Close failed");
3143    }
3144
3145    #[fuchsia::test]
3146    async fn test_create_and_open_internal_dir() {
3147        let fs = test_filesystem().await;
3148        let dir_id;
3149        let store_id;
3150        {
3151            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3152            let store = root_volume
3153                .new_volume(
3154                    "test",
3155                    NewChildStoreOptions {
3156                        options: StoreOptions {
3157                            owner: NO_OWNER,
3158                            crypt: Some(Arc::new(new_insecure_crypt())),
3159                        },
3160                        ..Default::default()
3161                    },
3162                )
3163                .await
3164                .expect("new_volume failed");
3165            dir_id =
3166                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
3167            store_id = store.store_object_id();
3168        }
3169
3170        fs.close().await.expect("close failed");
3171        let device = fs.take_device().await;
3172        device.reopen(false);
3173        let fs = FxFilesystem::open(device).await.expect("open failed");
3174
3175        {
3176            let store = fs.object_manager().store(store_id).expect("store not found");
3177            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
3178            assert_eq!(
3179                dir_id,
3180                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
3181            );
3182            let obj = store
3183                .tree()
3184                .find(&ObjectKey::object(dir_id))
3185                .await
3186                .expect("Searching tree for dir")
3187                .unwrap();
3188            assert_matches!(
3189                obj.value,
3190                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
3191            );
3192        }
3193        fs.close().await.expect("Close failed");
3194    }
3195
3196    #[fuchsia::test]
3197    async fn test_create_and_open_internal_dir_unencrypted() {
3198        let fs = test_filesystem().await;
3199        let dir_id;
3200        let store_id;
3201        {
3202            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3203            let store = root_volume
3204                .new_volume("test", NewChildStoreOptions::default())
3205                .await
3206                .expect("new_volume failed");
3207            dir_id =
3208                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
3209            store_id = store.store_object_id();
3210        }
3211
3212        fs.close().await.expect("close failed");
3213        let device = fs.take_device().await;
3214        device.reopen(false);
3215        let fs = FxFilesystem::open(device).await.expect("open failed");
3216
3217        {
3218            let store = fs.object_manager().store(store_id).expect("store not found");
3219            assert_eq!(
3220                dir_id,
3221                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
3222            );
3223            let obj = store
3224                .tree()
3225                .find(&ObjectKey::object(dir_id))
3226                .await
3227                .expect("Searching tree for dir")
3228                .unwrap();
3229            assert_matches!(
3230                obj.value,
3231                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
3232            );
3233        }
3234        fs.close().await.expect("Close failed");
3235    }
3236
3237    #[fuchsia::test(threads = 10)]
3238    async fn test_old_layers_are_purged() {
3239        let fs = test_filesystem().await;
3240
3241        let store = fs.root_store();
3242        let mut transaction = fs
3243            .clone()
3244            .new_transaction(lock_keys![], Options::default())
3245            .await
3246            .expect("new_transaction failed");
3247        let object = Arc::new(
3248            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3249                .await
3250                .expect("create_object failed"),
3251        );
3252        transaction.commit().await.expect("commit failed");
3253
3254        store.flush().await.expect("flush failed");
3255
3256        let mut buf = object.allocate_buffer(5).await;
3257        buf.as_mut_slice().copy_from_slice(b"hello");
3258        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3259
3260        // Getting the layer-set should cause the flush to stall.
3261        let layer_set = store.tree().layer_set();
3262
3263        let done = Mutex::new(false);
3264        let mut object_id = 0;
3265
3266        join!(
3267            async {
3268                store.flush().await.expect("flush failed");
3269                assert!(*done.lock());
3270            },
3271            async {
3272                // We can't detect that the flush has stalled (a halting problem), so all we can do is sleep.
3273                fasync::Timer::new(Duration::from_secs(1)).await;
3274                *done.lock() = true;
3275                object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
3276                std::mem::drop(layer_set);
3277            }
3278        );
3279
3280        if let Err(e) = ObjectStore::open_object(
3281            &store.parent_store.as_ref().unwrap(),
3282            object_id,
3283            HandleOptions::default(),
3284            store.crypt(),
3285        )
3286        .await
3287        {
3288            assert!(FxfsError::NotFound.matches(&e));
3289        } else {
3290            panic!("open_object succeeded");
3291        }
3292    }
3293
3294    #[fuchsia::test]
3295    async fn test_tombstone_deletes_data() {
3296        let fs = test_filesystem().await;
3297        let root_store = fs.root_store();
3298        let child_id = {
3299            let mut transaction = fs
3300                .clone()
3301                .new_transaction(lock_keys![], Options::default())
3302                .await
3303                .expect("new_transaction failed");
3304            let child = ObjectStore::create_object(
3305                &root_store,
3306                &mut transaction,
3307                HandleOptions::default(),
3308                None,
3309            )
3310            .await
3311            .expect("create_object failed");
3312            transaction.commit().await.expect("commit failed");
3313
3314            // Allocate an extent in the file.
3315            let mut buffer = child.allocate_buffer(8192).await;
3316            buffer.as_mut_slice().fill(0xaa);
3317            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
3318
3319            child.object_id()
3320        };
3321
3322        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
3323
3324        // Let fsck check allocations.
3325        fsck(fs.clone()).await.expect("fsck failed");
3326    }
3327
3328    #[fuchsia::test]
3329    async fn test_tombstone_purges_keys() {
3330        let fs = test_filesystem().await;
3331        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3332        let store = root_volume
3333            .new_volume(
3334                "test",
3335                NewChildStoreOptions {
3336                    options: StoreOptions {
3337                        crypt: Some(Arc::new(new_insecure_crypt())),
3338                        ..StoreOptions::default()
3339                    },
3340                    ..NewChildStoreOptions::default()
3341                },
3342            )
3343            .await
3344            .expect("new_volume failed");
3345        let mut transaction = fs
3346            .clone()
3347            .new_transaction(lock_keys![], Options::default())
3348            .await
3349            .expect("new_transaction failed");
3350        let child =
3351            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3352                .await
3353                .expect("create_object failed");
3354        transaction.commit().await.expect("commit failed");
3355        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_some());
3356        store
3357            .tombstone_object(child.object_id(), Options::default())
3358            .await
3359            .expect("tombstone_object failed");
3360        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_none());
3361        fs.close().await.expect("close failed");
3362    }
3363
3364    #[fuchsia::test]
3365    async fn test_major_compaction_discards_unnecessary_records() {
3366        let fs = test_filesystem().await;
3367        let root_store = fs.root_store();
3368        let child_id = {
3369            let mut transaction = fs
3370                .clone()
3371                .new_transaction(lock_keys![], Options::default())
3372                .await
3373                .expect("new_transaction failed");
3374            let child = ObjectStore::create_object(
3375                &root_store,
3376                &mut transaction,
3377                HandleOptions::default(),
3378                None,
3379            )
3380            .await
3381            .expect("create_object failed");
3382            transaction.commit().await.expect("commit failed");
3383
3384            // Allocate an extent in the file.
3385            let mut buffer = child.allocate_buffer(8192).await;
3386            buffer.as_mut_slice().fill(0xaa);
3387            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
3388
3389            child.object_id()
3390        };
3391
3392        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
3393        {
3394            let layers = root_store.tree.layer_set();
3395            let mut merger = layers.merger();
3396            let iter = merger
3397                .query(Query::FullRange(&ObjectKey::object(child_id)))
3398                .await
3399                .expect("seek failed");
3400            // Find at least one object still in the tree.
3401            match iter.get() {
3402                Some(ItemRef { key: ObjectKey { object_id, .. }, .. })
3403                    if *object_id == child_id => {}
3404                _ => panic!("Objects should still be in the tree."),
3405            }
3406        }
3407        root_store.flush().await.expect("flush failed");
3408
3409        // There should be no records for the object.
3410        let layers = root_store.tree.layer_set();
3411        let mut merger = layers.merger();
3412        let iter = merger
3413            .query(Query::FullRange(&ObjectKey::object(child_id)))
3414            .await
3415            .expect("seek failed");
3416        match iter.get() {
3417            None => {}
3418            Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
3419                assert_ne!(*object_id, child_id)
3420            }
3421        }
3422    }
3423
3424    #[fuchsia::test]
3425    async fn test_overlapping_extents_in_different_layers() {
3426        let fs = test_filesystem().await;
3427        let store = fs.root_store();
3428
3429        let mut transaction = fs
3430            .clone()
3431            .new_transaction(
3432                lock_keys![LockKey::object(
3433                    store.store_object_id(),
3434                    store.root_directory_object_id()
3435                )],
3436                Options::default(),
3437            )
3438            .await
3439            .expect("new_transaction failed");
3440        let root_directory =
3441            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3442        let object = root_directory
3443            .create_child_file(&mut transaction, "test")
3444            .await
3445            .expect("create_child_file failed");
3446        transaction.commit().await.expect("commit failed");
3447
3448        let buf = object.allocate_buffer(16384).await;
3449        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3450
3451        store.flush().await.expect("flush failed");
3452
3453        object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");
3454
3455        // At this point, we should have an extent for 0..16384 in a layer that has been flushed,
3456        // and an extent for 0..4096 that partially overwrites it.  Writing to 0..16384 should
3457        // overwrite both of those extents.
3458        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3459
3460        fsck(fs.clone()).await.expect("fsck failed");
3461    }
3462
3463    #[fuchsia::test(threads = 10)]
3464    async fn test_encrypted_mutations() {
3465        async fn one_iteration(
3466            fs: OpenFxFilesystem,
3467            crypt: Arc<dyn Crypt>,
3468            iteration: u64,
3469        ) -> OpenFxFilesystem {
3470            async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
3471                fs.close().await.expect("Close failed");
3472                let device = fs.take_device().await;
3473                device.reopen(false);
3474                FxFilesystem::open(device).await.expect("FS open failed")
3475            }
3476
3477            let fs = reopen(fs).await;
3478
3479            let (store_object_id, object_id) = {
3480                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3481                let store = root_volume
3482                    .volume(
3483                        "test",
3484                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3485                    )
3486                    .await
3487                    .expect("volume failed");
3488
3489                let mut transaction = fs
3490                    .clone()
3491                    .new_transaction(
3492                        lock_keys![LockKey::object(
3493                            store.store_object_id(),
3494                            store.root_directory_object_id(),
3495                        )],
3496                        Options::default(),
3497                    )
3498                    .await
3499                    .expect("new_transaction failed");
3500                let root_directory = Directory::open(&store, store.root_directory_object_id())
3501                    .await
3502                    .expect("open failed");
3503                let object = root_directory
3504                    .create_child_file(&mut transaction, &format!("test {}", iteration))
3505                    .await
3506                    .expect("create_child_file failed");
3507                transaction.commit().await.expect("commit failed");
3508
3509                let mut buf = object.allocate_buffer(1000).await;
3510                for i in 0..buf.len() {
3511                    buf.as_mut_slice()[i] = i as u8;
3512                }
3513                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3514
3515                (store.store_object_id(), object.object_id())
3516            };
3517
3518            let fs = reopen(fs).await;
3519
3520            let check_object = |fs: Arc<FxFilesystem>| {
3521                let crypt = crypt.clone();
3522                async move {
3523                    let root_volume = root_volume(fs).await.expect("root_volume failed");
3524                    let volume = root_volume
3525                        .volume(
3526                            "test",
3527                            StoreOptions { crypt: Some(crypt), ..StoreOptions::default() },
3528                        )
3529                        .await
3530                        .expect("volume failed");
3531
3532                    let object = ObjectStore::open_object(
3533                        &volume,
3534                        object_id,
3535                        HandleOptions::default(),
3536                        None,
3537                    )
3538                    .await
3539                    .expect("open_object failed");
3540                    let mut buf = object.allocate_buffer(1000).await;
3541                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
3542                    for i in 0..buf.len() {
3543                        assert_eq!(buf.as_slice()[i], i as u8);
3544                    }
3545                }
3546            };
3547
3548            check_object(fs.clone()).await;
3549
3550            let fs = reopen(fs).await;
3551
3552            // At this point the "test" volume is locked.  Before checking the object, flush the
3553            // filesystem.  This should leave a file with encrypted mutations.
3554            fs.object_manager().flush().await.expect("flush failed");
3555
3556            assert_ne!(
3557                fs.object_manager()
3558                    .store(store_object_id)
3559                    .unwrap()
3560                    .load_store_info()
3561                    .await
3562                    .expect("load_store_info failed")
3563                    .encrypted_mutations_object_id,
3564                INVALID_OBJECT_ID
3565            );
3566
3567            check_object(fs.clone()).await;
3568
3569            // Checking the object should have triggered a flush and so now there should be no
3570            // encrypted mutations object.
3571            assert_eq!(
3572                fs.object_manager()
3573                    .store(store_object_id)
3574                    .unwrap()
3575                    .load_store_info()
3576                    .await
3577                    .expect("load_store_info failed")
3578                    .encrypted_mutations_object_id,
3579                INVALID_OBJECT_ID
3580            );
3581
3582            let fs = reopen(fs).await;
3583
3584            fsck(fs.clone()).await.expect("fsck failed");
3585
3586            let fs = reopen(fs).await;
3587
3588            check_object(fs.clone()).await;
3589
3590            fs
3591        }
3592
3593        let mut fs = test_filesystem().await;
3594        let crypt = Arc::new(new_insecure_crypt());
3595
3596        {
3597            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3598            let _store = root_volume
3599                .new_volume(
3600                    "test",
3601                    NewChildStoreOptions {
3602                        options: StoreOptions {
3603                            crypt: Some(crypt.clone()),
3604                            ..StoreOptions::default()
3605                        },
3606                        ..Default::default()
3607                    },
3608                )
3609                .await
3610                .expect("new_volume failed");
3611        }
3612
3613        // Run a few iterations so that we test with different stream cipher offsets.
3614        for i in 0..5 {
3615            fs = one_iteration(fs, crypt.clone(), i).await;
3616        }
3617    }
3618
3619    #[test_case(true; "with a flush")]
3620    #[test_case(false; "without a flush")]
3621    #[fuchsia::test(threads = 10)]
3622    async fn test_object_id_cipher_roll(with_flush: bool) {
3623        let fs = test_filesystem().await;
3624        let crypt = Arc::new(new_insecure_crypt());
3625
3626        let expected_key = {
3627            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3628            let store = root_volume
3629                .new_volume(
3630                    "test",
3631                    NewChildStoreOptions {
3632                        options: StoreOptions {
3633                            crypt: Some(crypt.clone()),
3634                            ..StoreOptions::default()
3635                        },
3636                        ..Default::default()
3637                    },
3638                )
3639                .await
3640                .expect("new_volume failed");
3641
3642            // Create some files so that our in-memory copy of StoreInfo has changes (the object
3643            // count) pending a flush.
3644            let root_dir_id = store.root_directory_object_id();
3645            let root_dir =
3646                Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3647            let mut transaction = fs
3648                .clone()
3649                .new_transaction(
3650                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
3651                    Options::default(),
3652                )
3653                .await
3654                .expect("new_transaction failed");
3655            for i in 0..10 {
3656                root_dir.create_child_file(&mut transaction, &format!("file {i}")).await.unwrap();
3657            }
3658            transaction.commit().await.expect("commit failed");
3659
3660            let orig_store_info = store.store_info().unwrap();
3661
3662            // Hack the last object ID to force a roll of the object ID cipher.
3663            {
3664                let mut last_object_id = store.last_object_id.lock();
3665                match &mut *last_object_id {
3666                    LastObjectId::Encrypted { id, .. } => {
3667                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
3668                        *id |= 0xffffffff;
3669                    }
3670                    _ => unreachable!(),
3671                }
3672            }
3673
3674            let mut transaction = fs
3675                .clone()
3676                .new_transaction(
3677                    lock_keys![LockKey::object(
3678                        store.store_object_id(),
3679                        store.root_directory_object_id()
3680                    )],
3681                    Options::default(),
3682                )
3683                .await
3684                .expect("new_transaction failed");
3685            let root_directory = Directory::open(&store, store.root_directory_object_id())
3686                .await
3687                .expect("open failed");
3688            let object = root_directory
3689                .create_child_file(&mut transaction, "test")
3690                .await
3691                .expect("create_child_file failed");
3692            transaction.commit().await.expect("commit failed");
3693
3694            assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 1u64 << 32);
3695
3696            // Check that the key has been changed.
3697            let key = match (
3698                store.store_info().unwrap().last_object_id,
3699                orig_store_info.last_object_id,
3700            ) {
3701                (
3702                    LastObjectIdInfo::Encrypted { key, id },
3703                    LastObjectIdInfo::Encrypted { key: orig_key, .. },
3704                ) => {
3705                    assert_ne!(key, orig_key);
3706                    assert_eq!(id, 1u64 << 32);
3707                    key
3708                }
3709                _ => unreachable!(),
3710            };
3711
3712            if with_flush {
3713                fs.journal().compact().await.unwrap();
3714            }
3715
3716            let last_object_id = store.last_object_id.lock();
3717            assert_eq!(last_object_id.id(), 1u64 << 32);
3718            key
3719        };
3720
3721        fs.close().await.expect("Close failed");
3722        let device = fs.take_device().await;
3723        device.reopen(false);
3724        let fs = FxFilesystem::open(device).await.expect("open failed");
3725        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3726        let store = root_volume
3727            .volume("test", StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() })
3728            .await
3729            .expect("volume failed");
3730
3731        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Encrypted { key, .. } if key == expected_key);
3732        assert_eq!(store.last_object_id.lock().id(), 1u64 << 32);
3733
3734        fsck(fs.clone()).await.expect("fsck failed");
3735        fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
3736    }
3737
3738    #[fuchsia::test(threads = 2)]
3739    async fn test_race_object_id_cipher_roll_and_flush() {
3740        let fs = test_filesystem().await;
3741        let crypt = Arc::new(new_insecure_crypt());
3742
3743        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3744        let store = root_volume
3745            .new_volume(
3746                "test",
3747                NewChildStoreOptions {
3748                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3749                    ..Default::default()
3750                },
3751            )
3752            .await
3753            .expect("new_volume failed");
3754
3755        assert!(matches!(&*store.last_object_id.lock(), LastObjectId::Encrypted { .. }));
3756
3757        // Create some files so that our in-memory copy of StoreInfo has changes (the object
3758        // count) pending a flush.
3759        let root_dir_id = store.root_directory_object_id();
3760        let root_dir = Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3761
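            // Make sure the executor is actually running two threads so that the compaction task
            // spawned below can genuinely race with this one.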
3762        let _executor_tasks = testing::force_executor_threads_to_run(2).await;
3763
3764        for j in 0..100 {
3765            let mut transaction = fs
3766                .clone()
3767                .new_transaction(
3768                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
3769                    Options::default(),
3770                )
3771                .await
3772                .expect("new_transaction failed");
3773            root_dir.create_child_file(&mut transaction, &format!("file {j}")).await.unwrap();
3774            transaction.commit().await.expect("commit failed");
3775
3776            let task = {
3777                let fs = fs.clone();
3778                fasync::Task::spawn(async move {
3779                    fs.journal().compact().await.unwrap();
3780                })
3781            };
3782
3783            // Hack the last object ID to force a roll of the object ID cipher.
3784            {
3785                let mut last_object_id = store.last_object_id.lock();
3786                let LastObjectId::Encrypted { id, .. } = &mut *last_object_id else {
3787                    unreachable!()
3788                };
3789                assert_eq!(*id >> 32, j);
3790                *id |= 0xffffffff;
3791            }
3792
3793            let mut transaction = fs
3794                .clone()
3795                .new_transaction(
3796                    lock_keys![LockKey::object(
3797                        store.store_object_id(),
3798                        store.root_directory_object_id()
3799                    )],
3800                    Options::default(),
3801                )
3802                .await
3803                .expect("new_transaction failed");
3804            let root_directory = Directory::open(&store, store.root_directory_object_id())
3805                .await
3806                .expect("open failed");
3807            root_directory
3808                .create_child_file(&mut transaction, &format!("test {j}"))
3809                .await
3810                .expect("create_child_file failed");
3811            transaction.commit().await.expect("commit failed");
3812
3813            task.await;
3814
3815            // Check that the key has been changed.
3816            let new_store_info = store.load_store_info().await.unwrap();
3817
3818            let LastObjectIdInfo::Encrypted { id, key } = new_store_info.last_object_id else {
3819                unreachable!()
3820            };
3821            assert_eq!(id >> 32, j + 1);
3822            let LastObjectIdInfo::Encrypted { key: in_memory_key, .. } =
3823                store.store_info().unwrap().last_object_id
3824            else {
3825                unreachable!()
3826            };
3827            assert_eq!(key, in_memory_key);
3828        }
3829
3830        fs.close().await.expect("Close failed");
3831    }
3832
3833    #[fuchsia::test]
3834    async fn test_object_id_no_roll_for_unencrypted_store() {
3835        let fs = test_filesystem().await;
3836
3837        {
3838            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3839            let store = root_volume
3840                .new_volume("test", NewChildStoreOptions::default())
3841                .await
3842                .expect("new_volume failed");
3843
3844            // Hack the last object ID.
3845            {
3846                let mut last_object_id = store.last_object_id.lock();
3847                match &mut *last_object_id {
3848                    LastObjectId::Unencrypted { id } => {
3849                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
3850                        *id |= 0xffffffff;
3851                    }
3852                    _ => unreachable!(),
3853                }
3854            }
3855
3856            let mut transaction = fs
3857                .clone()
3858                .new_transaction(
3859                    lock_keys![LockKey::object(
3860                        store.store_object_id(),
3861                        store.root_directory_object_id()
3862                    )],
3863                    Options::default(),
3864                )
3865                .await
3866                .expect("new_transaction failed");
3867            let root_directory = Directory::open(&store, store.root_directory_object_id())
3868                .await
3869                .expect("open failed");
3870            let object = root_directory
3871                .create_child_file(&mut transaction, "test")
3872                .await
3873                .expect("create_child_file failed");
3874            transaction.commit().await.expect("commit failed");
3875
3876            assert_eq!(object.object_id(), 0x1_0000_0000);
3877
3878            // Check that there is still no key.
3879            assert_matches!(
3880                store.store_info().unwrap().last_object_id,
3881                LastObjectIdInfo::Unencrypted { .. }
3882            );
3883
3884            assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
3885        };
3886
3887        fs.close().await.expect("Close failed");
3888        let device = fs.take_device().await;
3889        device.reopen(false);
3890        let fs = FxFilesystem::open(device).await.expect("open failed");
3891        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3892        let store =
3893            root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
3894
3895        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
3896    }
3897
3898    #[fuchsia::test]
3899    fn test_object_id_is_not_invalid_object_id() {
3900        let key = UnwrappedKey::new(vec![0; FXFS_KEY_SIZE]);
3901        // 1106634048 results in INVALID_OBJECT_ID with this key.
3902        let mut last_object_id =
3903            LastObjectId::Encrypted { id: 1106634047, cipher: Box::new(Ff1::new(&key)) };
3904        assert!(last_object_id.try_get_next().is_some());
3905        assert!(last_object_id.try_get_next().is_some());
3906    }
3907
3908    #[fuchsia::test]
3909    async fn test_last_object_id_is_correct_after_unlock() {
3910        let fs = test_filesystem().await;
3911        let crypt = Arc::new(new_insecure_crypt());
3912
3913        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3914        let store = root_volume
3915            .new_volume(
3916                "test",
3917                NewChildStoreOptions {
3918                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3919                    ..Default::default()
3920                },
3921            )
3922            .await
3923            .expect("new_volume failed");
3924
3925        let mut transaction = fs
3926            .clone()
3927            .new_transaction(
3928                lock_keys![LockKey::object(
3929                    store.store_object_id(),
3930                    store.root_directory_object_id()
3931                )],
3932                Options::default(),
3933            )
3934            .await
3935            .expect("new_transaction failed");
3936        let root_directory =
3937            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3938        root_directory
3939            .create_child_file(&mut transaction, "test")
3940            .await
3941            .expect("create_child_file failed");
3942        transaction.commit().await.expect("commit failed");
3943
3944        // Compact so that StoreInfo is written.
3945        fs.journal().compact().await.unwrap();
3946
3947        let last_object_id = store.last_object_id.lock().id();
3948
3949        store.lock().await.unwrap();
3950        store.unlock(NO_OWNER, crypt.clone()).await.unwrap();
3951
3952        assert_eq!(store.last_object_id.lock().id(), last_object_id);
3953    }
3954
3955    #[fuchsia::test(threads = 20)]
3956    async fn test_race_when_rolling_last_object_id_cipher() {
3957        // NOTE: This test exercises a race condition, so any failure is likely to be flaky.
3958
3959        const NUM_THREADS: usize = 20;
3960
3961        let fs = test_filesystem().await;
3962        let crypt = Arc::new(new_insecure_crypt());
3963
3964        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3965        let store = root_volume
3966            .new_volume(
3967                "test",
3968                NewChildStoreOptions {
3969                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3970                    ..Default::default()
3971                },
3972            )
3973            .await
3974            .expect("new_volume failed");
3975
3976        let store_id = store.store_object_id();
3977        let root_dir_id = store.root_directory_object_id();
3978
3979        let root_directory =
3980            Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3981
3982        // Create directories.
3983        let mut directories = Vec::new();
3984        for _ in 0..NUM_THREADS {
3985            let mut transaction = fs
3986                .clone()
3987                .new_transaction(
3988                    lock_keys![LockKey::object(store_id, root_dir_id)],
3989                    Options::default(),
3990                )
3991                .await
3992                .expect("new_transaction failed");
3993            directories.push(
3994                root_directory
3995                    .create_child_dir(&mut transaction, "test")
3996                    .await
3997                    .expect("create_child_dir failed"),
3998            );
3999            transaction.commit().await.expect("commit failed");
4000        }
4001
4002        // Hack the last object ID so that the next ID will require a roll.
4003        match &mut *store.last_object_id.lock() {
4004            LastObjectId::Encrypted { id, .. } => *id |= 0xffff_ffff,
4005            _ => unreachable!(),
4006        }
4007
4008        let scope = fasync::Scope::new();
4009
4010        let _executor_tasks = testing::force_executor_threads_to_run(NUM_THREADS).await;
4011
4012        for dir in directories {
4013            let fs = fs.clone();
4014            scope.spawn(async move {
4015                let mut transaction = fs
4016                    .clone()
4017                    .new_transaction(
4018                        lock_keys![LockKey::object(store_id, dir.object_id())],
4019                        Options::default(),
4020                    )
4021                    .await
4022                    .expect("new_transaction failed");
4023                dir.create_child_file(&mut transaction, "test")
4024                    .await
4025                    .expect("create_child_file failed");
4026                transaction.commit().await.expect("commit failed");
4027            });
4028        }
4029
4030        scope.on_no_tasks().await;
4031
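        // Expected arithmetic: the hack saturated the low 32 bits of the counter, so the
        // first of the NUM_THREADS creations lands at 0x1_0000_0000 and the rest follow
        // sequentially, hence the expected final counter value below.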
4032        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000 + NUM_THREADS as u64 - 1);
4033    }
4034
4035    #[fuchsia::test(threads = 10)]
4036    async fn test_lock_store() {
4037        let fs = test_filesystem().await;
4038        let crypt = Arc::new(new_insecure_crypt());
4039
4040        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4041        let store = root_volume
4042            .new_volume(
4043                "test",
4044                NewChildStoreOptions {
4045                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4046                    ..NewChildStoreOptions::default()
4047                },
4048            )
4049            .await
4050            .expect("new_volume failed");
4051        let mut transaction = fs
4052            .clone()
4053            .new_transaction(
4054                lock_keys![LockKey::object(
4055                    store.store_object_id(),
4056                    store.root_directory_object_id()
4057                )],
4058                Options::default(),
4059            )
4060            .await
4061            .expect("new_transaction failed");
4062        let root_directory =
4063            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4064        root_directory
4065            .create_child_file(&mut transaction, "test")
4066            .await
4067            .expect("create_child_file failed");
4068        transaction.commit().await.expect("commit failed");
4069        store.lock().await.expect("lock failed");
4070
4071        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
4072        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4073    }
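
    // The lock/unlock lifecycle that this test and the next exercise, in outline
    // (pseudo-usage; error handling elided, and the comments are our reading of it):
    //
    //     store.lock().await?;                   // discard unwrapped keys; store unusable
    //     store.unlock(NO_OWNER, crypt).await?;  // re-unwrap keys for read/write access
    //     store.unlock_read_only(crypt).await?;  // unlock without permitting mutations
    //     store.lock_read_only();                // synchronous; presumably nothing to flush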
4074
4075    #[fuchsia::test(threads = 10)]
4076    async fn test_unlock_read_only() {
4077        let fs = test_filesystem().await;
4078        let crypt = Arc::new(new_insecure_crypt());
4079
4080        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4081        let store = root_volume
4082            .new_volume(
4083                "test",
4084                NewChildStoreOptions {
4085                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4086                    ..NewChildStoreOptions::default()
4087                },
4088            )
4089            .await
4090            .expect("new_volume failed");
4091        let mut transaction = fs
4092            .clone()
4093            .new_transaction(
4094                lock_keys![LockKey::object(
4095                    store.store_object_id(),
4096                    store.root_directory_object_id()
4097                )],
4098                Options::default(),
4099            )
4100            .await
4101            .expect("new_transaction failed");
4102        let root_directory =
4103            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4104        root_directory
4105            .create_child_file(&mut transaction, "test")
4106            .await
4107            .expect("create_child_file failed");
4108        transaction.commit().await.expect("commit failed");
4109        store.lock().await.expect("lock failed");
4110
4111        store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
4112        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4113        store.lock_read_only();
4114        store.unlock_read_only(crypt).await.expect("unlock failed");
4115        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4116    }
4117
4118    #[fuchsia::test(threads = 10)]
4119    async fn test_key_rolled_when_unlocked() {
4120        let fs = test_filesystem().await;
4121        let crypt = Arc::new(new_insecure_crypt());
4122
4123        let object_id;
4124        {
4125            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4126            let store = root_volume
4127                .new_volume(
4128                    "test",
4129                    NewChildStoreOptions {
4130                        options: StoreOptions {
4131                            crypt: Some(crypt.clone()),
4132                            ..StoreOptions::default()
4133                        },
4134                        ..Default::default()
4135                    },
4136                )
4137                .await
4138                .expect("new_volume failed");
4139            let mut transaction = fs
4140                .clone()
4141                .new_transaction(
4142                    lock_keys![LockKey::object(
4143                        store.store_object_id(),
4144                        store.root_directory_object_id()
4145                    )],
4146                    Options::default(),
4147                )
4148                .await
4149                .expect("new_transaction failed");
4150            let root_directory = Directory::open(&store, store.root_directory_object_id())
4151                .await
4152                .expect("open failed");
4153            object_id = root_directory
4154                .create_child_file(&mut transaction, "test")
4155                .await
4156                .expect("create_child_file failed")
4157                .object_id();
4158            transaction.commit().await.expect("commit failed");
4159        }
4160
4161        fs.close().await.expect("Close failed");
4162        let mut device = fs.take_device().await;
4163
4164        // Repeatedly remount so that we can be sure that remounting still works once many
4165        // mutations keys have accumulated.
4166        for _ in 0..100 {
4167            device.reopen(false);
4168            let fs = FxFilesystem::open(device).await.expect("open failed");
4169            {
4170                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4171                let store = root_volume
4172                    .volume(
4173                        "test",
4174                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4175                    )
4176                    .await
4177                    .expect("open_volume failed");
4178
4179                // The key should get rolled every time we unlock.
4180                assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);
4181
4182                // Make sure there's an encrypted mutation.
4183                let handle =
4184                    ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
4185                        .await
4186                        .expect("open_object failed");
4187                let buffer = handle.allocate_buffer(100).await;
4188                handle
4189                    .write_or_append(Some(0), buffer.as_ref())
4190                    .await
4191                    .expect("write_or_append failed");
4192            }
4193            fs.close().await.expect("Close failed");
4194            device = fs.take_device().await;
4195        }
4196    }
4197
4198    #[test]
4199    fn test_store_info_max_serialized_size() {
4200        let info = StoreInfo {
4201            guid: [0xff; 16],
4202            last_object_id: LastObjectIdInfo::Encrypted {
4203                id: 0x1234567812345678,
4204                key: FxfsKey {
4205                    wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4206                    key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4207                },
4208            },
4209            // Worst case, each layer should be 3/4 the size of the layer below it (because of
4210            // the compaction policy we're using).  If the smallest layer is 8,192 bytes, the
4211            // combined size of 120 layers would exceed what a 64-bit unsigned integer can
4212            // represent, so if this fits, any realistic layer count should fit.
4213            layers: vec![0x1234567812345678; 120],
4214            root_directory_object_id: 0x1234567812345678,
4215            graveyard_directory_object_id: 0x1234567812345678,
4216            object_count: 0x1234567812345678,
4217            mutations_key: Some(FxfsKey {
4218                wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4219                key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4220            }),
4221            mutations_cipher_offset: 0x1234567812345678,
4222            encrypted_mutations_object_id: 0x1234567812345678,
4223            internal_directory_object_id: INVALID_OBJECT_ID,
4224        };
4225        let mut serialized_info = Vec::new();
4226        info.serialize_with_version(&mut serialized_info).unwrap();
4227        assert!(
4228            serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
4229            "{}",
4230            serialized_info.len()
4231        );
4232    }
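
    // Back-of-envelope check of the 120-layer figure above: the largest of 120 layers is
    // smallest * (4/3)^119, i.e. roughly 8,192 * 7.4e14 ≈ 6e18 bytes, and the sum of the
    // series is about 4x the largest layer, around 2.4e19, already past
    // u64::MAX ≈ 1.8e19.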
4233
4234    async fn reopen_after_crypt_failure_inner(read_only: bool) {
4235        let fs = test_filesystem().await;
4236        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4237
4238        let store = {
4239            let crypt = Arc::new(new_insecure_crypt());
4240            let store = root_volume
4241                .new_volume(
4242                    "vol",
4243                    NewChildStoreOptions {
4244                        options: StoreOptions {
4245                            crypt: Some(crypt.clone()),
4246                            ..StoreOptions::default()
4247                        },
4248                        ..Default::default()
4249                    },
4250                )
4251                .await
4252                .expect("new_volume failed");
4253            let root_directory = Directory::open(&store, store.root_directory_object_id())
4254                .await
4255                .expect("open failed");
4256            let mut transaction = fs
4257                .clone()
4258                .new_transaction(
4259                    lock_keys![LockKey::object(
4260                        store.store_object_id(),
4261                        root_directory.object_id()
4262                    )],
4263                    Options::default(),
4264                )
4265                .await
4266                .expect("new_transaction failed");
4267            root_directory
4268                .create_child_file(&mut transaction, "test")
4269                .await
4270                .expect("create_child_file failed");
4271            transaction.commit().await.expect("commit failed");
4272
4273            crypt.shutdown();
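            // After shutdown, the crypt instance should fail further key requests, so the
            // create below is expected to fail; the store can then be locked and later
            // re-unlocked with a fresh crypt instance.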
4274            let mut transaction = fs
4275                .clone()
4276                .new_transaction(
4277                    lock_keys![LockKey::object(
4278                        store.store_object_id(),
4279                        root_directory.object_id()
4280                    )],
4281                    Options::default(),
4282                )
4283                .await
4284                .expect("new_transaction failed");
4285            root_directory
4286                .create_child_file(&mut transaction, "test2")
4287                .await
4288                .map(|_| ())
4289                .expect_err("create_child_file should fail");
4290            store.lock().await.expect("lock failed");
4291            store
4292        };
4293
4294        let crypt = Arc::new(new_insecure_crypt());
4295        if read_only {
4296            store.unlock_read_only(crypt).await.expect("unlock failed");
4297        } else {
4298            store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
4299        }
4300        let root_directory =
4301            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4302        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4303    }
4304
4305    #[fuchsia::test(threads = 10)]
4306    async fn test_reopen_after_crypt_failure() {
4307        reopen_after_crypt_failure_inner(false).await;
4308    }
4309
4310    #[fuchsia::test(threads = 10)]
4311    async fn test_reopen_read_only_after_crypt_failure() {
4312        reopen_after_crypt_failure_inner(true).await;
4313    }
4314
4315    #[fuchsia::test(threads = 10)]
4316    #[should_panic(expected = "Insufficient reservation space")]
4317    #[cfg(debug_assertions)]
4318    async fn large_transaction_causes_panic_in_debug_builds() {
4319        let fs = test_filesystem().await;
4320        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4321        let store = root_volume
4322            .new_volume("vol", NewChildStoreOptions::default())
4323            .await
4324            .expect("new_volume failed");
4325        let root_directory =
4326            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4327        let mut transaction = fs
4328            .clone()
4329            .new_transaction(
4330                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
4331                Options::default(),
4332            )
4333            .await
4334            .expect("transaction");
4335        for i in 0..500 {
4336            root_directory
4337                .create_symlink(&mut transaction, b"link", &format!("{}", i))
4338                .await
4339                .expect("symlink");
4340        }
4341        assert_eq!(transaction.commit().await.expect("commit"), 0);
4342    }
4343
4344    #[fuchsia::test]
4345    async fn test_crypt_failure_does_not_fuse_journal() {
4346        let fs = test_filesystem().await;
4347
4348        struct Owner;
4349        #[async_trait]
4350        impl StoreOwner for Owner {
4351            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
4352                store.lock().await
4353            }
4354        }
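        // `force_lock` gives the journal an escape hatch: if the crypt service for a
        // store disappears while the store still has data to flush, the journal can ask
        // the owner to lock the store instead of treating the failure as fatal.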
4355        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
4356
4357        {
4358            // Create two stores and a record for each store, so the journal will need to flush them
4359            // both later.
4360            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4361            let store1 = root_volume
4362                .new_volume(
4363                    "vol1",
4364                    NewChildStoreOptions {
4365                        options: StoreOptions {
4366                            crypt: Some(Arc::new(new_insecure_crypt())),
4367                            ..StoreOptions::default()
4368                        },
4369                        ..Default::default()
4370                    },
4371                )
4372                .await
4373                .expect("new_volume failed");
4374            let crypt = Arc::new(new_insecure_crypt());
4375            let store2 = root_volume
4376                .new_volume(
4377                    "vol2",
4378                    NewChildStoreOptions {
4379                        options: StoreOptions {
4380                            owner: Arc::downgrade(&owner),
4381                            crypt: Some(crypt.clone()),
4382                        },
4383                        ..Default::default()
4384                    },
4385                )
4386                .await
4387                .expect("new_volume failed");
4388            for store in [&store1, &store2] {
4389                let root_directory = Directory::open(store, store.root_directory_object_id())
4390                    .await
4391                    .expect("open failed");
4392                let mut transaction = fs
4393                    .clone()
4394                    .new_transaction(
4395                        lock_keys![LockKey::object(
4396                            store.store_object_id(),
4397                            root_directory.object_id()
4398                        )],
4399                        Options::default(),
4400                    )
4401                    .await
4402                    .expect("new_transaction failed");
4403                root_directory
4404                    .create_child_file(&mut transaction, "test")
4405                    .await
4406                    .expect("create_child_file failed");
4407                transaction.commit().await.expect("commit failed");
4408            }
4409            // Shut down the crypt instance for store2, and then compact.  Compaction should not
4410            // fail, and the store should become locked.
4411            crypt.shutdown();
4412            fs.journal().compact().await.expect("compact failed");
4413            // The store should now be locked.
4414            assert!(store2.is_locked());
4415        }
4416
4417        // Even though the store wasn't flushed, the mutation to store2 will still be valid as it is
4418        // held in the journal.
4419        fs.close().await.expect("close failed");
4420        let device = fs.take_device().await;
4421        device.reopen(false);
4422        let fs = FxFilesystem::open(device).await.expect("open failed");
4423        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4424
4425        for volume_name in ["vol1", "vol2"] {
4426            let store = root_volume
4427                .volume(
4428                    volume_name,
4429                    StoreOptions {
4430                        crypt: Some(Arc::new(new_insecure_crypt())),
4431                        ..StoreOptions::default()
4432                    },
4433                )
4434                .await
4435                .expect("open volume failed");
4436            let root_directory = Directory::open(&store, store.root_directory_object_id())
4437                .await
4438                .expect("open failed");
4439            assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
4440        }
4441
4442        fs.close().await.expect("close failed");
4443    }
4444
4445    #[fuchsia::test]
4446    async fn test_crypt_failure_during_unlock_race() {
4447        let fs = test_filesystem().await;
4448
4449        struct Owner;
4450        #[async_trait]
4451        impl StoreOwner for Owner {
4452            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
4453                store.lock().await
4454            }
4455        }
4456        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
4457
4458        let store_object_id = {
4459            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4460            let store = root_volume
4461                .new_volume(
4462                    "vol",
4463                    NewChildStoreOptions {
4464                        options: StoreOptions {
4465                            owner: Arc::downgrade(&owner),
4466                            crypt: Some(Arc::new(new_insecure_crypt())),
4467                        },
4468                        ..Default::default()
4469                    },
4470                )
4471                .await
4472                .expect("new_volume failed");
4473            let root_directory = Directory::open(&store, store.root_directory_object_id())
4474                .await
4475                .expect("open failed");
4476            let mut transaction = fs
4477                .clone()
4478                .new_transaction(
4479                    lock_keys![LockKey::object(
4480                        store.store_object_id(),
4481                        root_directory.object_id()
4482                    )],
4483                    Options::default(),
4484                )
4485                .await
4486                .expect("new_transaction failed");
4487            root_directory
4488                .create_child_file(&mut transaction, "test")
4489                .await
4490                .expect("create_child_file failed");
4491            transaction.commit().await.expect("commit failed");
4492            store.store_object_id()
4493        };
4494
4495        fs.close().await.expect("close failed");
4496        let device = fs.take_device().await;
4497        device.reopen(false);
4498
4499        let fs = FxFilesystem::open(device).await.expect("open failed");
4500        {
4501            let fs_clone = fs.clone();
4502            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4503
4504            let crypt = Arc::new(new_insecure_crypt());
4505            let crypt_clone = crypt.clone();
4506            join!(
4507                async move {
4508                    // Unlock might fail, so ignore errors.
4509                    let _ = root_volume
4510                        .volume(
4511                            "vol",
4512                            StoreOptions {
4513                                owner: Arc::downgrade(&owner),
4514                                crypt: Some(crypt_clone),
4515                            },
4516                        )
4517                        .await;
4518                },
4519                async move {
4520                    // Block until the unlock has finished, but before the flush triggered by
4521                    // the unlock completes, to maximize the chances of hitting the race.
4522                    let keys = lock_keys![LockKey::flush(store_object_id)];
4523                    let _ = fs_clone.lock_manager().write_lock(keys).await;
4524                    crypt.shutdown();
4525                }
4526            );
4527        }
4528
4529        fs.close().await.expect("close failed");
4530        let device = fs.take_device().await;
4531        device.reopen(false);
4532
4533        let fs = FxFilesystem::open(device).await.expect("open failed");
4534        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4535        let store = root_volume
4536            .volume(
4537                "vol",
4538                StoreOptions {
4539                    crypt: Some(Arc::new(new_insecure_crypt())),
4540                    ..StoreOptions::default()
4541                },
4542            )
4543            .await
4544            .expect("open volume failed");
4545        let root_directory =
4546            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4547        assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
4548
4549        fs.close().await.expect("close failed");
4550    }
4551
4552    #[fuchsia::test]
4553    async fn test_low_32_bit_object_ids() {
4554        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
4555        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
4556
4557        {
4558            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4559
4560            let store = root_vol
4561                .new_volume(
4562                    "test",
4563                    NewChildStoreOptions { low_32_bit_object_ids: true, ..Default::default() },
4564                )
4565                .await
4566                .expect("new_volume failed");
4567
4568            let root_dir = Directory::open(&store, store.root_directory_object_id())
4569                .await
4570                .expect("open failed");
4571
4572            let mut ids = std::collections::HashSet::new();
4573
4574            for i in 0..100 {
4575                let mut transaction = fs
4576                    .clone()
4577                    .new_transaction(
4578                        lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4579                        Options::default(),
4580                    )
4581                    .await
4582                    .expect("new_transaction failed");
4583
4584                for j in 0..100 {
4585                    let object = root_dir
4586                        .create_child_dir(&mut transaction, &format!("{i}.{j}"))
4587                        .await
4588                        .expect("create_child_dir failed");
4589
4590                    assert!(object.object_id() < 1 << 32);
4591                    assert_ne!(object.object_id(), INVALID_OBJECT_ID);
4592                    assert!(ids.insert(object.object_id()));
4593                }
4594
4595                transaction.commit().await.expect("commit failed");
4596            }
4597
4598            assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4599
4600            fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
4601        }
4602
4603        // Verify persistence
4604        fs.close().await.expect("Close failed");
4605        let device = fs.take_device().await;
4606        device.reopen(false);
4607        let fs = FxFilesystem::open(device).await.expect("open failed");
4608        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4609        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");
4610
4611        // Check that we can still create files and they have low 32-bit IDs.
4612        let root_dir =
4613            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4614        let mut transaction = fs
4615            .clone()
4616            .new_transaction(
4617                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4618                Options::default(),
4619            )
4620            .await
4621            .expect("new_transaction failed");
4622
4623        let object = root_dir
4624            .create_child_file(&mut transaction, "persistence_check")
4625            .await
4626            .expect("create_child_file failed");
4627        assert!(object.object_id() < 1 << 32);
4628
4629        transaction.commit().await.expect("commit failed");
4630
4631        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4632    }
4633}