fxfs/lsm_tree/skip_list_layer.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// There are a great many optimisations that could be considered to improve performance and maybe
// memory usage.

use crate::drop_event::DropEvent;
use crate::log::*;
use crate::lsm_tree::merge::{self, MergeFn};
use crate::lsm_tree::types::{
    BoxedLayerIterator, Item, ItemCount, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut,
    LayerValue, OrdLowerBound, OrdUpperBound,
};
use crate::serialized_types::{Version, LATEST_VERSION};
use anyhow::{bail, Error};
use async_trait::async_trait;
use fuchsia_sync::{Mutex, MutexGuard};
use std::cell::UnsafeCell;
use std::cmp::{min, Ordering};
use std::collections::BTreeMap;
use std::ops::{Bound, Range};
use std::sync::atomic::{self, AtomicPtr, AtomicU32};
use std::sync::Arc;

// Each skip list node contains a variable-sized pointer list. The head pointers also exist in the
// form of a pointer list. Index 0 in the pointer list is the chain with the most elements, i.e. it
// contains every element in the list.
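// For example, a node allocated with three pointers participates in the chains at indices 0, 1,
// and 2, whereas a node allocated with a single pointer appears only in the index-0 chain.  (This
// is purely illustrative; the per-node pointer count is chosen randomly in `insert` below.)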
struct PointerList<K, V>(Box<[AtomicPtr<SkipListNode<K, V>>]>);

impl<K, V> PointerList<K, V> {
    fn new(count: usize) -> PointerList<K, V> {
        let mut pointers = Vec::new();
        for _ in 0..count {
            pointers.push(AtomicPtr::new(std::ptr::null_mut()));
        }
        PointerList(pointers.into_boxed_slice())
    }

    fn len(&self) -> usize {
        self.0.len()
    }

    // Returns a mutable reference to the node at the given index, if any.
    fn get_mut<'a>(&self, index: usize) -> Option<&'a mut SkipListNode<K, V>> {
        unsafe { self.0[index].load(atomic::Ordering::SeqCst).as_mut() }
    }

    // Same as previous, but returns an immutable reference.
    fn get<'a>(&self, index: usize) -> Option<&'a SkipListNode<K, V>> {
        unsafe { self.0[index].load(atomic::Ordering::SeqCst).as_ref() }
    }

    // Sets the pointer at the given index.
    fn set(&self, index: usize, node: Option<&SkipListNode<K, V>>) {
        self.0[index].store(
            match node {
                None => std::ptr::null_mut(),
                Some(node) => {
                    // https://github.com/rust-lang/rust/issues/66136#issuecomment-550003651
                    // suggests that the following is the best way to cast from const* to mut*.
                    unsafe {
                        (&*(node as *const SkipListNode<K, V>
                            as *const UnsafeCell<SkipListNode<K, V>>))
                            .get()
                    }
                }
            },
            atomic::Ordering::SeqCst,
        );
    }

    fn get_ptr(&self, index: usize) -> *mut SkipListNode<K, V> {
        self.0[index].load(atomic::Ordering::SeqCst)
    }
}

struct SkipListNode<K, V> {
    item: Item<K, V>,
    pointers: PointerList<K, V>,
}

pub struct SkipListLayer<K, V> {
    // These are the head pointers for the list.
    pointers: PointerList<K, V>,

    inner: Mutex<Inner<K, V>>,

    // Writes are serialized using this lock.
    write_lock: Mutex<()>,

    // The number of nodes that have been allocated.  This is only used for debugging purposes.
    allocated: AtomicU32,

    close_event: Mutex<Option<Arc<DropEvent>>>,
}

// The writer needs to synchronize with the readers, and this is done by keeping track of read
// counts.  We could, in theory, remove the mutex and make the read counts atomic (and thus make
// reads truly lock free), but a mutex is simpler and easier to reason about.  What matters most is
// that we avoid using a futures::lock::Mutex for readers, because that can be blocked for
// relatively long periods of time.
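//
// An illustrative (hypothetical) sequence: a reader starts under epoch 0 and bumps
// `current_count`; a writer then commits a mutation that erased nodes, so those nodes are queued
// against epoch 0 and the epoch advances to 1.  New readers are counted against epoch 1, and the
// queued nodes are freed only once the last epoch-0 reader is dropped.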
struct Inner<K, V> {
    // After a write, if there are nodes that need to be freed and there are existing readers, the
    // epoch is advanced and new readers are counted against the new epoch.  When all the old
    // readers finish, the nodes can be freed.
    epoch: u64,

    // The number of readers on the current epoch.
    current_count: u16,

    // A list of nodes to be freed once the read counts have reached zero.
    erase_lists: BTreeMap<u64, EpochEraseList<K, V>>,

    // The number of items in the skip-list.
    item_count: usize,
}

// After a mutation that involves erasing nodes, we must keep the nodes alive until there are no
// more readers in any of the epochs prior to the mutation.  To deal with this, we track the number
// of outstanding readers in each epoch so that when the count reaches zero, we know it is safe to
// free the nodes.
struct EpochEraseList<K, V> {
    // The number of readers still associated with this epoch.  When this reaches zero, the list can
    // be freed once all previous epochs have been freed.
    count: u16,
    // We represent the list by storing the head and tail of the list, with each node chained to the
    // next.
    range: Range<*mut SkipListNode<K, V>>,
}

// Required because of `erase_lists`, which holds raw pointers.
unsafe impl<K, V> Send for Inner<K, V> {}

impl<K, V> Inner<K, V> {
    fn new() -> Self {
        Inner { epoch: 0, current_count: 0, erase_lists: BTreeMap::new(), item_count: 0 }
    }

    fn free_erase_list(
        &mut self,
        owner: &SkipListLayer<K, V>,
        list: Range<*mut SkipListNode<K, V>>,
    ) {
        let mut maybe_node = unsafe { list.start.as_mut() };
        loop {
            match maybe_node {
                Some(node) if node as *const _ != list.end => {
                    maybe_node = owner.free_node(node);
                }
                _ => break,
            }
        }
    }
}

impl<K, V> SkipListLayer<K, V> {
    pub fn new(max_item_count: usize) -> Arc<SkipListLayer<K, V>> {
        Arc::new(SkipListLayer {
            pointers: PointerList::new((max_item_count as f32).log2() as usize + 1),
            inner: Mutex::new(Inner::new()),
            write_lock: Mutex::new(()),
            allocated: AtomicU32::new(0),
            close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
        })
    }

    pub fn len(&self) -> usize {
        self.inner.lock().item_count
    }

    fn alloc_node(&self, item: Item<K, V>, pointer_count: usize) -> Box<SkipListNode<K, V>> {
        self.allocated.fetch_add(1, atomic::Ordering::Relaxed);
        Box::new(SkipListNode { item, pointers: PointerList::new(pointer_count) })
    }

    // Frees the given node and returns the next node in the chain.
    fn free_node(&self, node: &mut SkipListNode<K, V>) -> Option<&mut SkipListNode<K, V>> {
        self.allocated.fetch_sub(1, atomic::Ordering::Relaxed);
        unsafe { Box::from_raw(node).pointers.get_mut(0) }
    }
}

impl<K: Eq + Key + OrdLowerBound, V: LayerValue> SkipListLayer<K, V> {
    /// Erases the given item. Does nothing if the item doesn't exist.
    pub fn erase(&self, key: &K)
    where
        K: std::cmp::Eq,
    {
        let mut iter = SkipListLayerIterMut::new(self, Bound::Included(key));
        if let Some(ItemRef { key: k, .. }) = iter.get() {
            if k == key {
                iter.erase();
            } else {
                warn!("Attempt to erase key not present!");
            }
        }
        iter.commit();
    }

    /// Inserts the given item.
    pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
        let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
        if let Some(found_item) = iter.get() {
            if found_item.key == &item.key {
                bail!("Attempted to insert an existing key");
            }
        }
        iter.insert(item);
        Ok(())
    }

    /// Replaces or inserts the given item.
    pub fn replace_or_insert(&self, item: Item<K, V>) {
        let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
        if let Some(found_item) = iter.get() {
            if found_item.key == &item.key {
                iter.erase();
            }
        }
        iter.insert(item);
    }

    /// Merges the given item into the layer.
    pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K, merge_fn: MergeFn<K, V>) {
        merge::merge_into(
            Box::new(SkipListLayerIterMut::new(self, Bound::Included(lower_bound))),
            item,
            merge_fn,
        )
        .unwrap();
    }
}
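
// Illustrative usage sketch (for exposition only; `MyKey` and `MyValue` are hypothetical
// placeholders for types implementing the `Key` and `LayerValue` traits):
//
//     let layer = SkipListLayer::new(1024);
//     layer.insert(Item::new(MyKey(1), MyValue(1)))?;
//     layer.replace_or_insert(Item::new(MyKey(1), MyValue(2)));
//     let mut iter = layer.seek(Bound::Unbounded).await?;
//     while let Some(item) = iter.get() {
//         // ... inspect `item` ...
//         iter.advance().await?;
//     }
//     layer.erase(&MyKey(1));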

// We have to manually manage memory.
impl<K, V> Drop for SkipListLayer<K, V> {
    fn drop(&mut self) {
        let mut next = self.pointers.get_mut(0);
        while let Some(node) = next {
            next = self.free_node(node);
        }
        assert_eq!(self.allocated.load(atomic::Ordering::Relaxed), 0);
    }
}

#[async_trait]
impl<K: Key, V: LayerValue> Layer<K, V> for SkipListLayer<K, V> {
    async fn seek<'a>(
        &'a self,
        bound: std::ops::Bound<&K>,
    ) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
        Ok(Box::new(SkipListLayerIter::new(self, bound)))
    }

    fn lock(&self) -> Option<Arc<DropEvent>> {
        self.close_event.lock().clone()
    }

    fn estimated_len(&self) -> ItemCount {
        ItemCount::Precise(self.inner.lock().item_count)
    }

    async fn close(&self) {
        let listener = self.close_event.lock().take().expect("close already called").listen();
        listener.await;
    }

    fn get_version(&self) -> Version {
        // The SkipListLayer is stored in RAM and written to disk as a SimplePersistentLayer.
        // Hence, the SkipListLayer is always at the latest version.
        LATEST_VERSION
    }

    fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
        node.record_bool("persistent", false);
        node.record_uint("num_items", self.inner.lock().item_count as u64);
    }
}

// -- SkipListLayerIter --

struct SkipListLayerIter<'a, K, V> {
    skip_list: &'a SkipListLayer<K, V>,

    // The epoch for this reader.
    epoch: u64,

    // The current node.
    node: Option<&'a SkipListNode<K, V>>,
}

impl<'a, K: OrdUpperBound, V> SkipListLayerIter<'a, K, V> {
    fn new(skip_list: &'a SkipListLayer<K, V>, bound: Bound<&K>) -> Self {
        let epoch = {
            let mut inner = skip_list.inner.lock();
            inner.current_count += 1;
            inner.epoch
        };
        let (included, key) = match bound {
            Bound::Unbounded => {
                return SkipListLayerIter { skip_list, epoch, node: skip_list.pointers.get(0) };
            }
            Bound::Included(key) => (true, key),
            Bound::Excluded(key) => (false, key),
        };
        let mut last_pointers = &skip_list.pointers;

        // Some care needs to be taken here because new elements can be inserted atomically, so it
        // is important that the node we return in the iterator is the same node that we performed
        // the last comparison on.
        let mut node = None;
        for index in (0..skip_list.pointers.len()).rev() {
            // Keep iterating along this level until we encounter a key that's >= our search key.
            loop {
                node = last_pointers.get(index);
                if let Some(node) = node {
                    match &node.item.key.cmp_upper_bound(key) {
                        Ordering::Equal if included => break,
                        Ordering::Greater => break,
                        _ => {}
                    }
                    last_pointers = &node.pointers;
                } else {
                    break;
                }
            }
        }
        SkipListLayerIter { skip_list, epoch, node }
    }
}

impl<K, V> Drop for SkipListLayerIter<'_, K, V> {
    fn drop(&mut self) {
        let mut inner = self.skip_list.inner.lock();
        if self.epoch == inner.epoch {
            inner.current_count -= 1;
        } else {
            if let Some(erase_list) = inner.erase_lists.get_mut(&self.epoch) {
                erase_list.count -= 1;
                if erase_list.count == 0 {
                    while let Some(entry) = inner.erase_lists.first_entry() {
                        if entry.get().count == 0 {
                            let range = entry.remove_entry().1.range;
                            inner.free_erase_list(self.skip_list, range);
                        } else {
                            break;
                        }
                    }
                }
            }
        }
    }
}

#[async_trait]
impl<K: Key, V: LayerValue> LayerIterator<K, V> for SkipListLayerIter<'_, K, V> {
    async fn advance(&mut self) -> Result<(), Error> {
        match self.node {
            None => {}
            Some(node) => self.node = node.pointers.get(0),
        }
        Ok(())
    }

    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        self.node.map(|node| node.item.as_item_ref())
    }
}

type PointerListRefArray<'a, K, V> = Box<[&'a PointerList<K, V>]>;

// -- SkipListLayerIterMut --

// This works by building an insertion chain.  When that chain is committed, it is done atomically
// so that readers are not interrupted.  When the existing readers are finished, it is then safe to
// release memory for any nodes that might have been erased.  If we are only erasing elements,
// there will be no insertion chain, in which case we just atomically remove the elements from the
// chain.
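//
// As a hypothetical walk-through: `replace_or_insert` positions this iterator at the existing
// node, calls `erase` (which records the insertion point and steps past the old node) and then
// `insert` (which links a new node into a detached insertion chain).  `commit` then splices the
// chain in, updating the level-0 pointer first, and the old node is either freed immediately or
// queued on an epoch erase list if there are outstanding readers.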
pub struct SkipListLayerIterMut<'a, K: Key, V: LayerValue> {
    skip_list: &'a SkipListLayer<K, V>,

    // Since this is a mutable iterator, we need to keep pointers to all the nodes that precede the
    // current position at every level, so that we can update them when inserting or erasing
    // elements.
    prev_pointers: PointerListRefArray<'a, K, V>,

    // When we first insert or erase an element, we take a copy of prev_pointers so that
    // we know which pointers need to be updated when we commit.
    insertion_point: Option<PointerListRefArray<'a, K, V>>,

    // These are the nodes that we should point to when we commit.
    insertion_nodes: PointerList<K, V>,

    // Only one write can proceed at a time.  We only need a place to keep the mutex guard, which is
    // why Rust thinks this is unused.
    #[allow(dead_code)]
    write_guard: MutexGuard<'a, ()>,

    // The change in item count as a result of this mutation.
    item_delta: isize,
}

impl<'a, K: Key, V: LayerValue> SkipListLayerIterMut<'a, K, V> {
    pub fn new(skip_list: &'a SkipListLayer<K, V>, bound: std::ops::Bound<&K>) -> Self {
        let write_guard = skip_list.write_lock.lock();
        let len = skip_list.pointers.len();

        // Start by setting all the previous pointers to the head.
        //
        // To understand how the previous pointers work, imagine the list looks something like the
        // following:
        //
        // 2  |--->|
        // 1  |--->|--|------->|
        // 0  |--->|--|--|--|->|
        //  HEAD   A  B  C  D  E  F
        //
        // Now imagine that the iterator is pointing at element D. In that case, the previous
        // pointers will point at C for index 0, B for index 1 and A for index 2. With that
        // information, it will be possible to insert an element immediately prior to D and
        // correctly update as many pointers as required (remember a new element will be given a
        // random number of levels).
        let mut prev_pointers = vec![&skip_list.pointers; len].into_boxed_slice();
        match bound {
            Bound::Unbounded => {}
            Bound::Included(key) => {
                let pointers = &mut prev_pointers;
                for index in (0..len).rev() {
                    while let Some(node) = pointers[index].get(index) {
                        // Keep iterating along this level until we encounter a key that's >= our
                        // search key.
                        match &(node.item.key).cmp_upper_bound(key) {
                            Ordering::Equal | Ordering::Greater => break,
                            Ordering::Less => {}
                        }
                        pointers[index] = &node.pointers;
                    }
                    if index > 0 {
                        pointers[index - 1] = pointers[index];
                    }
                }
            }
            Bound::Excluded(_) => panic!("Excluded bounds not supported"),
        }
        SkipListLayerIterMut {
            skip_list,
            prev_pointers,
            insertion_point: None,
            insertion_nodes: PointerList::new(len),
            write_guard,
            item_delta: 0,
        }
    }
}

impl<K: Key, V: LayerValue> Drop for SkipListLayerIterMut<'_, K, V> {
    fn drop(&mut self) {
        self.commit();
    }
}

impl<K: Key, V: LayerValue> LayerIteratorMut<K, V> for SkipListLayerIterMut<'_, K, V> {
    fn advance(&mut self) {
        if self.insertion_point.is_some() {
            if let Some(item) = self.get() {
                // Copy the current item into the insertion chain.
                let copy = item.cloned();
                self.insert(copy);
                self.erase();
            }
        } else {
            let pointers = &mut self.prev_pointers;
            if let Some(next) = pointers[0].get_mut(0) {
                for i in 0..next.pointers.len() {
                    pointers[i] = &next.pointers;
                }
            }
        }
    }

    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        self.prev_pointers[0].get(0).map(|node| node.item.as_item_ref())
    }

    fn insert(&mut self, item: Item<K, V>) {
        use rand::Rng;
        let mut rng = rand::thread_rng();
        let max_pointers = self.skip_list.pointers.len();
        // This chooses a random number of pointers such that each level has half the number of
        // pointers of the previous one.
        let pointer_count = max_pointers
            - min(
                (rng.gen_range(0..2u32.pow(max_pointers as u32) - 1) as f32).log2() as usize,
                max_pointers - 1,
            );
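        // For example (illustrative numbers): with max_pointers == 8 the draw is from 0..=254;
        // values 128..=254 (roughly half) yield a pointer_count of 1, 64..=127 yield 2, 32..=63
        // yield 3, and so on, with only 0 and 1 yielding the full 8 pointers.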
        let node = Box::leak(self.skip_list.alloc_node(item, pointer_count));
        if self.insertion_point.is_none() {
            self.insertion_point = Some(self.prev_pointers.clone());
        }
        for i in 0..pointer_count {
            let pointers = self.prev_pointers[i];
            node.pointers.set(i, pointers.get(i));
            if self.insertion_nodes.get(i).is_none() {
                // If there's no insertion node at this level, record this node as the node to
                // switch in when we commit.
                self.insertion_nodes.set(i, Some(node));
            } else {
                // There's already an insertion node at this level which means that it's part of the
                // insertion chain, so we can just update the pointers now.
                pointers.set(i, Some(&node));
            }
            // The iterator should point at the node following the new node i.e. the existing node.
            self.prev_pointers[i] = &node.pointers;
        }
        self.item_delta += 1;
    }

    fn erase(&mut self) {
        let pointers = &mut self.prev_pointers;
        if let Some(next) = pointers[0].get_mut(0) {
            if self.insertion_point.is_none() {
                self.insertion_point = Some(pointers.clone());
            }
            if self.insertion_nodes.get(0).is_none() {
                // If there's no insertion node, then just update the iterator position to point to
                // the next node, and then when we commit, it'll get erased.
                pointers[0] = &next.pointers;
            } else {
                // There's an insertion node, so the current element must be part of the insertion
                // chain and so we can update the pointers immediately.  There will be another node
                // that isn't part of the insertion chain that will still point at this node, but it
                // will disappear when we commit.
                pointers[0].set(0, next.pointers.get(0));
            }
            // Fix up all the pointers except the bottom one. Readers will still find this node,
            // just not as efficiently.
            for i in 1..next.pointers.len() {
                pointers[i].set(i, next.pointers.get(i));
            }
        }
        self.item_delta -= 1;
    }

    // Commits the changes.  Note that this doesn't wait for readers to finish; any barrier that
    // may be required should be handled by the caller.
    fn commit(&mut self) {
        // Splice the changes into the list.
        let prev_pointers = match self.insertion_point.take() {
            Some(prev_pointers) => prev_pointers,
            None => return,
        };

        // Keep track of the first node that we might need to erase later.
        let maybe_erase = prev_pointers[0].get_mut(0);

        // If there are no insertion nodes, then it means that we're only erasing nodes.
        if self.insertion_nodes.get(0).is_none() {
            // Erase all elements between the insertion point and the current element. The
            // pointers for levels > 0 should already have been done, so it's only level 0 we
            // need to worry about.
            prev_pointers[0].set(0, self.prev_pointers[0].get(0));
        } else {
            // Switch the pointers over so that the insertion chain is spliced in.  This is safe
            // so long as the bottom pointer is done first because that guarantees the new nodes
            // will be found, just maybe not as efficiently.
            for i in 0..self.insertion_nodes.len() {
                if let Some(node) = self.insertion_nodes.get_mut(i) {
                    prev_pointers[i].set(i, Some(node));
                }
            }
        }

        // Switch the epoch so that we can track when existing readers have finished.
        let mut inner = self.skip_list.inner.lock();
        inner.item_count = inner.item_count.checked_add_signed(self.item_delta).unwrap();
        if let Some(start) = maybe_erase {
            let end = self.prev_pointers[0].get_ptr(0);
            if start as *mut _ != end {
                if inner.current_count > 0 || !inner.erase_lists.is_empty() {
                    let count = std::mem::take(&mut inner.current_count);
                    let epoch = inner.epoch;
                    inner.erase_lists.insert(epoch, EpochEraseList { count, range: start..end });
                    inner.epoch = inner.epoch.wrapping_add(1);
                } else {
                    inner.free_erase_list(self.skip_list, start..end);
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{SkipListLayer, SkipListLayerIterMut};
    use crate::lsm_tree::merge::ItemOp::{Discard, Replace};
    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
    use crate::lsm_tree::skip_list_layer::SkipListLayerIter;
    use crate::lsm_tree::types::{
        DefaultOrdLowerBound, DefaultOrdUpperBound, FuzzyHash, Item, ItemRef, Layer, LayerIterator,
        LayerIteratorMut, SortByU64,
    };
    use crate::serialized_types::{
        versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
    };
    use assert_matches::assert_matches;
    use fprint::TypeFingerprint;
    use fuchsia_async as fasync;
    use futures::future::join_all;
    use futures::{join, FutureExt as _};
    use fxfs_macros::FuzzyHash;
    use std::hash::Hash;
    use std::ops::Bound;
    use std::time::{Duration, Instant};

    #[derive(
        Clone,
        Eq,
        Debug,
        Hash,
        FuzzyHash,
        PartialEq,
        PartialOrd,
        Ord,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(u64);

    versioned_type! { 1.. => TestKey }

    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0
        }
    }

    impl DefaultOrdLowerBound for TestKey {}
    impl DefaultOrdUpperBound for TestKey {}

    #[fuchsia::test]
    async fn test_iteration() {
        // Insert two items and make sure we can iterate back in the correct order.
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_seek_exact() {
        // Seek for an exact match.
        let skip_list = SkipListLayer::new(100);
        for i in (0..100).rev() {
            skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
        }
        let mut iter = skip_list.seek(Bound::Included(&TestKey(57))).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(57), &57));

        // And check the next item is correct.
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(58), &58));
    }

    #[fuchsia::test]
    async fn test_seek_lower_bound() {
        // Seek for a non-exact match.
        let skip_list = SkipListLayer::new(100);
        for i in (0..100).rev() {
            skip_list.insert(Item::new(TestKey(i * 3), i * 3)).expect("insert error");
        }
        let mut expected_index = 57 * 3;
        let mut iter = skip_list.seek(Bound::Included(&TestKey(expected_index - 1))).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(expected_index), &expected_index));

        // And check the next item is correct.
        expected_index += 3;
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(expected_index), &expected_index));
    }

    #[fuchsia::test]
    async fn test_replace_or_insert_replaces() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let replacement_value = 3;
        skip_list.replace_or_insert(Item::new(items[1].key.clone(), replacement_value));

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &replacement_value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_replace_or_insert_inserts() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[2].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.replace_or_insert(items[1].clone());

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[2].key, &items[2].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");

        assert_eq!(skip_list.len(), 2);

        skip_list.erase(&items[1].key);

        assert_eq!(skip_list.len(), 1);

        {
            let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
            let ItemRef { key, value, .. } = iter.get().expect("missing item");
            assert_eq!((key, value), (&items[0].key, &items[0].value));
            iter.advance().await.unwrap();
            assert!(iter.get().is_none());
        }

        skip_list.erase(&items[0].key);

        assert_eq!(skip_list.len(), 0);

        {
            let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
            assert!(iter.get().is_none());
        }
    }

    // This test ends up being flaky on CQ. It is left here as it might be useful in case
    // significant changes are made.
    #[fuchsia::test]
    #[ignore]
    async fn test_seek_is_log_n_complexity() {
        // Keep doubling up the number of items until it takes about 500ms to search and then go
        // back and measure something that should, in theory, take about half that time.
        let mut n = 100;
        let mut loops = 0;
        const TARGET_TIME: Duration = Duration::from_millis(500);
        let time = loop {
            let skip_list = SkipListLayer::new(n as usize);
            for i in 0..n {
                skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
            }
            let start = Instant::now();
            for i in 0..n {
                skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
            }
            let elapsed = Instant::now() - start;
            if elapsed > TARGET_TIME {
                break elapsed;
            }
            n *= 2;
            loops += 1;
        };

        let seek_count = n;
        n >>= loops / 2; // This should, in theory, result in 50% seek time.
        let skip_list = SkipListLayer::new(n as usize);
        for i in 0..n {
            skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
        }
        let start = Instant::now();
        for i in 0..seek_count {
            skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
        }
        let elapsed = Instant::now() - start;

        eprintln!(
            "{} items: {}ms, {} items: {}ms",
            seek_count,
            time.as_millis(),
            n,
            elapsed.as_millis()
        );

        // Experimental results show that typically we do a bit better than log(n), but here we just
        // check that the time we just measured is above 25% of the time we first measured; in
        // theory, it should be around 50%.
        assert!(elapsed * 4 > time);
    }

    #[fuchsia::test]
    async fn test_large_number_of_items() {
        let item_count = 1000;
        let skip_list = SkipListLayer::new(1000);
        for i in 1..item_count {
            skip_list.insert(Item::new(TestKey(i), 1)).expect("insert error");
        }
        let mut iter = skip_list.seek(Bound::Included(&TestKey(item_count - 10))).await.unwrap();
        for i in item_count - 10..item_count {
            assert_eq!(iter.get().expect("missing item").key, &TestKey(i));
            iter.advance().await.unwrap();
        }
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_multiple_readers_allowed() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");

        // Create the first iterator and check the first item.
        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));

        // Create a second iterator and check the first item.
        let iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter2.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));

        // Now go back to the first iterator and check the second item.
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
    }

    fn merge(
        left: &'_ MergeLayerIterator<'_, TestKey, i32>,
        right: &'_ MergeLayerIterator<'_, TestKey, i32>,
    ) -> MergeResult<TestKey, i32> {
        MergeResult::Other {
            emit: None,
            left: Replace(Item::new((*left.key()).clone(), *left.value() + *right.value()).boxed()),
            right: Discard,
        }
    }

    #[fuchsia::test]
    async fn test_merge_into() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");

        skip_list.merge_into(Item::new(TestKey(2), 2), &TestKey(1), merge);

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&TestKey(1), &3));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_two_inserts() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.insert(items[1].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
    }

    #[fuchsia::test]
    async fn test_erase_after_insert() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.erase();
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_insert_after_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.erase();
            iter.insert(items[0].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_insert_erase_insert() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[0].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[1].clone());
            iter.erase();
            iter.insert(items[2].clone());
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[2].key, &items[2].value));
    }

    #[fuchsia::test]
    async fn test_two_erase_erases() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[2].clone()).expect("insert error");
        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.erase();
            iter.erase();
        }

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[2].key, &items[2].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_readers_not_blocked_by_writers() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[1].clone()).expect("insert error");

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));

        let mut iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));

        join!(async { skip_list.insert(items[0].clone()).expect("insert error") }, async {
            loop {
                let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
                let ItemRef { key, .. } = iter.get().expect("missing item");
                if key == &items[0].key {
                    break;
                }
            }
            iter.advance().await.unwrap();
            assert!(iter.get().is_none());
            std::mem::drop(iter);
            iter2.advance().await.unwrap();
            assert!(iter2.get().is_none());
            std::mem::drop(iter2);
        });
    }

    #[fuchsia::test(threads = 20)]
    async fn test_many_readers_and_writers() {
        let skip_list = SkipListLayer::new(100);
        join_all(
            (0..10)
                .map(|i| {
                    let skip_list_clone = skip_list.clone();
                    fasync::Task::spawn(async move {
                        for j in 0..10 {
                            skip_list_clone
                                .insert(Item::new(TestKey(i * 100 + j), i))
                                .expect("insert error");
                        }
                    })
                })
                .chain((0..10).map(|_| {
                    let skip_list_clone = skip_list.clone();
                    fasync::Task::spawn(async move {
                        for _ in 0..300 {
                            let mut iter =
                                skip_list_clone.seek(Bound::Unbounded).await.expect("seek failed");
                            let mut last_item: Option<TestKey> = None;
                            while let Some(item) = iter.get() {
                                if let Some(last) = last_item {
                                    assert!(item.key > &last);
                                }
                                last_item = Some(item.key.clone());
                                iter.advance().await.expect("advance failed");
                            }
                        }
                    })
                })),
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_insert_advance_erase() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[2].clone()).expect("insert error");

        assert_eq!(skip_list.len(), 2);

        {
            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
            iter.insert(items[0].clone());
            iter.advance();
            iter.erase();
        }

        assert_eq!(skip_list.len(), 2);

        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[0].key, &items[0].value));
        iter.advance().await.unwrap();
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
        iter.advance().await.unwrap();
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_seek_excluded() {
        let skip_list = SkipListLayer::new(100);
        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
        skip_list.insert(items[0].clone()).expect("insert error");
        skip_list.insert(items[1].clone()).expect("insert error");
        let iter = skip_list.seek(Bound::Excluded(&items[0].key)).await.expect("seek failed");
        let ItemRef { key, value, .. } = iter.get().expect("missing item");
        assert_eq!((key, value), (&items[1].key, &items[1].value));
    }

    #[fuchsia::test]
    fn test_insert_race() {
        for _ in 0..1000 {
            let skip_list = SkipListLayer::new(100);
            skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");

            let skip_list_clone = skip_list.clone();
            let thread1 = std::thread::spawn(move || {
                skip_list_clone.insert(Item::new(TestKey(1), 1)).expect("insert error")
            });
            let thread2 = std::thread::spawn(move || {
                let iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
                match iter.get() {
                    Some(ItemRef { key: TestKey(2), .. }) => {}
                    result => assert!(false, "{:?}", result),
                }
            });
            thread1.join().unwrap();
            thread2.join().unwrap();
        }
    }

    #[fuchsia::test]
    fn test_replace_or_insert_multi_thread() {
        let skip_list = SkipListLayer::new(100);
        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
        skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");
        skip_list.insert(Item::new(TestKey(3), 3)).expect("insert error");
        skip_list.insert(Item::new(TestKey(4), 4)).expect("insert error");

        // Set up a number of threads that are repeatedly replacing the '3' key.
        let mut threads = Vec::new();
        for i in 0..200 {
            let skip_list_clone = skip_list.clone();
            threads.push(std::thread::spawn(move || {
                skip_list_clone.replace_or_insert(Item::new(TestKey(3), i));
            }));
        }

        // Have one thread repeatedly checking the list.
        let _checker_thread = std::thread::spawn(move || loop {
            let mut iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
            assert_matches!(iter.get(), Some(ItemRef { key: TestKey(2), .. }));
            iter.advance().now_or_never().unwrap().unwrap();
            assert_matches!(iter.get(), Some(ItemRef { key: TestKey(3), .. }));
            iter.advance().now_or_never().unwrap().unwrap();
            assert_matches!(iter.get(), Some(ItemRef { key: TestKey(4), .. }));
        });

        for thread in threads {
            thread.join().unwrap();
        }
    }
}