Skip to main content

fxfs/object_store/
flush.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// This module is responsible for flushing (a.k.a. compacting) the object store trees.
6
7use crate::errors::FxfsError;
8use crate::log::*;
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::lsm_tree::{LSMTree, layers_from_handles};
11use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
12use crate::object_store::extent_record::ExtentValue;
13use crate::object_store::object_manager::{ObjectManager, ReservationUpdate};
14use crate::object_store::object_record::{ObjectKey, ObjectValue};
15use crate::object_store::transaction::{AssociatedObject, LockKey, Mutation, lock_keys};
16use crate::object_store::{
17    AssocObj, DirectWriter, EncryptedMutations, HandleOptions, LastObjectId, LastObjectIdInfo,
18    LockState, MAX_ENCRYPTED_MUTATIONS_SIZE, ObjectStore, Options, ReservedId, StoreInfo,
19    layer_size_from_encrypted_mutations_size, tree,
20};
21use crate::serialized_types::{LATEST_VERSION, Version, VersionedLatest};
22use anyhow::{Context, Error, anyhow};
23use std::sync::OnceLock;
24use std::sync::atomic::Ordering;
25
26#[derive(Copy, Clone, Debug, PartialEq, Eq)]
27pub enum Reason {
28    /// Journal memory or space pressure.
29    Journal,
30
31    /// After unlock and replay of encrypted mutations.
32    Unlock,
33}
34
35#[fxfs_trace::trace]
36impl ObjectStore {
37    #[trace("store_object_id" => self.store_object_id)]
38    pub async fn flush_with_reason(&self, reason: Reason) -> Result<Version, Error> {
39        if self.parent_store.is_none() {
40            // Early exit, but still return the earliest version used by a struct in the tree
41            return Ok(self.tree.get_earliest_version());
42        }
43        let filesystem = self.filesystem();
44        let object_manager = filesystem.object_manager();
45
46        let keys = lock_keys![LockKey::flush(self.store_object_id())];
47        let _guard = Some(filesystem.lock_manager().write_lock(keys).await);
48
49        // After taking the lock, check to see if the store has been deleted.
50        if matches!(*self.lock_state.lock(), LockState::Deleted) {
51            // When we compact, it's possible that the store has been deleted since we gathered the
52            // list of stores that need compacting.  This is benign.
53            return Ok(LATEST_VERSION);
54        }
55
56        match reason {
57            Reason::Unlock => {
58                // If we're unlocking, only flush if there are encrypted mutations currently stored
59                // in a file.  We don't worry if they're in memory because a flush should get
60                // triggered when the journal gets full.
61                // Safe to unwrap store_info here because this was invoked from ObjectStore::unlock,
62                // so store_info is already accessible.
63                if self.store_info().unwrap().encrypted_mutations_object_id == INVALID_OBJECT_ID {
64                    // TODO(https://fxbug.dev/42179266): Add earliest_version support for encrypted
65                    // mutations.
66                    // Early exit, but still return the earliest version used by a struct in the
67                    // tree.
68                    return Ok(self.tree.get_earliest_version());
69                }
70            }
71            Reason::Journal => {
72                // We flush if we have something to flush *or* the on-disk version of data is not
73                // the latest.
74                let earliest_version = self.tree.get_earliest_version();
75                if !object_manager.needs_flush(self.store_object_id)
76                    && earliest_version == LATEST_VERSION
77                {
78                    // Early exit, but still return the earliest version used by a struct in the
79                    // tree.
80                    return Ok(earliest_version);
81                }
82            }
83        }
84
85        let trace = self.trace.load(Ordering::Relaxed);
86        if trace {
87            info!(store_id = self.store_object_id(); "OS: begin flush");
88        }
89
90        if matches!(&*self.lock_state.lock(), LockState::Locked) {
91            self.flush_locked().await.with_context(|| {
92                format!("Failed to flush object store {}", self.store_object_id)
93            })?;
94        } else {
95            self.flush_unlocked(reason).await.with_context(|| {
96                format!("Failed to flush object store {}", self.store_object_id)
97            })?;
98        }
99
100        if trace {
101            info!(store_id = self.store_object_id(); "OS: end flush");
102        }
103        if let Some(callback) = &*self.flush_callback.lock() {
104            callback(self);
105        }
106
107        let mut counters = self.counters.lock();
108        counters.num_flushes += 1;
109        counters.last_flush_time = Some(std::time::SystemTime::now());
110        // Return the earliest version used by a struct in the tree
111        Ok(self.tree.get_earliest_version())
112    }
113
114    // Flushes an unlocked store. Returns the layer file sizes.
115    async fn flush_unlocked(&self, reason: Reason) -> Result<Vec<u64>, Error> {
116        struct StoreInfoSnapshot<'a> {
117            store: &'a ObjectStore,
118            store_info: OnceLock<StoreInfo>,
119        }
120        impl AssociatedObject for StoreInfoSnapshot<'_> {
121            fn will_apply_mutation(
122                &self,
123                _mutation: &Mutation,
124                _object_id: u64,
125                _manager: &ObjectManager,
126            ) {
127                let mut store_info = self.store.store_info().unwrap();
128
129                // Capture the offset in the cipher stream.
130                let mutations_cipher = self.store.mutations_cipher.lock();
131                if let Some(cipher) = mutations_cipher.as_ref() {
132                    store_info.mutations_cipher_offset = cipher.offset();
133                }
134
135                self.store_info.set(store_info).unwrap();
136            }
137        }
138
139        let store_info_snapshot = StoreInfoSnapshot { store: self, store_info: OnceLock::new() };
140
141        let filesystem = self.filesystem();
142        let object_manager = filesystem.object_manager();
143        let reservation = object_manager.metadata_reservation();
144        let txn_options = Options {
145            skip_journal_checks: true,
146            skip_key_roll: true,
147            borrow_metadata_space: true,
148            allocator_reservation: Some(reservation),
149            ..Default::default()
150        };
151
152        // The BeginFlush mutation must be within a transaction that has no impact on StoreInfo
153        // since we want to get an accurate snapshot of StoreInfo.
154        let mut transaction = self.new_transaction(lock_keys![], txn_options).await?;
155        transaction.add_with_object(
156            self.store_object_id(),
157            Mutation::BeginFlush,
158            AssocObj::Borrowed(&store_info_snapshot),
159        );
160        transaction.commit().await?;
161
162        let mut new_store_info = store_info_snapshot.store_info.into_inner().unwrap();
163
164        // There is a transaction to create objects at the start and then another transaction at the
165        // end. Between those two transactions, there are transactions that write to the files.  In
166        // the first transaction, objects are created in the graveyard. Upon success, the objects
167        // are removed from the graveyard.
168        let mut transaction = self.new_transaction(lock_keys![], txn_options).await?;
169
170        // Create and write a new layer, compacting existing layers.
171        let parent_store = self.parent_store.as_ref().unwrap();
172        let handle_options = HandleOptions { skip_journal_checks: true, ..Default::default() };
173        let id_and_key = {
174            let mut lock_state = self.lock_state.lock();
175            match &mut *lock_state {
176                LockState::Unlocked { cached_keys, .. } => {
177                    if let Some(item) = cached_keys.pop() {
178                        Ok(Some(item))
179                    } else {
180                        log::warn!("No cached keys for flush for store {}", self.store_object_id());
181                        Err(anyhow!(FxfsError::Internal).context("No cached keys for flush"))
182                    }
183                }
184                LockState::UnlockedReadOnly(..) => {
185                    Err(anyhow!(FxfsError::Internal).context("Flush on read-only store"))
186                }
187                LockState::Unencrypted => Ok(None),
188                _ => Err(anyhow!(FxfsError::Internal))
189                    .with_context(|| format!("Invalid lock state ({:?}) for flush", *lock_state)),
190            }
191        }?;
192
193        let new_object_tree_layer = if let Some((raw_id, key, unwrapped_key)) = id_and_key {
194            let object_id = ReservedId::new(parent_store, raw_id);
195            ObjectStore::create_object_with_key(
196                parent_store,
197                &mut transaction,
198                object_id,
199                handle_options,
200                key,
201                unwrapped_key,
202            )
203            .await?
204        } else {
205            ObjectStore::create_object(parent_store, &mut transaction, handle_options, None).await?
206        };
207        let writer = DirectWriter::new(&new_object_tree_layer, txn_options).await;
208        let new_object_tree_layer_object_id = new_object_tree_layer.object_id();
209        parent_store.add_to_graveyard(&mut transaction, new_object_tree_layer_object_id);
210
211        transaction.commit().await?;
212
213        // *Do* the actual compaction.
214        let (layers_to_keep, old_layers) = tree::flush(
215            &self.tree,
216            writer,
217            (reason == Reason::Journal).then(|| filesystem.journal().get_compaction_yielder()),
218        )
219        .await
220        .context("Failed to flush tree")?;
221
222        // Finalise the compaction.
223        let mut new_layers = layers_from_handles([new_object_tree_layer]).await?;
224        new_layers.extend(layers_to_keep.iter().map(|l| (*l).clone()));
225
226        new_store_info.layers = Vec::new();
227        for layer in &new_layers {
228            if let Some(handle) = layer.handle() {
229                new_store_info.layers.push(handle.object_id());
230            }
231        }
232
233        let reservation_update: ReservationUpdate; // Must live longer than end_transaction.
234        let mut end_transaction = parent_store
235            .new_transaction(
236                lock_keys![LockKey::object(
237                    self.parent_store.as_ref().unwrap().store_object_id(),
238                    self.store_info_handle_object_id().unwrap(),
239                )],
240                txn_options,
241            )
242            .await?;
243
244        parent_store.remove_from_graveyard(&mut end_transaction, new_object_tree_layer_object_id);
245
246        // Move the existing layers we're compacting to the graveyard at the end.
247        for layer in &old_layers {
248            if let Some(handle) = layer.handle() {
249                parent_store.add_to_graveyard(&mut end_transaction, handle.object_id());
250            }
251        }
252
253        let old_encrypted_mutations_object_id =
254            std::mem::replace(&mut new_store_info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
255        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
256            parent_store.add_to_graveyard(&mut end_transaction, old_encrypted_mutations_object_id);
257        }
258
259        // `last_object_id` is updated differently to other members of `StoreInfo`.  We must ensure
260        // that those fields match the current in-memory values.  See the lengthy comment in
261        // `get_next_object_id` for more information.  `end_transaction` has a lock on the same lock
262        // that `get_next_object_id` uses, so there's no danger of the key changing now.
263        //
264        // This might capture object IDs that might be in transactions not yet committed.  In
265        // theory, we could do better than this but it's not worth the effort.
266        match &mut new_store_info.last_object_id {
267            LastObjectIdInfo::Unencrypted { id } => {
268                let LastObjectId::Unencrypted { id: in_memory_value } =
269                    &*self.last_object_id.lock()
270                else {
271                    unreachable!()
272                };
273                *id = *in_memory_value;
274            }
275            LastObjectIdInfo::Encrypted { id, key } => {
276                let LastObjectId::Encrypted { id: in_memory_value, .. } =
277                    &*self.last_object_id.lock()
278                else {
279                    unreachable!()
280                };
281                *id = *in_memory_value;
282                let guard = self.store_info.lock();
283                let current_store_info = guard.as_ref().unwrap();
284                let LastObjectIdInfo::Encrypted { key: in_memory_value, .. } =
285                    &current_store_info.last_object_id
286                else {
287                    unreachable!()
288                };
289                *key = in_memory_value.clone();
290            }
291            LastObjectIdInfo::Low32Bit => {}
292        }
293
294        self.write_store_info(&mut end_transaction, &new_store_info).await?;
295
296        let layer_file_sizes = new_layers
297            .iter()
298            .map(|l| l.handle().map(ReadObjectHandle::get_size).unwrap_or(0))
299            .collect::<Vec<u64>>();
300
301        let total_layer_size = layer_file_sizes.iter().sum();
302        reservation_update =
303            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));
304
305        end_transaction.add_with_object(
306            self.store_object_id(),
307            Mutation::EndFlush,
308            AssocObj::Borrowed(&reservation_update),
309        );
310
311        if self.trace.load(Ordering::Relaxed) {
312            info!(
313                store_id = self.store_object_id(),
314                old_layer_count = old_layers.len(),
315                new_layer_count = new_layers.len(),
316                total_layer_size,
317                new_store_info:?;
318                "OS: compacting"
319            );
320        }
321
322        end_transaction
323            .commit_with_callback(|_| {
324                let mut store_info = self.store_info.lock();
325                let info = store_info.as_mut().unwrap();
326                info.layers = new_store_info.layers;
327                info.encrypted_mutations_object_id = new_store_info.encrypted_mutations_object_id;
328                info.mutations_cipher_offset = new_store_info.mutations_cipher_offset;
329                self.tree.set_layers(new_layers);
330            })
331            .await?;
332
333        // Now close the layers and purge them.
334        for layer in old_layers {
335            let object_id = layer.handle().map(|h| h.object_id());
336            layer.close_layer().await;
337            if let Some(object_id) = object_id {
338                parent_store.tombstone_object(object_id, txn_options).await?;
339            }
340        }
341
342        if old_encrypted_mutations_object_id != INVALID_OBJECT_ID {
343            parent_store.tombstone_object(old_encrypted_mutations_object_id, txn_options).await?;
344        }
345
346        Ok(layer_file_sizes)
347    }
348
349    // Flushes a locked store.
350    async fn flush_locked(&self) -> Result<(), Error> {
351        let filesystem = self.filesystem();
352        let object_manager = filesystem.object_manager();
353        let reservation = object_manager.metadata_reservation();
354        let txn_options = Options {
355            skip_journal_checks: true,
356            skip_key_roll: true,
357            borrow_metadata_space: true,
358            allocator_reservation: Some(reservation),
359            ..Default::default()
360        };
361
362        let mut transaction = self.new_transaction(lock_keys![], txn_options).await?;
363        transaction.add(self.store_object_id(), Mutation::BeginFlush);
364        transaction.commit().await?;
365
366        let mut new_store_info = self.load_store_info().await?;
367
368        // There is a transaction to create objects at the start and then another transaction at the
369        // end. Between those two transactions, there are transactions that write to the files.  In
370        // the first transaction, objects are created in the graveyard. Upon success, the objects
371        // are removed from the graveyard.
372        let mut transaction = self.new_transaction(lock_keys![], txn_options).await?;
373
374        let reservation_update: ReservationUpdate; // Must live longer than end_transaction.
375        let handle; // Must live longer than end_transaction.
376        let mut end_transaction;
377
378        // We need to either write our encrypted mutations to a new file, or append them to an
379        // existing one.
380        let parent_store = self.parent_store.as_ref().unwrap();
381        handle = if new_store_info.encrypted_mutations_object_id == INVALID_OBJECT_ID {
382            let handle = ObjectStore::create_object(
383                parent_store,
384                &mut transaction,
385                HandleOptions { skip_journal_checks: true, ..Default::default() },
386                None,
387            )
388            .await?;
389            let oid = handle.object_id();
390            end_transaction = parent_store
391                .new_transaction(
392                    lock_keys![
393                        LockKey::object(parent_store.store_object_id(), oid),
394                        LockKey::object(
395                            parent_store.store_object_id(),
396                            self.store_info_handle_object_id().unwrap(),
397                        ),
398                    ],
399                    txn_options,
400                )
401                .await?;
402            new_store_info.encrypted_mutations_object_id = oid;
403            parent_store.add_to_graveyard(&mut transaction, oid);
404            parent_store.remove_from_graveyard(&mut end_transaction, oid);
405            handle
406        } else {
407            end_transaction = parent_store
408                .new_transaction(
409                    lock_keys![
410                        LockKey::object(
411                            parent_store.store_object_id(),
412                            new_store_info.encrypted_mutations_object_id,
413                        ),
414                        LockKey::object(
415                            parent_store.store_object_id(),
416                            self.store_info_handle_object_id().unwrap(),
417                        ),
418                    ],
419                    txn_options,
420                )
421                .await?;
422            ObjectStore::open_object(
423                parent_store,
424                new_store_info.encrypted_mutations_object_id,
425                HandleOptions { skip_journal_checks: true, ..Default::default() },
426                None,
427            )
428            .await?
429        };
430        transaction.commit().await?;
431
432        // Append the encrypted mutations, which need to be read from the journal.
433        // This assumes that the journal has no buffered mutations for this store (see Self::lock).
434        let journaled = filesystem
435            .journal()
436            .read_transactions_for_object(self.store_object_id)
437            .await
438            .context("Failed to read encrypted mutations from journal")?;
439        let mut buffer = handle.allocate_buffer(MAX_ENCRYPTED_MUTATIONS_SIZE).await;
440        let mut cursor = std::io::Cursor::new(buffer.as_mut_slice());
441        EncryptedMutations::from_replayed_mutations(self.store_object_id, journaled)
442            .serialize_with_version(&mut cursor)?;
443        let len = cursor.position() as usize;
444        handle.txn_write(&mut end_transaction, handle.get_size(), buffer.subslice(..len)).await?;
445
446        self.write_store_info(&mut end_transaction, &new_store_info).await?;
447
448        let mut total_layer_size = 0;
449        for &oid in &new_store_info.layers {
450            total_layer_size += parent_store.get_file_size(oid).await?;
451        }
452        total_layer_size +=
453            layer_size_from_encrypted_mutations_size(handle.get_size() + len as u64);
454
455        reservation_update =
456            ReservationUpdate::new(tree::reservation_amount_from_layer_size(total_layer_size));
457
458        end_transaction.add_with_object(
459            self.store_object_id(),
460            Mutation::EndFlush,
461            AssocObj::Borrowed(&reservation_update),
462        );
463
464        end_transaction.commit().await?;
465
466        Ok(())
467    }
468}
469
470#[cfg(test)]
471mod tests {
472    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, JournalingObject, SyncOptions};
473    use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle};
474    use crate::object_store::directory::Directory;
475    use crate::object_store::transaction::{Options, lock_keys};
476    use crate::object_store::volume::root_volume;
477    use crate::object_store::{
478        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions,
479        layer_size_from_encrypted_mutations_size, tree,
480    };
481    use fxfs_insecure_crypto::new_insecure_crypt;
482    use std::sync::Arc;
483    use storage_device::DeviceHolder;
484    use storage_device::fake_device::FakeDevice;
485
486    async fn run_key_roll_test(flush_before_unlock: bool) {
487        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
488        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
489        let store_id = {
490            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
491            root_volume
492                .new_volume(
493                    "test",
494                    NewChildStoreOptions {
495                        options: StoreOptions {
496                            crypt: Some(Arc::new(new_insecure_crypt())),
497                            ..StoreOptions::default()
498                        },
499                        ..NewChildStoreOptions::default()
500                    },
501                )
502                .await
503                .expect("new_volume failed")
504                .store_object_id()
505        };
506
507        fs.close().await.expect("close failed");
508        let device = fs.take_device().await;
509        device.reopen(false);
510
511        let fs = FxFilesystemBuilder::new()
512            .roll_metadata_key_byte_count(512 * 1024)
513            .open(device)
514            .await
515            .expect("open failed");
516
517        let (first_filename, last_filename) = {
518            let store = fs.object_manager().store(store_id).expect("store not found");
519            store.unlock(Arc::new(new_insecure_crypt())).await.expect("unlock failed");
520
521            // Keep writing until we notice the key has rolled.
522            let root_dir = Directory::open(&store, store.root_directory_object_id())
523                .await
524                .expect("open failed");
525
526            let mut last_mutations_cipher_offset = 0;
527            let mut i = 0;
528            let first_filename = format!("{:<200}", i);
529            loop {
530                let mut transaction = store
531                    .new_transaction(
532                        lock_keys![LockKey::object(store_id, root_dir.object_id())],
533                        Options::default(),
534                    )
535                    .await
536                    .expect("new_transaction failed");
537                root_dir
538                    .create_child_file(&mut transaction, &format!("{:<200}", i))
539                    .await
540                    .expect("create_child_file failed");
541                i += 1;
542                transaction.commit().await.expect("commit failed");
543                let cipher_offset = store.mutations_cipher.lock().as_ref().unwrap().offset();
544                if cipher_offset < last_mutations_cipher_offset {
545                    break;
546                }
547                last_mutations_cipher_offset = cipher_offset;
548            }
549
550            // Sync now, so that we can be fairly certain that the next transaction *won't* trigger
551            // a store flush (so we'll still have something to flush when we reopen the filesystem).
552            fs.sync(SyncOptions::default()).await.expect("sync failed");
553
554            // Write one more file to ensure the cipher has a non-zero offset.
555            let mut transaction = store
556                .new_transaction(
557                    lock_keys![LockKey::object(store_id, root_dir.object_id())],
558                    Options::default(),
559                )
560                .await
561                .expect("new_transaction failed");
562            let last_filename = format!("{:<200}", i);
563            root_dir
564                .create_child_file(&mut transaction, &last_filename)
565                .await
566                .expect("create_child_file failed");
567            transaction.commit().await.expect("commit failed");
568            (first_filename, last_filename)
569        };
570
571        fs.close().await.expect("close failed");
572
573        // Reopen and make sure replay succeeds.
574        let device = fs.take_device().await;
575        device.reopen(false);
576        let fs = FxFilesystemBuilder::new()
577            .roll_metadata_key_byte_count(512 * 1024)
578            .open(device)
579            .await
580            .expect("open failed");
581
582        if flush_before_unlock {
583            // Flush before unlocking the store which will see that the encrypted mutations get
584            // written to a file.
585            fs.object_manager().flush().await.expect("flush failed");
586        }
587
588        {
589            let store = fs.object_manager().store(store_id).expect("store not found");
590            store.unlock(Arc::new(new_insecure_crypt())).await.expect("unlock failed");
591
592            // The key should get rolled when we unlock.
593            assert_eq!(store.mutations_cipher.lock().as_ref().unwrap().offset(), 0);
594
595            let root_dir = Directory::open(&store, store.root_directory_object_id())
596                .await
597                .expect("open failed");
598            root_dir
599                .lookup(&first_filename)
600                .await
601                .expect("Lookup failed")
602                .expect("First created file wasn't present");
603            root_dir
604                .lookup(&last_filename)
605                .await
606                .expect("Lookup failed")
607                .expect("Last created file wasn't present");
608        }
609    }
610
611    #[fuchsia::test(threads = 10)]
612    async fn test_metadata_key_roll() {
613        run_key_roll_test(/* flush_before_unlock: */ false).await;
614    }
615
616    #[fuchsia::test(threads = 10)]
617    async fn test_metadata_key_roll_with_flush_before_unlock() {
618        run_key_roll_test(/* flush_before_unlock: */ true).await;
619    }
620
621    #[fuchsia::test]
622    async fn test_flush_when_locked() {
623        let device = DeviceHolder::new(FakeDevice::new(8192, 1024));
624        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
625        let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
626        let crypt = Arc::new(new_insecure_crypt());
627        let store = root_volume
628            .new_volume(
629                "test",
630                NewChildStoreOptions {
631                    options: StoreOptions { crypt: Some(crypt.clone()), ..StoreOptions::default() },
632                    ..NewChildStoreOptions::default()
633                },
634            )
635            .await
636            .expect("new_volume failed");
637        let root_dir =
638            Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
639        let mut transaction = fs
640            .root_store()
641            .new_transaction(
642                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
643                Options::default(),
644            )
645            .await
646            .expect("new_transaction failed");
647        let foo = root_dir
648            .create_child_file(&mut transaction, "foo")
649            .await
650            .expect("create_child_file failed");
651        transaction.commit().await.expect("commit failed");
652
653        // When the volume is first created it will include a new mutations key but we want to test
654        // what happens when the encrypted mutations file doesn't contain a new mutations key, so we
655        // flush here.
656        store.flush().await.expect("flush failed");
657
658        let mut transaction = fs
659            .root_store()
660            .new_transaction(
661                lock_keys![LockKey::object(store.store_object_id(), root_dir.object_id())],
662                Options::default(),
663            )
664            .await
665            .expect("new_transaction failed");
666        let bar = root_dir
667            .create_child_file(&mut transaction, "bar")
668            .await
669            .expect("create_child_file failed");
670        transaction.commit().await.expect("commit failed");
671
672        store.lock().await.expect("lock failed");
673
674        // Flushing the store whilst locked should create an encrypted mutations file.
675        store.flush().await.expect("flush failed");
676
677        // Check the reservation.
678        let info = store.load_store_info().await.unwrap();
679        let parent_store = store.parent_store().unwrap();
680        let mut total_layer_size = 0;
681        for &oid in &info.layers {
682            total_layer_size +=
683                parent_store.get_file_size(oid).await.expect("get_file_size failed");
684        }
685        assert_ne!(info.encrypted_mutations_object_id, INVALID_OBJECT_ID);
686        total_layer_size += layer_size_from_encrypted_mutations_size(
687            parent_store
688                .get_file_size(info.encrypted_mutations_object_id)
689                .await
690                .expect("get_file_size failed"),
691        );
692        assert_eq!(
693            fs.object_manager().reservation(store.store_object_id()),
694            Some(tree::reservation_amount_from_layer_size(total_layer_size))
695        );
696
697        // Unlocking the store should replay that encrypted mutations file.
698        store.unlock(crypt).await.expect("unlock failed");
699
700        ObjectStore::open_object(&store, foo.object_id(), HandleOptions::default(), None)
701            .await
702            .expect("open_object failed");
703
704        ObjectStore::open_object(&store, bar.object_id(), HandleOptions::default(), None)
705            .await
706            .expect("open_object failed");
707
708        fs.close().await.expect("close failed");
709    }
710}
711
712impl tree::MajorCompactable<ObjectKey, ObjectValue> for LSMTree<ObjectKey, ObjectValue> {
713    async fn major_iter(
714        iter: impl LayerIterator<ObjectKey, ObjectValue>,
715    ) -> Result<impl LayerIterator<ObjectKey, ObjectValue>, Error> {
716        iter.filter(|item: ItemRef<'_, _, _>| match item {
717            // Object Tombstone.
718            ItemRef { value: ObjectValue::None, .. } => false,
719            // Deleted extent.
720            ItemRef { value: ObjectValue::Extent(ExtentValue::None), .. } => false,
721            _ => true,
722        })
723        .await
724    }
725}