Skip to main content

fxfs/lsm_tree/
merge.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::log::*;
6use crate::lsm_tree;
7use crate::lsm_tree::types::{
8    BoxedItem, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut, LayerKey, MergeType,
9    OrdLowerBound, Value,
10};
11use anyhow::Error;
12use async_trait::async_trait;
13use futures::try_join;
14use std::cmp::Ordering;
15use std::collections::BinaryHeap;
16use std::fmt::{Debug, Write};
17use std::ops::{Bound, Deref, DerefMut};
18use std::sync::{Arc, atomic};
19
/// The action a merge function requests for one of the two items it was given.  One `ItemOp` is
/// supplied for the left item and one for the right item in `MergeResult::Other`.
#[derive(Debug, Eq, PartialEq)]
pub enum ItemOp<K, V> {
    /// Keeps the item to be presented to the merger subsequently with a new merge pair.
    Keep,

    /// Discards the item and moves on to the next item in the respective layer.
    Discard,

    /// Replaces the item with something new which will be presented to the merger subsequently with
    /// a new pair.
    Replace(BoxedItem<K, V>),
}
32
/// The value returned by a merge function (see `MergeFn`) for a pair of items.
#[derive(Debug, Eq, PartialEq)]
pub enum MergeResult<K, V> {
    /// Emits the left item unchanged. Keeps the right item. This is the common case. Once an item
    /// has been emitted, it will never be seen again by the merge function.
    EmitLeft,

    /// All other merge results are covered by the following. Take care when replacing items
    /// that you replace the correct item. The merger will never merge two items together from
    /// the same layer. Consider the following scenario:
    ///
    ///        +-----------+              +-----------+
    /// 0:     |    A      |              |    C      |
    ///        +-----------+--------------+-----------+
    /// 1:                 |      B       |
    ///                    +--------------+
    ///
    /// Let's say that all three items can be merged together. The merge function will first be
    /// presented with items A and B, at which point it has the option of replacing the left item
    /// (i.e. A, in layer 0) or the right item (i.e. B in layer 1). However, if you replace the left
    /// item, the merge function will not then be given the opportunity to merge it with C, so the
    /// correct thing to do in this case is to replace the right item B in layer 1, and discard the
    /// left item. A rule you can use is that you should avoid replacing an item with another item
    /// whose upper bound exceeds that of the item you are replacing.
    ///
    /// There are some combinations that might lead to infinite loops (e.g. `emit: None`,
    /// `left: Keep`, `right: Keep`) and should obviously be avoided.
    ///
    /// `emit` is the item (if any) to emit, and `left`/`right` say what to do with the left and
    /// right items respectively.
    Other { emit: Option<BoxedItem<K, V>>, left: ItemOp<K, V>, right: ItemOp<K, V> },
}
61
/// Users must provide a merge function which will take pairs of items, left and right, and return a
/// merge result. The left item's key will either be less than the right item's key, or if they are
/// the same, then the left item will be in a lower layer index (lower layer indexes indicate more
/// recent entries). The last remaining item is always emitted.
///
/// Both items are passed by shared reference; any change to them is requested through the returned
/// `MergeResult` rather than made directly.
pub type MergeFn<K, V> =
    fn(&MergeLayerIterator<'_, K, V>, &MergeLayerIterator<'_, K, V>) -> MergeResult<K, V>;
68
/// Describes where the item a `MergeLayerIterator` is currently pointing at comes from.
pub enum MergeItem<K, V> {
    /// There is no current item.
    None,
    /// The current item is an owned item stored here rather than read from the underlying
    /// iterator.
    Item(BoxedItem<K, V>),
    /// The current item is whatever the underlying iterator is currently positioned at.
    Iter,
}
74
// The underlying layer iterator held by a `MergeLayerIterator`, if any.
enum RawIterator<'a, K, V> {
    // No iterator has been attached.
    None,
    // A read-only layer iterator.
    Const(Box<dyn LayerIterator<K, V> + 'a>),
    // A mutable layer iterator (the one handed to `merge_into`).
    Mut(Box<dyn LayerIteratorMut<K, V> + 'a>),
}

// SAFETY: LayerIteratorMut is not Sync or Send, but it's only used by merge_into, which doesn't
// send across threads (and is sync, not async).
unsafe impl<K, V> Send for RawIterator<'_, K, V> {}
84
// An iterator that keeps track of where we are for each of the layers. We push these onto a
// min-heap.
pub struct MergeLayerIterator<'a, K, V> {
    // The layer this iterator reads from.  This is `None` when the iterator was built directly
    // from an item (`new_with_item`) or around a mutable iterator (`merge_into`).
    layer: Option<&'a dyn Layer<K, V>>,

    // The underlying iterator.  This starts out as `RawIterator::None` and is filled in once the
    // layer has been sought.
    iter: RawIterator<'a, K, V>,

    // The index of the layer this is for.
    pub layer_index: u16,

    // The item we are currently pointing at.
    item: MergeItem<K, V>,
}
99
100impl<'a, K, V> MergeLayerIterator<'a, K, V> {
101    pub fn key(&self) -> &K {
102        self.item().key
103    }
104
105    pub fn value(&self) -> &V {
106        self.item().value
107    }
108
109    fn new(layer_index: u16, layer: &'a dyn Layer<K, V>) -> Self {
110        MergeLayerIterator {
111            layer: Some(layer),
112            iter: RawIterator::None,
113            layer_index,
114            item: MergeItem::None,
115        }
116    }
117
118    fn new_with_item(layer_index: u16, item: MergeItem<K, V>) -> Self {
119        MergeLayerIterator { layer: None, iter: RawIterator::None, layer_index, item }
120    }
121
122    fn item(&self) -> ItemRef<'_, K, V> {
123        match &self.item {
124            MergeItem::None => panic!("No item!"),
125            MergeItem::Item(item) => item.as_item_ref(),
126            MergeItem::Iter => self.get().unwrap(),
127        }
128    }
129
130    fn get(&self) -> Option<ItemRef<'_, K, V>> {
131        match &self.iter {
132            RawIterator::None => panic!("No iterator!"),
133            RawIterator::Const(iter) => iter.get(),
134            RawIterator::Mut(iter) => iter.get(),
135        }
136    }
137
138    fn set_item_from_iter(&mut self) {
139        self.item = if match &self.iter {
140            RawIterator::None => unreachable!(),
141            RawIterator::Const(iter) => iter.get(),
142            RawIterator::Mut(iter) => iter.get(),
143        }
144        .is_some()
145        {
146            MergeItem::Iter
147        } else {
148            MergeItem::None
149        };
150    }
151
152    fn take_item(&mut self) -> Option<BoxedItem<K, V>> {
153        if matches!(self.item, MergeItem::Item(_)) {
154            if let MergeItem::Item(item) = std::mem::replace(&mut self.item, MergeItem::None) {
155                return Some(item);
156            }
157        }
158        None
159    }
160
161    async fn advance(&mut self) -> Result<(), Error> {
162        if let MergeItem::Iter = self.item {
163            if let RawIterator::Const(iter) = &mut self.iter {
164                iter.advance().await?;
165            } else {
166                // This will never get called in the RawIterator::Mut case.
167                unreachable!();
168            }
169        }
170        self.set_item_from_iter();
171        Ok(())
172    }
173
174    fn replace(&mut self, item: BoxedItem<K, V>) {
175        self.item = MergeItem::Item(item);
176    }
177
178    fn is_some(&self) -> bool {
179        !matches!(self.item, MergeItem::None)
180    }
181
182    // This function exists so that we can advance multiple iterators concurrently using, say,
183    // try_join!.
184    async fn maybe_advance(&mut self, op: &ItemOp<K, V>) -> Result<(), Error> {
185        if let ItemOp::Keep = op { Ok(()) } else { self.advance().await }
186    }
187}
188
189// -- Ord and friends --
190impl<K: OrdLowerBound, V> Ord for MergeLayerIterator<'_, K, V> {
191    fn cmp(&self, other: &Self) -> Ordering {
192        // Reverse ordering because we want min-heap not max-heap.
193        other.key().cmp_lower_bound(self.key()).then(other.layer_index.cmp(&self.layer_index))
194    }
195}
196impl<K: OrdLowerBound, V> PartialOrd for MergeLayerIterator<'_, K, V> {
197    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
198        Some(self.cmp(other))
199    }
200}
201impl<K: OrdLowerBound, V> PartialEq for MergeLayerIterator<'_, K, V> {
202    fn eq(&self, other: &Self) -> bool {
203        self.cmp(other) == Ordering::Equal
204    }
205}
// Marker impl required by `Ord`; equality itself is defined by the `PartialEq` impl via `cmp`.
impl<K: OrdLowerBound, V> Eq for MergeLayerIterator<'_, K, V> {}
207
// As we merge items, the current item can be an item that has been replaced (and later emitted) by
// the merge function, or an item referenced by an iterator, or nothing.
enum CurrentItem<'a, 'b, K, V> {
    // There is no current item.
    None,
    // The current item is an owned item emitted by the merge function.
    Item(BoxedItem<K, V>),
    // The current item is whatever this layer iterator is pointing at.  While it is held here the
    // iterator is not on the heap.
    Iterator(&'a mut MergeLayerIterator<'b, K, V>),
}
215
216impl<'a, 'b, K, V> CurrentItem<'a, 'b, K, V> {
217    fn take_iterator(&mut self) -> Option<&'a mut MergeLayerIterator<'b, K, V>> {
218        if matches!(self, CurrentItem::Iterator(_)) {
219            if let CurrentItem::Iterator(iter) = std::mem::replace(self, CurrentItem::None) {
220                return Some(iter);
221            }
222        }
223        None
224    }
225}
226
227impl<'a, K, V> From<&'a CurrentItem<'_, '_, K, V>> for Option<ItemRef<'a, K, V>> {
228    fn from(iter: &'a CurrentItem<'_, '_, K, V>) -> Option<ItemRef<'a, K, V>> {
229        match iter {
230            CurrentItem::None => None,
231            CurrentItem::Iterator(iterator) => Some(iterator.item()),
232            CurrentItem::Item(item) => Some(item.as_item_ref()),
233        }
234    }
235}
236
/// Merger is the main entry point to merging.  It owns one `MergeLayerIterator` per layer and
/// hands out a `MergerIterator` via `query`.
pub struct Merger<'a, K, V> {
    // A buffer containing all the MergeLayerIterator objects, one per layer, in layer order.
    iterators: Vec<MergeLayerIterator<'a, K, V>>,

    // The function to be used for merging items.
    merge_fn: MergeFn<K, V>,

    // If true, additional logging is enabled.  See `set_trace`.
    trace: bool,

    // Tracks statistics related to the LSM tree.
    counters: Arc<lsm_tree::TreeCounters>,
}
251
/// Query describes the goal of a search in the LSM tree.  The caller specifies this to guide the
/// merger on which layer files it must consult.
/// Layers might be skipped during a query for a few reasons:
///   * Existence filters might allow layer files to be skipped.
///   * For bounded queries (i.e. any except FullScan), if the search key has
///     MergeType::OptimizedMerge, we can use that to omit older layers once we find a match.
#[derive(Debug, Clone)]
pub enum Query<'a, K: Key + LayerKey + OrdLowerBound> {
    /// Point queries look for a specific key in the LSM tree.  In this case, the existence filters
    /// for each layer file can be used to decide if the layer file needs to be consulted.
    /// Note that it is an error to use `Point` for range-like keys.  Either `LimitedRange` or
    /// `FullRange` must be used instead.
    Point(&'a K),

    /// LimitedRange queries allow for iteration over a range of keys.  The returned iterator will
    /// not necessarily terminate at the end of the range; the caller should stop iterating when the
    /// iterator passes the end of the range: the records returned cannot be relied upon as
    /// accurate.  For a LimitedRange query, the existence filters for each layer file can be used,
    /// but we have to check for all possible keys in the range we wish to search.  Obviously, that
    /// means that the range should be, well, limited.  Fuzzy hashes permit this for extent-like
    /// keys, but these queries are not the right choice for things like searching a directory.
    LimitedRange(&'a K),

    /// FullRange queries position the iterator to a starting key, and scan forward to the first
    /// key of a different type.  In this case, the existence filters are not used.  The key should
    /// be a search key (see `LayerKey::search_key`) (which, for extent based keys, would normally
    /// be `0..start + 1`).  This kind of query will still use optimized merges if the keys found
    /// support it.
    FullRange(&'a K),

    /// FullScan queries are intended to yield every record in the tree.  In this case, the
    /// existence filters are not used.
    FullScan,
}
286
287impl<'a, K: Key + LayerKey + OrdLowerBound> Query<'a, K> {
288    fn needs_layer<V>(&self, layer: &dyn Layer<K, V>) -> bool {
289        match self {
290            Self::Point(key) => layer.maybe_contains_key(key),
291            Self::LimitedRange(key) => layer.maybe_contains_key(key),
292            Self::FullRange(_) => true,
293            Self::FullScan => true,
294        }
295    }
296}
297
298#[fxfs_trace::trace]
299impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> Merger<'a, K, V> {
300    pub(super) fn new<I: Iterator<Item = &'a dyn Layer<K, V>>>(
301        layers: I,
302        merge_fn: MergeFn<K, V>,
303        counters: Arc<lsm_tree::TreeCounters>,
304    ) -> Merger<'a, K, V> {
305        Merger {
306            iterators: layers
307                .enumerate()
308                .map(|(index, layer)| MergeLayerIterator::new(index as u16, layer))
309                .collect(),
310            merge_fn: merge_fn,
311            trace: false,
312            counters,
313        }
314    }
315
316    /// Executes `query`, positioning the iterator to the first matching item.  See `Query` for
317    /// details.
318    #[trace]
319    pub async fn query(
320        &mut self,
321        query: Query<'_, K>,
322    ) -> Result<MergerIterator<'_, 'a, K, V>, Error> {
323        if let Query::Point(key) = query {
324            // NB: It is almost certainly an error to provide a range-like key to a Point query,
325            // hence this debug assertion.  This will most likely result in the wrong result, since
326            // the starting bound would be the original range key and therefore we might miss
327            // records that start earlier but overlap with the key.
328            // For example, if there is a layer which contains 0..100 and 100..200, and we search
329            // for 50..150, we should get both extents, which requires a different starting bound
330            // than 50..150 (particularly it would require the `K::search_key`).
331            debug_assert!(!key.is_range_key())
332        };
333        let len = self.iterators.len();
334        let pending_iterators = {
335            fxfs_trace::duration!("Merger::filter_layer_files", "len" => len);
336            self.iterators
337                .iter_mut()
338                .rev()
339                .filter(|l| query.needs_layer(l.layer.clone().unwrap()))
340                .collect::<Vec<&mut MergeLayerIterator<'a, K, V>>>()
341        };
342        let layer_count = pending_iterators.len();
343        {
344            self.counters.num_seeks.fetch_add(1, atomic::Ordering::Relaxed);
345            self.counters.layer_files_total.fetch_add(len, atomic::Ordering::Relaxed);
346            self.counters
347                .layer_files_skipped
348                .fetch_add(len - layer_count, atomic::Ordering::Relaxed);
349        }
350        log::debug!(query:?; "Consulting {}/{} layers", layer_count, len);
351        let mut merger_iter = MergerIterator {
352            merge_fn: self.merge_fn,
353            pending_iterators,
354            heap: BinaryHeap::with_capacity(layer_count),
355            item: CurrentItem::None,
356            trace: self.trace,
357            history: String::new(),
358        };
359        let owned_key;
360        let search_key = match query {
361            Query::Point(key) => Bound::Included(key),
362            Query::LimitedRange(key) => match key.search_key() {
363                Some(k) => {
364                    owned_key = k;
365                    Bound::Included(&owned_key)
366                }
367                None => Bound::Included(key),
368            },
369            Query::FullRange(key) => {
370                assert!(key.is_search_key());
371                Bound::Included(key)
372            }
373            Query::FullScan => Bound::Unbounded,
374        };
375        merger_iter.seek(search_key).await?;
376        Ok(merger_iter)
377    }
378
379    pub fn set_trace(&mut self, v: bool) {
380        self.trace = v;
381    }
382}
383
/// This is an iterator that will allow iteration over merged layers.  The primary interface is via
/// the LayerIterator trait.
pub struct MergerIterator<'a, 'b, K, V> {
    // The function used to merge pairs of items.
    merge_fn: MergeFn<K, V>,

    // Iterators that we have not yet pushed onto the heap.  `push_iterators` takes them from the
    // end of this vector.
    pending_iterators: Vec<&'a mut MergeLayerIterator<'b, K, V>>,

    // A heap with the merge iterators.  The iterator ordering is reversed so that the top of the
    // heap is the iterator with the lowest key.
    heap: BinaryHeap<&'a mut MergeLayerIterator<'b, K, V>>,

    // The current item.
    item: CurrentItem<'a, 'b, K, V>,

    // If true, logs regarding merger behaviour are appended to history.
    trace: bool,

    // Holds trace information if trace is true.
    history: String,
}
404
impl<'a, 'b, K: Key + LayerKey + OrdLowerBound, V: Value> MergerIterator<'a, 'b, K, V> {
    /// Returns the number of layer iterators that have not yet been pushed onto the heap.
    pub fn pending_iterators_len(&self) -> usize {
        self.pending_iterators.len()
    }

    /// Positions the iterator using `search_key`.
    async fn seek(&mut self, search_key: Bound<&K>) -> Result<(), Error> {
        // Seek as many layers as are needed for `search_key`, then merge to find the first item.
        self.push_iterators(search_key).await?;
        self.advance_impl(search_key).await
    }

    // Merges items from an array of layers using the provided merge function. The merge function is
    // repeatedly provided the lowest and the second lowest element, if one exists. In cases where
    // the two lowest elements compare equal, the element with the lowest layer (i.e. whichever
    // comes first in the layers array) will come first.  `search_key` is a bound for the next
    // key we expect.
    //
    // On return, `self.item` holds the next item to yield, or `CurrentItem::None` if the heap ran
    // out of items.
    async fn advance_impl(&mut self, search_key: Bound<&K>) -> Result<(), Error> {
        loop {
            // Inner loop: keep merging the two lowest items until there is something to emit.
            loop {
                if self.heap.is_empty() {
                    self.item = CurrentItem::None;
                    return Ok(());
                }
                let lowest = self.heap.pop().unwrap();
                let maybe_second_lowest = self.heap.pop();
                if let Some(second_lowest) = maybe_second_lowest {
                    let result = (self.merge_fn)(&lowest, &second_lowest);
                    if self.trace {
                        writeln!(
                            self.history,
                            "merge {:?}, {:?} -> {:?}",
                            lowest.item(),
                            second_lowest.item(),
                            result
                        )
                        .unwrap();
                    }
                    match result {
                        MergeResult::EmitLeft => {
                            // The right item goes back on the heap untouched; the left iterator is
                            // held as the current item rather than being pushed back.
                            self.heap.push(second_lowest);
                            self.item = CurrentItem::Iterator(lowest);
                            break;
                        }
                        MergeResult::Other { emit, left, right } => {
                            // Advance whichever iterators are not being kept (concurrently), then
                            // apply the ops, which pushes the iterators back onto the heap if they
                            // still have an item.
                            try_join!(
                                lowest.maybe_advance(&left),
                                second_lowest.maybe_advance(&right)
                            )?;
                            self.update_item(lowest, left);
                            self.update_item(second_lowest, right);
                            if let Some(emit) = emit {
                                self.item = CurrentItem::Item(emit);
                                break;
                            }
                            // Nothing to emit yet: go round again with the updated heap.
                        }
                    }
                } else {
                    // Only one item remains; the last remaining item is always emitted.
                    self.item = CurrentItem::Iterator(lowest);
                    break;
                }
            }

            // If the item we're about to yield isn't within `search_key`, ignore it and
            // continue.  To see how this would happen, imagine the following scenario:
            //
            //       0          10          20            30
            //       +----------+
            //   0   |          |
            //       +----------+-----------+
            //   1   |                      |
            //       +----------------------+-------------+
            //   2   |                                    |
            //       +------------------------------------+
            //
            // If we are to seek for 0..1 and then iterate over what we find, we expect the sequence
            // 0..10, 10..20, 20..30.  After the first seek, we should only consult iterator 0.  For
            // the first advance, we will consult iterator 1 but we need to merge the 0..20 element
            // with the 0..10 element which we already emitted.  To make this work, we push the
            // 0..10 item back on the heap (see advance) and then merge, but that will yield the
            // 0..10 entry again, so here we need to skip over it, and then merge again, at which
            // point we should see the 10..20 entry as expected.
            match search_key {
                // Skip items whose upper bound compares less than an inclusive bound...
                Bound::Included(key)
                    if self.get().unwrap().key.cmp_upper_bound(key) == Ordering::Less => {}
                // ...or does not compare greater than an exclusive bound.
                Bound::Excluded(key)
                    if self.get().unwrap().key.cmp_upper_bound(key) != Ordering::Greater => {}
                _ => return Ok(()),
            }
            // The item is being skipped.  If it was backed by an iterator, move that iterator on
            // and return it to the heap; an owned item is simply overwritten on the next pass.
            if let Some(iterator) = self.item.take_iterator() {
                iterator.advance().await?;
                if iterator.is_some() {
                    self.heap.push(iterator);
                }
            }
        }
    }

    // Returns whether more iterators are required for the given `search_key`.  See push_iterators.
    fn needs_more_iterators(&self, search_key: Bound<&K>) -> bool {
        // Nothing left to push.
        if self.pending_iterators.is_empty() {
            return false;
        }
        // Nothing on the heap yet, so we have no candidate at all.
        if self.heap.is_empty() {
            return true;
        }
        // Only an inclusive bound can make use of an optimized merge; anything else needs every
        // remaining iterator.
        let Bound::Included(target) = search_key else { return true };
        match target.merge_type() {
            MergeType::FullMerge => true,
            // With an optimized merge we can stop once the top of the heap overlaps the target.
            MergeType::OptimizedMerge => !self.heap.peek().unwrap().key().overlaps(target),
        }
    }

    // Pushes additional iterators onto the heap until we are confident that the top element will
    // yield what we are looking for.
    async fn push_iterators(&mut self, search_key: Bound<&K>) -> Result<(), Error> {
        while self.needs_more_iterators(search_key) {
            // needs_more_iterators only returns true when pending_iterators is non-empty, so the
            // unwrap cannot fail.
            let iter = self.pending_iterators.pop().unwrap();
            let sub_iter = iter.layer.as_ref().unwrap().seek(search_key).await?;
            if self.trace {
                writeln!(
                    self.history,
                    "merger: search for {:?}, found {:?}",
                    search_key,
                    sub_iter.get()
                )
                .unwrap();
            }
            iter.iter = RawIterator::Const(sub_iter);
            iter.set_item_from_iter();
            // Layers with nothing at or after the search key never make it onto the heap.
            if iter.is_some() {
                self.heap.push(iter);
            }
        }
        Ok(())
    }

    // Updates the merge iterator depending on |op|. If discarding, the iterator should have already
    // been advanced.
    fn update_item(&mut self, item: &'a mut MergeLayerIterator<'b, K, V>, op: ItemOp<K, V>) {
        match op {
            ItemOp::Keep => self.heap.push(item),
            ItemOp::Discard => {
                // The iterator should have already been advanced.  It only goes back on the heap
                // if it still has an item.
                if item.is_some() {
                    self.heap.push(item);
                }
            }
            ItemOp::Replace(replacement) => {
                item.replace(replacement);
                self.heap.push(item);
            }
        }
    }
}
559
#[async_trait]
impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> LayerIterator<K, V>
    for MergerIterator<'a, '_, K, V>
{
    // Advances to the next merged item.  While there are still layers that have not been
    // consulted, a search key is derived from the current item so that those layers can be sought
    // to the right place if they turn out to be needed.
    async fn advance(&mut self) -> Result<(), Error> {
        let owned_key;
        let mut search_key = Bound::Unbounded;
        // A search key is only needed if there are layers that might still have to be sought.
        if !self.pending_iterators.is_empty() {
            if let Some(ItemRef { key, .. }) = self.get() {
                match key.next_key() {
                    Some(k) => {
                        assert!(k.is_search_key());
                        owned_key = k;
                        search_key = Bound::Included(&owned_key);
                    }
                    None => {
                        // No next key is available, so look for anything after the current key.
                        owned_key = key.clone();
                        search_key = Bound::Excluded(&owned_key);
                    }
                }
            }
        }

        // Advance the iterator for the current item and push it onto the heap, and also push any
        // additional iterators onto the heap (by calling push_iterators).
        if let Some(iterator) = self.item.take_iterator() {
            if self.needs_more_iterators(search_key) {
                // Keep a copy of the current item in case we have to go back to it below.
                let existing_item = iterator.item().boxed();
                iterator.advance().await?;
                if let Bound::Included(s) = search_key
                    && iterator.is_some()
                    && iterator.key().merge_type() == MergeType::OptimizedMerge
                    && s.overlaps(iterator.key())
                {
                    // In this case, the key immediately following is a good candidate, and all
                    // we need to do is merge it with existing iterators; we shouldn't need to
                    // consult with any more iterators.
                } else {
                    // We are going to need to consult more iterators so we need to go back to
                    // the previous item so that we can merge with it.  See the comment in
                    // advance_impl.
                    iterator.replace(existing_item);

                    // We must push other iterators here before pushing iterator onto the heap
                    // because we know `iterator` would end up at the top of the heap.
                    self.push_iterators(search_key).await?;
                }
            } else {
                iterator.advance().await?;
            }
            if iterator.is_some() {
                self.heap.push(iterator);
            }
        } else {
            // The current item (if any) is not backed by an iterator, so there is nothing to
            // advance; just make sure enough layers are on the heap.
            self.push_iterators(search_key).await?;
        }

        self.advance_impl(search_key).await
    }

    // Returns the current item, or None if there isn't one.
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        (&self.item).into()
    }
}
624
// A wrapper around a `MergeLayerIterator` whose underlying iterator is expected to be
// `RawIterator::Mut` (`as_mut` hits `unreachable!()` otherwise).  It is used by `merge_into`.
struct MutMergeLayerIterator<'a, K, V>(MergeLayerIterator<'a, K, V>);
626
627impl<K, V> MutMergeLayerIterator<'_, K, V> {
628    fn advance(&mut self) {
629        if let MergeItem::Iter = self.item {
630            self.as_mut().advance();
631        }
632        self.set_item_from_iter();
633    }
634}
635
636impl<'a, K, V> AsMut<dyn LayerIteratorMut<K, V> + 'a> for MutMergeLayerIterator<'a, K, V> {
637    fn as_mut(&mut self) -> &mut (dyn LayerIteratorMut<K, V> + 'a) {
638        let RawIterator::Mut(iter) = &mut self.0.iter else { unreachable!() };
639        iter.as_mut()
640    }
641}
642
643impl<'a, K, V> Deref for MutMergeLayerIterator<'a, K, V> {
644    type Target = MergeLayerIterator<'a, K, V>;
645
646    fn deref(&self) -> &Self::Target {
647        &self.0
648    }
649}
650
651impl<K, V> DerefMut for MutMergeLayerIterator<'_, K, V> {
652    fn deref_mut(&mut self) -> &mut Self::Target {
653        &mut self.0
654    }
655}
656
657// Merges the given item into a mutable layer.
658pub(super) fn merge_into<K: Debug + OrdLowerBound, V: Debug>(
659    mut_iter: Box<dyn LayerIteratorMut<K, V> + '_>,
660    item: Item<K, V>,
661    merge_fn: MergeFn<K, V>,
662) -> Result<(), Error> {
663    let merge_item = if mut_iter.get().is_some() { MergeItem::Iter } else { MergeItem::None };
664    let mut mut_merge_iter = MutMergeLayerIterator(MergeLayerIterator {
665        layer: None,
666        iter: RawIterator::Mut(mut_iter),
667        layer_index: 1,
668        item: merge_item,
669    });
670    let mut item_merge_iter = MergeLayerIterator::new_with_item(0, MergeItem::Item(item.boxed()));
671    while mut_merge_iter.is_some() && item_merge_iter.is_some() {
672        if mut_merge_iter.0 > item_merge_iter {
673            // In this branch the mutable layer is left and the item we're merging-in is right.
674            let merge_result = merge_fn(&mut_merge_iter, &item_merge_iter);
675            debug!(
676                lhs:? = mut_merge_iter.key(),
677                rhs:? = item_merge_iter.key(),
678                result:? = merge_result;
679                "(1) merge");
680            match merge_result {
681                MergeResult::EmitLeft => {
682                    if let Some(item) = mut_merge_iter.take_item() {
683                        mut_merge_iter.as_mut().insert(*item);
684                        mut_merge_iter.set_item_from_iter();
685                    } else {
686                        mut_merge_iter.advance();
687                    }
688                }
689                MergeResult::Other { emit, left, right } => {
690                    if let Some(emit) = emit {
691                        mut_merge_iter.as_mut().insert(*emit);
692                    }
693                    match left {
694                        ItemOp::Keep => {}
695                        ItemOp::Discard => {
696                            if matches!(mut_merge_iter.item, MergeItem::Iter) {
697                                mut_merge_iter.as_mut().erase();
698                            }
699                            mut_merge_iter.set_item_from_iter();
700                        }
701                        ItemOp::Replace(item) => {
702                            if let MergeItem::Iter = mut_merge_iter.item {
703                                mut_merge_iter.as_mut().erase();
704                            }
705                            mut_merge_iter.item = MergeItem::Item(item)
706                        }
707                    }
708                    match right {
709                        ItemOp::Keep => {}
710                        ItemOp::Discard => item_merge_iter.item = MergeItem::None,
711                        ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
712                    }
713                }
714            }
715        } else {
716            // In this branch, the item we're merging-in is left and the mutable layer is right.
717            let merge_result = merge_fn(&item_merge_iter, &mut_merge_iter);
718            debug!(
719                lhs:? = mut_merge_iter.key(),
720                rhs:? = item_merge_iter.key(),
721                result:? = merge_result;
722                "(2) merge");
723            match merge_result {
724                MergeResult::EmitLeft => break, // Item is inserted outside the loop
725                MergeResult::Other { emit, left, right } => {
726                    if let Some(emit) = emit {
727                        mut_merge_iter.as_mut().insert(*emit);
728                    }
729                    match left {
730                        ItemOp::Keep => {}
731                        ItemOp::Discard => item_merge_iter.item = MergeItem::None,
732                        ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
733                    }
734                    match right {
735                        ItemOp::Keep => {}
736                        ItemOp::Discard => {
737                            if matches!(mut_merge_iter.item, MergeItem::Iter) {
738                                mut_merge_iter.as_mut().erase();
739                            }
740                            mut_merge_iter.set_item_from_iter();
741                        }
742                        ItemOp::Replace(item) => {
743                            if let MergeItem::Iter = mut_merge_iter.item {
744                                mut_merge_iter.as_mut().erase();
745                            }
746                            mut_merge_iter.item = MergeItem::Item(item)
747                        }
748                    }
749                }
750            }
751        }
752    } // while ...
753
754    // The only way we could get here with both items is via the break above, so we know the correct
755    // order required here.
756    if let MergeItem::Item(item) = item_merge_iter.item {
757        mut_merge_iter.as_mut().insert(*item);
758    }
759    if let Some(item) = mut_merge_iter.take_item() {
760        mut_merge_iter.as_mut().insert(*item);
761    }
762    if let RawIterator::Mut(mut iter) = mut_merge_iter.0.iter {
763        iter.commit();
764    }
765    Ok(())
766}
767
768#[cfg(test)]
769mod tests {
770    use super::ItemOp::{Discard, Keep, Replace};
771    use super::{MergeLayerIterator, MergeResult, Merger};
772    use crate::lsm_tree::persistent_layer::{PersistentLayer, PersistentLayerWriter};
773    use crate::lsm_tree::skip_list_layer::SkipListLayer;
774    use crate::lsm_tree::types::{
775        FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeType,
776        OrdLowerBound, OrdUpperBound, SortByU64,
777    };
778    use crate::lsm_tree::{self, Query, Value};
779    use crate::object_store::{self, ObjectKey, ObjectValue, VOLUME_DATA_KEY_ID};
780    use crate::serialized_types::{
781        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
782    };
783    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
784    use crate::testing::writer::Writer;
785    use fprint::TypeFingerprint;
786    use fxfs_macros::{FuzzyHash, SerializeKey};
787    use rand::Rng;
788    use std::hash::Hash;
789    use std::ops::{Bound, Range};
790    use std::sync::Arc;
791
792    use crate::lsm_tree::testing::TestKey;
793
794    impl Value for i32 {
795        const DELETED_MARKER: Self = 0;
796    }
797
798    fn layer_ref_iter<K: Key, V: Value>(
799        layers: &[Arc<SkipListLayer<K, V>>],
800    ) -> impl Iterator<Item = &dyn Layer<K, V>> {
801        layers.iter().map(|x| x.as_ref() as &dyn Layer<K, V>)
802    }
803
804    fn dyn_layer_ref_iter<K: Key, V: Value>(
805        layers: &[Arc<dyn Layer<K, V>>],
806    ) -> impl Iterator<Item = &dyn Layer<K, V>> {
807        layers.iter().map(|x| x.as_ref())
808    }
809
810    fn counters() -> Arc<lsm_tree::TreeCounters> {
811        Arc::new(lsm_tree::TreeCounters::default())
812    }
813
814    #[fuchsia::test]
815    async fn test_emit_left() {
816        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
817        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
818        skip_lists[0].insert(items[1].clone()).expect("insert error");
819        skip_lists[1].insert(items[0].clone()).expect("insert error");
820        let mut merger = Merger::new(
821            layer_ref_iter(&skip_lists),
822            |_left, _right| MergeResult::EmitLeft,
823            counters(),
824        );
825        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
826        let ItemRef { key, value, .. } = iter.get().expect("missing item");
827        assert_eq!((key, value), (&items[0].key, &items[0].value));
828        iter.advance().await.unwrap();
829        let ItemRef { key, value, .. } = iter.get().expect("missing item");
830        assert_eq!((key, value), (&items[1].key, &items[1].value));
831        iter.advance().await.unwrap();
832        assert!(iter.get().is_none());
833    }
834
835    #[fuchsia::test]
836    async fn test_other_emit() {
837        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
838        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
839        skip_lists[0].insert(items[1].clone()).expect("insert error");
840        skip_lists[1].insert(items[0].clone()).expect("insert error");
841        let mut merger = Merger::new(
842            layer_ref_iter(&skip_lists),
843            |_left, _right| MergeResult::Other {
844                emit: Some(Item::new(TestKey(3..3), 3).boxed()),
845                left: Discard,
846                right: Discard,
847            },
848            counters(),
849        );
850        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
851
852        let ItemRef { key, value, .. } = iter.get().expect("missing item");
853        assert_eq!((key, value), (&TestKey(3..3), &3));
854        iter.advance().await.unwrap();
855        assert!(iter.get().is_none());
856    }
857
858    #[fuchsia::test]
859    async fn test_replace_left() {
860        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
861        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
862        skip_lists[0].insert(items[1].clone()).expect("insert error");
863        skip_lists[1].insert(items[0].clone()).expect("insert error");
864        let mut merger = Merger::new(
865            layer_ref_iter(&skip_lists),
866            |_left, _right| MergeResult::Other {
867                emit: None,
868                left: Replace(Item::new(TestKey(3..3), 3).boxed()),
869                right: Discard,
870            },
871            counters(),
872        );
873        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
874
875        // The merger should replace the left item and then after discarding the right item, it
876        // should emit the replacement.
877        let ItemRef { key, value, .. } = iter.get().expect("missing item");
878        assert_eq!((key, value), (&TestKey(3..3), &3));
879        iter.advance().await.unwrap();
880        assert!(iter.get().is_none());
881    }
882
883    #[fuchsia::test]
884    async fn test_replace_right() {
885        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
886        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
887        skip_lists[0].insert(items[1].clone()).expect("insert error");
888        skip_lists[1].insert(items[0].clone()).expect("insert error");
889        let mut merger = Merger::new(
890            layer_ref_iter(&skip_lists),
891            |_left, _right| MergeResult::Other {
892                emit: None,
893                left: Discard,
894                right: Replace(Item::new(TestKey(3..3), 3).boxed()),
895            },
896            counters(),
897        );
898        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
899
900        // The merger should replace the right item and then after discarding the left item, it
901        // should emit the replacement.
902        let ItemRef { key, value, .. } = iter.get().expect("missing item");
903        assert_eq!((key, value), (&TestKey(3..3), &3));
904        iter.advance().await.unwrap();
905        assert!(iter.get().is_none());
906    }
907
908    #[fuchsia::test]
909    async fn test_left_less_than_right() {
910        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
911        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
912        skip_lists[0].insert(items[1].clone()).expect("insert error");
913        skip_lists[1].insert(items[0].clone()).expect("insert error");
914        let mut merger = Merger::new(
915            layer_ref_iter(&skip_lists),
916            |left, right| {
917                assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
918                assert_eq!((right.key(), right.value()), (&TestKey(2..2), &2));
919                MergeResult::EmitLeft
920            },
921            counters(),
922        );
923        merger.query(Query::FullScan).await.expect("seek failed");
924    }
925
926    #[fuchsia::test]
927    async fn test_left_equals_right() {
928        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
929        let item = Item::new(TestKey(1..1), 1);
930        skip_lists[0].insert(item.clone()).expect("insert error");
931        skip_lists[1].insert(item.clone()).expect("insert error");
932        let mut merger = Merger::new(
933            layer_ref_iter(&skip_lists),
934            |left, right| {
935                assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
936                assert_eq!((right.key(), right.value()), (&TestKey(1..1), &1));
937                assert_eq!(left.layer_index, 0);
938                assert_eq!(right.layer_index, 1);
939                MergeResult::EmitLeft
940            },
941            counters(),
942        );
943        merger.query(Query::FullScan).await.expect("seek failed");
944    }
945
946    #[fuchsia::test]
947    async fn test_keep() {
948        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
949        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
950        skip_lists[0].insert(items[1].clone()).expect("insert error");
951        skip_lists[1].insert(items[0].clone()).expect("insert error");
952        let mut merger = Merger::new(
953            layer_ref_iter(&skip_lists),
954            |left, right| {
955                if left.key() == &TestKey(1..1) {
956                    MergeResult::Other {
957                        emit: None,
958                        left: Replace(Item::new(TestKey(3..3), 3).boxed()),
959                        right: Keep,
960                    }
961                } else {
962                    assert_eq!(left.key(), &TestKey(2..2));
963                    assert_eq!(right.key(), &TestKey(3..3));
964                    MergeResult::Other { emit: None, left: Discard, right: Keep }
965                }
966            },
967            counters(),
968        );
969        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
970
971        // The merger should first replace left and then it should call the merger again with 2 & 3
972        // and end up just keeping 3.
973        let ItemRef { key, value, .. } = iter.get().expect("missing item");
974        assert_eq!((key, value), (&TestKey(3..3), &3));
975        iter.advance().await.unwrap();
976        assert!(iter.get().is_none());
977    }
978
979    #[fuchsia::test]
980    async fn test_merge_10_layers() {
981        let skip_lists: Vec<_> = (0..10).map(|_| SkipListLayer::new(100)).collect();
982        let mut rng = rand::rng();
983        for i in 0..100 {
984            skip_lists[rng.random_range(0..10) as usize]
985                .insert(Item::new(TestKey(i..i), i))
986                .expect("insert error");
987        }
988        let mut merger = Merger::new(
989            layer_ref_iter(&skip_lists),
990            |_left, _right| MergeResult::EmitLeft,
991            counters(),
992        );
993        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
994
995        for i in 0..100 {
996            let ItemRef { key, value, .. } = iter.get().expect("missing item");
997            assert_eq!((key, value), (&TestKey(i..i), &i));
998            iter.advance().await.unwrap();
999        }
1000        assert!(iter.get().is_none());
1001    }
1002
1003    #[fuchsia::test]
1004    async fn test_merge_uses_cmp_lower_bound() {
1005        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1006        let items = [Item::new(TestKey(1..10), 1), Item::new(TestKey(2..3), 2)];
1007        skip_lists[0].insert(items[1].clone()).expect("insert error");
1008        skip_lists[1].insert(items[0].clone()).expect("insert error");
1009        let mut merger = Merger::new(
1010            layer_ref_iter(&skip_lists),
1011            |_left, _right| MergeResult::EmitLeft,
1012            counters(),
1013        );
1014        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1015
1016        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1017        assert_eq!((key, value), (&items[0].key, &items[0].value));
1018        iter.advance().await.unwrap();
1019        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1020        assert_eq!((key, value), (&items[1].key, &items[1].value));
1021        iter.advance().await.unwrap();
1022        assert!(iter.get().is_none());
1023    }
1024
1025    #[fuchsia::test]
1026    async fn test_merge_into_emit_left() {
1027        let skip_list = SkipListLayer::new(100);
1028        let items =
1029            [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2), Item::new(TestKey(3..3), 3)];
1030        skip_list.insert(items[0].clone()).expect("insert error");
1031        skip_list.insert(items[2].clone()).expect("insert error");
1032        skip_list
1033            .merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::EmitLeft);
1034
1035        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1036        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1037        assert_eq!((key, value), (&items[0].key, &items[0].value));
1038        iter.advance().await.unwrap();
1039        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1040        assert_eq!((key, value), (&items[1].key, &items[1].value));
1041        iter.advance().await.unwrap();
1042        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1043        assert_eq!((key, value), (&items[2].key, &items[2].value));
1044        iter.advance().await.unwrap();
1045        assert!(iter.get().is_none());
1046    }
1047
1048    #[fuchsia::test]
1049    async fn test_merge_into_emit_last_after_replacing() {
1050        let skip_list = SkipListLayer::new(100);
1051        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1052        skip_list.insert(items[0].clone()).expect("insert error");
1053
1054        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1055            if left.key() == &TestKey(1..1) {
1056                assert_eq!(right.key(), &TestKey(2..2));
1057                MergeResult::Other {
1058                    emit: None,
1059                    left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1060                    right: Keep,
1061                }
1062            } else {
1063                assert_eq!(left.key(), &TestKey(2..2));
1064                assert_eq!(right.key(), &TestKey(3..3));
1065                MergeResult::EmitLeft
1066            }
1067        });
1068
1069        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1070        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1071        assert_eq!((key, value), (&items[1].key, &items[1].value));
1072        iter.advance().await.unwrap();
1073        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1074        assert_eq!((key, value), (&TestKey(3..3), &3));
1075        iter.advance().await.unwrap();
1076        assert!(iter.get().is_none());
1077    }
1078
1079    #[fuchsia::test]
1080    async fn test_merge_into_emit_left_after_replacing() {
1081        let skip_list = SkipListLayer::new(100);
1082        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1083        skip_list.insert(items[0].clone()).expect("insert error");
1084
1085        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1086            if left.key() == &TestKey(1..1) {
1087                assert_eq!(right.key(), &TestKey(3..3));
1088                MergeResult::Other {
1089                    emit: None,
1090                    left: Replace(Item::new(TestKey(2..2), 2).boxed()),
1091                    right: Keep,
1092                }
1093            } else {
1094                assert_eq!(left.key(), &TestKey(2..2));
1095                assert_eq!(right.key(), &TestKey(3..3));
1096                MergeResult::EmitLeft
1097            }
1098        });
1099
1100        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1101        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1102        assert_eq!((key, value), (&TestKey(2..2), &2));
1103        iter.advance().await.unwrap();
1104        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1105        assert_eq!((key, value), (&items[1].key, &items[1].value));
1106        iter.advance().await.unwrap();
1107        assert!(iter.get().is_none());
1108    }
1109
1110    // This tests emitting in both branches of merge_into, and most of the discard paths.
1111    #[fuchsia::test]
1112    async fn test_merge_into_emit_other_and_discard() {
1113        let skip_list = SkipListLayer::new(100);
1114        let items =
1115            [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 3)];
1116        skip_list.insert(items[0].clone()).expect("insert error");
1117        skip_list.insert(items[2].clone()).expect("insert error");
1118
1119        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1120            if left.key() == &TestKey(1..1) {
1121                // This tests the top branch in merge_into.
1122                assert_eq!(right.key(), &TestKey(3..3));
1123                MergeResult::Other {
1124                    emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1125                    left: Discard,
1126                    right: Keep,
1127                }
1128            } else {
1129                // This tests the bottom branch in merge_into.
1130                assert_eq!(left.key(), &TestKey(3..3));
1131                assert_eq!(right.key(), &TestKey(5..5));
1132                MergeResult::Other {
1133                    emit: Some(Item::new(TestKey(4..4), 4).boxed()),
1134                    left: Discard,
1135                    right: Discard,
1136                }
1137            }
1138        });
1139
1140        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1141        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1142        assert_eq!((key, value), (&TestKey(2..2), &2));
1143        iter.advance().await.unwrap();
1144        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1145        assert_eq!((key, value), (&TestKey(4..4), &4));
1146        iter.advance().await.unwrap();
1147        assert!(iter.get().is_none());
1148    }
1149
1150    // This tests replacing the item and discarding the right item (the one remaining untested
1151    // discard path) in the top branch in merge_into.
1152    #[fuchsia::test]
1153    async fn test_merge_into_replace_and_discard() {
1154        let skip_list = SkipListLayer::new(100);
1155        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1156        skip_list.insert(items[0].clone()).expect("insert error");
1157
1158        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1159            emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1160            left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1161            right: Discard,
1162        });
1163
1164        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1165        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1166        assert_eq!((key, value), (&TestKey(2..2), &2));
1167        iter.advance().await.unwrap();
1168        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1169        assert_eq!((key, value), (&TestKey(4..4), &4));
1170        iter.advance().await.unwrap();
1171        assert!(iter.get().is_none());
1172    }
1173
1174    // This tests replacing the right item in the top branch of merge_into and the left item in the
1175    // bottom branch of merge_into.
1176    #[fuchsia::test]
1177    async fn test_merge_into_replace_merge_item() {
1178        let skip_list = SkipListLayer::new(100);
1179        let items =
1180            [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 5)];
1181        skip_list.insert(items[0].clone()).expect("insert error");
1182        skip_list.insert(items[2].clone()).expect("insert error");
1183
1184        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, right| {
1185            if right.key() == &TestKey(3..3) {
1186                MergeResult::Other {
1187                    emit: None,
1188                    left: Discard,
1189                    right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1190                }
1191            } else {
1192                assert_eq!(right.key(), &TestKey(5..5));
1193                MergeResult::Other {
1194                    emit: None,
1195                    left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1196                    right: Discard,
1197                }
1198            }
1199        });
1200
1201        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1202        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1203        assert_eq!((key, value), (&TestKey(4..4), &4));
1204        iter.advance().await.unwrap();
1205        assert!(iter.get().is_none());
1206    }
1207
1208    // This tests replacing the right item in the bottom branch of merge_into.
1209    #[fuchsia::test]
1210    async fn test_merge_into_replace_existing() {
1211        let skip_list = SkipListLayer::new(100);
1212        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1213        skip_list.insert(items[1].clone()).expect("insert error");
1214
1215        skip_list.merge_into(items[0].clone(), &items[0].key, |_left, right| {
1216            if right.key() == &TestKey(3..3) {
1217                MergeResult::Other {
1218                    emit: None,
1219                    left: Keep,
1220                    right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1221                }
1222            } else {
1223                MergeResult::EmitLeft
1224            }
1225        });
1226
1227        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1228        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1229        assert_eq!((key, value), (&items[0].key, &items[0].value));
1230        iter.advance().await.unwrap();
1231        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1232        assert_eq!((key, value), (&TestKey(2..2), &2));
1233        iter.advance().await.unwrap();
1234        assert!(iter.get().is_none());
1235    }
1236
1237    #[fuchsia::test]
1238    async fn test_merge_into_discard_last() {
1239        let skip_list = SkipListLayer::new(100);
1240        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1241        skip_list.insert(items[0].clone()).expect("insert error");
1242
1243        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1244            emit: None,
1245            left: Discard,
1246            right: Keep,
1247        });
1248
1249        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1250        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1251        assert_eq!((key, value), (&items[1].key, &items[1].value));
1252        iter.advance().await.unwrap();
1253        assert!(iter.get().is_none());
1254    }
1255
1256    #[fuchsia::test]
1257    async fn test_merge_into_empty() {
1258        let skip_list = SkipListLayer::new(100);
1259        let items = [Item::new(TestKey(1..1), 1)];
1260
1261        skip_list.merge_into(items[0].clone(), &items[0].key, |_left, _right| {
1262            panic!("Unexpected merge!");
1263        });
1264
1265        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1266        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1267        assert_eq!((key, value), (&items[0].key, &items[0].value));
1268        iter.advance().await.unwrap();
1269        assert!(iter.get().is_none());
1270    }
1271
1272    #[fuchsia::test]
1273    async fn test_seek_uses_minimum_number_of_iterators() {
1274        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1275        let items = [Item::new(TestKey(1..2), 1), Item::new(TestKey(1..2), 2)];
1276        skip_lists[0].insert(items[0].clone()).expect("insert error");
1277        skip_lists[1].insert(items[1].clone()).expect("insert error");
1278        let mut merger = Merger::new(
1279            layer_ref_iter(&skip_lists),
1280            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1281            counters(),
1282        );
1283        let iter = merger
1284            .query(Query::FullRange(&items[0].key.search_key().unwrap()))
1285            .await
1286            .expect("seek failed");
1287
1288        // Seek should only search in the first skip list, so no merge should take place, and we'll
1289        // know if it has because we'll see a different value (2 rather than 1).
1290        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1291        assert_eq!((key, value), (&items[0].key, &items[0].value));
1292    }
1293
1294    // Checks that merging the given layers produces |expected| sequence of items starting from
1295    // |start|.
1296    async fn test_advance<K: Eq + Key + LayerKey + OrdLowerBound>(
1297        layers: &[&[(K, i32)]],
1298        query: Query<'_, K>,
1299        expected: &[(K, i32)],
1300    ) {
1301        let mut skip_lists = Vec::new();
1302        for &layer in layers {
1303            let skip_list = SkipListLayer::new(100);
1304            for (k, v) in layer {
1305                skip_list.insert(Item::new(k.clone(), *v)).expect("insert error");
1306            }
1307            skip_lists.push(skip_list);
1308        }
1309        let mut merger = Merger::new(
1310            layer_ref_iter(&skip_lists),
1311            |_left, _right| MergeResult::EmitLeft,
1312            counters(),
1313        );
1314        let mut iter = merger.query(query).await.expect("seek failed");
1315        for (k, v) in expected {
1316            let ItemRef { key, value, .. } = iter.get().expect("get failed");
1317            assert_eq!((key, value), (k, v));
1318            iter.advance().await.expect("advance failed");
1319        }
1320        assert!(iter.get().is_none());
1321    }
1322
1323    #[fuchsia::test]
1324    async fn test_seek_skips_replaced_items() {
1325        // The 1..2 and the 2..3 items are overwritten and merging them should be skipped.
1326        test_advance(
1327            &[
1328                &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(4..5), 3)],
1329                &[(TestKey(1..2), 4), (TestKey(2..3), 5), (TestKey(3..4), 6)],
1330            ],
1331            Query::FullRange(&TestKey(1..2).search_key().unwrap()),
1332            &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(3..4), 6), (TestKey(4..5), 3)],
1333        )
1334        .await;
1335    }
1336
1337    #[fuchsia::test]
1338    async fn test_advance_skips_replaced_items_at_end() {
1339        // Like the last test, the 1..2 item is overwritten and seeking for it should skip the merge
1340        // but this time, the keys are at the end.
1341        test_advance(
1342            &[&[(TestKey(1..2), 1)], &[(TestKey(1..2), 2)]],
1343            Query::FullRange(&TestKey(1..2).search_key().unwrap()),
1344            &[(TestKey(1..2), 1)],
1345        )
1346        .await;
1347    }
1348
    // Range-backed test key. Its `LayerKey` impl in this module explicitly reports
    // `MergeType::FullMerge`, and its lower/upper bound orderings compare the range's start and
    // end respectively.
    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
        SerializeKey,
    )]
    struct TestKeyWithFullMerge(Range<u64>);

    // NOTE(review): presumably registers this layout as the one used from version 1 onwards —
    // confirm against the `versioned_type!` macro definition.
    versioned_type! { 1.. => TestKeyWithFullMerge }
1365
1366    impl LayerKey for TestKeyWithFullMerge {
1367        fn merge_type(&self) -> MergeType {
1368            MergeType::FullMerge
1369        }
1370
1371        fn search_key(&self) -> Option<Self> {
1372            Some(Self(0..self.0.start + 1))
1373        }
1374
1375        fn is_search_key(&self) -> bool {
1376            self.0.start == 0
1377        }
1378
1379        fn overlaps(&self, other: &Self) -> bool {
1380            self.0.start < other.0.end && self.0.end > other.0.start
1381        }
1382    }
1383
1384    impl SortByU64 for TestKeyWithFullMerge {
1385        fn get_leading_u64(&self) -> u64 {
1386            self.0.end
1387        }
1388    }
1389
1390    impl OrdUpperBound for TestKeyWithFullMerge {
1391        fn cmp_upper_bound(&self, other: &TestKeyWithFullMerge) -> std::cmp::Ordering {
1392            self.0.end.cmp(&other.0.end)
1393        }
1394    }
1395
1396    impl OrdLowerBound for TestKeyWithFullMerge {
1397        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1398            self.0.start.cmp(&other.0.start)
1399        }
1400    }
1401
1402    // Same set up as `test_seek_skips_replaced_items` except here nothing should skip. Any seek
1403    // should just place the iterator at a particular spot in the merged layers.
1404    #[fuchsia::test]
1405    async fn test_full_merge_consistent_advance_ordering() {
1406        let layer_set = [
1407            [
1408                (TestKeyWithFullMerge(1..2), 1i32),
1409                (TestKeyWithFullMerge(2..3), 2i32),
1410                (TestKeyWithFullMerge(4..5), 3i32),
1411            ]
1412            .as_slice(),
1413            [
1414                (TestKeyWithFullMerge(1..2), 4i32),
1415                (TestKeyWithFullMerge(2..3), 5i32),
1416                (TestKeyWithFullMerge(3..4), 6i32),
1417            ]
1418            .as_slice(),
1419        ];
1420
1421        let full_merge_result = [
1422            (TestKeyWithFullMerge(1..2), 1),
1423            (TestKeyWithFullMerge(1..2), 4),
1424            (TestKeyWithFullMerge(2..3), 2),
1425            (TestKeyWithFullMerge(2..3), 5),
1426            (TestKeyWithFullMerge(3..4), 6),
1427            (TestKeyWithFullMerge(4..5), 3),
1428        ];
1429
1430        test_advance(layer_set.as_slice(), Query::FullScan, &full_merge_result).await;
1431
1432        test_advance(
1433            layer_set.as_slice(),
1434            Query::FullRange(&TestKeyWithFullMerge(1..2).search_key().unwrap()),
1435            &full_merge_result,
1436        )
1437        .await;
1438
1439        test_advance(
1440            layer_set.as_slice(),
1441            Query::FullRange(&TestKeyWithFullMerge(2..3).search_key().unwrap()),
1442            &full_merge_result[2..],
1443        )
1444        .await;
1445
1446        test_advance(
1447            layer_set.as_slice(),
1448            Query::FullRange(&TestKeyWithFullMerge(3..4).search_key().unwrap()),
1449            &full_merge_result[4..],
1450        )
1451        .await;
1452    }
1453
1454    #[fuchsia::test]
1455    async fn test_full_merge_always_consult_all_layers() {
1456        // | 1 |   |
1457        // |   | 2 |
1458        // | 3 | 4 |
1459        let skip_lists =
1460            [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1461        let items = [
1462            Item::new(TestKeyWithFullMerge(1..2), 1),
1463            Item::new(TestKeyWithFullMerge(2..3), 2),
1464            Item::new(TestKeyWithFullMerge(1..2), 3),
1465            Item::new(TestKeyWithFullMerge(2..3), 4),
1466        ];
1467        skip_lists[0].insert(items[0].clone()).expect("insert error");
1468        skip_lists[1].insert(items[1].clone()).expect("insert error");
1469        skip_lists[2].insert(items[2].clone()).expect("insert error");
1470        skip_lists[2].insert(items[3].clone()).expect("insert error");
1471        let mut merger = Merger::new(
1472            layer_ref_iter(&skip_lists),
1473            |left, right| {
1474                // Sum matching keys.
1475                if left.key() == right.key() {
1476                    MergeResult::Other {
1477                        emit: None,
1478                        left: Discard,
1479                        right: Replace(
1480                            Item::new(left.key().clone(), left.value() + right.value()).boxed(),
1481                        ),
1482                    }
1483                } else {
1484                    MergeResult::EmitLeft
1485                }
1486            },
1487            counters(),
1488        );
1489        let mut iter = merger
1490            .query(Query::FullRange(&items[0].key.search_key().unwrap()))
1491            .await
1492            .expect("seek failed");
1493
1494        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1495        assert_eq!((key, *value), (&items[0].key, items[0].value + items[2].value));
1496        iter.advance().await.expect("advance");
1497        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1498        assert_eq!((key, *value), (&items[1].key, items[1].value + items[3].value));
1499        iter.advance().await.expect("advance");
1500        assert!(iter.get().is_none());
1501    }
1502
    // Test key wrapping a `Range<u64>`. Its `LayerKey` impl (below) only provides `search_key`,
    // `is_search_key` and `overlaps`; everything else on the trait is left at its default.
    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
        SerializeKey,
    )]
    struct TestKeyWithDefaultLayerKey(Range<u64>);

    // NOTE(review): presumably registers this type for every version from 1 onwards — confirm
    // against the `versioned_type!` macro definition.
    versioned_type! { 1.. => TestKeyWithDefaultLayerKey }
1519
1520    // Default layer key is using `MergeType::FullMerge` and returns None for `next_key()`.
1521    impl LayerKey for TestKeyWithDefaultLayerKey {
1522        fn search_key(&self) -> Option<Self> {
1523            Some(Self(0..self.0.start + 1))
1524        }
1525
1526        fn is_search_key(&self) -> bool {
1527            self.0.start == 0
1528        }
1529
1530        fn overlaps(&self, other: &Self) -> bool {
1531            self.0.start < other.0.end && self.0.end > other.0.start
1532        }
1533    }
1534
1535    impl SortByU64 for TestKeyWithDefaultLayerKey {
1536        fn get_leading_u64(&self) -> u64 {
1537            self.0.end
1538        }
1539    }
1540
1541    impl OrdUpperBound for TestKeyWithDefaultLayerKey {
1542        fn cmp_upper_bound(&self, other: &TestKeyWithDefaultLayerKey) -> std::cmp::Ordering {
1543            self.0.end.cmp(&other.0.end)
1544        }
1545    }
1546
1547    impl OrdLowerBound for TestKeyWithDefaultLayerKey {
1548        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1549            self.0.start.cmp(&other.0.start)
1550        }
1551    }
1552
1553    #[fuchsia::test]
1554    async fn test_no_merge_unbounded_include_all_layers() {
1555        test_advance(
1556            &[
1557                &[
1558                    (TestKeyWithDefaultLayerKey(1..2), 1),
1559                    (TestKeyWithDefaultLayerKey(2..3), 2),
1560                    (TestKeyWithDefaultLayerKey(4..5), 3),
1561                ],
1562                &[
1563                    (TestKeyWithDefaultLayerKey(1..2), 4),
1564                    (TestKeyWithDefaultLayerKey(2..3), 5),
1565                    (TestKeyWithDefaultLayerKey(3..4), 6),
1566                ],
1567            ],
1568            Query::FullScan,
1569            &[
1570                (TestKeyWithDefaultLayerKey(1..2), 1),
1571                (TestKeyWithDefaultLayerKey(1..2), 4),
1572                (TestKeyWithDefaultLayerKey(2..3), 2),
1573                (TestKeyWithDefaultLayerKey(2..3), 5),
1574                (TestKeyWithDefaultLayerKey(3..4), 6),
1575                (TestKeyWithDefaultLayerKey(4..5), 3),
1576            ],
1577        )
1578        .await;
1579    }
1580
1581    #[fuchsia::test]
1582    async fn test_no_merge_proceeds_comprehensively_after_seek() {
1583        test_advance(
1584            &[
1585                &[
1586                    (TestKeyWithDefaultLayerKey(1..2), 1),
1587                    (TestKeyWithDefaultLayerKey(2..3), 2),
1588                    (TestKeyWithDefaultLayerKey(4..5), 3),
1589                ],
1590                &[
1591                    (TestKeyWithDefaultLayerKey(1..2), 4),
1592                    (TestKeyWithDefaultLayerKey(2..3), 5),
1593                    (TestKeyWithDefaultLayerKey(3..4), 6),
1594                ],
1595            ],
1596            Query::FullRange(&TestKeyWithDefaultLayerKey(1..2).search_key().unwrap()),
1597            &[
1598                (TestKeyWithDefaultLayerKey(1..2), 1),
1599                (TestKeyWithDefaultLayerKey(1..2), 4),
1600                (TestKeyWithDefaultLayerKey(2..3), 2),
1601                (TestKeyWithDefaultLayerKey(2..3), 5),
1602                (TestKeyWithDefaultLayerKey(3..4), 6),
1603                (TestKeyWithDefaultLayerKey(4..5), 3),
1604            ],
1605        )
1606        .await;
1607    }
1608
1609    #[fuchsia::test]
1610    async fn test_no_merge_seek_finds_lower_layer() {
1611        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1612        let items = [
1613            Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1614            Item::new(TestKeyWithDefaultLayerKey(0..1), 2),
1615        ];
1616        skip_lists[0].insert(items[0].clone()).expect("insert error");
1617        skip_lists[1].insert(items[1].clone()).expect("insert error");
1618        let mut merger = Merger::new(
1619            layer_ref_iter(&skip_lists),
1620            |_left, _right| MergeResult::EmitLeft,
1621            counters(),
1622        );
1623        let iter = merger
1624            .query(Query::FullRange(&items[1].key.search_key().unwrap()))
1625            .await
1626            .expect("seek failed");
1627
1628        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1629        assert_eq!((key, value), (&items[1].key, &items[1].value));
1630    }
1631
1632    #[fuchsia::test]
1633    async fn test_no_merge_seek_stops_at_exact_match() {
1634        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1635        let items = [
1636            Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1637            Item::new(TestKeyWithDefaultLayerKey(1..4), 2),
1638        ];
1639        skip_lists[0].insert(items[0].clone()).expect("insert error");
1640        skip_lists[1].insert(items[1].clone()).expect("insert error");
1641        let mut merger = Merger::new(
1642            layer_ref_iter(&skip_lists),
1643            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1644            counters(),
1645        );
1646        let iter = merger
1647            .query(Query::FullRange(&items[0].key.search_key().unwrap()))
1648            .await
1649            .expect("seek failed");
1650
1651        // Seek should only search in the first skip list, so no merge should take place, and we'll
1652        // know if it has because we'll see a different value (2 rather than 1).
1653        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1654        assert_eq!((key, value), (&items[0].key, &items[0].value));
1655    }
1656
1657    #[fuchsia::test]
1658    async fn test_seek_less_than() {
1659        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1660        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1661        skip_lists[0].insert(items[0].clone()).expect("insert error");
1662        skip_lists[1].insert(items[1].clone()).expect("insert error");
1663        // Search for a key before 1..1.
1664        let mut merger = Merger::new(
1665            layer_ref_iter(&skip_lists),
1666            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1667            counters(),
1668        );
1669        let iter = merger
1670            .query(Query::FullRange(&TestKey(0..0).search_key().unwrap()))
1671            .await
1672            .expect("seek failed");
1673
1674        // This should find the 2..2 key because of our merge function.
1675        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1676        assert_eq!((key, value), (&items[1].key, &items[1].value));
1677    }
1678
1679    #[fuchsia::test]
1680    async fn test_seek_to_end() {
1681        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1682        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1683        skip_lists[0].insert(items[0].clone()).expect("insert error");
1684        skip_lists[1].insert(items[1].clone()).expect("insert error");
1685        let mut merger = Merger::new(
1686            layer_ref_iter(&skip_lists),
1687            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1688            counters(),
1689        );
1690        let iter = merger
1691            .query(Query::FullRange(&TestKey(3..3).search_key().unwrap()))
1692            .await
1693            .expect("seek failed");
1694
1695        assert!(iter.get().is_none());
1696    }
1697
1698    #[fuchsia::test]
1699    async fn test_merge_all_discarded() {
1700        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1701        let items = [Item::new(TestKey(1..2), 1), Item::new(TestKey(2..3), 2)];
1702        skip_lists[0].insert(items[1].clone()).expect("insert error");
1703        skip_lists[1].insert(items[0].clone()).expect("insert error");
1704        let mut merger = Merger::new(
1705            layer_ref_iter(&skip_lists),
1706            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Discard },
1707            counters(),
1708        );
1709        let iter = merger.query(Query::FullScan).await.expect("seek failed");
1710        assert!(iter.get().is_none());
1711    }
1712
1713    #[fuchsia::test]
1714    async fn test_overlapping_keys() {
1715        let skip_lists =
1716            [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1717        let items = [
1718            Item::new(TestKey(0..10), 1),
1719            Item::new(TestKey(0..20), 2),
1720            Item::new(TestKey(0..30), 3),
1721        ];
1722        skip_lists[0].insert(items[0].clone()).expect("insert error");
1723        skip_lists[1].insert(items[1].clone()).expect("insert error");
1724        skip_lists[2].insert(items[2].clone()).expect("insert error");
1725        let mut merger = Merger::new(
1726            layer_ref_iter(&skip_lists),
1727            |left, right| {
1728                if left.key().0.end <= right.key().0.start {
1729                    MergeResult::EmitLeft
1730                } else {
1731                    // Hardcoded rule for specific test data to test containment split.
1732                    if left.key() == &TestKey(0..30) && right.key() == &TestKey(10..20) {
1733                        MergeResult::Other {
1734                            emit: Some(Item::new(TestKey(0..10), 1).boxed()),
1735                            left: Replace(Item::new(TestKey(10..30), 1).boxed()),
1736                            right: Keep,
1737                        }
1738                    } else {
1739                        MergeResult::Other {
1740                            emit: None,
1741                            left: Keep,
1742                            right: Replace(
1743                                Item::new(TestKey(left.key().0.end..right.key().0.end), 1).boxed(),
1744                            ),
1745                        }
1746                    }
1747                }
1748            },
1749            counters(),
1750        );
1751        let mut iter = merger
1752            .query(Query::FullRange(&TestKey(0..1).search_key().unwrap()))
1753            .await
1754            .expect("seek failed");
1755        assert_eq!(iter.pending_iterators_len(), 2);
1756        let ItemRef { key, .. } = iter.get().expect("missing item");
1757        assert_eq!(key, &TestKey(0..10));
1758        iter.advance().await.expect("advance failed");
1759        assert_eq!(iter.pending_iterators_len(), 1);
1760        let ItemRef { key, .. } = iter.get().expect("missing item");
1761        assert_eq!(key, &TestKey(10..20));
1762        iter.advance().await.expect("advance failed");
1763        assert_eq!(iter.pending_iterators_len(), 0);
1764        let ItemRef { key, .. } = iter.get().expect("missing item");
1765        assert_eq!(key, &TestKey(20..30));
1766        iter.advance().await.expect("advance failed");
1767        assert_eq!(iter.pending_iterators_len(), 0);
1768        assert_eq!(iter.get(), None);
1769    }
1770
1771    async fn write_layer<K: Key, V: Value>(items: Vec<Item<K, V>>) -> Arc<dyn Layer<K, V>> {
1772        let object = Arc::new(FakeObject::new());
1773        let write_handle = FakeObjectHandle::new(object.clone());
1774        let mut writer =
1775            PersistentLayerWriter::<_, K, V>::new(Writer::new(&write_handle).await, 1, 512)
1776                .await
1777                .expect("PersistentLayerWriter::new failed");
1778        for item in items {
1779            writer.write(item.as_item_ref()).await.expect("write failed");
1780        }
1781        writer.flush().await.expect("flush failed");
1782        PersistentLayer::open(FakeObjectHandle::new(object))
1783            .await
1784            .expect("open_persistent_layer failed")
1785    }
1786
1787    fn merge_sum(
1788        left: &MergeLayerIterator<'_, i32, i32>,
1789        right: &MergeLayerIterator<'_, i32, i32>,
1790    ) -> MergeResult<i32, i32> {
1791        // Sum matching keys.
1792        if left.key() == right.key() {
1793            MergeResult::Other {
1794                emit: None,
1795                left: Discard,
1796                right: Replace(Item::new(left.key().clone(), left.value() + right.value()).boxed()),
1797            }
1798        } else {
1799            MergeResult::EmitLeft
1800        }
1801    }
1802
1803    #[fuchsia::test]
1804    async fn test_merge_bloom_filters_point_query() {
1805        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1806        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1807        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1808        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1809            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1810        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1811
1812        {
1813            // Key that exists in layer 0 only
1814            let iter = merger.query(Query::Point(&1)).await.expect("seek failed");
1815            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1816            assert_eq!((key, *value), (&items[0].key, items[0].value));
1817        }
1818        {
1819            // Key that exists in layer 1 and 2
1820            let iter = merger.query(Query::Point(&2)).await.expect("seek failed");
1821            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1822            assert_eq!((key, *value), (&items[1].key, items[1].value));
1823        }
1824        {
1825            // Key that exists in layer 1
1826            let iter = merger.query(Query::Point(&4)).await.expect("seek failed");
1827            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1828            assert_eq!((key, *value), (&items[2].key, items[2].value));
1829        }
1830        {
1831            // Key that doesn't exist at all
1832            let iter = merger.query(Query::Point(&400)).await.expect("seek failed");
1833            assert!(iter.get().is_none());
1834        }
1835    }
1836
1837    #[fuchsia::test]
1838    async fn test_merge_bloom_filters_limited_range() {
1839        // NB: This test uses ObjectKey so that we don't have to reimplement the complex merging
1840        // logic for range-like keys.
1841        let layer_0_items = vec![Item::new(
1842            ObjectKey::extent(0, 0, 0..2048),
1843            ObjectValue::extent(0, VOLUME_DATA_KEY_ID),
1844        )];
1845        let layer_1_items = vec![
1846            Item::new(
1847                ObjectKey::extent(0, 0, 1024..4096),
1848                ObjectValue::extent(32768, VOLUME_DATA_KEY_ID),
1849            ),
1850            Item::new(
1851                ObjectKey::extent(0, 0, 16384..17408),
1852                ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1853            ),
1854        ];
1855        let items = [
1856            Item::new(ObjectKey::extent(0, 0, 0..2048), ObjectValue::extent(0, VOLUME_DATA_KEY_ID)),
1857            Item::new(
1858                ObjectKey::extent(0, 0, 2048..4096),
1859                ObjectValue::extent(33792, VOLUME_DATA_KEY_ID),
1860            ),
1861            Item::new(
1862                ObjectKey::extent(0, 0, 16384..17408),
1863                ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1864            ),
1865        ];
1866        let layers: [Arc<dyn Layer<ObjectKey, ObjectValue>>; 2] =
1867            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1868        let mut merger =
1869            Merger::new(dyn_layer_ref_iter(&layers), object_store::merge::merge, counters());
1870
1871        {
1872            // Range contains just keys in layer 1
1873            let mut iter = merger
1874                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 16384..16386)))
1875                .await
1876                .expect("seek failed");
1877            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1878            assert_eq!(key, &items[2].key);
1879            assert_eq!(value, &items[2].value);
1880            iter.advance().await.expect("advance");
1881            assert!(iter.get().is_none());
1882        }
1883        {
1884            // Range contains keys in layer 0 and 1
1885            let mut iter = merger
1886                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 0..4096)))
1887                .await
1888                .expect("seek failed");
1889            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1890            assert_eq!(key, &items[0].key);
1891            assert_eq!(value, &items[0].value);
1892            iter.advance().await.expect("advance");
1893            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1894            assert_eq!(key, &items[1].key);
1895            assert_eq!(value, &items[1].value);
1896            iter.advance().await.expect("advance");
1897        }
1898        {
1899            // Range contains no keys
1900            let mut iter = merger
1901                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 8192..12288)))
1902                .await
1903                .expect("seek failed");
1904            let ItemRef { key, .. } = iter.get().expect("missing item");
1905            assert_eq!(key, &items[2].key);
1906            iter.advance().await.expect("advance");
1907            assert!(iter.get().is_none());
1908        }
1909    }
1910
1911    #[fuchsia::test]
1912    async fn test_merge_bloom_filters_full_range() {
1913        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1914        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1915        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1916        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1917            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1918        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1919
1920        let mut iter = merger.query(Query::FullRange(&0)).await.expect("seek failed");
1921        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1922        assert_eq!((key, *value), (&items[0].key, items[0].value));
1923        iter.advance().await.expect("advance");
1924        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1925        assert_eq!((key, *value), (&items[1].key, items[1].value));
1926        iter.advance().await.expect("advance");
1927        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1928        assert_eq!((key, *value), (&items[2].key, items[2].value));
1929        iter.advance().await.expect("advance");
1930        assert!(iter.get().is_none());
1931    }
1932
1933    #[fuchsia::test]
1934    async fn test_merge_bloom_filters_full_scan() {
1935        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1936        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1937        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1938        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1939            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1940        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1941
1942        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1943        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1944        assert_eq!((key, *value), (&items[0].key, items[0].value));
1945        iter.advance().await.expect("advance");
1946        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1947        assert_eq!((key, *value), (&items[1].key, items[1].value));
1948        iter.advance().await.expect("advance");
1949        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1950        assert_eq!((key, *value), (&items[2].key, items[2].value));
1951        iter.advance().await.expect("advance");
1952        assert!(iter.get().is_none());
1953    }
1954
1955    #[fuchsia::test]
1956    async fn test_optimized_merge_lazy_pull() {
1957        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1958        let items_top = [
1959            Item::new(TestKey(10..20), 1),
1960            Item::new(TestKey(20..30), 2),
1961            Item::new(TestKey(30..40), 3),
1962        ];
1963        let items_bottom = [Item::new(TestKey(40..50), 4)];
1964
1965        for item in &items_top {
1966            skip_lists[0].insert(item.clone()).expect("insert error");
1967        }
1968        for item in &items_bottom {
1969            skip_lists[1].insert(item.clone()).expect("insert error");
1970        }
1971
1972        let mut merger =
1973            Merger::new(layer_ref_iter(&skip_lists), |_, _| MergeResult::EmitLeft, counters());
1974        merger.set_trace(true);
1975
1976        let mut iter =
1977            merger.query(Query::LimitedRange(&TestKey(10..20))).await.expect("seek failed");
1978        assert_eq!(iter.pending_iterators_len(), 1);
1979
1980        let ItemRef { key, .. } = iter.get().expect("missing item");
1981        assert_eq!(key, &TestKey(10..20));
1982
1983        iter.advance().await.expect("advance failed");
1984        assert_eq!(iter.pending_iterators_len(), 1);
1985
1986        let ItemRef { key, .. } = iter.get().expect("missing item");
1987        assert_eq!(key, &TestKey(20..30));
1988
1989        iter.advance().await.expect("advance failed");
1990        assert_eq!(iter.pending_iterators_len(), 1);
1991
1992        let ItemRef { key, .. } = iter.get().expect("missing item");
1993        assert_eq!(key, &TestKey(30..40));
1994
1995        iter.advance().await.expect("advance failed");
1996        assert_eq!(iter.pending_iterators_len(), 0);
1997
1998        let ItemRef { key, .. } = iter.get().expect("missing item");
1999        assert_eq!(key, &TestKey(40..50));
2000
2001        iter.advance().await.expect("advance failed");
2002        assert!(iter.get().is_none());
2003    }
2004}