fxfs/
lsm_tree.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5mod bloom_filter;
6pub mod cache;
7pub mod merge;
8pub mod persistent_layer;
9pub mod skip_list_layer;
10pub mod types;
11
12use crate::drop_event::DropEvent;
13use crate::log::*;
14use crate::object_handle::{ReadObjectHandle, WriteBytes};
15use crate::serialized_types::{Version, LATEST_VERSION};
16use anyhow::Error;
17use cache::{ObjectCache, ObjectCacheResult};
18use fuchsia_sync::{Mutex, RwLock};
19use persistent_layer::{PersistentLayer, PersistentLayerWriter};
20use skip_list_layer::SkipListLayer;
21use std::fmt;
22use std::sync::Arc;
23use types::{
24    Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey, OrdLowerBound,
25    Value,
26};
27
28pub use merge::Query;
29
30const SKIP_LIST_LAYER_ITEMS: usize = 512;
31
32// For serialization.
33pub use persistent_layer::{
34    LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
35    LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
36    OldLayerInfo as OldPersistentLayerInfo, OldLayerInfoV32 as OldPersistentLayerInfoV32,
37};
38
39pub async fn layers_from_handles<K: Key, V: Value>(
40    handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
41) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
42    let mut layers = Vec::new();
43    for handle in handles {
44        layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
45    }
46    Ok(layers)
47}
48
/// The kind of mutation applied to an LSM tree.  Passed to any registered `MutationCallback` so
/// observers (e.g. tests) can record exactly which operation occurred.
#[derive(Eq, PartialEq, Debug)]
pub enum Operation {
    Insert,
    ReplaceOrInsert,
    MergeInto,
}
55
/// Optional callback invoked for every mutation applied to the tree (whilst the tree's read lock
/// is held); primarily useful for tests that want to observe the mutation sequence.
pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;
57
// State guarded by `LSMTree::data`.
struct Inner<K, V> {
    // The in-memory layer that receives all new mutations.
    mutable_layer: Arc<SkipListLayer<K, V>>,
    // The immutable layers, newest first (`seal` inserts the sealed layer at index 0).
    layers: Vec<Arc<dyn Layer<K, V>>>,
    // Optional callback invoked for each mutation; see `LSMTree::set_mutation_callback`.
    mutation_callback: MutationCallback<K, V>,
}
63
// Seek metrics for the tree, shared with the mergers created from it and reported via
// `LSMTree::record_inspect_data`.
#[derive(Default)]
pub(super) struct Counters {
    // Number of seeks performed (incremented by the merger which receives these counters).
    num_seeks: usize,
    // The following two metrics are used to compute the effectiveness of the bloom filters.
    // `layer_files_total` tracks the number of layer files we might have looked at across all
    // seeks, and `layer_files_skipped` tracks how many we skipped thanks to the bloom filter.
    layer_files_total: usize,
    layer_files_skipped: usize,
}
73
/// LSMTree manages a tree of layers to provide a key/value store.  Each layer contains deltas on
/// the preceding layer.  The top layer is an in-memory mutable layer.  Layers can be compacted to
/// form a new combined layer.
pub struct LSMTree<K, V> {
    // Mutable layer, immutable layers and mutation callback behind a single lock.  Mutations take
    // this lock for read; `seal` takes it for write to atomically swap in a new mutable layer.
    data: RwLock<Inner<K, V>>,
    // Function used to merge items from different layers when iterating.
    merge_fn: merge::MergeFn<K, V>,
    // Cache consulted by `find` and invalidated on every mutation.
    cache: Box<dyn ObjectCache<K, V>>,
    // Seek/bloom-filter metrics shared with mergers created from this tree.
    counters: Arc<Mutex<Counters>>,
}
83
84#[fxfs_trace::trace]
85impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
86    /// Creates a new empty tree.
87    pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
88        LSMTree {
89            data: RwLock::new(Inner {
90                mutable_layer: Self::new_mutable_layer(),
91                layers: Vec::new(),
92                mutation_callback: None,
93            }),
94            merge_fn,
95            cache,
96            counters: Arc::new(Mutex::new(Default::default())),
97        }
98    }
99
100    /// Opens an existing tree from the provided handles to the layer objects.
101    pub async fn open(
102        merge_fn: merge::MergeFn<K, V>,
103        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
104        cache: Box<dyn ObjectCache<K, V>>,
105    ) -> Result<Self, Error> {
106        Ok(LSMTree {
107            data: RwLock::new(Inner {
108                mutable_layer: Self::new_mutable_layer(),
109                layers: layers_from_handles(handles).await?,
110                mutation_callback: None,
111            }),
112            merge_fn,
113            cache,
114            counters: Arc::new(Mutex::new(Default::default())),
115        })
116    }
117
118    /// Replaces the immutable layers.
119    pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
120        self.data.write().layers = layers;
121    }
122
123    /// Appends to the given layers at the end i.e. they should be base layers.  This is supposed
124    /// to be used after replay when we are opening a tree and we have discovered the base layers.
125    pub async fn append_layers(
126        &self,
127        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
128    ) -> Result<(), Error> {
129        let mut layers = layers_from_handles(handles).await?;
130        self.data.write().layers.append(&mut layers);
131        Ok(())
132    }
133
134    /// Resets the immutable layers.
135    pub fn reset_immutable_layers(&self) {
136        self.data.write().layers = Vec::new();
137    }
138
139    /// Seals the current mutable layer and creates a new one.
140    pub fn seal(&self) {
141        // We need to be sure there are no mutations currently in-progress.  This is currently
142        // guaranteed by ensuring that all mutations take a read lock on `data`.
143        let mut data = self.data.write();
144        let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
145        data.layers.insert(0, layer);
146    }
147
148    /// Resets the tree to an empty state.
149    pub fn reset(&self) {
150        let mut data = self.data.write();
151        data.layers = Vec::new();
152        data.mutable_layer = Self::new_mutable_layer();
153    }
154
155    /// Writes the items yielded by the iterator into the supplied object.
156    #[trace]
157    pub async fn compact_with_iterator<W: WriteBytes + Send>(
158        &self,
159        mut iterator: impl LayerIterator<K, V>,
160        num_items: usize,
161        writer: W,
162        block_size: u64,
163    ) -> Result<(), Error> {
164        let mut writer =
165            PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
166        while let Some(item_ref) = iterator.get() {
167            debug!(item_ref:?; "compact: writing");
168            writer.write(item_ref).await?;
169            iterator.advance().await?;
170        }
171        writer.flush().await
172    }
173
174    /// Returns an empty layer-set for this tree.
175    pub fn empty_layer_set(&self) -> LayerSet<K, V> {
176        LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
177    }
178
179    /// Adds all the layers (including the mutable layer) to `layer_set`.
180    pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
181        let data = self.data.read();
182        layer_set.layers.reserve_exact(data.layers.len() + 1);
183        layer_set
184            .layers
185            .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
186        for layer in &data.layers {
187            layer_set.layers.push(layer.clone().into());
188        }
189    }
190
191    /// Returns a clone of the current set of layers (including the mutable layer), after which one
192    /// can get an iterator.
193    pub fn layer_set(&self) -> LayerSet<K, V> {
194        let mut layer_set = self.empty_layer_set();
195        self.add_all_layers_to_layer_set(&mut layer_set);
196        layer_set
197    }
198
199    /// Returns the current set of immutable layers after which one can get an iterator (for e.g.
200    /// compacting).  Since these layers are immutable, getting an iterator should not block
201    /// anything else.
202    pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
203        let data = self.data.read();
204        let mut layers = Vec::with_capacity(data.layers.len());
205        for layer in &data.layers {
206            layers.push(layer.clone().into());
207        }
208        LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
209    }
210
211    /// Inserts an item into the mutable layer.
212    /// Returns error if item already exists.
213    pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
214        let key = item.key.clone();
215        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
216        {
217            // `seal` below relies on us holding a read lock whilst we do the mutation.
218            let data = self.data.read();
219            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
220                mutation_callback(Operation::Insert, &item);
221            }
222            data.mutable_layer.insert(item)?;
223        }
224        self.cache.invalidate(key, val);
225        Ok(())
226    }
227
228    /// Replaces or inserts an item into the mutable layer.
229    pub fn replace_or_insert(&self, item: Item<K, V>) {
230        let key = item.key.clone();
231        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
232        {
233            // `seal` below relies on us holding a read lock whilst we do the mutation.
234            let data = self.data.read();
235            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
236                mutation_callback(Operation::ReplaceOrInsert, &item);
237            }
238            data.mutable_layer.replace_or_insert(item);
239        }
240        self.cache.invalidate(key, val);
241    }
242
243    /// Merges the given item into the mutable layer.
244    pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
245        let key = item.key.clone();
246        {
247            // `seal` below relies on us holding a read lock whilst we do the mutation.
248            let data = self.data.read();
249            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
250                mutation_callback(Operation::MergeInto, &item);
251            }
252            data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
253        }
254        self.cache.invalidate(key, None);
255    }
256
    /// Searches for an exact match for the given key. If the value is equal to
    /// `Value::DELETED_MARKER` the item is considered missing and will not be returned.
    ///
    /// Consults the cache first; on a cache miss the layers are queried and, if a placeholder was
    /// handed out by the cache, a successful match is published back through it.  A placeholder
    /// that is not completed is simply dropped (when no live item is found).
    pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
    where
        K: Eq,
    {
        // It is important that the cache lookup is done prior to fetching the layer set as the
        // placeholder returned acts as a sort of lock for the validity of the item that may be
        // inserted later via that placeholder.
        let token = match self.cache.lookup_or_reserve(search_key) {
            ObjectCacheResult::Value(value) => {
                // Cache hit: answer directly without touching the layers.
                if value == V::DELETED_MARKER {
                    return Ok(None);
                } else {
                    return Ok(Some(Item::new(search_key.clone(), value)));
                }
            }
            ObjectCacheResult::Placeholder(token) => Some(token),
            ObjectCacheResult::NoCache => None,
        };
        let layer_set = self.layer_set();
        let mut merger = layer_set.merger();

        Ok(match merger.query(Query::Point(search_key)).await?.get() {
            // Only an exact key match with a live (non-deleted) value counts as found.
            Some(ItemRef { key, value, sequence })
                if key == search_key && *value != V::DELETED_MARKER =>
            {
                if let Some(token) = token {
                    // Publish the found value through the placeholder so the cache can store it.
                    token.complete(Some(value));
                }
                Some(Item { key: key.clone(), value: value.clone(), sequence })
            }
            _ => None,
        })
    }
292
293    pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
294        self.data.read().mutable_layer.clone()
295    }
296
297    /// Sets a mutation callback which is a callback that is triggered whenever any mutations are
298    /// applied to the tree.  This might be useful for tests that want to record the precise
299    /// sequence of mutations that are applied to the tree.
300    pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
301        self.data.write().mutation_callback = mutation_callback;
302    }
303
304    /// Returns the earliest version used by a layer in the tree.
305    pub fn get_earliest_version(&self) -> Version {
306        let mut earliest_version = LATEST_VERSION;
307        for layer in self.layer_set().layers {
308            let layer_version = layer.get_version();
309            if layer_version < earliest_version {
310                earliest_version = layer_version;
311            }
312        }
313        return earliest_version;
314    }
315
    /// Returns a new (empty) mutable layer, sized by `SKIP_LIST_LAYER_ITEMS`.
    pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
        SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
    }
320
321    /// Replaces the mutable layer.
322    pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
323        self.data.write().mutable_layer = layer;
324    }
325
326    /// Records inspect data for the LSM tree into `node`.  Called lazily when inspect is queried.
327    pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
328        let layer_set = self.layer_set();
329        root.record_child("layers", move |node| {
330            let mut index = 0;
331            for layer in layer_set.layers {
332                node.record_child(format!("{index}"), move |node| {
333                    layer.1.record_inspect_data(node)
334                });
335                index += 1;
336            }
337        });
338        {
339            let counters = self.counters.lock();
340            root.record_uint("num_seeks", counters.num_seeks as u64);
341            root.record_uint(
342                "bloom_filter_success_percent",
343                if counters.layer_files_total == 0 {
344                    0
345                } else {
346                    (counters.layer_files_skipped * 100).div_ceil(counters.layer_files_total) as u64
347                },
348            );
349        }
350    }
351}
352
/// This is an RAII wrapper for a layer which holds a lock on the layer (via the Layer::lock
/// method).
// Field 0 is the lock (a `DropEvent` returned by `Layer::lock`); field 1 is the layer itself.
pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);
356
357impl<K, V> LockedLayer<K, V> {
358    pub async fn close_layer(self) {
359        let layer = self.1;
360        std::mem::drop(self.0);
361        layer.close().await;
362    }
363}
364
365impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
366    fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
367        let event = layer.lock().unwrap();
368        Self(event, layer)
369    }
370}
371
372impl<K, V> std::ops::Deref for LockedLayer<K, V> {
373    type Target = Arc<dyn Layer<K, V>>;
374
375    fn deref(&self) -> &Self::Target {
376        &self.1
377    }
378}
379
380impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
381    fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
382        self.1.as_ref()
383    }
384}
385
/// A LayerSet provides a snapshot of the layers at a particular point in time, and allows you to
/// get an iterator.  Iterators borrow the layers so something needs to hold reference count.
pub struct LayerSet<K, V> {
    // The locked layers in this snapshot, ordered from newest to oldest.
    pub layers: Vec<LockedLayer<K, V>>,
    // Merge function used by mergers created from this set.
    merge_fn: merge::MergeFn<K, V>,
    // Shared seek metrics, updated by mergers created from this set.
    counters: Arc<Mutex<Counters>>,
}
393
394impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
395    pub fn sum_len(&self) -> usize {
396        let mut size = 0;
397        for layer in &self.layers {
398            size += *layer.estimated_len()
399        }
400        size
401    }
402
403    pub fn merger(&self) -> merge::Merger<'_, K, V> {
404        merge::Merger::new(
405            self.layers.iter().map(|x| x.as_ref()),
406            self.merge_fn,
407            self.counters.clone(),
408        )
409    }
410}
411
412impl<K, V> fmt::Debug for LayerSet<K, V> {
413    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
414        fmt.debug_list()
415            .entries(self.layers.iter().map(|l| {
416                if let Some(handle) = l.handle() {
417                    format!("{}", handle.object_id())
418                } else {
419                    format!("{:?}", Arc::as_ptr(l))
420                }
421            }))
422            .finish()
423    }
424}
425
426#[cfg(test)]
427mod tests {
428    use super::LSMTree;
429    use crate::drop_event::DropEvent;
430    use crate::lsm_tree::cache::{
431        NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
432    };
433    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
434    use crate::lsm_tree::types::{
435        BoxedLayerIterator, FuzzyHash, Item, ItemCount, ItemRef, Key, Layer, LayerIterator,
436        LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
437    };
438    use crate::lsm_tree::{layers_from_handles, Query};
439    use crate::object_handle::ObjectHandle;
440    use crate::serialized_types::{
441        versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
442    };
443    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
444    use crate::testing::writer::Writer;
445    use anyhow::{anyhow, Error};
446    use async_trait::async_trait;
447    use fprint::TypeFingerprint;
448    use fuchsia_sync::Mutex;
449    use fxfs_macros::FuzzyHash;
450    use rand::seq::SliceRandom;
451    use rand::thread_rng;
452    use std::hash::Hash;
453    use std::sync::Arc;
454
    // Test key over a u64 range; `start` is the primary sort key (see `SortByU64` below).
    #[derive(
        Clone,
        Eq,
        PartialEq,
        Debug,
        Hash,
        FuzzyHash,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    // Register the key with the versioning system from version 1 onwards.
    versioned_type! { 1.. => TestKey }
470
    // The leading u64 (used for bloom-filter/sort purposes) is the range start.
    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }
476
    // Default `LayerKey` behaviour suffices for the tests.
    impl LayerKey for TestKey {}
478
    // Upper bounds are compared by the end of the range.
    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }
484
    // Lower bounds are compared by the start of the range.
    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }
490
    // Trivial merge function used throughout these tests: always keep the left item unchanged.
    fn emit_left_merge_fn(
        _left: &MergeLayerIterator<'_, TestKey, u64>,
        _right: &MergeLayerIterator<'_, TestKey, u64>,
    ) -> MergeResult<TestKey, u64> {
        MergeResult::EmitLeft
    }
497
    // For these tests a value of 0 stands in for "deleted".
    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }
501
502    #[fuchsia::test]
503    async fn test_iteration() {
504        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
505        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
506        tree.insert(items[0].clone()).expect("insert error");
507        tree.insert(items[1].clone()).expect("insert error");
508        let layers = tree.layer_set();
509        let mut merger = layers.merger();
510        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
511        let ItemRef { key, value, .. } = iter.get().expect("missing item");
512        assert_eq!((key, value), (&items[0].key, &items[0].value));
513        iter.advance().await.expect("advance failed");
514        let ItemRef { key, value, .. } = iter.get().expect("missing item");
515        assert_eq!((key, value), (&items[1].key, &items[1].value));
516        iter.advance().await.expect("advance failed");
517        assert!(iter.get().is_none());
518    }
519
    // End-to-end compaction test: seal two in-memory layers, compact them into a persistent
    // layer backed by a fake object, re-open the tree from that object and verify all four items
    // survive in order.
    #[fuchsia::test]
    async fn test_compact() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");
        tree.seal();
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            // Compact the two sealed (immutable) layers into `handle`.
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.query(Query::FullScan).await.expect("create merger");
            tree.compact_with_iterator(
                iter,
                items.len(),
                Writer::new(&handle).await,
                handle.block_size(),
            )
            .await
            .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        // Re-open a fresh tree from the same backing object to check persistence.
        let handle = FakeObjectHandle::new(object.clone());
        let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
            .await
            .expect("open failed");

        let layers = tree.layer_set();
        let mut merger = layers.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        for i in 1..5 {
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!((key, value), (&TestKey(i..i), &i));
            iter.advance().await.expect("advance failed");
        }
        assert!(iter.get().is_none());
    }
566
    // `find` locates items across both a sealed layer and the mutable layer, and returns None for
    // a key that was never inserted.
    #[fuchsia::test]
    async fn test_find() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
        assert_eq!(item, items[1]);
        assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
    }
586
    // An item whose value is `DELETED_MARKER` is treated as absent by `find`.
    #[fuchsia::test]
    async fn test_find_no_return_deleted_values() {
        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");

        let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
        assert_eq!(item, items[0]);
        assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
    }
598
    // Sealing an empty mutable layer and compacting the resulting (empty) immutable layer works,
    // and items inserted afterwards remain findable once the empty layer is persisted.
    #[fuchsia::test]
    async fn test_empty_seal() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.seal();
        let item = Item::new(TestKey(1..1), 1);
        tree.insert(item.clone()).expect("insert error");
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.query(Query::FullScan).await.expect("create merger");
            // num_items of 0 because the sealed layer contains nothing.
            tree.compact_with_iterator(iter, 0, Writer::new(&handle).await, handle.block_size())
                .await
                .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
        assert_eq!(found_item, item);
        assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
    }
620
    // The iterator `filter` adaptor drops non-matching items, including the first item.
    #[fuchsia::test]
    async fn test_filter() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let layers = tree.layer_set();
        let mut merger = layers.merger();

        // Filter out odd keys (which also guarantees we skip the first key which is an edge case).
        let mut iter = merger
            .query(Query::FullScan)
            .await
            .expect("seek failed")
            .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
            .await
            .expect("filter failed");

        assert_eq!(iter.get(), Some(items[1].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert_eq!(iter.get(), Some(items[3].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }
653
    // Inserting the same items in sorted vs shuffled order yields identical iteration results.
    #[fuchsia::test]
    async fn test_insert_order_agnostic() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
            Item::new(TestKey(5..5), 5),
            Item::new(TestKey(6..6), 6),
        ];
        let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for item in &items {
            a.insert(item.clone()).expect("insert error");
        }
        let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let mut shuffled = items.clone();
        shuffled.shuffle(&mut thread_rng());
        for item in &shuffled {
            b.insert(item.clone()).expect("insert error");
        }
        let layers = a.layer_set();
        let mut merger = layers.merger();
        let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
        let layers = b.layer_set();
        let mut merger = layers.merger();
        let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");

        // Both iterators must yield the items in the same (sorted) order.
        for item in items {
            assert_eq!(Some(item.as_item_ref()), iter_a.get());
            assert_eq!(Some(item.as_item_ref()), iter_b.get());
            iter_a.advance().await.expect("advance failed");
            iter_b.advance().await.expect("advance failed");
        }
        assert!(iter_a.get().is_none());
        assert!(iter_b.get().is_none());
    }
690
    // Shared state for `AuditCache`: operation counters plus an optional canned result that the
    // next `lookup_or_reserve` call will return.
    struct AuditCacheInner<'a, V: Value> {
        // Number of `lookup_or_reserve` calls.
        lookups: u64,
        // Number of placeholders completed with a value.
        completions: u64,
        // Number of `invalidate` calls.
        invalidations: u64,
        // Number of placeholders dropped without completion.
        drops: u64,
        // If set, returned (once) by the next lookup instead of a placeholder.
        result: Option<ObjectCacheResult<'a, V>>,
    }
698
    impl<V: Value> AuditCacheInner<'_, V> {
        // Snapshot of (lookups, completions, invalidations, drops) for easy assertions.
        fn stats(&self) -> (u64, u64, u64, u64) {
            (self.lookups, self.completions, self.invalidations, self.drops)
        }
    }
704
    // An `ObjectCache` implementation that records how it is used; see `AuditCacheInner`.
    struct AuditCache<'a, V: Value> {
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
    }
708
709    impl<V: Value> AuditCache<'_, V> {
710        fn new() -> Self {
711            Self {
712                inner: Arc::new(Mutex::new(AuditCacheInner {
713                    lookups: 0,
714                    completions: 0,
715                    invalidations: 0,
716                    drops: 0,
717                    result: None,
718                })),
719            }
720        }
721    }
722
    // Placeholder handed out by `AuditCache`; records whether it was completed or just dropped.
    struct AuditPlaceholder<'a, V: Value> {
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
        // Set to true by `complete` so `Drop` knows not to count this as an abandoned lookup.
        completed: Mutex<bool>,
    }
727
    impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
        // Counts the completion and marks the placeholder so `Drop` doesn't count it again.
        fn complete(self: Box<Self>, _: Option<&V>) {
            self.inner.lock().completions += 1;
            *self.completed.lock() = true;
        }
    }
734
    // Counts placeholders that were dropped without being completed (i.e. lookups that found
    // nothing worth caching).
    impl<V: Value> Drop for AuditPlaceholder<'_, V> {
        fn drop(&mut self) {
            if !*self.completed.lock() {
                self.inner.lock().drops += 1;
            }
        }
    }
742
    impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
        fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
            {
                let mut inner = self.inner.lock();
                inner.lookups += 1;
                // If the test primed a canned result, return it (exactly once).
                if inner.result.is_some() {
                    return std::mem::take(&mut inner.result).unwrap();
                }
            }
            // Otherwise hand out a placeholder that tracks completion vs drop.
            ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
                inner: self.inner.clone(),
                completed: Mutex::new(false),
            }))
        }

        fn invalidate(&self, _key: K, _value: Option<V>) {
            self.inner.lock().invalidations += 1;
        }
    }
762
    // Checks the cache interaction sequence of `find`, `insert` and `replace_or_insert` via the
    // counting cache; stats tuples are (lookups, completions, invalidations, drops).
    #[fuchsia::test]
    async fn test_cache_handling() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        // Zero counters.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // Look for an item, but don't find it. So no insertion. It is dropped.
        assert!(a.find(&item.key).await.expect("Failed find").is_none());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));

        // Insert attempts to invalidate.
        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (1, 0, 1, 1));

        // Look for item, find it and insert into the cache.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (2, 1, 1, 1));

        // Insert or replace attempts to invalidate as well.
        a.replace_or_insert(item.clone());
        assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
    }
792
    // When the cache returns a value, `find` answers from it without completing a placeholder;
    // stats tuples are (lookups, completions, invalidations, drops).
    #[fuchsia::test]
    async fn test_cache_hit() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        // Zero counters.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // Insert attempts to invalidate.
        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        // Set up the item to find in the cache.
        inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));

        // Look for item, find it in cache, so no insert.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }
817
818    #[fuchsia::test]
819    async fn test_cache_says_uncacheable() {
820        let item = Item::new(TestKey(1..1), 1);
821        let cache = Box::new(AuditCache::new());
822        let inner = cache.inner.clone();
823        let a = LSMTree::new(emit_left_merge_fn, cache);
824        let _ = a.insert(item.clone());
825
826        // One invalidation from the insert.
827        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));
828
829        // Set up the NoCache response to find in the cache.
830        inner.lock().result = Some(ObjectCacheResult::NoCache);
831
832        // Look for item, it is uncacheable, so no insert.
833        assert_eq!(
834            a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
835            item.value
836        );
837        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
838    }
839
    /// A test layer whose `seek` always fails, used to exercise the error
    /// path in lookups.
    struct FailLayer {
        // Present until `close()` runs; `lock()` hands out clones, which
        // `close()` waits on (see the `Layer` impl below).
        drop_event: Mutex<Option<Arc<DropEvent>>>,
    }
843
    impl FailLayer {
        /// Creates a `FailLayer` with an armed drop event.
        fn new() -> Self {
            Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
        }
    }
849
850    #[async_trait]
851    impl<K: Key, V: Value> Layer<K, V> for FailLayer {
852        async fn seek(
853            &self,
854            _bound: std::ops::Bound<&K>,
855        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
856            Err(anyhow!("Purposely failed seek"))
857        }
858
859        fn lock(&self) -> Option<Arc<DropEvent>> {
860            self.drop_event.lock().clone()
861        }
862
863        fn estimated_len(&self) -> ItemCount {
864            ItemCount::Estimate(0)
865        }
866
867        async fn close(&self) {
868            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
869                Some(drop_event) => drop_event.listen(),
870                None => return,
871            };
872            listener.await;
873        }
874
875        fn get_version(&self) -> Version {
876            LATEST_VERSION
877        }
878    }
879
880    #[fuchsia::test]
881    async fn test_failed_lookup() {
882        let cache = Box::new(AuditCache::new());
883        let inner = cache.inner.clone();
884        let a = LSMTree::new(emit_left_merge_fn, cache);
885        a.set_layers(vec![Arc::new(FailLayer::new())]);
886
887        // Zero counters.
888        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
889
890        // Lookup should fail and drop the placeholder.
891        assert!(a.find(&TestKey(1..1)).await.is_err());
892        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
893    }
894}
895
#[cfg(fuzz)]
mod fuzz {
    use crate::lsm_tree::types::{
        FuzzyHash, Item, LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
    };
    use crate::serialized_types::{
        versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
    };
    use arbitrary::Arbitrary;
    use fprint::TypeFingerprint;
    use fuzz::fuzz;
    use fxfs_macros::FuzzyHash;
    use std::hash::Hash;

    /// Key type used by the fuzzer: an arbitrary `u64` range (`Arbitrary`
    /// lets the fuzzer generate instances directly from raw bytes).
    #[derive(
        Arbitrary,
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    // Register the key and value types with the versioned-serialization
    // machinery (presumably "from version 1 onwards" — see `versioned_type!`).
    versioned_type! { 1.. => TestKey }

    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    // Default `LayerKey` behavior is sufficient for fuzzing.
    impl LayerKey for TestKey {}

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    // Keys are ordered by range end for the upper bound...
    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    // ...and by range start for the lower bound.
    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    // Note: This code isn't really dead. It's used below in
    // `fuzz_lsm_tree_actions`. However, the `#[fuzz]` proc macro attribute
    // obfuscates the usage enough to confuse the compiler.
    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        Insert(Item<TestKey, u64>),
        ReplaceOrInsert(Item<TestKey, u64>),
        MergeInto(Item<TestKey, u64>, TestKey),
        Find(TestKey),
        Seal,
    }

    /// Fuzz entry point: replays an arbitrary sequence of tree operations
    /// against a fresh `LSMTree`, checking nothing beyond "does not crash"
    /// (plus that `find` never returns an error).
    #[fuzz]
    fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
        use super::cache::NullCache;
        use super::LSMTree;
        use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
        use futures::executor::block_on;

        // Trivial merge function: always keep the left item.
        fn emit_left_merge_fn(
            _left: &MergeLayerIterator<'_, TestKey, u64>,
            _right: &MergeLayerIterator<'_, TestKey, u64>,
        ) -> MergeResult<TestKey, u64> {
            MergeResult::EmitLeft
        }

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for action in actions {
            match action {
                FuzzAction::Insert(item) => {
                    // Insert may fail (e.g. duplicate key); that's fine here.
                    let _ = tree.insert(item);
                }
                FuzzAction::ReplaceOrInsert(item) => {
                    tree.replace_or_insert(item);
                }
                FuzzAction::Find(key) => {
                    // `find` is async; the fuzzer body is sync, so block.
                    block_on(tree.find(&key)).expect("find failed");
                }
                FuzzAction::MergeInto(item, bound) => tree.merge_into(item, &bound),
                FuzzAction::Seal => tree.seal(),
            };
        }
    }
}