fxfs/lsm_tree/types.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::drop_event::DropEvent;
6use crate::object_handle::ReadObjectHandle;
7use crate::serialized_types::serialized_key::SerializeKey;
8use crate::serialized_types::{Version, Versioned, VersionedLatest};
9use anyhow::Error;
10use async_trait::async_trait;
11use fprint::TypeFingerprint;
12use serde::{Deserialize, Serialize};
13use std::fmt::Debug;
14use std::future::Future;
15use std::hash::Hash;
16use std::marker::PhantomData;
17use std::pin::Pin;
18use std::sync::Arc;
19
20pub use fxfs_macros::impl_fuzzy_hash;
21
/// Forces keys to be sorted first by a `u64`, so that they can be located approximately based on
/// only that integer without the whole key.
pub trait SortByU64: Sized {
    /// Returns the `u64` that is used as the first value when deciding on the sort order of the
    /// key.
    fn get_leading_u64(&self) -> u64;
}
28
/// An extension to `std::hash::Hash` to support values which should be partitioned and hashed into
/// buckets, where nearby keys will have the same hash value. This is used for existence filtering
/// in layer files (see `Layer::maybe_contains_key`).
///
/// For point-based keys, this can be the same as `std::hash::Hash`, but for range-based keys, the
/// hash can collapse nearby ranges into the same hash value. Since a range-based key may span
/// several buckets, `FuzzyHash::fuzzy_hash` must be called to split the key up into each of the
/// possible values that it overlaps with.
pub trait FuzzyHash: Hash + Sized {
    /// Returns the hash values of every bucket that `self` may overlap with.
    ///
    /// To support range-based keys, multiple hash values may need to be checked for a given key.
    /// For example, an extent [0..1024) might be split into the partitions [0..512) and
    /// [512..1024), each of which has its own distinct hash value. For point-based keys a single
    /// hash suffices, so the iterator need only yield one value derived from `self`.
    ///
    /// Note that in general only a small number of partitions (e.g. 2) should be checked at once.
    /// Debug assertions will fire if too large of a range is checked.
    fn fuzzy_hash(&self) -> impl Iterator<Item = u64>;

    /// Returns whether the type is a range-based key. Used to prevent use of range-based keys as
    /// a point query (see [`crate::lsm_tree::merge::Query::Point`]). Defaults to `false`.
    fn is_range_key(&self) -> bool {
        false
    }
}
52
53impl_fuzzy_hash!(u8);
54impl_fuzzy_hash!(u32);
55impl_fuzzy_hash!(u64);
56impl_fuzzy_hash!(String);
57impl_fuzzy_hash!(Vec<u8>);
58
59/// Keys and values need to implement the following traits. For merging, they need to implement
60/// MergeableKey. TODO: Use trait_alias when available.
61pub trait Key:
62 Clone
63 + Debug
64 + Hash
65 + FuzzyHash
66 + OrdUpperBound
67 + Send
68 + SortByU64
69 + Sync
70 + Versioned
71 + VersionedLatest
72 + SerializeKey
73 + std::marker::Unpin
74 + 'static
75{
76}
77
78impl<K> Key for K where
79 K: Clone
80 + Debug
81 + Hash
82 + FuzzyHash
83 + OrdUpperBound
84 + Send
85 + SortByU64
86 + Sync
87 + Versioned
88 + VersionedLatest
89 + SerializeKey
90 + std::marker::Unpin
91 + 'static
92{
93}
94
95pub trait MergeableKey: Key + Eq + LayerKey + OrdLowerBound {}
96impl<K> MergeableKey for K where K: Key + Eq + LayerKey + OrdLowerBound {}
97
98/// Trait required for supporting Layer functionality.
99pub trait LayerValue:
100 Clone + Send + Sync + Versioned + VersionedLatest + Debug + std::marker::Unpin + 'static
101{
102}
103impl<V> LayerValue for V where
104 V: Clone + Send + Sync + Versioned + VersionedLatest + Debug + std::marker::Unpin + 'static
105{
106}
107
108/// Superset of `LayerValue` to additionally support tree searching, requires comparison and an
109/// `DELETED_MARKER` for indicating empty values used to indicate deletion in the `LSMTree`.
110pub trait Value: PartialEq + LayerValue {
111 /// Value used to represent that the entry is actually empty, and should be ignored.
112 const DELETED_MARKER: Self;
113}
114
115/// ItemRef is a struct that contains references to key and value, which is useful since in many
116/// cases since keys and values are stored separately so &Item is not possible.
117#[derive(Debug, PartialEq, Eq, Serialize)]
118pub struct ItemRef<'a, K, V> {
119 pub key: &'a K,
120 pub value: &'a V,
121}
122
123impl<K: Clone, V: Clone> ItemRef<'_, K, V> {
124 pub fn cloned(&self) -> Item<K, V> {
125 Item { key: self.key.clone(), value: self.value.clone() }
126 }
127
128 pub fn boxed(&self) -> BoxedItem<K, V> {
129 Box::new(self.cloned())
130 }
131}
132
133impl<'a, K, V> Clone for ItemRef<'a, K, V> {
134 fn clone(&self) -> Self {
135 *self
136 }
137}
138impl<'a, K, V> Copy for ItemRef<'a, K, V> {}
139
140/// Item is a struct that combines a key and a value.
141#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
142#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
143pub struct Item<K, V> {
144 pub key: K,
145 pub value: V,
146}
147
148#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
149#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
150pub struct LegacyItem<K, V> {
151 pub key: K,
152 pub value: V,
153 pub sequence: u64,
154}
155
156impl<K: TypeFingerprint, V: TypeFingerprint> TypeFingerprint for LegacyItem<K, V> {
157 fn fingerprint() -> String {
158 "struct {key:".to_owned()
159 + &K::fingerprint()
160 + ",value:"
161 + &V::fingerprint()
162 + ",sequence:u64}"
163 }
164}
165
166pub type BoxedItem<K, V> = Box<Item<K, V>>;
167
168// Nb: type-fprint doesn't support generics yet.
169impl<K: TypeFingerprint, V: TypeFingerprint> TypeFingerprint for Item<K, V> {
170 fn fingerprint() -> String {
171 "struct {key:".to_owned() + &K::fingerprint() + ",value:" + &V::fingerprint() + "}"
172 }
173}
174
175impl<K, V> Item<K, V> {
176 pub fn new(key: K, value: V) -> Item<K, V> {
177 Item { key, value }
178 }
179
180 pub fn as_item_ref(&self) -> ItemRef<'_, K, V> {
181 self.into()
182 }
183
184 pub fn boxed(self) -> BoxedItem<K, V> {
185 Box::new(self)
186 }
187}
188
189impl<'a, K, V> From<&'a Item<K, V>> for ItemRef<'a, K, V> {
190 fn from(item: &'a Item<K, V>) -> ItemRef<'a, K, V> {
191 ItemRef { key: &item.key, value: &item.value }
192 }
193}
194
/// The find functions will return items with keys that are greater-than or equal to the search key,
/// so for keys that are like extents, the keys should sort (via `OrdUpperBound`) using the end
/// of their ranges, and you should set the search key accordingly.
///
/// For example, let's say the tree holds extents 100..200, 200..250 and you want to perform a read
/// for range 150..250: you should search for 0..151 which will first return the extent 100..200
/// (and then the iterator can be advanced to 200..250 after). When merging, keys can overlap, so
/// consider the case where we want to merge an extent with range 100..300 with an existing extent
/// of 200..250. In that case, we want to treat the extent with range 100..300 as lower than the key
/// 200..250 because we'll likely want to split the extents (e.g. perhaps we want 100..200,
/// 200..250, 250..300), so for merging, we need to use a different comparison function and we deal
/// with that using the `OrdLowerBound` trait.
///
/// If your keys don't have overlapping ranges that need to be merged, then these can be the same as
/// `std::cmp::Ord` (use the `DefaultOrdUpperBound` and `DefaultOrdLowerBound` traits).
pub trait OrdUpperBound {
    /// Compares `self` with `other` by upper bound (for range-based keys, the end of the range).
    /// This is the ordering used when searching.
    fn cmp_upper_bound(&self, other: &Self) -> std::cmp::Ordering;
}
214
215pub trait DefaultOrdUpperBound: OrdUpperBound + Ord {}
216
217impl<T: DefaultOrdUpperBound> OrdUpperBound for T {
218 fn cmp_upper_bound(&self, other: &Self) -> std::cmp::Ordering {
219 // Default to using cmp.
220 self.cmp(other)
221 }
222}
223
/// The ordering used when merging. For range-based keys this compares by lower bound, so that a
/// range such as 100..300 sorts before 200..250 and overlapping ranges can be split (see
/// `OrdUpperBound` for the full rationale).
pub trait OrdLowerBound {
    /// Compares `self` with `other` by lower bound.
    fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering;
}
227
228pub trait DefaultOrdLowerBound: OrdLowerBound + Ord {}
229
230impl<T: DefaultOrdLowerBound> OrdLowerBound for T {
231 fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
232 // Default to using cmp.
233 self.cmp(other)
234 }
235}
236
/// Result returned by [`LayerKey::merge_type`] to determine how to properly merge values within a
/// layer set.
// `Debug` and `Eq` are derived so the type can be logged and used in `assert_eq!`; equality on a
// fieldless enum is already total, so `Eq` adds no new semantics beyond `PartialEq`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MergeType {
    /// Always includes every layer in the merger, when seeking or advancing. Always correct, but
    /// always as slow as possible.
    FullMerge,

    /// Stops seeking older layers when an exact key match is found in a newer one. Useful for keys
    /// that only replace data, or with `next_key()` implementations to decide on continued merging.
    OptimizedMerge,
}
248
249/// Determines how to iterate forward from the current key, and how many older layers to include
250/// when merging. See the different variants of `MergeKeyType` for more details.
251pub trait LayerKey: Clone {
252 /// Called to determine how to perform merge behaviours while advancing through a layer set.
253 fn merge_type(&self) -> MergeType {
254 // Defaults to full merge. The slowest, but most predictable in behaviour.
255 MergeType::FullMerge
256 }
257
258 /// The next_key() call allows for an optimisation which allows the merger to avoid querying a
259 /// layer if it knows it has found the next possible key. It only makes sense for this to
260 /// return Some() when merge_type() returns OptimizedMerge. Consider the following example
261 /// showing two layers with range based keys.
262 ///
263 /// +----------+------------+
264 /// 0 | 0..100 | 100..200 |
265 /// +----------+------------+
266 /// 1 | 100..200 |
267 /// +------------+
268 ///
269 /// If you search and find the 0..100 key, then only layer 0 will be touched. If you then want
270 /// to advance to the 100..200 record, you can find it in layer 0 but unless you know that it
271 /// immediately follows the 0..100 key i.e. that there is no possible key, K, such that 0..100 <
272 /// K < 100..200, the merger has to consult all other layers to check. `next_key` should return
273 /// a search key for an extent that immediately follows. In practice, for extents, this should
274 /// be 0..end + 1.
275 ///
276 /// This is purely an optimisation; the default None will be correct but not performant.
277 fn next_key(&self) -> Option<Self> {
278 None
279 }
280 /// Returns the search key (S) for this key (K), such that when when searching for S in a layer
281 /// file, it returns the earliest possible key that might be relevant to K. Searching in a
282 /// layer file is done using `cmp_upper_bound` and the iterator will be positioned on a key that
283 /// is greater than or equal to S. Returning `None` here is the right thing to do for
284 /// non-ranged based keys, in which case K is used to search for the key. In practice, the
285 /// implementation should return `Some(0..start + 1)` for range based keys and `None` for
286 /// everything else. As an example, if the tree has extents 50..150 and 150..200 and we wish to
287 /// search for 100..200, search_key would return 0..101 which would position the iterator on
288 /// 50..150. If this method is overridden, `is_search_key` below should also be overridden.
289 fn search_key(&self) -> Option<Self> {
290 None
291 }
292
293 /// If you override `search_key` you should override `is_search_key`.
294 fn is_search_key(&self) -> bool {
295 true
296 }
297
298 /// Returns true if the keys are overlapping or equal.
299 fn overlaps(&self, other: &Self) -> bool;
300}
301
/// The answer returned by [`Layer::key_exists`]: whether a key is known to be present, known to be
/// absent, or possibly present.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Existence {
    /// The key definitely exists.
    Exists,
    /// The key might exist.
    MaybeExists,
    /// The key definitely does not exist.
    Missing,
}
311
312/// Layer is a trait that all layers need to implement (mutable and immutable).
313#[async_trait]
314pub trait Layer<K, V>: Send + Sync {
315 /// If the layer is persistent, returns the handle to its contents. Returns None for in-memory
316 /// layers.
317 fn handle(&self) -> Option<&dyn ReadObjectHandle> {
318 None
319 }
320
321 /// Some layer implementations may choose to cache data in-memory. Calling this function will
322 /// request that the layer purges unused cached data. This is intended to run on a timer.
323 fn purge_cached_data(&self) {}
324
325 /// Searches for a key. Bound::Excluded is not supported. Bound::Unbounded positions the
326 /// iterator on the first item in the layer.
327 async fn seek(&self, bound: std::ops::Bound<&K>)
328 -> Result<BoxedLayerIterator<'_, K, V>, Error>;
329
330 /// Returns the number of items in the layer file.
331 fn len(&self) -> usize;
332
333 /// Returns whether the layer *might* contain records relevant to `key`. Note that this can
334 /// return true even if the layer has no records relevant to `key`, but it will never return
335 /// false if there are such records. (As such, always returning true is a trivially correct
336 /// implementation.)
337 fn maybe_contains_key(&self, _key: &K) -> bool {
338 true
339 }
340
341 /// This is similar to `maybe_contains_key` except that there *must* be a `key` and possible
342 /// state where this will indicate the key is missing i.e. *always* returning
343 /// `Existence::MaybeExists` is *not* a correct implementation. If an implementation has a low
344 /// cost way to determine if the key *might* exist, then it may use that, but if not, it must
345 /// use a slower algorithm. This method was introduced to allow for an efficient way of
346 /// determining if a particular key is free to be used. `maybe_contains_key` cannot be used for
347 /// this purpose because implementations might *always* return `true` which would mean it would
348 /// be impossible to ever find a key that is free to use. It might not be appropriate to use
349 /// this with range based keys: implementations should use `cmp_upper_bound` and test for
350 /// equality, which might not give the desired results for range based keys.
351 async fn key_exists(&self, key: &K) -> Result<Existence, Error>;
352
353 /// Locks the layer preventing it from being closed. This will never block i.e. there can be
354 /// many locks concurrently. The lock is purely advisory: seek will still work even if lock has
355 /// not been called; it merely causes close to wait until all locks are released. Returns None
356 /// if close has been called for the layer.
357 fn lock(&self) -> Option<Arc<DropEvent>>;
358
359 /// Waits for existing locks readers to finish and then returns. Subsequent calls to lock will
360 /// return None.
361 async fn close(&self);
362
363 /// Returns the version number used by structs in this layer
364 fn get_version(&self) -> Version;
365
366 /// Records inspect data for the layer into `node`. Called lazily when inspect is queried.
367 fn record_inspect_data(self: Arc<Self>, _node: &fuchsia_inspect::Node) {}
368}
369
370/// Something that implements LayerIterator is returned by the seek function.
371#[async_trait]
372pub trait LayerIterator<K, V>: Send + Sync {
373 /// Advances the iterator.
374 async fn advance(&mut self) -> Result<(), Error>;
375
376 /// Returns the current item. This will be None if called when the iterator is first crated i.e.
377 /// before either seek or advance has been called, and None if the iterator has reached the end
378 /// of the layer.
379 fn get(&self) -> Option<ItemRef<'_, K, V>>;
380
381 /// Creates an iterator that only yields items from the underlying iterator for which
382 /// `predicate` returns `true`.
383 async fn filter<P>(self, predicate: P) -> Result<FilterLayerIterator<Self, P, K, V>, Error>
384 where
385 P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync,
386 Self: Sized,
387 K: Send + Sync,
388 V: Send + Sync,
389 {
390 FilterLayerIterator::new(self, predicate).await
391 }
392}
393
394pub type BoxedLayerIterator<'iter, K, V> = Box<dyn LayerIterator<K, V> + 'iter>;
395
/// Forwards [`LayerIterator`] calls on a boxed iterator to the iterator inside the box, so that a
/// `BoxedLayerIterator` can itself be used wherever a `LayerIterator` is expected.
impl<'iter, K, V> LayerIterator<K, V> for BoxedLayerIterator<'iter, K, V> {
    // Manual expansion of `async_trait` to avoid double boxing the `Future`.
    // The inner `advance` already returns a `Pin<Box<dyn Future ...>>` of exactly this type, so it
    // is returned as-is instead of being wrapped in another boxed `async` block. `**self`
    // dereferences through both the `&mut` and the `Box`, so the call dispatches to the inner
    // `dyn LayerIterator` rather than recursing into this impl.
    fn advance<'a, 'b>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'b>>
    where
        'a: 'b,
        Self: 'b,
    {
        (**self).advance()
    }

    // Delegates to the inner iterator (see the note on `**self` above).
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        (**self).get()
    }
}
409
410/// Mutable layers need an iterator that implements this in order to make merge_into work.
411pub(super) trait LayerIteratorMut<K, V>: Sync {
412 /// Advances the iterator.
413 fn advance(&mut self);
414
415 /// Returns the current item. This will be None if called when the iterator is first crated i.e.
416 /// before either seek or advance has been called, and None if the iterator has reached the end
417 /// of the layer.
418 fn get(&self) -> Option<ItemRef<'_, K, V>>;
419
420 /// Inserts the item before the item that the iterator is located at. The insert won't be
421 /// visible until the changes are committed (see `commit`).
422 fn insert(&mut self, item: Item<K, V>);
423
424 /// Erases the current item and positions the iterator on the next item, if any. The change
425 /// won't be visible until committed (see `commit`).
426 fn erase(&mut self);
427
428 /// Commits the changes. This does not wait for existing readers to finish.
429 fn commit(&mut self);
430}
431
432/// Trait for writing new layers.
433pub trait LayerWriter<K, V>: Sized
434where
435 K: Debug + Send + Versioned + Sync,
436 V: Debug + Send + Versioned + Sync,
437{
438 /// Writes the given item to this layer.
439 fn write(&mut self, item: ItemRef<'_, K, V>) -> impl Future<Output = Result<(), Error>> + Send;
440
441 /// Flushes any buffered items to the backing storage.
442 fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
443
444 /// Returns the total bytes written to the layer.
445 fn bytes_written(&self) -> u64;
446}
447
/// A `LayerIterator` that filters the items of another `LayerIterator`, yielding only those for
/// which a predicate returns `true`.
pub struct FilterLayerIterator<I, P, K, V> {
    /// The wrapped iterator.
    iter: I,
    /// Items for which this returns `false` are skipped.
    predicate: P,
    // Ties the key and value types to the struct without storing any values of those types.
    _key: PhantomData<K>,
    _value: PhantomData<V>,
}
455
456impl<I, P, K, V> FilterLayerIterator<I, P, K, V>
457where
458 I: LayerIterator<K, V>,
459 P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync,
460{
461 async fn new(iter: I, predicate: P) -> Result<Self, Error> {
462 let mut filter = Self { iter, predicate, _key: PhantomData, _value: PhantomData };
463 filter.skip_filtered().await?;
464 Ok(filter)
465 }
466
467 async fn skip_filtered(&mut self) -> Result<(), Error> {
468 loop {
469 match self.iter.get() {
470 Some(item) if !(self.predicate)(item) => {}
471 _ => return Ok(()),
472 }
473 self.iter.advance().await?;
474 }
475 }
476}
477
478#[async_trait]
479impl<I, P, K, V> LayerIterator<K, V> for FilterLayerIterator<I, P, K, V>
480where
481 I: LayerIterator<K, V>,
482 P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync,
483 K: Send + Sync,
484 V: Send + Sync,
485{
486 async fn advance(&mut self) -> Result<(), Error> {
487 self.iter.advance().await?;
488 self.skip_filtered().await
489 }
490
491 fn get(&self) -> Option<ItemRef<'_, K, V>> {
492 self.iter.get()
493 }
494}