1use crate::drop_event::DropEvent;
9use crate::log::*;
10use crate::lsm_tree::merge::{self, MergeFn};
11use crate::lsm_tree::types::{
12 BoxedLayerIterator, Existence, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut,
13 LayerValue, OrdLowerBound, OrdUpperBound,
14};
15use crate::serialized_types::{LATEST_VERSION, Version};
16use anyhow::{Error, bail};
17use async_trait::async_trait;
18use fuchsia_sync::{Mutex, MutexGuard};
19use std::cmp::{Ordering, min};
20use std::collections::BTreeMap;
21use std::ops::Bound;
22use std::ptr::NonNull;
23use std::sync::Arc;
24use std::sync::atomic::{self, AtomicPtr, AtomicU32};
25
26struct PointerList<K, V>(Box<[AtomicPtr<SkipListNode<K, V>>]>);
30
31impl<K, V> PointerList<K, V> {
32 fn new(count: usize) -> PointerList<K, V> {
33 let mut pointers = Vec::new();
34 for _ in 0..count {
35 pointers.push(AtomicPtr::new(std::ptr::null_mut()));
36 }
37 PointerList(pointers.into_boxed_slice())
38 }
39
40 fn len(&self) -> usize {
41 self.0.len()
42 }
43
44 fn get(&self, index: usize) -> Option<NonNull<SkipListNode<K, V>>> {
46 NonNull::new(self.0[index].load(atomic::Ordering::SeqCst))
47 }
48
49 fn set(&self, index: usize, node: Option<NonNull<SkipListNode<K, V>>>) {
51 self.0[index]
52 .store(node.map_or(std::ptr::null_mut(), |n| n.as_ptr()), atomic::Ordering::SeqCst);
53 }
54}
55
/// A single heap-allocated entry of the skip list.
struct SkipListNode<K, V> {
    /// The item stored in this node.
    item: Item<K, V>,
    /// Links to the following node, one slot per level this node takes part in.  Slot 0 is the
    /// link followed by the forward iterators.
    pointers: PointerList<K, V>,
}
60
/// An in-memory layer backed by a skip list.  Read iterators walk the atomic links while
/// mutations go through `SkipListLayerIterMut`, which holds `write_lock`.
pub struct SkipListLayer<K, V> {
    /// Head of the list: slot `i` points at the first node on level `i`, or is null when that
    /// level is empty.
    pointers: PointerList<K, V>,

    /// Epoch bookkeeping for deferred frees, plus the item count.
    inner: Mutex<Inner<K, V>>,

    /// Taken by `SkipListLayerIterMut::new` and held for as long as that iterator lives.
    write_lock: Mutex<()>,

    /// Number of nodes currently allocated: bumped by `alloc_node`, lowered by `free_node`, and
    /// asserted to be zero when the layer is dropped.
    allocated: AtomicU32,

    /// Event cloned out by `Layer::lock` and taken by `Layer::close`, which then awaits its
    /// listener.  `None` once `close` has been called.
    close_event: Mutex<Option<Arc<DropEvent>>>,
}
75
/// State guarded by `SkipListLayer::inner`.
struct Inner<K, V> {
    /// The current epoch.  `commit` advances it every time it has to defer freeing erased
    /// nodes.
    epoch: u64,

    /// Number of read iterators created during the current epoch that are still alive.
    current_count: u16,

    /// Erased node ranges whose freeing was deferred, keyed by the epoch that was current when
    /// they were erased.
    erase_lists: BTreeMap<u64, EpochEraseList<K, V>>,

    /// Number of items in the list, updated by `commit`.
    item_count: usize,
}
96
/// A range of erased nodes waiting to be freed once no reader from its epoch remains.
struct EpochEraseList<K, V> {
    /// Read iterators from this list's epoch that have not been dropped yet.
    count: u16,
    /// First node to free.
    start: NonNull<SkipListNode<K, V>>,
    /// Node at which freeing stops (exclusive); `None` means free to the end of the chain.
    end: Option<NonNull<SkipListNode<K, V>>>,
}
110
// `Inner` holds `NonNull` pointers (inside `EpochEraseList`), which are not `Send` by default.
// NOTE(review): soundness presumably rests on `Inner` only being reached through
// `SkipListLayer::inner`'s mutex and on the pointed-to nodes being owned by the layer — confirm.
unsafe impl<K, V> Send for Inner<K, V> {}
113
114impl<K, V> Inner<K, V> {
115 fn new() -> Self {
116 Inner { epoch: 0, current_count: 0, erase_lists: BTreeMap::new(), item_count: 0 }
117 }
118 fn free_erase_list(
119 &mut self,
120 owner: &SkipListLayer<K, V>,
121 start: NonNull<SkipListNode<K, V>>,
122 end: Option<NonNull<SkipListNode<K, V>>>,
123 ) {
124 let mut node = start;
125 loop {
126 let next = unsafe { owner.free_node(node) };
128 if next == end {
129 break;
130 }
131 node = next.unwrap();
132 }
133 }
134}
135
136impl<K, V> SkipListLayer<K, V> {
137 pub fn new(max_item_count: usize) -> Arc<SkipListLayer<K, V>> {
138 Arc::new(SkipListLayer {
139 pointers: PointerList::new((max_item_count as f32).log2() as usize + 1),
140 inner: Mutex::new(Inner::new()),
141 write_lock: Mutex::new(()),
142 allocated: AtomicU32::new(0),
143 close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
144 })
145 }
146
147 pub fn len(&self) -> usize {
148 self.inner.lock().item_count
149 }
150
151 fn alloc_node(&self, item: Item<K, V>, pointer_count: usize) -> Box<SkipListNode<K, V>> {
152 self.allocated.fetch_add(1, atomic::Ordering::Relaxed);
153 Box::new(SkipListNode { item, pointers: PointerList::new(pointer_count) })
154 }
155
156 unsafe fn free_node(
162 &self,
163 node: NonNull<SkipListNode<K, V>>,
164 ) -> Option<NonNull<SkipListNode<K, V>>> {
165 self.allocated.fetch_sub(1, atomic::Ordering::Relaxed);
166 unsafe { Box::from_raw(node.as_ptr()).pointers.get(0) }
167 }
168}
169
170impl<K: Eq + Key + OrdLowerBound, V: LayerValue> SkipListLayer<K, V> {
171 pub fn erase(&self, key: &K)
173 where
174 K: std::cmp::Eq,
175 {
176 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(key));
177 if let Some(ItemRef { key: k, .. }) = iter.get() {
178 if k == key {
179 iter.erase();
180 } else {
181 warn!("Attempt to erase key not present!");
182 }
183 }
184 iter.commit();
185 }
186
187 pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
189 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
190 if let Some(found_item) = iter.get() {
191 if found_item.key == &item.key {
192 bail!("Attempted to insert an existing key");
193 }
194 }
195 iter.insert(item);
196 Ok(())
197 }
198
199 pub fn replace_or_insert(&self, item: Item<K, V>) {
201 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
202 if let Some(found_item) = iter.get() {
203 if found_item.key == &item.key {
204 iter.erase();
205 }
206 }
207 iter.insert(item);
208 }
209
210 pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K, merge_fn: MergeFn<K, V>) {
212 merge::merge_into(
213 Box::new(SkipListLayerIterMut::new(self, Bound::Included(lower_bound))),
214 item,
215 merge_fn,
216 )
217 .unwrap();
218 }
219}
220
221impl<K, V> Drop for SkipListLayer<K, V> {
223 fn drop(&mut self) {
224 let mut next = self.pointers.get(0);
225 while let Some(node) = next {
226 next = unsafe { self.free_node(node) };
228 }
229 assert_eq!(self.allocated.load(atomic::Ordering::Relaxed), 0);
230 }
231}
232
233#[async_trait]
234impl<K: Key, V: LayerValue> Layer<K, V> for SkipListLayer<K, V> {
235 async fn seek<'a>(
236 &'a self,
237 bound: std::ops::Bound<&K>,
238 ) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
239 Ok(Box::new(SkipListLayerIter::new(self, bound)))
240 }
241
242 fn lock(&self) -> Option<Arc<DropEvent>> {
243 self.close_event.lock().clone()
244 }
245
246 fn len(&self) -> usize {
247 self.inner.lock().item_count
248 }
249
250 async fn close(&self) {
251 let listener = self.close_event.lock().take().expect("close already called").listen();
252 listener.await;
253 }
254
255 fn get_version(&self) -> Version {
256 return LATEST_VERSION;
259 }
260
261 fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
262 node.record_bool("persistent", false);
263 node.record_uint("num_items", self.inner.lock().item_count as u64);
264 }
265
266 async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
267 let iter = SkipListLayerIter::new(self, Bound::Included(key));
268 Ok(iter.get().map_or(Existence::Missing, |i| {
269 if i.key.cmp_upper_bound(key).is_eq() { Existence::Exists } else { Existence::Missing }
270 }))
271 }
272}
273
/// A read-only iterator over a `SkipListLayer`.
struct SkipListLayerIter<'a, K, V> {
    /// The list being iterated.
    skip_list: &'a SkipListLayer<K, V>,

    /// The list's epoch at the time this iterator was created; consulted on drop to release
    /// the right reader count.
    epoch: u64,

    /// The node the iterator currently points at, or `None` at the end of the list.
    node: Option<NonNull<SkipListNode<K, V>>>,
}

// The `NonNull` field stops these from being derived automatically.
// NOTE(review): presumably sound because nodes visible to a reader are not freed until every
// reader of that epoch has been dropped — confirm.
unsafe impl<K, V> Send for SkipListLayerIter<'_, K, V> {}
unsafe impl<K, V> Sync for SkipListLayerIter<'_, K, V> {}
289
290impl<'a, K: OrdUpperBound, V> SkipListLayerIter<'a, K, V> {
291 fn new(skip_list: &'a SkipListLayer<K, V>, bound: Bound<&K>) -> Self {
292 let epoch = {
293 let mut inner = skip_list.inner.lock();
294 inner.current_count += 1;
295 inner.epoch
296 };
297 let (included, key) = match bound {
298 Bound::Unbounded => {
299 return SkipListLayerIter { skip_list, epoch, node: skip_list.pointers.get(0) };
300 }
301 Bound::Included(key) => (true, key),
302 Bound::Excluded(key) => (false, key),
303 };
304 let mut last_pointers = &skip_list.pointers;
305
306 let mut node = None;
310 for index in (0..skip_list.pointers.len()).rev() {
311 loop {
313 node = last_pointers.get(index);
314 if let Some(node) = node {
315 let node = unsafe { node.as_ref() };
317 match &node.item.key.cmp_upper_bound(key) {
318 Ordering::Equal if included => break,
319 Ordering::Greater => break,
320 _ => {}
321 }
322 last_pointers = &node.pointers;
323 } else {
324 break;
325 }
326 }
327 }
328 SkipListLayerIter { skip_list, epoch, node }
329 }
330}
331
332impl<K, V> Drop for SkipListLayerIter<'_, K, V> {
333 fn drop(&mut self) {
334 let mut inner = self.skip_list.inner.lock();
335 if self.epoch == inner.epoch {
336 inner.current_count -= 1;
337 } else {
338 if let Some(erase_list) = inner.erase_lists.get_mut(&self.epoch) {
339 erase_list.count -= 1;
340 if erase_list.count == 0 {
341 while let Some(entry) = inner.erase_lists.first_entry() {
342 if entry.get().count == 0 {
343 let EpochEraseList { start, end, .. } = entry.remove_entry().1;
344 inner.free_erase_list(self.skip_list, start, end);
345 } else {
346 break;
347 }
348 }
349 }
350 }
351 }
352 }
353}
354
355#[async_trait]
356impl<K: Key, V: LayerValue> LayerIterator<K, V> for SkipListLayerIter<'_, K, V> {
357 async fn advance(&mut self) -> Result<(), Error> {
358 match self.node {
359 None => {}
360 Some(node) => {
361 self.node = {
362 unsafe { node.as_ref() }.pointers.get(0)
364 }
365 }
366 }
367 Ok(())
368 }
369
370 fn get(&self) -> Option<ItemRef<'_, K, V>> {
371 self.node.map(|node| unsafe { node.as_ref() }.item.as_item_ref())
373 }
374}
375
/// One `PointerList` reference per level of the skip list.
type PointerListRefArray<'a, K, V> = Box<[&'a PointerList<K, V>]>;
377
/// A mutable iterator over a `SkipListLayer`.  It holds the layer's write lock for its whole
/// lifetime and commits pending changes when dropped.
pub struct SkipListLayerIterMut<'a, K: Key, V: LayerValue> {
    /// The list being mutated.
    skip_list: &'a SkipListLayer<K, V>,

    /// For each level, the pointer list whose slot at that level leads to the current
    /// position.
    prev_pointers: PointerListRefArray<'a, K, V>,

    /// Snapshot of `prev_pointers` taken by the first `insert` or `erase`; `None` while there is
    /// nothing to commit.
    insertion_point: Option<PointerListRefArray<'a, K, V>>,

    /// For each level, the first node inserted at that level since the snapshot was taken.
    /// `commit` links these in at `insertion_point`.
    insertion_nodes: PointerList<K, V>,

    // Never read; kept only so the write lock stays held until the iterator is dropped.
    #[allow(dead_code)]
    write_guard: MutexGuard<'a, ()>,

    /// Net change in item count (inserts minus erases) that `commit` applies.
    item_delta: isize,
}
408
409impl<'a, K: Key, V: LayerValue> SkipListLayerIterMut<'a, K, V> {
410 pub fn new(skip_list: &'a SkipListLayer<K, V>, bound: std::ops::Bound<&K>) -> Self {
411 let write_guard = skip_list.write_lock.lock();
412 let len = skip_list.pointers.len();
413
414 let mut prev_pointers = vec![&skip_list.pointers; len].into_boxed_slice();
430 match bound {
431 Bound::Unbounded => {}
432 Bound::Included(key) => {
433 let pointers = &mut prev_pointers;
434 for index in (0..len).rev() {
435 while let Some(node) = pointers[index].get(index) {
436 let node = unsafe { node.as_ref() };
442
443 match node.item.key.cmp_upper_bound(key) {
444 Ordering::Equal | Ordering::Greater => break,
445 Ordering::Less => {}
446 }
447 pointers[index] = &node.pointers;
448 }
449 if index > 0 {
450 pointers[index - 1] = pointers[index];
451 }
452 }
453 }
454 Bound::Excluded(_) => panic!("Excluded bounds not supported"),
455 }
456 SkipListLayerIterMut {
457 skip_list,
458 prev_pointers,
459 insertion_point: None,
460 insertion_nodes: PointerList::new(len),
461 write_guard,
462 item_delta: 0,
463 }
464 }
465}
466
impl<K: Key, V: LayerValue> Drop for SkipListLayerIterMut<'_, K, V> {
    /// Commits any pending changes.  `commit` returns immediately when nothing is pending, so
    /// dropping after an explicit `commit` is harmless.
    fn drop(&mut self) {
        self.commit();
    }
}
472
473impl<K: Key, V: LayerValue> LayerIteratorMut<K, V> for SkipListLayerIterMut<'_, K, V> {
474 fn advance(&mut self) {
475 if self.insertion_point.is_some() {
476 if let Some(item) = self.get() {
477 let copy = item.cloned();
479 self.insert(copy);
480 self.erase();
481 }
482 } else {
483 let pointers = &mut self.prev_pointers;
484 if let Some(next) = pointers[0].get(0) {
485 let next = unsafe { next.as_ref() };
488 for i in 0..next.pointers.len() {
489 pointers[i] = &next.pointers;
490 }
491 }
492 }
493 }
494
495 fn get(&self) -> Option<ItemRef<'_, K, V>> {
496 self.prev_pointers[0].get(0).map(|node| unsafe { node.as_ref() }.item.as_item_ref())
499 }
500
501 fn insert(&mut self, item: Item<K, V>) {
502 use rand::Rng;
503 let mut rng = rand::rng();
504 let max_pointers = self.skip_list.pointers.len();
505 let pointer_count = max_pointers
508 - min(
509 (rng.random_range(0..2u32.pow(max_pointers as u32) - 1) as f32).log2() as usize,
510 max_pointers - 1,
511 );
512 let node = Box::leak(self.skip_list.alloc_node(item, pointer_count));
513 if self.insertion_point.is_none() {
514 self.insertion_point = Some(self.prev_pointers.clone());
515 }
516 let node_ptr = node.into();
517 for i in 0..pointer_count {
518 let pointers = self.prev_pointers[i];
519 node.pointers.set(i, pointers.get(i));
520 if self.insertion_nodes.get(i).is_none() {
521 self.insertion_nodes.set(i, Some(node_ptr));
524 } else {
525 pointers.set(i, Some(node_ptr));
528 }
529 self.prev_pointers[i] = &node.pointers;
531 }
532 self.item_delta += 1;
533 }
534
535 fn erase(&mut self) {
536 let pointers = &mut self.prev_pointers;
537 if let Some(next) = pointers[0].get(0) {
538 let next = unsafe { next.as_ref() };
541 if self.insertion_point.is_none() {
542 self.insertion_point = Some(pointers.clone());
543 }
544 if self.insertion_nodes.get(0).is_none() {
545 pointers[0] = &next.pointers;
548 } else {
549 pointers[0].set(0, next.pointers.get(0));
554 }
555 for i in 1..next.pointers.len() {
558 pointers[i].set(i, next.pointers.get(i));
559 }
560 }
561 self.item_delta -= 1;
562 }
563
564 fn commit(&mut self) {
567 let prev_pointers = match self.insertion_point.take() {
569 Some(prev_pointers) => prev_pointers,
570 None => return,
571 };
572
573 let maybe_erase = prev_pointers[0].get(0);
575
576 if self.insertion_nodes.get(0).is_none() {
578 prev_pointers[0].set(0, self.prev_pointers[0].get(0));
582 } else {
583 for i in 0..self.insertion_nodes.len() {
587 if let Some(node) = self.insertion_nodes.get(i) {
588 prev_pointers[i].set(i, Some(node));
589 }
590 }
591 }
592
593 let mut inner = self.skip_list.inner.lock();
595 inner.item_count = inner.item_count.checked_add_signed(self.item_delta).unwrap();
596 if let Some(start) = maybe_erase {
597 let end = self.prev_pointers[0].get(0);
598 if maybe_erase != end {
599 if inner.current_count > 0 || !inner.erase_lists.is_empty() {
600 let count = std::mem::take(&mut inner.current_count);
601 let epoch = inner.epoch;
602 inner.erase_lists.insert(epoch, EpochEraseList { count, start, end });
603 inner.epoch = inner.epoch.wrapping_add(1);
604 } else {
605 inner.free_erase_list(self.skip_list, start, end);
606 }
607 }
608 }
609 }
610}
611
612#[cfg(test)]
613mod tests {
614 use super::{SkipListLayer, SkipListLayerIterMut};
615 use crate::lsm_tree::merge::ItemOp::{Discard, Replace};
616 use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
617 use crate::lsm_tree::skip_list_layer::SkipListLayerIter;
618 use crate::lsm_tree::types::{
619 DefaultOrdLowerBound, DefaultOrdUpperBound, Existence, FuzzyHash, Item, ItemRef, Layer,
620 LayerIterator, LayerIteratorMut, SortByU64,
621 };
622 use crate::serialized_types::{
623 LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
624 };
625 use assert_matches::assert_matches;
626 use fprint::TypeFingerprint;
627 use fuchsia_async as fasync;
628 use futures::future::join_all;
629 use futures::{FutureExt as _, join};
630 use fxfs_macros::FuzzyHash;
631 use std::hash::Hash;
632 use std::ops::Bound;
633 use std::time::{Duration, Instant};
634
    /// Minimal key type for these tests: a single `u64` with the derived ordering.
    #[derive(
        Clone,
        Eq,
        Debug,
        Hash,
        FuzzyHash,
        PartialEq,
        PartialOrd,
        Ord,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(u64);

    versioned_type! { 1.. => TestKey }

    impl SortByU64 for TestKey {
        /// The wrapped value is the leading `u64`.
        fn get_leading_u64(&self) -> u64 {
            self.0
        }
    }

    // No overrides: both bound comparisons use the traits' default implementations.
    impl DefaultOrdLowerBound for TestKey {}
    impl DefaultOrdUpperBound for TestKey {}
661
662 #[fuchsia::test]
663 async fn test_key_exists() {
664 let skip_list = SkipListLayer::new(100);
665 skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
666 skip_list.insert(Item::new(TestKey(3), 3)).expect("insert error");
667
668 assert_eq!(
669 skip_list.key_exists(&TestKey(0)).await.expect("key_exists failed"),
670 Existence::Missing
671 );
672 assert_eq!(
673 skip_list.key_exists(&TestKey(1)).await.expect("key_exists failed"),
674 Existence::Exists
675 );
676 assert_eq!(
677 skip_list.key_exists(&TestKey(2)).await.expect("key_exists failed"),
678 Existence::Missing
679 );
680 assert_eq!(
681 skip_list.key_exists(&TestKey(3)).await.expect("key_exists failed"),
682 Existence::Exists
683 );
684 assert_eq!(
685 skip_list.key_exists(&TestKey(4)).await.expect("key_exists failed"),
686 Existence::Missing
687 );
688 }
689
690 #[fuchsia::test]
691 async fn test_iteration() {
692 let skip_list = SkipListLayer::new(100);
694 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
695 skip_list.insert(items[1].clone()).expect("insert error");
696 skip_list.insert(items[0].clone()).expect("insert error");
697 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
698 let ItemRef { key, value, .. } = iter.get().expect("missing item");
699 assert_eq!((key, value), (&items[0].key, &items[0].value));
700 iter.advance().await.unwrap();
701 let ItemRef { key, value, .. } = iter.get().expect("missing item");
702 assert_eq!((key, value), (&items[1].key, &items[1].value));
703 iter.advance().await.unwrap();
704 assert!(iter.get().is_none());
705 }
706
707 #[fuchsia::test]
708 async fn test_seek_exact() {
709 let skip_list = SkipListLayer::new(100);
711 for i in (0..100).rev() {
712 skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
713 }
714 let mut iter = skip_list.seek(Bound::Included(&TestKey(57))).await.unwrap();
715 let ItemRef { key, value, .. } = iter.get().expect("missing item");
716 assert_eq!((key, value), (&TestKey(57), &57));
717
718 iter.advance().await.unwrap();
720 let ItemRef { key, value, .. } = iter.get().expect("missing item");
721 assert_eq!((key, value), (&TestKey(58), &58));
722 }
723
724 #[fuchsia::test]
725 async fn test_seek_lower_bound() {
726 let skip_list = SkipListLayer::new(100);
728 for i in (0..100).rev() {
729 skip_list.insert(Item::new(TestKey(i * 3), i * 3)).expect("insert error");
730 }
731 let mut expected_index = 57 * 3;
732 let mut iter = skip_list.seek(Bound::Included(&TestKey(expected_index - 1))).await.unwrap();
733 let ItemRef { key, value, .. } = iter.get().expect("missing item");
734 assert_eq!((key, value), (&TestKey(expected_index), &expected_index));
735
736 expected_index += 3;
738 iter.advance().await.unwrap();
739 let ItemRef { key, value, .. } = iter.get().expect("missing item");
740 assert_eq!((key, value), (&TestKey(expected_index), &expected_index));
741 }
742
743 #[fuchsia::test]
744 async fn test_replace_or_insert_replaces() {
745 let skip_list = SkipListLayer::new(100);
746 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
747 skip_list.insert(items[1].clone()).expect("insert error");
748 skip_list.insert(items[0].clone()).expect("insert error");
749 let replacement_value = 3;
750 skip_list.replace_or_insert(Item::new(items[1].key.clone(), replacement_value));
751
752 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
753 let ItemRef { key, value, .. } = iter.get().expect("missing item");
754 assert_eq!((key, value), (&items[0].key, &items[0].value));
755 iter.advance().await.unwrap();
756 let ItemRef { key, value, .. } = iter.get().expect("missing item");
757 assert_eq!((key, value), (&items[1].key, &replacement_value));
758 iter.advance().await.unwrap();
759 assert!(iter.get().is_none());
760 }
761
762 #[fuchsia::test]
763 async fn test_replace_or_insert_inserts() {
764 let skip_list = SkipListLayer::new(100);
765 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
766 skip_list.insert(items[2].clone()).expect("insert error");
767 skip_list.insert(items[0].clone()).expect("insert error");
768 skip_list.replace_or_insert(items[1].clone());
769
770 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
771 let ItemRef { key, value, .. } = iter.get().expect("missing item");
772 assert_eq!((key, value), (&items[0].key, &items[0].value));
773 iter.advance().await.unwrap();
774 let ItemRef { key, value, .. } = iter.get().expect("missing item");
775 assert_eq!((key, value), (&items[1].key, &items[1].value));
776 iter.advance().await.unwrap();
777 let ItemRef { key, value, .. } = iter.get().expect("missing item");
778 assert_eq!((key, value), (&items[2].key, &items[2].value));
779 iter.advance().await.unwrap();
780 assert!(iter.get().is_none());
781 }
782
783 #[fuchsia::test]
784 async fn test_erase() {
785 let skip_list = SkipListLayer::new(100);
786 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
787 skip_list.insert(items[1].clone()).expect("insert error");
788 skip_list.insert(items[0].clone()).expect("insert error");
789
790 assert_eq!(skip_list.len(), 2);
791
792 skip_list.erase(&items[1].key);
793
794 assert_eq!(skip_list.len(), 1);
795
796 {
797 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
798 let ItemRef { key, value, .. } = iter.get().expect("missing item");
799 assert_eq!((key, value), (&items[0].key, &items[0].value));
800 iter.advance().await.unwrap();
801 assert!(iter.get().is_none());
802 }
803
804 skip_list.erase(&items[0].key);
805
806 assert_eq!(skip_list.len(), 0);
807
808 {
809 let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
810 assert!(iter.get().is_none());
811 }
812 }
813
814 #[fuchsia::test]
817 #[ignore]
818 async fn test_seek_is_log_n_complexity() {
819 let mut n = 100;
822 let mut loops = 0;
823 const TARGET_TIME: Duration = Duration::from_millis(500);
824 let time = loop {
825 let skip_list = SkipListLayer::new(n as usize);
826 for i in 0..n {
827 skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
828 }
829 let start = Instant::now();
830 for i in 0..n {
831 skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
832 }
833 let elapsed = Instant::now() - start;
834 if elapsed > TARGET_TIME {
835 break elapsed;
836 }
837 n *= 2;
838 loops += 1;
839 };
840
841 let seek_count = n;
842 n >>= loops / 2; let skip_list = SkipListLayer::new(n as usize);
844 for i in 0..n {
845 skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
846 }
847 let start = Instant::now();
848 for i in 0..seek_count {
849 skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
850 }
851 let elapsed = Instant::now() - start;
852
853 eprintln!(
854 "{} items: {}ms, {} items: {}ms",
855 seek_count,
856 time.as_millis(),
857 n,
858 elapsed.as_millis()
859 );
860
861 assert!(elapsed * 4 > time);
865 }
866
867 #[fuchsia::test]
868 async fn test_large_number_of_items() {
869 let item_count = 1000;
870 let skip_list = SkipListLayer::new(1000);
871 for i in 1..item_count {
872 skip_list.insert(Item::new(TestKey(i), 1)).expect("insert error");
873 }
874 let mut iter = skip_list.seek(Bound::Included(&TestKey(item_count - 10))).await.unwrap();
875 for i in item_count - 10..item_count {
876 assert_eq!(iter.get().expect("missing item").key, &TestKey(i));
877 iter.advance().await.unwrap();
878 }
879 assert!(iter.get().is_none());
880 }
881
882 #[fuchsia::test]
883 async fn test_multiple_readers_allowed() {
884 let skip_list = SkipListLayer::new(100);
885 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
886 skip_list.insert(items[1].clone()).expect("insert error");
887 skip_list.insert(items[0].clone()).expect("insert error");
888
889 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
891 let ItemRef { key, value, .. } = iter.get().expect("missing item");
892 assert_eq!((key, value), (&items[0].key, &items[0].value));
893
894 let iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
896 let ItemRef { key, value, .. } = iter2.get().expect("missing item");
897 assert_eq!((key, value), (&items[0].key, &items[0].value));
898
899 iter.advance().await.unwrap();
901 let ItemRef { key, value, .. } = iter.get().expect("missing item");
902 assert_eq!((key, value), (&items[1].key, &items[1].value));
903 }
904
905 fn merge(
906 left: &'_ MergeLayerIterator<'_, TestKey, i32>,
907 right: &'_ MergeLayerIterator<'_, TestKey, i32>,
908 ) -> MergeResult<TestKey, i32> {
909 MergeResult::Other {
910 emit: None,
911 left: Replace(Item::new((*left.key()).clone(), *left.value() + *right.value()).boxed()),
912 right: Discard,
913 }
914 }
915
916 #[fuchsia::test]
917 async fn test_merge_into() {
918 let skip_list = SkipListLayer::new(100);
919 skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
920
921 skip_list.merge_into(Item::new(TestKey(2), 2), &TestKey(1), merge);
922
923 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
924 let ItemRef { key, value, .. } = iter.get().expect("missing item");
925 assert_eq!((key, value), (&TestKey(1), &3));
926 iter.advance().await.unwrap();
927 assert!(iter.get().is_none());
928 }
929
930 #[fuchsia::test]
931 async fn test_two_inserts() {
932 let skip_list = SkipListLayer::new(100);
933 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
934 {
935 let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
936 iter.insert(items[0].clone());
937 iter.insert(items[1].clone());
938 }
939
940 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
941 let ItemRef { key, value, .. } = iter.get().expect("missing item");
942 assert_eq!((key, value), (&items[0].key, &items[0].value));
943 iter.advance().await.unwrap();
944 let ItemRef { key, value, .. } = iter.get().expect("missing item");
945 assert_eq!((key, value), (&items[1].key, &items[1].value));
946 }
947
948 #[fuchsia::test]
949 async fn test_erase_after_insert() {
950 let skip_list = SkipListLayer::new(100);
951 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
952 skip_list.insert(items[1].clone()).expect("insert error");
953 {
954 let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
955 iter.insert(items[0].clone());
956 iter.erase();
957 }
958
959 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
960 let ItemRef { key, value, .. } = iter.get().expect("missing item");
961 assert_eq!((key, value), (&items[0].key, &items[0].value));
962 iter.advance().await.unwrap();
963 assert!(iter.get().is_none());
964 }
965
966 #[fuchsia::test]
967 async fn test_insert_after_erase() {
968 let skip_list = SkipListLayer::new(100);
969 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
970 skip_list.insert(items[1].clone()).expect("insert error");
971 {
972 let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
973 iter.erase();
974 iter.insert(items[0].clone());
975 }
976
977 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
978 let ItemRef { key, value, .. } = iter.get().expect("missing item");
979 assert_eq!((key, value), (&items[0].key, &items[0].value));
980 iter.advance().await.unwrap();
981 assert!(iter.get().is_none());
982 }
983
984 #[fuchsia::test]
985 async fn test_insert_erase_insert() {
986 let skip_list = SkipListLayer::new(100);
987 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
988 skip_list.insert(items[0].clone()).expect("insert error");
989 {
990 let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
991 iter.insert(items[1].clone());
992 iter.erase();
993 iter.insert(items[2].clone());
994 }
995
996 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
997 let ItemRef { key, value, .. } = iter.get().expect("missing item");
998 assert_eq!((key, value), (&items[1].key, &items[1].value));
999 iter.advance().await.unwrap();
1000 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1001 assert_eq!((key, value), (&items[2].key, &items[2].value));
1002 }
1003
1004 #[fuchsia::test]
1005 async fn test_two_erase_erases() {
1006 let skip_list = SkipListLayer::new(100);
1007 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
1008 skip_list.insert(items[0].clone()).expect("insert error");
1009 skip_list.insert(items[1].clone()).expect("insert error");
1010 skip_list.insert(items[2].clone()).expect("insert error");
1011 {
1012 let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
1013 iter.erase();
1014 iter.erase();
1015 }
1016
1017 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1018 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1019 assert_eq!((key, value), (&items[2].key, &items[2].value));
1020 iter.advance().await.unwrap();
1021 assert!(iter.get().is_none());
1022 }
1023
1024 #[fuchsia::test]
1025 async fn test_readers_not_blocked_by_writers() {
1026 let skip_list = SkipListLayer::new(100);
1027 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
1028 skip_list.insert(items[1].clone()).expect("insert error");
1029
1030 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1031 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1032 assert_eq!((key, value), (&items[1].key, &items[1].value));
1033
1034 let mut iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
1035 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1036 assert_eq!((key, value), (&items[1].key, &items[1].value));
1037
1038 join!(async { skip_list.insert(items[0].clone()).expect("insert error") }, async {
1039 loop {
1040 let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1041 let ItemRef { key, .. } = iter.get().expect("missing item");
1042 if key == &items[0].key {
1043 break;
1044 }
1045 }
1046 iter.advance().await.unwrap();
1047 assert!(iter.get().is_none());
1048 std::mem::drop(iter);
1049 iter2.advance().await.unwrap();
1050 assert!(iter2.get().is_none());
1051 std::mem::drop(iter2);
1052 });
1053 }
1054
1055 #[fuchsia::test(threads = 20)]
1056 async fn test_many_readers_and_writers() {
1057 let skip_list = SkipListLayer::new(100);
1058 join_all(
1059 (0..10)
1060 .map(|i| {
1061 let skip_list_clone = skip_list.clone();
1062 fasync::Task::spawn(async move {
1063 for j in 0..10 {
1064 skip_list_clone
1065 .insert(Item::new(TestKey(i * 100 + j), i))
1066 .expect("insert error");
1067 }
1068 })
1069 })
1070 .chain((0..10).map(|_| {
1071 let skip_list_clone = skip_list.clone();
1072 fasync::Task::spawn(async move {
1073 for _ in 0..300 {
1074 let mut iter =
1075 skip_list_clone.seek(Bound::Unbounded).await.expect("seek failed");
1076 let mut last_item: Option<TestKey> = None;
1077 while let Some(item) = iter.get() {
1078 if let Some(last) = last_item {
1079 assert!(item.key > &last);
1080 }
1081 last_item = Some(item.key.clone());
1082 iter.advance().await.expect("advance failed");
1083 }
1084 }
1085 })
1086 })),
1087 )
1088 .await;
1089 }
1090
1091 #[fuchsia::test]
1092 async fn test_insert_advance_erase() {
1093 let skip_list = SkipListLayer::new(100);
1094 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
1095 skip_list.insert(items[1].clone()).expect("insert error");
1096 skip_list.insert(items[2].clone()).expect("insert error");
1097
1098 assert_eq!(skip_list.len(), 2);
1099
1100 {
1101 let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
1102 iter.insert(items[0].clone());
1103 iter.advance();
1104 iter.erase();
1105 }
1106
1107 assert_eq!(skip_list.len(), 2);
1108
1109 let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1110 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1111 assert_eq!((key, value), (&items[0].key, &items[0].value));
1112 iter.advance().await.unwrap();
1113 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1114 assert_eq!((key, value), (&items[1].key, &items[1].value));
1115 iter.advance().await.unwrap();
1116 assert!(iter.get().is_none());
1117 }
1118
1119 #[fuchsia::test]
1120 async fn test_seek_excluded() {
1121 let skip_list = SkipListLayer::new(100);
1122 let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
1123 skip_list.insert(items[0].clone()).expect("insert error");
1124 skip_list.insert(items[1].clone()).expect("insert error");
1125 let iter = skip_list.seek(Bound::Excluded(&items[0].key)).await.expect("seek failed");
1126 let ItemRef { key, value, .. } = iter.get().expect("missing item");
1127 assert_eq!((key, value), (&items[1].key, &items[1].value));
1128 }
1129
1130 #[fuchsia::test]
1131 fn test_insert_race() {
1132 for _ in 0..1000 {
1133 let skip_list = SkipListLayer::new(100);
1134 skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");
1135
1136 let skip_list_clone = skip_list.clone();
1137 let thread1 = std::thread::spawn(move || {
1138 skip_list_clone.insert(Item::new(TestKey(1), 1)).expect("insert error")
1139 });
1140 let thread2 = std::thread::spawn(move || {
1141 let iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
1142 match iter.get() {
1143 Some(ItemRef { key: TestKey(2), .. }) => {}
1144 result => assert!(false, "{:?}", result),
1145 }
1146 });
1147 thread1.join().unwrap();
1148 thread2.join().unwrap();
1149 }
1150 }
1151
1152 #[fuchsia::test]
1153 fn test_replace_or_insert_multi_thread() {
1154 let skip_list = SkipListLayer::new(100);
1155 skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
1156 skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");
1157 skip_list.insert(Item::new(TestKey(3), 3)).expect("insert error");
1158 skip_list.insert(Item::new(TestKey(4), 4)).expect("insert error");
1159
1160 let mut threads = Vec::new();
1162 for i in 0..200 {
1163 let skip_list_clone = skip_list.clone();
1164 threads.push(std::thread::spawn(move || {
1165 skip_list_clone.replace_or_insert(Item::new(TestKey(3), i));
1166 }));
1167 }
1168
1169 let _checker_thread = std::thread::spawn(move || {
1171 loop {
1172 let mut iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
1173 assert_matches!(iter.get(), Some(ItemRef { key: TestKey(2), .. }));
1174 iter.advance().now_or_never().unwrap().unwrap();
1175 assert_matches!(iter.get(), Some(ItemRef { key: TestKey(3), .. }));
1176 iter.advance().now_or_never().unwrap().unwrap();
1177 assert_matches!(iter.get(), Some(ItemRef { key: TestKey(4), .. }));
1178 }
1179 });
1180
1181 for thread in threads {
1182 thread.join().unwrap();
1183 }
1184 }
1185}