fxfs/object_store/
flush.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// This module is responsible for flushing (a.k.a. compacting) the object store trees.
6
7use crate::filesystem::TxnGuard;
8use crate::log::*;
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::lsm_tree::{LSMTree, layers_from_handles};
11use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
12use crate::object_store::extent_record::ExtentValue;
13use crate::object_store::object_manager::{ObjectManager, ReservationUpdate};
14use crate::object_store::object_record::{ObjectKey, ObjectValue};
15use crate::object_store::transaction::{AssociatedObject, LockKey, Mutation, lock_keys};
16use crate::object_store::{
17    AssocObj, DirectWriter, EncryptedMutations, HandleOptions, LockState,
18    MAX_ENCRYPTED_MUTATIONS_SIZE, ObjectStore, Options, StoreInfo,
19    layer_size_from_encrypted_mutations_size, tree,
20};
21use crate::serialized_types::{LATEST_VERSION, Version, VersionedLatest};
22use anyhow::{Context, Error, bail};
23use fxfs_crypto::{EncryptionKey, KeyPurpose};
24use once_cell::sync::OnceCell;
25use std::sync::atomic::Ordering;
26
/// Why a flush of an object store was requested.  Used by `flush_with_reason` to decide whether
/// the flush can be skipped and how Crypt errors are handled.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Reason {
    /// Journal memory or space pressure.
    Journal,

    /// After unlock and replay of encrypted mutations.
    Unlock,
}
35
/// If flushing an unlocked store fails due to a Crypt error, we want to re-lock the store and
/// flush it again, so we can make progress on flushing (and therefore journal compaction) without
/// depending on Crypt.  This is necessary because Crypt is an external component that might crash
/// or become unresponsive.
#[derive(Debug)]
enum FlushResult<T> {
    /// The flush succeeded, yielding `T`.
    Ok(T),
    /// The flush failed because of a Crypt error; the caller may re-lock the store and retry.
    CryptError(Error),
}
45
46#[fxfs_trace::trace]
47impl ObjectStore {
    /// Flushes (compacts) this object store, retrying after Crypt failures.
    ///
    /// On success, returns the earliest version used by a struct in the tree.  If a flush attempt
    /// fails with a Crypt error, the store is re-locked via the registered owner and the flush is
    /// retried (bounded by `MAX_RETRIES`) — except when `reason` is `Reason::Unlock`, in which
    /// case the error is returned immediately so the caller in `unlock` is not left with a
    /// locked store.
    #[trace("store_object_id" => self.store_object_id)]
    pub async fn flush_with_reason(&self, reason: Reason) -> Result<Version, Error> {
        // Loop to deal with Crypt errors.  If flushing fails due to a Crypt error, we re-lock the
        // store and try again.  However, another task might racily unlock the store after we lock
        // but before we flush (since we dropped the lock taken in `try_flush_with_reason`).
        // We set a limit on the number of times we'll permit a retry, to avoid looping forever if
        // the race condition is continuously hit.  In practice it is very unlikely to ever occur
        // at all, since that would require something else busily unlocking the volume and the Crypt
        // instance dying repeatedly.
        const MAX_RETRIES: usize = 10;
        let mut retries = 0;
        loop {
            match self.try_flush_with_reason(reason).await? {
                FlushResult::Ok(version) => return Ok(version),
                FlushResult::CryptError(error) => {
                    if reason == Reason::Unlock || retries >= MAX_RETRIES {
                        // If flushing due to unlock fails due to Crypt issues, just fail, so we
                        // don't return to `unlock` with a locked store.
                        return Err(error);
                    }
                    log::warn!(
                        error:?;
                        "Flushing failed for store {}, re-locking and trying again.",
                        self.store_object_id()
                    );
                    // Re-lock through the owner so the next attempt can take the Crypt-free
                    // locked flush path.
                    let owner = self.lock_state.lock().owner();
                    if let Some(owner) = owner {
                        owner
                            .force_lock(&self)
                            .await
                            .context("Failed to re-lock store during flush")?;
                    } else {
                        bail!("No store owner was registered!");
                    }
                }
            }
            retries += 1;
        }
    }
87
88    async fn try_flush_with_reason(&self, reason: Reason) -> Result<FlushResult<Version>, Error> {
89        if self.parent_store.is_none() {
90            // Early exit, but still return the earliest version used by a struct in the tree
91            return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
92        }
93        let filesystem = self.filesystem();
94        let object_manager = filesystem.object_manager();
95
96        // We must take the transaction guard *before* we take the flush lock.
97        let txn_guard = filesystem.clone().txn_guard().await;
98        let keys = lock_keys![LockKey::flush(self.store_object_id())];
99        let _guard = Some(filesystem.lock_manager().write_lock(keys).await);
100
101        // After taking the lock, check to see if the store has been deleted.
102        if matches!(*self.lock_state.lock(), LockState::Deleted) {
103            // When we compact, it's possible that the store has been deleted since we gathered the
104            // list of stores that need compacting.  This is benign.
105            return Ok(FlushResult::Ok(LATEST_VERSION));
106        }
107
108        match reason {
109            Reason::Unlock => {
110                // If we're unlocking, only flush if there are encrypted mutations currently stored
111                // in a file.  We don't worry if they're in memory because a flush should get
112                // triggered when the journal gets full.
113                // Safe to unwrap store_info here because this was invoked from ObjectStore::unlock,
114                // so store_info is already accessible.
115                if self.store_info().unwrap().encrypted_mutations_object_id == INVALID_OBJECT_ID {
116                    // TODO(https://fxbug.dev/42179266): Add earliest_version support for encrypted
117                    // mutations.
118                    // Early exit, but still return the earliest version used by a struct in the
119                    // tree.
120                    return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
121                }
122            }
123            Reason::Journal => {
124                // We flush if we have something to flush *or* the on-disk version of data is not
125                // the latest.
126                let earliest_version = self.tree.get_earliest_version();
127                if !object_manager.needs_flush(self.store_object_id)
128                    && earliest_version == LATEST_VERSION
129                {
130                    // Early exit, but still return the earliest version used by a struct in the
131                    // tree.
132                    return Ok(FlushResult::Ok(earliest_version));
133                }
134            }
135        }
136
137        let trace = self.trace.load(Ordering::Relaxed);
138        if trace {
139            info!(store_id = self.store_object_id(); "OS: begin flush");
140        }
141
142        if matches!(&*self.lock_state.lock(), LockState::Locked) {
143            self.flush_locked(&txn_guard).await.with_context(|| {
144                format!("Failed to flush object store {}", self.store_object_id)
145            })?;
146        } else {
147            if let FlushResult::CryptError(error) = self
148                .flush_unlocked(&txn_guard)
149                .await
150                .with_context(|| format!("Failed to flush object store {}", self.store_object_id))?
151            {
152                return Ok(FlushResult::CryptError(error));
153            }
154        }
155
156        if trace {
157            info!(store_id = self.store_object_id(); "OS: end flush");
158        }
159        if let Some(callback) = &*self.flush_callback.lock() {
160            callback(self);
161        }
162
163        let mut counters = self.counters.lock();
164        counters.num_flushes += 1;
165        counters.last_flush_time = Some(std::time::SystemTime::now());
166        // Return the earliest version used by a struct in the tree
167        Ok(FlushResult::Ok(self.tree.get_earliest_version()))
168    }
169
    /// Flushes an unlocked store by major-compacting its LSM tree into a new layer file
    /// (encrypted when the store has a Crypt instance).  Returns the new layer file sizes, or
    /// `FlushResult::CryptError` if a Crypt operation failed — in which case the caller may
    /// re-lock the store and retry via the locked flush path.
    async fn flush_unlocked(
        &self,
        txn_guard: &TxnGuard<'_>,
    ) -> Result<FlushResult<Vec<u64>>, Error> {
        // Roll the metadata mutations key first if the cipher has already processed at least
        // `roll_metadata_key_byte_count` bytes.
        let roll_mutations_key = self
            .mutations_cipher
            .lock()
            .as_ref()
            .map(|cipher| {
                cipher.offset() >= self.filesystem().options().roll_metadata_key_byte_count
            })
            .unwrap_or(false);
        if roll_mutations_key {
            if let Err(error) = self.roll_mutations_key(self.crypt().unwrap().as_ref()).await {
                log::warn!(
                    error:?; "Failed to roll mutations key for store {}", self.store_object_id());
                return Ok(FlushResult::CryptError(error));
            }
        }

        // Captures a snapshot of StoreInfo at the moment the BeginFlush mutation is applied, so
        // the snapshot is consistent with the flush point in the mutation stream.
        struct StoreInfoSnapshot<'a> {
            store: &'a ObjectStore,
            store_info: OnceCell<StoreInfo>,
        }
        impl AssociatedObject for StoreInfoSnapshot<'_> {
            fn will_apply_mutation(
                &self,
                _mutation: &Mutation,
                _object_id: u64,
                _manager: &ObjectManager,
            ) {
                let mut store_info = self.store.store_info().unwrap();

                // Capture the offset in the cipher stream.
                let mutations_cipher = self.store.mutations_cipher.lock();
                if let Some(cipher) = mutations_cipher.as_ref() {
                    store_info.mutations_cipher_offset = cipher.offset();
                }

                // This will capture object IDs that might be in transactions not yet committed.  In
                // theory, we could do better than this but it's not worth the effort.
                store_info.last_object_id = self.store.last_object_id.lock().id;

                self.store_info.set(store_info).unwrap();
            }
        }

        let store_info_snapshot = StoreInfoSnapshot { store: self, store_info: OnceCell::new() };

        let filesystem = self.filesystem();
        let object_manager = filesystem.object_manager();
        let reservation = object_manager.metadata_reservation();
        let txn_options = Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(reservation),
            txn_guard: Some(txn_guard),
            ..Default::default()
        };

        // The BeginFlush mutation must be within a transaction that has no impact on StoreInfo
        // since we want to get an accurate snapshot of StoreInfo.
        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
        transaction.add_with_object(
            self.store_object_id(),
            Mutation::BeginFlush,
            AssocObj::Borrowed(&store_info_snapshot),
        );
        transaction.commit().await?;

        let mut new_store_info = store_info_snapshot.store_info.into_inner().unwrap();

        // There is a transaction to create objects at the start and then another transaction at the
        // end. Between those two transactions, there are transactions that write to the files.  In
        // the first transaction, objects are created in the graveyard. Upon success, the objects
        // are removed from the graveyard.
        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;

        let reservation_update: ReservationUpdate; // Must live longer than end_transaction.
        let mut end_transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    self.parent_store.as_ref().unwrap().store_object_id(),
                    self.store_info_handle_object_id().unwrap(),
                )],
                txn_options,
            )
            .await?;

        // Create and write a new layer, compacting existing layers.
        let parent_store = self.parent_store.as_ref().unwrap();
        let handle_options = HandleOptions { skip_journal_checks: true, ..Default::default() };
        let new_object_tree_layer = if let Some(crypt) = self.crypt().as_deref() {
            let object_id = parent_store.get_next_object_id(transaction.txn_guard()).await?;
            // Key creation goes through Crypt, so a failure here is reported as a CryptError
            // rather than a hard error.
            let (key, unwrapped_key) = match crypt.create_key(object_id, KeyPurpose::Data).await {
                Ok((key, unwrapped_key)) => (key, unwrapped_key),
                Err(status) => {
                    log::warn!(
                        status:?;
                        "Failed to create keys while flushing store {}",
                        self.store_object_id(),
                    );
                    return Ok(FlushResult::CryptError(status.into()));
                }
            };
            ObjectStore::create_object_with_key(
                parent_store,
                &mut transaction,
                object_id,
                handle_options,
                EncryptionKey::Fxfs(key),
                unwrapped_key,
            )
            .await?
        } else {
            ObjectStore::create_object(parent_store, &mut transaction, handle_options, None).await?
        };
        let writer = DirectWriter::new(&new_object_tree_layer, txn_options).await;
        let new_object_tree_layer_object_id = new_object_tree_layer.object_id();
        // The new layer starts out in the graveyard so that it is cleaned up if we crash before
        // the end transaction commits; the end transaction removes it.
        parent_store.add_to_graveyard(&mut transaction, new_object_tree_layer_object_id);
        parent_store.remove_from_graveyard(&mut end_transaction, new_object_tree_layer_object_id);

        transaction.commit().await?;
        let (layers_to_keep, old_layers) =
            tree::flush(&self.tree, writer).await.context("Failed to flush tree")?;

        // The new layer file goes first, followed by any layers the compaction kept.
        let mut new_layers = layers_from_handles([new_object_tree_layer]).await?;
        new_layers.extend(layers_to_keep.iter().map(|l| (*l).clone()));

        new_store_info.layers = Vec::new();
        for layer in &new_layers {
            if let Some(handle) = layer.handle() {
                new_store_info.layers.push(handle.object_id());
            }
        }

        // Move the existing layers we're compacting to the graveyard at the end.
        for layer in &old_layers {
            if let Some(handle) = layer.handle() {
                parent_store.add_to_graveyard(&mut end_transaction, handle.object_id());
            }
        }

        // A successful compaction also obsoletes any encrypted mutations file.
        let old_encrypted_mutations_object_id =
            std::mem::replace(&mut new_store_info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
            parent_store.add_to_graveyard(&mut end_transaction, old_encrypted_mutations_object_id);
        }

        self.write_store_info(&mut end_transaction, &new_store_info).await?;

        let layer_file_sizes = new_layers
            .iter()
            .map(|l| l.handle().map(ReadObjectHandle::get_size).unwrap_or(0))
            .collect::<Vec<u64>>();

        let total_layer_size = layer_file_sizes.iter().sum();
        reservation_update =
            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));

        end_transaction.add_with_object(
            self.store_object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );

        if self.trace.load(Ordering::Relaxed) {
            info!(
                store_id = self.store_object_id(),
                old_layer_count = old_layers.len(),
                new_layer_count = new_layers.len(),
                total_layer_size,
                new_store_info:?;
                "OS: compacting"
            );
        }

        // Swap in the new in-memory state atomically with the commit of the end transaction.
        end_transaction
            .commit_with_callback(|_| {
                let mut store_info = self.store_info.lock();
                let info = store_info.as_mut().unwrap();
                info.layers = new_store_info.layers;
                info.encrypted_mutations_object_id = new_store_info.encrypted_mutations_object_id;
                info.mutations_cipher_offset = new_store_info.mutations_cipher_offset;
                self.tree.set_layers(new_layers);
            })
            .await?;

        // Now close the layers and purge them.
        for layer in old_layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                parent_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
            parent_store.tombstone_object(old_encrypted_mutations_object_id, txn_options).await?;
        }

        Ok(FlushResult::Ok(layer_file_sizes))
    }
375
    /// Flushes a locked store.
    ///
    /// With the store's keys unavailable the tree cannot be compacted, so instead the store's
    /// journaled (encrypted) mutations are serialized and appended to an encrypted mutations
    /// file (created if one does not exist yet).  The store info is rewritten to reference that
    /// file and the metadata reservation is updated to cover it, which allows the journal to be
    /// compacted without Crypt.
    async fn flush_locked(&self, txn_guard: &TxnGuard<'_>) -> Result<(), Error> {
        let filesystem = self.filesystem();
        let object_manager = filesystem.object_manager();
        let reservation = object_manager.metadata_reservation();
        let txn_options = Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(reservation),
            txn_guard: Some(txn_guard),
            ..Default::default()
        };

        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
        transaction.add(self.store_object_id(), Mutation::BeginFlush);
        transaction.commit().await?;

        let mut new_store_info = self.load_store_info().await?;

        // There is a transaction to create objects at the start and then another transaction at the
        // end. Between those two transactions, there are transactions that write to the files.  In
        // the first transaction, objects are created in the graveyard. Upon success, the objects
        // are removed from the graveyard.
        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;

        let reservation_update: ReservationUpdate; // Must live longer than end_transaction.
        let handle; // Must live longer than end_transaction.
        let mut end_transaction;

        // We need to either write our encrypted mutations to a new file, or append them to an
        // existing one.
        let parent_store = self.parent_store.as_ref().unwrap();
        handle = if new_store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
            // No existing encrypted mutations file: create one (in the graveyard until the end
            // transaction commits, so it is cleaned up on a crash).
            let handle = ObjectStore::create_object(
                parent_store,
                &mut transaction,
                HandleOptions { skip_journal_checks: true, ..Default::default() },
                None,
            )
            .await?;
            let oid = handle.object_id();
            end_transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(parent_store.store_object_id(), oid),
                        LockKey::object(
                            parent_store.store_object_id(),
                            self.store_info_handle_object_id().unwrap(),
                        ),
                    ],
                    txn_options,
                )
                .await?;
            new_store_info.encrypted_mutations_object_id = oid;
            parent_store.add_to_graveyard(&mut transaction, oid);
            parent_store.remove_from_graveyard(&mut end_transaction, oid);
            handle
        } else {
            // An encrypted mutations file already exists: open it so we can append.
            end_transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(
                            parent_store.store_object_id(),
                            new_store_info.encrypted_mutations_object_id,
                        ),
                        LockKey::object(
                            parent_store.store_object_id(),
                            self.store_info_handle_object_id().unwrap(),
                        ),
                    ],
                    txn_options,
                )
                .await?;
            ObjectStore::open_object(
                parent_store,
                new_store_info.encrypted_mutations_object_id,
                HandleOptions { skip_journal_checks: true, ..Default::default() },
                None,
            )
            .await?
        };
        transaction.commit().await?;

        // Append the encrypted mutations, which need to be read from the journal.
        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
        let journaled = filesystem
            .journal()
            .read_transactions_for_object(self.store_object_id)
            .await
            .context("Failed to read encrypted mutations from journal")?;
        let mut buffer = handle.allocate_buffer(MAX_ENCRYPTED_MUTATIONS_SIZE).await;
        let mut cursor = std::io::Cursor::new(buffer.as_mut_slice());
        EncryptedMutations::from_replayed_mutations(self.store_object_id, journaled)
            .serialize_with_version(&mut cursor)?;
        let len = cursor.position() as usize;
        // Append at the current end of the file.
        handle.txn_write(&mut end_transaction, handle.get_size(), buffer.subslice(..len)).await?;

        self.write_store_info(&mut end_transaction, &new_store_info).await?;

        // The reservation must cover the existing layer files plus the (grown) encrypted
        // mutations file.
        let mut total_layer_size = 0;
        for &oid in &new_store_info.layers {
            total_layer_size += parent_store.get_file_size(oid).await?;
        }
        total_layer_size +=
            layer_size_from_encrypted_mutations_size(handle.get_size() + len as u64);

        reservation_update =
            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));

        end_transaction.add_with_object(
            self.store_object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );

        end_transaction.commit().await?;

        Ok(())
    }
497}
498
499#[cfg(test)]
500mod tests {
501    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, JournalingObject, SyncOptions};
502    use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle};
503    use crate::object_store::directory::Directory;
504    use crate::object_store::transaction::{Options, lock_keys};
505    use crate::object_store::volume::root_volume;
506    use crate::object_store::{
507        HandleOptions, LockKey, NO_OWNER, NewChildStoreOptions, ObjectStore, StoreOptions,
508        layer_size_from_encrypted_mutations_size, tree,
509    };
510    use fxfs_insecure_crypto::InsecureCrypt;
511    use std::sync::Arc;
512    use storage_device::DeviceHolder;
513    use storage_device::fake_device::FakeDevice;
514
    /// Shared body for the metadata key-roll tests.
    ///
    /// Creates an encrypted volume, writes files until the mutations cipher offset drops
    /// (indicating the metadata key rolled), then reopens the filesystem and verifies that
    /// replay succeeds and both the first and last created files are present.  If
    /// `flush_before_unlock` is true, the store is flushed while still locked after reopening,
    /// which forces the encrypted mutations to be written to a file before they are replayed.
    async fn run_key_roll_test(flush_before_unlock: bool) {
        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let store_id = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(Arc::new(InsecureCrypt::new())),
                            ..StoreOptions::default()
                        },
                        ..NewChildStoreOptions::default()
                    },
                )
                .await
                .expect("new_volume failed")
                .store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // Reopen with a low roll threshold so the test can trigger a key roll with a modest
        // amount of data.
        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        let (first_filename, last_filename) = {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");

            // Keep writing until we notice the key has rolled.
            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            let mut last_mutations_cipher_offset = 0;
            let mut i = 0;
            let first_filename = format!("{:<200}", i);
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(store_id, root_dir.object_id())],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_dir
                    .create_child_file(&mut transaction, &format!("{:<200}", i))
                    .await
                    .expect("create_child_file failed");
                i += 1;
                transaction.commit().await.expect("commit failed");
                // A drop in the cipher offset means a fresh key (and cipher) was installed.
                let cipher_offset = store.mutations_cipher.lock().as_ref().unwrap().offset();
                if cipher_offset < last_mutations_cipher_offset {
                    break;
                }
                last_mutations_cipher_offset = cipher_offset;
            }

            // Sync now, so that we can be fairly certain that the next transaction *won't* trigger
            // a store flush (so we'll still have something to flush when we reopen the filesystem).
            fs.sync(SyncOptions::default()).await.expect("sync failed");

            // Write one more file to ensure the cipher has a non-zero offset.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(store_id, root_dir.object_id())],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let last_filename = format!("{:<200}", i);
            root_dir
                .create_child_file(&mut transaction, &last_filename)
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            (first_filename, last_filename)
        };

        fs.close().await.expect("close failed");

        // Reopen and make sure replay succeeds.
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        if flush_before_unlock {
            // Flush before unlocking the store which will see that the encrypted mutations get
            // written to a file.
            fs.object_manager().flush().await.expect("flush failed");
        }

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");

            // The key should get rolled when we unlock.
            assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);

            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            root_dir
                .lookup(&first_filename)
                .await
                .expect("Lookup failed")
                .expect("First created file wasn't present");
            root_dir
                .lookup(&last_filename)
                .await
                .expect("Lookup failed")
                .expect("Last created file wasn't present");
        }
    }
641
    /// Key roll where the encrypted mutations are replayed without a flush before unlock.
    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll() {
        run_key_roll_test(/* flush_before_unlock: */ false).await;
    }
646
    /// Key roll where the store is flushed (while locked) before it is unlocked, so the
    /// encrypted mutations are written to a file first.
    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll_with_flush_before_unlock() {
        run_key_roll_test(/* flush_before_unlock: */ true).await;
    }
651
    /// Flushing a locked store must create an encrypted mutations file, update the metadata
    /// reservation to cover it, and still allow the mutations to replay correctly on unlock.
    #[fuchsia::test]
    async fn test_flush_when_locked() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let crypt = Arc::new(InsecureCrypt::new());
        let store = root_volume
            .new_volume(
                "test",
                NewChildStoreOptions {
                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
                    ..NewChildStoreOptions::default()
                },
            )
            .await
            .expect("new_volume failed");
        let root_dir =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let foo = root_dir
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // When the volume is first created it will include a new mutations key but we want to test
        // what happens when the encrypted mutations file doesn't contain a new mutations key, so we
        // flush here.
        store.flush().await.expect("flush failed");

        // Create a second file whose mutations will only exist in the journal when we lock.
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let bar = root_dir
            .create_child_file(&mut transaction, "bar")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        store.lock().await.expect("lock failed");

        // Flushing the store whilst locked should create an encrypted mutations file.
        store.flush().await.expect("flush failed");

        // Check the reservation.
        let info = store.load_store_info().await.unwrap();
        let parent_store = store.parent_store().unwrap();
        let mut total_layer_size = 0;
        for &oid in &info.layers {
            total_layer_size +=
                parent_store.get_file_size(oid).await.expect("get_file_size failed");
        }
        assert_ne!(info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
        total_layer_size += layer_size_from_encrypted_mutations_size(
            parent_store
                .get_file_size(info.encrypted_mutations_object_id)
                .await
                .expect("get_file_size failed"),
        );
        assert_eq!(
            fs.object_manager().reservation(store.store_object_id()),
            Some(tree::reservation_amount_from_layer_size(total_layer_size))
        );

        // Unlocking the store should replay that encrypted mutations file.
        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");

        // Both files must be visible after replay.
        ObjectStore::open_object(&store, foo.object_id(), HandleOptions::default(), None)
            .await
            .expect("open_object failed");

        ObjectStore::open_object(&store, bar.object_id(), HandleOptions::default(), None)
            .await
            .expect("open_object failed");

        fs.close().await.expect("close failed");
    }
741}
742
743impl tree::MajorCompactable<ObjectKey, ObjectValue> for LSMTree<ObjectKey, ObjectValue> {
744    async fn major_iter(
745        iter: impl LayerIterator<ObjectKey, ObjectValue>,
746    ) -> Result<impl LayerIterator<ObjectKey, ObjectValue>, Error> {
747        iter.filter(|item: ItemRef<'_, _, _>| match item {
748            // Object Tombstone.
749            ItemRef { value: ObjectValue::None, .. } => false,
750            // Deleted extent.
751            ItemRef { value: ObjectValue::Extent(ExtentValue::None), .. } => false,
752            _ => true,
753        })
754        .await
755    }
756}