1use crate::log::*;
6use crate::lsm_tree;
7use crate::lsm_tree::types::{
8 BoxedItem, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut, LayerKey, MergeType,
9 OrdLowerBound, Value,
10};
11use anyhow::Error;
12use async_trait::async_trait;
13use futures::try_join;
14use std::cmp::Ordering;
15use std::collections::BinaryHeap;
16use std::fmt::{Debug, Write};
17use std::ops::{Bound, Deref, DerefMut};
18use std::sync::{Arc, atomic};
19
20#[derive(Debug, Eq, PartialEq)]
21pub enum ItemOp<K, V> {
22 Keep,
24
25 Discard,
27
28 Replace(BoxedItem<K, V>),
31}
32
33#[derive(Debug, Eq, PartialEq)]
34pub enum MergeResult<K, V> {
35 EmitLeft,
38
39 Other { emit: Option<BoxedItem<K, V>>, left: ItemOp<K, V>, right: ItemOp<K, V> },
60}
61
62pub type MergeFn<K, V> =
67 fn(&MergeLayerIterator<'_, K, V>, &MergeLayerIterator<'_, K, V>) -> MergeResult<K, V>;
68
69pub enum MergeItem<K, V> {
70 None,
71 Item(BoxedItem<K, V>),
72 Iter,
73}
74
75enum RawIterator<'a, K, V> {
76 None,
77 Const(Box<dyn LayerIterator<K, V> + 'a>),
78 Mut(Box<dyn LayerIteratorMut<K, V> + 'a>),
79}
80
81unsafe impl<K, V> Send for RawIterator<'_, K, V> {}
84
85pub struct MergeLayerIterator<'a, K, V> {
88 layer: Option<&'a dyn Layer<K, V>>,
89
90 iter: RawIterator<'a, K, V>,
92
93 pub layer_index: u16,
95
96 item: MergeItem<K, V>,
98}
99
100impl<'a, K, V> MergeLayerIterator<'a, K, V> {
101 pub fn key(&self) -> &K {
102 self.item().key
103 }
104
105 pub fn value(&self) -> &V {
106 self.item().value
107 }
108
109 pub fn sequence(&self) -> u64 {
110 self.item().sequence
111 }
112
113 fn new(layer_index: u16, layer: &'a dyn Layer<K, V>) -> Self {
114 MergeLayerIterator {
115 layer: Some(layer),
116 iter: RawIterator::None,
117 layer_index,
118 item: MergeItem::None,
119 }
120 }
121
122 fn new_with_item(layer_index: u16, item: MergeItem<K, V>) -> Self {
123 MergeLayerIterator { layer: None, iter: RawIterator::None, layer_index, item }
124 }
125
126 fn item(&self) -> ItemRef<'_, K, V> {
127 match &self.item {
128 MergeItem::None => panic!("No item!"),
129 MergeItem::Item(item) => ItemRef::from(item),
130 MergeItem::Iter => self.get().unwrap(),
131 }
132 }
133
134 fn get(&self) -> Option<ItemRef<'_, K, V>> {
135 match &self.iter {
136 RawIterator::None => panic!("No iterator!"),
137 RawIterator::Const(iter) => iter.get(),
138 RawIterator::Mut(iter) => iter.get(),
139 }
140 }
141
142 fn set_item_from_iter(&mut self) {
143 self.item = if match &self.iter {
144 RawIterator::None => unreachable!(),
145 RawIterator::Const(iter) => iter.get(),
146 RawIterator::Mut(iter) => iter.get(),
147 }
148 .is_some()
149 {
150 MergeItem::Iter
151 } else {
152 MergeItem::None
153 };
154 }
155
156 fn take_item(&mut self) -> Option<BoxedItem<K, V>> {
157 if let MergeItem::Item(_) = self.item {
158 let mut item = MergeItem::None;
159 std::mem::swap(&mut self.item, &mut item);
160 if let MergeItem::Item(item) = item {
161 Some(item)
162 } else {
163 unreachable!();
164 }
165 } else {
166 None
167 }
168 }
169
170 async fn advance(&mut self) -> Result<(), Error> {
171 if let MergeItem::Iter = self.item {
172 if let RawIterator::Const(iter) = &mut self.iter {
173 iter.advance().await?;
174 } else {
175 unreachable!();
177 }
178 }
179 self.set_item_from_iter();
180 Ok(())
181 }
182
183 fn replace(&mut self, item: BoxedItem<K, V>) {
184 self.item = MergeItem::Item(item);
185 }
186
187 fn is_some(&self) -> bool {
188 !matches!(self.item, MergeItem::None)
189 }
190
191 async fn maybe_advance(&mut self, op: &ItemOp<K, V>) -> Result<(), Error> {
194 if let ItemOp::Keep = op { Ok(()) } else { self.advance().await }
195 }
196}
197
198impl<K: OrdLowerBound, V> Ord for MergeLayerIterator<'_, K, V> {
200 fn cmp(&self, other: &Self) -> Ordering {
201 other.key().cmp_lower_bound(self.key()).then(other.layer_index.cmp(&self.layer_index))
203 }
204}
205impl<K: OrdLowerBound, V> PartialOrd for MergeLayerIterator<'_, K, V> {
206 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
207 Some(self.cmp(other))
208 }
209}
210impl<K: OrdLowerBound, V> PartialEq for MergeLayerIterator<'_, K, V> {
211 fn eq(&self, other: &Self) -> bool {
212 self.cmp(other) == Ordering::Equal
213 }
214}
215impl<K: OrdLowerBound, V> Eq for MergeLayerIterator<'_, K, V> {}
216
217enum CurrentItem<'a, 'b, K, V> {
220 None,
221 Item(BoxedItem<K, V>),
222 Iterator(&'a mut MergeLayerIterator<'b, K, V>),
223}
224
225impl<'a, 'b, K, V> CurrentItem<'a, 'b, K, V> {
226 fn take_iterator(&mut self) -> Option<&'a mut MergeLayerIterator<'b, K, V>> {
229 if let CurrentItem::Iterator(_) = self {
230 let mut result = CurrentItem::None;
231 std::mem::swap(self, &mut result);
232 if let CurrentItem::Iterator(iter) = result {
233 Some(iter)
234 } else {
235 unreachable!();
236 }
237 } else {
238 None
239 }
240 }
241}
242
243impl<'a, K, V> From<&'a CurrentItem<'_, '_, K, V>> for Option<ItemRef<'a, K, V>> {
244 fn from(iter: &'a CurrentItem<'_, '_, K, V>) -> Option<ItemRef<'a, K, V>> {
245 match iter {
246 CurrentItem::None => None,
247 CurrentItem::Iterator(iterator) => Some(iterator.item()),
248 CurrentItem::Item(item) => Some(item.into()),
249 }
250 }
251}
252
253pub struct Merger<'a, K, V> {
255 iterators: Vec<MergeLayerIterator<'a, K, V>>,
257
258 merge_fn: MergeFn<K, V>,
260
261 trace: bool,
263
264 counters: Arc<lsm_tree::TreeCounters>,
266}
267
268#[derive(Debug, Clone)]
275pub enum Query<'a, K: Key + LayerKey + OrdLowerBound> {
276 Point(&'a K),
281 LimitedRange(&'a K),
288 FullRange(&'a K),
291 FullScan,
294}
295
296impl<'a, K: Key + LayerKey + OrdLowerBound> Query<'a, K> {
297 fn needs_layer<V>(&self, layer: &dyn Layer<K, V>) -> bool {
298 match self {
299 Self::Point(key) => layer.maybe_contains_key(key),
300 Self::LimitedRange(key) => layer.maybe_contains_key(key),
301 Self::FullRange(_) => true,
302 Self::FullScan => true,
303 }
304 }
305}
306
307#[fxfs_trace::trace]
308impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> Merger<'a, K, V> {
309 pub(super) fn new<I: Iterator<Item = &'a dyn Layer<K, V>>>(
310 layers: I,
311 merge_fn: MergeFn<K, V>,
312 counters: Arc<lsm_tree::TreeCounters>,
313 ) -> Merger<'a, K, V> {
314 Merger {
315 iterators: layers
316 .enumerate()
317 .map(|(index, layer)| MergeLayerIterator::new(index as u16, layer))
318 .collect(),
319 merge_fn: merge_fn,
320 trace: false,
321 counters,
322 }
323 }
324
325 #[trace]
328 pub async fn query(
329 &mut self,
330 query: Query<'_, K>,
331 ) -> Result<MergerIterator<'_, 'a, K, V>, Error> {
332 if let Query::Point(key) = query {
333 debug_assert!(!key.is_range_key())
341 };
342 let len = self.iterators.len();
343 let pending_iterators = {
344 fxfs_trace::duration!("Merger::filter_layer_files", "len" => len);
345 self.iterators
346 .iter_mut()
347 .rev()
348 .filter(|l| query.needs_layer(l.layer.clone().unwrap()))
349 .collect::<Vec<&mut MergeLayerIterator<'a, K, V>>>()
350 };
351 let layer_count = pending_iterators.len();
352 {
353 self.counters.num_seeks.fetch_add(1, atomic::Ordering::Relaxed);
354 self.counters.layer_files_total.fetch_add(len, atomic::Ordering::Relaxed);
355 self.counters
356 .layer_files_skipped
357 .fetch_add(len - layer_count, atomic::Ordering::Relaxed);
358 }
359 log::debug!(query:?; "Consulting {}/{} layers", layer_count, len);
360 let mut merger_iter = MergerIterator {
361 merge_fn: self.merge_fn,
362 pending_iterators,
363 heap: BinaryHeap::with_capacity(layer_count),
364 item: CurrentItem::None,
365 trace: self.trace,
366 history: String::new(),
367 };
368 let owned_bound;
369 let bound = match query {
370 Query::Point(key) => Bound::Included(key),
371 Query::LimitedRange(key) => {
372 owned_bound = Bound::Included(key.search_key());
373 owned_bound.as_ref()
374 }
375 Query::FullRange(start) => Bound::Included(start),
376 Query::FullScan => Bound::Unbounded,
377 };
378 merger_iter.seek(bound).await?;
379 Ok(merger_iter)
380 }
381
382 pub fn set_trace(&mut self, v: bool) {
383 self.trace = v;
384 }
385}
386
387pub struct MergerIterator<'a, 'b, K, V> {
390 merge_fn: MergeFn<K, V>,
391
392 pending_iterators: Vec<&'a mut MergeLayerIterator<'b, K, V>>,
394
395 heap: BinaryHeap<&'a mut MergeLayerIterator<'b, K, V>>,
397
398 item: CurrentItem<'a, 'b, K, V>,
400
401 trace: bool,
403
404 history: String,
406}
407
408impl<'a, 'b, K: Key + LayerKey + OrdLowerBound, V: Value> MergerIterator<'a, 'b, K, V> {
409 async fn seek(&mut self, bound: Bound<&K>) -> Result<(), Error> {
410 let next_key = match bound {
411 Bound::Unbounded => None,
412 Bound::Included(key) => Some(key),
413 Bound::Excluded(_) => panic!("Excluded bounds not supported!"),
414 };
415
416 self.push_iterators(next_key, bound).await?;
417 self.advance_impl(bound).await
418 }
419
420 async fn advance_impl(&mut self, next_key_bound: Bound<&K>) -> Result<(), Error> {
426 loop {
427 loop {
428 if self.heap.is_empty() {
429 self.item = CurrentItem::None;
430 return Ok(());
431 }
432 let lowest = self.heap.pop().unwrap();
433 let maybe_second_lowest = self.heap.pop();
434 if let Some(second_lowest) = maybe_second_lowest {
435 let result = (self.merge_fn)(&lowest, &second_lowest);
436 if self.trace {
437 writeln!(
438 self.history,
439 "merge {:?}, {:?} -> {:?}",
440 lowest.item(),
441 second_lowest.item(),
442 result
443 )
444 .unwrap();
445 }
446 match result {
447 MergeResult::EmitLeft => {
448 self.heap.push(second_lowest);
449 self.item = CurrentItem::Iterator(lowest);
450 break;
451 }
452 MergeResult::Other { emit, left, right } => {
453 try_join!(
454 lowest.maybe_advance(&left),
455 second_lowest.maybe_advance(&right)
456 )?;
457 self.update_item(lowest, left);
458 self.update_item(second_lowest, right);
459 if let Some(emit) = emit {
460 self.item = CurrentItem::Item(emit);
461 break;
462 }
463 }
464 }
465 } else {
466 self.item = CurrentItem::Iterator(lowest);
467 break;
468 }
469 }
470
471 match next_key_bound {
491 Bound::Included(key)
492 if self.get().unwrap().key.cmp_upper_bound(key) == Ordering::Less => {}
493 Bound::Excluded(key)
494 if self.get().unwrap().key.cmp_upper_bound(key) != Ordering::Greater => {}
495 _ => return Ok(()),
496 }
497 if let Some(iterator) = self.item.take_iterator() {
498 iterator.advance().await?;
499 if iterator.is_some() {
500 self.heap.push(iterator);
501 }
502 }
503 }
504 }
505
506 fn needs_more_iterators(&self, next_key: Option<&K>, next_key_bound: Bound<&K>) -> bool {
508 if self.pending_iterators.is_empty() {
509 return false;
510 }
511 if self.heap.is_empty() {
512 return true;
513 }
514 let target;
515 match next_key_bound {
516 Bound::Included(k) => target = k,
517 Bound::Excluded(_) | Bound::Unbounded => return true,
518 }
519 match target.merge_type() {
520 MergeType::FullMerge => true,
521 MergeType::OptimizedMerge => {
522 match next_key {
523 Some(k) => {
524 self.heap.peek().unwrap().key().cmp_lower_bound(k) == Ordering::Greater
525 }
526 None => {
527 self.heap.peek().unwrap().key().cmp_lower_bound(target) != Ordering::Equal
529 }
530 }
531 }
532 }
533 }
534
535 async fn push_iterators(
541 &mut self,
542 next_key: Option<&K>,
543 next_key_bound: Bound<&K>,
544 ) -> Result<(), Error> {
545 while self.needs_more_iterators(next_key, next_key_bound) {
546 let iter = self.pending_iterators.pop().unwrap();
547 let sub_iter = iter.layer.as_ref().unwrap().seek(next_key_bound).await?;
548 if self.trace {
549 writeln!(
550 self.history,
551 "merger: search for {:?}, found {:?}",
552 next_key_bound,
553 sub_iter.get()
554 )
555 .unwrap();
556 }
557 iter.iter = RawIterator::Const(sub_iter);
558 iter.set_item_from_iter();
559 if iter.is_some() {
560 self.heap.push(iter);
561 }
562 }
563 Ok(())
564 }
565
566 fn update_item(&mut self, item: &'a mut MergeLayerIterator<'b, K, V>, op: ItemOp<K, V>) {
569 match op {
570 ItemOp::Keep => self.heap.push(item),
571 ItemOp::Discard => {
572 if item.is_some() {
574 self.heap.push(item);
575 }
576 }
577 ItemOp::Replace(replacement) => {
578 item.replace(replacement);
579 self.heap.push(item);
580 }
581 }
582 }
583}
584
585#[async_trait]
586impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> LayerIterator<K, V>
587 for MergerIterator<'a, '_, K, V>
588{
589 async fn advance(&mut self) -> Result<(), Error> {
590 let current_key;
591 let mut next_key = None;
592 let mut next_key_bound = Bound::Unbounded;
593 if !self.pending_iterators.is_empty() {
594 if let Some(ItemRef { key, .. }) = self.get() {
595 next_key = key.next_key();
596 match next_key {
597 None => {
598 current_key = Some(key.clone());
601 next_key_bound = Bound::Excluded(current_key.as_ref().unwrap());
602 }
603 Some(ref key) => next_key_bound = Bound::Included(key),
604 }
605 }
606 }
607
608 if let Some(iterator) = self.item.take_iterator() {
611 if self.needs_more_iterators(next_key.as_ref(), next_key_bound) {
612 let existing_item = iterator.item().boxed();
613 iterator.advance().await?;
614 match &next_key {
615 Some(next_key)
616 if iterator.is_some()
617 && iterator.key().cmp_lower_bound(next_key) != Ordering::Greater =>
618 {
619 }
623 _ => {
624 iterator.replace(existing_item);
628
629 self.push_iterators(next_key.as_ref(), next_key_bound).await?;
632 }
633 }
634 } else {
635 iterator.advance().await?;
636 }
637 if iterator.is_some() {
638 self.heap.push(iterator);
639 }
640 } else {
641 self.push_iterators(next_key.as_ref(), next_key_bound).await?;
642 }
643
644 self.advance_impl(next_key_bound).await
645 }
646
647 fn get(&self) -> Option<ItemRef<'_, K, V>> {
648 (&self.item).into()
649 }
650}
651
652struct MutMergeLayerIterator<'a, K, V>(MergeLayerIterator<'a, K, V>);
653
654impl<K, V> MutMergeLayerIterator<'_, K, V> {
655 fn advance(&mut self) {
656 if let MergeItem::Iter = self.item {
657 self.as_mut().advance();
658 }
659 self.set_item_from_iter();
660 }
661}
662
663impl<'a, K, V> AsMut<dyn LayerIteratorMut<K, V> + 'a> for MutMergeLayerIterator<'a, K, V> {
664 fn as_mut(&mut self) -> &mut (dyn LayerIteratorMut<K, V> + 'a) {
665 let RawIterator::Mut(iter) = &mut self.0.iter else { unreachable!() };
666 iter.as_mut()
667 }
668}
669
670impl<'a, K, V> Deref for MutMergeLayerIterator<'a, K, V> {
671 type Target = MergeLayerIterator<'a, K, V>;
672
673 fn deref(&self) -> &Self::Target {
674 &self.0
675 }
676}
677
678impl<K, V> DerefMut for MutMergeLayerIterator<'_, K, V> {
679 fn deref_mut(&mut self) -> &mut Self::Target {
680 &mut self.0
681 }
682}
683
684pub(super) fn merge_into<K: Debug + OrdLowerBound, V: Debug>(
686 mut_iter: Box<dyn LayerIteratorMut<K, V> + '_>,
687 item: Item<K, V>,
688 merge_fn: MergeFn<K, V>,
689) -> Result<(), Error> {
690 let merge_item = if mut_iter.get().is_some() { MergeItem::Iter } else { MergeItem::None };
691 let mut mut_merge_iter = MutMergeLayerIterator(MergeLayerIterator {
692 layer: None,
693 iter: RawIterator::Mut(mut_iter),
694 layer_index: 1,
695 item: merge_item,
696 });
697 let mut item_merge_iter = MergeLayerIterator::new_with_item(0, MergeItem::Item(item.boxed()));
698 while mut_merge_iter.is_some() && item_merge_iter.is_some() {
699 if mut_merge_iter.0 > item_merge_iter {
700 let merge_result = merge_fn(&mut_merge_iter, &item_merge_iter);
702 debug!(
703 lhs:? = mut_merge_iter.key(),
704 rhs:? = item_merge_iter.key(),
705 result:? = merge_result;
706 "(1) merge");
707 match merge_result {
708 MergeResult::EmitLeft => {
709 if let Some(item) = mut_merge_iter.take_item() {
710 mut_merge_iter.as_mut().insert(*item);
711 mut_merge_iter.set_item_from_iter();
712 } else {
713 mut_merge_iter.advance();
714 }
715 }
716 MergeResult::Other { emit, left, right } => {
717 if let Some(emit) = emit {
718 mut_merge_iter.as_mut().insert(*emit);
719 }
720 match left {
721 ItemOp::Keep => {}
722 ItemOp::Discard => {
723 if matches!(mut_merge_iter.item, MergeItem::Iter) {
724 mut_merge_iter.as_mut().erase();
725 }
726 mut_merge_iter.set_item_from_iter();
727 }
728 ItemOp::Replace(item) => {
729 if let MergeItem::Iter = mut_merge_iter.item {
730 mut_merge_iter.as_mut().erase();
731 }
732 mut_merge_iter.item = MergeItem::Item(item)
733 }
734 }
735 match right {
736 ItemOp::Keep => {}
737 ItemOp::Discard => item_merge_iter.item = MergeItem::None,
738 ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
739 }
740 }
741 }
742 } else {
743 let merge_result = merge_fn(&item_merge_iter, &mut_merge_iter);
745 debug!(
746 lhs:? = mut_merge_iter.key(),
747 rhs:? = item_merge_iter.key(),
748 result:? = merge_result;
749 "(2) merge");
750 match merge_result {
751 MergeResult::EmitLeft => break, MergeResult::Other { emit, left, right } => {
753 if let Some(emit) = emit {
754 mut_merge_iter.as_mut().insert(*emit);
755 }
756 match left {
757 ItemOp::Keep => {}
758 ItemOp::Discard => item_merge_iter.item = MergeItem::None,
759 ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
760 }
761 match right {
762 ItemOp::Keep => {}
763 ItemOp::Discard => {
764 if matches!(mut_merge_iter.item, MergeItem::Iter) {
765 mut_merge_iter.as_mut().erase();
766 }
767 mut_merge_iter.set_item_from_iter();
768 }
769 ItemOp::Replace(item) => {
770 if let MergeItem::Iter = mut_merge_iter.item {
771 mut_merge_iter.as_mut().erase();
772 }
773 mut_merge_iter.item = MergeItem::Item(item)
774 }
775 }
776 }
777 }
778 }
779 } if let MergeItem::Item(item) = item_merge_iter.item {
784 mut_merge_iter.as_mut().insert(*item);
785 }
786 if let Some(item) = mut_merge_iter.take_item() {
787 mut_merge_iter.as_mut().insert(*item);
788 }
789 if let RawIterator::Mut(mut iter) = mut_merge_iter.0.iter {
790 iter.commit();
791 }
792 Ok(())
793}
794
795#[cfg(test)]
796mod tests {
797 use super::ItemOp::{Discard, Keep, Replace};
798 use super::{MergeLayerIterator, MergeResult, Merger};
799 use crate::lsm_tree::persistent_layer::{PersistentLayer, PersistentLayerWriter};
800 use crate::lsm_tree::skip_list_layer::SkipListLayer;
801 use crate::lsm_tree::types::{
802 FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeType,
803 OrdLowerBound, OrdUpperBound, SortByU64,
804 };
805 use crate::lsm_tree::{self, Query, Value};
806 use crate::object_store::{self, ObjectKey, ObjectValue, VOLUME_DATA_KEY_ID};
807 use crate::serialized_types::{
808 LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
809 };
810 use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
811 use crate::testing::writer::Writer;
812 use fprint::TypeFingerprint;
813 use fxfs_macros::FuzzyHash;
814 use rand::Rng;
815 use std::hash::Hash;
816 use std::ops::{Bound, Range};
817 use std::sync::Arc;
818
819 #[derive(
820 Clone,
821 Eq,
822 Hash,
823 FuzzyHash,
824 PartialEq,
825 Debug,
826 serde::Serialize,
827 serde::Deserialize,
828 TypeFingerprint,
829 Versioned,
830 )]
831 struct TestKey(Range<u64>);
832
833 impl Value for i32 {
834 const DELETED_MARKER: Self = 0;
835 }
836
837 versioned_type! { 1.. => TestKey }
838
839 impl SortByU64 for TestKey {
840 fn get_leading_u64(&self) -> u64 {
841 self.0.start
842 }
843 }
844
845 impl LayerKey for TestKey {
846 fn merge_type(&self) -> MergeType {
847 MergeType::OptimizedMerge
848 }
849
850 fn next_key(&self) -> Option<Self> {
851 Some(TestKey(self.0.end..self.0.end + 1))
852 }
853 }
854
855 impl OrdUpperBound for TestKey {
856 fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
857 self.0.end.cmp(&other.0.end)
858 }
859 }
860
861 impl OrdLowerBound for TestKey {
862 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
863 self.0.start.cmp(&other.0.start)
864 }
865 }
866
867 fn layer_ref_iter<K: Key, V: Value>(
868 layers: &[Arc<SkipListLayer<K, V>>],
869 ) -> impl Iterator<Item = &dyn Layer<K, V>> {
870 layers.iter().map(|x| x.as_ref() as &dyn Layer<K, V>)
871 }
872
873 fn dyn_layer_ref_iter<K: Key, V: Value>(
874 layers: &[Arc<dyn Layer<K, V>>],
875 ) -> impl Iterator<Item = &dyn Layer<K, V>> {
876 layers.iter().map(|x| x.as_ref())
877 }
878
879 fn counters() -> Arc<lsm_tree::TreeCounters> {
880 Arc::new(lsm_tree::TreeCounters::default())
881 }
882
883 #[fuchsia::test]
884 async fn test_emit_left() {
885 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
886 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
887 skip_lists[0].insert(items[1].clone()).expect("insert error");
888 skip_lists[1].insert(items[0].clone()).expect("insert error");
889 let mut merger = Merger::new(
890 layer_ref_iter(&skip_lists),
891 |_left, _right| MergeResult::EmitLeft,
892 counters(),
893 );
894 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
895 let ItemRef { key, value, .. } = iter.get().expect("missing item");
896 assert_eq!((key, value), (&items[0].key, &items[0].value));
897 iter.advance().await.unwrap();
898 let ItemRef { key, value, .. } = iter.get().expect("missing item");
899 assert_eq!((key, value), (&items[1].key, &items[1].value));
900 iter.advance().await.unwrap();
901 assert!(iter.get().is_none());
902 }
903
904 #[fuchsia::test]
905 async fn test_other_emit() {
906 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
907 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
908 skip_lists[0].insert(items[1].clone()).expect("insert error");
909 skip_lists[1].insert(items[0].clone()).expect("insert error");
910 let mut merger = Merger::new(
911 layer_ref_iter(&skip_lists),
912 |_left, _right| MergeResult::Other {
913 emit: Some(Item::new(TestKey(3..3), 3).boxed()),
914 left: Discard,
915 right: Discard,
916 },
917 counters(),
918 );
919 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
920
921 let ItemRef { key, value, .. } = iter.get().expect("missing item");
922 assert_eq!((key, value), (&TestKey(3..3), &3));
923 iter.advance().await.unwrap();
924 assert!(iter.get().is_none());
925 }
926
927 #[fuchsia::test]
928 async fn test_replace_left() {
929 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
930 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
931 skip_lists[0].insert(items[1].clone()).expect("insert error");
932 skip_lists[1].insert(items[0].clone()).expect("insert error");
933 let mut merger = Merger::new(
934 layer_ref_iter(&skip_lists),
935 |_left, _right| MergeResult::Other {
936 emit: None,
937 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
938 right: Discard,
939 },
940 counters(),
941 );
942 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
943
944 let ItemRef { key, value, .. } = iter.get().expect("missing item");
947 assert_eq!((key, value), (&TestKey(3..3), &3));
948 iter.advance().await.unwrap();
949 assert!(iter.get().is_none());
950 }
951
952 #[fuchsia::test]
953 async fn test_replace_right() {
954 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
955 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
956 skip_lists[0].insert(items[1].clone()).expect("insert error");
957 skip_lists[1].insert(items[0].clone()).expect("insert error");
958 let mut merger = Merger::new(
959 layer_ref_iter(&skip_lists),
960 |_left, _right| MergeResult::Other {
961 emit: None,
962 left: Discard,
963 right: Replace(Item::new(TestKey(3..3), 3).boxed()),
964 },
965 counters(),
966 );
967 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
968
969 let ItemRef { key, value, .. } = iter.get().expect("missing item");
972 assert_eq!((key, value), (&TestKey(3..3), &3));
973 iter.advance().await.unwrap();
974 assert!(iter.get().is_none());
975 }
976
977 #[fuchsia::test]
978 async fn test_left_less_than_right() {
979 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
980 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
981 skip_lists[0].insert(items[1].clone()).expect("insert error");
982 skip_lists[1].insert(items[0].clone()).expect("insert error");
983 let mut merger = Merger::new(
984 layer_ref_iter(&skip_lists),
985 |left, right| {
986 assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
987 assert_eq!((right.key(), right.value()), (&TestKey(2..2), &2));
988 MergeResult::EmitLeft
989 },
990 counters(),
991 );
992 merger.query(Query::FullScan).await.expect("seek failed");
993 }
994
995 #[fuchsia::test]
996 async fn test_left_equals_right() {
997 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
998 let item = Item::new(TestKey(1..1), 1);
999 skip_lists[0].insert(item.clone()).expect("insert error");
1000 skip_lists[1].insert(item.clone()).expect("insert error");
1001 let mut merger = Merger::new(
1002 layer_ref_iter(&skip_lists),
1003 |left, right| {
1004 assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
1005 assert_eq!((right.key(), right.value()), (&TestKey(1..1), &1));
1006 assert_eq!(left.layer_index, 0);
1007 assert_eq!(right.layer_index, 1);
1008 MergeResult::EmitLeft
1009 },
1010 counters(),
1011 );
1012 merger.query(Query::FullScan).await.expect("seek failed");
1013 }
1014
1015 #[fuchsia::test]
1016 async fn test_keep() {
1017 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1018 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1019 skip_lists[0].insert(items[1].clone()).expect("insert error");
1020 skip_lists[1].insert(items[0].clone()).expect("insert error");
1021 let mut merger = Merger::new(
1022 layer_ref_iter(&skip_lists),
1023 |left, right| {
1024 if left.key() == &TestKey(1..1) {
1025 MergeResult::Other {
1026 emit: None,
1027 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1028 right: Keep,
1029 }
1030 } else {
1031 assert_eq!(left.key(), &TestKey(2..2));
1032 assert_eq!(right.key(), &TestKey(3..3));
1033 MergeResult::Other { emit: None, left: Discard, right: Keep }
1034 }
1035 },
1036 counters(),
1037 );
1038 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1039
1040 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1043 assert_eq!((key, value), (&TestKey(3..3), &3));
1044 iter.advance().await.unwrap();
1045 assert!(iter.get().is_none());
1046 }
1047
1048 #[fuchsia::test]
1049 async fn test_merge_10_layers() {
1050 let skip_lists: Vec<_> = (0..10).map(|_| SkipListLayer::new(100)).collect();
1051 let mut rng = rand::rng();
1052 for i in 0..100 {
1053 skip_lists[rng.random_range(0..10) as usize]
1054 .insert(Item::new(TestKey(i..i), i))
1055 .expect("insert error");
1056 }
1057 let mut merger = Merger::new(
1058 layer_ref_iter(&skip_lists),
1059 |_left, _right| MergeResult::EmitLeft,
1060 counters(),
1061 );
1062 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1063
1064 for i in 0..100 {
1065 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1066 assert_eq!((key, value), (&TestKey(i..i), &i));
1067 iter.advance().await.unwrap();
1068 }
1069 assert!(iter.get().is_none());
1070 }
1071
1072 #[fuchsia::test]
1073 async fn test_merge_uses_cmp_lower_bound() {
1074 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1075 let items = [Item::new(TestKey(1..10), 1), Item::new(TestKey(2..3), 2)];
1076 skip_lists[0].insert(items[1].clone()).expect("insert error");
1077 skip_lists[1].insert(items[0].clone()).expect("insert error");
1078 let mut merger = Merger::new(
1079 layer_ref_iter(&skip_lists),
1080 |_left, _right| MergeResult::EmitLeft,
1081 counters(),
1082 );
1083 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1084
1085 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1086 assert_eq!((key, value), (&items[0].key, &items[0].value));
1087 iter.advance().await.unwrap();
1088 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1089 assert_eq!((key, value), (&items[1].key, &items[1].value));
1090 iter.advance().await.unwrap();
1091 assert!(iter.get().is_none());
1092 }
1093
1094 #[fuchsia::test]
1095 async fn test_merge_into_emit_left() {
1096 let skip_list = SkipListLayer::new(100);
1097 let items =
1098 [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2), Item::new(TestKey(3..3), 3)];
1099 skip_list.insert(items[0].clone()).expect("insert error");
1100 skip_list.insert(items[2].clone()).expect("insert error");
1101 skip_list
1102 .merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::EmitLeft);
1103
1104 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1105 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1106 assert_eq!((key, value), (&items[0].key, &items[0].value));
1107 iter.advance().await.unwrap();
1108 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1109 assert_eq!((key, value), (&items[1].key, &items[1].value));
1110 iter.advance().await.unwrap();
1111 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1112 assert_eq!((key, value), (&items[2].key, &items[2].value));
1113 iter.advance().await.unwrap();
1114 assert!(iter.get().is_none());
1115 }
1116
1117 #[fuchsia::test]
1118 async fn test_merge_into_emit_last_after_replacing() {
1119 let skip_list = SkipListLayer::new(100);
1120 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1121 skip_list.insert(items[0].clone()).expect("insert error");
1122
1123 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1124 if left.key() == &TestKey(1..1) {
1125 assert_eq!(right.key(), &TestKey(2..2));
1126 MergeResult::Other {
1127 emit: None,
1128 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1129 right: Keep,
1130 }
1131 } else {
1132 assert_eq!(left.key(), &TestKey(2..2));
1133 assert_eq!(right.key(), &TestKey(3..3));
1134 MergeResult::EmitLeft
1135 }
1136 });
1137
1138 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1139 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1140 assert_eq!((key, value), (&items[1].key, &items[1].value));
1141 iter.advance().await.unwrap();
1142 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1143 assert_eq!((key, value), (&TestKey(3..3), &3));
1144 iter.advance().await.unwrap();
1145 assert!(iter.get().is_none());
1146 }
1147
1148 #[fuchsia::test]
1149 async fn test_merge_into_emit_left_after_replacing() {
1150 let skip_list = SkipListLayer::new(100);
1151 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1152 skip_list.insert(items[0].clone()).expect("insert error");
1153
1154 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1155 if left.key() == &TestKey(1..1) {
1156 assert_eq!(right.key(), &TestKey(3..3));
1157 MergeResult::Other {
1158 emit: None,
1159 left: Replace(Item::new(TestKey(2..2), 2).boxed()),
1160 right: Keep,
1161 }
1162 } else {
1163 assert_eq!(left.key(), &TestKey(2..2));
1164 assert_eq!(right.key(), &TestKey(3..3));
1165 MergeResult::EmitLeft
1166 }
1167 });
1168
1169 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1170 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1171 assert_eq!((key, value), (&TestKey(2..2), &2));
1172 iter.advance().await.unwrap();
1173 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1174 assert_eq!((key, value), (&items[1].key, &items[1].value));
1175 iter.advance().await.unwrap();
1176 assert!(iter.get().is_none());
1177 }
1178
1179 #[fuchsia::test]
1181 async fn test_merge_into_emit_other_and_discard() {
1182 let skip_list = SkipListLayer::new(100);
1183 let items =
1184 [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 3)];
1185 skip_list.insert(items[0].clone()).expect("insert error");
1186 skip_list.insert(items[2].clone()).expect("insert error");
1187
1188 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1189 if left.key() == &TestKey(1..1) {
1190 assert_eq!(right.key(), &TestKey(3..3));
1192 MergeResult::Other {
1193 emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1194 left: Discard,
1195 right: Keep,
1196 }
1197 } else {
1198 assert_eq!(left.key(), &TestKey(3..3));
1200 assert_eq!(right.key(), &TestKey(5..5));
1201 MergeResult::Other {
1202 emit: Some(Item::new(TestKey(4..4), 4).boxed()),
1203 left: Discard,
1204 right: Discard,
1205 }
1206 }
1207 });
1208
1209 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1210 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1211 assert_eq!((key, value), (&TestKey(2..2), &2));
1212 iter.advance().await.unwrap();
1213 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1214 assert_eq!((key, value), (&TestKey(4..4), &4));
1215 iter.advance().await.unwrap();
1216 assert!(iter.get().is_none());
1217 }
1218
1219 #[fuchsia::test]
1222 async fn test_merge_into_replace_and_discard() {
1223 let skip_list = SkipListLayer::new(100);
1224 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1225 skip_list.insert(items[0].clone()).expect("insert error");
1226
1227 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1228 emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1229 left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1230 right: Discard,
1231 });
1232
1233 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1234 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1235 assert_eq!((key, value), (&TestKey(2..2), &2));
1236 iter.advance().await.unwrap();
1237 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1238 assert_eq!((key, value), (&TestKey(4..4), &4));
1239 iter.advance().await.unwrap();
1240 assert!(iter.get().is_none());
1241 }
1242
1243 #[fuchsia::test]
1246 async fn test_merge_into_replace_merge_item() {
1247 let skip_list = SkipListLayer::new(100);
1248 let items =
1249 [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 5)];
1250 skip_list.insert(items[0].clone()).expect("insert error");
1251 skip_list.insert(items[2].clone()).expect("insert error");
1252
1253 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, right| {
1254 if right.key() == &TestKey(3..3) {
1255 MergeResult::Other {
1256 emit: None,
1257 left: Discard,
1258 right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1259 }
1260 } else {
1261 assert_eq!(right.key(), &TestKey(5..5));
1262 MergeResult::Other {
1263 emit: None,
1264 left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1265 right: Discard,
1266 }
1267 }
1268 });
1269
1270 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1271 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1272 assert_eq!((key, value), (&TestKey(4..4), &4));
1273 iter.advance().await.unwrap();
1274 assert!(iter.get().is_none());
1275 }
1276
1277 #[fuchsia::test]
1279 async fn test_merge_into_replace_existing() {
1280 let skip_list = SkipListLayer::new(100);
1281 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1282 skip_list.insert(items[1].clone()).expect("insert error");
1283
1284 skip_list.merge_into(items[0].clone(), &items[0].key, |_left, right| {
1285 if right.key() == &TestKey(3..3) {
1286 MergeResult::Other {
1287 emit: None,
1288 left: Keep,
1289 right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1290 }
1291 } else {
1292 MergeResult::EmitLeft
1293 }
1294 });
1295
1296 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1297 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1298 assert_eq!((key, value), (&items[0].key, &items[0].value));
1299 iter.advance().await.unwrap();
1300 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1301 assert_eq!((key, value), (&TestKey(2..2), &2));
1302 iter.advance().await.unwrap();
1303 assert!(iter.get().is_none());
1304 }
1305
1306 #[fuchsia::test]
1307 async fn test_merge_into_discard_last() {
1308 let skip_list = SkipListLayer::new(100);
1309 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1310 skip_list.insert(items[0].clone()).expect("insert error");
1311
1312 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1313 emit: None,
1314 left: Discard,
1315 right: Keep,
1316 });
1317
1318 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1319 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1320 assert_eq!((key, value), (&items[1].key, &items[1].value));
1321 iter.advance().await.unwrap();
1322 assert!(iter.get().is_none());
1323 }
1324
1325 #[fuchsia::test]
1326 async fn test_merge_into_empty() {
1327 let skip_list = SkipListLayer::new(100);
1328 let items = [Item::new(TestKey(1..1), 1)];
1329
1330 skip_list.merge_into(items[0].clone(), &items[0].key, |_left, _right| {
1331 panic!("Unexpected merge!");
1332 });
1333
1334 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1335 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1336 assert_eq!((key, value), (&items[0].key, &items[0].value));
1337 iter.advance().await.unwrap();
1338 assert!(iter.get().is_none());
1339 }
1340
1341 #[fuchsia::test]
1342 async fn test_seek_uses_minimum_number_of_iterators() {
1343 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1344 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(1..1), 2)];
1345 skip_lists[0].insert(items[0].clone()).expect("insert error");
1346 skip_lists[1].insert(items[1].clone()).expect("insert error");
1347 let mut merger = Merger::new(
1348 layer_ref_iter(&skip_lists),
1349 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1350 counters(),
1351 );
1352 let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1353
1354 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1357 assert_eq!((key, value), (&items[0].key, &items[0].value));
1358 }
1359
1360 async fn test_advance<K: Eq + Key + LayerKey + OrdLowerBound>(
1363 layers: &[&[(K, i32)]],
1364 query: Query<'_, K>,
1365 expected: &[(K, i32)],
1366 ) {
1367 let mut skip_lists = Vec::new();
1368 for &layer in layers {
1369 let skip_list = SkipListLayer::new(100);
1370 for (k, v) in layer {
1371 skip_list.insert(Item::new(k.clone(), *v)).expect("insert error");
1372 }
1373 skip_lists.push(skip_list);
1374 }
1375 let mut merger = Merger::new(
1376 layer_ref_iter(&skip_lists),
1377 |_left, _right| MergeResult::EmitLeft,
1378 counters(),
1379 );
1380 let mut iter = merger.query(query).await.expect("seek failed");
1381 for (k, v) in expected {
1382 let ItemRef { key, value, .. } = iter.get().expect("get failed");
1383 assert_eq!((key, value), (k, v));
1384 iter.advance().await.expect("advance failed");
1385 }
1386 assert!(iter.get().is_none());
1387 }
1388
1389 #[fuchsia::test]
1390 async fn test_seek_skips_replaced_items() {
1391 test_advance(
1393 &[
1394 &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(4..5), 3)],
1395 &[(TestKey(1..2), 4), (TestKey(2..3), 5), (TestKey(3..4), 6)],
1396 ],
1397 Query::FullRange(&TestKey(1..2)),
1398 &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(3..4), 6), (TestKey(4..5), 3)],
1399 )
1400 .await;
1401 }
1402
1403 #[fuchsia::test]
1404 async fn test_advance_skips_replaced_items_at_end() {
1405 test_advance(
1408 &[&[(TestKey(1..2), 1)], &[(TestKey(1..2), 2)]],
1409 Query::FullRange(&TestKey(1..2)),
1410 &[(TestKey(1..2), 1)],
1411 )
1412 .await;
1413 }
1414
1415 #[derive(
1416 Clone,
1417 Eq,
1418 Hash,
1419 FuzzyHash,
1420 PartialEq,
1421 Debug,
1422 serde::Serialize,
1423 serde::Deserialize,
1424 TypeFingerprint,
1425 Versioned,
1426 )]
1427 struct TestKeyWithFullMerge(Range<u64>);
1428
1429 versioned_type! { 1.. => TestKeyWithFullMerge }
1430
1431 impl LayerKey for TestKeyWithFullMerge {
1432 fn merge_type(&self) -> MergeType {
1433 MergeType::FullMerge
1434 }
1435 }
1436
1437 impl SortByU64 for TestKeyWithFullMerge {
1438 fn get_leading_u64(&self) -> u64 {
1439 self.0.start
1440 }
1441 }
1442
1443 impl OrdUpperBound for TestKeyWithFullMerge {
1444 fn cmp_upper_bound(&self, other: &TestKeyWithFullMerge) -> std::cmp::Ordering {
1445 self.0.end.cmp(&other.0.end)
1446 }
1447 }
1448
1449 impl OrdLowerBound for TestKeyWithFullMerge {
1450 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1451 self.0.start.cmp(&other.0.start)
1452 }
1453 }
1454
1455 #[fuchsia::test]
1458 async fn test_full_merge_consistent_advance_ordering() {
1459 let layer_set = [
1460 [
1461 (TestKeyWithFullMerge(1..2), 1i32),
1462 (TestKeyWithFullMerge(2..3), 2i32),
1463 (TestKeyWithFullMerge(4..5), 3i32),
1464 ]
1465 .as_slice(),
1466 [
1467 (TestKeyWithFullMerge(1..2), 4i32),
1468 (TestKeyWithFullMerge(2..3), 5i32),
1469 (TestKeyWithFullMerge(3..4), 6i32),
1470 ]
1471 .as_slice(),
1472 ];
1473
1474 let full_merge_result = [
1475 (TestKeyWithFullMerge(1..2), 1),
1476 (TestKeyWithFullMerge(1..2), 4),
1477 (TestKeyWithFullMerge(2..3), 2),
1478 (TestKeyWithFullMerge(2..3), 5),
1479 (TestKeyWithFullMerge(3..4), 6),
1480 (TestKeyWithFullMerge(4..5), 3),
1481 ];
1482
1483 test_advance(layer_set.as_slice(), Query::FullScan, &full_merge_result).await;
1484
1485 test_advance(
1486 layer_set.as_slice(),
1487 Query::FullRange(&TestKeyWithFullMerge(1..2)),
1488 &full_merge_result,
1489 )
1490 .await;
1491
1492 test_advance(
1493 layer_set.as_slice(),
1494 Query::FullRange(&TestKeyWithFullMerge(2..3)),
1495 &full_merge_result[2..],
1496 )
1497 .await;
1498
1499 test_advance(
1500 layer_set.as_slice(),
1501 Query::FullRange(&TestKeyWithFullMerge(3..4)),
1502 &full_merge_result[4..],
1503 )
1504 .await;
1505 }
1506
1507 #[fuchsia::test]
1508 async fn test_full_merge_always_consult_all_layers() {
1509 let skip_lists =
1513 [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1514 let items = [
1515 Item::new(TestKeyWithFullMerge(1..2), 1),
1516 Item::new(TestKeyWithFullMerge(2..3), 2),
1517 Item::new(TestKeyWithFullMerge(1..2), 3),
1518 Item::new(TestKeyWithFullMerge(2..3), 4),
1519 ];
1520 skip_lists[0].insert(items[0].clone()).expect("insert error");
1521 skip_lists[1].insert(items[1].clone()).expect("insert error");
1522 skip_lists[2].insert(items[2].clone()).expect("insert error");
1523 skip_lists[2].insert(items[3].clone()).expect("insert error");
1524 let mut merger = Merger::new(
1525 layer_ref_iter(&skip_lists),
1526 |left, right| {
1527 if left.key() == right.key() {
1529 MergeResult::Other {
1530 emit: None,
1531 left: Discard,
1532 right: Replace(
1533 Item::new(left.key().clone(), left.value() + right.value()).boxed(),
1534 ),
1535 }
1536 } else {
1537 MergeResult::EmitLeft
1538 }
1539 },
1540 counters(),
1541 );
1542 let mut iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1543
1544 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1545 assert_eq!((key, *value), (&items[0].key, items[0].value + items[2].value));
1546 iter.advance().await.expect("advance");
1547 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1548 assert_eq!((key, *value), (&items[1].key, items[1].value + items[3].value));
1549 iter.advance().await.expect("advance");
1550 assert!(iter.get().is_none());
1551 }
1552
1553 #[derive(
1554 Clone,
1555 Eq,
1556 Hash,
1557 FuzzyHash,
1558 PartialEq,
1559 Debug,
1560 serde::Serialize,
1561 serde::Deserialize,
1562 TypeFingerprint,
1563 Versioned,
1564 )]
1565 struct TestKeyWithDefaultLayerKey(Range<u64>);
1566
1567 versioned_type! { 1.. => TestKeyWithDefaultLayerKey }
1568
1569 impl LayerKey for TestKeyWithDefaultLayerKey {}
1571
1572 impl SortByU64 for TestKeyWithDefaultLayerKey {
1573 fn get_leading_u64(&self) -> u64 {
1574 self.0.start
1575 }
1576 }
1577
1578 impl OrdUpperBound for TestKeyWithDefaultLayerKey {
1579 fn cmp_upper_bound(&self, other: &TestKeyWithDefaultLayerKey) -> std::cmp::Ordering {
1580 self.0.end.cmp(&other.0.end)
1581 }
1582 }
1583
1584 impl OrdLowerBound for TestKeyWithDefaultLayerKey {
1585 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1586 self.0.start.cmp(&other.0.start)
1587 }
1588 }
1589
1590 #[fuchsia::test]
1591 async fn test_no_merge_unbounded_include_all_layers() {
1592 test_advance(
1593 &[
1594 &[
1595 (TestKeyWithDefaultLayerKey(1..2), 1),
1596 (TestKeyWithDefaultLayerKey(2..3), 2),
1597 (TestKeyWithDefaultLayerKey(4..5), 3),
1598 ],
1599 &[
1600 (TestKeyWithDefaultLayerKey(1..2), 4),
1601 (TestKeyWithDefaultLayerKey(2..3), 5),
1602 (TestKeyWithDefaultLayerKey(3..4), 6),
1603 ],
1604 ],
1605 Query::FullScan,
1606 &[
1607 (TestKeyWithDefaultLayerKey(1..2), 1),
1608 (TestKeyWithDefaultLayerKey(1..2), 4),
1609 (TestKeyWithDefaultLayerKey(2..3), 2),
1610 (TestKeyWithDefaultLayerKey(2..3), 5),
1611 (TestKeyWithDefaultLayerKey(3..4), 6),
1612 (TestKeyWithDefaultLayerKey(4..5), 3),
1613 ],
1614 )
1615 .await;
1616 }
1617
1618 #[fuchsia::test]
1619 async fn test_no_merge_proceeds_comprehensively_after_seek() {
1620 test_advance(
1621 &[
1622 &[
1623 (TestKeyWithDefaultLayerKey(1..2), 1),
1624 (TestKeyWithDefaultLayerKey(2..3), 2),
1625 (TestKeyWithDefaultLayerKey(4..5), 3),
1626 ],
1627 &[
1628 (TestKeyWithDefaultLayerKey(1..2), 4),
1629 (TestKeyWithDefaultLayerKey(2..3), 5),
1630 (TestKeyWithDefaultLayerKey(3..4), 6),
1631 ],
1632 ],
1633 Query::FullRange(&TestKeyWithDefaultLayerKey(1..2)),
1634 &[
1635 (TestKeyWithDefaultLayerKey(1..2), 1),
1636 (TestKeyWithDefaultLayerKey(1..2), 4),
1637 (TestKeyWithDefaultLayerKey(2..3), 2),
1638 (TestKeyWithDefaultLayerKey(2..3), 5),
1639 (TestKeyWithDefaultLayerKey(3..4), 6),
1640 (TestKeyWithDefaultLayerKey(4..5), 3),
1641 ],
1642 )
1643 .await;
1644 }
1645
1646 #[fuchsia::test]
1647 async fn test_no_merge_seek_finds_lower_layer() {
1648 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1649 let items = [
1650 Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1651 Item::new(TestKeyWithDefaultLayerKey(1..1), 2),
1652 ];
1653 skip_lists[0].insert(items[0].clone()).expect("insert error");
1654 skip_lists[1].insert(items[1].clone()).expect("insert error");
1655 let mut merger = Merger::new(
1656 layer_ref_iter(&skip_lists),
1657 |_left, _right| MergeResult::EmitLeft,
1658 counters(),
1659 );
1660 let iter = merger.query(Query::FullRange(&items[1].key)).await.expect("seek failed");
1661
1662 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1663 assert_eq!((key, value), (&items[1].key, &items[1].value));
1664 }
1665
1666 #[fuchsia::test]
1667 async fn test_no_merge_seek_stops_at_exact_match() {
1668 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1669 let items = [
1670 Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1671 Item::new(TestKeyWithDefaultLayerKey(1..4), 2),
1672 ];
1673 skip_lists[0].insert(items[0].clone()).expect("insert error");
1674 skip_lists[1].insert(items[1].clone()).expect("insert error");
1675 let mut merger = Merger::new(
1676 layer_ref_iter(&skip_lists),
1677 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1678 counters(),
1679 );
1680 let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1681
1682 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1685 assert_eq!((key, value), (&items[0].key, &items[0].value));
1686 }
1687
1688 #[fuchsia::test]
1689 async fn test_seek_less_than() {
1690 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1691 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1692 skip_lists[0].insert(items[0].clone()).expect("insert error");
1693 skip_lists[1].insert(items[1].clone()).expect("insert error");
1694 let mut merger = Merger::new(
1696 layer_ref_iter(&skip_lists),
1697 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1698 counters(),
1699 );
1700 let iter = merger.query(Query::FullRange(&TestKey(0..0))).await.expect("seek failed");
1701
1702 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1704 assert_eq!((key, value), (&items[1].key, &items[1].value));
1705 }
1706
1707 #[fuchsia::test]
1708 async fn test_seek_to_end() {
1709 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1710 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1711 skip_lists[0].insert(items[0].clone()).expect("insert error");
1712 skip_lists[1].insert(items[1].clone()).expect("insert error");
1713 let mut merger = Merger::new(
1714 layer_ref_iter(&skip_lists),
1715 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1716 counters(),
1717 );
1718 let iter = merger.query(Query::FullRange(&TestKey(3..3))).await.expect("seek failed");
1719
1720 assert!(iter.get().is_none());
1721 }
1722
1723 #[fuchsia::test]
1724 async fn test_merge_all_discarded() {
1725 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1726 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1727 skip_lists[0].insert(items[1].clone()).expect("insert error");
1728 skip_lists[1].insert(items[0].clone()).expect("insert error");
1729 let mut merger = Merger::new(
1730 layer_ref_iter(&skip_lists),
1731 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Discard },
1732 counters(),
1733 );
1734 let iter = merger.query(Query::FullScan).await.expect("seek failed");
1735 assert!(iter.get().is_none());
1736 }
1737
1738 #[fuchsia::test]
1739 async fn test_seek_with_merged_key_less_than() {
1740 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1741 let items = [Item::new(TestKey(1..8), 1), Item::new(TestKey(2..10), 2)];
1742 skip_lists[0].insert(items[0].clone()).expect("insert error");
1743 skip_lists[1].insert(items[1].clone()).expect("insert error");
1744 let mut merger = Merger::new(
1745 layer_ref_iter(&skip_lists),
1746 |left, _right| {
1747 if left.key() == &TestKey(1..8) {
1748 MergeResult::Other {
1749 emit: None,
1750 left: Replace(Item::new(TestKey(1..2), 1).boxed()),
1751 right: Keep,
1752 }
1753 } else {
1754 MergeResult::EmitLeft
1755 }
1756 },
1757 counters(),
1758 );
1759 let iter = merger.query(Query::FullRange(&TestKey(0..3))).await.expect("seek failed");
1760 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1761 assert_eq!((key, value), (&items[1].key, &items[1].value));
1762 }
1763
1764 #[fuchsia::test]
1765 async fn test_overlapping_keys() {
1766 let skip_lists =
1767 [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1768 let items = [
1769 Item::new(TestKey(0..10), 1),
1770 Item::new(TestKey(0..20), 2),
1771 Item::new(TestKey(0..30), 3),
1772 ];
1773 skip_lists[0].insert(items[0].clone()).expect("insert error");
1774 skip_lists[1].insert(items[1].clone()).expect("insert error");
1775 skip_lists[2].insert(items[2].clone()).expect("insert error");
1776 let mut merger = Merger::new(
1777 layer_ref_iter(&skip_lists),
1778 |left, right| {
1779 let result = if left.key().0.end <= right.key().0.start {
1780 MergeResult::EmitLeft
1781 } else {
1782 if left.key() == &TestKey(0..30) && right.key() == &TestKey(10..20) {
1783 MergeResult::Other {
1784 emit: Some(Item::new(TestKey(0..10), 1).boxed()),
1785 left: Replace(Item::new(TestKey(10..30), 1).boxed()),
1786 right: Keep,
1787 }
1788 } else {
1789 MergeResult::Other {
1790 emit: None,
1791 left: Keep,
1792 right: Replace(
1793 Item::new(TestKey(left.key().0.end..right.key().0.end), 1).boxed(),
1794 ),
1795 }
1796 }
1797 };
1798 result
1799 },
1800 counters(),
1801 );
1802 let mut iter = merger.query(Query::FullRange(&TestKey(0..1))).await.expect("seek failed");
1803 let ItemRef { key, .. } = iter.get().expect("missing item");
1804 assert_eq!(key, &TestKey(0..10));
1805 iter.advance().await.expect("advance failed");
1806 let ItemRef { key, .. } = iter.get().expect("missing item");
1807 assert_eq!(key, &TestKey(10..20));
1808 iter.advance().await.expect("advance failed");
1809 let ItemRef { key, .. } = iter.get().expect("missing item");
1810 assert_eq!(key, &TestKey(20..30));
1811 iter.advance().await.expect("advance failed");
1812 assert_eq!(iter.get(), None);
1813 }
1814
1815 async fn write_layer<K: Key, V: Value>(items: Vec<Item<K, V>>) -> Arc<dyn Layer<K, V>> {
1816 let object = Arc::new(FakeObject::new());
1817 let write_handle = FakeObjectHandle::new(object.clone());
1818 let mut writer =
1819 PersistentLayerWriter::<_, K, V>::new(Writer::new(&write_handle).await, 1, 512)
1820 .await
1821 .expect("PersistentLayerWriter::new failed");
1822 for item in items {
1823 writer.write(item.as_item_ref()).await.expect("write failed");
1824 }
1825 writer.flush().await.expect("flush failed");
1826 PersistentLayer::open(FakeObjectHandle::new(object))
1827 .await
1828 .expect("open_persistent_layer failed")
1829 }
1830
1831 fn merge_sum(
1832 left: &MergeLayerIterator<'_, i32, i32>,
1833 right: &MergeLayerIterator<'_, i32, i32>,
1834 ) -> MergeResult<i32, i32> {
1835 if left.key() == right.key() {
1837 MergeResult::Other {
1838 emit: None,
1839 left: Discard,
1840 right: Replace(Item::new(left.key().clone(), left.value() + right.value()).boxed()),
1841 }
1842 } else {
1843 MergeResult::EmitLeft
1844 }
1845 }
1846
1847 #[fuchsia::test]
1848 async fn test_merge_bloom_filters_point_query() {
1849 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1850 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1851 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1852 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1853 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1854 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1855
1856 {
1857 let iter = merger.query(Query::Point(&1)).await.expect("seek failed");
1859 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1860 assert_eq!((key, *value), (&items[0].key, items[0].value));
1861 }
1862 {
1863 let iter = merger.query(Query::Point(&2)).await.expect("seek failed");
1865 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1866 assert_eq!((key, *value), (&items[1].key, items[1].value));
1867 }
1868 {
1869 let iter = merger.query(Query::Point(&4)).await.expect("seek failed");
1871 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1872 assert_eq!((key, *value), (&items[2].key, items[2].value));
1873 }
1874 {
1875 let iter = merger.query(Query::Point(&400)).await.expect("seek failed");
1877 assert!(iter.get().is_none());
1878 }
1879 }
1880
1881 #[fuchsia::test]
1882 async fn test_merge_bloom_filters_limited_range() {
1883 let layer_0_items = vec![Item::new(
1886 ObjectKey::extent(0, 0, 0..2048),
1887 ObjectValue::extent(0, VOLUME_DATA_KEY_ID),
1888 )];
1889 let layer_1_items = vec![
1890 Item::new(
1891 ObjectKey::extent(0, 0, 1024..4096),
1892 ObjectValue::extent(32768, VOLUME_DATA_KEY_ID),
1893 ),
1894 Item::new(
1895 ObjectKey::extent(0, 0, 16384..17408),
1896 ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1897 ),
1898 ];
1899 let items = [
1900 Item::new(ObjectKey::extent(0, 0, 0..2048), ObjectValue::extent(0, VOLUME_DATA_KEY_ID)),
1901 Item::new(
1902 ObjectKey::extent(0, 0, 2048..4096),
1903 ObjectValue::extent(33792, VOLUME_DATA_KEY_ID),
1904 ),
1905 Item::new(
1906 ObjectKey::extent(0, 0, 16384..17408),
1907 ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1908 ),
1909 ];
1910 let layers: [Arc<dyn Layer<ObjectKey, ObjectValue>>; 2] =
1911 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1912 let mut merger =
1913 Merger::new(dyn_layer_ref_iter(&layers), object_store::merge::merge, counters());
1914
1915 {
1916 let mut iter = merger
1918 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 16384..16386)))
1919 .await
1920 .expect("seek failed");
1921 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1922 assert_eq!(key, &items[2].key);
1923 assert_eq!(value, &items[2].value);
1924 iter.advance().await.expect("advance");
1925 assert!(iter.get().is_none());
1926 }
1927 {
1928 let mut iter = merger
1930 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 0..4096)))
1931 .await
1932 .expect("seek failed");
1933 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1934 assert_eq!(key, &items[0].key);
1935 assert_eq!(value, &items[0].value);
1936 iter.advance().await.expect("advance");
1937 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1938 assert_eq!(key, &items[1].key);
1939 assert_eq!(value, &items[1].value);
1940 iter.advance().await.expect("advance");
1941 }
1942 {
1943 let mut iter = merger
1945 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 8192..12288)))
1946 .await
1947 .expect("seek failed");
1948 let ItemRef { key, .. } = iter.get().expect("missing item");
1949 assert_eq!(key, &items[2].key);
1950 iter.advance().await.expect("advance");
1951 assert!(iter.get().is_none());
1952 }
1953 }
1954
1955 #[fuchsia::test]
1956 async fn test_merge_bloom_filters_full_range() {
1957 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1958 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1959 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1960 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1961 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1962 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1963
1964 let mut iter = merger.query(Query::FullRange(&0)).await.expect("seek failed");
1965 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1966 assert_eq!((key, *value), (&items[0].key, items[0].value));
1967 iter.advance().await.expect("advance");
1968 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1969 assert_eq!((key, *value), (&items[1].key, items[1].value));
1970 iter.advance().await.expect("advance");
1971 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1972 assert_eq!((key, *value), (&items[2].key, items[2].value));
1973 iter.advance().await.expect("advance");
1974 assert!(iter.get().is_none());
1975 }
1976
1977 #[fuchsia::test]
1978 async fn test_merge_bloom_filters_full_scan() {
1979 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1980 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1981 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1982 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1983 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1984 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1985
1986 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1987 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1988 assert_eq!((key, *value), (&items[0].key, items[0].value));
1989 iter.advance().await.expect("advance");
1990 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1991 assert_eq!((key, *value), (&items[1].key, items[1].value));
1992 iter.advance().await.expect("advance");
1993 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1994 assert_eq!((key, *value), (&items[2].key, items[2].value));
1995 iter.advance().await.expect("advance");
1996 assert!(iter.get().is_none());
1997 }
1998}