fxfs/object_store.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

pub mod allocator;
pub mod caching_object_handle;
mod data_object_handle;
pub mod directory;
mod extent_record;
mod flush;
pub mod graveyard;
pub mod journal;
mod key_manager;
pub(crate) mod merge;
pub mod object_manager;
pub mod object_record;
pub mod project_id;
mod store_object_handle;
pub mod transaction;
mod tree;
mod tree_cache;
pub mod volume;

pub use data_object_handle::{
    DataObjectHandle, DirectWriter, FsverityState, FsverityStateInner, RangeType,
};
pub use directory::Directory;
pub use object_record::{ChildValue, ObjectDescriptor, PosixAttributes, Timestamp};
pub use store_object_handle::{
    SetExtendedAttributeMode, StoreObjectHandle, EXTENDED_ATTRIBUTE_RANGE_END,
    EXTENDED_ATTRIBUTE_RANGE_START,
};

use crate::errors::FxfsError;
use crate::filesystem::{
    ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions, TxnGuard, MAX_FILE_SIZE,
};
use crate::log::*;
use crate::lsm_tree::cache::{NullCache, ObjectCache};
use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
use crate::lsm_tree::{LSMTree, Query};
use crate::object_handle::{ObjectHandle, ReadObjectHandle, INVALID_OBJECT_ID};
use crate::object_store::allocator::Allocator;
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::{JournalCheckpoint, JournaledTransaction};
use crate::object_store::key_manager::KeyManager;
use crate::object_store::transaction::{
    lock_keys, AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation, Options,
    Transaction,
};
use crate::range::RangeExt;
use crate::round::round_up;
use crate::serialized_types::{migrate_to_version, Migrate, Version, Versioned, VersionedLatest};
use anyhow::{anyhow, bail, ensure, Context, Error};
use async_trait::async_trait;
use fidl_fuchsia_io as fio;
use fprint::TypeFingerprint;
use fuchsia_inspect::ArrayProperty;
use fuchsia_sync::Mutex;
use fxfs_crypto::ff1::Ff1;
use fxfs_crypto::{
    Crypt, KeyPurpose, StreamCipher, UnwrappedKey, WrappedKey, WrappedKeyV32, WrappedKeyV40,
    WrappedKeys,
};
use once_cell::sync::OnceCell;
use scopeguard::ScopeGuard;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use storage_device::Device;
use uuid::Uuid;

pub use extent_record::{
    ExtentKey, ExtentMode, ExtentValue, BLOB_MERKLE_ATTRIBUTE_ID, DEFAULT_DATA_ATTRIBUTE_ID,
    FSVERITY_MERKLE_ATTRIBUTE_ID,
};
pub use object_record::{
    AttributeKey, EncryptionKeys, ExtendedAttributeValue, FsverityMetadata, ObjectAttributes,
    ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, ProjectProperty, RootDigest,
};
pub use transaction::Mutation;

// For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel
// attacks more difficult. This mask can be used to extract the high part of the object ID.
const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000;

// At time of writing, this threshold limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;

// Encrypted files and directories use the fscrypt key (identified by `FSCRYPT_KEY_ID`) to encrypt
// file contents and filenames respectively. All other encrypted files default to using the
// `VOLUME_DATA_KEY_ID` key. Note that the filesystem always uses the `VOLUME_DATA_KEY_ID` key to
// encrypt large extended attributes, so encrypted files and directories with large xattrs will
// have both an fscrypt and a volume data key.
pub const VOLUME_DATA_KEY_ID: u64 = 0;
pub const FSCRYPT_KEY_ID: u64 = 1;
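
// For example (illustrative; this mirrors the key selection in `create_object` below):
//
//     let key_id = if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };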

/// A constant that can be used where an owner is expected of type `Weak<dyn StoreOwner>` but no
/// owner is required.
pub const NO_OWNER: Weak<()> = Weak::new();
impl StoreOwner for () {}

#[async_trait]
pub trait StoreOwner: Send + Sync {
    /// Forcibly lock the store.  This exists to give the StoreOwner an opportunity to clean up
    /// tasks which might access the store before locking it, because ObjectStore::unlock can only
    /// be called when the store is not in use.
    async fn force_lock(self: Arc<Self>, _store: &ObjectStore) -> Result<(), Error> {
        Err(anyhow!(FxfsError::Internal))
    }
}

/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
/// back to an ObjectStore.
pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}

/// StoreInfo stores information about the object store.  This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
pub type StoreInfo = StoreInfoV40;

#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV40 {
    /// The globally unique identifier for the associated object store. If unset, this will be all
    /// zeroes.
    guid: [u8; 16],

    /// The last used object ID.  Note that this field is not accurate in memory; ObjectStore's
    /// last_object_id field is the one to use in that case.  Technically, this might not be the
    /// last object ID used by the latest transaction that created an object, because we use this
    /// at the point of creating the object but before we commit the transaction.  Transactions
    /// can then get committed in an arbitrary order (or not at all).
    last_object_id: u64,

    /// Object ids for layers.  TODO(https://fxbug.dev/42178036): need a layer of indirection here
    /// so we can support snapshots.
    pub layers: Vec<u64>,

    /// The object ID for the root directory.
    root_directory_object_id: u64,

    /// The object ID for the graveyard.
    graveyard_directory_object_id: u64,

    /// The number of live objects in the store.  This should *not* be trusted; it can be invalid
    /// due to filesystem inconsistencies.
    object_count: u64,

    /// The (wrapped) key that encrypted mutations should use.
    mutations_key: Option<WrappedKeyV40>,

    /// Mutations for the store are encrypted using a stream cipher.  To decrypt the mutations, we
    /// need to know the offset in the cipher stream at which to start.
    mutations_cipher_offset: u64,

    /// If we have to flush the store whilst we do not have the key, we need to write the encrypted
    /// mutations to an object. This is the object ID of that file if it exists.
    pub encrypted_mutations_object_id: u64,

    /// Object IDs are encrypted to reduce the amount of information that sequential object IDs
    /// reveal (such as the number of files in the system and the ordering of their creation in
    /// time).  Only the bottom 32 bits of the object ID are encrypted; the top 32 bits increment
    /// after 2^32 object IDs have been used, which allows us to roll the key.
    object_id_key: Option<WrappedKeyV40>,

    /// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
    /// when the directory doesn't yet exist.
    internal_directory_object_id: u64,
}

#[derive(Default, Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV40)]
pub struct StoreInfoV36 {
    guid: [u8; 16],
    last_object_id: u64,
    pub layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<WrappedKeyV32>,
    mutations_cipher_offset: u64,
    pub encrypted_mutations_object_id: u64,
    object_id_key: Option<WrappedKeyV32>,
    internal_directory_object_id: u64,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV36)]
pub struct StoreInfoV32 {
    guid: [u8; 16],
    last_object_id: u64,
    pub layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<WrappedKeyV32>,
    mutations_cipher_offset: u64,
    pub encrypted_mutations_object_id: u64,
    object_id_key: Option<WrappedKeyV32>,
}

impl StoreInfo {
    /// Create a new/default [`StoreInfo`] but with a newly generated GUID.
    fn new_with_guid() -> Self {
        let guid = Uuid::new_v4();
        Self { guid: *guid.as_bytes(), ..Default::default() }
    }

    /// Returns the parent objects for this store.
    pub fn parent_objects(&self) -> Vec<u64> {
        // We should not include the ID of the store itself, since that should be referred to in the
        // volume directory.
        let mut objects = self.layers.to_vec();
        if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
            objects.push(self.encrypted_mutations_object_id);
        }
        objects
    }
}

// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit
// isn't exceeded.  It will likely involve placing limits on the maximum number of layers.
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072;

// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
// in the journal but hasn't yet been written to layer files) for a store.  We set a limit because
// we want to bound memory use in the case where the filesystem is corrupt or under attack.
pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;

#[derive(Default)]
pub struct HandleOptions {
    /// If true, transactions used by this handle will skip journal space checks.
    pub skip_journal_checks: bool,
    /// If true, data written to any attribute of this handle will not have per-block checksums
    /// computed.
    pub skip_checksums: bool,
}

/// Parameters for encrypting a newly created object.
struct ObjectEncryptionOptions {
    /// If set, the keys are treated as permanent and never evicted from the KeyManager cache.
    /// This is necessary when keys are managed by another store; for example, the layer files
    /// of a child store are objects in the root store, but they are encrypted with keys from the
    /// child store.  Generally, most objects should have this set to `false`.
    permanent: bool,
    key_id: u64,
    key: WrappedKey,
    unwrapped_key: UnwrappedKey,
}

pub struct NewChildStoreOptions {
    /// The owner of the store.
    pub owner: Weak<dyn StoreOwner>,

    /// The store is unencrypted if `crypt` is `None`.
    pub crypt: Option<Arc<dyn Crypt>>,

    /// Specifies the object ID in the root store to be used for the store.  If set to
    /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
    pub object_id: u64,
}

impl Default for NewChildStoreOptions {
    fn default() -> Self {
        Self { owner: NO_OWNER, crypt: None, object_id: INVALID_OBJECT_ID }
    }
}

pub type EncryptedMutations = EncryptedMutationsV40;

#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV40 {
    // Information about the mutations is held here, but the actual encrypted data is held within
    // data.  For each transaction, we record the checkpoint and the count of mutations within the
    // transaction.  The checkpoint is required for the log file offset (which we need to apply the
    // mutations), and the version so that we can correctly decode the mutation after it has been
    // decrypted. The count specifies the number of serialized mutations encoded in |data|.
    transactions: Vec<(JournalCheckpoint, u64)>,

    // The encrypted mutations.
    data: Vec<u8>,

    // If the mutations key was rolled, this holds the offset in `data` at which the new key should
    // apply.
    mutations_key_roll: Vec<(usize, WrappedKeyV40)>,
}

impl std::fmt::Debug for EncryptedMutations {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("EncryptedMutations")
            .field("transactions", &self.transactions)
            .field("len", &self.data.len())
            .field(
                "mutations_key_roll",
                &self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
            )
            .finish()
    }
}

impl Versioned for EncryptedMutations {
    fn max_serialized_size() -> u64 {
        MAX_ENCRYPTED_MUTATIONS_SIZE as u64
    }
}

impl EncryptedMutations {
    fn from_replayed_mutations(
        store_object_id: u64,
        transactions: Vec<JournaledTransaction>,
    ) -> Self {
        let mut this = Self::default();
        for JournaledTransaction { checkpoint, non_root_mutations, .. } in transactions {
            for (object_id, mutation) in non_root_mutations {
                if store_object_id == object_id {
                    if let Mutation::EncryptedObjectStore(data) = mutation {
                        this.push(&checkpoint, data);
                    } else if let Mutation::UpdateMutationsKey(key) = mutation {
                        this.mutations_key_roll.push((this.data.len(), key.into()));
                    }
                }
            }
        }
        this
    }

    fn extend(&mut self, other: &EncryptedMutations) {
        self.transactions.extend_from_slice(&other.transactions[..]);
        self.mutations_key_roll.extend(
            other
                .mutations_key_roll
                .iter()
                .map(|(offset, key)| (offset + self.data.len(), key.clone())),
        );
        self.data.extend_from_slice(&other.data[..]);
    }

    fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
        self.data.append(&mut data.into());
        // If the checkpoint is the same as that of the last mutation we pushed, just increment the
        // count.
        if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
            if last_checkpoint.file_offset == checkpoint.file_offset {
                *count += 1;
                return;
            }
        }
        self.transactions.push((checkpoint.clone(), 1));
    }
}

pub enum LockState {
    Locked,
    Unencrypted,
    Unlocked { owner: Weak<dyn StoreOwner>, crypt: Arc<dyn Crypt> },

    // The store is unlocked, but in a read-only state, and no flushes or other operations will be
    // performed on the store.
    UnlockedReadOnly(Arc<dyn Crypt>),

    // The store is encrypted but is now in an unusable state (due to a failure to sync the journal
    // after locking the store).  The store cannot be unlocked.
    Invalid,

    // Before we've read the StoreInfo we might not know whether the store is Locked or Unencrypted.
    // This can happen when lazily opening stores (ObjectManager::lazy_open_store).
    Unknown,

    // The store is in the process of being locked.  Whilst the store is being locked, the store
    // isn't usable; assertions will trip if any mutations are applied.
    Locking,

    // Whilst we're unlocking, we will replay encrypted mutations.  The store isn't usable until
    // it's in the Unlocked state.
    Unlocking,
}

impl LockState {
    fn owner(&self) -> Option<Arc<dyn StoreOwner>> {
        if let Self::Unlocked { owner, .. } = self {
            owner.upgrade()
        } else {
            None
        }
    }
}

impl fmt::Debug for LockState {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(match self {
            LockState::Locked => "Locked",
            LockState::Unencrypted => "Unencrypted",
            LockState::Unlocked { .. } => "Unlocked",
            LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
            LockState::Invalid => "Invalid",
            LockState::Unknown => "Unknown",
            LockState::Locking => "Locking",
            LockState::Unlocking => "Unlocking",
        })
    }
}

#[derive(Default)]
struct LastObjectId {
    // The *unencrypted* value of the last object ID.
    id: u64,

    // Encrypted stores will use a cipher to obfuscate the object ID.
    cipher: Option<Ff1>,
}

impl LastObjectId {
    // Returns true if a cipher is needed to generate new object IDs.
    fn should_create_cipher(&self) -> bool {
        self.id as u32 == u32::MAX
    }

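    // A sketch of the resulting ID layout, assuming a cipher is present: for `id = 0x1_0000_0007`,
    // the returned ID is `0x1_0000_0000 | cipher.encrypt(7) as u64`, so the top 32 bits remain a
    // monotonically increasing generation count while only the low 32 bits are obfuscated.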
    fn get_next_object_id(&mut self) -> u64 {
        self.id += 1;
        if let Some(cipher) = &self.cipher {
            let hi = self.id & OBJECT_ID_HI_MASK;
            assert_ne!(hi, INVALID_OBJECT_ID);
            assert_ne!(self.id as u32, 0); // This would indicate the ID wrapped.
            hi | cipher.encrypt(self.id as u32) as u64
        } else {
            self.id
        }
    }
}

/// An object store supports a file-like interface for objects.  Objects are keyed by a 64-bit
/// identifier.  An object store has to be backed by a parent object store (which stores metadata
/// for the object store).  The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
pub struct ObjectStore {
    parent_store: Option<Arc<ObjectStore>>,
    store_object_id: u64,
    device: Arc<dyn Device>,
    block_size: u64,
    filesystem: Weak<FxFilesystem>,
    // Lock ordering: This must be taken before `lock_state`.
    store_info: Mutex<Option<StoreInfo>>,
    tree: LSMTree<ObjectKey, ObjectValue>,

    // When replaying the journal, the store cannot read StoreInfo until the whole journal
    // has been replayed, so during that time, store_info_handle will be None and records
    // just get sent to the tree. Once the journal has been replayed, we can open the store
    // and load all the other layer information.
    store_info_handle: OnceCell<DataObjectHandle<ObjectStore>>,

    // The cipher to use for encrypted mutations, if this store is encrypted.
    mutations_cipher: Mutex<Option<StreamCipher>>,

    // Current lock state of the store.
    // Lock ordering: This must be taken after `store_info`.
    lock_state: Mutex<LockState>,
    key_manager: KeyManager,

    // Enable/disable tracing.
    trace: AtomicBool,

    // Informational counters for events occurring within the store.
    counters: Mutex<ObjectStoreCounters>,

    // These are updated in performance-sensitive code paths, so we use atomics instead of the
    // `counters` mutex.
    device_read_ops: AtomicU64,
    device_write_ops: AtomicU64,
    logical_read_ops: AtomicU64,
    logical_write_ops: AtomicU64,

    // Contains the last object ID and, optionally, a cipher to be used when generating new object
    // IDs.
    last_object_id: Mutex<LastObjectId>,

    // An optional callback to be invoked each time the ObjectStore flushes.  The callback is
    // invoked at the end of flush, while the write lock is still held.
    flush_callback: Mutex<Option<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>>,
}

#[derive(Clone, Default)]
struct ObjectStoreCounters {
    mutations_applied: u64,
    mutations_dropped: u64,
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
    persistent_layer_file_sizes: Vec<u64>,
}

impl ObjectStore {
    fn new(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        store_info: Option<StoreInfo>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
        mutations_cipher: Option<StreamCipher>,
        lock_state: LockState,
        last_object_id: LastObjectId,
    ) -> Arc<ObjectStore> {
        let device = filesystem.device();
        let block_size = filesystem.block_size();
        Arc::new(ObjectStore {
            parent_store,
            store_object_id,
            device,
            block_size,
            filesystem: Arc::downgrade(&filesystem),
            store_info: Mutex::new(store_info),
            tree: LSMTree::new(merge::merge, object_cache),
            store_info_handle: OnceCell::new(),
            mutations_cipher: Mutex::new(mutations_cipher),
            lock_state: Mutex::new(lock_state),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(last_object_id),
            flush_callback: Mutex::new(None),
        })
    }

    fn new_empty(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Arc<Self> {
        Self::new(
            parent_store,
            store_object_id,
            filesystem,
            Some(StoreInfo::default()),
            object_cache,
            None,
            LockState::Unencrypted,
            LastObjectId::default(),
        )
    }

    /// Cycle breaker constructor that returns an ObjectStore without a filesystem.
    /// This should only be used from super block code.
    pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
        ObjectStore {
            parent_store: None,
            store_object_id,
            device,
            block_size,
            filesystem: Weak::<FxFilesystem>::new(),
            store_info: Mutex::new(Some(StoreInfo::default())),
            tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
            store_info_handle: OnceCell::new(),
            mutations_cipher: Mutex::new(None),
            lock_state: Mutex::new(LockState::Unencrypted),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(LastObjectId::default()),
            flush_callback: Mutex::new(None),
        }
    }

    /// Used to set the filesystem on root_parent stores at bootstrap time after the filesystem
    /// has been created.
    pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
        this.filesystem = Arc::downgrade(&filesystem);
        this
    }

    /// Create a child store.  It is a multi-step process:
    ///
    ///   1. Call `ObjectStore::new_child_store`.
    ///   2. Register the store with the object-manager.
    ///   3. Call `ObjectStore::create` to write the store-info.
    ///
    /// If the procedure fails, care must be taken to unregister the store with the object-manager.
    ///
    /// The steps have to be separate because of lifetime issues when working with a transaction.
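    ///
    /// A sketch of the sequence (illustrative; `object_manager` and the error handling are
    /// elided):
    /// ```ignore
    /// let store = root_store
    ///     .new_child_store(&mut transaction, NewChildStoreOptions::default(), Box::new(NullCache {}))
    ///     .await?;                                  // step 1
    /// object_manager.add_store(store.clone());      // step 2
    /// store.create(&mut transaction).await?;        // step 3
    /// ```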
    async fn new_child_store(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        options: NewChildStoreOptions,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Result<Arc<Self>, Error> {
        let handle = if options.object_id != INVALID_OBJECT_ID {
            let handle = ObjectStore::create_object_with_id(
                self,
                transaction,
                options.object_id,
                HandleOptions::default(),
                None,
            )
            .await?;
            self.update_last_object_id(options.object_id);
            handle
        } else {
            ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await?
        };
        let filesystem = self.filesystem();
        let store = if let Some(crypt) = options.crypt {
            let (wrapped_key, unwrapped_key) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            let (object_id_wrapped, object_id_unwrapped) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    mutations_key: Some(wrapped_key),
                    object_id_key: Some(object_id_wrapped),
                    ..StoreInfo::new_with_guid()
                }),
                object_cache,
                Some(StreamCipher::new(&unwrapped_key, 0)),
                LockState::Unlocked { owner: options.owner, crypt },
                LastObjectId {
                    // We need to avoid accidentally getting INVALID_OBJECT_ID, so we set
                    // the top 32 bits to a non-zero value.
                    id: 1 << 32,
                    cipher: Some(Ff1::new(&object_id_unwrapped)),
                },
            )
        } else {
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo::new_with_guid()),
                object_cache,
                None,
                LockState::Unencrypted,
                LastObjectId::default(),
            )
        };
        assert!(store.store_info_handle.set(handle).is_ok());
        Ok(store)
    }

    /// Actually creates the store in a transaction.  This will also create a root directory and
    /// graveyard directory for the store.  See `new_child_store` above.
    async fn create<'a>(
        self: &'a Arc<Self>,
        transaction: &mut Transaction<'a>,
    ) -> Result<(), Error> {
        let buf = {
            // Create a root directory and graveyard directory.
            let graveyard_directory_object_id = Graveyard::create(transaction, &self);
            let root_directory = Directory::create(transaction, &self, None).await?;

            let serialized_info = {
                let mut store_info = self.store_info.lock();
                let store_info = store_info.as_mut().unwrap();

                store_info.graveyard_directory_object_id = graveyard_directory_object_id;
                store_info.root_directory_object_id = root_directory.object_id();

                let mut serialized_info = Vec::new();
                store_info.serialize_with_version(&mut serialized_info)?;
                serialized_info
            };
            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
            buf
        };

        if self.filesystem().options().image_builder_mode {
            // If we're in image builder mode, we want to avoid writing to disk unless explicitly
            // asked to. New object stores will have their StoreInfo written when we compact in
            // FxFilesystem::finalize().
            Ok(())
        } else {
            self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
        }
    }

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(store_id = self.store_object_id(), trace; "OS: trace");
        }
    }

    /// Sets a callback to be invoked each time the ObjectStore flushes.  The callback is invoked
    /// at the end of flush, while the write lock is still held.
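    ///
    /// A hedged usage sketch (the body of the closure is illustrative):
    /// ```ignore
    /// store.set_flush_callback(|store| {
    ///     println!("store {} flushed", store.store_object_id());
    /// });
    /// ```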
    pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
        let mut flush_callback = self.flush_callback.lock();
        *flush_callback = Some(Box::new(callback));
    }

    pub fn is_root(&self) -> bool {
        if let Some(parent) = &self.parent_store {
            parent.parent_store.is_none()
        } else {
            // The root parent store isn't the root store.
            false
        }
    }

    /// Populates an inspect node with store statistics.
    pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
        let counters = self.counters.lock();
        if let Some(store_info) = self.store_info() {
            root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
        } else {
            warn!("Can't access store_info; store is locked.");
        };
        root.record_uint("store_object_id", self.store_object_id);
        root.record_uint("mutations_applied", counters.mutations_applied);
        root.record_uint("mutations_dropped", counters.mutations_dropped);
        root.record_uint("num_flushes", counters.num_flushes);
        if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
            root.record_uint(
                "last_flush_time_ms",
                last_flush_time
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or(std::time::Duration::ZERO)
                    .as_millis()
                    .try_into()
                    .unwrap_or(0u64),
            );
        }
        let sizes = root.create_uint_array(
            "persistent_layer_file_sizes",
            counters.persistent_layer_file_sizes.len(),
        );
        for (i, size) in counters.persistent_layer_file_sizes.iter().enumerate() {
            sizes.set(i, *size);
        }
        root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
        root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
        root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
        root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));

        root.record(sizes);

        let this = self.clone();
        root.record_child("lsm_tree", move |node| this.tree().record_inspect_data(node));
    }

    pub fn device(&self) -> &Arc<dyn Device> {
        &self.device
    }

    pub fn block_size(&self) -> u64 {
        self.block_size
    }

    pub fn filesystem(&self) -> Arc<FxFilesystem> {
        self.filesystem.upgrade().unwrap()
    }

    pub fn store_object_id(&self) -> u64 {
        self.store_object_id
    }

    pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
        &self.tree
    }

    pub fn root_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().root_directory_object_id
    }

    pub fn graveyard_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().graveyard_directory_object_id
    }

    fn set_graveyard_directory_object_id(&self, oid: u64) {
        assert_eq!(
            std::mem::replace(
                &mut self.store_info.lock().as_mut().unwrap().graveyard_directory_object_id,
                oid
            ),
            INVALID_OBJECT_ID
        );
    }

    pub fn object_count(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().object_count
    }

    pub fn key_manager(&self) -> &KeyManager {
        &self.key_manager
    }

    pub fn parent_store(&self) -> Option<&Arc<ObjectStore>> {
        self.parent_store.as_ref()
    }

    /// Returns the crypt object for the store.  Returns None if the store is unencrypted.
    pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
        match &*self.lock_state.lock() {
            LockState::Locked => panic!("Store is locked"),
            LockState::Invalid
            | LockState::Unencrypted
            | LockState::Locking
            | LockState::Unlocking => None,
            LockState::Unlocked { crypt, .. } => Some(crypt.clone()),
            LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
            LockState::Unknown => {
                panic!("Store is of unknown lock state; has the journal been replayed yet?")
            }
        }
    }

    pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
        // Create the transaction first to take the object store lock.
        let mut transaction = self
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    self.parent_store.as_ref().unwrap().store_object_id,
                    self.store_object_id,
                )],
                Options::default(),
            )
            .await?;
        let obj_id = self.store_info.lock().as_ref().unwrap().internal_directory_object_id;
        if obj_id != INVALID_OBJECT_ID {
            return Ok(obj_id);
        }

        // Need to create an internal directory.
        let directory = Directory::create(&mut transaction, self, None).await?;

        transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
        transaction.commit().await?;
        Ok(directory.object_id())
    }

    /// Returns the file size for the object without opening the object.
    async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
        let item = self
            .tree
            .find(&ObjectKey::attribute(
                object_id,
                DEFAULT_DATA_ATTRIBUTE_ID,
                AttributeKey::Attribute,
            ))
            .await?
            .ok_or(FxfsError::NotFound)?;
        if let ObjectValue::Attribute { size, .. } = item.value {
            Ok(size)
        } else {
            bail!(FxfsError::NotFile);
        }
    }

    /// `crypt` can be provided if the crypt service should be different from the default; see the
    /// comment on create_object.  Users should avoid having more than one handle open for the same
    /// object at the same time because they might get out of sync; there is no code that will
    /// prevent this.  One example where this can cause an issue: if the object ends up using a
    /// permanent key (which is the case if a value is passed for `crypt`), the permanent key is
    /// dropped when a handle is dropped, which will impact any other handles for the same object.
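    ///
    /// A hedged usage sketch (assumes `oid` names an existing file in `store`, and that `store`
    /// implements `HandleOwner`):
    /// ```ignore
    /// let handle = ObjectStore::open_object(&store, oid, HandleOptions::default(), None).await?;
    /// ```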
    pub async fn open_object<S: HandleOwner>(
        owner: &Arc<S>,
        obj_id: u64,
        options: HandleOptions,
        crypt: Option<Arc<dyn Crypt>>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let mut fsverity_descriptor = None;
        let mut overwrite_ranges = Vec::new();
        let item = store
            .tree
            .find(&ObjectKey::attribute(obj_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute))
            .await?
            .ok_or(FxfsError::NotFound)?;

        let (size, track_overwrite_extents) = match item.value {
            ObjectValue::Attribute { size, has_overwrite_extents } => (size, has_overwrite_extents),
            ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
                fsverity_descriptor = Some(fsverity_metadata);
                // We only track overwrite extents in memory for writes; reads handle them
                // implicitly, so verified files (whose data won't change anymore) don't need to
                // track them.
                (size, false)
            }
            _ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attribute")),
        };

        ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);

        if track_overwrite_extents {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    obj_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
                )))
                .await?;
            loop {
                match iter.get() {
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == obj_id && *attribute_id == DEFAULT_DATA_ATTRIBUTE_ID => {
                        match value {
                            ObjectValue::Extent(ExtentValue::None)
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Raw,
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Cow(_),
                                ..
                            }) => (),
                            ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::OverwritePartial(_),
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Overwrite,
                                ..
                            }) => overwrite_ranges.push(range.clone()),
                            _ => bail!(anyhow!(FxfsError::Inconsistent)
                                .context("open_object: Expected extent")),
                        }
                        iter.advance().await?;
                    }
                    _ => break,
                }
            }
        }

        // If a crypt service has been specified, it needs to be a permanent key because cached
        // keys can only use the store's crypt service.
        let permanent = if let Some(crypt) = crypt {
            store
                .key_manager
                .get_keys(
                    obj_id,
                    crypt.as_ref(),
                    &mut Some(async || store.get_keys(obj_id).await),
                    /* permanent= */ true,
                    /* force= */ false,
                )
                .await?;
            true
        } else {
            false
        };
        let data_object_handle = DataObjectHandle::new(
            owner.clone(),
            obj_id,
            permanent,
            DEFAULT_DATA_ATTRIBUTE_ID,
            size,
            FsverityState::None,
            options,
            false,
            overwrite_ranges,
        );
        if let Some(descriptor) = fsverity_descriptor {
            match data_object_handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await? {
                None => {
                    return Err(anyhow!(FxfsError::NotFound));
                }
                Some(data) => {
                    data_object_handle.set_fsverity_state_some(descriptor, data);
                }
            }
        }
        Ok(data_object_handle)
    }

    async fn create_object_with_id<S: HandleOwner>(
        owner: &Arc<S>,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        options: HandleOptions,
        encryption_options: Option<ObjectEncryptionOptions>,
    ) -> Result<DataObjectHandle<S>, Error> {
        debug_assert!(object_id != INVALID_OBJECT_ID);
        let store = owner.as_ref().as_ref();
        // Don't permit creating unencrypted objects in an encrypted store.  The converse is OK.
        debug_assert!(store.crypt().is_none() || encryption_options.is_some());
        let now = Timestamp::now();
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::object(object_id),
                ObjectValue::file(1, 0, now.clone(), now.clone(), now.clone(), now, 0, None),
            ),
        );
        let mut permanent_keys = false;
        if let Some(ObjectEncryptionOptions { permanent, key_id, key, unwrapped_key }) =
            encryption_options
        {
            permanent_keys = permanent;
            transaction.add(
                store.store_object_id(),
                Mutation::insert_object(
                    ObjectKey::keys(object_id),
                    ObjectValue::keys(EncryptionKeys::AES256XTS(WrappedKeys::from(vec![(
                        key_id, key,
                    )]))),
                ),
            );
            store.key_manager.insert(object_id, &vec![(key_id, Some(unwrapped_key))], permanent);
        }
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute),
                // This is a new object so nothing has pre-allocated overwrite extents yet.
                ObjectValue::attribute(0, false),
            ),
        );
        Ok(DataObjectHandle::new(
            owner.clone(),
            object_id,
            permanent_keys,
            DEFAULT_DATA_ATTRIBUTE_ID,
            0,
            FsverityState::None,
            options,
            false,
            Vec::new(),
        ))
    }

    /// Creates an object in the store.
    ///
    /// If the store is encrypted, the object will be automatically encrypted as well.
    /// If `wrapping_key_id` is set, the new keys will be wrapped with that specific key;
    /// otherwise the default data key is used.
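    ///
    /// A hedged usage sketch (assumes an open `transaction` on `store`):
    /// ```ignore
    /// let handle =
    ///     ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
    ///         .await?;
    /// transaction.commit().await?;
    /// ```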
    pub async fn create_object<S: HandleOwner>(
        owner: &Arc<S>,
        transaction: &mut Transaction<'_>,
        options: HandleOptions,
        wrapping_key_id: Option<u128>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
        let crypt = store.crypt();
        let encryption_options = if let Some(crypt) = crypt {
            let key_id =
                if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };
            let (key, unwrapped_key) = if let Some(wrapping_key_id) = wrapping_key_id {
                crypt.create_key_with_id(object_id, wrapping_key_id).await?
            } else {
                crypt.create_key(object_id, KeyPurpose::Data).await?
            };
            Some(ObjectEncryptionOptions { permanent: false, key_id, key, unwrapped_key })
        } else {
            None
        };
        ObjectStore::create_object_with_id(
            owner,
            transaction,
            object_id,
            options,
            encryption_options,
        )
        .await
    }

    /// Creates an object using explicitly provided keys.
    ///
    /// There are some cases where an encrypted object needs to be created in an unencrypted store.
    /// For example, layer files for a child store are created in the root store, but they must be
    /// encrypted using the child store's keys.  This method exists for that purpose.
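    ///
    /// A hedged sketch (the `(wrapped, unwrapped)` key pair would come from the child store's
    /// crypt service; the names are illustrative):
    /// ```ignore
    /// let handle = ObjectStore::create_object_with_key(
    ///     &root_store, &mut transaction, oid, HandleOptions::default(), wrapped, unwrapped)
    ///     .await?;
    /// ```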
    pub(crate) async fn create_object_with_key<S: HandleOwner>(
        owner: &Arc<S>,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        options: HandleOptions,
        key: WrappedKey,
        unwrapped_key: UnwrappedKey,
    ) -> Result<DataObjectHandle<S>, Error> {
        ObjectStore::create_object_with_id(
            owner,
            transaction,
            object_id,
            options,
            Some(ObjectEncryptionOptions {
                permanent: true,
                key_id: VOLUME_DATA_KEY_ID,
                key,
                unwrapped_key,
            }),
        )
        .await
    }

    /// Adjusts the reference count for a given object.  If the reference count reaches zero, the
    /// object is moved into the graveyard and true is returned.
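    ///
    /// A hedged sketch (assumes an open `transaction` and an existing object `oid`):
    /// ```ignore
    /// if store.adjust_refs(&mut transaction, oid, -1).await? {
    ///     // The last reference was dropped; the object now sits in the graveyard until purged.
    /// }
    /// ```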
    pub async fn adjust_refs(
        &self,
        transaction: &mut Transaction<'_>,
        oid: u64,
        delta: i64,
    ) -> Result<bool, Error> {
        let mut mutation = self.txn_get_object_mutation(transaction, oid).await?;
        let refs = if let ObjectValue::Object {
            kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
            ..
        } = &mut mutation.item.value
        {
            *refs =
                refs.checked_add_signed(delta).ok_or_else(|| anyhow!("refs underflow/overflow"))?;
            refs
        } else {
            bail!(FxfsError::NotFile);
        };
        if *refs == 0 {
            self.add_to_graveyard(transaction, oid);

            // We might still need to adjust the reference count if delta was something other than
            // -1.
            if delta != -1 {
                *refs = 1;
                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            }
            // Otherwise, we don't commit the mutation because we want to keep the reference count
            // at 1 for objects in the graveyard.
            Ok(true)
        } else {
            transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            Ok(false)
        }
    }

    // Purges an object that is in the graveyard.
    pub async fn tombstone_object(
        &self,
        object_id: u64,
        txn_options: Options<'_>,
    ) -> Result<(), Error> {
        self.key_manager.remove(object_id).await;
        self.trim_or_tombstone(object_id, true, txn_options).await
    }

    /// Trim extents beyond the end of a file for all attributes.  This will remove the entry from
    /// the graveyard when done.
    pub async fn trim(&self, object_id: u64) -> Result<(), Error> {
        // For the root and root parent store, we would need to use the metadata reservation which
        // we don't currently support, so assert that we're not those stores.
        assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());

        self.trim_or_tombstone(
            object_id,
            false,
            Options { borrow_metadata_space: true, ..Default::default() },
        )
        .await
    }

    async fn trim_or_tombstone(
        &self,
        object_id: u64,
        for_tombstone: bool,
        txn_options: Options<'_>,
    ) -> Result<(), Error> {
        let fs = self.filesystem();
        let mut next_attribute = Some(0);
        while let Some(attribute_id) = next_attribute.take() {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
                        LockKey::object(self.store_object_id, object_id),
                    ],
                    txn_options,
                )
                .await?;

            match self
                .trim_some(
                    &mut transaction,
                    object_id,
                    attribute_id,
                    if for_tombstone {
                        TrimMode::Tombstone(TombstoneMode::Object)
                    } else {
                        TrimMode::UseSize
                    },
                )
                .await?
            {
                TrimResult::Incomplete => next_attribute = Some(attribute_id),
                TrimResult::Done(None) => {
                    if for_tombstone
                        || matches!(
                            self.tree
                                .find(&ObjectKey::graveyard_entry(
                                    self.graveyard_directory_object_id(),
                                    object_id,
                                ))
                                .await?,
                            Some(Item { value: ObjectValue::Trim, .. })
                        )
                    {
                        self.remove_from_graveyard(&mut transaction, object_id);
                    }
                }
                TrimResult::Done(id) => next_attribute = id,
            }

            if !transaction.mutations().is_empty() {
                transaction.commit().await?;
            }
        }
        Ok(())
    }

    // Purges an object's attribute that is in the graveyard.
    pub async fn tombstone_attribute(
        &self,
        object_id: u64,
        attribute_id: u64,
        txn_options: Options<'_>,
    ) -> Result<(), Error> {
        let fs = self.filesystem();
        let mut trim_result = TrimResult::Incomplete;
        while matches!(trim_result, TrimResult::Incomplete) {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
                        LockKey::object(self.store_object_id, object_id),
                    ],
                    txn_options,
                )
                .await?;
            trim_result = self
                .trim_some(
                    &mut transaction,
                    object_id,
                    attribute_id,
                    TrimMode::Tombstone(TombstoneMode::Attribute),
                )
                .await?;
            if let TrimResult::Done(..) = trim_result {
                self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
            }
            if !transaction.mutations().is_empty() {
                transaction.commit().await?;
            }
        }
        Ok(())
    }

    /// Deletes extents for attribute `attribute_id` in object `object_id`.  Also see the comments
    /// for TrimMode and TrimResult.  The caller should hold locks on both the attribute and the
    /// object, as this performs a read-modify-write on the sizes.
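    ///
    /// A hedged sketch of a single pass (locking mirrors `trim_or_tombstone` above; callers loop
    /// and commit while `TrimResult::Incomplete` is returned):
    /// ```ignore
    /// let result = store
    ///     .trim_some(&mut transaction, object_id, attribute_id, TrimMode::UseSize)
    ///     .await?;
    /// ```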
    pub async fn trim_some(
        &self,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        attribute_id: u64,
        mode: TrimMode,
    ) -> Result<TrimResult, Error> {
        let layer_set = self.tree.layer_set();
        let mut merger = layer_set.merger();

        let aligned_offset = match mode {
            TrimMode::FromOffset(offset) => {
                round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
            }
            TrimMode::Tombstone(..) => 0,
            TrimMode::UseSize => {
                let iter = merger
                    .query(Query::FullRange(&ObjectKey::attribute(
                        object_id,
                        attribute_id,
                        AttributeKey::Attribute,
                    )))
                    .await?;
                if let Some(item_ref) = iter.get() {
                    if item_ref.key.object_id != object_id {
                        return Ok(TrimResult::Done(None));
                    }

                    if let ItemRef {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
                                ..
                            },
                        value: ObjectValue::Attribute { size, .. },
                        ..
                    } = item_ref
                    {
                        // If we found a different attribute_id, return so we can get the
                        // right lock.
                        if *size_attribute_id != attribute_id {
                            return Ok(TrimResult::Done(Some(*size_attribute_id)));
                        }
                        round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
                    } else {
                        // At time of writing, we should always see a size record or None here, but
                        // asserting here would be brittle so just skip to the next attribute
                        // instead.
                        return Ok(TrimResult::Done(Some(attribute_id + 1)));
                    }
                } else {
                    // End of the tree.
                    return Ok(TrimResult::Done(None));
                }
            }
        };

        // Loop over the extents and deallocate them.
        let mut iter = merger
            .query(Query::FullRange(&ObjectKey::from_extent(
                object_id,
                attribute_id,
                ExtentKey::search_key_from_offset(aligned_offset),
            )))
            .await?;
        let mut end = 0;
        let allocator = self.allocator();
        let mut result = TrimResult::Done(None);
        let mut deallocated = 0;
        let block_size = self.block_size;

        while let Some(item_ref) = iter.get() {
            if item_ref.key.object_id != object_id {
                break;
            }
            if let ObjectKey {
                data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
                ..
            } = item_ref.key
            {
                if *extent_attribute_id != attribute_id {
                    result = TrimResult::Done(Some(*extent_attribute_id));
                    break;
                }
                if let (
                    AttributeKey::Extent(ExtentKey { range }),
                    ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
                ) = (attribute_key, item_ref.value)
                {
                    let start = std::cmp::max(range.start, aligned_offset);
                    ensure!(start < range.end, FxfsError::Inconsistent);
                    let device_offset = device_offset
                        .checked_add(start - range.start)
                        .ok_or(FxfsError::Inconsistent)?;
                    end = range.end;
                    let len = end - start;
                    let device_range = device_offset..device_offset + len;
                    ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
                    allocator.deallocate(transaction, self.store_object_id, device_range).await?;
                    deallocated += len;
                    // Stop if the transaction is getting too big.
                    if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
                        result = TrimResult::Incomplete;
                        break;
                    }
                }
            }
            iter.advance().await?;
        }

        let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
            && matches!(result, TrimResult::Done(None));
        let finished_tombstone_attribute =
            matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
                && !matches!(result, TrimResult::Incomplete);
        let mut object_mutation = None;
        let nodes = if finished_tombstone_object { -1 } else { 0 };
        if nodes != 0 || deallocated != 0 {
            let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
            if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
                mutation.item.value
            {
                if project_id != 0 {
                    transaction.add(
                        self.store_object_id,
                        Mutation::merge_object(
                            ObjectKey::project_usage(self.root_directory_object_id(), project_id),
                            ObjectValue::BytesAndNodes {
                                bytes: -i64::try_from(deallocated).unwrap(),
                                nodes,
                            },
                        ),
                    );
                }
                object_mutation = Some(mutation);
            } else {
                panic!("Inconsistent object type.");
            }
        }

        // Deletion marker records *must* be merged so as to consume all other records for the
        // object.
        if finished_tombstone_object {
            transaction.add(
                self.store_object_id,
                Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
1414            );
1415        } else {
1416            if finished_tombstone_attribute {
1417                transaction.add(
1418                    self.store_object_id,
1419                    Mutation::merge_object(
1420                        ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
1421                        ObjectValue::None,
1422                    ),
1423                );
1424            }
1425            if deallocated > 0 {
1426                let mut mutation = match object_mutation {
1427                    Some(mutation) => mutation,
1428                    None => self.txn_get_object_mutation(transaction, object_id).await?,
1429                };
1430                transaction.add(
1431                    self.store_object_id,
1432                    Mutation::merge_object(
1433                        ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
1434                        ObjectValue::deleted_extent(),
1435                    ),
1436                );
1437                // Update allocated size.
1438                if let ObjectValue::Object {
1439                    attributes: ObjectAttributes { allocated_size, .. },
1440                    ..
1441                } = &mut mutation.item.value
1442                {
1443                    // The only way for this to fail is if the volume is inconsistent.
1444                    *allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
1445                        anyhow!(FxfsError::Inconsistent).context("Allocated size overflow")
1446                    })?;
1447                } else {
1448                    panic!("Unexpected object value");
1449                }
1450                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1451            }
1452        }
1453        Ok(result)
1454    }
1455
1456    /// Returns all objects that exist in the parent store that pertain to this object store.
1457    /// Note that this doesn't include the object_id of the store itself, which is generally
1458    /// referenced externally.
1459    pub fn parent_objects(&self) -> Vec<u64> {
1460        assert!(self.store_info_handle.get().is_some());
1461        self.store_info.lock().as_ref().unwrap().parent_objects()
1462    }
1463
1464    /// Returns root objects for this store.
1465    pub fn root_objects(&self) -> Vec<u64> {
1466        let mut objects = Vec::new();
1467        let store_info = self.store_info.lock();
1468        let info = store_info.as_ref().unwrap();
1469        if info.root_directory_object_id != INVALID_OBJECT_ID {
1470            objects.push(info.root_directory_object_id);
1471        }
1472        if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
1473            objects.push(info.graveyard_directory_object_id);
1474        }
1475        if info.internal_directory_object_id != INVALID_OBJECT_ID {
1476            objects.push(info.internal_directory_object_id);
1477        }
1478        objects
1479    }
1480
1481    pub fn store_info(&self) -> Option<StoreInfo> {
1482        self.store_info.lock().as_ref().cloned()
1483    }
1484
1485    /// Returns None if called during journal replay.
1486    pub fn store_info_handle_object_id(&self) -> Option<u64> {
1487        self.store_info_handle.get().map(|h| h.object_id())
1488    }
1489
1490    /// Called to open a store, before replay of this store's mutations.
1491    async fn open(
1492        parent_store: &Arc<ObjectStore>,
1493        store_object_id: u64,
1494        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
1495    ) -> Result<Arc<ObjectStore>, Error> {
1496        let handle =
1497            ObjectStore::open_object(parent_store, store_object_id, HandleOptions::default(), None)
1498                .await?;
1499
1500        let info = load_store_info(parent_store, store_object_id).await?;
1501        let is_encrypted = info.mutations_key.is_some();
1502
1503        let mut total_layer_size = 0;
1504        let last_object_id;
1505
1506        // TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.
1507
1508        // If the store is encrypted, we can't open the object tree layers now, but we need to
1509        // compute the size of the layers.
1510        if is_encrypted {
1511            for &oid in &info.layers {
1512                total_layer_size += parent_store.get_file_size(oid).await?;
1513            }
1514            if info.encrypted_mutations_object_id != INVALID_OBJECT_ID {
1515                total_layer_size += layer_size_from_encrypted_mutations_size(
1516                    parent_store.get_file_size(info.encrypted_mutations_object_id).await?,
1517                );
1518            }
1519            last_object_id = LastObjectId::default();
1520        } else {
1521            last_object_id = LastObjectId { id: info.last_object_id, cipher: None };
1522        }
1523
1524        let fs = parent_store.filesystem();
1525
1526        let store = ObjectStore::new(
1527            Some(parent_store.clone()),
1528            store_object_id,
1529            fs.clone(),
1530            if is_encrypted { None } else { Some(info) },
1531            object_cache,
1532            None,
1533            if is_encrypted { LockState::Locked } else { LockState::Unencrypted },
1534            last_object_id,
1535        );
1536
1537        assert!(store.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");
1538
1539        if !is_encrypted {
1540            let object_tree_layer_object_ids =
1541                store.store_info.lock().as_ref().unwrap().layers.clone();
1542            let object_layers = store.open_layers(object_tree_layer_object_ids, None).await?;
1543            total_layer_size = object_layers.iter().map(|h| h.get_size()).sum();
1544            store
1545                .tree
1546                .append_layers(object_layers)
1547                .await
1548                .context("Failed to read object store layers")?;
1549        }
1550
1551        fs.object_manager().update_reservation(
1552            store_object_id,
1553            tree::reservation_amount_from_layer_size(total_layer_size),
1554        );
1555
1556        Ok(store)
1557    }
1558
1559    async fn load_store_info(&self) -> Result<StoreInfo, Error> {
1560        load_store_info(self.parent_store.as_ref().unwrap(), self.store_object_id).await
1561    }
1562
1563    async fn open_layers(
1564        &self,
1565        object_ids: impl std::iter::IntoIterator<Item = u64>,
1566        crypt: Option<Arc<dyn Crypt>>,
1567    ) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
1568        let parent_store = self.parent_store.as_ref().unwrap();
1569        let mut handles = Vec::new();
1570        let mut sizes = Vec::new();
1571        for object_id in object_ids {
1572            let handle = ObjectStore::open_object(
1573                &parent_store,
1574                object_id,
1575                HandleOptions::default(),
1576                crypt.clone(),
1577            )
1578            .await
1579            .with_context(|| format!("Failed to open layer file {}", object_id))?;
1580            sizes.push(handle.get_size());
1581            handles.push(handle);
1582        }
1583        self.counters.lock().persistent_layer_file_sizes = sizes;
1584        Ok(handles)
1585    }
1586
1587    /// Unlocks a store so that it is ready to be used.
1588    /// This is not thread-safe.
1589    pub async fn unlock(
1590        self: &Arc<Self>,
1591        owner: Weak<dyn StoreOwner>,
1592        crypt: Arc<dyn Crypt>,
1593    ) -> Result<(), Error> {
1594        self.unlock_inner(owner, crypt, /*read_only=*/ false).await
1595    }
1596
1597    /// Unlocks a store so that it is ready to be read from.
1598    /// The store will generally behave like it is still locked: when flushed, the store will
1599    /// write out its mutations into the encrypted mutations file, rather than directly updating
1600    /// the layer files of the object store.
1601    /// Re-locking the store (which *must* be done with `Self::lock_read_only`) will not trigger a
1602    /// flush, although the store might still be flushed during other operations.
1603    /// This is not thread-safe.
1604    pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
1605        self.unlock_inner(NO_OWNER, crypt, /*read_only=*/ true).await
1606    }
1607
1608    async fn unlock_inner(
1609        self: &Arc<Self>,
1610        owner: Weak<dyn StoreOwner>,
1611        crypt: Arc<dyn Crypt>,
1612        read_only: bool,
1613    ) -> Result<(), Error> {
1614        match &*self.lock_state.lock() {
1615            LockState::Locked => {}
1616            LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
1617            LockState::Invalid => bail!(FxfsError::Internal),
1618            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) => {
1619                bail!(FxfsError::AlreadyBound)
1620            }
1621            LockState::Unknown => panic!("Store was unlocked before replay"),
1622            LockState::Locking => panic!("Store is being locked"),
1623            LockState::Unlocking => panic!("Store is being unlocked"),
1624        }
1625        // We must lock flushing since that can modify store_info and the encrypted mutations file.
1626        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1627        let fs = self.filesystem();
1628        let guard = fs.lock_manager().write_lock(keys).await;
1629
1630        let store_info = self.load_store_info().await?;
1631
1632        self.tree
1633            .append_layers(
1634                self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
1635            )
1636            .await
1637            .context("Failed to read object tree layer file contents")?;
1638
1639        let unwrapped_key = crypt
1640            .unwrap_key(store_info.mutations_key.as_ref().unwrap(), self.store_object_id)
1641            .await
1642            .context("Failed to unwrap mutations keys")?;
1643        // The ChaCha20 stream cipher we use supports up to 64 GiB.  By default we'll roll the key
1644        // after every 128 MiB.  Here we just need to pick a number that won't cause issues if it
1645        // wraps, so we use u32::MAX (the offset is u64).
1646        ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
1647        let mut mutations_cipher =
1648            StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);
1649
1650        let wrapped_key = store_info.object_id_key.as_ref().ok_or(FxfsError::Inconsistent)?;
1651        let object_id_cipher =
1652            Ff1::new(&crypt.unwrap_key(wrapped_key, self.store_object_id).await?);
1653        {
1654            let mut last_object_id = self.last_object_id.lock();
1655            last_object_id.cipher = Some(object_id_cipher);
1656        }
1657        self.update_last_object_id(store_info.last_object_id);
1658
1659        // Apply the encrypted mutations.
1660        let mut mutations = {
1661            if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
1662                EncryptedMutations::default()
1663            } else {
1664                let parent_store = self.parent_store.as_ref().unwrap();
1665                let handle = ObjectStore::open_object(
1666                    &parent_store,
1667                    store_info.encrypted_mutations_object_id,
1668                    HandleOptions::default(),
1669                    None,
1670                )
1671                .await?;
1672                let mut cursor = std::io::Cursor::new(
1673                    handle
1674                        .contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
1675                        .await
1676                        .context(FxfsError::Inconsistent)?,
1677                );
1678                let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
1679                    .context("Failed to deserialize EncryptedMutations")?
1680                    .0;
1681                let len = cursor.get_ref().len() as u64;
1682                while cursor.position() < len {
1683                    mutations.extend(
1684                        &EncryptedMutations::deserialize_with_version(&mut cursor)
1685                            .context("Failed to deserialize EncryptedMutations")?
1686                            .0,
1687                    );
1688                }
1689                mutations
1690            }
1691        };
1692
1693        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
1694        let journaled = EncryptedMutations::from_replayed_mutations(
1695            self.store_object_id,
1696            fs.journal()
1697                .read_transactions_for_object(self.store_object_id)
1698                .await
1699                .context("Failed to read encrypted mutations from journal")?,
1700        );
1701        mutations.extend(&journaled);
1702
1703        let _ = std::mem::replace(&mut *self.lock_state.lock(), LockState::Unlocking);
1704        *self.store_info.lock() = Some(store_info);
1705
1706        // If we fail, clean up.
1707        let clean_up = scopeguard::guard((), |_| {
1708            *self.lock_state.lock() = LockState::Locked;
1709            *self.store_info.lock() = None;
1710            // Make sure we don't leave unencrypted data lying around in memory.
1711            self.tree.reset();
1712        });
1713
1714        let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;
1715
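        // `data` is one contiguous ciphertext stream, but the mutations key may have been rolled
        // one or more times while it was being written.  Each (offset, key) entry in
        // `mutations_key_roll` marks where a new key took effect, so decrypt each segment with its
        // corresponding cipher in turn.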
1716        let mut slice = &mut data[..];
1717        let mut last_offset = 0;
1718        for (offset, key) in mutations_key_roll {
1719            let split_offset = offset
1720                .checked_sub(last_offset)
1721                .ok_or(FxfsError::Inconsistent)
1722                .context("Invalid mutation key roll offset")?;
1723            last_offset = offset;
1724            ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
1725            let (old, new) = slice.split_at_mut(split_offset);
1726            mutations_cipher.decrypt(old);
1727            let unwrapped_key = crypt
1728                .unwrap_key(&key, self.store_object_id)
1729                .await
1730                .context("Failed to unwrap mutations keys")?;
1731            mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
1732            slice = new;
1733        }
1734        mutations_cipher.decrypt(slice);
1735
1736        // Always roll the mutations key when we unlock, which guarantees we won't reuse a
1737        // previous key and nonce.
1738        self.roll_mutations_key(crypt.as_ref()).await?;
1739
1740        let mut cursor = std::io::Cursor::new(data);
1741        for (checkpoint, count) in transactions {
1742            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
1743            for _ in 0..count {
1744                let mutation =
1745                    Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
1746                        .context("failed to deserialize encrypted mutation")?;
1747                self.apply_mutation(mutation, &context, AssocObj::None)
1748                    .context("failed to apply encrypted mutation")?;
1749            }
1750        }
1751
1752        *self.lock_state.lock() = if read_only {
1753            LockState::UnlockedReadOnly(crypt)
1754        } else {
1755            LockState::Unlocked { owner, crypt }
1756        };
1757
1758        // To avoid unbounded memory growth, we should flush the encrypted mutations now. Otherwise
1759        // it's possible for more writes to be queued and for the store to be locked before we can
1760        // flush anything, and that can repeat.
1761        std::mem::drop(guard);
1762
1763        if !read_only && !self.filesystem().options().read_only {
1764            self.flush_with_reason(flush::Reason::Unlock).await?;
1765
1766            // Reap purged files within this store.
1767            let _ = self.filesystem().graveyard().initial_reap(&self).await?;
1768        }
1769
1770        // Return and cancel the clean up.
1771        Ok(ScopeGuard::into_inner(clean_up))
1772    }
1773
1774    pub fn is_locked(&self) -> bool {
1775        matches!(
1776            *self.lock_state.lock(),
1777            LockState::Locked | LockState::Locking | LockState::Unknown
1778        )
1779    }
1780
1781    /// NB: This is not the converse of `is_locked`, as there are lock states where neither is
1782    /// true.
1783    pub fn is_unlocked(&self) -> bool {
1784        matches!(
1785            *self.lock_state.lock(),
1786            LockState::Unlocked { .. } | LockState::UnlockedReadOnly { .. } | LockState::Unlocking
1787        )
1788    }
1789
1790    pub fn is_unknown(&self) -> bool {
1791        matches!(*self.lock_state.lock(), LockState::Unknown)
1792    }
1793
1794    pub fn is_encrypted(&self) -> bool {
1795        self.store_info.lock().as_ref().unwrap().mutations_key.is_some()
1796    }
1797
1798    // Locks a store.
1799    // This operation will take a flush lock on the store, in case any flushes are ongoing.  Any
1800    // ongoing store accesses might be interrupted by this.  See `Self::crypt`.
1801    // Whilst this can return an error, the store will be placed into an unusable but safe state
1802    // (i.e. no lingering unencrypted data) if an error is encountered.
1803    pub async fn lock(&self) -> Result<(), Error> {
1804        // We must lock flushing since it is not safe for that to be happening whilst we are locking
1805        // the store.
1806        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1807        let fs = self.filesystem();
1808        let _guard = fs.lock_manager().write_lock(keys).await;
1809
1810        {
1811            let mut lock_state = self.lock_state.lock();
1812            if let LockState::Unlocked { .. } = &*lock_state {
1813                *lock_state = LockState::Locking;
1814            } else {
1815                panic!("Unexpected lock state: {:?}", &*lock_state);
1816            }
1817        }
1818
1819        // Sync the journal now to ensure that any buffered mutations for this store make it out to
1820        // disk.  This is necessary to be able to unlock the store again.
1821        // We need to establish a barrier at this point (so that the journaled writes are observable
1822        // by any future attempts to unlock the store), hence the flush_device.
1823        let sync_result =
1824            self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;
1825
1826        *self.lock_state.lock() = if let Err(error) = &sync_result {
1827            error!(error:?; "Failed to sync journal; store will no longer be usable");
1828            LockState::Invalid
1829        } else {
1830            LockState::Locked
1831        };
1832        self.key_manager.clear();
1833        *self.store_info.lock() = None;
1834        self.tree.reset();
1835
1836        sync_result
1837    }
1838
1839    // Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`).  Data
1840    // is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
1841    // and will be replayed next time the store is unlocked.
1842    pub fn lock_read_only(&self) {
1843        *self.lock_state.lock() = LockState::Locked;
1844        *self.store_info.lock() = None;
1845        self.tree.reset();
1846    }
1847
1848    // Returns INVALID_OBJECT_ID if the object ID cipher needs to be created or rolled.
1849    fn maybe_get_next_object_id(&self) -> u64 {
1850        let mut last_object_id = self.last_object_id.lock();
1851        if last_object_id.should_create_cipher() {
1852            INVALID_OBJECT_ID
1853        } else {
1854            last_object_id.get_next_object_id()
1855        }
1856    }
1857
1858    /// Returns a new object ID that can be used.  This will create an object ID cipher if needed.
1859    ///
1860    /// If the object ID key needs to be rolled, a new transaction will be created and committed.
1861    /// This transaction does not take the filesystem lock, hence `txn_guard`.
1862    pub async fn get_next_object_id(&self, txn_guard: &TxnGuard<'_>) -> Result<u64, Error> {
1863        let object_id = self.maybe_get_next_object_id();
1864        if object_id != INVALID_OBJECT_ID {
1865            return Ok(object_id);
1866        }
1867
1868        // Create a transaction (which has a lock) and then check again.
1869        let mut transaction = self
1870            .filesystem()
1871            .new_transaction(
1872                lock_keys![LockKey::object(
1873                    self.parent_store.as_ref().unwrap().store_object_id,
1874                    self.store_object_id,
1875                )],
1876                Options {
1877                    // We must skip journal checks because this transaction might be needed to
1878                    // compact.
1879                    skip_journal_checks: true,
1880                    borrow_metadata_space: true,
1881                    txn_guard: Some(txn_guard),
1882                    ..Default::default()
1883                },
1884            )
1885            .await?;
1886
1887        {
1888            let mut last_object_id = self.last_object_id.lock();
1889            if !last_object_id.should_create_cipher() {
1890                // We lost a race.
1891                return Ok(last_object_id.get_next_object_id());
1892            }
1893            // It shouldn't be possible for last_object_id to wrap within our lifetime, so if this
1894            // happens, it's most likely due to corruption.
1895            ensure!(
1896                last_object_id.id & OBJECT_ID_HI_MASK != OBJECT_ID_HI_MASK,
1897                FxfsError::Inconsistent
1898            );
1899        }
1900
1901        // Create a key.
1902        let (object_id_wrapped, object_id_unwrapped) =
1903            self.crypt().unwrap().create_key(self.store_object_id, KeyPurpose::Metadata).await?;
1904
1905        // Update StoreInfo.
1906        let buf = {
1907            let serialized_info = {
1908                let mut store_info = self.store_info.lock();
1909                let store_info = store_info.as_mut().unwrap();
1910                store_info.object_id_key = Some(object_id_wrapped);
1911                let mut serialized_info = Vec::new();
1912                store_info.serialize_with_version(&mut serialized_info)?;
1913                serialized_info
1914            };
1915            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
1916            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
1917            buf
1918        };
1919
1920        self.store_info_handle
1921            .get()
1922            .unwrap()
1923            .txn_write(&mut transaction, 0u64, buf.as_ref())
1924            .await?;
1925        transaction.commit().await?;
1926
1927        let mut last_object_id = self.last_object_id.lock();
1928        last_object_id.cipher = Some(Ff1::new(&object_id_unwrapped));
1929        last_object_id.id = (last_object_id.id + (1 << 32)) & OBJECT_ID_HI_MASK;
1930
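        // The returned ID keeps the (freshly rolled) hi 32 bits and substitutes the Ff1 encryption
        // of the low 32 bits.  A worked illustration with made-up values: if the new epoch base is
        // 0x0000_0002_0000_0000, the first ID returned is 0x0000_0002_0000_0000 | encrypt(0), e.g.
        // 0x0000_0002_9c1d_7f03.  Ff1 is a permutation over u32, so low parts can't collide within
        // an epoch, and the hi part discriminates between epochs.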
1931        Ok((last_object_id.id & OBJECT_ID_HI_MASK)
1932            | last_object_id.cipher.as_ref().unwrap().encrypt(last_object_id.id as u32) as u64)
1933    }
1934
1935    fn allocator(&self) -> Arc<Allocator> {
1936        self.filesystem().allocator()
1937    }
1938
1939    // If |transaction| has a pending mutation for the underlying object, returns that.
1940    // Otherwise, looks up the object from the tree and returns a suitable mutation for it.  The
1941    // mutation is returned here rather than the item because the mutation includes the operation
1942    // which has significance: inserting an object implies it's the first of its kind unlike
1943    // replacing an object.
1944    async fn txn_get_object_mutation(
1945        &self,
1946        transaction: &Transaction<'_>,
1947        object_id: u64,
1948    ) -> Result<ObjectStoreMutation, Error> {
1949        if let Some(mutation) =
1950            transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
1951        {
1952            Ok(mutation.clone())
1953        } else {
1954            Ok(ObjectStoreMutation {
1955                item: self
1956                    .tree
1957                    .find(&ObjectKey::object(object_id))
1958                    .await?
1959                    .ok_or(FxfsError::Inconsistent)
1960                    .context("Object id missing")?,
1961                op: Operation::ReplaceOrInsert,
1962            })
1963        }
1964    }
1965
1966    fn update_last_object_id(&self, mut object_id: u64) {
1967        let mut last_object_id = self.last_object_id.lock();
1968        // For encrypted stores, object_id will be encrypted here, so we must decrypt first.
1969        if let Some(cipher) = &last_object_id.cipher {
1970            // If the object ID cipher has been rolled, then it's possible we might see object IDs
1971            // that were generated using a different cipher so the decrypt here will return the
1972            // wrong value, but that won't matter because the hi part of the object ID should still
1973            // discriminate.
1974            object_id = object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
1975        }
1976        if object_id > last_object_id.id {
1977            last_object_id.id = object_id;
1978        }
1979    }
1980
1981    /// Adds the specified object to the graveyard.
1982    pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
1983        let graveyard_id = self.graveyard_directory_object_id();
1984        assert_ne!(graveyard_id, INVALID_OBJECT_ID);
1985        transaction.add(
1986            self.store_object_id,
1987            Mutation::replace_or_insert_object(
1988                ObjectKey::graveyard_entry(graveyard_id, object_id),
1989                ObjectValue::Some,
1990            ),
1991        );
1992    }
1993
1994    /// Removes the specified object from the graveyard.  NB: Care should be taken when calling
1995    /// this because graveyard entries are used for purging deleted files *and* for trimming
1996    /// extents.  For example, consider the following sequence:
1997    ///
1998    ///     1. Add Trim graveyard entry.
1999    ///     2. Replace with Some graveyard entry (see above).
2000    ///     3. Remove graveyard entry.
2001    ///
2002    /// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
2003    /// actually be:
2004    ///
2005    ///     3. Replace with Trim graveyard entry.
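    ///
    /// A sketch of that replacement (assuming `ObjectValue::Trim` is the trim marker value):
    /// ```ignore
    /// transaction.add(
    ///     store.store_object_id(),
    ///     Mutation::replace_or_insert_object(
    ///         ObjectKey::graveyard_entry(store.graveyard_directory_object_id(), object_id),
    ///         ObjectValue::Trim,
    ///     ),
    /// );
    /// ```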
2006    pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2007        transaction.add(
2008            self.store_object_id,
2009            Mutation::replace_or_insert_object(
2010                ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
2011                ObjectValue::None,
2012            ),
2013        );
2014    }
2015
2016    /// Removes the specified attribute from the graveyard. Unlike object graveyard entries,
2017    /// attribute graveyard entries serve only one purpose (purging deleted attributes), so the
2018    /// caller does not need to be concerned about replacing the graveyard attribute entry
2019    /// with its prior state when cancelling it. See comment on `remove_from_graveyard()`.
2020    pub fn remove_attribute_from_graveyard(
2021        &self,
2022        transaction: &mut Transaction<'_>,
2023        object_id: u64,
2024        attribute_id: u64,
2025    ) {
2026        transaction.add(
2027            self.store_object_id,
2028            Mutation::replace_or_insert_object(
2029                ObjectKey::graveyard_attribute_entry(
2030                    self.graveyard_directory_object_id(),
2031                    object_id,
2032                    attribute_id,
2033                ),
2034                ObjectValue::None,
2035            ),
2036        );
2037    }
2038
2039    // Roll the mutations key.  The new key will be written for the next encrypted mutation.
2040    async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
2041        let (wrapped_key, unwrapped_key) =
2042            crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
2043
2044        // The mutations_cipher lock must be held for the duration so that mutations_cipher and
2045        // store_info are updated atomically.  Otherwise, write_mutation could find a new cipher but
2046        // end up writing the wrong wrapped key.
2047        let mut cipher = self.mutations_cipher.lock();
2048        *cipher = Some(StreamCipher::new(&unwrapped_key, 0));
2049        self.store_info.lock().as_mut().unwrap().mutations_key = Some(wrapped_key);
2050        // mutations_cipher_offset is updated by flush.
2051        Ok(())
2052    }
2053
2054    /// Returns the link of a symlink object.
2055    pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
2056        match self.tree.find(&ObjectKey::object(object_id)).await? {
2057            None => bail!(FxfsError::NotFound),
2058            Some(Item {
2059                value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
2060                ..
2061            }) => Ok(link),
2062            Some(item) => Err(anyhow!(FxfsError::Inconsistent)
2063                .context(format!("Unexpected item in lookup: {item:?}"))),
2064        }
2065    }
2066
2067    /// Retrieves the wrapped keys for the given object.  The keys *should* be known to exist, and
2068    /// it is considered an inconsistency if they don't.
2069    pub async fn get_keys(&self, object_id: u64) -> Result<WrappedKeys, Error> {
2070        match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::Inconsistent)? {
2071            Item { value: ObjectValue::Keys(EncryptionKeys::AES256XTS(keys)), .. } => Ok(keys),
2072            _ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
2073        }
2074    }
2075
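    /// Updates change time, timestamps, and/or POSIX attributes for `object_id`.  A minimal usage
    /// sketch (hypothetical values; transaction setup elided):
    /// ```ignore
    /// let attrs = fio::MutableNodeAttributes {
    ///     modification_time: Some(now_nanos),
    ///     ..Default::default()
    /// };
    /// store.update_attributes(&mut transaction, object_id, Some(&attrs), None).await?;
    /// transaction.commit().await?;
    /// ```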
2076    pub async fn update_attributes<'a>(
2077        &self,
2078        transaction: &mut Transaction<'a>,
2079        object_id: u64,
2080        node_attributes: Option<&fio::MutableNodeAttributes>,
2081        change_time: Option<Timestamp>,
2082    ) -> Result<(), Error> {
2083        if change_time.is_none() {
2084            if let Some(attributes) = node_attributes {
2085                let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
2086                if *attributes == empty_attributes {
2087                    return Ok(());
2088                }
2089            } else {
2090                return Ok(());
2091            }
2092        }
2093        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
2094        if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
2095            if let Some(time) = change_time {
2096                attributes.change_time = time;
2097            }
2098            if let Some(node_attributes) = node_attributes {
2099                if let Some(time) = node_attributes.creation_time {
2100                    attributes.creation_time = Timestamp::from_nanos(time);
2101                }
2102                if let Some(time) = node_attributes.modification_time {
2103                    attributes.modification_time = Timestamp::from_nanos(time);
2104                }
2105                if let Some(time) = node_attributes.access_time {
2106                    attributes.access_time = Timestamp::from_nanos(time);
2107                }
2108                if node_attributes.mode.is_some()
2109                    || node_attributes.uid.is_some()
2110                    || node_attributes.gid.is_some()
2111                    || node_attributes.rdev.is_some()
2112                {
2113                    if let Some(a) = &mut attributes.posix_attributes {
2114                        if let Some(mode) = node_attributes.mode {
2115                            a.mode = mode;
2116                        }
2117                        if let Some(uid) = node_attributes.uid {
2118                            a.uid = uid;
2119                        }
2120                        if let Some(gid) = node_attributes.gid {
2121                            a.gid = gid;
2122                        }
2123                        if let Some(rdev) = node_attributes.rdev {
2124                            a.rdev = rdev;
2125                        }
2126                    } else {
2127                        attributes.posix_attributes = Some(PosixAttributes {
2128                            mode: node_attributes.mode.unwrap_or_default(),
2129                            uid: node_attributes.uid.unwrap_or_default(),
2130                            gid: node_attributes.gid.unwrap_or_default(),
2131                            rdev: node_attributes.rdev.unwrap_or_default(),
2132                        });
2133                    }
2134                }
2135            }
2136        } else {
2137            bail!(anyhow!(FxfsError::Inconsistent)
2138                .context("ObjectStore.update_attributes: Expected object value"));
2139        };
2140        transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
2141        Ok(())
2142    }
2143}
2144
2145#[async_trait]
2146impl JournalingObject for ObjectStore {
2147    fn apply_mutation(
2148        &self,
2149        mutation: Mutation,
2150        context: &ApplyContext<'_, '_>,
2151        _assoc_obj: AssocObj<'_>,
2152    ) -> Result<(), Error> {
2153        match &*self.lock_state.lock() {
2154            LockState::Locked | LockState::Locking => {
2155                ensure!(
2156                    matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
2157                        || matches!(
2158                            mutation,
2159                            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_)
2160                                if context.mode.is_replay()
2161                        ),
2162                    anyhow!(FxfsError::Inconsistent)
2163                        .context(format!("Unexpected mutation for encrypted store: {mutation:?}"))
2164                );
2165            }
2166            LockState::Invalid
2167            | LockState::Unlocking
2168            | LockState::Unencrypted
2169            | LockState::Unlocked { .. }
2170            | LockState::UnlockedReadOnly(..) => {}
2171            lock_state => panic!("Unexpected lock state: {lock_state:?}"),
2172        }
2173        match mutation {
2174            Mutation::ObjectStore(ObjectStoreMutation { mut item, op }) => {
2175                item.sequence = context.checkpoint.file_offset;
2176                match op {
2177                    Operation::Insert => {
2178                        // If we are inserting an object record for the first time, it signifies the
2179                        // birth of the object so we need to adjust the object count.
2180                        if matches!(item.value, ObjectValue::Object { .. }) {
2181                            {
2182                                let info = &mut self.store_info.lock();
2183                                let object_count = &mut info.as_mut().unwrap().object_count;
2184                                *object_count = object_count.saturating_add(1);
2185                            }
2186                            if context.mode.is_replay() {
2187                                self.update_last_object_id(item.key.object_id);
2188                            }
2189                        }
2190                        self.tree.insert(item)?;
2191                    }
2192                    Operation::ReplaceOrInsert => {
2193                        self.tree.replace_or_insert(item);
2194                    }
2195                    Operation::Merge => {
2196                        if item.is_tombstone() {
2197                            let info = &mut self.store_info.lock();
2198                            let object_count = &mut info.as_mut().unwrap().object_count;
2199                            *object_count = object_count.saturating_sub(1);
2200                        }
2201                        let lower_bound = item.key.key_for_merge_into();
2202                        self.tree.merge_into(item, &lower_bound);
2203                    }
2204                }
2205            }
2206            Mutation::BeginFlush => {
2207                ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
2208                self.tree.seal();
2209            }
2210            Mutation::EndFlush => ensure!(self.parent_store.is_some(), FxfsError::Inconsistent),
2211            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2212                // We will process these during Self::unlock.
2213                ensure!(
2214                    !matches!(&*self.lock_state.lock(), LockState::Unencrypted),
2215                    FxfsError::Inconsistent
2216                );
2217            }
2218            Mutation::CreateInternalDir(object_id) => {
2219                ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
2220                self.store_info.lock().as_mut().unwrap().internal_directory_object_id = object_id;
2221            }
2222            _ => bail!("unexpected mutation: {:?}", mutation),
2223        }
2224        self.counters.lock().mutations_applied += 1;
2225        Ok(())
2226    }
2227
2228    fn drop_mutation(&self, _mutation: Mutation, _transaction: &Transaction<'_>) {
2229        self.counters.lock().mutations_dropped += 1;
2230    }
2231
2232    /// Push all in-memory structures to the device. This is not necessary for sync since the
2233    /// journal will take care of it.  This is supposed to be called when there is either memory or
2234    /// space pressure (flushing the store will persist in-memory data and allow the journal file to
2235    /// be trimmed).
2236    ///
2237    /// Also returns the earliest version of a struct in the filesystem (when known).
2238    async fn flush(&self) -> Result<Version, Error> {
2239        self.flush_with_reason(flush::Reason::Journal).await
2240    }
2241
2242    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
2243        // Intentionally enumerating all variants to force a decision on any new variants. Encrypt
2244        // all mutations that could affect an encrypted object store's contents or the `StoreInfo` of
2245        // the encrypted object store. During `unlock()` any mutations which haven't been encrypted
2246        // won't be replayed after reading `StoreInfo`.
2247        match mutation {
2248            // Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted, we
2249            // still choose to encrypt the mutation because it makes it easier to deal with replay.
2250            // When we replay mutations for an encrypted store, the only things we keep in memory are
2251            // the encrypted mutations; we don't keep `StoreInfo` or changes to it in memory. So, by
2252            // encrypting the CreateInternalDir mutation here, it means we don't have to track both
2253            // encrypted mutations bound for the LSM tree and unencrypted mutations for `StoreInfo`
2254            // to use in `unlock()`. It'll just bundle CreateInternalDir mutations with the other
2255            // encrypted mutations and handle them all in sequence during `unlock()`.
2256            Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
2257                let mut cipher = self.mutations_cipher.lock();
2258                if let Some(cipher) = cipher.as_mut() {
2259                    // If this is the first time we've used this key, we must write the key out.
2260                    if cipher.offset() == 0 {
2261                        writer.write(Mutation::update_mutations_key(
2262                            self.store_info
2263                                .lock()
2264                                .as_ref()
2265                                .unwrap()
2266                                .mutations_key
2267                                .as_ref()
2268                                .unwrap()
2269                                .clone(),
2270                        ));
2271                    }
2272                    let mut buffer = Vec::new();
2273                    mutation.serialize_into(&mut buffer).unwrap();
2274                    cipher.encrypt(&mut buffer);
2275                    writer.write(Mutation::EncryptedObjectStore(buffer.into()));
2276                    return;
2277                }
2278            }
2279            // `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
2280            // encrypted object stores, but are either the encrypted mutation data itself or
2281            // metadata governing how the data will be encrypted. They should only be produced here.
2282            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2283                debug_assert!(false, "Only this method should generate encrypted mutations");
2284            }
2285            // `BeginFlush` and `EndFlush` are not needed during `unlock()` but are needed during
2286            // the initial journal replay, so should not be encrypted. `Allocator`, `DeleteVolume`,
2287            // `UpdateBorrowed` mutations are never associated with an encrypted store as we do not
2288            // encrypt the allocator or root/root-parent stores, so we can avoid the locking.
2289            Mutation::Allocator(_)
2290            | Mutation::BeginFlush
2291            | Mutation::EndFlush
2292            | Mutation::DeleteVolume
2293            | Mutation::UpdateBorrowed(_) => {}
2294        }
2295        writer.write(mutation.clone());
2296    }
2297}
2298
2299impl HandleOwner for ObjectStore {}
2300
2301impl AsRef<ObjectStore> for ObjectStore {
2302    fn as_ref(&self) -> &ObjectStore {
2303        self
2304    }
2305}
2306
2307fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
2308    // This is similar to reserved_space_from_journal_usage. It needs to be a worst case estimate of
2309    // the amount of metadata space that might need to be reserved to allow the encrypted mutations
2310    // to be written to layer files.  It needs to be >= than reservation_amount_from_layer_size will
2311    // return once the data has been written to layer files and <= than
2312    // reserved_space_from_journal_usage would use.  We can't just use
2313    // reserved_space_from_journal_usage because the encrypted mutations file includes some extra
2314    // data (it includes the checkpoints) that isn't written in the same way to the journal.
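    // For example, a 4 MiB encrypted mutations file reserves 12 MiB of metadata space.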
2315    size * 3
2316}
2317
2318impl AssociatedObject for ObjectStore {}
2319
2320/// Argument to the trim_some method.
2321#[derive(Debug)]
2322pub enum TrimMode {
2323    /// Trim extents beyond the current size.
2324    UseSize,
2325
2326    /// Trim extents beyond the supplied offset.
2327    FromOffset(u64),
2328
2329    /// Remove the object (or attribute) from the store once it is fully trimmed.
2330    Tombstone(TombstoneMode),
2331}
2332
2333/// Sets the mode for tombstoning (either at the object or attribute level).
2334#[derive(Debug)]
2335pub enum TombstoneMode {
2336    Object,
2337    Attribute,
2338}
2339
2340/// Result of the trim_some method.
2341#[derive(Debug)]
2342pub enum TrimResult {
2343    /// We reached the limit of the transaction and more extents might follow.
2344    Incomplete,
2345
2346    /// We finished this attribute.  Returns the ID of the next attribute for the same object if
2347    /// there is one.
2348    Done(Option<u64>),
2349}
2350
2351/// Loads store info.
2352pub async fn load_store_info(
2353    parent: &Arc<ObjectStore>,
2354    store_object_id: u64,
2355) -> Result<StoreInfo, Error> {
2356    let handle =
2357        ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?;
2358
2359    Ok(if handle.get_size() > 0 {
2360        let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
2361        let mut cursor = std::io::Cursor::new(serialized_info);
2362        let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
2363            .context("Failed to deserialize StoreInfo")?;
2364        store_info
2365    } else {
2366        // The store_info will be absent for a newly created and empty object store.
2367        StoreInfo::default()
2368    })
2369}
2370
2371#[cfg(test)]
2372mod tests {
2373    use super::{
2374        StoreInfo, DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID,
2375        MAX_STORE_INFO_SERIALIZED_SIZE, NO_OWNER, OBJECT_ID_HI_MASK,
2376    };
2377    use crate::errors::FxfsError;
2378    use crate::filesystem::{FxFilesystem, JournalingObject, OpenFxFilesystem, SyncOptions};
2379    use crate::fsck::fsck;
2380    use crate::lsm_tree::types::{ItemRef, LayerIterator};
2381    use crate::lsm_tree::Query;
2382    use crate::object_handle::{
2383        ObjectHandle, ReadObjectHandle, WriteObjectHandle, INVALID_OBJECT_ID,
2384    };
2385    use crate::object_store::directory::Directory;
2386    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue};
2387    use crate::object_store::transaction::{lock_keys, Options};
2388    use crate::object_store::volume::root_volume;
2389    use crate::object_store::{
2390        FsverityMetadata, HandleOptions, LockKey, Mutation, ObjectStore, RootDigest, StoreOwner,
2391    };
2392    use crate::serialized_types::VersionedLatest;
2393    use assert_matches::assert_matches;
2394    use async_trait::async_trait;
2395    use fuchsia_async as fasync;
2396    use fuchsia_sync::Mutex;
2397    use futures::join;
2398    use fxfs_crypto::{Crypt, WrappedKey, WrappedKeyBytes, WRAPPED_KEY_SIZE};
2399    use fxfs_insecure_crypto::InsecureCrypt;
2400    use std::sync::Arc;
2401    use std::time::Duration;
2402    use storage_device::fake_device::FakeDevice;
2403    use storage_device::DeviceHolder;
2404
2405    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2406
2407    async fn test_filesystem() -> OpenFxFilesystem {
2408        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2409        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2410    }
2411
2412    #[fuchsia::test]
2413    async fn test_item_sequences() {
2414        let fs = test_filesystem().await;
2415        let object1;
2416        let object2;
2417        let object3;
2418        let mut transaction = fs
2419            .clone()
2420            .new_transaction(lock_keys![], Options::default())
2421            .await
2422            .expect("new_transaction failed");
2423        let store = fs.root_store();
2424        object1 = Arc::new(
2425            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2426                .await
2427                .expect("create_object failed"),
2428        );
2429        transaction.commit().await.expect("commit failed");
2430        let mut transaction = fs
2431            .clone()
2432            .new_transaction(lock_keys![], Options::default())
2433            .await
2434            .expect("new_transaction failed");
2435        object2 = Arc::new(
2436            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2437                .await
2438                .expect("create_object failed"),
2439        );
2440        transaction.commit().await.expect("commit failed");
2441
2442        fs.sync(SyncOptions::default()).await.expect("sync failed");
2443
2444        let mut transaction = fs
2445            .clone()
2446            .new_transaction(lock_keys![], Options::default())
2447            .await
2448            .expect("new_transaction failed");
2449        object3 = Arc::new(
2450            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2451                .await
2452                .expect("create_object failed"),
2453        );
2454        transaction.commit().await.expect("commit failed");
2455
2456        let layer_set = store.tree.layer_set();
2457        let mut merger = layer_set.merger();
2458        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
2459        let mut sequences = [0u64; 3];
2460        while let Some(ItemRef { key: ObjectKey { object_id, .. }, sequence, .. }) = iter.get() {
2461            if *object_id == object1.object_id() {
2462                sequences[0] = sequence;
2463            } else if *object_id == object2.object_id() {
2464                sequences[1] = sequence;
2465            } else if *object_id == object3.object_id() {
2466                sequences[2] = sequence;
2467            }
2468            iter.advance().await.expect("advance failed");
2469        }
2470
2471        assert!(sequences[0] <= sequences[1], "sequences: {:?}", sequences);
2472        // The last item came after a sync, so should be strictly greater.
2473        assert!(sequences[1] < sequences[2], "sequences: {:?}", sequences);
2474        fs.close().await.expect("Close failed");
2475    }
2476
2477    #[fuchsia::test]
2478    async fn test_verified_file_with_verified_attribute() {
2479        let fs: OpenFxFilesystem = test_filesystem().await;
2480        let mut transaction = fs
2481            .clone()
2482            .new_transaction(lock_keys![], Options::default())
2483            .await
2484            .expect("new_transaction failed");
2485        let store = fs.root_store();
2486        let object = Arc::new(
2487            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2488                .await
2489                .expect("create_object failed"),
2490        );
2491
2492        transaction.add(
2493            store.store_object_id(),
2494            Mutation::replace_or_insert_object(
2495                ObjectKey::attribute(
2496                    object.object_id(),
2497                    DEFAULT_DATA_ATTRIBUTE_ID,
2498                    AttributeKey::Attribute,
2499                ),
2500                ObjectValue::verified_attribute(
2501                    0,
2502                    FsverityMetadata { root_digest: RootDigest::Sha256([0; 32]), salt: vec![] },
2503                ),
2504            ),
2505        );
2506
2507        transaction.add(
2508            store.store_object_id(),
2509            Mutation::replace_or_insert_object(
2510                ObjectKey::attribute(
2511                    object.object_id(),
2512                    FSVERITY_MERKLE_ATTRIBUTE_ID,
2513                    AttributeKey::Attribute,
2514                ),
2515                ObjectValue::attribute(0, false),
2516            ),
2517        );
2518
2519        transaction.commit().await.unwrap();
2520
2521        let handle =
2522            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2523                .await
2524                .expect("open_object failed");
2525
2526        assert!(handle.is_verified_file());
2527
2528        fs.close().await.expect("Close failed");
2529    }
2530
2531    #[fuchsia::test]
2532    async fn test_verified_file_without_verified_attribute() {
2533        let fs: OpenFxFilesystem = test_filesystem().await;
2534        let mut transaction = fs
2535            .clone()
2536            .new_transaction(lock_keys![], Options::default())
2537            .await
2538            .expect("new_transaction failed");
2539        let store = fs.root_store();
2540        let object = Arc::new(
2541            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2542                .await
2543                .expect("create_object failed"),
2544        );
2545
2546        transaction.commit().await.unwrap();
2547
2548        let handle =
2549            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2550                .await
2551                .expect("open_object failed");
2552
2553        assert!(!handle.is_verified_file());
2554
2555        fs.close().await.expect("Close failed");
2556    }
2557
2558    #[fuchsia::test]
2559    async fn test_create_and_open_store() {
2560        let fs = test_filesystem().await;
2561        let store_id = {
2562            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2563            root_volume
2564                .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
2565                .await
2566                .expect("new_volume failed")
2567                .store_object_id()
2568        };
2569
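        // Re-open the filesystem to check that the store can be found and unlocked from scratch.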
2570        fs.close().await.expect("close failed");
2571        let device = fs.take_device().await;
2572        device.reopen(false);
2573        let fs = FxFilesystem::open(device).await.expect("open failed");
2574
2575        {
2576            let store = fs.object_manager().store(store_id).expect("store not found");
2577            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");
2578        }
2579        fs.close().await.expect("Close failed");
2580    }
2581
2582    #[fuchsia::test]
2583    async fn test_create_and_open_internal_dir() {
2584        let fs = test_filesystem().await;
2585        let dir_id;
2586        let store_id;
2587        {
2588            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2589            let store = root_volume
2590                .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
2591                .await
2592                .expect("new_volume failed");
2593            dir_id =
2594                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
2595            store_id = store.store_object_id();
2596        }
2597
2598        fs.close().await.expect("close failed");
2599        let device = fs.take_device().await;
2600        device.reopen(false);
2601        let fs = FxFilesystem::open(device).await.expect("open failed");
2602
2603        {
2604            let store = fs.object_manager().store(store_id).expect("store not found");
2605            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");
2606            assert_eq!(
2607                dir_id,
2608                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
2609            );
2610            let obj = store
2611                .tree()
2612                .find(&ObjectKey::object(dir_id))
2613                .await
2614                .expect("Searching tree for dir")
2615                .unwrap();
2616            assert_matches!(
2617                obj.value,
2618                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
2619            );
2620        }
2621        fs.close().await.expect("Close failed");
2622    }
2623
2624    #[fuchsia::test]
2625    async fn test_create_and_open_internal_dir_unencrypted() {
2626        let fs = test_filesystem().await;
2627        let dir_id;
2628        let store_id;
2629        {
2630            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2631            let store =
2632                root_volume.new_volume("test", NO_OWNER, None).await.expect("new_volume failed");
2633            dir_id =
2634                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
2635            store_id = store.store_object_id();
2636        }
2637
2638        fs.close().await.expect("close failed");
2639        let device = fs.take_device().await;
2640        device.reopen(false);
2641        let fs = FxFilesystem::open(device).await.expect("open failed");
2642
2643        {
2644            let store = fs.object_manager().store(store_id).expect("store not found");
2645            assert_eq!(
2646                dir_id,
2647                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
2648            );
2649            let obj = store
2650                .tree()
2651                .find(&ObjectKey::object(dir_id))
2652                .await
2653                .expect("Searching tree for dir")
2654                .unwrap();
2655            assert_matches!(
2656                obj.value,
2657                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
2658            );
2659        }
2660        fs.close().await.expect("Close failed");
2661    }
2662
2663    #[fuchsia::test(threads = 10)]
2664    async fn test_old_layers_are_purged() {
2665        let fs = test_filesystem().await;
2666
2667        let store = fs.root_store();
2668        let mut transaction = fs
2669            .clone()
2670            .new_transaction(lock_keys![], Options::default())
2671            .await
2672            .expect("new_transaction failed");
2673        let object = Arc::new(
2674            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2675                .await
2676                .expect("create_object failed"),
2677        );
2678        transaction.commit().await.expect("commit failed");
2679
2680        store.flush().await.expect("flush failed");
2681
2682        let mut buf = object.allocate_buffer(5).await;
2683        buf.as_mut_slice().copy_from_slice(b"hello");
2684        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2685
2686        // Holding this layer-set should cause the flush below to stall until it is dropped.
2687        let layer_set = store.tree().layer_set();
2688
2689        let done = Mutex::new(false);
2690        let mut object_id = 0;
2691
2692        join!(
2693            async {
2694                store.flush().await.expect("flush failed");
2695                assert!(*done.lock());
2696            },
2697            async {
2698                // We can't detect a stalled flush (that's the halting problem), so just sleep.
2699                fasync::Timer::new(Duration::from_secs(1)).await;
2700                *done.lock() = true;
2701                object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
2702                std::mem::drop(layer_set);
2703            }
2704        );
2705
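        // Once the layer-set was dropped, the flush should have completed and purged the old
        // layer file, so opening it in the parent store should fail with NotFound.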
2706        if let Err(e) = ObjectStore::open_object(
2707            &store.parent_store.as_ref().unwrap(),
2708            object_id,
2709            HandleOptions::default(),
2710            store.crypt(),
2711        )
2712        .await
2713        {
2714            assert!(FxfsError::NotFound.matches(&e));
2715        } else {
2716            panic!("open_object succeeded");
2717        }
2718    }
2719
2720    #[fuchsia::test]
2721    async fn test_tombstone_deletes_data() {
2722        let fs = test_filesystem().await;
2723        let root_store = fs.root_store();
2724        let child_id = {
2725            let mut transaction = fs
2726                .clone()
2727                .new_transaction(lock_keys![], Options::default())
2728                .await
2729                .expect("new_transaction failed");
2730            let child = ObjectStore::create_object(
2731                &root_store,
2732                &mut transaction,
2733                HandleOptions::default(),
2734                None,
2735            )
2736            .await
2737            .expect("create_object failed");
2738            transaction.commit().await.expect("commit failed");
2739
2740            // Allocate an extent in the file.
2741            let mut buffer = child.allocate_buffer(8192).await;
2742            buffer.as_mut_slice().fill(0xaa);
2743            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
2744
2745            child.object_id()
2746        };
2747
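        // Tombstoning the object should remove its records and free its extents.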
2748        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
2749
2750        // Let fsck check allocations.
2751        fsck(fs.clone()).await.expect("fsck failed");
2752    }
2753
2754    #[fuchsia::test]
2755    async fn test_tombstone_purges_keys() {
2756        let fs = test_filesystem().await;
2757        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2758        let store = root_volume
2759            .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
2760            .await
2761            .expect("new_volume failed");
2762        let mut transaction = fs
2763            .clone()
2764            .new_transaction(lock_keys![], Options::default())
2765            .await
2766            .expect("new_transaction failed");
2767        let child =
2768            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2769                .await
2770                .expect("create_object failed");
2771        transaction.commit().await.expect("commit failed");
2772        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_some());
2773        store
2774            .tombstone_object(child.object_id(), Options::default())
2775            .await
2776            .expect("tombstone_object failed");
2777        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_none());
2778        fs.close().await.expect("close failed");
2779    }
2780
2781    #[fuchsia::test]
2782    async fn test_major_compaction_discards_unnecessary_records() {
2783        let fs = test_filesystem().await;
2784        let root_store = fs.root_store();
2785        let child_id = {
2786            let mut transaction = fs
2787                .clone()
2788                .new_transaction(lock_keys![], Options::default())
2789                .await
2790                .expect("new_transaction failed");
2791            let child = ObjectStore::create_object(
2792                &root_store,
2793                &mut transaction,
2794                HandleOptions::default(),
2795                None,
2796            )
2797            .await
2798            .expect("create_object failed");
2799            transaction.commit().await.expect("commit failed");
2800
2801            // Allocate an extent in the file.
2802            let mut buffer = child.allocate_buffer(8192).await;
2803            buffer.as_mut_slice().fill(0xaa);
2804            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
2805
2806            child.object_id()
2807        };
2808
2809        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
2810        {
2811            let layers = root_store.tree.layer_set();
2812            let mut merger = layers.merger();
2813            let iter = merger
2814                .query(Query::FullRange(&ObjectKey::object(child_id)))
2815                .await
2816                .expect("seek failed");
2817            // Find at least one object still in the tree.
2818            match iter.get() {
2819                Some(ItemRef { key: ObjectKey { object_id, .. }, .. })
2820                    if *object_id == child_id => {}
2821                _ => panic!("Objects should still be in the tree."),
2822            }
2823        }
2824        root_store.flush().await.expect("flush failed");
2825
2826        // There should be no records for the object.
2827        let layers = root_store.tree.layer_set();
2828        let mut merger = layers.merger();
2829        let iter = merger
2830            .query(Query::FullRange(&ObjectKey::object(child_id)))
2831            .await
2832            .expect("seek failed");
2833        match iter.get() {
2834            None => {}
2835            Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
2836                assert_ne!(*object_id, child_id)
2837            }
2838        }
2839    }
2840
2841    #[fuchsia::test]
2842    async fn test_overlapping_extents_in_different_layers() {
2843        let fs = test_filesystem().await;
2844        let store = fs.root_store();
2845
2846        let mut transaction = fs
2847            .clone()
2848            .new_transaction(
2849                lock_keys![LockKey::object(
2850                    store.store_object_id(),
2851                    store.root_directory_object_id()
2852                )],
2853                Options::default(),
2854            )
2855            .await
2856            .expect("new_transaction failed");
2857        let root_directory =
2858            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2859        let object = root_directory
2860            .create_child_file(&mut transaction, "test")
2861            .await
2862            .expect("create_child_file failed");
2863        transaction.commit().await.expect("commit failed");
2864
2865        let buf = object.allocate_buffer(16384).await;
2866        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2867
2868        store.flush().await.expect("flush failed");
2869
2870        object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");
2871
2872        // At this point, we should have an extent for 0..16384 in a layer that has been flushed,
2873        // and an extent for 0..4096 that partially overwrites it.  Writing to 0..16384 should
2874        // overwrite both of those extents.
2875        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2876
2877        fsck(fs.clone()).await.expect("fsck failed");
2878    }
2879
2880    #[fuchsia::test(threads = 10)]
2881    async fn test_encrypted_mutations() {
2882        async fn one_iteration(
2883            fs: OpenFxFilesystem,
2884            crypt: Arc<dyn Crypt>,
2885            iteration: u64,
2886        ) -> OpenFxFilesystem {
2887            async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
2888                fs.close().await.expect("Close failed");
2889                let device = fs.take_device().await;
2890                device.reopen(false);
2891                FxFilesystem::open(device).await.expect("FS open failed")
2892            }
2893
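            // Each iteration: write a file into the unlocked volume, reopen so the volume comes
            // back locked, flush to produce encrypted mutations, then unlock again and check
            // that the contents survived.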
2894            let fs = reopen(fs).await;
2895
2896            let (store_object_id, object_id) = {
2897                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2898                let store = root_volume
2899                    .volume("test", NO_OWNER, Some(crypt.clone()))
2900                    .await
2901                    .expect("volume failed");
2902
2903                let mut transaction = fs
2904                    .clone()
2905                    .new_transaction(
2906                        lock_keys![LockKey::object(
2907                            store.store_object_id(),
2908                            store.root_directory_object_id(),
2909                        )],
2910                        Options::default(),
2911                    )
2912                    .await
2913                    .expect("new_transaction failed");
2914                let root_directory = Directory::open(&store, store.root_directory_object_id())
2915                    .await
2916                    .expect("open failed");
2917                let object = root_directory
2918                    .create_child_file(&mut transaction, &format!("test {}", iteration))
2919                    .await
2920                    .expect("create_child_file failed");
2921                transaction.commit().await.expect("commit failed");
2922
2923                let mut buf = object.allocate_buffer(1000).await;
2924                for i in 0..buf.len() {
2925                    buf.as_mut_slice()[i] = i as u8;
2926                }
2927                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2928
2929                (store.store_object_id(), object.object_id())
2930            };
2931
2932            let fs = reopen(fs).await;
2933
2934            let check_object = |fs: Arc<FxFilesystem>| {
2935                let crypt = crypt.clone();
2936                async move {
2937                    let root_volume = root_volume(fs).await.expect("root_volume failed");
2938                    let volume = root_volume
2939                        .volume("test", NO_OWNER, Some(crypt))
2940                        .await
2941                        .expect("volume failed");
2942
2943                    let object = ObjectStore::open_object(
2944                        &volume,
2945                        object_id,
2946                        HandleOptions::default(),
2947                        None,
2948                    )
2949                    .await
2950                    .expect("open_object failed");
2951                    let mut buf = object.allocate_buffer(1000).await;
2952                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
2953                    for i in 0..buf.len() {
2954                        assert_eq!(buf.as_slice()[i], i as u8);
2955                    }
2956                }
2957            };
2958
2959            check_object(fs.clone()).await;
2960
2961            let fs = reopen(fs).await;
2962
2963            // At this point the "test" volume is locked.  Before checking the object, flush the
2964            // filesystem.  This should leave a file with encrypted mutations.
2965            fs.object_manager().flush().await.expect("flush failed");
2966
2967            assert_ne!(
2968                fs.object_manager()
2969                    .store(store_object_id)
2970                    .unwrap()
2971                    .load_store_info()
2972                    .await
2973                    .expect("load_store_info failed")
2974                    .encrypted_mutations_object_id,
2975                INVALID_OBJECT_ID
2976            );
2977
2978            check_object(fs.clone()).await;
2979
2980            // Checking the object should have triggered a flush and so now there should be no
2981            // encrypted mutations object.
2982            assert_eq!(
2983                fs.object_manager()
2984                    .store(store_object_id)
2985                    .unwrap()
2986                    .load_store_info()
2987                    .await
2988                    .expect("load_store_info failed")
2989                    .encrypted_mutations_object_id,
2990                INVALID_OBJECT_ID
2991            );
2992
2993            let fs = reopen(fs).await;
2994
2995            fsck(fs.clone()).await.expect("fsck failed");
2996
2997            let fs = reopen(fs).await;
2998
2999            check_object(fs.clone()).await;
3000
3001            fs
3002        }
3003
3004        let mut fs = test_filesystem().await;
3005        let crypt = Arc::new(InsecureCrypt::new());
3006
3007        {
3008            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3009            let _store = root_volume
3010                .new_volume("test", NO_OWNER, Some(crypt.clone()))
3011                .await
3012                .expect("new_volume failed");
3013        }
3014
3015        // Run a few iterations so that we exercise different stream cipher offsets.
3016        for i in 0..5 {
3017            fs = one_iteration(fs, crypt.clone(), i).await;
3018        }
3019    }
3020
3021    #[fuchsia::test(threads = 10)]
3022    async fn test_object_id_cipher_roll() {
3023        let fs = test_filesystem().await;
3024        let crypt = Arc::new(InsecureCrypt::new());
3025
3026        {
3027            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3028            let store = root_volume
3029                .new_volume("test", NO_OWNER, Some(crypt.clone()))
3030                .await
3031                .expect("new_volume failed");
3032
3033            let store_info = store.store_info().unwrap();
3034
3035            // Hack the last object ID to force a roll of the object ID cipher.
3036            {
3037                let mut last_object_id = store.last_object_id.lock();
3038                assert_eq!(last_object_id.id & OBJECT_ID_HI_MASK, 1u64 << 32);
3039                last_object_id.id |= 0xffffffff;
3040            }
3041
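            // With the low 32 bits exhausted, creating the next object should bump the hi part
            // of the object ID, e.g. ((1u64 << 32) | 0xffffffff) + 1 == 2u64 << 32.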
3042            let mut transaction = fs
3043                .clone()
3044                .new_transaction(
3045                    lock_keys![LockKey::object(
3046                        store.store_object_id(),
3047                        store.root_directory_object_id()
3048                    )],
3049                    Options::default(),
3050                )
3051                .await
3052                .expect("new_transaction failed");
3053            let root_directory = Directory::open(&store, store.root_directory_object_id())
3054                .await
3055                .expect("open failed");
3056            let object = root_directory
3057                .create_child_file(&mut transaction, "test")
3058                .await
3059                .expect("create_child_file failed");
3060            transaction.commit().await.expect("commit failed");
3061
3062            assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 2u64 << 32);
3063
3064            // Check that the key has been changed.
3065            assert_ne!(store.store_info().unwrap().object_id_key, store_info.object_id_key);
3066
3067            assert_eq!(store.last_object_id.lock().id, 2u64 << 32);
3068        };
3069
3070        fs.close().await.expect("Close failed");
3071        let device = fs.take_device().await;
3072        device.reopen(false);
3073        let fs = FxFilesystem::open(device).await.expect("open failed");
3074        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3075        let store =
3076            root_volume.volume("test", NO_OWNER, Some(crypt.clone())).await.expect("volume failed");
3077
3078        assert_eq!(store.last_object_id.lock().id, 2u64 << 32);
3079    }
3080
3081    #[fuchsia::test(threads = 10)]
3082    async fn test_lock_store() {
3083        let fs = test_filesystem().await;
3084        let crypt = Arc::new(InsecureCrypt::new());
3085
3086        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3087        let store = root_volume
3088            .new_volume("test", NO_OWNER, Some(crypt.clone()))
3089            .await
3090            .expect("new_volume failed");
3091        let mut transaction = fs
3092            .clone()
3093            .new_transaction(
3094                lock_keys![LockKey::object(
3095                    store.store_object_id(),
3096                    store.root_directory_object_id()
3097                )],
3098                Options::default(),
3099            )
3100            .await
3101            .expect("new_transaction failed");
3102        let root_directory =
3103            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3104        root_directory
3105            .create_child_file(&mut transaction, "test")
3106            .await
3107            .expect("create_child_file failed");
3108        transaction.commit().await.expect("commit failed");
3109        store.lock().await.expect("lock failed");
3110
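        // Unlocking again should make the previously created file visible once more.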
3111        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
3112        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3113    }
3114
3115    #[fuchsia::test(threads = 10)]
3116    async fn test_unlock_read_only() {
3117        let fs = test_filesystem().await;
3118        let crypt = Arc::new(InsecureCrypt::new());
3119
3120        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3121        let store = root_volume
3122            .new_volume("test", NO_OWNER, Some(crypt.clone()))
3123            .await
3124            .expect("new_volume failed");
3125        let mut transaction = fs
3126            .clone()
3127            .new_transaction(
3128                lock_keys![LockKey::object(
3129                    store.store_object_id(),
3130                    store.root_directory_object_id()
3131                )],
3132                Options::default(),
3133            )
3134            .await
3135            .expect("new_transaction failed");
3136        let root_directory =
3137            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3138        root_directory
3139            .create_child_file(&mut transaction, "test")
3140            .await
3141            .expect("create_child_file failed");
3142        transaction.commit().await.expect("commit failed");
3143        store.lock().await.expect("lock failed");
3144
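        // Unlock read-only, and make sure we can cycle through lock/unlock again afterwards.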
3145        store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
3146        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3147        store.lock_read_only();
3148        store.unlock_read_only(crypt).await.expect("unlock failed");
3149        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3150    }
3151
3152    #[fuchsia::test(threads = 10)]
3153    async fn test_key_rolled_when_unlocked() {
3154        let fs = test_filesystem().await;
3155        let crypt = Arc::new(InsecureCrypt::new());
3156
3157        let object_id;
3158        {
3159            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3160            let store = root_volume
3161                .new_volume("test", NO_OWNER, Some(crypt.clone()))
3162                .await
3163                .expect("new_volume failed");
3164            let mut transaction = fs
3165                .clone()
3166                .new_transaction(
3167                    lock_keys![LockKey::object(
3168                        store.store_object_id(),
3169                        store.root_directory_object_id()
3170                    )],
3171                    Options::default(),
3172                )
3173                .await
3174                .expect("new_transaction failed");
3175            let root_directory = Directory::open(&store, store.root_directory_object_id())
3176                .await
3177                .expect("open failed");
3178            object_id = root_directory
3179                .create_child_file(&mut transaction, "test")
3180                .await
3181                .expect("create_child_file failed")
3182                .object_id();
3183            transaction.commit().await.expect("commit failed");
3184        }
3185
3186        fs.close().await.expect("Close failed");
3187        let mut device = fs.take_device().await;
3188
3189        // Remount repeatedly so that we can be sure remounting still works once many
3190        // mutations keys have accumulated.
3191        for _ in 0..100 {
3192            device.reopen(false);
3193            let fs = FxFilesystem::open(device).await.expect("open failed");
3194            {
3195                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3196                let store = root_volume
3197                    .volume("test", NO_OWNER, Some(crypt.clone()))
3198                    .await
3199                    .expect("open_volume failed");
3200
3201                // The key should get rolled every time we unlock.
3202                assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);
3203
3204                // Make sure there's an encrypted mutation.
3205                let handle =
3206                    ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
3207                        .await
3208                        .expect("open_object failed");
3209                let buffer = handle.allocate_buffer(100).await;
3210                handle
3211                    .write_or_append(Some(0), buffer.as_ref())
3212                    .await
3213                    .expect("write_or_append failed");
3214            }
3215            fs.close().await.expect("Close failed");
3216            device = fs.take_device().await;
3217        }
3218    }
3219
3220    #[test]
3221    fn test_store_info_max_serialized_size() {
3222        let info = StoreInfo {
3223            guid: [0xff; 16],
3224            last_object_id: 0x1234567812345678,
3225            // Worst case, each layer should be 3/4 the size of the layer below it (because of the
3226            // compaction policy we're using).  If the smallest layer is 8,192 bytes, the sum of 120
3227            // such layers is roughly 8,192 * 4 * (4/3)^119 ≈ 2.4e19 bytes, which exceeds u64::MAX
3228            // (~1.8e19), so if this fits, any size should fit.
3229            layers: vec![0x1234567812345678; 120],
3230            root_directory_object_id: 0x1234567812345678,
3231            graveyard_directory_object_id: 0x1234567812345678,
3232            object_count: 0x1234567812345678,
3233            mutations_key: Some(WrappedKey {
3234                wrapping_key_id: 0x1234567812345678,
3235                key: WrappedKeyBytes::from([0xff; WRAPPED_KEY_SIZE]),
3236            }),
3237            mutations_cipher_offset: 0x1234567812345678,
3238            encrypted_mutations_object_id: 0x1234567812345678,
3239            object_id_key: Some(WrappedKey {
3240                wrapping_key_id: 0x1234567812345678,
3241                key: WrappedKeyBytes::from([0xff; WRAPPED_KEY_SIZE]),
3242            }),
3243            internal_directory_object_id: INVALID_OBJECT_ID,
3244        };
3245        let mut serialized_info = Vec::new();
3246        info.serialize_with_version(&mut serialized_info).unwrap();
3247        assert!(
3248            serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
3249            "{}",
3250            serialized_info.len()
3251        );
3252    }
3253
3254    async fn reopen_after_crypt_failure_inner(read_only: bool) {
3255        let fs = test_filesystem().await;
3256        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3257
3258        let store = {
3259            let crypt = Arc::new(InsecureCrypt::new());
3260            let store = root_volume
3261                .new_volume("vol", NO_OWNER, Some(crypt.clone()))
3262                .await
3263                .expect("new_volume failed");
3264            let root_directory = Directory::open(&store, store.root_directory_object_id())
3265                .await
3266                .expect("open failed");
3267            let mut transaction = fs
3268                .clone()
3269                .new_transaction(
3270                    lock_keys![LockKey::object(
3271                        store.store_object_id(),
3272                        root_directory.object_id()
3273                    )],
3274                    Options::default(),
3275                )
3276                .await
3277                .expect("new_transaction failed");
3278            root_directory
3279                .create_child_file(&mut transaction, "test")
3280                .await
3281                .expect("create_child_file failed");
3282            transaction.commit().await.expect("commit failed");
3283
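            // Simulate the crypt instance dying: new transactions that need it should fail,
            // after which the store can still be locked cleanly.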
3284            crypt.shutdown();
3285            let mut transaction = fs
3286                .clone()
3287                .new_transaction(
3288                    lock_keys![LockKey::object(
3289                        store.store_object_id(),
3290                        root_directory.object_id()
3291                    )],
3292                    Options::default(),
3293                )
3294                .await
3295                .expect("new_transaction failed");
3296            root_directory
3297                .create_child_file(&mut transaction, "test2")
3298                .await
3299                .map(|_| ())
3300                .expect_err("create_child_file should fail");
3301            store.lock().await.expect("lock failed");
3302            store
3303        };
3304
3305        let crypt = Arc::new(InsecureCrypt::new());
3306        if read_only {
3307            store.unlock_read_only(crypt).await.expect("unlock failed");
3308        } else {
3309            store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
3310        }
3311        let root_directory =
3312            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3313        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3314    }
3315
3316    #[fuchsia::test(threads = 10)]
3317    async fn test_reopen_after_crypt_failure() {
3318        reopen_after_crypt_failure_inner(false).await;
3319    }
3320
3321    #[fuchsia::test(threads = 10)]
3322    async fn test_reopen_read_only_after_crypt_failure() {
3323        reopen_after_crypt_failure_inner(true).await;
3324    }
3325
3326    #[fuchsia::test(threads = 10)]
3327    #[should_panic(expected = "Insufficient reservation space")]
3328    #[cfg(debug_assertions)]
3329    async fn large_transaction_causes_panic_in_debug_builds() {
3330        let fs = test_filesystem().await;
3331        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3332        let store = root_volume.new_volume("vol", NO_OWNER, None).await.expect("new_volume failed");
3333        let root_directory =
3334            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3335        let mut transaction = fs
3336            .clone()
3337            .new_transaction(
3338                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
3339                Options::default(),
3340            )
3341            .await
3342            .expect("transaction");
3343        for i in 0..500 {
3344            root_directory
3345                .create_symlink(&mut transaction, b"link", &format!("{}", i))
3346                .await
3347                .expect("symlink");
3348        }
3349        assert_eq!(transaction.commit().await.expect("commit"), 0);
3350    }
3351
3352    #[fuchsia::test]
3353    async fn test_crypt_failure_does_not_fuse_journal() {
3354        let fs = test_filesystem().await;
3355
3356        struct Owner;
3357        #[async_trait]
3358        impl StoreOwner for Owner {
3359            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
3360                store.lock().await
3361            }
3362        }
3363        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
3364
3365        {
3366            // Create two stores and a record for each store, so the journal will need to flush them
3367            // both later.
3368            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3369            let store1 = root_volume
3370                .new_volume("vol1", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
3371                .await
3372                .expect("new_volume failed");
3373            let crypt = Arc::new(InsecureCrypt::new());
3374            let store2 = root_volume
3375                .new_volume("vol2", Arc::downgrade(&owner), Some(crypt.clone()))
3376                .await
3377                .expect("new_volume failed");
3378            for store in [&store1, &store2] {
3379                let root_directory = Directory::open(store, store.root_directory_object_id())
3380                    .await
3381                    .expect("open failed");
3382                let mut transaction = fs
3383                    .clone()
3384                    .new_transaction(
3385                        lock_keys![LockKey::object(
3386                            store.store_object_id(),
3387                            root_directory.object_id()
3388                        )],
3389                        Options::default(),
3390                    )
3391                    .await
3392                    .expect("new_transaction failed");
3393                root_directory
3394                    .create_child_file(&mut transaction, "test")
3395                    .await
3396                    .expect("create_child_file failed");
3397                transaction.commit().await.expect("commit failed");
3398            }
3399            // Shut down the crypt instance for store2, and then compact.  Compaction should not
3400            // fail, and the store should become locked.
3401            crypt.shutdown();
3402            fs.journal().compact().await.expect("compact failed");
3403            // The store should now be locked.
3404            assert!(store2.is_locked());
3405        }
3406
3407        // Even though the store wasn't flushed, the mutation to store2 will still be valid as it is
3408        // held in the journal.
3409        fs.close().await.expect("close failed");
3410        let device = fs.take_device().await;
3411        device.reopen(false);
3412        let fs = FxFilesystem::open(device).await.expect("open failed");
3413        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3414
3415        for volume_name in ["vol1", "vol2"] {
3416            let store = root_volume
3417                .volume(volume_name, NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
3418                .await
3419                .expect("open volume failed");
3420            let root_directory = Directory::open(&store, store.root_directory_object_id())
3421                .await
3422                .expect("open failed");
3423            assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
3424        }
3425
3426        fs.close().await.expect("close failed");
3427    }
3428
3429    #[fuchsia::test]
3430    async fn test_crypt_failure_during_unlock_race() {
3431        let fs = test_filesystem().await;
3432
3433        struct Owner;
3434        #[async_trait]
3435        impl StoreOwner for Owner {
3436            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
3437                store.lock().await
3438            }
3439        }
3440        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
3441
3442        let store_object_id = {
3443            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3444            let store = root_volume
3445                .new_volume("vol", Arc::downgrade(&owner), Some(Arc::new(InsecureCrypt::new())))
3446                .await
3447                .expect("new_volume failed");
3448            let root_directory = Directory::open(&store, store.root_directory_object_id())
3449                .await
3450                .expect("open failed");
3451            let mut transaction = fs
3452                .clone()
3453                .new_transaction(
3454                    lock_keys![LockKey::object(
3455                        store.store_object_id(),
3456                        root_directory.object_id()
3457                    )],
3458                    Options::default(),
3459                )
3460                .await
3461                .expect("new_transaction failed");
3462            root_directory
3463                .create_child_file(&mut transaction, "test")
3464                .await
3465                .expect("create_child_file failed");
3466            transaction.commit().await.expect("commit failed");
3467            store.store_object_id()
3468        };
3469
3470        fs.close().await.expect("close failed");
3471        let device = fs.take_device().await;
3472        device.reopen(false);
3473
3474        let fs = FxFilesystem::open(device).await.expect("open failed");
3475        {
3476            let fs_clone = fs.clone();
3477            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3478
3479            let crypt = Arc::new(InsecureCrypt::new());
3480            let crypt_clone = crypt.clone();
3481            join!(
3482                async move {
3483                    // Unlock might fail, so ignore errors.
3484                    let _ =
3485                        root_volume.volume("vol", Arc::downgrade(&owner), Some(crypt_clone)).await;
3486                },
3487                async move {
3488                    // Block until the unlock has finished, but before the flush that the unlock
3489                    // triggers has finished, to maximize the chances of hitting the race.
3490                    let keys = lock_keys![LockKey::flush(store_object_id)];
3491                    let _ = fs_clone.lock_manager().write_lock(keys).await;
3492                    crypt.shutdown();
3493                }
3494            );
3495        }
3496
3497        fs.close().await.expect("close failed");
3498        let device = fs.take_device().await;
3499        device.reopen(false);
3500
3501        let fs = FxFilesystem::open(device).await.expect("open failed");
3502        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3503        let store = root_volume
3504            .volume("vol", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
3505            .await
3506            .expect("open volume failed");
3507        let root_directory =
3508            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3509        assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
3510
3511        fs.close().await.expect("close failed");
3512    }
3513}