// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

mod bloom_filter;
pub mod cache;
pub mod merge;
pub mod persistent_layer;
pub mod skip_list_layer;
pub mod types;

use crate::drop_event::DropEvent;
use crate::log::*;
use crate::metrics::DurationMeasureScope;
use crate::object_handle::{ReadObjectHandle, WriteBytes};
use crate::serialized_types::{LATEST_VERSION, Version};

use anyhow::Error;
use cache::{ObjectCache, ObjectCacheResult};

use fuchsia_inspect::HistogramProperty;
use fuchsia_sync::RwLock;
use persistent_layer::{PersistentLayer, PersistentLayerWriter};
use skip_list_layer::SkipListLayer;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use types::{
    Existence, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey,
    OrdLowerBound, Value,
};

pub use merge::Query;
35const SKIP_LIST_LAYER_ITEMS: usize = 512;
36
// For serialization.
pub use persistent_layer::{
    LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
    LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
};

43pub async fn layers_from_handles<K: Key, V: Value>(
44    handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
45) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
46    let mut layers = Vec::new();
47    for handle in handles {
48        layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
49    }
50    Ok(layers)
51}
52
53#[derive(Eq, PartialEq, Debug)]
54pub enum Operation {
55    Insert,
56    ReplaceOrInsert,
57    MergeInto,
58}
59
60pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;
61
62struct Inner<K, V> {
63    mutable_layer: Arc<SkipListLayer<K, V>>,
64    layers: Vec<Arc<dyn Layer<K, V>>>,
65    mutation_callback: MutationCallback<K, V>,
66}
67
68pub const LOG2_HISTOGRAM_BUCKETS: usize = 32;
69
70/// Metrics related to LSM tree churn, layer depths, and compaction performance.
71pub struct CompactionCounters {
72    /// Total number of compaction events that merged layers together.
73    pub compactions: u64,
74    /// Total bytes written during compaction, useful for measuring write amplification.
75    pub compaction_bytes_written: u64,
76    /// Total duration spent compacting layers.
77    pub compaction_time_ns: u64,
78    /// Number of mutable layers sealed. Useful for measuring churn.
79    pub total_layers_added: u64,
80    /// The maximum depth of the LSM tree observed. An indicator of worst-case read amplification.
81    pub max_layer_count: u64,
82    /// Log2 histogram of the sizes of newly compacted layers, used to verify compaction heuristics.
83    pub layer_size_histogram: [u64; LOG2_HISTOGRAM_BUCKETS],
84}
85
86impl Default for CompactionCounters {
87    fn default() -> Self {
88        Self {
89            compactions: 0,
90            compaction_bytes_written: 0,
91            compaction_time_ns: 0,
92            total_layers_added: 0,
93            max_layer_count: 0,
94            layer_size_histogram: [0; LOG2_HISTOGRAM_BUCKETS],
95        }
96    }
97}
98
99/// Global counters and metrics for an LSM tree's lifetime.
100pub struct TreeCounters {
101    /// Number of individual key-lookup attempts (reads) through the tree.
102    pub num_seeks: AtomicUsize,
103    /// Tracks the number of layer files we might have looked at across all seeks.
104    /// Used alongside `layer_files_skipped` to compute the effectiveness of bloom filters.
105    pub layer_files_total: AtomicUsize,
106    /// Tracks how many layer files we skipped searching thanks to the bloom filter rejecting them.
107    pub layer_files_skipped: AtomicUsize,
108    /// Embedded counters for mutable metrics that require locking.
109    pub compaction: Mutex<CompactionCounters>,
110}
111
112impl Default for TreeCounters {
113    fn default() -> Self {
114        Self {
115            num_seeks: AtomicUsize::new(0),
116            layer_files_total: AtomicUsize::new(0),
117            layer_files_skipped: AtomicUsize::new(0),
118            compaction: Mutex::new(CompactionCounters::default()),
119        }
120    }
121}
122
123/// Writes the items yielded by the iterator into the supplied object.
124#[fxfs_trace::trace]
125pub async fn compact_with_iterator<K: Key, V: Value, W: WriteBytes + Send>(
126    mut iterator: impl LayerIterator<K, V>,
127    num_items: usize,
128    writer: W,
129    block_size: u64,
130    mut yielder: Option<impl Yielder>,
131) -> Result<u64, Error> {
132    let mut writer = PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
133    while let Some(item_ref) = iterator.get() {
134        debug!(item_ref:?; "compact: writing");
135        writer.write(item_ref).await?;
136        iterator.advance().await?;
137        if let Some(y) = yielder.as_mut() {
138            y.yield_now().await;
139        }
140    }
141    writer.flush().await?;
142
143    Ok(writer.bytes_written())
144}
145
146/// LSMTree manages a tree of layers to provide a key/value store.  Each layer contains deltas on
147/// the preceding layer.  The top layer is an in-memory mutable layer.  Layers can be compacted to
148/// form a new combined layer.
149pub struct LSMTree<K, V> {
150    data: RwLock<Inner<K, V>>,
151    merge_fn: merge::MergeFn<K, V>,
152    cache: Box<dyn ObjectCache<K, V>>,
153    counters: Arc<TreeCounters>,
154}
155
156#[fxfs_trace::trace]
157impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
158    /// Creates a new empty tree.
159    pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
160        let counters = TreeCounters::default();
161        counters.compaction.lock().unwrap().max_layer_count = 1;
162        LSMTree {
163            data: RwLock::new(Inner {
164                mutable_layer: Self::new_mutable_layer(),
165                layers: Vec::new(),
166                mutation_callback: None,
167            }),
168            merge_fn,
169            cache,
170            counters: Arc::new(counters),
171        }
172    }
173
174    /// Opens an existing tree from the provided handles to the layer objects.
175    pub async fn open(
176        merge_fn: merge::MergeFn<K, V>,
177        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
178        cache: Box<dyn ObjectCache<K, V>>,
179    ) -> Result<Self, Error> {
180        let layers = layers_from_handles(handles).await?;
181        let max_layer_count = layers.len() as u64 + 1;
182        let counters = TreeCounters::default();
183        counters.compaction.lock().unwrap().max_layer_count = max_layer_count;
184        Ok(LSMTree {
185            data: RwLock::new(Inner {
186                mutable_layer: Self::new_mutable_layer(),
187                layers,
188                mutation_callback: None,
189            }),
190            merge_fn,
191            cache,
192            counters: Arc::new(counters),
193        })
194    }
195
196    /// Replaces the immutable layers.
197    pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
198        let mut data = self.data.write();
199        data.layers = layers;
200        let layer_count = data.layers.len() + 1;
201        let mut counters = self.counters.compaction.lock().unwrap();
202        counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
203    }
204
205    /// Appends to the given layers at the end i.e. they should be base layers.  This is supposed
206    /// to be used after replay when we are opening a tree and we have discovered the base layers.
207    pub async fn append_layers(
208        &self,
209        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
210    ) -> Result<(), Error> {
211        let mut layers = layers_from_handles(handles).await?;
212        let mut data = self.data.write();
213        data.layers.append(&mut layers);
214        let layer_count = data.layers.len() + 1;
215        let mut counters = self.counters.compaction.lock().unwrap();
216        counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
217        Ok(())
218    }
219
220    /// Resets the immutable layers.
221    pub fn reset_immutable_layers(&self) {
222        self.data.write().layers = Vec::new();
223    }
224
225    /// Seals the current mutable layer and creates a new one.
226    pub fn seal(&self) {
227        // We need to be sure there are no mutations currently in-progress.  This is currently
228        // guaranteed by ensuring that all mutations take a read lock on `data`.
229        let mut data = self.data.write();
230        let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
231        data.layers.insert(0, layer);
232        let layer_count = data.layers.len() + 1;
233        let mut counters = self.counters.compaction.lock().unwrap();
234        counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
235        counters.total_layers_added += 1;
236    }
237
238    /// Resets the tree to an empty state.
239    pub fn reset(&self) {
240        let mut data = self.data.write();
241        data.layers = Vec::new();
242        data.mutable_layer = Self::new_mutable_layer();
243    }
244
245    pub fn report_compaction_metrics(
246        &self,
247        bytes_written: u64,
248        duration: std::time::Duration,
249        layer_count: usize,
250    ) {
251        let mut counters = self.counters.compaction.lock().unwrap();
252        counters.compactions += 1;
253        counters.compaction_bytes_written += bytes_written;
254        counters.compaction_time_ns += duration.as_nanos() as u64;
255
256        let bucket = if bytes_written == 0 {
257            0
258        } else {
259            std::cmp::min(LOG2_HISTOGRAM_BUCKETS - 1, 63 - bytes_written.leading_zeros() as usize)
260        };
261        counters.layer_size_histogram[bucket] += 1;
262
263        crate::metrics::lsm_tree_metrics().compaction_layer_stack_depth.insert(layer_count as u64);
264    }
265
266    pub fn compaction_bytes_written(&self) -> u64 {
267        self.counters.compaction.lock().unwrap().compaction_bytes_written
268    }
269
270    /// Returns an empty layer-set for this tree.
271    pub fn empty_layer_set(&self) -> LayerSet<K, V> {
272        LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
273    }
274
275    /// Adds all the layers (including the mutable layer) to `layer_set`.
276    pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
277        let data = self.data.read();
278        layer_set.layers.reserve_exact(data.layers.len() + 1);
279        layer_set
280            .layers
281            .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
282        for layer in &data.layers {
283            layer_set.layers.push(layer.clone().into());
284        }
285    }
286
287    /// Returns a clone of the current set of layers (including the mutable layer), after which one
288    /// can get an iterator.
289    pub fn layer_set(&self) -> LayerSet<K, V> {
290        let mut layer_set = self.empty_layer_set();
291        self.add_all_layers_to_layer_set(&mut layer_set);
292        layer_set
293    }
294
295    /// Returns the current set of immutable layers after which one can get an iterator (for e.g.
296    /// compacting).  Since these layers are immutable, getting an iterator should not block
297    /// anything else.
298    pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
299        let data = self.data.read();
300        let mut layers = Vec::with_capacity(data.layers.len());
301        for layer in &data.layers {
302            layers.push(layer.clone().into());
303        }
304        LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
305    }
306
307    /// Inserts an item into the mutable layer.
308    /// Returns error if item already exists.
309    pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
310        let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().insert);
311
312        let key = item.key.clone();
313        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
314        {
315            // `seal` below relies on us holding a read lock whilst we do the mutation.
316            let data = self.data.read();
317            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
318                mutation_callback(Operation::Insert, &item);
319            }
320            data.mutable_layer.insert(item)?;
321        }
322        self.cache.invalidate(key, val);
323        Ok(())
324    }
325
326    /// Replaces or inserts an item into the mutable layer.
327    pub fn replace_or_insert(&self, item: Item<K, V>) {
328        let _measure =
329            DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().replace_or_insert);
330
331        let key = item.key.clone();
332        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
333        {
334            // `seal` below relies on us holding a read lock whilst we do the mutation.
335            let data = self.data.read();
336            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
337                mutation_callback(Operation::ReplaceOrInsert, &item);
338            }
339            data.mutable_layer.replace_or_insert(item);
340        }
341        self.cache.invalidate(key, val);
342    }
343
344    /// Merges the given item into the mutable layer.
345    pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
346        let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().merge_into);
347
348        let key = item.key.clone();
349        {
350            // `seal` below relies on us holding a read lock whilst we do the mutation.
351            let data = self.data.read();
352            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
353                mutation_callback(Operation::MergeInto, &item);
354            }
355            data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
356        }
357        self.cache.invalidate(key, None);
358    }
359
360    /// Searches for an exact match for the given key. If the value is equal to
361    /// `Value::DELETED_MARKER` the item is considered missing and will not be returned.
362    pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
363    where
364        K: Eq,
365    {
366        let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().find);
367        // It is important that the cache lookup is done prior to fetching the layer set as the
368        // placeholder returned acts as a sort of lock for the validity of the item that may be
369        // inserted later via that placeholder.
370        let token = match self.cache.lookup_or_reserve(search_key) {
371            ObjectCacheResult::Value(value) => {
372                if value == V::DELETED_MARKER {
373                    return Ok(None);
374                } else {
375                    return Ok(Some(Item::new(search_key.clone(), value)));
376                }
377            }
378            ObjectCacheResult::Placeholder(token) => Some(token),
379            ObjectCacheResult::NoCache => None,
380        };
381        let layer_set = self.layer_set();
382        let mut merger = layer_set.merger();
383
384        Ok(match merger.query(Query::Point(search_key)).await?.get() {
385            Some(ItemRef { key, value, sequence })
386                if key == search_key && *value != V::DELETED_MARKER =>
387            {
388                if let Some(token) = token {
389                    token.complete(Some(value));
390                }
391                Some(Item { key: key.clone(), value: value.clone(), sequence })
392            }
393            _ => None,
394        })
395    }
396
397    pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
398        self.data.read().mutable_layer.clone()
399    }
400
401    /// Sets a mutation callback which is a callback that is triggered whenever any mutations are
402    /// applied to the tree.  This might be useful for tests that want to record the precise
403    /// sequence of mutations that are applied to the tree.
404    pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
405        self.data.write().mutation_callback = mutation_callback;
406    }
407
408    /// Returns the earliest version used by a layer in the tree.
409    pub fn get_earliest_version(&self) -> Version {
410        let mut earliest_version = LATEST_VERSION;
411        for layer in self.layer_set().layers {
412            let layer_version = layer.get_version();
413            if layer_version < earliest_version {
414                earliest_version = layer_version;
415            }
416        }
417        return earliest_version;
418    }
419
420    /// Returns a new mutable layer.
421    pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
422        SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
423    }
424
425    /// Replaces the mutable layer.
426    pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
427        self.data.write().mutable_layer = layer;
428    }
429
430    /// Records inspect data for the LSM tree into `node`.  Called lazily when inspect is queried.
431    pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
432        let layer_set = self.layer_set();
433        root.record_child("layers", move |node| {
434            let mut index = 0;
435            for layer in layer_set.layers {
436                node.record_child(format!("{index}"), move |node| {
437                    layer.1.record_inspect_data(node)
438                });
439                index += 1;
440            }
441        });
442        {
443            let counters = self.counters.compaction.lock().unwrap();
444            root.record_uint("num_seeks", self.counters.num_seeks.load(Ordering::Relaxed) as u64);
445            root.record_uint("bloom_filter_success_percent", {
446                let layer_files_total = self.counters.layer_files_total.load(Ordering::Relaxed);
447                let layer_files_skipped = self.counters.layer_files_skipped.load(Ordering::Relaxed);
448                if layer_files_total == 0 {
449                    0
450                } else {
451                    (layer_files_skipped * 100).div_ceil(layer_files_total) as u64
452                }
453            });
454            root.record_uint("compactions", counters.compactions);
455            root.record_uint("compaction_bytes_written", counters.compaction_bytes_written);
456            root.record_uint("compaction_time_ns", counters.compaction_time_ns);
457            root.record_uint("total_layers_added", counters.total_layers_added);
458            root.record_uint("max_layer_count", counters.max_layer_count);
459
460            let layer_sizes = root.create_uint_exponential_histogram(
461                "layer_size_histogram_log2",
462                fuchsia_inspect::ExponentialHistogramParams {
463                    floor: 1,
464                    initial_step: 1,
465                    step_multiplier: 2,
466                    buckets: LOG2_HISTOGRAM_BUCKETS,
467                },
468            );
469            for (i, count) in counters.layer_size_histogram.iter().enumerate() {
470                layer_sizes.insert_multiple(1u64 << i, *count as usize);
471            }
472            root.record(layer_sizes);
473        }
474    }
475}
476
477/// This is an RAII wrapper for a layer which holds a lock on the layer (via the Layer::lock
478/// method).
479pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);
480
481impl<K, V> LockedLayer<K, V> {
482    pub async fn close_layer(self) {
483        let layer = self.1;
484        std::mem::drop(self.0);
485        layer.close().await;
486    }
487}
488
489impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
490    fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
491        let event = layer.lock().unwrap();
492        Self(event, layer)
493    }
494}
495
496impl<K, V> std::ops::Deref for LockedLayer<K, V> {
497    type Target = Arc<dyn Layer<K, V>>;
498
499    fn deref(&self) -> &Self::Target {
500        &self.1
501    }
502}
503
504impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
505    fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
506        self.1.as_ref()
507    }
508}
509
510/// A LayerSet provides a snapshot of the layers at a particular point in time, and allows you to
511/// get an iterator.  Iterators borrow the layers so something needs to hold reference count.
512pub struct LayerSet<K, V> {
513    pub layers: Vec<LockedLayer<K, V>>,
514    merge_fn: merge::MergeFn<K, V>,
515    counters: Arc<TreeCounters>,
516}
517
518impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
519    pub fn sum_len(&self) -> usize {
520        let mut size = 0;
521        for layer in &self.layers {
522            size += layer.len()
523        }
524        size
525    }
526
527    pub fn merger(&self) -> merge::Merger<'_, K, V> {
528        merge::Merger::new(
529            self.layers.iter().map(|x| x.as_ref()),
530            self.merge_fn,
531            self.counters.clone(),
532        )
533    }
534
535    /// See `Layer::key_exists`.
536    pub async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
537        for l in &self.layers {
538            match l.key_exists(key).await? {
539                e @ (Existence::Exists | Existence::MaybeExists) => return Ok(e),
540                _ => {}
541            }
542        }
543        Ok(Existence::Missing)
544    }
545}
546
547impl<K, V> fmt::Debug for LayerSet<K, V> {
548    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
549        fmt.debug_list()
550            .entries(self.layers.iter().map(|l| {
551                if let Some(handle) = l.handle() {
552                    format!("{}", handle.object_id())
553                } else {
554                    format!("{:?}", Arc::as_ptr(l))
555                }
556            }))
557            .finish()
558    }
559}
560
561/// A yielder can be used during compactions which are low priority.
562pub trait Yielder: Send {
563    fn yield_now(&mut self) -> impl Future<Output = ()> + Send;
564}
565
#[cfg(test)]
mod tests {
    use super::{LSMTree, Yielder, compact_with_iterator};
    use crate::drop_event::DropEvent;
    use crate::lsm_tree::cache::{
        NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
    };
    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
    use crate::lsm_tree::types::{
        BoxedLayerIterator, Existence, FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator,
        LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
    };
    use crate::lsm_tree::{Query, layers_from_handles};
    use crate::object_handle::ObjectHandle;
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
    use crate::testing::writer::Writer;
    use anyhow::{Error, anyhow};
    use async_trait::async_trait;
    use fprint::TypeFingerprint;
    use fuchsia_sync::Mutex;
    use fxfs_macros::FuzzyHash;
    use rand::rng;
    use rand::seq::SliceRandom;
    use std::hash::Hash;
    use std::sync::Arc;

595    #[derive(
596        Clone,
597        Eq,
598        PartialEq,
599        Debug,
600        Hash,
601        FuzzyHash,
602        serde::Serialize,
603        serde::Deserialize,
604        TypeFingerprint,
605        Versioned,
606    )]
607    struct TestKey(std::ops::Range<u64>);
608
609    versioned_type! { 1.. => TestKey }
610
611    impl SortByU64 for TestKey {
612        fn get_leading_u64(&self) -> u64 {
613            self.0.start
614        }
615    }
616
617    impl LayerKey for TestKey {}
618
619    impl OrdUpperBound for TestKey {
620        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
621            self.0.end.cmp(&other.0.end)
622        }
623    }
624
625    impl OrdLowerBound for TestKey {
626        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
627            self.0.start.cmp(&other.0.start)
628        }
629    }
630
631    fn emit_left_merge_fn(
632        _left: &MergeLayerIterator<'_, TestKey, u64>,
633        _right: &MergeLayerIterator<'_, TestKey, u64>,
634    ) -> MergeResult<TestKey, u64> {
635        MergeResult::EmitLeft
636    }
637
638    impl Value for u64 {
639        const DELETED_MARKER: Self = 0;
640    }
641
642    struct NoOpYielder;
643    impl Yielder for NoOpYielder {
644        async fn yield_now(&mut self) {}
645    }
646
647    #[fuchsia::test]
648    async fn test_iteration() {
649        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
650        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
651        tree.insert(items[0].clone()).expect("insert error");
652        tree.insert(items[1].clone()).expect("insert error");
653        let layers = tree.layer_set();
654        let mut merger = layers.merger();
655        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
656        let ItemRef { key, value, .. } = iter.get().expect("missing item");
657        assert_eq!((key, value), (&items[0].key, &items[0].value));
658        iter.advance().await.expect("advance failed");
659        let ItemRef { key, value, .. } = iter.get().expect("missing item");
660        assert_eq!((key, value), (&items[1].key, &items[1].value));
661        iter.advance().await.expect("advance failed");
662        assert!(iter.get().is_none());
663    }
664
665    #[fuchsia::test]
666    async fn test_compact() {
667        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
668        let items = [
669            Item::new(TestKey(1..1), 1),
670            Item::new(TestKey(2..2), 2),
671            Item::new(TestKey(3..3), 3),
672            Item::new(TestKey(4..4), 4),
673        ];
674        tree.insert(items[0].clone()).expect("insert error");
675        tree.insert(items[1].clone()).expect("insert error");
676        tree.seal();
677        tree.insert(items[2].clone()).expect("insert error");
678        tree.insert(items[3].clone()).expect("insert error");
679        tree.seal();
680        let object = Arc::new(FakeObject::new());
681        let handle = FakeObjectHandle::new(object.clone());
682        {
683            let layer_set = tree.immutable_layer_set();
684            let mut merger = layer_set.merger();
685            let iter = merger.query(Query::FullScan).await.expect("create merger");
686            compact_with_iterator(
687                iter,
688                items.len(),
689                Writer::new(&handle).await,
690                handle.block_size(),
691                Option::<NoOpYielder>::None,
692            )
693            .await
694            .expect("compact failed");
695        }
696        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
697        let handle = FakeObjectHandle::new(object.clone());
698        let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
699            .await
700            .expect("open failed");
701
702        let layers = tree.layer_set();
703        let mut merger = layers.merger();
704        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
705        for i in 1..5 {
706            let ItemRef { key, value, .. } = iter.get().expect("missing item");
707            assert_eq!((key, value), (&TestKey(i..i), &i));
708            iter.advance().await.expect("advance failed");
709        }
710        assert!(iter.get().is_none());
711    }
712
713    #[fuchsia::test]
714    async fn test_find() {
715        let items = [
716            Item::new(TestKey(1..1), 1),
717            Item::new(TestKey(2..2), 2),
718            Item::new(TestKey(3..3), 3),
719            Item::new(TestKey(4..4), 4),
720        ];
721        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
722        tree.insert(items[0].clone()).expect("insert error");
723        tree.insert(items[1].clone()).expect("insert error");
724        tree.seal();
725        tree.insert(items[2].clone()).expect("insert error");
726        tree.insert(items[3].clone()).expect("insert error");
727
728        let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
729        assert_eq!(item, items[1]);
730        assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
731    }
732
733    #[fuchsia::test]
734    async fn test_find_no_return_deleted_values() {
735        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
736        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
737        tree.insert(items[0].clone()).expect("insert error");
738        tree.insert(items[1].clone()).expect("insert error");
739
740        let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
741        assert_eq!(item, items[0]);
742        assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
743    }
744
745    #[fuchsia::test]
746    async fn test_empty_seal() {
747        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
748        tree.seal();
749        let item = Item::new(TestKey(1..1), 1);
750        tree.insert(item.clone()).expect("insert error");
751        let object = Arc::new(FakeObject::new());
752        let handle = FakeObjectHandle::new(object.clone());
753        {
754            let layer_set = tree.immutable_layer_set();
755            let mut merger = layer_set.merger();
756            let iter = merger.query(Query::FullScan).await.expect("create merger");
757            compact_with_iterator(
758                iter,
759                0,
760                Writer::new(&handle).await,
761                handle.block_size(),
762                Option::<NoOpYielder>::None,
763            )
764            .await
765            .expect("compact failed");
766        }
767        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
768        let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
769        assert_eq!(found_item, item);
770        assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
771    }
772
773    #[fuchsia::test]
774    async fn test_filter() {
775        let items = [
776            Item::new(TestKey(1..1), 1),
777            Item::new(TestKey(2..2), 2),
778            Item::new(TestKey(3..3), 3),
779            Item::new(TestKey(4..4), 4),
780        ];
781        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
782        tree.insert(items[0].clone()).expect("insert error");
783        tree.insert(items[1].clone()).expect("insert error");
784        tree.insert(items[2].clone()).expect("insert error");
785        tree.insert(items[3].clone()).expect("insert error");
786
787        let layers = tree.layer_set();
788        let mut merger = layers.merger();
789
790        // Filter out odd keys (which also guarantees we skip the first key which is an edge case).
791        let mut iter = merger
792            .query(Query::FullScan)
793            .await
794            .expect("seek failed")
795            .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
796            .await
797            .expect("filter failed");
798
799        assert_eq!(iter.get(), Some(items[1].as_item_ref()));
800        iter.advance().await.expect("advance failed");
801        assert_eq!(iter.get(), Some(items[3].as_item_ref()));
802        iter.advance().await.expect("advance failed");
803        assert!(iter.get().is_none());
804    }
805
806    #[fuchsia::test]
807    async fn test_insert_order_agnostic() {
808        let items = [
809            Item::new(TestKey(1..1), 1),
810            Item::new(TestKey(2..2), 2),
811            Item::new(TestKey(3..3), 3),
812            Item::new(TestKey(4..4), 4),
813            Item::new(TestKey(5..5), 5),
814            Item::new(TestKey(6..6), 6),
815        ];
816        let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
817        for item in &items {
818            a.insert(item.clone()).expect("insert error");
819        }
820        let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
821        let mut shuffled = items.clone();
822        shuffled.shuffle(&mut rng());
823        for item in &shuffled {
824            b.insert(item.clone()).expect("insert error");
825        }
826        let layers = a.layer_set();
827        let mut merger = layers.merger();
828        let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
829        let layers = b.layer_set();
830        let mut merger = layers.merger();
831        let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");
832
833        for item in items {
834            assert_eq!(Some(item.as_item_ref()), iter_a.get());
835            assert_eq!(Some(item.as_item_ref()), iter_b.get());
836            iter_a.advance().await.expect("advance failed");
837            iter_b.advance().await.expect("advance failed");
838        }
839        assert!(iter_a.get().is_none());
840        assert!(iter_b.get().is_none());
841    }
842
    /// Shared state behind a test `AuditCache`: interaction counters plus an
    /// optional canned result for the next cache lookup.
    struct AuditCacheInner<'a, V: Value> {
        // Number of calls to `lookup_or_reserve`.
        lookups: u64,
        // Number of placeholders completed via `complete()`.
        completions: u64,
        // Number of calls to `invalidate`.
        invalidations: u64,
        // Number of placeholders dropped without being completed.
        drops: u64,
        // When set, handed back by the next `lookup_or_reserve` call.
        result: Option<ObjectCacheResult<'a, V>>,
    }
850
    impl<V: Value> AuditCacheInner<'_, V> {
        /// Returns the counters as `(lookups, completions, invalidations, drops)`
        /// for compact test assertions.
        fn stats(&self) -> (u64, u64, u64, u64) {
            (self.lookups, self.completions, self.invalidations, self.drops)
        }
    }
856
    /// Test `ObjectCache` implementation that records every interaction in a
    /// shared `AuditCacheInner` so tests can assert on cache usage.
    struct AuditCache<'a, V: Value> {
        // Shared with tests (via clone) and with placeholders it hands out.
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
    }
860
861    impl<V: Value> AuditCache<'_, V> {
862        fn new() -> Self {
863            Self {
864                inner: Arc::new(Mutex::new(AuditCacheInner {
865                    lookups: 0,
866                    completions: 0,
867                    invalidations: 0,
868                    drops: 0,
869                    result: None,
870                })),
871            }
872        }
873    }
874
    /// Placeholder handed out by `AuditCache`; reports to the shared inner
    /// state whether it was completed or dropped unfulfilled.
    struct AuditPlaceholder<'a, V: Value> {
        // Shared counters to report into.
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
        // Set by `complete()`; checked in `Drop` to count abandonments.
        completed: Mutex<bool>,
    }
879
880    impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
881        fn complete(self: Box<Self>, _: Option<&V>) {
882            self.inner.lock().completions += 1;
883            *self.completed.lock() = true;
884        }
885    }
886
887    impl<V: Value> Drop for AuditPlaceholder<'_, V> {
888        fn drop(&mut self) {
889            if !*self.completed.lock() {
890                self.inner.lock().drops += 1;
891            }
892        }
893    }
894
895    impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
896        fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
897            {
898                let mut inner = self.inner.lock();
899                inner.lookups += 1;
900                if inner.result.is_some() {
901                    return std::mem::take(&mut inner.result).unwrap();
902                }
903            }
904            ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
905                inner: self.inner.clone(),
906                completed: Mutex::new(false),
907            }))
908        }
909
910        fn invalidate(&self, _key: K, _value: Option<V>) {
911            self.inner.lock().invalidations += 1;
912        }
913    }
914
    /// Exercises the cache protocol around find/insert/replace_or_insert.
    /// The stats tuple is `(lookups, completions, invalidations, drops)`.
    #[fuchsia::test]
    async fn test_cache_handling() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        // Zero counters.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // Look for an item, but don't find it. So no insertion. The reserved
        // placeholder is dropped uncompleted.
        assert!(a.find(&item.key).await.expect("Failed find").is_none());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));

        // Insert attempts to invalidate any cached entry for the key.
        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (1, 0, 1, 1));

        // Look for item, find it, and complete the placeholder (inserting the
        // value into the cache).
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (2, 1, 1, 1));

        // Insert or replace attempts to invalidate as well.
        a.replace_or_insert(item.clone());
        assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
    }
944
    /// A value staged in the cache is returned directly by find(): the lookup
    /// is counted but no placeholder is created or completed.
    /// The stats tuple is `(lookups, completions, invalidations, drops)`.
    #[fuchsia::test]
    async fn test_cache_hit() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        // Zero counters.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // Insert attempts to invalidate.
        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        // Set up the item to find in the cache (consumed by the next lookup).
        inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));

        // Look for item, find it in cache, so no insert and no completion.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }
969
    /// When the cache answers `NoCache`, find() still returns the item from
    /// the tree but does not reserve a placeholder or insert into the cache.
    /// The stats tuple is `(lookups, completions, invalidations, drops)`.
    #[fuchsia::test]
    async fn test_cache_says_uncacheable() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        let _ = a.insert(item.clone());

        // One invalidation from the insert.
        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        // Set up the NoCache response to find in the cache.
        inner.lock().result = Some(ObjectCacheResult::NoCache);

        // Look for item, it is uncacheable, so no insert.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }
991
    /// Test layer whose `seek` always fails, while still implementing the
    /// real `DropEvent`-based lock/close protocol.
    struct FailLayer {
        // Some(event) while open; cloned out by `lock()`, taken by `close()`
        // which then waits for every outstanding clone to drop.
        drop_event: Mutex<Option<Arc<DropEvent>>>,
    }
995
    impl FailLayer {
        /// Creates an open `FailLayer` (drop event present).
        fn new() -> Self {
            Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
        }
    }
1001
1002    #[async_trait]
1003    impl<K: Key, V: Value> Layer<K, V> for FailLayer {
1004        async fn seek(
1005            &self,
1006            _bound: std::ops::Bound<&K>,
1007        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
1008            Err(anyhow!("Purposely failed seek"))
1009        }
1010
1011        fn lock(&self) -> Option<Arc<DropEvent>> {
1012            self.drop_event.lock().clone()
1013        }
1014
1015        fn len(&self) -> usize {
1016            0
1017        }
1018
1019        async fn close(&self) {
1020            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
1021                Some(drop_event) => drop_event.listen(),
1022                None => return,
1023            };
1024            listener.await;
1025        }
1026
1027        fn get_version(&self) -> Version {
1028            LATEST_VERSION
1029        }
1030
1031        async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
1032            unimplemented!();
1033        }
1034    }
1035
    /// Test layer that answers `key_exists` with a fixed, preconfigured
    /// result; seeking is unsupported.
    struct MockLayer {
        // The canned answer returned by every `key_exists` call.
        exists_result: Existence,
        // Some(event) while open; same lock/close protocol as real layers.
        drop_event: Mutex<Option<Arc<DropEvent>>>,
    }
1040
    impl MockLayer {
        /// Creates an open `MockLayer` whose `key_exists` always returns
        /// `exists_result`.
        fn new(exists_result: Existence) -> Self {
            Self { exists_result, drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
        }
    }
1046
1047    #[async_trait]
1048    impl<K: Key, V: Value> Layer<K, V> for MockLayer {
1049        async fn seek(
1050            &self,
1051            _bound: std::ops::Bound<&K>,
1052        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
1053            unimplemented!()
1054        }
1055
1056        fn lock(&self) -> Option<Arc<DropEvent>> {
1057            self.drop_event.lock().clone()
1058        }
1059
1060        fn len(&self) -> usize {
1061            0
1062        }
1063
1064        async fn close(&self) {
1065            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
1066                Some(drop_event) => drop_event.listen(),
1067                None => return,
1068            };
1069            listener.await;
1070        }
1071
1072        fn get_version(&self) -> Version {
1073            LATEST_VERSION
1074        }
1075
1076        async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
1077            Ok(self.exists_result)
1078        }
1079    }
1080
    /// Verifies how `key_exists` results aggregate across a layer set:
    /// `Exists` dominates `MaybeExists`, which dominates `Missing`.
    #[fuchsia::test]
    async fn test_layer_set_key_exists() {
        use super::LockedLayer;

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let mut layer_set = tree.empty_layer_set();

        // Empty layer set should return Missing.
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::Missing
        );

        // Add a layer that returns Missing.
        layer_set.layers.push(LockedLayer::from(
            Arc::new(MockLayer::new(Existence::Missing)) as Arc<dyn Layer<TestKey, u64>>
        ));
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::Missing
        );

        // Add a layer that returns MaybeExists; the combined answer upgrades.
        layer_set.layers.push(LockedLayer::from(
            Arc::new(MockLayer::new(Existence::MaybeExists)) as Arc<dyn Layer<TestKey, u64>>
        ));
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::MaybeExists
        );

        // Add a layer that returns Exists. It is inserted at the front —
        // presumably so it is consulted first; NOTE(review): confirm whether
        // key_exists short-circuits on Exists regardless of position.
        layer_set.layers.insert(
            0,
            LockedLayer::from(
                Arc::new(MockLayer::new(Existence::Exists)) as Arc<dyn Layer<TestKey, u64>>
            ),
        );
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::Exists
        );
    }
1124
1125    #[fuchsia::test]
1126    async fn test_failed_lookup() {
1127        let cache = Box::new(AuditCache::new());
1128        let inner = cache.inner.clone();
1129        let a = LSMTree::new(emit_left_merge_fn, cache);
1130        a.set_layers(vec![Arc::new(FailLayer::new())]);
1131
1132        // Zero counters.
1133        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
1134
1135        // Lookup should fail and drop the placeholder.
1136        assert!(a.find(&TestKey(1..1)).await.is_err());
1137        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
1138    }
1139}
1140
// Fuzz target: applies an arbitrary sequence of LSM tree operations, checking
// only that nothing panics. Compiled solely for fuzzing builds.
#[cfg(fuzz)]
mod fuzz {
    use crate::lsm_tree::types::{
        FuzzyHash, Item, LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
    };
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use arbitrary::Arbitrary;
    use fprint::TypeFingerprint;
    use fuzz::fuzz;
    use fxfs_macros::FuzzyHash;
    use std::hash::Hash;

    // Minimal key type for fuzzing; mirrors the unit tests' TestKey but
    // derives `Arbitrary` so the fuzzer can generate instances.
    #[derive(
        Arbitrary,
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    // Default LayerKey behavior (no special merge semantics) suffices here.
    impl LayerKey for TestKey {}

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    // Keys compare by range end for the upper bound...
    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    // ...and by range start for the lower bound.
    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    // Note: This code isn't really dead. It's used below in
    // `fuzz_lsm_tree_actions`. However, the `#[fuzz]` proc macro attribute
    // obfuscates the usage enough to confuse the compiler.
    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        Insert(Item<TestKey, u64>),
        ReplaceOrInsert(Item<TestKey, u64>),
        MergeInto(Item<TestKey, u64>, TestKey),
        Find(TestKey),
        Seal,
    }

    #[fuzz]
    fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
        use super::LSMTree;
        use super::cache::NullCache;
        use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
        use futures::executor::block_on;

        // Trivial merge function: always emits the left item unchanged.
        fn emit_left_merge_fn(
            _left: &MergeLayerIterator<'_, TestKey, u64>,
            _right: &MergeLayerIterator<'_, TestKey, u64>,
        ) -> MergeResult<TestKey, u64> {
            MergeResult::EmitLeft
        }

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for action in actions {
            match action {
                FuzzAction::Insert(item) => {
                    // Inserting a duplicate key may legitimately fail; ignore.
                    let _ = tree.insert(item);
                }
                FuzzAction::ReplaceOrInsert(item) => {
                    tree.replace_or_insert(item);
                }
                FuzzAction::Find(key) => {
                    block_on(tree.find(&key)).expect("find failed");
                }
                FuzzAction::MergeInto(item, bound) => tree.merge_into(item, &bound),
                FuzzAction::Seal => tree.seal(),
            };
        }
    }
}