Skip to main content

fxfs/lsm_tree/
types.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::drop_event::DropEvent;
6use crate::object_handle::ReadObjectHandle;
7use crate::serialized_types::{Version, Versioned, VersionedLatest};
8use anyhow::Error;
9use async_trait::async_trait;
10use fprint::TypeFingerprint;
11use serde::{Deserialize, Serialize};
12use std::fmt::Debug;
13use std::future::Future;
14use std::hash::Hash;
15use std::marker::PhantomData;
16use std::pin::Pin;
17use std::sync::Arc;
18
19pub use fxfs_macros::impl_fuzzy_hash;
20
/// Forces keys to be sorted first by a u64, so that they can be located approximately based on
/// only that integer without the whole key.
pub trait SortByU64 {
    /// Returns the u64 that is used as the first value when deciding on sort order of the key.
    fn get_leading_u64(&self) -> u64;
}
27
/// An extension to `std::hash::Hash` to support values which should be partitioned and hashed into
/// buckets, where nearby keys will have the same hash value.  This is used for existence filtering
/// in layer files (see `Layer::maybe_contains_key`).
///
/// For point-based keys, this can be the same as `std::hash::Hash`, but for range-based keys, the
/// hash can collapse nearby ranges into the same hash value.  Since a range-based key may span
/// several buckets, `FuzzyHash::fuzzy_hash` must be called to split the key up into each of the
/// possible values that it overlaps with.
pub trait FuzzyHash: Hash + Sized {
    /// Returns an iterator over the hash values of every partition that this key overlaps.
    ///
    /// To support range-based keys, multiple hash values may need to be checked for a given key.
    /// For example, an extent [0..1024) might be split into extents [0..512), [512..1024), each of
    /// which will have a unique return value for `Self::hash`.  For point-based keys, a single hash
    /// suffices, so the returned iterator only needs to yield one value.
    /// Note that in general only a small number of partitions (e.g. 2) should be checked at once.
    /// Debug assertions will fire if too large of a range is checked.
    fn fuzzy_hash(&self) -> impl Iterator<Item = u64>;

    /// Returns whether the type is a range-based key. Used to prevent use of range-based keys as
    /// a point query (see [`crate::lsm_tree::merge::Query::Point`]).
    ///
    /// The default implementation returns `false`, i.e. it treats the key as point-based.
    fn is_range_key(&self) -> bool {
        false
    }
}
51
// `impl_fuzzy_hash!` is re-exported from `fxfs_macros`.  It is invoked here for the built-in
// types below; presumably it generates a point-based `FuzzyHash` implementation for each of
// them -- confirm against the macro definition.
impl_fuzzy_hash!(u8);
impl_fuzzy_hash!(u32);
impl_fuzzy_hash!(u64);
impl_fuzzy_hash!(String);
impl_fuzzy_hash!(Vec<u8>);
57
/// Keys and values need to implement the following traits.  For merging, they need to implement
/// MergeableKey.  TODO: Use trait_alias when available.
///
/// `Key` declares no methods of its own; it only bundles the bounds listed below so that they can
/// be named as a single trait.
pub trait Key:
    Clone
    + Debug
    + Hash
    + FuzzyHash
    + OrdUpperBound
    + Send
    + SortByU64
    + Sync
    + Versioned
    + VersionedLatest
    + std::marker::Unpin
    + 'static
{
}
75
/// Extension of [`Key`] for keys that can overlap one another (e.g. range-based keys).
pub trait RangeKey: Key {
    /// Returns true if `self` and `other` overlap.
    fn overlaps(&self, other: &Self) -> bool;
}
80
// Blanket implementation: every type that satisfies all of the bounds required by `Key` is
// automatically a `Key`, so the trait never has to be implemented by hand.
impl<K> Key for K where
    K: Clone
        + Debug
        + Hash
        + FuzzyHash
        + OrdUpperBound
        + Send
        + SortByU64
        + Sync
        + Versioned
        + VersionedLatest
        + std::marker::Unpin
        + 'static
{
}
96
/// A [`Key`] that can additionally take part in merging: it must also be `Eq`, implement
/// [`LayerKey`] and provide a lower-bound ordering via [`OrdLowerBound`].  Automatically
/// implemented for every type that satisfies those bounds.
pub trait MergeableKey: Key + Eq + LayerKey + OrdLowerBound {}
impl<K> MergeableKey for K where K: Key + Eq + LayerKey + OrdLowerBound {}
99
/// Trait required for supporting Layer functionality.
///
/// This is a marker trait with no methods; it only bundles the bounds listed below.
pub trait LayerValue:
    Clone + Send + Sync + Versioned + VersionedLatest + Debug + std::marker::Unpin + 'static
{
}
// Blanket implementation: any type that satisfies the bounds is a `LayerValue`.
impl<V> LayerValue for V where
    V: Clone + Send + Sync + Versioned + VersionedLatest + Debug + std::marker::Unpin + 'static
{
}
109
/// Superset of `LayerValue` that additionally supports tree searching.  It requires comparison
/// (`PartialEq`) and a `DELETED_MARKER`, the empty value used to indicate deletion in the
/// `LSMTree`.
pub trait Value: PartialEq + LayerValue {
    /// Value used to represent that the entry is actually empty, and should be ignored.
    const DELETED_MARKER: Self;
}
116
/// ItemRef is a struct that contains references to a key and a value.  This is useful because in
/// many cases keys and values are stored separately, so `&Item` is not possible.
#[derive(Debug, PartialEq, Eq, Serialize)]
pub struct ItemRef<'a, K, V> {
    /// Reference to the key.
    pub key: &'a K,
    /// Reference to the value.
    pub value: &'a V,
}
124
125impl<K: Clone, V: Clone> ItemRef<'_, K, V> {
126    pub fn cloned(&self) -> Item<K, V> {
127        Item { key: self.key.clone(), value: self.value.clone() }
128    }
129
130    pub fn boxed(&self) -> BoxedItem<K, V> {
131        Box::new(self.cloned())
132    }
133}
134
// `Clone` and `Copy` are implemented by hand rather than derived: a derive would add `K: Clone` /
// `K: Copy` (and the same for `V`) bounds, whereas an `ItemRef` only holds references and can
// therefore always be copied regardless of `K` and `V`.
impl<'a, K, V> Clone for ItemRef<'a, K, V> {
    fn clone(&self) -> Self {
        // `ItemRef` is `Copy`, so cloning is a plain bitwise copy.
        *self
    }
}
impl<'a, K, V> Copy for ItemRef<'a, K, V> {}
141
/// Item is a struct that combines a key and a value, both owned.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct Item<K, V> {
    /// The key of the item.
    pub key: K,
    /// The value associated with `key`.
    pub value: V,
}
149
/// Like [`Item`], but additionally carries a `sequence` number alongside the key and value.
// NOTE(review): presumably retained so that older serialized layouts can still be read --
// confirm against the versioning code.
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct LegacyItem<K, V> {
    /// The key of the item.
    pub key: K,
    /// The value associated with `key`.
    pub value: V,
    /// Sequence number stored with the item.
    pub sequence: u64,
}
157
158impl<K: TypeFingerprint, V: TypeFingerprint> TypeFingerprint for LegacyItem<K, V> {
159    fn fingerprint() -> String {
160        "struct {key:".to_owned()
161            + &K::fingerprint()
162            + ",value:"
163            + &V::fingerprint()
164            + ",sequence:u64}"
165    }
166}
167
/// An [`Item`] stored in a `Box`.
pub type BoxedItem<K, V> = Box<Item<K, V>>;
169
170// Nb: type-fprint doesn't support generics yet.
171impl<K: TypeFingerprint, V: TypeFingerprint> TypeFingerprint for Item<K, V> {
172    fn fingerprint() -> String {
173        "struct {key:".to_owned() + &K::fingerprint() + ",value:" + &V::fingerprint() + "}"
174    }
175}
176
177impl<K, V> Item<K, V> {
178    pub fn new(key: K, value: V) -> Item<K, V> {
179        Item { key, value }
180    }
181
182    pub fn as_item_ref(&self) -> ItemRef<'_, K, V> {
183        self.into()
184    }
185
186    pub fn boxed(self) -> BoxedItem<K, V> {
187        Box::new(self)
188    }
189}
190
191impl<'a, K, V> From<&'a Item<K, V>> for ItemRef<'a, K, V> {
192    fn from(item: &'a Item<K, V>) -> ItemRef<'a, K, V> {
193        ItemRef { key: &item.key, value: &item.value }
194    }
195}
196
/// Ordering used when searching.
///
/// The find functions will return items with keys that are greater-than or equal to the search
/// key, so for keys that are like extents, the keys should sort (via OrdUpperBound) using the end
/// of their ranges, and you should set the search key accordingly.
///
/// For example, let's say the tree holds extents 100..200, 200..250 and you want to perform a read
/// for range 150..250, you should search for 0..151 which will first return the extent 100..200
/// (and then the iterator can be advanced to 200..250 after). When merging, keys can overlap, so
/// consider the case where we want to merge an extent with range 100..300 with an existing extent
/// of 200..250. In that case, we want to treat the extent with range 100..300 as lower than the key
/// 200..250 because we'll likely want to split the extents (e.g. perhaps we want 100..200,
/// 200..250, 250..300), so for merging, we need to use a different comparison function and we deal
/// with that using the OrdLowerBound trait.
///
/// If your keys don't have overlapping ranges that need to be merged, then these can be the same as
/// std::cmp::Ord (use the DefaultOrdUpperBound and DefaultOrdLowerBound traits).
pub trait OrdUpperBound {
    /// Compares `self` with `other` using the upper-bound ordering described above.
    fn cmp_upper_bound(&self, other: &Self) -> std::cmp::Ordering;
}

/// Opt-in marker for types whose upper-bound ordering is simply their `Ord` ordering.
/// Implementing it (with an empty body) provides [`OrdUpperBound`] via the blanket impl below.
pub trait DefaultOrdUpperBound: OrdUpperBound + Ord {}

impl<T: DefaultOrdUpperBound> OrdUpperBound for T {
    fn cmp_upper_bound(&self, other: &Self) -> std::cmp::Ordering {
        // Delegate straight to the type's total order.
        Ord::cmp(self, other)
    }
}
225
/// Ordering used when merging, where keys may overlap.  For range-like keys this should compare
/// using the start of the range (whereas `OrdUpperBound` uses the end).
pub trait OrdLowerBound {
    /// Compares `self` with `other` using the lower-bound ordering.
    fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering;
}

/// Opt-in marker for types whose lower-bound ordering is simply their `Ord` ordering.
/// Implementing it (with an empty body) provides [`OrdLowerBound`] via the blanket impl below.
pub trait DefaultOrdLowerBound: OrdLowerBound + Ord {}

impl<T: DefaultOrdLowerBound> OrdLowerBound for T {
    fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
        // Delegate straight to the type's total order.
        Ord::cmp(self, other)
    }
}
238
/// Result returned by `merge_type()` to determine how to properly merge values within a layerset.
///
/// Derives `Debug` and `Eq` (in addition to `Clone` and `PartialEq`) so that values can be logged
/// and compared with `assert_eq!`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MergeType {
    /// Always includes every layer in the merger, when seeking or advancing. Always correct, but
    /// always as slow as possible.
    FullMerge,

    /// Stops seeking older layers when an exact key match is found in a newer one. Useful for keys
    /// that only replace data, or with `next_key()` implementations to decide on continued merging.
    OptimizedMerge,
}
250
/// Determines how to iterate forward from the current key, and how many older layers to include
/// when merging. See the different variants of `MergeType` for more details.
pub trait LayerKey: Clone {
    /// Called to determine how to perform merge behaviours while advancing through a layer set.
    fn merge_type(&self) -> MergeType {
        // Defaults to full merge. The slowest, but most predictable in behaviour.
        MergeType::FullMerge
    }

    /// The next_key() call allows for an optimisation which allows the merger to avoid querying a
    /// layer if it knows it has found the next possible key.  It only makes sense for this to
    /// return Some() when merge_type() returns OptimizedMerge. Consider the following example
    /// showing two layers with range based keys.
    ///
    ///      +----------+------------+
    ///  0   |  0..100  |  100..200  |
    ///      +----------+------------+
    ///  1              |  100..200  |
    ///                 +------------+
    ///
    /// If you search and find the 0..100 key, then only layer 0 will be touched.  If you then want
    /// to advance to the 100..200 record, you can find it in layer 0 but unless you know that it
    /// immediately follows the 0..100 key i.e. that there is no possible key, K, such that
    /// 0..100 < K < 100..200, the merger has to consult all other layers to check.  next_key should
    /// return a key, N, such that if the merger encounters a key that is <= N (using
    /// OrdLowerBound), it can stop touching more layers.  The key N should also be the key to
    /// search for in other layers if the merger needs to do so.  In the example above, it should be
    /// a key that is > 0..100 and 99..100, but <= 100..200 (using OrdUpperBound).  In practice,
    /// what this means is that for range based keys, OrdUpperBound should use the end of the range,
    /// OrdLowerBound should use the start of the range, and next_key should return end..end + 1.
    /// This is purely an optimisation; the default None will be correct but not performant.
    fn next_key(&self) -> Option<Self> {
        None
    }
    /// Returns the search key for this extent; that is, a key which is <= this key under Ord and
    /// OrdLowerBound.  Note that this is only used for Query::LimitedRange queries (where
    /// `Self::partition` returns Some).
    /// For example, if the tree has extents 50..150 and 150..200 and we wish to read 100..200, we'd
    /// search for 0..101 which would set the iterator to 50..150.
    ///
    /// The default implementation panics via `unreachable!()`, so key types that are used with
    /// such queries must override it.
    // NOTE(review): no `partition` method is declared on this trait in this file; confirm that the
    // reference to `Self::partition` above is still current.
    fn search_key(&self) -> Self {
        unreachable!()
    }
}
294
/// Three-state answer to "does this key exist?", as returned by `Layer::key_exists`.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum Existence {
    /// The key definitely exists.
    Exists,
    /// The key might exist.
    MaybeExists,
    /// The key definitely does not exist.
    Missing,
}
304
/// Layer is a trait that all layers need to implement (mutable and immutable).
#[async_trait]
pub trait Layer<K, V>: Send + Sync {
    /// If the layer is persistent, returns the handle to its contents.  Returns None for in-memory
    /// layers.  The default implementation returns None.
    fn handle(&self) -> Option<&dyn ReadObjectHandle> {
        None
    }

    /// Some layer implementations may choose to cache data in-memory.  Calling this function will
    /// request that the layer purges unused cached data.  This is intended to run on a timer.
    /// The default implementation does nothing.
    fn purge_cached_data(&self) {}

    /// Searches for a key. Bound::Excluded is not supported. Bound::Unbounded positions the
    /// iterator on the first item in the layer.
    async fn seek(&self, bound: std::ops::Bound<&K>)
    -> Result<BoxedLayerIterator<'_, K, V>, Error>;

    /// Returns the number of items in the layer file.
    fn len(&self) -> usize;

    /// Returns whether the layer *might* contain records relevant to `key`.  Note that this can
    /// return true even if the layer has no records relevant to `key`, but it will never return
    /// false if there are such records.  (As such, always returning true is a trivially correct
    /// implementation, and it is what the default implementation does.)
    fn maybe_contains_key(&self, _key: &K) -> bool {
        true
    }

    /// This is similar to `maybe_contains_key` except that there *must* be some `key` and some
    /// possible state for which this reports that the key is missing i.e. *always* returning
    /// `Existence::MaybeExists` is *not* a correct implementation.  If an implementation has a low
    /// cost way to determine if the key *might* exist, then it may use that, but if not, it must
    /// use a slower algorithm.  This method was introduced to allow for an efficient way of
    /// determining if a particular key is free to be used.  `maybe_contains_key` cannot be used for
    /// this purpose because implementations might *always* return `true` which would mean it would
    /// be impossible to ever find a key that is free to use.  It might not be appropriate to use
    /// this with range based keys: implementations should use `cmp_upper_bound` and test for
    /// equality, which might not give the desired results for range based keys.
    async fn key_exists(&self, key: &K) -> Result<Existence, Error>;

    /// Locks the layer preventing it from being closed. This will never block i.e. there can be
    /// many locks concurrently.  The lock is purely advisory: seek will still work even if lock has
    /// not been called; it merely causes close to wait until all locks are released.  Returns None
    /// if close has been called for the layer.
    fn lock(&self) -> Option<Arc<DropEvent>>;

    /// Waits for all existing locks to be released and then returns.  Subsequent calls to lock
    /// will return None.
    async fn close(&self);

    /// Returns the version number used by structs in this layer.
    fn get_version(&self) -> Version;

    /// Records inspect data for the layer into `node`.  Called lazily when inspect is queried.
    /// The default implementation records nothing.
    fn record_inspect_data(self: Arc<Self>, _node: &fuchsia_inspect::Node) {}
}
362
/// Something that implements LayerIterator is returned by the seek function.
#[async_trait]
pub trait LayerIterator<K, V>: Send + Sync {
    /// Advances the iterator.
    async fn advance(&mut self) -> Result<(), Error>;

    /// Returns the current item. This will be None if called when the iterator is first created
    /// i.e. before either seek or advance has been called, and None if the iterator has reached the
    /// end of the layer.
    fn get(&self) -> Option<ItemRef<'_, K, V>>;

    /// Creates an iterator that only yields items from the underlying iterator for which
    /// `predicate` returns `true`.
    ///
    /// Consumes `self`.  Returns any error produced while constructing the
    /// [`FilterLayerIterator`].
    async fn filter<P>(self, predicate: P) -> Result<FilterLayerIterator<Self, P, K, V>, Error>
    where
        P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync,
        Self: Sized,
        K: Send + Sync,
        V: Send + Sync,
    {
        FilterLayerIterator::new(self, predicate).await
    }
}
386
/// A type-erased, boxed [`LayerIterator`] that is valid for the lifetime `'iter`.
pub type BoxedLayerIterator<'iter, K, V> = Box<dyn LayerIterator<K, V> + 'iter>;

// Lets a boxed iterator be used wherever a `LayerIterator` is expected; both methods simply
// forward to the iterator inside the box.
impl<'iter, K, V> LayerIterator<K, V> for BoxedLayerIterator<'iter, K, V> {
    // Manual expansion of `async_trait` to avoid double boxing the `Future`.
    fn advance<'a, 'b>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'b>>
    where
        'a: 'b,
        Self: 'b,
    {
        // The inner call already returns a pinned, boxed future, which is returned as-is.
        (**self).advance()
    }
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        (**self).get()
    }
}
402
/// Mutable layers need an iterator that implements this in order to make merge_into work.
pub(super) trait LayerIteratorMut<K, V>: Sync {
    /// Advances the iterator.
    fn advance(&mut self);

    /// Returns the current item. This will be None if called when the iterator is first created
    /// i.e. before either seek or advance has been called, and None if the iterator has reached the
    /// end of the layer.
    fn get(&self) -> Option<ItemRef<'_, K, V>>;

    /// Inserts the item before the item that the iterator is located at.  The insert won't be
    /// visible until the changes are committed (see `commit`).
    fn insert(&mut self, item: Item<K, V>);

    /// Erases the current item and positions the iterator on the next item, if any.  The change
    /// won't be visible until committed (see `commit`).
    fn erase(&mut self);

    /// Commits the changes.  This does not wait for existing readers to finish.
    fn commit(&mut self);
}
424
/// Trait for writing new layers.
///
/// `write` and `flush` return futures that are `Send`, and each resolves to an error if the
/// operation fails.
pub trait LayerWriter<K, V>: Sized
where
    K: Debug + Send + Versioned + Sync,
    V: Debug + Send + Versioned + Sync,
{
    /// Writes the given item to this layer.
    fn write(&mut self, item: ItemRef<'_, K, V>) -> impl Future<Output = Result<(), Error>> + Send;

    /// Flushes any buffered items to the backing storage.
    fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + Send;

    /// Returns the total bytes written to the layer.
    fn bytes_written(&self) -> u64;
}
440
/// A `LayerIterator` that filters the items of another `LayerIterator`.
pub struct FilterLayerIterator<I, P, K, V> {
    /// The underlying iterator whose items are being filtered.
    iter: I,
    /// Items are only yielded when this returns `true` for them.
    predicate: P,
    // `K` and `V` appear in no other field; the markers keep the type parameters in use.
    _key: PhantomData<K>,
    _value: PhantomData<V>,
}
448
449impl<I, P, K, V> FilterLayerIterator<I, P, K, V>
450where
451    I: LayerIterator<K, V>,
452    P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync,
453{
454    async fn new(iter: I, predicate: P) -> Result<Self, Error> {
455        let mut filter = Self { iter, predicate, _key: PhantomData, _value: PhantomData };
456        filter.skip_filtered().await?;
457        Ok(filter)
458    }
459
460    async fn skip_filtered(&mut self) -> Result<(), Error> {
461        loop {
462            match self.iter.get() {
463                Some(item) if !(self.predicate)(item) => {}
464                _ => return Ok(()),
465            }
466            self.iter.advance().await?;
467        }
468    }
469}
470
#[async_trait]
impl<I, P, K, V> LayerIterator<K, V> for FilterLayerIterator<I, P, K, V>
where
    I: LayerIterator<K, V>,
    P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync,
    K: Send + Sync,
    V: Send + Sync,
{
    /// Moves to the next item accepted by the predicate (or to the end of the underlying
    /// iterator).
    async fn advance(&mut self) -> Result<(), Error> {
        // Step off the current item first, then skip over anything the predicate rejects.
        self.iter.advance().await?;
        self.skip_filtered().await
    }

    /// Returns the underlying iterator's current item.
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        self.iter.get()
    }
}
488
/// Trait implementations that allow plain `i32` values to be used as keys in tests.
#[cfg(test)]
mod test_types {
    use crate::lsm_tree::types::{
        DefaultOrdLowerBound, DefaultOrdUpperBound, FuzzyHash, LayerKey, MergeType, SortByU64,
        impl_fuzzy_hash,
    };

    // The natural `Ord` ordering serves as both the upper- and lower-bound ordering.
    impl DefaultOrdUpperBound for i32 {}
    impl DefaultOrdLowerBound for i32 {}

    impl SortByU64 for i32 {
        /// Shifts the whole `i32` range upwards so that `i32::MIN` maps to 0, which keeps the
        /// resulting `u64` values in the same order as the original integers.
        fn get_leading_u64(&self) -> u64 {
            // Widen before subtracting so that the shift cannot overflow.
            let shifted = i64::from(*self) - i64::from(i32::MIN);
            u64::try_from(shifted).unwrap()
        }
    }

    impl LayerKey for i32 {
        fn merge_type(&self) -> MergeType {
            MergeType::FullMerge
        }
    }

    impl_fuzzy_hash!(i32);
}