1mod bloom_filter;
6pub mod cache;
7pub mod merge;
8pub mod persistent_layer;
9pub mod skip_list_layer;
10pub mod types;
11
12use crate::drop_event::DropEvent;
13use crate::log::*;
14use crate::object_handle::{ReadObjectHandle, WriteBytes};
15use crate::serialized_types::{LATEST_VERSION, Version};
16use anyhow::Error;
17use cache::{ObjectCache, ObjectCacheResult};
18use fuchsia_sync::{Mutex, RwLock};
19use persistent_layer::{PersistentLayer, PersistentLayerWriter};
20use skip_list_layer::SkipListLayer;
21use std::fmt;
22use std::sync::Arc;
23use types::{
24 Existence, Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey,
25 OrdLowerBound, Value,
26};
27
28pub use merge::Query;
29
/// Argument passed to `SkipListLayer::new` whenever a fresh mutable layer is created (see
/// `LSMTree::new_mutable_layer`). NOTE(review): presumably a capacity/size hint for the skip
/// list — confirm against `SkipListLayer::new`.
const SKIP_LIST_LAYER_ITEMS: usize = 512;
31
32pub use persistent_layer::{
34 LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
35 LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
36};
37
38pub async fn layers_from_handles<K: Key, V: Value>(
39 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
40) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
41 let mut layers = Vec::new();
42 for handle in handles {
43 layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
44 }
45 Ok(layers)
46}
47
/// The kind of mutation reported to a [`MutationCallback`].
#[derive(Debug, PartialEq, Eq)]
pub enum Operation {
    /// Reported by `LSMTree::insert`.
    Insert,
    /// Reported by `LSMTree::replace_or_insert`.
    ReplaceOrInsert,
    /// Reported by `LSMTree::merge_into`.
    MergeInto,
}
54
/// Optional observer of mutations. `LSMTree::insert`, `LSMTree::replace_or_insert` and
/// `LSMTree::merge_into` call it with the matching [`Operation`] and the item, while holding the
/// tree's read lock and *before* the item is applied to the mutable layer.
pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;
56
/// The lock-protected state of an [`LSMTree`] (held in `LSMTree::data`).
struct Inner<K, V> {
    /// The layer that receives all new mutations; `LSMTree::seal` moves it into `layers`.
    mutable_layer: Arc<SkipListLayer<K, V>>,
    /// The immutable layers. `LSMTree::seal` inserts at index 0, so the most recently sealed
    /// layer comes first.
    layers: Vec<Arc<dyn Layer<K, V>>>,
    /// Observer invoked before each mutation of `mutable_layer`; `None` disables it.
    mutation_callback: MutationCallback<K, V>,
}
62
/// Lookup statistics shared (via `Arc<Mutex<_>>`) by an [`LSMTree`], every [`LayerSet`] it
/// creates and the mergers built from those sets; reported by `LSMTree::record_inspect_data`.
/// NOTE(review): the fields are only read in this file; presumably the merger in `merge`
/// updates them — confirm there.
#[derive(Default)]
pub(super) struct Counters {
    /// Reported to inspect as `num_seeks`.
    num_seeks: usize,
    /// Denominator of `bloom_filter_success_percent`; presumably the number of layer files
    /// considered during lookups.
    layer_files_total: usize,
    /// Numerator of `bloom_filter_success_percent`; presumably the number of layer files that
    /// could be skipped (e.g. because their bloom filter ruled the key out).
    layer_files_skipped: usize,
}
72
/// A log-structured merge tree: one mutable in-memory [`SkipListLayer`] in front of a list of
/// immutable layers, plus an [`ObjectCache`] that `find` consults and every mutation
/// invalidates.
pub struct LSMTree<K, V> {
    /// Mutable layer, immutable layers and mutation callback, all behind one read/write lock.
    data: RwLock<Inner<K, V>>,
    /// Merge function used by `merge_into` and by every merger built from this tree's layers.
    merge_fn: merge::MergeFn<K, V>,
    /// Cache consulted by `find` and invalidated by `insert`, `replace_or_insert` and
    /// `merge_into`.
    cache: Box<dyn ObjectCache<K, V>>,
    /// Statistics shared with every `LayerSet` created from this tree.
    counters: Arc<Mutex<Counters>>,
}
82
83#[fxfs_trace::trace]
84impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
85 pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
87 LSMTree {
88 data: RwLock::new(Inner {
89 mutable_layer: Self::new_mutable_layer(),
90 layers: Vec::new(),
91 mutation_callback: None,
92 }),
93 merge_fn,
94 cache,
95 counters: Arc::new(Mutex::new(Default::default())),
96 }
97 }
98
99 pub async fn open(
101 merge_fn: merge::MergeFn<K, V>,
102 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
103 cache: Box<dyn ObjectCache<K, V>>,
104 ) -> Result<Self, Error> {
105 Ok(LSMTree {
106 data: RwLock::new(Inner {
107 mutable_layer: Self::new_mutable_layer(),
108 layers: layers_from_handles(handles).await?,
109 mutation_callback: None,
110 }),
111 merge_fn,
112 cache,
113 counters: Arc::new(Mutex::new(Default::default())),
114 })
115 }
116
117 pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
119 self.data.write().layers = layers;
120 }
121
122 pub async fn append_layers(
125 &self,
126 handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
127 ) -> Result<(), Error> {
128 let mut layers = layers_from_handles(handles).await?;
129 self.data.write().layers.append(&mut layers);
130 Ok(())
131 }
132
133 pub fn reset_immutable_layers(&self) {
135 self.data.write().layers = Vec::new();
136 }
137
138 pub fn seal(&self) {
140 let mut data = self.data.write();
143 let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
144 data.layers.insert(0, layer);
145 }
146
147 pub fn reset(&self) {
149 let mut data = self.data.write();
150 data.layers = Vec::new();
151 data.mutable_layer = Self::new_mutable_layer();
152 }
153
154 #[trace]
156 pub async fn compact_with_iterator<W: WriteBytes + Send>(
157 &self,
158 mut iterator: impl LayerIterator<K, V>,
159 num_items: usize,
160 writer: W,
161 block_size: u64,
162 ) -> Result<(), Error> {
163 let mut writer =
164 PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
165 while let Some(item_ref) = iterator.get() {
166 debug!(item_ref:?; "compact: writing");
167 writer.write(item_ref).await?;
168 iterator.advance().await?;
169 }
170 writer.flush().await
171 }
172
173 pub fn empty_layer_set(&self) -> LayerSet<K, V> {
175 LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
176 }
177
178 pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
180 let data = self.data.read();
181 layer_set.layers.reserve_exact(data.layers.len() + 1);
182 layer_set
183 .layers
184 .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
185 for layer in &data.layers {
186 layer_set.layers.push(layer.clone().into());
187 }
188 }
189
190 pub fn layer_set(&self) -> LayerSet<K, V> {
193 let mut layer_set = self.empty_layer_set();
194 self.add_all_layers_to_layer_set(&mut layer_set);
195 layer_set
196 }
197
198 pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
202 let data = self.data.read();
203 let mut layers = Vec::with_capacity(data.layers.len());
204 for layer in &data.layers {
205 layers.push(layer.clone().into());
206 }
207 LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
208 }
209
210 pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
213 let key = item.key.clone();
214 let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
215 {
216 let data = self.data.read();
218 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
219 mutation_callback(Operation::Insert, &item);
220 }
221 data.mutable_layer.insert(item)?;
222 }
223 self.cache.invalidate(key, val);
224 Ok(())
225 }
226
227 pub fn replace_or_insert(&self, item: Item<K, V>) {
229 let key = item.key.clone();
230 let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
231 {
232 let data = self.data.read();
234 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
235 mutation_callback(Operation::ReplaceOrInsert, &item);
236 }
237 data.mutable_layer.replace_or_insert(item);
238 }
239 self.cache.invalidate(key, val);
240 }
241
242 pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
244 let key = item.key.clone();
245 {
246 let data = self.data.read();
248 if let Some(mutation_callback) = data.mutation_callback.as_ref() {
249 mutation_callback(Operation::MergeInto, &item);
250 }
251 data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
252 }
253 self.cache.invalidate(key, None);
254 }
255
256 pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
259 where
260 K: Eq,
261 {
262 let token = match self.cache.lookup_or_reserve(search_key) {
266 ObjectCacheResult::Value(value) => {
267 if value == V::DELETED_MARKER {
268 return Ok(None);
269 } else {
270 return Ok(Some(Item::new(search_key.clone(), value)));
271 }
272 }
273 ObjectCacheResult::Placeholder(token) => Some(token),
274 ObjectCacheResult::NoCache => None,
275 };
276 let layer_set = self.layer_set();
277 let mut merger = layer_set.merger();
278
279 Ok(match merger.query(Query::Point(search_key)).await?.get() {
280 Some(ItemRef { key, value, sequence })
281 if key == search_key && *value != V::DELETED_MARKER =>
282 {
283 if let Some(token) = token {
284 token.complete(Some(value));
285 }
286 Some(Item { key: key.clone(), value: value.clone(), sequence })
287 }
288 _ => None,
289 })
290 }
291
292 pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
293 self.data.read().mutable_layer.clone()
294 }
295
296 pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
300 self.data.write().mutation_callback = mutation_callback;
301 }
302
303 pub fn get_earliest_version(&self) -> Version {
305 let mut earliest_version = LATEST_VERSION;
306 for layer in self.layer_set().layers {
307 let layer_version = layer.get_version();
308 if layer_version < earliest_version {
309 earliest_version = layer_version;
310 }
311 }
312 return earliest_version;
313 }
314
315 pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
317 SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
318 }
319
320 pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
322 self.data.write().mutable_layer = layer;
323 }
324
325 pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
327 let layer_set = self.layer_set();
328 root.record_child("layers", move |node| {
329 let mut index = 0;
330 for layer in layer_set.layers {
331 node.record_child(format!("{index}"), move |node| {
332 layer.1.record_inspect_data(node)
333 });
334 index += 1;
335 }
336 });
337 {
338 let counters = self.counters.lock();
339 root.record_uint("num_seeks", counters.num_seeks as u64);
340 root.record_uint(
341 "bloom_filter_success_percent",
342 if counters.layer_files_total == 0 {
343 0
344 } else {
345 (counters.layer_files_skipped * 100).div_ceil(counters.layer_files_total) as u64
346 },
347 );
348 }
349 }
350}
351
/// A layer paired with the [`DropEvent`] returned by its `lock()` method.
///
/// NOTE(review): holding the event presumably keeps the layer from completing `close()`. The
/// test layers in this file wait on their event inside `close`; confirm this for real layer
/// types. Use `close_layer` to release the event and close the layer.
pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);
355
356impl<K, V> LockedLayer<K, V> {
357 pub async fn close_layer(self) {
358 let layer = self.1;
359 std::mem::drop(self.0);
360 layer.close().await;
361 }
362}
363
364impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
365 fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
366 let event = layer.lock().unwrap();
367 Self(event, layer)
368 }
369}
370
371impl<K, V> std::ops::Deref for LockedLayer<K, V> {
372 type Target = Arc<dyn Layer<K, V>>;
373
374 fn deref(&self) -> &Self::Target {
375 &self.1
376 }
377}
378
379impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
380 fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
381 self.1.as_ref()
382 }
383}
384
/// A collection of locked layers, together with the merge function and counters needed to build
/// a merger over them.
///
/// `LSMTree::layer_set` lists the mutable layer first, followed by the immutable layers in the
/// tree's order (where `LSMTree::seal` puts the most recently sealed layer first).
pub struct LayerSet<K, V> {
    /// The layers, each holding the lock event from `Layer::lock` while it remains in the set.
    pub layers: Vec<LockedLayer<K, V>>,
    merge_fn: merge::MergeFn<K, V>,
    counters: Arc<Mutex<Counters>>,
}
392
393impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
394 pub fn sum_len(&self) -> usize {
395 let mut size = 0;
396 for layer in &self.layers {
397 size += layer.len()
398 }
399 size
400 }
401
402 pub fn merger(&self) -> merge::Merger<'_, K, V> {
403 merge::Merger::new(
404 self.layers.iter().map(|x| x.as_ref()),
405 self.merge_fn,
406 self.counters.clone(),
407 )
408 }
409
410 pub async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
412 for l in &self.layers {
413 match l.key_exists(key).await? {
414 e @ (Existence::Exists | Existence::MaybeExists) => return Ok(e),
415 _ => {}
416 }
417 }
418 Ok(Existence::Missing)
419 }
420}
421
422impl<K, V> fmt::Debug for LayerSet<K, V> {
423 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
424 fmt.debug_list()
425 .entries(self.layers.iter().map(|l| {
426 if let Some(handle) = l.handle() {
427 format!("{}", handle.object_id())
428 } else {
429 format!("{:?}", Arc::as_ptr(l))
430 }
431 }))
432 .finish()
433 }
434}
435
// Unit tests for `LSMTree` and `LayerSet`, using a range-wrapping `TestKey` and `u64` values
// (with `0` as the deleted marker).
#[cfg(test)]
mod tests {
    use super::LSMTree;
    use crate::drop_event::DropEvent;
    use crate::lsm_tree::cache::{
        NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
    };
    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
    use crate::lsm_tree::types::{
        BoxedLayerIterator, Existence, FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator,
        LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
    };
    use crate::lsm_tree::{Query, layers_from_handles};
    use crate::object_handle::ObjectHandle;
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
    use crate::testing::writer::Writer;
    use anyhow::{Error, anyhow};
    use async_trait::async_trait;
    use fprint::TypeFingerprint;
    use fuchsia_sync::Mutex;
    use fxfs_macros::FuzzyHash;
    use rand::rng;
    use rand::seq::SliceRandom;
    use std::hash::Hash;
    use std::sync::Arc;

    /// Test key wrapping a `Range<u64>`; the tests below use single-point ranges such as `1..1`.
    #[derive(
        Clone,
        Eq,
        PartialEq,
        Debug,
        Hash,
        FuzzyHash,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    impl SortByU64 for TestKey {
        // The range start is the leading sort value.
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl LayerKey for TestKey {}

    impl OrdUpperBound for TestKey {
        // Upper-bound ordering compares range ends.
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    impl OrdLowerBound for TestKey {
        // Lower-bound ordering compares range starts.
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    /// Merge function that always answers `MergeResult::EmitLeft`, regardless of its inputs.
    fn emit_left_merge_fn(
        _left: &MergeLayerIterator<'_, TestKey, u64>,
        _right: &MergeLayerIterator<'_, TestKey, u64>,
    ) -> MergeResult<TestKey, u64> {
        MergeResult::EmitLeft
    }

    // `0` marks a deleted value in these tests.
    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    // Items inserted into the mutable layer are returned in key order by a full scan.
    #[fuchsia::test]
    async fn test_iteration() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        let layers = tree.layer_set();
        let mut merger = layers.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    // Two sealed layers are compacted into one persistent layer; a tree reopened from that
    // object must yield all four items in order.
    #[fuchsia::test]
    async fn test_compact() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");
        tree.seal();
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            // Scope the layer set so its locks are released before the layers are replaced.
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.query(Query::FullScan).await.expect("create merger");
            tree.compact_with_iterator(
                iter,
                items.len(),
                Writer::new(&handle).await,
                handle.block_size(),
            )
            .await
            .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        let handle = FakeObjectHandle::new(object.clone());
        let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
            .await
            .expect("open failed");

        let layers = tree.layer_set();
        let mut merger = layers.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        for i in 1..5 {
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!((key, value), (&TestKey(i..i), &i));
            iter.advance().await.expect("advance failed");
        }
        assert!(iter.get().is_none());
    }

    // `find` locates items in both sealed and mutable layers and returns `None` for unknown keys.
    #[fuchsia::test]
    async fn test_find() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
        assert_eq!(item, items[1]);
        assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
    }

    // An item whose value is the deleted marker must not be returned by `find`.
    #[fuchsia::test]
    async fn test_find_no_return_deleted_values() {
        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");

        let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
        assert_eq!(item, items[0]);
        assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
    }

    // Sealing an empty mutable layer and compacting it (with zero items) must not break lookups
    // of items in the new mutable layer.
    #[fuchsia::test]
    async fn test_empty_seal() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.seal();
        let item = Item::new(TestKey(1..1), 1);
        tree.insert(item.clone()).expect("insert error");
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.query(Query::FullScan).await.expect("create merger");
            tree.compact_with_iterator(iter, 0, Writer::new(&handle).await, handle.block_size())
                .await
                .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
        assert_eq!(found_item, item);
        assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
    }

    // Filtering a full scan keeps only the items that satisfy the predicate (even starts here).
    #[fuchsia::test]
    async fn test_filter() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let layers = tree.layer_set();
        let mut merger = layers.merger();

        // Keep only keys whose range start is even.
        let mut iter = merger
            .query(Query::FullScan)
            .await
            .expect("seek failed")
            .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
            .await
            .expect("filter failed");

        assert_eq!(iter.get(), Some(items[1].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert_eq!(iter.get(), Some(items[3].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    // Inserting the same items in sorted and in shuffled order must yield identical scans.
    #[fuchsia::test]
    async fn test_insert_order_agnostic() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
            Item::new(TestKey(5..5), 5),
            Item::new(TestKey(6..6), 6),
        ];
        let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for item in &items {
            a.insert(item.clone()).expect("insert error");
        }
        let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let mut shuffled = items.clone();
        shuffled.shuffle(&mut rng());
        for item in &shuffled {
            b.insert(item.clone()).expect("insert error");
        }
        let layers = a.layer_set();
        let mut merger = layers.merger();
        let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
        let layers = b.layer_set();
        let mut merger = layers.merger();
        let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");

        for item in items {
            assert_eq!(Some(item.as_item_ref()), iter_a.get());
            assert_eq!(Some(item.as_item_ref()), iter_b.get());
            iter_a.advance().await.expect("advance failed");
            iter_b.advance().await.expect("advance failed");
        }
        assert!(iter_a.get().is_none());
        assert!(iter_b.get().is_none());
    }

    /// Counters recorded by `AuditCache` and its placeholders. `result`, when set, is returned
    /// (once) by the next `lookup_or_reserve` instead of a placeholder.
    struct AuditCacheInner<'a, V: Value> {
        lookups: u64,
        completions: u64,
        invalidations: u64,
        drops: u64,
        result: Option<ObjectCacheResult<'a, V>>,
    }

    impl<V: Value> AuditCacheInner<'_, V> {
        /// Returns `(lookups, completions, invalidations, drops)`.
        fn stats(&self) -> (u64, u64, u64, u64) {
            (self.lookups, self.completions, self.invalidations, self.drops)
        }
    }

    /// Cache that stores nothing and only counts how the tree uses it.
    struct AuditCache<'a, V: Value> {
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
    }

    impl<V: Value> AuditCache<'_, V> {
        fn new() -> Self {
            Self {
                inner: Arc::new(Mutex::new(AuditCacheInner {
                    lookups: 0,
                    completions: 0,
                    invalidations: 0,
                    drops: 0,
                    result: None,
                })),
            }
        }
    }

    /// Placeholder that counts a completion when completed, or a drop when dropped uncompleted.
    struct AuditPlaceholder<'a, V: Value> {
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
        completed: Mutex<bool>,
    }

    impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
        fn complete(self: Box<Self>, _: Option<&V>) {
            self.inner.lock().completions += 1;
            // Mark as completed so the `Drop` impl below does not also count a drop.
            *self.completed.lock() = true;
        }
    }

    impl<V: Value> Drop for AuditPlaceholder<'_, V> {
        fn drop(&mut self) {
            if !*self.completed.lock() {
                self.inner.lock().drops += 1;
            }
        }
    }

    impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
        fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
            {
                let mut inner = self.inner.lock();
                inner.lookups += 1;
                // A preset result takes precedence (and is consumed).
                if inner.result.is_some() {
                    return std::mem::take(&mut inner.result).unwrap();
                }
            }
            ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
                inner: self.inner.clone(),
                completed: Mutex::new(false),
            }))
        }

        fn invalidate(&self, _key: K, _value: Option<V>) {
            self.inner.lock().invalidations += 1;
        }
    }

    // Stats tuples below are (lookups, completions, invalidations, drops).
    #[fuchsia::test]
    async fn test_cache_handling() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        // Nothing has touched the cache yet.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // A miss performs a lookup; the placeholder is dropped without being completed.
        assert!(a.find(&item.key).await.expect("Failed find").is_none());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));

        // An insert invalidates the cache entry.
        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (1, 0, 1, 1));

        // A hit performs a lookup and completes the placeholder with the found value.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (2, 1, 1, 1));

        // replace_or_insert invalidates as well.
        a.replace_or_insert(item.clone());
        assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
    }

    #[fuchsia::test]
    async fn test_cache_hit() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        // Nothing has touched the cache yet.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // The insert invalidates once.
        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        // Make the next lookup return a cached value.
        inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));

        // The cached value is returned: one lookup, and no placeholder to complete or drop.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }

    #[fuchsia::test]
    async fn test_cache_says_uncacheable() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        let _ = a.insert(item.clone());

        // Only the insert's invalidation so far.
        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        // Make the next lookup report that the key is not cacheable.
        inner.lock().result = Some(ObjectCacheResult::NoCache);

        // The item is still found from the layers, with no placeholder to complete or drop.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }

    /// Layer whose `seek` always fails; used to check error handling in `find`.
    struct FailLayer {
        drop_event: Mutex<Option<Arc<DropEvent>>>,
    }

    impl FailLayer {
        fn new() -> Self {
            Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
        }
    }

    #[async_trait]
    impl<K: Key, V: Value> Layer<K, V> for FailLayer {
        async fn seek(
            &self,
            _bound: std::ops::Bound<&K>,
        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
            Err(anyhow!("Purposely failed seek"))
        }

        // Hands out clones of the event until `close` takes it.
        fn lock(&self) -> Option<Arc<DropEvent>> {
            self.drop_event.lock().clone()
        }

        fn len(&self) -> usize {
            0
        }

        // Takes the event (so later `lock` calls return `None`) and waits on its listener.
        async fn close(&self) {
            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
                Some(drop_event) => drop_event.listen(),
                None => return,
            };
            listener.await;
        }

        fn get_version(&self) -> Version {
            LATEST_VERSION
        }

        async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
            unimplemented!();
        }
    }

    /// Layer whose `key_exists` always returns a fixed answer.
    struct MockLayer {
        exists_result: Existence,
        drop_event: Mutex<Option<Arc<DropEvent>>>,
    }

    impl MockLayer {
        fn new(exists_result: Existence) -> Self {
            Self { exists_result, drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
        }
    }

    #[async_trait]
    impl<K: Key, V: Value> Layer<K, V> for MockLayer {
        async fn seek(
            &self,
            _bound: std::ops::Bound<&K>,
        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
            unimplemented!()
        }

        fn lock(&self) -> Option<Arc<DropEvent>> {
            self.drop_event.lock().clone()
        }

        fn len(&self) -> usize {
            0
        }

        async fn close(&self) {
            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
                Some(drop_event) => drop_event.listen(),
                None => return,
            };
            listener.await;
        }

        fn get_version(&self) -> Version {
            LATEST_VERSION
        }

        async fn key_exists(&self, _key: &K) -> Result<Existence, Error> {
            Ok(self.exists_result)
        }
    }

    #[fuchsia::test]
    async fn test_layer_set_key_exists() {
        use super::LockedLayer;

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let mut layer_set = tree.empty_layer_set();

        // A set with no layers reports the key as missing.
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::Missing
        );

        // A single layer reporting Missing: still missing.
        layer_set.layers.push(LockedLayer::from(
            Arc::new(MockLayer::new(Existence::Missing)) as Arc<dyn Layer<TestKey, u64>>
        ));
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::Missing
        );

        // A later layer reporting MaybeExists is returned after the Missing one is skipped.
        layer_set.layers.push(LockedLayer::from(
            Arc::new(MockLayer::new(Existence::MaybeExists)) as Arc<dyn Layer<TestKey, u64>>
        ));
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::MaybeExists
        );

        // A layer reporting Exists at the front wins because layers are checked in order.
        layer_set.layers.insert(
            0,
            LockedLayer::from(
                Arc::new(MockLayer::new(Existence::Exists)) as Arc<dyn Layer<TestKey, u64>>
            ),
        );
        assert_eq!(
            layer_set.key_exists(&TestKey(0..1)).await.expect("key_exists failed"),
            Existence::Exists
        );
    }

    #[fuchsia::test]
    async fn test_failed_lookup() {
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        a.set_layers(vec![Arc::new(FailLayer::new())]);

        // Nothing has touched the cache yet.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // The failed seek surfaces as an error; the placeholder is dropped uncompleted.
        assert!(a.find(&TestKey(1..1)).await.is_err());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
    }
}
998
// Fuzz target that applies an arbitrary sequence of mutations, lookups and seals to an
// `LSMTree`, checking only that nothing panics and that `find` never errors.
#[cfg(fuzz)]
mod fuzz {
    use crate::lsm_tree::types::{
        FuzzyHash, Item, LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
    };
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use arbitrary::Arbitrary;
    use fprint::TypeFingerprint;
    use fuzz::fuzz;
    use fxfs_macros::FuzzyHash;
    use std::hash::Hash;

    /// Fuzz key wrapping a `Range<u64>`, generated via `Arbitrary`.
    #[derive(
        Arbitrary,
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    // NOTE(review): `u64` is made `Versioned` here, presumably because `Item<TestKey, u64>`
    // requires it in fuzz builds — confirm.
    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    impl LayerKey for TestKey {}

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    // `0` marks a deleted value.
    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    // NOTE(review): the reason for allowing dead code is not visible here. Presumably the
    // variants are only constructed through the derived `Arbitrary` impl — confirm. Reordering
    // the variants likely changes how existing fuzz inputs decode, so keep the order stable.
    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        Insert(Item<TestKey, u64>),
        ReplaceOrInsert(Item<TestKey, u64>),
        MergeInto(Item<TestKey, u64>, TestKey),
        Find(TestKey),
        Seal,
    }

    #[fuzz]
    fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
        use super::LSMTree;
        use super::cache::NullCache;
        use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
        use futures::executor::block_on;

        // Merge function that always answers `EmitLeft`.
        fn emit_left_merge_fn(
            _left: &MergeLayerIterator<'_, TestKey, u64>,
            _right: &MergeLayerIterator<'_, TestKey, u64>,
        ) -> MergeResult<TestKey, u64> {
            MergeResult::EmitLeft
        }

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for action in actions {
            match action {
                FuzzAction::Insert(item) => {
                    // Insert errors are deliberately ignored; arbitrary input may legitimately
                    // be rejected.
                    let _ = tree.insert(item);
                }
                FuzzAction::ReplaceOrInsert(item) => {
                    tree.replace_or_insert(item);
                }
                FuzzAction::Find(key) => {
                    // Only errors matter here; the lookup result itself is discarded.
                    block_on(tree.find(&key)).expect("find failed");
                }
                FuzzAction::MergeInto(item, bound) => tree.merge_into(item, &bound),
                FuzzAction::Seal => tree.seal(),
            };
        }
    }
}
1101}