fxfs/object_store/
flush.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// This module is responsible for flushing (a.k.a. compacting) the object store trees.
6
7use crate::filesystem::TxnGuard;
8use crate::log::*;
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::lsm_tree::{layers_from_handles, LSMTree};
11use crate::object_handle::{ObjectHandle, ReadObjectHandle, INVALID_OBJECT_ID};
12use crate::object_store::extent_record::ExtentValue;
13use crate::object_store::object_manager::{ObjectManager, ReservationUpdate};
14use crate::object_store::object_record::{ObjectKey, ObjectValue};
15use crate::object_store::transaction::{lock_keys, AssociatedObject, LockKey, Mutation};
16use crate::object_store::{
17    layer_size_from_encrypted_mutations_size, tree, AssocObj, DirectWriter, EncryptedMutations,
18    HandleOptions, LockState, ObjectStore, Options, StoreInfo, Transaction,
19    MAX_ENCRYPTED_MUTATIONS_SIZE,
20};
21use crate::serialized_types::{Version, VersionedLatest, LATEST_VERSION};
22use anyhow::{bail, Context, Error};
23use fxfs_crypto::KeyPurpose;
24use once_cell::sync::OnceCell;
25use std::sync::atomic::Ordering;
26
/// Why a flush was requested.  `try_flush_with_reason` uses this to decide whether the flush can
/// be skipped, and `flush_with_reason` uses it to decide whether a Crypt failure is retryable.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Reason {
    /// Journal memory or space pressure.
    Journal,

    /// After unlock and replay of encrypted mutations.
    Unlock,
}
35
/// If flushing an unlocked store fails due to a Crypt error, we want to re-lock the store and
/// flush it again, so we can make progress on flushing (and therefore journal compaction) without
/// depending on Crypt.  This is necessary because Crypt is an external component that might crash
/// or become unresponsive.
///
/// `CryptError` is deliberately a success-arm variant rather than an `Err`, so that `?` still
/// propagates genuinely fatal errors while Crypt failures can be intercepted and retried by
/// `flush_with_reason`.
#[derive(Debug)]
enum FlushResult<T> {
    Ok(T),
    CryptError(Error),
}
45
46#[fxfs_trace::trace]
47impl ObjectStore {
48    #[trace("store_object_id" => self.store_object_id)]
49    pub async fn flush_with_reason(&self, reason: Reason) -> Result<Version, Error> {
50        // Loop to deal with Crypt errors.  If flushing fails due to a Crypt error, we re-lock the
51        // store and try again.  However, another task might racily unlock the store after we lock
52        // but before we flush (since we dropped the lock taken in `try_flush_with_reason`).
53        // We set a limit on the number of times we'll permit a retry, to avoid looping forever if
54        // the race condition is continuously hit.  In practice it is very unlikely to ever occur
55        // at all, since that would require something else busily unlocking the volume and the Crypt
56        // instance dying repeatedly.
57        const MAX_RETRIES: usize = 10;
58        let mut retries = 0;
59        loop {
60            match self.try_flush_with_reason(reason).await? {
61                FlushResult::Ok(version) => return Ok(version),
62                FlushResult::CryptError(error) => {
63                    if reason == Reason::Unlock || retries >= MAX_RETRIES {
64                        // If flushing due to unlock fails due to Crypt issues, just fail, so we
65                        // don't return to `unlock` with a locked store.
66                        return Err(error);
67                    }
68                    log::warn!(
69                        error:?;
70                        "Flushing failed for store {}, re-locking and trying again.",
71                        self.store_object_id()
72                    );
73                    let owner = self.lock_state.lock().owner();
74                    if let Some(owner) = owner {
75                        owner
76                            .force_lock(&self)
77                            .await
78                            .context("Failed to re-lock store during flush")?;
79                    } else {
80                        bail!("No store owner was registered!");
81                    }
82                }
83            }
84            retries += 1;
85        }
86    }
87
88    async fn try_flush_with_reason(&self, reason: Reason) -> Result<FlushResult<Version>, Error> {
89        if self.parent_store.is_none() {
90            // Early exit, but still return the earliest version used by a struct in the tree
91            return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
92        }
93        let filesystem = self.filesystem();
94        let object_manager = filesystem.object_manager();
95
96        // We must take the transaction guard *before* we take the flush lock.
97        let txn_guard = filesystem.clone().txn_guard().await;
98        let keys = lock_keys![LockKey::flush(self.store_object_id())];
99        let _guard = Some(filesystem.lock_manager().write_lock(keys).await);
100
101        match reason {
102            Reason::Unlock => {
103                // If we're unlocking, only flush if there are encrypted mutations currently stored
104                // in a file.  We don't worry if they're in memory because a flush should get
105                // triggered when the journal gets full.
106                // Safe to unwrap store_info here because this was invoked from ObjectStore::unlock,
107                // so store_info is already accessible.
108                if self.store_info().unwrap().encrypted_mutations_object_id == INVALID_OBJECT_ID {
109                    // TODO(https://fxbug.dev/42179266): Add earliest_version support for encrypted
110                    // mutations.
111                    // Early exit, but still return the earliest version used by a struct in the
112                    // tree.
113                    return Ok(FlushResult::Ok(self.tree.get_earliest_version()));
114                }
115            }
116            Reason::Journal => {
117                // We flush if we have something to flush *or* the on-disk version of data is not
118                // the latest.
119                let earliest_version = self.tree.get_earliest_version();
120                if !object_manager.needs_flush(self.store_object_id)
121                    && earliest_version == LATEST_VERSION
122                {
123                    // Early exit, but still return the earliest version used by a struct in the
124                    // tree.
125                    return Ok(FlushResult::Ok(earliest_version));
126                }
127            }
128        }
129
130        let trace = self.trace.load(Ordering::Relaxed);
131        if trace {
132            info!(store_id = self.store_object_id(); "OS: begin flush");
133        }
134
135        let layer_file_sizes =
136            if matches!(&*self.lock_state.lock(), LockState::Locked) {
137                self.flush_locked(&txn_guard).await.with_context(|| {
138                    format!("Failed to flush object store {}", self.store_object_id)
139                })?;
140                None
141            } else {
142                match self.flush_unlocked(&txn_guard).await.with_context(|| {
143                    format!("Failed to flush object store {}", self.store_object_id)
144                })? {
145                    FlushResult::Ok(layer_file_sizes) => Some(layer_file_sizes),
146                    FlushResult::CryptError(error) => return Ok(FlushResult::CryptError(error)),
147                }
148            };
149
150        if trace {
151            info!(store_id = self.store_object_id(); "OS: end flush");
152        }
153        if let Some(callback) = &*self.flush_callback.lock() {
154            callback(self);
155        }
156
157        let mut counters = self.counters.lock();
158        counters.num_flushes += 1;
159        counters.last_flush_time = Some(std::time::SystemTime::now());
160        if let Some(layer_file_sizes) = layer_file_sizes {
161            counters.persistent_layer_file_sizes = layer_file_sizes;
162        }
163        // Return the earliest version used by a struct in the tree
164        Ok(FlushResult::Ok(self.tree.get_earliest_version()))
165    }
166
167    // Flushes an unlocked store. Returns the layer file sizes.
168    async fn flush_unlocked(
169        &self,
170        txn_guard: &TxnGuard<'_>,
171    ) -> Result<FlushResult<Vec<u64>>, Error> {
172        let roll_mutations_key = self
173            .mutations_cipher
174            .lock()
175            .as_ref()
176            .map(|cipher| {
177                cipher.offset() >= self.filesystem().options().roll_metadata_key_byte_count
178            })
179            .unwrap_or(false);
180        if roll_mutations_key {
181            if let Err(error) = self.roll_mutations_key(self.crypt().unwrap().as_ref()).await {
182                log::warn!(
183                    error:?; "Failed to roll mutations key for store {}", self.store_object_id());
184                return Ok(FlushResult::CryptError(error));
185            }
186        }
187
188        struct StoreInfoSnapshot<'a> {
189            store: &'a ObjectStore,
190            store_info: OnceCell<StoreInfo>,
191        }
192        impl AssociatedObject for StoreInfoSnapshot<'_> {
193            fn will_apply_mutation(
194                &self,
195                _mutation: &Mutation,
196                _object_id: u64,
197                _manager: &ObjectManager,
198            ) {
199                let mut store_info = self.store.store_info().unwrap();
200
201                // Capture the offset in the cipher stream.
202                let mutations_cipher = self.store.mutations_cipher.lock();
203                if let Some(cipher) = mutations_cipher.as_ref() {
204                    store_info.mutations_cipher_offset = cipher.offset();
205                }
206
207                // This will capture object IDs that might be in transactions not yet committed.  In
208                // theory, we could do better than this but it's not worth the effort.
209                store_info.last_object_id = self.store.last_object_id.lock().id;
210
211                self.store_info.set(store_info).unwrap();
212            }
213        }
214
215        let store_info_snapshot = StoreInfoSnapshot { store: self, store_info: OnceCell::new() };
216
217        let filesystem = self.filesystem();
218        let object_manager = filesystem.object_manager();
219        let reservation = object_manager.metadata_reservation();
220        let txn_options = Options {
221            skip_journal_checks: true,
222            borrow_metadata_space: true,
223            allocator_reservation: Some(reservation),
224            txn_guard: Some(txn_guard),
225            ..Default::default()
226        };
227
228        // The BeginFlush mutation must be within a transaction that has no impact on StoreInfo
229        // since we want to get an accurate snapshot of StoreInfo.
230        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
231        transaction.add_with_object(
232            self.store_object_id(),
233            Mutation::BeginFlush,
234            AssocObj::Borrowed(&store_info_snapshot),
235        );
236        transaction.commit().await?;
237
238        let mut new_store_info = store_info_snapshot.store_info.into_inner().unwrap();
239
240        // There is a transaction to create objects at the start and then another transaction at the
241        // end. Between those two transactions, there are transactions that write to the files.  In
242        // the first transaction, objects are created in the graveyard. Upon success, the objects
243        // are removed from the graveyard.
244        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
245
246        let reservation_update: ReservationUpdate; // Must live longer than end_transaction.
247        let mut end_transaction = filesystem
248            .clone()
249            .new_transaction(
250                lock_keys![LockKey::object(
251                    self.parent_store.as_ref().unwrap().store_object_id(),
252                    self.store_info_handle_object_id().unwrap(),
253                )],
254                txn_options,
255            )
256            .await?;
257
258        // Create and write a new layer, compacting existing layers.
259        let parent_store = self.parent_store.as_ref().unwrap();
260        let handle_options = HandleOptions { skip_journal_checks: true, ..Default::default() };
261        let new_object_tree_layer = if let Some(crypt) = self.crypt().as_deref() {
262            let object_id = parent_store.get_next_object_id(transaction.txn_guard()).await?;
263            let (key, unwrapped_key) = match crypt.create_key(object_id, KeyPurpose::Data).await {
264                Ok((key, unwrapped_key)) => (key, unwrapped_key),
265                Err(status) => {
266                    log::warn!(
267                        status:?;
268                        "Failed to create keys while flushing store {}",
269                        self.store_object_id(),
270                    );
271                    return Ok(FlushResult::CryptError(status.into()));
272                }
273            };
274            ObjectStore::create_object_with_key(
275                parent_store,
276                &mut transaction,
277                object_id,
278                handle_options,
279                key,
280                unwrapped_key,
281            )
282            .await?
283        } else {
284            ObjectStore::create_object(parent_store, &mut transaction, handle_options, None).await?
285        };
286        let writer = DirectWriter::new(&new_object_tree_layer, txn_options).await;
287        let new_object_tree_layer_object_id = new_object_tree_layer.object_id();
288        parent_store.add_to_graveyard(&mut transaction, new_object_tree_layer_object_id);
289        parent_store.remove_from_graveyard(&mut end_transaction, new_object_tree_layer_object_id);
290
291        transaction.commit().await?;
292        let (layers_to_keep, old_layers) =
293            tree::flush(&self.tree, writer).await.context("Failed to flush tree")?;
294
295        let mut new_layers = layers_from_handles([new_object_tree_layer]).await?;
296        new_layers.extend(layers_to_keep.iter().map(|l| (*l).clone()));
297
298        new_store_info.layers = Vec::new();
299        for layer in &new_layers {
300            if let Some(handle) = layer.handle() {
301                new_store_info.layers.push(handle.object_id());
302            }
303        }
304
305        // Move the existing layers we're compacting to the graveyard at the end.
306        for layer in &old_layers {
307            if let Some(handle) = layer.handle() {
308                parent_store.add_to_graveyard(&mut end_transaction, handle.object_id());
309            }
310        }
311
312        let old_encrypted_mutations_object_id =
313            std::mem::replace(&mut new_store_info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
314        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
315            parent_store.add_to_graveyard(&mut end_transaction, old_encrypted_mutations_object_id);
316        }
317
318        self.write_store_info(&mut end_transaction, &new_store_info).await?;
319
320        let layer_file_sizes = new_layers
321            .iter()
322            .map(|l| l.handle().map(ReadObjectHandle::get_size).unwrap_or(0))
323            .collect::<Vec<u64>>();
324
325        let total_layer_size = layer_file_sizes.iter().sum();
326        reservation_update =
327            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));
328
329        end_transaction.add_with_object(
330            self.store_object_id(),
331            Mutation::EndFlush,
332            AssocObj::Borrowed(&reservation_update),
333        );
334
335        if self.trace.load(Ordering::Relaxed) {
336            info!(
337                store_id = self.store_object_id(),
338                old_layer_count = old_layers.len(),
339                new_layer_count = new_layers.len(),
340                total_layer_size,
341                new_store_info:?;
342                "OS: compacting"
343            );
344        }
345
346        end_transaction
347            .commit_with_callback(|_| {
348                let mut store_info = self.store_info.lock();
349                let info = store_info.as_mut().unwrap();
350                info.layers = new_store_info.layers;
351                info.encrypted_mutations_object_id = new_store_info.encrypted_mutations_object_id;
352                info.mutations_cipher_offset = new_store_info.mutations_cipher_offset;
353                self.tree.set_layers(new_layers);
354            })
355            .await?;
356
357        // Now close the layers and purge them.
358        for layer in old_layers {
359            let object_id = layer.handle().map(|h| h.object_id());
360            layer.close_layer().await;
361            if let Some(object_id) = object_id {
362                parent_store.tombstone_object(object_id, txn_options).await?;
363            }
364        }
365
366        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
367            parent_store.tombstone_object(old_encrypted_mutations_object_id, txn_options).await?;
368        }
369
370        Ok(FlushResult::Ok(layer_file_sizes))
371    }
372
373    // Flushes a locked store.
374    async fn flush_locked(&self, txn_guard: &TxnGuard<'_>) -> Result<(), Error> {
375        let filesystem = self.filesystem();
376        let object_manager = filesystem.object_manager();
377        let reservation = object_manager.metadata_reservation();
378        let txn_options = Options {
379            skip_journal_checks: true,
380            borrow_metadata_space: true,
381            allocator_reservation: Some(reservation),
382            txn_guard: Some(txn_guard),
383            ..Default::default()
384        };
385
386        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
387        transaction.add(self.store_object_id(), Mutation::BeginFlush);
388        transaction.commit().await?;
389
390        let mut new_store_info = self.load_store_info().await?;
391
392        // There is a transaction to create objects at the start and then another transaction at the
393        // end. Between those two transactions, there are transactions that write to the files.  In
394        // the first transaction, objects are created in the graveyard. Upon success, the objects
395        // are removed from the graveyard.
396        let mut transaction = filesystem.clone().new_transaction(lock_keys![], txn_options).await?;
397
398        let reservation_update: ReservationUpdate; // Must live longer than end_transaction.
399        let handle; // Must live longer than end_transaction.
400        let mut end_transaction;
401
402        // We need to either write our encrypted mutations to a new file, or append them to an
403        // existing one.
404        let parent_store = self.parent_store.as_ref().unwrap();
405        handle = if new_store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
406            let handle = ObjectStore::create_object(
407                parent_store,
408                &mut transaction,
409                HandleOptions { skip_journal_checks: true, ..Default::default() },
410                None,
411            )
412            .await?;
413            let oid = handle.object_id();
414            end_transaction = filesystem
415                .clone()
416                .new_transaction(
417                    lock_keys![
418                        LockKey::object(parent_store.store_object_id(), oid),
419                        LockKey::object(
420                            parent_store.store_object_id(),
421                            self.store_info_handle_object_id().unwrap(),
422                        ),
423                    ],
424                    txn_options,
425                )
426                .await?;
427            new_store_info.encrypted_mutations_object_id = oid;
428            parent_store.add_to_graveyard(&mut transaction, oid);
429            parent_store.remove_from_graveyard(&mut end_transaction, oid);
430            handle
431        } else {
432            end_transaction = filesystem
433                .clone()
434                .new_transaction(
435                    lock_keys![
436                        LockKey::object(
437                            parent_store.store_object_id(),
438                            new_store_info.encrypted_mutations_object_id,
439                        ),
440                        LockKey::object(
441                            parent_store.store_object_id(),
442                            self.store_info_handle_object_id().unwrap(),
443                        ),
444                    ],
445                    txn_options,
446                )
447                .await?;
448            ObjectStore::open_object(
449                parent_store,
450                new_store_info.encrypted_mutations_object_id,
451                HandleOptions { skip_journal_checks: true, ..Default::default() },
452                None,
453            )
454            .await?
455        };
456        transaction.commit().await?;
457
458        // Append the encrypted mutations, which need to be read from the journal.
459        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
460        let journaled = filesystem
461            .journal()
462            .read_transactions_for_object(self.store_object_id)
463            .await
464            .context("Failed to read encrypted mutations from journal")?;
465        let mut buffer = handle.allocate_buffer(MAX_ENCRYPTED_MUTATIONS_SIZE).await;
466        let mut cursor = std::io::Cursor::new(buffer.as_mut_slice());
467        EncryptedMutations::from_replayed_mutations(self.store_object_id, journaled)
468            .serialize_with_version(&mut cursor)?;
469        let len = cursor.position() as usize;
470        handle.txn_write(&mut end_transaction, handle.get_size(), buffer.subslice(..len)).await?;
471
472        self.write_store_info(&mut end_transaction, &new_store_info).await?;
473
474        let mut total_layer_size = 0;
475        for &oid in &new_store_info.layers {
476            total_layer_size += parent_store.get_file_size(oid).await?;
477        }
478        total_layer_size +=
479            layer_size_from_encrypted_mutations_size(handle.get_size() + len as u64);
480
481        reservation_update =
482            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));
483
484        end_transaction.add_with_object(
485            self.store_object_id(),
486            Mutation::EndFlush,
487            AssocObj::Borrowed(&reservation_update),
488        );
489
490        end_transaction.commit().await?;
491
492        Ok(())
493    }
494
495    async fn write_store_info<'a>(
496        &'a self,
497        transaction: &mut Transaction<'a>,
498        new_store_info: &StoreInfo,
499    ) -> Result<(), Error> {
500        let mut serialized_info = Vec::new();
501        new_store_info.serialize_with_version(&mut serialized_info)?;
502        let mut buf = self.device.allocate_buffer(serialized_info.len()).await;
503        buf.as_mut_slice().copy_from_slice(&serialized_info[..]);
504
505        self.store_info_handle.get().unwrap().txn_write(transaction, 0u64, buf.as_ref()).await
506    }
507}
508
#[cfg(test)]
mod tests {
    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, JournalingObject, SyncOptions};
    use crate::object_handle::{ObjectHandle, INVALID_OBJECT_ID};
    use crate::object_store::directory::Directory;
    use crate::object_store::transaction::{lock_keys, Options};
    use crate::object_store::volume::root_volume;
    use crate::object_store::{
        layer_size_from_encrypted_mutations_size, tree, HandleOptions, LockKey, ObjectStore,
        NO_OWNER,
    };
    use fxfs_insecure_crypto::InsecureCrypt;
    use std::sync::Arc;
    use storage_device::fake_device::FakeDevice;
    use storage_device::DeviceHolder;

    /// Exercises mutations-key rolling: writes files until the mutations cipher offset wraps
    /// (indicating the key rolled), reopens the filesystem, optionally flushes while the store is
    /// still locked, then unlocks and verifies the first and last created files are present.
    async fn run_key_roll_test(flush_before_unlock: bool) {
        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let store_id = {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
            root_volume
                .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
                .await
                .expect("new_volume failed")
                .store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // Use a small roll threshold so the key rolls after a modest amount of writing.
        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        let (first_filename, last_filename) = {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");

            // Keep writing until we notice the key has rolled.
            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            let mut last_mutations_cipher_offset = 0;
            let mut i = 0;
            let first_filename = format!("{:<200}", i);
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(store_id, root_dir.object_id())],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_dir
                    .create_child_file(&mut transaction, &format!("{:<200}", i))
                    .await
                    .expect("create_child_file failed");
                i += 1;
                transaction.commit().await.expect("commit failed");
                // The cipher offset only ever grows for a given key, so a decrease means the key
                // was rolled (offset reset).
                let cipher_offset = store.mutations_cipher.lock().as_ref().unwrap().offset();
                if cipher_offset < last_mutations_cipher_offset {
                    break;
                }
                last_mutations_cipher_offset = cipher_offset;
            }

            // Sync now, so that we can be fairly certain that the next transaction *won't* trigger
            // a store flush (so we'll still have something to flush when we reopen the filesystem).
            fs.sync(SyncOptions::default()).await.expect("sync failed");

            // Write one more file to ensure the cipher has a non-zero offset.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(store_id, root_dir.object_id())],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let last_filename = format!("{:<200}", i);
            root_dir
                .create_child_file(&mut transaction, &last_filename)
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            (first_filename, last_filename)
        };

        fs.close().await.expect("close failed");

        // Reopen and make sure replay succeeds.
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystemBuilder::new()
            .roll_metadata_key_byte_count(512 * 1024)
            .open(device)
            .await
            .expect("open failed");

        if flush_before_unlock {
            // Flush before unlocking the store which will see that the encrypted mutations get
            // written to a file.
            fs.object_manager().flush().await.expect("flush failed");
        }

        {
            let store = fs.object_manager().store(store_id).expect("store not found");
            store.unlock(NO_OWNER, Arc::new(InsecureCrypt::new())).await.expect("unlock failed");

            // The key should get rolled when we unlock.
            assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);

            // Both the earliest and latest writes must survive replay of the (possibly
            // file-backed) encrypted mutations.
            let root_dir = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");
            root_dir
                .lookup(&first_filename)
                .await
                .expect("Lookup failed")
                .expect("First created file wasn't present");
            root_dir
                .lookup(&last_filename)
                .await
                .expect("Lookup failed")
                .expect("Last created file wasn't present");
        }
    }

    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll() {
        run_key_roll_test(/* flush_before_unlock: */ false).await;
    }

    #[fuchsia::test(threads = 10)]
    async fn test_metadata_key_roll_with_flush_before_unlock() {
        run_key_roll_test(/* flush_before_unlock: */ true).await;
    }

    /// Verifies that flushing a locked store writes an encrypted-mutations file, that the
    /// metadata reservation accounts for it, and that unlocking replays it correctly.
    #[fuchsia::test]
    async fn test_flush_when_locked() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
        let crypt = Arc::new(InsecureCrypt::new());
        let store = root_volume
            .new_volume("test", NO_OWNER, Some(crypt.clone()))
            .await
            .expect("new_volume failed");
        let root_dir =
            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let foo = root_dir
            .create_child_file(&mut transaction, "foo")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // When the volume is first created it will include a new mutations key but we want to test
        // what happens when the encrypted mutations file doesn't contain a new mutations key, so we
        // flush here.
        store.flush().await.expect("flush failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let bar = root_dir
            .create_child_file(&mut transaction, "bar")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        store.lock().await.expect("lock failed");

        // Flushing the store whilst locked should create an encrypted mutations file.
        store.flush().await.expect("flush failed");

        // Check the reservation.  It should match what flush_locked computes: layer files plus the
        // encrypted mutations file.
        let info = store.load_store_info().await.unwrap();
        let parent_store = store.parent_store().unwrap();
        let mut total_layer_size = 0;
        for &oid in &info.layers {
            total_layer_size +=
                parent_store.get_file_size(oid).await.expect("get_file_size failed");
        }
        assert_ne!(info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
        total_layer_size += layer_size_from_encrypted_mutations_size(
            parent_store
                .get_file_size(info.encrypted_mutations_object_id)
                .await
                .expect("get_file_size failed"),
        );
        assert_eq!(
            fs.object_manager().reservation(store.store_object_id()),
            Some(tree::reservation_amount_from_layer_size(total_layer_size))
        );

        // Unlocking the store should replay that encrypted mutations file.
        store.unlock(NO_OWNER, crypt).await.expect("unlock failed");

        // Both files — one written before and one after the first flush — must be readable.
        ObjectStore::open_object(&store, foo.object_id(), HandleOptions::default(), None)
            .await
            .expect("open_object failed");

        ObjectStore::open_object(&store, bar.object_id(), HandleOptions::default(), None)
            .await
            .expect("open_object failed");

        fs.close().await.expect("close failed");
    }
}
737
738impl tree::MajorCompactable<ObjectKey, ObjectValue> for LSMTree<ObjectKey, ObjectValue> {
739    async fn major_iter(
740        iter: impl LayerIterator<ObjectKey, ObjectValue>,
741    ) -> Result<impl LayerIterator<ObjectKey, ObjectValue>, Error> {
742        iter.filter(|item: ItemRef<'_, _, _>| match item {
743            // Object Tombstone.
744            ItemRef { value: ObjectValue::None, .. } => false,
745            // Deleted extent.
746            ItemRef { value: ObjectValue::Extent(ExtentValue::None), .. } => false,
747            _ => true,
748        })
749        .await
750    }
751}