Skip to main content

fxfs/lsm_tree/
merge.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::log::*;
6use crate::lsm_tree;
7use crate::lsm_tree::types::{
8    BoxedItem, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut, LayerKey, MergeType,
9    OrdLowerBound, Value,
10};
11use anyhow::Error;
12use async_trait::async_trait;
13use futures::try_join;
14use std::cmp::Ordering;
15use std::collections::BinaryHeap;
16use std::fmt::{Debug, Write};
17use std::ops::{Bound, Deref, DerefMut};
18use std::sync::{Arc, atomic};
19
/// The action the merger takes on one of the two items (left or right) that were presented to a
/// `MergeFn`, as part of `MergeResult::Other`.
#[derive(Debug, Eq, PartialEq)]
pub enum ItemOp<K, V> {
    /// Keeps the item to be presented to the merger subsequently with a new merge pair.
    Keep,

    /// Discards the item and moves on to the next item in the respective layer.
    Discard,

    /// Replaces the item with something new which will be presented to the merger subsequently with
    /// a new pair.
    Replace(BoxedItem<K, V>),
}
32
/// The outcome of calling a `MergeFn` on a (left, right) pair of items.
#[derive(Debug, Eq, PartialEq)]
pub enum MergeResult<K, V> {
    /// Emits the left item unchanged. Keeps the right item. This is the common case. Once an item
    /// has been emitted, it will never be seen again by the merge function.
    EmitLeft,

    /// All other merge results are covered by the following. Take care when replacing items
    /// that you replace the correct item. The merger will never merge two items together from
    /// the same layer. Consider the following scenario:
    ///
    ///        +-----------+              +-----------+
    /// 0:     |    A      |              |    C      |
    ///        +-----------+--------------+-----------+
    /// 1:                 |      B       |
    ///                    +--------------+
    ///
    /// Let's say that all three items can be merged together. The merge function will first be
    /// presented with items A and B, at which point it has the option of replacing the left item
    /// (i.e. A, in layer 0) or the right item (i.e. B in layer 1). However, if you replace the left
    /// item, the merge function will not then be given the opportunity to merge it with C, so the
    /// correct thing to do in this case is to replace the right item B in layer 1, and discard the
    /// left item. A rule you can use is that you should avoid replacing an item with another item
    /// whose upper bound exceeds that of the item you are replacing.
    ///
    /// There are some combinations that might lead to infinite loops and should obviously be
    /// avoided. For example, `emit: None, left: Keep, right: Keep` pushes the same two items back
    /// unchanged and emits nothing, so the merger would be handed the same pair forever.
    Other { emit: Option<BoxedItem<K, V>>, left: ItemOp<K, V>, right: ItemOp<K, V> },
}
61
/// Users must provide a merge function which will take pairs of items, left and right, and return a
/// merge result. The left item's key will either be less than the right item's key, or if they are
/// the same, then the left item will be in a lower layer index (lower layer indexes indicate more
/// recent entries). The last remaining item is always emitted.
///
/// This is a plain function pointer rather than a closure, so a merge function cannot capture
/// state.
pub type MergeFn<K, V> =
    fn(&MergeLayerIterator<'_, K, V>, &MergeLayerIterator<'_, K, V>) -> MergeResult<K, V>;
68
/// What a `MergeLayerIterator` currently points at.
pub enum MergeItem<K, V> {
    /// There is no current item (the underlying iterator is exhausted, or the item was
    /// discarded).
    None,
    /// An item held outside the underlying iterator, e.g. a replacement supplied by the merge
    /// function via `ItemOp::Replace`, or the item being merged in by `merge_into`.
    Item(BoxedItem<K, V>),
    /// The current item is whatever the underlying iterator's `get()` currently returns.
    Iter,
}
74
// The layer iterator that backs a `MergeLayerIterator`.
enum RawIterator<'a, K, V> {
    // No iterator yet: layers are only seeked on demand (see `MergerIterator::push_iterators`),
    // and iterators built with `MergeLayerIterator::new_with_item` never get one.
    None,
    // A read-only iterator obtained from `Layer::seek`; advanced asynchronously.
    Const(Box<dyn LayerIterator<K, V> + 'a>),
    // An iterator over a mutable layer; in this file it is only constructed by `merge_into`.
    Mut(Box<dyn LayerIteratorMut<K, V> + 'a>),
}
80
// SAFETY: LayerIteratorMut is not Sync or Send, but it's only used by merge_into, which doesn't
// send across threads (and is sync, not async).
// NOTE(review): `RawIterator::Mut` is only constructed inside `merge_into` in this file.
// Presumably `Send` is needed so that `MergerIterator` (which holds `Const` iterators) can be used
// in `#[async_trait]` futures; confirm. Note that this impl is unconditional in `K` and `V`.
unsafe impl<K, V> Send for RawIterator<'_, K, V> {}
84
// An iterator that keeps track of where we are for each of the layers. We push these onto a
// min-heap, ordered by key (lower bound) and then by layer index (see the `Ord` impl).
pub struct MergeLayerIterator<'a, K, V> {
    // The layer being iterated. `None` when this only wraps a standalone item
    // (`new_with_item`) or a mutable-layer iterator (`merge_into`).
    layer: Option<&'a dyn Layer<K, V>>,

    // The underlying iterator. Remains `RawIterator::None` until the layer is seeked.
    iter: RawIterator<'a, K, V>,

    // The index of the layer this is for. Lower indexes are more recent layers and win ties
    // when keys compare equal.
    pub layer_index: u16,

    // The item we are currently pointing at.
    item: MergeItem<K, V>,
}
99
100impl<'a, K, V> MergeLayerIterator<'a, K, V> {
101    pub fn key(&self) -> &K {
102        self.item().key
103    }
104
105    pub fn value(&self) -> &V {
106        self.item().value
107    }
108
109    fn new(layer_index: u16, layer: &'a dyn Layer<K, V>) -> Self {
110        MergeLayerIterator {
111            layer: Some(layer),
112            iter: RawIterator::None,
113            layer_index,
114            item: MergeItem::None,
115        }
116    }
117
118    fn new_with_item(layer_index: u16, item: MergeItem<K, V>) -> Self {
119        MergeLayerIterator { layer: None, iter: RawIterator::None, layer_index, item }
120    }
121
122    fn item(&self) -> ItemRef<'_, K, V> {
123        match &self.item {
124            MergeItem::None => panic!("No item!"),
125            MergeItem::Item(item) => item.as_item_ref(),
126            MergeItem::Iter => self.get().unwrap(),
127        }
128    }
129
130    fn get(&self) -> Option<ItemRef<'_, K, V>> {
131        match &self.iter {
132            RawIterator::None => panic!("No iterator!"),
133            RawIterator::Const(iter) => iter.get(),
134            RawIterator::Mut(iter) => iter.get(),
135        }
136    }
137
138    fn set_item_from_iter(&mut self) {
139        self.item = if match &self.iter {
140            RawIterator::None => unreachable!(),
141            RawIterator::Const(iter) => iter.get(),
142            RawIterator::Mut(iter) => iter.get(),
143        }
144        .is_some()
145        {
146            MergeItem::Iter
147        } else {
148            MergeItem::None
149        };
150    }
151
152    fn take_item(&mut self) -> Option<BoxedItem<K, V>> {
153        if matches!(self.item, MergeItem::Item(_)) {
154            if let MergeItem::Item(item) = std::mem::replace(&mut self.item, MergeItem::None) {
155                return Some(item);
156            }
157        }
158        None
159    }
160
161    async fn advance(&mut self) -> Result<(), Error> {
162        if let MergeItem::Iter = self.item {
163            if let RawIterator::Const(iter) = &mut self.iter {
164                iter.advance().await?;
165            } else {
166                // This will never get called in the RawIterator::Mut case.
167                unreachable!();
168            }
169        }
170        self.set_item_from_iter();
171        Ok(())
172    }
173
174    fn replace(&mut self, item: BoxedItem<K, V>) {
175        self.item = MergeItem::Item(item);
176    }
177
178    fn is_some(&self) -> bool {
179        !matches!(self.item, MergeItem::None)
180    }
181
182    // This function exists so that we can advance multiple iterators concurrently using, say,
183    // try_join!.
184    async fn maybe_advance(&mut self, op: &ItemOp<K, V>) -> Result<(), Error> {
185        if let ItemOp::Keep = op { Ok(()) } else { self.advance().await }
186    }
187}
188
189// -- Ord and friends --
190impl<K: OrdLowerBound, V> Ord for MergeLayerIterator<'_, K, V> {
191    fn cmp(&self, other: &Self) -> Ordering {
192        // Reverse ordering because we want min-heap not max-heap.
193        other.key().cmp_lower_bound(self.key()).then(other.layer_index.cmp(&self.layer_index))
194    }
195}
196impl<K: OrdLowerBound, V> PartialOrd for MergeLayerIterator<'_, K, V> {
197    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
198        Some(self.cmp(other))
199    }
200}
201impl<K: OrdLowerBound, V> PartialEq for MergeLayerIterator<'_, K, V> {
202    fn eq(&self, other: &Self) -> bool {
203        self.cmp(other) == Ordering::Equal
204    }
205}
206impl<K: OrdLowerBound, V> Eq for MergeLayerIterator<'_, K, V> {}
207
// As we merge items, the current item can be an item that has been replaced (and later emitted) by
// the merge function, or an item referenced by an iterator, or nothing.
enum CurrentItem<'a, 'b, K, V> {
    // No current item: initially, and once the heap has been exhausted.
    None,
    // An item emitted by the merge function via `MergeResult::Other { emit: Some(..), .. }`.
    Item(BoxedItem<K, V>),
    // The current item of a layer iterator. The iterator is held here, off the heap, while its
    // item is current.
    Iterator(&'a mut MergeLayerIterator<'b, K, V>),
}
215
216impl<'a, 'b, K, V> CurrentItem<'a, 'b, K, V> {
217    fn take_iterator(&mut self) -> Option<&'a mut MergeLayerIterator<'b, K, V>> {
218        if matches!(self, CurrentItem::Iterator(_)) {
219            if let CurrentItem::Iterator(iter) = std::mem::replace(self, CurrentItem::None) {
220                return Some(iter);
221            }
222        }
223        None
224    }
225}
226
227impl<'a, K, V> From<&'a CurrentItem<'_, '_, K, V>> for Option<ItemRef<'a, K, V>> {
228    fn from(iter: &'a CurrentItem<'_, '_, K, V>) -> Option<ItemRef<'a, K, V>> {
229        match iter {
230            CurrentItem::None => None,
231            CurrentItem::Iterator(iterator) => Some(iterator.item()),
232            CurrentItem::Item(item) => Some(item.as_item_ref()),
233        }
234    }
235}
236
/// Merger is the main entry point to merging. It holds one `MergeLayerIterator` per layer; use
/// `Merger::query` to obtain a `MergerIterator` over the merged view.
pub struct Merger<'a, K, V> {
    // A buffer containing all the MergeLayerIterator objects, in layer order (index 0 first).
    iterators: Vec<MergeLayerIterator<'a, K, V>>,

    // The function to be used for merging items.
    merge_fn: MergeFn<K, V>,

    // If true, additional logging is enabled (copied into each `MergerIterator`).
    trace: bool,

    // Tracks statistics related to the LSM tree.
    counters: Arc<lsm_tree::TreeCounters>,
}
251
/// Query describes the goal of a search in the LSM tree.  The caller specifies this to guide the
/// merger on which layer files it must consult.
/// Layers might be skipped during a query for a few reasons:
///   * Existence filters might allow layer files to be skipped.
///   * For bounded queries (i.e. any except FullScan), if the search key has
///     MergeType::OptimizedMerge, we can use that to omit older layers once we find a match.
#[derive(Debug, Clone)]
pub enum Query<'a, K: Key + LayerKey + OrdLowerBound> {
    /// Point queries look for a specific key in the LSM tree.  In this case, the existence filters
    /// for each layer file can be used to decide if the layer file needs to be consulted.
    /// Note that it is an error to use `Point` for range-like keys.  Either `LimitedRange` or
    /// `FullRange` must be used instead.
    /// The merger seeks to the key itself (inclusive).
    Point(&'a K),
    /// LimitedRange queries position the iterator to `K::search_key`, and scans forward until the
    /// first record which does not overlap the provided key.  In this case, the existence filters
    /// for each layer file can be used, but we have to check for all possible keys in the range we
    /// wish to search.  Obviously, that means that the range should be, well, limited.  Fuzzy
    /// hashes permit this for extent-like keys, but these queries are not the right choice for
    /// things like searching a directory.
    LimitedRange(&'a K),
    /// FullRange queries position the iterator to a starting key, and scan forward to the
    /// first key of a different type.  In this case, the existence filters are not used.
    /// The merger seeks to the starting key itself (inclusive).
    FullRange(&'a K),
    /// FullScan queries are intended to yield every record in the tree.  In this case, the
    /// existence filters are not used, and the seek is unbounded.
    FullScan,
}
279
280impl<'a, K: Key + LayerKey + OrdLowerBound> Query<'a, K> {
281    fn needs_layer<V>(&self, layer: &dyn Layer<K, V>) -> bool {
282        match self {
283            Self::Point(key) => layer.maybe_contains_key(key),
284            Self::LimitedRange(key) => layer.maybe_contains_key(key),
285            Self::FullRange(_) => true,
286            Self::FullScan => true,
287        }
288    }
289}
290
291#[fxfs_trace::trace]
292impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> Merger<'a, K, V> {
293    pub(super) fn new<I: Iterator<Item = &'a dyn Layer<K, V>>>(
294        layers: I,
295        merge_fn: MergeFn<K, V>,
296        counters: Arc<lsm_tree::TreeCounters>,
297    ) -> Merger<'a, K, V> {
298        Merger {
299            iterators: layers
300                .enumerate()
301                .map(|(index, layer)| MergeLayerIterator::new(index as u16, layer))
302                .collect(),
303            merge_fn: merge_fn,
304            trace: false,
305            counters,
306        }
307    }
308
309    /// Executes `query`, positioning the iterator to the first matching item.  See `Query` for
310    /// details.
311    #[trace]
312    pub async fn query(
313        &mut self,
314        query: Query<'_, K>,
315    ) -> Result<MergerIterator<'_, 'a, K, V>, Error> {
316        if let Query::Point(key) = query {
317            // NB: It is almost certainly an error to provide a range-like key to a Point query,
318            // hence this debug assertion.  This will most likely result in the wrong result, since
319            // the starting bound would be the original range key and therefore we might miss
320            // records that start earlier but overlap with the key.
321            // For example, if there is a layer which contains 0..100 and 100..200, and we search
322            // for 50..150, we should get both extents, which requires a different starting bound
323            // than 50..150 (particularly it would require the `K::search_key`).
324            debug_assert!(!key.is_range_key())
325        };
326        let len = self.iterators.len();
327        let pending_iterators = {
328            fxfs_trace::duration!("Merger::filter_layer_files", "len" => len);
329            self.iterators
330                .iter_mut()
331                .rev()
332                .filter(|l| query.needs_layer(l.layer.clone().unwrap()))
333                .collect::<Vec<&mut MergeLayerIterator<'a, K, V>>>()
334        };
335        let layer_count = pending_iterators.len();
336        {
337            self.counters.num_seeks.fetch_add(1, atomic::Ordering::Relaxed);
338            self.counters.layer_files_total.fetch_add(len, atomic::Ordering::Relaxed);
339            self.counters
340                .layer_files_skipped
341                .fetch_add(len - layer_count, atomic::Ordering::Relaxed);
342        }
343        log::debug!(query:?; "Consulting {}/{} layers", layer_count, len);
344        let mut merger_iter = MergerIterator {
345            merge_fn: self.merge_fn,
346            pending_iterators,
347            heap: BinaryHeap::with_capacity(layer_count),
348            item: CurrentItem::None,
349            trace: self.trace,
350            history: String::new(),
351        };
352        let owned_bound;
353        let bound = match query {
354            Query::Point(key) => Bound::Included(key),
355            Query::LimitedRange(key) => {
356                owned_bound = Bound::Included(key.search_key());
357                owned_bound.as_ref()
358            }
359            Query::FullRange(start) => Bound::Included(start),
360            Query::FullScan => Bound::Unbounded,
361        };
362        merger_iter.seek(bound).await?;
363        Ok(merger_iter)
364    }
365
366    pub fn set_trace(&mut self, v: bool) {
367        self.trace = v;
368    }
369}
370
/// This is an iterator that will allow iteration over merged layers.  The primary interface is via
/// the LayerIterator trait.  Instances are created by `Merger::query`.
pub struct MergerIterator<'a, 'b, K, V> {
    // The function used to merge the two lowest items on the heap.
    merge_fn: MergeFn<K, V>,

    // Iterators that we have not yet pushed onto the heap. They are popped from the back, and
    // `Merger::query` stores them in reverse layer order, so the most recent layer is consulted
    // first.
    pending_iterators: Vec<&'a mut MergeLayerIterator<'b, K, V>>,

    // A heap with the merge iterators.
    heap: BinaryHeap<&'a mut MergeLayerIterator<'b, K, V>>,

    // The current item.
    item: CurrentItem<'a, 'b, K, V>,

    // If true, logs regarding merger behaviour are appended to history.
    trace: bool,

    // Holds trace information if trace is true.
    history: String,
}
391
392impl<'a, 'b, K: Key + LayerKey + OrdLowerBound, V: Value> MergerIterator<'a, 'b, K, V> {
393    async fn seek(&mut self, bound: Bound<&K>) -> Result<(), Error> {
394        let next_key = match bound {
395            Bound::Unbounded => None,
396            Bound::Included(key) => Some(key),
397            Bound::Excluded(_) => panic!("Excluded bounds not supported!"),
398        };
399
400        self.push_iterators(next_key, bound).await?;
401        self.advance_impl(bound).await
402    }
403
404    // Merges items from an array of layers using the provided merge function. The merge function is
405    // repeatedly provided the lowest and the second lowest element, if one exists. In cases where
406    // the two lowest elements compare equal, the element with the lowest layer (i.e. whichever
407    // comes first in the layers array) will come first.  `next_key_bound` is a bound for the next
408    // key we expect.
409    async fn advance_impl(&mut self, next_key_bound: Bound<&K>) -> Result<(), Error> {
410        loop {
411            loop {
412                if self.heap.is_empty() {
413                    self.item = CurrentItem::None;
414                    return Ok(());
415                }
416                let lowest = self.heap.pop().unwrap();
417                let maybe_second_lowest = self.heap.pop();
418                if let Some(second_lowest) = maybe_second_lowest {
419                    let result = (self.merge_fn)(&lowest, &second_lowest);
420                    if self.trace {
421                        writeln!(
422                            self.history,
423                            "merge {:?}, {:?} -> {:?}",
424                            lowest.item(),
425                            second_lowest.item(),
426                            result
427                        )
428                        .unwrap();
429                    }
430                    match result {
431                        MergeResult::EmitLeft => {
432                            self.heap.push(second_lowest);
433                            self.item = CurrentItem::Iterator(lowest);
434                            break;
435                        }
436                        MergeResult::Other { emit, left, right } => {
437                            try_join!(
438                                lowest.maybe_advance(&left),
439                                second_lowest.maybe_advance(&right)
440                            )?;
441                            self.update_item(lowest, left);
442                            self.update_item(second_lowest, right);
443                            if let Some(emit) = emit {
444                                self.item = CurrentItem::Item(emit);
445                                break;
446                            }
447                        }
448                    }
449                } else {
450                    self.item = CurrentItem::Iterator(lowest);
451                    break;
452                }
453            }
454
455            // If the item we're about to yield isn't within `next_key_bound`, ignore it and
456            // continue.  To see how this would happen, imagine the following scenario:
457            //
458            //       0          10          20            30
459            //       +----------+
460            //   0   |          |
461            //       +----------+-----------+
462            //   1   |                      |
463            //       +----------------------+-------------+
464            //   2   |                                    |
465            //       +------------------------------------+
466            //
467            // If we are to seek for 0..1 and then iterate over what we find, we expect the sequence
468            // 0..10, 10..20, 20..30.  After the first seek, we should only consult iterator 0.  For
469            // the first advance, we will consult iterator 1 but we need to merge the 0..20 element
470            // with the 0..10 element which we already emitted.  To make this work, we push the
471            // 0..10 item back on the heap (see advance) and then merge, but that will yield the
472            // 0..10 entry again, so here we need to skip over it, and then merge again, at which
473            // point we should see the 10..20 entry as expected.
474            match next_key_bound {
475                Bound::Included(key)
476                    if self.get().unwrap().key.cmp_upper_bound(key) == Ordering::Less => {}
477                Bound::Excluded(key)
478                    if self.get().unwrap().key.cmp_upper_bound(key) != Ordering::Greater => {}
479                _ => return Ok(()),
480            }
481            if let Some(iterator) = self.item.take_iterator() {
482                iterator.advance().await?;
483                if iterator.is_some() {
484                    self.heap.push(iterator);
485                }
486            }
487        }
488    }
489
490    // Returns whether more iterators are required for the given `next_key`.  See push_iterators.
491    fn needs_more_iterators(&self, next_key: Option<&K>, next_key_bound: Bound<&K>) -> bool {
492        if self.pending_iterators.is_empty() {
493            return false;
494        }
495        if self.heap.is_empty() {
496            return true;
497        }
498        let target;
499        match next_key_bound {
500            Bound::Included(k) => target = k,
501            Bound::Excluded(_) | Bound::Unbounded => return true,
502        }
503        match target.merge_type() {
504            MergeType::FullMerge => true,
505            MergeType::OptimizedMerge => {
506                match next_key {
507                    Some(k) => {
508                        self.heap.peek().unwrap().key().cmp_lower_bound(k) == Ordering::Greater
509                    }
510                    None => {
511                        // If we've found an exact match, return it since nothing needs to merge.
512                        self.heap.peek().unwrap().key().cmp_lower_bound(target) != Ordering::Equal
513                    }
514                }
515            }
516        }
517    }
518
519    // Pushes additional iterators onto the heap until we are confident that the top element will
520    // yield what we are looking for.  If next_key is set, we will stop pushing iterators as soon as
521    // a key is encountered that equals or precedes it.  If next_key is None, then all layers are
522    // pushed.  next_key_bound is the bound to search for if another layer does need to be
523    // consulted.  See the comment for the MergeType trait.
524    async fn push_iterators(
525        &mut self,
526        next_key: Option<&K>,
527        next_key_bound: Bound<&K>,
528    ) -> Result<(), Error> {
529        while self.needs_more_iterators(next_key, next_key_bound) {
530            let iter = self.pending_iterators.pop().unwrap();
531            let sub_iter = iter.layer.as_ref().unwrap().seek(next_key_bound).await?;
532            if self.trace {
533                writeln!(
534                    self.history,
535                    "merger: search for {:?}, found {:?}",
536                    next_key_bound,
537                    sub_iter.get()
538                )
539                .unwrap();
540            }
541            iter.iter = RawIterator::Const(sub_iter);
542            iter.set_item_from_iter();
543            if iter.is_some() {
544                self.heap.push(iter);
545            }
546        }
547        Ok(())
548    }
549
550    // Updates the merge iterator depending on |op|. If discarding, the iterator should have already
551    // been advanced.
552    fn update_item(&mut self, item: &'a mut MergeLayerIterator<'b, K, V>, op: ItemOp<K, V>) {
553        match op {
554            ItemOp::Keep => self.heap.push(item),
555            ItemOp::Discard => {
556                // The iterator should have already been advanced.
557                if item.is_some() {
558                    self.heap.push(item);
559                }
560            }
561            ItemOp::Replace(replacement) => {
562                item.replace(replacement);
563                self.heap.push(item);
564            }
565        }
566    }
567}
568
// `MergerIterator` is itself a `LayerIterator` that yields the merged view of the consulted layers.
#[async_trait]
impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> LayerIterator<K, V>
    for MergerIterator<'a, '_, K, V>
{
    // Moves to the next merged item. While some layers have not been consulted yet, the bound for
    // the next key is derived from the current item. If `Key::next_key` returns a key, that key is
    // used as an inclusive bound. Otherwise the current key is cloned and used as an exclusive
    // bound.
    async fn advance(&mut self) -> Result<(), Error> {
        // Owns a clone of the current key when it is needed for an exclusive bound. It is
        // declared first so that it outlives `next_key_bound`, which may borrow from it.
        let current_key;
        let mut next_key = None;
        let mut next_key_bound = Bound::Unbounded;
        if !self.pending_iterators.is_empty() {
            if let Some(ItemRef { key, .. }) = self.get() {
                next_key = key.next_key();
                match next_key {
                    None => {
                        // If there is no next key, we must now query all layers and the key we
                        // search for is the immediate successor of the current key.
                        current_key = Some(key.clone());
                        next_key_bound = Bound::Excluded(current_key.as_ref().unwrap());
                    }
                    Some(ref key) => next_key_bound = Bound::Included(key),
                }
            }
        }

        // Advance the iterator for the current item and push it onto the heap, and also push any
        // additional iterators onto the heap (by calling push_iterators).
        if let Some(iterator) = self.item.take_iterator() {
            if self.needs_more_iterators(next_key.as_ref(), next_key_bound) {
                // Copy of the item being advanced past, in case it has to be merged again with
                // items from layers that have not been consulted yet.
                let existing_item = iterator.item().boxed();
                iterator.advance().await?;
                match &next_key {
                    Some(next_key)
                        if iterator.is_some()
                            && iterator.key().cmp_lower_bound(next_key) != Ordering::Greater =>
                    {
                        // In this case, the key immediately following is a good candidate, and all
                        // we need to do is merge it with existing iterators; we shouldn't need to
                        // consult with any more iterators.
                    }
                    _ => {
                        // We are going to need to consult more iterators so we need to go back to
                        // the previous item so that we can merge with it.  See the comment in
                        // advance_impl.
                        iterator.replace(existing_item);

                        // We must push other iterators here before pushing iterator onto the heap
                        // because we know `iterator` would end up at the top of the heap.
                        self.push_iterators(next_key.as_ref(), next_key_bound).await?;
                    }
                }
            } else {
                iterator.advance().await?;
            }
            if iterator.is_some() {
                self.heap.push(iterator);
            }
        } else {
            // The current item is a standalone emitted item (or there is none), so there is no
            // iterator to advance; just consult more layers if needed.
            self.push_iterators(next_key.as_ref(), next_key_bound).await?;
        }

        self.advance_impl(next_key_bound).await
    }

    // Returns the current merged item, if any.
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        (&self.item).into()
    }
}
635
// A `MergeLayerIterator` backed by `RawIterator::Mut`, used by `merge_into` to merge an item into a
// mutable layer synchronously. It derefs to the wrapped `MergeLayerIterator`.
struct MutMergeLayerIterator<'a, K, V>(MergeLayerIterator<'a, K, V>);
637
638impl<K, V> MutMergeLayerIterator<'_, K, V> {
639    fn advance(&mut self) {
640        if let MergeItem::Iter = self.item {
641            self.as_mut().advance();
642        }
643        self.set_item_from_iter();
644    }
645}
646
647impl<'a, K, V> AsMut<dyn LayerIteratorMut<K, V> + 'a> for MutMergeLayerIterator<'a, K, V> {
648    fn as_mut(&mut self) -> &mut (dyn LayerIteratorMut<K, V> + 'a) {
649        let RawIterator::Mut(iter) = &mut self.0.iter else { unreachable!() };
650        iter.as_mut()
651    }
652}
653
654impl<'a, K, V> Deref for MutMergeLayerIterator<'a, K, V> {
655    type Target = MergeLayerIterator<'a, K, V>;
656
657    fn deref(&self) -> &Self::Target {
658        &self.0
659    }
660}
661
662impl<K, V> DerefMut for MutMergeLayerIterator<'_, K, V> {
663    fn deref_mut(&mut self) -> &mut Self::Target {
664        &mut self.0
665    }
666}
667
668// Merges the given item into a mutable layer.
669pub(super) fn merge_into<K: Debug + OrdLowerBound, V: Debug>(
670    mut_iter: Box<dyn LayerIteratorMut<K, V> + '_>,
671    item: Item<K, V>,
672    merge_fn: MergeFn<K, V>,
673) -> Result<(), Error> {
674    let merge_item = if mut_iter.get().is_some() { MergeItem::Iter } else { MergeItem::None };
675    let mut mut_merge_iter = MutMergeLayerIterator(MergeLayerIterator {
676        layer: None,
677        iter: RawIterator::Mut(mut_iter),
678        layer_index: 1,
679        item: merge_item,
680    });
681    let mut item_merge_iter = MergeLayerIterator::new_with_item(0, MergeItem::Item(item.boxed()));
682    while mut_merge_iter.is_some() && item_merge_iter.is_some() {
683        if mut_merge_iter.0 > item_merge_iter {
684            // In this branch the mutable layer is left and the item we're merging-in is right.
685            let merge_result = merge_fn(&mut_merge_iter, &item_merge_iter);
686            debug!(
687                lhs:? = mut_merge_iter.key(),
688                rhs:? = item_merge_iter.key(),
689                result:? = merge_result;
690                "(1) merge");
691            match merge_result {
692                MergeResult::EmitLeft => {
693                    if let Some(item) = mut_merge_iter.take_item() {
694                        mut_merge_iter.as_mut().insert(*item);
695                        mut_merge_iter.set_item_from_iter();
696                    } else {
697                        mut_merge_iter.advance();
698                    }
699                }
700                MergeResult::Other { emit, left, right } => {
701                    if let Some(emit) = emit {
702                        mut_merge_iter.as_mut().insert(*emit);
703                    }
704                    match left {
705                        ItemOp::Keep => {}
706                        ItemOp::Discard => {
707                            if matches!(mut_merge_iter.item, MergeItem::Iter) {
708                                mut_merge_iter.as_mut().erase();
709                            }
710                            mut_merge_iter.set_item_from_iter();
711                        }
712                        ItemOp::Replace(item) => {
713                            if let MergeItem::Iter = mut_merge_iter.item {
714                                mut_merge_iter.as_mut().erase();
715                            }
716                            mut_merge_iter.item = MergeItem::Item(item)
717                        }
718                    }
719                    match right {
720                        ItemOp::Keep => {}
721                        ItemOp::Discard => item_merge_iter.item = MergeItem::None,
722                        ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
723                    }
724                }
725            }
726        } else {
727            // In this branch, the item we're merging-in is left and the mutable layer is right.
728            let merge_result = merge_fn(&item_merge_iter, &mut_merge_iter);
729            debug!(
730                lhs:? = mut_merge_iter.key(),
731                rhs:? = item_merge_iter.key(),
732                result:? = merge_result;
733                "(2) merge");
734            match merge_result {
735                MergeResult::EmitLeft => break, // Item is inserted outside the loop
736                MergeResult::Other { emit, left, right } => {
737                    if let Some(emit) = emit {
738                        mut_merge_iter.as_mut().insert(*emit);
739                    }
740                    match left {
741                        ItemOp::Keep => {}
742                        ItemOp::Discard => item_merge_iter.item = MergeItem::None,
743                        ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
744                    }
745                    match right {
746                        ItemOp::Keep => {}
747                        ItemOp::Discard => {
748                            if matches!(mut_merge_iter.item, MergeItem::Iter) {
749                                mut_merge_iter.as_mut().erase();
750                            }
751                            mut_merge_iter.set_item_from_iter();
752                        }
753                        ItemOp::Replace(item) => {
754                            if let MergeItem::Iter = mut_merge_iter.item {
755                                mut_merge_iter.as_mut().erase();
756                            }
757                            mut_merge_iter.item = MergeItem::Item(item)
758                        }
759                    }
760                }
761            }
762        }
763    } // while ...
764
765    // The only way we could get here with both items is via the break above, so we know the correct
766    // order required here.
767    if let MergeItem::Item(item) = item_merge_iter.item {
768        mut_merge_iter.as_mut().insert(*item);
769    }
770    if let Some(item) = mut_merge_iter.take_item() {
771        mut_merge_iter.as_mut().insert(*item);
772    }
773    if let RawIterator::Mut(mut iter) = mut_merge_iter.0.iter {
774        iter.commit();
775    }
776    Ok(())
777}
778
779#[cfg(test)]
780mod tests {
781    use super::ItemOp::{Discard, Keep, Replace};
782    use super::{MergeLayerIterator, MergeResult, Merger};
783    use crate::lsm_tree::persistent_layer::{PersistentLayer, PersistentLayerWriter};
784    use crate::lsm_tree::skip_list_layer::SkipListLayer;
785    use crate::lsm_tree::types::{
786        FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeType,
787        OrdLowerBound, OrdUpperBound, SortByU64,
788    };
789    use crate::lsm_tree::{self, Query, Value};
790    use crate::object_store::{self, ObjectKey, ObjectValue, VOLUME_DATA_KEY_ID};
791    use crate::serialized_types::{
792        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
793    };
794    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
795    use crate::testing::writer::Writer;
796    use fprint::TypeFingerprint;
797    use fxfs_macros::FuzzyHash;
798    use rand::Rng;
799    use std::hash::Hash;
800    use std::ops::{Bound, Range};
801    use std::sync::Arc;
802
    // Test key wrapping a `Range<u64>`. Its trait impls in this module use `start` for
    // `get_leading_u64` and `cmp_lower_bound`, use `end` for `cmp_upper_bound`, and select
    // `MergeType::OptimizedMerge` via `LayerKey`.
    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(Range<u64>);
816
    // Lets the tests store plain `i32` item values; `0` is declared as the deleted marker.
    impl Value for i32 {
        const DELETED_MARKER: Self = 0;
    }
820
    // Associates `TestKey` with serialization versions `1..`.
    // NOTE(review): presumably required alongside the `Versioned` derive above — confirm.
    versioned_type! { 1.. => TestKey }
822
823    impl SortByU64 for TestKey {
824        fn get_leading_u64(&self) -> u64 {
825            self.0.start
826        }
827    }
828
829    impl LayerKey for TestKey {
830        fn merge_type(&self) -> MergeType {
831            MergeType::OptimizedMerge
832        }
833
834        fn next_key(&self) -> Option<Self> {
835            Some(TestKey(self.0.end..self.0.end + 1))
836        }
837    }
838
839    impl OrdUpperBound for TestKey {
840        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
841            self.0.end.cmp(&other.0.end)
842        }
843    }
844
845    impl OrdLowerBound for TestKey {
846        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
847            self.0.start.cmp(&other.0.start)
848        }
849    }
850
851    fn layer_ref_iter<K: Key, V: Value>(
852        layers: &[Arc<SkipListLayer<K, V>>],
853    ) -> impl Iterator<Item = &dyn Layer<K, V>> {
854        layers.iter().map(|x| x.as_ref() as &dyn Layer<K, V>)
855    }
856
857    fn dyn_layer_ref_iter<K: Key, V: Value>(
858        layers: &[Arc<dyn Layer<K, V>>],
859    ) -> impl Iterator<Item = &dyn Layer<K, V>> {
860        layers.iter().map(|x| x.as_ref())
861    }
862
863    fn counters() -> Arc<lsm_tree::TreeCounters> {
864        Arc::new(lsm_tree::TreeCounters::default())
865    }
866
867    #[fuchsia::test]
868    async fn test_emit_left() {
869        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
870        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
871        skip_lists[0].insert(items[1].clone()).expect("insert error");
872        skip_lists[1].insert(items[0].clone()).expect("insert error");
873        let mut merger = Merger::new(
874            layer_ref_iter(&skip_lists),
875            |_left, _right| MergeResult::EmitLeft,
876            counters(),
877        );
878        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
879        let ItemRef { key, value, .. } = iter.get().expect("missing item");
880        assert_eq!((key, value), (&items[0].key, &items[0].value));
881        iter.advance().await.unwrap();
882        let ItemRef { key, value, .. } = iter.get().expect("missing item");
883        assert_eq!((key, value), (&items[1].key, &items[1].value));
884        iter.advance().await.unwrap();
885        assert!(iter.get().is_none());
886    }
887
888    #[fuchsia::test]
889    async fn test_other_emit() {
890        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
891        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
892        skip_lists[0].insert(items[1].clone()).expect("insert error");
893        skip_lists[1].insert(items[0].clone()).expect("insert error");
894        let mut merger = Merger::new(
895            layer_ref_iter(&skip_lists),
896            |_left, _right| MergeResult::Other {
897                emit: Some(Item::new(TestKey(3..3), 3).boxed()),
898                left: Discard,
899                right: Discard,
900            },
901            counters(),
902        );
903        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
904
905        let ItemRef { key, value, .. } = iter.get().expect("missing item");
906        assert_eq!((key, value), (&TestKey(3..3), &3));
907        iter.advance().await.unwrap();
908        assert!(iter.get().is_none());
909    }
910
911    #[fuchsia::test]
912    async fn test_replace_left() {
913        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
914        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
915        skip_lists[0].insert(items[1].clone()).expect("insert error");
916        skip_lists[1].insert(items[0].clone()).expect("insert error");
917        let mut merger = Merger::new(
918            layer_ref_iter(&skip_lists),
919            |_left, _right| MergeResult::Other {
920                emit: None,
921                left: Replace(Item::new(TestKey(3..3), 3).boxed()),
922                right: Discard,
923            },
924            counters(),
925        );
926        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
927
928        // The merger should replace the left item and then after discarding the right item, it
929        // should emit the replacement.
930        let ItemRef { key, value, .. } = iter.get().expect("missing item");
931        assert_eq!((key, value), (&TestKey(3..3), &3));
932        iter.advance().await.unwrap();
933        assert!(iter.get().is_none());
934    }
935
936    #[fuchsia::test]
937    async fn test_replace_right() {
938        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
939        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
940        skip_lists[0].insert(items[1].clone()).expect("insert error");
941        skip_lists[1].insert(items[0].clone()).expect("insert error");
942        let mut merger = Merger::new(
943            layer_ref_iter(&skip_lists),
944            |_left, _right| MergeResult::Other {
945                emit: None,
946                left: Discard,
947                right: Replace(Item::new(TestKey(3..3), 3).boxed()),
948            },
949            counters(),
950        );
951        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
952
953        // The merger should replace the right item and then after discarding the left item, it
954        // should emit the replacement.
955        let ItemRef { key, value, .. } = iter.get().expect("missing item");
956        assert_eq!((key, value), (&TestKey(3..3), &3));
957        iter.advance().await.unwrap();
958        assert!(iter.get().is_none());
959    }
960
961    #[fuchsia::test]
962    async fn test_left_less_than_right() {
963        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
964        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
965        skip_lists[0].insert(items[1].clone()).expect("insert error");
966        skip_lists[1].insert(items[0].clone()).expect("insert error");
967        let mut merger = Merger::new(
968            layer_ref_iter(&skip_lists),
969            |left, right| {
970                assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
971                assert_eq!((right.key(), right.value()), (&TestKey(2..2), &2));
972                MergeResult::EmitLeft
973            },
974            counters(),
975        );
976        merger.query(Query::FullScan).await.expect("seek failed");
977    }
978
979    #[fuchsia::test]
980    async fn test_left_equals_right() {
981        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
982        let item = Item::new(TestKey(1..1), 1);
983        skip_lists[0].insert(item.clone()).expect("insert error");
984        skip_lists[1].insert(item.clone()).expect("insert error");
985        let mut merger = Merger::new(
986            layer_ref_iter(&skip_lists),
987            |left, right| {
988                assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
989                assert_eq!((right.key(), right.value()), (&TestKey(1..1), &1));
990                assert_eq!(left.layer_index, 0);
991                assert_eq!(right.layer_index, 1);
992                MergeResult::EmitLeft
993            },
994            counters(),
995        );
996        merger.query(Query::FullScan).await.expect("seek failed");
997    }
998
999    #[fuchsia::test]
1000    async fn test_keep() {
1001        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1002        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1003        skip_lists[0].insert(items[1].clone()).expect("insert error");
1004        skip_lists[1].insert(items[0].clone()).expect("insert error");
1005        let mut merger = Merger::new(
1006            layer_ref_iter(&skip_lists),
1007            |left, right| {
1008                if left.key() == &TestKey(1..1) {
1009                    MergeResult::Other {
1010                        emit: None,
1011                        left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1012                        right: Keep,
1013                    }
1014                } else {
1015                    assert_eq!(left.key(), &TestKey(2..2));
1016                    assert_eq!(right.key(), &TestKey(3..3));
1017                    MergeResult::Other { emit: None, left: Discard, right: Keep }
1018                }
1019            },
1020            counters(),
1021        );
1022        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1023
1024        // The merger should first replace left and then it should call the merger again with 2 & 3
1025        // and end up just keeping 3.
1026        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1027        assert_eq!((key, value), (&TestKey(3..3), &3));
1028        iter.advance().await.unwrap();
1029        assert!(iter.get().is_none());
1030    }
1031
1032    #[fuchsia::test]
1033    async fn test_merge_10_layers() {
1034        let skip_lists: Vec<_> = (0..10).map(|_| SkipListLayer::new(100)).collect();
1035        let mut rng = rand::rng();
1036        for i in 0..100 {
1037            skip_lists[rng.random_range(0..10) as usize]
1038                .insert(Item::new(TestKey(i..i), i))
1039                .expect("insert error");
1040        }
1041        let mut merger = Merger::new(
1042            layer_ref_iter(&skip_lists),
1043            |_left, _right| MergeResult::EmitLeft,
1044            counters(),
1045        );
1046        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1047
1048        for i in 0..100 {
1049            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1050            assert_eq!((key, value), (&TestKey(i..i), &i));
1051            iter.advance().await.unwrap();
1052        }
1053        assert!(iter.get().is_none());
1054    }
1055
1056    #[fuchsia::test]
1057    async fn test_merge_uses_cmp_lower_bound() {
1058        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1059        let items = [Item::new(TestKey(1..10), 1), Item::new(TestKey(2..3), 2)];
1060        skip_lists[0].insert(items[1].clone()).expect("insert error");
1061        skip_lists[1].insert(items[0].clone()).expect("insert error");
1062        let mut merger = Merger::new(
1063            layer_ref_iter(&skip_lists),
1064            |_left, _right| MergeResult::EmitLeft,
1065            counters(),
1066        );
1067        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1068
1069        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1070        assert_eq!((key, value), (&items[0].key, &items[0].value));
1071        iter.advance().await.unwrap();
1072        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1073        assert_eq!((key, value), (&items[1].key, &items[1].value));
1074        iter.advance().await.unwrap();
1075        assert!(iter.get().is_none());
1076    }
1077
1078    #[fuchsia::test]
1079    async fn test_merge_into_emit_left() {
1080        let skip_list = SkipListLayer::new(100);
1081        let items =
1082            [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2), Item::new(TestKey(3..3), 3)];
1083        skip_list.insert(items[0].clone()).expect("insert error");
1084        skip_list.insert(items[2].clone()).expect("insert error");
1085        skip_list
1086            .merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::EmitLeft);
1087
1088        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1089        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1090        assert_eq!((key, value), (&items[0].key, &items[0].value));
1091        iter.advance().await.unwrap();
1092        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1093        assert_eq!((key, value), (&items[1].key, &items[1].value));
1094        iter.advance().await.unwrap();
1095        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1096        assert_eq!((key, value), (&items[2].key, &items[2].value));
1097        iter.advance().await.unwrap();
1098        assert!(iter.get().is_none());
1099    }
1100
1101    #[fuchsia::test]
1102    async fn test_merge_into_emit_last_after_replacing() {
1103        let skip_list = SkipListLayer::new(100);
1104        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1105        skip_list.insert(items[0].clone()).expect("insert error");
1106
1107        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1108            if left.key() == &TestKey(1..1) {
1109                assert_eq!(right.key(), &TestKey(2..2));
1110                MergeResult::Other {
1111                    emit: None,
1112                    left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1113                    right: Keep,
1114                }
1115            } else {
1116                assert_eq!(left.key(), &TestKey(2..2));
1117                assert_eq!(right.key(), &TestKey(3..3));
1118                MergeResult::EmitLeft
1119            }
1120        });
1121
1122        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1123        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1124        assert_eq!((key, value), (&items[1].key, &items[1].value));
1125        iter.advance().await.unwrap();
1126        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1127        assert_eq!((key, value), (&TestKey(3..3), &3));
1128        iter.advance().await.unwrap();
1129        assert!(iter.get().is_none());
1130    }
1131
1132    #[fuchsia::test]
1133    async fn test_merge_into_emit_left_after_replacing() {
1134        let skip_list = SkipListLayer::new(100);
1135        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1136        skip_list.insert(items[0].clone()).expect("insert error");
1137
1138        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1139            if left.key() == &TestKey(1..1) {
1140                assert_eq!(right.key(), &TestKey(3..3));
1141                MergeResult::Other {
1142                    emit: None,
1143                    left: Replace(Item::new(TestKey(2..2), 2).boxed()),
1144                    right: Keep,
1145                }
1146            } else {
1147                assert_eq!(left.key(), &TestKey(2..2));
1148                assert_eq!(right.key(), &TestKey(3..3));
1149                MergeResult::EmitLeft
1150            }
1151        });
1152
1153        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1154        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1155        assert_eq!((key, value), (&TestKey(2..2), &2));
1156        iter.advance().await.unwrap();
1157        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1158        assert_eq!((key, value), (&items[1].key, &items[1].value));
1159        iter.advance().await.unwrap();
1160        assert!(iter.get().is_none());
1161    }
1162
1163    // This tests emitting in both branches of merge_into, and most of the discard paths.
1164    #[fuchsia::test]
1165    async fn test_merge_into_emit_other_and_discard() {
1166        let skip_list = SkipListLayer::new(100);
1167        let items =
1168            [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 3)];
1169        skip_list.insert(items[0].clone()).expect("insert error");
1170        skip_list.insert(items[2].clone()).expect("insert error");
1171
1172        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1173            if left.key() == &TestKey(1..1) {
1174                // This tests the top branch in merge_into.
1175                assert_eq!(right.key(), &TestKey(3..3));
1176                MergeResult::Other {
1177                    emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1178                    left: Discard,
1179                    right: Keep,
1180                }
1181            } else {
1182                // This tests the bottom branch in merge_into.
1183                assert_eq!(left.key(), &TestKey(3..3));
1184                assert_eq!(right.key(), &TestKey(5..5));
1185                MergeResult::Other {
1186                    emit: Some(Item::new(TestKey(4..4), 4).boxed()),
1187                    left: Discard,
1188                    right: Discard,
1189                }
1190            }
1191        });
1192
1193        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1194        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1195        assert_eq!((key, value), (&TestKey(2..2), &2));
1196        iter.advance().await.unwrap();
1197        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1198        assert_eq!((key, value), (&TestKey(4..4), &4));
1199        iter.advance().await.unwrap();
1200        assert!(iter.get().is_none());
1201    }
1202
1203    // This tests replacing the item and discarding the right item (the one remaining untested
1204    // discard path) in the top branch in merge_into.
1205    #[fuchsia::test]
1206    async fn test_merge_into_replace_and_discard() {
1207        let skip_list = SkipListLayer::new(100);
1208        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1209        skip_list.insert(items[0].clone()).expect("insert error");
1210
1211        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1212            emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1213            left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1214            right: Discard,
1215        });
1216
1217        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1218        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1219        assert_eq!((key, value), (&TestKey(2..2), &2));
1220        iter.advance().await.unwrap();
1221        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1222        assert_eq!((key, value), (&TestKey(4..4), &4));
1223        iter.advance().await.unwrap();
1224        assert!(iter.get().is_none());
1225    }
1226
1227    // This tests replacing the right item in the top branch of merge_into and the left item in the
1228    // bottom branch of merge_into.
1229    #[fuchsia::test]
1230    async fn test_merge_into_replace_merge_item() {
1231        let skip_list = SkipListLayer::new(100);
1232        let items =
1233            [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 5)];
1234        skip_list.insert(items[0].clone()).expect("insert error");
1235        skip_list.insert(items[2].clone()).expect("insert error");
1236
1237        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, right| {
1238            if right.key() == &TestKey(3..3) {
1239                MergeResult::Other {
1240                    emit: None,
1241                    left: Discard,
1242                    right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1243                }
1244            } else {
1245                assert_eq!(right.key(), &TestKey(5..5));
1246                MergeResult::Other {
1247                    emit: None,
1248                    left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1249                    right: Discard,
1250                }
1251            }
1252        });
1253
1254        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1255        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1256        assert_eq!((key, value), (&TestKey(4..4), &4));
1257        iter.advance().await.unwrap();
1258        assert!(iter.get().is_none());
1259    }
1260
1261    // This tests replacing the right item in the bottom branch of merge_into.
1262    #[fuchsia::test]
1263    async fn test_merge_into_replace_existing() {
1264        let skip_list = SkipListLayer::new(100);
1265        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1266        skip_list.insert(items[1].clone()).expect("insert error");
1267
1268        skip_list.merge_into(items[0].clone(), &items[0].key, |_left, right| {
1269            if right.key() == &TestKey(3..3) {
1270                MergeResult::Other {
1271                    emit: None,
1272                    left: Keep,
1273                    right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1274                }
1275            } else {
1276                MergeResult::EmitLeft
1277            }
1278        });
1279
1280        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1281        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1282        assert_eq!((key, value), (&items[0].key, &items[0].value));
1283        iter.advance().await.unwrap();
1284        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1285        assert_eq!((key, value), (&TestKey(2..2), &2));
1286        iter.advance().await.unwrap();
1287        assert!(iter.get().is_none());
1288    }
1289
1290    #[fuchsia::test]
1291    async fn test_merge_into_discard_last() {
1292        let skip_list = SkipListLayer::new(100);
1293        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1294        skip_list.insert(items[0].clone()).expect("insert error");
1295
1296        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1297            emit: None,
1298            left: Discard,
1299            right: Keep,
1300        });
1301
1302        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1303        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1304        assert_eq!((key, value), (&items[1].key, &items[1].value));
1305        iter.advance().await.unwrap();
1306        assert!(iter.get().is_none());
1307    }
1308
1309    #[fuchsia::test]
1310    async fn test_merge_into_empty() {
1311        let skip_list = SkipListLayer::new(100);
1312        let items = [Item::new(TestKey(1..1), 1)];
1313
1314        skip_list.merge_into(items[0].clone(), &items[0].key, |_left, _right| {
1315            panic!("Unexpected merge!");
1316        });
1317
1318        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1319        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1320        assert_eq!((key, value), (&items[0].key, &items[0].value));
1321        iter.advance().await.unwrap();
1322        assert!(iter.get().is_none());
1323    }
1324
1325    #[fuchsia::test]
1326    async fn test_seek_uses_minimum_number_of_iterators() {
1327        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1328        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(1..1), 2)];
1329        skip_lists[0].insert(items[0].clone()).expect("insert error");
1330        skip_lists[1].insert(items[1].clone()).expect("insert error");
1331        let mut merger = Merger::new(
1332            layer_ref_iter(&skip_lists),
1333            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1334            counters(),
1335        );
1336        let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1337
1338        // Seek should only search in the first skip list, so no merge should take place, and we'll
1339        // know if it has because we'll see a different value (2 rather than 1).
1340        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1341        assert_eq!((key, value), (&items[0].key, &items[0].value));
1342    }
1343
1344    // Checks that merging the given layers produces |expected| sequence of items starting from
1345    // |start|.
1346    async fn test_advance<K: Eq + Key + LayerKey + OrdLowerBound>(
1347        layers: &[&[(K, i32)]],
1348        query: Query<'_, K>,
1349        expected: &[(K, i32)],
1350    ) {
1351        let mut skip_lists = Vec::new();
1352        for &layer in layers {
1353            let skip_list = SkipListLayer::new(100);
1354            for (k, v) in layer {
1355                skip_list.insert(Item::new(k.clone(), *v)).expect("insert error");
1356            }
1357            skip_lists.push(skip_list);
1358        }
1359        let mut merger = Merger::new(
1360            layer_ref_iter(&skip_lists),
1361            |_left, _right| MergeResult::EmitLeft,
1362            counters(),
1363        );
1364        let mut iter = merger.query(query).await.expect("seek failed");
1365        for (k, v) in expected {
1366            let ItemRef { key, value, .. } = iter.get().expect("get failed");
1367            assert_eq!((key, value), (k, v));
1368            iter.advance().await.expect("advance failed");
1369        }
1370        assert!(iter.get().is_none());
1371    }
1372
1373    #[fuchsia::test]
1374    async fn test_seek_skips_replaced_items() {
1375        // The 1..2 and the 2..3 items are overwritten and merging them should be skipped.
1376        test_advance(
1377            &[
1378                &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(4..5), 3)],
1379                &[(TestKey(1..2), 4), (TestKey(2..3), 5), (TestKey(3..4), 6)],
1380            ],
1381            Query::FullRange(&TestKey(1..2)),
1382            &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(3..4), 6), (TestKey(4..5), 3)],
1383        )
1384        .await;
1385    }
1386
1387    #[fuchsia::test]
1388    async fn test_advance_skips_replaced_items_at_end() {
1389        // Like the last test, the 1..2 item is overwritten and seeking for it should skip the merge
1390        // but this time, the keys are at the end.
1391        test_advance(
1392            &[&[(TestKey(1..2), 1)], &[(TestKey(1..2), 2)]],
1393            Query::FullRange(&TestKey(1..2)),
1394            &[(TestKey(1..2), 1)],
1395        )
1396        .await;
1397    }
1398
1399    #[derive(
1400        Clone,
1401        Eq,
1402        Hash,
1403        FuzzyHash,
1404        PartialEq,
1405        Debug,
1406        serde::Serialize,
1407        serde::Deserialize,
1408        TypeFingerprint,
1409        Versioned,
1410    )]
1411    struct TestKeyWithFullMerge(Range<u64>);
1412
1413    versioned_type! { 1.. => TestKeyWithFullMerge }
1414
1415    impl LayerKey for TestKeyWithFullMerge {
1416        fn merge_type(&self) -> MergeType {
1417            MergeType::FullMerge
1418        }
1419    }
1420
1421    impl SortByU64 for TestKeyWithFullMerge {
1422        fn get_leading_u64(&self) -> u64 {
1423            self.0.start
1424        }
1425    }
1426
1427    impl OrdUpperBound for TestKeyWithFullMerge {
1428        fn cmp_upper_bound(&self, other: &TestKeyWithFullMerge) -> std::cmp::Ordering {
1429            self.0.end.cmp(&other.0.end)
1430        }
1431    }
1432
1433    impl OrdLowerBound for TestKeyWithFullMerge {
1434        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1435            self.0.start.cmp(&other.0.start)
1436        }
1437    }
1438
1439    // Same set up as `test_seek_skips_replaced_items` except here nothing should skip. Any seek
1440    // should just place the iterator at a particular spot in the merged layers.
1441    #[fuchsia::test]
1442    async fn test_full_merge_consistent_advance_ordering() {
1443        let layer_set = [
1444            [
1445                (TestKeyWithFullMerge(1..2), 1i32),
1446                (TestKeyWithFullMerge(2..3), 2i32),
1447                (TestKeyWithFullMerge(4..5), 3i32),
1448            ]
1449            .as_slice(),
1450            [
1451                (TestKeyWithFullMerge(1..2), 4i32),
1452                (TestKeyWithFullMerge(2..3), 5i32),
1453                (TestKeyWithFullMerge(3..4), 6i32),
1454            ]
1455            .as_slice(),
1456        ];
1457
1458        let full_merge_result = [
1459            (TestKeyWithFullMerge(1..2), 1),
1460            (TestKeyWithFullMerge(1..2), 4),
1461            (TestKeyWithFullMerge(2..3), 2),
1462            (TestKeyWithFullMerge(2..3), 5),
1463            (TestKeyWithFullMerge(3..4), 6),
1464            (TestKeyWithFullMerge(4..5), 3),
1465        ];
1466
1467        test_advance(layer_set.as_slice(), Query::FullScan, &full_merge_result).await;
1468
1469        test_advance(
1470            layer_set.as_slice(),
1471            Query::FullRange(&TestKeyWithFullMerge(1..2)),
1472            &full_merge_result,
1473        )
1474        .await;
1475
1476        test_advance(
1477            layer_set.as_slice(),
1478            Query::FullRange(&TestKeyWithFullMerge(2..3)),
1479            &full_merge_result[2..],
1480        )
1481        .await;
1482
1483        test_advance(
1484            layer_set.as_slice(),
1485            Query::FullRange(&TestKeyWithFullMerge(3..4)),
1486            &full_merge_result[4..],
1487        )
1488        .await;
1489    }
1490
1491    #[fuchsia::test]
1492    async fn test_full_merge_always_consult_all_layers() {
1493        // | 1 |   |
1494        // |   | 2 |
1495        // | 3 | 4 |
1496        let skip_lists =
1497            [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1498        let items = [
1499            Item::new(TestKeyWithFullMerge(1..2), 1),
1500            Item::new(TestKeyWithFullMerge(2..3), 2),
1501            Item::new(TestKeyWithFullMerge(1..2), 3),
1502            Item::new(TestKeyWithFullMerge(2..3), 4),
1503        ];
1504        skip_lists[0].insert(items[0].clone()).expect("insert error");
1505        skip_lists[1].insert(items[1].clone()).expect("insert error");
1506        skip_lists[2].insert(items[2].clone()).expect("insert error");
1507        skip_lists[2].insert(items[3].clone()).expect("insert error");
1508        let mut merger = Merger::new(
1509            layer_ref_iter(&skip_lists),
1510            |left, right| {
1511                // Sum matching keys.
1512                if left.key() == right.key() {
1513                    MergeResult::Other {
1514                        emit: None,
1515                        left: Discard,
1516                        right: Replace(
1517                            Item::new(left.key().clone(), left.value() + right.value()).boxed(),
1518                        ),
1519                    }
1520                } else {
1521                    MergeResult::EmitLeft
1522                }
1523            },
1524            counters(),
1525        );
1526        let mut iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1527
1528        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1529        assert_eq!((key, *value), (&items[0].key, items[0].value + items[2].value));
1530        iter.advance().await.expect("advance");
1531        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1532        assert_eq!((key, *value), (&items[1].key, items[1].value + items[3].value));
1533        iter.advance().await.expect("advance");
1534        assert!(iter.get().is_none());
1535    }
1536
    /// Range-keyed test key whose `LayerKey` impl overrides nothing, so the merger is exercised
    /// with the trait's default `LayerKey` behaviour.
    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKeyWithDefaultLayerKey(Range<u64>);
1550
    // Registers `TestKeyWithDefaultLayerKey` as a versioned type for versions `1..`
    // (NOTE(review): exact semantics are defined by the `versioned_type!` macro — confirm there).
    versioned_type! { 1.. => TestKeyWithDefaultLayerKey }
1552
    // Default layer key is using `MergeType::FullMerge` and returns None for `next_key()`.
    // The impl is intentionally empty so that only the trait's default methods are exercised.
    impl LayerKey for TestKeyWithDefaultLayerKey {}
1555
1556    impl SortByU64 for TestKeyWithDefaultLayerKey {
1557        fn get_leading_u64(&self) -> u64 {
1558            self.0.start
1559        }
1560    }
1561
1562    impl OrdUpperBound for TestKeyWithDefaultLayerKey {
1563        fn cmp_upper_bound(&self, other: &TestKeyWithDefaultLayerKey) -> std::cmp::Ordering {
1564            self.0.end.cmp(&other.0.end)
1565        }
1566    }
1567
1568    impl OrdLowerBound for TestKeyWithDefaultLayerKey {
1569        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1570            self.0.start.cmp(&other.0.start)
1571        }
1572    }
1573
    // With the default `LayerKey` and a full scan, every item from every layer is yielded in key
    // order; for equal keys, the item from layer 0 comes before the one from layer 1.
    #[fuchsia::test]
    async fn test_no_merge_unbounded_include_all_layers() {
        test_advance(
            &[
                &[
                    (TestKeyWithDefaultLayerKey(1..2), 1),
                    (TestKeyWithDefaultLayerKey(2..3), 2),
                    (TestKeyWithDefaultLayerKey(4..5), 3),
                ],
                &[
                    (TestKeyWithDefaultLayerKey(1..2), 4),
                    (TestKeyWithDefaultLayerKey(2..3), 5),
                    (TestKeyWithDefaultLayerKey(3..4), 6),
                ],
            ],
            Query::FullScan,
            &[
                (TestKeyWithDefaultLayerKey(1..2), 1),
                (TestKeyWithDefaultLayerKey(1..2), 4),
                (TestKeyWithDefaultLayerKey(2..3), 2),
                (TestKeyWithDefaultLayerKey(2..3), 5),
                (TestKeyWithDefaultLayerKey(3..4), 6),
                (TestKeyWithDefaultLayerKey(4..5), 3),
            ],
        )
        .await;
    }
1601
    // Same layers as `test_no_merge_unbounded_include_all_layers`, but starting from a full-range
    // seek to the first key: advancing afterwards must still yield every item from every layer.
    #[fuchsia::test]
    async fn test_no_merge_proceeds_comprehensively_after_seek() {
        test_advance(
            &[
                &[
                    (TestKeyWithDefaultLayerKey(1..2), 1),
                    (TestKeyWithDefaultLayerKey(2..3), 2),
                    (TestKeyWithDefaultLayerKey(4..5), 3),
                ],
                &[
                    (TestKeyWithDefaultLayerKey(1..2), 4),
                    (TestKeyWithDefaultLayerKey(2..3), 5),
                    (TestKeyWithDefaultLayerKey(3..4), 6),
                ],
            ],
            Query::FullRange(&TestKeyWithDefaultLayerKey(1..2)),
            &[
                (TestKeyWithDefaultLayerKey(1..2), 1),
                (TestKeyWithDefaultLayerKey(1..2), 4),
                (TestKeyWithDefaultLayerKey(2..3), 2),
                (TestKeyWithDefaultLayerKey(2..3), 5),
                (TestKeyWithDefaultLayerKey(3..4), 6),
                (TestKeyWithDefaultLayerKey(4..5), 3),
            ],
        )
        .await;
    }
1629
1630    #[fuchsia::test]
1631    async fn test_no_merge_seek_finds_lower_layer() {
1632        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1633        let items = [
1634            Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1635            Item::new(TestKeyWithDefaultLayerKey(1..1), 2),
1636        ];
1637        skip_lists[0].insert(items[0].clone()).expect("insert error");
1638        skip_lists[1].insert(items[1].clone()).expect("insert error");
1639        let mut merger = Merger::new(
1640            layer_ref_iter(&skip_lists),
1641            |_left, _right| MergeResult::EmitLeft,
1642            counters(),
1643        );
1644        let iter = merger.query(Query::FullRange(&items[1].key)).await.expect("seek failed");
1645
1646        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1647        assert_eq!((key, value), (&items[1].key, &items[1].value));
1648    }
1649
1650    #[fuchsia::test]
1651    async fn test_no_merge_seek_stops_at_exact_match() {
1652        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1653        let items = [
1654            Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1655            Item::new(TestKeyWithDefaultLayerKey(1..4), 2),
1656        ];
1657        skip_lists[0].insert(items[0].clone()).expect("insert error");
1658        skip_lists[1].insert(items[1].clone()).expect("insert error");
1659        let mut merger = Merger::new(
1660            layer_ref_iter(&skip_lists),
1661            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1662            counters(),
1663        );
1664        let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1665
1666        // Seek should only search in the first skip list, so no merge should take place, and we'll
1667        // know if it has because we'll see a different value (2 rather than 1).
1668        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1669        assert_eq!((key, value), (&items[0].key, &items[0].value));
1670    }
1671
1672    #[fuchsia::test]
1673    async fn test_seek_less_than() {
1674        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1675        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1676        skip_lists[0].insert(items[0].clone()).expect("insert error");
1677        skip_lists[1].insert(items[1].clone()).expect("insert error");
1678        // Search for a key before 1..1.
1679        let mut merger = Merger::new(
1680            layer_ref_iter(&skip_lists),
1681            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1682            counters(),
1683        );
1684        let iter = merger.query(Query::FullRange(&TestKey(0..0))).await.expect("seek failed");
1685
1686        // This should find the 2..2 key because of our merge function.
1687        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1688        assert_eq!((key, value), (&items[1].key, &items[1].value));
1689    }
1690
1691    #[fuchsia::test]
1692    async fn test_seek_to_end() {
1693        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1694        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1695        skip_lists[0].insert(items[0].clone()).expect("insert error");
1696        skip_lists[1].insert(items[1].clone()).expect("insert error");
1697        let mut merger = Merger::new(
1698            layer_ref_iter(&skip_lists),
1699            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1700            counters(),
1701        );
1702        let iter = merger.query(Query::FullRange(&TestKey(3..3))).await.expect("seek failed");
1703
1704        assert!(iter.get().is_none());
1705    }
1706
1707    #[fuchsia::test]
1708    async fn test_merge_all_discarded() {
1709        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1710        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1711        skip_lists[0].insert(items[1].clone()).expect("insert error");
1712        skip_lists[1].insert(items[0].clone()).expect("insert error");
1713        let mut merger = Merger::new(
1714            layer_ref_iter(&skip_lists),
1715            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Discard },
1716            counters(),
1717        );
1718        let iter = merger.query(Query::FullScan).await.expect("seek failed");
1719        assert!(iter.get().is_none());
1720    }
1721
1722    #[fuchsia::test]
1723    async fn test_seek_with_merged_key_less_than() {
1724        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1725        let items = [Item::new(TestKey(1..8), 1), Item::new(TestKey(2..10), 2)];
1726        skip_lists[0].insert(items[0].clone()).expect("insert error");
1727        skip_lists[1].insert(items[1].clone()).expect("insert error");
1728        let mut merger = Merger::new(
1729            layer_ref_iter(&skip_lists),
1730            |left, _right| {
1731                if left.key() == &TestKey(1..8) {
1732                    MergeResult::Other {
1733                        emit: None,
1734                        left: Replace(Item::new(TestKey(1..2), 1).boxed()),
1735                        right: Keep,
1736                    }
1737                } else {
1738                    MergeResult::EmitLeft
1739                }
1740            },
1741            counters(),
1742        );
1743        let iter = merger.query(Query::FullRange(&TestKey(0..3))).await.expect("seek failed");
1744        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1745        assert_eq!((key, value), (&items[1].key, &items[1].value));
1746    }
1747
1748    #[fuchsia::test]
1749    async fn test_overlapping_keys() {
1750        let skip_lists =
1751            [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1752        let items = [
1753            Item::new(TestKey(0..10), 1),
1754            Item::new(TestKey(0..20), 2),
1755            Item::new(TestKey(0..30), 3),
1756        ];
1757        skip_lists[0].insert(items[0].clone()).expect("insert error");
1758        skip_lists[1].insert(items[1].clone()).expect("insert error");
1759        skip_lists[2].insert(items[2].clone()).expect("insert error");
1760        let mut merger = Merger::new(
1761            layer_ref_iter(&skip_lists),
1762            |left, right| {
1763                let result = if left.key().0.end <= right.key().0.start {
1764                    MergeResult::EmitLeft
1765                } else {
1766                    if left.key() == &TestKey(0..30) && right.key() == &TestKey(10..20) {
1767                        MergeResult::Other {
1768                            emit: Some(Item::new(TestKey(0..10), 1).boxed()),
1769                            left: Replace(Item::new(TestKey(10..30), 1).boxed()),
1770                            right: Keep,
1771                        }
1772                    } else {
1773                        MergeResult::Other {
1774                            emit: None,
1775                            left: Keep,
1776                            right: Replace(
1777                                Item::new(TestKey(left.key().0.end..right.key().0.end), 1).boxed(),
1778                            ),
1779                        }
1780                    }
1781                };
1782                result
1783            },
1784            counters(),
1785        );
1786        let mut iter = merger.query(Query::FullRange(&TestKey(0..1))).await.expect("seek failed");
1787        let ItemRef { key, .. } = iter.get().expect("missing item");
1788        assert_eq!(key, &TestKey(0..10));
1789        iter.advance().await.expect("advance failed");
1790        let ItemRef { key, .. } = iter.get().expect("missing item");
1791        assert_eq!(key, &TestKey(10..20));
1792        iter.advance().await.expect("advance failed");
1793        let ItemRef { key, .. } = iter.get().expect("missing item");
1794        assert_eq!(key, &TestKey(20..30));
1795        iter.advance().await.expect("advance failed");
1796        assert_eq!(iter.get(), None);
1797    }
1798
1799    async fn write_layer<K: Key, V: Value>(items: Vec<Item<K, V>>) -> Arc<dyn Layer<K, V>> {
1800        let object = Arc::new(FakeObject::new());
1801        let write_handle = FakeObjectHandle::new(object.clone());
1802        let mut writer =
1803            PersistentLayerWriter::<_, K, V>::new(Writer::new(&write_handle).await, 1, 512)
1804                .await
1805                .expect("PersistentLayerWriter::new failed");
1806        for item in items {
1807            writer.write(item.as_item_ref()).await.expect("write failed");
1808        }
1809        writer.flush().await.expect("flush failed");
1810        PersistentLayer::open(FakeObjectHandle::new(object))
1811            .await
1812            .expect("open_persistent_layer failed")
1813    }
1814
1815    fn merge_sum(
1816        left: &MergeLayerIterator<'_, i32, i32>,
1817        right: &MergeLayerIterator<'_, i32, i32>,
1818    ) -> MergeResult<i32, i32> {
1819        // Sum matching keys.
1820        if left.key() == right.key() {
1821            MergeResult::Other {
1822                emit: None,
1823                left: Discard,
1824                right: Replace(Item::new(left.key().clone(), left.value() + right.value()).boxed()),
1825            }
1826        } else {
1827            MergeResult::EmitLeft
1828        }
1829    }
1830
1831    #[fuchsia::test]
1832    async fn test_merge_bloom_filters_point_query() {
1833        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1834        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1835        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1836        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1837            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1838        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1839
1840        {
1841            // Key that exists in layer 0 only
1842            let iter = merger.query(Query::Point(&1)).await.expect("seek failed");
1843            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1844            assert_eq!((key, *value), (&items[0].key, items[0].value));
1845        }
1846        {
1847            // Key that exists in layer 1 and 2
1848            let iter = merger.query(Query::Point(&2)).await.expect("seek failed");
1849            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1850            assert_eq!((key, *value), (&items[1].key, items[1].value));
1851        }
1852        {
1853            // Key that exists in layer 1
1854            let iter = merger.query(Query::Point(&4)).await.expect("seek failed");
1855            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1856            assert_eq!((key, *value), (&items[2].key, items[2].value));
1857        }
1858        {
1859            // Key that doesn't exist at all
1860            let iter = merger.query(Query::Point(&400)).await.expect("seek failed");
1861            assert!(iter.get().is_none());
1862        }
1863    }
1864
    // `Query::LimitedRange` over two persistent extent layers, merged with the real object-store
    // merge function: a range hitting only layer 1, one spanning both layers, and one hitting no
    // stored extent.
    #[fuchsia::test]
    async fn test_merge_bloom_filters_limited_range() {
        // NB: This test uses ObjectKey so that we don't have to reimplement the complex merging
        // logic for range-like keys.
        let layer_0_items = vec![Item::new(
            ObjectKey::extent(0, 0, 0..2048),
            ObjectValue::extent(0, VOLUME_DATA_KEY_ID),
        )];
        let layer_1_items = vec![
            Item::new(
                ObjectKey::extent(0, 0, 1024..4096),
                ObjectValue::extent(32768, VOLUME_DATA_KEY_ID),
            ),
            Item::new(
                ObjectKey::extent(0, 0, 16384..17408),
                ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
            ),
        ];
        // Expected merged view: layer 0's 0..2048 extent wins over the overlapping part of layer
        // 1's 1024..4096 extent. Only 2048..4096 of the latter remains, and the first argument of
        // its `ObjectValue::extent` advances by the same 1024 (32768 -> 33792).
        let items = [
            Item::new(ObjectKey::extent(0, 0, 0..2048), ObjectValue::extent(0, VOLUME_DATA_KEY_ID)),
            Item::new(
                ObjectKey::extent(0, 0, 2048..4096),
                ObjectValue::extent(33792, VOLUME_DATA_KEY_ID),
            ),
            Item::new(
                ObjectKey::extent(0, 0, 16384..17408),
                ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
            ),
        ];
        let layers: [Arc<dyn Layer<ObjectKey, ObjectValue>>; 2] =
            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
        let mut merger =
            Merger::new(dyn_layer_ref_iter(&layers), object_store::merge::merge, counters());

        {
            // Range contains just keys in layer 1
            let mut iter = merger
                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 16384..16386)))
                .await
                .expect("seek failed");
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!(key, &items[2].key);
            assert_eq!(value, &items[2].value);
            iter.advance().await.expect("advance");
            assert!(iter.get().is_none());
        }
        {
            // Range contains keys in layer 0 and 1
            let mut iter = merger
                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 0..4096)))
                .await
                .expect("seek failed");
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!(key, &items[0].key);
            assert_eq!(value, &items[0].value);
            iter.advance().await.expect("advance");
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!(key, &items[1].key);
            assert_eq!(value, &items[1].value);
            // NOTE(review): only checks that this advance succeeds; whatever item follows it is
            // not asserted.
            iter.advance().await.expect("advance");
        }
        {
            // Range contains no keys. The iterator is not clamped to the queried range: it is
            // positioned at the next stored key after it (16384..17408). Presumably callers are
            // expected to stop at the range end themselves — confirm against `Query` docs.
            let mut iter = merger
                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 8192..12288)))
                .await
                .expect("seek failed");
            let ItemRef { key, .. } = iter.get().expect("missing item");
            assert_eq!(key, &items[2].key);
            iter.advance().await.expect("advance");
            assert!(iter.get().is_none());
        }
    }
1938
1939    #[fuchsia::test]
1940    async fn test_merge_bloom_filters_full_range() {
1941        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1942        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1943        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1944        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1945            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1946        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1947
1948        let mut iter = merger.query(Query::FullRange(&0)).await.expect("seek failed");
1949        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1950        assert_eq!((key, *value), (&items[0].key, items[0].value));
1951        iter.advance().await.expect("advance");
1952        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1953        assert_eq!((key, *value), (&items[1].key, items[1].value));
1954        iter.advance().await.expect("advance");
1955        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1956        assert_eq!((key, *value), (&items[2].key, items[2].value));
1957        iter.advance().await.expect("advance");
1958        assert!(iter.get().is_none());
1959    }
1960
1961    #[fuchsia::test]
1962    async fn test_merge_bloom_filters_full_scan() {
1963        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1964        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1965        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1966        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1967            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1968        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1969
1970        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1971        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1972        assert_eq!((key, *value), (&items[0].key, items[0].value));
1973        iter.advance().await.expect("advance");
1974        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1975        assert_eq!((key, *value), (&items[1].key, items[1].value));
1976        iter.advance().await.expect("advance");
1977        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1978        assert_eq!((key, *value), (&items[2].key, items[2].value));
1979        iter.advance().await.expect("advance");
1980        assert!(iter.get().is_none());
1981    }
1982}