fxfs/object_store.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

pub mod allocator;
pub mod caching_object_handle;
pub mod data_object_handle;
pub mod directory;
mod extent_mapping_iterator;
mod extent_record;
mod flush;
pub mod graveyard;
mod install;
pub mod journal;
mod key_manager;
pub(crate) mod merge;
pub mod object_manager;
pub mod object_record;
pub mod project_id;
mod store_object_handle;
pub mod transaction;
mod tree;
mod tree_cache;
pub mod volume;

pub use data_object_handle::{
    DataObjectHandle, DirectWriter, FileExtent, FsverityState, FsverityStateInner, RangeType,
};
pub use directory::Directory;
pub use object_record::{ChildValue, ObjectDescriptor, PosixAttributes, Timestamp};
pub use store_object_handle::{
    EXTENDED_ATTRIBUTE_RANGE_END, EXTENDED_ATTRIBUTE_RANGE_START, SetExtendedAttributeMode,
    StoreObjectHandle,
};

use crate::errors::FxfsError;
use crate::filesystem::{
    ApplyContext, ApplyMode, FxFilesystem, JournalingObject, MAX_FILE_SIZE, SyncOptions,
    TruncateGuard, TxnGuard,
};
use crate::log::*;
use crate::lsm_tree::cache::{NullCache, ObjectCache};
use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
use crate::lsm_tree::{LSMTree, Query};
use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ObjectProperties, ReadObjectHandle};
use crate::object_store::allocator::Allocator;
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::{JournalCheckpoint, JournalCheckpointV32, JournaledTransaction};
use crate::object_store::key_manager::KeyManager;
use crate::object_store::transaction::{
    AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation, Options, Transaction,
    lock_keys,
};
use crate::range::RangeExt;
use crate::round::round_up;
use crate::serialized_types::{Version, Versioned, VersionedLatest};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use fidl_fuchsia_io as fio;
use fprint::TypeFingerprint;
use fuchsia_sync::Mutex;
use fxfs_crypto::ff1::Ff1;
use fxfs_crypto::{
    Cipher, CipherHolder, Crypt, FxfsCipher, KeyPurpose, ObjectType, StreamCipher, UnwrappedKey,
    WrappingKeyId,
};
use fxfs_macros::{Migrate, migrate_to_version};
use scopeguard::ScopeGuard;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, OnceLock, Weak};
use storage_device::Device;
use uuid::Uuid;

pub use extent_record::{
    BLOB_MERKLE_ATTRIBUTE_ID, DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
    FSVERITY_MERKLE_ATTRIBUTE_ID,
};
pub use object_record::{
    AttributeKey, EncryptionKey, EncryptionKeys, ExtendedAttributeValue, FsverityMetadata, FxfsKey,
    FxfsKeyV40, FxfsKeyV49, ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue,
    ProjectProperty, RootDigest,
};
pub use transaction::Mutation;

// For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel
// attacks more difficult. This mask can be used to extract the hi part of the object ID.
const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000;
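// For example, `0x0000_0002_9f3c_21aa & OBJECT_ID_HI_MASK == 0x0000_0002_0000_0000`, leaving
// just the high 32 bits.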

// At time of writing, this threshold limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;

// Encrypted files and directories use the fscrypt key (identified by `FSCRYPT_KEY_ID`) to encrypt
// file contents and filenames respectively. All non-fscrypt encrypted files otherwise default to
// using the `VOLUME_DATA_KEY_ID` key. Note, the filesystem always uses the `VOLUME_DATA_KEY_ID`
// key to encrypt large extended attributes. Thus, encrypted files and directories with large
// xattrs will have both an fscrypt and volume data key.
pub const VOLUME_DATA_KEY_ID: u64 = 0;
pub const FSCRYPT_KEY_ID: u64 = 1;

/// A constant that can be used where an owner is expected of type `Weak<dyn StoreOwner>` but no
/// owner is required.
pub const NO_OWNER: Weak<()> = Weak::new();
impl StoreOwner for () {}

#[async_trait]
pub trait StoreOwner: Send + Sync {
    /// Forcibly lock the store.  This exists to give the StoreOwner an opportunity to clean up
    /// tasks which might access the store before locking it, because ObjectStore::unlock can only
    /// be called when the store is not in use.
    async fn force_lock(self: Arc<Self>, _store: &ObjectStore) -> Result<(), Error> {
        Err(anyhow!(FxfsError::Internal))
    }
}
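
// A minimal sketch of an owner, assuming a hypothetical `MyVolume` type with a
// `terminate_tasks` helper, and assuming `ObjectStore::lock` is the locking entry point:
//
//     struct MyVolume { /* ... */ }
//
//     #[async_trait]
//     impl StoreOwner for MyVolume {
//         async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), Error> {
//             // Stop anything that might still be using the store, then lock it.
//             self.terminate_tasks().await;
//             store.lock().await
//         }
//     }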

/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
/// back to an ObjectStore.
pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}

/// StoreInfo stores information about the object store.  This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
pub type StoreInfo = StoreInfoV49;

#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV49 {
    /// The globally unique identifier for the associated object store. If unset, will be all zero.
    guid: [u8; 16],

    /// The last used object ID.  Note that this field is not accurate in memory; ObjectStore's
    /// last_object_id field is the one to use in that case.  Technically, this might not be the
    /// last object ID used for the latest transaction that created an object because we use this at
    /// the point of creating the object but before we commit the transaction.  Transactions can
    /// then get committed in an arbitrary order (or not at all).
    last_object_id: u64,

    /// Object ids for layers.  TODO(https://fxbug.dev/42178036): need a layer of indirection here
    /// so we can support snapshots.
    pub layers: Vec<u64>,

    /// The object ID for the root directory.
    root_directory_object_id: u64,

    /// The object ID for the graveyard.
    graveyard_directory_object_id: u64,

    /// The number of live objects in the store.  This should *not* be trusted; it can be invalid
    /// due to filesystem inconsistencies.
    object_count: u64,

    /// The (wrapped) key that encrypted mutations should use.
    mutations_key: Option<FxfsKeyV49>,

    /// Mutations for the store are encrypted using a stream cipher.  To decrypt the mutations, we
    /// need to know the offset in the cipher stream to start it.
    mutations_cipher_offset: u64,

    /// If we have to flush the store whilst we do not have the key, we need to write the encrypted
    /// mutations to an object. This is the object ID of that file if it exists.
    pub encrypted_mutations_object_id: u64,

    /// Object IDs are encrypted to reduce the amount of information that sequential object IDs
    /// reveal (such as the number of files in the system and the ordering of their creation in
    /// time).  Only the bottom 32 bits of the object ID are encrypted; the top 32 bits increment
    /// after 2^32 object IDs have been used, which allows us to roll the key.
    object_id_key: Option<FxfsKeyV49>,

    /// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
    /// when the directory doesn't yet exist.
    internal_directory_object_id: u64,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV49)]
pub struct StoreInfoV40 {
    guid: [u8; 16],
    last_object_id: u64,
    pub layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV40>,
    mutations_cipher_offset: u64,
    pub encrypted_mutations_object_id: u64,
    object_id_key: Option<FxfsKeyV40>,
    internal_directory_object_id: u64,
}

impl StoreInfo {
    /// Returns the parent objects for this store.
    pub fn parent_objects(&self) -> Vec<u64> {
        // We should not include the ID of the store itself, since that should be referred to in the
        // volume directory.
        let mut objects = self.layers.to_vec();
        if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
            objects.push(self.encrypted_mutations_object_id);
        }
        objects
    }
}

// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit isn't exceeded.
// It will likely involve placing limits on the maximum number of layers.
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072;

// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
// in the journal but hasn't yet been written to layer files) for a store.  We set a limit because
// we want to limit the amount of memory use in the case the filesystem is corrupt or under attack.
pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;

#[derive(Default)]
pub struct HandleOptions {
    /// If true, transactions used by this handle will skip journal space checks.
    pub skip_journal_checks: bool,
    /// If true, data written to any attribute of this handle will not have per-block checksums
    /// computed.
    pub skip_checksums: bool,
    /// If true, any files using fsverity will not attempt to perform any verification. This is
    /// useful to open an object without the correct encryption keys to look at the metadata.
    pub skip_fsverity: bool,
}
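
// Since `HandleOptions` derives `Default`, callers can use struct-update syntax to set only the
// flags they care about, e.g.:
//
//     let options = HandleOptions { skip_journal_checks: true, ..Default::default() };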

/// Parameters for encrypting a newly created object.
pub struct ObjectEncryptionOptions {
    /// If set, the keys are treated as permanent and never evicted from the KeyManager cache.
    /// This is necessary when keys are managed by another store; for example, the layer files
    /// of a child store are objects in the root store, but they are encrypted with keys from the
    /// child store.  Generally, most objects should have this set to `false`.
    pub permanent: bool,
    pub key_id: u64,
    pub key: EncryptionKey,
    pub unwrapped_key: UnwrappedKey,
}

pub struct StoreOptions {
    /// The owner of the store.
    pub owner: Weak<dyn StoreOwner>,

    /// The store is unencrypted if this is `None`.
    pub crypt: Option<Arc<dyn Crypt>>,
}

impl Default for StoreOptions {
    fn default() -> Self {
        Self { owner: NO_OWNER, crypt: None }
    }
}

#[derive(Default)]
pub struct NewChildStoreOptions {
    pub options: StoreOptions,

    /// Specifies the object ID in the root store to be used for the store.  If set to
    /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
    pub object_id: u64,

    /// If true, reserve all 32 bit object_ids.
    pub reserve_32bit_object_ids: bool,

    /// If set, use this GUID for the new store.
    pub guid: Option<[u8; 16]>,
}

pub type EncryptedMutations = EncryptedMutationsV49;

#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV49 {
    // Information about the mutations is held here, but the actual encrypted data is held within
    // `data`.  For each transaction, we record the checkpoint and the count of mutations within the
    // transaction.  The checkpoint is required for the log file offset (which we need to apply the
    // mutations), and the version so that we can correctly decode the mutation after it has been
    // decrypted. The count specifies the number of serialized mutations encoded in |data|.
    transactions: Vec<(JournalCheckpointV32, u64)>,

    // The encrypted mutations.
    data: Vec<u8>,

    // If the mutations key was rolled, this holds the offset in `data` where the new key should
    // apply.
    mutations_key_roll: Vec<(usize, FxfsKeyV49)>,
}

impl std::fmt::Debug for EncryptedMutations {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("EncryptedMutations")
            .field("transactions", &self.transactions)
            .field("len", &self.data.len())
            .field(
                "mutations_key_roll",
                &self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
            )
            .finish()
    }
}

impl Versioned for EncryptedMutations {
    fn max_serialized_size() -> u64 {
        MAX_ENCRYPTED_MUTATIONS_SIZE as u64
    }
}

impl From<EncryptedMutationsV40> for EncryptedMutationsV49 {
    fn from(value: EncryptedMutationsV40) -> Self {
        EncryptedMutationsV49 {
            transactions: value.transactions,
            data: value.data,
            mutations_key_roll: value
                .mutations_key_roll
                .into_iter()
                .map(|(offset, key)| (offset, key.into()))
                .collect(),
        }
    }
}

#[derive(Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV40 {
    transactions: Vec<(JournalCheckpointV32, u64)>,
    data: Vec<u8>,
    mutations_key_roll: Vec<(usize, FxfsKeyV40)>,
}

impl Versioned for EncryptedMutationsV40 {
    fn max_serialized_size() -> u64 {
        MAX_ENCRYPTED_MUTATIONS_SIZE as u64
    }
}

impl EncryptedMutations {
    fn from_replayed_mutations(
        store_object_id: u64,
        transactions: Vec<JournaledTransaction>,
    ) -> Self {
        let mut this = Self::default();
        for JournaledTransaction { checkpoint, non_root_mutations, .. } in transactions {
            for (object_id, mutation) in non_root_mutations {
                if store_object_id == object_id {
                    if let Mutation::EncryptedObjectStore(data) = mutation {
                        this.push(&checkpoint, data);
                    } else if let Mutation::UpdateMutationsKey(key) = mutation {
                        this.mutations_key_roll.push((this.data.len(), key.into()));
                    }
                }
            }
        }
        this
    }

    fn extend(&mut self, other: &EncryptedMutations) {
        self.transactions.extend_from_slice(&other.transactions[..]);
        self.mutations_key_roll.extend(
            other
                .mutations_key_roll
                .iter()
                .map(|(offset, key)| (offset + self.data.len(), key.clone())),
        );
        self.data.extend_from_slice(&other.data[..]);
    }

    fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
        self.data.append(&mut data.into());
        // If the checkpoint is the same as the last mutation we pushed, increment the count.
        if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
            if last_checkpoint.file_offset == checkpoint.file_offset {
                *count += 1;
                return;
            }
        }
        self.transactions.push((checkpoint.clone(), 1));
    }
}
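
// Note: `EncryptedMutations::push` coalesces mutations that share a journal checkpoint, so
// pushing two mutations recorded at the same journal file offset yields a single
// `(checkpoint, 2)` entry in `transactions` rather than two entries.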

pub enum LockState {
    Locked,
    Unencrypted,
    Unlocked { owner: Weak<dyn StoreOwner>, crypt: Arc<dyn Crypt> },

    // The store is unlocked, but in a read-only state, and no flushes or other operations will be
    // performed on the store.
    UnlockedReadOnly(Arc<dyn Crypt>),

    // The store is encrypted but is now in an unusable state (due to a failure to sync the journal
    // after locking the store).  The store cannot be unlocked.
    Invalid,

    // Before we've read the StoreInfo we might not know whether the store is Locked or Unencrypted.
    // This can happen when lazily opening stores (ObjectManager::lazy_open_store).
    Unknown,

    // The store is in the process of being locked.  Whilst the store is being locked, the store
    // isn't usable; assertions will trip if any mutations are applied.
    Locking,

    // Whilst we're unlocking, we will replay encrypted mutations.  The store isn't usable until
    // it's in the Unlocked state.
    Unlocking,

    // The store has been deleted.
    Deleted,
}

impl LockState {
    fn owner(&self) -> Option<Arc<dyn StoreOwner>> {
        if let Self::Unlocked { owner, .. } = self { owner.upgrade() } else { None }
    }
}

impl fmt::Debug for LockState {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(match self {
            LockState::Locked => "Locked",
            LockState::Unencrypted => "Unencrypted",
            LockState::Unlocked { .. } => "Unlocked",
            LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
            LockState::Invalid => "Invalid",
            LockState::Unknown => "Unknown",
            LockState::Locking => "Locking",
            LockState::Unlocking => "Unlocking",
            LockState::Deleted => "Deleted",
        })
    }
}

#[derive(Default, Clone)]
struct LastObjectId {
    // The *unencrypted* value of the last object ID.
    id: u64,

    // Encrypted stores will use a cipher to obfuscate the object ID.
    cipher: Option<Ff1>,
}

impl LastObjectId {
    // Returns true if a new cipher needs to be created before further object IDs are generated
    // (i.e. the encrypted low 32 bits of the ID space are exhausted and the key must be rolled).
    fn should_create_cipher(&self) -> bool {
        self.cipher.is_some() && self.id as u32 == u32::MAX
    }

    fn get_next_object_id(&mut self) -> u64 {
        if let Some(cipher) = &self.cipher {
            let hi = self.id & OBJECT_ID_HI_MASK;
            loop {
                self.id += 1;
                assert_ne!(self.id as u32, 0); // This would indicate the ID wrapped.
                let candidate = hi | cipher.encrypt(self.id as u32) as u64;
                if candidate != INVALID_OBJECT_ID {
                    break candidate;
                }
            }
        } else {
            self.id += 1;
            self.id
        }
    }
}
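
// Illustrative example of the scheme above: with a cipher present and `id == 0x1_0000_0005`,
// the next call increments `id` and returns `0x1_0000_0000 | cipher.encrypt(6)`, so sequential
// internal counters map to scattered public IDs within the same 2^32-sized window.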

/// An object store supports a file-like interface for objects.  Objects are keyed by a 64-bit
/// identifier.  An object store has to be backed by a parent object store (which stores metadata
/// for the object store).  The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
pub struct ObjectStore {
    parent_store: Option<Arc<ObjectStore>>,
    store_object_id: u64,
    device: Arc<dyn Device>,
    block_size: u64,
    filesystem: Weak<FxFilesystem>,
    // Lock ordering: This must be taken before `lock_state`.
    store_info: Mutex<Option<StoreInfo>>,
    tree: LSMTree<ObjectKey, ObjectValue>,

    // When replaying the journal, the store cannot read StoreInfo until the whole journal
    // has been replayed, so during that time, store_info_handle will be None and records
    // just get sent to the tree. Once the journal has been replayed, we can open the store
    // and load all the other layer information.
    store_info_handle: OnceLock<DataObjectHandle<ObjectStore>>,

    // The cipher to use for encrypted mutations, if this store is encrypted.
    mutations_cipher: Mutex<Option<StreamCipher>>,

    // Current lock state of the store.
    // Lock ordering: This must be taken after `store_info`.
    lock_state: Mutex<LockState>,
    pub key_manager: KeyManager,

    // Enable/disable tracing.
    trace: AtomicBool,

    // Informational counters for events occurring within the store.
    counters: Mutex<ObjectStoreCounters>,

    // These are updated in performance-sensitive code paths so we use atomics instead of counters.
    device_read_ops: AtomicU64,
    device_write_ops: AtomicU64,
    logical_read_ops: AtomicU64,
    logical_write_ops: AtomicU64,

    // Contains the last object ID and, optionally, a cipher to be used when generating new object
    // IDs.
    last_object_id: Mutex<LastObjectId>,

    // An optional callback to be invoked each time the ObjectStore flushes.  The callback is
    // invoked at the end of flush, while the write lock is still held.
    flush_callback: Mutex<Option<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>>,
}

#[derive(Clone, Default)]
struct ObjectStoreCounters {
    mutations_applied: u64,
    mutations_dropped: u64,
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}

impl ObjectStore {
    fn new(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        store_info: Option<StoreInfo>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
        mutations_cipher: Option<StreamCipher>,
        lock_state: LockState,
        last_object_id: LastObjectId,
    ) -> Arc<ObjectStore> {
        let device = filesystem.device();
        let block_size = filesystem.block_size();
        Arc::new(ObjectStore {
            parent_store,
            store_object_id,
            device,
            block_size,
            filesystem: Arc::downgrade(&filesystem),
            store_info: Mutex::new(store_info),
            tree: LSMTree::new(merge::merge, object_cache),
            store_info_handle: OnceLock::new(),
            mutations_cipher: Mutex::new(mutations_cipher),
            lock_state: Mutex::new(lock_state),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(last_object_id),
            flush_callback: Mutex::new(None),
        })
    }

    fn new_empty(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Arc<Self> {
        Self::new(
            parent_store,
            store_object_id,
            filesystem,
            Some(StoreInfo::default()),
            object_cache,
            None,
            LockState::Unencrypted,
            LastObjectId::default(),
        )
    }

    /// Cycle breaker constructor that returns an ObjectStore without a filesystem.
    /// This should only be used from super block code.
    pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
        ObjectStore {
            parent_store: None,
            store_object_id,
            device,
            block_size,
            filesystem: Weak::<FxFilesystem>::new(),
            store_info: Mutex::new(Some(StoreInfo::default())),
            tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
            store_info_handle: OnceLock::new(),
            mutations_cipher: Mutex::new(None),
            lock_state: Mutex::new(LockState::Unencrypted),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(LastObjectId::default()),
            flush_callback: Mutex::new(None),
        }
    }

    /// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has
    /// been created.
    pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
        this.filesystem = Arc::downgrade(&filesystem);
        this
    }

    /// Create a child store. It is a multi-step process:
    ///
    ///   1. Call `ObjectStore::new_child_store`.
    ///   2. Register the store with the object-manager.
    ///   3. Call `ObjectStore::create` to write the store-info.
    ///
    /// If the procedure fails, care must be taken to unregister the store with the
    /// object-manager.
    ///
    /// The steps have to be separate because of lifetime issues when working with a transaction.
    async fn new_child_store(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        options: NewChildStoreOptions,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Result<Arc<Self>, Error> {
        let handle = if options.object_id != INVALID_OBJECT_ID {
            let handle = ObjectStore::create_object_with_id(
                self,
                transaction,
                options.object_id,
                HandleOptions::default(),
                None,
            )?;
            self.update_last_object_id(options.object_id);
            handle
        } else {
            ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await?
        };
        let filesystem = self.filesystem();
        let id = if options.reserve_32bit_object_ids { 0x1_0000_0000 } else { 0 };
        let store = if let Some(crypt) = options.options.crypt {
            let (wrapped_key, unwrapped_key) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            let (object_id_wrapped, object_id_unwrapped) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    mutations_key: Some(wrapped_key),
                    object_id_key: Some(object_id_wrapped),
                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
                    ..Default::default()
                }),
                object_cache,
                Some(StreamCipher::new(&unwrapped_key, 0)),
                LockState::Unlocked { owner: options.options.owner, crypt },
                LastObjectId { id, cipher: Some(Ff1::new(&object_id_unwrapped)) },
            )
        } else {
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    guid: options.guid.unwrap_or_else(|| *Uuid::new_v4().as_bytes()),
                    ..Default::default()
                }),
                object_cache,
                None,
                LockState::Unencrypted,
                LastObjectId { id, ..LastObjectId::default() },
            )
        };
        assert!(store.store_info_handle.set(handle).is_ok());
        Ok(store)
    }

    /// Actually creates the store in a transaction.  This will also create a root directory and
    /// graveyard directory for the store.  See `new_child_store` above.
    async fn create<'a>(
        self: &'a Arc<Self>,
        transaction: &mut Transaction<'a>,
    ) -> Result<(), Error> {
        let buf = {
            // Create a root directory and graveyard directory.
            let graveyard_directory_object_id = Graveyard::create(transaction, &self);
            let root_directory = Directory::create(transaction, &self, None).await?;

            let serialized_info = {
                let mut store_info = self.store_info.lock();
                let store_info = store_info.as_mut().unwrap();

                store_info.graveyard_directory_object_id = graveyard_directory_object_id;
                store_info.root_directory_object_id = root_directory.object_id();

                let mut serialized_info = Vec::new();
                store_info.serialize_with_version(&mut serialized_info)?;
                serialized_info
            };
            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
            buf
        };

        if self.filesystem().options().image_builder_mode.is_some() {
            // If we're in image builder mode, we want to avoid writing to disk unless explicitly
            // asked to. New object stores will have their StoreInfo written when we compact in
            // FxFilesystem::finalize().
            Ok(())
        } else {
            self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
        }
    }

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(store_id = self.store_object_id(), trace; "OS: trace",);
        }
    }

    /// Sets a callback to be invoked each time the ObjectStore flushes.  The callback is invoked at
    /// the end of flush, while the write lock is still held.
    pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
        let mut flush_callback = self.flush_callback.lock();
        *flush_callback = Some(Box::new(callback));
    }

    pub fn is_root(&self) -> bool {
        if let Some(parent) = &self.parent_store {
            parent.parent_store.is_none()
        } else {
            // The root parent store isn't the root store.
            false
        }
    }

    /// Populates an inspect node with store statistics.
    pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
        let counters = self.counters.lock();
        if let Some(store_info) = self.store_info() {
            root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
        };
        root.record_uint("store_object_id", self.store_object_id);
        root.record_uint("mutations_applied", counters.mutations_applied);
        root.record_uint("mutations_dropped", counters.mutations_dropped);
        root.record_uint("num_flushes", counters.num_flushes);
        if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
            root.record_uint(
                "last_flush_time_ms",
                last_flush_time
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or(std::time::Duration::ZERO)
                    .as_millis()
                    .try_into()
                    .unwrap_or(0u64),
            );
        }
        root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
        root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
        root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
        root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));
        root.record_uint("object_id_hi", self.last_object_id.lock().id >> 32);

        let this = self.clone();
        root.record_child("lsm_tree", move |node| this.tree().record_inspect_data(node));
    }

    pub fn device(&self) -> &Arc<dyn Device> {
        &self.device
    }

    pub fn block_size(&self) -> u64 {
        self.block_size
    }

    pub fn filesystem(&self) -> Arc<FxFilesystem> {
        self.filesystem.upgrade().unwrap()
    }

    pub fn store_object_id(&self) -> u64 {
        self.store_object_id
    }

    pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
        &self.tree
    }

    pub fn root_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().root_directory_object_id
    }

    pub fn guid(&self) -> [u8; 16] {
        self.store_info.lock().as_ref().unwrap().guid
    }

    pub fn graveyard_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().graveyard_directory_object_id
    }

    fn set_graveyard_directory_object_id(&self, oid: u64) {
        assert_eq!(
            std::mem::replace(
                &mut self.store_info.lock().as_mut().unwrap().graveyard_directory_object_id,
                oid
            ),
            INVALID_OBJECT_ID
        );
    }

    pub fn object_count(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().object_count
    }

    pub fn key_manager(&self) -> &KeyManager {
        &self.key_manager
    }

    pub fn parent_store(&self) -> Option<&Arc<ObjectStore>> {
        self.parent_store.as_ref()
    }

    /// Returns the crypt object for the store.  Returns None if the store is unencrypted.
    pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
        match &*self.lock_state.lock() {
            LockState::Locked => panic!("Store is locked"),
            LockState::Invalid
            | LockState::Unencrypted
            | LockState::Locking
            | LockState::Unlocking
            | LockState::Deleted => None,
            LockState::Unlocked { crypt, .. } => Some(crypt.clone()),
            LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
            LockState::Unknown => {
                panic!("Store is of unknown lock state; has the journal been replayed yet?")
            }
        }
    }

    pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
        // Create the transaction first to use the object store lock.
        let mut transaction = self
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    self.parent_store.as_ref().unwrap().store_object_id,
                    self.store_object_id,
                )],
                Options::default(),
            )
            .await?;
        let obj_id = self.store_info.lock().as_ref().unwrap().internal_directory_object_id;
        if obj_id != INVALID_OBJECT_ID {
            return Ok(obj_id);
        }

        // Need to create an internal directory.
        let directory = Directory::create(&mut transaction, self, None).await?;

        transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
        transaction.commit().await?;
        Ok(directory.object_id())
    }

    /// Returns the file size for the object without opening the object.
    async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
        let item = self
            .tree
            .find(&ObjectKey::attribute(
                object_id,
                DEFAULT_DATA_ATTRIBUTE_ID,
                AttributeKey::Attribute,
            ))
            .await?
            .ok_or(FxfsError::NotFound)?;
        if let ObjectValue::Attribute { size, .. } = item.value {
            Ok(size)
        } else {
            bail!(FxfsError::NotFile);
        }
    }

    #[cfg(feature = "migration")]
    pub fn last_object_id(&self) -> u64 {
        self.last_object_id.lock().id
    }

    /// Provides access to the allocator to mark a specific region of the device as allocated.
    #[cfg(feature = "migration")]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        store_object_id: u64,
        device_range: std::ops::Range<u64>,
    ) -> Result<(), Error> {
        self.allocator().mark_allocated(transaction, store_object_id, device_range)
    }

    /// `crypt` can be provided if the crypt service should be different to the default; see the
    /// comment on create_object.  Users should avoid having more than one handle open for the same
    /// object at the same time because they might get out-of-sync; there is no code that will
    /// prevent this.  One example where this can cause an issue: if the object ends up using a
    /// permanent key (which is the case if a value is passed for `crypt`), that key is dropped
    /// when a handle is dropped, which will impact any other handles for the same object.
    pub async fn open_object<S: HandleOwner>(
        owner: &Arc<S>,
        obj_id: u64,
        options: HandleOptions,
        crypt: Option<Arc<dyn Crypt>>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let mut fsverity_descriptor = None;
        let mut overwrite_ranges = Vec::new();
        let item = store
            .tree
            .find(&ObjectKey::attribute(obj_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute))
            .await?
            .ok_or(FxfsError::NotFound)?;

        let (size, track_overwrite_extents) = match item.value {
            ObjectValue::Attribute { size, has_overwrite_extents } => (size, has_overwrite_extents),
            ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
                if !options.skip_fsverity {
                    fsverity_descriptor = Some(fsverity_metadata);
                }
                // We only track overwrite extents in memory for writes; reads handle them
                // implicitly.  This means verified files (whose data won't change anymore)
                // don't need to track them.
                (size, false)
            }
            _ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attribute")),
        };

        ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);

        if track_overwrite_extents {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    obj_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
                )))
                .await?;
            loop {
                match iter.get() {
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == obj_id && *attribute_id == DEFAULT_DATA_ATTRIBUTE_ID => {
                        match value {
                            ObjectValue::Extent(ExtentValue::None)
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Raw,
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Cow(_),
                                ..
                            }) => (),
                            ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::OverwritePartial(_),
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Overwrite,
                                ..
                            }) => overwrite_ranges.push(range.clone()),
                            _ => bail!(
                                anyhow!(FxfsError::Inconsistent)
                                    .context("open_object: Expected extent")
                            ),
                        }
                        iter.advance().await?;
                    }
                    _ => break,
                }
            }
        }

        // If a crypt service has been specified, it needs to be a permanent key because cached
        // keys can only use the store's crypt service.
        let permanent = if let Some(crypt) = crypt {
            store
                .key_manager
                .get_keys(
                    obj_id,
                    crypt.as_ref(),
                    &mut Some(async || store.get_keys(obj_id).await),
                    /* permanent= */ true,
                    /* force= */ false,
                )
                .await?;
            true
        } else {
            false
        };
        let data_object_handle = DataObjectHandle::new(
            owner.clone(),
            obj_id,
            permanent,
            DEFAULT_DATA_ATTRIBUTE_ID,
            size,
            FsverityState::None,
            options,
            false,
            &overwrite_ranges,
        );
        if let Some(descriptor) = fsverity_descriptor {
            data_object_handle
                .set_fsverity_state_some(descriptor)
                .await
                .context("Invalid or mismatched merkle tree")?;
        }
        Ok(data_object_handle)
    }
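
    // A minimal usage sketch (hypothetical locals; assumes `store` implements `HandleOwner` and
    // `oid` refers to an existing file):
    //
    //     let handle =
    //         ObjectStore::open_object(&store, oid, HandleOptions::default(), None).await?;
    //     let size = handle.get_size();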

    pub fn create_object_with_id<S: HandleOwner>(
        owner: &Arc<S>,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        options: HandleOptions,
        encryption_options: Option<ObjectEncryptionOptions>,
    ) -> Result<DataObjectHandle<S>, Error> {
        debug_assert!(object_id != INVALID_OBJECT_ID);
        let store = owner.as_ref().as_ref();
        // Don't permit creating unencrypted objects in an encrypted store.  The converse is OK.
        debug_assert!(store.crypt().is_none() || encryption_options.is_some());
        let now = Timestamp::now();
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::object(object_id),
                ObjectValue::file(1, 0, now.clone(), now.clone(), now.clone(), now, 0, None),
            ),
        );
        let mut permanent_keys = false;
        if let Some(ObjectEncryptionOptions { permanent, key_id, key, unwrapped_key }) =
            encryption_options
        {
            permanent_keys = permanent;
            transaction.add(
                store.store_object_id(),
                Mutation::insert_object(
                    ObjectKey::keys(object_id),
                    ObjectValue::keys(vec![(key_id, key)].into()),
                ),
            );
            let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));
            store.key_manager.insert(
                object_id,
                Arc::new(vec![(key_id, CipherHolder::Cipher(cipher))].into()),
                permanent,
            );
        }
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute),
                // This is a new object so nothing has pre-allocated overwrite extents yet.
                ObjectValue::attribute(0, false),
            ),
        );
        Ok(DataObjectHandle::new(
            owner.clone(),
            object_id,
            permanent_keys,
            DEFAULT_DATA_ATTRIBUTE_ID,
            0,
            FsverityState::None,
            options,
            false,
            &[],
        ))
    }

    /// Creates an object in the store.
    ///
    /// If the store is encrypted, the object will be automatically encrypted as well.
    /// If `wrapping_key_id` is set, the new keys will be wrapped with that specific key, and
    /// otherwise the default data key is used.
    pub async fn create_object<S: HandleOwner>(
        owner: &Arc<S>,
        mut transaction: &mut Transaction<'_>,
        options: HandleOptions,
        wrapping_key_id: Option<WrappingKeyId>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
        let crypt = store.crypt();
        let encryption_options = if let Some(crypt) = crypt {
            let key_id =
                if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };
            let (key, unwrapped_key) = if let Some(wrapping_key_id) = wrapping_key_id {
                crypt.create_key_with_id(object_id, wrapping_key_id, ObjectType::File).await?
            } else {
                let (fxfs_key, unwrapped_key) =
                    crypt.create_key(object_id, KeyPurpose::Data).await?;
                (EncryptionKey::Fxfs(fxfs_key), unwrapped_key)
            };
            Some(ObjectEncryptionOptions { permanent: false, key_id, key, unwrapped_key })
        } else {
            None
        };
        ObjectStore::create_object_with_id(
            owner,
            &mut transaction,
            object_id,
            options,
            encryption_options,
        )
    }
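
    // A minimal creation sketch (hypothetical locals; `fs` is the `FxFilesystem`).  Object
    // creation happens inside a transaction which must then be committed:
    //
    //     let mut transaction =
    //         fs.clone().new_transaction(lock_keys![], Options::default()).await?;
    //     let handle =
    //         ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
    //             .await?;
    //     transaction.commit().await?;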

    /// Creates an object using explicitly provided keys.
    ///
    /// There are some cases where an encrypted object needs to be created in an unencrypted store.
    /// For example, layer files for a child store are created in the root store, but they must be
    /// encrypted using the child store's keys.  This method exists for that purpose.
    pub(crate) async fn create_object_with_key<S: HandleOwner>(
        owner: &Arc<S>,
        mut transaction: &mut Transaction<'_>,
        object_id: u64,
        options: HandleOptions,
        key: EncryptionKey,
        unwrapped_key: UnwrappedKey,
    ) -> Result<DataObjectHandle<S>, Error> {
        ObjectStore::create_object_with_id(
            owner,
            &mut transaction,
            object_id,
            options,
            Some(ObjectEncryptionOptions {
                permanent: true,
                key_id: VOLUME_DATA_KEY_ID,
                key,
                unwrapped_key,
            }),
        )
    }

    /// Adjusts the reference count for a given object.  If the reference count reaches zero, the
    /// object is moved into the graveyard and true is returned.
    pub async fn adjust_refs(
        &self,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        delta: i64,
    ) -> Result<bool, Error> {
        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
        let refs = if let ObjectValue::Object {
            kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
            ..
        } = &mut mutation.item.value
        {
            *refs =
                refs.checked_add_signed(delta).ok_or_else(|| anyhow!("refs underflow/overflow"))?;
            refs
        } else {
            bail!(FxfsError::NotFile);
        };
        if *refs == 0 {
            self.add_to_graveyard(transaction, object_id);

            // We might still need to adjust the reference count if delta was something other than
            // -1.
            if delta != -1 {
                *refs = 1;
                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            }
            // Otherwise, we don't commit the mutation as we want to keep reference count as 1 for
            // objects in graveyard.
            Ok(true)
        } else {
            transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            Ok(false)
        }
    }
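
    // Illustrative behaviour: calling `adjust_refs(&mut txn, oid, -1)` on an object whose
    // reference count is currently 1 returns `Ok(true)` and adds a graveyard entry; the caller
    // is responsible for committing the transaction and later tombstoning the object.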

    // Purges an object that is in the graveyard.
    pub async fn tombstone_object(
        &self,
        object_id: u64,
        txn_options: Options<'_>,
    ) -> Result<(), Error> {
        self.key_manager.remove(object_id).await;
        let fs = self.filesystem();
        let truncate_guard = fs.truncate_guard(self.store_object_id, object_id).await;
        self.trim_or_tombstone(object_id, true, txn_options, &truncate_guard).await
    }

    /// Trim extents beyond the end of a file for all attributes.  This will remove the entry from
    /// the graveyard when done.
    pub async fn trim(
        &self,
        object_id: u64,
        truncate_guard: &TruncateGuard<'_>,
    ) -> Result<(), Error> {
        // For the root and root parent store, we would need to use the metadata reservation which
        // we don't currently support, so assert that we're not those stores.
        assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());

        self.trim_or_tombstone(
            object_id,
            false,
            Options { borrow_metadata_space: true, ..Default::default() },
            truncate_guard,
        )
        .await
    }

    /// Trims or tombstones an object.
    async fn trim_or_tombstone(
        &self,
        object_id: u64,
        for_tombstone: bool,
        txn_options: Options<'_>,
        _truncate_guard: &TruncateGuard<'_>,
    ) -> Result<(), Error> {
        let fs = self.filesystem();
        let mut next_attribute = Some(0);
        while let Some(attribute_id) = next_attribute.take() {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
                        LockKey::object(self.store_object_id, object_id),
                    ],
                    txn_options,
                )
                .await?;

            match self
                .trim_some(
                    &mut transaction,
                    object_id,
                    attribute_id,
                    if for_tombstone {
                        TrimMode::Tombstone(TombstoneMode::Object)
                    } else {
                        TrimMode::UseSize
                    },
                )
                .await?
            {
                TrimResult::Incomplete => next_attribute = Some(attribute_id),
                TrimResult::Done(None) => {
                    if for_tombstone
                        || matches!(
                            self.tree
                                .find(&ObjectKey::graveyard_entry(
                                    self.graveyard_directory_object_id(),
                                    object_id,
                                ))
                                .await?,
                            Some(Item { value: ObjectValue::Trim, .. })
                        )
                    {
                        self.remove_from_graveyard(&mut transaction, object_id);
                    }
                }
                TrimResult::Done(id) => next_attribute = id,
            }

            if !transaction.mutations().is_empty() {
                transaction.commit().await?;
            }
        }
        Ok(())
    }

    // Purges an object's attribute that is in the graveyard.
    pub async fn tombstone_attribute(
        &self,
        object_id: u64,
        attribute_id: u64,
        txn_options: Options<'_>,
    ) -> Result<(), Error> {
        let fs = self.filesystem();
        let mut trim_result = TrimResult::Incomplete;
        while matches!(trim_result, TrimResult::Incomplete) {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
                        LockKey::object(self.store_object_id, object_id),
                    ],
                    txn_options,
                )
                .await?;
            trim_result = self
                .trim_some(
                    &mut transaction,
                    object_id,
                    attribute_id,
                    TrimMode::Tombstone(TombstoneMode::Attribute),
                )
                .await?;
            if let TrimResult::Done(..) = trim_result {
                self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
            }
            if !transaction.mutations().is_empty() {
                transaction.commit().await?;
            }
        }
        Ok(())
    }
    /// Deletes extents for attribute `attribute_id` in object `object_id`.  Also see the comments
    /// for TrimMode and TrimResult.  Callers should hold locks on both the attribute and the
    /// object, as this performs a read-modify-write on the sizes.
    pub async fn trim_some(
        &self,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        attribute_id: u64,
        mode: TrimMode,
    ) -> Result<TrimResult, Error> {
        let layer_set = self.tree.layer_set();
        let mut merger = layer_set.merger();

        let aligned_offset = match mode {
            TrimMode::FromOffset(offset) => {
                round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
            }
            TrimMode::Tombstone(..) => 0,
            TrimMode::UseSize => {
                let iter = merger
                    .query(Query::FullRange(&ObjectKey::attribute(
                        object_id,
                        attribute_id,
                        AttributeKey::Attribute,
                    )))
                    .await?;
                if let Some(item_ref) = iter.get() {
                    if item_ref.key.object_id != object_id {
                        return Ok(TrimResult::Done(None));
                    }

                    if let ItemRef {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
                                ..
                            },
                        value: ObjectValue::Attribute { size, .. },
                        ..
                    } = item_ref
                    {
                        // If we found a different attribute_id, return so we can get the
                        // right lock.
                        if *size_attribute_id != attribute_id {
                            return Ok(TrimResult::Done(Some(*size_attribute_id)));
                        }
                        round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
                    } else {
                        // At time of writing, we should always see a size record or None here, but
1363                        // asserting here would be brittle so just skip to the next attribute
1364                        // instead.
1365                        return Ok(TrimResult::Done(Some(attribute_id + 1)));
1366                    }
1367                } else {
1368                    // End of the tree.
1369                    return Ok(TrimResult::Done(None));
1370                }
1371            }
1372        };
1373
1374        // Loop over the extents and deallocate them.
1375        let mut iter = merger
1376            .query(Query::FullRange(&ObjectKey::from_extent(
1377                object_id,
1378                attribute_id,
1379                ExtentKey::search_key_from_offset(aligned_offset),
1380            )))
1381            .await?;
1382        let mut end = 0;
1383        let allocator = self.allocator();
1384        let mut result = TrimResult::Done(None);
1385        let mut deallocated = 0;
1386        let block_size = self.block_size;
1387
1388        while let Some(item_ref) = iter.get() {
1389            if item_ref.key.object_id != object_id {
1390                break;
1391            }
1392            if let ObjectKey {
1393                data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
1394                ..
1395            } = item_ref.key
1396            {
1397                if *extent_attribute_id != attribute_id {
1398                    result = TrimResult::Done(Some(*extent_attribute_id));
1399                    break;
1400                }
1401                if let (
1402                    AttributeKey::Extent(ExtentKey { range }),
1403                    ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1404                ) = (attribute_key, item_ref.value)
1405                {
1406                    let start = std::cmp::max(range.start, aligned_offset);
1407                    ensure!(start < range.end, FxfsError::Inconsistent);
1408                    let device_offset = device_offset
1409                        .checked_add(start - range.start)
1410                        .ok_or(FxfsError::Inconsistent)?;
1411                    end = range.end;
1412                    let len = end - start;
1413                    let device_range = device_offset..device_offset + len;
1414                    ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
1415                    allocator.deallocate(transaction, self.store_object_id, device_range).await?;
1416                    deallocated += len;
1417                    // Stop if the transaction is getting too big.
1418                    if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
1419                        result = TrimResult::Incomplete;
1420                        break;
1421                    }
1422                }
1423            }
1424            iter.advance().await?;
1425        }
1426
1427        let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
1428            && matches!(result, TrimResult::Done(None));
1429        let finished_tombstone_attribute =
1430            matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
1431                && !matches!(result, TrimResult::Incomplete);
1432        let mut object_mutation = None;
1433        let nodes = if finished_tombstone_object { -1 } else { 0 };
1434        if nodes != 0 || deallocated != 0 {
1435            let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1436            if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
1437                mutation.item.value
1438            {
1439                if project_id != 0 {
1440                    transaction.add(
1441                        self.store_object_id,
1442                        Mutation::merge_object(
1443                            ObjectKey::project_usage(self.root_directory_object_id(), project_id),
1444                            ObjectValue::BytesAndNodes {
1445                                bytes: -i64::try_from(deallocated).unwrap(),
1446                                nodes,
1447                            },
1448                        ),
1449                    );
1450                }
1451                object_mutation = Some(mutation);
1452            } else {
1453                panic!("Inconsistent object type.");
1454            }
1455        }
1456
1457        // Deletion marker records *must* be merged so as to consume all other records for the
1458        // object.
1459        if finished_tombstone_object {
1460            transaction.add(
1461                self.store_object_id,
1462                Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
1463            );
1464        } else {
1465            if finished_tombstone_attribute {
1466                transaction.add(
1467                    self.store_object_id,
1468                    Mutation::merge_object(
1469                        ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
1470                        ObjectValue::None,
1471                    ),
1472                );
1473            }
1474            if deallocated > 0 {
1475                let mut mutation = match object_mutation {
1476                    Some(mutation) => mutation,
1477                    None => self.txn_get_object_mutation(transaction, object_id).await?,
1478                };
1479                transaction.add(
1480                    self.store_object_id,
1481                    Mutation::merge_object(
1482                        ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
1483                        ObjectValue::deleted_extent(),
1484                    ),
1485                );
1486                // Update allocated size.
1487                if let ObjectValue::Object {
1488                    attributes: ObjectAttributes { allocated_size, .. },
1489                    ..
1490                } = &mut mutation.item.value
1491                {
1492                    // The only way for this to fail is if the volume is inconsistent.
1493                    *allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
1494                        anyhow!(FxfsError::Inconsistent).context("Allocated size underflow")
1495                    })?;
1496                } else {
1497                    panic!("Unexpected object value");
1498                }
1499                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1500            }
1501        }
1502        Ok(result)
1503    }
1504
1505    /// Returns all objects that exist in the parent store that pertain to this object store.
1506    /// Note that this doesn't include the object_id of the store itself, which is generally
1507    /// referenced externally.
1508    pub fn parent_objects(&self) -> Vec<u64> {
1509        assert!(self.store_info_handle.get().is_some());
1510        self.store_info.lock().as_ref().unwrap().parent_objects()
1511    }
1512
1513    /// Returns root objects for this store.
1514    pub fn root_objects(&self) -> Vec<u64> {
1515        let mut objects = Vec::new();
1516        let store_info = self.store_info.lock();
1517        let info = store_info.as_ref().unwrap();
1518        if info.root_directory_object_id != INVALID_OBJECT_ID {
1519            objects.push(info.root_directory_object_id);
1520        }
1521        if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
1522            objects.push(info.graveyard_directory_object_id);
1523        }
1524        if info.internal_directory_object_id != INVALID_OBJECT_ID {
1525            objects.push(info.internal_directory_object_id);
1526        }
1527        objects
1528    }
1529
1530    pub fn store_info(&self) -> Option<StoreInfo> {
1531        self.store_info.lock().as_ref().cloned()
1532    }
1533
1534    /// Returns None if called during journal replay.
1535    pub fn store_info_handle_object_id(&self) -> Option<u64> {
1536        self.store_info_handle.get().map(|h| h.object_id())
1537    }
1538
1539    /// Called to open a store, before replay of this store's mutations.
1540    async fn open(
1541        parent_store: &Arc<ObjectStore>,
1542        store_object_id: u64,
1543        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
1544    ) -> Result<Arc<ObjectStore>, Error> {
1545        let handle =
1546            ObjectStore::open_object(parent_store, store_object_id, HandleOptions::default(), None)
1547                .await?;
1548
1549        let info = load_store_info(parent_store, store_object_id).await?;
1550        let is_encrypted = info.mutations_key.is_some();
1551
1552        let mut total_layer_size = 0;
1553        let last_object_id;
1554
1555        // TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.
1556
1557        // If the store is encrypted, we can't open the object tree layers now, but we need to
1558        // compute the size of the layers.
1559        if is_encrypted {
1560            for &oid in &info.layers {
1561                total_layer_size += parent_store.get_file_size(oid).await?;
1562            }
1563            if info.encrypted_mutations_object_id != INVALID_OBJECT_ID {
1564                total_layer_size += layer_size_from_encrypted_mutations_size(
1565                    parent_store.get_file_size(info.encrypted_mutations_object_id).await?,
1566                );
1567            }
1568            last_object_id = LastObjectId::default();
1569        } else {
1570            last_object_id = LastObjectId { id: info.last_object_id, cipher: None };
1571        }
1572
1573        let fs = parent_store.filesystem();
1574
1575        let store = ObjectStore::new(
1576            Some(parent_store.clone()),
1577            store_object_id,
1578            fs.clone(),
1579            if is_encrypted { None } else { Some(info) },
1580            object_cache,
1581            None,
1582            if is_encrypted { LockState::Locked } else { LockState::Unencrypted },
1583            last_object_id,
1584        );
1585
1586        assert!(store.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");
1587
1588        if !is_encrypted {
1589            let object_tree_layer_object_ids =
1590                store.store_info.lock().as_ref().unwrap().layers.clone();
1591            let object_layers = store.open_layers(object_tree_layer_object_ids, None).await?;
1592            total_layer_size = object_layers.iter().map(|h| h.get_size()).sum();
1593            store
1594                .tree
1595                .append_layers(object_layers)
1596                .await
1597                .context("Failed to read object store layers")?;
1598        }
1599
1600        fs.object_manager().update_reservation(
1601            store_object_id,
1602            tree::reservation_amount_from_layer_size(total_layer_size),
1603        );
1604
1605        Ok(store)
1606    }
1607
1608    async fn load_store_info(&self) -> Result<StoreInfo, Error> {
1609        load_store_info(self.parent_store.as_ref().unwrap(), self.store_object_id).await
1610    }
1611
1612    async fn open_layers(
1613        &self,
1614        object_ids: impl std::iter::IntoIterator<Item = u64>,
1615        crypt: Option<Arc<dyn Crypt>>,
1616    ) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
1617        let parent_store = self.parent_store.as_ref().unwrap();
1618        let mut handles = Vec::new();
1619        for object_id in object_ids {
1620            let handle = ObjectStore::open_object(
1621                &parent_store,
1622                object_id,
1623                HandleOptions::default(),
1624                crypt.clone(),
1625            )
1626            .await
1627            .with_context(|| format!("Failed to open layer file {}", object_id))?;
1628            handles.push(handle);
1629        }
1630        Ok(handles)
1631    }
1632
1633    /// Unlocks a store so that it is ready to be used.
1634    /// This is not thread-safe.
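    ///
    /// A minimal sketch (hypothetical; assumes `store` is a locked encrypted store, `owner` is an
    /// `Arc` of some `StoreOwner` implementation and `crypt` is an `Arc<dyn Crypt>`):
    ///
    /// ```ignore
    /// let weak_owner: Weak<dyn StoreOwner> = Arc::downgrade(&owner);
    /// store.unlock(weak_owner, crypt.clone()).await?;
    /// assert!(!store.is_locked());
    /// // ... use the store ...
    /// store.lock().await?;
    /// ```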
1635    pub async fn unlock(
1636        self: &Arc<Self>,
1637        owner: Weak<dyn StoreOwner>,
1638        crypt: Arc<dyn Crypt>,
1639    ) -> Result<(), Error> {
1640        self.unlock_inner(owner, crypt, /*read_only=*/ false).await
1641    }
1642
1643    /// Unlocks a store so that it is ready to be read from.
1644    /// The store will generally behave like it is still locked: when flushed, the store will
1645    /// write out its mutations into the encrypted mutations file, rather than directly updating
1646    /// the layer files of the object store.
1647    /// Re-locking the store (which *must* be done with `Self::lock_read_only`) will not trigger a
1648    /// flush, although the store might still be flushed during other operations.
1649    /// This is not thread-safe.
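    ///
    /// A minimal sketch (hypothetical; assumes `store`, `crypt` and a symlink's `object_id` are
    /// in scope):
    ///
    /// ```ignore
    /// store.unlock_read_only(crypt.clone()).await?;
    /// let link = store.read_symlink(object_id).await?;
    /// store.lock_read_only();
    /// ```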
1650    pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
1651        self.unlock_inner(NO_OWNER, crypt, /*read_only=*/ true).await
1652    }
1653
1654    async fn unlock_inner(
1655        self: &Arc<Self>,
1656        owner: Weak<dyn StoreOwner>,
1657        crypt: Arc<dyn Crypt>,
1658        read_only: bool,
1659    ) -> Result<(), Error> {
1660        // Unless we are unlocking the store as read-only, the filesystem must not be read-only.
1661        assert!(read_only || !self.filesystem().options().read_only);
1662        match &*self.lock_state.lock() {
1663            LockState::Locked => {}
1664            LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
1665            LockState::Invalid | LockState::Deleted => bail!(FxfsError::Internal),
1666            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) => {
1667                bail!(FxfsError::AlreadyBound)
1668            }
1669            LockState::Unknown => panic!("Store was unlocked before replay"),
1670            LockState::Locking => panic!("Store is being locked"),
1671            LockState::Unlocking => panic!("Store is being unlocked"),
1672        }
1673        // We must lock flushing since that can modify store_info and the encrypted mutations file.
1674        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1675        let fs = self.filesystem();
1676        let guard = fs.lock_manager().write_lock(keys).await;
1677
1678        let store_info = self.load_store_info().await?;
1679
1680        self.tree
1681            .append_layers(
1682                self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
1683            )
1684            .await
1685            .context("Failed to read object tree layer file contents")?;
1686
1687        let wrapped_key =
1688            fxfs_crypto::WrappedKey::Fxfs(store_info.mutations_key.clone().unwrap().into());
1689        let unwrapped_key = crypt
1690            .unwrap_key(&wrapped_key, self.store_object_id)
1691            .await
1692            .context("Failed to unwrap mutations keys")?;
1693        // The ChaCha20 stream cipher we use supports up to 64 GiB.  By default we'll roll the key
1694        // after every 128 MiB.  Here we just need to pick a number that won't cause issues if it
1695        // wraps, so we use u32::MAX (the offset is u64).
1696        ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
1697        let mut mutations_cipher =
1698            StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);
1699
1700        let wrapped_key = fxfs_crypto::WrappedKey::Fxfs(
1701            store_info.object_id_key.clone().ok_or(FxfsError::Inconsistent)?.into(),
1702        );
1703        let object_id_cipher =
1704            Ff1::new(&crypt.unwrap_key(&wrapped_key, self.store_object_id).await?);
1705        {
1706            let mut last_object_id = self.last_object_id.lock();
1707            last_object_id.cipher = Some(object_id_cipher);
1708            last_object_id.id = store_info.last_object_id;
1709        }
1710
1711        // Apply the encrypted mutations.
1712        let mut mutations = {
1713            if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
1714                EncryptedMutations::default()
1715            } else {
1716                let parent_store = self.parent_store.as_ref().unwrap();
1717                let handle = ObjectStore::open_object(
1718                    &parent_store,
1719                    store_info.encrypted_mutations_object_id,
1720                    HandleOptions::default(),
1721                    None,
1722                )
1723                .await?;
1724                let mut cursor = std::io::Cursor::new(
1725                    handle
1726                        .contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
1727                        .await
1728                        .context(FxfsError::Inconsistent)?,
1729                );
1730                let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
1731                    .context("Failed to deserialize EncryptedMutations")?
1732                    .0;
1733                let len = cursor.get_ref().len() as u64;
1734                while cursor.position() < len {
1735                    mutations.extend(
1736                        &EncryptedMutations::deserialize_with_version(&mut cursor)
1737                            .context("Failed to deserialize EncryptedMutations")?
1738                            .0,
1739                    );
1740                }
1741                mutations
1742            }
1743        };
1744
1745        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
1746        let journaled = EncryptedMutations::from_replayed_mutations(
1747            self.store_object_id,
1748            fs.journal()
1749                .read_transactions_for_object(self.store_object_id)
1750                .await
1751                .context("Failed to read encrypted mutations from journal")?,
1752        );
1753        mutations.extend(&journaled);
1754
1755        let _ = std::mem::replace(&mut *self.lock_state.lock(), LockState::Unlocking);
1756        *self.store_info.lock() = Some(store_info);
1757
1758        // If we fail, clean up.
1759        let clean_up = scopeguard::guard((), |_| {
1760            *self.lock_state.lock() = LockState::Locked;
1761            *self.store_info.lock() = None;
1762            // Make sure we don't leave unencrypted data lying around in memory.
1763            self.tree.reset();
1764        });
1765
1766        let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;
1767
1768        let mut slice = &mut data[..];
1769        let mut last_offset = 0;
1770        for (offset, key) in mutations_key_roll {
1771            let split_offset = offset
1772                .checked_sub(last_offset)
1773                .ok_or(FxfsError::Inconsistent)
1774                .context("Invalid mutation key roll offset")?;
1775            last_offset = offset;
1776            ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
1777            let (old, new) = slice.split_at_mut(split_offset);
1778            mutations_cipher.decrypt(old);
1779            let unwrapped_key = crypt
1780                .unwrap_key(&fxfs_crypto::WrappedKey::Fxfs(key.into()), self.store_object_id)
1781                .await
1782                .context("Failed to unwrap mutations keys")?;
1783            mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
1784            slice = new;
1785        }
1786        mutations_cipher.decrypt(slice);
1787
1788        // Always roll the mutations key when we unlock, which guarantees we won't reuse a
1789        // previous key and nonce.
1790        self.roll_mutations_key(crypt.as_ref()).await?;
1791
1792        let mut cursor = std::io::Cursor::new(data);
1793        for (checkpoint, count) in transactions {
1794            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
1795            for _ in 0..count {
1796                let mutation =
1797                    Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
1798                        .context("failed to deserialize encrypted mutation")?;
1799                self.apply_mutation(mutation, &context, AssocObj::None)
1800                    .context("failed to apply encrypted mutation")?;
1801            }
1802        }
1803
1804        *self.lock_state.lock() = if read_only {
1805            LockState::UnlockedReadOnly(crypt)
1806        } else {
1807            LockState::Unlocked { owner, crypt }
1808        };
1809
1810        // To avoid unbounded memory growth, we should flush the encrypted mutations now. Otherwise
1811        // it's possible for more writes to be queued and for the store to be locked before we can
1812        // flush anything, and that can repeat.
1813        std::mem::drop(guard);
1814
1815        if !read_only && !self.filesystem().options().read_only {
1816            self.flush_with_reason(flush::Reason::Unlock).await?;
1817
1818            // Reap purged files within this store.
1819            let _ = self.filesystem().graveyard().initial_reap(&self).await?;
1820        }
1821
1822        // Return and cancel the clean up.
1823        Ok(ScopeGuard::into_inner(clean_up))
1824    }
1825
1826    pub fn is_locked(&self) -> bool {
1827        matches!(
1828            *self.lock_state.lock(),
1829            LockState::Locked | LockState::Locking | LockState::Unknown
1830        )
1831    }
1832
1833    /// NB: This is not the converse of `is_locked`, as there are lock states where neither is
1834    /// true.
1835    pub fn is_unlocked(&self) -> bool {
1836        matches!(
1837            *self.lock_state.lock(),
1838            LockState::Unlocked { .. } | LockState::UnlockedReadOnly { .. } | LockState::Unlocking
1839        )
1840    }
1841
1842    pub fn is_unknown(&self) -> bool {
1843        matches!(*self.lock_state.lock(), LockState::Unknown)
1844    }
1845
1846    pub fn is_encrypted(&self) -> bool {
1847        self.store_info.lock().as_ref().unwrap().mutations_key.is_some()
1848    }
1849
1850    // Locks a store.
1851    // This operation will take a flush lock on the store, in case any flushes are ongoing.  Any
1852    // ongoing store accesses might be interrupted by this.  See `Self::crypt`.
1853    // Whilst this can return an error, the store will be placed into an unusable but safe state
1854    // (i.e. no lingering unencrypted data) if an error is encountered.
1855    pub async fn lock(&self) -> Result<(), Error> {
1856        // We must lock flushing since it is not safe for that to be happening whilst we are locking
1857        // the store.
1858        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1859        let fs = self.filesystem();
1860        let _guard = fs.lock_manager().write_lock(keys).await;
1861
1862        {
1863            let mut lock_state = self.lock_state.lock();
1864            if let LockState::Unlocked { .. } = &*lock_state {
1865                *lock_state = LockState::Locking;
1866            } else {
1867                panic!("Unexpected lock state: {:?}", &*lock_state);
1868            }
1869        }
1870
1871        // Sync the journal now to ensure that any buffered mutations for this store make it out to
1872        // disk.  This is necessary to be able to unlock the store again.
1873        // We need to establish a barrier at this point (so that the journaled writes are observable
1874        // by any future attempts to unlock the store), hence the flush_device.
1875        let sync_result =
1876            self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;
1877
1878        *self.lock_state.lock() = if let Err(error) = &sync_result {
1879            error!(error:?; "Failed to sync journal; store will no longer be usable");
1880            LockState::Invalid
1881        } else {
1882            LockState::Locked
1883        };
1884        self.key_manager.clear();
1885        *self.store_info.lock() = None;
1886        self.tree.reset();
1887
1888        sync_result
1889    }
1890
1891    // Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`).  Data
1892    // is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
1893    // and will be replayed next time the store is unlocked.
1894    pub fn lock_read_only(&self) {
1895        *self.lock_state.lock() = LockState::Locked;
1896        *self.store_info.lock() = None;
1897        self.tree.reset();
1898    }
1899
1900    // Returns INVALID_OBJECT_ID if the object ID cipher needs to be created or rolled.
1901    pub(super) fn maybe_get_next_object_id(&self) -> u64 {
1902        let mut last_object_id = self.last_object_id.lock();
1903        if last_object_id.should_create_cipher() {
1904            INVALID_OBJECT_ID
1905        } else {
1906            last_object_id.get_next_object_id()
1907        }
1908    }
1909
1910    /// Returns a new object ID that can be used.  This will create an object ID cipher if needed.
1911    ///
1912    /// If the object ID key needs to be rolled, a new transaction will be created and committed.
1913    /// This transaction does not take the filesystem lock, hence `txn_guard`.
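    ///
    /// A minimal sketch (hypothetical; assumes an open transaction whose guard can be borrowed):
    ///
    /// ```ignore
    /// let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
    /// ```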
1914    pub async fn get_next_object_id(&self, txn_guard: &TxnGuard<'_>) -> Result<u64, Error> {
1915        let object_id = self.maybe_get_next_object_id();
1916        if object_id != INVALID_OBJECT_ID {
1917            return Ok(object_id);
1918        }
1919
1920        // Create a transaction (which has a lock) and then check again.
1921        let mut transaction = self
1922            .filesystem()
1923            .new_transaction(
1924                lock_keys![LockKey::object(
1925                    self.parent_store.as_ref().unwrap().store_object_id,
1926                    self.store_object_id,
1927                )],
1928                Options {
1929                    // We must skip journal checks because this transaction might be needed to
1930                    // compact.
1931                    skip_journal_checks: true,
1932                    borrow_metadata_space: true,
1933                    txn_guard: Some(txn_guard),
1934                    ..Default::default()
1935                },
1936            )
1937            .await?;
1938
1939        let next_id_hi = {
1940            let mut last_object_id = self.last_object_id.lock();
1941            if !last_object_id.should_create_cipher() {
1942                // We lost a race.
1943                return Ok(last_object_id.get_next_object_id());
1944            }
1945            // It shouldn't be possible for last_object_id to wrap within our lifetime, so if this
1946            // happens, it's most likely due to corruption.
1947            last_object_id.id.checked_add(1 << 32).ok_or(FxfsError::Inconsistent)?
1948                & OBJECT_ID_HI_MASK
1949        };
1950
1951        info!(store_id = self.store_object_id; "Rolling object ID key");
1952
1953        // Create a key.
1954        let (object_id_wrapped, object_id_unwrapped) =
1955            self.crypt().unwrap().create_key(self.store_object_id, KeyPurpose::Metadata).await?;
1956
1957        // Update StoreInfo.
1958        let buf = {
1959            let serialized_info = {
1960                let mut store_info = self.store_info.lock();
1961                let store_info = store_info.as_mut().unwrap();
1962                store_info.object_id_key = Some(object_id_wrapped);
1963                let mut serialized_info = Vec::new();
1964                store_info.serialize_with_version(&mut serialized_info)?;
1965                serialized_info
1966            };
1967            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
1968            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
1969            buf
1970        };
1971
1972        self.store_info_handle
1973            .get()
1974            .unwrap()
1975            .txn_write(&mut transaction, 0u64, buf.as_ref())
1976            .await?;
1977        transaction
1978            .commit_with_callback(|_| {
1979                let mut last_object_id = self.last_object_id.lock();
1980                last_object_id.id = next_id_hi;
1981                next_id_hi
1982                    | last_object_id.cipher.insert(Ff1::new(&object_id_unwrapped)).encrypt(0) as u64
1983            })
1984            .await
1985    }
1986
1987    /// Query the next object ID that will be used. Intended for use when checking filesystem
1988    /// consistency. Prefer [`Self::get_next_object_id()`] for general use.
1989    pub(crate) fn query_next_object_id(&self) -> u64 {
1990        let mut last_object_id = self.last_object_id.lock().clone();
1991        if last_object_id.should_create_cipher() {
1992            INVALID_OBJECT_ID
1993        } else {
1994            last_object_id.get_next_object_id()
1995        }
1996    }
1997
1998    fn allocator(&self) -> Arc<Allocator> {
1999        self.filesystem().allocator()
2000    }
2001
2002    // If |transaction| has an impending mutation for the underlying object, returns that.
2003    // Otherwise, looks up the object from the tree and returns a suitable mutation for it.  The
2004    // mutation is returned here rather than the item because the mutation includes the operation
2005    // which has significance: inserting an object implies it's the first of its kind, unlike
2006    // replacing an object.
2007    async fn txn_get_object_mutation(
2008        &self,
2009        transaction: &Transaction<'_>,
2010        object_id: u64,
2011    ) -> Result<ObjectStoreMutation, Error> {
2012        if let Some(mutation) =
2013            transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
2014        {
2015            Ok(mutation.clone())
2016        } else {
2017            Ok(ObjectStoreMutation {
2018                item: self
2019                    .tree
2020                    .find(&ObjectKey::object(object_id))
2021                    .await?
2022                    .ok_or(FxfsError::Inconsistent)
2023                    .context("Object id missing")?,
2024                op: Operation::ReplaceOrInsert,
2025            })
2026        }
2027    }
2028
2029    /// Like txn_get_object_mutation but with expanded visibility.
2030    /// Only available in migration code.
2031    #[cfg(feature = "migration")]
2032    pub async fn get_object_mutation(
2033        &self,
2034        transaction: &Transaction<'_>,
2035        object_id: u64,
2036    ) -> Result<ObjectStoreMutation, Error> {
2037        self.txn_get_object_mutation(transaction, object_id).await
2038    }
2039
2040    fn update_last_object_id(&self, mut object_id: u64) {
2041        let mut last_object_id = self.last_object_id.lock();
2042        // For encrypted stores, object_id will be encrypted here, so we must decrypt first.
2043        if let Some(cipher) = &last_object_id.cipher {
2044            // If the object ID cipher has been rolled, then it's possible we might see object IDs
2045            // that were generated using a different cipher so the decrypt here will return the
2046            // wrong value, but that won't matter because the hi part of the object ID should still
2047            // discriminate.
2048            object_id = object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
2049        }
2050        if object_id > last_object_id.id {
2051            last_object_id.id = object_id;
2052        }
2053    }
2054
2055    /// Adds the specified object to the graveyard.
2056    pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2057        let graveyard_id = self.graveyard_directory_object_id();
2058        assert_ne!(graveyard_id, INVALID_OBJECT_ID);
2059        transaction.add(
2060            self.store_object_id,
2061            Mutation::replace_or_insert_object(
2062                ObjectKey::graveyard_entry(graveyard_id, object_id),
2063                ObjectValue::Some,
2064            ),
2065        );
2066    }
2067
2068    /// Removes the specified object from the graveyard.  NB: Care should be taken when calling
2069    /// this because graveyard entries are used for purging deleted files *and* for trimming
2070    /// extents.  For example, consider the following sequence:
2071    ///
2072    ///     1. Add Trim graveyard entry.
2073    ///     2. Replace with Some graveyard entry (see above).
2074    ///     3. Remove graveyard entry.
2075    ///
2076    /// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
2077    /// actually be:
2078    ///
2079    ///     3. Replace with Trim graveyard entry.
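    ///
    /// A sketch of step #3 in the cancellation case (hypothetical; assumes `ObjectValue::Trim` is
    /// the trim marker value used in step #1):
    ///
    /// ```ignore
    /// transaction.add(
    ///     store.store_object_id(),
    ///     Mutation::replace_or_insert_object(
    ///         ObjectKey::graveyard_entry(store.graveyard_directory_object_id(), object_id),
    ///         ObjectValue::Trim,
    ///     ),
    /// );
    /// ```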
2080    pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2081        transaction.add(
2082            self.store_object_id,
2083            Mutation::replace_or_insert_object(
2084                ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
2085                ObjectValue::None,
2086            ),
2087        );
2088    }
2089
2090    /// Removes the specified attribute from the graveyard. Unlike object graveyard entries,
2091    /// attribute graveyard entries serve only one purpose (purging deleted attributes), so the
2092    /// caller does not need to be concerned about replacing the graveyard attribute entry
2093    /// with its prior state when cancelling it. See comment on `remove_from_graveyard()`.
2094    pub fn remove_attribute_from_graveyard(
2095        &self,
2096        transaction: &mut Transaction<'_>,
2097        object_id: u64,
2098        attribute_id: u64,
2099    ) {
2100        transaction.add(
2101            self.store_object_id,
2102            Mutation::replace_or_insert_object(
2103                ObjectKey::graveyard_attribute_entry(
2104                    self.graveyard_directory_object_id(),
2105                    object_id,
2106                    attribute_id,
2107                ),
2108                ObjectValue::None,
2109            ),
2110        );
2111    }
2112
2113    // Roll the mutations key.  The new key will be written for the next encrypted mutation.
2114    async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
2115        let (wrapped_key, unwrapped_key) =
2116            crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
2117
2118        // The mutations_cipher lock must be held for the duration so that mutations_cipher and
2119        // store_info are updated atomically.  Otherwise, write_mutation could find a new cipher but
2120        // end up writing the wrong wrapped key.
2121        let mut cipher = self.mutations_cipher.lock();
2122        *cipher = Some(StreamCipher::new(&unwrapped_key, 0));
2123        self.store_info.lock().as_mut().unwrap().mutations_key = Some(wrapped_key);
2124        // mutations_cipher_offset is updated by flush.
2125        Ok(())
2126    }
2127
2128    // When the symlink is unlocked, this function decrypts `link` and returns a bag of bytes
2129    // identical to the one passed as the target to `create_symlink`.
2130    // If the symlink is locked, this function hashes the encrypted `link` with Sha256 to get a
2131    // standard length, base64-encodes the hash, and returns that to the caller.
2132    pub async fn read_encrypted_symlink(
2133        &self,
2134        object_id: u64,
2135        link: Vec<u8>,
2136    ) -> Result<Vec<u8>, Error> {
2137        let mut link = link;
2138        let key = self
2139            .key_manager()
2140            .get_fscrypt_key(object_id, self.crypt().unwrap().as_ref(), async || {
2141                self.get_keys(object_id).await
2142            })
2143            .await?;
2144        if let Some(key) = key.into_cipher() {
2145            key.decrypt_symlink(object_id, &mut link)?;
2146            Ok(link)
2147        } else {
2148            // Locked symlinks are encoded using a hash_code of 0.
2149            let proxy_filename =
2150                fscrypt::proxy_filename::ProxyFilename::new_with_hash_code(0, &link);
2151            let proxy_filename_str: String = proxy_filename.into();
2152            Ok(proxy_filename_str.as_bytes().to_vec())
2153        }
2154    }
2155
2156    /// Returns the link of a symlink object.
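    ///
    /// A minimal sketch (hypothetical; `object_id` must refer to an existing symlink):
    ///
    /// ```ignore
    /// let target: Vec<u8> = store.read_symlink(object_id).await?;
    /// ```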
2157    pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
2158        match self.tree.find(&ObjectKey::object(object_id)).await? {
2159            None => bail!(FxfsError::NotFound),
2160            Some(Item {
2161                value: ObjectValue::Object { kind: ObjectKind::EncryptedSymlink { link, .. }, .. },
2162                ..
2163            }) => self.read_encrypted_symlink(object_id, link.to_vec()).await,
2164            Some(Item {
2165                value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
2166                ..
2167            }) => Ok(link.to_vec()),
2168            Some(item) => Err(anyhow!(FxfsError::Inconsistent)
2169                .context(format!("Unexpected item in lookup: {item:?}"))),
2170        }
2171    }
2172
2173    /// Retrieves the wrapped keys for the given object.  The keys *should* be known to exist and it
2174    /// will be considered an inconsistency if they don't.
2175    pub async fn get_keys(&self, object_id: u64) -> Result<EncryptionKeys, Error> {
2176        match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::Inconsistent)? {
2177            Item { value: ObjectValue::Keys(keys), .. } => Ok(keys),
2178            _ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
2179        }
2180    }
2181
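    /// Updates the attributes of object `object_id` from `node_attributes` and/or sets its change
    /// time.  A no-op if nothing would change.
    ///
    /// A minimal sketch (hypothetical; assumes an open transaction):
    ///
    /// ```ignore
    /// let attrs = fio::MutableNodeAttributes {
    ///     modification_time: Some(Timestamp::now().as_nanos()),
    ///     ..Default::default()
    /// };
    /// store.update_attributes(&mut transaction, object_id, Some(&attrs), None).await?;
    /// transaction.commit().await?;
    /// ```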
2182    pub async fn update_attributes<'a>(
2183        &self,
2184        transaction: &mut Transaction<'a>,
2185        object_id: u64,
2186        node_attributes: Option<&fio::MutableNodeAttributes>,
2187        change_time: Option<Timestamp>,
2188    ) -> Result<(), Error> {
2189        if change_time.is_none() {
2190            if let Some(attributes) = node_attributes {
2191                let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
2192                if *attributes == empty_attributes {
2193                    return Ok(());
2194                }
2195            } else {
2196                return Ok(());
2197            }
2198        }
2199        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
2200        if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
2201            if let Some(time) = change_time {
2202                attributes.change_time = time;
2203            }
2204            if let Some(node_attributes) = node_attributes {
2205                if let Some(time) = node_attributes.creation_time {
2206                    attributes.creation_time = Timestamp::from_nanos(time);
2207                }
2208                if let Some(time) = node_attributes.modification_time {
2209                    attributes.modification_time = Timestamp::from_nanos(time);
2210                }
2211                if let Some(time) = node_attributes.access_time {
2212                    attributes.access_time = Timestamp::from_nanos(time);
2213                }
2214                if node_attributes.mode.is_some()
2215                    || node_attributes.uid.is_some()
2216                    || node_attributes.gid.is_some()
2217                    || node_attributes.rdev.is_some()
2218                {
2219                    if let Some(a) = &mut attributes.posix_attributes {
2220                        if let Some(mode) = node_attributes.mode {
2221                            a.mode = mode;
2222                        }
2223                        if let Some(uid) = node_attributes.uid {
2224                            a.uid = uid;
2225                        }
2226                        if let Some(gid) = node_attributes.gid {
2227                            a.gid = gid;
2228                        }
2229                        if let Some(rdev) = node_attributes.rdev {
2230                            a.rdev = rdev;
2231                        }
2232                    } else {
2233                        attributes.posix_attributes = Some(PosixAttributes {
2234                            mode: node_attributes.mode.unwrap_or_default(),
2235                            uid: node_attributes.uid.unwrap_or_default(),
2236                            gid: node_attributes.gid.unwrap_or_default(),
2237                            rdev: node_attributes.rdev.unwrap_or_default(),
2238                        });
2239                    }
2240                }
2241            }
2242        } else {
2243            bail!(
2244                anyhow!(FxfsError::Inconsistent)
2245                    .context("ObjectStore.update_attributes: Expected object value")
2246            );
2247        };
2248        transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
2249        Ok(())
2250    }
2251
2252    // Updates and commits the changes to access time in ObjectProperties. The update matches
2253    // Linux's RELATIME. That is, access time is updated to the current time if access time is less
2254    // than or equal to the last modification or status change, or if it has been more than a day
2255    // since the last access.
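    //
    // A minimal usage sketch (hypothetical; assumes `props` holds the object's current
    // `ObjectProperties`, e.g. obtained from its handle):
    //
    //     store.update_access_time(object_id, &mut props).await?;
    //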
2256    pub async fn update_access_time(
2257        &self,
2258        object_id: u64,
2259        props: &mut ObjectProperties,
2260    ) -> Result<(), Error> {
2261        let access_time = props.access_time.as_nanos();
2262        let modification_time = props.modification_time.as_nanos();
2263        let change_time = props.change_time.as_nanos();
2264        let now = Timestamp::now();
2265        if access_time <= modification_time
2266            || access_time <= change_time
2267            || access_time
2268                < now.as_nanos()
2269                    - Timestamp::from(std::time::Duration::from_secs(24 * 60 * 60)).as_nanos()
2270        {
2271            let mut transaction = self
2272                .filesystem()
2273                .clone()
2274                .new_transaction(
2275                    lock_keys![LockKey::object(self.store_object_id, object_id,)],
2276                    Options { borrow_metadata_space: true, ..Default::default() },
2277                )
2278                .await?;
2279            self.update_attributes(
2280                &mut transaction,
2281                object_id,
2282                Some(&fio::MutableNodeAttributes {
2283                    access_time: Some(now.as_nanos()),
2284                    ..Default::default()
2285                }),
2286                None,
2287            )
2288            .await?;
2289            transaction.commit().await?;
2290            props.access_time = now;
2291        }
2292        Ok(())
2293    }
2294
2295    async fn write_store_info<'a>(
2296        &'a self,
2297        transaction: &mut Transaction<'a>,
2298        info: &StoreInfo,
2299    ) -> Result<(), Error> {
2300        let mut serialized_info = Vec::new();
2301        info.serialize_with_version(&mut serialized_info)?;
2302        let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
2303        buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
2304        self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
2305    }
2306
2307    pub fn mark_deleted(&self) {
2308        *self.lock_state.lock() = LockState::Deleted;
2309    }
2310}
2311
2312#[async_trait]
2313impl JournalingObject for ObjectStore {
2314    fn apply_mutation(
2315        &self,
2316        mutation: Mutation,
2317        context: &ApplyContext<'_, '_>,
2318        _assoc_obj: AssocObj<'_>,
2319    ) -> Result<(), Error> {
2320        match &*self.lock_state.lock() {
2321            LockState::Locked | LockState::Locking => {
2322                ensure!(
2323                    matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
2324                        || matches!(
2325                            mutation,
2326                            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_)
2327                                if context.mode.is_replay()
2328                        ),
2329                    anyhow!(FxfsError::Inconsistent)
2330                        .context(format!("Unexpected mutation for encrypted store: {mutation:?}"))
2331                );
2332            }
2333            LockState::Invalid
2334            | LockState::Unlocking
2335            | LockState::Unencrypted
2336            | LockState::Unlocked { .. }
2337            | LockState::UnlockedReadOnly(..)
2338            | LockState::Deleted => {}
2339            lock_state @ _ => panic!("Unexpected lock state: {lock_state:?}"),
2340        }
2341        match mutation {
2342            Mutation::ObjectStore(ObjectStoreMutation { mut item, op }) => {
2343                item.sequence = context.checkpoint.file_offset;
2344                match op {
2345                    Operation::Insert => {
2346                        // If we are inserting an object record for the first time, it signifies the
2347                        // birth of the object so we need to adjust the object count.
2348                        if matches!(item.value, ObjectValue::Object { .. }) {
2349                            {
2350                                let info = &mut self.store_info.lock();
2351                                let object_count = &mut info.as_mut().unwrap().object_count;
2352                                *object_count = object_count.saturating_add(1);
2353                            }
2354                            if context.mode.is_replay() {
2355                                self.update_last_object_id(item.key.object_id);
2356                            }
2357                        }
2358                        self.tree.insert(item)?;
2359                    }
2360                    Operation::ReplaceOrInsert => {
2361                        self.tree.replace_or_insert(item);
2362                    }
2363                    Operation::Merge => {
2364                        if item.is_tombstone() {
2365                            let info = &mut self.store_info.lock();
2366                            let object_count = &mut info.as_mut().unwrap().object_count;
2367                            *object_count = object_count.saturating_sub(1);
2368                        }
2369                        let lower_bound = item.key.key_for_merge_into();
2370                        self.tree.merge_into(item, &lower_bound);
2371                    }
2372                }
2373            }
2374            Mutation::BeginFlush => {
2375                ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
2376                self.tree.seal();
2377            }
2378            Mutation::EndFlush => ensure!(self.parent_store.is_some(), FxfsError::Inconsistent),
2379            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2380                // We will process these during Self::unlock.
2381                ensure!(
2382                    !matches!(&*self.lock_state.lock(), LockState::Unencrypted),
2383                    FxfsError::Inconsistent
2384                );
2385            }
2386            Mutation::CreateInternalDir(object_id) => {
2387                ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
2388                self.store_info.lock().as_mut().unwrap().internal_directory_object_id = object_id;
2389            }
2390            _ => bail!("unexpected mutation: {:?}", mutation),
2391        }
2392        self.counters.lock().mutations_applied += 1;
2393        Ok(())
2394    }
2395
2396    fn drop_mutation(&self, _mutation: Mutation, _transaction: &Transaction<'_>) {
2397        self.counters.lock().mutations_dropped += 1;
2398    }
2399
2400    /// Push all in-memory structures to the device. This is not necessary for sync since the
2401    /// journal will take care of it.  This is supposed to be called when there is either memory or
2402    /// space pressure (flushing the store will persist in-memory data and allow the journal file to
2403    /// be trimmed).
2404    ///
2405    /// Also returns the earliest version of a struct in the filesystem (when known).
2406    async fn flush(&self) -> Result<Version, Error> {
2407        self.flush_with_reason(flush::Reason::Journal).await
2408    }
2409
2410    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
2411        // Intentionally enumerating all variants to force a decision on any new variants. Encrypt
2412        // all mutations that could affect an encrypted object store's contents or the `StoreInfo` of
2413        // the encrypted object store. During `unlock()`, any mutations which haven't been encrypted
2414        // won't be replayed after reading `StoreInfo`.
2415        match mutation {
2416            // Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted, we
2417            // still choose to encrypt the mutation because it makes it easier to deal with replay.
2418            // When we replay mutations for an encrypted store, the only thing we keep in memory are
2419            // the encrypted mutations; we don't keep `StoreInfo` or changes to it in memory. So, by
2420            // encrypting the CreateInternalDir mutation here, it means we don't have to track both
2421            // encrypted mutations bound for the LSM tree and unencrypted mutations for `StoreInfo`
2422            // to use in `unlock()`. It'll just bundle CreateInternalDir mutations with the other
2423            // encrypted mutations and handle them all in sequence during `unlock()`.
2424            Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
2425                let mut cipher = self.mutations_cipher.lock();
2426                if let Some(cipher) = cipher.as_mut() {
2427                    // If this is the first time we've used this key, we must write the key out.
2428                    if cipher.offset() == 0 {
2429                        writer.write(Mutation::update_mutations_key(
2430                            self.store_info
2431                                .lock()
2432                                .as_ref()
2433                                .unwrap()
2434                                .mutations_key
2435                                .as_ref()
2436                                .unwrap()
2437                                .clone(),
2438                        ));
2439                    }
2440                    let mut buffer = Vec::new();
2441                    mutation.serialize_into(&mut buffer).unwrap();
2442                    cipher.encrypt(&mut buffer);
2443                    writer.write(Mutation::EncryptedObjectStore(buffer.into()));
2444                    return;
2445                }
2446            }
2447            // `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
2448            // encrypted object stores, but are either the encrypted mutation data itself or
2449            // metadata governing how the data will be encrypted. They should only be produced here.
2450            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2451                debug_assert!(false, "Only this method should generate encrypted mutations");
2452            }
2453            // `BeginFlush` and `EndFlush` are not needed during `unlock()` but are needed during
2454            // the initial journal replay, so they should not be encrypted.  `Allocator`, `DeleteVolume`,
2455            // and `UpdateBorrowed` mutations are never associated with an encrypted store, as we do
2456            // not encrypt the allocator or the root/root-parent stores, so we can avoid the locking.
2457            Mutation::Allocator(_)
2458            | Mutation::BeginFlush
2459            | Mutation::EndFlush
2460            | Mutation::DeleteVolume
2461            | Mutation::UpdateBorrowed(_) => {}
2462        }
2463        writer.write(mutation.clone());
2464    }
2465}
2466
2467impl HandleOwner for ObjectStore {}
2468
2469impl AsRef<ObjectStore> for ObjectStore {
2470    fn as_ref(&self) -> &ObjectStore {
2471        self
2472    }
2473}
2474
2475fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
2476    // This is similar to reserved_space_from_journal_usage.  It needs to be a worst-case estimate
2477    // of the amount of metadata space that might need to be reserved to allow the encrypted
2478    // mutations to be written to layer files.  It needs to be >= what
2479    // reservation_amount_from_layer_size will return once the data has been written to layer files
2480    // and <= what reserved_space_from_journal_usage would use.  We can't just use
2481    // reserved_space_from_journal_usage because the encrypted mutations file includes some extra
2482    // data (it includes the checkpoints) that isn't written in the same way to the journal.
2483    size * 3
2484}
2485
2486impl AssociatedObject for ObjectStore {}
2487
2488/// Argument to the trim_some method.
2489#[derive(Debug)]
2490pub enum TrimMode {
2491    /// Trim extents beyond the current size.
2492    UseSize,
2493
2494    /// Trim extents beyond the supplied offset.
2495    FromOffset(u64),
2496
2497    /// Remove the object (or attribute) from the store once it is fully trimmed.
2498    Tombstone(TombstoneMode),
2499}
2500
2501/// Sets the mode for tombstoning (either at the object or attribute level).
2502#[derive(Debug)]
2503pub enum TombstoneMode {
2504    Object,
2505    Attribute,
2506}
2507
2508/// Result of the trim_some method.
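///
/// A sketch of how a caller typically interprets a result from `ObjectStore::trim_some`
/// (hypothetical; `next_attribute` drives a loop over the object's attributes):
///
/// ```ignore
/// match result {
///     // The transaction grew too large; commit it and call trim_some again for the same
///     // attribute with a fresh transaction.
///     TrimResult::Incomplete => {}
///     // This attribute is fully trimmed; `id` names the object's next attribute, if any.
///     TrimResult::Done(id) => next_attribute = id,
/// }
/// ```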
2509#[derive(Debug)]
2510pub enum TrimResult {
2511    /// We reached the limit of the transaction and more extents might follow.
2512    Incomplete,
2513
2514    /// We finished this attribute.  Returns the ID of the next attribute for the same object if
2515    /// there is one.
2516    Done(Option<u64>),
2517}
2518
2519/// Loads store info.
2520pub async fn load_store_info(
2521    parent: &Arc<ObjectStore>,
2522    store_object_id: u64,
2523) -> Result<StoreInfo, Error> {
2524    let handle =
2525        ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?;
2526
2527    Ok(if handle.get_size() > 0 {
2528        let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
2529        let mut cursor = std::io::Cursor::new(serialized_info);
2530        let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
2531            .context("Failed to deserialize StoreInfo")?;
2532        store_info
2533    } else {
2534        // The store_info will be absent for a newly created and empty object store.
2535        StoreInfo::default()
2536    })
2537}
2538
2539#[cfg(test)]
2540mod tests {
2541    use super::{
2542        DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, FsverityMetadata, HandleOptions,
2543        LastObjectId, LockKey, MAX_STORE_INFO_SERIALIZED_SIZE, Mutation, NO_OWNER,
2544        NewChildStoreOptions, OBJECT_ID_HI_MASK, ObjectStore, RootDigest, StoreInfo, StoreOptions,
2545        StoreOwner,
2546    };
2547    use crate::errors::FxfsError;
2548    use crate::filesystem::{FxFilesystem, JournalingObject, OpenFxFilesystem, SyncOptions};
2549    use crate::fsck::fsck;
2550    use crate::lsm_tree::Query;
2551    use crate::lsm_tree::types::{ItemRef, LayerIterator};
2552    use crate::object_handle::{
2553        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
2554    };
2555    use crate::object_store::directory::Directory;
2556    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue};
2557    use crate::object_store::transaction::{Options, lock_keys};
2558    use crate::object_store::volume::root_volume;
2559    use crate::serialized_types::VersionedLatest;
2560    use crate::testing;
2561    use assert_matches::assert_matches;
2562    use async_trait::async_trait;
2563    use fuchsia_async as fasync;
2564    use fuchsia_sync::Mutex;
2565    use futures::join;
2566    use fxfs_crypto::ff1::Ff1;
2567    use fxfs_crypto::{
2568        Crypt, FXFS_KEY_SIZE, FXFS_WRAPPED_KEY_SIZE, FxfsKey, UnwrappedKey, WrappedKeyBytes,
2569    };
2570    use fxfs_insecure_crypto::new_insecure_crypt;
2571
2572    use std::sync::Arc;
2573    use std::time::Duration;
2574    use storage_device::DeviceHolder;
2575    use storage_device::fake_device::FakeDevice;
2576
2577    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2578
2579    async fn test_filesystem() -> OpenFxFilesystem {
2580        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2581        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2582    }
2583
2584    #[fuchsia::test]
2585    async fn test_item_sequences() {
2586        let fs = test_filesystem().await;
2587        let object1;
2588        let object2;
2589        let object3;
2590        let mut transaction = fs
2591            .clone()
2592            .new_transaction(lock_keys![], Options::default())
2593            .await
2594            .expect("new_transaction failed");
2595        let store = fs.root_store();
2596        object1 = Arc::new(
2597            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2598                .await
2599                .expect("create_object failed"),
2600        );
2601        transaction.commit().await.expect("commit failed");
2602        let mut transaction = fs
2603            .clone()
2604            .new_transaction(lock_keys![], Options::default())
2605            .await
2606            .expect("new_transaction failed");
2607        object2 = Arc::new(
2608            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2609                .await
2610                .expect("create_object failed"),
2611        );
2612        transaction.commit().await.expect("commit failed");
2613
2614        fs.sync(SyncOptions::default()).await.expect("sync failed");
2615
2616        let mut transaction = fs
2617            .clone()
2618            .new_transaction(lock_keys![], Options::default())
2619            .await
2620            .expect("new_transaction failed");
2621        object3 = Arc::new(
2622            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2623                .await
2624                .expect("create_object failed"),
2625        );
2626        transaction.commit().await.expect("commit failed");
2627
2628        let layer_set = store.tree.layer_set();
2629        let mut merger = layer_set.merger();
2630        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
2631        let mut sequences = [0u64; 3];
2632        while let Some(ItemRef { key: ObjectKey { object_id, .. }, sequence, .. }) = iter.get() {
2633            if *object_id == object1.object_id() {
2634                sequences[0] = sequence;
2635            } else if *object_id == object2.object_id() {
2636                sequences[1] = sequence;
2637            } else if *object_id == object3.object_id() {
2638                sequences[2] = sequence;
2639            }
2640            iter.advance().await.expect("advance failed");
2641        }
2642
2643        assert!(sequences[0] <= sequences[1], "sequences: {:?}", sequences);
2644        // The last item came after a sync, so should be strictly greater.
2645        assert!(sequences[1] < sequences[2], "sequences: {:?}", sequences);
2646        fs.close().await.expect("Close failed");
2647    }
2648
2649    #[fuchsia::test]
2650    async fn test_verified_file_with_verified_attribute() {
2651        let fs: OpenFxFilesystem = test_filesystem().await;
2652        let mut transaction = fs
2653            .clone()
2654            .new_transaction(lock_keys![], Options::default())
2655            .await
2656            .expect("new_transaction failed");
2657        let store = fs.root_store();
2658        let object = Arc::new(
2659            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2660                .await
2661                .expect("create_object failed"),
2662        );
2663
2664        transaction.add(
2665            store.store_object_id(),
2666            Mutation::replace_or_insert_object(
2667                ObjectKey::attribute(
2668                    object.object_id(),
2669                    DEFAULT_DATA_ATTRIBUTE_ID,
2670                    AttributeKey::Attribute,
2671                ),
2672                ObjectValue::verified_attribute(
2673                    0,
2674                    FsverityMetadata::Internal(RootDigest::Sha256([0; 32]), vec![]),
2675                ),
2676            ),
2677        );
2678
2679        transaction.add(
2680            store.store_object_id(),
2681            Mutation::replace_or_insert_object(
2682                ObjectKey::attribute(
2683                    object.object_id(),
2684                    FSVERITY_MERKLE_ATTRIBUTE_ID,
2685                    AttributeKey::Attribute,
2686                ),
2687                ObjectValue::attribute(0, false),
2688            ),
2689        );
2690
2691        transaction.commit().await.unwrap();
2692
2693        let handle =
2694            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2695                .await
2696                .expect("open_object failed");
2697
2698        assert!(handle.is_verified_file());
2699
2700        fs.close().await.expect("Close failed");
2701    }
2702
2703    #[fuchsia::test]
2704    async fn test_verified_file_without_verified_attribute() {
2705        let fs: OpenFxFilesystem = test_filesystem().await;
2706        let mut transaction = fs
2707            .clone()
2708            .new_transaction(lock_keys![], Options::default())
2709            .await
2710            .expect("new_transaction failed");
2711        let store = fs.root_store();
2712        let object = Arc::new(
2713            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2714                .await
2715                .expect("create_object failed"),
2716        );
2717
2718        transaction.commit().await.unwrap();
2719
2720        let handle =
2721            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2722                .await
2723                .expect("open_object failed");
2724
2725        assert!(!handle.is_verified_file());
2726
2727        fs.close().await.expect("Close failed");
2728    }
2729
2730    #[fuchsia::test]
2731    async fn test_create_and_open_store() {
2732        let fs = test_filesystem().await;
2733        let store_id = {
2734            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2735            root_volume
2736                .new_volume(
2737                    "test",
2738                    NewChildStoreOptions {
2739                        options: StoreOptions {
2740                            owner: NO_OWNER,
2741                            crypt: Some(Arc::new(new_insecure_crypt())),
2742                        },
2743                        ..Default::default()
2744                    },
2745                )
2746                .await
2747                .expect("new_volume failed")
2748                .store_object_id()
2749        };
2750
2751        fs.close().await.expect("close failed");
2752        let device = fs.take_device().await;
2753        device.reopen(false);
2754        let fs = FxFilesystem::open(device).await.expect("open failed");
2755
2756        {
2757            let store = fs.object_manager().store(store_id).expect("store not found");
2758            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
2759        }
2760        fs.close().await.expect("Close failed");
2761    }
2762
2763    #[fuchsia::test]
2764    async fn test_create_and_open_internal_dir() {
2765        let fs = test_filesystem().await;
2766        let dir_id;
2767        let store_id;
2768        {
2769            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2770            let store = root_volume
2771                .new_volume(
2772                    "test",
2773                    NewChildStoreOptions {
2774                        options: StoreOptions {
2775                            owner: NO_OWNER,
2776                            crypt: Some(Arc::new(new_insecure_crypt())),
2777                        },
2778                        ..Default::default()
2779                    },
2780                )
2781                .await
2782                .expect("new_volume failed");
2783            dir_id =
2784                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
2785            store_id = store.store_object_id();
2786        }
2787
2788        fs.close().await.expect("close failed");
2789        let device = fs.take_device().await;
2790        device.reopen(false);
2791        let fs = FxFilesystem::open(device).await.expect("open failed");
2792
2793        {
2794            let store = fs.object_manager().store(store_id).expect("store not found");
2795            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
2796            assert_eq!(
2797                dir_id,
2798                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
2799            );
2800            let obj = store
2801                .tree()
2802                .find(&ObjectKey::object(dir_id))
2803                .await
2804                .expect("Searching tree for dir")
2805                .unwrap();
2806            assert_matches!(
2807                obj.value,
2808                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
2809            );
2810        }
2811        fs.close().await.expect("Close failed");
2812    }
2813
2814    #[fuchsia::test]
2815    async fn test_create_and_open_internal_dir_unencrypted() {
2816        let fs = test_filesystem().await;
2817        let dir_id;
2818        let store_id;
2819        {
2820            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2821            let store = root_volume
2822                .new_volume("test", NewChildStoreOptions::default())
2823                .await
2824                .expect("new_volume failed");
2825            dir_id =
2826                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
2827            store_id = store.store_object_id();
2828        }
2829
2830        fs.close().await.expect("close failed");
2831        let device = fs.take_device().await;
2832        device.reopen(false);
2833        let fs = FxFilesystem::open(device).await.expect("open failed");
2834
2835        {
2836            let store = fs.object_manager().store(store_id).expect("store not found");
2837            assert_eq!(
2838                dir_id,
2839                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
2840            );
2841            let obj = store
2842                .tree()
2843                .find(&ObjectKey::object(dir_id))
2844                .await
2845                .expect("Searching tree for dir")
2846                .unwrap();
2847            assert_matches!(
2848                obj.value,
2849                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
2850            );
2851        }
2852        fs.close().await.expect("Close failed");
2853    }
2854
2855    #[fuchsia::test(threads = 10)]
2856    async fn test_old_layers_are_purged() {
2857        let fs = test_filesystem().await;
2858
2859        let store = fs.root_store();
2860        let mut transaction = fs
2861            .clone()
2862            .new_transaction(lock_keys![], Options::default())
2863            .await
2864            .expect("new_transaction failed");
2865        let object = Arc::new(
2866            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2867                .await
2868                .expect("create_object failed"),
2869        );
2870        transaction.commit().await.expect("commit failed");
2871
2872        store.flush().await.expect("flush failed");
2873
2874        let mut buf = object.allocate_buffer(5).await;
2875        buf.as_mut_slice().copy_from_slice(b"hello");
2876        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2877
        // Taking (and holding) this layer set should cause the flush below to stall.
2879        let layer_set = store.tree().layer_set();
2880
2881        let done = Mutex::new(false);
2882        let mut object_id = 0;
2883
2884        join!(
2885            async {
2886                store.flush().await.expect("flush failed");
2887                assert!(*done.lock());
2888            },
2889            async {
                // We can't detect whether the flush has actually stalled (that's a halting
                // problem), so all we can do is sleep.
2891                fasync::Timer::new(Duration::from_secs(1)).await;
2892                *done.lock() = true;
2893                object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
2894                std::mem::drop(layer_set);
2895            }
2896        );
2897
2898        if let Err(e) = ObjectStore::open_object(
2899            &store.parent_store.as_ref().unwrap(),
2900            object_id,
2901            HandleOptions::default(),
2902            store.crypt(),
2903        )
2904        .await
2905        {
2906            assert!(FxfsError::NotFound.matches(&e));
2907        } else {
2908            panic!("open_object succeeded");
2909        }
2910    }
2911
2912    #[fuchsia::test]
2913    async fn test_tombstone_deletes_data() {
2914        let fs = test_filesystem().await;
2915        let root_store = fs.root_store();
2916        let child_id = {
2917            let mut transaction = fs
2918                .clone()
2919                .new_transaction(lock_keys![], Options::default())
2920                .await
2921                .expect("new_transaction failed");
2922            let child = ObjectStore::create_object(
2923                &root_store,
2924                &mut transaction,
2925                HandleOptions::default(),
2926                None,
2927            )
2928            .await
2929            .expect("create_object failed");
2930            transaction.commit().await.expect("commit failed");
2931
2932            // Allocate an extent in the file.
2933            let mut buffer = child.allocate_buffer(8192).await;
2934            buffer.as_mut_slice().fill(0xaa);
2935            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
2936
2937            child.object_id()
2938        };
2939
2940        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
2941
2942        // Let fsck check allocations.
2943        fsck(fs.clone()).await.expect("fsck failed");
2944    }
2945
2946    #[fuchsia::test]
2947    async fn test_tombstone_purges_keys() {
2948        let fs = test_filesystem().await;
2949        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2950        let store = root_volume
2951            .new_volume(
2952                "test",
2953                NewChildStoreOptions {
2954                    options: StoreOptions {
2955                        crypt: Some(Arc::new(new_insecure_crypt())),
2956                        ..StoreOptions::default()
2957                    },
2958                    ..NewChildStoreOptions::default()
2959                },
2960            )
2961            .await
2962            .expect("new_volume failed");
2963        let mut transaction = fs
2964            .clone()
2965            .new_transaction(lock_keys![], Options::default())
2966            .await
2967            .expect("new_transaction failed");
2968        let child =
2969            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2970                .await
2971                .expect("create_object failed");
2972        transaction.commit().await.expect("commit failed");
2973        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_some());
2974        store
2975            .tombstone_object(child.object_id(), Options::default())
2976            .await
2977            .expect("tombstone_object failed");
2978        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_none());
2979        fs.close().await.expect("close failed");
2980    }
2981
2982    #[fuchsia::test]
2983    async fn test_major_compaction_discards_unnecessary_records() {
2984        let fs = test_filesystem().await;
2985        let root_store = fs.root_store();
2986        let child_id = {
2987            let mut transaction = fs
2988                .clone()
2989                .new_transaction(lock_keys![], Options::default())
2990                .await
2991                .expect("new_transaction failed");
2992            let child = ObjectStore::create_object(
2993                &root_store,
2994                &mut transaction,
2995                HandleOptions::default(),
2996                None,
2997            )
2998            .await
2999            .expect("create_object failed");
3000            transaction.commit().await.expect("commit failed");
3001
3002            // Allocate an extent in the file.
3003            let mut buffer = child.allocate_buffer(8192).await;
3004            buffer.as_mut_slice().fill(0xaa);
3005            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
3006
3007            child.object_id()
3008        };
3009
3010        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
3011        {
3012            let layers = root_store.tree.layer_set();
3013            let mut merger = layers.merger();
3014            let iter = merger
3015                .query(Query::FullRange(&ObjectKey::object(child_id)))
3016                .await
3017                .expect("seek failed");
3018            // Find at least one object still in the tree.
3019            match iter.get() {
3020                Some(ItemRef { key: ObjectKey { object_id, .. }, .. })
3021                    if *object_id == child_id => {}
3022                _ => panic!("Objects should still be in the tree."),
3023            }
3024        }
3025        root_store.flush().await.expect("flush failed");
3026
3027        // There should be no records for the object.
3028        let layers = root_store.tree.layer_set();
3029        let mut merger = layers.merger();
3030        let iter = merger
3031            .query(Query::FullRange(&ObjectKey::object(child_id)))
3032            .await
3033            .expect("seek failed");
3034        match iter.get() {
3035            None => {}
3036            Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
3037                assert_ne!(*object_id, child_id)
3038            }
3039        }
3040    }
3041
3042    #[fuchsia::test]
3043    async fn test_overlapping_extents_in_different_layers() {
3044        let fs = test_filesystem().await;
3045        let store = fs.root_store();
3046
3047        let mut transaction = fs
3048            .clone()
3049            .new_transaction(
3050                lock_keys![LockKey::object(
3051                    store.store_object_id(),
3052                    store.root_directory_object_id()
3053                )],
3054                Options::default(),
3055            )
3056            .await
3057            .expect("new_transaction failed");
3058        let root_directory =
3059            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3060        let object = root_directory
3061            .create_child_file(&mut transaction, "test")
3062            .await
3063            .expect("create_child_file failed");
3064        transaction.commit().await.expect("commit failed");
3065
3066        let buf = object.allocate_buffer(16384).await;
3067        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3068
3069        store.flush().await.expect("flush failed");
3070
3071        object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");
3072
3073        // At this point, we should have an extent for 0..16384 in a layer that has been flushed,
3074        // and an extent for 0..4096 that partially overwrites it.  Writing to 0..16384 should
3075        // overwrite both of those extents.
3076        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3077
3078        fsck(fs.clone()).await.expect("fsck failed");
3079    }
3080
3081    #[fuchsia::test(threads = 10)]
3082    async fn test_encrypted_mutations() {
3083        async fn one_iteration(
3084            fs: OpenFxFilesystem,
3085            crypt: Arc<dyn Crypt>,
3086            iteration: u64,
3087        ) -> OpenFxFilesystem {
3088            async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
3089                fs.close().await.expect("Close failed");
3090                let device = fs.take_device().await;
3091                device.reopen(false);
3092                FxFilesystem::open(device).await.expect("FS open failed")
3093            }
3094
3095            let fs = reopen(fs).await;
3096
3097            let (store_object_id, object_id) = {
3098                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3099                let store = root_volume
3100                    .volume(
3101                        "test",
3102                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3103                    )
3104                    .await
3105                    .expect("volume failed");
3106
3107                let mut transaction = fs
3108                    .clone()
3109                    .new_transaction(
3110                        lock_keys![LockKey::object(
3111                            store.store_object_id(),
3112                            store.root_directory_object_id(),
3113                        )],
3114                        Options::default(),
3115                    )
3116                    .await
3117                    .expect("new_transaction failed");
3118                let root_directory = Directory::open(&store, store.root_directory_object_id())
3119                    .await
3120                    .expect("open failed");
3121                let object = root_directory
3122                    .create_child_file(&mut transaction, &format!("test {}", iteration))
3123                    .await
3124                    .expect("create_child_file failed");
3125                transaction.commit().await.expect("commit failed");
3126
3127                let mut buf = object.allocate_buffer(1000).await;
3128                for i in 0..buf.len() {
3129                    buf.as_mut_slice()[i] = i as u8;
3130                }
3131                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3132
3133                (store.store_object_id(), object.object_id())
3134            };
3135
3136            let fs = reopen(fs).await;
3137
3138            let check_object = |fs: Arc<FxFilesystem>| {
3139                let crypt = crypt.clone();
3140                async move {
3141                    let root_volume = root_volume(fs).await.expect("root_volume failed");
3142                    let volume = root_volume
3143                        .volume(
3144                            "test",
3145                            StoreOptions { crypt: Some(crypt), ..StoreOptions::default() },
3146                        )
3147                        .await
3148                        .expect("volume failed");
3149
3150                    let object = ObjectStore::open_object(
3151                        &volume,
3152                        object_id,
3153                        HandleOptions::default(),
3154                        None,
3155                    )
3156                    .await
3157                    .expect("open_object failed");
3158                    let mut buf = object.allocate_buffer(1000).await;
3159                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
3160                    for i in 0..buf.len() {
3161                        assert_eq!(buf.as_slice()[i], i as u8);
3162                    }
3163                }
3164            };
3165
3166            check_object(fs.clone()).await;
3167
3168            let fs = reopen(fs).await;
3169
3170            // At this point the "test" volume is locked.  Before checking the object, flush the
3171            // filesystem.  This should leave a file with encrypted mutations.
3172            fs.object_manager().flush().await.expect("flush failed");
3173
3174            assert_ne!(
3175                fs.object_manager()
3176                    .store(store_object_id)
3177                    .unwrap()
3178                    .load_store_info()
3179                    .await
3180                    .expect("load_store_info failed")
3181                    .encrypted_mutations_object_id,
3182                INVALID_OBJECT_ID
3183            );
3184
3185            check_object(fs.clone()).await;
3186
3187            // Checking the object should have triggered a flush and so now there should be no
3188            // encrypted mutations object.
3189            assert_eq!(
3190                fs.object_manager()
3191                    .store(store_object_id)
3192                    .unwrap()
3193                    .load_store_info()
3194                    .await
3195                    .expect("load_store_info failed")
3196                    .encrypted_mutations_object_id,
3197                INVALID_OBJECT_ID
3198            );
3199
3200            let fs = reopen(fs).await;
3201
3202            fsck(fs.clone()).await.expect("fsck failed");
3203
3204            let fs = reopen(fs).await;
3205
3206            check_object(fs.clone()).await;
3207
3208            fs
3209        }
3210
3211        let mut fs = test_filesystem().await;
3212        let crypt = Arc::new(new_insecure_crypt());
3213
3214        {
3215            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3216            let _store = root_volume
3217                .new_volume(
3218                    "test",
3219                    NewChildStoreOptions {
3220                        options: StoreOptions {
3221                            crypt: Some(crypt.clone()),
3222                            ..StoreOptions::default()
3223                        },
3224                        ..Default::default()
3225                    },
3226                )
3227                .await
3228                .expect("new_volume failed");
3229        }
3230
        // Run a few iterations so that we test with different stream cipher offsets.
3232        for i in 0..5 {
3233            fs = one_iteration(fs, crypt.clone(), i).await;
3234        }
3235    }
3236
3237    #[fuchsia::test(threads = 10)]
3238    async fn test_object_id_cipher_roll() {
3239        let fs = test_filesystem().await;
3240        let crypt = Arc::new(new_insecure_crypt());
3241
3242        {
3243            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3244            let store = root_volume
3245                .new_volume(
3246                    "test",
3247                    NewChildStoreOptions {
3248                        options: StoreOptions {
3249                            crypt: Some(crypt.clone()),
3250                            ..StoreOptions::default()
3251                        },
3252                        ..Default::default()
3253                    },
3254                )
3255                .await
3256                .expect("new_volume failed");
3257
3258            let store_info = store.store_info().unwrap();
3259
3260            // Hack the last object ID to force a roll of the object ID cipher.
3261            {
3262                let mut last_object_id = store.last_object_id.lock();
3263                assert_eq!(last_object_id.id & OBJECT_ID_HI_MASK, 0);
3264                last_object_id.id |= 0xffffffff;
3265            }
3266
3267            let mut transaction = fs
3268                .clone()
3269                .new_transaction(
3270                    lock_keys![LockKey::object(
3271                        store.store_object_id(),
3272                        store.root_directory_object_id()
3273                    )],
3274                    Options::default(),
3275                )
3276                .await
3277                .expect("new_transaction failed");
3278            let root_directory = Directory::open(&store, store.root_directory_object_id())
3279                .await
3280                .expect("open failed");
3281            let object = root_directory
3282                .create_child_file(&mut transaction, "test")
3283                .await
3284                .expect("create_child_file failed");
3285            transaction.commit().await.expect("commit failed");
3286
3287            assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 1u64 << 32);
3288
3289            // Check that the key has been changed.
3290            assert_ne!(store.store_info().unwrap().object_id_key, store_info.object_id_key);
3291
3292            assert_eq!(store.last_object_id.lock().id, 1u64 << 32);
3293        };
3294
3295        fs.close().await.expect("Close failed");
3296        let device = fs.take_device().await;
3297        device.reopen(false);
3298        let fs = FxFilesystem::open(device).await.expect("open failed");
3299        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3300        let store = root_volume
3301            .volume("test", StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() })
3302            .await
3303            .expect("volume failed");
3304
3305        assert_eq!(store.last_object_id.lock().id, 1u64 << 32);
3306    }
3307
3308    #[fuchsia::test]
3309    async fn test_object_id_no_roll_for_unencrypted_store() {
3310        let fs = test_filesystem().await;
3311
3312        {
3313            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3314            let store = root_volume
3315                .new_volume("test", NewChildStoreOptions::default())
3316                .await
3317                .expect("new_volume failed");
3318
3319            // Hack the last object ID.
3320            {
3321                let mut last_object_id = store.last_object_id.lock();
3322                assert_eq!(last_object_id.id & OBJECT_ID_HI_MASK, 0);
3323                last_object_id.id |= 0xffffffff;
3324            }
3325
3326            let mut transaction = fs
3327                .clone()
3328                .new_transaction(
3329                    lock_keys![LockKey::object(
3330                        store.store_object_id(),
3331                        store.root_directory_object_id()
3332                    )],
3333                    Options::default(),
3334                )
3335                .await
3336                .expect("new_transaction failed");
3337            let root_directory = Directory::open(&store, store.root_directory_object_id())
3338                .await
3339                .expect("open failed");
3340            let object = root_directory
3341                .create_child_file(&mut transaction, "test")
3342                .await
3343                .expect("create_child_file failed");
3344            transaction.commit().await.expect("commit failed");
3345
3346            assert_eq!(object.object_id(), 0x1_0000_0000);
3347
3348            // Check that there is still no key.
3349            assert!(store.store_info().unwrap().object_id_key.is_none());
3350
3351            assert_eq!(store.last_object_id.lock().id, 0x1_0000_0000);
3352        };
3353
3354        fs.close().await.expect("Close failed");
3355        let device = fs.take_device().await;
3356        device.reopen(false);
3357        let fs = FxFilesystem::open(device).await.expect("open failed");
3358        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3359        let store =
3360            root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
3361
3362        assert_eq!(store.last_object_id.lock().id, 0x1_0000_0000);
3363    }
3364
3365    #[fuchsia::test]
3366    fn test_object_id_is_not_invalid_object_id() {
3367        let key = UnwrappedKey::new(vec![0; FXFS_KEY_SIZE]);
3368        // 1106634048 results in INVALID_OBJECT_ID with this key.
3369        let mut last_object_id = LastObjectId { id: 1106634047, cipher: Some(Ff1::new(&key)) };
3370        assert_ne!(last_object_id.get_next_object_id(), INVALID_OBJECT_ID);
3371        assert_ne!(last_object_id.get_next_object_id(), INVALID_OBJECT_ID);
3372    }
3373
3374    #[fuchsia::test]
3375    async fn test_last_object_id_is_correct_after_unlock() {
3376        let fs = test_filesystem().await;
3377        let crypt = Arc::new(new_insecure_crypt());
3378
3379        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3380        let store = root_volume
3381            .new_volume(
3382                "test",
3383                NewChildStoreOptions {
3384                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3385                    ..Default::default()
3386                },
3387            )
3388            .await
3389            .expect("new_volume failed");
3390
3391        let mut transaction = fs
3392            .clone()
3393            .new_transaction(
3394                lock_keys![LockKey::object(
3395                    store.store_object_id(),
3396                    store.root_directory_object_id()
3397                )],
3398                Options::default(),
3399            )
3400            .await
3401            .expect("new_transaction failed");
3402        let root_directory =
3403            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3404        root_directory
3405            .create_child_file(&mut transaction, "test")
3406            .await
3407            .expect("create_child_file failed");
3408        transaction.commit().await.expect("commit failed");
3409
3410        // Compact so that StoreInfo is written.
3411        fs.journal().compact().await.unwrap();
3412
3413        let last_object_id = store.last_object_id.lock().id;
3414
3415        store.lock().await.unwrap();
3416        store.unlock(NO_OWNER, crypt.clone()).await.unwrap();
3417
3418        assert_eq!(store.last_object_id.lock().id, last_object_id);
3419    }
3420
3421    #[fuchsia::test(threads = 20)]
3422    async fn test_race_when_rolling_last_object_id_cipher() {
        // NOTE: This test exercises a race, so if the race regresses, failures might be flaky.
3424
3425        const NUM_THREADS: usize = 20;
3426
3427        let fs = test_filesystem().await;
3428        let crypt = Arc::new(new_insecure_crypt());
3429
3430        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3431        let store = root_volume
3432            .new_volume(
3433                "test",
3434                NewChildStoreOptions {
3435                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3436                    ..Default::default()
3437                },
3438            )
3439            .await
3440            .expect("new_volume failed");
3441
3442        let store_id = store.store_object_id();
3443        let root_dir_id = store.root_directory_object_id();
3444
3445        let root_directory =
3446            Arc::new(Directory::open(&store, root_dir_id).await.expect("open failed"));
3447
3448        // Create directories.
3449        let mut directories = Vec::new();
3450        for _ in 0..NUM_THREADS {
3451            let mut transaction = fs
3452                .clone()
3453                .new_transaction(
                    lock_keys![LockKey::object(store_id, root_dir_id)],
3455                    Options::default(),
3456                )
3457                .await
3458                .expect("new_transaction failed");
3459            directories.push(
3460                root_directory
3461                    .create_child_dir(&mut transaction, "test")
3462                    .await
3463                    .expect("create_child_file failed"),
3464            );
3465            transaction.commit().await.expect("commit failed");
3466        }
3467
3468        // Hack the last object ID so that the next ID will require a roll.
3469        store.last_object_id.lock().id |= 0xffff_ffff;
3470
3471        let scope = fasync::Scope::new();
3472
3473        let _executor_tasks = testing::force_executor_threads_to_run(NUM_THREADS).await;
3474
3475        for dir in directories {
3476            let fs = fs.clone();
3477            scope.spawn(async move {
3478                let mut transaction = fs
3479                    .clone()
3480                    .new_transaction(
                        lock_keys![LockKey::object(store_id, dir.object_id())],
3482                        Options::default(),
3483                    )
3484                    .await
3485                    .expect("new_transaction failed");
3486                dir.create_child_file(&mut transaction, "test")
3487                    .await
3488                    .expect("create_child_file failed");
3489                transaction.commit().await.expect("commit failed");
3490            });
3491        }
3492
3493        scope.on_no_tasks().await;
3494
3495        assert_eq!(store.last_object_id.lock().id, 0x1_0000_0000 + NUM_THREADS as u64 - 1);
3496    }
3497
3498    #[fuchsia::test(threads = 10)]
3499    async fn test_lock_store() {
3500        let fs = test_filesystem().await;
3501        let crypt = Arc::new(new_insecure_crypt());
3502
3503        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3504        let store = root_volume
3505            .new_volume(
3506                "test",
3507                NewChildStoreOptions {
3508                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3509                    ..NewChildStoreOptions::default()
3510                },
3511            )
3512            .await
3513            .expect("new_volume failed");
3514        let mut transaction = fs
3515            .clone()
3516            .new_transaction(
3517                lock_keys![LockKey::object(
3518                    store.store_object_id(),
3519                    store.root_directory_object_id()
3520                )],
3521                Options::default(),
3522            )
3523            .await
3524            .expect("new_transaction failed");
3525        let root_directory =
3526            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3527        root_directory
3528            .create_child_file(&mut transaction, "test")
3529            .await
3530            .expect("create_child_file failed");
3531        transaction.commit().await.expect("commit failed");
3532        store.lock().await.expect("lock failed");
3533
3534        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
3535        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3536    }
3537
3538    #[fuchsia::test(threads = 10)]
3539    async fn test_unlock_read_only() {
3540        let fs = test_filesystem().await;
3541        let crypt = Arc::new(new_insecure_crypt());
3542
3543        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3544        let store = root_volume
3545            .new_volume(
3546                "test",
3547                NewChildStoreOptions {
3548                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3549                    ..NewChildStoreOptions::default()
3550                },
3551            )
3552            .await
3553            .expect("new_volume failed");
3554        let mut transaction = fs
3555            .clone()
3556            .new_transaction(
3557                lock_keys![LockKey::object(
3558                    store.store_object_id(),
3559                    store.root_directory_object_id()
3560                )],
3561                Options::default(),
3562            )
3563            .await
3564            .expect("new_transaction failed");
3565        let root_directory =
3566            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3567        root_directory
3568            .create_child_file(&mut transaction, "test")
3569            .await
3570            .expect("create_child_file failed");
3571        transaction.commit().await.expect("commit failed");
3572        store.lock().await.expect("lock failed");
3573
3574        store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
3575        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3576        store.lock_read_only();
3577        store.unlock_read_only(crypt).await.expect("unlock failed");
3578        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3579    }
3580
3581    #[fuchsia::test(threads = 10)]
3582    async fn test_key_rolled_when_unlocked() {
3583        let fs = test_filesystem().await;
3584        let crypt = Arc::new(new_insecure_crypt());
3585
3586        let object_id;
3587        {
3588            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3589            let store = root_volume
3590                .new_volume(
3591                    "test",
3592                    NewChildStoreOptions {
3593                        options: StoreOptions {
3594                            crypt: Some(crypt.clone()),
3595                            ..StoreOptions::default()
3596                        },
3597                        ..Default::default()
3598                    },
3599                )
3600                .await
3601                .expect("new_volume failed");
3602            let mut transaction = fs
3603                .clone()
3604                .new_transaction(
3605                    lock_keys![LockKey::object(
3606                        store.store_object_id(),
3607                        store.root_directory_object_id()
3608                    )],
3609                    Options::default(),
3610                )
3611                .await
3612                .expect("new_transaction failed");
3613            let root_directory = Directory::open(&store, store.root_directory_object_id())
3614                .await
3615                .expect("open failed");
3616            object_id = root_directory
3617                .create_child_file(&mut transaction, "test")
3618                .await
3619                .expect("create_child_file failed")
3620                .object_id();
3621            transaction.commit().await.expect("commit failed");
3622        }
3623
3624        fs.close().await.expect("Close failed");
3625        let mut device = fs.take_device().await;
3626
        // Repeatedly remount to make sure that we can still remount once many mutations keys
        // have accumulated.
3629        for _ in 0..100 {
3630            device.reopen(false);
3631            let fs = FxFilesystem::open(device).await.expect("open failed");
3632            {
3633                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3634                let store = root_volume
3635                    .volume(
3636                        "test",
3637                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3638                    )
3639                    .await
3640                    .expect("open_volume failed");
3641
3642                // The key should get rolled every time we unlock.
3643                assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);
3644
3645                // Make sure there's an encrypted mutation.
3646                let handle =
3647                    ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
3648                        .await
3649                        .expect("open_object failed");
3650                let buffer = handle.allocate_buffer(100).await;
3651                handle
3652                    .write_or_append(Some(0), buffer.as_ref())
3653                    .await
3654                    .expect("write_or_append failed");
3655            }
3656            fs.close().await.expect("Close failed");
3657            device = fs.take_device().await;
3658        }
3659    }
3660
3661    #[test]
3662    fn test_store_info_max_serialized_size() {
3663        let info = StoreInfo {
3664            guid: [0xff; 16],
3665            last_object_id: 0x1234567812345678,
3666            // Worst case, each layer should be 3/4 the size of the layer below it (because of the
3667            // compaction policy we're using).  If the smallest layer is 8,192 bytes, then 120
3668            // layers would take up a size that exceeds a 64 bit unsigned integer, so if this fits,
3669            // any size should fit.
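            // (Concretely, the largest of 120 such layers would be 8192 * (4/3)^119, roughly
            // 2^62.4 bytes, and the sum of the geometric series is about 4x that, roughly
            // 2^64.4 bytes, which exceeds u64::MAX.)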
3670            layers: vec![0x1234567812345678; 120],
3671            root_directory_object_id: 0x1234567812345678,
3672            graveyard_directory_object_id: 0x1234567812345678,
3673            object_count: 0x1234567812345678,
3674            mutations_key: Some(FxfsKey {
3675                wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
3676                key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
3677            }),
3678            mutations_cipher_offset: 0x1234567812345678,
3679            encrypted_mutations_object_id: 0x1234567812345678,
3680            object_id_key: Some(FxfsKey {
3681                wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
3682                key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
3683            }),
3684            internal_directory_object_id: INVALID_OBJECT_ID,
3685        };
3686        let mut serialized_info = Vec::new();
3687        info.serialize_with_version(&mut serialized_info).unwrap();
3688        assert!(
3689            serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
3690            "{}",
3691            serialized_info.len()
3692        );
3693    }
3694
3695    async fn reopen_after_crypt_failure_inner(read_only: bool) {
3696        let fs = test_filesystem().await;
3697        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3698
3699        let store = {
3700            let crypt = Arc::new(new_insecure_crypt());
3701            let store = root_volume
3702                .new_volume(
3703                    "vol",
3704                    NewChildStoreOptions {
3705                        options: StoreOptions {
3706                            crypt: Some(crypt.clone()),
3707                            ..StoreOptions::default()
3708                        },
3709                        ..Default::default()
3710                    },
3711                )
3712                .await
3713                .expect("new_volume failed");
3714            let root_directory = Directory::open(&store, store.root_directory_object_id())
3715                .await
3716                .expect("open failed");
3717            let mut transaction = fs
3718                .clone()
3719                .new_transaction(
3720                    lock_keys![LockKey::object(
3721                        store.store_object_id(),
3722                        root_directory.object_id()
3723                    )],
3724                    Options::default(),
3725                )
3726                .await
3727                .expect("new_transaction failed");
3728            root_directory
3729                .create_child_file(&mut transaction, "test")
3730                .await
3731                .expect("create_child_file failed");
3732            transaction.commit().await.expect("commit failed");
3733
3734            crypt.shutdown();
3735            let mut transaction = fs
3736                .clone()
3737                .new_transaction(
3738                    lock_keys![LockKey::object(
3739                        store.store_object_id(),
3740                        root_directory.object_id()
3741                    )],
3742                    Options::default(),
3743                )
3744                .await
3745                .expect("new_transaction failed");
3746            root_directory
3747                .create_child_file(&mut transaction, "test2")
3748                .await
3749                .map(|_| ())
3750                .expect_err("create_child_file should fail");
3751            store.lock().await.expect("lock failed");
3752            store
3753        };
3754
3755        let crypt = Arc::new(new_insecure_crypt());
3756        if read_only {
3757            store.unlock_read_only(crypt).await.expect("unlock failed");
3758        } else {
3759            store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
3760        }
3761        let root_directory =
3762            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3763        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3764    }
3765
3766    #[fuchsia::test(threads = 10)]
3767    async fn test_reopen_after_crypt_failure() {
3768        reopen_after_crypt_failure_inner(false).await;
3769    }
3770
3771    #[fuchsia::test(threads = 10)]
3772    async fn test_reopen_read_only_after_crypt_failure() {
3773        reopen_after_crypt_failure_inner(true).await;
3774    }
3775
3776    #[fuchsia::test(threads = 10)]
3777    #[should_panic(expected = "Insufficient reservation space")]
3778    #[cfg(debug_assertions)]
3779    async fn large_transaction_causes_panic_in_debug_builds() {
3780        let fs = test_filesystem().await;
3781        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3782        let store = root_volume
3783            .new_volume("vol", NewChildStoreOptions::default())
3784            .await
3785            .expect("new_volume failed");
3786        let root_directory =
3787            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3788        let mut transaction = fs
3789            .clone()
3790            .new_transaction(
3791                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
3792                Options::default(),
3793            )
3794            .await
3795            .expect("transaction");
3796        for i in 0..500 {
3797            root_directory
3798                .create_symlink(&mut transaction, b"link", &format!("{}", i))
3799                .await
3800                .expect("symlink");
3801        }
3802        assert_eq!(transaction.commit().await.expect("commit"), 0);
3803    }
3804
3805    #[fuchsia::test]
3806    async fn test_crypt_failure_does_not_fuse_journal() {
3807        let fs = test_filesystem().await;
3808
3809        struct Owner;
3810        #[async_trait]
3811        impl StoreOwner for Owner {
3812            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
3813                store.lock().await
3814            }
3815        }
3816        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
3817
3818        {
3819            // Create two stores and a record for each store, so the journal will need to flush them
3820            // both later.
3821            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3822            let store1 = root_volume
3823                .new_volume(
3824                    "vol1",
3825                    NewChildStoreOptions {
3826                        options: StoreOptions {
3827                            crypt: Some(Arc::new(new_insecure_crypt())),
3828                            ..StoreOptions::default()
3829                        },
3830                        ..Default::default()
3831                    },
3832                )
3833                .await
3834                .expect("new_volume failed");
3835            let crypt = Arc::new(new_insecure_crypt());
3836            let store2 = root_volume
3837                .new_volume(
3838                    "vol2",
3839                    NewChildStoreOptions {
3840                        options: StoreOptions {
3841                            owner: Arc::downgrade(&owner),
3842                            crypt: Some(crypt.clone()),
3843                        },
3844                        ..Default::default()
3845                    },
3846                )
3847                .await
3848                .expect("new_volume failed");
3849            for store in [&store1, &store2] {
3850                let root_directory = Directory::open(store, store.root_directory_object_id())
3851                    .await
3852                    .expect("open failed");
3853                let mut transaction = fs
3854                    .clone()
3855                    .new_transaction(
3856                        lock_keys![LockKey::object(
3857                            store.store_object_id(),
3858                            root_directory.object_id()
3859                        )],
3860                        Options::default(),
3861                    )
3862                    .await
3863                    .expect("new_transaction failed");
3864                root_directory
3865                    .create_child_file(&mut transaction, "test")
3866                    .await
3867                    .expect("create_child_file failed");
3868                transaction.commit().await.expect("commit failed");
3869            }
3870            // Shut down the crypt instance for store2, and then compact.  Compaction should not
3871            // fail, and the store should become locked.
3872            crypt.shutdown();
3873            fs.journal().compact().await.expect("compact failed");
3874            // The store should now be locked.
3875            assert!(store2.is_locked());
3876        }
3877
3878        // Even though the store wasn't flushed, the mutation to store2 will still be valid as it is
3879        // held in the journal.
3880        fs.close().await.expect("close failed");
3881        let device = fs.take_device().await;
3882        device.reopen(false);
3883        let fs = FxFilesystem::open(device).await.expect("open failed");
3884        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3885
3886        for volume_name in ["vol1", "vol2"] {
3887            let store = root_volume
3888                .volume(
3889                    volume_name,
3890                    StoreOptions {
3891                        crypt: Some(Arc::new(new_insecure_crypt())),
3892                        ..StoreOptions::default()
3893                    },
3894                )
3895                .await
3896                .expect("open volume failed");
3897            let root_directory = Directory::open(&store, store.root_directory_object_id())
3898                .await
3899                .expect("open failed");
3900            assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
3901        }
3902
3903        fs.close().await.expect("close failed");
3904    }
3905
3906    #[fuchsia::test]
3907    async fn test_crypt_failure_during_unlock_race() {
3908        let fs = test_filesystem().await;
3909
        struct Owner;
        #[async_trait]
        impl StoreOwner for Owner {
            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
                store.lock().await
            }
        }
        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;

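        // Create an encrypted volume containing a single file.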
        let store_object_id = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "vol",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            owner: Arc::downgrade(&owner),
                            crypt: Some(Arc::new(new_insecure_crypt())),
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            store.store_object_id()
        };

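        // Close and reopen the filesystem so that the volume has to be unlocked again.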
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

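        // Race unlocking the volume against shutting down its crypt instance.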
        let fs = FxFilesystem::open(device).await.expect("open failed");
        {
            let fs_clone = fs.clone();
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let crypt = Arc::new(new_insecure_crypt());
            let crypt_clone = crypt.clone();
            join!(
                async move {
                    // Unlock might fail, so ignore errors.
                    let _ = root_volume
                        .volume(
                            "vol",
                            StoreOptions {
                                owner: Arc::downgrade(&owner),
                                crypt: Some(crypt_clone),
                            },
                        )
                        .await;
                },
                async move {
                    // Wait until the unlock itself has finished, but before the flush that the
                    // unlock triggers has completed, to maximize the chances of hitting the race.
                    let keys = lock_keys![LockKey::flush(store_object_id)];
                    let _ = fs_clone.lock_manager().write_lock(keys).await;
                    crypt.shutdown();
                }
            );
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

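        // However the race resolved, the volume should still unlock with a fresh crypt instance,
        // and the file should be intact.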
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .volume(
                "vol",
                StoreOptions {
                    crypt: Some(Arc::new(new_insecure_crypt())),
                    ..StoreOptions::default()
                },
            )
            .await
            .expect("open volume failed");
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());

        fs.close().await.expect("close failed");
    }
}