1mod bloom_filter;
18pub mod cache;
19pub mod merge;
20pub mod persistent_layer;
21pub mod skip_list_layer;
22pub mod types;
23
24#[cfg(any(test, fuzz))]
25pub mod testing;
26
27use crate::drop_event::DropEvent;
28use crate::log::*;
29use crate::metrics::DurationMeasureScope;
30use crate::object_handle::{ReadObjectHandle, WriteBytes};
31use crate::serialized_types::{LATEST_VERSION, Version};
32
33use anyhow::Error;
34use cache::{ObjectCache, ObjectCacheResult};
35
36use fuchsia_inspect::HistogramProperty;
37use fuchsia_sync::RwLock;
38use persistent_layer::{PersistentLayer, PersistentLayerWriter};
39use skip_list_layer::SkipListLayer;
40use std::fmt;
41use std::sync::atomic::{AtomicUsize, Ordering};
42use std::sync::{Arc, Mutex};
43use types::{
44 Existence, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey,
45 OrdLowerBound, Value,
46};
47
48pub use merge::Query;
49
/// Value handed to `SkipListLayer::new` every time a fresh mutable layer is created.
// NOTE(review): presumably a capacity/size hint for the skip list — confirm in `skip_list_layer`.
const SKIP_LIST_LAYER_ITEMS: usize = 512;
51
52pub use persistent_layer::{
54 LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
55 LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
56};
57
58pub async fn layers_from_handles<K: Key, V: Value>(
59 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
60) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
61 let mut layers = Vec::new();
62 for handle in handles {
63 layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
64 }
65 Ok(layers)
66}
67
/// Identifies which tree mutation is being reported to a [`MutationCallback`].
///
/// The enum is fieldless and handed to callbacks by value, so it is `Copy`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Operation {
    /// Reported by `LSMTree::insert`.
    Insert,
    /// Reported by `LSMTree::replace_or_insert`.
    ReplaceOrInsert,
    /// Reported by `LSMTree::merge_into`.
    MergeInto,
}
74
/// Optional hook called with the operation kind and the item for every mutation applied to the
/// tree; `None` means no hook is installed.
pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;
76
/// State guarded by the tree's `RwLock`.
struct Inner<K, V> {
    /// The layer that receives `insert`, `replace_or_insert` and `merge_into`.
    mutable_layer: Arc<SkipListLayer<K, V>>,
    /// The immutable layers. `seal` inserts the sealed layer at index 0.
    layers: Vec<Arc<dyn Layer<K, V>>>,
    /// When set, invoked for each mutation just before it is applied to `mutable_layer`.
    mutation_callback: MutationCallback<K, V>,
}
82
/// Number of buckets in [`CompactionCounters::layer_size_histogram`].
pub const LOG2_HISTOGRAM_BUCKETS: usize = 32;

/// Running totals describing compaction work and how deep the layer stack has grown.
pub struct CompactionCounters {
    /// Number of compactions reported via `report_compaction_metrics`.
    pub compactions: u64,
    /// Sum of the `bytes_written` reported for every compaction.
    pub compaction_bytes_written: u64,
    /// Sum of the reported compaction durations, in nanoseconds.
    pub compaction_time_ns: u64,
    /// Number of times the mutable layer has been sealed into the immutable stack.
    pub total_layers_added: u64,
    /// Largest layer count observed so far, counting the mutable layer.
    pub max_layer_count: u64,
    /// Compaction sizes bucketed by `floor(log2(bytes_written))`. Zero-byte compactions land in
    /// bucket 0 and the last bucket absorbs anything larger.
    pub layer_size_histogram: [u64; LOG2_HISTOGRAM_BUCKETS],
}

// Kept hand-written on purpose: `Default` for arrays is only implemented up to length 32, so a
// derive would stop compiling if `LOG2_HISTOGRAM_BUCKETS` were ever raised.
impl Default for CompactionCounters {
    fn default() -> Self {
        Self {
            compactions: 0,
            compaction_bytes_written: 0,
            compaction_time_ns: 0,
            total_layers_added: 0,
            max_layer_count: 0,
            layer_size_histogram: [0; LOG2_HISTOGRAM_BUCKETS],
        }
    }
}

/// Counters shared between a tree and every layer set created from it.
///
/// All counters start at zero; the derived `Default` is exactly the all-zero state.
#[derive(Default)]
pub struct TreeCounters {
    /// Reported as `num_seeks` in inspect data.
    // NOTE(review): incremented outside this file's visible code (the counters are handed to
    // `merge::Merger`) — confirm there.
    pub num_seeks: AtomicUsize,
    /// Denominator of the `bloom_filter_success_percent` inspect value.
    pub layer_files_total: AtomicUsize,
    /// Numerator of the `bloom_filter_success_percent` inspect value.
    pub layer_files_skipped: AtomicUsize,
    /// Compaction statistics, updated under the mutex.
    pub compaction: Mutex<CompactionCounters>,
}
137
138#[fxfs_trace::trace]
140pub async fn compact_with_iterator<K: Key, V: Value, W: WriteBytes + Send>(
141 mut iterator: impl LayerIterator<K, V>,
142 num_items: usize,
143 writer: W,
144 block_size: u64,
145 mut yielder: Option<impl Yielder>,
146) -> Result<u64, Error> {
147 let mut writer = PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
148 while let Some(item_ref) = iterator.get() {
149 debug!(item_ref:?; "compact: writing");
150 writer.write(item_ref).await?;
151 iterator.advance().await?;
152 if let Some(y) = yielder.as_mut() {
153 y.yield_now().await;
154 }
155 }
156 writer.complete().await
157}
158
/// A log-structured merge tree: one mutable in-memory layer stacked on zero or more immutable
/// layers, fronted by an object cache that `find` consults.
pub struct LSMTree<K, V> {
    /// The layer stack and mutation callback. Mutations take the read lock; operations that
    /// replace layers (such as `seal`) take the write lock.
    data: RwLock<Inner<K, V>>,
    /// Merge function handed to every `LayerSet` and used by `merge_into`.
    merge_fn: merge::MergeFn<K, V>,
    /// Cache consulted by `find` and invalidated by every mutation.
    cache: Box<dyn ObjectCache<K, V>>,
    /// Counters shared with every `LayerSet` created from this tree.
    counters: Arc<TreeCounters>,
}
168
169#[fxfs_trace::trace]
170impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
171 pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
173 let counters = TreeCounters::default();
174 counters.compaction.lock().unwrap().max_layer_count = 1;
175 LSMTree {
176 data: RwLock::new(Inner {
177 mutable_layer: Self::new_mutable_layer(),
178 layers: Vec::new(),
179 mutation_callback: None,
180 }),
181 merge_fn,
182 cache,
183 counters: Arc::new(counters),
184 }
185 }
186
187 pub async fn open(
189 merge_fn: merge::MergeFn<K, V>,
190 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
191 cache: Box<dyn ObjectCache<K, V>>,
192 ) -> Result<Self, Error> {
193 let layers = layers_from_handles(handles).await?;
194 let max_layer_count = layers.len() as u64 + 1;
195 let counters = TreeCounters::default();
196 counters.compaction.lock().unwrap().max_layer_count = max_layer_count;
197 Ok(LSMTree {
198 data: RwLock::new(Inner {
199 mutable_layer: Self::new_mutable_layer(),
200 layers,
201 mutation_callback: None,
202 }),
203 merge_fn,
204 cache,
205 counters: Arc::new(counters),
206 })
207 }
208
209 pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
211 let mut data = self.data.write();
212 data.layers = layers;
213 let layer_count = data.layers.len() + 1;
214 let mut counters = self.counters.compaction.lock().unwrap();
215 counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
216 }
217
218 pub async fn append_layers(
221 &self,
222 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
223 ) -> Result<(), Error> {
224 let mut layers = layers_from_handles(handles).await?;
225 let mut data = self.data.write();
226 data.layers.append(&mut layers);
227 let layer_count = data.layers.len() + 1;
228 let mut counters = self.counters.compaction.lock().unwrap();
229 counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
230 Ok(())
231 }
232
233 pub fn reset_immutable_layers(&self) {
235 self.data.write().layers = Vec::new();
236 }
237
238 pub fn seal(&self) {
240 let mut data = self.data.write();
243 let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
244 data.layers.insert(0, layer);
245 let layer_count = data.layers.len() + 1;
246 let mut counters = self.counters.compaction.lock().unwrap();
247 counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
248 counters.total_layers_added += 1;
249 }
250
251 pub fn reset(&self) {
253 let mut data = self.data.write();
254 data.layers = Vec::new();
255 data.mutable_layer = Self::new_mutable_layer();
256 }
257
258 pub fn report_compaction_metrics(
259 &self,
260 bytes_written: u64,
261 duration: std::time::Duration,
262 layer_count: usize,
263 ) {
264 let mut counters = self.counters.compaction.lock().unwrap();
265 counters.compactions += 1;
266 counters.compaction_bytes_written += bytes_written;
267 counters.compaction_time_ns += duration.as_nanos() as u64;
268
269 let bucket = if bytes_written == 0 {
270 0
271 } else {
272 std::cmp::min(LOG2_HISTOGRAM_BUCKETS - 1, 63 - bytes_written.leading_zeros() as usize)
273 };
274 counters.layer_size_histogram[bucket] += 1;
275
276 crate::metrics::lsm_tree_metrics().compaction_layer_stack_depth.insert(layer_count as u64);
277 }
278
279 pub fn compaction_bytes_written(&self) -> u64 {
280 self.counters.compaction.lock().unwrap().compaction_bytes_written
281 }
282
283 pub fn empty_layer_set(&self) -> LayerSet<K, V> {
285 LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
286 }
287
288 pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
290 let data = self.data.read();
291 layer_set.layers.reserve_exact(data.layers.len() + 1);
292 layer_set
293 .layers
294 .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
295 for layer in &data.layers {
296 layer_set.layers.push(layer.clone().into());
297 }
298 }
299
300 pub fn layer_set(&self) -> LayerSet<K, V> {
303 let mut layer_set = self.empty_layer_set();
304 self.add_all_layers_to_layer_set(&mut layer_set);
305 layer_set
306 }
307
308 pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
312 let data = self.data.read();
313 let mut layers = Vec::with_capacity(data.layers.len());
314 for layer in &data.layers {
315 layers.push(layer.clone().into());
316 }
317 LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
318 }
319
320 pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
323 let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().insert);
324
325 let key = item.key.clone();
326 let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
327 {
328 let data = self.data.read();
330 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
331 mutation_callback(Operation::Insert, &item);
332 }
333 data.mutable_layer.insert(item)?;
334 }
335 self.cache.invalidate(key, val);
336 Ok(())
337 }
338
339 pub fn replace_or_insert(&self, item: Item<K, V>) {
341 let _measure =
342 DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().replace_or_insert);
343
344 let key = item.key.clone();
345 let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
346 {
347 let data = self.data.read();
349 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
350 mutation_callback(Operation::ReplaceOrInsert, &item);
351 }
352 data.mutable_layer.replace_or_insert(item);
353 }
354 self.cache.invalidate(key, val);
355 }
356
357 pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
359 let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().merge_into);
360
361 let key = item.key.clone();
362 {
363 let data = self.data.read();
365 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
366 mutation_callback(Operation::MergeInto, &item);
367 }
368 data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
369 }
370 self.cache.invalidate(key, None);
371 }
372
373 pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
376 where
377 K: Eq,
378 {
379 let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().find);
380 let token = match self.cache.lookup_or_reserve(search_key) {
384 ObjectCacheResult::Value(value) => {
385 if value == V::DELETED_MARKER {
386 return Ok(None);
387 } else {
388 return Ok(Some(Item::new(search_key.clone(), value)));
389 }
390 }
391 ObjectCacheResult::Placeholder(token) => Some(token),
392 ObjectCacheResult::NoCache => None,
393 };
394 let layer_set = self.layer_set();
395 let mut merger = layer_set.merger();
396
397 Ok(match merger.query(Query::Point(search_key)).await?.get() {
398 Some(ItemRef { key, value }) if key == search_key && *value != V::DELETED_MARKER => {
399 if let Some(token) = token {
400 token.complete(Some(value));
401 }
402 Some(Item { key: key.clone(), value: value.clone() })
403 }
404 _ => None,
405 })
406 }
407
408 pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
409 self.data.read().mutable_layer.clone()
410 }
411
412 pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
416 self.data.write().mutation_callback = mutation_callback;
417 }
418
419 pub fn get_earliest_version(&self) -> Version {
421 let mut earliest_version = LATEST_VERSION;
422 for layer in self.layer_set().layers {
423 let layer_version = layer.get_version();
424 if layer_version < earliest_version {
425 earliest_version = layer_version;
426 }
427 }
428 return earliest_version;
429 }
430
431 pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
433 SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
434 }
435
436 pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
438 self.data.write().mutable_layer = layer;
439 }
440
441 pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
443 let layer_set = self.layer_set();
444 root.record_child("layers", move |node| {
445 let mut index = 0;
446 for layer in layer_set.layers {
447 node.record_child(format!("{index}"), move |node| {
448 layer.1.record_inspect_data(node)
449 });
450 index += 1;
451 }
452 });
453 {
454 let counters = self.counters.compaction.lock().unwrap();
455 root.record_uint("num_seeks", self.counters.num_seeks.load(Ordering::Relaxed) as u64);
456 root.record_uint("bloom_filter_success_percent", {
457 let layer_files_total = self.counters.layer_files_total.load(Ordering::Relaxed);
458 let layer_files_skipped = self.counters.layer_files_skipped.load(Ordering::Relaxed);
459 if layer_files_total == 0 {
460 0
461 } else {
462 (layer_files_skipped * 100).div_ceil(layer_files_total) as u64
463 }
464 });
465 root.record_uint("compactions", counters.compactions);
466 root.record_uint("compaction_bytes_written", counters.compaction_bytes_written);
467 root.record_uint("compaction_time_ns", counters.compaction_time_ns);
468 root.record_uint("total_layers_added", counters.total_layers_added);
469 root.record_uint("max_layer_count", counters.max_layer_count);
470
471 let layer_sizes = root.create_uint_exponential_histogram(
472 "layer_size_histogram_log2",
473 fuchsia_inspect::ExponentialHistogramParams {
474 floor: 1,
475 initial_step: 1,
476 step_multiplier: 2,
477 buckets: LOG2_HISTOGRAM_BUCKETS,
478 },
479 );
480 for (i, count) in counters.layer_size_histogram.iter().enumerate() {
481 layer_sizes.insert_multiple(1u64 << i, *count as usize);
482 }
483 root.record(layer_sizes);
484 }
485 }
486}
487
/// A layer together with the `DropEvent` obtained from `Layer::lock` when it was wrapped.
///
/// Field 0 is the held event, field 1 the layer itself.
// NOTE(review): presumably `Layer::close` waits until every held event has been dropped, which
// is how the test layers in this file behave — confirm for the real layer types.
pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);
491
492impl<K, V> LockedLayer<K, V> {
493 pub async fn close_layer(self) {
494 let layer = self.1;
495 std::mem::drop(self.0);
496 layer.close().await;
497 }
498}
499
500impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
501 fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
502 let event = layer.lock().unwrap();
503 Self(event, layer)
504 }
505}
506
507impl<K, V> std::ops::Deref for LockedLayer<K, V> {
508 type Target = Arc<dyn Layer<K, V>>;
509
510 fn deref(&self) -> &Self::Target {
511 &self.1
512 }
513}
514
515impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
516 fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
517 self.1.as_ref()
518 }
519}
520
/// A set of locked layers captured from a tree, along with what is needed to merge them.
pub struct LayerSet<K, V> {
    /// The layers in the set. `LSMTree::layer_set` places the mutable layer first.
    pub layers: Vec<LockedLayer<K, V>>,
    /// Merge function passed to the `Merger` created by `merger()`.
    merge_fn: merge::MergeFn<K, V>,
    /// Counters shared with the originating tree; passed to the `Merger`.
    counters: Arc<TreeCounters>,
}
528
529impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
530 pub fn sum_len(&self) -> usize {
531 let mut size = 0;
532 for layer in &self.layers {
533 size += layer.len()
534 }
535 size
536 }
537
538 pub fn merger(&self) -> merge::Merger<'_, K, V> {
539 merge::Merger::new(
540 self.layers.iter().map(|x| x.as_ref()),
541 self.merge_fn,
542 self.counters.clone(),
543 )
544 }
545
546 pub async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
548 for l in &self.layers {
549 match l.key_exists(key).await? {
550 e @ (Existence::Exists | Existence::MaybeExists) => return Ok(e),
551 _ => {}
552 }
553 }
554 Ok(Existence::Missing)
555 }
556}
557
558impl<K, V> fmt::Debug for LayerSet<K, V> {
559 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
560 fmt.debug_list()
561 .entries(self.layers.iter().map(|l| {
562 if let Some(handle) = l.handle() {
563 format!("{}", handle.object_id())
564 } else {
565 format!("{:?}", Arc::as_ptr(l))
566 }
567 }))
568 .finish()
569 }
570}
571
/// Hook that lets a long-running compaction hand control back periodically.
///
/// `compact_with_iterator` awaits `yield_now` once after every item it writes.
pub trait Yielder: Send {
    /// Returns a future that is awaited before compaction continues with the next item.
    fn yield_now(&mut self) -> impl Future<Output = ()> + Send;
}
576
577#[cfg(test)]
578mod tests {
579 use super::{LSMTree, Yielder, compact_with_iterator};
580 use crate::drop_event::DropEvent;
581 use crate::lsm_tree::cache::{
582 NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
583 };
584 use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
585 use crate::lsm_tree::types::{
586 BoxedLayerIterator, Existence, Item, ItemRef, Key, Layer, LayerIterator, Value,
587 };
588 use crate::lsm_tree::{Query, layers_from_handles};
589 use crate::object_handle::ObjectHandle;
590 use crate::serialized_types::{LATEST_VERSION, Version};
591 use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
592 use crate::testing::writer::Writer;
593 use anyhow::{Error, anyhow};
594 use async_trait::async_trait;
595
596 use fuchsia_sync::Mutex;
597
598 use rand::rng;
599 use rand::seq::SliceRandom;
600
601 use std::sync::Arc;
602
603 use super::testing::TestKey;
604
605 fn emit_left_merge_fn(
606 _left: &MergeLayerIterator<'_, TestKey, u64>,
607 _right: &MergeLayerIterator<'_, TestKey, u64>,
608 ) -> MergeResult<TestKey, u64> {
609 MergeResult::EmitLeft
610 }
611
    /// Lets the tests use plain `u64` values; `0` plays the role of the deleted marker.
    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }
615
    /// A `Yielder` that does nothing; the tests use it only to name a type for `Option::None`.
    struct NoOpYielder;
    impl Yielder for NoOpYielder {
        async fn yield_now(&mut self) {}
    }
620
621 #[fuchsia::test]
622 async fn test_iteration() {
623 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
624 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
625 tree.insert(items[0].clone()).expect("insert error");
626 tree.insert(items[1].clone()).expect("insert error");
627 let layers = tree.layer_set();
628 let mut merger = layers.merger();
629 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
630 let ItemRef { key, value, .. } = iter.get().expect("missing item");
631 assert_eq!((key, value), (&items[0].key, &items[0].value));
632 iter.advance().await.expect("advance failed");
633 let ItemRef { key, value, .. } = iter.get().expect("missing item");
634 assert_eq!((key, value), (&items[1].key, &items[1].value));
635 iter.advance().await.expect("advance failed");
636 assert!(iter.get().is_none());
637 }
638
639 #[fuchsia::test]
640 async fn test_compact() {
641 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
642 let items = [
643 Item::new(TestKey(1..1), 1),
644 Item::new(TestKey(2..2), 2),
645 Item::new(TestKey(3..3), 3),
646 Item::new(TestKey(4..4), 4),
647 ];
648 tree.insert(items[0].clone()).expect("insert error");
649 tree.insert(items[1].clone()).expect("insert error");
650 tree.seal();
651 tree.insert(items[2].clone()).expect("insert error");
652 tree.insert(items[3].clone()).expect("insert error");
653 tree.seal();
654 let object = Arc::new(FakeObject::new());
655 let handle = FakeObjectHandle::new(object.clone());
656 {
657 let layer_set = tree.immutable_layer_set();
658 let mut merger = layer_set.merger();
659 let iter = merger.query(Query::FullScan).await.expect("create merger");
660 compact_with_iterator(
661 iter,
662 items.len(),
663 Writer::new(&handle).await,
664 handle.block_size(),
665 Option::<NoOpYielder>::None,
666 )
667 .await
668 .expect("compact failed");
669 }
670 tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
671 let handle = FakeObjectHandle::new(object.clone());
672 let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
673 .await
674 .expect("open failed");
675
676 let layers = tree.layer_set();
677 let mut merger = layers.merger();
678 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
679 for i in 1..5 {
680 let ItemRef { key, value, .. } = iter.get().expect("missing item");
681 assert_eq!((key, value), (&TestKey(i..i), &i));
682 iter.advance().await.expect("advance failed");
683 }
684 assert!(iter.get().is_none());
685 }
686
687 #[fuchsia::test]
688 async fn test_find() {
689 let items = [
690 Item::new(TestKey(1..1), 1),
691 Item::new(TestKey(2..2), 2),
692 Item::new(TestKey(3..3), 3),
693 Item::new(TestKey(4..4), 4),
694 ];
695 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
696 tree.insert(items[0].clone()).expect("insert error");
697 tree.insert(items[1].clone()).expect("insert error");
698 tree.seal();
699 tree.insert(items[2].clone()).expect("insert error");
700 tree.insert(items[3].clone()).expect("insert error");
701
702 let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
703 assert_eq!(item, items[1]);
704 assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
705 }
706
707 #[fuchsia::test]
708 async fn test_find_no_return_deleted_values() {
709 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
710 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
711 tree.insert(items[0].clone()).expect("insert error");
712 tree.insert(items[1].clone()).expect("insert error");
713
714 let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
715 assert_eq!(item, items[0]);
716 assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
717 }
718
719 #[fuchsia::test]
720 async fn test_empty_seal() {
721 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
722 tree.seal();
723 let item = Item::new(TestKey(1..1), 1);
724 tree.insert(item.clone()).expect("insert error");
725 let object = Arc::new(FakeObject::new());
726 let handle = FakeObjectHandle::new(object.clone());
727 {
728 let layer_set = tree.immutable_layer_set();
729 let mut merger = layer_set.merger();
730 let iter = merger.query(Query::FullScan).await.expect("create merger");
731 compact_with_iterator(
732 iter,
733 0,
734 Writer::new(&handle).await,
735 handle.block_size(),
736 Option::<NoOpYielder>::None,
737 )
738 .await
739 .expect("compact failed");
740 }
741 tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
742 let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
743 assert_eq!(found_item, item);
744 assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
745 }
746
747 #[fuchsia::test]
748 async fn test_filter() {
749 let items = [
750 Item::new(TestKey(1..1), 1),
751 Item::new(TestKey(2..2), 2),
752 Item::new(TestKey(3..3), 3),
753 Item::new(TestKey(4..4), 4),
754 ];
755 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
756 tree.insert(items[0].clone()).expect("insert error");
757 tree.insert(items[1].clone()).expect("insert error");
758 tree.insert(items[2].clone()).expect("insert error");
759 tree.insert(items[3].clone()).expect("insert error");
760
761 let layers = tree.layer_set();
762 let mut merger = layers.merger();
763
764 let mut iter = merger
766 .query(Query::FullScan)
767 .await
768 .expect("seek failed")
769 .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
770 .await
771 .expect("filter failed");
772
773 assert_eq!(iter.get(), Some(items[1].as_item_ref()));
774 iter.advance().await.expect("advance failed");
775 assert_eq!(iter.get(), Some(items[3].as_item_ref()));
776 iter.advance().await.expect("advance failed");
777 assert!(iter.get().is_none());
778 }
779
780 #[fuchsia::test]
781 async fn test_insert_order_agnostic() {
782 let items = [
783 Item::new(TestKey(1..1), 1),
784 Item::new(TestKey(2..2), 2),
785 Item::new(TestKey(3..3), 3),
786 Item::new(TestKey(4..4), 4),
787 Item::new(TestKey(5..5), 5),
788 Item::new(TestKey(6..6), 6),
789 ];
790 let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
791 for item in &items {
792 a.insert(item.clone()).expect("insert error");
793 }
794 let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
795 let mut shuffled = items.clone();
796 shuffled.shuffle(&mut rng());
797 for item in &shuffled {
798 b.insert(item.clone()).expect("insert error");
799 }
800 let layers = a.layer_set();
801 let mut merger = layers.merger();
802 let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
803 let layers = b.layer_set();
804 let mut merger = layers.merger();
805 let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");
806
807 for item in items {
808 assert_eq!(Some(item.as_item_ref()), iter_a.get());
809 assert_eq!(Some(item.as_item_ref()), iter_b.get());
810 iter_a.advance().await.expect("advance failed");
811 iter_b.advance().await.expect("advance failed");
812 }
813 assert!(iter_a.get().is_none());
814 assert!(iter_b.get().is_none());
815 }
816
817 struct AuditCacheInner<'a, V: Value> {
818 lookups: u64,
819 completions: u64,
820 invalidations: u64,
821 drops: u64,
822 result: Option<ObjectCacheResult<'a, V>>,
823 }
824
825 impl<V: Value> AuditCacheInner<'_, V> {
826 fn stats(&self) -> (u64, u64, u64, u64) {
827 (self.lookups, self.completions, self.invalidations, self.drops)
828 }
829 }
830
831 struct AuditCache<'a, V: Value> {
832 inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
833 }
834
835 impl<V: Value> AuditCache<'_, V> {
836 fn new() -> Self {
837 Self {
838 inner: Arc::new(Mutex::new(AuditCacheInner {
839 lookups: 0,
840 completions: 0,
841 invalidations: 0,
842 drops: 0,
843 result: None,
844 })),
845 }
846 }
847 }
848
    /// Placeholder handed out by `AuditCache`; it reports to the shared counters whether it was
    /// completed or simply dropped.
    struct AuditPlaceholder<'a, V: Value> {
        /// Counters shared with the cache that created this placeholder.
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
        /// Set once `complete` has run, so that `Drop` does not count it as a drop.
        completed: Mutex<bool>,
    }
853
854 impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
855 fn complete(self: Box<Self>, _: Option<&V>) {
856 self.inner.lock().completions += 1;
857 *self.completed.lock() = true;
858 }
859 }
860
861 impl<V: Value> Drop for AuditPlaceholder<'_, V> {
862 fn drop(&mut self) {
863 if !*self.completed.lock() {
864 self.inner.lock().drops += 1;
865 }
866 }
867 }
868
869 impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
870 fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
871 {
872 let mut inner = self.inner.lock();
873 inner.lookups += 1;
874 if inner.result.is_some() {
875 return std::mem::take(&mut inner.result).unwrap();
876 }
877 }
878 ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
879 inner: self.inner.clone(),
880 completed: Mutex::new(false),
881 }))
882 }
883
884 fn invalidate(&self, _key: K, _value: Option<V>) {
885 self.inner.lock().invalidations += 1;
886 }
887 }
888
889 #[fuchsia::test]
890 async fn test_cache_handling() {
891 let item = Item::new(TestKey(1..1), 1);
892 let cache = Box::new(AuditCache::new());
893 let inner = cache.inner.clone();
894 let a = LSMTree::new(emit_left_merge_fn, cache);
895
896 assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
898
899 assert!(a.find(&item.key).await.expect("Failed find").is_none());
901 assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
902
903 let _ = a.insert(item.clone());
905 assert_eq!(inner.lock().stats(), (1, 0, 1, 1));
906
907 assert_eq!(
909 a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
910 item.value
911 );
912 assert_eq!(inner.lock().stats(), (2, 1, 1, 1));
913
914 a.replace_or_insert(item.clone());
916 assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
917 }
918
919 #[fuchsia::test]
920 async fn test_cache_hit() {
921 let item = Item::new(TestKey(1..1), 1);
922 let cache = Box::new(AuditCache::new());
923 let inner = cache.inner.clone();
924 let a = LSMTree::new(emit_left_merge_fn, cache);
925
926 assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
928
929 let _ = a.insert(item.clone());
931 assert_eq!(inner.lock().stats(), (0, 0, 1, 0));
932
933 inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));
935
936 assert_eq!(
938 a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
939 item.value
940 );
941 assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
942 }
943
944 #[fuchsia::test]
945 async fn test_cache_says_uncacheable() {
946 let item = Item::new(TestKey(1..1), 1);
947 let cache = Box::new(AuditCache::new());
948 let inner = cache.inner.clone();
949 let a = LSMTree::new(emit_left_merge_fn, cache);
950 let _ = a.insert(item.clone());
951
952 assert_eq!(inner.lock().stats(), (0, 0, 1, 0));
954
955 inner.lock().result = Some(ObjectCacheResult::NoCache);
957
958 assert_eq!(
960 a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
961 item.value
962 );
963 assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
964 }
965
966 struct FailLayer {
967 drop_event: Mutex<Option<Arc<DropEvent>>>,
968 }
969
970 impl FailLayer {
971 fn new() -> Self {
972 Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
973 }
974 }
975
976 #[async_trait]
977 impl<K: Key, V: Value> Layer<K, V> for FailLayer {
978 async fn seek(
979 &self,
980 _bound: std::ops::Bound<&K>,
981 ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
982 Err(anyhow!("Purposely failed seek"))
983 }
984
985 fn lock(&self) -> Option<Arc<DropEvent>> {
986 self.drop_event.lock().clone()
987 }
988
989 fn len(&self) -> usize {
990 0
991 }
992
993 async fn close(&self) {
994 let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
995 Some(drop_event) => drop_event.listen(),
996 None => return,
997 };
998 listener.await;
999 }
1000
1001 fn get_version(&self) -> Version {
1002 LATEST_VERSION
1003 }
1004
1005 async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
1006 unimplemented!();
1007 }
1008 }
1009
1010 struct MockLayer {
1011 exists_result: Existence,
1012 drop_event: Mutex<Option<Arc<DropEvent>>>,
1013 }
1014
1015 impl MockLayer {
1016 fn new(exists_result: Existence) -> Self {
1017 Self { exists_result, drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
1018 }
1019 }
1020
1021 #[async_trait]
1022 impl<K: Key, V: Value> Layer<K, V> for MockLayer {
1023 async fn seek(
1024 &self,
1025 _bound: std::ops::Bound<&K>,
1026 ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
1027 unimplemented!()
1028 }
1029
1030 fn lock(&self) -> Option<Arc<DropEvent>> {
1031 self.drop_event.lock().clone()
1032 }
1033
1034 fn len(&self) -> usize {
1035 0
1036 }
1037
1038 async fn close(&self) {
1039 let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
1040 Some(drop_event) => drop_event.listen(),
1041 None => return,
1042 };
1043 listener.await;
1044 }
1045
1046 fn get_version(&self) -> Version {
1047 LATEST_VERSION
1048 }
1049
1050 async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
1051 Ok(self.exists_result)
1052 }
1053 }
1054
1055 #[fuchsia::test]
1056 async fn test_layer_set_key_exists() {
1057 use super::LockedLayer;
1058
1059 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
1060 let mut layer_set = tree.empty_layer_set();
1061
1062 assert_eq!(
1064 layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
1065 Existence::Missing
1066 );
1067
1068 layer_set.layers.push(LockedLayer::from(
1070 Arc::new(MockLayer::new(Existence::Missing)) as Arc<dyn Layer<TestKey, u64>>
1071 ));
1072 assert_eq!(
1073 layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
1074 Existence::Missing
1075 );
1076
1077 layer_set.layers.push(LockedLayer::from(
1079 Arc::new(MockLayer::new(Existence::MaybeExists)) as Arc<dyn Layer<TestKey, u64>>
1080 ));
1081 assert_eq!(
1082 layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
1083 Existence::MaybeExists
1084 );
1085
1086 layer_set.layers.insert(
1088 0,
1089 LockedLayer::from(
1090 Arc::new(MockLayer::new(Existence::Exists)) as Arc<dyn Layer<TestKey, u64>>
1091 ),
1092 );
1093 assert_eq!(
1094 layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
1095 Existence::Exists
1096 );
1097 }
1098
1099 #[fuchsia::test]
1100 async fn test_failed_lookup() {
1101 let cache = Box::new(AuditCache::new());
1102 let inner = cache.inner.clone();
1103 let a = LSMTree::new(emit_left_merge_fn, cache);
1104 a.set_layers(vec![Arc::new(FailLayer::new())]);
1105
1106 assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
1108
1109 assert!(a.find(&TestKey(1..1)).await.is_err());
1111 assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
1112 }
1113}
1114
1115#[cfg(fuzz)]
1116mod fuzz {
1117 use crate::lsm_tree::types::{Item, Value};
1118 use crate::serialized_types::{
1119 LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
1120 };
1121 use arbitrary::Arbitrary;
1122
1123 use fuzz::fuzz;
1124
1125 use super::testing::TestKey;
1126
    // Lets the fuzzer use plain `u64` values: `u64` is registered as a versioned type and `0`
    // plays the role of the deleted marker.
    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }
1133
    /// One step of a fuzzed sequence; each variant maps to one `LSMTree` operation.
    // NOTE(review): `dead_code` is presumably allowed because the variants are only constructed
    // through the derived `Arbitrary` impl — confirm.
    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        /// Calls `LSMTree::insert` with the item.
        Insert(Item<TestKey, u64>),
        /// Calls `LSMTree::replace_or_insert` with the item.
        ReplaceOrInsert(Item<TestKey, u64>),
        /// Calls `LSMTree::merge_into` with the item and the key as lower bound.
        MergeInto(Item<TestKey, u64>, TestKey),
        /// Calls `LSMTree::find` with the key.
        Find(TestKey),
        /// Calls `LSMTree::seal`.
        Seal,
    }
1146
1147 #[fuzz]
1148 fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
1149 use super::LSMTree;
1150 use super::cache::NullCache;
1151 use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
1152 use futures::executor::block_on;
1153
1154 fn emit_left_merge_fn(
1155 _left: &MergeLayerIterator<'_, TestKey, u64>,
1156 _right: &MergeLayerIterator<'_, TestKey, u64>,
1157 ) -> MergeResult<TestKey, u64> {
1158 MergeResult::EmitLeft
1159 }
1160
1161 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
1162 for action in actions {
1163 match action {
1164 FuzzAction::Insert(item) => {
1165 let _ = tree.insert(item);
1166 }
1167 FuzzAction::ReplaceOrInsert(item) => {
1168 tree.replace_or_insert(item);
1169 }
1170 FuzzAction::Find(key) => {
1171 block_on(tree.find(&key)).expect("find failed");
1172 }
1173 FuzzAction::MergeInto(item, bound) => tree.merge_into(item, &bound),
1174 FuzzAction::Seal => tree.seal(),
1175 };
1176 }
1177 }
1178}