// fxfs/object_store.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod allocator;
6pub mod caching_object_handle;
7pub mod data_object_handle;
8pub mod directory;
9mod extent_mapping_iterator;
10mod extent_record;
11mod flush;
12pub mod graveyard;
13mod install;
14pub mod journal;
15mod key_manager;
16pub(crate) mod merge;
17pub mod object_manager;
18pub mod object_record;
19pub mod project_id;
20mod store_object_handle;
21pub mod transaction;
22mod tree;
23mod tree_cache;
24pub mod volume;
25
26pub use data_object_handle::{
27    DataObjectHandle, DirectWriter, FileExtent, FsverityState, FsverityStateInner, RangeType,
28};
29pub use directory::Directory;
30pub use object_record::{ChildValue, ObjectDescriptor, PosixAttributes, Timestamp};
31pub use store_object_handle::{
32    EXTENDED_ATTRIBUTE_RANGE_END, EXTENDED_ATTRIBUTE_RANGE_START, SetExtendedAttributeMode,
33    StoreObjectHandle,
34};
35
36use crate::errors::FxfsError;
37use crate::filesystem::{
38    ApplyContext, ApplyMode, FxFilesystem, JournalingObject, MAX_FILE_SIZE, SyncOptions,
39    TruncateGuard, TxnGuard,
40};
41use crate::log::*;
42use crate::lsm_tree::cache::{NullCache, ObjectCache};
43use crate::lsm_tree::types::{Existence, Item, ItemRef, LayerIterator};
44use crate::lsm_tree::{LSMTree, Query};
45use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ObjectProperties, ReadObjectHandle};
46use crate::object_store::allocator::Allocator;
47use crate::object_store::graveyard::Graveyard;
48use crate::object_store::journal::{JournalCheckpoint, JournalCheckpointV32, JournaledTransaction};
49use crate::object_store::key_manager::KeyManager;
50use crate::object_store::transaction::{
51    AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation, Options, Transaction,
52    lock_keys,
53};
54use crate::range::RangeExt;
55use crate::round::round_up;
56use crate::serialized_types::{Version, Versioned, VersionedLatest};
57use anyhow::{Context, Error, anyhow, bail, ensure};
58use async_trait::async_trait;
59use fidl_fuchsia_io as fio;
60use fprint::TypeFingerprint;
61use fuchsia_sync::Mutex;
62use fxfs_crypto::ff1::Ff1;
63use fxfs_crypto::{
64    CipherHolder, Crypt, KeyPurpose, ObjectType, StreamCipher, UnwrappedKey, WrappingKeyId,
65    key_to_cipher,
66};
67use fxfs_macros::{Migrate, migrate_to_version};
68use rand::RngCore;
69use scopeguard::ScopeGuard;
70use serde::{Deserialize, Serialize};
71use std::collections::HashSet;
72use std::fmt;
73use std::num::NonZero;
74use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
75use std::sync::{Arc, OnceLock, Weak};
76use storage_device::Device;
77use uuid::Uuid;
78
79pub use extent_record::{
80    BLOB_MERKLE_ATTRIBUTE_ID, BLOB_METADATA_ATTRIBUTE_ID, DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey,
81    ExtentMode, ExtentValue, FSVERITY_MERKLE_ATTRIBUTE_ID,
82};
83pub use object_record::{
84    AttributeKey, EncryptionKey, EncryptionKeys, ExtendedAttributeValue, FsverityMetadata, FxfsKey,
85    FxfsKeyV40, FxfsKeyV49, ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue,
86    ProjectProperty, RootDigest,
87};
88pub use transaction::Mutation;
89
// For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel
// attacks more difficult. This mask can be used to extract the hi part of the object ID.
const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000;

// At time of writing, this threshold limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;

// Encrypted files and directories use the fscrypt key (identified by `FSCRYPT_KEY_ID`) to encrypt
// file contents and filenames respectively. All non-fscrypt encrypted files otherwise default to
// using the `VOLUME_DATA_KEY_ID` key. Note, the filesystem always uses the `VOLUME_DATA_KEY_ID`
// key to encrypt large extended attributes. Thus, encrypted files and directories with large
// xattrs will have both an fscrypt and volume data key.
pub const VOLUME_DATA_KEY_ID: u64 = 0;
pub const FSCRYPT_KEY_ID: u64 = 1;

/// A constant that can be used where an owner is expected of type `Weak<dyn StoreOwner>` but no
/// owner is required.
pub const NO_OWNER: Weak<()> = Weak::new();
// Trivial impl so `NO_OWNER` can stand in for a `Weak<dyn StoreOwner>`; `()` inherits the default
// `force_lock`, which simply fails.
impl StoreOwner for () {}
109
/// Implemented by types that own an `ObjectStore` and need to participate in store lifecycle
/// events (currently just forced locking).
#[async_trait]
pub trait StoreOwner: Send + Sync {
    /// Forcibly lock the store.  This exists to give the StoreOwner an opportunity to clean up
    /// tasks which might access the store before locking it, because ObjectStore::unlock can only
    /// be called when the store is not in use.  The default implementation fails with
    /// `FxfsError::Internal`.
    async fn force_lock(self: Arc<Self>, _store: &ObjectStore) -> Result<(), Error> {
        Err(anyhow!(FxfsError::Internal))
    }
}
119
/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
/// back to an ObjectStore.
pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}

/// StoreInfo stores information about the object store.  This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
/// This alias always refers to the latest serialized version; older versions below are migrated
/// forward on deserialization.
pub type StoreInfo = StoreInfoV52;
127
/// The current (latest) serialized form of [`StoreInfo`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV52 {
    /// The globally unique identifier for the associated object store. If unset, will be all zero.
    guid: [u8; 16],

    /// The last used object ID.  Note that this field is not accurate in memory; ObjectStore's
    /// last_object_id field is the one to use in that case.  Technically, this might not be the
    /// last object ID used for the latest transaction that created an object because we use this at
    /// the point of creating the object but before we commit the transaction.  Transactions can
    /// then get committed in an arbitrary order (or not at all).
    last_object_id: LastObjectIdInfo,

    /// Object ids for layers.  TODO(https://fxbug.dev/42178036): need a layer of indirection here
    /// so we can support snapshots.
    pub layers: Vec<u64>,

    /// The object ID for the root directory.
    root_directory_object_id: u64,

    /// The object ID for the graveyard.
    graveyard_directory_object_id: u64,

    /// The number of live objects in the store.  This should *not* be trusted; it can be invalid
    /// due to filesystem inconsistencies.
    object_count: u64,

    /// The (wrapped) key that encrypted mutations should use.
    mutations_key: Option<FxfsKeyV49>,

    /// Mutations for the store are encrypted using a stream cipher.  To decrypt the mutations, we
    /// need to know the offset in the cipher stream to start it.
    mutations_cipher_offset: u64,

    /// If we have to flush the store whilst we do not have the key, we need to write the encrypted
    /// mutations to an object. This is the object ID of that file if it exists.
    pub encrypted_mutations_object_id: u64,

    /// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
    /// when the directory doesn't yet exist.
    internal_directory_object_id: u64,
}
169
/// Persisted description of the object-ID allocation algorithm in use for a store.  The in-memory
/// counterpart is `LastObjectId`.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
enum LastObjectIdInfo {
    /// The store is unencrypted; object IDs are allocated sequentially.
    Unencrypted {
        id: u64,
    },
    Encrypted {
        /// The *unencrypted* value of the last object ID.
        id: u64,

        /// Object IDs are encrypted to reduce the amount of information that sequential object IDs
        /// reveal (such as the number of files in the system and the ordering of their creation in
        /// time).  Only the bottom 32 bits of the object ID are encrypted whilst the top 32 bits
        /// will increment after 2^32 object IDs have been used and this allows us to roll the key.
        key: FxfsKeyV49,
    },
    /// Object IDs are restricted to the low 32 bits (see
    /// `NewChildStoreOptions::low_32_bit_object_ids`); no last-ID counter is persisted.
    Low32Bit,
}
187
188impl Default for LastObjectIdInfo {
189    fn default() -> Self {
190        LastObjectIdInfo::Unencrypted { id: 0 }
191    }
192}
193
/// Version 49 of `StoreInfo`.  Superseded by `StoreInfoV52`, which folds `last_object_id` and
/// `object_id_key` into a single `LastObjectIdInfo` enum.
#[derive(Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV49 {
    guid: [u8; 16],
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV49>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    // Present iff the store used encrypted object IDs; becomes `LastObjectIdInfo::Encrypted`.
    object_id_key: Option<FxfsKeyV49>,
    internal_directory_object_id: u64,
}
208
209impl From<StoreInfoV49> for StoreInfoV52 {
210    fn from(value: StoreInfoV49) -> Self {
211        Self {
212            guid: value.guid,
213            last_object_id: if let Some(key) = value.object_id_key {
214                LastObjectIdInfo::Encrypted { id: value.last_object_id, key: key }
215            } else {
216                LastObjectIdInfo::Unencrypted { id: value.last_object_id }
217            },
218            layers: value.layers,
219            root_directory_object_id: value.root_directory_object_id,
220            graveyard_directory_object_id: value.graveyard_directory_object_id,
221            object_count: value.object_count,
222            mutations_key: value.mutations_key,
223            mutations_cipher_offset: value.mutations_cipher_offset,
224            encrypted_mutations_object_id: value.encrypted_mutations_object_id,
225            internal_directory_object_id: value.internal_directory_object_id,
226        }
227    }
228}
229
/// Version 40 of `StoreInfo`.  Migrated to `StoreInfoV49` via the `Migrate` derive; the only
/// difference is the key type (`FxfsKeyV40` vs `FxfsKeyV49`).
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV49)]
pub struct StoreInfoV40 {
    guid: [u8; 16],
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV40>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    object_id_key: Option<FxfsKeyV40>,
    internal_directory_object_id: u64,
}
245
246impl StoreInfo {
247    /// Returns the parent objects for this store.
248    pub fn parent_objects(&self) -> Vec<u64> {
249        // We should not include the ID of the store itself, since that should be referred to in the
250        // volume directory.
251        let mut objects = self.layers.to_vec();
252        if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
253            objects.push(self.encrypted_mutations_object_id);
254        }
255        objects
256    }
257}
258
// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit isn't exceeded.
// It will likely involve placing limits on the maximum number of layers.
// 128 KiB.
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072;

// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
// in the journal but hasn't yet been written to layer files) for a store.  We set a limit because
// we want to limit the amount of memory use in the case the filesystem is corrupt or under attack.
pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;
267
/// Options controlling the behavior of object handles.  The default (all `false`) is appropriate
/// for ordinary data objects.
#[derive(Default)]
pub struct HandleOptions {
    /// If true, transactions used by this handle will skip journal space checks.
    pub skip_journal_checks: bool,
    /// If true, data written to any attribute of this handle will not have per-block checksums
    /// computed.
    pub skip_checksums: bool,
    /// If true, any files using fsverity will not attempt to perform any verification. This is
    /// useful to open an object without the correct encryption keys to look at the metadata.
    pub skip_fsverity: bool,
}
279
/// Parameters for encrypting a newly created object.
pub struct ObjectEncryptionOptions {
    /// If set, the keys are treated as permanent and never evicted from the KeyManager cache.
    /// This is necessary when keys are managed by another store; for example, the layer files
    /// of a child store are objects in the root store, but they are encrypted with keys from the
    /// child store.  Generally, most objects should have this set to `false`.
    pub permanent: bool,
    /// The ID the key is registered under (e.g. `VOLUME_DATA_KEY_ID` or `FSCRYPT_KEY_ID`).
    pub key_id: u64,
    /// The key in its stored (wrapped) form.
    pub key: EncryptionKey,
    /// The unwrapped counterpart of `key` — presumably what is actually used for cipher
    /// operations; confirm against callers.
    pub unwrapped_key: UnwrappedKey,
}
291
/// Options common to all stores; see `NewChildStoreOptions` for creation-time extras.
pub struct StoreOptions {
    /// The owner of the store.
    pub owner: Weak<dyn StoreOwner>,

    /// The store is unencrypted if `crypt` is `None`.
    pub crypt: Option<Arc<dyn Crypt>>,
}
299
300impl Default for StoreOptions {
301    fn default() -> Self {
302        Self { owner: NO_OWNER, crypt: None }
303    }
304}
305
/// Options for `ObjectStore::new_child_store`.
#[derive(Default)]
pub struct NewChildStoreOptions {
    /// The base store options (owner and optional crypt).
    pub options: StoreOptions,

    /// Specifies the object ID in the root store to be used for the store.  If set to
    /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
    pub object_id: u64,

    /// If true, reserve all 32 bit object_ids.  All new objects will start with IDs exceeding
    /// 0x1_0000_0000.
    pub reserve_32bit_object_ids: bool,

    /// Object IDs will be restricted to 32 bits.  This involves a less performant algorithm and so
    /// should not be used unless necessary.  Mutually exclusive with `reserve_32bit_object_ids`.
    pub low_32_bit_object_ids: bool,

    /// If set, use this GUID for the new store.
    pub guid: Option<[u8; 16]>,
}
325
/// Mutations for an encrypted store that were journaled while the store's key was unavailable,
/// kept in their encrypted form until the store is unlocked.
pub type EncryptedMutations = EncryptedMutationsV49;

#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV49 {
    // Information about the mutations are held here, but the actual encrypted data is held within
    // data.  For each transaction, we record the checkpoint and the count of mutations within the
    // transaction.  The checkpoint is required for the log file offset (which we need to apply the
    // mutations), and the version so that we can correctly decode the mutation after it has been
    // decrypted. The count specifies the number of serialized mutations encoded in |data|.
    transactions: Vec<(JournalCheckpointV32, u64)>,

    // The encrypted mutations.
    data: Vec<u8>,

    // If the mutations key was rolled, this holds the offset in `data` where the new key should
    // apply.
    mutations_key_roll: Vec<(usize, FxfsKeyV49)>,
}
344
345impl std::fmt::Debug for EncryptedMutations {
346    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
347        f.debug_struct("EncryptedMutations")
348            .field("transactions", &self.transactions)
349            .field("len", &self.data.len())
350            .field(
351                "mutations_key_roll",
352                &self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
353            )
354            .finish()
355    }
356}
357
impl Versioned for EncryptedMutations {
    // Bound the serialized size so a corrupt or malicious image cannot force unbounded memory use;
    // see `MAX_ENCRYPTED_MUTATIONS_SIZE`.
    fn max_serialized_size() -> Option<u64> {
        Some(MAX_ENCRYPTED_MUTATIONS_SIZE as u64)
    }
}
363
364impl From<EncryptedMutationsV40> for EncryptedMutationsV49 {
365    fn from(value: EncryptedMutationsV40) -> Self {
366        EncryptedMutationsV49 {
367            transactions: value.transactions,
368            data: value.data,
369            mutations_key_roll: value
370                .mutations_key_roll
371                .into_iter()
372                .map(|(offset, key)| (offset, key.into()))
373                .collect(),
374        }
375    }
376}
377
/// Version 40 of `EncryptedMutations`; differs from V49 only in the key type used for
/// `mutations_key_roll`.
#[derive(Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV40 {
    transactions: Vec<(JournalCheckpointV32, u64)>,
    data: Vec<u8>,
    mutations_key_roll: Vec<(usize, FxfsKeyV40)>,
}

impl Versioned for EncryptedMutationsV40 {
    // Same size bound as the latest version; see `MAX_ENCRYPTED_MUTATIONS_SIZE`.
    fn max_serialized_size() -> Option<u64> {
        Some(MAX_ENCRYPTED_MUTATIONS_SIZE as u64)
    }
}
390
impl EncryptedMutations {
    /// Reconstructs the encrypted mutations for `store_object_id` from transactions replayed from
    /// the journal.  Mutations belonging to other stores are ignored.
    fn from_replayed_mutations(
        store_object_id: u64,
        transactions: Vec<JournaledTransaction>,
    ) -> Self {
        let mut this = Self::default();
        for JournaledTransaction { checkpoint, non_root_mutations, .. } in transactions {
            for (object_id, mutation) in non_root_mutations {
                if store_object_id == object_id {
                    if let Mutation::EncryptedObjectStore(data) = mutation {
                        this.push(&checkpoint, data);
                    } else if let Mutation::UpdateMutationsKey(key) = mutation {
                        // A key roll takes effect from the current end of `data` onwards.
                        this.mutations_key_roll.push((this.data.len(), key.into()));
                    }
                }
            }
        }
        this
    }

    /// Appends the contents of `other`.  Key-roll offsets in `other` are relative to the start of
    /// `other.data`, so they are rebased by the current length of `self.data` — which is why the
    /// roll entries must be extended *before* `data` is.
    fn extend(&mut self, other: &EncryptedMutations) {
        self.transactions.extend_from_slice(&other.transactions[..]);
        self.mutations_key_roll.extend(
            other
                .mutations_key_roll
                .iter()
                .map(|(offset, key)| (offset + self.data.len(), key.clone())),
        );
        self.data.extend_from_slice(&other.data[..]);
    }

    /// Appends a single encrypted mutation's `data`, recording `checkpoint` so the mutation can be
    /// decoded and applied after decryption.
    fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
        self.data.append(&mut data.into());
        // If the checkpoint is the same as the last mutation we pushed, increment the count.
        if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
            if last_checkpoint.file_offset == checkpoint.file_offset {
                *count += 1;
                return;
            }
        }
        self.transactions.push((checkpoint.clone(), 1));
    }
}
434
/// The lock/crypt state of a store; see the `lock_state` field of `ObjectStore`.
pub enum LockState {
    // The store is encrypted and its keys are not currently available.
    Locked,

    // The store is not encrypted, so there is nothing to lock.
    Unencrypted,

    // The store is unlocked: `crypt` supplies the keys and `owner` (if still alive) is given the
    // chance to clean up before a forced lock (see `StoreOwner::force_lock`).
    Unlocked { owner: Weak<dyn StoreOwner>, crypt: Arc<dyn Crypt> },

    // The store is unlocked, but in a read-only state, and no flushes or other operations will be
    // performed on the store.
    UnlockedReadOnly(Arc<dyn Crypt>),

    // The store is encrypted but is now in an unusable state (due to a failure to sync the journal
    // after locking the store).  The store cannot be unlocked.
    Invalid,

    // Before we've read the StoreInfo we might not know whether the store is Locked or Unencrypted.
    // This can happen when lazily opening stores (ObjectManager::lazy_open_store).
    Unknown,

    // The store is in the process of being locked.  Whilst the store is being locked, the store
    // isn't usable; assertions will trip if any mutations are applied.
    Locking,

    // Whilst we're unlocking, we will replay encrypted mutations.  The store isn't usable until
    // it's in the Unlocked state.
    Unlocking,

    // The store has been deleted.
    Deleted,
}
463
464impl LockState {
465    fn owner(&self) -> Option<Arc<dyn StoreOwner>> {
466        if let Self::Unlocked { owner, .. } = self { owner.upgrade() } else { None }
467    }
468}
469
470impl fmt::Debug for LockState {
471    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
472        formatter.write_str(match self {
473            LockState::Locked => "Locked",
474            LockState::Unencrypted => "Unencrypted",
475            LockState::Unlocked { .. } => "Unlocked",
476            LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
477            LockState::Invalid => "Invalid",
478            LockState::Unknown => "Unknown",
479            LockState::Locking => "Locking",
480            LockState::Unlocking => "Unlocking",
481            LockState::Deleted => "Deleted",
482        })
483    }
484}
485
/// In-memory state for allocating new object IDs; the persisted counterpart is
/// `LastObjectIdInfo`.
enum LastObjectId {
    // This is used when the store is encrypted, but the key and ID isn't yet available.
    Pending,

    Unencrypted {
        // The last allocated object ID; IDs are handed out sequentially.
        id: u64,
    },

    Encrypted {
        // The *unencrypted* value of the last object ID.
        id: u64,

        // Encrypted stores will use a cipher to obfuscate the object ID.
        cipher: Box<Ff1>,
    },

    Low32Bit {
        // IDs handed out (via `ReservedId`) that might not yet be visible in the tree.
        reserved: HashSet<u32>,
        // IDs returned by `unreserve` pending removal from `reserved`; see `drain_unreserved` for
        // why removal is deferred.
        unreserved: Vec<u32>,
    },
}
507
impl LastObjectId {
    /// Tries to get the next object ID.  Returns None if a new cipher is required because all
    /// object IDs that can be generated with the current cipher have been exhausted, or if only
    /// using the lower 32 bits which requires an async algorithm.
    fn try_get_next(&mut self) -> Option<NonZero<u64>> {
        match self {
            LastObjectId::Unencrypted { id } => {
                // `wrapping_add` means exhaustion (wrap to 0) yields None instead of panicking.
                NonZero::new(id.wrapping_add(1)).inspect(|next| *id = next.get())
            }
            LastObjectId::Encrypted { id, cipher } => {
                let mut next = *id;
                // Only the low 32 bits are encrypted; the high 32 bits pass through unchanged.
                let hi = next & OBJECT_ID_HI_MASK;
                loop {
                    if next as u32 == u32::MAX {
                        // The 32-bit space under this cipher is exhausted; a new cipher is needed.
                        return None;
                    }
                    next += 1;
                    let candidate = hi | cipher.encrypt(next as u32) as u64;
                    // Skip a candidate of zero, which cannot be represented as NonZero.
                    if let Some(candidate) = NonZero::new(candidate) {
                        *id = next;
                        return Some(candidate);
                    }
                }
            }
            // `Pending` and `Low32Bit` cannot generate IDs synchronously.
            _ => None,
        }
    }

    /// Returns INVALID_OBJECT_ID if it's not possible to peek at the next object ID.
    // NOTE(review): mirrors `try_get_next`, but skips `INVALID_OBJECT_ID` where `try_get_next`
    // skips zero — presumably INVALID_OBJECT_ID == 0 so the two agree; confirm.
    fn peek_next(&self) -> u64 {
        match self {
            LastObjectId::Unencrypted { id } => id.wrapping_add(1),
            LastObjectId::Encrypted { id, cipher } => {
                let mut next = *id;
                let hi = next & OBJECT_ID_HI_MASK;
                loop {
                    if next as u32 == u32::MAX {
                        return INVALID_OBJECT_ID;
                    }
                    next += 1;
                    let candidate = hi | cipher.encrypt(next as u32) as u64;
                    if candidate != INVALID_OBJECT_ID {
                        return candidate;
                    }
                }
            }
            _ => INVALID_OBJECT_ID,
        }
    }

    /// Returns INVALID_OBJECT_ID for algorithms that don't use the last ID.
    fn id(&self) -> u64 {
        match self {
            LastObjectId::Unencrypted { id } | LastObjectId::Encrypted { id, .. } => *id,
            _ => INVALID_OBJECT_ID,
        }
    }

    /// Returns true if `id` is reserved (it must be 32 bits).
    fn is_reserved(&self, id: u64) -> bool {
        match self {
            LastObjectId::Low32Bit { reserved, .. } => {
                // IDs that don't fit in 32 bits can never be in the reserved set.
                if let Ok(id) = id.try_into() {
                    reserved.contains(&id)
                } else {
                    false
                }
            }
            _ => false,
        }
    }

    /// Reserves `id`.  Only valid for `Low32Bit`; panics if `id` is already reserved or doesn't
    /// fit in 32 bits.
    fn reserve(&mut self, id: u64) {
        match self {
            LastObjectId::Low32Bit { reserved, .. } => {
                assert!(reserved.insert(id.try_into().unwrap()))
            }
            _ => unreachable!(),
        }
    }

    /// Unreserves `id`.  A no-op for algorithms other than `Low32Bit`.
    fn unreserve(&mut self, id: u64) {
        match self {
            LastObjectId::Low32Bit { unreserved, .. } => {
                // To avoid races, where a reserved ID transitions from being reserved to being
                // actually used in a committed transaction, we delay updating `reserved` until a
                // suitable point.
                //
                // On thread A, we might have:
                //
                //   A1. Commit transaction (insert a record into the LSM tree that uses ID)
                //   A2. `unreserve`
                //
                // And on another thread B, we might have:
                //
                //   B1. Drain `unreserved`.
                //   B2. Check tree and `reserved` to see if ID is used.
                //
                // B2 will involve calling `LsmTree::layer_set` which should be thought of as a
                // snapshot, so the change A1 might not be visible to thread B, but it won't matter
                // because `reserved` will still include the ID.  So long as each thread does the
                // operations in this order, it should be safe.
                unreserved.push(id.try_into().unwrap())
            }
            _ => {}
        }
    }

    /// Removes `unreserved` IDs from the `reserved` list.
    fn drain_unreserved(&mut self) {
        match self {
            LastObjectId::Low32Bit { reserved, unreserved } => {
                for u in unreserved.drain(..) {
                    assert!(reserved.remove(&u));
                }
            }
            _ => {}
        }
    }
}
630
/// An object ID reserved from an `ObjectStore`.  On drop, the reservation is returned to the store
/// (via `unreserve`) unless the caller takes ownership of the ID with `release`.
pub struct ReservedId<'a>(&'a ObjectStore, NonZero<u64>);
632
633impl<'a> ReservedId<'a> {
634    fn new(store: &'a ObjectStore, id: NonZero<u64>) -> Self {
635        Self(store, id)
636    }
637
638    pub fn get(&self) -> u64 {
639        self.1.get()
640    }
641
642    /// The caller takes responsibility for this id.
643    #[must_use]
644    pub fn release(self) -> u64 {
645        let id = self.1.get();
646        std::mem::forget(self);
647        id
648    }
649}
650
impl Drop for ReservedId<'_> {
    fn drop(&mut self) {
        // Return the reservation to the store.  Callers that actually use the ID call `release`,
        // which prevents this drop from running.
        self.0.last_object_id.lock().unreserve(self.1.get());
    }
}
656
/// An object store supports a file like interface for objects.  Objects are keyed by a 64 bit
/// identifier.  An object store has to be backed by a parent object store (which stores metadata
/// for the object store).  The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
pub struct ObjectStore {
    // The store holding this store's metadata, or None for the root parent store.
    parent_store: Option<Arc<ObjectStore>>,
    // This store's own object ID.
    store_object_id: u64,
    // Cached from the filesystem at construction time.
    device: Arc<dyn Device>,
    block_size: u64,
    // Weak reference back to the filesystem (weak to break the cycle; see `new_root_parent`).
    filesystem: Weak<FxFilesystem>,
    // Lock ordering: This must be taken before `lock_state`.
    store_info: Mutex<Option<StoreInfo>>,
    // The LSM tree holding this store's records.
    tree: LSMTree<ObjectKey, ObjectValue>,

    // When replaying the journal, the store cannot read StoreInfo until the whole journal
    // has been replayed, so during that time, store_info_handle will be None and records
    // just get sent to the tree. Once the journal has been replayed, we can open the store
    // and load all the other layer information.
    store_info_handle: OnceLock<DataObjectHandle<ObjectStore>>,

    // The cipher to use for encrypted mutations, if this store is encrypted.
    mutations_cipher: Mutex<Option<StreamCipher>>,

    // Current lock state of the store.
    // Lock ordering: This must be taken after `store_info`.
    lock_state: Mutex<LockState>,
    pub key_manager: KeyManager,

    // Enable/disable tracing.
    trace: AtomicBool,

    // Informational counters for events occurring within the store.
    counters: Mutex<ObjectStoreCounters>,

    // These are updated in performance-sensitive code paths so we use atomics instead of counters.
    device_read_ops: AtomicU64,
    device_write_ops: AtomicU64,
    logical_read_ops: AtomicU64,
    logical_write_ops: AtomicU64,

    // Contains the last object ID and, optionally, a cipher to be used when generating new object
    // IDs.
    last_object_id: Mutex<LastObjectId>,

    // An optional callback to be invoked each time the ObjectStore flushes.  The callback is
    // invoked at the end of flush, while the write lock is still held.
    flush_callback: Mutex<Option<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>>,
}
705
/// Informational counters for events occurring within the store; see the `counters` field of
/// `ObjectStore`.
#[derive(Clone, Default)]
struct ObjectStoreCounters {
    // Mutations applied to this store.
    mutations_applied: u64,
    // Mutations that were dropped rather than applied — presumably during replay; confirm where
    // the counter is incremented.
    mutations_dropped: u64,
    // Number of times the store has been flushed.
    num_flushes: u64,
    // Wall-clock time of the most recent flush, if any.
    last_flush_time: Option<std::time::SystemTime>,
}
713
714impl ObjectStore {
    /// Common constructor used by the more specific constructors below.  Callers supply the
    /// store-specific state (info, cache, cipher, lock state, last object ID); all bookkeeping
    /// fields (counters, key manager, atomics) start out fresh.  The device and block size are
    /// taken from `filesystem`, which is only held weakly.
    fn new(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        store_info: Option<StoreInfo>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
        mutations_cipher: Option<StreamCipher>,
        lock_state: LockState,
        last_object_id: LastObjectId,
    ) -> Arc<ObjectStore> {
        let device = filesystem.device();
        let block_size = filesystem.block_size();
        Arc::new(ObjectStore {
            parent_store,
            store_object_id,
            device,
            block_size,
            filesystem: Arc::downgrade(&filesystem),
            store_info: Mutex::new(store_info),
            tree: LSMTree::new(merge::merge, object_cache),
            store_info_handle: OnceLock::new(),
            mutations_cipher: Mutex::new(mutations_cipher),
            lock_state: Mutex::new(lock_state),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(last_object_id),
            flush_callback: Mutex::new(None),
        })
    }
749
    /// Creates a new, empty, unencrypted store with a default `StoreInfo` and sequential object
    /// IDs starting from zero.
    fn new_empty(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Arc<Self> {
        Self::new(
            parent_store,
            store_object_id,
            filesystem,
            Some(StoreInfo::default()),
            object_cache,
            None,
            LockState::Unencrypted,
            LastObjectId::Unencrypted { id: 0 },
        )
    }
767
    /// Cycle breaker constructor that returns an ObjectStore without a filesystem.
    /// This should only be used from super block code.  Use `attach_filesystem` to populate the
    /// filesystem reference once the filesystem exists.
    pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
        ObjectStore {
            parent_store: None,
            store_object_id,
            device,
            block_size,
            // Dangling until `attach_filesystem` is called.
            filesystem: Weak::<FxFilesystem>::new(),
            store_info: Mutex::new(Some(StoreInfo::default())),
            tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
            store_info_handle: OnceLock::new(),
            mutations_cipher: Mutex::new(None),
            lock_state: Mutex::new(LockState::Unencrypted),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(LastObjectId::Unencrypted { id: 0 }),
            flush_callback: Mutex::new(None),
        }
    }
793
    /// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has
    /// been created.  Takes the store by value and returns it with the (weak) filesystem
    /// reference populated.
    pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
        this.filesystem = Arc::downgrade(&filesystem);
        this
    }
800
    /// Create a child store. It is a multi-step process:
    ///
    ///   1. Call `ObjectStore::new_child_store`.
    ///   2. Register the store with the object-manager.
    ///   3. Call `ObjectStore::create` to write the store-info.
    ///
    /// If the procedure fails, care must be taken to unregister store with the object-manager.
    ///
    /// The steps have to be separate because of lifetime issues when working with a transaction.
    async fn new_child_store(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        options: NewChildStoreOptions,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Result<Arc<Self>, Error> {
        // The two object-ID allocation schemes are mutually exclusive.
        ensure!(
            !options.reserve_32bit_object_ids || !options.low_32_bit_object_ids,
            FxfsError::InvalidArgs
        );
        // Create (in the parent store, i.e. `self`) the object that will hold the child's
        // store-info, at a caller-chosen object ID if one was supplied (non-zero).
        let handle = if let Some(object_id) = NonZero::new(options.object_id) {
            self.update_last_object_id(object_id.get());
            let handle = ObjectStore::create_object_with_id(
                self,
                transaction,
                ReservedId::new(self, object_id),
                HandleOptions::default(),
                None,
            )?;
            handle
        } else {
            ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await?
        };
        let filesystem = self.filesystem();
        // When reserving the 32-bit ID space, object-ID allocation starts at 2^32.
        let id = if options.reserve_32bit_object_ids { 0x1_0000_0000 } else { 0 };
        // Build both the persisted (`LastObjectIdInfo`) and in-memory (`LastObjectId`)
        // representation of the new store's object-ID allocator state.
        let (last_object_id, last_object_id_in_memory) = if options.low_32_bit_object_ids {
            (
                LastObjectIdInfo::Low32Bit,
                LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() },
            )
        } else if let Some(crypt) = &options.options.crypt {
            // Encrypted stores obfuscate object IDs with an FF1 cipher keyed by a metadata key.
            let (object_id_wrapped, object_id_unwrapped) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            (
                LastObjectIdInfo::Encrypted { id, key: object_id_wrapped },
                LastObjectId::Encrypted { id, cipher: Box::new(Ff1::new(&object_id_unwrapped)) },
            )
        } else {
            (LastObjectIdInfo::Unencrypted { id }, LastObjectId::Unencrypted { id })
        };
        let store = if let Some(crypt) = options.options.crypt {
            // A second metadata key encrypts the store's journaled mutations (stream cipher).
            let (wrapped_key, unwrapped_key) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    mutations_key: Some(wrapped_key),
                    last_object_id,
                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
                    ..Default::default()
                }),
                object_cache,
                Some(StreamCipher::new(&unwrapped_key, 0)),
                LockState::Unlocked { owner: options.options.owner, crypt },
                last_object_id_in_memory,
            )
        } else {
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    last_object_id,
                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
                    ..Default::default()
                }),
                object_cache,
                None,
                LockState::Unencrypted,
                last_object_id_in_memory,
            )
        };
        // The handle slot must never have been set on a freshly constructed store.
        assert!(store.store_info_handle.set(handle).is_ok());
        Ok(store)
    }
887
    /// Actually creates the store in a transaction.  This will also create a root directory and
    /// graveyard directory for the store.  See `new_child_store` above.
    async fn create<'a>(
        self: &'a Arc<Self>,
        transaction: &mut Transaction<'a>,
    ) -> Result<(), Error> {
        let buf = {
            // Create a root directory and graveyard directory.
            let graveyard_directory_object_id = Graveyard::create(transaction, &self).await?;
            let root_directory = Directory::create(transaction, &self, None).await?;

            // Record the new directories in the in-memory store-info and serialize it.  This
            // mutation of store-info happens regardless of whether we end up writing it below.
            let serialized_info = {
                let mut store_info = self.store_info.lock();
                let store_info = store_info.as_mut().unwrap();

                store_info.graveyard_directory_object_id = graveyard_directory_object_id;
                store_info.root_directory_object_id = root_directory.object_id();

                let mut serialized_info = Vec::new();
                store_info.serialize_with_version(&mut serialized_info)?;
                serialized_info
            };
            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
            buf
        };

        if self.filesystem().options().image_builder_mode.is_some() {
            // If we're in image builder mode, we want to avoid writing to disk unless explicitly
            // asked to. New object stores will have their StoreInfo written when we compact in
            // FxFilesystem::finalize().
            Ok(())
        } else {
            self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
        }
    }
924
925    pub fn set_trace(&self, trace: bool) {
926        let old_value = self.trace.swap(trace, Ordering::Relaxed);
927        if trace != old_value {
928            info!(store_id = self.store_object_id(), trace; "OS: trace",);
929        }
930    }
931
932    /// Sets a callback to be invoked each time the ObjectStore flushes.  The callback is invoked at
933    /// the end of flush, while the write lock is still held.
934    pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
935        let mut flush_callback = self.flush_callback.lock();
936        *flush_callback = Some(Box::new(callback));
937    }
938
939    pub fn is_root(&self) -> bool {
940        if let Some(parent) = &self.parent_store {
941            parent.parent_store.is_none()
942        } else {
943            // The root parent store isn't the root store.
944            false
945        }
946    }
947
    /// Populates an inspect node with store statistics.
    pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
        let counters = self.counters.lock();
        // The GUID is only available when the store-info is loaded.
        if let Some(store_info) = self.store_info() {
            root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
        };
        root.record_uint("store_object_id", self.store_object_id);
        root.record_uint("mutations_applied", counters.mutations_applied);
        root.record_uint("mutations_dropped", counters.mutations_dropped);
        root.record_uint("num_flushes", counters.num_flushes);
        if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
            root.record_uint(
                "last_flush_time_ms",
                // Clamp pre-epoch timestamps to zero, and fall back to 0 if the millisecond
                // count doesn't fit in a u64.
                last_flush_time
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or(std::time::Duration::ZERO)
                    .as_millis()
                    .try_into()
                    .unwrap_or(0u64),
            );
        }
        root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
        root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
        root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
        root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));
        {
            // Only the high 32 bits of the last object ID are recorded, along with whether the
            // low-32-bit allocation scheme is in use.
            let last_object_id = self.last_object_id.lock();
            root.record_uint("object_id_hi", last_object_id.id() >> 32);
            root.record_bool(
                "low_32_bit_object_ids",
                matches!(&*last_object_id, LastObjectId::Low32Bit { .. }),
            );
        }

        // Recording the LSM tree is deferred via a lazy child node; the closure captures a strong
        // reference to the store.
        let this = self.clone();
        root.record_child("lsm_tree", move |node| this.tree().record_inspect_data(node));
    }
986
    /// Returns the block device backing this store.
    pub fn device(&self) -> &Arc<dyn Device> {
        &self.device
    }
990
    /// Returns the block size used by this store.
    pub fn block_size(&self) -> u64 {
        self.block_size
    }
994
    /// Returns the filesystem this store belongs to.
    ///
    /// # Panics
    ///
    /// Panics if the filesystem has been dropped (the store only holds a weak reference) or was
    /// never attached (see `attach_filesystem`).
    pub fn filesystem(&self) -> Arc<FxFilesystem> {
        self.filesystem.upgrade().unwrap()
    }
998
    /// Returns this store's object ID.
    pub fn store_object_id(&self) -> u64 {
        self.store_object_id
    }
1002
    /// Returns the LSM tree holding this store's object records.
    pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
        &self.tree
    }
1006
    /// Returns the object ID of the store's root directory.
    ///
    /// # Panics
    ///
    /// Panics if the store-info has not been loaded.
    pub fn root_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().root_directory_object_id
    }
1010
    /// Returns the store's GUID.
    ///
    /// # Panics
    ///
    /// Panics if the store-info has not been loaded.
    pub fn guid(&self) -> [u8; 16] {
        self.store_info.lock().as_ref().unwrap().guid
    }
1014
    /// Returns the object ID of the store's graveyard directory.
    ///
    /// # Panics
    ///
    /// Panics if the store-info has not been loaded.
    pub fn graveyard_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().graveyard_directory_object_id
    }
1018
    /// Records the graveyard directory's object ID in the store-info.
    ///
    /// Asserts that the previous value was `INVALID_OBJECT_ID`, i.e. this may only be set once.
    fn set_graveyard_directory_object_id(&self, oid: u64) {
        assert_eq!(
            std::mem::replace(
                &mut self.store_info.lock().as_mut().unwrap().graveyard_directory_object_id,
                oid
            ),
            INVALID_OBJECT_ID
        );
    }
1028
    /// Returns the number of objects in the store, as recorded in the store-info.
    ///
    /// # Panics
    ///
    /// Panics if the store-info has not been loaded.
    pub fn object_count(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().object_count
    }
1032
    /// Returns INVALID_OBJECT_ID for algorithms that don't use the last ID.
    /// The value is read from the in-memory `LastObjectId` state.
    pub(crate) fn unencrypted_last_object_id(&self) -> u64 {
        self.last_object_id.lock().id()
    }
1037
    /// Returns the store's key manager.
    pub fn key_manager(&self) -> &KeyManager {
        &self.key_manager
    }
1041
    /// Returns the parent store, or None for the root parent store.
    pub fn parent_store(&self) -> Option<&Arc<ObjectStore>> {
        self.parent_store.as_ref()
    }
1045
1046    /// Returns the crypt object for the store.  Returns None if the store is unencrypted.
1047    pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
1048        match &*self.lock_state.lock() {
1049            LockState::Locked => panic!("Store is locked"),
1050            LockState::Invalid
1051            | LockState::Unencrypted
1052            | LockState::Locking
1053            | LockState::Unlocking
1054            | LockState::Deleted => None,
1055            LockState::Unlocked { crypt, .. } => Some(crypt.clone()),
1056            LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
1057            LockState::Unknown => {
1058                panic!("Store is of unknown lock state; has the journal been replayed yet?")
1059            }
1060        }
1061    }
1062
    /// Returns the object ID of the store's internal directory, creating the directory (in its own
    /// transaction) if it does not yet exist.
    pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
        // Create the transaction first to use the object store lock.
        let mut transaction = self
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::InternalDirectory { store_object_id: self.store_object_id }],
                Options::default(),
            )
            .await?;
        // Check while holding the lock so that concurrent callers serialize; if the directory
        // already exists, there is nothing to commit.
        let obj_id = self.store_info.lock().as_ref().unwrap().internal_directory_object_id;
        if obj_id != INVALID_OBJECT_ID {
            return Ok(obj_id);
        }

        // Need to create an internal directory.
        let directory = Directory::create(&mut transaction, self, None).await?;

        // The CreateInternalDir mutation records the new directory's ID in the store.
        transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
        transaction.commit().await?;
        Ok(directory.object_id())
    }
1084
1085    /// Returns the file size for the object without opening the object.
1086    async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
1087        let item = self
1088            .tree
1089            .find(&ObjectKey::attribute(
1090                object_id,
1091                DEFAULT_DATA_ATTRIBUTE_ID,
1092                AttributeKey::Attribute,
1093            ))
1094            .await?
1095            .ok_or(FxfsError::NotFound)?;
1096        if let ObjectValue::Attribute { size, .. } = item.value {
1097            Ok(size)
1098        } else {
1099            bail!(FxfsError::NotFile);
1100        }
1101    }
1102
    /// Returns the current last object ID (migration tooling only).
    #[cfg(feature = "migration")]
    pub fn last_object_id(&self) -> u64 {
        self.last_object_id.lock().id()
    }
1107
    /// Provides access to the allocator to mark a specific region of the device as allocated.
    ///
    /// `store_object_id` is the store to which the allocation is attributed and `device_range` is
    /// the device range to mark allocated; both are forwarded to `Allocator::mark_allocated`.
    #[cfg(feature = "migration")]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        store_object_id: u64,
        device_range: std::ops::Range<u64>,
    ) -> Result<(), Error> {
        self.allocator().mark_allocated(transaction, store_object_id, device_range)
    }
1118
1119    /// `crypt` can be provided if the crypt service should be different to the default; see the
1120    /// comment on create_object.  Users should avoid having more than one handle open for the same
1121    /// object at the same time because they might get out-of-sync; there is no code that will
1122    /// prevent this.  One example where this can cause an issue is if the object ends up using a
1123    /// permanent key (which is the case if a value is passed for `crypt`), the permanent key is
1124    /// dropped when a handle is dropped, which will impact any other handles for the same object.
1125    pub async fn open_object<S: HandleOwner>(
1126        owner: &Arc<S>,
1127        obj_id: u64,
1128        options: HandleOptions,
1129        crypt: Option<Arc<dyn Crypt>>,
1130    ) -> Result<DataObjectHandle<S>, Error> {
1131        let store = owner.as_ref().as_ref();
1132        let mut fsverity_descriptor = None;
1133        let mut overwrite_ranges = Vec::new();
1134        let item = store
1135            .tree
1136            .find(&ObjectKey::attribute(obj_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute))
1137            .await?
1138            .ok_or(FxfsError::NotFound)?;
1139
1140        let (size, track_overwrite_extents) = match item.value {
1141            ObjectValue::Attribute { size, has_overwrite_extents } => (size, has_overwrite_extents),
1142            ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
1143                if !options.skip_fsverity {
1144                    fsverity_descriptor = Some(fsverity_metadata);
1145                }
1146                // We only track the overwrite extents in memory for writes, reads handle them
1147                // implicitly, which means verified files (where the data won't change anymore)
1148                // don't need to track them.
1149                (size, false)
1150            }
1151            _ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attibute")),
1152        };
1153
1154        ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);
1155
1156        if track_overwrite_extents {
1157            let layer_set = store.tree.layer_set();
1158            let mut merger = layer_set.merger();
1159            let mut iter = merger
1160                .query(Query::FullRange(&ObjectKey::attribute(
1161                    obj_id,
1162                    DEFAULT_DATA_ATTRIBUTE_ID,
1163                    AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
1164                )))
1165                .await?;
1166            loop {
1167                match iter.get() {
1168                    Some(ItemRef {
1169                        key:
1170                            ObjectKey {
1171                                object_id,
1172                                data:
1173                                    ObjectKeyData::Attribute(
1174                                        attribute_id,
1175                                        AttributeKey::Extent(ExtentKey { range }),
1176                                    ),
1177                            },
1178                        value,
1179                        ..
1180                    }) if *object_id == obj_id && *attribute_id == DEFAULT_DATA_ATTRIBUTE_ID => {
1181                        match value {
1182                            ObjectValue::Extent(ExtentValue::None)
1183                            | ObjectValue::Extent(ExtentValue::Some {
1184                                mode: ExtentMode::Raw,
1185                                ..
1186                            })
1187                            | ObjectValue::Extent(ExtentValue::Some {
1188                                mode: ExtentMode::Cow(_),
1189                                ..
1190                            }) => (),
1191                            ObjectValue::Extent(ExtentValue::Some {
1192                                mode: ExtentMode::OverwritePartial(_),
1193                                ..
1194                            })
1195                            | ObjectValue::Extent(ExtentValue::Some {
1196                                mode: ExtentMode::Overwrite,
1197                                ..
1198                            }) => overwrite_ranges.push(range.clone()),
1199                            _ => bail!(
1200                                anyhow!(FxfsError::Inconsistent)
1201                                    .context("open_object: Expected extent")
1202                            ),
1203                        }
1204                        iter.advance().await?;
1205                    }
1206                    _ => break,
1207                }
1208            }
1209        }
1210
1211        // If a crypt service has been specified, it needs to be a permanent key because cached
1212        // keys can only use the store's crypt service.
1213        let permanent = if let Some(crypt) = crypt {
1214            store
1215                .key_manager
1216                .get_keys(
1217                    obj_id,
1218                    crypt.as_ref(),
1219                    &mut Some(async || store.get_keys(obj_id).await),
1220                    /* permanent= */ true,
1221                    /* force= */ false,
1222                )
1223                .await?;
1224            true
1225        } else {
1226            false
1227        };
1228        let data_object_handle = DataObjectHandle::new(
1229            owner.clone(),
1230            obj_id,
1231            permanent,
1232            DEFAULT_DATA_ATTRIBUTE_ID,
1233            size,
1234            FsverityState::None,
1235            options,
1236            false,
1237            &overwrite_ranges,
1238        );
1239        if let Some(descriptor) = fsverity_descriptor {
1240            data_object_handle
1241                .set_fsverity_state_some(descriptor)
1242                .await
1243                .context("Invalid or mismatched merkle tree")?;
1244        }
1245        Ok(data_object_handle)
1246    }
1247
    /// Adds the mutations that create a new, empty file object at the given reserved object ID and
    /// returns a handle to it.
    ///
    /// If `encryption_options` is provided, the object's wrapped key is recorded and the unwrapped
    /// cipher is inserted into the key manager (permanently, if `permanent` is set — see the
    /// comment on `open_object` regarding permanent keys).
    pub fn create_object_with_id<S: HandleOwner>(
        owner: &Arc<S>,
        transaction: &mut Transaction<'_>,
        reserved_object_id: ReservedId<'_>,
        options: HandleOptions,
        encryption_options: Option<ObjectEncryptionOptions>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        // Don't permit creating unencrypted objects in an encrypted store.  The converse is OK.
        debug_assert!(store.crypt().is_none() || encryption_options.is_some());
        let now = Timestamp::now();
        let object_id = reserved_object_id.get();
        // Insert the object record; the ID is reserved, so the insert must not replace anything
        // (asserted via `is_none`).  All four timestamps are set to the creation time.
        assert!(
            transaction
                .add(
                    store.store_object_id(),
                    Mutation::insert_object(
                        ObjectKey::object(reserved_object_id.release()),
                        ObjectValue::file(
                            1,
                            0,
                            now.clone(),
                            now.clone(),
                            now.clone(),
                            now,
                            0,
                            None
                        ),
                    ),
                )
                .is_none()
        );
        let mut permanent_keys = false;
        if let Some(ObjectEncryptionOptions { permanent, key_id, key, unwrapped_key }) =
            encryption_options
        {
            permanent_keys = permanent;
            let cipher = key_to_cipher(&key, &unwrapped_key)?;
            // Persist the wrapped key record...
            transaction.add(
                store.store_object_id(),
                Mutation::insert_object(
                    ObjectKey::keys(object_id),
                    ObjectValue::keys(vec![(key_id, key)].into()),
                ),
            );
            // ...and cache the unwrapped cipher so the new handle can use it immediately.
            store.key_manager.insert(
                object_id,
                Arc::new(vec![(key_id, CipherHolder::Cipher(cipher))].into()),
                permanent,
            );
        }
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute),
                // This is a new object so nothing has pre-allocated overwrite extents yet.
                ObjectValue::attribute(0, false),
            ),
        );
        Ok(DataObjectHandle::new(
            owner.clone(),
            object_id,
            permanent_keys,
            DEFAULT_DATA_ATTRIBUTE_ID,
            0,
            FsverityState::None,
            options,
            false,
            &[],
        ))
    }
1319
1320    /// Creates an object in the store.
1321    ///
1322    /// If the store is encrypted, the object will be automatically encrypted as well.
1323    /// If `wrapping_key_id` is set, the new keys will be wrapped with that specific key, and
1324    /// otherwise the default data key is used.
1325    pub async fn create_object<S: HandleOwner>(
1326        owner: &Arc<S>,
1327        mut transaction: &mut Transaction<'_>,
1328        options: HandleOptions,
1329        wrapping_key_id: Option<WrappingKeyId>,
1330    ) -> Result<DataObjectHandle<S>, Error> {
1331        let store = owner.as_ref().as_ref();
1332        let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
1333        let crypt = store.crypt();
1334        let encryption_options = if let Some(crypt) = crypt {
1335            let key_id =
1336                if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };
1337            let (key, unwrapped_key) = if let Some(wrapping_key_id) = wrapping_key_id {
1338                crypt.create_key_with_id(object_id.get(), wrapping_key_id, ObjectType::File).await?
1339            } else {
1340                let (fxfs_key, unwrapped_key) =
1341                    crypt.create_key(object_id.get(), KeyPurpose::Data).await?;
1342                (EncryptionKey::Fxfs(fxfs_key), unwrapped_key)
1343            };
1344            Some(ObjectEncryptionOptions { permanent: false, key_id, key, unwrapped_key })
1345        } else {
1346            None
1347        };
1348        ObjectStore::create_object_with_id(
1349            owner,
1350            &mut transaction,
1351            object_id,
1352            options,
1353            encryption_options,
1354        )
1355    }
1356
1357    /// Creates an object using explicitly provided keys.
1358    ///
1359    /// There are some cases where an encrypted object needs to be created in an unencrypted store.
1360    /// For example, when layer files for a child store are created in the root store, but they must
1361    /// be encrypted using the child store's keys.  This method exists for that purpose.
1362    pub(crate) async fn create_object_with_key<S: HandleOwner>(
1363        owner: &Arc<S>,
1364        mut transaction: &mut Transaction<'_>,
1365        object_id: ReservedId<'_>,
1366        options: HandleOptions,
1367        key: EncryptionKey,
1368        unwrapped_key: UnwrappedKey,
1369    ) -> Result<DataObjectHandle<S>, Error> {
1370        ObjectStore::create_object_with_id(
1371            owner,
1372            &mut transaction,
1373            object_id,
1374            options,
1375            Some(ObjectEncryptionOptions {
1376                permanent: true,
1377                key_id: VOLUME_DATA_KEY_ID,
1378                key,
1379                unwrapped_key,
1380            }),
1381        )
1382    }
1383
    /// Adjusts the reference count for a given object.  If the reference count reaches zero, the
    /// object is moved into the graveyard and true is returned.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::NotFile` if the object is neither a file nor a symlink, and an error if
    /// applying `delta` would underflow or overflow the count.
    pub async fn adjust_refs(
        &self,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        delta: i64,
    ) -> Result<bool, Error> {
        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
        let refs = if let ObjectValue::Object {
            kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
            ..
        } = &mut mutation.item.value
        {
            // checked_add_signed guards against both underflow (count going negative) and
            // overflow.
            *refs =
                refs.checked_add_signed(delta).ok_or_else(|| anyhow!("refs underflow/overflow"))?;
            refs
        } else {
            bail!(FxfsError::NotFile);
        };
        if *refs == 0 {
            self.add_to_graveyard(transaction, object_id);

            // We might still need to adjust the reference count if delta was something other than
            // -1.
            if delta != -1 {
                *refs = 1;
                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            }
            // Otherwise, we don't commit the mutation as we want to keep reference count as 1 for
            // objects in graveyard.
            Ok(true)
        } else {
            transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            Ok(false)
        }
    }
1421
    // Purges an object that is in the graveyard.
    pub async fn tombstone_object(
        &self,
        object_id: u64,
        txn_options: Options<'_>,
    ) -> Result<(), Error> {
        // Evict any cached keys for the object before purging it.
        self.key_manager.remove(object_id).await;
        let fs = self.filesystem();
        // Hold the truncate guard for the duration of the purge; trim_or_tombstone requires it.
        let truncate_guard = fs.truncate_guard(self.store_object_id, object_id).await;
        self.trim_or_tombstone(object_id, true, txn_options, &truncate_guard).await
    }
1433
    /// Trim extents beyond the end of a file for all attributes.  This will remove the entry from
    /// the graveyard when done.
    ///
    /// `truncate_guard` must be held by the caller for `object_id` (presumably to exclude
    /// concurrent truncate-like operations — see `trim_or_tombstone`).
    pub async fn trim(
        &self,
        object_id: u64,
        truncate_guard: &TruncateGuard<'_>,
    ) -> Result<(), Error> {
        // For the root and root parent store, we would need to use the metadata reservation which
        // we don't currently support, so assert that we're not those stores.
        assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());

        self.trim_or_tombstone(
            object_id,
            false,
            Options { borrow_metadata_space: true, ..Default::default() },
            truncate_guard,
        )
        .await
    }
1453
    /// Trims or tombstones an object.
    ///
    /// Iterates over the object's attributes, trimming each in one or more transactions;
    /// `trim_some` returns `TrimResult::Incomplete` when another pass over the same attribute is
    /// required, and `TrimResult::Done` with the next attribute to process (if any).  When the
    /// work is complete (and, for non-tombstone trims, a Trim graveyard entry exists), the object
    /// is removed from the graveyard.
    async fn trim_or_tombstone(
        &self,
        object_id: u64,
        for_tombstone: bool,
        txn_options: Options<'_>,
        // Held by the caller; not used directly here.
        _truncate_guard: &TruncateGuard<'_>,
    ) -> Result<(), Error> {
        let fs = self.filesystem();
        let mut next_attribute = Some(0);
        while let Some(attribute_id) = next_attribute.take() {
            // Each pass gets its own transaction, locking both the attribute and the object.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
                        LockKey::object(self.store_object_id, object_id),
                    ],
                    txn_options,
                )
                .await?;

            match self
                .trim_some(
                    &mut transaction,
                    object_id,
                    attribute_id,
                    if for_tombstone {
                        TrimMode::Tombstone(TombstoneMode::Object)
                    } else {
                        TrimMode::UseSize
                    },
                )
                .await?
            {
                TrimResult::Incomplete => next_attribute = Some(attribute_id),
                TrimResult::Done(None) => {
                    // All attributes processed.  For plain trims, only remove the graveyard entry
                    // if it is actually a Trim entry (a full tombstone entry must stay).
                    if for_tombstone
                        || matches!(
                            self.tree
                                .find(&ObjectKey::graveyard_entry(
                                    self.graveyard_directory_object_id(),
                                    object_id,
                                ))
                                .await?,
                            Some(Item { value: ObjectValue::Trim, .. })
                        )
                    {
                        self.remove_from_graveyard(&mut transaction, object_id);
                    }
                }
                TrimResult::Done(id) => next_attribute = id,
            }

            // Avoid committing empty transactions (e.g. when there was nothing to trim).
            if !transaction.mutations().is_empty() {
                transaction.commit().await?;
            }
        }
        Ok(())
    }
1514
    // Purges an object's attribute that is in the graveyard.
    //
    // Like `trim_or_tombstone`, the work is split across multiple transactions, repeating while
    // `trim_some` reports `TrimResult::Incomplete`; the attribute's graveyard entry is removed
    // once trimming completes.
    pub async fn tombstone_attribute(
        &self,
        object_id: u64,
        attribute_id: u64,
        txn_options: Options<'_>,
    ) -> Result<(), Error> {
        let fs = self.filesystem();
        let mut trim_result = TrimResult::Incomplete;
        while matches!(trim_result, TrimResult::Incomplete) {
            // Each pass gets its own transaction, locking both the attribute and the object.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
                        LockKey::object(self.store_object_id, object_id),
                    ],
                    txn_options,
                )
                .await?;
            trim_result = self
                .trim_some(
                    &mut transaction,
                    object_id,
                    attribute_id,
                    TrimMode::Tombstone(TombstoneMode::Attribute),
                )
                .await?;
            if let TrimResult::Done(..) = trim_result {
                self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
            }
            // Avoid committing empty transactions.
            if !transaction.mutations().is_empty() {
                transaction.commit().await?;
            }
        }
        Ok(())
    }
1552
    /// Deletes extents for attribute `attribute_id` in object `object_id`.  Also see the comments
    /// for TrimMode and TrimResult. Should hold a lock on the attribute, and the object as it
    /// performs a read-modify-write on the sizes.
    ///
    /// Mutations are added to `transaction` but not committed here; the caller commits.  Returns
    /// `TrimResult::Incomplete` when the transaction grew too large to finish in one pass, in
    /// which case the caller should commit and call again.
    pub async fn trim_some(
        &self,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        attribute_id: u64,
        mode: TrimMode,
    ) -> Result<TrimResult, Error> {
        let layer_set = self.tree.layer_set();
        let mut merger = layer_set.merger();

        // Determine where trimming starts, depending on the mode.  All offsets are rounded up to
        // a block boundary; a failed round_up indicates on-disk corruption.
        let aligned_offset = match mode {
            TrimMode::FromOffset(offset) => {
                round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
            }
            // Tombstoning removes everything, so trim from the very beginning.
            TrimMode::Tombstone(..) => 0,
            TrimMode::UseSize => {
                // Look up the attribute's size record to find where the extents should end.
                let iter = merger
                    .query(Query::FullRange(&ObjectKey::attribute(
                        object_id,
                        attribute_id,
                        AttributeKey::Attribute,
                    )))
                    .await?;
                if let Some(item_ref) = iter.get() {
                    if item_ref.key.object_id != object_id {
                        return Ok(TrimResult::Done(None));
                    }

                    if let ItemRef {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
                                ..
                            },
                        value: ObjectValue::Attribute { size, .. },
                        ..
                    } = item_ref
                    {
                        // If we found a different attribute_id, return so we can get the
                        // right lock.
                        if *size_attribute_id != attribute_id {
                            return Ok(TrimResult::Done(Some(*size_attribute_id)));
                        }
                        round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
                    } else {
                        // At time of writing, we should always see a size record or None here, but
                        // asserting here would be brittle so just skip to the next attribute
                        // instead.
                        return Ok(TrimResult::Done(Some(attribute_id + 1)));
                    }
                } else {
                    // End of the tree.
                    return Ok(TrimResult::Done(None));
                }
            }
        };

        // Loop over the extents and deallocate them.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::from_extent(
                object_id,
                attribute_id,
                ExtentKey::search_key_from_offset(aligned_offset),
            )))
            .await?;
        // `end` tracks the end of the last extent deallocated, used below for the deleted-extent
        // marker range.
        let mut end = 0;
        let allocator = self.allocator();
        let mut result = TrimResult::Done(None);
        // Total bytes deallocated in this pass; feeds project usage and allocated_size updates.
        let mut deallocated = 0;
        let block_size = self.block_size;

        while let Some(item_ref) = iter.get() {
            if item_ref.key.object_id != object_id {
                break;
            }
            if let ObjectKey {
                data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
                ..
            } = item_ref.key
            {
                // Crossed into a different attribute; report it so the caller can re-lock.
                if *extent_attribute_id != attribute_id {
                    result = TrimResult::Done(Some(*extent_attribute_id));
                    break;
                }
                if let (
                    AttributeKey::Extent(ExtentKey { range }),
                    ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                ) = (attribute_key, item_ref.value)
                {
                    // The extent might straddle aligned_offset; only deallocate the tail.
                    let start = std::cmp::max(range.start, aligned_offset);
                    ensure!(start < range.end, FxfsError::Inconsistent);
                    let device_offset = device_offset
                        .checked_add(start - range.start)
                        .ok_or(FxfsError::Inconsistent)?;
                    end = range.end;
                    let len = end - start;
                    let device_range = device_offset..device_offset + len;
                    ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
                    allocator.deallocate(transaction, self.store_object_id, device_range).await?;
                    deallocated += len;
                    // Stop if the transaction is getting too big.
                    if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
                        result = TrimResult::Incomplete;
                        break;
                    }
                }
            }
            iter.advance().await?;
        }

        // Work out whether this pass fully completed a tombstone of the object or the attribute.
        let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
            && matches!(result, TrimResult::Done(None));
        let finished_tombstone_attribute =
            matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
                && !matches!(result, TrimResult::Incomplete);
        let mut object_mutation = None;
        // A fully tombstoned object counts as one node removed for project accounting.
        let nodes = if finished_tombstone_object { -1 } else { 0 };
        if nodes != 0 || deallocated != 0 {
            // Update project usage (bytes/nodes) if the object belongs to a project.
            let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
            if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
                mutation.item.value
            {
                if project_id != 0 {
                    transaction.add(
                        self.store_object_id,
                        Mutation::merge_object(
                            ObjectKey::project_usage(self.root_directory_object_id(), project_id),
                            ObjectValue::BytesAndNodes {
                                bytes: -i64::try_from(deallocated).unwrap(),
                                nodes,
                            },
                        ),
                    );
                }
                object_mutation = Some(mutation);
            } else {
                panic!("Inconsistent object type.");
            }
        }

        // Deletion marker records *must* be merged so as to consume all other records for the
        // object.
        if finished_tombstone_object {
            transaction.add(
                self.store_object_id,
                Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
            );
        } else {
            if finished_tombstone_attribute {
                transaction.add(
                    self.store_object_id,
                    Mutation::merge_object(
                        ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
                        ObjectValue::None,
                    ),
                );
            }
            if deallocated > 0 {
                // Reuse the object mutation fetched above if we have one; otherwise fetch it now.
                let mut mutation = match object_mutation {
                    Some(mutation) => mutation,
                    None => self.txn_get_object_mutation(transaction, object_id).await?,
                };
                // Record a deleted-extent marker covering everything trimmed in this pass.
                transaction.add(
                    self.store_object_id,
                    Mutation::merge_object(
                        ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
                        ObjectValue::deleted_extent(),
                    ),
                );
                // Update allocated size.
                if let ObjectValue::Object {
                    attributes: ObjectAttributes { allocated_size, .. },
                    ..
                } = &mut mutation.item.value
                {
                    // The only way for these to fail are if the volume is inconsistent.
                    *allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
                        anyhow!(FxfsError::Inconsistent).context("Allocated size overflow")
                    })?;
                } else {
                    panic!("Unexpected object value");
                }
                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            }
        }
        Ok(result)
    }
1744
1745    /// Returns all objects that exist in the parent store that pertain to this object store.
1746    /// Note that this doesn't include the object_id of the store itself which is generally
1747    /// referenced externally.
1748    pub fn parent_objects(&self) -> Vec<u64> {
1749        assert!(self.store_info_handle.get().is_some());
1750        self.store_info.lock().as_ref().unwrap().parent_objects()
1751    }
1752
1753    /// Returns root objects for this store.
1754    pub fn root_objects(&self) -> Vec<u64> {
1755        let mut objects = Vec::new();
1756        let store_info = self.store_info.lock();
1757        let info = store_info.as_ref().unwrap();
1758        if info.root_directory_object_id != INVALID_OBJECT_ID {
1759            objects.push(info.root_directory_object_id);
1760        }
1761        if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
1762            objects.push(info.graveyard_directory_object_id);
1763        }
1764        if info.internal_directory_object_id != INVALID_OBJECT_ID {
1765            objects.push(info.internal_directory_object_id);
1766        }
1767        objects
1768    }
1769
1770    pub fn store_info(&self) -> Option<StoreInfo> {
1771        self.store_info.lock().as_ref().cloned()
1772    }
1773
1774    /// Returns None if called during journal replay.
1775    pub fn store_info_handle_object_id(&self) -> Option<u64> {
1776        self.store_info_handle.get().map(|h| h.object_id())
1777    }
1778
    /// Called to open a store, before replay of this store's mutations.
    ///
    /// Opens the store-info object in `parent_store`, loads StoreInfo, and constructs the
    /// ObjectStore.  For an encrypted store, the tree layers cannot be opened yet (the store
    /// starts Locked); for an unencrypted store, the layers are opened and appended immediately.
    async fn open(
        parent_store: &Arc<ObjectStore>,
        store_object_id: u64,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Result<Arc<ObjectStore>, Error> {
        let handle =
            ObjectStore::open_object(parent_store, store_object_id, HandleOptions::default(), None)
                .await?;

        let info = load_store_info(parent_store, store_object_id).await?;
        // The presence of a mutations key is what marks a store as encrypted.
        let is_encrypted = info.mutations_key.is_some();

        let mut total_layer_size = 0;
        let last_object_id;

        // TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.

        // If the store is encrypted, we can't open the object tree layers now, but we need to
        // compute the size of the layers.
        if is_encrypted {
            for &oid in &info.layers {
                total_layer_size += parent_store.get_file_size(oid).await?;
            }
            if info.encrypted_mutations_object_id != INVALID_OBJECT_ID {
                total_layer_size += layer_size_from_encrypted_mutations_size(
                    parent_store.get_file_size(info.encrypted_mutations_object_id).await?,
                );
            }
            // The real last-object-id state can only be established once the store is unlocked.
            last_object_id = LastObjectId::Pending;
            ensure!(
                matches!(
                    info.last_object_id,
                    LastObjectIdInfo::Encrypted { .. } | LastObjectIdInfo::Low32Bit { .. }
                ),
                FxfsError::Inconsistent
            );
        } else {
            // Unencrypted stores must have an unencrypted or low-32-bit last-object-id record.
            last_object_id = match info.last_object_id {
                LastObjectIdInfo::Unencrypted { id } => LastObjectId::Unencrypted { id },
                LastObjectIdInfo::Low32Bit => {
                    LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() }
                }
                _ => bail!(FxfsError::Inconsistent),
            };
        }

        let fs = parent_store.filesystem();

        // Note: an encrypted store is created without its StoreInfo and in the Locked state; it
        // gets populated when the store is unlocked.
        let store = ObjectStore::new(
            Some(parent_store.clone()),
            store_object_id,
            fs.clone(),
            if is_encrypted { None } else { Some(info) },
            object_cache,
            None,
            if is_encrypted { LockState::Locked } else { LockState::Unencrypted },
            last_object_id,
        );

        assert!(store.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");

        if !is_encrypted {
            let object_tree_layer_object_ids =
                store.store_info.lock().as_ref().unwrap().layers.clone();
            let object_layers = store.open_layers(object_tree_layer_object_ids, None).await?;
            // Recompute from the opened handles rather than the estimate above.
            total_layer_size = object_layers.iter().map(|h| h.get_size()).sum();
            store
                .tree
                .append_layers(object_layers)
                .await
                .context("Failed to read object store layers")?;
        }

        // Reserve journal space proportional to the layer size so the store can be compacted.
        fs.object_manager().update_reservation(
            store_object_id,
            tree::reservation_amount_from_layer_size(total_layer_size),
        );

        Ok(store)
    }
1860
1861    async fn load_store_info(&self) -> Result<StoreInfo, Error> {
1862        load_store_info_from_handle(self.store_info_handle.get().unwrap()).await
1863    }
1864
1865    async fn open_layers(
1866        &self,
1867        object_ids: impl std::iter::IntoIterator<Item = u64>,
1868        crypt: Option<Arc<dyn Crypt>>,
1869    ) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
1870        let parent_store = self.parent_store.as_ref().unwrap();
1871        let mut handles = Vec::new();
1872        for object_id in object_ids {
1873            let handle = ObjectStore::open_object(
1874                &parent_store,
1875                object_id,
1876                HandleOptions::default(),
1877                crypt.clone(),
1878            )
1879            .await
1880            .with_context(|| format!("Failed to open layer file {}", object_id))?;
1881            handles.push(handle);
1882        }
1883        Ok(handles)
1884    }
1885
1886    /// Unlocks a store so that it is ready to be used.
1887    /// This is not thread-safe.
1888    pub async fn unlock(
1889        self: &Arc<Self>,
1890        owner: Weak<dyn StoreOwner>,
1891        crypt: Arc<dyn Crypt>,
1892    ) -> Result<(), Error> {
1893        self.unlock_inner(owner, crypt, /*read_only=*/ false).await
1894    }
1895
1896    /// Unlocks a store so that it is ready to be read from.
1897    /// The store will generally behave like it is still locked: when flushed, the store will
1898    /// write out its mutations into the encrypted mutations file, rather than directly updating
1899    /// the layer files of the object store.
1900    /// Re-locking the store (which *must* be done with `Self::lock_read_only` will not trigger a
1901    /// flush, although the store might still be flushed during other operations.
1902    /// This is not thread-safe.
1903    pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
1904        self.unlock_inner(NO_OWNER, crypt, /*read_only=*/ true).await
1905    }
1906
    /// Shared implementation of `unlock` and `unlock_read_only`.
    ///
    /// Opens the tree layers with `crypt`, decrypts and replays any buffered encrypted mutations,
    /// and transitions the lock state to Unlocked/UnlockedReadOnly.  On failure, a scopeguard
    /// restores the Locked state and resets the tree so no unencrypted data is left in memory.
    async fn unlock_inner(
        self: &Arc<Self>,
        owner: Weak<dyn StoreOwner>,
        crypt: Arc<dyn Crypt>,
        read_only: bool,
    ) -> Result<(), Error> {
        // Unless we are unlocking the store as read-only, the filesystem must not be read-only.
        assert!(read_only || !self.filesystem().options().read_only);
        // Only a Locked store may be unlocked; other states are either errors or caller bugs.
        match &*self.lock_state.lock() {
            LockState::Locked => {}
            LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
            LockState::Invalid | LockState::Deleted => bail!(FxfsError::Internal),
            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) => {
                bail!(FxfsError::AlreadyBound)
            }
            LockState::Unknown => panic!("Store was unlocked before replay"),
            LockState::Locking => panic!("Store is being locked"),
            LockState::Unlocking => panic!("Store is being unlocked"),
        }
        // We must lock flushing since that can modify store_info and the encrypted mutations file.
        let keys = lock_keys![LockKey::flush(self.store_object_id())];
        let fs = self.filesystem();
        let guard = fs.lock_manager().write_lock(keys).await;

        let store_info = self.load_store_info().await?;

        self.tree
            .append_layers(
                self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
            )
            .await
            .context("Failed to read object tree layer file contents")?;

        // Unwrap the mutations key so we can decrypt the buffered encrypted mutations below.
        let wrapped_key =
            fxfs_crypto::WrappedKey::Fxfs(store_info.mutations_key.clone().unwrap().into());
        let unwrapped_key = crypt
            .unwrap_key(&wrapped_key, self.store_object_id)
            .await
            .context("Failed to unwrap mutations keys")?;
        // The ChaCha20 stream cipher we use supports up to 64 GiB.  By default we'll roll the key
        // after every 128 MiB.  Here we just need to pick a number that won't cause issues if it
        // wraps, so we just use u32::MAX (the offset is u64).
        ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
        let mut mutations_cipher =
            StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);

        // Restore the in-memory last-object-id state from what StoreInfo recorded.
        match &store_info.last_object_id {
            LastObjectIdInfo::Encrypted { id, key } => {
                let wrapped_key = fxfs_crypto::WrappedKey::Fxfs(key.clone().into());
                *self.last_object_id.lock() = LastObjectId::Encrypted {
                    id: *id,
                    cipher: Box::new(Ff1::new(
                        &crypt.unwrap_key(&wrapped_key, self.store_object_id).await?,
                    )),
                };
            }
            LastObjectIdInfo::Low32Bit => {
                *self.last_object_id.lock() = LastObjectId::Low32Bit {
                    reserved: Default::default(),
                    unreserved: Default::default(),
                }
            }
            _ => unreachable!(),
        }

        // Apply the encrypted mutations.
        let mut mutations = {
            if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
                EncryptedMutations::default()
            } else {
                // Read the encrypted mutations file; it may contain several serialized
                // EncryptedMutations records back-to-back, which are merged here.
                let parent_store = self.parent_store.as_ref().unwrap();
                let handle = ObjectStore::open_object(
                    &parent_store,
                    store_info.encrypted_mutations_object_id,
                    HandleOptions::default(),
                    None,
                )
                .await?;
                let mut cursor = std::io::Cursor::new(
                    handle
                        .contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
                        .await
                        .context(FxfsError::Inconsistent)?,
                );
                let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
                    .context("Failed to deserialize EncryptedMutations")?
                    .0;
                let len = cursor.get_ref().len() as u64;
                while cursor.position() < len {
                    mutations.extend(
                        &EncryptedMutations::deserialize_with_version(&mut cursor)
                            .context("Failed to deserialize EncryptedMutations")?
                            .0,
                    );
                }
                mutations
            }
        };

        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
        let journaled = EncryptedMutations::from_replayed_mutations(
            self.store_object_id,
            fs.journal()
                .read_transactions_for_object(self.store_object_id)
                .await
                .context("Failed to read encrypted mutations from journal")?,
        );
        mutations.extend(&journaled);

        let _ = std::mem::replace(&mut *self.lock_state.lock(), LockState::Unlocking);
        *self.store_info.lock() = Some(store_info);

        // If we fail, clean up.
        let clean_up = scopeguard::guard((), |_| {
            *self.lock_state.lock() = LockState::Locked;
            *self.store_info.lock() = None;
            // Make sure we don't leave unencrypted data lying around in memory.
            self.tree.reset();
        });

        let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;

        // Decrypt the mutation data, switching ciphers at every recorded key-roll offset.
        let mut slice = &mut data[..];
        let mut last_offset = 0;
        for (offset, key) in mutations_key_roll {
            let split_offset = offset
                .checked_sub(last_offset)
                .ok_or(FxfsError::Inconsistent)
                .context("Invalid mutation key roll offset")?;
            last_offset = offset;
            ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
            let (old, new) = slice.split_at_mut(split_offset);
            mutations_cipher.decrypt(old);
            let unwrapped_key = crypt
                .unwrap_key(&fxfs_crypto::WrappedKey::Fxfs(key.into()), self.store_object_id)
                .await
                .context("Failed to unwrap mutations keys")?;
            mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
            slice = new;
        }
        mutations_cipher.decrypt(slice);

        // Always roll the mutations key when we unlock which guarantees we won't reuse a
        // previous key and nonce.
        self.roll_mutations_key(crypt.as_ref()).await?;

        // Replay the now-decrypted mutations against the store.
        let mut cursor = std::io::Cursor::new(data);
        for (checkpoint, count) in transactions {
            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
            for _ in 0..count {
                let mutation =
                    Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
                        .context("failed to deserialize encrypted mutation")?;
                self.apply_mutation(mutation, &context, AssocObj::None)
                    .context("failed to apply encrypted mutation")?;
            }
        }

        *self.lock_state.lock() = if read_only {
            LockState::UnlockedReadOnly(crypt)
        } else {
            LockState::Unlocked { owner, crypt }
        };

        // To avoid unbounded memory growth, we should flush the encrypted mutations now. Otherwise
        // it's possible for more writes to be queued and for the store to be locked before we can
        // flush anything and that can repeat.
        std::mem::drop(guard);

        if !read_only && !self.filesystem().options().read_only {
            self.flush_with_reason(flush::Reason::Unlock).await?;

            // Reap purged files within this store.
            let _ = self.filesystem().graveyard().initial_reap(&self).await?;
        }

        // Return and cancel the clean up.
        Ok(ScopeGuard::into_inner(clean_up))
    }
2086
2087    pub fn is_locked(&self) -> bool {
2088        matches!(
2089            *self.lock_state.lock(),
2090            LockState::Locked | LockState::Locking | LockState::Unknown
2091        )
2092    }
2093
2094    /// NB: This is not the converse of `is_locked`, as there are lock states where neither are
2095    /// true.
2096    pub fn is_unlocked(&self) -> bool {
2097        matches!(
2098            *self.lock_state.lock(),
2099            LockState::Unlocked { .. } | LockState::UnlockedReadOnly { .. } | LockState::Unlocking
2100        )
2101    }
2102
2103    pub fn is_unknown(&self) -> bool {
2104        matches!(*self.lock_state.lock(), LockState::Unknown)
2105    }
2106
2107    pub fn is_encrypted(&self) -> bool {
2108        self.store_info.lock().as_ref().unwrap().mutations_key.is_some()
2109    }
2110
    // Locks a store.
    // This operation will take a flush lock on the store, in case any flushes are ongoing.  Any
    // ongoing store accesses might be interrupted by this.  See `Self::crypt`.
    // Whilst this can return an error, the store will be placed into an unusable but safe state
    // (i.e. no lingering unencrypted data) if an error is encountered.
    pub async fn lock(&self) -> Result<(), Error> {
        // We must lock flushing since it is not safe for that to be happening whilst we are locking
        // the store.
        let keys = lock_keys![LockKey::flush(self.store_object_id())];
        let fs = self.filesystem();
        let _guard = fs.lock_manager().write_lock(keys).await;

        // Transition to the Locking state; panics if the store is not currently Unlocked, since
        // locking anything else is a caller bug.
        {
            let mut lock_state = self.lock_state.lock();
            if let LockState::Unlocked { .. } = &*lock_state {
                *lock_state = LockState::Locking;
            } else {
                panic!("Unexpected lock state: {:?}", &*lock_state);
            }
        }

        // Sync the journal now to ensure that any buffered mutations for this store make it out to
        // disk.  This is necessary to be able to unlock the store again.
        // We need to establish a barrier at this point (so that the journaled writes are observable
        // by any future attempts to unlock the store), hence the flush_device.
        let sync_result =
            self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;

        // If the sync failed, the store is left Invalid (unusable but with no unencrypted data).
        *self.lock_state.lock() = if let Err(error) = &sync_result {
            error!(error:?; "Failed to sync journal; store will no longer be usable");
            LockState::Invalid
        } else {
            LockState::Locked
        };
        // Drop all unwrapped keys and in-memory state so no plaintext lingers.
        self.key_manager.clear();
        *self.store_info.lock() = None;
        self.tree.reset();

        sync_result
    }
2151
2152    // Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`).  Data
2153    // is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
2154    // and will be replayed next time the store is unlocked.
2155    pub fn lock_read_only(&self) {
2156        *self.lock_state.lock() = LockState::Locked;
2157        *self.store_info.lock() = None;
2158        self.tree.reset();
2159    }
2160
2161    // Returns None if the object ID cipher needs to be created or rolled, or a more expensive
2162    // algorithm needs to be used.
2163    fn maybe_get_next_object_id(&self) -> Option<ReservedId<'_>> {
2164        self.last_object_id.lock().try_get_next().map(|id| ReservedId::new(self, id))
2165    }
2166
2167    /// Returns a new object ID that can be used.  This will create an object ID cipher if needed.
2168    ///
2169    /// If the object ID key needs to be rolled, a new transaction will be created and committed.
2170    /// This transaction does not take the filesystem lock, hence `txn_guard`.
2171    pub(super) async fn get_next_object_id(
2172        &self,
2173        txn_guard: &TxnGuard<'_>,
2174    ) -> Result<ReservedId<'_>, Error> {
2175        {
2176            let mut last_object_id = self.last_object_id.lock();
2177            if let Some(id) = last_object_id.try_get_next() {
2178                return Ok(ReservedId::new(self, id));
2179            }
2180            ensure!(
2181                !matches!(&*last_object_id, LastObjectId::Unencrypted { .. }),
2182                FxfsError::Inconsistent
2183            );
2184        }
2185
2186        let parent_store = self.parent_store().unwrap();
2187
2188        // Create a transaction (which has a lock) and then check again.
2189        //
2190        // NOTE: Since this can be a nested transaction, we must take care to avoid deadlocks; no
2191        // more locks should be taken whilst we hold this lock.
2192        let mut transaction = self
2193            .filesystem()
2194            .new_transaction(
2195                lock_keys![LockKey::object(parent_store.store_object_id, self.store_object_id)],
2196                Options {
2197                    // We must skip journal checks because this transaction might be needed to
2198                    // compact.
2199                    skip_journal_checks: true,
2200                    borrow_metadata_space: true,
2201                    txn_guard: Some(txn_guard),
2202                    ..Default::default()
2203                },
2204            )
2205            .await?;
2206
2207        let mut next_id_hi = 0;
2208
2209        let is_low_32_bit = {
2210            let mut last_object_id = self.last_object_id.lock();
2211            if let Some(id) = last_object_id.try_get_next() {
2212                // Something else raced and created/rolled the cipher.
2213                return Ok(ReservedId::new(self, id));
2214            }
2215
2216            match &*last_object_id {
2217                LastObjectId::Encrypted { id, .. } => {
2218                    // It shouldn't be possible for last_object_id to wrap within our lifetime, so
2219                    // if this happens, it's most likely due to corruption.
2220                    next_id_hi =
2221                        id.checked_add(1 << 32).ok_or(FxfsError::Inconsistent)? & OBJECT_ID_HI_MASK;
2222
2223                    info!(store_id = self.store_object_id; "Rolling object ID key");
2224
2225                    false
2226                }
2227                LastObjectId::Low32Bit { .. } => true,
2228                _ => unreachable!(),
2229            }
2230        };
2231
2232        if is_low_32_bit {
2233            // Keep picking an object ID at random until we find one free.
2234
2235            // To avoid races, this must be before we capture the layer set.
2236            self.last_object_id.lock().drain_unreserved();
2237
2238            let layer_set = self.tree.layer_set();
2239            let mut key = ObjectKey::object(0);
2240            loop {
2241                let next_id = rand::rng().next_u32() as u64;
2242                let Some(next_id) = NonZero::new(next_id) else { continue };
2243                if self.last_object_id.lock().is_reserved(next_id.get()) {
2244                    continue;
2245                }
2246                key.object_id = next_id.get();
2247                if layer_set.key_exists(&key).await? == Existence::Missing {
2248                    self.last_object_id.lock().reserve(next_id.get());
2249                    return Ok(ReservedId::new(self, next_id));
2250                }
2251            }
2252        } else {
2253            // Create a key.
2254            let (object_id_wrapped, object_id_unwrapped) = self
2255                .crypt()
2256                .unwrap()
2257                .create_key(self.store_object_id, KeyPurpose::Metadata)
2258                .await?;
2259
2260            // Normally we would use a mutation to note the updated key, but that would complicate
2261            // replay.  During replay, we need to keep track of the highest used object ID and this
2262            // is done by watching mutations to see when we create objects, and then decrypting
2263            // the object ID.  This relies on the unwrapped key being available, so as soon as
2264            // we detect the key has changed, we would need to immediately unwrap the key via the
2265            // crypt service.  Currently, this isn't easy to do during replay.  An option we could
2266            // consider would be to include the unencrypted object ID when we create objects, which
2267            // would avoid us having to decrypt the object ID during replay.
2268            //
2269            // For now and for historical reasons, the approach we take is to just write a new
2270            // version of StoreInfo here.  We must take care that we only update the key and not any
2271            // other information contained within StoreInfo because other information should only be
2272            // updated when we flush.  We are holding the lock on the StoreInfo file, so this will
2273            // prevent potential races with flushing.  To make sure we only change the key, we read
2274            // StoreInfo from storage rather than using our in-memory copy.  This won't be
2275            // performant, but rolling the object ID key will be extremely rare.
2276            let new_store_info = StoreInfo {
2277                last_object_id: LastObjectIdInfo::Encrypted {
2278                    id: next_id_hi,
2279                    key: object_id_wrapped.clone(),
2280                },
2281                ..self.load_store_info().await?
2282            };
2283
2284            self.write_store_info(&mut transaction, &new_store_info).await?;
2285
2286            transaction
2287                .commit_with_callback(|_| {
2288                    self.store_info.lock().as_mut().unwrap().last_object_id =
2289                        new_store_info.last_object_id;
2290                    match &mut *self.last_object_id.lock() {
2291                        LastObjectId::Encrypted { id, cipher } => {
2292                            **cipher = Ff1::new(&object_id_unwrapped);
2293                            *id = next_id_hi;
2294                            ReservedId::new(
2295                                self,
2296                                NonZero::new(next_id_hi | cipher.encrypt(0) as u64).unwrap(),
2297                            )
2298                        }
2299                        _ => unreachable!(),
2300                    }
2301                })
2302                .await
2303        }
2304    }
2305
2306    /// Query the next object ID that will be used. Intended for use when checking filesystem
2307    /// consistency. Prefer [`Self::get_next_object_id()`] for general use.
2308    pub(crate) fn query_next_object_id(&self) -> u64 {
2309        self.last_object_id.lock().peek_next()
2310    }
2311
2312    fn allocator(&self) -> Arc<Allocator> {
2313        self.filesystem().allocator()
2314    }
2315
2316    // If |transaction| has an impending mutation for the underlying object, returns that.
2317    // Otherwise, looks up the object from the tree and returns a suitable mutation for it.  The
2318    // mutation is returned here rather than the item because the mutation includes the operation
2319    // which has significance: inserting an object implies it's the first of its kind unlike
2320    // replacing an object.
2321    async fn txn_get_object_mutation(
2322        &self,
2323        transaction: &Transaction<'_>,
2324        object_id: u64,
2325    ) -> Result<ObjectStoreMutation, Error> {
2326        if let Some(mutation) =
2327            transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
2328        {
2329            Ok(mutation.clone())
2330        } else {
2331            Ok(ObjectStoreMutation {
2332                item: self
2333                    .tree
2334                    .find(&ObjectKey::object(object_id))
2335                    .await?
2336                    .ok_or(FxfsError::Inconsistent)
2337                    .context("Object id missing")?,
2338                op: Operation::ReplaceOrInsert,
2339            })
2340        }
2341    }
2342
    /// Like txn_get_object_mutation but with expanded visibility.
    /// Only available in migration code.
    ///
    /// This is a thin delegating wrapper; see [`Self::txn_get_object_mutation`] for the
    /// semantics of the returned mutation.
    #[cfg(feature = "migration")]
    pub async fn get_object_mutation(
        &self,
        transaction: &Transaction<'_>,
        object_id: u64,
    ) -> Result<ObjectStoreMutation, Error> {
        self.txn_get_object_mutation(transaction, object_id).await
    }
2353
2354    fn update_last_object_id(&self, object_id: u64) {
2355        let mut last_object_id = self.last_object_id.lock();
2356        match &mut *last_object_id {
2357            LastObjectId::Pending => unreachable!(),
2358            LastObjectId::Unencrypted { id } => {
2359                if object_id > *id {
2360                    *id = object_id
2361                }
2362            }
2363            LastObjectId::Encrypted { id, cipher } => {
2364                // For encrypted stores, object_id will be encrypted here, so we must decrypt first.
2365
2366                // If the object ID cipher has been rolled, then it's possible we might see object
2367                // IDs that were generated using a different cipher so the decrypt here will return
2368                // the wrong value, but that won't matter because the hi part of the object ID
2369                // should still discriminate.
2370                let object_id =
2371                    object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
2372                if object_id > *id {
2373                    *id = object_id;
2374                }
2375            }
2376            LastObjectId::Low32Bit { .. } => {}
2377        }
2378    }
2379
2380    /// If possible, converts the given object ID to its unencrypted value.  Returns None if it is
2381    /// not possible to convert to its unencrypted value because the key is unavailable.
2382    pub fn to_unencrypted_object_id(&self, object_id: u64) -> Option<u64> {
2383        let last_object_id = self.last_object_id.lock();
2384        match &*last_object_id {
2385            LastObjectId::Pending => None,
2386            LastObjectId::Unencrypted { .. } | LastObjectId::Low32Bit { .. } => Some(object_id),
2387            LastObjectId::Encrypted { id, cipher } => {
2388                if id & OBJECT_ID_HI_MASK != object_id & OBJECT_ID_HI_MASK {
2389                    None
2390                } else {
2391                    Some(object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64)
2392                }
2393            }
2394        }
2395    }
2396
2397    /// Adds the specified object to the graveyard.
2398    pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2399        let graveyard_id = self.graveyard_directory_object_id();
2400        assert_ne!(graveyard_id, INVALID_OBJECT_ID);
2401        transaction.add(
2402            self.store_object_id,
2403            Mutation::replace_or_insert_object(
2404                ObjectKey::graveyard_entry(graveyard_id, object_id),
2405                ObjectValue::Some,
2406            ),
2407        );
2408    }
2409
2410    /// Removes the specified object from the graveyard.  NB: Care should be taken when calling
2411    /// this because graveyard entries are used for purging deleted files *and* for trimming
2412    /// extents.  For example, consider the following sequence:
2413    ///
2414    ///     1. Add Trim graveyard entry.
2415    ///     2. Replace with Some graveyard entry (see above).
2416    ///     3. Remove graveyard entry.
2417    ///
2418    /// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
2419    /// actually be:
2420    ///
2421    ///     3. Replace with Trim graveyard entry.
2422    pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2423        transaction.add(
2424            self.store_object_id,
2425            Mutation::replace_or_insert_object(
2426                ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
2427                ObjectValue::None,
2428            ),
2429        );
2430    }
2431
2432    /// Removes the specified attribute from the graveyard. Unlike object graveyard entries,
2433    /// attribute graveyard entries only have one functionality (i.e. to purge deleted attributes)
2434    /// so the caller does not need to be concerned about replacing the graveyard attribute entry
2435    /// with its prior state when cancelling it. See comment on `remove_from_graveyard()`.
2436    pub fn remove_attribute_from_graveyard(
2437        &self,
2438        transaction: &mut Transaction<'_>,
2439        object_id: u64,
2440        attribute_id: u64,
2441    ) {
2442        transaction.add(
2443            self.store_object_id,
2444            Mutation::replace_or_insert_object(
2445                ObjectKey::graveyard_attribute_entry(
2446                    self.graveyard_directory_object_id(),
2447                    object_id,
2448                    attribute_id,
2449                ),
2450                ObjectValue::None,
2451            ),
2452        );
2453    }
2454
2455    // Roll the mutations key.  The new key will be written for the next encrypted mutation.
2456    async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
2457        let (wrapped_key, unwrapped_key) =
2458            crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
2459
2460        // The mutations_cipher lock must be held for the duration so that mutations_cipher and
2461        // store_info are updated atomically.  Otherwise, write_mutation could find a new cipher but
2462        // end up writing the wrong wrapped key.
2463        let mut cipher = self.mutations_cipher.lock();
2464        *cipher = Some(StreamCipher::new(&unwrapped_key, 0));
2465        self.store_info.lock().as_mut().unwrap().mutations_key = Some(wrapped_key);
2466        // mutations_cipher_offset is updated by flush.
2467        Ok(())
2468    }
2469
2470    // When the symlink is unlocked, this function decrypts `link` and returns a bag of bytes that
2471    // is identical to that which was passed in as the target on `create_symlink`.
2472    // If the symlink is locked, this function hashes the encrypted `link` with Sha256 in order to
2473    // get a standard length and then base64 encodes the hash and returns that to the caller.
2474    pub async fn read_encrypted_symlink(
2475        &self,
2476        object_id: u64,
2477        link: Vec<u8>,
2478    ) -> Result<Vec<u8>, Error> {
2479        let mut link = link;
2480        let key = self
2481            .key_manager()
2482            .get_fscrypt_key(object_id, self.crypt().unwrap().as_ref(), async || {
2483                self.get_keys(object_id).await
2484            })
2485            .await?;
2486        if let Some(key) = key.into_cipher() {
2487            key.decrypt_symlink(object_id, &mut link)?;
2488            Ok(link)
2489        } else {
2490            // Locked symlinks are encoded using a hash_code of 0.
2491            let proxy_filename =
2492                fscrypt::proxy_filename::ProxyFilename::new_with_hash_code(0, &link);
2493            let proxy_filename_str: String = proxy_filename.into();
2494            Ok(proxy_filename_str.as_bytes().to_vec())
2495        }
2496    }
2497
2498    /// Returns the link of a symlink object.
2499    pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
2500        match self.tree.find(&ObjectKey::object(object_id)).await? {
2501            None => bail!(FxfsError::NotFound),
2502            Some(Item {
2503                value: ObjectValue::Object { kind: ObjectKind::EncryptedSymlink { link, .. }, .. },
2504                ..
2505            }) => self.read_encrypted_symlink(object_id, link.to_vec()).await,
2506            Some(Item {
2507                value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
2508                ..
2509            }) => Ok(link.to_vec()),
2510            Some(item) => Err(anyhow!(FxfsError::Inconsistent)
2511                .context(format!("Unexpected item in lookup: {item:?}"))),
2512        }
2513    }
2514
2515    /// Retrieves the wrapped keys for the given object.  The keys *should* be known to exist and it
2516    /// will be considered an inconsistency if they don't.
2517    pub async fn get_keys(&self, object_id: u64) -> Result<EncryptionKeys, Error> {
2518        match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::Inconsistent)? {
2519            Item { value: ObjectValue::Keys(keys), .. } => Ok(keys),
2520            _ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
2521        }
2522    }
2523
2524    pub async fn update_attributes<'a>(
2525        &self,
2526        transaction: &mut Transaction<'a>,
2527        object_id: u64,
2528        node_attributes: Option<&fio::MutableNodeAttributes>,
2529        change_time: Option<Timestamp>,
2530    ) -> Result<(), Error> {
2531        if change_time.is_none() {
2532            if let Some(attributes) = node_attributes {
2533                let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
2534                if *attributes == empty_attributes {
2535                    return Ok(());
2536                }
2537            } else {
2538                return Ok(());
2539            }
2540        }
2541        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
2542        if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
2543            if let Some(time) = change_time {
2544                attributes.change_time = time;
2545            }
2546            if let Some(node_attributes) = node_attributes {
2547                if let Some(time) = node_attributes.creation_time {
2548                    attributes.creation_time = Timestamp::from_nanos(time);
2549                }
2550                if let Some(time) = node_attributes.modification_time {
2551                    attributes.modification_time = Timestamp::from_nanos(time);
2552                }
2553                if let Some(time) = node_attributes.access_time {
2554                    attributes.access_time = Timestamp::from_nanos(time);
2555                }
2556                if node_attributes.mode.is_some()
2557                    || node_attributes.uid.is_some()
2558                    || node_attributes.gid.is_some()
2559                    || node_attributes.rdev.is_some()
2560                {
2561                    if let Some(a) = &mut attributes.posix_attributes {
2562                        if let Some(mode) = node_attributes.mode {
2563                            a.mode = mode;
2564                        }
2565                        if let Some(uid) = node_attributes.uid {
2566                            a.uid = uid;
2567                        }
2568                        if let Some(gid) = node_attributes.gid {
2569                            a.gid = gid;
2570                        }
2571                        if let Some(rdev) = node_attributes.rdev {
2572                            a.rdev = rdev;
2573                        }
2574                    } else {
2575                        attributes.posix_attributes = Some(PosixAttributes {
2576                            mode: node_attributes.mode.unwrap_or_default(),
2577                            uid: node_attributes.uid.unwrap_or_default(),
2578                            gid: node_attributes.gid.unwrap_or_default(),
2579                            rdev: node_attributes.rdev.unwrap_or_default(),
2580                        });
2581                    }
2582                }
2583            }
2584        } else {
2585            bail!(
2586                anyhow!(FxfsError::Inconsistent)
2587                    .context("ObjectStore.update_attributes: Expected object value")
2588            );
2589        };
2590        transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
2591        Ok(())
2592    }
2593
2594    // Updates and commits the changes to access time in ObjectProperties. The update matches
2595    // Linux's RELATIME. That is, access time is updated to the current time if access time is less
2596    // than or equal to the last modification or status change, or if it has been more than a day
2597    // since the last access.  `precondition` is a condition to be checked *after* taking the lock
2598    // on the object.  If `precondition` returns false, no update will be performed.
2599    pub async fn update_access_time(
2600        &self,
2601        object_id: u64,
2602        props: &mut ObjectProperties,
2603        precondition: impl FnOnce() -> bool,
2604    ) -> Result<(), Error> {
2605        let access_time = props.access_time.as_nanos();
2606        let modification_time = props.modification_time.as_nanos();
2607        let change_time = props.change_time.as_nanos();
2608        let now = Timestamp::now();
2609        if access_time <= modification_time
2610            || access_time <= change_time
2611            || access_time
2612                < now.as_nanos()
2613                    - Timestamp::from(std::time::Duration::from_secs(24 * 60 * 60)).as_nanos()
2614        {
2615            let mut transaction = self
2616                .filesystem()
2617                .clone()
2618                .new_transaction(
2619                    lock_keys![LockKey::object(self.store_object_id, object_id,)],
2620                    Options { borrow_metadata_space: true, ..Default::default() },
2621                )
2622                .await?;
2623            if precondition() {
2624                self.update_attributes(
2625                    &mut transaction,
2626                    object_id,
2627                    Some(&fio::MutableNodeAttributes {
2628                        access_time: Some(now.as_nanos()),
2629                        ..Default::default()
2630                    }),
2631                    None,
2632                )
2633                .await?;
2634                transaction.commit().await?;
2635                props.access_time = now;
2636            }
2637        }
2638        Ok(())
2639    }
2640
2641    async fn write_store_info<'a>(
2642        &'a self,
2643        transaction: &mut Transaction<'a>,
2644        info: &StoreInfo,
2645    ) -> Result<(), Error> {
2646        let mut serialized_info = Vec::new();
2647        info.serialize_with_version(&mut serialized_info)?;
2648        let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
2649        buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
2650        self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
2651    }
2652
2653    pub fn mark_deleted(&self) {
2654        *self.lock_state.lock() = LockState::Deleted;
2655    }
2656
2657    #[cfg(test)]
2658    pub(crate) fn test_set_last_object_id(&self, object_id: u64) {
2659        match &mut *self.last_object_id.lock() {
2660            LastObjectId::Encrypted { id, .. } => *id = object_id,
2661            _ => unreachable!(),
2662        }
2663    }
2664}
2665
#[async_trait]
impl JournalingObject for ObjectStore {
    // Applies a single journaled mutation to the store's in-memory state (LSM tree,
    // store-info, counters).  Called both at transaction-commit time and during journal replay
    // (distinguished via `context.mode`).
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        _assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error> {
        // First validate that the mutation is legal for the store's current lock state.
        match &*self.lock_state.lock() {
            LockState::Locked | LockState::Locking => {
                // A locked store should only ever see flush markers, or (during replay)
                // encrypted mutations and mutations-key updates.
                ensure!(
                    matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
                        || matches!(
                            mutation,
                            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_)
                                if context.mode.is_replay()
                        ),
                    anyhow!(FxfsError::Inconsistent)
                        .context(format!("Unexpected mutation for encrypted store: {mutation:?}"))
                );
            }
            LockState::Invalid
            | LockState::Unlocking
            | LockState::Unencrypted
            | LockState::Unlocked { .. }
            | LockState::UnlockedReadOnly(..)
            | LockState::Deleted => {}
            lock_state @ _ => panic!("Unexpected lock state: {lock_state:?}"),
        }
        match mutation {
            Mutation::ObjectStore(ObjectStoreMutation { mut item, op }) => {
                // Stamp the item with its journal position so merge ordering is stable.
                item.sequence = context.checkpoint.file_offset;
                match op {
                    Operation::Insert => {
                        let mut unreserve_id = INVALID_OBJECT_ID;
                        // If we are inserting an object record for the first time, it signifies the
                        // birth of the object so we need to adjust the object count.
                        if matches!(item.value, ObjectValue::Object { .. }) {
                            {
                                let info = &mut self.store_info.lock();
                                let object_count = &mut info.as_mut().unwrap().object_count;
                                *object_count = object_count.saturating_add(1);
                            }
                            if context.mode.is_replay() {
                                self.update_last_object_id(item.key.object_id);
                            } else {
                                unreserve_id = item.key.object_id;
                            }
                        }
                        self.tree.insert(item)?;
                        if unreserve_id != INVALID_OBJECT_ID {
                            // To avoid races, this *must* be after the `tree.insert(..)` above.
                            self.last_object_id.lock().unreserve(unreserve_id);
                        }
                    }
                    Operation::ReplaceOrInsert => {
                        self.tree.replace_or_insert(item);
                    }
                    Operation::Merge => {
                        // A merged tombstone marks the death of an object, so keep the object
                        // count in sync.
                        if item.is_tombstone() {
                            let info = &mut self.store_info.lock();
                            let object_count = &mut info.as_mut().unwrap().object_count;
                            *object_count = object_count.saturating_sub(1);
                        }
                        let lower_bound = item.key.key_for_merge_into();
                        self.tree.merge_into(item, &lower_bound);
                    }
                }
            }
            Mutation::BeginFlush => {
                ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
                self.tree.seal();
            }
            Mutation::EndFlush => ensure!(self.parent_store.is_some(), FxfsError::Inconsistent),
            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
                // We will process these during Self::unlock.
                ensure!(
                    !matches!(&*self.lock_state.lock(), LockState::Unencrypted),
                    FxfsError::Inconsistent
                );
            }
            Mutation::CreateInternalDir(object_id) => {
                ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
                self.store_info.lock().as_mut().unwrap().internal_directory_object_id = object_id;
            }
            _ => bail!("unexpected mutation: {:?}", mutation),
        }
        self.counters.lock().mutations_applied += 1;
        Ok(())
    }

    // Called when a queued mutation is discarded without being applied.
    fn drop_mutation(&self, mutation: Mutation, _transaction: &Transaction<'_>) {
        self.counters.lock().mutations_dropped += 1;
        // A dropped object insert means the object was never created, so release the object ID
        // that was reserved for it.
        if let Mutation::ObjectStore(ObjectStoreMutation {
            item: Item { key: ObjectKey { object_id, .. }, value: ObjectValue::Object { .. }, .. },
            op: Operation::Insert,
        }) = mutation
        {
            self.last_object_id.lock().unreserve(object_id);
        }
    }

    /// Push all in-memory structures to the device. This is not necessary for sync since the
    /// journal will take care of it.  This is supposed to be called when there is either memory or
    /// space pressure (flushing the store will persist in-memory data and allow the journal file to
    /// be trimmed).
    ///
    /// Also returns the earliest version of a struct in the filesystem (when known).
    async fn flush(&self) -> Result<Version, Error> {
        self.flush_with_reason(flush::Reason::Journal).await
    }

    // Serializes `mutation` into the journal, encrypting it first when this store has an active
    // mutations cipher.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        // Intentionally enumerating all variants to force a decision on any new variants. Encrypt
        // all mutations that could affect an encrypted object store contents or the `StoreInfo` of
        // the encrypted object store. During `unlock()` any mutations which haven't been encrypted
        // won't be replayed after reading `StoreInfo`.
        match mutation {
            // Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted, we
            // still choose to encrypt the mutation because it makes it easier to deal with replay.
            // When we replay mutations for an encrypted store, the only thing we keep in memory are
            // the encrypted mutations; we don't keep `StoreInfo` or changes to it in memory. So, by
            // encrypting the CreateInternalDir mutation here, it means we don't have to track both
            // encrypted mutations bound for the LSM tree and unencrypted mutations for `StoreInfo`
            // to use in `unlock()`. It'll just bundle CreateInternalDir mutations with the other
            // encrypted mutations and handled them all in sequence during `unlock()`.
            Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
                let mut cipher = self.mutations_cipher.lock();
                if let Some(cipher) = cipher.as_mut() {
                    // If this is the first time we've used this key, we must write the key out.
                    if cipher.offset() == 0 {
                        writer.write(Mutation::update_mutations_key(
                            self.store_info
                                .lock()
                                .as_ref()
                                .unwrap()
                                .mutations_key
                                .as_ref()
                                .unwrap()
                                .clone(),
                        ));
                    }
                    let mut buffer = Vec::new();
                    mutation.serialize_into(&mut buffer).unwrap();
                    cipher.encrypt(&mut buffer);
                    writer.write(Mutation::EncryptedObjectStore(buffer.into()));
                    return;
                }
            }
            // `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
            // encrypted object stores, but are either the encrypted mutation data itself or
            // metadata governing how the data will be encrypted. They should only be produced here.
            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
                debug_assert!(false, "Only this method should generate encrypted mutations");
            }
            // `BeginFlush` and `EndFlush` are not needed during `unlock()` and are needed during
            // the initial journal replay, so should not be encrypted. `Allocator`, `DeleteVolume`,
            // `UpdateBorrowed` mutations are never associated with an encrypted store as we do not
            // encrypt the allocator or root/root-parent stores so we can avoid the locking.
            Mutation::Allocator(_)
            | Mutation::BeginFlush
            | Mutation::EndFlush
            | Mutation::DeleteVolume
            | Mutation::UpdateBorrowed(_) => {}
        }
        // Fall-through: write the mutation unencrypted.
        writer.write(mutation.clone());
    }
}
2834
2835impl Drop for ObjectStore {
2836    fn drop(&mut self) {
2837        let mut last_object_id = self.last_object_id.lock();
2838        last_object_id.drain_unreserved();
2839        match &*last_object_id {
2840            LastObjectId::Low32Bit { reserved, .. } => debug_assert!(reserved.is_empty()),
2841            _ => {}
2842        }
2843    }
2844}
2845
// Marker impl: lets object handles (e.g. `DataObjectHandle<impl HandleOwner>` below) be
// parameterized by an `ObjectStore` as their owner.
impl HandleOwner for ObjectStore {}
2847
// Identity conversion so generic code taking `impl AsRef<ObjectStore>` can accept a store
// directly.
impl AsRef<ObjectStore> for ObjectStore {
    fn as_ref(&self) -> &ObjectStore {
        self
    }
}
2853
// Given the size of the encrypted mutations file, returns a worst-case estimate of the layer
// size its contents could produce.
fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
    // This is similar to reserved_space_from_journal_usage. It needs to be a worst case estimate of
    // the amount of metadata space that might need to be reserved to allow the encrypted mutations
    // to be written to layer files.  It needs to be >= than reservation_amount_from_layer_size will
    // return once the data has been written to layer files and <= than
    // reserved_space_from_journal_usage would use.  We can't just use
    // reserved_space_from_journal_usage because the encrypted mutations file includes some extra
    // data (it includes the checkpoints) that isn't written in the same way to the journal.
    //
    // Use a saturating multiply so a pathologically large input cannot wrap in release builds
    // and yield a tiny (under-)reservation.
    size.saturating_mul(3)
}
2864
// Marker impl: allows an `ObjectStore` to be attached to a transaction mutation as its
// associated object (see `AssocObj`/`AssociatedObject` in the transaction module).
impl AssociatedObject for ObjectStore {}
2866
/// Argument to the trim_some method, selecting which extents should be removed.
#[derive(Debug)]
pub enum TrimMode {
    /// Trim extents beyond the current size.
    UseSize,

    /// Trim extents beyond the supplied offset.
    FromOffset(u64),

    /// Remove the object (or attribute) from the store once it is fully trimmed.  The
    /// [`TombstoneMode`] selects whether the whole object or a single attribute is removed.
    Tombstone(TombstoneMode),
}
2879
/// Sets the mode for tombstoning (either at the object or attribute level).
#[derive(Debug)]
pub enum TombstoneMode {
    /// Remove the entire object once it is fully trimmed.
    Object,
    /// Remove just the attribute once it is fully trimmed.
    Attribute,
}
2886
/// Result of the trim_some method, indicating whether trimming for the current attribute
/// ran to completion within the transaction's limits.
#[derive(Debug)]
pub enum TrimResult {
    /// We reached the limit of the transaction and more extents might follow.
    Incomplete,

    /// We finished this attribute.  Returns the ID of the next attribute for the same object if
    /// there is one.
    Done(Option<u64>),
}
2897
2898/// Loads store info.
2899pub async fn load_store_info(
2900    parent: &Arc<ObjectStore>,
2901    store_object_id: u64,
2902) -> Result<StoreInfo, Error> {
2903    load_store_info_from_handle(
2904        &ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?,
2905    )
2906    .await
2907}
2908
2909async fn load_store_info_from_handle(
2910    handle: &DataObjectHandle<impl HandleOwner>,
2911) -> Result<StoreInfo, Error> {
2912    Ok(if handle.get_size() > 0 {
2913        let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
2914        let mut cursor = std::io::Cursor::new(serialized_info);
2915        let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
2916            .context("Failed to deserialize StoreInfo")?;
2917        store_info
2918    } else {
2919        // The store_info will be absent for a newly created and empty object store.
2920        StoreInfo::default()
2921    })
2922}
2923
2924#[cfg(test)]
2925mod tests {
2926    use super::{
2927        DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, FsverityMetadata, HandleOptions,
2928        LastObjectId, LastObjectIdInfo, LockKey, MAX_STORE_INFO_SERIALIZED_SIZE, Mutation,
2929        NO_OWNER, NewChildStoreOptions, OBJECT_ID_HI_MASK, ObjectStore, RootDigest, StoreInfo,
2930        StoreOptions, StoreOwner,
2931    };
2932    use crate::errors::FxfsError;
2933    use crate::filesystem::{FxFilesystem, JournalingObject, OpenFxFilesystem, SyncOptions};
2934    use crate::fsck::{fsck, fsck_volume};
2935    use crate::lsm_tree::Query;
2936    use crate::lsm_tree::types::{ItemRef, LayerIterator};
2937    use crate::object_handle::{
2938        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
2939    };
2940    use crate::object_store::directory::Directory;
2941    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue};
2942    use crate::object_store::transaction::{Options, lock_keys};
2943    use crate::object_store::volume::root_volume;
2944    use crate::serialized_types::VersionedLatest;
2945    use crate::testing;
2946    use assert_matches::assert_matches;
2947    use async_trait::async_trait;
2948    use fuchsia_async as fasync;
2949    use fuchsia_sync::Mutex;
2950    use futures::join;
2951    use fxfs_crypto::ff1::Ff1;
2952    use fxfs_crypto::{
2953        Crypt, FXFS_KEY_SIZE, FXFS_WRAPPED_KEY_SIZE, FxfsKey, UnwrappedKey, WrappedKeyBytes,
2954    };
2955    use fxfs_insecure_crypto::new_insecure_crypt;
2956
2957    use std::sync::Arc;
2958    use std::time::Duration;
2959    use storage_device::DeviceHolder;
2960    use storage_device::fake_device::FakeDevice;
2961    use test_case::test_case;
2962
    /// Block size used for the fake in-memory device backing the test filesystems.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2964
2965    async fn test_filesystem() -> OpenFxFilesystem {
2966        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2967        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2968    }
2969
    #[fuchsia::test]
    async fn test_item_sequences() {
        // Checks that items committed by later transactions have sequence numbers >= those
        // of earlier transactions, and that an item committed after a journal sync gets a
        // strictly larger sequence number.
        let fs = test_filesystem().await;
        let object1;
        let object2;
        let object3;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        object1 = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        object2 = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");

        // Sync the journal before creating object3 so its item must land with a strictly
        // greater sequence number (asserted below).
        fs.sync(SyncOptions::default()).await.expect("sync failed");

        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        object3 = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");

        // Scan the whole tree, recording the sequence number observed for each object.
        let layer_set = store.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        let mut sequences = [0u64; 3];
        while let Some(ItemRef { key: ObjectKey { object_id, .. }, sequence, .. }) = iter.get() {
            if *object_id == object1.object_id() {
                sequences[0] = sequence;
            } else if *object_id == object2.object_id() {
                sequences[1] = sequence;
            } else if *object_id == object3.object_id() {
                sequences[2] = sequence;
            }
            iter.advance().await.expect("advance failed");
        }

        assert!(sequences[0] <= sequences[1], "sequences: {:?}", sequences);
        // The last item came after a sync, so should be strictly greater.
        assert!(sequences[1] < sequences[2], "sequences: {:?}", sequences);
        fs.close().await.expect("Close failed");
    }
3034
    #[fuchsia::test]
    async fn test_verified_file_with_verified_attribute() {
        // An object whose default data attribute record carries fsverity metadata should
        // report is_verified_file() == true when reopened.
        let fs: OpenFxFilesystem = test_filesystem().await;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );

        // Replace the default data attribute's record with a verified attribute carrying
        // placeholder fsverity metadata (all-zero sha256 root digest, no salt).
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::verified_attribute(
                    0,
                    FsverityMetadata::Internal(RootDigest::Sha256([0; 32]), vec![]),
                ),
            ),
        );

        // Also insert an (empty, size 0) merkle tree attribute record to accompany it.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::attribute(0, false),
            ),
        );

        transaction.commit().await.unwrap();

        // Reopen the object so the handle picks up the committed attribute records.
        let handle =
            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
                .await
                .expect("open_object failed");

        assert!(handle.is_verified_file());

        fs.close().await.expect("Close failed");
    }
3088
3089    #[fuchsia::test]
3090    async fn test_verified_file_without_verified_attribute() {
3091        let fs: OpenFxFilesystem = test_filesystem().await;
3092        let mut transaction = fs
3093            .clone()
3094            .new_transaction(lock_keys![], Options::default())
3095            .await
3096            .expect("new_transaction failed");
3097        let store = fs.root_store();
3098        let object = Arc::new(
3099            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3100                .await
3101                .expect("create_object failed"),
3102        );
3103
3104        transaction.commit().await.unwrap();
3105
3106        let handle =
3107            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
3108                .await
3109                .expect("open_object failed");
3110
3111        assert!(!handle.is_verified_file());
3112
3113        fs.close().await.expect("Close failed");
3114    }
3115
    #[fuchsia::test]
    async fn test_create_and_open_store() {
        // Creates an encrypted volume, remounts the filesystem, and verifies the store can
        // be located and unlocked again (a fresh insecure crypt instance is used for the
        // unlock, exercising the wrapped-key path rather than any in-memory state).
        let fs = test_filesystem().await;
        let store_id = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            owner: NO_OWNER,
                            crypt: Some(Arc::new(new_insecure_crypt())),
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed")
                .store_object_id()
        };

        // Remount so the volume comes back in its locked state.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
        }
        fs.close().await.expect("Close failed");
    }
3148
    #[fuchsia::test]
    async fn test_create_and_open_internal_dir() {
        // Verifies that an encrypted volume's internal directory persists across remount:
        // after unlock, the same directory id is returned and its record is a directory.
        let fs = test_filesystem().await;
        let dir_id;
        let store_id;
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            owner: NO_OWNER,
                            crypt: Some(Arc::new(new_insecure_crypt())),
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            dir_id =
                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
            store_id = store.store_object_id();
        }

        // Remount so the volume comes back locked and all in-memory state is discarded.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
            // get_or_create must return the previously created directory, not a new one.
            assert_eq!(
                dir_id,
                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
            );
            // And the underlying object record must actually be a directory.
            let obj = store
                .tree()
                .find(&ObjectKey::object(dir_id))
                .await
                .expect("Searching tree for dir")
                .unwrap();
            assert_matches!(
                obj.value,
                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
            );
        }
        fs.close().await.expect("Close failed");
    }
3199
    #[fuchsia::test]
    async fn test_create_and_open_internal_dir_unencrypted() {
        // Same scenario as test_create_and_open_internal_dir, but for an unencrypted
        // volume: the internal directory id must survive remount with no unlock step.
        let fs = test_filesystem().await;
        let dir_id;
        let store_id;
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume("test", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");
            dir_id =
                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
            store_id = store.store_object_id();
        }

        // Remount, discarding all in-memory state.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            // get_or_create must return the previously created directory, not a new one.
            assert_eq!(
                dir_id,
                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
            );
            // And the underlying object record must actually be a directory.
            let obj = store
                .tree()
                .find(&ObjectKey::object(dir_id))
                .await
                .expect("Searching tree for dir")
                .unwrap();
            assert_matches!(
                obj.value,
                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
            );
        }
        fs.close().await.expect("Close failed");
    }
3240
    #[fuchsia::test(threads = 10)]
    async fn test_old_layers_are_purged() {
        // After a flush/compaction, a superseded layer file should be purged from the
        // parent store once the last outstanding layer-set reference to it is dropped.
        let fs = test_filesystem().await;

        let store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");

        // First flush produces a layer file; the write below gives the second flush
        // something to compact.
        store.flush().await.expect("flush failed");

        let mut buf = object.allocate_buffer(5).await;
        buf.as_mut_slice().copy_from_slice(b"hello");
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // Getting the layer-set should cause the flush to stall.
        let layer_set = store.tree().layer_set();

        let done = Mutex::new(false);
        let mut object_id = 0;

        join!(
            async {
                // This flush cannot complete until the other branch drops `layer_set`;
                // `done` verifies that ordering actually occurred.
                store.flush().await.expect("flush failed");
                assert!(*done.lock());
            },
            async {
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_secs(1)).await;
                *done.lock() = true;
                // Record the old layer file's object id before releasing our reference.
                object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
                std::mem::drop(layer_set);
            }
        );

        // With the reference released, the old layer file should no longer exist in the
        // parent store.
        if let Err(e) = ObjectStore::open_object(
            &store.parent_store.as_ref().unwrap(),
            object_id,
            HandleOptions::default(),
            store.crypt(),
        )
        .await
        {
            assert!(FxfsError::NotFound.matches(&e));
        } else {
            panic!("open_object succeeded");
        }
    }
3297
3298    #[fuchsia::test]
3299    async fn test_tombstone_deletes_data() {
3300        let fs = test_filesystem().await;
3301        let root_store = fs.root_store();
3302        let child_id = {
3303            let mut transaction = fs
3304                .clone()
3305                .new_transaction(lock_keys![], Options::default())
3306                .await
3307                .expect("new_transaction failed");
3308            let child = ObjectStore::create_object(
3309                &root_store,
3310                &mut transaction,
3311                HandleOptions::default(),
3312                None,
3313            )
3314            .await
3315            .expect("create_object failed");
3316            transaction.commit().await.expect("commit failed");
3317
3318            // Allocate an extent in the file.
3319            let mut buffer = child.allocate_buffer(8192).await;
3320            buffer.as_mut_slice().fill(0xaa);
3321            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
3322
3323            child.object_id()
3324        };
3325
3326        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
3327
3328        // Let fsck check allocations.
3329        fsck(fs.clone()).await.expect("fsck failed");
3330    }
3331
    #[fuchsia::test]
    async fn test_tombstone_purges_keys() {
        // Tombstoning an object in an encrypted store must also evict its entry from the
        // store's key manager cache.
        let fs = test_filesystem().await;
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .new_volume(
                "test",
                NewChildStoreOptions {
                    options: StoreOptions {
                        crypt: Some(Arc::new(new_insecure_crypt())),
                        ..StoreOptions::default()
                    },
                    ..NewChildStoreOptions::default()
                },
            )
            .await
            .expect("new_volume failed");
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let child =
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed");
        transaction.commit().await.expect("commit failed");
        // Creating the object should have populated the key manager...
        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_some());
        store
            .tombstone_object(child.object_id(), Options::default())
            .await
            .expect("tombstone_object failed");
        // ...and tombstoning it should have evicted the entry again.
        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_none());
        fs.close().await.expect("close failed");
    }
3367
    #[fuchsia::test]
    async fn test_major_compaction_discards_unnecessary_records() {
        // A tombstoned object's records may linger in the tree until a flush (major
        // compaction) discards them; verify they are present before and gone after.
        let fs = test_filesystem().await;
        let root_store = fs.root_store();
        let child_id = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let child = ObjectStore::create_object(
                &root_store,
                &mut transaction,
                HandleOptions::default(),
                None,
            )
            .await
            .expect("create_object failed");
            transaction.commit().await.expect("commit failed");

            // Allocate an extent in the file.
            let mut buffer = child.allocate_buffer(8192).await;
            buffer.as_mut_slice().fill(0xaa);
            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

            child.object_id()
        };

        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
        {
            let layers = root_store.tree.layer_set();
            let mut merger = layers.merger();
            let iter = merger
                .query(Query::FullRange(&ObjectKey::object(child_id)))
                .await
                .expect("seek failed");
            // Find at least one object still in the tree.
            match iter.get() {
                Some(ItemRef { key: ObjectKey { object_id, .. }, .. })
                    if *object_id == child_id => {}
                _ => panic!("Objects should still be in the tree."),
            }
        }
        // Flushing compacts the tree, which should drop the tombstoned object's records.
        root_store.flush().await.expect("flush failed");

        // There should be no records for the object.
        let layers = root_store.tree.layer_set();
        let mut merger = layers.merger();
        let iter = merger
            .query(Query::FullRange(&ObjectKey::object(child_id)))
            .await
            .expect("seek failed");
        match iter.get() {
            None => {}
            Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
                assert_ne!(*object_id, child_id)
            }
        }
    }
3427
    #[fuchsia::test]
    async fn test_overlapping_extents_in_different_layers() {
        // Writes that overlap an extent already persisted in a flushed layer must
        // supersede it correctly; fsck checks the allocator stays consistent.
        let fs = test_filesystem().await;
        let store = fs.root_store();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        let object = root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        let buf = object.allocate_buffer(16384).await;
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // Flush so the 0..16384 extent ends up in a persisted layer file.
        store.flush().await.expect("flush failed");

        object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");

        // At this point, we should have an extent for 0..16384 in a layer that has been flushed,
        // and an extent for 0..4096 that partially overwrites it.  Writing to 0..16384 should
        // overwrite both of those extents.
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        fsck(fs.clone()).await.expect("fsck failed");
    }
3466
    #[fuchsia::test(threads = 10)]
    async fn test_encrypted_mutations() {
        // End-to-end test of the encrypted mutations path: mutations made while the
        // volume key is available must survive remounts, flushes taken while the volume
        // is locked (which produce an encrypted mutations file), and fsck.
        async fn one_iteration(
            fs: OpenFxFilesystem,
            crypt: Arc<dyn Crypt>,
            iteration: u64,
        ) -> OpenFxFilesystem {
            // Remounts the filesystem; encrypted volumes come back locked.
            async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
                fs.close().await.expect("Close failed");
                let device = fs.take_device().await;
                device.reopen(false);
                FxFilesystem::open(device).await.expect("FS open failed")
            }

            let fs = reopen(fs).await;

            // Unlock the volume and write a new file with a known byte pattern.
            let (store_object_id, object_id) = {
                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_volume
                    .volume(
                        "test",
                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    )
                    .await
                    .expect("volume failed");

                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let root_directory = Directory::open(&store, store.root_directory_object_id())
                    .await
                    .expect("open failed");
                let object = root_directory
                    .create_child_file(&mut transaction, &format!("test {}", iteration))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                let mut buf = object.allocate_buffer(1000).await;
                for i in 0..buf.len() {
                    buf.as_mut_slice()[i] = i as u8;
                }
                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

                (store.store_object_id(), object.object_id())
            };

            let fs = reopen(fs).await;

            // Unlocks the volume and verifies the file written above reads back intact.
            let check_object = |fs: Arc<FxFilesystem>| {
                let crypt = crypt.clone();
                async move {
                    let root_volume = root_volume(fs).await.expect("root_volume failed");
                    let volume = root_volume
                        .volume(
                            "test",
                            StoreOptions { crypt: Some(crypt), ..StoreOptions::default() },
                        )
                        .await
                        .expect("volume failed");

                    let object = ObjectStore::open_object(
                        &volume,
                        object_id,
                        HandleOptions::default(),
                        None,
                    )
                    .await
                    .expect("open_object failed");
                    let mut buf = object.allocate_buffer(1000).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
                    for i in 0..buf.len() {
                        assert_eq!(buf.as_slice()[i], i as u8);
                    }
                }
            };

            check_object(fs.clone()).await;

            let fs = reopen(fs).await;

            // At this point the "test" volume is locked.  Before checking the object, flush the
            // filesystem.  This should leave a file with encrypted mutations.
            fs.object_manager().flush().await.expect("flush failed");

            // The flush taken while locked must have produced an encrypted mutations
            // object for the store.
            assert_ne!(
                fs.object_manager()
                    .store(store_object_id)
                    .unwrap()
                    .load_store_info()
                    .await
                    .expect("load_store_info failed")
                    .encrypted_mutations_object_id,
                INVALID_OBJECT_ID
            );

            check_object(fs.clone()).await;

            // Checking the object should have triggered a flush and so now there should be no
            // encrypted mutations object.
            assert_eq!(
                fs.object_manager()
                    .store(store_object_id)
                    .unwrap()
                    .load_store_info()
                    .await
                    .expect("load_store_info failed")
                    .encrypted_mutations_object_id,
                INVALID_OBJECT_ID
            );

            let fs = reopen(fs).await;

            fsck(fs.clone()).await.expect("fsck failed");

            let fs = reopen(fs).await;

            check_object(fs.clone()).await;

            fs
        }

        let mut fs = test_filesystem().await;
        let crypt = Arc::new(new_insecure_crypt());

        // Create the encrypted "test" volume once; each iteration below remounts and
        // exercises it.
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let _store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
        }

        // Run a few iterations so that we test changes with the stream cipher offset.
        for i in 0..5 {
            fs = one_iteration(fs, crypt.clone(), i).await;
        }
    }
3622
    /// Checks that when the low 32 bits of the last object ID are exhausted, allocating the
    /// next ID rolls the object ID cipher: the ID advances into the next 2^32 range, a new
    /// key is generated, and both the key and the ID survive a remount — whether or not the
    /// journal was compacted (flushed) before closing.
    #[test_case(true; "with a flush")]
    #[test_case(false; "without a flush")]
    #[fuchsia::test(threads = 10)]
    async fn test_object_id_cipher_roll(with_flush: bool) {
        let fs = test_filesystem().await;
        let crypt = Arc::new(new_insecure_crypt());

        let expected_key = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");

            // Create some files so that our in-memory copy of StoreInfo has changes (the object
            // count) pending a flush.
            let root_dir_id = store.root_directory_object_id();
            let root_dir =
                Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            for i in 0..10 {
                root_dir.create_child_file(&mut transaction, &format!("file {i}")).await.unwrap();
            }
            transaction.commit().await.expect("commit failed");

            let orig_store_info = store.store_info().unwrap();

            // Hack the last object ID to force a roll of the object ID cipher.
            {
                let mut last_object_id = store.last_object_id.lock();
                match &mut *last_object_id {
                    LastObjectId::Encrypted { id, .. } => {
                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
                        *id |= 0xffffffff;
                    }
                    _ => unreachable!(),
                }
            }

            // Allocating the next object ID should now trigger the roll.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let object = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            // The new object's ID should be in the next 2^32 range.
            assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 1u64 << 32);

            // Check that the key has been changed.
            let key = match (
                store.store_info().unwrap().last_object_id,
                orig_store_info.last_object_id,
            ) {
                (
                    LastObjectIdInfo::Encrypted { key, id },
                    LastObjectIdInfo::Encrypted { key: orig_key, .. },
                ) => {
                    assert_ne!(key, orig_key);
                    assert_eq!(id, 1u64 << 32);
                    key
                }
                _ => unreachable!(),
            };

            // Optionally compact so the rolled key reaches disk via a flush rather than
            // only via journal replay.
            if with_flush {
                fs.journal().compact().await.unwrap();
            }

            let last_object_id = store.last_object_id.lock();
            assert_eq!(last_object_id.id(), 1u64 << 32);
            key
        };

        // Remount and verify the rolled key and the new last object ID were persisted (or
        // correctly replayed from the journal in the no-flush case).
        fs.close().await.expect("Close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .volume("test", StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() })
            .await
            .expect("volume failed");

        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Encrypted { key, .. } if key == expected_key);
        assert_eq!(store.last_object_id.lock().id(), 1u64 << 32);

        fsck(fs.clone()).await.expect("fsck failed");
        fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
    }
3741
3742    #[fuchsia::test(threads = 2)]
3743    async fn test_race_object_id_cipher_roll_and_flush() {
3744        let fs = test_filesystem().await;
3745        let crypt = Arc::new(new_insecure_crypt());
3746
3747        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3748        let store = root_volume
3749            .new_volume(
3750                "test",
3751                NewChildStoreOptions {
3752                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3753                    ..Default::default()
3754                },
3755            )
3756            .await
3757            .expect("new_volume failed");
3758
3759        assert!(matches!(&*store.last_object_id.lock(), LastObjectId::Encrypted { .. }));
3760
3761        // Create some files so that our in-memory copy of StoreInfo has changes (the object
3762        // count) pending a flush.
3763        let root_dir_id = store.root_directory_object_id();
3764        let root_dir = Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3765
3766        let _executor_tasks = testing::force_executor_threads_to_run(2).await;
3767
3768        for j in 0..100 {
3769            let mut transaction = fs
3770                .clone()
3771                .new_transaction(
3772                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
3773                    Options::default(),
3774                )
3775                .await
3776                .expect("new_transaction failed");
3777            root_dir.create_child_file(&mut transaction, &format!("file {j}")).await.unwrap();
3778            transaction.commit().await.expect("commit failed");
3779
3780            let task = {
3781                let fs = fs.clone();
3782                fasync::Task::spawn(async move {
3783                    fs.journal().compact().await.unwrap();
3784                })
3785            };
3786
3787            // Hack the last object ID to force a roll of the object ID cipher.
3788            {
3789                let mut last_object_id = store.last_object_id.lock();
3790                let LastObjectId::Encrypted { id, .. } = &mut *last_object_id else {
3791                    unreachable!()
3792                };
3793                assert_eq!(*id >> 32, j);
3794                *id |= 0xffffffff;
3795            }
3796
3797            let mut transaction = fs
3798                .clone()
3799                .new_transaction(
3800                    lock_keys![LockKey::object(
3801                        store.store_object_id(),
3802                        store.root_directory_object_id()
3803                    )],
3804                    Options::default(),
3805                )
3806                .await
3807                .expect("new_transaction failed");
3808            let root_directory = Directory::open(&store, store.root_directory_object_id())
3809                .await
3810                .expect("open failed");
3811            root_directory
3812                .create_child_file(&mut transaction, "test {j}")
3813                .await
3814                .expect("create_child_file failed");
3815            transaction.commit().await.expect("commit failed");
3816
3817            task.await;
3818
3819            // Check that the key has been changed.
3820            let new_store_info = store.load_store_info().await.unwrap();
3821
3822            let LastObjectIdInfo::Encrypted { id, key } = new_store_info.last_object_id else {
3823                unreachable!()
3824            };
3825            assert_eq!(id >> 32, j + 1);
3826            let LastObjectIdInfo::Encrypted { key: in_memory_key, .. } =
3827                store.store_info().unwrap().last_object_id
3828            else {
3829                unreachable!()
3830            };
3831            assert_eq!(key, in_memory_key);
3832        }
3833
3834        fs.close().await.expect("Close failed");
3835    }
3836
3837    #[fuchsia::test]
3838    async fn test_object_id_no_roll_for_unencrypted_store() {
3839        let fs = test_filesystem().await;
3840
3841        {
3842            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3843            let store = root_volume
3844                .new_volume("test", NewChildStoreOptions::default())
3845                .await
3846                .expect("new_volume failed");
3847
3848            // Hack the last object ID.
3849            {
3850                let mut last_object_id = store.last_object_id.lock();
3851                match &mut *last_object_id {
3852                    LastObjectId::Unencrypted { id } => {
3853                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
3854                        *id |= 0xffffffff;
3855                    }
3856                    _ => unreachable!(),
3857                }
3858            }
3859
3860            let mut transaction = fs
3861                .clone()
3862                .new_transaction(
3863                    lock_keys![LockKey::object(
3864                        store.store_object_id(),
3865                        store.root_directory_object_id()
3866                    )],
3867                    Options::default(),
3868                )
3869                .await
3870                .expect("new_transaction failed");
3871            let root_directory = Directory::open(&store, store.root_directory_object_id())
3872                .await
3873                .expect("open failed");
3874            let object = root_directory
3875                .create_child_file(&mut transaction, "test")
3876                .await
3877                .expect("create_child_file failed");
3878            transaction.commit().await.expect("commit failed");
3879
3880            assert_eq!(object.object_id(), 0x1_0000_0000);
3881
3882            // Check that there is still no key.
3883            assert_matches!(
3884                store.store_info().unwrap().last_object_id,
3885                LastObjectIdInfo::Unencrypted { .. }
3886            );
3887
3888            assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
3889        };
3890
3891        fs.close().await.expect("Close failed");
3892        let device = fs.take_device().await;
3893        device.reopen(false);
3894        let fs = FxFilesystem::open(device).await.expect("open failed");
3895        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3896        let store =
3897            root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
3898
3899        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
3900    }
3901
3902    #[fuchsia::test]
3903    fn test_object_id_is_not_invalid_object_id() {
3904        let key = UnwrappedKey::new(vec![0; FXFS_KEY_SIZE]);
3905        // 1106634048 results in INVALID_OBJECT_ID with this key.
3906        let mut last_object_id =
3907            LastObjectId::Encrypted { id: 1106634047, cipher: Box::new(Ff1::new(&key)) };
3908        assert!(last_object_id.try_get_next().is_some());
3909        assert!(last_object_id.try_get_next().is_some());
3910    }
3911
3912    #[fuchsia::test]
3913    async fn test_last_object_id_is_correct_after_unlock() {
3914        let fs = test_filesystem().await;
3915        let crypt = Arc::new(new_insecure_crypt());
3916
3917        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3918        let store = root_volume
3919            .new_volume(
3920                "test",
3921                NewChildStoreOptions {
3922                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3923                    ..Default::default()
3924                },
3925            )
3926            .await
3927            .expect("new_volume failed");
3928
3929        let mut transaction = fs
3930            .clone()
3931            .new_transaction(
3932                lock_keys![LockKey::object(
3933                    store.store_object_id(),
3934                    store.root_directory_object_id()
3935                )],
3936                Options::default(),
3937            )
3938            .await
3939            .expect("new_transaction failed");
3940        let root_directory =
3941            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3942        root_directory
3943            .create_child_file(&mut transaction, "test")
3944            .await
3945            .expect("create_child_file failed");
3946        transaction.commit().await.expect("commit failed");
3947
3948        // Compact so that StoreInfo is written.
3949        fs.journal().compact().await.unwrap();
3950
3951        let last_object_id = store.last_object_id.lock().id();
3952
3953        store.lock().await.unwrap();
3954        store.unlock(NO_OWNER, crypt.clone()).await.unwrap();
3955
3956        assert_eq!(store.last_object_id.lock().id(), last_object_id);
3957    }
3958
3959    #[fuchsia::test(threads = 20)]
3960    async fn test_race_when_rolling_last_object_id_cipher() {
3961        // NOTE: This test is trying to test a race, so if it fails, it might be flaky.
3962
3963        const NUM_THREADS: usize = 20;
3964
3965        let fs = test_filesystem().await;
3966        let crypt = Arc::new(new_insecure_crypt());
3967
3968        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3969        let store = root_volume
3970            .new_volume(
3971                "test",
3972                NewChildStoreOptions {
3973                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3974                    ..Default::default()
3975                },
3976            )
3977            .await
3978            .expect("new_volume failed");
3979
3980        let store_id = store.store_object_id();
3981        let root_dir_id = store.root_directory_object_id();
3982
3983        let root_directory =
3984            Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3985
3986        // Create directories.
3987        let mut directories = Vec::new();
3988        for _ in 0..NUM_THREADS {
3989            let mut transaction = fs
3990                .clone()
3991                .new_transaction(
3992                    lock_keys![LockKey::object(store_id, root_dir_id,)],
3993                    Options::default(),
3994                )
3995                .await
3996                .expect("new_transaction failed");
3997            directories.push(
3998                root_directory
3999                    .create_child_dir(&mut transaction, "test")
4000                    .await
4001                    .expect("create_child_file failed"),
4002            );
4003            transaction.commit().await.expect("commit failed");
4004        }
4005
4006        // Hack the last object ID so that the next ID will require a roll.
4007        match &mut *store.last_object_id.lock() {
4008            LastObjectId::Encrypted { id, .. } => *id |= 0xffff_ffff,
4009            _ => unreachable!(),
4010        }
4011
4012        let scope = fasync::Scope::new();
4013
4014        let _executor_tasks = testing::force_executor_threads_to_run(NUM_THREADS).await;
4015
4016        for dir in directories {
4017            let fs = fs.clone();
4018            scope.spawn(async move {
4019                let mut transaction = fs
4020                    .clone()
4021                    .new_transaction(
4022                        lock_keys![LockKey::object(store_id, dir.object_id(),)],
4023                        Options::default(),
4024                    )
4025                    .await
4026                    .expect("new_transaction failed");
4027                dir.create_child_file(&mut transaction, "test")
4028                    .await
4029                    .expect("create_child_file failed");
4030                transaction.commit().await.expect("commit failed");
4031            });
4032        }
4033
4034        scope.on_no_tasks().await;
4035
4036        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000 + NUM_THREADS as u64 - 1);
4037    }
4038
4039    #[fuchsia::test(threads = 10)]
4040    async fn test_lock_store() {
4041        let fs = test_filesystem().await;
4042        let crypt = Arc::new(new_insecure_crypt());
4043
4044        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4045        let store = root_volume
4046            .new_volume(
4047                "test",
4048                NewChildStoreOptions {
4049                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4050                    ..NewChildStoreOptions::default()
4051                },
4052            )
4053            .await
4054            .expect("new_volume failed");
4055        let mut transaction = fs
4056            .clone()
4057            .new_transaction(
4058                lock_keys![LockKey::object(
4059                    store.store_object_id(),
4060                    store.root_directory_object_id()
4061                )],
4062                Options::default(),
4063            )
4064            .await
4065            .expect("new_transaction failed");
4066        let root_directory =
4067            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4068        root_directory
4069            .create_child_file(&mut transaction, "test")
4070            .await
4071            .expect("create_child_file failed");
4072        transaction.commit().await.expect("commit failed");
4073        store.lock().await.expect("lock failed");
4074
4075        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
4076        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4077    }
4078
4079    #[fuchsia::test(threads = 10)]
4080    async fn test_unlock_read_only() {
4081        let fs = test_filesystem().await;
4082        let crypt = Arc::new(new_insecure_crypt());
4083
4084        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4085        let store = root_volume
4086            .new_volume(
4087                "test",
4088                NewChildStoreOptions {
4089                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4090                    ..NewChildStoreOptions::default()
4091                },
4092            )
4093            .await
4094            .expect("new_volume failed");
4095        let mut transaction = fs
4096            .clone()
4097            .new_transaction(
4098                lock_keys![LockKey::object(
4099                    store.store_object_id(),
4100                    store.root_directory_object_id()
4101                )],
4102                Options::default(),
4103            )
4104            .await
4105            .expect("new_transaction failed");
4106        let root_directory =
4107            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4108        root_directory
4109            .create_child_file(&mut transaction, "test")
4110            .await
4111            .expect("create_child_file failed");
4112        transaction.commit().await.expect("commit failed");
4113        store.lock().await.expect("lock failed");
4114
4115        store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
4116        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4117        store.lock_read_only();
4118        store.unlock_read_only(crypt).await.expect("unlock failed");
4119        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4120    }
4121
    /// Checks that the mutations key is rolled each time an encrypted store is unlocked
    /// (the cipher offset restarts at 0 after every unlock), and that the filesystem can be
    /// remounted many times even as those rolled mutations keys accumulate.
    #[fuchsia::test(threads = 10)]
    async fn test_key_rolled_when_unlocked() {
        let fs = test_filesystem().await;
        let crypt = Arc::new(new_insecure_crypt());

        let object_id;
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            // Keep the object's ID so every remount below can write to the same file.
            object_id = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed")
                .object_id();
            transaction.commit().await.expect("commit failed");
        }

        fs.close().await.expect("Close failed");
        let mut device = fs.take_device().await;

        // Repeatedly remount so that we can be sure that we can remount when there are many
        // mutations keys.
        for _ in 0..100 {
            device.reopen(false);
            let fs = FxFilesystem::open(device).await.expect("open failed");
            {
                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_volume
                    .volume(
                        "test",
                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    )
                    .await
                    .expect("open_volume failed");

                // The key should get rolled every time we unlock.
                assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);

                // Make sure there's an encrypted mutation.
                let handle =
                    ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
                        .await
                        .expect("open_object failed");
                let buffer = handle.allocate_buffer(100).await;
                handle
                    .write_or_append(Some(0), buffer.as_ref())
                    .await
                    .expect("write_or_append failed");
            }
            // Close and reclaim the device so the next iteration can reopen it.
            fs.close().await.expect("Close failed");
            device = fs.take_device().await;
        }
    }
4201
4202    #[test]
4203    fn test_store_info_max_serialized_size() {
4204        let info = StoreInfo {
4205            guid: [0xff; 16],
4206            last_object_id: LastObjectIdInfo::Encrypted {
4207                id: 0x1234567812345678,
4208                key: FxfsKey {
4209                    wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4210                    key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4211                },
4212            },
4213            // Worst case, each layer should be 3/4 the size of the layer below it (because of the
4214            // compaction policy we're using).  If the smallest layer is 8,192 bytes, then 120
4215            // layers would take up a size that exceeds a 64 bit unsigned integer, so if this fits,
4216            // any size should fit.
4217            layers: vec![0x1234567812345678; 120],
4218            root_directory_object_id: 0x1234567812345678,
4219            graveyard_directory_object_id: 0x1234567812345678,
4220            object_count: 0x1234567812345678,
4221            mutations_key: Some(FxfsKey {
4222                wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4223                key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4224            }),
4225            mutations_cipher_offset: 0x1234567812345678,
4226            encrypted_mutations_object_id: 0x1234567812345678,
4227            internal_directory_object_id: INVALID_OBJECT_ID,
4228        };
4229        let mut serialized_info = Vec::new();
4230        info.serialize_with_version(&mut serialized_info).unwrap();
4231        assert!(
4232            serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
4233            "{}",
4234            serialized_info.len()
4235        );
4236    }
4237
    /// Shared body for the reopen-after-crypt-failure tests: creates an encrypted volume,
    /// shuts down its crypt instance (so subsequent writes fail), locks the store, then
    /// unlocks it again with a fresh crypt instance — read-only or read-write depending on
    /// `read_only` — and verifies the previously created file is still reachable.
    async fn reopen_after_crypt_failure_inner(read_only: bool) {
        let fs = test_filesystem().await;
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

        let store = {
            let crypt = Arc::new(new_insecure_crypt());
            let store = root_volume
                .new_volume(
                    "vol",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            // With the crypt instance shut down, creating a child must now fail.
            crypt.shutdown();
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, "test2")
                .await
                .map(|_| ())
                .expect_err("create_child_file should fail");
            store.lock().await.expect("lock failed");
            store
        };

        // NOTE(review): unlocking with a brand-new insecure crypt instance presumably works
        // because the insecure crypt unwraps keys deterministically — confirm against the
        // crypt test helper.
        let crypt = Arc::new(new_insecure_crypt());
        if read_only {
            store.unlock_read_only(crypt).await.expect("unlock failed");
        } else {
            store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
        }
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
    }
4308
    /// Read-write variant: the store must be unlockable read-write after a crypt failure
    /// forced it locked.
    #[fuchsia::test(threads = 10)]
    async fn test_reopen_after_crypt_failure() {
        reopen_after_crypt_failure_inner(false).await;
    }
4313
    /// Read-only variant: the store must be unlockable read-only after a crypt failure
    /// forced it locked.
    #[fuchsia::test(threads = 10)]
    async fn test_reopen_read_only_after_crypt_failure() {
        reopen_after_crypt_failure_inner(true).await;
    }
4318
4319    #[fuchsia::test(threads = 10)]
4320    #[should_panic(expected = "Insufficient reservation space")]
4321    #[cfg(debug_assertions)]
4322    async fn large_transaction_causes_panic_in_debug_builds() {
4323        let fs = test_filesystem().await;
4324        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4325        let store = root_volume
4326            .new_volume("vol", NewChildStoreOptions::default())
4327            .await
4328            .expect("new_volume failed");
4329        let root_directory =
4330            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4331        let mut transaction = fs
4332            .clone()
4333            .new_transaction(
4334                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
4335                Options::default(),
4336            )
4337            .await
4338            .expect("transaction");
4339        for i in 0..500 {
4340            root_directory
4341                .create_symlink(&mut transaction, b"link", &format!("{}", i))
4342                .await
4343                .expect("symlink");
4344        }
4345        assert_eq!(transaction.commit().await.expect("commit"), 0);
4346    }
4347
    /// Checks that a crypt failure in one store does not wedge ("fuse") the journal:
    /// compaction still succeeds, the failing store is force-locked via its StoreOwner, and
    /// the mutations held only in the journal for that store are still replayable after a
    /// remount.
    #[fuchsia::test]
    async fn test_crypt_failure_does_not_fuse_journal() {
        let fs = test_filesystem().await;

        // An owner whose force_lock simply locks the store; the journal invokes this when
        // the store's crypt fails during compaction.
        struct Owner;
        #[async_trait]
        impl StoreOwner for Owner {
            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
                store.lock().await
            }
        }
        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;

        {
            // Create two stores and a record for each store, so the journal will need to flush them
            // both later.
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store1 = root_volume
                .new_volume(
                    "vol1",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(Arc::new(new_insecure_crypt())),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            let crypt = Arc::new(new_insecure_crypt());
            // Only store2 gets the owner; it is the store whose crypt will be shut down.
            let store2 = root_volume
                .new_volume(
                    "vol2",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            owner: Arc::downgrade(&owner),
                            crypt: Some(crypt.clone()),
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            for store in [&store1, &store2] {
                let root_directory = Directory::open(store, store.root_directory_object_id())
                    .await
                    .expect("open failed");
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            root_directory.object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, "test")
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
            // Shut down the crypt instance for store2, and then compact.  Compaction should not
            // fail, and the store should become locked.
            crypt.shutdown();
            fs.journal().compact().await.expect("compact failed");
            // The store should now be locked.
            assert!(store2.is_locked());
        }

        // Even though the store wasn't flushed, the mutation to store2 will still be valid as it is
        // held in the journal.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

        for volume_name in ["vol1", "vol2"] {
            let store = root_volume
                .volume(
                    volume_name,
                    StoreOptions {
                        crypt: Some(Arc::new(new_insecure_crypt())),
                        ..StoreOptions::default()
                    },
                )
                .await
                .expect("open volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            // The file created before the crypt failure must have survived via the journal.
            assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
        }

        fs.close().await.expect("close failed");
    }
4448
    #[fuchsia::test]
    async fn test_crypt_failure_during_unlock_race() {
        // Exercises a crypt-service failure racing with volume unlock: if the crypt instance
        // shuts down right after unlock completes (but before the post-unlock flush), the store
        // must not be corrupted — after a remount with a fresh crypt, the volume's contents must
        // still be readable.
        let fs = test_filesystem().await;

        // Minimal StoreOwner whose reaction to a crypt failure is to force-lock the store.
        struct Owner;
        #[async_trait]
        impl StoreOwner for Owner {
            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
                store.lock().await
            }
        }
        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;

        // Create an encrypted volume with one file in it, then cleanly close and remount so the
        // race below starts from a locked volume on disk.
        let store_object_id = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "vol",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            owner: Arc::downgrade(&owner),
                            crypt: Some(Arc::new(new_insecure_crypt())),
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            store.store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        {
            let fs_clone = fs.clone();
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let crypt = Arc::new(new_insecure_crypt());
            let crypt_clone = crypt.clone();
            // Race an unlock of the volume against shutting down its crypt instance.
            join!(
                async move {
                    // Unlock might fail (the crypt may already be shut down), so ignore errors.
                    let _ = root_volume
                        .volume(
                            "vol",
                            StoreOptions {
                                owner: Arc::downgrade(&owner),
                                crypt: Some(crypt_clone),
                            },
                        )
                        .await;
                },
                async move {
                    // Acquiring the flush write-lock serializes with the flush that unlock kicks
                    // off, so shutting down crypt here lands in the window after unlock completes
                    // but before that flush finishes — maximizing the chance of hitting the race.
                    let keys = lock_keys![LockKey::flush(store_object_id)];
                    let _ = fs_clone.lock_manager().write_lock(keys).await;
                    crypt.shutdown();
                }
            );
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // Remount with a fresh crypt instance; the file created earlier must still be present,
        // proving the crypt failure during unlock didn't corrupt anything.
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .volume(
                "vol",
                StoreOptions {
                    crypt: Some(Arc::new(new_insecure_crypt())),
                    ..StoreOptions::default()
                },
            )
            .await
            .expect("open volume failed");
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());

        fs.close().await.expect("close failed");
    }
4555
4556    #[fuchsia::test]
4557    async fn test_low_32_bit_object_ids() {
4558        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
4559        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
4560
4561        {
4562            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4563
4564            let store = root_vol
4565                .new_volume(
4566                    "test",
4567                    NewChildStoreOptions { low_32_bit_object_ids: true, ..Default::default() },
4568                )
4569                .await
4570                .expect("new_volume failed");
4571
4572            let root_dir = Directory::open(&store, store.root_directory_object_id())
4573                .await
4574                .expect("open failed");
4575
4576            let mut ids = std::collections::HashSet::new();
4577
4578            for i in 0..100 {
4579                let mut transaction = fs
4580                    .clone()
4581                    .new_transaction(
4582                        lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4583                        Options::default(),
4584                    )
4585                    .await
4586                    .expect("new_transaction failed");
4587
4588                for j in 0..100 {
4589                    let object = root_dir
4590                        .create_child_dir(&mut transaction, &format!("{i}.{j}"))
4591                        .await
4592                        .expect("create_child_file failed");
4593
4594                    assert!(object.object_id() < 1 << 32);
4595                    assert_ne!(object.object_id(), INVALID_OBJECT_ID);
4596                    assert!(ids.insert(object.object_id()));
4597                }
4598
4599                transaction.commit().await.expect("commit failed");
4600            }
4601
4602            assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4603
4604            fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
4605        }
4606
4607        // Verify persistence
4608        fs.close().await.expect("Close failed");
4609        let device = fs.take_device().await;
4610        device.reopen(false);
4611        let fs = FxFilesystem::open(device).await.expect("open failed");
4612        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4613        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");
4614
4615        // Check that we can still create files and they have low 32-bit IDs.
4616        let root_dir =
4617            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4618        let mut transaction = fs
4619            .clone()
4620            .new_transaction(
4621                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4622                Options::default(),
4623            )
4624            .await
4625            .expect("new_transaction failed");
4626
4627        let object = root_dir
4628            .create_child_file(&mut transaction, "persistence_check")
4629            .await
4630            .expect("create_child_file failed");
4631        assert!(object.object_id() < 1 << 32);
4632
4633        transaction.commit().await.expect("commit failed");
4634
4635        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4636    }
4637}