1use crate::drop_event::DropEvent;
9use crate::log::*;
10use crate::lsm_tree::merge::{self, MergeFn};
11use crate::lsm_tree::types::{
12 BoxedLayerIterator, Item, ItemCount, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut,
13 LayerValue, OrdLowerBound, OrdUpperBound,
14};
15use crate::serialized_types::{Version, LATEST_VERSION};
16use anyhow::{bail, Error};
17use async_trait::async_trait;
18use fuchsia_sync::{Mutex, MutexGuard};
19use std::cell::UnsafeCell;
20use std::cmp::{min, Ordering};
21use std::collections::BTreeMap;
22use std::ops::{Bound, Range};
23use std::sync::atomic::{self, AtomicPtr, AtomicU32};
24use std::sync::Arc;
25
26struct PointerList<K, V>(Box<[AtomicPtr<SkipListNode<K, V>>]>);
30
31impl<K, V> PointerList<K, V> {
32 fn new(count: usize) -> PointerList<K, V> {
33 let mut pointers = Vec::new();
34 for _ in 0..count {
35 pointers.push(AtomicPtr::new(std::ptr::null_mut()));
36 }
37 PointerList(pointers.into_boxed_slice())
38 }
39
40 fn len(&self) -> usize {
41 self.0.len()
42 }
43
44 fn get_mut<'a>(&self, index: usize) -> Option<&'a mut SkipListNode<K, V>> {
46 unsafe { self.0[index].load(atomic::Ordering::SeqCst).as_mut() }
47 }
48
49 fn get<'a>(&self, index: usize) -> Option<&'a SkipListNode<K, V>> {
51 unsafe { self.0[index].load(atomic::Ordering::SeqCst).as_ref() }
52 }
53
54 fn set(&self, index: usize, node: Option<&SkipListNode<K, V>>) {
56 self.0[index].store(
57 match node {
58 None => std::ptr::null_mut(),
59 Some(node) => {
60 unsafe {
63 (&*(node as *const SkipListNode<K, V>
64 as *const UnsafeCell<SkipListNode<K, V>>))
65 .get()
66 }
67 }
68 },
69 atomic::Ordering::SeqCst,
70 );
71 }
72
73 fn get_ptr(&self, index: usize) -> *mut SkipListNode<K, V> {
74 self.0[index].load(atomic::Ordering::SeqCst)
75 }
76}
77
78struct SkipListNode<K, V> {
79 item: Item<K, V>,
80 pointers: PointerList<K, V>,
81}
82
83pub struct SkipListLayer<K, V> {
84 pointers: PointerList<K, V>,
86
87 inner: Mutex<Inner<K, V>>,
88
89 write_lock: Mutex<()>,
91
92 allocated: AtomicU32,
94
95 close_event: Mutex<Option<Arc<DropEvent>>>,
96}
97
98struct Inner<K, V> {
104 epoch: u64,
108
109 current_count: u16,
111
112 erase_lists: BTreeMap<u64, EpochEraseList<K, V>>,
114
115 item_count: usize,
117}
118
119struct EpochEraseList<K, V> {
124 count: u16,
127 range: Range<*mut SkipListNode<K, V>>,
130}
131
132unsafe impl<K, V> Send for Inner<K, V> {}
134
135impl<K, V> Inner<K, V> {
136 fn new() -> Self {
137 Inner { epoch: 0, current_count: 0, erase_lists: BTreeMap::new(), item_count: 0 }
138 }
139
140 fn free_erase_list(
141 &mut self,
142 owner: &SkipListLayer<K, V>,
143 list: Range<*mut SkipListNode<K, V>>,
144 ) {
145 let mut maybe_node = unsafe { list.start.as_mut() };
146 loop {
147 match maybe_node {
148 Some(node) if node as *const _ != list.end => {
149 maybe_node = owner.free_node(node);
150 }
151 _ => break,
152 }
153 }
154 }
155}
156
157impl<K, V> SkipListLayer<K, V> {
158 pub fn new(max_item_count: usize) -> Arc<SkipListLayer<K, V>> {
159 Arc::new(SkipListLayer {
160 pointers: PointerList::new((max_item_count as f32).log2() as usize + 1),
161 inner: Mutex::new(Inner::new()),
162 write_lock: Mutex::new(()),
163 allocated: AtomicU32::new(0),
164 close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
165 })
166 }
167
168 pub fn len(&self) -> usize {
169 self.inner.lock().item_count
170 }
171
172 fn alloc_node(&self, item: Item<K, V>, pointer_count: usize) -> Box<SkipListNode<K, V>> {
173 self.allocated.fetch_add(1, atomic::Ordering::Relaxed);
174 Box::new(SkipListNode { item, pointers: PointerList::new(pointer_count) })
175 }
176
177 fn free_node(&self, node: &mut SkipListNode<K, V>) -> Option<&mut SkipListNode<K, V>> {
179 self.allocated.fetch_sub(1, atomic::Ordering::Relaxed);
180 unsafe { Box::from_raw(node).pointers.get_mut(0) }
181 }
182}
183
184impl<K: Eq + Key + OrdLowerBound, V: LayerValue> SkipListLayer<K, V> {
185 pub fn erase(&self, key: &K)
187 where
188 K: std::cmp::Eq,
189 {
190 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(key));
191 if let Some(ItemRef { key: k, .. }) = iter.get() {
192 if k == key {
193 iter.erase();
194 } else {
195 warn!("Attempt to erase key not present!");
196 }
197 }
198 iter.commit();
199 }
200
201 pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
203 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
204 if let Some(found_item) = iter.get() {
205 if found_item.key == &item.key {
206 bail!("Attempted to insert an existing key");
207 }
208 }
209 iter.insert(item);
210 Ok(())
211 }
212
213 pub fn replace_or_insert(&self, item: Item<K, V>) {
215 let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
216 if let Some(found_item) = iter.get() {
217 if found_item.key == &item.key {
218 iter.erase();
219 }
220 }
221 iter.insert(item);
222 }
223
224 pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K, merge_fn: MergeFn<K, V>) {
226 merge::merge_into(
227 Box::new(SkipListLayerIterMut::new(self, Bound::Included(lower_bound))),
228 item,
229 merge_fn,
230 )
231 .unwrap();
232 }
233}
234
235impl<K, V> Drop for SkipListLayer<K, V> {
237 fn drop(&mut self) {
238 let mut next = self.pointers.get_mut(0);
239 while let Some(node) = next {
240 next = self.free_node(node);
241 }
242 assert_eq!(self.allocated.load(atomic::Ordering::Relaxed), 0);
243 }
244}
245
246#[async_trait]
247impl<K: Key, V: LayerValue> Layer<K, V> for SkipListLayer<K, V> {
248 async fn seek<'a>(
249 &'a self,
250 bound: std::ops::Bound<&K>,
251 ) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
252 Ok(Box::new(SkipListLayerIter::new(self, bound)))
253 }
254
255 fn lock(&self) -> Option<Arc<DropEvent>> {
256 self.close_event.lock().clone()
257 }
258
259 fn estimated_len(&self) -> ItemCount {
260 ItemCount::Precise(self.inner.lock().item_count)
261 }
262
263 async fn close(&self) {
264 let listener = self.close_event.lock().take().expect("close already called").listen();
265 listener.await;
266 }
267
268 fn get_version(&self) -> Version {
269 return LATEST_VERSION;
272 }
273
274 fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
275 node.record_bool("persistent", false);
276 node.record_uint("num_items", self.inner.lock().item_count as u64);
277 }
278}
279
280struct SkipListLayerIter<'a, K, V> {
283 skip_list: &'a SkipListLayer<K, V>,
284
285 epoch: u64,
287
288 node: Option<&'a SkipListNode<K, V>>,
290}
291
292impl<'a, K: OrdUpperBound, V> SkipListLayerIter<'a, K, V> {
293 fn new(skip_list: &'a SkipListLayer<K, V>, bound: Bound<&K>) -> Self {
294 let epoch = {
295 let mut inner = skip_list.inner.lock();
296 inner.current_count += 1;
297 inner.epoch
298 };
299 let (included, key) = match bound {
300 Bound::Unbounded => {
301 return SkipListLayerIter { skip_list, epoch, node: skip_list.pointers.get(0) };
302 }
303 Bound::Included(key) => (true, key),
304 Bound::Excluded(key) => (false, key),
305 };
306 let mut last_pointers = &skip_list.pointers;
307
308 let mut node = None;
312 for index in (0..skip_list.pointers.len()).rev() {
313 loop {
315 node = last_pointers.get(index);
316 if let Some(node) = node {
317 match &node.item.key.cmp_upper_bound(key) {
318 Ordering::Equal if included => break,
319 Ordering::Greater => break,
320 _ => {}
321 }
322 last_pointers = &node.pointers;
323 } else {
324 break;
325 }
326 }
327 }
328 SkipListLayerIter { skip_list, epoch, node }
329 }
330}
331
332impl<K, V> Drop for SkipListLayerIter<'_, K, V> {
333 fn drop(&mut self) {
334 let mut inner = self.skip_list.inner.lock();
335 if self.epoch == inner.epoch {
336 inner.current_count -= 1;
337 } else {
338 if let Some(erase_list) = inner.erase_lists.get_mut(&self.epoch) {
339 erase_list.count -= 1;
340 if erase_list.count == 0 {
341 while let Some(entry) = inner.erase_lists.first_entry() {
342 if entry.get().count == 0 {
343 let range = entry.remove_entry().1.range;
344 inner.free_erase_list(self.skip_list, range);
345 } else {
346 break;
347 }
348 }
349 }
350 }
351 }
352 }
353}
354
355#[async_trait]
356impl<K: Key, V: LayerValue> LayerIterator<K, V> for SkipListLayerIter<'_, K, V> {
357 async fn advance(&mut self) -> Result<(), Error> {
358 match self.node {
359 None => {}
360 Some(node) => self.node = node.pointers.get(0),
361 }
362 Ok(())
363 }
364
365 fn get(&self) -> Option<ItemRef<'_, K, V>> {
366 self.node.map(|node| node.item.as_item_ref())
367 }
368}
369
370type PointerListRefArray<'a, K, V> = Box<[&'a PointerList<K, V>]>;
371
372pub struct SkipListLayerIterMut<'a, K: Key, V: LayerValue> {
380 skip_list: &'a SkipListLayer<K, V>,
381
382 prev_pointers: PointerListRefArray<'a, K, V>,
386
387 insertion_point: Option<PointerListRefArray<'a, K, V>>,
390
391 insertion_nodes: PointerList<K, V>,
393
394 #[allow(dead_code)]
397 write_guard: MutexGuard<'a, ()>,
398
399 item_delta: isize,
401}
402
403impl<'a, K: Key, V: LayerValue> SkipListLayerIterMut<'a, K, V> {
404 pub fn new(skip_list: &'a SkipListLayer<K, V>, bound: std::ops::Bound<&K>) -> Self {
405 let write_guard = skip_list.write_lock.lock();
406 let len = skip_list.pointers.len();
407
408 let mut prev_pointers = vec![&skip_list.pointers; len].into_boxed_slice();
424 match bound {
425 Bound::Unbounded => {}
426 Bound::Included(key) => {
427 let pointers = &mut prev_pointers;
428 for index in (0..len).rev() {
429 while let Some(node) = pointers[index].get(index) {
430 match &(node.item.key).cmp_upper_bound(key) {
433 Ordering::Equal | Ordering::Greater => break,
434 Ordering::Less => {}
435 }
436 pointers[index] = &node.pointers;
437 }
438 if index > 0 {
439 pointers[index - 1] = pointers[index];
440 }
441 }
442 }
443 Bound::Excluded(_) => panic!("Excluded bounds not supported"),
444 }
445 SkipListLayerIterMut {
446 skip_list,
447 prev_pointers,
448 insertion_point: None,
449 insertion_nodes: PointerList::new(len),
450 write_guard,
451 item_delta: 0,
452 }
453 }
454}
455
456impl<K: Key, V: LayerValue> Drop for SkipListLayerIterMut<'_, K, V> {
457 fn drop(&mut self) {
458 self.commit();
459 }
460}
461
// Mutation operations for the write iterator.  Inserted nodes are chained together privately and
// only linked into the list at the recorded insertion point when `commit` runs.
462impl<K: Key, V: LayerValue> LayerIteratorMut<K, V> for SkipListLayerIterMut<'_, K, V> {
    /// Advances past the current item.
463 fn advance(&mut self) {
464 if self.insertion_point.is_some() {
            // A mutation is pending: carry the current item along by inserting a copy of it and
            // erasing the original.
465 if let Some(item) = self.get() {
466 let copy = item.cloned();
468 self.insert(copy);
469 self.erase();
470 }
471 } else {
            // Nothing pending: step over the next node, updating `prev_pointers` at every level
            // that node has a pointer for.
472 let pointers = &mut self.prev_pointers;
473 if let Some(next) = pointers[0].get_mut(0) {
474 for i in 0..next.pointers.len() {
475 pointers[i] = &next.pointers;
476 }
477 }
478 }
479 }
480
    /// Returns the item that follows the iterator's position at level 0, if any.
481 fn get(&self) -> Option<ItemRef<'_, K, V>> {
482 self.prev_pointers[0].get(0).map(|node| node.item.as_item_ref())
483 }
484
    /// Inserts `item` at the iterator's position and moves the position to just after the new
    /// node.  The new node is not linked in at the insertion point until `commit`.
485 fn insert(&mut self, item: Item<K, V>) {
486 use rand::Rng;
487 let mut rng = rand::thread_rng();
488 let max_pointers = self.skip_list.pointers.len();
        // Pick the node's pointer count at random: it is `max_pointers` minus floor(log2(r)) for
        // a uniformly drawn `r`, clamped so that there is always at least one pointer.  Larger
        // `r` (more likely) means fewer pointers.
489 let pointer_count = max_pointers
492 - min(
493 (rng.gen_range(0..2u32.pow(max_pointers as u32) - 1) as f32).log2() as usize,
494 max_pointers - 1,
495 );
        // The box is leaked here; the node is reclaimed later through `free_node`.
496 let node = Box::leak(self.skip_list.alloc_node(item, pointer_count));
        // Remember where the first mutation happened so that `commit` knows where to link in.
497 if self.insertion_point.is_none() {
498 self.insertion_point = Some(self.prev_pointers.clone());
499 }
500 for i in 0..pointer_count {
501 let pointers = self.prev_pointers[i];
            // The new node's successor at this level is whatever currently follows the position.
502 node.pointers.set(i, pointers.get(i));
503 if self.insertion_nodes.get(i).is_none() {
                // First node inserted at this level: record it for `commit` rather than linking
                // it in now.
504 self.insertion_nodes.set(i, Some(node));
507 } else {
                // An earlier node was already inserted at this level; link this node after the
                // current position at this level.
508 pointers.set(i, Some(&node));
511 }
            // The position at this level is now just after the new node.
512 self.prev_pointers[i] = &node.pointers;
514 }
515 self.item_delta += 1;
516 }
517
    /// Erases the item at the iterator's position (the one `get` would return).
518 fn erase(&mut self) {
519 let pointers = &mut self.prev_pointers;
520 if let Some(next) = pointers[0].get_mut(0) {
521 if self.insertion_point.is_none() {
522 self.insertion_point = Some(pointers.clone());
523 }
524 if self.insertion_nodes.get(0).is_none() {
                // Nothing has been inserted at level 0 yet: just step over the node.  `commit`
                // later relinks the insertion point past everything that was stepped over.
525 pointers[0] = &next.pointers;
528 } else {
                // Otherwise unlink the node at level 0 from the current position.
529 pointers[0].set(0, next.pointers.get(0));
534 }
            // At the levels above 0 the node is unlinked immediately.
535 for i in 1..next.pointers.len() {
538 pointers[i].set(i, next.pointers.get(i));
539 }
540 }
        // NOTE(review): this decrement runs even when there was no item to erase -- confirm that
        // callers never call `erase` at the end of the list.
541 self.item_delta -= 1;
542 }
543
    /// Links pending changes in at the insertion point, applies `item_delta` to the layer's item
    /// count, and frees (or defers freeing) any nodes that were erased.  Does nothing if there
    /// has been no insert or erase since the last commit.
544 fn commit(&mut self) {
547 let prev_pointers = match self.insertion_point.take() {
549 Some(prev_pointers) => prev_pointers,
550 None => return,
551 };
552
        // The node that followed the insertion point at level 0 before this commit; if it is no
        // longer the position's successor, it is the first of the erased nodes.
553 let maybe_erase = prev_pointers[0].get_mut(0);
555
556 if self.insertion_nodes.get(0).is_none() {
            // No node was inserted at level 0: point the insertion point at whatever now follows
            // the iterator's position.
558 prev_pointers[0].set(0, self.prev_pointers[0].get(0));
562 } else {
            // Link the insertion point to the first inserted node at each level that has one.
563 for i in 0..self.insertion_nodes.len() {
567 if let Some(node) = self.insertion_nodes.get_mut(i) {
568 prev_pointers[i].set(i, Some(node));
569 }
570 }
571 }
572
573 let mut inner = self.skip_list.inner.lock();
        // Panics if the delta would take the item count out of range.
575 inner.item_count = inner.item_count.checked_add_signed(self.item_delta).unwrap();
576 if let Some(start) = maybe_erase {
577 let end = self.prev_pointers[0].get_ptr(0);
            // `start..end` is the run of erased nodes; it is empty when `start == end`.
578 if start as *mut _ != end {
579 if inner.current_count > 0 || !inner.erase_lists.is_empty() {
                    // Readers may still be looking at the erased nodes (or older erase lists are
                    // still pending), so record the range against the current epoch, move the
                    // reader count onto it, and start a new epoch.
580 let count = std::mem::take(&mut inner.current_count);
581 let epoch = inner.epoch;
582 inner.erase_lists.insert(epoch, EpochEraseList { count, range: start..end });
583 inner.epoch = inner.epoch.wrapping_add(1);
584 } else {
                    // No readers and nothing pending: free the erased nodes straight away.
585 inner.free_erase_list(self.skip_list, start..end);
586 }
587 }
588 }
589 }
590}
591
#[cfg(test)]
mod tests {
    use super::{SkipListLayer, SkipListLayerIterMut};
    use crate::lsm_tree::merge::ItemOp::{Discard, Replace};
    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
    use crate::lsm_tree::skip_list_layer::SkipListLayerIter;
    use crate::lsm_tree::types::{
        DefaultOrdLowerBound, DefaultOrdUpperBound, FuzzyHash, Item, ItemRef, Layer, LayerIterator,
        LayerIteratorMut, SortByU64,
    };
    use crate::serialized_types::{
        versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
    };
    use assert_matches::assert_matches;
    use fprint::TypeFingerprint;
    use fuchsia_async as fasync;
    use futures::future::join_all;
    use futures::{join, FutureExt as _};
    use fxfs_macros::FuzzyHash;
    use std::hash::Hash;
    use std::ops::Bound;
    use std::time::{Duration, Instant};

    /// Asserts that `$iter` is positioned on an item whose key and value equal the references
    /// `$key` and `$value`.  Panics with "missing item" if the iterator is at the end.
    macro_rules! assert_item {
        ($iter:expr, $key:expr, $value:expr) => {{
            let ItemRef { key, value, .. } = $iter.get().expect("missing item");
            assert_eq!((key, value), ($key, $value));
        }};
    }

    /// Minimal key type for these tests: ordered by the wrapped integer.
    #[derive(
        Clone,
        Eq,
        Debug,
        Hash,
        FuzzyHash,
        PartialEq,
        PartialOrd,
        Ord,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(u64);

    versioned_type! { 1.. => TestKey }

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0
        }
    }

    impl DefaultOrdLowerBound for TestKey {}
    impl DefaultOrdUpperBound for TestKey {}

    /// Items inserted out of order are iterated in key order.
    #[fuchsia::test]
    async fn test_iteration() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter, &items[0].key, &items[0].value);
        iter.advance().await.unwrap();
        assert_item!(iter, &items[1].key, &items[1].value);
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// Seeking to a key that exists lands exactly on it, and advancing reaches its successor.
    #[fuchsia::test]
    async fn test_seek_exact() {
        let skip_list = SkipListLayer::new(100);
        for i in (0..100).rev() {
            skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
        }
        let mut iter = skip_list.seek(Bound::Included(&TestKey(57))).await.unwrap();
        assert_item!(iter, &TestKey(57), &57);

        iter.advance().await.unwrap();
        assert_item!(iter, &TestKey(58), &58);
    }

    /// Seeking to a key that does not exist lands on the next key above it.
    #[fuchsia::test]
    async fn test_seek_lower_bound() {
        let skip_list = SkipListLayer::new(100);
        for i in (0..100).rev() {
            skip_list.insert(Item::new(TestKey(i * 3), i * 3)).expect("insert error");
        }
        let mut expected_index = 57 * 3;
        let mut iter = skip_list.seek(Bound::Included(&TestKey(expected_index - 1))).await.unwrap();
        assert_item!(iter, &TestKey(expected_index), &expected_index);

        expected_index += 3;
        iter.advance().await.unwrap();
        assert_item!(iter, &TestKey(expected_index), &expected_index);
    }

    /// `replace_or_insert` on an existing key replaces the value without adding an item.
    #[fuchsia::test]
    async fn test_replace_or_insert_replaces() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let replacement_value = 3;
        skip_list.replace_or_insert(Item::new(items[1].key.clone(), replacement_value));

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter, &items[0].key, &items[0].value);
        iter.advance().await.unwrap();
        assert_item!(iter, &items[1].key, &replacement_value);
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// `replace_or_insert` on a new key inserts it in order.
    #[fuchsia::test]
    async fn test_replace_or_insert_inserts() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[2].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.replace_or_insert(items[1].clone());

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter, &items[0].key, &items[0].value);
        iter.advance().await.unwrap();
        assert_item!(iter, &items[1].key, &items[1].value);
        iter.advance().await.unwrap();
        assert_item!(iter, &items[2].key, &items[2].value);
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// Erasing removes the item and keeps `len` in step.
    #[fuchsia::test]
    async fn test_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");

        assert_eq!(skip_list.len(), 2);

        skip_list.erase(&items[1].key);

        assert_eq!(skip_list.len(), 1);

        {
            let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
            assert_item!(iter, &items[0].key, &items[0].value);
            iter.advance().await.unwrap();
            assert!(iter.get().is_none());
        }

        skip_list.erase(&items[0].key);

        assert_eq!(skip_list.len(), 0);

        {
            let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
            assert!(iter.get().is_none());
        }
    }

    /// Timing-based check that seek cost grows slowly with list size.  Ignored by default.
    #[fuchsia::test]
    #[ignore]
    async fn test_seek_is_log_n_complexity() {
        // Keep doubling the list size until seeking to every item takes longer than TARGET_TIME.
        let mut n = 100;
        let mut loops = 0;
        const TARGET_TIME: Duration = Duration::from_millis(500);
        let time = loop {
            let skip_list = SkipListLayer::new(n as usize);
            for i in 0..n {
                skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
            }
            let start = Instant::now();
            for i in 0..n {
                skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
            }
            let elapsed = start.elapsed();
            if elapsed > TARGET_TIME {
                break elapsed;
            }
            n *= 2;
            loops += 1;
        };

        // Repeat the same number of seeks against a list that is 2^(loops / 2) times smaller.
        let seek_count = n;
        n >>= loops / 2;
        let skip_list = SkipListLayer::new(n as usize);
        for i in 0..n {
            skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
        }
        let start = Instant::now();
        for i in 0..seek_count {
            skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
        }
        let elapsed = start.elapsed();

        eprintln!(
            "{} items: {}ms, {} items: {}ms",
            seek_count,
            time.as_millis(),
            n,
            elapsed.as_millis()
        );

        // The much smaller list must not be more than four times faster for the same number of
        // seeks.
        assert!(elapsed * 4 > time);
    }

    /// Seeking near the end of a large list yields exactly the remaining items.
    #[fuchsia::test]
    async fn test_large_number_of_items() {
        let item_count = 1000;
        let skip_list = SkipListLayer::new(1000);
        for i in 1..item_count {
            skip_list.insert(Item::new(TestKey(i), 1)).expect("insert error");
        }
        let mut iter = skip_list.seek(Bound::Included(&TestKey(item_count - 10))).await.unwrap();
        for i in item_count - 10..item_count {
            assert_eq!(iter.get().expect("missing item").key, &TestKey(i));
            iter.advance().await.unwrap();
        }
        assert!(iter.get().is_none());
    }

    /// Two read iterators can coexist and advance independently.
    #[fuchsia::test]
    async fn test_multiple_readers_allowed() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter, &items[0].key, &items[0].value);

        let iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter2, &items[0].key, &items[0].value);

        iter.advance().await.unwrap();
        assert_item!(iter, &items[1].key, &items[1].value);
    }

    /// Merge function for `test_merge_into`: folds the right value into the left item and
    /// discards the right item.
    fn merge(
        left: &'_ MergeLayerIterator<'_, TestKey, i32>,
        right: &'_ MergeLayerIterator<'_, TestKey, i32>,
    ) -> MergeResult<TestKey, i32> {
        MergeResult::Other {
            emit: None,
            left: Replace(Item::new((*left.key()).clone(), *left.value() + *right.value()).boxed()),
            right: Discard,
        }
    }

    /// `merge_into` applies the merge function to the new item and the existing one.
    #[fuchsia::test]
    async fn test_merge_into() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");

        skip_list.merge_into(Item::new(TestKey(2), 2), &TestKey(1), merge);

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter, &TestKey(1), &3);
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// Two inserts through one mutable iterator are both committed when it drops.
    #[fuchsia::test]
    async fn test_two_inserts() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.insert(items[1].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter, &items[0].key, &items[0].value);
        iter.advance().await.unwrap();
        assert_item!(iter, &items[1].key, &items[1].value);
    }

    /// Insert followed by erase keeps the inserted item and removes the one after it.
    #[fuchsia::test]
    async fn test_erase_after_insert() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.erase();
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter, &items[0].key, &items[0].value);
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// Erase followed by insert replaces the existing item with the inserted one.
    #[fuchsia::test]
    async fn test_insert_after_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.erase();
            iter.insert(items[0].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter, &items[0].key, &items[0].value);
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// Insert, erase, insert through one iterator leaves only the two inserted items.
    #[fuchsia::test]
    async fn test_insert_erase_insert() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[0].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[1].clone());
            iter.erase();
            iter.insert(items[2].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter, &items[1].key, &items[1].value);
        iter.advance().await.unwrap();
        assert_item!(iter, &items[2].key, &items[2].value);
    }

    /// Two consecutive erases remove the first two items.
    #[fuchsia::test]
    async fn test_two_erase_erases() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[2].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.erase();
            iter.erase();
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter, &items[2].key, &items[2].value);
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// Read iterators created before a write stay usable while, and after, the write happens.
    #[fuchsia::test]
    async fn test_readers_not_blocked_by_writers() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter, &items[1].key, &items[1].value);

        // Check the second reader itself (previously `iter` was checked a second time here).
        let mut iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter2, &items[1].key, &items[1].value);

        join!(async { skip_list.insert(items[0].clone()).expect("insert error") }, async {
            // Spin until a fresh reader observes the newly inserted first item.
            loop {
                let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
                let ItemRef { key, .. } = iter.get().expect("missing item");
                if key == &items[0].key {
                    break;
                }
            }
            // The readers created before the insert were positioned on the last item.
            iter.advance().await.unwrap();
            assert!(iter.get().is_none());
            std::mem::drop(iter);
            iter2.advance().await.unwrap();
            assert!(iter2.get().is_none());
            std::mem::drop(iter2);
        });
    }

    /// Concurrent writers and readers: every reader always observes strictly increasing keys.
    #[fuchsia::test(threads = 20)]
    async fn test_many_readers_and_writers() {
        let skip_list = SkipListLayer::new(100);
        join_all(
            (0..10)
                .map(|i| {
                    let skip_list_clone = skip_list.clone();
                    fasync::Task::spawn(async move {
                        for j in 0..10 {
                            skip_list_clone
                                .insert(Item::new(TestKey(i * 100 + j), i))
                                .expect("insert error");
                        }
                    })
                })
                .chain((0..10).map(|_| {
                    let skip_list_clone = skip_list.clone();
                    fasync::Task::spawn(async move {
                        for _ in 0..300 {
                            let mut iter =
                                skip_list_clone.seek(Bound::Unbounded).await.expect("seek failed");
                            let mut last_item: Option<TestKey> = None;
                            while let Some(item) = iter.get() {
                                if let Some(last) = last_item {
                                    assert!(item.key > &last);
                                }
                                last_item = Some(item.key.clone());
                                iter.advance().await.expect("advance failed");
                            }
                        }
                    })
                })),
        )
        .await;
    }

    /// Insert, advance, erase: the advanced-over item is kept and the one after it is erased.
    #[fuchsia::test]
    async fn test_insert_advance_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[2].clone()).expect("insert error");

        assert_eq!(skip_list.len(), 2);

        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.advance();
            iter.erase();
        }

        assert_eq!(skip_list.len(), 2);

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        assert_item!(iter, &items[0].key, &items[0].value);
        iter.advance().await.unwrap();
        assert_item!(iter, &items[1].key, &items[1].value);
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    /// An excluded bound skips the item with the given key.
    #[fuchsia::test]
    async fn test_seek_excluded() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.insert(items[1].clone()).expect("insert error");
        let iter = skip_list.seek(Bound::Excluded(&items[0].key)).await.expect("seek failed");
        assert_item!(iter, &items[1].key, &items[1].value);
    }

    /// A seek racing with an insert of a smaller key must still find the key it asked for.
    #[fuchsia::test]
    fn test_insert_race() {
        for _ in 0..1000 {
            let skip_list = SkipListLayer::new(100);
            skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");

            let skip_list_clone = skip_list.clone();
            let thread1 = std::thread::spawn(move || {
                skip_list_clone.insert(Item::new(TestKey(1), 1)).expect("insert error")
            });
            let thread2 = std::thread::spawn(move || {
                let iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
                match iter.get() {
                    Some(ItemRef { key: TestKey(2), .. }) => {}
                    result => panic!("{:?}", result),
                }
            });
            thread1.join().unwrap();
            thread2.join().unwrap();
        }
    }

    /// Many threads replacing the same key never disturb a reader walking across it.
    #[fuchsia::test]
    fn test_replace_or_insert_multi_thread() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
        skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");
        skip_list.insert(Item::new(TestKey(3), 3)).expect("insert error");
        skip_list.insert(Item::new(TestKey(4), 4)).expect("insert error");

        let mut threads = Vec::new();
        for i in 0..200 {
            let skip_list_clone = skip_list.clone();
            threads.push(std::thread::spawn(move || {
                skip_list_clone.replace_or_insert(Item::new(TestKey(3), i));
            }));
        }

        // The checker loops forever and is never joined; it only runs for as long as the test
        // process does.
        let _checker_thread = std::thread::spawn(move || loop {
            let mut iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
            assert_matches!(iter.get(), Some(ItemRef { key: TestKey(2), .. }));
            iter.advance().now_or_never().unwrap().unwrap();
            assert_matches!(iter.get(), Some(ItemRef { key: TestKey(3), .. }));
            iter.advance().now_or_never().unwrap().unwrap();
            assert_matches!(iter.get(), Some(ItemRef { key: TestKey(4), .. }));
        });

        for thread in threads {
            thread.join().unwrap();
        }
    }
}