fxfs/lsm_tree/
merge.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::log::*;
6use crate::lsm_tree;
7use crate::lsm_tree::types::{
8    BoxedItem, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut, LayerKey, MergeType,
9    OrdLowerBound, Value,
10};
11use anyhow::Error;
12use async_trait::async_trait;
13use fuchsia_sync::Mutex;
14use futures::try_join;
15use std::cmp::Ordering;
16use std::collections::BinaryHeap;
17use std::fmt::{Debug, Write};
18use std::ops::{Bound, Deref, DerefMut};
19use std::sync::Arc;
20
21#[derive(Debug, Eq, PartialEq)]
22pub enum ItemOp<K, V> {
23    /// Keeps the item to be presented to the merger subsequently with a new merge pair.
24    Keep,
25
26    /// Discards the item and moves on to the next item in the respective layer.
27    Discard,
28
29    /// Replaces the item with something new which will be presented to the merger subsequently with
30    /// a new pair.
31    Replace(BoxedItem<K, V>),
32}
33
34#[derive(Debug, Eq, PartialEq)]
35pub enum MergeResult<K, V> {
36    /// Emits the left item unchanged. Keeps the right item. This is the common case. Once an item
37    /// has been emitted, it will never be seen again by the merge function.
38    EmitLeft,
39
40    /// All other merge results are covered by the following. Take care when replacing items
41    /// that you replace the correct item. The merger will never merge two items together from
42    /// the same layer. Consider the following scenario:
43    ///
44    ///        +-----------+              +-----------+
45    /// 0:     |    A      |              |    C      |
46    ///        +-----------+--------------+-----------+
47    /// 1:                 |      B       |
48    ///                    +--------------+
49    ///
50    /// Let's say that all three items can be merged together. The merge function will first be
51    /// presented with items A and B, at which point it has the option of replacing the left item
52    /// (i.e. A, in layer 0) or the right item (i.e. B in layer 1). However, if you replace the left
53    /// item, the merge function will not then be given the opportunity to merge it with C, so the
54    /// correct thing to do in this case is to replace the right item B in layer 1, and discard the
55    /// left item. A rule you can use is that you should avoid replacing an item with another item
56    /// whose upper bound exceeds that of the item you are replacing.
57    ///
58    /// There are some combinations that might lead to infinite loops (e.g. None, Keep, Keep) and
59    /// should obviously be avoided.
60    Other { emit: Option<BoxedItem<K, V>>, left: ItemOp<K, V>, right: ItemOp<K, V> },
61}
62
63/// Users must provide a merge function which will take pairs of items, left and right, and return a
64/// merge result. The left item's key will either be less than the right item's key, or if they are
65/// the same, then the left item will be in a lower layer index (lower layer indexes indicate more
66/// recent entries). The last remaining item is always emitted.
67pub type MergeFn<K, V> =
68    fn(&MergeLayerIterator<'_, K, V>, &MergeLayerIterator<'_, K, V>) -> MergeResult<K, V>;
69
70pub enum MergeItem<K, V> {
71    None,
72    Item(BoxedItem<K, V>),
73    Iter,
74}
75
76enum RawIterator<'a, K, V> {
77    None,
78    Const(Box<dyn LayerIterator<K, V> + 'a>),
79    Mut(Box<dyn LayerIteratorMut<K, V> + 'a>),
80}
81
82// SAFETY: LayerIteratorMut is not Sync or Send, but it's only used by merge_into, which doesn't
83// send across threads (and is sync, not async).
84unsafe impl<K, V> Send for RawIterator<'_, K, V> {}
85
86// An iterator that keeps track of where we are for each of the layers. We push these onto a
87// min-heap.
88pub struct MergeLayerIterator<'a, K, V> {
89    layer: Option<&'a dyn Layer<K, V>>,
90
91    // The underlying iterator.
92    iter: RawIterator<'a, K, V>,
93
94    // The index of the layer this is for.
95    pub layer_index: u16,
96
97    // The item we are currently pointing at.
98    item: MergeItem<K, V>,
99}
100
101impl<'a, K, V> MergeLayerIterator<'a, K, V> {
102    pub fn key(&self) -> &K {
103        self.item().key
104    }
105
106    pub fn value(&self) -> &V {
107        self.item().value
108    }
109
110    pub fn sequence(&self) -> u64 {
111        self.item().sequence
112    }
113
114    fn new(layer_index: u16, layer: &'a dyn Layer<K, V>) -> Self {
115        MergeLayerIterator {
116            layer: Some(layer),
117            iter: RawIterator::None,
118            layer_index,
119            item: MergeItem::None,
120        }
121    }
122
123    fn new_with_item(layer_index: u16, item: MergeItem<K, V>) -> Self {
124        MergeLayerIterator { layer: None, iter: RawIterator::None, layer_index, item }
125    }
126
127    fn item(&self) -> ItemRef<'_, K, V> {
128        match &self.item {
129            MergeItem::None => panic!("No item!"),
130            MergeItem::Item(item) => ItemRef::from(item),
131            MergeItem::Iter => self.get().unwrap(),
132        }
133    }
134
135    fn get(&self) -> Option<ItemRef<'_, K, V>> {
136        match &self.iter {
137            RawIterator::None => panic!("No iterator!"),
138            RawIterator::Const(iter) => iter.get(),
139            RawIterator::Mut(iter) => iter.get(),
140        }
141    }
142
143    fn set_item_from_iter(&mut self) {
144        self.item = if match &self.iter {
145            RawIterator::None => unreachable!(),
146            RawIterator::Const(iter) => iter.get(),
147            RawIterator::Mut(iter) => iter.get(),
148        }
149        .is_some()
150        {
151            MergeItem::Iter
152        } else {
153            MergeItem::None
154        };
155    }
156
157    fn take_item(&mut self) -> Option<BoxedItem<K, V>> {
158        if let MergeItem::Item(_) = self.item {
159            let mut item = MergeItem::None;
160            std::mem::swap(&mut self.item, &mut item);
161            if let MergeItem::Item(item) = item {
162                Some(item)
163            } else {
164                unreachable!();
165            }
166        } else {
167            None
168        }
169    }
170
171    async fn advance(&mut self) -> Result<(), Error> {
172        if let MergeItem::Iter = self.item {
173            if let RawIterator::Const(iter) = &mut self.iter {
174                iter.advance().await?;
175            } else {
176                // This will never get called in the RawIterator::Mut case.
177                unreachable!();
178            }
179        }
180        self.set_item_from_iter();
181        Ok(())
182    }
183
184    fn replace(&mut self, item: BoxedItem<K, V>) {
185        self.item = MergeItem::Item(item);
186    }
187
188    fn is_some(&self) -> bool {
189        !matches!(self.item, MergeItem::None)
190    }
191
192    // This function exists so that we can advance multiple iterators concurrently using, say,
193    // try_join!.
194    async fn maybe_advance(&mut self, op: &ItemOp<K, V>) -> Result<(), Error> {
195        if let ItemOp::Keep = op {
196            Ok(())
197        } else {
198            self.advance().await
199        }
200    }
201}
202
203// -- Ord and friends --
204impl<K: OrdLowerBound, V> Ord for MergeLayerIterator<'_, K, V> {
205    fn cmp(&self, other: &Self) -> Ordering {
206        // Reverse ordering because we want min-heap not max-heap.
207        other.key().cmp_lower_bound(self.key()).then(other.layer_index.cmp(&self.layer_index))
208    }
209}
210impl<K: OrdLowerBound, V> PartialOrd for MergeLayerIterator<'_, K, V> {
211    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
212        Some(self.cmp(other))
213    }
214}
215impl<K: OrdLowerBound, V> PartialEq for MergeLayerIterator<'_, K, V> {
216    fn eq(&self, other: &Self) -> bool {
217        self.cmp(other) == Ordering::Equal
218    }
219}
220impl<K: OrdLowerBound, V> Eq for MergeLayerIterator<'_, K, V> {}
221
222// As we merge items, the current item can be an item that has been replaced (and later emitted) by
223// the merge function, or an item referenced by an iterator, or nothing.
224enum CurrentItem<'a, 'b, K, V> {
225    None,
226    Item(BoxedItem<K, V>),
227    Iterator(&'a mut MergeLayerIterator<'b, K, V>),
228}
229
230impl<'a, 'b, K, V> CurrentItem<'a, 'b, K, V> {
231    // Takes the iterator if one is present and replaces the current item with None; otherwise,
232    // leaves the current item untouched.
233    fn take_iterator(&mut self) -> Option<&'a mut MergeLayerIterator<'b, K, V>> {
234        if let CurrentItem::Iterator(_) = self {
235            let mut result = CurrentItem::None;
236            std::mem::swap(self, &mut result);
237            if let CurrentItem::Iterator(iter) = result {
238                Some(iter)
239            } else {
240                unreachable!();
241            }
242        } else {
243            None
244        }
245    }
246}
247
248impl<'a, K, V> From<&'a CurrentItem<'_, '_, K, V>> for Option<ItemRef<'a, K, V>> {
249    fn from(iter: &'a CurrentItem<'_, '_, K, V>) -> Option<ItemRef<'a, K, V>> {
250        match iter {
251            CurrentItem::None => None,
252            CurrentItem::Iterator(iterator) => Some(iterator.item()),
253            CurrentItem::Item(item) => Some(item.into()),
254        }
255    }
256}
257
258/// Merger is the main entry point to merging.
259pub struct Merger<'a, K, V> {
260    // A buffer containing all the MergeLayerIterator objects.
261    iterators: Vec<MergeLayerIterator<'a, K, V>>,
262
263    // The function to be used for merging items.
264    merge_fn: MergeFn<K, V>,
265
266    // If true, additional logging is enabled.
267    trace: bool,
268
269    // Tracks statistics related to the LSM tree.
270    counters: Arc<Mutex<lsm_tree::Counters>>,
271}
272
273/// Query describes the goal of a search in the LSM tree.  The caller specifies this to guide the
274/// merger on which layer files it must consult.
275/// Layers might be skipped during a query for a few reasons:
276///   * Existence filters might allow layer files to be skipped.
277///   * For bounded queries (i.e. any except FullScan), if the search key has
278///     MergeType::OptimizedMerge, we can use that to omit older layers once we find a match.
279#[derive(Debug, Clone)]
280pub enum Query<'a, K: Key + LayerKey + OrdLowerBound> {
281    /// Point queries look for a specific key in the LSM tree.  In this case, the existence filters
282    /// for each layer file can be used to decide if the layer file needs to be consulted.
283    /// Note that it is an error to use `Point` for range-like keys.  Either `LimitedRange` or
284    /// `FullRange` must be used instead.
285    Point(&'a K),
286    /// LimitedRange queries position the iterator to `K::search_key`, and scans forward until the
287    /// first record which does not overlap the provided key.  In this case, the existence filters
288    /// for each layer file can be used, but we have to check for all possible keys in the range we
289    /// wish to search.  Obviously, that means that the range should be, well, limited.  Fuzzy
290    /// hashes permit this for extent-like keys, but these queries are not the right choice for
291    /// things like searching a directory.
292    LimitedRange(&'a K),
293    /// FullRange queries position the iterator to a starting key, and scan forward to the
294    /// first key of a different type.  In this case, the existence filters are not used.
295    FullRange(&'a K),
296    /// FullScan queries are intended to yield every record in the tree.  In this case, the
297    /// existence filters are not used.
298    FullScan,
299}
300
301impl<'a, K: Key + LayerKey + OrdLowerBound> Query<'a, K> {
302    fn needs_layer<V>(&self, layer: &dyn Layer<K, V>) -> bool {
303        match self {
304            Self::Point(key) => layer.maybe_contains_key(key),
305            Self::LimitedRange(key) => layer.maybe_contains_key(key),
306            Self::FullRange(_) => true,
307            Self::FullScan => true,
308        }
309    }
310}
311
312#[fxfs_trace::trace]
313impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> Merger<'a, K, V> {
314    pub(super) fn new<I: Iterator<Item = &'a dyn Layer<K, V>>>(
315        layers: I,
316        merge_fn: MergeFn<K, V>,
317        counters: Arc<Mutex<lsm_tree::Counters>>,
318    ) -> Merger<'a, K, V> {
319        Merger {
320            iterators: layers
321                .enumerate()
322                .map(|(index, layer)| MergeLayerIterator::new(index as u16, layer))
323                .collect(),
324            merge_fn: merge_fn,
325            trace: false,
326            counters,
327        }
328    }
329
330    /// Executes `query`, positioning the iterator to the first matching item.  See `Query` for
331    /// details.
332    #[trace]
333    pub async fn query(
334        &mut self,
335        query: Query<'_, K>,
336    ) -> Result<MergerIterator<'_, 'a, K, V>, Error> {
337        if let Query::Point(key) = query {
338            // NB: It is almost certainly an error to provide a range-like key to a Point query,
339            // hence this debug assertion.  This will most likely result in the wrong result, since
340            // the starting bound would be the original range key and therefore we might miss
341            // records that start earlier but overlap with the key.
342            // For example, if there is a layer which contains 0..100 and 100..200, and we search
343            // for 50..150, we should get both extents, which requires a different starting bound
344            // than 50..150 (particularly it would require the `K::search_key`).
345            debug_assert!(!key.is_range_key())
346        };
347        let len = self.iterators.len();
348        let pending_iterators = {
349            fxfs_trace::duration!(c"Merger::filter_layer_files", "len" => len);
350            self.iterators
351                .iter_mut()
352                .rev()
353                .filter(|l| query.needs_layer(l.layer.clone().unwrap()))
354                .collect::<Vec<&mut MergeLayerIterator<'a, K, V>>>()
355        };
356        let layer_count = pending_iterators.len();
357        {
358            let mut counters = self.counters.lock();
359            counters.num_seeks += 1;
360            counters.layer_files_total += len;
361            counters.layer_files_skipped += len - layer_count;
362        }
363        log::debug!(query:?; "Consulting {}/{} layers", layer_count, len);
364        let mut merger_iter = MergerIterator {
365            merge_fn: self.merge_fn,
366            pending_iterators,
367            heap: BinaryHeap::with_capacity(layer_count),
368            item: CurrentItem::None,
369            trace: self.trace,
370            history: String::new(),
371        };
372        let owned_bound;
373        let bound = match query {
374            Query::Point(key) => Bound::Included(key),
375            Query::LimitedRange(key) => {
376                owned_bound = Bound::Included(key.search_key());
377                owned_bound.as_ref()
378            }
379            Query::FullRange(start) => Bound::Included(start),
380            Query::FullScan => Bound::Unbounded,
381        };
382        merger_iter.seek(bound).await?;
383        Ok(merger_iter)
384    }
385
386    pub fn set_trace(&mut self, v: bool) {
387        self.trace = v;
388    }
389}
390
391/// This is an iterator that will allow iteration over merged layers.  The primary interface is via
392/// the LayerIterator trait.
393pub struct MergerIterator<'a, 'b, K, V> {
394    merge_fn: MergeFn<K, V>,
395
396    // Iterators that we have not yet pushed onto the heap.
397    pending_iterators: Vec<&'a mut MergeLayerIterator<'b, K, V>>,
398
399    // A heap with the merge iterators.
400    heap: BinaryHeap<&'a mut MergeLayerIterator<'b, K, V>>,
401
402    // The current item.
403    item: CurrentItem<'a, 'b, K, V>,
404
405    // If true, logs regarding merger behaviour are appended to history.
406    trace: bool,
407
408    // Holds trace information if trace is true.
409    history: String,
410}
411
412impl<'a, 'b, K: Key + LayerKey + OrdLowerBound, V: Value> MergerIterator<'a, 'b, K, V> {
413    async fn seek(&mut self, bound: Bound<&K>) -> Result<(), Error> {
414        let next_key = match bound {
415            Bound::Unbounded => None,
416            Bound::Included(key) => Some(key),
417            Bound::Excluded(_) => panic!("Excluded bounds not supported!"),
418        };
419
420        self.push_iterators(next_key, bound).await?;
421        self.advance_impl(bound).await
422    }
423
424    // Merges items from an array of layers using the provided merge function. The merge function is
425    // repeatedly provided the lowest and the second lowest element, if one exists. In cases where
426    // the two lowest elements compare equal, the element with the lowest layer (i.e. whichever
427    // comes first in the layers array) will come first.  `next_key_bound` is a bound for the next
428    // key we expect.
429    async fn advance_impl(&mut self, next_key_bound: Bound<&K>) -> Result<(), Error> {
430        loop {
431            loop {
432                if self.heap.is_empty() {
433                    self.item = CurrentItem::None;
434                    return Ok(());
435                }
436                let lowest = self.heap.pop().unwrap();
437                let maybe_second_lowest = self.heap.pop();
438                if let Some(second_lowest) = maybe_second_lowest {
439                    let result = (self.merge_fn)(&lowest, &second_lowest);
440                    if self.trace {
441                        writeln!(
442                            self.history,
443                            "merge {:?}, {:?} -> {:?}",
444                            lowest.item(),
445                            second_lowest.item(),
446                            result
447                        )
448                        .unwrap();
449                    }
450                    match result {
451                        MergeResult::EmitLeft => {
452                            self.heap.push(second_lowest);
453                            self.item = CurrentItem::Iterator(lowest);
454                            break;
455                        }
456                        MergeResult::Other { emit, left, right } => {
457                            try_join!(
458                                lowest.maybe_advance(&left),
459                                second_lowest.maybe_advance(&right)
460                            )?;
461                            self.update_item(lowest, left);
462                            self.update_item(second_lowest, right);
463                            if let Some(emit) = emit {
464                                self.item = CurrentItem::Item(emit);
465                                break;
466                            }
467                        }
468                    }
469                } else {
470                    self.item = CurrentItem::Iterator(lowest);
471                    break;
472                }
473            }
474
475            // If the item we're about to yield isn't within `next_key_bound`, ignore it and
476            // continue.  To see how this would happen, imagine the following scenario:
477            //
478            //       0          10          20            30
479            //       +----------+
480            //   0   |          |
481            //       +----------+-----------+
482            //   1   |                      |
483            //       +----------------------+-------------+
484            //   2   |                                    |
485            //       +------------------------------------+
486            //
487            // If we are to seek for 0..1 and then iterate over what we find, we expect the sequence
488            // 0..10, 10..20, 20..30.  After the first seek, we should only consult iterator 0.  For
489            // the first advance, we will consult iterator 1 but we need to merge the 0..20 element
490            // with the 0..10 element which we already emitted.  To make this work, we push the
491            // 0..10 item back on the heap (see advance) and then merge, but that will yield the
492            // 0..10 entry again, so here we need to skip over it, and then merge again, at which
493            // point we should see the 10..20 entry as expected.
494            match next_key_bound {
495                Bound::Included(key)
496                    if self.get().unwrap().key.cmp_upper_bound(key) == Ordering::Less => {}
497                Bound::Excluded(key)
498                    if self.get().unwrap().key.cmp_upper_bound(key) != Ordering::Greater => {}
499                _ => return Ok(()),
500            }
501            if let Some(iterator) = self.item.take_iterator() {
502                iterator.advance().await?;
503                if iterator.is_some() {
504                    self.heap.push(iterator);
505                }
506            }
507        }
508    }
509
510    // Returns whether more iterators are required for the given `next_key`.  See push_iterators.
511    fn needs_more_iterators(&self, next_key: Option<&K>, next_key_bound: Bound<&K>) -> bool {
512        if self.pending_iterators.is_empty() {
513            return false;
514        }
515        if self.heap.is_empty() {
516            return true;
517        }
518        let target;
519        match next_key_bound {
520            Bound::Included(k) => target = k,
521            Bound::Excluded(_) | Bound::Unbounded => return true,
522        }
523        match target.merge_type() {
524            MergeType::FullMerge => true,
525            MergeType::OptimizedMerge => {
526                match next_key {
527                    Some(k) => {
528                        self.heap.peek().unwrap().key().cmp_lower_bound(k) == Ordering::Greater
529                    }
530                    None => {
531                        // If we've found an exact match, return it since nothing needs to merge.
532                        self.heap.peek().unwrap().key().cmp_lower_bound(target) != Ordering::Equal
533                    }
534                }
535            }
536        }
537    }
538
539    // Pushes additional iterators onto the heap until we are confident that the top element will
540    // yield what we are looking for.  If next_key is set, we will stop pushing iterators as soon as
541    // a key is encountered that equals or precedes it.  If next_key is None, then all layers are
542    // pushed.  next_key_bound is the bound to search for if another layer does need to be
543    // consulted.  See the comment for the MergeType trait.
544    async fn push_iterators(
545        &mut self,
546        next_key: Option<&K>,
547        next_key_bound: Bound<&K>,
548    ) -> Result<(), Error> {
549        while self.needs_more_iterators(next_key, next_key_bound) {
550            let iter = self.pending_iterators.pop().unwrap();
551            let sub_iter = iter.layer.as_ref().unwrap().seek(next_key_bound).await?;
552            if self.trace {
553                writeln!(
554                    self.history,
555                    "merger: search for {:?}, found {:?}",
556                    next_key_bound,
557                    sub_iter.get()
558                )
559                .unwrap();
560            }
561            iter.iter = RawIterator::Const(sub_iter);
562            iter.set_item_from_iter();
563            if iter.is_some() {
564                self.heap.push(iter);
565            }
566        }
567        Ok(())
568    }
569
570    // Updates the merge iterator depending on |op|. If discarding, the iterator should have already
571    // been advanced.
572    fn update_item(&mut self, item: &'a mut MergeLayerIterator<'b, K, V>, op: ItemOp<K, V>) {
573        match op {
574            ItemOp::Keep => self.heap.push(item),
575            ItemOp::Discard => {
576                // The iterator should have already been advanced.
577                if item.is_some() {
578                    self.heap.push(item);
579                }
580            }
581            ItemOp::Replace(replacement) => {
582                item.replace(replacement);
583                self.heap.push(item);
584            }
585        }
586    }
587}
588
589#[async_trait]
590impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> LayerIterator<K, V>
591    for MergerIterator<'a, '_, K, V>
592{
593    async fn advance(&mut self) -> Result<(), Error> {
594        let current_key;
595        let mut next_key = None;
596        let mut next_key_bound = Bound::Unbounded;
597        if !self.pending_iterators.is_empty() {
598            if let Some(ItemRef { key, .. }) = self.get() {
599                next_key = key.next_key();
600                match next_key {
601                    None => {
602                        // If there is no next key, we must now query all layers and the key we
603                        // search for is the immediate successor of the current key.
604                        current_key = Some(key.clone());
605                        next_key_bound = Bound::Excluded(current_key.as_ref().unwrap());
606                    }
607                    Some(ref key) => next_key_bound = Bound::Included(key),
608                }
609            }
610        }
611
612        // Advance the iterator for the current item and push it onto the heap, and also push any
613        // additional iterators onto the heap (by calling push_iterators).
614        if let Some(iterator) = self.item.take_iterator() {
615            if self.needs_more_iterators(next_key.as_ref(), next_key_bound) {
616                let existing_item = iterator.item().boxed();
617                iterator.advance().await?;
618                match &next_key {
619                    Some(next_key)
620                        if iterator.is_some()
621                            && iterator.key().cmp_lower_bound(next_key) != Ordering::Greater =>
622                    {
623                        // In this case, the key immediately following is a good candidate, and all
624                        // we need to do is merge it with existing iterators; we shouldn't need to
625                        // consult with any more iterators.
626                    }
627                    _ => {
628                        // We are going to need to consult more iterators so we need to go back to
629                        // the previous item so that we can merge with it.  See the comment in
630                        // advance_impl.
631                        iterator.replace(existing_item);
632
633                        // We must push other iterators here before pushing iterator onto the heap
634                        // because we know `iterator` would end up at the top of the heap.
635                        self.push_iterators(next_key.as_ref(), next_key_bound).await?;
636                    }
637                }
638            } else {
639                iterator.advance().await?;
640            }
641            if iterator.is_some() {
642                self.heap.push(iterator);
643            }
644        } else {
645            self.push_iterators(next_key.as_ref(), next_key_bound).await?;
646        }
647
648        self.advance_impl(next_key_bound).await
649    }
650
651    fn get(&self) -> Option<ItemRef<'_, K, V>> {
652        (&self.item).into()
653    }
654}
655
656struct MutMergeLayerIterator<'a, K, V>(MergeLayerIterator<'a, K, V>);
657
658impl<K, V> MutMergeLayerIterator<'_, K, V> {
659    fn advance(&mut self) {
660        if let MergeItem::Iter = self.item {
661            self.as_mut().advance();
662        }
663        self.set_item_from_iter();
664    }
665}
666
667impl<'a, K, V> AsMut<dyn LayerIteratorMut<K, V> + 'a> for MutMergeLayerIterator<'a, K, V> {
668    fn as_mut(&mut self) -> &mut (dyn LayerIteratorMut<K, V> + 'a) {
669        let RawIterator::Mut(iter) = &mut self.0.iter else { unreachable!() };
670        iter.as_mut()
671    }
672}
673
674impl<'a, K, V> Deref for MutMergeLayerIterator<'a, K, V> {
675    type Target = MergeLayerIterator<'a, K, V>;
676
677    fn deref(&self) -> &Self::Target {
678        &self.0
679    }
680}
681
682impl<K, V> DerefMut for MutMergeLayerIterator<'_, K, V> {
683    fn deref_mut(&mut self) -> &mut Self::Target {
684        &mut self.0
685    }
686}
687
688// Merges the given item into a mutable layer.
689pub(super) fn merge_into<K: Debug + OrdLowerBound, V: Debug>(
690    mut_iter: Box<dyn LayerIteratorMut<K, V> + '_>,
691    item: Item<K, V>,
692    merge_fn: MergeFn<K, V>,
693) -> Result<(), Error> {
694    let merge_item = if mut_iter.get().is_some() { MergeItem::Iter } else { MergeItem::None };
695    let mut mut_merge_iter = MutMergeLayerIterator(MergeLayerIterator {
696        layer: None,
697        iter: RawIterator::Mut(mut_iter),
698        layer_index: 1,
699        item: merge_item,
700    });
701    let mut item_merge_iter = MergeLayerIterator::new_with_item(0, MergeItem::Item(item.boxed()));
702    while mut_merge_iter.is_some() && item_merge_iter.is_some() {
703        if mut_merge_iter.0 > item_merge_iter {
704            // In this branch the mutable layer is left and the item we're merging-in is right.
705            let merge_result = merge_fn(&mut_merge_iter, &item_merge_iter);
706            debug!(
707                lhs:? = mut_merge_iter.key(),
708                rhs:? = item_merge_iter.key(),
709                result:? = merge_result;
710                "(1) merge");
711            match merge_result {
712                MergeResult::EmitLeft => {
713                    if let Some(item) = mut_merge_iter.take_item() {
714                        mut_merge_iter.as_mut().insert(*item);
715                        mut_merge_iter.set_item_from_iter();
716                    } else {
717                        mut_merge_iter.advance();
718                    }
719                }
720                MergeResult::Other { emit, left, right } => {
721                    if let Some(emit) = emit {
722                        mut_merge_iter.as_mut().insert(*emit);
723                    }
724                    match left {
725                        ItemOp::Keep => {}
726                        ItemOp::Discard => {
727                            if matches!(mut_merge_iter.item, MergeItem::Iter) {
728                                mut_merge_iter.as_mut().erase();
729                            }
730                            mut_merge_iter.set_item_from_iter();
731                        }
732                        ItemOp::Replace(item) => {
733                            if let MergeItem::Iter = mut_merge_iter.item {
734                                mut_merge_iter.as_mut().erase();
735                            }
736                            mut_merge_iter.item = MergeItem::Item(item)
737                        }
738                    }
739                    match right {
740                        ItemOp::Keep => {}
741                        ItemOp::Discard => item_merge_iter.item = MergeItem::None,
742                        ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
743                    }
744                }
745            }
746        } else {
747            // In this branch, the item we're merging-in is left and the mutable layer is right.
748            let merge_result = merge_fn(&item_merge_iter, &mut_merge_iter);
749            debug!(
750                lhs:? = mut_merge_iter.key(),
751                rhs:? = item_merge_iter.key(),
752                result:? = merge_result;
753                "(2) merge");
754            match merge_result {
755                MergeResult::EmitLeft => break, // Item is inserted outside the loop
756                MergeResult::Other { emit, left, right } => {
757                    if let Some(emit) = emit {
758                        mut_merge_iter.as_mut().insert(*emit);
759                    }
760                    match left {
761                        ItemOp::Keep => {}
762                        ItemOp::Discard => item_merge_iter.item = MergeItem::None,
763                        ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
764                    }
765                    match right {
766                        ItemOp::Keep => {}
767                        ItemOp::Discard => {
768                            if matches!(mut_merge_iter.item, MergeItem::Iter) {
769                                mut_merge_iter.as_mut().erase();
770                            }
771                            mut_merge_iter.set_item_from_iter();
772                        }
773                        ItemOp::Replace(item) => {
774                            if let MergeItem::Iter = mut_merge_iter.item {
775                                mut_merge_iter.as_mut().erase();
776                            }
777                            mut_merge_iter.item = MergeItem::Item(item)
778                        }
779                    }
780                }
781            }
782        }
783    } // while ...
784
785    // The only way we could get here with both items is via the break above, so we know the correct
786    // order required here.
787    if let MergeItem::Item(item) = item_merge_iter.item {
788        mut_merge_iter.as_mut().insert(*item);
789    }
790    if let Some(item) = mut_merge_iter.take_item() {
791        mut_merge_iter.as_mut().insert(*item);
792    }
793    if let RawIterator::Mut(mut iter) = mut_merge_iter.0.iter {
794        iter.commit();
795    }
796    Ok(())
797}
798
799#[cfg(test)]
800mod tests {
801    use super::ItemOp::{Discard, Keep, Replace};
802    use super::{MergeLayerIterator, MergeResult, Merger};
803    use crate::lsm_tree::persistent_layer::{PersistentLayer, PersistentLayerWriter};
804    use crate::lsm_tree::skip_list_layer::SkipListLayer;
805    use crate::lsm_tree::types::{
806        FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeType,
807        OrdLowerBound, OrdUpperBound, SortByU64,
808    };
809    use crate::lsm_tree::{self, Query, Value};
810    use crate::object_store::{self, ObjectKey, ObjectValue, VOLUME_DATA_KEY_ID};
811    use crate::serialized_types::{
812        versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
813    };
814    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
815    use crate::testing::writer::Writer;
816    use fprint::TypeFingerprint;
817    use fuchsia_sync::Mutex;
818    use fxfs_macros::FuzzyHash;
819    use rand::Rng;
820    use std::hash::Hash;
821    use std::ops::{Bound, Range};
822    use std::sync::Arc;
823
824    #[derive(
825        Clone,
826        Eq,
827        Hash,
828        FuzzyHash,
829        PartialEq,
830        Debug,
831        serde::Serialize,
832        serde::Deserialize,
833        TypeFingerprint,
834        Versioned,
835    )]
836    struct TestKey(Range<u64>);
837
838    impl Value for i32 {
839        const DELETED_MARKER: Self = 0;
840    }
841
842    versioned_type! { 1.. => TestKey }
843
844    impl SortByU64 for TestKey {
845        fn get_leading_u64(&self) -> u64 {
846            self.0.start
847        }
848    }
849
850    impl LayerKey for TestKey {
851        fn merge_type(&self) -> MergeType {
852            MergeType::OptimizedMerge
853        }
854
855        fn next_key(&self) -> Option<Self> {
856            Some(TestKey(self.0.end..self.0.end + 1))
857        }
858    }
859
860    impl OrdUpperBound for TestKey {
861        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
862            self.0.end.cmp(&other.0.end)
863        }
864    }
865
866    impl OrdLowerBound for TestKey {
867        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
868            self.0.start.cmp(&other.0.start)
869        }
870    }
871
872    fn layer_ref_iter<K: Key, V: Value>(
873        layers: &[Arc<SkipListLayer<K, V>>],
874    ) -> impl Iterator<Item = &dyn Layer<K, V>> {
875        layers.iter().map(|x| x.as_ref() as &dyn Layer<K, V>)
876    }
877
878    fn dyn_layer_ref_iter<K: Key, V: Value>(
879        layers: &[Arc<dyn Layer<K, V>>],
880    ) -> impl Iterator<Item = &dyn Layer<K, V>> {
881        layers.iter().map(|x| x.as_ref())
882    }
883
884    fn counters() -> Arc<Mutex<lsm_tree::Counters>> {
885        Arc::new(Mutex::new(lsm_tree::Counters::default()))
886    }
887
888    #[fuchsia::test]
889    async fn test_emit_left() {
890        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
891        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
892        skip_lists[0].insert(items[1].clone()).expect("insert error");
893        skip_lists[1].insert(items[0].clone()).expect("insert error");
894        let mut merger = Merger::new(
895            layer_ref_iter(&skip_lists),
896            |_left, _right| MergeResult::EmitLeft,
897            counters(),
898        );
899        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
900        let ItemRef { key, value, .. } = iter.get().expect("missing item");
901        assert_eq!((key, value), (&items[0].key, &items[0].value));
902        iter.advance().await.unwrap();
903        let ItemRef { key, value, .. } = iter.get().expect("missing item");
904        assert_eq!((key, value), (&items[1].key, &items[1].value));
905        iter.advance().await.unwrap();
906        assert!(iter.get().is_none());
907    }
908
909    #[fuchsia::test]
910    async fn test_other_emit() {
911        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
912        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
913        skip_lists[0].insert(items[1].clone()).expect("insert error");
914        skip_lists[1].insert(items[0].clone()).expect("insert error");
915        let mut merger = Merger::new(
916            layer_ref_iter(&skip_lists),
917            |_left, _right| MergeResult::Other {
918                emit: Some(Item::new(TestKey(3..3), 3).boxed()),
919                left: Discard,
920                right: Discard,
921            },
922            counters(),
923        );
924        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
925
926        let ItemRef { key, value, .. } = iter.get().expect("missing item");
927        assert_eq!((key, value), (&TestKey(3..3), &3));
928        iter.advance().await.unwrap();
929        assert!(iter.get().is_none());
930    }
931
932    #[fuchsia::test]
933    async fn test_replace_left() {
934        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
935        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
936        skip_lists[0].insert(items[1].clone()).expect("insert error");
937        skip_lists[1].insert(items[0].clone()).expect("insert error");
938        let mut merger = Merger::new(
939            layer_ref_iter(&skip_lists),
940            |_left, _right| MergeResult::Other {
941                emit: None,
942                left: Replace(Item::new(TestKey(3..3), 3).boxed()),
943                right: Discard,
944            },
945            counters(),
946        );
947        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
948
949        // The merger should replace the left item and then after discarding the right item, it
950        // should emit the replacement.
951        let ItemRef { key, value, .. } = iter.get().expect("missing item");
952        assert_eq!((key, value), (&TestKey(3..3), &3));
953        iter.advance().await.unwrap();
954        assert!(iter.get().is_none());
955    }
956
957    #[fuchsia::test]
958    async fn test_replace_right() {
959        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
960        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
961        skip_lists[0].insert(items[1].clone()).expect("insert error");
962        skip_lists[1].insert(items[0].clone()).expect("insert error");
963        let mut merger = Merger::new(
964            layer_ref_iter(&skip_lists),
965            |_left, _right| MergeResult::Other {
966                emit: None,
967                left: Discard,
968                right: Replace(Item::new(TestKey(3..3), 3).boxed()),
969            },
970            counters(),
971        );
972        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
973
974        // The merger should replace the right item and then after discarding the left item, it
975        // should emit the replacement.
976        let ItemRef { key, value, .. } = iter.get().expect("missing item");
977        assert_eq!((key, value), (&TestKey(3..3), &3));
978        iter.advance().await.unwrap();
979        assert!(iter.get().is_none());
980    }
981
982    #[fuchsia::test]
983    async fn test_left_less_than_right() {
984        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
985        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
986        skip_lists[0].insert(items[1].clone()).expect("insert error");
987        skip_lists[1].insert(items[0].clone()).expect("insert error");
988        let mut merger = Merger::new(
989            layer_ref_iter(&skip_lists),
990            |left, right| {
991                assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
992                assert_eq!((right.key(), right.value()), (&TestKey(2..2), &2));
993                MergeResult::EmitLeft
994            },
995            counters(),
996        );
997        merger.query(Query::FullScan).await.expect("seek failed");
998    }
999
1000    #[fuchsia::test]
1001    async fn test_left_equals_right() {
1002        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1003        let item = Item::new(TestKey(1..1), 1);
1004        skip_lists[0].insert(item.clone()).expect("insert error");
1005        skip_lists[1].insert(item.clone()).expect("insert error");
1006        let mut merger = Merger::new(
1007            layer_ref_iter(&skip_lists),
1008            |left, right| {
1009                assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
1010                assert_eq!((right.key(), right.value()), (&TestKey(1..1), &1));
1011                assert_eq!(left.layer_index, 0);
1012                assert_eq!(right.layer_index, 1);
1013                MergeResult::EmitLeft
1014            },
1015            counters(),
1016        );
1017        merger.query(Query::FullScan).await.expect("seek failed");
1018    }
1019
1020    #[fuchsia::test]
1021    async fn test_keep() {
1022        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1023        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1024        skip_lists[0].insert(items[1].clone()).expect("insert error");
1025        skip_lists[1].insert(items[0].clone()).expect("insert error");
1026        let mut merger = Merger::new(
1027            layer_ref_iter(&skip_lists),
1028            |left, right| {
1029                if left.key() == &TestKey(1..1) {
1030                    MergeResult::Other {
1031                        emit: None,
1032                        left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1033                        right: Keep,
1034                    }
1035                } else {
1036                    assert_eq!(left.key(), &TestKey(2..2));
1037                    assert_eq!(right.key(), &TestKey(3..3));
1038                    MergeResult::Other { emit: None, left: Discard, right: Keep }
1039                }
1040            },
1041            counters(),
1042        );
1043        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1044
1045        // The merger should first replace left and then it should call the merger again with 2 & 3
1046        // and end up just keeping 3.
1047        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1048        assert_eq!((key, value), (&TestKey(3..3), &3));
1049        iter.advance().await.unwrap();
1050        assert!(iter.get().is_none());
1051    }
1052
1053    #[fuchsia::test]
1054    async fn test_merge_10_layers() {
1055        let skip_lists: Vec<_> = (0..10).map(|_| SkipListLayer::new(100)).collect();
1056        let mut rng = rand::thread_rng();
1057        for i in 0..100 {
1058            skip_lists[rng.gen_range(0..10) as usize]
1059                .insert(Item::new(TestKey(i..i), i))
1060                .expect("insert error");
1061        }
1062        let mut merger = Merger::new(
1063            layer_ref_iter(&skip_lists),
1064            |_left, _right| MergeResult::EmitLeft,
1065            counters(),
1066        );
1067        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1068
1069        for i in 0..100 {
1070            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1071            assert_eq!((key, value), (&TestKey(i..i), &i));
1072            iter.advance().await.unwrap();
1073        }
1074        assert!(iter.get().is_none());
1075    }
1076
1077    #[fuchsia::test]
1078    async fn test_merge_uses_cmp_lower_bound() {
1079        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1080        let items = [Item::new(TestKey(1..10), 1), Item::new(TestKey(2..3), 2)];
1081        skip_lists[0].insert(items[1].clone()).expect("insert error");
1082        skip_lists[1].insert(items[0].clone()).expect("insert error");
1083        let mut merger = Merger::new(
1084            layer_ref_iter(&skip_lists),
1085            |_left, _right| MergeResult::EmitLeft,
1086            counters(),
1087        );
1088        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1089
1090        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1091        assert_eq!((key, value), (&items[0].key, &items[0].value));
1092        iter.advance().await.unwrap();
1093        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1094        assert_eq!((key, value), (&items[1].key, &items[1].value));
1095        iter.advance().await.unwrap();
1096        assert!(iter.get().is_none());
1097    }
1098
1099    #[fuchsia::test]
1100    async fn test_merge_into_emit_left() {
1101        let skip_list = SkipListLayer::new(100);
1102        let items =
1103            [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2), Item::new(TestKey(3..3), 3)];
1104        skip_list.insert(items[0].clone()).expect("insert error");
1105        skip_list.insert(items[2].clone()).expect("insert error");
1106        skip_list
1107            .merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::EmitLeft);
1108
1109        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1110        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1111        assert_eq!((key, value), (&items[0].key, &items[0].value));
1112        iter.advance().await.unwrap();
1113        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1114        assert_eq!((key, value), (&items[1].key, &items[1].value));
1115        iter.advance().await.unwrap();
1116        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1117        assert_eq!((key, value), (&items[2].key, &items[2].value));
1118        iter.advance().await.unwrap();
1119        assert!(iter.get().is_none());
1120    }
1121
1122    #[fuchsia::test]
1123    async fn test_merge_into_emit_last_after_replacing() {
1124        let skip_list = SkipListLayer::new(100);
1125        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1126        skip_list.insert(items[0].clone()).expect("insert error");
1127
1128        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1129            if left.key() == &TestKey(1..1) {
1130                assert_eq!(right.key(), &TestKey(2..2));
1131                MergeResult::Other {
1132                    emit: None,
1133                    left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1134                    right: Keep,
1135                }
1136            } else {
1137                assert_eq!(left.key(), &TestKey(2..2));
1138                assert_eq!(right.key(), &TestKey(3..3));
1139                MergeResult::EmitLeft
1140            }
1141        });
1142
1143        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1144        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1145        assert_eq!((key, value), (&items[1].key, &items[1].value));
1146        iter.advance().await.unwrap();
1147        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1148        assert_eq!((key, value), (&TestKey(3..3), &3));
1149        iter.advance().await.unwrap();
1150        assert!(iter.get().is_none());
1151    }
1152
1153    #[fuchsia::test]
1154    async fn test_merge_into_emit_left_after_replacing() {
1155        let skip_list = SkipListLayer::new(100);
1156        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1157        skip_list.insert(items[0].clone()).expect("insert error");
1158
1159        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1160            if left.key() == &TestKey(1..1) {
1161                assert_eq!(right.key(), &TestKey(3..3));
1162                MergeResult::Other {
1163                    emit: None,
1164                    left: Replace(Item::new(TestKey(2..2), 2).boxed()),
1165                    right: Keep,
1166                }
1167            } else {
1168                assert_eq!(left.key(), &TestKey(2..2));
1169                assert_eq!(right.key(), &TestKey(3..3));
1170                MergeResult::EmitLeft
1171            }
1172        });
1173
1174        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1175        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1176        assert_eq!((key, value), (&TestKey(2..2), &2));
1177        iter.advance().await.unwrap();
1178        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1179        assert_eq!((key, value), (&items[1].key, &items[1].value));
1180        iter.advance().await.unwrap();
1181        assert!(iter.get().is_none());
1182    }
1183
1184    // This tests emitting in both branches of merge_into, and most of the discard paths.
1185    #[fuchsia::test]
1186    async fn test_merge_into_emit_other_and_discard() {
1187        let skip_list = SkipListLayer::new(100);
1188        let items =
1189            [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 3)];
1190        skip_list.insert(items[0].clone()).expect("insert error");
1191        skip_list.insert(items[2].clone()).expect("insert error");
1192
1193        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1194            if left.key() == &TestKey(1..1) {
1195                // This tests the top branch in merge_into.
1196                assert_eq!(right.key(), &TestKey(3..3));
1197                MergeResult::Other {
1198                    emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1199                    left: Discard,
1200                    right: Keep,
1201                }
1202            } else {
1203                // This tests the bottom branch in merge_into.
1204                assert_eq!(left.key(), &TestKey(3..3));
1205                assert_eq!(right.key(), &TestKey(5..5));
1206                MergeResult::Other {
1207                    emit: Some(Item::new(TestKey(4..4), 4).boxed()),
1208                    left: Discard,
1209                    right: Discard,
1210                }
1211            }
1212        });
1213
1214        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1215        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1216        assert_eq!((key, value), (&TestKey(2..2), &2));
1217        iter.advance().await.unwrap();
1218        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1219        assert_eq!((key, value), (&TestKey(4..4), &4));
1220        iter.advance().await.unwrap();
1221        assert!(iter.get().is_none());
1222    }
1223
1224    // This tests replacing the item and discarding the right item (the one remaining untested
1225    // discard path) in the top branch in merge_into.
1226    #[fuchsia::test]
1227    async fn test_merge_into_replace_and_discard() {
1228        let skip_list = SkipListLayer::new(100);
1229        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1230        skip_list.insert(items[0].clone()).expect("insert error");
1231
1232        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1233            emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1234            left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1235            right: Discard,
1236        });
1237
1238        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1239        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1240        assert_eq!((key, value), (&TestKey(2..2), &2));
1241        iter.advance().await.unwrap();
1242        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1243        assert_eq!((key, value), (&TestKey(4..4), &4));
1244        iter.advance().await.unwrap();
1245        assert!(iter.get().is_none());
1246    }
1247
1248    // This tests replacing the right item in the top branch of merge_into and the left item in the
1249    // bottom branch of merge_into.
1250    #[fuchsia::test]
1251    async fn test_merge_into_replace_merge_item() {
1252        let skip_list = SkipListLayer::new(100);
1253        let items =
1254            [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 5)];
1255        skip_list.insert(items[0].clone()).expect("insert error");
1256        skip_list.insert(items[2].clone()).expect("insert error");
1257
1258        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, right| {
1259            if right.key() == &TestKey(3..3) {
1260                MergeResult::Other {
1261                    emit: None,
1262                    left: Discard,
1263                    right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1264                }
1265            } else {
1266                assert_eq!(right.key(), &TestKey(5..5));
1267                MergeResult::Other {
1268                    emit: None,
1269                    left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1270                    right: Discard,
1271                }
1272            }
1273        });
1274
1275        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1276        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1277        assert_eq!((key, value), (&TestKey(4..4), &4));
1278        iter.advance().await.unwrap();
1279        assert!(iter.get().is_none());
1280    }
1281
1282    // This tests replacing the right item in the bottom branch of merge_into.
1283    #[fuchsia::test]
1284    async fn test_merge_into_replace_existing() {
1285        let skip_list = SkipListLayer::new(100);
1286        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1287        skip_list.insert(items[1].clone()).expect("insert error");
1288
1289        skip_list.merge_into(items[0].clone(), &items[0].key, |_left, right| {
1290            if right.key() == &TestKey(3..3) {
1291                MergeResult::Other {
1292                    emit: None,
1293                    left: Keep,
1294                    right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1295                }
1296            } else {
1297                MergeResult::EmitLeft
1298            }
1299        });
1300
1301        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1302        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1303        assert_eq!((key, value), (&items[0].key, &items[0].value));
1304        iter.advance().await.unwrap();
1305        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1306        assert_eq!((key, value), (&TestKey(2..2), &2));
1307        iter.advance().await.unwrap();
1308        assert!(iter.get().is_none());
1309    }
1310
1311    #[fuchsia::test]
1312    async fn test_merge_into_discard_last() {
1313        let skip_list = SkipListLayer::new(100);
1314        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1315        skip_list.insert(items[0].clone()).expect("insert error");
1316
1317        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1318            emit: None,
1319            left: Discard,
1320            right: Keep,
1321        });
1322
1323        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1324        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1325        assert_eq!((key, value), (&items[1].key, &items[1].value));
1326        iter.advance().await.unwrap();
1327        assert!(iter.get().is_none());
1328    }
1329
1330    #[fuchsia::test]
1331    async fn test_merge_into_empty() {
1332        let skip_list = SkipListLayer::new(100);
1333        let items = [Item::new(TestKey(1..1), 1)];
1334
1335        skip_list.merge_into(items[0].clone(), &items[0].key, |_left, _right| {
1336            panic!("Unexpected merge!");
1337        });
1338
1339        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1340        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1341        assert_eq!((key, value), (&items[0].key, &items[0].value));
1342        iter.advance().await.unwrap();
1343        assert!(iter.get().is_none());
1344    }
1345
1346    #[fuchsia::test]
1347    async fn test_seek_uses_minimum_number_of_iterators() {
1348        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1349        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(1..1), 2)];
1350        skip_lists[0].insert(items[0].clone()).expect("insert error");
1351        skip_lists[1].insert(items[1].clone()).expect("insert error");
1352        let mut merger = Merger::new(
1353            layer_ref_iter(&skip_lists),
1354            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1355            counters(),
1356        );
1357        let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1358
1359        // Seek should only search in the first skip list, so no merge should take place, and we'll
1360        // know if it has because we'll see a different value (2 rather than 1).
1361        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1362        assert_eq!((key, value), (&items[0].key, &items[0].value));
1363    }
1364
1365    // Checks that merging the given layers produces |expected| sequence of items starting from
1366    // |start|.
1367    async fn test_advance<K: Eq + Key + LayerKey + OrdLowerBound>(
1368        layers: &[&[(K, i32)]],
1369        query: Query<'_, K>,
1370        expected: &[(K, i32)],
1371    ) {
1372        let mut skip_lists = Vec::new();
1373        for &layer in layers {
1374            let skip_list = SkipListLayer::new(100);
1375            for (k, v) in layer {
1376                skip_list.insert(Item::new(k.clone(), *v)).expect("insert error");
1377            }
1378            skip_lists.push(skip_list);
1379        }
1380        let mut merger = Merger::new(
1381            layer_ref_iter(&skip_lists),
1382            |_left, _right| MergeResult::EmitLeft,
1383            counters(),
1384        );
1385        let mut iter = merger.query(query).await.expect("seek failed");
1386        for (k, v) in expected {
1387            let ItemRef { key, value, .. } = iter.get().expect("get failed");
1388            assert_eq!((key, value), (k, v));
1389            iter.advance().await.expect("advance failed");
1390        }
1391        assert!(iter.get().is_none());
1392    }
1393
1394    #[fuchsia::test]
1395    async fn test_seek_skips_replaced_items() {
1396        // The 1..2 and the 2..3 items are overwritten and merging them should be skipped.
1397        test_advance(
1398            &[
1399                &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(4..5), 3)],
1400                &[(TestKey(1..2), 4), (TestKey(2..3), 5), (TestKey(3..4), 6)],
1401            ],
1402            Query::FullRange(&TestKey(1..2)),
1403            &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(3..4), 6), (TestKey(4..5), 3)],
1404        )
1405        .await;
1406    }
1407
1408    #[fuchsia::test]
1409    async fn test_advance_skips_replaced_items_at_end() {
1410        // Like the last test, the 1..2 item is overwritten and seeking for it should skip the merge
1411        // but this time, the keys are at the end.
1412        test_advance(
1413            &[&[(TestKey(1..2), 1)], &[(TestKey(1..2), 2)]],
1414            Query::FullRange(&TestKey(1..2)),
1415            &[(TestKey(1..2), 1)],
1416        )
1417        .await;
1418    }
1419
1420    #[derive(
1421        Clone,
1422        Eq,
1423        Hash,
1424        FuzzyHash,
1425        PartialEq,
1426        Debug,
1427        serde::Serialize,
1428        serde::Deserialize,
1429        TypeFingerprint,
1430        Versioned,
1431    )]
1432    struct TestKeyWithFullMerge(Range<u64>);
1433
1434    versioned_type! { 1.. => TestKeyWithFullMerge }
1435
1436    impl LayerKey for TestKeyWithFullMerge {
1437        fn merge_type(&self) -> MergeType {
1438            MergeType::FullMerge
1439        }
1440    }
1441
1442    impl SortByU64 for TestKeyWithFullMerge {
1443        fn get_leading_u64(&self) -> u64 {
1444            self.0.start
1445        }
1446    }
1447
1448    impl OrdUpperBound for TestKeyWithFullMerge {
1449        fn cmp_upper_bound(&self, other: &TestKeyWithFullMerge) -> std::cmp::Ordering {
1450            self.0.end.cmp(&other.0.end)
1451        }
1452    }
1453
1454    impl OrdLowerBound for TestKeyWithFullMerge {
1455        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1456            self.0.start.cmp(&other.0.start)
1457        }
1458    }
1459
1460    // Same set up as `test_seek_skips_replaced_items` except here nothing should skip. Any seek
1461    // should just place the iterator at a particular spot in the merged layers.
1462    #[fuchsia::test]
1463    async fn test_full_merge_consistent_advance_ordering() {
1464        let layer_set = [
1465            [
1466                (TestKeyWithFullMerge(1..2), 1i32),
1467                (TestKeyWithFullMerge(2..3), 2i32),
1468                (TestKeyWithFullMerge(4..5), 3i32),
1469            ]
1470            .as_slice(),
1471            [
1472                (TestKeyWithFullMerge(1..2), 4i32),
1473                (TestKeyWithFullMerge(2..3), 5i32),
1474                (TestKeyWithFullMerge(3..4), 6i32),
1475            ]
1476            .as_slice(),
1477        ];
1478
1479        let full_merge_result = [
1480            (TestKeyWithFullMerge(1..2), 1),
1481            (TestKeyWithFullMerge(1..2), 4),
1482            (TestKeyWithFullMerge(2..3), 2),
1483            (TestKeyWithFullMerge(2..3), 5),
1484            (TestKeyWithFullMerge(3..4), 6),
1485            (TestKeyWithFullMerge(4..5), 3),
1486        ];
1487
1488        test_advance(layer_set.as_slice(), Query::FullScan, &full_merge_result).await;
1489
1490        test_advance(
1491            layer_set.as_slice(),
1492            Query::FullRange(&TestKeyWithFullMerge(1..2)),
1493            &full_merge_result,
1494        )
1495        .await;
1496
1497        test_advance(
1498            layer_set.as_slice(),
1499            Query::FullRange(&TestKeyWithFullMerge(2..3)),
1500            &full_merge_result[2..],
1501        )
1502        .await;
1503
1504        test_advance(
1505            layer_set.as_slice(),
1506            Query::FullRange(&TestKeyWithFullMerge(3..4)),
1507            &full_merge_result[4..],
1508        )
1509        .await;
1510    }
1511
1512    #[fuchsia::test]
1513    async fn test_full_merge_always_consult_all_layers() {
1514        // | 1 |   |
1515        // |   | 2 |
1516        // | 3 | 4 |
1517        let skip_lists =
1518            [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1519        let items = [
1520            Item::new(TestKeyWithFullMerge(1..2), 1),
1521            Item::new(TestKeyWithFullMerge(2..3), 2),
1522            Item::new(TestKeyWithFullMerge(1..2), 3),
1523            Item::new(TestKeyWithFullMerge(2..3), 4),
1524        ];
1525        skip_lists[0].insert(items[0].clone()).expect("insert error");
1526        skip_lists[1].insert(items[1].clone()).expect("insert error");
1527        skip_lists[2].insert(items[2].clone()).expect("insert error");
1528        skip_lists[2].insert(items[3].clone()).expect("insert error");
1529        let mut merger = Merger::new(
1530            layer_ref_iter(&skip_lists),
1531            |left, right| {
1532                // Sum matching keys.
1533                if left.key() == right.key() {
1534                    MergeResult::Other {
1535                        emit: None,
1536                        left: Discard,
1537                        right: Replace(
1538                            Item::new(left.key().clone(), left.value() + right.value()).boxed(),
1539                        ),
1540                    }
1541                } else {
1542                    MergeResult::EmitLeft
1543                }
1544            },
1545            counters(),
1546        );
1547        let mut iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1548
1549        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1550        assert_eq!((key, *value), (&items[0].key, items[0].value + items[2].value));
1551        iter.advance().await.expect("advance");
1552        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1553        assert_eq!((key, *value), (&items[1].key, items[1].value + items[3].value));
1554        iter.advance().await.expect("advance");
1555        assert!(iter.get().is_none());
1556    }
1557
    /// Range-based test key that wraps a `Range<u64>` and is paired with an empty `LayerKey`
    /// implementation, i.e. it uses whatever defaults that trait provides.
    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKeyWithDefaultLayerKey(Range<u64>);
1571
    // NOTE(review): presumably registers `TestKeyWithDefaultLayerKey` as the representation used
    // for every version from 1 onwards -- confirm against the `versioned_type!` macro definition.
    versioned_type! { 1.. => TestKeyWithDefaultLayerKey }
1573
    // Deliberately empty: this key relies on the trait's default methods, i.e. the default
    // `merge_type()` of `MergeType::FullMerge` and the default `next_key()` which returns `None`.
    impl LayerKey for TestKeyWithDefaultLayerKey {}
1576
1577    impl SortByU64 for TestKeyWithDefaultLayerKey {
1578        fn get_leading_u64(&self) -> u64 {
1579            self.0.start
1580        }
1581    }
1582
1583    impl OrdUpperBound for TestKeyWithDefaultLayerKey {
1584        fn cmp_upper_bound(&self, other: &TestKeyWithDefaultLayerKey) -> std::cmp::Ordering {
1585            self.0.end.cmp(&other.0.end)
1586        }
1587    }
1588
1589    impl OrdLowerBound for TestKeyWithDefaultLayerKey {
1590        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1591            self.0.start.cmp(&other.0.start)
1592        }
1593    }
1594
1595    #[fuchsia::test]
1596    async fn test_no_merge_unbounded_include_all_layers() {
1597        test_advance(
1598            &[
1599                &[
1600                    (TestKeyWithDefaultLayerKey(1..2), 1),
1601                    (TestKeyWithDefaultLayerKey(2..3), 2),
1602                    (TestKeyWithDefaultLayerKey(4..5), 3),
1603                ],
1604                &[
1605                    (TestKeyWithDefaultLayerKey(1..2), 4),
1606                    (TestKeyWithDefaultLayerKey(2..3), 5),
1607                    (TestKeyWithDefaultLayerKey(3..4), 6),
1608                ],
1609            ],
1610            Query::FullScan,
1611            &[
1612                (TestKeyWithDefaultLayerKey(1..2), 1),
1613                (TestKeyWithDefaultLayerKey(1..2), 4),
1614                (TestKeyWithDefaultLayerKey(2..3), 2),
1615                (TestKeyWithDefaultLayerKey(2..3), 5),
1616                (TestKeyWithDefaultLayerKey(3..4), 6),
1617                (TestKeyWithDefaultLayerKey(4..5), 3),
1618            ],
1619        )
1620        .await;
1621    }
1622
1623    #[fuchsia::test]
1624    async fn test_no_merge_proceeds_comprehensively_after_seek() {
1625        test_advance(
1626            &[
1627                &[
1628                    (TestKeyWithDefaultLayerKey(1..2), 1),
1629                    (TestKeyWithDefaultLayerKey(2..3), 2),
1630                    (TestKeyWithDefaultLayerKey(4..5), 3),
1631                ],
1632                &[
1633                    (TestKeyWithDefaultLayerKey(1..2), 4),
1634                    (TestKeyWithDefaultLayerKey(2..3), 5),
1635                    (TestKeyWithDefaultLayerKey(3..4), 6),
1636                ],
1637            ],
1638            Query::FullRange(&TestKeyWithDefaultLayerKey(1..2)),
1639            &[
1640                (TestKeyWithDefaultLayerKey(1..2), 1),
1641                (TestKeyWithDefaultLayerKey(1..2), 4),
1642                (TestKeyWithDefaultLayerKey(2..3), 2),
1643                (TestKeyWithDefaultLayerKey(2..3), 5),
1644                (TestKeyWithDefaultLayerKey(3..4), 6),
1645                (TestKeyWithDefaultLayerKey(4..5), 3),
1646            ],
1647        )
1648        .await;
1649    }
1650
1651    #[fuchsia::test]
1652    async fn test_no_merge_seek_finds_lower_layer() {
1653        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1654        let items = [
1655            Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1656            Item::new(TestKeyWithDefaultLayerKey(1..1), 2),
1657        ];
1658        skip_lists[0].insert(items[0].clone()).expect("insert error");
1659        skip_lists[1].insert(items[1].clone()).expect("insert error");
1660        let mut merger = Merger::new(
1661            layer_ref_iter(&skip_lists),
1662            |_left, _right| MergeResult::EmitLeft,
1663            counters(),
1664        );
1665        let iter = merger.query(Query::FullRange(&items[1].key)).await.expect("seek failed");
1666
1667        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1668        assert_eq!((key, value), (&items[1].key, &items[1].value));
1669    }
1670
1671    #[fuchsia::test]
1672    async fn test_no_merge_seek_stops_at_exact_match() {
1673        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1674        let items = [
1675            Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1676            Item::new(TestKeyWithDefaultLayerKey(1..4), 2),
1677        ];
1678        skip_lists[0].insert(items[0].clone()).expect("insert error");
1679        skip_lists[1].insert(items[1].clone()).expect("insert error");
1680        let mut merger = Merger::new(
1681            layer_ref_iter(&skip_lists),
1682            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1683            counters(),
1684        );
1685        let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1686
1687        // Seek should only search in the first skip list, so no merge should take place, and we'll
1688        // know if it has because we'll see a different value (2 rather than 1).
1689        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1690        assert_eq!((key, value), (&items[0].key, &items[0].value));
1691    }
1692
1693    #[fuchsia::test]
1694    async fn test_seek_less_than() {
1695        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1696        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1697        skip_lists[0].insert(items[0].clone()).expect("insert error");
1698        skip_lists[1].insert(items[1].clone()).expect("insert error");
1699        // Search for a key before 1..1.
1700        let mut merger = Merger::new(
1701            layer_ref_iter(&skip_lists),
1702            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1703            counters(),
1704        );
1705        let iter = merger.query(Query::FullRange(&TestKey(0..0))).await.expect("seek failed");
1706
1707        // This should find the 2..2 key because of our merge function.
1708        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1709        assert_eq!((key, value), (&items[1].key, &items[1].value));
1710    }
1711
1712    #[fuchsia::test]
1713    async fn test_seek_to_end() {
1714        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1715        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1716        skip_lists[0].insert(items[0].clone()).expect("insert error");
1717        skip_lists[1].insert(items[1].clone()).expect("insert error");
1718        let mut merger = Merger::new(
1719            layer_ref_iter(&skip_lists),
1720            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1721            counters(),
1722        );
1723        let iter = merger.query(Query::FullRange(&TestKey(3..3))).await.expect("seek failed");
1724
1725        assert!(iter.get().is_none());
1726    }
1727
1728    #[fuchsia::test]
1729    async fn test_merge_all_discarded() {
1730        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1731        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1732        skip_lists[0].insert(items[1].clone()).expect("insert error");
1733        skip_lists[1].insert(items[0].clone()).expect("insert error");
1734        let mut merger = Merger::new(
1735            layer_ref_iter(&skip_lists),
1736            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Discard },
1737            counters(),
1738        );
1739        let iter = merger.query(Query::FullScan).await.expect("seek failed");
1740        assert!(iter.get().is_none());
1741    }
1742
1743    #[fuchsia::test]
1744    async fn test_seek_with_merged_key_less_than() {
1745        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1746        let items = [Item::new(TestKey(1..8), 1), Item::new(TestKey(2..10), 2)];
1747        skip_lists[0].insert(items[0].clone()).expect("insert error");
1748        skip_lists[1].insert(items[1].clone()).expect("insert error");
1749        let mut merger = Merger::new(
1750            layer_ref_iter(&skip_lists),
1751            |left, _right| {
1752                if left.key() == &TestKey(1..8) {
1753                    MergeResult::Other {
1754                        emit: None,
1755                        left: Replace(Item::new(TestKey(1..2), 1).boxed()),
1756                        right: Keep,
1757                    }
1758                } else {
1759                    MergeResult::EmitLeft
1760                }
1761            },
1762            counters(),
1763        );
1764        let iter = merger.query(Query::FullRange(&TestKey(0..3))).await.expect("seek failed");
1765        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1766        assert_eq!((key, value), (&items[1].key, &items[1].value));
1767    }
1768
1769    #[fuchsia::test]
1770    async fn test_overlapping_keys() {
1771        let skip_lists =
1772            [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1773        let items = [
1774            Item::new(TestKey(0..10), 1),
1775            Item::new(TestKey(0..20), 2),
1776            Item::new(TestKey(0..30), 3),
1777        ];
1778        skip_lists[0].insert(items[0].clone()).expect("insert error");
1779        skip_lists[1].insert(items[1].clone()).expect("insert error");
1780        skip_lists[2].insert(items[2].clone()).expect("insert error");
1781        let mut merger = Merger::new(
1782            layer_ref_iter(&skip_lists),
1783            |left, right| {
1784                let result = if left.key().0.end <= right.key().0.start {
1785                    MergeResult::EmitLeft
1786                } else {
1787                    if left.key() == &TestKey(0..30) && right.key() == &TestKey(10..20) {
1788                        MergeResult::Other {
1789                            emit: Some(Item::new(TestKey(0..10), 1).boxed()),
1790                            left: Replace(Item::new(TestKey(10..30), 1).boxed()),
1791                            right: Keep,
1792                        }
1793                    } else {
1794                        MergeResult::Other {
1795                            emit: None,
1796                            left: Keep,
1797                            right: Replace(
1798                                Item::new(TestKey(left.key().0.end..right.key().0.end), 1).boxed(),
1799                            ),
1800                        }
1801                    }
1802                };
1803                result
1804            },
1805            counters(),
1806        );
1807        let mut iter = merger.query(Query::FullRange(&TestKey(0..1))).await.expect("seek failed");
1808        let ItemRef { key, .. } = iter.get().expect("missing item");
1809        assert_eq!(key, &TestKey(0..10));
1810        iter.advance().await.expect("advance failed");
1811        let ItemRef { key, .. } = iter.get().expect("missing item");
1812        assert_eq!(key, &TestKey(10..20));
1813        iter.advance().await.expect("advance failed");
1814        let ItemRef { key, .. } = iter.get().expect("missing item");
1815        assert_eq!(key, &TestKey(20..30));
1816        iter.advance().await.expect("advance failed");
1817        assert_eq!(iter.get(), None);
1818    }
1819
1820    async fn write_layer<K: Key, V: Value>(items: Vec<Item<K, V>>) -> Arc<dyn Layer<K, V>> {
1821        let object = Arc::new(FakeObject::new());
1822        let write_handle = FakeObjectHandle::new(object.clone());
1823        let mut writer =
1824            PersistentLayerWriter::<_, K, V>::new(Writer::new(&write_handle).await, 1, 512)
1825                .await
1826                .expect("PersistentLayerWriter::new failed");
1827        for item in items {
1828            writer.write(item.as_item_ref()).await.expect("write failed");
1829        }
1830        writer.flush().await.expect("flush failed");
1831        PersistentLayer::open(FakeObjectHandle::new(object))
1832            .await
1833            .expect("open_persistent_layer failed")
1834    }
1835
1836    fn merge_sum(
1837        left: &MergeLayerIterator<'_, i32, i32>,
1838        right: &MergeLayerIterator<'_, i32, i32>,
1839    ) -> MergeResult<i32, i32> {
1840        // Sum matching keys.
1841        if left.key() == right.key() {
1842            MergeResult::Other {
1843                emit: None,
1844                left: Discard,
1845                right: Replace(Item::new(left.key().clone(), left.value() + right.value()).boxed()),
1846            }
1847        } else {
1848            MergeResult::EmitLeft
1849        }
1850    }
1851
1852    #[fuchsia::test]
1853    async fn test_merge_bloom_filters_point_query() {
1854        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1855        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1856        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1857        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1858            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1859        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1860
1861        {
1862            // Key that exists in layer 0 only
1863            let iter = merger.query(Query::Point(&1)).await.expect("seek failed");
1864            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1865            assert_eq!((key, *value), (&items[0].key, items[0].value));
1866        }
1867        {
1868            // Key that exists in layer 1 and 2
1869            let iter = merger.query(Query::Point(&2)).await.expect("seek failed");
1870            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1871            assert_eq!((key, *value), (&items[1].key, items[1].value));
1872        }
1873        {
1874            // Key that exists in layer 1
1875            let iter = merger.query(Query::Point(&4)).await.expect("seek failed");
1876            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1877            assert_eq!((key, *value), (&items[2].key, items[2].value));
1878        }
1879        {
1880            // Key that doesn't exist at all
1881            let iter = merger.query(Query::Point(&400)).await.expect("seek failed");
1882            assert!(iter.get().is_none());
1883        }
1884    }
1885
1886    #[fuchsia::test]
1887    async fn test_merge_bloom_filters_limited_range() {
1888        // NB: This test uses ObjectKey so that we don't have to reimplement the complex merging
1889        // logic for range-like keys.
1890        let layer_0_items = vec![Item::new(
1891            ObjectKey::extent(0, 0, 0..2048),
1892            ObjectValue::extent(0, VOLUME_DATA_KEY_ID),
1893        )];
1894        let layer_1_items = vec![
1895            Item::new(
1896                ObjectKey::extent(0, 0, 1024..4096),
1897                ObjectValue::extent(32768, VOLUME_DATA_KEY_ID),
1898            ),
1899            Item::new(
1900                ObjectKey::extent(0, 0, 16384..17408),
1901                ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1902            ),
1903        ];
1904        let items = [
1905            Item::new(ObjectKey::extent(0, 0, 0..2048), ObjectValue::extent(0, VOLUME_DATA_KEY_ID)),
1906            Item::new(
1907                ObjectKey::extent(0, 0, 2048..4096),
1908                ObjectValue::extent(33792, VOLUME_DATA_KEY_ID),
1909            ),
1910            Item::new(
1911                ObjectKey::extent(0, 0, 16384..17408),
1912                ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1913            ),
1914        ];
1915        let layers: [Arc<dyn Layer<ObjectKey, ObjectValue>>; 2] =
1916            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1917        let mut merger =
1918            Merger::new(dyn_layer_ref_iter(&layers), object_store::merge::merge, counters());
1919
1920        {
1921            // Range contains just keys in layer 1
1922            let mut iter = merger
1923                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 16384..16386)))
1924                .await
1925                .expect("seek failed");
1926            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1927            assert_eq!(key, &items[2].key);
1928            assert_eq!(value, &items[2].value);
1929            iter.advance().await.expect("advance");
1930            assert!(iter.get().is_none());
1931        }
1932        {
1933            // Range contains keys in layer 0 and 1
1934            let mut iter = merger
1935                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 0..4096)))
1936                .await
1937                .expect("seek failed");
1938            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1939            assert_eq!(key, &items[0].key);
1940            assert_eq!(value, &items[0].value);
1941            iter.advance().await.expect("advance");
1942            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1943            assert_eq!(key, &items[1].key);
1944            assert_eq!(value, &items[1].value);
1945            iter.advance().await.expect("advance");
1946        }
1947        {
1948            // Range contains no keys
1949            let mut iter = merger
1950                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 8192..12288)))
1951                .await
1952                .expect("seek failed");
1953            let ItemRef { key, .. } = iter.get().expect("missing item");
1954            assert_eq!(key, &items[2].key);
1955            iter.advance().await.expect("advance");
1956            assert!(iter.get().is_none());
1957        }
1958    }
1959
1960    #[fuchsia::test]
1961    async fn test_merge_bloom_filters_full_range() {
1962        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1963        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1964        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1965        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1966            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1967        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1968
1969        let mut iter = merger.query(Query::FullRange(&0)).await.expect("seek failed");
1970        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1971        assert_eq!((key, *value), (&items[0].key, items[0].value));
1972        iter.advance().await.expect("advance");
1973        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1974        assert_eq!((key, *value), (&items[1].key, items[1].value));
1975        iter.advance().await.expect("advance");
1976        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1977        assert_eq!((key, *value), (&items[2].key, items[2].value));
1978        iter.advance().await.expect("advance");
1979        assert!(iter.get().is_none());
1980    }
1981
1982    #[fuchsia::test]
1983    async fn test_merge_bloom_filters_full_scan() {
1984        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1985        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1986        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1987        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1988            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1989        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1990
1991        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1992        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1993        assert_eq!((key, *value), (&items[0].key, items[0].value));
1994        iter.advance().await.expect("advance");
1995        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1996        assert_eq!((key, *value), (&items[1].key, items[1].value));
1997        iter.advance().await.expect("advance");
1998        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1999        assert_eq!((key, *value), (&items[2].key, items[2].value));
2000        iter.advance().await.expect("advance");
2001        assert!(iter.get().is_none());
2002    }
2003}