fxfs/lsm_tree/types.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::drop_event::DropEvent;
6use crate::object_handle::ReadObjectHandle;
7use crate::serialized_types::{Version, Versioned, VersionedLatest};
8use anyhow::Error;
9use async_trait::async_trait;
10use fprint::TypeFingerprint;
11use serde::{Deserialize, Serialize};
12use std::fmt::Debug;
13use std::future::Future;
14use std::hash::Hash;
15use std::marker::PhantomData;
16use std::pin::Pin;
17use std::sync::Arc;
18
19pub use fxfs_macros::impl_fuzzy_hash;
20
/// Forces keys to be sorted first by a u64, so that they can be located approximately based on
/// only that integer without the whole key.
pub trait SortByU64 {
    /// Returns the u64 that is used as the first value when deciding on sort order of the key.
    fn get_leading_u64(&self) -> u64;
}
27
/// An extension to `std::hash::Hash` to support values which should be partitioned and hashed into
/// buckets, where nearby keys will have the same hash value. This is used for existence filtering
/// in layer files (see `Layer::maybe_contains_key`).
///
/// For point-based keys, this can be the same as `std::hash::Hash`, but for range-based keys, the
/// hash can collapse nearby ranges into the same hash value. Since a range-based key may span
/// several buckets, `FuzzyHash::fuzzy_hash` must be called to split the key up into each of the
/// possible values that it overlaps with.
pub trait FuzzyHash: Hash + Sized {
    /// Iterator type returned by `fuzzy_hash`; it yields one `u64` hash per partition.
    type Iter: Iterator<Item = u64>;

    /// To support range-based keys, multiple hash values may need to be checked for a given key.
    /// For example, an extent [0..1024) might return extents [0..512), [512..1024), each of which
    /// will have a unique return value for `Self::hash`. For point-based keys, a single hash
    /// suffices.
    ///
    /// NOTE(review): earlier wording said `None` is returned for point-based keys, but this
    /// method returns `Self::Iter`, not an `Option`. Presumably the iterator then yields just the
    /// hash of `self` -- confirm against `impl_fuzzy_hash!`.
    ///
    /// Note that in general only a small number of partitions (e.g. 2) should be checked at once.
    /// Debug assertions will fire if too large of a range is checked.
    fn fuzzy_hash(&self) -> Self::Iter;

    /// Returns whether the type is a range-based key. The default implementation returns `false`.
    fn is_range_key(&self) -> bool {
        false
    }
}
51
// Invoke `impl_fuzzy_hash!` (re-exported from `fxfs_macros`) for several primitive and
// standard-library types. NOTE(review): the macro body is not visible here; presumably it
// generates a point-key `FuzzyHash` implementation for each listed type -- confirm.
impl_fuzzy_hash!(u8);
impl_fuzzy_hash!(u32);
impl_fuzzy_hash!(u64);
impl_fuzzy_hash!(String);
impl_fuzzy_hash!(Vec<u8>);
57
/// The set of bounds that every key type must satisfy. Keys that also take part in merging need
/// to implement `MergeableKey`.
///
/// The trait declares no items of its own; it only names the combination of supertraits so that
/// the list does not have to be repeated at every use site.
///
/// TODO: Use trait_alias when available.
pub trait Key:
    Clone
    + Debug
    + Hash
    + FuzzyHash
    + OrdUpperBound
    + Send
    + SortByU64
    + Sync
    + Versioned
    + VersionedLatest
    + std::marker::Unpin
    + 'static
{
}
75
/// A `Key` that can additionally report whether it overlaps another key of the same type.
pub trait RangeKey: Key {
    /// Returns `true` if `self` and `other` overlap.
    fn overlaps(&self, other: &Self) -> bool;
}
80
// Blanket implementation: any type that satisfies every supertrait of `Key` is a `Key`, so
// implementers never write `impl Key for ...` by hand. The bound list must stay in step with the
// supertrait list on the `Key` trait declaration.
impl<K> Key for K where
    K: Clone
        + Debug
        + Hash
        + FuzzyHash
        + OrdUpperBound
        + Send
        + SortByU64
        + Sync
        + Versioned
        + VersionedLatest
        + std::marker::Unpin
        + 'static
{
}
96
/// A `Key` that can take part in merging: on top of `Key` it must be `Eq`, implement `LayerKey`,
/// and implement `OrdLowerBound`.
pub trait MergeableKey: Key + Eq + LayerKey + OrdLowerBound {}
// Blanket implementation: every type that meets the bounds is automatically a `MergeableKey`.
impl<K> MergeableKey for K where K: Key + Eq + LayerKey + OrdLowerBound {}
99
/// Trait required for supporting Layer functionality.
///
/// Declares no items of its own; it only names the combination of bounds a value type must meet.
pub trait LayerValue:
    Clone + Send + Sync + Versioned + VersionedLatest + Debug + std::marker::Unpin + 'static
{
}
// Blanket implementation: every type that meets the bounds is automatically a `LayerValue`.
impl<V> LayerValue for V where
    V: Clone + Send + Sync + Versioned + VersionedLatest + Debug + std::marker::Unpin + 'static
{
}
109
/// Superset of `LayerValue` that additionally supports tree searching. It requires comparison
/// (`PartialEq`) and a `DELETED_MARKER` value, which is used to indicate deletion in the
/// `LSMTree`.
pub trait Value: PartialEq + LayerValue {
    /// Value used to represent that the entry is actually empty, and should be ignored.
    const DELETED_MARKER: Self;
}
116
/// ItemRef is a struct that contains references to a key and a value. This is useful because in
/// many cases keys and values are stored separately, so an `&Item` is not possible.
#[derive(Debug, PartialEq, Eq, Serialize)]
pub struct ItemRef<'a, K, V> {
    /// Borrowed key.
    pub key: &'a K,
    /// Borrowed value.
    pub value: &'a V,
    /// Sequence number of the item, held by value (see `Item::sequence`).
    pub sequence: u64,
}
125
126impl<K: Clone, V: Clone> ItemRef<'_, K, V> {
127 pub fn cloned(&self) -> Item<K, V> {
128 Item { key: self.key.clone(), value: self.value.clone(), sequence: self.sequence }
129 }
130
131 pub fn boxed(&self) -> BoxedItem<K, V> {
132 Box::new(self.cloned())
133 }
134}
135
136impl<'a, K, V> Clone for ItemRef<'a, K, V> {
137 fn clone(&self) -> Self {
138 *self
139 }
140}
141impl<'a, K, V> Copy for ItemRef<'a, K, V> {}
142
/// Item is a struct that combines a key and a value, together with the sequence number under
/// which the pair was recorded.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct Item<K, V> {
    /// The key of the item.
    pub key: K,
    /// The value stored against `key`.
    pub value: V,
    /// |sequence| is a monotonically increasing sequence number for the Item, which is set when the
    /// Item is inserted into the tree. In practice, this is the journal file offset at the time of
    /// committing the transaction containing the Item. Note that two or more Items may share the
    /// same |sequence|.
    pub sequence: u64,
}
155
/// A heap-allocated `Item`.
pub type BoxedItem<K, V> = Box<Item<K, V>>;
157
158// Nb: type-fprint doesn't support generics yet.
159impl<K: TypeFingerprint, V: TypeFingerprint> TypeFingerprint for Item<K, V> {
160 fn fingerprint() -> String {
161 "struct {key:".to_owned()
162 + &K::fingerprint()
163 + ",value:"
164 + &V::fingerprint()
165 + ",sequence:u64}"
166 }
167}
168
169impl<K, V> Item<K, V> {
170 pub fn new(key: K, value: V) -> Item<K, V> {
171 Item { key, value, sequence: 0u64 }
172 }
173
174 pub fn new_with_sequence(key: K, value: V, sequence: u64) -> Item<K, V> {
175 Item { key, value, sequence }
176 }
177
178 pub fn as_item_ref(&self) -> ItemRef<'_, K, V> {
179 self.into()
180 }
181
182 pub fn boxed(self) -> BoxedItem<K, V> {
183 Box::new(self)
184 }
185}
186
187impl<'a, K, V> From<&'a Item<K, V>> for ItemRef<'a, K, V> {
188 fn from(item: &'a Item<K, V>) -> ItemRef<'a, K, V> {
189 ItemRef { key: &item.key, value: &item.value, sequence: item.sequence }
190 }
191}
192
193impl<'a, K, V> From<&'a BoxedItem<K, V>> for ItemRef<'a, K, V> {
194 fn from(item: &'a BoxedItem<K, V>) -> ItemRef<'a, K, V> {
195 ItemRef { key: &item.key, value: &item.value, sequence: item.sequence }
196 }
197}
198
/// Ordering used when searching.
///
/// The find functions will return items with keys that are greater-than or equal to the search
/// key, so for keys that are like extents, the keys should sort (via OrdUpperBound) using the end
/// of their ranges, and you should set the search key accordingly.
///
/// For example, let's say the tree holds extents 100..200, 200..250 and you want to perform a read
/// for range 150..250, you should search for 0..151 which will first return the extent 100..200
/// (and then the iterator can be advanced to 200..250 after). When merging, keys can overlap, so
/// consider the case where we want to merge an extent with range 100..300 with an existing extent
/// of 200..250. In that case, we want to treat the extent with range 100..300 as lower than the
/// key 200..250 because we'll likely want to split the extents (e.g. perhaps we want 100..200,
/// 200..250, 250..300), so for merging, we need to use a different comparison function and we
/// deal with that using the OrdLowerBound trait.
///
/// If your keys don't have overlapping ranges that need to be merged, then these can be the same
/// as std::cmp::Ord (use the DefaultOrdUpperBound and DefaultOrdLowerBound traits).
pub trait OrdUpperBound {
    /// Compares `self` with `other` using the upper-bound ordering.
    fn cmp_upper_bound(&self, other: &Self) -> std::cmp::Ordering;
}

/// Marker trait: implement it for an `Ord` type to get an `OrdUpperBound` implementation that
/// simply uses `Ord`.
pub trait DefaultOrdUpperBound: OrdUpperBound + Ord {}

impl<T: DefaultOrdUpperBound> OrdUpperBound for T {
    fn cmp_upper_bound(&self, other: &Self) -> std::cmp::Ordering {
        // The upper-bound ordering is just the type's total order.
        Ord::cmp(self, other)
    }
}
227
/// Ordering used when merging; for range-like keys it is meant to compare on the start of the
/// range (see the discussion on `OrdUpperBound`).
pub trait OrdLowerBound {
    /// Compares `self` with `other` using the lower-bound ordering.
    fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering;
}

/// Marker trait: implement it for an `Ord` type to get an `OrdLowerBound` implementation that
/// simply uses `Ord`.
pub trait DefaultOrdLowerBound: OrdLowerBound + Ord {}

impl<T: DefaultOrdLowerBound> OrdLowerBound for T {
    fn cmp_lower_bound(&self, other: &Self) -> std::cmp::Ordering {
        // The lower-bound ordering is just the type's total order.
        Ord::cmp(self, other)
    }
}
240
/// Result returned by `merge_type()` to determine how to properly merge values within a layerset.
///
/// The enum is field-less, so it is cheap to copy and compare; it derives `Debug` so that it can
/// be used in assertions and log output.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MergeType {
    /// Always includes every layer in the merger, when seeking or advancing. Always correct, but
    /// always as slow as possible.
    FullMerge,

    /// Stops seeking older layers when an exact key match is found in a newer one. Useful for keys
    /// that only replace data, or with `next_key()` implementations to decide on continued merging.
    OptimizedMerge,
}
252
/// Determines how to iterate forward from the current key, and how many older layers to include
/// when merging. See the different variants of `MergeType` for more details.
pub trait LayerKey: Clone {
    /// Called to determine how to perform merge behaviours while advancing through a layer set.
    fn merge_type(&self) -> MergeType {
        // Defaults to full merge. The slowest, but most predictable in behaviour.
        MergeType::FullMerge
    }

    /// The next_key() call allows for an optimisation which allows the merger to avoid querying a
    /// layer if it knows it has found the next possible key. It only makes sense for this to
    /// return Some() when merge_type() returns OptimizedMerge. Consider the following example
    /// showing two layers with range based keys.
    ///
    /// ```text
    ///      +----------+------------+
    ///    0 |  0..100  |  100..200  |
    ///      +----------+------------+
    ///    1            |  100..200  |
    ///                 +------------+
    /// ```
    ///
    /// If you search and find the 0..100 key, then only layer 0 will be touched. If you then want
    /// to advance to the 100..200 record, you can find it in layer 0 but unless you know that it
    /// immediately follows the 0..100 key i.e. that there is no possible key, K, such that
    /// 0..100 < K < 100..200, the merger has to consult all other layers to check. next_key should
    /// return a key, N, such that if the merger encounters a key that is <= N (using
    /// OrdLowerBound), it can stop touching more layers. The key N should also be the key to
    /// search for in other layers if the merger needs to do so. In the example above, it should be
    /// a key that is > 0..100 and 99..100, but <= 100..200 (using OrdUpperBound). In practice,
    /// what this means is that for range based keys, OrdUpperBound should use the end of the range,
    /// OrdLowerBound should use the start of the range, and next_key should return end..end + 1.
    /// This is purely an optimisation; the default None will be correct but not performant.
    fn next_key(&self) -> Option<Self> {
        None
    }

    /// Returns the search key for this extent; that is, a key which is <= this key under Ord and
    /// OrdLowerBound. Note that this is only used for Query::LimitedRange queries (where
    /// `Self::partition` returns Some).
    /// For example, if the tree has extents 50..150 and 150..200 and we wish to read 100..200, we'd
    /// search for 0..101 which would set the iterator to 50..150.
    ///
    /// The default implementation is `unreachable!()`, so it panics if it is ever called;
    /// presumably key types used with such queries are expected to override it.
    ///
    /// NOTE(review): `Self::partition` is not declared on this trait as shown here -- confirm
    /// that the reference above is still current.
    fn search_key(&self) -> Self {
        unreachable!()
    }
}
296
/// Number of items in a layer, tagged with whether the figure is exact or only an estimate. See
/// `Layer::estimated_len`.
pub enum ItemCount {
    /// The count is exact.
    Precise(usize),
    /// The count is an approximation.
    Estimate(usize),
}

impl std::ops::Deref for ItemCount {
    type Target = usize;

    /// Yields the wrapped count, whichever variant holds it.
    fn deref(&self) -> &Self::Target {
        // Both variants carry a single `usize`, so one irrefutable or-pattern covers them.
        let (Self::Precise(count) | Self::Estimate(count)) = self;
        count
    }
}
312
/// Layer is a trait that all layers need to implement (mutable and immutable).
#[async_trait]
pub trait Layer<K, V>: Send + Sync {
    /// If the layer is persistent, returns the handle to its contents. Returns None for in-memory
    /// layers. The default implementation returns None.
    fn handle(&self) -> Option<&dyn ReadObjectHandle> {
        None
    }

    /// Some layer implementations may choose to cache data in-memory. Calling this function will
    /// request that the layer purges unused cached data. This is intended to run on a timer.
    /// The default implementation does nothing.
    fn purge_cached_data(&self) {}

    /// Searches for a key. Bound::Excluded is not supported. Bound::Unbounded positions the
    /// iterator on the first item in the layer.
    async fn seek(&self, bound: std::ops::Bound<&K>)
        -> Result<BoxedLayerIterator<'_, K, V>, Error>;

    /// Returns the number of items in the layer file, or an estimate if not known.
    /// Old persistent layer formats did not keep track of how many entries they have, hence the
    /// estimate. If this is wrong, bloom filter sizing might be off, but that won't affect
    /// correctness, and will wash out with future compactions anyways.
    fn estimated_len(&self) -> ItemCount;

    /// Returns whether the layer *might* contain records relevant to `key`. Note that this can
    /// return true even if the layer has no records relevant to `key`, but it will never return
    /// false if there are such records. (As such, always returning true is a trivially correct
    /// implementation, and that is what the default implementation does.)
    fn maybe_contains_key(&self, _key: &K) -> bool {
        true
    }

    /// Locks the layer preventing it from being closed. This will never block i.e. there can be
    /// many locks concurrently. The lock is purely advisory: seek will still work even if lock has
    /// not been called; it merely causes close to wait until all locks are released. Returns None
    /// if close has been called for the layer.
    fn lock(&self) -> Option<Arc<DropEvent>>;

    /// Waits for all existing locks (see `lock`) to be released and then returns. Subsequent
    /// calls to lock will return None.
    async fn close(&self);

    /// Returns the version number used by structs in this layer.
    fn get_version(&self) -> Version;

    /// Records inspect data for the layer into `node`. Called lazily when inspect is queried.
    /// The default implementation records nothing.
    fn record_inspect_data(self: Arc<Self>, _node: &fuchsia_inspect::Node) {}
}
361
/// Something that implements LayerIterator is returned by the seek function.
#[async_trait]
pub trait LayerIterator<K, V>: Send + Sync {
    /// Advances the iterator.
    async fn advance(&mut self) -> Result<(), Error>;

    /// Returns the current item. This will be None if called when the iterator is first created
    /// i.e. before either seek or advance has been called, and None if the iterator has reached
    /// the end of the layer.
    fn get(&self) -> Option<ItemRef<'_, K, V>>;

    /// Creates an iterator that only yields items from the underlying iterator for which
    /// `predicate` returns `true`.
    ///
    /// Consumes `self` and wraps it in a `FilterLayerIterator`. Construction is asynchronous and
    /// fallible because `FilterLayerIterator::new` is awaited and its result is returned as is.
    async fn filter<P>(self, predicate: P) -> Result<FilterLayerIterator<Self, P, K, V>, Error>
    where
        P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync,
        Self: Sized,
        K: Send + Sync,
        V: Send + Sync,
    {
        FilterLayerIterator::new(self, predicate).await
    }
}
385
/// A boxed, dynamically dispatched `LayerIterator` that may borrow data for the lifetime `'iter`.
pub type BoxedLayerIterator<'iter, K, V> = Box<dyn LayerIterator<K, V> + 'iter>;
387
// Lets a `BoxedLayerIterator` be used wherever a `LayerIterator` is expected by forwarding each
// method to the boxed trait object. There is deliberately no `#[async_trait]` attribute here.
impl<'iter, K, V> LayerIterator<K, V> for BoxedLayerIterator<'iter, K, V> {
    // Manual expansion of `async_trait` to avoid double boxing the `Future`.
    fn advance<'a, 'b>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'b>>
    where
        'a: 'b,
        Self: 'b,
    {
        // `**self` is the `dyn LayerIterator` inside the box, so this dispatches to the wrapped
        // iterator and hands back the future it already boxed.
        (**self).advance()
    }
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        // Forward to the wrapped iterator (see the note in `advance`).
        (**self).get()
    }
}
401
/// Mutable layers need an iterator that implements this in order to make merge_into work.
///
/// Unlike `LayerIterator`, every method here is synchronous.
pub(super) trait LayerIteratorMut<K, V>: Sync {
    /// Advances the iterator.
    fn advance(&mut self);

    /// Returns the current item. This will be None if called when the iterator is first created
    /// i.e. before either seek or advance has been called, and None if the iterator has reached
    /// the end of the layer.
    fn get(&self) -> Option<ItemRef<'_, K, V>>;

    /// Inserts the item before the item that the iterator is located at. The insert won't be
    /// visible until the changes are committed (see `commit`).
    fn insert(&mut self, item: Item<K, V>);

    /// Erases the current item and positions the iterator on the next item, if any. The change
    /// won't be visible until committed (see `commit`).
    fn erase(&mut self);

    /// Commits the changes. This does not wait for existing readers to finish.
    fn commit(&mut self);
}
423
/// Trait for writing new layers.
///
/// Both methods return a `Send` future that resolves to `Ok(())` on success or to an error.
pub trait LayerWriter<K, V>: Sized
where
    K: Debug + Send + Versioned + Sync,
    V: Debug + Send + Versioned + Sync,
{
    /// Writes the given item to this layer.
    fn write(&mut self, item: ItemRef<'_, K, V>) -> impl Future<Output = Result<(), Error>> + Send;

    /// Flushes any buffered items to the backing storage.
    fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + Send;
}
436
/// A `LayerIterator` that filters the items of another `LayerIterator`.
pub struct FilterLayerIterator<I, P, K, V> {
    /// The underlying iterator whose items are being filtered.
    iter: I,
    /// Items are kept only when this returns `true` for them.
    predicate: P,
    // `K` and `V` appear only in the bounds on `I` and `P`, so they are recorded with
    // `PhantomData` rather than stored.
    _key: PhantomData<K>,
    _value: PhantomData<V>,
}
444
445impl<I, P, K, V> FilterLayerIterator<I, P, K, V>
446where
447 I: LayerIterator<K, V>,
448 P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync,
449{
450 async fn new(iter: I, predicate: P) -> Result<Self, Error> {
451 let mut filter = Self { iter, predicate, _key: PhantomData, _value: PhantomData };
452 filter.skip_filtered().await?;
453 Ok(filter)
454 }
455
456 async fn skip_filtered(&mut self) -> Result<(), Error> {
457 loop {
458 match self.iter.get() {
459 Some(item) if !(self.predicate)(item) => {}
460 _ => return Ok(()),
461 }
462 self.iter.advance().await?;
463 }
464 }
465}
466
// `LayerIterator` for the filtering wrapper: only items accepted by the predicate are exposed.
#[async_trait]
impl<I, P, K, V> LayerIterator<K, V> for FilterLayerIterator<I, P, K, V>
where
    I: LayerIterator<K, V>,
    P: for<'b> Fn(ItemRef<'b, K, V>) -> bool + Send + Sync,
    K: Send + Sync,
    V: Send + Sync,
{
    /// Moves past the current item, then past any following items that the predicate rejects.
    async fn advance(&mut self) -> Result<(), Error> {
        self.iter.advance().await?;
        self.skip_filtered().await
    }

    /// Returns the underlying iterator's current item unchanged; `new` and `advance` both finish
    /// by skipping rejected items, so no filtering is repeated here.
    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        self.iter.get()
    }
}
484
/// Test-only trait implementations that make `i32` usable as a key type.
#[cfg(test)]
mod test_types {
    use crate::lsm_tree::types::{
        impl_fuzzy_hash, DefaultOrdLowerBound, DefaultOrdUpperBound, FuzzyHash, LayerKey,
        MergeType, SortByU64,
    };

    impl DefaultOrdUpperBound for i32 {}
    impl DefaultOrdLowerBound for i32 {}

    impl SortByU64 for i32 {
        /// Maps the i32 range onto `0..=u32::MAX` while preserving order: `i32::MIN` becomes 0
        /// and 0 becomes `i32::MAX + 1`.
        fn get_leading_u64(&self) -> u64 {
            // Widen to i64 first so that subtracting `i32::MIN` cannot overflow.
            let shifted = i64::from(*self) - i64::from(i32::MIN);
            u64::try_from(shifted).unwrap()
        }
    }

    impl LayerKey for i32 {
        fn merge_type(&self) -> MergeType {
            MergeType::FullMerge
        }
    }

    impl_fuzzy_hash!(i32);
}