mod bloom_filter;
pub mod cache;
pub mod merge;
pub mod persistent_layer;
pub mod skip_list_layer;
pub mod types;

use crate::drop_event::DropEvent;
use crate::log::*;
use crate::object_handle::{ReadObjectHandle, WriteBytes};
use crate::serialized_types::{LATEST_VERSION, Version};
use anyhow::Error;
use cache::{ObjectCache, ObjectCacheResult};
use fuchsia_sync::{Mutex, RwLock};
use persistent_layer::{PersistentLayer, PersistentLayerWriter};
use skip_list_layer::SkipListLayer;
use std::fmt;
use std::sync::Arc;
use types::{
    Item, ItemRef, Key, Layer, LayerIterator, LayerKey, LayerWriter, MergeableKey, OrdLowerBound,
    Value,
};

pub use merge::Query;

const SKIP_LIST_LAYER_ITEMS: usize = 512;

pub use persistent_layer::{
    LayerHeader as PersistentLayerHeader, LayerHeaderV39 as PersistentLayerHeaderV39,
    LayerInfo as PersistentLayerInfo, LayerInfoV39 as PersistentLayerInfoV39,
};

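/// Opens each handle as a `PersistentLayer` and returns the resulting layers, in the same order
/// as the supplied handles.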
pub async fn layers_from_handles<K: Key, V: Value>(
    handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
) -> Result<Vec<Arc<dyn Layer<K, V>>>, Error> {
    let mut layers = Vec::new();
    for handle in handles {
        layers.push(PersistentLayer::open(handle).await? as Arc<dyn Layer<K, V>>);
    }
    Ok(layers)
}

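/// The mutation operation passed to a `MutationCallback`.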
#[derive(Eq, PartialEq, Debug)]
pub enum Operation {
    Insert,
    ReplaceOrInsert,
    MergeInto,
}

pub type MutationCallback<K, V> = Option<Box<dyn Fn(Operation, &Item<K, V>) + Send + Sync>>;

struct Inner<K, V> {
    mutable_layer: Arc<SkipListLayer<K, V>>,
    layers: Vec<Arc<dyn Layer<K, V>>>,
    mutation_callback: MutationCallback<K, V>,
}

#[derive(Default)]
pub(super) struct Counters {
    num_seeks: usize,
    layer_files_total: usize,
    layer_files_skipped: usize,
}

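/// An LSM tree: an in-memory mutable layer (a skip list) in front of an ordered stack of
/// immutable layers (usually persistent layer files). Lookups merge all layers with the tree's
/// merge function; mutations go to the mutable layer until it is sealed and compacted into a new
/// persistent layer.
///
/// A minimal usage sketch (illustrative only; `MyKey`, `MyValue` and `my_merge_fn` stand in for a
/// real key type, value type and merge function):
///
/// ```ignore
/// let tree = LSMTree::<MyKey, MyValue>::new(my_merge_fn, Box::new(NullCache {}));
/// tree.insert(Item::new(key, value))?;
/// let found = tree.find(&key).await?;
/// tree.seal(); // freeze the mutable layer; it can then be compacted to a persistent layer
/// ```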
pub struct LSMTree<K, V> {
    data: RwLock<Inner<K, V>>,
    merge_fn: merge::MergeFn<K, V>,
    cache: Box<dyn ObjectCache<K, V>>,
    counters: Arc<Mutex<Counters>>,
}

#[fxfs_trace::trace]
impl<'tree, K: MergeableKey, V: Value> LSMTree<K, V> {
    pub fn new(merge_fn: merge::MergeFn<K, V>, cache: Box<dyn ObjectCache<K, V>>) -> Self {
        LSMTree {
            data: RwLock::new(Inner {
                mutable_layer: Self::new_mutable_layer(),
                layers: Vec::new(),
                mutation_callback: None,
            }),
            merge_fn,
            cache,
            counters: Arc::new(Mutex::new(Default::default())),
        }
    }

    pub async fn open(
        merge_fn: merge::MergeFn<K, V>,
        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
        cache: Box<dyn ObjectCache<K, V>>,
    ) -> Result<Self, Error> {
        Ok(LSMTree {
            data: RwLock::new(Inner {
                mutable_layer: Self::new_mutable_layer(),
                layers: layers_from_handles(handles).await?,
                mutation_callback: None,
            }),
            merge_fn,
            cache,
            counters: Arc::new(Mutex::new(Default::default())),
        })
    }

    pub fn set_layers(&self, layers: Vec<Arc<dyn Layer<K, V>>>) {
        self.data.write().layers = layers;
    }

    pub async fn append_layers(
        &self,
        handles: impl IntoIterator<Item = impl ReadObjectHandle + 'static>,
    ) -> Result<(), Error> {
        let mut layers = layers_from_handles(handles).await?;
        self.data.write().layers.append(&mut layers);
        Ok(())
    }

    pub fn reset_immutable_layers(&self) {
        self.data.write().layers = Vec::new();
    }

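    /// Seals the current mutable layer by pushing it onto the front of the immutable layer stack
    /// and installing a fresh, empty mutable layer in its place.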
    pub fn seal(&self) {
        let mut data = self.data.write();
        let layer = std::mem::replace(&mut data.mutable_layer, Self::new_mutable_layer());
        data.layers.insert(0, layer);
    }

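    /// Resets the tree to an empty state: all immutable layers are dropped and the mutable layer
    /// is replaced with a fresh one.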
    pub fn reset(&self) {
        let mut data = self.data.write();
        data.layers = Vec::new();
        data.mutable_layer = Self::new_mutable_layer();
    }

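    /// Writes all items produced by `iterator` to a new persistent layer via `writer`.
    /// `num_items` should be the approximate number of items the iterator will yield and
    /// `block_size` the block size to use for the output layer.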
    #[trace]
    pub async fn compact_with_iterator<W: WriteBytes + Send>(
        &self,
        mut iterator: impl LayerIterator<K, V>,
        num_items: usize,
        writer: W,
        block_size: u64,
    ) -> Result<(), Error> {
        let mut writer =
            PersistentLayerWriter::<W, K, V>::new(writer, num_items, block_size).await?;
        while let Some(item_ref) = iterator.get() {
            debug!(item_ref:?; "compact: writing");
            writer.write(item_ref).await?;
            iterator.advance().await?;
        }
        writer.flush().await
    }

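    /// Returns an empty `LayerSet` that shares this tree's merge function and counters; layers
    /// can be added to it before creating a merger.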
    pub fn empty_layer_set(&self) -> LayerSet<K, V> {
        LayerSet { layers: Vec::new(), merge_fn: self.merge_fn, counters: self.counters.clone() }
    }

    pub fn add_all_layers_to_layer_set(&self, layer_set: &mut LayerSet<K, V>) {
        let data = self.data.read();
        layer_set.layers.reserve_exact(data.layers.len() + 1);
        layer_set
            .layers
            .push(LockedLayer::from(data.mutable_layer.clone() as Arc<dyn Layer<K, V>>));
        for layer in &data.layers {
            layer_set.layers.push(layer.clone().into());
        }
    }

    pub fn layer_set(&self) -> LayerSet<K, V> {
        let mut layer_set = self.empty_layer_set();
        self.add_all_layers_to_layer_set(&mut layer_set);
        layer_set
    }

    pub fn immutable_layer_set(&self) -> LayerSet<K, V> {
        let data = self.data.read();
        let mut layers = Vec::with_capacity(data.layers.len());
        for layer in &data.layers {
            layers.push(layer.clone().into());
        }
        LayerSet { layers, merge_fn: self.merge_fn, counters: self.counters.clone() }
    }

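    /// Inserts an item into the mutable layer (after invoking any registered mutation callback)
    /// and invalidates any cached entry for the key. Returns an error if the mutable layer
    /// rejects the insert.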
    pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
        let key = item.key.clone();
        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
        {
            let data = self.data.read();
            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
                mutation_callback(Operation::Insert, &item);
            }
            data.mutable_layer.insert(item)?;
        }
        self.cache.invalidate(key, val);
        Ok(())
    }

    pub fn replace_or_insert(&self, item: Item<K, V>) {
        let key = item.key.clone();
        let val = if item.value == V::DELETED_MARKER { None } else { Some(item.value.clone()) };
        {
            let data = self.data.read();
            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
                mutation_callback(Operation::ReplaceOrInsert, &item);
            }
            data.mutable_layer.replace_or_insert(item);
        }
        self.cache.invalidate(key, val);
    }

    pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K) {
        let key = item.key.clone();
        {
            let data = self.data.read();
            if let Some(mutation_callback) = data.mutation_callback.as_ref() {
                mutation_callback(Operation::MergeInto, &item);
            }
            data.mutable_layer.merge_into(item, lower_bound, self.merge_fn);
        }
        self.cache.invalidate(key, None);
    }

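    /// Looks up `search_key`, consulting the object cache first. On a cache miss the layers are
    /// merged with a point query and, if a live (not deleted) item with a matching key is found,
    /// any reserved cache placeholder is completed with its value. Returns `None` for missing
    /// keys and for items carrying the deletion marker.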
    pub async fn find(&self, search_key: &K) -> Result<Option<Item<K, V>>, Error>
    where
        K: Eq,
    {
        let token = match self.cache.lookup_or_reserve(search_key) {
            ObjectCacheResult::Value(value) => {
                if value == V::DELETED_MARKER {
                    return Ok(None);
                } else {
                    return Ok(Some(Item::new(search_key.clone(), value)));
                }
            }
            ObjectCacheResult::Placeholder(token) => Some(token),
            ObjectCacheResult::NoCache => None,
        };
        let layer_set = self.layer_set();
        let mut merger = layer_set.merger();

        Ok(match merger.query(Query::Point(search_key)).await?.get() {
            Some(ItemRef { key, value, sequence })
                if key == search_key && *value != V::DELETED_MARKER =>
            {
                if let Some(token) = token {
                    token.complete(Some(value));
                }
                Some(Item { key: key.clone(), value: value.clone(), sequence })
            }
            _ => None,
        })
    }

    pub fn mutable_layer(&self) -> Arc<SkipListLayer<K, V>> {
        self.data.read().mutable_layer.clone()
    }

    pub fn set_mutation_callback(&self, mutation_callback: MutationCallback<K, V>) {
        self.data.write().mutation_callback = mutation_callback;
    }

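    /// Returns the earliest (lowest) version among all of the tree's layers.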
    pub fn get_earliest_version(&self) -> Version {
        let mut earliest_version = LATEST_VERSION;
        for layer in self.layer_set().layers {
            let layer_version = layer.get_version();
            if layer_version < earliest_version {
                earliest_version = layer_version;
            }
        }
        earliest_version
    }

    pub fn new_mutable_layer() -> Arc<SkipListLayer<K, V>> {
        SkipListLayer::new(SKIP_LIST_LAYER_ITEMS)
    }

    pub fn set_mutable_layer(&self, layer: Arc<SkipListLayer<K, V>>) {
        self.data.write().mutable_layer = layer;
    }

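    /// Records inspect data for the tree under `root`: per-layer information, the number of
    /// seeks, and the percentage of layer files that bloom filters allowed to be skipped.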
    pub fn record_inspect_data(&self, root: &fuchsia_inspect::Node) {
        let layer_set = self.layer_set();
        root.record_child("layers", move |node| {
            let mut index = 0;
            for layer in layer_set.layers {
                node.record_child(format!("{index}"), move |node| {
                    layer.1.record_inspect_data(node)
                });
                index += 1;
            }
        });
        {
            let counters = self.counters.lock();
            root.record_uint("num_seeks", counters.num_seeks as u64);
            root.record_uint(
                "bloom_filter_success_percent",
                if counters.layer_files_total == 0 {
                    0
                } else {
                    (counters.layer_files_skipped * 100).div_ceil(counters.layer_files_total) as u64
                },
            );
        }
    }
}

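/// A layer paired with the lock (a `DropEvent`) obtained from `Layer::lock`; while the lock is
/// held, the layer's `close` will not complete.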
pub struct LockedLayer<K, V>(Arc<DropEvent>, Arc<dyn Layer<K, V>>);

impl<K, V> LockedLayer<K, V> {
    pub async fn close_layer(self) {
        let layer = self.1;
        std::mem::drop(self.0);
        layer.close().await;
    }
}

impl<K, V> From<Arc<dyn Layer<K, V>>> for LockedLayer<K, V> {
    fn from(layer: Arc<dyn Layer<K, V>>) -> Self {
        let event = layer.lock().unwrap();
        Self(event, layer)
    }
}

impl<K, V> std::ops::Deref for LockedLayer<K, V> {
    type Target = Arc<dyn Layer<K, V>>;

    fn deref(&self) -> &Self::Target {
        &self.1
    }
}

impl<K, V> AsRef<dyn Layer<K, V>> for LockedLayer<K, V> {
    fn as_ref(&self) -> &(dyn Layer<K, V> + 'static) {
        self.1.as_ref()
    }
}

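/// A snapshot of a set of locked layers, typically built via `LSMTree::layer_set` or
/// `LSMTree::immutable_layer_set`, from which a `Merger` can be created to iterate over the
/// merged contents of the layers.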
pub struct LayerSet<K, V> {
    pub layers: Vec<LockedLayer<K, V>>,
    merge_fn: merge::MergeFn<K, V>,
    counters: Arc<Mutex<Counters>>,
}

impl<K: Key + LayerKey + OrdLowerBound, V: Value> LayerSet<K, V> {
    pub fn sum_len(&self) -> usize {
        let mut size = 0;
        for layer in &self.layers {
            size += layer.len()
        }
        size
    }

    pub fn merger(&self) -> merge::Merger<'_, K, V> {
        merge::Merger::new(
            self.layers.iter().map(|x| x.as_ref()),
            self.merge_fn,
            self.counters.clone(),
        )
    }
}

impl<K, V> fmt::Debug for LayerSet<K, V> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_list()
            .entries(self.layers.iter().map(|l| {
                if let Some(handle) = l.handle() {
                    format!("{}", handle.object_id())
                } else {
                    format!("{:?}", Arc::as_ptr(l))
                }
            }))
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use super::LSMTree;
    use crate::drop_event::DropEvent;
    use crate::lsm_tree::cache::{
        NullCache, ObjectCache, ObjectCachePlaceholder, ObjectCacheResult,
    };
    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
    use crate::lsm_tree::types::{
        BoxedLayerIterator, FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerKey,
        OrdLowerBound, OrdUpperBound, SortByU64, Value,
    };
    use crate::lsm_tree::{Query, layers_from_handles};
    use crate::object_handle::ObjectHandle;
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
    use crate::testing::writer::Writer;
    use anyhow::{Error, anyhow};
    use async_trait::async_trait;
    use fprint::TypeFingerprint;
    use fuchsia_sync::Mutex;
    use fxfs_macros::FuzzyHash;
    use rand::rng;
    use rand::seq::SliceRandom;
    use std::hash::Hash;
    use std::sync::Arc;

    #[derive(
        Clone,
        Eq,
        PartialEq,
        Debug,
        Hash,
        FuzzyHash,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl LayerKey for TestKey {}

    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    fn emit_left_merge_fn(
        _left: &MergeLayerIterator<'_, TestKey, u64>,
        _right: &MergeLayerIterator<'_, TestKey, u64>,
    ) -> MergeResult<TestKey, u64> {
        MergeResult::EmitLeft
    }

    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

    #[fuchsia::test]
    async fn test_iteration() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), 2)];
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        let layers = tree.layer_set();
        let mut merger = layers.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_compact() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");
        tree.seal();
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.query(Query::FullScan).await.expect("create merger");
            tree.compact_with_iterator(
                iter,
                items.len(),
                Writer::new(&handle).await,
                handle.block_size(),
            )
            .await
            .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        let handle = FakeObjectHandle::new(object.clone());
        let tree = LSMTree::open(emit_left_merge_fn, [handle], Box::new(NullCache {}))
            .await
            .expect("open failed");

        let layers = tree.layer_set();
        let mut merger = layers.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        for i in 1..5 {
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!((key, value), (&TestKey(i..i), &i));
            iter.advance().await.expect("advance failed");
        }
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_find() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.seal();
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let item = tree.find(&items[1].key).await.expect("find failed").expect("not found");
        assert_eq!(item, items[1]);
        assert!(tree.find(&TestKey(100..100)).await.expect("find failed").is_none());
    }

    #[fuchsia::test]
    async fn test_find_no_return_deleted_values() {
        let items = [Item::new(TestKey(1..1), 1), Item::new(TestKey(2..2), u64::DELETED_MARKER)];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");

        let item = tree.find(&items[0].key).await.expect("find failed").expect("not found");
        assert_eq!(item, items[0]);
        assert!(tree.find(&items[1].key).await.expect("find failed").is_none());
    }

    #[fuchsia::test]
    async fn test_empty_seal() {
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.seal();
        let item = Item::new(TestKey(1..1), 1);
        tree.insert(item.clone()).expect("insert error");
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        {
            let layer_set = tree.immutable_layer_set();
            let mut merger = layer_set.merger();
            let iter = merger.query(Query::FullScan).await.expect("create merger");
            tree.compact_with_iterator(iter, 0, Writer::new(&handle).await, handle.block_size())
                .await
                .expect("compact failed");
        }
        tree.set_layers(layers_from_handles([handle]).await.expect("layers_from_handles failed"));
        let found_item = tree.find(&item.key).await.expect("find failed").expect("not found");
        assert_eq!(found_item, item);
        assert!(tree.find(&TestKey(2..2)).await.expect("find failed").is_none());
    }

    #[fuchsia::test]
    async fn test_filter() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
        ];
        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        tree.insert(items[0].clone()).expect("insert error");
        tree.insert(items[1].clone()).expect("insert error");
        tree.insert(items[2].clone()).expect("insert error");
        tree.insert(items[3].clone()).expect("insert error");

        let layers = tree.layer_set();
        let mut merger = layers.merger();

        let mut iter = merger
            .query(Query::FullScan)
            .await
            .expect("seek failed")
            .filter(|item: ItemRef<'_, TestKey, u64>| item.key.0.start % 2 == 0)
            .await
            .expect("filter failed");

        assert_eq!(iter.get(), Some(items[1].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert_eq!(iter.get(), Some(items[3].as_item_ref()));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_insert_order_agnostic() {
        let items = [
            Item::new(TestKey(1..1), 1),
            Item::new(TestKey(2..2), 2),
            Item::new(TestKey(3..3), 3),
            Item::new(TestKey(4..4), 4),
            Item::new(TestKey(5..5), 5),
            Item::new(TestKey(6..6), 6),
        ];
        let a = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for item in &items {
            a.insert(item.clone()).expect("insert error");
        }
        let b = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        let mut shuffled = items.clone();
        shuffled.shuffle(&mut rng());
        for item in &shuffled {
            b.insert(item.clone()).expect("insert error");
        }
        let layers = a.layer_set();
        let mut merger = layers.merger();
        let mut iter_a = merger.query(Query::FullScan).await.expect("seek failed");
        let layers = b.layer_set();
        let mut merger = layers.merger();
        let mut iter_b = merger.query(Query::FullScan).await.expect("seek failed");

        for item in items {
            assert_eq!(Some(item.as_item_ref()), iter_a.get());
            assert_eq!(Some(item.as_item_ref()), iter_b.get());
            iter_a.advance().await.expect("advance failed");
            iter_b.advance().await.expect("advance failed");
        }
        assert!(iter_a.get().is_none());
        assert!(iter_b.get().is_none());
    }

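    /// Counters shared by the `AuditCache` test cache: lookups, completions, invalidations and
    /// dropped (uncompleted) placeholders, in the order returned by `stats()`.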
    struct AuditCacheInner<'a, V: Value> {
        lookups: u64,
        completions: u64,
        invalidations: u64,
        drops: u64,
        result: Option<ObjectCacheResult<'a, V>>,
    }

    impl<V: Value> AuditCacheInner<'_, V> {
        fn stats(&self) -> (u64, u64, u64, u64) {
            (self.lookups, self.completions, self.invalidations, self.drops)
        }
    }

    struct AuditCache<'a, V: Value> {
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
    }

    impl<V: Value> AuditCache<'_, V> {
        fn new() -> Self {
            Self {
                inner: Arc::new(Mutex::new(AuditCacheInner {
                    lookups: 0,
                    completions: 0,
                    invalidations: 0,
                    drops: 0,
                    result: None,
                })),
            }
        }
    }

    struct AuditPlaceholder<'a, V: Value> {
        inner: Arc<Mutex<AuditCacheInner<'a, V>>>,
        completed: Mutex<bool>,
    }

    impl<V: Value> ObjectCachePlaceholder<V> for AuditPlaceholder<'_, V> {
        fn complete(self: Box<Self>, _: Option<&V>) {
            self.inner.lock().completions += 1;
            *self.completed.lock() = true;
        }
    }

    impl<V: Value> Drop for AuditPlaceholder<'_, V> {
        fn drop(&mut self) {
            if !*self.completed.lock() {
                self.inner.lock().drops += 1;
            }
        }
    }

    impl<K: Key + std::cmp::PartialEq, V: Value> ObjectCache<K, V> for AuditCache<'_, V> {
        fn lookup_or_reserve(&self, _key: &K) -> ObjectCacheResult<'_, V> {
            {
                let mut inner = self.inner.lock();
                inner.lookups += 1;
                if inner.result.is_some() {
                    return std::mem::take(&mut inner.result).unwrap();
                }
            }
            ObjectCacheResult::Placeholder(Box::new(AuditPlaceholder {
                inner: self.inner.clone(),
                completed: Mutex::new(false),
            }))
        }

        fn invalidate(&self, _key: K, _value: Option<V>) {
            self.inner.lock().invalidations += 1;
        }
    }

    #[fuchsia::test]
    async fn test_cache_handling() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        // Nothing has touched the cache yet.
        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // A miss records a lookup, and the reserved placeholder is dropped uncompleted.
        assert!(a.find(&item.key).await.expect("Failed find").is_none());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));

        // Inserting invalidates any cached entry for the key.
        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (1, 0, 1, 1));

        // A successful find completes the reserved placeholder.
        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        assert_eq!(inner.lock().stats(), (2, 1, 1, 1));

        // replace_or_insert also invalidates.
        a.replace_or_insert(item.clone());
        assert_eq!(inner.lock().stats(), (2, 1, 2, 1));
    }

    #[fuchsia::test]
    async fn test_cache_hit() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);

        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // Insert only invalidates; it never performs a cache lookup.
        let _ = a.insert(item.clone());
        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        // Prime the cache so the next lookup is served without touching the layers.
        inner.lock().result = Some(ObjectCacheResult::Value(item.value.clone()));

        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Item should be found.").value,
            item.value
        );
        // Only the lookup is recorded: no placeholder was created, so nothing completes or drops.
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }

    #[fuchsia::test]
    async fn test_cache_says_uncacheable() {
        let item = Item::new(TestKey(1..1), 1);
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        let _ = a.insert(item.clone());

        assert_eq!(inner.lock().stats(), (0, 0, 1, 0));

        // Report the key as uncacheable for the next lookup.
        inner.lock().result = Some(ObjectCacheResult::NoCache);

        assert_eq!(
            a.find(&item.key).await.expect("Failed find").expect("Should find item").value,
            item.value
        );
        // The item is still found via the layers, but nothing was reserved or cached.
        assert_eq!(inner.lock().stats(), (1, 0, 1, 0));
    }

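    /// A `Layer` whose `seek` always fails, used to exercise the error path in `find`.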
    struct FailLayer {
        drop_event: Mutex<Option<Arc<DropEvent>>>,
    }

    impl FailLayer {
        fn new() -> Self {
            Self { drop_event: Mutex::new(Some(Arc::new(DropEvent::new()))) }
        }
    }

    #[async_trait]
    impl<K: Key, V: Value> Layer<K, V> for FailLayer {
        async fn seek(
            &self,
            _bound: std::ops::Bound<&K>,
        ) -> Result<BoxedLayerIterator<'_, K, V>, Error> {
            Err(anyhow!("Purposely failed seek"))
        }

        fn lock(&self) -> Option<Arc<DropEvent>> {
            self.drop_event.lock().clone()
        }

        fn len(&self) -> usize {
            0
        }

        async fn close(&self) {
            let listener = match std::mem::replace(&mut (*self.drop_event.lock()), None) {
                Some(drop_event) => drop_event.listen(),
                None => return,
            };
            listener.await;
        }

        fn get_version(&self) -> Version {
            LATEST_VERSION
        }
    }

    #[fuchsia::test]
    async fn test_failed_lookup() {
        let cache = Box::new(AuditCache::new());
        let inner = cache.inner.clone();
        let a = LSMTree::new(emit_left_merge_fn, cache);
        a.set_layers(vec![Arc::new(FailLayer::new())]);

        assert_eq!(inner.lock().stats(), (0, 0, 0, 0));

        // The seek fails, so the reserved placeholder is dropped without being completed.
        assert!(a.find(&TestKey(1..1)).await.is_err());
        assert_eq!(inner.lock().stats(), (1, 0, 0, 1));
    }
}

#[cfg(fuzz)]
mod fuzz {
    use crate::lsm_tree::types::{
        FuzzyHash, Item, LayerKey, OrdLowerBound, OrdUpperBound, SortByU64, Value,
    };
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use arbitrary::Arbitrary;
    use fprint::TypeFingerprint;
    use fuzz::fuzz;
    use fxfs_macros::FuzzyHash;
    use std::hash::Hash;

    #[derive(
        Arbitrary,
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(std::ops::Range<u64>);

    versioned_type! { 1.. => TestKey }

    impl Versioned for u64 {}
    versioned_type! { 1.. => u64 }

    impl LayerKey for TestKey {}

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }

    impl OrdUpperBound for TestKey {
        fn cmp_upper_bound(&self, other: &TestKey) -> std::cmp::Ordering {
            self.0.end.cmp(&other.0.end)
        }
    }

    impl OrdLowerBound for TestKey {
        fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start)
        }
    }

    impl Value for u64 {
        const DELETED_MARKER: Self = 0;
    }

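    /// The set of tree operations the fuzzer may apply; a fuzz run replays an arbitrary sequence
    /// of these against a fresh tree.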
    #[allow(dead_code)]
    #[derive(Arbitrary)]
    enum FuzzAction {
        Insert(Item<TestKey, u64>),
        ReplaceOrInsert(Item<TestKey, u64>),
        MergeInto(Item<TestKey, u64>, TestKey),
        Find(TestKey),
        Seal,
    }

    #[fuzz]
    fn fuzz_lsm_tree_actions(actions: Vec<FuzzAction>) {
        use super::LSMTree;
        use super::cache::NullCache;
        use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
        use futures::executor::block_on;

        fn emit_left_merge_fn(
            _left: &MergeLayerIterator<'_, TestKey, u64>,
            _right: &MergeLayerIterator<'_, TestKey, u64>,
        ) -> MergeResult<TestKey, u64> {
            MergeResult::EmitLeft
        }

        let tree = LSMTree::new(emit_left_merge_fn, Box::new(NullCache {}));
        for action in actions {
            match action {
                FuzzAction::Insert(item) => {
                    let _ = tree.insert(item);
                }
                FuzzAction::ReplaceOrInsert(item) => {
                    tree.replace_or_insert(item);
                }
                FuzzAction::Find(key) => {
                    block_on(tree.find(&key)).expect("find failed");
                }
                FuzzAction::MergeInto(item, bound) => tree.merge_into(item, &bound),
                FuzzAction::Seal => tree.seal(),
            };
        }
    }
}