1mod bloom_filter;
6pub mod cache;
7pub mod merge;
8pub mod persistent_layer;
9pub mod skip_list_layer;
10pub mod types;
11
12use crate::drop_event::DropEvent;
13use crate::log::*;
14use crate::metrics::DurationMeasureScope;
15use crate::object_handle::{ReadObjectHandle, WriteBytes};
16use crate::serialized_types::{LATEST_VERSION, Version};
17
18use anyhow::Error;
19use cache::{ObjectCache, ObjectCacheResult};
20
21use fuchsia_sync::{Mutex, RwLock};
22use persistent_layer::{PersistentLayer, PersistentLayerWriter};
23use skip_list_layer::SkipListLayer;
24use std::fmt;
25use std::sync::Arc;
26use types::{
27 Existence, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey,
28 OrdLowerBound, Value,
29};
30
31pub use merge::Query;
32
// Sizing argument handed to `SkipListLayer::new` for every mutable layer the tree creates (see
// `LSMTree::new_mutable_layer`).
// NOTE(review): presumably an expected item count for the skip list; confirm against
// `SkipListLayer::new`.
const SKIP_LIST_LAYER_ITEMS: usize = 512;
34
35pub use persistent_layer::{
37 LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
38 LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
39};
40
41pub async fn layers_from_handles<K: Key, V: Value>(
42 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
43) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
44 let mut layers = Vec::new();
45 for handle in handles {
46 layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
47 }
48 Ok(layers)
49}
50
/// The kind of mutation being applied to the tree's mutable layer.
///
/// Passed to the [`MutationCallback`] together with the item being written.
#[derive(Eq, PartialEq, Debug)]
pub enum Operation {
    /// Reported by `LSMTree::insert`.
    Insert,
    /// Reported by `LSMTree::replace_or_insert`.
    ReplaceOrInsert,
    /// Reported by `LSMTree::merge_into`.
    MergeInto,
}
57
/// Optional hook that `LSMTree` calls with the operation kind and the item just before each
/// mutation is applied to the mutable layer. The call is made while the tree's read lock on its
/// layer data is held.
pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;
59
/// The lock-protected state of an `LSMTree`.
struct Inner<K, V> {
    // The layer that `insert`, `replace_or_insert` and `merge_into` write to.
    mutable_layer: Arc<SkipListLayer<K, V>>,
    // All other layers. `seal` pushes the previous mutable layer in at index 0, and layer sets
    // list these after the mutable layer.
    layers: Vec<Arc<dyn Layer<K, V>>>,
    // Hook invoked before each mutation; `None` until `set_mutation_callback` installs one.
    mutation_callback: MutationCallback<K, V>,
}
65
/// Statistics shared between a tree and the layer sets / mergers created from it, and reported by
/// `LSMTree::record_inspect_data`.
///
/// NOTE(review): nothing in this file increments these; presumably the merger does. Confirm in
/// `merge`.
#[derive(Default)]
pub(super) struct Counters {
    // Reported as "num_seeks".
    num_seeks: usize,
    // Denominator of the reported "bloom_filter_success_percent".
    layer_files_total: usize,
    // Numerator of the reported "bloom_filter_success_percent".
    layer_files_skipped: usize,
}
75
/// A tree made of one mutable in-memory layer plus an ordered list of further layers, fronted by
/// an object cache for point lookups.
pub struct LSMTree<K, V> {
    // Layer state and mutation callback. Mutations take the read lock; operations that swap or
    // replace layers take the write lock.
    data: RwLock<Inner<K, V>>,
    // Handed to `SkipListLayer::merge_into` and to every `LayerSet` created from this tree.
    merge_fn: merge::MergeFn<K, V>,
    // Consulted by `find` and invalidated by every mutation.
    cache: Box<dyn ObjectCache<K, V>>,
    // Shared with every `LayerSet` created from this tree.
    counters: Arc<Mutex<Counters>>,
}
85
86#[fxfs_trace::trace]
87impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
88 pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
90 LSMTree {
91 data: RwLock::new(Inner {
92 mutable_layer: Self::new_mutable_layer(),
93 layers: Vec::new(),
94 mutation_callback: None,
95 }),
96 merge_fn,
97 cache,
98 counters: Arc::new(Mutex::new(Default::default())),
99 }
100 }
101
102 pub async fn open(
104 merge_fn: merge::MergeFn<K, V>,
105 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
106 cache: Box<dyn ObjectCache<K, V>>,
107 ) -> Result<Self, Error> {
108 Ok(LSMTree {
109 data: RwLock::new(Inner {
110 mutable_layer: Self::new_mutable_layer(),
111 layers: layers_from_handles(handles).await?,
112 mutation_callback: None,
113 }),
114 merge_fn,
115 cache,
116 counters: Arc::new(Mutex::new(Default::default())),
117 })
118 }
119
120 pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
122 self.data.write().layers = layers;
123 }
124
125 pub async fn append_layers(
128 &self,
129 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
130 ) -> Result<(), Error> {
131 let mut layers = layers_from_handles(handles).await?;
132 self.data.write().layers.append(&mut layers);
133 Ok(())
134 }
135
136 pub fn reset_immutable_layers(&self) {
138 self.data.write().layers = Vec::new();
139 }
140
141 pub fn seal(&self) {
143 let mut data = self.data.write();
146 let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
147 data.layers.insert(0, layer);
148 }
149
150 pub fn reset(&self) {
152 let mut data = self.data.write();
153 data.layers = Vec::new();
154 data.mutable_layer = Self::new_mutable_layer();
155 }
156
157 #[trace]
159 pub async fn compact_with_iterator<W: WriteBytes + Send>(
160 &self,
161 mut iterator: impl LayerIterator<K, V>,
162 num_items: usize,
163 writer: W,
164 block_size: u64,
165 ) -> Result<(), Error> {
166 let mut writer =
167 PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
168 while let Some(item_ref) = iterator.get() {
169 debug!(item_ref:?; "compact: writing");
170 writer.write(item_ref).await?;
171 iterator.advance().await?;
172 }
173 writer.flush().await
174 }
175
176 pub fn empty_layer_set(&self) -> LayerSet<K, V> {
178 LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
179 }
180
181 pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
183 let data = self.data.read();
184 layer_set.layers.reserve_exact(data.layers.len() + 1);
185 layer_set
186 .layers
187 .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
188 for layer in &data.layers {
189 layer_set.layers.push(layer.clone().into());
190 }
191 }
192
193 pub fn layer_set(&self) -> LayerSet<K, V> {
196 let mut layer_set = self.empty_layer_set();
197 self.add_all_layers_to_layer_set(&mut layer_set);
198 layer_set
199 }
200
201 pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
205 let data = self.data.read();
206 let mut layers = Vec::with_capacity(data.layers.len());
207 for layer in &data.layers {
208 layers.push(layer.clone().into());
209 }
210 LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
211 }
212
213 pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
216 let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().insert);
217
218 let key = item.key.clone();
219 let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
220 {
221 let data = self.data.read();
223 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
224 mutation_callback(Operation::Insert, &item);
225 }
226 data.mutable_layer.insert(item)?;
227 }
228 self.cache.invalidate(key, val);
229 Ok(())
230 }
231
232 pub fn replace_or_insert(&self, item: Item<K, V>) {
234 let _measure =
235 DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().replace_or_insert);
236
237 let key = item.key.clone();
238 let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
239 {
240 let data = self.data.read();
242 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
243 mutation_callback(Operation::ReplaceOrInsert, &item);
244 }
245 data.mutable_layer.replace_or_insert(item);
246 }
247 self.cache.invalidate(key, val);
248 }
249
250 pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
252 let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().merge_into);
253
254 let key = item.key.clone();
255 {
256 let data = self.data.read();
258 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
259 mutation_callback(Operation::MergeInto, &item);
260 }
261 data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
262 }
263 self.cache.invalidate(key, None);
264 }
265
266 pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
269 where
270 K: Eq,
271 {
272 let _measure = DurationMeasureScope::new(&crate::metrics::lsm_tree_metrics().find);
273 let token = match self.cache.lookup_or_reserve(search_key) {
277 ObjectCacheResult::Value(value) => {
278 if value == V::DELETED_MARKER {
279 return Ok(None);
280 } else {
281 return Ok(Some(Item::new(search_key.clone(), value)));
282 }
283 }
284 ObjectCacheResult::Placeholder(token) => Some(token),
285 ObjectCacheResult::NoCache => None,
286 };
287 let layer_set = self.layer_set();
288 let mut merger = layer_set.merger();
289
290 Ok(match merger.query(Query::Point(search_key)).await?.get() {
291 Some(ItemRef { key, value, sequence })
292 if key == search_key && *value != V::DELETED_MARKER =>
293 {
294 if let Some(token) = token {
295 token.complete(Some(value));
296 }
297 Some(Item { key: key.clone(), value: value.clone(), sequence })
298 }
299 _ => None,
300 })
301 }
302
303 pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
304 self.data.read().mutable_layer.clone()
305 }
306
307 pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
311 self.data.write().mutation_callback = mutation_callback;
312 }
313
314 pub fn get_earliest_version(&self) -> Version {
316 let mut earliest_version = LATEST_VERSION;
317 for layer in self.layer_set().layers {
318 let layer_version = layer.get_version();
319 if layer_version < earliest_version {
320 earliest_version = layer_version;
321 }
322 }
323 return earliest_version;
324 }
325
    /// Creates a new, empty skip-list layer sized with `SKIP_LIST_LAYER_ITEMS`, suitable for use
    /// as a tree's mutable layer.
    pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
        SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
    }
330
331 pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
333 self.data.write().mutable_layer = layer;
334 }
335
336 pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
338 let layer_set = self.layer_set();
339 root.record_child("layers", move |node| {
340 let mut index = 0;
341 for layer in layer_set.layers {
342 node.record_child(format!("{index}"), move |node| {
343 layer.1.record_inspect_data(node)
344 });
345 index += 1;
346 }
347 });
348 {
349 let counters = self.counters.lock();
350 root.record_uint("num_seeks", counters.num_seeks as u64);
351 root.record_uint(
352 "bloom_filter_success_percent",
353 if counters.layer_files_total == 0 {
354 0
355 } else {
356 (counters.layer_files_skipped * 100).div_ceil(counters.layer_files_total) as u64
357 },
358 );
359 }
360 }
361}
362
/// A layer paired with the `DropEvent` handle obtained from that layer's `Layer::lock`.
///
/// Field 0 is the lock handle and field 1 is the layer.
/// NOTE(review): holding the handle presumably delays completion of `Layer::close`; the test
/// layers in this file behave that way. Confirm for the real layer types.
pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);
366
367impl<K, V> LockedLayer<K, V> {
368 pub async fn close_layer(self) {
369 let layer = self.1;
370 std::mem::drop(self.0);
371 layer.close().await;
372 }
373}
374
375impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
376 fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
377 let event = layer.lock().unwrap();
378 Self(event, layer)
379 }
380}
381
382impl<K, V> std::ops::Deref for LockedLayer<K, V> {
383 type Target = Arc<dyn Layer<K, V>>;
384
385 fn deref(&self) -> &Self::Target {
386 &self.1
387 }
388}
389
390impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
391 fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
392 self.1.as_ref()
393 }
394}
395
/// A set of locked layers captured from an `LSMTree`, together with what is needed to merge them.
pub struct LayerSet<K, V> {
    /// The layers in query order. Sets built by `LSMTree::layer_set` put the mutable layer
    /// first.
    pub layers: Vec<LockedLayer<K, V>>,
    // Merge function copied from the tree; handed to every merger created from this set.
    merge_fn: merge::MergeFn<K, V>,
    // Counters shared with the tree; handed to every merger created from this set.
    counters: Arc<Mutex<Counters>>,
}
403
404impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
405 pub fn sum_len(&self) -> usize {
406 let mut size = 0;
407 for layer in &self.layers {
408 size += layer.len()
409 }
410 size
411 }
412
413 pub fn merger(&self) -> merge::Merger<'_, K, V> {
414 merge::Merger::new(
415 self.layers.iter().map(|x| x.as_ref()),
416 self.merge_fn,
417 self.counters.clone(),
418 )
419 }
420
421 pub async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
423 for l in &self.layers {
424 match l.key_exists(key).await? {
425 e @ (Existence::Exists | Existence::MaybeExists) => return Ok(e),
426 _ => {}
427 }
428 }
429 Ok(Existence::Missing)
430 }
431}
432
433impl<K, V> fmt::Debug for LayerSet<K, V> {
434 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
435 fmt.debug_list()
436 .entries(self.layers.iter().map(|l| {
437 if let Some(handle) = l.handle() {
438 format!("{}", handle.object_id())
439 } else {
440 format!("{:?}", Arc::as_ptr(l))
441 }
442 }))
443 .finish()
444 }
445}
446
447#[cfg(test)]
448mod tests {
449 use super::LSMTree;
450 use crate::drop_event::DropEvent;
451 use crate::lsm_tree::cache::{
452 NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
453 };
454 use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
455 use crate::lsm_tree::types::{
456 BoxedLayerIterator, Existence, FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator,
457 LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
458 };
459 use crate::lsm_tree::{Query, layers_from_handles};
460 use crate::object_handle::ObjectHandle;
461 use crate::serialized_types::{
462 LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
463 };
464 use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
465 use crate::testing::writer::Writer;
466 use anyhow::{Error, anyhow};
467 use async_trait::async_trait;
468 use fprint::TypeFingerprint;
469 use fuchsia_sync::Mutex;
470 use fxfs_macros::FuzzyHash;
471 use rand::rng;
472 use rand::seq::SliceRandom;
473 use std::hash::Hash;
474 use std::sync::Arc;
475
476 #[derive(
477 Clone,
478 Eq,
479 PartialEq,
480 Debug,
481 Hash,
482 FuzzyHash,
483 serde::Serialize,
484 serde::Deserialize,
485 TypeFingerprint,
486 Versioned,
487 )]
488 struct TestKey(std::ops::Range<u64>);
489
490 versioned_type! { 1.. => TestKey }
491
492 impl SortByU64 for TestKey {
493 fn get_leading_u64(&self) -> u64 {
494 self.0.start
495 }
496 }
497
498 impl LayerKey for TestKey {}
499
500 impl OrdUpperBound for TestKey {
501 fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
502 self.0.end.cmp(&other.0.end)
503 }
504 }
505
506 impl OrdLowerBound for TestKey {
507 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
508 self.0.start.cmp(&other.0.start)
509 }
510 }
511
    /// Merge function used by the tests: ignores both iterators and always answers
    /// `MergeResult::EmitLeft`.
    fn emit_left_merge_fn(
        _left: &MergeLayerIterator<'_, TestKey, u64>,
        _right: &MergeLayerIterator<'_, TestKey, u64>,
    ) -> MergeResult<TestKey, u64> {
        MergeResult::EmitLeft
    }
518
    // Lets the tests use plain `u64` values; 0 plays the role of the deleted marker.
    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }
522
523 #[fuchsia::test]
524 async fn test_iteration() {
525 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
526 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
527 tree.insert(items[0].clone()).expect("insert error");
528 tree.insert(items[1].clone()).expect("insert error");
529 let layers = tree.layer_set();
530 let mut merger = layers.merger();
531 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
532 let ItemRef { key, value, .. } = iter.get().expect("missing item");
533 assert_eq!((key, value), (&items[0].key, &items[0].value));
534 iter.advance().await.expect("advance failed");
535 let ItemRef { key, value, .. } = iter.get().expect("missing item");
536 assert_eq!((key, value), (&items[1].key, &items[1].value));
537 iter.advance().await.expect("advance failed");
538 assert!(iter.get().is_none());
539 }
540
541 #[fuchsia::test]
542 async fn test_compact() {
543 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
544 let items = [
545 Item::new(TestKey(1..1), 1),
546 Item::new(TestKey(2..2), 2),
547 Item::new(TestKey(3..3), 3),
548 Item::new(TestKey(4..4), 4),
549 ];
550 tree.insert(items[0].clone()).expect("insert error");
551 tree.insert(items[1].clone()).expect("insert error");
552 tree.seal();
553 tree.insert(items[2].clone()).expect("insert error");
554 tree.insert(items[3].clone()).expect("insert error");
555 tree.seal();
556 let object = Arc::new(FakeObject::new());
557 let handle = FakeObjectHandle::new(object.clone());
558 {
559 let layer_set = tree.immutable_layer_set();
560 let mut merger = layer_set.merger();
561 let iter = merger.query(Query::FullScan).await.expect("create merger");
562 tree.compact_with_iterator(
563 iter,
564 items.len(),
565 Writer::new(&handle).await,
566 handle.block_size(),
567 )
568 .await
569 .expect("compact failed");
570 }
571 tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
572 let handle = FakeObjectHandle::new(object.clone());
573 let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
574 .await
575 .expect("open failed");
576
577 let layers = tree.layer_set();
578 let mut merger = layers.merger();
579 let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
580 for i in 1..5 {
581 let ItemRef { key, value, .. } = iter.get().expect("missing item");
582 assert_eq!((key, value), (&TestKey(i..i), &i));
583 iter.advance().await.expect("advance failed");
584 }
585 assert!(iter.get().is_none());
586 }
587
588 #[fuchsia::test]
589 async fn test_find() {
590 let items = [
591 Item::new(TestKey(1..1), 1),
592 Item::new(TestKey(2..2), 2),
593 Item::new(TestKey(3..3), 3),
594 Item::new(TestKey(4..4), 4),
595 ];
596 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
597 tree.insert(items[0].clone()).expect("insert error");
598 tree.insert(items[1].clone()).expect("insert error");
599 tree.seal();
600 tree.insert(items[2].clone()).expect("insert error");
601 tree.insert(items[3].clone()).expect("insert error");
602
603 let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
604 assert_eq!(item, items[1]);
605 assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
606 }
607
608 #[fuchsia::test]
609 async fn test_find_no_return_deleted_values() {
610 let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
611 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
612 tree.insert(items[0].clone()).expect("insert error");
613 tree.insert(items[1].clone()).expect("insert error");
614
615 let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
616 assert_eq!(item, items[0]);
617 assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
618 }
619
620 #[fuchsia::test]
621 async fn test_empty_seal() {
622 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
623 tree.seal();
624 let item = Item::new(TestKey(1..1), 1);
625 tree.insert(item.clone()).expect("insert error");
626 let object = Arc::new(FakeObject::new());
627 let handle = FakeObjectHandle::new(object.clone());
628 {
629 let layer_set = tree.immutable_layer_set();
630 let mut merger = layer_set.merger();
631 let iter = merger.query(Query::FullScan).await.expect("create merger");
632 tree.compact_with_iterator(iter, 0, Writer::new(&handle).await, handle.block_size())
633 .await
634 .expect("compact failed");
635 }
636 tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
637 let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
638 assert_eq!(found_item, item);
639 assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
640 }
641
642 #[fuchsia::test]
643 async fn test_filter() {
644 let items = [
645 Item::new(TestKey(1..1), 1),
646 Item::new(TestKey(2..2), 2),
647 Item::new(TestKey(3..3), 3),
648 Item::new(TestKey(4..4), 4),
649 ];
650 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
651 tree.insert(items[0].clone()).expect("insert error");
652 tree.insert(items[1].clone()).expect("insert error");
653 tree.insert(items[2].clone()).expect("insert error");
654 tree.insert(items[3].clone()).expect("insert error");
655
656 let layers = tree.layer_set();
657 let mut merger = layers.merger();
658
659 let mut iter = merger
661 .query(Query::FullScan)
662 .await
663 .expect("seek failed")
664 .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
665 .await
666 .expect("filter failed");
667
668 assert_eq!(iter.get(), Some(items[1].as_item_ref()));
669 iter.advance().await.expect("advance failed");
670 assert_eq!(iter.get(), Some(items[3].as_item_ref()));
671 iter.advance().await.expect("advance failed");
672 assert!(iter.get().is_none());
673 }
674
675 #[fuchsia::test]
676 async fn test_insert_order_agnostic() {
677 let items = [
678 Item::new(TestKey(1..1), 1),
679 Item::new(TestKey(2..2), 2),
680 Item::new(TestKey(3..3), 3),
681 Item::new(TestKey(4..4), 4),
682 Item::new(TestKey(5..5), 5),
683 Item::new(TestKey(6..6), 6),
684 ];
685 let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
686 for item in &items {
687 a.insert(item.clone()).expect("insert error");
688 }
689 let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
690 let mut shuffled = items.clone();
691 shuffled.shuffle(&mut rng());
692 for item in &shuffled {
693 b.insert(item.clone()).expect("insert error");
694 }
695 let layers = a.layer_set();
696 let mut merger = layers.merger();
697 let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
698 let layers = b.layer_set();
699 let mut merger = layers.merger();
700 let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");
701
702 for item in items {
703 assert_eq!(Some(item.as_item_ref()), iter_a.get());
704 assert_eq!(Some(item.as_item_ref()), iter_b.get());
705 iter_a.advance().await.expect("advance failed");
706 iter_b.advance().await.expect("advance failed");
707 }
708 assert!(iter_a.get().is_none());
709 assert!(iter_b.get().is_none());
710 }
711
712 struct AuditCacheInner<'a, V: Value> {
713 lookups: u64,
714 completions: u64,
715 invalidations: u64,
716 drops: u64,
717 result: Option<ObjectCacheResult<'a, V>>,
718 }
719
720 impl<V: Value> AuditCacheInner<'_, V> {
721 fn stats(&self) -> (u64, u64, u64, u64) {
722 (self.lookups, self.completions, self.invalidations, self.drops)
723 }
724 }
725
726 struct AuditCache<'a, V: Value> {
727 inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
728 }
729
730 impl<V: Value> AuditCache<'_, V> {
731 fn new() -> Self {
732 Self {
733 inner: Arc::new(Mutex::new(AuditCacheInner {
734 lookups: 0,
735 completions: 0,
736 invalidations: 0,
737 drops: 0,
738 result: None,
739 })),
740 }
741 }
742 }
743
744 struct AuditPlaceholder<'a, V: Value> {
745 inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
746 completed: Mutex<bool>,
747 }
748
749 impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
750 fn complete(self: Box<Self>, _: Option<&V>) {
751 self.inner.lock().completions += 1;
752 *self.completed.lock() = true;
753 }
754 }
755
756 impl<V: Value> Drop for AuditPlaceholder<'_, V> {
757 fn drop(&mut self) {
758 if !*self.completed.lock() {
759 self.inner.lock().drops += 1;
760 }
761 }
762 }
763
764 impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
765 fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
766 {
767 let mut inner = self.inner.lock();
768 inner.lookups += 1;
769 if inner.result.is_some() {
770 return std::mem::take(&mut inner.result).unwrap();
771 }
772 }
773 ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
774 inner: self.inner.clone(),
775 completed: Mutex::new(false),
776 }))
777 }
778
779 fn invalidate(&self, _key: K, _value: Option<V>) {
780 self.inner.lock().invalidations += 1;
781 }
782 }
783
784 #[fuchsia::test]
785 async fn test_cache_handling() {
786 let item = Item::new(TestKey(1..1), 1);
787 let cache = Box::new(AuditCache::new());
788 let inner = cache.inner.clone();
789 let a = LSMTree::new(emit_left_merge_fn, cache);
790
791 assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
793
794 assert!(a.find(&item.key).await.expect("Failed find").is_none());
796 assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
797
798 let _ = a.insert(item.clone());
800 assert_eq!(inner.lock().stats(), (1, 0, 1, 1));
801
802 assert_eq!(
804 a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
805 item.value
806 );
807 assert_eq!(inner.lock().stats(), (2, 1, 1, 1));
808
809 a.replace_or_insert(item.clone());
811 assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
812 }
813
814 #[fuchsia::test]
815 async fn test_cache_hit() {
816 let item = Item::new(TestKey(1..1), 1);
817 let cache = Box::new(AuditCache::new());
818 let inner = cache.inner.clone();
819 let a = LSMTree::new(emit_left_merge_fn, cache);
820
821 assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
823
824 let _ = a.insert(item.clone());
826 assert_eq!(inner.lock().stats(), (0, 0, 1, 0));
827
828 inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));
830
831 assert_eq!(
833 a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
834 item.value
835 );
836 assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
837 }
838
839 #[fuchsia::test]
840 async fn test_cache_says_uncacheable() {
841 let item = Item::new(TestKey(1..1), 1);
842 let cache = Box::new(AuditCache::new());
843 let inner = cache.inner.clone();
844 let a = LSMTree::new(emit_left_merge_fn, cache);
845 let _ = a.insert(item.clone());
846
847 assert_eq!(inner.lock().stats(), (0, 0, 1, 0));
849
850 inner.lock().result = Some(ObjectCacheResult::NoCache);
852
853 assert_eq!(
855 a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
856 item.value
857 );
858 assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
859 }
860
861 struct FailLayer {
862 drop_event: Mutex<Option<Arc<DropEvent>>>,
863 }
864
865 impl FailLayer {
866 fn new() -> Self {
867 Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
868 }
869 }
870
871 #[async_trait]
872 impl<K: Key, V: Value> Layer<K, V> for FailLayer {
873 async fn seek(
874 &self,
875 _bound: std::ops::Bound<&K>,
876 ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
877 Err(anyhow!("Purposely failed seek"))
878 }
879
880 fn lock(&self) -> Option<Arc<DropEvent>> {
881 self.drop_event.lock().clone()
882 }
883
884 fn len(&self) -> usize {
885 0
886 }
887
888 async fn close(&self) {
889 let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
890 Some(drop_event) => drop_event.listen(),
891 None => return,
892 };
893 listener.await;
894 }
895
896 fn get_version(&self) -> Version {
897 LATEST_VERSION
898 }
899
900 async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
901 unimplemented!();
902 }
903 }
904
905 struct MockLayer {
906 exists_result: Existence,
907 drop_event: Mutex<Option<Arc<DropEvent>>>,
908 }
909
910 impl MockLayer {
911 fn new(exists_result: Existence) -> Self {
912 Self { exists_result, drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
913 }
914 }
915
916 #[async_trait]
917 impl<K: Key, V: Value> Layer<K, V> for MockLayer {
918 async fn seek(
919 &self,
920 _bound: std::ops::Bound<&K>,
921 ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
922 unimplemented!()
923 }
924
925 fn lock(&self) -> Option<Arc<DropEvent>> {
926 self.drop_event.lock().clone()
927 }
928
929 fn len(&self) -> usize {
930 0
931 }
932
933 async fn close(&self) {
934 let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
935 Some(drop_event) => drop_event.listen(),
936 None => return,
937 };
938 listener.await;
939 }
940
941 fn get_version(&self) -> Version {
942 LATEST_VERSION
943 }
944
945 async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
946 Ok(self.exists_result)
947 }
948 }
949
950 #[fuchsia::test]
951 async fn test_layer_set_key_exists() {
952 use super::LockedLayer;
953
954 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
955 let mut layer_set = tree.empty_layer_set();
956
957 assert_eq!(
959 layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
960 Existence::Missing
961 );
962
963 layer_set.layers.push(LockedLayer::from(
965 Arc::new(MockLayer::new(Existence::Missing)) as Arc<dyn Layer<TestKey, u64>>
966 ));
967 assert_eq!(
968 layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
969 Existence::Missing
970 );
971
972 layer_set.layers.push(LockedLayer::from(
974 Arc::new(MockLayer::new(Existence::MaybeExists)) as Arc<dyn Layer<TestKey, u64>>
975 ));
976 assert_eq!(
977 layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
978 Existence::MaybeExists
979 );
980
981 layer_set.layers.insert(
983 0,
984 LockedLayer::from(
985 Arc::new(MockLayer::new(Existence::Exists)) as Arc<dyn Layer<TestKey, u64>>
986 ),
987 );
988 assert_eq!(
989 layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
990 Existence::Exists
991 );
992 }
993
994 #[fuchsia::test]
995 async fn test_failed_lookup() {
996 let cache = Box::new(AuditCache::new());
997 let inner = cache.inner.clone();
998 let a = LSMTree::new(emit_left_merge_fn, cache);
999 a.set_layers(vec![Arc::new(FailLayer::new())]);
1000
1001 assert_eq!(inner.lock().stats(), (0, 0, 0, 0));
1003
1004 assert!(a.find(&TestKey(1..1)).await.is_err());
1006 assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
1007 }
1008}
1009
1010#[cfg(fuzz)]
1011mod fuzz {
1012 use crate::lsm_tree::types::{
1013 FuzzyHash, Item, LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
1014 };
1015 use crate::serialized_types::{
1016 LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
1017 };
1018 use arbitrary::Arbitrary;
1019 use fprint::TypeFingerprint;
1020 use fuzz::fuzz;
1021 use fxfs_macros::FuzzyHash;
1022 use std::hash::Hash;
1023
1024 #[derive(
1025 Arbitrary,
1026 Clone,
1027 Eq,
1028 Hash,
1029 FuzzyHash,
1030 PartialEq,
1031 Debug,
1032 serde::Serialize,
1033 serde::Deserialize,
1034 TypeFingerprint,
1035 Versioned,
1036 )]
1037 struct TestKey(std::ops::Range<u64>);
1038
1039 versioned_type! { 1.. => TestKey }
1040
1041 impl Versioned for u64 {}
1042 versioned_type! { 1.. => u64 }
1043
1044 impl LayerKey for TestKey {}
1045
1046 impl SortByU64 for TestKey {
1047 fn get_leading_u64(&self) -> u64 {
1048 self.0.start
1049 }
1050 }
1051
1052 impl OrdUpperBound for TestKey {
1053 fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
1054 self.0.end.cmp(&other.0.end)
1055 }
1056 }
1057
1058 impl OrdLowerBound for TestKey {
1059 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
1060 self.0.start.cmp(&other.0.start)
1061 }
1062 }
1063
1064 impl Value for u64 {
1065 const DELETED_MARKER: Self = 0;
1066 }
1067
    /// One step of a fuzzing run; each variant maps to one `LSMTree` operation in
    /// `fuzz_lsm_tree_actions`.
    // NOTE(review): `dead_code` is presumably allowed because the variants are only ever
    // constructed through `Arbitrary`; confirm.
    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        /// Calls `LSMTree::insert`.
        Insert(Item<TestKey, u64>),
        /// Calls `LSMTree::replace_or_insert`.
        ReplaceOrInsert(Item<TestKey, u64>),
        /// Calls `LSMTree::merge_into` with the item and the lower-bound key.
        MergeInto(Item<TestKey, u64>, TestKey),
        /// Calls `LSMTree::find`.
        Find(TestKey),
        /// Calls `LSMTree::seal`.
        Seal,
    }
1080
1081 #[fuzz]
1082 fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
1083 use super::LSMTree;
1084 use super::cache::NullCache;
1085 use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
1086 use futures::executor::block_on;
1087
1088 fn emit_left_merge_fn(
1089 _left: &MergeLayerIterator<'_, TestKey, u64>,
1090 _right: &MergeLayerIterator<'_, TestKey, u64>,
1091 ) -> MergeResult<TestKey, u64> {
1092 MergeResult::EmitLeft
1093 }
1094
1095 let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
1096 for action in actions {
1097 match action {
1098 FuzzAction::Insert(item) => {
1099 let _ = tree.insert(item);
1100 }
1101 FuzzAction::ReplaceOrInsert(item) => {
1102 tree.replace_or_insert(item);
1103 }
1104 FuzzAction::Find(key) => {
1105 block_on(tree.find(&key)).expect("find failed");
1106 }
1107 FuzzAction::MergeInto(item, bound) => tree.merge_into(item, &bound),
1108 FuzzAction::Seal => tree.seal(),
1109 };
1110 }
1111 }
1112}