// fxfs/object_store/tree.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::lsm_tree::types::{LayerIterator, MergeableKey, Value};
use crate::lsm_tree::{LSMTree, LockedLayer, Query};
use crate::object_handle::WriteBytes;
use crate::object_store::journal;
use crate::serialized_types::LATEST_VERSION;
use anyhow::{Context, Error};
use std::future::{ready, Future};
pub trait MajorCompactable<K: 'static, V: 'static> {
    /// Returns an iterator that wraps another iterator that is appropriate for major compactions.
    /// Such an iterator should elide items that don't need to be written if a major compaction is
    /// taking place.
    ///
    /// The default implementation elides nothing: it returns an already-resolved future that
    /// yields `iter` unchanged.
    fn major_iter(
        iter: impl LayerIterator<K, V>,
    ) -> impl Future<Output = Result<impl LayerIterator<K, V>, Error>> + Send {
        // No async work is required here, so wrap the result in an immediately-ready future.
        ready(Ok(iter))
    }
}
/// Shorthand for a vector of locked layers, as produced and returned by [`flush`].
type Layers<K, V> = Vec<LockedLayer<K, V>>;
/// Picks an appropriate set of layers to flush from the tree and writes them to writer.  After
/// successfully writing a new layer, it returns the layers that should be kept and the old layers
/// that should be purged.
///
/// Returns `(layers_to_keep, layers_to_purge)`; errors from writing the merged layer are
/// propagated with added context.
pub async fn flush<'a, K: MergeableKey, V: Value>(
    tree: &'a LSMTree<K, V>,
    writer: impl WriteBytes + Send,
) -> Result<(Layers<K, V>, Layers<K, V>), Error>
where
    LSMTree<K, V>: MajorCompactable<K, V>,
{
    let earliest_version = tree.get_earliest_version();
    let mut layer_set = tree.immutable_layer_set();
    // Running totals over the layers selected so far; only layers that have a handle (and thus a
    // queryable size) contribute.
    let mut total_size = 0;
    let mut layer_count = 0;
    // `split_index` partitions `layer_set.layers`: layers before it get merged into the new layer
    // being written; layers from it onwards are kept as-is.
    let mut split_index = if earliest_version == LATEST_VERSION {
        layer_set
            .layers
            .iter()
            .position(|layer| {
                match layer.handle() {
                    // Layers without a handle are skipped (they never terminate the scan).
                    None => {}
                    Some(handle) => {
                        let size = handle.get_size();
                        // Stop adding more layers when the total size of all the immutable layers
                        // so far is less than 3/4 of the layer we are looking at.
                        // (total_size < size * 3/4, rearranged to avoid division.)
                        if total_size > 0 && total_size * 4 < size * 3 {
                            return true;
                        }
                        total_size += size;
                        layer_count += 1;
                    }
                }
                false
            })
            // No cut-off point found: merge every layer.
            .unwrap_or(layer_set.layers.len())
    } else {
        // We force a full compaction if there is an older version data structure
        // in the layer set to upgrade it to the latest.
        layer_set.layers.len()
    };

    // If there's only one immutable layer to merge with and it's big, don't merge with it.
    // Instead, we'll create a new layer and eventually that will grow to be big enough to merge
    // with the existing layers.  The threshold here cannot be so big such that merging with a
    // layer that big ends up using so much journal space that we have to immediately flush
    // again as that has the potential to end up in an infinite loop, and so the number chosen
    // here is related to the journal's RECLAIM_SIZE.
    // NOTE: `layer_count == 1` implies at least one layer was accepted above, so
    // `split_index >= 1` and the `split_index - 1` below cannot underflow.
    if layer_count == 1
        && total_size > journal::DEFAULT_RECLAIM_SIZE
        && layer_set.layers[split_index - 1].handle().is_some()
    {
        split_index -= 1;
    }

    // Everything from `split_index` onwards survives this flush untouched.
    let layers_to_keep = layer_set.layers.split_off(split_index);

    {
        let block_size = writer.block_size();
        let total_len = layer_set.sum_len();
        let mut merger = layer_set.merger();
        let iter = merger.query(Query::FullScan).await?;
        if layers_to_keep.is_empty() {
            // Nothing is being kept, so this is a major compaction: use the tree's major
            // iterator, which may elide items that no longer need to be written (see
            // `MajorCompactable::major_iter`).
            let major_iter = LSMTree::<K, V>::major_iter(iter).await?;
            tree.compact_with_iterator(major_iter, total_len, writer, block_size).await
        } else {
            tree.compact_with_iterator(iter, total_len, writer, block_size).await
        }
        .context("ObjectStore::flush")?;
    }

    // `layer_set.layers` now holds only the layers that were merged; they can be purged.
    Ok((layers_to_keep, layer_set.layers))
}
/// Returns the amount of space that should be reserved to allow for compactions when the size of
/// all layers is `layer_size`.
pub fn reservation_amount_from_layer_size(layer_size: u64) -> u64 {
    // Conservatively allow for 50% overheads (which covers the amount of metadata space required
    // for compactions).  This is computed as `layer_size + layer_size / 2` rather than
    // `layer_size * 3 / 2`: the results are identical for every input (floor(3n/2) ==
    // n + floor(n/2)), but the former avoids overflowing the intermediate product for
    // `layer_size > u64::MAX / 3`, and saturates at `u64::MAX` rather than panicking/wrapping.
    layer_size.saturating_add(layer_size / 2)
}