fxfs/lsm_tree/persistent_layer.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// PersistentLayer object format
//
// The layer is made up of 1 or more "blocks" whose size is some multiple of the block size used
// by the underlying handle.
//
// The persistent layer has 4 types of blocks:
//  - Header block
//  - Data block
//  - BloomFilter block
//  - Seek block (+LayerInfo)
//
// The structure of the file is as follows:
//
// blk#     contents
// 0        [Header]
// 1        [Data]
// 2        [Data]
// ...      [Data]
// L        [BloomFilter]
// L + 1    [BloomFilter]
// ...      [BloomFilter]
// M        [Seek]
// M + 1    [Seek]
// ...      [Seek]
// N        [Seek/LayerInfo]
//
// Generally, there will be an order of magnitude more Data blocks than Seek/BloomFilter blocks.
//
// Header contains a Version-prefixed LayerHeader struct.  This version is used for everything in
// the layer file.
//
// Data blocks contain a little-endian u16 item count at the start, then a series of serialized
// items, and finally a list of little-endian u16 offsets within the block at which the
// serialized items start, excluding the first item (since it is at a known offset). The offset
// list is anchored to the end of the block, and since items are of variable length, there may be
// dead space between the two sections when the next item and its offset cannot fit into the
// block.
//
// |item_count|item|item|item|item|item|item|dead space|offset|offset|offset|offset|offset|
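//
// For illustration, the trailing offset list of a single data block can be decoded with
// something like the following sketch (a hypothetical helper, not the code used below;
// `block` is assumed to be exactly one block_size worth of bytes, and 2 is the size of a
// u16 count/offset entry):
//
//   fn block_item_offsets(block: &[u8]) -> Vec<usize> {
//       let item_count = u16::from_le_bytes([block[0], block[1]]) as usize;
//       // The first item has no entry; it always starts right after the item count.
//       let mut offsets = vec![2];
//       let mut pos = block.len() - (item_count - 1) * 2;
//       for _ in 1..item_count {
//           offsets.push(u16::from_le_bytes([block[pos], block[pos + 1]]) as usize);
//           pos += 2;
//       }
//       offsets
//   }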
//
// BloomFilter blocks contain a bitmap which is used to probabilistically determine if a given key
// might exist in the layer file.  See `BloomFilter` for details on this structure.  Note that this
// can be absent from the file for small layer files.
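//
// Illustrative use of the reader (a sketch; see `bloom_filter.rs` for the real API):
//
//   let reader = BloomFilterReader::<K>::read(bytes, seed, num_hashes)?;
//   if !reader.maybe_contains(&key) {
//       // A negative result is definitive: the key is not in this layer.
//   }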
//
// Seek/LayerInfo blocks contain both the seek table, and a single LayerInfo struct at the tail of
// the last block, with the LayerInfo's length written as a little-endian u64 at the very end.  The
// padding between the two structs is ignored but nominally is zeroed. They share blocks to avoid
// wasting padding bytes.  Note that the seek table can be absent from the file for small layer
// files (but there will always be one block for the LayerInfo).
//
// The seek table consists of a little-endian u64 for every data block except the first. Each
// entry is the leading u64 of the first key in its block (see `SortByU64::get_leading_u64`), so
// entries are monotonically non-decreasing and duplicates are possible. There should be exactly
// as many seek blocks as are required to house one entry fewer than the number of data blocks.
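//
// A sketch of the seek table sizing (mirroring `seek_table_size` below; the LayerInfo may or may
// not add one extra block, depending on the space left in the seek table's final block):
//
//   // One little-endian u64 per data block, except the first.
//   let seek_table_bytes = num_data_blocks.saturating_sub(1) * 8;
//   let seek_table_blocks = seek_table_bytes.div_ceil(block_size);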

use crate::drop_event::DropEvent;
use crate::errors::FxfsError;
use crate::filesystem::MAX_BLOCK_SIZE;
use crate::log::*;
use crate::lsm_tree::bloom_filter::{BloomFilterReader, BloomFilterStats, BloomFilterWriter};
use crate::lsm_tree::types::{
    BoxedLayerIterator, Existence, FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerValue,
    LayerWriter,
};
use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteBytes};
use crate::object_store::caching_object_handle::{CHUNK_SIZE, CachedChunk, CachingObjectHandle};
use crate::round::{round_down, round_up};
use crate::serialized_types::{LATEST_VERSION, Version, Versioned, VersionedLatest};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt};
use fprint::TypeFingerprint;
use fuchsia_sync::Mutex;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::cmp::Ordering;
use std::io::{Read, Write as _};
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::Arc;

const PERSISTENT_LAYER_MAGIC: &[u8; 8] = b"FxfsLayr";

/// LayerHeader is stored in the first block of the persistent layer.
pub type LayerHeader = LayerHeaderV39;

#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct LayerHeaderV39 {
    /// 'FxfsLayr'
    magic: [u8; 8],
    /// The block size used within this layer file. This is typically set at compaction time to the
    /// same block size as the underlying object handle.
    ///
    /// (Each block starts with a 2 byte item count so there is a 64k item limit per block,
    /// regardless of block size).
    block_size: u64,
}

/// The last block of each layer contains metadata for the rest of the layer.
pub type LayerInfo = LayerInfoV39;

#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct LayerInfoV39 {
    /// How many items are in the layer file.  Mainly used for sizing bloom filters during
    /// compaction.
    num_items: usize,
    /// The number of data blocks in the layer file.
    num_data_blocks: u64,
    /// The size of the bloom filter in the layer file.  Not necessarily block-aligned.
    bloom_filter_size_bytes: usize,
    /// The seed for the nonces used in the bloom filter.
    bloom_filter_seed: u64,
    /// How many nonces to use for bloom filter hashing.
    bloom_filter_num_hashes: usize,
}

/// A handle to a persistent layer.
pub struct PersistentLayer<K, V> {
    // We retain a reference to the underlying object handle so we can hand out references to it for
    // `Layer::handle` when clients need it.  Internal reads should go through
    // `caching_object_handle` so they are cached.  Note that `CachingObjectHandle` used to
    // implement `ReadObjectHandle`, but that was removed so that `CachingObjectHandle` could hand
    // out data references rather than requiring copying to a buffer, which speeds up LSM tree
    // operations.
    object_handle: Arc<dyn ReadObjectHandle>,
    caching_object_handle: CachingObjectHandle<Arc<dyn ReadObjectHandle>>,
    version: Version,
    block_size: u64,
    data_size: u64,
    seek_table: Vec<u64>,
    num_items: usize,
    bloom_filter: Option<BloomFilterReader<K>>,
    bloom_filter_stats: Option<BloomFilterStats>,
    close_event: Mutex<Option<Arc<DropEvent>>>,
    _value_type: PhantomData<V>,
}
#[derive(Debug)]
struct BufferCursor {
    // The chunk of the cached layer file currently being read, if any.
    chunk: Option<CachedChunk>,
    // The read position, as a byte offset relative to the start of `chunk`.
    pos: usize,
}

impl std::io::Read for BufferCursor {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let chunk = if let Some(chunk) = &self.chunk {
            chunk
        } else {
            return Ok(0);
        };
        let to_read = std::cmp::min(buf.len(), chunk.len().saturating_sub(self.pos));
        if to_read > 0 {
            buf[..to_read].copy_from_slice(&chunk[self.pos..self.pos + to_read]);
            self.pos += to_read;
        }
        Ok(to_read)
    }
}

const MIN_BLOCK_SIZE: u64 = 512;

// For small layer files, don't bother with the bloom filter.  Arbitrarily chosen.
const MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER: usize = 4;

// How many blocks we reserve for the header.  Data blocks start at this offset.
const NUM_HEADER_BLOCKS: u64 = 1;

/// The smallest possible (empty) layer file is always 2 blocks, one for the header and one for
/// LayerInfo.
const MINIMUM_LAYER_FILE_BLOCKS: u64 = 2;

// Put safety rails on the size of the bloom filter and seek table to avoid OOMing the system.
// It's more likely that tampering has occurred in these cases.
const MAX_BLOOM_FILTER_SIZE: usize = 64 * 1024 * 1024;
const MAX_SEEK_TABLE_SIZE: usize = 64 * 1024 * 1024;

// The following constants refer to sizes of metadata in the data blocks.
const PER_DATA_BLOCK_HEADER_SIZE: usize = 2;
const PER_DATA_BLOCK_SEEK_ENTRY_SIZE: usize = 2;
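
// For intuition (a sketch, assuming the 512-byte minimum block size and the minimum possible
// item size of 10 bytes: an 8-byte sequence number plus at least one byte each for key and
// value): a data block must satisfy
//   PER_DATA_BLOCK_HEADER_SIZE + 10 * n + PER_DATA_BLOCK_SEEK_ENTRY_SIZE * (n - 1) <= 512
// so a single 512-byte block holds at most n = 42 items.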

// A key-only iterator, used while seeking through the tree.
struct KeyOnlyIterator<'iter, K: Key, V: LayerValue> {
    // Allocated out of |layer|.
    buffer: BufferCursor,

    layer: &'iter PersistentLayer<K, V>,

    // The position of the _next_ block to be read.
    pos: u64,

    // The item index in the current block.
    item_index: u16,

    // The number of items in the current block.
    item_count: u16,

    // The current key.
    key: Option<K>,

    // Set by a wrapping iterator once the value has been deserialized, so the KeyOnlyIterator knows
    // whether it is pointing at the next key or not.
    value_deserialized: bool,
}

impl<K: Key, V: LayerValue> KeyOnlyIterator<'_, K, V> {
    fn new<'iter>(layer: &'iter PersistentLayer<K, V>, pos: u64) -> KeyOnlyIterator<'iter, K, V> {
        assert!(pos % layer.block_size == 0);
        KeyOnlyIterator {
            layer,
            buffer: BufferCursor { chunk: None, pos: pos as usize % CHUNK_SIZE },
            pos,
            item_index: 0,
            item_count: 0,
            key: None,
            value_deserialized: false,
        }
    }

    // Repositions the iterator to point to the `index`'th item in the current block.
    // Returns an error if the index is out of range or the resulting offset contains an obviously
    // invalid value.
    fn seek_to_block_item(&mut self, index: u16) -> Result<(), Error> {
        ensure!(index < self.item_count, FxfsError::OutOfRange);
        if index == self.item_index && self.value_deserialized {
            // Fast-path when we are seeking in a linear manner, as is the case when advancing a
            // wrapping iterator that also deserializes the values.
            return Ok(());
        }
        let offset_in_block = if index == 0 {
            // First entry isn't actually recorded, it is at the start of the block after the item
            // count.
            PER_DATA_BLOCK_HEADER_SIZE
        } else {
            let old_buffer_pos = self.buffer.pos;
            self.buffer.pos = round_up(self.buffer.pos, self.layer.block_size as usize).unwrap()
                - (PER_DATA_BLOCK_SEEK_ENTRY_SIZE * (usize::from(self.item_count - index)));
            let res = self.buffer.read_u16::<LittleEndian>();
            self.buffer.pos = old_buffer_pos;
            let offset_in_block = res.context("Failed to read offset")? as usize;
            if offset_in_block >= self.layer.block_size as usize
                || offset_in_block <= PER_DATA_BLOCK_HEADER_SIZE
            {
                return Err(anyhow!(FxfsError::Inconsistent))
                    .context(format!("Offset {} is out of valid range.", offset_in_block));
            }
            offset_in_block
        };
        self.item_index = index;
        self.buffer.pos =
            round_down(self.buffer.pos, self.layer.block_size as usize) + offset_in_block;
        Ok(())
    }

    async fn advance(&mut self) -> Result<(), Error> {
        if self.item_index >= self.item_count {
            if self.pos >= self.layer.data_offset() + self.layer.data_size {
                self.key = None;
                return Ok(());
            }
            if self.buffer.chunk.is_none() || self.pos as usize % CHUNK_SIZE == 0 {
                self.buffer.chunk = Some(
                    self.layer
                        .caching_object_handle
                        .read(self.pos as usize)
                        .await
                        .context("Reading during advance")?,
                );
            }
            self.buffer.pos = self.pos as usize % CHUNK_SIZE;
            self.item_count = self.buffer.read_u16::<LittleEndian>()?;
            if self.item_count == 0 {
                bail!(
                    "Read block with zero item count (object: {}, offset: {})",
                    self.layer.object_handle.object_id(),
                    self.pos
                );
            }
            debug!(
                pos = self.pos,
                buf:? = self.buffer,
                object_size = self.layer.data_offset() + self.layer.data_size,
                oid = self.layer.object_handle.object_id();
                ""
            );
            self.pos += self.layer.block_size;
            self.item_index = 0;
            self.value_deserialized = true;
        }
        self.seek_to_block_item(self.item_index)?;
        self.key = Some(
            K::deserialize_from_version(self.buffer.by_ref(), self.layer.version)
                .context("Corrupt layer (key)")?,
        );
        self.item_index += 1;
        self.value_deserialized = false;
        Ok(())
    }

    fn get(&self) -> Option<&K> {
        self.key.as_ref()
    }
}

struct Iterator<'iter, K: Key, V: LayerValue> {
    inner: KeyOnlyIterator<'iter, K, V>,
    // The current item.
    item: Option<Item<K, V>>,
}

impl<'iter, K: Key, V: LayerValue> Iterator<'iter, K, V> {
    fn new(mut seek_iterator: KeyOnlyIterator<'iter, K, V>) -> Result<Self, Error> {
        let key = std::mem::take(&mut seek_iterator.key);
        let item = if let Some(key) = key {
            seek_iterator.value_deserialized = true;
            Some(Item {
                key,
                value: V::deserialize_from_version(
                    seek_iterator.buffer.by_ref(),
                    seek_iterator.layer.version,
                )
                .context("Corrupt layer (value)")?,
                sequence: seek_iterator
                    .buffer
                    .read_u64::<LittleEndian>()
                    .context("Corrupt layer (seq)")?,
            })
        } else {
            None
        };
        Ok(Self { inner: seek_iterator, item })
    }
}

#[async_trait]
impl<'iter, K: Key, V: LayerValue> LayerIterator<K, V> for Iterator<'iter, K, V> {
    async fn advance(&mut self) -> Result<(), Error> {
        self.inner.advance().await?;
        let key = std::mem::take(&mut self.inner.key);
        self.item = if let Some(key) = key {
            self.inner.value_deserialized = true;
            Some(Item {
                key,
                value: V::deserialize_from_version(
                    self.inner.buffer.by_ref(),
                    self.inner.layer.version,
                )
                .context("Corrupt layer (value)")?,
                sequence: self
                    .inner
                    .buffer
                    .read_u64::<LittleEndian>()
                    .context("Corrupt layer (seq)")?,
            })
        } else {
            None
        };
        Ok(())
    }

    fn get(&self) -> Option<ItemRef<'_, K, V>> {
        self.item.as_ref().map(<&Item<K, V>>::into)
    }
}

// Returns the size of the seek table in bytes.
fn seek_table_size(num_data_blocks: u64) -> usize {
    // The first data block doesn't have an entry.
    let seek_table_entries = num_data_blocks.saturating_sub(1) as usize;
    if seek_table_entries == 0 {
        return 0;
    }
    let entry_size = std::mem::size_of::<u64>();
    seek_table_entries * entry_size
}

async fn load_seek_table(
    object_handle: &(impl ReadObjectHandle + 'static),
    seek_table_offset: u64,
    num_data_blocks: u64,
) -> Result<Vec<u64>, Error> {
    let seek_table_size = seek_table_size(num_data_blocks);
    if seek_table_size == 0 {
        return Ok(vec![]);
    }
    if seek_table_size > MAX_SEEK_TABLE_SIZE {
        return Err(anyhow!(FxfsError::NotSupported)).context("Seek table too large");
    }
    let mut buffer = object_handle.allocate_buffer(seek_table_size).await;
    let bytes_read = object_handle
        .read(seek_table_offset, buffer.as_mut())
        .await
        .context("Reading seek table blocks")?;
    ensure!(bytes_read == seek_table_size, "Short read");

    let mut seek_table = Vec::with_capacity(num_data_blocks as usize);
    // There is no entry for the first data block, so assume a lower bound of 0.
    seek_table.push(0);
    let mut prev = 0;
    for chunk in buffer.as_slice().chunks_exact(std::mem::size_of::<u64>()) {
        let next = LittleEndian::read_u64(chunk);
        // Entries should be monotonically non-decreasing; otherwise something's broken, or we've
        // gone off the end and we're reading zeroes.
        if prev > next {
            return Err(anyhow!(FxfsError::Inconsistent))
                .context(format!("Seek table entry out of order, {:?} > {:?}", prev, next));
        }
        prev = next;
        seek_table.push(next);
    }
    Ok(seek_table)
}

async fn load_bloom_filter<K: FuzzyHash>(
    handle: &(impl ReadObjectHandle + 'static),
    bloom_filter_offset: u64,
    layer_info: &LayerInfo,
) -> Result<Option<BloomFilterReader<K>>, Error> {
    if layer_info.bloom_filter_size_bytes == 0 {
        return Ok(None);
    }
    if layer_info.bloom_filter_size_bytes > MAX_BLOOM_FILTER_SIZE {
        return Err(anyhow!(FxfsError::NotSupported)).context("Bloom filter too large");
    }
    let mut buffer = handle.allocate_buffer(layer_info.bloom_filter_size_bytes).await;
    handle.read(bloom_filter_offset, buffer.as_mut()).await.context("Failed to read")?;
    Ok(Some(BloomFilterReader::read(
        buffer.as_slice(),
        layer_info.bloom_filter_seed,
        layer_info.bloom_filter_num_hashes,
    )?))
}

impl<K: Key, V: LayerValue> PersistentLayer<K, V> {
    pub async fn open(handle: impl ReadObjectHandle + 'static) -> Result<Arc<Self>, Error> {
        let bs = handle.block_size();
        let mut buffer = handle.allocate_buffer(bs as usize).await;
        handle.read(0, buffer.as_mut()).await.context("Failed to read first block")?;
        let mut cursor = std::io::Cursor::new(buffer.as_slice());
        let version = Version::deserialize_from(&mut cursor)?;

        ensure!(version <= LATEST_VERSION, FxfsError::InvalidVersion);
        let header = LayerHeader::deserialize_from_version(&mut cursor, version)
            .context("Failed to deserialize header")?;
        if &header.magic != PERSISTENT_LAYER_MAGIC {
            return Err(anyhow!(FxfsError::Inconsistent).context("Invalid layer file magic"));
        }
        if header.block_size == 0 || !header.block_size.is_power_of_two() {
            return Err(anyhow!(FxfsError::Inconsistent))
                .context(format!("Invalid block size {}", header.block_size));
        }
        ensure!(header.block_size <= MAX_BLOCK_SIZE, FxfsError::NotSupported);
        let physical_block_size = handle.block_size();
        if header.block_size % physical_block_size != 0 {
            return Err(anyhow!(FxfsError::Inconsistent)).context(format!(
                "{} not a multiple of physical block size {}",
                header.block_size, physical_block_size
            ));
        }
        std::mem::drop(cursor);

        let bs = header.block_size as usize;
        if handle.get_size() < MINIMUM_LAYER_FILE_BLOCKS * bs as u64 {
            return Err(anyhow!(FxfsError::Inconsistent).context("Layer file too short"));
        }

        let layer_info = {
            let last_block_offset = handle
                .get_size()
                .checked_sub(header.block_size)
                .ok_or(FxfsError::Inconsistent)
                .context("Layer file unexpectedly short")?;
            handle
                .read(last_block_offset, buffer.subslice_mut(0..header.block_size as usize))
                .await
                .context("Failed to read layer info")?;
            let layer_info_len =
                LittleEndian::read_u64(&buffer.as_slice()[bs - std::mem::size_of::<u64>()..]);
            let layer_info_offset = bs
                .checked_sub(std::mem::size_of::<u64>() + layer_info_len as usize)
                .ok_or(FxfsError::Inconsistent)
                .context("Invalid layer info length")?;
            let mut cursor = std::io::Cursor::new(&buffer.as_slice()[layer_info_offset..]);
            LayerInfo::deserialize_from_version(&mut cursor, version)
                .context("Failed to deserialize LayerInfo")?
        };
        std::mem::drop(buffer);
        if layer_info.num_items == 0 && layer_info.num_data_blocks > 0 {
            return Err(anyhow!(FxfsError::Inconsistent))
                .context("Invalid num_items/num_data_blocks");
        }
        let total_blocks = handle.get_size() / header.block_size;
        let bloom_filter_blocks =
            round_up(layer_info.bloom_filter_size_bytes as u64, header.block_size)
                .unwrap_or(layer_info.bloom_filter_size_bytes as u64)
                / header.block_size;
        if layer_info.num_data_blocks + bloom_filter_blocks
            > total_blocks - MINIMUM_LAYER_FILE_BLOCKS
        {
            return Err(anyhow!(FxfsError::Inconsistent)).context("Invalid number of blocks");
        }

        let bloom_filter_offset =
            header.block_size * (NUM_HEADER_BLOCKS + layer_info.num_data_blocks);
        let bloom_filter = if version == LATEST_VERSION {
            load_bloom_filter(&handle, bloom_filter_offset, &layer_info)
                .await
                .context("Failed to load bloom filter")?
        } else {
            // Ignore the bloom filter for layer files in outdated versions.  We don't know whether
            // keys have changed formats or not (and therefore have different hash values), so we
            // must ignore the bloom filter and always query the layer.
            None
        };
        let bloom_filter_stats = bloom_filter.as_ref().map(|b| b.stats());

        let seek_offset = header.block_size
            * (NUM_HEADER_BLOCKS + layer_info.num_data_blocks + bloom_filter_blocks);
        let seek_table = load_seek_table(&handle, seek_offset, layer_info.num_data_blocks)
            .await
            .context("Failed to load seek table")?;

        let object_handle = Arc::new(handle) as Arc<dyn ReadObjectHandle>;
        let caching_object_handle = CachingObjectHandle::new(object_handle.clone());
        Ok(Arc::new(PersistentLayer {
            object_handle,
            caching_object_handle,
            version,
            block_size: header.block_size,
            data_size: layer_info.num_data_blocks * header.block_size,
            seek_table,
            num_items: layer_info.num_items,
            bloom_filter,
            bloom_filter_stats,
            close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
            _value_type: PhantomData::default(),
        }))
    }

    /// Whether the bloom filter for the layer file is consulted or not.  If this is false, then
    /// `maybe_contains_key` will always return true.
    /// Note that the persistent layer file may still have a bloom filter, but it might be ignored
    /// (e.g. for a layer file on an older version).
    pub fn has_bloom_filter(&self) -> bool {
        self.bloom_filter.is_some()
    }

    fn data_offset(&self) -> u64 {
        NUM_HEADER_BLOCKS * self.block_size
    }
}

#[async_trait]
impl<K: Key, V: LayerValue> Layer<K, V> for PersistentLayer<K, V> {
    fn handle(&self) -> Option<&dyn ReadObjectHandle> {
        Some(&self.object_handle)
    }

    fn purge_cached_data(&self) {
        self.caching_object_handle.purge();
    }

    async fn seek<'a>(&'a self, bound: Bound<&K>) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
        let (key, excluded) = match bound {
            Bound::Unbounded => {
                let mut iterator = Iterator::new(KeyOnlyIterator::new(self, self.data_offset()))?;
                iterator.advance().await.context("Unbounded seek advance")?;
                return Ok(Box::new(iterator));
            }
            Bound::Included(k) => (k, false),
            Bound::Excluded(k) => (k, true),
        };
        let first_data_block_index = self.data_offset() / self.block_size;

        let (mut left_offset, mut right_offset) = {
            // We search for a range here because multiple blocks can share the same seek table
            // value in this approximate search. Since each value is the smallest leading u64 in
            // its block, if a value equals the target we must also search the block before it.
            // The goal is for table[left] < target < table[right].
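            //
            // A worked example (hypothetical values): with seek_table = [0, 5, 7, 7, 9] and
            // target = 7, partition_point(|&x| x <= 7) = 4, so right_index = 4; partition_point
            // over [0, 5, 7, 7] with (|&x| x < 7) = 2, minus 1 gives left_index = 1, satisfying
            // table[1] = 5 < 7 < 9 = table[4].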
            let target = key.get_leading_u64();
            // Because the first entry in the table is always 0, right_index will never be 0.
            let right_index = self.seek_table.as_slice().partition_point(|&x| x <= target) as u64;
            // Since partition_point will find the index of the first place where the predicate
            // is false, we subtract 1 to get the index where it was last true.
            let left_index = self.seek_table.as_slice()[..right_index as usize]
                .partition_point(|&x| x < target)
                .saturating_sub(1) as u64;

            (
                (left_index + first_data_block_index) * self.block_size,
                (right_index + first_data_block_index) * self.block_size,
            )
        };
        let mut left = KeyOnlyIterator::new(self, left_offset);
        left.advance().await.context("Initial seek advance")?;
        match left.get() {
            None => return Ok(Box::new(Iterator::new(left)?)),
            Some(left_key) => match left_key.cmp_upper_bound(key) {
                Ordering::Greater => return Ok(Box::new(Iterator::new(left)?)),
                Ordering::Equal => {
                    if excluded {
                        left.advance().await?;
                    }
                    return Ok(Box::new(Iterator::new(left)?));
                }
                Ordering::Less => {}
            },
        }
        let mut right = None;
        while right_offset - left_offset > self.block_size {
            // Pick a block midway.
            let mid_offset =
                round_down(left_offset + (right_offset - left_offset) / 2, self.block_size);
            let mut iterator = KeyOnlyIterator::new(self, mid_offset);
            iterator.advance().await?;
            let iter_key: &K = iterator.get().unwrap();
            match iter_key.cmp_upper_bound(key) {
                Ordering::Greater => {
                    right_offset = mid_offset;
                    right = Some(iterator);
                }
                Ordering::Equal => {
                    if excluded {
                        iterator.advance().await?;
                    }
                    return Ok(Box::new(Iterator::new(iterator)?));
                }
                Ordering::Less => {
                    left_offset = mid_offset;
                    left = iterator;
                }
            }
        }

        // Finish the binary search on the block pointed to by `left`.
        let mut left_index = 0;
        let mut right_index = left.item_count;
        // If the size is zero then we don't touch the iterator.
        while left_index < (right_index - 1) {
            let mid_index = left_index + ((right_index - left_index) / 2);
            left.seek_to_block_item(mid_index).context("Read index offset for binary search")?;
            left.advance().await?;
            match left.get().unwrap().cmp_upper_bound(key) {
                Ordering::Greater => {
                    right_index = mid_index;
                }
                Ordering::Equal => {
                    if excluded {
                        left.advance().await?;
                    }
                    return Ok(Box::new(Iterator::new(left)?));
                }
                Ordering::Less => {
                    left_index = mid_index;
                }
            }
        }
        // When we don't find an exact match, we need to return the first entry *after* the
        // target key. That might be the first item in the next block, currently already pointed
        // to by the "right" iterator, but usually it's just the item at the right index within
        // the "left" block.
        if right_index < left.item_count {
            left.seek_to_block_item(right_index)
                .context("Read index for offset of right pointer")?;
        } else if let Some(right) = right {
            return Ok(Box::new(Iterator::new(right)?));
        } else {
            // We want the end of the layer.  `right_index == left.item_count`, so `left_index ==
            // left.item_count - 1`, and the left iterator must be positioned on `left_index` since
            // we cannot have gone through the `Ordering::Greater` path above because `right_index`
            // would not be equal to `left.item_count` in that case, so all we need to do is advance
            // the iterator.
        }
        left.advance().await?;
        return Ok(Box::new(Iterator::new(left)?));
    }

    fn len(&self) -> usize {
        self.num_items
    }

    fn maybe_contains_key(&self, key: &K) -> bool {
        self.bloom_filter.as_ref().map_or(true, |f| f.maybe_contains(key))
    }

    async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
        match &self.bloom_filter {
            Some(filter) => Ok(if filter.maybe_contains(key) {
                Existence::MaybeExists
            } else {
                Existence::Missing
            }),
            None => {
                let iter = self.seek(Bound::Included(key)).await?;
                Ok(iter.get().map_or(Existence::Missing, |i| {
                    if i.key.cmp_upper_bound(key).is_eq() {
                        Existence::Exists
                    } else {
                        Existence::Missing
                    }
                }))
            }
        }
    }

    fn lock(&self) -> Option<Arc<DropEvent>> {
        self.close_event.lock().clone()
    }

    async fn close(&self) {
        let listener = self.close_event.lock().take().expect("close already called").listen();
        listener.await;
    }

    fn get_version(&self) -> Version {
        self.version
    }

    fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
        node.record_uint("num_items", self.num_items as u64);
        node.record_bool("persistent", true);
        node.record_uint("size", self.object_handle.get_size());
        if let Some(stats) = self.bloom_filter_stats.as_ref() {
            node.record_child("bloom_filter", move |node| {
                node.record_uint("size", stats.size as u64);
                node.record_uint("num_hashes", stats.num_hashes as u64);
                node.record_uint("fill_percentage", stats.fill_percentage as u64);
            });
        }
    }
}

// This ensures that item_count can't be overflowed below.
const_assert!(MAX_BLOCK_SIZE <= u16::MAX as u64 + 1);

// -- Writer support --

pub struct PersistentLayerWriter<W: WriteBytes, K: Key, V: LayerValue> {
    writer: W,
    block_size: u64,
    buf: Vec<u8>,
    buf_item_count: u16,
    item_count: usize,
    block_offsets: Vec<u16>,
    block_keys: Vec<u64>,
    bloom_filter: BloomFilterWriter<K>,
    _value: PhantomData<V>,
}

impl<W: WriteBytes, K: Key, V: LayerValue> PersistentLayerWriter<W, K, V> {
    /// Creates a new writer that will serialize items to the object accessible via `writer`
    /// (which provides a write interface to the object).
    pub async fn new(writer: W, num_items: usize, block_size: u64) -> Result<Self, Error> {
        Self::new_with_version(writer, num_items, block_size, LATEST_VERSION).await
    }

    async fn new_with_version(
        mut writer: W,
        num_items: usize,
        block_size: u64,
        version: Version,
    ) -> Result<Self, Error> {
        ensure!(block_size <= MAX_BLOCK_SIZE, FxfsError::NotSupported);
        ensure!(block_size >= MIN_BLOCK_SIZE, FxfsError::NotSupported);

        // Write the header block.
        let header = LayerHeader { magic: PERSISTENT_LAYER_MAGIC.clone(), block_size };
        let mut buf = vec![0u8; block_size as usize];
        {
            let mut cursor = std::io::Cursor::new(&mut buf[..]);
            version.serialize_into(&mut cursor)?;
            header.serialize_into(&mut cursor)?;
        }
        writer.write_bytes(&buf[..]).await?;

        let seed: u64 = rand::random();
        Ok(PersistentLayerWriter {
            writer,
            block_size,
            buf: Vec::new(),
            buf_item_count: 0,
            item_count: 0,
            block_offsets: Vec::new(),
            block_keys: Vec::new(),
            bloom_filter: BloomFilterWriter::new(seed, num_items),
            _value: PhantomData,
        })
    }

    /// Writes 'buf[..len]' out as a block.
    ///
    /// Blocks are fixed size, consisting of a 16-bit item count, data, zero padding
    /// and seek table at the end.
    async fn write_block(&mut self, len: usize) -> Result<(), Error> {
        if self.buf_item_count == 0 {
            return Ok(());
        }
        let seek_table_size = self.block_offsets.len() * PER_DATA_BLOCK_SEEK_ENTRY_SIZE;
        assert!(PER_DATA_BLOCK_HEADER_SIZE + seek_table_size + len <= self.block_size as usize);
        let mut cursor = std::io::Cursor::new(vec![0u8; self.block_size as usize]);
        cursor.write_u16::<LittleEndian>(self.buf_item_count)?;
        cursor.write_all(self.buf.drain(..len).as_ref())?;
        cursor.set_position(self.block_size - seek_table_size as u64);
        // Write the seek table. Entries are 2 bytes each, and items are always at least 10 bytes.
        for &offset in &self.block_offsets {
            cursor.write_u16::<LittleEndian>(offset)?;
        }
        self.writer.write_bytes(cursor.get_ref()).await?;
        debug!(item_count = self.buf_item_count, byte_count = len; "wrote items");
        self.buf_item_count = 0;
        self.block_offsets.clear();
        Ok(())
    }

    // Assumes the writer is positioned to a new block.
    // Returns the size, in bytes, of the seek table.
    // Note that the writer will be positioned to exactly the end of the seek table, not to the end
    // of a block.
    async fn write_seek_table(&mut self) -> Result<usize, Error> {
        if self.block_keys.len() == 0 {
            return Ok(0);
        }
        let size = self.block_keys.len() * std::mem::size_of::<u64>();
        self.buf.resize(size, 0);
        let mut len = 0;
        for key in &self.block_keys {
            LittleEndian::write_u64(&mut self.buf[len..len + std::mem::size_of::<u64>()], *key);
            len += std::mem::size_of::<u64>();
        }
        self.writer.write_bytes(&self.buf).await?;
        Ok(size)
    }

    // Assumes the writer is positioned to exactly the end of the seek table, which was
    // `seek_table_len` bytes.
    async fn write_info(
        &mut self,
        num_data_blocks: u64,
        bloom_filter_size_bytes: usize,
        seek_table_len: usize,
    ) -> Result<(), Error> {
        let block_size = self.writer.block_size() as usize;
        let layer_info = LayerInfo {
            num_items: self.item_count,
            num_data_blocks,
            bloom_filter_size_bytes,
            bloom_filter_seed: self.bloom_filter.seed(),
            bloom_filter_num_hashes: self.bloom_filter.num_hashes(),
        };
        let actual_len = {
            let mut cursor = std::io::Cursor::new(&mut self.buf);
            layer_info.serialize_into(&mut cursor)?;
            let layer_info_len = cursor.position();
            cursor.write_u64::<LittleEndian>(layer_info_len)?;
            cursor.position() as usize
        };

        // We want the LayerInfo to be at the end of the last block.  That might require creating a
        // new block if we don't have enough room.
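        // For example (hypothetical numbers): with a 512-byte block, a 520-byte seek table and a
        // 40-byte serialized LayerInfo + length, avail_in_block is 512 - (520 % 512) = 504, so we
        // skip 464 bytes and the LayerInfo ends the current block; had actual_len exceeded 504,
        // we would skip into a new block instead.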
        let avail_in_block = block_size - (seek_table_len % block_size);
        let to_skip = if avail_in_block < actual_len {
            block_size + avail_in_block - actual_len
        } else {
            avail_in_block - actual_len
        } as u64;
        self.writer.skip(to_skip).await?;
        self.writer.write_bytes(&self.buf[..actual_len]).await?;
        Ok(())
    }

    // Assumes the writer is positioned to a new block.
    // Returns the size of the bloom filter, in bytes.
    async fn write_bloom_filter(&mut self) -> Result<usize, Error> {
        if self.data_blocks() < MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER {
            return Ok(0);
        }
        // TODO(https://fxbug.dev/323571978): Avoid bounce-buffering.
        let size = round_up(self.bloom_filter.serialized_size(), self.block_size as usize).unwrap();
        self.buf.resize(size, 0);
        let mut cursor = std::io::Cursor::new(&mut self.buf);
        self.bloom_filter.write(&mut cursor)?;
        self.writer.write_bytes(&self.buf).await?;
        Ok(self.bloom_filter.serialized_size())
    }

    // Returns the bloom filter writer. Intended to be used for testing purposes, e.g., gain access
    // to the bloom filter to then corrupt it.
    #[cfg(test)]
    pub(crate) fn bloom_filter(&mut self) -> &mut BloomFilterWriter<K> {
        &mut self.bloom_filter
    }

    fn data_blocks(&self) -> usize {
        if self.item_count == 0 { 0 } else { self.block_keys.len() + 1 }
    }
}

impl<W: WriteBytes + Send, K: Key, V: LayerValue> LayerWriter<K, V>
    for PersistentLayerWriter<W, K, V>
{
    async fn write(&mut self, item: ItemRef<'_, K, V>) -> Result<(), Error> {
        // Note the length before we write this item.
        let len = self.buf.len();
        item.key.serialize_into(&mut self.buf)?;
        item.value.serialize_into(&mut self.buf)?;
        self.buf.write_u64::<LittleEndian>(item.sequence)?;
        let mut added_offset = false;
        // Never record the first item. The offset is always the same.
        if self.buf_item_count > 0 {
            self.block_offsets.push(u16::try_from(len + PER_DATA_BLOCK_HEADER_SIZE).unwrap());
            added_offset = true;
        }

        // If writing the item took us over a block, flush the bytes in the buffer prior to this
        // item.
        if PER_DATA_BLOCK_HEADER_SIZE
            + self.buf.len()
            + (self.block_offsets.len() * PER_DATA_BLOCK_SEEK_ENTRY_SIZE)
            > self.block_size as usize - 1
        {
            if added_offset {
                // Drop the recently added offset from the list. The latest item will be the first
                // on the next block and have a known offset there.
                self.block_offsets.pop();
            }
            self.write_block(len).await?;

            // Note that this will not insert an entry for the first data block.
            self.block_keys.push(item.key.get_leading_u64());
        }

        self.bloom_filter.insert(&item.key);
        self.buf_item_count += 1;
        self.item_count += 1;
        Ok(())
    }

    async fn flush(&mut self) -> Result<(), Error> {
        self.write_block(self.buf.len()).await?;
        let data_blocks = self.data_blocks() as u64;
        let bloom_filter_len = self.write_bloom_filter().await?;
        let seek_table_len = self.write_seek_table().await?;
        self.write_info(data_blocks, bloom_filter_len, seek_table_len).await?;
        self.writer.complete().await
    }
}

impl<W: WriteBytes, K: Key, V: LayerValue> Drop for PersistentLayerWriter<W, K, V> {
    fn drop(&mut self) {
        if self.buf_item_count > 0 {
            warn!("Dropping unwritten items; did you forget to flush?");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{PersistentLayer, PersistentLayerWriter};
    use crate::filesystem::MAX_BLOCK_SIZE;
    use crate::lsm_tree::LayerIterator;
    use crate::lsm_tree::persistent_layer::MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER;
    use crate::lsm_tree::types::{
        DefaultOrdUpperBound, Existence, FuzzyHash, Item, ItemRef, Layer, LayerKey, LayerWriter,
        MergeType, SortByU64,
    };
    use crate::object_handle::WriteBytes;
    use crate::round::round_up;
    use crate::serialized_types::{
        LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
    };
    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
    use crate::testing::writer::Writer;
    use fprint::TypeFingerprint;
    use fxfs_macros::FuzzyHash;
    use std::fmt::Debug;
    use std::hash::Hash;
    use std::ops::{Bound, Range};
    use std::sync::Arc;

    impl<W: WriteBytes> Debug for PersistentLayerWriter<W, i32, i32> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
            f.debug_struct("PersistentLayerWriter")
                .field("block_size", &self.block_size)
                .field("item_count", &self.buf_item_count)
                .finish()
        }
    }

    #[fuchsia::test]
    async fn test_iterate_after_write() {
        const BLOCK_SIZE: u64 = 512;
        const ITEM_COUNT: i32 = 10000;

        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        {
            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
                Writer::new(&handle).await,
                ITEM_COUNT as usize * 4,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");
            for i in 0..ITEM_COUNT {
                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
            }
            writer.flush().await.expect("flush failed");
        }
        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
        let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
        for i in 0..ITEM_COUNT {
            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
            assert_eq!((key, value), (&i, &i));
            iterator.advance().await.expect("failed to advance");
        }
        assert!(iterator.get().is_none());
    }

    #[fuchsia::test]
    async fn test_seek_after_write() {
        const BLOCK_SIZE: u64 = 512;
        const ITEM_COUNT: i32 = 5000;

        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        {
            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
                Writer::new(&handle).await,
                ITEM_COUNT as usize * 18,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");
            for i in 0..ITEM_COUNT {
                // Populate every other value as an item.
                writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
            }
            writer.flush().await.expect("flush failed");
        }
        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
        // Search for all values to check the in-between values.
        for i in 0..ITEM_COUNT * 2 {
            // We've written every other value, we expect to get either the exact value searched
            // for, or the next one after it. So round up to the nearest multiple of 2.
            let expected = round_up(i, 2).unwrap();
            let mut iterator = layer.seek(Bound::Included(&i)).await.expect("failed to seek");
            // We've written values up to (N-1)*2=2*N-2, so when looking for 2*N-1 we'll go off the
            // end of the layer and get back no item.
            if i >= (ITEM_COUNT * 2) - 1 {
                assert!(iterator.get().is_none());
            } else {
                let ItemRef { key, value, .. } = iterator.get().expect("missing item");
                assert_eq!((key, value), (&expected, &expected));
            }

            // Check that we can advance to the next item.
            iterator.advance().await.expect("failed to advance");
            // The highest value is 2*N-2, searching for 2*N-3 will find the last value, and
            // advancing will go off the end of the layer and return no item. If there was
            // previously no item, then it will latch and always return no item.
            if i >= (ITEM_COUNT * 2) - 3 {
                assert!(iterator.get().is_none());
            } else {
                let ItemRef { key, value, .. } = iterator.get().expect("missing item");
                let next = expected + 2;
                assert_eq!((key, value), (&next, &next));
            }
        }
    }

    #[fuchsia::test]
    async fn test_seek_unbounded() {
        const BLOCK_SIZE: u64 = 512;
        const ITEM_COUNT: i32 = 1000;

        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        {
            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
                Writer::new(&handle).await,
                ITEM_COUNT as usize * 18,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");
            for i in 0..ITEM_COUNT {
                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
            }
            writer.flush().await.expect("flush failed");
        }
        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
        let mut iterator = layer.seek(Bound::Unbounded).await.expect("failed to seek");
        let ItemRef { key, value, .. } = iterator.get().expect("missing item");
        assert_eq!((key, value), (&0, &0));

        // Check that we can advance to the next item.
        iterator.advance().await.expect("failed to advance");
        let ItemRef { key, value, .. } = iterator.get().expect("missing item");
        assert_eq!((key, value), (&1, &1));
    }

    #[fuchsia::test]
    async fn test_zero_items() {
        const BLOCK_SIZE: u64 = 512;

        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        {
            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
                Writer::new(&handle).await,
                0,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");
            writer.flush().await.expect("flush failed");
        }

        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
        let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
            .seek(Bound::Unbounded)
            .await
            .expect("seek failed");
        assert!(iterator.get().is_none())
    }

    #[fuchsia::test]
    async fn test_one_item() {
        const BLOCK_SIZE: u64 = 512;

        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        {
            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
                Writer::new(&handle).await,
                1,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");
            writer.write(Item::new(42, 42).as_item_ref()).await.expect("write failed");
            writer.flush().await.expect("flush failed");
        }

        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
        {
            let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
                .seek(Bound::Unbounded)
                .await
                .expect("seek failed");
            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
            assert_eq!((key, value), (&42, &42));
            iterator.advance().await.expect("failed to advance");
            assert!(iterator.get().is_none())
        }
        {
            let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
                .seek(Bound::Included(&30))
                .await
                .expect("seek failed");
            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
            assert_eq!((key, value), (&42, &42));
            iterator.advance().await.expect("failed to advance");
            assert!(iterator.get().is_none())
        }
        {
            let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
                .seek(Bound::Included(&42))
                .await
                .expect("seek failed");
            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
            assert_eq!((key, value), (&42, &42));
            iterator.advance().await.expect("failed to advance");
            assert!(iterator.get().is_none())
        }
        {
            let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
                .seek(Bound::Included(&43))
                .await
                .expect("seek failed");
            assert!(iterator.get().is_none())
        }
    }

    #[fuchsia::test]
    async fn test_large_block_size() {
        // At the upper end of the supported size.
        const BLOCK_SIZE: u64 = MAX_BLOCK_SIZE;
        // Items will be 18 bytes each, so fill up a few blocks.
        const ITEM_COUNT: i32 = ((BLOCK_SIZE as i32) / 18) * 3;

        let handle =
            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
        {
            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
                Writer::new(&handle).await,
                ITEM_COUNT as usize * 18,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");
            // Use large values to force varint encoding to use consistent space.
            for i in 2000000000..(2000000000 + ITEM_COUNT) {
                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
            }
            writer.flush().await.expect("flush failed");
        }

        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
        let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
        for i in 2000000000..(2000000000 + ITEM_COUNT) {
            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
            assert_eq!((key, value), (&i, &i));
            iterator.advance().await.expect("failed to advance");
        }
        assert!(iterator.get().is_none());
    }

    #[fuchsia::test]
    async fn test_overlarge_block_size() {
        // Above the upper end of the supported size.
        const BLOCK_SIZE: u64 = MAX_BLOCK_SIZE * 2;

        let handle =
            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
        PersistentLayerWriter::<_, i32, i32>::new(Writer::new(&handle).await, 0, BLOCK_SIZE)
            .await
            .expect_err("Creating writer with overlarge block size.");
    }

    #[fuchsia::test]
    async fn test_seek_bound_excluded() {
        const BLOCK_SIZE: u64 = 512;
        const ITEM_COUNT: i32 = 10000;

        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        {
            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
                Writer::new(&handle).await,
                ITEM_COUNT as usize * 18,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");
            for i in 0..ITEM_COUNT {
                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
            }
            writer.flush().await.expect("flush failed");
        }
        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");

        for i in 9982..ITEM_COUNT {
            let mut iterator = layer.seek(Bound::Excluded(&i)).await.expect("failed to seek");
            let i_plus_one = i + 1;
            if i_plus_one < ITEM_COUNT {
                let ItemRef { key, value, .. } = iterator.get().expect("missing item");

                assert_eq!((key, value), (&i_plus_one, &i_plus_one));

                // Check that we can advance to the next item.
                iterator.advance().await.expect("failed to advance");
                let i_plus_two = i + 2;
                if i_plus_two < ITEM_COUNT {
                    let ItemRef { key, value, .. } = iterator.get().expect("missing item");
                    assert_eq!((key, value), (&i_plus_two, &i_plus_two));
                } else {
                    assert!(iterator.get().is_none());
                }
            } else {
                assert!(iterator.get().is_none());
            }
        }
    }
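
    // A sketch exercising the bloom filter path (a hypothetical test added for illustration,
    // assuming enough items are written to clear MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER): written
    // keys must never be reported as `Missing`, though the filter may only say `MaybeExists`.
    #[fuchsia::test]
    async fn test_key_exists_after_write() {
        const BLOCK_SIZE: u64 = 512;
        const ITEM_COUNT: i32 = 1000;

        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        {
            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
                Writer::new(&handle).await,
                ITEM_COUNT as usize,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");
            for i in 0..ITEM_COUNT {
                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
            }
            writer.flush().await.expect("flush failed");
        }
        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
        // With ~18-byte items and 512-byte blocks, this layer has well over 4 data blocks, so a
        // bloom filter should have been written and loaded.
        assert!(layer.has_bloom_filter());
        for i in 0..ITEM_COUNT {
            assert!(layer.maybe_contains_key(&i));
            assert!(matches!(
                layer.key_exists(&i).await.expect("key_exists failed"),
                Existence::Exists | Existence::MaybeExists
            ));
        }
    }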

    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(Range<u64>);
    versioned_type! { 1.. => TestKey }
    impl SortByU64 for TestKey {
        fn get_leading_u64(&self) -> u64 {
            self.0.start
        }
    }
    impl LayerKey for TestKey {
        fn merge_type(&self) -> crate::lsm_tree::types::MergeType {
            MergeType::OptimizedMerge
        }

        fn next_key(&self) -> Option<Self> {
            Some(TestKey(self.0.end..self.0.end + 1))
        }
    }
    impl Ord for TestKey {
        fn cmp(&self, other: &Self) -> std::cmp::Ordering {
            self.0.start.cmp(&other.0.start).then(self.0.end.cmp(&other.0.end))
        }
    }
    impl PartialOrd for TestKey {
        fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
            Some(self.cmp(other))
        }
    }
    impl DefaultOrdUpperBound for TestKey {}
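
    // Illustrative check (added for exposition): TestKey orders primarily by range start and
    // breaks ties by range end, which is the property the duplicate-leading-u64 seek tests
    // below rely on.
    #[fuchsia::test]
    fn test_key_ordering_example() {
        assert!(TestKey(1..2) < TestKey(1..3));
        assert!(TestKey(1..3) < TestKey(2..0));
        assert_eq!(TestKey(5..5), TestKey(5..5));
    }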

    // Create a large spread of data across several blocks to ensure that no part of the range is
    // lost by the partial search using the layer seek table.
    #[fuchsia::test]
    async fn test_block_seek_duplicate_keys() {
        // Use a small block size to keep the item counts manageable.
        const BLOCK_SIZE: u64 = 512;
        // Items will be 37 bytes each: three maximum-length varint u64s at 9 bytes each, plus a
        // straight-encoded sequence number for another 8 bytes, then 2 more for each entry in
        // the data block's offset table.
        const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;
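        // Worked check of the 37-byte figure above (added for exposition).
        const _: () = assert!(3 * 9 + 8 + 2 == 37);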

        let mut to_find = Vec::new();

        let handle =
            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
        {
            let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
                Writer::new(&handle).await,
                3 * BLOCK_SIZE as usize,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");

            // Make all values take up maximum space for varint encoding.
            let mut current_value = u32::MAX as u64 + 1;

            // First fill the front with items sharing the same leading u64, then record keys at
            // the start, middle and end of the range to look up later.
            {
                let items = ITEMS_TO_FILL_BLOCK * 3;
                for i in 0..items {
                    writer
                        .write(
                            Item::new(TestKey(current_value..current_value + i), current_value)
                                .as_item_ref(),
                        )
                        .await
                        .expect("write failed");
                }
                to_find.push(TestKey(current_value..current_value));
                to_find.push(TestKey(current_value..(current_value + (items / 2))));
                to_find.push(TestKey(current_value..current_value + (items - 1)));
                current_value += 1;
            }

            // Add some filler items, each with a different leading u64.
            {
                let items = ITEMS_TO_FILL_BLOCK * 3;
                for _ in 0..items {
                    writer
                        .write(
                            Item::new(TestKey(current_value..current_value), current_value)
                                .as_item_ref(),
                        )
                        .await
                        .expect("write failed");
                    current_value += 1;
                }
            }

            // Fill the middle with items sharing the same leading u64, then record keys at the
            // start, middle and end of the range.
            {
                let items = ITEMS_TO_FILL_BLOCK * 3;
                for i in 0..items {
                    writer
                        .write(
                            Item::new(TestKey(current_value..current_value + i), current_value)
                                .as_item_ref(),
                        )
                        .await
                        .expect("write failed");
                }
                to_find.push(TestKey(current_value..current_value));
                to_find.push(TestKey(current_value..(current_value + (items / 2))));
                to_find.push(TestKey(current_value..current_value + (items - 1)));
                current_value += 1;
            }

            // Add some filler items, each with a different leading u64.
            {
                let items = ITEMS_TO_FILL_BLOCK * 3;
                for _ in 0..items {
                    writer
                        .write(
                            Item::new(TestKey(current_value..current_value), current_value)
                                .as_item_ref(),
                        )
                        .await
                        .expect("write failed");
                    current_value += 1;
                }
            }

            // Fill the end with items sharing the same leading u64, then record keys at the
            // start, middle and end of the range.
            {
                let items = ITEMS_TO_FILL_BLOCK * 3;
                for i in 0..items {
                    writer
                        .write(
                            Item::new(TestKey(current_value..current_value + i), current_value)
                                .as_item_ref(),
                        )
                        .await
                        .expect("write failed");
                }
                to_find.push(TestKey(current_value..current_value));
                to_find.push(TestKey(current_value..(current_value + (items / 2))));
                to_find.push(TestKey(current_value..current_value + (items - 1)));
            }

            writer.flush().await.expect("flush failed");
        }

        let layer = PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
        for target in to_find {
            let iterator: Box<dyn LayerIterator<TestKey, u64>> =
                layer.seek(Bound::Included(&target)).await.expect("failed to seek");
            let ItemRef { key, .. } = iterator.get().expect("missing item");
            assert_eq!(&target, key);
        }
    }

    #[fuchsia::test]
    async fn test_two_seek_blocks() {
        // Use a small block size to keep the item counts manageable.
        const BLOCK_SIZE: u64 = 512;
        // Items will be 37 bytes each: three maximum-length varint u64s at 9 bytes each, plus a
        // straight-encoded sequence number for another 8 bytes, then 2 more for each entry in
        // the data block's offset table.
        const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;
        // Write enough items that the seek table can't fit into a single block. Seek entries are
        // 8 bytes, and the first data block gets no entry, so (BLOCK_SIZE / 8) + 1 data blocks
        // exactly fill one seek block; add one more data block to overflow into a second.
        const ITEM_COUNT: u64 = ITEMS_TO_FILL_BLOCK * ((BLOCK_SIZE / 8) + 2);
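        // Concretely (added for exposition): with BLOCK_SIZE = 512, ITEMS_TO_FILL_BLOCK is 13
        // and a seek block holds 64 entries, so this writes 13 * 66 = 858 items across 66 data
        // blocks.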

        let mut to_find = Vec::new();

        let handle =
            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
        {
            let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
                Writer::new(&handle).await,
                ITEM_COUNT as usize * 18,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");

            // Make all values take up maximum space for varint encoding.
            let initial_value = u32::MAX as u64 + 1;
            for i in 0..ITEM_COUNT {
                writer
                    .write(
                        Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
                            .as_item_ref(),
                    )
                    .await
                    .expect("write failed");
            }
            // Look up keys at the start, middle and end.
            to_find.push(TestKey(initial_value..initial_value));
            let middle = initial_value + ITEM_COUNT / 2;
            to_find.push(TestKey(middle..middle));
            let end = initial_value + ITEM_COUNT - 1;
            to_find.push(TestKey(end..end));

            writer.flush().await.expect("flush failed");
        }

        let layer = PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
        for target in to_find {
            let iterator: Box<dyn LayerIterator<TestKey, u64>> =
                layer.seek(Bound::Included(&target)).await.expect("failed to seek");
            let ItemRef { key, .. } = iterator.get().expect("missing item");
            assert_eq!(&target, key);
        }
    }

    // Verifies behaviour around exactly filling a seek block, to ensure the resulting file can
    // still be opened and parsed.
    #[fuchsia::test]
    async fn test_full_seek_block() {
        const BLOCK_SIZE: u64 = 512;

        // Items will be 37 bytes each: three maximum-length varint u64s at 9 bytes each, plus a
        // straight-encoded sequence number for another 8 bytes, then 2 more for each entry in
        // the data block's offset table.
        const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;

        // How many seek table entries fit in a single block.
        const SEEK_TABLE_ENTRIES: u64 = BLOCK_SIZE / 8;

        // Exactly filling a seek block takes one more data block than SEEK_TABLE_ENTRIES, since
        // the first data block has no entry. Start slightly below that and sweep a range of item
        // counts so that we cross the boundary we are interested in.
        const START_ENTRIES_COUNT: u64 = ITEMS_TO_FILL_BLOCK * SEEK_TABLE_ENTRIES;
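        // Concretely (added for exposition): with BLOCK_SIZE = 512 this sweeps 832..858 items,
        // i.e. 64 to 66 data blocks, crossing the exactly-full seek block at 65 data blocks.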

        for entries in START_ENTRIES_COUNT..START_ENTRIES_COUNT + (ITEMS_TO_FILL_BLOCK * 2) {
            let handle = FakeObjectHandle::new_with_block_size(
                Arc::new(FakeObject::new()),
                BLOCK_SIZE as usize,
            );
            {
                let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
                    Writer::new(&handle).await,
                    entries as usize,
                    BLOCK_SIZE,
                )
                .await
                .expect("writer new");

                // Make all values take up maximum space for varint encoding.
                let initial_value = u32::MAX as u64 + 1;
                for i in 0..entries {
                    writer
                        .write(
                            Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
                                .as_item_ref(),
                        )
                        .await
                        .expect("write failed");
                }

                writer.flush().await.expect("flush failed");
            }
            PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
        }
    }

    #[fuchsia::test]
    async fn test_ignore_bloom_filter_on_older_versions() {
        const BLOCK_SIZE: u64 = 512;
        // Items will be 37 bytes each: three maximum-length varint u64s at 9 bytes each, plus a
        // straight-encoded sequence number for another 8 bytes, then 2 more for each entry in
        // the data block's offset table.
        const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;
        // Add enough items to create enough blocks for a bloom filter to be necessary.
        const ITEM_COUNT: u64 =
            (1 + MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER as u64) * ITEMS_TO_FILL_BLOCK;

        let old_version_handle =
            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
        let current_version_handle =
            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
        {
            let mut old_version_writer =
                PersistentLayerWriter::<_, TestKey, u64>::new_with_version(
                    Writer::new(&old_version_handle).await,
                    ITEM_COUNT as usize,
                    BLOCK_SIZE,
                    Version { major: LATEST_VERSION.major - 1, minor: 0 },
                )
                .await
                .expect("writer new");
            let mut current_version_writer = PersistentLayerWriter::<_, TestKey, u64>::new(
                Writer::new(&current_version_handle).await,
                ITEM_COUNT as usize,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");

            // Make all values take up maximum space for varint encoding.
            let initial_value = u32::MAX as u64 + 1;
            for i in 0..ITEM_COUNT {
                old_version_writer
                    .write(
                        Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
                            .as_item_ref(),
                    )
                    .await
                    .expect("write failed");
                current_version_writer
                    .write(
                        Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
                            .as_item_ref(),
                    )
                    .await
                    .expect("write failed");
            }

            old_version_writer.flush().await.expect("flush failed");
            current_version_writer.flush().await.expect("flush failed");
        }

        let old_layer =
            PersistentLayer::<TestKey, u64>::open(old_version_handle).await.expect("open failed");
        let current_layer = PersistentLayer::<TestKey, u64>::open(current_version_handle)
            .await
            .expect("open failed");
        assert!(!old_layer.has_bloom_filter());
        assert!(current_layer.has_bloom_filter());
    }

    #[fuchsia::test]
    async fn test_key_exists_no_bloom_filter() {
        const BLOCK_SIZE: u64 = 512;
        // Not enough items to trigger a bloom filter.
        const ITEM_COUNT: i32 = 100;

        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        {
            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
                Writer::new(&handle).await,
                ITEM_COUNT as usize,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");
            for i in 0..ITEM_COUNT {
                writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
            }
            writer.flush().await.expect("flush failed");
        }
        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
        assert!(!layer.has_bloom_filter());

        for i in 0..ITEM_COUNT {
            assert_eq!(
                layer.key_exists(&(i * 2)).await.expect("key_exists failed"),
                Existence::Exists
            );
            assert_eq!(
                layer.key_exists(&(i * 2 + 1)).await.expect("key_exists failed"),
                Existence::Missing
            );
        }
    }

    #[fuchsia::test]
    async fn test_key_exists_with_bloom_filter() {
        const BLOCK_SIZE: u64 = 512;
        // Enough items to trigger a bloom filter.
        const ITEM_COUNT: i32 = 10000;

        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
        {
            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
                Writer::new(&handle).await,
                ITEM_COUNT as usize,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");
            for i in 0..ITEM_COUNT {
                writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
            }
            writer.flush().await.expect("flush failed");
        }
        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
        assert!(layer.has_bloom_filter());

        for i in 0..ITEM_COUNT {
            // With a bloom filter, we expect MaybeExists for present keys.
            assert_eq!(
                layer.key_exists(&(i * 2)).await.expect("key_exists failed"),
                Existence::MaybeExists
            );
        }

        // For missing keys, we expect Missing, but might get MaybeExists due to false positives.
        // We can at least assert it's NOT Exists.
        let mut missing_count = 0;
        for i in 0..ITEM_COUNT {
            let result = layer.key_exists(&(i * 2 + 1)).await.expect("key_exists failed");
            assert_ne!(result, Existence::Exists);
            if result == Existence::Missing {
                missing_count += 1;
            }
        }
        // We expect mostly Missing; the bound is deliberately loose to tolerate false positives.
        assert!(missing_count > ITEM_COUNT / 2);
    }
}