fxfs/
object_store.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

pub mod allocator;
pub mod caching_object_handle;
pub mod data_object_handle;
pub mod directory;
mod extent_mapping_iterator;
mod extent_record;
mod flush;
pub mod graveyard;
mod install;
pub mod journal;
mod key_manager;
pub(crate) mod merge;
pub mod object_manager;
pub mod object_record;
pub mod project_id;
mod store_object_handle;
pub mod transaction;
mod tree;
mod tree_cache;
pub mod volume;

pub use data_object_handle::{
    DataObjectHandle, DirectWriter, FileExtent, FsverityState, FsverityStateInner, RangeType,
};
pub use directory::Directory;
pub use object_record::{ChildValue, ObjectDescriptor, PosixAttributes, Timestamp};
pub use store_object_handle::{
    EXTENDED_ATTRIBUTE_RANGE_END, EXTENDED_ATTRIBUTE_RANGE_START, SetExtendedAttributeMode,
    StoreObjectHandle,
};

use crate::errors::FxfsError;
use crate::filesystem::{
    ApplyContext, ApplyMode, FxFilesystem, JournalingObject, MAX_FILE_SIZE, SyncOptions,
    TruncateGuard, TxnGuard,
};
use crate::log::*;
use crate::lsm_tree::cache::{NullCache, ObjectCache};
use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
use crate::lsm_tree::{LSMTree, Query};
use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ObjectProperties, ReadObjectHandle};
use crate::object_store::allocator::Allocator;
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::{JournalCheckpoint, JournalCheckpointV32, JournaledTransaction};
use crate::object_store::key_manager::KeyManager;
use crate::object_store::transaction::{
    AssocObj, AssociatedObject, LockKey, ObjectStoreMutation, Operation, Options, Transaction,
    lock_keys,
};
use crate::range::RangeExt;
use crate::round::round_up;
use crate::serialized_types::{Version, Versioned, VersionedLatest};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use fidl_fuchsia_io as fio;
use fprint::TypeFingerprint;
use fuchsia_sync::Mutex;
use fxfs_crypto::ff1::Ff1;
use fxfs_crypto::{
    Cipher, Crypt, FxfsCipher, KeyPurpose, ObjectType, StreamCipher, UnwrappedKey, WrappingKeyId,
};
use fxfs_macros::{Migrate, migrate_to_version};
use scopeguard::ScopeGuard;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, OnceLock, Weak};
use storage_device::Device;
use uuid::Uuid;

pub use extent_record::{
    BLOB_MERKLE_ATTRIBUTE_ID, DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
    FSVERITY_MERKLE_ATTRIBUTE_ID,
};
pub use object_record::{
    AttributeKey, EncryptionKey, EncryptionKeys, ExtendedAttributeValue, FsverityMetadata, FxfsKey,
    FxfsKeyV40, FxfsKeyV49, ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue,
    ProjectProperty, RootDigest,
};
pub use transaction::Mutation;

// For encrypted stores, the lower 32 bits of the object ID are encrypted to make side-channel
// attacks more difficult. This mask can be used to extract the hi part of the object ID.
const OBJECT_ID_HI_MASK: u64 = 0xffffffff00000000;
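// For example (illustrative): 0x0000_0001_8000_002a & OBJECT_ID_HI_MASK ==
// 0x0000_0001_0000_0000, i.e. the mask keeps the "hi" half and discards the (encrypted) low
// 32 bits.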

// At time of writing, this threshold limits transactions that delete extents to about 10,000 bytes.
const TRANSACTION_MUTATION_THRESHOLD: usize = 200;

// Encrypted files and directories use the fscrypt key (identified by `FSCRYPT_KEY_ID`) to encrypt
// file contents and filenames respectively. All non-fscrypt encrypted files otherwise default to
// using the `VOLUME_DATA_KEY_ID` key. Note, the filesystem always uses the `VOLUME_DATA_KEY_ID`
// key to encrypt large extended attributes. Thus, encrypted files and directories with large
// xattrs will have both an fscrypt and volume data key.
pub const VOLUME_DATA_KEY_ID: u64 = 0;
pub const FSCRYPT_KEY_ID: u64 = 1;

/// A constant that can be used where an owner is expected of type `Weak<dyn StoreOwner>` but no
/// owner is required.
pub const NO_OWNER: Weak<()> = Weak::new();
impl StoreOwner for () {}

#[async_trait]
pub trait StoreOwner: Send + Sync {
    /// Forcibly lock the store.  This exists to give the StoreOwner an opportunity to clean up
    /// tasks which might access the store before locking it, because ObjectStore::lock can only
    /// be called when the store is not in use.
    async fn force_lock(self: Arc<Self>, _store: &ObjectStore) -> Result<(), Error> {
        Err(anyhow!(FxfsError::Internal))
    }
}
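
// A minimal sketch of a `StoreOwner` implementation (illustrative only; `MyVolume` is a
// hypothetical type).  A real owner would tear down anything that could access the store
// before allowing the lock to proceed:
//
//     struct MyVolume;
//
//     #[async_trait]
//     impl StoreOwner for MyVolume {
//         async fn force_lock(self: Arc<Self>, _store: &ObjectStore) -> Result<(), Error> {
//             // Quiesce any tasks that might touch the store, then permit the lock.
//             Ok(())
//         }
//     }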

/// DataObjectHandle stores an owner that must implement this trait, which allows the handle to get
/// back to an ObjectStore.
pub trait HandleOwner: AsRef<ObjectStore> + Send + Sync + 'static {}

/// StoreInfo stores information about the object store.  This is stored within the parent object
/// store, and is used, for example, to get the persistent layer objects.
pub type StoreInfo = StoreInfoV49;

#[derive(Clone, Debug, Default, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct StoreInfoV49 {
    /// The globally unique identifier for the associated object store. If unset, will be all zero.
    guid: [u8; 16],

    /// The last used object ID.  Note that this field is not accurate in memory; ObjectStore's
    /// last_object_id field is the one to use in that case.  Technically, this might not be the
    /// last object ID used for the latest transaction that created an object because we use this at
    /// the point of creating the object but before we commit the transaction.  Transactions can
    /// then get committed in an arbitrary order (or not at all).
    last_object_id: u64,

    /// Object ids for layers.  TODO(https://fxbug.dev/42178036): need a layer of indirection here
    /// so we can support snapshots.
    pub layers: Vec<u64>,

    /// The object ID for the root directory.
    root_directory_object_id: u64,

    /// The object ID for the graveyard.
    graveyard_directory_object_id: u64,

    /// The number of live objects in the store.  This should *not* be trusted; it can be invalid
    /// due to filesystem inconsistencies.
    object_count: u64,

    /// The (wrapped) key that encrypted mutations should use.
    mutations_key: Option<FxfsKeyV49>,

    /// Mutations for the store are encrypted using a stream cipher.  To decrypt the mutations, we
    /// need to know the offset in the cipher stream to start it.
    mutations_cipher_offset: u64,

    /// If we have to flush the store whilst we do not have the key, we need to write the encrypted
    /// mutations to an object. This is the object ID of that file if it exists.
    pub encrypted_mutations_object_id: u64,

    /// Object IDs are encrypted to reduce the amount of information that sequential object IDs
    /// reveal (such as the number of files in the system and the ordering of their creation in
    /// time).  Only the bottom 32 bits of the object ID are encrypted whilst the top 32 bits will
    /// increment after 2^32 object IDs have been used and this allows us to roll the key.
    object_id_key: Option<FxfsKeyV49>,

    /// A directory for storing internal files in a directory structure. Holds INVALID_OBJECT_ID
    /// when the directory doesn't yet exist.
    internal_directory_object_id: u64,
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(StoreInfoV49)]
pub struct StoreInfoV40 {
    guid: [u8; 16],
    last_object_id: u64,
    pub layers: Vec<u64>,
    root_directory_object_id: u64,
    graveyard_directory_object_id: u64,
    object_count: u64,
    mutations_key: Option<FxfsKeyV40>,
    mutations_cipher_offset: u64,
    pub encrypted_mutations_object_id: u64,
    object_id_key: Option<FxfsKeyV40>,
    internal_directory_object_id: u64,
}

impl StoreInfo {
    /// Create a new/default [`StoreInfo`] but with a newly generated GUID.
    fn new_with_guid() -> Self {
        let guid = Uuid::new_v4();
        Self { guid: *guid.as_bytes(), ..Default::default() }
    }

    /// Returns the parent objects for this store.
    pub fn parent_objects(&self) -> Vec<u64> {
        // We should not include the ID of the store itself, since that should be referred to in the
        // volume directory.
        let mut objects = self.layers.to_vec();
        if self.encrypted_mutations_object_id != INVALID_OBJECT_ID {
            objects.push(self.encrypted_mutations_object_id);
        }
        objects
    }
}

// TODO(https://fxbug.dev/42178037): We should test or put checks in place to ensure this limit
// isn't exceeded.  It will likely involve placing limits on the maximum number of layers.
pub const MAX_STORE_INFO_SERIALIZED_SIZE: usize = 131072;

// This needs to be large enough to accommodate the maximum amount of unflushed data (data that is
// in the journal but hasn't yet been written to layer files) for a store.  We set a limit because
// we want to limit the amount of memory use in the case the filesystem is corrupt or under attack.
pub const MAX_ENCRYPTED_MUTATIONS_SIZE: usize = 8 * journal::DEFAULT_RECLAIM_SIZE as usize;

#[derive(Default)]
pub struct HandleOptions {
    /// If true, transactions used by this handle will skip journal space checks.
    pub skip_journal_checks: bool,
    /// If true, data written to any attribute of this handle will not have per-block checksums
    /// computed.
    pub skip_checksums: bool,
    /// If true, any files using fsverity will not attempt to perform any verification. This is
    /// useful to open an object without the correct encryption keys to look at the metadata.
    pub skip_fsverity: bool,
}

/// Parameters for encrypting a newly created object.
pub struct ObjectEncryptionOptions {
    /// If set, the keys are treated as permanent and never evicted from the KeyManager cache.
    /// This is necessary when keys are managed by another store; for example, the layer files
    /// of a child store are objects in the root store, but they are encrypted with keys from the
    /// child store.  Generally, most objects should have this set to `false`.
    pub permanent: bool,
    pub key_id: u64,
    pub key: EncryptionKey,
    pub unwrapped_key: UnwrappedKey,
}

pub struct StoreOptions {
    /// The owner of the store.
    pub owner: Weak<dyn StoreOwner>,

    /// The store is unencrypted if `crypt` is `None`.
    pub crypt: Option<Arc<dyn Crypt>>,
}

impl Default for StoreOptions {
    fn default() -> Self {
        Self { owner: NO_OWNER, crypt: None }
    }
}
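
// For example, describing an encrypted store with no owner (illustrative; `crypt` is assumed
// to be an existing `Arc<dyn Crypt>`):
//
//     let options = StoreOptions { owner: NO_OWNER, crypt: Some(crypt) };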

#[derive(Default)]
pub struct NewChildStoreOptions {
    pub options: StoreOptions,

    /// Specifies the object ID in the root store to be used for the store.  If set to
    /// INVALID_OBJECT_ID (the default and typical case), a suitable ID will be chosen.
    pub object_id: u64,

    /// If true, reserve all 32-bit object IDs.
    pub reserve_32bit_object_ids: bool,
}

pub type EncryptedMutations = EncryptedMutationsV49;

#[derive(Clone, Default, Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV49 {
    // Information about the mutations is held here, but the actual encrypted data is held within
    // data.  For each transaction, we record the checkpoint and the count of mutations within the
    // transaction.  The checkpoint is required for the log file offset (which we need to apply the
    // mutations), and the version so that we can correctly decode the mutation after it has been
    // decrypted. The count specifies the number of serialized mutations encoded in |data|.
    transactions: Vec<(JournalCheckpointV32, u64)>,

    // The encrypted mutations.
    data: Vec<u8>,

    // If the mutations key was rolled, this holds the offset in `data` where the new key should
    // apply.
    mutations_key_roll: Vec<(usize, FxfsKeyV49)>,
}

impl std::fmt::Debug for EncryptedMutations {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        f.debug_struct("EncryptedMutations")
            .field("transactions", &self.transactions)
            .field("len", &self.data.len())
            .field(
                "mutations_key_roll",
                &self.mutations_key_roll.iter().map(|k| k.0).collect::<Vec<usize>>(),
            )
            .finish()
    }
}

impl Versioned for EncryptedMutations {
    fn max_serialized_size() -> u64 {
        MAX_ENCRYPTED_MUTATIONS_SIZE as u64
    }
}

impl From<EncryptedMutationsV40> for EncryptedMutationsV49 {
    fn from(value: EncryptedMutationsV40) -> Self {
        EncryptedMutationsV49 {
            transactions: value.transactions,
            data: value.data,
            mutations_key_roll: value
                .mutations_key_roll
                .into_iter()
                .map(|(offset, key)| (offset, key.into()))
                .collect(),
        }
    }
}

#[derive(Deserialize, Serialize, TypeFingerprint)]
pub struct EncryptedMutationsV40 {
    transactions: Vec<(JournalCheckpointV32, u64)>,
    data: Vec<u8>,
    mutations_key_roll: Vec<(usize, FxfsKeyV40)>,
}

impl Versioned for EncryptedMutationsV40 {
    fn max_serialized_size() -> u64 {
        MAX_ENCRYPTED_MUTATIONS_SIZE as u64
    }
}

impl EncryptedMutations {
    fn from_replayed_mutations(
        store_object_id: u64,
        transactions: Vec<JournaledTransaction>,
    ) -> Self {
        let mut this = Self::default();
        for JournaledTransaction { checkpoint, non_root_mutations, .. } in transactions {
            for (object_id, mutation) in non_root_mutations {
                if store_object_id == object_id {
                    if let Mutation::EncryptedObjectStore(data) = mutation {
                        this.push(&checkpoint, data);
                    } else if let Mutation::UpdateMutationsKey(key) = mutation {
                        this.mutations_key_roll.push((this.data.len(), key.into()));
                    }
                }
            }
        }
        this
    }

    fn extend(&mut self, other: &EncryptedMutations) {
        self.transactions.extend_from_slice(&other.transactions[..]);
        self.mutations_key_roll.extend(
            other
                .mutations_key_roll
                .iter()
                .map(|(offset, key)| (offset + self.data.len(), key.clone())),
        );
        self.data.extend_from_slice(&other.data[..]);
    }

    fn push(&mut self, checkpoint: &JournalCheckpoint, data: Box<[u8]>) {
        self.data.append(&mut data.into());
        // If the checkpoint is the same as the last mutation we pushed, increment the count.
        if let Some((last_checkpoint, count)) = self.transactions.last_mut() {
            if last_checkpoint.file_offset == checkpoint.file_offset {
                *count += 1;
                return;
            }
        }
        self.transactions.push((checkpoint.clone(), 1));
    }
}
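
// Note on `push` (illustrative): pushing three mutations that share one `JournalCheckpoint`
// records a single `(checkpoint, 3)` entry in `transactions`, while their encrypted bytes are
// appended to `data` back to back.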

pub enum LockState {
    Locked,
    Unencrypted,
    Unlocked { owner: Weak<dyn StoreOwner>, crypt: Arc<dyn Crypt> },

    // The store is unlocked, but in a read-only state, and no flushes or other operations will be
    // performed on the store.
    UnlockedReadOnly(Arc<dyn Crypt>),

    // The store is encrypted but is now in an unusable state (due to a failure to sync the journal
    // after locking the store).  The store cannot be unlocked.
    Invalid,

    // Before we've read the StoreInfo we might not know whether the store is Locked or Unencrypted.
    // This can happen when lazily opening stores (ObjectManager::lazy_open_store).
    Unknown,

    // The store is in the process of being locked.  Whilst the store is being locked, the store
    // isn't usable; assertions will trip if any mutations are applied.
    Locking,

    // Whilst we're unlocking, we will replay encrypted mutations.  The store isn't usable until
    // it's in the Unlocked state.
    Unlocking,

    // The store has been deleted.
    Deleted,
}

impl LockState {
    fn owner(&self) -> Option<Arc<dyn StoreOwner>> {
        if let Self::Unlocked { owner, .. } = self { owner.upgrade() } else { None }
    }
}

impl fmt::Debug for LockState {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(match self {
            LockState::Locked => "Locked",
            LockState::Unencrypted => "Unencrypted",
            LockState::Unlocked { .. } => "Unlocked",
            LockState::UnlockedReadOnly(..) => "UnlockedReadOnly",
            LockState::Invalid => "Invalid",
            LockState::Unknown => "Unknown",
            LockState::Locking => "Locking",
            LockState::Unlocking => "Unlocking",
            LockState::Deleted => "Deleted",
        })
    }
}

#[derive(Default, Clone)]
struct LastObjectId {
    // The *unencrypted* value of the last object ID.
    id: u64,

    // Encrypted stores will use a cipher to obfuscate the object ID.
    cipher: Option<Ff1>,
}

impl LastObjectId {
    // Returns true if a cipher is needed to generate new object IDs.
    fn should_create_cipher(&self) -> bool {
        self.cipher.is_some() && self.id as u32 == u32::MAX
    }

    fn get_next_object_id(&mut self) -> u64 {
        if let Some(cipher) = &self.cipher {
            let hi = self.id & OBJECT_ID_HI_MASK;
            loop {
                self.id += 1;
                assert_ne!(self.id as u32, 0); // This would indicate the ID wrapped.
                let candidate = hi | cipher.encrypt(self.id as u32) as u64;
                if candidate != INVALID_OBJECT_ID {
                    break candidate;
                }
            }
        } else {
            self.id += 1;
            self.id
        }
    }
}
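
// Worked example for `get_next_object_id` (illustrative; the encrypted value is made up):
// with `id == 0x0000_0001_0000_0041`, the low half increments to 0x42 and Ff1 encrypts it to,
// say, 0x9c3f_12aa, yielding 0x0000_0001_9c3f_12aa; the hi 32 bits pass through unchanged
// while the low 32 bits are obfuscated.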

/// An object store supports a file-like interface for objects.  Objects are keyed by a 64-bit
/// identifier.  An object store has to be backed by a parent object store (which stores metadata
/// for the object store).  The top-level object store (a.k.a. the root parent object store) is
/// in-memory only.
pub struct ObjectStore {
    parent_store: Option<Arc<ObjectStore>>,
    store_object_id: u64,
    device: Arc<dyn Device>,
    block_size: u64,
    filesystem: Weak<FxFilesystem>,
    // Lock ordering: This must be taken before `lock_state`.
    store_info: Mutex<Option<StoreInfo>>,
    tree: LSMTree<ObjectKey, ObjectValue>,

    // When replaying the journal, the store cannot read StoreInfo until the whole journal
    // has been replayed, so during that time, store_info_handle will be None and records
    // just get sent to the tree. Once the journal has been replayed, we can open the store
    // and load all the other layer information.
    store_info_handle: OnceLock<DataObjectHandle<ObjectStore>>,

    // The cipher to use for encrypted mutations, if this store is encrypted.
    mutations_cipher: Mutex<Option<StreamCipher>>,

    // Current lock state of the store.
    // Lock ordering: This must be taken after `store_info`.
    lock_state: Mutex<LockState>,
    pub key_manager: KeyManager,

    // Enable/disable tracing.
    trace: AtomicBool,

    // Informational counters for events occurring within the store.
    counters: Mutex<ObjectStoreCounters>,

    // These are updated in performance-sensitive code paths so we use atomics instead of counters.
    device_read_ops: AtomicU64,
    device_write_ops: AtomicU64,
    logical_read_ops: AtomicU64,
    logical_write_ops: AtomicU64,

    // Contains the last object ID and, optionally, a cipher to be used when generating new object
    // IDs.
    last_object_id: Mutex<LastObjectId>,

    // An optional callback to be invoked each time the ObjectStore flushes.  The callback is
    // invoked at the end of flush, while the write lock is still held.
    flush_callback: Mutex<Option<Box<dyn Fn(&ObjectStore) + Send + Sync + 'static>>>,
}

#[derive(Clone, Default)]
struct ObjectStoreCounters {
    mutations_applied: u64,
    mutations_dropped: u64,
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}

impl ObjectStore {
    fn new(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        store_info: Option<StoreInfo>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
        mutations_cipher: Option<StreamCipher>,
        lock_state: LockState,
        last_object_id: LastObjectId,
    ) -> Arc<ObjectStore> {
        let device = filesystem.device();
        let block_size = filesystem.block_size();
        Arc::new(ObjectStore {
            parent_store,
            store_object_id,
            device,
            block_size,
            filesystem: Arc::downgrade(&filesystem),
            store_info: Mutex::new(store_info),
            tree: LSMTree::new(merge::merge, object_cache),
            store_info_handle: OnceLock::new(),
            mutations_cipher: Mutex::new(mutations_cipher),
            lock_state: Mutex::new(lock_state),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(last_object_id),
            flush_callback: Mutex::new(None),
        })
    }

    fn new_empty(
        parent_store: Option<Arc<ObjectStore>>,
        store_object_id: u64,
        filesystem: Arc<FxFilesystem>,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Arc<Self> {
        Self::new(
            parent_store,
            store_object_id,
            filesystem,
            Some(StoreInfo::default()),
            object_cache,
            None,
            LockState::Unencrypted,
            LastObjectId::default(),
        )
    }

    /// Cycle breaker constructor that returns an ObjectStore without a filesystem.
    /// This should only be used from super block code.
    pub fn new_root_parent(device: Arc<dyn Device>, block_size: u64, store_object_id: u64) -> Self {
        ObjectStore {
            parent_store: None,
            store_object_id,
            device,
            block_size,
            filesystem: Weak::<FxFilesystem>::new(),
            store_info: Mutex::new(Some(StoreInfo::default())),
            tree: LSMTree::new(merge::merge, Box::new(NullCache {})),
            store_info_handle: OnceLock::new(),
            mutations_cipher: Mutex::new(None),
            lock_state: Mutex::new(LockState::Unencrypted),
            key_manager: KeyManager::new(),
            trace: AtomicBool::new(false),
            counters: Mutex::new(ObjectStoreCounters::default()),
            device_read_ops: AtomicU64::new(0),
            device_write_ops: AtomicU64::new(0),
            logical_read_ops: AtomicU64::new(0),
            logical_write_ops: AtomicU64::new(0),
            last_object_id: Mutex::new(LastObjectId::default()),
            flush_callback: Mutex::new(None),
        }
    }

    /// Used to set filesystem on root_parent stores at bootstrap time after the filesystem has
    /// been created.
    pub fn attach_filesystem(mut this: ObjectStore, filesystem: Arc<FxFilesystem>) -> ObjectStore {
        this.filesystem = Arc::downgrade(&filesystem);
        this
    }

    /// Create a child store. It is a multi-step process:
    ///
    ///   1. Call `ObjectStore::new_child_store`.
    ///   2. Register the store with the object-manager.
    ///   3. Call `ObjectStore::create` to write the store-info.
    ///
    /// If the procedure fails, care must be taken to unregister the store with the object-manager.
    ///
    /// The steps have to be separate because of lifetime issues when working with a transaction.
    async fn new_child_store(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        options: NewChildStoreOptions,
        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
    ) -> Result<Arc<Self>, Error> {
        let handle = if options.object_id != INVALID_OBJECT_ID {
            let handle = ObjectStore::create_object_with_id(
                self,
                transaction,
                options.object_id,
                HandleOptions::default(),
                None,
            )?;
            self.update_last_object_id(options.object_id);
            handle
        } else {
            ObjectStore::create_object(self, transaction, HandleOptions::default(), None).await?
        };
        let filesystem = self.filesystem();
        let id = if options.reserve_32bit_object_ids { 0x1_0000_0000 } else { 0 };
        let store = if let Some(crypt) = options.options.crypt {
            let (wrapped_key, unwrapped_key) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            let (object_id_wrapped, object_id_unwrapped) =
                crypt.create_key(handle.object_id(), KeyPurpose::Metadata).await?;
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo {
                    mutations_key: Some(wrapped_key),
                    object_id_key: Some(object_id_wrapped),
                    ..StoreInfo::new_with_guid()
                }),
                object_cache,
                Some(StreamCipher::new(&unwrapped_key, 0)),
                LockState::Unlocked { owner: options.options.owner, crypt },
                LastObjectId { id, cipher: Some(Ff1::new(&object_id_unwrapped)) },
            )
        } else {
            Self::new(
                Some(self.clone()),
                handle.object_id(),
                filesystem.clone(),
                Some(StoreInfo::new_with_guid()),
                object_cache,
                None,
                LockState::Unencrypted,
                LastObjectId { id, ..LastObjectId::default() },
            )
        };
        assert!(store.store_info_handle.set(handle).is_ok());
        Ok(store)
    }

    /// Actually creates the store in a transaction.  This will also create a root directory and
    /// graveyard directory for the store.  See `new_child_store` above.
    async fn create<'a>(
        self: &'a Arc<Self>,
        transaction: &mut Transaction<'a>,
    ) -> Result<(), Error> {
        let buf = {
            // Create a root directory and graveyard directory.
            let graveyard_directory_object_id = Graveyard::create(transaction, &self);
            let root_directory = Directory::create(transaction, &self, None).await?;

            let serialized_info = {
                let mut store_info = self.store_info.lock();
                let store_info = store_info.as_mut().unwrap();

                store_info.graveyard_directory_object_id = graveyard_directory_object_id;
                store_info.root_directory_object_id = root_directory.object_id();

                let mut serialized_info = Vec::new();
                store_info.serialize_with_version(&mut serialized_info)?;
                serialized_info
            };
            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
            buf
        };

        if self.filesystem().options().image_builder_mode.is_some() {
            // If we're in image builder mode, we want to avoid writing to disk unless explicitly
            // asked to. New object stores will have their StoreInfo written when we compact in
            // FxFilesystem::finalize().
            Ok(())
        } else {
            self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
        }
    }

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(store_id = self.store_object_id(), trace; "OS: trace",);
        }
    }

    /// Sets a callback to be invoked each time the ObjectStore flushes.  The callback is invoked at
    /// the end of flush, while the write lock is still held.
    pub fn set_flush_callback<F: Fn(&ObjectStore) + Send + Sync + 'static>(&self, callback: F) {
        let mut flush_callback = self.flush_callback.lock();
        *flush_callback = Some(Box::new(callback));
    }

    pub fn is_root(&self) -> bool {
        if let Some(parent) = &self.parent_store {
            parent.parent_store.is_none()
        } else {
            // The root parent store isn't the root store.
            false
        }
    }

    /// Populates an inspect node with store statistics.
    pub fn record_data(self: &Arc<Self>, root: &fuchsia_inspect::Node) {
        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
        let counters = self.counters.lock();
        if let Some(store_info) = self.store_info() {
            root.record_string("guid", Uuid::from_bytes(store_info.guid).to_string());
        };
        root.record_uint("store_object_id", self.store_object_id);
        root.record_uint("mutations_applied", counters.mutations_applied);
        root.record_uint("mutations_dropped", counters.mutations_dropped);
        root.record_uint("num_flushes", counters.num_flushes);
        if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
            root.record_uint(
                "last_flush_time_ms",
                last_flush_time
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or(std::time::Duration::ZERO)
                    .as_millis()
                    .try_into()
                    .unwrap_or(0u64),
            );
        }
        root.record_uint("device_read_ops", self.device_read_ops.load(Ordering::Relaxed));
        root.record_uint("device_write_ops", self.device_write_ops.load(Ordering::Relaxed));
        root.record_uint("logical_read_ops", self.logical_read_ops.load(Ordering::Relaxed));
        root.record_uint("logical_write_ops", self.logical_write_ops.load(Ordering::Relaxed));
        root.record_uint("object_id_hi", self.last_object_id.lock().id >> 32);

        let this = self.clone();
        root.record_child("lsm_tree", move |node| this.tree().record_inspect_data(node));
    }

    pub fn device(&self) -> &Arc<dyn Device> {
        &self.device
    }

    pub fn block_size(&self) -> u64 {
        self.block_size
    }

    pub fn filesystem(&self) -> Arc<FxFilesystem> {
        self.filesystem.upgrade().unwrap()
    }

    pub fn store_object_id(&self) -> u64 {
        self.store_object_id
    }

    pub fn tree(&self) -> &LSMTree<ObjectKey, ObjectValue> {
        &self.tree
    }

    pub fn root_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().root_directory_object_id
    }

    pub fn graveyard_directory_object_id(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().graveyard_directory_object_id
    }

    fn set_graveyard_directory_object_id(&self, oid: u64) {
        assert_eq!(
            std::mem::replace(
                &mut self.store_info.lock().as_mut().unwrap().graveyard_directory_object_id,
                oid
            ),
            INVALID_OBJECT_ID
        );
    }

    pub fn object_count(&self) -> u64 {
        self.store_info.lock().as_ref().unwrap().object_count
    }

    pub fn key_manager(&self) -> &KeyManager {
        &self.key_manager
    }

    pub fn parent_store(&self) -> Option<&Arc<ObjectStore>> {
        self.parent_store.as_ref()
    }

    /// Returns the crypt object for the store.  Returns None if the store is unencrypted.
    pub fn crypt(&self) -> Option<Arc<dyn Crypt>> {
        match &*self.lock_state.lock() {
            LockState::Locked => panic!("Store is locked"),
            LockState::Invalid
            | LockState::Unencrypted
            | LockState::Locking
            | LockState::Unlocking
            | LockState::Deleted => None,
            LockState::Unlocked { crypt, .. } => Some(crypt.clone()),
            LockState::UnlockedReadOnly(crypt) => Some(crypt.clone()),
            LockState::Unknown => {
                panic!("Store is of unknown lock state; has the journal been replayed yet?")
            }
        }
    }

    pub async fn get_or_create_internal_directory_id(self: &Arc<Self>) -> Result<u64, Error> {
        // Create the transaction first to use the object store lock.
        let mut transaction = self
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    self.parent_store.as_ref().unwrap().store_object_id,
                    self.store_object_id,
                )],
                Options::default(),
            )
            .await?;
        let obj_id = self.store_info.lock().as_ref().unwrap().internal_directory_object_id;
        if obj_id != INVALID_OBJECT_ID {
            return Ok(obj_id);
        }

        // Need to create an internal directory.
        let directory = Directory::create(&mut transaction, self, None).await?;

        transaction.add(self.store_object_id, Mutation::CreateInternalDir(directory.object_id()));
        transaction.commit().await?;
        Ok(directory.object_id())
    }

    /// Returns the file size for the object without opening the object.
    async fn get_file_size(&self, object_id: u64) -> Result<u64, Error> {
        let item = self
            .tree
            .find(&ObjectKey::attribute(
                object_id,
                DEFAULT_DATA_ATTRIBUTE_ID,
                AttributeKey::Attribute,
            ))
            .await?
            .ok_or(FxfsError::NotFound)?;
        if let ObjectValue::Attribute { size, .. } = item.value {
            Ok(size)
        } else {
            bail!(FxfsError::NotFile);
        }
    }

    #[cfg(feature = "migration")]
    pub fn last_object_id(&self) -> u64 {
        self.last_object_id.lock().id
    }

    /// Provides access to the allocator to mark a specific region of the device as allocated.
    #[cfg(feature = "migration")]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        store_object_id: u64,
        device_range: std::ops::Range<u64>,
    ) -> Result<(), Error> {
        self.allocator().mark_allocated(transaction, store_object_id, device_range)
    }

    /// `crypt` can be provided if the crypt service should be different to the default; see the
    /// comment on create_object.  Users should avoid having more than one handle open for the same
    /// object at the same time because they might get out-of-sync; there is no code that will
    /// prevent this.  One example where this can cause an issue is if the object ends up using a
    /// permanent key (which is the case if a value is passed for `crypt`), the permanent key is
    /// dropped when a handle is dropped, which will impact any other handles for the same object.
    pub async fn open_object<S: HandleOwner>(
        owner: &Arc<S>,
        obj_id: u64,
        options: HandleOptions,
        crypt: Option<Arc<dyn Crypt>>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let mut fsverity_descriptor = None;
        let mut overwrite_ranges = Vec::new();
        let item = store
            .tree
            .find(&ObjectKey::attribute(obj_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute))
            .await?
            .ok_or(FxfsError::NotFound)?;

        let (size, track_overwrite_extents) = match item.value {
            ObjectValue::Attribute { size, has_overwrite_extents } => (size, has_overwrite_extents),
            ObjectValue::VerifiedAttribute { size, fsverity_metadata } => {
                if !options.skip_fsverity {
                    fsverity_descriptor = Some(fsverity_metadata);
                }
                // We only track the overwrite extents in memory for writes; reads handle them
                // implicitly.  This means verified files (where the data won't change anymore)
                // don't need to track them.
                (size, false)
            }
            _ => bail!(anyhow!(FxfsError::Inconsistent).context("open_object: Expected attribute")),
        };

        ensure!(size <= MAX_FILE_SIZE, FxfsError::Inconsistent);

        if track_overwrite_extents {
            let layer_set = store.tree.layer_set();
            let mut merger = layer_set.merger();
            let mut iter = merger
                .query(Query::FullRange(&ObjectKey::attribute(
                    obj_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(0)),
                )))
                .await?;
            loop {
                match iter.get() {
                    Some(ItemRef {
                        key:
                            ObjectKey {
                                object_id,
                                data:
                                    ObjectKeyData::Attribute(
                                        attribute_id,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                            },
                        value,
                        ..
                    }) if *object_id == obj_id && *attribute_id == DEFAULT_DATA_ATTRIBUTE_ID => {
                        match value {
                            ObjectValue::Extent(ExtentValue::None)
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Raw,
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Cow(_),
                                ..
                            }) => (),
                            ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::OverwritePartial(_),
                                ..
                            })
                            | ObjectValue::Extent(ExtentValue::Some {
                                mode: ExtentMode::Overwrite,
                                ..
                            }) => overwrite_ranges.push(range.clone()),
                            _ => bail!(
                                anyhow!(FxfsError::Inconsistent)
                                    .context("open_object: Expected extent")
                            ),
                        }
                        iter.advance().await?;
                    }
                    _ => break,
                }
            }
        }

        // If a crypt service has been specified, it needs to be a permanent key because cached
        // keys can only use the store's crypt service.
        let permanent = if let Some(crypt) = crypt {
            store
                .key_manager
                .get_keys(
                    obj_id,
                    crypt.as_ref(),
                    &mut Some(async || store.get_keys(obj_id).await),
                    /* permanent= */ true,
                    /* force= */ false,
                )
                .await?;
            true
        } else {
            false
        };
        let data_object_handle = DataObjectHandle::new(
            owner.clone(),
            obj_id,
            permanent,
            DEFAULT_DATA_ATTRIBUTE_ID,
            size,
            FsverityState::None,
            options,
            false,
            &overwrite_ranges,
        );
        if let Some(descriptor) = fsverity_descriptor {
            data_object_handle
                .set_fsverity_state_some(descriptor)
                .await
                .context("Invalid or mismatched merkle tree")?;
        }
        Ok(data_object_handle)
    }
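
    // A minimal usage sketch for `open_object` (illustrative; `volume` is a hypothetical
    // `Arc<S>` where `S: HandleOwner`, and `oid` is the ID of an existing object):
    //
    //     let handle =
    //         ObjectStore::open_object(&volume, oid, HandleOptions::default(), None).await?;
    //     let size = handle.get_size();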

    pub fn create_object_with_id<S: HandleOwner>(
        owner: &Arc<S>,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        options: HandleOptions,
        encryption_options: Option<ObjectEncryptionOptions>,
    ) -> Result<DataObjectHandle<S>, Error> {
        debug_assert!(object_id != INVALID_OBJECT_ID);
        let store = owner.as_ref().as_ref();
        // Don't permit creating unencrypted objects in an encrypted store.  The converse is OK.
        debug_assert!(store.crypt().is_none() || encryption_options.is_some());
        let now = Timestamp::now();
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::object(object_id),
                ObjectValue::file(1, 0, now.clone(), now.clone(), now.clone(), now, 0, None),
            ),
        );
        let mut permanent_keys = false;
        if let Some(ObjectEncryptionOptions { permanent, key_id, key, unwrapped_key }) =
            encryption_options
        {
            permanent_keys = permanent;
            transaction.add(
                store.store_object_id(),
                Mutation::insert_object(
                    ObjectKey::keys(object_id),
                    ObjectValue::keys(vec![(key_id, key)].into()),
                ),
            );
            let cipher: Arc<dyn Cipher> = Arc::new(FxfsCipher::new(&unwrapped_key));
            store.key_manager.insert(
                object_id,
                Arc::new(vec![(key_id, Some(cipher))].into()),
                permanent,
            );
        }
        transaction.add(
            store.store_object_id(),
            Mutation::insert_object(
                ObjectKey::attribute(object_id, DEFAULT_DATA_ATTRIBUTE_ID, AttributeKey::Attribute),
                // This is a new object so nothing has pre-allocated overwrite extents yet.
                ObjectValue::attribute(0, false),
            ),
        );
        Ok(DataObjectHandle::new(
            owner.clone(),
            object_id,
            permanent_keys,
            DEFAULT_DATA_ATTRIBUTE_ID,
            0,
            FsverityState::None,
            options,
            false,
            &[],
        ))
    }

    /// Creates an object in the store.
    ///
    /// If the store is encrypted, the object will be automatically encrypted as well.
    /// If `wrapping_key_id` is set, the new keys will be wrapped with that specific key, and
    /// otherwise the default data key is used.
    pub async fn create_object<S: HandleOwner>(
        owner: &Arc<S>,
        mut transaction: &mut Transaction<'_>,
        options: HandleOptions,
        wrapping_key_id: Option<WrappingKeyId>,
    ) -> Result<DataObjectHandle<S>, Error> {
        let store = owner.as_ref().as_ref();
        let object_id = store.get_next_object_id(transaction.txn_guard()).await?;
        let crypt = store.crypt();
        let encryption_options = if let Some(crypt) = crypt {
            let key_id =
                if wrapping_key_id.is_some() { FSCRYPT_KEY_ID } else { VOLUME_DATA_KEY_ID };
            let (key, unwrapped_key) = if let Some(wrapping_key_id) = wrapping_key_id {
                crypt.create_key_with_id(object_id, wrapping_key_id, ObjectType::File).await?
            } else {
                let (fxfs_key, unwrapped_key) =
                    crypt.create_key(object_id, KeyPurpose::Data).await?;
                (EncryptionKey::Fxfs(fxfs_key), unwrapped_key)
            };
            Some(ObjectEncryptionOptions { permanent: false, key_id, key, unwrapped_key })
        } else {
            None
        };
        ObjectStore::create_object_with_id(
            owner,
            &mut transaction,
            object_id,
            options,
            encryption_options,
        )
    }
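
    // A minimal usage sketch for `create_object` (illustrative; `fs` is a hypothetical
    // `Arc<FxFilesystem>` and `store` a hypothetical `Arc<S>` where `S: HandleOwner`; real
    // callers may need to hold locks).  Passing `None` for `wrapping_key_id` selects the
    // volume data key when the store is encrypted:
    //
    //     let mut transaction =
    //         fs.clone().new_transaction(lock_keys![], Options::default()).await?;
    //     let handle =
    //         ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
    //             .await?;
    //     transaction.commit().await?;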

    /// Creates an object using explicitly provided keys.
    ///
    /// There are some cases where an encrypted object needs to be created in an unencrypted store.
    /// For example, layer files for a child store are created in the root store, but they must be
    /// encrypted using the child store's keys.  This method exists for that purpose.
    pub(crate) async fn create_object_with_key<S: HandleOwner>(
        owner: &Arc<S>,
        mut transaction: &mut Transaction<'_>,
        object_id: u64,
        options: HandleOptions,
        key: EncryptionKey,
        unwrapped_key: UnwrappedKey,
    ) -> Result<DataObjectHandle<S>, Error> {
        ObjectStore::create_object_with_id(
            owner,
            &mut transaction,
            object_id,
            options,
            Some(ObjectEncryptionOptions {
                permanent: true,
                key_id: VOLUME_DATA_KEY_ID,
                key,
                unwrapped_key,
            }),
        )
    }

    /// Adjusts the reference count for a given object.  If the reference count reaches zero, the
    /// object is moved into the graveyard and true is returned.
    pub async fn adjust_refs(
        &self,
        transaction: &mut Transaction<'_>,
        object_id: u64,
        delta: i64,
    ) -> Result<bool, Error> {
        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
        let refs = if let ObjectValue::Object {
            kind: ObjectKind::File { refs, .. } | ObjectKind::Symlink { refs, .. },
            ..
        } = &mut mutation.item.value
        {
            *refs =
                refs.checked_add_signed(delta).ok_or_else(|| anyhow!("refs underflow/overflow"))?;
            refs
        } else {
            bail!(FxfsError::NotFile);
        };
        if *refs == 0 {
            self.add_to_graveyard(transaction, object_id);

            // We might still need to adjust the reference count if delta was something other than
            // -1.
            if delta != -1 {
                *refs = 1;
                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            }
            // Otherwise, we don't commit the mutation as we want to keep reference count as 1 for
            // objects in graveyard.
            Ok(true)
        } else {
            transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
            Ok(false)
        }
    }
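
    // For example (illustrative): dropping the last reference with
    // `store.adjust_refs(&mut transaction, oid, -1).await?` returns `Ok(true)` and leaves the
    // object in the graveyard for `tombstone_object` to purge later.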
1175
1176    // Purges an object that is in the graveyard.
1177    pub async fn tombstone_object(
1178        &self,
1179        object_id: u64,
1180        txn_options: Options<'_>,
1181    ) -> Result<(), Error> {
1182        self.key_manager.remove(object_id).await;
1183        let fs = self.filesystem();
1184        let truncate_guard = fs.truncate_guard(self.store_object_id, object_id).await;
1185        self.trim_or_tombstone(object_id, true, txn_options, &truncate_guard).await
1186    }
1187
1188    /// Trim extents beyond the end of a file for all attributes.  This will remove the entry from
1189    /// the graveyard when done.
1190    pub async fn trim(
1191        &self,
1192        object_id: u64,
1193        truncate_guard: &TruncateGuard<'_>,
1194    ) -> Result<(), Error> {
1195        // For the root and root parent store, we would need to use the metadata reservation which
1196        // we don't currently support, so assert that we're not those stores.
1197        assert!(self.parent_store.as_ref().unwrap().parent_store.is_some());
1198
1199        self.trim_or_tombstone(
1200            object_id,
1201            false,
1202            Options { borrow_metadata_space: true, ..Default::default() },
1203            truncate_guard,
1204        )
1205        .await
1206    }
1207
1208    /// Trims or tombstones an object.
1209    async fn trim_or_tombstone(
1210        &self,
1211        object_id: u64,
1212        for_tombstone: bool,
1213        txn_options: Options<'_>,
1214        _truncate_guard: &TruncateGuard<'_>,
1215    ) -> Result<(), Error> {
1216        let fs = self.filesystem();
1217        let mut next_attribute = Some(0);
1218        while let Some(attribute_id) = next_attribute.take() {
1219            let mut transaction = fs
1220                .clone()
1221                .new_transaction(
1222                    lock_keys![
1223                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
1224                        LockKey::object(self.store_object_id, object_id),
1225                    ],
1226                    txn_options,
1227                )
1228                .await?;
1229
1230            match self
1231                .trim_some(
1232                    &mut transaction,
1233                    object_id,
1234                    attribute_id,
1235                    if for_tombstone {
1236                        TrimMode::Tombstone(TombstoneMode::Object)
1237                    } else {
1238                        TrimMode::UseSize
1239                    },
1240                )
1241                .await?
1242            {
1243                TrimResult::Incomplete => next_attribute = Some(attribute_id),
1244                TrimResult::Done(None) => {
1245                    if for_tombstone
1246                        || matches!(
1247                            self.tree
1248                                .find(&ObjectKey::graveyard_entry(
1249                                    self.graveyard_directory_object_id(),
1250                                    object_id,
1251                                ))
1252                                .await?,
1253                            Some(Item { value: ObjectValue::Trim, .. })
1254                        )
1255                    {
1256                        self.remove_from_graveyard(&mut transaction, object_id);
1257                    }
1258                }
1259                TrimResult::Done(id) => next_attribute = id,
1260            }
1261
1262            if !transaction.mutations().is_empty() {
1263                transaction.commit().await?;
1264            }
1265        }
1266        Ok(())
1267    }
1268
1269    // Purges an object's attribute that is in the graveyard.
1270    pub async fn tombstone_attribute(
1271        &self,
1272        object_id: u64,
1273        attribute_id: u64,
1274        txn_options: Options<'_>,
1275    ) -> Result<(), Error> {
1276        let fs = self.filesystem();
1277        let mut trim_result = TrimResult::Incomplete;
1278        while matches!(trim_result, TrimResult::Incomplete) {
1279            let mut transaction = fs
1280                .clone()
1281                .new_transaction(
1282                    lock_keys![
1283                        LockKey::object_attribute(self.store_object_id, object_id, attribute_id),
1284                        LockKey::object(self.store_object_id, object_id),
1285                    ],
1286                    txn_options,
1287                )
1288                .await?;
1289            trim_result = self
1290                .trim_some(
1291                    &mut transaction,
1292                    object_id,
1293                    attribute_id,
1294                    TrimMode::Tombstone(TombstoneMode::Attribute),
1295                )
1296                .await?;
1297            if let TrimResult::Done(..) = trim_result {
1298                self.remove_attribute_from_graveyard(&mut transaction, object_id, attribute_id)
1299            }
1300            if !transaction.mutations().is_empty() {
1301                transaction.commit().await?;
1302            }
1303        }
1304        Ok(())
1305    }
1306
1307    /// Deletes extents for attribute `attribute_id` in object `object_id`.  Also see the comments
1308    /// for TrimMode and TrimResult. Should hold a lock on the attribute, and the object as it
1309    /// performs a read-modify-write on the sizes.
1310    pub async fn trim_some(
1311        &self,
1312        transaction: &mut Transaction<'_>,
1313        object_id: u64,
1314        attribute_id: u64,
1315        mode: TrimMode,
1316    ) -> Result<TrimResult, Error> {
1317        let layer_set = self.tree.layer_set();
1318        let mut merger = layer_set.merger();
1319
1320        let aligned_offset = match mode {
1321            TrimMode::FromOffset(offset) => {
1322                round_up(offset, self.block_size).ok_or(FxfsError::Inconsistent)?
1323            }
1324            TrimMode::Tombstone(..) => 0,
1325            TrimMode::UseSize => {
1326                let iter = merger
1327                    .query(Query::FullRange(&ObjectKey::attribute(
1328                        object_id,
1329                        attribute_id,
1330                        AttributeKey::Attribute,
1331                    )))
1332                    .await?;
1333                if let Some(item_ref) = iter.get() {
1334                    if item_ref.key.object_id != object_id {
1335                        return Ok(TrimResult::Done(None));
1336                    }
1337
1338                    if let ItemRef {
1339                        key:
1340                            ObjectKey {
1341                                data:
1342                                    ObjectKeyData::Attribute(size_attribute_id, AttributeKey::Attribute),
1343                                ..
1344                            },
1345                        value: ObjectValue::Attribute { size, .. },
1346                        ..
1347                    } = item_ref
1348                    {
1349                        // If we found a different attribute_id, return so we can get the
1350                        // right lock.
1351                        if *size_attribute_id != attribute_id {
1352                            return Ok(TrimResult::Done(Some(*size_attribute_id)));
1353                        }
1354                        round_up(*size, self.block_size).ok_or(FxfsError::Inconsistent)?
1355                    } else {
1356                        // At time of writing, we should always see a size record or None here, but
1357                        // asserting here would be brittle, so just skip to the next attribute
1358                        // instead.
1359                        return Ok(TrimResult::Done(Some(attribute_id + 1)));
1360                    }
1361                } else {
1362                    // End of the tree.
1363                    return Ok(TrimResult::Done(None));
1364                }
1365            }
1366        };
1367
1368        // Loop over the extents and deallocate them.
1369        let mut iter = merger
1370            .query(Query::FullRange(&ObjectKey::from_extent(
1371                object_id,
1372                attribute_id,
1373                ExtentKey::search_key_from_offset(aligned_offset),
1374            )))
1375            .await?;
1376        let mut end = 0;
1377        let allocator = self.allocator();
1378        let mut result = TrimResult::Done(None);
1379        let mut deallocated = 0;
1380        let block_size = self.block_size;
1381
1382        while let Some(item_ref) = iter.get() {
1383            if item_ref.key.object_id != object_id {
1384                break;
1385            }
1386            if let ObjectKey {
1387                data: ObjectKeyData::Attribute(extent_attribute_id, attribute_key),
1388                ..
1389            } = item_ref.key
1390            {
1391                if *extent_attribute_id != attribute_id {
1392                    result = TrimResult::Done(Some(*extent_attribute_id));
1393                    break;
1394                }
1395                if let (
1396                    AttributeKey::Extent(ExtentKey { range }),
1397                    ObjectValue::Extent(ExtentValue::Some { device_offset, .. }),
1398                ) = (attribute_key, item_ref.value)
1399                {
1400                    let start = std::cmp::max(range.start, aligned_offset);
1401                    ensure!(start < range.end, FxfsError::Inconsistent);
1402                    let device_offset = device_offset
1403                        .checked_add(start - range.start)
1404                        .ok_or(FxfsError::Inconsistent)?;
1405                    end = range.end;
1406                    let len = end - start;
1407                    let device_range = device_offset..device_offset + len;
1408                    ensure!(device_range.is_aligned(block_size), FxfsError::Inconsistent);
1409                    allocator.deallocate(transaction, self.store_object_id, device_range).await?;
1410                    deallocated += len;
1411                    // Stop if the transaction is getting too big.
1412                    if transaction.mutations().len() >= TRANSACTION_MUTATION_THRESHOLD {
1413                        result = TrimResult::Incomplete;
1414                        break;
1415                    }
1416                }
1417            }
1418            iter.advance().await?;
1419        }
1420
1421        let finished_tombstone_object = matches!(mode, TrimMode::Tombstone(TombstoneMode::Object))
1422            && matches!(result, TrimResult::Done(None));
1423        let finished_tombstone_attribute =
1424            matches!(mode, TrimMode::Tombstone(TombstoneMode::Attribute))
1425                && !matches!(result, TrimResult::Incomplete);
1426        let mut object_mutation = None;
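        // Project accounting: fully tombstoning an object releases one node, and any deallocated
        // extents release bytes (see the BytesAndNodes merge below).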
1427        let nodes = if finished_tombstone_object { -1 } else { 0 };
1428        if nodes != 0 || deallocated != 0 {
1429            let mutation = self.txn_get_object_mutation(transaction, object_id).await?;
1430            if let ObjectValue::Object { attributes: ObjectAttributes { project_id, .. }, .. } =
1431                mutation.item.value
1432            {
1433                if project_id != 0 {
1434                    transaction.add(
1435                        self.store_object_id,
1436                        Mutation::merge_object(
1437                            ObjectKey::project_usage(self.root_directory_object_id(), project_id),
1438                            ObjectValue::BytesAndNodes {
1439                                bytes: -i64::try_from(deallocated).unwrap(),
1440                                nodes,
1441                            },
1442                        ),
1443                    );
1444                }
1445                object_mutation = Some(mutation);
1446            } else {
1447                panic!("Inconsistent object type.");
1448            }
1449        }
1450
1451        // Deletion marker records *must* be merged so as to consume all other records for the
1452        // object.
1453        if finished_tombstone_object {
1454            transaction.add(
1455                self.store_object_id,
1456                Mutation::merge_object(ObjectKey::object(object_id), ObjectValue::None),
1457            );
1458        } else {
1459            if finished_tombstone_attribute {
1460                transaction.add(
1461                    self.store_object_id,
1462                    Mutation::merge_object(
1463                        ObjectKey::attribute(object_id, attribute_id, AttributeKey::Attribute),
1464                        ObjectValue::None,
1465                    ),
1466                );
1467            }
1468            if deallocated > 0 {
1469                let mut mutation = match object_mutation {
1470                    Some(mutation) => mutation,
1471                    None => self.txn_get_object_mutation(transaction, object_id).await?,
1472                };
1473                transaction.add(
1474                    self.store_object_id,
1475                    Mutation::merge_object(
1476                        ObjectKey::extent(object_id, attribute_id, aligned_offset..end),
1477                        ObjectValue::deleted_extent(),
1478                    ),
1479                );
1480                // Update allocated size.
1481                if let ObjectValue::Object {
1482                    attributes: ObjectAttributes { allocated_size, .. },
1483                    ..
1484                } = &mut mutation.item.value
1485                {
1486                    // The only way for this to fail is if the volume is inconsistent.
1487                    *allocated_size = allocated_size.checked_sub(deallocated).ok_or_else(|| {
1488                        anyhow!(FxfsError::Inconsistent).context("Allocated size overflow")
1489                    })?;
1490                } else {
1491                    panic!("Unexpected object value");
1492                }
1493                transaction.add(self.store_object_id, Mutation::ObjectStore(mutation));
1494            }
1495        }
1496        Ok(result)
1497    }
1498
1499    /// Returns all objects that exist in the parent store that pertain to this object store.
1500    /// Note that this doesn't include the object_id of the store itself, which is generally
1501    /// referenced externally.
1502    pub fn parent_objects(&self) -> Vec<u64> {
1503        assert!(self.store_info_handle.get().is_some());
1504        self.store_info.lock().as_ref().unwrap().parent_objects()
1505    }
1506
1507    /// Returns root objects for this store.
1508    pub fn root_objects(&self) -> Vec<u64> {
1509        let mut objects = Vec::new();
1510        let store_info = self.store_info.lock();
1511        let info = store_info.as_ref().unwrap();
1512        if info.root_directory_object_id != INVALID_OBJECT_ID {
1513            objects.push(info.root_directory_object_id);
1514        }
1515        if info.graveyard_directory_object_id != INVALID_OBJECT_ID {
1516            objects.push(info.graveyard_directory_object_id);
1517        }
1518        if info.internal_directory_object_id != INVALID_OBJECT_ID {
1519            objects.push(info.internal_directory_object_id);
1520        }
1521        objects
1522    }
1523
1524    pub fn store_info(&self) -> Option<StoreInfo> {
1525        self.store_info.lock().as_ref().cloned()
1526    }
1527
1528    /// Returns None if called during journal replay.
1529    pub fn store_info_handle_object_id(&self) -> Option<u64> {
1530        self.store_info_handle.get().map(|h| h.object_id())
1531    }
1532
1533    /// Called to open a store, before replay of this store's mutations.
1534    async fn open(
1535        parent_store: &Arc<ObjectStore>,
1536        store_object_id: u64,
1537        object_cache: Box<dyn ObjectCache<ObjectKey, ObjectValue>>,
1538    ) -> Result<Arc<ObjectStore>, Error> {
1539        let handle =
1540            ObjectStore::open_object(parent_store, store_object_id, HandleOptions::default(), None)
1541                .await?;
1542
1543        let info = load_store_info(parent_store, store_object_id).await?;
1544        let is_encrypted = info.mutations_key.is_some();
1545
1546        let mut total_layer_size = 0;
1547        let last_object_id;
1548
1549        // TODO(https://fxbug.dev/42178043): the layer size here could be bad and cause overflow.
1550
1551        // If the store is encrypted, we can't open the object tree layers now, but we still need
1552        // to compute the size of the layers.
1553        if is_encrypted {
1554            for &oid in &info.layers {
1555                total_layer_size += parent_store.get_file_size(oid).await?;
1556            }
1557            if info.encrypted_mutations_object_id != INVALID_OBJECT_ID {
1558                total_layer_size += layer_size_from_encrypted_mutations_size(
1559                    parent_store.get_file_size(info.encrypted_mutations_object_id).await?,
1560                );
1561            }
1562            last_object_id = LastObjectId::default();
1563        } else {
1564            last_object_id = LastObjectId { id: info.last_object_id, cipher: None };
1565        }
1566
1567        let fs = parent_store.filesystem();
1568
1569        let store = ObjectStore::new(
1570            Some(parent_store.clone()),
1571            store_object_id,
1572            fs.clone(),
1573            if is_encrypted { None } else { Some(info) },
1574            object_cache,
1575            None,
1576            if is_encrypted { LockState::Locked } else { LockState::Unencrypted },
1577            last_object_id,
1578        );
1579
1580        assert!(store.store_info_handle.set(handle).is_ok(), "Failed to set store_info_handle!");
1581
1582        if !is_encrypted {
1583            let object_tree_layer_object_ids =
1584                store.store_info.lock().as_ref().unwrap().layers.clone();
1585            let object_layers = store.open_layers(object_tree_layer_object_ids, None).await?;
1586            total_layer_size = object_layers.iter().map(|h| h.get_size()).sum();
1587            store
1588                .tree
1589                .append_layers(object_layers)
1590                .await
1591                .context("Failed to read object store layers")?;
1592        }
1593
1594        fs.object_manager().update_reservation(
1595            store_object_id,
1596            tree::reservation_amount_from_layer_size(total_layer_size),
1597        );
1598
1599        Ok(store)
1600    }
1601
1602    async fn load_store_info(&self) -> Result<StoreInfo, Error> {
1603        load_store_info(self.parent_store.as_ref().unwrap(), self.store_object_id).await
1604    }
1605
1606    async fn open_layers(
1607        &self,
1608        object_ids: impl std::iter::IntoIterator<Item = u64>,
1609        crypt: Option<Arc<dyn Crypt>>,
1610    ) -> Result<Vec<DataObjectHandle<ObjectStore>>, Error> {
1611        let parent_store = self.parent_store.as_ref().unwrap();
1612        let mut handles = Vec::new();
1613        for object_id in object_ids {
1614            let handle = ObjectStore::open_object(
1615                &parent_store,
1616                object_id,
1617                HandleOptions::default(),
1618                crypt.clone(),
1619            )
1620            .await
1621            .with_context(|| format!("Failed to open layer file {}", object_id))?;
1622            handles.push(handle);
1623        }
1624        Ok(handles)
1625    }
1626
1627    /// Unlocks a store so that it is ready to be used.
1628    /// This is not thread-safe.
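    ///
    /// A minimal usage sketch (`store` and a `crypt` implementation are assumed to be in scope;
    /// `NO_OWNER` may be passed when there is no `StoreOwner`):
    ///
    /// ```ignore
    /// store.unlock(NO_OWNER, crypt).await?;
    /// ```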
1629    pub async fn unlock(
1630        self: &Arc<Self>,
1631        owner: Weak<dyn StoreOwner>,
1632        crypt: Arc<dyn Crypt>,
1633    ) -> Result<(), Error> {
1634        self.unlock_inner(owner, crypt, /*read_only=*/ false).await
1635    }
1636
1637    /// Unlocks a store so that it is ready to be read from.
1638    /// The store will generally behave like it is still locked: when flushed, the store will
1639    /// write out its mutations into the encrypted mutations file, rather than directly updating
1640    /// the layer files of the object store.
1641    /// Re-locking the store (which *must* be done with `Self::lock_read_only`) will not trigger a
1642    /// flush, although the store might still be flushed during other operations.
1643    /// This is not thread-safe.
1644    pub async fn unlock_read_only(self: &Arc<Self>, crypt: Arc<dyn Crypt>) -> Result<(), Error> {
1645        self.unlock_inner(NO_OWNER, crypt, /*read_only=*/ true).await
1646    }
1647
1648    async fn unlock_inner(
1649        self: &Arc<Self>,
1650        owner: Weak<dyn StoreOwner>,
1651        crypt: Arc<dyn Crypt>,
1652        read_only: bool,
1653    ) -> Result<(), Error> {
1654        // Unless we are unlocking the store as read-only, the filesystem must not be read-only.
1655        assert!(read_only || !self.filesystem().options().read_only);
1656        match &*self.lock_state.lock() {
1657            LockState::Locked => {}
1658            LockState::Unencrypted => bail!(FxfsError::InvalidArgs),
1659            LockState::Invalid | LockState::Deleted => bail!(FxfsError::Internal),
1660            LockState::Unlocked { .. } | LockState::UnlockedReadOnly(..) => {
1661                bail!(FxfsError::AlreadyBound)
1662            }
1663            LockState::Unknown => panic!("Store was unlocked before replay"),
1664            LockState::Locking => panic!("Store is being locked"),
1665            LockState::Unlocking => panic!("Store is being unlocked"),
1666        }
1667        // We must lock flushing since that can modify store_info and the encrypted mutations file.
1668        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1669        let fs = self.filesystem();
1670        let guard = fs.lock_manager().write_lock(keys).await;
1671
1672        let store_info = self.load_store_info().await?;
1673
1674        self.tree
1675            .append_layers(
1676                self.open_layers(store_info.layers.iter().cloned(), Some(crypt.clone())).await?,
1677            )
1678            .await
1679            .context("Failed to read object tree layer file contents")?;
1680
1681        let wrapped_key =
1682            fxfs_crypto::WrappedKey::Fxfs(store_info.mutations_key.clone().unwrap().into());
1683        let unwrapped_key = crypt
1684            .unwrap_key(&wrapped_key, self.store_object_id)
1685            .await
1686            .context("Failed to unwrap mutations keys")?;
1687        // The ChaCha20 stream cipher we use supports up to 64 GiB.  By default we'll roll the key
1688        // after every 128 MiB.  Here we just need to pick a limit that won't cause issues if the
1689        // offset wraps, so we use u32::MAX (the offset is u64).
1690        ensure!(store_info.mutations_cipher_offset <= u32::MAX as u64, FxfsError::Inconsistent);
1691        let mut mutations_cipher =
1692            StreamCipher::new(&unwrapped_key, store_info.mutations_cipher_offset);
1693
1694        let wrapped_key = fxfs_crypto::WrappedKey::Fxfs(
1695            store_info.object_id_key.clone().ok_or(FxfsError::Inconsistent)?.into(),
1696        );
1697        let object_id_cipher =
1698            Ff1::new(&crypt.unwrap_key(&wrapped_key, self.store_object_id).await?);
1699        {
1700            let mut last_object_id = self.last_object_id.lock();
1701            last_object_id.cipher = Some(object_id_cipher);
1702        }
1703        self.update_last_object_id(store_info.last_object_id);
1704
1705        // Apply the encrypted mutations.
1706        let mut mutations = {
1707            if store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
1708                EncryptedMutations::default()
1709            } else {
1710                let parent_store = self.parent_store.as_ref().unwrap();
1711                let handle = ObjectStore::open_object(
1712                    &parent_store,
1713                    store_info.encrypted_mutations_object_id,
1714                    HandleOptions::default(),
1715                    None,
1716                )
1717                .await?;
1718                let mut cursor = std::io::Cursor::new(
1719                    handle
1720                        .contents(MAX_ENCRYPTED_MUTATIONS_SIZE)
1721                        .await
1722                        .context(FxfsError::Inconsistent)?,
1723                );
1724                let mut mutations = EncryptedMutations::deserialize_with_version(&mut cursor)
1725                    .context("Failed to deserialize EncryptedMutations")?
1726                    .0;
1727                let len = cursor.get_ref().len() as u64;
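                // The file can contain multiple concatenated serialized EncryptedMutations
                // (e.g. appended by successive flushes while the store was locked), so keep
                // deserializing and merging until the cursor is exhausted.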
1728                while cursor.position() < len {
1729                    mutations.extend(
1730                        &EncryptedMutations::deserialize_with_version(&mut cursor)
1731                            .context("Failed to deserialize EncryptedMutations")?
1732                            .0,
1733                    );
1734                }
1735                mutations
1736            }
1737        };
1738
1739        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
1740        let journaled = EncryptedMutations::from_replayed_mutations(
1741            self.store_object_id,
1742            fs.journal()
1743                .read_transactions_for_object(self.store_object_id)
1744                .await
1745                .context("Failed to read encrypted mutations from journal")?,
1746        );
1747        mutations.extend(&journaled);
1748
1749        let _ = std::mem::replace(&mut *self.lock_state.lock(), LockState::Unlocking);
1750        *self.store_info.lock() = Some(store_info);
1751
1752        // If we fail, clean up.
1753        let clean_up = scopeguard::guard((), |_| {
1754            *self.lock_state.lock() = LockState::Locked;
1755            *self.store_info.lock() = None;
1756            // Make sure we don't leave unencrypted data lying around in memory.
1757            self.tree.reset();
1758        });
1759
1760        let EncryptedMutations { transactions, mut data, mutations_key_roll } = mutations;
1761
1762        let mut slice = &mut data[..];
1763        let mut last_offset = 0;
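        // `mutations_key_roll` holds (offset, key) pairs recording where the mutations key was
        // rolled: bytes before each offset were encrypted with the previous key, so decrypt the
        // data segment by segment, switching to the freshly unwrapped cipher at each roll point.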
1764        for (offset, key) in mutations_key_roll {
1765            let split_offset = offset
1766                .checked_sub(last_offset)
1767                .ok_or(FxfsError::Inconsistent)
1768                .context("Invalid mutation key roll offset")?;
1769            last_offset = offset;
1770            ensure!(split_offset <= slice.len(), FxfsError::Inconsistent);
1771            let (old, new) = slice.split_at_mut(split_offset);
1772            mutations_cipher.decrypt(old);
1773            let unwrapped_key = crypt
1774                .unwrap_key(&fxfs_crypto::WrappedKey::Fxfs(key.into()), self.store_object_id)
1775                .await
1776                .context("Failed to unwrap mutations keys")?;
1777            mutations_cipher = StreamCipher::new(&unwrapped_key, 0);
1778            slice = new;
1779        }
1780        mutations_cipher.decrypt(slice);
1781
1782        // Always roll the mutations key when we unlock, which guarantees we won't reuse a
1783        // previous key and nonce.
1784        self.roll_mutations_key(crypt.as_ref()).await?;
1785
1786        let mut cursor = std::io::Cursor::new(data);
1787        for (checkpoint, count) in transactions {
1788            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint };
1789            for _ in 0..count {
1790                let mutation =
1791                    Mutation::deserialize_from_version(&mut cursor, context.checkpoint.version)
1792                        .context("failed to deserialize encrypted mutation")?;
1793                self.apply_mutation(mutation, &context, AssocObj::None)
1794                    .context("failed to apply encrypted mutation")?;
1795            }
1796        }
1797
1798        *self.lock_state.lock() = if read_only {
1799            LockState::UnlockedReadOnly(crypt)
1800        } else {
1801            LockState::Unlocked { owner, crypt }
1802        };
1803
1804        // To avoid unbounded memory growth, we should flush the encrypted mutations now.
1805        // Otherwise, it's possible for more writes to be queued and for the store to be locked
1806        // before we can flush anything, and for that cycle to repeat.
1807        std::mem::drop(guard);
1808
1809        if !read_only && !self.filesystem().options().read_only {
1810            self.flush_with_reason(flush::Reason::Unlock).await?;
1811
1812            // Reap purged files within this store.
1813            let _ = self.filesystem().graveyard().initial_reap(&self).await?;
1814        }
1815
1816        // Return and cancel the clean up.
1817        Ok(ScopeGuard::into_inner(clean_up))
1818    }
1819
1820    pub fn is_locked(&self) -> bool {
1821        matches!(
1822            *self.lock_state.lock(),
1823            LockState::Locked | LockState::Locking | LockState::Unknown
1824        )
1825    }
1826
1827    /// NB: This is not the converse of `is_locked`, as there are lock states where neither is
1828    /// true.
1829    pub fn is_unlocked(&self) -> bool {
1830        matches!(
1831            *self.lock_state.lock(),
1832            LockState::Unlocked { .. } | LockState::UnlockedReadOnly { .. } | LockState::Unlocking
1833        )
1834    }
1835
1836    pub fn is_unknown(&self) -> bool {
1837        matches!(*self.lock_state.lock(), LockState::Unknown)
1838    }
1839
1840    pub fn is_encrypted(&self) -> bool {
1841        self.store_info.lock().as_ref().unwrap().mutations_key.is_some()
1842    }
1843
1844    // Locks a store.
1845    // This operation will take a flush lock on the store, in case any flushes are ongoing.  Any
1846    // ongoing store accesses might be interrupted by this.  See `Self::crypt`.
1847    // Whilst this can return an error, the store will be placed into an unusable but safe state
1848    // (i.e. no lingering unencrypted data) if an error is encountered.
1849    pub async fn lock(&self) -> Result<(), Error> {
1850        // We must lock flushing since it is not safe for that to be happening whilst we are locking
1851        // the store.
1852        let keys = lock_keys![LockKey::flush(self.store_object_id())];
1853        let fs = self.filesystem();
1854        let _guard = fs.lock_manager().write_lock(keys).await;
1855
1856        {
1857            let mut lock_state = self.lock_state.lock();
1858            if let LockState::Unlocked { .. } = &*lock_state {
1859                *lock_state = LockState::Locking;
1860            } else {
1861                panic!("Unexpected lock state: {:?}", &*lock_state);
1862            }
1863        }
1864
1865        // Sync the journal now to ensure that any buffered mutations for this store make it out to
1866        // disk.  This is necessary to be able to unlock the store again.
1867        // We need to establish a barrier at this point (so that the journaled writes are observable
1868        // by any future attempts to unlock the store), hence the flush_device.
1869        let sync_result =
1870            self.filesystem().sync(SyncOptions { flush_device: true, ..Default::default() }).await;
1871
1872        *self.lock_state.lock() = if let Err(error) = &sync_result {
1873            error!(error:?; "Failed to sync journal; store will no longer be usable");
1874            LockState::Invalid
1875        } else {
1876            LockState::Locked
1877        };
1878        self.key_manager.clear();
1879        *self.store_info.lock() = None;
1880        self.tree.reset();
1881
1882        sync_result
1883    }
1884
1885    // Locks a store which was previously unlocked read-only (see `Self::unlock_read_only`).  Data
1886    // is not flushed, and instead any journaled mutations are buffered back into the ObjectStore
1887    // and will be replayed next time the store is unlocked.
1888    pub fn lock_read_only(&self) {
1889        *self.lock_state.lock() = LockState::Locked;
1890        *self.store_info.lock() = None;
1891        self.tree.reset();
1892    }
1893
1894    // Returns INVALID_OBJECT_ID if the object ID cipher needs to be created or rolled.
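    // Callers that see INVALID_OBJECT_ID should fall back to `get_next_object_id`, which creates
    // or rolls the cipher as needed.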
1895    pub(super) fn maybe_get_next_object_id(&self) -> u64 {
1896        let mut last_object_id = self.last_object_id.lock();
1897        if last_object_id.should_create_cipher() {
1898            INVALID_OBJECT_ID
1899        } else {
1900            last_object_id.get_next_object_id()
1901        }
1902    }
1903
1904    /// Returns a new object ID that can be used.  This will create an object ID cipher if needed.
1905    ///
1906    /// If the object ID key needs to be rolled, a new transaction will be created and committed.
1907    /// This transaction does not take the filesystem lock, hence `txn_guard`.
1908    pub async fn get_next_object_id(&self, txn_guard: &TxnGuard<'_>) -> Result<u64, Error> {
1909        let object_id = self.maybe_get_next_object_id();
1910        if object_id != INVALID_OBJECT_ID {
1911            return Ok(object_id);
1912        }
1913
1914        // Create a transaction (which has a lock) and then check again.
1915        let mut transaction = self
1916            .filesystem()
1917            .new_transaction(
1918                lock_keys![LockKey::object(
1919                    self.parent_store.as_ref().unwrap().store_object_id,
1920                    self.store_object_id,
1921                )],
1922                Options {
1923                    // We must skip journal checks because this transaction might be needed to
1924                    // compact.
1925                    skip_journal_checks: true,
1926                    borrow_metadata_space: true,
1927                    txn_guard: Some(txn_guard),
1928                    ..Default::default()
1929                },
1930            )
1931            .await?;
1932
1933        {
1934            let mut last_object_id = self.last_object_id.lock();
1935            if !last_object_id.should_create_cipher() {
1936                // We lost a race.
1937                return Ok(last_object_id.get_next_object_id());
1938            }
1939            // It shouldn't be possible for last_object_id to wrap within our lifetime, so if this
1940            // happens, it's most likely due to corruption.
1941            ensure!(
1942                last_object_id.id & OBJECT_ID_HI_MASK != OBJECT_ID_HI_MASK,
1943                FxfsError::Inconsistent
1944            );
1945        }
1946
1947        // Create a key.
1948        let (object_id_wrapped, object_id_unwrapped) =
1949            self.crypt().unwrap().create_key(self.store_object_id, KeyPurpose::Metadata).await?;
1950
1951        // Update StoreInfo.
1952        let buf = {
1953            let serialized_info = {
1954                let mut store_info = self.store_info.lock();
1955                let store_info = store_info.as_mut().unwrap();
1956                store_info.object_id_key = Some(object_id_wrapped);
1957                let mut serialized_info = Vec::new();
1958                store_info.serialize_with_version(&mut serialized_info)?;
1959                serialized_info
1960            };
1961            let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
1962            buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
1963            buf
1964        };
1965
1966        self.store_info_handle
1967            .get()
1968            .unwrap()
1969            .txn_write(&mut transaction, 0u64, buf.as_ref())
1970            .await?;
1971        transaction.commit().await?;
1972
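        // Advance the counter into the next 4-billion-ID range (zeroing the low 32 bits) so that
        // IDs minted with the new cipher can't collide with IDs minted under any previous cipher;
        // the low 32 bits of each returned ID are FF1-encrypted.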
1973        let mut last_object_id = self.last_object_id.lock();
1974        last_object_id.cipher = Some(Ff1::new(&object_id_unwrapped));
1975        last_object_id.id = (last_object_id.id + (1 << 32)) & OBJECT_ID_HI_MASK;
1976
1977        Ok((last_object_id.id & OBJECT_ID_HI_MASK)
1978            | last_object_id.cipher.as_ref().unwrap().encrypt(last_object_id.id as u32) as u64)
1979    }
1980
1981    /// Query the next object ID that will be used. Intended for use when checking filesystem
1982    /// consistency. Prefer [`Self::get_next_object_id()`] for general use.
1983    pub(crate) fn query_next_object_id(&self) -> u64 {
1984        let mut last_object_id = self.last_object_id.lock().clone();
1985        if last_object_id.should_create_cipher() {
1986            INVALID_OBJECT_ID
1987        } else {
1988            last_object_id.get_next_object_id()
1989        }
1990    }
1991
1992    fn allocator(&self) -> Arc<Allocator> {
1993        self.filesystem().allocator()
1994    }
1995
1996    // If |transaction| has an impending mutation for the underlying object, returns that.
1997    // Otherwise, looks up the object from the tree and returns a suitable mutation for it.  The
1998    // mutation is returned here rather than the item because the mutation includes the operation
1999    // which has significance: inserting an object implies it's the first of its kind, unlike
2000    // replacing an object.
2001    async fn txn_get_object_mutation(
2002        &self,
2003        transaction: &Transaction<'_>,
2004        object_id: u64,
2005    ) -> Result<ObjectStoreMutation, Error> {
2006        if let Some(mutation) =
2007            transaction.get_object_mutation(self.store_object_id, ObjectKey::object(object_id))
2008        {
2009            Ok(mutation.clone())
2010        } else {
2011            Ok(ObjectStoreMutation {
2012                item: self
2013                    .tree
2014                    .find(&ObjectKey::object(object_id))
2015                    .await?
2016                    .ok_or(FxfsError::Inconsistent)
2017                    .context("Object id missing")?,
2018                op: Operation::ReplaceOrInsert,
2019            })
2020        }
2021    }
2022
2023    /// Like txn_get_object_mutation but with expanded visibility.
2024    /// Only available in migration code.
2025    #[cfg(feature = "migration")]
2026    pub async fn get_object_mutation(
2027        &self,
2028        transaction: &Transaction<'_>,
2029        object_id: u64,
2030    ) -> Result<ObjectStoreMutation, Error> {
2031        self.txn_get_object_mutation(transaction, object_id).await
2032    }
2033
2034    fn update_last_object_id(&self, mut object_id: u64) {
2035        let mut last_object_id = self.last_object_id.lock();
2036        // For encrypted stores, object_id will be encrypted here, so we must decrypt first.
2037        if let Some(cipher) = &last_object_id.cipher {
2038            // If the object ID cipher has been rolled, then it's possible we might see object IDs
2039            // that were generated using a different cipher so the decrypt here will return the
2040            // wrong value, but that won't matter because the hi part of the object ID should still
2041            // discriminate.
2042            object_id = object_id & OBJECT_ID_HI_MASK | cipher.decrypt(object_id as u32) as u64;
2043        }
2044        if object_id > last_object_id.id {
2045            last_object_id.id = object_id;
2046        }
2047    }
2048
2049    /// Adds the specified object to the graveyard.
2050    pub fn add_to_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2051        let graveyard_id = self.graveyard_directory_object_id();
2052        assert_ne!(graveyard_id, INVALID_OBJECT_ID);
2053        transaction.add(
2054            self.store_object_id,
2055            Mutation::replace_or_insert_object(
2056                ObjectKey::graveyard_entry(graveyard_id, object_id),
2057                ObjectValue::Some,
2058            ),
2059        );
2060    }
2061
2062    /// Removes the specified object from the graveyard.  NB: Care should be taken when calling
2063    /// this because graveyard entries are used for purging deleted files *and* for trimming
2064    /// extents.  For example, consider the following sequence:
2065    ///
2066    ///     1. Add Trim graveyard entry.
2067    ///     2. Replace with Some graveyard entry (see above).
2068    ///     3. Remove graveyard entry.
2069    ///
2070    /// If the desire in #3 is just to cancel the effect of the Some entry, then #3 should
2071    /// actually be:
2072    ///
2073    ///     3. Replace with Trim graveyard entry.
2074    pub fn remove_from_graveyard(&self, transaction: &mut Transaction<'_>, object_id: u64) {
2075        transaction.add(
2076            self.store_object_id,
2077            Mutation::replace_or_insert_object(
2078                ObjectKey::graveyard_entry(self.graveyard_directory_object_id(), object_id),
2079                ObjectValue::None,
2080            ),
2081        );
2082    }
2083
2084    /// Removes the specified attribute from the graveyard.  Unlike object graveyard entries,
2085    /// attribute graveyard entries serve a single purpose (purging deleted attributes), so the
2086    /// caller does not need to be concerned about restoring the entry's prior state when
2087    /// cancelling it.  See the comment on `remove_from_graveyard()`.
2088    pub fn remove_attribute_from_graveyard(
2089        &self,
2090        transaction: &mut Transaction<'_>,
2091        object_id: u64,
2092        attribute_id: u64,
2093    ) {
2094        transaction.add(
2095            self.store_object_id,
2096            Mutation::replace_or_insert_object(
2097                ObjectKey::graveyard_attribute_entry(
2098                    self.graveyard_directory_object_id(),
2099                    object_id,
2100                    attribute_id,
2101                ),
2102                ObjectValue::None,
2103            ),
2104        );
2105    }
2106
2107    // Roll the mutations key.  The new key will be written for the next encrypted mutation.
2108    async fn roll_mutations_key(&self, crypt: &dyn Crypt) -> Result<(), Error> {
2109        let (wrapped_key, unwrapped_key) =
2110            crypt.create_key(self.store_object_id, KeyPurpose::Metadata).await?;
2111
2112        // The mutations_cipher lock must be held for the duration so that mutations_cipher and
2113        // store_info are updated atomically.  Otherwise, write_mutation could find a new cipher but
2114        // end up writing the wrong wrapped key.
2115        let mut cipher = self.mutations_cipher.lock();
2116        *cipher = Some(StreamCipher::new(&unwrapped_key, 0));
2117        self.store_info.lock().as_mut().unwrap().mutations_key = Some(wrapped_key);
2118        // mutations_cipher_offset is updated by flush.
2119        Ok(())
2120    }
2121
2122    // When the symlink is unlocked, this function decrypts `link` and returns bytes identical to
2123    // those passed in as the target on `create_symlink`.
2124    // If the symlink is locked, this function hashes the encrypted `link` with Sha256 to get a
2125    // standard length, base64 encodes the hash, and returns that to the caller.
2126    pub async fn read_encrypted_symlink(
2127        &self,
2128        object_id: u64,
2129        link: Vec<u8>,
2130    ) -> Result<Vec<u8>, Error> {
2131        let mut link = link;
2132        let key = self
2133            .key_manager()
2134            .get_fscrypt_key(object_id, self.crypt().unwrap().as_ref(), async || {
2135                self.get_keys(object_id).await
2136            })
2137            .await?;
2138        if let Some(key) = key {
2139            key.decrypt_filename(object_id, &mut link)?;
2140            Ok(link)
2141        } else {
2142            let proxy_filename = fscrypt::proxy_filename::ProxyFilename::new(0, &link);
2143            let proxy_filename_str: String = proxy_filename.into();
2144            Ok(proxy_filename_str.as_bytes().to_vec())
2145        }
2146    }
2147
2148    /// Returns the link of a symlink object.
2149    pub async fn read_symlink(&self, object_id: u64) -> Result<Vec<u8>, Error> {
2150        match self.tree.find(&ObjectKey::object(object_id)).await? {
2151            None => bail!(FxfsError::NotFound),
2152            Some(Item {
2153                value: ObjectValue::Object { kind: ObjectKind::EncryptedSymlink { link, .. }, .. },
2154                ..
2155            }) => self.read_encrypted_symlink(object_id, link.to_vec()).await,
2156            Some(Item {
2157                value: ObjectValue::Object { kind: ObjectKind::Symlink { link, .. }, .. },
2158                ..
2159            }) => Ok(link.to_vec()),
2160            Some(item) => Err(anyhow!(FxfsError::Inconsistent)
2161                .context(format!("Unexpected item in lookup: {item:?}"))),
2162        }
2163    }
2164
2165    /// Retrieves the wrapped keys for the given object.  The keys *should* be known to exist, and
2166    /// it is considered an inconsistency if they don't.
2167    pub async fn get_keys(&self, object_id: u64) -> Result<EncryptionKeys, Error> {
2168        match self.tree.find(&ObjectKey::keys(object_id)).await?.ok_or(FxfsError::Inconsistent)? {
2169            Item { value: ObjectValue::Keys(keys), .. } => Ok(keys),
2170            _ => Err(anyhow!(FxfsError::Inconsistent).context("open_object: Expected keys")),
2171        }
2172    }
2173
2174    pub async fn update_attributes<'a>(
2175        &self,
2176        transaction: &mut Transaction<'a>,
2177        object_id: u64,
2178        node_attributes: Option<&fio::MutableNodeAttributes>,
2179        change_time: Option<Timestamp>,
2180    ) -> Result<(), Error> {
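        // If there is no change_time and no (non-empty) node attributes, there is nothing to
        // update; return early rather than adding a no-op mutation.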
2181        if change_time.is_none() {
2182            if let Some(attributes) = node_attributes {
2183                let empty_attributes = fio::MutableNodeAttributes { ..Default::default() };
2184                if *attributes == empty_attributes {
2185                    return Ok(());
2186                }
2187            } else {
2188                return Ok(());
2189            }
2190        }
2191        let mut mutation = self.txn_get_object_mutation(transaction, object_id).await?;
2192        if let ObjectValue::Object { ref mut attributes, .. } = mutation.item.value {
2193            if let Some(time) = change_time {
2194                attributes.change_time = time;
2195            }
2196            if let Some(node_attributes) = node_attributes {
2197                if let Some(time) = node_attributes.creation_time {
2198                    attributes.creation_time = Timestamp::from_nanos(time);
2199                }
2200                if let Some(time) = node_attributes.modification_time {
2201                    attributes.modification_time = Timestamp::from_nanos(time);
2202                }
2203                if let Some(time) = node_attributes.access_time {
2204                    attributes.access_time = Timestamp::from_nanos(time);
2205                }
2206                if node_attributes.mode.is_some()
2207                    || node_attributes.uid.is_some()
2208                    || node_attributes.gid.is_some()
2209                    || node_attributes.rdev.is_some()
2210                {
2211                    if let Some(a) = &mut attributes.posix_attributes {
2212                        if let Some(mode) = node_attributes.mode {
2213                            a.mode = mode;
2214                        }
2215                        if let Some(uid) = node_attributes.uid {
2216                            a.uid = uid;
2217                        }
2218                        if let Some(gid) = node_attributes.gid {
2219                            a.gid = gid;
2220                        }
2221                        if let Some(rdev) = node_attributes.rdev {
2222                            a.rdev = rdev;
2223                        }
2224                    } else {
2225                        attributes.posix_attributes = Some(PosixAttributes {
2226                            mode: node_attributes.mode.unwrap_or_default(),
2227                            uid: node_attributes.uid.unwrap_or_default(),
2228                            gid: node_attributes.gid.unwrap_or_default(),
2229                            rdev: node_attributes.rdev.unwrap_or_default(),
2230                        });
2231                    }
2232                }
2233            }
2234        } else {
2235            bail!(
2236                anyhow!(FxfsError::Inconsistent)
2237                    .context("ObjectStore.update_attributes: Expected object value")
2238            );
2239        };
2240        transaction.add(self.store_object_id(), Mutation::ObjectStore(mutation));
2241        Ok(())
2242    }
2243
2244    // Updates and commits the changes to access time in ObjectProperties. The update matches
2245    // Linux's RELATIME. That is, access time is updated to the current time if access time is less
2246    // than or equal to the last modification or status change, or if it has been more than a day
2247    // since the last access.
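    // For example: reading a file twice in quick succession updates atime only on the first read
    // (after which atime exceeds both mtime and ctime); a read more than a day later updates it
    // again.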
2248    pub async fn update_access_time(
2249        &self,
2250        object_id: u64,
2251        props: &mut ObjectProperties,
2252    ) -> Result<(), Error> {
2253        let access_time = props.access_time.as_nanos();
2254        let modification_time = props.modification_time.as_nanos();
2255        let change_time = props.change_time.as_nanos();
2256        let now = Timestamp::now();
2257        if access_time <= modification_time
2258            || access_time <= change_time
2259            || access_time
2260                < now.as_nanos()
2261                    - Timestamp::from(std::time::Duration::from_secs(24 * 60 * 60)).as_nanos()
2262        {
2263            let mut transaction = self
2264                .filesystem()
2265                .clone()
2266                .new_transaction(
2267                    lock_keys![LockKey::object(self.store_object_id, object_id,)],
2268                    Options { borrow_metadata_space: true, ..Default::default() },
2269                )
2270                .await?;
2271            self.update_attributes(
2272                &mut transaction,
2273                object_id,
2274                Some(&fio::MutableNodeAttributes {
2275                    access_time: Some(now.as_nanos()),
2276                    ..Default::default()
2277                }),
2278                None,
2279            )
2280            .await?;
2281            transaction.commit().await?;
2282            props.access_time = now;
2283        }
2284        Ok(())
2285    }
2286
2287    async fn write_store_info<'a>(
2288        &'a self,
2289        transaction: &mut Transaction<'a>,
2290        info: &StoreInfo,
2291    ) -> Result<(), Error> {
2292        let mut serialized_info = Vec::new();
2293        info.serialize_with_version(&mut serialized_info)?;
2294        let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
2295        buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
2296        self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
2297    }
2298
2299    pub fn mark_deleted(&self) {
2300        *self.lock_state.lock() = LockState::Deleted;
2301    }
2302}
2303
2304#[async_trait]
2305impl JournalingObject for ObjectStore {
2306    fn apply_mutation(
2307        &self,
2308        mutation: Mutation,
2309        context: &ApplyContext<'_, '_>,
2310        _assoc_obj: AssocObj<'_>,
2311    ) -> Result<(), Error> {
2312        match &*self.lock_state.lock() {
2313            LockState::Locked | LockState::Locking => {
2314                ensure!(
2315                    matches!(mutation, Mutation::BeginFlush | Mutation::EndFlush)
2316                        || matches!(
2317                            mutation,
2318                            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_)
2319                                if context.mode.is_replay()
2320                        ),
2321                    anyhow!(FxfsError::Inconsistent)
2322                        .context(format!("Unexpected mutation for encrypted store: {mutation:?}"))
2323                );
2324            }
2325            LockState::Invalid
2326            | LockState::Unlocking
2327            | LockState::Unencrypted
2328            | LockState::Unlocked { .. }
2329            | LockState::UnlockedReadOnly(..)
2330            | LockState::Deleted => {}
2331            lock_state => panic!("Unexpected lock state: {lock_state:?}"),
2332        }
2333        match mutation {
2334            Mutation::ObjectStore(ObjectStoreMutation { mut item, op }) => {
2335                item.sequence = context.checkpoint.file_offset;
2336                match op {
2337                    Operation::Insert => {
2338                        // If we are inserting an object record for the first time, it signifies the
2339                        // birth of the object so we need to adjust the object count.
2340                        if matches!(item.value, ObjectValue::Object { .. }) {
2341                            {
2342                                let info = &mut self.store_info.lock();
2343                                let object_count = &mut info.as_mut().unwrap().object_count;
2344                                *object_count = object_count.saturating_add(1);
2345                            }
2346                            if context.mode.is_replay() {
2347                                self.update_last_object_id(item.key.object_id);
2348                            }
2349                        }
2350                        self.tree.insert(item)?;
2351                    }
2352                    Operation::ReplaceOrInsert => {
2353                        self.tree.replace_or_insert(item);
2354                    }
2355                    Operation::Merge => {
2356                        if item.is_tombstone() {
2357                            let info = &mut self.store_info.lock();
2358                            let object_count = &mut info.as_mut().unwrap().object_count;
2359                            *object_count = object_count.saturating_sub(1);
2360                        }
2361                        let lower_bound = item.key.key_for_merge_into();
2362                        self.tree.merge_into(item, &lower_bound);
2363                    }
2364                }
2365            }
2366            Mutation::BeginFlush => {
2367                ensure!(self.parent_store.is_some(), FxfsError::Inconsistent);
2368                self.tree.seal();
2369            }
2370            Mutation::EndFlush => ensure!(self.parent_store.is_some(), FxfsError::Inconsistent),
2371            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2372                // We will process these during Self::unlock.
2373                ensure!(
2374                    !matches!(&*self.lock_state.lock(), LockState::Unencrypted),
2375                    FxfsError::Inconsistent
2376                );
2377            }
2378            Mutation::CreateInternalDir(object_id) => {
2379                ensure!(object_id != INVALID_OBJECT_ID, FxfsError::Inconsistent);
2380                self.store_info.lock().as_mut().unwrap().internal_directory_object_id = object_id;
2381            }
2382            _ => bail!("unexpected mutation: {:?}", mutation),
2383        }
2384        self.counters.lock().mutations_applied += 1;
2385        Ok(())
2386    }
2387
2388    fn drop_mutation(&self, _mutation: Mutation, _transaction: &Transaction<'_>) {
2389        self.counters.lock().mutations_dropped += 1;
2390    }
2391
2392    /// Push all in-memory structures to the device. This is not necessary for sync since the
2393    /// journal will take care of it.  This is supposed to be called when there is either memory or
2394    /// space pressure (flushing the store will persist in-memory data and allow the journal file to
2395    /// be trimmed).
2396    ///
2397    /// Also returns the earliest version of a struct in the filesystem (when known).
2398    async fn flush(&self) -> Result<Version, Error> {
2399        self.flush_with_reason(flush::Reason::Journal).await
2400    }
2401
2402    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
2403        // Intentionally enumerating all variants to force a decision on any new variants. Encrypt
2404        // all mutations that could affect an encrypted object store's contents or the `StoreInfo`
2405        // of the encrypted object store. During `unlock()`, any mutations which haven't been encrypted
2406        // won't be replayed after reading `StoreInfo`.
2407        match mutation {
2408            // Whilst CreateInternalDir is a mutation for `StoreInfo`, which isn't encrypted, we
2409            // still choose to encrypt the mutation because it makes it easier to deal with replay.
2410            // When we replay mutations for an encrypted store, the only thing we keep in memory are
2411            // the encrypted mutations; we don't keep `StoreInfo` or changes to it in memory. So, by
2412            // encrypting the CreateInternalDir mutation here, it means we don't have to track both
2413            // encrypted mutations bound for the LSM tree and unencrypted mutations for `StoreInfo`
2414            // to use in `unlock()`. It'll just bundle CreateInternalDir mutations with the other
2415            // encrypted mutations and handle them all in sequence during `unlock()`.
2416            Mutation::ObjectStore(_) | Mutation::CreateInternalDir(_) => {
2417                let mut cipher = self.mutations_cipher.lock();
2418                if let Some(cipher) = cipher.as_mut() {
2419                    // If this is the first time we've used this key, we must write the key out.
2420                    if cipher.offset() == 0 {
2421                        writer.write(Mutation::update_mutations_key(
2422                            self.store_info
2423                                .lock()
2424                                .as_ref()
2425                                .unwrap()
2426                                .mutations_key
2427                                .as_ref()
2428                                .unwrap()
2429                                .clone(),
2430                        ));
2431                    }
2432                    let mut buffer = Vec::new();
2433                    mutation.serialize_into(&mut buffer).unwrap();
2434                    cipher.encrypt(&mut buffer);
2435                    writer.write(Mutation::EncryptedObjectStore(buffer.into()));
2436                    return;
2437                }
2438            }
2439            // `EncryptedObjectStore` and `UpdateMutationsKey` are both obviously associated with
2440            // encrypted object stores, but are either the encrypted mutation data itself or
2441            // metadata governing how the data will be encrypted. They should only be produced here.
2442            Mutation::EncryptedObjectStore(_) | Mutation::UpdateMutationsKey(_) => {
2443                debug_assert!(false, "Only this method should generate encrypted mutations");
2444            }
2445            // `BeginFlush` and `EndFlush` are not needed during `unlock()` but are needed during
2446            // the initial journal replay, so they should not be encrypted. `Allocator`, `DeleteVolume`,
2447            // and `UpdateBorrowed` mutations are never associated with an encrypted store, as we do
2448            // not encrypt the allocator or the root/root-parent stores, so we can avoid the locking.
2449            Mutation::Allocator(_)
2450            | Mutation::BeginFlush
2451            | Mutation::EndFlush
2452            | Mutation::DeleteVolume
2453            | Mutation::UpdateBorrowed(_) => {}
2454        }
2455        writer.write(mutation.clone());
2456    }
2457}
2458
2459impl HandleOwner for ObjectStore {}
2460
2461impl AsRef<ObjectStore> for ObjectStore {
2462    fn as_ref(&self) -> &ObjectStore {
2463        self
2464    }
2465}
2466
2467fn layer_size_from_encrypted_mutations_size(size: u64) -> u64 {
2468    // This is similar to reserved_space_from_journal_usage. It needs to be a worst case estimate of
2469    // the amount of metadata space that might need to be reserved to allow the encrypted mutations
2470    // to be written to layer files.  It needs to be >= what reservation_amount_from_layer_size will
2471    // return once the data has been written to layer files and <= what
2472    // reserved_space_from_journal_usage would use.  We can't just use
2473    // reserved_space_from_journal_usage because the encrypted mutations file includes some extra
2474    // data (it includes the checkpoints) that isn't written in the same way to the journal.
2475    size * 3
2476}
2477
2478impl AssociatedObject for ObjectStore {}
2479
2480/// Argument to the trim_some method.
2481#[derive(Debug)]
2482pub enum TrimMode {
2483    /// Trim extents beyond the current size.
2484    UseSize,
2485
2486    /// Trim extents beyond the supplied offset.
2487    FromOffset(u64),
2488
2489    /// Remove the object (or attribute) from the store once it is fully trimmed.
2490    Tombstone(TombstoneMode),
2491}
2492
2493/// Sets the mode for tombstoning (either at the object or attribute level).
2494#[derive(Debug)]
2495pub enum TombstoneMode {
2496    Object,
2497    Attribute,
2498}
2499
2500/// Result of the trim_some method.
2501#[derive(Debug)]
2502pub enum TrimResult {
2503    /// We reached the limit of the transaction and more extents might follow.
2504    Incomplete,
2505
2506    /// We finished this attribute.  Returns the ID of the next attribute for the same object if
2507    /// there is one.
2508    Done(Option<u64>),
2509}
2510
2511/// Loads store info.
2512pub async fn load_store_info(
2513    parent: &Arc<ObjectStore>,
2514    store_object_id: u64,
2515) -> Result<StoreInfo, Error> {
2516    let handle =
2517        ObjectStore::open_object(parent, store_object_id, HandleOptions::default(), None).await?;
2518
2519    Ok(if handle.get_size() > 0 {
2520        let serialized_info = handle.contents(MAX_STORE_INFO_SERIALIZED_SIZE).await?;
2521        let mut cursor = std::io::Cursor::new(serialized_info);
2522        let (store_info, _) = StoreInfo::deserialize_with_version(&mut cursor)
2523            .context("Failed to deserialize StoreInfo")?;
2524        store_info
2525    } else {
2526        // The store_info will be absent for a newly created and empty object store.
2527        StoreInfo::default()
2528    })
2529}

#[cfg(test)]
mod tests {
    use super::{
        DEFAULT_DATA_ATTRIBUTE_ID, FSVERITY_MERKLE_ATTRIBUTE_ID, FsverityMetadata, HandleOptions,
        LastObjectId, LockKey, MAX_STORE_INFO_SERIALIZED_SIZE, Mutation, NO_OWNER,
        NewChildStoreOptions, OBJECT_ID_HI_MASK, ObjectStore, RootDigest, StoreInfo, StoreOptions,
        StoreOwner,
    };
    use crate::errors::FxfsError;
    use crate::filesystem::{FxFilesystem, JournalingObject, OpenFxFilesystem, SyncOptions};
    use crate::fsck::fsck;
    use crate::lsm_tree::Query;
    use crate::lsm_tree::types::{ItemRef, LayerIterator};
    use crate::object_handle::{
        INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
    };
    use crate::object_store::directory::Directory;
    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKind, ObjectValue};
    use crate::object_store::transaction::{Options, lock_keys};
    use crate::object_store::volume::root_volume;
    use crate::serialized_types::VersionedLatest;
    use assert_matches::assert_matches;
    use async_trait::async_trait;
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use futures::join;
    use fxfs_crypto::ff1::Ff1;
    use fxfs_crypto::{
        Crypt, FXFS_KEY_SIZE, FXFS_WRAPPED_KEY_SIZE, FxfsKey, UnwrappedKey, WrappedKeyBytes,
    };
    use fxfs_insecure_crypto::InsecureCrypt;
    use std::sync::Arc;
    use std::time::Duration;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    async fn test_filesystem() -> OpenFxFilesystem {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        FxFilesystem::new_empty(device).await.expect("new_empty failed")
    }

    #[fuchsia::test]
    async fn test_item_sequences() {
        let fs = test_filesystem().await;
        let object1;
        let object2;
        let object3;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        object1 = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        object2 = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");

        fs.sync(SyncOptions::default()).await.expect("sync failed");

        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        object3 = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");

        let layer_set = store.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        let mut sequences = [0u64; 3];
        while let Some(ItemRef { key: ObjectKey { object_id, .. }, sequence, .. }) = iter.get() {
            if *object_id == object1.object_id() {
                sequences[0] = sequence;
            } else if *object_id == object2.object_id() {
                sequences[1] = sequence;
            } else if *object_id == object3.object_id() {
                sequences[2] = sequence;
            }
            iter.advance().await.expect("advance failed");
        }

        assert!(sequences[0] <= sequences[1], "sequences: {:?}", sequences);
        // The last item came after a sync, so should be strictly greater.
        assert!(sequences[1] < sequences[2], "sequences: {:?}", sequences);
        fs.close().await.expect("Close failed");
    }

    #[fuchsia::test]
    async fn test_verified_file_with_verified_attribute() {
        let fs: OpenFxFilesystem = test_filesystem().await;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );

        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::verified_attribute(
                    0,
                    FsverityMetadata::Internal(RootDigest::Sha256([0; 32]), vec![]),
                ),
            ),
        );

        transaction.add(
            store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::attribute(
                    object.object_id(),
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    AttributeKey::Attribute,
                ),
                ObjectValue::attribute(0, false),
            ),
        );

        transaction.commit().await.unwrap();

        let handle =
            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
                .await
                .expect("open_object failed");

        assert!(handle.is_verified_file());

        fs.close().await.expect("Close failed");
    }

    #[fuchsia::test]
    async fn test_verified_file_without_verified_attribute() {
        let fs: OpenFxFilesystem = test_filesystem().await;
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let store = fs.root_store();
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );

        transaction.commit().await.unwrap();

        let handle =
            ObjectStore::open_object(&store, object.object_id(), HandleOptions::default(), None)
                .await
                .expect("open_object failed");

        assert!(!handle.is_verified_file());

        fs.close().await.expect("Close failed");
    }

    #[fuchsia::test]
    async fn test_create_and_open_store() {
        let fs = test_filesystem().await;
        let store_id = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            owner: NO_OWNER,
                            crypt: Some(Arc::new(InsecureCrypt::new())),
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed")
                .store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");
        }
        fs.close().await.expect("Close failed");
    }

    #[fuchsia::test]
    async fn test_create_and_open_internal_dir() {
        let fs = test_filesystem().await;
        let dir_id;
        let store_id;
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            owner: NO_OWNER,
                            crypt: Some(Arc::new(InsecureCrypt::new())),
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            dir_id =
                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
            store_id = store.store_object_id();
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");
            assert_eq!(
                dir_id,
                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
            );
            let obj = store
                .tree()
                .find(&ObjectKey::object(dir_id))
                .await
                .expect("Searching tree for dir")
                .unwrap();
            assert_matches!(
                obj.value,
                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
            );
        }
        fs.close().await.expect("Close failed");
    }

    #[fuchsia::test]
    async fn test_create_and_open_internal_dir_unencrypted() {
        let fs = test_filesystem().await;
        let dir_id;
        let store_id;
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume("test", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");
            dir_id =
                store.get_or_create_internal_directory_id().await.expect("Create internal dir");
            store_id = store.store_object_id();
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            assert_eq!(
                dir_id,
                store.get_or_create_internal_directory_id().await.expect("Retrieving dir")
            );
            let obj = store
                .tree()
                .find(&ObjectKey::object(dir_id))
                .await
                .expect("Searching tree for dir")
                .unwrap();
            assert_matches!(
                obj.value,
                ObjectValue::Object { kind: ObjectKind::Directory { .. }, .. }
            );
        }
        fs.close().await.expect("Close failed");
    }

    #[fuchsia::test(threads = 10)]
    async fn test_old_layers_are_purged() {
        let fs = test_filesystem().await;

        let store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let object = Arc::new(
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed"),
        );
        transaction.commit().await.expect("commit failed");

        store.flush().await.expect("flush failed");

        let mut buf = object.allocate_buffer(5).await;
        buf.as_mut_slice().copy_from_slice(b"hello");
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // Getting the layer-set should cause the flush to stall.
        let layer_set = store.tree().layer_set();

        let done = Mutex::new(false);
        let mut object_id = 0;

        join!(
            async {
                store.flush().await.expect("flush failed");
                assert!(*done.lock());
            },
            async {
                // This is a halting problem so all we can do is sleep.
                fasync::Timer::new(Duration::from_secs(1)).await;
                *done.lock() = true;
                object_id = layer_set.layers.last().unwrap().handle().unwrap().object_id();
                std::mem::drop(layer_set);
            }
        );

        if let Err(e) = ObjectStore::open_object(
            &store.parent_store.as_ref().unwrap(),
            object_id,
            HandleOptions::default(),
            store.crypt(),
        )
        .await
        {
            assert!(FxfsError::NotFound.matches(&e));
        } else {
            panic!("open_object succeeded");
        }
    }

    #[fuchsia::test]
    async fn test_tombstone_deletes_data() {
        let fs = test_filesystem().await;
        let root_store = fs.root_store();
        let child_id = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let child = ObjectStore::create_object(
                &root_store,
                &mut transaction,
                HandleOptions::default(),
                None,
            )
            .await
            .expect("create_object failed");
            transaction.commit().await.expect("commit failed");

            // Allocate an extent in the file.
            let mut buffer = child.allocate_buffer(8192).await;
            buffer.as_mut_slice().fill(0xaa);
            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

            child.object_id()
        };

        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");

        // Let fsck check allocations.
        fsck(fs.clone()).await.expect("fsck failed");
    }

    #[fuchsia::test]
    async fn test_tombstone_purges_keys() {
        let fs = test_filesystem().await;
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .new_volume(
                "test",
                NewChildStoreOptions {
                    options: StoreOptions {
                        crypt: Some(Arc::new(InsecureCrypt::new())),
                        ..StoreOptions::default()
                    },
                    ..NewChildStoreOptions::default()
                },
            )
            .await
            .expect("new_volume failed");
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let child =
            ObjectStore::create_object(&store, &mut transaction, HandleOptions::default(), None)
                .await
                .expect("create_object failed");
        transaction.commit().await.expect("commit failed");
        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_some());
        store
            .tombstone_object(child.object_id(), Options::default())
            .await
            .expect("tombstone_object failed");
        assert!(store.key_manager.get(child.object_id()).await.unwrap().is_none());
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test]
    async fn test_major_compaction_discards_unnecessary_records() {
        let fs = test_filesystem().await;
        let root_store = fs.root_store();
        let child_id = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let child = ObjectStore::create_object(
                &root_store,
                &mut transaction,
                HandleOptions::default(),
                None,
            )
            .await
            .expect("create_object failed");
            transaction.commit().await.expect("commit failed");

            // Allocate an extent in the file.
            let mut buffer = child.allocate_buffer(8192).await;
            buffer.as_mut_slice().fill(0xaa);
            child.write_or_append(Some(0), buffer.as_ref()).await.expect("write failed");

            child.object_id()
        };

        root_store.tombstone_object(child_id, Options::default()).await.expect("tombstone failed");
        {
            let layers = root_store.tree.layer_set();
            let mut merger = layers.merger();
            let iter = merger
                .query(Query::FullRange(&ObjectKey::object(child_id)))
                .await
                .expect("seek failed");
            // Find at least one object still in the tree.
            match iter.get() {
                Some(ItemRef { key: ObjectKey { object_id, .. }, .. })
                    if *object_id == child_id => {}
                _ => panic!("Objects should still be in the tree."),
            }
        }
        root_store.flush().await.expect("flush failed");

        // There should be no records for the object.
        let layers = root_store.tree.layer_set();
        let mut merger = layers.merger();
        let iter = merger
            .query(Query::FullRange(&ObjectKey::object(child_id)))
            .await
            .expect("seek failed");
        match iter.get() {
            None => {}
            Some(ItemRef { key: ObjectKey { object_id, .. }, .. }) => {
                assert_ne!(*object_id, child_id)
            }
        }
    }

    #[fuchsia::test]
    async fn test_overlapping_extents_in_different_layers() {
        let fs = test_filesystem().await;
        let store = fs.root_store();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        let object = root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        let buf = object.allocate_buffer(16384).await;
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        store.flush().await.expect("flush failed");

        object.write_or_append(Some(0), buf.subslice(0..4096)).await.expect("write failed");

        // At this point, we should have an extent for 0..16384 in a layer that has been flushed,
        // and an extent for 0..4096 that partially overwrites it.  Writing to 0..16384 should
        // overwrite both of those extents.
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        fsck(fs.clone()).await.expect("fsck failed");
    }

    #[fuchsia::test(threads = 10)]
    async fn test_encrypted_mutations() {
        async fn one_iteration(
            fs: OpenFxFilesystem,
            crypt: Arc<dyn Crypt>,
            iteration: u64,
        ) -> OpenFxFilesystem {
            async fn reopen(fs: OpenFxFilesystem) -> OpenFxFilesystem {
                fs.close().await.expect("Close failed");
                let device = fs.take_device().await;
                device.reopen(false);
                FxFilesystem::open(device).await.expect("FS open failed")
            }

            let fs = reopen(fs).await;

            let (store_object_id, object_id) = {
                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_volume
                    .volume(
                        "test",
                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    )
                    .await
                    .expect("volume failed");

                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let root_directory = Directory::open(&store, store.root_directory_object_id())
                    .await
                    .expect("open failed");
                let object = root_directory
                    .create_child_file(&mut transaction, &format!("test {}", iteration))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                let mut buf = object.allocate_buffer(1000).await;
                for i in 0..buf.len() {
                    buf.as_mut_slice()[i] = i as u8;
                }
                object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

                (store.store_object_id(), object.object_id())
            };

            let fs = reopen(fs).await;

            let check_object = |fs: Arc<FxFilesystem>| {
                let crypt = crypt.clone();
                async move {
                    let root_volume = root_volume(fs).await.expect("root_volume failed");
                    let volume = root_volume
                        .volume(
                            "test",
                            StoreOptions { crypt: Some(crypt), ..StoreOptions::default() },
                        )
                        .await
                        .expect("volume failed");

                    let object = ObjectStore::open_object(
                        &volume,
                        object_id,
                        HandleOptions::default(),
                        None,
                    )
                    .await
                    .expect("open_object failed");
                    let mut buf = object.allocate_buffer(1000).await;
                    assert_eq!(object.read(0, buf.as_mut()).await.expect("read failed"), 1000);
                    for i in 0..buf.len() {
                        assert_eq!(buf.as_slice()[i], i as u8);
                    }
                }
            };

            check_object(fs.clone()).await;

            let fs = reopen(fs).await;

            // At this point the "test" volume is locked.  Before checking the object, flush the
            // filesystem.  This should leave a file with encrypted mutations.
            fs.object_manager().flush().await.expect("flush failed");

            assert_ne!(
                fs.object_manager()
                    .store(store_object_id)
                    .unwrap()
                    .load_store_info()
                    .await
                    .expect("load_store_info failed")
                    .encrypted_mutations_object_id,
                INVALID_OBJECT_ID
            );

            check_object(fs.clone()).await;

            // Checking the object should have triggered a flush and so now there should be no
            // encrypted mutations object.
            assert_eq!(
                fs.object_manager()
                    .store(store_object_id)
                    .unwrap()
                    .load_store_info()
                    .await
                    .expect("load_store_info failed")
                    .encrypted_mutations_object_id,
                INVALID_OBJECT_ID
            );

            let fs = reopen(fs).await;

            fsck(fs.clone()).await.expect("fsck failed");

            let fs = reopen(fs).await;

            check_object(fs.clone()).await;

            fs
        }

        let mut fs = test_filesystem().await;
        let crypt = Arc::new(InsecureCrypt::new());

        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let _store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
        }

        // Run a few iterations so that we test changes with the stream cipher offset.
        for i in 0..5 {
            fs = one_iteration(fs, crypt.clone(), i).await;
        }
    }
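
    // The lifecycle exercised above, informally: flushing while a volume is locked
    // spills its pending mutations, still encrypted, into a dedicated object (making
    // `encrypted_mutations_object_id` valid); the first flush after the volume is
    // unlocked replays those mutations and removes that object, returning the field
    // to INVALID_OBJECT_ID.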

    #[fuchsia::test(threads = 10)]
    async fn test_object_id_cipher_roll() {
        let fs = test_filesystem().await;
        let crypt = Arc::new(InsecureCrypt::new());

        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");

            let store_info = store.store_info().unwrap();

            // Hack the last object ID to force a roll of the object ID cipher.
            {
                let mut last_object_id = store.last_object_id.lock();
                assert_eq!(last_object_id.id & OBJECT_ID_HI_MASK, 0);
                last_object_id.id |= 0xffffffff;
            }

            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let object = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            assert_eq!(object.object_id() & OBJECT_ID_HI_MASK, 1u64 << 32);

            // Check that the key has been changed.
            assert_ne!(store.store_info().unwrap().object_id_key, store_info.object_id_key);

            assert_eq!(store.last_object_id.lock().id, 1u64 << 32);
        };

        fs.close().await.expect("Close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .volume("test", StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() })
            .await
            .expect("volume failed");

        assert_eq!(store.last_object_id.lock().id, 1u64 << 32);
    }
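
    // Note on the mask arithmetic above: setting the low 32 bits of `last_object_id`
    // to all-ones exhausts the current cipher's keyspace, so the next allocation bumps
    // the high half (hence `object_id & OBJECT_ID_HI_MASK == 1 << 32`) and rolls
    // `object_id_key`, which is why the store reports a new key and a last object ID
    // of 1 << 32 even after remounting.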

    #[fuchsia::test]
    async fn test_object_id_no_roll_for_unencrypted_store() {
        let fs = test_filesystem().await;

        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume("test", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            // Hack the last object ID.
            {
                let mut last_object_id = store.last_object_id.lock();
                assert_eq!(last_object_id.id & OBJECT_ID_HI_MASK, 0);
                last_object_id.id |= 0xffffffff;
            }

            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let object = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            assert_eq!(object.object_id(), 0x1_0000_0000);

            // Check that there is still no key.
            assert!(store.store_info().unwrap().object_id_key.is_none());

            assert_eq!(store.last_object_id.lock().id, 0x1_0000_0000);
        };

        fs.close().await.expect("Close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store =
            root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");

        assert_eq!(store.last_object_id.lock().id, 0x1_0000_0000);
    }

    #[fuchsia::test]
    fn test_object_id_is_not_invalid_object_id() {
        let key = UnwrappedKey::new(vec![0; FXFS_KEY_SIZE]);
        // 1106634048 results in INVALID_OBJECT_ID with this key.
        let mut last_object_id = LastObjectId { id: 1106634047, cipher: Some(Ff1::new(&key)) };
        assert_ne!(last_object_id.get_next_object_id(), INVALID_OBJECT_ID);
        assert_ne!(last_object_id.get_next_object_id(), INVALID_OBJECT_ID);
    }

    #[fuchsia::test(threads = 10)]
    async fn test_lock_store() {
        let fs = test_filesystem().await;
        let crypt = Arc::new(InsecureCrypt::new());

        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .new_volume(
                "test",
                NewChildStoreOptions {
                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    ..NewChildStoreOptions::default()
                },
            )
            .await
            .expect("new_volume failed");
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");
        store.lock().await.expect("lock failed");

        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
    }

    #[fuchsia::test(threads = 10)]
    async fn test_unlock_read_only() {
        let fs = test_filesystem().await;
        let crypt = Arc::new(InsecureCrypt::new());

        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .new_volume(
                "test",
                NewChildStoreOptions {
                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    ..NewChildStoreOptions::default()
                },
            )
            .await
            .expect("new_volume failed");
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");
        store.lock().await.expect("lock failed");

        store.unlock_read_only(crypt.clone()).await.expect("unlock failed");
        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
        store.lock_read_only();
        store.unlock_read_only(crypt).await.expect("unlock failed");
        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
    }

    #[fuchsia::test(threads = 10)]
    async fn test_key_rolled_when_unlocked() {
        let fs = test_filesystem().await;
        let crypt = Arc::new(InsecureCrypt::new());

        let object_id;
        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            object_id = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed")
                .object_id();
            transaction.commit().await.expect("commit failed");
        }

        fs.close().await.expect("Close failed");
        let mut device = fs.take_device().await;

        // Remount repeatedly so that we can be sure that remounting still works once
        // many mutations keys have accumulated.
        for _ in 0..100 {
            device.reopen(false);
            let fs = FxFilesystem::open(device).await.expect("open failed");
            {
                let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_volume
                    .volume(
                        "test",
                        StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    )
                    .await
                    .expect("open_volume failed");

                // The key should get rolled every time we unlock.
                assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);

                // Make sure there's an encrypted mutation.
                let handle =
                    ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
                        .await
                        .expect("open_object failed");
                let buffer = handle.allocate_buffer(100).await;
                handle
                    .write_or_append(Some(0), buffer.as_ref())
                    .await
                    .expect("write_or_append failed");
            }
            fs.close().await.expect("Close failed");
            device = fs.take_device().await;
        }
    }

    #[test]
    fn test_store_info_max_serialized_size() {
        let info = StoreInfo {
            guid: [0xff; 16],
            last_object_id: 0x1234567812345678,
            // Worst case, each layer should be 3/4 the size of the layer below it (because of the
            // compaction policy we're using).  If the smallest layer is 8,192 bytes, then 120
            // layers would take up a size that exceeds a 64 bit unsigned integer, so if this fits,
            // any size should fit.
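            // (One way to check: the total across 120 such layers is
            // 8,192 * 3 * ((4/3)^120 - 1) ≈ 2.4e19 bytes, which exceeds u64::MAX ≈ 1.8e19.)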
            layers: vec![0x1234567812345678; 120],
            root_directory_object_id: 0x1234567812345678,
            graveyard_directory_object_id: 0x1234567812345678,
            object_count: 0x1234567812345678,
            mutations_key: Some(FxfsKey {
                wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
                key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
            }),
            mutations_cipher_offset: 0x1234567812345678,
            encrypted_mutations_object_id: 0x1234567812345678,
            object_id_key: Some(FxfsKey {
                wrapping_key_id: 0x1234567812345678u128.to_le_bytes(),
                key: WrappedKeyBytes::from([0xff; FXFS_WRAPPED_KEY_SIZE]),
            }),
            internal_directory_object_id: INVALID_OBJECT_ID,
        };
        let mut serialized_info = Vec::new();
        info.serialize_with_version(&mut serialized_info).unwrap();
        assert!(
            serialized_info.len() <= MAX_STORE_INFO_SERIALIZED_SIZE,
            "{}",
            serialized_info.len()
        );
    }

    async fn reopen_after_crypt_failure_inner(read_only: bool) {
        let fs = test_filesystem().await;
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

        let store = {
            let crypt = Arc::new(InsecureCrypt::new());
            let store = root_volume
                .new_volume(
                    "vol",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(crypt.clone()),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");

            crypt.shutdown();
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, "test2")
                .await
                .map(|_| ())
                .expect_err("create_child_file should fail");
            store.lock().await.expect("lock failed");
            store
        };

        let crypt = Arc::new(InsecureCrypt::new());
        if read_only {
            store.unlock_read_only(crypt).await.expect("unlock failed");
        } else {
            store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
        }
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        root_directory.lookup("test").await.expect("lookup failed").expect("not found");
    }

    #[fuchsia::test(threads = 10)]
    async fn test_reopen_after_crypt_failure() {
        reopen_after_crypt_failure_inner(false).await;
    }

    #[fuchsia::test(threads = 10)]
    async fn test_reopen_read_only_after_crypt_failure() {
        reopen_after_crypt_failure_inner(true).await;
    }

    #[fuchsia::test(threads = 10)]
    #[should_panic(expected = "Insufficient reservation space")]
    #[cfg(debug_assertions)]
    async fn large_transaction_causes_panic_in_debug_builds() {
        let fs = test_filesystem().await;
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .new_volume("vol", NewChildStoreOptions::default())
            .await
            .expect("new_volume failed");
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), root_directory.object_id())],
                Options::default(),
            )
            .await
            .expect("transaction");
        for i in 0..500 {
            root_directory
                .create_symlink(&mut transaction, b"link", &format!("{}", i))
                .await
                .expect("symlink");
        }
        assert_eq!(transaction.commit().await.expect("commit"), 0);
    }

    #[fuchsia::test]
    async fn test_crypt_failure_does_not_fuse_journal() {
        let fs = test_filesystem().await;

        struct Owner;
        #[async_trait]
        impl StoreOwner for Owner {
            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
                store.lock().await
            }
        }
        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;

        {
            // Create two stores and a record for each store, so the journal will need to flush them
            // both later.
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store1 = root_volume
                .new_volume(
                    "vol1",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(Arc::new(InsecureCrypt::new())),
                            ..StoreOptions::default()
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            let crypt = Arc::new(InsecureCrypt::new());
            let store2 = root_volume
                .new_volume(
                    "vol2",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            owner: Arc::downgrade(&owner),
                            crypt: Some(crypt.clone()),
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            for store in [&store1, &store2] {
                let root_directory = Directory::open(store, store.root_directory_object_id())
                    .await
                    .expect("open failed");
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            root_directory.object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, "test")
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
            // Shut down the crypt instance for store2, and then compact.  Compaction should not
            // fail, and the store should become locked.
            crypt.shutdown();
            fs.journal().compact().await.expect("compact failed");
            // The store should now be locked.
            assert!(store2.is_locked());
        }

        // Even though the store wasn't flushed, the mutation to store2 will still be valid as it is
        // held in the journal.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

        for volume_name in ["vol1", "vol2"] {
            let store = root_volume
                .volume(
                    volume_name,
                    StoreOptions {
                        crypt: Some(Arc::new(InsecureCrypt::new())),
                        ..StoreOptions::default()
                    },
                )
                .await
                .expect("open volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());
        }

        fs.close().await.expect("close failed");
    }

    #[fuchsia::test]
    async fn test_crypt_failure_during_unlock_race() {
        let fs = test_filesystem().await;

        struct Owner;
        #[async_trait]
        impl StoreOwner for Owner {
            async fn force_lock(self: Arc<Self>, store: &ObjectStore) -> Result<(), anyhow::Error> {
                store.lock().await
            }
        }
        let owner = Arc::new(Owner) as Arc<dyn StoreOwner>;

        let store_object_id = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_volume
                .new_volume(
                    "vol",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            owner: Arc::downgrade(&owner),
                            crypt: Some(Arc::new(InsecureCrypt::new())),
                        },
                        ..Default::default()
                    },
                )
                .await
                .expect("new_volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            store.store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        {
            let fs_clone = fs.clone();
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let crypt = Arc::new(InsecureCrypt::new());
            let crypt_clone = crypt.clone();
            join!(
                async move {
                    // Unlock might fail, so ignore errors.
                    let _ = root_volume
                        .volume(
                            "vol",
                            StoreOptions {
                                owner: Arc::downgrade(&owner),
                                crypt: Some(crypt_clone),
                            },
                        )
                        .await;
                },
                async move {
                    // Block until the unlock has finished, but before the flush that the unlock
                    // triggers has completed, to maximize the chances of hitting the race.
                    let keys = lock_keys![LockKey::flush(store_object_id)];
                    let _ = fs_clone.lock_manager().write_lock(keys).await;
                    crypt.shutdown();
                }
            );
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_volume
            .volume(
                "vol",
                StoreOptions {
                    crypt: Some(Arc::new(InsecureCrypt::new())),
                    ..StoreOptions::default()
                },
            )
            .await
            .expect("open volume failed");
        let root_directory =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        assert!(root_directory.lookup("test").await.expect("lookup failed").is_some());

        fs.close().await.expect("close failed");
    }
}