Skip to main content

fxfs/
lsm_tree.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
//! # End-Key Indexing for File Extents
//!
//! Fxfs indexes file extents by their **end offset** because doing so avoids expensive reverse
//! iteration (`Prev()`) during lookups in the LSM tree.
//!
//! If we indexed by start offset, a `Seek(>= X)` would return the first extent starting at or
//! *after* X, so unless an extent happened to start exactly at X, a `Prev()` would be required to
//! find the extent actually containing X.
//!
//! By indexing by (exclusive) end offset, a `Seek(>= X + 1)` returns the first extent ending
//! *after* X.  Since extents do not overlap, this is the only extent that could possibly contain X
//! (it does so exactly when its start is <= X), eliminating the need for backtracking.
16
17mod bloom_filter;
18pub mod cache;
19pub mod merge;
20pub mod persistent_layer;
21pub mod skip_list_layer;
22pub mod types;
23
24#[cfg(any(test, fuzz))]
25pub mod testing;
26
27use crate::drop_event::DropEvent;
28use crate::log::*;
29use crate::metrics::DurationMeasureScope;
30use crate::object_handle::{ReadObjectHandle, WriteBytes};
31use crate::serialized_types::{LATEST_VERSION, Version};
32
33use anyhow::Error;
34use cache::{ObjectCache, ObjectCacheResult};
35
36use fuchsia_inspect::HistogramProperty;
37use fuchsia_sync::RwLock;
38use persistent_layer::{PersistentLayer, PersistentLayerWriter};
39use skip_list_layer::SkipListLayer;
40use std::fmt;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::sync::{Arc, Mutex};
43use types::{
44    Existence, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey,
45    OrdLowerBound, Value,
46};
47
48pub use merge::Query;
49
50const SKIP_LIST_LAYER_ITEMS: usize = 512;
51
52// For serialization.
53pub use persistent_layer::{
54    LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
55    LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
56};
57
58pub async fn layers_from_handles<K: Key, V: Value>(
59    handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
60) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
61    let mut layers = Vec::new();
62    for handle in handles {
63        layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
64    }
65    Ok(layers)
66}
67
68#[derive(Eq, PartialEq, Debug)]
69pub enum Operation {
70    Insert,
71    ReplaceOrInsert,
72    MergeInto,
73}
74
75pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;
76
77struct Inner<K, V> {
78    mutable_layer: Arc<SkipListLayer<K, V>>,
79    layers: Vec<Arc<dyn Layer<K, V>>>,
80    mutation_callback: MutationCallback<K, V>,
81}
82
83pub const LOG2_HISTOGRAM_BUCKETS: usize = 32;
84
85/// Metrics related to LSM tree churn, layer depths, and compaction performance.
86pub struct CompactionCounters {
87    /// Total number of compaction events that merged layers together.
88    pub compactions: u64,
89    /// Total bytes written during compaction, useful for measuring write amplification.
90    pub compaction_bytes_written: u64,
91    /// Total duration spent compacting layers.
92    pub compaction_time_ns: u64,
93    /// Number of mutable layers sealed. Useful for measuring churn.
94    pub total_layers_added: u64,
95    /// The maximum depth of the LSM tree observed. An indicator of worst-case read amplification.
96    pub max_layer_count: u64,
97    /// Log2 histogram of the sizes of newly compacted layers, used to verify compaction heuristics.
98    pub layer_size_histogram: [u64; LOG2_HISTOGRAM_BUCKETS],
99}
100
101impl Default for CompactionCounters {
102    fn default() -> Self {
103        Self {
104            compactions: 0,
105            compaction_bytes_written: 0,
106            compaction_time_ns: 0,
107            total_layers_added: 0,
108            max_layer_count: 0,
109            layer_size_histogram: [0; LOG2_HISTOGRAM_BUCKETS],
110        }
111    }
112}
113
114/// Global counters and metrics for an LSM tree's lifetime.
115pub struct TreeCounters {
116    /// Number of individual key-lookup attempts (reads) through the tree.
117    pub num_seeks: AtomicUsize,
118    /// Tracks the number of layer files we might have looked at across all seeks.
119    /// Used alongside `layer_files_skipped` to compute the effectiveness of bloom filters.
120    pub layer_files_total: AtomicUsize,
121    /// Tracks how many layer files we skipped searching thanks to the bloom filter rejecting them.
122    pub layer_files_skipped: AtomicUsize,
123    /// Embedded counters for mutable metrics that require locking.
124    pub compaction: Mutex<CompactionCounters>,
125}
126
127impl Default for TreeCounters {
128    fn default() -> Self {
129        Self {
130            num_seeks: AtomicUsize::new(0),
131            layer_files_total: AtomicUsize::new(0),
132            layer_files_skipped: AtomicUsize::new(0),
133            compaction: Mutex::new(CompactionCounters::default()),
134        }
135    }
136}
137
138/// Writes the items yielded by the iterator into the supplied object.
139#[fxfs_trace::trace]
140pub async fn compact_with_iterator<K: Key, V: Value, W: WriteBytes + Send>(
141    mut iterator: impl LayerIterator<K, V>,
142    num_items: usize,
143    writer: W,
144    block_size: u64,
145    mut yielder: Option<impl Yielder>,
146) -> Result<u64, Error> {
147    let mut writer = PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
148    while let Some(item_ref) = iterator.get() {
149        debug!(item_ref:?; "compact: writing");
150        writer.write(item_ref).await?;
151        iterator.advance().await?;
152        if let Some(y) = yielder.as_mut() {
153            y.yield_now().await;
154        }
155    }
156    writer.complete().await
157}
158
159/// LSMTree manages a tree of layers to provide a key/value store.  Each layer contains deltas on
160/// the preceding layer.  The top layer is an in-memory mutable layer.  Layers can be compacted to
161/// form a new combined layer.
162pub struct LSMTree<K, V> {
163    data: RwLock<Inner<K, V>>,
164    merge_fn: merge::MergeFn<K, V>,
165    cache: Box<dyn ObjectCache<K, V>>,
166    counters: Arc<TreeCounters>,
167}
168
169#[fxfs_trace::trace]
170impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
171    /// Creates a new empty tree.
172    pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
173        let counters = TreeCounters::default();
174        counters.compaction.lock().unwrap().max_layer_count = 1;
175        LSMTree {
176            data: RwLock::new(Inner {
177                mutable_layer: Self::new_mutable_layer(),
178                layers: Vec::new(),
179                mutation_callback: None,
180            }),
181            merge_fn,
182            cache,
183            counters: Arc::new(counters),
184        }
185    }
186
187    /// Opens an existing tree from the provided handles to the layer objects.
188    pub async fn open(
189        merge_fn: merge::MergeFn<K, V>,
190        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
191        cache: Box<dyn ObjectCache<K, V>>,
192    ) -> Result<Self, Error> {
193        let layers = layers_from_handles(handles).await?;
194        let max_layer_count = layers.len() as u64 + 1;
195        let counters = TreeCounters::default();
196        counters.compaction.lock().unwrap().max_layer_count = max_layer_count;
197        Ok(LSMTree {
198            data: RwLock::new(Inner {
199                mutable_layer: Self::new_mutable_layer(),
200                layers,
201                mutation_callback: None,
202            }),
203            merge_fn,
204            cache,
205            counters: Arc::new(counters),
206        })
207    }
208
209    /// Replaces the immutable layers.
210    pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
211        let mut data = self.data.write();
212        data.layers = layers;
213        let layer_count = data.layers.len() + 1;
214        let mut counters = self.counters.compaction.lock().unwrap();
215        counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
216    }
217
218    /// Appends to the given layers at the end i.e. they should be base layers.  This is supposed
219    /// to be used after replay when we are opening a tree and we have discovered the base layers.
220    pub async fn append_layers(
221        &self,
222        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
223    ) -> Result<(), Error> {
224        let mut layers = layers_from_handles(handles).await?;
225        let mut data = self.data.write();
226        data.layers.append(&mut layers);
227        let layer_count = data.layers.len() + 1;
228        let mut counters = self.counters.compaction.lock().unwrap();
229        counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
230        Ok(())
231    }
232
233    /// Resets the immutable layers.
234    pub fn reset_immutable_layers(&self) {
235        self.data.write().layers = Vec::new();
236    }
237
238    /// Seals the current mutable layer and creates a new one.
239    pub fn seal(&self) {
240        // We need to be sure there are no mutations currently in-progress.  This is currently
241        // guaranteed by ensuring that all mutations take a read lock on `data`.
242        let mut data = self.data.write();
243        let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
244        data.layers.insert(0, layer);
245        let layer_count = data.layers.len() + 1;
246        let mut counters = self.counters.compaction.lock().unwrap();
247        counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
248        counters.total_layers_added += 1;
249    }
250
251    /// Resets the tree to an empty state.
252    pub fn reset(&self) {
253        let mut data = self.data.write();
254        data.layers = Vec::new();
255        data.mutable_layer = Self::new_mutable_layer();
256    }
257
258    pub fn report_compaction_metrics(
259        &self,
260        bytes_written: u64,
261        duration: std::time::Duration,
262        layer_count: usize,
263    ) {
264        let mut counters = self.counters.compaction.lock().unwrap();
265        counters.compactions += 1;
266        counters.compaction_bytes_written += bytes_written;
267        counters.compaction_time_ns += duration.as_nanos() as u64;
268
269        let bucket = if bytes_written == 0 {
270            0
271        } else {
272            std::cmp::min(LOG2_HISTOGRAM_BUCKETS - 1, 63 - bytes_written.leading_zeros() as usize)
273        };
274        counters.layer_size_histogram[bucket] += 1;
275
276        crate::metrics::lsm_tree_metrics().compaction_layer_stack_depth.insert(layer_count as u64);
277    }
278
279    pub fn compaction_bytes_written(&self) -> u64 {
280        self.counters.compaction.lock().unwrap().compaction_bytes_written
281    }
282
283    /// Returns an empty layer-set for this tree.
284    pub fn empty_layer_set(&self) -> LayerSet<K, V> {
285        LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
286    }
287
288    /// Adds all the layers (including the mutable layer) to `layer_set`.
289    pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
290        let data = self.data.read();
291        layer_set.layers.reserve_exact(data.layers.len() + 1);
292        layer_set
293            .layers
294            .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
295        for layer in &data.layers {
296            layer_set.layers.push(layer.clone().into());
297        }
298    }
299
300    /// Returns a clone of the current set of layers (including the mutable layer), after which one
301    /// can get an iterator.
302    pub fn layer_set(&self) -> LayerSet<K, V> {
303        let mut layer_set = self.empty_layer_set();
304        self.add_all_layers_to_layer_set(&mut layer_set);
305        layer_set
306    }
307
308    /// Returns the current set of immutable layers after which one can get an iterator (for e.g.
309    /// compacting).  Since these layers are immutable, getting an iterator should not block
310    /// anything else.
311    pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
312        let data = self.data.read();
313        let mut layers = Vec::with_capacity(data.layers.len());
314        for layer in &data.layers {
315            layers.push(layer.clone().into());
316        }
317        LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
318    }
319
320    /// Inserts an item into the mutable layer.
321    /// Returns error if item already exists.
322    pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
323        let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().insert);
324
325        let key = item.key.clone();
326        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
327        {
328            // `seal` below relies on us holding a read lock whilst we do the mutation.
329            let data = self.data.read();
330            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
331                mutation_callback(Operation::Insert, &item);
332            }
333            data.mutable_layer.insert(item)?;
334        }
335        self.cache.invalidate(key, val);
336        Ok(())
337    }
338
339    /// Replaces or inserts an item into the mutable layer.
340    pub fn replace_or_insert(&self, item: Item<K, V>) {
341        let _measure =
342            DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().replace_or_insert);
343
344        let key = item.key.clone();
345        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
346        {
347            // `seal` below relies on us holding a read lock whilst we do the mutation.
348            let data = self.data.read();
349            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
350                mutation_callback(Operation::ReplaceOrInsert, &item);
351            }
352            data.mutable_layer.replace_or_insert(item);
353        }
354        self.cache.invalidate(key, val);
355    }
356
357    /// Merges the given item into the mutable layer.
358    pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
359        let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().merge_into);
360
361        let key = item.key.clone();
362        {
363            // `seal` below relies on us holding a read lock whilst we do the mutation.
364            let data = self.data.read();
365            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
366                mutation_callback(Operation::MergeInto, &item);
367            }
368            data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
369        }
370        self.cache.invalidate(key, None);
371    }
372
373    /// Searches for an exact match for the given key. If the value is equal to
374    /// `Value::DELETED_MARKER` the item is considered missing and will not be returned.
375    pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
376    where
377        K: Eq,
378    {
379        let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().find);
380        // It is important that the cache lookup is done prior to fetching the layer set as the
381        // placeholder returned acts as a sort of lock for the validity of the item that may be
382        // inserted later via that placeholder.
383        let token = match self.cache.lookup_or_reserve(search_key) {
384            ObjectCacheResult::Value(value) => {
385                if value == V::DELETED_MARKER {
386                    return Ok(None);
387                } else {
388                    return Ok(Some(Item::new(search_key.clone(), value)));
389                }
390            }
391            ObjectCacheResult::Placeholder(token) => Some(token),
392            ObjectCacheResult::NoCache => None,
393        };
394        let layer_set = self.layer_set();
395        let mut merger = layer_set.merger();
396
397        Ok(match merger.query(Query::Point(search_key)).await?.get() {
398            Some(ItemRef { key, value }) if key == search_key && *value != V::DELETED_MARKER => {
399                if let Some(token) = token {
400                    token.complete(Some(value));
401                }
402                Some(Item { key: key.clone(), value: value.clone() })
403            }
404            _ => None,
405        })
406    }
407
408    pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
409        self.data.read().mutable_layer.clone()
410    }
411
412    /// Sets a mutation callback which is a callback that is triggered whenever any mutations are
413    /// applied to the tree.  This might be useful for tests that want to record the precise
414    /// sequence of mutations that are applied to the tree.
415    pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
416        self.data.write().mutation_callback = mutation_callback;
417    }
418
419    /// Returns the earliest version used by a layer in the tree.
420    pub fn get_earliest_version(&self) -> Version {
421        let mut earliest_version = LATEST_VERSION;
422        for layer in self.layer_set().layers {
423            let layer_version = layer.get_version();
424            if layer_version < earliest_version {
425                earliest_version = layer_version;
426            }
427        }
428        return earliest_version;
429    }
430
431    /// Returns a new mutable layer.
432    pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
433        SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
434    }
435
436    /// Replaces the mutable layer.
437    pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
438        self.data.write().mutable_layer = layer;
439    }
440
441    /// Records inspect data for the LSM tree into `node`.  Called lazily when inspect is queried.
442    pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
443        let layer_set = self.layer_set();
444        root.record_child("layers", move |node| {
445            let mut index = 0;
446            for layer in layer_set.layers {
447                node.record_child(format!("{index}"), move |node| {
448                    layer.1.record_inspect_data(node)
449                });
450                index += 1;
451            }
452        });
453        {
454            let counters = self.counters.compaction.lock().unwrap();
455            root.record_uint("num_seeks", self.counters.num_seeks.load(Ordering::Relaxed) as u64);
456            root.record_uint("bloom_filter_success_percent", {
457                let layer_files_total = self.counters.layer_files_total.load(Ordering::Relaxed);
458                let layer_files_skipped = self.counters.layer_files_skipped.load(Ordering::Relaxed);
459                if layer_files_total == 0 {
460                    0
461                } else {
462                    (layer_files_skipped * 100).div_ceil(layer_files_total) as u64
463                }
464            });
465            root.record_uint("compactions", counters.compactions);
466            root.record_uint("compaction_bytes_written", counters.compaction_bytes_written);
467            root.record_uint("compaction_time_ns", counters.compaction_time_ns);
468            root.record_uint("total_layers_added", counters.total_layers_added);
469            root.record_uint("max_layer_count", counters.max_layer_count);
470
471            let layer_sizes = root.create_uint_exponential_histogram(
472                "layer_size_histogram_log2",
473                fuchsia_inspect::ExponentialHistogramParams {
474                    floor: 1,
475                    initial_step: 1,
476                    step_multiplier: 2,
477                    buckets: LOG2_HISTOGRAM_BUCKETS,
478                },
479            );
480            for (i, count) in counters.layer_size_histogram.iter().enumerate() {
481                layer_sizes.insert_multiple(1u64 << i, *count as usize);
482            }
483            root.record(layer_sizes);
484        }
485    }
486}
487
488/// This is an RAII wrapper for a layer which holds a lock on the layer (via the Layer::lock
489/// method).
490pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);
491
492impl<K, V> LockedLayer<K, V> {
493    pub async fn close_layer(self) {
494        let layer = self.1;
495        std::mem::drop(self.0);
496        layer.close().await;
497    }
498}
499
500impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
501    fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
502        let event = layer.lock().unwrap();
503        Self(event, layer)
504    }
505}
506
507impl<K, V> std::ops::Deref for LockedLayer<K, V> {
508    type Target = Arc<dyn Layer<K, V>>;
509
510    fn deref(&self) -> &Self::Target {
511        &self.1
512    }
513}
514
515impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
516    fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
517        self.1.as_ref()
518    }
519}
520
521/// A LayerSet provides a snapshot of the layers at a particular point in time, and allows you to
522/// get an iterator.  Iterators borrow the layers so something needs to hold reference count.
523pub struct LayerSet<K, V> {
524    pub layers: Vec<LockedLayer<K, V>>,
525    merge_fn: merge::MergeFn<K, V>,
526    counters: Arc<TreeCounters>,
527}
528
529impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
530    pub fn sum_len(&self) -> usize {
531        let mut size = 0;
532        for layer in &self.layers {
533            size += layer.len()
534        }
535        size
536    }
537
538    pub fn merger(&self) -> merge::Merger<'_, K, V> {
539        merge::Merger::new(
540            self.layers.iter().map(|x| x.as_ref()),
541            self.merge_fn,
542            self.counters.clone(),
543        )
544    }
545
546    /// See `Layer::key_exists`.
547    pub async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
548        for l in &self.layers {
549            match l.key_exists(key).await? {
550                e @ (Existence::Exists | Existence::MaybeExists) => return Ok(e),
551                _ => {}
552            }
553        }
554        Ok(Existence::Missing)
555    }
556}
557
558impl<K, V> fmt::Debug for LayerSet<K, V> {
559    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
560        fmt.debug_list()
561            .entries(self.layers.iter().map(|l| {
562                if let Some(handle) = l.handle() {
563                    format!("{}", handle.object_id())
564                } else {
565                    format!("{:?}", Arc::as_ptr(l))
566                }
567            }))
568            .finish()
569    }
570}
571
572/// A yielder can be used during compactions which are low priority.
573pub trait Yielder: Send {
574    fn yield_now(&mut self) -> impl Future<Output = ()> + Send;
575}
576
577#[cfg(test)]
578mod tests {
579    use super::{LSMTree, Yielder, compact_with_iterator};
580    use crate::drop_event::DropEvent;
581    use crate::lsm_tree::cache::{
582        NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
583    };
584    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
585    use crate::lsm_tree::types::{
586        BoxedLayerIterator, Existence, Item, ItemRef, Key, Layer, LayerIterator, Value,
587    };
588    use crate::lsm_tree::{Query, layers_from_handles};
589    use crate::object_handle::ObjectHandle;
590    use crate::serialized_types::{LATEST_VERSION, Version};
591    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
592    use crate::testing::writer::Writer;
593    use anyhow::{Error, anyhow};
594    use async_trait::async_trait;
595
596    use fuchsia_sync::Mutex;
597
598    use rand::rng;
599    use rand::seq::SliceRandom;
600
601    use std::sync::Arc;
602
603    use super::testing::TestKey;
604
605    fn emit_left_merge_fn(
606        _left: &MergeLayerIterator<'_, TestKey, u64>,
607        _right: &MergeLayerIterator<'_, TestKey, u64>,
608    ) -> MergeResult<TestKey, u64> {
609        MergeResult::EmitLeft
610    }
611
612    impl Value for u64 {
613        const DELETED_MARKER: Self = 0;
614    }
615
616    struct NoOpYielder;
617    impl Yielder for NoOpYielder {
618        async fn yield_now(&mut self) {}
619    }
620
621    #[fuchsia::test]
622    async fn test_iteration() {
623        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
624        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
625        tree.insert(items[0].clone()).expect("insert error");
626        tree.insert(items[1].clone()).expect("insert error");
627        let layers = tree.layer_set();
628        let mut merger = layers.merger();
629        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
630        let ItemRef { key, value, .. } = iter.get().expect("missing item");
631        assert_eq!((key, value), (&items[0].key, &items[0].value));
632        iter.advance().await.expect("advance failed");
633        let ItemRef { key, value, .. } = iter.get().expect("missing item");
634        assert_eq!((key, value), (&items[1].key, &items[1].value));
635        iter.advance().await.expect("advance failed");
636        assert!(iter.get().is_none());
637    }
638
639    #[fuchsia::test]
640    async fn test_compact() {
641        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
642        let items = [
643            Item::new(TestKey(1..1), 1),
644            Item::new(TestKey(2..2), 2),
645            Item::new(TestKey(3..3), 3),
646            Item::new(TestKey(4..4), 4),
647        ];
648        tree.insert(items[0].clone()).expect("insert error");
649        tree.insert(items[1].clone()).expect("insert error");
650        tree.seal();
651        tree.insert(items[2].clone()).expect("insert error");
652        tree.insert(items[3].clone()).expect("insert error");
653        tree.seal();
654        let object = Arc::new(FakeObject::new());
655        let handle = FakeObjectHandle::new(object.clone());
656        {
657            let layer_set = tree.immutable_layer_set();
658            let mut merger = layer_set.merger();
659            let iter = merger.query(Query::FullScan).await.expect("create merger");
660            compact_with_iterator(
661                iter,
662                items.len(),
663                Writer::new(&handle).await,
664                handle.block_size(),
665                Option::<NoOpYielder>::None,
666            )
667            .await
668            .expect("compact failed");
669        }
670        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
671        let handle = FakeObjectHandle::new(object.clone());
672        let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
673            .await
674            .expect("open failed");
675
676        let layers = tree.layer_set();
677        let mut merger = layers.merger();
678        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
679        for i in 1..5 {
680            let ItemRef { key, value, .. } = iter.get().expect("missing item");
681            assert_eq!((key, value), (&TestKey(i..i), &i));
682            iter.advance().await.expect("advance failed");
683        }
684        assert!(iter.get().is_none());
685    }
686
687    #[fuchsia::test]
688    async fn test_find() {
689        let items = [
690            Item::new(TestKey(1..1), 1),
691            Item::new(TestKey(2..2), 2),
692            Item::new(TestKey(3..3), 3),
693            Item::new(TestKey(4..4), 4),
694        ];
695        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
696        tree.insert(items[0].clone()).expect("insert error");
697        tree.insert(items[1].clone()).expect("insert error");
698        tree.seal();
699        tree.insert(items[2].clone()).expect("insert error");
700        tree.insert(items[3].clone()).expect("insert error");
701
702        let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
703        assert_eq!(item, items[1]);
704        assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
705    }
706
707    #[fuchsia::test]
708    async fn test_find_no_return_deleted_values() {
709        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
710        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
711        tree.insert(items[0].clone()).expect("insert error");
712        tree.insert(items[1].clone()).expect("insert error");
713
714        let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
715        assert_eq!(item, items[0]);
716        assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
717    }
718
719    #[fuchsia::test]
720    async fn test_empty_seal() {
721        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
722        tree.seal();
723        let item = Item::new(TestKey(1..1), 1);
724        tree.insert(item.clone()).expect("insert error");
725        let object = Arc::new(FakeObject::new());
726        let handle = FakeObjectHandle::new(object.clone());
727        {
728            let layer_set = tree.immutable_layer_set();
729            let mut merger = layer_set.merger();
730            let iter = merger.query(Query::FullScan).await.expect("create merger");
731            compact_with_iterator(
732                iter,
733                0,
734                Writer::new(&handle).await,
735                handle.block_size(),
736                Option::<NoOpYielder>::None,
737            )
738            .await
739            .expect("compact failed");
740        }
741        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
742        let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
743        assert_eq!(found_item, item);
744        assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
745    }
746
747    #[fuchsia::test]
748    async fn test_filter() {
749        let items = [
750            Item::new(TestKey(1..1), 1),
751            Item::new(TestKey(2..2), 2),
752            Item::new(TestKey(3..3), 3),
753            Item::new(TestKey(4..4), 4),
754        ];
755        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
756        tree.insert(items[0].clone()).expect("insert error");
757        tree.insert(items[1].clone()).expect("insert error");
758        tree.insert(items[2].clone()).expect("insert error");
759        tree.insert(items[3].clone()).expect("insert error");
760
761        let layers = tree.layer_set();
762        let mut merger = layers.merger();
763
764        // Filter out odd keys (which also guarantees we skip the first key which is an edge case).
765        let mut iter = merger
766            .query(Query::FullScan)
767            .await
768            .expect("seek failed")
769            .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
770            .await
771            .expect("filter failed");
772
773        assert_eq!(iter.get(), Some(items[1].as_item_ref()));
774        iter.advance().await.expect("advance failed");
775        assert_eq!(iter.get(), Some(items[3].as_item_ref()));
776        iter.advance().await.expect("advance failed");
777        assert!(iter.get().is_none());
778    }
779
780    #[fuchsia::test]
781    async fn test_insert_order_agnostic() {
782        let items = [
783            Item::new(TestKey(1..1), 1),
784            Item::new(TestKey(2..2), 2),
785            Item::new(TestKey(3..3), 3),
786            Item::new(TestKey(4..4), 4),
787            Item::new(TestKey(5..5), 5),
788            Item::new(TestKey(6..6), 6),
789        ];
790        let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
791        for item in &items {
792            a.insert(item.clone()).expect("insert error");
793        }
794        let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
795        let mut shuffled = items.clone();
796        shuffled.shuffle(&mut rng());
797        for item in &shuffled {
798            b.insert(item.clone()).expect("insert error");
799        }
800        let layers = a.layer_set();
801        let mut merger = layers.merger();
802        let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
803        let layers = b.layer_set();
804        let mut merger = layers.merger();
805        let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");
806
807        for item in items {
808            assert_eq!(Some(item.as_item_ref()), iter_a.get());
809            assert_eq!(Some(item.as_item_ref()), iter_b.get());
810            iter_a.advance().await.expect("advance failed");
811            iter_b.advance().await.expect("advance failed");
812        }
813        assert!(iter_a.get().is_none());
814        assert!(iter_b.get().is_none());
815    }
816
817    struct AuditCacheInner<'a, V: Value> {
818        lookups: u64,
819        completions: u64,
820        invalidations: u64,
821        drops: u64,
822        result: Option<ObjectCacheResult<'a, V>>,
823    }
824
825    impl<V: Value> AuditCacheInner<'_, V> {
826        fn stats(&self) -> (u64, u64, u64, u64) {
827            (self.lookups, self.completions, self.invalidations, self.drops)
828        }
829    }
830
831    struct AuditCache<'a, V: Value> {
832        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
833    }
834
835    impl<V: Value> AuditCache<'_, V> {
836        fn new() -> Self {
837            Self {
838                inner: Arc::new(Mutex::new(AuditCacheInner {
839                    lookups: 0,
840                    completions: 0,
841                    invalidations: 0,
842                    drops: 0,
843                    result: None,
844                })),
845            }
846        }
847    }
848
849    struct AuditPlaceholder<'a, V: Value> {
850        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
851        completed: Mutex<bool>,
852    }
853
854    impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
855        fn complete(self: Box<Self>, _: Option<&V>) {
856            self.inner.lock().completions += 1;
857            *self.completed.lock() = true;
858        }
859    }
860
861    impl<V: Value> Drop for AuditPlaceholder<'_, V> {
862        fn drop(&mut self) {
863            if !*self.completed.lock() {
864                self.inner.lock().drops += 1;
865            }
866        }
867    }
868
869    impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
870        fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
871            {
872                let mut inner = self.inner.lock();
873                inner.lookups += 1;
874                if inner.result.is_some() {
875                    return std::mem::take(&mut inner.result).unwrap();
876                }
877            }
878            ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
879                inner: self.inner.clone(),
880                completed: Mutex::new(false),
881            }))
882        }
883
884        fn invalidate(&self, _key: K, _value: Option<V>) {
885            self.inner.lock().invalidations += 1;
886        }
887    }
888
889    #[fuchsia::test]
890    async fn test_cache_handling() {
891        let item = Item::new(TestKey(1..1), 1);
892        let cache = Box::new(AuditCache::new());
893        let inner = cache.inner.clone();
894        let a = LSMTree::new(emit_left_merge_fn, cache);
895
896        // Zero counters.
897        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
898
899        // Look for an item, but don't find it. So no insertion. It is dropped.
900        assert!(a.find(&item.key).await.expect("Failed find").is_none());
901        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
902
903        // Insert attempts to invalidate.
904        let _ = a.insert(item.clone());
905        assert_eq!(inner.lock().stats(), (1, 0, 1, 1));
906
907        // Look for item, find it and insert into the cache.
908        assert_eq!(
909            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
910            item.value
911        );
912        assert_eq!(inner.lock().stats(), (2, 1, 1, 1));
913
914        // Insert or replace attempts to invalidate as well.
915        a.replace_or_insert(item.clone());
916        assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
917    }
918
919    #[fuchsia::test]
920    async fn test_cache_hit() {
921        let item = Item::new(TestKey(1..1), 1);
922        let cache = Box::new(AuditCache::new());
923        let inner = cache.inner.clone();
924        let a = LSMTree::new(emit_left_merge_fn, cache);
925
926        // Zero counters.
927        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
928
929        // Insert attempts to invalidate.
930        let _ = a.insert(item.clone());
931        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));
932
933        // Set up the item to find in the cache.
934        inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));
935
936        // Look for item, find it in cache, so no insert.
937        assert_eq!(
938            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
939            item.value
940        );
941        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
942    }
943
944    #[fuchsia::test]
945    async fn test_cache_says_uncacheable() {
946        let item = Item::new(TestKey(1..1), 1);
947        let cache = Box::new(AuditCache::new());
948        let inner = cache.inner.clone();
949        let a = LSMTree::new(emit_left_merge_fn, cache);
950        let _ = a.insert(item.clone());
951
952        // One invalidation from the insert.
953        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));
954
955        // Set up the NoCache response to find in the cache.
956        inner.lock().result = Some(ObjectCacheResult::NoCache);
957
958        // Look for item, it is uncacheable, so no insert.
959        assert_eq!(
960            a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
961            item.value
962        );
963        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
964    }
965
966    struct FailLayer {
967        drop_event: Mutex<Option<Arc<DropEvent>>>,
968    }
969
970    impl FailLayer {
971        fn new() -> Self {
972            Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
973        }
974    }
975
976    #[async_trait]
977    impl<K: Key, V: Value> Layer<K, V> for FailLayer {
978        async fn seek(
979            &self,
980            _bound: std::ops::Bound<&K>,
981        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
982            Err(anyhow!("Purposely failed seek"))
983        }
984
985        fn lock(&self) -> Option<Arc<DropEvent>> {
986            self.drop_event.lock().clone()
987        }
988
989        fn len(&self) -> usize {
990            0
991        }
992
993        async fn close(&self) {
994            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
995                Some(drop_event) => drop_event.listen(),
996                None => return,
997            };
998            listener.await;
999        }
1000
1001        fn get_version(&self) -> Version {
1002            LATEST_VERSION
1003        }
1004
1005        async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
1006            unimplemented!();
1007        }
1008    }
1009
1010    struct MockLayer {
1011        exists_result: Existence,
1012        drop_event: Mutex<Option<Arc<DropEvent>>>,
1013    }
1014
1015    impl MockLayer {
1016        fn new(exists_result: Existence) -> Self {
1017            Self { exists_result, drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
1018        }
1019    }
1020
1021    #[async_trait]
1022    impl<K: Key, V: Value> Layer<K, V> for MockLayer {
1023        async fn seek(
1024            &self,
1025            _bound: std::ops::Bound<&K>,
1026        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
1027            unimplemented!()
1028        }
1029
1030        fn lock(&self) -> Option<Arc<DropEvent>> {
1031            self.drop_event.lock().clone()
1032        }
1033
1034        fn len(&self) -> usize {
1035            0
1036        }
1037
1038        async fn close(&self) {
1039            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
1040                Some(drop_event) => drop_event.listen(),
1041                None => return,
1042            };
1043            listener.await;
1044        }
1045
1046        fn get_version(&self) -> Version {
1047            LATEST_VERSION
1048        }
1049
1050        async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
1051            Ok(self.exists_result)
1052        }
1053    }
1054
1055    #[fuchsia::test]
1056    async fn test_layer_set_key_exists() {
1057        use super::LockedLayer;
1058
1059        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
1060        let mut layer_set = tree.empty_layer_set();
1061
1062        // Empty layer set should return Missing.
1063        assert_eq!(
1064            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
1065            Existence::Missing
1066        );
1067
1068        // Add a layer that returns Missing.
1069        layer_set.layers.push(LockedLayer::from(
1070            Arc::new(MockLayer::new(Existence::Missing)) as Arc<dyn Layer<TestKey, u64>>
1071        ));
1072        assert_eq!(
1073            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
1074            Existence::Missing
1075        );
1076
1077        // Add a layer that returns MaybeExists.
1078        layer_set.layers.push(LockedLayer::from(
1079            Arc::new(MockLayer::new(Existence::MaybeExists)) as Arc<dyn Layer<TestKey, u64>>
1080        ));
1081        assert_eq!(
1082            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
1083            Existence::MaybeExists
1084        );
1085
1086        // Add a layer that returns Exists.
1087        layer_set.layers.insert(
1088            0,
1089            LockedLayer::from(
1090                Arc::new(MockLayer::new(Existence::Exists)) as Arc<dyn Layer<TestKey, u64>>
1091            ),
1092        );
1093        assert_eq!(
1094            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
1095            Existence::Exists
1096        );
1097    }
1098
1099    #[fuchsia::test]
1100    async fn test_failed_lookup() {
1101        let cache = Box::new(AuditCache::new());
1102        let inner = cache.inner.clone();
1103        let a = LSMTree::new(emit_left_merge_fn, cache);
1104        a.set_layers(vec![Arc::new(FailLayer::new())]);
1105
1106        // Zero counters.
1107        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
1108
1109        // Lookup should fail and drop the placeholder.
1110        assert!(a.find(&TestKey(1..1)).await.is_err());
1111        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
1112    }
1113}
1114
#[cfg(fuzz)]
mod fuzz {
    use crate::lsm_tree::types::{Item, Value};
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use arbitrary::Arbitrary;

    use fuzz::fuzz;

    use super::testing::TestKey;

    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    /// Lets plain `u64`s be stored as LSM tree values during fuzzing; `0` is the deleted marker.
    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    // Note: this enum isn't really dead; it's used below in `fuzz_lsm_tree_actions`. However,
    // the `#[fuzz]` proc macro attribute obscures that usage enough to confuse the compiler.
    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        Insert(Item<TestKey, u64>),
        ReplaceOrInsert(Item<TestKey, u64>),
        MergeInto(Item<TestKey, u64>, TestKey),
        Find(TestKey),
        Seal,
    }

    /// Applies an arbitrary sequence of mutations and lookups to a fresh in-memory tree.
    #[fuzz]
    fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
        use super::LSMTree;
        use super::cache::NullCache;
        use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
        use futures::executor::block_on;

        /// Merge function that always emits the left-hand item.
        fn emit_left_merge_fn(
            _left: &MergeLayerIterator<'_, TestKey, u64>,
            _right: &MergeLayerIterator<'_, TestKey, u64>,
        ) -> MergeResult<TestKey, u64> {
            MergeResult::EmitLeft
        }

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for step in actions {
            match step {
                FuzzAction::Insert(item) => {
                    // Arbitrary inputs may make `insert` return an error; that is deliberately
                    // not treated as a fuzzing failure.
                    let _ = tree.insert(item);
                }
                FuzzAction::ReplaceOrInsert(item) => {
                    tree.replace_or_insert(item);
                }
                FuzzAction::MergeInto(item, bound) => tree.merge_into(item, &bound),
                FuzzAction::Find(key) => {
                    // A failing lookup, on the other hand, is a fuzzing failure.
                    block_on(tree.find(&key)).expect("find failed");
                }
                FuzzAction::Seal => tree.seal(),
            };
        }
    }
}