1use crate::drop_event::DropEvent;
61use crate::errors::FxfsError;
62use crate::filesystem::MAX_BLOCK_SIZE;
63use crate::log::*;
64use crate::lsm_tree::bloom_filter::{BloomFilterReader, BloomFilterStats, BloomFilterWriter};
65use crate::lsm_tree::types::{
66 BoxedLayerIterator, Existence, FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerValue,
67 LayerWriter,
68};
69use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteBytes};
70use crate::object_store::caching_object_handle::{CHUNK_SIZE, CachedChunk, CachingObjectHandle};
71use crate::round::{round_down, round_up};
72use crate::serialized_types::{LATEST_VERSION, Version, Versioned, VersionedLatest};
73use anyhow::{Context, Error, anyhow, bail, ensure};
74use async_trait::async_trait;
75use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt};
76use fprint::TypeFingerprint;
77use fuchsia_sync::Mutex;
78use serde::{Deserialize, Serialize};
79use static_assertions::const_assert;
80use std::cmp::Ordering;
81use std::io::{Read, Write as _};
82use std::marker::PhantomData;
83use std::ops::Bound;
84use std::sync::Arc;
85
/// Magic bytes stored in every layer header; `PersistentLayer::open` rejects files whose header
/// does not carry exactly these bytes.
const PERSISTENT_LAYER_MAGIC: &[u8; 8] = b"FxfsLayr";

/// The current layer header format.
pub type LayerHeader = LayerHeaderV39;

/// Header stored in the first block of a layer file, serialized immediately after the
/// `Version` of the layer.
#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct LayerHeaderV39 {
    /// Must equal `PERSISTENT_LAYER_MAGIC`.
    magic: [u8; 8],
    /// The layer's block size in bytes. Readers require a non-zero power of two, no larger than
    /// `MAX_BLOCK_SIZE`, that is a multiple of the underlying handle's block size.
    block_size: u64,
}

/// The current layer info format.
pub type LayerInfo = LayerInfoV39;

/// Summary of a layer's contents. It is written at the very end of the layer file, followed by
/// its own serialized length as a little-endian `u64`.
#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct LayerInfoV39 {
    /// Total number of items in the layer (reported by `Layer::len`).
    num_items: usize,
    /// Number of data blocks that follow the header block.
    num_data_blocks: u64,
    /// Serialized size of the bloom filter in bytes (not rounded up to a block); 0 when the
    /// layer has no bloom filter.
    bloom_filter_size_bytes: usize,
    /// Seed passed to `BloomFilterReader::read` when loading the filter.
    bloom_filter_seed: u64,
    /// Number of hash functions passed to `BloomFilterReader::read` when loading the filter.
    bloom_filter_num_hashes: usize,
}
120
/// A read-only LSM tree layer backed by an object written with [`PersistentLayerWriter`].
///
/// Layout produced by the writer: one header block, `num_data_blocks` data blocks, the bloom
/// filter (padded to whole blocks), the seek table, and finally the `LayerInfo` followed by its
/// serialized length as a little-endian `u64` at the end of the file.
pub struct PersistentLayer<K, V> {
    // The raw handle; reported through `Layer::handle` and used for inspect data.
    object_handle: Arc<dyn ReadObjectHandle>,
    // Chunk-caching wrapper around `object_handle` through which the iterators read data blocks.
    caching_object_handle: CachingObjectHandle<Arc<dyn ReadObjectHandle>>,
    // Format version read from the header; used to deserialize keys and values.
    version: Version,
    // Layer block size taken from the header.
    block_size: u64,
    // Size in bytes of the data region (`num_data_blocks * block_size`).
    data_size: u64,
    // Leading `u64` of the first key of each data block, with entry 0 fixed at 0. Empty when the
    // layer has fewer than two data blocks.
    seek_table: Vec<u64>,
    // Total number of items, as recorded in `LayerInfo`.
    num_items: usize,
    // `None` when the layer has no bloom filter or was not written at `LATEST_VERSION`.
    bloom_filter: Option<BloomFilterReader<K>>,
    // Snapshot of the bloom filter's statistics, reported in inspect data.
    bloom_filter_stats: Option<BloomFilterStats>,
    // Handed out by `lock` and taken by `close`, which then awaits the event's listener
    // (presumably until all clones returned by `lock` are dropped — confirm in DropEvent).
    close_event: Mutex<Option<Arc<DropEvent>>>,
    _value_type: PhantomData<V>,
}
141
142#[derive(Debug)]
143struct BufferCursor {
144 chunk: Option<CachedChunk>,
145 pos: usize,
146}
147
148impl std::io::Read for BufferCursor {
149 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
150 let chunk = if let Some(chunk) = &self.chunk {
151 chunk
152 } else {
153 return Ok(0);
154 };
155 let to_read = std::cmp::min(buf.len(), chunk.len().saturating_sub(self.pos));
156 if to_read > 0 {
157 buf[..to_read].copy_from_slice(&chunk[self.pos..self.pos + to_read]);
158 self.pos += to_read;
159 }
160 Ok(to_read)
161 }
162}
163
/// Smallest block size accepted by `PersistentLayerWriter`.
const MIN_BLOCK_SIZE: u64 = 512;

/// The writer only emits a bloom filter when the layer has at least this many data blocks.
const MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER: usize = 4;

/// Number of header blocks at the start of a layer file; data blocks follow them.
const NUM_HEADER_BLOCKS: u64 = 1;

/// `open` rejects files shorter than this many blocks: the header block plus the final block,
/// which holds the layer info.
const MINIMUM_LAYER_FILE_BLOCKS: u64 = 2;

/// Upper bounds on how much bloom filter / seek table data `open` is willing to load; larger
/// values are rejected with `FxfsError::NotSupported`.
const MAX_BLOOM_FILTER_SIZE: usize = 64 * 1024 * 1024;
const MAX_SEEK_TABLE_SIZE: usize = 64 * 1024 * 1024;

/// Size of the little-endian `u16` item count at the start of every data block.
const PER_DATA_BLOCK_HEADER_SIZE: usize = 2;
/// Size of each little-endian `u16` item offset stored at the end of a data block (one for
/// every item after the first).
const PER_DATA_BLOCK_SEEK_ENTRY_SIZE: usize = 2;
184
/// Walks the keys of a persistent layer block by block without deserializing values.
/// [`Iterator`] layers value/sequence decoding on top of it.
struct KeyOnlyIterator<'iter, K: Key, V: LayerValue> {
    // Cursor over the cached chunk that contains the current block.
    buffer: BufferCursor,

    layer: &'iter PersistentLayer<K, V>,

    // File offset of the next block to load; bumped by one block each time a block is loaded.
    pos: u64,

    // Index, within the current block, of the next item whose key will be read.
    item_index: u16,

    // Number of items in the current block (read from the block's header).
    item_count: u16,

    // The most recently read key, or `None` at the end of the layer (or once taken by `Iterator`).
    key: Option<K>,

    // True when `buffer` already sits at the start of item `item_index`: either a block was just
    // loaded or the previous item's value and sequence have been consumed.
    value_deserialized: bool,
}
208
impl<K: Key, V: LayerValue> KeyOnlyIterator<'_, K, V> {
    /// Creates an iterator that will start reading at file offset `pos`. No I/O happens here; the
    /// first block is loaded lazily by `advance`.
    ///
    /// # Panics
    /// If `pos` is not a multiple of the layer's block size.
    fn new<'iter>(layer: &'iter PersistentLayer<K, V>, pos: u64) -> KeyOnlyIterator<'iter, K, V> {
        assert!(pos % layer.block_size == 0);
        KeyOnlyIterator {
            layer,
            buffer: BufferCursor { chunk: None, pos: pos as usize % CHUNK_SIZE },
            pos,
            item_index: 0,
            item_count: 0,
            key: None,
            value_deserialized: false,
        }
    }

    /// Positions the buffer at the start of item `index` of the current block, so the next read
    /// deserializes that item's key.
    ///
    /// Item 0 always starts right after the block's item-count header. The offsets of later items
    /// are read from the `u16` table at the end of the block.
    ///
    /// # Errors
    /// `FxfsError::OutOfRange` if `index` is not below the block's item count, and
    /// `FxfsError::Inconsistent` if the recorded offset falls outside the block's item area.
    fn seek_to_block_item(&mut self, index: u16) -> Result<(), Error> {
        ensure!(index < self.item_count, FxfsError::OutOfRange);
        if index == self.item_index && self.value_deserialized {
            // Fast path: already positioned at the start of this item.
            return Ok(());
        }
        let offset_in_block = if index == 0 {
            // The first item's offset is not recorded; it follows the item-count header.
            PER_DATA_BLOCK_HEADER_SIZE
        } else {
            // Jump to this item's entry in the offset table at the end of the block. Entries are
            // stored in item order and end at the block boundary. Rounding the current position
            // up locates that boundary. This assumes the position is strictly inside the block
            // and that chunks start block-aligned (NOTE(review): relies on CHUNK_SIZE being a
            // multiple of block_size — confirm). The old position is restored whether or not the
            // read succeeds.
            let old_buffer_pos = self.buffer.pos;
            self.buffer.pos = round_up(self.buffer.pos, self.layer.block_size as usize).unwrap()
                - (PER_DATA_BLOCK_SEEK_ENTRY_SIZE * (usize::from(self.item_count - index)));
            let res = self.buffer.read_u16::<LittleEndian>();
            self.buffer.pos = old_buffer_pos;
            let offset_in_block = res.context("Failed to read offset")? as usize;
            // Offsets of items after the first must lie past the header and inside the block.
            if offset_in_block >= self.layer.block_size as usize
                || offset_in_block <= PER_DATA_BLOCK_HEADER_SIZE
            {
                return Err(anyhow!(FxfsError::Inconsistent))
                    .context(format!("Offset {} is out of valid range.", offset_in_block));
            }
            offset_in_block
        };
        self.item_index = index;
        // Offsets are relative to the start of the block containing the current position.
        self.buffer.pos =
            round_down(self.buffer.pos, self.layer.block_size as usize) + offset_in_block;
        Ok(())
    }

    /// Reads the next key into `self.key`.
    ///
    /// When the current block is exhausted, the next block is loaded: a new chunk is fetched if
    /// none is loaded or the block starts a chunk. Its item count is read and iteration restarts
    /// at item 0. At the end of the data region `self.key` is set to `None`.
    ///
    /// # Errors
    /// Read failures, a block with a zero item count, a bad offset table entry, or a key that
    /// fails to deserialize.
    async fn advance(&mut self) -> Result<(), Error> {
        if self.item_index >= self.item_count {
            if self.pos >= self.layer.data_offset() + self.layer.data_size {
                self.key = None;
                return Ok(());
            }
            if self.buffer.chunk.is_none() || self.pos as usize % CHUNK_SIZE == 0 {
                self.buffer.chunk = Some(
                    self.layer
                        .caching_object_handle
                        .read(self.pos as usize)
                        .await
                        .context("Reading during advance")?,
                );
            }
            self.buffer.pos = self.pos as usize % CHUNK_SIZE;
            self.item_count = self.buffer.read_u16::<LittleEndian>()?;
            if self.item_count == 0 {
                bail!(
                    "Read block with zero item count (object: {}, offset: {})",
                    self.layer.object_handle.object_id(),
                    self.pos
                );
            }
            debug!(
                pos = self.pos,
                buf:? = self.buffer,
                object_size = self.layer.data_offset() + self.layer.data_size,
                oid = self.layer.object_handle.object_id();
                ""
            );
            self.pos += self.layer.block_size;
            self.item_index = 0;
            // The buffer now sits right after the header, i.e. at the start of item 0.
            self.value_deserialized = true;
        }
        self.seek_to_block_item(self.item_index)?;
        self.key = Some(
            K::deserialize_from_version(self.buffer.by_ref(), self.layer.version)
                .context("Corrupt layer (key)")?,
        );
        self.item_index += 1;
        // The buffer is now between this item's key and its value.
        self.value_deserialized = false;
        Ok(())
    }

    /// Returns the most recently read key, if any.
    fn get(&self) -> Option<&K> {
        self.key.as_ref()
    }
}
307
308struct Iterator<'iter, K: Key, V: LayerValue> {
309 inner: KeyOnlyIterator<'iter, K, V>,
310 item: Option<Item<K, V>>,
312}
313
314impl<'iter, K: Key, V: LayerValue> Iterator<'iter, K, V> {
315 fn new(mut seek_iterator: KeyOnlyIterator<'iter, K, V>) -> Result<Self, Error> {
316 let key = std::mem::take(&mut seek_iterator.key);
317 let item = if let Some(key) = key {
318 seek_iterator.value_deserialized = true;
319 Some(Item {
320 key,
321 value: V::deserialize_from_version(
322 seek_iterator.buffer.by_ref(),
323 seek_iterator.layer.version,
324 )
325 .context("Corrupt layer (value)")?,
326 sequence: seek_iterator
327 .buffer
328 .read_u64::<LittleEndian>()
329 .context("Corrupt layer (seq)")?,
330 })
331 } else {
332 None
333 };
334 Ok(Self { inner: seek_iterator, item })
335 }
336}
337
338#[async_trait]
339impl<'iter, K: Key, V: LayerValue> LayerIterator<K, V> for Iterator<'iter, K, V> {
340 async fn advance(&mut self) -> Result<(), Error> {
341 self.inner.advance().await?;
342 let key = std::mem::take(&mut self.inner.key);
343 self.item = if let Some(key) = key {
344 self.inner.value_deserialized = true;
345 Some(Item {
346 key,
347 value: V::deserialize_from_version(
348 self.inner.buffer.by_ref(),
349 self.inner.layer.version,
350 )
351 .context("Corrupt layer (value)")?,
352 sequence: self
353 .inner
354 .buffer
355 .read_u64::<LittleEndian>()
356 .context("Corrupt layer (seq)")?,
357 })
358 } else {
359 None
360 };
361 Ok(())
362 }
363
364 fn get(&self) -> Option<ItemRef<'_, K, V>> {
365 self.item.as_ref().map(<&Item<K, V>>::into)
366 }
367}
368
/// Returns the on-disk size in bytes of the seek table for a layer with `num_data_blocks` data
/// blocks.
///
/// The first data block has no entry, so layers with zero or one data block store no table at
/// all. Every other block contributes one little-endian `u64`.
fn seek_table_size(num_data_blocks: u64) -> usize {
    let stored_entries = match num_data_blocks {
        0 | 1 => return 0,
        blocks => (blocks - 1) as usize,
    };
    stored_entries * std::mem::size_of::<u64>()
}
379
380async fn load_seek_table(
381 object_handle: &(impl ReadObjectHandle + 'static),
382 seek_table_offset: u64,
383 num_data_blocks: u64,
384) -> Result<Vec<u64>, Error> {
385 let seek_table_size = seek_table_size(num_data_blocks);
386 if seek_table_size == 0 {
387 return Ok(vec![]);
388 }
389 if seek_table_size > MAX_SEEK_TABLE_SIZE {
390 return Err(anyhow!(FxfsError::NotSupported)).context("Seek table too large");
391 }
392 let mut buffer = object_handle.allocate_buffer(seek_table_size).await;
393 let bytes_read = object_handle
394 .read(seek_table_offset, buffer.as_mut())
395 .await
396 .context("Reading seek table blocks")?;
397 ensure!(bytes_read == seek_table_size, "Short read");
398
399 let mut seek_table = Vec::with_capacity(num_data_blocks as usize);
400 seek_table.push(0);
402 let mut prev = 0;
403 for chunk in buffer.as_slice().chunks_exact(std::mem::size_of::<u64>()) {
404 let next = LittleEndian::read_u64(chunk);
405 if prev > next {
408 return Err(anyhow!(FxfsError::Inconsistent))
409 .context(format!("Seek table entry out of order, {:?} > {:?}", prev, next));
410 }
411 prev = next;
412 seek_table.push(next);
413 }
414 Ok(seek_table)
415}
416
417async fn load_bloom_filter<K: FuzzyHash>(
418 handle: &(impl ReadObjectHandle + 'static),
419 bloom_filter_offset: u64,
420 layer_info: &LayerInfo,
421) -> Result<Option<BloomFilterReader<K>>, Error> {
422 if layer_info.bloom_filter_size_bytes == 0 {
423 return Ok(None);
424 }
425 if layer_info.bloom_filter_size_bytes > MAX_BLOOM_FILTER_SIZE {
426 return Err(anyhow!(FxfsError::NotSupported)).context("Bloom filter too large");
427 }
428 let mut buffer = handle.allocate_buffer(layer_info.bloom_filter_size_bytes).await;
429 handle.read(bloom_filter_offset, buffer.as_mut()).await.context("Failed to read")?;
430 Ok(Some(BloomFilterReader::read(
431 buffer.as_slice(),
432 layer_info.bloom_filter_seed,
433 layer_info.bloom_filter_num_hashes,
434 )?))
435}
436
437impl<K: Key, V: LayerValue> PersistentLayer<K, V> {
438 pub async fn open(handle: impl ReadObjectHandle + 'static) -> Result<Arc<Self>, Error> {
439 let bs = handle.block_size();
440 let mut buffer = handle.allocate_buffer(bs as usize).await;
441 handle.read(0, buffer.as_mut()).await.context("Failed to read first block")?;
442 let mut cursor = std::io::Cursor::new(buffer.as_slice());
443 let version = Version::deserialize_from(&mut cursor)?;
444
445 ensure!(version <= LATEST_VERSION, FxfsError::InvalidVersion);
446 let header = LayerHeader::deserialize_from_version(&mut cursor, version)
447 .context("Failed to deserialize header")?;
448 if &header.magic != PERSISTENT_LAYER_MAGIC {
449 return Err(anyhow!(FxfsError::Inconsistent).context("Invalid layer file magic"));
450 }
451 if header.block_size == 0 || !header.block_size.is_power_of_two() {
452 return Err(anyhow!(FxfsError::Inconsistent))
453 .context(format!("Invalid block size {}", header.block_size));
454 }
455 ensure!(header.block_size > 0, FxfsError::Inconsistent);
456 ensure!(header.block_size <= MAX_BLOCK_SIZE, FxfsError::NotSupported);
457 let physical_block_size = handle.block_size();
458 if header.block_size % physical_block_size != 0 {
459 return Err(anyhow!(FxfsError::Inconsistent)).context(format!(
460 "{} not a multiple of physical block size {}",
461 header.block_size, physical_block_size
462 ));
463 }
464 std::mem::drop(cursor);
465
466 let bs = header.block_size as usize;
467 if handle.get_size() < MINIMUM_LAYER_FILE_BLOCKS * bs as u64 {
468 return Err(anyhow!(FxfsError::Inconsistent).context("Layer file too short"));
469 }
470
471 let layer_info = {
472 let last_block_offset = handle
473 .get_size()
474 .checked_sub(header.block_size)
475 .ok_or(FxfsError::Inconsistent)
476 .context("Layer file unexpectedly short")?;
477 handle
478 .read(last_block_offset, buffer.subslice_mut(0..header.block_size as usize))
479 .await
480 .context("Failed to read layer info")?;
481 let layer_info_len =
482 LittleEndian::read_u64(&buffer.as_slice()[bs - std::mem::size_of::<u64>()..]);
483 let layer_info_offset = bs
484 .checked_sub(std::mem::size_of::<u64>() + layer_info_len as usize)
485 .ok_or(FxfsError::Inconsistent)
486 .context("Invalid layer info length")?;
487 let mut cursor = std::io::Cursor::new(&buffer.as_slice()[layer_info_offset..]);
488 LayerInfo::deserialize_from_version(&mut cursor, version)
489 .context("Failed to deserialize LayerInfo")?
490 };
491 std::mem::drop(buffer);
492 if layer_info.num_items == 0 && layer_info.num_data_blocks > 0 {
493 return Err(anyhow!(FxfsError::Inconsistent))
494 .context("Invalid num_items/num_data_blocks");
495 }
496 let total_blocks = handle.get_size() / header.block_size;
497 let bloom_filter_blocks =
498 round_up(layer_info.bloom_filter_size_bytes as u64, header.block_size)
499 .unwrap_or(layer_info.bloom_filter_size_bytes as u64)
500 / header.block_size;
501 if layer_info.num_data_blocks + bloom_filter_blocks
502 > total_blocks - MINIMUM_LAYER_FILE_BLOCKS
503 {
504 return Err(anyhow!(FxfsError::Inconsistent)).context("Invalid number of blocks");
505 }
506
507 let bloom_filter_offset =
508 header.block_size * (NUM_HEADER_BLOCKS + layer_info.num_data_blocks);
509 let bloom_filter = if version == LATEST_VERSION {
510 load_bloom_filter(&handle, bloom_filter_offset, &layer_info)
511 .await
512 .context("Failed to load bloom filter")?
513 } else {
514 None
518 };
519 let bloom_filter_stats = bloom_filter.as_ref().map(|b| b.stats());
520
521 let seek_offset = header.block_size
522 * (NUM_HEADER_BLOCKS + layer_info.num_data_blocks + bloom_filter_blocks);
523 let seek_table = load_seek_table(&handle, seek_offset, layer_info.num_data_blocks)
524 .await
525 .context("Failed to load seek table")?;
526
527 let object_handle = Arc::new(handle) as Arc<dyn ReadObjectHandle>;
528 let caching_object_handle = CachingObjectHandle::new(object_handle.clone());
529 Ok(Arc::new(PersistentLayer {
530 object_handle,
531 caching_object_handle,
532 version,
533 block_size: header.block_size,
534 data_size: layer_info.num_data_blocks * header.block_size,
535 seek_table,
536 num_items: layer_info.num_items,
537 bloom_filter,
538 bloom_filter_stats,
539 close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
540 _value_type: PhantomData::default(),
541 }))
542 }
543
544 pub fn has_bloom_filter(&self) -> bool {
549 self.bloom_filter.is_some()
550 }
551
552 fn data_offset(&self) -> u64 {
553 NUM_HEADER_BLOCKS * self.block_size
554 }
555}
556
557#[async_trait]
558impl<K: Key, V: LayerValue> Layer<K, V> for PersistentLayer<K, V> {
559 fn handle(&self) -> Option<&dyn ReadObjectHandle> {
560 Some(&self.object_handle)
561 }
562
563 fn purge_cached_data(&self) {
564 self.caching_object_handle.purge();
565 }
566
567 async fn seek<'a>(&'a self, bound: Bound<&K>) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
568 let (key, excluded) = match bound {
569 Bound::Unbounded => {
570 let mut iterator = Iterator::new(KeyOnlyIterator::new(self, self.data_offset()))?;
571 iterator.advance().await.context("Unbounded seek advance")?;
572 return Ok(Box::new(iterator));
573 }
574 Bound::Included(k) => (k, false),
575 Bound::Excluded(k) => (k, true),
576 };
577 let first_data_block_index = self.data_offset() / self.block_size;
578
579 let (mut left_offset, mut right_offset) = {
580 let target = key.get_leading_u64();
585 let right_index = self.seek_table.as_slice().partition_point(|&x| x <= target) as u64;
587 let left_index = self.seek_table.as_slice()[..right_index as usize]
590 .partition_point(|&x| x < target)
591 .saturating_sub(1) as u64;
592
593 (
594 (left_index + first_data_block_index) * self.block_size,
595 (right_index + first_data_block_index) * self.block_size,
596 )
597 };
598 let mut left = KeyOnlyIterator::new(self, left_offset);
599 left.advance().await.context("Initial seek advance")?;
600 match left.get() {
601 None => return Ok(Box::new(Iterator::new(left)?)),
602 Some(left_key) => match left_key.cmp_upper_bound(key) {
603 Ordering::Greater => return Ok(Box::new(Iterator::new(left)?)),
604 Ordering::Equal => {
605 if excluded {
606 left.advance().await?;
607 }
608 return Ok(Box::new(Iterator::new(left)?));
609 }
610 Ordering::Less => {}
611 },
612 }
613 let mut right = None;
614 while right_offset - left_offset > self.block_size {
615 let mid_offset =
617 round_down(left_offset + (right_offset - left_offset) / 2, self.block_size);
618 let mut iterator = KeyOnlyIterator::new(self, mid_offset);
619 iterator.advance().await?;
620 let iter_key: &K = iterator.get().unwrap();
621 match iter_key.cmp_upper_bound(key) {
622 Ordering::Greater => {
623 right_offset = mid_offset;
624 right = Some(iterator);
625 }
626 Ordering::Equal => {
627 if excluded {
628 iterator.advance().await?;
629 }
630 return Ok(Box::new(Iterator::new(iterator)?));
631 }
632 Ordering::Less => {
633 left_offset = mid_offset;
634 left = iterator;
635 }
636 }
637 }
638
639 let mut left_index = 0;
641 let mut right_index = left.item_count;
642 while left_index < (right_index - 1) {
644 let mid_index = left_index + ((right_index - left_index) / 2);
645 left.seek_to_block_item(mid_index).context("Read index offset for binary search")?;
646 left.advance().await?;
647 match left.get().unwrap().cmp_upper_bound(key) {
648 Ordering::Greater => {
649 right_index = mid_index;
650 }
651 Ordering::Equal => {
652 if excluded {
653 left.advance().await?;
654 }
655 return Ok(Box::new(Iterator::new(left)?));
656 }
657 Ordering::Less => {
658 left_index = mid_index;
659 }
660 }
661 }
662 if right_index < left.item_count {
667 left.seek_to_block_item(right_index)
668 .context("Read index for offset of right pointer")?;
669 } else if let Some(right) = right {
670 return Ok(Box::new(Iterator::new(right)?));
671 } else {
672 }
678 left.advance().await?;
679 return Ok(Box::new(Iterator::new(left)?));
680 }
681
682 fn len(&self) -> usize {
683 self.num_items
684 }
685
686 fn maybe_contains_key(&self, key: &K) -> bool {
687 self.bloom_filter.as_ref().map_or(true, |f| f.maybe_contains(key))
688 }
689
690 async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
691 match &self.bloom_filter {
692 Some(filter) => Ok(if filter.maybe_contains(key) {
693 Existence::MaybeExists
694 } else {
695 Existence::Missing
696 }),
697 None => {
698 let iter = self.seek(Bound::Included(key)).await?;
699 Ok(iter.get().map_or(Existence::Missing, |i| {
700 if i.key.cmp_upper_bound(key).is_eq() {
701 Existence::Exists
702 } else {
703 Existence::Missing
704 }
705 }))
706 }
707 }
708 }
709
710 fn lock(&self) -> Option<Arc<DropEvent>> {
711 self.close_event.lock().clone()
712 }
713
714 async fn close(&self) {
715 let listener = self.close_event.lock().take().expect("close already called").listen();
716 listener.await;
717 }
718
719 fn get_version(&self) -> Version {
720 return self.version;
721 }
722
723 fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
724 node.record_uint("num_items", self.num_items as u64);
725 node.record_bool("persistent", true);
726 node.record_uint("size", self.object_handle.get_size());
727 if let Some(stats) = self.bloom_filter_stats.as_ref() {
728 node.record_child("bloom_filter", move |node| {
729 node.record_uint("size", stats.size as u64);
730 node.record_uint("num_hashes", stats.num_hashes as u64);
731 node.record_uint("fill_percentage", stats.fill_percentage as u64);
732 });
733 }
734 }
735}
736
// Item offsets within a data block are stored as `u16`, so a block can be at most 64 KiB.
const_assert!(MAX_BLOCK_SIZE <= u16::MAX as u64 + 1);

/// Serializes sorted items into the persistent layer format read by [`PersistentLayer`].
pub struct PersistentLayerWriter<W: WriteBytes, K: Key, V: LayerValue> {
    writer: W,
    // Layer block size recorded in the header.
    block_size: u64,
    // Serialized items pending for the current data block; also reused as scratch space when
    // writing the bloom filter, seek table and layer info.
    buf: Vec<u8>,
    // Number of items currently buffered in `buf`.
    buf_item_count: u16,
    // Total number of items written so far.
    item_count: usize,
    // In-block offsets of every buffered item after the first.
    block_offsets: Vec<u16>,
    // Leading `u64` of the first key of each data block after the first (the seek table).
    block_keys: Vec<u64>,
    bloom_filter: BloomFilterWriter<K>,
    _value: PhantomData<V>,
}
753
754impl<W: WriteBytes, K: Key, V: LayerValue> PersistentLayerWriter<W, K, V> {
755 pub async fn new(writer: W, num_items: usize, block_size: u64) -> Result<Self, Error> {
758 Self::new_with_version(writer, num_items, block_size, LATEST_VERSION).await
759 }
760
761 async fn new_with_version(
762 mut writer: W,
763 num_items: usize,
764 block_size: u64,
765 version: Version,
766 ) -> Result<Self, Error> {
767 ensure!(block_size <= MAX_BLOCK_SIZE, FxfsError::NotSupported);
768 ensure!(block_size >= MIN_BLOCK_SIZE, FxfsError::NotSupported);
769
770 let header = LayerHeader { magic: PERSISTENT_LAYER_MAGIC.clone(), block_size };
772 let mut buf = vec![0u8; block_size as usize];
773 {
774 let mut cursor = std::io::Cursor::new(&mut buf[..]);
775 version.serialize_into(&mut cursor)?;
776 header.serialize_into(&mut cursor)?;
777 }
778 writer.write_bytes(&buf[..]).await?;
779
780 let seed: u64 = rand::random();
781 Ok(PersistentLayerWriter {
782 writer,
783 block_size,
784 buf: Vec::new(),
785 buf_item_count: 0,
786 item_count: 0,
787 block_offsets: Vec::new(),
788 block_keys: Vec::new(),
789 bloom_filter: BloomFilterWriter::new(seed, num_items),
790 _value: PhantomData,
791 })
792 }
793
794 async fn write_block(&mut self, len: usize) -> Result<(), Error> {
799 if self.buf_item_count == 0 {
800 return Ok(());
801 }
802 let seek_table_size = self.block_offsets.len() * PER_DATA_BLOCK_SEEK_ENTRY_SIZE;
803 assert!(PER_DATA_BLOCK_HEADER_SIZE + seek_table_size + len <= self.block_size as usize);
804 let mut cursor = std::io::Cursor::new(vec![0u8; self.block_size as usize]);
805 cursor.write_u16::<LittleEndian>(self.buf_item_count)?;
806 cursor.write_all(self.buf.drain(..len).as_ref())?;
807 cursor.set_position(self.block_size - seek_table_size as u64);
808 for &offset in &self.block_offsets {
810 cursor.write_u16::<LittleEndian>(offset)?;
811 }
812 self.writer.write_bytes(cursor.get_ref()).await?;
813 debug!(item_count = self.buf_item_count, byte_count = len; "wrote items");
814 self.buf_item_count = 0;
815 self.block_offsets.clear();
816 Ok(())
817 }
818
819 async fn write_seek_table(&mut self) -> Result<usize, Error> {
824 if self.block_keys.len() == 0 {
825 return Ok(0);
826 }
827 let size = self.block_keys.len() * std::mem::size_of::<u64>();
828 self.buf.resize(size, 0);
829 let mut len = 0;
830 for key in &self.block_keys {
831 LittleEndian::write_u64(&mut self.buf[len..len + std::mem::size_of::<u64>()], *key);
832 len += std::mem::size_of::<u64>();
833 }
834 self.writer.write_bytes(&self.buf).await?;
835 Ok(size)
836 }
837
838 async fn write_info(
841 &mut self,
842 num_data_blocks: u64,
843 bloom_filter_size_bytes: usize,
844 seek_table_len: usize,
845 ) -> Result<(), Error> {
846 let block_size = self.writer.block_size() as usize;
847 let layer_info = LayerInfo {
848 num_items: self.item_count,
849 num_data_blocks,
850 bloom_filter_size_bytes,
851 bloom_filter_seed: self.bloom_filter.seed(),
852 bloom_filter_num_hashes: self.bloom_filter.num_hashes(),
853 };
854 let actual_len = {
855 let mut cursor = std::io::Cursor::new(&mut self.buf);
856 layer_info.serialize_into(&mut cursor)?;
857 let layer_info_len = cursor.position();
858 cursor.write_u64::<LittleEndian>(layer_info_len)?;
859 cursor.position() as usize
860 };
861
862 let avail_in_block = block_size - (seek_table_len % block_size);
865 let to_skip = if avail_in_block < actual_len {
866 block_size + avail_in_block - actual_len
867 } else {
868 avail_in_block - actual_len
869 } as u64;
870 self.writer.skip(to_skip).await?;
871 self.writer.write_bytes(&self.buf[..actual_len]).await?;
872 Ok(())
873 }
874
875 async fn write_bloom_filter(&mut self) -> Result<usize, Error> {
878 if self.data_blocks() < MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER {
879 return Ok(0);
880 }
881 let size = round_up(self.bloom_filter.serialized_size(), self.block_size as usize).unwrap();
883 self.buf.resize(size, 0);
884 let mut cursor = std::io::Cursor::new(&mut self.buf);
885 self.bloom_filter.write(&mut cursor)?;
886 self.writer.write_bytes(&self.buf).await?;
887 Ok(self.bloom_filter.serialized_size())
888 }
889
890 #[cfg(test)]
893 pub(crate) fn bloom_filter(&mut self) -> &mut BloomFilterWriter<K> {
894 &mut self.bloom_filter
895 }
896
897 fn data_blocks(&self) -> usize {
898 if self.item_count == 0 { 0 } else { self.block_keys.len() + 1 }
899 }
900}
901
902impl<W: WriteBytes + Send, K: Key, V: LayerValue> LayerWriter<K, V>
903 for PersistentLayerWriter<W, K, V>
904{
905 async fn write(&mut self, item: ItemRef<'_, K, V>) -> Result<(), Error> {
906 let len = self.buf.len();
908 item.key.serialize_into(&mut self.buf)?;
909 item.value.serialize_into(&mut self.buf)?;
910 self.buf.write_u64::<LittleEndian>(item.sequence)?;
911 let mut added_offset = false;
912 if self.buf_item_count > 0 {
914 self.block_offsets.push(u16::try_from(len + PER_DATA_BLOCK_HEADER_SIZE).unwrap());
915 added_offset = true;
916 }
917
918 if PER_DATA_BLOCK_HEADER_SIZE
921 + self.buf.len()
922 + (self.block_offsets.len() * PER_DATA_BLOCK_SEEK_ENTRY_SIZE)
923 > self.block_size as usize - 1
924 {
925 if added_offset {
926 self.block_offsets.pop();
929 }
930 self.write_block(len).await?;
931
932 self.block_keys.push(item.key.get_leading_u64());
934 }
935
936 self.bloom_filter.insert(&item.key);
937 self.buf_item_count += 1;
938 self.item_count += 1;
939 Ok(())
940 }
941
942 async fn flush(&mut self) -> Result<(), Error> {
943 self.write_block(self.buf.len()).await?;
944 let data_blocks = self.data_blocks() as u64;
945 let bloom_filter_len = self.write_bloom_filter().await?;
946 let seek_table_len = self.write_seek_table().await?;
947 self.write_info(data_blocks, bloom_filter_len, seek_table_len).await?;
948 self.writer.complete().await
949 }
950}
951
952impl<W: WriteBytes, K: Key, V: LayerValue> Drop for PersistentLayerWriter<W, K, V> {
953 fn drop(&mut self) {
954 if self.buf_item_count > 0 {
955 warn!("Dropping unwritten items; did you forget to flush?");
956 }
957 }
958}
959
960#[cfg(test)]
961mod tests {
962 use super::{PersistentLayer, PersistentLayerWriter};
963 use crate::filesystem::MAX_BLOCK_SIZE;
964 use crate::lsm_tree::LayerIterator;
965 use crate::lsm_tree::persistent_layer::MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER;
966 use crate::lsm_tree::types::{
967 DefaultOrdUpperBound, Existence, FuzzyHash, Item, ItemRef, Layer, LayerKey, LayerWriter,
968 MergeType, SortByU64,
969 };
970 use crate::object_handle::WriteBytes;
971 use crate::round::round_up;
972 use crate::serialized_types::{
973 LATEST_VERSION, Version, Versioned, VersionedLatest, versioned_type,
974 };
975 use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
976 use crate::testing::writer::Writer;
977 use fprint::TypeFingerprint;
978 use fxfs_macros::FuzzyHash;
979 use std::fmt::Debug;
980 use std::hash::Hash;
981 use std::ops::{Bound, Range};
982 use std::sync::Arc;
983
984 impl<W: WriteBytes> Debug for PersistentLayerWriter<W, i32, i32> {
985 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
986 f.debug_struct("rPersistentLayerWriter")
987 .field("block_size", &self.block_size)
988 .field("item_count", &self.buf_item_count)
989 .finish()
990 }
991 }
992
993 #[fuchsia::test]
994 async fn test_iterate_after_write() {
995 const BLOCK_SIZE: u64 = 512;
996 const ITEM_COUNT: i32 = 10000;
997
998 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
999 {
1000 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1001 Writer::new(&handle).await,
1002 ITEM_COUNT as usize * 4,
1003 BLOCK_SIZE,
1004 )
1005 .await
1006 .expect("writer new");
1007 for i in 0..ITEM_COUNT {
1008 writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1009 }
1010 writer.flush().await.expect("flush failed");
1011 }
1012 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1013 let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
1014 for i in 0..ITEM_COUNT {
1015 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1016 assert_eq!((key, value), (&i, &i));
1017 iterator.advance().await.expect("failed to advance");
1018 }
1019 assert!(iterator.get().is_none());
1020 }
1021
1022 #[fuchsia::test]
1023 async fn test_seek_after_write() {
1024 const BLOCK_SIZE: u64 = 512;
1025 const ITEM_COUNT: i32 = 5000;
1026
1027 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1028 {
1029 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1030 Writer::new(&handle).await,
1031 ITEM_COUNT as usize * 18,
1032 BLOCK_SIZE,
1033 )
1034 .await
1035 .expect("writer new");
1036 for i in 0..ITEM_COUNT {
1037 writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
1039 }
1040 writer.flush().await.expect("flush failed");
1041 }
1042 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1043 for i in 0..ITEM_COUNT * 2 {
1045 let expected = round_up(i, 2).unwrap();
1048 let mut iterator = layer.seek(Bound::Included(&i)).await.expect("failed to seek");
1049 if i >= (ITEM_COUNT * 2) - 1 {
1052 assert!(iterator.get().is_none());
1053 } else {
1054 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1055 assert_eq!((key, value), (&expected, &expected));
1056 }
1057
1058 iterator.advance().await.expect("failed to advance");
1060 if i >= (ITEM_COUNT * 2) - 3 {
1064 assert!(iterator.get().is_none());
1065 } else {
1066 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1067 let next = expected + 2;
1068 assert_eq!((key, value), (&next, &next));
1069 }
1070 }
1071 }
1072
1073 #[fuchsia::test]
1074 async fn test_seek_unbounded() {
1075 const BLOCK_SIZE: u64 = 512;
1076 const ITEM_COUNT: i32 = 1000;
1077
1078 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1079 {
1080 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1081 Writer::new(&handle).await,
1082 ITEM_COUNT as usize * 18,
1083 BLOCK_SIZE,
1084 )
1085 .await
1086 .expect("writer new");
1087 for i in 0..ITEM_COUNT {
1088 writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1089 }
1090 writer.flush().await.expect("flush failed");
1091 }
1092 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1093 let mut iterator = layer.seek(Bound::Unbounded).await.expect("failed to seek");
1094 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1095 assert_eq!((key, value), (&0, &0));
1096
1097 iterator.advance().await.expect("failed to advance");
1099 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1100 assert_eq!((key, value), (&1, &1));
1101 }
1102
1103 #[fuchsia::test]
1104 async fn test_zero_items() {
1105 const BLOCK_SIZE: u64 = 512;
1106
1107 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1108 {
1109 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1110 Writer::new(&handle).await,
1111 0,
1112 BLOCK_SIZE,
1113 )
1114 .await
1115 .expect("writer new");
1116 writer.flush().await.expect("flush failed");
1117 }
1118
1119 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1120 let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1121 .seek(Bound::Unbounded)
1122 .await
1123 .expect("seek failed");
1124 assert!(iterator.get().is_none())
1125 }
1126
1127 #[fuchsia::test]
1128 async fn test_one_item() {
1129 const BLOCK_SIZE: u64 = 512;
1130
1131 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1132 {
1133 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1134 Writer::new(&handle).await,
1135 1,
1136 BLOCK_SIZE,
1137 )
1138 .await
1139 .expect("writer new");
1140 writer.write(Item::new(42, 42).as_item_ref()).await.expect("write failed");
1141 writer.flush().await.expect("flush failed");
1142 }
1143
1144 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1145 {
1146 let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1147 .seek(Bound::Unbounded)
1148 .await
1149 .expect("seek failed");
1150 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1151 assert_eq!((key, value), (&42, &42));
1152 iterator.advance().await.expect("failed to advance");
1153 assert!(iterator.get().is_none())
1154 }
1155 {
1156 let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1157 .seek(Bound::Included(&30))
1158 .await
1159 .expect("seek failed");
1160 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1161 assert_eq!((key, value), (&42, &42));
1162 iterator.advance().await.expect("failed to advance");
1163 assert!(iterator.get().is_none())
1164 }
1165 {
1166 let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1167 .seek(Bound::Included(&42))
1168 .await
1169 .expect("seek failed");
1170 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1171 assert_eq!((key, value), (&42, &42));
1172 iterator.advance().await.expect("failed to advance");
1173 assert!(iterator.get().is_none())
1174 }
1175 {
1176 let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1177 .seek(Bound::Included(&43))
1178 .await
1179 .expect("seek failed");
1180 assert!(iterator.get().is_none())
1181 }
1182 }
1183
1184 #[fuchsia::test]
1185 async fn test_large_block_size() {
1186 const BLOCK_SIZE: u64 = MAX_BLOCK_SIZE;
1188 const ITEM_COUNT: i32 = ((BLOCK_SIZE as i32) / 18) * 3;
1190
1191 let handle =
1192 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1193 {
1194 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1195 Writer::new(&handle).await,
1196 ITEM_COUNT as usize * 18,
1197 BLOCK_SIZE,
1198 )
1199 .await
1200 .expect("writer new");
1201 for i in 2000000000..(2000000000 + ITEM_COUNT) {
1203 writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1204 }
1205 writer.flush().await.expect("flush failed");
1206 }
1207
1208 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1209 let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
1210 for i in 2000000000..(2000000000 + ITEM_COUNT) {
1211 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1212 assert_eq!((key, value), (&i, &i));
1213 iterator.advance().await.expect("failed to advance");
1214 }
1215 assert!(iterator.get().is_none());
1216 }
1217
1218 #[fuchsia::test]
1219 async fn test_overlarge_block_size() {
1220 const BLOCK_SIZE: u64 = MAX_BLOCK_SIZE * 2;
1222
1223 let handle =
1224 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1225 PersistentLayerWriter::<_, i32, i32>::new(Writer::new(&handle).await, 0, BLOCK_SIZE)
1226 .await
1227 .expect_err("Creating writer with overlarge block size.");
1228 }
1229
1230 #[fuchsia::test]
1231 async fn test_seek_bound_excluded() {
1232 const BLOCK_SIZE: u64 = 512;
1233 const ITEM_COUNT: i32 = 10000;
1234
1235 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1236 {
1237 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1238 Writer::new(&handle).await,
1239 ITEM_COUNT as usize * 18,
1240 BLOCK_SIZE,
1241 )
1242 .await
1243 .expect("writer new");
1244 for i in 0..ITEM_COUNT {
1245 writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1246 }
1247 writer.flush().await.expect("flush failed");
1248 }
1249 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1250
1251 for i in 9982..ITEM_COUNT {
1252 let mut iterator = layer.seek(Bound::Excluded(&i)).await.expect("failed to seek");
1253 let i_plus_one = i + 1;
1254 if i_plus_one < ITEM_COUNT {
1255 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1256
1257 assert_eq!((key, value), (&i_plus_one, &i_plus_one));
1258
1259 iterator.advance().await.expect("failed to advance");
1261 let i_plus_two = i + 2;
1262 if i_plus_two < ITEM_COUNT {
1263 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1264 assert_eq!((key, value), (&i_plus_two, &i_plus_two));
1265 } else {
1266 assert!(iterator.get().is_none());
1267 }
1268 } else {
1269 assert!(iterator.get().is_none());
1270 }
1271 }
1272 }
1273
    /// Range-shaped test key. It is ordered by `start`, then `end` (see the `Ord` impl below),
    /// so many distinct keys can share the same leading value.
    #[derive(
        Clone,
        Eq,
        Hash,
        FuzzyHash,
        PartialEq,
        Debug,
        serde::Serialize,
        serde::Deserialize,
        TypeFingerprint,
        Versioned,
    )]
    struct TestKey(Range<u64>);
    // Register `TestKey` with the versioning machinery for versions `1..`.
    versioned_type! { 1.. => TestKey }
1288 impl SortByU64 for TestKey {
1289 fn get_leading_u64(&self) -> u64 {
1290 self.0.start
1291 }
1292 }
1293 impl LayerKey for TestKey {
1294 fn merge_type(&self) -> crate::lsm_tree::types::MergeType {
1295 MergeType::OptimizedMerge
1296 }
1297
1298 fn next_key(&self) -> Option<Self> {
1299 Some(TestKey(self.0.end..self.0.end + 1))
1300 }
1301 }
1302 impl Ord for TestKey {
1303 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1304 self.0.start.cmp(&other.0.start).then(self.0.end.cmp(&other.0.end))
1305 }
1306 }
1307 impl PartialOrd for TestKey {
1308 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1309 Some(self.cmp(other))
1310 }
1311 }
    // Opt in to the trait's default behaviour; no methods are overridden here.
    impl DefaultOrdUpperBound for TestKey {}
1313
1314 #[fuchsia::test]
1317 async fn test_block_seek_duplicate_keys() {
1318 const BLOCK_SIZE: u64 = 512;
1320 const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;
1324
1325 let mut to_find = Vec::new();
1326
1327 let handle =
1328 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1329 {
1330 let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
1331 Writer::new(&handle).await,
1332 3 * BLOCK_SIZE as usize,
1333 BLOCK_SIZE,
1334 )
1335 .await
1336 .expect("writer new");
1337
1338 let mut current_value = u32::MAX as u64 + 1;
1340
1341 {
1344 let items = ITEMS_TO_FILL_BLOCK * 3;
1345 for i in 0..items {
1346 writer
1347 .write(
1348 Item::new(TestKey(current_value..current_value + i), current_value)
1349 .as_item_ref(),
1350 )
1351 .await
1352 .expect("write failed");
1353 }
1354 to_find.push(TestKey(current_value..current_value));
1355 to_find.push(TestKey(current_value..(current_value + (items / 2))));
1356 to_find.push(TestKey(current_value..current_value + (items - 1)));
1357 current_value += 1;
1358 }
1359
1360 {
1362 let items = ITEMS_TO_FILL_BLOCK * 3;
1363 for _ in 0..items {
1364 writer
1365 .write(
1366 Item::new(TestKey(current_value..current_value), current_value)
1367 .as_item_ref(),
1368 )
1369 .await
1370 .expect("write failed");
1371 current_value += 1;
1372 }
1373 }
1374
1375 {
1378 let items = ITEMS_TO_FILL_BLOCK * 3;
1379 for i in 0..items {
1380 writer
1381 .write(
1382 Item::new(TestKey(current_value..current_value + i), current_value)
1383 .as_item_ref(),
1384 )
1385 .await
1386 .expect("write failed");
1387 }
1388 to_find.push(TestKey(current_value..current_value));
1389 to_find.push(TestKey(current_value..(current_value + (items / 2))));
1390 to_find.push(TestKey(current_value..current_value + (items - 1)));
1391 current_value += 1;
1392 }
1393
1394 {
1396 let items = ITEMS_TO_FILL_BLOCK * 3;
1397 for _ in 0..items {
1398 writer
1399 .write(
1400 Item::new(TestKey(current_value..current_value), current_value)
1401 .as_item_ref(),
1402 )
1403 .await
1404 .expect("write failed");
1405 current_value += 1;
1406 }
1407 }
1408
1409 {
1412 let items = ITEMS_TO_FILL_BLOCK * 3;
1413 for i in 0..items {
1414 writer
1415 .write(
1416 Item::new(TestKey(current_value..current_value + i), current_value)
1417 .as_item_ref(),
1418 )
1419 .await
1420 .expect("write failed");
1421 }
1422 to_find.push(TestKey(current_value..current_value));
1423 to_find.push(TestKey(current_value..(current_value + (items / 2))));
1424 to_find.push(TestKey(current_value..current_value + (items - 1)));
1425 }
1426
1427 writer.flush().await.expect("flush failed");
1428 }
1429
1430 let layer = PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
1431 for target in to_find {
1432 let iterator: Box<dyn LayerIterator<TestKey, u64>> =
1433 layer.seek(Bound::Included(&target)).await.expect("failed to seek");
1434 let ItemRef { key, .. } = iterator.get().expect("missing item");
1435 assert_eq!(&target, key);
1436 }
1437 }
1438
1439 #[fuchsia::test]
1440 async fn test_two_seek_blocks() {
1441 const BLOCK_SIZE: u64 = 512;
1443 const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;
1447 const ITEM_COUNT: u64 = ITEMS_TO_FILL_BLOCK * ((BLOCK_SIZE / 8) + 2);
1451
1452 let mut to_find = Vec::new();
1453
1454 let handle =
1455 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1456 {
1457 let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
1458 Writer::new(&handle).await,
1459 ITEM_COUNT as usize * 18,
1460 BLOCK_SIZE,
1461 )
1462 .await
1463 .expect("writer new");
1464
1465 let initial_value = u32::MAX as u64 + 1;
1467 for i in 0..ITEM_COUNT {
1468 writer
1469 .write(
1470 Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1471 .as_item_ref(),
1472 )
1473 .await
1474 .expect("write failed");
1475 }
1476 to_find.push(TestKey(initial_value..initial_value));
1478 let middle = initial_value + ITEM_COUNT / 2;
1479 to_find.push(TestKey(middle..middle));
1480 let end = initial_value + ITEM_COUNT - 1;
1481 to_find.push(TestKey(end..end));
1482
1483 writer.flush().await.expect("flush failed");
1484 }
1485
1486 let layer = PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
1487 for target in to_find {
1488 let iterator: Box<dyn LayerIterator<TestKey, u64>> =
1489 layer.seek(Bound::Included(&target)).await.expect("failed to seek");
1490 let ItemRef { key, .. } = iterator.get().expect("missing item");
1491 assert_eq!(&target, key);
1492 }
1493 }
1494
1495 #[fuchsia::test]
1498 async fn test_full_seek_block() {
1499 const BLOCK_SIZE: u64 = 512;
1500
1501 const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;
1505
1506 const SEEK_TABLE_ENTRIES: u64 = BLOCK_SIZE / 8;
1508
1509 const START_ENTRIES_COUNT: u64 = ITEMS_TO_FILL_BLOCK * SEEK_TABLE_ENTRIES;
1513
1514 for entries in START_ENTRIES_COUNT..START_ENTRIES_COUNT + (ITEMS_TO_FILL_BLOCK * 2) {
1515 let handle = FakeObjectHandle::new_with_block_size(
1516 Arc::new(FakeObject::new()),
1517 BLOCK_SIZE as usize,
1518 );
1519 {
1520 let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
1521 Writer::new(&handle).await,
1522 entries as usize,
1523 BLOCK_SIZE,
1524 )
1525 .await
1526 .expect("writer new");
1527
1528 let initial_value = u32::MAX as u64 + 1;
1530 for i in 0..entries {
1531 writer
1532 .write(
1533 Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1534 .as_item_ref(),
1535 )
1536 .await
1537 .expect("write failed");
1538 }
1539
1540 writer.flush().await.expect("flush failed");
1541 }
1542 PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
1543 }
1544 }
1545
1546 #[fuchsia::test]
1547 async fn test_ignore_bloom_filter_on_older_versions() {
1548 const BLOCK_SIZE: u64 = 512;
1549 const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;
1553 const ITEM_COUNT: u64 =
1555 (1 + MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER as u64) * ITEMS_TO_FILL_BLOCK;
1556
1557 let old_version_handle =
1558 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1559 let current_version_handle =
1560 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1561 {
1562 let mut old_version_writer =
1563 PersistentLayerWriter::<_, TestKey, u64>::new_with_version(
1564 Writer::new(&old_version_handle).await,
1565 ITEM_COUNT as usize,
1566 BLOCK_SIZE,
1567 Version { major: LATEST_VERSION.major - 1, minor: 0 },
1568 )
1569 .await
1570 .expect("writer new");
1571 let mut current_version_writer = PersistentLayerWriter::<_, TestKey, u64>::new(
1572 Writer::new(¤t_version_handle).await,
1573 ITEM_COUNT as usize,
1574 BLOCK_SIZE,
1575 )
1576 .await
1577 .expect("writer new");
1578
1579 let initial_value = u32::MAX as u64 + 1;
1581 for i in 0..ITEM_COUNT {
1582 old_version_writer
1583 .write(
1584 Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1585 .as_item_ref(),
1586 )
1587 .await
1588 .expect("write failed");
1589 current_version_writer
1590 .write(
1591 Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1592 .as_item_ref(),
1593 )
1594 .await
1595 .expect("write failed");
1596 }
1597
1598 old_version_writer.flush().await.expect("flush failed");
1599 current_version_writer.flush().await.expect("flush failed");
1600 }
1601
1602 let old_layer =
1603 PersistentLayer::<TestKey, u64>::open(old_version_handle).await.expect("open failed");
1604 let current_layer = PersistentLayer::<TestKey, u64>::open(current_version_handle)
1605 .await
1606 .expect("open failed");
1607 assert!(!old_layer.has_bloom_filter());
1608 assert!(current_layer.has_bloom_filter());
1609 }
1610
1611 #[fuchsia::test]
1612 async fn test_key_exists_no_bloom_filter() {
1613 const BLOCK_SIZE: u64 = 512;
1614 const ITEM_COUNT: i32 = 100;
1616
1617 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1618 {
1619 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1620 Writer::new(&handle).await,
1621 ITEM_COUNT as usize,
1622 BLOCK_SIZE,
1623 )
1624 .await
1625 .expect("writer new");
1626 for i in 0..ITEM_COUNT {
1627 writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
1628 }
1629 writer.flush().await.expect("flush failed");
1630 }
1631 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1632 assert!(!layer.has_bloom_filter());
1633
1634 for i in 0..ITEM_COUNT {
1635 assert_eq!(
1636 layer.key_exists(&(i * 2)).await.expect("key_exists failed"),
1637 Existence::Exists
1638 );
1639 assert_eq!(
1640 layer.key_exists(&(i * 2 + 1)).await.expect("key_exists failed"),
1641 Existence::Missing
1642 );
1643 }
1644 }
1645
1646 #[fuchsia::test]
1647 async fn test_key_exists_with_bloom_filter() {
1648 const BLOCK_SIZE: u64 = 512;
1649 const ITEM_COUNT: i32 = 10000;
1651
1652 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1653 {
1654 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1655 Writer::new(&handle).await,
1656 ITEM_COUNT as usize,
1657 BLOCK_SIZE,
1658 )
1659 .await
1660 .expect("writer new");
1661 for i in 0..ITEM_COUNT {
1662 writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
1663 }
1664 writer.flush().await.expect("flush failed");
1665 }
1666 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1667 assert!(layer.has_bloom_filter());
1668
1669 for i in 0..ITEM_COUNT {
1670 assert_eq!(
1672 layer.key_exists(&(i * 2)).await.expect("key_exists failed"),
1673 Existence::MaybeExists
1674 );
1675 }
1676
1677 let mut missing_count = 0;
1680 for i in 0..ITEM_COUNT {
1681 let result = layer.key_exists(&(i * 2 + 1)).await.expect("key_exists failed");
1682 assert_ne!(result, Existence::Exists);
1683 if result == Existence::Missing {
1684 missing_count += 1;
1685 }
1686 }
1687 assert!(missing_count > ITEM_COUNT / 2);
1689 }
1690}