// fxfs/lsm_tree.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5mod bloom_filter;
6pub mod cache;
7pub mod merge;
8pub mod persistent_layer;
9pub mod skip_list_layer;
10pub mod types;
11
12use crate::drop_event::DropEvent;
13use crate::log::*;
14use crate::metrics::DurationMeasureScope;
15use crate::object_handle::{ReadObjectHandle, WriteBytes};
16use crate::serialized_types::{LATEST_VERSION, Version};
17
18use anyhow::Error;
19use cache::{ObjectCache, ObjectCacheResult};
20
21use fuchsia_sync::{Mutex, RwLock};
22use persistent_layer::{PersistentLayer, PersistentLayerWriter};
23use skip_list_layer::SkipListLayer;
24use std::fmt;
25use std::sync::Arc;
26use types::{
27    Existence, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey,
28    OrdLowerBound, Value,
29};
30
31pub use merge::Query;
32
/// Sizing argument handed to `SkipListLayer::new` every time a mutable layer is created.
// NOTE(review): presumably the expected/maximum item count of the skip list -- confirm against
// `skip_list_layer`.
const SKIP_LIST_LAYER_ITEMS: usize = 512;
34
35// For serialization.
36pub use persistent_layer::{
37    LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
38    LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
39};
40
41pub async fn layers_from_handles<K: Key, V: Value>(
42    handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
43) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
44    let mut layers = Vec::new();
45    for handle in handles {
46        layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
47    }
48    Ok(layers)
49}
50
/// Identifies which kind of mutation is being reported to a mutation callback.
#[derive(Debug, PartialEq, Eq)]
pub enum Operation {
    /// Reported by `LSMTree::insert`.
    Insert,
    /// Reported by `LSMTree::replace_or_insert`.
    ReplaceOrInsert,
    /// Reported by `LSMTree::merge_into`.
    MergeInto,
}
57
58pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;
59
60struct Inner<K, V> {
61    mutable_layer: Arc<SkipListLayer<K, V>>,
62    layers: Vec<Arc<dyn Layer<K, V>>>,
63    mutation_callback: MutationCallback<K, V>,
64}
65
66#[derive(Default)]
67pub(super) struct Counters {
68    num_seeks: usize,
69    // The following two metrics are used to compute the effectiveness of the bloom filters.
70    // `layer_files_total` tracks the number of layer files we might have looked at across all
71    // seeks, and `layer_files_skipped` tracks how many we skipped thanks to the bloom filter.
72    layer_files_total: usize,
73    layer_files_skipped: usize,
74}
75
76/// LSMTree manages a tree of layers to provide a key/value store.  Each layer contains deltas on
77/// the preceding layer.  The top layer is an in-memory mutable layer.  Layers can be compacted to
78/// form a new combined layer.
79pub struct LSMTree<K, V> {
80    data: RwLock<Inner<K, V>>,
81    merge_fn: merge::MergeFn<K, V>,
82    cache: Box<dyn ObjectCache<K, V>>,
83    counters: Arc<Mutex<Counters>>,
84}
85
86#[fxfs_trace::trace]
87impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
88    /// Creates a new empty tree.
89    pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
90        LSMTree {
91            data: RwLock::new(Inner {
92                mutable_layer: Self::new_mutable_layer(),
93                layers: Vec::new(),
94                mutation_callback: None,
95            }),
96            merge_fn,
97            cache,
98            counters: Arc::new(Mutex::new(Default::default())),
99        }
100    }
101
102    /// Opens an existing tree from the provided handles to the layer objects.
103    pub async fn open(
104        merge_fn: merge::MergeFn<K, V>,
105        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
106        cache: Box<dyn ObjectCache<K, V>>,
107    ) -> Result<Self, Error> {
108        Ok(LSMTree {
109            data: RwLock::new(Inner {
110                mutable_layer: Self::new_mutable_layer(),
111                layers: layers_from_handles(handles).await?,
112                mutation_callback: None,
113            }),
114            merge_fn,
115            cache,
116            counters: Arc::new(Mutex::new(Default::default())),
117        })
118    }
119
120    /// Replaces the immutable layers.
121    pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
122        self.data.write().layers = layers;
123    }
124
125    /// Appends to the given layers at the end i.e. they should be base layers.  This is supposed
126    /// to be used after replay when we are opening a tree and we have discovered the base layers.
127    pub async fn append_layers(
128        &self,
129        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
130    ) -> Result<(), Error> {
131        let mut layers = layers_from_handles(handles).await?;
132        self.data.write().layers.append(&mut layers);
133        Ok(())
134    }
135
136    /// Resets the immutable layers.
137    pub fn reset_immutable_layers(&self) {
138        self.data.write().layers = Vec::new();
139    }
140
141    /// Seals the current mutable layer and creates a new one.
142    pub fn seal(&self) {
143        // We need to be sure there are no mutations currently in-progress.  This is currently
144        // guaranteed by ensuring that all mutations take a read lock on `data`.
145        let mut data = self.data.write();
146        let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
147        data.layers.insert(0, layer);
148    }
149
150    /// Resets the tree to an empty state.
151    pub fn reset(&self) {
152        let mut data = self.data.write();
153        data.layers = Vec::new();
154        data.mutable_layer = Self::new_mutable_layer();
155    }
156
157    /// Writes the items yielded by the iterator into the supplied object.
158    #[trace]
159    pub async fn compact_with_iterator<W: WriteBytes + Send>(
160        &self,
161        mut iterator: impl LayerIterator<K, V>,
162        num_items: usize,
163        writer: W,
164        block_size: u64,
165    ) -> Result<(), Error> {
166        let mut writer =
167            PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
168        while let Some(item_ref) = iterator.get() {
169            debug!(item_ref:?; "compact: writing");
170            writer.write(item_ref).await?;
171            iterator.advance().await?;
172        }
173        writer.flush().await
174    }
175
176    /// Returns an empty layer-set for this tree.
177    pub fn empty_layer_set(&self) -> LayerSet<K, V> {
178        LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
179    }
180
181    /// Adds all the layers (including the mutable layer) to `layer_set`.
182    pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
183        let data = self.data.read();
184        layer_set.layers.reserve_exact(data.layers.len() + 1);
185        layer_set
186            .layers
187            .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
188        for layer in &data.layers {
189            layer_set.layers.push(layer.clone().into());
190        }
191    }
192
193    /// Returns a clone of the current set of layers (including the mutable layer), after which one
194    /// can get an iterator.
195    pub fn layer_set(&self) -> LayerSet<K, V> {
196        let mut layer_set = self.empty_layer_set();
197        self.add_all_layers_to_layer_set(&mut layer_set);
198        layer_set
199    }
200
201    /// Returns the current set of immutable layers after which one can get an iterator (for e.g.
202    /// compacting).  Since these layers are immutable, getting an iterator should not block
203    /// anything else.
204    pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
205        let data = self.data.read();
206        let mut layers = Vec::with_capacity(data.layers.len());
207        for layer in &data.layers {
208            layers.push(layer.clone().into());
209        }
210        LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
211    }
212
213    /// Inserts an item into the mutable layer.
214    /// Returns error if item already exists.
215    pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
216        let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().insert);
217
218        let key = item.key.clone();
219        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
220        {
221            // `seal` below relies on us holding a read lock whilst we do the mutation.
222            let data = self.data.read();
223            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
224                mutation_callback(Operation::Insert, &item);
225            }
226            data.mutable_layer.insert(item)?;
227        }
228        self.cache.invalidate(key, val);
229        Ok(())
230    }
231
232    /// Replaces or inserts an item into the mutable layer.
233    pub fn replace_or_insert(&self, item: Item<K, V>) {
234        let _measure =
235            DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().replace_or_insert);
236
237        let key = item.key.clone();
238        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
239        {
240            // `seal` below relies on us holding a read lock whilst we do the mutation.
241            let data = self.data.read();
242            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
243                mutation_callback(Operation::ReplaceOrInsert, &item);
244            }
245            data.mutable_layer.replace_or_insert(item);
246        }
247        self.cache.invalidate(key, val);
248    }
249
250    /// Merges the given item into the mutable layer.
251    pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
252        let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().merge_into);
253
254        let key = item.key.clone();
255        {
256            // `seal` below relies on us holding a read lock whilst we do the mutation.
257            let data = self.data.read();
258            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
259                mutation_callback(Operation::MergeInto, &item);
260            }
261            data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
262        }
263        self.cache.invalidate(key, None);
264    }
265
266    /// Searches for an exact match for the given key. If the value is equal to
267    /// `Value::DELETED_MARKER` the item is considered missing and will not be returned.
268    pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
269    where
270        K: Eq,
271    {
272        let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().find);
273        // It is important that the cache lookup is done prior to fetching the layer set as the
274        // placeholder returned acts as a sort of lock for the validity of the item that may be
275        // inserted later via that placeholder.
276        let token = match self.cache.lookup_or_reserve(search_key) {
277            ObjectCacheResult::Value(value) => {
278                if value == V::DELETED_MARKER {
279                    return Ok(None);
280                } else {
281                    return Ok(Some(Item::new(search_key.clone(), value)));
282                }
283            }
284            ObjectCacheResult::Placeholder(token) => Some(token),
285            ObjectCacheResult::NoCache => None,
286        };
287        let layer_set = self.layer_set();
288        let mut merger = layer_set.merger();
289
290        Ok(match merger.query(Query::Point(search_key)).await?.get() {
291            Some(ItemRef { key, value, sequence })
292                if key == search_key && *value != V::DELETED_MARKER =>
293            {
294                if let Some(token) = token {
295                    token.complete(Some(value));
296                }
297                Some(Item { key: key.clone(), value: value.clone(), sequence })
298            }
299            _ => None,
300        })
301    }
302
303    pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
304        self.data.read().mutable_layer.clone()
305    }
306
307    /// Sets a mutation callback which is a callback that is triggered whenever any mutations are
308    /// applied to the tree.  This might be useful for tests that want to record the precise
309    /// sequence of mutations that are applied to the tree.
310    pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
311        self.data.write().mutation_callback = mutation_callback;
312    }
313
314    /// Returns the earliest version used by a layer in the tree.
315    pub fn get_earliest_version(&self) -> Version {
316        let mut earliest_version = LATEST_VERSION;
317        for layer in self.layer_set().layers {
318            let layer_version = layer.get_version();
319            if layer_version < earliest_version {
320                earliest_version = layer_version;
321            }
322        }
323        return earliest_version;
324    }
325
326    /// Returns a new mutable layer.
327    pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
328        SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
329    }
330
331    /// Replaces the mutable layer.
332    pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
333        self.data.write().mutable_layer = layer;
334    }
335
336    /// Records inspect data for the LSM tree into `node`.  Called lazily when inspect is queried.
337    pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
338        let layer_set = self.layer_set();
339        root.record_child("layers", move |node| {
340            let mut index = 0;
341            for layer in layer_set.layers {
342                node.record_child(format!("{index}"), move |node| {
343                    layer.1.record_inspect_data(node)
344                });
345                index += 1;
346            }
347        });
348        {
349            let counters = self.counters.lock();
350            root.record_uint("num_seeks", counters.num_seeks as u64);
351            root.record_uint(
352                "bloom_filter_success_percent",
353                if counters.layer_files_total == 0 {
354                    0
355                } else {
356                    (counters.layer_files_skipped * 100).div_ceil(counters.layer_files_total) as u64
357                },
358            );
359        }
360    }
361}
362
363/// This is an RAII wrapper for a layer which holds a lock on the layer (via the Layer::lock
364/// method).
365pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);
366
367impl<K, V> LockedLayer<K, V> {
368    pub async fn close_layer(self) {
369        let layer = self.1;
370        std::mem::drop(self.0);
371        layer.close().await;
372    }
373}
374
375impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
376    fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
377        let event = layer.lock().unwrap();
378        Self(event, layer)
379    }
380}
381
382impl<K, V> std::ops::Deref for LockedLayer<K, V> {
383    type Target = Arc<dyn Layer<K, V>>;
384
385    fn deref(&self) -> &Self::Target {
386        &self.1
387    }
388}
389
390impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
391    fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
392        self.1.as_ref()
393    }
394}
395
396/// A LayerSet provides a snapshot of the layers at a particular point in time, and allows you to
397/// get an iterator.  Iterators borrow the layers so something needs to hold reference count.
398pub struct LayerSet<K, V> {
399    pub layers: Vec<LockedLayer<K, V>>,
400    merge_fn: merge::MergeFn<K, V>,
401    counters: Arc<Mutex<Counters>>,
402}
403
404impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
405    pub fn sum_len(&self) -> usize {
406        let mut size = 0;
407        for layer in &self.layers {
408            size += layer.len()
409        }
410        size
411    }
412
413    pub fn merger(&self) -> merge::Merger<'_, K, V> {
414        merge::Merger::new(
415            self.layers.iter().map(|x| x.as_ref()),
416            self.merge_fn,
417            self.counters.clone(),
418        )
419    }
420
421    /// See `Layer::key_exists`.
422    pub async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
423        for l in &self.layers {
424            match l.key_exists(key).await? {
425                e @ (Existence::Exists | Existence::MaybeExists) => return Ok(e),
426                _ => {}
427            }
428        }
429        Ok(Existence::Missing)
430    }
431}
432
433impl<K, V> fmt::Debug for LayerSet<K, V> {
434    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
435        fmt.debug_list()
436            .entries(self.layers.iter().map(|l| {
437                if let Some(handle) = l.handle() {
438                    format!("{}", handle.object_id())
439                } else {
440                    format!("{:?}", Arc::as_ptr(l))
441                }
442            }))
443            .finish()
444    }
445}
446
447#[cfg(test)]
448mod tests {
449    use super::LSMTree;
450    use crate::drop_event::DropEvent;
451    use crate::lsm_tree::cache::{
452        NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
453    };
454    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
455    use crate::lsm_tree::types::{
456        BoxedLayerIterator, Existence, FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator,
457        LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
458    };
459    use crate::lsm_tree::{Query, layers_from_handles};
460    use crate::object_handle::ObjectHandle;
461    use crate::serialized_types::{
462        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
463    };
464    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
465    use crate::testing::writer::Writer;
466    use anyhow::{Error, anyhow};
467    use async_trait::async_trait;
468    use fprint::TypeFingerprint;
469    use fuchsia_sync::Mutex;
470    use fxfs_macros::FuzzyHash;
471    use rand::rng;
472    use rand::seq::SliceRandom;
473    use std::hash::Hash;
474    use std::sync::Arc;
475
476    #[derive(
477        Clone,
478        Eq,
479        PartialEq,
480        Debug,
481        Hash,
482        FuzzyHash,
483        serde::Serialize,
484        serde::Deserialize,
485        TypeFingerprint,
486        Versioned,
487    )]
488    struct TestKey(std::ops::Range<u64>);
489
490    versioned_type! { 1.. => TestKey }
491
492    impl SortByU64 for TestKey {
493        fn get_leading_u64(&self) -> u64 {
494            self.0.start
495        }
496    }
497
498    impl LayerKey for TestKey {}
499
500    impl OrdUpperBound for TestKey {
501        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
502            self.0.end.cmp(&other.0.end)
503        }
504    }
505
506    impl OrdLowerBound for TestKey {
507        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
508            self.0.start.cmp(&other.0.start)
509        }
510    }
511
512    fn emit_left_merge_fn(
513        _left: &MergeLayerIterator<'_, TestKey, u64>,
514        _right: &MergeLayerIterator<'_, TestKey, u64>,
515    ) -> MergeResult<TestKey, u64> {
516        MergeResult::EmitLeft
517    }
518
519    impl Value for u64 {
520        const DELETED_MARKER: Self = 0;
521    }
522
523    #[fuchsia::test]
524    async fn test_iteration() {
525        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
526        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
527        tree.insert(items[0].clone()).expect("insert error");
528        tree.insert(items[1].clone()).expect("insert error");
529        let layers = tree.layer_set();
530        let mut merger = layers.merger();
531        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
532        let ItemRef { key, value, .. } = iter.get().expect("missing item");
533        assert_eq!((key, value), (&items[0].key, &items[0].value));
534        iter.advance().await.expect("advance failed");
535        let ItemRef { key, value, .. } = iter.get().expect("missing item");
536        assert_eq!((key, value), (&items[1].key, &items[1].value));
537        iter.advance().await.expect("advance failed");
538        assert!(iter.get().is_none());
539    }
540
541    #[fuchsia::test]
542    async fn test_compact() {
543        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
544        let items = [
545            Item::new(TestKey(1..1), 1),
546            Item::new(TestKey(2..2), 2),
547            Item::new(TestKey(3..3), 3),
548            Item::new(TestKey(4..4), 4),
549        ];
550        tree.insert(items[0].clone()).expect("insert error");
551        tree.insert(items[1].clone()).expect("insert error");
552        tree.seal();
553        tree.insert(items[2].clone()).expect("insert error");
554        tree.insert(items[3].clone()).expect("insert error");
555        tree.seal();
556        let object = Arc::new(FakeObject::new());
557        let handle = FakeObjectHandle::new(object.clone());
558        {
559            let layer_set = tree.immutable_layer_set();
560            let mut merger = layer_set.merger();
561            let iter = merger.query(Query::FullScan).await.expect("create merger");
562            tree.compact_with_iterator(
563                iter,
564                items.len(),
565                Writer::new(&handle).await,
566                handle.block_size(),
567            )
568            .await
569            .expect("compact failed");
570        }
571        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
572        let handle = FakeObjectHandle::new(object.clone());
573        let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
574            .await
575            .expect("open failed");
576
577        let layers = tree.layer_set();
578        let mut merger = layers.merger();
579        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
580        for i in 1..5 {
581            let ItemRef { key, value, .. } = iter.get().expect("missing item");
582            assert_eq!((key, value), (&TestKey(i..i), &i));
583            iter.advance().await.expect("advance failed");
584        }
585        assert!(iter.get().is_none());
586    }
587
588    #[fuchsia::test]
589    async fn test_find() {
590        let items = [
591            Item::new(TestKey(1..1), 1),
592            Item::new(TestKey(2..2), 2),
593            Item::new(TestKey(3..3), 3),
594            Item::new(TestKey(4..4), 4),
595        ];
596        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
597        tree.insert(items[0].clone()).expect("insert error");
598        tree.insert(items[1].clone()).expect("insert error");
599        tree.seal();
600        tree.insert(items[2].clone()).expect("insert error");
601        tree.insert(items[3].clone()).expect("insert error");
602
603        let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
604        assert_eq!(item, items[1]);
605        assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
606    }
607
608    #[fuchsia::test]
609    async fn test_find_no_return_deleted_values() {
610        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
611        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
612        tree.insert(items[0].clone()).expect("insert error");
613        tree.insert(items[1].clone()).expect("insert error");
614
615        let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
616        assert_eq!(item, items[0]);
617        assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
618    }
619
620    #[fuchsia::test]
621    async fn test_empty_seal() {
622        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
623        tree.seal();
624        let item = Item::new(TestKey(1..1), 1);
625        tree.insert(item.clone()).expect("insert error");
626        let object = Arc::new(FakeObject::new());
627        let handle = FakeObjectHandle::new(object.clone());
628        {
629            let layer_set = tree.immutable_layer_set();
630            let mut merger = layer_set.merger();
631            let iter = merger.query(Query::FullScan).await.expect("create merger");
632            tree.compact_with_iterator(iter, 0, Writer::new(&handle).await, handle.block_size())
633                .await
634                .expect("compact failed");
635        }
636        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
637        let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
638        assert_eq!(found_item, item);
639        assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
640    }
641
642    #[fuchsia::test]
643    async fn test_filter() {
644        let items = [
645            Item::new(TestKey(1..1), 1),
646            Item::new(TestKey(2..2), 2),
647            Item::new(TestKey(3..3), 3),
648            Item::new(TestKey(4..4), 4),
649        ];
650        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
651        tree.insert(items[0].clone()).expect("insert error");
652        tree.insert(items[1].clone()).expect("insert error");
653        tree.insert(items[2].clone()).expect("insert error");
654        tree.insert(items[3].clone()).expect("insert error");
655
656        let layers = tree.layer_set();
657        let mut merger = layers.merger();
658
659        // Filter out odd keys (which also guarantees we skip the first key which is an edge case).
660        let mut iter = merger
661            .query(Query::FullScan)
662            .await
663            .expect("seek failed")
664            .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
665            .await
666            .expect("filter failed");
667
668        assert_eq!(iter.get(), Some(items[1].as_item_ref()));
669        iter.advance().await.expect("advance failed");
670        assert_eq!(iter.get(), Some(items[3].as_item_ref()));
671        iter.advance().await.expect("advance failed");
672        assert!(iter.get().is_none());
673    }
674
675    #[fuchsia::test]
676    async fn test_insert_order_agnostic() {
677        let items = [
678            Item::new(TestKey(1..1), 1),
679            Item::new(TestKey(2..2), 2),
680            Item::new(TestKey(3..3), 3),
681            Item::new(TestKey(4..4), 4),
682            Item::new(TestKey(5..5), 5),
683            Item::new(TestKey(6..6), 6),
684        ];
685        let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
686        for item in &items {
687            a.insert(item.clone()).expect("insert error");
688        }
689        let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
690        let mut shuffled = items.clone();
691        shuffled.shuffle(&mut rng());
692        for item in &shuffled {
693            b.insert(item.clone()).expect("insert error");
694        }
695        let layers = a.layer_set();
696        let mut merger = layers.merger();
697        let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
698        let layers = b.layer_set();
699        let mut merger = layers.merger();
700        let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");
701
702        for item in items {
703            assert_eq!(Some(item.as_item_ref()), iter_a.get());
704            assert_eq!(Some(item.as_item_ref()), iter_b.get());
705            iter_a.advance().await.expect("advance failed");
706            iter_b.advance().await.expect("advance failed");
707        }
708        assert!(iter_a.get().is_none());
709        assert!(iter_b.get().is_none());
710    }
711
712    struct AuditCacheInner<'a, V: Value> {
713        lookups: u64,
714        completions: u64,
715        invalidations: u64,
716        drops: u64,
717        result: Option<ObjectCacheResult<'a, V>>,
718    }
719
720    impl<V: Value> AuditCacheInner<'_, V> {
721        fn stats(&self) -> (u64, u64, u64, u64) {
722            (self.lookups, self.completions, self.invalidations, self.drops)
723        }
724    }
725
726    struct AuditCache<'a, V: Value> {
727        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
728    }
729
730    impl<V: Value> AuditCache<'_, V> {
731        fn new() -> Self {
732            Self {
733                inner: Arc::new(Mutex::new(AuditCacheInner {
734                    lookups: 0,
735                    completions: 0,
736                    invalidations: 0,
737                    drops: 0,
738                    result: None,
739                })),
740            }
741        }
742    }
743
744    struct AuditPlaceholder<'a, V: Value> {
745        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
746        completed: Mutex<bool>,
747    }
748
749    impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
750        fn complete(self: Box<Self>, _: Option<&V>) {
751            self.inner.lock().completions += 1;
752            *self.completed.lock() = true;
753        }
754    }
755
756    impl<V: Value> Drop for AuditPlaceholder<'_, V> {
757        fn drop(&mut self) {
758            if !*self.completed.lock() {
759                self.inner.lock().drops += 1;
760            }
761        }
762    }
763
764    impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
765        fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
766            {
767                let mut inner = self.inner.lock();
768                inner.lookups += 1;
769                if inner.result.is_some() {
770                    return std::mem::take(&mut inner.result).unwrap();
771                }
772            }
773            ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
774                inner: self.inner.clone(),
775                completed: Mutex::new(false),
776            }))
777        }
778
779        fn invalidate(&self, _key: K, _value: Option<V>) {
780            self.inner.lock().invalidations += 1;
781        }
782    }
783
784    #[fuchsia::test]
785    async fn test_cache_handling() {
786        let item = Item::new(TestKey(1..1), 1);
787        let cache = Box::new(AuditCache::new());
788        let inner = cache.inner.clone();
789        let a = LSMTree::new(emit_left_merge_fn, cache);
790
791        // Zero counters.
792        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
793
794        // Look for an item, but don't find it. So no insertion. It is dropped.
795        assert!(a.find(&item.key).await.expect("Failed find").is_none());
796        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
797
798        // Insert attempts to invalidate.
799        let _ = a.insert(item.clone());
800        assert_eq!(inner.lock().stats(), (1, 0, 1, 1));
801
802        // Look for item, find it and insert into the cache.
803        assert_eq!(
804            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
805            item.value
806        );
807        assert_eq!(inner.lock().stats(), (2, 1, 1, 1));
808
809        // Insert or replace attempts to invalidate as well.
810        a.replace_or_insert(item.clone());
811        assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
812    }
813
814    #[fuchsia::test]
815    async fn test_cache_hit() {
816        let item = Item::new(TestKey(1..1), 1);
817        let cache = Box::new(AuditCache::new());
818        let inner = cache.inner.clone();
819        let a = LSMTree::new(emit_left_merge_fn, cache);
820
821        // Zero counters.
822        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
823
824        // Insert attempts to invalidate.
825        let _ = a.insert(item.clone());
826        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));
827
828        // Set up the item to find in the cache.
829        inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));
830
831        // Look for item, find it in cache, so no insert.
832        assert_eq!(
833            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
834            item.value
835        );
836        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
837    }
838
839    #[fuchsia::test]
840    async fn test_cache_says_uncacheable() {
841        let item = Item::new(TestKey(1..1), 1);
842        let cache = Box::new(AuditCache::new());
843        let inner = cache.inner.clone();
844        let a = LSMTree::new(emit_left_merge_fn, cache);
845        let _ = a.insert(item.clone());
846
847        // One invalidation from the insert.
848        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));
849
850        // Set up the NoCache response to find in the cache.
851        inner.lock().result = Some(ObjectCacheResult::NoCache);
852
853        // Look for item, it is uncacheable, so no insert.
854        assert_eq!(
855            a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
856            item.value
857        );
858        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
859    }
860
861    struct FailLayer {
862        drop_event: Mutex<Option<Arc<DropEvent>>>,
863    }
864
865    impl FailLayer {
866        fn new() -> Self {
867            Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
868        }
869    }
870
871    #[async_trait]
872    impl<K: Key, V: Value> Layer<K, V> for FailLayer {
873        async fn seek(
874            &self,
875            _bound: std::ops::Bound<&K>,
876        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
877            Err(anyhow!("Purposely failed seek"))
878        }
879
880        fn lock(&self) -> Option<Arc<DropEvent>> {
881            self.drop_event.lock().clone()
882        }
883
884        fn len(&self) -> usize {
885            0
886        }
887
888        async fn close(&self) {
889            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
890                Some(drop_event) => drop_event.listen(),
891                None => return,
892            };
893            listener.await;
894        }
895
896        fn get_version(&self) -> Version {
897            LATEST_VERSION
898        }
899
900        async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
901            unimplemented!();
902        }
903    }
904
905    struct MockLayer {
906        exists_result: Existence,
907        drop_event: Mutex<Option<Arc<DropEvent>>>,
908    }
909
910    impl MockLayer {
911        fn new(exists_result: Existence) -> Self {
912            Self { exists_result, drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
913        }
914    }
915
916    #[async_trait]
917    impl<K: Key, V: Value> Layer<K, V> for MockLayer {
918        async fn seek(
919            &self,
920            _bound: std::ops::Bound<&K>,
921        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
922            unimplemented!()
923        }
924
925        fn lock(&self) -> Option<Arc<DropEvent>> {
926            self.drop_event.lock().clone()
927        }
928
929        fn len(&self) -> usize {
930            0
931        }
932
933        async fn close(&self) {
934            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
935                Some(drop_event) => drop_event.listen(),
936                None => return,
937            };
938            listener.await;
939        }
940
941        fn get_version(&self) -> Version {
942            LATEST_VERSION
943        }
944
945        async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
946            Ok(self.exists_result)
947        }
948    }
949
950    #[fuchsia::test]
951    async fn test_layer_set_key_exists() {
952        use super::LockedLayer;
953
954        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
955        let mut layer_set = tree.empty_layer_set();
956
957        // Empty layer set should return Missing.
958        assert_eq!(
959            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
960            Existence::Missing
961        );
962
963        // Add a layer that returns Missing.
964        layer_set.layers.push(LockedLayer::from(
965            Arc::new(MockLayer::new(Existence::Missing)) as Arc<dyn Layer<TestKey, u64>>
966        ));
967        assert_eq!(
968            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
969            Existence::Missing
970        );
971
972        // Add a layer that returns MaybeExists.
973        layer_set.layers.push(LockedLayer::from(
974            Arc::new(MockLayer::new(Existence::MaybeExists)) as Arc<dyn Layer<TestKey, u64>>
975        ));
976        assert_eq!(
977            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
978            Existence::MaybeExists
979        );
980
981        // Add a layer that returns Exists.
982        layer_set.layers.insert(
983            0,
984            LockedLayer::from(
985                Arc::new(MockLayer::new(Existence::Exists)) as Arc<dyn Layer<TestKey, u64>>
986            ),
987        );
988        assert_eq!(
989            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
990            Existence::Exists
991        );
992    }
993
994    #[fuchsia::test]
995    async fn test_failed_lookup() {
996        let cache = Box::new(AuditCache::new());
997        let inner = cache.inner.clone();
998        let a = LSMTree::new(emit_left_merge_fn, cache);
999        a.set_layers(vec![Arc::new(FailLayer::new())]);
1000
1001        // Zero counters.
1002        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
1003
1004        // Lookup should fail and drop the placeholder.
1005        assert!(a.find(&TestKey(1..1)).await.is_err());
1006        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
1007    }
1008}
1009
/// Fuzzing harness that replays arbitrary sequences of operations against an `LSMTree`.
#[cfg(fuzz)]
mod fuzz {
    use crate::lsm_tree::types::{
        FuzzyHash, Item, LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
    };
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use arbitrary::Arbitrary;
    use fprint::TypeFingerprint;
    use fuzz::fuzz;
    use fxfs_macros::FuzzyHash;
    use std::hash::Hash;

    /// Key type for the fuzzer: a half-open range of `u64`.
    #[derive(
        Arbitrary,
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    impl LayerKey for TestKey {}

    impl SortByU64 for TestKey {
        /// The leading `u64` of a key is the start of its range.
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl OrdUpperBound for TestKey {
        /// Orders keys by the end of their ranges.
        fn cmp_upper_bound(&self, other: &Self) -> std::cmp::Ordering {
            let (mine, theirs) = (self.0.end, other.0.end);
            mine.cmp(&theirs)
        }
    }

    impl OrdLowerBound for TestKey {
        /// Orders keys by the start of their ranges.
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            let (mine, theirs) = (self.0.start, other.0.start);
            mine.cmp(&theirs)
        }
    }

    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    /// One operation the fuzzer can apply to the tree.
    // Note: this enum is not actually dead; `fuzz_lsm_tree_actions` below consumes it. The
    // `#[fuzz]` proc macro attribute hides that usage well enough to confuse the compiler's
    // dead-code analysis, hence the `allow`.
    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        Insert(Item<TestKey, u64>),
        ReplaceOrInsert(Item<TestKey, u64>),
        MergeInto(Item<TestKey, u64>, TestKey),
        Find(TestKey),
        Seal,
    }

    // Applies each fuzzer-generated action, in order, to a fresh tree with a `NullCache`.
    #[fuzz]
    fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
        use super::LSMTree;
        use super::cache::NullCache;
        use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
        use futures::executor::block_on;

        // Merge function that always emits the left-hand item.
        fn emit_left_merge_fn(
            _left: &MergeLayerIterator<'_, TestKey, u64>,
            _right: &MergeLayerIterator<'_, TestKey, u64>,
        ) -> MergeResult<TestKey, u64> {
            MergeResult::EmitLeft
        }

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        actions.into_iter().for_each(|action| match action {
            FuzzAction::Insert(item) => {
                // The outcome of the insert is deliberately discarded.
                let _ = tree.insert(item);
            }
            FuzzAction::ReplaceOrInsert(item) => {
                tree.replace_or_insert(item);
            }
            FuzzAction::MergeInto(item, bound) => tree.merge_into(item, &bound),
            FuzzAction::Find(key) => {
                // A lookup error is treated as a fuzzing failure.
                block_on(tree.find(&key)).expect("find failed");
            }
            FuzzAction::Seal => tree.seal(),
        });
    }
}