1use crate::log::*;
6use crate::lsm_tree;
7use crate::lsm_tree::types::{
8 BoxedItem, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut, LayerKey, MergeType,
9 OrdLowerBound, Value,
10};
11use anyhow::Error;
12use async_trait::async_trait;
13use fuchsia_sync::Mutex;
14use futures::try_join;
15use std::cmp::Ordering;
16use std::collections::BinaryHeap;
17use std::fmt::{Debug, Write};
18use std::ops::{Bound, Deref, DerefMut};
19use std::sync::Arc;
20
/// Tells the merger what to do with one side of a merge pair once the merge function has looked
/// at it.
#[derive(Debug, Eq, PartialEq)]
pub enum ItemOp<K, V> {
    /// Leave the item where it is: the layer is not advanced and the same item takes part in the
    /// next merge pair.
    Keep,

    /// Drop the item and move the layer on to whatever item follows it.
    Discard,

    /// Move the layer past its current item and present the supplied item in its place for the
    /// next merge pair.
    Replace(BoxedItem<K, V>),
}
33
/// The verdict a merge function returns for a pair of items.
#[derive(Debug, Eq, PartialEq)]
pub enum MergeResult<K, V> {
    /// Emit the left item as it is and leave the right item untouched so that it takes part in a
    /// later merge pair.
    EmitLeft,

    /// Everything else: optionally emit `emit`, and apply `left` and `right` to the corresponding
    /// side of the pair.
    Other { emit: Option<BoxedItem<K, V>>, left: ItemOp<K, V>, right: ItemOp<K, V> },
}
62
/// Signature of a merge function.  It receives the two lowest-ordered items still being merged,
/// the lower one first ("left"), and decides what is emitted and what happens to each side.
pub type MergeFn<K, V> =
    fn(&MergeLayerIterator<'_, K, V>, &MergeLayerIterator<'_, K, V>) -> MergeResult<K, V>;
69
/// The item a `MergeLayerIterator` currently contributes to the merge.
pub enum MergeItem<K, V> {
    /// There is no current item.
    None,
    /// An owned item held by the merge iterator itself (for example a replacement supplied by the
    /// merge function).
    Item(BoxedItem<K, V>),
    /// The current item is whatever the underlying layer iterator is positioned at.
    Iter,
}
75
/// The underlying layer iterator wrapped by a `MergeLayerIterator`.
enum RawIterator<'a, K, V> {
    /// No iterator has been installed.
    None,
    /// A read-only layer iterator, as returned by `Layer::seek`.
    Const(Box<dyn LayerIterator<K, V> + 'a>),
    /// A mutable layer iterator, as used by `merge_into`.
    Mut(Box<dyn LayerIteratorMut<K, V> + 'a>),
}

// NOTE(review): this impl is unconditional -- it places no `Send` bound on `K`, `V` or the boxed
// trait objects.  Confirm that every iterator stored here is in fact safe to move between
// threads.
unsafe impl<K, V> Send for RawIterator<'_, K, V> {}
85
/// Tracks the merge position within a single layer, or wraps a stand-alone item that is being
/// merged in.
pub struct MergeLayerIterator<'a, K, V> {
    /// The layer being iterated; `None` for instances that were not created from a layer.
    layer: Option<&'a dyn Layer<K, V>>,

    /// The underlying layer iterator; `RawIterator::None` until one has been installed.
    iter: RawIterator<'a, K, V>,

    /// Index assigned to this layer when the merger was built; used as the tie-break when two
    /// keys compare equal.
    pub layer_index: u16,

    /// The item this iterator currently contributes to the merge.
    item: MergeItem<K, V>,
}
100
101impl<'a, K, V> MergeLayerIterator<'a, K, V> {
102 pub fn key(&self) -> &K {
103 self.item().key
104 }
105
106 pub fn value(&self) -> &V {
107 self.item().value
108 }
109
110 pub fn sequence(&self) -> u64 {
111 self.item().sequence
112 }
113
114 fn new(layer_index: u16, layer: &'a dyn Layer<K, V>) -> Self {
115 MergeLayerIterator {
116 layer: Some(layer),
117 iter: RawIterator::None,
118 layer_index,
119 item: MergeItem::None,
120 }
121 }
122
123 fn new_with_item(layer_index: u16, item: MergeItem<K, V>) -> Self {
124 MergeLayerIterator { layer: None, iter: RawIterator::None, layer_index, item }
125 }
126
127 fn item(&self) -> ItemRef<'_, K, V> {
128 match &self.item {
129 MergeItem::None => panic!("No item!"),
130 MergeItem::Item(item) => ItemRef::from(item),
131 MergeItem::Iter => self.get().unwrap(),
132 }
133 }
134
135 fn get(&self) -> Option<ItemRef<'_, K, V>> {
136 match &self.iter {
137 RawIterator::None => panic!("No iterator!"),
138 RawIterator::Const(iter) => iter.get(),
139 RawIterator::Mut(iter) => iter.get(),
140 }
141 }
142
143 fn set_item_from_iter(&mut self) {
144 self.item = if match &self.iter {
145 RawIterator::None => unreachable!(),
146 RawIterator::Const(iter) => iter.get(),
147 RawIterator::Mut(iter) => iter.get(),
148 }
149 .is_some()
150 {
151 MergeItem::Iter
152 } else {
153 MergeItem::None
154 };
155 }
156
157 fn take_item(&mut self) -> Option<BoxedItem<K, V>> {
158 if let MergeItem::Item(_) = self.item {
159 let mut item = MergeItem::None;
160 std::mem::swap(&mut self.item, &mut item);
161 if let MergeItem::Item(item) = item {
162 Some(item)
163 } else {
164 unreachable!();
165 }
166 } else {
167 None
168 }
169 }
170
171 async fn advance(&mut self) -> Result<(), Error> {
172 if let MergeItem::Iter = self.item {
173 if let RawIterator::Const(iter) = &mut self.iter {
174 iter.advance().await?;
175 } else {
176 unreachable!();
178 }
179 }
180 self.set_item_from_iter();
181 Ok(())
182 }
183
184 fn replace(&mut self, item: BoxedItem<K, V>) {
185 self.item = MergeItem::Item(item);
186 }
187
188 fn is_some(&self) -> bool {
189 !matches!(self.item, MergeItem::None)
190 }
191
192 async fn maybe_advance(&mut self, op: &ItemOp<K, V>) -> Result<(), Error> {
195 if let ItemOp::Keep = op {
196 Ok(())
197 } else {
198 self.advance().await
199 }
200 }
201}
202
203impl<K: OrdLowerBound, V> Ord for MergeLayerIterator<'_, K, V> {
205 fn cmp(&self, other: &Self) -> Ordering {
206 other.key().cmp_lower_bound(self.key()).then(other.layer_index.cmp(&self.layer_index))
208 }
209}
210impl<K: OrdLowerBound, V> PartialOrd for MergeLayerIterator<'_, K, V> {
211 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
212 Some(self.cmp(other))
213 }
214}
215impl<K: OrdLowerBound, V> PartialEq for MergeLayerIterator<'_, K, V> {
216 fn eq(&self, other: &Self) -> bool {
217 self.cmp(other) == Ordering::Equal
218 }
219}
220impl<K: OrdLowerBound, V> Eq for MergeLayerIterator<'_, K, V> {}
221
/// The item a `MergerIterator` is currently positioned at.
enum CurrentItem<'a, 'b, K, V> {
    /// The merger is not positioned at any item.
    None,
    /// An item that was emitted by the merge function.
    Item(BoxedItem<K, V>),
    /// The current item of the referenced layer iterator, which has been taken off the heap.
    Iterator(&'a mut MergeLayerIterator<'b, K, V>),
}
229
230impl<'a, 'b, K, V> CurrentItem<'a, 'b, K, V> {
231 fn take_iterator(&mut self) -> Option<&'a mut MergeLayerIterator<'b, K, V>> {
234 if let CurrentItem::Iterator(_) = self {
235 let mut result = CurrentItem::None;
236 std::mem::swap(self, &mut result);
237 if let CurrentItem::Iterator(iter) = result {
238 Some(iter)
239 } else {
240 unreachable!();
241 }
242 } else {
243 None
244 }
245 }
246}
247
248impl<'a, K, V> From<&'a CurrentItem<'_, '_, K, V>> for Option<ItemRef<'a, K, V>> {
249 fn from(iter: &'a CurrentItem<'_, '_, K, V>) -> Option<ItemRef<'a, K, V>> {
250 match iter {
251 CurrentItem::None => None,
252 CurrentItem::Iterator(iterator) => Some(iterator.item()),
253 CurrentItem::Item(item) => Some(item.into()),
254 }
255 }
256}
257
/// Merges the items of a set of layers using a merge function.  Iteration happens through the
/// `MergerIterator` returned by `query`.
pub struct Merger<'a, K, V> {
    /// One iterator per layer, in the order the layers were supplied.
    iterators: Vec<MergeLayerIterator<'a, K, V>>,

    /// The function that decides how pairs of items are merged.
    merge_fn: MergeFn<K, V>,

    /// When set, iterators created by `query` record their merge decisions in a history string.
    trace: bool,

    /// Shared counters updated on every query.
    counters: Arc<Mutex<lsm_tree::Counters>>,
}
272
/// The kinds of query a `Merger` can serve.  The variant determines both where iteration starts
/// and which layers are consulted.
#[derive(Debug, Clone)]
pub enum Query<'a, K: Key + LayerKey + OrdLowerBound> {
    /// Starts at the given key, which must not be a range key (checked with a debug assertion).
    /// Only layers that may contain the key are consulted.
    Point(&'a K),
    /// Starts at the given key's `search_key()`.  Only layers that may contain the given key are
    /// consulted.
    LimitedRange(&'a K),
    /// Starts at the given key and consults every layer.
    FullRange(&'a K),
    /// Starts from the beginning and consults every layer.
    FullScan,
}
300
301impl<'a, K: Key + LayerKey + OrdLowerBound> Query<'a, K> {
302 fn needs_layer<V>(&self, layer: &dyn Layer<K, V>) -> bool {
303 match self {
304 Self::Point(key) => layer.maybe_contains_key(key),
305 Self::LimitedRange(key) => layer.maybe_contains_key(key),
306 Self::FullRange(_) => true,
307 Self::FullScan => true,
308 }
309 }
310}
311
312#[fxfs_trace::trace]
313impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> Merger<'a, K, V> {
314 pub(super) fn new<I: Iterator<Item = &'a dyn Layer<K, V>>>(
315 layers: I,
316 merge_fn: MergeFn<K, V>,
317 counters: Arc<Mutex<lsm_tree::Counters>>,
318 ) -> Merger<'a, K, V> {
319 Merger {
320 iterators: layers
321 .enumerate()
322 .map(|(index, layer)| MergeLayerIterator::new(index as u16, layer))
323 .collect(),
324 merge_fn: merge_fn,
325 trace: false,
326 counters,
327 }
328 }
329
330 #[trace]
333 pub async fn query(
334 &mut self,
335 query: Query<'_, K>,
336 ) -> Result<MergerIterator<'_, 'a, K, V>, Error> {
337 if let Query::Point(key) = query {
338 debug_assert!(!key.is_range_key())
346 };
347 let len = self.iterators.len();
348 let pending_iterators = {
349 fxfs_trace::duration!(c"Merger::filter_layer_files", "len" => len);
350 self.iterators
351 .iter_mut()
352 .rev()
353 .filter(|l| query.needs_layer(l.layer.clone().unwrap()))
354 .collect::<Vec<&mut MergeLayerIterator<'a, K, V>>>()
355 };
356 let layer_count = pending_iterators.len();
357 {
358 let mut counters = self.counters.lock();
359 counters.num_seeks += 1;
360 counters.layer_files_total += len;
361 counters.layer_files_skipped += len - layer_count;
362 }
363 log::debug!(query:?; "Consulting {}/{} layers", layer_count, len);
364 let mut merger_iter = MergerIterator {
365 merge_fn: self.merge_fn,
366 pending_iterators,
367 heap: BinaryHeap::with_capacity(layer_count),
368 item: CurrentItem::None,
369 trace: self.trace,
370 history: String::new(),
371 };
372 let owned_bound;
373 let bound = match query {
374 Query::Point(key) => Bound::Included(key),
375 Query::LimitedRange(key) => {
376 owned_bound = Bound::Included(key.search_key());
377 owned_bound.as_ref()
378 }
379 Query::FullRange(start) => Bound::Included(start),
380 Query::FullScan => Bound::Unbounded,
381 };
382 merger_iter.seek(bound).await?;
383 Ok(merger_iter)
384 }
385
386 pub fn set_trace(&mut self, v: bool) {
387 self.trace = v;
388 }
389}
390
/// Iterator over the merged view of the layers selected for a query.
pub struct MergerIterator<'a, 'b, K, V> {
    /// The function that decides how pairs of items are merged.
    merge_fn: MergeFn<K, V>,

    /// Layer iterators that have not been sought yet.  They are popped off the end and moved to
    /// the heap as they are needed.
    pending_iterators: Vec<&'a mut MergeLayerIterator<'b, K, V>>,

    /// The layer iterators that are currently taking part in the merge.
    heap: BinaryHeap<&'a mut MergeLayerIterator<'b, K, V>>,

    /// The item the iterator is currently positioned at.
    item: CurrentItem<'a, 'b, K, V>,

    /// Whether merge decisions are appended to `history`.
    trace: bool,

    /// Textual log of seeks and merge decisions, only written when `trace` is set.
    history: String,
}
411
412impl<'a, 'b, K: Key + LayerKey + OrdLowerBound, V: Value> MergerIterator<'a, 'b, K, V> {
413 async fn seek(&mut self, bound: Bound<&K>) -> Result<(), Error> {
414 let next_key = match bound {
415 Bound::Unbounded => None,
416 Bound::Included(key) => Some(key),
417 Bound::Excluded(_) => panic!("Excluded bounds not supported!"),
418 };
419
420 self.push_iterators(next_key, bound).await?;
421 self.advance_impl(bound).await
422 }
423
424 async fn advance_impl(&mut self, next_key_bound: Bound<&K>) -> Result<(), Error> {
430 loop {
431 loop {
432 if self.heap.is_empty() {
433 self.item = CurrentItem::None;
434 return Ok(());
435 }
436 let lowest = self.heap.pop().unwrap();
437 let maybe_second_lowest = self.heap.pop();
438 if let Some(second_lowest) = maybe_second_lowest {
439 let result = (self.merge_fn)(&lowest, &second_lowest);
440 if self.trace {
441 writeln!(
442 self.history,
443 "merge {:?}, {:?} -> {:?}",
444 lowest.item(),
445 second_lowest.item(),
446 result
447 )
448 .unwrap();
449 }
450 match result {
451 MergeResult::EmitLeft => {
452 self.heap.push(second_lowest);
453 self.item = CurrentItem::Iterator(lowest);
454 break;
455 }
456 MergeResult::Other { emit, left, right } => {
457 try_join!(
458 lowest.maybe_advance(&left),
459 second_lowest.maybe_advance(&right)
460 )?;
461 self.update_item(lowest, left);
462 self.update_item(second_lowest, right);
463 if let Some(emit) = emit {
464 self.item = CurrentItem::Item(emit);
465 break;
466 }
467 }
468 }
469 } else {
470 self.item = CurrentItem::Iterator(lowest);
471 break;
472 }
473 }
474
475 match next_key_bound {
495 Bound::Included(key)
496 if self.get().unwrap().key.cmp_upper_bound(key) == Ordering::Less => {}
497 Bound::Excluded(key)
498 if self.get().unwrap().key.cmp_upper_bound(key) != Ordering::Greater => {}
499 _ => return Ok(()),
500 }
501 if let Some(iterator) = self.item.take_iterator() {
502 iterator.advance().await?;
503 if iterator.is_some() {
504 self.heap.push(iterator);
505 }
506 }
507 }
508 }
509
510 fn needs_more_iterators(&self, next_key: Option<&K>, next_key_bound: Bound<&K>) -> bool {
512 if self.pending_iterators.is_empty() {
513 return false;
514 }
515 if self.heap.is_empty() {
516 return true;
517 }
518 let target;
519 match next_key_bound {
520 Bound::Included(k) => target = k,
521 Bound::Excluded(_) | Bound::Unbounded => return true,
522 }
523 match target.merge_type() {
524 MergeType::FullMerge => true,
525 MergeType::OptimizedMerge => {
526 match next_key {
527 Some(k) => {
528 self.heap.peek().unwrap().key().cmp_lower_bound(k) == Ordering::Greater
529 }
530 None => {
531 self.heap.peek().unwrap().key().cmp_lower_bound(target) != Ordering::Equal
533 }
534 }
535 }
536 }
537 }
538
539 async fn push_iterators(
545 &mut self,
546 next_key: Option<&K>,
547 next_key_bound: Bound<&K>,
548 ) -> Result<(), Error> {
549 while self.needs_more_iterators(next_key, next_key_bound) {
550 let iter = self.pending_iterators.pop().unwrap();
551 let sub_iter = iter.layer.as_ref().unwrap().seek(next_key_bound).await?;
552 if self.trace {
553 writeln!(
554 self.history,
555 "merger: search for {:?}, found {:?}",
556 next_key_bound,
557 sub_iter.get()
558 )
559 .unwrap();
560 }
561 iter.iter = RawIterator::Const(sub_iter);
562 iter.set_item_from_iter();
563 if iter.is_some() {
564 self.heap.push(iter);
565 }
566 }
567 Ok(())
568 }
569
570 fn update_item(&mut self, item: &'a mut MergeLayerIterator<'b, K, V>, op: ItemOp<K, V>) {
573 match op {
574 ItemOp::Keep => self.heap.push(item),
575 ItemOp::Discard => {
576 if item.is_some() {
578 self.heap.push(item);
579 }
580 }
581 ItemOp::Replace(replacement) => {
582 item.replace(replacement);
583 self.heap.push(item);
584 }
585 }
586 }
587}
588
589#[async_trait]
590impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> LayerIterator<K, V>
591 for MergerIterator<'a, '_, K, V>
592{
593 async fn advance(&mut self) -> Result<(), Error> {
594 let current_key;
595 let mut next_key = None;
596 let mut next_key_bound = Bound::Unbounded;
597 if !self.pending_iterators.is_empty() {
598 if let Some(ItemRef { key, .. }) = self.get() {
599 next_key = key.next_key();
600 match next_key {
601 None => {
602 current_key = Some(key.clone());
605 next_key_bound = Bound::Excluded(current_key.as_ref().unwrap());
606 }
607 Some(ref key) => next_key_bound = Bound::Included(key),
608 }
609 }
610 }
611
612 if let Some(iterator) = self.item.take_iterator() {
615 if self.needs_more_iterators(next_key.as_ref(), next_key_bound) {
616 let existing_item = iterator.item().boxed();
617 iterator.advance().await?;
618 match &next_key {
619 Some(next_key)
620 if iterator.is_some()
621 && iterator.key().cmp_lower_bound(next_key) != Ordering::Greater =>
622 {
623 }
627 _ => {
628 iterator.replace(existing_item);
632
633 self.push_iterators(next_key.as_ref(), next_key_bound).await?;
636 }
637 }
638 } else {
639 iterator.advance().await?;
640 }
641 if iterator.is_some() {
642 self.heap.push(iterator);
643 }
644 } else {
645 self.push_iterators(next_key.as_ref(), next_key_bound).await?;
646 }
647
648 self.advance_impl(next_key_bound).await
649 }
650
651 fn get(&self) -> Option<ItemRef<'_, K, V>> {
652 (&self.item).into()
653 }
654}
655
656struct MutMergeLayerIterator<'a, K, V>(MergeLayerIterator<'a, K, V>);
657
658impl<K, V> MutMergeLayerIterator<'_, K, V> {
659 fn advance(&mut self) {
660 if let MergeItem::Iter = self.item {
661 self.as_mut().advance();
662 }
663 self.set_item_from_iter();
664 }
665}
666
667impl<'a, K, V> AsMut<dyn LayerIteratorMut<K, V> + 'a> for MutMergeLayerIterator<'a, K, V> {
668 fn as_mut(&mut self) -> &mut (dyn LayerIteratorMut<K, V> + 'a) {
669 let RawIterator::Mut(iter) = &mut self.0.iter else { unreachable!() };
670 iter.as_mut()
671 }
672}
673
674impl<'a, K, V> Deref for MutMergeLayerIterator<'a, K, V> {
675 type Target = MergeLayerIterator<'a, K, V>;
676
677 fn deref(&self) -> &Self::Target {
678 &self.0
679 }
680}
681
682impl<K, V> DerefMut for MutMergeLayerIterator<'_, K, V> {
683 fn deref_mut(&mut self) -> &mut Self::Target {
684 &mut self.0
685 }
686}
687
688pub(super) fn merge_into<K: Debug + OrdLowerBound, V: Debug>(
690 mut_iter: Box<dyn LayerIteratorMut<K, V> + '_>,
691 item: Item<K, V>,
692 merge_fn: MergeFn<K, V>,
693) -> Result<(), Error> {
694 let merge_item = if mut_iter.get().is_some() { MergeItem::Iter } else { MergeItem::None };
695 let mut mut_merge_iter = MutMergeLayerIterator(MergeLayerIterator {
696 layer: None,
697 iter: RawIterator::Mut(mut_iter),
698 layer_index: 1,
699 item: merge_item,
700 });
701 let mut item_merge_iter = MergeLayerIterator::new_with_item(0, MergeItem::Item(item.boxed()));
702 while mut_merge_iter.is_some() && item_merge_iter.is_some() {
703 if mut_merge_iter.0 > item_merge_iter {
704 let merge_result = merge_fn(&mut_merge_iter, &item_merge_iter);
706 debug!(
707 lhs:? = mut_merge_iter.key(),
708 rhs:? = item_merge_iter.key(),
709 result:? = merge_result;
710 "(1) merge");
711 match merge_result {
712 MergeResult::EmitLeft => {
713 if let Some(item) = mut_merge_iter.take_item() {
714 mut_merge_iter.as_mut().insert(*item);
715 mut_merge_iter.set_item_from_iter();
716 } else {
717 mut_merge_iter.advance();
718 }
719 }
720 MergeResult::Other { emit, left, right } => {
721 if let Some(emit) = emit {
722 mut_merge_iter.as_mut().insert(*emit);
723 }
724 match left {
725 ItemOp::Keep => {}
726 ItemOp::Discard => {
727 if matches!(mut_merge_iter.item, MergeItem::Iter) {
728 mut_merge_iter.as_mut().erase();
729 }
730 mut_merge_iter.set_item_from_iter();
731 }
732 ItemOp::Replace(item) => {
733 if let MergeItem::Iter = mut_merge_iter.item {
734 mut_merge_iter.as_mut().erase();
735 }
736 mut_merge_iter.item = MergeItem::Item(item)
737 }
738 }
739 match right {
740 ItemOp::Keep => {}
741 ItemOp::Discard => item_merge_iter.item = MergeItem::None,
742 ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
743 }
744 }
745 }
746 } else {
747 let merge_result = merge_fn(&item_merge_iter, &mut_merge_iter);
749 debug!(
750 lhs:? = mut_merge_iter.key(),
751 rhs:? = item_merge_iter.key(),
752 result:? = merge_result;
753 "(2) merge");
754 match merge_result {
755 MergeResult::EmitLeft => break, MergeResult::Other { emit, left, right } => {
757 if let Some(emit) = emit {
758 mut_merge_iter.as_mut().insert(*emit);
759 }
760 match left {
761 ItemOp::Keep => {}
762 ItemOp::Discard => item_merge_iter.item = MergeItem::None,
763 ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
764 }
765 match right {
766 ItemOp::Keep => {}
767 ItemOp::Discard => {
768 if matches!(mut_merge_iter.item, MergeItem::Iter) {
769 mut_merge_iter.as_mut().erase();
770 }
771 mut_merge_iter.set_item_from_iter();
772 }
773 ItemOp::Replace(item) => {
774 if let MergeItem::Iter = mut_merge_iter.item {
775 mut_merge_iter.as_mut().erase();
776 }
777 mut_merge_iter.item = MergeItem::Item(item)
778 }
779 }
780 }
781 }
782 }
783 } if let MergeItem::Item(item) = item_merge_iter.item {
788 mut_merge_iter.as_mut().insert(*item);
789 }
790 if let Some(item) = mut_merge_iter.take_item() {
791 mut_merge_iter.as_mut().insert(*item);
792 }
793 if let RawIterator::Mut(mut iter) = mut_merge_iter.0.iter {
794 iter.commit();
795 }
796 Ok(())
797}
798
799#[cfg(test)]
800mod tests {
801 use super::ItemOp::{Discard, Keep, Replace};
802 use super::{MergeLayerIterator, MergeResult, Merger};
803 use crate::lsm_tree::persistent_layer::{PersistentLayer, PersistentLayerWriter};
804 use crate::lsm_tree::skip_list_layer::SkipListLayer;
805 use crate::lsm_tree::types::{
806 FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeType,
807 OrdLowerBound, OrdUpperBound, SortByU64,
808 };
809 use crate::lsm_tree::{self, Query, Value};
810 use crate::object_store::{self, ObjectKey, ObjectValue, VOLUME_DATA_KEY_ID};
811 use crate::serialized_types::{
812 versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
813 };
814 use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
815 use crate::testing::writer::Writer;
816 use fprint::TypeFingerprint;
817 use fuchsia_sync::Mutex;
818 use fxfs_macros::FuzzyHash;
819 use rand::Rng;
820 use std::hash::Hash;
821 use std::ops::{Bound, Range};
822 use std::sync::Arc;
823
    /// Key type used by these tests: a range of `u64`s.
    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(Range<u64>);

    // Lets the tests use plain `i32`s as values.
    impl Value for i32 {
        const DELETED_MARKER: Self = 0;
    }

    versioned_type! { 1.. => TestKey }

    impl SortByU64 for TestKey {
        /// Uses the start of the range.
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl LayerKey for TestKey {
        /// Test keys always ask for the optimized merge.
        fn merge_type(&self) -> MergeType {
            MergeType::OptimizedMerge
        }

        /// The next key is the one-element range that starts where this range ends.
        fn next_key(&self) -> Option<Self> {
            Some(TestKey(self.0.end..self.0.end + 1))
        }
    }

    impl OrdUpperBound for TestKey {
        /// Compares the ends of the ranges.
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    impl OrdLowerBound for TestKey {
        /// Compares the starts of the ranges.
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }
871
872 fn layer_ref_iter<K: Key, V: Value>(
873 layers: &[Arc<SkipListLayer<K, V>>],
874 ) -> impl Iterator<Item = &dyn Layer<K, V>> {
875 layers.iter().map(|x| x.as_ref() as &dyn Layer<K, V>)
876 }
877
878 fn dyn_layer_ref_iter<K: Key, V: Value>(
879 layers: &[Arc<dyn Layer<K, V>>],
880 ) -> impl Iterator<Item = &dyn Layer<K, V>> {
881 layers.iter().map(|x| x.as_ref())
882 }
883
884 fn counters() -> Arc<Mutex<lsm_tree::Counters>> {
885 Arc::new(Mutex::new(lsm_tree::Counters::default()))
886 }
887
888 #[fuchsia::test]
889 async fn test_emit_left() {
890 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
891 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
892 skip_lists[0].insert(items[1].clone()).expect("insert error");
893 skip_lists[1].insert(items[0].clone()).expect("insert error");
894 let mut merger = Merger::new(
895 layer_ref_iter(&skip_lists),
896 |_left, _right| MergeResult::EmitLeft,
897 counters(),
898 );
899 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
900 let ItemRef { key, value, .. } = iter.get().expect("missing item");
901 assert_eq!((key, value), (&items[0].key, &items[0].value));
902 iter.advance().await.unwrap();
903 let ItemRef { key, value, .. } = iter.get().expect("missing item");
904 assert_eq!((key, value), (&items[1].key, &items[1].value));
905 iter.advance().await.unwrap();
906 assert!(iter.get().is_none());
907 }
908
909 #[fuchsia::test]
910 async fn test_other_emit() {
911 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
912 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
913 skip_lists[0].insert(items[1].clone()).expect("insert error");
914 skip_lists[1].insert(items[0].clone()).expect("insert error");
915 let mut merger = Merger::new(
916 layer_ref_iter(&skip_lists),
917 |_left, _right| MergeResult::Other {
918 emit: Some(Item::new(TestKey(3..3), 3).boxed()),
919 left: Discard,
920 right: Discard,
921 },
922 counters(),
923 );
924 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
925
926 let ItemRef { key, value, .. } = iter.get().expect("missing item");
927 assert_eq!((key, value), (&TestKey(3..3), &3));
928 iter.advance().await.unwrap();
929 assert!(iter.get().is_none());
930 }
931
932 #[fuchsia::test]
933 async fn test_replace_left() {
934 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
935 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
936 skip_lists[0].insert(items[1].clone()).expect("insert error");
937 skip_lists[1].insert(items[0].clone()).expect("insert error");
938 let mut merger = Merger::new(
939 layer_ref_iter(&skip_lists),
940 |_left, _right| MergeResult::Other {
941 emit: None,
942 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
943 right: Discard,
944 },
945 counters(),
946 );
947 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
948
949 let ItemRef { key, value, .. } = iter.get().expect("missing item");
952 assert_eq!((key, value), (&TestKey(3..3), &3));
953 iter.advance().await.unwrap();
954 assert!(iter.get().is_none());
955 }
956
957 #[fuchsia::test]
958 async fn test_replace_right() {
959 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
960 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
961 skip_lists[0].insert(items[1].clone()).expect("insert error");
962 skip_lists[1].insert(items[0].clone()).expect("insert error");
963 let mut merger = Merger::new(
964 layer_ref_iter(&skip_lists),
965 |_left, _right| MergeResult::Other {
966 emit: None,
967 left: Discard,
968 right: Replace(Item::new(TestKey(3..3), 3).boxed()),
969 },
970 counters(),
971 );
972 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
973
974 let ItemRef { key, value, .. } = iter.get().expect("missing item");
977 assert_eq!((key, value), (&TestKey(3..3), &3));
978 iter.advance().await.unwrap();
979 assert!(iter.get().is_none());
980 }
981
982 #[fuchsia::test]
983 async fn test_left_less_than_right() {
984 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
985 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
986 skip_lists[0].insert(items[1].clone()).expect("insert error");
987 skip_lists[1].insert(items[0].clone()).expect("insert error");
988 let mut merger = Merger::new(
989 layer_ref_iter(&skip_lists),
990 |left, right| {
991 assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
992 assert_eq!((right.key(), right.value()), (&TestKey(2..2), &2));
993 MergeResult::EmitLeft
994 },
995 counters(),
996 );
997 merger.query(Query::FullScan).await.expect("seek failed");
998 }
999
1000 #[fuchsia::test]
1001 async fn test_left_equals_right() {
1002 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1003 let item = Item::new(TestKey(1..1), 1);
1004 skip_lists[0].insert(item.clone()).expect("insert error");
1005 skip_lists[1].insert(item.clone()).expect("insert error");
1006 let mut merger = Merger::new(
1007 layer_ref_iter(&skip_lists),
1008 |left, right| {
1009 assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
1010 assert_eq!((right.key(), right.value()), (&TestKey(1..1), &1));
1011 assert_eq!(left.layer_index, 0);
1012 assert_eq!(right.layer_index, 1);
1013 MergeResult::EmitLeft
1014 },
1015 counters(),
1016 );
1017 merger.query(Query::FullScan).await.expect("seek failed");
1018 }
1019
1020 #[fuchsia::test]
1021 async fn test_keep() {
1022 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1023 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1024 skip_lists[0].insert(items[1].clone()).expect("insert error");
1025 skip_lists[1].insert(items[0].clone()).expect("insert error");
1026 let mut merger = Merger::new(
1027 layer_ref_iter(&skip_lists),
1028 |left, right| {
1029 if left.key() == &TestKey(1..1) {
1030 MergeResult::Other {
1031 emit: None,
1032 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1033 right: Keep,
1034 }
1035 } else {
1036 assert_eq!(left.key(), &TestKey(2..2));
1037 assert_eq!(right.key(), &TestKey(3..3));
1038 MergeResult::Other { emit: None, left: Discard, right: Keep }
1039 }
1040 },
1041 counters(),
1042 );
1043 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1044
1045 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1048 assert_eq!((key, value), (&TestKey(3..3), &3));
1049 iter.advance().await.unwrap();
1050 assert!(iter.get().is_none());
1051 }
1052
1053 #[fuchsia::test]
1054 async fn test_merge_10_layers() {
1055 let skip_lists: Vec<_> = (0..10).map(|_| SkipListLayer::new(100)).collect();
1056 let mut rng = rand::thread_rng();
1057 for i in 0..100 {
1058 skip_lists[rng.gen_range(0..10) as usize]
1059 .insert(Item::new(TestKey(i..i), i))
1060 .expect("insert error");
1061 }
1062 let mut merger = Merger::new(
1063 layer_ref_iter(&skip_lists),
1064 |_left, _right| MergeResult::EmitLeft,
1065 counters(),
1066 );
1067 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1068
1069 for i in 0..100 {
1070 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1071 assert_eq!((key, value), (&TestKey(i..i), &i));
1072 iter.advance().await.unwrap();
1073 }
1074 assert!(iter.get().is_none());
1075 }
1076
1077 #[fuchsia::test]
1078 async fn test_merge_uses_cmp_lower_bound() {
1079 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1080 let items = [Item::new(TestKey(1..10), 1), Item::new(TestKey(2..3), 2)];
1081 skip_lists[0].insert(items[1].clone()).expect("insert error");
1082 skip_lists[1].insert(items[0].clone()).expect("insert error");
1083 let mut merger = Merger::new(
1084 layer_ref_iter(&skip_lists),
1085 |_left, _right| MergeResult::EmitLeft,
1086 counters(),
1087 );
1088 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1089
1090 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1091 assert_eq!((key, value), (&items[0].key, &items[0].value));
1092 iter.advance().await.unwrap();
1093 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1094 assert_eq!((key, value), (&items[1].key, &items[1].value));
1095 iter.advance().await.unwrap();
1096 assert!(iter.get().is_none());
1097 }
1098
1099 #[fuchsia::test]
1100 async fn test_merge_into_emit_left() {
1101 let skip_list = SkipListLayer::new(100);
1102 let items =
1103 [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2), Item::new(TestKey(3..3), 3)];
1104 skip_list.insert(items[0].clone()).expect("insert error");
1105 skip_list.insert(items[2].clone()).expect("insert error");
1106 skip_list
1107 .merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::EmitLeft);
1108
1109 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1110 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1111 assert_eq!((key, value), (&items[0].key, &items[0].value));
1112 iter.advance().await.unwrap();
1113 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1114 assert_eq!((key, value), (&items[1].key, &items[1].value));
1115 iter.advance().await.unwrap();
1116 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1117 assert_eq!((key, value), (&items[2].key, &items[2].value));
1118 iter.advance().await.unwrap();
1119 assert!(iter.get().is_none());
1120 }
1121
1122 #[fuchsia::test]
1123 async fn test_merge_into_emit_last_after_replacing() {
1124 let skip_list = SkipListLayer::new(100);
1125 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1126 skip_list.insert(items[0].clone()).expect("insert error");
1127
1128 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1129 if left.key() == &TestKey(1..1) {
1130 assert_eq!(right.key(), &TestKey(2..2));
1131 MergeResult::Other {
1132 emit: None,
1133 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1134 right: Keep,
1135 }
1136 } else {
1137 assert_eq!(left.key(), &TestKey(2..2));
1138 assert_eq!(right.key(), &TestKey(3..3));
1139 MergeResult::EmitLeft
1140 }
1141 });
1142
1143 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1144 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1145 assert_eq!((key, value), (&items[1].key, &items[1].value));
1146 iter.advance().await.unwrap();
1147 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1148 assert_eq!((key, value), (&TestKey(3..3), &3));
1149 iter.advance().await.unwrap();
1150 assert!(iter.get().is_none());
1151 }
1152
1153 #[fuchsia::test]
1154 async fn test_merge_into_emit_left_after_replacing() {
1155 let skip_list = SkipListLayer::new(100);
1156 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1157 skip_list.insert(items[0].clone()).expect("insert error");
1158
1159 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1160 if left.key() == &TestKey(1..1) {
1161 assert_eq!(right.key(), &TestKey(3..3));
1162 MergeResult::Other {
1163 emit: None,
1164 left: Replace(Item::new(TestKey(2..2), 2).boxed()),
1165 right: Keep,
1166 }
1167 } else {
1168 assert_eq!(left.key(), &TestKey(2..2));
1169 assert_eq!(right.key(), &TestKey(3..3));
1170 MergeResult::EmitLeft
1171 }
1172 });
1173
1174 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1175 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1176 assert_eq!((key, value), (&TestKey(2..2), &2));
1177 iter.advance().await.unwrap();
1178 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1179 assert_eq!((key, value), (&items[1].key, &items[1].value));
1180 iter.advance().await.unwrap();
1181 assert!(iter.get().is_none());
1182 }
1183
1184 #[fuchsia::test]
1186 async fn test_merge_into_emit_other_and_discard() {
1187 let skip_list = SkipListLayer::new(100);
1188 let items =
1189 [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 3)];
1190 skip_list.insert(items[0].clone()).expect("insert error");
1191 skip_list.insert(items[2].clone()).expect("insert error");
1192
1193 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1194 if left.key() == &TestKey(1..1) {
1195 assert_eq!(right.key(), &TestKey(3..3));
1197 MergeResult::Other {
1198 emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1199 left: Discard,
1200 right: Keep,
1201 }
1202 } else {
1203 assert_eq!(left.key(), &TestKey(3..3));
1205 assert_eq!(right.key(), &TestKey(5..5));
1206 MergeResult::Other {
1207 emit: Some(Item::new(TestKey(4..4), 4).boxed()),
1208 left: Discard,
1209 right: Discard,
1210 }
1211 }
1212 });
1213
1214 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1215 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1216 assert_eq!((key, value), (&TestKey(2..2), &2));
1217 iter.advance().await.unwrap();
1218 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1219 assert_eq!((key, value), (&TestKey(4..4), &4));
1220 iter.advance().await.unwrap();
1221 assert!(iter.get().is_none());
1222 }
1223
1224 #[fuchsia::test]
1227 async fn test_merge_into_replace_and_discard() {
1228 let skip_list = SkipListLayer::new(100);
1229 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1230 skip_list.insert(items[0].clone()).expect("insert error");
1231
1232 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1233 emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1234 left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1235 right: Discard,
1236 });
1237
1238 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1239 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1240 assert_eq!((key, value), (&TestKey(2..2), &2));
1241 iter.advance().await.unwrap();
1242 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1243 assert_eq!((key, value), (&TestKey(4..4), &4));
1244 iter.advance().await.unwrap();
1245 assert!(iter.get().is_none());
1246 }
1247
1248 #[fuchsia::test]
1251 async fn test_merge_into_replace_merge_item() {
1252 let skip_list = SkipListLayer::new(100);
1253 let items =
1254 [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 5)];
1255 skip_list.insert(items[0].clone()).expect("insert error");
1256 skip_list.insert(items[2].clone()).expect("insert error");
1257
1258 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, right| {
1259 if right.key() == &TestKey(3..3) {
1260 MergeResult::Other {
1261 emit: None,
1262 left: Discard,
1263 right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1264 }
1265 } else {
1266 assert_eq!(right.key(), &TestKey(5..5));
1267 MergeResult::Other {
1268 emit: None,
1269 left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1270 right: Discard,
1271 }
1272 }
1273 });
1274
1275 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1276 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1277 assert_eq!((key, value), (&TestKey(4..4), &4));
1278 iter.advance().await.unwrap();
1279 assert!(iter.get().is_none());
1280 }
1281
1282 #[fuchsia::test]
1284 async fn test_merge_into_replace_existing() {
1285 let skip_list = SkipListLayer::new(100);
1286 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1287 skip_list.insert(items[1].clone()).expect("insert error");
1288
1289 skip_list.merge_into(items[0].clone(), &items[0].key, |_left, right| {
1290 if right.key() == &TestKey(3..3) {
1291 MergeResult::Other {
1292 emit: None,
1293 left: Keep,
1294 right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1295 }
1296 } else {
1297 MergeResult::EmitLeft
1298 }
1299 });
1300
1301 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1302 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1303 assert_eq!((key, value), (&items[0].key, &items[0].value));
1304 iter.advance().await.unwrap();
1305 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1306 assert_eq!((key, value), (&TestKey(2..2), &2));
1307 iter.advance().await.unwrap();
1308 assert!(iter.get().is_none());
1309 }
1310
1311 #[fuchsia::test]
1312 async fn test_merge_into_discard_last() {
1313 let skip_list = SkipListLayer::new(100);
1314 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1315 skip_list.insert(items[0].clone()).expect("insert error");
1316
1317 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1318 emit: None,
1319 left: Discard,
1320 right: Keep,
1321 });
1322
1323 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1324 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1325 assert_eq!((key, value), (&items[1].key, &items[1].value));
1326 iter.advance().await.unwrap();
1327 assert!(iter.get().is_none());
1328 }
1329
1330 #[fuchsia::test]
1331 async fn test_merge_into_empty() {
1332 let skip_list = SkipListLayer::new(100);
1333 let items = [Item::new(TestKey(1..1), 1)];
1334
1335 skip_list.merge_into(items[0].clone(), &items[0].key, |_left, _right| {
1336 panic!("Unexpected merge!");
1337 });
1338
1339 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1340 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1341 assert_eq!((key, value), (&items[0].key, &items[0].value));
1342 iter.advance().await.unwrap();
1343 assert!(iter.get().is_none());
1344 }
1345
1346 #[fuchsia::test]
1347 async fn test_seek_uses_minimum_number_of_iterators() {
1348 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1349 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(1..1), 2)];
1350 skip_lists[0].insert(items[0].clone()).expect("insert error");
1351 skip_lists[1].insert(items[1].clone()).expect("insert error");
1352 let mut merger = Merger::new(
1353 layer_ref_iter(&skip_lists),
1354 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1355 counters(),
1356 );
1357 let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1358
1359 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1362 assert_eq!((key, value), (&items[0].key, &items[0].value));
1363 }
1364
1365 async fn test_advance<K: Eq + Key + LayerKey + OrdLowerBound>(
1368 layers: &[&[(K, i32)]],
1369 query: Query<'_, K>,
1370 expected: &[(K, i32)],
1371 ) {
1372 let mut skip_lists = Vec::new();
1373 for &layer in layers {
1374 let skip_list = SkipListLayer::new(100);
1375 for (k, v) in layer {
1376 skip_list.insert(Item::new(k.clone(), *v)).expect("insert error");
1377 }
1378 skip_lists.push(skip_list);
1379 }
1380 let mut merger = Merger::new(
1381 layer_ref_iter(&skip_lists),
1382 |_left, _right| MergeResult::EmitLeft,
1383 counters(),
1384 );
1385 let mut iter = merger.query(query).await.expect("seek failed");
1386 for (k, v) in expected {
1387 let ItemRef { key, value, .. } = iter.get().expect("get failed");
1388 assert_eq!((key, value), (k, v));
1389 iter.advance().await.expect("advance failed");
1390 }
1391 assert!(iter.get().is_none());
1392 }
1393
1394 #[fuchsia::test]
1395 async fn test_seek_skips_replaced_items() {
1396 test_advance(
1398 &[
1399 &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(4..5), 3)],
1400 &[(TestKey(1..2), 4), (TestKey(2..3), 5), (TestKey(3..4), 6)],
1401 ],
1402 Query::FullRange(&TestKey(1..2)),
1403 &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(3..4), 6), (TestKey(4..5), 3)],
1404 )
1405 .await;
1406 }
1407
1408 #[fuchsia::test]
1409 async fn test_advance_skips_replaced_items_at_end() {
1410 test_advance(
1413 &[&[(TestKey(1..2), 1)], &[(TestKey(1..2), 2)]],
1414 Query::FullRange(&TestKey(1..2)),
1415 &[(TestKey(1..2), 1)],
1416 )
1417 .await;
1418 }
1419
// Range-based test key whose `LayerKey::merge_type` (implemented below) reports
// `MergeType::FullMerge`. Used by the `test_full_merge_*` cases.
#[derive(
    Clone,
    Eq,
    Hash,
    FuzzyHash,
    PartialEq,
    Debug,
    serde::Serialize,
    serde::Deserialize,
    TypeFingerprint,
    Versioned,
)]
struct TestKeyWithFullMerge(Range<u64>);

// NOTE(review): presumably declares this type as the serialized form from version 1 onwards;
// confirm against the `versioned_type!` macro definition.
versioned_type! { 1.. => TestKeyWithFullMerge }
1435
// Overrides the trait's merge type so that every key of this type requests a full merge.
impl LayerKey for TestKeyWithFullMerge {
    // Always `MergeType::FullMerge`, regardless of the key's range.
    fn merge_type(&self) -> MergeType {
        MergeType::FullMerge
    }
}
1441
1442 impl SortByU64 for TestKeyWithFullMerge {
1443 fn get_leading_u64(&self) -> u64 {
1444 self.0.start
1445 }
1446 }
1447
1448 impl OrdUpperBound for TestKeyWithFullMerge {
1449 fn cmp_upper_bound(&self, other: &TestKeyWithFullMerge) -> std::cmp::Ordering {
1450 self.0.end.cmp(&other.0.end)
1451 }
1452 }
1453
1454 impl OrdLowerBound for TestKeyWithFullMerge {
1455 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1456 self.0.start.cmp(&other.0.start)
1457 }
1458 }
1459
1460 #[fuchsia::test]
1463 async fn test_full_merge_consistent_advance_ordering() {
1464 let layer_set = [
1465 [
1466 (TestKeyWithFullMerge(1..2), 1i32),
1467 (TestKeyWithFullMerge(2..3), 2i32),
1468 (TestKeyWithFullMerge(4..5), 3i32),
1469 ]
1470 .as_slice(),
1471 [
1472 (TestKeyWithFullMerge(1..2), 4i32),
1473 (TestKeyWithFullMerge(2..3), 5i32),
1474 (TestKeyWithFullMerge(3..4), 6i32),
1475 ]
1476 .as_slice(),
1477 ];
1478
1479 let full_merge_result = [
1480 (TestKeyWithFullMerge(1..2), 1),
1481 (TestKeyWithFullMerge(1..2), 4),
1482 (TestKeyWithFullMerge(2..3), 2),
1483 (TestKeyWithFullMerge(2..3), 5),
1484 (TestKeyWithFullMerge(3..4), 6),
1485 (TestKeyWithFullMerge(4..5), 3),
1486 ];
1487
1488 test_advance(layer_set.as_slice(), Query::FullScan, &full_merge_result).await;
1489
1490 test_advance(
1491 layer_set.as_slice(),
1492 Query::FullRange(&TestKeyWithFullMerge(1..2)),
1493 &full_merge_result,
1494 )
1495 .await;
1496
1497 test_advance(
1498 layer_set.as_slice(),
1499 Query::FullRange(&TestKeyWithFullMerge(2..3)),
1500 &full_merge_result[2..],
1501 )
1502 .await;
1503
1504 test_advance(
1505 layer_set.as_slice(),
1506 Query::FullRange(&TestKeyWithFullMerge(3..4)),
1507 &full_merge_result[4..],
1508 )
1509 .await;
1510 }
1511
1512 #[fuchsia::test]
1513 async fn test_full_merge_always_consult_all_layers() {
1514 let skip_lists =
1518 [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1519 let items = [
1520 Item::new(TestKeyWithFullMerge(1..2), 1),
1521 Item::new(TestKeyWithFullMerge(2..3), 2),
1522 Item::new(TestKeyWithFullMerge(1..2), 3),
1523 Item::new(TestKeyWithFullMerge(2..3), 4),
1524 ];
1525 skip_lists[0].insert(items[0].clone()).expect("insert error");
1526 skip_lists[1].insert(items[1].clone()).expect("insert error");
1527 skip_lists[2].insert(items[2].clone()).expect("insert error");
1528 skip_lists[2].insert(items[3].clone()).expect("insert error");
1529 let mut merger = Merger::new(
1530 layer_ref_iter(&skip_lists),
1531 |left, right| {
1532 if left.key() == right.key() {
1534 MergeResult::Other {
1535 emit: None,
1536 left: Discard,
1537 right: Replace(
1538 Item::new(left.key().clone(), left.value() + right.value()).boxed(),
1539 ),
1540 }
1541 } else {
1542 MergeResult::EmitLeft
1543 }
1544 },
1545 counters(),
1546 );
1547 let mut iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1548
1549 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1550 assert_eq!((key, *value), (&items[0].key, items[0].value + items[2].value));
1551 iter.advance().await.expect("advance");
1552 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1553 assert_eq!((key, *value), (&items[1].key, items[1].value + items[3].value));
1554 iter.advance().await.expect("advance");
1555 assert!(iter.get().is_none());
1556 }
1557
// Range-based test key that implements `LayerKey` with an empty impl block (below), i.e. it
// relies entirely on whatever defaults the trait provides. Used by the `test_no_merge_*` cases.
#[derive(
    Clone,
    Eq,
    Hash,
    FuzzyHash,
    PartialEq,
    Debug,
    serde::Serialize,
    serde::Deserialize,
    TypeFingerprint,
    Versioned,
)]
struct TestKeyWithDefaultLayerKey(Range<u64>);

// NOTE(review): presumably declares this type as the serialized form from version 1 onwards;
// confirm against the `versioned_type!` macro definition.
versioned_type! { 1.. => TestKeyWithDefaultLayerKey }
1573
// Intentionally empty: this key type uses the `LayerKey` trait's default behaviour.
impl LayerKey for TestKeyWithDefaultLayerKey {}
1576
1577 impl SortByU64 for TestKeyWithDefaultLayerKey {
1578 fn get_leading_u64(&self) -> u64 {
1579 self.0.start
1580 }
1581 }
1582
1583 impl OrdUpperBound for TestKeyWithDefaultLayerKey {
1584 fn cmp_upper_bound(&self, other: &TestKeyWithDefaultLayerKey) -> std::cmp::Ordering {
1585 self.0.end.cmp(&other.0.end)
1586 }
1587 }
1588
1589 impl OrdLowerBound for TestKeyWithDefaultLayerKey {
1590 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1591 self.0.start.cmp(&other.0.start)
1592 }
1593 }
1594
1595 #[fuchsia::test]
1596 async fn test_no_merge_unbounded_include_all_layers() {
1597 test_advance(
1598 &[
1599 &[
1600 (TestKeyWithDefaultLayerKey(1..2), 1),
1601 (TestKeyWithDefaultLayerKey(2..3), 2),
1602 (TestKeyWithDefaultLayerKey(4..5), 3),
1603 ],
1604 &[
1605 (TestKeyWithDefaultLayerKey(1..2), 4),
1606 (TestKeyWithDefaultLayerKey(2..3), 5),
1607 (TestKeyWithDefaultLayerKey(3..4), 6),
1608 ],
1609 ],
1610 Query::FullScan,
1611 &[
1612 (TestKeyWithDefaultLayerKey(1..2), 1),
1613 (TestKeyWithDefaultLayerKey(1..2), 4),
1614 (TestKeyWithDefaultLayerKey(2..3), 2),
1615 (TestKeyWithDefaultLayerKey(2..3), 5),
1616 (TestKeyWithDefaultLayerKey(3..4), 6),
1617 (TestKeyWithDefaultLayerKey(4..5), 3),
1618 ],
1619 )
1620 .await;
1621 }
1622
1623 #[fuchsia::test]
1624 async fn test_no_merge_proceeds_comprehensively_after_seek() {
1625 test_advance(
1626 &[
1627 &[
1628 (TestKeyWithDefaultLayerKey(1..2), 1),
1629 (TestKeyWithDefaultLayerKey(2..3), 2),
1630 (TestKeyWithDefaultLayerKey(4..5), 3),
1631 ],
1632 &[
1633 (TestKeyWithDefaultLayerKey(1..2), 4),
1634 (TestKeyWithDefaultLayerKey(2..3), 5),
1635 (TestKeyWithDefaultLayerKey(3..4), 6),
1636 ],
1637 ],
1638 Query::FullRange(&TestKeyWithDefaultLayerKey(1..2)),
1639 &[
1640 (TestKeyWithDefaultLayerKey(1..2), 1),
1641 (TestKeyWithDefaultLayerKey(1..2), 4),
1642 (TestKeyWithDefaultLayerKey(2..3), 2),
1643 (TestKeyWithDefaultLayerKey(2..3), 5),
1644 (TestKeyWithDefaultLayerKey(3..4), 6),
1645 (TestKeyWithDefaultLayerKey(4..5), 3),
1646 ],
1647 )
1648 .await;
1649 }
1650
1651 #[fuchsia::test]
1652 async fn test_no_merge_seek_finds_lower_layer() {
1653 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1654 let items = [
1655 Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1656 Item::new(TestKeyWithDefaultLayerKey(1..1), 2),
1657 ];
1658 skip_lists[0].insert(items[0].clone()).expect("insert error");
1659 skip_lists[1].insert(items[1].clone()).expect("insert error");
1660 let mut merger = Merger::new(
1661 layer_ref_iter(&skip_lists),
1662 |_left, _right| MergeResult::EmitLeft,
1663 counters(),
1664 );
1665 let iter = merger.query(Query::FullRange(&items[1].key)).await.expect("seek failed");
1666
1667 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1668 assert_eq!((key, value), (&items[1].key, &items[1].value));
1669 }
1670
1671 #[fuchsia::test]
1672 async fn test_no_merge_seek_stops_at_exact_match() {
1673 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1674 let items = [
1675 Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1676 Item::new(TestKeyWithDefaultLayerKey(1..4), 2),
1677 ];
1678 skip_lists[0].insert(items[0].clone()).expect("insert error");
1679 skip_lists[1].insert(items[1].clone()).expect("insert error");
1680 let mut merger = Merger::new(
1681 layer_ref_iter(&skip_lists),
1682 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1683 counters(),
1684 );
1685 let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1686
1687 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1690 assert_eq!((key, value), (&items[0].key, &items[0].value));
1691 }
1692
1693 #[fuchsia::test]
1694 async fn test_seek_less_than() {
1695 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1696 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1697 skip_lists[0].insert(items[0].clone()).expect("insert error");
1698 skip_lists[1].insert(items[1].clone()).expect("insert error");
1699 let mut merger = Merger::new(
1701 layer_ref_iter(&skip_lists),
1702 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1703 counters(),
1704 );
1705 let iter = merger.query(Query::FullRange(&TestKey(0..0))).await.expect("seek failed");
1706
1707 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1709 assert_eq!((key, value), (&items[1].key, &items[1].value));
1710 }
1711
1712 #[fuchsia::test]
1713 async fn test_seek_to_end() {
1714 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1715 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1716 skip_lists[0].insert(items[0].clone()).expect("insert error");
1717 skip_lists[1].insert(items[1].clone()).expect("insert error");
1718 let mut merger = Merger::new(
1719 layer_ref_iter(&skip_lists),
1720 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1721 counters(),
1722 );
1723 let iter = merger.query(Query::FullRange(&TestKey(3..3))).await.expect("seek failed");
1724
1725 assert!(iter.get().is_none());
1726 }
1727
1728 #[fuchsia::test]
1729 async fn test_merge_all_discarded() {
1730 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1731 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1732 skip_lists[0].insert(items[1].clone()).expect("insert error");
1733 skip_lists[1].insert(items[0].clone()).expect("insert error");
1734 let mut merger = Merger::new(
1735 layer_ref_iter(&skip_lists),
1736 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Discard },
1737 counters(),
1738 );
1739 let iter = merger.query(Query::FullScan).await.expect("seek failed");
1740 assert!(iter.get().is_none());
1741 }
1742
1743 #[fuchsia::test]
1744 async fn test_seek_with_merged_key_less_than() {
1745 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1746 let items = [Item::new(TestKey(1..8), 1), Item::new(TestKey(2..10), 2)];
1747 skip_lists[0].insert(items[0].clone()).expect("insert error");
1748 skip_lists[1].insert(items[1].clone()).expect("insert error");
1749 let mut merger = Merger::new(
1750 layer_ref_iter(&skip_lists),
1751 |left, _right| {
1752 if left.key() == &TestKey(1..8) {
1753 MergeResult::Other {
1754 emit: None,
1755 left: Replace(Item::new(TestKey(1..2), 1).boxed()),
1756 right: Keep,
1757 }
1758 } else {
1759 MergeResult::EmitLeft
1760 }
1761 },
1762 counters(),
1763 );
1764 let iter = merger.query(Query::FullRange(&TestKey(0..3))).await.expect("seek failed");
1765 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1766 assert_eq!((key, value), (&items[1].key, &items[1].value));
1767 }
1768
1769 #[fuchsia::test]
1770 async fn test_overlapping_keys() {
1771 let skip_lists =
1772 [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1773 let items = [
1774 Item::new(TestKey(0..10), 1),
1775 Item::new(TestKey(0..20), 2),
1776 Item::new(TestKey(0..30), 3),
1777 ];
1778 skip_lists[0].insert(items[0].clone()).expect("insert error");
1779 skip_lists[1].insert(items[1].clone()).expect("insert error");
1780 skip_lists[2].insert(items[2].clone()).expect("insert error");
1781 let mut merger = Merger::new(
1782 layer_ref_iter(&skip_lists),
1783 |left, right| {
1784 let result = if left.key().0.end <= right.key().0.start {
1785 MergeResult::EmitLeft
1786 } else {
1787 if left.key() == &TestKey(0..30) && right.key() == &TestKey(10..20) {
1788 MergeResult::Other {
1789 emit: Some(Item::new(TestKey(0..10), 1).boxed()),
1790 left: Replace(Item::new(TestKey(10..30), 1).boxed()),
1791 right: Keep,
1792 }
1793 } else {
1794 MergeResult::Other {
1795 emit: None,
1796 left: Keep,
1797 right: Replace(
1798 Item::new(TestKey(left.key().0.end..right.key().0.end), 1).boxed(),
1799 ),
1800 }
1801 }
1802 };
1803 result
1804 },
1805 counters(),
1806 );
1807 let mut iter = merger.query(Query::FullRange(&TestKey(0..1))).await.expect("seek failed");
1808 let ItemRef { key, .. } = iter.get().expect("missing item");
1809 assert_eq!(key, &TestKey(0..10));
1810 iter.advance().await.expect("advance failed");
1811 let ItemRef { key, .. } = iter.get().expect("missing item");
1812 assert_eq!(key, &TestKey(10..20));
1813 iter.advance().await.expect("advance failed");
1814 let ItemRef { key, .. } = iter.get().expect("missing item");
1815 assert_eq!(key, &TestKey(20..30));
1816 iter.advance().await.expect("advance failed");
1817 assert_eq!(iter.get(), None);
1818 }
1819
1820 async fn write_layer<K: Key, V: Value>(items: Vec<Item<K, V>>) -> Arc<dyn Layer<K, V>> {
1821 let object = Arc::new(FakeObject::new());
1822 let write_handle = FakeObjectHandle::new(object.clone());
1823 let mut writer =
1824 PersistentLayerWriter::<_, K, V>::new(Writer::new(&write_handle).await, 1, 512)
1825 .await
1826 .expect("PersistentLayerWriter::new failed");
1827 for item in items {
1828 writer.write(item.as_item_ref()).await.expect("write failed");
1829 }
1830 writer.flush().await.expect("flush failed");
1831 PersistentLayer::open(FakeObjectHandle::new(object))
1832 .await
1833 .expect("open_persistent_layer failed")
1834 }
1835
1836 fn merge_sum(
1837 left: &MergeLayerIterator<'_, i32, i32>,
1838 right: &MergeLayerIterator<'_, i32, i32>,
1839 ) -> MergeResult<i32, i32> {
1840 if left.key() == right.key() {
1842 MergeResult::Other {
1843 emit: None,
1844 left: Discard,
1845 right: Replace(Item::new(left.key().clone(), left.value() + right.value()).boxed()),
1846 }
1847 } else {
1848 MergeResult::EmitLeft
1849 }
1850 }
1851
1852 #[fuchsia::test]
1853 async fn test_merge_bloom_filters_point_query() {
1854 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1855 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1856 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1857 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1858 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1859 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1860
1861 {
1862 let iter = merger.query(Query::Point(&1)).await.expect("seek failed");
1864 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1865 assert_eq!((key, *value), (&items[0].key, items[0].value));
1866 }
1867 {
1868 let iter = merger.query(Query::Point(&2)).await.expect("seek failed");
1870 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1871 assert_eq!((key, *value), (&items[1].key, items[1].value));
1872 }
1873 {
1874 let iter = merger.query(Query::Point(&4)).await.expect("seek failed");
1876 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1877 assert_eq!((key, *value), (&items[2].key, items[2].value));
1878 }
1879 {
1880 let iter = merger.query(Query::Point(&400)).await.expect("seek failed");
1882 assert!(iter.get().is_none());
1883 }
1884 }
1885
1886 #[fuchsia::test]
1887 async fn test_merge_bloom_filters_limited_range() {
1888 let layer_0_items = vec![Item::new(
1891 ObjectKey::extent(0, 0, 0..2048),
1892 ObjectValue::extent(0, VOLUME_DATA_KEY_ID),
1893 )];
1894 let layer_1_items = vec![
1895 Item::new(
1896 ObjectKey::extent(0, 0, 1024..4096),
1897 ObjectValue::extent(32768, VOLUME_DATA_KEY_ID),
1898 ),
1899 Item::new(
1900 ObjectKey::extent(0, 0, 16384..17408),
1901 ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1902 ),
1903 ];
1904 let items = [
1905 Item::new(ObjectKey::extent(0, 0, 0..2048), ObjectValue::extent(0, VOLUME_DATA_KEY_ID)),
1906 Item::new(
1907 ObjectKey::extent(0, 0, 2048..4096),
1908 ObjectValue::extent(33792, VOLUME_DATA_KEY_ID),
1909 ),
1910 Item::new(
1911 ObjectKey::extent(0, 0, 16384..17408),
1912 ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1913 ),
1914 ];
1915 let layers: [Arc<dyn Layer<ObjectKey, ObjectValue>>; 2] =
1916 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1917 let mut merger =
1918 Merger::new(dyn_layer_ref_iter(&layers), object_store::merge::merge, counters());
1919
1920 {
1921 let mut iter = merger
1923 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 16384..16386)))
1924 .await
1925 .expect("seek failed");
1926 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1927 assert_eq!(key, &items[2].key);
1928 assert_eq!(value, &items[2].value);
1929 iter.advance().await.expect("advance");
1930 assert!(iter.get().is_none());
1931 }
1932 {
1933 let mut iter = merger
1935 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 0..4096)))
1936 .await
1937 .expect("seek failed");
1938 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1939 assert_eq!(key, &items[0].key);
1940 assert_eq!(value, &items[0].value);
1941 iter.advance().await.expect("advance");
1942 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1943 assert_eq!(key, &items[1].key);
1944 assert_eq!(value, &items[1].value);
1945 iter.advance().await.expect("advance");
1946 }
1947 {
1948 let mut iter = merger
1950 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 8192..12288)))
1951 .await
1952 .expect("seek failed");
1953 let ItemRef { key, .. } = iter.get().expect("missing item");
1954 assert_eq!(key, &items[2].key);
1955 iter.advance().await.expect("advance");
1956 assert!(iter.get().is_none());
1957 }
1958 }
1959
1960 #[fuchsia::test]
1961 async fn test_merge_bloom_filters_full_range() {
1962 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1963 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1964 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1965 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1966 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1967 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1968
1969 let mut iter = merger.query(Query::FullRange(&0)).await.expect("seek failed");
1970 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1971 assert_eq!((key, *value), (&items[0].key, items[0].value));
1972 iter.advance().await.expect("advance");
1973 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1974 assert_eq!((key, *value), (&items[1].key, items[1].value));
1975 iter.advance().await.expect("advance");
1976 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1977 assert_eq!((key, *value), (&items[2].key, items[2].value));
1978 iter.advance().await.expect("advance");
1979 assert!(iter.get().is_none());
1980 }
1981
1982 #[fuchsia::test]
1983 async fn test_merge_bloom_filters_full_scan() {
1984 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1985 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1986 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1987 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1988 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1989 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1990
1991 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1992 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1993 assert_eq!((key, *value), (&items[0].key, items[0].value));
1994 iter.advance().await.expect("advance");
1995 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1996 assert_eq!((key, *value), (&items[1].key, items[1].value));
1997 iter.advance().await.expect("advance");
1998 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1999 assert_eq!((key, *value), (&items[2].key, items[2].value));
2000 iter.advance().await.expect("advance");
2001 assert!(iter.get().is_none());
2002 }
2003}