1use crate::log::*;
6use crate::lsm_tree;
7use crate::lsm_tree::types::{
8 BoxedItem, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut, LayerKey, MergeType,
9 OrdLowerBound, Value,
10};
11use anyhow::Error;
12use async_trait::async_trait;
13use futures::try_join;
14use std::cmp::Ordering;
15use std::collections::BinaryHeap;
16use std::fmt::{Debug, Write};
17use std::ops::{Bound, Deref, DerefMut};
18use std::sync::{Arc, atomic};
19
/// What the merger should do with one side of a merge pair once the merge function has run.
#[derive(Debug, Eq, PartialEq)]
pub enum ItemOp<K, V> {
    /// Leave the item where it is; it stays in play and may be paired again or emitted later.
    Keep,

    /// Drop the item and move on to the next item from the same source. In `merge_into`, an
    /// item that lives in the mutable layer is erased from that layer.
    Discard,

    /// Drop the item and present the given replacement in its place from now on.
    Replace(BoxedItem<K, V>),
}
32
/// The outcome of calling a [`MergeFn`] on a (left, right) pair.
#[derive(Debug, Eq, PartialEq)]
pub enum MergeResult<K, V> {
    /// Emit the left item unchanged and keep the right item for further merging.
    EmitLeft,

    /// The general case: emit `emit` if it is `Some`, and apply `left` / `right` to the
    /// respective items. When nothing is emitted, merging simply continues.
    ///
    /// NOTE(review): a `Replace` item presumably needs a key that keeps the merge in sorted
    /// order (e.g. not below anything already emitted) — confirm with callers.
    Other { emit: Option<BoxedItem<K, V>>, left: ItemOp<K, V>, right: ItemOp<K, V> },
}
61
/// The merge function supplied by users of the merger.
///
/// It is called with pairs of items, `left` and `right`, where `left` sorts first. Either its
/// key is lower (by `OrdLowerBound::cmp_lower_bound`), or the keys compare equal and `left`
/// comes from the lower `layer_index`.
pub type MergeFn<K, V> =
    fn(&MergeLayerIterator<'_, K, V>, &MergeLayerIterator<'_, K, V>) -> MergeResult<K, V>;
68
/// The item a [`MergeLayerIterator`] is currently presenting.
pub enum MergeItem<K, V> {
    /// There is no current item.
    None,
    /// An item held directly rather than read from the underlying iterator, for example a
    /// replacement returned by the merge function.
    Item(BoxedItem<K, V>),
    /// The current item is whatever the underlying iterator's `get()` returns.
    Iter,
}
74
/// The iterator underlying a [`MergeLayerIterator`], which is either read-only or mutable.
enum RawIterator<'a, K, V> {
    /// No iterator: the layer has not been seeked yet, or the cursor only wraps an item.
    None,
    /// A read-only layer iterator, as installed by `MergerIterator::push_iterators`.
    Const(Box<dyn LayerIterator<K, V> + 'a>),
    /// A mutable layer iterator, as used by `merge_into`.
    Mut(Box<dyn LayerIteratorMut<K, V> + 'a>),
}
80
// NOTE(review): this asserts `Send` for every `RawIterator`, although the boxed trait-object
// types on its variants are not written with a `+ Send` bound, so the compiler cannot check
// it. No safety argument is visible here — TODO document why this is sound, or add `Send`
// bounds to the trait objects instead.
unsafe impl<K, V> Send for RawIterator<'_, K, V> {}
84
/// A per-layer cursor used by the merger.
///
/// These are the values pushed onto the merge heap and handed to the merge function.
pub struct MergeLayerIterator<'a, K, V> {
    /// The layer to read from. It is `None` when the cursor wraps a standalone item
    /// (`new_with_item`) or a mutable iterator (`merge_into`).
    layer: Option<&'a dyn Layer<K, V>>,

    /// The underlying iterator; `RawIterator::None` until the layer has been seeked.
    iter: RawIterator<'a, K, V>,

    /// Index of the layer. Used to order items whose keys compare equal.
    pub layer_index: u16,

    /// The item currently presented by this cursor.
    item: MergeItem<K, V>,
}
99
100impl<'a, K, V> MergeLayerIterator<'a, K, V> {
101 pub fn key(&self) -> &K {
102 self.item().key
103 }
104
105 pub fn value(&self) -> &V {
106 self.item().value
107 }
108
109 fn new(layer_index: u16, layer: &'a dyn Layer<K, V>) -> Self {
110 MergeLayerIterator {
111 layer: Some(layer),
112 iter: RawIterator::None,
113 layer_index,
114 item: MergeItem::None,
115 }
116 }
117
118 fn new_with_item(layer_index: u16, item: MergeItem<K, V>) -> Self {
119 MergeLayerIterator { layer: None, iter: RawIterator::None, layer_index, item }
120 }
121
122 fn item(&self) -> ItemRef<'_, K, V> {
123 match &self.item {
124 MergeItem::None => panic!("No item!"),
125 MergeItem::Item(item) => item.as_item_ref(),
126 MergeItem::Iter => self.get().unwrap(),
127 }
128 }
129
130 fn get(&self) -> Option<ItemRef<'_, K, V>> {
131 match &self.iter {
132 RawIterator::None => panic!("No iterator!"),
133 RawIterator::Const(iter) => iter.get(),
134 RawIterator::Mut(iter) => iter.get(),
135 }
136 }
137
138 fn set_item_from_iter(&mut self) {
139 self.item = if match &self.iter {
140 RawIterator::None => unreachable!(),
141 RawIterator::Const(iter) => iter.get(),
142 RawIterator::Mut(iter) => iter.get(),
143 }
144 .is_some()
145 {
146 MergeItem::Iter
147 } else {
148 MergeItem::None
149 };
150 }
151
152 fn take_item(&mut self) -> Option<BoxedItem<K, V>> {
153 if matches!(self.item, MergeItem::Item(_)) {
154 if let MergeItem::Item(item) = std::mem::replace(&mut self.item, MergeItem::None) {
155 return Some(item);
156 }
157 }
158 None
159 }
160
161 async fn advance(&mut self) -> Result<(), Error> {
162 if let MergeItem::Iter = self.item {
163 if let RawIterator::Const(iter) = &mut self.iter {
164 iter.advance().await?;
165 } else {
166 unreachable!();
168 }
169 }
170 self.set_item_from_iter();
171 Ok(())
172 }
173
174 fn replace(&mut self, item: BoxedItem<K, V>) {
175 self.item = MergeItem::Item(item);
176 }
177
178 fn is_some(&self) -> bool {
179 !matches!(self.item, MergeItem::None)
180 }
181
182 async fn maybe_advance(&mut self, op: &ItemOp<K, V>) -> Result<(), Error> {
185 if let ItemOp::Keep = op { Ok(()) } else { self.advance().await }
186 }
187}
188
189impl<K: OrdLowerBound, V> Ord for MergeLayerIterator<'_, K, V> {
191 fn cmp(&self, other: &Self) -> Ordering {
192 other.key().cmp_lower_bound(self.key()).then(other.layer_index.cmp(&self.layer_index))
194 }
195}
196impl<K: OrdLowerBound, V> PartialOrd for MergeLayerIterator<'_, K, V> {
197 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
198 Some(self.cmp(other))
199 }
200}
201impl<K: OrdLowerBound, V> PartialEq for MergeLayerIterator<'_, K, V> {
202 fn eq(&self, other: &Self) -> bool {
203 self.cmp(other) == Ordering::Equal
204 }
205}
206impl<K: OrdLowerBound, V> Eq for MergeLayerIterator<'_, K, V> {}
207
/// The item a [`MergerIterator`] is currently positioned on.
enum CurrentItem<'a, 'b, K, V> {
    /// No current item (the iterator is exhausted).
    None,
    /// An item emitted by the merge function that does not belong to any layer cursor.
    Item(BoxedItem<K, V>),
    /// The current item is the one presented by this layer cursor, which is held off the heap
    /// while it is current.
    Iterator(&'a mut MergeLayerIterator<'b, K, V>),
}
215
216impl<'a, 'b, K, V> CurrentItem<'a, 'b, K, V> {
217 fn take_iterator(&mut self) -> Option<&'a mut MergeLayerIterator<'b, K, V>> {
218 if matches!(self, CurrentItem::Iterator(_)) {
219 if let CurrentItem::Iterator(iter) = std::mem::replace(self, CurrentItem::None) {
220 return Some(iter);
221 }
222 }
223 None
224 }
225}
226
227impl<'a, K, V> From<&'a CurrentItem<'_, '_, K, V>> for Option<ItemRef<'a, K, V>> {
228 fn from(iter: &'a CurrentItem<'_, '_, K, V>) -> Option<ItemRef<'a, K, V>> {
229 match iter {
230 CurrentItem::None => None,
231 CurrentItem::Iterator(iterator) => Some(iterator.item()),
232 CurrentItem::Item(item) => Some(item.as_item_ref()),
233 }
234 }
235}
236
/// Merges items from a set of layers, resolving overlapping keys with a [`MergeFn`].
pub struct Merger<'a, K, V> {
    /// One cursor per layer, in layer-index order.
    iterators: Vec<MergeLayerIterator<'a, K, V>>,

    /// The function used to merge pairs of items.
    merge_fn: MergeFn<K, V>,

    /// When set, iterators returned by `query` append their seek and merge decisions to a
    /// textual history.
    trace: bool,

    /// Counters updated by every `query`: the number of seeks and the layer files
    /// consulted/skipped.
    counters: Arc<lsm_tree::TreeCounters>,
}
251
/// The kinds of search a [`Merger`] can run.
#[derive(Debug, Clone)]
pub enum Query<'a, K: Key + LayerKey + OrdLowerBound> {
    /// Looks for a specific key, which must not be a range key (this is `debug_assert!`ed in
    /// `Merger::query`). Layers whose `maybe_contains_key` returns false are skipped.
    Point(&'a K),
    /// Iterates from `key.search_key()`. Layers are filtered with `maybe_contains_key(key)`.
    /// NOTE(review): this presumably requires every key of interest to be covered by that
    /// filter check on `key` — confirm against the `LayerKey` implementations.
    LimitedRange(&'a K),
    /// Iterates from the given key; every layer is consulted.
    FullRange(&'a K),
    /// Iterates over everything; every layer is consulted.
    FullScan,
}
279
280impl<'a, K: Key + LayerKey + OrdLowerBound> Query<'a, K> {
281 fn needs_layer<V>(&self, layer: &dyn Layer<K, V>) -> bool {
282 match self {
283 Self::Point(key) => layer.maybe_contains_key(key),
284 Self::LimitedRange(key) => layer.maybe_contains_key(key),
285 Self::FullRange(_) => true,
286 Self::FullScan => true,
287 }
288 }
289}
290
291#[fxfs_trace::trace]
292impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> Merger<'a, K, V> {
293 pub(super) fn new<I: Iterator<Item = &'a dyn Layer<K, V>>>(
294 layers: I,
295 merge_fn: MergeFn<K, V>,
296 counters: Arc<lsm_tree::TreeCounters>,
297 ) -> Merger<'a, K, V> {
298 Merger {
299 iterators: layers
300 .enumerate()
301 .map(|(index, layer)| MergeLayerIterator::new(index as u16, layer))
302 .collect(),
303 merge_fn: merge_fn,
304 trace: false,
305 counters,
306 }
307 }
308
309 #[trace]
312 pub async fn query(
313 &mut self,
314 query: Query<'_, K>,
315 ) -> Result<MergerIterator<'_, 'a, K, V>, Error> {
316 if let Query::Point(key) = query {
317 debug_assert!(!key.is_range_key())
325 };
326 let len = self.iterators.len();
327 let pending_iterators = {
328 fxfs_trace::duration!("Merger::filter_layer_files", "len" => len);
329 self.iterators
330 .iter_mut()
331 .rev()
332 .filter(|l| query.needs_layer(l.layer.clone().unwrap()))
333 .collect::<Vec<&mut MergeLayerIterator<'a, K, V>>>()
334 };
335 let layer_count = pending_iterators.len();
336 {
337 self.counters.num_seeks.fetch_add(1, atomic::Ordering::Relaxed);
338 self.counters.layer_files_total.fetch_add(len, atomic::Ordering::Relaxed);
339 self.counters
340 .layer_files_skipped
341 .fetch_add(len - layer_count, atomic::Ordering::Relaxed);
342 }
343 log::debug!(query:?; "Consulting {}/{} layers", layer_count, len);
344 let mut merger_iter = MergerIterator {
345 merge_fn: self.merge_fn,
346 pending_iterators,
347 heap: BinaryHeap::with_capacity(layer_count),
348 item: CurrentItem::None,
349 trace: self.trace,
350 history: String::new(),
351 };
352 let owned_bound;
353 let bound = match query {
354 Query::Point(key) => Bound::Included(key),
355 Query::LimitedRange(key) => {
356 owned_bound = Bound::Included(key.search_key());
357 owned_bound.as_ref()
358 }
359 Query::FullRange(start) => Bound::Included(start),
360 Query::FullScan => Bound::Unbounded,
361 };
362 merger_iter.seek(bound).await?;
363 Ok(merger_iter)
364 }
365
366 pub fn set_trace(&mut self, v: bool) {
367 self.trace = v;
368 }
369}
370
/// Iterator over the merged view of the layers selected by [`Merger::query`].
pub struct MergerIterator<'a, 'b, K, V> {
    /// The function used to merge pairs of items.
    merge_fn: MergeFn<K, V>,

    /// Layer cursors that have not been seeked yet; taken from the end with `pop`.
    pending_iterators: Vec<&'a mut MergeLayerIterator<'b, K, V>>,

    /// Seeked cursors that still present an item. Because `MergeLayerIterator`'s `Ord` is
    /// reversed, this max-heap pops the lowest item first.
    heap: BinaryHeap<&'a mut MergeLayerIterator<'b, K, V>>,

    /// The item the iterator is currently positioned on.
    item: CurrentItem<'a, 'b, K, V>,

    /// Whether to append seek and merge decisions to `history`.
    trace: bool,

    /// Text log of seeks and merges; only written to when `trace` is set.
    history: String,
}
391
impl<'a, 'b, K: Key + LayerKey + OrdLowerBound, V: Value> MergerIterator<'a, 'b, K, V> {
    /// Positions the iterator at the first merged item satisfying `bound`.
    ///
    /// # Panics
    ///
    /// Panics on `Bound::Excluded`; only `Included` and `Unbounded` are supported.
    async fn seek(&mut self, bound: Bound<&K>) -> Result<(), Error> {
        let next_key = match bound {
            Bound::Unbounded => None,
            Bound::Included(key) => Some(key),
            Bound::Excluded(_) => panic!("Excluded bounds not supported!"),
        };

        self.push_iterators(next_key, bound).await?;
        self.advance_impl(bound).await
    }

    /// Chooses the next current item from the heap.
    ///
    /// The two lowest cursors are repeatedly handed to `merge_fn` until an item is selected:
    /// the left item on `EmitLeft`, an emitted item, or the sole remaining cursor. If that item
    /// still falls below `next_key_bound` (compared with `cmp_upper_bound`), it is skipped and
    /// the process repeats. When the heap runs dry, the current item becomes `CurrentItem::None`.
    async fn advance_impl(&mut self, next_key_bound: Bound<&K>) -> Result<(), Error> {
        loop {
            // Merge pairs until something is chosen as the current item.
            loop {
                if self.heap.is_empty() {
                    self.item = CurrentItem::None;
                    return Ok(());
                }
                let lowest = self.heap.pop().unwrap();
                let maybe_second_lowest = self.heap.pop();
                if let Some(second_lowest) = maybe_second_lowest {
                    let result = (self.merge_fn)(&lowest, &second_lowest);
                    if self.trace {
                        writeln!(
                            self.history,
                            "merge {:?}, {:?} -> {:?}",
                            lowest.item(),
                            second_lowest.item(),
                            result
                        )
                        .unwrap();
                    }
                    match result {
                        MergeResult::EmitLeft => {
                            self.heap.push(second_lowest);
                            self.item = CurrentItem::Iterator(lowest);
                            break;
                        }
                        MergeResult::Other { emit, left, right } => {
                            // Advance both sides together; `maybe_advance` does nothing for
                            // `ItemOp::Keep`.
                            try_join!(
                                lowest.maybe_advance(&left),
                                second_lowest.maybe_advance(&right)
                            )?;
                            self.update_item(lowest, left);
                            self.update_item(second_lowest, right);
                            if let Some(emit) = emit {
                                self.item = CurrentItem::Item(emit);
                                break;
                            }
                            // Nothing emitted: keep merging.
                        }
                    }
                } else {
                    // Only one cursor left, so its item is selected as is.
                    self.item = CurrentItem::Iterator(lowest);
                    break;
                }
            }

            // Return unless the chosen item is still below the bound:
            // - for `Included(key)`, an item whose upper bound is less than `key`;
            // - for `Excluded(key)`, an item whose upper bound is less than or equal to `key`.
            match next_key_bound {
                Bound::Included(key)
                    if self.get().unwrap().key.cmp_upper_bound(key) == Ordering::Less => {}
                Bound::Excluded(key)
                    if self.get().unwrap().key.cmp_upper_bound(key) != Ordering::Greater => {}
                _ => return Ok(()),
            }
            // Skip the item. An emitted `CurrentItem::Item` is just dropped; a layer cursor is
            // advanced and returned to the heap if it still presents an item.
            if let Some(iterator) = self.item.take_iterator() {
                iterator.advance().await?;
                if iterator.is_some() {
                    self.heap.push(iterator);
                }
            }
        }
    }

    /// Returns whether another pending layer should be seeked and pushed onto the heap.
    ///
    /// - Always false once no layers are pending.
    /// - Always true when the heap is empty, when the bound is not `Included`, or when the
    ///   bound key's merge type is `FullMerge`.
    /// - For `OptimizedMerge`, true only when the lowest heap item's lower bound is greater than
    ///   `next_key` (or, if `next_key` is `None`, differs from the bound key).
    fn needs_more_iterators(&self, next_key: Option<&K>, next_key_bound: Bound<&K>) -> bool {
        if self.pending_iterators.is_empty() {
            return false;
        }
        if self.heap.is_empty() {
            return true;
        }
        let target;
        match next_key_bound {
            Bound::Included(k) => target = k,
            Bound::Excluded(_) | Bound::Unbounded => return true,
        }
        match target.merge_type() {
            MergeType::FullMerge => true,
            MergeType::OptimizedMerge => {
                match next_key {
                    Some(k) => {
                        self.heap.peek().unwrap().key().cmp_lower_bound(k) == Ordering::Greater
                    }
                    None => {
                        self.heap.peek().unwrap().key().cmp_lower_bound(target) != Ordering::Equal
                    }
                }
            }
        }
    }

    /// Seeks pending layers to `next_key_bound` for as long as `needs_more_iterators` says so.
    ///
    /// Layers are popped off the end of `pending_iterators`. Each seeked cursor is pushed onto
    /// the heap only if the seek landed on an item.
    ///
    /// # Errors
    ///
    /// Propagates any error from seeking a layer.
    async fn push_iterators(
        &mut self,
        next_key: Option<&K>,
        next_key_bound: Bound<&K>,
    ) -> Result<(), Error> {
        while self.needs_more_iterators(next_key, next_key_bound) {
            let iter = self.pending_iterators.pop().unwrap();
            let sub_iter = iter.layer.as_ref().unwrap().seek(next_key_bound).await?;
            if self.trace {
                writeln!(
                    self.history,
                    "merger: search for {:?}, found {:?}",
                    next_key_bound,
                    sub_iter.get()
                )
                .unwrap();
            }
            iter.iter = RawIterator::Const(sub_iter);
            iter.set_item_from_iter();
            if iter.is_some() {
                self.heap.push(iter);
            }
        }
        Ok(())
    }

    /// Returns `item` to the heap according to `op`.
    ///
    /// The caller must already have run `maybe_advance(&op)` on `item`.
    fn update_item(&mut self, item: &'a mut MergeLayerIterator<'b, K, V>, op: ItemOp<K, V>) {
        match op {
            ItemOp::Keep => self.heap.push(item),
            ItemOp::Discard => {
                // Already advanced; only re-queue it if the layer still has an item.
                if item.is_some() {
                    self.heap.push(item);
                }
            }
            ItemOp::Replace(replacement) => {
                // Already advanced past the old item; present the replacement in its place.
                item.replace(replacement);
                self.heap.push(item);
            }
        }
    }
}
568
#[async_trait]
impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> LayerIterator<K, V>
    for MergerIterator<'a, '_, K, V>
{
    /// Moves to the next merged item.
    ///
    /// While some layers are still pending (not yet seeked), the bound for the next item is
    /// derived from the current key:
    /// - `Bound::Included(key.next_key())` when `next_key()` returns a key;
    /// - otherwise `Bound::Excluded(current key)`.
    ///
    /// With no pending layers, or no current item, the bound is `Bound::Unbounded`.
    async fn advance(&mut self) -> Result<(), Error> {
        // Holds a clone of the current key when it is used as an `Excluded` bound, so that the
        // bound does not borrow from `self`.
        let current_key;
        let mut next_key = None;
        let mut next_key_bound = Bound::Unbounded;
        if !self.pending_iterators.is_empty() {
            if let Some(ItemRef { key, .. }) = self.get() {
                next_key = key.next_key();
                match next_key {
                    None => {
                        current_key = Some(key.clone());
                        next_key_bound = Bound::Excluded(current_key.as_ref().unwrap());
                    }
                    Some(ref key) => next_key_bound = Bound::Included(key),
                }
            }
        }

        if let Some(iterator) = self.item.take_iterator() {
            if self.needs_more_iterators(next_key.as_ref(), next_key_bound) {
                // Keep a boxed copy of the current item in case it has to be put back.
                let existing_item = iterator.item().boxed();
                iterator.advance().await?;
                match &next_key {
                    Some(next_key)
                        if iterator.is_some()
                            && iterator.key().cmp_lower_bound(next_key) != Ordering::Greater =>
                    {
                        // This layer's next item is at or before `next_key`, so pending layers
                        // are not seeked on this step.
                    }
                    _ => {
                        // Re-present the old item (the underlying iterator stays advanced) and
                        // seek the pending layers. NOTE(review): the old item is presumably
                        // below `next_key_bound`, so `advance_impl` skips it — confirm.
                        iterator.replace(existing_item);
                        self.push_iterators(next_key.as_ref(), next_key_bound).await?;
                    }
                }
            } else {
                iterator.advance().await?;
            }
            if iterator.is_some() {
                self.heap.push(iterator);
            }
        } else {
            // The current item was emitted by the merge function (or there is none), so there
            // is no layer cursor to advance; just seek any pending layers that are needed.
            self.push_iterators(next_key.as_ref(), next_key_bound).await?;
        }

        self.advance_impl(next_key_bound).await
    }

    /// Returns the current merged item, or `None` once the iterator is exhausted.
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        (&self.item).into()
    }
}
635
/// A [`MergeLayerIterator`] whose underlying iterator is a `RawIterator::Mut`; used by
/// `merge_into` to edit a mutable layer in place.
struct MutMergeLayerIterator<'a, K, V>(MergeLayerIterator<'a, K, V>);
637
638impl<K, V> MutMergeLayerIterator<'_, K, V> {
639 fn advance(&mut self) {
640 if let MergeItem::Iter = self.item {
641 self.as_mut().advance();
642 }
643 self.set_item_from_iter();
644 }
645}
646
647impl<'a, K, V> AsMut<dyn LayerIteratorMut<K, V> + 'a> for MutMergeLayerIterator<'a, K, V> {
648 fn as_mut(&mut self) -> &mut (dyn LayerIteratorMut<K, V> + 'a) {
649 let RawIterator::Mut(iter) = &mut self.0.iter else { unreachable!() };
650 iter.as_mut()
651 }
652}
653
/// Exposes the wrapped [`MergeLayerIterator`], so that a `MutMergeLayerIterator` can be passed
/// where the merge function expects a `&MergeLayerIterator`.
impl<'a, K, V> Deref for MutMergeLayerIterator<'a, K, V> {
    type Target = MergeLayerIterator<'a, K, V>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

/// Mutable access to the wrapped [`MergeLayerIterator`] (its `item` and helper methods).
impl<K, V> DerefMut for MutMergeLayerIterator<'_, K, V> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
667
668pub(super) fn merge_into<K: Debug + OrdLowerBound, V: Debug>(
670 mut_iter: Box<dyn LayerIteratorMut<K, V> + '_>,
671 item: Item<K, V>,
672 merge_fn: MergeFn<K, V>,
673) -> Result<(), Error> {
674 let merge_item = if mut_iter.get().is_some() { MergeItem::Iter } else { MergeItem::None };
675 let mut mut_merge_iter = MutMergeLayerIterator(MergeLayerIterator {
676 layer: None,
677 iter: RawIterator::Mut(mut_iter),
678 layer_index: 1,
679 item: merge_item,
680 });
681 let mut item_merge_iter = MergeLayerIterator::new_with_item(0, MergeItem::Item(item.boxed()));
682 while mut_merge_iter.is_some() && item_merge_iter.is_some() {
683 if mut_merge_iter.0 > item_merge_iter {
684 let merge_result = merge_fn(&mut_merge_iter, &item_merge_iter);
686 debug!(
687 lhs:? = mut_merge_iter.key(),
688 rhs:? = item_merge_iter.key(),
689 result:? = merge_result;
690 "(1) merge");
691 match merge_result {
692 MergeResult::EmitLeft => {
693 if let Some(item) = mut_merge_iter.take_item() {
694 mut_merge_iter.as_mut().insert(*item);
695 mut_merge_iter.set_item_from_iter();
696 } else {
697 mut_merge_iter.advance();
698 }
699 }
700 MergeResult::Other { emit, left, right } => {
701 if let Some(emit) = emit {
702 mut_merge_iter.as_mut().insert(*emit);
703 }
704 match left {
705 ItemOp::Keep => {}
706 ItemOp::Discard => {
707 if matches!(mut_merge_iter.item, MergeItem::Iter) {
708 mut_merge_iter.as_mut().erase();
709 }
710 mut_merge_iter.set_item_from_iter();
711 }
712 ItemOp::Replace(item) => {
713 if let MergeItem::Iter = mut_merge_iter.item {
714 mut_merge_iter.as_mut().erase();
715 }
716 mut_merge_iter.item = MergeItem::Item(item)
717 }
718 }
719 match right {
720 ItemOp::Keep => {}
721 ItemOp::Discard => item_merge_iter.item = MergeItem::None,
722 ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
723 }
724 }
725 }
726 } else {
727 let merge_result = merge_fn(&item_merge_iter, &mut_merge_iter);
729 debug!(
730 lhs:? = mut_merge_iter.key(),
731 rhs:? = item_merge_iter.key(),
732 result:? = merge_result;
733 "(2) merge");
734 match merge_result {
735 MergeResult::EmitLeft => break, MergeResult::Other { emit, left, right } => {
737 if let Some(emit) = emit {
738 mut_merge_iter.as_mut().insert(*emit);
739 }
740 match left {
741 ItemOp::Keep => {}
742 ItemOp::Discard => item_merge_iter.item = MergeItem::None,
743 ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
744 }
745 match right {
746 ItemOp::Keep => {}
747 ItemOp::Discard => {
748 if matches!(mut_merge_iter.item, MergeItem::Iter) {
749 mut_merge_iter.as_mut().erase();
750 }
751 mut_merge_iter.set_item_from_iter();
752 }
753 ItemOp::Replace(item) => {
754 if let MergeItem::Iter = mut_merge_iter.item {
755 mut_merge_iter.as_mut().erase();
756 }
757 mut_merge_iter.item = MergeItem::Item(item)
758 }
759 }
760 }
761 }
762 }
763 } if let MergeItem::Item(item) = item_merge_iter.item {
768 mut_merge_iter.as_mut().insert(*item);
769 }
770 if let Some(item) = mut_merge_iter.take_item() {
771 mut_merge_iter.as_mut().insert(*item);
772 }
773 if let RawIterator::Mut(mut iter) = mut_merge_iter.0.iter {
774 iter.commit();
775 }
776 Ok(())
777}
778
779#[cfg(test)]
780mod tests {
781 use super::ItemOp::{Discard, Keep, Replace};
782 use super::{MergeLayerIterator, MergeResult, Merger};
783 use crate::lsm_tree::persistent_layer::{PersistentLayer, PersistentLayerWriter};
784 use crate::lsm_tree::skip_list_layer::SkipListLayer;
785 use crate::lsm_tree::types::{
786 FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeType,
787 OrdLowerBound, OrdUpperBound, SortByU64,
788 };
789 use crate::lsm_tree::{self, Query, Value};
790 use crate::object_store::{self, ObjectKey, ObjectValue, VOLUME_DATA_KEY_ID};
791 use crate::serialized_types::{
792 LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
793 };
794 use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
795 use crate::testing::writer::Writer;
796 use fprint::TypeFingerprint;
797 use fxfs_macros::FuzzyHash;
798 use rand::Rng;
799 use std::hash::Hash;
800 use std::ops::{Bound, Range};
801 use std::sync::Arc;
802
803 #[derive(
804 Clone,
805 Eq,
806 Hash,
807 FuzzyHash,
808 PartialEq,
809 Debug,
810 serde::Serialize,
811 serde::Deserialize,
812 TypeFingerprint,
813 Versioned,
814 )]
815 struct TestKey(Range<u64>);
816
817 impl Value for i32 {
818 const DELETED_MARKER: Self = 0;
819 }
820
821 versioned_type! { 1.. => TestKey }
822
823 impl SortByU64 for TestKey {
824 fn get_leading_u64(&self) -> u64 {
825 self.0.start
826 }
827 }
828
829 impl LayerKey for TestKey {
830 fn merge_type(&self) -> MergeType {
831 MergeType::OptimizedMerge
832 }
833
834 fn next_key(&self) -> Option<Self> {
835 Some(TestKey(self.0.end..self.0.end + 1))
836 }
837 }
838
839 impl OrdUpperBound for TestKey {
840 fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
841 self.0.end.cmp(&other.0.end)
842 }
843 }
844
845 impl OrdLowerBound for TestKey {
846 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
847 self.0.start.cmp(&other.0.start)
848 }
849 }
850
851 fn layer_ref_iter<K: Key, V: Value>(
852 layers: &[Arc<SkipListLayer<K, V>>],
853 ) -> impl Iterator<Item = &dyn Layer<K, V>> {
854 layers.iter().map(|x| x.as_ref() as &dyn Layer<K, V>)
855 }
856
857 fn dyn_layer_ref_iter<K: Key, V: Value>(
858 layers: &[Arc<dyn Layer<K, V>>],
859 ) -> impl Iterator<Item = &dyn Layer<K, V>> {
860 layers.iter().map(|x| x.as_ref())
861 }
862
863 fn counters() -> Arc<lsm_tree::TreeCounters> {
864 Arc::new(lsm_tree::TreeCounters::default())
865 }
866
867 #[fuchsia::test]
868 async fn test_emit_left() {
869 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
870 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
871 skip_lists[0].insert(items[1].clone()).expect("insert error");
872 skip_lists[1].insert(items[0].clone()).expect("insert error");
873 let mut merger = Merger::new(
874 layer_ref_iter(&skip_lists),
875 |_left, _right| MergeResult::EmitLeft,
876 counters(),
877 );
878 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
879 let ItemRef { key, value, .. } = iter.get().expect("missing item");
880 assert_eq!((key, value), (&items[0].key, &items[0].value));
881 iter.advance().await.unwrap();
882 let ItemRef { key, value, .. } = iter.get().expect("missing item");
883 assert_eq!((key, value), (&items[1].key, &items[1].value));
884 iter.advance().await.unwrap();
885 assert!(iter.get().is_none());
886 }
887
888 #[fuchsia::test]
889 async fn test_other_emit() {
890 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
891 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
892 skip_lists[0].insert(items[1].clone()).expect("insert error");
893 skip_lists[1].insert(items[0].clone()).expect("insert error");
894 let mut merger = Merger::new(
895 layer_ref_iter(&skip_lists),
896 |_left, _right| MergeResult::Other {
897 emit: Some(Item::new(TestKey(3..3), 3).boxed()),
898 left: Discard,
899 right: Discard,
900 },
901 counters(),
902 );
903 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
904
905 let ItemRef { key, value, .. } = iter.get().expect("missing item");
906 assert_eq!((key, value), (&TestKey(3..3), &3));
907 iter.advance().await.unwrap();
908 assert!(iter.get().is_none());
909 }
910
911 #[fuchsia::test]
912 async fn test_replace_left() {
913 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
914 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
915 skip_lists[0].insert(items[1].clone()).expect("insert error");
916 skip_lists[1].insert(items[0].clone()).expect("insert error");
917 let mut merger = Merger::new(
918 layer_ref_iter(&skip_lists),
919 |_left, _right| MergeResult::Other {
920 emit: None,
921 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
922 right: Discard,
923 },
924 counters(),
925 );
926 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
927
928 let ItemRef { key, value, .. } = iter.get().expect("missing item");
931 assert_eq!((key, value), (&TestKey(3..3), &3));
932 iter.advance().await.unwrap();
933 assert!(iter.get().is_none());
934 }
935
936 #[fuchsia::test]
937 async fn test_replace_right() {
938 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
939 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
940 skip_lists[0].insert(items[1].clone()).expect("insert error");
941 skip_lists[1].insert(items[0].clone()).expect("insert error");
942 let mut merger = Merger::new(
943 layer_ref_iter(&skip_lists),
944 |_left, _right| MergeResult::Other {
945 emit: None,
946 left: Discard,
947 right: Replace(Item::new(TestKey(3..3), 3).boxed()),
948 },
949 counters(),
950 );
951 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
952
953 let ItemRef { key, value, .. } = iter.get().expect("missing item");
956 assert_eq!((key, value), (&TestKey(3..3), &3));
957 iter.advance().await.unwrap();
958 assert!(iter.get().is_none());
959 }
960
961 #[fuchsia::test]
962 async fn test_left_less_than_right() {
963 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
964 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
965 skip_lists[0].insert(items[1].clone()).expect("insert error");
966 skip_lists[1].insert(items[0].clone()).expect("insert error");
967 let mut merger = Merger::new(
968 layer_ref_iter(&skip_lists),
969 |left, right| {
970 assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
971 assert_eq!((right.key(), right.value()), (&TestKey(2..2), &2));
972 MergeResult::EmitLeft
973 },
974 counters(),
975 );
976 merger.query(Query::FullScan).await.expect("seek failed");
977 }
978
979 #[fuchsia::test]
980 async fn test_left_equals_right() {
981 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
982 let item = Item::new(TestKey(1..1), 1);
983 skip_lists[0].insert(item.clone()).expect("insert error");
984 skip_lists[1].insert(item.clone()).expect("insert error");
985 let mut merger = Merger::new(
986 layer_ref_iter(&skip_lists),
987 |left, right| {
988 assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
989 assert_eq!((right.key(), right.value()), (&TestKey(1..1), &1));
990 assert_eq!(left.layer_index, 0);
991 assert_eq!(right.layer_index, 1);
992 MergeResult::EmitLeft
993 },
994 counters(),
995 );
996 merger.query(Query::FullScan).await.expect("seek failed");
997 }
998
999 #[fuchsia::test]
1000 async fn test_keep() {
1001 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1002 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1003 skip_lists[0].insert(items[1].clone()).expect("insert error");
1004 skip_lists[1].insert(items[0].clone()).expect("insert error");
1005 let mut merger = Merger::new(
1006 layer_ref_iter(&skip_lists),
1007 |left, right| {
1008 if left.key() == &TestKey(1..1) {
1009 MergeResult::Other {
1010 emit: None,
1011 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1012 right: Keep,
1013 }
1014 } else {
1015 assert_eq!(left.key(), &TestKey(2..2));
1016 assert_eq!(right.key(), &TestKey(3..3));
1017 MergeResult::Other { emit: None, left: Discard, right: Keep }
1018 }
1019 },
1020 counters(),
1021 );
1022 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1023
1024 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1027 assert_eq!((key, value), (&TestKey(3..3), &3));
1028 iter.advance().await.unwrap();
1029 assert!(iter.get().is_none());
1030 }
1031
1032 #[fuchsia::test]
1033 async fn test_merge_10_layers() {
1034 let skip_lists: Vec<_> = (0..10).map(|_| SkipListLayer::new(100)).collect();
1035 let mut rng = rand::rng();
1036 for i in 0..100 {
1037 skip_lists[rng.random_range(0..10) as usize]
1038 .insert(Item::new(TestKey(i..i), i))
1039 .expect("insert error");
1040 }
1041 let mut merger = Merger::new(
1042 layer_ref_iter(&skip_lists),
1043 |_left, _right| MergeResult::EmitLeft,
1044 counters(),
1045 );
1046 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1047
1048 for i in 0..100 {
1049 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1050 assert_eq!((key, value), (&TestKey(i..i), &i));
1051 iter.advance().await.unwrap();
1052 }
1053 assert!(iter.get().is_none());
1054 }
1055
1056 #[fuchsia::test]
1057 async fn test_merge_uses_cmp_lower_bound() {
1058 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1059 let items = [Item::new(TestKey(1..10), 1), Item::new(TestKey(2..3), 2)];
1060 skip_lists[0].insert(items[1].clone()).expect("insert error");
1061 skip_lists[1].insert(items[0].clone()).expect("insert error");
1062 let mut merger = Merger::new(
1063 layer_ref_iter(&skip_lists),
1064 |_left, _right| MergeResult::EmitLeft,
1065 counters(),
1066 );
1067 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1068
1069 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1070 assert_eq!((key, value), (&items[0].key, &items[0].value));
1071 iter.advance().await.unwrap();
1072 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1073 assert_eq!((key, value), (&items[1].key, &items[1].value));
1074 iter.advance().await.unwrap();
1075 assert!(iter.get().is_none());
1076 }
1077
1078 #[fuchsia::test]
1079 async fn test_merge_into_emit_left() {
1080 let skip_list = SkipListLayer::new(100);
1081 let items =
1082 [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2), Item::new(TestKey(3..3), 3)];
1083 skip_list.insert(items[0].clone()).expect("insert error");
1084 skip_list.insert(items[2].clone()).expect("insert error");
1085 skip_list
1086 .merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::EmitLeft);
1087
1088 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1089 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1090 assert_eq!((key, value), (&items[0].key, &items[0].value));
1091 iter.advance().await.unwrap();
1092 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1093 assert_eq!((key, value), (&items[1].key, &items[1].value));
1094 iter.advance().await.unwrap();
1095 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1096 assert_eq!((key, value), (&items[2].key, &items[2].value));
1097 iter.advance().await.unwrap();
1098 assert!(iter.get().is_none());
1099 }
1100
1101 #[fuchsia::test]
1102 async fn test_merge_into_emit_last_after_replacing() {
1103 let skip_list = SkipListLayer::new(100);
1104 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1105 skip_list.insert(items[0].clone()).expect("insert error");
1106
1107 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1108 if left.key() == &TestKey(1..1) {
1109 assert_eq!(right.key(), &TestKey(2..2));
1110 MergeResult::Other {
1111 emit: None,
1112 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1113 right: Keep,
1114 }
1115 } else {
1116 assert_eq!(left.key(), &TestKey(2..2));
1117 assert_eq!(right.key(), &TestKey(3..3));
1118 MergeResult::EmitLeft
1119 }
1120 });
1121
1122 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1123 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1124 assert_eq!((key, value), (&items[1].key, &items[1].value));
1125 iter.advance().await.unwrap();
1126 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1127 assert_eq!((key, value), (&TestKey(3..3), &3));
1128 iter.advance().await.unwrap();
1129 assert!(iter.get().is_none());
1130 }
1131
1132 #[fuchsia::test]
1133 async fn test_merge_into_emit_left_after_replacing() {
1134 let skip_list = SkipListLayer::new(100);
1135 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1136 skip_list.insert(items[0].clone()).expect("insert error");
1137
1138 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1139 if left.key() == &TestKey(1..1) {
1140 assert_eq!(right.key(), &TestKey(3..3));
1141 MergeResult::Other {
1142 emit: None,
1143 left: Replace(Item::new(TestKey(2..2), 2).boxed()),
1144 right: Keep,
1145 }
1146 } else {
1147 assert_eq!(left.key(), &TestKey(2..2));
1148 assert_eq!(right.key(), &TestKey(3..3));
1149 MergeResult::EmitLeft
1150 }
1151 });
1152
1153 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1154 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1155 assert_eq!((key, value), (&TestKey(2..2), &2));
1156 iter.advance().await.unwrap();
1157 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1158 assert_eq!((key, value), (&items[1].key, &items[1].value));
1159 iter.advance().await.unwrap();
1160 assert!(iter.get().is_none());
1161 }
1162
1163 #[fuchsia::test]
1165 async fn test_merge_into_emit_other_and_discard() {
1166 let skip_list = SkipListLayer::new(100);
1167 let items =
1168 [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 3)];
1169 skip_list.insert(items[0].clone()).expect("insert error");
1170 skip_list.insert(items[2].clone()).expect("insert error");
1171
1172 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1173 if left.key() == &TestKey(1..1) {
1174 assert_eq!(right.key(), &TestKey(3..3));
1176 MergeResult::Other {
1177 emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1178 left: Discard,
1179 right: Keep,
1180 }
1181 } else {
1182 assert_eq!(left.key(), &TestKey(3..3));
1184 assert_eq!(right.key(), &TestKey(5..5));
1185 MergeResult::Other {
1186 emit: Some(Item::new(TestKey(4..4), 4).boxed()),
1187 left: Discard,
1188 right: Discard,
1189 }
1190 }
1191 });
1192
1193 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1194 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1195 assert_eq!((key, value), (&TestKey(2..2), &2));
1196 iter.advance().await.unwrap();
1197 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1198 assert_eq!((key, value), (&TestKey(4..4), &4));
1199 iter.advance().await.unwrap();
1200 assert!(iter.get().is_none());
1201 }
1202
1203 #[fuchsia::test]
1206 async fn test_merge_into_replace_and_discard() {
1207 let skip_list = SkipListLayer::new(100);
1208 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1209 skip_list.insert(items[0].clone()).expect("insert error");
1210
1211 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1212 emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1213 left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1214 right: Discard,
1215 });
1216
1217 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1218 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1219 assert_eq!((key, value), (&TestKey(2..2), &2));
1220 iter.advance().await.unwrap();
1221 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1222 assert_eq!((key, value), (&TestKey(4..4), &4));
1223 iter.advance().await.unwrap();
1224 assert!(iter.get().is_none());
1225 }
1226
1227 #[fuchsia::test]
1230 async fn test_merge_into_replace_merge_item() {
1231 let skip_list = SkipListLayer::new(100);
1232 let items =
1233 [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 5)];
1234 skip_list.insert(items[0].clone()).expect("insert error");
1235 skip_list.insert(items[2].clone()).expect("insert error");
1236
1237 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, right| {
1238 if right.key() == &TestKey(3..3) {
1239 MergeResult::Other {
1240 emit: None,
1241 left: Discard,
1242 right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1243 }
1244 } else {
1245 assert_eq!(right.key(), &TestKey(5..5));
1246 MergeResult::Other {
1247 emit: None,
1248 left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1249 right: Discard,
1250 }
1251 }
1252 });
1253
1254 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1255 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1256 assert_eq!((key, value), (&TestKey(4..4), &4));
1257 iter.advance().await.unwrap();
1258 assert!(iter.get().is_none());
1259 }
1260
1261 #[fuchsia::test]
1263 async fn test_merge_into_replace_existing() {
1264 let skip_list = SkipListLayer::new(100);
1265 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1266 skip_list.insert(items[1].clone()).expect("insert error");
1267
1268 skip_list.merge_into(items[0].clone(), &items[0].key, |_left, right| {
1269 if right.key() == &TestKey(3..3) {
1270 MergeResult::Other {
1271 emit: None,
1272 left: Keep,
1273 right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1274 }
1275 } else {
1276 MergeResult::EmitLeft
1277 }
1278 });
1279
1280 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1281 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1282 assert_eq!((key, value), (&items[0].key, &items[0].value));
1283 iter.advance().await.unwrap();
1284 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1285 assert_eq!((key, value), (&TestKey(2..2), &2));
1286 iter.advance().await.unwrap();
1287 assert!(iter.get().is_none());
1288 }
1289
1290 #[fuchsia::test]
1291 async fn test_merge_into_discard_last() {
1292 let skip_list = SkipListLayer::new(100);
1293 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1294 skip_list.insert(items[0].clone()).expect("insert error");
1295
1296 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1297 emit: None,
1298 left: Discard,
1299 right: Keep,
1300 });
1301
1302 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1303 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1304 assert_eq!((key, value), (&items[1].key, &items[1].value));
1305 iter.advance().await.unwrap();
1306 assert!(iter.get().is_none());
1307 }
1308
1309 #[fuchsia::test]
1310 async fn test_merge_into_empty() {
1311 let skip_list = SkipListLayer::new(100);
1312 let items = [Item::new(TestKey(1..1), 1)];
1313
1314 skip_list.merge_into(items[0].clone(), &items[0].key, |_left, _right| {
1315 panic!("Unexpected merge!");
1316 });
1317
1318 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1319 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1320 assert_eq!((key, value), (&items[0].key, &items[0].value));
1321 iter.advance().await.unwrap();
1322 assert!(iter.get().is_none());
1323 }
1324
1325 #[fuchsia::test]
1326 async fn test_seek_uses_minimum_number_of_iterators() {
1327 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1328 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(1..1), 2)];
1329 skip_lists[0].insert(items[0].clone()).expect("insert error");
1330 skip_lists[1].insert(items[1].clone()).expect("insert error");
1331 let mut merger = Merger::new(
1332 layer_ref_iter(&skip_lists),
1333 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1334 counters(),
1335 );
1336 let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1337
1338 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1341 assert_eq!((key, value), (&items[0].key, &items[0].value));
1342 }
1343
1344 async fn test_advance<K: Eq + Key + LayerKey + OrdLowerBound>(
1347 layers: &[&[(K, i32)]],
1348 query: Query<'_, K>,
1349 expected: &[(K, i32)],
1350 ) {
1351 let mut skip_lists = Vec::new();
1352 for &layer in layers {
1353 let skip_list = SkipListLayer::new(100);
1354 for (k, v) in layer {
1355 skip_list.insert(Item::new(k.clone(), *v)).expect("insert error");
1356 }
1357 skip_lists.push(skip_list);
1358 }
1359 let mut merger = Merger::new(
1360 layer_ref_iter(&skip_lists),
1361 |_left, _right| MergeResult::EmitLeft,
1362 counters(),
1363 );
1364 let mut iter = merger.query(query).await.expect("seek failed");
1365 for (k, v) in expected {
1366 let ItemRef { key, value, .. } = iter.get().expect("get failed");
1367 assert_eq!((key, value), (k, v));
1368 iter.advance().await.expect("advance failed");
1369 }
1370 assert!(iter.get().is_none());
1371 }
1372
1373 #[fuchsia::test]
1374 async fn test_seek_skips_replaced_items() {
1375 test_advance(
1377 &[
1378 &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(4..5), 3)],
1379 &[(TestKey(1..2), 4), (TestKey(2..3), 5), (TestKey(3..4), 6)],
1380 ],
1381 Query::FullRange(&TestKey(1..2)),
1382 &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(3..4), 6), (TestKey(4..5), 3)],
1383 )
1384 .await;
1385 }
1386
1387 #[fuchsia::test]
1388 async fn test_advance_skips_replaced_items_at_end() {
1389 test_advance(
1392 &[&[(TestKey(1..2), 1)], &[(TestKey(1..2), 2)]],
1393 Query::FullRange(&TestKey(1..2)),
1394 &[(TestKey(1..2), 1)],
1395 )
1396 .await;
1397 }
1398
1399 #[derive(
1400 Clone,
1401 Eq,
1402 Hash,
1403 FuzzyHash,
1404 PartialEq,
1405 Debug,
1406 serde::Serialize,
1407 serde::Deserialize,
1408 TypeFingerprint,
1409 Versioned,
1410 )]
1411 struct TestKeyWithFullMerge(Range<u64>);
1412
1413 versioned_type! { 1.. => TestKeyWithFullMerge }
1414
1415 impl LayerKey for TestKeyWithFullMerge {
1416 fn merge_type(&self) -> MergeType {
1417 MergeType::FullMerge
1418 }
1419 }
1420
1421 impl SortByU64 for TestKeyWithFullMerge {
1422 fn get_leading_u64(&self) -> u64 {
1423 self.0.start
1424 }
1425 }
1426
1427 impl OrdUpperBound for TestKeyWithFullMerge {
1428 fn cmp_upper_bound(&self, other: &TestKeyWithFullMerge) -> std::cmp::Ordering {
1429 self.0.end.cmp(&other.0.end)
1430 }
1431 }
1432
1433 impl OrdLowerBound for TestKeyWithFullMerge {
1434 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1435 self.0.start.cmp(&other.0.start)
1436 }
1437 }
1438
1439 #[fuchsia::test]
1442 async fn test_full_merge_consistent_advance_ordering() {
1443 let layer_set = [
1444 [
1445 (TestKeyWithFullMerge(1..2), 1i32),
1446 (TestKeyWithFullMerge(2..3), 2i32),
1447 (TestKeyWithFullMerge(4..5), 3i32),
1448 ]
1449 .as_slice(),
1450 [
1451 (TestKeyWithFullMerge(1..2), 4i32),
1452 (TestKeyWithFullMerge(2..3), 5i32),
1453 (TestKeyWithFullMerge(3..4), 6i32),
1454 ]
1455 .as_slice(),
1456 ];
1457
1458 let full_merge_result = [
1459 (TestKeyWithFullMerge(1..2), 1),
1460 (TestKeyWithFullMerge(1..2), 4),
1461 (TestKeyWithFullMerge(2..3), 2),
1462 (TestKeyWithFullMerge(2..3), 5),
1463 (TestKeyWithFullMerge(3..4), 6),
1464 (TestKeyWithFullMerge(4..5), 3),
1465 ];
1466
1467 test_advance(layer_set.as_slice(), Query::FullScan, &full_merge_result).await;
1468
1469 test_advance(
1470 layer_set.as_slice(),
1471 Query::FullRange(&TestKeyWithFullMerge(1..2)),
1472 &full_merge_result,
1473 )
1474 .await;
1475
1476 test_advance(
1477 layer_set.as_slice(),
1478 Query::FullRange(&TestKeyWithFullMerge(2..3)),
1479 &full_merge_result[2..],
1480 )
1481 .await;
1482
1483 test_advance(
1484 layer_set.as_slice(),
1485 Query::FullRange(&TestKeyWithFullMerge(3..4)),
1486 &full_merge_result[4..],
1487 )
1488 .await;
1489 }
1490
1491 #[fuchsia::test]
1492 async fn test_full_merge_always_consult_all_layers() {
1493 let skip_lists =
1497 [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1498 let items = [
1499 Item::new(TestKeyWithFullMerge(1..2), 1),
1500 Item::new(TestKeyWithFullMerge(2..3), 2),
1501 Item::new(TestKeyWithFullMerge(1..2), 3),
1502 Item::new(TestKeyWithFullMerge(2..3), 4),
1503 ];
1504 skip_lists[0].insert(items[0].clone()).expect("insert error");
1505 skip_lists[1].insert(items[1].clone()).expect("insert error");
1506 skip_lists[2].insert(items[2].clone()).expect("insert error");
1507 skip_lists[2].insert(items[3].clone()).expect("insert error");
1508 let mut merger = Merger::new(
1509 layer_ref_iter(&skip_lists),
1510 |left, right| {
1511 if left.key() == right.key() {
1513 MergeResult::Other {
1514 emit: None,
1515 left: Discard,
1516 right: Replace(
1517 Item::new(left.key().clone(), left.value() + right.value()).boxed(),
1518 ),
1519 }
1520 } else {
1521 MergeResult::EmitLeft
1522 }
1523 },
1524 counters(),
1525 );
1526 let mut iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1527
1528 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1529 assert_eq!((key, *value), (&items[0].key, items[0].value + items[2].value));
1530 iter.advance().await.expect("advance");
1531 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1532 assert_eq!((key, *value), (&items[1].key, items[1].value + items[3].value));
1533 iter.advance().await.expect("advance");
1534 assert!(iter.get().is_none());
1535 }
1536
1537 #[derive(
1538 Clone,
1539 Eq,
1540 Hash,
1541 FuzzyHash,
1542 PartialEq,
1543 Debug,
1544 serde::Serialize,
1545 serde::Deserialize,
1546 TypeFingerprint,
1547 Versioned,
1548 )]
1549 struct TestKeyWithDefaultLayerKey(Range<u64>);
1550
1551 versioned_type! { 1.. => TestKeyWithDefaultLayerKey }
1552
1553 impl LayerKey for TestKeyWithDefaultLayerKey {}
1555
1556 impl SortByU64 for TestKeyWithDefaultLayerKey {
1557 fn get_leading_u64(&self) -> u64 {
1558 self.0.start
1559 }
1560 }
1561
1562 impl OrdUpperBound for TestKeyWithDefaultLayerKey {
1563 fn cmp_upper_bound(&self, other: &TestKeyWithDefaultLayerKey) -> std::cmp::Ordering {
1564 self.0.end.cmp(&other.0.end)
1565 }
1566 }
1567
1568 impl OrdLowerBound for TestKeyWithDefaultLayerKey {
1569 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1570 self.0.start.cmp(&other.0.start)
1571 }
1572 }
1573
1574 #[fuchsia::test]
1575 async fn test_no_merge_unbounded_include_all_layers() {
1576 test_advance(
1577 &[
1578 &[
1579 (TestKeyWithDefaultLayerKey(1..2), 1),
1580 (TestKeyWithDefaultLayerKey(2..3), 2),
1581 (TestKeyWithDefaultLayerKey(4..5), 3),
1582 ],
1583 &[
1584 (TestKeyWithDefaultLayerKey(1..2), 4),
1585 (TestKeyWithDefaultLayerKey(2..3), 5),
1586 (TestKeyWithDefaultLayerKey(3..4), 6),
1587 ],
1588 ],
1589 Query::FullScan,
1590 &[
1591 (TestKeyWithDefaultLayerKey(1..2), 1),
1592 (TestKeyWithDefaultLayerKey(1..2), 4),
1593 (TestKeyWithDefaultLayerKey(2..3), 2),
1594 (TestKeyWithDefaultLayerKey(2..3), 5),
1595 (TestKeyWithDefaultLayerKey(3..4), 6),
1596 (TestKeyWithDefaultLayerKey(4..5), 3),
1597 ],
1598 )
1599 .await;
1600 }
1601
1602 #[fuchsia::test]
1603 async fn test_no_merge_proceeds_comprehensively_after_seek() {
1604 test_advance(
1605 &[
1606 &[
1607 (TestKeyWithDefaultLayerKey(1..2), 1),
1608 (TestKeyWithDefaultLayerKey(2..3), 2),
1609 (TestKeyWithDefaultLayerKey(4..5), 3),
1610 ],
1611 &[
1612 (TestKeyWithDefaultLayerKey(1..2), 4),
1613 (TestKeyWithDefaultLayerKey(2..3), 5),
1614 (TestKeyWithDefaultLayerKey(3..4), 6),
1615 ],
1616 ],
1617 Query::FullRange(&TestKeyWithDefaultLayerKey(1..2)),
1618 &[
1619 (TestKeyWithDefaultLayerKey(1..2), 1),
1620 (TestKeyWithDefaultLayerKey(1..2), 4),
1621 (TestKeyWithDefaultLayerKey(2..3), 2),
1622 (TestKeyWithDefaultLayerKey(2..3), 5),
1623 (TestKeyWithDefaultLayerKey(3..4), 6),
1624 (TestKeyWithDefaultLayerKey(4..5), 3),
1625 ],
1626 )
1627 .await;
1628 }
1629
1630 #[fuchsia::test]
1631 async fn test_no_merge_seek_finds_lower_layer() {
1632 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1633 let items = [
1634 Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1635 Item::new(TestKeyWithDefaultLayerKey(1..1), 2),
1636 ];
1637 skip_lists[0].insert(items[0].clone()).expect("insert error");
1638 skip_lists[1].insert(items[1].clone()).expect("insert error");
1639 let mut merger = Merger::new(
1640 layer_ref_iter(&skip_lists),
1641 |_left, _right| MergeResult::EmitLeft,
1642 counters(),
1643 );
1644 let iter = merger.query(Query::FullRange(&items[1].key)).await.expect("seek failed");
1645
1646 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1647 assert_eq!((key, value), (&items[1].key, &items[1].value));
1648 }
1649
1650 #[fuchsia::test]
1651 async fn test_no_merge_seek_stops_at_exact_match() {
1652 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1653 let items = [
1654 Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1655 Item::new(TestKeyWithDefaultLayerKey(1..4), 2),
1656 ];
1657 skip_lists[0].insert(items[0].clone()).expect("insert error");
1658 skip_lists[1].insert(items[1].clone()).expect("insert error");
1659 let mut merger = Merger::new(
1660 layer_ref_iter(&skip_lists),
1661 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1662 counters(),
1663 );
1664 let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1665
1666 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1669 assert_eq!((key, value), (&items[0].key, &items[0].value));
1670 }
1671
1672 #[fuchsia::test]
1673 async fn test_seek_less_than() {
1674 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1675 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1676 skip_lists[0].insert(items[0].clone()).expect("insert error");
1677 skip_lists[1].insert(items[1].clone()).expect("insert error");
1678 let mut merger = Merger::new(
1680 layer_ref_iter(&skip_lists),
1681 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1682 counters(),
1683 );
1684 let iter = merger.query(Query::FullRange(&TestKey(0..0))).await.expect("seek failed");
1685
1686 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1688 assert_eq!((key, value), (&items[1].key, &items[1].value));
1689 }
1690
1691 #[fuchsia::test]
1692 async fn test_seek_to_end() {
1693 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1694 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1695 skip_lists[0].insert(items[0].clone()).expect("insert error");
1696 skip_lists[1].insert(items[1].clone()).expect("insert error");
1697 let mut merger = Merger::new(
1698 layer_ref_iter(&skip_lists),
1699 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1700 counters(),
1701 );
1702 let iter = merger.query(Query::FullRange(&TestKey(3..3))).await.expect("seek failed");
1703
1704 assert!(iter.get().is_none());
1705 }
1706
1707 #[fuchsia::test]
1708 async fn test_merge_all_discarded() {
1709 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1710 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1711 skip_lists[0].insert(items[1].clone()).expect("insert error");
1712 skip_lists[1].insert(items[0].clone()).expect("insert error");
1713 let mut merger = Merger::new(
1714 layer_ref_iter(&skip_lists),
1715 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Discard },
1716 counters(),
1717 );
1718 let iter = merger.query(Query::FullScan).await.expect("seek failed");
1719 assert!(iter.get().is_none());
1720 }
1721
1722 #[fuchsia::test]
1723 async fn test_seek_with_merged_key_less_than() {
1724 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1725 let items = [Item::new(TestKey(1..8), 1), Item::new(TestKey(2..10), 2)];
1726 skip_lists[0].insert(items[0].clone()).expect("insert error");
1727 skip_lists[1].insert(items[1].clone()).expect("insert error");
1728 let mut merger = Merger::new(
1729 layer_ref_iter(&skip_lists),
1730 |left, _right| {
1731 if left.key() == &TestKey(1..8) {
1732 MergeResult::Other {
1733 emit: None,
1734 left: Replace(Item::new(TestKey(1..2), 1).boxed()),
1735 right: Keep,
1736 }
1737 } else {
1738 MergeResult::EmitLeft
1739 }
1740 },
1741 counters(),
1742 );
1743 let iter = merger.query(Query::FullRange(&TestKey(0..3))).await.expect("seek failed");
1744 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1745 assert_eq!((key, value), (&items[1].key, &items[1].value));
1746 }
1747
1748 #[fuchsia::test]
1749 async fn test_overlapping_keys() {
1750 let skip_lists =
1751 [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1752 let items = [
1753 Item::new(TestKey(0..10), 1),
1754 Item::new(TestKey(0..20), 2),
1755 Item::new(TestKey(0..30), 3),
1756 ];
1757 skip_lists[0].insert(items[0].clone()).expect("insert error");
1758 skip_lists[1].insert(items[1].clone()).expect("insert error");
1759 skip_lists[2].insert(items[2].clone()).expect("insert error");
1760 let mut merger = Merger::new(
1761 layer_ref_iter(&skip_lists),
1762 |left, right| {
1763 let result = if left.key().0.end <= right.key().0.start {
1764 MergeResult::EmitLeft
1765 } else {
1766 if left.key() == &TestKey(0..30) && right.key() == &TestKey(10..20) {
1767 MergeResult::Other {
1768 emit: Some(Item::new(TestKey(0..10), 1).boxed()),
1769 left: Replace(Item::new(TestKey(10..30), 1).boxed()),
1770 right: Keep,
1771 }
1772 } else {
1773 MergeResult::Other {
1774 emit: None,
1775 left: Keep,
1776 right: Replace(
1777 Item::new(TestKey(left.key().0.end..right.key().0.end), 1).boxed(),
1778 ),
1779 }
1780 }
1781 };
1782 result
1783 },
1784 counters(),
1785 );
1786 let mut iter = merger.query(Query::FullRange(&TestKey(0..1))).await.expect("seek failed");
1787 let ItemRef { key, .. } = iter.get().expect("missing item");
1788 assert_eq!(key, &TestKey(0..10));
1789 iter.advance().await.expect("advance failed");
1790 let ItemRef { key, .. } = iter.get().expect("missing item");
1791 assert_eq!(key, &TestKey(10..20));
1792 iter.advance().await.expect("advance failed");
1793 let ItemRef { key, .. } = iter.get().expect("missing item");
1794 assert_eq!(key, &TestKey(20..30));
1795 iter.advance().await.expect("advance failed");
1796 assert_eq!(iter.get(), None);
1797 }
1798
1799 async fn write_layer<K: Key, V: Value>(items: Vec<Item<K, V>>) -> Arc<dyn Layer<K, V>> {
1800 let object = Arc::new(FakeObject::new());
1801 let write_handle = FakeObjectHandle::new(object.clone());
1802 let mut writer =
1803 PersistentLayerWriter::<_, K, V>::new(Writer::new(&write_handle).await, 1, 512)
1804 .await
1805 .expect("PersistentLayerWriter::new failed");
1806 for item in items {
1807 writer.write(item.as_item_ref()).await.expect("write failed");
1808 }
1809 writer.flush().await.expect("flush failed");
1810 PersistentLayer::open(FakeObjectHandle::new(object))
1811 .await
1812 .expect("open_persistent_layer failed")
1813 }
1814
1815 fn merge_sum(
1816 left: &MergeLayerIterator<'_, i32, i32>,
1817 right: &MergeLayerIterator<'_, i32, i32>,
1818 ) -> MergeResult<i32, i32> {
1819 if left.key() == right.key() {
1821 MergeResult::Other {
1822 emit: None,
1823 left: Discard,
1824 right: Replace(Item::new(left.key().clone(), left.value() + right.value()).boxed()),
1825 }
1826 } else {
1827 MergeResult::EmitLeft
1828 }
1829 }
1830
1831 #[fuchsia::test]
1832 async fn test_merge_bloom_filters_point_query() {
1833 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1834 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1835 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1836 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1837 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1838 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1839
1840 {
1841 let iter = merger.query(Query::Point(&1)).await.expect("seek failed");
1843 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1844 assert_eq!((key, *value), (&items[0].key, items[0].value));
1845 }
1846 {
1847 let iter = merger.query(Query::Point(&2)).await.expect("seek failed");
1849 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1850 assert_eq!((key, *value), (&items[1].key, items[1].value));
1851 }
1852 {
1853 let iter = merger.query(Query::Point(&4)).await.expect("seek failed");
1855 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1856 assert_eq!((key, *value), (&items[2].key, items[2].value));
1857 }
1858 {
1859 let iter = merger.query(Query::Point(&400)).await.expect("seek failed");
1861 assert!(iter.get().is_none());
1862 }
1863 }
1864
1865 #[fuchsia::test]
1866 async fn test_merge_bloom_filters_limited_range() {
1867 let layer_0_items = vec![Item::new(
1870 ObjectKey::extent(0, 0, 0..2048),
1871 ObjectValue::extent(0, VOLUME_DATA_KEY_ID),
1872 )];
1873 let layer_1_items = vec![
1874 Item::new(
1875 ObjectKey::extent(0, 0, 1024..4096),
1876 ObjectValue::extent(32768, VOLUME_DATA_KEY_ID),
1877 ),
1878 Item::new(
1879 ObjectKey::extent(0, 0, 16384..17408),
1880 ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1881 ),
1882 ];
1883 let items = [
1884 Item::new(ObjectKey::extent(0, 0, 0..2048), ObjectValue::extent(0, VOLUME_DATA_KEY_ID)),
1885 Item::new(
1886 ObjectKey::extent(0, 0, 2048..4096),
1887 ObjectValue::extent(33792, VOLUME_DATA_KEY_ID),
1888 ),
1889 Item::new(
1890 ObjectKey::extent(0, 0, 16384..17408),
1891 ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1892 ),
1893 ];
1894 let layers: [Arc<dyn Layer<ObjectKey, ObjectValue>>; 2] =
1895 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1896 let mut merger =
1897 Merger::new(dyn_layer_ref_iter(&layers), object_store::merge::merge, counters());
1898
1899 {
1900 let mut iter = merger
1902 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 16384..16386)))
1903 .await
1904 .expect("seek failed");
1905 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1906 assert_eq!(key, &items[2].key);
1907 assert_eq!(value, &items[2].value);
1908 iter.advance().await.expect("advance");
1909 assert!(iter.get().is_none());
1910 }
1911 {
1912 let mut iter = merger
1914 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 0..4096)))
1915 .await
1916 .expect("seek failed");
1917 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1918 assert_eq!(key, &items[0].key);
1919 assert_eq!(value, &items[0].value);
1920 iter.advance().await.expect("advance");
1921 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1922 assert_eq!(key, &items[1].key);
1923 assert_eq!(value, &items[1].value);
1924 iter.advance().await.expect("advance");
1925 }
1926 {
1927 let mut iter = merger
1929 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 8192..12288)))
1930 .await
1931 .expect("seek failed");
1932 let ItemRef { key, .. } = iter.get().expect("missing item");
1933 assert_eq!(key, &items[2].key);
1934 iter.advance().await.expect("advance");
1935 assert!(iter.get().is_none());
1936 }
1937 }
1938
1939 #[fuchsia::test]
1940 async fn test_merge_bloom_filters_full_range() {
1941 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1942 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1943 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1944 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1945 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1946 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1947
1948 let mut iter = merger.query(Query::FullRange(&0)).await.expect("seek failed");
1949 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1950 assert_eq!((key, *value), (&items[0].key, items[0].value));
1951 iter.advance().await.expect("advance");
1952 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1953 assert_eq!((key, *value), (&items[1].key, items[1].value));
1954 iter.advance().await.expect("advance");
1955 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1956 assert_eq!((key, *value), (&items[2].key, items[2].value));
1957 iter.advance().await.expect("advance");
1958 assert!(iter.get().is_none());
1959 }
1960
1961 #[fuchsia::test]
1962 async fn test_merge_bloom_filters_full_scan() {
1963 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1964 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1965 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1966 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1967 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1968 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1969
1970 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1971 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1972 assert_eq!((key, *value), (&items[0].key, items[0].value));
1973 iter.advance().await.expect("advance");
1974 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1975 assert_eq!((key, *value), (&items[1].key, items[1].value));
1976 iter.advance().await.expect("advance");
1977 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1978 assert_eq!((key, *value), (&items[2].key, items[2].value));
1979 iter.advance().await.expect("advance");
1980 assert!(iter.get().is_none());
1981 }
1982}