1mod bloom_filter;
6pub mod cache;
7pub mod merge;
8pub mod persistent_layer;
9pub mod skip_list_layer;
10pub mod types;
11
12use crate::drop_event::DropEvent;
13use crate::log::*;
14use crate::metrics::DurationMeasureScope;
15use crate::object_handle::{ReadObjectHandle, WriteBytes};
16use crate::serialized_types::{LATEST_VERSION, Version};
17
18use anyhow::Error;
19use cache::{ObjectCache, ObjectCacheResult};
20
21use fuchsia_inspect::HistogramProperty;
22use fuchsia_sync::RwLock;
23use persistent_layer::{PersistentLayer, PersistentLayerWriter};
24use skip_list_layer::SkipListLayer;
25use std::fmt;
26use std::sync::atomic::{AtomicUsize, Ordering};
27use std::sync::{Arc, Mutex};
28use types::{
29 Existence, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey,
30 OrdLowerBound, Value,
31};
32
33pub use merge::Query;
34
35const SKIP_LIST_LAYER_ITEMS: usize = 512;
36
37pub use persistent_layer::{
39 LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
40 LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
41};
42
43pub async fn layers_from_handles<K: Key, V: Value>(
44 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
45) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
46 let mut layers = Vec::new();
47 for handle in handles {
48 layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
49 }
50 Ok(layers)
51}
52
/// The kind of mutation being applied to the tree; passed to any registered
/// [`MutationCallback`] along with the item being mutated.
#[derive(Eq, PartialEq, Debug)]
pub enum Operation {
    /// Reported by `LSMTree::insert` (the fallible insert).
    Insert,
    /// Reported by `LSMTree::replace_or_insert`.
    ReplaceOrInsert,
    /// Reported by `LSMTree::merge_into`.
    MergeInto,
}
59
/// Optional hook invoked on every mutation (while the tree's data lock is held for reading),
/// just before the mutation is applied to the mutable layer.
pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;

/// State guarded by `LSMTree::data`.
struct Inner<K, V> {
    /// The in-memory layer that receives all new mutations.
    mutable_layer: Arc<SkipListLayer<K, V>>,
    /// Immutable layers, newest first (`seal` inserts at index 0).
    layers: Vec<Arc<dyn Layer<K, V>>>,
    /// See [`MutationCallback`].
    mutation_callback: MutationCallback<K, V>,
}
67
/// Number of buckets in the log2-scaled layer-size histogram.
pub const LOG2_HISTOGRAM_BUCKETS: usize = 32;

/// Compaction statistics for one tree; exported via `LSMTree::record_inspect_data`.
pub struct CompactionCounters {
    /// Total number of compactions reported via `report_compaction_metrics`.
    pub compactions: u64,
    /// Total bytes written by all compactions.
    pub compaction_bytes_written: u64,
    /// Total time spent compacting, in nanoseconds.
    pub compaction_time_ns: u64,
    /// Number of times a mutable layer has been sealed (see `LSMTree::seal`).
    pub total_layers_added: u64,
    /// High-water mark of the layer count (immutable layers plus the mutable layer).
    pub max_layer_count: u64,
    /// Histogram of compacted layer sizes; bucket `i` counts layers of roughly `2^i` bytes.
    pub layer_size_histogram: [u64; LOG2_HISTOGRAM_BUCKETS],
}
85
86impl Default for CompactionCounters {
87 fn default() -> Self {
88 Self {
89 compactions: 0,
90 compaction_bytes_written: 0,
91 compaction_time_ns: 0,
92 total_layers_added: 0,
93 max_layer_count: 0,
94 layer_size_histogram: [0; LOG2_HISTOGRAM_BUCKETS],
95 }
96 }
97}
98
/// Per-tree statistics; exported via `LSMTree::record_inspect_data`.  The atomic fields are
/// shared with mergers (see `LayerSet::merger`), which update them without taking a lock.
pub struct TreeCounters {
    /// Number of seeks performed against this tree.
    pub num_seeks: AtomicUsize,
    /// Layer files considered during seeks (denominator of `bloom_filter_success_percent`).
    pub layer_files_total: AtomicUsize,
    /// Layer files skipped during seeks (numerator of `bloom_filter_success_percent`).
    pub layer_files_skipped: AtomicUsize,
    /// Compaction statistics, updated together under one lock.
    pub compaction: Mutex<CompactionCounters>,
}
111
112impl Default for TreeCounters {
113 fn default() -> Self {
114 Self {
115 num_seeks: AtomicUsize::new(0),
116 layer_files_total: AtomicUsize::new(0),
117 layer_files_skipped: AtomicUsize::new(0),
118 compaction: Mutex::new(CompactionCounters::default()),
119 }
120 }
121}
122
123#[fxfs_trace::trace]
125pub async fn compact_with_iterator<K: Key, V: Value, W: WriteBytes + Send>(
126 mut iterator: impl LayerIterator<K, V>,
127 num_items: usize,
128 writer: W,
129 block_size: u64,
130 mut yielder: Option<impl Yielder>,
131) -> Result<u64, Error> {
132 let mut writer = PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
133 while let Some(item_ref) = iterator.get() {
134 debug!(item_ref:?; "compact: writing");
135 writer.write(item_ref).await?;
136 iterator.advance().await?;
137 if let Some(y) = yielder.as_mut() {
138 y.yield_now().await;
139 }
140 }
141 writer.flush().await?;
142
143 Ok(writer.bytes_written())
144}
145
/// A log-structured merge tree: one in-memory mutable layer absorbing all writes, plus a stack
/// of immutable layers which are merged on the fly (via `merge_fn`) during reads.
pub struct LSMTree<K, V> {
    /// The mutable layer and the immutable layer stack; see [`Inner`].
    data: RwLock<Inner<K, V>>,
    /// Merge function used whenever layers are iterated together.
    merge_fn: merge::MergeFn<K, V>,
    /// Read cache consulted by `find` and invalidated by the mutating operations.
    cache: Box<dyn ObjectCache<K, V>>,
    /// Seek/compaction statistics; exported via `record_inspect_data`.
    counters: Arc<TreeCounters>,
}
155
156#[fxfs_trace::trace]
157impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
158 pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
160 let counters = TreeCounters::default();
161 counters.compaction.lock().unwrap().max_layer_count = 1;
162 LSMTree {
163 data: RwLock::new(Inner {
164 mutable_layer: Self::new_mutable_layer(),
165 layers: Vec::new(),
166 mutation_callback: None,
167 }),
168 merge_fn,
169 cache,
170 counters: Arc::new(counters),
171 }
172 }
173
174 pub async fn open(
176 merge_fn: merge::MergeFn<K, V>,
177 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
178 cache: Box<dyn ObjectCache<K, V>>,
179 ) -> Result<Self, Error> {
180 let layers = layers_from_handles(handles).await?;
181 let max_layer_count = layers.len() as u64 + 1;
182 let counters = TreeCounters::default();
183 counters.compaction.lock().unwrap().max_layer_count = max_layer_count;
184 Ok(LSMTree {
185 data: RwLock::new(Inner {
186 mutable_layer: Self::new_mutable_layer(),
187 layers,
188 mutation_callback: None,
189 }),
190 merge_fn,
191 cache,
192 counters: Arc::new(counters),
193 })
194 }
195
196 pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
198 let mut data = self.data.write();
199 data.layers = layers;
200 let layer_count = data.layers.len() + 1;
201 let mut counters = self.counters.compaction.lock().unwrap();
202 counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
203 }
204
205 pub async fn append_layers(
208 &self,
209 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
210 ) -> Result<(), Error> {
211 let mut layers = layers_from_handles(handles).await?;
212 let mut data = self.data.write();
213 data.layers.append(&mut layers);
214 let layer_count = data.layers.len() + 1;
215 let mut counters = self.counters.compaction.lock().unwrap();
216 counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
217 Ok(())
218 }
219
220 pub fn reset_immutable_layers(&self) {
222 self.data.write().layers = Vec::new();
223 }
224
225 pub fn seal(&self) {
227 let mut data = self.data.write();
230 let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
231 data.layers.insert(0, layer);
232 let layer_count = data.layers.len() + 1;
233 let mut counters = self.counters.compaction.lock().unwrap();
234 counters.max_layer_count = std::cmp::max(counters.max_layer_count, layer_count as u64);
235 counters.total_layers_added += 1;
236 }
237
238 pub fn reset(&self) {
240 let mut data = self.data.write();
241 data.layers = Vec::new();
242 data.mutable_layer = Self::new_mutable_layer();
243 }
244
245 pub fn report_compaction_metrics(
246 &self,
247 bytes_written: u64,
248 duration: std::time::Duration,
249 layer_count: usize,
250 ) {
251 let mut counters = self.counters.compaction.lock().unwrap();
252 counters.compactions += 1;
253 counters.compaction_bytes_written += bytes_written;
254 counters.compaction_time_ns += duration.as_nanos() as u64;
255
256 let bucket = if bytes_written == 0 {
257 0
258 } else {
259 std::cmp::min(LOG2_HISTOGRAM_BUCKETS - 1, 63 - bytes_written.leading_zeros() as usize)
260 };
261 counters.layer_size_histogram[bucket] += 1;
262
263 crate::metrics::lsm_tree_metrics().compaction_layer_stack_depth.insert(layer_count as u64);
264 }
265
    /// Returns the cumulative number of bytes written by compactions of this tree.
    pub fn compaction_bytes_written(&self) -> u64 {
        self.counters.compaction.lock().unwrap().compaction_bytes_written
    }
269
    /// Returns a `LayerSet` containing no layers, but sharing this tree's merge function and
    /// counters.
    pub fn empty_layer_set(&self) -> LayerSet<K, V> {
        LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
    }
274
275 pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
277 let data = self.data.read();
278 layer_set.layers.reserve_exact(data.layers.len() + 1);
279 layer_set
280 .layers
281 .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
282 for layer in &data.layers {
283 layer_set.layers.push(layer.clone().into());
284 }
285 }
286
    /// Returns a `LayerSet` containing every layer in the tree, mutable layer first.
    pub fn layer_set(&self) -> LayerSet<K, V> {
        let mut layer_set = self.empty_layer_set();
        self.add_all_layers_to_layer_set(&mut layer_set);
        layer_set
    }
294
295 pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
299 let data = self.data.read();
300 let mut layers = Vec::with_capacity(data.layers.len());
301 for layer in &data.layers {
302 layers.push(layer.clone().into());
303 }
304 LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
305 }
306
307 pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
310 let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().insert);
311
312 let key = item.key.clone();
313 let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
314 {
315 let data = self.data.read();
317 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
318 mutation_callback(Operation::Insert, &item);
319 }
320 data.mutable_layer.insert(item)?;
321 }
322 self.cache.invalidate(key, val);
323 Ok(())
324 }
325
326 pub fn replace_or_insert(&self, item: Item<K, V>) {
328 let _measure =
329 DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().replace_or_insert);
330
331 let key = item.key.clone();
332 let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
333 {
334 let data = self.data.read();
336 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
337 mutation_callback(Operation::ReplaceOrInsert, &item);
338 }
339 data.mutable_layer.replace_or_insert(item);
340 }
341 self.cache.invalidate(key, val);
342 }
343
344 pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
346 let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().merge_into);
347
348 let key = item.key.clone();
349 {
350 let data = self.data.read();
352 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
353 mutation_callback(Operation::MergeInto, &item);
354 }
355 data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
356 }
357 self.cache.invalidate(key, None);
358 }
359
    /// Looks up the item whose key is exactly `search_key`, consulting the cache before the
    /// layers.  Returns `Ok(None)` if the key is absent or its value is the tombstone
    /// (`DELETED_MARKER`).
    pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
    where
        K: Eq,
    {
        let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().find);
        // A cache hit resolves immediately; a cached tombstone means "known absent".
        let token = match self.cache.lookup_or_reserve(search_key) {
            ObjectCacheResult::Value(value) => {
                if value == V::DELETED_MARKER {
                    return Ok(None);
                } else {
                    return Ok(Some(Item::new(search_key.clone(), value)));
                }
            }
            ObjectCacheResult::Placeholder(token) => Some(token),
            ObjectCacheResult::NoCache => None,
        };
        let layer_set = self.layer_set();
        let mut merger = layer_set.merger();

        // On a successful (non-tombstone) hit, complete the placeholder token so the value is
        // cached; on a miss the token is simply dropped, releasing the reservation.
        Ok(match merger.query(Query::Point(search_key)).await?.get() {
            Some(ItemRef { key, value, sequence })
                if key == search_key && *value != V::DELETED_MARKER =>
            {
                if let Some(token) = token {
                    token.complete(Some(value));
                }
                Some(Item { key: key.clone(), value: value.clone(), sequence })
            }
            _ => None,
        })
    }
396
    /// Returns the current mutable layer.
    pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
        self.data.read().mutable_layer.clone()
    }
400
    /// Installs (or clears, by passing `None`) the callback invoked on every mutation; see
    /// [`MutationCallback`].
    pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
        self.data.write().mutation_callback = mutation_callback;
    }
407
408 pub fn get_earliest_version(&self) -> Version {
410 let mut earliest_version = LATEST_VERSION;
411 for layer in self.layer_set().layers {
412 let layer_version = layer.get_version();
413 if layer_version < earliest_version {
414 earliest_version = layer_version;
415 }
416 }
417 return earliest_version;
418 }
419
    /// Creates a fresh, empty mutable layer sized for `SKIP_LIST_LAYER_ITEMS` items.
    pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
        SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
    }
424
    /// Replaces the mutable layer wholesale; the tree's reference to the previous mutable layer
    /// is dropped (its contents are not sealed — callers are responsible for that).
    pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
        self.data.write().mutable_layer = layer;
    }
429
    /// Records this tree's layers and counters into the inspect node `root`.
    pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
        let layer_set = self.layer_set();
        root.record_child("layers", move |node| {
            // One numbered child per layer, newest first.
            let mut index = 0;
            for layer in layer_set.layers {
                node.record_child(format!("{index}"), move |node| {
                    layer.1.record_inspect_data(node)
                });
                index += 1;
            }
        });
        {
            let counters = self.counters.compaction.lock().unwrap();
            root.record_uint("num_seeks", self.counters.num_seeks.load(Ordering::Relaxed) as u64);
            // Percentage of layer files skipped by the bloom filter.  Rounded up (div_ceil), so
            // any non-zero skip count reports at least 1%.
            root.record_uint("bloom_filter_success_percent", {
                let layer_files_total = self.counters.layer_files_total.load(Ordering::Relaxed);
                let layer_files_skipped = self.counters.layer_files_skipped.load(Ordering::Relaxed);
                if layer_files_total == 0 {
                    0
                } else {
                    (layer_files_skipped * 100).div_ceil(layer_files_total) as u64
                }
            });
            root.record_uint("compactions", counters.compactions);
            root.record_uint("compaction_bytes_written", counters.compaction_bytes_written);
            root.record_uint("compaction_time_ns", counters.compaction_time_ns);
            root.record_uint("total_layers_added", counters.total_layers_added);
            root.record_uint("max_layer_count", counters.max_layer_count);

            // Replay the log2 histogram into an inspect exponential histogram: bucket i maps to
            // the value 2^i.
            let layer_sizes = root.create_uint_exponential_histogram(
                "layer_size_histogram_log2",
                fuchsia_inspect::ExponentialHistogramParams {
                    floor: 1,
                    initial_step: 1,
                    step_multiplier: 2,
                    buckets: LOG2_HISTOGRAM_BUCKETS,
                },
            );
            for (i, count) in counters.layer_size_histogram.iter().enumerate() {
                layer_sizes.insert_multiple(1u64 << i, *count as usize);
            }
            root.record(layer_sizes);
        }
    }
475}
476
477pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);
480
481impl<K, V> LockedLayer<K, V> {
482 pub async fn close_layer(self) {
483 let layer = self.1;
484 std::mem::drop(self.0);
485 layer.close().await;
486 }
487}
488
489impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
490 fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
491 let event = layer.lock().unwrap();
492 Self(event, layer)
493 }
494}
495
/// Derefs to the underlying layer so a `LockedLayer` can be used anywhere a layer is expected.
impl<K, V> std::ops::Deref for LockedLayer<K, V> {
    type Target = Arc<dyn Layer<K, V>>;

    fn deref(&self) -> &Self::Target {
        &self.1
    }
}
503
/// Borrows the underlying layer as a trait object (used by `LayerSet::merger`).
impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
    fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
        self.1.as_ref()
    }
}
509
/// A point-in-time collection of layers, each held via [`LockedLayer`], that can be queried and
/// merged together.
pub struct LayerSet<K, V> {
    /// The layers, newest first (the mutable layer comes first when present).
    pub layers: Vec<LockedLayer<K, V>>,
    /// The owning tree's merge function.
    merge_fn: merge::MergeFn<K, V>,
    /// The owning tree's counters, shared with mergers created from this set.
    counters: Arc<TreeCounters>,
}
517
518impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
519 pub fn sum_len(&self) -> usize {
520 let mut size = 0;
521 for layer in &self.layers {
522 size += layer.len()
523 }
524 size
525 }
526
527 pub fn merger(&self) -> merge::Merger<'_, K, V> {
528 merge::Merger::new(
529 self.layers.iter().map(|x| x.as_ref()),
530 self.merge_fn,
531 self.counters.clone(),
532 )
533 }
534
535 pub async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
537 for l in &self.layers {
538 match l.key_exists(key).await? {
539 e @ (Existence::Exists | Existence::MaybeExists) => return Ok(e),
540 _ => {}
541 }
542 }
543 Ok(Existence::Missing)
544 }
545}
546
/// Formats the set as a list of layer identifiers: the backing handle's object id when the
/// layer has one, otherwise the layer's pointer value.
impl<K, V> fmt::Debug for LayerSet<K, V> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_list()
            .entries(self.layers.iter().map(|l| {
                if let Some(handle) = l.handle() {
                    format!("{}", handle.object_id())
                } else {
                    format!("{:?}", Arc::as_ptr(l))
                }
            }))
            .finish()
    }
}
560
/// Cooperative-yield hook used by [`compact_with_iterator`] to yield to the executor between
/// items during long compactions.
pub trait Yielder: Send {
    /// Completes when the caller may proceed with the next item.
    fn yield_now(&mut self) -> impl Future<Output = ()> + Send;
}
565
566#[cfg(test)]
567mod tests {
568 use super::{LSMTree, Yielder, compact_with_iterator};
569 use crate::drop_event::DropEvent;
570 use crate::lsm_tree::cache::{
571 NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
572 };
573 use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
574 use crate::lsm_tree::types::{
575 BoxedLayerIterator, Existence, FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator,
576 LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
577 };
578 use crate::lsm_tree::{Query, layers_from_handles};
579 use crate::object_handle::ObjectHandle;
580 use crate::serialized_types::{
581 LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
582 };
583 use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
584 use crate::testing::writer::Writer;
585 use anyhow::{Error, anyhow};
586 use async_trait::async_trait;
587 use fprint::TypeFingerprint;
588 use fuchsia_sync::Mutex;
589 use fxfs_macros::FuzzyHash;
590 use rand::rng;
591 use rand::seq::SliceRandom;
592 use std::hash::Hash;
593 use std::sync::Arc;
594
    // A `u64` range used as the ordered key for every test in this module.
    #[derive(
        Clone,
        Eq,
        PartialEq,
        Debug,
        Hash,
        FuzzyHash,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    // Keys sort primarily by the start of the range.
    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl LayerKey for TestKey {}

    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    // Merge function that never actually merges: it always emits the left item unchanged.
    fn emit_left_merge_fn(
        _left: &MergeLayerIterator<'_, TestKey, u64>,
        _right: &MergeLayerIterator<'_, TestKey, u64>,
    ) -> MergeResult<TestKey, u64> {
        MergeResult::EmitLeft
    }

    // Zero doubles as the tombstone value in these tests.
    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    // Yielder that returns immediately, for tests that don't exercise cooperative yielding.
    struct NoOpYielder;
    impl Yielder for NoOpYielder {
        async fn yield_now(&mut self) {}
    }
646
    // A full scan over the mutable layer yields items in key order and then terminates.
    #[fuchsia::test]
    async fn test_iteration() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        let layers = tree.layer_set();
        let mut merger = layers.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }
664
    // Seals two mutable layers, compacts the sealed layers into one persistent layer, then
    // re-opens the tree from that layer and verifies all items survived.
    #[fuchsia::test]
    async fn test_compact() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");
        tree.seal();
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.query(Query::FullScan).await.expect("create merger");
            compact_with_iterator(
                iter,
                items.len(),
                Writer::new(&handle).await,
                handle.block_size(),
                Option::<NoOpYielder>::None,
            )
            .await
            .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        let handle = FakeObjectHandle::new(object.clone());
        let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
            .await
            .expect("open failed");

        let layers = tree.layer_set();
        let mut merger = layers.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        for i in 1..5 {
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!((key, value), (&TestKey(i..i), &i));
            iter.advance().await.expect("advance failed");
        }
        assert!(iter.get().is_none());
    }
712
    // `find` locates items whether they live in a sealed layer or the mutable layer, and
    // returns `None` for absent keys.
    #[fuchsia::test]
    async fn test_find() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
        assert_eq!(item, items[1]);
        assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
    }
732
    // An item whose value is the tombstone (`DELETED_MARKER`) is reported as absent by `find`.
    #[fuchsia::test]
    async fn test_find_no_return_deleted_values() {
        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");

        let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
        assert_eq!(item, items[0]);
        assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
    }
744
    // Sealing an empty mutable layer and compacting the (empty) immutable set must not disturb
    // items inserted afterwards.
    #[fuchsia::test]
    async fn test_empty_seal() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.seal();
        let item = Item::new(TestKey(1..1), 1);
        tree.insert(item.clone()).expect("insert error");
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.query(Query::FullScan).await.expect("create merger");
            compact_with_iterator(
                iter,
                0,
                Writer::new(&handle).await,
                handle.block_size(),
                Option::<NoOpYielder>::None,
            )
            .await
            .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
        assert_eq!(found_item, item);
        assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
    }
772
    // A filtered full scan only yields the items matching the predicate (even keys here).
    #[fuchsia::test]
    async fn test_filter() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let layers = tree.layer_set();
        let mut merger = layers.merger();

        let mut iter = merger
            .query(Query::FullScan)
            .await
            .expect("seek failed")
            .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
            .await
            .expect("filter failed");

        assert_eq!(iter.get(), Some(items[1].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert_eq!(iter.get(), Some(items[3].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }
805
    // Iteration order is determined by the keys, not by the order in which items were inserted.
    #[fuchsia::test]
    async fn test_insert_order_agnostic() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
            Item::new(TestKey(5..5), 5),
            Item::new(TestKey(6..6), 6),
        ];
        let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for item in &items {
            a.insert(item.clone()).expect("insert error");
        }
        let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let mut shuffled = items.clone();
        shuffled.shuffle(&mut rng());
        for item in &shuffled {
            b.insert(item.clone()).expect("insert error");
        }
        let layers = a.layer_set();
        let mut merger = layers.merger();
        let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
        let layers = b.layer_set();
        let mut merger = layers.merger();
        let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");

        for item in items {
            assert_eq!(Some(item.as_item_ref()), iter_a.get());
            assert_eq!(Some(item.as_item_ref()), iter_b.get());
            iter_a.advance().await.expect("advance failed");
            iter_b.advance().await.expect("advance failed");
        }
        assert!(iter_a.get().is_none());
        assert!(iter_b.get().is_none());
    }
842
843 struct AuditCacheInner<'a, V: Value> {
844 lookups: u64,
845 completions: u64,
846 invalidations: u64,
847 drops: u64,
848 result: Option<ObjectCacheResult<'a, V>>,
849 }
850
851 impl<V: Value> AuditCacheInner<'_, V> {
852 fn stats(&self) -> (u64, u64, u64, u64) {
853 (self.lookups, self.completions, self.invalidations, self.drops)
854 }
855 }
856
857 struct AuditCache<'a, V: Value> {
858 inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
859 }
860
861 impl<V: Value> AuditCache<'_, V> {
862 fn new() -> Self {
863 Self {
864 inner: Arc::new(Mutex::new(AuditCacheInner {
865 lookups: 0,
866 completions: 0,
867 invalidations: 0,
868 drops: 0,
869 result: None,
870 })),
871 }
872 }
873 }
874
875 struct AuditPlaceholder<'a, V: Value> {
876 inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
877 completed: Mutex<bool>,
878 }
879
880 impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
881 fn complete(self: Box<Self>, _: Option<&V>) {
882 self.inner.lock().completions += 1;
883 *self.completed.lock() = true;
884 }
885 }
886
887 impl<V: Value> Drop for AuditPlaceholder<'_, V> {
888 fn drop(&mut self) {
889 if !*self.completed.lock() {
890 self.inner.lock().drops += 1;
891 }
892 }
893 }
894
895 impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
896 fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
897 {
898 let mut inner = self.inner.lock();
899 inner.lookups += 1;
900 if inner.result.is_some() {
901 return std::mem::take(&mut inner.result).unwrap();
902 }
903 }
904 ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
905 inner: self.inner.clone(),
906 completed: Mutex::new(false),
907 }))
908 }
909
910 fn invalidate(&self, _key: K, _value: Option<V>) {
911 self.inner.lock().invalidations += 1;
912 }
913 }
914
    // `find` reserves and drops/completes placeholders, and mutations invalidate, in the
    // expected sequence.  Stats tuples are (lookups, completions, invalidations, drops).
    #[fuchsia::test]
    async fn test_cache_handling() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        assert!(a.find(&item.key).await.expect("Failed find").is_none());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));

        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (1, 0, 1, 1));

        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (2, 1, 1, 1));

        a.replace_or_insert(item.clone());
        assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
    }
944
    // A cached value short-circuits `find`: no placeholder is created or completed.
    #[fuchsia::test]
    async fn test_cache_hit() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));

        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }
969
    // When the cache answers `NoCache`, `find` falls through to the layers without creating or
    // completing a placeholder.
    #[fuchsia::test]
    async fn test_cache_says_uncacheable() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        let _ = a.insert(item.clone());

        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        inner.lock().result = Some(ObjectCacheResult::NoCache);

        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }
991
992 struct FailLayer {
993 drop_event: Mutex<Option<Arc<DropEvent>>>,
994 }
995
996 impl FailLayer {
997 fn new() -> Self {
998 Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
999 }
1000 }
1001
1002 #[async_trait]
1003 impl<K: Key, V: Value> Layer<K, V> for FailLayer {
1004 async fn seek(
1005 &self,
1006 _bound: std::ops::Bound<&K>,
1007 ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
1008 Err(anyhow!("Purposely failed seek"))
1009 }
1010
1011 fn lock(&self) -> Option<Arc<DropEvent>> {
1012 self.drop_event.lock().clone()
1013 }
1014
1015 fn len(&self) -> usize {
1016 0
1017 }
1018
1019 async fn close(&self) {
1020 let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
1021 Some(drop_event) => drop_event.listen(),
1022 None => return,
1023 };
1024 listener.await;
1025 }
1026
1027 fn get_version(&self) -> Version {
1028 LATEST_VERSION
1029 }
1030
1031 async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
1032 unimplemented!();
1033 }
1034 }
1035
1036 struct MockLayer {
1037 exists_result: Existence,
1038 drop_event: Mutex<Option<Arc<DropEvent>>>,
1039 }
1040
1041 impl MockLayer {
1042 fn new(exists_result: Existence) -> Self {
1043 Self { exists_result, drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
1044 }
1045 }
1046
1047 #[async_trait]
1048 impl<K: Key, V: Value> Layer<K, V> for MockLayer {
1049 async fn seek(
1050 &self,
1051 _bound: std::ops::Bound<&K>,
1052 ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
1053 unimplemented!()
1054 }
1055
1056 fn lock(&self) -> Option<Arc<DropEvent>> {
1057 self.drop_event.lock().clone()
1058 }
1059
1060 fn len(&self) -> usize {
1061 0
1062 }
1063
1064 async fn close(&self) {
1065 let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
1066 Some(drop_event) => drop_event.listen(),
1067 None => return,
1068 };
1069 listener.await;
1070 }
1071
1072 fn get_version(&self) -> Version {
1073 LATEST_VERSION
1074 }
1075
1076 async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
1077 Ok(self.exists_result)
1078 }
1079 }
1080
    // `LayerSet::key_exists` returns the first non-`Missing` answer in layer order, and
    // `Missing` for an empty set or when every layer says `Missing`.
    #[fuchsia::test]
    async fn test_layer_set_key_exists() {
        use super::LockedLayer;

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let mut layer_set = tree.empty_layer_set();

        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::Missing
        );

        layer_set.layers.push(LockedLayer::from(
            Arc::new(MockLayer::new(Existence::Missing)) as Arc<dyn Layer<TestKey, u64>>
        ));
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::Missing
        );

        layer_set.layers.push(LockedLayer::from(
            Arc::new(MockLayer::new(Existence::MaybeExists)) as Arc<dyn Layer<TestKey, u64>>
        ));
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::MaybeExists
        );

        layer_set.layers.insert(
            0,
            LockedLayer::from(
                Arc::new(MockLayer::new(Existence::Exists)) as Arc<dyn Layer<TestKey, u64>>
            ),
        );
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::Exists
        );
    }
1124
    // A failed seek propagates the error out of `find`, and the cache placeholder is dropped
    // (not completed).
    #[fuchsia::test]
    async fn test_failed_lookup() {
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        a.set_layers(vec![Arc::new(FailLayer::new())]);

        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        assert!(a.find(&TestKey(1..1)).await.is_err());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
    }
1139}
1140
1141#[cfg(fuzz)]
1142mod fuzz {
1143 use crate::lsm_tree::types::{
1144 FuzzyHash, Item, LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
1145 };
1146 use crate::serialized_types::{
1147 LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
1148 };
1149 use arbitrary::Arbitrary;
1150 use fprint::TypeFingerprint;
1151 use fuzz::fuzz;
1152 use fxfs_macros::FuzzyHash;
1153 use std::hash::Hash;
1154
    // Key/value types mirroring those in the test module, re-declared here because this module
    // is compiled under `cfg(fuzz)` only; `Arbitrary` lets the fuzzer generate keys directly.
    #[derive(
        Arbitrary,
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    impl LayerKey for TestKey {}

    // Keys sort primarily by the start of the range.
    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    // Zero doubles as the tombstone value for the fuzzer.
    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }
1198
    // One randomly-generated action applied to the tree by the fuzz target below.
    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        Insert(Item<TestKey, u64>),
        ReplaceOrInsert(Item<TestKey, u64>),
        MergeInto(Item<TestKey, u64>, TestKey),
        Find(TestKey),
        Seal,
    }
1211
    // Applies an arbitrary sequence of tree operations; the fuzzer checks for panics/crashes
    // rather than specific results (insert errors are deliberately ignored).
    #[fuzz]
    fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
        use super::LSMTree;
        use super::cache::NullCache;
        use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
        use futures::executor::block_on;

        fn emit_left_merge_fn(
            _left: &MergeLayerIterator<'_, TestKey, u64>,
            _right: &MergeLayerIterator<'_, TestKey, u64>,
        ) -> MergeResult<TestKey, u64> {
            MergeResult::EmitLeft
        }

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for action in actions {
            match action {
                FuzzAction::Insert(item) => {
                    let _ = tree.insert(item);
                }
                FuzzAction::ReplaceOrInsert(item) => {
                    tree.replace_or_insert(item);
                }
                FuzzAction::Find(key) => {
                    block_on(tree.find(&key)).expect("find failed");
                }
                FuzzAction::MergeInto(item, bound) => tree.merge_into(item, &bound),
                FuzzAction::Seal => tree.seal(),
            };
        }
    }
1243}