fxfs/lsm_tree.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

mod bloom_filter;
pub mod cache;
pub mod merge;
pub mod persistent_layer;
pub mod skip_list_layer;
pub mod types;

use crate::drop_event::DropEvent;
use crate::log::*;
use crate::object_handle::{ReadObjectHandle, WriteBytes};
use crate::serialized_types::{LATEST_VERSION, Version};
use anyhow::Error;
use cache::{ObjectCache, ObjectCacheResult};
use fuchsia_sync::{Mutex, RwLock};
use persistent_layer::{PersistentLayer, PersistentLayerWriter};
use skip_list_layer::SkipListLayer;
use std::fmt;
use std::sync::Arc;
use types::{
    Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey, OrdLowerBound,
    Value,
};

pub use merge::Query;

const SKIP_LIST_LAYER_ITEMS: usize = 512;

// For serialization.
pub use persistent_layer::{
    LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
    LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
};

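/// Opens each of the provided handles as a `PersistentLayer`, returning the layers in the same
/// order as the handles.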
pub async fn layers_from_handles<K: Key, V: Value>(
    handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
    let mut layers = Vec::new();
    for handle in handles {
        layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
    }
    Ok(layers)
}

#[derive(Eq, PartialEq, Debug)]
pub enum Operation {
    Insert,
    ReplaceOrInsert,
    MergeInto,
}

pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;

struct Inner<K, V> {
    mutable_layer: Arc<SkipListLayer<K, V>>,
    layers: Vec<Arc<dyn Layer<K, V>>>,
    mutation_callback: MutationCallback<K, V>,
}

#[derive(Default)]
pub(super) struct Counters {
    num_seeks: usize,
    // The following two metrics are used to compute the effectiveness of the bloom filters.
    // `layer_files_total` tracks the number of layer files we might have looked at across all
    // seeks, and `layer_files_skipped` tracks how many we skipped thanks to the bloom filter.
    layer_files_total: usize,
    layer_files_skipped: usize,
}
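// For reference, `record_inspect_data` below reports "bloom_filter_success_percent" as
// ceil(layer_files_skipped * 100 / layer_files_total), or 0 if no layer files were considered.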

/// LSMTree manages a tree of layers to provide a key/value store.  Each layer contains deltas on
/// the preceding layer.  The top layer is an in-memory mutable layer.  Layers can be compacted to
/// form a new combined layer.
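///
/// # Example
///
/// A minimal usage sketch (not compiled as a doctest); it assumes a merge function and a cache
/// implementation such as `NullCache`, mirroring the tests at the bottom of this file:
///
/// ```ignore
/// let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
/// tree.insert(Item::new(key, value))?;
///
/// // Reads go through a merger over a snapshot of the layers.
/// let layer_set = tree.layer_set();
/// let mut merger = layer_set.merger();
/// let mut iter = merger.query(Query::FullScan).await?;
/// while let Some(item) = iter.get() {
///     // ... use `item` ...
///     iter.advance().await?;
/// }
/// ```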
pub struct LSMTree<K, V> {
    data: RwLock<Inner<K, V>>,
    merge_fn: merge::MergeFn<K, V>,
    cache: Box<dyn ObjectCache<K, V>>,
    counters: Arc<Mutex<Counters>>,
}

#[fxfs_trace::trace]
impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
    /// Creates a new empty tree.
    pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
        LSMTree {
            data: RwLock::new(Inner {
                mutable_layer: Self::new_mutable_layer(),
                layers: Vec::new(),
                mutation_callback: None,
            }),
            merge_fn,
            cache,
            counters: Arc::new(Mutex::new(Default::default())),
        }
    }

    /// Opens an existing tree from the provided handles to the layer objects.
    pub async fn open(
        merge_fn: merge::MergeFn<K, V>,
        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
        cache: Box<dyn ObjectCache<K, V>>,
    ) -> Result<Self, Error> {
        Ok(LSMTree {
            data: RwLock::new(Inner {
                mutable_layer: Self::new_mutable_layer(),
                layers: layers_from_handles(handles).await?,
                mutation_callback: None,
            }),
            merge_fn,
            cache,
            counters: Arc::new(Mutex::new(Default::default())),
        })
    }

    /// Replaces the immutable layers.
    pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
        self.data.write().layers = layers;
    }

    /// Appends the given layers at the end, i.e. they should be base layers.  This is intended to
    /// be used after replay, when we are opening a tree and have discovered the base layers.
    pub async fn append_layers(
        &self,
        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
    ) -> Result<(), Error> {
        let mut layers = layers_from_handles(handles).await?;
        self.data.write().layers.append(&mut layers);
        Ok(())
    }

    /// Resets the immutable layers.
    pub fn reset_immutable_layers(&self) {
        self.data.write().layers = Vec::new();
    }

    /// Seals the current mutable layer and creates a new one.
    pub fn seal(&self) {
        // We need to be sure there are no mutations currently in-progress.  This is currently
        // guaranteed by ensuring that all mutations take a read lock on `data`.
        let mut data = self.data.write();
        let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
        data.layers.insert(0, layer);
    }

    /// Resets the tree to an empty state.
    pub fn reset(&self) {
        let mut data = self.data.write();
        data.layers = Vec::new();
        data.mutable_layer = Self::new_mutable_layer();
    }

    /// Writes the items yielded by the iterator into the supplied object.
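    ///
    /// A sketch of the typical compaction flow (not compiled as a doctest); `handle` and the
    /// writer are placeholders for whatever the caller uses to back the new layer file:
    ///
    /// ```ignore
    /// let layer_set = tree.immutable_layer_set();
    /// let mut merger = layer_set.merger();
    /// let iter = merger.query(Query::FullScan).await?;
    /// let num_items = layer_set.sum_len();
    /// tree.compact_with_iterator(iter, num_items, writer, handle.block_size()).await?;
    /// // Afterwards, the immutable layers can be replaced with the newly written layer file:
    /// tree.set_layers(layers_from_handles([handle]).await?);
    /// ```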
    #[trace]
    pub async fn compact_with_iterator<W: WriteBytes + Send>(
        &self,
        mut iterator: impl LayerIterator<K, V>,
        num_items: usize,
        writer: W,
        block_size: u64,
    ) -> Result<(), Error> {
        let mut writer =
            PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
        while let Some(item_ref) = iterator.get() {
            debug!(item_ref:?; "compact: writing");
            writer.write(item_ref).await?;
            iterator.advance().await?;
        }
        writer.flush().await
    }

    /// Returns an empty layer-set for this tree.
    pub fn empty_layer_set(&self) -> LayerSet<K, V> {
        LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
    }

    /// Adds all the layers (including the mutable layer) to `layer_set`.
    pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
        let data = self.data.read();
        layer_set.layers.reserve_exact(data.layers.len() + 1);
        layer_set
            .layers
            .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
        for layer in &data.layers {
            layer_set.layers.push(layer.clone().into());
        }
    }

    /// Returns a clone of the current set of layers (including the mutable layer), after which one
    /// can get an iterator.
    pub fn layer_set(&self) -> LayerSet<K, V> {
        let mut layer_set = self.empty_layer_set();
        self.add_all_layers_to_layer_set(&mut layer_set);
        layer_set
    }

    /// Returns the current set of immutable layers, after which one can get an iterator (e.g. for
    /// compacting).  Since these layers are immutable, getting an iterator should not block
    /// anything else.
    pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
        let data = self.data.read();
        let mut layers = Vec::with_capacity(data.layers.len());
        for layer in &data.layers {
            layers.push(layer.clone().into());
        }
        LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
    }

    /// Inserts an item into the mutable layer.
    /// Returns an error if the item already exists.
    pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
        let key = item.key.clone();
        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
        {
            // `seal` relies on us holding a read lock whilst we do the mutation.
            let data = self.data.read();
            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
                mutation_callback(Operation::Insert, &item);
            }
            data.mutable_layer.insert(item)?;
        }
        self.cache.invalidate(key, val);
        Ok(())
    }

    /// Replaces or inserts an item into the mutable layer.
    pub fn replace_or_insert(&self, item: Item<K, V>) {
        let key = item.key.clone();
        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
        {
            // `seal` relies on us holding a read lock whilst we do the mutation.
            let data = self.data.read();
            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
                mutation_callback(Operation::ReplaceOrInsert, &item);
            }
            data.mutable_layer.replace_or_insert(item);
        }
        self.cache.invalidate(key, val);
    }

    /// Merges the given item into the mutable layer.
    pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
        let key = item.key.clone();
        {
            // `seal` relies on us holding a read lock whilst we do the mutation.
            let data = self.data.read();
            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
                mutation_callback(Operation::MergeInto, &item);
            }
            data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
        }
        self.cache.invalidate(key, None);
    }

    /// Searches for an exact match for the given key. If the value is equal to
    /// `Value::DELETED_MARKER` the item is considered missing and will not be returned.
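    ///
    /// A usage sketch (not compiled as a doctest), mirroring the behaviour exercised by the tests
    /// below:
    ///
    /// ```ignore
    /// tree.insert(Item::new(key_a, value_a))?;
    /// tree.insert(Item::new(key_b, V::DELETED_MARKER))?;
    /// assert!(tree.find(&key_a).await?.is_some());  // Present items are returned.
    /// assert!(tree.find(&key_b).await?.is_none());  // Deleted markers are treated as missing.
    /// ```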
    pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
    where
        K: Eq,
    {
        // It is important that the cache lookup is done prior to fetching the layer set as the
        // placeholder returned acts as a sort of lock for the validity of the item that may be
        // inserted later via that placeholder.
        let token = match self.cache.lookup_or_reserve(search_key) {
            ObjectCacheResult::Value(value) => {
                if value == V::DELETED_MARKER {
                    return Ok(None);
                } else {
                    return Ok(Some(Item::new(search_key.clone(), value)));
                }
            }
            ObjectCacheResult::Placeholder(token) => Some(token),
            ObjectCacheResult::NoCache => None,
        };
        let layer_set = self.layer_set();
        let mut merger = layer_set.merger();

        Ok(match merger.query(Query::Point(search_key)).await?.get() {
            Some(ItemRef { key, value, sequence })
                if key == search_key && *value != V::DELETED_MARKER =>
            {
                if let Some(token) = token {
                    token.complete(Some(value));
                }
                Some(Item { key: key.clone(), value: value.clone(), sequence })
            }
            _ => None,
        })
    }

    pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
        self.data.read().mutable_layer.clone()
    }

    /// Sets a mutation callback, which is triggered whenever a mutation is applied to the tree.
    /// This might be useful for tests that want to record the precise sequence of mutations
    /// applied to the tree.
    pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
        self.data.write().mutation_callback = mutation_callback;
    }

    /// Returns the earliest version used by a layer in the tree.
    pub fn get_earliest_version(&self) -> Version {
        let mut earliest_version = LATEST_VERSION;
        for layer in self.layer_set().layers {
            let layer_version = layer.get_version();
            if layer_version < earliest_version {
                earliest_version = layer_version;
            }
        }
        earliest_version
    }

    /// Returns a new mutable layer.
    pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
        SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
    }

    /// Replaces the mutable layer.
    pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
        self.data.write().mutable_layer = layer;
    }

    /// Records inspect data for the LSM tree into `root`.  Called lazily when inspect is queried.
    pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
        let layer_set = self.layer_set();
        root.record_child("layers", move |node| {
            let mut index = 0;
            for layer in layer_set.layers {
                node.record_child(format!("{index}"), move |node| {
                    layer.1.record_inspect_data(node)
                });
                index += 1;
            }
        });
        {
            let counters = self.counters.lock();
            root.record_uint("num_seeks", counters.num_seeks as u64);
            root.record_uint(
                "bloom_filter_success_percent",
                if counters.layer_files_total == 0 {
                    0
                } else {
                    (counters.layer_files_skipped * 100).div_ceil(counters.layer_files_total) as u64
                },
            );
        }
    }
}

/// An RAII wrapper around a layer which holds a lock on that layer (via the `Layer::lock`
/// method).
pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);

impl<K, V> LockedLayer<K, V> {
    pub async fn close_layer(self) {
        let layer = self.1;
        std::mem::drop(self.0);
        layer.close().await;
    }
}

impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
    fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
        let event = layer.lock().unwrap();
        Self(event, layer)
    }
}

impl<K, V> std::ops::Deref for LockedLayer<K, V> {
    type Target = Arc<dyn Layer<K, V>>;

    fn deref(&self) -> &Self::Target {
        &self.1
    }
}

impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
    fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
        self.1.as_ref()
    }
}

/// A LayerSet provides a snapshot of the layers at a particular point in time, and allows you to
/// get an iterator.  Iterators borrow the layers, so something needs to hold the reference counts.
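///
/// A brief sketch (not compiled as a doctest): holding the `LayerSet` keeps its layers locked, so
/// iterators created from its merger remain valid for the lifetime of the set:
///
/// ```ignore
/// let layer_set = tree.layer_set();       // Snapshot of the mutable + immutable layers.
/// let total_items = layer_set.sum_len();  // Upper bound on the number of merged items.
/// let mut merger = layer_set.merger();
/// let mut iter = merger.query(Query::FullScan).await?;
/// ```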
pub struct LayerSet<K, V> {
    pub layers: Vec<LockedLayer<K, V>>,
    merge_fn: merge::MergeFn<K, V>,
    counters: Arc<Mutex<Counters>>,
}

impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
    pub fn sum_len(&self) -> usize {
        let mut size = 0;
        for layer in &self.layers {
            size += layer.len()
        }
        size
    }

    pub fn merger(&self) -> merge::Merger<'_, K, V> {
        merge::Merger::new(
            self.layers.iter().map(|x| x.as_ref()),
            self.merge_fn,
            self.counters.clone(),
        )
    }
}

impl<K, V> fmt::Debug for LayerSet<K, V> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_list()
            .entries(self.layers.iter().map(|l| {
                if let Some(handle) = l.handle() {
                    format!("{}", handle.object_id())
                } else {
                    format!("{:?}", Arc::as_ptr(l))
                }
            }))
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::LSMTree;
    use crate::drop_event::DropEvent;
    use crate::lsm_tree::cache::{
        NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
    };
    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
    use crate::lsm_tree::types::{
        BoxedLayerIterator, FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerKey,
        OrdLowerBound, OrdUpperBound, SortByU64, Value,
    };
    use crate::lsm_tree::{Query, layers_from_handles};
    use crate::object_handle::ObjectHandle;
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
    use crate::testing::writer::Writer;
    use anyhow::{Error, anyhow};
    use async_trait::async_trait;
    use fprint::TypeFingerprint;
    use fuchsia_sync::Mutex;
    use fxfs_macros::FuzzyHash;
    use rand::rng;
    use rand::seq::SliceRandom;
    use std::hash::Hash;
    use std::sync::Arc;

    #[derive(
        Clone,
        Eq,
        PartialEq,
        Debug,
        Hash,
        FuzzyHash,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl LayerKey for TestKey {}

    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    fn emit_left_merge_fn(
        _left: &MergeLayerIterator<'_, TestKey, u64>,
        _right: &MergeLayerIterator<'_, TestKey, u64>,
    ) -> MergeResult<TestKey, u64> {
        MergeResult::EmitLeft
    }

    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    #[fuchsia::test]
    async fn test_iteration() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        let layers = tree.layer_set();
        let mut merger = layers.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_compact() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");
        tree.seal();
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.query(Query::FullScan).await.expect("create merger");
            tree.compact_with_iterator(
                iter,
                items.len(),
                Writer::new(&handle).await,
                handle.block_size(),
            )
            .await
            .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        let handle = FakeObjectHandle::new(object.clone());
        let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
            .await
            .expect("open failed");

        let layers = tree.layer_set();
        let mut merger = layers.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        for i in 1..5 {
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!((key, value), (&TestKey(i..i), &i));
            iter.advance().await.expect("advance failed");
        }
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_find() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
        assert_eq!(item, items[1]);
        assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
    }

    #[fuchsia::test]
    async fn test_find_no_return_deleted_values() {
        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");

        let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
        assert_eq!(item, items[0]);
        assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
    }

    #[fuchsia::test]
    async fn test_empty_seal() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.seal();
        let item = Item::new(TestKey(1..1), 1);
        tree.insert(item.clone()).expect("insert error");
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.query(Query::FullScan).await.expect("create merger");
            tree.compact_with_iterator(iter, 0, Writer::new(&handle).await, handle.block_size())
                .await
                .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
        assert_eq!(found_item, item);
        assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
    }

    #[fuchsia::test]
    async fn test_filter() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let layers = tree.layer_set();
        let mut merger = layers.merger();

        // Filter out odd keys (which also guarantees we skip the first key, an edge case).
        let mut iter = merger
            .query(Query::FullScan)
            .await
            .expect("seek failed")
            .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
            .await
            .expect("filter failed");

        assert_eq!(iter.get(), Some(items[1].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert_eq!(iter.get(), Some(items[3].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_insert_order_agnostic() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
            Item::new(TestKey(5..5), 5),
            Item::new(TestKey(6..6), 6),
        ];
        let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for item in &items {
            a.insert(item.clone()).expect("insert error");
        }
        let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let mut shuffled = items.clone();
        shuffled.shuffle(&mut rng());
        for item in &shuffled {
            b.insert(item.clone()).expect("insert error");
        }
        let layers = a.layer_set();
        let mut merger = layers.merger();
        let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
        let layers = b.layer_set();
        let mut merger = layers.merger();
        let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");

        for item in items {
            assert_eq!(Some(item.as_item_ref()), iter_a.get());
            assert_eq!(Some(item.as_item_ref()), iter_b.get());
            iter_a.advance().await.expect("advance failed");
            iter_b.advance().await.expect("advance failed");
        }
        assert!(iter_a.get().is_none());
        assert!(iter_b.get().is_none());
    }

    struct AuditCacheInner<'a, V: Value> {
        lookups: u64,
        completions: u64,
        invalidations: u64,
        drops: u64,
        result: Option<ObjectCacheResult<'a, V>>,
    }

    impl<V: Value> AuditCacheInner<'_, V> {
        fn stats(&self) -> (u64, u64, u64, u64) {
            (self.lookups, self.completions, self.invalidations, self.drops)
        }
    }

    struct AuditCache<'a, V: Value> {
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
    }

    impl<V: Value> AuditCache<'_, V> {
        fn new() -> Self {
            Self {
                inner: Arc::new(Mutex::new(AuditCacheInner {
                    lookups: 0,
                    completions: 0,
                    invalidations: 0,
                    drops: 0,
                    result: None,
                })),
            }
        }
    }

    struct AuditPlaceholder<'a, V: Value> {
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
        completed: Mutex<bool>,
    }

    impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
        fn complete(self: Box<Self>, _: Option<&V>) {
            self.inner.lock().completions += 1;
            *self.completed.lock() = true;
        }
    }

    impl<V: Value> Drop for AuditPlaceholder<'_, V> {
        fn drop(&mut self) {
            if !*self.completed.lock() {
                self.inner.lock().drops += 1;
            }
        }
    }

    impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
        fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
            {
                let mut inner = self.inner.lock();
                inner.lookups += 1;
                if inner.result.is_some() {
                    return std::mem::take(&mut inner.result).unwrap();
                }
            }
            ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
                inner: self.inner.clone(),
                completed: Mutex::new(false),
            }))
        }

        fn invalidate(&self, _key: K, _value: Option<V>) {
            self.inner.lock().invalidations += 1;
        }
    }

    #[fuchsia::test]
    async fn test_cache_handling() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        // Zero counters.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // Look for an item but don't find it, so nothing is inserted and the placeholder drops.
        assert!(a.find(&item.key).await.expect("Failed find").is_none());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));

        // Insert attempts to invalidate.
        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (1, 0, 1, 1));

        // Look for item, find it and insert into the cache.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (2, 1, 1, 1));

        // Insert or replace attempts to invalidate as well.
        a.replace_or_insert(item.clone());
        assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
    }

    #[fuchsia::test]
    async fn test_cache_hit() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        // Zero counters.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // Insert attempts to invalidate.
        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        // Set up the item to find in the cache.
        inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));

        // Look for item, find it in cache, so no insert.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }

    #[fuchsia::test]
    async fn test_cache_says_uncacheable() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        let _ = a.insert(item.clone());

        // One invalidation from the insert.
        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        // Set up the NoCache response to find in the cache.
        inner.lock().result = Some(ObjectCacheResult::NoCache);

        // Look for item, it is uncacheable, so no insert.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }

    struct FailLayer {
        drop_event: Mutex<Option<Arc<DropEvent>>>,
    }

    impl FailLayer {
        fn new() -> Self {
            Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
        }
    }

    #[async_trait]
    impl<K: Key, V: Value> Layer<K, V> for FailLayer {
        async fn seek(
            &self,
            _bound: std::ops::Bound<&K>,
        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
            Err(anyhow!("Purposely failed seek"))
        }

        fn lock(&self) -> Option<Arc<DropEvent>> {
            self.drop_event.lock().clone()
        }

        fn len(&self) -> usize {
            0
        }

        async fn close(&self) {
            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
                Some(drop_event) => drop_event.listen(),
                None => return,
            };
            listener.await;
        }

        fn get_version(&self) -> Version {
            LATEST_VERSION
        }
    }

    #[fuchsia::test]
    async fn test_failed_lookup() {
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        a.set_layers(vec![Arc::new(FailLayer::new())]);

        // Zero counters.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // Lookup should fail and drop the placeholder.
        assert!(a.find(&TestKey(1..1)).await.is_err());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
    }
}

#[cfg(fuzz)]
mod fuzz {
    use crate::lsm_tree::types::{
        FuzzyHash, Item, LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
    };
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use arbitrary::Arbitrary;
    use fprint::TypeFingerprint;
    use fuzz::fuzz;
    use fxfs_macros::FuzzyHash;
    use std::hash::Hash;

    #[derive(
        Arbitrary,
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    impl LayerKey for TestKey {}

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    // Note: This code isn't really dead; it's used below in
    // `fuzz_lsm_tree_actions`. However, the `#[fuzz]` proc macro attribute
    // obfuscates the usage enough to confuse the compiler.
    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        Insert(Item<TestKey, u64>),
        ReplaceOrInsert(Item<TestKey, u64>),
        MergeInto(Item<TestKey, u64>, TestKey),
        Find(TestKey),
        Seal,
    }

    #[fuzz]
    fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
        use super::LSMTree;
        use super::cache::NullCache;
        use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
        use futures::executor::block_on;

        fn emit_left_merge_fn(
            _left: &MergeLayerIterator<'_, TestKey, u64>,
            _right: &MergeLayerIterator<'_, TestKey, u64>,
        ) -> MergeResult<TestKey, u64> {
            MergeResult::EmitLeft
        }

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for action in actions {
            match action {
                FuzzAction::Insert(item) => {
                    let _ = tree.insert(item);
                }
                FuzzAction::ReplaceOrInsert(item) => {
                    tree.replace_or_insert(item);
                }
                FuzzAction::Find(key) => {
                    block_on(tree.find(&key)).expect("find failed");
                }
                FuzzAction::MergeInto(item, bound) => tree.merge_into(item, &bound),
                FuzzAction::Seal => tree.seal(),
            };
        }
    }
}