1mod bloom_filter;
18pub mod cache;
19pub mod merge;
20pub mod persistent_layer;
21pub mod skip_list_layer;
22pub mod types;
23
24#[cfg(any(test, fuzz))]
25pub mod testing;
26
27use crate::drop_event::DropEvent;
28use crate::log::*;
29use crate::metrics::DurationMeasureScope;
30use crate::object_handle::{ReadObjectHandle, WriteBytes};
31use crate::serialized_types::{LATEST_VERSION, Version};
32
33use anyhow::Error;
34use cache::{ObjectCache, ObjectCacheResult};
35
36use fuchsia_inspect::HistogramProperty;
37use fuchsia_sync::RwLock;
38use persistent_layer::{PersistentLayer, PersistentLayerWriter};
39use skip_list_layer::SkipListLayer;
40use std::fmt;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::sync::{Arc, Mutex};
43use types::{
44 Existence, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey,
45 OrdLowerBound, Value,
46};
47
48pub use merge::Query;
49
/// Argument handed to `SkipListLayer::new` every time a fresh mutable layer is created.
// NOTE(review): presumably an item-count sizing hint for the skip list — confirm against
// `SkipListLayer::new`.
const SKIP_LIST_LAYER_ITEMS: usize = 512;
51
52pub use persistent_layer::{
54 LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
55 LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
56};
57
58pub async fn layers_from_handles<K: Key, V: Value>(
59 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
60) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
61 let mut layers = Vec::new();
62 for handle in handles {
63 layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
64 }
65 Ok(layers)
66}
67
/// Identifies which mutating `LSMTree` entry point is invoking a [`MutationCallback`].
#[derive(Eq, PartialEq, Debug)]
pub enum Operation {
    /// Reported by `LSMTree::insert`.
    Insert,
    /// Reported by `LSMTree::replace_or_insert`.
    ReplaceOrInsert,
    /// Reported by `LSMTree::merge_into`.
    MergeInto,
}
74
/// Optional hook that `LSMTree` calls with the operation kind and the item just before the item
/// is applied to the mutable layer. `None` disables the hook.
pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;
76
/// State of an `LSMTree` that is guarded by the tree's `RwLock`.
struct Inner<K, V> {
    /// The layer that `insert`, `replace_or_insert` and `merge_into` write to.
    mutable_layer: Arc<SkipListLayer<K, V>>,
    /// The remaining layers; `seal` pushes the previous mutable layer at index 0.
    layers: Vec<Arc<dyn Layer<K, V>>>,
    /// Hook invoked by the mutating operations before they touch `mutable_layer`.
    mutation_callback: MutationCallback<K, V>,
}
82
/// Number of buckets in [`CompactionCounters::layer_size_histogram`].
pub const LOG2_HISTOGRAM_BUCKETS: usize = 32;

/// Running totals describing compaction activity and layer-stack growth for one tree.
pub struct CompactionCounters {
    /// Number of calls to `LSMTree::report_compaction_metrics`.
    pub compactions: u64,
    /// Sum of the `bytes_written` values reported for compactions.
    pub compaction_bytes_written: u64,
    /// Sum of the reported compaction durations, in nanoseconds.
    pub compaction_time_ns: u64,
    /// Incremented once per `LSMTree::seal`.
    pub total_layers_added: u64,
    /// Largest layer count (immutable layers plus the mutable layer) seen so far.
    pub max_layer_count: u64,
    /// Histogram of reported compaction sizes, bucketed by floor(log2(bytes_written)) and capped
    /// at the last bucket; zero-byte reports land in bucket 0.
    pub layer_size_histogram: [u64; LOG2_HISTOGRAM_BUCKETS],
}

impl Default for CompactionCounters {
    /// Returns counters with every total and every histogram bucket set to zero.
    fn default() -> Self {
        let layer_size_histogram = [0u64; LOG2_HISTOGRAM_BUCKETS];
        CompactionCounters {
            compactions: 0,
            compaction_bytes_written: 0,
            compaction_time_ns: 0,
            total_layers_added: 0,
            max_layer_count: 0,
            layer_size_histogram,
        }
    }
}
113
114pub struct TreeCounters {
116 pub num_seeks: AtomicUsize,
118 pub layer_files_total: AtomicUsize,
121 pub layer_files_skipped: AtomicUsize,
123 pub compaction: Mutex<CompactionCounters>,
125}
126
127impl Default for TreeCounters {
128 fn default() -> Self {
129 Self {
130 num_seeks: AtomicUsize::new(0),
131 layer_files_total: AtomicUsize::new(0),
132 layer_files_skipped: AtomicUsize::new(0),
133 compaction: Mutex::new(CompactionCounters::default()),
134 }
135 }
136}
137
138#[fxfs_trace::trace]
140pub async fn compact_with_iterator<K: Key, V: Value, W: WriteBytes + Send>(
141 mut iterator: impl LayerIterator<K, V>,
142 num_items: usize,
143 writer: W,
144 block_size: u64,
145 mut yielder: Option<impl Yielder>,
146) -> Result<u64, Error> {
147 let mut writer = PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
148 while let Some(item_ref) = iterator.get() {
149 debug!(item_ref:?; "compact: writing");
150 writer.write(item_ref).await?;
151 iterator.advance().await?;
152 if let Some(y) = yielder.as_mut() {
153 y.yield_now().await;
154 }
155 }
156 writer.flush().await?;
157
158 Ok(writer.bytes_written())
159}
160
/// A log-structured merge tree: one mutable skip-list layer stacked on top of a list of other
/// layers, plus an object cache consulted by `find`.
pub struct LSMTree<K, V> {
    /// The mutable layer, the other layers, and the mutation callback.
    data: RwLock<Inner<K, V>>,
    /// Merge function handed to `merge_into` and to every `LayerSet` created from this tree.
    merge_fn: merge::MergeFn<K, V>,
    /// Cache consulted by `find` and invalidated by every mutation.
    cache: Box<dyn ObjectCache<K, V>>,
    /// Statistics shared with the `LayerSet`s created from this tree.
    counters: Arc<TreeCounters>,
}
170
171#[fxfs_trace::trace]
172impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
173 pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
175 let counters = TreeCounters::default();
176 counters.compaction.lock().unwrap().max_layer_count = 1;
177 LSMTree {
178 data: RwLock::new(Inner {
179 mutable_layer: Self::new_mutable_layer(),
180 layers: Vec::new(),
181 mutation_callback: None,
182 }),
183 merge_fn,
184 cache,
185 counters: Arc::new(counters),
186 }
187 }
188
189 pub async fn open(
191 merge_fn: merge::MergeFn<K, V>,
192 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
193 cache: Box<dyn ObjectCache<K, V>>,
194 ) -> Result<Self, Error> {
195 let layers = layers_from_handles(handles).await?;
196 let max_layer_count = layers.len() as u64 + 1;
197 let counters = TreeCounters::default();
198 counters.compaction.lock().unwrap().max_layer_count = max_layer_count;
199 Ok(LSMTree {
200 data: RwLock::new(Inner {
201 mutable_layer: Self::new_mutable_layer(),
202 layers,
203 mutation_callback: None,
204 }),
205 merge_fn,
206 cache,
207 counters: Arc::new(counters),
208 })
209 }
210
211 pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
213 let mut data = self.data.write();
214 data.layers = layers;
215 let layer_count = data.layers.len() + 1;
216 let mut counters = self.counters.compaction.lock().unwrap();
217 counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
218 }
219
220 pub async fn append_layers(
223 &self,
224 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
225 ) -> Result<(), Error> {
226 let mut layers = layers_from_handles(handles).await?;
227 let mut data = self.data.write();
228 data.layers.append(&mut layers);
229 let layer_count = data.layers.len() + 1;
230 let mut counters = self.counters.compaction.lock().unwrap();
231 counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
232 Ok(())
233 }
234
235 pub fn reset_immutable_layers(&self) {
237 self.data.write().layers = Vec::new();
238 }
239
240 pub fn seal(&self) {
242 let mut data = self.data.write();
245 let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
246 data.layers.insert(0, layer);
247 let layer_count = data.layers.len() + 1;
248 let mut counters = self.counters.compaction.lock().unwrap();
249 counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
250 counters.total_layers_added += 1;
251 }
252
253 pub fn reset(&self) {
255 let mut data = self.data.write();
256 data.layers = Vec::new();
257 data.mutable_layer = Self::new_mutable_layer();
258 }
259
260 pub fn report_compaction_metrics(
261 &self,
262 bytes_written: u64,
263 duration: std::time::Duration,
264 layer_count: usize,
265 ) {
266 let mut counters = self.counters.compaction.lock().unwrap();
267 counters.compactions += 1;
268 counters.compaction_bytes_written += bytes_written;
269 counters.compaction_time_ns += duration.as_nanos() as u64;
270
271 let bucket = if bytes_written == 0 {
272 0
273 } else {
274 std::cmp::min(LOG2_HISTOGRAM_BUCKETS - 1, 63 - bytes_written.leading_zeros() as usize)
275 };
276 counters.layer_size_histogram[bucket] += 1;
277
278 crate::metrics::lsm_tree_metrics().compaction_layer_stack_depth.insert(layer_count as u64);
279 }
280
281 pub fn compaction_bytes_written(&self) -> u64 {
282 self.counters.compaction.lock().unwrap().compaction_bytes_written
283 }
284
285 pub fn empty_layer_set(&self) -> LayerSet<K, V> {
287 LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
288 }
289
290 pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
292 let data = self.data.read();
293 layer_set.layers.reserve_exact(data.layers.len() + 1);
294 layer_set
295 .layers
296 .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
297 for layer in &data.layers {
298 layer_set.layers.push(layer.clone().into());
299 }
300 }
301
302 pub fn layer_set(&self) -> LayerSet<K, V> {
305 let mut layer_set = self.empty_layer_set();
306 self.add_all_layers_to_layer_set(&mut layer_set);
307 layer_set
308 }
309
310 pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
314 let data = self.data.read();
315 let mut layers = Vec::with_capacity(data.layers.len());
316 for layer in &data.layers {
317 layers.push(layer.clone().into());
318 }
319 LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
320 }
321
322 pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
325 let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().insert);
326
327 let key = item.key.clone();
328 let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
329 {
330 let data = self.data.read();
332 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
333 mutation_callback(Operation::Insert, &item);
334 }
335 data.mutable_layer.insert(item)?;
336 }
337 self.cache.invalidate(key, val);
338 Ok(())
339 }
340
341 pub fn replace_or_insert(&self, item: Item<K, V>) {
343 let _measure =
344 DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().replace_or_insert);
345
346 let key = item.key.clone();
347 let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
348 {
349 let data = self.data.read();
351 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
352 mutation_callback(Operation::ReplaceOrInsert, &item);
353 }
354 data.mutable_layer.replace_or_insert(item);
355 }
356 self.cache.invalidate(key, val);
357 }
358
359 pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
361 let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().merge_into);
362
363 let key = item.key.clone();
364 {
365 let data = self.data.read();
367 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
368 mutation_callback(Operation::MergeInto, &item);
369 }
370 data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
371 }
372 self.cache.invalidate(key, None);
373 }
374
375 pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
378 where
379 K: Eq,
380 {
381 let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().find);
382 let token = match self.cache.lookup_or_reserve(search_key) {
386 ObjectCacheResult::Value(value) => {
387 if value == V::DELETED_MARKER {
388 return Ok(None);
389 } else {
390 return Ok(Some(Item::new(search_key.clone(), value)));
391 }
392 }
393 ObjectCacheResult::Placeholder(token) => Some(token),
394 ObjectCacheResult::NoCache => None,
395 };
396 let layer_set = self.layer_set();
397 let mut merger = layer_set.merger();
398
399 Ok(match merger.query(Query::Point(search_key)).await?.get() {
400 Some(ItemRef { key, value }) if key == search_key && *value != V::DELETED_MARKER => {
401 if let Some(token) = token {
402 token.complete(Some(value));
403 }
404 Some(Item { key: key.clone(), value: value.clone() })
405 }
406 _ => None,
407 })
408 }
409
410 pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
411 self.data.read().mutable_layer.clone()
412 }
413
414 pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
418 self.data.write().mutation_callback = mutation_callback;
419 }
420
421 pub fn get_earliest_version(&self) -> Version {
423 let mut earliest_version = LATEST_VERSION;
424 for layer in self.layer_set().layers {
425 let layer_version = layer.get_version();
426 if layer_version < earliest_version {
427 earliest_version = layer_version;
428 }
429 }
430 return earliest_version;
431 }
432
    /// Creates an empty mutable layer by passing `SKIP_LIST_LAYER_ITEMS` to `SkipListLayer::new`.
    pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
        SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
    }
437
438 pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
440 self.data.write().mutable_layer = layer;
441 }
442
443 pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
445 let layer_set = self.layer_set();
446 root.record_child("layers", move |node| {
447 let mut index = 0;
448 for layer in layer_set.layers {
449 node.record_child(format!("{index}"), move |node| {
450 layer.1.record_inspect_data(node)
451 });
452 index += 1;
453 }
454 });
455 {
456 let counters = self.counters.compaction.lock().unwrap();
457 root.record_uint("num_seeks", self.counters.num_seeks.load(Ordering::Relaxed) as u64);
458 root.record_uint("bloom_filter_success_percent", {
459 let layer_files_total = self.counters.layer_files_total.load(Ordering::Relaxed);
460 let layer_files_skipped = self.counters.layer_files_skipped.load(Ordering::Relaxed);
461 if layer_files_total == 0 {
462 0
463 } else {
464 (layer_files_skipped * 100).div_ceil(layer_files_total) as u64
465 }
466 });
467 root.record_uint("compactions", counters.compactions);
468 root.record_uint("compaction_bytes_written", counters.compaction_bytes_written);
469 root.record_uint("compaction_time_ns", counters.compaction_time_ns);
470 root.record_uint("total_layers_added", counters.total_layers_added);
471 root.record_uint("max_layer_count", counters.max_layer_count);
472
473 let layer_sizes = root.create_uint_exponential_histogram(
474 "layer_size_histogram_log2",
475 fuchsia_inspect::ExponentialHistogramParams {
476 floor: 1,
477 initial_step: 1,
478 step_multiplier: 2,
479 buckets: LOG2_HISTOGRAM_BUCKETS,
480 },
481 );
482 for (i, count) in counters.layer_size_histogram.iter().enumerate() {
483 layer_sizes.insert_multiple(1u64 << i, *count as usize);
484 }
485 root.record(layer_sizes);
486 }
487 }
488}
489
/// A layer paired with the `DropEvent` reference obtained from `Layer::lock`.
///
/// Field 0 is the drop-event reference and field 1 is the layer itself; `close_layer` releases
/// the former before closing the latter.
pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);
493
494impl<K, V> LockedLayer<K, V> {
495 pub async fn close_layer(self) {
496 let layer = self.1;
497 std::mem::drop(self.0);
498 layer.close().await;
499 }
500}
501
502impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
503 fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
504 let event = layer.lock().unwrap();
505 Self(event, layer)
506 }
507}
508
509impl<K, V> std::ops::Deref for LockedLayer<K, V> {
510 type Target = Arc<dyn Layer<K, V>>;
511
512 fn deref(&self) -> &Self::Target {
513 &self.1
514 }
515}
516
517impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
518 fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
519 self.1.as_ref()
520 }
521}
522
/// A collection of locked layers together with what is needed to build a merger over them.
pub struct LayerSet<K, V> {
    /// The locked layers; sets built by `LSMTree` list the mutable layer (when included) first.
    pub layers: Vec<LockedLayer<K, V>>,
    /// Merge function passed to every `Merger` created by `merger`.
    merge_fn: merge::MergeFn<K, V>,
    /// Counters shared with the `LSMTree` this set was created from.
    counters: Arc<TreeCounters>,
}
530
531impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
532 pub fn sum_len(&self) -> usize {
533 let mut size = 0;
534 for layer in &self.layers {
535 size += layer.len()
536 }
537 size
538 }
539
540 pub fn merger(&self) -> merge::Merger<'_, K, V> {
541 merge::Merger::new(
542 self.layers.iter().map(|x| x.as_ref()),
543 self.merge_fn,
544 self.counters.clone(),
545 )
546 }
547
548 pub async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
550 for l in &self.layers {
551 match l.key_exists(key).await? {
552 e @ (Existence::Exists | Existence::MaybeExists) => return Ok(e),
553 _ => {}
554 }
555 }
556 Ok(Existence::Missing)
557 }
558}
559
560impl<K, V> fmt::Debug for LayerSet<K, V> {
561 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
562 fmt.debug_list()
563 .entries(self.layers.iter().map(|l| {
564 if let Some(handle) = l.handle() {
565 format!("{}", handle.object_id())
566 } else {
567 format!("{:?}", Arc::as_ptr(l))
568 }
569 }))
570 .finish()
571 }
572}
573
/// A hook that `compact_with_iterator` awaits after every item it writes.
pub trait Yielder: Send {
    /// Returns a future that `compact_with_iterator` awaits before moving on to the next item.
    // NOTE(review): presumably implementations use this to yield to the executor — confirm
    // with the production implementors.
    fn yield_now(&mut self) -> impl Future<Output = ()> + Send;
}
578
579#[cfg(test)]
580mod tests {
581 use super::{LSMTree, Yielder, compact_with_iterator};
582 use crate::drop_event::DropEvent;
583 use crate::lsm_tree::cache::{
584 NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
585 };
586 use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
587 use crate::lsm_tree::types::{
588 BoxedLayerIterator, Existence, Item, ItemRef, Key, Layer, LayerIterator, Value,
589 };
590 use crate::lsm_tree::{Query, layers_from_handles};
591 use crate::object_handle::ObjectHandle;
592 use crate::serialized_types::{LATEST_VERSION, Version};
593 use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
594 use crate::testing::writer::Writer;
595 use anyhow::{Error, anyhow};
596 use async_trait::async_trait;
597
598 use fuchsia_sync::Mutex;
599
600 use rand::rng;
601 use rand::seq::SliceRandom;
602
603 use std::sync::Arc;
604
605 use super::testing::TestKey;
606
    /// Merge function used by the tests: ignores both inputs and always emits the left item.
    fn emit_left_merge_fn(
        _left: &MergeLayerIterator<'_, TestKey, u64>,
        _right: &MergeLayerIterator<'_, TestKey, u64>,
    ) -> MergeResult<TestKey, u64> {
        MergeResult::EmitLeft
    }
613
    /// Lets the tests use plain `u64` values; zero stands for a deleted entry.
    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }
617
    /// A `Yielder` whose `yield_now` completes immediately; it only names the type for the
    /// `None` yielder passed to `compact_with_iterator` in these tests.
    struct NoOpYielder;
    impl Yielder for NoOpYielder {
        async fn yield_now(&mut self) {}
    }
622
623 #[fuchsia::test]
624 async fn test_iteration() {
625 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
626 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
627 tree.insert(items[0].clone()).expect("insert error");
628 tree.insert(items[1].clone()).expect("insert error");
629 let layers = tree.layer_set();
630 let mut merger = layers.merger();
631 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
632 let ItemRef { key, value, .. } = iter.get().expect("missing item");
633 assert_eq!((key, value), (&items[0].key, &items[0].value));
634 iter.advance().await.expect("advance failed");
635 let ItemRef { key, value, .. } = iter.get().expect("missing item");
636 assert_eq!((key, value), (&items[1].key, &items[1].value));
637 iter.advance().await.expect("advance failed");
638 assert!(iter.get().is_none());
639 }
640
641 #[fuchsia::test]
642 async fn test_compact() {
643 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
644 let items = [
645 Item::new(TestKey(1..1), 1),
646 Item::new(TestKey(2..2), 2),
647 Item::new(TestKey(3..3), 3),
648 Item::new(TestKey(4..4), 4),
649 ];
650 tree.insert(items[0].clone()).expect("insert error");
651 tree.insert(items[1].clone()).expect("insert error");
652 tree.seal();
653 tree.insert(items[2].clone()).expect("insert error");
654 tree.insert(items[3].clone()).expect("insert error");
655 tree.seal();
656 let object = Arc::new(FakeObject::new());
657 let handle = FakeObjectHandle::new(object.clone());
658 {
659 let layer_set = tree.immutable_layer_set();
660 let mut merger = layer_set.merger();
661 let iter = merger.query(Query::FullScan).await.expect("create merger");
662 compact_with_iterator(
663 iter,
664 items.len(),
665 Writer::new(&handle).await,
666 handle.block_size(),
667 Option::<NoOpYielder>::None,
668 )
669 .await
670 .expect("compact failed");
671 }
672 tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
673 let handle = FakeObjectHandle::new(object.clone());
674 let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
675 .await
676 .expect("open failed");
677
678 let layers = tree.layer_set();
679 let mut merger = layers.merger();
680 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
681 for i in 1..5 {
682 let ItemRef { key, value, .. } = iter.get().expect("missing item");
683 assert_eq!((key, value), (&TestKey(i..i), &i));
684 iter.advance().await.expect("advance failed");
685 }
686 assert!(iter.get().is_none());
687 }
688
689 #[fuchsia::test]
690 async fn test_find() {
691 let items = [
692 Item::new(TestKey(1..1), 1),
693 Item::new(TestKey(2..2), 2),
694 Item::new(TestKey(3..3), 3),
695 Item::new(TestKey(4..4), 4),
696 ];
697 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
698 tree.insert(items[0].clone()).expect("insert error");
699 tree.insert(items[1].clone()).expect("insert error");
700 tree.seal();
701 tree.insert(items[2].clone()).expect("insert error");
702 tree.insert(items[3].clone()).expect("insert error");
703
704 let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
705 assert_eq!(item, items[1]);
706 assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
707 }
708
709 #[fuchsia::test]
710 async fn test_find_no_return_deleted_values() {
711 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
712 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
713 tree.insert(items[0].clone()).expect("insert error");
714 tree.insert(items[1].clone()).expect("insert error");
715
716 let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
717 assert_eq!(item, items[0]);
718 assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
719 }
720
    /// Sealing an empty mutable layer and compacting it must still leave a usable tree.
    #[fuchsia::test]
    async fn test_empty_seal() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        // Seal before anything was inserted: the sealed layer is empty.
        tree.seal();
        let item = Item::new(TestKey(1..1), 1);
        tree.insert(item.clone()).expect("insert error");
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            // Only the (empty) immutable layers are compacted; the item above sits in the
            // mutable layer and is not part of this layer set.
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.query(Query::FullScan).await.expect("create merger");
            compact_with_iterator(
                iter,
                0,
                Writer::new(&handle).await,
                handle.block_size(),
                Option::<NoOpYielder>::None,
            )
            .await
            .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        // The item must still be found, and an absent key must still be reported as missing.
        let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
        assert_eq!(found_item, item);
        assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
    }
748
749 #[fuchsia::test]
750 async fn test_filter() {
751 let items = [
752 Item::new(TestKey(1..1), 1),
753 Item::new(TestKey(2..2), 2),
754 Item::new(TestKey(3..3), 3),
755 Item::new(TestKey(4..4), 4),
756 ];
757 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
758 tree.insert(items[0].clone()).expect("insert error");
759 tree.insert(items[1].clone()).expect("insert error");
760 tree.insert(items[2].clone()).expect("insert error");
761 tree.insert(items[3].clone()).expect("insert error");
762
763 let layers = tree.layer_set();
764 let mut merger = layers.merger();
765
766 let mut iter = merger
768 .query(Query::FullScan)
769 .await
770 .expect("seek failed")
771 .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
772 .await
773 .expect("filter failed");
774
775 assert_eq!(iter.get(), Some(items[1].as_item_ref()));
776 iter.advance().await.expect("advance failed");
777 assert_eq!(iter.get(), Some(items[3].as_item_ref()));
778 iter.advance().await.expect("advance failed");
779 assert!(iter.get().is_none());
780 }
781
    /// Two trees receiving the same items in different orders must iterate identically.
    #[fuchsia::test]
    async fn test_insert_order_agnostic() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
            Item::new(TestKey(5..5), 5),
            Item::new(TestKey(6..6), 6),
        ];
        // Tree `a` gets the items in sorted order.
        let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for item in &items {
            a.insert(item.clone()).expect("insert error");
        }
        // Tree `b` gets the same items in a random order.
        let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let mut shuffled = items.clone();
        shuffled.shuffle(&mut rng());
        for item in &shuffled {
            b.insert(item.clone()).expect("insert error");
        }
        let layers = a.layer_set();
        let mut merger = layers.merger();
        let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
        let layers = b.layer_set();
        let mut merger = layers.merger();
        let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");

        // Both scans must produce the items in sorted order, in lock step.
        for item in items {
            assert_eq!(Some(item.as_item_ref()), iter_a.get());
            assert_eq!(Some(item.as_item_ref()), iter_b.get());
            iter_a.advance().await.expect("advance failed");
            iter_b.advance().await.expect("advance failed");
        }
        assert!(iter_a.get().is_none());
        assert!(iter_b.get().is_none());
    }
818
    /// Shared bookkeeping for `AuditCache`: counts every cache interaction and optionally holds
    /// a canned result for the next lookup.
    struct AuditCacheInner<'a, V: Value> {
        // Number of `lookup_or_reserve` calls.
        lookups: u64,
        // Number of placeholders that were completed.
        completions: u64,
        // Number of `invalidate` calls.
        invalidations: u64,
        // Number of placeholders dropped without being completed.
        drops: u64,
        // When set, returned (and cleared) by the next `lookup_or_reserve`.
        result: Option<ObjectCacheResult<'a, V>>,
    }

    impl<V: Value> AuditCacheInner<'_, V> {
        /// Returns `(lookups, completions, invalidations, drops)`.
        fn stats(&self) -> (u64, u64, u64, u64) {
            (self.lookups, self.completions, self.invalidations, self.drops)
        }
    }
832
    /// An `ObjectCache` test double that records how the tree uses it instead of caching.
    struct AuditCache<'a, V: Value> {
        // Shared with the placeholders handed out and with the test body.
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
    }

    impl<V: Value> AuditCache<'_, V> {
        /// Creates a cache with all counters at zero and no canned result.
        fn new() -> Self {
            Self {
                inner: Arc::new(Mutex::new(AuditCacheInner {
                    lookups: 0,
                    completions: 0,
                    invalidations: 0,
                    drops: 0,
                    result: None,
                })),
            }
        }
    }
850
    /// Placeholder handed out by `AuditCache`; reports to the shared counters whether it was
    /// completed or merely dropped.
    struct AuditPlaceholder<'a, V: Value> {
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
        // Set by `complete` so that `drop` does not also count this placeholder as dropped.
        completed: Mutex<bool>,
    }

    impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
        /// Counts a completion; the supplied value is ignored.
        fn complete(self: Box<Self>, _: Option<&V>) {
            self.inner.lock().completions += 1;
            *self.completed.lock() = true;
        }
    }

    impl<V: Value> Drop for AuditPlaceholder<'_, V> {
        /// Counts a drop unless the placeholder was completed first.
        fn drop(&mut self) {
            if !*self.completed.lock() {
                self.inner.lock().drops += 1;
            }
        }
    }
870
871 impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
872 fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
873 {
874 let mut inner = self.inner.lock();
875 inner.lookups += 1;
876 if inner.result.is_some() {
877 return std::mem::take(&mut inner.result).unwrap();
878 }
879 }
880 ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
881 inner: self.inner.clone(),
882 completed: Mutex::new(false),
883 }))
884 }
885
886 fn invalidate(&self, _key: K, _value: Option<V>) {
887 self.inner.lock().invalidations += 1;
888 }
889 }
890
    /// Walks through the cache interactions of `find`, `insert` and `replace_or_insert`.
    ///
    /// Stats tuples are `(lookups, completions, invalidations, drops)`.
    #[fuchsia::test]
    async fn test_cache_handling() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        // Nothing has touched the cache yet.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // A failed find performs one lookup and drops its placeholder uncompleted.
        assert!(a.find(&item.key).await.expect("Failed find").is_none());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));

        // An insert invalidates the cache once.
        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (1, 0, 1, 1));

        // A successful find performs another lookup and completes its placeholder.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (2, 1, 1, 1));

        // A replace_or_insert invalidates the cache once more.
        a.replace_or_insert(item.clone());
        assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
    }
920
    /// When the cache returns a value, `find` uses it without completing or dropping a
    /// placeholder.
    ///
    /// Stats tuples are `(lookups, completions, invalidations, drops)`.
    #[fuchsia::test]
    async fn test_cache_hit() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        // Nothing has touched the cache yet.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // The insert invalidates once.
        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        // Make the next lookup report a cache hit.
        inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));

        // Only the lookup counter moves: no placeholder was handed out.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }
945
    /// When the cache answers `NoCache`, `find` still returns the item from the layers and
    /// there is no placeholder to complete or drop.
    ///
    /// Stats tuples are `(lookups, completions, invalidations, drops)`.
    #[fuchsia::test]
    async fn test_cache_says_uncacheable() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        let _ = a.insert(item.clone());

        // Only the insert's invalidation has been counted so far.
        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        // Make the next lookup refuse to cache.
        inner.lock().result = Some(ObjectCacheResult::NoCache);

        // The item is found via the layers; only the lookup counter moves.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }
967
968 struct FailLayer {
969 drop_event: Mutex<Option<Arc<DropEvent>>>,
970 }
971
972 impl FailLayer {
973 fn new() -> Self {
974 Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
975 }
976 }
977
978 #[async_trait]
979 impl<K: Key, V: Value> Layer<K, V> for FailLayer {
980 async fn seek(
981 &self,
982 _bound: std::ops::Bound<&K>,
983 ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
984 Err(anyhow!("Purposely failed seek"))
985 }
986
987 fn lock(&self) -> Option<Arc<DropEvent>> {
988 self.drop_event.lock().clone()
989 }
990
991 fn len(&self) -> usize {
992 0
993 }
994
995 async fn close(&self) {
996 let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
997 Some(drop_event) => drop_event.listen(),
998 None => return,
999 };
1000 listener.await;
1001 }
1002
1003 fn get_version(&self) -> Version {
1004 LATEST_VERSION
1005 }
1006
1007 async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
1008 unimplemented!();
1009 }
1010 }
1011
1012 struct MockLayer {
1013 exists_result: Existence,
1014 drop_event: Mutex<Option<Arc<DropEvent>>>,
1015 }
1016
1017 impl MockLayer {
1018 fn new(exists_result: Existence) -> Self {
1019 Self { exists_result, drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
1020 }
1021 }
1022
1023 #[async_trait]
1024 impl<K: Key, V: Value> Layer<K, V> for MockLayer {
1025 async fn seek(
1026 &self,
1027 _bound: std::ops::Bound<&K>,
1028 ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
1029 unimplemented!()
1030 }
1031
1032 fn lock(&self) -> Option<Arc<DropEvent>> {
1033 self.drop_event.lock().clone()
1034 }
1035
1036 fn len(&self) -> usize {
1037 0
1038 }
1039
1040 async fn close(&self) {
1041 let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
1042 Some(drop_event) => drop_event.listen(),
1043 None => return,
1044 };
1045 listener.await;
1046 }
1047
1048 fn get_version(&self) -> Version {
1049 LATEST_VERSION
1050 }
1051
1052 async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
1053 Ok(self.exists_result)
1054 }
1055 }
1056
    /// `LayerSet::key_exists` returns the first non-`Missing` answer in layer order.
    #[fuchsia::test]
    async fn test_layer_set_key_exists() {
        use super::LockedLayer;

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let mut layer_set = tree.empty_layer_set();

        // An empty set reports Missing.
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::Missing
        );

        // A single layer that answers Missing keeps the overall result at Missing.
        layer_set.layers.push(LockedLayer::from(
            Arc::new(MockLayer::new(Existence::Missing)) as Arc<dyn Layer<TestKey, u64>>
        ));
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::Missing
        );

        // A later layer answering MaybeExists is reached after the Missing one.
        layer_set.layers.push(LockedLayer::from(
            Arc::new(MockLayer::new(Existence::MaybeExists)) as Arc<dyn Layer<TestKey, u64>>
        ));
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::MaybeExists
        );

        // A layer at the front answering Exists wins over everything behind it.
        layer_set.layers.insert(
            0,
            LockedLayer::from(
                Arc::new(MockLayer::new(Existence::Exists)) as Arc<dyn Layer<TestKey, u64>>
            ),
        );
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::Exists
        );
    }
1100
    /// A `find` that fails in the layers must surface the error and drop its cache placeholder
    /// without completing it.
    ///
    /// Stats tuples are `(lookups, completions, invalidations, drops)`.
    #[fuchsia::test]
    async fn test_failed_lookup() {
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        a.set_layers(vec![Arc::new(FailLayer::new())]);

        // Nothing has touched the cache yet.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // The lookup happens, the seek fails, and the placeholder is dropped.
        assert!(a.find(&TestKey(1..1)).await.is_err());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
    }
1115}
1116
// Fuzz target that replays arbitrary sequences of tree operations; only built under `cfg(fuzz)`.
#[cfg(fuzz)]
mod fuzz {
    use crate::lsm_tree::types::{Item, Value};
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use arbitrary::Arbitrary;

    use fuzz::fuzz;

    use super::testing::TestKey;

    // `u64` is used as the value type of the fuzzed tree, so it needs the versioning and
    // `Value` impls below.
    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    /// One step of a fuzzed run; each variant maps to one `LSMTree` call in
    /// `fuzz_lsm_tree_actions`.
    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        Insert(Item<TestKey, u64>),
        ReplaceOrInsert(Item<TestKey, u64>),
        MergeInto(Item<TestKey, u64>, TestKey),
        Find(TestKey),
        Seal,
    }

    /// Applies `actions` in order to a fresh tree that uses a `NullCache`.
    ///
    /// Insert errors are ignored; a failing `find` aborts the run via `expect`.
    #[fuzz]
    fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
        use super::LSMTree;
        use super::cache::NullCache;
        use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
        use futures::executor::block_on;

        // Merge function that always emits the left item.
        fn emit_left_merge_fn(
            _left: &MergeLayerIterator<'_, TestKey, u64>,
            _right: &MergeLayerIterator<'_, TestKey, u64>,
        ) -> MergeResult<TestKey, u64> {
            MergeResult::EmitLeft
        }

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for action in actions {
            match action {
                FuzzAction::Insert(item) => {
                    // The result of the insert is deliberately discarded.
                    let _ = tree.insert(item);
                }
                FuzzAction::ReplaceOrInsert(item) => {
                    tree.replace_or_insert(item);
                }
                FuzzAction::Find(key) => {
                    block_on(tree.find(&key)).expect("find failed");
                }
                FuzzAction::MergeInto(item, bound) => tree.merge_into(item, &bound),
                FuzzAction::Seal => tree.seal(),
            };
        }
    }
}