1use crate::drop_event::DropEvent;
9use crate::log::*;
10use crate::lsm_tree::merge::{self, MergeFn};
11use crate::lsm_tree::types::{
12 BoxedLayerIterator, Existence, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut,
13 LayerValue, OrdLowerBound, OrdUpperBound,
14};
15use crate::serialized_types::{LATEST_VERSION, Version};
16use anyhow::{Error, bail};
17use async_trait::async_trait;
18use fuchsia_sync::{Mutex, MutexGuard};
19use std::cmp::{Ordering, min};
20use std::collections::BTreeMap;
21use std::ops::Bound;
22use std::ptr::NonNull;
23use std::sync::Arc;
24use std::sync::atomic::{self, AtomicPtr, AtomicU32};
25
26struct PointerList<K, V>(Box<[AtomicPtr<SkipListNode<K, V>>]>);
30
31impl<K, V> PointerList<K, V> {
32 fn new(count: usize) -> PointerList<K, V> {
33 let mut pointers = Vec::new();
34 for _ in 0..count {
35 pointers.push(AtomicPtr::new(std::ptr::null_mut()));
36 }
37 PointerList(pointers.into_boxed_slice())
38 }
39
40 fn len(&self) -> usize {
41 self.0.len()
42 }
43
44 fn get(&self, index: usize) -> Option<NonNull<SkipListNode<K, V>>> {
46 NonNull::new(self.0[index].load(atomic::Ordering::SeqCst))
47 }
48
49 fn set(&self, index: usize, node: Option<NonNull<SkipListNode<K, V>>>) {
51 self.0[index]
52 .store(node.map_or(std::ptr::null_mut(), |n| n.as_ptr()), atomic::Ordering::SeqCst);
53 }
54}
55
/// A single heap-allocated node of the skip list.
struct SkipListNode<K, V> {
    /// The item stored in this node.
    item: Item<K, V>,
    /// Forward pointers out of this node.  Slot 0 is the link that plain iteration and
    /// `free_node` follow; the number of slots is chosen when the node is allocated.
    pointers: PointerList<K, V>,
}
60
/// An in-memory layer backed by a skip list.
pub struct SkipListLayer<K, V> {
    /// Head pointers of the list.  The number of slots is derived from `max_item_count` in
    /// `new`.
    pointers: PointerList<K, V>,

    /// Bookkeeping shared by readers and writers: the current epoch, the reader count, pending
    /// erase lists and the item count.
    inner: Mutex<Inner<K, V>>,

    /// Taken by `SkipListLayerIterMut::new` and held for the life of that iterator, so only one
    /// mutable iterator exists at a time.
    write_lock: Mutex<()>,

    /// Number of nodes currently allocated: incremented by `alloc_node`, decremented by
    /// `free_node`, and asserted to be zero when the layer is dropped.
    allocated: AtomicU32,

    /// Cloned out by `lock()` and taken by `close()`, which then awaits its listener.
    close_event: Mutex<Option<Arc<DropEvent>>>,
}
75
/// State guarded by `SkipListLayer::inner`.
struct Inner<K, V> {
    /// The current epoch.  Read iterators record it when they are created, and `commit`
    /// advances it each time it stores an erase list.
    epoch: u64,

    /// Number of live read iterators created in the current epoch.  Incremented when a read
    /// iterator is created, decremented when one from the current epoch is dropped, and moved
    /// into the erase list (leaving zero) when `commit` starts a new epoch.
    current_count: u16,

    /// Erased node chains that cannot be freed yet, keyed by the epoch in which they were
    /// erased.
    erase_lists: BTreeMap<u64, EpochEraseList<K, V>>,

    /// Number of items in the layer as reported by `len`; adjusted by `commit`.
    item_count: usize,
}
96
/// A chain of erased nodes waiting for the read iterators of one epoch to go away.
struct EpochEraseList<K, V> {
    /// Number of read iterators from this epoch that are still alive.  Each one decrements this
    /// when dropped; the chain becomes eligible for freeing at zero.
    count: u16,
    /// First node of the chain to free.
    start: NonNull<SkipListNode<K, V>>,
    /// Where freeing stops (exclusive).  Nodes are followed through pointer slot 0 from `start`
    /// until this value is reached; `None` means the chain runs to a null link.
    end: Option<NonNull<SkipListNode<K, V>>>,
}
110
// SAFETY: NOTE(review) — `Inner` is not automatically `Send` because `EpochEraseList` holds
// `NonNull` pointers.  In this file `Inner` is only reached through the `SkipListLayer::inner`
// mutex, which is presumably the justification; confirm.  Note that this impl places no
// `Send` bounds on `K` or `V`.
unsafe impl<K, V> Send for Inner<K, V> {}
113
114impl<K, V> Inner<K, V> {
115 fn new() -> Self {
116 Inner { epoch: 0, current_count: 0, erase_lists: BTreeMap::new(), item_count: 0 }
117 }
118 fn free_erase_list(
119 &mut self,
120 owner: &SkipListLayer<K, V>,
121 start: NonNull<SkipListNode<K, V>>,
122 end: Option<NonNull<SkipListNode<K, V>>>,
123 ) {
124 let mut node = start;
125 loop {
126 let next = unsafe { owner.free_node(node) };
128 if next == end {
129 break;
130 }
131 node = next.unwrap();
132 }
133 }
134}
135
136impl<K, V> SkipListLayer<K, V> {
137 pub fn new(max_item_count: usize) -> Arc<SkipListLayer<K, V>> {
138 Arc::new(SkipListLayer {
139 pointers: PointerList::new((max_item_count as f32).log2() as usize + 1),
140 inner: Mutex::new(Inner::new()),
141 write_lock: Mutex::new(()),
142 allocated: AtomicU32::new(0),
143 close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
144 })
145 }
146
147 pub fn len(&self) -> usize {
148 self.inner.lock().item_count
149 }
150
151 fn alloc_node(&self, item: Item<K, V>, pointer_count: usize) -> Box<SkipListNode<K, V>> {
152 self.allocated.fetch_add(1, atomic::Ordering::Relaxed);
153 Box::new(SkipListNode { item, pointers: PointerList::new(pointer_count) })
154 }
155
156 unsafe fn free_node(
162 &self,
163 node: NonNull<SkipListNode<K, V>>,
164 ) -> Option<NonNull<SkipListNode<K, V>>> {
165 self.allocated.fetch_sub(1, atomic::Ordering::Relaxed);
166 unsafe { Box::from_raw(node.as_ptr()).pointers.get(0) }
167 }
168}
169
170impl<K: Eq + Key + OrdLowerBound, V: LayerValue> SkipListLayer<K, V> {
171 pub fn erase(&self, key: &K)
173 where
174 K: std::cmp::Eq,
175 {
176 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(key));
177 if let Some(ItemRef { key: k, .. }) = iter.get() {
178 if k == key {
179 iter.erase();
180 } else {
181 warn!("Attempt to erase key not present!");
182 }
183 }
184 iter.commit();
185 }
186
187 pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
189 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
190 if let Some(found_item) = iter.get() {
191 if found_item.key == &item.key {
192 bail!("Attempted to insert an existing key");
193 }
194 }
195 iter.insert(item);
196 Ok(())
197 }
198
199 pub fn replace_or_insert(&self, item: Item<K, V>) {
201 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
202 if let Some(found_item) = iter.get() {
203 if found_item.key == &item.key {
204 iter.erase();
205 }
206 }
207 iter.insert(item);
208 }
209
210 pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K, merge_fn: MergeFn<K, V>) {
212 merge::merge_into(
213 Box::new(SkipListLayerIterMut::new(self, Bound::Included(lower_bound))),
214 item,
215 merge_fn,
216 )
217 .unwrap();
218 }
219}
220
221impl<K, V> Drop for SkipListLayer<K, V> {
223 fn drop(&mut self) {
224 let mut next = self.pointers.get(0);
225 while let Some(node) = next {
226 next = unsafe { self.free_node(node) };
228 }
229 assert_eq!(self.allocated.load(atomic::Ordering::Relaxed), 0);
230 }
231}
232
233#[async_trait]
234impl<K: Key, V: LayerValue> Layer<K, V> for SkipListLayer<K, V> {
235 async fn seek<'a>(
236 &'a self,
237 bound: std::ops::Bound<&K>,
238 ) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
239 Ok(Box::new(SkipListLayerIter::new(self, bound)))
240 }
241
242 fn lock(&self) -> Option<Arc<DropEvent>> {
243 self.close_event.lock().clone()
244 }
245
246 fn len(&self) -> usize {
247 self.inner.lock().item_count
248 }
249
250 async fn close(&self) {
251 let listener = self.close_event.lock().take().expect("close already called").listen();
252 listener.await;
253 }
254
255 fn get_version(&self) -> Version {
256 return LATEST_VERSION;
259 }
260
261 fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
262 node.record_bool("persistent", false);
263 node.record_uint("num_items", self.inner.lock().item_count as u64);
264 }
265
266 async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
267 let iter = SkipListLayerIter::new(self, Bound::Included(key));
268 Ok(iter.get().map_or(Existence::Missing, |i| {
269 if i.key.cmp_upper_bound(key).is_eq() { Existence::Exists } else { Existence::Missing }
270 }))
271 }
272}
273
/// A read-only iterator over a `SkipListLayer`.
struct SkipListLayerIter<'a, K, V> {
    /// The layer being iterated.
    skip_list: &'a SkipListLayer<K, V>,

    /// The layer's epoch at the time this iterator was created; used on drop to find the
    /// reader count that this iterator contributed to.
    epoch: u64,

    /// The node the iterator is positioned at, or `None` when it is past the last item.
    node: Option<NonNull<SkipListNode<K, V>>>,
}
285
// SAFETY: NOTE(review) — the iterator is not automatically `Send`/`Sync` because it holds a
// `NonNull` node pointer.  Presumably this is sound because nodes are only freed once no reader
// from their epoch remains (see the epoch accounting in `Drop`); confirm.  No bounds are placed
// on `K` or `V` here.
unsafe impl<K, V> Send for SkipListLayerIter<'_, K, V> {}
unsafe impl<K, V> Sync for SkipListLayerIter<'_, K, V> {}
289
290impl<'a, K: OrdUpperBound, V> SkipListLayerIter<'a, K, V> {
291 fn new(skip_list: &'a SkipListLayer<K, V>, bound: Bound<&K>) -> Self {
292 let epoch = {
293 let mut inner = skip_list.inner.lock();
294 inner.current_count += 1;
295 inner.epoch
296 };
297 let (included, key) = match bound {
298 Bound::Unbounded => {
299 return SkipListLayerIter { skip_list, epoch, node: skip_list.pointers.get(0) };
300 }
301 Bound::Included(key) => (true, key),
302 Bound::Excluded(key) => (false, key),
303 };
304 let mut last_pointers = &skip_list.pointers;
305
306 let mut node = None;
310 for index in (0..skip_list.pointers.len()).rev() {
311 loop {
313 node = last_pointers.get(index);
314 if let Some(node) = node {
315 let node = unsafe { node.as_ref() };
317 match &node.item.key.cmp_upper_bound(key) {
318 Ordering::Equal if included => break,
319 Ordering::Greater => break,
320 _ => {}
321 }
322 last_pointers = &node.pointers;
323 } else {
324 break;
325 }
326 }
327 }
328 SkipListLayerIter { skip_list, epoch, node }
329 }
330}
331
332impl<K, V> Drop for SkipListLayerIter<'_, K, V> {
333 fn drop(&mut self) {
334 let mut inner = self.skip_list.inner.lock();
335 if self.epoch == inner.epoch {
336 inner.current_count -= 1;
337 } else {
338 if let Some(erase_list) = inner.erase_lists.get_mut(&self.epoch) {
339 erase_list.count -= 1;
340 if erase_list.count == 0 {
341 while let Some(entry) = inner.erase_lists.first_entry() {
342 if entry.get().count == 0 {
343 let EpochEraseList { start, end, .. } = entry.remove_entry().1;
344 inner.free_erase_list(self.skip_list, start, end);
345 } else {
346 break;
347 }
348 }
349 }
350 }
351 }
352 }
353}
354
355#[async_trait]
356impl<K: Key, V: LayerValue> LayerIterator<K, V> for SkipListLayerIter<'_, K, V> {
357 async fn advance(&mut self) -> Result<(), Error> {
358 match self.node {
359 None => {}
360 Some(node) => {
361 self.node = {
362 unsafe { node.as_ref() }.pointers.get(0)
364 }
365 }
366 }
367 Ok(())
368 }
369
370 fn get(&self) -> Option<ItemRef<'_, K, V>> {
371 self.node.map(|node| unsafe { node.as_ref() }.item.as_item_ref())
373 }
374}
375
/// A boxed slice of pointer-list references; the mutable iterator keeps one entry per level.
type PointerListRefArray<'a, K, V> = Box<[&'a PointerList<K, V>]>;
377
/// A mutable iterator over a `SkipListLayer`.  Insertions and erasures are staged and become
/// visible in the list when `commit` runs (explicitly, or when the iterator is dropped).
pub struct SkipListLayerIterMut<'a, K: Key, V: LayerValue> {
    /// The layer being modified.
    skip_list: &'a SkipListLayer<K, V>,

    /// For each level, the pointer list that precedes the iterator's position.  The current
    /// item is the node that `prev_pointers[0]` points to in slot 0.
    prev_pointers: PointerListRefArray<'a, K, V>,

    /// Snapshot of `prev_pointers` taken by the first `insert` or `erase` since the last
    /// commit; `commit` splices the staged changes in here.  `None` means nothing is staged.
    insertion_point: Option<PointerListRefArray<'a, K, V>>,

    /// For each level, the first staged node, which `commit` links in at the insertion point.
    /// A null slot means nothing has been inserted at that level.
    insertion_nodes: PointerList<K, V>,

    /// Holds the layer's write lock for the lifetime of the iterator.  It is never read, hence
    /// the `dead_code` allowance.
    #[allow(dead_code)]
    write_guard: MutexGuard<'a, ()>,

    /// Net change in the number of items (insertions minus erasures); `commit` applies it to
    /// the layer's item count.
    item_delta: isize,
}
408
409impl<'a, K: Key, V: LayerValue> SkipListLayerIterMut<'a, K, V> {
410 pub fn new(skip_list: &'a SkipListLayer<K, V>, bound: std::ops::Bound<&K>) -> Self {
411 let write_guard = skip_list.write_lock.lock();
412 let len = skip_list.pointers.len();
413
414 let mut prev_pointers = vec![&skip_list.pointers; len].into_boxed_slice();
430 match bound {
431 Bound::Unbounded => {}
432 Bound::Included(key) => {
433 let pointers = &mut prev_pointers;
434 for index in (0..len).rev() {
435 while let Some(node) = pointers[index].get(index) {
436 let node = unsafe { node.as_ref() };
442
443 match node.item.key.cmp_upper_bound(key) {
444 Ordering::Equal | Ordering::Greater => break,
445 Ordering::Less => {}
446 }
447 pointers[index] = &node.pointers;
448 }
449 if index > 0 {
450 pointers[index - 1] = pointers[index];
451 }
452 }
453 }
454 Bound::Excluded(_) => panic!("Excluded bounds not supported"),
455 }
456 SkipListLayerIterMut {
457 skip_list,
458 prev_pointers,
459 insertion_point: None,
460 insertion_nodes: PointerList::new(len),
461 write_guard,
462 item_delta: 0,
463 }
464 }
465}
466
467impl<K: Key, V: LayerValue> Drop for SkipListLayerIterMut<'_, K, V> {
468 fn drop(&mut self) {
469 self.commit();
470 }
471}
472
473impl<K: Key, V: LayerValue> LayerIteratorMut<K, V> for SkipListLayerIterMut<'_, K, V> {
474 fn advance(&mut self) {
475 if self.insertion_point.is_some() {
476 if let Some(item) = self.get() {
477 let copy = item.cloned();
479 self.insert(copy);
480 self.erase();
481 }
482 } else {
483 let pointers = &mut self.prev_pointers;
484 if let Some(next) = pointers[0].get(0) {
485 let next = unsafe { next.as_ref() };
488 for i in 0..next.pointers.len() {
489 pointers[i] = &next.pointers;
490 }
491 }
492 }
493 }
494
495 fn get(&self) -> Option<ItemRef<'_, K, V>> {
496 self.prev_pointers[0].get(0).map(|node| unsafe { node.as_ref() }.item.as_item_ref())
499 }
500
501 fn insert(&mut self, item: Item<K, V>) {
502 use rand::Rng;
503 let mut rng = rand::rng();
504 let max_pointers = self.skip_list.pointers.len();
505 let pointer_count = max_pointers
508 - min(
509 (rng.random_range(0..2u32.pow(max_pointers as u32) - 1) as f32).log2() as usize,
510 max_pointers - 1,
511 );
512 let node = Box::leak(self.skip_list.alloc_node(item, pointer_count));
513 if self.insertion_point.is_none() {
514 self.insertion_point = Some(self.prev_pointers.clone());
515 }
516 let node_ptr = node.into();
517 for i in 0..pointer_count {
518 let pointers = self.prev_pointers[i];
519 node.pointers.set(i, pointers.get(i));
520 if self.insertion_nodes.get(i).is_none() {
521 self.insertion_nodes.set(i, Some(node_ptr));
524 } else {
525 pointers.set(i, Some(node_ptr));
528 }
529 self.prev_pointers[i] = &node.pointers;
531 }
532 self.item_delta += 1;
533 }
534
535 fn erase(&mut self) {
536 let pointers = &mut self.prev_pointers;
537 if let Some(next) = pointers[0].get(0) {
538 let next = unsafe { next.as_ref() };
541 if self.insertion_point.is_none() {
542 self.insertion_point = Some(pointers.clone());
543 }
544 if self.insertion_nodes.get(0).is_none() {
545 pointers[0] = &next.pointers;
548 } else {
549 pointers[0].set(0, next.pointers.get(0));
554 }
555 for i in 1..next.pointers.len() {
558 pointers[i].set(i, next.pointers.get(i));
559 }
560 }
561 self.item_delta -= 1;
562 }
563
564 fn commit(&mut self) {
567 let prev_pointers = match self.insertion_point.take() {
569 Some(prev_pointers) => prev_pointers,
570 None => return,
571 };
572
573 let maybe_erase = prev_pointers[0].get(0);
575
576 if self.insertion_nodes.get(0).is_none() {
578 prev_pointers[0].set(0, self.prev_pointers[0].get(0));
582 } else {
583 for i in 0..self.insertion_nodes.len() {
587 if let Some(node) = self.insertion_nodes.get(i) {
588 prev_pointers[i].set(i, Some(node));
589 }
590 }
591 }
592
593 let mut inner = self.skip_list.inner.lock();
595 inner.item_count = inner.item_count.checked_add_signed(self.item_delta).unwrap();
596 if let Some(start) = maybe_erase {
597 let end = self.prev_pointers[0].get(0);
598 if maybe_erase != end {
599 if inner.current_count > 0 || !inner.erase_lists.is_empty() {
600 let count = std::mem::take(&mut inner.current_count);
601 let epoch = inner.epoch;
602 inner.erase_lists.insert(epoch, EpochEraseList { count, start, end });
603 inner.epoch = inner.epoch.wrapping_add(1);
604 } else {
605 inner.free_erase_list(self.skip_list, start, end);
606 }
607 }
608 }
609 }
610}
611
#[cfg(test)]
mod tests {
    use super::{SkipListLayer, SkipListLayerIterMut};
    use crate::lsm_tree::merge::ItemOp::{Discard, Replace};
    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
    use crate::lsm_tree::skip_list_layer::SkipListLayerIter;
    use crate::lsm_tree::types::{
        DefaultOrdLowerBound, DefaultOrdUpperBound, Existence, FuzzyHash, Item, ItemRef, Layer,
        LayerIterator, LayerIteratorMut, SortByU64,
    };
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use assert_matches::assert_matches;
    use fprint::TypeFingerprint;
    use fuchsia_async as fasync;
    use futures::future::join_all;
    use futures::{FutureExt as _, join};
    use fxfs_macros::{FuzzyHash, SerializeKey};
    use std::hash::Hash;
    use std::ops::Bound;
    use std::time::{Duration, Instant};

    /// Minimal key type used by the tests below: a bare `u64`.
    #[derive(
        Clone,
        Eq,
        Debug,
        Hash,
        FuzzyHash,
        PartialEq,
        PartialOrd,
        Ord,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
        SerializeKey,
    )]
    struct TestKey(u64);

    versioned_type! { 1.. => TestKey }

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0
        }
    }

    impl DefaultOrdLowerBound for TestKey {}
    impl DefaultOrdUpperBound for TestKey {}

    #[fuchsia::test]
    async fn test_key_exists() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
        skip_list.insert(Item::new(TestKey(3), 3)).expect("insert error");

        assert_eq!(
            skip_list.key_exists(&TestKey(0)).await.expect("key_exists failed"),
            Existence::Missing
        );
        assert_eq!(
            skip_list.key_exists(&TestKey(1)).await.expect("key_exists failed"),
            Existence::Exists
        );
        assert_eq!(
            skip_list.key_exists(&TestKey(2)).await.expect("key_exists failed"),
            Existence::Missing
        );
        assert_eq!(
            skip_list.key_exists(&TestKey(3)).await.expect("key_exists failed"),
            Existence::Exists
        );
        assert_eq!(
            skip_list.key_exists(&TestKey(4)).await.expect("key_exists failed"),
            Existence::Missing
        );
    }

    #[fuchsia::test]
    async fn test_iteration() {
        let skip_list = SkipListLayer::new(100);
        // Insert out of order so that iteration has to do the sorting.
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_seek_exact() {
        let skip_list = SkipListLayer::new(100);
        for i in (0..100).rev() {
            skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
        }
        let mut iter = skip_list.seek(Bound::Included(&TestKey(57))).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(57), &57));

        // The next item should be the one that follows the sought key.
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(58), &58));
    }

    #[fuchsia::test]
    async fn test_seek_lower_bound() {
        let skip_list = SkipListLayer::new(100);
        for i in (0..100).rev() {
            skip_list.insert(Item::new(TestKey(i * 3), i * 3)).expect("insert error");
        }
        let mut expected_index = 57 * 3;
        let mut iter = skip_list.seek(Bound::Included(&TestKey(expected_index - 1))).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(expected_index), &expected_index));

        // The next item should be the one that follows.
        expected_index += 3;
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(expected_index), &expected_index));
    }

    #[fuchsia::test]
    async fn test_replace_or_insert_replaces() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let replacement_value = 3;
        skip_list.replace_or_insert(Item::new(items[1].key.clone(), replacement_value));

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &replacement_value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_replace_or_insert_inserts() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[2].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.replace_or_insert(items[1].clone());

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[2].key, &items[2].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");

        assert_eq!(skip_list.len(), 2);

        skip_list.erase(&items[1].key);

        assert_eq!(skip_list.len(), 1);

        {
            let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!((key, value), (&items[0].key, &items[0].value));
            iter.advance().await.unwrap();
            assert!(iter.get().is_none());
        }

        skip_list.erase(&items[0].key);

        assert_eq!(skip_list.len(), 0);

        {
            let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
            assert!(iter.get().is_none());
        }
    }

    // Timing-based, so it is ignored by default.
    #[fuchsia::test]
    #[ignore]
    async fn test_seek_is_log_n_complexity() {
        // Keep doubling `n` until `n` seeks take longer than `TARGET_TIME`.
        let mut n = 100;
        let mut loops = 0;
        const TARGET_TIME: Duration = Duration::from_millis(500);
        let time = loop {
            let skip_list = SkipListLayer::new(n as usize);
            for i in 0..n {
                skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
            }
            let start = Instant::now();
            for i in 0..n {
                skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
            }
            let elapsed = Instant::now() - start;
            if elapsed > TARGET_TIME {
                break elapsed;
            }
            n *= 2;
            loops += 1;
        };

        // Repeat the same number of seeks against a much smaller list.
        let seek_count = n;
        n >>= loops / 2;
        let skip_list = SkipListLayer::new(n as usize);
        for i in 0..n {
            skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
        }
        let start = Instant::now();
        for i in 0..seek_count {
            skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
        }
        let elapsed = Instant::now() - start;

        eprintln!(
            "{} items: {}ms, {} items: {}ms",
            seek_count,
            time.as_millis(),
            n,
            elapsed.as_millis()
        );

        // The smaller list must take more than a quarter of the time the big list took.
        assert!(elapsed * 4 > time);
    }

    #[fuchsia::test]
    async fn test_large_number_of_items() {
        let item_count = 1000;
        let skip_list = SkipListLayer::new(1000);
        for i in 1..item_count {
            skip_list.insert(Item::new(TestKey(i), 1)).expect("insert error");
        }
        let mut iter = skip_list.seek(Bound::Included(&TestKey(item_count - 10))).await.unwrap();
        for i in item_count - 10..item_count {
            assert_eq!(iter.get().expect("missing item").key, &TestKey(i));
            iter.advance().await.unwrap();
        }
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_multiple_readers_allowed() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");

        // First reader.
        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));

        // Second reader, created while the first is still alive.
        let iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter2.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));

        // The first reader can still advance.
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
    }

    /// Merge function used by `test_merge_into`: replaces the left item with the sum of both
    /// values and discards the right item.
    fn merge(
        left: &'_ MergeLayerIterator<'_, TestKey, i32>,
        right: &'_ MergeLayerIterator<'_, TestKey, i32>,
    ) -> MergeResult<TestKey, i32> {
        MergeResult::Other {
            emit: None,
            left: Replace(Item::new((*left.key()).clone(), *left.value() + *right.value()).boxed()),
            right: Discard,
        }
    }

    #[fuchsia::test]
    async fn test_merge_into() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");

        skip_list.merge_into(Item::new(TestKey(2), 2), &TestKey(1), merge);

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(1), &3));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_two_inserts() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.insert(items[1].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
    }

    #[fuchsia::test]
    async fn test_erase_after_insert() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.erase();
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_insert_after_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.erase();
            iter.insert(items[0].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_insert_erase_insert() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[0].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[1].clone());
            iter.erase();
            iter.insert(items[2].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[2].key, &items[2].value));
    }

    #[fuchsia::test]
    async fn test_two_erase_erases() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[2].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.erase();
            iter.erase();
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[2].key, &items[2].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_readers_not_blocked_by_writers() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));

        // Check the second reader itself, not the first one again.
        let mut iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter2.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));

        join!(async { skip_list.insert(items[0].clone()).expect("insert error") }, async {
            // Wait until a fresh reader observes the newly inserted first item.
            loop {
                let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
                let ItemRef { key, .. } = iter.get().expect("missing item");
                if key == &items[0].key {
                    break;
                }
            }
            // The readers created before the insert are still usable.
            iter.advance().await.unwrap();
            assert!(iter.get().is_none());
            std::mem::drop(iter);
            iter2.advance().await.unwrap();
            assert!(iter2.get().is_none());
            std::mem::drop(iter2);
        });
    }

    #[fuchsia::test(threads = 20)]
    async fn test_many_readers_and_writers() {
        let skip_list = SkipListLayer::new(100);
        join_all(
            (0..10)
                .map(|i| {
                    let skip_list_clone = skip_list.clone();
                    fasync::Task::spawn(async move {
                        for j in 0..10 {
                            skip_list_clone
                                .insert(Item::new(TestKey(i * 100 + j), i))
                                .expect("insert error");
                        }
                    })
                })
                .chain((0..10).map(|_| {
                    let skip_list_clone = skip_list.clone();
                    fasync::Task::spawn(async move {
                        for _ in 0..300 {
                            let mut iter =
                                skip_list_clone.seek(Bound::Unbounded).await.expect("seek failed");
                            // Readers must always see strictly increasing keys.
                            let mut last_item: Option<TestKey> = None;
                            while let Some(item) = iter.get() {
                                if let Some(last) = last_item {
                                    assert!(item.key > &last);
                                }
                                last_item = Some(item.key.clone());
                                iter.advance().await.expect("advance failed");
                            }
                        }
                    })
                })),
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_insert_advance_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[2].clone()).expect("insert error");

        assert_eq!(skip_list.len(), 2);

        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.advance();
            iter.erase();
        }

        assert_eq!(skip_list.len(), 2);

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_seek_excluded() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.insert(items[1].clone()).expect("insert error");
        let iter = skip_list.seek(Bound::Excluded(&items[0].key)).await.expect("seek failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
    }

    #[fuchsia::test]
    fn test_insert_race() {
        for _ in 0..1000 {
            let skip_list = SkipListLayer::new(100);
            skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");

            let skip_list_clone = skip_list.clone();
            let thread1 = std::thread::spawn(move || {
                skip_list_clone.insert(Item::new(TestKey(1), 1)).expect("insert error")
            });
            let thread2 = std::thread::spawn(move || {
                // A seek to key 2 must land on key 2 even while key 1 is being inserted.
                let iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
                match iter.get() {
                    Some(ItemRef { key: TestKey(2), .. }) => {}
                    result => panic!("{:?}", result),
                }
            });
            thread1.join().unwrap();
            thread2.join().unwrap();
        }
    }

    #[fuchsia::test]
    fn test_replace_or_insert_multi_thread() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
        skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");
        skip_list.insert(Item::new(TestKey(3), 3)).expect("insert error");
        skip_list.insert(Item::new(TestKey(4), 4)).expect("insert error");

        let mut threads = Vec::new();
        // Many writers repeatedly replace the same key.
        for i in 0..200 {
            let skip_list_clone = skip_list.clone();
            threads.push(std::thread::spawn(move || {
                skip_list_clone.replace_or_insert(Item::new(TestKey(3), i));
            }));
        }

        // This thread loops forever and is never joined; it keeps checking that keys 2, 3 and 4
        // stay visible, in order, while the replacements happen.
        let _checker_thread = std::thread::spawn(move || {
            loop {
                let mut iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
                assert_matches!(iter.get(), Some(ItemRef { key: TestKey(2), .. }));
                iter.advance().now_or_never().unwrap().unwrap();
                assert_matches!(iter.get(), Some(ItemRef { key: TestKey(3), .. }));
                iter.advance().now_or_never().unwrap().unwrap();
                assert_matches!(iter.get(), Some(ItemRef { key: TestKey(4), .. }));
            }
        });

        for thread in threads {
            thread.join().unwrap();
        }
    }
}