1use crate::drop_event::DropEvent;
9use crate::log::*;
10use crate::lsm_tree::merge::{self, MergeFn};
11use crate::lsm_tree::types::{
12 BoxedLayerIterator, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut, LayerValue,
13 OrdLowerBound, OrdUpperBound,
14};
15use crate::serialized_types::{LATEST_VERSION, Version};
16use anyhow::{Error, bail};
17use async_trait::async_trait;
18use fuchsia_sync::{Mutex, MutexGuard};
19use std::cmp::{Ordering, min};
20use std::collections::BTreeMap;
21use std::ops::Bound;
22use std::ptr::NonNull;
23use std::sync::Arc;
24use std::sync::atomic::{self, AtomicPtr, AtomicU32};
25
/// A fixed-length array of atomic, nullable pointers to skip-list nodes, indexed by level.
struct PointerList<K, V>(Box<[AtomicPtr<SkipListNode<K, V>>]>);
30
31impl<K, V> PointerList<K, V> {
32 fn new(count: usize) -> PointerList<K, V> {
33 let mut pointers = Vec::new();
34 for _ in 0..count {
35 pointers.push(AtomicPtr::new(std::ptr::null_mut()));
36 }
37 PointerList(pointers.into_boxed_slice())
38 }
39
40 fn len(&self) -> usize {
41 self.0.len()
42 }
43
44 fn get(&self, index: usize) -> Option<NonNull<SkipListNode<K, V>>> {
46 NonNull::new(self.0[index].load(atomic::Ordering::SeqCst))
47 }
48
49 fn set(&self, index: usize, node: Option<NonNull<SkipListNode<K, V>>>) {
51 self.0[index]
52 .store(node.map_or(std::ptr::null_mut(), |n| n.as_ptr()), atomic::Ordering::SeqCst);
53 }
54}
55
/// A single heap-allocated entry in the skip list.
struct SkipListNode<K, V> {
    /// The key/value item stored in this node.
    item: Item<K, V>,
    /// Forward pointers, one per level this node takes part in; index 0 is the next node in order.
    pointers: PointerList<K, V>,
}
60
/// An in-memory skip list that implements `Layer`.
pub struct SkipListLayer<K, V> {
    /// Head pointers of the list, one per level. Level 0 links every node in order.
    pointers: PointerList<K, V>,

    /// Bookkeeping (item count, reader epochs, deferred frees), guarded by its own lock.
    inner: Mutex<Inner<K, V>>,

    /// Held by a `SkipListLayerIterMut` for its whole lifetime, so only one can exist at a time.
    write_lock: Mutex<()>,

    /// Number of nodes currently allocated; asserted to be zero when the layer is dropped.
    allocated: AtomicU32,

    /// Cloned out by `lock()` and taken by `close()`, which then waits on it.
    close_event: Mutex<Option<Arc<DropEvent>>>,
}
75
/// Mutable bookkeeping for a `SkipListLayer`, kept behind `SkipListLayer::inner`.
struct Inner<K, V> {
    /// The current epoch. It advances each time a commit has to defer freeing erased nodes.
    epoch: u64,

    /// Number of live read iterators that were created during the current epoch.
    current_count: u16,

    /// Chains of erased nodes awaiting release, keyed by the epoch that was current when they
    /// were erased.
    erase_lists: BTreeMap<u64, EpochEraseList<K, V>>,

    /// Number of items in the list; updated when a mutable iterator commits.
    item_count: usize,
}
96
/// A chain of unlinked nodes whose release is deferred until the readers of its epoch are gone.
struct EpochEraseList<K, V> {
    /// Read iterators from this epoch that are still alive.
    count: u16,
    /// First node of the chain to free.
    start: NonNull<SkipListNode<K, V>>,
    /// Node at which freeing stops (it is not freed itself). `None` means the chain is freed
    /// until a null level-0 pointer is reached.
    end: Option<NonNull<SkipListNode<K, V>>>,
}
110
// NOTE(review): `Inner` stores raw `NonNull` node pointers (inside `erase_lists`), which are not
// `Send` on their own. This impl asserts that moving it across threads is fine -- presumably
// because it is only reached through the `SkipListLayer::inner` mutex; confirm.
unsafe impl<K, V> Send for Inner<K, V> {}
113
114impl<K, V> Inner<K, V> {
115 fn new() -> Self {
116 Inner { epoch: 0, current_count: 0, erase_lists: BTreeMap::new(), item_count: 0 }
117 }
118 fn free_erase_list(
119 &mut self,
120 owner: &SkipListLayer<K, V>,
121 start: NonNull<SkipListNode<K, V>>,
122 end: Option<NonNull<SkipListNode<K, V>>>,
123 ) {
124 let mut node = start;
125 loop {
126 let next = unsafe { owner.free_node(node) };
128 if next == end {
129 break;
130 }
131 node = next.unwrap();
132 }
133 }
134}
135
136impl<K, V> SkipListLayer<K, V> {
137 pub fn new(max_item_count: usize) -> Arc<SkipListLayer<K, V>> {
138 Arc::new(SkipListLayer {
139 pointers: PointerList::new((max_item_count as f32).log2() as usize + 1),
140 inner: Mutex::new(Inner::new()),
141 write_lock: Mutex::new(()),
142 allocated: AtomicU32::new(0),
143 close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
144 })
145 }
146
147 pub fn len(&self) -> usize {
148 self.inner.lock().item_count
149 }
150
151 fn alloc_node(&self, item: Item<K, V>, pointer_count: usize) -> Box<SkipListNode<K, V>> {
152 self.allocated.fetch_add(1, atomic::Ordering::Relaxed);
153 Box::new(SkipListNode { item, pointers: PointerList::new(pointer_count) })
154 }
155
156 unsafe fn free_node(
162 &self,
163 node: NonNull<SkipListNode<K, V>>,
164 ) -> Option<NonNull<SkipListNode<K, V>>> {
165 self.allocated.fetch_sub(1, atomic::Ordering::Relaxed);
166 unsafe { Box::from_raw(node.as_ptr()).pointers.get(0) }
167 }
168}
169
170impl<K: Eq + Key + OrdLowerBound, V: LayerValue> SkipListLayer<K, V> {
171 pub fn erase(&self, key: &K)
173 where
174 K: std::cmp::Eq,
175 {
176 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(key));
177 if let Some(ItemRef { key: k, .. }) = iter.get() {
178 if k == key {
179 iter.erase();
180 } else {
181 warn!("Attempt to erase key not present!");
182 }
183 }
184 iter.commit();
185 }
186
187 pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
189 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
190 if let Some(found_item) = iter.get() {
191 if found_item.key == &item.key {
192 bail!("Attempted to insert an existing key");
193 }
194 }
195 iter.insert(item);
196 Ok(())
197 }
198
199 pub fn replace_or_insert(&self, item: Item<K, V>) {
201 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
202 if let Some(found_item) = iter.get() {
203 if found_item.key == &item.key {
204 iter.erase();
205 }
206 }
207 iter.insert(item);
208 }
209
210 pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K, merge_fn: MergeFn<K, V>) {
212 merge::merge_into(
213 Box::new(SkipListLayerIterMut::new(self, Bound::Included(lower_bound))),
214 item,
215 merge_fn,
216 )
217 .unwrap();
218 }
219}
220
221impl<K, V> Drop for SkipListLayer<K, V> {
223 fn drop(&mut self) {
224 let mut next = self.pointers.get(0);
225 while let Some(node) = next {
226 next = unsafe { self.free_node(node) };
228 }
229 assert_eq!(self.allocated.load(atomic::Ordering::Relaxed), 0);
230 }
231}
232
233#[async_trait]
234impl<K: Key, V: LayerValue> Layer<K, V> for SkipListLayer<K, V> {
235 async fn seek<'a>(
236 &'a self,
237 bound: std::ops::Bound<&K>,
238 ) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
239 Ok(Box::new(SkipListLayerIter::new(self, bound)))
240 }
241
242 fn lock(&self) -> Option<Arc<DropEvent>> {
243 self.close_event.lock().clone()
244 }
245
246 fn len(&self) -> usize {
247 self.inner.lock().item_count
248 }
249
250 async fn close(&self) {
251 let listener = self.close_event.lock().take().expect("close already called").listen();
252 listener.await;
253 }
254
255 fn get_version(&self) -> Version {
256 return LATEST_VERSION;
259 }
260
261 fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
262 node.record_bool("persistent", false);
263 node.record_uint("num_items", self.inner.lock().item_count as u64);
264 }
265}
266
/// A read-only iterator over a `SkipListLayer`.
struct SkipListLayerIter<'a, K, V> {
    /// The list being iterated.
    skip_list: &'a SkipListLayer<K, V>,

    /// The epoch that was current when this iterator was created; used on drop to release the
    /// matching reader count.
    epoch: u64,

    /// The node the iterator is positioned at, or `None` once it is past the end.
    node: Option<NonNull<SkipListNode<K, V>>>,
}
278
// NOTE(review): the iterator holds a raw `NonNull` node pointer, which is neither `Send` nor
// `Sync` by default. These impls assert it may be shared and moved across threads --
// presumably relying on the epoch accounting to keep the node alive; confirm.
unsafe impl<K, V> Send for SkipListLayerIter<'_, K, V> {}
unsafe impl<K, V> Sync for SkipListLayerIter<'_, K, V> {}
282
283impl<'a, K: OrdUpperBound, V> SkipListLayerIter<'a, K, V> {
284 fn new(skip_list: &'a SkipListLayer<K, V>, bound: Bound<&K>) -> Self {
285 let epoch = {
286 let mut inner = skip_list.inner.lock();
287 inner.current_count += 1;
288 inner.epoch
289 };
290 let (included, key) = match bound {
291 Bound::Unbounded => {
292 return SkipListLayerIter { skip_list, epoch, node: skip_list.pointers.get(0) };
293 }
294 Bound::Included(key) => (true, key),
295 Bound::Excluded(key) => (false, key),
296 };
297 let mut last_pointers = &skip_list.pointers;
298
299 let mut node = None;
303 for index in (0..skip_list.pointers.len()).rev() {
304 loop {
306 node = last_pointers.get(index);
307 if let Some(node) = node {
308 let node = unsafe { node.as_ref() };
310 match &node.item.key.cmp_upper_bound(key) {
311 Ordering::Equal if included => break,
312 Ordering::Greater => break,
313 _ => {}
314 }
315 last_pointers = &node.pointers;
316 } else {
317 break;
318 }
319 }
320 }
321 SkipListLayerIter { skip_list, epoch, node }
322 }
323}
324
325impl<K, V> Drop for SkipListLayerIter<'_, K, V> {
326 fn drop(&mut self) {
327 let mut inner = self.skip_list.inner.lock();
328 if self.epoch == inner.epoch {
329 inner.current_count -= 1;
330 } else {
331 if let Some(erase_list) = inner.erase_lists.get_mut(&self.epoch) {
332 erase_list.count -= 1;
333 if erase_list.count == 0 {
334 while let Some(entry) = inner.erase_lists.first_entry() {
335 if entry.get().count == 0 {
336 let EpochEraseList { start, end, .. } = entry.remove_entry().1;
337 inner.free_erase_list(self.skip_list, start, end);
338 } else {
339 break;
340 }
341 }
342 }
343 }
344 }
345 }
346}
347
348#[async_trait]
349impl<K: Key, V: LayerValue> LayerIterator<K, V> for SkipListLayerIter<'_, K, V> {
350 async fn advance(&mut self) -> Result<(), Error> {
351 match self.node {
352 None => {}
353 Some(node) => {
354 self.node = {
355 unsafe { node.as_ref() }.pointers.get(0)
357 }
358 }
359 }
360 Ok(())
361 }
362
363 fn get(&self) -> Option<ItemRef<'_, K, V>> {
364 self.node.map(|node| unsafe { node.as_ref() }.item.as_item_ref())
366 }
367}
368
/// A boxed array of borrowed pointer lists; `SkipListLayerIterMut` keeps one entry per level.
type PointerListRefArray<'a, K, V> = Box<[&'a PointerList<K, V>]>;
370
/// A mutating iterator over a `SkipListLayer`. Inserts and erases are staged and then published
/// by `commit`, which also runs when the iterator is dropped.
pub struct SkipListLayerIterMut<'a, K: Key, V: LayerValue> {
    /// The list being modified.
    skip_list: &'a SkipListLayer<K, V>,

    /// For each level, the pointer list whose entry at that level leads to the current position.
    prev_pointers: PointerListRefArray<'a, K, V>,

    /// A snapshot of `prev_pointers` taken at the first insert or erase; `commit` splices the
    /// staged changes in at this point. `None` while nothing is pending.
    insertion_point: Option<PointerListRefArray<'a, K, V>>,

    /// For each level, the first staged node (if any); `commit` links these into the list.
    insertion_nodes: PointerList<K, V>,

    /// Keeps `SkipListLayer::write_lock` held for as long as the iterator lives. It is never
    /// read, hence the `allow`.
    #[allow(dead_code)]
    write_guard: MutexGuard<'a, ()>,

    /// Net change in the item count from staged inserts (+1) and erases (-1); applied by `commit`.
    item_delta: isize,
}
401
402impl<'a, K: Key, V: LayerValue> SkipListLayerIterMut<'a, K, V> {
403 pub fn new(skip_list: &'a SkipListLayer<K, V>, bound: std::ops::Bound<&K>) -> Self {
404 let write_guard = skip_list.write_lock.lock();
405 let len = skip_list.pointers.len();
406
407 let mut prev_pointers = vec![&skip_list.pointers; len].into_boxed_slice();
423 match bound {
424 Bound::Unbounded => {}
425 Bound::Included(key) => {
426 let pointers = &mut prev_pointers;
427 for index in (0..len).rev() {
428 while let Some(node) = pointers[index].get(index) {
429 let node = unsafe { node.as_ref() };
435
436 match node.item.key.cmp_upper_bound(key) {
437 Ordering::Equal | Ordering::Greater => break,
438 Ordering::Less => {}
439 }
440 pointers[index] = &node.pointers;
441 }
442 if index > 0 {
443 pointers[index - 1] = pointers[index];
444 }
445 }
446 }
447 Bound::Excluded(_) => panic!("Excluded bounds not supported"),
448 }
449 SkipListLayerIterMut {
450 skip_list,
451 prev_pointers,
452 insertion_point: None,
453 insertion_nodes: PointerList::new(len),
454 write_guard,
455 item_delta: 0,
456 }
457 }
458}
459
460impl<K: Key, V: LayerValue> Drop for SkipListLayerIterMut<'_, K, V> {
461 fn drop(&mut self) {
462 self.commit();
463 }
464}
465
466impl<K: Key, V: LayerValue> LayerIteratorMut<K, V> for SkipListLayerIterMut<'_, K, V> {
467 fn advance(&mut self) {
468 if self.insertion_point.is_some() {
469 if let Some(item) = self.get() {
470 let copy = item.cloned();
472 self.insert(copy);
473 self.erase();
474 }
475 } else {
476 let pointers = &mut self.prev_pointers;
477 if let Some(next) = pointers[0].get(0) {
478 let next = unsafe { next.as_ref() };
481 for i in 0..next.pointers.len() {
482 pointers[i] = &next.pointers;
483 }
484 }
485 }
486 }
487
488 fn get(&self) -> Option<ItemRef<'_, K, V>> {
489 self.prev_pointers[0].get(0).map(|node| unsafe { node.as_ref() }.item.as_item_ref())
492 }
493
494 fn insert(&mut self, item: Item<K, V>) {
495 use rand::Rng;
496 let mut rng = rand::rng();
497 let max_pointers = self.skip_list.pointers.len();
498 let pointer_count = max_pointers
501 - min(
502 (rng.random_range(0..2u32.pow(max_pointers as u32) - 1) as f32).log2() as usize,
503 max_pointers - 1,
504 );
505 let node = Box::leak(self.skip_list.alloc_node(item, pointer_count));
506 if self.insertion_point.is_none() {
507 self.insertion_point = Some(self.prev_pointers.clone());
508 }
509 let node_ptr = node.into();
510 for i in 0..pointer_count {
511 let pointers = self.prev_pointers[i];
512 node.pointers.set(i, pointers.get(i));
513 if self.insertion_nodes.get(i).is_none() {
514 self.insertion_nodes.set(i, Some(node_ptr));
517 } else {
518 pointers.set(i, Some(node_ptr));
521 }
522 self.prev_pointers[i] = &node.pointers;
524 }
525 self.item_delta += 1;
526 }
527
528 fn erase(&mut self) {
529 let pointers = &mut self.prev_pointers;
530 if let Some(next) = pointers[0].get(0) {
531 let next = unsafe { next.as_ref() };
534 if self.insertion_point.is_none() {
535 self.insertion_point = Some(pointers.clone());
536 }
537 if self.insertion_nodes.get(0).is_none() {
538 pointers[0] = &next.pointers;
541 } else {
542 pointers[0].set(0, next.pointers.get(0));
547 }
548 for i in 1..next.pointers.len() {
551 pointers[i].set(i, next.pointers.get(i));
552 }
553 }
554 self.item_delta -= 1;
555 }
556
557 fn commit(&mut self) {
560 let prev_pointers = match self.insertion_point.take() {
562 Some(prev_pointers) => prev_pointers,
563 None => return,
564 };
565
566 let maybe_erase = prev_pointers[0].get(0);
568
569 if self.insertion_nodes.get(0).is_none() {
571 prev_pointers[0].set(0, self.prev_pointers[0].get(0));
575 } else {
576 for i in 0..self.insertion_nodes.len() {
580 if let Some(node) = self.insertion_nodes.get(i) {
581 prev_pointers[i].set(i, Some(node));
582 }
583 }
584 }
585
586 let mut inner = self.skip_list.inner.lock();
588 inner.item_count = inner.item_count.checked_add_signed(self.item_delta).unwrap();
589 if let Some(start) = maybe_erase {
590 let end = self.prev_pointers[0].get(0);
591 if maybe_erase != end {
592 if inner.current_count > 0 || !inner.erase_lists.is_empty() {
593 let count = std::mem::take(&mut inner.current_count);
594 let epoch = inner.epoch;
595 inner.erase_lists.insert(epoch, EpochEraseList { count, start, end });
596 inner.epoch = inner.epoch.wrapping_add(1);
597 } else {
598 inner.free_erase_list(self.skip_list, start, end);
599 }
600 }
601 }
602 }
603}
604
#[cfg(test)]
mod tests {
    use super::{SkipListLayer, SkipListLayerIterMut};
    use crate::lsm_tree::merge::ItemOp::{Discard, Replace};
    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
    use crate::lsm_tree::skip_list_layer::SkipListLayerIter;
    use crate::lsm_tree::types::{
        DefaultOrdLowerBound, DefaultOrdUpperBound, FuzzyHash, Item, ItemRef, Layer, LayerIterator,
        LayerIteratorMut, SortByU64,
    };
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use assert_matches::assert_matches;
    use fprint::TypeFingerprint;
    use fuchsia_async as fasync;
    use futures::future::join_all;
    use futures::{FutureExt as _, join};
    use fxfs_macros::FuzzyHash;
    use std::hash::Hash;
    use std::ops::Bound;
    use std::time::{Duration, Instant};

    /// Minimal key type for these tests: a single `u64`.
    #[derive(
        Clone,
        Eq,
        Debug,
        Hash,
        FuzzyHash,
        PartialEq,
        PartialOrd,
        Ord,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(u64);

    versioned_type! { 1.. => TestKey }

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0
        }
    }

    impl DefaultOrdLowerBound for TestKey {}
    impl DefaultOrdUpperBound for TestKey {}

    /// Items inserted out of order are iterated in key order.
    #[fuchsia::test]
    async fn test_iteration() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// Seeking to a key that exists lands exactly on it.
    #[fuchsia::test]
    async fn test_seek_exact() {
        let skip_list = SkipListLayer::new(100);
        for i in (0..100).rev() {
            skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
        }
        let mut iter = skip_list.seek(Bound::Included(&TestKey(57))).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(57), &57));

        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(58), &58));
    }

    /// Seeking to a key that is absent lands on the next larger key.
    #[fuchsia::test]
    async fn test_seek_lower_bound() {
        let skip_list = SkipListLayer::new(100);
        for i in (0..100).rev() {
            skip_list.insert(Item::new(TestKey(i * 3), i * 3)).expect("insert error");
        }
        let mut expected_index = 57 * 3;
        let mut iter = skip_list.seek(Bound::Included(&TestKey(expected_index - 1))).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(expected_index), &expected_index));

        expected_index += 3;
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(expected_index), &expected_index));
    }

    /// `replace_or_insert` overwrites the value of an existing key.
    #[fuchsia::test]
    async fn test_replace_or_insert_replaces() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let replacement_value = 3;
        skip_list.replace_or_insert(Item::new(items[1].key.clone(), replacement_value));

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &replacement_value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// `replace_or_insert` inserts when the key is not present.
    #[fuchsia::test]
    async fn test_replace_or_insert_inserts() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[2].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.replace_or_insert(items[1].clone());

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[2].key, &items[2].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// Erasing removes the item and keeps `len` in step.
    #[fuchsia::test]
    async fn test_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");

        assert_eq!(skip_list.len(), 2);

        skip_list.erase(&items[1].key);

        assert_eq!(skip_list.len(), 1);

        {
            let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!((key, value), (&items[0].key, &items[0].value));
            iter.advance().await.unwrap();
            assert!(iter.get().is_none());
        }

        skip_list.erase(&items[0].key);

        assert_eq!(skip_list.len(), 0);

        {
            let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
            assert!(iter.get().is_none());
        }
    }

    /// Timing-based check that seek cost does not grow linearly with the item count.
    /// Ignored by default.
    #[fuchsia::test]
    #[ignore]
    async fn test_seek_is_log_n_complexity() {
        let mut n = 100;
        let mut loops = 0;
        const TARGET_TIME: Duration = Duration::from_millis(500);
        // Keep doubling the list size until seeking every key takes longer than TARGET_TIME.
        let time = loop {
            let skip_list = SkipListLayer::new(n as usize);
            for i in 0..n {
                skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
            }
            let start = Instant::now();
            for i in 0..n {
                skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
            }
            let elapsed = start.elapsed();
            if elapsed > TARGET_TIME {
                break elapsed;
            }
            n *= 2;
            loops += 1;
        };

        // Run the same number of seeks against a much smaller list.
        let seek_count = n;
        n >>= loops / 2;
        let skip_list = SkipListLayer::new(n as usize);
        for i in 0..n {
            skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
        }
        let start = Instant::now();
        for i in 0..seek_count {
            skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
        }
        let elapsed = start.elapsed();

        eprintln!(
            "{} items: {}ms, {} items: {}ms",
            seek_count,
            time.as_millis(),
            n,
            elapsed.as_millis()
        );

        // The small-list run must take more than a quarter of the large-list time.
        assert!(elapsed * 4 > time);
    }

    /// A list of about a thousand items can be sought near its end and iterated to the end.
    #[fuchsia::test]
    async fn test_large_number_of_items() {
        let item_count = 1000;
        let skip_list = SkipListLayer::new(1000);
        for i in 1..item_count {
            skip_list.insert(Item::new(TestKey(i), 1)).expect("insert error");
        }
        let mut iter = skip_list.seek(Bound::Included(&TestKey(item_count - 10))).await.unwrap();
        for i in item_count - 10..item_count {
            assert_eq!(iter.get().expect("missing item").key, &TestKey(i));
            iter.advance().await.unwrap();
        }
        assert!(iter.get().is_none());
    }

    /// Two read iterators can exist and be used at the same time.
    #[fuchsia::test]
    async fn test_multiple_readers_allowed() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));

        let iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter2.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));

        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
    }

    /// Merge function for the tests: folds the right value into the left item and discards
    /// the right item.
    fn merge(
        left: &'_ MergeLayerIterator<'_, TestKey, i32>,
        right: &'_ MergeLayerIterator<'_, TestKey, i32>,
    ) -> MergeResult<TestKey, i32> {
        MergeResult::Other {
            emit: None,
            left: Replace(Item::new((*left.key()).clone(), *left.value() + *right.value()).boxed()),
            right: Discard,
        }
    }

    /// `merge_into` combines the new item with the existing one through the merge function.
    #[fuchsia::test]
    async fn test_merge_into() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");

        skip_list.merge_into(Item::new(TestKey(2), 2), &TestKey(1), merge);

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(1), &3));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// Two inserts through the same mutable iterator are both published.
    #[fuchsia::test]
    async fn test_two_inserts() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.insert(items[1].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
    }

    /// Insert followed by erase in one iterator: the new item replaces the old one.
    #[fuchsia::test]
    async fn test_erase_after_insert() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.erase();
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// Erase followed by insert in one iterator: the new item replaces the old one.
    #[fuchsia::test]
    async fn test_insert_after_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.erase();
            iter.insert(items[0].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// Insert, erase, insert in one iterator leaves the two inserted items.
    #[fuchsia::test]
    async fn test_insert_erase_insert() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[0].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[1].clone());
            iter.erase();
            iter.insert(items[2].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[2].key, &items[2].value));
    }

    /// Two consecutive erases remove two consecutive items.
    #[fuchsia::test]
    async fn test_two_erase_erases() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[2].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.erase();
            iter.erase();
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[2].key, &items[2].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// An insert can proceed while read iterators exist, and those iterators stay usable.
    #[fuchsia::test]
    async fn test_readers_not_blocked_by_writers() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));

        let mut iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter2.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));

        join!(async { skip_list.insert(items[0].clone()).expect("insert error") }, async {
            // Spin until the insert becomes visible to a fresh reader.
            loop {
                let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
                let ItemRef { key, .. } = iter.get().expect("missing item");
                if key == &items[0].key {
                    break;
                }
            }
            // The pre-existing iterators were positioned on the last item.
            iter.advance().await.unwrap();
            assert!(iter.get().is_none());
            std::mem::drop(iter);
            iter2.advance().await.unwrap();
            assert!(iter2.get().is_none());
            std::mem::drop(iter2);
        });
    }

    /// Concurrent writers and readers: readers must always observe strictly increasing keys.
    #[fuchsia::test(threads = 20)]
    async fn test_many_readers_and_writers() {
        let skip_list = SkipListLayer::new(100);
        join_all(
            (0..10)
                .map(|i| {
                    let skip_list_clone = skip_list.clone();
                    fasync::Task::spawn(async move {
                        for j in 0..10 {
                            skip_list_clone
                                .insert(Item::new(TestKey(i * 100 + j), i))
                                .expect("insert error");
                        }
                    })
                })
                .chain((0..10).map(|_| {
                    let skip_list_clone = skip_list.clone();
                    fasync::Task::spawn(async move {
                        for _ in 0..300 {
                            let mut iter =
                                skip_list_clone.seek(Bound::Unbounded).await.expect("seek failed");
                            let mut last_item: Option<TestKey> = None;
                            while let Some(item) = iter.get() {
                                if let Some(last) = last_item {
                                    assert!(item.key > &last);
                                }
                                last_item = Some(item.key.clone());
                                iter.advance().await.expect("advance failed");
                            }
                        }
                    })
                })),
        )
        .await;
    }

    /// Insert, advance, erase in one iterator: the net item count is unchanged.
    #[fuchsia::test]
    async fn test_insert_advance_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[2].clone()).expect("insert error");

        assert_eq!(skip_list.len(), 2);

        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.advance();
            iter.erase();
        }

        assert_eq!(skip_list.len(), 2);

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// An excluded bound skips the item whose key equals the bound.
    #[fuchsia::test]
    async fn test_seek_excluded() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.insert(items[1].clone()).expect("insert error");
        let iter = skip_list.seek(Bound::Excluded(&items[0].key)).await.expect("seek failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
    }

    /// A seek racing with an insert of a smaller key must still find the key it asked for.
    #[fuchsia::test]
    fn test_insert_race() {
        for _ in 0..1000 {
            let skip_list = SkipListLayer::new(100);
            skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");

            let skip_list_clone = skip_list.clone();
            let thread1 = std::thread::spawn(move || {
                skip_list_clone.insert(Item::new(TestKey(1), 1)).expect("insert error")
            });
            let thread2 = std::thread::spawn(move || {
                let iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
                match iter.get() {
                    Some(ItemRef { key: TestKey(2), .. }) => {}
                    result => panic!("{:?}", result),
                }
            });
            thread1.join().unwrap();
            thread2.join().unwrap();
        }
    }

    /// Many threads replace the same key while a checker thread verifies that the neighbouring
    /// keys stay reachable in order.
    #[fuchsia::test]
    fn test_replace_or_insert_multi_thread() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
        skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");
        skip_list.insert(Item::new(TestKey(3), 3)).expect("insert error");
        skip_list.insert(Item::new(TestKey(4), 4)).expect("insert error");

        let mut threads = Vec::new();
        for i in 0..200 {
            let skip_list_clone = skip_list.clone();
            threads.push(std::thread::spawn(move || {
                skip_list_clone.replace_or_insert(Item::new(TestKey(3), i));
            }));
        }

        // The checker thread loops forever and is never joined; only the writer threads are.
        let _checker_thread = std::thread::spawn(move || {
            loop {
                let mut iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
                assert_matches!(iter.get(), Some(ItemRef { key: TestKey(2), .. }));
                iter.advance().now_or_never().unwrap().unwrap();
                assert_matches!(iter.get(), Some(ItemRef { key: TestKey(3), .. }));
                iter.advance().now_or_never().unwrap().unwrap();
                assert_matches!(iter.get(), Some(ItemRef { key: TestKey(4), .. }));
            }
        });

        for thread in threads {
            thread.join().unwrap();
        }
    }
}