Skip to main content

fxfs/lsm_tree/
skip_list_layer.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// There are a great many optimisations that could be considered to improve performance and maybe
6// memory usage.
7
8use crate::drop_event::DropEvent;
9use crate::log::*;
10use crate::lsm_tree::merge::{self, MergeFn};
11use crate::lsm_tree::types::{
12    BoxedLayerIterator, Existence, Item, ItemRef, Key, Layer, LayerIterator, LayerIteratorMut,
13    LayerValue, OrdLowerBound, OrdUpperBound,
14};
15use crate::serialized_types::{LATEST_VERSION, Version};
16use anyhow::{Error, bail};
17use async_trait::async_trait;
18use fuchsia_sync::{Mutex, MutexGuard};
19use std::cmp::{Ordering, min};
20use std::collections::BTreeMap;
21use std::ops::Bound;
22use std::ptr::NonNull;
23use std::sync::Arc;
24use std::sync::atomic::{self, AtomicPtr, AtomicU32};
25
// Each skip list node contains a variable sized pointer list. The head pointers also exist in the
// form of a pointer list. Index 0 in the pointer list is the chain with the most elements i.e.
// contains every element in the list.
//
// A null pointer at an index marks the end of the chain at that level.  The pointers are atomic so
// that readers can walk the list while the (single, `write_lock`-holding) writer relinks nodes.
struct PointerList<K, V>(Box<[AtomicPtr<SkipListNode<K, V>>]>);
30
31impl<K, V> PointerList<K, V> {
32    fn new(count: usize) -> PointerList<K, V> {
33        let mut pointers = Vec::new();
34        for _ in 0..count {
35            pointers.push(AtomicPtr::new(std::ptr::null_mut()));
36        }
37        PointerList(pointers.into_boxed_slice())
38    }
39
40    fn len(&self) -> usize {
41        self.0.len()
42    }
43
44    // Extracts the pointer at the given index.
45    fn get(&self, index: usize) -> Option<NonNull<SkipListNode<K, V>>> {
46        NonNull::new(self.0[index].load(atomic::Ordering::SeqCst))
47    }
48
49    // Sets the pointer at the given index.
50    fn set(&self, index: usize, node: Option<NonNull<SkipListNode<K, V>>>) {
51        self.0[index]
52            .store(node.map_or(std::ptr::null_mut(), |n| n.as_ptr()), atomic::Ordering::SeqCst);
53    }
54}
55
56struct SkipListNode<K, V> {
57    item: Item<K, V>,
58    pointers: PointerList<K, V>,
59}
60
61pub struct SkipListLayer<K, V> {
62    // These are the head pointers for the list.
63    pointers: PointerList<K, V>,
64
65    inner: Mutex<Inner<K, V>>,
66
67    // Writes are locked using this lock.
68    write_lock: Mutex<()>,
69
70    // The number of nodes that have been allocated.  This is only used for debugging purposes.
71    allocated: AtomicU32,
72
73    close_event: Mutex<Option<Arc<DropEvent>>>,
74}
75
76// The writer needs to synchronize with the readers and this is done by keeping track of read
77// counts.  We could, in theory, remove the mutex and make the read counts atomic (and thus make
78// reads truly lock free) but it's simpler and easier to reason about with a mutex and what matters
79// most is that we avoid using a futures::lock::Mutex for readers because that can be blocked for
80// relatively long periods of time.
81struct Inner<K, V> {
82    // After a write, if there are nodes that need to be freed, and existing readers, the epoch
83    // changes and new readers will be in a new epoch.  When all the old readers finish, the nodes
84    // can be freed.
85    epoch: u64,
86
87    // The number of readers on the current epoch.
88    current_count: u16,
89
90    // A list of nodes to be freed once the read counts have reached zero.
91    erase_lists: BTreeMap<u64, EpochEraseList<K, V>>,
92
93    // The number of items in the skip-list.
94    item_count: usize,
95}
96
97// After a mutation that involves erasing nodes, we must keep the nodes alive until there are no
98// more readers in any of the epochs prior to the mutation.  To deal with this, we track the number
99// of outstanding readers in each epoch so that when the count reaches zero, we know it is safe to
100// free the nodes.
101struct EpochEraseList<K, V> {
102    // The number of readers still associated with this epoch.  When this reaches zero, the list can
103    // be freed once all previous epochs have been freed.
104    count: u16,
105    // We represent the list by storing the head and tail of the list which each node chained to the
106    // next.
107    start: NonNull<SkipListNode<K, V>>,
108    end: Option<NonNull<SkipListNode<K, V>>>,
109}
110
111// SAFETY: Required because of `erase_lists` which holds pointers.
112unsafe impl<K, V> Send for Inner<K, V> {}
113
114impl<K, V> Inner<K, V> {
115    fn new() -> Self {
116        Inner { epoch: 0, current_count: 0, erase_lists: BTreeMap::new(), item_count: 0 }
117    }
118    fn free_erase_list(
119        &mut self,
120        owner: &SkipListLayer<K, V>,
121        start: NonNull<SkipListNode<K, V>>,
122        end: Option<NonNull<SkipListNode<K, V>>>,
123    ) {
124        let mut node = start;
125        loop {
126            // SAFETY: This node has no more references.
127            let next = unsafe { owner.free_node(node) };
128            if next == end {
129                break;
130            }
131            node = next.unwrap();
132        }
133    }
134}
135
136impl<K, V> SkipListLayer<K, V> {
137    pub fn new(max_item_count: usize) -> Arc<SkipListLayer<K, V>> {
138        Arc::new(SkipListLayer {
139            pointers: PointerList::new((max_item_count as f32).log2() as usize + 1),
140            inner: Mutex::new(Inner::new()),
141            write_lock: Mutex::new(()),
142            allocated: AtomicU32::new(0),
143            close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
144        })
145    }
146
147    pub fn len(&self) -> usize {
148        self.inner.lock().item_count
149    }
150
151    fn alloc_node(&self, item: Item<K, V>, pointer_count: usize) -> Box<SkipListNode<K, V>> {
152        self.allocated.fetch_add(1, atomic::Ordering::Relaxed);
153        Box::new(SkipListNode { item, pointers: PointerList::new(pointer_count) })
154    }
155
156    // Frees and then returns the next node in the chain.
157    //
158    // # Safety
159    //
160    // The node must have no other references.
161    unsafe fn free_node(
162        &self,
163        node: NonNull<SkipListNode<K, V>>,
164    ) -> Option<NonNull<SkipListNode<K, V>>> {
165        self.allocated.fetch_sub(1, atomic::Ordering::Relaxed);
166        unsafe { Box::from_raw(node.as_ptr()).pointers.get(0) }
167    }
168}
169
170impl<K: Eq + Key + OrdLowerBound, V: LayerValue> SkipListLayer<K, V> {
171    // Erases the given item. Does nothing if the item doesn't exist.
172    pub fn erase(&self, key: &K)
173    where
174        K: std::cmp::Eq,
175    {
176        let mut iter = SkipListLayerIterMut::new(self, Bound::Included(key));
177        if let Some(ItemRef { key: k, .. }) = iter.get() {
178            if k == key {
179                iter.erase();
180            } else {
181                warn!("Attempt to erase key not present!");
182            }
183        }
184        iter.commit();
185    }
186
187    /// Inserts the given item.
188    pub fn insert(&self, item: Item<K, V>) -> Result<(), Error> {
189        let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
190        if let Some(found_item) = iter.get() {
191            if found_item.key == &item.key {
192                bail!("Attempted to insert an existing key");
193            }
194        }
195        iter.insert(item);
196        Ok(())
197    }
198
199    /// Replaces or inserts the given item.
200    pub fn replace_or_insert(&self, item: Item<K, V>) {
201        let mut iter = SkipListLayerIterMut::new(self, Bound::Included(&item.key));
202        if let Some(found_item) = iter.get() {
203            if found_item.key == &item.key {
204                iter.erase();
205            }
206        }
207        iter.insert(item);
208    }
209
210    /// Merges the item into the layer.
211    pub fn merge_into(&self, item: Item<K, V>, lower_bound: &K, merge_fn: MergeFn<K, V>) {
212        merge::merge_into(
213            Box::new(SkipListLayerIterMut::new(self, Bound::Included(lower_bound))),
214            item,
215            merge_fn,
216        )
217        .unwrap();
218    }
219}
220
221// We have to manually manage memory.
222impl<K, V> Drop for SkipListLayer<K, V> {
223    fn drop(&mut self) {
224        let mut next = self.pointers.get(0);
225        while let Some(node) = next {
226            // SAFETY: The node has no more references.
227            next = unsafe { self.free_node(node) };
228        }
229        assert_eq!(self.allocated.load(atomic::Ordering::Relaxed), 0);
230    }
231}
232
233#[async_trait]
234impl<K: Key, V: LayerValue> Layer<K, V> for SkipListLayer<K, V> {
235    async fn seek<'a>(
236        &'a self,
237        bound: std::ops::Bound<&K>,
238    ) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
239        Ok(Box::new(SkipListLayerIter::new(self, bound)))
240    }
241
242    fn lock(&self) -> Option<Arc<DropEvent>> {
243        self.close_event.lock().clone()
244    }
245
246    fn len(&self) -> usize {
247        self.inner.lock().item_count
248    }
249
250    async fn close(&self) {
251        let listener = self.close_event.lock().take().expect("close already called").listen();
252        listener.await;
253    }
254
255    fn get_version(&self) -> Version {
256        // The SkipListLayer is stored in RAM and written to disk as a SimplePersistentLayer
257        // Hence, the SkipListLayer is always at the latest version
258        return LATEST_VERSION;
259    }
260
261    fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
262        node.record_bool("persistent", false);
263        node.record_uint("num_items", self.inner.lock().item_count as u64);
264    }
265
266    async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
267        let iter = SkipListLayerIter::new(self, Bound::Included(key));
268        Ok(iter.get().map_or(Existence::Missing, |i| {
269            if i.key.cmp_upper_bound(key).is_eq() { Existence::Exists } else { Existence::Missing }
270        }))
271    }
272}
273
274// -- SkipListLayerIter --
275
276struct SkipListLayerIter<'a, K, V> {
277    skip_list: &'a SkipListLayer<K, V>,
278
279    // The epoch for this reader.
280    epoch: u64,
281
282    // The current node.
283    node: Option<NonNull<SkipListNode<K, V>>>,
284}
285
286// SAFETY: We need this for `node` which is safe to pass across threads.
287unsafe impl<K, V> Send for SkipListLayerIter<'_, K, V> {}
288unsafe impl<K, V> Sync for SkipListLayerIter<'_, K, V> {}
289
290impl<'a, K: OrdUpperBound, V> SkipListLayerIter<'a, K, V> {
291    fn new(skip_list: &'a SkipListLayer<K, V>, bound: Bound<&K>) -> Self {
292        let epoch = {
293            let mut inner = skip_list.inner.lock();
294            inner.current_count += 1;
295            inner.epoch
296        };
297        let (included, key) = match bound {
298            Bound::Unbounded => {
299                return SkipListLayerIter { skip_list, epoch, node: skip_list.pointers.get(0) };
300            }
301            Bound::Included(key) => (true, key),
302            Bound::Excluded(key) => (false, key),
303        };
304        let mut last_pointers = &skip_list.pointers;
305
306        // Some care needs to be taken here because new elements can be inserted atomically, so it
307        // is important that the node we return in the iterator is the same node that we performed
308        // the last comparison on.
309        let mut node = None;
310        for index in (0..skip_list.pointers.len()).rev() {
311            // Keep iterating along this level until we encounter a key that's >= our search key.
312            loop {
313                node = last_pointers.get(index);
314                if let Some(node) = node {
315                    // SAFETY: `node` should be valid; we took a reference to the epoch above.
316                    let node = unsafe { node.as_ref() };
317                    match &node.item.key.cmp_upper_bound(key) {
318                        Ordering::Equal if included => break,
319                        Ordering::Greater => break,
320                        _ => {}
321                    }
322                    last_pointers = &node.pointers;
323                } else {
324                    break;
325                }
326            }
327        }
328        SkipListLayerIter { skip_list, epoch, node }
329    }
330}
331
332impl<K, V> Drop for SkipListLayerIter<'_, K, V> {
333    fn drop(&mut self) {
334        let mut inner = self.skip_list.inner.lock();
335        if self.epoch == inner.epoch {
336            inner.current_count -= 1;
337        } else {
338            if let Some(erase_list) = inner.erase_lists.get_mut(&self.epoch) {
339                erase_list.count -= 1;
340                if erase_list.count == 0 {
341                    while let Some(entry) = inner.erase_lists.first_entry() {
342                        if entry.get().count == 0 {
343                            let EpochEraseList { start, end, .. } = entry.remove_entry().1;
344                            inner.free_erase_list(self.skip_list, start, end);
345                        } else {
346                            break;
347                        }
348                    }
349                }
350            }
351        }
352    }
353}
354
355#[async_trait]
356impl<K: Key, V: LayerValue> LayerIterator<K, V> for SkipListLayerIter<'_, K, V> {
357    async fn advance(&mut self) -> Result<(), Error> {
358        match self.node {
359            None => {}
360            Some(node) => {
361                self.node = {
362                    // SAFETY: `node` should be valid; we took a reference to the epoch in `new`.
363                    unsafe { node.as_ref() }.pointers.get(0)
364                }
365            }
366        }
367        Ok(())
368    }
369
370    fn get(&self) -> Option<ItemRef<'_, K, V>> {
371        // SAFETY: `node` should be valid; we took a reference to the epoch in `new`.
372        self.node.map(|node| unsafe { node.as_ref() }.item.as_item_ref())
373    }
374}
375
376type PointerListRefArray<'a, K, V> = Box<[&'a PointerList<K, V>]>;
377
378// -- SkipListLayerIterMut --
379
380// This works by building an insertion chain.  When that chain is committed, it is done atomically
381// so that readers are not interrupted.  When the existing readers are finished, it is then safe to
382// release memory for any nodes that might have been erased.  In the case that we are only erasing
383// elements, there will be no insertion chain, in which case we just atomically remove the elements
384// from the chain.
385pub struct SkipListLayerIterMut<'a, K: Key, V: LayerValue> {
386    skip_list: &'a SkipListLayer<K, V>,
387
388    // Since this is a mutable iterator, we need to keep pointers to all the nodes that precede the
389    // current position at every level, so that we can update them when inserting or erasing
390    // elements.
391    prev_pointers: PointerListRefArray<'a, K, V>,
392
393    // When we first insert or erase an element, we take a copy of prev_pointers so that
394    // we know which pointers need to be updated when we commit.
395    insertion_point: Option<PointerListRefArray<'a, K, V>>,
396
397    // These are the nodes that we should point to when we commit.
398    insertion_nodes: PointerList<K, V>,
399
400    // Only one write can proceed at a time.  We only need a place to keep the mutex guard, which is
401    // why Rust thinks this is unused.
402    #[allow(dead_code)]
403    write_guard: MutexGuard<'a, ()>,
404
405    // The change in item count as a result of this mutation.
406    item_delta: isize,
407}
408
409impl<'a, K: Key, V: LayerValue> SkipListLayerIterMut<'a, K, V> {
410    pub fn new(skip_list: &'a SkipListLayer<K, V>, bound: std::ops::Bound<&K>) -> Self {
411        let write_guard = skip_list.write_lock.lock();
412        let len = skip_list.pointers.len();
413
414        // Start by setting all the previous pointers to the head.
415        //
416        // To understand how the previous pointers work, imagine the list looks something like the
417        // following:
418        //
419        // 2  |--->|
420        // 1  |--->|--|------->|
421        // 0  |--->|--|--|--|->|
422        //  HEAD   A  B  C  D  E  F
423        //
424        // Now imagine that the iterator is pointing at element D. In that case, the previous
425        // pointers will point at C for index 0, B for index 1 and A for index 2. With that
426        // information, it will be possible to insert an element immediately prior to D and
427        // correctly update as many pointers as required (remember a new element will be given a
428        // random number of levels).
429        let mut prev_pointers = vec![&skip_list.pointers; len].into_boxed_slice();
430        match bound {
431            Bound::Unbounded => {}
432            Bound::Included(key) => {
433                let pointers = &mut prev_pointers;
434                for index in (0..len).rev() {
435                    while let Some(node) = pointers[index].get(index) {
436                        // Keep iterating along this level until we encounter a key that's >= our
437                        // search key.
438
439                        // SAFETY: `node` should be valid; a write guard was taken above so nodes
440                        // in the current epoch cannot be erased.
441                        let node = unsafe { node.as_ref() };
442
443                        match node.item.key.cmp_upper_bound(key) {
444                            Ordering::Equal | Ordering::Greater => break,
445                            Ordering::Less => {}
446                        }
447                        pointers[index] = &node.pointers;
448                    }
449                    if index > 0 {
450                        pointers[index - 1] = pointers[index];
451                    }
452                }
453            }
454            Bound::Excluded(_) => panic!("Excluded bounds not supported"),
455        }
456        SkipListLayerIterMut {
457            skip_list,
458            prev_pointers,
459            insertion_point: None,
460            insertion_nodes: PointerList::new(len),
461            write_guard,
462            item_delta: 0,
463        }
464    }
465}
466
467impl<K: Key, V: LayerValue> Drop for SkipListLayerIterMut<'_, K, V> {
468    fn drop(&mut self) {
469        self.commit();
470    }
471}
472
473impl<K: Key, V: LayerValue> LayerIteratorMut<K, V> for SkipListLayerIterMut<'_, K, V> {
474    fn advance(&mut self) {
475        if self.insertion_point.is_some() {
476            if let Some(item) = self.get() {
477                // Copy the current item into the insertion chain.
478                let copy = item.cloned();
479                self.insert(copy);
480                self.erase();
481            }
482        } else {
483            let pointers = &mut self.prev_pointers;
484            if let Some(next) = pointers[0].get(0) {
485                // SAFETY: `node` should be valid; a write guard was taken above so nodes
486                // in the current epoch cannot be erased.
487                let next = unsafe { next.as_ref() };
488                for i in 0..next.pointers.len() {
489                    pointers[i] = &next.pointers;
490                }
491            }
492        }
493    }
494
495    fn get(&self) -> Option<ItemRef<'_, K, V>> {
496        // SAFETY: `node` should be valid; a write guard was taken above so nodes in the current
497        // epoch cannot be erased.
498        self.prev_pointers[0].get(0).map(|node| unsafe { node.as_ref() }.item.as_item_ref())
499    }
500
501    fn insert(&mut self, item: Item<K, V>) {
502        use rand::Rng;
503        let mut rng = rand::rng();
504        let max_pointers = self.skip_list.pointers.len();
505        // This chooses a random number of pointers such that each level has half the number of
506        // pointers of the previous one.
507        let pointer_count = max_pointers
508            - min(
509                (rng.random_range(0..2u32.pow(max_pointers as u32) - 1) as f32).log2() as usize,
510                max_pointers - 1,
511            );
512        let node = Box::leak(self.skip_list.alloc_node(item, pointer_count));
513        if self.insertion_point.is_none() {
514            self.insertion_point = Some(self.prev_pointers.clone());
515        }
516        let node_ptr = node.into();
517        for i in 0..pointer_count {
518            let pointers = self.prev_pointers[i];
519            node.pointers.set(i, pointers.get(i));
520            if self.insertion_nodes.get(i).is_none() {
521                // If there's no insertion node at this level, record this node as the node to
522                // switch in when we commit.
523                self.insertion_nodes.set(i, Some(node_ptr));
524            } else {
525                // There's already an insertion node at this level which means that it's part of the
526                // insertion chain, so we can just update the pointers now.
527                pointers.set(i, Some(node_ptr));
528            }
529            // The iterator should point at the node following the new node i.e. the existing node.
530            self.prev_pointers[i] = &node.pointers;
531        }
532        self.item_delta += 1;
533    }
534
535    fn erase(&mut self) {
536        let pointers = &mut self.prev_pointers;
537        if let Some(next) = pointers[0].get(0) {
538            // SAFETY: `next` should be valid; a write guard was taken above so nodes in the current
539            // epoch cannot be erased.
540            let next = unsafe { next.as_ref() };
541            if self.insertion_point.is_none() {
542                self.insertion_point = Some(pointers.clone());
543            }
544            if self.insertion_nodes.get(0).is_none() {
545                // If there's no insertion node, then just update the iterator position to point to
546                // the next node, and then when we commit, it'll get erased.
547                pointers[0] = &next.pointers;
548            } else {
549                // There's an insertion node, so the current element must be part of the insertion
550                // chain and so we can update the pointers immediately.  There will be another node
551                // that isn't part of the insertion chain that will still point at this node, but it
552                // will disappear when we commit.
553                pointers[0].set(0, next.pointers.get(0));
554            }
555            // Fix up all the pointers except the bottom one. Readers will still find this node,
556            // just not as efficiently.
557            for i in 1..next.pointers.len() {
558                pointers[i].set(i, next.pointers.get(i));
559            }
560        }
561        self.item_delta -= 1;
562    }
563
564    // Commits the changes.  Note that this doesn't wait for readers to finish; any barrier that be
565    // required should be handled by the caller.
566    fn commit(&mut self) {
567        // Splice the changes into the list.
568        let prev_pointers = match self.insertion_point.take() {
569            Some(prev_pointers) => prev_pointers,
570            None => return,
571        };
572
573        // Keep track of the first node that we might need to erase later.
574        let maybe_erase = prev_pointers[0].get(0);
575
576        // If there are no insertion nodes, then it means that we're only erasing nodes.
577        if self.insertion_nodes.get(0).is_none() {
578            // Erase all elements between the insertion point and the current element. The
579            // pointers for levels > 0 should already have been done, so it's only level 0 we
580            // need to worry about.
581            prev_pointers[0].set(0, self.prev_pointers[0].get(0));
582        } else {
583            // Switch the pointers over so that the insertion chain is spliced in.  This is safe
584            // so long as the bottom pointer is done first because that guarantees the new nodes
585            // will be found, just maybe not as efficiently.
586            for i in 0..self.insertion_nodes.len() {
587                if let Some(node) = self.insertion_nodes.get(i) {
588                    prev_pointers[i].set(i, Some(node));
589                }
590            }
591        }
592
593        // Switch the epoch so that we can track when existing readers have finished.
594        let mut inner = self.skip_list.inner.lock();
595        inner.item_count = inner.item_count.checked_add_signed(self.item_delta).unwrap();
596        if let Some(start) = maybe_erase {
597            let end = self.prev_pointers[0].get(0);
598            if maybe_erase != end {
599                if inner.current_count > 0 || !inner.erase_lists.is_empty() {
600                    let count = std::mem::take(&mut inner.current_count);
601                    let epoch = inner.epoch;
602                    inner.erase_lists.insert(epoch, EpochEraseList { count, start, end });
603                    inner.epoch = inner.epoch.wrapping_add(1);
604                } else {
605                    inner.free_erase_list(self.skip_list, start, end);
606                }
607            }
608        }
609    }
610}
611
612#[cfg(test)]
613mod tests {
614    use super::{SkipListLayer, SkipListLayerIterMut};
615    use crate::lsm_tree::merge::ItemOp::{Discard, Replace};
616    use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
617    use crate::lsm_tree::skip_list_layer::SkipListLayerIter;
618    use crate::lsm_tree::types::{
619        DefaultOrdLowerBound, DefaultOrdUpperBound, Existence, FuzzyHash, Item, ItemRef, Layer,
620        LayerIterator, LayerIteratorMut, SortByU64,
621    };
622    use crate::serialized_types::{
623        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
624    };
625    use assert_matches::assert_matches;
626    use fprint::TypeFingerprint;
627    use fuchsia_async as fasync;
628    use futures::future::join_all;
629    use futures::{FutureExt as _, join};
630    use fxfs_macros::{FuzzyHash, SerializeKey};
631    use std::hash::Hash;
632    use std::ops::Bound;
633    use std::time::{Duration, Instant};
634
635    #[derive(
636        Clone,
637        Eq,
638        Debug,
639        Hash,
640        FuzzyHash,
641        PartialEq,
642        PartialOrd,
643        Ord,
644        serde::Serialize,
645        serde::Deserialize,
646        TypeFingerprint,
647        Versioned,
648        SerializeKey,
649    )]
650    struct TestKey(u64);
651
652    versioned_type! { 1.. => TestKey }
653
654    impl SortByU64 for TestKey {
655        fn get_leading_u64(&self) -> u64 {
656            self.0
657        }
658    }
659
660    impl DefaultOrdLowerBound for TestKey {}
661    impl DefaultOrdUpperBound for TestKey {}
662
663    #[fuchsia::test]
664    async fn test_key_exists() {
665        let skip_list = SkipListLayer::new(100);
666        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
667        skip_list.insert(Item::new(TestKey(3), 3)).expect("insert error");
668
669        assert_eq!(
670            skip_list.key_exists(&TestKey(0)).await.expect("key_exists failed"),
671            Existence::Missing
672        );
673        assert_eq!(
674            skip_list.key_exists(&TestKey(1)).await.expect("key_exists failed"),
675            Existence::Exists
676        );
677        assert_eq!(
678            skip_list.key_exists(&TestKey(2)).await.expect("key_exists failed"),
679            Existence::Missing
680        );
681        assert_eq!(
682            skip_list.key_exists(&TestKey(3)).await.expect("key_exists failed"),
683            Existence::Exists
684        );
685        assert_eq!(
686            skip_list.key_exists(&TestKey(4)).await.expect("key_exists failed"),
687            Existence::Missing
688        );
689    }
690
691    #[fuchsia::test]
692    async fn test_iteration() {
693        // Insert two items and make sure we can iterate back in the correct order.
694        let skip_list = SkipListLayer::new(100);
695        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
696        skip_list.insert(items[1].clone()).expect("insert error");
697        skip_list.insert(items[0].clone()).expect("insert error");
698        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
699        let ItemRef { key, value, .. } = iter.get().expect("missing item");
700        assert_eq!((key, value), (&items[0].key, &items[0].value));
701        iter.advance().await.unwrap();
702        let ItemRef { key, value, .. } = iter.get().expect("missing item");
703        assert_eq!((key, value), (&items[1].key, &items[1].value));
704        iter.advance().await.unwrap();
705        assert!(iter.get().is_none());
706    }
707
708    #[fuchsia::test]
709    async fn test_seek_exact() {
710        // Seek for an exact match.
711        let skip_list = SkipListLayer::new(100);
712        for i in (0..100).rev() {
713            skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
714        }
715        let mut iter = skip_list.seek(Bound::Included(&TestKey(57))).await.unwrap();
716        let ItemRef { key, value, .. } = iter.get().expect("missing item");
717        assert_eq!((key, value), (&TestKey(57), &57));
718
719        // And check the next item is correct.
720        iter.advance().await.unwrap();
721        let ItemRef { key, value, .. } = iter.get().expect("missing item");
722        assert_eq!((key, value), (&TestKey(58), &58));
723    }
724
725    #[fuchsia::test]
726    async fn test_seek_lower_bound() {
727        // Seek for a non-exact match.
728        let skip_list = SkipListLayer::new(100);
729        for i in (0..100).rev() {
730            skip_list.insert(Item::new(TestKey(i * 3), i * 3)).expect("insert error");
731        }
732        let mut expected_index = 57 * 3;
733        let mut iter = skip_list.seek(Bound::Included(&TestKey(expected_index - 1))).await.unwrap();
734        let ItemRef { key, value, .. } = iter.get().expect("missing item");
735        assert_eq!((key, value), (&TestKey(expected_index), &expected_index));
736
737        // And check the next item is correct.
738        expected_index += 3;
739        iter.advance().await.unwrap();
740        let ItemRef { key, value, .. } = iter.get().expect("missing item");
741        assert_eq!((key, value), (&TestKey(expected_index), &expected_index));
742    }
743
744    #[fuchsia::test]
745    async fn test_replace_or_insert_replaces() {
746        let skip_list = SkipListLayer::new(100);
747        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
748        skip_list.insert(items[1].clone()).expect("insert error");
749        skip_list.insert(items[0].clone()).expect("insert error");
750        let replacement_value = 3;
751        skip_list.replace_or_insert(Item::new(items[1].key.clone(), replacement_value));
752
753        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
754        let ItemRef { key, value, .. } = iter.get().expect("missing item");
755        assert_eq!((key, value), (&items[0].key, &items[0].value));
756        iter.advance().await.unwrap();
757        let ItemRef { key, value, .. } = iter.get().expect("missing item");
758        assert_eq!((key, value), (&items[1].key, &replacement_value));
759        iter.advance().await.unwrap();
760        assert!(iter.get().is_none());
761    }
762
763    #[fuchsia::test]
764    async fn test_replace_or_insert_inserts() {
765        let skip_list = SkipListLayer::new(100);
766        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
767        skip_list.insert(items[2].clone()).expect("insert error");
768        skip_list.insert(items[0].clone()).expect("insert error");
769        skip_list.replace_or_insert(items[1].clone());
770
771        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
772        let ItemRef { key, value, .. } = iter.get().expect("missing item");
773        assert_eq!((key, value), (&items[0].key, &items[0].value));
774        iter.advance().await.unwrap();
775        let ItemRef { key, value, .. } = iter.get().expect("missing item");
776        assert_eq!((key, value), (&items[1].key, &items[1].value));
777        iter.advance().await.unwrap();
778        let ItemRef { key, value, .. } = iter.get().expect("missing item");
779        assert_eq!((key, value), (&items[2].key, &items[2].value));
780        iter.advance().await.unwrap();
781        assert!(iter.get().is_none());
782    }
783
784    #[fuchsia::test]
785    async fn test_erase() {
786        let skip_list = SkipListLayer::new(100);
787        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
788        skip_list.insert(items[1].clone()).expect("insert error");
789        skip_list.insert(items[0].clone()).expect("insert error");
790
791        assert_eq!(skip_list.len(), 2);
792
793        skip_list.erase(&items[1].key);
794
795        assert_eq!(skip_list.len(), 1);
796
797        {
798            let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
799            let ItemRef { key, value, .. } = iter.get().expect("missing item");
800            assert_eq!((key, value), (&items[0].key, &items[0].value));
801            iter.advance().await.unwrap();
802            assert!(iter.get().is_none());
803        }
804
805        skip_list.erase(&items[0].key);
806
807        assert_eq!(skip_list.len(), 0);
808
809        {
810            let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
811            assert!(iter.get().is_none());
812        }
813    }
814
    // This test ends up being flaky on CQ (it measures wall-clock time). It is left here as it
    // might be useful in case significant changes are made.
    #[fuchsia::test]
    #[ignore]
    async fn test_seek_is_log_n_complexity() {
        // Keep doubling up the number of items until it takes about 500ms to search and then go
        // back and measure something that should, in theory, take about half that time.
        let mut n = 100;
        // Number of times `n` was doubled before a pass of seeks exceeded TARGET_TIME.
        let mut loops = 0;
        const TARGET_TIME: Duration = Duration::from_millis(500);
        // Phase 1: build an n-item list and seek to every key. Double n until one full pass of
        // seeks takes longer than TARGET_TIME. `time` is the duration of that final pass.
        let time = loop {
            let skip_list = SkipListLayer::new(n as usize);
            for i in 0..n {
                skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
            }
            let start = Instant::now();
            for i in 0..n {
                skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
            }
            let elapsed = Instant::now() - start;
            if elapsed > TARGET_TIME {
                break elapsed;
            }
            n *= 2;
            loops += 1;
        };

        // Phase 2: perform the same number of seeks (`seek_count`) against a list shrunk by a
        // factor of 2^(loops / 2).
        let seek_count = n;
        n >>= loops / 2; // This should, in theory, result in 50% seek time.
        let skip_list = SkipListLayer::new(n as usize);
        for i in 0..n {
            skip_list.insert(Item::new(TestKey(i), i)).expect("insert error");
        }
        let start = Instant::now();
        // Keys in n..seek_count were not inserted into this smaller list. Seeks for them still
        // run and are included in the timing.
        for i in 0..seek_count {
            skip_list.seek(Bound::Included(&TestKey(i))).await.unwrap();
        }
        let elapsed = Instant::now() - start;

        eprintln!(
            "{} items: {}ms, {} items: {}ms",
            seek_count,
            time.as_millis(),
            n,
            elapsed.as_millis()
        );

        // Experimental results show that typically we do a bit better than log(n), but here we just
        // check that the time we just measured is above 25% of the time we first measured, the
        // theory suggests it should be around 50%.
        assert!(elapsed * 4 > time);
    }
867
868    #[fuchsia::test]
869    async fn test_large_number_of_items() {
870        let item_count = 1000;
871        let skip_list = SkipListLayer::new(1000);
872        for i in 1..item_count {
873            skip_list.insert(Item::new(TestKey(i), 1)).expect("insert error");
874        }
875        let mut iter = skip_list.seek(Bound::Included(&TestKey(item_count - 10))).await.unwrap();
876        for i in item_count - 10..item_count {
877            assert_eq!(iter.get().expect("missing item").key, &TestKey(i));
878            iter.advance().await.unwrap();
879        }
880        assert!(iter.get().is_none());
881    }
882
883    #[fuchsia::test]
884    async fn test_multiple_readers_allowed() {
885        let skip_list = SkipListLayer::new(100);
886        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
887        skip_list.insert(items[1].clone()).expect("insert error");
888        skip_list.insert(items[0].clone()).expect("insert error");
889
890        // Create the first iterator and check the first item.
891        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
892        let ItemRef { key, value, .. } = iter.get().expect("missing item");
893        assert_eq!((key, value), (&items[0].key, &items[0].value));
894
895        // Create a second iterator and check the first item.
896        let iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
897        let ItemRef { key, value, .. } = iter2.get().expect("missing item");
898        assert_eq!((key, value), (&items[0].key, &items[0].value));
899
900        // Now go back to the first iterator and check the second item.
901        iter.advance().await.unwrap();
902        let ItemRef { key, value, .. } = iter.get().expect("missing item");
903        assert_eq!((key, value), (&items[1].key, &items[1].value));
904    }
905
906    fn merge(
907        left: &'_ MergeLayerIterator<'_, TestKey, i32>,
908        right: &'_ MergeLayerIterator<'_, TestKey, i32>,
909    ) -> MergeResult<TestKey, i32> {
910        MergeResult::Other {
911            emit: None,
912            left: Replace(Item::new((*left.key()).clone(), *left.value() + *right.value()).boxed()),
913            right: Discard,
914        }
915    }
916
917    #[fuchsia::test]
918    async fn test_merge_into() {
919        let skip_list = SkipListLayer::new(100);
920        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
921
922        skip_list.merge_into(Item::new(TestKey(2), 2), &TestKey(1), merge);
923
924        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
925        let ItemRef { key, value, .. } = iter.get().expect("missing item");
926        assert_eq!((key, value), (&TestKey(1), &3));
927        iter.advance().await.unwrap();
928        assert!(iter.get().is_none());
929    }
930
931    #[fuchsia::test]
932    async fn test_two_inserts() {
933        let skip_list = SkipListLayer::new(100);
934        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
935        {
936            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
937            iter.insert(items[0].clone());
938            iter.insert(items[1].clone());
939        }
940
941        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
942        let ItemRef { key, value, .. } = iter.get().expect("missing item");
943        assert_eq!((key, value), (&items[0].key, &items[0].value));
944        iter.advance().await.unwrap();
945        let ItemRef { key, value, .. } = iter.get().expect("missing item");
946        assert_eq!((key, value), (&items[1].key, &items[1].value));
947    }
948
949    #[fuchsia::test]
950    async fn test_erase_after_insert() {
951        let skip_list = SkipListLayer::new(100);
952        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
953        skip_list.insert(items[1].clone()).expect("insert error");
954        {
955            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
956            iter.insert(items[0].clone());
957            iter.erase();
958        }
959
960        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
961        let ItemRef { key, value, .. } = iter.get().expect("missing item");
962        assert_eq!((key, value), (&items[0].key, &items[0].value));
963        iter.advance().await.unwrap();
964        assert!(iter.get().is_none());
965    }
966
967    #[fuchsia::test]
968    async fn test_insert_after_erase() {
969        let skip_list = SkipListLayer::new(100);
970        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
971        skip_list.insert(items[1].clone()).expect("insert error");
972        {
973            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
974            iter.erase();
975            iter.insert(items[0].clone());
976        }
977
978        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
979        let ItemRef { key, value, .. } = iter.get().expect("missing item");
980        assert_eq!((key, value), (&items[0].key, &items[0].value));
981        iter.advance().await.unwrap();
982        assert!(iter.get().is_none());
983    }
984
985    #[fuchsia::test]
986    async fn test_insert_erase_insert() {
987        let skip_list = SkipListLayer::new(100);
988        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
989        skip_list.insert(items[0].clone()).expect("insert error");
990        {
991            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
992            iter.insert(items[1].clone());
993            iter.erase();
994            iter.insert(items[2].clone());
995        }
996
997        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
998        let ItemRef { key, value, .. } = iter.get().expect("missing item");
999        assert_eq!((key, value), (&items[1].key, &items[1].value));
1000        iter.advance().await.unwrap();
1001        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1002        assert_eq!((key, value), (&items[2].key, &items[2].value));
1003    }
1004
1005    #[fuchsia::test]
1006    async fn test_two_erase_erases() {
1007        let skip_list = SkipListLayer::new(100);
1008        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
1009        skip_list.insert(items[0].clone()).expect("insert error");
1010        skip_list.insert(items[1].clone()).expect("insert error");
1011        skip_list.insert(items[2].clone()).expect("insert error");
1012        {
1013            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
1014            iter.erase();
1015            iter.erase();
1016        }
1017
1018        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1019        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1020        assert_eq!((key, value), (&items[2].key, &items[2].value));
1021        iter.advance().await.unwrap();
1022        assert!(iter.get().is_none());
1023    }
1024
1025    #[fuchsia::test]
1026    async fn test_readers_not_blocked_by_writers() {
1027        let skip_list = SkipListLayer::new(100);
1028        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
1029        skip_list.insert(items[1].clone()).expect("insert error");
1030
1031        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1032        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1033        assert_eq!((key, value), (&items[1].key, &items[1].value));
1034
1035        let mut iter2 = skip_list.seek(Bound::Unbounded).await.unwrap();
1036        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1037        assert_eq!((key, value), (&items[1].key, &items[1].value));
1038
1039        join!(async { skip_list.insert(items[0].clone()).expect("insert error") }, async {
1040            loop {
1041                let iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1042                let ItemRef { key, .. } = iter.get().expect("missing item");
1043                if key == &items[0].key {
1044                    break;
1045                }
1046            }
1047            iter.advance().await.unwrap();
1048            assert!(iter.get().is_none());
1049            std::mem::drop(iter);
1050            iter2.advance().await.unwrap();
1051            assert!(iter2.get().is_none());
1052            std::mem::drop(iter2);
1053        });
1054    }
1055
1056    #[fuchsia::test(threads = 20)]
1057    async fn test_many_readers_and_writers() {
1058        let skip_list = SkipListLayer::new(100);
1059        join_all(
1060            (0..10)
1061                .map(|i| {
1062                    let skip_list_clone = skip_list.clone();
1063                    fasync::Task::spawn(async move {
1064                        for j in 0..10 {
1065                            skip_list_clone
1066                                .insert(Item::new(TestKey(i * 100 + j), i))
1067                                .expect("insert error");
1068                        }
1069                    })
1070                })
1071                .chain((0..10).map(|_| {
1072                    let skip_list_clone = skip_list.clone();
1073                    fasync::Task::spawn(async move {
1074                        for _ in 0..300 {
1075                            let mut iter =
1076                                skip_list_clone.seek(Bound::Unbounded).await.expect("seek failed");
1077                            let mut last_item: Option<TestKey> = None;
1078                            while let Some(item) = iter.get() {
1079                                if let Some(last) = last_item {
1080                                    assert!(item.key > &last);
1081                                }
1082                                last_item = Some(item.key.clone());
1083                                iter.advance().await.expect("advance failed");
1084                            }
1085                        }
1086                    })
1087                })),
1088        )
1089        .await;
1090    }
1091
1092    #[fuchsia::test]
1093    async fn test_insert_advance_erase() {
1094        let skip_list = SkipListLayer::new(100);
1095        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2), Item::new(TestKey(3), 3)];
1096        skip_list.insert(items[1].clone()).expect("insert error");
1097        skip_list.insert(items[2].clone()).expect("insert error");
1098
1099        assert_eq!(skip_list.len(), 2);
1100
1101        {
1102            let mut iter = SkipListLayerIterMut::new(&skip_list, std::ops::Bound::Unbounded);
1103            iter.insert(items[0].clone());
1104            iter.advance();
1105            iter.erase();
1106        }
1107
1108        assert_eq!(skip_list.len(), 2);
1109
1110        let mut iter = skip_list.seek(Bound::Unbounded).await.unwrap();
1111        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1112        assert_eq!((key, value), (&items[0].key, &items[0].value));
1113        iter.advance().await.unwrap();
1114        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1115        assert_eq!((key, value), (&items[1].key, &items[1].value));
1116        iter.advance().await.unwrap();
1117        assert!(iter.get().is_none());
1118    }
1119
1120    #[fuchsia::test]
1121    async fn test_seek_excluded() {
1122        let skip_list = SkipListLayer::new(100);
1123        let items = [Item::new(TestKey(1), 1), Item::new(TestKey(2), 2)];
1124        skip_list.insert(items[0].clone()).expect("insert error");
1125        skip_list.insert(items[1].clone()).expect("insert error");
1126        let iter = skip_list.seek(Bound::Excluded(&items[0].key)).await.expect("seek failed");
1127        let ItemRef { key, value, .. } = iter.get().expect("missing item");
1128        assert_eq!((key, value), (&items[1].key, &items[1].value));
1129    }
1130
1131    #[fuchsia::test]
1132    fn test_insert_race() {
1133        for _ in 0..1000 {
1134            let skip_list = SkipListLayer::new(100);
1135            skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");
1136
1137            let skip_list_clone = skip_list.clone();
1138            let thread1 = std::thread::spawn(move || {
1139                skip_list_clone.insert(Item::new(TestKey(1), 1)).expect("insert error")
1140            });
1141            let thread2 = std::thread::spawn(move || {
1142                let iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
1143                match iter.get() {
1144                    Some(ItemRef { key: TestKey(2), .. }) => {}
1145                    result => assert!(false, "{:?}", result),
1146                }
1147            });
1148            thread1.join().unwrap();
1149            thread2.join().unwrap();
1150        }
1151    }
1152
1153    #[fuchsia::test]
1154    fn test_replace_or_insert_multi_thread() {
1155        let skip_list = SkipListLayer::new(100);
1156        skip_list.insert(Item::new(TestKey(1), 1)).expect("insert error");
1157        skip_list.insert(Item::new(TestKey(2), 2)).expect("insert error");
1158        skip_list.insert(Item::new(TestKey(3), 3)).expect("insert error");
1159        skip_list.insert(Item::new(TestKey(4), 4)).expect("insert error");
1160
1161        // Set up a number of threads that are repeatedly replacing the '3' key.
1162        let mut threads = Vec::new();
1163        for i in 0..200 {
1164            let skip_list_clone = skip_list.clone();
1165            threads.push(std::thread::spawn(move || {
1166                skip_list_clone.replace_or_insert(Item::new(TestKey(3), i));
1167            }));
1168        }
1169
1170        // Have one thread repeatedly checking the list.
1171        let _checker_thread = std::thread::spawn(move || {
1172            loop {
1173                let mut iter = SkipListLayerIter::new(&skip_list, Bound::Included(&TestKey(2)));
1174                assert_matches!(iter.get(), Some(ItemRef { key: TestKey(2), .. }));
1175                iter.advance().now_or_never().unwrap().unwrap();
1176                assert_matches!(iter.get(), Some(ItemRef { key: TestKey(3), .. }));
1177                iter.advance().now_or_never().unwrap().unwrap();
1178                assert_matches!(iter.get(), Some(ItemRef { key: TestKey(4), .. }));
1179            }
1180        });
1181
1182        for thread in threads {
1183            thread.join().unwrap();
1184        }
1185    }
1186}