fxfs/object_store/
flush.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// This module is responsible for flushing (a.k.a. compacting) the object store trees.
6
7use crate::filesystem::TxnGuard;
8use crate::log::*;
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::lsm_tree::{LSMTree, layers_from_handles};
11use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
12use crate::object_store::extent_record::ExtentValue;
13use crate::object_store::object_manager::{ObjectManager, ReservationUpdate};
14use crate::object_store::object_record::{ObjectKey, ObjectValue};
15use crate::object_store::transaction::{AssociatedObject, LockKey, Mutation, lock_keys};
16use crate::object_store::{
17    AssocObj, DirectWriter, EncryptedMutations, HandleOptions, LastObjectId, LastObjectIdInfo,
18    LockState, MAX_ENCRYPTED_MUTATIONS_SIZE, ObjectStore, Options, StoreInfo,
19    layer_size_from_encrypted_mutations_size, tree,
20};
21use crate::serialized_types::{LATEST_VERSION, Version, VersionedLatest};
22use anyhow::{Context, Error};
23use fxfs_crypto::{EncryptionKey, KeyPurpose};
24use std::sync::OnceLock;
25use std::sync::atomic::Ordering;
26
/// Why a flush of an object store was requested.
///
/// The reason decides whether `ObjectStore::flush_with_reason` actually needs to flush, and how
/// Crypt failures are handled: with `Unlock` they are returned to the caller instead of the store
/// being re-locked and the flush retried.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum Reason {
    /// Journal memory or space pressure.
    Journal,

    /// After unlock and replay of encrypted mutations.
    Unlock,
}
35
36/// If flushing an unlocked fails due to a Crypt error, we want to re-lock the store and flush it
37/// again, so we can make progress on flushing (and therefore journal compaction) without depending
38/// on Crypt.  This is necessary because Crypt is an external component that might crash or become
39/// unresponsive.
40#[derive(Debug)]
41enum FlushResult<T> {
42    Ok(T),
43    CryptError(Error),
44}
45
46#[fxfs_trace::trace]
47impl ObjectStore {
48    #[trace("store_object_id" => self.store_object_id)]
49    pub async fn flush_with_reason(&self, reason: Reason) -> Result<Version, Error> {
50        // Loop to deal with Crypt errors.  If flushing fails due to a Crypt error, we re-lock the
51        // store and try again.  However, another task might racily unlock the store after we lock
52        // but before we flush (since we dropped the lock taken in `try_flush_with_reason`).
53        // We set a limit on the number of times we'll permit a retry, to avoid looping forever if
54        // the race condition is continuously hit.  In practice it is very unlikely to ever occur
55        // at all, since that would require something else busily unlocking the volume and the Crypt
56        // instance dying repeatedly.
57        const MAX_RETRIES: usize = 10;
58        let mut retries = 0;
59        loop {
60            match self.try_flush_with_reason(reason).await? {
61                FlushResult::Ok(version) => return Ok(version),
62                FlushResult::CryptError(error) => {
63                    if reason == Reason::Unlock || retries >= MAX_RETRIES {
64                        // If flushing due to unlock fails due to Crypt issues, just fail, so we
65                        // don't return to `unlock` with a locked store.
66                        return Err(error);
67                    }
68                    log::warn!(
69                        error:?;
70                        "Flushing failed for store {}, re-locking and trying again.",
71                        self.store_object_id()
72                    );
73                    let owner = self.lock_state.lock().owner();
74                    if let Some(owner) = owner {
75                        owner
76                            .force_lock(&self)
77                            .await
78                            .context("Failed to re-lock store during flush")?;
79                    }
80                    // The owner might have already been cleaned up, and in doing so it might have
81                    // locked the store.  In this case we should try again.
82                }
83            }
84            retries += 1;
85        }
86    }
87
88    async fn try_flush_with_reason(&self, reason: Reason) -> Result<FlushResult<Version>, Error> {
89        if self.parent_store.is_none() {
90            // Early exit, but still return the earliest version used by a struct in the tree
91            return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
92        }
93        let filesystem = self.filesystem();
94        let object_manager = filesystem.object_manager();
95
96        // We must take the transaction guard *before* we take the flush lock.
97        let txn_guard = filesystem.clone().txn_guard().await;
98        let keys = lock_keys![LockKey::flush(self.store_object_id())];
99        let _guard = Some(filesystem.lock_manager().write_lock(keys).await);
100
101        // After taking the lock, check to see if the store has been deleted.
102        if matches!(*self.lock_state.lock(), LockState::Deleted) {
103            // When we compact, it's possible that the store has been deleted since we gathered the
104            // list of stores that need compacting.  This is benign.
105            return Ok(FlushResult::Ok(LATEST_VERSION));
106        }
107
108        match reason {
109            Reason::Unlock => {
110                // If we're unlocking, only flush if there are encrypted mutations currently stored
111                // in a file.  We don't worry if they're in memory because a flush should get
112                // triggered when the journal gets full.
113                // Safe to unwrap store_info here because this was invoked from ObjectStore::unlock,
114                // so store_info is already accessible.
115                if self.store_info().unwrap().encrypted_mutations_object_id == INVALID_OBJECT_ID {
116                    // TODO(https://fxbug.dev/42179266): Add earliest_version support for encrypted
117                    // mutations.
118                    // Early exit, but still return the earliest version used by a struct in the
119                    // tree.
120                    return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
121                }
122            }
123            Reason::Journal => {
124                // We flush if we have something to flush *or* the on-disk version of data is not
125                // the latest.
126                let earliest_version = self.tree.get_earliest_version();
127                if !object_manager.needs_flush(self.store_object_id)
128                    && earliest_version == LATEST_VERSION
129                {
130                    // Early exit, but still return the earliest version used by a struct in the
131                    // tree.
132                    return Ok(FlushResult::Ok(earliest_version));
133                }
134            }
135        }
136
137        let trace = self.trace.load(Ordering::Relaxed);
138        if trace {
139            info!(store_id = self.store_object_id(); "OS: begin flush");
140        }
141
142        if matches!(&*self.lock_state.lock(), LockState::Locked) {
143            self.flush_locked(&txn_guard).await.with_context(|| {
144                format!("Failed to flush object store {}", self.store_object_id)
145            })?;
146        } else {
147            if let FlushResult::CryptError(error) = self
148                .flush_unlocked(&txn_guard)
149                .await
150                .with_context(|| format!("Failed to flush object store {}", self.store_object_id))?
151            {
152                return Ok(FlushResult::CryptError(error));
153            }
154        }
155
156        if trace {
157            info!(store_id = self.store_object_id(); "OS: end flush");
158        }
159        if let Some(callback) = &*self.flush_callback.lock() {
160            callback(self);
161        }
162
163        let mut counters = self.counters.lock();
164        counters.num_flushes += 1;
165        counters.last_flush_time = Some(std::time::SystemTime::now());
166        // Return the earliest version used by a struct in the tree
167        Ok(FlushResult::Ok(self.tree.get_earliest_version()))
168    }
169
170    // Flushes an unlocked store. Returns the layer file sizes.
171    async fn flush_unlocked(
172        &self,
173        txn_guard: &TxnGuard<'_>,
174    ) -> Result<FlushResult<Vec<u64>>, Error> {
175        let roll_mutations_key = self
176            .mutations_cipher
177            .lock()
178            .as_ref()
179            .map(|cipher| {
180                cipher.offset() >= self.filesystem().options().roll_metadata_key_byte_count
181            })
182            .unwrap_or(false);
183        if roll_mutations_key {
184            if let Err(error) = self.roll_mutations_key(self.crypt().unwrap().as_ref()).await {
185                log::warn!(
186                    error:?; "Failed to roll mutations key for store {}", self.store_object_id());
187                return Ok(FlushResult::CryptError(error));
188            }
189        }
190
191        struct StoreInfoSnapshot<'a> {
192            store: &'a ObjectStore,
193            store_info: OnceLock<StoreInfo>,
194        }
195        impl AssociatedObject for StoreInfoSnapshot<'_> {
196            fn will_apply_mutation(
197                &self,
198                _mutation: &Mutation,
199                _object_id: u64,
200                _manager: &ObjectManager,
201            ) {
202                let mut store_info = self.store.store_info().unwrap();
203
204                // Capture the offset in the cipher stream.
205                let mutations_cipher = self.store.mutations_cipher.lock();
206                if let Some(cipher) = mutations_cipher.as_ref() {
207                    store_info.mutations_cipher_offset = cipher.offset();
208                }
209
210                self.store_info.set(store_info).unwrap();
211            }
212        }
213
214        let store_info_snapshot = StoreInfoSnapshot { store: self, store_info: OnceLock::new() };
215
216        let filesystem = self.filesystem();
217        let object_manager = filesystem.object_manager();
218        let reservation = object_manager.metadata_reservation();
219        let txn_options = Options {
220            skip_journal_checks: true,
221            borrow_metadata_space: true,
222            allocator_reservation: Some(reservation),
223            txn_guard: Some(txn_guard),
224            ..Default::default()
225        };
226
227        // The BeginFlush mutation must be within a transaction that has no impact on StoreInfo
228        // since we want to get an accurate snapshot of StoreInfo.
229        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
230        transaction.add_with_object(
231            self.store_object_id(),
232            Mutation::BeginFlush,
233            AssocObj::Borrowed(&store_info_snapshot),
234        );
235        transaction.commit().await?;
236
237        let mut new_store_info = store_info_snapshot.store_info.into_inner().unwrap();
238
239        // There is a transaction to create objects at the start and then another transaction at the
240        // end. Between those two transactions, there are transactions that write to the files.  In
241        // the first transaction, objects are created in the graveyard. Upon success, the objects
242        // are removed from the graveyard.
243        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
244
245        // Create and write a new layer, compacting existing layers.
246        let parent_store = self.parent_store.as_ref().unwrap();
247        let handle_options = HandleOptions { skip_journal_checks: true, ..Default::default() };
248        let new_object_tree_layer = if let Some(crypt) = self.crypt().as_deref() {
249            let object_id = parent_store.get_next_object_id(transaction.txn_guard()).await?;
250            let (key, unwrapped_key) =
251                match crypt.create_key(object_id.get(), KeyPurpose::Data).await {
252                    Ok((key, unwrapped_key)) => (key, unwrapped_key),
253                    Err(status) => {
254                        log::warn!(
255                            status:?;
256                            "Failed to create keys while flushing store {}",
257                            self.store_object_id(),
258                        );
259                        return Ok(FlushResult::CryptError(status.into()));
260                    }
261                };
262            ObjectStore::create_object_with_key(
263                parent_store,
264                &mut transaction,
265                object_id,
266                handle_options,
267                EncryptionKey::Fxfs(key),
268                unwrapped_key,
269            )
270            .await?
271        } else {
272            ObjectStore::create_object(parent_store, &mut transaction, handle_options, None).await?
273        };
274        let writer = DirectWriter::new(&new_object_tree_layer, txn_options).await;
275        let new_object_tree_layer_object_id = new_object_tree_layer.object_id();
276        parent_store.add_to_graveyard(&mut transaction, new_object_tree_layer_object_id);
277
278        transaction.commit().await?;
279        let (layers_to_keep, old_layers) =
280            tree::flush(&self.tree, writer).await.context("Failed to flush tree")?;
281
282        let mut new_layers = layers_from_handles([new_object_tree_layer]).await?;
283        new_layers.extend(layers_to_keep.iter().map(|l| (*l).clone()));
284
285        new_store_info.layers = Vec::new();
286        for layer in &new_layers {
287            if let Some(handle) = layer.handle() {
288                new_store_info.layers.push(handle.object_id());
289            }
290        }
291
292        let reservation_update: ReservationUpdate; // Must live longer than end_transaction.
293        let mut end_transaction = filesystem
294            .clone()
295            .new_transaction(
296                lock_keys![LockKey::object(
297                    self.parent_store.as_ref().unwrap().store_object_id(),
298                    self.store_info_handle_object_id().unwrap(),
299                )],
300                txn_options,
301            )
302            .await?;
303
304        parent_store.remove_from_graveyard(&mut end_transaction, new_object_tree_layer_object_id);
305
306        // Move the existing layers we're compacting to the graveyard at the end.
307        for layer in &old_layers {
308            if let Some(handle) = layer.handle() {
309                parent_store.add_to_graveyard(&mut end_transaction, handle.object_id());
310            }
311        }
312
313        let old_encrypted_mutations_object_id =
314            std::mem::replace(&mut new_store_info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
315        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
316            parent_store.add_to_graveyard(&mut end_transaction, old_encrypted_mutations_object_id);
317        }
318
319        // `last_object_id` is updated differently to other members of `StoreInfo`.  We must ensure
320        // that those fields match the current in-memory values.  See the lengthy comment in
321        // `get_next_object_id` for more information.  `end_transaction` has a lock on the same lock
322        // that `get_next_object_id` uses, so there's no danger of the key changing now.
323        //
324        // This might capture object IDs that might be in transactions not yet committed.  In
325        // theory, we could do better than this but it's not worth the effort.
326        match &mut new_store_info.last_object_id {
327            LastObjectIdInfo::Unencrypted { id } => {
328                let LastObjectId::Unencrypted { id: in_memory_value } =
329                    &*self.last_object_id.lock()
330                else {
331                    unreachable!()
332                };
333                *id = *in_memory_value;
334            }
335            LastObjectIdInfo::Encrypted { id, key } => {
336                let LastObjectId::Encrypted { id: in_memory_value, .. } =
337                    &*self.last_object_id.lock()
338                else {
339                    unreachable!()
340                };
341                *id = *in_memory_value;
342                let guard = self.store_info.lock();
343                let current_store_info = guard.as_ref().unwrap();
344                let LastObjectIdInfo::Encrypted { key: in_memory_value, .. } =
345                    &current_store_info.last_object_id
346                else {
347                    unreachable!()
348                };
349                *key = in_memory_value.clone();
350            }
351            LastObjectIdInfo::Low32Bit => {}
352        }
353
354        self.write_store_info(&mut end_transaction, &new_store_info).await?;
355
356        let layer_file_sizes = new_layers
357            .iter()
358            .map(|l| l.handle().map(ReadObjectHandle::get_size).unwrap_or(0))
359            .collect::<Vec<u64>>();
360
361        let total_layer_size = layer_file_sizes.iter().sum();
362        reservation_update =
363            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));
364
365        end_transaction.add_with_object(
366            self.store_object_id(),
367            Mutation::EndFlush,
368            AssocObj::Borrowed(&reservation_update),
369        );
370
371        if self.trace.load(Ordering::Relaxed) {
372            info!(
373                store_id = self.store_object_id(),
374                old_layer_count = old_layers.len(),
375                new_layer_count = new_layers.len(),
376                total_layer_size,
377                new_store_info:?;
378                "OS: compacting"
379            );
380        }
381
382        end_transaction
383            .commit_with_callback(|_| {
384                let mut store_info = self.store_info.lock();
385                let info = store_info.as_mut().unwrap();
386                info.layers = new_store_info.layers;
387                info.encrypted_mutations_object_id = new_store_info.encrypted_mutations_object_id;
388                info.mutations_cipher_offset = new_store_info.mutations_cipher_offset;
389                self.tree.set_layers(new_layers);
390            })
391            .await?;
392
393        // Now close the layers and purge them.
394        for layer in old_layers {
395            let object_id = layer.handle().map(|h| h.object_id());
396            layer.close_layer().await;
397            if let Some(object_id) = object_id {
398                parent_store.tombstone_object(object_id, txn_options).await?;
399            }
400        }
401
402        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
403            parent_store.tombstone_object(old_encrypted_mutations_object_id, txn_options).await?;
404        }
405
406        Ok(FlushResult::Ok(layer_file_sizes))
407    }
408
409    // Flushes a locked store.
410    async fn flush_locked(&self, txn_guard: &TxnGuard<'_>) -> Result<(), Error> {
411        let filesystem = self.filesystem();
412        let object_manager = filesystem.object_manager();
413        let reservation = object_manager.metadata_reservation();
414        let txn_options = Options {
415            skip_journal_checks: true,
416            borrow_metadata_space: true,
417            allocator_reservation: Some(reservation),
418            txn_guard: Some(txn_guard),
419            ..Default::default()
420        };
421
422        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
423        transaction.add(self.store_object_id(), Mutation::BeginFlush);
424        transaction.commit().await?;
425
426        let mut new_store_info = self.load_store_info().await?;
427
428        // There is a transaction to create objects at the start and then another transaction at the
429        // end. Between those two transactions, there are transactions that write to the files.  In
430        // the first transaction, objects are created in the graveyard. Upon success, the objects
431        // are removed from the graveyard.
432        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
433
434        let reservation_update: ReservationUpdate; // Must live longer than end_transaction.
435        let handle; // Must live longer than end_transaction.
436        let mut end_transaction;
437
438        // We need to either write our encrypted mutations to a new file, or append them to an
439        // existing one.
440        let parent_store = self.parent_store.as_ref().unwrap();
441        handle = if new_store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
442            let handle = ObjectStore::create_object(
443                parent_store,
444                &mut transaction,
445                HandleOptions { skip_journal_checks: true, ..Default::default() },
446                None,
447            )
448            .await?;
449            let oid = handle.object_id();
450            end_transaction = filesystem
451                .clone()
452                .new_transaction(
453                    lock_keys![
454                        LockKey::object(parent_store.store_object_id(), oid),
455                        LockKey::object(
456                            parent_store.store_object_id(),
457                            self.store_info_handle_object_id().unwrap(),
458                        ),
459                    ],
460                    txn_options,
461                )
462                .await?;
463            new_store_info.encrypted_mutations_object_id = oid;
464            parent_store.add_to_graveyard(&mut transaction, oid);
465            parent_store.remove_from_graveyard(&mut end_transaction, oid);
466            handle
467        } else {
468            end_transaction = filesystem
469                .clone()
470                .new_transaction(
471                    lock_keys![
472                        LockKey::object(
473                            parent_store.store_object_id(),
474                            new_store_info.encrypted_mutations_object_id,
475                        ),
476                        LockKey::object(
477                            parent_store.store_object_id(),
478                            self.store_info_handle_object_id().unwrap(),
479                        ),
480                    ],
481                    txn_options,
482                )
483                .await?;
484            ObjectStore::open_object(
485                parent_store,
486                new_store_info.encrypted_mutations_object_id,
487                HandleOptions { skip_journal_checks: true, ..Default::default() },
488                None,
489            )
490            .await?
491        };
492        transaction.commit().await?;
493
494        // Append the encrypted mutations, which need to be read from the journal.
495        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
496        let journaled = filesystem
497            .journal()
498            .read_transactions_for_object(self.store_object_id)
499            .await
500            .context("Failed to read encrypted mutations from journal")?;
501        let mut buffer = handle.allocate_buffer(MAX_ENCRYPTED_MUTATIONS_SIZE).await;
502        let mut cursor = std::io::Cursor::new(buffer.as_mut_slice());
503        EncryptedMutations::from_replayed_mutations(self.store_object_id, journaled)
504            .serialize_with_version(&mut cursor)?;
505        let len = cursor.position() as usize;
506        handle.txn_write(&mut end_transaction, handle.get_size(), buffer.subslice(..len)).await?;
507
508        self.write_store_info(&mut end_transaction, &new_store_info).await?;
509
510        let mut total_layer_size = 0;
511        for &oid in &new_store_info.layers {
512            total_layer_size += parent_store.get_file_size(oid).await?;
513        }
514        total_layer_size +=
515            layer_size_from_encrypted_mutations_size(handle.get_size() + len as u64);
516
517        reservation_update =
518            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));
519
520        end_transaction.add_with_object(
521            self.store_object_id(),
522            Mutation::EndFlush,
523            AssocObj::Borrowed(&reservation_update),
524        );
525
526        end_transaction.commit().await?;
527
528        Ok(())
529    }
530}
531
532#[cfg(test)]
533mod tests {
534    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, JournalingObject, SyncOptions};
535    use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle};
536    use crate::object_store::directory::Directory;
537    use crate::object_store::transaction::{Options, lock_keys};
538    use crate::object_store::volume::root_volume;
539    use crate::object_store::{
540        HandleOptions, LockKey, NO_OWNER, NewChildStoreOptions, ObjectStore, StoreOptions,
541        layer_size_from_encrypted_mutations_size, tree,
542    };
543    use fxfs_insecure_crypto::new_insecure_crypt;
544    use std::sync::Arc;
545    use storage_device::DeviceHolder;
546    use storage_device::fake_device::FakeDevice;
547
548    async fn run_key_roll_test(flush_before_unlock: bool) {
549        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
550        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
551        let store_id = {
552            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
553            root_volume
554                .new_volume(
555                    "test",
556                    NewChildStoreOptions {
557                        options: StoreOptions {
558                            crypt: Some(Arc::new(new_insecure_crypt())),
559                            ..StoreOptions::default()
560                        },
561                        ..NewChildStoreOptions::default()
562                    },
563                )
564                .await
565                .expect("new_volume failed")
566                .store_object_id()
567        };
568
569        fs.close().await.expect("close failed");
570        let device = fs.take_device().await;
571        device.reopen(false);
572
573        let fs = FxFilesystemBuilder::new()
574            .roll_metadata_key_byte_count(512 * 1024)
575            .open(device)
576            .await
577            .expect("open failed");
578
579        let (first_filename, last_filename) = {
580            let store = fs.object_manager().store(store_id).expect("store not found");
581            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
582
583            // Keep writing until we notice the key has rolled.
584            let root_dir = Directory::open(&store, store.root_directory_object_id())
585                .await
586                .expect("open failed");
587
588            let mut last_mutations_cipher_offset = 0;
589            let mut i = 0;
590            let first_filename = format!("{:<200}", i);
591            loop {
592                let mut transaction = fs
593                    .clone()
594                    .new_transaction(
595                        lock_keys![LockKey::object(store_id, root_dir.object_id())],
596                        Options::default(),
597                    )
598                    .await
599                    .expect("new_transaction failed");
600                root_dir
601                    .create_child_file(&mut transaction, &format!("{:<200}", i))
602                    .await
603                    .expect("create_child_file failed");
604                i += 1;
605                transaction.commit().await.expect("commit failed");
606                let cipher_offset = store.mutations_cipher.lock().as_ref().unwrap().offset();
607                if cipher_offset < last_mutations_cipher_offset {
608                    break;
609                }
610                last_mutations_cipher_offset = cipher_offset;
611            }
612
613            // Sync now, so that we can be fairly certain that the next transaction *won't* trigger
614            // a store flush (so we'll still have something to flush when we reopen the filesystem).
615            fs.sync(SyncOptions::default()).await.expect("sync failed");
616
617            // Write one more file to ensure the cipher has a non-zero offset.
618            let mut transaction = fs
619                .clone()
620                .new_transaction(
621                    lock_keys![LockKey::object(store_id, root_dir.object_id())],
622                    Options::default(),
623                )
624                .await
625                .expect("new_transaction failed");
626            let last_filename = format!("{:<200}", i);
627            root_dir
628                .create_child_file(&mut transaction, &last_filename)
629                .await
630                .expect("create_child_file failed");
631            transaction.commit().await.expect("commit failed");
632            (first_filename, last_filename)
633        };
634
635        fs.close().await.expect("close failed");
636
637        // Reopen and make sure replay succeeds.
638        let device = fs.take_device().await;
639        device.reopen(false);
640        let fs = FxFilesystemBuilder::new()
641            .roll_metadata_key_byte_count(512 * 1024)
642            .open(device)
643            .await
644            .expect("open failed");
645
646        if flush_before_unlock {
647            // Flush before unlocking the store which will see that the encrypted mutations get
648            // written to a file.
649            fs.object_manager().flush().await.expect("flush failed");
650        }
651
652        {
653            let store = fs.object_manager().store(store_id).expect("store not found");
654            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");
655
656            // The key should get rolled when we unlock.
657            assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);
658
659            let root_dir = Directory::open(&store, store.root_directory_object_id())
660                .await
661                .expect("open failed");
662            root_dir
663                .lookup(&first_filename)
664                .await
665                .expect("Lookup failed")
666                .expect("First created file wasn't present");
667            root_dir
668                .lookup(&last_filename)
669                .await
670                .expect("Lookup failed")
671                .expect("Last created file wasn't present");
672        }
673    }
674
675    #[fuchsia::test(threads = 10)]
676    async fn test_metadata_key_roll() {
677        run_key_roll_test(/* flush_before_unlock: */ false).await;
678    }
679
680    #[fuchsia::test(threads = 10)]
681    async fn test_metadata_key_roll_with_flush_before_unlock() {
682        run_key_roll_test(/* flush_before_unlock: */ true).await;
683    }
684
685    #[fuchsia::test]
686    async fn test_flush_when_locked() {
687        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
688        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
689        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
690        let crypt = Arc::new(new_insecure_crypt());
691        let store = root_volume
692            .new_volume(
693                "test",
694                NewChildStoreOptions {
695                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
696                    ..NewChildStoreOptions::default()
697                },
698            )
699            .await
700            .expect("new_volume failed");
701        let root_dir =
702            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
703        let mut transaction = fs
704            .clone()
705            .new_transaction(
706                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
707                Options::default(),
708            )
709            .await
710            .expect("new_transaction failed");
711        let foo = root_dir
712            .create_child_file(&mut transaction, "foo")
713            .await
714            .expect("create_child_file failed");
715        transaction.commit().await.expect("commit failed");
716
717        // When the volume is first created it will include a new mutations key but we want to test
718        // what happens when the encrypted mutations file doesn't contain a new mutations key, so we
719        // flush here.
720        store.flush().await.expect("flush failed");
721
722        let mut transaction = fs
723            .clone()
724            .new_transaction(
725                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
726                Options::default(),
727            )
728            .await
729            .expect("new_transaction failed");
730        let bar = root_dir
731            .create_child_file(&mut transaction, "bar")
732            .await
733            .expect("create_child_file failed");
734        transaction.commit().await.expect("commit failed");
735
736        store.lock().await.expect("lock failed");
737
738        // Flushing the store whilst locked should create an encrypted mutations file.
739        store.flush().await.expect("flush failed");
740
741        // Check the reservation.
742        let info = store.load_store_info().await.unwrap();
743        let parent_store = store.parent_store().unwrap();
744        let mut total_layer_size = 0;
745        for &oid in &info.layers {
746            total_layer_size +=
747                parent_store.get_file_size(oid).await.expect("get_file_size failed");
748        }
749        assert_ne!(info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
750        total_layer_size += layer_size_from_encrypted_mutations_size(
751            parent_store
752                .get_file_size(info.encrypted_mutations_object_id)
753                .await
754                .expect("get_file_size failed"),
755        );
756        assert_eq!(
757            fs.object_manager().reservation(store.store_object_id()),
758            Some(tree::reservation_amount_from_layer_size(total_layer_size))
759        );
760
761        // Unlocking the store should replay that encrypted mutations file.
762        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
763
764        ObjectStore::open_object(&store, foo.object_id(), HandleOptions::default(), None)
765            .await
766            .expect("open_object failed");
767
768        ObjectStore::open_object(&store, bar.object_id(), HandleOptions::default(), None)
769            .await
770            .expect("open_object failed");
771
772        fs.close().await.expect("close failed");
773    }
774}
775
776impl tree::MajorCompactable<ObjectKey, ObjectValue> for LSMTree<ObjectKey, ObjectValue> {
777    async fn major_iter(
778        iter: impl LayerIterator<ObjectKey, ObjectValue>,
779    ) -> Result<impl LayerIterator<ObjectKey, ObjectValue>, Error> {
780        iter.filter(|item: ItemRef<'_, _, _>| match item {
781            // Object Tombstone.
782            ItemRef { value: ObjectValue::None, .. } => false,
783            // Deleted extent.
784            ItemRef { value: ObjectValue::Extent(ExtentValue::None), .. } => false,
785            _ => true,
786        })
787        .await
788    }
789}