1use crate::log::*;
6use crate::lsm_tree;
7use crate::lsm_tree::types::{
8 BoxedItem, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut, LayerKey, MergeType,
9 OrdLowerBound, Value,
10};
11use anyhow::Error;
12use async_trait::async_trait;
13use fuchsia_sync::Mutex;
14use futures::try_join;
15use std::cmp::Ordering;
16use std::collections::BinaryHeap;
17use std::fmt::{Debug, Write};
18use std::ops::{Bound, Deref, DerefMut};
19use std::sync::Arc;
20
21#[derive(Debug, Eq, PartialEq)]
22pub enum ItemOp<K, V> {
23 Keep,
25
26 Discard,
28
29 Replace(BoxedItem<K, V>),
32}
33
34#[derive(Debug, Eq, PartialEq)]
35pub enum MergeResult<K, V> {
36 EmitLeft,
39
40 Other { emit: Option<BoxedItem<K, V>>, left: ItemOp<K, V>, right: ItemOp<K, V> },
61}
62
63pub type MergeFn<K, V> =
68 fn(&MergeLayerIterator<'_, K, V>, &MergeLayerIterator<'_, K, V>) -> MergeResult<K, V>;
69
70pub enum MergeItem<K, V> {
71 None,
72 Item(BoxedItem<K, V>),
73 Iter,
74}
75
76enum RawIterator<'a, K, V> {
77 None,
78 Const(Box<dyn LayerIterator<K, V> + 'a>),
79 Mut(Box<dyn LayerIteratorMut<K, V> + 'a>),
80}
81
// SAFETY: NOTE(review): no justification is given for this impl. The boxed trait objects in
// `RawIterator` carry no `Send` bound and `K`/`V` are unconstrained, so the compiler cannot
// check this. Presumably it relies on each `RawIterator` only ever being driven by one task at
// a time through `&mut` access from its owning merger — confirm before extending its use.
unsafe impl<K, V> Send for RawIterator<'_, K, V> {}
85
86pub struct MergeLayerIterator<'a, K, V> {
89 layer: Option<&'a dyn Layer<K, V>>,
90
91 iter: RawIterator<'a, K, V>,
93
94 pub layer_index: u16,
96
97 item: MergeItem<K, V>,
99}
100
101impl<'a, K, V> MergeLayerIterator<'a, K, V> {
102 pub fn key(&self) -> &K {
103 self.item().key
104 }
105
106 pub fn value(&self) -> &V {
107 self.item().value
108 }
109
110 pub fn sequence(&self) -> u64 {
111 self.item().sequence
112 }
113
114 fn new(layer_index: u16, layer: &'a dyn Layer<K, V>) -> Self {
115 MergeLayerIterator {
116 layer: Some(layer),
117 iter: RawIterator::None,
118 layer_index,
119 item: MergeItem::None,
120 }
121 }
122
123 fn new_with_item(layer_index: u16, item: MergeItem<K, V>) -> Self {
124 MergeLayerIterator { layer: None, iter: RawIterator::None, layer_index, item }
125 }
126
127 fn item(&self) -> ItemRef<'_, K, V> {
128 match &self.item {
129 MergeItem::None => panic!("No item!"),
130 MergeItem::Item(item) => ItemRef::from(item),
131 MergeItem::Iter => self.get().unwrap(),
132 }
133 }
134
135 fn get(&self) -> Option<ItemRef<'_, K, V>> {
136 match &self.iter {
137 RawIterator::None => panic!("No iterator!"),
138 RawIterator::Const(iter) => iter.get(),
139 RawIterator::Mut(iter) => iter.get(),
140 }
141 }
142
143 fn set_item_from_iter(&mut self) {
144 self.item = if match &self.iter {
145 RawIterator::None => unreachable!(),
146 RawIterator::Const(iter) => iter.get(),
147 RawIterator::Mut(iter) => iter.get(),
148 }
149 .is_some()
150 {
151 MergeItem::Iter
152 } else {
153 MergeItem::None
154 };
155 }
156
157 fn take_item(&mut self) -> Option<BoxedItem<K, V>> {
158 if let MergeItem::Item(_) = self.item {
159 let mut item = MergeItem::None;
160 std::mem::swap(&mut self.item, &mut item);
161 if let MergeItem::Item(item) = item {
162 Some(item)
163 } else {
164 unreachable!();
165 }
166 } else {
167 None
168 }
169 }
170
171 async fn advance(&mut self) -> Result<(), Error> {
172 if let MergeItem::Iter = self.item {
173 if let RawIterator::Const(iter) = &mut self.iter {
174 iter.advance().await?;
175 } else {
176 unreachable!();
178 }
179 }
180 self.set_item_from_iter();
181 Ok(())
182 }
183
184 fn replace(&mut self, item: BoxedItem<K, V>) {
185 self.item = MergeItem::Item(item);
186 }
187
188 fn is_some(&self) -> bool {
189 !matches!(self.item, MergeItem::None)
190 }
191
192 async fn maybe_advance(&mut self, op: &ItemOp<K, V>) -> Result<(), Error> {
195 if let ItemOp::Keep = op { Ok(()) } else { self.advance().await }
196 }
197}
198
199impl<K: OrdLowerBound, V> Ord for MergeLayerIterator<'_, K, V> {
201 fn cmp(&self, other: &Self) -> Ordering {
202 other.key().cmp_lower_bound(self.key()).then(other.layer_index.cmp(&self.layer_index))
204 }
205}
206impl<K: OrdLowerBound, V> PartialOrd for MergeLayerIterator<'_, K, V> {
207 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
208 Some(self.cmp(other))
209 }
210}
211impl<K: OrdLowerBound, V> PartialEq for MergeLayerIterator<'_, K, V> {
212 fn eq(&self, other: &Self) -> bool {
213 self.cmp(other) == Ordering::Equal
214 }
215}
216impl<K: OrdLowerBound, V> Eq for MergeLayerIterator<'_, K, V> {}
217
218enum CurrentItem<'a, 'b, K, V> {
221 None,
222 Item(BoxedItem<K, V>),
223 Iterator(&'a mut MergeLayerIterator<'b, K, V>),
224}
225
226impl<'a, 'b, K, V> CurrentItem<'a, 'b, K, V> {
227 fn take_iterator(&mut self) -> Option<&'a mut MergeLayerIterator<'b, K, V>> {
230 if let CurrentItem::Iterator(_) = self {
231 let mut result = CurrentItem::None;
232 std::mem::swap(self, &mut result);
233 if let CurrentItem::Iterator(iter) = result {
234 Some(iter)
235 } else {
236 unreachable!();
237 }
238 } else {
239 None
240 }
241 }
242}
243
244impl<'a, K, V> From<&'a CurrentItem<'_, '_, K, V>> for Option<ItemRef<'a, K, V>> {
245 fn from(iter: &'a CurrentItem<'_, '_, K, V>) -> Option<ItemRef<'a, K, V>> {
246 match iter {
247 CurrentItem::None => None,
248 CurrentItem::Iterator(iterator) => Some(iterator.item()),
249 CurrentItem::Item(item) => Some(item.into()),
250 }
251 }
252}
253
254pub struct Merger<'a, K, V> {
256 iterators: Vec<MergeLayerIterator<'a, K, V>>,
258
259 merge_fn: MergeFn<K, V>,
261
262 trace: bool,
264
265 counters: Arc<Mutex<lsm_tree::Counters>>,
267}
268
269#[derive(Debug, Clone)]
276pub enum Query<'a, K: Key + LayerKey + OrdLowerBound> {
277 Point(&'a K),
282 LimitedRange(&'a K),
289 FullRange(&'a K),
292 FullScan,
295}
296
297impl<'a, K: Key + LayerKey + OrdLowerBound> Query<'a, K> {
298 fn needs_layer<V>(&self, layer: &dyn Layer<K, V>) -> bool {
299 match self {
300 Self::Point(key) => layer.maybe_contains_key(key),
301 Self::LimitedRange(key) => layer.maybe_contains_key(key),
302 Self::FullRange(_) => true,
303 Self::FullScan => true,
304 }
305 }
306}
307
308#[fxfs_trace::trace]
309impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> Merger<'a, K, V> {
310 pub(super) fn new<I: Iterator<Item = &'a dyn Layer<K, V>>>(
311 layers: I,
312 merge_fn: MergeFn<K, V>,
313 counters: Arc<Mutex<lsm_tree::Counters>>,
314 ) -> Merger<'a, K, V> {
315 Merger {
316 iterators: layers
317 .enumerate()
318 .map(|(index, layer)| MergeLayerIterator::new(index as u16, layer))
319 .collect(),
320 merge_fn: merge_fn,
321 trace: false,
322 counters,
323 }
324 }
325
326 #[trace]
329 pub async fn query(
330 &mut self,
331 query: Query<'_, K>,
332 ) -> Result<MergerIterator<'_, 'a, K, V>, Error> {
333 if let Query::Point(key) = query {
334 debug_assert!(!key.is_range_key())
342 };
343 let len = self.iterators.len();
344 let pending_iterators = {
345 fxfs_trace::duration!(c"Merger::filter_layer_files", "len" => len);
346 self.iterators
347 .iter_mut()
348 .rev()
349 .filter(|l| query.needs_layer(l.layer.clone().unwrap()))
350 .collect::<Vec<&mut MergeLayerIterator<'a, K, V>>>()
351 };
352 let layer_count = pending_iterators.len();
353 {
354 let mut counters = self.counters.lock();
355 counters.num_seeks += 1;
356 counters.layer_files_total += len;
357 counters.layer_files_skipped += len - layer_count;
358 }
359 log::debug!(query:?; "Consulting {}/{} layers", layer_count, len);
360 let mut merger_iter = MergerIterator {
361 merge_fn: self.merge_fn,
362 pending_iterators,
363 heap: BinaryHeap::with_capacity(layer_count),
364 item: CurrentItem::None,
365 trace: self.trace,
366 history: String::new(),
367 };
368 let owned_bound;
369 let bound = match query {
370 Query::Point(key) => Bound::Included(key),
371 Query::LimitedRange(key) => {
372 owned_bound = Bound::Included(key.search_key());
373 owned_bound.as_ref()
374 }
375 Query::FullRange(start) => Bound::Included(start),
376 Query::FullScan => Bound::Unbounded,
377 };
378 merger_iter.seek(bound).await?;
379 Ok(merger_iter)
380 }
381
382 pub fn set_trace(&mut self, v: bool) {
383 self.trace = v;
384 }
385}
386
387pub struct MergerIterator<'a, 'b, K, V> {
390 merge_fn: MergeFn<K, V>,
391
392 pending_iterators: Vec<&'a mut MergeLayerIterator<'b, K, V>>,
394
395 heap: BinaryHeap<&'a mut MergeLayerIterator<'b, K, V>>,
397
398 item: CurrentItem<'a, 'b, K, V>,
400
401 trace: bool,
403
404 history: String,
406}
407
impl<'a, 'b, K: Key + LayerKey + OrdLowerBound, V: Value> MergerIterator<'a, 'b, K, V> {
    /// Positions the iterator at the first merged item satisfying `bound`.
    ///
    /// Panics on `Bound::Excluded`, which is not supported here.
    async fn seek(&mut self, bound: Bound<&K>) -> Result<(), Error> {
        let next_key = match bound {
            Bound::Unbounded => None,
            Bound::Included(key) => Some(key),
            Bound::Excluded(_) => panic!("Excluded bounds not supported!"),
        };

        self.push_iterators(next_key, bound).await?;
        self.advance_impl(bound).await
    }

    /// Runs the merge until there is an item to expose that is not below `next_key_bound`,
    /// and stores it in `self.item`. Once the heap is empty, `self.item` becomes
    /// `CurrentItem::None`.
    async fn advance_impl(&mut self, next_key_bound: Bound<&K>) -> Result<(), Error> {
        loop {
            // Repeatedly hand the two lowest heap entries to the merge function until it emits
            // something, or until only one entry is left.
            loop {
                if self.heap.is_empty() {
                    self.item = CurrentItem::None;
                    return Ok(());
                }
                let lowest = self.heap.pop().unwrap();
                let maybe_second_lowest = self.heap.pop();
                if let Some(second_lowest) = maybe_second_lowest {
                    let result = (self.merge_fn)(&lowest, &second_lowest);
                    if self.trace {
                        // Writing to a `String` cannot fail.
                        writeln!(
                            self.history,
                            "merge {:?}, {:?} -> {:?}",
                            lowest.item(),
                            second_lowest.item(),
                            result
                        )
                        .unwrap();
                    }
                    match result {
                        MergeResult::EmitLeft => {
                            // `lowest` becomes current and stays off the heap until the next
                            // advance.
                            self.heap.push(second_lowest);
                            self.item = CurrentItem::Iterator(lowest);
                            break;
                        }
                        MergeResult::Other { emit, left, right } => {
                            // Advance both inputs concurrently (unless kept), then return them
                            // to the heap as `left`/`right` dictate.
                            try_join!(
                                lowest.maybe_advance(&left),
                                second_lowest.maybe_advance(&right)
                            )?;
                            self.update_item(lowest, left);
                            self.update_item(second_lowest, right);
                            if let Some(emit) = emit {
                                self.item = CurrentItem::Item(emit);
                                break;
                            }
                        }
                    }
                } else {
                    // A single entry has nothing to merge with, so it is exposed as is.
                    self.item = CurrentItem::Iterator(lowest);
                    break;
                }
            }

            // Accept the new item unless it lies entirely below the bound. That is the case
            // when its upper bound is Less than an Included key, or not Greater than an
            // Excluded key.
            match next_key_bound {
                Bound::Included(key)
                    if self.get().unwrap().key.cmp_upper_bound(key) == Ordering::Less => {}
                Bound::Excluded(key)
                    if self.get().unwrap().key.cmp_upper_bound(key) != Ordering::Greater => {}
                _ => return Ok(()),
            }
            // Skip the below-bound item. A cursor-backed item is advanced and re-queued if it
            // still has items. An emitted (owned) item is left in `self.item` and is overwritten
            // on the next pass.
            if let Some(iterator) = self.item.take_iterator() {
                iterator.advance().await?;
                if iterator.is_some() {
                    self.heap.push(iterator);
                }
            }
        }
    }

    /// Returns true if another pending layer must be seeked before the heap can be trusted to
    /// contain the next item at or after `next_key_bound`.
    ///
    /// The result is always true while the heap is empty or the bound is not `Included`. For
    /// `MergeType::OptimizedMerge` keys, more layers are needed only in two cases. With a
    /// `next_key`, it is when the heap's lowest entry starts after `next_key`. Without one, it
    /// is when the lowest entry does not start at the target.
    fn needs_more_iterators(&self, next_key: Option<&K>, next_key_bound: Bound<&K>) -> bool {
        if self.pending_iterators.is_empty() {
            return false;
        }
        if self.heap.is_empty() {
            return true;
        }
        let target;
        match next_key_bound {
            Bound::Included(k) => target = k,
            Bound::Excluded(_) | Bound::Unbounded => return true,
        }
        match target.merge_type() {
            MergeType::FullMerge => true,
            MergeType::OptimizedMerge => {
                // `peek()` is the entry that sorts first, i.e. the smallest key.
                match next_key {
                    Some(k) => {
                        self.heap.peek().unwrap().key().cmp_lower_bound(k) == Ordering::Greater
                    }
                    None => {
                        self.heap.peek().unwrap().key().cmp_lower_bound(target) != Ordering::Equal
                    }
                }
            }
        }
    }

    /// Seeks pending layers to `next_key_bound`, in the order `pop()` yields them, for as long
    /// as `needs_more_iterators` asks for more.
    ///
    /// Layers that have an item after the seek are pushed onto the heap. Layers with nothing
    /// at or after the bound are dropped.
    async fn push_iterators(
        &mut self,
        next_key: Option<&K>,
        next_key_bound: Bound<&K>,
    ) -> Result<(), Error> {
        while self.needs_more_iterators(next_key, next_key_bound) {
            let iter = self.pending_iterators.pop().unwrap();
            let sub_iter = iter.layer.as_ref().unwrap().seek(next_key_bound).await?;
            if self.trace {
                writeln!(
                    self.history,
                    "merger: search for {:?}, found {:?}",
                    next_key_bound,
                    sub_iter.get()
                )
                .unwrap();
            }
            iter.iter = RawIterator::Const(sub_iter);
            iter.set_item_from_iter();
            if iter.is_some() {
                self.heap.push(iter);
            }
        }
        Ok(())
    }

    /// Returns `item` to the heap according to `op`. The item has already been advanced
    /// unless `op` is `Keep`.
    ///
    /// - Kept items go back unchanged.
    /// - Discarded items go back only if their cursor still has an item.
    /// - Replaced items go back carrying the replacement.
    fn update_item(&mut self, item: &'a mut MergeLayerIterator<'b, K, V>, op: ItemOp<K, V>) {
        match op {
            ItemOp::Keep => self.heap.push(item),
            ItemOp::Discard => {
                // An exhausted cursor has nothing left to contribute.
                if item.is_some() {
                    self.heap.push(item);
                }
            }
            ItemOp::Replace(replacement) => {
                item.replace(replacement);
                self.heap.push(item);
            }
        }
    }
}
584
#[async_trait]
impl<'a, K: Key + LayerKey + OrdLowerBound, V: Value> LayerIterator<K, V>
    for MergerIterator<'a, '_, K, V>
{
    /// Moves to the next merged item.
    ///
    /// While layers remain unseeked, the current key's `next_key()` sets the bound used to
    /// decide whether more layers must be brought in. If there is a next key, the bound is
    /// `Included(next_key)`; otherwise it is `Excluded(current key)`.
    async fn advance(&mut self) -> Result<(), Error> {
        // Declared up front so that `next_key_bound` can borrow it in the `None` case below.
        let current_key;
        let mut next_key = None;
        let mut next_key_bound = Bound::Unbounded;
        if !self.pending_iterators.is_empty() {
            if let Some(ItemRef { key, .. }) = self.get() {
                next_key = key.next_key();
                match next_key {
                    None => {
                        current_key = Some(key.clone());
                        next_key_bound = Bound::Excluded(current_key.as_ref().unwrap());
                    }
                    Some(ref key) => next_key_bound = Bound::Included(key),
                }
            }
        }

        if let Some(iterator) = self.item.take_iterator() {
            if self.needs_more_iterators(next_key.as_ref(), next_key_bound) {
                // Remember the current item so it can be restored if more layers need to be
                // seeked after all.
                let existing_item = iterator.item().boxed();
                iterator.advance().await?;
                match &next_key {
                    // The layer's own next item starts at or before `next_key`, so no extra
                    // layers are needed yet.
                    Some(next_key)
                        if iterator.is_some()
                            && iterator.key().cmp_lower_bound(next_key) != Ordering::Greater =>
                    {
                    }
                    _ => {
                        // Put the previous item back. It is now an owned `MergeItem::Item`, so
                        // the next `advance()` on this cursor will not step the underlying
                        // iterator again. It only re-reads the position already reached.
                        iterator.replace(existing_item);

                        self.push_iterators(next_key.as_ref(), next_key_bound).await?;
                    }
                }
            } else {
                iterator.advance().await?;
            }
            if iterator.is_some() {
                self.heap.push(iterator);
            }
        } else {
            self.push_iterators(next_key.as_ref(), next_key_bound).await?;
        }

        self.advance_impl(next_key_bound).await
    }

    /// Returns the current merged item, or `None` once the merge is exhausted.
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        (&self.item).into()
    }
}
651
652struct MutMergeLayerIterator<'a, K, V>(MergeLayerIterator<'a, K, V>);
653
654impl<K, V> MutMergeLayerIterator<'_, K, V> {
655 fn advance(&mut self) {
656 if let MergeItem::Iter = self.item {
657 self.as_mut().advance();
658 }
659 self.set_item_from_iter();
660 }
661}
662
663impl<'a, K, V> AsMut<dyn LayerIteratorMut<K, V> + 'a> for MutMergeLayerIterator<'a, K, V> {
664 fn as_mut(&mut self) -> &mut (dyn LayerIteratorMut<K, V> + 'a) {
665 let RawIterator::Mut(iter) = &mut self.0.iter else { unreachable!() };
666 iter.as_mut()
667 }
668}
669
670impl<'a, K, V> Deref for MutMergeLayerIterator<'a, K, V> {
671 type Target = MergeLayerIterator<'a, K, V>;
672
673 fn deref(&self) -> &Self::Target {
674 &self.0
675 }
676}
677
678impl<K, V> DerefMut for MutMergeLayerIterator<'_, K, V> {
679 fn deref_mut(&mut self) -> &mut Self::Target {
680 &mut self.0
681 }
682}
683
/// Merges `item` into the mutable layer behind `mut_iter`, starting from `mut_iter`'s current
/// position, and then commits the mutable iterator.
///
/// The mutable layer is given layer index 1 and `item` layer index 0, so for equal keys
/// `item` sorts first. Items are handed to `merge_fn` pairwise, with the item that sorts first
/// on the left.
///
/// - Emitted items are inserted into the layer immediately.
/// - Replacement items are held as owned items. They are inserted when they are emitted, or
///   at the end.
/// - Merging stops when either side runs out, or when `merge_fn` returns `EmitLeft` with
///   `item` on the left.
///
/// In the visible code this always returns `Ok(())`.
pub(super) fn merge_into<K: Debug + OrdLowerBound, V: Debug>(
    mut_iter: Box<dyn LayerIteratorMut<K, V> + '_>,
    item: Item<K, V>,
    merge_fn: MergeFn<K, V>,
) -> Result<(), Error> {
    let merge_item = if mut_iter.get().is_some() { MergeItem::Iter } else { MergeItem::None };
    let mut mut_merge_iter = MutMergeLayerIterator(MergeLayerIterator {
        layer: None,
        iter: RawIterator::Mut(mut_iter),
        layer_index: 1,
        item: merge_item,
    });
    let mut item_merge_iter = MergeLayerIterator::new_with_item(0, MergeItem::Item(item.boxed()));
    while mut_merge_iter.is_some() && item_merge_iter.is_some() {
        // `Ord` is reversed, so `>` means the mutable layer's item sorts first.
        if mut_merge_iter.0 > item_merge_iter {
            // The mutable layer is left and the merged-in item is right.
            let merge_result = merge_fn(&mut_merge_iter, &item_merge_iter);
            debug!(
                lhs:? = mut_merge_iter.key(),
                rhs:? = item_merge_iter.key(),
                result:? = merge_result;
                "(1) merge");
            match merge_result {
                MergeResult::EmitLeft => {
                    // An owned (replacement) left item is written into the layer, and the
                    // iterator's current item is re-read. Otherwise, step past the existing
                    // layer item.
                    if let Some(item) = mut_merge_iter.take_item() {
                        mut_merge_iter.as_mut().insert(*item);
                        mut_merge_iter.set_item_from_iter();
                    } else {
                        mut_merge_iter.advance();
                    }
                }
                MergeResult::Other { emit, left, right } => {
                    if let Some(emit) = emit {
                        mut_merge_iter.as_mut().insert(*emit);
                    }
                    match left {
                        ItemOp::Keep => {}
                        ItemOp::Discard => {
                            // Only erase if the item actually lives in the layer. An owned
                            // item is dropped by `set_item_from_iter`.
                            if matches!(mut_merge_iter.item, MergeItem::Iter) {
                                mut_merge_iter.as_mut().erase();
                            }
                            mut_merge_iter.set_item_from_iter();
                        }
                        ItemOp::Replace(item) => {
                            // Remove the layer's copy and hold the replacement as an owned item.
                            if let MergeItem::Iter = mut_merge_iter.item {
                                mut_merge_iter.as_mut().erase();
                            }
                            mut_merge_iter.item = MergeItem::Item(item)
                        }
                    }
                    match right {
                        ItemOp::Keep => {}
                        ItemOp::Discard => item_merge_iter.item = MergeItem::None,
                        ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
                    }
                }
            }
        } else {
            // The merged-in item is left and the mutable layer is right.
            let merge_result = merge_fn(&item_merge_iter, &mut_merge_iter);
            // NOTE(review): `lhs`/`rhs` below still log the mutable-layer key and then the
            // merged-in key. That is the reverse of the argument order just passed to
            // `merge_fn`.
            debug!(
                lhs:? = mut_merge_iter.key(),
                rhs:? = item_merge_iter.key(),
                result:? = merge_result;
                "(2) merge");
            match merge_result {
                // The merged-in item needs no further merging. It is inserted after the loop.
                MergeResult::EmitLeft => break,
                MergeResult::Other { emit, left, right } => {
                    if let Some(emit) = emit {
                        mut_merge_iter.as_mut().insert(*emit);
                    }
                    match left {
                        ItemOp::Keep => {}
                        ItemOp::Discard => item_merge_iter.item = MergeItem::None,
                        ItemOp::Replace(item) => item_merge_iter.item = MergeItem::Item(item),
                    }
                    match right {
                        ItemOp::Keep => {}
                        ItemOp::Discard => {
                            if matches!(mut_merge_iter.item, MergeItem::Iter) {
                                mut_merge_iter.as_mut().erase();
                            }
                            mut_merge_iter.set_item_from_iter();
                        }
                        ItemOp::Replace(item) => {
                            if let MergeItem::Iter = mut_merge_iter.item {
                                mut_merge_iter.as_mut().erase();
                            }
                            mut_merge_iter.item = MergeItem::Item(item)
                        }
                    }
                }
            }
        }
    }
    // Write back whatever owned items remain on either side, then commit.
    if let MergeItem::Item(item) = item_merge_iter.item {
        mut_merge_iter.as_mut().insert(*item);
    }
    if let Some(item) = mut_merge_iter.take_item() {
        mut_merge_iter.as_mut().insert(*item);
    }
    if let RawIterator::Mut(mut iter) = mut_merge_iter.0.iter {
        iter.commit();
    }
    Ok(())
}
794
795#[cfg(test)]
796mod tests {
797 use super::ItemOp::{Discard, Keep, Replace};
798 use super::{MergeLayerIterator, MergeResult, Merger};
799 use crate::lsm_tree::persistent_layer::{PersistentLayer, PersistentLayerWriter};
800 use crate::lsm_tree::skip_list_layer::SkipListLayer;
801 use crate::lsm_tree::types::{
802 FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeType,
803 OrdLowerBound, OrdUpperBound, SortByU64,
804 };
805 use crate::lsm_tree::{self, Query, Value};
806 use crate::object_store::{self, ObjectKey, ObjectValue, VOLUME_DATA_KEY_ID};
807 use crate::serialized_types::{
808 LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
809 };
810 use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
811 use crate::testing::writer::Writer;
812 use fprint::TypeFingerprint;
813 use fuchsia_sync::Mutex;
814 use fxfs_macros::FuzzyHash;
815 use rand::Rng;
816 use std::hash::Hash;
817 use std::ops::{Bound, Range};
818 use std::sync::Arc;
819
820 #[derive(
821 Clone,
822 Eq,
823 Hash,
824 FuzzyHash,
825 PartialEq,
826 Debug,
827 serde::Serialize,
828 serde::Deserialize,
829 TypeFingerprint,
830 Versioned,
831 )]
832 struct TestKey(Range<u64>);
833
834 impl Value for i32 {
835 const DELETED_MARKER: Self = 0;
836 }
837
838 versioned_type! { 1.. => TestKey }
839
840 impl SortByU64 for TestKey {
841 fn get_leading_u64(&self) -> u64 {
842 self.0.start
843 }
844 }
845
846 impl LayerKey for TestKey {
847 fn merge_type(&self) -> MergeType {
848 MergeType::OptimizedMerge
849 }
850
851 fn next_key(&self) -> Option<Self> {
852 Some(TestKey(self.0.end..self.0.end + 1))
853 }
854 }
855
856 impl OrdUpperBound for TestKey {
857 fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
858 self.0.end.cmp(&other.0.end)
859 }
860 }
861
862 impl OrdLowerBound for TestKey {
863 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
864 self.0.start.cmp(&other.0.start)
865 }
866 }
867
868 fn layer_ref_iter<K: Key, V: Value>(
869 layers: &[Arc<SkipListLayer<K, V>>],
870 ) -> impl Iterator<Item = &dyn Layer<K, V>> {
871 layers.iter().map(|x| x.as_ref() as &dyn Layer<K, V>)
872 }
873
874 fn dyn_layer_ref_iter<K: Key, V: Value>(
875 layers: &[Arc<dyn Layer<K, V>>],
876 ) -> impl Iterator<Item = &dyn Layer<K, V>> {
877 layers.iter().map(|x| x.as_ref())
878 }
879
880 fn counters() -> Arc<Mutex<lsm_tree::Counters>> {
881 Arc::new(Mutex::new(lsm_tree::Counters::default()))
882 }
883
884 #[fuchsia::test]
885 async fn test_emit_left() {
886 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
887 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
888 skip_lists[0].insert(items[1].clone()).expect("insert error");
889 skip_lists[1].insert(items[0].clone()).expect("insert error");
890 let mut merger = Merger::new(
891 layer_ref_iter(&skip_lists),
892 |_left, _right| MergeResult::EmitLeft,
893 counters(),
894 );
895 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
896 let ItemRef { key, value, .. } = iter.get().expect("missing item");
897 assert_eq!((key, value), (&items[0].key, &items[0].value));
898 iter.advance().await.unwrap();
899 let ItemRef { key, value, .. } = iter.get().expect("missing item");
900 assert_eq!((key, value), (&items[1].key, &items[1].value));
901 iter.advance().await.unwrap();
902 assert!(iter.get().is_none());
903 }
904
905 #[fuchsia::test]
906 async fn test_other_emit() {
907 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
908 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
909 skip_lists[0].insert(items[1].clone()).expect("insert error");
910 skip_lists[1].insert(items[0].clone()).expect("insert error");
911 let mut merger = Merger::new(
912 layer_ref_iter(&skip_lists),
913 |_left, _right| MergeResult::Other {
914 emit: Some(Item::new(TestKey(3..3), 3).boxed()),
915 left: Discard,
916 right: Discard,
917 },
918 counters(),
919 );
920 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
921
922 let ItemRef { key, value, .. } = iter.get().expect("missing item");
923 assert_eq!((key, value), (&TestKey(3..3), &3));
924 iter.advance().await.unwrap();
925 assert!(iter.get().is_none());
926 }
927
928 #[fuchsia::test]
929 async fn test_replace_left() {
930 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
931 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
932 skip_lists[0].insert(items[1].clone()).expect("insert error");
933 skip_lists[1].insert(items[0].clone()).expect("insert error");
934 let mut merger = Merger::new(
935 layer_ref_iter(&skip_lists),
936 |_left, _right| MergeResult::Other {
937 emit: None,
938 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
939 right: Discard,
940 },
941 counters(),
942 );
943 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
944
945 let ItemRef { key, value, .. } = iter.get().expect("missing item");
948 assert_eq!((key, value), (&TestKey(3..3), &3));
949 iter.advance().await.unwrap();
950 assert!(iter.get().is_none());
951 }
952
953 #[fuchsia::test]
954 async fn test_replace_right() {
955 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
956 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
957 skip_lists[0].insert(items[1].clone()).expect("insert error");
958 skip_lists[1].insert(items[0].clone()).expect("insert error");
959 let mut merger = Merger::new(
960 layer_ref_iter(&skip_lists),
961 |_left, _right| MergeResult::Other {
962 emit: None,
963 left: Discard,
964 right: Replace(Item::new(TestKey(3..3), 3).boxed()),
965 },
966 counters(),
967 );
968 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
969
970 let ItemRef { key, value, .. } = iter.get().expect("missing item");
973 assert_eq!((key, value), (&TestKey(3..3), &3));
974 iter.advance().await.unwrap();
975 assert!(iter.get().is_none());
976 }
977
978 #[fuchsia::test]
979 async fn test_left_less_than_right() {
980 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
981 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
982 skip_lists[0].insert(items[1].clone()).expect("insert error");
983 skip_lists[1].insert(items[0].clone()).expect("insert error");
984 let mut merger = Merger::new(
985 layer_ref_iter(&skip_lists),
986 |left, right| {
987 assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
988 assert_eq!((right.key(), right.value()), (&TestKey(2..2), &2));
989 MergeResult::EmitLeft
990 },
991 counters(),
992 );
993 merger.query(Query::FullScan).await.expect("seek failed");
994 }
995
996 #[fuchsia::test]
997 async fn test_left_equals_right() {
998 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
999 let item = Item::new(TestKey(1..1), 1);
1000 skip_lists[0].insert(item.clone()).expect("insert error");
1001 skip_lists[1].insert(item.clone()).expect("insert error");
1002 let mut merger = Merger::new(
1003 layer_ref_iter(&skip_lists),
1004 |left, right| {
1005 assert_eq!((left.key(), left.value()), (&TestKey(1..1), &1));
1006 assert_eq!((right.key(), right.value()), (&TestKey(1..1), &1));
1007 assert_eq!(left.layer_index, 0);
1008 assert_eq!(right.layer_index, 1);
1009 MergeResult::EmitLeft
1010 },
1011 counters(),
1012 );
1013 merger.query(Query::FullScan).await.expect("seek failed");
1014 }
1015
1016 #[fuchsia::test]
1017 async fn test_keep() {
1018 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1019 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1020 skip_lists[0].insert(items[1].clone()).expect("insert error");
1021 skip_lists[1].insert(items[0].clone()).expect("insert error");
1022 let mut merger = Merger::new(
1023 layer_ref_iter(&skip_lists),
1024 |left, right| {
1025 if left.key() == &TestKey(1..1) {
1026 MergeResult::Other {
1027 emit: None,
1028 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1029 right: Keep,
1030 }
1031 } else {
1032 assert_eq!(left.key(), &TestKey(2..2));
1033 assert_eq!(right.key(), &TestKey(3..3));
1034 MergeResult::Other { emit: None, left: Discard, right: Keep }
1035 }
1036 },
1037 counters(),
1038 );
1039 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1040
1041 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1044 assert_eq!((key, value), (&TestKey(3..3), &3));
1045 iter.advance().await.unwrap();
1046 assert!(iter.get().is_none());
1047 }
1048
1049 #[fuchsia::test]
1050 async fn test_merge_10_layers() {
1051 let skip_lists: Vec<_> = (0..10).map(|_| SkipListLayer::new(100)).collect();
1052 let mut rng = rand::rng();
1053 for i in 0..100 {
1054 skip_lists[rng.random_range(0..10) as usize]
1055 .insert(Item::new(TestKey(i..i), i))
1056 .expect("insert error");
1057 }
1058 let mut merger = Merger::new(
1059 layer_ref_iter(&skip_lists),
1060 |_left, _right| MergeResult::EmitLeft,
1061 counters(),
1062 );
1063 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1064
1065 for i in 0..100 {
1066 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1067 assert_eq!((key, value), (&TestKey(i..i), &i));
1068 iter.advance().await.unwrap();
1069 }
1070 assert!(iter.get().is_none());
1071 }
1072
1073 #[fuchsia::test]
1074 async fn test_merge_uses_cmp_lower_bound() {
1075 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1076 let items = [Item::new(TestKey(1..10), 1), Item::new(TestKey(2..3), 2)];
1077 skip_lists[0].insert(items[1].clone()).expect("insert error");
1078 skip_lists[1].insert(items[0].clone()).expect("insert error");
1079 let mut merger = Merger::new(
1080 layer_ref_iter(&skip_lists),
1081 |_left, _right| MergeResult::EmitLeft,
1082 counters(),
1083 );
1084 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1085
1086 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1087 assert_eq!((key, value), (&items[0].key, &items[0].value));
1088 iter.advance().await.unwrap();
1089 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1090 assert_eq!((key, value), (&items[1].key, &items[1].value));
1091 iter.advance().await.unwrap();
1092 assert!(iter.get().is_none());
1093 }
1094
1095 #[fuchsia::test]
1096 async fn test_merge_into_emit_left() {
1097 let skip_list = SkipListLayer::new(100);
1098 let items =
1099 [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2), Item::new(TestKey(3..3), 3)];
1100 skip_list.insert(items[0].clone()).expect("insert error");
1101 skip_list.insert(items[2].clone()).expect("insert error");
1102 skip_list
1103 .merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::EmitLeft);
1104
1105 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1106 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1107 assert_eq!((key, value), (&items[0].key, &items[0].value));
1108 iter.advance().await.unwrap();
1109 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1110 assert_eq!((key, value), (&items[1].key, &items[1].value));
1111 iter.advance().await.unwrap();
1112 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1113 assert_eq!((key, value), (&items[2].key, &items[2].value));
1114 iter.advance().await.unwrap();
1115 assert!(iter.get().is_none());
1116 }
1117
1118 #[fuchsia::test]
1119 async fn test_merge_into_emit_last_after_replacing() {
1120 let skip_list = SkipListLayer::new(100);
1121 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1122 skip_list.insert(items[0].clone()).expect("insert error");
1123
1124 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1125 if left.key() == &TestKey(1..1) {
1126 assert_eq!(right.key(), &TestKey(2..2));
1127 MergeResult::Other {
1128 emit: None,
1129 left: Replace(Item::new(TestKey(3..3), 3).boxed()),
1130 right: Keep,
1131 }
1132 } else {
1133 assert_eq!(left.key(), &TestKey(2..2));
1134 assert_eq!(right.key(), &TestKey(3..3));
1135 MergeResult::EmitLeft
1136 }
1137 });
1138
1139 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1140 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1141 assert_eq!((key, value), (&items[1].key, &items[1].value));
1142 iter.advance().await.unwrap();
1143 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1144 assert_eq!((key, value), (&TestKey(3..3), &3));
1145 iter.advance().await.unwrap();
1146 assert!(iter.get().is_none());
1147 }
1148
1149 #[fuchsia::test]
1150 async fn test_merge_into_emit_left_after_replacing() {
1151 let skip_list = SkipListLayer::new(100);
1152 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1153 skip_list.insert(items[0].clone()).expect("insert error");
1154
1155 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1156 if left.key() == &TestKey(1..1) {
1157 assert_eq!(right.key(), &TestKey(3..3));
1158 MergeResult::Other {
1159 emit: None,
1160 left: Replace(Item::new(TestKey(2..2), 2).boxed()),
1161 right: Keep,
1162 }
1163 } else {
1164 assert_eq!(left.key(), &TestKey(2..2));
1165 assert_eq!(right.key(), &TestKey(3..3));
1166 MergeResult::EmitLeft
1167 }
1168 });
1169
1170 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1171 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1172 assert_eq!((key, value), (&TestKey(2..2), &2));
1173 iter.advance().await.unwrap();
1174 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1175 assert_eq!((key, value), (&items[1].key, &items[1].value));
1176 iter.advance().await.unwrap();
1177 assert!(iter.get().is_none());
1178 }
1179
1180 #[fuchsia::test]
1182 async fn test_merge_into_emit_other_and_discard() {
1183 let skip_list = SkipListLayer::new(100);
1184 let items =
1185 [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 3)];
1186 skip_list.insert(items[0].clone()).expect("insert error");
1187 skip_list.insert(items[2].clone()).expect("insert error");
1188
1189 skip_list.merge_into(items[1].clone(), &items[0].key, |left, right| {
1190 if left.key() == &TestKey(1..1) {
1191 assert_eq!(right.key(), &TestKey(3..3));
1193 MergeResult::Other {
1194 emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1195 left: Discard,
1196 right: Keep,
1197 }
1198 } else {
1199 assert_eq!(left.key(), &TestKey(3..3));
1201 assert_eq!(right.key(), &TestKey(5..5));
1202 MergeResult::Other {
1203 emit: Some(Item::new(TestKey(4..4), 4).boxed()),
1204 left: Discard,
1205 right: Discard,
1206 }
1207 }
1208 });
1209
1210 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1211 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1212 assert_eq!((key, value), (&TestKey(2..2), &2));
1213 iter.advance().await.unwrap();
1214 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1215 assert_eq!((key, value), (&TestKey(4..4), &4));
1216 iter.advance().await.unwrap();
1217 assert!(iter.get().is_none());
1218 }
1219
1220 #[fuchsia::test]
1223 async fn test_merge_into_replace_and_discard() {
1224 let skip_list = SkipListLayer::new(100);
1225 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1226 skip_list.insert(items[0].clone()).expect("insert error");
1227
1228 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1229 emit: Some(Item::new(TestKey(2..2), 2).boxed()),
1230 left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1231 right: Discard,
1232 });
1233
1234 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1235 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1236 assert_eq!((key, value), (&TestKey(2..2), &2));
1237 iter.advance().await.unwrap();
1238 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1239 assert_eq!((key, value), (&TestKey(4..4), &4));
1240 iter.advance().await.unwrap();
1241 assert!(iter.get().is_none());
1242 }
1243
1244 #[fuchsia::test]
1247 async fn test_merge_into_replace_merge_item() {
1248 let skip_list = SkipListLayer::new(100);
1249 let items =
1250 [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3), Item::new(TestKey(5..5), 5)];
1251 skip_list.insert(items[0].clone()).expect("insert error");
1252 skip_list.insert(items[2].clone()).expect("insert error");
1253
1254 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, right| {
1255 if right.key() == &TestKey(3..3) {
1256 MergeResult::Other {
1257 emit: None,
1258 left: Discard,
1259 right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1260 }
1261 } else {
1262 assert_eq!(right.key(), &TestKey(5..5));
1263 MergeResult::Other {
1264 emit: None,
1265 left: Replace(Item::new(TestKey(4..4), 4).boxed()),
1266 right: Discard,
1267 }
1268 }
1269 });
1270
1271 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1272 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1273 assert_eq!((key, value), (&TestKey(4..4), &4));
1274 iter.advance().await.unwrap();
1275 assert!(iter.get().is_none());
1276 }
1277
1278 #[fuchsia::test]
1280 async fn test_merge_into_replace_existing() {
1281 let skip_list = SkipListLayer::new(100);
1282 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(3..3), 3)];
1283 skip_list.insert(items[1].clone()).expect("insert error");
1284
1285 skip_list.merge_into(items[0].clone(), &items[0].key, |_left, right| {
1286 if right.key() == &TestKey(3..3) {
1287 MergeResult::Other {
1288 emit: None,
1289 left: Keep,
1290 right: Replace(Item::new(TestKey(2..2), 2).boxed()),
1291 }
1292 } else {
1293 MergeResult::EmitLeft
1294 }
1295 });
1296
1297 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1298 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1299 assert_eq!((key, value), (&items[0].key, &items[0].value));
1300 iter.advance().await.unwrap();
1301 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1302 assert_eq!((key, value), (&TestKey(2..2), &2));
1303 iter.advance().await.unwrap();
1304 assert!(iter.get().is_none());
1305 }
1306
1307 #[fuchsia::test]
1308 async fn test_merge_into_discard_last() {
1309 let skip_list = SkipListLayer::new(100);
1310 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1311 skip_list.insert(items[0].clone()).expect("insert error");
1312
1313 skip_list.merge_into(items[1].clone(), &items[0].key, |_left, _right| MergeResult::Other {
1314 emit: None,
1315 left: Discard,
1316 right: Keep,
1317 });
1318
1319 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1320 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1321 assert_eq!((key, value), (&items[1].key, &items[1].value));
1322 iter.advance().await.unwrap();
1323 assert!(iter.get().is_none());
1324 }
1325
1326 #[fuchsia::test]
1327 async fn test_merge_into_empty() {
1328 let skip_list = SkipListLayer::new(100);
1329 let items = [Item::new(TestKey(1..1), 1)];
1330
1331 skip_list.merge_into(items[0].clone(), &items[0].key, |_left, _right| {
1332 panic!("Unexpected merge!");
1333 });
1334
1335 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1336 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1337 assert_eq!((key, value), (&items[0].key, &items[0].value));
1338 iter.advance().await.unwrap();
1339 assert!(iter.get().is_none());
1340 }
1341
1342 #[fuchsia::test]
1343 async fn test_seek_uses_minimum_number_of_iterators() {
1344 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1345 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(1..1), 2)];
1346 skip_lists[0].insert(items[0].clone()).expect("insert error");
1347 skip_lists[1].insert(items[1].clone()).expect("insert error");
1348 let mut merger = Merger::new(
1349 layer_ref_iter(&skip_lists),
1350 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1351 counters(),
1352 );
1353 let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1354
1355 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1358 assert_eq!((key, value), (&items[0].key, &items[0].value));
1359 }
1360
1361 async fn test_advance<K: Eq + Key + LayerKey + OrdLowerBound>(
1364 layers: &[&[(K, i32)]],
1365 query: Query<'_, K>,
1366 expected: &[(K, i32)],
1367 ) {
1368 let mut skip_lists = Vec::new();
1369 for &layer in layers {
1370 let skip_list = SkipListLayer::new(100);
1371 for (k, v) in layer {
1372 skip_list.insert(Item::new(k.clone(), *v)).expect("insert error");
1373 }
1374 skip_lists.push(skip_list);
1375 }
1376 let mut merger = Merger::new(
1377 layer_ref_iter(&skip_lists),
1378 |_left, _right| MergeResult::EmitLeft,
1379 counters(),
1380 );
1381 let mut iter = merger.query(query).await.expect("seek failed");
1382 for (k, v) in expected {
1383 let ItemRef { key, value, .. } = iter.get().expect("get failed");
1384 assert_eq!((key, value), (k, v));
1385 iter.advance().await.expect("advance failed");
1386 }
1387 assert!(iter.get().is_none());
1388 }
1389
1390 #[fuchsia::test]
1391 async fn test_seek_skips_replaced_items() {
1392 test_advance(
1394 &[
1395 &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(4..5), 3)],
1396 &[(TestKey(1..2), 4), (TestKey(2..3), 5), (TestKey(3..4), 6)],
1397 ],
1398 Query::FullRange(&TestKey(1..2)),
1399 &[(TestKey(1..2), 1), (TestKey(2..3), 2), (TestKey(3..4), 6), (TestKey(4..5), 3)],
1400 )
1401 .await;
1402 }
1403
1404 #[fuchsia::test]
1405 async fn test_advance_skips_replaced_items_at_end() {
1406 test_advance(
1409 &[&[(TestKey(1..2), 1)], &[(TestKey(1..2), 2)]],
1410 Query::FullRange(&TestKey(1..2)),
1411 &[(TestKey(1..2), 1)],
1412 )
1413 .await;
1414 }
1415
// Test key wrapping a `Range<u64>` whose `LayerKey::merge_type` reports `MergeType::FullMerge`.
// Its ordering helpers compare range starts for the lower bound and range ends for the upper
// bound.
#[derive(
    Clone,
    Eq,
    Hash,
    FuzzyHash,
    PartialEq,
    Debug,
    serde::Serialize,
    serde::Deserialize,
    TypeFingerprint,
    Versioned,
)]
struct TestKeyWithFullMerge(Range<u64>);

// Registers the type with `versioned_type!` for versions `1..`.
versioned_type! { 1.. => TestKeyWithFullMerge }
1431
1432 impl LayerKey for TestKeyWithFullMerge {
1433 fn merge_type(&self) -> MergeType {
1434 MergeType::FullMerge
1435 }
1436 }
1437
1438 impl SortByU64 for TestKeyWithFullMerge {
1439 fn get_leading_u64(&self) -> u64 {
1440 self.0.start
1441 }
1442 }
1443
1444 impl OrdUpperBound for TestKeyWithFullMerge {
1445 fn cmp_upper_bound(&self, other: &TestKeyWithFullMerge) -> std::cmp::Ordering {
1446 self.0.end.cmp(&other.0.end)
1447 }
1448 }
1449
1450 impl OrdLowerBound for TestKeyWithFullMerge {
1451 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1452 self.0.start.cmp(&other.0.start)
1453 }
1454 }
1455
1456 #[fuchsia::test]
1459 async fn test_full_merge_consistent_advance_ordering() {
1460 let layer_set = [
1461 [
1462 (TestKeyWithFullMerge(1..2), 1i32),
1463 (TestKeyWithFullMerge(2..3), 2i32),
1464 (TestKeyWithFullMerge(4..5), 3i32),
1465 ]
1466 .as_slice(),
1467 [
1468 (TestKeyWithFullMerge(1..2), 4i32),
1469 (TestKeyWithFullMerge(2..3), 5i32),
1470 (TestKeyWithFullMerge(3..4), 6i32),
1471 ]
1472 .as_slice(),
1473 ];
1474
1475 let full_merge_result = [
1476 (TestKeyWithFullMerge(1..2), 1),
1477 (TestKeyWithFullMerge(1..2), 4),
1478 (TestKeyWithFullMerge(2..3), 2),
1479 (TestKeyWithFullMerge(2..3), 5),
1480 (TestKeyWithFullMerge(3..4), 6),
1481 (TestKeyWithFullMerge(4..5), 3),
1482 ];
1483
1484 test_advance(layer_set.as_slice(), Query::FullScan, &full_merge_result).await;
1485
1486 test_advance(
1487 layer_set.as_slice(),
1488 Query::FullRange(&TestKeyWithFullMerge(1..2)),
1489 &full_merge_result,
1490 )
1491 .await;
1492
1493 test_advance(
1494 layer_set.as_slice(),
1495 Query::FullRange(&TestKeyWithFullMerge(2..3)),
1496 &full_merge_result[2..],
1497 )
1498 .await;
1499
1500 test_advance(
1501 layer_set.as_slice(),
1502 Query::FullRange(&TestKeyWithFullMerge(3..4)),
1503 &full_merge_result[4..],
1504 )
1505 .await;
1506 }
1507
1508 #[fuchsia::test]
1509 async fn test_full_merge_always_consult_all_layers() {
1510 let skip_lists =
1514 [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1515 let items = [
1516 Item::new(TestKeyWithFullMerge(1..2), 1),
1517 Item::new(TestKeyWithFullMerge(2..3), 2),
1518 Item::new(TestKeyWithFullMerge(1..2), 3),
1519 Item::new(TestKeyWithFullMerge(2..3), 4),
1520 ];
1521 skip_lists[0].insert(items[0].clone()).expect("insert error");
1522 skip_lists[1].insert(items[1].clone()).expect("insert error");
1523 skip_lists[2].insert(items[2].clone()).expect("insert error");
1524 skip_lists[2].insert(items[3].clone()).expect("insert error");
1525 let mut merger = Merger::new(
1526 layer_ref_iter(&skip_lists),
1527 |left, right| {
1528 if left.key() == right.key() {
1530 MergeResult::Other {
1531 emit: None,
1532 left: Discard,
1533 right: Replace(
1534 Item::new(left.key().clone(), left.value() + right.value()).boxed(),
1535 ),
1536 }
1537 } else {
1538 MergeResult::EmitLeft
1539 }
1540 },
1541 counters(),
1542 );
1543 let mut iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1544
1545 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1546 assert_eq!((key, *value), (&items[0].key, items[0].value + items[2].value));
1547 iter.advance().await.expect("advance");
1548 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1549 assert_eq!((key, *value), (&items[1].key, items[1].value + items[3].value));
1550 iter.advance().await.expect("advance");
1551 assert!(iter.get().is_none());
1552 }
1553
// Test key wrapping a `Range<u64>` with an empty `LayerKey` impl, so it keeps the trait's default
// behaviour. Its ordering helpers compare range starts for the lower bound and range ends for the
// upper bound.
#[derive(
    Clone,
    Eq,
    Hash,
    FuzzyHash,
    PartialEq,
    Debug,
    serde::Serialize,
    serde::Deserialize,
    TypeFingerprint,
    Versioned,
)]
struct TestKeyWithDefaultLayerKey(Range<u64>);

// Registers the type with `versioned_type!` for versions `1..`.
versioned_type! { 1.. => TestKeyWithDefaultLayerKey }
1569
1570 impl LayerKey for TestKeyWithDefaultLayerKey {}
1572
1573 impl SortByU64 for TestKeyWithDefaultLayerKey {
1574 fn get_leading_u64(&self) -> u64 {
1575 self.0.start
1576 }
1577 }
1578
1579 impl OrdUpperBound for TestKeyWithDefaultLayerKey {
1580 fn cmp_upper_bound(&self, other: &TestKeyWithDefaultLayerKey) -> std::cmp::Ordering {
1581 self.0.end.cmp(&other.0.end)
1582 }
1583 }
1584
1585 impl OrdLowerBound for TestKeyWithDefaultLayerKey {
1586 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1587 self.0.start.cmp(&other.0.start)
1588 }
1589 }
1590
1591 #[fuchsia::test]
1592 async fn test_no_merge_unbounded_include_all_layers() {
1593 test_advance(
1594 &[
1595 &[
1596 (TestKeyWithDefaultLayerKey(1..2), 1),
1597 (TestKeyWithDefaultLayerKey(2..3), 2),
1598 (TestKeyWithDefaultLayerKey(4..5), 3),
1599 ],
1600 &[
1601 (TestKeyWithDefaultLayerKey(1..2), 4),
1602 (TestKeyWithDefaultLayerKey(2..3), 5),
1603 (TestKeyWithDefaultLayerKey(3..4), 6),
1604 ],
1605 ],
1606 Query::FullScan,
1607 &[
1608 (TestKeyWithDefaultLayerKey(1..2), 1),
1609 (TestKeyWithDefaultLayerKey(1..2), 4),
1610 (TestKeyWithDefaultLayerKey(2..3), 2),
1611 (TestKeyWithDefaultLayerKey(2..3), 5),
1612 (TestKeyWithDefaultLayerKey(3..4), 6),
1613 (TestKeyWithDefaultLayerKey(4..5), 3),
1614 ],
1615 )
1616 .await;
1617 }
1618
1619 #[fuchsia::test]
1620 async fn test_no_merge_proceeds_comprehensively_after_seek() {
1621 test_advance(
1622 &[
1623 &[
1624 (TestKeyWithDefaultLayerKey(1..2), 1),
1625 (TestKeyWithDefaultLayerKey(2..3), 2),
1626 (TestKeyWithDefaultLayerKey(4..5), 3),
1627 ],
1628 &[
1629 (TestKeyWithDefaultLayerKey(1..2), 4),
1630 (TestKeyWithDefaultLayerKey(2..3), 5),
1631 (TestKeyWithDefaultLayerKey(3..4), 6),
1632 ],
1633 ],
1634 Query::FullRange(&TestKeyWithDefaultLayerKey(1..2)),
1635 &[
1636 (TestKeyWithDefaultLayerKey(1..2), 1),
1637 (TestKeyWithDefaultLayerKey(1..2), 4),
1638 (TestKeyWithDefaultLayerKey(2..3), 2),
1639 (TestKeyWithDefaultLayerKey(2..3), 5),
1640 (TestKeyWithDefaultLayerKey(3..4), 6),
1641 (TestKeyWithDefaultLayerKey(4..5), 3),
1642 ],
1643 )
1644 .await;
1645 }
1646
1647 #[fuchsia::test]
1648 async fn test_no_merge_seek_finds_lower_layer() {
1649 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1650 let items = [
1651 Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1652 Item::new(TestKeyWithDefaultLayerKey(1..1), 2),
1653 ];
1654 skip_lists[0].insert(items[0].clone()).expect("insert error");
1655 skip_lists[1].insert(items[1].clone()).expect("insert error");
1656 let mut merger = Merger::new(
1657 layer_ref_iter(&skip_lists),
1658 |_left, _right| MergeResult::EmitLeft,
1659 counters(),
1660 );
1661 let iter = merger.query(Query::FullRange(&items[1].key)).await.expect("seek failed");
1662
1663 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1664 assert_eq!((key, value), (&items[1].key, &items[1].value));
1665 }
1666
1667 #[fuchsia::test]
1668 async fn test_no_merge_seek_stops_at_exact_match() {
1669 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1670 let items = [
1671 Item::new(TestKeyWithDefaultLayerKey(2..3), 1),
1672 Item::new(TestKeyWithDefaultLayerKey(1..4), 2),
1673 ];
1674 skip_lists[0].insert(items[0].clone()).expect("insert error");
1675 skip_lists[1].insert(items[1].clone()).expect("insert error");
1676 let mut merger = Merger::new(
1677 layer_ref_iter(&skip_lists),
1678 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1679 counters(),
1680 );
1681 let iter = merger.query(Query::FullRange(&items[0].key)).await.expect("seek failed");
1682
1683 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1686 assert_eq!((key, value), (&items[0].key, &items[0].value));
1687 }
1688
1689 #[fuchsia::test]
1690 async fn test_seek_less_than() {
1691 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1692 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1693 skip_lists[0].insert(items[0].clone()).expect("insert error");
1694 skip_lists[1].insert(items[1].clone()).expect("insert error");
1695 let mut merger = Merger::new(
1697 layer_ref_iter(&skip_lists),
1698 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1699 counters(),
1700 );
1701 let iter = merger.query(Query::FullRange(&TestKey(0..0))).await.expect("seek failed");
1702
1703 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1705 assert_eq!((key, value), (&items[1].key, &items[1].value));
1706 }
1707
1708 #[fuchsia::test]
1709 async fn test_seek_to_end() {
1710 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1711 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1712 skip_lists[0].insert(items[0].clone()).expect("insert error");
1713 skip_lists[1].insert(items[1].clone()).expect("insert error");
1714 let mut merger = Merger::new(
1715 layer_ref_iter(&skip_lists),
1716 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Keep },
1717 counters(),
1718 );
1719 let iter = merger.query(Query::FullRange(&TestKey(3..3))).await.expect("seek failed");
1720
1721 assert!(iter.get().is_none());
1722 }
1723
1724 #[fuchsia::test]
1725 async fn test_merge_all_discarded() {
1726 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1727 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
1728 skip_lists[0].insert(items[1].clone()).expect("insert error");
1729 skip_lists[1].insert(items[0].clone()).expect("insert error");
1730 let mut merger = Merger::new(
1731 layer_ref_iter(&skip_lists),
1732 |_left, _right| MergeResult::Other { emit: None, left: Discard, right: Discard },
1733 counters(),
1734 );
1735 let iter = merger.query(Query::FullScan).await.expect("seek failed");
1736 assert!(iter.get().is_none());
1737 }
1738
1739 #[fuchsia::test]
1740 async fn test_seek_with_merged_key_less_than() {
1741 let skip_lists = [SkipListLayer::new(100), SkipListLayer::new(100)];
1742 let items = [Item::new(TestKey(1..8), 1), Item::new(TestKey(2..10), 2)];
1743 skip_lists[0].insert(items[0].clone()).expect("insert error");
1744 skip_lists[1].insert(items[1].clone()).expect("insert error");
1745 let mut merger = Merger::new(
1746 layer_ref_iter(&skip_lists),
1747 |left, _right| {
1748 if left.key() == &TestKey(1..8) {
1749 MergeResult::Other {
1750 emit: None,
1751 left: Replace(Item::new(TestKey(1..2), 1).boxed()),
1752 right: Keep,
1753 }
1754 } else {
1755 MergeResult::EmitLeft
1756 }
1757 },
1758 counters(),
1759 );
1760 let iter = merger.query(Query::FullRange(&TestKey(0..3))).await.expect("seek failed");
1761 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1762 assert_eq!((key, value), (&items[1].key, &items[1].value));
1763 }
1764
1765 #[fuchsia::test]
1766 async fn test_overlapping_keys() {
1767 let skip_lists =
1768 [SkipListLayer::new(100), SkipListLayer::new(100), SkipListLayer::new(100)];
1769 let items = [
1770 Item::new(TestKey(0..10), 1),
1771 Item::new(TestKey(0..20), 2),
1772 Item::new(TestKey(0..30), 3),
1773 ];
1774 skip_lists[0].insert(items[0].clone()).expect("insert error");
1775 skip_lists[1].insert(items[1].clone()).expect("insert error");
1776 skip_lists[2].insert(items[2].clone()).expect("insert error");
1777 let mut merger = Merger::new(
1778 layer_ref_iter(&skip_lists),
1779 |left, right| {
1780 let result = if left.key().0.end <= right.key().0.start {
1781 MergeResult::EmitLeft
1782 } else {
1783 if left.key() == &TestKey(0..30) && right.key() == &TestKey(10..20) {
1784 MergeResult::Other {
1785 emit: Some(Item::new(TestKey(0..10), 1).boxed()),
1786 left: Replace(Item::new(TestKey(10..30), 1).boxed()),
1787 right: Keep,
1788 }
1789 } else {
1790 MergeResult::Other {
1791 emit: None,
1792 left: Keep,
1793 right: Replace(
1794 Item::new(TestKey(left.key().0.end..right.key().0.end), 1).boxed(),
1795 ),
1796 }
1797 }
1798 };
1799 result
1800 },
1801 counters(),
1802 );
1803 let mut iter = merger.query(Query::FullRange(&TestKey(0..1))).await.expect("seek failed");
1804 let ItemRef { key, .. } = iter.get().expect("missing item");
1805 assert_eq!(key, &TestKey(0..10));
1806 iter.advance().await.expect("advance failed");
1807 let ItemRef { key, .. } = iter.get().expect("missing item");
1808 assert_eq!(key, &TestKey(10..20));
1809 iter.advance().await.expect("advance failed");
1810 let ItemRef { key, .. } = iter.get().expect("missing item");
1811 assert_eq!(key, &TestKey(20..30));
1812 iter.advance().await.expect("advance failed");
1813 assert_eq!(iter.get(), None);
1814 }
1815
1816 async fn write_layer<K: Key, V: Value>(items: Vec<Item<K, V>>) -> Arc<dyn Layer<K, V>> {
1817 let object = Arc::new(FakeObject::new());
1818 let write_handle = FakeObjectHandle::new(object.clone());
1819 let mut writer =
1820 PersistentLayerWriter::<_, K, V>::new(Writer::new(&write_handle).await, 1, 512)
1821 .await
1822 .expect("PersistentLayerWriter::new failed");
1823 for item in items {
1824 writer.write(item.as_item_ref()).await.expect("write failed");
1825 }
1826 writer.flush().await.expect("flush failed");
1827 PersistentLayer::open(FakeObjectHandle::new(object))
1828 .await
1829 .expect("open_persistent_layer failed")
1830 }
1831
1832 fn merge_sum(
1833 left: &MergeLayerIterator<'_, i32, i32>,
1834 right: &MergeLayerIterator<'_, i32, i32>,
1835 ) -> MergeResult<i32, i32> {
1836 if left.key() == right.key() {
1838 MergeResult::Other {
1839 emit: None,
1840 left: Discard,
1841 right: Replace(Item::new(left.key().clone(), left.value() + right.value()).boxed()),
1842 }
1843 } else {
1844 MergeResult::EmitLeft
1845 }
1846 }
1847
1848 #[fuchsia::test]
1849 async fn test_merge_bloom_filters_point_query() {
1850 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1851 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1852 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1853 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1854 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1855 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1856
1857 {
1858 let iter = merger.query(Query::Point(&1)).await.expect("seek failed");
1860 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1861 assert_eq!((key, *value), (&items[0].key, items[0].value));
1862 }
1863 {
1864 let iter = merger.query(Query::Point(&2)).await.expect("seek failed");
1866 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1867 assert_eq!((key, *value), (&items[1].key, items[1].value));
1868 }
1869 {
1870 let iter = merger.query(Query::Point(&4)).await.expect("seek failed");
1872 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1873 assert_eq!((key, *value), (&items[2].key, items[2].value));
1874 }
1875 {
1876 let iter = merger.query(Query::Point(&400)).await.expect("seek failed");
1878 assert!(iter.get().is_none());
1879 }
1880 }
1881
1882 #[fuchsia::test]
1883 async fn test_merge_bloom_filters_limited_range() {
1884 let layer_0_items = vec![Item::new(
1887 ObjectKey::extent(0, 0, 0..2048),
1888 ObjectValue::extent(0, VOLUME_DATA_KEY_ID),
1889 )];
1890 let layer_1_items = vec![
1891 Item::new(
1892 ObjectKey::extent(0, 0, 1024..4096),
1893 ObjectValue::extent(32768, VOLUME_DATA_KEY_ID),
1894 ),
1895 Item::new(
1896 ObjectKey::extent(0, 0, 16384..17408),
1897 ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1898 ),
1899 ];
1900 let items = [
1901 Item::new(ObjectKey::extent(0, 0, 0..2048), ObjectValue::extent(0, VOLUME_DATA_KEY_ID)),
1902 Item::new(
1903 ObjectKey::extent(0, 0, 2048..4096),
1904 ObjectValue::extent(33792, VOLUME_DATA_KEY_ID),
1905 ),
1906 Item::new(
1907 ObjectKey::extent(0, 0, 16384..17408),
1908 ObjectValue::extent(65536, VOLUME_DATA_KEY_ID),
1909 ),
1910 ];
1911 let layers: [Arc<dyn Layer<ObjectKey, ObjectValue>>; 2] =
1912 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1913 let mut merger =
1914 Merger::new(dyn_layer_ref_iter(&layers), object_store::merge::merge, counters());
1915
1916 {
1917 let mut iter = merger
1919 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 16384..16386)))
1920 .await
1921 .expect("seek failed");
1922 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1923 assert_eq!(key, &items[2].key);
1924 assert_eq!(value, &items[2].value);
1925 iter.advance().await.expect("advance");
1926 assert!(iter.get().is_none());
1927 }
1928 {
1929 let mut iter = merger
1931 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 0..4096)))
1932 .await
1933 .expect("seek failed");
1934 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1935 assert_eq!(key, &items[0].key);
1936 assert_eq!(value, &items[0].value);
1937 iter.advance().await.expect("advance");
1938 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1939 assert_eq!(key, &items[1].key);
1940 assert_eq!(value, &items[1].value);
1941 iter.advance().await.expect("advance");
1942 }
1943 {
1944 let mut iter = merger
1946 .query(Query::LimitedRange(&ObjectKey::extent(0, 0, 8192..12288)))
1947 .await
1948 .expect("seek failed");
1949 let ItemRef { key, .. } = iter.get().expect("missing item");
1950 assert_eq!(key, &items[2].key);
1951 iter.advance().await.expect("advance");
1952 assert!(iter.get().is_none());
1953 }
1954 }
1955
1956 #[fuchsia::test]
1957 async fn test_merge_bloom_filters_full_range() {
1958 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1959 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1960 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1961 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1962 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1963 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1964
1965 let mut iter = merger.query(Query::FullRange(&0)).await.expect("seek failed");
1966 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1967 assert_eq!((key, *value), (&items[0].key, items[0].value));
1968 iter.advance().await.expect("advance");
1969 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1970 assert_eq!((key, *value), (&items[1].key, items[1].value));
1971 iter.advance().await.expect("advance");
1972 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1973 assert_eq!((key, *value), (&items[2].key, items[2].value));
1974 iter.advance().await.expect("advance");
1975 assert!(iter.get().is_none());
1976 }
1977
1978 #[fuchsia::test]
1979 async fn test_merge_bloom_filters_full_scan() {
1980 let layer_0_items = vec![Item::new(1, 1), Item::new(2, 1)];
1981 let layer_1_items = vec![Item::new(2, 1), Item::new(4, 1)];
1982 let items = [Item::new(1, 1), Item::new(2, 2), Item::new(4, 1)];
1983 let layers: [Arc<dyn Layer<i32, i32>>; 2] =
1984 [write_layer(layer_0_items).await, write_layer(layer_1_items).await];
1985 let mut merger = Merger::new(dyn_layer_ref_iter(&layers), merge_sum, counters());
1986
1987 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
1988 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1989 assert_eq!((key, *value), (&items[0].key, items[0].value));
1990 iter.advance().await.expect("advance");
1991 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1992 assert_eq!((key, *value), (&items[1].key, items[1].value));
1993 iter.advance().await.expect("advance");
1994 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1995 assert_eq!((key, *value), (&items[2].key, items[2].value));
1996 iter.advance().await.expect("advance");
1997 assert!(iter.get().is_none());
1998 }
1999}