// fxfs/object_store.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod allocator;
6pub mod caching_object_handle;
7pub mod data_object_handle;
8pub mod directory;
9mod extent_mapping_iterator;
10mod extent_record;
11mod flush;
12pub mod graveyard;
13mod install;
14pub mod journal;
15mod key_manager;
16pub(crate) mod merge;
17pub mod object_manager;
18pub mod object_record;
19pub mod project_id;
20mod store_object_handle;
21pub mod transaction;
22mod tree;
23mod tree_cache;
24pub mod volume;
25
26pub use data_object_handle::{
27    DataObjectHandle, DirectWriter, FileExtent, FsverityState, FsverityStateInner, RangeType,
28};
29pub use directory::Directory;
30pub use object_record::{ChildValue, ObjectDescriptor, PosixAttributes, Timestamp};
31pub use store_object_handle::{
32    EXTENDED_ATTRIBUTE_RANGE_END, EXTENDED_ATTRIBUTE_RANGE_START, SetExtendedAttributeMode,
33    StoreObjectHandle,
34};
35
36use crate::errors::FxfsError;
37use crate::filesystem::{
38    ApplyContext, ApplyMode, FxFilesystem, JournalingObject, MAX_FILE_SIZE, SyncOptions,
39    TruncateGuard, TxnGuard,
40};
41use crate::log::*;
42use crate::lsm_tree::cache::{NullCache, ObjectCache};
43use crate::lsm_tree::types::{Existence, Item, ItemRef, LayerIterator};
44use crate::lsm_tree::{LSMTree, Query};
45use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ObjectProperties, ReadObjectHandle};
46use crate::object_store::allocator::Allocator;
47use crate::object_store::graveyard::Graveyard;
48use crate::object_store::journal::{JournalCheckpoint, JournalCheckpointV32, JournaledTransaction};
49use crate::object_store::key_manager::KeyManager;
50use crate::object_store::transaction::{
51    AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation, Options, Transaction,
52    lock_keys,
53};
54use crate::range::RangeExt;
55use crate::round::round_up;
56use crate::serialized_types::{Version, Versioned, VersionedLatest};
57use anyhow::{Context, Error, anyhow, bail, ensure};
58use async_trait::async_trait;
59use fidl_fuchsia_io as fio;
60use fprint::TypeFingerprint;
61use fuchsia_sync::Mutex;
62use fxfs_crypto::ff1::Ff1;
63use fxfs_crypto::{
64    CipherHolder, Crypt, KeyPurpose, ObjectType, StreamCipher, UnwrappedKey, WrappingKeyId,
65    key_to_cipher,
66};
67use fxfs_macros::{Migrate, migrate_to_version};
68use rand::RngCore;
69use scopeguard::ScopeGuard;
70use serde::{Deserialize, Serialize};
71use std::collections::HashSet;
72use std::fmt;
73use std::num::NonZero;
74use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
75use std::sync::{Arc, OnceLock, Weak};
76use storage_device::Device;
77use uuid::Uuid;
78
79pub use extent_record::{
80    BLOB_MERKLE_ATTRIBUTE_ID, BLOB_METADATA_ATTRIBUTE_ID, DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey,
81    ExtentMode, ExtentValue, FSVERITY_MERKLE_ATTRIBUTE_ID,
82};
83pub use object_record::{
84    AttributeKey, EncryptionKey, EncryptionKeys, ExtendedAttributeValue, FsverityMetadata, FxfsKey,
85    FxfsKeyV40, FxfsKeyV49, ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue,
86    ProjectProperty, RootDigest,
87};
88pub use transaction::Mutation;
89
// For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel
// attacks more difficult. This mask can be used to extract the hi part of the object ID.
const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000;

// At time of writing, this threshold limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;

// Encrypted files and directories use the fscrypt key (identified by `FSCRYPT_KEY_ID`) to encrypt
// file contents and filenames respectively. All non-fscrypt encrypted files otherwise default to
// using the `VOLUME_DATA_KEY_ID` key. Note, the filesystem always uses the `VOLUME_DATA_KEY_ID`
// key to encrypt large extended attributes. Thus, encrypted files and directories with large
// xattrs will have both an fscrypt and volume data key.
/// Key ID of the volume data key (also used for large extended attributes; see above).
pub const VOLUME_DATA_KEY_ID: u64 = 0;
/// Key ID of the fscrypt key used by encrypted files and directories (see above).
pub const FSCRYPT_KEY_ID: u64 = 1;

/// A constant that can be used where an owner is expected of type `Weak<dyn StoreOwner>` but no
/// owner is required.
pub const NO_OWNER: Weak<()> = Weak::new();
// `()` acts as a no-op owner: it keeps `force_lock`'s default (failing) implementation.
impl StoreOwner for () {}
109
#[async_trait]
pub trait StoreOwner: Send + Sync {
    /// Forcibly lock the store.  This exists to give the StoreOwner an opportunity to clean up
    /// tasks which might access the store before locking it, because ObjectStore::unlock can only
    /// be called when the store is not in use.
    ///
    /// The default implementation refuses with `FxfsError::Internal`; owners that support forced
    /// locking must override it.
    async fn force_lock(self: Arc<Self>, _store: &ObjectStore) -> Result<(), Error> {
        Err(anyhow!(FxfsError::Internal))
    }
}

/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
/// back to an ObjectStore.
pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}

/// StoreInfo stores information about the object store.  This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
pub type StoreInfo = StoreInfoV52;
127
/// The latest serialized format (version 52) of the store's metadata; see [`StoreInfo`].
#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV52 {
    /// The globally unique identifier for the associated object store. If unset, will be all zero.
    guid: [u8; 16],

    /// The last used object ID.  Note that this field is not accurate in memory; ObjectStore's
    /// last_object_id field is the one to use in that case.  Technically, this might not be the
    /// last object ID used for the latest transaction that created an object because we use this at
    /// the point of creating the object but before we commit the transaction.  Transactions can
    /// then get committed in an arbitrary order (or not at all).
    last_object_id: LastObjectIdInfo,

    /// Object ids for layers.  TODO(https://fxbug.dev/42178036): need a layer of indirection here
    /// so we can support snapshots.
    pub layers: Vec<u64>,

    /// The object ID for the root directory.
    root_directory_object_id: u64,

    /// The object ID for the graveyard.
    graveyard_directory_object_id: u64,

    /// The number of live objects in the store.  This should *not* be trusted; it can be invalid
    /// due to filesystem inconsistencies.
    object_count: u64,

    /// The (wrapped) key that encrypted mutations should use.
    mutations_key: Option<FxfsKeyV49>,

    /// Mutations for the store are encrypted using a stream cipher.  To decrypt the mutations, we
    /// need to know the offset in the cipher stream to start it.
    mutations_cipher_offset: u64,

    /// If we have to flush the store whilst we do not have the key, we need to write the encrypted
    /// mutations to an object. This is the object ID of that file if it exists.
    pub encrypted_mutations_object_id: u64,

    /// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
    /// when the directory doesn't yet exist.
    internal_directory_object_id: u64,
}
169
/// Persistent record of how the last object ID is tracked.  This is the serialized counterpart of
/// the in-memory `LastObjectId` (which additionally carries the cipher and reservation state).
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
enum LastObjectIdInfo {
    /// Object IDs are plain sequential values.
    Unencrypted {
        id: u64,
    },
    Encrypted {
        /// The *unencrypted* value of the last object ID.
        id: u64,

        /// Object IDs are encrypted to reduce the amount of information that sequential object IDs
        /// reveal (such as the number of files in the system and the ordering of their creation in
        /// time).  Only the bottom 32 bits of the object ID are encrypted whilst the top 32 bits
        /// will increment after 2^32 object IDs have been used and this allows us to roll the key.
        key: FxfsKeyV49,
    },
    /// Object IDs are restricted to the low 32 bits; no last-ID counter is persisted.
    Low32Bit,
}
187
188impl Default for LastObjectIdInfo {
189    fn default() -> Self {
190        LastObjectIdInfo::Unencrypted { id: 0 }
191    }
192}
193
/// Superseded serialized format (version 49) of the store metadata; migrated to [`StoreInfoV52`]
/// via the `From` impl below.  In this version `last_object_id` and the optional `object_id_key`
/// were separate fields rather than a single enum.
#[derive(Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV49 {
    guid: [u8; 16],
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV49>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    object_id_key: Option<FxfsKeyV49>,
    internal_directory_object_id: u64,
}
208
209impl From<StoreInfoV49> for StoreInfoV52 {
210    fn from(value: StoreInfoV49) -> Self {
211        Self {
212            guid: value.guid,
213            last_object_id: if let Some(key) = value.object_id_key {
214                LastObjectIdInfo::Encrypted { id: value.last_object_id, key: key }
215            } else {
216                LastObjectIdInfo::Unencrypted { id: value.last_object_id }
217            },
218            layers: value.layers,
219            root_directory_object_id: value.root_directory_object_id,
220            graveyard_directory_object_id: value.graveyard_directory_object_id,
221            object_count: value.object_count,
222            mutations_key: value.mutations_key,
223            mutations_cipher_offset: value.mutations_cipher_offset,
224            encrypted_mutations_object_id: value.encrypted_mutations_object_id,
225            internal_directory_object_id: value.internal_directory_object_id,
226        }
227    }
228}
229
/// Superseded serialized format (version 40); differs from V49 only in the wrapped-key type
/// (`FxfsKeyV40`), so the migration to [`StoreInfoV49`] is derived via `Migrate`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV49)]
pub struct StoreInfoV40 {
    guid: [u8; 16],
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV40>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    object_id_key: Option<FxfsKeyV40>,
    internal_directory_object_id: u64,
}
245
246impl StoreInfo {
247    /// Returns the parent objects for this store.
248    pub fn parent_objects(&self) -> Vec<u64> {
249        // We should not include the ID of the store itself, since that should be referred to in the
250        // volume directory.
251        let mut objects = self.layers.to_vec();
252        if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
253            objects.push(self.encrypted_mutations_object_id);
254        }
255        objects
256    }
257}
258
// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit isn't exceeded.
// It will likely involve placing limits on the maximum number of layers.
/// Upper bound on the serialized size of a `StoreInfo` (128 KiB).
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072;

// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
// in the journal but hasn't yet been written to layer files) for a store.  We set a limit because
// we want to limit the amount of memory use in the case the filesystem is corrupt or under attack.
pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;
267
/// Options controlling the behavior of object handles.  All options default to `false`.
#[derive(Default)]
pub struct HandleOptions {
    /// If true, transactions used by this handle will skip journal space checks.
    pub skip_journal_checks: bool,
    /// If true, data written to any attribute of this handle will not have per-block checksums
    /// computed.
    pub skip_checksums: bool,
    /// If true, any files using fsverity will not attempt to perform any verification. This is
    /// useful to open an object without the correct encryption keys to look at the metadata.
    pub skip_fsverity: bool,
}
279
/// Parameters for encrypting a newly created object.
pub struct ObjectEncryptionOptions {
    /// If set, the keys are treated as permanent and never evicted from the KeyManager cache.
    /// This is necessary when keys are managed by another store; for example, the layer files
    /// of a child store are objects in the root store, but they are encrypted with keys from the
    /// child store.  Generally, most objects should have this set to `false`.
    pub permanent: bool,
    /// Identifier for the key, e.g. `VOLUME_DATA_KEY_ID` or `FSCRYPT_KEY_ID`.
    pub key_id: u64,
    /// The wrapped form of the key.  NOTE(review): presumably this is what gets persisted with
    /// the object — confirm against callers.
    pub key: EncryptionKey,
    /// The unwrapped (usable) form of the same key.
    pub unwrapped_key: UnwrappedKey,
}
291
/// Options common to all stores; see also `NewChildStoreOptions`.
pub struct StoreOptions {
    /// The owner of the store.
    pub owner: Weak<dyn StoreOwner>,

    /// The crypt service for the store; the store is unencrypted if this is `None`.
    pub crypt: Option<Arc<dyn Crypt>>,
}
299
300impl Default for StoreOptions {
301    fn default() -> Self {
302        Self { owner: NO_OWNER, crypt: None }
303    }
304}
305
/// Options for `new_child_store`.
#[derive(Default)]
pub struct NewChildStoreOptions {
    /// Options common to all stores (owner and crypt).
    pub options: StoreOptions,

    /// Specifies the object ID in the root store to be used for the store.  If set to
    /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
    pub object_id: u64,

    /// If true, reserve all 32 bit object_ids.  All new objects will start with IDs exceeding
    /// 0x1_0000_0000.
    pub reserve_32bit_object_ids: bool,

    /// Object IDs will be restricted to 32 bits.  This involves a less performant algorithm and so
    /// should not be used unless necessary.  Mutually exclusive with `reserve_32bit_object_ids`.
    pub low_32_bit_object_ids: bool,

    /// If set, use this GUID for the new store.
    pub guid: Option<[u8; 16]>,
}
325
/// The latest version of the serialized encrypted-mutations record.
pub type EncryptedMutations = EncryptedMutationsV49;

#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV49 {
    // Information about the mutations are held here, but the actual encrypted data is held within
    // data.  For each transaction, we record the checkpoint and the count of mutations within the
    // transaction.  The checkpoint is required for the log file offset (which we need to apply the
    // mutations), and the version so that we can correctly decode the mutation after it has been
    // decrypted. The count specifies the number of serialized mutations encoded in |data|.
    transactions: Vec<(JournalCheckpointV32, u64)>,

    // The encrypted mutations.
    #[serde(with = "crate::zerocopy_serialization")]
    data: Vec<u8>,

    // If the mutations key was rolled, this holds the offset in `data` where the new key should
    // apply.  Offsets are relative to the start of `data`.
    mutations_key_roll: Vec<(usize, FxfsKeyV49)>,
}
345
346impl std::fmt::Debug for EncryptedMutations {
347    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
348        f.debug_struct("EncryptedMutations")
349            .field("transactions", &self.transactions)
350            .field("len", &self.data.len())
351            .field(
352                "mutations_key_roll",
353                &self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
354            )
355            .finish()
356    }
357}
358
impl Versioned for EncryptedMutations {
    /// Caps deserialization at `MAX_ENCRYPTED_MUTATIONS_SIZE` to bound memory usage on corrupt
    /// or malicious images (see the constant's comment).
    fn max_serialized_size() -> Option<u64> {
        Some(MAX_ENCRYPTED_MUTATIONS_SIZE as u64)
    }
}
364
365impl From<EncryptedMutationsV40> for EncryptedMutationsV49 {
366    fn from(value: EncryptedMutationsV40) -> Self {
367        EncryptedMutationsV49 {
368            transactions: value.transactions,
369            data: value.data,
370            mutations_key_roll: value
371                .mutations_key_roll
372                .into_iter()
373                .map(|(offset, key)| (offset, key.into()))
374                .collect(),
375        }
376    }
377}
378
/// Superseded version (40) of the encrypted-mutations record; converted to
/// [`EncryptedMutationsV49`] via the `From` impl above.
#[derive(Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV40 {
    transactions: Vec<(JournalCheckpointV32, u64)>,
    data: Vec<u8>,
    mutations_key_roll: Vec<(usize, FxfsKeyV40)>,
}

impl Versioned for EncryptedMutationsV40 {
    /// Same size cap as the latest version; bounds memory use during deserialization.
    fn max_serialized_size() -> Option<u64> {
        Some(MAX_ENCRYPTED_MUTATIONS_SIZE as u64)
    }
}
391
392impl EncryptedMutations {
393    fn from_replayed_mutations(
394        store_object_id: u64,
395        transactions: Vec<JournaledTransaction>,
396    ) -> Self {
397        let mut this = Self::default();
398        for JournaledTransaction { checkpoint, non_root_mutations, .. } in transactions {
399            for (object_id, mutation) in non_root_mutations {
400                if store_object_id == object_id {
401                    if let Mutation::EncryptedObjectStore(data) = mutation {
402                        this.push(&checkpoint, data);
403                    } else if let Mutation::UpdateMutationsKey(key) = mutation {
404                        this.mutations_key_roll.push((this.data.len(), key.into()));
405                    }
406                }
407            }
408        }
409        this
410    }
411
412    fn extend(&mut self, other: &EncryptedMutations) {
413        self.transactions.extend_from_slice(&other.transactions[..]);
414        self.mutations_key_roll.extend(
415            other
416                .mutations_key_roll
417                .iter()
418                .map(|(offset, key)| (offset + self.data.len(), key.clone())),
419        );
420        self.data.extend_from_slice(&other.data[..]);
421    }
422
423    fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
424        self.data.append(&mut data.into());
425        // If the checkpoint is the same as the last mutation we pushed, increment the count.
426        if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
427            if last_checkpoint.file_offset == checkpoint.file_offset {
428                *count += 1;
429                return;
430            }
431        }
432        self.transactions.push((checkpoint.clone(), 1));
433    }
434}
435
/// The lock/usability state of a store.  See also `ObjectStore::lock_state`.
pub enum LockState {
    // The store is locked; e.g. an encrypted store whose keys are not currently available.
    Locked,

    // The store is not encrypted.
    Unencrypted,

    // The store is unlocked; `crypt` supplies the keys, and `owner` may be asked to clean up
    // (see `StoreOwner::force_lock`).
    Unlocked { owner: Weak<dyn StoreOwner>, crypt: Arc<dyn Crypt> },

    // The store is unlocked, but in a read-only state, and no flushes or other operations will be
    // performed on the store.
    UnlockedReadOnly(Arc<dyn Crypt>),

    // The store is encrypted but is now in an unusable state (due to a failure to sync the journal
    // after locking the store).  The store cannot be unlocked.
    Invalid,

    // Before we've read the StoreInfo we might not know whether the store is Locked or Unencrypted.
    // This can happen when lazily opening stores (ObjectManager::lazy_open_store).
    Unknown,

    // The store is in the process of being locked.  Whilst the store is being locked, the store
    // isn't usable; assertions will trip if any mutations are applied.
    Locking,

    // Whilst we're unlocking, we will replay encrypted mutations.  The store isn't usable until
    // it's in the Unlocked state.
    Unlocking,

    // The store has been deleted.
    Deleted,
}
464
465impl LockState {
466    fn owner(&self) -> Option<Arc<dyn StoreOwner>> {
467        if let Self::Unlocked { owner, .. } = self { owner.upgrade() } else { None }
468    }
469}
470
471impl fmt::Debug for LockState {
472    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
473        formatter.write_str(match self {
474            LockState::Locked => "Locked",
475            LockState::Unencrypted => "Unencrypted",
476            LockState::Unlocked { .. } => "Unlocked",
477            LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
478            LockState::Invalid => "Invalid",
479            LockState::Unknown => "Unknown",
480            LockState::Locking => "Locking",
481            LockState::Unlocking => "Unlocking",
482            LockState::Deleted => "Deleted",
483        })
484    }
485}
486
/// In-memory state for allocating object IDs; the persistent counterpart is `LastObjectIdInfo`.
enum LastObjectId {
    // This is used when the store is encrypted, but the key and ID isn't yet available.
    Pending,

    // Object IDs are plain sequential values.
    Unencrypted {
        id: u64,
    },

    Encrypted {
        // The *unencrypted* value of the last object ID.
        id: u64,

        // Encrypted stores will use a cipher to obfuscate the object ID.
        cipher: Box<Ff1>,
    },

    // Object IDs are restricted to 32 bits.  `reserved` holds IDs that have been handed out but
    // not yet released; `unreserved` holds IDs pending removal from `reserved` (see the comment
    // in `unreserve` for why removal is deferred).
    Low32Bit {
        reserved: HashSet<u32>,
        unreserved: Vec<u32>,
    },
}
508
impl LastObjectId {
    /// Tries to get the next object ID.  Returns None if a new cipher is required because all
    /// object IDs that can be generated with the current cipher have been exhausted, or if only
    /// using the lower 32 bits which requires an async algorithm.
    fn try_get_next(&mut self) -> Option<NonZero<u64>> {
        match self {
            LastObjectId::Unencrypted { id } => {
                // Plain increment; `NonZero::new` rejects a wrap-around to zero, and `id` is
                // only advanced when a valid ID is produced.
                NonZero::new(id.wrapping_add(1)).inspect(|next| *id = next.get())
            }
            LastObjectId::Encrypted { id, cipher } => {
                // Only the low 32 bits are encrypted; the high 32 bits pass through unchanged.
                let mut next = *id;
                let hi = next & OBJECT_ID_HI_MASK;
                loop {
                    if next as u32 == u32::MAX {
                        // The 32-bit space under the current cipher is exhausted.
                        return None;
                    }
                    next += 1;
                    // Skip any candidate that comes out as zero (i.e. INVALID_OBJECT_ID when
                    // `hi` is zero); `id` is only committed once a valid candidate is found.
                    let candidate = hi | cipher.encrypt(next as u32) as u64;
                    if let Some(candidate) = NonZero::new(candidate) {
                        *id = next;
                        return Some(candidate);
                    }
                }
            }
            // `Pending` and `Low32Bit` cannot produce an ID synchronously.
            _ => None,
        }
    }

    /// Returns INVALID_OBJECT_ID if it's not possible to peek at the next object ID.
    fn peek_next(&self) -> u64 {
        match self {
            LastObjectId::Unencrypted { id } => id.wrapping_add(1),
            LastObjectId::Encrypted { id, cipher } => {
                // Same search as `try_get_next`, but without committing the result to `id`.
                let mut next = *id;
                let hi = next & OBJECT_ID_HI_MASK;
                loop {
                    if next as u32 == u32::MAX {
                        return INVALID_OBJECT_ID;
                    }
                    next += 1;
                    let candidate = hi | cipher.encrypt(next as u32) as u64;
                    if candidate != INVALID_OBJECT_ID {
                        return candidate;
                    }
                }
            }
            _ => INVALID_OBJECT_ID,
        }
    }

    /// Returns INVALID_OBJECT_ID for algorithms that don't use the last ID.
    fn id(&self) -> u64 {
        match self {
            LastObjectId::Unencrypted { id } | LastObjectId::Encrypted { id, .. } => *id,
            _ => INVALID_OBJECT_ID,
        }
    }

    /// Returns true if `id` is reserved (it must be 32 bits).  Always false for algorithms
    /// without reservations.
    fn is_reserved(&self, id: u64) -> bool {
        match self {
            LastObjectId::Low32Bit { reserved, .. } => {
                // An ID that doesn't fit in 32 bits can never be in the reserved set.
                if let Ok(id) = id.try_into() {
                    reserved.contains(&id)
                } else {
                    false
                }
            }
            _ => false,
        }
    }

    /// Reserves `id`.  Panics if `id` doesn't fit in 32 bits, is already reserved, or this isn't
    /// the `Low32Bit` algorithm.
    fn reserve(&mut self, id: u64) {
        match self {
            LastObjectId::Low32Bit { reserved, .. } => {
                assert!(reserved.insert(id.try_into().unwrap()))
            }
            _ => unreachable!(),
        }
    }

    /// Unreserves `id`.  A no-op for algorithms without reservations.
    fn unreserve(&mut self, id: u64) {
        match self {
            LastObjectId::Low32Bit { unreserved, .. } => {
                // To avoid races, where a reserved ID transitions from being reserved to being
                // actually used in a committed transaction, we delay updating `reserved` until a
                // suitable point.
                //
                // On thread A, we might have:
                //
                //   A1. Commit transaction (insert a record into the LSM tree that uses ID)
                //   A2. `unreserve`
                //
                // And on another thread B, we might have:
                //
                //   B1. Drain `unreserved`.
                //   B2. Check tree and `reserved` to see if ID is used.
                //
                // B2 will involve calling `LsmTree::layer_set` which should be thought of as a
                // snapshot, so the change A1 might not be visible to thread B, but it won't matter
                // because `reserved` will still include the ID.  So long as each thread does the
                // operations in this order, it should be safe.
                unreserved.push(id.try_into().unwrap())
            }
            _ => {}
        }
    }

    /// Removes `unreserved` IDs from the `reserved` list.
    fn drain_unreserved(&mut self) {
        match self {
            LastObjectId::Low32Bit { reserved, unreserved } => {
                for u in unreserved.drain(..) {
                    // Every drained ID must have been reserved earlier.
                    assert!(reserved.remove(&u));
                }
            }
            _ => {}
        }
    }
}
631
/// An object ID reserved from a store.  Dropping it returns the ID to the store (see the `Drop`
/// impl below) unless ownership is taken via `release`.
pub struct ReservedId<'a>(&'a ObjectStore, NonZero<u64>);
633
634impl<'a> ReservedId<'a> {
635    fn new(store: &'a ObjectStore, id: NonZero<u64>) -> Self {
636        Self(store, id)
637    }
638
639    pub fn get(&self) -> u64 {
640        self.1.get()
641    }
642
643    /// The caller takes responsibility for this id.
644    #[must_use]
645    pub fn release(self) -> u64 {
646        let id = self.1.get();
647        std::mem::forget(self);
648        id
649    }
650}
651
impl Drop for ReservedId<'_> {
    fn drop(&mut self) {
        // A dropped reservation was never claimed via `release`, so return the ID to the store.
        self.0.last_object_id.lock().unreserve(self.1.get());
    }
}
657
/// An object store supports a file like interface for objects.  Objects are keyed by a 64 bit
/// identifier.  And object store has to be backed by a parent object store (which stores metadata
/// for the object store).  The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
pub struct ObjectStore {
    // The store that holds this store's metadata; None for the root parent store.
    parent_store: Option<Arc<ObjectStore>>,
    // This store's own object ID.
    store_object_id: u64,
    // The underlying device and its block size, cached from the filesystem at construction.
    device: Arc<dyn Device>,
    block_size: u64,
    // Held weakly to break the reference cycle with the filesystem (see `new_root_parent`).
    filesystem: Weak<FxFilesystem>,
    // Lock ordering: This must be taken before `lock_state`.
    store_info: Mutex<Option<StoreInfo>>,
    // The LSM tree holding this store's records.
    tree: LSMTree<ObjectKey, ObjectValue>,

    // When replaying the journal, the store cannot read StoreInfo until the whole journal
    // has been replayed, so during that time, store_info_handle will be None and records
    // just get sent to the tree. Once the journal has been replayed, we can open the store
    // and load all the other layer information.
    store_info_handle: OnceLock<DataObjectHandle<ObjectStore>>,

    // The cipher to use for encrypted mutations, if this store is encrypted.
    mutations_cipher: Mutex<Option<StreamCipher>>,

    // Current lock state of the store.
    // Lock ordering: This must be taken after `store_info`.
    lock_state: Mutex<LockState>,
    // Manages this store's encryption keys.
    pub key_manager: KeyManager,

    // Enable/disable tracing.
    trace: AtomicBool,

    // Informational counters for events occurring within the store.
    counters: Mutex<ObjectStoreCounters>,

    // These are updated in performance-sensitive code paths so we use atomics instead of counters.
    device_read_ops: AtomicU64,
    device_write_ops: AtomicU64,
    logical_read_ops: AtomicU64,
    logical_write_ops: AtomicU64,
    graveyard_entries: AtomicU64,

    // Contains the last object ID and, optionally, a cipher to be used when generating new object
    // IDs.
    last_object_id: Mutex<LastObjectId>,

    // An optional callback to be invoked each time the ObjectStore flushes.  The callback is
    // invoked at the end of flush, while the write lock is still held.
    flush_callback: Mutex<Option<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>>,
}
707
/// Informational counters for store events; see `ObjectStore::counters`.
#[derive(Clone, Default)]
struct ObjectStoreCounters {
    // Counts of mutations applied to / dropped by this store.
    mutations_applied: u64,
    mutations_dropped: u64,
    // Number of flushes and when the last one happened.
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}
715
716impl ObjectStore {
    /// Instantiates an `ObjectStore`; performs no I/O.  The device and block size are cached from
    /// `filesystem`, which is held weakly.  All counters start at zero and no flush callback is
    /// installed.
    fn new(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        store_info: Option<StoreInfo>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
        mutations_cipher: Option<StreamCipher>,
        lock_state: LockState,
        last_object_id: LastObjectId,
    ) -> Arc<ObjectStore> {
        let device = filesystem.device();
        let block_size = filesystem.block_size();
        Arc::new(ObjectStore {
            parent_store,
            store_object_id,
            device,
            block_size,
            filesystem: Arc::downgrade(&filesystem),
            store_info: Mutex::new(store_info),
            tree: LSMTree::new(merge::merge, object_cache),
            store_info_handle: OnceLock::new(),
            mutations_cipher: Mutex::new(mutations_cipher),
            lock_state: Mutex::new(lock_state),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            graveyard_entries: AtomicU64::new(0),
            last_object_id: Mutex::new(last_object_id),
            flush_callback: Mutex::new(None),
        })
    }
752
    /// Creates a store with a fresh, default `StoreInfo`: unencrypted (no mutations cipher), with
    /// object IDs starting from zero.
    fn new_empty(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Arc<Self> {
        Self::new(
            parent_store,
            store_object_id,
            filesystem,
            Some(StoreInfo::default()),
            object_cache,
            None,
            LockState::Unencrypted,
            LastObjectId::Unencrypted { id: 0 },
        )
    }
770
    /// Cycle breaker constructor that returns an ObjectStore without a filesystem.
    /// This should only be used from super block code.
    ///
    /// The returned store's `filesystem` is an empty `Weak`; use `attach_filesystem` to populate
    /// it once the filesystem exists.
    pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
        ObjectStore {
            parent_store: None,
            store_object_id,
            device,
            block_size,
            filesystem: Weak::<FxFilesystem>::new(),
            store_info: Mutex::new(Some(StoreInfo::default())),
            tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
            store_info_handle: OnceLock::new(),
            mutations_cipher: Mutex::new(None),
            lock_state: Mutex::new(LockState::Unencrypted),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            graveyard_entries: AtomicU64::new(0),
            last_object_id: Mutex::new(LastObjectId::Unencrypted { id: 0 }),
            flush_callback: Mutex::new(None),
        }
    }
797
798    /// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has
799    /// been created.
800    pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
801        this.filesystem = Arc::downgrade(&filesystem);
802        this
803    }
804
    /// Create a child store. It is a multi-step process:
    ///
    ///   1. Call `ObjectStore::new_child_store`.
    ///   2. Register the store with the object-manager.
    ///   3. Call `ObjectStore::create` to write the store-info.
    ///
    /// If the procedure fails, care must be taken to unregister store with the object-manager.
    ///
    /// The steps have to be separate because of lifetime issues when working with a transaction.
    async fn new_child_store(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        options: NewChildStoreOptions,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Result<Arc<Self>, Error> {
        // The two object-id options are mutually exclusive: ids cannot simultaneously be
        // reserved above 2^32 and restricted to the low 32 bits.
        ensure!(
            !options.reserve_32bit_object_ids || !options.low_32_bit_object_ids,
            FxfsError::InvalidArgs
        );
        // The child store is represented by an object in `self` (the parent store); the caller
        // may request a specific object id for it.
        let handle = if let Some(object_id) = NonZero::new(options.object_id) {
            self.update_last_object_id(object_id.get());
            let handle = ObjectStore::create_object_with_id(
                self,
                transaction,
                ReservedId::new(self, object_id),
                HandleOptions::default(),
                None,
            )?;
            handle
        } else {
            ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await?
        };
        let filesystem = self.filesystem();
        // When reserving the 32-bit id range, allocation starts at 2^32.
        let id = if options.reserve_32bit_object_ids { 0x1_0000_0000 } else { 0 };
        // Compute both the persisted form (`LastObjectIdInfo`, stored in `StoreInfo`) and the
        // in-memory form (`LastObjectId`) of the object-id allocator state.  Encrypted stores
        // additionally carry an FF1 cipher for object ids, keyed with a metadata key.
        let (last_object_id, last_object_id_in_memory) = if options.low_32_bit_object_ids {
            (
                LastObjectIdInfo::Low32Bit,
                LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() },
            )
        } else if let Some(crypt) = &options.options.crypt {
            let (object_id_wrapped, object_id_unwrapped) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            (
                LastObjectIdInfo::Encrypted { id, key: object_id_wrapped },
                LastObjectId::Encrypted { id, cipher: Box::new(Ff1::new(&object_id_unwrapped)) },
            )
        } else {
            (LastObjectIdInfo::Unencrypted { id }, LastObjectId::Unencrypted { id })
        };
        let store = if let Some(crypt) = options.options.crypt {
            // Encrypted store: create a second metadata key used for the mutations stream
            // cipher, and start the store in the unlocked state.
            let (wrapped_key, unwrapped_key) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    mutations_key: Some(wrapped_key),
                    last_object_id,
                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
                    ..Default::default()
                }),
                object_cache,
                Some(StreamCipher::new(&unwrapped_key, 0)),
                LockState::Unlocked { owner: options.options.owner, crypt },
                last_object_id_in_memory,
            )
        } else {
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    last_object_id,
                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
                    ..Default::default()
                }),
                object_cache,
                None,
                LockState::Unencrypted,
                last_object_id_in_memory,
            )
        };
        // The handle created above backs the store-info; it must only ever be set once.
        assert!(store.store_info_handle.set(handle).is_ok());
        Ok(store)
    }
891
    /// Actually creates the store in a transaction.  This will also create a root directory and
    /// graveyard directory for the store.  See `new_child_store` above.
    async fn create<'a>(
        self: &'a Arc<Self>,
        transaction: &mut Transaction<'a>,
    ) -> Result<(), Error> {
        let buf = {
            // Create a root directory and graveyard directory.
            let graveyard_directory_object_id = Graveyard::create(transaction, &self).await?;
            let root_directory = Directory::create(transaction, &self, None).await?;

            let serialized_info = {
                // Hold the store-info lock only while recording the new object ids and
                // serializing; drop it before the async buffer allocation below.
                let mut store_info = self.store_info.lock();
                let store_info = store_info.as_mut().unwrap();

                store_info.graveyard_directory_object_id = graveyard_directory_object_id;
                store_info.root_directory_object_id = root_directory.object_id();

                let mut serialized_info = Vec::new();
                store_info.serialize_with_version(&mut serialized_info)?;
                serialized_info
            };
            // Copy the serialized store-info into a device buffer ready for writing.
            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
            buf
        };

        if self.filesystem().options().image_builder_mode.is_some() {
            // If we're in image builder mode, we want to avoid writing to disk unless explicitly
            // asked to. New object stores will have their StoreInfo written when we compact in
            // FxFilesystem::finalize().
            Ok(())
        } else {
            self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
        }
    }
928
929    pub fn set_trace(&self, trace: bool) {
930        let old_value = self.trace.swap(trace, Ordering::Relaxed);
931        if trace != old_value {
932            info!(store_id = self.store_object_id(), trace; "OS: trace",);
933        }
934    }
935
936    /// Sets a callback to be invoked each time the ObjectStore flushes.  The callback is invoked at
937    /// the end of flush, while the write lock is still held.
938    pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
939        let mut flush_callback = self.flush_callback.lock();
940        *flush_callback = Some(Box::new(callback));
941    }
942
943    pub fn is_root(&self) -> bool {
944        if let Some(parent) = &self.parent_store {
945            parent.parent_store.is_none()
946        } else {
947            // The root parent store isn't the root store.
948            false
949        }
950    }
951
    /// Populates an inspect node with store statistics.
    pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
        let counters = self.counters.lock();
        // The guid is only recorded if the store info is currently available.
        if let Some(store_info) = self.store_info() {
            root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
        };
        root.record_uint("store_object_id", self.store_object_id);
        root.record_uint("mutations_applied", counters.mutations_applied);
        root.record_uint("mutations_dropped", counters.mutations_dropped);
        root.record_uint("num_flushes", counters.num_flushes);
        if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
            root.record_uint(
                "last_flush_time_ms",
                // Clamp to zero if the timestamp predates the epoch or overflows u64.
                last_flush_time
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or(std::time::Duration::ZERO)
                    .as_millis()
                    .try_into()
                    .unwrap_or(0u64),
            );
        }
        root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
        root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
        root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
        root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));
        root.record_uint("graveyard_entries", self.graveyard_entries.load(Ordering::Relaxed));
        {
            // Scope the last_object_id lock to just these two records.
            let last_object_id = self.last_object_id.lock();
            root.record_uint("object_id_hi", last_object_id.id() >> 32);
            root.record_bool(
                "low_32_bit_object_ids",
                matches!(&*last_object_id, LastObjectId::Low32Bit { .. }),
            );
        }

        // The move closure owns a clone of `self` so it is self-contained.
        let this = self.clone();
        root.record_child("lsm_tree", move |node| this.tree().record_inspect_data(node));
    }
991
    /// Returns the device backing this store.
    pub fn device(&self) -> &Arc<dyn Device> {
        &self.device
    }
995
    /// Returns the store's block size.
    pub fn block_size(&self) -> u64 {
        self.block_size
    }
999
    /// Returns a strong reference to the filesystem.
    ///
    /// Panics if the filesystem has been dropped, or was never attached (see
    /// `new_root_parent`/`attach_filesystem`).
    pub fn filesystem(&self) -> Arc<FxFilesystem> {
        self.filesystem.upgrade().unwrap()
    }
1003
    /// Returns this store's object id.
    pub fn store_object_id(&self) -> u64 {
        self.store_object_id
    }
1007
    /// Returns the LSM tree holding this store's records.
    pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
        &self.tree
    }
1011
    /// Returns the object id of the store's root directory.  Panics if the store info is not
    /// currently available.
    pub fn root_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().root_directory_object_id
    }
1015
    /// Returns the store's GUID.  Panics if the store info is not currently available.
    pub fn guid(&self) -> [u8; 16] {
        self.store_info.lock().as_ref().unwrap().guid
    }
1019
    /// Returns the object id of the store's graveyard directory.  Panics if the store info is
    /// not currently available.
    pub fn graveyard_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().graveyard_directory_object_id
    }
1023
1024    fn set_graveyard_directory_object_id(&self, oid: u64) {
1025        assert_eq!(
1026            std::mem::replace(
1027                &mut self.store_info.lock().as_mut().unwrap().graveyard_directory_object_id,
1028                oid
1029            ),
1030            INVALID_OBJECT_ID
1031        );
1032    }
1033
    /// Returns the object count recorded in the store info.  Panics if the store info is not
    /// currently available.
    pub fn object_count(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().object_count
    }
1037
    /// Returns the last object id tracked by this store's object-id allocator.
    /// Returns INVALID_OBJECT_ID for algorithms that don't use the last ID.
    pub(crate) fn unencrypted_last_object_id(&self) -> u64 {
        self.last_object_id.lock().id()
    }
1042
    /// Returns the store's key manager.
    pub fn key_manager(&self) -> &KeyManager {
        &self.key_manager
    }
1046
    /// Returns the parent store, or None for the root parent store.
    pub fn parent_store(&self) -> Option<&Arc<ObjectStore>> {
        self.parent_store.as_ref()
    }
1050
1051    /// Returns the crypt object for the store.  Returns None if the store is unencrypted.
1052    pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
1053        match &*self.lock_state.lock() {
1054            LockState::Locked => panic!("Store is locked"),
1055            LockState::Invalid
1056            | LockState::Unencrypted
1057            | LockState::Locking
1058            | LockState::Unlocking
1059            | LockState::Deleted => None,
1060            LockState::Unlocked { crypt, .. } => Some(crypt.clone()),
1061            LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
1062            LockState::Unknown => {
1063                panic!("Store is of unknown lock state; has the journal been replayed yet?")
1064            }
1065        }
1066    }
1067
1068    /// Returns the id of the internal directory. Returns a NotFound error if this has not been
1069    /// initialized.
1070    pub fn get_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
1071        if let Some(store_info) = self.store_info.lock().as_ref() {
1072            if store_info.internal_directory_object_id == INVALID_OBJECT_ID {
1073                Err(FxfsError::NotFound.into())
1074            } else {
1075                Ok(store_info.internal_directory_object_id)
1076            }
1077        } else {
1078            Err(FxfsError::Unavailable.into())
1079        }
1080    }
1081
    /// Returns the id of the internal directory, creating it (in its own transaction) if it
    /// does not already exist.
    pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
        // Create the transaction first to use the object store lock.
        let mut transaction = self
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::InternalDirectory { store_object_id: self.store_object_id }],
                Options::default(),
            )
            .await?;
        // Check, while holding the InternalDirectory lock, whether the directory exists.
        let obj_id = self.store_info.lock().as_ref().unwrap().internal_directory_object_id;
        if obj_id != INVALID_OBJECT_ID {
            return Ok(obj_id);
        }

        // Need to create an internal directory.
        let directory = Directory::create(&mut transaction, self, None).await?;

        // Record the new directory id through a dedicated mutation, then commit.
        transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
        transaction.commit().await?;
        Ok(directory.object_id())
    }
1103
1104    /// Returns the file size for the object without opening the object.
1105    async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
1106        let item = self
1107            .tree
1108            .find(&ObjectKey::attribute(
1109                object_id,
1110                DEFAULT_DATA_ATTRIBUTE_ID,
1111                AttributeKey::Attribute,
1112            ))
1113            .await?
1114            .ok_or(FxfsError::NotFound)?;
1115        if let ObjectValue::Attribute { size, .. } = item.value {
1116            Ok(size)
1117        } else {
1118            bail!(FxfsError::NotFile);
1119        }
1120    }
1121
    /// Returns the current last object id (only available in migration builds).
    #[cfg(feature = "migration")]
    pub fn last_object_id(&self) -> u64 {
        self.last_object_id.lock().id()
    }
1126
    /// Provides access to the allocator to mark a specific region of the device as allocated.
    /// This forwards directly to the allocator; all arguments are passed through unchanged.
    #[cfg(feature = "migration")]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        store_object_id: u64,
        device_range: std::ops::Range<u64>,
    ) -> Result<(), Error> {
        self.allocator().mark_allocated(transaction, store_object_id, device_range)
    }
1137
1138    /// `crypt` can be provided if the crypt service should be different to the default; see the
1139    /// comment on create_object.  Users should avoid having more than one handle open for the same
1140    /// object at the same time because they might get out-of-sync; there is no code that will
1141    /// prevent this.  One example where this can cause an issue is if the object ends up using a
1142    /// permanent key (which is the case if a value is passed for `crypt`), the permanent key is
1143    /// dropped when a handle is dropped, which will impact any other handles for the same object.
1144    pub async fn open_object<S: HandleOwner>(
1145        owner: &Arc<S>,
1146        obj_id: u64,
1147        options: HandleOptions,
1148        crypt: Option<Arc<dyn Crypt>>,
1149    ) -> Result<DataObjectHandle<S>, Error> {
1150        let store = owner.as_ref().as_ref();
1151        let mut fsverity_descriptor = None;
1152        let mut overwrite_ranges = Vec::new();
1153        let item = store
1154            .tree
1155            .find(&ObjectKey::attribute(obj_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute))
1156            .await?
1157            .ok_or(FxfsError::NotFound)?;
1158
1159        let (size, track_overwrite_extents) = match item.value {
1160            ObjectValue::Attribute { size, has_overwrite_extents } => (size, has_overwrite_extents),
1161            ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
1162                if !options.skip_fsverity {
1163                    fsverity_descriptor = Some(fsverity_metadata);
1164                }
1165                // We only track the overwrite extents in memory for writes, reads handle them
1166                // implicitly, which means verified files (where the data won't change anymore)
1167                // don't need to track them.
1168                (size, false)
1169            }
1170            _ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attibute")),
1171        };
1172
1173        ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);
1174
1175        if track_overwrite_extents {
1176            let layer_set = store.tree.layer_set();
1177            let mut merger = layer_set.merger();
1178            let mut iter = merger
1179                .query(Query::FullRange(&ObjectKey::attribute(
1180                    obj_id,
1181                    DEFAULT_DATA_ATTRIBUTE_ID,
1182                    AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
1183                )))
1184                .await?;
1185            loop {
1186                match iter.get() {
1187                    Some(ItemRef {
1188                        key:
1189                            ObjectKey {
1190                                object_id,
1191                                data:
1192                                    ObjectKeyData::Attribute(
1193                                        attribute_id,
1194                                        AttributeKey::Extent(ExtentKey { range }),
1195                                    ),
1196                            },
1197                        value,
1198                        ..
1199                    }) if *object_id == obj_id && *attribute_id == DEFAULT_DATA_ATTRIBUTE_ID => {
1200                        match value {
1201                            ObjectValue::Extent(ExtentValue::None)
1202                            | ObjectValue::Extent(ExtentValue::Some {
1203                                mode: ExtentMode::Raw,
1204                                ..
1205                            })
1206                            | ObjectValue::Extent(ExtentValue::Some {
1207                                mode: ExtentMode::Cow(_),
1208                                ..
1209                            }) => (),
1210                            ObjectValue::Extent(ExtentValue::Some {
1211                                mode: ExtentMode::OverwritePartial(_),
1212                                ..
1213                            })
1214                            | ObjectValue::Extent(ExtentValue::Some {
1215                                mode: ExtentMode::Overwrite,
1216                                ..
1217                            }) => overwrite_ranges.push(range.clone()),
1218                            _ => bail!(
1219                                anyhow!(FxfsError::Inconsistent)
1220                                    .context("open_object: Expected extent")
1221                            ),
1222                        }
1223                        iter.advance().await?;
1224                    }
1225                    _ => break,
1226                }
1227            }
1228        }
1229
1230        // If a crypt service has been specified, it needs to be a permanent key because cached
1231        // keys can only use the store's crypt service.
1232        let permanent = if let Some(crypt) = crypt {
1233            store
1234                .key_manager
1235                .get_keys(
1236                    obj_id,
1237                    crypt.as_ref(),
1238                    &mut Some(async || store.get_keys(obj_id).await),
1239                    /* permanent= */ true,
1240                    /* force= */ false,
1241                )
1242                .await?;
1243            true
1244        } else {
1245            false
1246        };
1247        let data_object_handle = DataObjectHandle::new(
1248            owner.clone(),
1249            obj_id,
1250            permanent,
1251            DEFAULT_DATA_ATTRIBUTE_ID,
1252            size,
1253            FsverityState::None,
1254            options,
1255            false,
1256            &overwrite_ranges,
1257        );
1258        if let Some(descriptor) = fsverity_descriptor {
1259            data_object_handle
1260                .set_fsverity_state_some(descriptor)
1261                .await
1262                .context("Invalid or mismatched merkle tree")?;
1263        }
1264        Ok(data_object_handle)
1265    }
1266
    /// Creates an object, using an object id the caller has already reserved, and returns a
    /// handle for it.
    ///
    /// Adds mutations to `transaction` for the object record (with the timestamps set to now),
    /// the wrapped encryption keys (when `encryption_options` is provided), and an empty default
    /// data attribute.
    pub fn create_object_with_id<S: HandleOwner>(
        owner: &Arc<S>,
        transaction: &mut Transaction<'_>,
        reserved_object_id: ReservedId<'_>,
        options: HandleOptions,
        encryption_options: Option<ObjectEncryptionOptions>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        // Don't permit creating unencrypted objects in an encrypted store.  The converse is OK.
        debug_assert!(store.crypt().is_none() || encryption_options.is_some());
        let now = Timestamp::now();
        let object_id = reserved_object_id.get();
        // The insert must not replace an existing record; a freshly reserved id should never
        // collide.
        assert!(
            transaction
                .add(
                    store.store_object_id(),
                    Mutation::insert_object(
                        ObjectKey::object(reserved_object_id.release()),
                        ObjectValue::file(
                            1,
                            0,
                            now.clone(),
                            now.clone(),
                            now.clone(),
                            now,
                            0,
                            None
                        ),
                    ),
                )
                .is_none()
        );
        let mut permanent_keys = false;
        if let Some(ObjectEncryptionOptions { permanent, key_id, key, unwrapped_key }) =
            encryption_options
        {
            permanent_keys = permanent;
            let cipher = key_to_cipher(&key, &unwrapped_key)?;
            // Persist the wrapped key record...
            transaction.add(
                store.store_object_id(),
                Mutation::insert_object(
                    ObjectKey::keys(object_id),
                    ObjectValue::keys(vec![(key_id, key)].into()),
                ),
            );
            // ...and prime the key manager with the unwrapped cipher.
            store.key_manager.insert(
                object_id,
                Arc::new(vec![(key_id, CipherHolder::Cipher(cipher))].into()),
                permanent,
            );
        }
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute),
                // This is a new object so nothing has pre-allocated overwrite extents yet.
                ObjectValue::attribute(0, false),
            ),
        );
        Ok(DataObjectHandle::new(
            owner.clone(),
            object_id,
            permanent_keys,
            DEFAULT_DATA_ATTRIBUTE_ID,
            0,
            FsverityState::None,
            options,
            false,
            &[],
        ))
    }
1338
1339    /// Creates an object in the store.
1340    ///
1341    /// If the store is encrypted, the object will be automatically encrypted as well.
1342    /// If `wrapping_key_id` is set, the new keys will be wrapped with that specific key, and
1343    /// otherwise the default data key is used.
1344    pub async fn create_object<S: HandleOwner>(
1345        owner: &Arc<S>,
1346        mut transaction: &mut Transaction<'_>,
1347        options: HandleOptions,
1348        wrapping_key_id: Option<WrappingKeyId>,
1349    ) -> Result<DataObjectHandle<S>, Error> {
1350        let store = owner.as_ref().as_ref();
1351        let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
1352        let crypt = store.crypt();
1353        let encryption_options = if let Some(crypt) = crypt {
1354            let key_id =
1355                if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };
1356            let (key, unwrapped_key) = if let Some(wrapping_key_id) = wrapping_key_id {
1357                crypt.create_key_with_id(object_id.get(), wrapping_key_id, ObjectType::File).await?
1358            } else {
1359                let (fxfs_key, unwrapped_key) =
1360                    crypt.create_key(object_id.get(), KeyPurpose::Data).await?;
1361                (EncryptionKey::Fxfs(fxfs_key), unwrapped_key)
1362            };
1363            Some(ObjectEncryptionOptions { permanent: false, key_id, key, unwrapped_key })
1364        } else {
1365            None
1366        };
1367        ObjectStore::create_object_with_id(
1368            owner,
1369            &mut transaction,
1370            object_id,
1371            options,
1372            encryption_options,
1373        )
1374    }
1375
1376    /// Creates an object using explicitly provided keys.
1377    ///
1378    /// There are some cases where an encrypted object needs to be created in an unencrypted store.
1379    /// For example, when layer files for a child store are created in the root store, but they must
1380    /// be encrypted using the child store's keys.  This method exists for that purpose.
1381    pub(crate) async fn create_object_with_key<S: HandleOwner>(
1382        owner: &Arc<S>,
1383        mut transaction: &mut Transaction<'_>,
1384        object_id: ReservedId<'_>,
1385        options: HandleOptions,
1386        key: EncryptionKey,
1387        unwrapped_key: UnwrappedKey,
1388    ) -> Result<DataObjectHandle<S>, Error> {
1389        ObjectStore::create_object_with_id(
1390            owner,
1391            &mut transaction,
1392            object_id,
1393            options,
1394            Some(ObjectEncryptionOptions {
1395                permanent: true,
1396                key_id: VOLUME_DATA_KEY_ID,
1397                key,
1398                unwrapped_key,
1399            }),
1400        )
1401    }
1402
    /// Adjusts the reference count for a given object.  If the reference count reaches zero, the
    /// object is moved into the graveyard and true is returned.
    ///
    /// Returns NotFile if the object is neither a file nor a symlink, and an error if the
    /// adjusted count would underflow or overflow.
    pub async fn adjust_refs(
        &self,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        delta: i64,
    ) -> Result<bool, Error> {
        // Get the pending mutation for this object so the adjustment composes with any earlier
        // changes made in the same transaction.
        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
        let refs = if let ObjectValue::Object {
            kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
            ..
        } = &mut mutation.item.value
        {
            // `checked_add_signed` catches both underflow and overflow of the count.
            *refs =
                refs.checked_add_signed(delta).ok_or_else(|| anyhow!("refs underflow/overflow"))?;
            refs
        } else {
            // Only files and symlinks carry reference counts.
            bail!(FxfsError::NotFile);
        };
        if *refs == 0 {
            self.add_to_graveyard(transaction, object_id);

            // We might still need to adjust the reference count if delta was something other than
            // -1.
            if delta != -1 {
                *refs = 1;
                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            }
            // Otherwise, we don't commit the mutation as we want to keep reference count as 1 for
            // objects in graveyard.
            Ok(true)
        } else {
            transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            Ok(false)
        }
    }
1440
1441    // Purges an object that is in the graveyard.
1442    pub async fn tombstone_object(
1443        &self,
1444        object_id: u64,
1445        txn_options: Options<'_>,
1446    ) -> Result<(), Error> {
1447        self.key_manager.remove(object_id).await;
1448        let fs = self.filesystem();
1449        let truncate_guard = fs.truncate_guard(self.store_object_id, object_id).await;
1450        self.trim_or_tombstone(object_id, true, txn_options, &truncate_guard).await
1451    }
1452
1453    /// Trim extents beyond the end of a file for all attributes.  This will remove the entry from
1454    /// the graveyard when done.
1455    pub async fn trim(
1456        &self,
1457        object_id: u64,
1458        truncate_guard: &TruncateGuard<'_>,
1459    ) -> Result<(), Error> {
1460        // For the root and root parent store, we would need to use the metadata reservation which
1461        // we don't currently support, so assert that we're not those stores.
1462        assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());
1463
1464        self.trim_or_tombstone(
1465            object_id,
1466            false,
1467            Options { borrow_metadata_space: true, ..Default::default() },
1468            truncate_guard,
1469        )
1470        .await
1471    }
1472
    /// Trims or tombstones an object.
    ///
    /// Visits each of the object's attributes in turn, trimming in pieces with one transaction
    /// per pass (`TrimResult::Incomplete` causes the same attribute to be revisited).  When
    /// tombstoning — or when the object has a `Trim` entry in the graveyard — the graveyard
    /// entry is removed once the last attribute has been processed.
    async fn trim_or_tombstone(
        &self,
        object_id: u64,
        for_tombstone: bool,
        txn_options: Options<'_>,
        _truncate_guard: &TruncateGuard<'_>,
    ) -> Result<(), Error> {
        let fs = self.filesystem();
        // `next_attribute` holds the attribute to process next; None terminates the loop.
        let mut next_attribute = Some(0);
        while let Some(attribute_id) = next_attribute.take() {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
                        LockKey::object(self.store_object_id, object_id),
                    ],
                    txn_options,
                )
                .await?;

            match self
                .trim_some(
                    &mut transaction,
                    object_id,
                    attribute_id,
                    if for_tombstone {
                        TrimMode::Tombstone(TombstoneMode::Object)
                    } else {
                        TrimMode::UseSize
                    },
                )
                .await?
            {
                // More work remains on this attribute; revisit it on the next pass.
                TrimResult::Incomplete => next_attribute = Some(attribute_id),
                TrimResult::Done(None) => {
                    // All attributes processed.  Remove the graveyard entry if we are
                    // tombstoning, or if this object's graveyard entry is a Trim entry.
                    if for_tombstone
                        || matches!(
                            self.tree
                                .find(&ObjectKey::graveyard_entry(
                                    self.graveyard_directory_object_id(),
                                    object_id,
                                ))
                                .await?,
                            Some(Item { value: ObjectValue::Trim, .. })
                        )
                    {
                        self.remove_from_graveyard(&mut transaction, object_id);
                    }
                }
                TrimResult::Done(id) => next_attribute = id,
            }

            // Avoid committing transactions that carry no mutations.
            if !transaction.mutations().is_empty() {
                transaction.commit().await?;
            }
        }
        Ok(())
    }
1533
    // Purges an object's attribute that is in the graveyard.  The attribute is trimmed away in
    // pieces, each pass running in its own transaction; once trimming completes, the attribute's
    // graveyard entry is removed.
    pub async fn tombstone_attribute(
        &self,
        object_id: u64,
        attribute_id: u64,
        txn_options: Options<'_>,
    ) -> Result<(), Error> {
        let fs = self.filesystem();
        let mut trim_result = TrimResult::Incomplete;
        // Keep going until trim_some reports the attribute fully trimmed.
        while matches!(trim_result, TrimResult::Incomplete) {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
                        LockKey::object(self.store_object_id, object_id),
                    ],
                    txn_options,
                )
                .await?;
            trim_result = self
                .trim_some(
                    &mut transaction,
                    object_id,
                    attribute_id,
                    TrimMode::Tombstone(TombstoneMode::Attribute),
                )
                .await?;
            if let TrimResult::Done(..) = trim_result {
                self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
            }
            // Avoid committing transactions that carry no mutations.
            if !transaction.mutations().is_empty() {
                transaction.commit().await?;
            }
        }
        Ok(())
    }
1571
    /// Deletes extents for attribute `attribute_id` in object `object_id`.  Also see the comments
    /// for TrimMode and TrimResult. Should hold a lock on the attribute, and the object as it
    /// performs a read-modify-write on the sizes.
    ///
    /// Work per call is bounded: once the transaction accumulates
    /// `TRANSACTION_MUTATION_THRESHOLD` mutations, `TrimResult::Incomplete` is returned and the
    /// caller is expected to commit and call again.
    pub async fn trim_some(
        &self,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        attribute_id: u64,
        mode: TrimMode,
    ) -> Result<TrimResult, Error> {
        let layer_set = self.tree.layer_set();
        let mut merger = layer_set.merger();

        // Determine the block-aligned offset from which extents should be deleted.
        let aligned_offset = match mode {
            TrimMode::FromOffset(offset) => {
                round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
            }
            // Tombstoning removes everything, so start from the beginning.
            TrimMode::Tombstone(..) => 0,
            TrimMode::UseSize => {
                // Find the attribute's size record so we can trim everything past the
                // (block-aligned) size.
                let iter = merger
                    .query(Query::FullRange(&ObjectKey::attribute(
                        object_id,
                        attribute_id,
                        AttributeKey::Attribute,
                    )))
                    .await?;
                if let Some(item_ref) = iter.get() {
                    if item_ref.key.object_id != object_id {
                        return Ok(TrimResult::Done(None));
                    }

                    if let ItemRef {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
                                ..
                            },
                        value: ObjectValue::Attribute { size, .. },
                        ..
                    } = item_ref
                    {
                        // If we found a different attribute_id, return so we can get the
                        // right lock.
                        if *size_attribute_id != attribute_id {
                            return Ok(TrimResult::Done(Some(*size_attribute_id)));
                        }
                        round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
                    } else {
                        // At time of writing, we should always see a size record or None here, but
                        // asserting here would be brittle so just skip to the next attribute
                        // instead.
                        return Ok(TrimResult::Done(Some(attribute_id + 1)));
                    }
                } else {
                    // End of the tree.
                    return Ok(TrimResult::Done(None));
                }
            }
        };

        // Loop over the extents and deallocate them.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::from_extent(
                object_id,
                attribute_id,
                ExtentKey::search_key_from_offset(aligned_offset),
            )))
            .await?;
        // `end` tracks the end of the last deallocated extent; the deleted-extent marker below
        // covers aligned_offset..end.
        let mut end = 0;
        let allocator = self.allocator();
        let mut result = TrimResult::Done(None);
        // Total bytes deallocated, used to adjust allocated_size and project usage accounting.
        let mut deallocated = 0;
        let block_size = self.block_size;

        while let Some(item_ref) = iter.get() {
            if item_ref.key.object_id != object_id {
                break;
            }
            if let ObjectKey {
                data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
                ..
            } = item_ref.key
            {
                // Stop at the next attribute; the caller can resume with the right lock.
                if *extent_attribute_id != attribute_id {
                    result = TrimResult::Done(Some(*extent_attribute_id));
                    break;
                }
                if let (
                    AttributeKey::Extent(ExtentKey { range }),
                    ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                ) = (attribute_key, item_ref.value)
                {
                    // The extent may straddle aligned_offset; only trim the tail past it.
                    let start = std::cmp::max(range.start, aligned_offset);
                    ensure!(start < range.end, FxfsError::Inconsistent);
                    let device_offset = device_offset
                        .checked_add(start - range.start)
                        .ok_or(FxfsError::Inconsistent)?;
                    end = range.end;
                    let len = end - start;
                    let device_range = device_offset..device_offset + len;
                    ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
                    allocator.deallocate(transaction, self.store_object_id, device_range).await?;
                    deallocated += len;
                    // Stop if the transaction is getting too big.
                    if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
                        result = TrimResult::Incomplete;
                        break;
                    }
                }
            }
            iter.advance().await?;
        }

        // Whether this call completed an object or attribute tombstone; these drive the final
        // deletion-marker mutations below.
        let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
            && matches!(result, TrimResult::Done(None));
        let finished_tombstone_attribute =
            matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
                && !matches!(result, TrimResult::Incomplete);
        let mut object_mutation = None;
        // Node delta for project accounting: tombstoning the object removes one node.
        let nodes = if finished_tombstone_object { -1 } else { 0 };
        if nodes != 0 || deallocated != 0 {
            // Update project usage (bytes/nodes) if the object belongs to a project.
            let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
            if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
                mutation.item.value
            {
                if project_id != 0 {
                    transaction.add(
                        self.store_object_id,
                        Mutation::merge_object(
                            ObjectKey::project_usage(self.root_directory_object_id(), project_id),
                            ObjectValue::BytesAndNodes {
                                bytes: -i64::try_from(deallocated).unwrap(),
                                nodes,
                            },
                        ),
                    );
                }
                // Keep the mutation; it may need its allocated_size adjusted below.
                object_mutation = Some(mutation);
            } else {
                panic!("Inconsistent object type.");
            }
        }

        // Deletion marker records *must* be merged so as to consume all other records for the
        // object.
        if finished_tombstone_object {
            transaction.add(
                self.store_object_id,
                Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
            );
        } else {
            if finished_tombstone_attribute {
                transaction.add(
                    self.store_object_id,
                    Mutation::merge_object(
                        ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
                        ObjectValue::None,
                    ),
                );
            }
            if deallocated > 0 {
                let mut mutation = match object_mutation {
                    Some(mutation) => mutation,
                    None => self.txn_get_object_mutation(transaction, object_id).await?,
                };
                // Insert a deleted-extent marker covering the whole trimmed range.
                transaction.add(
                    self.store_object_id,
                    Mutation::merge_object(
                        ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
                        ObjectValue::deleted_extent(),
                    ),
                );
                // Update allocated size.
                if let ObjectValue::Object {
                    attributes: ObjectAttributes { allocated_size, .. },
                    ..
                } = &mut mutation.item.value
                {
                    // The only way for these to fail are if the volume is inconsistent.
                    *allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
                        anyhow!(FxfsError::Inconsistent).context("Allocated size overflow")
                    })?;
                } else {
                    panic!("Unexpected object value");
                }
                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            }
        }
        Ok(result)
    }
1763
1764    /// Returns all objects that exist in the parent store that pertain to this object store.
1765    /// Note that this doesn't include the object_id of the store itself which is generally
1766    /// referenced externally.
1767    pub fn parent_objects(&self) -> Vec<u64> {
1768        assert!(self.store_info_handle.get().is_some());
1769        self.store_info.lock().as_ref().unwrap().parent_objects()
1770    }
1771
1772    /// Returns root objects for this store.
1773    pub fn root_objects(&self) -> Vec<u64> {
1774        let mut objects = Vec::new();
1775        let store_info = self.store_info.lock();
1776        let info = store_info.as_ref().unwrap();
1777        if info.root_directory_object_id != INVALID_OBJECT_ID {
1778            objects.push(info.root_directory_object_id);
1779        }
1780        if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
1781            objects.push(info.graveyard_directory_object_id);
1782        }
1783        if info.internal_directory_object_id != INVALID_OBJECT_ID {
1784            objects.push(info.internal_directory_object_id);
1785        }
1786        objects
1787    }
1788
1789    pub fn store_info(&self) -> Option<StoreInfo> {
1790        self.store_info.lock().as_ref().cloned()
1791    }
1792
1793    /// Returns None if called during journal replay.
1794    pub fn store_info_handle_object_id(&self) -> Option<u64> {
1795        self.store_info_handle.get().map(|h| h.object_id())
1796    }
1797
    /// Returns the number of entries in this store's graveyard.  The counter is read with relaxed
    /// ordering, so the value may be stale by the time the caller observes it.
    pub fn graveyard_count(&self) -> u64 {
        self.graveyard_entries.load(Ordering::Relaxed)
    }
1801
    /// Called to open a store, before replay of this store's mutations.
    ///
    /// Opens the store-info handle and loads `StoreInfo` from `parent_store`.  For encrypted
    /// stores the layer files cannot be opened yet (no key is available), so only their sizes are
    /// computed and the store is created in the `Locked` state; for unencrypted stores the layer
    /// files are opened and appended to the tree immediately.
    async fn open(
        parent_store: &Arc<ObjectStore>,
        store_object_id: u64,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Result<Arc<ObjectStore>, Error> {
        let handle =
            ObjectStore::open_object(parent_store, store_object_id, HandleOptions::default(), None)
                .await?;

        let info = load_store_info(parent_store, store_object_id).await?;
        // A store is encrypted iff it has a wrapped mutations key.
        let is_encrypted = info.mutations_key.is_some();

        let mut total_layer_size = 0;
        let last_object_id;

        // TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.

        // If the store is encrypted, we can't open the object tree layers now, but we need to
        // compute the size of the layers.
        if is_encrypted {
            for &oid in &info.layers {
                total_layer_size += parent_store.get_file_size(oid).await?;
            }
            if info.encrypted_mutations_object_id != INVALID_OBJECT_ID {
                total_layer_size += layer_size_from_encrypted_mutations_size(
                    parent_store.get_file_size(info.encrypted_mutations_object_id).await?,
                );
            }
            // The real last-object-id state can only be established at unlock time.
            last_object_id = LastObjectId::Pending;
            ensure!(
                matches!(
                    info.last_object_id,
                    LastObjectIdInfo::Encrypted { .. } | LastObjectIdInfo::Low32Bit { .. }
                ),
                FxfsError::Inconsistent
            );
        } else {
            // Unencrypted stores must not carry an encrypted last-object-id record.
            last_object_id = match info.last_object_id {
                LastObjectIdInfo::Unencrypted { id } => LastObjectId::Unencrypted { id },
                LastObjectIdInfo::Low32Bit => {
                    LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() }
                }
                _ => bail!(FxfsError::Inconsistent),
            };
        }

        let fs = parent_store.filesystem();

        let store = ObjectStore::new(
            Some(parent_store.clone()),
            store_object_id,
            fs.clone(),
            // For encrypted stores, store_info stays None until unlock.
            if is_encrypted { None } else { Some(info) },
            object_cache,
            None,
            if is_encrypted { LockState::Locked } else { LockState::Unencrypted },
            last_object_id,
        );

        assert!(store.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");

        if !is_encrypted {
            let object_tree_layer_object_ids =
                store.store_info.lock().as_ref().unwrap().layers.clone();
            let object_layers = store.open_layers(object_tree_layer_object_ids, None).await?;
            // Recompute the total from the opened handles (overwrites the initial 0).
            total_layer_size = object_layers.iter().map(|h| h.get_size()).sum();
            store
                .tree
                .append_layers(object_layers)
                .await
                .context("Failed to read object store layers")?;
        }

        // Reserve space in proportion to the total layer size.
        fs.object_manager().update_reservation(
            store_object_id,
            tree::reservation_amount_from_layer_size(total_layer_size),
        );

        Ok(store)
    }
1883
    /// Reads this store's `StoreInfo` via the store-info handle.  Panics if the handle has not
    /// been set yet (i.e. before `open` has completed).
    async fn load_store_info(&self) -> Result<StoreInfo, Error> {
        load_store_info_from_handle(self.store_info_handle.get().unwrap()).await
    }
1887
1888    async fn open_layers(
1889        &self,
1890        object_ids: impl std::iter::IntoIterator<Item = u64>,
1891        crypt: Option<Arc<dyn Crypt>>,
1892    ) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
1893        let parent_store = self.parent_store.as_ref().unwrap();
1894        let mut handles = Vec::new();
1895        for object_id in object_ids {
1896            let handle = ObjectStore::open_object(
1897                &parent_store,
1898                object_id,
1899                HandleOptions::default(),
1900                crypt.clone(),
1901            )
1902            .await
1903            .with_context(|| format!("Failed to open layer file {}", object_id))?;
1904            handles.push(handle);
1905        }
1906        Ok(handles)
1907    }
1908
1909    /// Unlocks a store so that it is ready to be used.
1910    /// This is not thread-safe.
1911    pub async fn unlock(
1912        self: &Arc<Self>,
1913        owner: Weak<dyn StoreOwner>,
1914        crypt: Arc<dyn Crypt>,
1915    ) -> Result<(), Error> {
1916        self.unlock_inner(owner, crypt, /*read_only=*/ false).await
1917    }
1918
1919    /// Unlocks a store so that it is ready to be read from.
1920    /// The store will generally behave like it is still locked: when flushed, the store will
1921    /// write out its mutations into the encrypted mutations file, rather than directly updating
1922    /// the layer files of the object store.
1923    /// Re-locking the store (which *must* be done with `Self::lock_read_only` will not trigger a
1924    /// flush, although the store might still be flushed during other operations.
1925    /// This is not thread-safe.
1926    pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
1927        self.unlock_inner(NO_OWNER, crypt, /*read_only=*/ true).await
1928    }
1929
    /// Shared implementation for `unlock` and `unlock_read_only`.
    ///
    /// Opens the object tree layers using `crypt`, decrypts and replays any buffered encrypted
    /// mutations (from both the encrypted mutations file and the journal), rolls the mutations
    /// key, and finally marks the store unlocked.  On failure part-way through, a scope guard
    /// restores the `Locked` state and resets the tree so no unencrypted data is left in memory.
    /// This is not thread-safe.
    async fn unlock_inner(
        self: &Arc<Self>,
        owner: Weak<dyn StoreOwner>,
        crypt: Arc<dyn Crypt>,
        read_only: bool,
    ) -> Result<(), Error> {
        // Unless we are unlocking the store as read-only, the filesystem must not be read-only.
        assert!(read_only || !self.filesystem().options().read_only);
        // Only a store in the `Locked` state may be unlocked; every other state is either an
        // error (caller misuse) or a bug (transient states should never be visible here).
        match &*self.lock_state.lock() {
            LockState::Locked => {}
            LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
            LockState::Invalid | LockState::Deleted => bail!(FxfsError::Internal),
            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) => {
                bail!(FxfsError::AlreadyBound)
            }
            LockState::Unknown => panic!("Store was unlocked before replay"),
            LockState::Locking => panic!("Store is being locked"),
            LockState::Unlocking => panic!("Store is being unlocked"),
        }
        // We must lock flushing since that can modify store_info and the encrypted mutations file.
        let keys = lock_keys![LockKey::flush(self.store_object_id())];
        let fs = self.filesystem();
        let guard = fs.lock_manager().write_lock(keys).await;

        let store_info = self.load_store_info().await?;

        // Open the (encrypted) layer files with the provided crypt and attach them to the tree.
        self.tree
            .append_layers(
                self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
            )
            .await
            .context("Failed to read object tree layer file contents")?;

        // Unwrap the mutations key so buffered encrypted mutations can be decrypted below.
        let wrapped_key =
            fxfs_crypto::WrappedKey::Fxfs(store_info.mutations_key.clone().unwrap().into());
        let unwrapped_key = crypt
            .unwrap_key(&wrapped_key, self.store_object_id)
            .await
            .context("Failed to unwrap mutations keys")?;
        // The ChaCha20 stream cipher we use supports up to 64 GiB.  By default we'll roll the key
        // after every 128 MiB.  Here we just need to pick a number that won't cause issues if it
        // wraps, so we just use u32::MAX (the offset is u64).
        ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
        let mut mutations_cipher =
            StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);

        // Restore the in-memory last-object-id state that `open` deferred (LastObjectId::Pending).
        match &store_info.last_object_id {
            LastObjectIdInfo::Encrypted { id, key } => {
                let wrapped_key = fxfs_crypto::WrappedKey::Fxfs(key.clone().into());
                *self.last_object_id.lock() = LastObjectId::Encrypted {
                    id: *id,
                    cipher: Box::new(Ff1::new(
                        &crypt.unwrap_key(&wrapped_key, self.store_object_id).await?,
                    )),
                };
            }
            LastObjectIdInfo::Low32Bit => {
                *self.last_object_id.lock() = LastObjectId::Low32Bit {
                    reserved: Default::default(),
                    unreserved: Default::default(),
                }
            }
            // `open` already validated last_object_id for encrypted stores.
            _ => unreachable!(),
        }

        // Apply the encrypted mutations.
        let mut mutations = {
            if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
                EncryptedMutations::default()
            } else {
                let parent_store = self.parent_store.as_ref().unwrap();
                let handle = ObjectStore::open_object(
                    &parent_store,
                    store_info.encrypted_mutations_object_id,
                    HandleOptions::default(),
                    None,
                )
                .await?;
                let mut cursor = std::io::Cursor::new(
                    handle
                        .contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
                        .await
                        .context(FxfsError::Inconsistent)?,
                );
                let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
                    .context("Failed to deserialize EncryptedMutations")?
                    .0;
                // The file may contain multiple serialized batches; read them all.
                let len = cursor.get_ref().len() as u64;
                while cursor.position() < len {
                    mutations.extend(
                        &EncryptedMutations::deserialize_with_version(&mut cursor)
                            .context("Failed to deserialize EncryptedMutations")?
                            .0,
                    );
                }
                mutations
            }
        };

        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
        let journaled = EncryptedMutations::from_replayed_mutations(
            self.store_object_id,
            fs.journal()
                .read_transactions_for_object(self.store_object_id)
                .await
                .context("Failed to read encrypted mutations from journal")?,
        );
        mutations.extend(&journaled);

        // Enter the transient `Unlocking` state and publish store_info before replaying.
        let _ = std::mem::replace(&mut *self.lock_state.lock(), LockState::Unlocking);
        *self.store_info.lock() = Some(store_info);

        // If we fail, clean up.
        let clean_up = scopeguard::guard((), |_| {
            *self.lock_state.lock() = LockState::Locked;
            *self.store_info.lock() = None;
            // Make sure we don't leave unencrypted data lying around in memory.
            self.tree.reset();
        });

        let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;

        // The mutations data may span several key rolls; decrypt each segment with the cipher
        // that was in effect when it was written, switching keys at each recorded roll offset.
        let mut slice = &mut data[..];
        let mut last_offset = 0;
        for (offset, key) in mutations_key_roll {
            let split_offset = offset
                .checked_sub(last_offset)
                .ok_or(FxfsError::Inconsistent)
                .context("Invalid mutation key roll offset")?;
            last_offset = offset;
            ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
            let (old, new) = slice.split_at_mut(split_offset);
            mutations_cipher.decrypt(old);
            let unwrapped_key = crypt
                .unwrap_key(&fxfs_crypto::WrappedKey::Fxfs(key.into()), self.store_object_id)
                .await
                .context("Failed to unwrap mutations keys")?;
            mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
            slice = new;
        }
        mutations_cipher.decrypt(slice);

        // Always roll the mutations key when we unlock which guarantees we won't reuse a
        // previous key and nonce.
        self.roll_mutations_key(crypt.as_ref()).await?;

        // Replay the now-decrypted mutations against the store.
        let mut cursor = std::io::Cursor::new(data);
        for (checkpoint, count) in transactions {
            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
            for _ in 0..count {
                let mutation =
                    Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
                        .context("failed to deserialize encrypted mutation")?;
                self.apply_mutation(mutation, &context, AssocObj::None)
                    .context("failed to apply encrypted mutation")?;
            }
        }

        *self.lock_state.lock() = if read_only {
            LockState::UnlockedReadOnly(crypt)
        } else {
            LockState::Unlocked { owner, crypt }
        };

        // To avoid unbounded memory growth, we should flush the encrypted mutations now. Otherwise
        // it's possible for more writes to be queued and for the store to be locked before we can
        // flush anything and that can repeat.
        std::mem::drop(guard);

        if !read_only && !self.filesystem().options().read_only {
            self.flush_with_reason(flush::Reason::Unlock).await?;

            // Reap purged files within this store.
            let _ = self.filesystem().graveyard().initial_reap(&self).await?;
        }

        // Return and cancel the clean up.
        Ok(ScopeGuard::into_inner(clean_up))
    }
2109
2110    pub fn is_locked(&self) -> bool {
2111        matches!(
2112            *self.lock_state.lock(),
2113            LockState::Locked | LockState::Locking | LockState::Unknown
2114        )
2115    }
2116
2117    /// NB: This is not the converse of `is_locked`, as there are lock states where neither are
2118    /// true.
2119    pub fn is_unlocked(&self) -> bool {
2120        matches!(
2121            *self.lock_state.lock(),
2122            LockState::Unlocked { .. } | LockState::UnlockedReadOnly { .. } | LockState::Unlocking
2123        )
2124    }
2125
2126    pub fn is_unknown(&self) -> bool {
2127        matches!(*self.lock_state.lock(), LockState::Unknown)
2128    }
2129
2130    pub fn is_encrypted(&self) -> bool {
2131        self.store_info.lock().as_ref().unwrap().mutations_key.is_some()
2132    }
2133
    // Locks a store.
    // This operation will take a flush lock on the store, in case any flushes are ongoing.  Any
    // ongoing store accesses might be interrupted by this.  See `Self::crypt`.
    // Whilst this can return an error, the store will be placed into an unusable but safe state
    // (i.e. no lingering unencrypted data) if an error is encountered.
    pub async fn lock(&self) -> Result<(), Error> {
        // We must lock flushing since it is not safe for that to be happening whilst we are locking
        // the store.
        let keys = lock_keys![LockKey::flush(self.store_object_id())];
        let fs = self.filesystem();
        let _guard = fs.lock_manager().write_lock(keys).await;

        // Move into the transient `Locking` state; only a fully `Unlocked` store may be locked.
        {
            let mut lock_state = self.lock_state.lock();
            if let LockState::Unlocked { .. } = &*lock_state {
                *lock_state = LockState::Locking;
            } else {
                panic!("Unexpected lock state: {:?}", &*lock_state);
            }
        }

        // Sync the journal now to ensure that any buffered mutations for this store make it out to
        // disk.  This is necessary to be able to unlock the store again.
        // We need to establish a barrier at this point (so that the journaled writes are observable
        // by any future attempts to unlock the store), hence the flush_device.
        let sync_result =
            self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;

        // If the sync failed we cannot guarantee the on-disk state, so the store becomes Invalid.
        *self.lock_state.lock() = if let Err(error) = &sync_result {
            error!(error:?; "Failed to sync journal; store will no longer be usable");
            LockState::Invalid
        } else {
            LockState::Locked
        };
        // Regardless of the sync outcome, drop all unencrypted state: cached unwrapped keys, the
        // in-memory store info, and the in-memory tree.
        self.key_manager.clear();
        *self.store_info.lock() = None;
        self.tree.reset();

        sync_result
    }
2174
    // Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`).  Data
    // is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
    // and will be replayed next time the store is unlocked.
    pub fn lock_read_only(&self) {
        *self.lock_state.lock() = LockState::Locked;
        // Drop the in-memory store info and tree so no unencrypted data lingers; they are rebuilt
        // on the next unlock.
        *self.store_info.lock() = None;
        self.tree.reset();
    }
2183
2184    // Returns None if the object ID cipher needs to be created or rolled, or a more expensive
2185    // algorithm needs to be used.
2186    fn maybe_get_next_object_id(&self) -> Option<ReservedId<'_>> {
2187        self.last_object_id.lock().try_get_next().map(|id| ReservedId::new(self, id))
2188    }
2189
2190    /// Returns a new object ID that can be used.  This will create an object ID cipher if needed.
2191    ///
    /// If the object ID key needs to be rolled, a new transaction will be created and committed.
    /// This transaction does not take the filesystem lock, hence `txn_guard`.
    pub(super) async fn get_next_object_id(
        &self,
        txn_guard: &TxnGuard<'_>,
    ) -> Result<ReservedId<'_>, Error> {
        // Fast path: reserve the next ID purely from in-memory state, no transaction needed.
        {
            let mut last_object_id = self.last_object_id.lock();
            if let Some(id) = last_object_id.try_get_next() {
                return Ok(ReservedId::new(self, id));
            }
            // An unencrypted store's ID space should never run out within any realistic
            // lifetime, so failing to get an ID here is treated as corruption.
            ensure!(
                !matches!(&*last_object_id, LastObjectId::Unencrypted { .. }),
                FxfsError::Inconsistent
            );
        }

        let parent_store = self.parent_store().unwrap();

        // Create a transaction (which has a lock) and then check again.
        //
        // NOTE: Since this can be a nested transaction, we must take care to avoid deadlocks; no
        // more locks should be taken whilst we hold this lock.
        let mut transaction = self
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(parent_store.store_object_id, self.store_object_id)],
                Options {
                    // We must skip journal checks because this transaction might be needed to
                    // compact.
                    skip_journal_checks: true,
                    borrow_metadata_space: true,
                    txn_guard: Some(txn_guard),
                    ..Default::default()
                },
            )
            .await?;

        let mut next_id_hi = 0;

        // Double-checked locking: now that we hold the transaction lock, re-test whether another
        // task already rolled the key, and otherwise decide which exhaustion strategy applies.
        let is_low_32_bit = {
            let mut last_object_id = self.last_object_id.lock();
            if let Some(id) = last_object_id.try_get_next() {
                // Something else raced and created/rolled the cipher.
                return Ok(ReservedId::new(self, id));
            }

            match &*last_object_id {
                LastObjectId::Encrypted { id, .. } => {
                    // It shouldn't be possible for last_object_id to wrap within our lifetime, so
                    // if this happens, it's most likely due to corruption.
                    next_id_hi =
                        id.checked_add(1 << 32).ok_or(FxfsError::Inconsistent)? & OBJECT_ID_HI_MASK;

                    info!(store_id = self.store_object_id; "Rolling object ID key");

                    false
                }
                LastObjectId::Low32Bit { .. } => true,
                _ => unreachable!(),
            }
        };

        if is_low_32_bit {
            // Keep picking an object ID at random until we find one free.

            // To avoid races, this must be before we capture the layer set.
            self.last_object_id.lock().drain_unreserved();

            let layer_set = self.tree.layer_set();
            let mut key = ObjectKey::object(0);
            loop {
                // Zero is rejected (object IDs are NonZero); retry until we find an ID that is
                // neither currently reserved nor already present in the tree.
                let next_id = rand::rng().next_u32() as u64;
                let Some(next_id) = NonZero::new(next_id) else { continue };
                if self.last_object_id.lock().is_reserved(next_id.get()) {
                    continue;
                }
                key.object_id = next_id.get();
                if layer_set.key_exists(&key).await? == Existence::Missing {
                    self.last_object_id.lock().reserve(next_id.get());
                    return Ok(ReservedId::new(self, next_id));
                }
            }
        } else {
            // Create a key.
            let (object_id_wrapped, object_id_unwrapped) = self
                .crypt()
                .unwrap()
                .create_key(self.store_object_id, KeyPurpose::Metadata)
                .await?;

            // Normally we would use a mutation to note the updated key, but that would complicate
            // replay.  During replay, we need to keep track of the highest used object ID and this
            // is done by watching mutations to see when we create objects, and then decrypting
            // the object ID.  This relies on the unwrapped key being available, so as soon as
            // we detect the key has changed, we would need to immediately unwrap the key via the
            // crypt service.  Currently, this isn't easy to do during replay.  An option we could
            // consider would be to include the unencrypted object ID when we create objects, which
            // would avoid us having to decrypt the object ID during replay.
            //
            // For now and for historical reasons, the approach we take is to just write a new
            // version of StoreInfo here.  We must take care that we only update the key and not any
            // other information contained within StoreInfo because other information should only be
            // updated when we flush.  We are holding the lock on the StoreInfo file, so this will
            // prevent potential races with flushing.  To make sure we only change the key, we read
            // StoreInfo from storage rather than using our in-memory copy.  This won't be
            // performant, but rolling the object ID key will be extremely rare.
            let new_store_info = StoreInfo {
                last_object_id: LastObjectIdInfo::Encrypted {
                    id: next_id_hi,
                    key: object_id_wrapped.clone(),
                },
                ..self.load_store_info().await?
            };

            self.write_store_info(&mut transaction, &new_store_info).await?;

            // Only install the new cipher once the transaction has committed; the callback runs
            // whilst commit holds its locks, so the in-memory state and the journaled StoreInfo
            // change atomically with respect to other users.
            transaction
                .commit_with_callback(|_| {
                    self.store_info.lock().as_mut().unwrap().last_object_id =
                        new_store_info.last_object_id;
                    match &mut *self.last_object_id.lock() {
                        LastObjectId::Encrypted { id, cipher } => {
                            **cipher = Ff1::new(&object_id_unwrapped);
                            *id = next_id_hi;
                            ReservedId::new(
                                self,
                                NonZero::new(next_id_hi | cipher.encrypt(0) as u64).unwrap(),
                            )
                        }
                        _ => unreachable!(),
                    }
                })
                .await
        }
    }
2328
2329    /// Query the next object ID that will be used. Intended for use when checking filesystem
2330    /// consistency. Prefer [`Self::get_next_object_id()`] for general use.
2331    pub(crate) fn query_next_object_id(&self) -> u64 {
2332        self.last_object_id.lock().peek_next()
2333    }
2334
2335    fn allocator(&self) -> Arc<Allocator> {
2336        self.filesystem().allocator()
2337    }
2338
2339    // If |transaction| has an impending mutation for the underlying object, returns that.
2340    // Otherwise, looks up the object from the tree and returns a suitable mutation for it.  The
2341    // mutation is returned here rather than the item because the mutation includes the operation
2342    // which has significance: inserting an object implies it's the first of its kind unlike
2343    // replacing an object.
2344    async fn txn_get_object_mutation(
2345        &self,
2346        transaction: &Transaction<'_>,
2347        object_id: u64,
2348    ) -> Result<ObjectStoreMutation, Error> {
2349        if let Some(mutation) =
2350            transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
2351        {
2352            Ok(mutation.clone())
2353        } else {
2354            Ok(ObjectStoreMutation {
2355                item: self
2356                    .tree
2357                    .find(&ObjectKey::object(object_id))
2358                    .await?
2359                    .ok_or(FxfsError::Inconsistent)
2360                    .context("Object id missing")?,
2361                op: Operation::ReplaceOrInsert,
2362            })
2363        }
2364    }
2365
    /// Like txn_get_object_mutation but with expanded visibility.
    /// Only available in migration code.
    ///
    /// See [`Self::txn_get_object_mutation`] for details of the returned mutation.
    #[cfg(feature = "migration")]
    pub async fn get_object_mutation(
        &self,
        transaction: &Transaction<'_>,
        object_id: u64,
    ) -> Result<ObjectStoreMutation, Error> {
        self.txn_get_object_mutation(transaction, object_id).await
    }
2376
2377    fn update_last_object_id(&self, object_id: u64) {
2378        let mut last_object_id = self.last_object_id.lock();
2379        match &mut *last_object_id {
2380            LastObjectId::Pending => unreachable!(),
2381            LastObjectId::Unencrypted { id } => {
2382                if object_id > *id {
2383                    *id = object_id
2384                }
2385            }
2386            LastObjectId::Encrypted { id, cipher } => {
2387                // For encrypted stores, object_id will be encrypted here, so we must decrypt first.
2388
2389                // If the object ID cipher has been rolled, then it's possible we might see object
2390                // IDs that were generated using a different cipher so the decrypt here will return
2391                // the wrong value, but that won't matter because the hi part of the object ID
2392                // should still discriminate.
2393                let object_id =
2394                    object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
2395                if object_id > *id {
2396                    *id = object_id;
2397                }
2398            }
2399            LastObjectId::Low32Bit { .. } => {}
2400        }
2401    }
2402
2403    /// If possible, converts the given object ID to its unencrypted value.  Returns None if it is
2404    /// not possible to convert to its unencrypted value because the key is unavailable.
2405    pub fn to_unencrypted_object_id(&self, object_id: u64) -> Option<u64> {
2406        let last_object_id = self.last_object_id.lock();
2407        match &*last_object_id {
2408            LastObjectId::Pending => None,
2409            LastObjectId::Unencrypted { .. } | LastObjectId::Low32Bit { .. } => Some(object_id),
2410            LastObjectId::Encrypted { id, cipher } => {
2411                if id & OBJECT_ID_HI_MASK != object_id & OBJECT_ID_HI_MASK {
2412                    None
2413                } else {
2414                    Some(object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64)
2415                }
2416            }
2417        }
2418    }
2419
2420    /// Adds the specified object to the graveyard.
2421    pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2422        let graveyard_id = self.graveyard_directory_object_id();
2423        assert_ne!(graveyard_id, INVALID_OBJECT_ID);
2424        transaction.add(
2425            self.store_object_id,
2426            Mutation::replace_or_insert_object(
2427                ObjectKey::graveyard_entry(graveyard_id, object_id),
2428                ObjectValue::Some,
2429            ),
2430        );
2431    }
2432
2433    /// Removes the specified object from the graveyard.  NB: Care should be taken when calling
2434    /// this because graveyard entries are used for purging deleted files *and* for trimming
2435    /// extents.  For example, consider the following sequence:
2436    ///
2437    ///     1. Add Trim graveyard entry.
2438    ///     2. Replace with Some graveyard entry (see above).
2439    ///     3. Remove graveyard entry.
2440    ///
2441    /// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
2442    /// actually be:
2443    ///
2444    ///     3. Replace with Trim graveyard entry.
2445    pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2446        transaction.add(
2447            self.store_object_id,
2448            Mutation::replace_or_insert_object(
2449                ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
2450                ObjectValue::None,
2451            ),
2452        );
2453    }
2454
2455    /// Removes the specified attribute from the graveyard. Unlike object graveyard entries,
2456    /// attribute graveyard entries only have one functionality (i.e. to purge deleted attributes)
2457    /// so the caller does not need to be concerned about replacing the graveyard attribute entry
2458    /// with its prior state when cancelling it. See comment on `remove_from_graveyard()`.
2459    pub fn remove_attribute_from_graveyard(
2460        &self,
2461        transaction: &mut Transaction<'_>,
2462        object_id: u64,
2463        attribute_id: u64,
2464    ) {
2465        transaction.add(
2466            self.store_object_id,
2467            Mutation::replace_or_insert_object(
2468                ObjectKey::graveyard_attribute_entry(
2469                    self.graveyard_directory_object_id(),
2470                    object_id,
2471                    attribute_id,
2472                ),
2473                ObjectValue::None,
2474            ),
2475        );
2476    }
2477
2478    // Roll the mutations key.  The new key will be written for the next encrypted mutation.
2479    async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
2480        let (wrapped_key, unwrapped_key) =
2481            crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
2482
2483        // The mutations_cipher lock must be held for the duration so that mutations_cipher and
2484        // store_info are updated atomically.  Otherwise, write_mutation could find a new cipher but
2485        // end up writing the wrong wrapped key.
2486        let mut cipher = self.mutations_cipher.lock();
2487        *cipher = Some(StreamCipher::new(&unwrapped_key, 0));
2488        self.store_info.lock().as_mut().unwrap().mutations_key = Some(wrapped_key);
2489        // mutations_cipher_offset is updated by flush.
2490        Ok(())
2491    }
2492
2493    // When the symlink is unlocked, this function decrypts `link` and returns a bag of bytes that
2494    // is identical to that which was passed in as the target on `create_symlink`.
2495    // If the symlink is locked, this function hashes the encrypted `link` with Sha256 in order to
2496    // get a standard length and then base64 encodes the hash and returns that to the caller.
2497    pub async fn read_encrypted_symlink(
2498        &self,
2499        object_id: u64,
2500        link: Vec<u8>,
2501    ) -> Result<Vec<u8>, Error> {
2502        let mut link = link;
2503        let key = self
2504            .key_manager()
2505            .get_fscrypt_key(object_id, self.crypt().unwrap().as_ref(), async || {
2506                self.get_keys(object_id).await
2507            })
2508            .await?;
2509        if let Some(key) = key.into_cipher() {
2510            key.decrypt_symlink(object_id, &mut link)?;
2511            Ok(link)
2512        } else {
2513            // Locked symlinks are encoded using a hash_code of 0.
2514            let proxy_filename =
2515                fscrypt::proxy_filename::ProxyFilename::new_with_hash_code(0, &link);
2516            let proxy_filename_str: String = proxy_filename.into();
2517            Ok(proxy_filename_str.as_bytes().to_vec())
2518        }
2519    }
2520
2521    /// Returns the link of a symlink object.
2522    pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
2523        match self.tree.find(&ObjectKey::object(object_id)).await? {
2524            None => bail!(FxfsError::NotFound),
2525            Some(Item {
2526                value: ObjectValue::Object { kind: ObjectKind::EncryptedSymlink { link, .. }, .. },
2527                ..
2528            }) => self.read_encrypted_symlink(object_id, link.to_vec()).await,
2529            Some(Item {
2530                value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
2531                ..
2532            }) => Ok(link.to_vec()),
2533            Some(item) => Err(anyhow!(FxfsError::Inconsistent)
2534                .context(format!("Unexpected item in lookup: {item:?}"))),
2535        }
2536    }
2537
2538    /// Retrieves the wrapped keys for the given object.  The keys *should* be known to exist and it
2539    /// will be considered an inconsistency if they don't.
2540    pub async fn get_keys(&self, object_id: u64) -> Result<EncryptionKeys, Error> {
2541        match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::Inconsistent)? {
2542            Item { value: ObjectValue::Keys(keys), .. } => Ok(keys),
2543            _ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
2544        }
2545    }
2546
2547    pub async fn update_attributes<'a>(
2548        &self,
2549        transaction: &mut Transaction<'a>,
2550        object_id: u64,
2551        node_attributes: Option<&fio::MutableNodeAttributes>,
2552        change_time: Option<Timestamp>,
2553    ) -> Result<(), Error> {
2554        if change_time.is_none() {
2555            if let Some(attributes) = node_attributes {
2556                let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
2557                if *attributes == empty_attributes {
2558                    return Ok(());
2559                }
2560            } else {
2561                return Ok(());
2562            }
2563        }
2564        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
2565        if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
2566            if let Some(time) = change_time {
2567                attributes.change_time = time;
2568            }
2569            if let Some(node_attributes) = node_attributes {
2570                if let Some(time) = node_attributes.creation_time {
2571                    attributes.creation_time = Timestamp::from_nanos(time);
2572                }
2573                if let Some(time) = node_attributes.modification_time {
2574                    attributes.modification_time = Timestamp::from_nanos(time);
2575                }
2576                if let Some(time) = node_attributes.access_time {
2577                    attributes.access_time = Timestamp::from_nanos(time);
2578                }
2579                if node_attributes.mode.is_some()
2580                    || node_attributes.uid.is_some()
2581                    || node_attributes.gid.is_some()
2582                    || node_attributes.rdev.is_some()
2583                {
2584                    if let Some(a) = &mut attributes.posix_attributes {
2585                        if let Some(mode) = node_attributes.mode {
2586                            a.mode = mode;
2587                        }
2588                        if let Some(uid) = node_attributes.uid {
2589                            a.uid = uid;
2590                        }
2591                        if let Some(gid) = node_attributes.gid {
2592                            a.gid = gid;
2593                        }
2594                        if let Some(rdev) = node_attributes.rdev {
2595                            a.rdev = rdev;
2596                        }
2597                    } else {
2598                        attributes.posix_attributes = Some(PosixAttributes {
2599                            mode: node_attributes.mode.unwrap_or_default(),
2600                            uid: node_attributes.uid.unwrap_or_default(),
2601                            gid: node_attributes.gid.unwrap_or_default(),
2602                            rdev: node_attributes.rdev.unwrap_or_default(),
2603                        });
2604                    }
2605                }
2606            }
2607        } else {
2608            bail!(
2609                anyhow!(FxfsError::Inconsistent)
2610                    .context("ObjectStore.update_attributes: Expected object value")
2611            );
2612        };
2613        transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
2614        Ok(())
2615    }
2616
2617    // Updates and commits the changes to access time in ObjectProperties. The update matches
2618    // Linux's RELATIME. That is, access time is updated to the current time if access time is less
2619    // than or equal to the last modification or status change, or if it has been more than a day
2620    // since the last access.  `precondition` is a condition to be checked *after* taking the lock
2621    // on the object.  If `precondition` returns false, no update will be performed.
2622    pub async fn update_access_time(
2623        &self,
2624        object_id: u64,
2625        props: &mut ObjectProperties,
2626        precondition: impl FnOnce() -> bool,
2627    ) -> Result<(), Error> {
2628        let access_time = props.access_time.as_nanos();
2629        let modification_time = props.modification_time.as_nanos();
2630        let change_time = props.change_time.as_nanos();
2631        let now = Timestamp::now();
2632        if access_time <= modification_time
2633            || access_time <= change_time
2634            || access_time
2635                < now.as_nanos()
2636                    - Timestamp::from(std::time::Duration::from_secs(24 * 60 * 60)).as_nanos()
2637        {
2638            let mut transaction = self
2639                .filesystem()
2640                .clone()
2641                .new_transaction(
2642                    lock_keys![LockKey::object(self.store_object_id, object_id,)],
2643                    Options { borrow_metadata_space: true, ..Default::default() },
2644                )
2645                .await?;
2646            if precondition() {
2647                self.update_attributes(
2648                    &mut transaction,
2649                    object_id,
2650                    Some(&fio::MutableNodeAttributes {
2651                        access_time: Some(now.as_nanos()),
2652                        ..Default::default()
2653                    }),
2654                    None,
2655                )
2656                .await?;
2657                transaction.commit().await?;
2658                props.access_time = now;
2659            }
2660        }
2661        Ok(())
2662    }
2663
2664    async fn write_store_info<'a>(
2665        &'a self,
2666        transaction: &mut Transaction<'a>,
2667        info: &StoreInfo,
2668    ) -> Result<(), Error> {
2669        let mut serialized_info = Vec::new();
2670        info.serialize_with_version(&mut serialized_info)?;
2671        let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
2672        buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
2673        self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
2674    }
2675
    /// Marks this store as deleted by setting its lock state to `LockState::Deleted`.
    pub fn mark_deleted(&self) {
        *self.lock_state.lock() = LockState::Deleted;
    }
2679
2680    #[cfg(test)]
2681    pub(crate) fn test_set_last_object_id(&self, object_id: u64) {
2682        match &mut *self.last_object_id.lock() {
2683            LastObjectId::Encrypted { id, .. } => *id = object_id,
2684            _ => unreachable!(),
2685        }
2686    }
2687}
2688
#[async_trait]
impl JournalingObject for ObjectStore {
    /// Applies `mutation` to the store's in-memory state (the LSM tree, store info and
    /// counters).  Called both at commit time and during journal replay
    /// (`context.mode.is_replay()`).
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        _assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error> {
        // Validate the mutation against the store's lock state first: locked stores should only
        // ever see flush markers and, during replay, encrypted mutations.
        match &*self.lock_state.lock() {
            LockState::Locked | LockState::Locking => {
                ensure!(
                    matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
                        || matches!(
                            mutation,
                            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_)
                                if context.mode.is_replay()
                        ),
                    anyhow!(FxfsError::Inconsistent)
                        .context(format!("Unexpected mutation for encrypted store: {mutation:?}"))
                );
            }
            LockState::Invalid
            | LockState::Unlocking
            | LockState::Unencrypted
            | LockState::Unlocked { .. }
            | LockState::UnlockedReadOnly(..)
            | LockState::Deleted => {}
            lock_state @ _ => panic!("Unexpected lock state: {lock_state:?}"),
        }
        match mutation {
            Mutation::ObjectStore(ObjectStoreMutation { mut item, op }) => {
                // Stamp the item with the journal offset so later readers can order records.
                item.sequence = context.checkpoint.file_offset;
                match op {
                    Operation::Insert => {
                        let mut unreserve_id = INVALID_OBJECT_ID;
                        // If we are inserting an object record for the first time, it signifies the
                        // birth of the object so we need to adjust the object count.
                        if matches!(item.value, ObjectValue::Object { .. }) {
                            {
                                let info = &mut self.store_info.lock();
                                let object_count = &mut info.as_mut().unwrap().object_count;
                                *object_count = object_count.saturating_add(1);
                            }
                            if context.mode.is_replay() {
                                self.update_last_object_id(item.key.object_id);
                            } else {
                                unreserve_id = item.key.object_id;
                            }
                        } else if !context.mode.is_replay()
                            && matches!(
                                item.key.data,
                                ObjectKeyData::GraveyardEntry { .. }
                                    | ObjectKeyData::GraveyardAttributeEntry { .. }
                            )
                        {
                            // Maintain the live graveyard-entry counter (not during replay).
                            if matches!(item.value, ObjectValue::Some | ObjectValue::Trim) {
                                self.graveyard_entries.fetch_add(1, Ordering::Relaxed);
                            } else if matches!(item.value, ObjectValue::None) {
                                self.graveyard_entries.fetch_sub(1, Ordering::Relaxed);
                            }
                        }
                        self.tree.insert(item)?;
                        if unreserve_id != INVALID_OBJECT_ID {
                            // To avoid races, this *must* be after the `tree.insert(..)` above.
                            self.last_object_id.lock().unreserve(unreserve_id);
                        }
                    }
                    Operation::ReplaceOrInsert => {
                        // Same graveyard-entry accounting as the Insert path above.
                        if !context.mode.is_replay()
                            && matches!(
                                item.key.data,
                                ObjectKeyData::GraveyardEntry { .. }
                                    | ObjectKeyData::GraveyardAttributeEntry { .. }
                            )
                        {
                            if matches!(item.value, ObjectValue::Some | ObjectValue::Trim) {
                                self.graveyard_entries.fetch_add(1, Ordering::Relaxed);
                            } else if matches!(item.value, ObjectValue::None) {
                                self.graveyard_entries.fetch_sub(1, Ordering::Relaxed);
                            }
                        }
                        self.tree.replace_or_insert(item);
                    }
                    Operation::Merge => {
                        // A tombstone marks the death of an object, so decrement the object count.
                        if item.is_tombstone() {
                            let info = &mut self.store_info.lock();
                            let object_count = &mut info.as_mut().unwrap().object_count;
                            *object_count = object_count.saturating_sub(1);
                        }
                        if !context.mode.is_replay()
                            && matches!(
                                item.key.data,
                                ObjectKeyData::GraveyardEntry { .. }
                                    | ObjectKeyData::GraveyardAttributeEntry { .. }
                            )
                        {
                            if matches!(item.value, ObjectValue::Some | ObjectValue::Trim) {
                                self.graveyard_entries.fetch_add(1, Ordering::Relaxed);
                            } else if matches!(item.value, ObjectValue::None) {
                                self.graveyard_entries.fetch_sub(1, Ordering::Relaxed);
                            }
                        }
                        let lower_bound = item.key.key_for_merge_into();
                        self.tree.merge_into(item, &lower_bound);
                    }
                }
            }
            Mutation::BeginFlush => {
                ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
                self.tree.seal();
            }
            Mutation::EndFlush => ensure!(self.parent_store.is_some(), FxfsError::Inconsistent),
            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
                // We will process these during Self::unlock.
                ensure!(
                    !matches!(&*self.lock_state.lock(), LockState::Unencrypted),
                    FxfsError::Inconsistent
                );
            }
            Mutation::CreateInternalDir(object_id) => {
                ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
                self.store_info.lock().as_mut().unwrap().internal_directory_object_id = object_id;
            }
            _ => bail!("unexpected mutation: {:?}", mutation),
        }
        self.counters.lock().mutations_applied += 1;
        Ok(())
    }

    /// Accounts for a mutation that is being dropped rather than applied; an object ID reserved
    /// by an uncommitted object insert is released back for reuse.
    fn drop_mutation(&self, mutation: Mutation, _transaction: &Transaction<'_>) {
        self.counters.lock().mutations_dropped += 1;
        if let Mutation::ObjectStore(ObjectStoreMutation {
            item: Item { key: ObjectKey { object_id, .. }, value: ObjectValue::Object { .. }, .. },
            op: Operation::Insert,
        }) = mutation
        {
            self.last_object_id.lock().unreserve(object_id);
        }
    }

    /// Push all in-memory structures to the device. This is not necessary for sync since the
    /// journal will take care of it.  This is supposed to be called when there is either memory or
    /// space pressure (flushing the store will persist in-memory data and allow the journal file to
    /// be trimmed).
    ///
    /// Also returns the earliest version of a struct in the filesystem (when known).
    async fn flush(&self) -> Result<Version, Error> {
        self.flush_with_reason(flush::Reason::Journal).await
    }

    /// Writes `mutation` to the journal via `writer`, encrypting it first when this store has an
    /// active mutations cipher.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        // Intentionally enumerating all variants to force a decision on any new variants. Encrypt
        // all mutations that could affect an encrypted object store contents or the `StoreInfo` of
        // the encrypted object store. During `unlock()` any mutations which haven't been encrypted
        // won't be replayed after reading `StoreInfo`.
        match mutation {
            // Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted, we
            // still choose to encrypt the mutation because it makes it easier to deal with replay.
            // When we replay mutations for an encrypted store, the only thing we keep in memory are
            // the encrypted mutations; we don't keep `StoreInfo` or changes to it in memory. So, by
            // encrypting the CreateInternalDir mutation here, it means we don't have to track both
            // encrypted mutations bound for the LSM tree and unencrypted mutations for `StoreInfo`
            // to use in `unlock()`. It'll just bundle CreateInternalDir mutations with the other
            // encrypted mutations and handled them all in sequence during `unlock()`.
            Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
                let mut cipher = self.mutations_cipher.lock();
                if let Some(cipher) = cipher.as_mut() {
                    // If this is the first time we've used this key, we must write the key out.
                    if cipher.offset() == 0 {
                        writer.write(Mutation::update_mutations_key(
                            self.store_info
                                .lock()
                                .as_ref()
                                .unwrap()
                                .mutations_key
                                .as_ref()
                                .unwrap()
                                .clone(),
                        ));
                    }
                    let mut buffer = Vec::new();
                    mutation.serialize_into(&mut buffer).unwrap();
                    cipher.encrypt(&mut buffer);
                    writer.write(Mutation::EncryptedObjectStore(buffer.into()));
                    return;
                }
            }
            // `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
            // encrypted object stores, but are either the encrypted mutation data itself or
            // metadata governing how the data will be encrypted. They should only be produced here.
            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
                debug_assert!(false, "Only this method should generate encrypted mutations");
            }
            // `BeginFlush` and `EndFlush` are not needed during `unlock()` and are needed during
            // the initial journal replay, so should not be encrypted. `Allocator`, `DeleteVolume`,
            // `UpdateBorrowed` mutations are never associated with an encrypted store as we do not
            // encrypt the allocator or root/root-parent stores so we can avoid the locking.
            Mutation::Allocator(_)
            | Mutation::BeginFlush
            | Mutation::EndFlush
            | Mutation::DeleteVolume
            | Mutation::UpdateBorrowed(_) => {}
        }
        // Unencrypted fall-through: journal the mutation as-is.
        writer.write(mutation.clone());
    }
}
2895
2896impl Drop for ObjectStore {
2897    fn drop(&mut self) {
2898        let mut last_object_id = self.last_object_id.lock();
2899        last_object_id.drain_unreserved();
2900        match &*last_object_id {
2901            LastObjectId::Low32Bit { reserved, .. } => debug_assert!(reserved.is_empty()),
2902            _ => {}
2903        }
2904    }
2905}
2906
// Marker impl: object handles opened against an `ObjectStore` use the store itself as their owner.
impl HandleOwner for ObjectStore {}
2908
// Identity conversion so APIs generic over `AsRef<ObjectStore>` accept an `ObjectStore` directly.
impl AsRef<ObjectStore> for ObjectStore {
    fn as_ref(&self) -> &ObjectStore {
        self
    }
}
2914
/// Worst-case estimate of the metadata space that must be reserved so that encrypted mutations
/// totalling `size` bytes can later be written out to layer files.
fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
    // This mirrors reserved_space_from_journal_usage: the value returned here must be an upper
    // bound on whatever reservation_amount_from_layer_size will yield once the data has landed in
    // layer files, while staying at or below what reserved_space_from_journal_usage would use.
    // That helper can't be used directly because the encrypted mutations file carries some extra
    // data (it includes the checkpoints) that isn't written in the same way to the journal.
    3 * size
}
2925
// Marker impl allowing an `ObjectStore` to be attached to transaction mutations as their
// associated object.
impl AssociatedObject for ObjectStore {}
2927
/// Argument to the trim_some method, selecting which extents are eligible for trimming.
#[derive(Debug)]
pub enum TrimMode {
    /// Trim extents beyond the current size.
    UseSize,

    /// Trim extents beyond the supplied offset.
    FromOffset(u64),

    /// Remove the object (or attribute) from the store once it is fully trimmed.
    /// See [`TombstoneMode`] for which of the two is removed.
    Tombstone(TombstoneMode),
}
2940
/// Sets the mode for tombstoning (either at the object or attribute level).
#[derive(Debug)]
pub enum TombstoneMode {
    /// Tombstone the whole object once it is fully trimmed.
    Object,
    /// Tombstone only the attribute being trimmed.
    Attribute,
}
2947
/// Result of the trim_some method.  See [`TrimMode`] for the corresponding input argument.
#[derive(Debug)]
pub enum TrimResult {
    /// We reached the limit of the transaction and more extents might follow.
    Incomplete,

    /// We finished this attribute.  Returns the ID of the next attribute for the same object if
    /// there is one.
    Done(Option<u64>),
}
2958
2959/// Loads store info.
2960pub async fn load_store_info(
2961    parent: &Arc<ObjectStore>,
2962    store_object_id: u64,
2963) -> Result<StoreInfo, Error> {
2964    load_store_info_from_handle(
2965        &ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?,
2966    )
2967    .await
2968}
2969
2970async fn load_store_info_from_handle(
2971    handle: &DataObjectHandle<impl HandleOwner>,
2972) -> Result<StoreInfo, Error> {
2973    Ok(if handle.get_size() > 0 {
2974        let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
2975        let mut cursor = std::io::Cursor::new(serialized_info);
2976        let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
2977            .context("Failed to deserialize StoreInfo")?;
2978        store_info
2979    } else {
2980        // The store_info will be absent for a newly created and empty object store.
2981        StoreInfo::default()
2982    })
2983}
2984
2985#[cfg(test)]
2986mod tests {
2987    use super::{
2988        DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, FsverityMetadata, HandleOptions,
2989        LastObjectId, LastObjectIdInfo, LockKey, MAX_STORE_INFO_SERIALIZED_SIZE, Mutation,
2990        NO_OWNER, NewChildStoreOptions, OBJECT_ID_HI_MASK, ObjectStore, RootDigest, StoreInfo,
2991        StoreOptions, StoreOwner,
2992    };
2993    use crate::errors::FxfsError;
2994    use crate::filesystem::{FxFilesystem, JournalingObject, OpenFxFilesystem, SyncOptions};
2995    use crate::fsck::{fsck, fsck_volume};
2996    use crate::lsm_tree::Query;
2997    use crate::lsm_tree::types::{ItemRef, LayerIterator};
2998    use crate::object_handle::{
2999        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
3000    };
3001    use crate::object_store::directory::Directory;
3002    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue};
3003    use crate::object_store::transaction::{Options, lock_keys};
3004    use crate::object_store::volume::root_volume;
3005    use crate::serialized_types::VersionedLatest;
3006    use crate::testing;
3007    use assert_matches::assert_matches;
3008    use async_trait::async_trait;
3009    use fuchsia_async as fasync;
3010    use fuchsia_sync::Mutex;
3011    use futures::join;
3012    use fxfs_crypto::ff1::Ff1;
3013    use fxfs_crypto::{
3014        Crypt, FXFS_KEY_SIZE, FXFS_WRAPPED_KEY_SIZE, FxfsKey, UnwrappedKey, WrappedKeyBytes,
3015    };
3016    use fxfs_insecure_crypto::new_insecure_crypt;
3017
3018    use std::sync::Arc;
3019    use std::time::Duration;
3020    use storage_device::DeviceHolder;
3021    use storage_device::fake_device::FakeDevice;
3022    use test_case::test_case;
3023
3024    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
3025
3026    async fn test_filesystem() -> OpenFxFilesystem {
3027        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
3028        FxFilesystem::new_empty(device).await.expect("new_empty failed")
3029    }
3030
    // Verifies that items committed to the tree carry non-decreasing sequence numbers, and that
    // an item committed after a sync() gets a strictly greater sequence than earlier items.
    #[fuchsia::test]
    async fn test_item_sequences() {
        let fs = test_filesystem().await;
        let object1;
        let object2;
        let object3;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        object1 = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        object2 = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");

        // Sync between the second and third objects; the third should land in a later sequence.
        fs.sync(SyncOptions::default()).await.expect("sync failed");

        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        object3 = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");

        // Scan the whole tree, recording the sequence number observed for each of the three
        // objects.
        let layer_set = store.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        let mut sequences = [0u64; 3];
        while let Some(ItemRef { key: ObjectKey { object_id, .. }, sequence, .. }) = iter.get() {
            if *object_id == object1.object_id() {
                sequences[0] = sequence;
            } else if *object_id == object2.object_id() {
                sequences[1] = sequence;
            } else if *object_id == object3.object_id() {
                sequences[2] = sequence;
            }
            iter.advance().await.expect("advance failed");
        }

        assert!(sequences[0] <= sequences[1], "sequences: {:?}", sequences);
        // The last item came after a sync, so should be strictly greater.
        assert!(sequences[1] < sequences[2], "sequences: {:?}", sequences);
        fs.close().await.expect("Close failed");
    }
3095
    // A file whose data attribute carries fsverity metadata (plus a merkle attribute) should
    // report as a verified file when reopened.
    #[fuchsia::test]
    async fn test_verified_file_with_verified_attribute() {
        let fs: OpenFxFilesystem = test_filesystem().await;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );

        // Mark the default data attribute as verified, with internal fsverity metadata (zeroed
        // Sha256 root digest, empty salt).
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::verified_attribute(
                    0,
                    FsverityMetadata::Internal(RootDigest::Sha256([0; 32]), vec![]),
                ),
            ),
        );

        // Insert an (empty) merkle-tree attribute alongside it.
        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::attribute(0, false),
            ),
        );

        transaction.commit().await.unwrap();

        let handle =
            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
                .await
                .expect("open_object failed");

        assert!(handle.is_verified_file());

        fs.close().await.expect("Close failed");
    }
3149
    // Counterpart to the test above: a plain file with no verified attribute must not report as a
    // verified file.
    #[fuchsia::test]
    async fn test_verified_file_without_verified_attribute() {
        let fs: OpenFxFilesystem = test_filesystem().await;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );

        transaction.commit().await.unwrap();

        let handle =
            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
                .await
                .expect("open_object failed");

        assert!(!handle.is_verified_file());

        fs.close().await.expect("Close failed");
    }
3176
    // Creates an encrypted volume, remounts the filesystem from the same device, and verifies the
    // store can be found and unlocked again with an equivalent crypt instance.
    #[fuchsia::test]
    async fn test_create_and_open_store() {
        let fs = test_filesystem().await;
        let store_id = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            owner: NO_OWNER,
                            crypt: Some(Arc::new(new_insecure_crypt())),
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed")
                .store_object_id()
        };

        // Remount: close the filesystem, take back the device and reopen it read-write.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
        }
        fs.close().await.expect("Close failed");
    }
3209
    // The internal directory of an encrypted volume must keep the same object ID across a
    // remount, and the underlying object must be a directory.
    #[fuchsia::test]
    async fn test_create_and_open_internal_dir() {
        let fs = test_filesystem().await;
        let dir_id;
        let store_id;
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            owner: NO_OWNER,
                            crypt: Some(Arc::new(new_insecure_crypt())),
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            dir_id =
                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
            store_id = store.store_object_id();
        }

        // Remount the filesystem from the same device.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
            // The "get" path of get_or_create must return the previously created directory ID.
            assert_eq!(
                dir_id,
                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
            );
            let obj = store
                .tree()
                .find(&ObjectKey::object(dir_id))
                .await
                .expect("Searching tree for dir")
                .unwrap();
            assert_matches!(
                obj.value,
                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
            );
        }
        fs.close().await.expect("Close failed");
    }
3260
    // Same as test_create_and_open_internal_dir, but on an unencrypted volume (no unlock step
    // needed after remounting).
    #[fuchsia::test]
    async fn test_create_and_open_internal_dir_unencrypted() {
        let fs = test_filesystem().await;
        let dir_id;
        let store_id;
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume("test", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");
            dir_id =
                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
            store_id = store.store_object_id();
        }

        // Remount the filesystem from the same device.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            assert_eq!(
                dir_id,
                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
            );
            let obj = store
                .tree()
                .find(&ObjectKey::object(dir_id))
                .await
                .expect("Searching tree for dir")
                .unwrap();
            assert_matches!(
                obj.value,
                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
            );
        }
        fs.close().await.expect("Close failed");
    }
3301
    // After a flush, superseded layer files should be purged once nothing holds a reference to
    // them.  Holding a layer_set keeps the flush from completing; once the layer_set is dropped,
    // the old layer file should no longer be openable.
    #[fuchsia::test(threads = 10)]
    async fn test_old_layers_are_purged() {
        let fs = test_filesystem().await;

        let store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");

        store.flush().await.expect("flush failed");

        let mut buf = object.allocate_buffer(5).await;
        buf.as_mut_slice().copy_from_slice(b"hello");
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // Getting the layer-set should cause the flush to stall.
        let layer_set = store.tree().layer_set();

        let done = Mutex::new(false);
        let mut object_id = 0;

        join!(
            async {
                // This flush should block until the other branch drops layer_set.
                store.flush().await.expect("flush failed");
                assert!(*done.lock());
            },
            async {
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_secs(1)).await;
                *done.lock() = true;
                object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
                std::mem::drop(layer_set);
            }
        );

        // The old layer file should have been purged, so opening it must fail with NotFound.
        if let Err(e) = ObjectStore::open_object(
            &store.parent_store.as_ref().unwrap(),
            object_id,
            HandleOptions::default(),
            store.crypt(),
        )
        .await
        {
            assert!(FxfsError::NotFound.matches(&e));
        } else {
            panic!("open_object succeeded");
        }
    }
3358
    // Tombstoning an object with allocated extents must free its data; fsck's allocation checks
    // would fail if anything leaked.
    #[fuchsia::test]
    async fn test_tombstone_deletes_data() {
        let fs = test_filesystem().await;
        let root_store = fs.root_store();
        let child_id = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let child = ObjectStore::create_object(
                &root_store,
                &mut transaction,
                HandleOptions::default(),
                None,
            )
            .await
            .expect("create_object failed");
            transaction.commit().await.expect("commit failed");

            // Allocate an extent in the file.
            let mut buffer = child.allocate_buffer(8192).await;
            buffer.as_mut_slice().fill(0xaa);
            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

            child.object_id()
        };

        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");

        // Let fsck check allocations.
        fsck(fs.clone()).await.expect("fsck failed");
    }
3392
    // Tombstoning an object in an encrypted store must also evict its keys from the store's key
    // manager cache.
    #[fuchsia::test]
    async fn test_tombstone_purges_keys() {
        let fs = test_filesystem().await;
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .new_volume(
                "test",
                NewChildStoreOptions {
                    options: StoreOptions {
                        crypt: Some(Arc::new(new_insecure_crypt())),
                        ..StoreOptions::default()
                    },
                    ..NewChildStoreOptions::default()
                },
            )
            .await
            .expect("new_volume failed");
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let child =
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed");
        transaction.commit().await.expect("commit failed");
        // Key is cached after creation...
        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_some());
        store
            .tombstone_object(child.object_id(), Options::default())
            .await
            .expect("tombstone_object failed");
        // ...and purged after the tombstone.
        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_none());
        fs.close().await.expect("close failed");
    }
3428
    // After tombstoning, records for the object linger in the tree until a flush (major
    // compaction) discards them entirely.
    #[fuchsia::test]
    async fn test_major_compaction_discards_unnecessary_records() {
        let fs = test_filesystem().await;
        let root_store = fs.root_store();
        let child_id = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let child = ObjectStore::create_object(
                &root_store,
                &mut transaction,
                HandleOptions::default(),
                None,
            )
            .await
            .expect("create_object failed");
            transaction.commit().await.expect("commit failed");

            // Allocate an extent in the file.
            let mut buffer = child.allocate_buffer(8192).await;
            buffer.as_mut_slice().fill(0xaa);
            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

            child.object_id()
        };

        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
        {
            let layers = root_store.tree.layer_set();
            let mut merger = layers.merger();
            let iter = merger
                .query(Query::FullRange(&ObjectKey::object(child_id)))
                .await
                .expect("seek failed");
            // Find at least one object still in the tree.
            match iter.get() {
                Some(ItemRef { key: ObjectKey { object_id, .. }, .. })
                    if *object_id == child_id => {}
                _ => panic!("Objects should still be in the tree."),
            }
        }
        // The flush performs the compaction that should drop the tombstoned object's records.
        root_store.flush().await.expect("flush failed");

        // There should be no records for the object.
        let layers = root_store.tree.layer_set();
        let mut merger = layers.merger();
        let iter = merger
            .query(Query::FullRange(&ObjectKey::object(child_id)))
            .await
            .expect("seek failed");
        match iter.get() {
            None => {}
            Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
                assert_ne!(*object_id, child_id)
            }
        }
    }
3488
    // Writes that overwrite extents split across a flushed layer and the mutable layer must leave
    // consistent allocations (verified via fsck).
    #[fuchsia::test]
    async fn test_overlapping_extents_in_different_layers() {
        let fs = test_filesystem().await;
        let store = fs.root_store();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        let object = root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        let buf = object.allocate_buffer(16384).await;
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // Flush so the first extent lands in an immutable layer.
        store.flush().await.expect("flush failed");

        object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");

        // At this point, we should have an extent for 0..16384 in a layer that has been flushed,
        // and an extent for 0..4096 that partially overwrites it.  Writing to 0..16384 should
        // overwrite both of those extents.
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        fsck(fs.clone()).await.expect("fsck failed");
    }
3527
    // End-to-end test of the encrypted mutations path: data written to a locked-then-reopened
    // encrypted volume must survive journal replay, flushing while locked (which produces an
    // encrypted mutations file), subsequent unlock, and fsck.  Run over several iterations to
    // exercise varying stream cipher offsets.
    #[fuchsia::test(threads = 10)]
    async fn test_encrypted_mutations() {
        async fn one_iteration(
            fs: OpenFxFilesystem,
            crypt: Arc<dyn Crypt>,
            iteration: u64,
        ) -> OpenFxFilesystem {
            // Closes the filesystem and reopens it from the same device; volumes come back
            // locked.
            async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
                fs.close().await.expect("Close failed");
                let device = fs.take_device().await;
                device.reopen(false);
                FxFilesystem::open(device).await.expect("FS open failed")
            }

            let fs = reopen(fs).await;

            // Unlock the volume, create a uniquely-named file and fill it with a known pattern.
            let (store_object_id, object_id) = {
                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_volume
                    .volume(
                        "test",
                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    )
                    .await
                    .expect("volume failed");

                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let root_directory = Directory::open(&store, store.root_directory_object_id())
                    .await
                    .expect("open failed");
                let object = root_directory
                    .create_child_file(&mut transaction, &format!("test {}", iteration))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                let mut buf = object.allocate_buffer(1000).await;
                for i in 0..buf.len() {
                    buf.as_mut_slice()[i] = i as u8;
                }
                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

                (store.store_object_id(), object.object_id())
            };

            let fs = reopen(fs).await;

            // Unlocks the volume and verifies the file still holds the known pattern.
            let check_object = |fs: Arc<FxFilesystem>| {
                let crypt = crypt.clone();
                async move {
                    let root_volume = root_volume(fs).await.expect("root_volume failed");
                    let volume = root_volume
                        .volume(
                            "test",
                            StoreOptions { crypt: Some(crypt), ..StoreOptions::default() },
                        )
                        .await
                        .expect("volume failed");

                    let object = ObjectStore::open_object(
                        &volume,
                        object_id,
                        HandleOptions::default(),
                        None,
                    )
                    .await
                    .expect("open_object failed");
                    let mut buf = object.allocate_buffer(1000).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
                    for i in 0..buf.len() {
                        assert_eq!(buf.as_slice()[i], i as u8);
                    }
                }
            };

            check_object(fs.clone()).await;

            let fs = reopen(fs).await;

            // At this point the "test" volume is locked.  Before checking the object, flush the
            // filesystem.  This should leave a file with encrypted mutations.
            fs.object_manager().flush().await.expect("flush failed");

            assert_ne!(
                fs.object_manager()
                    .store(store_object_id)
                    .unwrap()
                    .load_store_info()
                    .await
                    .expect("load_store_info failed")
                    .encrypted_mutations_object_id,
                INVALID_OBJECT_ID
            );

            check_object(fs.clone()).await;

            // Checking the object should have triggered a flush and so now there should be no
            // encrypted mutations object.
            assert_eq!(
                fs.object_manager()
                    .store(store_object_id)
                    .unwrap()
                    .load_store_info()
                    .await
                    .expect("load_store_info failed")
                    .encrypted_mutations_object_id,
                INVALID_OBJECT_ID
            );

            let fs = reopen(fs).await;

            fsck(fs.clone()).await.expect("fsck failed");

            let fs = reopen(fs).await;

            check_object(fs.clone()).await;

            fs
        }

        let mut fs = test_filesystem().await;
        let crypt = Arc::new(new_insecure_crypt());

        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let _store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
        }

        // Run a few iterations so that we test changes with the stream cipher offset.
        for i in 0..5 {
            fs = one_iteration(fs, crypt.clone(), i).await;
        }
    }
3683
    // Verifies that when the low 32 bits of the object ID space are exhausted, the
    // object ID cipher is rolled (a new key is minted and the counter moves to the
    // next high-32-bit band), and that the rolled key survives a remount whether or
    // not a compaction happened before closing.
    #[test_case(true; "with a flush")]
    #[test_case(false; "without a flush")]
    #[fuchsia::test(threads = 10)]
    async fn test_object_id_cipher_roll(with_flush: bool) {
        let fs = test_filesystem().await;
        let crypt = Arc::new(new_insecure_crypt());

        // This block runs against the first mount and yields the object ID key that
        // should be persisted across remount.
        let expected_key = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");

            // Create some files so that our in-memory copy of StoreInfo has changes (the object
            // count) pending a flush.
            let root_dir_id = store.root_directory_object_id();
            let root_dir =
                Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            for i in 0..10 {
                root_dir.create_child_file(&mut transaction, &format!("file {i}")).await.unwrap();
            }
            transaction.commit().await.expect("commit failed");

            let orig_store_info = store.store_info().unwrap();

            // Hack the last object ID to force a roll of the object ID cipher.
            {
                let mut last_object_id = store.last_object_id.lock();
                match &mut *last_object_id {
                    LastObjectId::Encrypted { id, .. } => {
                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
                        *id |= 0xffffffff;
                    }
                    _ => unreachable!(),
                }
            }

            // Allocating the next object ID should trigger the roll.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let object = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            // The new object lands in the next high-32-bit band.
            assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 1u64 << 32);

            // Check that the key has been changed.
            let key = match (
                store.store_info().unwrap().last_object_id,
                orig_store_info.last_object_id,
            ) {
                (
                    LastObjectIdInfo::Encrypted { key, id },
                    LastObjectIdInfo::Encrypted { key: orig_key, .. },
                ) => {
                    assert_ne!(key, orig_key);
                    assert_eq!(id, 1u64 << 32);
                    key
                }
                _ => unreachable!(),
            };

            // Optionally compact so the rolled key is written via flush rather than
            // only replayed from the journal.
            if with_flush {
                fs.journal().force_compact().await.unwrap();
            }

            let last_object_id = store.last_object_id.lock();
            assert_eq!(last_object_id.id(), 1u64 << 32);
            key
        };

        // Remount and verify the rolled key and counter were persisted.
        fs.close().await.expect("Close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .volume("test", StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() })
            .await
            .expect("volume failed");

        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Encrypted { key, .. } if key == expected_key);
        assert_eq!(store.last_object_id.lock().id(), 1u64 << 32);

        fsck(fs.clone()).await.expect("fsck failed");
        fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
    }
3802
3803    #[fuchsia::test(threads = 2)]
3804    async fn test_race_object_id_cipher_roll_and_flush() {
3805        let fs = test_filesystem().await;
3806        let crypt = Arc::new(new_insecure_crypt());
3807
3808        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3809        let store = root_volume
3810            .new_volume(
3811                "test",
3812                NewChildStoreOptions {
3813                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3814                    ..Default::default()
3815                },
3816            )
3817            .await
3818            .expect("new_volume failed");
3819
3820        assert!(matches!(&*store.last_object_id.lock(), LastObjectId::Encrypted { .. }));
3821
3822        // Create some files so that our in-memory copy of StoreInfo has changes (the object
3823        // count) pending a flush.
3824        let root_dir_id = store.root_directory_object_id();
3825        let root_dir = Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3826
3827        let _executor_tasks = testing::force_executor_threads_to_run(2).await;
3828
3829        for j in 0..100 {
3830            let mut transaction = fs
3831                .clone()
3832                .new_transaction(
3833                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
3834                    Options::default(),
3835                )
3836                .await
3837                .expect("new_transaction failed");
3838            root_dir.create_child_file(&mut transaction, &format!("file {j}")).await.unwrap();
3839            transaction.commit().await.expect("commit failed");
3840
3841            let task = {
3842                let fs = fs.clone();
3843                fasync::Task::spawn(async move {
3844                    fs.journal().force_compact().await.unwrap();
3845                })
3846            };
3847
3848            // Hack the last object ID to force a roll of the object ID cipher.
3849            {
3850                let mut last_object_id = store.last_object_id.lock();
3851                let LastObjectId::Encrypted { id, .. } = &mut *last_object_id else {
3852                    unreachable!()
3853                };
3854                assert_eq!(*id >> 32, j);
3855                *id |= 0xffffffff;
3856            }
3857
3858            let mut transaction = fs
3859                .clone()
3860                .new_transaction(
3861                    lock_keys![LockKey::object(
3862                        store.store_object_id(),
3863                        store.root_directory_object_id()
3864                    )],
3865                    Options::default(),
3866                )
3867                .await
3868                .expect("new_transaction failed");
3869            let root_directory = Directory::open(&store, store.root_directory_object_id())
3870                .await
3871                .expect("open failed");
3872            root_directory
3873                .create_child_file(&mut transaction, "test {j}")
3874                .await
3875                .expect("create_child_file failed");
3876            transaction.commit().await.expect("commit failed");
3877
3878            task.await;
3879
3880            // Check that the key has been changed.
3881            let new_store_info = store.load_store_info().await.unwrap();
3882
3883            let LastObjectIdInfo::Encrypted { id, key } = new_store_info.last_object_id else {
3884                unreachable!()
3885            };
3886            assert_eq!(id >> 32, j + 1);
3887            let LastObjectIdInfo::Encrypted { key: in_memory_key, .. } =
3888                store.store_info().unwrap().last_object_id
3889            else {
3890                unreachable!()
3891            };
3892            assert_eq!(key, in_memory_key);
3893        }
3894
3895        fs.close().await.expect("Close failed");
3896    }
3897
3898    #[fuchsia::test]
3899    async fn test_object_id_no_roll_for_unencrypted_store() {
3900        let fs = test_filesystem().await;
3901
3902        {
3903            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3904            let store = root_volume
3905                .new_volume("test", NewChildStoreOptions::default())
3906                .await
3907                .expect("new_volume failed");
3908
3909            // Hack the last object ID.
3910            {
3911                let mut last_object_id = store.last_object_id.lock();
3912                match &mut *last_object_id {
3913                    LastObjectId::Unencrypted { id } => {
3914                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
3915                        *id |= 0xffffffff;
3916                    }
3917                    _ => unreachable!(),
3918                }
3919            }
3920
3921            let mut transaction = fs
3922                .clone()
3923                .new_transaction(
3924                    lock_keys![LockKey::object(
3925                        store.store_object_id(),
3926                        store.root_directory_object_id()
3927                    )],
3928                    Options::default(),
3929                )
3930                .await
3931                .expect("new_transaction failed");
3932            let root_directory = Directory::open(&store, store.root_directory_object_id())
3933                .await
3934                .expect("open failed");
3935            let object = root_directory
3936                .create_child_file(&mut transaction, "test")
3937                .await
3938                .expect("create_child_file failed");
3939            transaction.commit().await.expect("commit failed");
3940
3941            assert_eq!(object.object_id(), 0x1_0000_0000);
3942
3943            // Check that there is still no key.
3944            assert_matches!(
3945                store.store_info().unwrap().last_object_id,
3946                LastObjectIdInfo::Unencrypted { .. }
3947            );
3948
3949            assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
3950        };
3951
3952        fs.close().await.expect("Close failed");
3953        let device = fs.take_device().await;
3954        device.reopen(false);
3955        let fs = FxFilesystem::open(device).await.expect("open failed");
3956        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3957        let store =
3958            root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
3959
3960        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
3961    }
3962
3963    #[fuchsia::test]
3964    fn test_object_id_is_not_invalid_object_id() {
3965        let key = UnwrappedKey::new(vec![0; FXFS_KEY_SIZE]);
3966        // 1106634048 results in INVALID_OBJECT_ID with this key.
3967        let mut last_object_id =
3968            LastObjectId::Encrypted { id: 1106634047, cipher: Box::new(Ff1::new(&key)) };
3969        assert!(last_object_id.try_get_next().is_some());
3970        assert!(last_object_id.try_get_next().is_some());
3971    }
3972
3973    #[fuchsia::test]
3974    async fn test_last_object_id_is_correct_after_unlock() {
3975        let fs = test_filesystem().await;
3976        let crypt = Arc::new(new_insecure_crypt());
3977
3978        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3979        let store = root_volume
3980            .new_volume(
3981                "test",
3982                NewChildStoreOptions {
3983                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3984                    ..Default::default()
3985                },
3986            )
3987            .await
3988            .expect("new_volume failed");
3989
3990        let mut transaction = fs
3991            .clone()
3992            .new_transaction(
3993                lock_keys![LockKey::object(
3994                    store.store_object_id(),
3995                    store.root_directory_object_id()
3996                )],
3997                Options::default(),
3998            )
3999            .await
4000            .expect("new_transaction failed");
4001        let root_directory =
4002            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4003        root_directory
4004            .create_child_file(&mut transaction, "test")
4005            .await
4006            .expect("create_child_file failed");
4007        transaction.commit().await.expect("commit failed");
4008
4009        // Compact so that StoreInfo is written.
4010        fs.journal().force_compact().await.unwrap();
4011
4012        let last_object_id = store.last_object_id.lock().id();
4013
4014        store.lock().await.unwrap();
4015        store.unlock(NO_OWNER, crypt.clone()).await.unwrap();
4016
4017        assert_eq!(store.last_object_id.lock().id(), last_object_id);
4018    }
4019
4020    #[fuchsia::test(threads = 20)]
4021    async fn test_race_when_rolling_last_object_id_cipher() {
4022        // NOTE: This test is trying to test a race, so if it fails, it might be flaky.
4023
4024        const NUM_THREADS: usize = 20;
4025
4026        let fs = test_filesystem().await;
4027        let crypt = Arc::new(new_insecure_crypt());
4028
4029        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4030        let store = root_volume
4031            .new_volume(
4032                "test",
4033                NewChildStoreOptions {
4034                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4035                    ..Default::default()
4036                },
4037            )
4038            .await
4039            .expect("new_volume failed");
4040
4041        let store_id = store.store_object_id();
4042        let root_dir_id = store.root_directory_object_id();
4043
4044        let root_directory =
4045            Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
4046
4047        // Create directories.
4048        let mut directories = Vec::new();
4049        for _ in 0..NUM_THREADS {
4050            let mut transaction = fs
4051                .clone()
4052                .new_transaction(
4053                    lock_keys![LockKey::object(store_id, root_dir_id,)],
4054                    Options::default(),
4055                )
4056                .await
4057                .expect("new_transaction failed");
4058            directories.push(
4059                root_directory
4060                    .create_child_dir(&mut transaction, "test")
4061                    .await
4062                    .expect("create_child_file failed"),
4063            );
4064            transaction.commit().await.expect("commit failed");
4065        }
4066
4067        // Hack the last object ID so that the next ID will require a roll.
4068        match &mut *store.last_object_id.lock() {
4069            LastObjectId::Encrypted { id, .. } => *id |= 0xffff_ffff,
4070            _ => unreachable!(),
4071        }
4072
4073        let scope = fasync::Scope::new();
4074
4075        let _executor_tasks = testing::force_executor_threads_to_run(NUM_THREADS).await;
4076
4077        for dir in directories {
4078            let fs = fs.clone();
4079            scope.spawn(async move {
4080                let mut transaction = fs
4081                    .clone()
4082                    .new_transaction(
4083                        lock_keys![LockKey::object(store_id, dir.object_id(),)],
4084                        Options::default(),
4085                    )
4086                    .await
4087                    .expect("new_transaction failed");
4088                dir.create_child_file(&mut transaction, "test")
4089                    .await
4090                    .expect("create_child_file failed");
4091                transaction.commit().await.expect("commit failed");
4092            });
4093        }
4094
4095        scope.on_no_tasks().await;
4096
4097        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000 + NUM_THREADS as u64 - 1);
4098    }
4099
4100    #[fuchsia::test(threads = 10)]
4101    async fn test_lock_store() {
4102        let fs = test_filesystem().await;
4103        let crypt = Arc::new(new_insecure_crypt());
4104
4105        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4106        let store = root_volume
4107            .new_volume(
4108                "test",
4109                NewChildStoreOptions {
4110                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4111                    ..NewChildStoreOptions::default()
4112                },
4113            )
4114            .await
4115            .expect("new_volume failed");
4116        let mut transaction = fs
4117            .clone()
4118            .new_transaction(
4119                lock_keys![LockKey::object(
4120                    store.store_object_id(),
4121                    store.root_directory_object_id()
4122                )],
4123                Options::default(),
4124            )
4125            .await
4126            .expect("new_transaction failed");
4127        let root_directory =
4128            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4129        root_directory
4130            .create_child_file(&mut transaction, "test")
4131            .await
4132            .expect("create_child_file failed");
4133        transaction.commit().await.expect("commit failed");
4134        store.lock().await.expect("lock failed");
4135
4136        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
4137        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4138    }
4139
4140    #[fuchsia::test(threads = 10)]
4141    async fn test_unlock_read_only() {
4142        let fs = test_filesystem().await;
4143        let crypt = Arc::new(new_insecure_crypt());
4144
4145        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4146        let store = root_volume
4147            .new_volume(
4148                "test",
4149                NewChildStoreOptions {
4150                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4151                    ..NewChildStoreOptions::default()
4152                },
4153            )
4154            .await
4155            .expect("new_volume failed");
4156        let mut transaction = fs
4157            .clone()
4158            .new_transaction(
4159                lock_keys![LockKey::object(
4160                    store.store_object_id(),
4161                    store.root_directory_object_id()
4162                )],
4163                Options::default(),
4164            )
4165            .await
4166            .expect("new_transaction failed");
4167        let root_directory =
4168            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4169        root_directory
4170            .create_child_file(&mut transaction, "test")
4171            .await
4172            .expect("create_child_file failed");
4173        transaction.commit().await.expect("commit failed");
4174        store.lock().await.expect("lock failed");
4175
4176        store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
4177        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4178        store.lock_read_only();
4179        store.unlock_read_only(crypt).await.expect("unlock failed");
4180        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4181    }
4182
    // Verifies that the mutations key is rolled on every unlock (cipher offset
    // resets to 0) and that the volume keeps remounting cleanly even after many
    // remount cycles have accumulated many mutations keys.
    #[fuchsia::test(threads = 10)]
    async fn test_key_rolled_when_unlocked() {
        let fs = test_filesystem().await;
        let crypt = Arc::new(new_insecure_crypt());

        let object_id;
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            // Remember the file's ID so it can be reopened after each remount below.
            object_id = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed")
                .object_id();
            transaction.commit().await.expect("commit failed");
        }

        fs.close().await.expect("Close failed");
        let mut device = fs.take_device().await;

        // Repeatedly remount so that we can be sure that we can remount when there are many
        // mutations keys.
        for _ in 0..100 {
            device.reopen(false);
            let fs = FxFilesystem::open(device).await.expect("open failed");
            {
                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_volume
                    .volume(
                        "test",
                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    )
                    .await
                    .expect("open_volume failed");

                // The key should get rolled every time we unlock.
                assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);

                // Make sure there's an encrypted mutation.
                let handle =
                    ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
                        .await
                        .expect("open_object failed");
                let buffer = handle.allocate_buffer(100).await;
                handle
                    .write_or_append(Some(0), buffer.as_ref())
                    .await
                    .expect("write_or_append failed");
            }
            // Close and recover the device so the next iteration can reopen it.
            fs.close().await.expect("Close failed");
            device = fs.take_device().await;
        }
    }
4262
4263    #[test]
4264    fn test_store_info_max_serialized_size() {
4265        let info = StoreInfo {
4266            guid: [0xff; 16],
4267            last_object_id: LastObjectIdInfo::Encrypted {
4268                id: 0x1234567812345678,
4269                key: FxfsKey {
4270                    wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4271                    key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4272                },
4273            },
4274            // Worst case, each layer should be 3/4 the size of the layer below it (because of the
4275            // compaction policy we're using).  If the smallest layer is 8,192 bytes, then 120
4276            // layers would take up a size that exceeds a 64 bit unsigned integer, so if this fits,
4277            // any size should fit.
4278            layers: vec![0x1234567812345678; 120],
4279            root_directory_object_id: 0x1234567812345678,
4280            graveyard_directory_object_id: 0x1234567812345678,
4281            object_count: 0x1234567812345678,
4282            mutations_key: Some(FxfsKey {
4283                wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4284                key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4285            }),
4286            mutations_cipher_offset: 0x1234567812345678,
4287            encrypted_mutations_object_id: 0x1234567812345678,
4288            internal_directory_object_id: INVALID_OBJECT_ID,
4289        };
4290        let mut serialized_info = Vec::new();
4291        info.serialize_with_version(&mut serialized_info).unwrap();
4292        assert!(
4293            serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
4294            "{}",
4295            serialized_info.len()
4296        );
4297    }
4298
4299    async fn reopen_after_crypt_failure_inner(read_only: bool) {
4300        let fs = test_filesystem().await;
4301        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4302
4303        let store = {
4304            let crypt = Arc::new(new_insecure_crypt());
4305            let store = root_volume
4306                .new_volume(
4307                    "vol",
4308                    NewChildStoreOptions {
4309                        options: StoreOptions {
4310                            crypt: Some(crypt.clone()),
4311                            ..StoreOptions::default()
4312                        },
4313                        ..Default::default()
4314                    },
4315                )
4316                .await
4317                .expect("new_volume failed");
4318            let root_directory = Directory::open(&store, store.root_directory_object_id())
4319                .await
4320                .expect("open failed");
4321            let mut transaction = fs
4322                .clone()
4323                .new_transaction(
4324                    lock_keys![LockKey::object(
4325                        store.store_object_id(),
4326                        root_directory.object_id()
4327                    )],
4328                    Options::default(),
4329                )
4330                .await
4331                .expect("new_transaction failed");
4332            root_directory
4333                .create_child_file(&mut transaction, "test")
4334                .await
4335                .expect("create_child_file failed");
4336            transaction.commit().await.expect("commit failed");
4337
4338            crypt.shutdown();
4339            let mut transaction = fs
4340                .clone()
4341                .new_transaction(
4342                    lock_keys![LockKey::object(
4343                        store.store_object_id(),
4344                        root_directory.object_id()
4345                    )],
4346                    Options::default(),
4347                )
4348                .await
4349                .expect("new_transaction failed");
4350            root_directory
4351                .create_child_file(&mut transaction, "test2")
4352                .await
4353                .map(|_| ())
4354                .expect_err("create_child_file should fail");
4355            store.lock().await.expect("lock failed");
4356            store
4357        };
4358
4359        let crypt = Arc::new(new_insecure_crypt());
4360        if read_only {
4361            store.unlock_read_only(crypt).await.expect("unlock failed");
4362        } else {
4363            store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
4364        }
4365        let root_directory =
4366            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4367        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4368    }
4369
4370    #[fuchsia::test(threads = 10)]
4371    async fn test_reopen_after_crypt_failure() {
4372        reopen_after_crypt_failure_inner(false).await;
4373    }
4374
4375    #[fuchsia::test(threads = 10)]
4376    async fn test_reopen_read_only_after_crypt_failure() {
4377        reopen_after_crypt_failure_inner(true).await;
4378    }
4379
    // In debug builds, committing a transaction whose accumulated mutations exceed its
    // reservation is expected to panic with "Insufficient reservation space" (see the
    // `should_panic` attribute) rather than silently overrunning the reservation.
    #[fuchsia::test(threads = 10)]
    #[should_panic(expected = "Insufficient reservation space")]
    #[cfg(debug_assertions)]
    async fn large_transaction_causes_panic_in_debug_builds() {
        let fs = test_filesystem().await;
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .new_volume("vol", NewChildStoreOptions::default())
            .await
            .expect("new_volume failed");
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
                Options::default(),
            )
            .await
            .expect("transaction");
        // Pile 500 symlink creations into a single transaction so its mutations outgrow the
        // reservation before commit.
        for i in 0..500 {
            root_directory
                .create_symlink(&mut transaction, b"link", &format!("{}", i))
                .await
                .expect("symlink");
        }
        // Expected to panic during commit; the assertion itself should never be reached.
        assert_eq!(transaction.commit().await.expect("commit"), 0);
    }
4408
    // Verifies that a crypt failure on one store does not wedge journal compaction: compaction
    // still succeeds, the failing store is simply locked, and the mutations for that store —
    // which were never flushed — are still recovered from the journal on remount.
    #[fuchsia::test]
    async fn test_crypt_failure_does_not_fuse_journal() {
        let fs = test_filesystem().await;

        // An owner whose only response to `force_lock` is to lock the store.
        struct Owner;
        #[async_trait]
        impl StoreOwner for Owner {
            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
                store.lock().await
            }
        }
        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;

        {
            // Create two stores and a record for each store, so the journal will need to flush them
            // both later.
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store1 = root_volume
                .new_volume(
                    "vol1",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(Arc::new(new_insecure_crypt())),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            // Keep a handle to store2's crypt so we can shut it down below; store2 also gets the
            // Owner above so the filesystem can force-lock it when crypt fails.
            let crypt = Arc::new(new_insecure_crypt());
            let store2 = root_volume
                .new_volume(
                    "vol2",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            owner: Arc::downgrade(&owner),
                            crypt: Some(crypt.clone()),
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            // Commit one record ("test") in each store.
            for store in [&store1, &store2] {
                let root_directory = Directory::open(store, store.root_directory_object_id())
                    .await
                    .expect("open failed");
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            root_directory.object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, "test")
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
            // Shut down the crypt instance for store2, and then compact.  Compaction should not
            // fail, and the store should become locked.
            crypt.shutdown();
            fs.journal().force_compact().await.expect("compact failed");
            // The store should now be locked.
            assert!(store2.is_locked());
        }

        // Even though the store wasn't flushed, the mutation to store2 will still be valid as it is
        // held in the journal.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

        // Both volumes must still contain their "test" entry after replay.
        for volume_name in ["vol1", "vol2"] {
            let store = root_volume
                .volume(
                    volume_name,
                    StoreOptions {
                        crypt: Some(Arc::new(new_insecure_crypt())),
                        ..StoreOptions::default()
                    },
                )
                .await
                .expect("open volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
        }

        fs.close().await.expect("close failed");
    }
4509
    // Races a volume unlock against its crypt instance shutting down, then verifies the
    // filesystem still remounts cleanly afterwards and the previously-created file is intact —
    // i.e. the race can make the unlock fail, but must not corrupt anything.
    #[fuchsia::test]
    async fn test_crypt_failure_during_unlock_race() {
        let fs = test_filesystem().await;

        // An owner whose only response to `force_lock` is to lock the store.
        struct Owner;
        #[async_trait]
        impl StoreOwner for Owner {
            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
                store.lock().await
            }
        }
        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;

        // Set up: create "vol" containing a single file, remembering the store's object ID so the
        // racing task below can target its flush lock.
        let store_object_id = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "vol",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            owner: Arc::downgrade(&owner),
                            crypt: Some(Arc::new(new_insecure_crypt())),
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            store.store_object_id()
        };

        // Remount so the race starts from a freshly-opened filesystem with "vol" locked.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        {
            let fs_clone = fs.clone();
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let crypt = Arc::new(new_insecure_crypt());
            let crypt_clone = crypt.clone();
            // Run the unlock and the crypt shutdown concurrently.
            join!(
                async move {
                    // Unlock might fail, so ignore errors.
                    let _ = root_volume
                        .volume(
                            "vol",
                            StoreOptions {
                                owner: Arc::downgrade(&owner),
                                crypt: Some(crypt_clone),
                            },
                        )
                        .await;
                },
                async move {
                    // Block until the unlock has finished, but before the flush triggered by the
                    // unlock has finished, to maximize the chances of weirdness.
                    let keys = lock_keys![LockKey::flush(store_object_id)];
                    let _ = fs_clone.lock_manager().write_lock(keys).await;
                    crypt.shutdown();
                }
            );
        }

        // Remount once more and confirm the volume unlocks normally and the file survived.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .volume(
                "vol",
                StoreOptions {
                    crypt: Some(Arc::new(new_insecure_crypt())),
                    ..StoreOptions::default()
                },
            )
            .await
            .expect("open volume failed");
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());

        fs.close().await.expect("close failed");
    }
4616
4617    #[fuchsia::test]
4618    async fn test_low_32_bit_object_ids() {
4619        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
4620        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
4621
4622        {
4623            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4624
4625            let store = root_vol
4626                .new_volume(
4627                    "test",
4628                    NewChildStoreOptions { low_32_bit_object_ids: true, ..Default::default() },
4629                )
4630                .await
4631                .expect("new_volume failed");
4632
4633            let root_dir = Directory::open(&store, store.root_directory_object_id())
4634                .await
4635                .expect("open failed");
4636
4637            let mut ids = std::collections::HashSet::new();
4638
4639            for i in 0..100 {
4640                let mut transaction = fs
4641                    .clone()
4642                    .new_transaction(
4643                        lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4644                        Options::default(),
4645                    )
4646                    .await
4647                    .expect("new_transaction failed");
4648
4649                for j in 0..100 {
4650                    let object = root_dir
4651                        .create_child_dir(&mut transaction, &format!("{i}.{j}"))
4652                        .await
4653                        .expect("create_child_file failed");
4654
4655                    assert!(object.object_id() < 1 << 32);
4656                    assert_ne!(object.object_id(), INVALID_OBJECT_ID);
4657                    assert!(ids.insert(object.object_id()));
4658                }
4659
4660                transaction.commit().await.expect("commit failed");
4661            }
4662
4663            assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4664
4665            fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
4666        }
4667
4668        // Verify persistence
4669        fs.close().await.expect("Close failed");
4670        let device = fs.take_device().await;
4671        device.reopen(false);
4672        let fs = FxFilesystem::open(device).await.expect("open failed");
4673        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4674        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");
4675
4676        // Check that we can still create files and they have low 32-bit IDs.
4677        let root_dir =
4678            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4679        let mut transaction = fs
4680            .clone()
4681            .new_transaction(
4682                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4683                Options::default(),
4684            )
4685            .await
4686            .expect("new_transaction failed");
4687
4688        let object = root_dir
4689            .create_child_file(&mut transaction, "persistence_check")
4690            .await
4691            .expect("create_child_file failed");
4692        assert!(object.object_id() < 1 << 32);
4693
4694        transaction.commit().await.expect("commit failed");
4695
4696        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4697    }
4698}