Skip to main content

fxfs/lsm_tree/
merge.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::log::*;
6use crate::lsm_tree;
7use crate::lsm_tree::types::{
8    BoxedItem, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut, LayerKey, MergeType,
9    OrdLowerBound, Value,
10};
11use anyhow::Error;
12use async_trait::async_trait;
13use futures::try_join;
14use std::cmp::Ordering;
15use std::collections::BinaryHeap;
16use std::fmt::{Debug, Write};
17use std::ops::{Bound, Deref, DerefMut};
18use std::sync::{Arc, atomic};
19
/// Tells the merger what to do with one of the two items that were handed to the merge function.
/// One `ItemOp` is supplied for the left item and one for the right item in
/// `MergeResult::Other`.
#[derive(Debug, Eq, PartialEq)]
pub enum ItemOp<K, V> {
    /// Keeps the item to be presented to the merger subsequently with a new merge pair.
    Keep,

    /// Discards the item and moves on to the next item in the respective layer.
    Discard,

    /// Replaces the item with something new which will be presented to the merger subsequently with
    /// a new pair.
    Replace(BoxedItem<K, V>),
}
32
/// The value returned by a merge function (see `MergeFn`) after it has inspected a left/right
/// pair of items.
#[derive(Debug, Eq, PartialEq)]
pub enum MergeResult<K, V> {
    /// Emits the left item unchanged. Keeps the right item. This is the common case. Once an item
    /// has been emitted, it will never be seen again by the merge function.
    EmitLeft,

    /// All other merge results are covered by the following. Take care when replacing items
    /// that you replace the correct item. The merger will never merge two items together from
    /// the same layer. Consider the following scenario:
    ///
    ///        +-----------+              +-----------+
    /// 0:     |    A      |              |    C      |
    ///        +-----------+--------------+-----------+
    /// 1:                 |      B       |
    ///                    +--------------+
    ///
    /// Let's say that all three items can be merged together. The merge function will first be
    /// presented with items A and B, at which point it has the option of replacing the left item
    /// (i.e. A, in layer 0) or the right item (i.e. B in layer 1). However, if you replace the left
    /// item, the merge function will not then be given the opportunity to merge it with C, so the
    /// correct thing to do in this case is to replace the right item B in layer 1, and discard the
    /// left item. A rule you can use is that you should avoid replacing an item with another item
    /// whose upper bound exceeds that of the item you are replacing.
    ///
    /// There are some combinations that might lead to infinite loops (e.g. `emit: None`,
    /// `left: Keep`, `right: Keep`, which makes no progress because the same pair is presented
    /// again) and should obviously be avoided.
    ///
    /// * `emit`: an optional item to yield as the next merged item.
    /// * `left`: what to do with the left item of the pair.
    /// * `right`: what to do with the right item of the pair.
    Other { emit: Option<BoxedItem<K, V>>, left: ItemOp<K, V>, right: ItemOp<K, V> },
}
61
/// Users must provide a merge function which will take pairs of items, left and right, and return a
/// merge result. The left item's key will either be less than the right item's key, or if they are
/// the same, then the left item will be in a lower layer index (lower layer indexes indicate more
/// recent entries). The last remaining item is always emitted.
///
/// Both arguments are `MergeLayerIterator` references; the merge function can inspect the items
/// through `key()`, `value()`, `sequence()` and the public `layer_index` field.
pub type MergeFn<K, V> =
    fn(&MergeLayerIterator<'_, K, V>, &MergeLayerIterator<'_, K, V>) -> MergeResult<K, V>;
68
/// Where the current item of a `MergeLayerIterator` comes from.
pub enum MergeItem<K, V> {
    /// There is no current item; asking the `MergeLayerIterator` for its item panics.
    None,
    /// An owned item held directly, e.g. one installed via a replacement from the merge function.
    Item(BoxedItem<K, V>),
    /// The current item is whatever the underlying layer iterator is pointing at.
    Iter,
}
74
// The underlying layer iterator held by a `MergeLayerIterator`.
enum RawIterator<'a, K, V> {
    // No iterator has been attached (the state set by `MergeLayerIterator::new` and
    // `MergeLayerIterator::new_with_item`).
    None,
    // A read-only layer iterator; installed by `MergerIterator::push_iterators` after seeking.
    Const(Box<dyn LayerIterator<K, V> + 'a>),
    // A mutable layer iterator; only constructed by `merge_into`.
    Mut(Box<dyn LayerIteratorMut<K, V> + 'a>),
}
80
// SAFETY: LayerIteratorMut is not Sync or Send, but it's only used by merge_into, which doesn't
// send across threads (and is sync, not async).
// NOTE(review): this impl is unconditional -- it also covers the `Const` variant and places no
// bounds on `K` or `V`.  Soundness presumably relies on `LayerIterator` implementations being
// `Send`; confirm against the trait definition.
unsafe impl<K, V> Send for RawIterator<'_, K, V> {}
84
// An iterator that keeps track of where we are for each of the layers. We push these onto a
// min-heap.
pub struct MergeLayerIterator<'a, K, V> {
    // The layer this iterator reads from.  `None` for iterators that are not backed by a layer
    // (those built by `new_with_item` and the one `merge_into` builds around a mutable iterator).
    layer: Option<&'a dyn Layer<K, V>>,

    // The underlying iterator.
    iter: RawIterator<'a, K, V>,

    // The index of the layer this is for.
    pub layer_index: u16,

    // The item we are currently pointing at.
    item: MergeItem<K, V>,
}
99
100impl<'a, K, V> MergeLayerIterator<'a, K, V> {
101    pub fn key(&self) -> &K {
102        self.item().key
103    }
104
105    pub fn value(&self) -> &V {
106        self.item().value
107    }
108
109    pub fn sequence(&self) -> u64 {
110        self.item().sequence
111    }
112
113    fn new(layer_index: u16, layer: &'a dyn Layer<K, V>) -> Self {
114        MergeLayerIterator {
115            layer: Some(layer),
116            iter: RawIterator::None,
117            layer_index,
118            item: MergeItem::None,
119        }
120    }
121
122    fn new_with_item(layer_index: u16, item: MergeItem<K, V>) -> Self {
123        MergeLayerIterator { layer: None, iter: RawIterator::None, layer_index, item }
124    }
125
126    fn item(&self) -> ItemRef<'_, K, V> {
127        match &self.item {
128            MergeItem::None => panic!("No item!"),
129            MergeItem::Item(item) => ItemRef::from(item),
130            MergeItem::Iter => self.get().unwrap(),
131        }
132    }
133
134    fn get(&self) -> Option<ItemRef<'_, K, V>> {
135        match &self.iter {
136            RawIterator::None => panic!("No iterator!"),
137            RawIterator::Const(iter) => iter.get(),
138            RawIterator::Mut(iter) => iter.get(),
139        }
140    }
141
142    fn set_item_from_iter(&mut self) {
143        self.item = if match &self.iter {
144            RawIterator::None => unreachable!(),
145            RawIterator::Const(iter) => iter.get(),
146            RawIterator::Mut(iter) => iter.get(),
147        }
148        .is_some()
149        {
150            MergeItem::Iter
151        } else {
152            MergeItem::None
153        };
154    }
155
156    fn take_item(&mut self) -> Option<BoxedItem<K, V>> {
157        if let MergeItem::Item(_) = self.item {
158            let mut item = MergeItem::None;
159            std::mem::swap(&mut self.item, &mut item);
160            if let MergeItem::Item(item) = item {
161                Some(item)
162            } else {
163                unreachable!();
164            }
165        } else {
166            None
167        }
168    }
169
170    async fn advance(&mut self) -> Result<(), Error> {
171        if let MergeItem::Iter = self.item {
172            if let RawIterator::Const(iter) = &mut self.iter {
173                iter.advance().await?;
174            } else {
175                // This will never get called in the RawIterator::Mut case.
176                unreachable!();
177            }
178        }
179        self.set_item_from_iter();
180        Ok(())
181    }
182
183    fn replace(&mut self, item: BoxedItem<K, V>) {
184        self.item = MergeItem::Item(item);
185    }
186
187    fn is_some(&self) -> bool {
188        !matches!(self.item, MergeItem::None)
189    }
190
191    // This function exists so that we can advance multiple iterators concurrently using, say,
192    // try_join!.
193    async fn maybe_advance(&mut self, op: &ItemOp<K, V>) -> Result<(), Error> {
194        if let ItemOp::Keep = op { Ok(()) } else { self.advance().await }
195    }
196}
197
198// -- Ord and friends --
199impl<K: OrdLowerBound, V> Ord for MergeLayerIterator<'_, K, V> {
200    fn cmp(&self, other: &Self) -> Ordering {
201        // Reverse ordering because we want min-heap not max-heap.
202        other.key().cmp_lower_bound(self.key()).then(other.layer_index.cmp(&self.layer_index))
203    }
204}
205impl<K: OrdLowerBound, V> PartialOrd for MergeLayerIterator<'_, K, V> {
206    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
207        Some(self.cmp(other))
208    }
209}
210impl<K: OrdLowerBound, V> PartialEq for MergeLayerIterator<'_, K, V> {
211    fn eq(&self, other: &Self) -> bool {
212        self.cmp(other) == Ordering::Equal
213    }
214}
// Marker impl: equality is defined via `cmp` (see the PartialEq impl for this type).
impl<K: OrdLowerBound, V> Eq for MergeLayerIterator<'_, K, V> {}
216
// As we merge items, the current item can be an item that has been replaced (and later emitted) by
// the merge function, or an item referenced by an iterator, or nothing.
enum CurrentItem<'a, 'b, K, V> {
    // Nothing to yield.
    None,
    // An owned item that the merge function asked to emit.
    Item(BoxedItem<K, V>),
    // The item is the current item of this layer iterator, which is held here rather than on the
    // heap while its item is being yielded.
    Iterator(&'a mut MergeLayerIterator<'b, K, V>),
}
224
225impl<'a, 'b, K, V> CurrentItem<'a, 'b, K, V> {
226    // Takes the iterator if one is present and replaces the current item with None; otherwise,
227    // leaves the current item untouched.
228    fn take_iterator(&mut self) -> Option<&'a mut MergeLayerIterator<'b, K, V>> {
229        if let CurrentItem::Iterator(_) = self {
230            let mut result = CurrentItem::None;
231            std::mem::swap(self, &mut result);
232            if let CurrentItem::Iterator(iter) = result {
233                Some(iter)
234            } else {
235                unreachable!();
236            }
237        } else {
238            None
239        }
240    }
241}
242
243impl<'a, K, V> From<&'a CurrentItem<'_, '_, K, V>> for Option<ItemRef<'a, K, V>> {
244    fn from(iter: &'a CurrentItem<'_, '_, K, V>) -> Option<ItemRef<'a, K, V>> {
245        match iter {
246            CurrentItem::None => None,
247            CurrentItem::Iterator(iterator) => Some(iterator.item()),
248            CurrentItem::Item(item) => Some(item.into()),
249        }
250    }
251}
252
/// Merger is the main entry point to merging.  It owns one `MergeLayerIterator` per layer and
/// hands out a `MergerIterator` from `query`.
pub struct Merger<'a, K, V> {
    // A buffer containing all the MergeLayerIterator objects.
    iterators: Vec<MergeLayerIterator<'a, K, V>>,

    // The function to be used for merging items.
    merge_fn: MergeFn<K, V>,

    // If true, additional logging is enabled.  Off by default; see `set_trace`.
    trace: bool,

    // Tracks statistics related to the LSM tree.
    counters: Arc<lsm_tree::TreeCounters>,
}
267
/// Query describes the goal of a search in the LSM tree.  The caller specifies this to guide the
/// merger on which layer files it must consult.
/// Layers might be skipped during a query for a few reasons:
///   * Existence filters might allow layer files to be skipped.
///   * For bounded queries (i.e. any except FullScan), if the search key has
///     MergeType::OptimizedMerge, we can use that to omit older layers once we find a match.
#[derive(Debug, Clone)]
pub enum Query<'a, K: Key + LayerKey + OrdLowerBound> {
    /// Point queries look for a specific key in the LSM tree.  In this case, the existence filters
    /// for each layer file can be used to decide if the layer file needs to be consulted.
    /// Note that it is an error to use `Point` for range-like keys (`Merger::query` checks this
    /// with a debug assertion).  Either `LimitedRange` or `FullRange` must be used instead.
    Point(&'a K),
    /// LimitedRange queries position the iterator to `K::search_key`, and scans forward until the
    /// first record which does not overlap the provided key.  In this case, the existence filters
    /// for each layer file can be used, but we have to check for all possible keys in the range we
    /// wish to search.  Obviously, that means that the range should be, well, limited.  Fuzzy
    /// hashes permit this for extent-like keys, but these queries are not the right choice for
    /// things like searching a directory.
    LimitedRange(&'a K),
    /// FullRange queries position the iterator to a starting key, and scan forward to the
    /// first key of a different type.  In this case, the existence filters are not used.
    FullRange(&'a K),
    /// FullScan queries are intended to yield every record in the tree.  In this case, the
    /// existence filters are not used.
    FullScan,
}
295
296impl<'a, K: Key + LayerKey + OrdLowerBound> Query<'a, K> {
297    fn needs_layer<V>(&self, layer: &dyn Layer<K, V>) -> bool {
298        match self {
299            Self::Point(key) => layer.maybe_contains_key(key),
300            Self::LimitedRange(key) => layer.maybe_contains_key(key),
301            Self::FullRange(_) => true,
302            Self::FullScan => true,
303        }
304    }
305}
306
307#[fxfs_trace::trace]
308impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> Merger<'a, K, V> {
309    pub(super) fn new<I: Iterator<Item = &'a dyn Layer<K, V>>>(
310        layers: I,
311        merge_fn: MergeFn<K, V>,
312        counters: Arc<lsm_tree::TreeCounters>,
313    ) -> Merger<'a, K, V> {
314        Merger {
315            iterators: layers
316                .enumerate()
317                .map(|(index, layer)| MergeLayerIterator::new(index as u16, layer))
318                .collect(),
319            merge_fn: merge_fn,
320            trace: false,
321            counters,
322        }
323    }
324
325    /// Executes `query`, positioning the iterator to the first matching item.  See `Query` for
326    /// details.
327    #[trace]
328    pub async fn query(
329        &mut self,
330        query: Query<'_, K>,
331    ) -> Result<MergerIterator<'_, 'a, K, V>, Error> {
332        if let Query::Point(key) = query {
333            // NB: It is almost certainly an error to provide a range-like key to a Point query,
334            // hence this debug assertion.  This will most likely result in the wrong result, since
335            // the starting bound would be the original range key and therefore we might miss
336            // records that start earlier but overlap with the key.
337            // For example, if there is a layer which contains 0..100 and 100..200, and we search
338            // for 50..150, we should get both extents, which requires a different starting bound
339            // than 50..150 (particularly it would require the `K::search_key`).
340            debug_assert!(!key.is_range_key())
341        };
342        let len = self.iterators.len();
343        let pending_iterators = {
344            fxfs_trace::duration!("Merger::filter_layer_files", "len" => len);
345            self.iterators
346                .iter_mut()
347                .rev()
348                .filter(|l| query.needs_layer(l.layer.clone().unwrap()))
349                .collect::<Vec<&mut MergeLayerIterator<'a, K, V>>>()
350        };
351        let layer_count = pending_iterators.len();
352        {
353            self.counters.num_seeks.fetch_add(1, atomic::Ordering::Relaxed);
354            self.counters.layer_files_total.fetch_add(len, atomic::Ordering::Relaxed);
355            self.counters
356                .layer_files_skipped
357                .fetch_add(len - layer_count, atomic::Ordering::Relaxed);
358        }
359        log::debug!(query:?; "Consulting {}/{} layers", layer_count, len);
360        let mut merger_iter = MergerIterator {
361            merge_fn: self.merge_fn,
362            pending_iterators,
363            heap: BinaryHeap::with_capacity(layer_count),
364            item: CurrentItem::None,
365            trace: self.trace,
366            history: String::new(),
367        };
368        let owned_bound;
369        let bound = match query {
370            Query::Point(key) => Bound::Included(key),
371            Query::LimitedRange(key) => {
372                owned_bound = Bound::Included(key.search_key());
373                owned_bound.as_ref()
374            }
375            Query::FullRange(start) => Bound::Included(start),
376            Query::FullScan => Bound::Unbounded,
377        };
378        merger_iter.seek(bound).await?;
379        Ok(merger_iter)
380    }
381
382    pub fn set_trace(&mut self, v: bool) {
383        self.trace = v;
384    }
385}
386
/// This is an iterator that will allow iteration over merged layers.  The primary interface is via
/// the LayerIterator trait.
pub struct MergerIterator<'a, 'b, K, V> {
    // The function used to merge the two lowest items on the heap.
    merge_fn: MergeFn<K, V>,

    // Iterators that we have not yet pushed onto the heap.  Iterators are taken from the back of
    // this vector as more layers need to be consulted.
    pending_iterators: Vec<&'a mut MergeLayerIterator<'b, K, V>>,

    // A heap with the merge iterators.  MergeLayerIterator's Ord is reversed, so the top of this
    // heap is the iterator with the lowest key (and, for ties, the lowest layer index).
    heap: BinaryHeap<&'a mut MergeLayerIterator<'b, K, V>>,

    // The current item.
    item: CurrentItem<'a, 'b, K, V>,

    // If true, logs regarding merger behaviour are appended to history.
    trace: bool,

    // Holds trace information if trace is true.
    history: String,
}
407
impl<'a, 'b, K: Key + LayerKey + OrdLowerBound, V: Value> MergerIterator<'a, 'b, K, V> {
    // Positions the merged iterator for `bound`: seeks as many pending layers as are required and
    // then merges until there is an item to yield.  Only `Unbounded` and `Included` bounds are
    // supported; an `Excluded` bound panics.
    async fn seek(&mut self, bound: Bound<&K>) -> Result<(), Error> {
        let next_key = match bound {
            Bound::Unbounded => None,
            Bound::Included(key) => Some(key),
            Bound::Excluded(_) => panic!("Excluded bounds not supported!"),
        };

        self.push_iterators(next_key, bound).await?;
        self.advance_impl(bound).await
    }

    // Merges items from an array of layers using the provided merge function. The merge function is
    // repeatedly provided the lowest and the second lowest element, if one exists. In cases where
    // the two lowest elements compare equal, the element with the lowest layer (i.e. whichever
    // comes first in the layers array) will come first.  `next_key_bound` is a bound for the next
    // key we expect.
    //
    // On return, `self.item` holds the item to yield, or `CurrentItem::None` if the heap ran dry.
    async fn advance_impl(&mut self, next_key_bound: Bound<&K>) -> Result<(), Error> {
        loop {
            // Inner loop: keep merging the two lowest items until something is ready to be
            // yielded (the `break`s) or nothing is left (the early return).
            loop {
                if self.heap.is_empty() {
                    self.item = CurrentItem::None;
                    return Ok(());
                }
                let lowest = self.heap.pop().unwrap();
                let maybe_second_lowest = self.heap.pop();
                if let Some(second_lowest) = maybe_second_lowest {
                    let result = (self.merge_fn)(&lowest, &second_lowest);
                    if self.trace {
                        writeln!(
                            self.history,
                            "merge {:?}, {:?} -> {:?}",
                            lowest.item(),
                            second_lowest.item(),
                            result
                        )
                        .unwrap();
                    }
                    match result {
                        MergeResult::EmitLeft => {
                            // The right item goes back on the heap untouched; the left iterator
                            // is held as the current item rather than on the heap.
                            self.heap.push(second_lowest);
                            self.item = CurrentItem::Iterator(lowest);
                            break;
                        }
                        MergeResult::Other { emit, left, right } => {
                            // Advance whichever iterators are not being kept (concurrently)
                            // before they are put back on the heap by update_item.
                            try_join!(
                                lowest.maybe_advance(&left),
                                second_lowest.maybe_advance(&right)
                            )?;
                            self.update_item(lowest, left);
                            self.update_item(second_lowest, right);
                            if let Some(emit) = emit {
                                self.item = CurrentItem::Item(emit);
                                break;
                            }
                            // Nothing to emit yet: go round again with the updated heap.
                        }
                    }
                } else {
                    // Only one item left, so there is nothing to merge it with.
                    self.item = CurrentItem::Iterator(lowest);
                    break;
                }
            }

            // If the item we're about to yield isn't within `next_key_bound`, ignore it and
            // continue.  To see how this would happen, imagine the following scenario:
            //
            //       0          10          20            30
            //       +----------+
            //   0   |          |
            //       +----------+-----------+
            //   1   |                      |
            //       +----------------------+-------------+
            //   2   |                                    |
            //       +------------------------------------+
            //
            // If we are to seek for 0..1 and then iterate over what we find, we expect the sequence
            // 0..10, 10..20, 20..30.  After the first seek, we should only consult iterator 0.  For
            // the first advance, we will consult iterator 1 but we need to merge the 0..20 element
            // with the 0..10 element which we already emitted.  To make this work, we push the
            // 0..10 item back on the heap (see advance) and then merge, but that will yield the
            // 0..10 entry again, so here we need to skip over it, and then merge again, at which
            // point we should see the 10..20 entry as expected.
            match next_key_bound {
                // The two guarded arms are the "skip this item" cases; they fall through to the
                // code below.  Everything else is within the bound and is yielded as is.
                Bound::Included(key)
                    if self.get().unwrap().key.cmp_upper_bound(key) == Ordering::Less => {}
                Bound::Excluded(key)
                    if self.get().unwrap().key.cmp_upper_bound(key) != Ordering::Greater => {}
                _ => return Ok(()),
            }
            // Skip the item: if it belongs to an iterator, move that iterator on and put it back
            // on the heap (unless it is exhausted).  An owned item is just overwritten by the
            // next pass of the loop.
            if let Some(iterator) = self.item.take_iterator() {
                iterator.advance().await?;
                if iterator.is_some() {
                    self.heap.push(iterator);
                }
            }
        }
    }

    // Returns whether more iterators are required for the given `next_key`.  See push_iterators.
    fn needs_more_iterators(&self, next_key: Option<&K>, next_key_bound: Bound<&K>) -> bool {
        // Nothing left to push.
        if self.pending_iterators.is_empty() {
            return false;
        }
        // Nothing to yield yet, so another layer must be consulted.
        if self.heap.is_empty() {
            return true;
        }
        // Only an Included bound can allow us to stop early; for anything else every remaining
        // layer is required.
        let target;
        match next_key_bound {
            Bound::Included(k) => target = k,
            Bound::Excluded(_) | Bound::Unbounded => return true,
        }
        match target.merge_type() {
            MergeType::FullMerge => true,
            MergeType::OptimizedMerge => {
                match next_key {
                    Some(k) => {
                        // More layers are needed only while the lowest key found so far starts
                        // after the key we are looking for.
                        self.heap.peek().unwrap().key().cmp_lower_bound(k) == Ordering::Greater
                    }
                    None => {
                        // If we've found an exact match, return it since nothing needs to merge.
                        self.heap.peek().unwrap().key().cmp_lower_bound(target) != Ordering::Equal
                    }
                }
            }
        }
    }

    // Pushes additional iterators onto the heap until we are confident that the top element will
    // yield what we are looking for.  If next_key is set, we will stop pushing iterators as soon as
    // a key is encountered that equals or precedes it.  If next_key is None, then all layers are
    // pushed.  next_key_bound is the bound to search for if another layer does need to be
    // consulted.  See the comment for the MergeType trait.
    async fn push_iterators(
        &mut self,
        next_key: Option<&K>,
        next_key_bound: Bound<&K>,
    ) -> Result<(), Error> {
        while self.needs_more_iterators(next_key, next_key_bound) {
            // needs_more_iterators only returns true when pending_iterators is non-empty, so the
            // pop cannot fail.
            let iter = self.pending_iterators.pop().unwrap();
            let sub_iter = iter.layer.as_ref().unwrap().seek(next_key_bound).await?;
            if self.trace {
                writeln!(
                    self.history,
                    "merger: search for {:?}, found {:?}",
                    next_key_bound,
                    sub_iter.get()
                )
                .unwrap();
            }
            iter.iter = RawIterator::Const(sub_iter);
            iter.set_item_from_iter();
            // Layers with nothing at or after the bound never make it onto the heap.
            if iter.is_some() {
                self.heap.push(iter);
            }
        }
        Ok(())
    }

    // Updates the merge iterator depending on |op|. If discarding, the iterator should have already
    // been advanced.
    fn update_item(&mut self, item: &'a mut MergeLayerIterator<'b, K, V>, op: ItemOp<K, V>) {
        match op {
            ItemOp::Keep => self.heap.push(item),
            ItemOp::Discard => {
                // The iterator should have already been advanced.
                if item.is_some() {
                    self.heap.push(item);
                }
            }
            ItemOp::Replace(replacement) => {
                item.replace(replacement);
                self.heap.push(item);
            }
        }
    }
}
584
#[async_trait]
impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> LayerIterator<K, V>
    for MergerIterator<'a, '_, K, V>
{
    // Moves to the next merged item.  While there are still layers that have not been consulted,
    // this works out the key expected next so that only as many additional layers as necessary
    // are brought in.
    async fn advance(&mut self) -> Result<(), Error> {
        // `current_key` is declared first so that it outlives `next_key_bound`, which may borrow
        // from it.
        let current_key;
        let mut next_key = None;
        let mut next_key_bound = Bound::Unbounded;
        // With no pending iterators there is nothing to skip, so the bound stays Unbounded.
        if !self.pending_iterators.is_empty() {
            if let Some(ItemRef { key, .. }) = self.get() {
                next_key = key.next_key();
                match next_key {
                    None => {
                        // If there is no next key, we must now query all layers and the key we
                        // search for is the immediate successor of the current key.
                        current_key = Some(key.clone());
                        next_key_bound = Bound::Excluded(current_key.as_ref().unwrap());
                    }
                    Some(ref key) => next_key_bound = Bound::Included(key),
                }
            }
        }

        // Advance the iterator for the current item and push it onto the heap, and also push any
        // additional iterators onto the heap (by calling push_iterators).
        if let Some(iterator) = self.item.take_iterator() {
            if self.needs_more_iterators(next_key.as_ref(), next_key_bound) {
                // Keep a copy of the item we are leaving in case we have to go back to it below.
                let existing_item = iterator.item().boxed();
                iterator.advance().await?;
                match &next_key {
                    Some(next_key)
                        if iterator.is_some()
                            && iterator.key().cmp_lower_bound(next_key) != Ordering::Greater =>
                    {
                        // In this case, the key immediately following is a good candidate, and all
                        // we need to do is merge it with existing iterators; we shouldn't need to
                        // consult with any more iterators.
                    }
                    _ => {
                        // We are going to need to consult more iterators so we need to go back to
                        // the previous item so that we can merge with it.  See the comment in
                        // advance_impl.
                        iterator.replace(existing_item);

                        // We must push other iterators here before pushing iterator onto the heap
                        // because we know `iterator` would end up at the top of the heap.
                        self.push_iterators(next_key.as_ref(), next_key_bound).await?;
                    }
                }
            } else {
                iterator.advance().await?;
            }
            // An exhausted iterator is dropped from the merge.
            if iterator.is_some() {
                self.heap.push(iterator);
            }
        } else {
            // The current item is not held by an iterator (it is an emitted item or there is no
            // item), so there is nothing to advance; just bring in more layers if required.
            self.push_iterators(next_key.as_ref(), next_key_bound).await?;
        }

        self.advance_impl(next_key_bound).await
    }

    // Returns the current merged item, or None if there is no current item.
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        (&self.item).into()
    }
}
651
// Wrapper around a MergeLayerIterator whose underlying iterator is a `RawIterator::Mut`; used by
// `merge_into` to get at the mutable-layer operations.
struct MutMergeLayerIterator<'a, K, V>(MergeLayerIterator<'a, K, V>);
653
654impl<K, V> MutMergeLayerIterator<'_, K, V> {
655    fn advance(&mut self) {
656        if let MergeItem::Iter = self.item {
657            self.as_mut().advance();
658        }
659        self.set_item_from_iter();
660    }
661}
662
663impl<'a, K, V> AsMut<dyn LayerIteratorMut<K, V> + 'a> for MutMergeLayerIterator<'a, K, V> {
664    fn as_mut(&mut self) -> &mut (dyn LayerIteratorMut<K, V> + 'a) {
665        let RawIterator::Mut(iter) = &mut self.0.iter else { unreachable!() };
666        iter.as_mut()
667    }
668}
669
670impl<'a, K, V> Deref for MutMergeLayerIterator<'a, K, V> {
671    type Target = MergeLayerIterator<'a, K, V>;
672
673    fn deref(&self) -> &Self::Target {
674        &self.0
675    }
676}
677
678impl<K, V> DerefMut for MutMergeLayerIterator<'_, K, V> {
679    fn deref_mut(&mut self) -> &mut Self::Target {
680        &mut self.0
681    }
682}
683
684// Merges the given item into a mutable layer.
685pub(super) fn merge_into<K: Debug + OrdLowerBound, V: Debug>(
686    mut_iter: Box<dyn LayerIteratorMut<K, V> + '_>,
687    item: Item<K, V>,
688    merge_fn: MergeFn<K, V>,
689) -> Result<(), Error> {
690    let merge_item = if mut_iter.get().is_some() { MergeItem::Iter } else { MergeItem::None };
691    let mut mut_merge_iter = MutMergeLayerIterator(MergeLayerIterator {
692        layer: None,
693        iter: RawIterator::Mut(mut_iter),
694        layer_index: 1,
695        item: merge_item,
696    });
697    let mut item_merge_iter = MergeLayerIterator::new_with_item(0, MergeItem::Item(item.boxed()));
698    while mut_merge_iter.is_some() && item_merge_iter.is_some() {
699        if mut_merge_iter.0 > item_merge_iter {
700            // In this branch the mutable layer is left and the item we're merging-in is right.
701            let merge_result = merge_fn(&mut_merge_iter, &item_merge_iter);
702            debug!(
703                lhs:? = mut_merge_iter.key(),
704                rhs:? = item_merge_iter.key(),
705                result:? = merge_result;
706                "(1) merge");
707            match merge_result {
708                MergeResult::EmitLeft => {
709                    if let Some(item) = mut_merge_iter.take_item() {
710                        mut_merge_iter.as_mut().insert(*item);
711                        mut_merge_iter.set_item_from_iter();
712                    } else {
713                        mut_merge_iter.advance();
714                    }
715                }
716                MergeResult::Other { emit, left, right } => {
717                    if let Some(emit) = emit {
718                        mut_merge_iter.as_mut().insert(*emit);
719                    }
720                    match left {
721                        ItemOp::Keep => {}
722                        ItemOp::Discard => {
723                            if matches!(mut_merge_iter.item, MergeItem::Iter) {
724                                mut_merge_iter.as_mut().erase();
725                            }
726                            mut_merge_iter.set_item_from_iter();
727                        }
728                        ItemOp::Replace(item) => {
729                            if let MergeItem::Iter = mut_merge_iter.item {
730                                mut_merge_iter.as_mut().erase();
731                            }
732                            mut_merge_iter.item = MergeItem::Item(item)
733                        }
734                    }
735                    match right {
736                        ItemOp::Keep => {}
737                        ItemOp::Discard => item_merge_iter.item = MergeItem::None,
738                        ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
739                    }
740                }
741            }
742        } else {
743            // In this branch, the item we're merging-in is left and the mutable layer is right.
744            let merge_result = merge_fn(&item_merge_iter, &mut_merge_iter);
745            debug!(
746                lhs:? = mut_merge_iter.key(),
747                rhs:? = item_merge_iter.key(),
748                result:? = merge_result;
749                "(2) merge");
750            match merge_result {
751                MergeResult::EmitLeft => break, // Item is inserted outside the loop
752                MergeResult::Other { emit, left, right } => {
753                    if let Some(emit) = emit {
754                        mut_merge_iter.as_mut().insert(*emit);
755                    }
756                    match left {
757                        ItemOp::Keep => {}
758                        ItemOp::Discard => item_merge_iter.item = MergeItem::None,
759                        ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
760                    }
761                    match right {
762                        ItemOp::Keep => {}
763                        ItemOp::Discard => {
764                            if matches!(mut_merge_iter.item, MergeItem::Iter) {
765                                mut_merge_iter.as_mut().erase();
766                            }
767                            mut_merge_iter.set_item_from_iter();
768                        }
769                        ItemOp::Replace(item) => {
770                            if let MergeItem::Iter = mut_merge_iter.item {
771                                mut_merge_iter.as_mut().erase();
772                            }
773                            mut_merge_iter.item = MergeItem::Item(item)
774                        }
775                    }
776                }
777            }
778        }
779    } // while ...
780
781    // The only way we could get here with both items is via the break above, so we know the correct
782    // order required here.
783    if let MergeItem::Item(item) = item_merge_iter.item {
784        mut_merge_iter.as_mut().insert(*item);
785    }
786    if let Some(item) = mut_merge_iter.take_item() {
787        mut_merge_iter.as_mut().insert(*item);
788    }
789    if let RawIterator::Mut(mut iter) = mut_merge_iter.0.iter {
790        iter.commit();
791    }
792    Ok(())
793}
794
795#[cfg(test)]
796mod tests {
797    use super::ItemOp::{Discard, Keep, Replace};
798    use super::{MergeLayerIterator, MergeResult, Merger};
799    use crate::lsm_tree::persistent_layer::{PersistentLayer, PersistentLayerWriter};
800    use crate::lsm_tree::skip_list_layer::SkipListLayer;
801    use crate::lsm_tree::types::{
802        FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeType,
803        OrdLowerBound, OrdUpperBound, SortByU64,
804    };
805    use crate::lsm_tree::{self, Query, Value};
806    use crate::object_store::{self, ObjectKey, ObjectValue, VOLUME_DATA_KEY_ID};
807    use crate::serialized_types::{
808        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
809    };
810    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
811    use crate::testing::writer::Writer;
812    use fprint::TypeFingerprint;
813    use fxfs_macros::FuzzyHash;
814    use rand::Rng;
815    use std::hash::Hash;
816    use std::ops::{Bound, Range};
817    use std::sync::Arc;
818
819    #[derive(
820        Clone,
821        Eq,
822        Hash,
823        FuzzyHash,
824        PartialEq,
825        Debug,
826        serde::Serialize,
827        serde::Deserialize,
828        TypeFingerprint,
829        Versioned,
830    )]
831    struct TestKey(Range<u64>);
832
833    impl Value for i32 {
834        const DELETED_MARKER: Self = 0;
835    }
836
837    versioned_type! { 1.. => TestKey }
838
839    impl SortByU64 for TestKey {
840        fn get_leading_u64(&self) -> u64 {
841            self.0.start
842        }
843    }
844
845    impl LayerKey for TestKey {
846        fn merge_type(&self) -> MergeType {
847            MergeType::OptimizedMerge
848        }
849
850        fn next_key(&self) -> Option<Self> {
851            Some(TestKey(self.0.end..self.0.end + 1))
852        }
853    }
854
855    impl OrdUpperBound for TestKey {
856        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
857            self.0.end.cmp(&other.0.end)
858        }
859    }
860
861    impl OrdLowerBound for TestKey {
862        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
863            self.0.start.cmp(&other.0.start)
864        }
865    }
866
867    fn layer_ref_iter<K: Key, V: Value>(
868        layers: &[Arc<SkipListLayer<K, V>>],
869    ) -> impl Iterator<Item = &dyn Layer<K, V>> {
870        layers.iter().map(|x| x.as_ref() as &dyn Layer<K, V>)
871    }
872
873    fn dyn_layer_ref_iter<K: Key, V: Value>(
874        layers: &[Arc<dyn Layer<K, V>>],
875    ) -> impl Iterator<Item = &dyn Layer<K, V>> {
876        layers.iter().map(|x| x.as_ref())
877    }
878
879    fn counters() -> Arc<lsm_tree::TreeCounters> {
880        Arc::new(lsm_tree::TreeCounters::default())
881    }
882
883    #[fuchsia::test]
884    async fn test_emit_left() {
885        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
886        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
887        skip_lists[0].insert(items[1].clone()).expect("insert error");
888        skip_lists[1].insert(items[0].clone()).expect("insert error");
889        let mut merger = Merger::new(
890            layer_ref_iter(&skip_lists),
891            |_left, _right| MergeResult::EmitLeft,
892            counters(),
893        );
894        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
895        let ItemRef { key, value, .. } = iter.get().expect("missing item");
896        assert_eq!((key, value), (&items[0].key, &items[0].value));
897        iter.advance().await.unwrap();
898        let ItemRef { key, value, .. } = iter.get().expect("missing item");
899        assert_eq!((key, value), (&items[1].key, &items[1].value));
900        iter.advance().await.unwrap();
901        assert!(iter.get().is_none());
902    }
903
904    #[fuchsia::test]
905    async fn test_other_emit() {
906        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
907        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
908        skip_lists[0].insert(items[1].clone()).expect("insert error");
909        skip_lists[1].insert(items[0].clone()).expect("insert error");
910        let mut merger = Merger::new(
911            layer_ref_iter(&skip_lists),
912            |_left, _right| MergeResult::Other {
913                emit: Some(Item::new(TestKey(3..3), 3).boxed()),
914                left: Discard,
915                right: Discard,
916            },
917            counters(),
918        );
919        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
920
921        let ItemRef { key, value, .. } = iter.get().expect("missing item");
922        assert_eq!((key, value), (&TestKey(3..3), &3));
923        iter.advance().await.unwrap();
924        assert!(iter.get().is_none());
925    }
926
927    #[fuchsia::test]
928    async fn test_replace_left() {
929        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
930        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
931        skip_lists[0].insert(items[1].clone()).expect("insert error");
932        skip_lists[1].insert(items[0].clone()).expect("insert error");
933        let mut merger = Merger::new(
934            layer_ref_iter(&skip_lists),
935            |_left, _right| MergeResult::Other {
936                emit: None,
937                left: Replace(Item::new(TestKey(3..3), 3).boxed()),
938                right: Discard,
939            },
940            counters(),
941        );
942        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
943
944        // The merger should replace the left item and then after discarding the right item, it
945        // should emit the replacement.
946        let ItemRef { key, value, .. } = iter.get().expect("missing item");
947        assert_eq!((key, value), (&TestKey(3..3), &3));
948        iter.advance().await.unwrap();
949        assert!(iter.get().is_none());
950    }
951
952    #[fuchsia::test]
953    async fn test_replace_right() {
954        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
955        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
956        skip_lists[0].insert(items[1].clone()).expect("insert error");
957        skip_lists[1].insert(items[0].clone()).expect("insert error");
958        let mut merger = Merger::new(
959            layer_ref_iter(&skip_lists),
960            |_left, _right| MergeResult::Other {
961                emit: None,
962                left: Discard,
963                right: Replace(Item::new(TestKey(3..3), 3).boxed()),
964            },
965            counters(),
966        );
967        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
968
969        // The merger should replace the right item and then after discarding the left item, it
970        // should emit the replacement.
971        let ItemRef { key, value, .. } = iter.get().expect("missing item");
972        assert_eq!((key, value), (&TestKey(3..3), &3));
973        iter.advance().await.unwrap();
974        assert!(iter.get().is_none());
975    }
976
977    #[fuchsia::test]
978    async fn test_left_less_than_right() {
979        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
980        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
981        skip_lists[0].insert(items[1].clone()).expect("insert error");
982        skip_lists[1].insert(items[0].clone()).expect("insert error");
983        let mut merger = Merger::new(
984            layer_ref_iter(&skip_lists),
985            |left, right| {
986                assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
987                assert_eq!((right.key(), right.value()), (&TestKey(2..2), &2));
988                MergeResult::EmitLeft
989            },
990            counters(),
991        );
992        merger.query(Query::FullScan).await.expect("seek failed");
993    }
994
995    #[fuchsia::test]
996    async fn test_left_equals_right() {
997        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
998        let item = Item::new(TestKey(1..1), 1);
999        skip_lists[0].insert(item.clone()).expect("insert error");
1000        skip_lists[1].insert(item.clone()).expect("insert error");
1001        let mut merger = Merger::new(
1002            layer_ref_iter(&skip_lists),
1003            |left, right| {
1004                assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
1005                assert_eq!((right.key(), right.value()), (&TestKey(1..1), &1));
1006                assert_eq!(left.layer_index, 0);
1007                assert_eq!(right.layer_index, 1);
1008                MergeResult::EmitLeft
1009            },
1010            counters(),
1011        );
1012        merger.query(Query::FullScan).await.expect("seek failed");
1013    }
1014
1015    #[fuchsia::test]
1016    async fn test_keep() {
1017        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1018        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1019        skip_lists[0].insert(items[1].clone()).expect("insert error");
1020        skip_lists[1].insert(items[0].clone()).expect("insert error");
1021        let mut merger = Merger::new(
1022            layer_ref_iter(&skip_lists),
1023            |left, right| {
1024                if left.key() == &TestKey(1..1) {
1025                    MergeResult::Other {
1026                        emit: None,
1027                        left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1028                        right: Keep,
1029                    }
1030                } else {
1031                    assert_eq!(left.key(), &TestKey(2..2));
1032                    assert_eq!(right.key(), &TestKey(3..3));
1033                    MergeResult::Other { emit: None, left: Discard, right: Keep }
1034                }
1035            },
1036            counters(),
1037        );
1038        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1039
1040        // The merger should first replace left and then it should call the merger again with 2 & 3
1041        // and end up just keeping 3.
1042        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1043        assert_eq!((key, value), (&TestKey(3..3), &3));
1044        iter.advance().await.unwrap();
1045        assert!(iter.get().is_none());
1046    }
1047
1048    #[fuchsia::test]
1049    async fn test_merge_10_layers() {
1050        let skip_lists: Vec<_> = (0..10).map(|_| SkipListLayer::new(100)).collect();
1051        let mut rng = rand::rng();
1052        for i in 0..100 {
1053            skip_lists[rng.random_range(0..10) as usize]
1054                .insert(Item::new(TestKey(i..i), i))
1055                .expect("insert error");
1056        }
1057        let mut merger = Merger::new(
1058            layer_ref_iter(&skip_lists),
1059            |_left, _right| MergeResult::EmitLeft,
1060            counters(),
1061        );
1062        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1063
1064        for i in 0..100 {
1065            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1066            assert_eq!((key, value), (&TestKey(i..i), &i));
1067            iter.advance().await.unwrap();
1068        }
1069        assert!(iter.get().is_none());
1070    }
1071
1072    #[fuchsia::test]
1073    async fn test_merge_uses_cmp_lower_bound() {
1074        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1075        let items = [Item::new(TestKey(1..10), 1), Item::new(TestKey(2..3), 2)];
1076        skip_lists[0].insert(items[1].clone()).expect("insert error");
1077        skip_lists[1].insert(items[0].clone()).expect("insert error");
1078        let mut merger = Merger::new(
1079            layer_ref_iter(&skip_lists),
1080            |_left, _right| MergeResult::EmitLeft,
1081            counters(),
1082        );
1083        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1084
1085        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1086        assert_eq!((key, value), (&items[0].key, &items[0].value));
1087        iter.advance().await.unwrap();
1088        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1089        assert_eq!((key, value), (&items[1].key, &items[1].value));
1090        iter.advance().await.unwrap();
1091        assert!(iter.get().is_none());
1092    }
1093
1094    #[fuchsia::test]
1095    async fn test_merge_into_emit_left() {
1096        let skip_list = SkipListLayer::new(100);
1097        let items =
1098            [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2), Item::new(TestKey(3..3), 3)];
1099        skip_list.insert(items[0].clone()).expect("insert error");
1100        skip_list.insert(items[2].clone()).expect("insert error");
1101        skip_list
1102            .merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::EmitLeft);
1103
1104        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1105        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1106        assert_eq!((key, value), (&items[0].key, &items[0].value));
1107        iter.advance().await.unwrap();
1108        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1109        assert_eq!((key, value), (&items[1].key, &items[1].value));
1110        iter.advance().await.unwrap();
1111        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1112        assert_eq!((key, value), (&items[2].key, &items[2].value));
1113        iter.advance().await.unwrap();
1114        assert!(iter.get().is_none());
1115    }
1116
1117    #[fuchsia::test]
1118    async fn test_merge_into_emit_last_after_replacing() {
1119        let skip_list = SkipListLayer::new(100);
1120        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1121        skip_list.insert(items[0].clone()).expect("insert error");
1122
1123        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1124            if left.key() == &TestKey(1..1) {
1125                assert_eq!(right.key(), &TestKey(2..2));
1126                MergeResult::Other {
1127                    emit: None,
1128                    left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1129                    right: Keep,
1130                }
1131            } else {
1132                assert_eq!(left.key(), &TestKey(2..2));
1133                assert_eq!(right.key(), &TestKey(3..3));
1134                MergeResult::EmitLeft
1135            }
1136        });
1137
1138        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1139        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1140        assert_eq!((key, value), (&items[1].key, &items[1].value));
1141        iter.advance().await.unwrap();
1142        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1143        assert_eq!((key, value), (&TestKey(3..3), &3));
1144        iter.advance().await.unwrap();
1145        assert!(iter.get().is_none());
1146    }
1147
1148    #[fuchsia::test]
1149    async fn test_merge_into_emit_left_after_replacing() {
1150        let skip_list = SkipListLayer::new(100);
1151        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1152        skip_list.insert(items[0].clone()).expect("insert error");
1153
1154        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1155            if left.key() == &TestKey(1..1) {
1156                assert_eq!(right.key(), &TestKey(3..3));
1157                MergeResult::Other {
1158                    emit: None,
1159                    left: Replace(Item::new(TestKey(2..2), 2).boxed()),
1160                    right: Keep,
1161                }
1162            } else {
1163                assert_eq!(left.key(), &TestKey(2..2));
1164                assert_eq!(right.key(), &TestKey(3..3));
1165                MergeResult::EmitLeft
1166            }
1167        });
1168
1169        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1170        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1171        assert_eq!((key, value), (&TestKey(2..2), &2));
1172        iter.advance().await.unwrap();
1173        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1174        assert_eq!((key, value), (&items[1].key, &items[1].value));
1175        iter.advance().await.unwrap();
1176        assert!(iter.get().is_none());
1177    }
1178
1179    // This tests emitting in both branches of merge_into, and most of the discard paths.
1180    #[fuchsia::test]
1181    async fn test_merge_into_emit_other_and_discard() {
1182        let skip_list = SkipListLayer::new(100);
1183        let items =
1184            [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 3)];
1185        skip_list.insert(items[0].clone()).expect("insert error");
1186        skip_list.insert(items[2].clone()).expect("insert error");
1187
1188        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1189            if left.key() == &TestKey(1..1) {
1190                // This tests the top branch in merge_into.
1191                assert_eq!(right.key(), &TestKey(3..3));
1192                MergeResult::Other {
1193                    emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1194                    left: Discard,
1195                    right: Keep,
1196                }
1197            } else {
1198                // This tests the bottom branch in merge_into.
1199                assert_eq!(left.key(), &TestKey(3..3));
1200                assert_eq!(right.key(), &TestKey(5..5));
1201                MergeResult::Other {
1202                    emit: Some(Item::new(TestKey(4..4), 4).boxed()),
1203                    left: Discard,
1204                    right: Discard,
1205                }
1206            }
1207        });
1208
1209        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1210        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1211        assert_eq!((key, value), (&TestKey(2..2), &2));
1212        iter.advance().await.unwrap();
1213        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1214        assert_eq!((key, value), (&TestKey(4..4), &4));
1215        iter.advance().await.unwrap();
1216        assert!(iter.get().is_none());
1217    }
1218
1219    // This tests replacing the item and discarding the right item (the one remaining untested
1220    // discard path) in the top branch in merge_into.
1221    #[fuchsia::test]
1222    async fn test_merge_into_replace_and_discard() {
1223        let skip_list = SkipListLayer::new(100);
1224        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1225        skip_list.insert(items[0].clone()).expect("insert error");
1226
1227        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1228            emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1229            left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1230            right: Discard,
1231        });
1232
1233        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1234        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1235        assert_eq!((key, value), (&TestKey(2..2), &2));
1236        iter.advance().await.unwrap();
1237        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1238        assert_eq!((key, value), (&TestKey(4..4), &4));
1239        iter.advance().await.unwrap();
1240        assert!(iter.get().is_none());
1241    }
1242
1243    // This tests replacing the right item in the top branch of merge_into and the left item in the
1244    // bottom branch of merge_into.
1245    #[fuchsia::test]
1246    async fn test_merge_into_replace_merge_item() {
1247        let skip_list = SkipListLayer::new(100);
1248        let items =
1249            [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 5)];
1250        skip_list.insert(items[0].clone()).expect("insert error");
1251        skip_list.insert(items[2].clone()).expect("insert error");
1252
1253        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, right| {
1254            if right.key() == &TestKey(3..3) {
1255                MergeResult::Other {
1256                    emit: None,
1257                    left: Discard,
1258                    right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1259                }
1260            } else {
1261                assert_eq!(right.key(), &TestKey(5..5));
1262                MergeResult::Other {
1263                    emit: None,
1264                    left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1265                    right: Discard,
1266                }
1267            }
1268        });
1269
1270        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1271        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1272        assert_eq!((key, value), (&TestKey(4..4), &4));
1273        iter.advance().await.unwrap();
1274        assert!(iter.get().is_none());
1275    }
1276
1277    // This tests replacing the right item in the bottom branch of merge_into.
1278    #[fuchsia::test]
1279    async fn test_merge_into_replace_existing() {
1280        let skip_list = SkipListLayer::new(100);
1281        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1282        skip_list.insert(items[1].clone()).expect("insert error");
1283
1284        skip_list.merge_into(items[0].clone(), &items[0].key, |_left, right| {
1285            if right.key() == &TestKey(3..3) {
1286                MergeResult::Other {
1287                    emit: None,
1288                    left: Keep,
1289                    right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1290                }
1291            } else {
1292                MergeResult::EmitLeft
1293            }
1294        });
1295
1296        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1297        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1298        assert_eq!((key, value), (&items[0].key, &items[0].value));
1299        iter.advance().await.unwrap();
1300        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1301        assert_eq!((key, value), (&TestKey(2..2), &2));
1302        iter.advance().await.unwrap();
1303        assert!(iter.get().is_none());
1304    }
1305
1306    #[fuchsia::test]
1307    async fn test_merge_into_discard_last() {
1308        let skip_list = SkipListLayer::new(100);
1309        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1310        skip_list.insert(items[0].clone()).expect("insert error");
1311
1312        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1313            emit: None,
1314            left: Discard,
1315            right: Keep,
1316        });
1317
1318        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1319        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1320        assert_eq!((key, value), (&items[1].key, &items[1].value));
1321        iter.advance().await.unwrap();
1322        assert!(iter.get().is_none());
1323    }
1324
1325    #[fuchsia::test]
1326    async fn test_merge_into_empty() {
1327        let skip_list = SkipListLayer::new(100);
1328        let items = [Item::new(TestKey(1..1), 1)];
1329
1330        skip_list.merge_into(items[0].clone(), &items[0].key, |_left, _right| {
1331            panic!("Unexpected merge!");
1332        });
1333
1334        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1335        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1336        assert_eq!((key, value), (&items[0].key, &items[0].value));
1337        iter.advance().await.unwrap();
1338        assert!(iter.get().is_none());
1339    }
1340
1341    #[fuchsia::test]
1342    async fn test_seek_uses_minimum_number_of_iterators() {
1343        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1344        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(1..1), 2)];
1345        skip_lists[0].insert(items[0].clone()).expect("insert error");
1346        skip_lists[1].insert(items[1].clone()).expect("insert error");
1347        let mut merger = Merger::new(
1348            layer_ref_iter(&skip_lists),
1349            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1350            counters(),
1351        );
1352        let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1353
1354        // Seek should only search in the first skip list, so no merge should take place, and we'll
1355        // know if it has because we'll see a different value (2 rather than 1).
1356        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1357        assert_eq!((key, value), (&items[0].key, &items[0].value));
1358    }
1359
1360    // Checks that merging the given layers produces |expected| sequence of items starting from
1361    // |start|.
1362    async fn test_advance<K: Eq + Key + LayerKey + OrdLowerBound>(
1363        layers: &[&[(K, i32)]],
1364        query: Query<'_, K>,
1365        expected: &[(K, i32)],
1366    ) {
1367        let mut skip_lists = Vec::new();
1368        for &layer in layers {
1369            let skip_list = SkipListLayer::new(100);
1370            for (k, v) in layer {
1371                skip_list.insert(Item::new(k.clone(), *v)).expect("insert error");
1372            }
1373            skip_lists.push(skip_list);
1374        }
1375        let mut merger = Merger::new(
1376            layer_ref_iter(&skip_lists),
1377            |_left, _right| MergeResult::EmitLeft,
1378            counters(),
1379        );
1380        let mut iter = merger.query(query).await.expect("seek failed");
1381        for (k, v) in expected {
1382            let ItemRef { key, value, .. } = iter.get().expect("get failed");
1383            assert_eq!((key, value), (k, v));
1384            iter.advance().await.expect("advance failed");
1385        }
1386        assert!(iter.get().is_none());
1387    }
1388
1389    #[fuchsia::test]
1390    async fn test_seek_skips_replaced_items() {
1391        // The 1..2 and the 2..3 items are overwritten and merging them should be skipped.
1392        test_advance(
1393            &[
1394                &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(4..5), 3)],
1395                &[(TestKey(1..2), 4), (TestKey(2..3), 5), (TestKey(3..4), 6)],
1396            ],
1397            Query::FullRange(&TestKey(1..2)),
1398            &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(3..4), 6), (TestKey(4..5), 3)],
1399        )
1400        .await;
1401    }
1402
1403    #[fuchsia::test]
1404    async fn test_advance_skips_replaced_items_at_end() {
1405        // Like the last test, the 1..2 item is overwritten and seeking for it should skip the merge
1406        // but this time, the keys are at the end.
1407        test_advance(
1408            &[&[(TestKey(1..2), 1)], &[(TestKey(1..2), 2)]],
1409            Query::FullRange(&TestKey(1..2)),
1410            &[(TestKey(1..2), 1)],
1411        )
1412        .await;
1413    }
1414
1415    #[derive(
1416        Clone,
1417        Eq,
1418        Hash,
1419        FuzzyHash,
1420        PartialEq,
1421        Debug,
1422        serde::Serialize,
1423        serde::Deserialize,
1424        TypeFingerprint,
1425        Versioned,
1426    )]
1427    struct TestKeyWithFullMerge(Range<u64>);
1428
1429    versioned_type! { 1.. => TestKeyWithFullMerge }
1430
1431    impl LayerKey for TestKeyWithFullMerge {
1432        fn merge_type(&self) -> MergeType {
1433            MergeType::FullMerge
1434        }
1435    }
1436
1437    impl SortByU64 for TestKeyWithFullMerge {
1438        fn get_leading_u64(&self) -> u64 {
1439            self.0.start
1440        }
1441    }
1442
1443    impl OrdUpperBound for TestKeyWithFullMerge {
1444        fn cmp_upper_bound(&self, other: &TestKeyWithFullMerge) -> std::cmp::Ordering {
1445            self.0.end.cmp(&other.0.end)
1446        }
1447    }
1448
1449    impl OrdLowerBound for TestKeyWithFullMerge {
1450        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1451            self.0.start.cmp(&other.0.start)
1452        }
1453    }
1454
1455    // Same set up as `test_seek_skips_replaced_items` except here nothing should skip. Any seek
1456    // should just place the iterator at a particular spot in the merged layers.
1457    #[fuchsia::test]
1458    async fn test_full_merge_consistent_advance_ordering() {
1459        let layer_set = [
1460            [
1461                (TestKeyWithFullMerge(1..2), 1i32),
1462                (TestKeyWithFullMerge(2..3), 2i32),
1463                (TestKeyWithFullMerge(4..5), 3i32),
1464            ]
1465            .as_slice(),
1466            [
1467                (TestKeyWithFullMerge(1..2), 4i32),
1468                (TestKeyWithFullMerge(2..3), 5i32),
1469                (TestKeyWithFullMerge(3..4), 6i32),
1470            ]
1471            .as_slice(),
1472        ];
1473
1474        let full_merge_result = [
1475            (TestKeyWithFullMerge(1..2), 1),
1476            (TestKeyWithFullMerge(1..2), 4),
1477            (TestKeyWithFullMerge(2..3), 2),
1478            (TestKeyWithFullMerge(2..3), 5),
1479            (TestKeyWithFullMerge(3..4), 6),
1480            (TestKeyWithFullMerge(4..5), 3),
1481        ];
1482
1483        test_advance(layer_set.as_slice(), Query::FullScan, &full_merge_result).await;
1484
1485        test_advance(
1486            layer_set.as_slice(),
1487            Query::FullRange(&TestKeyWithFullMerge(1..2)),
1488            &full_merge_result,
1489        )
1490        .await;
1491
1492        test_advance(
1493            layer_set.as_slice(),
1494            Query::FullRange(&TestKeyWithFullMerge(2..3)),
1495            &full_merge_result[2..],
1496        )
1497        .await;
1498
1499        test_advance(
1500            layer_set.as_slice(),
1501            Query::FullRange(&TestKeyWithFullMerge(3..4)),
1502            &full_merge_result[4..],
1503        )
1504        .await;
1505    }
1506
1507    #[fuchsia::test]
1508    async fn test_full_merge_always_consult_all_layers() {
1509        // | 1 |   |
1510        // |   | 2 |
1511        // | 3 | 4 |
1512        let skip_lists =
1513            [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1514        let items = [
1515            Item::new(TestKeyWithFullMerge(1..2), 1),
1516            Item::new(TestKeyWithFullMerge(2..3), 2),
1517            Item::new(TestKeyWithFullMerge(1..2), 3),
1518            Item::new(TestKeyWithFullMerge(2..3), 4),
1519        ];
1520        skip_lists[0].insert(items[0].clone()).expect("insert error");
1521        skip_lists[1].insert(items[1].clone()).expect("insert error");
1522        skip_lists[2].insert(items[2].clone()).expect("insert error");
1523        skip_lists[2].insert(items[3].clone()).expect("insert error");
1524        let mut merger = Merger::new(
1525            layer_ref_iter(&skip_lists),
1526            |left, right| {
1527                // Sum matching keys.
1528                if left.key() == right.key() {
1529                    MergeResult::Other {
1530                        emit: None,
1531                        left: Discard,
1532                        right: Replace(
1533                            Item::new(left.key().clone(), left.value() + right.value()).boxed(),
1534                        ),
1535                    }
1536                } else {
1537                    MergeResult::EmitLeft
1538                }
1539            },
1540            counters(),
1541        );
1542        let mut iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1543
1544        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1545        assert_eq!((key, *value), (&items[0].key, items[0].value + items[2].value));
1546        iter.advance().await.expect("advance");
1547        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1548        assert_eq!((key, *value), (&items[1].key, items[1].value + items[3].value));
1549        iter.advance().await.expect("advance");
1550        assert!(iter.get().is_none());
1551    }
1552
    /// Test key wrapping a `Range<u64>` whose `LayerKey` implementation is left entirely to the
    /// trait's defaults.
    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKeyWithDefaultLayerKey(Range<u64>);

    // NOTE(review): presumably declares `TestKeyWithDefaultLayerKey` as the representation used
    // for every version from 1 onwards; confirm against the `versioned_type!` macro definition.
    versioned_type! { 1.. => TestKeyWithDefaultLayerKey }
1568
    // Empty impl: every `LayerKey` method falls back to the trait's default. According to the
    // original note, the defaults use `MergeType::FullMerge` and return `None` from `next_key()`.
    // NOTE(review): the trait definition is not visible from here; confirm those defaults.
    impl LayerKey for TestKeyWithDefaultLayerKey {}
1571
1572    impl SortByU64 for TestKeyWithDefaultLayerKey {
1573        fn get_leading_u64(&self) -> u64 {
1574            self.0.start
1575        }
1576    }
1577
1578    impl OrdUpperBound for TestKeyWithDefaultLayerKey {
1579        fn cmp_upper_bound(&self, other: &TestKeyWithDefaultLayerKey) -> std::cmp::Ordering {
1580            self.0.end.cmp(&other.0.end)
1581        }
1582    }
1583
1584    impl OrdLowerBound for TestKeyWithDefaultLayerKey {
1585        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1586            self.0.start.cmp(&other.0.start)
1587        }
1588    }
1589
1590    #[fuchsia::test]
1591    async fn test_no_merge_unbounded_include_all_layers() {
1592        test_advance(
1593            &[
1594                &[
1595                    (TestKeyWithDefaultLayerKey(1..2), 1),
1596                    (TestKeyWithDefaultLayerKey(2..3), 2),
1597                    (TestKeyWithDefaultLayerKey(4..5), 3),
1598                ],
1599                &[
1600                    (TestKeyWithDefaultLayerKey(1..2), 4),
1601                    (TestKeyWithDefaultLayerKey(2..3), 5),
1602                    (TestKeyWithDefaultLayerKey(3..4), 6),
1603                ],
1604            ],
1605            Query::FullScan,
1606            &[
1607                (TestKeyWithDefaultLayerKey(1..2), 1),
1608                (TestKeyWithDefaultLayerKey(1..2), 4),
1609                (TestKeyWithDefaultLayerKey(2..3), 2),
1610                (TestKeyWithDefaultLayerKey(2..3), 5),
1611                (TestKeyWithDefaultLayerKey(3..4), 6),
1612                (TestKeyWithDefaultLayerKey(4..5), 3),
1613            ],
1614        )
1615        .await;
1616    }
1617
1618    #[fuchsia::test]
1619    async fn test_no_merge_proceeds_comprehensively_after_seek() {
1620        test_advance(
1621            &[
1622                &[
1623                    (TestKeyWithDefaultLayerKey(1..2), 1),
1624                    (TestKeyWithDefaultLayerKey(2..3), 2),
1625                    (TestKeyWithDefaultLayerKey(4..5), 3),
1626                ],
1627                &[
1628                    (TestKeyWithDefaultLayerKey(1..2), 4),
1629                    (TestKeyWithDefaultLayerKey(2..3), 5),
1630                    (TestKeyWithDefaultLayerKey(3..4), 6),
1631                ],
1632            ],
1633            Query::FullRange(&TestKeyWithDefaultLayerKey(1..2)),
1634            &[
1635                (TestKeyWithDefaultLayerKey(1..2), 1),
1636                (TestKeyWithDefaultLayerKey(1..2), 4),
1637                (TestKeyWithDefaultLayerKey(2..3), 2),
1638                (TestKeyWithDefaultLayerKey(2..3), 5),
1639                (TestKeyWithDefaultLayerKey(3..4), 6),
1640                (TestKeyWithDefaultLayerKey(4..5), 3),
1641            ],
1642        )
1643        .await;
1644    }
1645
1646    #[fuchsia::test]
1647    async fn test_no_merge_seek_finds_lower_layer() {
1648        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1649        let items = [
1650            Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1651            Item::new(TestKeyWithDefaultLayerKey(1..1), 2),
1652        ];
1653        skip_lists[0].insert(items[0].clone()).expect("insert error");
1654        skip_lists[1].insert(items[1].clone()).expect("insert error");
1655        let mut merger = Merger::new(
1656            layer_ref_iter(&skip_lists),
1657            |_left, _right| MergeResult::EmitLeft,
1658            counters(),
1659        );
1660        let iter = merger.query(Query::FullRange(&items[1].key)).await.expect("seek failed");
1661
1662        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1663        assert_eq!((key, value), (&items[1].key, &items[1].value));
1664    }
1665
1666    #[fuchsia::test]
1667    async fn test_no_merge_seek_stops_at_exact_match() {
1668        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1669        let items = [
1670            Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1671            Item::new(TestKeyWithDefaultLayerKey(1..4), 2),
1672        ];
1673        skip_lists[0].insert(items[0].clone()).expect("insert error");
1674        skip_lists[1].insert(items[1].clone()).expect("insert error");
1675        let mut merger = Merger::new(
1676            layer_ref_iter(&skip_lists),
1677            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1678            counters(),
1679        );
1680        let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1681
1682        // Seek should only search in the first skip list, so no merge should take place, and we'll
1683        // know if it has because we'll see a different value (2 rather than 1).
1684        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1685        assert_eq!((key, value), (&items[0].key, &items[0].value));
1686    }
1687
1688    #[fuchsia::test]
1689    async fn test_seek_less_than() {
1690        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1691        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1692        skip_lists[0].insert(items[0].clone()).expect("insert error");
1693        skip_lists[1].insert(items[1].clone()).expect("insert error");
1694        // Search for a key before 1..1.
1695        let mut merger = Merger::new(
1696            layer_ref_iter(&skip_lists),
1697            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1698            counters(),
1699        );
1700        let iter = merger.query(Query::FullRange(&TestKey(0..0))).await.expect("seek failed");
1701
1702        // This should find the 2..2 key because of our merge function.
1703        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1704        assert_eq!((key, value), (&items[1].key, &items[1].value));
1705    }
1706
1707    #[fuchsia::test]
1708    async fn test_seek_to_end() {
1709        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1710        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1711        skip_lists[0].insert(items[0].clone()).expect("insert error");
1712        skip_lists[1].insert(items[1].clone()).expect("insert error");
1713        let mut merger = Merger::new(
1714            layer_ref_iter(&skip_lists),
1715            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1716            counters(),
1717        );
1718        let iter = merger.query(Query::FullRange(&TestKey(3..3))).await.expect("seek failed");
1719
1720        assert!(iter.get().is_none());
1721    }
1722
1723    #[fuchsia::test]
1724    async fn test_merge_all_discarded() {
1725        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1726        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1727        skip_lists[0].insert(items[1].clone()).expect("insert error");
1728        skip_lists[1].insert(items[0].clone()).expect("insert error");
1729        let mut merger = Merger::new(
1730            layer_ref_iter(&skip_lists),
1731            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Discard },
1732            counters(),
1733        );
1734        let iter = merger.query(Query::FullScan).await.expect("seek failed");
1735        assert!(iter.get().is_none());
1736    }
1737
1738    #[fuchsia::test]
1739    async fn test_seek_with_merged_key_less_than() {
1740        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1741        let items = [Item::new(TestKey(1..8), 1), Item::new(TestKey(2..10), 2)];
1742        skip_lists[0].insert(items[0].clone()).expect("insert error");
1743        skip_lists[1].insert(items[1].clone()).expect("insert error");
1744        let mut merger = Merger::new(
1745            layer_ref_iter(&skip_lists),
1746            |left, _right| {
1747                if left.key() == &TestKey(1..8) {
1748                    MergeResult::Other {
1749                        emit: None,
1750                        left: Replace(Item::new(TestKey(1..2), 1).boxed()),
1751                        right: Keep,
1752                    }
1753                } else {
1754                    MergeResult::EmitLeft
1755                }
1756            },
1757            counters(),
1758        );
1759        let iter = merger.query(Query::FullRange(&TestKey(0..3))).await.expect("seek failed");
1760        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1761        assert_eq!((key, value), (&items[1].key, &items[1].value));
1762    }
1763
1764    #[fuchsia::test]
1765    async fn test_overlapping_keys() {
1766        let skip_lists =
1767            [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1768        let items = [
1769            Item::new(TestKey(0..10), 1),
1770            Item::new(TestKey(0..20), 2),
1771            Item::new(TestKey(0..30), 3),
1772        ];
1773        skip_lists[0].insert(items[0].clone()).expect("insert error");
1774        skip_lists[1].insert(items[1].clone()).expect("insert error");
1775        skip_lists[2].insert(items[2].clone()).expect("insert error");
1776        let mut merger = Merger::new(
1777            layer_ref_iter(&skip_lists),
1778            |left, right| {
1779                let result = if left.key().0.end <= right.key().0.start {
1780                    MergeResult::EmitLeft
1781                } else {
1782                    if left.key() == &TestKey(0..30) && right.key() == &TestKey(10..20) {
1783                        MergeResult::Other {
1784                            emit: Some(Item::new(TestKey(0..10), 1).boxed()),
1785                            left: Replace(Item::new(TestKey(10..30), 1).boxed()),
1786                            right: Keep,
1787                        }
1788                    } else {
1789                        MergeResult::Other {
1790                            emit: None,
1791                            left: Keep,
1792                            right: Replace(
1793                                Item::new(TestKey(left.key().0.end..right.key().0.end), 1).boxed(),
1794                            ),
1795                        }
1796                    }
1797                };
1798                result
1799            },
1800            counters(),
1801        );
1802        let mut iter = merger.query(Query::FullRange(&TestKey(0..1))).await.expect("seek failed");
1803        let ItemRef { key, .. } = iter.get().expect("missing item");
1804        assert_eq!(key, &TestKey(0..10));
1805        iter.advance().await.expect("advance failed");
1806        let ItemRef { key, .. } = iter.get().expect("missing item");
1807        assert_eq!(key, &TestKey(10..20));
1808        iter.advance().await.expect("advance failed");
1809        let ItemRef { key, .. } = iter.get().expect("missing item");
1810        assert_eq!(key, &TestKey(20..30));
1811        iter.advance().await.expect("advance failed");
1812        assert_eq!(iter.get(), None);
1813    }
1814
1815    async fn write_layer<K: Key, V: Value>(items: Vec<Item<K, V>>) -> Arc<dyn Layer<K, V>> {
1816        let object = Arc::new(FakeObject::new());
1817        let write_handle = FakeObjectHandle::new(object.clone());
1818        let mut writer =
1819            PersistentLayerWriter::<_, K, V>::new(Writer::new(&write_handle).await, 1, 512)
1820                .await
1821                .expect("PersistentLayerWriter::new failed");
1822        for item in items {
1823            writer.write(item.as_item_ref()).await.expect("write failed");
1824        }
1825        writer.flush().await.expect("flush failed");
1826        PersistentLayer::open(FakeObjectHandle::new(object))
1827            .await
1828            .expect("open_persistent_layer failed")
1829    }
1830
1831    fn merge_sum(
1832        left: &MergeLayerIterator<'_, i32, i32>,
1833        right: &MergeLayerIterator<'_, i32, i32>,
1834    ) -> MergeResult<i32, i32> {
1835        // Sum matching keys.
1836        if left.key() == right.key() {
1837            MergeResult::Other {
1838                emit: None,
1839                left: Discard,
1840                right: Replace(Item::new(left.key().clone(), left.value() + right.value()).boxed()),
1841            }
1842        } else {
1843            MergeResult::EmitLeft
1844        }
1845    }
1846
1847    #[fuchsia::test]
1848    async fn test_merge_bloom_filters_point_query() {
1849        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1850        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1851        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1852        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1853            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1854        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1855
1856        {
1857            // Key that exists in layer 0 only
1858            let iter = merger.query(Query::Point(&1)).await.expect("seek failed");
1859            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1860            assert_eq!((key, *value), (&items[0].key, items[0].value));
1861        }
1862        {
1863            // Key that exists in layer 1 and 2
1864            let iter = merger.query(Query::Point(&2)).await.expect("seek failed");
1865            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1866            assert_eq!((key, *value), (&items[1].key, items[1].value));
1867        }
1868        {
1869            // Key that exists in layer 1
1870            let iter = merger.query(Query::Point(&4)).await.expect("seek failed");
1871            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1872            assert_eq!((key, *value), (&items[2].key, items[2].value));
1873        }
1874        {
1875            // Key that doesn't exist at all
1876            let iter = merger.query(Query::Point(&400)).await.expect("seek failed");
1877            assert!(iter.get().is_none());
1878        }
1879    }
1880
1881    #[fuchsia::test]
1882    async fn test_merge_bloom_filters_limited_range() {
1883        // NB: This test uses ObjectKey so that we don't have to reimplement the complex merging
1884        // logic for range-like keys.
1885        let layer_0_items = vec![Item::new(
1886            ObjectKey::extent(0, 0, 0..2048),
1887            ObjectValue::extent(0, VOLUME_DATA_KEY_ID),
1888        )];
1889        let layer_1_items = vec![
1890            Item::new(
1891                ObjectKey::extent(0, 0, 1024..4096),
1892                ObjectValue::extent(32768, VOLUME_DATA_KEY_ID),
1893            ),
1894            Item::new(
1895                ObjectKey::extent(0, 0, 16384..17408),
1896                ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1897            ),
1898        ];
1899        let items = [
1900            Item::new(ObjectKey::extent(0, 0, 0..2048), ObjectValue::extent(0, VOLUME_DATA_KEY_ID)),
1901            Item::new(
1902                ObjectKey::extent(0, 0, 2048..4096),
1903                ObjectValue::extent(33792, VOLUME_DATA_KEY_ID),
1904            ),
1905            Item::new(
1906                ObjectKey::extent(0, 0, 16384..17408),
1907                ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1908            ),
1909        ];
1910        let layers: [Arc<dyn Layer<ObjectKey, ObjectValue>>; 2] =
1911            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1912        let mut merger =
1913            Merger::new(dyn_layer_ref_iter(&layers), object_store::merge::merge, counters());
1914
1915        {
1916            // Range contains just keys in layer 1
1917            let mut iter = merger
1918                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 16384..16386)))
1919                .await
1920                .expect("seek failed");
1921            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1922            assert_eq!(key, &items[2].key);
1923            assert_eq!(value, &items[2].value);
1924            iter.advance().await.expect("advance");
1925            assert!(iter.get().is_none());
1926        }
1927        {
1928            // Range contains keys in layer 0 and 1
1929            let mut iter = merger
1930                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 0..4096)))
1931                .await
1932                .expect("seek failed");
1933            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1934            assert_eq!(key, &items[0].key);
1935            assert_eq!(value, &items[0].value);
1936            iter.advance().await.expect("advance");
1937            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1938            assert_eq!(key, &items[1].key);
1939            assert_eq!(value, &items[1].value);
1940            iter.advance().await.expect("advance");
1941        }
1942        {
1943            // Range contains no keys
1944            let mut iter = merger
1945                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 8192..12288)))
1946                .await
1947                .expect("seek failed");
1948            let ItemRef { key, .. } = iter.get().expect("missing item");
1949            assert_eq!(key, &items[2].key);
1950            iter.advance().await.expect("advance");
1951            assert!(iter.get().is_none());
1952        }
1953    }
1954
1955    #[fuchsia::test]
1956    async fn test_merge_bloom_filters_full_range() {
1957        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1958        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1959        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1960        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1961            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1962        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1963
1964        let mut iter = merger.query(Query::FullRange(&0)).await.expect("seek failed");
1965        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1966        assert_eq!((key, *value), (&items[0].key, items[0].value));
1967        iter.advance().await.expect("advance");
1968        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1969        assert_eq!((key, *value), (&items[1].key, items[1].value));
1970        iter.advance().await.expect("advance");
1971        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1972        assert_eq!((key, *value), (&items[2].key, items[2].value));
1973        iter.advance().await.expect("advance");
1974        assert!(iter.get().is_none());
1975    }
1976
1977    #[fuchsia::test]
1978    async fn test_merge_bloom_filters_full_scan() {
1979        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1980        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1981        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1982        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1983            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1984        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1985
1986        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1987        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1988        assert_eq!((key, *value), (&items[0].key, items[0].value));
1989        iter.advance().await.expect("advance");
1990        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1991        assert_eq!((key, *value), (&items[1].key, items[1].value));
1992        iter.advance().await.expect("advance");
1993        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1994        assert_eq!((key, *value), (&items[2].key, items[2].value));
1995        iter.advance().await.expect("advance");
1996        assert!(iter.get().is_none());
1997    }
1998}