Skip to main content

fxfs/
object_store.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod allocator;
6pub mod caching_object_handle;
7pub mod data_object_handle;
8pub mod directory;
9pub mod extent;
10mod extent_mapping_iterator;
11mod extent_record;
12mod flush;
13pub mod graveyard;
14mod install;
15pub mod journal;
16mod key_manager;
17pub(crate) mod merge;
18pub mod object_manager;
19pub mod object_record;
20pub mod project_id;
21mod store_object_handle;
22pub mod transaction;
23mod tree;
24mod tree_cache;
25pub mod volume;
26
27pub use data_object_handle::{
28    DataObjectHandle, DirectWriter, FileExtent, FsverityState, FsverityStateInner, RangeType,
29};
30pub use directory::Directory;
31pub use object_record::{ChildValue, DirType, ObjectDescriptor, PosixAttributes, Timestamp};
32pub use store_object_handle::{SetExtendedAttributeMode, StoreObjectHandle};
33
34use crate::errors::FxfsError;
35use crate::filesystem::{
36    ApplyContext, ApplyMode, FxFilesystem, JournalingObject, MAX_FILE_SIZE, SyncOptions,
37    TruncateGuard,
38};
39use crate::log::*;
40use crate::lsm_tree::cache::{NullCache, ObjectCache};
41use crate::lsm_tree::types::{Existence, Item, ItemRef, LayerIterator};
42use crate::lsm_tree::{LSMTree, Query};
43use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ObjectProperties, ReadObjectHandle};
44use crate::object_store::allocator::Allocator;
45use crate::object_store::graveyard::Graveyard;
46use crate::object_store::journal::{JournalCheckpoint, JournalCheckpointV32, JournaledTransaction};
47use crate::object_store::key_manager::KeyManager;
48use crate::object_store::transaction::{
49    AssocObj, AssociatedObject, LockKey, LockKeys, ObjectStoreMutation, Operation, Options,
50    Transaction, WriteGuard, lock_keys,
51};
52use crate::range::RangeExt;
53use crate::round::round_up;
54use crate::serialized_types::{Version, Versioned, VersionedLatest};
55use anyhow::{Context, Error, anyhow, bail, ensure};
56use async_trait::async_trait;
57use fidl_fuchsia_io as fio;
58use fprint::TypeFingerprint;
59use fuchsia_sync::Mutex;
60use fxfs_crypto::ff1::Ff1;
61use fxfs_crypto::{
62    CipherHolder, Crypt, KeyPurpose, ObjectType, StreamCipher, UnwrappedKey, WrappingKeyId,
63    key_to_cipher,
64};
65use fxfs_macros::{Migrate, migrate_to_version};
66use rand::RngCore;
67use scopeguard::ScopeGuard;
68use serde::{Deserialize, Serialize};
69use std::collections::HashSet;
70use std::fmt;
71use std::num::NonZero;
72use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
73use std::sync::{Arc, OnceLock, Weak};
74use storage_device::Device;
75use uuid::Uuid;
76
77pub use extent::Extent;
78pub use extent_record::{ExtentMode, ExtentValue};
79pub use object_record::{
80    AttributeId, AttributeKey, EncryptionKey, EncryptionKeys, ExtendedAttributeValue,
81    FsverityMetadata, FxfsKey, FxfsKeyV40, FxfsKeyV49, ObjectAttributes, ObjectKey, ObjectKeyData,
82    ObjectKind, ObjectValue, ProjectProperty, RootDigest,
83};
84pub use project_id::{ProjectId, ProjectIdExt};
85pub use transaction::Mutation;
86
// For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel
// attacks more difficult. This mask can be used to extract the high 32 bits of the object ID.
const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000;

// At time of writing, this threshold limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;

// The number of keys we want to pre-cache for flushes.  We cache up to 2 keys.  One is for the
// current flush, and one is pre-cached for the next flush so that we don't need to call the crypt
// service during a flush.  To understand why, consider two threads T1 and T2.  Just prior to
// committing a transaction, T1 ensures there are two keys.  Then, before T1 has committed the
// transaction, T2 flushes and consumes one of the keys.  T1 then commits the transaction. The next
// time a flush occurs, there's a key ready. If T1 had only ensured there was one key, there'd be no
// key.  This must therefore be 2 (not 1): with a limit of 1 the race above leaves the next flush
// without a pre-cached key.
const CACHED_KEYS_LIMIT: usize = 2;

// Encrypted files and directories use the fscrypt key (identified by `FSCRYPT_KEY_ID`) to encrypt
// file contents and filenames respectively. All non-fscrypt encrypted files otherwise default to
// using the `VOLUME_DATA_KEY_ID` key. Note, the filesystem always uses the `VOLUME_DATA_KEY_ID`
// key to encrypt large extended attributes. Thus, encrypted files and directories with large
// xattrs will have both an fscrypt and volume data key.
pub const VOLUME_DATA_KEY_ID: u64 = 0;
pub const FSCRYPT_KEY_ID: u64 = 1;
110
111/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
112/// back to an ObjectStore.
113pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}
114
/// StoreInfo stores information about the object store.  This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
///
/// This alias names the latest version.  Older versions (`StoreInfoV49`, `StoreInfoV40`) are kept
/// so they can be migrated forward.
pub type StoreInfo = StoreInfoV52;

/// Version 52 of the per-store information.
///
/// NOTE(review): this type is serde-serialized and fingerprinted via `TypeFingerprint`;
/// presumably adding, removing or reordering fields changes the persisted format and needs a new
/// version -- confirm before editing.
#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV52 {
    /// The globally unique identifier for the associated object store. If unset, will be all zero.
    guid: [u8; 16],

    /// The last used object ID, together with how object IDs are generated (see
    /// `LastObjectIdInfo`).  Note that this field is not accurate in memory; ObjectStore's
    /// last_object_id field is the one to use in that case.  Technically, this might not be the
    /// last object ID used for the latest transaction that created an object because we use this at
    /// the point of creating the object but before we commit the transaction.  Transactions can
    /// then get committed in an arbitrary order (or not at all).
    last_object_id: LastObjectIdInfo,

    /// Object ids for layers.  TODO(https://fxbug.dev/42178036): need a layer of indirection here
    /// so we can support snapshots.
    pub layers: Vec<u64>,

    /// The object ID for the root directory.
    root_directory_object_id: u64,

    /// The object ID for the graveyard.
    graveyard_directory_object_id: u64,

    /// The number of live objects in the store.  This should *not* be trusted; it can be invalid
    /// due to filesystem inconsistencies.
    object_count: u64,

    /// The (wrapped) key that encrypted mutations should use.
    mutations_key: Option<FxfsKeyV49>,

    /// Mutations for the store are encrypted using a stream cipher.  To decrypt the mutations, we
    /// need to know the offset in the cipher stream to start it.
    mutations_cipher_offset: u64,

    /// If we have to flush the store whilst we do not have the key, we need to write the encrypted
    /// mutations to an object. This is the object ID of that file if it exists, or
    /// INVALID_OBJECT_ID otherwise (`parent_objects` skips it in that case).
    pub encrypted_mutations_object_id: u64,

    /// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
    /// when the directory doesn't yet exist.
    internal_directory_object_id: u64,
}
160
/// Persisted record of the last object ID and the scheme used to generate object IDs for a store.
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint)]
enum LastObjectIdInfo {
    /// Object IDs are not encrypted; `id` is the last object ID.
    Unencrypted {
        id: u64,
    },
    Encrypted {
        /// The *unencrypted* value of the last object ID.
        id: u64,

        /// Object IDs are encrypted to reduce the amount of information that sequential object IDs
        /// reveal (such as the number of files in the system and the ordering of their creation in
        /// time).  Only the bottom 32 bits of the object ID are encrypted whilst the top 32 bits
        /// will increment after 2^32 object IDs have been used and this allows us to roll the key.
        key: FxfsKeyV49,
    },
    /// Object IDs are restricted to 32 bits; no last ID is recorded for this scheme.
    /// NOTE(review): presumably selected via `NewChildStoreOptions::low_32_bit_object_ids`.
    Low32Bit,
}
178
179impl Default for LastObjectIdInfo {
180    fn default() -> Self {
181        LastObjectIdInfo::Unencrypted { id: 0 }
182    }
183}
184
/// Version 49 of the per-store information, kept so it can be upgraded to `StoreInfoV52`.
///
/// V52 replaces the separate `last_object_id` / `object_id_key` pair with a single
/// `LastObjectIdInfo`.
#[derive(Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV49 {
    guid: [u8; 16],
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV49>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    // If set, object IDs are encrypted with this key (becomes `LastObjectIdInfo::Encrypted`).
    object_id_key: Option<FxfsKeyV49>,
    internal_directory_object_id: u64,
}
199
200impl From<StoreInfoV49> for StoreInfoV52 {
201    fn from(value: StoreInfoV49) -> Self {
202        Self {
203            guid: value.guid,
204            last_object_id: if let Some(key) = value.object_id_key {
205                LastObjectIdInfo::Encrypted { id: value.last_object_id, key: key }
206            } else {
207                LastObjectIdInfo::Unencrypted { id: value.last_object_id }
208            },
209            layers: value.layers,
210            root_directory_object_id: value.root_directory_object_id,
211            graveyard_directory_object_id: value.graveyard_directory_object_id,
212            object_count: value.object_count,
213            mutations_key: value.mutations_key,
214            mutations_cipher_offset: value.mutations_cipher_offset,
215            encrypted_mutations_object_id: value.encrypted_mutations_object_id,
216            internal_directory_object_id: value.internal_directory_object_id,
217        }
218    }
219}
220
/// Version 40 of the per-store information.  It differs from `StoreInfoV49` only in using
/// `FxfsKeyV40` for its keys and is migrated forward via the `Migrate` derive.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV49)]
pub struct StoreInfoV40 {
    guid: [u8; 16],
    last_object_id: u64,
    layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV40>,
    mutations_cipher_offset: u64,
    encrypted_mutations_object_id: u64,
    object_id_key: Option<FxfsKeyV40>,
    internal_directory_object_id: u64,
}
236
237impl StoreInfo {
238    /// Returns the parent objects for this store.
239    pub fn parent_objects(&self) -> Vec<u64> {
240        // We should not include the ID of the store itself, since that should be referred to in the
241        // volume directory.
242        let mut objects = self.layers.to_vec();
243        if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
244            objects.push(self.encrypted_mutations_object_id);
245        }
246        objects
247    }
248}
249
// Upper bound, in bytes, on the serialized size of a store's `StoreInfo` (128 KiB).
// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit
// isn't exceeded.  It will likely involve placing limits on the maximum number of layers.
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 128 * 1024;
253
254// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
255// in the journal but hasn't yet been written to layer files) for a store.  We set a limit because
256// we want to limit the amount of memory use in the case the filesystem is corrupt or under attack.
257pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;
258
/// Options that change how an object handle behaves.  All options default to `false`.
#[derive(Default)]
pub struct HandleOptions {
    /// If true, transactions used by this handle will skip journal space checks.
    pub skip_journal_checks: bool,
    /// If true, data written to any attribute of this handle will not have per-block checksums
    /// computed.
    pub skip_checksums: bool,
    /// If true, any files using fsverity will not attempt to perform any verification. This is
    /// useful to open an object without the correct encryption keys to look at the metadata.
    pub skip_fsverity: bool,
}
270
/// Parameters for encrypting a newly created object.
pub struct ObjectEncryptionOptions {
    /// If set, the keys are treated as permanent and never evicted from the KeyManager cache.
    /// This is necessary when keys are managed by another store; for example, the layer files
    /// of a child store are objects in the root store, but they are encrypted with keys from the
    /// child store.  Generally, most objects should have this set to `false`.
    pub permanent: bool,
    /// The ID of the key.  NOTE(review): presumably `VOLUME_DATA_KEY_ID` or `FSCRYPT_KEY_ID`.
    pub key_id: u64,
    /// The key as recorded for the object.  NOTE(review): presumably in wrapped form -- confirm.
    pub key: EncryptionKey,
    /// The unwrapped form of the key, ready for use.
    pub unwrapped_key: UnwrappedKey,
}
282
283pub struct StoreOptions {
284    /// The store is unencrypted if store is none.
285    pub crypt: Option<Arc<dyn Crypt>>,
286}
287
288impl Default for StoreOptions {
289    fn default() -> Self {
290        Self { crypt: None }
291    }
292}
293
/// Options for creating a new child store.
#[derive(Default)]
pub struct NewChildStoreOptions {
    /// Store-level options, e.g. the crypt service if the new store is to be encrypted.
    pub options: StoreOptions,

    /// Specifies the object ID in the root store to be used for the store.  If set to
    /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
    pub object_id: u64,

    /// If true, reserve all 32 bit object_ids.  All new objects will start with IDs exceeding
    /// 0x1_0000_0000.
    pub reserve_32bit_object_ids: bool,

    /// Object IDs will be restricted to 32 bits.  This involves a less performant algorithm and so
    /// should not be used unless necessary.
    pub low_32_bit_object_ids: bool,

    /// If set, use this GUID for the new store.
    pub guid: Option<[u8; 16]>,
}
313
/// The current version of `EncryptedMutations`.
pub type EncryptedMutations = EncryptedMutationsV49;

/// Mutations for an encrypted store kept in their encrypted form, e.g. when the store has to be
/// flushed whilst its key is unavailable; they are replayed when the store is unlocked.
#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV49 {
    // Information about the mutations are held here, but the actual encrypted data is held within
    // data.  For each transaction, we record the checkpoint and the count of mutations within the
    // transaction.  The checkpoint is required for the log file offset (which we need to apply the
    // mutations), and the version so that we can correctly decode the mutation after it has been
    // decrypted. The count specifies the number of serialized mutations encoded in |data|.
    transactions: Vec<(JournalCheckpointV32, u64)>,

    // The encrypted mutations, concatenated.
    #[serde(with = "crate::zerocopy_serialization")]
    data: Vec<u8>,

    // If the mutations key was rolled, this holds the offset in `data` where the new key should
    // apply.
    mutations_key_roll: Vec<(usize, FxfsKeyV49)>,
}
333
334impl std::fmt::Debug for EncryptedMutations {
335    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
336        f.debug_struct("EncryptedMutations")
337            .field("transactions", &self.transactions)
338            .field("len", &self.data.len())
339            .field(
340                "mutations_key_roll",
341                &self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
342            )
343            .finish()
344    }
345}
346
347impl Versioned for EncryptedMutations {
348    fn max_serialized_size() -> Option<u64> {
349        Some(MAX_ENCRYPTED_MUTATIONS_SIZE as u64)
350    }
351}
352
353impl From<EncryptedMutationsV40> for EncryptedMutationsV49 {
354    fn from(value: EncryptedMutationsV40) -> Self {
355        EncryptedMutationsV49 {
356            transactions: value.transactions,
357            data: value.data,
358            mutations_key_roll: value
359                .mutations_key_roll
360                .into_iter()
361                .map(|(offset, key)| (offset, key.into()))
362                .collect(),
363        }
364    }
365}
366
/// Version 40 of `EncryptedMutations`; upgraded to V49 via `From`.
///
/// NOTE(review): unlike V49, `data` here has no `zerocopy_serialization` attribute, so it uses
/// serde's default `Vec<u8>` encoding; presumably adding the attribute would change how existing
/// V40 data decodes -- confirm before touching.
#[derive(Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV40 {
    transactions: Vec<(JournalCheckpointV32, u64)>,
    data: Vec<u8>,
    mutations_key_roll: Vec<(usize, FxfsKeyV40)>,
}
373
374impl Versioned for EncryptedMutationsV40 {
375    fn max_serialized_size() -> Option<u64> {
376        Some(MAX_ENCRYPTED_MUTATIONS_SIZE as u64)
377    }
378}
379
380impl EncryptedMutations {
381    fn from_replayed_mutations(
382        store_object_id: u64,
383        transactions: Vec<JournaledTransaction>,
384    ) -> Self {
385        let mut this = Self::default();
386        for JournaledTransaction { checkpoint, non_root_mutations, .. } in transactions {
387            for (object_id, mutation) in non_root_mutations {
388                if store_object_id == object_id {
389                    if let Mutation::EncryptedObjectStore(data) = mutation {
390                        this.push(&checkpoint, data);
391                    } else if let Mutation::UpdateMutationsKey(key) = mutation {
392                        this.mutations_key_roll.push((this.data.len(), key.into()));
393                    }
394                }
395            }
396        }
397        this
398    }
399
400    fn extend(&mut self, other: &EncryptedMutations) {
401        self.transactions.extend_from_slice(&other.transactions[..]);
402        self.mutations_key_roll.extend(
403            other
404                .mutations_key_roll
405                .iter()
406                .map(|(offset, key)| (offset + self.data.len(), key.clone())),
407        );
408        self.data.extend_from_slice(&other.data[..]);
409    }
410
411    fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
412        self.data.append(&mut data.into());
413        // If the checkpoint is the same as the last mutation we pushed, increment the count.
414        if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
415            if last_checkpoint.file_offset == checkpoint.file_offset {
416                *count += 1;
417                return;
418            }
419        }
420        self.transactions.push((checkpoint.clone(), 1));
421    }
422}
423
/// The lock state of an object store.
pub enum LockState {
    /// The store is encrypted and currently locked.
    Locked,
    /// The store is not encrypted.
    Unencrypted,
    /// The store is encrypted and unlocked.
    Unlocked {
        /// The crypt service for this store.
        crypt: Arc<dyn Crypt>,
        /// Keys fetched ahead of time for flushes, topped up to `CACHED_KEYS_LIMIT` by
        /// `pre_cache_keys`.  NOTE(review): the `NonZero<u64>` appears to be an object ID
        /// allocated from the parent store for the key -- confirm.
        cached_keys: Vec<(NonZero<u64>, EncryptionKey, UnwrappedKey)>,
    },

    /// The store is unlocked, but in a read-only state, and no flushes or other operations will be
    /// performed on the store.
    UnlockedReadOnly(Arc<dyn Crypt>),

    /// The store is encrypted but is now in an unusable state (due to a failure to sync the journal
    /// after locking the store).  The store cannot be unlocked.
    Invalid,

    /// Before we've read the StoreInfo we might not know whether the store is Locked or
    /// Unencrypted.  This can happen when lazily opening stores (ObjectManager::lazy_open_store).
    Unknown,

    /// The store is in the process of being locked.  Whilst the store is being locked, the store
    /// isn't usable; assertions will trip if any mutations are applied.
    Locking,

    /// Whilst we're unlocking, we will replay encrypted mutations.  The store isn't usable until
    /// it's in the Unlocked state.
    Unlocking,

    /// The store has been deleted.
    Deleted,
}
455
456impl fmt::Debug for LockState {
457    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
458        formatter.write_str(match self {
459            LockState::Locked => "Locked",
460            LockState::Unencrypted => "Unencrypted",
461            LockState::Unlocked { .. } => "Unlocked",
462            LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
463            LockState::Invalid => "Invalid",
464            LockState::Unknown => "Unknown",
465            LockState::Locking => "Locking",
466            LockState::Unlocking => "Unlocking",
467            LockState::Deleted => "Deleted",
468        })
469    }
470}
471
/// In-memory state used to allocate new object IDs for a store (see `try_get_next`).
enum LastObjectId {
    // This is used when the store is encrypted, but the key and ID isn't yet available.
    Pending,

    // Object IDs are not encrypted; `id` is the last object ID handed out.
    Unencrypted {
        id: u64,
    },

    Encrypted {
        // The *unencrypted* value of the last object ID.
        id: u64,

        // Encrypted stores will use a cipher to obfuscate the object ID.  Only the low 32 bits
        // are passed through the cipher; the high 32 bits are kept from `id`.
        cipher: Box<Ff1>,
    },

    // Object IDs are restricted to 32 bits and must be reserved before use.  `reserved` holds the
    // reserved IDs.  IDs released by `unreserve` are queued in `unreserved` and only removed from
    // `reserved` by `drain_unreserved`, to avoid the race described in `unreserve`.
    Low32Bit {
        reserved: HashSet<u32>,
        unreserved: Vec<u32>,
    },
}
493
494impl LastObjectId {
495    /// Returns true if object IDs require reservations.
496    fn uses_reserved_ids(&self) -> bool {
497        matches!(self, LastObjectId::Low32Bit { .. })
498    }
499
500    /// Tries to get the next object ID.  Returns None if a new cipher is required because all
501    /// object IDs that can be generated with the current cipher have been exhausted, or if only
502    /// using the lower 32 bits which requires an async algorithm.
503    fn try_get_next(&mut self) -> Option<NonZero<u64>> {
504        match self {
505            LastObjectId::Unencrypted { id } => {
506                NonZero::new(id.wrapping_add(1)).inspect(|next| *id = next.get())
507            }
508            LastObjectId::Encrypted { id, cipher } => {
509                let mut next = *id;
510                let hi = next & OBJECT_ID_HI_MASK;
511                loop {
512                    if next as u32 == u32::MAX {
513                        return None;
514                    }
515                    next += 1;
516                    let candidate = hi | cipher.encrypt(next as u32) as u64;
517                    if let Some(candidate) = NonZero::new(candidate) {
518                        *id = next;
519                        return Some(candidate);
520                    }
521                }
522            }
523            _ => None,
524        }
525    }
526
527    /// Returns INVALID_OBJECT_ID if it's not possible to peek at the next object ID.
528    fn peek_next(&self) -> u64 {
529        match self {
530            LastObjectId::Unencrypted { id } => id.wrapping_add(1),
531            LastObjectId::Encrypted { id, cipher } => {
532                let mut next = *id;
533                let hi = next & OBJECT_ID_HI_MASK;
534                loop {
535                    if next as u32 == u32::MAX {
536                        return INVALID_OBJECT_ID;
537                    }
538                    next += 1;
539                    let candidate = hi | cipher.encrypt(next as u32) as u64;
540                    if candidate != INVALID_OBJECT_ID {
541                        return candidate;
542                    }
543                }
544            }
545            _ => INVALID_OBJECT_ID,
546        }
547    }
548
549    /// Returns INVALID_OBJECT_ID for algorithms that don't use the last ID.
550    fn id(&self) -> u64 {
551        match self {
552            LastObjectId::Unencrypted { id } | LastObjectId::Encrypted { id, .. } => *id,
553            _ => INVALID_OBJECT_ID,
554        }
555    }
556
557    /// Returns true if `id` is reserved (it must be 32 bits).
558    fn is_reserved(&self, id: u64) -> bool {
559        match self {
560            LastObjectId::Low32Bit { reserved, .. } => {
561                if let Ok(id) = id.try_into() {
562                    reserved.contains(&id)
563                } else {
564                    false
565                }
566            }
567            _ => false,
568        }
569    }
570
571    /// Reserves `id`.
572    fn reserve(&mut self, id: u64) {
573        match self {
574            LastObjectId::Low32Bit { reserved, .. } => {
575                assert!(reserved.insert(id.try_into().unwrap()))
576            }
577            _ => unreachable!(),
578        }
579    }
580
581    /// Unreserves `id`.
582    fn unreserve(&mut self, id: u64) {
583        match self {
584            LastObjectId::Low32Bit { unreserved, .. } => {
585                // To avoid races, where a reserved ID transitions from being reserved to being
586                // actually used in a committed transaction, we delay updating `reserved` until a
587                // suitable point.
588                //
589                // On thread A, we might have:
590                //
591                //   A1. Commit transaction (insert a record into the LSM tree that uses ID)
592                //   A2. `unreserve`
593                //
594                // And on another thread B, we might have:
595                //
596                //   B1. Drain `unreserved`.
597                //   B2. Check tree and `reserved` to see if ID is used.
598                //
599                // B2 will involve calling `LsmTree::layer_set` which should be thought of as a
600                // snapshot, so the change A1 might not be visible to thread B, but it won't matter
601                // because `reserved` will still include the ID.  So long as each thread does the
602                // operations in this order, it should be safe.
603                unreserved.push(id.try_into().unwrap())
604            }
605            _ => {}
606        }
607    }
608
609    /// Removes `unreserved` IDs from the `reserved` list.
610    fn drain_unreserved(&mut self) {
611        match self {
612            LastObjectId::Low32Bit { reserved, unreserved } => {
613                for u in unreserved.drain(..) {
614                    assert!(reserved.remove(&u));
615                }
616            }
617            _ => {}
618        }
619    }
620}
621
622pub struct ReservedId<'a>(&'a ObjectStore, NonZero<u64>);
623
624impl<'a> ReservedId<'a> {
625    pub fn new(store: &'a ObjectStore, id: NonZero<u64>) -> Self {
626        Self(store, id)
627    }
628
629    pub fn get(&self) -> u64 {
630        self.1.get()
631    }
632
633    /// The caller takes responsibility for this id.
634    #[must_use]
635    pub fn release(self) -> NonZero<u64> {
636        let id = self.1;
637        std::mem::forget(self);
638        id
639    }
640}
641
642impl Drop for ReservedId<'_> {
643    fn drop(&mut self) {
644        self.0.last_object_id.lock().unreserve(self.1.get());
645    }
646}
647
/// An object store supports a file like interface for objects.  Objects are keyed by a 64 bit
/// identifier.  An object store has to be backed by a parent object store (which stores metadata
/// for the object store).  The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
pub struct ObjectStore {
    // The store holding this store's metadata; `None` for the root parent store.
    parent_store: Option<Arc<ObjectStore>>,
    store_object_id: u64,
    device: Arc<dyn Device>,
    block_size: u64,
    // Only a weak reference is held.  For the root parent store this stays empty until
    // `attach_filesystem` is called.
    filesystem: Weak<FxFilesystem>,
    // Lock ordering: This must be taken before `lock_state`.
    store_info: Mutex<Option<StoreInfo>>,
    tree: LSMTree<ObjectKey, ObjectValue>,

    // When replaying the journal, the store cannot read StoreInfo until the whole journal
    // has been replayed, so during that time, store_info_handle will be None and records
    // just get sent to the tree. Once the journal has been replayed, we can open the store
    // and load all the other layer information.
    store_info_handle: OnceLock<DataObjectHandle<ObjectStore>>,

    // The cipher to use for encrypted mutations, if this store is encrypted.
    mutations_cipher: Mutex<Option<StreamCipher>>,

    // Current lock state of the store.
    // Lock ordering: This must be taken after `store_info`.
    lock_state: Mutex<LockState>,
    pub key_manager: KeyManager,

    // Enable/disable tracing.
    trace: AtomicBool,

    // Informational counters for events occurring within the store.
    counters: Mutex<ObjectStoreCounters>,

    // These are updated in performance-sensitive code paths so we use atomics instead of the
    // mutex-protected `counters`.
    device_read_ops: AtomicU64,
    device_write_ops: AtomicU64,
    logical_read_ops: AtomicU64,
    logical_write_ops: AtomicU64,
    graveyard_entries: AtomicU64,

    // Contains the last object ID and, optionally, a cipher to be used when generating new object
    // IDs.
    last_object_id: Mutex<LastObjectId>,

    // An optional callback to be invoked each time the ObjectStore flushes.  The callback is
    // invoked at the end of flush, while the write lock is still held.
    flush_callback: Mutex<Option<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>>,
}
697
/// Informational counters for events occurring within a store; held behind a mutex in
/// `ObjectStore::counters`.  All counters start at zero and `last_flush_time` starts unset.
/// NOTE(review): field meanings below are inferred from their names -- confirm.
#[derive(Clone, Default)]
struct ObjectStoreCounters {
    mutations_applied: u64,
    mutations_dropped: u64,
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}
705
706impl ObjectStore {
707    fn new(
708        parent_store: Option<Arc<ObjectStore>>,
709        store_object_id: u64,
710        filesystem: Arc<FxFilesystem>,
711        store_info: Option<StoreInfo>,
712        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
713        mutations_cipher: Option<StreamCipher>,
714        lock_state: LockState,
715        last_object_id: LastObjectId,
716    ) -> Arc<ObjectStore> {
717        let device = filesystem.device();
718        let block_size = filesystem.block_size();
719        Arc::new(ObjectStore {
720            parent_store,
721            store_object_id,
722            device,
723            block_size,
724            filesystem: Arc::downgrade(&filesystem),
725            store_info: Mutex::new(store_info),
726            tree: LSMTree::new(merge::merge, object_cache),
727            store_info_handle: OnceLock::new(),
728            mutations_cipher: Mutex::new(mutations_cipher),
729            lock_state: Mutex::new(lock_state),
730            key_manager: KeyManager::new(),
731            trace: AtomicBool::new(false),
732            counters: Mutex::new(ObjectStoreCounters::default()),
733            device_read_ops: AtomicU64::new(0),
734            device_write_ops: AtomicU64::new(0),
735            logical_read_ops: AtomicU64::new(0),
736            logical_write_ops: AtomicU64::new(0),
737            graveyard_entries: AtomicU64::new(0),
738            last_object_id: Mutex::new(last_object_id),
739            flush_callback: Mutex::new(None),
740        })
741    }
742
743    fn new_empty(
744        parent_store: Option<Arc<ObjectStore>>,
745        store_object_id: u64,
746        filesystem: Arc<FxFilesystem>,
747        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
748    ) -> Arc<Self> {
749        Self::new(
750            parent_store,
751            store_object_id,
752            filesystem,
753            Some(StoreInfo::default()),
754            object_cache,
755            None,
756            LockState::Unencrypted,
757            LastObjectId::Unencrypted { id: 0 },
758        )
759    }
760
761    /// Cycle breaker constructor that returns an ObjectStore without a filesystem.
762    /// This should only be used from super block code.
763    pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
764        ObjectStore {
765            parent_store: None,
766            store_object_id,
767            device,
768            block_size,
769            filesystem: Weak::<FxFilesystem>::new(),
770            store_info: Mutex::new(Some(StoreInfo::default())),
771            tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
772            store_info_handle: OnceLock::new(),
773            mutations_cipher: Mutex::new(None),
774            lock_state: Mutex::new(LockState::Unencrypted),
775            key_manager: KeyManager::new(),
776            trace: AtomicBool::new(false),
777            counters: Mutex::new(ObjectStoreCounters::default()),
778            device_read_ops: AtomicU64::new(0),
779            device_write_ops: AtomicU64::new(0),
780            logical_read_ops: AtomicU64::new(0),
781            logical_write_ops: AtomicU64::new(0),
782            graveyard_entries: AtomicU64::new(0),
783            last_object_id: Mutex::new(LastObjectId::Unencrypted { id: 0 }),
784            flush_callback: Mutex::new(None),
785        }
786    }
787
788    /// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has
789    /// been created.
790    pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
791        this.filesystem = Arc::downgrade(&filesystem);
792        this
793    }
794
795    /// Acquires appropriate locks and starts a new transaction.  A transaction should be associated
796    /// with a store, even though it may have mutations for other objects and parent stores.
797    pub async fn new_transaction<'a>(
798        &self,
799        locks: LockKeys,
800        options: Options<'a>,
801    ) -> Result<Transaction<'a>, Error> {
802        if !options.skip_key_roll && self.needs_mutations_key_roll() {
803            if let Some(crypt) = self.crypt() {
804                let keys = lock_keys![LockKey::mutations_key_roll(self.store_object_id())];
805                let fs = self.filesystem();
806                let _guard = fs.lock_manager().write_lock(keys).await;
807                if self.needs_mutations_key_roll() {
808                    self.roll_mutations_key(crypt.as_ref()).await?;
809                }
810            }
811        }
812        let fs = self.filesystem();
813        Transaction::new(fs, options, locks).await
814    }
815
    /// Ensures that `cached_keys` is filled up to `CACHED_KEYS_LIMIT`.
    ///
    /// This is a no-op (returning `Ok(())`) unless the store is `LockState::Unlocked` and holds
    /// fewer than `CACHED_KEYS_LIMIT` cached keys. Otherwise it repeatedly does the following:
    /// allocate an object ID from the parent store, create a `KeyPurpose::Data` key for it with
    /// the store's crypt service, and push `(id, key, unwrapped key)` onto `cached_keys`. It stops
    /// once the limit is reached or once the store is no longer `Unlocked`.
    ///
    /// # Errors
    ///
    /// Returns the error from `crypt.create_key` if key creation fails. Keys cached by earlier
    /// iterations are kept.
    ///
    /// # Panics
    ///
    /// Panics in any of these cases:
    /// * the store has no parent store;
    /// * the parent store uses reserved object IDs;
    /// * the parent's `maybe_get_next_object_id` fails.
    async fn pre_cache_keys(&self) -> Result<(), Error> {
        // Clone the crypt service out of the lock state. The `lock_state` guard is a temporary of
        // this statement, so it is not held across the `create_key` await in the loop below.
        let crypt = match &*self.lock_state.lock() {
            LockState::Unlocked { cached_keys, crypt, .. }
                if cached_keys.len() < CACHED_KEYS_LIMIT =>
            {
                crypt.clone()
            }
            _ => return Ok(()),
        };
        loop {
            let parent_store = self.parent_store.as_ref().unwrap();

            // We assert that the parent store is not using object IDs that need reservation, since
            // if it did, release() would leak the reservation below.
            assert!(!parent_store.last_object_id.lock().uses_reserved_ids());

            // Allocate a raw ID from the parent store.  Since the parent store is unencrypted, this
            // is fast and won't block.
            let raw_id = {
                let reserved_id = parent_store
                    .maybe_get_next_object_id()
                    .expect("maybe_get_next_object_id failed on parent store");
                reserved_id.release()
            };

            let (wrapped, unwrapped) = match crypt.create_key(raw_id.get(), KeyPurpose::Data).await
            {
                Ok(v) => v,
                Err(error) => {
                    log::warn!(
                        error:?,
                        store_id = self.store_object_id();
                        "Failed to pre-cache key"
                    );
                    return Err(error.into());
                }
            };

            // Re-acquire the lock: the lock state may have changed while `create_key` was
            // pending. The guard is dropped at the end of each iteration, before the next await.
            let mut lock_state = self.lock_state.lock();
            if let LockState::Unlocked { cached_keys, .. } = &mut *lock_state {
                cached_keys.push((raw_id, EncryptionKey::Fxfs(wrapped), unwrapped));
                if cached_keys.len() >= CACHED_KEYS_LIMIT {
                    break;
                }
            } else {
                // Store was locked while we were awaiting; discard the key.  The raw ID obtained
                // above has already been released and simply goes unused.
                break;
            }
        }
        Ok(())
    }
868
869    /// Create a child store. It is a multi-step process:
870    ///
871    ///   1. Call `ObjectStore::new_child_store`.
872    ///   2. Register the store with the object-manager.
873    ///   3. Call `ObjectStore::create` to write the store-info.
874    ///
875    /// If the procedure fails, care must be taken to unregister store with the object-manager.
876    ///
877    /// The steps have to be separate because of lifetime issues when working with a transaction.
878    async fn new_child_store(
879        self: &Arc<Self>,
880        transaction: &mut Transaction<'_>,
881        options: NewChildStoreOptions,
882        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
883    ) -> Result<Arc<Self>, Error> {
884        ensure!(
885            !options.reserve_32bit_object_ids || !options.low_32_bit_object_ids,
886            FxfsError::InvalidArgs
887        );
888        let handle = if let Some(object_id) = NonZero::new(options.object_id) {
889            self.update_last_object_id(object_id.get());
890            let handle = ObjectStore::create_object_with_id(
891                self,
892                transaction,
893                ReservedId::new(self, object_id),
894                HandleOptions::default(),
895                None,
896            )?;
897            handle
898        } else {
899            ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await?
900        };
901        let filesystem = self.filesystem();
902        let id = if options.reserve_32bit_object_ids { 0x1_0000_0000 } else { 0 };
903        let (last_object_id, last_object_id_in_memory) = if options.low_32_bit_object_ids {
904            (
905                LastObjectIdInfo::Low32Bit,
906                LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() },
907            )
908        } else if let Some(crypt) = &options.options.crypt {
909            let (object_id_wrapped, object_id_unwrapped) =
910                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
911            (
912                LastObjectIdInfo::Encrypted { id, key: object_id_wrapped },
913                LastObjectId::Encrypted { id, cipher: Box::new(Ff1::new(&object_id_unwrapped)) },
914            )
915        } else {
916            (LastObjectIdInfo::Unencrypted { id }, LastObjectId::Unencrypted { id })
917        };
918        let store = if let Some(crypt) = options.options.crypt {
919            let (wrapped_key, unwrapped_key) =
920                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
921            Self::new(
922                Some(self.clone()),
923                handle.object_id(),
924                filesystem.clone(),
925                Some(StoreInfo {
926                    mutations_key: Some(wrapped_key),
927                    last_object_id,
928                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
929                    ..Default::default()
930                }),
931                object_cache,
932                Some(StreamCipher::new(&unwrapped_key, 0)),
933                LockState::Unlocked { crypt, cached_keys: Vec::new() },
934                last_object_id_in_memory,
935            )
936        } else {
937            Self::new(
938                Some(self.clone()),
939                handle.object_id(),
940                filesystem.clone(),
941                Some(StoreInfo {
942                    last_object_id,
943                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
944                    ..Default::default()
945                }),
946                object_cache,
947                None,
948                LockState::Unencrypted,
949                last_object_id_in_memory,
950            )
951        };
952        assert!(store.store_info_handle.set(handle).is_ok());
953        Ok(store)
954    }
955
956    /// Actually creates the store in a transaction.  This will also create a root directory and
957    /// graveyard directory for the store.  See `new_child_store` above.
958    async fn create<'a>(
959        self: &'a Arc<Self>,
960        transaction: &mut Transaction<'a>,
961    ) -> Result<(), Error> {
962        let buf = {
963            // Create a root directory and graveyard directory.
964            let graveyard_directory_object_id = Graveyard::create(transaction, &self).await?;
965            let root_directory = Directory::create(transaction, &self, None).await?;
966
967            let serialized_info = {
968                let mut store_info = self.store_info.lock();
969                let store_info = store_info.as_mut().unwrap();
970
971                store_info.graveyard_directory_object_id = graveyard_directory_object_id;
972                store_info.root_directory_object_id = root_directory.object_id();
973
974                let mut serialized_info = Vec::new();
975                store_info.serialize_with_version(&mut serialized_info)?;
976                serialized_info
977            };
978            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
979            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
980            buf
981        };
982
983        if self.filesystem().options().image_builder_mode.is_some() {
984            // If we're in image builder mode, we want to avoid writing to disk unless explicitly
985            // asked to. New object stores will have their StoreInfo written when we compact in
986            // FxFilesystem::finalize().
987            Ok(())
988        } else {
989            self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
990        }
991    }
992
993    pub fn set_trace(&self, trace: bool) {
994        let old_value = self.trace.swap(trace, Ordering::Relaxed);
995        if trace != old_value {
996            info!(store_id = self.store_object_id(), trace; "OS: trace",);
997        }
998    }
999
1000    /// Sets a callback to be invoked each time the ObjectStore flushes.  The callback is invoked at
1001    /// the end of flush, while the write lock is still held.
1002    pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
1003        let mut flush_callback = self.flush_callback.lock();
1004        *flush_callback = Some(Box::new(callback));
1005    }
1006
1007    pub fn is_root(&self) -> bool {
1008        if let Some(parent) = &self.parent_store {
1009            parent.parent_store.is_none()
1010        } else {
1011            // The root parent store isn't the root store.
1012            false
1013        }
1014    }
1015
1016    /// Populates an inspect node with store statistics.
1017    pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
1018        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1019        let counters = self.counters.lock();
1020        if let Some(store_info) = self.store_info() {
1021            root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
1022        };
1023        root.record_uint("store_object_id", self.store_object_id);
1024        root.record_uint("mutations_applied", counters.mutations_applied);
1025        root.record_uint("mutations_dropped", counters.mutations_dropped);
1026        root.record_uint("num_flushes", counters.num_flushes);
1027        if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
1028            root.record_uint(
1029                "last_flush_time_ms",
1030                last_flush_time
1031                    .duration_since(std::time::UNIX_EPOCH)
1032                    .unwrap_or(std::time::Duration::ZERO)
1033                    .as_millis()
1034                    .try_into()
1035                    .unwrap_or(0u64),
1036            );
1037        }
1038        root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
1039        root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
1040        root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
1041        root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));
1042        root.record_uint("graveyard_entries", self.graveyard_entries.load(Ordering::Relaxed));
1043        {
1044            let last_object_id = self.last_object_id.lock();
1045            root.record_uint("object_id_hi", last_object_id.id() >> 32);
1046            root.record_bool(
1047                "low_32_bit_object_ids",
1048                matches!(&*last_object_id, LastObjectId::Low32Bit { .. }),
1049            );
1050        }
1051
1052        let this = self.clone();
1053        root.record_child("lsm_tree", move |node| this.tree().record_inspect_data(node));
1054    }
1055
1056    pub fn device(&self) -> &Arc<dyn Device> {
1057        &self.device
1058    }
1059
1060    pub fn block_size(&self) -> u64 {
1061        self.block_size
1062    }
1063
1064    pub fn filesystem(&self) -> Arc<FxFilesystem> {
1065        self.filesystem.upgrade().unwrap()
1066    }
1067
1068    pub fn store_object_id(&self) -> u64 {
1069        self.store_object_id
1070    }
1071
1072    pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
1073        &self.tree
1074    }
1075
1076    pub fn root_directory_object_id(&self) -> u64 {
1077        self.store_info.lock().as_ref().unwrap().root_directory_object_id
1078    }
1079
1080    pub fn guid(&self) -> [u8; 16] {
1081        self.store_info.lock().as_ref().unwrap().guid
1082    }
1083
1084    pub fn graveyard_directory_object_id(&self) -> u64 {
1085        self.store_info.lock().as_ref().unwrap().graveyard_directory_object_id
1086    }
1087
1088    fn set_graveyard_directory_object_id(&self, oid: u64) {
1089        assert_eq!(
1090            std::mem::replace(
1091                &mut self.store_info.lock().as_mut().unwrap().graveyard_directory_object_id,
1092                oid
1093            ),
1094            INVALID_OBJECT_ID
1095        );
1096    }
1097
1098    pub fn object_count(&self) -> u64 {
1099        self.store_info.lock().as_ref().unwrap().object_count
1100    }
1101
1102    /// Returns INVALID_OBJECT_ID for algorithms that don't use the last ID.
1103    pub(crate) fn unencrypted_last_object_id(&self) -> u64 {
1104        self.last_object_id.lock().id()
1105    }
1106
1107    pub fn key_manager(&self) -> &KeyManager {
1108        &self.key_manager
1109    }
1110
1111    pub fn parent_store(&self) -> Option<&Arc<ObjectStore>> {
1112        self.parent_store.as_ref()
1113    }
1114
1115    /// Returns the crypt object for the store.  Returns None if the store is unencrypted.
1116    pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
1117        match &*self.lock_state.lock() {
1118            LockState::Locked => panic!("Store is locked"),
1119            LockState::Invalid
1120            | LockState::Unencrypted
1121            | LockState::Locking
1122            | LockState::Unlocking
1123            | LockState::Deleted => None,
1124            LockState::Unlocked { crypt, .. } => Some(crypt.clone()),
1125            LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
1126            LockState::Unknown => {
1127                panic!("Store is of unknown lock state; has the journal been replayed yet?")
1128            }
1129        }
1130    }
1131
1132    /// Returns the id of the internal directory. Returns a NotFound error if this has not been
1133    /// initialized.
1134    pub fn get_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
1135        if let Some(store_info) = self.store_info.lock().as_ref() {
1136            if store_info.internal_directory_object_id == INVALID_OBJECT_ID {
1137                Err(FxfsError::NotFound.into())
1138            } else {
1139                Ok(store_info.internal_directory_object_id)
1140            }
1141        } else {
1142            Err(FxfsError::Unavailable.into())
1143        }
1144    }
1145
1146    pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
1147        // Create the transaction first to use the object store lock.
1148        let mut transaction = self
1149            .new_transaction(
1150                lock_keys![LockKey::InternalDirectory { store_object_id: self.store_object_id }],
1151                Options::default(),
1152            )
1153            .await?;
1154        let obj_id = self.store_info.lock().as_ref().unwrap().internal_directory_object_id;
1155        if obj_id != INVALID_OBJECT_ID {
1156            return Ok(obj_id);
1157        }
1158
1159        // Need to create an internal directory.
1160        let directory = Directory::create(&mut transaction, self, None).await?;
1161
1162        transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
1163        transaction.commit().await?;
1164        Ok(directory.object_id())
1165    }
1166
1167    /// Returns the file size for the object without opening the object.
1168    async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
1169        let item = self
1170            .tree
1171            .find(&ObjectKey::attribute(object_id, AttributeId::DATA, AttributeKey::Attribute))
1172            .await?
1173            .ok_or(FxfsError::NotFound)?;
1174        if let ObjectValue::Attribute { size, .. } = item.value {
1175            Ok(size)
1176        } else {
1177            bail!(FxfsError::NotFile);
1178        }
1179    }
1180
1181    #[cfg(feature = "migration")]
1182    pub fn last_object_id(&self) -> u64 {
1183        self.last_object_id.lock().id()
1184    }
1185
1186    /// Provides access to the allocator to mark a specific region of the device as allocated.
1187    #[cfg(feature = "migration")]
1188    pub fn mark_allocated(
1189        &self,
1190        transaction: &mut Transaction<'_>,
1191        store_object_id: u64,
1192        device_range: std::ops::Range<u64>,
1193    ) -> Result<(), Error> {
1194        self.allocator().mark_allocated(transaction, store_object_id, device_range)
1195    }
1196
1197    /// `crypt` can be provided if the crypt service should be different to the default; see the
1198    /// comment on create_object.  Users should avoid having more than one handle open for the same
1199    /// object at the same time because they might get out-of-sync; there is no code that will
1200    /// prevent this.  One example where this can cause an issue is if the object ends up using a
1201    /// permanent key (which is the case if a value is passed for `crypt`), the permanent key is
1202    /// dropped when a handle is dropped, which will impact any other handles for the same object.
1203    pub async fn open_object<S: HandleOwner>(
1204        owner: &Arc<S>,
1205        obj_id: u64,
1206        options: HandleOptions,
1207        crypt: Option<Arc<dyn Crypt>>,
1208    ) -> Result<DataObjectHandle<S>, Error> {
1209        let store = owner.as_ref().as_ref();
1210        let mut fsverity_descriptor = None;
1211        let mut overwrite_ranges = Vec::new();
1212        let item = store
1213            .tree
1214            .find(&ObjectKey::attribute(obj_id, AttributeId::DATA, AttributeKey::Attribute))
1215            .await?
1216            .ok_or(FxfsError::NotFound)?;
1217
1218        let (size, track_overwrite_extents) = match item.value {
1219            ObjectValue::Attribute { size, has_overwrite_extents } => (size, has_overwrite_extents),
1220            ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
1221                if !options.skip_fsverity {
1222                    fsverity_descriptor = Some(fsverity_metadata);
1223                }
1224                // We only track the overwrite extents in memory for writes, reads handle them
1225                // implicitly, which means verified files (where the data won't change anymore)
1226                // don't need to track them.
1227                (size, false)
1228            }
1229            _ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attibute")),
1230        };
1231
1232        ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);
1233
1234        if track_overwrite_extents {
1235            let layer_set = store.tree.layer_set();
1236            let mut merger = layer_set.merger();
1237            let mut iter = merger
1238                .query(Query::FullRange(&ObjectKey::attribute(
1239                    obj_id,
1240                    AttributeId::DATA,
1241                    AttributeKey::Extent(Extent::search_key_from_offset(0)),
1242                )))
1243                .await?;
1244            loop {
1245                match iter.get() {
1246                    Some(ItemRef {
1247                        key:
1248                            ObjectKey {
1249                                object_id,
1250                                data:
1251                                    ObjectKeyData::Attribute(
1252                                        AttributeId::DATA,
1253                                        AttributeKey::Extent(extent),
1254                                    ),
1255                            },
1256                        value,
1257                        ..
1258                    }) if *object_id == obj_id => {
1259                        match value {
1260                            ObjectValue::Extent(ExtentValue::None)
1261                            | ObjectValue::Extent(ExtentValue::Some {
1262                                mode: ExtentMode::Raw,
1263                                ..
1264                            })
1265                            | ObjectValue::Extent(ExtentValue::Some {
1266                                mode: ExtentMode::Cow(_),
1267                                ..
1268                            }) => (),
1269                            ObjectValue::Extent(ExtentValue::Some {
1270                                mode: ExtentMode::OverwritePartial(_),
1271                                ..
1272                            })
1273                            | ObjectValue::Extent(ExtentValue::Some {
1274                                mode: ExtentMode::Overwrite,
1275                                ..
1276                            }) => overwrite_ranges.push(extent.clone().into()),
1277                            _ => bail!(
1278                                anyhow!(FxfsError::Inconsistent)
1279                                    .context("open_object: Expected extent")
1280                            ),
1281                        }
1282                        iter.advance().await?;
1283                    }
1284                    _ => break,
1285                }
1286            }
1287        }
1288
1289        // If a crypt service has been specified, it needs to be a permanent key because cached
1290        // keys can only use the store's crypt service.
1291        let permanent = if let Some(crypt) = crypt {
1292            store
1293                .key_manager
1294                .get_keys(
1295                    obj_id,
1296                    crypt.as_ref(),
1297                    &mut Some(async || store.get_keys(obj_id).await),
1298                    /* permanent= */ true,
1299                    /* force= */ false,
1300                )
1301                .await?;
1302            true
1303        } else {
1304            false
1305        };
1306        let data_object_handle = DataObjectHandle::new(
1307            owner.clone(),
1308            obj_id,
1309            permanent,
1310            AttributeId::DATA,
1311            size,
1312            FsverityState::None,
1313            options,
1314            false,
1315            &overwrite_ranges,
1316        );
1317        if let Some(descriptor) = fsverity_descriptor {
1318            data_object_handle
1319                .set_fsverity_state_some(descriptor)
1320                .await
1321                .context("Invalid or mismatched merkle tree")?;
1322        }
1323        Ok(data_object_handle)
1324    }
1325
1326    pub fn create_object_with_id<S: HandleOwner>(
1327        owner: &Arc<S>,
1328        transaction: &mut Transaction<'_>,
1329        reserved_object_id: ReservedId<'_>,
1330        options: HandleOptions,
1331        encryption_options: Option<ObjectEncryptionOptions>,
1332    ) -> Result<DataObjectHandle<S>, Error> {
1333        let store = owner.as_ref().as_ref();
1334        // Don't permit creating unencrypted objects in an encrypted store.  The converse is OK.
1335        debug_assert!(store.crypt().is_none() || encryption_options.is_some());
1336        let now = Timestamp::now();
1337        let object_id = reserved_object_id.get();
1338        assert!(
1339            transaction
1340                .add(
1341                    store.store_object_id(),
1342                    Mutation::insert_object(
1343                        ObjectKey::object(reserved_object_id.release().get()),
1344                        ObjectValue::file(
1345                            1,
1346                            0,
1347                            now.clone(),
1348                            now.clone(),
1349                            now.clone(),
1350                            now,
1351                            None,
1352                            None
1353                        ),
1354                    ),
1355                )
1356                .is_none()
1357        );
1358        let mut permanent_keys = false;
1359        if let Some(ObjectEncryptionOptions { permanent, key_id, key, unwrapped_key }) =
1360            encryption_options
1361        {
1362            permanent_keys = permanent;
1363            let cipher = key_to_cipher(&key, &unwrapped_key)?;
1364            transaction.add(
1365                store.store_object_id(),
1366                Mutation::insert_object(
1367                    ObjectKey::keys(object_id),
1368                    ObjectValue::keys(vec![(key_id, key)].into()),
1369                ),
1370            );
1371            store.key_manager.insert(
1372                object_id,
1373                Arc::new(vec![(key_id, CipherHolder::Cipher(cipher))].into()),
1374                permanent,
1375            );
1376        }
1377        transaction.add(
1378            store.store_object_id(),
1379            Mutation::insert_object(
1380                ObjectKey::attribute(object_id, AttributeId::DATA, AttributeKey::Attribute),
1381                // This is a new object so nothing has pre-allocated overwrite extents yet.
1382                ObjectValue::attribute(0, false),
1383            ),
1384        );
1385        Ok(DataObjectHandle::new(
1386            owner.clone(),
1387            object_id,
1388            permanent_keys,
1389            AttributeId::DATA,
1390            0,
1391            FsverityState::None,
1392            options,
1393            false,
1394            &[],
1395        ))
1396    }
1397
1398    /// Creates an object in the store.
1399    ///
1400    /// If the store is encrypted, the object will be automatically encrypted as well.
1401    /// If `wrapping_key_id` is set, the new keys will be wrapped with that specific key, and
1402    /// otherwise the default data key is used.
1403    pub async fn create_object<S: HandleOwner>(
1404        owner: &Arc<S>,
1405        mut transaction: &mut Transaction<'_>,
1406        options: HandleOptions,
1407        wrapping_key_id: Option<WrappingKeyId>,
1408    ) -> Result<DataObjectHandle<S>, Error> {
1409        let store = owner.as_ref().as_ref();
1410        let object_id = store.get_next_object_id().await?;
1411        let crypt = store.crypt();
1412        let encryption_options = if let Some(crypt) = crypt {
1413            let key_id =
1414                if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };
1415            let (key, unwrapped_key) = if let Some(wrapping_key_id) = wrapping_key_id {
1416                crypt.create_key_with_id(object_id.get(), wrapping_key_id, ObjectType::File).await?
1417            } else {
1418                let (fxfs_key, unwrapped_key) =
1419                    crypt.create_key(object_id.get(), KeyPurpose::Data).await?;
1420                (EncryptionKey::Fxfs(fxfs_key), unwrapped_key)
1421            };
1422            Some(ObjectEncryptionOptions { permanent: false, key_id, key, unwrapped_key })
1423        } else {
1424            None
1425        };
1426        ObjectStore::create_object_with_id(
1427            owner,
1428            &mut transaction,
1429            object_id,
1430            options,
1431            encryption_options,
1432        )
1433    }
1434
1435    /// Creates an object using explicitly provided keys.
1436    ///
1437    /// There are some cases where an encrypted object needs to be created in an unencrypted store.
1438    /// For example, when layer files for a child store are created in the root store, but they must
1439    /// be encrypted using the child store's keys.  This method exists for that purpose.
1440    pub(crate) async fn create_object_with_key<S: HandleOwner>(
1441        owner: &Arc<S>,
1442        mut transaction: &mut Transaction<'_>,
1443        object_id: ReservedId<'_>,
1444        options: HandleOptions,
1445        key: EncryptionKey,
1446        unwrapped_key: UnwrappedKey,
1447    ) -> Result<DataObjectHandle<S>, Error> {
1448        ObjectStore::create_object_with_id(
1449            owner,
1450            &mut transaction,
1451            object_id,
1452            options,
1453            Some(ObjectEncryptionOptions {
1454                permanent: true,
1455                key_id: VOLUME_DATA_KEY_ID,
1456                key,
1457                unwrapped_key,
1458            }),
1459        )
1460    }
1461
1462    /// Adjusts the reference count for a given object.  If the reference count reaches zero, the
1463    /// object is moved into the graveyard and true is returned.
1464    pub async fn adjust_refs(
1465        &self,
1466        transaction: &mut Transaction<'_>,
1467        object_id: u64,
1468        delta: i64,
1469    ) -> Result<bool, Error> {
1470        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1471        let refs = if let ObjectValue::Object {
1472            kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
1473            ..
1474        } = &mut mutation.item.value
1475        {
1476            *refs =
1477                refs.checked_add_signed(delta).ok_or_else(|| anyhow!("refs underflow/overflow"))?;
1478            refs
1479        } else {
1480            bail!(FxfsError::NotFile);
1481        };
1482        if *refs == 0 {
1483            self.add_to_graveyard(transaction, object_id);
1484
1485            // We might still need to adjust the reference count if delta was something other than
1486            // -1.
1487            if delta != -1 {
1488                *refs = 1;
1489                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1490            }
1491            // Otherwise, we don't commit the mutation as we want to keep reference count as 1 for
1492            // objects in graveyard.
1493            Ok(true)
1494        } else {
1495            transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1496            Ok(false)
1497        }
1498    }
1499
1500    // Purges an object that is in the graveyard.
1501    pub async fn tombstone_object(
1502        &self,
1503        object_id: u64,
1504        txn_options: Options<'_>,
1505    ) -> Result<(), Error> {
1506        debug_assert!(
1507            self.tree.find(&ObjectKey::object(object_id)).await?.is_some(),
1508            "Tombstoning missing object"
1509        );
1510        debug_assert!(
1511            self.tree
1512                .find(&ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id))
1513                .await?
1514                .is_some(),
1515            "Tombstoning object not in graveyard"
1516        );
1517        self.key_manager.remove(object_id).await;
1518        let fs = self.filesystem();
1519        let truncate_guard = fs.truncate_guard(self.store_object_id, object_id).await;
1520        self.trim_or_tombstone(object_id, true, txn_options, &truncate_guard).await
1521    }
1522
1523    /// Trim extents beyond the end of a file for all attributes.  This will remove the entry from
1524    /// the graveyard when done.
1525    pub async fn trim(
1526        &self,
1527        object_id: u64,
1528        truncate_guard: &TruncateGuard<'_>,
1529    ) -> Result<(), Error> {
1530        // For the root and root parent store, we would need to use the metadata reservation which
1531        // we don't currently support, so assert that we're not those stores.
1532        assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());
1533
1534        self.trim_or_tombstone(
1535            object_id,
1536            false,
1537            Options { borrow_metadata_space: true, ..Default::default() },
1538            truncate_guard,
1539        )
1540        .await
1541    }
1542
    /// Trims or tombstones an object.
    ///
    /// Walks the object's attributes, starting at `AttributeId::SORTED_START`. Each call to
    /// `trim_some` gets its own transaction, which locks the current attribute and the object.
    /// The mode is `TrimMode::Tombstone(TombstoneMode::Object)` when `for_tombstone` is set and
    /// `TrimMode::UseSize` otherwise. A transaction is committed only if it recorded mutations.
    ///
    /// Once no attributes remain (`TrimResult::Done(None)`), the object's graveyard entry is
    /// removed. When tombstoning this always happens. When trimming, it happens only if the
    /// entry's value is `ObjectValue::Trim`. When tombstoning, the graveyard entry of every
    /// finished non-`DATA` attribute is also removed.
    ///
    /// `_truncate_guard` is not read here. Taking it as a parameter means callers must hold the
    /// object's truncate guard while this runs.
    async fn trim_or_tombstone(
        &self,
        object_id: u64,
        for_tombstone: bool,
        txn_options: Options<'_>,
        _truncate_guard: &TruncateGuard<'_>,
    ) -> Result<(), Error> {
        let mut next_attribute = Some(AttributeId::SORTED_START);
        while let Some(attribute_id) = next_attribute.take() {
            // One transaction per `trim_some` call, which bounds the transaction size.
            let mut transaction = self
                .new_transaction(
                    lock_keys![
                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
                        LockKey::object(self.store_object_id, object_id),
                    ],
                    txn_options,
                )
                .await?;

            match self
                .trim_some(
                    &mut transaction,
                    object_id,
                    attribute_id,
                    if for_tombstone {
                        TrimMode::Tombstone(TombstoneMode::Object)
                    } else {
                        TrimMode::UseSize
                    },
                )
                .await?
            {
                // The transaction filled up before this attribute was finished; retry the same
                // attribute in a fresh transaction.
                TrimResult::Incomplete => next_attribute = Some(attribute_id),
                // No further records for this object: the whole operation is complete.
                TrimResult::Done(None) => {
                    if for_tombstone
                        || matches!(
                            self.tree
                                .find(&ObjectKey::graveyard_entry(
                                    self.graveyard_directory_object_id(),
                                    object_id,
                                ))
                                .await?,
                            Some(Item { value: ObjectValue::Trim, .. })
                        )
                    {
                        self.remove_from_graveyard(&mut transaction, object_id);
                    }
                    // The last attribute was not the default attribute, it may have been added to
                    // the graveyard alongside the object.
                    if for_tombstone && attribute_id != AttributeId::DATA {
                        self.remove_attribute_from_graveyard(
                            &mut transaction,
                            object_id,
                            attribute_id,
                        );
                    }
                }
                TrimResult::Done(id) => {
                    // Moved to the next attribute. This one is finished and it may have been
                    // added to the graveyard alongside the object.
                    if for_tombstone && attribute_id != AttributeId::DATA {
                        self.remove_attribute_from_graveyard(
                            &mut transaction,
                            object_id,
                            attribute_id,
                        );
                    }
                    next_attribute = id;
                }
            }

            // Skip committing empty transactions (e.g. when only the next attribute was found).
            if !transaction.mutations().is_empty() {
                transaction.commit().await?;
            }
        }
        Ok(())
    }
1621
1622    // Purges an object's attribute that is in the graveyard.
1623    pub async fn tombstone_attribute(
1624        &self,
1625        object_id: u64,
1626        attribute_id: AttributeId,
1627        txn_options: Options<'_>,
1628    ) -> Result<(), Error> {
1629        // Ensure that we don't double-delete things, it should still exist and be in the graveyard.
1630        debug_assert!(
1631            self.tree
1632                .find(&ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute))
1633                .await?
1634                .is_some(),
1635            "Tombstoning missing attribute"
1636        );
1637        debug_assert!(
1638            self.tree
1639                .find(&ObjectKey::graveyard_attribute_entry(
1640                    self.graveyard_directory_object_id(),
1641                    object_id,
1642                    attribute_id
1643                ))
1644                .await?
1645                .is_some(),
1646            "Tombstoning attribute not in graveyard"
1647        );
1648        let mut trim_result = TrimResult::Incomplete;
1649        while matches!(trim_result, TrimResult::Incomplete) {
1650            let mut transaction = self
1651                .new_transaction(
1652                    lock_keys![
1653                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
1654                        LockKey::object(self.store_object_id, object_id),
1655                    ],
1656                    txn_options,
1657                )
1658                .await?;
1659            trim_result = self
1660                .trim_some(
1661                    &mut transaction,
1662                    object_id,
1663                    attribute_id,
1664                    TrimMode::Tombstone(TombstoneMode::Attribute),
1665                )
1666                .await?;
1667            if let TrimResult::Done(..) = trim_result {
1668                self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
1669            }
1670            if !transaction.mutations().is_empty() {
1671                transaction.commit().await?;
1672            }
1673        }
1674        Ok(())
1675    }
1676
1677    /// Deletes extents for attribute `attribute_id` in object `object_id`.  Also see the comments
1678    /// for TrimMode and TrimResult. Should hold a lock on the attribute, and the object as it
1679    /// performs a read-modify-write on the sizes.
1680    pub async fn trim_some(
1681        &self,
1682        transaction: &mut Transaction<'_>,
1683        object_id: u64,
1684        attribute_id: AttributeId,
1685        mode: TrimMode,
1686    ) -> Result<TrimResult, Error> {
1687        let layer_set = self.tree.layer_set();
1688        let mut merger = layer_set.merger();
1689
1690        let aligned_offset = match mode {
1691            TrimMode::FromOffset(offset) => {
1692                round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
1693            }
1694            TrimMode::Tombstone(..) => 0,
1695            TrimMode::UseSize => {
1696                let iter = merger
1697                    .query(Query::FullRange(&ObjectKey::attribute(
1698                        object_id,
1699                        attribute_id,
1700                        AttributeKey::Attribute,
1701                    )))
1702                    .await?;
1703                if let Some(item_ref) = iter.get() {
1704                    if item_ref.key.object_id != object_id {
1705                        return Ok(TrimResult::Done(None));
1706                    }
1707
1708                    if let ItemRef {
1709                        key:
1710                            ObjectKey {
1711                                data:
1712                                    ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
1713                                ..
1714                            },
1715                        value: ObjectValue::Attribute { size, .. },
1716                        ..
1717                    } = item_ref
1718                    {
1719                        // If we found a different attribute_id, return so we can get the
1720                        // right lock.
1721                        if *size_attribute_id != attribute_id {
1722                            return Ok(TrimResult::Done(Some(*size_attribute_id)));
1723                        }
1724                        round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
1725                    } else {
1726                        // At time of writing, we should always see a size record or None here, but
1727                        // asserting here would be brittle so just skip to the the next attribute
1728                        // instead.
1729                        return Ok(TrimResult::Done(Some(attribute_id.next())));
1730                    }
1731                } else {
1732                    // End of the tree.
1733                    return Ok(TrimResult::Done(None));
1734                }
1735            }
1736        };
1737
1738        // Loop over the extents and deallocate them.
1739        let mut iter = merger
1740            .query(Query::FullRange(&ObjectKey::from_extent(
1741                object_id,
1742                attribute_id,
1743                Extent::search_key_from_offset(aligned_offset),
1744            )))
1745            .await?;
1746        let mut end = 0;
1747        let allocator = self.allocator();
1748        let mut result = TrimResult::Done(None);
1749        let mut deallocated = 0;
1750        let block_size = self.block_size;
1751
1752        while let Some(item_ref) = iter.get() {
1753            if item_ref.key.object_id != object_id {
1754                break;
1755            }
1756            if let ObjectKey {
1757                data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
1758                ..
1759            } = item_ref.key
1760            {
1761                if *extent_attribute_id != attribute_id {
1762                    result = TrimResult::Done(Some(*extent_attribute_id));
1763                    break;
1764                }
1765                if let (
1766                    AttributeKey::Extent(extent),
1767                    ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1768                ) = (attribute_key, item_ref.value)
1769                {
1770                    let start = std::cmp::max(extent.start, aligned_offset);
1771                    ensure!(start < extent.end, FxfsError::Inconsistent);
1772                    let device_offset = device_offset
1773                        .checked_add(start - extent.start)
1774                        .ok_or(FxfsError::Inconsistent)?;
1775                    end = extent.end;
1776                    let len = end - start;
1777                    let device_range = device_offset..device_offset + len;
1778                    ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
1779                    allocator.deallocate(transaction, self.store_object_id, device_range).await?;
1780                    deallocated += len;
1781                    // Stop if the transaction is getting too big.
1782                    if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
1783                        result = TrimResult::Incomplete;
1784                        break;
1785                    }
1786                }
1787            }
1788            iter.advance().await?;
1789        }
1790
1791        let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
1792            && matches!(result, TrimResult::Done(None));
1793        let finished_tombstone_attribute =
1794            matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
1795                && !matches!(result, TrimResult::Incomplete);
1796        let mut object_mutation = None;
1797        let nodes = if finished_tombstone_object { -1 } else { 0 };
1798        if nodes != 0 || deallocated != 0 {
1799            let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1800            if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
1801                mutation.item.value
1802            {
1803                if let Some(project_id) = project_id {
1804                    transaction.add(
1805                        self.store_object_id,
1806                        Mutation::merge_object(
1807                            ObjectKey::project_usage(self.root_directory_object_id(), project_id),
1808                            ObjectValue::BytesAndNodes {
1809                                bytes: -i64::try_from(deallocated).unwrap(),
1810                                nodes,
1811                            },
1812                        ),
1813                    );
1814                }
1815                object_mutation = Some(mutation);
1816            } else {
1817                panic!("Inconsistent object type.");
1818            }
1819        }
1820
1821        // Deletion marker records *must* be merged so as to consume all other records for the
1822        // object.
1823        if finished_tombstone_object {
1824            transaction.add(
1825                self.store_object_id,
1826                Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
1827            );
1828        } else {
1829            if finished_tombstone_attribute {
1830                transaction.add(
1831                    self.store_object_id,
1832                    Mutation::merge_object(
1833                        ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
1834                        ObjectValue::None,
1835                    ),
1836                );
1837            }
1838            if deallocated > 0 {
1839                let mut mutation = match object_mutation {
1840                    Some(mutation) => mutation,
1841                    None => self.txn_get_object_mutation(transaction, object_id).await?,
1842                };
1843                transaction.add(
1844                    self.store_object_id,
1845                    Mutation::merge_object(
1846                        ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
1847                        ObjectValue::deleted_extent(),
1848                    ),
1849                );
1850                // Update allocated size.
1851                if let ObjectValue::Object {
1852                    attributes: ObjectAttributes { allocated_size, .. },
1853                    ..
1854                } = &mut mutation.item.value
1855                {
1856                    // The only way for these to fail are if the volume is inconsistent.
1857                    *allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
1858                        anyhow!(FxfsError::Inconsistent).context("Allocated size overflow")
1859                    })?;
1860                } else {
1861                    panic!("Unexpected object value");
1862                }
1863                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1864            }
1865        }
1866        Ok(result)
1867    }
1868
1869    /// Returns all objects that exist in the parent store that pertain to this object store.
1870    /// Note that this doesn't include the object_id of the store itself which is generally
1871    /// referenced externally.
1872    pub fn parent_objects(&self) -> Vec<u64> {
1873        assert!(self.store_info_handle.get().is_some());
1874        self.store_info.lock().as_ref().unwrap().parent_objects()
1875    }
1876
1877    /// Returns root objects for this store.
1878    pub fn root_objects(&self) -> Vec<u64> {
1879        let mut objects = Vec::new();
1880        let store_info = self.store_info.lock();
1881        let info = store_info.as_ref().unwrap();
1882        if info.root_directory_object_id != INVALID_OBJECT_ID {
1883            objects.push(info.root_directory_object_id);
1884        }
1885        if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
1886            objects.push(info.graveyard_directory_object_id);
1887        }
1888        if info.internal_directory_object_id != INVALID_OBJECT_ID {
1889            objects.push(info.internal_directory_object_id);
1890        }
1891        objects
1892    }
1893
1894    pub fn store_info(&self) -> Option<StoreInfo> {
1895        self.store_info.lock().as_ref().cloned()
1896    }
1897
1898    /// Returns None if called during journal replay.
1899    pub fn store_info_handle_object_id(&self) -> Option<u64> {
1900        self.store_info_handle.get().map(|h| h.object_id())
1901    }
1902
1903    pub fn graveyard_count(&self) -> u64 {
1904        self.graveyard_entries.load(Ordering::Relaxed)
1905    }
1906
    /// Called to open a store, before replay of this store's mutations.
    ///
    /// Opens object `store_object_id` in `parent_store` as the store-info handle and loads its
    /// `StoreInfo`.
    ///
    /// A store is treated as encrypted when its store info has a mutations key. Its layer files
    /// cannot be opened yet, so only their sizes (plus a size derived from the
    /// encrypted-mutations file) are added up. The store is created `LockState::Locked` with a
    /// pending last object id, and the loaded `StoreInfo` is not passed to `ObjectStore::new`.
    ///
    /// For an unencrypted store, the layer files are opened and appended to the object tree, and
    /// the store is created `LockState::Unencrypted`.
    ///
    /// In both cases the object manager's reservation for this store is updated from the total
    /// layer size.
    ///
    /// # Errors
    /// Returns `FxfsError::Inconsistent` if the store info's `last_object_id` variant does not
    /// match the store's encryption state. Errors from opening or reading objects are
    /// propagated.
    async fn open(
        parent_store: &Arc<ObjectStore>,
        store_object_id: u64,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Result<Arc<ObjectStore>, Error> {
        let handle =
            ObjectStore::open_object(parent_store, store_object_id, HandleOptions::default(), None)
                .await?;

        let info = load_store_info(parent_store, store_object_id).await?;
        let is_encrypted = info.mutations_key.is_some();

        let mut total_layer_size = 0;
        let last_object_id;

        // TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.

        // If the store is encrypted, we can't open the object tree layers now, but we need to
        // compute the size of the layers.
        if is_encrypted {
            for &oid in &info.layers {
                total_layer_size += parent_store.get_file_size(oid).await?;
            }
            if info.encrypted_mutations_object_id != INVALID_OBJECT_ID {
                total_layer_size += layer_size_from_encrypted_mutations_size(
                    parent_store.get_file_size(info.encrypted_mutations_object_id).await?,
                );
            }
            // The real value is only known once the store is unlocked.
            last_object_id = LastObjectId::Pending;
            ensure!(
                matches!(
                    info.last_object_id,
                    LastObjectIdInfo::Encrypted { .. } | LastObjectIdInfo::Low32Bit { .. }
                ),
                FxfsError::Inconsistent
            );
        } else {
            last_object_id = match info.last_object_id {
                LastObjectIdInfo::Unencrypted { id } => LastObjectId::Unencrypted { id },
                LastObjectIdInfo::Low32Bit => {
                    LastObjectId::Low32Bit { reserved: HashSet::new(), unreserved: Vec::new() }
                }
                // An unencrypted store must not carry encrypted last-object-id info.
                _ => bail!(FxfsError::Inconsistent),
            };
        }

        let fs = parent_store.filesystem();

        let store = ObjectStore::new(
            Some(parent_store.clone()),
            store_object_id,
            fs.clone(),
            if is_encrypted { None } else { Some(info) },
            object_cache,
            None,
            if is_encrypted { LockState::Locked } else { LockState::Unencrypted },
            last_object_id,
        );

        // `store` was just created, so its store-info handle should not have been set yet.
        assert!(store.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");

        if !is_encrypted {
            let object_tree_layer_object_ids =
                store.store_info.lock().as_ref().unwrap().layers.clone();
            let object_layers = store.open_layers(object_tree_layer_object_ids, None).await?;
            total_layer_size = object_layers.iter().map(|h| h.get_size()).sum();
            store
                .tree
                .append_layers(object_layers)
                .await
                .context("Failed to read object store layers")?;
        }

        fs.object_manager().update_reservation(
            store_object_id,
            tree::reservation_amount_from_layer_size(total_layer_size),
        );

        Ok(store)
    }
1988
1989    async fn load_store_info(&self) -> Result<StoreInfo, Error> {
1990        load_store_info_from_handle(self.store_info_handle.get().unwrap()).await
1991    }
1992
1993    async fn open_layers(
1994        &self,
1995        object_ids: impl std::iter::IntoIterator<Item = u64>,
1996        crypt: Option<Arc<dyn Crypt>>,
1997    ) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
1998        let parent_store = self.parent_store.as_ref().unwrap();
1999        let mut handles = Vec::new();
2000        for object_id in object_ids {
2001            let handle = ObjectStore::open_object(
2002                &parent_store,
2003                object_id,
2004                HandleOptions::default(),
2005                crypt.clone(),
2006            )
2007            .await
2008            .with_context(|| format!("Failed to open layer file {}", object_id))?;
2009            handles.push(handle);
2010        }
2011        Ok(handles)
2012    }
2013
2014    /// Unlocks a store so that it is ready to be used.
2015    /// This is not thread-safe.
2016    pub async fn unlock(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
2017        self.unlock_inner(crypt, /*read_only=*/ false).await
2018    }
2019
2020    /// Unlocks a store so that it is ready to be read from.
2021    /// The store will generally behave like it is still locked: when flushed, the store will
2022    /// write out its mutations into the encrypted mutations file, rather than directly updating
2023    /// the layer files of the object store.
2024    /// Re-locking the store (which *must* be done with `Self::lock_read_only` will not trigger a
2025    /// flush, although the store might still be flushed during other operations.
2026    /// This is not thread-safe.
2027    pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
2028        self.unlock_inner(crypt, /*read_only=*/ true).await
2029    }
2030
2031    async fn unlock_inner(
2032        self: &Arc<Self>,
2033        crypt: Arc<dyn Crypt>,
2034        read_only: bool,
2035    ) -> Result<(), Error> {
2036        // Unless we are unlocking the store as read-only, the filesystem must not be read-only.
2037        assert!(read_only || !self.filesystem().options().read_only);
2038        match &*self.lock_state.lock() {
2039            LockState::Locked => {}
2040            LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
2041            LockState::Invalid | LockState::Deleted => bail!(FxfsError::Internal),
2042            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) => {
2043                bail!(FxfsError::AlreadyBound)
2044            }
2045            LockState::Unknown => panic!("Store was unlocked before replay"),
2046            LockState::Locking => panic!("Store is being locked"),
2047            LockState::Unlocking => panic!("Store is being unlocked"),
2048        }
2049        // We must lock flushing since that can modify store_info and the encrypted mutations file.
2050        let keys = lock_keys![LockKey::flush(self.store_object_id())];
2051        let fs = self.filesystem();
2052        let guard = fs.lock_manager().write_lock(keys).await;
2053
2054        let store_info = self.load_store_info().await?;
2055
2056        self.tree
2057            .append_layers(
2058                self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
2059            )
2060            .await
2061            .context("Failed to read object tree layer file contents")?;
2062
2063        let wrapped_key =
2064            fxfs_crypto::WrappedKey::Fxfs(store_info.mutations_key.clone().unwrap().into());
2065        let unwrapped_key = crypt
2066            .unwrap_key(&wrapped_key, self.store_object_id)
2067            .await
2068            .context("Failed to unwrap mutations keys")?;
2069        // The ChaCha20 stream cipher we use supports up to 64 GiB.  By default we'll roll the key
2070        // after every 128 MiB.  Here we just need to pick a number that won't cause issues if it
2071        // wraps, so we just use u32::MAX (the offset is u64).
2072        ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
2073        let mut mutations_cipher =
2074            StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);
2075
2076        match &store_info.last_object_id {
2077            LastObjectIdInfo::Encrypted { id, key } => {
2078                let wrapped_key = fxfs_crypto::WrappedKey::Fxfs(key.clone().into());
2079                *self.last_object_id.lock() = LastObjectId::Encrypted {
2080                    id: *id,
2081                    cipher: Box::new(Ff1::new(
2082                        &crypt.unwrap_key(&wrapped_key, self.store_object_id).await?,
2083                    )),
2084                };
2085            }
2086            LastObjectIdInfo::Low32Bit => {
2087                *self.last_object_id.lock() = LastObjectId::Low32Bit {
2088                    reserved: Default::default(),
2089                    unreserved: Default::default(),
2090                }
2091            }
2092            _ => unreachable!(),
2093        }
2094
2095        // Apply the encrypted mutations.
2096        let mut mutations = {
2097            if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
2098                EncryptedMutations::default()
2099            } else {
2100                let parent_store = self.parent_store.as_ref().unwrap();
2101                let handle = ObjectStore::open_object(
2102                    &parent_store,
2103                    store_info.encrypted_mutations_object_id,
2104                    HandleOptions::default(),
2105                    None,
2106                )
2107                .await?;
2108                let mut cursor = std::io::Cursor::new(
2109                    handle
2110                        .contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
2111                        .await
2112                        .context(FxfsError::Inconsistent)?,
2113                );
2114                let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
2115                    .context("Failed to deserialize EncryptedMutations")?
2116                    .0;
2117                let len = cursor.get_ref().len() as u64;
2118                while cursor.position() < len {
2119                    mutations.extend(
2120                        &EncryptedMutations::deserialize_with_version(&mut cursor)
2121                            .context("Failed to deserialize EncryptedMutations")?
2122                            .0,
2123                    );
2124                }
2125                mutations
2126            }
2127        };
2128
2129        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
2130        let journaled = EncryptedMutations::from_replayed_mutations(
2131            self.store_object_id,
2132            fs.journal()
2133                .read_transactions_for_object(self.store_object_id)
2134                .await
2135                .context("Failed to read encrypted mutations from journal")?,
2136        );
2137        mutations.extend(&journaled);
2138
2139        let _ = std::mem::replace(&mut *self.lock_state.lock(), LockState::Unlocking);
2140        *self.store_info.lock() = Some(store_info);
2141
2142        let clean_up = scopeguard::guard((), |_| {
2143            *self.lock_state.lock() = LockState::Locked;
2144            *self.store_info.lock() = None;
2145            // Make sure we don't leave unencrypted data lying around in memory.
2146            self.tree.reset();
2147        });
2148
2149        let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;
2150
2151        let mut slice = &mut data[..];
2152        let mut last_offset = 0;
2153        for (offset, key) in mutations_key_roll {
2154            let split_offset = offset
2155                .checked_sub(last_offset)
2156                .ok_or(FxfsError::Inconsistent)
2157                .context("Invalid mutation key roll offset")?;
2158            last_offset = offset;
2159            ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
2160            let (old, new) = slice.split_at_mut(split_offset);
2161            mutations_cipher.decrypt(old);
2162            let unwrapped_key = crypt
2163                .unwrap_key(&fxfs_crypto::WrappedKey::Fxfs(key.into()), self.store_object_id)
2164                .await
2165                .context("Failed to unwrap mutations keys")?;
2166            mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
2167            slice = new;
2168        }
2169        mutations_cipher.decrypt(slice);
2170
2171        // Always roll the mutations key when we unlock which guarantees we won't reuse a
2172        // previous key and nonce.
2173        self.roll_mutations_key(crypt.as_ref()).await?;
2174
2175        let mut cursor = std::io::Cursor::new(data);
2176        for (checkpoint, count) in transactions {
2177            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
2178            for _ in 0..count {
2179                let mutation =
2180                    Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
2181                        .context("failed to deserialize encrypted mutation")?;
2182                self.apply_mutation(mutation, &context, AssocObj::None)
2183                    .context("failed to apply encrypted mutation")?;
2184            }
2185        }
2186
2187        *self.lock_state.lock() = if read_only {
2188            LockState::UnlockedReadOnly(crypt)
2189        } else {
2190            LockState::Unlocked { crypt, cached_keys: Vec::new() }
2191        };
2192
2193        // To avoid unbounded memory growth, we should flush the encrypted mutations now. Otherwise
2194        // it's possible for more writes to be queued and for the store to be locked before we can
2195        // flush anything and that can repeat.
2196        std::mem::drop(guard);
2197
2198        if !read_only && !self.filesystem().options().read_only {
2199            // We must populate the cache before calling flush, because flush itself does not top up
2200            // the cache (it avoids calling crypt).
2201            self.pre_cache_keys().await?;
2202            self.flush_with_reason(flush::Reason::Unlock).await?;
2203
2204            // Reap purged files within this store.
2205            let _ = self.filesystem().graveyard().initial_reap(&self).await?;
2206        }
2207
2208        // Return and cancel the clean up.
2209        Ok(ScopeGuard::into_inner(clean_up))
2210    }
2211
2212    pub fn is_locked(&self) -> bool {
2213        matches!(
2214            *self.lock_state.lock(),
2215            LockState::Locked | LockState::Locking | LockState::Unknown
2216        )
2217    }
2218
2219    /// NB: This is not the converse of `is_locked`, as there are lock states where neither are
2220    /// true.
2221    pub fn is_unlocked(&self) -> bool {
2222        matches!(
2223            *self.lock_state.lock(),
2224            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) | LockState::Unlocking
2225        )
2226    }
2227
2228    pub fn is_unknown(&self) -> bool {
2229        matches!(*self.lock_state.lock(), LockState::Unknown)
2230    }
2231
2232    pub fn is_encrypted(&self) -> bool {
2233        self.store_info.lock().as_ref().unwrap().mutations_key.is_some()
2234    }
2235
    /// Locks a store.
    ///
    /// This operation will take a flush lock on the store, in case any flushes are ongoing.  Any
    /// ongoing store accesses might be interrupted by this.  See `Self::crypt`.
    ///
    /// Whilst this can return an error, the store will be placed into an unusable but safe state
    /// (i.e. no lingering unencrypted data) if an error is encountered.
    ///
    /// # Panics
    ///
    /// Panics if the store is not in the `LockState::Unlocked` state.  A store that was unlocked
    /// read-only must be locked with `Self::lock_read_only` instead.
    ///
    /// # Errors
    ///
    /// Returns the error from syncing the journal.  In that case the store is left in
    /// `LockState::Invalid` rather than `LockState::Locked`.
    pub async fn lock(&self) -> Result<(), Error> {
        // We must lock flushing since it is not safe for that to be happening whilst we are locking
        // the store.
        let keys = lock_keys![LockKey::flush(self.store_object_id())];
        let fs = self.filesystem();
        let _guard = fs.lock_manager().write_lock(keys).await;

        {
            // Transition Unlocked -> Locking under the state mutex.  The enclosing block drops the
            // mutex guard before the journal sync below is awaited.
            let mut lock_state = self.lock_state.lock();
            if let LockState::Unlocked { .. } = &*lock_state {
                *lock_state = LockState::Locking;
            } else {
                panic!("Unexpected lock state: {:?}", *lock_state);
            }
        }

        // Sync the journal now to ensure that any buffered mutations for this store make it out to
        // disk.  This is necessary to be able to unlock the store again.
        // We need to establish a barrier at this point (so that the journaled writes are observable
        // by any future attempts to unlock the store), hence the flush_device.
        let sync_result =
            self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;

        // A failed sync leaves the store permanently unusable (Invalid) rather than Locked.
        *self.lock_state.lock() = if let Err(error) = &sync_result {
            error!(error:?; "Failed to sync journal; store will no longer be usable");
            LockState::Invalid
        } else {
            LockState::Locked
        };
        // Regardless of the sync outcome, discard cached keys and in-memory store state so that
        // nothing decrypted lingers once the store is locked.
        self.key_manager.clear();
        *self.store_info.lock() = None;
        self.tree.reset();

        sync_result
    }
2276
2277    // Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`).  Data
2278    // is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
2279    // and will be replayed next time the store is unlocked.
2280    pub fn lock_read_only(&self) {
2281        *self.lock_state.lock() = LockState::Locked;
2282        *self.store_info.lock() = None;
2283        self.tree.reset();
2284    }
2285
2286    // Returns None if the object ID cipher needs to be created or rolled, or a more expensive
2287    // algorithm needs to be used.
2288    fn maybe_get_next_object_id(&self) -> Option<ReservedId<'_>> {
2289        self.last_object_id.lock().try_get_next().map(|id| ReservedId::new(self, id))
2290    }
2291
    /// Returns a new object ID that can be used.  This will create an object ID cipher if needed.
    ///
    /// If the object ID key needs to be rolled, a new transaction will be created and committed.
    ///
    /// For `LastObjectId::Low32Bit` stores, IDs are instead picked at random until one is found
    /// that is neither reserved nor present in the tree.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::Inconsistent` if an `Unencrypted` store cannot supply an ID from the
    /// fast path, or if advancing the high part of an encrypted object ID would overflow.
    ///
    /// # Panics
    ///
    /// Panics if the store has no parent store, or (on the key-rolling path) no crypt instance or
    /// no loaded store info.
    pub(super) async fn get_next_object_id(&self) -> Result<ReservedId<'_>, Error> {
        {
            // Fast path: hand out an ID without taking any transaction locks.
            let mut last_object_id = self.last_object_id.lock();
            if let Some(id) = last_object_id.try_get_next() {
                return Ok(ReservedId::new(self, id));
            }
            // Only Encrypted and Low32Bit stores have a slow path (see the match below), so an
            // Unencrypted store failing the fast path is treated as an inconsistency.
            ensure!(
                !matches!(&*last_object_id, LastObjectId::Unencrypted { .. }),
                FxfsError::Inconsistent
            );
        }

        let parent_store = self.parent_store().unwrap();

        // Create a transaction (which has a lock) and then check again.
        //
        // NOTE: Since this can be a nested transaction, we must take care to avoid deadlocks; no
        // more locks should be taken whilst we hold this lock.
        let mut transaction = parent_store
            .new_transaction(
                lock_keys![LockKey::object(parent_store.store_object_id, self.store_object_id)],
                Options {
                    // We must skip journal checks because this transaction might be needed to
                    // compact.
                    skip_journal_checks: true,
                    borrow_metadata_space: true,
                    ..Default::default()
                },
            )
            .await?;

        // High part of the first object ID minted with the new cipher; only assigned on the
        // encrypted (key-rolling) path.
        let mut next_id_hi = 0;

        let is_low_32_bit = {
            let mut last_object_id = self.last_object_id.lock();
            if let Some(id) = last_object_id.try_get_next() {
                // Something else raced and created/rolled the cipher.
                return Ok(ReservedId::new(self, id));
            }

            match &*last_object_id {
                LastObjectId::Encrypted { id, .. } => {
                    // It shouldn't be possible for last_object_id to wrap within our lifetime, so
                    // if this happens, it's most likely due to corruption.
                    next_id_hi =
                        id.checked_add(1 << 32).ok_or(FxfsError::Inconsistent)? & OBJECT_ID_HI_MASK;

                    info!(store_id = self.store_object_id; "Rolling object ID key");

                    false
                }
                LastObjectId::Low32Bit { .. } => true,
                _ => unreachable!(),
            }
        };

        if is_low_32_bit {
            // Keep picking an object ID at random until we find one free.

            // To avoid races, this must be before we capture the layer set.
            self.last_object_id.lock().drain_unreserved();

            let layer_set = self.tree.layer_set();
            let mut key = ObjectKey::object(0);
            loop {
                let next_id = rand::rng().next_u32() as u64;
                // Zero is never handed out as an object ID; draw again.
                let Some(next_id) = NonZero::new(next_id) else { continue };
                // Skip IDs that have been handed out but not yet inserted into the tree.
                if self.last_object_id.lock().is_reserved(next_id.get()) {
                    continue;
                }
                key.object_id = next_id.get();
                if layer_set.key_exists(&key).await? == Existence::Missing {
                    self.last_object_id.lock().reserve(next_id.get());
                    return Ok(ReservedId::new(self, next_id));
                }
            }
        } else {
            // Create a key.
            let (object_id_wrapped, object_id_unwrapped) = self
                .crypt()
                .unwrap()
                .create_key(self.store_object_id, KeyPurpose::Metadata)
                .await?;

            // Normally we would use a mutation to note the updated key, but that would complicate
            // replay.  During replay, we need to keep track of the highest used object ID and this
            // is done by watching mutations to see when we create objects, and then decrypting
            // the object ID.  This relies on the unwrapped key being available, so as soon as
            // we detect the key has changed, we would need to immediately unwrap the key via the
            // crypt service.  Currently, this isn't easy to do during replay.  An option we could
            // consider would be to include the unencrypted object ID when we create objects, which
            // would avoid us having to decrypt the object ID during replay.
            //
            // For now and for historical reasons, the approach we take is to just write a new
            // version of StoreInfo here.  We must take care that we only update the key and not any
            // other information contained within StoreInfo because other information should only be
            // updated when we flush.  We are holding the lock on the StoreInfo file, so this will
            // prevent potential races with flushing.  To make sure we only change the key, we read
            // StoreInfo from storage rather than using our in-memory copy.  This won't be
            // performant, but rolling the object ID key will be extremely rare.
            let new_store_info = StoreInfo {
                last_object_id: LastObjectIdInfo::Encrypted {
                    id: next_id_hi,
                    key: object_id_wrapped.clone(),
                },
                ..self.load_store_info().await?
            };

            self.write_store_info(&mut transaction, &new_store_info).await?;

            // The callback passed to `commit_with_callback` publishes the new key info in memory,
            // installs the new cipher, and reserves the first ID it produces (the encryption of 0
            // combined with the new high part).
            transaction
                .commit_with_callback(|_| {
                    self.store_info.lock().as_mut().unwrap().last_object_id =
                        new_store_info.last_object_id;
                    match &mut *self.last_object_id.lock() {
                        LastObjectId::Encrypted { id, cipher } => {
                            **cipher = Ff1::new(&object_id_unwrapped);
                            *id = next_id_hi;
                            ReservedId::new(
                                self,
                                NonZero::new(next_id_hi | cipher.encrypt(0) as u64).unwrap(),
                            )
                        }
                        _ => unreachable!(),
                    }
                })
                .await
        }
    }
2424
2425    /// Query the next object ID that will be used. Intended for use when checking filesystem
2426    /// consistency. Prefer [`Self::get_next_object_id()`] for general use.
2427    pub(crate) fn query_next_object_id(&self) -> u64 {
2428        self.last_object_id.lock().peek_next()
2429    }
2430
2431    fn allocator(&self) -> Arc<Allocator> {
2432        self.filesystem().allocator()
2433    }
2434
2435    // If |transaction| has an impending mutation for the underlying object, returns that.
2436    // Otherwise, looks up the object from the tree and returns a suitable mutation for it.  The
2437    // mutation is returned here rather than the item because the mutation includes the operation
2438    // which has significance: inserting an object implies it's the first of its kind unlike
2439    // replacing an object.
2440    async fn txn_get_object_mutation(
2441        &self,
2442        transaction: &Transaction<'_>,
2443        object_id: u64,
2444    ) -> Result<ObjectStoreMutation, Error> {
2445        if let Some(mutation) =
2446            transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
2447        {
2448            Ok(mutation.clone())
2449        } else {
2450            Ok(ObjectStoreMutation {
2451                item: self
2452                    .tree
2453                    .find(&ObjectKey::object(object_id))
2454                    .await?
2455                    .ok_or(FxfsError::Inconsistent)
2456                    .context("Object id missing")?,
2457                op: Operation::ReplaceOrInsert,
2458            })
2459        }
2460    }
2461
2462    /// Like txn_get_object_mutation but with expanded visibility.
2463    /// Only available in migration code.
2464    #[cfg(feature = "migration")]
2465    pub async fn get_object_mutation(
2466        &self,
2467        transaction: &Transaction<'_>,
2468        object_id: u64,
2469    ) -> Result<ObjectStoreMutation, Error> {
2470        self.txn_get_object_mutation(transaction, object_id).await
2471    }
2472
2473    fn update_last_object_id(&self, object_id: u64) {
2474        let mut last_object_id = self.last_object_id.lock();
2475        match &mut *last_object_id {
2476            LastObjectId::Pending => unreachable!(),
2477            LastObjectId::Unencrypted { id } => {
2478                if object_id > *id {
2479                    *id = object_id
2480                }
2481            }
2482            LastObjectId::Encrypted { id, cipher } => {
2483                // For encrypted stores, object_id will be encrypted here, so we must decrypt first.
2484
2485                // If the object ID cipher has been rolled, then it's possible we might see object
2486                // IDs that were generated using a different cipher so the decrypt here will return
2487                // the wrong value, but that won't matter because the hi part of the object ID
2488                // should still discriminate.
2489                let object_id =
2490                    object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
2491                if object_id > *id {
2492                    *id = object_id;
2493                }
2494            }
2495            LastObjectId::Low32Bit { .. } => {}
2496        }
2497    }
2498
2499    /// If possible, converts the given object ID to its unencrypted value.  Returns None if it is
2500    /// not possible to convert to its unencrypted value because the key is unavailable.
2501    pub fn to_unencrypted_object_id(&self, object_id: u64) -> Option<u64> {
2502        let last_object_id = self.last_object_id.lock();
2503        match &*last_object_id {
2504            LastObjectId::Pending => None,
2505            LastObjectId::Unencrypted { .. } | LastObjectId::Low32Bit { .. } => Some(object_id),
2506            LastObjectId::Encrypted { id, cipher } => {
2507                if id & OBJECT_ID_HI_MASK != object_id & OBJECT_ID_HI_MASK {
2508                    None
2509                } else {
2510                    Some(object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64)
2511                }
2512            }
2513        }
2514    }
2515
2516    /// Adds the specified object to the graveyard.
2517    pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2518        let graveyard_id = self.graveyard_directory_object_id();
2519        assert_ne!(graveyard_id, INVALID_OBJECT_ID);
2520        transaction.add(
2521            self.store_object_id,
2522            Mutation::replace_or_insert_object(
2523                ObjectKey::graveyard_entry(graveyard_id, object_id),
2524                ObjectValue::Some,
2525            ),
2526        );
2527    }
2528
2529    /// Removes the specified object from the graveyard.  NB: Care should be taken when calling
2530    /// this because graveyard entries are used for purging deleted files *and* for trimming
2531    /// extents.  For example, consider the following sequence:
2532    ///
2533    ///     1. Add Trim graveyard entry.
2534    ///     2. Replace with Some graveyard entry (see above).
2535    ///     3. Remove graveyard entry.
2536    ///
2537    /// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
2538    /// actually be:
2539    ///
2540    ///     3. Replace with Trim graveyard entry.
2541    pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2542        transaction.add(
2543            self.store_object_id,
2544            Mutation::replace_or_insert_object(
2545                ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
2546                ObjectValue::None,
2547            ),
2548        );
2549    }
2550
2551    /// Removes the specified attribute from the graveyard. Unlike object graveyard entries,
2552    /// attribute graveyard entries only have one functionality (i.e. to purge deleted attributes)
2553    /// so the caller does not need to be concerned about replacing the graveyard attribute entry
2554    /// with its prior state when cancelling it. See comment on `remove_from_graveyard()`.
2555    pub fn remove_attribute_from_graveyard(
2556        &self,
2557        transaction: &mut Transaction<'_>,
2558        object_id: u64,
2559        attribute_id: AttributeId,
2560    ) {
2561        transaction.add(
2562            self.store_object_id,
2563            Mutation::replace_or_insert_object(
2564                ObjectKey::graveyard_attribute_entry(
2565                    self.graveyard_directory_object_id(),
2566                    object_id,
2567                    attribute_id,
2568                ),
2569                ObjectValue::None,
2570            ),
2571        );
2572    }
2573
2574    fn needs_mutations_key_roll(&self) -> bool {
2575        self.mutations_cipher.lock().as_ref().is_some_and(|cipher| {
2576            cipher.offset() >= self.filesystem().options().roll_metadata_key_byte_count
2577        })
2578    }
2579
2580    // Roll the mutations key.  The new key will be written for the next encrypted mutation.
2581    async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
2582        let (wrapped_key, unwrapped_key) =
2583            crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
2584
2585        // The mutations_cipher lock must be held for the duration so that mutations_cipher and
2586        // store_info are updated atomically.  Otherwise, write_mutation could find a new cipher but
2587        // end up writing the wrong wrapped key.
2588        let mut cipher = self.mutations_cipher.lock();
2589        *cipher = Some(StreamCipher::new(&unwrapped_key, 0));
2590        self.store_info.lock().as_mut().unwrap().mutations_key = Some(wrapped_key);
2591        // mutations_cipher_offset is updated by flush.
2592        Ok(())
2593    }
2594
2595    // When the symlink is unlocked, this function decrypts `link` and returns a bag of bytes that
2596    // is identical to that which was passed in as the target on `create_symlink`.
2597    // If the symlink is locked, this function hashes the encrypted `link` with Sha256 in order to
2598    // get a standard length and then base64 encodes the hash and returns that to the caller.
2599    pub async fn read_encrypted_symlink(
2600        &self,
2601        object_id: u64,
2602        link: Vec<u8>,
2603    ) -> Result<Vec<u8>, Error> {
2604        let mut link = link;
2605        let key = self
2606            .key_manager()
2607            .get_fscrypt_key(object_id, self.crypt().unwrap().as_ref(), async || {
2608                self.get_keys(object_id).await
2609            })
2610            .await?;
2611        if let Some(key) = key.into_cipher() {
2612            key.decrypt_symlink(object_id, &mut link)?;
2613            Ok(link)
2614        } else {
2615            // Locked symlinks are encoded using a hash_code of 0.
2616            let proxy_filename =
2617                fscrypt::proxy_filename::ProxyFilename::new_with_hash_code(0, &link);
2618            let proxy_filename_str: String = proxy_filename.into();
2619            Ok(proxy_filename_str.into_bytes())
2620        }
2621    }
2622
2623    /// Returns the link of a symlink object.
2624    pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
2625        match self.tree.find(&ObjectKey::object(object_id)).await? {
2626            None => bail!(FxfsError::NotFound),
2627            Some(Item {
2628                value: ObjectValue::Object { kind: ObjectKind::EncryptedSymlink { link, .. }, .. },
2629                ..
2630            }) => self.read_encrypted_symlink(object_id, link.to_vec()).await,
2631            Some(Item {
2632                value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
2633                ..
2634            }) => Ok(link.to_vec()),
2635            Some(item) => Err(anyhow!(FxfsError::Inconsistent)
2636                .context(format!("Unexpected item in lookup: {item:?}"))),
2637        }
2638    }
2639
2640    /// Retrieves the wrapped keys for the given object.  The keys *should* be known to exist and it
2641    /// will be considered an inconsistency if they don't.
2642    pub async fn get_keys(&self, object_id: u64) -> Result<EncryptionKeys, Error> {
2643        match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::Inconsistent)? {
2644            Item { value: ObjectValue::Keys(keys), .. } => Ok(keys),
2645            _ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
2646        }
2647    }
2648
2649    pub async fn update_attributes<'a>(
2650        &self,
2651        transaction: &mut Transaction<'a>,
2652        object_id: u64,
2653        node_attributes: Option<&fio::MutableNodeAttributes>,
2654        change_time: Option<Timestamp>,
2655    ) -> Result<(), Error> {
2656        if change_time.is_none() {
2657            if let Some(attributes) = node_attributes {
2658                let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
2659                if *attributes == empty_attributes {
2660                    return Ok(());
2661                }
2662            } else {
2663                return Ok(());
2664            }
2665        }
2666        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
2667        if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
2668            if let Some(time) = change_time {
2669                attributes.change_time = time;
2670            }
2671            if let Some(node_attributes) = node_attributes {
2672                if let Some(time) = node_attributes.creation_time {
2673                    attributes.creation_time = Timestamp::from_nanos(time);
2674                }
2675                if let Some(time) = node_attributes.modification_time {
2676                    attributes.modification_time = Timestamp::from_nanos(time);
2677                }
2678                if let Some(time) = node_attributes.access_time {
2679                    attributes.access_time = Timestamp::from_nanos(time);
2680                }
2681                if node_attributes.mode.is_some()
2682                    || node_attributes.uid.is_some()
2683                    || node_attributes.gid.is_some()
2684                    || node_attributes.rdev.is_some()
2685                {
2686                    if let Some(a) = &mut attributes.posix_attributes {
2687                        if let Some(mode) = node_attributes.mode {
2688                            a.mode = mode;
2689                        }
2690                        if let Some(uid) = node_attributes.uid {
2691                            a.uid = uid;
2692                        }
2693                        if let Some(gid) = node_attributes.gid {
2694                            a.gid = gid;
2695                        }
2696                        if let Some(rdev) = node_attributes.rdev {
2697                            a.rdev = rdev;
2698                        }
2699                    } else {
2700                        attributes.posix_attributes = Some(PosixAttributes {
2701                            mode: node_attributes.mode.unwrap_or_default(),
2702                            uid: node_attributes.uid.unwrap_or_default(),
2703                            gid: node_attributes.gid.unwrap_or_default(),
2704                            rdev: node_attributes.rdev.unwrap_or_default(),
2705                        });
2706                    }
2707                }
2708            }
2709        } else {
2710            bail!(
2711                anyhow!(FxfsError::Inconsistent)
2712                    .context("ObjectStore.update_attributes: Expected object value")
2713            );
2714        };
2715        transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
2716        Ok(())
2717    }
2718
2719    // Updates and commits the changes to access time in ObjectProperties. The update matches
2720    // Linux's RELATIME. That is, access time is updated to the current time if access time is less
2721    // than or equal to the last modification or status change, or if it has been more than a day
2722    // since the last access.  `precondition` is a condition to be checked *after* taking the lock
2723    // on the object.  If `precondition` returns false, no update will be performed.
2724    pub async fn update_access_time(
2725        &self,
2726        object_id: u64,
2727        props: &mut ObjectProperties,
2728        precondition: impl FnOnce() -> bool,
2729    ) -> Result<(), Error> {
2730        let access_time = props.access_time.as_nanos();
2731        let modification_time = props.modification_time.as_nanos();
2732        let change_time = props.change_time.as_nanos();
2733        let now = Timestamp::now();
2734        if access_time <= modification_time
2735            || access_time <= change_time
2736            || access_time
2737                < now.as_nanos()
2738                    - Timestamp::from(std::time::Duration::from_secs(24 * 60 * 60)).as_nanos()
2739        {
2740            let mut transaction = self
2741                .new_transaction(
2742                    lock_keys![LockKey::object(self.store_object_id, object_id,)],
2743                    Options { borrow_metadata_space: true, ..Default::default() },
2744                )
2745                .await?;
2746            if precondition() {
2747                self.update_attributes(
2748                    &mut transaction,
2749                    object_id,
2750                    Some(&fio::MutableNodeAttributes {
2751                        access_time: Some(now.as_nanos()),
2752                        ..Default::default()
2753                    }),
2754                    None,
2755                )
2756                .await?;
2757                transaction.commit().await?;
2758                props.access_time = now;
2759            }
2760        }
2761        Ok(())
2762    }
2763
2764    async fn write_store_info<'a>(
2765        &'a self,
2766        transaction: &mut Transaction<'a>,
2767        info: &StoreInfo,
2768    ) -> Result<(), Error> {
2769        let mut serialized_info = Vec::new();
2770        info.serialize_with_version(&mut serialized_info)?;
2771        let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
2772        buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
2773        self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
2774    }
2775
2776    pub fn mark_deleted(&self) {
2777        *self.lock_state.lock() = LockState::Deleted;
2778    }
2779
2780    #[cfg(test)]
2781    pub(crate) fn test_set_last_object_id(&self, object_id: u64) {
2782        match &mut *self.last_object_id.lock() {
2783            LastObjectId::Encrypted { id, .. } => *id = object_id,
2784            _ => unreachable!(),
2785        }
2786    }
2787
2788    /// Looks up the size of the attribute. Returns an error if either the object or attribute
2789    /// doesn't exist.
2790    pub async fn get_attribute_size(
2791        &self,
2792        object_id: u64,
2793        attribute_id: AttributeId,
2794    ) -> Result<u64, Error> {
2795        let item = self
2796            .tree
2797            .find(&ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute))
2798            .await?
2799            .ok_or(FxfsError::NotFound)?;
2800        let size = match item.value {
2801            ObjectValue::Attribute { size, .. } => size,
2802            ObjectValue::VerifiedAttribute { size, .. } => size,
2803            _ => bail!(FxfsError::Inconsistent),
2804        };
2805        Ok(size)
2806    }
2807}
2808
2809#[async_trait]
2810impl JournalingObject for ObjectStore {
2811    fn apply_mutation(
2812        &self,
2813        mutation: Mutation,
2814        context: &ApplyContext<'_, '_>,
2815        _assoc_obj: AssocObj<'_>,
2816    ) -> Result<(), Error> {
2817        match &*self.lock_state.lock() {
2818            LockState::Locked | LockState::Locking => {
2819                ensure!(
2820                    matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
2821                        || matches!(
2822                            mutation,
2823                            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_)
2824                                if context.mode.is_replay()
2825                        ),
2826                    anyhow!(FxfsError::Inconsistent)
2827                        .context(format!("Unexpected mutation for encrypted store: {mutation:?}"))
2828                );
2829            }
2830            LockState::Invalid
2831            | LockState::Unlocking
2832            | LockState::Unencrypted
2833            | LockState::Unlocked { .. }
2834            | LockState::UnlockedReadOnly(..)
2835            | LockState::Deleted => {}
2836            lock_state @ _ => panic!("Unexpected lock state: {lock_state:?}"),
2837        }
2838        match mutation {
2839            Mutation::ObjectStore(ObjectStoreMutation { item, op }) => {
2840                match op {
2841                    Operation::Insert => {
2842                        let mut unreserve_id = INVALID_OBJECT_ID;
2843                        // If we are inserting an object record for the first time, it signifies the
2844                        // birth of the object so we need to adjust the object count.
2845                        if matches!(item.value, ObjectValue::Object { .. }) {
2846                            {
2847                                let info = &mut self.store_info.lock();
2848                                let object_count = &mut info.as_mut().unwrap().object_count;
2849                                *object_count = object_count.saturating_add(1);
2850                            }
2851                            if context.mode.is_replay() {
2852                                self.update_last_object_id(item.key.object_id);
2853                            } else {
2854                                unreserve_id = item.key.object_id;
2855                            }
2856                        } else if !context.mode.is_replay()
2857                            && matches!(
2858                                item.key.data,
2859                                ObjectKeyData::GraveyardEntry { .. }
2860                                    | ObjectKeyData::GraveyardAttributeEntry { .. }
2861                            )
2862                        {
2863                            if matches!(item.value, ObjectValue::Some | ObjectValue::Trim) {
2864                                self.graveyard_entries.fetch_add(1, Ordering::Relaxed);
2865                            } else if matches!(item.value, ObjectValue::None) {
2866                                self.graveyard_entries.fetch_sub(1, Ordering::Relaxed);
2867                            }
2868                        }
2869                        self.tree.insert(item)?;
2870                        if unreserve_id != INVALID_OBJECT_ID {
2871                            // To avoid races, this *must* be after the `tree.insert(..)` above.
2872                            self.last_object_id.lock().unreserve(unreserve_id);
2873                        }
2874                    }
2875                    Operation::ReplaceOrInsert => {
2876                        if !context.mode.is_replay()
2877                            && matches!(
2878                                item.key.data,
2879                                ObjectKeyData::GraveyardEntry { .. }
2880                                    | ObjectKeyData::GraveyardAttributeEntry { .. }
2881                            )
2882                        {
2883                            if matches!(item.value, ObjectValue::Some | ObjectValue::Trim) {
2884                                self.graveyard_entries.fetch_add(1, Ordering::Relaxed);
2885                            } else if matches!(item.value, ObjectValue::None) {
2886                                self.graveyard_entries.fetch_sub(1, Ordering::Relaxed);
2887                            }
2888                        }
2889                        self.tree.replace_or_insert(item);
2890                    }
2891                    Operation::Merge => {
2892                        if item.is_tombstone() {
2893                            let info = &mut self.store_info.lock();
2894                            let object_count = &mut info.as_mut().unwrap().object_count;
2895                            *object_count = object_count.saturating_sub(1);
2896                        }
2897                        if !context.mode.is_replay()
2898                            && matches!(
2899                                item.key.data,
2900                                ObjectKeyData::GraveyardEntry { .. }
2901                                    | ObjectKeyData::GraveyardAttributeEntry { .. }
2902                            )
2903                        {
2904                            if matches!(item.value, ObjectValue::Some | ObjectValue::Trim) {
2905                                self.graveyard_entries.fetch_add(1, Ordering::Relaxed);
2906                            } else if matches!(item.value, ObjectValue::None) {
2907                                self.graveyard_entries.fetch_sub(1, Ordering::Relaxed);
2908                            }
2909                        }
2910                        let lower_bound = item.key.key_for_merge_into();
2911                        self.tree.merge_into(item, &lower_bound);
2912                    }
2913                }
2914            }
2915            Mutation::BeginFlush => {
2916                ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
2917                self.tree.seal();
2918            }
2919            Mutation::EndFlush => ensure!(self.parent_store.is_some(), FxfsError::Inconsistent),
2920            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2921                // We will process these during Self::unlock.
2922                ensure!(
2923                    !matches!(&*self.lock_state.lock(), LockState::Unencrypted),
2924                    FxfsError::Inconsistent
2925                );
2926            }
2927            Mutation::CreateInternalDir(object_id) => {
2928                ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
2929                self.store_info.lock().as_mut().unwrap().internal_directory_object_id = object_id;
2930            }
2931            _ => bail!("unexpected mutation: {:?}", mutation),
2932        }
2933        self.counters.lock().mutations_applied += 1;
2934        Ok(())
2935    }
2936
2937    fn drop_mutation(&self, mutation: Mutation, _transaction: &Transaction<'_>) {
2938        self.counters.lock().mutations_dropped += 1;
2939        if let Mutation::ObjectStore(ObjectStoreMutation {
2940            item: Item { key: ObjectKey { object_id, .. }, value: ObjectValue::Object { .. }, .. },
2941            op: Operation::Insert,
2942        }) = mutation
2943        {
2944            self.last_object_id.lock().unreserve(object_id);
2945        }
2946    }
2947
2948    async fn prepare_commit<'a>(
2949        &self,
2950        filesystem: &'a FxFilesystem,
2951        _transaction: &Transaction<'_>,
2952    ) -> Result<Option<WriteGuard<'a>>, Error> {
2953        // Short circuit check to see if this is an encrypted store.
2954        if !matches!(&*self.lock_state.lock(), LockState::Unlocked { .. }) {
2955            return Ok(None);
2956        }
2957
2958        // We must acquire the keys lock before we can access or modify `cached_keys`.  This guard
2959        // is returned and held until the transaction commits, ensuring that the keys we cache (or
2960        // existing keys) remain valid and are not interfered with by other transactions.
2961        let keys = lock_keys![LockKey::pre_cache_keys(self.store_object_id())];
2962        let guard = filesystem.lock_manager().write_lock(keys).await;
2963
2964        self.pre_cache_keys().await?;
2965
2966        Ok(Some(guard))
2967    }
2968
2969    /// Push all in-memory structures to the device. This is not necessary for sync since the
2970    /// journal will take care of it.  This is supposed to be called when there is either memory or
2971    /// space pressure (flushing the store will persist in-memory data and allow the journal file to
2972    /// be trimmed).
2973    ///
2974    /// Also returns the earliest version of a struct in the filesystem (when known).
2975    async fn flush(&self) -> Result<Version, Error> {
2976        self.flush_with_reason(flush::Reason::Journal).await
2977    }
2978
    /// Writes `mutation` to the journal through `writer`.
    ///
    /// When this store has a mutations cipher, `ObjectStore` and `CreateInternalDir` mutations
    /// are serialized, encrypted and written as a single `Mutation::EncryptedObjectStore`. If the
    /// cipher has not been used yet (`offset() == 0`), an `UpdateMutationsKey` mutation carrying
    /// the store's mutations key is written first. Every other mutation, and every mutation when
    /// there is no cipher, is written unchanged as a clone.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        // Intentionally enumerating all variants to force a decision on any new variants. Encrypt
        // all mutations that could affect an encrypted object store's contents or the `StoreInfo`
        // of the encrypted object store. During `unlock()` any mutations which haven't been
        // encrypted won't be replayed after reading `StoreInfo`.
        match mutation {
            // Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted, we
            // still choose to encrypt the mutation because it makes it easier to deal with replay.
            // When we replay mutations for an encrypted store, the only things we keep in memory
            // are the encrypted mutations; we don't keep `StoreInfo` or changes to it in memory.
            // So, by encrypting the CreateInternalDir mutation here, we don't have to track both
            // encrypted mutations bound for the LSM tree and unencrypted mutations for `StoreInfo`
            // to use in `unlock()`. `unlock()` just bundles CreateInternalDir mutations with the
            // other encrypted mutations and handles them all in sequence.
            Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
                let mut cipher = self.mutations_cipher.lock();
                if let Some(cipher) = cipher.as_mut() {
                    // If this is the first time we've used this key, we must write the key out.
                    // Note: `store_info` is locked here while the `mutations_cipher` guard is
                    // still held. The unwraps assume that a store with a cipher always has
                    // `store_info` and a `mutations_key` set. NOTE(review): confirm invariant.
                    if cipher.offset() == 0 {
                        writer.write(Mutation::update_mutations_key(
                            self.store_info
                                .lock()
                                .as_ref()
                                .unwrap()
                                .mutations_key
                                .as_ref()
                                .unwrap()
                                .clone(),
                        ));
                    }
                    // Serializing into an in-memory `Vec` presumably cannot fail.
                    let mut buffer = Vec::new();
                    mutation.serialize_into(&mut buffer).unwrap();
                    cipher.encrypt(&mut buffer);
                    writer.write(Mutation::EncryptedObjectStore(buffer.into()));
                    // The encrypted form replaces the plaintext mutation entirely.
                    return;
                }
            }
            // `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
            // encrypted object stores, but are either the encrypted mutation data itself or
            // metadata governing how the data will be encrypted. They should only be produced here.
            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
                debug_assert!(false, "Only this method should generate encrypted mutations");
            }
            // `BeginFlush` and `EndFlush` are not needed during `unlock()` and are needed during
            // the initial journal replay, so should not be encrypted. `Allocator`, `DeleteVolume`,
            // `UpdateBorrowed` mutations are never associated with an encrypted store as we do not
            // encrypt the allocator or root/root-parent stores so we can avoid the locking.
            Mutation::Allocator(_)
            | Mutation::BeginFlush
            | Mutation::EndFlush
            | Mutation::DeleteVolume
            | Mutation::UpdateBorrowed(_) => {}
        }
        // Reached for unencrypted stores and for all non-encryptable mutation kinds.
        writer.write(mutation.clone());
    }
3034}
3035
3036impl Drop for ObjectStore {
3037    fn drop(&mut self) {
3038        let mut last_object_id = self.last_object_id.lock();
3039        last_object_id.drain_unreserved();
3040        match &*last_object_id {
3041            LastObjectId::Low32Bit { reserved, .. } => debug_assert!(reserved.is_empty()),
3042            _ => {}
3043        }
3044    }
3045}
3046
3047impl HandleOwner for ObjectStore {}
3048
3049impl AsRef<ObjectStore> for ObjectStore {
3050    fn as_ref(&self) -> &ObjectStore {
3051        self
3052    }
3053}
3054
/// Returns a worst-case estimate of the metadata space that might need to be reserved so that
/// `size` bytes of encrypted mutations can be written out to layer files.
///
/// This is similar to `reserved_space_from_journal_usage`. The estimate must be >= what
/// `reservation_amount_from_layer_size` will return once the data has been written to layer
/// files, and <= what `reserved_space_from_journal_usage` would use. We can't just use
/// `reserved_space_from_journal_usage`, because the encrypted mutations file includes some extra
/// data (it includes the checkpoints) that isn't written to the journal in the same way.
///
/// The multiplication saturates instead of overflowing. An absurdly large (e.g. corrupt) size
/// therefore yields `u64::MAX`, rather than panicking in debug builds or wrapping around to a
/// tiny under-reservation in release builds.
fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
    size.saturating_mul(3)
}
3065
// Allows `ObjectStore` to be used as a transaction `AssociatedObject` (trait from the
// `transaction` module). The impl is empty, so the trait's default methods are used.
impl AssociatedObject for ObjectStore {}
3067
/// Tells `trim_some` how far to trim an object (or one of its attributes).
#[derive(Debug)]
pub enum TrimMode {
    /// Remove every extent that lies past the current size.
    UseSize,

    /// Remove every extent that lies past the given offset.
    FromOffset(u64),

    /// Keep trimming until nothing is left, then delete the entity from the store. The wrapped
    /// [`TombstoneMode`] says whether the whole object or just the attribute is removed.
    Tombstone(TombstoneMode),
}

/// Granularity at which tombstoning happens: the whole object, or a single attribute.
#[derive(Debug)]
pub enum TombstoneMode {
    /// Tombstone the entire object.
    Object,
    /// Tombstone only the attribute being trimmed.
    Attribute,
}
3087
3088/// Result of the trim_some method.
3089#[derive(Debug)]
3090pub enum TrimResult {
3091    /// We reached the limit of the transaction and more extents might follow.
3092    Incomplete,
3093
3094    /// We finished this attribute.  Returns the ID of the next attribute for the same object if
3095    /// there is one.
3096    Done(Option<AttributeId>),
3097}
3098
3099/// Loads store info.
3100pub async fn load_store_info(
3101    parent: &Arc<ObjectStore>,
3102    store_object_id: u64,
3103) -> Result<StoreInfo, Error> {
3104    load_store_info_from_handle(
3105        &ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?,
3106    )
3107    .await
3108}
3109
3110async fn load_store_info_from_handle(
3111    handle: &DataObjectHandle<impl HandleOwner>,
3112) -> Result<StoreInfo, Error> {
3113    Ok(if handle.get_size() > 0 {
3114        let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
3115        let mut cursor = std::io::Cursor::new(serialized_info);
3116        let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
3117            .context("Failed to deserialize StoreInfo")?;
3118        store_info
3119    } else {
3120        // The store_info will be absent for a newly created and empty object store.
3121        StoreInfo::default()
3122    })
3123}
3124
3125#[cfg(test)]
3126mod tests {
3127    use super::{
3128        AttributeId, FsverityMetadata, HandleOptions, LastObjectId, LastObjectIdInfo, LockKey,
3129        MAX_STORE_INFO_SERIALIZED_SIZE, Mutation, NewChildStoreOptions, OBJECT_ID_HI_MASK,
3130        ObjectStore, RootDigest, StoreInfo, StoreOptions,
3131    };
3132    use crate::errors::FxfsError;
3133    use crate::filesystem::{
3134        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
3135    };
3136    use crate::fsck::{fsck, fsck_volume};
3137    use crate::lsm_tree::Query;
3138    use crate::lsm_tree::types::{ItemRef, LayerIterator};
3139    use crate::object_handle::{
3140        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
3141    };
3142    use crate::object_store::directory::{Directory, replace_child};
3143    use crate::object_store::journal::JournalOptions;
3144    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue};
3145    use crate::object_store::transaction::{Options, lock_keys};
3146    use crate::object_store::volume::root_volume;
3147    use crate::serialized_types::VersionedLatest;
3148    use crate::testing;
3149    use assert_matches::assert_matches;
3150    use async_trait::async_trait;
3151    use fuchsia_async as fasync;
3152    use fuchsia_sync::Mutex;
3153    use futures::join;
3154    use fxfs_crypto::ff1::Ff1;
3155    use fxfs_crypto::{
3156        Crypt, EncryptionKey, FXFS_KEY_SIZE, FXFS_WRAPPED_KEY_SIZE, FxfsKey, KeyPurpose,
3157        ObjectType, UnwrappedKey, WrappedKey, WrappedKeyBytes, WrappingKeyId,
3158    };
3159    use fxfs_insecure_crypto::new_insecure_crypt;
3160    use std::sync::Arc;
3161    use std::time::Duration;
3162    use storage_device::DeviceHolder;
3163    use storage_device::fake_device::FakeDevice;
3164    use test_case::test_case;
3165    use zx_status as zx;
3166
3167    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
3168
3169    async fn test_filesystem() -> OpenFxFilesystem {
3170        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
3171        FxFilesystem::new_empty(device).await.expect("new_empty failed")
3172    }
3173
3174    #[fuchsia::test]
3175    async fn test_verified_file_with_verified_attribute() {
3176        let fs: OpenFxFilesystem = test_filesystem().await;
3177        let mut transaction = fs
3178            .root_store()
3179            .new_transaction(lock_keys![], Options::default())
3180            .await
3181            .expect("new_transaction failed");
3182        let store = fs.root_store();
3183        let object = Arc::new(
3184            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3185                .await
3186                .expect("create_object failed"),
3187        );
3188
3189        transaction.add(
3190            store.store_object_id(),
3191            Mutation::replace_or_insert_object(
3192                ObjectKey::attribute(
3193                    object.object_id(),
3194                    AttributeId::DATA,
3195                    AttributeKey::Attribute,
3196                ),
3197                ObjectValue::verified_attribute(
3198                    0,
3199                    FsverityMetadata::Internal(RootDigest::Sha256([0; 32]), vec![]),
3200                ),
3201            ),
3202        );
3203
3204        transaction.add(
3205            store.store_object_id(),
3206            Mutation::replace_or_insert_object(
3207                ObjectKey::attribute(
3208                    object.object_id(),
3209                    AttributeId::FSVERITY_MERKLE,
3210                    AttributeKey::Attribute,
3211                ),
3212                ObjectValue::attribute(0, false),
3213            ),
3214        );
3215
3216        transaction.commit().await.unwrap();
3217
3218        let handle =
3219            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
3220                .await
3221                .expect("open_object failed");
3222
3223        assert!(handle.is_verified_file());
3224
3225        fs.close().await.expect("Close failed");
3226    }
3227
3228    #[fuchsia::test]
3229    async fn test_verified_file_without_verified_attribute() {
3230        let fs: OpenFxFilesystem = test_filesystem().await;
3231        let mut transaction = fs
3232            .root_store()
3233            .new_transaction(lock_keys![], Options::default())
3234            .await
3235            .expect("new_transaction failed");
3236        let store = fs.root_store();
3237        let object = Arc::new(
3238            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3239                .await
3240                .expect("create_object failed"),
3241        );
3242
3243        transaction.commit().await.unwrap();
3244
3245        let handle =
3246            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
3247                .await
3248                .expect("open_object failed");
3249
3250        assert!(!handle.is_verified_file());
3251
3252        fs.close().await.expect("Close failed");
3253    }
3254
3255    #[fuchsia::test]
3256    async fn test_create_and_open_store() {
3257        let fs = test_filesystem().await;
3258        let store_id = {
3259            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3260            root_volume
3261                .new_volume(
3262                    "test",
3263                    NewChildStoreOptions {
3264                        options: StoreOptions { crypt: Some(Arc::new(new_insecure_crypt())) },
3265                        ..Default::default()
3266                    },
3267                )
3268                .await
3269                .expect("new_volume failed")
3270                .store_object_id()
3271        };
3272
3273        fs.close().await.expect("close failed");
3274        let device = fs.take_device().await;
3275        device.reopen(false);
3276        let fs = FxFilesystem::open(device).await.expect("open failed");
3277
3278        {
3279            let store = fs.object_manager().store(store_id).expect("store not found");
3280            store.unlock(Arc::new(new_insecure_crypt())).await.expect("unlock failed");
3281        }
3282        fs.close().await.expect("Close failed");
3283    }
3284
3285    #[fuchsia::test]
3286    async fn test_create_and_open_internal_dir() {
3287        let fs = test_filesystem().await;
3288        let dir_id;
3289        let store_id;
3290        {
3291            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3292            let store = root_volume
3293                .new_volume(
3294                    "test",
3295                    NewChildStoreOptions {
3296                        options: StoreOptions { crypt: Some(Arc::new(new_insecure_crypt())) },
3297                        ..Default::default()
3298                    },
3299                )
3300                .await
3301                .expect("new_volume failed");
3302            dir_id =
3303                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
3304            store_id = store.store_object_id();
3305        }
3306
3307        fs.close().await.expect("close failed");
3308        let device = fs.take_device().await;
3309        device.reopen(false);
3310        let fs = FxFilesystem::open(device).await.expect("open failed");
3311
3312        {
3313            let store = fs.object_manager().store(store_id).expect("store not found");
3314            store.unlock(Arc::new(new_insecure_crypt())).await.expect("unlock failed");
3315            assert_eq!(
3316                dir_id,
3317                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
3318            );
3319            let obj = store
3320                .tree()
3321                .find(&ObjectKey::object(dir_id))
3322                .await
3323                .expect("Searching tree for dir")
3324                .unwrap();
3325            assert_matches!(
3326                obj.value,
3327                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
3328            );
3329        }
3330        fs.close().await.expect("Close failed");
3331    }
3332
3333    #[fuchsia::test]
3334    async fn test_create_and_open_internal_dir_unencrypted() {
3335        let fs = test_filesystem().await;
3336        let dir_id;
3337        let store_id;
3338        {
3339            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3340            let store = root_volume
3341                .new_volume("test", NewChildStoreOptions::default())
3342                .await
3343                .expect("new_volume failed");
3344            dir_id =
3345                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
3346            store_id = store.store_object_id();
3347        }
3348
3349        fs.close().await.expect("close failed");
3350        let device = fs.take_device().await;
3351        device.reopen(false);
3352        let fs = FxFilesystem::open(device).await.expect("open failed");
3353
3354        {
3355            let store = fs.object_manager().store(store_id).expect("store not found");
3356            assert_eq!(
3357                dir_id,
3358                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
3359            );
3360            let obj = store
3361                .tree()
3362                .find(&ObjectKey::object(dir_id))
3363                .await
3364                .expect("Searching tree for dir")
3365                .unwrap();
3366            assert_matches!(
3367                obj.value,
3368                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
3369            );
3370        }
3371        fs.close().await.expect("Close failed");
3372    }
3373
3374    #[fuchsia::test(threads = 10)]
3375    async fn test_old_layers_are_purged() {
3376        let fs = test_filesystem().await;
3377
3378        let store = fs.root_store();
3379        let mut transaction = fs
3380            .root_store()
3381            .new_transaction(lock_keys![], Options::default())
3382            .await
3383            .expect("new_transaction failed");
3384        let object = Arc::new(
3385            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3386                .await
3387                .expect("create_object failed"),
3388        );
3389        transaction.commit().await.expect("commit failed");
3390
3391        store.flush().await.expect("flush failed");
3392
3393        let mut buf = object.allocate_buffer(5).await;
3394        buf.as_mut_slice().copy_from_slice(b"hello");
3395        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3396
3397        // Getting the layer-set should cause the flush to stall.
3398        let layer_set = store.tree().layer_set();
3399
3400        let done = Mutex::new(false);
3401        let mut object_id = 0;
3402
3403        join!(
3404            async {
3405                store.flush().await.expect("flush failed");
3406                assert!(*done.lock());
3407            },
3408            async {
3409                // This is a halting problem so all we can do is sleep.
3410                fasync::Timer::new(Duration::from_secs(1)).await;
3411                *done.lock() = true;
3412                object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
3413                std::mem::drop(layer_set);
3414            }
3415        );
3416
3417        if let Err(e) = ObjectStore::open_object(
3418            &store.parent_store.as_ref().unwrap(),
3419            object_id,
3420            HandleOptions::default(),
3421            store.crypt(),
3422        )
3423        .await
3424        {
3425            assert!(FxfsError::NotFound.matches(&e));
3426        } else {
3427            panic!("open_object succeeded");
3428        }
3429    }
3430
3431    #[fuchsia::test]
3432    async fn test_tombstone_deletes_data() {
3433        let fs = test_filesystem().await;
3434        let root_store = fs.root_store();
3435        let child_id = {
3436            let mut transaction = fs
3437                .root_store()
3438                .new_transaction(lock_keys![], Options::default())
3439                .await
3440                .expect("new_transaction failed");
3441            let child = ObjectStore::create_object(
3442                &root_store,
3443                &mut transaction,
3444                HandleOptions::default(),
3445                None,
3446            )
3447            .await
3448            .expect("create_object failed");
3449            root_store.add_to_graveyard(&mut transaction, child.object_id());
3450            transaction.commit().await.expect("commit failed");
3451
3452            // Allocate an extent in the file.
3453            let mut buffer = child.allocate_buffer(8192).await;
3454            buffer.as_mut_slice().fill(0xaa);
3455            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
3456
3457            child.object_id()
3458        };
3459
3460        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
3461
3462        // Let fsck check allocations.
3463        fsck(fs.clone()).await.expect("fsck failed");
3464    }
3465
3466    #[fuchsia::test]
3467    async fn test_tombstone_purges_keys() {
3468        let fs = test_filesystem().await;
3469        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3470        let store = root_volume
3471            .new_volume(
3472                "test",
3473                NewChildStoreOptions {
3474                    options: StoreOptions {
3475                        crypt: Some(Arc::new(new_insecure_crypt())),
3476                        ..StoreOptions::default()
3477                    },
3478                    ..NewChildStoreOptions::default()
3479                },
3480            )
3481            .await
3482            .expect("new_volume failed");
3483        let mut transaction = fs
3484            .root_store()
3485            .new_transaction(lock_keys![], Options::default())
3486            .await
3487            .expect("new_transaction failed");
3488        let child =
3489            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
3490                .await
3491                .expect("create_object failed");
3492        store.add_to_graveyard(&mut transaction, child.object_id());
3493        transaction.commit().await.expect("commit failed");
3494        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_some());
3495        store
3496            .tombstone_object(child.object_id(), Options::default())
3497            .await
3498            .expect("tombstone_object failed");
3499        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_none());
3500        fs.close().await.expect("close failed");
3501    }
3502
3503    #[fuchsia::test]
3504    async fn test_major_compaction_discards_unnecessary_records() {
3505        let fs = test_filesystem().await;
3506        let root_store = fs.root_store();
3507        let child_id = {
3508            let mut transaction = fs
3509                .root_store()
3510                .new_transaction(lock_keys![], Options::default())
3511                .await
3512                .expect("new_transaction failed");
3513            let child = ObjectStore::create_object(
3514                &root_store,
3515                &mut transaction,
3516                HandleOptions::default(),
3517                None,
3518            )
3519            .await
3520            .expect("create_object failed");
3521            root_store.add_to_graveyard(&mut transaction, child.object_id());
3522            transaction.commit().await.expect("commit failed");
3523
3524            // Allocate an extent in the file.
3525            let mut buffer = child.allocate_buffer(8192).await;
3526            buffer.as_mut_slice().fill(0xaa);
3527            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
3528
3529            child.object_id()
3530        };
3531
3532        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
3533        {
3534            let layers = root_store.tree.layer_set();
3535            let mut merger = layers.merger();
3536            let iter = merger
3537                .query(Query::FullRange(&ObjectKey::object(child_id)))
3538                .await
3539                .expect("seek failed");
3540            // Find at least one object still in the tree.
3541            match iter.get() {
3542                Some(ItemRef { key: ObjectKey { object_id, .. }, .. })
3543                    if *object_id == child_id => {}
3544                _ => panic!("Objects should still be in the tree."),
3545            }
3546        }
3547        root_store.flush().await.expect("flush failed");
3548
3549        // There should be no records for the object.
3550        let layers = root_store.tree.layer_set();
3551        let mut merger = layers.merger();
3552        let iter = merger
3553            .query(Query::FullRange(&ObjectKey::object(child_id)))
3554            .await
3555            .expect("seek failed");
3556        match iter.get() {
3557            None => {}
3558            Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
3559                assert_ne!(*object_id, child_id)
3560            }
3561        }
3562    }
3563
3564    #[fuchsia::test]
3565    async fn test_overlapping_extents_in_different_layers() {
3566        let fs = test_filesystem().await;
3567        let store = fs.root_store();
3568
3569        let mut transaction = store
3570            .new_transaction(
3571                lock_keys![LockKey::object(
3572                    store.store_object_id(),
3573                    store.root_directory_object_id()
3574                )],
3575                Options::default(),
3576            )
3577            .await
3578            .expect("new_transaction failed");
3579        let root_directory =
3580            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3581        let object = root_directory
3582            .create_child_file(&mut transaction, "test")
3583            .await
3584            .expect("create_child_file failed");
3585        transaction.commit().await.expect("commit failed");
3586
3587        let buf = object.allocate_buffer(16384).await;
3588        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3589
3590        store.flush().await.expect("flush failed");
3591
3592        object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");
3593
3594        // At this point, we should have an extent for 0..16384 in a layer that has been flushed,
3595        // and an extent for 0..4096 that partially overwrites it.  Writing to 0..16384 should
3596        // overwrite both of those extents.
3597        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3598
3599        fsck(fs.clone()).await.expect("fsck failed");
3600    }
3601
    // Exercises encrypted mutations.  After remounting, the encrypted "test" volume is locked;
    // flushing the filesystem at that point is expected to leave an encrypted mutations object
    // in the store's StoreInfo, and opening the volume with `crypt` and reading the object back
    // is expected to clear it again.  Several iterations are run so that the mutations cipher
    // is used at different stream offsets.
    #[fuchsia::test(threads = 10)]
    async fn test_encrypted_mutations() {
        // Runs one create/write/remount/flush/verify cycle against the "test" volume, creating a
        // file named "test {iteration}", and returns the (remounted) filesystem so the caller
        // can run the next iteration on it.
        async fn one_iteration(
            fs: OpenFxFilesystem,
            crypt: Arc<dyn Crypt>,
            iteration: u64,
        ) -> OpenFxFilesystem {
            // Closes `fs`, reopens its device and opens a fresh filesystem on that device.
            async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
                fs.close().await.expect("Close failed");
                let device = fs.take_device().await;
                device.reopen(false);
                FxFilesystem::open(device).await.expect("FS open failed")
            }

            let fs = reopen(fs).await;

            // Create and write a new file in the "test" volume, keeping the IDs needed to find
            // it again after remounting.
            let (store_object_id, object_id) = {
                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_volume
                    .volume(
                        "test",
                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    )
                    .await
                    .expect("volume failed");

                let mut transaction = fs
                    .root_store()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let root_directory = Directory::open(&store, store.root_directory_object_id())
                    .await
                    .expect("open failed");
                let object = root_directory
                    .create_child_file(&mut transaction, &format!("test {}", iteration))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                // Fill the buffer with a known pattern (byte i is `i as u8`) that
                // `check_object` verifies later.
                let mut buf = object.allocate_buffer(1000).await;
                for i in 0..buf.len() {
                    buf.as_mut_slice()[i] = i as u8;
                }
                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

                (store.store_object_id(), object.object_id())
            };

            let fs = reopen(fs).await;

            // Opens the "test" volume with `crypt`, opens the object written above and asserts
            // that its first 1000 bytes are the `i as u8` pattern.
            let check_object = |fs: Arc<FxFilesystem>| {
                let crypt = crypt.clone();
                async move {
                    let root_volume = root_volume(fs).await.expect("root_volume failed");
                    let volume = root_volume
                        .volume(
                            "test",
                            StoreOptions { crypt: Some(crypt), ..StoreOptions::default() },
                        )
                        .await
                        .expect("volume failed");

                    let object = ObjectStore::open_object(
                        &volume,
                        object_id,
                        HandleOptions::default(),
                        None,
                    )
                    .await
                    .expect("open_object failed");
                    let mut buf = object.allocate_buffer(1000).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
                    for i in 0..buf.len() {
                        assert_eq!(buf.as_slice()[i], i as u8);
                    }
                }
            };

            check_object(fs.clone()).await;

            let fs = reopen(fs).await;

            // At this point the "test" volume is locked.  Before checking the object, flush the
            // filesystem.  This should leave a file with encrypted mutations.
            fs.object_manager().flush().await.expect("flush failed");

            assert_ne!(
                fs.object_manager()
                    .store(store_object_id)
                    .unwrap()
                    .load_store_info()
                    .await
                    .expect("load_store_info failed")
                    .encrypted_mutations_object_id,
                INVALID_OBJECT_ID
            );

            check_object(fs.clone()).await;

            // Checking the object should have triggered a flush and so now there should be no
            // encrypted mutations object.
            assert_eq!(
                fs.object_manager()
                    .store(store_object_id)
                    .unwrap()
                    .load_store_info()
                    .await
                    .expect("load_store_info failed")
                    .encrypted_mutations_object_id,
                INVALID_OBJECT_ID
            );

            let fs = reopen(fs).await;

            fsck(fs.clone()).await.expect("fsck failed");

            let fs = reopen(fs).await;

            check_object(fs.clone()).await;

            fs
        }

        let mut fs = test_filesystem().await;
        let crypt = Arc::new(new_insecure_crypt());

        // Create the encrypted "test" volume that every iteration reopens.
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let _store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
        }

        // Run a few iterations so that we test changes with the stream cipher offset.
        for i in 0..5 {
            fs = one_iteration(fs, crypt.clone(), i).await;
        }
    }
3757
    // Forces a roll of the object ID cipher in an encrypted store by saturating the low 32 bits
    // of the last object ID.  Checks that:
    // - the next object's ID has `1 << 32` in its high bits;
    // - StoreInfo records a new key;
    // - the rolled ID and key are still present after a remount.
    // When `with_flush` is true, the journal is force-compacted before the remount.
    #[test_case(true; "with a flush")]
    #[test_case(false; "without a flush")]
    #[fuchsia::test(threads = 10)]
    async fn test_object_id_cipher_roll(with_flush: bool) {
        let fs = test_filesystem().await;
        let crypt = Arc::new(new_insecure_crypt());

        // Set up the volume, force the roll and yield the key that the roll produced.
        let expected_key = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");

            // Create some files so that our in-memory copy of StoreInfo has changes (the object
            // count) pending a flush.
            let root_dir_id = store.root_directory_object_id();
            let root_dir =
                Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
            let mut transaction = store
                .new_transaction(
                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            for i in 0..10 {
                root_dir.create_child_file(&mut transaction, &format!("file {i}")).await.unwrap();
            }
            transaction.commit().await.expect("commit failed");

            // Snapshot of StoreInfo taken before the roll, used to compare keys afterwards.
            let orig_store_info = store.store_info().unwrap();

            // Hack the last object ID to force a roll of the object ID cipher.
            {
                let mut last_object_id = store.last_object_id.lock();
                match &mut *last_object_id {
                    LastObjectId::Encrypted { id, .. } => {
                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
                        *id |= 0xffffffff;
                    }
                    _ => unreachable!(),
                }
            }

            let mut transaction = store
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let object = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 1u64 << 32);

            // Check that the key has been changed.
            let key = match (
                store.store_info().unwrap().last_object_id,
                orig_store_info.last_object_id,
            ) {
                (
                    LastObjectIdInfo::Encrypted { key, id },
                    LastObjectIdInfo::Encrypted { key: orig_key, .. },
                ) => {
                    assert_ne!(key, orig_key);
                    assert_eq!(id, 1u64 << 32);
                    key
                }
                _ => unreachable!(),
            };

            if with_flush {
                fs.journal().force_compact().await.unwrap();
            }

            let last_object_id = store.last_object_id.lock();
            assert_eq!(last_object_id.id(), 1u64 << 32);
            key
        };

        // Remount and check that the rolled key and last object ID were persisted.
        fs.close().await.expect("Close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .volume("test", StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() })
            .await
            .expect("volume failed");

        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Encrypted { key, .. } if key == expected_key);
        assert_eq!(store.last_object_id.lock().id(), 1u64 << 32);

        fsck(fs.clone()).await.expect("fsck failed");
        fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
    }
3874
3875    #[fuchsia::test(threads = 2)]
3876    async fn test_race_object_id_cipher_roll_and_flush() {
3877        let fs = test_filesystem().await;
3878        let crypt = Arc::new(new_insecure_crypt());
3879
3880        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3881        let store = root_volume
3882            .new_volume(
3883                "test",
3884                NewChildStoreOptions {
3885                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3886                    ..Default::default()
3887                },
3888            )
3889            .await
3890            .expect("new_volume failed");
3891
3892        assert!(matches!(&*store.last_object_id.lock(), LastObjectId::Encrypted { .. }));
3893
3894        // Create some files so that our in-memory copy of StoreInfo has changes (the object
3895        // count) pending a flush.
3896        let root_dir_id = store.root_directory_object_id();
3897        let root_dir = Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3898
3899        let _executor_tasks = testing::force_executor_threads_to_run(2).await;
3900
3901        for j in 0..100 {
3902            let mut transaction = store
3903                .new_transaction(
3904                    lock_keys![LockKey::object(store.store_object_id(), root_dir_id)],
3905                    Options::default(),
3906                )
3907                .await
3908                .expect("new_transaction failed");
3909            root_dir.create_child_file(&mut transaction, &format!("file {j}")).await.unwrap();
3910            transaction.commit().await.expect("commit failed");
3911
3912            let task = {
3913                let fs = fs.clone();
3914                fasync::Task::spawn(async move {
3915                    fs.journal().force_compact().await.unwrap();
3916                })
3917            };
3918
3919            // Hack the last object ID to force a roll of the object ID cipher.
3920            {
3921                let mut last_object_id = store.last_object_id.lock();
3922                let LastObjectId::Encrypted { id, .. } = &mut *last_object_id else {
3923                    unreachable!()
3924                };
3925                assert_eq!(*id >> 32, j);
3926                *id |= 0xffffffff;
3927            }
3928
3929            let mut transaction = store
3930                .new_transaction(
3931                    lock_keys![LockKey::object(
3932                        store.store_object_id(),
3933                        store.root_directory_object_id()
3934                    )],
3935                    Options::default(),
3936                )
3937                .await
3938                .expect("new_transaction failed");
3939            let root_directory = Directory::open(&store, store.root_directory_object_id())
3940                .await
3941                .expect("open failed");
3942            root_directory
3943                .create_child_file(&mut transaction, "test {j}")
3944                .await
3945                .expect("create_child_file failed");
3946            transaction.commit().await.expect("commit failed");
3947
3948            task.await;
3949
3950            // Check that the key has been changed.
3951            let new_store_info = store.load_store_info().await.unwrap();
3952
3953            let LastObjectIdInfo::Encrypted { id, key } = new_store_info.last_object_id else {
3954                unreachable!()
3955            };
3956            assert_eq!(id >> 32, j + 1);
3957            let LastObjectIdInfo::Encrypted { key: in_memory_key, .. } =
3958                store.store_info().unwrap().last_object_id
3959            else {
3960                unreachable!()
3961            };
3962            assert_eq!(key, in_memory_key);
3963        }
3964
3965        fs.close().await.expect("Close failed");
3966    }
3967
3968    #[fuchsia::test]
3969    async fn test_object_id_no_roll_for_unencrypted_store() {
3970        let fs = test_filesystem().await;
3971
3972        {
3973            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3974            let store = root_volume
3975                .new_volume("test", NewChildStoreOptions::default())
3976                .await
3977                .expect("new_volume failed");
3978
3979            // Hack the last object ID.
3980            {
3981                let mut last_object_id = store.last_object_id.lock();
3982                match &mut *last_object_id {
3983                    LastObjectId::Unencrypted { id } => {
3984                        assert_eq!(*id & OBJECT_ID_HI_MASK, 0);
3985                        *id |= 0xffffffff;
3986                    }
3987                    _ => unreachable!(),
3988                }
3989            }
3990
3991            let mut transaction = store
3992                .new_transaction(
3993                    lock_keys![LockKey::object(
3994                        store.store_object_id(),
3995                        store.root_directory_object_id()
3996                    )],
3997                    Options::default(),
3998                )
3999                .await
4000                .expect("new_transaction failed");
4001            let root_directory = Directory::open(&store, store.root_directory_object_id())
4002                .await
4003                .expect("open failed");
4004            let object = root_directory
4005                .create_child_file(&mut transaction, "test")
4006                .await
4007                .expect("create_child_file failed");
4008            transaction.commit().await.expect("commit failed");
4009
4010            assert_eq!(object.object_id(), 0x1_0000_0000);
4011
4012            // Check that there is still no key.
4013            assert_matches!(
4014                store.store_info().unwrap().last_object_id,
4015                LastObjectIdInfo::Unencrypted { .. }
4016            );
4017
4018            assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
4019        };
4020
4021        fs.close().await.expect("Close failed");
4022        let device = fs.take_device().await;
4023        device.reopen(false);
4024        let fs = FxFilesystem::open(device).await.expect("open failed");
4025        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4026        let store =
4027            root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
4028
4029        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000);
4030    }
4031
4032    #[fuchsia::test]
4033    fn test_object_id_is_not_invalid_object_id() {
4034        let key = UnwrappedKey::new(vec![0; FXFS_KEY_SIZE]);
4035        // 1106634048 results in INVALID_OBJECT_ID with this key.
4036        let mut last_object_id =
4037            LastObjectId::Encrypted { id: 1106634047, cipher: Box::new(Ff1::new(&key)) };
4038        assert!(last_object_id.try_get_next().is_some());
4039        assert!(last_object_id.try_get_next().is_some());
4040    }
4041
4042    #[fuchsia::test]
4043    async fn test_last_object_id_is_correct_after_unlock() {
4044        let fs = test_filesystem().await;
4045        let crypt = Arc::new(new_insecure_crypt());
4046
4047        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4048        let store = root_volume
4049            .new_volume(
4050                "test",
4051                NewChildStoreOptions {
4052                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4053                    ..Default::default()
4054                },
4055            )
4056            .await
4057            .expect("new_volume failed");
4058
4059        let mut transaction = store
4060            .new_transaction(
4061                lock_keys![LockKey::object(
4062                    store.store_object_id(),
4063                    store.root_directory_object_id()
4064                )],
4065                Options::default(),
4066            )
4067            .await
4068            .expect("new_transaction failed");
4069        let root_directory =
4070            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4071        root_directory
4072            .create_child_file(&mut transaction, "test")
4073            .await
4074            .expect("create_child_file failed");
4075        transaction.commit().await.expect("commit failed");
4076
4077        // Compact so that StoreInfo is written.
4078        fs.journal().force_compact().await.unwrap();
4079
4080        let last_object_id = store.last_object_id.lock().id();
4081
4082        store.lock().await.unwrap();
4083        store.unlock(crypt.clone()).await.unwrap();
4084
4085        assert_eq!(store.last_object_id.lock().id(), last_object_id);
4086    }
4087
4088    #[fuchsia::test(threads = 20)]
4089    async fn test_race_when_rolling_last_object_id_cipher() {
4090        // NOTE: This test is trying to test a race, so if it fails, it might be flaky.
4091
4092        const NUM_THREADS: usize = 20;
4093
4094        let fs = test_filesystem().await;
4095        let crypt = Arc::new(new_insecure_crypt());
4096
4097        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4098        let store = root_volume
4099            .new_volume(
4100                "test",
4101                NewChildStoreOptions {
4102                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4103                    ..Default::default()
4104                },
4105            )
4106            .await
4107            .expect("new_volume failed");
4108
4109        let store_id = store.store_object_id();
4110        let root_dir_id = store.root_directory_object_id();
4111
4112        let root_directory =
4113            Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
4114
4115        // Create directories.
4116        let mut directories = Vec::new();
4117        for _ in 0..NUM_THREADS {
4118            let mut transaction = fs
4119                .root_store()
4120                .new_transaction(
4121                    lock_keys![LockKey::object(store_id, root_dir_id,)],
4122                    Options::default(),
4123                )
4124                .await
4125                .expect("new_transaction failed");
4126            directories.push(
4127                root_directory
4128                    .create_child_dir(&mut transaction, "test")
4129                    .await
4130                    .expect("create_child_file failed"),
4131            );
4132            transaction.commit().await.expect("commit failed");
4133        }
4134
4135        // Hack the last object ID so that the next ID will require a roll.
4136        match &mut *store.last_object_id.lock() {
4137            LastObjectId::Encrypted { id, .. } => *id |= 0xffff_ffff,
4138            _ => unreachable!(),
4139        }
4140
4141        let scope = fasync::Scope::new();
4142
4143        let _executor_tasks = testing::force_executor_threads_to_run(NUM_THREADS).await;
4144
4145        for dir in directories {
4146            let fs = fs.clone();
4147            scope.spawn(async move {
4148                let mut transaction = fs
4149                    .root_store()
4150                    .new_transaction(
4151                        lock_keys![LockKey::object(store_id, dir.object_id(),)],
4152                        Options::default(),
4153                    )
4154                    .await
4155                    .expect("new_transaction failed");
4156                dir.create_child_file(&mut transaction, "test")
4157                    .await
4158                    .expect("create_child_file failed");
4159                transaction.commit().await.expect("commit failed");
4160            });
4161        }
4162
4163        scope.on_no_tasks().await;
4164
4165        assert_eq!(store.last_object_id.lock().id(), 0x1_0000_0000 + NUM_THREADS as u64 - 1);
4166    }
4167
4168    #[fuchsia::test(threads = 10)]
4169    async fn test_lock_store() {
4170        let fs = test_filesystem().await;
4171        let crypt = Arc::new(new_insecure_crypt());
4172
4173        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4174        let store = root_volume
4175            .new_volume(
4176                "test",
4177                NewChildStoreOptions {
4178                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4179                    ..NewChildStoreOptions::default()
4180                },
4181            )
4182            .await
4183            .expect("new_volume failed");
4184        let mut transaction = store
4185            .new_transaction(
4186                lock_keys![LockKey::object(
4187                    store.store_object_id(),
4188                    store.root_directory_object_id()
4189                )],
4190                Options::default(),
4191            )
4192            .await
4193            .expect("new_transaction failed");
4194        let root_directory =
4195            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4196        root_directory
4197            .create_child_file(&mut transaction, "test")
4198            .await
4199            .expect("create_child_file failed");
4200        transaction.commit().await.expect("commit failed");
4201        store.lock().await.expect("lock failed");
4202
4203        store.unlock(crypt).await.expect("unlock failed");
4204        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4205    }
4206
4207    #[fuchsia::test(threads = 10)]
4208    async fn test_unlock_read_only() {
4209        let fs = test_filesystem().await;
4210        let crypt = Arc::new(new_insecure_crypt());
4211
4212        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4213        let store = root_volume
4214            .new_volume(
4215                "test",
4216                NewChildStoreOptions {
4217                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
4218                    ..NewChildStoreOptions::default()
4219                },
4220            )
4221            .await
4222            .expect("new_volume failed");
4223        let mut transaction = store
4224            .new_transaction(
4225                lock_keys![LockKey::object(
4226                    store.store_object_id(),
4227                    store.root_directory_object_id()
4228                )],
4229                Options::default(),
4230            )
4231            .await
4232            .expect("new_transaction failed");
4233        let root_directory =
4234            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4235        root_directory
4236            .create_child_file(&mut transaction, "test")
4237            .await
4238            .expect("create_child_file failed");
4239        transaction.commit().await.expect("commit failed");
4240        store.lock().await.expect("lock failed");
4241
4242        store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
4243        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4244        store.lock_read_only();
4245        store.unlock_read_only(crypt).await.expect("unlock failed");
4246        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
4247    }
4248
    // Remounts a filesystem containing an encrypted volume 100 times.  On each mount it checks
    // that the mutations cipher starts at offset 0, then writes to a file so that an encrypted
    // mutation is generated before unmounting.
    #[fuchsia::test(threads = 10)]
    async fn test_key_rolled_when_unlocked() {
        let fs = test_filesystem().await;
        let crypt = Arc::new(new_insecure_crypt());

        // Create the encrypted volume with a single file and remember that file's object ID.
        let object_id;
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            let mut transaction = store
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            object_id = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed")
                .object_id();
            transaction.commit().await.expect("commit failed");
        }

        fs.close().await.expect("Close failed");
        let mut device = fs.take_device().await;

        // Repeatedly remount so that we can be sure that we can remount when there are many
        // mutation keys.
        for _ in 0..100 {
            device.reopen(false);
            let fs = FxFilesystem::open(device).await.expect("open failed");
            {
                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_volume
                    .volume(
                        "test",
                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    )
                    .await
                    .expect("open_volume failed");

                // The key should get rolled every time we unlock, so the mutations cipher is
                // expected to start again from offset 0.
                assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);

                // Make sure there's an encrypted mutation.
                let handle =
                    ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
                        .await
                        .expect("open_object failed");
                let buffer = handle.allocate_buffer(100).await;
                handle
                    .write_or_append(Some(0), buffer.as_ref())
                    .await
                    .expect("write_or_append failed");
            }
            fs.close().await.expect("Close failed");
            device = fs.take_device().await;
        }
    }
4327
4328    #[test]
4329    fn test_store_info_max_serialized_size() {
4330        let info = StoreInfo {
4331            guid: [0xff; 16],
4332            last_object_id: LastObjectIdInfo::Encrypted {
4333                id: 0x1234567812345678,
4334                key: FxfsKey {
4335                    wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4336                    key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4337                },
4338            },
4339            // Worst case, each layer should be 3/4 the size of the layer below it (because of the
4340            // compaction policy we're using).  If the smallest layer is 8,192 bytes, then 120
4341            // layers would take up a size that exceeds a 64 bit unsigned integer, so if this fits,
4342            // any size should fit.
4343            layers: vec![0x1234567812345678; 120],
4344            root_directory_object_id: 0x1234567812345678,
4345            graveyard_directory_object_id: 0x1234567812345678,
4346            object_count: 0x1234567812345678,
4347            mutations_key: Some(FxfsKey {
4348                wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
4349                key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
4350            }),
4351            mutations_cipher_offset: 0x1234567812345678,
4352            encrypted_mutations_object_id: 0x1234567812345678,
4353            internal_directory_object_id: INVALID_OBJECT_ID,
4354        };
4355        let mut serialized_info = Vec::new();
4356        info.serialize_with_version(&mut serialized_info).unwrap();
4357        assert!(
4358            serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
4359            "{}",
4360            serialized_info.len()
4361        );
4362    }
4363
    /// Shared body for the crypt-failure reopen tests.
    ///
    /// Creates an encrypted volume "vol" containing a file "test", shuts down the volume's crypt
    /// instance, checks that creating a second file ("test2") then fails, and locks the store.
    /// It then unlocks the store with a fresh crypt instance (via `unlock_read_only` when
    /// `read_only` is true, `unlock` otherwise) and verifies that "test" can still be found.
    async fn reopen_after_crypt_failure_inner(read_only: bool) {
        let fs = test_filesystem().await;
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

        // Build the store and leave it locked after a crypt failure; the block yields the store.
        let store = {
            let crypt = Arc::new(new_insecure_crypt());
            let store = root_volume
                .new_volume(
                    "vol",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let mut transaction = fs
                .root_store()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            // Kill the crypt service; the next file creation below is expected to fail.
            crypt.shutdown();
            let mut transaction = fs
                .root_store()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            // The `Ok` value is mapped to `()` before `expect_err`, which requires the `Ok`
            // type to implement `Debug` (presumably the handle type does not).
            root_directory
                .create_child_file(&mut transaction, "test2")
                .await
                .map(|_| ())
                .expect_err("create_child_file should fail");
            // Locking must still succeed even though the crypt has been shut down.
            store.lock().await.expect("lock failed");
            store
        };

        // Unlock again with a brand-new crypt instance, in the requested mode.
        let crypt = Arc::new(new_insecure_crypt());
        if read_only {
            store.unlock_read_only(crypt).await.expect("unlock failed");
        } else {
            store.unlock(crypt).await.expect("unlock failed");
        }
        // The file committed before the crypt failure must still be present.
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
    }
4434
4435    #[fuchsia::test(threads = 10)]
4436    async fn test_reopen_after_crypt_failure() {
4437        reopen_after_crypt_failure_inner(false).await;
4438    }
4439
4440    #[fuchsia::test(threads = 10)]
4441    async fn test_reopen_read_only_after_crypt_failure() {
4442        reopen_after_crypt_failure_inner(true).await;
4443    }
4444
4445    #[fuchsia::test(threads = 10)]
4446    #[should_panic(expected = "Insufficient reservation space")]
4447    #[cfg(debug_assertions)]
4448    async fn large_transaction_causes_panic_in_debug_builds() {
4449        let fs = test_filesystem().await;
4450        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4451        let store = root_volume
4452            .new_volume("vol", NewChildStoreOptions::default())
4453            .await
4454            .expect("new_volume failed");
4455        let root_directory =
4456            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4457        let mut transaction = fs
4458            .root_store()
4459            .new_transaction(
4460                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
4461                Options::default(),
4462            )
4463            .await
4464            .expect("transaction");
4465        for i in 0..500 {
4466            root_directory
4467                .create_symlink(&mut transaction, b"link", &format!("{}", i))
4468                .await
4469                .expect("symlink");
4470        }
4471        assert_eq!(transaction.commit().await.expect("commit"), 0);
4472    }
4473
    /// Checks that a crypt failure in one store does not leave the journal unusable for the rest
    /// of the filesystem. After store2's crypt is shut down, a write to store2 fails to commit,
    /// but store1 can still commit and the journal can still be compacted. After a close and
    /// reopen, vol1 holds both of its files and vol2 holds only the file written before the
    /// failure.
    #[fuchsia::test]
    async fn test_crypt_failure_does_not_fuse_journal() {
        let fs = test_filesystem().await;

        {
            // Create two stores and a record for each store, so the journal will need to flush them
            // both later.
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store1 = root_volume
                .new_volume(
                    "vol1",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(Arc::new(new_insecure_crypt())),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            // Keep a handle to store2's crypt so it can be shut down below.
            let crypt = Arc::new(new_insecure_crypt());
            let store2 = root_volume
                .new_volume(
                    "vol2",
                    NewChildStoreOptions {
                        options: StoreOptions { crypt: Some(crypt.clone()) },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            for store in [&store1, &store2] {
                let root_directory = Directory::open(store, store.root_directory_object_id())
                    .await
                    .expect("open failed");
                let mut transaction = store
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            root_directory.object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, "test")
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }

            // Shut down the crypt instance for store2.
            crypt.shutdown();

            // Compact. This flushes store2 (using cached key) and store1.  This consumes 1 cached
            // key from store2.
            fs.journal().force_compact().await.expect("compact failed");

            // Write to store2 again. This should fail because store2 needs to top up its cached
            // keys, which requires calling the (dead) crypt service.
            let root_directory2 = Directory::open(&store2, store2.root_directory_object_id())
                .await
                .expect("open failed");
            let (child_id, _, _) = root_directory2
                .lookup("test")
                .await
                .expect("lookup failed")
                .expect("test file not found");
            let mut transaction2 = store2
                .new_transaction(
                    lock_keys![
                        LockKey::object(store2.store_object_id(), root_directory2.object_id()),
                        LockKey::object(store2.store_object_id(), child_id),
                    ],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            // Building the removal succeeds; only the commit is expected to fail.
            replace_child(&mut transaction2, None, (&root_directory2, "test"))
                .await
                .expect("replace_child failed");
            assert!(transaction2.commit().await.is_err());

            // Write to store1 should still succeed (its crypt is not dead).
            let root_directory1 = Directory::open(&store1, store1.root_directory_object_id())
                .await
                .expect("open failed");
            let mut transaction1 = store1
                .new_transaction(
                    lock_keys![LockKey::object(
                        store1.store_object_id(),
                        root_directory1.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory1
                .create_child_file(&mut transaction1, "test2")
                .await
                .expect("create_child_file failed");
            transaction1.commit().await.expect("commit failed");

            // Compact again. Should succeed.
            fs.journal().force_compact().await.expect("compact failed");
        }

        // Close and reopen to verify.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

        // vol1 should have "test" and "test2".
        let store1 = root_volume
            .volume(
                "vol1",
                StoreOptions {
                    crypt: Some(Arc::new(new_insecure_crypt())),
                    ..StoreOptions::default()
                },
            )
            .await
            .expect("volume failed");
        let root_directory1 =
            Directory::open(&store1, store1.root_directory_object_id()).await.expect("open failed");
        assert!(root_directory1.lookup("test").await.expect("lookup failed").is_some());
        assert!(root_directory1.lookup("test2").await.expect("lookup failed").is_some());

        // vol2 should only have "test".
        let store2 = root_volume
            .volume(
                "vol2",
                StoreOptions {
                    crypt: Some(Arc::new(new_insecure_crypt())),
                    ..StoreOptions::default()
                },
            )
            .await
            .expect("volume failed");
        let root_directory2 =
            Directory::open(&store2, store2.root_directory_object_id()).await.expect("open failed");
        assert!(root_directory2.lookup("test").await.expect("lookup failed").is_some());
        assert!(root_directory2.lookup("test2").await.expect("lookup failed").is_none());

        fs.close().await.expect("close failed");
    }
4624
4625    #[fuchsia::test]
4626    async fn test_crypt_failure_during_unlock_race() {
4627        let fs = test_filesystem().await;
4628
4629        let store_object_id = {
4630            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4631            let store = root_volume
4632                .new_volume(
4633                    "vol",
4634                    NewChildStoreOptions {
4635                        options: StoreOptions { crypt: Some(Arc::new(new_insecure_crypt())) },
4636                        ..Default::default()
4637                    },
4638                )
4639                .await
4640                .expect("new_volume failed");
4641            let root_directory = Directory::open(&store, store.root_directory_object_id())
4642                .await
4643                .expect("open failed");
4644            let mut transaction = fs
4645                .root_store()
4646                .new_transaction(
4647                    lock_keys![LockKey::object(
4648                        store.store_object_id(),
4649                        root_directory.object_id()
4650                    )],
4651                    Options::default(),
4652                )
4653                .await
4654                .expect("new_transaction failed");
4655            root_directory
4656                .create_child_file(&mut transaction, "test")
4657                .await
4658                .expect("create_child_file failed");
4659            transaction.commit().await.expect("commit failed");
4660            store.store_object_id()
4661        };
4662
4663        fs.close().await.expect("close failed");
4664        let device = fs.take_device().await;
4665        device.reopen(false);
4666
4667        let fs = FxFilesystem::open(device).await.expect("open failed");
4668        {
4669            let fs_clone = fs.clone();
4670            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4671
4672            let crypt = Arc::new(new_insecure_crypt());
4673            let crypt_clone = crypt.clone();
4674            join!(
4675                async move {
4676                    // Unlock might fail, so ignore errors.
4677                    let _ =
4678                        root_volume.volume("vol", StoreOptions { crypt: Some(crypt_clone) }).await;
4679                },
4680                async move {
4681                    // Block until unlock is finished but before flushing due to unlock is finished, to
4682                    // maximize the chances of weirdness.
4683                    let keys = lock_keys![LockKey::flush(store_object_id)];
4684                    let _ = fs_clone.lock_manager().write_lock(keys).await;
4685                    crypt.shutdown();
4686                }
4687            );
4688        }
4689
4690        fs.close().await.expect("close failed");
4691        let device = fs.take_device().await;
4692        device.reopen(false);
4693
4694        let fs = FxFilesystem::open(device).await.expect("open failed");
4695        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
4696        let store = root_volume
4697            .volume(
4698                "vol",
4699                StoreOptions {
4700                    crypt: Some(Arc::new(new_insecure_crypt())),
4701                    ..StoreOptions::default()
4702                },
4703            )
4704            .await
4705            .expect("open volume failed");
4706        let root_directory =
4707            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4708        assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
4709
4710        fs.close().await.expect("close failed");
4711    }
4712
4713    #[fuchsia::test]
4714    async fn test_low_32_bit_object_ids() {
4715        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
4716        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
4717
4718        {
4719            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4720
4721            let store = root_vol
4722                .new_volume(
4723                    "test",
4724                    NewChildStoreOptions { low_32_bit_object_ids: true, ..Default::default() },
4725                )
4726                .await
4727                .expect("new_volume failed");
4728
4729            let root_dir = Directory::open(&store, store.root_directory_object_id())
4730                .await
4731                .expect("open failed");
4732
4733            let mut ids = std::collections::HashSet::new();
4734
4735            for i in 0..100 {
4736                let mut transaction = fs
4737                    .root_store()
4738                    .new_transaction(
4739                        lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4740                        Options::default(),
4741                    )
4742                    .await
4743                    .expect("new_transaction failed");
4744
4745                for j in 0..100 {
4746                    let object = root_dir
4747                        .create_child_dir(&mut transaction, &format!("{i}.{j}"))
4748                        .await
4749                        .expect("create_child_file failed");
4750
4751                    assert!(object.object_id() < 1 << 32);
4752                    assert_ne!(object.object_id(), INVALID_OBJECT_ID);
4753                    assert!(ids.insert(object.object_id()));
4754                }
4755
4756                transaction.commit().await.expect("commit failed");
4757            }
4758
4759            assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4760
4761            fsck_volume(&fs, store.store_object_id(), None).await.expect("fsck_volume failed");
4762        }
4763
4764        // Verify persistence
4765        fs.close().await.expect("Close failed");
4766        let device = fs.take_device().await;
4767        device.reopen(false);
4768        let fs = FxFilesystem::open(device).await.expect("open failed");
4769        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4770        let store = root_vol.volume("test", StoreOptions::default()).await.expect("volume failed");
4771
4772        // Check that we can still create files and they have low 32-bit IDs.
4773        let root_dir =
4774            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
4775        let mut transaction = fs
4776            .root_store()
4777            .new_transaction(
4778                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4779                Options::default(),
4780            )
4781            .await
4782            .expect("new_transaction failed");
4783
4784        let object = root_dir
4785            .create_child_file(&mut transaction, "persistence_check")
4786            .await
4787            .expect("create_child_file failed");
4788        assert!(object.object_id() < 1 << 32);
4789
4790        transaction.commit().await.expect("commit failed");
4791
4792        assert_matches!(store.store_info().unwrap().last_object_id, LastObjectIdInfo::Low32Bit);
4793    }
4794
4795    #[fuchsia::test]
4796    async fn test_mutations_key_roll_during_flush() {
4797        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
4798        let fs = FxFilesystemBuilder::new()
4799            .format(true)
4800            .roll_metadata_key_byte_count(2048)
4801            .open(device)
4802            .await
4803            .expect("open failed");
4804
4805        let crypt = Arc::new(new_insecure_crypt());
4806
4807        {
4808            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4809            let store = root_vol
4810                .new_volume(
4811                    "test",
4812                    NewChildStoreOptions {
4813                        options: StoreOptions {
4814                            crypt: Some(crypt.clone()),
4815                            ..StoreOptions::default()
4816                        },
4817                        ..Default::default()
4818                    },
4819                )
4820                .await
4821                .expect("new_volume failed");
4822
4823            let root_dir = Directory::open(&store, store.root_directory_object_id())
4824                .await
4825                .expect("open failed");
4826
4827            let mut last_offset = 0;
4828            loop {
4829                let offset = store.mutations_cipher.lock().as_ref().unwrap().offset();
4830                if offset >= 2048 {
4831                    break;
4832                }
4833                if offset < last_offset {
4834                    panic!("Key rolled during setup loop");
4835                }
4836                last_offset = offset;
4837
4838                let mut transaction = fs
4839                    .root_store()
4840                    .new_transaction(
4841                        lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4842                        Options::default(),
4843                    )
4844                    .await
4845                    .expect("new_transaction failed");
4846                let name = format!("file_{offset}");
4847                root_dir
4848                    .create_child_file(&mut transaction, &name)
4849                    .await
4850                    .expect("create_child_file failed");
4851                transaction.commit().await.expect("commit failed");
4852            }
4853
4854            store.flush().await.expect("flush failed");
4855
4856            // Compact journal NOW, before writing after_flush.
4857            // Since store is flushed, it is not dirty.
4858            // This will trim journal past the flush (including UpdateMutationsKey).
4859            fs.journal().force_compact().await.expect("compact failed");
4860
4861            // Write a file after flush.
4862            let mut transaction = fs
4863                .root_store()
4864                .new_transaction(
4865                    lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
4866                    Options::default(),
4867                )
4868                .await
4869                .expect("new_transaction failed");
4870            let name = "file_after_flush";
4871            root_dir
4872                .create_child_file(&mut transaction, &name)
4873                .await
4874                .expect("create_child_file failed");
4875            transaction.commit().await.expect("commit failed");
4876        }
4877
4878        fs.close().await.expect("Close failed");
4879        let device = fs.take_device().await;
4880        device.reopen(false);
4881
4882        let fs = FxFilesystem::open(device).await.expect("open failed");
4883
4884        {
4885            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
4886            let store = root_vol
4887                .volume("test", StoreOptions { crypt: Some(crypt), ..StoreOptions::default() })
4888                .await
4889                .expect("volume failed");
4890
4891            let root_dir = Directory::open(&store, store.root_directory_object_id())
4892                .await
4893                .expect("open failed");
4894
4895            let child = root_dir.lookup("file_after_flush").await.expect("lookup failed");
4896            assert!(child.is_some(), "file_after_flush missing!");
4897        }
4898        fs.close().await.expect("Close failed");
4899    }
4900
    /// Test `Crypt` wrapper whose `create_key` can be made to stall on demand.
    struct StallingCrypt {
        /// The real crypt implementation that calls are forwarded to.
        delegate: Arc<dyn Crypt>,
        /// When `Some`, the next `create_key` call takes this receiver and awaits it before
        /// forwarding. A test installs it and later sends on (or drops) the paired sender.
        blocker: Mutex<Option<futures::channel::oneshot::Receiver<()>>>,
    }
4905
4906    #[async_trait]
4907    impl Crypt for StallingCrypt {
4908        async fn create_key(
4909            &self,
4910            owner: u64,
4911            purpose: KeyPurpose,
4912        ) -> Result<(FxfsKey, UnwrappedKey), zx::Status> {
4913            let rx = self.blocker.lock().take();
4914            if let Some(rx) = rx {
4915                let _ = rx.await;
4916            }
4917            self.delegate.create_key(owner, purpose).await
4918        }
4919
4920        async fn create_key_with_id(
4921            &self,
4922            owner: u64,
4923            wrapping_key_id: WrappingKeyId,
4924            object_type: ObjectType,
4925        ) -> Result<(EncryptionKey, UnwrappedKey), zx::Status> {
4926            self.delegate.create_key_with_id(owner, wrapping_key_id, object_type).await
4927        }
4928
4929        async fn unwrap_key(
4930            &self,
4931            wrapped_key: &WrappedKey,
4932            owner: u64,
4933        ) -> Result<UnwrappedKey, zx::Status> {
4934            self.delegate.unwrap_key(wrapped_key, owner).await
4935        }
4936    }
4937
    /// Checks that fsck can run to completion while a background transaction is stalled inside
    /// the crypt service's `create_key` (via `StallingCrypt`). It then releases the stall and
    /// lets the transaction finish.
    ///
    /// NOTE(review): the test relies on a 100ms timer, not an explicit signal, to decide that
    /// the background task has reached the crypt call. If the task is slower than that, fsck may
    /// run before the stall and the scenario is not exercised.
    #[fuchsia::test]
    async fn test_fsck_during_key_pre_cache_stall() {
        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new().format(true).open(device).await.expect("open failed");

        // Initialize crypt without blocker so new_volume doesn't stall.
        let crypt = Arc::new(StallingCrypt {
            delegate: Arc::new(new_insecure_crypt()),
            blocker: Mutex::new(None),
        });

        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol
            .new_volume(
                "test",
                NewChildStoreOptions {
                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    ..Default::default()
                },
            )
            .await
            .expect("new_volume failed");

        // Now install the blocker.
        let (tx, rx) = futures::channel::oneshot::channel();
        *crypt.blocker.lock() = Some(rx);

        // The volume was created, so it has 2 cached keys.
        // We want to exhaust them so that the next transaction tries to pre-cache more.
        // (If the store is not in the `Unlocked` state, this does nothing.)
        {
            let mut lock_state = store.lock_state.lock();
            if let super::LockState::Unlocked { cached_keys, .. } = &mut *lock_state {
                cached_keys.clear();
            }
        }

        // Start a transaction in the background. It will try to pre-cache keys,
        // and should stall on the crypt service.
        let store_clone = store.clone();
        let root_dir_id = store.root_directory_object_id();

        let tx_join_handle = fasync::Task::spawn(async move {
            let root_dir = Directory::open(&store_clone, root_dir_id).await.expect("open failed");
            let mut transaction = store_clone
                .new_transaction(
                    lock_keys![LockKey::object(
                        store_clone.store_object_id(),
                        root_dir.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_dir
                .create_child_file(&mut transaction, "foo")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
        });

        // Yield to let the background task run and block on crypt.
        fasync::Timer::new(std::time::Duration::from_millis(100)).await;

        // While it is blocked, we should still be able to run fsck.
        fsck(fs.clone()).await.expect("fsck failed");

        // Unblock the crypt service so the transaction can complete.
        let _ = tx.send(());
        tx_join_handle.await;

        fs.close().await.expect("Close failed");
    }
5010
5011    #[fuchsia::test]
5012    async fn test_writes_to_other_store_not_blocked_by_stall() {
5013        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
5014        let fs = FxFilesystemBuilder::new()
5015            .format(true)
5016            .journal_options(JournalOptions { reclaim_size: 32_768, ..Default::default() })
5017            .open(device)
5018            .await
5019            .expect("open failed");
5020
5021        // Initialize crypt for store1 with a blocker.
5022        // We will enable it after volume creation.
5023        let crypt1 = Arc::new(StallingCrypt {
5024            delegate: Arc::new(new_insecure_crypt()),
5025            blocker: Mutex::new(None),
5026        });
5027
5028        // Initialize normal crypt for store2.
5029        let crypt2 = Arc::new(new_insecure_crypt());
5030
5031        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
5032
5033        let store1 = root_vol
5034            .new_volume(
5035                "test1",
5036                NewChildStoreOptions {
5037                    options: StoreOptions {
5038                        crypt: Some(crypt1.clone()),
5039                        ..StoreOptions::default()
5040                    },
5041                    ..Default::default()
5042                },
5043            )
5044            .await
5045            .expect("new_volume failed");
5046
5047        store1.flush().await.expect("flush failed");
5048
5049        let store2 = root_vol
5050            .new_volume(
5051                "test2",
5052                NewChildStoreOptions {
5053                    options: StoreOptions {
5054                        crypt: Some(crypt2.clone()),
5055                        ..StoreOptions::default()
5056                    },
5057                    ..Default::default()
5058                },
5059            )
5060            .await
5061            .expect("new_volume failed");
5062
5063        // Now install the blocker on crypt1.
5064        let (tx, rx) = futures::channel::oneshot::channel();
5065        *crypt1.blocker.lock() = Some(rx);
5066
5067        // Exhaust cached keys for store1 so next txn tries to pre-cache.
5068        {
5069            let mut lock_state = store1.lock_state.lock();
5070            if let super::LockState::Unlocked { cached_keys, .. } = &mut *lock_state {
5071                cached_keys.clear();
5072            }
5073        }
5074
5075        // Start a transaction on store1 in the background.
5076        // It should stall on crypt1.
5077        let store1_clone = store1.clone();
5078        let root_dir1_id = store1.root_directory_object_id();
5079
5080        let tx_join_handle = fasync::Task::spawn(async move {
5081            let root_dir1 =
5082                Directory::open(&store1_clone, root_dir1_id).await.expect("open failed");
5083            let mut transaction = store1_clone
5084                .new_transaction(
5085                    lock_keys![LockKey::object(
5086                        store1_clone.store_object_id(),
5087                        root_dir1.object_id()
5088                    )],
5089                    Options::default(),
5090                )
5091                .await
5092                .expect("new_transaction failed");
5093            root_dir1
5094                .create_child_file(&mut transaction, "foo")
5095                .await
5096                .expect("create_child_file failed");
5097            transaction.commit().await.expect("commit failed");
5098        });
5099
5100        // Yield to let the background task run and block.
5101        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
5102
5103        // While store1 is blocked, we should still be able to write to store2.
5104        let root_dir2 =
5105            Directory::open(&store2, store2.root_directory_object_id()).await.expect("open failed");
5106        let long_name = "a".repeat(255);
5107        let mut i = 0;
5108        while store2.counters.lock().num_flushes < 3 {
5109            if i > 200 {
5110                panic!(
5111                    "Failed to trigger 3 compactions after 200 transactions. Flushes: {}",
5112                    store2.counters.lock().num_flushes
5113                );
5114            }
5115            let mut transaction = store2
5116                .new_transaction(
5117                    lock_keys![LockKey::object(store2.store_object_id(), root_dir2.object_id())],
5118                    Options::default(),
5119                )
5120                .await
5121                .expect("new_transaction failed");
5122            root_dir2
5123                .create_child_file(&mut transaction, &format!("{}-{:03}", long_name, i))
5124                .await
5125                .expect("create_child_file failed");
5126            transaction.commit().await.expect("commit failed");
5127            i += 1;
5128            fasync::Timer::new(std::time::Duration::from_millis(5)).await;
5129        }
5130
5131        // Unblock crypt1.
5132        let _ = tx.send(());
5133        tx_join_handle.await;
5134
5135        fs.close().await.expect("Close failed");
5136    }
5137
    // Checks that once the crypt service is gone, a transaction can still commit only while the
    // store's cached keys last. Three transactions are opened on distinct directories before the
    // crypt service is shut down. The first commit (plus a forced journal compaction) must
    // succeed; the second commit must fail.
    #[fuchsia::test]
    async fn test_concurrent_transactions_exhaust_cached_keys() {
        let fs = test_filesystem().await;

        // An encrypted volume backed by an insecure test crypt. A clone of the Arc is kept so the
        // service can be shut down later in the test.
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let crypt = Arc::new(new_insecure_crypt());
        let store = root_volume
            .new_volume(
                "vol",
                NewChildStoreOptions {
                    options: StoreOptions { crypt: Some(crypt.clone()), ..Default::default() },
                    ..Default::default()
                },
            )
            .await
            .expect("new_volume failed");

        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");

        // Create three directories up front, in a single transaction. Each later transaction then
        // locks a different directory, so all three can be open at once without blocking each
        // other on directory locks.
        let mut transaction = store
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let dir1 = root_directory
            .create_child_dir(&mut transaction, "dir1")
            .await
            .expect("create_child_dir failed");
        let dir2 = root_directory
            .create_child_dir(&mut transaction, "dir2")
            .await
            .expect("create_child_dir failed");
        let dir3 = root_directory
            .create_child_dir(&mut transaction, "dir3")
            .await
            .expect("create_child_dir failed");
        transaction.commit().await.expect("commit failed");

        // Open three transactions, one per directory, while the crypt service is still alive.
        // Each adds one file but none is committed yet.
        // Assumed key-cache behaviour (from the test author's notes; this test cannot observe it
        // directly):
        // - Only the first transaction needs crypt, to top the cached keys up to full (size 2).
        // - The other two see a full cache and start without calling crypt again.

        // Transaction 1
        let mut transaction1 = store
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), dir1.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction 1 failed");
        dir1.create_child_file(&mut transaction1, "file1")
            .await
            .expect("create_child_file 1 failed");

        // Transaction 2
        let mut transaction2 = store
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), dir2.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction 2 failed");
        dir2.create_child_file(&mut transaction2, "file2")
            .await
            .expect("create_child_file 2 failed");

        // Transaction 3
        // NOTE(review): transaction3 is never committed. It is only dropped when this function
        // returns, i.e. after `fs.close()` below. Presumably that is intentional (it keeps a third
        // transaction pending through the test), but an explicit `drop(transaction3)` before
        // closing would make the intent clearer. Confirm.
        let mut transaction3 = store
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), dir3.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction 3 failed");
        dir3.create_child_file(&mut transaction3, "file3")
            .await
            .expect("create_child_file 3 failed");

        // From here on, any operation that needs to call the crypt service cannot succeed.
        crypt.shutdown();

        // Transaction 1's commit and a forced journal compaction must both succeed without
        // crypt. Per the author's notes, this uses up one of the cached keys.
        transaction1.commit().await.expect("commit 1 failed");
        fs.journal().force_compact().await.expect("compact 1 failed");

        // Transaction 2's commit must FAIL. Per the author's notes, the cache now holds fewer
        // keys than its target (1 < 2). The commit therefore tries to top it up through the crypt
        // service, which is dead.
        assert!(transaction2.commit().await.is_err());

        // The filesystem must still close cleanly after the failed commit.
        fs.close().await.expect("Close failed");
    }
5234}