fxfs/
object_store.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

pub mod allocator;
pub mod caching_object_handle;
pub mod data_object_handle;
pub mod directory;
mod extent_mapping_iterator;
mod extent_record;
mod flush;
pub mod graveyard;
mod install;
pub mod journal;
mod key_manager;
pub(crate) mod merge;
pub mod object_manager;
pub mod object_record;
pub mod project_id;
mod store_object_handle;
pub mod transaction;
mod tree;
mod tree_cache;
pub mod volume;

pub use data_object_handle::{
    DataObjectHandle, DirectWriter, FileExtent, FsverityState, FsverityStateInner, RangeType,
};
pub use directory::Directory;
pub use object_record::{ChildValue, ObjectDescriptor, PosixAttributes, Timestamp};
pub use store_object_handle::{
    EXTENDED_ATTRIBUTE_RANGE_END, EXTENDED_ATTRIBUTE_RANGE_START, SetExtendedAttributeMode,
    StoreObjectHandle,
};

use crate::errors::FxfsError;
use crate::filesystem::{
    ApplyContext, ApplyMode, FxFilesystem, JournalingObject, MAX_FILE_SIZE, SyncOptions,
    TruncateGuard, TxnGuard,
};
use crate::log::*;
use crate::lsm_tree::cache::{NullCache, ObjectCache};
use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
use crate::lsm_tree::{LSMTree, Query};
use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ObjectProperties, ReadObjectHandle};
use crate::object_store::allocator::Allocator;
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::{JournalCheckpoint, JournalCheckpointV32, JournaledTransaction};
use crate::object_store::key_manager::KeyManager;
use crate::object_store::transaction::{
    AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation, Options, Transaction,
    lock_keys,
};
use crate::range::RangeExt;
use crate::round::round_up;
use crate::serialized_types::{Version, Versioned, VersionedLatest};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use fidl_fuchsia_io as fio;
use fprint::TypeFingerprint;
use fuchsia_sync::Mutex;
use fxfs_crypto::ff1::Ff1;
use fxfs_crypto::{
    Cipher, Crypt, FxfsCipher, KeyPurpose, ObjectType, StreamCipher, UnwrappedKey, WrappingKeyId,
};
use fxfs_macros::{Migrate, migrate_to_version};
use once_cell::sync::OnceCell;
use scopeguard::ScopeGuard;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use storage_device::Device;
use uuid::Uuid;

pub use extent_record::{
    BLOB_MERKLE_ATTRIBUTE_ID, DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
    FSVERITY_MERKLE_ATTRIBUTE_ID,
};
pub use object_record::{
    AttributeKey, EncryptionKey, EncryptionKeys, ExtendedAttributeValue, FsverityMetadata, FxfsKey,
    FxfsKeyV40, FxfsKeyV49, ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue,
    ProjectProperty, RootDigest,
};
pub use transaction::Mutation;

// For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel
// attacks more difficult. This mask can be used to extract the hi part of the object ID.
const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000;

// At time of writing, this threshold limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;

// Encrypted files and directories use the fscrypt key (identified by `FSCRYPT_KEY_ID`) to encrypt
// file contents and filenames respectively. All non-fscrypt encrypted files otherwise default to
// using the `VOLUME_DATA_KEY_ID` key. Note, the filesystem always uses the `VOLUME_DATA_KEY_ID`
// key to encrypt large extended attributes. Thus, encrypted files and directories with large
// xattrs will have both an fscrypt and volume data key.
pub const VOLUME_DATA_KEY_ID: u64 = 0;
pub const FSCRYPT_KEY_ID: u64 = 1;

/// A constant that can be used where an owner is expected of type `Weak<dyn StoreOwner>` but no
/// owner is required.
pub const NO_OWNER: Weak<()> = Weak::new();
impl StoreOwner for () {}

#[async_trait]
pub trait StoreOwner: Send + Sync {
    /// Forcibly lock the store.  This exists to give the StoreOwner an opportunity to clean up
    /// tasks which might access the store before locking it, because ObjectStore::lock can only
    /// be called when the store is not in use.
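    ///
    /// # Example
    ///
    /// A minimal sketch of an implementation (the owner type `MyVolume` and its `tasks` field
    /// are hypothetical); it stops outstanding work and then locks the store via
    /// `ObjectStore::lock`:
    ///
    /// ```ignore
    /// #[async_trait]
    /// impl StoreOwner for MyVolume {
    ///     async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), Error> {
    ///         self.tasks.shutdown().await; // Hypothetical clean-up of in-flight work.
    ///         store.lock().await
    ///     }
    /// }
    /// ```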
    async fn force_lock(self: Arc<Self>, _store: &ObjectStore) -> Result<(), Error> {
        Err(anyhow!(FxfsError::Internal))
    }
}

/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
/// back to an ObjectStore.
pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}

/// StoreInfo stores information about the object store.  This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
pub type StoreInfo = StoreInfoV49;

#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV49 {
    /// The globally unique identifier for the associated object store. If unset, will be all zero.
    guid: [u8; 16],

    /// The last used object ID.  Note that this field is not accurate in memory; ObjectStore's
    /// last_object_id field is the one to use in that case.  Technically, this might not be the
    /// last object ID used for the latest transaction that created an object because we use this at
    /// the point of creating the object but before we commit the transaction.  Transactions can
    /// then get committed in an arbitrary order (or not at all).
    last_object_id: u64,

    /// Object ids for layers.  TODO(https://fxbug.dev/42178036): need a layer of indirection here
    /// so we can support snapshots.
    pub layers: Vec<u64>,

    /// The object ID for the root directory.
    root_directory_object_id: u64,

    /// The object ID for the graveyard.
    graveyard_directory_object_id: u64,

    /// The number of live objects in the store.  This should *not* be trusted; it can be invalid
    /// due to filesystem inconsistencies.
    object_count: u64,

    /// The (wrapped) key that encrypted mutations should use.
    mutations_key: Option<FxfsKeyV49>,

    /// Mutations for the store are encrypted using a stream cipher.  To decrypt the mutations, we
    /// need to know the offset in the cipher stream to start it.
    mutations_cipher_offset: u64,

    /// If we have to flush the store whilst we do not have the key, we need to write the encrypted
    /// mutations to an object. This is the object ID of that file if it exists.
    pub encrypted_mutations_object_id: u64,

    /// Object IDs are encrypted to reduce the amount of information that sequential object IDs
    /// reveal (such as the number of files in the system and the ordering of their creation in
    /// time).  Only the bottom 32 bits of the object ID are encrypted whilst the top 32 bits will
    /// increment after 2^32 object IDs have been used and this allows us to roll the key.
    object_id_key: Option<FxfsKeyV49>,

    /// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
    /// when the directory doesn't yet exist.
    internal_directory_object_id: u64,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV49)]
pub struct StoreInfoV40 {
    guid: [u8; 16],
    last_object_id: u64,
    pub layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV40>,
    mutations_cipher_offset: u64,
    pub encrypted_mutations_object_id: u64,
    object_id_key: Option<FxfsKeyV40>,
    internal_directory_object_id: u64,
}

impl StoreInfo {
    /// Create a new/default [`StoreInfo`] but with a newly generated GUID.
    fn new_with_guid() -> Self {
        let guid = Uuid::new_v4();
        Self { guid: *guid.as_bytes(), ..Default::default() }
    }

    /// Returns the parent objects for this store.
    pub fn parent_objects(&self) -> Vec<u64> {
        // We should not include the ID of the store itself, since that should be referred to in the
        // volume directory.
        let mut objects = self.layers.to_vec();
        if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
            objects.push(self.encrypted_mutations_object_id);
        }
        objects
    }
}

// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit isn't exceeded.
// It will likely involve placing limits on the maximum number of layers.
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072;

// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
// in the journal but hasn't yet been written to layer files) for a store.  We set a limit because
// we want to limit the amount of memory use in the case the filesystem is corrupt or under attack.
pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;

#[derive(Default)]
pub struct HandleOptions {
    /// If true, transactions used by this handle will skip journal space checks.
    pub skip_journal_checks: bool,
    /// If true, data written to any attribute of this handle will not have per-block checksums
    /// computed.
    pub skip_checksums: bool,
}

/// Parameters for encrypting a newly created object.
pub struct ObjectEncryptionOptions {
    /// If set, the keys are treated as permanent and never evicted from the KeyManager cache.
    /// This is necessary when keys are managed by another store; for example, the layer files
    /// of a child store are objects in the root store, but they are encrypted with keys from the
    /// child store.  Generally, most objects should have this set to `false`.
    pub permanent: bool,
    pub key_id: u64,
    pub key: EncryptionKey,
    pub unwrapped_key: UnwrappedKey,
}

pub struct StoreOptions {
    /// The owner of the store.
    pub owner: Weak<dyn StoreOwner>,

    /// The store is unencrypted if `crypt` is `None`.
    pub crypt: Option<Arc<dyn Crypt>>,
}

impl Default for StoreOptions {
    fn default() -> Self {
        Self { owner: NO_OWNER, crypt: None }
    }
}
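
// A hedged example of filling out `StoreOptions` for an encrypted store; `owner` (an `Arc` of
// some `StoreOwner` implementation) and `crypt` are assumed to be supplied by the caller:
//
// ```ignore
// let options = StoreOptions { owner: Arc::downgrade(&owner), crypt: Some(crypt) };
// ```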

#[derive(Default)]
pub struct NewChildStoreOptions {
    pub options: StoreOptions,

    /// Specifies the object ID in the root store to be used for the store.  If set to
    /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
    pub object_id: u64,

    /// If true, reserve all 32 bit object_ids.
    pub reserve_32bit_object_ids: bool,
}

pub type EncryptedMutations = EncryptedMutationsV49;

#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV49 {
    // Information about the mutations is held here, but the actual encrypted data is held within
    // data.  For each transaction, we record the checkpoint and the count of mutations within the
    // transaction.  The checkpoint is required for the log file offset (which we need to apply the
    // mutations), and the version so that we can correctly decode the mutation after it has been
    // decrypted. The count specifies the number of serialized mutations encoded in |data|.
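    //
    // For example (hypothetical values): transactions == [(cp_a, 2), (cp_b, 1)] means that the
    // first two serialized mutations in `data` belong to the transaction checkpointed at cp_a
    // and the third belongs to the one at cp_b.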
    transactions: Vec<(JournalCheckpointV32, u64)>,

    // The encrypted mutations.
    data: Vec<u8>,

    // If the mutations key was rolled, this holds the offset in `data` where the new key should
    // apply.
    mutations_key_roll: Vec<(usize, FxfsKeyV49)>,
}

impl std::fmt::Debug for EncryptedMutations {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("EncryptedMutations")
            .field("transactions", &self.transactions)
            .field("len", &self.data.len())
            .field(
                "mutations_key_roll",
                &self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
            )
            .finish()
    }
}

impl Versioned for EncryptedMutations {
    fn max_serialized_size() -> u64 {
        MAX_ENCRYPTED_MUTATIONS_SIZE as u64
    }
}

impl From<EncryptedMutationsV40> for EncryptedMutationsV49 {
    fn from(value: EncryptedMutationsV40) -> Self {
        EncryptedMutationsV49 {
            transactions: value.transactions,
            data: value.data,
            mutations_key_roll: value
                .mutations_key_roll
                .into_iter()
                .map(|(offset, key)| (offset, key.into()))
                .collect(),
        }
    }
}

#[derive(Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV40 {
    transactions: Vec<(JournalCheckpointV32, u64)>,
    data: Vec<u8>,
    mutations_key_roll: Vec<(usize, FxfsKeyV40)>,
}

impl Versioned for EncryptedMutationsV40 {
    fn max_serialized_size() -> u64 {
        MAX_ENCRYPTED_MUTATIONS_SIZE as u64
    }
}

impl EncryptedMutations {
    fn from_replayed_mutations(
        store_object_id: u64,
        transactions: Vec<JournaledTransaction>,
    ) -> Self {
        let mut this = Self::default();
        for JournaledTransaction { checkpoint, non_root_mutations, .. } in transactions {
            for (object_id, mutation) in non_root_mutations {
                if store_object_id == object_id {
                    if let Mutation::EncryptedObjectStore(data) = mutation {
                        this.push(&checkpoint, data);
                    } else if let Mutation::UpdateMutationsKey(key) = mutation {
                        this.mutations_key_roll.push((this.data.len(), key.into()));
                    }
                }
            }
        }
        this
    }

    fn extend(&mut self, other: &EncryptedMutations) {
        self.transactions.extend_from_slice(&other.transactions[..]);
        self.mutations_key_roll.extend(
            other
                .mutations_key_roll
                .iter()
                .map(|(offset, key)| (offset + self.data.len(), key.clone())),
        );
        self.data.extend_from_slice(&other.data[..]);
    }

    fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
        self.data.append(&mut data.into());
        // If the checkpoint is the same as the last mutation we pushed, increment the count.
        if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
            if last_checkpoint.file_offset == checkpoint.file_offset {
                *count += 1;
                return;
            }
        }
        self.transactions.push((checkpoint.clone(), 1));
    }
}

pub enum LockState {
    Locked,
    Unencrypted,
    Unlocked { owner: Weak<dyn StoreOwner>, crypt: Arc<dyn Crypt> },

    // The store is unlocked, but in a read-only state, and no flushes or other operations will be
    // performed on the store.
    UnlockedReadOnly(Arc<dyn Crypt>),

    // The store is encrypted but is now in an unusable state (due to a failure to sync the journal
    // after locking the store).  The store cannot be unlocked.
    Invalid,

    // Before we've read the StoreInfo we might not know whether the store is Locked or Unencrypted.
    // This can happen when lazily opening stores (ObjectManager::lazy_open_store).
    Unknown,

    // The store is in the process of being locked.  Whilst the store is being locked, the store
    // isn't usable; assertions will trip if any mutations are applied.
    Locking,

    // Whilst we're unlocking, we will replay encrypted mutations.  The store isn't usable until
    // it's in the Unlocked state.
    Unlocking,

    // The store has been deleted.
    Deleted,
}

impl LockState {
    fn owner(&self) -> Option<Arc<dyn StoreOwner>> {
        if let Self::Unlocked { owner, .. } = self { owner.upgrade() } else { None }
    }
}

impl fmt::Debug for LockState {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(match self {
            LockState::Locked => "Locked",
            LockState::Unencrypted => "Unencrypted",
            LockState::Unlocked { .. } => "Unlocked",
            LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
            LockState::Invalid => "Invalid",
            LockState::Unknown => "Unknown",
            LockState::Locking => "Locking",
            LockState::Unlocking => "Unlocking",
            LockState::Deleted => "Deleted",
        })
    }
}

#[derive(Default, Clone)]
struct LastObjectId {
    // The *unencrypted* value of the last object ID.
    id: u64,

    // Encrypted stores will use a cipher to obfuscate the object ID.
    cipher: Option<Ff1>,
}

impl LastObjectId {
    // Returns true if a cipher is needed to generate new object IDs.
    fn should_create_cipher(&self) -> bool {
        self.cipher.is_some() && self.id as u32 == u32::MAX
    }

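    // A worked illustration with hypothetical values: if `id` is 0x0000_0005_0000_002a, the
    // next ID keeps the high 32 bits (0x0000_0005) and replaces the low 32 bits with
    // cipher.encrypt(0x0000_002b), retrying if the candidate collides with INVALID_OBJECT_ID.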
    fn get_next_object_id(&mut self) -> u64 {
        if let Some(cipher) = &self.cipher {
            let hi = self.id & OBJECT_ID_HI_MASK;
            loop {
                self.id += 1;
                assert_ne!(self.id as u32, 0); // This would indicate the ID wrapped.
                let candidate = hi | cipher.encrypt(self.id as u32) as u64;
                if candidate != INVALID_OBJECT_ID {
                    break candidate;
                }
            }
        } else {
            self.id += 1;
            self.id
        }
    }
}

/// An object store supports a file-like interface for objects.  Objects are keyed by a 64 bit
/// identifier.  An object store has to be backed by a parent object store (which stores metadata
/// for the object store).  The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
pub struct ObjectStore {
    parent_store: Option<Arc<ObjectStore>>,
    store_object_id: u64,
    device: Arc<dyn Device>,
    block_size: u64,
    filesystem: Weak<FxFilesystem>,
    // Lock ordering: This must be taken before `lock_state`.
    store_info: Mutex<Option<StoreInfo>>,
    tree: LSMTree<ObjectKey, ObjectValue>,

    // When replaying the journal, the store cannot read StoreInfo until the whole journal
    // has been replayed, so during that time, store_info_handle will be None and records
    // just get sent to the tree. Once the journal has been replayed, we can open the store
    // and load all the other layer information.
    store_info_handle: OnceCell<DataObjectHandle<ObjectStore>>,

    // The cipher to use for encrypted mutations, if this store is encrypted.
    mutations_cipher: Mutex<Option<StreamCipher>>,

    // Current lock state of the store.
    // Lock ordering: This must be taken after `store_info`.
    lock_state: Mutex<LockState>,
    pub key_manager: KeyManager,

    // Enable/disable tracing.
    trace: AtomicBool,

    // Informational counters for events occurring within the store.
    counters: Mutex<ObjectStoreCounters>,

    // These are updated in performance-sensitive code paths so we use atomics instead of counters.
    device_read_ops: AtomicU64,
    device_write_ops: AtomicU64,
    logical_read_ops: AtomicU64,
    logical_write_ops: AtomicU64,

    // Contains the last object ID and, optionally, a cipher to be used when generating new object
    // IDs.
    last_object_id: Mutex<LastObjectId>,

    // An optional callback to be invoked each time the ObjectStore flushes.  The callback is
    // invoked at the end of flush, while the write lock is still held.
    flush_callback: Mutex<Option<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>>,
}

#[derive(Clone, Default)]
struct ObjectStoreCounters {
    mutations_applied: u64,
    mutations_dropped: u64,
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}

impl ObjectStore {
    fn new(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        store_info: Option<StoreInfo>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
        mutations_cipher: Option<StreamCipher>,
        lock_state: LockState,
        last_object_id: LastObjectId,
    ) -> Arc<ObjectStore> {
        let device = filesystem.device();
        let block_size = filesystem.block_size();
        Arc::new(ObjectStore {
            parent_store,
            store_object_id,
            device,
            block_size,
            filesystem: Arc::downgrade(&filesystem),
            store_info: Mutex::new(store_info),
            tree: LSMTree::new(merge::merge, object_cache),
            store_info_handle: OnceCell::new(),
            mutations_cipher: Mutex::new(mutations_cipher),
            lock_state: Mutex::new(lock_state),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(last_object_id),
            flush_callback: Mutex::new(None),
        })
    }

    fn new_empty(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Arc<Self> {
        Self::new(
            parent_store,
            store_object_id,
            filesystem,
            Some(StoreInfo::default()),
            object_cache,
            None,
            LockState::Unencrypted,
            LastObjectId::default(),
        )
    }

    /// Cycle breaker constructor that returns an ObjectStore without a filesystem.
    /// This should only be used from super block code.
    pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
        ObjectStore {
            parent_store: None,
            store_object_id,
            device,
            block_size,
            filesystem: Weak::<FxFilesystem>::new(),
            store_info: Mutex::new(Some(StoreInfo::default())),
            tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
            store_info_handle: OnceCell::new(),
            mutations_cipher: Mutex::new(None),
            lock_state: Mutex::new(LockState::Unencrypted),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(LastObjectId::default()),
            flush_callback: Mutex::new(None),
        }
    }

    /// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has
    /// been created.
    pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
        this.filesystem = Arc::downgrade(&filesystem);
        this
    }

    /// Create a child store. It is a multi-step process:
    ///
    ///   1. Call `ObjectStore::new_child_store`.
    ///   2. Register the store with the object-manager.
    ///   3. Call `ObjectStore::create` to write the store-info.
    ///
    /// If the procedure fails, care must be taken to unregister the store with the object-manager.
    ///
    /// The steps have to be separate because of lifetime issues when working with a transaction.
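    ///
    /// A hedged sketch of the sequence; `object_manager` and its `add_store` registration call
    /// are assumptions about the caller's environment:
    ///
    /// ```ignore
    /// let store = root_store
    ///     .new_child_store(&mut transaction, NewChildStoreOptions::default(), Box::new(NullCache {}))
    ///     .await?;
    /// object_manager.add_store(store.clone()); // Step 2: register with the object-manager.
    /// store.create(&mut transaction).await?; // Step 3: write the store-info.
    /// ```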
    async fn new_child_store(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        options: NewChildStoreOptions,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Result<Arc<Self>, Error> {
        let handle = if options.object_id != INVALID_OBJECT_ID {
            let handle = ObjectStore::create_object_with_id(
                self,
                transaction,
                options.object_id,
                HandleOptions::default(),
                None,
            )?;
            self.update_last_object_id(options.object_id);
            handle
        } else {
            ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await?
        };
        let filesystem = self.filesystem();
        let id = if options.reserve_32bit_object_ids { 0x1_0000_0000 } else { 0 };
        let store = if let Some(crypt) = options.options.crypt {
            let (wrapped_key, unwrapped_key) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            let (object_id_wrapped, object_id_unwrapped) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    mutations_key: Some(wrapped_key),
                    object_id_key: Some(object_id_wrapped),
                    ..StoreInfo::new_with_guid()
                }),
                object_cache,
                Some(StreamCipher::new(&unwrapped_key, 0)),
                LockState::Unlocked { owner: options.options.owner, crypt },
                LastObjectId { id, cipher: Some(Ff1::new(&object_id_unwrapped)) },
            )
        } else {
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo::new_with_guid()),
                object_cache,
                None,
                LockState::Unencrypted,
                LastObjectId { id, ..LastObjectId::default() },
            )
        };
        assert!(store.store_info_handle.set(handle).is_ok());
        Ok(store)
    }

    /// Actually creates the store in a transaction.  This will also create a root directory and
    /// graveyard directory for the store.  See `new_child_store` above.
    async fn create<'a>(
        self: &'a Arc<Self>,
        transaction: &mut Transaction<'a>,
    ) -> Result<(), Error> {
        let buf = {
            // Create a root directory and graveyard directory.
            let graveyard_directory_object_id = Graveyard::create(transaction, &self);
            let root_directory = Directory::create(transaction, &self, None).await?;

            let serialized_info = {
                let mut store_info = self.store_info.lock();
                let store_info = store_info.as_mut().unwrap();

                store_info.graveyard_directory_object_id = graveyard_directory_object_id;
                store_info.root_directory_object_id = root_directory.object_id();

                let mut serialized_info = Vec::new();
                store_info.serialize_with_version(&mut serialized_info)?;
                serialized_info
            };
            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
            buf
        };

        if self.filesystem().options().image_builder_mode.is_some() {
            // If we're in image builder mode, we want to avoid writing to disk unless explicitly
            // asked to. New object stores will have their StoreInfo written when we compact in
            // FxFilesystem::finalize().
            Ok(())
        } else {
            self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
        }
    }

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(store_id = self.store_object_id(), trace; "OS: trace",);
        }
    }

    /// Sets a callback to be invoked each time the ObjectStore flushes.  The callback is invoked at
    /// the end of flush, while the write lock is still held.
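    ///
    /// # Example
    ///
    /// A minimal sketch; the callback runs with the flush write lock held, so it should stay
    /// cheap:
    ///
    /// ```ignore
    /// store.set_flush_callback(|store| {
    ///     info!(store_id = store.store_object_id(); "flushed");
    /// });
    /// ```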
    pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
        let mut flush_callback = self.flush_callback.lock();
        *flush_callback = Some(Box::new(callback));
    }

    pub fn is_root(&self) -> bool {
        if let Some(parent) = &self.parent_store {
            parent.parent_store.is_none()
        } else {
            // The root parent store isn't the root store.
            false
        }
    }

    /// Populates an inspect node with store statistics.
    pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
        let counters = self.counters.lock();
        if let Some(store_info) = self.store_info() {
            root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
        };
        root.record_uint("store_object_id", self.store_object_id);
        root.record_uint("mutations_applied", counters.mutations_applied);
        root.record_uint("mutations_dropped", counters.mutations_dropped);
        root.record_uint("num_flushes", counters.num_flushes);
        if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
            root.record_uint(
                "last_flush_time_ms",
                last_flush_time
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or(std::time::Duration::ZERO)
                    .as_millis()
                    .try_into()
                    .unwrap_or(0u64),
            );
        }
        root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
        root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
        root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
        root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));

        let this = self.clone();
        root.record_child("lsm_tree", move |node| this.tree().record_inspect_data(node));
    }

    pub fn device(&self) -> &Arc<dyn Device> {
        &self.device
    }

    pub fn block_size(&self) -> u64 {
        self.block_size
    }

    pub fn filesystem(&self) -> Arc<FxFilesystem> {
        self.filesystem.upgrade().unwrap()
    }

    pub fn store_object_id(&self) -> u64 {
        self.store_object_id
    }

    pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
        &self.tree
    }

    pub fn root_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().root_directory_object_id
    }

    pub fn graveyard_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().graveyard_directory_object_id
    }

    fn set_graveyard_directory_object_id(&self, oid: u64) {
        assert_eq!(
            std::mem::replace(
                &mut self.store_info.lock().as_mut().unwrap().graveyard_directory_object_id,
                oid
            ),
            INVALID_OBJECT_ID
        );
    }

    pub fn object_count(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().object_count
    }

    pub fn key_manager(&self) -> &KeyManager {
        &self.key_manager
    }

    pub fn parent_store(&self) -> Option<&Arc<ObjectStore>> {
        self.parent_store.as_ref()
    }

    /// Returns the crypt object for the store.  Returns None if the store is unencrypted.
    pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
        match &*self.lock_state.lock() {
            LockState::Locked => panic!("Store is locked"),
            LockState::Invalid
            | LockState::Unencrypted
            | LockState::Locking
            | LockState::Unlocking
            | LockState::Deleted => None,
            LockState::Unlocked { crypt, .. } => Some(crypt.clone()),
            LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
            LockState::Unknown => {
                panic!("Store is of unknown lock state; has the journal been replayed yet?")
            }
        }
    }

    pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
        // Create the transaction first to use the object store lock.
        let mut transaction = self
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    self.parent_store.as_ref().unwrap().store_object_id,
                    self.store_object_id,
                )],
                Options::default(),
            )
            .await?;
        let obj_id = self.store_info.lock().as_ref().unwrap().internal_directory_object_id;
        if obj_id != INVALID_OBJECT_ID {
            return Ok(obj_id);
        }

        // Need to create an internal directory.
        let directory = Directory::create(&mut transaction, self, None).await?;

        transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
        transaction.commit().await?;
        Ok(directory.object_id())
    }

    /// Returns the file size for the object without opening the object.
    async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
        let item = self
            .tree
            .find(&ObjectKey::attribute(
                object_id,
                DEFAULT_DATA_ATTRIBUTE_ID,
                AttributeKey::Attribute,
            ))
            .await?
            .ok_or(FxfsError::NotFound)?;
        if let ObjectValue::Attribute { size, .. } = item.value {
            Ok(size)
        } else {
            bail!(FxfsError::NotFile);
        }
    }

    #[cfg(feature = "migration")]
    pub fn last_object_id(&self) -> u64 {
        self.last_object_id.lock().id
    }

    /// Provides access to the allocator to mark a specific region of the device as allocated.
    #[cfg(feature = "migration")]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        store_object_id: u64,
        device_range: std::ops::Range<u64>,
    ) -> Result<(), Error> {
        self.allocator().mark_allocated(transaction, store_object_id, device_range)
    }

    /// `crypt` can be provided if the crypt service should be different from the default; see the
    /// comment on create_object.  Users should avoid having more than one handle open for the same
    /// object at the same time because they might get out-of-sync; there is no code that will
    /// prevent this.  One example where this can cause an issue: if the object ends up using a
    /// permanent key (which is the case if a value is passed for `crypt`), that key is dropped
    /// when a handle is dropped, which will impact any other handles for the same object.
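    ///
    /// # Example
    ///
    /// A sketch of typical usage (`owner` and `oid` are assumed); passing `None` for `crypt`
    /// uses the store's own crypt service:
    ///
    /// ```ignore
    /// let handle =
    ///     ObjectStore::open_object(&owner, oid, HandleOptions::default(), None).await?;
    /// ```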
    pub async fn open_object<S: HandleOwner>(
        owner: &Arc<S>,
        obj_id: u64,
        options: HandleOptions,
        crypt: Option<Arc<dyn Crypt>>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let mut fsverity_descriptor = None;
        let mut overwrite_ranges = Vec::new();
        let item = store
            .tree
            .find(&ObjectKey::attribute(obj_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute))
            .await?
            .ok_or(FxfsError::NotFound)?;

        let (size, track_overwrite_extents) = match item.value {
            ObjectValue::Attribute { size, has_overwrite_extents } => (size, has_overwrite_extents),
            ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
                fsverity_descriptor = Some(fsverity_metadata);
                // We only track the overwrite extents in memory for writes; reads handle them
                // implicitly.  This means verified files (whose data won't change anymore)
                // don't need to track them.
                (size, false)
            }
            _ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attribute")),
        };

        ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);

        if track_overwrite_extents {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    obj_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
                )))
                .await?;
            loop {
                match iter.get() {
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == obj_id && *attribute_id == DEFAULT_DATA_ATTRIBUTE_ID => {
                        match value {
                            ObjectValue::Extent(ExtentValue::None)
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Raw,
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Cow(_),
                                ..
                            }) => (),
                            ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::OverwritePartial(_),
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Overwrite,
                                ..
                            }) => overwrite_ranges.push(range.clone()),
                            _ => bail!(
                                anyhow!(FxfsError::Inconsistent)
                                    .context("open_object: Expected extent")
                            ),
                        }
                        iter.advance().await?;
                    }
                    _ => break,
                }
            }
        }

        // If a crypt service has been specified, it needs to be a permanent key because cached
        // keys can only use the store's crypt service.
        let permanent = if let Some(crypt) = crypt {
            store
                .key_manager
                .get_keys(
                    obj_id,
                    crypt.as_ref(),
                    &mut Some(async || store.get_keys(obj_id).await),
                    /* permanent= */ true,
                    /* force= */ false,
                )
                .await?;
            true
        } else {
            false
        };
        let data_object_handle = DataObjectHandle::new(
            owner.clone(),
            obj_id,
            permanent,
            DEFAULT_DATA_ATTRIBUTE_ID,
            size,
            FsverityState::None,
            options,
            false,
            &overwrite_ranges,
        );
        if let Some(descriptor) = fsverity_descriptor {
            match data_object_handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await? {
                None => {
                    return Err(anyhow!(FxfsError::NotFound));
                }
                Some(data) => {
                    data_object_handle
                        .set_fsverity_state_some(descriptor, data)
                        .context("Invalid or mismatched merkle tree")?;
                }
            }
        }
        Ok(data_object_handle)
    }

    pub fn create_object_with_id<S: HandleOwner>(
        owner: &Arc<S>,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        options: HandleOptions,
        encryption_options: Option<ObjectEncryptionOptions>,
    ) -> Result<DataObjectHandle<S>, Error> {
        debug_assert!(object_id != INVALID_OBJECT_ID);
        let store = owner.as_ref().as_ref();
        // Don't permit creating unencrypted objects in an encrypted store.  The converse is OK.
        debug_assert!(store.crypt().is_none() || encryption_options.is_some());
        let now = Timestamp::now();
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::object(object_id),
                ObjectValue::file(1, 0, now.clone(), now.clone(), now.clone(), now, 0, None),
            ),
        );
        let mut permanent_keys = false;
        if let Some(ObjectEncryptionOptions { permanent, key_id, key, unwrapped_key }) =
            encryption_options
        {
            permanent_keys = permanent;
            transaction.add(
                store.store_object_id(),
                Mutation::insert_object(
                    ObjectKey::keys(object_id),
                    ObjectValue::keys(vec![(key_id, key)].into()),
                ),
            );
            let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));
            store.key_manager.insert(
                object_id,
                Arc::new(vec![(key_id, Some(cipher))].into()),
                permanent,
            );
        }
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute),
                // This is a new object so nothing has pre-allocated overwrite extents yet.
                ObjectValue::attribute(0, false),
            ),
        );
        Ok(DataObjectHandle::new(
            owner.clone(),
            object_id,
            permanent_keys,
            DEFAULT_DATA_ATTRIBUTE_ID,
            0,
            FsverityState::None,
            options,
            false,
            &[],
        ))
    }

    /// Creates an object in the store.
    ///
    /// If the store is encrypted, the object will be automatically encrypted as well.
    /// If `wrapping_key_id` is set, the new keys will be wrapped with that specific key;
    /// otherwise the default data key is used.
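    ///
    /// # Example
    ///
    /// A sketch assuming `owner` and an open `transaction`; passing `None` for
    /// `wrapping_key_id` selects the default data key:
    ///
    /// ```ignore
    /// let handle =
    ///     ObjectStore::create_object(&owner, &mut transaction, HandleOptions::default(), None)
    ///         .await?;
    /// transaction.commit().await?;
    /// ```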
    pub async fn create_object<S: HandleOwner>(
        owner: &Arc<S>,
        mut transaction: &mut Transaction<'_>,
        options: HandleOptions,
        wrapping_key_id: Option<WrappingKeyId>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
        let crypt = store.crypt();
        let encryption_options = if let Some(crypt) = crypt {
            let key_id =
                if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };
            let (key, unwrapped_key) = if let Some(wrapping_key_id) = wrapping_key_id {
                crypt.create_key_with_id(object_id, wrapping_key_id, ObjectType::File).await?
            } else {
                let (fxfs_key, unwrapped_key) =
                    crypt.create_key(object_id, KeyPurpose::Data).await?;
                (EncryptionKey::Fxfs(fxfs_key), unwrapped_key)
            };
            Some(ObjectEncryptionOptions { permanent: false, key_id, key, unwrapped_key })
        } else {
            None
        };
        ObjectStore::create_object_with_id(
            owner,
            &mut transaction,
            object_id,
            options,
            encryption_options,
        )
    }

    /// Creates an object using explicitly provided keys.
    ///
    /// There are some cases where an encrypted object needs to be created in an unencrypted store.
    /// For example, layer files for a child store are created in the root store, but they must
    /// be encrypted using the child store's keys.  This method exists for that purpose.
    pub(crate) async fn create_object_with_key<S: HandleOwner>(
        owner: &Arc<S>,
        mut transaction: &mut Transaction<'_>,
        object_id: u64,
        options: HandleOptions,
        key: EncryptionKey,
        unwrapped_key: UnwrappedKey,
    ) -> Result<DataObjectHandle<S>, Error> {
        ObjectStore::create_object_with_id(
            owner,
            &mut transaction,
            object_id,
            options,
            Some(ObjectEncryptionOptions {
                permanent: true,
                key_id: VOLUME_DATA_KEY_ID,
                key,
                unwrapped_key,
            }),
        )
    }

    /// Adjusts the reference count for a given object.  If the reference count reaches zero, the
    /// object is moved into the graveyard and true is returned.
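    ///
    /// # Example
    ///
    /// A hedged sketch of dropping a single reference:
    ///
    /// ```ignore
    /// if store.adjust_refs(&mut transaction, object_id, -1).await? {
    ///     // The object is now in the graveyard; it should be tombstoned once the
    ///     // transaction commits.
    /// }
    /// ```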
    pub async fn adjust_refs(
        &self,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        delta: i64,
    ) -> Result<bool, Error> {
        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
        let refs = if let ObjectValue::Object {
            kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
            ..
        } = &mut mutation.item.value
        {
            *refs =
                refs.checked_add_signed(delta).ok_or_else(|| anyhow!("refs underflow/overflow"))?;
            refs
        } else {
            bail!(FxfsError::NotFile);
        };
        if *refs == 0 {
            self.add_to_graveyard(transaction, object_id);

            // We might still need to adjust the reference count if delta was something other than
            // -1.
            if delta != -1 {
                *refs = 1;
                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            }
            // Otherwise, we don't commit the mutation, as we want to keep the reference count at 1
            // for objects in the graveyard.
            Ok(true)
        } else {
            transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            Ok(false)
        }
    }

    // Purges an object that is in the graveyard.
    pub async fn tombstone_object(
        &self,
        object_id: u64,
        txn_options: Options<'_>,
    ) -> Result<(), Error> {
        self.key_manager.remove(object_id).await;
        let fs = self.filesystem();
        let truncate_guard = fs.truncate_guard(self.store_object_id, object_id).await;
        self.trim_or_tombstone(object_id, true, txn_options, &truncate_guard).await
    }

    /// Trim extents beyond the end of a file for all attributes.  This will remove the entry from
    /// the graveyard when done.
    pub async fn trim(
        &self,
        object_id: u64,
        truncate_guard: &TruncateGuard<'_>,
    ) -> Result<(), Error> {
        // For the root and root parent store, we would need to use the metadata reservation which
        // we don't currently support, so assert that we're not those stores.
        assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());

        self.trim_or_tombstone(
            object_id,
            false,
            Options { borrow_metadata_space: true, ..Default::default() },
            truncate_guard,
        )
        .await
    }

    /// Trims or tombstones an object.
    async fn trim_or_tombstone(
        &self,
        object_id: u64,
        for_tombstone: bool,
        txn_options: Options<'_>,
        _truncate_guard: &TruncateGuard<'_>,
    ) -> Result<(), Error> {
        let fs = self.filesystem();
        let mut next_attribute = Some(0);
        while let Some(attribute_id) = next_attribute.take() {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
                        LockKey::object(self.store_object_id, object_id),
                    ],
                    txn_options,
                )
                .await?;

            match self
                .trim_some(
                    &mut transaction,
                    object_id,
                    attribute_id,
                    if for_tombstone {
                        TrimMode::Tombstone(TombstoneMode::Object)
                    } else {
                        TrimMode::UseSize
                    },
                )
                .await?
            {
                TrimResult::Incomplete => next_attribute = Some(attribute_id),
                TrimResult::Done(None) => {
                    if for_tombstone
                        || matches!(
                            self.tree
                                .find(&ObjectKey::graveyard_entry(
                                    self.graveyard_directory_object_id(),
                                    object_id,
                                ))
                                .await?,
                            Some(Item { value: ObjectValue::Trim, .. })
                        )
                    {
                        self.remove_from_graveyard(&mut transaction, object_id);
                    }
                }
                TrimResult::Done(id) => next_attribute = id,
            }

            if !transaction.mutations().is_empty() {
                transaction.commit().await?;
            }
        }
        Ok(())
    }

    // Purges an object's attribute that is in the graveyard.
    pub async fn tombstone_attribute(
        &self,
        object_id: u64,
        attribute_id: u64,
        txn_options: Options<'_>,
    ) -> Result<(), Error> {
        let fs = self.filesystem();
        let mut trim_result = TrimResult::Incomplete;
        while matches!(trim_result, TrimResult::Incomplete) {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
                        LockKey::object(self.store_object_id, object_id),
                    ],
                    txn_options,
                )
                .await?;
            trim_result = self
                .trim_some(
                    &mut transaction,
                    object_id,
                    attribute_id,
                    TrimMode::Tombstone(TombstoneMode::Attribute),
                )
                .await?;
            if let TrimResult::Done(..) = trim_result {
                self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
            }
            if !transaction.mutations().is_empty() {
                transaction.commit().await?;
            }
        }
        Ok(())
    }

    /// Deletes extents for attribute `attribute_id` in object `object_id`.  Also see the comments
    /// for TrimMode and TrimResult.  Callers should hold locks on both the attribute and the
    /// object, since this performs a read-modify-write on the sizes.
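    ///
    /// A hedged sketch of the calling pattern (`trim_or_tombstone` above shows the real loop,
    /// which takes fresh locks and a new transaction per iteration):
    ///
    /// ```ignore
    /// while let TrimResult::Incomplete = store
    ///     .trim_some(&mut transaction, object_id, attribute_id, TrimMode::UseSize)
    ///     .await?
    /// {
    ///     transaction.commit_and_continue().await?;
    /// }
    /// ```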
    pub async fn trim_some(
        &self,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        attribute_id: u64,
        mode: TrimMode,
    ) -> Result<TrimResult, Error> {
        let layer_set = self.tree.layer_set();
        let mut merger = layer_set.merger();

        let aligned_offset = match mode {
            TrimMode::FromOffset(offset) => {
                round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
            }
            TrimMode::Tombstone(..) => 0,
            TrimMode::UseSize => {
                let iter = merger
                    .query(Query::FullRange(&ObjectKey::attribute(
                        object_id,
                        attribute_id,
                        AttributeKey::Attribute,
                    )))
                    .await?;
                if let Some(item_ref) = iter.get() {
                    if item_ref.key.object_id != object_id {
                        return Ok(TrimResult::Done(None));
                    }

                    if let ItemRef {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
                                ..
                            },
                        value: ObjectValue::Attribute { size, .. },
                        ..
                    } = item_ref
                    {
                        // If we found a different attribute_id, return so we can get the
                        // right lock.
                        if *size_attribute_id != attribute_id {
                            return Ok(TrimResult::Done(Some(*size_attribute_id)));
                        }
                        round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
                    } else {
                        // At time of writing, we should always see a size record or None here, but
1358                        // asserting here would be brittle so just skip to the next attribute
1359                        // instead.
1360                        return Ok(TrimResult::Done(Some(attribute_id + 1)));
1361                    }
1362                } else {
1363                    // End of the tree.
1364                    return Ok(TrimResult::Done(None));
1365                }
1366            }
1367        };
1368
1369        // Loop over the extents and deallocate them.
1370        let mut iter = merger
1371            .query(Query::FullRange(&ObjectKey::from_extent(
1372                object_id,
1373                attribute_id,
1374                ExtentKey::search_key_from_offset(aligned_offset),
1375            )))
1376            .await?;
1377        let mut end = 0;
1378        let allocator = self.allocator();
1379        let mut result = TrimResult::Done(None);
1380        let mut deallocated = 0;
1381        let block_size = self.block_size;
1382
1383        while let Some(item_ref) = iter.get() {
1384            if item_ref.key.object_id != object_id {
1385                break;
1386            }
1387            if let ObjectKey {
1388                data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
1389                ..
1390            } = item_ref.key
1391            {
1392                if *extent_attribute_id != attribute_id {
1393                    result = TrimResult::Done(Some(*extent_attribute_id));
1394                    break;
1395                }
1396                if let (
1397                    AttributeKey::Extent(ExtentKey { range }),
1398                    ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1399                ) = (attribute_key, item_ref.value)
1400                {
1401                    let start = std::cmp::max(range.start, aligned_offset);
1402                    ensure!(start < range.end, FxfsError::Inconsistent);
1403                    let device_offset = device_offset
1404                        .checked_add(start - range.start)
1405                        .ok_or(FxfsError::Inconsistent)?;
1406                    end = range.end;
1407                    let len = end - start;
1408                    let device_range = device_offset..device_offset + len;
1409                    ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
1410                    allocator.deallocate(transaction, self.store_object_id, device_range).await?;
1411                    deallocated += len;
1412                    // Stop if the transaction is getting too big.
1413                    if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
1414                        result = TrimResult::Incomplete;
1415                        break;
1416                    }
1417                }
1418            }
1419            iter.advance().await?;
1420        }
1421
1422        let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
1423            && matches!(result, TrimResult::Done(None));
1424        let finished_tombstone_attribute =
1425            matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
1426                && !matches!(result, TrimResult::Incomplete);
1427        let mut object_mutation = None;
1428        let nodes = if finished_tombstone_object { -1 } else { 0 };
1429        if nodes != 0 || deallocated != 0 {
1430            let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1431            if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
1432                mutation.item.value
1433            {
1434                if project_id != 0 {
1435                    transaction.add(
1436                        self.store_object_id,
1437                        Mutation::merge_object(
1438                            ObjectKey::project_usage(self.root_directory_object_id(), project_id),
1439                            ObjectValue::BytesAndNodes {
1440                                bytes: -i64::try_from(deallocated).unwrap(),
1441                                nodes,
1442                            },
1443                        ),
1444                    );
1445                }
1446                object_mutation = Some(mutation);
1447            } else {
1448                panic!("Inconsistent object type.");
1449            }
1450        }
1451
1452        // Deletion marker records *must* be merged so as to consume all other records for the
1453        // object.
1454        if finished_tombstone_object {
1455            transaction.add(
1456                self.store_object_id,
1457                Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
1458            );
1459        } else {
1460            if finished_tombstone_attribute {
1461                transaction.add(
1462                    self.store_object_id,
1463                    Mutation::merge_object(
1464                        ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
1465                        ObjectValue::None,
1466                    ),
1467                );
1468            }
1469            if deallocated > 0 {
1470                let mut mutation = match object_mutation {
1471                    Some(mutation) => mutation,
1472                    None => self.txn_get_object_mutation(transaction, object_id).await?,
1473                };
1474                transaction.add(
1475                    self.store_object_id,
1476                    Mutation::merge_object(
1477                        ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
1478                        ObjectValue::deleted_extent(),
1479                    ),
1480                );
1481                // Update allocated size.
1482                if let ObjectValue::Object {
1483                    attributes: ObjectAttributes { allocated_size, .. },
1484                    ..
1485                } = &mut mutation.item.value
1486                {
1487                    // The only way for this to fail is if the volume is inconsistent.
1488                    *allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
1489                        anyhow!(FxfsError::Inconsistent).context("Allocated size overflow")
1490                    })?;
1491                } else {
1492                    panic!("Unexpected object value");
1493                }
1494                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1495            }
1496        }
1497        Ok(result)
1498    }
1499
1500    /// Returns all objects that exist in the parent store that pertain to this object store.
1501    /// Note that this doesn't include the object_id of the store itself, which is generally
1502    /// referenced externally.
1503    pub fn parent_objects(&self) -> Vec<u64> {
1504        assert!(self.store_info_handle.get().is_some());
1505        self.store_info.lock().as_ref().unwrap().parent_objects()
1506    }
1507
1508    /// Returns root objects for this store.
1509    pub fn root_objects(&self) -> Vec<u64> {
1510        let mut objects = Vec::new();
1511        let store_info = self.store_info.lock();
1512        let info = store_info.as_ref().unwrap();
1513        if info.root_directory_object_id != INVALID_OBJECT_ID {
1514            objects.push(info.root_directory_object_id);
1515        }
1516        if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
1517            objects.push(info.graveyard_directory_object_id);
1518        }
1519        if info.internal_directory_object_id != INVALID_OBJECT_ID {
1520            objects.push(info.internal_directory_object_id);
1521        }
1522        objects
1523    }
1524
1525    pub fn store_info(&self) -> Option<StoreInfo> {
1526        self.store_info.lock().as_ref().cloned()
1527    }
1528
1529    /// Returns None if called during journal replay.
1530    pub fn store_info_handle_object_id(&self) -> Option<u64> {
1531        self.store_info_handle.get().map(|h| h.object_id())
1532    }
1533
1534    /// Called to open a store, before replay of this store's mutations.
1535    async fn open(
1536        parent_store: &Arc<ObjectStore>,
1537        store_object_id: u64,
1538        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
1539    ) -> Result<Arc<ObjectStore>, Error> {
1540        let handle =
1541            ObjectStore::open_object(parent_store, store_object_id, HandleOptions::default(), None)
1542                .await?;
1543
1544        let info = load_store_info(parent_store, store_object_id).await?;
1545        let is_encrypted = info.mutations_key.is_some();
1546
1547        let mut total_layer_size = 0;
1548        let last_object_id;
1549
1550        // TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.
1551
1552        // If the store is encrypted, we can't open the object tree layers now, but we need to
1553        // compute the size of the layers.
1554        if is_encrypted {
1555            for &oid in &info.layers {
1556                total_layer_size += parent_store.get_file_size(oid).await?;
1557            }
1558            if info.encrypted_mutations_object_id != INVALID_OBJECT_ID {
1559                total_layer_size += layer_size_from_encrypted_mutations_size(
1560                    parent_store.get_file_size(info.encrypted_mutations_object_id).await?,
1561                );
1562            }
1563            last_object_id = LastObjectId::default();
1564        } else {
1565            last_object_id = LastObjectId { id: info.last_object_id, cipher: None };
1566        }
1567
1568        let fs = parent_store.filesystem();
1569
1570        let store = ObjectStore::new(
1571            Some(parent_store.clone()),
1572            store_object_id,
1573            fs.clone(),
1574            if is_encrypted { None } else { Some(info) },
1575            object_cache,
1576            None,
1577            if is_encrypted { LockState::Locked } else { LockState::Unencrypted },
1578            last_object_id,
1579        );
1580
1581        assert!(store.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");
1582
1583        if !is_encrypted {
1584            let object_tree_layer_object_ids =
1585                store.store_info.lock().as_ref().unwrap().layers.clone();
1586            let object_layers = store.open_layers(object_tree_layer_object_ids, None).await?;
1587            total_layer_size = object_layers.iter().map(|h| h.get_size()).sum();
1588            store
1589                .tree
1590                .append_layers(object_layers)
1591                .await
1592                .context("Failed to read object store layers")?;
1593        }
1594
1595        fs.object_manager().update_reservation(
1596            store_object_id,
1597            tree::reservation_amount_from_layer_size(total_layer_size),
1598        );
1599
1600        Ok(store)
1601    }
1602
1603    async fn load_store_info(&self) -> Result<StoreInfo, Error> {
1604        load_store_info(self.parent_store.as_ref().unwrap(), self.store_object_id).await
1605    }
1606
1607    async fn open_layers(
1608        &self,
1609        object_ids: impl std::iter::IntoIterator<Item = u64>,
1610        crypt: Option<Arc<dyn Crypt>>,
1611    ) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
1612        let parent_store = self.parent_store.as_ref().unwrap();
1613        let mut handles = Vec::new();
1614        for object_id in object_ids {
1615            let handle = ObjectStore::open_object(
1616                &parent_store,
1617                object_id,
1618                HandleOptions::default(),
1619                crypt.clone(),
1620            )
1621            .await
1622            .with_context(|| format!("Failed to open layer file {}", object_id))?;
1623            handles.push(handle);
1624        }
1625        Ok(handles)
1626    }
1627
1628    /// Unlocks a store so that it is ready to be used.
1629    /// This is not thread-safe.
1630    pub async fn unlock(
1631        self: &Arc<Self>,
1632        owner: Weak<dyn StoreOwner>,
1633        crypt: Arc<dyn Crypt>,
1634    ) -> Result<(), Error> {
1635        self.unlock_inner(owner, crypt, /*read_only=*/ false).await
1636    }
1637
1638    /// Unlocks a store so that it is ready to be read from.
1639    /// The store will generally behave like it is still locked: when flushed, the store will
1640    /// write out its mutations into the encrypted mutations file, rather than directly updating
1641    /// the layer files of the object store.
1642    /// Re-locking the store (which *must* be done with `Self::lock_read_only`) will not trigger a
1643    /// flush, although the store might still be flushed during other operations.
1644    /// This is not thread-safe.
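    ///
    /// A hedged usage sketch (crypt construction elided):
    ///
    ///     store.unlock_read_only(crypt.clone()).await?;
    ///     // ... read-only access ...
    ///     store.lock_read_only();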
1645    pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
1646        self.unlock_inner(NO_OWNER, crypt, /*read_only=*/ true).await
1647    }
1648
1649    async fn unlock_inner(
1650        self: &Arc<Self>,
1651        owner: Weak<dyn StoreOwner>,
1652        crypt: Arc<dyn Crypt>,
1653        read_only: bool,
1654    ) -> Result<(), Error> {
1655        // Unless we are unlocking the store as read-only, the filesystem must not be read-only.
1656        assert!(read_only || !self.filesystem().options().read_only);
1657        match &*self.lock_state.lock() {
1658            LockState::Locked => {}
1659            LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
1660            LockState::Invalid | LockState::Deleted => bail!(FxfsError::Internal),
1661            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) => {
1662                bail!(FxfsError::AlreadyBound)
1663            }
1664            LockState::Unknown => panic!("Store was unlocked before replay"),
1665            LockState::Locking => panic!("Store is being locked"),
1666            LockState::Unlocking => panic!("Store is being unlocked"),
1667        }
1668        // We must lock flushing since that can modify store_info and the encrypted mutations file.
1669        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1670        let fs = self.filesystem();
1671        let guard = fs.lock_manager().write_lock(keys).await;
1672
1673        let store_info = self.load_store_info().await?;
1674
1675        self.tree
1676            .append_layers(
1677                self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
1678            )
1679            .await
1680            .context("Failed to read object tree layer file contents")?;
1681
1682        let wrapped_key =
1683            fxfs_crypto::WrappedKey::Fxfs(store_info.mutations_key.clone().unwrap().into());
1684        let unwrapped_key = crypt
1685            .unwrap_key(&wrapped_key, self.store_object_id)
1686            .await
1687            .context("Failed to unwrap mutations keys")?;
1688        // The ChaCha20 stream cipher we use supports up to 64 GiB.  By default we'll roll the key
1689        // after every 128 MiB.  Here we just need to pick a number that won't cause issues if it
1690        // wraps, so we just use u32::MAX (the offset is u64).
1691        ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
1692        let mut mutations_cipher =
1693            StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);
1694
1695        let wrapped_key = fxfs_crypto::WrappedKey::Fxfs(
1696            store_info.object_id_key.clone().ok_or(FxfsError::Inconsistent)?.into(),
1697        );
1698        let object_id_cipher =
1699            Ff1::new(&crypt.unwrap_key(&wrapped_key, self.store_object_id).await?);
1700        {
1701            let mut last_object_id = self.last_object_id.lock();
1702            last_object_id.cipher = Some(object_id_cipher);
1703        }
1704        self.update_last_object_id(store_info.last_object_id);
1705
1706        // Apply the encrypted mutations.
1707        let mut mutations = {
1708            if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
1709                EncryptedMutations::default()
1710            } else {
1711                let parent_store = self.parent_store.as_ref().unwrap();
1712                let handle = ObjectStore::open_object(
1713                    &parent_store,
1714                    store_info.encrypted_mutations_object_id,
1715                    HandleOptions::default(),
1716                    None,
1717                )
1718                .await?;
1719                let mut cursor = std::io::Cursor::new(
1720                    handle
1721                        .contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
1722                        .await
1723                        .context(FxfsError::Inconsistent)?,
1724                );
1725                let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
1726                    .context("Failed to deserialize EncryptedMutations")?
1727                    .0;
1728                let len = cursor.get_ref().len() as u64;
1729                while cursor.position() < len {
1730                    mutations.extend(
1731                        &EncryptedMutations::deserialize_with_version(&mut cursor)
1732                            .context("Failed to deserialize EncryptedMutations")?
1733                            .0,
1734                    );
1735                }
1736                mutations
1737            }
1738        };
1739
1740        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
1741        let journaled = EncryptedMutations::from_replayed_mutations(
1742            self.store_object_id,
1743            fs.journal()
1744                .read_transactions_for_object(self.store_object_id)
1745                .await
1746                .context("Failed to read encrypted mutations from journal")?,
1747        );
1748        mutations.extend(&journaled);
1749
1750        let _ = std::mem::replace(&mut *self.lock_state.lock(), LockState::Unlocking);
1751        *self.store_info.lock() = Some(store_info);
1752
1753        // If we fail, clean up.
1754        let clean_up = scopeguard::guard((), |_| {
1755            *self.lock_state.lock() = LockState::Locked;
1756            *self.store_info.lock() = None;
1757            // Make sure we don't leave unencrypted data lying around in memory.
1758            self.tree.reset();
1759        });
1760
1761        let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;
1762
1763        let mut slice = &mut data[..];
1764        let mut last_offset = 0;
1765        for (offset, key) in mutations_key_roll {
1766            let split_offset = offset
1767                .checked_sub(last_offset)
1768                .ok_or(FxfsError::Inconsistent)
1769                .context("Invalid mutation key roll offset")?;
1770            last_offset = offset;
1771            ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
1772            let (old, new) = slice.split_at_mut(split_offset);
1773            mutations_cipher.decrypt(old);
1774            let unwrapped_key = crypt
1775                .unwrap_key(&fxfs_crypto::WrappedKey::Fxfs(key.into()), self.store_object_id)
1776                .await
1777                .context("Failed to unwrap mutations keys")?;
1778            mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
1779            slice = new;
1780        }
1781        mutations_cipher.decrypt(slice);
1782
1783        // Always roll the mutations key when we unlock, which guarantees we won't reuse a
1784        // previous key and nonce.
1785        self.roll_mutations_key(crypt.as_ref()).await?;
1786
1787        let mut cursor = std::io::Cursor::new(data);
1788        for (checkpoint, count) in transactions {
1789            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
1790            for _ in 0..count {
1791                let mutation =
1792                    Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
1793                        .context("failed to deserialize encrypted mutation")?;
1794                self.apply_mutation(mutation, &context, AssocObj::None)
1795                    .context("failed to apply encrypted mutation")?;
1796            }
1797        }
1798
1799        *self.lock_state.lock() = if read_only {
1800            LockState::UnlockedReadOnly(crypt)
1801        } else {
1802            LockState::Unlocked { owner, crypt }
1803        };
1804
1805        // To avoid unbounded memory growth, we should flush the encrypted mutations now. Otherwise
1806        // it's possible for more writes to be queued and for the store to be locked before we can
1807        // flush anything, and that can repeat.
1808        std::mem::drop(guard);
1809
1810        if !read_only && !self.filesystem().options().read_only {
1811            self.flush_with_reason(flush::Reason::Unlock).await?;
1812
1813            // Reap purged files within this store.
1814            let _ = self.filesystem().graveyard().initial_reap(&self).await?;
1815        }
1816
1817        // Return and cancel the clean up.
1818        Ok(ScopeGuard::into_inner(clean_up))
1819    }
1820
1821    pub fn is_locked(&self) -> bool {
1822        matches!(
1823            *self.lock_state.lock(),
1824            LockState::Locked | LockState::Locking | LockState::Unknown
1825        )
1826    }
1827
1828    /// NB: This is not the converse of `is_locked`, as there are lock states where neither is
1829    /// true.
1830    pub fn is_unlocked(&self) -> bool {
1831        matches!(
1832            *self.lock_state.lock(),
1833            LockState::Unlocked { .. } | LockState::UnlockedReadOnly { .. } | LockState::Unlocking
1834        )
1835    }
1836
1837    pub fn is_unknown(&self) -> bool {
1838        matches!(*self.lock_state.lock(), LockState::Unknown)
1839    }
1840
1841    pub fn is_encrypted(&self) -> bool {
1842        self.store_info.lock().as_ref().unwrap().mutations_key.is_some()
1843    }
1844
1845    // Locks a store.
1846    // This operation will take a flush lock on the store, in case any flushes are ongoing.  Any
1847    // ongoing store accesses might be interrupted by this.  See `Self::crypt`.
1848    // Whilst this can return an error, the store will be placed into an unusable but safe state
1849    // (i.e. no lingering unencrypted data) if an error is encountered.
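    //
    // A hedged usage sketch (error handling elided; `Self::unlock` is the reverse operation):
    //
    //     store.lock().await?;  // Syncs the journal, then drops store_info and the tree.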
1850    pub async fn lock(&self) -> Result<(), Error> {
1851        // We must lock flushing since it is not safe for that to be happening whilst we are locking
1852        // the store.
1853        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1854        let fs = self.filesystem();
1855        let _guard = fs.lock_manager().write_lock(keys).await;
1856
1857        {
1858            let mut lock_state = self.lock_state.lock();
1859            if let LockState::Unlocked { .. } = &*lock_state {
1860                *lock_state = LockState::Locking;
1861            } else {
1862                panic!("Unexpected lock state: {:?}", &*lock_state);
1863            }
1864        }
1865
1866        // Sync the journal now to ensure that any buffered mutations for this store make it out to
1867        // disk.  This is necessary to be able to unlock the store again.
1868        // We need to establish a barrier at this point (so that the journaled writes are observable
1869        // by any future attempts to unlock the store), hence the flush_device.
1870        let sync_result =
1871            self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;
1872
1873        *self.lock_state.lock() = if let Err(error) = &sync_result {
1874            error!(error:?; "Failed to sync journal; store will no longer be usable");
1875            LockState::Invalid
1876        } else {
1877            LockState::Locked
1878        };
1879        self.key_manager.clear();
1880        *self.store_info.lock() = None;
1881        self.tree.reset();
1882
1883        sync_result
1884    }
1885
1886    // Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`).  Data
1887    // is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
1888    // and will be replayed next time the store is unlocked.
1889    pub fn lock_read_only(&self) {
1890        *self.lock_state.lock() = LockState::Locked;
1891        *self.store_info.lock() = None;
1892        self.tree.reset();
1893    }
1894
1895    // Returns INVALID_OBJECT_ID if the object ID cipher needs to be created or rolled.
1896    pub(super) fn maybe_get_next_object_id(&self) -> u64 {
1897        let mut last_object_id = self.last_object_id.lock();
1898        if last_object_id.should_create_cipher() {
1899            INVALID_OBJECT_ID
1900        } else {
1901            last_object_id.get_next_object_id()
1902        }
1903    }
1904
1905    /// Returns a new object ID that can be used.  This will create an object ID cipher if needed.
1906    ///
1907    /// If the object ID key needs to be rolled, a new transaction will be created and committed.
1908    /// This transaction does not take the filesystem lock, hence `txn_guard`.
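    ///
    /// A hedged usage sketch (it assumes the caller already holds a `TxnGuard`, e.g. from an
    /// open transaction):
    ///
    ///     let object_id = store.get_next_object_id(&txn_guard).await?;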
1909    pub async fn get_next_object_id(&self, txn_guard: &TxnGuard<'_>) -> Result<u64, Error> {
1910        let object_id = self.maybe_get_next_object_id();
1911        if object_id != INVALID_OBJECT_ID {
1912            return Ok(object_id);
1913        }
1914
1915        // Create a transaction (which has a lock) and then check again.
1916        let mut transaction = self
1917            .filesystem()
1918            .new_transaction(
1919                lock_keys![LockKey::object(
1920                    self.parent_store.as_ref().unwrap().store_object_id,
1921                    self.store_object_id,
1922                )],
1923                Options {
1924                    // We must skip journal checks because this transaction might be needed to
1925                    // compact.
1926                    skip_journal_checks: true,
1927                    borrow_metadata_space: true,
1928                    txn_guard: Some(txn_guard),
1929                    ..Default::default()
1930                },
1931            )
1932            .await?;
1933
1934        {
1935            let mut last_object_id = self.last_object_id.lock();
1936            if !last_object_id.should_create_cipher() {
1937                // We lost a race.
1938                return Ok(last_object_id.get_next_object_id());
1939            }
1940            // It shouldn't be possible for last_object_id to wrap within our lifetime, so if this
1941            // happens, it's most likely due to corruption.
1942            ensure!(
1943                last_object_id.id & OBJECT_ID_HI_MASK != OBJECT_ID_HI_MASK,
1944                FxfsError::Inconsistent
1945            );
1946        }
1947
1948        // Create a key.
1949        let (object_id_wrapped, object_id_unwrapped) =
1950            self.crypt().unwrap().create_key(self.store_object_id, KeyPurpose::Metadata).await?;
1951
1952        // Update StoreInfo.
1953        let buf = {
1954            let serialized_info = {
1955                let mut store_info = self.store_info.lock();
1956                let store_info = store_info.as_mut().unwrap();
1957                store_info.object_id_key = Some(object_id_wrapped);
1958                let mut serialized_info = Vec::new();
1959                store_info.serialize_with_version(&mut serialized_info)?;
1960                serialized_info
1961            };
1962            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
1963            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
1964            buf
1965        };
1966
1967        self.store_info_handle
1968            .get()
1969            .unwrap()
1970            .txn_write(&mut transaction, 0u64, buf.as_ref())
1971            .await?;
1972        transaction.commit().await?;
1973
1974        let mut last_object_id = self.last_object_id.lock();
1975        last_object_id.cipher = Some(Ff1::new(&object_id_unwrapped));
1976        last_object_id.id = (last_object_id.id + (1 << 32)) & OBJECT_ID_HI_MASK;
1977
1978        Ok((last_object_id.id & OBJECT_ID_HI_MASK)
1979            | last_object_id.cipher.as_ref().unwrap().encrypt(last_object_id.id as u32) as u64)
1980    }
1981
1982    /// Query the next object ID that will be used. Intended for use when checking filesystem
1983    /// consistency. Prefer [`Self::get_next_object_id()`] for general use.
1984    pub(crate) fn query_next_object_id(&self) -> u64 {
1985        let mut last_object_id = self.last_object_id.lock().clone();
1986        if last_object_id.should_create_cipher() {
1987            INVALID_OBJECT_ID
1988        } else {
1989            last_object_id.get_next_object_id()
1990        }
1991    }
1992
1993    fn allocator(&self) -> Arc<Allocator> {
1994        self.filesystem().allocator()
1995    }
1996
1997    // If |transaction| has an impending mutation for the underlying object, returns that.
1998    // Otherwise, looks up the object from the tree and returns a suitable mutation for it.  The
1999    // mutation is returned here rather than the item because the mutation includes the operation
2000    // which has significance: inserting an object implies it's the first of its kind unlike
2001    // replacing an object.
2002    async fn txn_get_object_mutation(
2003        &self,
2004        transaction: &Transaction<'_>,
2005        object_id: u64,
2006    ) -> Result<ObjectStoreMutation, Error> {
2007        if let Some(mutation) =
2008            transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
2009        {
2010            Ok(mutation.clone())
2011        } else {
2012            Ok(ObjectStoreMutation {
2013                item: self
2014                    .tree
2015                    .find(&ObjectKey::object(object_id))
2016                    .await?
2017                    .ok_or(FxfsError::Inconsistent)
2018                    .context("Object id missing")?,
2019                op: Operation::ReplaceOrInsert,
2020            })
2021        }
2022    }
2023
2024    /// Like txn_get_object_mutation but with expanded visibility.
2025    /// Only available in migration code.
2026    #[cfg(feature = "migration")]
2027    pub async fn get_object_mutation(
2028        &self,
2029        transaction: &Transaction<'_>,
2030        object_id: u64,
2031    ) -> Result<ObjectStoreMutation, Error> {
2032        self.txn_get_object_mutation(transaction, object_id).await
2033    }
2034
2035    fn update_last_object_id(&self, mut object_id: u64) {
2036        let mut last_object_id = self.last_object_id.lock();
2037        // For encrypted stores, object_id will be encrypted here, so we must decrypt first.
2038        if let Some(cipher) = &last_object_id.cipher {
2039            // If the object ID cipher has been rolled, then it's possible we might see object IDs
2040            // that were generated using a different cipher so the decrypt here will return the
2041            // wrong value, but that won't matter because the hi part of the object ID should still
2042            // discriminate.
2043            object_id = object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
2044        }
2045        if object_id > last_object_id.id {
2046            last_object_id.id = object_id;
2047        }
2048    }
2049
2050    /// Adds the specified object to the graveyard.
2051    pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2052        let graveyard_id = self.graveyard_directory_object_id();
2053        assert_ne!(graveyard_id, INVALID_OBJECT_ID);
2054        transaction.add(
2055            self.store_object_id,
2056            Mutation::replace_or_insert_object(
2057                ObjectKey::graveyard_entry(graveyard_id, object_id),
2058                ObjectValue::Some,
2059            ),
2060        );
2061    }
2062
2063    /// Removes the specified object from the graveyard.  NB: Care should be taken when calling
2064    /// this because graveyard entries are used for purging deleted files *and* for trimming
2065    /// extents.  For example, consider the following sequence:
2066    ///
2067    ///     1. Add Trim graveyard entry.
2068    ///     2. Replace with Some graveyard entry (see above).
2069    ///     3. Remove graveyard entry.
2070    ///
2071    /// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
2072    /// actually be:
2073    ///
2074    ///     3. Replace with Trim graveyard entry.
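    ///
    /// A hedged sketch of that replacement, assuming `ObjectValue::Trim` is the value used for
    /// trim entries (as in step 1):
    ///
    ///     transaction.add(
    ///         store.store_object_id(),
    ///         Mutation::replace_or_insert_object(
    ///             ObjectKey::graveyard_entry(store.graveyard_directory_object_id(), object_id),
    ///             ObjectValue::Trim,
    ///         ),
    ///     );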
2075    pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2076        transaction.add(
2077            self.store_object_id,
2078            Mutation::replace_or_insert_object(
2079                ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
2080                ObjectValue::None,
2081            ),
2082        );
2083    }
2084
2085    /// Removes the specified attribute from the graveyard. Unlike object graveyard entries,
2086    /// attribute graveyard entries serve a single purpose (purging deleted attributes), so the
2087    /// caller does not need to be concerned about restoring the entry's prior state when
2088    /// cancelling it. See the comment on `remove_from_graveyard()`.
2089    pub fn remove_attribute_from_graveyard(
2090        &self,
2091        transaction: &mut Transaction<'_>,
2092        object_id: u64,
2093        attribute_id: u64,
2094    ) {
2095        transaction.add(
2096            self.store_object_id,
2097            Mutation::replace_or_insert_object(
2098                ObjectKey::graveyard_attribute_entry(
2099                    self.graveyard_directory_object_id(),
2100                    object_id,
2101                    attribute_id,
2102                ),
2103                ObjectValue::None,
2104            ),
2105        );
2106    }
2107
2108    // Roll the mutations key.  The new key will be written for the next encrypted mutation.
2109    async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
2110        let (wrapped_key, unwrapped_key) =
2111            crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
2112
2113        // The mutations_cipher lock must be held for the duration so that mutations_cipher and
2114        // store_info are updated atomically.  Otherwise, write_mutation could find a new cipher but
2115        // end up writing the wrong wrapped key.
2116        let mut cipher = self.mutations_cipher.lock();
2117        *cipher = Some(StreamCipher::new(&unwrapped_key, 0));
2118        self.store_info.lock().as_mut().unwrap().mutations_key = Some(wrapped_key);
2119        // mutations_cipher_offset is updated by flush.
2120        Ok(())
2121    }
2122
2123    // When the symlink is unlocked, this function decrypts `link` and returns a bag of bytes that
2124    // is identical to that which was passed in as the target on `create_symlink`.
2125    // If the symlink is locked, this function hashes the encrypted `link` with Sha256 in order to
2126    // get a standard length and then base64 encodes the hash and returns that to the caller.
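    //
    // A hedged usage sketch (`raw_link` stands for the stored, possibly-encrypted target bytes):
    //
    //     let target = store.read_encrypted_symlink(object_id, raw_link).await?;
    //     // Unlocked: `target` is the original symlink target.
    //     // Locked: `target` is a proxy filename derived from the encrypted bytes.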
2127    pub async fn read_encrypted_symlink(
2128        &self,
2129        object_id: u64,
2130        link: Vec<u8>,
2131    ) -> Result<Vec<u8>, Error> {
2132        let mut link = link;
2133        let key = self
2134            .key_manager()
2135            .get_fscrypt_key(object_id, self.crypt().unwrap().as_ref(), async || {
2136                self.get_keys(object_id).await
2137            })
2138            .await?;
2139        if let Some(key) = key {
2140            key.decrypt_filename(object_id, &mut link)?;
2141            Ok(link)
2142        } else {
2143            let proxy_filename = fscrypt::proxy_filename::ProxyFilename::new(0, &link);
2144            let proxy_filename_str: String = proxy_filename.into();
2145            Ok(proxy_filename_str.as_bytes().to_vec())
2146        }
2147    }
2148
2149    /// Returns the link of a symlink object.
2150    pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
2151        match self.tree.find(&ObjectKey::object(object_id)).await? {
2152            None => bail!(FxfsError::NotFound),
2153            Some(Item {
2154                value: ObjectValue::Object { kind: ObjectKind::EncryptedSymlink { link, .. }, .. },
2155                ..
2156            }) => self.read_encrypted_symlink(object_id, link.to_vec()).await,
2157            Some(Item {
2158                value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
2159                ..
2160            }) => Ok(link.to_vec()),
2161            Some(item) => Err(anyhow!(FxfsError::Inconsistent)
2162                .context(format!("Unexpected item in lookup: {item:?}"))),
2163        }
2164    }
2165
2166    /// Retrieves the wrapped keys for the given object.  The keys *should* be known to exist and it
2167    /// will be considered an inconsistency if they don't.
2168    pub async fn get_keys(&self, object_id: u64) -> Result<EncryptionKeys, Error> {
2169        match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::Inconsistent)? {
2170            Item { value: ObjectValue::Keys(keys), .. } => Ok(keys),
2171            _ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
2172        }
2173    }
2174
2175    pub async fn update_attributes<'a>(
2176        &self,
2177        transaction: &mut Transaction<'a>,
2178        object_id: u64,
2179        node_attributes: Option<&fio::MutableNodeAttributes>,
2180        change_time: Option<Timestamp>,
2181    ) -> Result<(), Error> {
2182        if change_time.is_none() {
2183            if let Some(attributes) = node_attributes {
2184                let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
2185                if *attributes == empty_attributes {
2186                    return Ok(());
2187                }
2188            } else {
2189                return Ok(());
2190            }
2191        }
2192        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
2193        if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
2194            if let Some(time) = change_time {
2195                attributes.change_time = time;
2196            }
2197            if let Some(node_attributes) = node_attributes {
2198                if let Some(time) = node_attributes.creation_time {
2199                    attributes.creation_time = Timestamp::from_nanos(time);
2200                }
2201                if let Some(time) = node_attributes.modification_time {
2202                    attributes.modification_time = Timestamp::from_nanos(time);
2203                }
2204                if let Some(time) = node_attributes.access_time {
2205                    attributes.access_time = Timestamp::from_nanos(time);
2206                }
2207                if node_attributes.mode.is_some()
2208                    || node_attributes.uid.is_some()
2209                    || node_attributes.gid.is_some()
2210                    || node_attributes.rdev.is_some()
2211                {
2212                    if let Some(a) = &mut attributes.posix_attributes {
2213                        if let Some(mode) = node_attributes.mode {
2214                            a.mode = mode;
2215                        }
2216                        if let Some(uid) = node_attributes.uid {
2217                            a.uid = uid;
2218                        }
2219                        if let Some(gid) = node_attributes.gid {
2220                            a.gid = gid;
2221                        }
2222                        if let Some(rdev) = node_attributes.rdev {
2223                            a.rdev = rdev;
2224                        }
2225                    } else {
2226                        attributes.posix_attributes = Some(PosixAttributes {
2227                            mode: node_attributes.mode.unwrap_or_default(),
2228                            uid: node_attributes.uid.unwrap_or_default(),
2229                            gid: node_attributes.gid.unwrap_or_default(),
2230                            rdev: node_attributes.rdev.unwrap_or_default(),
2231                        });
2232                    }
2233                }
2234            }
2235        } else {
2236            bail!(
2237                anyhow!(FxfsError::Inconsistent)
2238                    .context("ObjectStore.update_attributes: Expected object value")
2239            );
2240        };
2241        transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
2242        Ok(())
2243    }
2244
2245    // Updates and commits the changes to access time in ObjectProperties. The update matches
2246    // Linux's RELATIME. That is, access time is updated to the current time if access time is less
2247    // than or equal to the last modification or status change, or if it has been more than a day
2248    // since the last access.
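    //
    // A hedged caller sketch (typically run on read paths; `props` is assumed to hold the
    // properties previously fetched for the object):
    //
    //     store.update_access_time(object_id, &mut props).await?;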
2249    pub async fn update_access_time(
2250        &self,
2251        object_id: u64,
2252        props: &mut ObjectProperties,
2253    ) -> Result<(), Error> {
2254        let access_time = props.access_time.as_nanos();
2255        let modification_time = props.modification_time.as_nanos();
2256        let change_time = props.change_time.as_nanos();
2257        let now = Timestamp::now();
2258        if access_time <= modification_time
2259            || access_time <= change_time
2260            || access_time
2261                < now.as_nanos()
2262                    - Timestamp::from(std::time::Duration::from_secs(24 * 60 * 60)).as_nanos()
2263        {
2264            let mut transaction = self
2265                .filesystem()
2266                .clone()
2267                .new_transaction(
2268                    lock_keys![LockKey::object(self.store_object_id, object_id)],
2269                    Options { borrow_metadata_space: true, ..Default::default() },
2270                )
2271                .await?;
2272            self.update_attributes(
2273                &mut transaction,
2274                object_id,
2275                Some(&fio::MutableNodeAttributes {
2276                    access_time: Some(now.as_nanos()),
2277                    ..Default::default()
2278                }),
2279                None,
2280            )
2281            .await?;
2282            transaction.commit().await?;
2283            props.access_time = now;
2284        }
2285        Ok(())
2286    }
2287
2288    async fn write_store_info<'a>(
2289        &'a self,
2290        transaction: &mut Transaction<'a>,
2291        info: &StoreInfo,
2292    ) -> Result<(), Error> {
2293        let mut serialized_info = Vec::new();
2294        info.serialize_with_version(&mut serialized_info)?;
2295        let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
2296        buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
2297        self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
2298    }
2299
2300    pub fn mark_deleted(&self) {
2301        *self.lock_state.lock() = LockState::Deleted;
2302    }
2303}
2304
2305#[async_trait]
2306impl JournalingObject for ObjectStore {
2307    fn apply_mutation(
2308        &self,
2309        mutation: Mutation,
2310        context: &ApplyContext<'_, '_>,
2311        _assoc_obj: AssocObj<'_>,
2312    ) -> Result<(), Error> {
2313        match &*self.lock_state.lock() {
2314            LockState::Locked | LockState::Locking => {
2315                ensure!(
2316                    matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
2317                        || matches!(
2318                            mutation,
2319                            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_)
2320                                if context.mode.is_replay()
2321                        ),
2322                    anyhow!(FxfsError::Inconsistent)
2323                        .context(format!("Unexpected mutation for encrypted store: {mutation:?}"))
2324                );
2325            }
2326            LockState::Invalid
2327            | LockState::Unlocking
2328            | LockState::Unencrypted
2329            | LockState::Unlocked { .. }
2330            | LockState::UnlockedReadOnly(..)
2331            | LockState::Deleted => {}
2332            lock_state @ _ => panic!("Unexpected lock state: {lock_state:?}"),
2333        }
2334        match mutation {
2335            Mutation::ObjectStore(ObjectStoreMutation { mut item, op }) => {
2336                item.sequence = context.checkpoint.file_offset;
2337                match op {
2338                    Operation::Insert => {
2339                        // If we are inserting an object record for the first time, it signifies the
2340                        // birth of the object so we need to adjust the object count.
2341                        if matches!(item.value, ObjectValue::Object { .. }) {
2342                            {
2343                                let info = &mut self.store_info.lock();
2344                                let object_count = &mut info.as_mut().unwrap().object_count;
2345                                *object_count = object_count.saturating_add(1);
2346                            }
2347                            if context.mode.is_replay() {
2348                                self.update_last_object_id(item.key.object_id);
2349                            }
2350                        }
2351                        self.tree.insert(item)?;
2352                    }
2353                    Operation::ReplaceOrInsert => {
2354                        self.tree.replace_or_insert(item);
2355                    }
2356                    Operation::Merge => {
2357                        if item.is_tombstone() {
2358                            let info = &mut self.store_info.lock();
2359                            let object_count = &mut info.as_mut().unwrap().object_count;
2360                            *object_count = object_count.saturating_sub(1);
2361                        }
2362                        let lower_bound = item.key.key_for_merge_into();
2363                        self.tree.merge_into(item, &lower_bound);
2364                    }
2365                }
2366            }
2367            Mutation::BeginFlush => {
2368                ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
2369                self.tree.seal();
2370            }
2371            Mutation::EndFlush => ensure!(self.parent_store.is_some(), FxfsError::Inconsistent),
2372            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2373                // We will process these during Self::unlock.
2374                ensure!(
2375                    !matches!(&*self.lock_state.lock(), LockState::Unencrypted),
2376                    FxfsError::Inconsistent
2377                );
2378            }
2379            Mutation::CreateInternalDir(object_id) => {
2380                ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
2381                self.store_info.lock().as_mut().unwrap().internal_directory_object_id = object_id;
2382            }
2383            _ => bail!("unexpected mutation: {:?}", mutation),
2384        }
2385        self.counters.lock().mutations_applied += 1;
2386        Ok(())
2387    }
2388
2389    fn drop_mutation(&self, _mutation: Mutation, _transaction: &Transaction<'_>) {
2390        self.counters.lock().mutations_dropped += 1;
2391    }
2392
2393    /// Push all in-memory structures to the device. This is not necessary for sync since the
2394    /// journal will take care of it.  This is supposed to be called when there is either memory or
2395    /// space pressure (flushing the store will persist in-memory data and allow the journal file to
2396    /// be trimmed).
2397    ///
2398    /// Also returns the earliest version of a struct in the filesystem (when known).
2399    async fn flush(&self) -> Result<Version, Error> {
2400        self.flush_with_reason(flush::Reason::Journal).await
2401    }
2402
2403    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
2404        // Intentionally enumerating all variants to force a decision on any new variants. Encrypt
2405        // all mutations that could affect an encrypted object store contents or the `StoreInfo` of
2406        // the encrypted object store. During `unlock()` any mutations which haven't been encrypted
2407        // won't be replayed after reading `StoreInfo`.
2408        match mutation {
2409            // Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted, we
2410            // still choose to encrypt the mutation because it makes it easier to deal with replay.
2411            // When we replay mutations for an encrypted store, the only thing we keep in memory are
2412            // the encrypted mutations; we don't keep `StoreInfo` or changes to it in memory. So, by
2413            // encrypting the CreateInternalDir mutation here, we don't have to track both
2414            // encrypted mutations bound for the LSM tree and unencrypted mutations for `StoreInfo`
2415            // to use in `unlock()`. It'll just bundle CreateInternalDir mutations with the other
2416            // encrypted mutations and handle them all in sequence during `unlock()`.
2417            Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
2418                let mut cipher = self.mutations_cipher.lock();
2419                if let Some(cipher) = cipher.as_mut() {
2420                    // If this is the first time we've used this key, we must write the key out.
2421                    if cipher.offset() == 0 {
2422                        writer.write(Mutation::update_mutations_key(
2423                            self.store_info
2424                                .lock()
2425                                .as_ref()
2426                                .unwrap()
2427                                .mutations_key
2428                                .as_ref()
2429                                .unwrap()
2430                                .clone(),
2431                        ));
2432                    }
2433                    let mut buffer = Vec::new();
2434                    mutation.serialize_into(&mut buffer).unwrap();
2435                    cipher.encrypt(&mut buffer);
2436                    writer.write(Mutation::EncryptedObjectStore(buffer.into()));
2437                    return;
2438                }
2439            }
2440            // `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
2441            // encrypted object stores, but are either the encrypted mutation data itself or
2442            // metadata governing how the data will be encrypted. They should only be produced here.
2443            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2444                debug_assert!(false, "Only this method should generate encrypted mutations");
2445            }
2446            // `BeginFlush` and `EndFlush` are not needed during `unlock()` and are needed during
2447            // the initial journal replay, so should not be encrypted. `Allocator`, `DeleteVolume`,
2448            // `UpdateBorrowed` mutations are never associated with an encrypted store as we do not
2449            // encrypt the allocator or root/root-parent stores so we can avoid the locking.
2450            Mutation::Allocator(_)
2451            | Mutation::BeginFlush
2452            | Mutation::EndFlush
2453            | Mutation::DeleteVolume
2454            | Mutation::UpdateBorrowed(_) => {}
2455        }
2456        writer.write(mutation.clone());
2457    }
2458}
2459
2460impl HandleOwner for ObjectStore {}
2461
2462impl AsRef<ObjectStore> for ObjectStore {
2463    fn as_ref(&self) -> &ObjectStore {
2464        self
2465    }
2466}
2467
2468fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
2469    // This is similar to reserved_space_from_journal_usage. It needs to be a worst case estimate of
2470    // the amount of metadata space that might need to be reserved to allow the encrypted mutations
2471    // to be written to layer files.  It needs to be >= what reservation_amount_from_layer_size will
2472    // return once the data has been written to layer files and <= what
2473    // reserved_space_from_journal_usage would use.  We can't just use
2474    // reserved_space_from_journal_usage because the encrypted mutations file includes some extra
2475    // data (it includes the checkpoints) that isn't written in the same way to the journal.
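    // For example, a 4 MiB encrypted mutations file reserves 12 MiB of metadata space under
    // this estimate.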
2476    size * 3
2477}
2478
2479impl AssociatedObject for ObjectStore {}
2480
2481/// Argument to the trim_some method.
2482#[derive(Debug)]
2483pub enum TrimMode {
2484    /// Trim extents beyond the current size.
2485    UseSize,
2486
2487    /// Trim extents beyond the supplied offset.
2488    FromOffset(u64),
2489
2490    /// Remove the object (or attribute) from the store once it is fully trimmed.
2491    Tombstone(TombstoneMode),
2492}
2493
2494/// Sets the mode for tombstoning (either at the object or attribute level).
2495#[derive(Debug)]
2496pub enum TombstoneMode {
2497    Object,
2498    Attribute,
2499}
2500
2501/// Result of the trim_some method.
2502#[derive(Debug)]
2503pub enum TrimResult {
2504    /// We reached the limit of the transaction and more extents might follow.
2505    Incomplete,
2506
2507    /// We finished this attribute.  Returns the ID of the next attribute for the same object if
2508    /// there is one.
2509    Done(Option<u64>),
2510}
2511
2512/// Loads store info.
2513pub async fn load_store_info(
2514    parent: &Arc<ObjectStore>,
2515    store_object_id: u64,
2516) -> Result<StoreInfo, Error> {
2517    let handle =
2518        ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?;
2519
2520    Ok(if handle.get_size() > 0 {
2521        let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
2522        let mut cursor = std::io::Cursor::new(serialized_info);
2523        let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
2524            .context("Failed to deserialize StoreInfo")?;
2525        store_info
2526    } else {
2527        // The store_info will be absent for a newly created and empty object store.
2528        StoreInfo::default()
2529    })
2530}
2531
2532#[cfg(test)]
2533mod tests {
2534    use super::{
2535        DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, FsverityMetadata, HandleOptions,
2536        LastObjectId, LockKey, MAX_STORE_INFO_SERIALIZED_SIZE, Mutation, NO_OWNER,
2537        NewChildStoreOptions, OBJECT_ID_HI_MASK, ObjectStore, RootDigest, StoreInfo, StoreOptions,
2538        StoreOwner,
2539    };
2540    use crate::errors::FxfsError;
2541    use crate::filesystem::{FxFilesystem, JournalingObject, OpenFxFilesystem, SyncOptions};
2542    use crate::fsck::fsck;
2543    use crate::lsm_tree::Query;
2544    use crate::lsm_tree::types::{ItemRef, LayerIterator};
2545    use crate::object_handle::{
2546        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
2547    };
2548    use crate::object_store::directory::Directory;
2549    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue};
2550    use crate::object_store::transaction::{Options, lock_keys};
2551    use crate::object_store::volume::root_volume;
2552    use crate::serialized_types::VersionedLatest;
2553    use assert_matches::assert_matches;
2554    use async_trait::async_trait;
2555    use fuchsia_async as fasync;
2556    use fuchsia_sync::Mutex;
2557    use futures::join;
2558    use fxfs_crypto::ff1::Ff1;
2559    use fxfs_crypto::{
2560        Crypt, FXFS_KEY_SIZE, FXFS_WRAPPED_KEY_SIZE, FxfsKey, UnwrappedKey, WrappedKeyBytes,
2561    };
2562    use fxfs_insecure_crypto::InsecureCrypt;
2563    use std::sync::Arc;
2564    use std::time::Duration;
2565    use storage_device::DeviceHolder;
2566    use storage_device::fake_device::FakeDevice;
2567
2568    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2569
2570    async fn test_filesystem() -> OpenFxFilesystem {
2571        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2572        FxFilesystem::new_empty(device).await.expect("new_empty failed")
2573    }
2574
2575    #[fuchsia::test]
2576    async fn test_item_sequences() {
2577        let fs = test_filesystem().await;
2578        let object1;
2579        let object2;
2580        let object3;
2581        let mut transaction = fs
2582            .clone()
2583            .new_transaction(lock_keys![], Options::default())
2584            .await
2585            .expect("new_transaction failed");
2586        let store = fs.root_store();
2587        object1 = Arc::new(
2588            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2589                .await
2590                .expect("create_object failed"),
2591        );
2592        transaction.commit().await.expect("commit failed");
2593        let mut transaction = fs
2594            .clone()
2595            .new_transaction(lock_keys![], Options::default())
2596            .await
2597            .expect("new_transaction failed");
2598        object2 = Arc::new(
2599            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2600                .await
2601                .expect("create_object failed"),
2602        );
2603        transaction.commit().await.expect("commit failed");
2604
2605        fs.sync(SyncOptions::default()).await.expect("sync failed");
2606
2607        let mut transaction = fs
2608            .clone()
2609            .new_transaction(lock_keys![], Options::default())
2610            .await
2611            .expect("new_transaction failed");
2612        object3 = Arc::new(
2613            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2614                .await
2615                .expect("create_object failed"),
2616        );
2617        transaction.commit().await.expect("commit failed");
2618
2619        let layer_set = store.tree.layer_set();
2620        let mut merger = layer_set.merger();
2621        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
2622        let mut sequences = [0u64; 3];
2623        while let Some(ItemRef { key: ObjectKey { object_id, .. }, sequence, .. }) = iter.get() {
2624            if *object_id == object1.object_id() {
2625                sequences[0] = sequence;
2626            } else if *object_id == object2.object_id() {
2627                sequences[1] = sequence;
2628            } else if *object_id == object3.object_id() {
2629                sequences[2] = sequence;
2630            }
2631            iter.advance().await.expect("advance failed");
2632        }
2633
2634        assert!(sequences[0] <= sequences[1], "sequences: {:?}", sequences);
2635        // The last item came after a sync, so should be strictly greater.
2636        assert!(sequences[1] < sequences[2], "sequences: {:?}", sequences);
2637        fs.close().await.expect("Close failed");
2638    }
2639
2640    #[fuchsia::test]
2641    async fn test_verified_file_with_verified_attribute() {
2642        let fs: OpenFxFilesystem = test_filesystem().await;
2643        let mut transaction = fs
2644            .clone()
2645            .new_transaction(lock_keys![], Options::default())
2646            .await
2647            .expect("new_transaction failed");
2648        let store = fs.root_store();
2649        let object = Arc::new(
2650            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2651                .await
2652                .expect("create_object failed"),
2653        );
2654
2655        transaction.add(
2656            store.store_object_id(),
2657            Mutation::replace_or_insert_object(
2658                ObjectKey::attribute(
2659                    object.object_id(),
2660                    DEFAULT_DATA_ATTRIBUTE_ID,
2661                    AttributeKey::Attribute,
2662                ),
2663                ObjectValue::verified_attribute(
2664                    0,
2665                    FsverityMetadata { root_digest: RootDigest::Sha256([0; 32]), salt: vec![] },
2666                ),
2667            ),
2668        );
2669
2670        transaction.add(
2671            store.store_object_id(),
2672            Mutation::replace_or_insert_object(
2673                ObjectKey::attribute(
2674                    object.object_id(),
2675                    FSVERITY_MERKLE_ATTRIBUTE_ID,
2676                    AttributeKey::Attribute,
2677                ),
2678                ObjectValue::attribute(0, false),
2679            ),
2680        );
2681
2682        transaction.commit().await.unwrap();
2683
2684        let handle =
2685            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2686                .await
2687                .expect("open_object failed");
2688
2689        assert!(handle.is_verified_file());
2690
2691        fs.close().await.expect("Close failed");
2692    }
2693
2694    #[fuchsia::test]
2695    async fn test_verified_file_without_verified_attribute() {
2696        let fs: OpenFxFilesystem = test_filesystem().await;
2697        let mut transaction = fs
2698            .clone()
2699            .new_transaction(lock_keys![], Options::default())
2700            .await
2701            .expect("new_transaction failed");
2702        let store = fs.root_store();
2703        let object = Arc::new(
2704            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2705                .await
2706                .expect("create_object failed"),
2707        );
2708
2709        transaction.commit().await.unwrap();
2710
2711        let handle =
2712            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
2713                .await
2714                .expect("open_object failed");
2715
2716        assert!(!handle.is_verified_file());
2717
2718        fs.close().await.expect("Close failed");
2719    }
2720
2721    #[fuchsia::test]
2722    async fn test_create_and_open_store() {
2723        let fs = test_filesystem().await;
2724        let store_id = {
2725            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2726            root_volume
2727                .new_volume(
2728                    "test",
2729                    NewChildStoreOptions {
2730                        options: StoreOptions {
2731                            owner: NO_OWNER,
2732                            crypt: Some(Arc::new(InsecureCrypt::new())),
2733                        },
2734                        ..Default::default()
2735                    },
2736                )
2737                .await
2738                .expect("new_volume failed")
2739                .store_object_id()
2740        };
2741
2742        fs.close().await.expect("close failed");
2743        let device = fs.take_device().await;
2744        device.reopen(false);
2745        let fs = FxFilesystem::open(device).await.expect("open failed");
2746
2747        {
2748            let store = fs.object_manager().store(store_id).expect("store not found");
2749            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");
2750        }
2751        fs.close().await.expect("Close failed");
2752    }
2753
2754    #[fuchsia::test]
2755    async fn test_create_and_open_internal_dir() {
2756        let fs = test_filesystem().await;
2757        let dir_id;
2758        let store_id;
2759        {
2760            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2761            let store = root_volume
2762                .new_volume(
2763                    "test",
2764                    NewChildStoreOptions {
2765                        options: StoreOptions {
2766                            owner: NO_OWNER,
2767                            crypt: Some(Arc::new(InsecureCrypt::new())),
2768                        },
2769                        ..Default::default()
2770                    },
2771                )
2772                .await
2773                .expect("new_volume failed");
2774            dir_id =
2775                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
2776            store_id = store.store_object_id();
2777        }
2778
2779        fs.close().await.expect("close failed");
2780        let device = fs.take_device().await;
2781        device.reopen(false);
2782        let fs = FxFilesystem::open(device).await.expect("open failed");
2783
2784        {
2785            let store = fs.object_manager().store(store_id).expect("store not found");
2786            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");
2787            assert_eq!(
2788                dir_id,
2789                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
2790            );
2791            let obj = store
2792                .tree()
2793                .find(&ObjectKey::object(dir_id))
2794                .await
2795                .expect("Searching tree for dir")
2796                .unwrap();
2797            assert_matches!(
2798                obj.value,
2799                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
2800            );
2801        }
2802        fs.close().await.expect("Close failed");
2803    }
2804
2805    #[fuchsia::test]
2806    async fn test_create_and_open_internal_dir_unencrypted() {
2807        let fs = test_filesystem().await;
2808        let dir_id;
2809        let store_id;
2810        {
2811            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2812            let store = root_volume
2813                .new_volume("test", NewChildStoreOptions::default())
2814                .await
2815                .expect("new_volume failed");
2816            dir_id =
2817                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
2818            store_id = store.store_object_id();
2819        }
2820
2821        fs.close().await.expect("close failed");
2822        let device = fs.take_device().await;
2823        device.reopen(false);
2824        let fs = FxFilesystem::open(device).await.expect("open failed");
2825
2826        {
2827            let store = fs.object_manager().store(store_id).expect("store not found");
2828            assert_eq!(
2829                dir_id,
2830                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
2831            );
2832            let obj = store
2833                .tree()
2834                .find(&ObjectKey::object(dir_id))
2835                .await
2836                .expect("Searching tree for dir")
2837                .unwrap();
2838            assert_matches!(
2839                obj.value,
2840                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
2841            );
2842        }
2843        fs.close().await.expect("Close failed");
2844    }
2845
2846    #[fuchsia::test(threads = 10)]
2847    async fn test_old_layers_are_purged() {
2848        let fs = test_filesystem().await;
2849
2850        let store = fs.root_store();
2851        let mut transaction = fs
2852            .clone()
2853            .new_transaction(lock_keys![], Options::default())
2854            .await
2855            .expect("new_transaction failed");
2856        let object = Arc::new(
2857            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2858                .await
2859                .expect("create_object failed"),
2860        );
2861        transaction.commit().await.expect("commit failed");
2862
2863        store.flush().await.expect("flush failed");
2864
2865        let mut buf = object.allocate_buffer(5).await;
2866        buf.as_mut_slice().copy_from_slice(b"hello");
2867        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2868
2869        // Getting the layer-set should cause the flush to stall.
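        // (Flush waits for outstanding references to the old layer files to be dropped before
        // purging them.)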
2870        let layer_set = store.tree().layer_set();
2871
2872        let done = Mutex::new(false);
2873        let mut object_id = 0;
2874
2875        join!(
2876            async {
2877                store.flush().await.expect("flush failed");
2878                assert!(*done.lock());
2879            },
2880            async {
2881                // We can't detect the stall directly (it's a halting problem), so all we can do is sleep.
2882                fasync::Timer::new(Duration::from_secs(1)).await;
2883                *done.lock() = true;
2884                object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
2885                std::mem::drop(layer_set);
2886            }
2887        );
2888
2889        if let Err(e) = ObjectStore::open_object(
2890            &store.parent_store.as_ref().unwrap(),
2891            object_id,
2892            HandleOptions::default(),
2893            store.crypt(),
2894        )
2895        .await
2896        {
2897            assert!(FxfsError::NotFound.matches(&e));
2898        } else {
2899            panic!("open_object succeeded");
2900        }
2901    }
2902
2903    #[fuchsia::test]
2904    async fn test_tombstone_deletes_data() {
2905        let fs = test_filesystem().await;
2906        let root_store = fs.root_store();
2907        let child_id = {
2908            let mut transaction = fs
2909                .clone()
2910                .new_transaction(lock_keys![], Options::default())
2911                .await
2912                .expect("new_transaction failed");
2913            let child = ObjectStore::create_object(
2914                &root_store,
2915                &mut transaction,
2916                HandleOptions::default(),
2917                None,
2918            )
2919            .await
2920            .expect("create_object failed");
2921            transaction.commit().await.expect("commit failed");
2922
2923            // Allocate an extent in the file.
2924            let mut buffer = child.allocate_buffer(8192).await;
2925            buffer.as_mut_slice().fill(0xaa);
2926            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
2927
2928            child.object_id()
2929        };
2930
2931        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
2932
2933        // Let fsck check allocations.
2934        fsck(fs.clone()).await.expect("fsck failed");
2935    }
2936
2937    #[fuchsia::test]
2938    async fn test_tombstone_purges_keys() {
2939        let fs = test_filesystem().await;
2940        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2941        let store = root_volume
2942            .new_volume(
2943                "test",
2944                NewChildStoreOptions {
2945                    options: StoreOptions {
2946                        crypt: Some(Arc::new(InsecureCrypt::new())),
2947                        ..StoreOptions::default()
2948                    },
2949                    ..NewChildStoreOptions::default()
2950                },
2951            )
2952            .await
2953            .expect("new_volume failed");
2954        let mut transaction = fs
2955            .clone()
2956            .new_transaction(lock_keys![], Options::default())
2957            .await
2958            .expect("new_transaction failed");
2959        let child =
2960            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
2961                .await
2962                .expect("create_object failed");
2963        transaction.commit().await.expect("commit failed");
2964        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_some());
2965        store
2966            .tombstone_object(child.object_id(), Options::default())
2967            .await
2968            .expect("tombstone_object failed");
2969        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_none());
2970        fs.close().await.expect("close failed");
2971    }
2972
2973    #[fuchsia::test]
2974    async fn test_major_compaction_discards_unnecessary_records() {
2975        let fs = test_filesystem().await;
2976        let root_store = fs.root_store();
2977        let child_id = {
2978            let mut transaction = fs
2979                .clone()
2980                .new_transaction(lock_keys![], Options::default())
2981                .await
2982                .expect("new_transaction failed");
2983            let child = ObjectStore::create_object(
2984                &root_store,
2985                &mut transaction,
2986                HandleOptions::default(),
2987                None,
2988            )
2989            .await
2990            .expect("create_object failed");
2991            transaction.commit().await.expect("commit failed");
2992
2993            // Allocate an extent in the file.
2994            let mut buffer = child.allocate_buffer(8192).await;
2995            buffer.as_mut_slice().fill(0xaa);
2996            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");
2997
2998            child.object_id()
2999        };
3000
3001        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
3002        {
3003            let layers = root_store.tree.layer_set();
3004            let mut merger = layers.merger();
3005            let iter = merger
3006                .query(Query::FullRange(&ObjectKey::object(child_id)))
3007                .await
3008                .expect("seek failed");
3009            // At least one record for the object should still be in the tree at this point.
3010            match iter.get() {
3011                Some(ItemRef { key: ObjectKey { object_id, .. }, .. })
3012                    if *object_id == child_id => {}
3013                _ => panic!("Objects should still be in the tree."),
3014            }
3015        }
3016        root_store.flush().await.expect("flush failed");
3017
3018        // There should be no records for the object.
3019        let layers = root_store.tree.layer_set();
3020        let mut merger = layers.merger();
3021        let iter = merger
3022            .query(Query::FullRange(&ObjectKey::object(child_id)))
3023            .await
3024            .expect("seek failed");
3025        match iter.get() {
3026            None => {}
3027            Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
3028                assert_ne!(*object_id, child_id)
3029            }
3030        }
3031    }
3032
3033    #[fuchsia::test]
3034    async fn test_overlapping_extents_in_different_layers() {
3035        let fs = test_filesystem().await;
3036        let store = fs.root_store();
3037
3038        let mut transaction = fs
3039            .clone()
3040            .new_transaction(
3041                lock_keys![LockKey::object(
3042                    store.store_object_id(),
3043                    store.root_directory_object_id()
3044                )],
3045                Options::default(),
3046            )
3047            .await
3048            .expect("new_transaction failed");
3049        let root_directory =
3050            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3051        let object = root_directory
3052            .create_child_file(&mut transaction, "test")
3053            .await
3054            .expect("create_child_file failed");
3055        transaction.commit().await.expect("commit failed");
3056
3057        let buf = object.allocate_buffer(16384).await;
3058        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3059
3060        store.flush().await.expect("flush failed");
3061
3062        object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");
3063
3064        // At this point, we should have an extent for 0..16384 in a layer that has been flushed,
3065        // and an extent for 0..4096 that partially overwrites it.  Writing to 0..16384 should
3066        // overwrite both of those extents.
3067        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3068
3069        fsck(fs.clone()).await.expect("fsck failed");
3070    }
3071
3072    #[fuchsia::test(threads = 10)]
3073    async fn test_encrypted_mutations() {
3074        async fn one_iteration(
3075            fs: OpenFxFilesystem,
3076            crypt: Arc<dyn Crypt>,
3077            iteration: u64,
3078        ) -> OpenFxFilesystem {
3079            async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
3080                fs.close().await.expect("Close failed");
3081                let device = fs.take_device().await;
3082                device.reopen(false);
3083                FxFilesystem::open(device).await.expect("FS open failed")
3084            }
3085
3086            let fs = reopen(fs).await;
3087
3088            let (store_object_id, object_id) = {
3089                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3090                let store = root_volume
3091                    .volume(
3092                        "test",
3093                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3094                    )
3095                    .await
3096                    .expect("volume failed");
3097
3098                let mut transaction = fs
3099                    .clone()
3100                    .new_transaction(
3101                        lock_keys![LockKey::object(
3102                            store.store_object_id(),
3103                            store.root_directory_object_id(),
3104                        )],
3105                        Options::default(),
3106                    )
3107                    .await
3108                    .expect("new_transaction failed");
3109                let root_directory = Directory::open(&store, store.root_directory_object_id())
3110                    .await
3111                    .expect("open failed");
3112                let object = root_directory
3113                    .create_child_file(&mut transaction, &format!("test {}", iteration))
3114                    .await
3115                    .expect("create_child_file failed");
3116                transaction.commit().await.expect("commit failed");
3117
3118                let mut buf = object.allocate_buffer(1000).await;
3119                for i in 0..buf.len() {
3120                    buf.as_mut_slice()[i] = i as u8;
3121                }
3122                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
3123
3124                (store.store_object_id(), object.object_id())
3125            };
3126
3127            let fs = reopen(fs).await;
3128
3129            let check_object = |fs: Arc<FxFilesystem>| {
3130                let crypt = crypt.clone();
3131                async move {
3132                    let root_volume = root_volume(fs).await.expect("root_volume failed");
3133                    let volume = root_volume
3134                        .volume(
3135                            "test",
3136                            StoreOptions { crypt: Some(crypt), ..StoreOptions::default() },
3137                        )
3138                        .await
3139                        .expect("volume failed");
3140
3141                    let object = ObjectStore::open_object(
3142                        &volume,
3143                        object_id,
3144                        HandleOptions::default(),
3145                        None,
3146                    )
3147                    .await
3148                    .expect("open_object failed");
3149                    let mut buf = object.allocate_buffer(1000).await;
3150                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
3151                    for i in 0..buf.len() {
3152                        assert_eq!(buf.as_slice()[i], i as u8);
3153                    }
3154                }
3155            };
3156
3157            check_object(fs.clone()).await;
3158
3159            let fs = reopen(fs).await;
3160
3161            // At this point the "test" volume is locked.  Before checking the object, flush the
3162            // filesystem.  This should leave a file with encrypted mutations.
3163            fs.object_manager().flush().await.expect("flush failed");
3164
3165            assert_ne!(
3166                fs.object_manager()
3167                    .store(store_object_id)
3168                    .unwrap()
3169                    .load_store_info()
3170                    .await
3171                    .expect("load_store_info failed")
3172                    .encrypted_mutations_object_id,
3173                INVALID_OBJECT_ID
3174            );
3175
3176            check_object(fs.clone()).await;
3177
3178            // Checking the object should have triggered a flush and so now there should be no
3179            // encrypted mutations object.
3180            assert_eq!(
3181                fs.object_manager()
3182                    .store(store_object_id)
3183                    .unwrap()
3184                    .load_store_info()
3185                    .await
3186                    .expect("load_store_info failed")
3187                    .encrypted_mutations_object_id,
3188                INVALID_OBJECT_ID
3189            );
3190
3191            let fs = reopen(fs).await;
3192
3193            fsck(fs.clone()).await.expect("fsck failed");
3194
3195            let fs = reopen(fs).await;
3196
3197            check_object(fs.clone()).await;
3198
3199            fs
3200        }
3201
3202        let mut fs = test_filesystem().await;
3203        let crypt = Arc::new(InsecureCrypt::new());
3204
3205        {
3206            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3207            let _store = root_volume
3208                .new_volume(
3209                    "test",
3210                    NewChildStoreOptions {
3211                        options: StoreOptions {
3212                            crypt: Some(crypt.clone()),
3213                            ..StoreOptions::default()
3214                        },
3215                        ..Default::default()
3216                    },
3217                )
3218                .await
3219                .expect("new_volume failed");
3220        }
3221
3222        // Run a few iterations so that we test with different stream cipher offsets.
3223        for i in 0..5 {
3224            fs = one_iteration(fs, crypt.clone(), i).await;
3225        }
3226    }
3227
3228    #[fuchsia::test(threads = 10)]
3229    async fn test_object_id_cipher_roll() {
3230        let fs = test_filesystem().await;
3231        let crypt = Arc::new(InsecureCrypt::new());
3232
3233        {
3234            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3235            let store = root_volume
3236                .new_volume(
3237                    "test",
3238                    NewChildStoreOptions {
3239                        options: StoreOptions {
3240                            crypt: Some(crypt.clone()),
3241                            ..StoreOptions::default()
3242                        },
3243                        ..Default::default()
3244                    },
3245                )
3246                .await
3247                .expect("new_volume failed");
3248
3249            let store_info = store.store_info().unwrap();
3250
3251            // Hack the last object ID to force a roll of the object ID cipher.
3252            {
3253                let mut last_object_id = store.last_object_id.lock();
3254                assert_eq!(last_object_id.id & OBJECT_ID_HI_MASK, 0);
3255                last_object_id.id |= 0xffffffff;
3256            }
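            // With the low 32 bits saturated, the next allocated ID must carry into the hi
            // part, which is what forces a new object ID cipher key to be generated.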
3257
3258            let mut transaction = fs
3259                .clone()
3260                .new_transaction(
3261                    lock_keys![LockKey::object(
3262                        store.store_object_id(),
3263                        store.root_directory_object_id()
3264                    )],
3265                    Options::default(),
3266                )
3267                .await
3268                .expect("new_transaction failed");
3269            let root_directory = Directory::open(&store, store.root_directory_object_id())
3270                .await
3271                .expect("open failed");
3272            let object = root_directory
3273                .create_child_file(&mut transaction, "test")
3274                .await
3275                .expect("create_child_file failed");
3276            transaction.commit().await.expect("commit failed");
3277
3278            assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 1u64 << 32);
3279
3280            // Check that the key has been changed.
3281            assert_ne!(store.store_info().unwrap().object_id_key, store_info.object_id_key);
3282
3283            assert_eq!(store.last_object_id.lock().id, 1u64 << 32);
3284        };
3285
3286        fs.close().await.expect("Close failed");
3287        let device = fs.take_device().await;
3288        device.reopen(false);
3289        let fs = FxFilesystem::open(device).await.expect("open failed");
3290        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3291        let store = root_volume
3292            .volume("test", StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() })
3293            .await
3294            .expect("volume failed");
3295
3296        assert_eq!(store.last_object_id.lock().id, 1u64 << 32);
3297    }
3298
3299    #[fuchsia::test]
3300    async fn test_object_id_no_roll_for_unencrypted_store() {
3301        let fs = test_filesystem().await;
3302
3303        {
3304            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3305            let store = root_volume
3306                .new_volume("test", NewChildStoreOptions::default())
3307                .await
3308                .expect("new_volume failed");
3309
3310            // Hack the last object ID.
3311            {
3312                let mut last_object_id = store.last_object_id.lock();
3313                assert_eq!(last_object_id.id & OBJECT_ID_HI_MASK, 0);
3314                last_object_id.id |= 0xffffffff;
3315            }
3316
3317            let mut transaction = fs
3318                .clone()
3319                .new_transaction(
3320                    lock_keys![LockKey::object(
3321                        store.store_object_id(),
3322                        store.root_directory_object_id()
3323                    )],
3324                    Options::default(),
3325                )
3326                .await
3327                .expect("new_transaction failed");
3328            let root_directory = Directory::open(&store, store.root_directory_object_id())
3329                .await
3330                .expect("open failed");
3331            let object = root_directory
3332                .create_child_file(&mut transaction, "test")
3333                .await
3334                .expect("create_child_file failed");
3335            transaction.commit().await.expect("commit failed");
3336
3337            assert_eq!(object.object_id(), 0x1_0000_0000);
3338
3339            // Check that there is still no key.
3340            assert!(store.store_info().unwrap().object_id_key.is_none());
3341
3342            assert_eq!(store.last_object_id.lock().id, 0x1_0000_0000);
3343        };
3344
3345        fs.close().await.expect("Close failed");
3346        let device = fs.take_device().await;
3347        device.reopen(false);
3348        let fs = FxFilesystem::open(device).await.expect("open failed");
3349        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3350        let store =
3351            root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
3352
3353        assert_eq!(store.last_object_id.lock().id, 0x1_0000_0000);
3354    }
3355
3356    #[fuchsia::test]
3357    fn test_object_id_is_not_invalid_object_id() {
3358        let key = UnwrappedKey::new(vec![0; FXFS_KEY_SIZE]);
3359        // 1106634048 results in INVALID_OBJECT_ID with this key.
3360        let mut last_object_id = LastObjectId { id: 1106634047, cipher: Some(Ff1::new(&key)) };
3361        assert_ne!(last_object_id.get_next_object_id(), INVALID_OBJECT_ID);
3362        assert_ne!(last_object_id.get_next_object_id(), INVALID_OBJECT_ID);
3363    }
3364
3365    #[fuchsia::test(threads = 10)]
3366    async fn test_lock_store() {
3367        let fs = test_filesystem().await;
3368        let crypt = Arc::new(InsecureCrypt::new());
3369
3370        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3371        let store = root_volume
3372            .new_volume(
3373                "test",
3374                NewChildStoreOptions {
3375                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3376                    ..NewChildStoreOptions::default()
3377                },
3378            )
3379            .await
3380            .expect("new_volume failed");
3381        let mut transaction = fs
3382            .clone()
3383            .new_transaction(
3384                lock_keys![LockKey::object(
3385                    store.store_object_id(),
3386                    store.root_directory_object_id()
3387                )],
3388                Options::default(),
3389            )
3390            .await
3391            .expect("new_transaction failed");
3392        let root_directory =
3393            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3394        root_directory
3395            .create_child_file(&mut transaction, "test")
3396            .await
3397            .expect("create_child_file failed");
3398        transaction.commit().await.expect("commit failed");
3399        store.lock().await.expect("lock failed");
3400
3401        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
3402        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3403    }
3404
3405    #[fuchsia::test(threads = 10)]
3406    async fn test_unlock_read_only() {
3407        let fs = test_filesystem().await;
3408        let crypt = Arc::new(InsecureCrypt::new());
3409
3410        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3411        let store = root_volume
3412            .new_volume(
3413                "test",
3414                NewChildStoreOptions {
3415                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3416                    ..NewChildStoreOptions::default()
3417                },
3418            )
3419            .await
3420            .expect("new_volume failed");
3421        let mut transaction = fs
3422            .clone()
3423            .new_transaction(
3424                lock_keys![LockKey::object(
3425                    store.store_object_id(),
3426                    store.root_directory_object_id()
3427                )],
3428                Options::default(),
3429            )
3430            .await
3431            .expect("new_transaction failed");
3432        let root_directory =
3433            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3434        root_directory
3435            .create_child_file(&mut transaction, "test")
3436            .await
3437            .expect("create_child_file failed");
3438        transaction.commit().await.expect("commit failed");
3439        store.lock().await.expect("lock failed");
3440
3441        store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
3442        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3443        store.lock_read_only();
3444        store.unlock_read_only(crypt).await.expect("unlock failed");
3445        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3446    }
3447
3448    #[fuchsia::test(threads = 10)]
3449    async fn test_key_rolled_when_unlocked() {
3450        let fs = test_filesystem().await;
3451        let crypt = Arc::new(InsecureCrypt::new());
3452
3453        let object_id;
3454        {
3455            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3456            let store = root_volume
3457                .new_volume(
3458                    "test",
3459                    NewChildStoreOptions {
3460                        options: StoreOptions {
3461                            crypt: Some(crypt.clone()),
3462                            ..StoreOptions::default()
3463                        },
3464                        ..Default::default()
3465                    },
3466                )
3467                .await
3468                .expect("new_volume failed");
3469            let mut transaction = fs
3470                .clone()
3471                .new_transaction(
3472                    lock_keys![LockKey::object(
3473                        store.store_object_id(),
3474                        store.root_directory_object_id()
3475                    )],
3476                    Options::default(),
3477                )
3478                .await
3479                .expect("new_transaction failed");
3480            let root_directory = Directory::open(&store, store.root_directory_object_id())
3481                .await
3482                .expect("open failed");
3483            object_id = root_directory
3484                .create_child_file(&mut transaction, "test")
3485                .await
3486                .expect("create_child_file failed")
3487                .object_id();
3488            transaction.commit().await.expect("commit failed");
3489        }
3490
3491        fs.close().await.expect("Close failed");
3492        let mut device = fs.take_device().await;
3493
3494        // Repeatedly remount so that we can be sure that remounting works when many
3495        // mutations keys have accumulated.
3496        for _ in 0..100 {
3497            device.reopen(false);
3498            let fs = FxFilesystem::open(device).await.expect("open failed");
3499            {
3500                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3501                let store = root_volume
3502                    .volume(
3503                        "test",
3504                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
3505                    )
3506                    .await
3507                    .expect("open_volume failed");
3508
3509                // The key should get rolled every time we unlock.
3510                assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);
3511
3512                // Make sure there's an encrypted mutation.
3513                let handle =
3514                    ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
3515                        .await
3516                        .expect("open_object failed");
3517                let buffer = handle.allocate_buffer(100).await;
3518                handle
3519                    .write_or_append(Some(0), buffer.as_ref())
3520                    .await
3521                    .expect("write_or_append failed");
3522            }
3523            fs.close().await.expect("Close failed");
3524            device = fs.take_device().await;
3525        }
3526    }
3527
3528    #[test]
3529    fn test_store_info_max_serialized_size() {
3530        let info = StoreInfo {
3531            guid: [0xff; 16],
3532            last_object_id: 0x1234567812345678,
3533            // Worst case, each layer should be 3/4 the size of the layer below it (because of the
3534            // compaction policy we're using).  If the smallest layer is 8,192 bytes, then 120
3535            // layers would take up a size that exceeds a 64 bit unsigned integer, so if this fits,
3536            // any size should fit.
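            // (Back-of-envelope check: the layer sizes form a geometric series 8192 * (4/3)^k,
            // so 120 layers sum to roughly 24576 * (4/3)^120, about 2.4e19 bytes, which exceeds
            // u64::MAX of about 1.8e19.)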
3537            layers: vec![0x1234567812345678; 120],
3538            root_directory_object_id: 0x1234567812345678,
3539            graveyard_directory_object_id: 0x1234567812345678,
3540            object_count: 0x1234567812345678,
3541            mutations_key: Some(FxfsKey {
3542                wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
3543                key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
3544            }),
3545            mutations_cipher_offset: 0x1234567812345678,
3546            encrypted_mutations_object_id: 0x1234567812345678,
3547            object_id_key: Some(FxfsKey {
3548                wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
3549                key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
3550            }),
3551            internal_directory_object_id: INVALID_OBJECT_ID,
3552        };
3553        let mut serialized_info = Vec::new();
3554        info.serialize_with_version(&mut serialized_info).unwrap();
3555        assert!(
3556            serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
3557            "{}",
3558            serialized_info.len()
3559        );
3560    }
3561
3562    async fn reopen_after_crypt_failure_inner(read_only: bool) {
3563        let fs = test_filesystem().await;
3564        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3565
3566        let store = {
3567            let crypt = Arc::new(InsecureCrypt::new());
3568            let store = root_volume
3569                .new_volume(
3570                    "vol",
3571                    NewChildStoreOptions {
3572                        options: StoreOptions {
3573                            crypt: Some(crypt.clone()),
3574                            ..StoreOptions::default()
3575                        },
3576                        ..Default::default()
3577                    },
3578                )
3579                .await
3580                .expect("new_volume failed");
3581            let root_directory = Directory::open(&store, store.root_directory_object_id())
3582                .await
3583                .expect("open failed");
3584            let mut transaction = fs
3585                .clone()
3586                .new_transaction(
3587                    lock_keys![LockKey::object(
3588                        store.store_object_id(),
3589                        root_directory.object_id()
3590                    )],
3591                    Options::default(),
3592                )
3593                .await
3594                .expect("new_transaction failed");
3595            root_directory
3596                .create_child_file(&mut transaction, "test")
3597                .await
3598                .expect("create_child_file failed");
3599            transaction.commit().await.expect("commit failed");
3600
3601            crypt.shutdown();
3602            let mut transaction = fs
3603                .clone()
3604                .new_transaction(
3605                    lock_keys![LockKey::object(
3606                        store.store_object_id(),
3607                        root_directory.object_id()
3608                    )],
3609                    Options::default(),
3610                )
3611                .await
3612                .expect("new_transaction failed");
3613            root_directory
3614                .create_child_file(&mut transaction, "test2")
3615                .await
3616                .map(|_| ())
3617                .expect_err("create_child_file should fail");
3618            store.lock().await.expect("lock failed");
3619            store
3620        };
3621
3622        let crypt = Arc::new(InsecureCrypt::new());
3623        if read_only {
3624            store.unlock_read_only(crypt).await.expect("unlock failed");
3625        } else {
3626            store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
3627        }
3628        let root_directory =
3629            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3630        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
3631    }
3632
3633    #[fuchsia::test(threads = 10)]
3634    async fn test_reopen_after_crypt_failure() {
3635        reopen_after_crypt_failure_inner(false).await;
3636    }
3637
3638    #[fuchsia::test(threads = 10)]
3639    async fn test_reopen_read_only_after_crypt_failure() {
3640        reopen_after_crypt_failure_inner(true).await;
3641    }
3642
3643    #[fuchsia::test(threads = 10)]
3644    #[should_panic(expected = "Insufficient reservation space")]
3645    #[cfg(debug_assertions)]
3646    async fn large_transaction_causes_panic_in_debug_builds() {
3647        let fs = test_filesystem().await;
3648        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3649        let store = root_volume
3650            .new_volume("vol", NewChildStoreOptions::default())
3651            .await
3652            .expect("new_volume failed");
3653        let root_directory =
3654            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3655        let mut transaction = fs
3656            .clone()
3657            .new_transaction(
3658                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
3659                Options::default(),
3660            )
3661            .await
3662            .expect("transaction");
3663        for i in 0..500 {
3664            root_directory
3665                .create_symlink(&mut transaction, b"link", &format!("{}", i))
3666                .await
3667                .expect("symlink");
3668        }
3669        assert_eq!(transaction.commit().await.expect("commit"), 0);
3670    }
3671
3672    #[fuchsia::test]
3673    async fn test_crypt_failure_does_not_fuse_journal() {
3674        let fs = test_filesystem().await;
3675
3676        struct Owner;
3677        #[async_trait]
3678        impl StoreOwner for Owner {
3679            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
3680                store.lock().await
3681            }
3682        }
3683        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
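        // When a store's crypt instance fails during flush, the filesystem asks the store's
        // owner (via `force_lock`) to lock the store rather than failing compaction.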
3684
3685        {
3686            // Create two stores and a record for each store, so the journal will need to flush them
3687            // both later.
3688            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3689            let store1 = root_volume
3690                .new_volume(
3691                    "vol1",
3692                    NewChildStoreOptions {
3693                        options: StoreOptions {
3694                            crypt: Some(Arc::new(InsecureCrypt::new())),
3695                            ..StoreOptions::default()
3696                        },
3697                        ..Default::default()
3698                    },
3699                )
3700                .await
3701                .expect("new_volume failed");
3702            let crypt = Arc::new(InsecureCrypt::new());
3703            let store2 = root_volume
3704                .new_volume(
3705                    "vol2",
3706                    NewChildStoreOptions {
3707                        options: StoreOptions {
3708                            owner: Arc::downgrade(&owner),
3709                            crypt: Some(crypt.clone()),
3710                        },
3711                        ..Default::default()
3712                    },
3713                )
3714                .await
3715                .expect("new_volume failed");
3716            for store in [&store1, &store2] {
3717                let root_directory = Directory::open(store, store.root_directory_object_id())
3718                    .await
3719                    .expect("open failed");
3720                let mut transaction = fs
3721                    .clone()
3722                    .new_transaction(
3723                        lock_keys![LockKey::object(
3724                            store.store_object_id(),
3725                            root_directory.object_id()
3726                        )],
3727                        Options::default(),
3728                    )
3729                    .await
3730                    .expect("new_transaction failed");
3731                root_directory
3732                    .create_child_file(&mut transaction, "test")
3733                    .await
3734                    .expect("create_child_file failed");
3735                transaction.commit().await.expect("commit failed");
3736            }
3737            // Shut down the crypt instance for store2, and then compact.  Compaction should not
3738            // fail, and the store should become locked.
3739            crypt.shutdown();
3740            fs.journal().compact().await.expect("compact failed");
3741            // The store should now be locked.
3742            assert!(store2.is_locked());
3743        }
3744
3745        // Even though the store wasn't flushed, the mutation to store2 will still be valid as it is
3746        // held in the journal.
3747        fs.close().await.expect("close failed");
3748        let device = fs.take_device().await;
3749        device.reopen(false);
3750        let fs = FxFilesystem::open(device).await.expect("open failed");
3751        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3752
3753        for volume_name in ["vol1", "vol2"] {
3754            let store = root_volume
3755                .volume(
3756                    volume_name,
3757                    StoreOptions {
3758                        crypt: Some(Arc::new(InsecureCrypt::new())),
3759                        ..StoreOptions::default()
3760                    },
3761                )
3762                .await
3763                .expect("open volume failed");
3764            let root_directory = Directory::open(&store, store.root_directory_object_id())
3765                .await
3766                .expect("open failed");
3767            assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
3768        }
3769
3770        fs.close().await.expect("close failed");
3771    }
3772
3773    #[fuchsia::test]
3774    async fn test_crypt_failure_during_unlock_race() {
3775        let fs = test_filesystem().await;
3776
3777        struct Owner;
3778        #[async_trait]
3779        impl StoreOwner for Owner {
3780            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
3781                store.lock().await
3782            }
3783        }
3784        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;
3785
3786        let store_object_id = {
3787            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3788            let store = root_volume
3789                .new_volume(
3790                    "vol",
3791                    NewChildStoreOptions {
3792                        options: StoreOptions {
3793                            owner: Arc::downgrade(&owner),
3794                            crypt: Some(Arc::new(InsecureCrypt::new())),
3795                        },
3796                        ..Default::default()
3797                    },
3798                )
3799                .await
3800                .expect("new_volume failed");
3801            let root_directory = Directory::open(&store, store.root_directory_object_id())
3802                .await
3803                .expect("open failed");
3804            let mut transaction = fs
3805                .clone()
3806                .new_transaction(
3807                    lock_keys![LockKey::object(
3808                        store.store_object_id(),
3809                        root_directory.object_id()
3810                    )],
3811                    Options::default(),
3812                )
3813                .await
3814                .expect("new_transaction failed");
3815            root_directory
3816                .create_child_file(&mut transaction, "test")
3817                .await
3818                .expect("create_child_file failed");
3819            transaction.commit().await.expect("commit failed");
3820            store.store_object_id()
3821        };
3822
3823        fs.close().await.expect("close failed");
3824        let device = fs.take_device().await;
3825        device.reopen(false);
3826
3827        let fs = FxFilesystem::open(device).await.expect("open failed");
3828        {
3829            let fs_clone = fs.clone();
3830            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3831
3832            let crypt = Arc::new(InsecureCrypt::new());
3833            let crypt_clone = crypt.clone();
3834            join!(
3835                async move {
3836                    // Unlock might fail, so ignore errors.
3837                    let _ = root_volume
3838                        .volume(
3839                            "vol",
3840                            StoreOptions {
3841                                owner: Arc::downgrade(&owner),
3842                                crypt: Some(crypt_clone),
3843                            },
3844                        )
3845                        .await;
3846                },
3847                async move {
3848                    // Wait until the unlock has finished, but act before the post-unlock
3849                    // flush completes, to maximize the chances of hitting the race.
3850                    let keys = lock_keys![LockKey::flush(store_object_id)];
3851                    let _ = fs_clone.lock_manager().write_lock(keys).await;
3852                    crypt.shutdown();
3853                }
3854            );
3855        }
3856
3857        fs.close().await.expect("close failed");
3858        let device = fs.take_device().await;
3859        device.reopen(false);
3860
3861        let fs = FxFilesystem::open(device).await.expect("open failed");
3862        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
3863        let store = root_volume
3864            .volume(
3865                "vol",
3866                StoreOptions {
3867                    crypt: Some(Arc::new(InsecureCrypt::new())),
3868                    ..StoreOptions::default()
3869                },
3870            )
3871            .await
3872            .expect("open volume failed");
3873        let root_directory =
3874            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
3875        assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
3876
3877        fs.close().await.expect("close failed");
3878    }
3879}