Skip to main content

fxfs/
lsm_tree.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! # End-Key Indexing for File Extents
6//!
7//! Fxfs indexes file extents by their **end offset** because it avoids expensive reverse
8//! iteration (`Prev()`) during lookups in the LSM tree.
9//!
//! If we indexed by start offset, a `Seek(>= X)` would return the first extent starting *at or
//! after* X. The extent actually containing X generally starts *before* X, so a `Prev()` would be
//! needed to find it.
12//!
13//! By indexing by end offset, a `Seek(>= X + 1)` returns the first extent ending *after* X.
14//! Since extents do not overlap, this is the only extent that could possibly contain X,
15//! eliminating the need for backtracking.
16
17mod bloom_filter;
18pub mod cache;
19pub mod merge;
20pub mod persistent_layer;
21pub mod skip_list_layer;
22pub mod types;
23
24#[cfg(any(test, fuzz))]
25pub mod testing;
26
27use crate::drop_event::DropEvent;
28use crate::log::*;
29use crate::metrics::DurationMeasureScope;
30use crate::object_handle::{ReadObjectHandle, WriteBytes};
31use crate::serialized_types::{LATEST_VERSION, Version};
32
33use anyhow::Error;
34use cache::{ObjectCache, ObjectCacheResult};
35
36use fuchsia_inspect::HistogramProperty;
37use fuchsia_sync::RwLock;
38use persistent_layer::{PersistentLayer, PersistentLayerWriter};
39use skip_list_layer::SkipListLayer;
40use std::fmt;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::sync::{Arc, Mutex};
43use types::{
44    Existence, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey,
45    OrdLowerBound, Value,
46};
47
48pub use merge::Query;
49
/// Argument passed to `SkipListLayer::new` every time a fresh mutable layer is created (see
/// `LSMTree::new_mutable_layer`). NOTE(review): presumably a sizing hint for the skip list —
/// confirm against `SkipListLayer::new`.
const SKIP_LIST_LAYER_ITEMS: usize = 512;
51
52// For serialization.
53pub use persistent_layer::{
54    LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
55    LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
56};
57
58pub async fn layers_from_handles<K: Key, V: Value>(
59    handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
60) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
61    let mut layers = Vec::new();
62    for handle in handles {
63        layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
64    }
65    Ok(layers)
66}
67
/// Identifies which `LSMTree` mutation a `MutationCallback` is being notified about.
#[derive(Eq, PartialEq, Debug)]
pub enum Operation {
    /// Reported by `LSMTree::insert`.
    Insert,
    /// Reported by `LSMTree::replace_or_insert`.
    ReplaceOrInsert,
    /// Reported by `LSMTree::merge_into`.
    MergeInto,
}

/// Optional hook that an `LSMTree` invokes with the operation kind and the item for every
/// mutation. It is called before the mutation is applied to the mutable layer, while the tree's
/// read lock is held.
pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;
76
/// The part of an `LSMTree` that is guarded by its `RwLock`.
struct Inner<K, V> {
    /// In-memory layer that receives all mutations until the next `seal`.
    mutable_layer: Arc<SkipListLayer<K, V>>,
    /// Immutable layers, newest first: `seal` inserts the retired mutable layer at index 0 and
    /// `append_layers` adds base layers at the end.
    layers: Vec<Arc<dyn Layer<K, V>>>,
    /// Optional hook invoked on every mutation (see `MutationCallback`).
    mutation_callback: MutationCallback<K, V>,
}
82
/// Number of buckets in `CompactionCounters::layer_size_histogram`.
pub const LOG2_HISTOGRAM_BUCKETS: usize = 32;

/// Metrics related to LSM tree churn, layer depths, and compaction performance.
pub struct CompactionCounters {
    /// How many compactions have been reported.
    pub compactions: u64,
    /// Sum of bytes written by all reported compactions (a write-amplification indicator).
    pub compaction_bytes_written: u64,
    /// Sum of the reported compaction durations, in nanoseconds.
    pub compaction_time_ns: u64,
    /// How many mutable layers have been sealed (a churn indicator).
    pub total_layers_added: u64,
    /// Deepest layer stack observed, mutable layer included (worst-case read amplification).
    pub max_layer_count: u64,
    /// Log2 histogram of compacted layer sizes in bytes; used to sanity-check the compaction
    /// heuristics.
    pub layer_size_histogram: [u64; LOG2_HISTOGRAM_BUCKETS],
}

impl Default for CompactionCounters {
    /// All totals, the high-water mark and every histogram bucket start at zero.
    fn default() -> Self {
        let layer_size_histogram = [0; LOG2_HISTOGRAM_BUCKETS];
        CompactionCounters {
            layer_size_histogram,
            max_layer_count: 0,
            total_layers_added: 0,
            compaction_time_ns: 0,
            compaction_bytes_written: 0,
            compactions: 0,
        }
    }
}

/// Counters and metrics accumulated over an LSM tree's lifetime.
pub struct TreeCounters {
    /// How many key lookups (reads) have gone through the tree.
    pub num_seeks: AtomicUsize,
    /// How many layer files could have been consulted across all seeks; paired with
    /// `layer_files_skipped` to measure bloom-filter effectiveness.
    pub layer_files_total: AtomicUsize,
    /// How many layer files were not searched because a bloom filter ruled them out.
    pub layer_files_skipped: AtomicUsize,
    /// Metrics that are updated under a lock rather than atomically.
    pub compaction: Mutex<CompactionCounters>,
}

impl Default for TreeCounters {
    /// Every atomic counter starts at zero and the compaction counters use their own default.
    fn default() -> Self {
        TreeCounters {
            num_seeks: AtomicUsize::default(),
            layer_files_total: AtomicUsize::default(),
            layer_files_skipped: AtomicUsize::default(),
            compaction: Mutex::default(),
        }
    }
}
137
138/// Writes the items yielded by the iterator into the supplied object.
139#[fxfs_trace::trace]
140pub async fn compact_with_iterator<K: Key, V: Value, W: WriteBytes + Send>(
141    mut iterator: impl LayerIterator<K, V>,
142    num_items: usize,
143    writer: W,
144    block_size: u64,
145    mut yielder: Option<impl Yielder>,
146) -> Result<u64, Error> {
147    let mut writer = PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
148    while let Some(item_ref) = iterator.get() {
149        debug!(item_ref:?; "compact: writing");
150        writer.write(item_ref).await?;
151        iterator.advance().await?;
152        if let Some(y) = yielder.as_mut() {
153            y.yield_now().await;
154        }
155    }
156    writer.flush().await?;
157
158    Ok(writer.bytes_written())
159}
160
/// LSMTree manages a tree of layers to provide a key/value store.  Each layer contains deltas on
/// the preceding layer.  The top layer is an in-memory mutable layer.  Layers can be compacted to
/// form a new combined layer.
pub struct LSMTree<K, V> {
    /// Mutable layer, immutable layers and mutation callback. Mutations take the read lock;
    /// `seal` and the layer-replacing methods take the write lock.
    data: RwLock<Inner<K, V>>,
    /// Used by `merge_into` and handed to every `LayerSet` created from this tree.
    merge_fn: merge::MergeFn<K, V>,
    /// Consulted by `find` and invalidated after every mutation.
    cache: Box<dyn ObjectCache<K, V>>,
    /// Metrics, shared (via `Arc`) with every `LayerSet` created from this tree.
    counters: Arc<TreeCounters>,
}
170
171#[fxfs_trace::trace]
172impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
173    /// Creates a new empty tree.
174    pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
175        let counters = TreeCounters::default();
176        counters.compaction.lock().unwrap().max_layer_count = 1;
177        LSMTree {
178            data: RwLock::new(Inner {
179                mutable_layer: Self::new_mutable_layer(),
180                layers: Vec::new(),
181                mutation_callback: None,
182            }),
183            merge_fn,
184            cache,
185            counters: Arc::new(counters),
186        }
187    }
188
189    /// Opens an existing tree from the provided handles to the layer objects.
190    pub async fn open(
191        merge_fn: merge::MergeFn<K, V>,
192        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
193        cache: Box<dyn ObjectCache<K, V>>,
194    ) -> Result<Self, Error> {
195        let layers = layers_from_handles(handles).await?;
196        let max_layer_count = layers.len() as u64 + 1;
197        let counters = TreeCounters::default();
198        counters.compaction.lock().unwrap().max_layer_count = max_layer_count;
199        Ok(LSMTree {
200            data: RwLock::new(Inner {
201                mutable_layer: Self::new_mutable_layer(),
202                layers,
203                mutation_callback: None,
204            }),
205            merge_fn,
206            cache,
207            counters: Arc::new(counters),
208        })
209    }
210
211    /// Replaces the immutable layers.
212    pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
213        let mut data = self.data.write();
214        data.layers = layers;
215        let layer_count = data.layers.len() + 1;
216        let mut counters = self.counters.compaction.lock().unwrap();
217        counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
218    }
219
220    /// Appends to the given layers at the end i.e. they should be base layers.  This is supposed
221    /// to be used after replay when we are opening a tree and we have discovered the base layers.
222    pub async fn append_layers(
223        &self,
224        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
225    ) -> Result<(), Error> {
226        let mut layers = layers_from_handles(handles).await?;
227        let mut data = self.data.write();
228        data.layers.append(&mut layers);
229        let layer_count = data.layers.len() + 1;
230        let mut counters = self.counters.compaction.lock().unwrap();
231        counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
232        Ok(())
233    }
234
235    /// Resets the immutable layers.
236    pub fn reset_immutable_layers(&self) {
237        self.data.write().layers = Vec::new();
238    }
239
240    /// Seals the current mutable layer and creates a new one.
241    pub fn seal(&self) {
242        // We need to be sure there are no mutations currently in-progress.  This is currently
243        // guaranteed by ensuring that all mutations take a read lock on `data`.
244        let mut data = self.data.write();
245        let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
246        data.layers.insert(0, layer);
247        let layer_count = data.layers.len() + 1;
248        let mut counters = self.counters.compaction.lock().unwrap();
249        counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
250        counters.total_layers_added += 1;
251    }
252
253    /// Resets the tree to an empty state.
254    pub fn reset(&self) {
255        let mut data = self.data.write();
256        data.layers = Vec::new();
257        data.mutable_layer = Self::new_mutable_layer();
258    }
259
260    pub fn report_compaction_metrics(
261        &self,
262        bytes_written: u64,
263        duration: std::time::Duration,
264        layer_count: usize,
265    ) {
266        let mut counters = self.counters.compaction.lock().unwrap();
267        counters.compactions += 1;
268        counters.compaction_bytes_written += bytes_written;
269        counters.compaction_time_ns += duration.as_nanos() as u64;
270
271        let bucket = if bytes_written == 0 {
272            0
273        } else {
274            std::cmp::min(LOG2_HISTOGRAM_BUCKETS - 1, 63 - bytes_written.leading_zeros() as usize)
275        };
276        counters.layer_size_histogram[bucket] += 1;
277
278        crate::metrics::lsm_tree_metrics().compaction_layer_stack_depth.insert(layer_count as u64);
279    }
280
281    pub fn compaction_bytes_written(&self) -> u64 {
282        self.counters.compaction.lock().unwrap().compaction_bytes_written
283    }
284
285    /// Returns an empty layer-set for this tree.
286    pub fn empty_layer_set(&self) -> LayerSet<K, V> {
287        LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
288    }
289
290    /// Adds all the layers (including the mutable layer) to `layer_set`.
291    pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
292        let data = self.data.read();
293        layer_set.layers.reserve_exact(data.layers.len() + 1);
294        layer_set
295            .layers
296            .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
297        for layer in &data.layers {
298            layer_set.layers.push(layer.clone().into());
299        }
300    }
301
302    /// Returns a clone of the current set of layers (including the mutable layer), after which one
303    /// can get an iterator.
304    pub fn layer_set(&self) -> LayerSet<K, V> {
305        let mut layer_set = self.empty_layer_set();
306        self.add_all_layers_to_layer_set(&mut layer_set);
307        layer_set
308    }
309
310    /// Returns the current set of immutable layers after which one can get an iterator (for e.g.
311    /// compacting).  Since these layers are immutable, getting an iterator should not block
312    /// anything else.
313    pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
314        let data = self.data.read();
315        let mut layers = Vec::with_capacity(data.layers.len());
316        for layer in &data.layers {
317            layers.push(layer.clone().into());
318        }
319        LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
320    }
321
322    /// Inserts an item into the mutable layer.
323    /// Returns error if item already exists.
324    pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
325        let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().insert);
326
327        let key = item.key.clone();
328        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
329        {
330            // `seal` below relies on us holding a read lock whilst we do the mutation.
331            let data = self.data.read();
332            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
333                mutation_callback(Operation::Insert, &item);
334            }
335            data.mutable_layer.insert(item)?;
336        }
337        self.cache.invalidate(key, val);
338        Ok(())
339    }
340
341    /// Replaces or inserts an item into the mutable layer.
342    pub fn replace_or_insert(&self, item: Item<K, V>) {
343        let _measure =
344            DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().replace_or_insert);
345
346        let key = item.key.clone();
347        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
348        {
349            // `seal` below relies on us holding a read lock whilst we do the mutation.
350            let data = self.data.read();
351            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
352                mutation_callback(Operation::ReplaceOrInsert, &item);
353            }
354            data.mutable_layer.replace_or_insert(item);
355        }
356        self.cache.invalidate(key, val);
357    }
358
359    /// Merges the given item into the mutable layer.
360    pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
361        let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().merge_into);
362
363        let key = item.key.clone();
364        {
365            // `seal` below relies on us holding a read lock whilst we do the mutation.
366            let data = self.data.read();
367            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
368                mutation_callback(Operation::MergeInto, &item);
369            }
370            data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
371        }
372        self.cache.invalidate(key, None);
373    }
374
375    /// Searches for an exact match for the given key. If the value is equal to
376    /// `Value::DELETED_MARKER` the item is considered missing and will not be returned.
377    pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
378    where
379        K: Eq,
380    {
381        let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().find);
382        // It is important that the cache lookup is done prior to fetching the layer set as the
383        // placeholder returned acts as a sort of lock for the validity of the item that may be
384        // inserted later via that placeholder.
385        let token = match self.cache.lookup_or_reserve(search_key) {
386            ObjectCacheResult::Value(value) => {
387                if value == V::DELETED_MARKER {
388                    return Ok(None);
389                } else {
390                    return Ok(Some(Item::new(search_key.clone(), value)));
391                }
392            }
393            ObjectCacheResult::Placeholder(token) => Some(token),
394            ObjectCacheResult::NoCache => None,
395        };
396        let layer_set = self.layer_set();
397        let mut merger = layer_set.merger();
398
399        Ok(match merger.query(Query::Point(search_key)).await?.get() {
400            Some(ItemRef { key, value }) if key == search_key && *value != V::DELETED_MARKER => {
401                if let Some(token) = token {
402                    token.complete(Some(value));
403                }
404                Some(Item { key: key.clone(), value: value.clone() })
405            }
406            _ => None,
407        })
408    }
409
410    pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
411        self.data.read().mutable_layer.clone()
412    }
413
414    /// Sets a mutation callback which is a callback that is triggered whenever any mutations are
415    /// applied to the tree.  This might be useful for tests that want to record the precise
416    /// sequence of mutations that are applied to the tree.
417    pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
418        self.data.write().mutation_callback = mutation_callback;
419    }
420
421    /// Returns the earliest version used by a layer in the tree.
422    pub fn get_earliest_version(&self) -> Version {
423        let mut earliest_version = LATEST_VERSION;
424        for layer in self.layer_set().layers {
425            let layer_version = layer.get_version();
426            if layer_version < earliest_version {
427                earliest_version = layer_version;
428            }
429        }
430        return earliest_version;
431    }
432
433    /// Returns a new mutable layer.
434    pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
435        SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
436    }
437
438    /// Replaces the mutable layer.
439    pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
440        self.data.write().mutable_layer = layer;
441    }
442
443    /// Records inspect data for the LSM tree into `node`.  Called lazily when inspect is queried.
444    pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
445        let layer_set = self.layer_set();
446        root.record_child("layers", move |node| {
447            let mut index = 0;
448            for layer in layer_set.layers {
449                node.record_child(format!("{index}"), move |node| {
450                    layer.1.record_inspect_data(node)
451                });
452                index += 1;
453            }
454        });
455        {
456            let counters = self.counters.compaction.lock().unwrap();
457            root.record_uint("num_seeks", self.counters.num_seeks.load(Ordering::Relaxed) as u64);
458            root.record_uint("bloom_filter_success_percent", {
459                let layer_files_total = self.counters.layer_files_total.load(Ordering::Relaxed);
460                let layer_files_skipped = self.counters.layer_files_skipped.load(Ordering::Relaxed);
461                if layer_files_total == 0 {
462                    0
463                } else {
464                    (layer_files_skipped * 100).div_ceil(layer_files_total) as u64
465                }
466            });
467            root.record_uint("compactions", counters.compactions);
468            root.record_uint("compaction_bytes_written", counters.compaction_bytes_written);
469            root.record_uint("compaction_time_ns", counters.compaction_time_ns);
470            root.record_uint("total_layers_added", counters.total_layers_added);
471            root.record_uint("max_layer_count", counters.max_layer_count);
472
473            let layer_sizes = root.create_uint_exponential_histogram(
474                "layer_size_histogram_log2",
475                fuchsia_inspect::ExponentialHistogramParams {
476                    floor: 1,
477                    initial_step: 1,
478                    step_multiplier: 2,
479                    buckets: LOG2_HISTOGRAM_BUCKETS,
480                },
481            );
482            for (i, count) in counters.layer_size_histogram.iter().enumerate() {
483                layer_sizes.insert_multiple(1u64 << i, *count as usize);
484            }
485            root.record(layer_sizes);
486        }
487    }
488}
489
/// This is an RAII wrapper for a layer which holds a lock on the layer (via the Layer::lock
/// method).
///
/// Field 0 is the event returned by `Layer::lock`; the lock is held for as long as that event is
/// alive. Field 1 is the layer itself. `close_layer` drops the event before closing the layer.
pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);
493
494impl<K, V> LockedLayer<K, V> {
495    pub async fn close_layer(self) {
496        let layer = self.1;
497        std::mem::drop(self.0);
498        layer.close().await;
499    }
500}
501
502impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
503    fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
504        let event = layer.lock().unwrap();
505        Self(event, layer)
506    }
507}
508
509impl<K, V> std::ops::Deref for LockedLayer<K, V> {
510    type Target = Arc<dyn Layer<K, V>>;
511
512    fn deref(&self) -> &Self::Target {
513        &self.1
514    }
515}
516
517impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
518    fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
519        self.1.as_ref()
520    }
521}
522
/// A LayerSet provides a snapshot of the layers at a particular point in time, and allows you to
/// get an iterator.  Iterators borrow the layers so something needs to hold reference count.
pub struct LayerSet<K, V> {
    /// The locked layers, in the order they were added. For sets built by `LSMTree::layer_set`
    /// this is the mutable layer followed by the immutable layers, newest first.
    pub layers: Vec<LockedLayer<K, V>>,
    /// Merge function given to every `Merger` created from this set.
    merge_fn: merge::MergeFn<K, V>,
    /// Tree metrics, shared with the owning tree and given to every `Merger`.
    counters: Arc<TreeCounters>,
}
530
531impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
532    pub fn sum_len(&self) -> usize {
533        let mut size = 0;
534        for layer in &self.layers {
535            size += layer.len()
536        }
537        size
538    }
539
540    pub fn merger(&self) -> merge::Merger<'_, K, V> {
541        merge::Merger::new(
542            self.layers.iter().map(|x| x.as_ref()),
543            self.merge_fn,
544            self.counters.clone(),
545        )
546    }
547
548    /// See `Layer::key_exists`.
549    pub async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
550        for l in &self.layers {
551            match l.key_exists(key).await? {
552                e @ (Existence::Exists | Existence::MaybeExists) => return Ok(e),
553                _ => {}
554            }
555        }
556        Ok(Existence::Missing)
557    }
558}
559
560impl<K, V> fmt::Debug for LayerSet<K, V> {
561    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
562        fmt.debug_list()
563            .entries(self.layers.iter().map(|l| {
564                if let Some(handle) = l.handle() {
565                    format!("{}", handle.object_id())
566                } else {
567                    format!("{:?}", Arc::as_ptr(l))
568                }
569            }))
570            .finish()
571    }
572}
573
/// A yielder can be used during compactions which are low priority.
///
/// `compact_with_iterator` awaits `yield_now` after writing each item, giving the implementation
/// a chance to let other work run.
pub trait Yielder: Send {
    /// Called between items; the returned future must be `Send`.
    fn yield_now(&mut self) -> impl Future<Output = ()> + Send;
}
578
579#[cfg(test)]
580mod tests {
581    use super::{LSMTree, Yielder, compact_with_iterator};
582    use crate::drop_event::DropEvent;
583    use crate::lsm_tree::cache::{
584        NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
585    };
586    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
587    use crate::lsm_tree::types::{
588        BoxedLayerIterator, Existence, Item, ItemRef, Key, Layer, LayerIterator, Value,
589    };
590    use crate::lsm_tree::{Query, layers_from_handles};
591    use crate::object_handle::ObjectHandle;
592    use crate::serialized_types::{LATEST_VERSION, Version};
593    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
594    use crate::testing::writer::Writer;
595    use anyhow::{Error, anyhow};
596    use async_trait::async_trait;
597
598    use fuchsia_sync::Mutex;
599
600    use rand::rng;
601    use rand::seq::SliceRandom;
602
603    use std::sync::Arc;
604
605    use super::testing::TestKey;
606
607    fn emit_left_merge_fn(
608        _left: &MergeLayerIterator<'_, TestKey, u64>,
609        _right: &MergeLayerIterator<'_, TestKey, u64>,
610    ) -> MergeResult<TestKey, u64> {
611        MergeResult::EmitLeft
612    }
613
614    impl Value for u64 {
615        const DELETED_MARKER: Self = 0;
616    }
617
618    struct NoOpYielder;
619    impl Yielder for NoOpYielder {
620        async fn yield_now(&mut self) {}
621    }
622
623    #[fuchsia::test]
624    async fn test_iteration() {
625        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
626        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
627        tree.insert(items[0].clone()).expect("insert error");
628        tree.insert(items[1].clone()).expect("insert error");
629        let layers = tree.layer_set();
630        let mut merger = layers.merger();
631        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
632        let ItemRef { key, value, .. } = iter.get().expect("missing item");
633        assert_eq!((key, value), (&items[0].key, &items[0].value));
634        iter.advance().await.expect("advance failed");
635        let ItemRef { key, value, .. } = iter.get().expect("missing item");
636        assert_eq!((key, value), (&items[1].key, &items[1].value));
637        iter.advance().await.expect("advance failed");
638        assert!(iter.get().is_none());
639    }
640
641    #[fuchsia::test]
642    async fn test_compact() {
643        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
644        let items = [
645            Item::new(TestKey(1..1), 1),
646            Item::new(TestKey(2..2), 2),
647            Item::new(TestKey(3..3), 3),
648            Item::new(TestKey(4..4), 4),
649        ];
650        tree.insert(items[0].clone()).expect("insert error");
651        tree.insert(items[1].clone()).expect("insert error");
652        tree.seal();
653        tree.insert(items[2].clone()).expect("insert error");
654        tree.insert(items[3].clone()).expect("insert error");
655        tree.seal();
656        let object = Arc::new(FakeObject::new());
657        let handle = FakeObjectHandle::new(object.clone());
658        {
659            let layer_set = tree.immutable_layer_set();
660            let mut merger = layer_set.merger();
661            let iter = merger.query(Query::FullScan).await.expect("create merger");
662            compact_with_iterator(
663                iter,
664                items.len(),
665                Writer::new(&handle).await,
666                handle.block_size(),
667                Option::<NoOpYielder>::None,
668            )
669            .await
670            .expect("compact failed");
671        }
672        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
673        let handle = FakeObjectHandle::new(object.clone());
674        let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
675            .await
676            .expect("open failed");
677
678        let layers = tree.layer_set();
679        let mut merger = layers.merger();
680        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
681        for i in 1..5 {
682            let ItemRef { key, value, .. } = iter.get().expect("missing item");
683            assert_eq!((key, value), (&TestKey(i..i), &i));
684            iter.advance().await.expect("advance failed");
685        }
686        assert!(iter.get().is_none());
687    }
688
689    #[fuchsia::test]
690    async fn test_find() {
691        let items = [
692            Item::new(TestKey(1..1), 1),
693            Item::new(TestKey(2..2), 2),
694            Item::new(TestKey(3..3), 3),
695            Item::new(TestKey(4..4), 4),
696        ];
697        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
698        tree.insert(items[0].clone()).expect("insert error");
699        tree.insert(items[1].clone()).expect("insert error");
700        tree.seal();
701        tree.insert(items[2].clone()).expect("insert error");
702        tree.insert(items[3].clone()).expect("insert error");
703
704        let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
705        assert_eq!(item, items[1]);
706        assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
707    }
708
709    #[fuchsia::test]
710    async fn test_find_no_return_deleted_values() {
711        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
712        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
713        tree.insert(items[0].clone()).expect("insert error");
714        tree.insert(items[1].clone()).expect("insert error");
715
716        let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
717        assert_eq!(item, items[0]);
718        assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
719    }
720
721    #[fuchsia::test]
722    async fn test_empty_seal() {
723        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
724        tree.seal();
725        let item = Item::new(TestKey(1..1), 1);
726        tree.insert(item.clone()).expect("insert error");
727        let object = Arc::new(FakeObject::new());
728        let handle = FakeObjectHandle::new(object.clone());
729        {
730            let layer_set = tree.immutable_layer_set();
731            let mut merger = layer_set.merger();
732            let iter = merger.query(Query::FullScan).await.expect("create merger");
733            compact_with_iterator(
734                iter,
735                0,
736                Writer::new(&handle).await,
737                handle.block_size(),
738                Option::<NoOpYielder>::None,
739            )
740            .await
741            .expect("compact failed");
742        }
743        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
744        let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
745        assert_eq!(found_item, item);
746        assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
747    }
748
749    #[fuchsia::test]
750    async fn test_filter() {
751        let items = [
752            Item::new(TestKey(1..1), 1),
753            Item::new(TestKey(2..2), 2),
754            Item::new(TestKey(3..3), 3),
755            Item::new(TestKey(4..4), 4),
756        ];
757        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
758        tree.insert(items[0].clone()).expect("insert error");
759        tree.insert(items[1].clone()).expect("insert error");
760        tree.insert(items[2].clone()).expect("insert error");
761        tree.insert(items[3].clone()).expect("insert error");
762
763        let layers = tree.layer_set();
764        let mut merger = layers.merger();
765
766        // Filter out odd keys (which also guarantees we skip the first key which is an edge case).
767        let mut iter = merger
768            .query(Query::FullScan)
769            .await
770            .expect("seek failed")
771            .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
772            .await
773            .expect("filter failed");
774
775        assert_eq!(iter.get(), Some(items[1].as_item_ref()));
776        iter.advance().await.expect("advance failed");
777        assert_eq!(iter.get(), Some(items[3].as_item_ref()));
778        iter.advance().await.expect("advance failed");
779        assert!(iter.get().is_none());
780    }
781
782    #[fuchsia::test]
783    async fn test_insert_order_agnostic() {
784        let items = [
785            Item::new(TestKey(1..1), 1),
786            Item::new(TestKey(2..2), 2),
787            Item::new(TestKey(3..3), 3),
788            Item::new(TestKey(4..4), 4),
789            Item::new(TestKey(5..5), 5),
790            Item::new(TestKey(6..6), 6),
791        ];
792        let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
793        for item in &items {
794            a.insert(item.clone()).expect("insert error");
795        }
796        let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
797        let mut shuffled = items.clone();
798        shuffled.shuffle(&mut rng());
799        for item in &shuffled {
800            b.insert(item.clone()).expect("insert error");
801        }
802        let layers = a.layer_set();
803        let mut merger = layers.merger();
804        let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
805        let layers = b.layer_set();
806        let mut merger = layers.merger();
807        let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");
808
809        for item in items {
810            assert_eq!(Some(item.as_item_ref()), iter_a.get());
811            assert_eq!(Some(item.as_item_ref()), iter_b.get());
812            iter_a.advance().await.expect("advance failed");
813            iter_b.advance().await.expect("advance failed");
814        }
815        assert!(iter_a.get().is_none());
816        assert!(iter_b.get().is_none());
817    }
818
819    struct AuditCacheInner<'a, V: Value> {
820        lookups: u64,
821        completions: u64,
822        invalidations: u64,
823        drops: u64,
824        result: Option<ObjectCacheResult<'a, V>>,
825    }
826
827    impl<V: Value> AuditCacheInner<'_, V> {
828        fn stats(&self) -> (u64, u64, u64, u64) {
829            (self.lookups, self.completions, self.invalidations, self.drops)
830        }
831    }
832
833    struct AuditCache<'a, V: Value> {
834        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
835    }
836
837    impl<V: Value> AuditCache<'_, V> {
838        fn new() -> Self {
839            Self {
840                inner: Arc::new(Mutex::new(AuditCacheInner {
841                    lookups: 0,
842                    completions: 0,
843                    invalidations: 0,
844                    drops: 0,
845                    result: None,
846                })),
847            }
848        }
849    }
850
851    struct AuditPlaceholder<'a, V: Value> {
852        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
853        completed: Mutex<bool>,
854    }
855
856    impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
857        fn complete(self: Box<Self>, _: Option<&V>) {
858            self.inner.lock().completions += 1;
859            *self.completed.lock() = true;
860        }
861    }
862
863    impl<V: Value> Drop for AuditPlaceholder<'_, V> {
864        fn drop(&mut self) {
865            if !*self.completed.lock() {
866                self.inner.lock().drops += 1;
867            }
868        }
869    }
870
871    impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
872        fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
873            {
874                let mut inner = self.inner.lock();
875                inner.lookups += 1;
876                if inner.result.is_some() {
877                    return std::mem::take(&mut inner.result).unwrap();
878                }
879            }
880            ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
881                inner: self.inner.clone(),
882                completed: Mutex::new(false),
883            }))
884        }
885
886        fn invalidate(&self, _key: K, _value: Option<V>) {
887            self.inner.lock().invalidations += 1;
888        }
889    }
890
891    #[fuchsia::test]
892    async fn test_cache_handling() {
893        let item = Item::new(TestKey(1..1), 1);
894        let cache = Box::new(AuditCache::new());
895        let inner = cache.inner.clone();
896        let a = LSMTree::new(emit_left_merge_fn, cache);
897
898        // Zero counters.
899        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
900
901        // Look for an item, but don't find it. So no insertion. It is dropped.
902        assert!(a.find(&item.key).await.expect("Failed find").is_none());
903        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
904
905        // Insert attempts to invalidate.
906        let _ = a.insert(item.clone());
907        assert_eq!(inner.lock().stats(), (1, 0, 1, 1));
908
909        // Look for item, find it and insert into the cache.
910        assert_eq!(
911            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
912            item.value
913        );
914        assert_eq!(inner.lock().stats(), (2, 1, 1, 1));
915
916        // Insert or replace attempts to invalidate as well.
917        a.replace_or_insert(item.clone());
918        assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
919    }
920
921    #[fuchsia::test]
922    async fn test_cache_hit() {
923        let item = Item::new(TestKey(1..1), 1);
924        let cache = Box::new(AuditCache::new());
925        let inner = cache.inner.clone();
926        let a = LSMTree::new(emit_left_merge_fn, cache);
927
928        // Zero counters.
929        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
930
931        // Insert attempts to invalidate.
932        let _ = a.insert(item.clone());
933        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));
934
935        // Set up the item to find in the cache.
936        inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));
937
938        // Look for item, find it in cache, so no insert.
939        assert_eq!(
940            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
941            item.value
942        );
943        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
944    }
945
946    #[fuchsia::test]
947    async fn test_cache_says_uncacheable() {
948        let item = Item::new(TestKey(1..1), 1);
949        let cache = Box::new(AuditCache::new());
950        let inner = cache.inner.clone();
951        let a = LSMTree::new(emit_left_merge_fn, cache);
952        let _ = a.insert(item.clone());
953
954        // One invalidation from the insert.
955        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));
956
957        // Set up the NoCache response to find in the cache.
958        inner.lock().result = Some(ObjectCacheResult::NoCache);
959
960        // Look for item, it is uncacheable, so no insert.
961        assert_eq!(
962            a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
963            item.value
964        );
965        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
966    }
967
968    struct FailLayer {
969        drop_event: Mutex<Option<Arc<DropEvent>>>,
970    }
971
972    impl FailLayer {
973        fn new() -> Self {
974            Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
975        }
976    }
977
978    #[async_trait]
979    impl<K: Key, V: Value> Layer<K, V> for FailLayer {
980        async fn seek(
981            &self,
982            _bound: std::ops::Bound<&K>,
983        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
984            Err(anyhow!("Purposely failed seek"))
985        }
986
987        fn lock(&self) -> Option<Arc<DropEvent>> {
988            self.drop_event.lock().clone()
989        }
990
991        fn len(&self) -> usize {
992            0
993        }
994
995        async fn close(&self) {
996            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
997                Some(drop_event) => drop_event.listen(),
998                None => return,
999            };
1000            listener.await;
1001        }
1002
1003        fn get_version(&self) -> Version {
1004            LATEST_VERSION
1005        }
1006
1007        async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
1008            unimplemented!();
1009        }
1010    }
1011
1012    struct MockLayer {
1013        exists_result: Existence,
1014        drop_event: Mutex<Option<Arc<DropEvent>>>,
1015    }
1016
1017    impl MockLayer {
1018        fn new(exists_result: Existence) -> Self {
1019            Self { exists_result, drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
1020        }
1021    }
1022
1023    #[async_trait]
1024    impl<K: Key, V: Value> Layer<K, V> for MockLayer {
1025        async fn seek(
1026            &self,
1027            _bound: std::ops::Bound<&K>,
1028        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
1029            unimplemented!()
1030        }
1031
1032        fn lock(&self) -> Option<Arc<DropEvent>> {
1033            self.drop_event.lock().clone()
1034        }
1035
1036        fn len(&self) -> usize {
1037            0
1038        }
1039
1040        async fn close(&self) {
1041            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
1042                Some(drop_event) => drop_event.listen(),
1043                None => return,
1044            };
1045            listener.await;
1046        }
1047
1048        fn get_version(&self) -> Version {
1049            LATEST_VERSION
1050        }
1051
1052        async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
1053            Ok(self.exists_result)
1054        }
1055    }
1056
1057    #[fuchsia::test]
1058    async fn test_layer_set_key_exists() {
1059        use super::LockedLayer;
1060
1061        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
1062        let mut layer_set = tree.empty_layer_set();
1063
1064        // Empty layer set should return Missing.
1065        assert_eq!(
1066            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
1067            Existence::Missing
1068        );
1069
1070        // Add a layer that returns Missing.
1071        layer_set.layers.push(LockedLayer::from(
1072            Arc::new(MockLayer::new(Existence::Missing)) as Arc<dyn Layer<TestKey, u64>>
1073        ));
1074        assert_eq!(
1075            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
1076            Existence::Missing
1077        );
1078
1079        // Add a layer that returns MaybeExists.
1080        layer_set.layers.push(LockedLayer::from(
1081            Arc::new(MockLayer::new(Existence::MaybeExists)) as Arc<dyn Layer<TestKey, u64>>
1082        ));
1083        assert_eq!(
1084            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
1085            Existence::MaybeExists
1086        );
1087
1088        // Add a layer that returns Exists.
1089        layer_set.layers.insert(
1090            0,
1091            LockedLayer::from(
1092                Arc::new(MockLayer::new(Existence::Exists)) as Arc<dyn Layer<TestKey, u64>>
1093            ),
1094        );
1095        assert_eq!(
1096            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
1097            Existence::Exists
1098        );
1099    }
1100
1101    #[fuchsia::test]
1102    async fn test_failed_lookup() {
1103        let cache = Box::new(AuditCache::new());
1104        let inner = cache.inner.clone();
1105        let a = LSMTree::new(emit_left_merge_fn, cache);
1106        a.set_layers(vec![Arc::new(FailLayer::new())]);
1107
1108        // Zero counters.
1109        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
1110
1111        // Lookup should fail and drop the placeholder.
1112        assert!(a.find(&TestKey(1..1)).await.is_err());
1113        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
1114    }
1115}
1116
#[cfg(fuzz)]
mod fuzz {
    use crate::lsm_tree::types::{Item, Value};
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use arbitrary::Arbitrary;

    use fuzz::fuzz;

    use super::testing::TestKey;

    // Give `u64` the versioning and `Value` impls needed to use it as the fuzzed value type.
    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    impl Value for u64 {
        // Zero acts as the deletion marker for fuzzed values.
        const DELETED_MARKER: Self = 0;
    }

    // One operation applied to the tree under test; a fuzz input decodes into a list of these.
    // NOTE(review): reordering variants presumably changes how existing corpus inputs decode via
    // the derived `Arbitrary` impl, so keep the order stable.
    //
    // Note: This code isn't really dead. It's used below in
    // `fuzz_lsm_tree_actions`. However, the `#[fuzz]` proc macro attribute
    // obfuscates the usage enough to confuse the compiler.
    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        Insert(Item<TestKey, u64>),
        ReplaceOrInsert(Item<TestKey, u64>),
        MergeInto(Item<TestKey, u64>, TestKey),
        Find(TestKey),
        Seal,
    }

    // Replays a fuzzer-generated sequence of actions against a fresh in-memory tree.
    #[fuzz]
    fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
        use super::LSMTree;
        use super::cache::NullCache;
        use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
        use futures::executor::block_on;

        // Merge function that resolves every conflict by emitting the left item.
        fn emit_left_merge_fn(
            _left: &MergeLayerIterator<'_, TestKey, u64>,
            _right: &MergeLayerIterator<'_, TestKey, u64>,
        ) -> MergeResult<TestKey, u64> {
            MergeResult::EmitLeft
        }

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for step in actions {
            match step {
                FuzzAction::Seal => tree.seal(),
                FuzzAction::Find(wanted) => {
                    // Only a hard error counts as a failure; presence or absence is irrelevant.
                    block_on(tree.find(&wanted)).expect("find failed");
                }
                FuzzAction::MergeInto(new_item, bound) => tree.merge_into(new_item, &bound),
                FuzzAction::ReplaceOrInsert(new_item) => {
                    tree.replace_or_insert(new_item);
                }
                FuzzAction::Insert(new_item) => {
                    // Insert errors are deliberately ignored (presumably duplicate keys produced
                    // by the fuzzer).
                    let _ = tree.insert(new_item);
                }
            };
        }
    }
}