// fxfs/object_store/flush.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// This module is responsible for flushing (a.k.a. compacting) the object store trees.
6
7use crate::filesystem::TxnGuard;
8use crate::log::*;
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::lsm_tree::{LSMTree, layers_from_handles};
11use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
12use crate::object_store::extent_record::ExtentValue;
13use crate::object_store::object_manager::{ObjectManager, ReservationUpdate};
14use crate::object_store::object_record::{ObjectKey, ObjectValue};
15use crate::object_store::transaction::{AssociatedObject, LockKey, Mutation, lock_keys};
16use crate::object_store::{
17    AssocObj, DirectWriter, EncryptedMutations, HandleOptions, LastObjectId, LastObjectIdInfo,
18    LockState, MAX_ENCRYPTED_MUTATIONS_SIZE, ObjectStore, Options, StoreInfo,
19    layer_size_from_encrypted_mutations_size, tree,
20};
21use crate::serialized_types::{LATEST_VERSION, Version, VersionedLatest};
22use anyhow::{Context, Error};
23use fxfs_crypto::{EncryptionKey, KeyPurpose};
24use std::sync::OnceLock;
25use std::sync::atomic::Ordering;
26
/// Why a flush of the object store was requested.  The reason affects whether a flush can be
/// skipped (see `try_flush_with_reason`) and how Crypt failures are handled (see
/// `flush_with_reason`).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Reason {
    /// Journal memory or space pressure.
    Journal,

    /// After unlock and replay of encrypted mutations.
    Unlock,
}
35
/// If flushing an unlocked store fails due to a Crypt error, we want to re-lock the store and
/// flush it again, so we can make progress on flushing (and therefore journal compaction) without
/// depending on Crypt.  This is necessary because Crypt is an external component that might crash
/// or become unresponsive.
#[derive(Debug)]
enum FlushResult<T> {
    /// The flush succeeded, yielding `T`.
    Ok(T),
    /// The flush failed because of an error returned by the Crypt instance; the caller may
    /// re-lock the store and retry.
    CryptError(Error),
}
45
46#[fxfs_trace::trace]
47impl ObjectStore {
48    #[trace("store_object_id" => self.store_object_id)]
49    pub async fn flush_with_reason(&self, reason: Reason) -> Result<Version, Error> {
50        // Loop to deal with Crypt errors.  If flushing fails due to a Crypt error, we re-lock the
51        // store and try again.  However, another task might racily unlock the store after we lock
52        // but before we flush (since we dropped the lock taken in `try_flush_with_reason`).
53        // We set a limit on the number of times we'll permit a retry, to avoid looping forever if
54        // the race condition is continuously hit.  In practice it is very unlikely to ever occur
55        // at all, since that would require something else busily unlocking the volume and the Crypt
56        // instance dying repeatedly.
57        const MAX_RETRIES: usize = 10;
58        let mut retries = 0;
59        loop {
60            match self.try_flush_with_reason(reason).await? {
61                FlushResult::Ok(version) => return Ok(version),
62                FlushResult::CryptError(error) => {
63                    if reason == Reason::Unlock || retries >= MAX_RETRIES {
64                        // If flushing due to unlock fails due to Crypt issues, just fail, so we
65                        // don't return to `unlock` with a locked store.
66                        return Err(error);
67                    }
68                    log::warn!(
69                        error:?;
70                        "Flushing failed for store {}, re-locking and trying again.",
71                        self.store_object_id()
72                    );
73                    let owner = self.lock_state.lock().owner();
74                    if let Some(owner) = owner {
75                        owner
76                            .force_lock(&self)
77                            .await
78                            .context("Failed to re-lock store during flush")?;
79                    }
80                    // The owner might have already been cleaned up, and in doing so it might have
81                    // locked the store.  In this case we should try again.
82                }
83            }
84            retries += 1;
85        }
86    }
87
88    async fn try_flush_with_reason(&self, reason: Reason) -> Result<FlushResult<Version>, Error> {
89        if self.parent_store.is_none() {
90            // Early exit, but still return the earliest version used by a struct in the tree
91            return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
92        }
93        let filesystem = self.filesystem();
94        let object_manager = filesystem.object_manager();
95
96        // We must take the transaction guard *before* we take the flush lock.
97        let txn_guard = filesystem.clone().txn_guard().await;
98        let keys = lock_keys![LockKey::flush(self.store_object_id())];
99        let _guard = Some(filesystem.lock_manager().write_lock(keys).await);
100
101        // After taking the lock, check to see if the store has been deleted.
102        if matches!(*self.lock_state.lock(), LockState::Deleted) {
103            // When we compact, it's possible that the store has been deleted since we gathered the
104            // list of stores that need compacting.  This is benign.
105            return Ok(FlushResult::Ok(LATEST_VERSION));
106        }
107
108        match reason {
109            Reason::Unlock => {
110                // If we're unlocking, only flush if there are encrypted mutations currently stored
111                // in a file.  We don't worry if they're in memory because a flush should get
112                // triggered when the journal gets full.
113                // Safe to unwrap store_info here because this was invoked from ObjectStore::unlock,
114                // so store_info is already accessible.
115                if self.store_info().unwrap().encrypted_mutations_object_id == INVALID_OBJECT_ID {
116                    // TODO(https://fxbug.dev/42179266): Add earliest_version support for encrypted
117                    // mutations.
118                    // Early exit, but still return the earliest version used by a struct in the
119                    // tree.
120                    return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
121                }
122            }
123            Reason::Journal => {
124                // We flush if we have something to flush *or* the on-disk version of data is not
125                // the latest.
126                let earliest_version = self.tree.get_earliest_version();
127                if !object_manager.needs_flush(self.store_object_id)
128                    && earliest_version == LATEST_VERSION
129                {
130                    // Early exit, but still return the earliest version used by a struct in the
131                    // tree.
132                    return Ok(FlushResult::Ok(earliest_version));
133                }
134            }
135        }
136
137        let trace = self.trace.load(Ordering::Relaxed);
138        if trace {
139            info!(store_id = self.store_object_id(); "OS: begin flush");
140        }
141
142        if matches!(&*self.lock_state.lock(), LockState::Locked) {
143            self.flush_locked(&txn_guard).await.with_context(|| {
144                format!("Failed to flush object store {}", self.store_object_id)
145            })?;
146        } else {
147            if let FlushResult::CryptError(error) = self
148                .flush_unlocked(&txn_guard, reason)
149                .await
150                .with_context(|| format!("Failed to flush object store {}", self.store_object_id))?
151            {
152                return Ok(FlushResult::CryptError(error));
153            }
154        }
155
156        if trace {
157            info!(store_id = self.store_object_id(); "OS: end flush");
158        }
159        if let Some(callback) = &*self.flush_callback.lock() {
160            callback(self);
161        }
162
163        let mut counters = self.counters.lock();
164        counters.num_flushes += 1;
165        counters.last_flush_time = Some(std::time::SystemTime::now());
166        // Return the earliest version used by a struct in the tree
167        Ok(FlushResult::Ok(self.tree.get_earliest_version()))
168    }
169
    /// Flushes an unlocked store: compacts the in-memory tree into a newly created layer file and
    /// persists an updated `StoreInfo`.  Returns the resulting layer file sizes, or
    /// `FlushResult::CryptError` if key creation/rolling failed (so the caller can re-lock the
    /// store and retry).
    async fn flush_unlocked(
        &self,
        txn_guard: &TxnGuard<'_>,
        reason: Reason,
    ) -> Result<FlushResult<Vec<u64>>, Error> {
        // Roll the mutations key first if the cipher has processed at least
        // `roll_metadata_key_byte_count` bytes.  A Crypt failure here is reported as CryptError
        // so the flush can be retried with the store locked.
        let roll_mutations_key = self
            .mutations_cipher
            .lock()
            .as_ref()
            .map(|cipher| {
                cipher.offset() >= self.filesystem().options().roll_metadata_key_byte_count
            })
            .unwrap_or(false);
        if roll_mutations_key {
            if let Err(error) = self.roll_mutations_key(self.crypt().unwrap().as_ref()).await {
                log::warn!(
                    error:?; "Failed to roll mutations key for store {}", self.store_object_id());
                return Ok(FlushResult::CryptError(error));
            }
        }

        // Captures a snapshot of `StoreInfo` (including the current mutations cipher offset) at
        // the moment the BeginFlush mutation is applied.
        struct StoreInfoSnapshot<'a> {
            store: &'a ObjectStore,
            store_info: OnceLock<StoreInfo>,
        }
        impl AssociatedObject for StoreInfoSnapshot<'_> {
            fn will_apply_mutation(
                &self,
                _mutation: &Mutation,
                _object_id: u64,
                _manager: &ObjectManager,
            ) {
                let mut store_info = self.store.store_info().unwrap();

                // Capture the offset in the cipher stream.
                let mutations_cipher = self.store.mutations_cipher.lock();
                if let Some(cipher) = mutations_cipher.as_ref() {
                    store_info.mutations_cipher_offset = cipher.offset();
                }

                self.store_info.set(store_info).unwrap();
            }
        }

        let store_info_snapshot = StoreInfoSnapshot { store: self, store_info: OnceLock::new() };

        let filesystem = self.filesystem();
        let object_manager = filesystem.object_manager();
        let reservation = object_manager.metadata_reservation();
        let txn_options = Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(reservation),
            txn_guard: Some(txn_guard),
            ..Default::default()
        };

        // The BeginFlush mutation must be within a transaction that has no impact on StoreInfo
        // since we want to get an accurate snapshot of StoreInfo.
        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
        transaction.add_with_object(
            self.store_object_id(),
            Mutation::BeginFlush,
            AssocObj::Borrowed(&store_info_snapshot),
        );
        transaction.commit().await?;

        let mut new_store_info = store_info_snapshot.store_info.into_inner().unwrap();

        // There is a transaction to create objects at the start and then another transaction at the
        // end. Between those two transactions, there are transactions that write to the files.  In
        // the first transaction, objects are created in the graveyard. Upon success, the objects
        // are removed from the graveyard.
        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;

        // Create and write a new layer, compacting existing layers.
        let parent_store = self.parent_store.as_ref().unwrap();
        let handle_options = HandleOptions { skip_journal_checks: true, ..Default::default() };
        // For encrypted stores, the new layer file gets its own data key; a Crypt failure here is
        // surfaced as CryptError so the caller can re-lock and retry.
        let new_object_tree_layer = if let Some(crypt) = self.crypt().as_deref() {
            let object_id = parent_store.get_next_object_id(transaction.txn_guard()).await?;
            let (key, unwrapped_key) =
                match crypt.create_key(object_id.get(), KeyPurpose::Data).await {
                    Ok((key, unwrapped_key)) => (key, unwrapped_key),
                    Err(status) => {
                        log::warn!(
                            status:?;
                            "Failed to create keys while flushing store {}",
                            self.store_object_id(),
                        );
                        return Ok(FlushResult::CryptError(status.into()));
                    }
                };
            ObjectStore::create_object_with_key(
                parent_store,
                &mut transaction,
                object_id,
                handle_options,
                EncryptionKey::Fxfs(key),
                unwrapped_key,
            )
            .await?
        } else {
            ObjectStore::create_object(parent_store, &mut transaction, handle_options, None).await?
        };
        let writer = DirectWriter::new(&new_object_tree_layer, txn_options).await;
        let new_object_tree_layer_object_id = new_object_tree_layer.object_id();
        // The new layer stays in the graveyard until the final transaction commits, so it gets
        // cleaned up if we crash part-way through.
        parent_store.add_to_graveyard(&mut transaction, new_object_tree_layer_object_id);

        transaction.commit().await?;

        // *Do* the actual compaction.
        let (layers_to_keep, old_layers) = tree::flush(
            &self.tree,
            writer,
            (reason == Reason::Journal).then(|| filesystem.journal().get_compaction_yielder()),
        )
        .await
        .context("Failed to flush tree")?;

        // Finalise the compaction.
        let mut new_layers = layers_from_handles([new_object_tree_layer]).await?;
        new_layers.extend(layers_to_keep.iter().map(|l| (*l).clone()));

        new_store_info.layers = Vec::new();
        for layer in &new_layers {
            if let Some(handle) = layer.handle() {
                new_store_info.layers.push(handle.object_id());
            }
        }

        let reservation_update: ReservationUpdate; // Must live longer than end_transaction.
        let mut end_transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    self.parent_store.as_ref().unwrap().store_object_id(),
                    self.store_info_handle_object_id().unwrap(),
                )],
                txn_options,
            )
            .await?;

        parent_store.remove_from_graveyard(&mut end_transaction, new_object_tree_layer_object_id);

        // Move the existing layers we're compacting to the graveyard at the end.
        for layer in &old_layers {
            if let Some(handle) = layer.handle() {
                parent_store.add_to_graveyard(&mut end_transaction, handle.object_id());
            }
        }

        // Any previously persisted encrypted mutations are superseded by this flush, so the old
        // file is graveyarded here and tombstoned below.
        let old_encrypted_mutations_object_id =
            std::mem::replace(&mut new_store_info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
            parent_store.add_to_graveyard(&mut end_transaction, old_encrypted_mutations_object_id);
        }

        // `last_object_id` is updated differently to other members of `StoreInfo`.  We must ensure
        // that those fields match the current in-memory values.  See the lengthy comment in
        // `get_next_object_id` for more information.  `end_transaction` has a lock on the same lock
        // that `get_next_object_id` uses, so there's no danger of the key changing now.
        //
        // This might capture object IDs that might be in transactions not yet committed.  In
        // theory, we could do better than this but it's not worth the effort.
        match &mut new_store_info.last_object_id {
            LastObjectIdInfo::Unencrypted { id } => {
                let LastObjectId::Unencrypted { id: in_memory_value } =
                    &*self.last_object_id.lock()
                else {
                    unreachable!()
                };
                *id = *in_memory_value;
            }
            LastObjectIdInfo::Encrypted { id, key } => {
                let LastObjectId::Encrypted { id: in_memory_value, .. } =
                    &*self.last_object_id.lock()
                else {
                    unreachable!()
                };
                *id = *in_memory_value;
                // The wrapped key is taken from the current in-memory StoreInfo.
                let guard = self.store_info.lock();
                let current_store_info = guard.as_ref().unwrap();
                let LastObjectIdInfo::Encrypted { key: in_memory_value, .. } =
                    &current_store_info.last_object_id
                else {
                    unreachable!()
                };
                *key = in_memory_value.clone();
            }
            LastObjectIdInfo::Low32Bit => {}
        }

        self.write_store_info(&mut end_transaction, &new_store_info).await?;

        let layer_file_sizes = new_layers
            .iter()
            .map(|l| l.handle().map(ReadObjectHandle::get_size).unwrap_or(0))
            .collect::<Vec<u64>>();

        // Recompute the metadata reservation from the new total layer size.
        let total_layer_size = layer_file_sizes.iter().sum();
        reservation_update =
            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));

        end_transaction.add_with_object(
            self.store_object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );

        if self.trace.load(Ordering::Relaxed) {
            info!(
                store_id = self.store_object_id(),
                old_layer_count = old_layers.len(),
                new_layer_count = new_layers.len(),
                total_layer_size,
                new_store_info:?;
                "OS: compacting"
            );
        }

        // Swap the in-memory StoreInfo and layer set in the commit callback, so the in-memory
        // state changes together with the committed transaction.
        end_transaction
            .commit_with_callback(|_| {
                let mut store_info = self.store_info.lock();
                let info = store_info.as_mut().unwrap();
                info.layers = new_store_info.layers;
                info.encrypted_mutations_object_id = new_store_info.encrypted_mutations_object_id;
                info.mutations_cipher_offset = new_store_info.mutations_cipher_offset;
                self.tree.set_layers(new_layers);
            })
            .await?;

        // Now close the layers and purge them.
        for layer in old_layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                parent_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
            parent_store.tombstone_object(old_encrypted_mutations_object_id, txn_options).await?;
        }

        Ok(FlushResult::Ok(layer_file_sizes))
    }
417
    /// Flushes a locked store by serializing its encrypted mutations (read back from the journal)
    /// into a file, creating the file first if one doesn't exist yet.  This allows the journal to
    /// be compacted without unlocking the store.
    async fn flush_locked(&self, txn_guard: &TxnGuard<'_>) -> Result<(), Error> {
        let filesystem = self.filesystem();
        let object_manager = filesystem.object_manager();
        let reservation = object_manager.metadata_reservation();
        let txn_options = Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(reservation),
            txn_guard: Some(txn_guard),
            ..Default::default()
        };

        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
        transaction.add(self.store_object_id(), Mutation::BeginFlush);
        transaction.commit().await?;

        // Read the current StoreInfo from disk; it will be updated with the new encrypted
        // mutations object ID and written back below.
        let mut new_store_info = self.load_store_info().await?;

        // There is a transaction to create objects at the start and then another transaction at the
        // end. Between those two transactions, there are transactions that write to the files.  In
        // the first transaction, objects are created in the graveyard. Upon success, the objects
        // are removed from the graveyard.
        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;

        let reservation_update: ReservationUpdate; // Must live longer than end_transaction.
        let handle; // Must live longer than end_transaction.
        let mut end_transaction;

        // We need to either write our encrypted mutations to a new file, or append them to an
        // existing one.
        let parent_store = self.parent_store.as_ref().unwrap();
        handle = if new_store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
            let handle = ObjectStore::create_object(
                parent_store,
                &mut transaction,
                HandleOptions { skip_journal_checks: true, ..Default::default() },
                None,
            )
            .await?;
            let oid = handle.object_id();
            end_transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(parent_store.store_object_id(), oid),
                        LockKey::object(
                            parent_store.store_object_id(),
                            self.store_info_handle_object_id().unwrap(),
                        ),
                    ],
                    txn_options,
                )
                .await?;
            new_store_info.encrypted_mutations_object_id = oid;
            // The new file stays in the graveyard until the final transaction commits, so it is
            // cleaned up if we crash part-way through.
            parent_store.add_to_graveyard(&mut transaction, oid);
            parent_store.remove_from_graveyard(&mut end_transaction, oid);
            handle
        } else {
            end_transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(
                            parent_store.store_object_id(),
                            new_store_info.encrypted_mutations_object_id,
                        ),
                        LockKey::object(
                            parent_store.store_object_id(),
                            self.store_info_handle_object_id().unwrap(),
                        ),
                    ],
                    txn_options,
                )
                .await?;
            ObjectStore::open_object(
                parent_store,
                new_store_info.encrypted_mutations_object_id,
                HandleOptions { skip_journal_checks: true, ..Default::default() },
                None,
            )
            .await?
        };
        transaction.commit().await?;

        // Append the encrypted mutations, which need to be read from the journal.
        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
        let journaled = filesystem
            .journal()
            .read_transactions_for_object(self.store_object_id)
            .await
            .context("Failed to read encrypted mutations from journal")?;
        let mut buffer = handle.allocate_buffer(MAX_ENCRYPTED_MUTATIONS_SIZE).await;
        let mut cursor = std::io::Cursor::new(buffer.as_mut_slice());
        EncryptedMutations::from_replayed_mutations(self.store_object_id, journaled)
            .serialize_with_version(&mut cursor)?;
        let len = cursor.position() as usize;
        // Append at the end of the (possibly pre-existing) encrypted mutations file.
        handle.txn_write(&mut end_transaction, handle.get_size(), buffer.subslice(..len)).await?;

        self.write_store_info(&mut end_transaction, &new_store_info).await?;

        // Recompute the metadata reservation from the total layer size plus the grown encrypted
        // mutations file.
        let mut total_layer_size = 0;
        for &oid in &new_store_info.layers {
            total_layer_size += parent_store.get_file_size(oid).await?;
        }
        total_layer_size +=
            layer_size_from_encrypted_mutations_size(handle.get_size() + len as u64);

        reservation_update =
            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));

        end_transaction.add_with_object(
            self.store_object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );

        end_transaction.commit().await?;

        Ok(())
    }
539}
540
541#[cfg(test)]
542mod tests {
543    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, JournalingObject, SyncOptions};
544    use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle};
545    use crate::object_store::directory::Directory;
546    use crate::object_store::transaction::{Options, lock_keys};
547    use crate::object_store::volume::root_volume;
548    use crate::object_store::{
549        HandleOptions, LockKey, NO_OWNER, NewChildStoreOptions, ObjectStore, StoreOptions,
550        layer_size_from_encrypted_mutations_size, tree,
551    };
552    use fxfs_insecure_crypto::new_insecure_crypt;
553    use std::sync::Arc;
554    use storage_device::DeviceHolder;
555    use storage_device::fake_device::FakeDevice;
556
    /// Creates an encrypted volume, writes files until the mutations (metadata) key rolls, then
    /// reopens the filesystem and verifies that replay succeeds and all files are present.  When
    /// `flush_before_unlock` is set, the store is flushed while still locked before being
    /// unlocked, exercising the encrypted-mutations-file path (`flush_locked`).
    async fn run_key_roll_test(flush_before_unlock: bool) {
        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let store_id = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            root_volume
                .new_volume(
                    "test",
                    NewChildStoreOptions {
                        options: StoreOptions {
                            crypt: Some(Arc::new(new_insecure_crypt())),
                            ..StoreOptions::default()
                        },
                        ..NewChildStoreOptions::default()
                    },
                )
                .await
                .expect("new_volume failed")
                .store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // Reopen with a small key-roll threshold so the roll is triggered with a modest amount of
        // writing.
        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        let (first_filename, last_filename) = {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");

            // Keep writing until we notice the key has rolled.
            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            let mut last_mutations_cipher_offset = 0;
            let mut i = 0;
            // Filenames are 200-char padded so each transaction contributes a sizeable number of
            // bytes to the mutations cipher stream.
            let first_filename = format!("{:<200}", i);
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(store_id, root_dir.object_id())],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_dir
                    .create_child_file(&mut transaction, &format!("{:<200}", i))
                    .await
                    .expect("create_child_file failed");
                i += 1;
                transaction.commit().await.expect("commit failed");
                // A decrease in the cipher offset means the key has rolled (the offset restarts).
                let cipher_offset = store.mutations_cipher.lock().as_ref().unwrap().offset();
                if cipher_offset < last_mutations_cipher_offset {
                    break;
                }
                last_mutations_cipher_offset = cipher_offset;
            }

            // Sync now, so that we can be fairly certain that the next transaction *won't* trigger
            // a store flush (so we'll still have something to flush when we reopen the filesystem).
            fs.sync(SyncOptions::default()).await.expect("sync failed");

            // Write one more file to ensure the cipher has a non-zero offset.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(store_id, root_dir.object_id())],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let last_filename = format!("{:<200}", i);
            root_dir
                .create_child_file(&mut transaction, &last_filename)
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            (first_filename, last_filename)
        };

        fs.close().await.expect("close failed");

        // Reopen and make sure replay succeeds.
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        if flush_before_unlock {
            // Flush before unlocking the store which will see that the encrypted mutations get
            // written to a file.
            fs.object_manager().flush().await.expect("flush failed");
        }

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(new_insecure_crypt())).await.expect("unlock failed");

            // The key should get rolled when we unlock.
            assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);

            // Both the first and the last file written must survive replay.
            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            root_dir
                .lookup(&first_filename)
                .await
                .expect("Lookup failed")
                .expect("First created file wasn't present");
            root_dir
                .lookup(&last_filename)
                .await
                .expect("Lookup failed")
                .expect("Last created file wasn't present");
        }
    }
683
    /// Key-roll round trip where the store is unlocked directly after reopen (encrypted mutations
    /// replayed without first being flushed to a file).
    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll() {
        run_key_roll_test(/* flush_before_unlock: */ false).await;
    }
688
    /// Key-roll round trip where the locked store is flushed (encrypted mutations written to a
    /// file) before it is unlocked.
    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll_with_flush_before_unlock() {
        run_key_roll_test(/* flush_before_unlock: */ true).await;
    }
693
694    #[fuchsia::test]
695    async fn test_flush_when_locked() {
696        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
697        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
698        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
699        let crypt = Arc::new(new_insecure_crypt());
700        let store = root_volume
701            .new_volume(
702                "test",
703                NewChildStoreOptions {
704                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
705                    ..NewChildStoreOptions::default()
706                },
707            )
708            .await
709            .expect("new_volume failed");
710        let root_dir =
711            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
712        let mut transaction = fs
713            .clone()
714            .new_transaction(
715                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
716                Options::default(),
717            )
718            .await
719            .expect("new_transaction failed");
720        let foo = root_dir
721            .create_child_file(&mut transaction, "foo")
722            .await
723            .expect("create_child_file failed");
724        transaction.commit().await.expect("commit failed");
725
726        // When the volume is first created it will include a new mutations key but we want to test
727        // what happens when the encrypted mutations file doesn't contain a new mutations key, so we
728        // flush here.
729        store.flush().await.expect("flush failed");
730
731        let mut transaction = fs
732            .clone()
733            .new_transaction(
734                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
735                Options::default(),
736            )
737            .await
738            .expect("new_transaction failed");
739        let bar = root_dir
740            .create_child_file(&mut transaction, "bar")
741            .await
742            .expect("create_child_file failed");
743        transaction.commit().await.expect("commit failed");
744
745        store.lock().await.expect("lock failed");
746
747        // Flushing the store whilst locked should create an encrypted mutations file.
748        store.flush().await.expect("flush failed");
749
750        // Check the reservation.
751        let info = store.load_store_info().await.unwrap();
752        let parent_store = store.parent_store().unwrap();
753        let mut total_layer_size = 0;
754        for &oid in &info.layers {
755            total_layer_size +=
756                parent_store.get_file_size(oid).await.expect("get_file_size failed");
757        }
758        assert_ne!(info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
759        total_layer_size += layer_size_from_encrypted_mutations_size(
760            parent_store
761                .get_file_size(info.encrypted_mutations_object_id)
762                .await
763                .expect("get_file_size failed"),
764        );
765        assert_eq!(
766            fs.object_manager().reservation(store.store_object_id()),
767            Some(tree::reservation_amount_from_layer_size(total_layer_size))
768        );
769
770        // Unlocking the store should replay that encrypted mutations file.
771        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");
772
773        ObjectStore::open_object(&store, foo.object_id(), HandleOptions::default(), None)
774            .await
775            .expect("open_object failed");
776
777        ObjectStore::open_object(&store, bar.object_id(), HandleOptions::default(), None)
778            .await
779            .expect("open_object failed");
780
781        fs.close().await.expect("close failed");
782    }
783}
784
785impl tree::MajorCompactable<ObjectKey, ObjectValue> for LSMTree<ObjectKey, ObjectValue> {
786    async fn major_iter(
787        iter: impl LayerIterator<ObjectKey, ObjectValue>,
788    ) -> Result<impl LayerIterator<ObjectKey, ObjectValue>, Error> {
789        iter.filter(|item: ItemRef<'_, _, _>| match item {
790            // Object Tombstone.
791            ItemRef { value: ObjectValue::None, .. } => false,
792            // Deleted extent.
793            ItemRef { value: ObjectValue::Extent(ExtentValue::None), .. } => false,
794            _ => true,
795        })
796        .await
797    }
798}