Skip to main content

fxfs/lsm_tree/
persistent_layer.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// PersistentLayer object format
6//
// The layer is made up of 1 or more "blocks" whose sizes are some multiple of the block size used
// by the underlying handle.
9//
10// The persistent layer has 4 types of blocks:
11//  - Header block
12//  - Data block
13//  - BloomFilter block
14//  - Seek block (+LayerInfo)
15//
16// The structure of the file is as follows:
17//
18// blk#     contents
19// 0        [Header]
20// 1        [Data]
21// 2        [Data]
22// ...      [Data]
23// L        [BloomFilter]
24// L + 1    [BloomFilter]
25// ...      [BloomFilter]
26// M        [Seek]
27// M + 1    [Seek]
28// ...      [Seek]
29// N        [Seek/LayerInfo]
30//
31// Generally, there will be an order of magnitude more Data blocks than Seek/BloomFilter blocks.
32//
33// Header contains a Version-prefixed LayerHeader struct.  This version is used for everything in
34// the layer file.
35//
36// Data blocks contain a little endian encoded u16 item count at the start, then a series of
37// serialized items, and a list of little endian u16 offsets within the block for where
38// serialized items start, excluding the first item (since it is at a known offset). The list of
39// offsets ends at the end of the block and since the items are of variable length, there may be
40// space between the two sections if the next item and its offset cannot fit into the block.
41//
42// |item_count|item|item|item|item|item|item|dead space|offset|offset|offset|offset|offset|
43//
44// BloomFilter blocks contain a bitmap which is used to probabilistically determine if a given key
45// might exist in the layer file.   See `BloomFilter` for details on this structure.  Note that this
46// can be absent from the file for small layer files.
47//
48// Seek/LayerInfo blocks contain both the seek table, and a single LayerInfo struct at the tail of
49// the last block, with the LayerInfo's length written as a little-endian u64 at the very end.  The
50// padding between the two structs is ignored but nominally is zeroed. They share blocks to avoid
51// wasting padding bytes.  Note that the seek table can be absent from the file for small layer
52// files (but there will always be one block for the LayerInfo).
53//
// The seek table consists of a little-endian u64 for every data block except for the first one. The
// entries must be monotonically non-decreasing (duplicate entries are allowed): each one is a u64
// derived from the key of the first item in its block, chosen so that it follows the keys'
// predominant sort order. There should be exactly as many seek blocks as are required to house one
// entry fewer than the number of data blocks.
59
60use crate::drop_event::DropEvent;
61use crate::errors::FxfsError;
62use crate::filesystem::MAX_BLOCK_SIZE;
63use crate::log::*;
64use crate::lsm_tree::bloom_filter::{BloomFilterReader, BloomFilterStats, BloomFilterWriter};
65use crate::lsm_tree::types::{
66    BoxedLayerIterator, Existence, FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerValue,
67    LayerWriter,
68};
69use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteBytes};
70use crate::object_store::caching_object_handle::{CHUNK_SIZE, CachedChunk, CachingObjectHandle};
71use crate::round::{round_down, round_up};
72use crate::serialized_types::{
73    LATEST_VERSION, REMOVE_ITEM_SEQUENCE_VERSION, Version, Versioned, VersionedLatest,
74};
75use anyhow::{Context, Error, anyhow, bail, ensure};
76use async_trait::async_trait;
77use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt};
78use fprint::TypeFingerprint;
79use fuchsia_sync::Mutex;
80use serde::{Deserialize, Serialize};
81use static_assertions::const_assert;
82use std::cmp::Ordering;
83use std::io::{Read, Write as _};
84use std::marker::PhantomData;
85use std::ops::Bound;
86use std::sync::Arc;
87
/// Magic bytes stored in `LayerHeader::magic`.  The writer stores them in every header it writes
/// and `PersistentLayer::open` rejects files whose header does not carry them.
const PERSISTENT_LAYER_MAGIC: &[u8; 8] = b"FxfsLayr";

/// LayerHeader is stored in the first block of the persistent layer.
pub type LayerHeader = LayerHeaderV39;

/// Header of a persistent layer file.  It is serialized into the first block, immediately after a
/// `Version` prefix, by `PersistentLayerWriter` and decoded by `PersistentLayer::open`.
#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct LayerHeaderV39 {
    /// 'FxfsLayr' (must equal `PERSISTENT_LAYER_MAGIC`).
    magic: [u8; 8],
    /// The block size used within this layer file. This is typically set at compaction time to the
    /// same block size as the underlying object handle.
    ///
    /// (Each block starts with a 2 byte item count so there is a 64k item limit per block,
    /// regardless of block size).
    block_size: u64,
}
104
/// The last block of each layer contains metadata for the rest of the layer.
pub type LayerInfo = LayerInfoV39;

/// Layer-wide metadata stored at the tail of the last block, followed by its serialized length as a
/// little-endian u64 (see `PersistentLayer::open`).
#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct LayerInfoV39 {
    /// How many items are in the layer file.  Mainly used for sizing bloom filters during
    /// compaction.
    num_items: usize,
    /// The number of data blocks in the layer file.  Data blocks immediately follow the header
    /// block(s).
    num_data_blocks: u64,
    /// The size of the bloom filter in the layer file.  Not necessarily block-aligned.  Zero means
    /// the layer has no bloom filter.
    bloom_filter_size_bytes: usize,
    /// The seed for the nonces used in the bloom filter.
    bloom_filter_seed: u64,
    /// How many nonces to use for bloom filter hashing.
    bloom_filter_num_hashes: usize,
}
122
/// A handle to a persistent layer.
///
/// Created by `PersistentLayer::open`, which validates the on-disk layout and loads the seek table
/// and (optionally) the bloom filter into memory; items themselves are read lazily through
/// `caching_object_handle`.
pub struct PersistentLayer<K, V> {
    // We retain a reference to the underlying object handle so we can hand out references to it for
    // `Layer::handle` when clients need it.  Internal reads should go through
    // `caching_object_handle` so they are cached.  Note that `CachingObjectHandle` used to
    // implement `ReadObjectHandle`, but that was removed so that `CachingObjectHandle` could hand
    // out data references rather than requiring copying to a buffer, which speeds up LSM tree
    // operations.
    object_handle: Arc<dyn ReadObjectHandle>,
    caching_object_handle: CachingObjectHandle<Arc<dyn ReadObjectHandle>>,
    // The format version read from the header; used when deserializing keys and values.
    version: Version,
    // The layer's block size, from the header.
    block_size: u64,
    // Total size in bytes of the data blocks (`num_data_blocks * block_size`).
    data_size: u64,
    // One entry per data block; the first is an implicit 0.  Empty when the layer has at most one
    // data block.
    seek_table: Vec<u64>,
    // Item count recorded in the layer's `LayerInfo`.
    num_items: usize,
    // `None` if the layer has no bloom filter or was written with an older version.
    bloom_filter: Option<BloomFilterReader<K>>,
    // Stats for `bloom_filter`, reported via inspect.
    bloom_filter_stats: Option<BloomFilterStats>,
    // Handed out by `Layer::lock`; taken by `Layer::close`, which waits on its listener.
    close_event: Mutex<Option<Arc<DropEvent>>>,
    _value_type: PhantomData<V>,
}
143
144#[derive(Debug)]
145struct BufferCursor {
146    chunk: Option<CachedChunk>,
147    pos: usize,
148}
149
150impl std::io::Read for BufferCursor {
151    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
152        let chunk = if let Some(chunk) = &self.chunk {
153            chunk
154        } else {
155            return Ok(0);
156        };
157        let to_read = std::cmp::min(buf.len(), chunk.len().saturating_sub(self.pos));
158        if to_read > 0 {
159            buf[..to_read].copy_from_slice(&chunk[self.pos..self.pos + to_read]);
160            self.pos += to_read;
161        }
162        Ok(to_read)
163    }
164}
165
/// Smallest block size the layer writer accepts.
const MIN_BLOCK_SIZE: u64 = 512;

/// Small layer files don't bother with a bloom filter; this is the data-block threshold that
/// defines "small".  Arbitrarily chosen.
const MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER: usize = 4;

/// How many blocks are reserved for the header.  Data blocks start at this block offset.
const NUM_HEADER_BLOCKS: u64 = 1;

/// The smallest possible (empty) layer file is always 2 blocks: the header block(s) plus one
/// block for the LayerInfo.
const MINIMUM_LAYER_FILE_BLOCKS: u64 = NUM_HEADER_BLOCKS + 1;

/// Safety rails (64 MiB each) on the size of the bloom filter and seek table, to avoid OOMing the
/// system.  A larger value more likely means the file has been tampered with.
const MAX_BLOOM_FILTER_SIZE: usize = 64 << 20;
const MAX_SEEK_TABLE_SIZE: usize = 64 << 20;

/// Sizes of the metadata stored in each data block: a leading u16 item count, and one u16 offset
/// per item (other than the first) at the end of the block.
const PER_DATA_BLOCK_HEADER_SIZE: usize = std::mem::size_of::<u16>();
const PER_DATA_BLOCK_SEEK_ENTRY_SIZE: usize = std::mem::size_of::<u16>();
186
// A key-only iterator, used while seeking through the tree.
//
// It walks the data blocks of `layer` in order and deserializes only keys; `Iterator` wraps it to
// also read the value stored after each key.
struct KeyOnlyIterator<'iter, K: Key, V: LayerValue> {
    // Cursor over the cached chunk holding the current block; `buffer.pos` is chunk-relative.
    // The chunk is read through |layer|'s caching object handle.
    buffer: BufferCursor,

    layer: &'iter PersistentLayer<K, V>,

    // The byte offset (in the layer file) of the _next_ block to be read.
    pos: u64,

    // The item index in the current block.
    item_index: u16,

    // The number of items in the current block.
    item_count: u16,

    // The current key.
    key: Option<K>,

    // Set by a wrapping iterator once the value has been deserialized, so the KeyOnlyIterator knows
    // whether it is pointing at the next key or not.
    value_deserialized: bool,
}
210
impl<K: Key, V: LayerValue> KeyOnlyIterator<'_, K, V> {
    // Creates an iterator that will start reading at the block beginning at byte offset `pos` of
    // the layer file.  Nothing is read until `advance` is called.
    //
    // Panics if `pos` is not a multiple of the layer's block size.
    fn new<'iter>(layer: &'iter PersistentLayer<K, V>, pos: u64) -> KeyOnlyIterator<'iter, K, V> {
        assert!(pos % layer.block_size == 0);
        KeyOnlyIterator {
            layer,
            // `buffer.pos` is relative to the cached chunk, hence the modulo.
            buffer: BufferCursor { chunk: None, pos: pos as usize % CHUNK_SIZE },
            pos,
            item_index: 0,
            item_count: 0,
            key: None,
            value_deserialized: false,
        }
    }

    // Repositions the iterator to point to the `index`'th item in the current block.
    // Returns an error if the index is out of range or the resulting offset contains an obviously
    // invalid value.
    //
    // Nothing is deserialized here; only `item_index` and `buffer.pos` are updated.
    fn seek_to_block_item(&mut self, index: u16) -> Result<(), Error> {
        ensure!(index < self.item_count, FxfsError::OutOfRange);
        if index == self.item_index && self.value_deserialized {
            // Fast-path when we are seeking in a linear manner, as is the case when advancing a
            // wrapping iterator that also deserializes the values.
            return Ok(());
        }
        let offset_in_block = if index == 0 {
            // First entry isn't actually recorded, it is at the start of the block after the item
            // count.
            PER_DATA_BLOCK_HEADER_SIZE
        } else {
            // The u16 offsets for items 1..item_count sit at the end of the block, in order, so
            // item `index`'s offset is `item_count - index` entries back from the block's end.
            // Rounding `buffer.pos` up locates the end of the current block; this relies on
            // `buffer.pos` being strictly inside the block (past the item count), and presumably
            // on chunk boundaries being block-aligned.
            let old_buffer_pos = self.buffer.pos;
            self.buffer.pos = round_up(self.buffer.pos, self.layer.block_size as usize).unwrap()
                - (PER_DATA_BLOCK_SEEK_ENTRY_SIZE * (usize::from(self.item_count - index)));
            let res = self.buffer.read_u16::<LittleEndian>();
            // Restore the position whether or not the read succeeded.
            self.buffer.pos = old_buffer_pos;
            let offset_in_block = res.context("Failed to read offset")? as usize;
            // Items after the first must start past the item count and within the block.
            if offset_in_block >= self.layer.block_size as usize
                || offset_in_block <= PER_DATA_BLOCK_HEADER_SIZE
            {
                return Err(anyhow!(FxfsError::Inconsistent))
                    .context(format!("Offset {} is out of valid range.", offset_in_block));
            }
            offset_in_block
        };
        self.item_index = index;
        self.buffer.pos =
            round_down(self.buffer.pos, self.layer.block_size as usize) + offset_in_block;
        Ok(())
    }

    // Moves to the next key, loading the next data block when the current one is exhausted.
    // On success `key` holds the new key, or `None` once the end of the data blocks is reached.
    // The value stored after the key is not read here; see `Iterator`.
    async fn advance(&mut self) -> Result<(), Error> {
        if self.item_index >= self.item_count {
            // The current block is exhausted (or no block has been loaded yet).
            if self.pos >= self.layer.data_offset() + self.layer.data_size {
                self.key = None;
                return Ok(());
            }
            // Only fetch a chunk when none is held or the next block starts a new chunk;
            // otherwise the next block is assumed to lie in the chunk already held.
            if self.buffer.chunk.is_none() || self.pos as usize % CHUNK_SIZE == 0 {
                self.buffer.chunk = Some(
                    self.layer
                        .caching_object_handle
                        .read(self.pos as usize)
                        .await
                        .context("Reading during advance")?,
                );
            }
            self.buffer.pos = self.pos as usize % CHUNK_SIZE;
            self.item_count = self.buffer.read_u16::<LittleEndian>()?;
            if self.item_count == 0 {
                bail!(
                    "Read block with zero item count (object: {}, offset: {})",
                    self.layer.object_handle.object_id(),
                    self.pos
                );
            }
            debug!(
                pos = self.pos,
                buf:? = self.buffer,
                object_size = self.layer.data_offset() + self.layer.data_size,
                oid = self.layer.object_handle.object_id();
                ""
            );
            self.pos += self.layer.block_size;
            self.item_index = 0;
            // With `item_index == 0`, this makes the `seek_to_block_item(0)` call below take its
            // fast path: `buffer.pos` already sits just past the item count, where item 0 starts.
            self.value_deserialized = true;
        }
        self.seek_to_block_item(self.item_index)?;
        self.key = Some(
            K::deserialize_from_version(self.buffer.by_ref(), self.layer.version)
                .context("Corrupt layer (key)")?,
        );
        self.item_index += 1;
        self.value_deserialized = false;
        Ok(())
    }

    // Returns the current key, or `None` if the iterator hasn't been advanced yet or is exhausted.
    fn get(&self) -> Option<&K> {
        self.key.as_ref()
    }
}
309
310struct Iterator<'iter, K: Key, V: LayerValue> {
311    inner: KeyOnlyIterator<'iter, K, V>,
312    // The current item.
313    item: Option<Item<K, V>>,
314}
315
316impl<'iter, K: Key, V: LayerValue> Iterator<'iter, K, V> {
317    fn new(mut seek_iterator: KeyOnlyIterator<'iter, K, V>) -> Result<Self, Error> {
318        let key = std::mem::take(&mut seek_iterator.key);
319        let item = if let Some(key) = key {
320            seek_iterator.value_deserialized = true;
321            let value = V::deserialize_from_version(
322                seek_iterator.buffer.by_ref(),
323                seek_iterator.layer.version,
324            )
325            .context("Corrupt layer (value)")?;
326            if seek_iterator.layer.version.major < REMOVE_ITEM_SEQUENCE_VERSION {
327                seek_iterator.buffer.read_u64::<LittleEndian>().context("Corrupt layer (seq)")?;
328            }
329            Some(Item { key, value })
330        } else {
331            None
332        };
333        Ok(Self { inner: seek_iterator, item })
334    }
335}
336
337#[async_trait]
338impl<'iter, K: Key, V: LayerValue> LayerIterator<K, V> for Iterator<'iter, K, V> {
339    async fn advance(&mut self) -> Result<(), Error> {
340        self.inner.advance().await?;
341        let key = std::mem::take(&mut self.inner.key);
342        self.item = if let Some(key) = key {
343            self.inner.value_deserialized = true;
344            let value =
345                V::deserialize_from_version(self.inner.buffer.by_ref(), self.inner.layer.version)
346                    .context("Corrupt layer (value)")?;
347            if self.inner.layer.version.major < REMOVE_ITEM_SEQUENCE_VERSION {
348                self.inner.buffer.read_u64::<LittleEndian>().context("Corrupt layer (seq)")?;
349            }
350            Some(Item { key, value })
351        } else {
352            None
353        };
354        Ok(())
355    }
356
357    fn get(&self) -> Option<ItemRef<'_, K, V>> {
358        self.item.as_ref().map(<&Item<K, V>>::into)
359    }
360}
361
// Returns the size of the seek table in bytes for a layer with `num_data_blocks` data blocks.
//
// The first data block has no entry; every other data block has one u64 entry.  Because
// `num_data_blocks` is read from disk and may be corrupt, the conversion and multiplication
// saturate instead of truncating or overflowing.  An absurd value therefore yields a huge size,
// which callers comparing against a size limit (e.g. `MAX_SEEK_TABLE_SIZE`) reject, rather than
// panicking or wrapping around to a small, plausible size.
fn seek_table_size(num_data_blocks: u64) -> usize {
    let seek_table_entries = num_data_blocks.saturating_sub(1);
    usize::try_from(seek_table_entries)
        .unwrap_or(usize::MAX)
        .saturating_mul(std::mem::size_of::<u64>())
}
372
373async fn load_seek_table(
374    object_handle: &(impl ReadObjectHandle + 'static),
375    seek_table_offset: u64,
376    num_data_blocks: u64,
377) -> Result<Vec<u64>, Error> {
378    let seek_table_size = seek_table_size(num_data_blocks);
379    if seek_table_size == 0 {
380        return Ok(vec![]);
381    }
382    if seek_table_size > MAX_SEEK_TABLE_SIZE {
383        return Err(anyhow!(FxfsError::NotSupported)).context("Seek table too large");
384    }
385    let mut buffer = object_handle.allocate_buffer(seek_table_size).await;
386    let bytes_read = object_handle
387        .read(seek_table_offset, buffer.as_mut())
388        .await
389        .context("Reading seek table blocks")?;
390    ensure!(bytes_read == seek_table_size, "Short read");
391
392    let mut seek_table = Vec::with_capacity(num_data_blocks as usize);
393    // No entry for the first data block, assume a lower bound 0.
394    seek_table.push(0);
395    let mut prev = 0;
396    for chunk in buffer.as_slice().chunks_exact(std::mem::size_of::<u64>()) {
397        let next = LittleEndian::read_u64(chunk);
398        // Should be in strict ascending order, otherwise something's broken, or we've gone off
399        // the end and we're reading zeroes.
400        if prev > next {
401            return Err(anyhow!(FxfsError::Inconsistent))
402                .context(format!("Seek table entry out of order, {:?} > {:?}", prev, next));
403        }
404        prev = next;
405        seek_table.push(next);
406    }
407    Ok(seek_table)
408}
409
410async fn load_bloom_filter<K: FuzzyHash>(
411    handle: &(impl ReadObjectHandle + 'static),
412    bloom_filter_offset: u64,
413    layer_info: &LayerInfo,
414) -> Result<Option<BloomFilterReader<K>>, Error> {
415    if layer_info.bloom_filter_size_bytes == 0 {
416        return Ok(None);
417    }
418    if layer_info.bloom_filter_size_bytes > MAX_BLOOM_FILTER_SIZE {
419        return Err(anyhow!(FxfsError::NotSupported)).context("Bloom filter too large");
420    }
421    let mut buffer = handle.allocate_buffer(layer_info.bloom_filter_size_bytes).await;
422    handle.read(bloom_filter_offset, buffer.as_mut()).await.context("Failed to read")?;
423    Ok(Some(BloomFilterReader::read(
424        buffer.as_slice(),
425        layer_info.bloom_filter_seed,
426        layer_info.bloom_filter_num_hashes,
427    )?))
428}
429
430impl<K: Key, V: LayerValue> PersistentLayer<K, V> {
431    pub async fn open(handle: impl ReadObjectHandle + 'static) -> Result<Arc<Self>, Error> {
432        let bs = handle.block_size();
433        let mut buffer = handle.allocate_buffer(bs as usize).await;
434        handle.read(0, buffer.as_mut()).await.context("Failed to read first block")?;
435        let mut cursor = std::io::Cursor::new(buffer.as_slice());
436        let version = Version::deserialize_from(&mut cursor)?;
437
438        ensure!(version <= LATEST_VERSION, FxfsError::InvalidVersion);
439        let header = LayerHeader::deserialize_from_version(&mut cursor, version)
440            .context("Failed to deserialize header")?;
441        if &header.magic != PERSISTENT_LAYER_MAGIC {
442            return Err(anyhow!(FxfsError::Inconsistent).context("Invalid layer file magic"));
443        }
444        if header.block_size == 0 || !header.block_size.is_power_of_two() {
445            return Err(anyhow!(FxfsError::Inconsistent))
446                .context(format!("Invalid block size {}", header.block_size));
447        }
448        ensure!(header.block_size > 0, FxfsError::Inconsistent);
449        ensure!(header.block_size <= MAX_BLOCK_SIZE, FxfsError::NotSupported);
450        let physical_block_size = handle.block_size();
451        if header.block_size % physical_block_size != 0 {
452            return Err(anyhow!(FxfsError::Inconsistent)).context(format!(
453                "{} not a multiple of physical block size {}",
454                header.block_size, physical_block_size
455            ));
456        }
457        std::mem::drop(cursor);
458
459        let bs = header.block_size as usize;
460        if handle.get_size() < MINIMUM_LAYER_FILE_BLOCKS * bs as u64 {
461            return Err(anyhow!(FxfsError::Inconsistent).context("Layer file too short"));
462        }
463
464        let layer_info = {
465            let last_block_offset = handle
466                .get_size()
467                .checked_sub(header.block_size)
468                .ok_or(FxfsError::Inconsistent)
469                .context("Layer file unexpectedly short")?;
470            handle
471                .read(last_block_offset, buffer.subslice_mut(0..header.block_size as usize))
472                .await
473                .context("Failed to read layer info")?;
474            let layer_info_len =
475                LittleEndian::read_u64(&buffer.as_slice()[bs - std::mem::size_of::<u64>()..]);
476            let layer_info_offset = bs
477                .checked_sub(std::mem::size_of::<u64>() + layer_info_len as usize)
478                .ok_or(FxfsError::Inconsistent)
479                .context("Invalid layer info length")?;
480            let mut cursor = std::io::Cursor::new(&buffer.as_slice()[layer_info_offset..]);
481            LayerInfo::deserialize_from_version(&mut cursor, version)
482                .context("Failed to deserialize LayerInfo")?
483        };
484        std::mem::drop(buffer);
485        if layer_info.num_items == 0 && layer_info.num_data_blocks > 0 {
486            return Err(anyhow!(FxfsError::Inconsistent))
487                .context("Invalid num_items/num_data_blocks");
488        }
489        let total_blocks = handle.get_size() / header.block_size;
490        let bloom_filter_blocks =
491            round_up(layer_info.bloom_filter_size_bytes as u64, header.block_size)
492                .unwrap_or(layer_info.bloom_filter_size_bytes as u64)
493                / header.block_size;
494        if layer_info.num_data_blocks + bloom_filter_blocks
495            > total_blocks - MINIMUM_LAYER_FILE_BLOCKS
496        {
497            return Err(anyhow!(FxfsError::Inconsistent)).context("Invalid number of blocks");
498        }
499
500        let bloom_filter_offset =
501            header.block_size * (NUM_HEADER_BLOCKS + layer_info.num_data_blocks);
502        let bloom_filter = if version == LATEST_VERSION {
503            load_bloom_filter(&handle, bloom_filter_offset, &layer_info)
504                .await
505                .context("Failed to load bloom filter")?
506        } else {
507            // Ignore the bloom filter for layer files in outdated versions.  We don't know whether
508            // keys have changed formats or not (and therefore have different hash values), so we
509            // must ignore the bloom filter and always query the layer.
510            None
511        };
512        let bloom_filter_stats = bloom_filter.as_ref().map(|b| b.stats());
513
514        let seek_offset = header.block_size
515            * (NUM_HEADER_BLOCKS + layer_info.num_data_blocks + bloom_filter_blocks);
516        let seek_table = load_seek_table(&handle, seek_offset, layer_info.num_data_blocks)
517            .await
518            .context("Failed to load seek table")?;
519
520        let object_handle = Arc::new(handle) as Arc<dyn ReadObjectHandle>;
521        let caching_object_handle = CachingObjectHandle::new(object_handle.clone());
522        Ok(Arc::new(PersistentLayer {
523            object_handle,
524            caching_object_handle,
525            version,
526            block_size: header.block_size,
527            data_size: layer_info.num_data_blocks * header.block_size,
528            seek_table,
529            num_items: layer_info.num_items,
530            bloom_filter,
531            bloom_filter_stats,
532            close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
533            _value_type: PhantomData::default(),
534        }))
535    }
536
537    /// Whether the bloom filter for the layer file is consulted or not.  If this is false, then
538    /// `maybe_contains_key` will always return true.
539    /// Note that the persistent layer file may still have a bloom filter, but it might be ignored
540    /// (e.g. for a layer file on an older version).
541    pub fn has_bloom_filter(&self) -> bool {
542        self.bloom_filter.is_some()
543    }
544
545    fn data_offset(&self) -> u64 {
546        NUM_HEADER_BLOCKS * self.block_size
547    }
548}
549
550#[async_trait]
551impl<K: Key, V: LayerValue> Layer<K, V> for PersistentLayer<K, V> {
552    fn handle(&self) -> Option<&dyn ReadObjectHandle> {
553        Some(&self.object_handle)
554    }
555
556    fn purge_cached_data(&self) {
557        self.caching_object_handle.purge();
558    }
559
560    async fn seek<'a>(&'a self, bound: Bound<&K>) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
561        let (key, excluded) = match bound {
562            Bound::Unbounded => {
563                let mut iterator = Iterator::new(KeyOnlyIterator::new(self, self.data_offset()))?;
564                iterator.advance().await.context("Unbounded seek advance")?;
565                return Ok(Box::new(iterator));
566            }
567            Bound::Included(k) => (k, false),
568            Bound::Excluded(k) => (k, true),
569        };
570        let first_data_block_index = self.data_offset() / self.block_size;
571
572        let (mut left_offset, mut right_offset) = {
573            // We are searching for a range here, as multiple items can have the same value in
574            // this approximate search. Since the values used are the smallest in the associated
575            // block it means that if the value equals the target you should also search the
576            // one before it. The goal is for table[left] < target < table[right].
577            let target = key.get_leading_u64();
578            // Because the first entry in the table is always 0, right_index will never be 0.
579            let right_index = self.seek_table.as_slice().partition_point(|&x| x <= target) as u64;
580            // Since partition_point will find the index of the first place where the predicate
581            // is false, we subtract 1 to get the index where it was last true.
582            let left_index = self.seek_table.as_slice()[..right_index as usize]
583                .partition_point(|&x| x < target)
584                .saturating_sub(1) as u64;
585
586            (
587                (left_index + first_data_block_index) * self.block_size,
588                (right_index + first_data_block_index) * self.block_size,
589            )
590        };
591        let mut left = KeyOnlyIterator::new(self, left_offset);
592        left.advance().await.context("Initial seek advance")?;
593        match left.get() {
594            None => return Ok(Box::new(Iterator::new(left)?)),
595            Some(left_key) => match left_key.cmp_upper_bound(key) {
596                Ordering::Greater => return Ok(Box::new(Iterator::new(left)?)),
597                Ordering::Equal => {
598                    if excluded {
599                        left.advance().await?;
600                    }
601                    return Ok(Box::new(Iterator::new(left)?));
602                }
603                Ordering::Less => {}
604            },
605        }
606        let mut right = None;
607        while right_offset - left_offset > self.block_size {
608            // Pick a block midway.
609            let mid_offset =
610                round_down(left_offset + (right_offset - left_offset) / 2, self.block_size);
611            let mut iterator = KeyOnlyIterator::new(self, mid_offset);
612            iterator.advance().await?;
613            let iter_key: &K = iterator.get().unwrap();
614            match iter_key.cmp_upper_bound(key) {
615                Ordering::Greater => {
616                    right_offset = mid_offset;
617                    right = Some(iterator);
618                }
619                Ordering::Equal => {
620                    if excluded {
621                        iterator.advance().await?;
622                    }
623                    return Ok(Box::new(Iterator::new(iterator)?));
624                }
625                Ordering::Less => {
626                    left_offset = mid_offset;
627                    left = iterator;
628                }
629            }
630        }
631
632        // Finish the binary search on the block pointed to by `left`.
633        let mut left_index = 0;
634        let mut right_index = left.item_count;
635        // If the size is zero then we don't touch the iterator.
636        while left_index < (right_index - 1) {
637            let mid_index = left_index + ((right_index - left_index) / 2);
638            left.seek_to_block_item(mid_index).context("Read index offset for binary search")?;
639            left.advance().await?;
640            match left.get().unwrap().cmp_upper_bound(key) {
641                Ordering::Greater => {
642                    right_index = mid_index;
643                }
644                Ordering::Equal => {
645                    if excluded {
646                        left.advance().await?;
647                    }
648                    return Ok(Box::new(Iterator::new(left)?));
649                }
650                Ordering::Less => {
651                    left_index = mid_index;
652                }
653            }
654        }
655        // When we don't find an exact match, we need to return with the first entry *after* the the
656        // target key which might be the first one in the next block, currently already pointed to
657        // by the "right" buffer, but usually it's just the result of the right index within the
658        // "left" buffer.
659        if right_index < left.item_count {
660            left.seek_to_block_item(right_index)
661                .context("Read index for offset of right pointer")?;
662        } else if let Some(right) = right {
663            return Ok(Box::new(Iterator::new(right)?));
664        } else {
665            // We want the end of the layer.  `right_index == left.item_count`, so `left_index ==
666            // left.item_count - 1`, and the left iterator must be positioned on `left_index` since
667            // we cannot have gone through the `Ordering::Greater` path above because `right_index`
668            // would not be equal to `left.item_count` in that case, so all we need to do is advance
669            // the iterator.
670        }
671        left.advance().await?;
672        return Ok(Box::new(Iterator::new(left)?));
673    }
674
675    fn len(&self) -> usize {
676        self.num_items
677    }
678
679    fn maybe_contains_key(&self, key: &K) -> bool {
680        self.bloom_filter.as_ref().map_or(true, |f| f.maybe_contains(key))
681    }
682
683    async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
684        match &self.bloom_filter {
685            Some(filter) => Ok(if filter.maybe_contains(key) {
686                Existence::MaybeExists
687            } else {
688                Existence::Missing
689            }),
690            None => {
691                let iter = self.seek(Bound::Included(key)).await?;
692                Ok(iter.get().map_or(Existence::Missing, |i| {
693                    if i.key.cmp_upper_bound(key).is_eq() {
694                        Existence::Exists
695                    } else {
696                        Existence::Missing
697                    }
698                }))
699            }
700        }
701    }
702
703    fn lock(&self) -> Option<Arc<DropEvent>> {
704        self.close_event.lock().clone()
705    }
706
707    async fn close(&self) {
708        let listener = self.close_event.lock().take().expect("close already called").listen();
709        listener.await;
710    }
711
712    fn get_version(&self) -> Version {
713        return self.version;
714    }
715
716    fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
717        node.record_uint("num_items", self.num_items as u64);
718        node.record_bool("persistent", true);
719        node.record_uint("size", self.object_handle.get_size());
720        if let Some(stats) = self.bloom_filter_stats.as_ref() {
721            node.record_child("bloom_filter", move |node| {
722                node.record_uint("size", stats.size as u64);
723                node.record_uint("num_hashes", stats.num_hashes as u64);
724                node.record_uint("fill_percentage", stats.fill_percentage as u64);
725            });
726        }
727    }
728}
729
// Data blocks store their per-block item count and in-block item offsets as u16 (see
// `write_block` and `LayerWriterBufItemCount`), so a block may be at most u16::MAX + 1 bytes.
// This ensures that item_count can't be overflowed below.
const_assert!(MAX_BLOCK_SIZE <= u16::MAX as u64 + 1);
732
733// -- Writer support --
734
/// Serializes items, in the order they are written, into the persistent layer format: a header
/// block, data blocks, an optional bloom filter, and the seek table with a trailing LayerInfo.
pub struct PersistentLayerWriter<W: WriteBytes, K: Key, V: LayerValue> {
    /// Destination the layer file is written to.
    writer: W,
    /// Size of every layer block; each data block is written as exactly this many bytes.
    block_size: u64,
    /// Serialized items not yet flushed to a data block.  Also reused as scratch space when
    /// writing the bloom filter, seek table and LayerInfo.
    buf: Vec<u8>,
    /// Number of items currently held in `buf` for the next data block.
    buf_item_count: LayerWriterBufItemCount,
    /// Total number of items written to the layer so far.
    item_count: usize,
    /// In-block offsets of every buffered item except the first (whose offset is fixed); written
    /// as the offset table at the end of the data block.
    block_offsets: Vec<u16>,
    /// Leading u64 of the first key of every data block after the first; written as the seek
    /// table.
    block_keys: Vec<u64>,
    /// Accumulates every written key for the bloom filter.
    bloom_filter: BloomFilterWriter<K>,
    _value: PhantomData<V>,
}
746
747impl<W: WriteBytes, K: Key, V: LayerValue> PersistentLayerWriter<W, K, V> {
748    /// Creates a new writer that will serialize items to the object accessible via |object_handle|
749    pub async fn new(writer: W, num_items: usize, block_size: u64) -> Result<Self, Error> {
750        Self::new_with_version(writer, num_items, block_size, LATEST_VERSION).await
751    }
752
753    pub(crate) async fn new_with_version(
754        mut writer: W,
755        num_items: usize,
756        block_size: u64,
757        version: Version,
758    ) -> Result<Self, Error> {
759        ensure!(block_size <= MAX_BLOCK_SIZE, FxfsError::NotSupported);
760        ensure!(block_size >= MIN_BLOCK_SIZE, FxfsError::NotSupported);
761
762        // Write the header block.
763        let header = LayerHeader { magic: PERSISTENT_LAYER_MAGIC.clone(), block_size };
764        let mut buf = vec![0u8; block_size as usize];
765        {
766            let mut cursor = std::io::Cursor::new(&mut buf[..]);
767            version.serialize_into(&mut cursor)?;
768            header.serialize_into(&mut cursor)?;
769        }
770        writer.write_bytes(&buf[..]).await?;
771
772        let seed: u64 = rand::random();
773        Ok(Self {
774            writer,
775            block_size,
776            buf: Vec::new(),
777            buf_item_count: LayerWriterBufItemCount(0),
778            item_count: 0,
779            block_offsets: Vec::new(),
780            block_keys: Vec::new(),
781            bloom_filter: BloomFilterWriter::new(seed, num_items),
782            _value: PhantomData,
783        })
784    }
785
786    /// Writes 'buf[..len]' out as a block.
787    ///
788    /// Blocks are fixed size, consisting of a 16-bit item count, data, zero padding
789    /// and seek table at the end.
790    async fn write_block(&mut self, len: usize) -> Result<(), Error> {
791        if *self.buf_item_count == 0 {
792            return Ok(());
793        }
794        let seek_table_size = self.block_offsets.len() * PER_DATA_BLOCK_SEEK_ENTRY_SIZE;
795        assert!(PER_DATA_BLOCK_HEADER_SIZE + seek_table_size + len <= self.block_size as usize);
796        let mut cursor = std::io::Cursor::new(vec![0u8; self.block_size as usize]);
797        cursor.write_u16::<LittleEndian>(*self.buf_item_count)?;
798        cursor.write_all(self.buf.drain(..len).as_ref())?;
799        cursor.set_position(self.block_size - seek_table_size as u64);
800        // Write the seek table. Entries are 2 bytes each and items are always at least 10.
801        for &offset in &self.block_offsets {
802            cursor.write_u16::<LittleEndian>(offset)?;
803        }
804        self.writer.write_bytes(cursor.get_ref()).await?;
805        debug!(item_count = *self.buf_item_count, byte_count = len; "wrote items");
806        *self.buf_item_count = 0;
807        self.block_offsets.clear();
808        Ok(())
809    }
810
811    // Assumes the writer is positioned to a new block.
812    // Returns the size, in bytes, of the seek table.
813    // Note that the writer will be positioned to exactly the end of the seek table, not to the end
814    // of a block.
815    async fn write_seek_table(&mut self) -> Result<usize, Error> {
816        if self.block_keys.len() == 0 {
817            return Ok(0);
818        }
819        let size = self.block_keys.len() * std::mem::size_of::<u64>();
820        self.buf.resize(size, 0);
821        let mut len = 0;
822        for key in &self.block_keys {
823            LittleEndian::write_u64(&mut self.buf[len..len + std::mem::size_of::<u64>()], *key);
824            len += std::mem::size_of::<u64>();
825        }
826        self.writer.write_bytes(&self.buf).await?;
827        Ok(size)
828    }
829
830    // Assumes the writer is positioned to exactly the end of the seek table, which was
831    // `seek_table_len` bytes.
832    async fn write_info(
833        &mut self,
834        num_data_blocks: u64,
835        bloom_filter_size_bytes: usize,
836        seek_table_len: usize,
837    ) -> Result<(), Error> {
838        let block_size = self.writer.block_size() as usize;
839        let layer_info = LayerInfo {
840            num_items: self.item_count,
841            num_data_blocks,
842            bloom_filter_size_bytes,
843            bloom_filter_seed: self.bloom_filter.seed(),
844            bloom_filter_num_hashes: self.bloom_filter.num_hashes(),
845        };
846        let actual_len = {
847            let mut cursor = std::io::Cursor::new(&mut self.buf);
848            layer_info.serialize_into(&mut cursor)?;
849            let layer_info_len = cursor.position();
850            cursor.write_u64::<LittleEndian>(layer_info_len)?;
851            cursor.position() as usize
852        };
853
854        // We want the LayerInfo to be at the end of the last block.  That might require creating a
855        // new block if we don't have enough room.
856        let avail_in_block = block_size - (seek_table_len % block_size);
857        let to_skip = if avail_in_block < actual_len {
858            block_size + avail_in_block - actual_len
859        } else {
860            avail_in_block - actual_len
861        } as u64;
862        self.writer.skip(to_skip).await?;
863        self.writer.write_bytes(&self.buf[..actual_len]).await?;
864        Ok(())
865    }
866
867    // Assumes the writer is positioned to a new block.
868    // Returns the size of the bloom filter, in bytes.
869    async fn write_bloom_filter(&mut self) -> Result<usize, Error> {
870        if self.data_blocks() < MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER {
871            return Ok(0);
872        }
873        // TODO(https://fxbug.dev/323571978): Avoid bounce-buffering.
874        let size = round_up(self.bloom_filter.serialized_size(), self.block_size as usize).unwrap();
875        self.buf.resize(size, 0);
876        let mut cursor = std::io::Cursor::new(&mut self.buf);
877        self.bloom_filter.write(&mut cursor)?;
878        self.writer.write_bytes(&self.buf).await?;
879        Ok(self.bloom_filter.serialized_size())
880    }
881
882    // Returns the bloom filter writer. Intended to be used for testing purposes, e.g., gain access
883    // to the bloom filter to then corrupt it.
884    #[cfg(test)]
885    pub(crate) fn bloom_filter(&mut self) -> &mut BloomFilterWriter<K> {
886        &mut self.bloom_filter
887    }
888
889    fn data_blocks(&self) -> usize {
890        if self.item_count == 0 { 0 } else { self.block_keys.len() + 1 }
891    }
892}
893
894impl<W: WriteBytes + Send, K: Key, V: LayerValue> LayerWriter<K, V>
895    for PersistentLayerWriter<W, K, V>
896{
897    async fn write(&mut self, item: ItemRef<'_, K, V>) -> Result<(), Error> {
898        // Note the length before we write this item.
899        let len = self.buf.len();
900        item.key.serialize_into(&mut self.buf)?;
901        item.value.serialize_into(&mut self.buf)?;
902
903        let mut added_offset = false;
904        // Never record the first item. The offset is always the same.
905        if *self.buf_item_count > 0 {
906            self.block_offsets.push(u16::try_from(len + PER_DATA_BLOCK_HEADER_SIZE).unwrap());
907            added_offset = true;
908        }
909
910        // If writing the item took us over a block, flush the bytes in the buffer prior to this
911        // item.
912        if PER_DATA_BLOCK_HEADER_SIZE
913            + self.buf.len()
914            + (self.block_offsets.len() * PER_DATA_BLOCK_SEEK_ENTRY_SIZE)
915            > self.block_size as usize - 1
916        {
917            if added_offset {
918                // Drop the recently added offset from the list. The latest item will be the first
919                // on the next block and have a known offset there.
920                self.block_offsets.pop();
921            }
922            self.write_block(len).await?;
923
924            // Note that this will not insert an entry for the first data block.
925            self.block_keys.push(item.key.get_leading_u64());
926        }
927
928        self.bloom_filter.insert(&item.key);
929        *self.buf_item_count += 1;
930        self.item_count += 1;
931        Ok(())
932    }
933
934    async fn complete(mut self) -> Result<u64, Error> {
935        self.write_block(self.buf.len()).await?;
936        let data_blocks = self.data_blocks() as u64;
937        let bloom_filter_len = self.write_bloom_filter().await?;
938        let seek_table_len = self.write_seek_table().await?;
939        self.write_info(data_blocks, bloom_filter_len, seek_table_len).await?;
940        self.writer.complete().await
941    }
942}
943
/// Number of items buffered for the current, not yet written, data block.
///
/// Logs a warning if this object is dropped and the contained value isn't 0 (and fails a
/// `debug_assert!` in debug builds), which catches writers dropped without calling `complete`.
#[repr(transparent)]
struct LayerWriterBufItemCount(u16);
947
948impl Drop for LayerWriterBufItemCount {
949    fn drop(&mut self) {
950        debug_assert!(self.0 == 0, "Dropping unwritten items; did you forget to call complete?");
951        if self.0 > 0 {
952            warn!("Dropping unwritten items; did you forget to call complete?");
953        }
954    }
955}
956
957impl std::ops::Deref for LayerWriterBufItemCount {
958    type Target = u16;
959    fn deref(&self) -> &u16 {
960        &self.0
961    }
962}
963
964impl std::ops::DerefMut for LayerWriterBufItemCount {
965    fn deref_mut(&mut self) -> &mut u16 {
966        &mut self.0
967    }
968}
969
970#[cfg(test)]
971mod tests {
972    use super::{PersistentLayer, PersistentLayerWriter};
973    use crate::filesystem::MAX_BLOCK_SIZE;
974    use crate::lsm_tree::LayerIterator;
975    use crate::lsm_tree::persistent_layer::MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER;
976    use crate::lsm_tree::types::{Existence, Item, ItemRef, Layer, LayerWriter, OrdUpperBound};
977    use crate::object_handle::WriteBytes;
978    use crate::object_store::AttributeId;
979    use crate::object_store::object_record::ObjectKey;
980    use crate::round::round_up;
981    use crate::serialized_types::{LATEST_VERSION, Version};
982    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
983    use crate::testing::writer::Writer;
984
985    use std::fmt::Debug;
986
987    use std::ops::{Bound, Range};
988    use std::sync::Arc;
989
990    impl<W: WriteBytes> Debug for PersistentLayerWriter<W, i32, i32> {
991        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
992            f.debug_struct("rPersistentLayerWriter")
993                .field("block_size", &self.block_size)
994                .field("item_count", &*self.buf_item_count)
995                .finish()
996        }
997    }
998
999    #[fuchsia::test]
1000    async fn test_iterate_after_write() {
1001        const BLOCK_SIZE: u64 = 512;
1002        const ITEM_COUNT: i32 = 10000;
1003
1004        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1005        {
1006            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1007                Writer::new(&handle).await,
1008                ITEM_COUNT as usize * 4,
1009                BLOCK_SIZE,
1010            )
1011            .await
1012            .expect("writer new");
1013            for i in 0..ITEM_COUNT {
1014                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1015            }
1016            writer.complete().await.expect("flush failed");
1017        }
1018        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1019        let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
1020        for i in 0..ITEM_COUNT {
1021            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1022            assert_eq!((key, value), (&i, &i));
1023            iterator.advance().await.expect("failed to advance");
1024        }
1025        assert!(iterator.get().is_none());
1026    }
1027
1028    #[fuchsia::test]
1029    async fn test_seek_after_write() {
1030        const BLOCK_SIZE: u64 = 512;
1031        const ITEM_COUNT: i32 = 5000;
1032
1033        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1034        {
1035            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1036                Writer::new(&handle).await,
1037                ITEM_COUNT as usize * 18,
1038                BLOCK_SIZE,
1039            )
1040            .await
1041            .expect("writer new");
1042            for i in 0..ITEM_COUNT {
1043                // Populate every other value as an item.
1044                writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
1045            }
1046            writer.complete().await.expect("flush failed");
1047        }
1048        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1049        // Search for all values to check the in-between values.
1050        for i in 0..ITEM_COUNT * 2 {
1051            // We've written every other value, we expect to get either the exact value searched
1052            // for, or the next one after it. So round up to the nearest multiple of 2.
1053            let expected = round_up(i, 2).unwrap();
1054            let mut iterator = layer.seek(Bound::Included(&i)).await.expect("failed to seek");
1055            // We've written values up to (N-1)*2=2*N-2, so when looking for 2*N-1 we'll go off the
1056            // end of the layer and get back no item.
1057            if i >= (ITEM_COUNT * 2) - 1 {
1058                assert!(iterator.get().is_none());
1059            } else {
1060                let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1061                assert_eq!((key, value), (&expected, &expected));
1062            }
1063
1064            // Check that we can advance to the next item.
1065            iterator.advance().await.expect("failed to advance");
1066            // The highest value is 2*N-2, searching for 2*N-3 will find the last value, and
1067            // advancing will go off the end of the layer and return no item. If there was
1068            // previously no item, then it will latch and always return no item.
1069            if i >= (ITEM_COUNT * 2) - 3 {
1070                assert!(iterator.get().is_none());
1071            } else {
1072                let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1073                let next = expected + 2;
1074                assert_eq!((key, value), (&next, &next));
1075            }
1076        }
1077    }
1078
1079    #[fuchsia::test]
1080    async fn test_seek_unbounded() {
1081        const BLOCK_SIZE: u64 = 512;
1082        const ITEM_COUNT: i32 = 1000;
1083
1084        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1085        {
1086            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1087                Writer::new(&handle).await,
1088                ITEM_COUNT as usize * 18,
1089                BLOCK_SIZE,
1090            )
1091            .await
1092            .expect("writer new");
1093            for i in 0..ITEM_COUNT {
1094                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1095            }
1096            writer.complete().await.expect("flush failed");
1097        }
1098        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1099        let mut iterator = layer.seek(Bound::Unbounded).await.expect("failed to seek");
1100        let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1101        assert_eq!((key, value), (&0, &0));
1102
1103        // Check that we can advance to the next item.
1104        iterator.advance().await.expect("failed to advance");
1105        let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1106        assert_eq!((key, value), (&1, &1));
1107    }
1108
1109    #[fuchsia::test]
1110    async fn test_zero_items() {
1111        const BLOCK_SIZE: u64 = 512;
1112
1113        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1114        {
1115            let writer = PersistentLayerWriter::<_, i32, i32>::new(
1116                Writer::new(&handle).await,
1117                0,
1118                BLOCK_SIZE,
1119            )
1120            .await
1121            .expect("writer new");
1122            writer.complete().await.expect("flush failed");
1123        }
1124
1125        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1126        let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1127            .seek(Bound::Unbounded)
1128            .await
1129            .expect("seek failed");
1130        assert!(iterator.get().is_none())
1131    }
1132
1133    #[fuchsia::test]
1134    async fn test_one_item() {
1135        const BLOCK_SIZE: u64 = 512;
1136
1137        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1138        {
1139            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1140                Writer::new(&handle).await,
1141                1,
1142                BLOCK_SIZE,
1143            )
1144            .await
1145            .expect("writer new");
1146            writer.write(Item::new(42, 42).as_item_ref()).await.expect("write failed");
1147            writer.complete().await.expect("flush failed");
1148        }
1149
1150        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1151        {
1152            let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1153                .seek(Bound::Unbounded)
1154                .await
1155                .expect("seek failed");
1156            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1157            assert_eq!((key, value), (&42, &42));
1158            iterator.advance().await.expect("failed to advance");
1159            assert!(iterator.get().is_none())
1160        }
1161        {
1162            let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1163                .seek(Bound::Included(&30))
1164                .await
1165                .expect("seek failed");
1166            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1167            assert_eq!((key, value), (&42, &42));
1168            iterator.advance().await.expect("failed to advance");
1169            assert!(iterator.get().is_none())
1170        }
1171        {
1172            let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1173                .seek(Bound::Included(&42))
1174                .await
1175                .expect("seek failed");
1176            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1177            assert_eq!((key, value), (&42, &42));
1178            iterator.advance().await.expect("failed to advance");
1179            assert!(iterator.get().is_none())
1180        }
1181        {
1182            let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1183                .seek(Bound::Included(&43))
1184                .await
1185                .expect("seek failed");
1186            assert!(iterator.get().is_none())
1187        }
1188    }
1189
1190    #[fuchsia::test]
1191    async fn test_large_block_size() {
1192        // At the upper end of the supported size.
1193        const BLOCK_SIZE: u64 = MAX_BLOCK_SIZE;
1194        // Items will be 18 bytes, so fill up a few pages.
1195        const ITEM_COUNT: i32 = ((BLOCK_SIZE as i32) / 18) * 3;
1196
1197        let handle =
1198            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1199        {
1200            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1201                Writer::new(&handle).await,
1202                ITEM_COUNT as usize * 18,
1203                BLOCK_SIZE,
1204            )
1205            .await
1206            .expect("writer new");
1207            // Use large values to force varint encoding to use consistent space.
1208            for i in 2000000000..(2000000000 + ITEM_COUNT) {
1209                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1210            }
1211            writer.complete().await.expect("flush failed");
1212        }
1213
1214        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1215        let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
1216        for i in 2000000000..(2000000000 + ITEM_COUNT) {
1217            let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1218            assert_eq!((key, value), (&i, &i));
1219            iterator.advance().await.expect("failed to advance");
1220        }
1221        assert!(iterator.get().is_none());
1222    }
1223
1224    #[fuchsia::test]
1225    async fn test_overlarge_block_size() {
1226        // At the upper end of the supported size.
1227        const BLOCK_SIZE: u64 = MAX_BLOCK_SIZE * 2;
1228
1229        let handle =
1230            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1231        PersistentLayerWriter::<_, i32, i32>::new(Writer::new(&handle).await, 0, BLOCK_SIZE)
1232            .await
1233            .expect_err("Creating writer with overlarge block size.");
1234    }
1235
1236    #[fuchsia::test]
1237    async fn test_seek_bound_excluded() {
1238        const BLOCK_SIZE: u64 = 512;
1239        const ITEM_COUNT: i32 = 10000;
1240
1241        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1242        {
1243            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1244                Writer::new(&handle).await,
1245                ITEM_COUNT as usize * 18,
1246                BLOCK_SIZE,
1247            )
1248            .await
1249            .expect("writer new");
1250            for i in 0..ITEM_COUNT {
1251                writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1252            }
1253            writer.complete().await.expect("flush failed");
1254        }
1255        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1256
1257        for i in 9982..ITEM_COUNT {
1258            let mut iterator = layer.seek(Bound::Excluded(&i)).await.expect("failed to seek");
1259            let i_plus_one = i + 1;
1260            if i_plus_one < ITEM_COUNT {
1261                let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1262
1263                assert_eq!((key, value), (&i_plus_one, &i_plus_one));
1264
1265                // Check that we can advance to the next item.
1266                iterator.advance().await.expect("failed to advance");
1267                let i_plus_two = i + 2;
1268                if i_plus_two < ITEM_COUNT {
1269                    let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1270                    assert_eq!((key, value), (&i_plus_two, &i_plus_two));
1271                } else {
1272                    assert!(iterator.get().is_none());
1273                }
1274            } else {
1275                assert!(iterator.get().is_none());
1276            }
1277        }
1278    }
1279
1280    use crate::lsm_tree::testing::TestKey;
1281
1282    /// Generates extent records for a given object_id (of size 1).
1283    /// This produces a series of records with the same leading_u64.
1284    /// Returns the generated items and the next available object_id.
1285    fn generate_extents(
1286        object_id: u64,
1287        base_offset: u64,
1288        count: u64,
1289    ) -> (Vec<Item<ObjectKey, u64>>, u64) {
1290        let mut items = Vec::new();
1291        for i in 0..count {
1292            items.push(Item::new(
1293                ObjectKey::extent(
1294                    object_id,
1295                    AttributeId::TEST_ID,
1296                    base_offset + i..base_offset + i + 1,
1297                ),
1298                object_id,
1299            ));
1300        }
1301        (items, object_id + 1)
1302    }
1303
1304    /// Generates objects object_ids over a range.
1305    /// This produced a series of records with unique leading_u64.
1306    /// Returns the generated items and the next available value for sequencing.
1307    fn generate_objects(object_id_range: Range<u64>) -> (Vec<Item<ObjectKey, u64>>, u64) {
1308        let mut items = Vec::new();
1309        let end = object_id_range.end;
1310        for object_id in object_id_range {
1311            items.push(Item::new(ObjectKey::object(object_id), object_id));
1312        }
1313        (items, end)
1314    }
1315
1316    // Create a large spread of data across several blocks to ensure that no part of the range is
1317    // lost by the partial search using the layer seek table.
1318    #[fuchsia::test]
1319    async fn test_block_seek_duplicate_leading_u64() {
1320        // At the upper end of the supported size.
1321        const BLOCK_SIZE: u64 = 512;
1322        const ITEMS_PER_PHASE: u64 = 50;
1323
1324        let mut to_find = Vec::new();
1325
1326        let handle =
1327            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1328        {
1329            let mut items = Vec::new();
1330            // Make all values take up maximum space for varint encoding.
1331            let mut object_id = u32::MAX as u64 + 1;
1332
1333            // First fill the front with duplicate object IDs, then look at the start,
1334            // middle and end of the range.
1335            {
1336                let base_extent_offset = 0;
1337                let (mut generated, next_object_id) =
1338                    generate_extents(object_id, base_extent_offset, ITEMS_PER_PHASE * 3);
1339                items.append(&mut generated);
1340                let count = ITEMS_PER_PHASE * 3;
1341                to_find.push(ObjectKey::extent(
1342                    object_id,
1343                    AttributeId::TEST_ID,
1344                    base_extent_offset..base_extent_offset + 1,
1345                ));
1346                to_find.push(ObjectKey::extent(
1347                    object_id,
1348                    AttributeId::TEST_ID,
1349                    base_extent_offset + (count / 2)..base_extent_offset + (count / 2) + 1,
1350                ));
1351                to_find.push(ObjectKey::extent(
1352                    object_id,
1353                    AttributeId::TEST_ID,
1354                    base_extent_offset + (count - 1)..base_extent_offset + count,
1355                ));
1356                object_id = next_object_id;
1357            }
1358
1359            // Add some filler of all different leading u64.
1360            {
1361                let (mut generated, next_object_id) =
1362                    generate_objects(object_id..object_id + ITEMS_PER_PHASE * 3);
1363                items.append(&mut generated);
1364                object_id = next_object_id;
1365            }
1366
1367            // Fill the middle with duplicate object IDs, then look at the start,
1368            // middle and end of the range.
1369            {
1370                let base_extent_offset = 1000;
1371                let (mut generated, next_object_id) =
1372                    generate_extents(object_id, base_extent_offset, ITEMS_PER_PHASE * 3);
1373                items.append(&mut generated);
1374                let count = ITEMS_PER_PHASE * 3;
1375                to_find.push(ObjectKey::extent(
1376                    object_id,
1377                    AttributeId::TEST_ID,
1378                    base_extent_offset..base_extent_offset + 1,
1379                ));
1380                to_find.push(ObjectKey::extent(
1381                    object_id,
1382                    AttributeId::TEST_ID,
1383                    base_extent_offset + (count / 2)..base_extent_offset + (count / 2) + 1,
1384                ));
1385                to_find.push(ObjectKey::extent(
1386                    object_id,
1387                    AttributeId::TEST_ID,
1388                    base_extent_offset + (count - 1)..base_extent_offset + count,
1389                ));
1390                object_id = next_object_id;
1391            }
1392
1393            // Add some filler of all different leading u64.
1394            {
1395                let (mut generated, next_object_id) =
1396                    generate_objects(object_id..object_id + ITEMS_PER_PHASE * 3);
1397                items.append(&mut generated);
1398                object_id = next_object_id;
1399            }
1400
1401            // Fill the end with duplicate object IDs, then look at the start,
1402            // middle and end of the range.
1403            {
1404                let base_extent_offset = 2000;
1405                let (mut generated, _) =
1406                    generate_extents(object_id, base_extent_offset, ITEMS_PER_PHASE * 3);
1407                items.append(&mut generated);
1408                let count = ITEMS_PER_PHASE * 3;
1409                to_find.push(ObjectKey::extent(
1410                    object_id,
1411                    AttributeId::TEST_ID,
1412                    base_extent_offset..base_extent_offset + 1,
1413                ));
1414                to_find.push(ObjectKey::extent(
1415                    object_id,
1416                    AttributeId::TEST_ID,
1417                    base_extent_offset + (count / 2)..base_extent_offset + (count / 2) + 1,
1418                ));
1419                to_find.push(ObjectKey::extent(
1420                    object_id,
1421                    AttributeId::TEST_ID,
1422                    base_extent_offset + (count - 1)..base_extent_offset + count,
1423                ));
1424            }
1425
1426            // Sort items by cmp_upper_bound!
1427            items.sort_by(|a, b| a.key.cmp_upper_bound(&b.key));
1428
1429            let mut writer = PersistentLayerWriter::<_, ObjectKey, u64>::new(
1430                Writer::new(&handle).await,
1431                3 * BLOCK_SIZE as usize,
1432                BLOCK_SIZE,
1433            )
1434            .await
1435            .expect("writer new");
1436
1437            for item in items {
1438                writer.write(item.as_item_ref()).await.expect("write failed");
1439            }
1440
1441            writer.complete().await.expect("flush failed");
1442        }
1443
1444        let layer = PersistentLayer::<ObjectKey, u64>::open(handle).await.expect("new failed");
1445        for target in to_find {
1446            let iterator: Box<dyn LayerIterator<ObjectKey, u64>> =
1447                layer.seek(Bound::Included(&target)).await.expect("failed to seek");
1448            let ItemRef { key, .. } = iterator.get().expect("missing item");
1449            assert_eq!(&target, key);
1450        }
1451    }
1452
1453    #[fuchsia::test]
1454    async fn test_two_seek_blocks() {
1455        // At the upper end of the supported size.
1456        const BLOCK_SIZE: u64 = 512;
1457        const ITEMS_PER_PHASE: u64 = 50;
1458        const ITEM_COUNT: u64 = ITEMS_PER_PHASE * ((BLOCK_SIZE / 8) + 2);
1459
1460        let mut to_find = Vec::new();
1461
1462        let handle =
1463            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1464        {
1465            let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
1466                Writer::new(&handle).await,
1467                ITEM_COUNT as usize * 18,
1468                BLOCK_SIZE,
1469            )
1470            .await
1471            .expect("writer new");
1472
1473            // Make all values take up maximum space for varint encoding.
1474            let initial_value = u32::MAX as u64 + 1;
1475            for i in 0..ITEM_COUNT {
1476                writer
1477                    .write(
1478                        Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1479                            .as_item_ref(),
1480                    )
1481                    .await
1482                    .expect("write failed");
1483            }
1484            // Look at the start middle and end.
1485            to_find.push(TestKey(initial_value..initial_value));
1486            let middle = initial_value + ITEM_COUNT / 2;
1487            to_find.push(TestKey(middle..middle));
1488            let end = initial_value + ITEM_COUNT - 1;
1489            to_find.push(TestKey(end..end));
1490
1491            writer.complete().await.expect("flush failed");
1492        }
1493
1494        let layer = PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
1495        for target in to_find {
1496            let iterator: Box<dyn LayerIterator<TestKey, u64>> =
1497                layer.seek(Bound::Included(&target)).await.expect("failed to seek");
1498            let ItemRef { key, .. } = iterator.get().expect("missing item");
1499            assert_eq!(&target, key);
1500        }
1501    }
1502
1503    // Verifies behaviour around creating full seek blocks, to ensure that it is able to be opened
1504    // and parsed afterward.
1505    #[fuchsia::test]
1506    async fn test_full_seek_block() {
1507        const BLOCK_SIZE: u64 = 512;
1508        const ITEMS_PER_PHASE: u64 = 50;
1509
1510        // How many entries there are in a seek table block.
1511        const SEEK_TABLE_ENTRIES: u64 = BLOCK_SIZE / 8;
1512
1513        // Number of entries to fill a seek block would need one more block of entries, but we're
1514        // starting low here on purpose to do a range and make sure we hit the size we are
1515        // interested in.
1516        const START_ENTRIES_COUNT: u64 = ITEMS_PER_PHASE * SEEK_TABLE_ENTRIES;
1517
1518        for entries in START_ENTRIES_COUNT..START_ENTRIES_COUNT + (ITEMS_PER_PHASE * 2) {
1519            let handle = FakeObjectHandle::new_with_block_size(
1520                Arc::new(FakeObject::new()),
1521                BLOCK_SIZE as usize,
1522            );
1523            {
1524                let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
1525                    Writer::new(&handle).await,
1526                    entries as usize,
1527                    BLOCK_SIZE,
1528                )
1529                .await
1530                .expect("writer new");
1531
1532                // Make all values take up maximum space for varint encoding.
1533                let initial_value = u32::MAX as u64 + 1;
1534                for i in 0..entries {
1535                    writer
1536                        .write(
1537                            Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1538                                .as_item_ref(),
1539                        )
1540                        .await
1541                        .expect("write failed");
1542                }
1543
1544                writer.complete().await.expect("flush failed");
1545            }
1546            PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
1547        }
1548    }
1549
1550    #[fuchsia::test]
1551    async fn test_ignore_bloom_filter_on_older_versions() {
1552        const BLOCK_SIZE: u64 = 512;
1553        const ITEMS_PER_PHASE: u64 = 50;
1554        // Add enough items to create enough blocks for a bloom filter to be necessary.
1555        const ITEM_COUNT: u64 = (1 + MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER as u64) * ITEMS_PER_PHASE;
1556
1557        let old_version_handle =
1558            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1559        let current_version_handle =
1560            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1561        {
1562            let mut old_version_writer =
1563                PersistentLayerWriter::<_, TestKey, u64>::new_with_version(
1564                    Writer::new(&old_version_handle).await,
1565                    ITEM_COUNT as usize,
1566                    BLOCK_SIZE,
1567                    Version { major: LATEST_VERSION.major - 1, minor: 0 },
1568                )
1569                .await
1570                .expect("writer new");
1571            let mut current_version_writer = PersistentLayerWriter::<_, TestKey, u64>::new(
1572                Writer::new(&current_version_handle).await,
1573                ITEM_COUNT as usize,
1574                BLOCK_SIZE,
1575            )
1576            .await
1577            .expect("writer new");
1578
1579            // Make all values take up maximum space for varint encoding.
1580            let initial_value = u32::MAX as u64 + 1;
1581            for i in 0..ITEM_COUNT {
1582                old_version_writer
1583                    .write(
1584                        Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1585                            .as_item_ref(),
1586                    )
1587                    .await
1588                    .expect("write failed");
1589                current_version_writer
1590                    .write(
1591                        Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1592                            .as_item_ref(),
1593                    )
1594                    .await
1595                    .expect("write failed");
1596            }
1597
1598            old_version_writer.complete().await.expect("flush failed");
1599            current_version_writer.complete().await.expect("flush failed");
1600        }
1601
1602        let old_layer =
1603            PersistentLayer::<TestKey, u64>::open(old_version_handle).await.expect("open failed");
1604        let current_layer = PersistentLayer::<TestKey, u64>::open(current_version_handle)
1605            .await
1606            .expect("open failed");
1607        assert!(!old_layer.has_bloom_filter());
1608        assert!(current_layer.has_bloom_filter());
1609    }
1610
1611    #[fuchsia::test]
1612    async fn test_key_exists_no_bloom_filter() {
1613        const BLOCK_SIZE: u64 = 8192;
1614        // Not enough items to trigger a bloom filter.
1615        const ITEM_COUNT: i32 = 100;
1616
1617        let handle =
1618            FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1619        {
1620            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1621                Writer::new(&handle).await,
1622                ITEM_COUNT as usize,
1623                BLOCK_SIZE,
1624            )
1625            .await
1626            .expect("writer new");
1627            for i in 0..ITEM_COUNT {
1628                writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
1629            }
1630            writer.complete().await.expect("flush failed");
1631        }
1632        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1633        assert!(!layer.has_bloom_filter());
1634
1635        for i in 0..ITEM_COUNT {
1636            assert_eq!(
1637                layer.key_exists(&(i * 2)).await.expect("key_exists failed"),
1638                Existence::Exists
1639            );
1640            assert_eq!(
1641                layer.key_exists(&(i * 2 + 1)).await.expect("key_exists failed"),
1642                Existence::Missing
1643            );
1644        }
1645    }
1646
1647    #[fuchsia::test]
1648    async fn test_key_exists_with_bloom_filter() {
1649        const BLOCK_SIZE: u64 = 512;
1650        // Enough items to trigger a bloom filter.
1651        const ITEM_COUNT: i32 = 10000;
1652
1653        let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1654        {
1655            let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1656                Writer::new(&handle).await,
1657                ITEM_COUNT as usize,
1658                BLOCK_SIZE,
1659            )
1660            .await
1661            .expect("writer new");
1662            for i in 0..ITEM_COUNT {
1663                writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
1664            }
1665            writer.complete().await.expect("flush failed");
1666        }
1667        let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1668        assert!(layer.has_bloom_filter());
1669
1670        for i in 0..ITEM_COUNT {
1671            // With a bloom filter, we expect MaybeExists for present keys.
1672            assert_eq!(
1673                layer.key_exists(&(i * 2)).await.expect("key_exists failed"),
1674                Existence::MaybeExists
1675            );
1676        }
1677
1678        // For missing keys, we expect Missing, but might get MaybeExists due to false positives.
1679        // We can at least assert it's NOT Exists.
1680        let mut missing_count = 0;
1681        for i in 0..ITEM_COUNT {
1682            let result = layer.key_exists(&(i * 2 + 1)).await.expect("key_exists failed");
1683            assert_ne!(result, Existence::Exists);
1684            if result == Existence::Missing {
1685                missing_count += 1;
1686            }
1687        }
1688        // We expect mostly Missing.
1689        assert!(missing_count > ITEM_COUNT / 2);
1690    }
1691}