1use crate::drop_event::DropEvent;
61use crate::errors::FxfsError;
62use crate::filesystem::MAX_BLOCK_SIZE;
63use crate::log::*;
64use crate::lsm_tree::bloom_filter::{BloomFilterReader, BloomFilterStats, BloomFilterWriter};
65use crate::lsm_tree::types::{
66 BoxedLayerIterator, Existence, FuzzyHash, Item, ItemRef, Key, Layer, LayerIterator, LayerValue,
67 LayerWriter,
68};
69use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteBytes};
70use crate::object_store::caching_object_handle::{CHUNK_SIZE, CachedChunk, CachingObjectHandle};
71use crate::round::{round_down, round_up};
72use crate::serialized_types::{
73 LATEST_VERSION, REMOVE_ITEM_SEQUENCE_VERSION, Version, Versioned, VersionedLatest,
74};
75use anyhow::{Context, Error, anyhow, bail, ensure};
76use async_trait::async_trait;
77use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt};
78use fprint::TypeFingerprint;
79use fuchsia_sync::Mutex;
80use serde::{Deserialize, Serialize};
81use static_assertions::const_assert;
82use std::cmp::Ordering;
83use std::io::{Read, Write as _};
84use std::marker::PhantomData;
85use std::ops::Bound;
86use std::sync::Arc;
87
/// Magic bytes stored in every layer file header. `PersistentLayer::open` rejects any file whose
/// header does not carry exactly these bytes.
const PERSISTENT_LAYER_MAGIC: &[u8; 8] = b"FxfsLayr";

/// The current on-disk layer header format.
pub type LayerHeader = LayerHeaderV39;

/// Header serialized (immediately after the `Version`) at the start of the first block of a
/// layer file.
#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct LayerHeaderV39 {
    /// Must equal `PERSISTENT_LAYER_MAGIC`.
    magic: [u8; 8],
    /// Size in bytes of each block of the layer file. `PersistentLayer::open` requires a power of
    /// two no larger than `MAX_BLOCK_SIZE` that is a multiple of the handle's block size.
    block_size: u64,
}
104
/// The current on-disk layer info format.
pub type LayerInfo = LayerInfoV39;

/// Trailer describing the layer contents. It is serialized at the end of the last block of the
/// file and followed by its own serialized length as a little-endian `u64`.
#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
pub struct LayerInfoV39 {
    /// Total number of items written to the layer.
    num_items: usize,
    /// Number of data blocks following the header block.
    num_data_blocks: u64,
    /// Serialized size of the bloom filter in bytes, or 0 if no filter was written.
    /// On disk the filter region is padded up to a whole number of blocks.
    bloom_filter_size_bytes: usize,
    /// Seed the bloom filter was built with.
    bloom_filter_seed: u64,
    /// Number of hash functions the bloom filter uses.
    bloom_filter_num_hashes: usize,
}
122
/// A read-only LSM-tree layer backed by a layer file.
///
/// File layout, as produced by `PersistentLayerWriter`:
///   * block 0: the serialized `Version` followed by `LayerHeader`;
///   * `num_data_blocks` data blocks holding the items;
///   * an optional bloom filter, padded to a whole number of blocks;
///   * the seek table (one little-endian `u64` per data block after the first);
///   * padding, then the serialized `LayerInfo` followed by its length as a little-endian `u64`
///     at the very end of the last block.
pub struct PersistentLayer<K, V> {
    /// Handle the layer was opened from; used for object id and size queries.
    object_handle: Arc<dyn ReadObjectHandle>,
    /// Chunked, cached view of `object_handle` that iterators read data blocks through.
    caching_object_handle: CachingObjectHandle<Arc<dyn ReadObjectHandle>>,
    /// Format version read from the header; used when deserializing keys, values and metadata.
    version: Version,
    /// Layer block size taken from the header.
    block_size: u64,
    /// Total size of the data blocks in bytes (`num_data_blocks * block_size`).
    data_size: u64,
    /// `seek_table[i]` is the leading `u64` of the first key in data block `i`. The first entry
    /// is always 0, and the table is empty when the layer has at most one data block.
    seek_table: Vec<u64>,
    /// Number of items recorded in `LayerInfo`.
    num_items: usize,
    /// Bloom filter, if one was written and the layer is at `LATEST_VERSION`.
    bloom_filter: Option<BloomFilterReader<K>>,
    /// Statistics for `bloom_filter`, recorded in inspect data.
    bloom_filter_stats: Option<BloomFilterStats>,
    /// Handed out by `lock`; taken by `close`, which then waits for outstanding clones to drop.
    close_event: Mutex<Option<Arc<DropEvent>>>,
    _value_type: PhantomData<V>,
}
143
144#[derive(Debug)]
145struct BufferCursor {
146 chunk: Option<CachedChunk>,
147 pos: usize,
148}
149
150impl std::io::Read for BufferCursor {
151 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
152 let chunk = if let Some(chunk) = &self.chunk {
153 chunk
154 } else {
155 return Ok(0);
156 };
157 let to_read = std::cmp::min(buf.len(), chunk.len().saturating_sub(self.pos));
158 if to_read > 0 {
159 buf[..to_read].copy_from_slice(&chunk[self.pos..self.pos + to_read]);
160 self.pos += to_read;
161 }
162 Ok(to_read)
163 }
164}
165
/// Smallest block size `PersistentLayerWriter` accepts.
const MIN_BLOCK_SIZE: u64 = 512;

/// Layers with fewer data blocks than this are written without a bloom filter.
const MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER: usize = 4;

/// Number of blocks (holding the `Version` and `LayerHeader`) that precede the data blocks.
const NUM_HEADER_BLOCKS: u64 = 1;

/// Smallest valid layer file, in blocks: the header block plus the final block that holds the
/// `LayerInfo` trailer.
const MINIMUM_LAYER_FILE_BLOCKS: u64 = 2;

/// Largest bloom filter, in bytes, that `PersistentLayer::open` will load into memory.
const MAX_BLOOM_FILTER_SIZE: usize = 64 * 1024 * 1024;
/// Largest seek table, in bytes, that `PersistentLayer::open` will load into memory.
const MAX_SEEK_TABLE_SIZE: usize = 64 * 1024 * 1024;

/// Each data block begins with a little-endian `u16` holding the number of items in the block.
const PER_DATA_BLOCK_HEADER_SIZE: usize = 2;
/// Every item in a data block except the first has a little-endian `u16` byte offset, packed at
/// the end of the block.
const PER_DATA_BLOCK_SEEK_ENTRY_SIZE: usize = 2;
186
/// Iterates over the keys of a `PersistentLayer` block by block, without necessarily
/// deserializing the values. `seek` uses it for binary searching; `Iterator` wraps it to also
/// read values.
struct KeyOnlyIterator<'iter, K: Key, V: LayerValue> {
    /// Cursor over the cached chunk that contains the current block.
    buffer: BufferCursor,

    layer: &'iter PersistentLayer<K, V>,

    /// File offset of the next block to load once the current block is exhausted.
    pos: u64,

    /// Index, within the current block, of the next item to read.
    item_index: u16,

    /// Number of items in the current block (read from the block's leading `u16`).
    item_count: u16,

    /// The most recently read key, or `None` at the end of the layer.
    key: Option<K>,

    /// True when `buffer.pos` already points at item `item_index`: either a new block was just
    /// loaded, or a wrapper consumed the value following the last key. In that case
    /// `seek_to_block_item` can skip the seek-table lookup.
    value_deserialized: bool,
}
210
impl<K: Key, V: LayerValue> KeyOnlyIterator<'_, K, V> {
    /// Creates an iterator that will load the block at file offset `pos` on the first `advance`.
    ///
    /// # Panics
    ///
    /// Panics if `pos` is not a multiple of the layer block size.
    fn new<'iter>(layer: &'iter PersistentLayer<K, V>, pos: u64) -> KeyOnlyIterator<'iter, K, V> {
        assert!(pos % layer.block_size == 0);
        KeyOnlyIterator {
            layer,
            buffer: BufferCursor { chunk: None, pos: pos as usize % CHUNK_SIZE },
            pos,
            item_index: 0,
            item_count: 0,
            key: None,
            value_deserialized: false,
        }
    }

    /// Positions `buffer` at the start of item `index` within the current block.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::OutOfRange` if `index >= item_count`, or `FxfsError::Inconsistent`
    /// if the stored offset does not land between the block header and the end of the block.
    fn seek_to_block_item(&mut self, index: u16) -> Result<(), Error> {
        ensure!(index < self.item_count, FxfsError::OutOfRange);
        if index == self.item_index && self.value_deserialized {
            // Fast path for linear iteration: the buffer already sits at this item.
            return Ok(());
        }
        let offset_in_block = if index == 0 {
            // The first item has no seek entry; it starts right after the item-count header.
            PER_DATA_BLOCK_HEADER_SIZE
        } else {
            // Entries for items 1..item_count are packed at the end of the block, so entry
            // `index` lives `item_count - index` entries before the block end. The buffer
            // position is restored afterwards so that a failed read leaves it untouched.
            let old_buffer_pos = self.buffer.pos;
            self.buffer.pos = round_up(self.buffer.pos, self.layer.block_size as usize).unwrap()
                - (PER_DATA_BLOCK_SEEK_ENTRY_SIZE * (usize::from(self.item_count - index)));
            let res = self.buffer.read_u16::<LittleEndian>();
            self.buffer.pos = old_buffer_pos;
            let offset_in_block = res.context("Failed to read offset")? as usize;
            if offset_in_block >= self.layer.block_size as usize
                || offset_in_block <= PER_DATA_BLOCK_HEADER_SIZE
            {
                return Err(anyhow!(FxfsError::Inconsistent))
                    .context(format!("Offset {} is out of valid range.", offset_in_block));
            }
            offset_in_block
        };
        self.item_index = index;
        self.buffer.pos =
            round_down(self.buffer.pos, self.layer.block_size as usize) + offset_in_block;
        Ok(())
    }

    /// Reads the next key into `self.key`, loading the next data block when the current one is
    /// exhausted. Sets `self.key` to `None` once all data blocks have been consumed.
    ///
    /// # Errors
    ///
    /// Fails on I/O errors, blocks with a zero item count, or undecodable keys.
    async fn advance(&mut self) -> Result<(), Error> {
        if self.item_index >= self.item_count {
            if self.pos >= self.layer.data_offset() + self.layer.data_size {
                self.key = None;
                return Ok(());
            }
            // Only fetch a new chunk when crossing into one (or nothing is loaded yet);
            // consecutive blocks in the same chunk reuse the cached data.
            if self.buffer.chunk.is_none() || self.pos as usize % CHUNK_SIZE == 0 {
                self.buffer.chunk = Some(
                    self.layer
                        .caching_object_handle
                        .read(self.pos as usize)
                        .await
                        .context("Reading during advance")?,
                );
            }
            self.buffer.pos = self.pos as usize % CHUNK_SIZE;
            self.item_count = self.buffer.read_u16::<LittleEndian>()?;
            if self.item_count == 0 {
                bail!(
                    "Read block with zero item count (object: {}, offset: {})",
                    self.layer.object_handle.object_id(),
                    self.pos
                );
            }
            debug!(
                pos = self.pos,
                buf:? = self.buffer,
                object_size = self.layer.data_offset() + self.layer.data_size,
                oid = self.layer.object_handle.object_id();
                ""
            );
            self.pos += self.layer.block_size;
            self.item_index = 0;
            // The buffer is already positioned at item 0 (just past the header).
            self.value_deserialized = true;
        }
        self.seek_to_block_item(self.item_index)?;
        self.key = Some(
            K::deserialize_from_version(self.buffer.by_ref(), self.layer.version)
                .context("Corrupt layer (key)")?,
        );
        self.item_index += 1;
        // The value following this key has not been consumed yet.
        self.value_deserialized = false;
        Ok(())
    }

    /// Returns the current key, or `None` at the end of the layer.
    fn get(&self) -> Option<&K> {
        self.key.as_ref()
    }
}
309
310struct Iterator<'iter, K: Key, V: LayerValue> {
311 inner: KeyOnlyIterator<'iter, K, V>,
312 item: Option<Item<K, V>>,
314}
315
316impl<'iter, K: Key, V: LayerValue> Iterator<'iter, K, V> {
317 fn new(mut seek_iterator: KeyOnlyIterator<'iter, K, V>) -> Result<Self, Error> {
318 let key = std::mem::take(&mut seek_iterator.key);
319 let item = if let Some(key) = key {
320 seek_iterator.value_deserialized = true;
321 let value = V::deserialize_from_version(
322 seek_iterator.buffer.by_ref(),
323 seek_iterator.layer.version,
324 )
325 .context("Corrupt layer (value)")?;
326 if seek_iterator.layer.version.major < REMOVE_ITEM_SEQUENCE_VERSION {
327 seek_iterator.buffer.read_u64::<LittleEndian>().context("Corrupt layer (seq)")?;
328 }
329 Some(Item { key, value })
330 } else {
331 None
332 };
333 Ok(Self { inner: seek_iterator, item })
334 }
335}
336
337#[async_trait]
338impl<'iter, K: Key, V: LayerValue> LayerIterator<K, V> for Iterator<'iter, K, V> {
339 async fn advance(&mut self) -> Result<(), Error> {
340 self.inner.advance().await?;
341 let key = std::mem::take(&mut self.inner.key);
342 self.item = if let Some(key) = key {
343 self.inner.value_deserialized = true;
344 let value =
345 V::deserialize_from_version(self.inner.buffer.by_ref(), self.inner.layer.version)
346 .context("Corrupt layer (value)")?;
347 if self.inner.layer.version.major < REMOVE_ITEM_SEQUENCE_VERSION {
348 self.inner.buffer.read_u64::<LittleEndian>().context("Corrupt layer (seq)")?;
349 }
350 Some(Item { key, value })
351 } else {
352 None
353 };
354 Ok(())
355 }
356
357 fn get(&self) -> Option<ItemRef<'_, K, V>> {
358 self.item.as_ref().map(<&Item<K, V>>::into)
359 }
360}
361
/// Returns the on-disk size, in bytes, of the seek table for a layer with `num_data_blocks` data
/// blocks.
///
/// The first data block has no entry (it implicitly starts at key 0); every later block has one
/// `u64`. The result saturates at `usize::MAX` rather than overflowing, so a corrupt block count
/// is caught by the caller's `MAX_SEEK_TABLE_SIZE` check instead of wrapping to a small size.
fn seek_table_size(num_data_blocks: u64) -> usize {
    let seek_table_entries = num_data_blocks.saturating_sub(1).min(usize::MAX as u64) as usize;
    seek_table_entries.saturating_mul(std::mem::size_of::<u64>())
}
372
373async fn load_seek_table(
374 object_handle: &(impl ReadObjectHandle + 'static),
375 seek_table_offset: u64,
376 num_data_blocks: u64,
377) -> Result<Vec<u64>, Error> {
378 let seek_table_size = seek_table_size(num_data_blocks);
379 if seek_table_size == 0 {
380 return Ok(vec![]);
381 }
382 if seek_table_size > MAX_SEEK_TABLE_SIZE {
383 return Err(anyhow!(FxfsError::NotSupported)).context("Seek table too large");
384 }
385 let mut buffer = object_handle.allocate_buffer(seek_table_size).await;
386 let bytes_read = object_handle
387 .read(seek_table_offset, buffer.as_mut())
388 .await
389 .context("Reading seek table blocks")?;
390 ensure!(bytes_read == seek_table_size, "Short read");
391
392 let mut seek_table = Vec::with_capacity(num_data_blocks as usize);
393 seek_table.push(0);
395 let mut prev = 0;
396 for chunk in buffer.as_slice().chunks_exact(std::mem::size_of::<u64>()) {
397 let next = LittleEndian::read_u64(chunk);
398 if prev > next {
401 return Err(anyhow!(FxfsError::Inconsistent))
402 .context(format!("Seek table entry out of order, {:?} > {:?}", prev, next));
403 }
404 prev = next;
405 seek_table.push(next);
406 }
407 Ok(seek_table)
408}
409
410async fn load_bloom_filter<K: FuzzyHash>(
411 handle: &(impl ReadObjectHandle + 'static),
412 bloom_filter_offset: u64,
413 layer_info: &LayerInfo,
414) -> Result<Option<BloomFilterReader<K>>, Error> {
415 if layer_info.bloom_filter_size_bytes == 0 {
416 return Ok(None);
417 }
418 if layer_info.bloom_filter_size_bytes > MAX_BLOOM_FILTER_SIZE {
419 return Err(anyhow!(FxfsError::NotSupported)).context("Bloom filter too large");
420 }
421 let mut buffer = handle.allocate_buffer(layer_info.bloom_filter_size_bytes).await;
422 handle.read(bloom_filter_offset, buffer.as_mut()).await.context("Failed to read")?;
423 Ok(Some(BloomFilterReader::read(
424 buffer.as_slice(),
425 layer_info.bloom_filter_seed,
426 layer_info.bloom_filter_num_hashes,
427 )?))
428}
429
430impl<K: Key, V: LayerValue> PersistentLayer<K, V> {
431 pub async fn open(handle: impl ReadObjectHandle + 'static) -> Result<Arc<Self>, Error> {
432 let bs = handle.block_size();
433 let mut buffer = handle.allocate_buffer(bs as usize).await;
434 handle.read(0, buffer.as_mut()).await.context("Failed to read first block")?;
435 let mut cursor = std::io::Cursor::new(buffer.as_slice());
436 let version = Version::deserialize_from(&mut cursor)?;
437
438 ensure!(version <= LATEST_VERSION, FxfsError::InvalidVersion);
439 let header = LayerHeader::deserialize_from_version(&mut cursor, version)
440 .context("Failed to deserialize header")?;
441 if &header.magic != PERSISTENT_LAYER_MAGIC {
442 return Err(anyhow!(FxfsError::Inconsistent).context("Invalid layer file magic"));
443 }
444 if header.block_size == 0 || !header.block_size.is_power_of_two() {
445 return Err(anyhow!(FxfsError::Inconsistent))
446 .context(format!("Invalid block size {}", header.block_size));
447 }
448 ensure!(header.block_size > 0, FxfsError::Inconsistent);
449 ensure!(header.block_size <= MAX_BLOCK_SIZE, FxfsError::NotSupported);
450 let physical_block_size = handle.block_size();
451 if header.block_size % physical_block_size != 0 {
452 return Err(anyhow!(FxfsError::Inconsistent)).context(format!(
453 "{} not a multiple of physical block size {}",
454 header.block_size, physical_block_size
455 ));
456 }
457 std::mem::drop(cursor);
458
459 let bs = header.block_size as usize;
460 if handle.get_size() < MINIMUM_LAYER_FILE_BLOCKS * bs as u64 {
461 return Err(anyhow!(FxfsError::Inconsistent).context("Layer file too short"));
462 }
463
464 let layer_info = {
465 let last_block_offset = handle
466 .get_size()
467 .checked_sub(header.block_size)
468 .ok_or(FxfsError::Inconsistent)
469 .context("Layer file unexpectedly short")?;
470 handle
471 .read(last_block_offset, buffer.subslice_mut(0..header.block_size as usize))
472 .await
473 .context("Failed to read layer info")?;
474 let layer_info_len =
475 LittleEndian::read_u64(&buffer.as_slice()[bs - std::mem::size_of::<u64>()..]);
476 let layer_info_offset = bs
477 .checked_sub(std::mem::size_of::<u64>() + layer_info_len as usize)
478 .ok_or(FxfsError::Inconsistent)
479 .context("Invalid layer info length")?;
480 let mut cursor = std::io::Cursor::new(&buffer.as_slice()[layer_info_offset..]);
481 LayerInfo::deserialize_from_version(&mut cursor, version)
482 .context("Failed to deserialize LayerInfo")?
483 };
484 std::mem::drop(buffer);
485 if layer_info.num_items == 0 && layer_info.num_data_blocks > 0 {
486 return Err(anyhow!(FxfsError::Inconsistent))
487 .context("Invalid num_items/num_data_blocks");
488 }
489 let total_blocks = handle.get_size() / header.block_size;
490 let bloom_filter_blocks =
491 round_up(layer_info.bloom_filter_size_bytes as u64, header.block_size)
492 .unwrap_or(layer_info.bloom_filter_size_bytes as u64)
493 / header.block_size;
494 if layer_info.num_data_blocks + bloom_filter_blocks
495 > total_blocks - MINIMUM_LAYER_FILE_BLOCKS
496 {
497 return Err(anyhow!(FxfsError::Inconsistent)).context("Invalid number of blocks");
498 }
499
500 let bloom_filter_offset =
501 header.block_size * (NUM_HEADER_BLOCKS + layer_info.num_data_blocks);
502 let bloom_filter = if version == LATEST_VERSION {
503 load_bloom_filter(&handle, bloom_filter_offset, &layer_info)
504 .await
505 .context("Failed to load bloom filter")?
506 } else {
507 None
511 };
512 let bloom_filter_stats = bloom_filter.as_ref().map(|b| b.stats());
513
514 let seek_offset = header.block_size
515 * (NUM_HEADER_BLOCKS + layer_info.num_data_blocks + bloom_filter_blocks);
516 let seek_table = load_seek_table(&handle, seek_offset, layer_info.num_data_blocks)
517 .await
518 .context("Failed to load seek table")?;
519
520 let object_handle = Arc::new(handle) as Arc<dyn ReadObjectHandle>;
521 let caching_object_handle = CachingObjectHandle::new(object_handle.clone());
522 Ok(Arc::new(PersistentLayer {
523 object_handle,
524 caching_object_handle,
525 version,
526 block_size: header.block_size,
527 data_size: layer_info.num_data_blocks * header.block_size,
528 seek_table,
529 num_items: layer_info.num_items,
530 bloom_filter,
531 bloom_filter_stats,
532 close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
533 _value_type: PhantomData::default(),
534 }))
535 }
536
537 pub fn has_bloom_filter(&self) -> bool {
542 self.bloom_filter.is_some()
543 }
544
545 fn data_offset(&self) -> u64 {
546 NUM_HEADER_BLOCKS * self.block_size
547 }
548}
549
550#[async_trait]
551impl<K: Key, V: LayerValue> Layer<K, V> for PersistentLayer<K, V> {
552 fn handle(&self) -> Option<&dyn ReadObjectHandle> {
553 Some(&self.object_handle)
554 }
555
556 fn purge_cached_data(&self) {
557 self.caching_object_handle.purge();
558 }
559
560 async fn seek<'a>(&'a self, bound: Bound<&K>) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
561 let (key, excluded) = match bound {
562 Bound::Unbounded => {
563 let mut iterator = Iterator::new(KeyOnlyIterator::new(self, self.data_offset()))?;
564 iterator.advance().await.context("Unbounded seek advance")?;
565 return Ok(Box::new(iterator));
566 }
567 Bound::Included(k) => (k, false),
568 Bound::Excluded(k) => (k, true),
569 };
570 let first_data_block_index = self.data_offset() / self.block_size;
571
572 let (mut left_offset, mut right_offset) = {
573 let target = key.get_leading_u64();
578 let right_index = self.seek_table.as_slice().partition_point(|&x| x <= target) as u64;
580 let left_index = self.seek_table.as_slice()[..right_index as usize]
583 .partition_point(|&x| x < target)
584 .saturating_sub(1) as u64;
585
586 (
587 (left_index + first_data_block_index) * self.block_size,
588 (right_index + first_data_block_index) * self.block_size,
589 )
590 };
591 let mut left = KeyOnlyIterator::new(self, left_offset);
592 left.advance().await.context("Initial seek advance")?;
593 match left.get() {
594 None => return Ok(Box::new(Iterator::new(left)?)),
595 Some(left_key) => match left_key.cmp_upper_bound(key) {
596 Ordering::Greater => return Ok(Box::new(Iterator::new(left)?)),
597 Ordering::Equal => {
598 if excluded {
599 left.advance().await?;
600 }
601 return Ok(Box::new(Iterator::new(left)?));
602 }
603 Ordering::Less => {}
604 },
605 }
606 let mut right = None;
607 while right_offset - left_offset > self.block_size {
608 let mid_offset =
610 round_down(left_offset + (right_offset - left_offset) / 2, self.block_size);
611 let mut iterator = KeyOnlyIterator::new(self, mid_offset);
612 iterator.advance().await?;
613 let iter_key: &K = iterator.get().unwrap();
614 match iter_key.cmp_upper_bound(key) {
615 Ordering::Greater => {
616 right_offset = mid_offset;
617 right = Some(iterator);
618 }
619 Ordering::Equal => {
620 if excluded {
621 iterator.advance().await?;
622 }
623 return Ok(Box::new(Iterator::new(iterator)?));
624 }
625 Ordering::Less => {
626 left_offset = mid_offset;
627 left = iterator;
628 }
629 }
630 }
631
632 let mut left_index = 0;
634 let mut right_index = left.item_count;
635 while left_index < (right_index - 1) {
637 let mid_index = left_index + ((right_index - left_index) / 2);
638 left.seek_to_block_item(mid_index).context("Read index offset for binary search")?;
639 left.advance().await?;
640 match left.get().unwrap().cmp_upper_bound(key) {
641 Ordering::Greater => {
642 right_index = mid_index;
643 }
644 Ordering::Equal => {
645 if excluded {
646 left.advance().await?;
647 }
648 return Ok(Box::new(Iterator::new(left)?));
649 }
650 Ordering::Less => {
651 left_index = mid_index;
652 }
653 }
654 }
655 if right_index < left.item_count {
660 left.seek_to_block_item(right_index)
661 .context("Read index for offset of right pointer")?;
662 } else if let Some(right) = right {
663 return Ok(Box::new(Iterator::new(right)?));
664 } else {
665 }
671 left.advance().await?;
672 return Ok(Box::new(Iterator::new(left)?));
673 }
674
675 fn len(&self) -> usize {
676 self.num_items
677 }
678
679 fn maybe_contains_key(&self, key: &K) -> bool {
680 self.bloom_filter.as_ref().map_or(true, |f| f.maybe_contains(key))
681 }
682
683 async fn key_exists(&self, key: &K) -> Result<Existence, Error> {
684 match &self.bloom_filter {
685 Some(filter) => Ok(if filter.maybe_contains(key) {
686 Existence::MaybeExists
687 } else {
688 Existence::Missing
689 }),
690 None => {
691 let iter = self.seek(Bound::Included(key)).await?;
692 Ok(iter.get().map_or(Existence::Missing, |i| {
693 if i.key.cmp_upper_bound(key).is_eq() {
694 Existence::Exists
695 } else {
696 Existence::Missing
697 }
698 }))
699 }
700 }
701 }
702
703 fn lock(&self) -> Option<Arc<DropEvent>> {
704 self.close_event.lock().clone()
705 }
706
707 async fn close(&self) {
708 let listener = self.close_event.lock().take().expect("close already called").listen();
709 listener.await;
710 }
711
712 fn get_version(&self) -> Version {
713 return self.version;
714 }
715
716 fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
717 node.record_uint("num_items", self.num_items as u64);
718 node.record_bool("persistent", true);
719 node.record_uint("size", self.object_handle.get_size());
720 if let Some(stats) = self.bloom_filter_stats.as_ref() {
721 node.record_child("bloom_filter", move |node| {
722 node.record_uint("size", stats.size as u64);
723 node.record_uint("num_hashes", stats.num_hashes as u64);
724 node.record_uint("fill_percentage", stats.fill_percentage as u64);
725 });
726 }
727 }
728}
729
// Per-item offsets inside a data block are stored as `u16`, so every block size must keep those
// offsets representable.
const_assert!(MAX_BLOCK_SIZE <= u16::MAX as u64 + 1);

/// Writes items into the layer file format read by `PersistentLayer::open`.
///
/// NOTE(review): `seek` binary-searches, so items presumably must be written in key order; this
/// writer does not check that.
pub struct PersistentLayerWriter<W: WriteBytes, K: Key, V: LayerValue> {
    writer: W,
    /// Layer block size recorded in the header.
    block_size: u64,
    /// Serialized items not yet flushed into a data block. Also reused as scratch space for the
    /// bloom filter, seek table and layer info.
    buf: Vec<u8>,
    /// Number of items currently held in `buf`.
    buf_item_count: LayerWriterBufItemCount,
    /// Total number of items written.
    item_count: usize,
    /// Offsets within the pending block of every buffered item after the first.
    block_offsets: Vec<u16>,
    /// Leading u64 of the first key of every data block after the first (the seek table).
    block_keys: Vec<u64>,
    bloom_filter: BloomFilterWriter<K>,
    _value: PhantomData<V>,
}
746
747impl<W: WriteBytes, K: Key, V: LayerValue> PersistentLayerWriter<W, K, V> {
748 pub async fn new(writer: W, num_items: usize, block_size: u64) -> Result<Self, Error> {
750 Self::new_with_version(writer, num_items, block_size, LATEST_VERSION).await
751 }
752
753 pub(crate) async fn new_with_version(
754 mut writer: W,
755 num_items: usize,
756 block_size: u64,
757 version: Version,
758 ) -> Result<Self, Error> {
759 ensure!(block_size <= MAX_BLOCK_SIZE, FxfsError::NotSupported);
760 ensure!(block_size >= MIN_BLOCK_SIZE, FxfsError::NotSupported);
761
762 let header = LayerHeader { magic: PERSISTENT_LAYER_MAGIC.clone(), block_size };
764 let mut buf = vec![0u8; block_size as usize];
765 {
766 let mut cursor = std::io::Cursor::new(&mut buf[..]);
767 version.serialize_into(&mut cursor)?;
768 header.serialize_into(&mut cursor)?;
769 }
770 writer.write_bytes(&buf[..]).await?;
771
772 let seed: u64 = rand::random();
773 Ok(Self {
774 writer,
775 block_size,
776 buf: Vec::new(),
777 buf_item_count: LayerWriterBufItemCount(0),
778 item_count: 0,
779 block_offsets: Vec::new(),
780 block_keys: Vec::new(),
781 bloom_filter: BloomFilterWriter::new(seed, num_items),
782 _value: PhantomData,
783 })
784 }
785
786 async fn write_block(&mut self, len: usize) -> Result<(), Error> {
791 if *self.buf_item_count == 0 {
792 return Ok(());
793 }
794 let seek_table_size = self.block_offsets.len() * PER_DATA_BLOCK_SEEK_ENTRY_SIZE;
795 assert!(PER_DATA_BLOCK_HEADER_SIZE + seek_table_size + len <= self.block_size as usize);
796 let mut cursor = std::io::Cursor::new(vec![0u8; self.block_size as usize]);
797 cursor.write_u16::<LittleEndian>(*self.buf_item_count)?;
798 cursor.write_all(self.buf.drain(..len).as_ref())?;
799 cursor.set_position(self.block_size - seek_table_size as u64);
800 for &offset in &self.block_offsets {
802 cursor.write_u16::<LittleEndian>(offset)?;
803 }
804 self.writer.write_bytes(cursor.get_ref()).await?;
805 debug!(item_count = *self.buf_item_count, byte_count = len; "wrote items");
806 *self.buf_item_count = 0;
807 self.block_offsets.clear();
808 Ok(())
809 }
810
811 async fn write_seek_table(&mut self) -> Result<usize, Error> {
816 if self.block_keys.len() == 0 {
817 return Ok(0);
818 }
819 let size = self.block_keys.len() * std::mem::size_of::<u64>();
820 self.buf.resize(size, 0);
821 let mut len = 0;
822 for key in &self.block_keys {
823 LittleEndian::write_u64(&mut self.buf[len..len + std::mem::size_of::<u64>()], *key);
824 len += std::mem::size_of::<u64>();
825 }
826 self.writer.write_bytes(&self.buf).await?;
827 Ok(size)
828 }
829
830 async fn write_info(
833 &mut self,
834 num_data_blocks: u64,
835 bloom_filter_size_bytes: usize,
836 seek_table_len: usize,
837 ) -> Result<(), Error> {
838 let block_size = self.writer.block_size() as usize;
839 let layer_info = LayerInfo {
840 num_items: self.item_count,
841 num_data_blocks,
842 bloom_filter_size_bytes,
843 bloom_filter_seed: self.bloom_filter.seed(),
844 bloom_filter_num_hashes: self.bloom_filter.num_hashes(),
845 };
846 let actual_len = {
847 let mut cursor = std::io::Cursor::new(&mut self.buf);
848 layer_info.serialize_into(&mut cursor)?;
849 let layer_info_len = cursor.position();
850 cursor.write_u64::<LittleEndian>(layer_info_len)?;
851 cursor.position() as usize
852 };
853
854 let avail_in_block = block_size - (seek_table_len % block_size);
857 let to_skip = if avail_in_block < actual_len {
858 block_size + avail_in_block - actual_len
859 } else {
860 avail_in_block - actual_len
861 } as u64;
862 self.writer.skip(to_skip).await?;
863 self.writer.write_bytes(&self.buf[..actual_len]).await?;
864 Ok(())
865 }
866
867 async fn write_bloom_filter(&mut self) -> Result<usize, Error> {
870 if self.data_blocks() < MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER {
871 return Ok(0);
872 }
873 let size = round_up(self.bloom_filter.serialized_size(), self.block_size as usize).unwrap();
875 self.buf.resize(size, 0);
876 let mut cursor = std::io::Cursor::new(&mut self.buf);
877 self.bloom_filter.write(&mut cursor)?;
878 self.writer.write_bytes(&self.buf).await?;
879 Ok(self.bloom_filter.serialized_size())
880 }
881
882 #[cfg(test)]
885 pub(crate) fn bloom_filter(&mut self) -> &mut BloomFilterWriter<K> {
886 &mut self.bloom_filter
887 }
888
889 fn data_blocks(&self) -> usize {
890 if self.item_count == 0 { 0 } else { self.block_keys.len() + 1 }
891 }
892}
893
894impl<W: WriteBytes + Send, K: Key, V: LayerValue> LayerWriter<K, V>
895 for PersistentLayerWriter<W, K, V>
896{
897 async fn write(&mut self, item: ItemRef<'_, K, V>) -> Result<(), Error> {
898 let len = self.buf.len();
900 item.key.serialize_into(&mut self.buf)?;
901 item.value.serialize_into(&mut self.buf)?;
902
903 let mut added_offset = false;
904 if *self.buf_item_count > 0 {
906 self.block_offsets.push(u16::try_from(len + PER_DATA_BLOCK_HEADER_SIZE).unwrap());
907 added_offset = true;
908 }
909
910 if PER_DATA_BLOCK_HEADER_SIZE
913 + self.buf.len()
914 + (self.block_offsets.len() * PER_DATA_BLOCK_SEEK_ENTRY_SIZE)
915 > self.block_size as usize - 1
916 {
917 if added_offset {
918 self.block_offsets.pop();
921 }
922 self.write_block(len).await?;
923
924 self.block_keys.push(item.key.get_leading_u64());
926 }
927
928 self.bloom_filter.insert(&item.key);
929 *self.buf_item_count += 1;
930 self.item_count += 1;
931 Ok(())
932 }
933
934 async fn complete(mut self) -> Result<u64, Error> {
935 self.write_block(self.buf.len()).await?;
936 let data_blocks = self.data_blocks() as u64;
937 let bloom_filter_len = self.write_bloom_filter().await?;
938 let seek_table_len = self.write_seek_table().await?;
939 self.write_info(data_blocks, bloom_filter_len, seek_table_len).await?;
940 self.writer.complete().await
941 }
942}
943
944#[repr(transparent)]
946struct LayerWriterBufItemCount(u16);
947
948impl Drop for LayerWriterBufItemCount {
949 fn drop(&mut self) {
950 debug_assert!(self.0 == 0, "Dropping unwritten items; did you forget to call complete?");
951 if self.0 > 0 {
952 warn!("Dropping unwritten items; did you forget to call complete?");
953 }
954 }
955}
956
957impl std::ops::Deref for LayerWriterBufItemCount {
958 type Target = u16;
959 fn deref(&self) -> &u16 {
960 &self.0
961 }
962}
963
964impl std::ops::DerefMut for LayerWriterBufItemCount {
965 fn deref_mut(&mut self) -> &mut u16 {
966 &mut self.0
967 }
968}
969
970#[cfg(test)]
971mod tests {
972 use super::{PersistentLayer, PersistentLayerWriter};
973 use crate::filesystem::MAX_BLOCK_SIZE;
974 use crate::lsm_tree::LayerIterator;
975 use crate::lsm_tree::persistent_layer::MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER;
976 use crate::lsm_tree::types::{Existence, Item, ItemRef, Layer, LayerWriter, OrdUpperBound};
977 use crate::object_handle::WriteBytes;
978 use crate::object_store::AttributeId;
979 use crate::object_store::object_record::ObjectKey;
980 use crate::round::round_up;
981 use crate::serialized_types::{LATEST_VERSION, Version};
982 use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
983 use crate::testing::writer::Writer;
984
985 use std::fmt::Debug;
986
987 use std::ops::{Bound, Range};
988 use std::sync::Arc;
989
990 impl<W: WriteBytes> Debug for PersistentLayerWriter<W, i32, i32> {
991 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
992 f.debug_struct("rPersistentLayerWriter")
993 .field("block_size", &self.block_size)
994 .field("item_count", &*self.buf_item_count)
995 .finish()
996 }
997 }
998
999 #[fuchsia::test]
1000 async fn test_iterate_after_write() {
1001 const BLOCK_SIZE: u64 = 512;
1002 const ITEM_COUNT: i32 = 10000;
1003
1004 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1005 {
1006 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1007 Writer::new(&handle).await,
1008 ITEM_COUNT as usize * 4,
1009 BLOCK_SIZE,
1010 )
1011 .await
1012 .expect("writer new");
1013 for i in 0..ITEM_COUNT {
1014 writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1015 }
1016 writer.complete().await.expect("flush failed");
1017 }
1018 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1019 let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
1020 for i in 0..ITEM_COUNT {
1021 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1022 assert_eq!((key, value), (&i, &i));
1023 iterator.advance().await.expect("failed to advance");
1024 }
1025 assert!(iterator.get().is_none());
1026 }
1027
1028 #[fuchsia::test]
1029 async fn test_seek_after_write() {
1030 const BLOCK_SIZE: u64 = 512;
1031 const ITEM_COUNT: i32 = 5000;
1032
1033 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1034 {
1035 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1036 Writer::new(&handle).await,
1037 ITEM_COUNT as usize * 18,
1038 BLOCK_SIZE,
1039 )
1040 .await
1041 .expect("writer new");
1042 for i in 0..ITEM_COUNT {
1043 writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
1045 }
1046 writer.complete().await.expect("flush failed");
1047 }
1048 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1049 for i in 0..ITEM_COUNT * 2 {
1051 let expected = round_up(i, 2).unwrap();
1054 let mut iterator = layer.seek(Bound::Included(&i)).await.expect("failed to seek");
1055 if i >= (ITEM_COUNT * 2) - 1 {
1058 assert!(iterator.get().is_none());
1059 } else {
1060 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1061 assert_eq!((key, value), (&expected, &expected));
1062 }
1063
1064 iterator.advance().await.expect("failed to advance");
1066 if i >= (ITEM_COUNT * 2) - 3 {
1070 assert!(iterator.get().is_none());
1071 } else {
1072 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1073 let next = expected + 2;
1074 assert_eq!((key, value), (&next, &next));
1075 }
1076 }
1077 }
1078
1079 #[fuchsia::test]
1080 async fn test_seek_unbounded() {
1081 const BLOCK_SIZE: u64 = 512;
1082 const ITEM_COUNT: i32 = 1000;
1083
1084 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1085 {
1086 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1087 Writer::new(&handle).await,
1088 ITEM_COUNT as usize * 18,
1089 BLOCK_SIZE,
1090 )
1091 .await
1092 .expect("writer new");
1093 for i in 0..ITEM_COUNT {
1094 writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1095 }
1096 writer.complete().await.expect("flush failed");
1097 }
1098 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1099 let mut iterator = layer.seek(Bound::Unbounded).await.expect("failed to seek");
1100 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1101 assert_eq!((key, value), (&0, &0));
1102
1103 iterator.advance().await.expect("failed to advance");
1105 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1106 assert_eq!((key, value), (&1, &1));
1107 }
1108
1109 #[fuchsia::test]
1110 async fn test_zero_items() {
1111 const BLOCK_SIZE: u64 = 512;
1112
1113 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1114 {
1115 let writer = PersistentLayerWriter::<_, i32, i32>::new(
1116 Writer::new(&handle).await,
1117 0,
1118 BLOCK_SIZE,
1119 )
1120 .await
1121 .expect("writer new");
1122 writer.complete().await.expect("flush failed");
1123 }
1124
1125 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1126 let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1127 .seek(Bound::Unbounded)
1128 .await
1129 .expect("seek failed");
1130 assert!(iterator.get().is_none())
1131 }
1132
1133 #[fuchsia::test]
1134 async fn test_one_item() {
1135 const BLOCK_SIZE: u64 = 512;
1136
1137 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1138 {
1139 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1140 Writer::new(&handle).await,
1141 1,
1142 BLOCK_SIZE,
1143 )
1144 .await
1145 .expect("writer new");
1146 writer.write(Item::new(42, 42).as_item_ref()).await.expect("write failed");
1147 writer.complete().await.expect("flush failed");
1148 }
1149
1150 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1151 {
1152 let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1153 .seek(Bound::Unbounded)
1154 .await
1155 .expect("seek failed");
1156 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1157 assert_eq!((key, value), (&42, &42));
1158 iterator.advance().await.expect("failed to advance");
1159 assert!(iterator.get().is_none())
1160 }
1161 {
1162 let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1163 .seek(Bound::Included(&30))
1164 .await
1165 .expect("seek failed");
1166 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1167 assert_eq!((key, value), (&42, &42));
1168 iterator.advance().await.expect("failed to advance");
1169 assert!(iterator.get().is_none())
1170 }
1171 {
1172 let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1173 .seek(Bound::Included(&42))
1174 .await
1175 .expect("seek failed");
1176 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1177 assert_eq!((key, value), (&42, &42));
1178 iterator.advance().await.expect("failed to advance");
1179 assert!(iterator.get().is_none())
1180 }
1181 {
1182 let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1183 .seek(Bound::Included(&43))
1184 .await
1185 .expect("seek failed");
1186 assert!(iterator.get().is_none())
1187 }
1188 }
1189
1190 #[fuchsia::test]
1191 async fn test_large_block_size() {
1192 const BLOCK_SIZE: u64 = MAX_BLOCK_SIZE;
1194 const ITEM_COUNT: i32 = ((BLOCK_SIZE as i32) / 18) * 3;
1196
1197 let handle =
1198 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1199 {
1200 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1201 Writer::new(&handle).await,
1202 ITEM_COUNT as usize * 18,
1203 BLOCK_SIZE,
1204 )
1205 .await
1206 .expect("writer new");
1207 for i in 2000000000..(2000000000 + ITEM_COUNT) {
1209 writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1210 }
1211 writer.complete().await.expect("flush failed");
1212 }
1213
1214 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1215 let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
1216 for i in 2000000000..(2000000000 + ITEM_COUNT) {
1217 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1218 assert_eq!((key, value), (&i, &i));
1219 iterator.advance().await.expect("failed to advance");
1220 }
1221 assert!(iterator.get().is_none());
1222 }
1223
1224 #[fuchsia::test]
1225 async fn test_overlarge_block_size() {
1226 const BLOCK_SIZE: u64 = MAX_BLOCK_SIZE * 2;
1228
1229 let handle =
1230 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1231 PersistentLayerWriter::<_, i32, i32>::new(Writer::new(&handle).await, 0, BLOCK_SIZE)
1232 .await
1233 .expect_err("Creating writer with overlarge block size.");
1234 }
1235
1236 #[fuchsia::test]
1237 async fn test_seek_bound_excluded() {
1238 const BLOCK_SIZE: u64 = 512;
1239 const ITEM_COUNT: i32 = 10000;
1240
1241 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1242 {
1243 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1244 Writer::new(&handle).await,
1245 ITEM_COUNT as usize * 18,
1246 BLOCK_SIZE,
1247 )
1248 .await
1249 .expect("writer new");
1250 for i in 0..ITEM_COUNT {
1251 writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1252 }
1253 writer.complete().await.expect("flush failed");
1254 }
1255 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1256
1257 for i in 9982..ITEM_COUNT {
1258 let mut iterator = layer.seek(Bound::Excluded(&i)).await.expect("failed to seek");
1259 let i_plus_one = i + 1;
1260 if i_plus_one < ITEM_COUNT {
1261 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1262
1263 assert_eq!((key, value), (&i_plus_one, &i_plus_one));
1264
1265 iterator.advance().await.expect("failed to advance");
1267 let i_plus_two = i + 2;
1268 if i_plus_two < ITEM_COUNT {
1269 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1270 assert_eq!((key, value), (&i_plus_two, &i_plus_two));
1271 } else {
1272 assert!(iterator.get().is_none());
1273 }
1274 } else {
1275 assert!(iterator.get().is_none());
1276 }
1277 }
1278 }
1279
1280 use crate::lsm_tree::testing::TestKey;
1281
1282 fn generate_extents(
1286 object_id: u64,
1287 base_offset: u64,
1288 count: u64,
1289 ) -> (Vec<Item<ObjectKey, u64>>, u64) {
1290 let mut items = Vec::new();
1291 for i in 0..count {
1292 items.push(Item::new(
1293 ObjectKey::extent(
1294 object_id,
1295 AttributeId::TEST_ID,
1296 base_offset + i..base_offset + i + 1,
1297 ),
1298 object_id,
1299 ));
1300 }
1301 (items, object_id + 1)
1302 }
1303
1304 fn generate_objects(object_id_range: Range<u64>) -> (Vec<Item<ObjectKey, u64>>, u64) {
1308 let mut items = Vec::new();
1309 let end = object_id_range.end;
1310 for object_id in object_id_range {
1311 items.push(Item::new(ObjectKey::object(object_id), object_id));
1312 }
1313 (items, end)
1314 }
1315
1316 #[fuchsia::test]
1319 async fn test_block_seek_duplicate_leading_u64() {
1320 const BLOCK_SIZE: u64 = 512;
1322 const ITEMS_PER_PHASE: u64 = 50;
1323
1324 let mut to_find = Vec::new();
1325
1326 let handle =
1327 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1328 {
1329 let mut items = Vec::new();
1330 let mut object_id = u32::MAX as u64 + 1;
1332
1333 {
1336 let base_extent_offset = 0;
1337 let (mut generated, next_object_id) =
1338 generate_extents(object_id, base_extent_offset, ITEMS_PER_PHASE * 3);
1339 items.append(&mut generated);
1340 let count = ITEMS_PER_PHASE * 3;
1341 to_find.push(ObjectKey::extent(
1342 object_id,
1343 AttributeId::TEST_ID,
1344 base_extent_offset..base_extent_offset + 1,
1345 ));
1346 to_find.push(ObjectKey::extent(
1347 object_id,
1348 AttributeId::TEST_ID,
1349 base_extent_offset + (count / 2)..base_extent_offset + (count / 2) + 1,
1350 ));
1351 to_find.push(ObjectKey::extent(
1352 object_id,
1353 AttributeId::TEST_ID,
1354 base_extent_offset + (count - 1)..base_extent_offset + count,
1355 ));
1356 object_id = next_object_id;
1357 }
1358
1359 {
1361 let (mut generated, next_object_id) =
1362 generate_objects(object_id..object_id + ITEMS_PER_PHASE * 3);
1363 items.append(&mut generated);
1364 object_id = next_object_id;
1365 }
1366
1367 {
1370 let base_extent_offset = 1000;
1371 let (mut generated, next_object_id) =
1372 generate_extents(object_id, base_extent_offset, ITEMS_PER_PHASE * 3);
1373 items.append(&mut generated);
1374 let count = ITEMS_PER_PHASE * 3;
1375 to_find.push(ObjectKey::extent(
1376 object_id,
1377 AttributeId::TEST_ID,
1378 base_extent_offset..base_extent_offset + 1,
1379 ));
1380 to_find.push(ObjectKey::extent(
1381 object_id,
1382 AttributeId::TEST_ID,
1383 base_extent_offset + (count / 2)..base_extent_offset + (count / 2) + 1,
1384 ));
1385 to_find.push(ObjectKey::extent(
1386 object_id,
1387 AttributeId::TEST_ID,
1388 base_extent_offset + (count - 1)..base_extent_offset + count,
1389 ));
1390 object_id = next_object_id;
1391 }
1392
1393 {
1395 let (mut generated, next_object_id) =
1396 generate_objects(object_id..object_id + ITEMS_PER_PHASE * 3);
1397 items.append(&mut generated);
1398 object_id = next_object_id;
1399 }
1400
1401 {
1404 let base_extent_offset = 2000;
1405 let (mut generated, _) =
1406 generate_extents(object_id, base_extent_offset, ITEMS_PER_PHASE * 3);
1407 items.append(&mut generated);
1408 let count = ITEMS_PER_PHASE * 3;
1409 to_find.push(ObjectKey::extent(
1410 object_id,
1411 AttributeId::TEST_ID,
1412 base_extent_offset..base_extent_offset + 1,
1413 ));
1414 to_find.push(ObjectKey::extent(
1415 object_id,
1416 AttributeId::TEST_ID,
1417 base_extent_offset + (count / 2)..base_extent_offset + (count / 2) + 1,
1418 ));
1419 to_find.push(ObjectKey::extent(
1420 object_id,
1421 AttributeId::TEST_ID,
1422 base_extent_offset + (count - 1)..base_extent_offset + count,
1423 ));
1424 }
1425
1426 items.sort_by(|a, b| a.key.cmp_upper_bound(&b.key));
1428
1429 let mut writer = PersistentLayerWriter::<_, ObjectKey, u64>::new(
1430 Writer::new(&handle).await,
1431 3 * BLOCK_SIZE as usize,
1432 BLOCK_SIZE,
1433 )
1434 .await
1435 .expect("writer new");
1436
1437 for item in items {
1438 writer.write(item.as_item_ref()).await.expect("write failed");
1439 }
1440
1441 writer.complete().await.expect("flush failed");
1442 }
1443
1444 let layer = PersistentLayer::<ObjectKey, u64>::open(handle).await.expect("new failed");
1445 for target in to_find {
1446 let iterator: Box<dyn LayerIterator<ObjectKey, u64>> =
1447 layer.seek(Bound::Included(&target)).await.expect("failed to seek");
1448 let ItemRef { key, .. } = iterator.get().expect("missing item");
1449 assert_eq!(&target, key);
1450 }
1451 }
1452
1453 #[fuchsia::test]
1454 async fn test_two_seek_blocks() {
1455 const BLOCK_SIZE: u64 = 512;
1457 const ITEMS_PER_PHASE: u64 = 50;
1458 const ITEM_COUNT: u64 = ITEMS_PER_PHASE * ((BLOCK_SIZE / 8) + 2);
1459
1460 let mut to_find = Vec::new();
1461
1462 let handle =
1463 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1464 {
1465 let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
1466 Writer::new(&handle).await,
1467 ITEM_COUNT as usize * 18,
1468 BLOCK_SIZE,
1469 )
1470 .await
1471 .expect("writer new");
1472
1473 let initial_value = u32::MAX as u64 + 1;
1475 for i in 0..ITEM_COUNT {
1476 writer
1477 .write(
1478 Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1479 .as_item_ref(),
1480 )
1481 .await
1482 .expect("write failed");
1483 }
1484 to_find.push(TestKey(initial_value..initial_value));
1486 let middle = initial_value + ITEM_COUNT / 2;
1487 to_find.push(TestKey(middle..middle));
1488 let end = initial_value + ITEM_COUNT - 1;
1489 to_find.push(TestKey(end..end));
1490
1491 writer.complete().await.expect("flush failed");
1492 }
1493
1494 let layer = PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
1495 for target in to_find {
1496 let iterator: Box<dyn LayerIterator<TestKey, u64>> =
1497 layer.seek(Bound::Included(&target)).await.expect("failed to seek");
1498 let ItemRef { key, .. } = iterator.get().expect("missing item");
1499 assert_eq!(&target, key);
1500 }
1501 }
1502
1503 #[fuchsia::test]
1506 async fn test_full_seek_block() {
1507 const BLOCK_SIZE: u64 = 512;
1508 const ITEMS_PER_PHASE: u64 = 50;
1509
1510 const SEEK_TABLE_ENTRIES: u64 = BLOCK_SIZE / 8;
1512
1513 const START_ENTRIES_COUNT: u64 = ITEMS_PER_PHASE * SEEK_TABLE_ENTRIES;
1517
1518 for entries in START_ENTRIES_COUNT..START_ENTRIES_COUNT + (ITEMS_PER_PHASE * 2) {
1519 let handle = FakeObjectHandle::new_with_block_size(
1520 Arc::new(FakeObject::new()),
1521 BLOCK_SIZE as usize,
1522 );
1523 {
1524 let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
1525 Writer::new(&handle).await,
1526 entries as usize,
1527 BLOCK_SIZE,
1528 )
1529 .await
1530 .expect("writer new");
1531
1532 let initial_value = u32::MAX as u64 + 1;
1534 for i in 0..entries {
1535 writer
1536 .write(
1537 Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1538 .as_item_ref(),
1539 )
1540 .await
1541 .expect("write failed");
1542 }
1543
1544 writer.complete().await.expect("flush failed");
1545 }
1546 PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
1547 }
1548 }
1549
1550 #[fuchsia::test]
1551 async fn test_ignore_bloom_filter_on_older_versions() {
1552 const BLOCK_SIZE: u64 = 512;
1553 const ITEMS_PER_PHASE: u64 = 50;
1554 const ITEM_COUNT: u64 = (1 + MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER as u64) * ITEMS_PER_PHASE;
1556
1557 let old_version_handle =
1558 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1559 let current_version_handle =
1560 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1561 {
1562 let mut old_version_writer =
1563 PersistentLayerWriter::<_, TestKey, u64>::new_with_version(
1564 Writer::new(&old_version_handle).await,
1565 ITEM_COUNT as usize,
1566 BLOCK_SIZE,
1567 Version { major: LATEST_VERSION.major - 1, minor: 0 },
1568 )
1569 .await
1570 .expect("writer new");
1571 let mut current_version_writer = PersistentLayerWriter::<_, TestKey, u64>::new(
1572 Writer::new(¤t_version_handle).await,
1573 ITEM_COUNT as usize,
1574 BLOCK_SIZE,
1575 )
1576 .await
1577 .expect("writer new");
1578
1579 let initial_value = u32::MAX as u64 + 1;
1581 for i in 0..ITEM_COUNT {
1582 old_version_writer
1583 .write(
1584 Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1585 .as_item_ref(),
1586 )
1587 .await
1588 .expect("write failed");
1589 current_version_writer
1590 .write(
1591 Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1592 .as_item_ref(),
1593 )
1594 .await
1595 .expect("write failed");
1596 }
1597
1598 old_version_writer.complete().await.expect("flush failed");
1599 current_version_writer.complete().await.expect("flush failed");
1600 }
1601
1602 let old_layer =
1603 PersistentLayer::<TestKey, u64>::open(old_version_handle).await.expect("open failed");
1604 let current_layer = PersistentLayer::<TestKey, u64>::open(current_version_handle)
1605 .await
1606 .expect("open failed");
1607 assert!(!old_layer.has_bloom_filter());
1608 assert!(current_layer.has_bloom_filter());
1609 }
1610
1611 #[fuchsia::test]
1612 async fn test_key_exists_no_bloom_filter() {
1613 const BLOCK_SIZE: u64 = 8192;
1614 const ITEM_COUNT: i32 = 100;
1616
1617 let handle =
1618 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1619 {
1620 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1621 Writer::new(&handle).await,
1622 ITEM_COUNT as usize,
1623 BLOCK_SIZE,
1624 )
1625 .await
1626 .expect("writer new");
1627 for i in 0..ITEM_COUNT {
1628 writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
1629 }
1630 writer.complete().await.expect("flush failed");
1631 }
1632 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1633 assert!(!layer.has_bloom_filter());
1634
1635 for i in 0..ITEM_COUNT {
1636 assert_eq!(
1637 layer.key_exists(&(i * 2)).await.expect("key_exists failed"),
1638 Existence::Exists
1639 );
1640 assert_eq!(
1641 layer.key_exists(&(i * 2 + 1)).await.expect("key_exists failed"),
1642 Existence::Missing
1643 );
1644 }
1645 }
1646
1647 #[fuchsia::test]
1648 async fn test_key_exists_with_bloom_filter() {
1649 const BLOCK_SIZE: u64 = 512;
1650 const ITEM_COUNT: i32 = 10000;
1652
1653 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1654 {
1655 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1656 Writer::new(&handle).await,
1657 ITEM_COUNT as usize,
1658 BLOCK_SIZE,
1659 )
1660 .await
1661 .expect("writer new");
1662 for i in 0..ITEM_COUNT {
1663 writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
1664 }
1665 writer.complete().await.expect("flush failed");
1666 }
1667 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1668 assert!(layer.has_bloom_filter());
1669
1670 for i in 0..ITEM_COUNT {
1671 assert_eq!(
1673 layer.key_exists(&(i * 2)).await.expect("key_exists failed"),
1674 Existence::MaybeExists
1675 );
1676 }
1677
1678 let mut missing_count = 0;
1681 for i in 0..ITEM_COUNT {
1682 let result = layer.key_exists(&(i * 2 + 1)).await.expect("key_exists failed");
1683 assert_ne!(result, Existence::Exists);
1684 if result == Existence::Missing {
1685 missing_count += 1;
1686 }
1687 }
1688 assert!(missing_count > ITEM_COUNT / 2);
1690 }
1691}