fxfs/
lsm_tree.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod bloom_filter;
6pub mod cache;
7pub mod merge;
8pub mod persistent_layer;
9pub mod skip_list_layer;
10pub mod types;
11
12use crate::drop_event::DropEvent;
13use crate::log::*;
14use crate::object_handle::{ReadObjectHandle, WriteBytes};
15use crate::serialized_types::{LATEST_VERSION, Version};
16use anyhow::Error;
17use cache::{ObjectCache, ObjectCacheResult};
18use fuchsia_sync::{Mutex, RwLock};
19use persistent_layer::{PersistentLayer, PersistentLayerWriter};
20use skip_list_layer::SkipListLayer;
21use std::fmt;
22use std::sync::Arc;
23use types::{
24    Existence, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey,
25    OrdLowerBound, Value,
26};
27
28pub use merge::Query;
29
/// Argument handed to `SkipListLayer::new` every time a fresh mutable layer is created (see
/// `LSMTree::new_mutable_layer`).  Presumably a sizing hint for the skip list — TODO confirm
/// against `SkipListLayer::new`.
const SKIP_LIST_LAYER_ITEMS: usize = 512;
31
32// For serialization.
33pub use persistent_layer::{
34    LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
35    LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
36};
37
38pub async fn layers_from_handles<K: Key, V: Value>(
39    handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
40) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
41    let mut layers = Vec::new();
42    for handle in handles {
43        layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
44    }
45    Ok(layers)
46}
47
/// Identifies which `LSMTree` mutation triggered a call to the mutation callback.
#[derive(Debug, PartialEq, Eq)]
pub enum Operation {
    /// Reported by `LSMTree::insert`.
    Insert,
    /// Reported by `LSMTree::replace_or_insert`.
    ReplaceOrInsert,
    /// Reported by `LSMTree::merge_into`.
    MergeInto,
}
54
55pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;
56
57struct Inner<K, V> {
58    mutable_layer: Arc<SkipListLayer<K, V>>,
59    layers: Vec<Arc<dyn Layer<K, V>>>,
60    mutation_callback: MutationCallback<K, V>,
61}
62
63#[derive(Default)]
64pub(super) struct Counters {
65    num_seeks: usize,
66    // The following two metrics are used to compute the effectiveness of the bloom filters.
67    // `layer_files_total` tracks the number of layer files we might have looked at across all
68    // seeks, and `layer_files_skipped` tracks how many we skipped thanks to the bloom filter.
69    layer_files_total: usize,
70    layer_files_skipped: usize,
71}
72
73/// LSMTree manages a tree of layers to provide a key/value store.  Each layer contains deltas on
74/// the preceding layer.  The top layer is an in-memory mutable layer.  Layers can be compacted to
75/// form a new combined layer.
76pub struct LSMTree<K, V> {
77    data: RwLock<Inner<K, V>>,
78    merge_fn: merge::MergeFn<K, V>,
79    cache: Box<dyn ObjectCache<K, V>>,
80    counters: Arc<Mutex<Counters>>,
81}
82
83#[fxfs_trace::trace]
84impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
85    /// Creates a new empty tree.
86    pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
87        LSMTree {
88            data: RwLock::new(Inner {
89                mutable_layer: Self::new_mutable_layer(),
90                layers: Vec::new(),
91                mutation_callback: None,
92            }),
93            merge_fn,
94            cache,
95            counters: Arc::new(Mutex::new(Default::default())),
96        }
97    }
98
99    /// Opens an existing tree from the provided handles to the layer objects.
100    pub async fn open(
101        merge_fn: merge::MergeFn<K, V>,
102        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
103        cache: Box<dyn ObjectCache<K, V>>,
104    ) -> Result<Self, Error> {
105        Ok(LSMTree {
106            data: RwLock::new(Inner {
107                mutable_layer: Self::new_mutable_layer(),
108                layers: layers_from_handles(handles).await?,
109                mutation_callback: None,
110            }),
111            merge_fn,
112            cache,
113            counters: Arc::new(Mutex::new(Default::default())),
114        })
115    }
116
117    /// Replaces the immutable layers.
118    pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
119        self.data.write().layers = layers;
120    }
121
122    /// Appends to the given layers at the end i.e. they should be base layers.  This is supposed
123    /// to be used after replay when we are opening a tree and we have discovered the base layers.
124    pub async fn append_layers(
125        &self,
126        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
127    ) -> Result<(), Error> {
128        let mut layers = layers_from_handles(handles).await?;
129        self.data.write().layers.append(&mut layers);
130        Ok(())
131    }
132
133    /// Resets the immutable layers.
134    pub fn reset_immutable_layers(&self) {
135        self.data.write().layers = Vec::new();
136    }
137
138    /// Seals the current mutable layer and creates a new one.
139    pub fn seal(&self) {
140        // We need to be sure there are no mutations currently in-progress.  This is currently
141        // guaranteed by ensuring that all mutations take a read lock on `data`.
142        let mut data = self.data.write();
143        let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
144        data.layers.insert(0, layer);
145    }
146
147    /// Resets the tree to an empty state.
148    pub fn reset(&self) {
149        let mut data = self.data.write();
150        data.layers = Vec::new();
151        data.mutable_layer = Self::new_mutable_layer();
152    }
153
154    /// Writes the items yielded by the iterator into the supplied object.
155    #[trace]
156    pub async fn compact_with_iterator<W: WriteBytes + Send>(
157        &self,
158        mut iterator: impl LayerIterator<K, V>,
159        num_items: usize,
160        writer: W,
161        block_size: u64,
162    ) -> Result<(), Error> {
163        let mut writer =
164            PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
165        while let Some(item_ref) = iterator.get() {
166            debug!(item_ref:?; "compact: writing");
167            writer.write(item_ref).await?;
168            iterator.advance().await?;
169        }
170        writer.flush().await
171    }
172
173    /// Returns an empty layer-set for this tree.
174    pub fn empty_layer_set(&self) -> LayerSet<K, V> {
175        LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
176    }
177
178    /// Adds all the layers (including the mutable layer) to `layer_set`.
179    pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
180        let data = self.data.read();
181        layer_set.layers.reserve_exact(data.layers.len() + 1);
182        layer_set
183            .layers
184            .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
185        for layer in &data.layers {
186            layer_set.layers.push(layer.clone().into());
187        }
188    }
189
190    /// Returns a clone of the current set of layers (including the mutable layer), after which one
191    /// can get an iterator.
192    pub fn layer_set(&self) -> LayerSet<K, V> {
193        let mut layer_set = self.empty_layer_set();
194        self.add_all_layers_to_layer_set(&mut layer_set);
195        layer_set
196    }
197
198    /// Returns the current set of immutable layers after which one can get an iterator (for e.g.
199    /// compacting).  Since these layers are immutable, getting an iterator should not block
200    /// anything else.
201    pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
202        let data = self.data.read();
203        let mut layers = Vec::with_capacity(data.layers.len());
204        for layer in &data.layers {
205            layers.push(layer.clone().into());
206        }
207        LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
208    }
209
210    /// Inserts an item into the mutable layer.
211    /// Returns error if item already exists.
212    pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
213        let key = item.key.clone();
214        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
215        {
216            // `seal` below relies on us holding a read lock whilst we do the mutation.
217            let data = self.data.read();
218            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
219                mutation_callback(Operation::Insert, &item);
220            }
221            data.mutable_layer.insert(item)?;
222        }
223        self.cache.invalidate(key, val);
224        Ok(())
225    }
226
227    /// Replaces or inserts an item into the mutable layer.
228    pub fn replace_or_insert(&self, item: Item<K, V>) {
229        let key = item.key.clone();
230        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
231        {
232            // `seal` below relies on us holding a read lock whilst we do the mutation.
233            let data = self.data.read();
234            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
235                mutation_callback(Operation::ReplaceOrInsert, &item);
236            }
237            data.mutable_layer.replace_or_insert(item);
238        }
239        self.cache.invalidate(key, val);
240    }
241
242    /// Merges the given item into the mutable layer.
243    pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
244        let key = item.key.clone();
245        {
246            // `seal` below relies on us holding a read lock whilst we do the mutation.
247            let data = self.data.read();
248            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
249                mutation_callback(Operation::MergeInto, &item);
250            }
251            data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
252        }
253        self.cache.invalidate(key, None);
254    }
255
256    /// Searches for an exact match for the given key. If the value is equal to
257    /// `Value::DELETED_MARKER` the item is considered missing and will not be returned.
258    pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
259    where
260        K: Eq,
261    {
262        // It is important that the cache lookup is done prior to fetching the layer set as the
263        // placeholder returned acts as a sort of lock for the validity of the item that may be
264        // inserted later via that placeholder.
265        let token = match self.cache.lookup_or_reserve(search_key) {
266            ObjectCacheResult::Value(value) => {
267                if value == V::DELETED_MARKER {
268                    return Ok(None);
269                } else {
270                    return Ok(Some(Item::new(search_key.clone(), value)));
271                }
272            }
273            ObjectCacheResult::Placeholder(token) => Some(token),
274            ObjectCacheResult::NoCache => None,
275        };
276        let layer_set = self.layer_set();
277        let mut merger = layer_set.merger();
278
279        Ok(match merger.query(Query::Point(search_key)).await?.get() {
280            Some(ItemRef { key, value, sequence })
281                if key == search_key && *value != V::DELETED_MARKER =>
282            {
283                if let Some(token) = token {
284                    token.complete(Some(value));
285                }
286                Some(Item { key: key.clone(), value: value.clone(), sequence })
287            }
288            _ => None,
289        })
290    }
291
292    pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
293        self.data.read().mutable_layer.clone()
294    }
295
296    /// Sets a mutation callback which is a callback that is triggered whenever any mutations are
297    /// applied to the tree.  This might be useful for tests that want to record the precise
298    /// sequence of mutations that are applied to the tree.
299    pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
300        self.data.write().mutation_callback = mutation_callback;
301    }
302
303    /// Returns the earliest version used by a layer in the tree.
304    pub fn get_earliest_version(&self) -> Version {
305        let mut earliest_version = LATEST_VERSION;
306        for layer in self.layer_set().layers {
307            let layer_version = layer.get_version();
308            if layer_version < earliest_version {
309                earliest_version = layer_version;
310            }
311        }
312        return earliest_version;
313    }
314
315    /// Returns a new mutable layer.
316    pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
317        SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
318    }
319
320    /// Replaces the mutable layer.
321    pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
322        self.data.write().mutable_layer = layer;
323    }
324
325    /// Records inspect data for the LSM tree into `node`.  Called lazily when inspect is queried.
326    pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
327        let layer_set = self.layer_set();
328        root.record_child("layers", move |node| {
329            let mut index = 0;
330            for layer in layer_set.layers {
331                node.record_child(format!("{index}"), move |node| {
332                    layer.1.record_inspect_data(node)
333                });
334                index += 1;
335            }
336        });
337        {
338            let counters = self.counters.lock();
339            root.record_uint("num_seeks", counters.num_seeks as u64);
340            root.record_uint(
341                "bloom_filter_success_percent",
342                if counters.layer_files_total == 0 {
343                    0
344                } else {
345                    (counters.layer_files_skipped * 100).div_ceil(counters.layer_files_total) as u64
346                },
347            );
348        }
349    }
350}
351
352/// This is an RAII wrapper for a layer which holds a lock on the layer (via the Layer::lock
353/// method).
354pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);
355
356impl<K, V> LockedLayer<K, V> {
357    pub async fn close_layer(self) {
358        let layer = self.1;
359        std::mem::drop(self.0);
360        layer.close().await;
361    }
362}
363
364impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
365    fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
366        let event = layer.lock().unwrap();
367        Self(event, layer)
368    }
369}
370
371impl<K, V> std::ops::Deref for LockedLayer<K, V> {
372    type Target = Arc<dyn Layer<K, V>>;
373
374    fn deref(&self) -> &Self::Target {
375        &self.1
376    }
377}
378
379impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
380    fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
381        self.1.as_ref()
382    }
383}
384
385/// A LayerSet provides a snapshot of the layers at a particular point in time, and allows you to
386/// get an iterator.  Iterators borrow the layers so something needs to hold reference count.
387pub struct LayerSet<K, V> {
388    pub layers: Vec<LockedLayer<K, V>>,
389    merge_fn: merge::MergeFn<K, V>,
390    counters: Arc<Mutex<Counters>>,
391}
392
393impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
394    pub fn sum_len(&self) -> usize {
395        let mut size = 0;
396        for layer in &self.layers {
397            size += layer.len()
398        }
399        size
400    }
401
402    pub fn merger(&self) -> merge::Merger<'_, K, V> {
403        merge::Merger::new(
404            self.layers.iter().map(|x| x.as_ref()),
405            self.merge_fn,
406            self.counters.clone(),
407        )
408    }
409
410    /// See `Layer::key_exists`.
411    pub async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
412        for l in &self.layers {
413            match l.key_exists(key).await? {
414                e @ (Existence::Exists | Existence::MaybeExists) => return Ok(e),
415                _ => {}
416            }
417        }
418        Ok(Existence::Missing)
419    }
420}
421
422impl<K, V> fmt::Debug for LayerSet<K, V> {
423    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
424        fmt.debug_list()
425            .entries(self.layers.iter().map(|l| {
426                if let Some(handle) = l.handle() {
427                    format!("{}", handle.object_id())
428                } else {
429                    format!("{:?}", Arc::as_ptr(l))
430                }
431            }))
432            .finish()
433    }
434}
435
436#[cfg(test)]
437mod tests {
438    use super::LSMTree;
439    use crate::drop_event::DropEvent;
440    use crate::lsm_tree::cache::{
441        NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
442    };
443    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
444    use crate::lsm_tree::types::{
445        BoxedLayerIterator, Existence, FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator,
446        LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
447    };
448    use crate::lsm_tree::{Query, layers_from_handles};
449    use crate::object_handle::ObjectHandle;
450    use crate::serialized_types::{
451        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
452    };
453    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
454    use crate::testing::writer::Writer;
455    use anyhow::{Error, anyhow};
456    use async_trait::async_trait;
457    use fprint::TypeFingerprint;
458    use fuchsia_sync::Mutex;
459    use fxfs_macros::FuzzyHash;
460    use rand::rng;
461    use rand::seq::SliceRandom;
462    use std::hash::Hash;
463    use std::sync::Arc;
464
465    #[derive(
466        Clone,
467        Eq,
468        PartialEq,
469        Debug,
470        Hash,
471        FuzzyHash,
472        serde::Serialize,
473        serde::Deserialize,
474        TypeFingerprint,
475        Versioned,
476    )]
477    struct TestKey(std::ops::Range<u64>);
478
479    versioned_type! { 1.. => TestKey }
480
481    impl SortByU64 for TestKey {
482        fn get_leading_u64(&self) -> u64 {
483            self.0.start
484        }
485    }
486
487    impl LayerKey for TestKey {}
488
489    impl OrdUpperBound for TestKey {
490        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
491            self.0.end.cmp(&other.0.end)
492        }
493    }
494
495    impl OrdLowerBound for TestKey {
496        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
497            self.0.start.cmp(&other.0.start)
498        }
499    }
500
501    fn emit_left_merge_fn(
502        _left: &MergeLayerIterator<'_, TestKey, u64>,
503        _right: &MergeLayerIterator<'_, TestKey, u64>,
504    ) -> MergeResult<TestKey, u64> {
505        MergeResult::EmitLeft
506    }
507
508    impl Value for u64 {
509        const DELETED_MARKER: Self = 0;
510    }
511
512    #[fuchsia::test]
513    async fn test_iteration() {
514        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
515        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
516        tree.insert(items[0].clone()).expect("insert error");
517        tree.insert(items[1].clone()).expect("insert error");
518        let layers = tree.layer_set();
519        let mut merger = layers.merger();
520        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
521        let ItemRef { key, value, .. } = iter.get().expect("missing item");
522        assert_eq!((key, value), (&items[0].key, &items[0].value));
523        iter.advance().await.expect("advance failed");
524        let ItemRef { key, value, .. } = iter.get().expect("missing item");
525        assert_eq!((key, value), (&items[1].key, &items[1].value));
526        iter.advance().await.expect("advance failed");
527        assert!(iter.get().is_none());
528    }
529
530    #[fuchsia::test]
531    async fn test_compact() {
532        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
533        let items = [
534            Item::new(TestKey(1..1), 1),
535            Item::new(TestKey(2..2), 2),
536            Item::new(TestKey(3..3), 3),
537            Item::new(TestKey(4..4), 4),
538        ];
539        tree.insert(items[0].clone()).expect("insert error");
540        tree.insert(items[1].clone()).expect("insert error");
541        tree.seal();
542        tree.insert(items[2].clone()).expect("insert error");
543        tree.insert(items[3].clone()).expect("insert error");
544        tree.seal();
545        let object = Arc::new(FakeObject::new());
546        let handle = FakeObjectHandle::new(object.clone());
547        {
548            let layer_set = tree.immutable_layer_set();
549            let mut merger = layer_set.merger();
550            let iter = merger.query(Query::FullScan).await.expect("create merger");
551            tree.compact_with_iterator(
552                iter,
553                items.len(),
554                Writer::new(&handle).await,
555                handle.block_size(),
556            )
557            .await
558            .expect("compact failed");
559        }
560        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
561        let handle = FakeObjectHandle::new(object.clone());
562        let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
563            .await
564            .expect("open failed");
565
566        let layers = tree.layer_set();
567        let mut merger = layers.merger();
568        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
569        for i in 1..5 {
570            let ItemRef { key, value, .. } = iter.get().expect("missing item");
571            assert_eq!((key, value), (&TestKey(i..i), &i));
572            iter.advance().await.expect("advance failed");
573        }
574        assert!(iter.get().is_none());
575    }
576
577    #[fuchsia::test]
578    async fn test_find() {
579        let items = [
580            Item::new(TestKey(1..1), 1),
581            Item::new(TestKey(2..2), 2),
582            Item::new(TestKey(3..3), 3),
583            Item::new(TestKey(4..4), 4),
584        ];
585        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
586        tree.insert(items[0].clone()).expect("insert error");
587        tree.insert(items[1].clone()).expect("insert error");
588        tree.seal();
589        tree.insert(items[2].clone()).expect("insert error");
590        tree.insert(items[3].clone()).expect("insert error");
591
592        let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
593        assert_eq!(item, items[1]);
594        assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
595    }
596
597    #[fuchsia::test]
598    async fn test_find_no_return_deleted_values() {
599        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
600        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
601        tree.insert(items[0].clone()).expect("insert error");
602        tree.insert(items[1].clone()).expect("insert error");
603
604        let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
605        assert_eq!(item, items[0]);
606        assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
607    }
608
609    #[fuchsia::test]
610    async fn test_empty_seal() {
611        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
612        tree.seal();
613        let item = Item::new(TestKey(1..1), 1);
614        tree.insert(item.clone()).expect("insert error");
615        let object = Arc::new(FakeObject::new());
616        let handle = FakeObjectHandle::new(object.clone());
617        {
618            let layer_set = tree.immutable_layer_set();
619            let mut merger = layer_set.merger();
620            let iter = merger.query(Query::FullScan).await.expect("create merger");
621            tree.compact_with_iterator(iter, 0, Writer::new(&handle).await, handle.block_size())
622                .await
623                .expect("compact failed");
624        }
625        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
626        let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
627        assert_eq!(found_item, item);
628        assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
629    }
630
631    #[fuchsia::test]
632    async fn test_filter() {
633        let items = [
634            Item::new(TestKey(1..1), 1),
635            Item::new(TestKey(2..2), 2),
636            Item::new(TestKey(3..3), 3),
637            Item::new(TestKey(4..4), 4),
638        ];
639        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
640        tree.insert(items[0].clone()).expect("insert error");
641        tree.insert(items[1].clone()).expect("insert error");
642        tree.insert(items[2].clone()).expect("insert error");
643        tree.insert(items[3].clone()).expect("insert error");
644
645        let layers = tree.layer_set();
646        let mut merger = layers.merger();
647
648        // Filter out odd keys (which also guarantees we skip the first key which is an edge case).
649        let mut iter = merger
650            .query(Query::FullScan)
651            .await
652            .expect("seek failed")
653            .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
654            .await
655            .expect("filter failed");
656
657        assert_eq!(iter.get(), Some(items[1].as_item_ref()));
658        iter.advance().await.expect("advance failed");
659        assert_eq!(iter.get(), Some(items[3].as_item_ref()));
660        iter.advance().await.expect("advance failed");
661        assert!(iter.get().is_none());
662    }
663
664    #[fuchsia::test]
665    async fn test_insert_order_agnostic() {
666        let items = [
667            Item::new(TestKey(1..1), 1),
668            Item::new(TestKey(2..2), 2),
669            Item::new(TestKey(3..3), 3),
670            Item::new(TestKey(4..4), 4),
671            Item::new(TestKey(5..5), 5),
672            Item::new(TestKey(6..6), 6),
673        ];
674        let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
675        for item in &items {
676            a.insert(item.clone()).expect("insert error");
677        }
678        let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
679        let mut shuffled = items.clone();
680        shuffled.shuffle(&mut rng());
681        for item in &shuffled {
682            b.insert(item.clone()).expect("insert error");
683        }
684        let layers = a.layer_set();
685        let mut merger = layers.merger();
686        let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
687        let layers = b.layer_set();
688        let mut merger = layers.merger();
689        let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");
690
691        for item in items {
692            assert_eq!(Some(item.as_item_ref()), iter_a.get());
693            assert_eq!(Some(item.as_item_ref()), iter_b.get());
694            iter_a.advance().await.expect("advance failed");
695            iter_b.advance().await.expect("advance failed");
696        }
697        assert!(iter_a.get().is_none());
698        assert!(iter_b.get().is_none());
699    }
700
701    struct AuditCacheInner<'a, V: Value> {
702        lookups: u64,
703        completions: u64,
704        invalidations: u64,
705        drops: u64,
706        result: Option<ObjectCacheResult<'a, V>>,
707    }
708
709    impl<V: Value> AuditCacheInner<'_, V> {
710        fn stats(&self) -> (u64, u64, u64, u64) {
711            (self.lookups, self.completions, self.invalidations, self.drops)
712        }
713    }
714
715    struct AuditCache<'a, V: Value> {
716        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
717    }
718
719    impl<V: Value> AuditCache<'_, V> {
720        fn new() -> Self {
721            Self {
722                inner: Arc::new(Mutex::new(AuditCacheInner {
723                    lookups: 0,
724                    completions: 0,
725                    invalidations: 0,
726                    drops: 0,
727                    result: None,
728                })),
729            }
730        }
731    }
732
733    struct AuditPlaceholder<'a, V: Value> {
734        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
735        completed: Mutex<bool>,
736    }
737
738    impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
739        fn complete(self: Box<Self>, _: Option<&V>) {
740            self.inner.lock().completions += 1;
741            *self.completed.lock() = true;
742        }
743    }
744
745    impl<V: Value> Drop for AuditPlaceholder<'_, V> {
746        fn drop(&mut self) {
747            if !*self.completed.lock() {
748                self.inner.lock().drops += 1;
749            }
750        }
751    }
752
753    impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
754        fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
755            {
756                let mut inner = self.inner.lock();
757                inner.lookups += 1;
758                if inner.result.is_some() {
759                    return std::mem::take(&mut inner.result).unwrap();
760                }
761            }
762            ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
763                inner: self.inner.clone(),
764                completed: Mutex::new(false),
765            }))
766        }
767
768        fn invalidate(&self, _key: K, _value: Option<V>) {
769            self.inner.lock().invalidations += 1;
770        }
771    }
772
773    #[fuchsia::test]
774    async fn test_cache_handling() {
775        let item = Item::new(TestKey(1..1), 1);
776        let cache = Box::new(AuditCache::new());
777        let inner = cache.inner.clone();
778        let a = LSMTree::new(emit_left_merge_fn, cache);
779
780        // Zero counters.
781        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
782
783        // Look for an item, but don't find it. So no insertion. It is dropped.
784        assert!(a.find(&item.key).await.expect("Failed find").is_none());
785        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
786
787        // Insert attempts to invalidate.
788        let _ = a.insert(item.clone());
789        assert_eq!(inner.lock().stats(), (1, 0, 1, 1));
790
791        // Look for item, find it and insert into the cache.
792        assert_eq!(
793            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
794            item.value
795        );
796        assert_eq!(inner.lock().stats(), (2, 1, 1, 1));
797
798        // Insert or replace attempts to invalidate as well.
799        a.replace_or_insert(item.clone());
800        assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
801    }
802
803    #[fuchsia::test]
804    async fn test_cache_hit() {
805        let item = Item::new(TestKey(1..1), 1);
806        let cache = Box::new(AuditCache::new());
807        let inner = cache.inner.clone();
808        let a = LSMTree::new(emit_left_merge_fn, cache);
809
810        // Zero counters.
811        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
812
813        // Insert attempts to invalidate.
814        let _ = a.insert(item.clone());
815        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));
816
817        // Set up the item to find in the cache.
818        inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));
819
820        // Look for item, find it in cache, so no insert.
821        assert_eq!(
822            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
823            item.value
824        );
825        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
826    }
827
828    #[fuchsia::test]
829    async fn test_cache_says_uncacheable() {
830        let item = Item::new(TestKey(1..1), 1);
831        let cache = Box::new(AuditCache::new());
832        let inner = cache.inner.clone();
833        let a = LSMTree::new(emit_left_merge_fn, cache);
834        let _ = a.insert(item.clone());
835
836        // One invalidation from the insert.
837        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));
838
839        // Set up the NoCache response to find in the cache.
840        inner.lock().result = Some(ObjectCacheResult::NoCache);
841
842        // Look for item, it is uncacheable, so no insert.
843        assert_eq!(
844            a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
845            item.value
846        );
847        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
848    }
849
850    struct FailLayer {
851        drop_event: Mutex<Option<Arc<DropEvent>>>,
852    }
853
854    impl FailLayer {
855        fn new() -> Self {
856            Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
857        }
858    }
859
860    #[async_trait]
861    impl<K: Key, V: Value> Layer<K, V> for FailLayer {
862        async fn seek(
863            &self,
864            _bound: std::ops::Bound<&K>,
865        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
866            Err(anyhow!("Purposely failed seek"))
867        }
868
869        fn lock(&self) -> Option<Arc<DropEvent>> {
870            self.drop_event.lock().clone()
871        }
872
873        fn len(&self) -> usize {
874            0
875        }
876
877        async fn close(&self) {
878            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
879                Some(drop_event) => drop_event.listen(),
880                None => return,
881            };
882            listener.await;
883        }
884
885        fn get_version(&self) -> Version {
886            LATEST_VERSION
887        }
888
889        async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
890            unimplemented!();
891        }
892    }
893
894    struct MockLayer {
895        exists_result: Existence,
896        drop_event: Mutex<Option<Arc<DropEvent>>>,
897    }
898
899    impl MockLayer {
900        fn new(exists_result: Existence) -> Self {
901            Self { exists_result, drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
902        }
903    }
904
905    #[async_trait]
906    impl<K: Key, V: Value> Layer<K, V> for MockLayer {
907        async fn seek(
908            &self,
909            _bound: std::ops::Bound<&K>,
910        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
911            unimplemented!()
912        }
913
914        fn lock(&self) -> Option<Arc<DropEvent>> {
915            self.drop_event.lock().clone()
916        }
917
918        fn len(&self) -> usize {
919            0
920        }
921
922        async fn close(&self) {
923            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
924                Some(drop_event) => drop_event.listen(),
925                None => return,
926            };
927            listener.await;
928        }
929
930        fn get_version(&self) -> Version {
931            LATEST_VERSION
932        }
933
934        async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
935            Ok(self.exists_result)
936        }
937    }
938
939    #[fuchsia::test]
940    async fn test_layer_set_key_exists() {
941        use super::LockedLayer;
942
943        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
944        let mut layer_set = tree.empty_layer_set();
945
946        // Empty layer set should return Missing.
947        assert_eq!(
948            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
949            Existence::Missing
950        );
951
952        // Add a layer that returns Missing.
953        layer_set.layers.push(LockedLayer::from(
954            Arc::new(MockLayer::new(Existence::Missing)) as Arc<dyn Layer<TestKey, u64>>
955        ));
956        assert_eq!(
957            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
958            Existence::Missing
959        );
960
961        // Add a layer that returns MaybeExists.
962        layer_set.layers.push(LockedLayer::from(
963            Arc::new(MockLayer::new(Existence::MaybeExists)) as Arc<dyn Layer<TestKey, u64>>
964        ));
965        assert_eq!(
966            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
967            Existence::MaybeExists
968        );
969
970        // Add a layer that returns Exists.
971        layer_set.layers.insert(
972            0,
973            LockedLayer::from(
974                Arc::new(MockLayer::new(Existence::Exists)) as Arc<dyn Layer<TestKey, u64>>
975            ),
976        );
977        assert_eq!(
978            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
979            Existence::Exists
980        );
981    }
982
983    #[fuchsia::test]
984    async fn test_failed_lookup() {
985        let cache = Box::new(AuditCache::new());
986        let inner = cache.inner.clone();
987        let a = LSMTree::new(emit_left_merge_fn, cache);
988        a.set_layers(vec![Arc::new(FailLayer::new())]);
989
990        // Zero counters.
991        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
992
993        // Lookup should fail and drop the placeholder.
994        assert!(a.find(&TestKey(1..1)).await.is_err());
995        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
996    }
997}
998
#[cfg(fuzz)]
mod fuzz {
    use crate::lsm_tree::types::{
        FuzzyHash, Item, LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
    };
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use arbitrary::Arbitrary;
    use fprint::TypeFingerprint;
    use fuzz::fuzz;
    use fxfs_macros::FuzzyHash;
    use std::hash::Hash;

    // Fuzzer key type: a `u64` range, ordered by its start (lower bound) and end (upper bound).
    #[derive(
        Arbitrary,
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    impl LayerKey for TestKey {}

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    // Note: This code isn't really dead; it's used below in
    // `fuzz_lsm_tree_actions`. However, the `#[fuzz]` proc macro attribute
    // obfuscates the usage enough to confuse the compiler.
    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        Insert(Item<TestKey, u64>),
        ReplaceOrInsert(Item<TestKey, u64>),
        MergeInto(Item<TestKey, u64>, TestKey),
        Find(TestKey),
        Seal,
    }

    // Applies an arbitrary sequence of actions to a fresh tree with no cache. Every action must
    // complete without panicking, and every `find` must return `Ok`.
    #[fuzz]
    fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
        use super::LSMTree;
        use super::cache::NullCache;
        use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
        use futures::executor::block_on;

        fn emit_left_merge_fn(
            _left: &MergeLayerIterator<'_, TestKey, u64>,
            _right: &MergeLayerIterator<'_, TestKey, u64>,
        ) -> MergeResult<TestKey, u64> {
            MergeResult::EmitLeft
        }

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for action in actions {
            match action {
                FuzzAction::Insert(item) => {
                    // Arbitrary input can produce inserts the tree rejects (presumably duplicate
                    // keys). Such errors are expected here, so they are deliberately ignored.
                    let _ = tree.insert(item);
                }
                FuzzAction::ReplaceOrInsert(item) => {
                    tree.replace_or_insert(item);
                }
                FuzzAction::Find(key) => {
                    block_on(tree.find(&key)).expect("find failed");
                }
                FuzzAction::MergeInto(item, bound) => tree.merge_into(item, &bound),
                FuzzAction::Seal => tree.seal(),
            }
        }
    }
}