fxfs/lsm_tree/
merge.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::log::*;
6use crate::lsm_tree;
7use crate::lsm_tree::types::{
8    BoxedItem, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut, LayerKey, MergeType,
9    OrdLowerBound, Value,
10};
11use anyhow::Error;
12use async_trait::async_trait;
13use fuchsia_sync::Mutex;
14use futures::try_join;
15use std::cmp::Ordering;
16use std::collections::BinaryHeap;
17use std::fmt::{Debug, Write};
18use std::ops::{Bound, Deref, DerefMut};
19use std::sync::Arc;
20
/// The action the merger should take on one of the two items handed to a merge function.  One
/// `ItemOp` is returned for the left item and one for the right item (see `MergeResult::Other`).
#[derive(Debug, Eq, PartialEq)]
pub enum ItemOp<K, V> {
    /// Keeps the item to be presented to the merger subsequently with a new merge pair.
    Keep,

    /// Discards the item and moves on to the next item in the respective layer.
    Discard,

    /// Replaces the item with something new which will be presented to the merger subsequently with
    /// a new pair.
    Replace(BoxedItem<K, V>),
}
33
/// The value returned by a merge function: what (if anything) to emit, and what to do with the
/// left and right items that were presented.
#[derive(Debug, Eq, PartialEq)]
pub enum MergeResult<K, V> {
    /// Emits the left item unchanged. Keeps the right item. This is the common case. Once an item
    /// has been emitted, it will never be seen again by the merge function.
    EmitLeft,

    /// All other merge results are covered by the following. Take care when replacing items
    /// that you replace the correct item. The merger will never merge two items together from
    /// the same layer. Consider the following scenario:
    ///
    ///        +-----------+              +-----------+
    /// 0:     |    A      |              |    C      |
    ///        +-----------+--------------+-----------+
    /// 1:                 |      B       |
    ///                    +--------------+
    ///
    /// Let's say that all three items can be merged together. The merge function will first be
    /// presented with items A and B, at which point it has the option of replacing the left item
    /// (i.e. A, in layer 0) or the right item (i.e. B in layer 1). However, if you replace the left
    /// item, the merge function will not then be given the opportunity to merge it with C, so the
    /// correct thing to do in this case is to replace the right item B in layer 1, and discard the
    /// left item. A rule you can use is that you should avoid replacing an item with another item
    /// whose upper bound exceeds that of the item you are replacing.
    ///
    /// There are some combinations that might lead to infinite loops (e.g. `emit: None`,
    /// `left: Keep`, `right: Keep`, which leaves the merger with exactly the same pair to merge
    /// again) and should obviously be avoided.
    Other { emit: Option<BoxedItem<K, V>>, left: ItemOp<K, V>, right: ItemOp<K, V> },
}
62
/// Users must provide a merge function which will take pairs of items, left and right, and return a
/// merge result. The left item's key will either be less than the right item's key, or if they are
/// the same, then the left item will be in a lower layer index (lower layer indexes indicate more
/// recent entries). The last remaining item is always emitted without being passed to the merge
/// function, since there is nothing left to pair it with.
pub type MergeFn<K, V> =
    fn(&MergeLayerIterator<'_, K, V>, &MergeLayerIterator<'_, K, V>) -> MergeResult<K, V>;
69
/// The item a `MergeLayerIterator` is currently positioned at.
pub enum MergeItem<K, V> {
    /// There is no current item.  Asking the `MergeLayerIterator` for its item in this state
    /// panics.
    None,
    /// An owned item held by the `MergeLayerIterator` itself rather than read from the underlying
    /// iterator, e.g. a replacement supplied by the merge function.
    Item(BoxedItem<K, V>),
    /// The current item is whatever the underlying iterator is pointing at.
    Iter,
}
75
// The underlying layer iterator held by a MergeLayerIterator.
enum RawIterator<'a, K, V> {
    // No iterator has been attached yet; MergeLayerIterator::get panics in this state.
    None,
    // A read-only layer iterator.  This is the only variant MergeLayerIterator::advance supports.
    Const(Box<dyn LayerIterator<K, V> + 'a>),
    // A mutable layer iterator, as constructed by merge_into.
    Mut(Box<dyn LayerIteratorMut<K, V> + 'a>),
}
81
// SAFETY: LayerIteratorMut is not Sync or Send, but it's only used by merge_into, which doesn't
// send across threads (and is sync, not async).
// NOTE(review): this impl is unconditional over K and V and covers every variant, so its
// soundness depends on that usage pattern continuing to hold -- re-check if RawIterator::Mut is
// ever constructed anywhere other than merge_into.
unsafe impl<K, V> Send for RawIterator<'_, K, V> {}
85
// An iterator that keeps track of where we are for each of the layers. We push these onto a
// min-heap (see the Ord implementation, which reverses the comparison to get min-heap behaviour
// out of BinaryHeap).
pub struct MergeLayerIterator<'a, K, V> {
    // The layer this iterator is for, or None when the iterator was built without one (see
    // new_with_item and merge_into).
    layer: Option<&'a dyn Layer<K, V>>,

    // The underlying iterator.  Starts out as RawIterator::None when created via new and is filled
    // in once the layer has been sought.
    iter: RawIterator<'a, K, V>,

    // The index of the layer this is for.  Lower indexes indicate more recent entries and are
    // used to break ties between equal keys.
    pub layer_index: u16,

    // The item we are currently pointing at.
    item: MergeItem<K, V>,
}
100
101impl<'a, K, V> MergeLayerIterator<'a, K, V> {
102    pub fn key(&self) -> &K {
103        self.item().key
104    }
105
106    pub fn value(&self) -> &V {
107        self.item().value
108    }
109
110    pub fn sequence(&self) -> u64 {
111        self.item().sequence
112    }
113
114    fn new(layer_index: u16, layer: &'a dyn Layer<K, V>) -> Self {
115        MergeLayerIterator {
116            layer: Some(layer),
117            iter: RawIterator::None,
118            layer_index,
119            item: MergeItem::None,
120        }
121    }
122
123    fn new_with_item(layer_index: u16, item: MergeItem<K, V>) -> Self {
124        MergeLayerIterator { layer: None, iter: RawIterator::None, layer_index, item }
125    }
126
127    fn item(&self) -> ItemRef<'_, K, V> {
128        match &self.item {
129            MergeItem::None => panic!("No item!"),
130            MergeItem::Item(item) => ItemRef::from(item),
131            MergeItem::Iter => self.get().unwrap(),
132        }
133    }
134
135    fn get(&self) -> Option<ItemRef<'_, K, V>> {
136        match &self.iter {
137            RawIterator::None => panic!("No iterator!"),
138            RawIterator::Const(iter) => iter.get(),
139            RawIterator::Mut(iter) => iter.get(),
140        }
141    }
142
143    fn set_item_from_iter(&mut self) {
144        self.item = if match &self.iter {
145            RawIterator::None => unreachable!(),
146            RawIterator::Const(iter) => iter.get(),
147            RawIterator::Mut(iter) => iter.get(),
148        }
149        .is_some()
150        {
151            MergeItem::Iter
152        } else {
153            MergeItem::None
154        };
155    }
156
157    fn take_item(&mut self) -> Option<BoxedItem<K, V>> {
158        if let MergeItem::Item(_) = self.item {
159            let mut item = MergeItem::None;
160            std::mem::swap(&mut self.item, &mut item);
161            if let MergeItem::Item(item) = item {
162                Some(item)
163            } else {
164                unreachable!();
165            }
166        } else {
167            None
168        }
169    }
170
171    async fn advance(&mut self) -> Result<(), Error> {
172        if let MergeItem::Iter = self.item {
173            if let RawIterator::Const(iter) = &mut self.iter {
174                iter.advance().await?;
175            } else {
176                // This will never get called in the RawIterator::Mut case.
177                unreachable!();
178            }
179        }
180        self.set_item_from_iter();
181        Ok(())
182    }
183
184    fn replace(&mut self, item: BoxedItem<K, V>) {
185        self.item = MergeItem::Item(item);
186    }
187
188    fn is_some(&self) -> bool {
189        !matches!(self.item, MergeItem::None)
190    }
191
192    // This function exists so that we can advance multiple iterators concurrently using, say,
193    // try_join!.
194    async fn maybe_advance(&mut self, op: &ItemOp<K, V>) -> Result<(), Error> {
195        if let ItemOp::Keep = op { Ok(()) } else { self.advance().await }
196    }
197}
198
199// -- Ord and friends --
200impl<K: OrdLowerBound, V> Ord for MergeLayerIterator<'_, K, V> {
201    fn cmp(&self, other: &Self) -> Ordering {
202        // Reverse ordering because we want min-heap not max-heap.
203        other.key().cmp_lower_bound(self.key()).then(other.layer_index.cmp(&self.layer_index))
204    }
205}
206impl<K: OrdLowerBound, V> PartialOrd for MergeLayerIterator<'_, K, V> {
207    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
208        Some(self.cmp(other))
209    }
210}
211impl<K: OrdLowerBound, V> PartialEq for MergeLayerIterator<'_, K, V> {
212    fn eq(&self, other: &Self) -> bool {
213        self.cmp(other) == Ordering::Equal
214    }
215}
216impl<K: OrdLowerBound, V> Eq for MergeLayerIterator<'_, K, V> {}
217
// As we merge items, the current item can be an item that has been replaced (and later emitted) by
// the merge function, or an item referenced by an iterator, or nothing.
enum CurrentItem<'a, 'b, K, V> {
    // There is no current item.
    None,
    // An owned item, as emitted by the merge function.
    Item(BoxedItem<K, V>),
    // The current item is the one this layer iterator is pointing at.  While held here, the
    // iterator is not on the heap.
    Iterator(&'a mut MergeLayerIterator<'b, K, V>),
}
225
226impl<'a, 'b, K, V> CurrentItem<'a, 'b, K, V> {
227    // Takes the iterator if one is present and replaces the current item with None; otherwise,
228    // leaves the current item untouched.
229    fn take_iterator(&mut self) -> Option<&'a mut MergeLayerIterator<'b, K, V>> {
230        if let CurrentItem::Iterator(_) = self {
231            let mut result = CurrentItem::None;
232            std::mem::swap(self, &mut result);
233            if let CurrentItem::Iterator(iter) = result {
234                Some(iter)
235            } else {
236                unreachable!();
237            }
238        } else {
239            None
240        }
241    }
242}
243
244impl<'a, K, V> From<&'a CurrentItem<'_, '_, K, V>> for Option<ItemRef<'a, K, V>> {
245    fn from(iter: &'a CurrentItem<'_, '_, K, V>) -> Option<ItemRef<'a, K, V>> {
246        match iter {
247            CurrentItem::None => None,
248            CurrentItem::Iterator(iterator) => Some(iterator.item()),
249            CurrentItem::Item(item) => Some(item.into()),
250        }
251    }
252}
253
/// Merger is the main entry point to merging.  It owns one `MergeLayerIterator` per layer and
/// hands out a `MergerIterator` over them via `query`.
pub struct Merger<'a, K, V> {
    // A buffer containing all the MergeLayerIterator objects, one per layer, in the order the
    // layers were supplied (so the position in this buffer is the layer index).
    iterators: Vec<MergeLayerIterator<'a, K, V>>,

    // The function to be used for merging items.
    merge_fn: MergeFn<K, V>,

    // If true, additional logging is enabled.  Off by default; see set_trace.
    trace: bool,

    // Tracks statistics related to the LSM tree.  Updated on every query.
    counters: Arc<Mutex<lsm_tree::Counters>>,
}
268
/// Query describes the goal of a search in the LSM tree.  The caller specifies this to guide the
/// merger on which layer files it must consult.
/// Layers might be skipped during a query for a few reasons:
///   * Existence filters might allow layer files to be skipped.
///   * For bounded queries (i.e. any except FullScan), if the search key has
///     MergeType::OptimizedMerge, we can use that to omit older layers once we find a match.
#[derive(Debug, Clone)]
pub enum Query<'a, K: Key + LayerKey + OrdLowerBound> {
    /// Point queries look for a specific key in the LSM tree.  In this case, the existence filters
    /// for each layer file can be used to decide if the layer file needs to be consulted.
    /// Note that it is an error to use `Point` for range-like keys (this is checked with a debug
    /// assertion when the query is executed).  Either `LimitedRange` or `FullRange` must be used
    /// instead.
    Point(&'a K),
    /// LimitedRange queries position the iterator to `K::search_key`, and scans forward until the
    /// first record which does not overlap the provided key.  In this case, the existence filters
    /// for each layer file can be used, but we have to check for all possible keys in the range we
    /// wish to search.  Obviously, that means that the range should be, well, limited.  Fuzzy
    /// hashes permit this for extent-like keys, but these queries are not the right choice for
    /// things like searching a directory.
    LimitedRange(&'a K),
    /// FullRange queries position the iterator to a starting key, and scan forward to the
    /// first key of a different type.  In this case, the existence filters are not used.
    FullRange(&'a K),
    /// FullScan queries are intended to yield every record in the tree.  In this case, the
    /// existence filters are not used.
    FullScan,
}
296
297impl<'a, K: Key + LayerKey + OrdLowerBound> Query<'a, K> {
298    fn needs_layer<V>(&self, layer: &dyn Layer<K, V>) -> bool {
299        match self {
300            Self::Point(key) => layer.maybe_contains_key(key),
301            Self::LimitedRange(key) => layer.maybe_contains_key(key),
302            Self::FullRange(_) => true,
303            Self::FullScan => true,
304        }
305    }
306}
307
308#[fxfs_trace::trace]
309impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> Merger<'a, K, V> {
310    pub(super) fn new<I: Iterator<Item = &'a dyn Layer<K, V>>>(
311        layers: I,
312        merge_fn: MergeFn<K, V>,
313        counters: Arc<Mutex<lsm_tree::Counters>>,
314    ) -> Merger<'a, K, V> {
315        Merger {
316            iterators: layers
317                .enumerate()
318                .map(|(index, layer)| MergeLayerIterator::new(index as u16, layer))
319                .collect(),
320            merge_fn: merge_fn,
321            trace: false,
322            counters,
323        }
324    }
325
326    /// Executes `query`, positioning the iterator to the first matching item.  See `Query` for
327    /// details.
328    #[trace]
329    pub async fn query(
330        &mut self,
331        query: Query<'_, K>,
332    ) -> Result<MergerIterator<'_, 'a, K, V>, Error> {
333        if let Query::Point(key) = query {
334            // NB: It is almost certainly an error to provide a range-like key to a Point query,
335            // hence this debug assertion.  This will most likely result in the wrong result, since
336            // the starting bound would be the original range key and therefore we might miss
337            // records that start earlier but overlap with the key.
338            // For example, if there is a layer which contains 0..100 and 100..200, and we search
339            // for 50..150, we should get both extents, which requires a different starting bound
340            // than 50..150 (particularly it would require the `K::search_key`).
341            debug_assert!(!key.is_range_key())
342        };
343        let len = self.iterators.len();
344        let pending_iterators = {
345            fxfs_trace::duration!(c"Merger::filter_layer_files", "len" => len);
346            self.iterators
347                .iter_mut()
348                .rev()
349                .filter(|l| query.needs_layer(l.layer.clone().unwrap()))
350                .collect::<Vec<&mut MergeLayerIterator<'a, K, V>>>()
351        };
352        let layer_count = pending_iterators.len();
353        {
354            let mut counters = self.counters.lock();
355            counters.num_seeks += 1;
356            counters.layer_files_total += len;
357            counters.layer_files_skipped += len - layer_count;
358        }
359        log::debug!(query:?; "Consulting {}/{} layers", layer_count, len);
360        let mut merger_iter = MergerIterator {
361            merge_fn: self.merge_fn,
362            pending_iterators,
363            heap: BinaryHeap::with_capacity(layer_count),
364            item: CurrentItem::None,
365            trace: self.trace,
366            history: String::new(),
367        };
368        let owned_bound;
369        let bound = match query {
370            Query::Point(key) => Bound::Included(key),
371            Query::LimitedRange(key) => {
372                owned_bound = Bound::Included(key.search_key());
373                owned_bound.as_ref()
374            }
375            Query::FullRange(start) => Bound::Included(start),
376            Query::FullScan => Bound::Unbounded,
377        };
378        merger_iter.seek(bound).await?;
379        Ok(merger_iter)
380    }
381
382    pub fn set_trace(&mut self, v: bool) {
383        self.trace = v;
384    }
385}
386
/// This is an iterator that will allow iteration over merged layers.  The primary interface is via
/// the LayerIterator trait.
pub struct MergerIterator<'a, 'b, K, V> {
    // The function used to merge the two lowest items on the heap.
    merge_fn: MergeFn<K, V>,

    // Iterators that we have not yet pushed onto the heap.  These are popped from the end as more
    // layers need to be consulted.
    pending_iterators: Vec<&'a mut MergeLayerIterator<'b, K, V>>,

    // A heap with the merge iterators.  Only iterators that have a current item are kept here, and
    // the ordering of MergeLayerIterator makes the top of the heap the lowest key.
    heap: BinaryHeap<&'a mut MergeLayerIterator<'b, K, V>>,

    // The current item, i.e. what `get` returns.
    item: CurrentItem<'a, 'b, K, V>,

    // If true, logs regarding merger behaviour are appended to history.
    trace: bool,

    // Holds trace information if trace is true.
    history: String,
}
407
408impl<'a, 'b, K: Key + LayerKey + OrdLowerBound, V: Value> MergerIterator<'a, 'b, K, V> {
409    async fn seek(&mut self, bound: Bound<&K>) -> Result<(), Error> {
410        let next_key = match bound {
411            Bound::Unbounded => None,
412            Bound::Included(key) => Some(key),
413            Bound::Excluded(_) => panic!("Excluded bounds not supported!"),
414        };
415
416        self.push_iterators(next_key, bound).await?;
417        self.advance_impl(bound).await
418    }
419
    // Merges items from an array of layers using the provided merge function. The merge function is
    // repeatedly provided the lowest and the second lowest element, if one exists. In cases where
    // the two lowest elements compare equal, the element with the lowest layer (i.e. whichever
    // comes first in the layers array) will come first.  `next_key_bound` is a bound for the next
    // key we expect.
    //
    // On return, `self.item` is either the next item to yield or `CurrentItem::None` if the heap
    // has been exhausted.  Errors from advancing the underlying layer iterators are propagated.
    async fn advance_impl(&mut self, next_key_bound: Bound<&K>) -> Result<(), Error> {
        // The outer loop repeats until an item within `next_key_bound` has been found (or the heap
        // runs dry); the inner loop repeats until the merge function has produced something to
        // emit.
        loop {
            loop {
                if self.heap.is_empty() {
                    self.item = CurrentItem::None;
                    return Ok(());
                }
                // The ordering of MergeLayerIterator is reversed, so popping yields the lowest
                // key first, with ties broken in favour of the lowest layer index.
                let lowest = self.heap.pop().unwrap();
                let maybe_second_lowest = self.heap.pop();
                if let Some(second_lowest) = maybe_second_lowest {
                    let result = (self.merge_fn)(&lowest, &second_lowest);
                    if self.trace {
                        writeln!(
                            self.history,
                            "merge {:?}, {:?} -> {:?}",
                            lowest.item(),
                            second_lowest.item(),
                            result
                        )
                        .unwrap();
                    }
                    match result {
                        MergeResult::EmitLeft => {
                            // The right item goes back on the heap untouched and the left item
                            // becomes the current item.  Its iterator is not advanced here; that
                            // happens the next time we are asked to advance.
                            self.heap.push(second_lowest);
                            self.item = CurrentItem::Iterator(lowest);
                            break;
                        }
                        MergeResult::Other { emit, left, right } => {
                            // Advance whichever of the two iterators is not being kept (both at
                            // once, if need be) before applying the ops to them.
                            try_join!(
                                lowest.maybe_advance(&left),
                                second_lowest.maybe_advance(&right)
                            )?;
                            self.update_item(lowest, left);
                            self.update_item(second_lowest, right);
                            if let Some(emit) = emit {
                                self.item = CurrentItem::Item(emit);
                                break;
                            }
                            // Nothing to emit yet, so go around again and merge the next pair.
                        }
                    }
                } else {
                    // Only one item is left, so there is nothing to merge it with; emit it as is.
                    self.item = CurrentItem::Iterator(lowest);
                    break;
                }
            }

            // If the item we're about to yield isn't within `next_key_bound`, ignore it and
            // continue.  To see how this would happen, imagine the following scenario:
            //
            //       0          10          20            30
            //       +----------+
            //   0   |          |
            //       +----------+-----------+
            //   1   |                      |
            //       +----------------------+-------------+
            //   2   |                                    |
            //       +------------------------------------+
            //
            // If we are to seek for 0..1 and then iterate over what we find, we expect the sequence
            // 0..10, 10..20, 20..30.  After the first seek, we should only consult iterator 0.  For
            // the first advance, we will consult iterator 1 but we need to merge the 0..20 element
            // with the 0..10 element which we already emitted.  To make this work, we push the
            // 0..10 item back on the heap (see advance) and then merge, but that will yield the
            // 0..10 entry again, so here we need to skip over it, and then merge again, at which
            // point we should see the 10..20 entry as expected.
            match next_key_bound {
                // Skip: the item's upper bound compares less than the included key.
                Bound::Included(key)
                    if self.get().unwrap().key.cmp_upper_bound(key) == Ordering::Less => {}
                // Skip: the item's upper bound does not compare greater than the excluded key.
                Bound::Excluded(key)
                    if self.get().unwrap().key.cmp_upper_bound(key) != Ordering::Greater => {}
                // The item is within the bound (or there is no bound), so yield it.
                _ => return Ok(()),
            }
            // We're skipping the current item.  If it is backed by an iterator, move that iterator
            // on and return it to the heap if it still has something; an owned item is simply
            // overwritten on the next pass through the loop.
            if let Some(iterator) = self.item.take_iterator() {
                iterator.advance().await?;
                if iterator.is_some() {
                    self.heap.push(iterator);
                }
            }
        }
    }
505
506    // Returns whether more iterators are required for the given `next_key`.  See push_iterators.
507    fn needs_more_iterators(&self, next_key: Option<&K>, next_key_bound: Bound<&K>) -> bool {
508        if self.pending_iterators.is_empty() {
509            return false;
510        }
511        if self.heap.is_empty() {
512            return true;
513        }
514        let target;
515        match next_key_bound {
516            Bound::Included(k) => target = k,
517            Bound::Excluded(_) | Bound::Unbounded => return true,
518        }
519        match target.merge_type() {
520            MergeType::FullMerge => true,
521            MergeType::OptimizedMerge => {
522                match next_key {
523                    Some(k) => {
524                        self.heap.peek().unwrap().key().cmp_lower_bound(k) == Ordering::Greater
525                    }
526                    None => {
527                        // If we've found an exact match, return it since nothing needs to merge.
528                        self.heap.peek().unwrap().key().cmp_lower_bound(target) != Ordering::Equal
529                    }
530                }
531            }
532        }
533    }
534
    // Pushes additional iterators onto the heap until we are confident that the top element will
    // yield what we are looking for.  If next_key is set, we will stop pushing iterators as soon as
    // a key is encountered that equals or precedes it.  If next_key is None, then all layers are
    // pushed.  next_key_bound is the bound to search for if another layer does need to be
    // consulted.  See the comment for the MergeType trait.
    //
    // The decision of whether to keep going is delegated to needs_more_iterators.  Errors from
    // seeking a layer are propagated.
    async fn push_iterators(
        &mut self,
        next_key: Option<&K>,
        next_key_bound: Bound<&K>,
    ) -> Result<(), Error> {
        while self.needs_more_iterators(next_key, next_key_bound) {
            // needs_more_iterators only returns true when there is a pending iterator, and pending
            // iterators always have a layer, so neither unwrap can fail.
            let iter = self.pending_iterators.pop().unwrap();
            let sub_iter = iter.layer.as_ref().unwrap().seek(next_key_bound).await?;
            if self.trace {
                writeln!(
                    self.history,
                    "merger: search for {:?}, found {:?}",
                    next_key_bound,
                    sub_iter.get()
                )
                .unwrap();
            }
            iter.iter = RawIterator::Const(sub_iter);
            iter.set_item_from_iter();
            // A layer with nothing at or after the bound has nothing to contribute, so it is left
            // off the heap.
            if iter.is_some() {
                self.heap.push(iter);
            }
        }
        Ok(())
    }
565
566    // Updates the merge iterator depending on |op|. If discarding, the iterator should have already
567    // been advanced.
568    fn update_item(&mut self, item: &'a mut MergeLayerIterator<'b, K, V>, op: ItemOp<K, V>) {
569        match op {
570            ItemOp::Keep => self.heap.push(item),
571            ItemOp::Discard => {
572                // The iterator should have already been advanced.
573                if item.is_some() {
574                    self.heap.push(item);
575                }
576            }
577            ItemOp::Replace(replacement) => {
578                item.replace(replacement);
579                self.heap.push(item);
580            }
581        }
582    }
583}
584
// The public face of MergerIterator: `advance` moves to the next merged item and `get` returns
// the current one (None once the iterator is exhausted).
#[async_trait]
impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> LayerIterator<K, V>
    for MergerIterator<'a, '_, K, V>
{
    async fn advance(&mut self) -> Result<(), Error> {
        // `current_key` is declared up front so that, when it gets assigned, it lives long enough
        // for `next_key_bound` to borrow from it.
        let current_key;
        let mut next_key = None;
        let mut next_key_bound = Bound::Unbounded;
        // A bound is only needed if there are layers we have not consulted yet.  Otherwise it is
        // left as Unbounded, which means advance_impl won't skip anything.
        if !self.pending_iterators.is_empty() {
            if let Some(ItemRef { key, .. }) = self.get() {
                next_key = key.next_key();
                match next_key {
                    None => {
                        // If there is no next key, we must now query all layers and the key we
                        // search for is the immediate successor of the current key.
                        current_key = Some(key.clone());
                        next_key_bound = Bound::Excluded(current_key.as_ref().unwrap());
                    }
                    Some(ref key) => next_key_bound = Bound::Included(key),
                }
            }
        }

        // Advance the iterator for the current item and push it onto the heap, and also push any
        // additional iterators onto the heap (by calling push_iterators).
        if let Some(iterator) = self.item.take_iterator() {
            if self.needs_more_iterators(next_key.as_ref(), next_key_bound) {
                // Hang on to a copy of the current item in case we have to go back to it below.
                let existing_item = iterator.item().boxed();
                iterator.advance().await?;
                match &next_key {
                    Some(next_key)
                        if iterator.is_some()
                            && iterator.key().cmp_lower_bound(next_key) != Ordering::Greater =>
                    {
                        // In this case, the key immediately following is a good candidate, and all
                        // we need to do is merge it with existing iterators; we shouldn't need to
                        // consult with any more iterators.
                    }
                    _ => {
                        // We are going to need to consult more iterators so we need to go back to
                        // the previous item so that we can merge with it.  See the comment in
                        // advance_impl.
                        iterator.replace(existing_item);

                        // We must push other iterators here before pushing iterator onto the heap
                        // because we know `iterator` would end up at the top of the heap.
                        self.push_iterators(next_key.as_ref(), next_key_bound).await?;
                    }
                }
            } else {
                iterator.advance().await?;
            }
            // An exhausted iterator is simply dropped from consideration.
            if iterator.is_some() {
                self.heap.push(iterator);
            }
        } else {
            // The current item (if any) is not backed by an iterator, so there is nothing to
            // advance; just make sure enough layers are on the heap.
            self.push_iterators(next_key.as_ref(), next_key_bound).await?;
        }

        self.advance_impl(next_key_bound).await
    }

    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        (&self.item).into()
    }
}
651
// A wrapper around a MergeLayerIterator whose underlying iterator is expected to be
// RawIterator::Mut (the AsMut implementation treats anything else as unreachable).  Used by
// merge_into.
struct MutMergeLayerIterator<'a, K, V>(MergeLayerIterator<'a, K, V>);
653
654impl<K, V> MutMergeLayerIterator<'_, K, V> {
655    fn advance(&mut self) {
656        if let MergeItem::Iter = self.item {
657            self.as_mut().advance();
658        }
659        self.set_item_from_iter();
660    }
661}
662
663impl<'a, K, V> AsMut<dyn LayerIteratorMut<K, V> + 'a> for MutMergeLayerIterator<'a, K, V> {
664    fn as_mut(&mut self) -> &mut (dyn LayerIteratorMut<K, V> + 'a) {
665        let RawIterator::Mut(iter) = &mut self.0.iter else { unreachable!() };
666        iter.as_mut()
667    }
668}
669
670impl<'a, K, V> Deref for MutMergeLayerIterator<'a, K, V> {
671    type Target = MergeLayerIterator<'a, K, V>;
672
673    fn deref(&self) -> &Self::Target {
674        &self.0
675    }
676}
677
678impl<K, V> DerefMut for MutMergeLayerIterator<'_, K, V> {
679    fn deref_mut(&mut self) -> &mut Self::Target {
680        &mut self.0
681    }
682}
683
684// Merges the given item into a mutable layer.
685pub(super) fn merge_into<K: Debug + OrdLowerBound, V: Debug>(
686    mut_iter: Box<dyn LayerIteratorMut<K, V> + '_>,
687    item: Item<K, V>,
688    merge_fn: MergeFn<K, V>,
689) -> Result<(), Error> {
690    let merge_item = if mut_iter.get().is_some() { MergeItem::Iter } else { MergeItem::None };
691    let mut mut_merge_iter = MutMergeLayerIterator(MergeLayerIterator {
692        layer: None,
693        iter: RawIterator::Mut(mut_iter),
694        layer_index: 1,
695        item: merge_item,
696    });
697    let mut item_merge_iter = MergeLayerIterator::new_with_item(0, MergeItem::Item(item.boxed()));
698    while mut_merge_iter.is_some() && item_merge_iter.is_some() {
699        if mut_merge_iter.0 > item_merge_iter {
700            // In this branch the mutable layer is left and the item we're merging-in is right.
701            let merge_result = merge_fn(&mut_merge_iter, &item_merge_iter);
702            debug!(
703                lhs:? = mut_merge_iter.key(),
704                rhs:? = item_merge_iter.key(),
705                result:? = merge_result;
706                "(1) merge");
707            match merge_result {
708                MergeResult::EmitLeft => {
709                    if let Some(item) = mut_merge_iter.take_item() {
710                        mut_merge_iter.as_mut().insert(*item);
711                        mut_merge_iter.set_item_from_iter();
712                    } else {
713                        mut_merge_iter.advance();
714                    }
715                }
716                MergeResult::Other { emit, left, right } => {
717                    if let Some(emit) = emit {
718                        mut_merge_iter.as_mut().insert(*emit);
719                    }
720                    match left {
721                        ItemOp::Keep => {}
722                        ItemOp::Discard => {
723                            if matches!(mut_merge_iter.item, MergeItem::Iter) {
724                                mut_merge_iter.as_mut().erase();
725                            }
726                            mut_merge_iter.set_item_from_iter();
727                        }
728                        ItemOp::Replace(item) => {
729                            if let MergeItem::Iter = mut_merge_iter.item {
730                                mut_merge_iter.as_mut().erase();
731                            }
732                            mut_merge_iter.item = MergeItem::Item(item)
733                        }
734                    }
735                    match right {
736                        ItemOp::Keep => {}
737                        ItemOp::Discard => item_merge_iter.item = MergeItem::None,
738                        ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
739                    }
740                }
741            }
742        } else {
743            // In this branch, the item we're merging-in is left and the mutable layer is right.
744            let merge_result = merge_fn(&item_merge_iter, &mut_merge_iter);
745            debug!(
746                lhs:? = mut_merge_iter.key(),
747                rhs:? = item_merge_iter.key(),
748                result:? = merge_result;
749                "(2) merge");
750            match merge_result {
751                MergeResult::EmitLeft => break, // Item is inserted outside the loop
752                MergeResult::Other { emit, left, right } => {
753                    if let Some(emit) = emit {
754                        mut_merge_iter.as_mut().insert(*emit);
755                    }
756                    match left {
757                        ItemOp::Keep => {}
758                        ItemOp::Discard => item_merge_iter.item = MergeItem::None,
759                        ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
760                    }
761                    match right {
762                        ItemOp::Keep => {}
763                        ItemOp::Discard => {
764                            if matches!(mut_merge_iter.item, MergeItem::Iter) {
765                                mut_merge_iter.as_mut().erase();
766                            }
767                            mut_merge_iter.set_item_from_iter();
768                        }
769                        ItemOp::Replace(item) => {
770                            if let MergeItem::Iter = mut_merge_iter.item {
771                                mut_merge_iter.as_mut().erase();
772                            }
773                            mut_merge_iter.item = MergeItem::Item(item)
774                        }
775                    }
776                }
777            }
778        }
779    } // while ...
780
781    // The only way we could get here with both items is via the break above, so we know the correct
782    // order required here.
783    if let MergeItem::Item(item) = item_merge_iter.item {
784        mut_merge_iter.as_mut().insert(*item);
785    }
786    if let Some(item) = mut_merge_iter.take_item() {
787        mut_merge_iter.as_mut().insert(*item);
788    }
789    if let RawIterator::Mut(mut iter) = mut_merge_iter.0.iter {
790        iter.commit();
791    }
792    Ok(())
793}
794
795#[cfg(test)]
796mod tests {
797    use super::ItemOp::{Discard, Keep, Replace};
798    use super::{MergeLayerIterator, MergeResult, Merger};
799    use crate::lsm_tree::persistent_layer::{PersistentLayer, PersistentLayerWriter};
800    use crate::lsm_tree::skip_list_layer::SkipListLayer;
801    use crate::lsm_tree::types::{
802        FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeType,
803        OrdLowerBound, OrdUpperBound, SortByU64,
804    };
805    use crate::lsm_tree::{self, Query, Value};
806    use crate::object_store::{self, ObjectKey, ObjectValue, VOLUME_DATA_KEY_ID};
807    use crate::serialized_types::{
808        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
809    };
810    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
811    use crate::testing::writer::Writer;
812    use fprint::TypeFingerprint;
813    use fuchsia_sync::Mutex;
814    use fxfs_macros::FuzzyHash;
815    use rand::Rng;
816    use std::hash::Hash;
817    use std::ops::{Bound, Range};
818    use std::sync::Arc;
819
    // Key type used by most tests in this module: a thin wrapper around a `Range<u64>`. The trait
    // impls that follow compare keys by range start (lower bound) or by range end (upper bound).
    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(Range<u64>);
833
834    impl Value for i32 {
835        const DELETED_MARKER: Self = 0;
836    }
837
    // NOTE(review): presumably registers `TestKey` as the type to use from version 1 onwards;
    // confirm against the `versioned_type!` macro definition.
    versioned_type! { 1.. => TestKey }
839
840    impl SortByU64 for TestKey {
841        fn get_leading_u64(&self) -> u64 {
842            self.0.start
843        }
844    }
845
846    impl LayerKey for TestKey {
847        fn merge_type(&self) -> MergeType {
848            MergeType::OptimizedMerge
849        }
850
851        fn next_key(&self) -> Option<Self> {
852            Some(TestKey(self.0.end..self.0.end + 1))
853        }
854    }
855
856    impl OrdUpperBound for TestKey {
857        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
858            self.0.end.cmp(&other.0.end)
859        }
860    }
861
862    impl OrdLowerBound for TestKey {
863        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
864            self.0.start.cmp(&other.0.start)
865        }
866    }
867
868    fn layer_ref_iter<K: Key, V: Value>(
869        layers: &[Arc<SkipListLayer<K, V>>],
870    ) -> impl Iterator<Item = &dyn Layer<K, V>> {
871        layers.iter().map(|x| x.as_ref() as &dyn Layer<K, V>)
872    }
873
874    fn dyn_layer_ref_iter<K: Key, V: Value>(
875        layers: &[Arc<dyn Layer<K, V>>],
876    ) -> impl Iterator<Item = &dyn Layer<K, V>> {
877        layers.iter().map(|x| x.as_ref())
878    }
879
880    fn counters() -> Arc<Mutex<lsm_tree::Counters>> {
881        Arc::new(Mutex::new(lsm_tree::Counters::default()))
882    }
883
884    #[fuchsia::test]
885    async fn test_emit_left() {
886        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
887        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
888        skip_lists[0].insert(items[1].clone()).expect("insert error");
889        skip_lists[1].insert(items[0].clone()).expect("insert error");
890        let mut merger = Merger::new(
891            layer_ref_iter(&skip_lists),
892            |_left, _right| MergeResult::EmitLeft,
893            counters(),
894        );
895        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
896        let ItemRef { key, value, .. } = iter.get().expect("missing item");
897        assert_eq!((key, value), (&items[0].key, &items[0].value));
898        iter.advance().await.unwrap();
899        let ItemRef { key, value, .. } = iter.get().expect("missing item");
900        assert_eq!((key, value), (&items[1].key, &items[1].value));
901        iter.advance().await.unwrap();
902        assert!(iter.get().is_none());
903    }
904
905    #[fuchsia::test]
906    async fn test_other_emit() {
907        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
908        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
909        skip_lists[0].insert(items[1].clone()).expect("insert error");
910        skip_lists[1].insert(items[0].clone()).expect("insert error");
911        let mut merger = Merger::new(
912            layer_ref_iter(&skip_lists),
913            |_left, _right| MergeResult::Other {
914                emit: Some(Item::new(TestKey(3..3), 3).boxed()),
915                left: Discard,
916                right: Discard,
917            },
918            counters(),
919        );
920        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
921
922        let ItemRef { key, value, .. } = iter.get().expect("missing item");
923        assert_eq!((key, value), (&TestKey(3..3), &3));
924        iter.advance().await.unwrap();
925        assert!(iter.get().is_none());
926    }
927
928    #[fuchsia::test]
929    async fn test_replace_left() {
930        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
931        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
932        skip_lists[0].insert(items[1].clone()).expect("insert error");
933        skip_lists[1].insert(items[0].clone()).expect("insert error");
934        let mut merger = Merger::new(
935            layer_ref_iter(&skip_lists),
936            |_left, _right| MergeResult::Other {
937                emit: None,
938                left: Replace(Item::new(TestKey(3..3), 3).boxed()),
939                right: Discard,
940            },
941            counters(),
942        );
943        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
944
945        // The merger should replace the left item and then after discarding the right item, it
946        // should emit the replacement.
947        let ItemRef { key, value, .. } = iter.get().expect("missing item");
948        assert_eq!((key, value), (&TestKey(3..3), &3));
949        iter.advance().await.unwrap();
950        assert!(iter.get().is_none());
951    }
952
953    #[fuchsia::test]
954    async fn test_replace_right() {
955        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
956        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
957        skip_lists[0].insert(items[1].clone()).expect("insert error");
958        skip_lists[1].insert(items[0].clone()).expect("insert error");
959        let mut merger = Merger::new(
960            layer_ref_iter(&skip_lists),
961            |_left, _right| MergeResult::Other {
962                emit: None,
963                left: Discard,
964                right: Replace(Item::new(TestKey(3..3), 3).boxed()),
965            },
966            counters(),
967        );
968        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
969
970        // The merger should replace the right item and then after discarding the left item, it
971        // should emit the replacement.
972        let ItemRef { key, value, .. } = iter.get().expect("missing item");
973        assert_eq!((key, value), (&TestKey(3..3), &3));
974        iter.advance().await.unwrap();
975        assert!(iter.get().is_none());
976    }
977
978    #[fuchsia::test]
979    async fn test_left_less_than_right() {
980        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
981        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
982        skip_lists[0].insert(items[1].clone()).expect("insert error");
983        skip_lists[1].insert(items[0].clone()).expect("insert error");
984        let mut merger = Merger::new(
985            layer_ref_iter(&skip_lists),
986            |left, right| {
987                assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
988                assert_eq!((right.key(), right.value()), (&TestKey(2..2), &2));
989                MergeResult::EmitLeft
990            },
991            counters(),
992        );
993        merger.query(Query::FullScan).await.expect("seek failed");
994    }
995
996    #[fuchsia::test]
997    async fn test_left_equals_right() {
998        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
999        let item = Item::new(TestKey(1..1), 1);
1000        skip_lists[0].insert(item.clone()).expect("insert error");
1001        skip_lists[1].insert(item.clone()).expect("insert error");
1002        let mut merger = Merger::new(
1003            layer_ref_iter(&skip_lists),
1004            |left, right| {
1005                assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
1006                assert_eq!((right.key(), right.value()), (&TestKey(1..1), &1));
1007                assert_eq!(left.layer_index, 0);
1008                assert_eq!(right.layer_index, 1);
1009                MergeResult::EmitLeft
1010            },
1011            counters(),
1012        );
1013        merger.query(Query::FullScan).await.expect("seek failed");
1014    }
1015
1016    #[fuchsia::test]
1017    async fn test_keep() {
1018        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1019        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1020        skip_lists[0].insert(items[1].clone()).expect("insert error");
1021        skip_lists[1].insert(items[0].clone()).expect("insert error");
1022        let mut merger = Merger::new(
1023            layer_ref_iter(&skip_lists),
1024            |left, right| {
1025                if left.key() == &TestKey(1..1) {
1026                    MergeResult::Other {
1027                        emit: None,
1028                        left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1029                        right: Keep,
1030                    }
1031                } else {
1032                    assert_eq!(left.key(), &TestKey(2..2));
1033                    assert_eq!(right.key(), &TestKey(3..3));
1034                    MergeResult::Other { emit: None, left: Discard, right: Keep }
1035                }
1036            },
1037            counters(),
1038        );
1039        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1040
1041        // The merger should first replace left and then it should call the merger again with 2 & 3
1042        // and end up just keeping 3.
1043        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1044        assert_eq!((key, value), (&TestKey(3..3), &3));
1045        iter.advance().await.unwrap();
1046        assert!(iter.get().is_none());
1047    }
1048
1049    #[fuchsia::test]
1050    async fn test_merge_10_layers() {
1051        let skip_lists: Vec<_> = (0..10).map(|_| SkipListLayer::new(100)).collect();
1052        let mut rng = rand::rng();
1053        for i in 0..100 {
1054            skip_lists[rng.random_range(0..10) as usize]
1055                .insert(Item::new(TestKey(i..i), i))
1056                .expect("insert error");
1057        }
1058        let mut merger = Merger::new(
1059            layer_ref_iter(&skip_lists),
1060            |_left, _right| MergeResult::EmitLeft,
1061            counters(),
1062        );
1063        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1064
1065        for i in 0..100 {
1066            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1067            assert_eq!((key, value), (&TestKey(i..i), &i));
1068            iter.advance().await.unwrap();
1069        }
1070        assert!(iter.get().is_none());
1071    }
1072
1073    #[fuchsia::test]
1074    async fn test_merge_uses_cmp_lower_bound() {
1075        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1076        let items = [Item::new(TestKey(1..10), 1), Item::new(TestKey(2..3), 2)];
1077        skip_lists[0].insert(items[1].clone()).expect("insert error");
1078        skip_lists[1].insert(items[0].clone()).expect("insert error");
1079        let mut merger = Merger::new(
1080            layer_ref_iter(&skip_lists),
1081            |_left, _right| MergeResult::EmitLeft,
1082            counters(),
1083        );
1084        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1085
1086        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1087        assert_eq!((key, value), (&items[0].key, &items[0].value));
1088        iter.advance().await.unwrap();
1089        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1090        assert_eq!((key, value), (&items[1].key, &items[1].value));
1091        iter.advance().await.unwrap();
1092        assert!(iter.get().is_none());
1093    }
1094
1095    #[fuchsia::test]
1096    async fn test_merge_into_emit_left() {
1097        let skip_list = SkipListLayer::new(100);
1098        let items =
1099            [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2), Item::new(TestKey(3..3), 3)];
1100        skip_list.insert(items[0].clone()).expect("insert error");
1101        skip_list.insert(items[2].clone()).expect("insert error");
1102        skip_list
1103            .merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::EmitLeft);
1104
1105        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1106        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1107        assert_eq!((key, value), (&items[0].key, &items[0].value));
1108        iter.advance().await.unwrap();
1109        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1110        assert_eq!((key, value), (&items[1].key, &items[1].value));
1111        iter.advance().await.unwrap();
1112        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1113        assert_eq!((key, value), (&items[2].key, &items[2].value));
1114        iter.advance().await.unwrap();
1115        assert!(iter.get().is_none());
1116    }
1117
1118    #[fuchsia::test]
1119    async fn test_merge_into_emit_last_after_replacing() {
1120        let skip_list = SkipListLayer::new(100);
1121        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1122        skip_list.insert(items[0].clone()).expect("insert error");
1123
1124        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1125            if left.key() == &TestKey(1..1) {
1126                assert_eq!(right.key(), &TestKey(2..2));
1127                MergeResult::Other {
1128                    emit: None,
1129                    left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1130                    right: Keep,
1131                }
1132            } else {
1133                assert_eq!(left.key(), &TestKey(2..2));
1134                assert_eq!(right.key(), &TestKey(3..3));
1135                MergeResult::EmitLeft
1136            }
1137        });
1138
1139        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1140        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1141        assert_eq!((key, value), (&items[1].key, &items[1].value));
1142        iter.advance().await.unwrap();
1143        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1144        assert_eq!((key, value), (&TestKey(3..3), &3));
1145        iter.advance().await.unwrap();
1146        assert!(iter.get().is_none());
1147    }
1148
1149    #[fuchsia::test]
1150    async fn test_merge_into_emit_left_after_replacing() {
1151        let skip_list = SkipListLayer::new(100);
1152        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1153        skip_list.insert(items[0].clone()).expect("insert error");
1154
1155        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1156            if left.key() == &TestKey(1..1) {
1157                assert_eq!(right.key(), &TestKey(3..3));
1158                MergeResult::Other {
1159                    emit: None,
1160                    left: Replace(Item::new(TestKey(2..2), 2).boxed()),
1161                    right: Keep,
1162                }
1163            } else {
1164                assert_eq!(left.key(), &TestKey(2..2));
1165                assert_eq!(right.key(), &TestKey(3..3));
1166                MergeResult::EmitLeft
1167            }
1168        });
1169
1170        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1171        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1172        assert_eq!((key, value), (&TestKey(2..2), &2));
1173        iter.advance().await.unwrap();
1174        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1175        assert_eq!((key, value), (&items[1].key, &items[1].value));
1176        iter.advance().await.unwrap();
1177        assert!(iter.get().is_none());
1178    }
1179
1180    // This tests emitting in both branches of merge_into, and most of the discard paths.
1181    #[fuchsia::test]
1182    async fn test_merge_into_emit_other_and_discard() {
1183        let skip_list = SkipListLayer::new(100);
1184        let items =
1185            [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 3)];
1186        skip_list.insert(items[0].clone()).expect("insert error");
1187        skip_list.insert(items[2].clone()).expect("insert error");
1188
1189        skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1190            if left.key() == &TestKey(1..1) {
1191                // This tests the top branch in merge_into.
1192                assert_eq!(right.key(), &TestKey(3..3));
1193                MergeResult::Other {
1194                    emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1195                    left: Discard,
1196                    right: Keep,
1197                }
1198            } else {
1199                // This tests the bottom branch in merge_into.
1200                assert_eq!(left.key(), &TestKey(3..3));
1201                assert_eq!(right.key(), &TestKey(5..5));
1202                MergeResult::Other {
1203                    emit: Some(Item::new(TestKey(4..4), 4).boxed()),
1204                    left: Discard,
1205                    right: Discard,
1206                }
1207            }
1208        });
1209
1210        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1211        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1212        assert_eq!((key, value), (&TestKey(2..2), &2));
1213        iter.advance().await.unwrap();
1214        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1215        assert_eq!((key, value), (&TestKey(4..4), &4));
1216        iter.advance().await.unwrap();
1217        assert!(iter.get().is_none());
1218    }
1219
1220    // This tests replacing the item and discarding the right item (the one remaining untested
1221    // discard path) in the top branch in merge_into.
1222    #[fuchsia::test]
1223    async fn test_merge_into_replace_and_discard() {
1224        let skip_list = SkipListLayer::new(100);
1225        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1226        skip_list.insert(items[0].clone()).expect("insert error");
1227
1228        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1229            emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1230            left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1231            right: Discard,
1232        });
1233
1234        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1235        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1236        assert_eq!((key, value), (&TestKey(2..2), &2));
1237        iter.advance().await.unwrap();
1238        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1239        assert_eq!((key, value), (&TestKey(4..4), &4));
1240        iter.advance().await.unwrap();
1241        assert!(iter.get().is_none());
1242    }
1243
1244    // This tests replacing the right item in the top branch of merge_into and the left item in the
1245    // bottom branch of merge_into.
1246    #[fuchsia::test]
1247    async fn test_merge_into_replace_merge_item() {
1248        let skip_list = SkipListLayer::new(100);
1249        let items =
1250            [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 5)];
1251        skip_list.insert(items[0].clone()).expect("insert error");
1252        skip_list.insert(items[2].clone()).expect("insert error");
1253
1254        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, right| {
1255            if right.key() == &TestKey(3..3) {
1256                MergeResult::Other {
1257                    emit: None,
1258                    left: Discard,
1259                    right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1260                }
1261            } else {
1262                assert_eq!(right.key(), &TestKey(5..5));
1263                MergeResult::Other {
1264                    emit: None,
1265                    left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1266                    right: Discard,
1267                }
1268            }
1269        });
1270
1271        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1272        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1273        assert_eq!((key, value), (&TestKey(4..4), &4));
1274        iter.advance().await.unwrap();
1275        assert!(iter.get().is_none());
1276    }
1277
1278    // This tests replacing the right item in the bottom branch of merge_into.
1279    #[fuchsia::test]
1280    async fn test_merge_into_replace_existing() {
1281        let skip_list = SkipListLayer::new(100);
1282        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1283        skip_list.insert(items[1].clone()).expect("insert error");
1284
1285        skip_list.merge_into(items[0].clone(), &items[0].key, |_left, right| {
1286            if right.key() == &TestKey(3..3) {
1287                MergeResult::Other {
1288                    emit: None,
1289                    left: Keep,
1290                    right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1291                }
1292            } else {
1293                MergeResult::EmitLeft
1294            }
1295        });
1296
1297        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1298        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1299        assert_eq!((key, value), (&items[0].key, &items[0].value));
1300        iter.advance().await.unwrap();
1301        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1302        assert_eq!((key, value), (&TestKey(2..2), &2));
1303        iter.advance().await.unwrap();
1304        assert!(iter.get().is_none());
1305    }
1306
1307    #[fuchsia::test]
1308    async fn test_merge_into_discard_last() {
1309        let skip_list = SkipListLayer::new(100);
1310        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1311        skip_list.insert(items[0].clone()).expect("insert error");
1312
1313        skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1314            emit: None,
1315            left: Discard,
1316            right: Keep,
1317        });
1318
1319        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1320        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1321        assert_eq!((key, value), (&items[1].key, &items[1].value));
1322        iter.advance().await.unwrap();
1323        assert!(iter.get().is_none());
1324    }
1325
1326    #[fuchsia::test]
1327    async fn test_merge_into_empty() {
1328        let skip_list = SkipListLayer::new(100);
1329        let items = [Item::new(TestKey(1..1), 1)];
1330
1331        skip_list.merge_into(items[0].clone(), &items[0].key, |_left, _right| {
1332            panic!("Unexpected merge!");
1333        });
1334
1335        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1336        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1337        assert_eq!((key, value), (&items[0].key, &items[0].value));
1338        iter.advance().await.unwrap();
1339        assert!(iter.get().is_none());
1340    }
1341
1342    #[fuchsia::test]
1343    async fn test_seek_uses_minimum_number_of_iterators() {
1344        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1345        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(1..1), 2)];
1346        skip_lists[0].insert(items[0].clone()).expect("insert error");
1347        skip_lists[1].insert(items[1].clone()).expect("insert error");
1348        let mut merger = Merger::new(
1349            layer_ref_iter(&skip_lists),
1350            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1351            counters(),
1352        );
1353        let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1354
1355        // Seek should only search in the first skip list, so no merge should take place, and we'll
1356        // know if it has because we'll see a different value (2 rather than 1).
1357        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1358        assert_eq!((key, value), (&items[0].key, &items[0].value));
1359    }
1360
1361    // Checks that merging the given layers produces |expected| sequence of items starting from
1362    // |start|.
1363    async fn test_advance<K: Eq + Key + LayerKey + OrdLowerBound>(
1364        layers: &[&[(K, i32)]],
1365        query: Query<'_, K>,
1366        expected: &[(K, i32)],
1367    ) {
1368        let mut skip_lists = Vec::new();
1369        for &layer in layers {
1370            let skip_list = SkipListLayer::new(100);
1371            for (k, v) in layer {
1372                skip_list.insert(Item::new(k.clone(), *v)).expect("insert error");
1373            }
1374            skip_lists.push(skip_list);
1375        }
1376        let mut merger = Merger::new(
1377            layer_ref_iter(&skip_lists),
1378            |_left, _right| MergeResult::EmitLeft,
1379            counters(),
1380        );
1381        let mut iter = merger.query(query).await.expect("seek failed");
1382        for (k, v) in expected {
1383            let ItemRef { key, value, .. } = iter.get().expect("get failed");
1384            assert_eq!((key, value), (k, v));
1385            iter.advance().await.expect("advance failed");
1386        }
1387        assert!(iter.get().is_none());
1388    }
1389
1390    #[fuchsia::test]
1391    async fn test_seek_skips_replaced_items() {
1392        // The 1..2 and the 2..3 items are overwritten and merging them should be skipped.
1393        test_advance(
1394            &[
1395                &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(4..5), 3)],
1396                &[(TestKey(1..2), 4), (TestKey(2..3), 5), (TestKey(3..4), 6)],
1397            ],
1398            Query::FullRange(&TestKey(1..2)),
1399            &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(3..4), 6), (TestKey(4..5), 3)],
1400        )
1401        .await;
1402    }
1403
1404    #[fuchsia::test]
1405    async fn test_advance_skips_replaced_items_at_end() {
1406        // Like the last test, the 1..2 item is overwritten and seeking for it should skip the merge
1407        // but this time, the keys are at the end.
1408        test_advance(
1409            &[&[(TestKey(1..2), 1)], &[(TestKey(1..2), 2)]],
1410            Query::FullRange(&TestKey(1..2)),
1411            &[(TestKey(1..2), 1)],
1412        )
1413        .await;
1414    }
1415
    // Second test key type, also wrapping a `Range<u64>`. Its trait impls mirror `TestKey`'s,
    // except that its `LayerKey` impl reports `MergeType::FullMerge`.
    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKeyWithFullMerge(Range<u64>);
1429
    // NOTE(review): presumably registers `TestKeyWithFullMerge` as the type to use from version 1
    // onwards; confirm against the `versioned_type!` macro definition.
    versioned_type! { 1.. => TestKeyWithFullMerge }
1431
    impl LayerKey for TestKeyWithFullMerge {
        // Always asks for a full merge. `next_key` is not overridden here.
        fn merge_type(&self) -> MergeType {
            MergeType::FullMerge
        }
    }
1437
1438    impl SortByU64 for TestKeyWithFullMerge {
1439        fn get_leading_u64(&self) -> u64 {
1440            self.0.start
1441        }
1442    }
1443
1444    impl OrdUpperBound for TestKeyWithFullMerge {
1445        fn cmp_upper_bound(&self, other: &TestKeyWithFullMerge) -> std::cmp::Ordering {
1446            self.0.end.cmp(&other.0.end)
1447        }
1448    }
1449
1450    impl OrdLowerBound for TestKeyWithFullMerge {
1451        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1452            self.0.start.cmp(&other.0.start)
1453        }
1454    }
1455
1456    // Same set up as `test_seek_skips_replaced_items` except here nothing should skip. Any seek
1457    // should just place the iterator at a particular spot in the merged layers.
1458    #[fuchsia::test]
1459    async fn test_full_merge_consistent_advance_ordering() {
1460        let layer_set = [
1461            [
1462                (TestKeyWithFullMerge(1..2), 1i32),
1463                (TestKeyWithFullMerge(2..3), 2i32),
1464                (TestKeyWithFullMerge(4..5), 3i32),
1465            ]
1466            .as_slice(),
1467            [
1468                (TestKeyWithFullMerge(1..2), 4i32),
1469                (TestKeyWithFullMerge(2..3), 5i32),
1470                (TestKeyWithFullMerge(3..4), 6i32),
1471            ]
1472            .as_slice(),
1473        ];
1474
1475        let full_merge_result = [
1476            (TestKeyWithFullMerge(1..2), 1),
1477            (TestKeyWithFullMerge(1..2), 4),
1478            (TestKeyWithFullMerge(2..3), 2),
1479            (TestKeyWithFullMerge(2..3), 5),
1480            (TestKeyWithFullMerge(3..4), 6),
1481            (TestKeyWithFullMerge(4..5), 3),
1482        ];
1483
1484        test_advance(layer_set.as_slice(), Query::FullScan, &full_merge_result).await;
1485
1486        test_advance(
1487            layer_set.as_slice(),
1488            Query::FullRange(&TestKeyWithFullMerge(1..2)),
1489            &full_merge_result,
1490        )
1491        .await;
1492
1493        test_advance(
1494            layer_set.as_slice(),
1495            Query::FullRange(&TestKeyWithFullMerge(2..3)),
1496            &full_merge_result[2..],
1497        )
1498        .await;
1499
1500        test_advance(
1501            layer_set.as_slice(),
1502            Query::FullRange(&TestKeyWithFullMerge(3..4)),
1503            &full_merge_result[4..],
1504        )
1505        .await;
1506    }
1507
1508    #[fuchsia::test]
1509    async fn test_full_merge_always_consult_all_layers() {
1510        // | 1 |   |
1511        // |   | 2 |
1512        // | 3 | 4 |
1513        let skip_lists =
1514            [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1515        let items = [
1516            Item::new(TestKeyWithFullMerge(1..2), 1),
1517            Item::new(TestKeyWithFullMerge(2..3), 2),
1518            Item::new(TestKeyWithFullMerge(1..2), 3),
1519            Item::new(TestKeyWithFullMerge(2..3), 4),
1520        ];
1521        skip_lists[0].insert(items[0].clone()).expect("insert error");
1522        skip_lists[1].insert(items[1].clone()).expect("insert error");
1523        skip_lists[2].insert(items[2].clone()).expect("insert error");
1524        skip_lists[2].insert(items[3].clone()).expect("insert error");
1525        let mut merger = Merger::new(
1526            layer_ref_iter(&skip_lists),
1527            |left, right| {
1528                // Sum matching keys.
1529                if left.key() == right.key() {
1530                    MergeResult::Other {
1531                        emit: None,
1532                        left: Discard,
1533                        right: Replace(
1534                            Item::new(left.key().clone(), left.value() + right.value()).boxed(),
1535                        ),
1536                    }
1537                } else {
1538                    MergeResult::EmitLeft
1539                }
1540            },
1541            counters(),
1542        );
1543        let mut iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1544
1545        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1546        assert_eq!((key, *value), (&items[0].key, items[0].value + items[2].value));
1547        iter.advance().await.expect("advance");
1548        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1549        assert_eq!((key, *value), (&items[1].key, items[1].value + items[3].value));
1550        iter.advance().await.expect("advance");
1551        assert!(iter.get().is_none());
1552    }
1553
    // Test key wrapping a `Range<u64>`. Its `LayerKey` impl is empty, so it exercises the
    // trait's default method implementations.
    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKeyWithDefaultLayerKey(Range<u64>);

    // NOTE(review): presumably declares the type as valid for every version from 1 onwards —
    // confirm against the `versioned_type!` definition.
    versioned_type! { 1.. => TestKeyWithDefaultLayerKey }
1569
    // Intentionally empty so that every `LayerKey` method falls back to the trait default: the
    // default merge type is `MergeType::FullMerge` and the default `next_key()` returns `None`.
    impl LayerKey for TestKeyWithDefaultLayerKey {}
1572
1573    impl SortByU64 for TestKeyWithDefaultLayerKey {
1574        fn get_leading_u64(&self) -> u64 {
1575            self.0.start
1576        }
1577    }
1578
1579    impl OrdUpperBound for TestKeyWithDefaultLayerKey {
1580        fn cmp_upper_bound(&self, other: &TestKeyWithDefaultLayerKey) -> std::cmp::Ordering {
1581            self.0.end.cmp(&other.0.end)
1582        }
1583    }
1584
1585    impl OrdLowerBound for TestKeyWithDefaultLayerKey {
1586        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1587            self.0.start.cmp(&other.0.start)
1588        }
1589    }
1590
1591    #[fuchsia::test]
1592    async fn test_no_merge_unbounded_include_all_layers() {
1593        test_advance(
1594            &[
1595                &[
1596                    (TestKeyWithDefaultLayerKey(1..2), 1),
1597                    (TestKeyWithDefaultLayerKey(2..3), 2),
1598                    (TestKeyWithDefaultLayerKey(4..5), 3),
1599                ],
1600                &[
1601                    (TestKeyWithDefaultLayerKey(1..2), 4),
1602                    (TestKeyWithDefaultLayerKey(2..3), 5),
1603                    (TestKeyWithDefaultLayerKey(3..4), 6),
1604                ],
1605            ],
1606            Query::FullScan,
1607            &[
1608                (TestKeyWithDefaultLayerKey(1..2), 1),
1609                (TestKeyWithDefaultLayerKey(1..2), 4),
1610                (TestKeyWithDefaultLayerKey(2..3), 2),
1611                (TestKeyWithDefaultLayerKey(2..3), 5),
1612                (TestKeyWithDefaultLayerKey(3..4), 6),
1613                (TestKeyWithDefaultLayerKey(4..5), 3),
1614            ],
1615        )
1616        .await;
1617    }
1618
1619    #[fuchsia::test]
1620    async fn test_no_merge_proceeds_comprehensively_after_seek() {
1621        test_advance(
1622            &[
1623                &[
1624                    (TestKeyWithDefaultLayerKey(1..2), 1),
1625                    (TestKeyWithDefaultLayerKey(2..3), 2),
1626                    (TestKeyWithDefaultLayerKey(4..5), 3),
1627                ],
1628                &[
1629                    (TestKeyWithDefaultLayerKey(1..2), 4),
1630                    (TestKeyWithDefaultLayerKey(2..3), 5),
1631                    (TestKeyWithDefaultLayerKey(3..4), 6),
1632                ],
1633            ],
1634            Query::FullRange(&TestKeyWithDefaultLayerKey(1..2)),
1635            &[
1636                (TestKeyWithDefaultLayerKey(1..2), 1),
1637                (TestKeyWithDefaultLayerKey(1..2), 4),
1638                (TestKeyWithDefaultLayerKey(2..3), 2),
1639                (TestKeyWithDefaultLayerKey(2..3), 5),
1640                (TestKeyWithDefaultLayerKey(3..4), 6),
1641                (TestKeyWithDefaultLayerKey(4..5), 3),
1642            ],
1643        )
1644        .await;
1645    }
1646
1647    #[fuchsia::test]
1648    async fn test_no_merge_seek_finds_lower_layer() {
1649        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1650        let items = [
1651            Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1652            Item::new(TestKeyWithDefaultLayerKey(1..1), 2),
1653        ];
1654        skip_lists[0].insert(items[0].clone()).expect("insert error");
1655        skip_lists[1].insert(items[1].clone()).expect("insert error");
1656        let mut merger = Merger::new(
1657            layer_ref_iter(&skip_lists),
1658            |_left, _right| MergeResult::EmitLeft,
1659            counters(),
1660        );
1661        let iter = merger.query(Query::FullRange(&items[1].key)).await.expect("seek failed");
1662
1663        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1664        assert_eq!((key, value), (&items[1].key, &items[1].value));
1665    }
1666
1667    #[fuchsia::test]
1668    async fn test_no_merge_seek_stops_at_exact_match() {
1669        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1670        let items = [
1671            Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1672            Item::new(TestKeyWithDefaultLayerKey(1..4), 2),
1673        ];
1674        skip_lists[0].insert(items[0].clone()).expect("insert error");
1675        skip_lists[1].insert(items[1].clone()).expect("insert error");
1676        let mut merger = Merger::new(
1677            layer_ref_iter(&skip_lists),
1678            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1679            counters(),
1680        );
1681        let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1682
1683        // Seek should only search in the first skip list, so no merge should take place, and we'll
1684        // know if it has because we'll see a different value (2 rather than 1).
1685        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1686        assert_eq!((key, value), (&items[0].key, &items[0].value));
1687    }
1688
1689    #[fuchsia::test]
1690    async fn test_seek_less_than() {
1691        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1692        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1693        skip_lists[0].insert(items[0].clone()).expect("insert error");
1694        skip_lists[1].insert(items[1].clone()).expect("insert error");
1695        // Search for a key before 1..1.
1696        let mut merger = Merger::new(
1697            layer_ref_iter(&skip_lists),
1698            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1699            counters(),
1700        );
1701        let iter = merger.query(Query::FullRange(&TestKey(0..0))).await.expect("seek failed");
1702
1703        // This should find the 2..2 key because of our merge function.
1704        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1705        assert_eq!((key, value), (&items[1].key, &items[1].value));
1706    }
1707
1708    #[fuchsia::test]
1709    async fn test_seek_to_end() {
1710        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1711        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1712        skip_lists[0].insert(items[0].clone()).expect("insert error");
1713        skip_lists[1].insert(items[1].clone()).expect("insert error");
1714        let mut merger = Merger::new(
1715            layer_ref_iter(&skip_lists),
1716            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1717            counters(),
1718        );
1719        let iter = merger.query(Query::FullRange(&TestKey(3..3))).await.expect("seek failed");
1720
1721        assert!(iter.get().is_none());
1722    }
1723
1724    #[fuchsia::test]
1725    async fn test_merge_all_discarded() {
1726        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1727        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1728        skip_lists[0].insert(items[1].clone()).expect("insert error");
1729        skip_lists[1].insert(items[0].clone()).expect("insert error");
1730        let mut merger = Merger::new(
1731            layer_ref_iter(&skip_lists),
1732            |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Discard },
1733            counters(),
1734        );
1735        let iter = merger.query(Query::FullScan).await.expect("seek failed");
1736        assert!(iter.get().is_none());
1737    }
1738
1739    #[fuchsia::test]
1740    async fn test_seek_with_merged_key_less_than() {
1741        let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1742        let items = [Item::new(TestKey(1..8), 1), Item::new(TestKey(2..10), 2)];
1743        skip_lists[0].insert(items[0].clone()).expect("insert error");
1744        skip_lists[1].insert(items[1].clone()).expect("insert error");
1745        let mut merger = Merger::new(
1746            layer_ref_iter(&skip_lists),
1747            |left, _right| {
1748                if left.key() == &TestKey(1..8) {
1749                    MergeResult::Other {
1750                        emit: None,
1751                        left: Replace(Item::new(TestKey(1..2), 1).boxed()),
1752                        right: Keep,
1753                    }
1754                } else {
1755                    MergeResult::EmitLeft
1756                }
1757            },
1758            counters(),
1759        );
1760        let iter = merger.query(Query::FullRange(&TestKey(0..3))).await.expect("seek failed");
1761        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1762        assert_eq!((key, value), (&items[1].key, &items[1].value));
1763    }
1764
1765    #[fuchsia::test]
1766    async fn test_overlapping_keys() {
1767        let skip_lists =
1768            [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1769        let items = [
1770            Item::new(TestKey(0..10), 1),
1771            Item::new(TestKey(0..20), 2),
1772            Item::new(TestKey(0..30), 3),
1773        ];
1774        skip_lists[0].insert(items[0].clone()).expect("insert error");
1775        skip_lists[1].insert(items[1].clone()).expect("insert error");
1776        skip_lists[2].insert(items[2].clone()).expect("insert error");
1777        let mut merger = Merger::new(
1778            layer_ref_iter(&skip_lists),
1779            |left, right| {
1780                let result = if left.key().0.end <= right.key().0.start {
1781                    MergeResult::EmitLeft
1782                } else {
1783                    if left.key() == &TestKey(0..30) && right.key() == &TestKey(10..20) {
1784                        MergeResult::Other {
1785                            emit: Some(Item::new(TestKey(0..10), 1).boxed()),
1786                            left: Replace(Item::new(TestKey(10..30), 1).boxed()),
1787                            right: Keep,
1788                        }
1789                    } else {
1790                        MergeResult::Other {
1791                            emit: None,
1792                            left: Keep,
1793                            right: Replace(
1794                                Item::new(TestKey(left.key().0.end..right.key().0.end), 1).boxed(),
1795                            ),
1796                        }
1797                    }
1798                };
1799                result
1800            },
1801            counters(),
1802        );
1803        let mut iter = merger.query(Query::FullRange(&TestKey(0..1))).await.expect("seek failed");
1804        let ItemRef { key, .. } = iter.get().expect("missing item");
1805        assert_eq!(key, &TestKey(0..10));
1806        iter.advance().await.expect("advance failed");
1807        let ItemRef { key, .. } = iter.get().expect("missing item");
1808        assert_eq!(key, &TestKey(10..20));
1809        iter.advance().await.expect("advance failed");
1810        let ItemRef { key, .. } = iter.get().expect("missing item");
1811        assert_eq!(key, &TestKey(20..30));
1812        iter.advance().await.expect("advance failed");
1813        assert_eq!(iter.get(), None);
1814    }
1815
1816    async fn write_layer<K: Key, V: Value>(items: Vec<Item<K, V>>) -> Arc<dyn Layer<K, V>> {
1817        let object = Arc::new(FakeObject::new());
1818        let write_handle = FakeObjectHandle::new(object.clone());
1819        let mut writer =
1820            PersistentLayerWriter::<_, K, V>::new(Writer::new(&write_handle).await, 1, 512)
1821                .await
1822                .expect("PersistentLayerWriter::new failed");
1823        for item in items {
1824            writer.write(item.as_item_ref()).await.expect("write failed");
1825        }
1826        writer.flush().await.expect("flush failed");
1827        PersistentLayer::open(FakeObjectHandle::new(object))
1828            .await
1829            .expect("open_persistent_layer failed")
1830    }
1831
1832    fn merge_sum(
1833        left: &MergeLayerIterator<'_, i32, i32>,
1834        right: &MergeLayerIterator<'_, i32, i32>,
1835    ) -> MergeResult<i32, i32> {
1836        // Sum matching keys.
1837        if left.key() == right.key() {
1838            MergeResult::Other {
1839                emit: None,
1840                left: Discard,
1841                right: Replace(Item::new(left.key().clone(), left.value() + right.value()).boxed()),
1842            }
1843        } else {
1844            MergeResult::EmitLeft
1845        }
1846    }
1847
1848    #[fuchsia::test]
1849    async fn test_merge_bloom_filters_point_query() {
1850        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1851        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1852        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1853        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1854            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1855        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1856
1857        {
1858            // Key that exists in layer 0 only
1859            let iter = merger.query(Query::Point(&1)).await.expect("seek failed");
1860            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1861            assert_eq!((key, *value), (&items[0].key, items[0].value));
1862        }
1863        {
1864            // Key that exists in layer 1 and 2
1865            let iter = merger.query(Query::Point(&2)).await.expect("seek failed");
1866            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1867            assert_eq!((key, *value), (&items[1].key, items[1].value));
1868        }
1869        {
1870            // Key that exists in layer 1
1871            let iter = merger.query(Query::Point(&4)).await.expect("seek failed");
1872            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1873            assert_eq!((key, *value), (&items[2].key, items[2].value));
1874        }
1875        {
1876            // Key that doesn't exist at all
1877            let iter = merger.query(Query::Point(&400)).await.expect("seek failed");
1878            assert!(iter.get().is_none());
1879        }
1880    }
1881
1882    #[fuchsia::test]
1883    async fn test_merge_bloom_filters_limited_range() {
1884        // NB: This test uses ObjectKey so that we don't have to reimplement the complex merging
1885        // logic for range-like keys.
1886        let layer_0_items = vec![Item::new(
1887            ObjectKey::extent(0, 0, 0..2048),
1888            ObjectValue::extent(0, VOLUME_DATA_KEY_ID),
1889        )];
1890        let layer_1_items = vec![
1891            Item::new(
1892                ObjectKey::extent(0, 0, 1024..4096),
1893                ObjectValue::extent(32768, VOLUME_DATA_KEY_ID),
1894            ),
1895            Item::new(
1896                ObjectKey::extent(0, 0, 16384..17408),
1897                ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1898            ),
1899        ];
1900        let items = [
1901            Item::new(ObjectKey::extent(0, 0, 0..2048), ObjectValue::extent(0, VOLUME_DATA_KEY_ID)),
1902            Item::new(
1903                ObjectKey::extent(0, 0, 2048..4096),
1904                ObjectValue::extent(33792, VOLUME_DATA_KEY_ID),
1905            ),
1906            Item::new(
1907                ObjectKey::extent(0, 0, 16384..17408),
1908                ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1909            ),
1910        ];
1911        let layers: [Arc<dyn Layer<ObjectKey, ObjectValue>>; 2] =
1912            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1913        let mut merger =
1914            Merger::new(dyn_layer_ref_iter(&layers), object_store::merge::merge, counters());
1915
1916        {
1917            // Range contains just keys in layer 1
1918            let mut iter = merger
1919                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 16384..16386)))
1920                .await
1921                .expect("seek failed");
1922            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1923            assert_eq!(key, &items[2].key);
1924            assert_eq!(value, &items[2].value);
1925            iter.advance().await.expect("advance");
1926            assert!(iter.get().is_none());
1927        }
1928        {
1929            // Range contains keys in layer 0 and 1
1930            let mut iter = merger
1931                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 0..4096)))
1932                .await
1933                .expect("seek failed");
1934            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1935            assert_eq!(key, &items[0].key);
1936            assert_eq!(value, &items[0].value);
1937            iter.advance().await.expect("advance");
1938            let ItemRef { key, value, .. } = iter.get().expect("missing item");
1939            assert_eq!(key, &items[1].key);
1940            assert_eq!(value, &items[1].value);
1941            iter.advance().await.expect("advance");
1942        }
1943        {
1944            // Range contains no keys
1945            let mut iter = merger
1946                .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 8192..12288)))
1947                .await
1948                .expect("seek failed");
1949            let ItemRef { key, .. } = iter.get().expect("missing item");
1950            assert_eq!(key, &items[2].key);
1951            iter.advance().await.expect("advance");
1952            assert!(iter.get().is_none());
1953        }
1954    }
1955
1956    #[fuchsia::test]
1957    async fn test_merge_bloom_filters_full_range() {
1958        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1959        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1960        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1961        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1962            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1963        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1964
1965        let mut iter = merger.query(Query::FullRange(&0)).await.expect("seek failed");
1966        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1967        assert_eq!((key, *value), (&items[0].key, items[0].value));
1968        iter.advance().await.expect("advance");
1969        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1970        assert_eq!((key, *value), (&items[1].key, items[1].value));
1971        iter.advance().await.expect("advance");
1972        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1973        assert_eq!((key, *value), (&items[2].key, items[2].value));
1974        iter.advance().await.expect("advance");
1975        assert!(iter.get().is_none());
1976    }
1977
1978    #[fuchsia::test]
1979    async fn test_merge_bloom_filters_full_scan() {
1980        let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1981        let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1982        let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1983        let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1984            [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1985        let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1986
1987        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1988        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1989        assert_eq!((key, *value), (&items[0].key, items[0].value));
1990        iter.advance().await.expect("advance");
1991        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1992        assert_eq!((key, *value), (&items[1].key, items[1].value));
1993        iter.advance().await.expect("advance");
1994        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1995        assert_eq!((key, *value), (&items[2].key, items[2].value));
1996        iter.advance().await.expect("advance");
1997        assert!(iter.get().is_none());
1998    }
1999}