1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45use crate::lsm_tree::types::{LayerIterator, MergeableKey, Value};
6use crate::lsm_tree::{LSMTree, LockedLayer, Query};
7use crate::object_handle::WriteBytes;
8use crate::object_store::journal;
9use crate::serialized_types::LATEST_VERSION;
10use anyhow::{Context, Error};
11use std::future::{ready, Future};
/// Customization point used by `flush` (via its `LSMTree<K, V>: MajorCompactable<K, V>` bound) to
/// adapt the merge iterator when a major compaction — one that replaces *all* immutable layers —
/// is being performed.
pub trait MajorCompactable<K: 'static, V: 'static> {
    /// Returns an iterator that wraps another iterator that is appropriate for major compactions.
    /// Such an iterator should elide items that don't need to be written if a major compaction is
    /// taking place.
    fn major_iter(
        iter: impl LayerIterator<K, V>,
    ) -> impl Future<Output = Result<impl LayerIterator<K, V>, Error>> + Send {
        // Default: elide nothing — hand back the input iterator unchanged, wrapped in an
        // already-completed future.
        ready(Ok(iter))
    }
}
/// Convenience alias for a set of locked layers, as returned by `flush` (both the kept set and the
/// to-be-purged set).
type Layers<K, V> = Vec<LockedLayer<K, V>>;
/// Picks an appropriate set of layers to flush from the tree and writes them to writer. After
/// successfully writing a new layer, it returns the layers that should be kept and the old layers
/// that should be purged.
///
/// Layer selection: starting from the newest immutable layer, layers are accumulated until one is
/// encountered that is substantially bigger than everything accumulated so far (see the 3/4 ratio
/// below); everything before that point is compacted into the new layer, everything from that
/// point on is kept.  If any layer was serialized with an older on-disk version, *all* layers are
/// compacted so the data is rewritten at `LATEST_VERSION`.
pub async fn flush<'a, K: MergeableKey, V: Value>(
    tree: &'a LSMTree<K, V>,
    writer: impl WriteBytes + Send,
) -> Result<(Layers<K, V>, Layers<K, V>), Error>
where
    LSMTree<K, V>: MajorCompactable<K, V>,
{
    let earliest_version = tree.get_earliest_version();
    let mut layer_set = tree.immutable_layer_set();
    // Running totals mutated from inside the `position` closure below: the combined size of the
    // handle-backed layers visited so far, and how many of them there were.
    let mut total_size = 0;
    let mut layer_count = 0;
    // Index of the first layer to *keep*; layers before it will be compacted and purged.
    let mut split_index = if earliest_version == LATEST_VERSION {
        layer_set
            .layers
            .iter()
            .position(|layer| {
                match layer.handle() {
                    // Layers without a handle (presumably not yet persisted — confirm against
                    // `LockedLayer`) contribute nothing to the size/count tallies.
                    None => {}
                    Some(handle) => {
                        let size = handle.get_size();
                        // Stop adding more layers when the total size of all the immutable layers
                        // so far is less than 3/4 of the layer we are looking at.
                        if total_size > 0 && total_size * 4 < size * 3 {
                            return true;
                        }
                        total_size += size;
                        layer_count += 1;
                    }
                }
                false
            })
            // No oversized layer found: compact everything.
            .unwrap_or(layer_set.layers.len())
    } else {
        // We force a full compaction if there is an older version data structure
        // in the layer set to upgrade it to the latest.
        layer_set.layers.len()
    };

    // If there's only one immutable layer to merge with and it's big, don't merge with it.
    // Instead, we'll create a new layer and eventually that will grow to be big enough to merge
    // with the existing layers. The threshold here cannot be so big such that merging with a
    // layer that big ends up using so much journal space that we have to immediately flush
    // again as that has the potential to end up in an infinite loop, and so the number chosen
    // here is related to the journal's RECLAIM_SIZE.
    //
    // NB: `split_index - 1` cannot underflow here: `layer_count == 1` means at least one
    // handle-backed layer was counted before the split point, so `split_index >= 1`.
    if layer_count == 1
        && total_size > journal::DEFAULT_RECLAIM_SIZE
        && layer_set.layers[split_index - 1].handle().is_some()
    {
        split_index -= 1;
    }

    // Everything from `split_index` on survives; `layer_set.layers` retains the layers to merge.
    let layers_to_keep = layer_set.layers.split_off(split_index);

    // Inner scope so `merger` (which borrows `layer_set`) is dropped before `layer_set.layers` is
    // moved into the return value below.
    {
        let block_size = writer.block_size();
        let total_len = layer_set.sum_len();
        let mut merger = layer_set.merger();
        let iter = merger.query(Query::FullScan).await?;
        if layers_to_keep.is_empty() {
            // Keeping nothing means this is a major compaction, so items that only matter when
            // older layers exist can be elided via the `MajorCompactable` wrapper.
            let major_iter = LSMTree::<K, V>::major_iter(iter).await?;
            tree.compact_with_iterator(major_iter, total_len, writer, block_size).await
        } else {
            tree.compact_with_iterator(iter, total_len, writer, block_size).await
        }
        .context("ObjectStore::flush")?;
    }

    Ok((layers_to_keep, layer_set.layers))
}
/// Returns the amount of space that should be reserved to allow for compactions when the size of
/// all layers is `layer_size`.
pub fn reservation_amount_from_layer_size(layer_size: u64) -> u64 {
    // Conservatively allow for 50% overheads (which covers the amount of metadata space required
    // for compactions).
    //
    // Computed as `layer_size + layer_size / 2` rather than `layer_size * 3 / 2`: the two forms
    // are identical (floor(3n/2)) for every input where the latter doesn't overflow, but the
    // multiplication form overflows u64 (panicking in debug, wrapping in release) once
    // `layer_size > u64::MAX / 3`, whereas this form stays correct up to ~2/3 of u64::MAX.
    layer_size + layer_size / 2
}