mod bloom_filter;
pub mod cache;
pub mod merge;
pub mod persistent_layer;
pub mod skip_list_layer;
pub mod types;

use crate::drop_event::DropEvent;
use crate::log::*;
use crate::object_handle::{ReadObjectHandle, WriteBytes};
use crate::serialized_types::{Version, LATEST_VERSION};
use anyhow::Error;
use cache::{ObjectCache, ObjectCacheResult};
use fuchsia_sync::{Mutex, RwLock};
use persistent_layer::{PersistentLayer, PersistentLayerWriter};
use skip_list_layer::SkipListLayer;
use std::fmt;
use std::sync::Arc;
use types::{
    Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey, OrdLowerBound,
    Value,
};

pub use merge::Query;

const SKIP_LIST_LAYER_ITEMS: usize = 512;

pub use persistent_layer::{
    LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
    LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
    OldLayerInfo as OldPersistentLayerInfo, OldLayerInfoV32 as OldPersistentLayerInfoV32,
};

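/// Opens each handle as a [`PersistentLayer`] and returns the layers in the same order as the
/// supplied handles.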
pub async fn layers_from_handles<K: Key, V: Value>(
    handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
    let mut layers = Vec::new();
    for handle in handles {
        layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
    }
    Ok(layers)
}

#[derive(Eq, PartialEq, Debug)]
pub enum Operation {
    Insert,
    ReplaceOrInsert,
    MergeInto,
}

pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;

struct Inner<K, V> {
    mutable_layer: Arc<SkipListLayer<K, V>>,
    layers: Vec<Arc<dyn Layer<K, V>>>,
    mutation_callback: MutationCallback<K, V>,
}

#[derive(Default)]
pub(super) struct Counters {
    num_seeks: usize,
    layer_files_total: usize,
    layer_files_skipped: usize,
}

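/// An LSM tree: a single in-memory mutable layer plus an ordered list of immutable layers
/// (newest first). Lookups consult the configured [`ObjectCache`] before merging across layers.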
pub struct LSMTree<K, V> {
    data: RwLock<Inner<K, V>>,
    merge_fn: merge::MergeFn<K, V>,
    cache: Box<dyn ObjectCache<K, V>>,
    counters: Arc<Mutex<Counters>>,
}

#[fxfs_trace::trace]
impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
    pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
        LSMTree {
            data: RwLock::new(Inner {
                mutable_layer: Self::new_mutable_layer(),
                layers: Vec::new(),
                mutation_callback: None,
            }),
            merge_fn,
            cache,
            counters: Arc::new(Mutex::new(Default::default())),
        }
    }

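    /// Opens an existing tree, reading its immutable layers from the supplied handles.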
    pub async fn open(
        merge_fn: merge::MergeFn<K, V>,
        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
        cache: Box<dyn ObjectCache<K, V>>,
    ) -> Result<Self, Error> {
        Ok(LSMTree {
            data: RwLock::new(Inner {
                mutable_layer: Self::new_mutable_layer(),
                layers: layers_from_handles(handles).await?,
                mutation_callback: None,
            }),
            merge_fn,
            cache,
            counters: Arc::new(Mutex::new(Default::default())),
        })
    }

    pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
        self.data.write().layers = layers;
    }

    pub async fn append_layers(
        &self,
        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
    ) -> Result<(), Error> {
        let mut layers = layers_from_handles(handles).await?;
        self.data.write().layers.append(&mut layers);
        Ok(())
    }

    pub fn reset_immutable_layers(&self) {
        self.data.write().layers = Vec::new();
    }

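    /// Seals the current mutable layer, making it the newest immutable layer, and installs a
    /// fresh mutable layer in its place.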
    pub fn seal(&self) {
        let mut data = self.data.write();
        let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
        data.layers.insert(0, layer);
    }

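    /// Resets the tree to an empty state: drops all immutable layers and replaces the mutable
    /// layer with a fresh one.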
    pub fn reset(&self) {
        let mut data = self.data.write();
        data.layers = Vec::new();
        data.mutable_layer = Self::new_mutable_layer();
    }

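    /// Writes the items produced by `iterator` into a new persistent layer using `writer`.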
    #[trace]
    pub async fn compact_with_iterator<W: WriteBytes + Send>(
        &self,
        mut iterator: impl LayerIterator<K, V>,
        num_items: usize,
        writer: W,
        block_size: u64,
    ) -> Result<(), Error> {
        let mut writer =
            PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
        while let Some(item_ref) = iterator.get() {
            debug!(item_ref:?; "compact: writing");
            writer.write(item_ref).await?;
            iterator.advance().await?;
        }
        writer.flush().await
    }

    pub fn empty_layer_set(&self) -> LayerSet<K, V> {
        LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
    }

    pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
        let data = self.data.read();
        layer_set.layers.reserve_exact(data.layers.len() + 1);
        layer_set
            .layers
            .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
        for layer in &data.layers {
            layer_set.layers.push(layer.clone().into());
        }
    }

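    /// Returns a [`LayerSet`] containing the mutable layer followed by all immutable layers.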
    pub fn layer_set(&self) -> LayerSet<K, V> {
        let mut layer_set = self.empty_layer_set();
        self.add_all_layers_to_layer_set(&mut layer_set);
        layer_set
    }

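    /// Returns a [`LayerSet`] containing only the immutable layers (e.g. as the input to a
    /// compaction).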
    pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
        let data = self.data.read();
        let mut layers = Vec::with_capacity(data.layers.len());
        for layer in &data.layers {
            layers.push(layer.clone().into());
        }
        LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
    }

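    /// Inserts an item into the mutable layer and invalidates any cached entry for its key.
    /// Returns an error if the mutable layer rejects the insert (for example, a duplicate key).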
    pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
        let key = item.key.clone();
        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
        {
            let data = self.data.read();
            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
                mutation_callback(Operation::Insert, &item);
            }
            data.mutable_layer.insert(item)?;
        }
        self.cache.invalidate(key, val);
        Ok(())
    }

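    /// Replaces or inserts an item in the mutable layer and invalidates any cached entry for the
    /// key.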
    pub fn replace_or_insert(&self, item: Item<K, V>) {
        let key = item.key.clone();
        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
        {
            let data = self.data.read();
            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
                mutation_callback(Operation::ReplaceOrInsert, &item);
            }
            data.mutable_layer.replace_or_insert(item);
        }
        self.cache.invalidate(key, val);
    }

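    /// Merges `item` into the mutable layer using the tree's merge function, starting at
    /// `lower_bound`, and invalidates any cached entry for the key.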
    pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
        let key = item.key.clone();
        {
            let data = self.data.read();
            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
                mutation_callback(Operation::MergeInto, &item);
            }
            data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
        }
        self.cache.invalidate(key, None);
    }

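    /// Searches for an item that exactly matches `search_key`, consulting the cache first.
    /// Returns `None` if there is no exact match or the matching value is the deleted marker.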
    pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
    where
        K: Eq,
    {
        let token = match self.cache.lookup_or_reserve(search_key) {
            ObjectCacheResult::Value(value) => {
                if value == V::DELETED_MARKER {
                    return Ok(None);
                } else {
                    return Ok(Some(Item::new(search_key.clone(), value)));
                }
            }
            ObjectCacheResult::Placeholder(token) => Some(token),
            ObjectCacheResult::NoCache => None,
        };
        let layer_set = self.layer_set();
        let mut merger = layer_set.merger();

        Ok(match merger.query(Query::Point(search_key)).await?.get() {
            Some(ItemRef { key, value, sequence })
                if key == search_key && *value != V::DELETED_MARKER =>
            {
                if let Some(token) = token {
                    token.complete(Some(value));
                }
                Some(Item { key: key.clone(), value: value.clone(), sequence })
            }
            _ => None,
        })
    }

    pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
        self.data.read().mutable_layer.clone()
    }

    pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
        self.data.write().mutation_callback = mutation_callback;
    }

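    /// Returns the earliest serialization version used by any layer in the tree.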
    pub fn get_earliest_version(&self) -> Version {
        let mut earliest_version = LATEST_VERSION;
        for layer in self.layer_set().layers {
            let layer_version = layer.get_version();
            if layer_version < earliest_version {
                earliest_version = layer_version;
            }
        }
        earliest_version
    }

    pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
        SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
    }

    pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
        self.data.write().mutable_layer = layer;
    }

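    /// Records inspect data for the tree (per-layer details plus seek and bloom filter counters)
    /// into `root`.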
    pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
        let layer_set = self.layer_set();
        root.record_child("layers", move |node| {
            let mut index = 0;
            for layer in layer_set.layers {
                node.record_child(format!("{index}"), move |node| {
                    layer.1.record_inspect_data(node)
                });
                index += 1;
            }
        });
        {
            let counters = self.counters.lock();
            root.record_uint("num_seeks", counters.num_seeks as u64);
            root.record_uint(
                "bloom_filter_success_percent",
                if counters.layer_files_total == 0 {
                    0
                } else {
                    (counters.layer_files_skipped * 100).div_ceil(counters.layer_files_total)
                        as u64
                },
            );
        }
    }
}

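/// A layer together with the lock returned by [`Layer::lock`], which keeps the layer open while
/// it is in use; [`LockedLayer::close_layer`] drops the lock and then closes the layer.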
pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);

impl<K, V> LockedLayer<K, V> {
    pub async fn close_layer(self) {
        let layer = self.1;
        std::mem::drop(self.0);
        layer.close().await;
    }
}

impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
    fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
        let event = layer.lock().unwrap();
        Self(event, layer)
    }
}

impl<K, V> std::ops::Deref for LockedLayer<K, V> {
    type Target = Arc<dyn Layer<K, V>>;

    fn deref(&self) -> &Self::Target {
        &self.1
    }
}

impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
    fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
        self.1.as_ref()
    }
}

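/// A point-in-time collection of locked layers; use [`LayerSet::merger`] to build a merger that
/// iterates over the merged contents of the layers.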
pub struct LayerSet<K, V> {
    pub layers: Vec<LockedLayer<K, V>>,
    merge_fn: merge::MergeFn<K, V>,
    counters: Arc<Mutex<Counters>>,
}

impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
    pub fn sum_len(&self) -> usize {
        let mut size = 0;
        for layer in &self.layers {
            size += *layer.estimated_len();
        }
        size
    }

    pub fn merger(&self) -> merge::Merger<'_, K, V> {
        merge::Merger::new(
            self.layers.iter().map(|x| x.as_ref()),
            self.merge_fn,
            self.counters.clone(),
        )
    }
}

impl<K, V> fmt::Debug for LayerSet<K, V> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_list()
            .entries(self.layers.iter().map(|l| {
                if let Some(handle) = l.handle() {
                    format!("{}", handle.object_id())
                } else {
                    format!("{:?}", Arc::as_ptr(l))
                }
            }))
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::LSMTree;
    use crate::drop_event::DropEvent;
    use crate::lsm_tree::cache::{
        NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
    };
    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
    use crate::lsm_tree::types::{
        BoxedLayerIterator, FuzzyHash, Item, ItemCount, ItemRef, Key, Layer, LayerIterator,
        LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
    };
    use crate::lsm_tree::{layers_from_handles, Query};
    use crate::object_handle::ObjectHandle;
    use crate::serialized_types::{
        versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
    };
    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
    use crate::testing::writer::Writer;
    use anyhow::{anyhow, Error};
    use async_trait::async_trait;
    use fprint::TypeFingerprint;
    use fuchsia_sync::Mutex;
    use fxfs_macros::FuzzyHash;
    use rand::seq::SliceRandom;
    use rand::thread_rng;
    use std::hash::Hash;
    use std::sync::Arc;

    #[derive(
        Clone,
        Eq,
        PartialEq,
        Debug,
        Hash,
        FuzzyHash,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl LayerKey for TestKey {}

    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    fn emit_left_merge_fn(
        _left: &MergeLayerIterator<'_, TestKey, u64>,
        _right: &MergeLayerIterator<'_, TestKey, u64>,
    ) -> MergeResult<TestKey, u64> {
        MergeResult::EmitLeft
    }

    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    #[fuchsia::test]
    async fn test_iteration() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        let layers = tree.layer_set();
        let mut merger = layers.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_compact() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");
        tree.seal();
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.query(Query::FullScan).await.expect("create merger");
            tree.compact_with_iterator(
                iter,
                items.len(),
                Writer::new(&handle).await,
                handle.block_size(),
            )
            .await
            .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        let handle = FakeObjectHandle::new(object.clone());
        let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
            .await
            .expect("open failed");

        let layers = tree.layer_set();
        let mut merger = layers.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        for i in 1..5 {
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!((key, value), (&TestKey(i..i), &i));
            iter.advance().await.expect("advance failed");
        }
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_find() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
        assert_eq!(item, items[1]);
        assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
    }

    #[fuchsia::test]
    async fn test_find_no_return_deleted_values() {
        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");

        let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
        assert_eq!(item, items[0]);
        assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
    }

    #[fuchsia::test]
    async fn test_empty_seal() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.seal();
        let item = Item::new(TestKey(1..1), 1);
        tree.insert(item.clone()).expect("insert error");
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.query(Query::FullScan).await.expect("create merger");
            tree.compact_with_iterator(iter, 0, Writer::new(&handle).await, handle.block_size())
                .await
                .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
        assert_eq!(found_item, item);
        assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
    }

    #[fuchsia::test]
    async fn test_filter() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let layers = tree.layer_set();
        let mut merger = layers.merger();

        let mut iter = merger
            .query(Query::FullScan)
            .await
            .expect("seek failed")
            .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
            .await
            .expect("filter failed");

        assert_eq!(iter.get(), Some(items[1].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert_eq!(iter.get(), Some(items[3].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_insert_order_agnostic() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
            Item::new(TestKey(5..5), 5),
            Item::new(TestKey(6..6), 6),
        ];
        let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for item in &items {
            a.insert(item.clone()).expect("insert error");
        }
        let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let mut shuffled = items.clone();
        shuffled.shuffle(&mut thread_rng());
        for item in &shuffled {
            b.insert(item.clone()).expect("insert error");
        }
        let layers = a.layer_set();
        let mut merger = layers.merger();
        let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
        let layers = b.layer_set();
        let mut merger = layers.merger();
        let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");

        for item in items {
            assert_eq!(Some(item.as_item_ref()), iter_a.get());
            assert_eq!(Some(item.as_item_ref()), iter_b.get());
            iter_a.advance().await.expect("advance failed");
            iter_b.advance().await.expect("advance failed");
        }
        assert!(iter_a.get().is_none());
        assert!(iter_b.get().is_none());
    }

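    /// Shared state for `AuditCache`: counts lookups, completions, invalidations and dropped
    /// placeholders, and can hold a canned result to return from the next lookup.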
    struct AuditCacheInner<'a, V: Value> {
        lookups: u64,
        completions: u64,
        invalidations: u64,
        drops: u64,
        result: Option<ObjectCacheResult<'a, V>>,
    }

    impl<V: Value> AuditCacheInner<'_, V> {
        fn stats(&self) -> (u64, u64, u64, u64) {
            (self.lookups, self.completions, self.invalidations, self.drops)
        }
    }

    struct AuditCache<'a, V: Value> {
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
    }

    impl<V: Value> AuditCache<'_, V> {
        fn new() -> Self {
            Self {
                inner: Arc::new(Mutex::new(AuditCacheInner {
                    lookups: 0,
                    completions: 0,
                    invalidations: 0,
                    drops: 0,
                    result: None,
                })),
            }
        }
    }

    struct AuditPlaceholder<'a, V: Value> {
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
        completed: Mutex<bool>,
    }

    impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
        fn complete(self: Box<Self>, _: Option<&V>) {
            self.inner.lock().completions += 1;
            *self.completed.lock() = true;
        }
    }

    impl<V: Value> Drop for AuditPlaceholder<'_, V> {
        fn drop(&mut self) {
            if !*self.completed.lock() {
                self.inner.lock().drops += 1;
            }
        }
    }

    impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
        fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
            {
                let mut inner = self.inner.lock();
                inner.lookups += 1;
                if inner.result.is_some() {
                    return std::mem::take(&mut inner.result).unwrap();
                }
            }
            ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
                inner: self.inner.clone(),
                completed: Mutex::new(false),
            }))
        }

        fn invalidate(&self, _key: K, _value: Option<V>) {
            self.inner.lock().invalidations += 1;
        }
    }

    #[fuchsia::test]
    async fn test_cache_handling() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        assert!(a.find(&item.key).await.expect("Failed find").is_none());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));

        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (1, 0, 1, 1));

        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (2, 1, 1, 1));

        a.replace_or_insert(item.clone());
        assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
    }

    #[fuchsia::test]
    async fn test_cache_hit() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));

        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }

    #[fuchsia::test]
    async fn test_cache_says_uncacheable() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        let _ = a.insert(item.clone());

        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        inner.lock().result = Some(ObjectCacheResult::NoCache);

        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }

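    /// A layer whose `seek` always fails, used to exercise the error path in `find`.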
    struct FailLayer {
        drop_event: Mutex<Option<Arc<DropEvent>>>,
    }

    impl FailLayer {
        fn new() -> Self {
            Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
        }
    }

    #[async_trait]
    impl<K: Key, V: Value> Layer<K, V> for FailLayer {
        async fn seek(
            &self,
            _bound: std::ops::Bound<&K>,
        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
            Err(anyhow!("Purposely failed seek"))
        }

        fn lock(&self) -> Option<Arc<DropEvent>> {
            self.drop_event.lock().clone()
        }

        fn estimated_len(&self) -> ItemCount {
            ItemCount::Estimate(0)
        }

        async fn close(&self) {
            let listener = match self.drop_event.lock().take() {
                Some(drop_event) => drop_event.listen(),
                None => return,
            };
            listener.await;
        }

        fn get_version(&self) -> Version {
            LATEST_VERSION
        }
    }

    #[fuchsia::test]
    async fn test_failed_lookup() {
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        a.set_layers(vec![Arc::new(FailLayer::new())]);

        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        assert!(a.find(&TestKey(1..1)).await.is_err());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
    }
}

#[cfg(fuzz)]
mod fuzz {
    use crate::lsm_tree::types::{
        FuzzyHash, Item, LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
    };
    use crate::serialized_types::{
        versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
    };
    use arbitrary::Arbitrary;
    use fprint::TypeFingerprint;
    use fuzz::fuzz;
    use fxfs_macros::FuzzyHash;
    use std::hash::Hash;

    #[derive(
        Arbitrary,
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    impl LayerKey for TestKey {}

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        Insert(Item<TestKey, u64>),
        ReplaceOrInsert(Item<TestKey, u64>),
        MergeInto(Item<TestKey, u64>, TestKey),
        Find(TestKey),
        Seal,
    }

    #[fuzz]
    fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
        use super::cache::NullCache;
        use super::LSMTree;
        use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
        use futures::executor::block_on;

        fn emit_left_merge_fn(
            _left: &MergeLayerIterator<'_, TestKey, u64>,
            _right: &MergeLayerIterator<'_, TestKey, u64>,
        ) -> MergeResult<TestKey, u64> {
            MergeResult::EmitLeft
        }

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for action in actions {
            match action {
                FuzzAction::Insert(item) => {
                    let _ = tree.insert(item);
                }
                FuzzAction::ReplaceOrInsert(item) => {
                    tree.replace_or_insert(item);
                }
                FuzzAction::Find(key) => {
                    block_on(tree.find(&key)).expect("find failed");
                }
                FuzzAction::MergeInto(item, bound) => tree.merge_into(item, &bound),
                FuzzAction::Seal => tree.seal(),
            };
        }
    }
}