fxfs/lsm_tree/persistent_layer.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// PersistentLayer object format
//
// The layer is made up of 1 or more "blocks" whose size is some multiple of the block size used
// by the underlying handle.
//
// The persistent layer has 4 types of blocks:
//  - Header block
//  - Data block
//  - BloomFilter block
//  - Seek block (+LayerInfo)
//
// The structure of the file is as follows:
//
// blk#     contents
// 0        [Header]
// 1        [Data]
// 2        [Data]
// ...      [Data]
// L        [BloomFilter]
// L + 1    [BloomFilter]
// ...      [BloomFilter]
// M        [Seek]
// M + 1    [Seek]
// ...      [Seek]
// N        [Seek/LayerInfo]
//
// Generally, there will be an order of magnitude more Data blocks than Seek/BloomFilter blocks.
//
// The Header block contains a Version-prefixed LayerHeader struct.  This version is used for
// everything in the layer file.
//
// Data blocks contain a little-endian u16 item count at the start, then a series of serialized
// items, and finally a list of little-endian u16 offsets within the block at which each serialized
// item starts, excluding the first item (since it is at a known offset).  The offset list occupies
// the tail of the block; because items are of variable length, there may be dead space between the
// two sections when the next item and its offset cannot both fit into the block.
//
// |item_count|item|item|item|item|item|item|dead space|offset|offset|offset|offset|offset|
//
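// As a worked example (the block and item sizes here are illustrative, not taken from any
// particular layer file): a 512-byte data block holding three items of 20, 30 and 25 serialized
// bytes would be laid out as
//
//   bytes 0..2      item count = 3 (little-endian u16)
//   bytes 2..22     item 0 (never listed in the offset table; it always starts right after the
//                   item count)
//   bytes 22..52    item 1
//   bytes 52..77    item 2
//   bytes 77..508   dead space (nominally zeroed)
//   bytes 508..510  offset of item 1 = 22 (little-endian u16)
//   bytes 510..512  offset of item 2 = 52 (little-endian u16)
//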
// BloomFilter blocks contain a bitmap which is used to probabilistically determine if a given key
// might exist in the layer file.  See `BloomFilter` for details on this structure.  Note that this
// can be absent from the file for small layer files.
//
// Seek/LayerInfo blocks contain both the seek table and a single LayerInfo struct at the tail of
// the last block, with the LayerInfo's length written as a little-endian u64 at the very end.  The
// padding between the two structs is ignored but nominally is zeroed.  They share blocks to avoid
// wasting padding bytes.  Note that the seek table can be absent from the file for small layer
// files (but there will always be one block for the LayerInfo).
//
// The seek table consists of a little-endian u64 for every data block except the first.  Each
// entry is the leading u64 (see `SortByU64::get_leading_u64`) of the first item's key in the
// corresponding block, so the entries are monotonically non-decreasing and duplicates may occur.
// There are exactly as many seek blocks as are needed to hold one entry fewer than the number of
// data blocks.
//
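// As a worked example (the sizes here are illustrative): with a 512-byte block size and 5 data
// blocks, the seek table is 4 entries * 8 bytes = 32 bytes and fits in a single Seek/LayerInfo
// block.  If the serialized LayerInfo happened to be 40 bytes, that block would contain the seek
// table at bytes 0..32, zero padding at bytes 32..464, the LayerInfo at bytes 464..504, and the
// LayerInfo length (40) as a little-endian u64 at bytes 504..512.
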
60use crate::drop_event::DropEvent;
61use crate::errors::FxfsError;
62use crate::filesystem::MAX_BLOCK_SIZE;
63use crate::log::*;
64use crate::lsm_tree::bloom_filter::{BloomFilterReader, BloomFilterStats, BloomFilterWriter};
65use crate::lsm_tree::types::{
66    BoxedLayerIterator, FuzzyHash, Item, ItemCount, ItemRef, Key, Layer, LayerIterator, LayerValue,
67    LayerWriter,
68};
69use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteBytes};
70use crate::object_store::caching_object_handle::{CachedChunk, CachingObjectHandle, CHUNK_SIZE};
71use crate::round::{round_down, round_up};
72use crate::serialized_types::{Version, Versioned, VersionedLatest, LATEST_VERSION};
73use anyhow::{anyhow, bail, ensure, Context, Error};
74use async_trait::async_trait;
75use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt};
76use fprint::TypeFingerprint;
77use fuchsia_sync::Mutex;
78use rand::Rng as _;
79use serde::{Deserialize, Serialize};
80use static_assertions::const_assert;
81use std::cmp::Ordering;
82use std::io::{Read, Write as _};
83use std::marker::PhantomData;
84use std::ops::Bound;
85use std::sync::Arc;
86
87const PERSISTENT_LAYER_MAGIC: &[u8; 8] = b"FxfsLayr";
88
89/// LayerHeader is stored in the first block of the persistent layer.
90pub type LayerHeader = LayerHeaderV39;
91
92#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
93pub struct LayerHeaderV39 {
94    /// 'FxfsLayr'
95    magic: [u8; 8],
96    /// The block size used within this layer file. This is typically set at compaction time to the
97    /// same block size as the underlying object handle.
98    ///
99    /// (Each block starts with a 2 byte item count so there is a 64k item limit per block,
100    /// regardless of block size).
101    block_size: u64,
102}
103
104/// The last block of each layer contains metadata for the rest of the layer.
105pub type LayerInfo = LayerInfoV39;
106
107#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
108pub struct LayerInfoV39 {
109    /// How many items are in the layer file.  Mainly used for sizing bloom filters during
110    /// compaction.
111    num_items: usize,
112    /// The number of data blocks in the layer file.
113    num_data_blocks: u64,
114    /// The size of the bloom filter in the layer file.  Not necessarily block-aligned.
115    bloom_filter_size_bytes: usize,
116    /// The seed for the nonces used in the bloom filter.
117    bloom_filter_seed: u64,
118    /// How many nonces to use for bloom filter hashing.
119    bloom_filter_num_nonces: usize,
120}
121
/// Prior to V39, an older layer file format was used, in which the LayerInfo was stored in the
/// first block.
123pub type OldLayerInfo = OldLayerInfoV32;
124
125#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
126pub struct OldLayerInfoV32 {
127    /// The version of the key and value structs serialized in this layer.
128    key_value_version: Version,
129    /// The block size used within this layer file. This is typically set at compaction time to the
130    /// same block size as the underlying object handle.
131    ///
132    /// (Each block starts with a 2 byte item count so there is a 64k item limit per block,
133    /// regardless of block size).
134    block_size: u64,
135}
136
137/// A handle to a persistent layer.
138pub struct PersistentLayer<K, V> {
139    // We retain a reference to the underlying object handle so we can hand out references to it for
140    // `Layer::handle` when clients need it.  Internal reads should go through
141    // `caching_object_handle` so they are cached.  Note that `CachingObjectHandle` used to
142    // implement `ReadObjectHandle`, but that was removed so that `CachingObjectHandle` could hand
143    // out data references rather than requiring copying to a buffer, which speeds up LSM tree
144    // operations.
145    object_handle: Arc<dyn ReadObjectHandle>,
146    caching_object_handle: CachingObjectHandle<Arc<dyn ReadObjectHandle>>,
147    version: Version,
148    block_size: u64,
149    data_size: u64,
150    seek_table: Vec<u64>,
151    num_items: Option<usize>,
152    bloom_filter: Option<BloomFilterReader<K>>,
153    bloom_filter_stats: Option<BloomFilterStats>,
154    close_event: Mutex<Option<Arc<DropEvent>>>,
155    _value_type: PhantomData<V>,
156}
157
158#[derive(Debug)]
159struct BufferCursor {
160    chunk: Option<CachedChunk>,
161    pos: usize,
162}
163
164impl std::io::Read for BufferCursor {
165    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
166        let chunk = if let Some(chunk) = &self.chunk {
167            chunk
168        } else {
169            return Ok(0);
170        };
171        let to_read = std::cmp::min(buf.len(), chunk.len().saturating_sub(self.pos));
172        if to_read > 0 {
173            buf[..to_read].copy_from_slice(&chunk[self.pos..self.pos + to_read]);
174            self.pos += to_read;
175        }
176        Ok(to_read)
177    }
178}
179
180const MIN_BLOCK_SIZE: u64 = 512;
181
182// For small layer files, don't bother with the bloom filter.  Arbitrarily chosen.
183const MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER: usize = 4;
184
185// How many blocks we reserve for the header.  Data blocks start at this offset.
186const NUM_HEADER_BLOCKS: u64 = 1;
187
188/// The smallest possible (empty) layer file is always 2 blocks, one for the header and one for
189/// LayerInfo.
190const MINIMUM_LAYER_FILE_BLOCKS: u64 = 2;
191
192// Put safety rails on the size of the bloom filter and seek table to avoid OOMing the system.
193// It's more likely that tampering has occurred in these cases.
194const MAX_BLOOM_FILTER_SIZE: usize = 64 * 1024 * 1024;
195const MAX_SEEK_TABLE_SIZE: usize = 64 * 1024 * 1024;
196
197// The following constants refer to sizes of metadata in the data blocks.
198const PER_DATA_BLOCK_HEADER_SIZE: usize = 2;
199const PER_DATA_BLOCK_SEEK_ENTRY_SIZE: usize = 2;
200
201// A key-only iterator, used while seeking through the tree.
202struct KeyOnlyIterator<'iter, K: Key, V: LayerValue> {
203    // Allocated out of |layer|.
204    buffer: BufferCursor,
205
206    layer: &'iter PersistentLayer<K, V>,
207
208    // The position of the _next_ block to be read.
209    pos: u64,
210
211    // The item index in the current block.
212    item_index: u16,
213
214    // The number of items in the current block.
215    item_count: u16,
216
217    // The current key.
218    key: Option<K>,
219
220    // Set by a wrapping iterator once the value has been deserialized, so the KeyOnlyIterator knows
221    // whether it is pointing at the next key or not.
222    value_deserialized: bool,
223}
224
225impl<K: Key, V: LayerValue> KeyOnlyIterator<'_, K, V> {
226    fn new<'iter>(layer: &'iter PersistentLayer<K, V>, pos: u64) -> KeyOnlyIterator<'iter, K, V> {
227        assert!(pos % layer.block_size == 0);
228        KeyOnlyIterator {
229            layer,
230            buffer: BufferCursor { chunk: None, pos: pos as usize % CHUNK_SIZE },
231            pos,
232            item_index: 0,
233            item_count: 0,
234            key: None,
235            value_deserialized: false,
236        }
237    }
238
239    // Repositions the iterator to point to the `index`'th item in the current block.
240    // Returns an error if the index is out of range or the resulting offset contains an obviously
241    // invalid value.
242    fn seek_to_block_item(&mut self, index: u16) -> Result<(), Error> {
243        ensure!(index < self.item_count, FxfsError::OutOfRange);
244        if index == self.item_index && self.value_deserialized {
245            // Fast-path when we are seeking in a linear manner, as is the case when advancing a
246            // wrapping iterator that also deserializes the values.
247            return Ok(());
248        }
249        let offset_in_block = if index == 0 {
250            // First entry isn't actually recorded, it is at the start of the block after the item
251            // count.
252            PER_DATA_BLOCK_HEADER_SIZE
253        } else {
254            let old_buffer_pos = self.buffer.pos;
255            self.buffer.pos = round_up(self.buffer.pos, self.layer.block_size as usize).unwrap()
256                - (PER_DATA_BLOCK_SEEK_ENTRY_SIZE * (usize::from(self.item_count - index)));
257            let res = self.buffer.read_u16::<LittleEndian>();
258            self.buffer.pos = old_buffer_pos;
259            let offset_in_block = res.context("Failed to read offset")? as usize;
260            if offset_in_block >= self.layer.block_size as usize
261                || offset_in_block <= PER_DATA_BLOCK_HEADER_SIZE
262            {
263                return Err(anyhow!(FxfsError::Inconsistent))
264                    .context(format!("Offset {} is out of valid range.", offset_in_block));
265            }
266            offset_in_block
267        };
268        self.item_index = index;
269        self.buffer.pos =
270            round_down(self.buffer.pos, self.layer.block_size as usize) + offset_in_block;
271        Ok(())
272    }
273
274    async fn advance(&mut self) -> Result<(), Error> {
275        if self.item_index >= self.item_count {
276            if self.pos >= self.layer.data_offset() + self.layer.data_size {
277                self.key = None;
278                return Ok(());
279            }
280            if self.buffer.chunk.is_none() || self.pos as usize % CHUNK_SIZE == 0 {
281                self.buffer.chunk = Some(
282                    self.layer
283                        .caching_object_handle
284                        .read(self.pos as usize)
285                        .await
286                        .context("Reading during advance")?,
287                );
288            }
289            self.buffer.pos = self.pos as usize % CHUNK_SIZE;
290            self.item_count = self.buffer.read_u16::<LittleEndian>()?;
291            if self.item_count == 0 {
292                bail!(
293                    "Read block with zero item count (object: {}, offset: {})",
294                    self.layer.object_handle.object_id(),
295                    self.pos
296                );
297            }
298            debug!(
299                pos = self.pos,
300                buf:? = self.buffer,
301                object_size = self.layer.data_offset() + self.layer.data_size,
302                oid = self.layer.object_handle.object_id();
303                ""
304            );
305            self.pos += self.layer.block_size;
306            self.item_index = 0;
307            self.value_deserialized = true;
308        }
309        self.seek_to_block_item(self.item_index)?;
310        self.key = Some(
311            K::deserialize_from_version(self.buffer.by_ref(), self.layer.version)
312                .context("Corrupt layer (key)")?,
313        );
314        self.item_index += 1;
315        self.value_deserialized = false;
316        Ok(())
317    }
318
319    fn get(&self) -> Option<&K> {
320        self.key.as_ref()
321    }
322}
323
324struct Iterator<'iter, K: Key, V: LayerValue> {
325    inner: KeyOnlyIterator<'iter, K, V>,
326    // The current item.
327    item: Option<Item<K, V>>,
328}
329
330impl<'iter, K: Key, V: LayerValue> Iterator<'iter, K, V> {
331    fn new(mut seek_iterator: KeyOnlyIterator<'iter, K, V>) -> Result<Self, Error> {
332        let key = std::mem::take(&mut seek_iterator.key);
333        let item = if let Some(key) = key {
334            seek_iterator.value_deserialized = true;
335            Some(Item {
336                key,
337                value: V::deserialize_from_version(
338                    seek_iterator.buffer.by_ref(),
339                    seek_iterator.layer.version,
340                )
341                .context("Corrupt layer (value)")?,
342                sequence: seek_iterator
343                    .buffer
344                    .read_u64::<LittleEndian>()
345                    .context("Corrupt layer (seq)")?,
346            })
347        } else {
348            None
349        };
350        Ok(Self { inner: seek_iterator, item })
351    }
352}
353
354#[async_trait]
355impl<'iter, K: Key, V: LayerValue> LayerIterator<K, V> for Iterator<'iter, K, V> {
356    async fn advance(&mut self) -> Result<(), Error> {
357        self.inner.advance().await?;
358        let key = std::mem::take(&mut self.inner.key);
359        self.item = if let Some(key) = key {
360            self.inner.value_deserialized = true;
361            Some(Item {
362                key,
363                value: V::deserialize_from_version(
364                    self.inner.buffer.by_ref(),
365                    self.inner.layer.version,
366                )
367                .context("Corrupt layer (value)")?,
368                sequence: self
369                    .inner
370                    .buffer
371                    .read_u64::<LittleEndian>()
372                    .context("Corrupt layer (seq)")?,
373            })
374        } else {
375            None
376        };
377        Ok(())
378    }
379
380    fn get(&self) -> Option<ItemRef<'_, K, V>> {
381        self.item.as_ref().map(<&Item<K, V>>::into)
382    }
383}
384
385// Returns the size of the seek table in bytes.
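// For example (illustrative numbers): a layer with 9 data blocks has 8 seek table entries of
// 8 bytes each, i.e. a 64-byte seek table.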
386fn seek_table_size(num_data_blocks: u64) -> usize {
387    // The first data block doesn't have an entry.
388    let seek_table_entries = num_data_blocks.saturating_sub(1) as usize;
389    if seek_table_entries == 0 {
390        return 0;
391    }
392    let entry_size = std::mem::size_of::<u64>();
393    seek_table_entries * entry_size
394}
395
396async fn load_seek_table(
397    object_handle: &(impl ReadObjectHandle + 'static),
398    seek_table_offset: u64,
399    num_data_blocks: u64,
400) -> Result<Vec<u64>, Error> {
401    let seek_table_size = seek_table_size(num_data_blocks);
402    if seek_table_size == 0 {
403        return Ok(vec![]);
404    }
405    if seek_table_size > MAX_SEEK_TABLE_SIZE {
406        return Err(anyhow!(FxfsError::NotSupported)).context("Seek table too large");
407    }
408    let mut buffer = object_handle.allocate_buffer(seek_table_size).await;
409    let bytes_read = object_handle
410        .read(seek_table_offset, buffer.as_mut())
411        .await
412        .context("Reading seek table blocks")?;
413    ensure!(bytes_read == seek_table_size, "Short read");
414
415    let mut seek_table = Vec::with_capacity(num_data_blocks as usize);
    // There is no entry for the first data block; assume a lower bound of 0.
417    seek_table.push(0);
418    let mut prev = 0;
419    for chunk in buffer.as_slice().chunks_exact(std::mem::size_of::<u64>()) {
420        let next = LittleEndian::read_u64(chunk);
        // Entries should be non-decreasing (duplicates are allowed); otherwise something's broken,
        // or we've gone off the end and we're reading zeroes.
423        if prev > next {
424            return Err(anyhow!(FxfsError::Inconsistent))
425                .context(format!("Seek table entry out of order, {:?} > {:?}", prev, next));
426        }
427        prev = next;
428        seek_table.push(next);
429    }
430    Ok(seek_table)
431}
432
433async fn load_bloom_filter<K: FuzzyHash>(
434    handle: &(impl ReadObjectHandle + 'static),
435    bloom_filter_offset: u64,
436    layer_info: &LayerInfo,
437) -> Result<Option<BloomFilterReader<K>>, Error> {
438    if layer_info.bloom_filter_size_bytes == 0 {
439        return Ok(None);
440    }
441    if layer_info.bloom_filter_size_bytes > MAX_BLOOM_FILTER_SIZE {
442        return Err(anyhow!(FxfsError::NotSupported)).context("Bloom filter too large");
443    }
444    let mut buffer = handle.allocate_buffer(layer_info.bloom_filter_size_bytes).await;
445    handle.read(bloom_filter_offset, buffer.as_mut()).await.context("Failed to read")?;
446    Ok(Some(BloomFilterReader::read(
447        buffer.as_slice(),
448        layer_info.bloom_filter_seed,
449        layer_info.bloom_filter_num_nonces,
450    )?))
451}
452
453impl<K: Key, V: LayerValue> PersistentLayer<K, V> {
454    pub async fn open(handle: impl ReadObjectHandle + 'static) -> Result<Arc<Self>, Error> {
455        let bs = handle.block_size();
456        let mut buffer = handle.allocate_buffer(bs as usize).await;
457        handle.read(0, buffer.as_mut()).await.context("Failed to read first block")?;
458        let mut cursor = std::io::Cursor::new(buffer.as_slice());
459        let version = Version::deserialize_from(&mut cursor)?;
460
461        ensure!(version <= LATEST_VERSION, FxfsError::InvalidVersion);
462        let header = LayerHeader::deserialize_from_version(&mut cursor, version)
463            .context("Failed to deserialize header")?;
464        if &header.magic != PERSISTENT_LAYER_MAGIC {
465            return Err(anyhow!(FxfsError::Inconsistent).context("Invalid layer file magic"));
466        }
467        if header.block_size == 0 || !header.block_size.is_power_of_two() {
468            return Err(anyhow!(FxfsError::Inconsistent))
469                .context(format!("Invalid block size {}", header.block_size));
470        }
471        ensure!(header.block_size > 0, FxfsError::Inconsistent);
472        ensure!(header.block_size <= MAX_BLOCK_SIZE, FxfsError::NotSupported);
473        let physical_block_size = handle.block_size();
474        if header.block_size % physical_block_size != 0 {
475            return Err(anyhow!(FxfsError::Inconsistent)).context(format!(
476                "{} not a multiple of physical block size {}",
477                header.block_size, physical_block_size
478            ));
479        }
480        std::mem::drop(cursor);
481
482        let bs = header.block_size as usize;
483        if handle.get_size() < MINIMUM_LAYER_FILE_BLOCKS * bs as u64 {
484            return Err(anyhow!(FxfsError::Inconsistent).context("Layer file too short"));
485        }
486
487        let layer_info = {
488            let last_block_offset = handle
489                .get_size()
490                .checked_sub(header.block_size)
491                .ok_or(FxfsError::Inconsistent)
492                .context("Layer file unexpectedly short")?;
493            handle
494                .read(last_block_offset, buffer.subslice_mut(0..header.block_size as usize))
495                .await
496                .context("Failed to read layer info")?;
497            let layer_info_len =
498                LittleEndian::read_u64(&buffer.as_slice()[bs - std::mem::size_of::<u64>()..]);
499            let layer_info_offset = bs
500                .checked_sub(std::mem::size_of::<u64>() + layer_info_len as usize)
501                .ok_or(FxfsError::Inconsistent)
502                .context("Invalid layer info length")?;
503            let mut cursor = std::io::Cursor::new(&buffer.as_slice()[layer_info_offset..]);
504            LayerInfo::deserialize_from_version(&mut cursor, version)
505                .context("Failed to deserialize LayerInfo")?
506        };
507        std::mem::drop(buffer);
508        if layer_info.num_items == 0 && layer_info.num_data_blocks > 0 {
509            return Err(anyhow!(FxfsError::Inconsistent))
510                .context("Invalid num_items/num_data_blocks");
511        }
512        let total_blocks = handle.get_size() / header.block_size;
513        let bloom_filter_blocks =
514            round_up(layer_info.bloom_filter_size_bytes as u64, header.block_size)
515                .unwrap_or(layer_info.bloom_filter_size_bytes as u64)
516                / header.block_size;
517        if layer_info.num_data_blocks + bloom_filter_blocks
518            > total_blocks - MINIMUM_LAYER_FILE_BLOCKS
519        {
520            return Err(anyhow!(FxfsError::Inconsistent)).context("Invalid number of blocks");
521        }
522
523        let bloom_filter_offset =
524            header.block_size * (NUM_HEADER_BLOCKS + layer_info.num_data_blocks);
525        let bloom_filter = load_bloom_filter(&handle, bloom_filter_offset, &layer_info)
526            .await
527            .context("Failed to load bloom filter")?;
528        let bloom_filter_stats = bloom_filter.as_ref().map(|b| b.stats());
529
530        let seek_offset = header.block_size
531            * (NUM_HEADER_BLOCKS + layer_info.num_data_blocks + bloom_filter_blocks);
532        let seek_table = load_seek_table(&handle, seek_offset, layer_info.num_data_blocks)
533            .await
534            .context("Failed to load seek table")?;
535
536        let object_handle = Arc::new(handle) as Arc<dyn ReadObjectHandle>;
537        let caching_object_handle = CachingObjectHandle::new(object_handle.clone());
538        Ok(Arc::new(PersistentLayer {
539            object_handle,
540            caching_object_handle,
541            version,
542            block_size: header.block_size,
543            data_size: layer_info.num_data_blocks * header.block_size,
544            seek_table,
545            num_items: Some(layer_info.num_items),
546            bloom_filter,
547            bloom_filter_stats,
548            close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
549            _value_type: PhantomData::default(),
550        }))
551    }
552
553    fn data_offset(&self) -> u64 {
554        NUM_HEADER_BLOCKS * self.block_size
555    }
556}
557
558#[async_trait]
559impl<K: Key, V: LayerValue> Layer<K, V> for PersistentLayer<K, V> {
560    fn handle(&self) -> Option<&dyn ReadObjectHandle> {
561        Some(&self.object_handle)
562    }
563
564    fn purge_cached_data(&self) {
565        self.caching_object_handle.purge();
566    }
567
568    async fn seek<'a>(&'a self, bound: Bound<&K>) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
569        let (key, excluded) = match bound {
570            Bound::Unbounded => {
571                let mut iterator = Iterator::new(KeyOnlyIterator::new(self, self.data_offset()))?;
572                iterator.advance().await.context("Unbounded seek advance")?;
573                return Ok(Box::new(iterator));
574            }
575            Bound::Included(k) => (k, false),
576            Bound::Excluded(k) => (k, true),
577        };
578        let first_data_block_index = self.data_offset() / self.block_size;
579
580        let (mut left_offset, mut right_offset) = {
581            // We are searching for a range here, as multiple items can have the same value in
582            // this approximate search. Since the values used are the smallest in the associated
583            // block it means that if the value equals the target you should also search the
584            // one before it. The goal is for table[left] < target < table[right].
585            let target = key.get_leading_u64();
586            // Because the first entry in the table is always 0, right_index will never be 0.
587            let right_index = self.seek_table.as_slice().partition_point(|&x| x <= target) as u64;
588            // Since partition_point will find the index of the first place where the predicate
589            // is false, we subtract 1 to get the index where it was last true.
590            let left_index = self.seek_table.as_slice()[..right_index as usize]
591                .partition_point(|&x| x < target)
592                .saturating_sub(1) as u64;
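            // As a worked example (the values are illustrative): with a seek table of [0, 5, 5, 9]
            // and a target of 5, `partition_point(|&x| x <= 5)` yields right_index = 3, and
            // `partition_point(|&x| x < 5)` over [0, 5, 5] yields 1, so left_index = 0.  The
            // search below is therefore confined to blocks 0..=2, the only blocks which can
            // contain keys whose leading u64 equals the target.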
593
594            (
595                (left_index + first_data_block_index) * self.block_size,
596                (right_index + first_data_block_index) * self.block_size,
597            )
598        };
599        let mut left = KeyOnlyIterator::new(self, left_offset);
600        left.advance().await.context("Initial seek advance")?;
601        match left.get() {
602            None => return Ok(Box::new(Iterator::new(left)?)),
603            Some(left_key) => match left_key.cmp_upper_bound(key) {
604                Ordering::Greater => return Ok(Box::new(Iterator::new(left)?)),
605                Ordering::Equal => {
606                    if excluded {
607                        left.advance().await?;
608                    }
609                    return Ok(Box::new(Iterator::new(left)?));
610                }
611                Ordering::Less => {}
612            },
613        }
614        let mut right = None;
615        while right_offset - left_offset > self.block_size {
616            // Pick a block midway.
617            let mid_offset =
618                round_down(left_offset + (right_offset - left_offset) / 2, self.block_size);
619            let mut iterator = KeyOnlyIterator::new(self, mid_offset);
620            iterator.advance().await?;
621            let iter_key: &K = iterator.get().unwrap();
622            match iter_key.cmp_upper_bound(key) {
623                Ordering::Greater => {
624                    right_offset = mid_offset;
625                    right = Some(iterator);
626                }
627                Ordering::Equal => {
628                    if excluded {
629                        iterator.advance().await?;
630                    }
631                    return Ok(Box::new(Iterator::new(iterator)?));
632                }
633                Ordering::Less => {
634                    left_offset = mid_offset;
635                    left = iterator;
636                }
637            }
638        }
639
640        // Finish the binary search on the block pointed to by `left`.
641        let mut left_index = 0;
642        let mut right_index = left.item_count;
643        // If the size is zero then we don't touch the iterator.
644        while left_index < (right_index - 1) {
645            let mid_index = left_index + ((right_index - left_index) / 2);
646            left.seek_to_block_item(mid_index).context("Read index offset for binary search")?;
647            left.advance().await?;
648            match left.get().unwrap().cmp_upper_bound(key) {
649                Ordering::Greater => {
650                    right_index = mid_index;
651                }
652                Ordering::Equal => {
653                    if excluded {
654                        left.advance().await?;
655                    }
656                    return Ok(Box::new(Iterator::new(left)?));
657                }
658                Ordering::Less => {
659                    left_index = mid_index;
660                }
661            }
662        }
        // When we don't find an exact match, we need to return the first entry *after* the target
        // key.  That might be the first entry in the next block, which is already pointed to by the
        // "right" iterator, but usually it's just the entry at the right index within the "left"
        // iterator's block.
667        if right_index < left.item_count {
668            left.seek_to_block_item(right_index)
669                .context("Read index for offset of right pointer")?;
670        } else if let Some(right) = right {
671            return Ok(Box::new(Iterator::new(right)?));
672        } else {
673            // We want the end of the layer.  `right_index == left.item_count`, so `left_index ==
674            // left.item_count - 1`, and the left iterator must be positioned on `left_index` since
675            // we cannot have gone through the `Ordering::Greater` path above because `right_index`
676            // would not be equal to `left.item_count` in that case, so all we need to do is advance
677            // the iterator.
678        }
679        left.advance().await?;
680        return Ok(Box::new(Iterator::new(left)?));
681    }
682
683    fn estimated_len(&self) -> ItemCount {
684        match self.num_items {
685            Some(num_items) => ItemCount::Precise(num_items),
686            None => {
687                // Older layer files didn't have the exact number, so guess it.  Precision doesn't
688                // matter too much here, as the impact of getting this wrong is mis-sizing the bloom
                // filter, which will be corrected during later compactions anyway.
690                const ITEM_SIZE_ESTIMATE: usize = 32;
691                ItemCount::Estimate(self.object_handle.get_size() as usize / ITEM_SIZE_ESTIMATE)
692            }
693        }
694    }
695
696    fn maybe_contains_key(&self, key: &K) -> bool {
697        self.bloom_filter.as_ref().map_or(true, |f| f.maybe_contains(key))
698    }
699
700    fn lock(&self) -> Option<Arc<DropEvent>> {
701        self.close_event.lock().clone()
702    }
703
704    async fn close(&self) {
705        let listener = self.close_event.lock().take().expect("close already called").listen();
706        listener.await;
707    }
708
709    fn get_version(&self) -> Version {
710        return self.version;
711    }
712
713    fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
714        node.record_bool("persistent", true);
715        node.record_uint("size", self.object_handle.get_size());
716        if let Some(stats) = self.bloom_filter_stats.as_ref() {
717            node.record_child("bloom_filter", move |node| {
718                node.record_uint("size", stats.size as u64);
719                node.record_uint("num_nonces", stats.num_nonces as u64);
720                node.record_uint("fill_percentage", stats.fill_percentage as u64);
721            });
722        }
723        if let Some(items) = self.num_items {
724            node.record_uint("num_items", items as u64);
725        }
726    }
727}
728
729// This ensures that item_count can't be overflowed below.
730const_assert!(MAX_BLOCK_SIZE <= u16::MAX as u64 + 1);
731
732// -- Writer support --
733
734pub struct PersistentLayerWriter<W: WriteBytes, K: Key, V: LayerValue> {
735    writer: W,
736    block_size: u64,
737    buf: Vec<u8>,
738    buf_item_count: u16,
739    item_count: usize,
740    block_offsets: Vec<u16>,
741    block_keys: Vec<u64>,
742    bloom_filter: BloomFilterWriter<K>,
743    _value: PhantomData<V>,
744}
745
746impl<W: WriteBytes, K: Key, V: LayerValue> PersistentLayerWriter<W, K, V> {
747    /// Creates a new writer that will serialize items to the object accessible via |object_handle|
748    /// (which provides a write interface to the object).
749    /// `estimated_num_items` is an estimate for the number of items that we expect to write into
750    /// the layer.  This need not be exact; it is used for estimating the size of bloom filter to
751    /// create.  In general this is likely to be an over-count, as during compaction items might be
752    /// merged.  That results in bloom filters that are slightly bigger than necessary, which isn't
753    /// a significant problem.
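    ///
    /// A minimal usage sketch (mirroring the tests at the bottom of this file, which use the
    /// `Writer` test helper; error handling is elided):
    ///
    /// ```ignore
    /// let mut writer =
    ///     PersistentLayerWriter::<_, i32, i32>::new(Writer::new(&handle).await, 100, 512).await?;
    /// for i in 0..100 {
    ///     // Items must be written in key order.
    ///     writer.write(Item::new(i, i).as_item_ref()).await?;
    /// }
    /// writer.flush().await?;
    /// ```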
754    pub async fn new(
755        mut writer: W,
756        estimated_num_items: usize,
757        block_size: u64,
758    ) -> Result<Self, Error> {
759        ensure!(block_size <= MAX_BLOCK_SIZE, FxfsError::NotSupported);
760        ensure!(block_size >= MIN_BLOCK_SIZE, FxfsError::NotSupported);
761
762        // Write the header block.
763        let header = LayerHeader { magic: PERSISTENT_LAYER_MAGIC.clone(), block_size };
764        let mut buf = vec![0u8; block_size as usize];
765        {
766            let mut cursor = std::io::Cursor::new(&mut buf[..]);
767            header.serialize_with_version(&mut cursor)?;
768        }
769        writer.write_bytes(&buf[..]).await?;
770
771        let nonce: u64 = rand::thread_rng().gen();
772        Ok(PersistentLayerWriter {
773            writer,
774            block_size,
775            buf: Vec::new(),
776            buf_item_count: 0,
777            item_count: 0,
778            block_offsets: Vec::new(),
779            block_keys: Vec::new(),
780            bloom_filter: BloomFilterWriter::new(nonce, estimated_num_items),
781            _value: PhantomData,
782        })
783    }
784
785    /// Writes 'buf[..len]' out as a block.
786    ///
787    /// Blocks are fixed size, consisting of a 16-bit item count, data, zero padding
788    /// and seek table at the end.
789    async fn write_block(&mut self, len: usize) -> Result<(), Error> {
790        if self.buf_item_count == 0 {
791            return Ok(());
792        }
793        let seek_table_size = self.block_offsets.len() * PER_DATA_BLOCK_SEEK_ENTRY_SIZE;
794        assert!(PER_DATA_BLOCK_HEADER_SIZE + seek_table_size + len <= self.block_size as usize);
795        let mut cursor = std::io::Cursor::new(vec![0u8; self.block_size as usize]);
796        cursor.write_u16::<LittleEndian>(self.buf_item_count)?;
797        cursor.write_all(self.buf.drain(..len).as_ref())?;
798        cursor.set_position(self.block_size - seek_table_size as u64);
        // Write the block's seek table of item offsets.  Entries are 2 bytes each and items are
        // always at least 10 bytes.
800        for &offset in &self.block_offsets {
801            cursor.write_u16::<LittleEndian>(offset)?;
802        }
803        self.writer.write_bytes(cursor.get_ref()).await?;
804        debug!(item_count = self.buf_item_count, byte_count = len; "wrote items");
805        self.buf_item_count = 0;
806        self.block_offsets.clear();
807        Ok(())
808    }
809
810    // Assumes the writer is positioned to a new block.
811    // Returns the size, in bytes, of the seek table.
812    // Note that the writer will be positioned to exactly the end of the seek table, not to the end
813    // of a block.
814    async fn write_seek_table(&mut self) -> Result<usize, Error> {
815        if self.block_keys.len() == 0 {
816            return Ok(0);
817        }
818        let size = self.block_keys.len() * std::mem::size_of::<u64>();
819        self.buf.resize(size, 0);
820        let mut len = 0;
821        for key in &self.block_keys {
822            LittleEndian::write_u64(&mut self.buf[len..len + std::mem::size_of::<u64>()], *key);
823            len += std::mem::size_of::<u64>();
824        }
825        self.writer.write_bytes(&self.buf).await?;
826        Ok(size)
827    }
828
829    // Assumes the writer is positioned to exactly the end of the seek table, which was
830    // `seek_table_len` bytes.
831    async fn write_info(
832        &mut self,
833        num_data_blocks: u64,
834        bloom_filter_size_bytes: usize,
835        seek_table_len: usize,
836    ) -> Result<(), Error> {
837        let block_size = self.writer.block_size() as usize;
838        let layer_info = LayerInfo {
839            num_items: self.item_count,
840            num_data_blocks,
841            bloom_filter_size_bytes,
842            bloom_filter_seed: self.bloom_filter.seed(),
843            bloom_filter_num_nonces: self.bloom_filter.num_nonces(),
844        };
845        let actual_len = {
846            let mut cursor = std::io::Cursor::new(&mut self.buf);
847            layer_info.serialize_into(&mut cursor)?;
848            let layer_info_len = cursor.position();
849            cursor.write_u64::<LittleEndian>(layer_info_len)?;
850            cursor.position() as usize
851        };
852
853        // We want the LayerInfo to be at the end of the last block.  That might require creating a
854        // new block if we don't have enough room.
855        let avail_in_block = block_size - (seek_table_len % block_size);
856        let to_skip = if avail_in_block < actual_len {
857            block_size + avail_in_block - actual_len
858        } else {
859            avail_in_block - actual_len
860        } as u64;
861        self.writer.skip(to_skip).await?;
862        self.writer.write_bytes(&self.buf[..actual_len]).await?;
863        Ok(())
864    }
865
866    // Assumes the writer is positioned to a new block.
867    // Returns the size of the bloom filter, in bytes.
868    async fn write_bloom_filter(&mut self) -> Result<usize, Error> {
869        if self.data_blocks() < MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER {
870            return Ok(0);
871        }
872        // TODO(https://fxbug.dev/323571978): Avoid bounce-buffering.
873        let size = round_up(self.bloom_filter.serialized_size(), self.block_size as usize).unwrap();
874        self.buf.resize(size, 0);
875        let mut cursor = std::io::Cursor::new(&mut self.buf);
876        self.bloom_filter.write(&mut cursor)?;
877        self.writer.write_bytes(&self.buf).await?;
878        Ok(self.bloom_filter.serialized_size())
879    }
880
881    fn data_blocks(&self) -> usize {
882        if self.item_count == 0 {
883            0
884        } else {
885            self.block_keys.len() + 1
886        }
887    }
888}
889
890impl<W: WriteBytes + Send, K: Key, V: LayerValue> LayerWriter<K, V>
891    for PersistentLayerWriter<W, K, V>
892{
893    async fn write(&mut self, item: ItemRef<'_, K, V>) -> Result<(), Error> {
894        // Note the length before we write this item.
895        let len = self.buf.len();
896        item.key.serialize_into(&mut self.buf)?;
897        item.value.serialize_into(&mut self.buf)?;
898        self.buf.write_u64::<LittleEndian>(item.sequence)?;
899        let mut added_offset = false;
        // Never record an offset for the first item; it is always at the same known offset.
901        if self.buf_item_count > 0 {
902            self.block_offsets.push(u16::try_from(len + PER_DATA_BLOCK_HEADER_SIZE).unwrap());
903            added_offset = true;
904        }
905
906        // If writing the item took us over a block, flush the bytes in the buffer prior to this
907        // item.
908        if PER_DATA_BLOCK_HEADER_SIZE
909            + self.buf.len()
910            + (self.block_offsets.len() * PER_DATA_BLOCK_SEEK_ENTRY_SIZE)
911            > self.block_size as usize - 1
912        {
913            if added_offset {
914                // Drop the recently added offset from the list. The latest item will be the first
915                // on the next block and have a known offset there.
916                self.block_offsets.pop();
917            }
918            self.write_block(len).await?;
919
920            // Note that this will not insert an entry for the first data block.
921            self.block_keys.push(item.key.get_leading_u64());
922        }
923
924        self.bloom_filter.insert(&item.key);
925        self.buf_item_count += 1;
926        self.item_count += 1;
927        Ok(())
928    }
929
930    async fn flush(&mut self) -> Result<(), Error> {
931        self.write_block(self.buf.len()).await?;
932        let data_blocks = self.data_blocks() as u64;
933        let bloom_filter_len = self.write_bloom_filter().await?;
934        let seek_table_len = self.write_seek_table().await?;
935        self.write_info(data_blocks, bloom_filter_len, seek_table_len).await?;
936        self.writer.complete().await
937    }
938}
939
940impl<W: WriteBytes, K: Key, V: LayerValue> Drop for PersistentLayerWriter<W, K, V> {
941    fn drop(&mut self) {
942        if self.buf_item_count > 0 {
943            warn!("Dropping unwritten items; did you forget to flush?");
944        }
945    }
946}
947
948#[cfg(test)]
949mod tests {
950    use super::{PersistentLayer, PersistentLayerWriter};
951    use crate::filesystem::MAX_BLOCK_SIZE;
952    use crate::lsm_tree::types::{
953        DefaultOrdUpperBound, FuzzyHash, Item, ItemRef, Layer, LayerKey, LayerWriter, MergeType,
954        SortByU64,
955    };
956    use crate::lsm_tree::LayerIterator;
957    use crate::object_handle::WriteBytes;
958    use crate::round::round_up;
959    use crate::serialized_types::{
960        versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
961    };
962    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
963    use crate::testing::writer::Writer;
964    use fprint::TypeFingerprint;
965    use fxfs_macros::FuzzyHash;
966    use std::fmt::Debug;
967    use std::hash::Hash;
968    use std::ops::{Bound, Range};
969    use std::sync::Arc;
970
971    impl<W: WriteBytes> Debug for PersistentLayerWriter<W, i32, i32> {
972        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
            f.debug_struct("PersistentLayerWriter")
974                .field("block_size", &self.block_size)
975                .field("item_count", &self.buf_item_count)
976                .finish()
977        }
978    }
979
980    #[fuchsia::test]
981    async fn test_iterate_after_write() {
982        const BLOCK_SIZE: u64 = 512;
983        const ITEM_COUNT: i32 = 10000;
984
985        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
986        {
987            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
988                Writer::new(&handle).await,
989                ITEM_COUNT as usize * 4,
990                BLOCK_SIZE,
991            )
992            .await
993            .expect("writer new");
994            for i in 0..ITEM_COUNT {
995                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
996            }
997            writer.flush().await.expect("flush failed");
998        }
999        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1000        let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
1001        for i in 0..ITEM_COUNT {
1002            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1003            assert_eq!((key, value), (&i, &i));
1004            iterator.advance().await.expect("failed to advance");
1005        }
1006        assert!(iterator.get().is_none());
1007    }
1008
1009    #[fuchsia::test]
1010    async fn test_seek_after_write() {
1011        const BLOCK_SIZE: u64 = 512;
1012        const ITEM_COUNT: i32 = 5000;
1013
1014        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1015        {
1016            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1017                Writer::new(&handle).await,
1018                ITEM_COUNT as usize * 18,
1019                BLOCK_SIZE,
1020            )
1021            .await
1022            .expect("writer new");
1023            for i in 0..ITEM_COUNT {
1024                // Populate every other value as an item.
1025                writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
1026            }
1027            writer.flush().await.expect("flush failed");
1028        }
1029        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1030        // Search for all values to check the in-between values.
1031        for i in 0..ITEM_COUNT * 2 {
1032            // We've written every other value, we expect to get either the exact value searched
1033            // for, or the next one after it. So round up to the nearest multiple of 2.
1034            let expected = round_up(i, 2).unwrap();
1035            let mut iterator = layer.seek(Bound::Included(&i)).await.expect("failed to seek");
1036            // We've written values up to (N-1)*2=2*N-2, so when looking for 2*N-1 we'll go off the
1037            // end of the layer and get back no item.
1038            if i >= (ITEM_COUNT * 2) - 1 {
1039                assert!(iterator.get().is_none());
1040            } else {
1041                let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1042                assert_eq!((key, value), (&expected, &expected));
1043            }
1044
1045            // Check that we can advance to the next item.
1046            iterator.advance().await.expect("failed to advance");
1047            // The highest value is 2*N-2, searching for 2*N-3 will find the last value, and
1048            // advancing will go off the end of the layer and return no item. If there was
1049            // previously no item, then it will latch and always return no item.
1050            if i >= (ITEM_COUNT * 2) - 3 {
1051                assert!(iterator.get().is_none());
1052            } else {
1053                let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1054                let next = expected + 2;
1055                assert_eq!((key, value), (&next, &next));
1056            }
1057        }
1058    }
1059
1060    #[fuchsia::test]
1061    async fn test_seek_unbounded() {
1062        const BLOCK_SIZE: u64 = 512;
1063        const ITEM_COUNT: i32 = 1000;
1064
1065        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1066        {
1067            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1068                Writer::new(&handle).await,
1069                ITEM_COUNT as usize * 18,
1070                BLOCK_SIZE,
1071            )
1072            .await
1073            .expect("writer new");
1074            for i in 0..ITEM_COUNT {
1075                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1076            }
1077            writer.flush().await.expect("flush failed");
1078        }
1079        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1080        let mut iterator = layer.seek(Bound::Unbounded).await.expect("failed to seek");
1081        let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1082        assert_eq!((key, value), (&0, &0));
1083
1084        // Check that we can advance to the next item.
1085        iterator.advance().await.expect("failed to advance");
1086        let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1087        assert_eq!((key, value), (&1, &1));
1088    }
1089
1090    #[fuchsia::test]
1091    async fn test_zero_items() {
1092        const BLOCK_SIZE: u64 = 512;
1093
1094        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1095        {
1096            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1097                Writer::new(&handle).await,
1098                0,
1099                BLOCK_SIZE,
1100            )
1101            .await
1102            .expect("writer new");
1103            writer.flush().await.expect("flush failed");
1104        }
1105
1106        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1107        let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1108            .seek(Bound::Unbounded)
1109            .await
1110            .expect("seek failed");
1111        assert!(iterator.get().is_none())
1112    }
1113
1114    #[fuchsia::test]
1115    async fn test_one_item() {
1116        const BLOCK_SIZE: u64 = 512;
1117
1118        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1119        {
1120            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1121                Writer::new(&handle).await,
1122                1,
1123                BLOCK_SIZE,
1124            )
1125            .await
1126            .expect("writer new");
1127            writer.write(Item::new(42, 42).as_item_ref()).await.expect("write failed");
1128            writer.flush().await.expect("flush failed");
1129        }
1130
1131        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1132        {
1133            let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1134                .seek(Bound::Unbounded)
1135                .await
1136                .expect("seek failed");
1137            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1138            assert_eq!((key, value), (&42, &42));
1139            iterator.advance().await.expect("failed to advance");
1140            assert!(iterator.get().is_none())
1141        }
1142        {
1143            let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1144                .seek(Bound::Included(&30))
1145                .await
1146                .expect("seek failed");
1147            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1148            assert_eq!((key, value), (&42, &42));
1149            iterator.advance().await.expect("failed to advance");
1150            assert!(iterator.get().is_none())
1151        }
1152        {
1153            let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1154                .seek(Bound::Included(&42))
1155                .await
1156                .expect("seek failed");
1157            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1158            assert_eq!((key, value), (&42, &42));
1159            iterator.advance().await.expect("failed to advance");
1160            assert!(iterator.get().is_none())
1161        }
1162        {
1163            let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1164                .seek(Bound::Included(&43))
1165                .await
1166                .expect("seek failed");
1167            assert!(iterator.get().is_none())
1168        }
1169    }
1170
1171    #[fuchsia::test]
1172    async fn test_large_block_size() {
1173        // At the upper end of the supported size.
1174        const BLOCK_SIZE: u64 = MAX_BLOCK_SIZE;
        // Items will be 18 bytes, so fill up a few blocks.
1176        const ITEM_COUNT: i32 = ((BLOCK_SIZE as i32) / 18) * 3;
1177
1178        let handle =
1179            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1180        {
1181            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1182                Writer::new(&handle).await,
1183                ITEM_COUNT as usize * 18,
1184                BLOCK_SIZE,
1185            )
1186            .await
1187            .expect("writer new");
1188            // Use large values to force varint encoding to use consistent space.
1189            for i in 2000000000..(2000000000 + ITEM_COUNT) {
1190                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1191            }
1192            writer.flush().await.expect("flush failed");
1193        }
1194
1195        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1196        let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
1197        for i in 2000000000..(2000000000 + ITEM_COUNT) {
1198            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1199            assert_eq!((key, value), (&i, &i));
1200            iterator.advance().await.expect("failed to advance");
1201        }
1202        assert!(iterator.get().is_none());
1203    }
1204
1205    #[fuchsia::test]
1206    async fn test_overlarge_block_size() {
        // Above the maximum supported block size.
1208        const BLOCK_SIZE: u64 = MAX_BLOCK_SIZE * 2;
1209
1210        let handle =
1211            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1212        PersistentLayerWriter::<_, i32, i32>::new(Writer::new(&handle).await, 0, BLOCK_SIZE)
1213            .await
1214            .expect_err("Creating writer with overlarge block size.");
1215    }
1216
1217    #[fuchsia::test]
1218    async fn test_seek_bound_excluded() {
1219        const BLOCK_SIZE: u64 = 512;
1220        const ITEM_COUNT: i32 = 10000;
1221
1222        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1223        {
1224            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1225                Writer::new(&handle).await,
1226                ITEM_COUNT as usize * 18,
1227                BLOCK_SIZE,
1228            )
1229            .await
1230            .expect("writer new");
1231            for i in 0..ITEM_COUNT {
1232                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1233            }
1234            writer.flush().await.expect("flush failed");
1235        }
1236        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1237
1238        for i in 9982..ITEM_COUNT {
1239            let mut iterator = layer.seek(Bound::Excluded(&i)).await.expect("failed to seek");
1240            let i_plus_one = i + 1;
1241            if i_plus_one < ITEM_COUNT {
1242                let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1243
1244                assert_eq!((key, value), (&i_plus_one, &i_plus_one));
1245
1246                // Check that we can advance to the next item.
1247                iterator.advance().await.expect("failed to advance");
1248                let i_plus_two = i + 2;
1249                if i_plus_two < ITEM_COUNT {
1250                    let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1251                    assert_eq!((key, value), (&i_plus_two, &i_plus_two));
1252                } else {
1253                    assert!(iterator.get().is_none());
1254                }
1255            } else {
1256                assert!(iterator.get().is_none());
1257            }
1258        }
1259    }
1260
1261    #[derive(
1262        Clone,
1263        Eq,
1264        Hash,
1265        FuzzyHash,
1266        PartialEq,
1267        Debug,
1268        serde::Serialize,
1269        serde::Deserialize,
1270        TypeFingerprint,
1271        Versioned,
1272    )]
1273    struct TestKey(Range<u64>);
1274    versioned_type! { 1.. => TestKey }
1275    impl SortByU64 for TestKey {
1276        fn get_leading_u64(&self) -> u64 {
1277            self.0.start
1278        }
1279    }
1280    impl LayerKey for TestKey {
1281        fn merge_type(&self) -> crate::lsm_tree::types::MergeType {
1282            MergeType::OptimizedMerge
1283        }
1284
1285        fn next_key(&self) -> Option<Self> {
1286            Some(TestKey(self.0.end..self.0.end + 1))
1287        }
1288    }
1289    impl Ord for TestKey {
1290        fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1291            self.0.start.cmp(&other.0.start).then(self.0.end.cmp(&other.0.end))
1292        }
1293    }
1294    impl PartialOrd for TestKey {
1295        fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1296            Some(self.cmp(other))
1297        }
1298    }
1299    impl DefaultOrdUpperBound for TestKey {}
1300
1301    // Create a large spread of data across several blocks to ensure that no part of the range is
1302    // lost by the partial search using the layer seek table.
1303    #[fuchsia::test]
1304    async fn test_block_seek_duplicate_keys() {
        // Use a small block size so that each block only holds a handful of items.
        const BLOCK_SIZE: u64 = 512;
        // Items will be at most 37 bytes each: three varint-encoded u64s (a max-length varint u64
        // is 9 bytes), plus a fixed 8-byte sequence number, plus the 2-byte offset entry each item
        // adds to the data block's offset table.
        const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;
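        // Each group written below is `ITEMS_TO_FILL_BLOCK * 3` items, i.e. nominally three data
        // blocks' worth, so a duplicated leading u64 spans multiple data blocks and seek table
        // entries.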

        let mut to_find = Vec::new();

        let handle =
            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
        {
            let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
                Writer::new(&handle).await,
                3 * BLOCK_SIZE as usize,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");

            // Make all values take up maximum space for varint encoding.
            let mut current_value = u32::MAX as u64 + 1;

            // First fill the front with items that share a duplicate leading u64, then look at
            // the start, middle and end of the range.
            {
                let items = ITEMS_TO_FILL_BLOCK * 3;
                for i in 0..items {
                    writer
                        .write(
                            Item::new(TestKey(current_value..current_value + i), current_value)
                                .as_item_ref(),
                        )
                        .await
                        .expect("write failed");
                }
                to_find.push(TestKey(current_value..current_value));
                to_find.push(TestKey(current_value..(current_value + (items / 2))));
                to_find.push(TestKey(current_value..current_value + (items - 1)));
                current_value += 1;
            }

            // Add some filler of all different leading u64.
            {
                let items = ITEMS_TO_FILL_BLOCK * 3;
                for _ in 0..items {
                    writer
                        .write(
                            Item::new(TestKey(current_value..current_value), current_value)
                                .as_item_ref(),
                        )
                        .await
                        .expect("write failed");
                    current_value += 1;
                }
            }

            // Fill the middle with items that share a duplicate leading u64, then look at the
            // start, middle and end of the range.
            {
                let items = ITEMS_TO_FILL_BLOCK * 3;
                for i in 0..items {
                    writer
                        .write(
                            Item::new(TestKey(current_value..current_value + i), current_value)
                                .as_item_ref(),
                        )
                        .await
                        .expect("write failed");
                }
                to_find.push(TestKey(current_value..current_value));
                to_find.push(TestKey(current_value..(current_value + (items / 2))));
                to_find.push(TestKey(current_value..current_value + (items - 1)));
                current_value += 1;
            }

            // Add some filler of all different leading u64.
            {
                let items = ITEMS_TO_FILL_BLOCK * 3;
                for _ in 0..items {
                    writer
                        .write(
                            Item::new(TestKey(current_value..current_value), current_value)
                                .as_item_ref(),
                        )
                        .await
                        .expect("write failed");
                    current_value += 1;
                }
            }

            // Fill the end with items that share a duplicate leading u64, then look at the
            // start, middle and end of the range.
            {
                let items = ITEMS_TO_FILL_BLOCK * 3;
                for i in 0..items {
                    writer
                        .write(
                            Item::new(TestKey(current_value..current_value + i), current_value)
                                .as_item_ref(),
                        )
                        .await
                        .expect("write failed");
                }
                to_find.push(TestKey(current_value..current_value));
                to_find.push(TestKey(current_value..(current_value + (items / 2))));
                to_find.push(TestKey(current_value..current_value + (items - 1)));
            }

            writer.flush().await.expect("flush failed");
        }

        let layer = PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
        for target in to_find {
            let iterator: Box<dyn LayerIterator<TestKey, u64>> =
                layer.seek(Bound::Included(&target)).await.expect("failed to seek");
            let ItemRef { key, .. } = iterator.get().expect("missing item");
            assert_eq!(&target, key);
        }
    }

    #[fuchsia::test]
    async fn test_two_seek_blocks() {
        // Use a small block size so that each block only holds a handful of items.
        const BLOCK_SIZE: u64 = 512;
        // Items will be at most 37 bytes each: three varint-encoded u64s (a max-length varint u64
        // is 9 bytes), plus a fixed 8-byte sequence number, plus the 2-byte offset entry each item
        // adds to the data block's offset table.
        const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;
        // Add enough items to produce more data blocks than a single seek block can describe.
        // Seek entries are 8 bytes each; add one data block because the entry for the first data
        // block is omitted, then one more so the table overflows into a second seek block.
        const ITEM_COUNT: u64 = ITEMS_TO_FILL_BLOCK * ((BLOCK_SIZE / 8) + 2);
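        // With 512-byte blocks a seek block holds 64 entries, so ITEM_COUNT is nominally 66 data
        // blocks' worth of items: the first data block has no entry, 64 entries fill one seek
        // block, and the 65th entry spills into a second seek block.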

        let mut to_find = Vec::new();

        let handle =
            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
        {
            let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
                Writer::new(&handle).await,
                ITEM_COUNT as usize * 18,
                BLOCK_SIZE,
            )
            .await
            .expect("writer new");

            // Make all values take up maximum space for varint encoding.
            let initial_value = u32::MAX as u64 + 1;
            for i in 0..ITEM_COUNT {
                writer
                    .write(
                        Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
                            .as_item_ref(),
                    )
                    .await
                    .expect("write failed");
            }
            // Look at the start, middle and end.
            to_find.push(TestKey(initial_value..initial_value));
            let middle = initial_value + ITEM_COUNT / 2;
            to_find.push(TestKey(middle..middle));
            let end = initial_value + ITEM_COUNT - 1;
            to_find.push(TestKey(end..end));

            writer.flush().await.expect("flush failed");
        }

        let layer = PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
        for target in to_find {
            let iterator: Box<dyn LayerIterator<TestKey, u64>> =
                layer.seek(Bound::Included(&target)).await.expect("failed to seek");
            let ItemRef { key, .. } = iterator.get().expect("missing item");
            assert_eq!(&target, key);
        }
    }

    // Verifies behaviour around writing exactly-full seek blocks, to ensure that the resulting
    // layer file can still be opened and parsed afterward.
    #[fuchsia::test]
    async fn test_full_seek_block() {
        const BLOCK_SIZE: u64 = 512;

        // Items will be at most 37 bytes each: three varint-encoded u64s (a max-length varint u64
        // is 9 bytes), plus a fixed 8-byte sequence number, plus the 2-byte offset entry each item
        // adds to the data block's offset table.
        const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;

        // How many entries fit in a single seek table block.
        const SEEK_TABLE_ENTRIES: u64 = BLOCK_SIZE / 8;

        // Exactly filling a seek block would take roughly one more data block's worth of items
        // than this, so we deliberately start just below the boundary and sweep a range of item
        // counts to make sure the exact-fit case is covered.
        const START_ENTRIES_COUNT: u64 = ITEMS_TO_FILL_BLOCK * SEEK_TABLE_ENTRIES;
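        // START_ENTRIES_COUNT is nominally 64 data blocks' worth of items, i.e. 63 seek entries,
        // just under the 64 that fill a seek block; the loop below sweeps two more blocks' worth
        // of items so the exact-fit point is crossed.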

        for entries in START_ENTRIES_COUNT..START_ENTRIES_COUNT + (ITEMS_TO_FILL_BLOCK * 2) {
            let handle = FakeObjectHandle::new_with_block_size(
                Arc::new(FakeObject::new()),
                BLOCK_SIZE as usize,
            );
            {
                let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
                    Writer::new(&handle).await,
                    entries as usize,
                    BLOCK_SIZE,
                )
                .await
                .expect("writer new");

                // Make all values take up maximum space for varint encoding.
                let initial_value = u32::MAX as u64 + 1;
                for i in 0..entries {
                    writer
                        .write(
                            Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
                                .as_item_ref(),
                        )
                        .await
                        .expect("write failed");
                }

                writer.flush().await.expect("flush failed");
            }
            PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
        }
    }
}