1use crate::log::*;
6use crate::lsm_tree;
7use crate::lsm_tree::types::{
8 BoxedItem, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut, LayerKey, MergeType,
9 OrdLowerBound, Value,
10};
11use anyhow::Error;
12use async_trait::async_trait;
13use futures::try_join;
14use std::cmp::Ordering;
15use std::collections::BinaryHeap;
16use std::fmt::{Debug, Write};
17use std::ops::{Bound, Deref, DerefMut};
18use std::sync::{Arc, atomic};
19
/// The action a merge function requests for one of the two items it was handed.
#[derive(Debug, Eq, PartialEq)]
pub enum ItemOp<K, V> {
    /// Leave the item untouched; it takes part in the next merge round as-is.
    Keep,

    /// Drop the item; the merger moves the owning iterator on to its next item.
    Discard,

    /// Substitute the item with the supplied one.
    Replace(BoxedItem<K, V>),
}
32
/// The verdict returned by a merge function for a (left, right) pair of items.
#[derive(Debug, Eq, PartialEq)]
pub enum MergeResult<K, V> {
    /// Emit the left item unchanged; the right item is kept for a later round.
    EmitLeft,

    /// Optionally emit `emit`, and apply `left` / `right` to the respective input items.
    Other { emit: Option<BoxedItem<K, V>>, left: ItemOp<K, V>, right: ItemOp<K, V> },
}
61
/// Signature of a merge function. The first argument is the "left" iterator (the one that orders
/// first), the second is the "right" one; the result says what to emit and what to do with each.
pub type MergeFn<K, V> =
    fn(&MergeLayerIterator<'_, K, V>, &MergeLayerIterator<'_, K, V>) -> MergeResult<K, V>;
68
/// Where the current item of a `MergeLayerIterator` comes from.
pub enum MergeItem<K, V> {
    /// There is no current item.
    None,
    /// The current item is owned by the merge iterator itself (e.g. after a replace).
    Item(BoxedItem<K, V>),
    /// The current item is whatever the underlying raw iterator currently points at.
    Iter,
}
74
/// The underlying layer iterator wrapped by a `MergeLayerIterator`.
enum RawIterator<'a, K, V> {
    /// No iterator has been installed yet.
    None,
    /// A read-only layer iterator.
    Const(Box<dyn LayerIterator<K, V> + 'a>),
    /// A mutable layer iterator (supports insert/erase/commit; used by `merge_into`).
    Mut(Box<dyn LayerIteratorMut<K, V> + 'a>),
}
80
// SAFETY: NOTE(review): the boxed trait objects above carry no `Send` bound, so the compiler
// cannot check this impl. The justification is not visible in this file — confirm that every
// iterator stored in `RawIterator` is only ever used from one task/thread at a time.
unsafe impl<K, V> Send for RawIterator<'_, K, V> {}
84
/// Per-layer state used while merging: an optional layer, its raw iterator and the current item.
pub struct MergeLayerIterator<'a, K, V> {
    // The layer to seek in. `None` for iterators that were built around a standalone item or an
    // already-positioned mutable iterator.
    layer: Option<&'a dyn Layer<K, V>>,

    // The underlying iterator; stays `RawIterator::None` until one is installed.
    iter: RawIterator<'a, K, V>,

    // Index of the layer; used as the tie-break when two iterators have equal keys.
    pub layer_index: u16,

    // The current item (none, owned, or taken from `iter`).
    item: MergeItem<K, V>,
}
99
100impl<'a, K, V> MergeLayerIterator<'a, K, V> {
101 pub fn key(&self) -> &K {
102 self.item().key
103 }
104
105 pub fn value(&self) -> &V {
106 self.item().value
107 }
108
109 fn new(layer_index: u16, layer: &'a dyn Layer<K, V>) -> Self {
110 MergeLayerIterator {
111 layer: Some(layer),
112 iter: RawIterator::None,
113 layer_index,
114 item: MergeItem::None,
115 }
116 }
117
118 fn new_with_item(layer_index: u16, item: MergeItem<K, V>) -> Self {
119 MergeLayerIterator { layer: None, iter: RawIterator::None, layer_index, item }
120 }
121
122 fn item(&self) -> ItemRef<'_, K, V> {
123 match &self.item {
124 MergeItem::None => panic!("No item!"),
125 MergeItem::Item(item) => item.as_item_ref(),
126 MergeItem::Iter => self.get().unwrap(),
127 }
128 }
129
130 fn get(&self) -> Option<ItemRef<'_, K, V>> {
131 match &self.iter {
132 RawIterator::None => panic!("No iterator!"),
133 RawIterator::Const(iter) => iter.get(),
134 RawIterator::Mut(iter) => iter.get(),
135 }
136 }
137
138 fn set_item_from_iter(&mut self) {
139 self.item = if match &self.iter {
140 RawIterator::None => unreachable!(),
141 RawIterator::Const(iter) => iter.get(),
142 RawIterator::Mut(iter) => iter.get(),
143 }
144 .is_some()
145 {
146 MergeItem::Iter
147 } else {
148 MergeItem::None
149 };
150 }
151
152 fn take_item(&mut self) -> Option<BoxedItem<K, V>> {
153 if matches!(self.item, MergeItem::Item(_)) {
154 if let MergeItem::Item(item) = std::mem::replace(&mut self.item, MergeItem::None) {
155 return Some(item);
156 }
157 }
158 None
159 }
160
161 async fn advance(&mut self) -> Result<(), Error> {
162 if let MergeItem::Iter = self.item {
163 if let RawIterator::Const(iter) = &mut self.iter {
164 iter.advance().await?;
165 } else {
166 unreachable!();
168 }
169 }
170 self.set_item_from_iter();
171 Ok(())
172 }
173
174 fn replace(&mut self, item: BoxedItem<K, V>) {
175 self.item = MergeItem::Item(item);
176 }
177
178 fn is_some(&self) -> bool {
179 !matches!(self.item, MergeItem::None)
180 }
181
182 async fn maybe_advance(&mut self, op: &ItemOp<K, V>) -> Result<(), Error> {
185 if let ItemOp::Keep = op { Ok(()) } else { self.advance().await }
186 }
187}
188
189impl<K: OrdLowerBound, V> Ord for MergeLayerIterator<'_, K, V> {
191 fn cmp(&self, other: &Self) -> Ordering {
192 other.key().cmp_lower_bound(self.key()).then(other.layer_index.cmp(&self.layer_index))
194 }
195}
196impl<K: OrdLowerBound, V> PartialOrd for MergeLayerIterator<'_, K, V> {
197 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
198 Some(self.cmp(other))
199 }
200}
201impl<K: OrdLowerBound, V> PartialEq for MergeLayerIterator<'_, K, V> {
202 fn eq(&self, other: &Self) -> bool {
203 self.cmp(other) == Ordering::Equal
204 }
205}
// Marker impl: equality is derived from the total order defined by `Ord::cmp`.
impl<K: OrdLowerBound, V> Eq for MergeLayerIterator<'_, K, V> {}
207
/// The item a `MergerIterator` currently exposes through `get()`.
enum CurrentItem<'a, 'b, K, V> {
    /// Nothing to expose (iteration is finished or has not produced an item).
    None,
    /// An owned item that was emitted by the merge function.
    Item(BoxedItem<K, V>),
    /// The current item of this layer iterator, which has been taken off the heap.
    Iterator(&'a mut MergeLayerIterator<'b, K, V>),
}
215
216impl<'a, 'b, K, V> CurrentItem<'a, 'b, K, V> {
217 fn take_iterator(&mut self) -> Option<&'a mut MergeLayerIterator<'b, K, V>> {
218 if matches!(self, CurrentItem::Iterator(_)) {
219 if let CurrentItem::Iterator(iter) = std::mem::replace(self, CurrentItem::None) {
220 return Some(iter);
221 }
222 }
223 None
224 }
225}
226
227impl<'a, K, V> From<&'a CurrentItem<'_, '_, K, V>> for Option<ItemRef<'a, K, V>> {
228 fn from(iter: &'a CurrentItem<'_, '_, K, V>) -> Option<ItemRef<'a, K, V>> {
229 match iter {
230 CurrentItem::None => None,
231 CurrentItem::Iterator(iterator) => Some(iterator.item()),
232 CurrentItem::Item(item) => Some(item.as_item_ref()),
233 }
234 }
235}
236
/// Merges items from a set of layers using a caller-supplied merge function.
pub struct Merger<'a, K, V> {
    // One iterator per layer, in the order the layers were supplied.
    iterators: Vec<MergeLayerIterator<'a, K, V>>,

    // The function consulted for every pair of lowest items.
    merge_fn: MergeFn<K, V>,

    // When true, iterators created by `query` record merge steps in their history string.
    trace: bool,

    // Counters updated on every query (seeks, total layers, skipped layers).
    counters: Arc<lsm_tree::TreeCounters>,
}
251
/// The kind of lookup a `Merger::query` call performs.
#[derive(Debug, Clone)]
pub enum Query<'a, K: Key + LayerKey + OrdLowerBound> {
    /// Seek to exactly `key`; only layers that may contain the key are consulted. Debug builds
    /// assert that the key is not a range key.
    Point(&'a K),

    /// Seek to `key.search_key()` when the key provides one (otherwise to `key` itself); only
    /// layers that may contain the key are consulted.
    LimitedRange(&'a K),

    /// Seek to `key`, which must already be a search key; every layer is consulted.
    FullRange(&'a K),

    /// Start from the very beginning; every layer is consulted.
    FullScan,
}
286
287impl<'a, K: Key + LayerKey + OrdLowerBound> Query<'a, K> {
288 fn needs_layer<V>(&self, layer: &dyn Layer<K, V>) -> bool {
289 match self {
290 Self::Point(key) => layer.maybe_contains_key(key),
291 Self::LimitedRange(key) => layer.maybe_contains_key(key),
292 Self::FullRange(_) => true,
293 Self::FullScan => true,
294 }
295 }
296}
297
298#[fxfs_trace::trace]
299impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> Merger<'a, K, V> {
300 pub(super) fn new<I: Iterator<Item = &'a dyn Layer<K, V>>>(
301 layers: I,
302 merge_fn: MergeFn<K, V>,
303 counters: Arc<lsm_tree::TreeCounters>,
304 ) -> Merger<'a, K, V> {
305 Merger {
306 iterators: layers
307 .enumerate()
308 .map(|(index, layer)| MergeLayerIterator::new(index as u16, layer))
309 .collect(),
310 merge_fn: merge_fn,
311 trace: false,
312 counters,
313 }
314 }
315
316 #[trace]
319 pub async fn query(
320 &mut self,
321 query: Query<'_, K>,
322 ) -> Result<MergerIterator<'_, 'a, K, V>, Error> {
323 if let Query::Point(key) = query {
324 debug_assert!(!key.is_range_key())
332 };
333 let len = self.iterators.len();
334 let pending_iterators = {
335 fxfs_trace::duration!("Merger::filter_layer_files", "len" => len);
336 self.iterators
337 .iter_mut()
338 .rev()
339 .filter(|l| query.needs_layer(l.layer.clone().unwrap()))
340 .collect::<Vec<&mut MergeLayerIterator<'a, K, V>>>()
341 };
342 let layer_count = pending_iterators.len();
343 {
344 self.counters.num_seeks.fetch_add(1, atomic::Ordering::Relaxed);
345 self.counters.layer_files_total.fetch_add(len, atomic::Ordering::Relaxed);
346 self.counters
347 .layer_files_skipped
348 .fetch_add(len - layer_count, atomic::Ordering::Relaxed);
349 }
350 log::debug!(query:?; "Consulting {}/{} layers", layer_count, len);
351 let mut merger_iter = MergerIterator {
352 merge_fn: self.merge_fn,
353 pending_iterators,
354 heap: BinaryHeap::with_capacity(layer_count),
355 item: CurrentItem::None,
356 trace: self.trace,
357 history: String::new(),
358 };
359 let owned_key;
360 let search_key = match query {
361 Query::Point(key) => Bound::Included(key),
362 Query::LimitedRange(key) => match key.search_key() {
363 Some(k) => {
364 owned_key = k;
365 Bound::Included(&owned_key)
366 }
367 None => Bound::Included(key),
368 },
369 Query::FullRange(key) => {
370 assert!(key.is_search_key());
371 Bound::Included(key)
372 }
373 Query::FullScan => Bound::Unbounded,
374 };
375 merger_iter.seek(search_key).await?;
376 Ok(merger_iter)
377 }
378
379 pub fn set_trace(&mut self, v: bool) {
380 self.trace = v;
381 }
382}
383
/// Iterator over the merged view of the layers selected by a `Merger::query` call.
pub struct MergerIterator<'a, 'b, K, V> {
    // The function consulted for every pair of lowest items.
    merge_fn: MergeFn<K, V>,

    // Layer iterators that have not been sought yet; consumed from the back.
    pending_iterators: Vec<&'a mut MergeLayerIterator<'b, K, V>>,

    // Layer iterators that currently have an item, lowest key (then lowest layer index) on top.
    heap: BinaryHeap<&'a mut MergeLayerIterator<'b, K, V>>,

    // The item currently exposed through `get()`.
    item: CurrentItem<'a, 'b, K, V>,

    // When true, seek and merge steps are appended to `history`.
    trace: bool,

    // Textual record of seek/merge steps (only written when `trace` is set).
    history: String,
}
404
405impl<'a, 'b, K: Key + LayerKey + OrdLowerBound, V: Value> MergerIterator<'a, 'b, K, V> {
406 pub fn pending_iterators_len(&self) -> usize {
407 self.pending_iterators.len()
408 }
409
410 async fn seek(&mut self, search_key: Bound<&K>) -> Result<(), Error> {
412 self.push_iterators(search_key).await?;
413 self.advance_impl(search_key).await
414 }
415
416 async fn advance_impl(&mut self, search_key: Bound<&K>) -> Result<(), Error> {
422 loop {
423 loop {
424 if self.heap.is_empty() {
425 self.item = CurrentItem::None;
426 return Ok(());
427 }
428 let lowest = self.heap.pop().unwrap();
429 let maybe_second_lowest = self.heap.pop();
430 if let Some(second_lowest) = maybe_second_lowest {
431 let result = (self.merge_fn)(&lowest, &second_lowest);
432 if self.trace {
433 writeln!(
434 self.history,
435 "merge {:?}, {:?} -> {:?}",
436 lowest.item(),
437 second_lowest.item(),
438 result
439 )
440 .unwrap();
441 }
442 match result {
443 MergeResult::EmitLeft => {
444 self.heap.push(second_lowest);
445 self.item = CurrentItem::Iterator(lowest);
446 break;
447 }
448 MergeResult::Other { emit, left, right } => {
449 try_join!(
450 lowest.maybe_advance(&left),
451 second_lowest.maybe_advance(&right)
452 )?;
453 self.update_item(lowest, left);
454 self.update_item(second_lowest, right);
455 if let Some(emit) = emit {
456 self.item = CurrentItem::Item(emit);
457 break;
458 }
459 }
460 }
461 } else {
462 self.item = CurrentItem::Iterator(lowest);
463 break;
464 }
465 }
466
467 match search_key {
487 Bound::Included(key)
488 if self.get().unwrap().key.cmp_upper_bound(key) == Ordering::Less => {}
489 Bound::Excluded(key)
490 if self.get().unwrap().key.cmp_upper_bound(key) != Ordering::Greater => {}
491 _ => return Ok(()),
492 }
493 if let Some(iterator) = self.item.take_iterator() {
494 iterator.advance().await?;
495 if iterator.is_some() {
496 self.heap.push(iterator);
497 }
498 }
499 }
500 }
501
502 fn needs_more_iterators(&self, search_key: Bound<&K>) -> bool {
504 if self.pending_iterators.is_empty() {
505 return false;
506 }
507 if self.heap.is_empty() {
508 return true;
509 }
510 let Bound::Included(target) = search_key else { return true };
511 match target.merge_type() {
512 MergeType::FullMerge => true,
513 MergeType::OptimizedMerge => !self.heap.peek().unwrap().key().overlaps(target),
514 }
515 }
516
517 async fn push_iterators(&mut self, search_key: Bound<&K>) -> Result<(), Error> {
520 while self.needs_more_iterators(search_key) {
521 let iter = self.pending_iterators.pop().unwrap();
522 let sub_iter = iter.layer.as_ref().unwrap().seek(search_key).await?;
523 if self.trace {
524 writeln!(
525 self.history,
526 "merger: search for {:?}, found {:?}",
527 search_key,
528 sub_iter.get()
529 )
530 .unwrap();
531 }
532 iter.iter = RawIterator::Const(sub_iter);
533 iter.set_item_from_iter();
534 if iter.is_some() {
535 self.heap.push(iter);
536 }
537 }
538 Ok(())
539 }
540
541 fn update_item(&mut self, item: &'a mut MergeLayerIterator<'b, K, V>, op: ItemOp<K, V>) {
544 match op {
545 ItemOp::Keep => self.heap.push(item),
546 ItemOp::Discard => {
547 if item.is_some() {
549 self.heap.push(item);
550 }
551 }
552 ItemOp::Replace(replacement) => {
553 item.replace(replacement);
554 self.heap.push(item);
555 }
556 }
557 }
558}
559
560#[async_trait]
561impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> LayerIterator<K, V>
562 for MergerIterator<'a, '_, K, V>
563{
564 async fn advance(&mut self) -> Result<(), Error> {
565 let owned_key;
566 let mut search_key = Bound::Unbounded;
567 if !self.pending_iterators.is_empty() {
568 if let Some(ItemRef { key, .. }) = self.get() {
569 match key.next_key() {
570 Some(k) => {
571 assert!(k.is_search_key());
572 owned_key = k;
573 search_key = Bound::Included(&owned_key);
574 }
575 None => {
576 owned_key = key.clone();
577 search_key = Bound::Excluded(&owned_key);
578 }
579 }
580 }
581 }
582
583 if let Some(iterator) = self.item.take_iterator() {
586 if self.needs_more_iterators(search_key) {
587 let existing_item = iterator.item().boxed();
588 iterator.advance().await?;
589 if let Bound::Included(s) = search_key
590 && iterator.is_some()
591 && iterator.key().merge_type() == MergeType::OptimizedMerge
592 && s.overlaps(iterator.key())
593 {
594 } else {
598 iterator.replace(existing_item);
602
603 self.push_iterators(search_key).await?;
606 }
607 } else {
608 iterator.advance().await?;
609 }
610 if iterator.is_some() {
611 self.heap.push(iterator);
612 }
613 } else {
614 self.push_iterators(search_key).await?;
615 }
616
617 self.advance_impl(search_key).await
618 }
619
620 fn get(&self) -> Option<ItemRef<'_, K, V>> {
621 (&self.item).into()
622 }
623}
624
/// Wrapper around a `MergeLayerIterator` whose raw iterator is expected to be the `Mut` variant.
struct MutMergeLayerIterator<'a, K, V>(MergeLayerIterator<'a, K, V>);
626
627impl<K, V> MutMergeLayerIterator<'_, K, V> {
628 fn advance(&mut self) {
629 if let MergeItem::Iter = self.item {
630 self.as_mut().advance();
631 }
632 self.set_item_from_iter();
633 }
634}
635
636impl<'a, K, V> AsMut<dyn LayerIteratorMut<K, V> + 'a> for MutMergeLayerIterator<'a, K, V> {
637 fn as_mut(&mut self) -> &mut (dyn LayerIteratorMut<K, V> + 'a) {
638 let RawIterator::Mut(iter) = &mut self.0.iter else { unreachable!() };
639 iter.as_mut()
640 }
641}
642
643impl<'a, K, V> Deref for MutMergeLayerIterator<'a, K, V> {
644 type Target = MergeLayerIterator<'a, K, V>;
645
646 fn deref(&self) -> &Self::Target {
647 &self.0
648 }
649}
650
651impl<K, V> DerefMut for MutMergeLayerIterator<'_, K, V> {
652 fn deref_mut(&mut self) -> &mut Self::Target {
653 &mut self.0
654 }
655}
656
657pub(super) fn merge_into<K: Debug + OrdLowerBound, V: Debug>(
659 mut_iter: Box<dyn LayerIteratorMut<K, V> + '_>,
660 item: Item<K, V>,
661 merge_fn: MergeFn<K, V>,
662) -> Result<(), Error> {
663 let merge_item = if mut_iter.get().is_some() { MergeItem::Iter } else { MergeItem::None };
664 let mut mut_merge_iter = MutMergeLayerIterator(MergeLayerIterator {
665 layer: None,
666 iter: RawIterator::Mut(mut_iter),
667 layer_index: 1,
668 item: merge_item,
669 });
670 let mut item_merge_iter = MergeLayerIterator::new_with_item(0, MergeItem::Item(item.boxed()));
671 while mut_merge_iter.is_some() && item_merge_iter.is_some() {
672 if mut_merge_iter.0 > item_merge_iter {
673 let merge_result = merge_fn(&mut_merge_iter, &item_merge_iter);
675 debug!(
676 lhs:? = mut_merge_iter.key(),
677 rhs:? = item_merge_iter.key(),
678 result:? = merge_result;
679 "(1) merge");
680 match merge_result {
681 MergeResult::EmitLeft => {
682 if let Some(item) = mut_merge_iter.take_item() {
683 mut_merge_iter.as_mut().insert(*item);
684 mut_merge_iter.set_item_from_iter();
685 } else {
686 mut_merge_iter.advance();
687 }
688 }
689 MergeResult::Other { emit, left, right } => {
690 if let Some(emit) = emit {
691 mut_merge_iter.as_mut().insert(*emit);
692 }
693 match left {
694 ItemOp::Keep => {}
695 ItemOp::Discard => {
696 if matches!(mut_merge_iter.item, MergeItem::Iter) {
697 mut_merge_iter.as_mut().erase();
698 }
699 mut_merge_iter.set_item_from_iter();
700 }
701 ItemOp::Replace(item) => {
702 if let MergeItem::Iter = mut_merge_iter.item {
703 mut_merge_iter.as_mut().erase();
704 }
705 mut_merge_iter.item = MergeItem::Item(item)
706 }
707 }
708 match right {
709 ItemOp::Keep => {}
710 ItemOp::Discard => item_merge_iter.item = MergeItem::None,
711 ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
712 }
713 }
714 }
715 } else {
716 let merge_result = merge_fn(&item_merge_iter, &mut_merge_iter);
718 debug!(
719 lhs:? = mut_merge_iter.key(),
720 rhs:? = item_merge_iter.key(),
721 result:? = merge_result;
722 "(2) merge");
723 match merge_result {
724 MergeResult::EmitLeft => break, MergeResult::Other { emit, left, right } => {
726 if let Some(emit) = emit {
727 mut_merge_iter.as_mut().insert(*emit);
728 }
729 match left {
730 ItemOp::Keep => {}
731 ItemOp::Discard => item_merge_iter.item = MergeItem::None,
732 ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
733 }
734 match right {
735 ItemOp::Keep => {}
736 ItemOp::Discard => {
737 if matches!(mut_merge_iter.item, MergeItem::Iter) {
738 mut_merge_iter.as_mut().erase();
739 }
740 mut_merge_iter.set_item_from_iter();
741 }
742 ItemOp::Replace(item) => {
743 if let MergeItem::Iter = mut_merge_iter.item {
744 mut_merge_iter.as_mut().erase();
745 }
746 mut_merge_iter.item = MergeItem::Item(item)
747 }
748 }
749 }
750 }
751 }
752 } if let MergeItem::Item(item) = item_merge_iter.item {
757 mut_merge_iter.as_mut().insert(*item);
758 }
759 if let Some(item) = mut_merge_iter.take_item() {
760 mut_merge_iter.as_mut().insert(*item);
761 }
762 if let RawIterator::Mut(mut iter) = mut_merge_iter.0.iter {
763 iter.commit();
764 }
765 Ok(())
766}
767
768#[cfg(test)]
769mod tests {
770 use super::ItemOp::{Discard, Keep, Replace};
771 use super::{MergeLayerIterator, MergeResult, Merger};
772 use crate::lsm_tree::persistent_layer::{PersistentLayer, PersistentLayerWriter};
773 use crate::lsm_tree::skip_list_layer::SkipListLayer;
774 use crate::lsm_tree::types::{
775 FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeType,
776 OrdLowerBound, OrdUpperBound, SortByU64,
777 };
778 use crate::lsm_tree::{self, Query, Value};
779 use crate::object_store::{self, ObjectKey, ObjectValue, VOLUME_DATA_KEY_ID};
780 use crate::serialized_types::{
781 LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
782 };
783 use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
784 use crate::testing::writer::Writer;
785 use fprint::TypeFingerprint;
786 use fxfs_macros::{FuzzyHash, SerializeKey};
787 use rand::Rng;
788 use std::hash::Hash;
789 use std::ops::{Bound, Range};
790 use std::sync::Arc;
791
792 use crate::lsm_tree::testing::TestKey;
793
    // Lets the tests use plain `i32` values; 0 serves as the deleted marker.
    impl Value for i32 {
        const DELETED_MARKER: Self = 0;
    }
797
798 fn layer_ref_iter<K: Key, V: Value>(
799 layers: &[Arc<SkipListLayer<K, V>>],
800 ) -> impl Iterator<Item = &dyn Layer<K, V>> {
801 layers.iter().map(|x| x.as_ref() as &dyn Layer<K, V>)
802 }
803
804 fn dyn_layer_ref_iter<K: Key, V: Value>(
805 layers: &[Arc<dyn Layer<K, V>>],
806 ) -> impl Iterator<Item = &dyn Layer<K, V>> {
807 layers.iter().map(|x| x.as_ref())
808 }
809
810 fn counters() -> Arc<lsm_tree::TreeCounters> {
811 Arc::new(lsm_tree::TreeCounters::default())
812 }
813
814 #[fuchsia::test]
815 async fn test_emit_left() {
816 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
817 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
818 skip_lists[0].insert(items[1].clone()).expect("insert error");
819 skip_lists[1].insert(items[0].clone()).expect("insert error");
820 let mut merger = Merger::new(
821 layer_ref_iter(&skip_lists),
822 |_left, _right| MergeResult::EmitLeft,
823 counters(),
824 );
825 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
826 let ItemRef { key, value, .. } = iter.get().expect("missing item");
827 assert_eq!((key, value), (&items[0].key, &items[0].value));
828 iter.advance().await.unwrap();
829 let ItemRef { key, value, .. } = iter.get().expect("missing item");
830 assert_eq!((key, value), (&items[1].key, &items[1].value));
831 iter.advance().await.unwrap();
832 assert!(iter.get().is_none());
833 }
834
835 #[fuchsia::test]
836 async fn test_other_emit() {
837 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
838 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
839 skip_lists[0].insert(items[1].clone()).expect("insert error");
840 skip_lists[1].insert(items[0].clone()).expect("insert error");
841 let mut merger = Merger::new(
842 layer_ref_iter(&skip_lists),
843 |_left, _right| MergeResult::Other {
844 emit: Some(Item::new(TestKey(3..3), 3).boxed()),
845 left: Discard,
846 right: Discard,
847 },
848 counters(),
849 );
850 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
851
852 let ItemRef { key, value, .. } = iter.get().expect("missing item");
853 assert_eq!((key, value), (&TestKey(3..3), &3));
854 iter.advance().await.unwrap();
855 assert!(iter.get().is_none());
856 }
857
858 #[fuchsia::test]
859 async fn test_replace_left() {
860 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
861 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
862 skip_lists[0].insert(items[1].clone()).expect("insert error");
863 skip_lists[1].insert(items[0].clone()).expect("insert error");
864 let mut merger = Merger::new(
865 layer_ref_iter(&skip_lists),
866 |_left, _right| MergeResult::Other {
867 emit: None,
868 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
869 right: Discard,
870 },
871 counters(),
872 );
873 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
874
875 let ItemRef { key, value, .. } = iter.get().expect("missing item");
878 assert_eq!((key, value), (&TestKey(3..3), &3));
879 iter.advance().await.unwrap();
880 assert!(iter.get().is_none());
881 }
882
883 #[fuchsia::test]
884 async fn test_replace_right() {
885 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
886 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
887 skip_lists[0].insert(items[1].clone()).expect("insert error");
888 skip_lists[1].insert(items[0].clone()).expect("insert error");
889 let mut merger = Merger::new(
890 layer_ref_iter(&skip_lists),
891 |_left, _right| MergeResult::Other {
892 emit: None,
893 left: Discard,
894 right: Replace(Item::new(TestKey(3..3), 3).boxed()),
895 },
896 counters(),
897 );
898 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
899
900 let ItemRef { key, value, .. } = iter.get().expect("missing item");
903 assert_eq!((key, value), (&TestKey(3..3), &3));
904 iter.advance().await.unwrap();
905 assert!(iter.get().is_none());
906 }
907
908 #[fuchsia::test]
909 async fn test_left_less_than_right() {
910 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
911 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
912 skip_lists[0].insert(items[1].clone()).expect("insert error");
913 skip_lists[1].insert(items[0].clone()).expect("insert error");
914 let mut merger = Merger::new(
915 layer_ref_iter(&skip_lists),
916 |left, right| {
917 assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
918 assert_eq!((right.key(), right.value()), (&TestKey(2..2), &2));
919 MergeResult::EmitLeft
920 },
921 counters(),
922 );
923 merger.query(Query::FullScan).await.expect("seek failed");
924 }
925
926 #[fuchsia::test]
927 async fn test_left_equals_right() {
928 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
929 let item = Item::new(TestKey(1..1), 1);
930 skip_lists[0].insert(item.clone()).expect("insert error");
931 skip_lists[1].insert(item.clone()).expect("insert error");
932 let mut merger = Merger::new(
933 layer_ref_iter(&skip_lists),
934 |left, right| {
935 assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
936 assert_eq!((right.key(), right.value()), (&TestKey(1..1), &1));
937 assert_eq!(left.layer_index, 0);
938 assert_eq!(right.layer_index, 1);
939 MergeResult::EmitLeft
940 },
941 counters(),
942 );
943 merger.query(Query::FullScan).await.expect("seek failed");
944 }
945
946 #[fuchsia::test]
947 async fn test_keep() {
948 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
949 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
950 skip_lists[0].insert(items[1].clone()).expect("insert error");
951 skip_lists[1].insert(items[0].clone()).expect("insert error");
952 let mut merger = Merger::new(
953 layer_ref_iter(&skip_lists),
954 |left, right| {
955 if left.key() == &TestKey(1..1) {
956 MergeResult::Other {
957 emit: None,
958 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
959 right: Keep,
960 }
961 } else {
962 assert_eq!(left.key(), &TestKey(2..2));
963 assert_eq!(right.key(), &TestKey(3..3));
964 MergeResult::Other { emit: None, left: Discard, right: Keep }
965 }
966 },
967 counters(),
968 );
969 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
970
971 let ItemRef { key, value, .. } = iter.get().expect("missing item");
974 assert_eq!((key, value), (&TestKey(3..3), &3));
975 iter.advance().await.unwrap();
976 assert!(iter.get().is_none());
977 }
978
979 #[fuchsia::test]
980 async fn test_merge_10_layers() {
981 let skip_lists: Vec<_> = (0..10).map(|_| SkipListLayer::new(100)).collect();
982 let mut rng = rand::rng();
983 for i in 0..100 {
984 skip_lists[rng.random_range(0..10) as usize]
985 .insert(Item::new(TestKey(i..i), i))
986 .expect("insert error");
987 }
988 let mut merger = Merger::new(
989 layer_ref_iter(&skip_lists),
990 |_left, _right| MergeResult::EmitLeft,
991 counters(),
992 );
993 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
994
995 for i in 0..100 {
996 let ItemRef { key, value, .. } = iter.get().expect("missing item");
997 assert_eq!((key, value), (&TestKey(i..i), &i));
998 iter.advance().await.unwrap();
999 }
1000 assert!(iter.get().is_none());
1001 }
1002
1003 #[fuchsia::test]
1004 async fn test_merge_uses_cmp_lower_bound() {
1005 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1006 let items = [Item::new(TestKey(1..10), 1), Item::new(TestKey(2..3), 2)];
1007 skip_lists[0].insert(items[1].clone()).expect("insert error");
1008 skip_lists[1].insert(items[0].clone()).expect("insert error");
1009 let mut merger = Merger::new(
1010 layer_ref_iter(&skip_lists),
1011 |_left, _right| MergeResult::EmitLeft,
1012 counters(),
1013 );
1014 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1015
1016 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1017 assert_eq!((key, value), (&items[0].key, &items[0].value));
1018 iter.advance().await.unwrap();
1019 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1020 assert_eq!((key, value), (&items[1].key, &items[1].value));
1021 iter.advance().await.unwrap();
1022 assert!(iter.get().is_none());
1023 }
1024
1025 #[fuchsia::test]
1026 async fn test_merge_into_emit_left() {
1027 let skip_list = SkipListLayer::new(100);
1028 let items =
1029 [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2), Item::new(TestKey(3..3), 3)];
1030 skip_list.insert(items[0].clone()).expect("insert error");
1031 skip_list.insert(items[2].clone()).expect("insert error");
1032 skip_list
1033 .merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::EmitLeft);
1034
1035 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1036 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1037 assert_eq!((key, value), (&items[0].key, &items[0].value));
1038 iter.advance().await.unwrap();
1039 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1040 assert_eq!((key, value), (&items[1].key, &items[1].value));
1041 iter.advance().await.unwrap();
1042 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1043 assert_eq!((key, value), (&items[2].key, &items[2].value));
1044 iter.advance().await.unwrap();
1045 assert!(iter.get().is_none());
1046 }
1047
1048 #[fuchsia::test]
1049 async fn test_merge_into_emit_last_after_replacing() {
1050 let skip_list = SkipListLayer::new(100);
1051 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1052 skip_list.insert(items[0].clone()).expect("insert error");
1053
1054 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1055 if left.key() == &TestKey(1..1) {
1056 assert_eq!(right.key(), &TestKey(2..2));
1057 MergeResult::Other {
1058 emit: None,
1059 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1060 right: Keep,
1061 }
1062 } else {
1063 assert_eq!(left.key(), &TestKey(2..2));
1064 assert_eq!(right.key(), &TestKey(3..3));
1065 MergeResult::EmitLeft
1066 }
1067 });
1068
1069 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1070 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1071 assert_eq!((key, value), (&items[1].key, &items[1].value));
1072 iter.advance().await.unwrap();
1073 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1074 assert_eq!((key, value), (&TestKey(3..3), &3));
1075 iter.advance().await.unwrap();
1076 assert!(iter.get().is_none());
1077 }
1078
1079 #[fuchsia::test]
1080 async fn test_merge_into_emit_left_after_replacing() {
1081 let skip_list = SkipListLayer::new(100);
1082 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1083 skip_list.insert(items[0].clone()).expect("insert error");
1084
1085 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1086 if left.key() == &TestKey(1..1) {
1087 assert_eq!(right.key(), &TestKey(3..3));
1088 MergeResult::Other {
1089 emit: None,
1090 left: Replace(Item::new(TestKey(2..2), 2).boxed()),
1091 right: Keep,
1092 }
1093 } else {
1094 assert_eq!(left.key(), &TestKey(2..2));
1095 assert_eq!(right.key(), &TestKey(3..3));
1096 MergeResult::EmitLeft
1097 }
1098 });
1099
1100 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1101 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1102 assert_eq!((key, value), (&TestKey(2..2), &2));
1103 iter.advance().await.unwrap();
1104 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1105 assert_eq!((key, value), (&items[1].key, &items[1].value));
1106 iter.advance().await.unwrap();
1107 assert!(iter.get().is_none());
1108 }
1109
1110 #[fuchsia::test]
1112 async fn test_merge_into_emit_other_and_discard() {
1113 let skip_list = SkipListLayer::new(100);
1114 let items =
1115 [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 3)];
1116 skip_list.insert(items[0].clone()).expect("insert error");
1117 skip_list.insert(items[2].clone()).expect("insert error");
1118
1119 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1120 if left.key() == &TestKey(1..1) {
1121 assert_eq!(right.key(), &TestKey(3..3));
1123 MergeResult::Other {
1124 emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1125 left: Discard,
1126 right: Keep,
1127 }
1128 } else {
1129 assert_eq!(left.key(), &TestKey(3..3));
1131 assert_eq!(right.key(), &TestKey(5..5));
1132 MergeResult::Other {
1133 emit: Some(Item::new(TestKey(4..4), 4).boxed()),
1134 left: Discard,
1135 right: Discard,
1136 }
1137 }
1138 });
1139
1140 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1141 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1142 assert_eq!((key, value), (&TestKey(2..2), &2));
1143 iter.advance().await.unwrap();
1144 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1145 assert_eq!((key, value), (&TestKey(4..4), &4));
1146 iter.advance().await.unwrap();
1147 assert!(iter.get().is_none());
1148 }
1149
1150 #[fuchsia::test]
1153 async fn test_merge_into_replace_and_discard() {
1154 let skip_list = SkipListLayer::new(100);
1155 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1156 skip_list.insert(items[0].clone()).expect("insert error");
1157
1158 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1159 emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1160 left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1161 right: Discard,
1162 });
1163
1164 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1165 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1166 assert_eq!((key, value), (&TestKey(2..2), &2));
1167 iter.advance().await.unwrap();
1168 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1169 assert_eq!((key, value), (&TestKey(4..4), &4));
1170 iter.advance().await.unwrap();
1171 assert!(iter.get().is_none());
1172 }
1173
1174 #[fuchsia::test]
1177 async fn test_merge_into_replace_merge_item() {
1178 let skip_list = SkipListLayer::new(100);
1179 let items =
1180 [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 5)];
1181 skip_list.insert(items[0].clone()).expect("insert error");
1182 skip_list.insert(items[2].clone()).expect("insert error");
1183
1184 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, right| {
1185 if right.key() == &TestKey(3..3) {
1186 MergeResult::Other {
1187 emit: None,
1188 left: Discard,
1189 right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1190 }
1191 } else {
1192 assert_eq!(right.key(), &TestKey(5..5));
1193 MergeResult::Other {
1194 emit: None,
1195 left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1196 right: Discard,
1197 }
1198 }
1199 });
1200
1201 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1202 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1203 assert_eq!((key, value), (&TestKey(4..4), &4));
1204 iter.advance().await.unwrap();
1205 assert!(iter.get().is_none());
1206 }
1207
1208 #[fuchsia::test]
1210 async fn test_merge_into_replace_existing() {
1211 let skip_list = SkipListLayer::new(100);
1212 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1213 skip_list.insert(items[1].clone()).expect("insert error");
1214
1215 skip_list.merge_into(items[0].clone(), &items[0].key, |_left, right| {
1216 if right.key() == &TestKey(3..3) {
1217 MergeResult::Other {
1218 emit: None,
1219 left: Keep,
1220 right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1221 }
1222 } else {
1223 MergeResult::EmitLeft
1224 }
1225 });
1226
1227 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1228 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1229 assert_eq!((key, value), (&items[0].key, &items[0].value));
1230 iter.advance().await.unwrap();
1231 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1232 assert_eq!((key, value), (&TestKey(2..2), &2));
1233 iter.advance().await.unwrap();
1234 assert!(iter.get().is_none());
1235 }
1236
1237 #[fuchsia::test]
1238 async fn test_merge_into_discard_last() {
1239 let skip_list = SkipListLayer::new(100);
1240 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1241 skip_list.insert(items[0].clone()).expect("insert error");
1242
1243 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1244 emit: None,
1245 left: Discard,
1246 right: Keep,
1247 });
1248
1249 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1250 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1251 assert_eq!((key, value), (&items[1].key, &items[1].value));
1252 iter.advance().await.unwrap();
1253 assert!(iter.get().is_none());
1254 }
1255
1256 #[fuchsia::test]
1257 async fn test_merge_into_empty() {
1258 let skip_list = SkipListLayer::new(100);
1259 let items = [Item::new(TestKey(1..1), 1)];
1260
1261 skip_list.merge_into(items[0].clone(), &items[0].key, |_left, _right| {
1262 panic!("Unexpected merge!");
1263 });
1264
1265 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1266 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1267 assert_eq!((key, value), (&items[0].key, &items[0].value));
1268 iter.advance().await.unwrap();
1269 assert!(iter.get().is_none());
1270 }
1271
1272 #[fuchsia::test]
1273 async fn test_seek_uses_minimum_number_of_iterators() {
1274 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1275 let items = [Item::new(TestKey(1..2), 1), Item::new(TestKey(1..2), 2)];
1276 skip_lists[0].insert(items[0].clone()).expect("insert error");
1277 skip_lists[1].insert(items[1].clone()).expect("insert error");
1278 let mut merger = Merger::new(
1279 layer_ref_iter(&skip_lists),
1280 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1281 counters(),
1282 );
1283 let iter = merger
1284 .query(Query::FullRange(&items[0].key.search_key().unwrap()))
1285 .await
1286 .expect("seek failed");
1287
1288 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1291 assert_eq!((key, value), (&items[0].key, &items[0].value));
1292 }
1293
1294 async fn test_advance<K: Eq + Key + LayerKey + OrdLowerBound>(
1297 layers: &[&[(K, i32)]],
1298 query: Query<'_, K>,
1299 expected: &[(K, i32)],
1300 ) {
1301 let mut skip_lists = Vec::new();
1302 for &layer in layers {
1303 let skip_list = SkipListLayer::new(100);
1304 for (k, v) in layer {
1305 skip_list.insert(Item::new(k.clone(), *v)).expect("insert error");
1306 }
1307 skip_lists.push(skip_list);
1308 }
1309 let mut merger = Merger::new(
1310 layer_ref_iter(&skip_lists),
1311 |_left, _right| MergeResult::EmitLeft,
1312 counters(),
1313 );
1314 let mut iter = merger.query(query).await.expect("seek failed");
1315 for (k, v) in expected {
1316 let ItemRef { key, value, .. } = iter.get().expect("get failed");
1317 assert_eq!((key, value), (k, v));
1318 iter.advance().await.expect("advance failed");
1319 }
1320 assert!(iter.get().is_none());
1321 }
1322
1323 #[fuchsia::test]
1324 async fn test_seek_skips_replaced_items() {
1325 test_advance(
1327 &[
1328 &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(4..5), 3)],
1329 &[(TestKey(1..2), 4), (TestKey(2..3), 5), (TestKey(3..4), 6)],
1330 ],
1331 Query::FullRange(&TestKey(1..2).search_key().unwrap()),
1332 &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(3..4), 6), (TestKey(4..5), 3)],
1333 )
1334 .await;
1335 }
1336
1337 #[fuchsia::test]
1338 async fn test_advance_skips_replaced_items_at_end() {
1339 test_advance(
1342 &[&[(TestKey(1..2), 1)], &[(TestKey(1..2), 2)]],
1343 Query::FullRange(&TestKey(1..2).search_key().unwrap()),
1344 &[(TestKey(1..2), 1)],
1345 )
1346 .await;
1347 }
1348
    /// Test key wrapping a `Range<u64>` whose `LayerKey::merge_type` is
    /// `MergeType::FullMerge` (see the `LayerKey` impl for this type).
    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
        SerializeKey,
    )]
    struct TestKeyWithFullMerge(Range<u64>);

    // Registers the type with the versioning machinery for versions `1..`.
    versioned_type! { 1.. => TestKeyWithFullMerge }
1365
1366 impl LayerKey for TestKeyWithFullMerge {
1367 fn merge_type(&self) -> MergeType {
1368 MergeType::FullMerge
1369 }
1370
1371 fn search_key(&self) -> Option<Self> {
1372 Some(Self(0..self.0.start + 1))
1373 }
1374
1375 fn is_search_key(&self) -> bool {
1376 self.0.start == 0
1377 }
1378
1379 fn overlaps(&self, other: &Self) -> bool {
1380 self.0.start < other.0.end && self.0.end > other.0.start
1381 }
1382 }
1383
1384 impl SortByU64 for TestKeyWithFullMerge {
1385 fn get_leading_u64(&self) -> u64 {
1386 self.0.end
1387 }
1388 }
1389
1390 impl OrdUpperBound for TestKeyWithFullMerge {
1391 fn cmp_upper_bound(&self, other: &TestKeyWithFullMerge) -> std::cmp::Ordering {
1392 self.0.end.cmp(&other.0.end)
1393 }
1394 }
1395
1396 impl OrdLowerBound for TestKeyWithFullMerge {
1397 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1398 self.0.start.cmp(&other.0.start)
1399 }
1400 }
1401
1402 #[fuchsia::test]
1405 async fn test_full_merge_consistent_advance_ordering() {
1406 let layer_set = [
1407 [
1408 (TestKeyWithFullMerge(1..2), 1i32),
1409 (TestKeyWithFullMerge(2..3), 2i32),
1410 (TestKeyWithFullMerge(4..5), 3i32),
1411 ]
1412 .as_slice(),
1413 [
1414 (TestKeyWithFullMerge(1..2), 4i32),
1415 (TestKeyWithFullMerge(2..3), 5i32),
1416 (TestKeyWithFullMerge(3..4), 6i32),
1417 ]
1418 .as_slice(),
1419 ];
1420
1421 let full_merge_result = [
1422 (TestKeyWithFullMerge(1..2), 1),
1423 (TestKeyWithFullMerge(1..2), 4),
1424 (TestKeyWithFullMerge(2..3), 2),
1425 (TestKeyWithFullMerge(2..3), 5),
1426 (TestKeyWithFullMerge(3..4), 6),
1427 (TestKeyWithFullMerge(4..5), 3),
1428 ];
1429
1430 test_advance(layer_set.as_slice(), Query::FullScan, &full_merge_result).await;
1431
1432 test_advance(
1433 layer_set.as_slice(),
1434 Query::FullRange(&TestKeyWithFullMerge(1..2).search_key().unwrap()),
1435 &full_merge_result,
1436 )
1437 .await;
1438
1439 test_advance(
1440 layer_set.as_slice(),
1441 Query::FullRange(&TestKeyWithFullMerge(2..3).search_key().unwrap()),
1442 &full_merge_result[2..],
1443 )
1444 .await;
1445
1446 test_advance(
1447 layer_set.as_slice(),
1448 Query::FullRange(&TestKeyWithFullMerge(3..4).search_key().unwrap()),
1449 &full_merge_result[4..],
1450 )
1451 .await;
1452 }
1453
1454 #[fuchsia::test]
1455 async fn test_full_merge_always_consult_all_layers() {
1456 let skip_lists =
1460 [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1461 let items = [
1462 Item::new(TestKeyWithFullMerge(1..2), 1),
1463 Item::new(TestKeyWithFullMerge(2..3), 2),
1464 Item::new(TestKeyWithFullMerge(1..2), 3),
1465 Item::new(TestKeyWithFullMerge(2..3), 4),
1466 ];
1467 skip_lists[0].insert(items[0].clone()).expect("insert error");
1468 skip_lists[1].insert(items[1].clone()).expect("insert error");
1469 skip_lists[2].insert(items[2].clone()).expect("insert error");
1470 skip_lists[2].insert(items[3].clone()).expect("insert error");
1471 let mut merger = Merger::new(
1472 layer_ref_iter(&skip_lists),
1473 |left, right| {
1474 if left.key() == right.key() {
1476 MergeResult::Other {
1477 emit: None,
1478 left: Discard,
1479 right: Replace(
1480 Item::new(left.key().clone(), left.value() + right.value()).boxed(),
1481 ),
1482 }
1483 } else {
1484 MergeResult::EmitLeft
1485 }
1486 },
1487 counters(),
1488 );
1489 let mut iter = merger
1490 .query(Query::FullRange(&items[0].key.search_key().unwrap()))
1491 .await
1492 .expect("seek failed");
1493
1494 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1495 assert_eq!((key, *value), (&items[0].key, items[0].value + items[2].value));
1496 iter.advance().await.expect("advance");
1497 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1498 assert_eq!((key, *value), (&items[1].key, items[1].value + items[3].value));
1499 iter.advance().await.expect("advance");
1500 assert!(iter.get().is_none());
1501 }
1502
    /// Test key wrapping a `Range<u64>` whose `LayerKey` impl does not override `merge_type`.
    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
        SerializeKey,
    )]
    struct TestKeyWithDefaultLayerKey(Range<u64>);

    // Registers the type with the versioning machinery for versions `1..`.
    versioned_type! { 1.. => TestKeyWithDefaultLayerKey }
1519
1520 impl LayerKey for TestKeyWithDefaultLayerKey {
1522 fn search_key(&self) -> Option<Self> {
1523 Some(Self(0..self.0.start + 1))
1524 }
1525
1526 fn is_search_key(&self) -> bool {
1527 self.0.start == 0
1528 }
1529
1530 fn overlaps(&self, other: &Self) -> bool {
1531 self.0.start < other.0.end && self.0.end > other.0.start
1532 }
1533 }
1534
1535 impl SortByU64 for TestKeyWithDefaultLayerKey {
1536 fn get_leading_u64(&self) -> u64 {
1537 self.0.end
1538 }
1539 }
1540
1541 impl OrdUpperBound for TestKeyWithDefaultLayerKey {
1542 fn cmp_upper_bound(&self, other: &TestKeyWithDefaultLayerKey) -> std::cmp::Ordering {
1543 self.0.end.cmp(&other.0.end)
1544 }
1545 }
1546
1547 impl OrdLowerBound for TestKeyWithDefaultLayerKey {
1548 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1549 self.0.start.cmp(&other.0.start)
1550 }
1551 }
1552
1553 #[fuchsia::test]
1554 async fn test_no_merge_unbounded_include_all_layers() {
1555 test_advance(
1556 &[
1557 &[
1558 (TestKeyWithDefaultLayerKey(1..2), 1),
1559 (TestKeyWithDefaultLayerKey(2..3), 2),
1560 (TestKeyWithDefaultLayerKey(4..5), 3),
1561 ],
1562 &[
1563 (TestKeyWithDefaultLayerKey(1..2), 4),
1564 (TestKeyWithDefaultLayerKey(2..3), 5),
1565 (TestKeyWithDefaultLayerKey(3..4), 6),
1566 ],
1567 ],
1568 Query::FullScan,
1569 &[
1570 (TestKeyWithDefaultLayerKey(1..2), 1),
1571 (TestKeyWithDefaultLayerKey(1..2), 4),
1572 (TestKeyWithDefaultLayerKey(2..3), 2),
1573 (TestKeyWithDefaultLayerKey(2..3), 5),
1574 (TestKeyWithDefaultLayerKey(3..4), 6),
1575 (TestKeyWithDefaultLayerKey(4..5), 3),
1576 ],
1577 )
1578 .await;
1579 }
1580
1581 #[fuchsia::test]
1582 async fn test_no_merge_proceeds_comprehensively_after_seek() {
1583 test_advance(
1584 &[
1585 &[
1586 (TestKeyWithDefaultLayerKey(1..2), 1),
1587 (TestKeyWithDefaultLayerKey(2..3), 2),
1588 (TestKeyWithDefaultLayerKey(4..5), 3),
1589 ],
1590 &[
1591 (TestKeyWithDefaultLayerKey(1..2), 4),
1592 (TestKeyWithDefaultLayerKey(2..3), 5),
1593 (TestKeyWithDefaultLayerKey(3..4), 6),
1594 ],
1595 ],
1596 Query::FullRange(&TestKeyWithDefaultLayerKey(1..2).search_key().unwrap()),
1597 &[
1598 (TestKeyWithDefaultLayerKey(1..2), 1),
1599 (TestKeyWithDefaultLayerKey(1..2), 4),
1600 (TestKeyWithDefaultLayerKey(2..3), 2),
1601 (TestKeyWithDefaultLayerKey(2..3), 5),
1602 (TestKeyWithDefaultLayerKey(3..4), 6),
1603 (TestKeyWithDefaultLayerKey(4..5), 3),
1604 ],
1605 )
1606 .await;
1607 }
1608
1609 #[fuchsia::test]
1610 async fn test_no_merge_seek_finds_lower_layer() {
1611 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1612 let items = [
1613 Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1614 Item::new(TestKeyWithDefaultLayerKey(0..1), 2),
1615 ];
1616 skip_lists[0].insert(items[0].clone()).expect("insert error");
1617 skip_lists[1].insert(items[1].clone()).expect("insert error");
1618 let mut merger = Merger::new(
1619 layer_ref_iter(&skip_lists),
1620 |_left, _right| MergeResult::EmitLeft,
1621 counters(),
1622 );
1623 let iter = merger
1624 .query(Query::FullRange(&items[1].key.search_key().unwrap()))
1625 .await
1626 .expect("seek failed");
1627
1628 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1629 assert_eq!((key, value), (&items[1].key, &items[1].value));
1630 }
1631
1632 #[fuchsia::test]
1633 async fn test_no_merge_seek_stops_at_exact_match() {
1634 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1635 let items = [
1636 Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1637 Item::new(TestKeyWithDefaultLayerKey(1..4), 2),
1638 ];
1639 skip_lists[0].insert(items[0].clone()).expect("insert error");
1640 skip_lists[1].insert(items[1].clone()).expect("insert error");
1641 let mut merger = Merger::new(
1642 layer_ref_iter(&skip_lists),
1643 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1644 counters(),
1645 );
1646 let iter = merger
1647 .query(Query::FullRange(&items[0].key.search_key().unwrap()))
1648 .await
1649 .expect("seek failed");
1650
1651 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1654 assert_eq!((key, value), (&items[0].key, &items[0].value));
1655 }
1656
1657 #[fuchsia::test]
1658 async fn test_seek_less_than() {
1659 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1660 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1661 skip_lists[0].insert(items[0].clone()).expect("insert error");
1662 skip_lists[1].insert(items[1].clone()).expect("insert error");
1663 let mut merger = Merger::new(
1665 layer_ref_iter(&skip_lists),
1666 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1667 counters(),
1668 );
1669 let iter = merger
1670 .query(Query::FullRange(&TestKey(0..0).search_key().unwrap()))
1671 .await
1672 .expect("seek failed");
1673
1674 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1676 assert_eq!((key, value), (&items[1].key, &items[1].value));
1677 }
1678
1679 #[fuchsia::test]
1680 async fn test_seek_to_end() {
1681 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1682 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1683 skip_lists[0].insert(items[0].clone()).expect("insert error");
1684 skip_lists[1].insert(items[1].clone()).expect("insert error");
1685 let mut merger = Merger::new(
1686 layer_ref_iter(&skip_lists),
1687 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1688 counters(),
1689 );
1690 let iter = merger
1691 .query(Query::FullRange(&TestKey(3..3).search_key().unwrap()))
1692 .await
1693 .expect("seek failed");
1694
1695 assert!(iter.get().is_none());
1696 }
1697
1698 #[fuchsia::test]
1699 async fn test_merge_all_discarded() {
1700 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1701 let items = [Item::new(TestKey(1..2), 1), Item::new(TestKey(2..3), 2)];
1702 skip_lists[0].insert(items[1].clone()).expect("insert error");
1703 skip_lists[1].insert(items[0].clone()).expect("insert error");
1704 let mut merger = Merger::new(
1705 layer_ref_iter(&skip_lists),
1706 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Discard },
1707 counters(),
1708 );
1709 let iter = merger.query(Query::FullScan).await.expect("seek failed");
1710 assert!(iter.get().is_none());
1711 }
1712
1713 #[fuchsia::test]
1714 async fn test_overlapping_keys() {
1715 let skip_lists =
1716 [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1717 let items = [
1718 Item::new(TestKey(0..10), 1),
1719 Item::new(TestKey(0..20), 2),
1720 Item::new(TestKey(0..30), 3),
1721 ];
1722 skip_lists[0].insert(items[0].clone()).expect("insert error");
1723 skip_lists[1].insert(items[1].clone()).expect("insert error");
1724 skip_lists[2].insert(items[2].clone()).expect("insert error");
1725 let mut merger = Merger::new(
1726 layer_ref_iter(&skip_lists),
1727 |left, right| {
1728 if left.key().0.end <= right.key().0.start {
1729 MergeResult::EmitLeft
1730 } else {
1731 if left.key() == &TestKey(0..30) && right.key() == &TestKey(10..20) {
1733 MergeResult::Other {
1734 emit: Some(Item::new(TestKey(0..10), 1).boxed()),
1735 left: Replace(Item::new(TestKey(10..30), 1).boxed()),
1736 right: Keep,
1737 }
1738 } else {
1739 MergeResult::Other {
1740 emit: None,
1741 left: Keep,
1742 right: Replace(
1743 Item::new(TestKey(left.key().0.end..right.key().0.end), 1).boxed(),
1744 ),
1745 }
1746 }
1747 }
1748 },
1749 counters(),
1750 );
1751 let mut iter = merger
1752 .query(Query::FullRange(&TestKey(0..1).search_key().unwrap()))
1753 .await
1754 .expect("seek failed");
1755 assert_eq!(iter.pending_iterators_len(), 2);
1756 let ItemRef { key, .. } = iter.get().expect("missing item");
1757 assert_eq!(key, &TestKey(0..10));
1758 iter.advance().await.expect("advance failed");
1759 assert_eq!(iter.pending_iterators_len(), 1);
1760 let ItemRef { key, .. } = iter.get().expect("missing item");
1761 assert_eq!(key, &TestKey(10..20));
1762 iter.advance().await.expect("advance failed");
1763 assert_eq!(iter.pending_iterators_len(), 0);
1764 let ItemRef { key, .. } = iter.get().expect("missing item");
1765 assert_eq!(key, &TestKey(20..30));
1766 iter.advance().await.expect("advance failed");
1767 assert_eq!(iter.pending_iterators_len(), 0);
1768 assert_eq!(iter.get(), None);
1769 }
1770
1771 async fn write_layer<K: Key, V: Value>(items: Vec<Item<K, V>>) -> Arc<dyn Layer<K, V>> {
1772 let object = Arc::new(FakeObject::new());
1773 let write_handle = FakeObjectHandle::new(object.clone());
1774 let mut writer =
1775 PersistentLayerWriter::<_, K, V>::new(Writer::new(&write_handle).await, 1, 512)
1776 .await
1777 .expect("PersistentLayerWriter::new failed");
1778 for item in items {
1779 writer.write(item.as_item_ref()).await.expect("write failed");
1780 }
1781 writer.flush().await.expect("flush failed");
1782 PersistentLayer::open(FakeObjectHandle::new(object))
1783 .await
1784 .expect("open_persistent_layer failed")
1785 }
1786
1787 fn merge_sum(
1788 left: &MergeLayerIterator<'_, i32, i32>,
1789 right: &MergeLayerIterator<'_, i32, i32>,
1790 ) -> MergeResult<i32, i32> {
1791 if left.key() == right.key() {
1793 MergeResult::Other {
1794 emit: None,
1795 left: Discard,
1796 right: Replace(Item::new(left.key().clone(), left.value() + right.value()).boxed()),
1797 }
1798 } else {
1799 MergeResult::EmitLeft
1800 }
1801 }
1802
1803 #[fuchsia::test]
1804 async fn test_merge_bloom_filters_point_query() {
1805 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1806 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1807 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1808 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1809 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1810 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1811
1812 {
1813 let iter = merger.query(Query::Point(&1)).await.expect("seek failed");
1815 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1816 assert_eq!((key, *value), (&items[0].key, items[0].value));
1817 }
1818 {
1819 let iter = merger.query(Query::Point(&2)).await.expect("seek failed");
1821 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1822 assert_eq!((key, *value), (&items[1].key, items[1].value));
1823 }
1824 {
1825 let iter = merger.query(Query::Point(&4)).await.expect("seek failed");
1827 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1828 assert_eq!((key, *value), (&items[2].key, items[2].value));
1829 }
1830 {
1831 let iter = merger.query(Query::Point(&400)).await.expect("seek failed");
1833 assert!(iter.get().is_none());
1834 }
1835 }
1836
1837 #[fuchsia::test]
1838 async fn test_merge_bloom_filters_limited_range() {
1839 let layer_0_items = vec![Item::new(
1842 ObjectKey::extent(0, 0, 0..2048),
1843 ObjectValue::extent(0, VOLUME_DATA_KEY_ID),
1844 )];
1845 let layer_1_items = vec![
1846 Item::new(
1847 ObjectKey::extent(0, 0, 1024..4096),
1848 ObjectValue::extent(32768, VOLUME_DATA_KEY_ID),
1849 ),
1850 Item::new(
1851 ObjectKey::extent(0, 0, 16384..17408),
1852 ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1853 ),
1854 ];
1855 let items = [
1856 Item::new(ObjectKey::extent(0, 0, 0..2048), ObjectValue::extent(0, VOLUME_DATA_KEY_ID)),
1857 Item::new(
1858 ObjectKey::extent(0, 0, 2048..4096),
1859 ObjectValue::extent(33792, VOLUME_DATA_KEY_ID),
1860 ),
1861 Item::new(
1862 ObjectKey::extent(0, 0, 16384..17408),
1863 ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1864 ),
1865 ];
1866 let layers: [Arc<dyn Layer<ObjectKey, ObjectValue>>; 2] =
1867 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1868 let mut merger =
1869 Merger::new(dyn_layer_ref_iter(&layers), object_store::merge::merge, counters());
1870
1871 {
1872 let mut iter = merger
1874 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 16384..16386)))
1875 .await
1876 .expect("seek failed");
1877 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1878 assert_eq!(key, &items[2].key);
1879 assert_eq!(value, &items[2].value);
1880 iter.advance().await.expect("advance");
1881 assert!(iter.get().is_none());
1882 }
1883 {
1884 let mut iter = merger
1886 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 0..4096)))
1887 .await
1888 .expect("seek failed");
1889 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1890 assert_eq!(key, &items[0].key);
1891 assert_eq!(value, &items[0].value);
1892 iter.advance().await.expect("advance");
1893 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1894 assert_eq!(key, &items[1].key);
1895 assert_eq!(value, &items[1].value);
1896 iter.advance().await.expect("advance");
1897 }
1898 {
1899 let mut iter = merger
1901 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 8192..12288)))
1902 .await
1903 .expect("seek failed");
1904 let ItemRef { key, .. } = iter.get().expect("missing item");
1905 assert_eq!(key, &items[2].key);
1906 iter.advance().await.expect("advance");
1907 assert!(iter.get().is_none());
1908 }
1909 }
1910
1911 #[fuchsia::test]
1912 async fn test_merge_bloom_filters_full_range() {
1913 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1914 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1915 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1916 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1917 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1918 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1919
1920 let mut iter = merger.query(Query::FullRange(&0)).await.expect("seek failed");
1921 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1922 assert_eq!((key, *value), (&items[0].key, items[0].value));
1923 iter.advance().await.expect("advance");
1924 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1925 assert_eq!((key, *value), (&items[1].key, items[1].value));
1926 iter.advance().await.expect("advance");
1927 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1928 assert_eq!((key, *value), (&items[2].key, items[2].value));
1929 iter.advance().await.expect("advance");
1930 assert!(iter.get().is_none());
1931 }
1932
1933 #[fuchsia::test]
1934 async fn test_merge_bloom_filters_full_scan() {
1935 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1936 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1937 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1938 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1939 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1940 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1941
1942 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1943 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1944 assert_eq!((key, *value), (&items[0].key, items[0].value));
1945 iter.advance().await.expect("advance");
1946 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1947 assert_eq!((key, *value), (&items[1].key, items[1].value));
1948 iter.advance().await.expect("advance");
1949 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1950 assert_eq!((key, *value), (&items[2].key, items[2].value));
1951 iter.advance().await.expect("advance");
1952 assert!(iter.get().is_none());
1953 }
1954
1955 #[fuchsia::test]
1956 async fn test_optimized_merge_lazy_pull() {
1957 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1958 let items_top = [
1959 Item::new(TestKey(10..20), 1),
1960 Item::new(TestKey(20..30), 2),
1961 Item::new(TestKey(30..40), 3),
1962 ];
1963 let items_bottom = [Item::new(TestKey(40..50), 4)];
1964
1965 for item in &items_top {
1966 skip_lists[0].insert(item.clone()).expect("insert error");
1967 }
1968 for item in &items_bottom {
1969 skip_lists[1].insert(item.clone()).expect("insert error");
1970 }
1971
1972 let mut merger =
1973 Merger::new(layer_ref_iter(&skip_lists), |_, _| MergeResult::EmitLeft, counters());
1974 merger.set_trace(true);
1975
1976 let mut iter =
1977 merger.query(Query::LimitedRange(&TestKey(10..20))).await.expect("seek failed");
1978 assert_eq!(iter.pending_iterators_len(), 1);
1979
1980 let ItemRef { key, .. } = iter.get().expect("missing item");
1981 assert_eq!(key, &TestKey(10..20));
1982
1983 iter.advance().await.expect("advance failed");
1984 assert_eq!(iter.pending_iterators_len(), 1);
1985
1986 let ItemRef { key, .. } = iter.get().expect("missing item");
1987 assert_eq!(key, &TestKey(20..30));
1988
1989 iter.advance().await.expect("advance failed");
1990 assert_eq!(iter.pending_iterators_len(), 1);
1991
1992 let ItemRef { key, .. } = iter.get().expect("missing item");
1993 assert_eq!(key, &TestKey(30..40));
1994
1995 iter.advance().await.expect("advance failed");
1996 assert_eq!(iter.pending_iterators_len(), 0);
1997
1998 let ItemRef { key, .. } = iter.get().expect("missing item");
1999 assert_eq!(key, &TestKey(40..50));
2000
2001 iter.advance().await.expect("advance failed");
2002 assert!(iter.get().is_none());
2003 }
2004}