1use crate::drop_event::DropEvent;
61use crate::errors::FxfsError;
62use crate::filesystem::MAX_BLOCK_SIZE;
63use crate::log::*;
64use crate::lsm_tree::bloom_filter::{BloomFilterReader, BloomFilterStats, BloomFilterWriter};
65use crate::lsm_tree::types::{
66 BoxedLayerIterator, FuzzyHash, Item, ItemCount, ItemRef, Key, Layer, LayerIterator, LayerValue,
67 LayerWriter,
68};
69use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteBytes};
70use crate::object_store::caching_object_handle::{CachedChunk, CachingObjectHandle, CHUNK_SIZE};
71use crate::round::{round_down, round_up};
72use crate::serialized_types::{Version, Versioned, VersionedLatest, LATEST_VERSION};
73use anyhow::{anyhow, bail, ensure, Context, Error};
74use async_trait::async_trait;
75use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt};
76use fprint::TypeFingerprint;
77use fuchsia_sync::Mutex;
78use rand::Rng as _;
79use serde::{Deserialize, Serialize};
80use static_assertions::const_assert;
81use std::cmp::Ordering;
82use std::io::{Read, Write as _};
83use std::marker::PhantomData;
84use std::ops::Bound;
85use std::sync::Arc;
86
87const PERSISTENT_LAYER_MAGIC: &[u8; 8] = b"FxfsLayr";
88
89pub type LayerHeader = LayerHeaderV39;
91
92#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
93pub struct LayerHeaderV39 {
94 magic: [u8; 8],
96 block_size: u64,
102}
103
104pub type LayerInfo = LayerInfoV39;
106
107#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
108pub struct LayerInfoV39 {
109 num_items: usize,
112 num_data_blocks: u64,
114 bloom_filter_size_bytes: usize,
116 bloom_filter_seed: u64,
118 bloom_filter_num_nonces: usize,
120}
121
122pub type OldLayerInfo = OldLayerInfoV32;
124
125#[derive(Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
126pub struct OldLayerInfoV32 {
127 key_value_version: Version,
129 block_size: u64,
135}
136
137pub struct PersistentLayer<K, V> {
139 object_handle: Arc<dyn ReadObjectHandle>,
146 caching_object_handle: CachingObjectHandle<Arc<dyn ReadObjectHandle>>,
147 version: Version,
148 block_size: u64,
149 data_size: u64,
150 seek_table: Vec<u64>,
151 num_items: Option<usize>,
152 bloom_filter: Option<BloomFilterReader<K>>,
153 bloom_filter_stats: Option<BloomFilterStats>,
154 close_event: Mutex<Option<Arc<DropEvent>>>,
155 _value_type: PhantomData<V>,
156}
157
158#[derive(Debug)]
159struct BufferCursor {
160 chunk: Option<CachedChunk>,
161 pos: usize,
162}
163
164impl std::io::Read for BufferCursor {
165 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
166 let chunk = if let Some(chunk) = &self.chunk {
167 chunk
168 } else {
169 return Ok(0);
170 };
171 let to_read = std::cmp::min(buf.len(), chunk.len().saturating_sub(self.pos));
172 if to_read > 0 {
173 buf[..to_read].copy_from_slice(&chunk[self.pos..self.pos + to_read]);
174 self.pos += to_read;
175 }
176 Ok(to_read)
177 }
178}
179
180const MIN_BLOCK_SIZE: u64 = 512;
181
182const MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER: usize = 4;
184
185const NUM_HEADER_BLOCKS: u64 = 1;
187
188const MINIMUM_LAYER_FILE_BLOCKS: u64 = 2;
191
192const MAX_BLOOM_FILTER_SIZE: usize = 64 * 1024 * 1024;
195const MAX_SEEK_TABLE_SIZE: usize = 64 * 1024 * 1024;
196
197const PER_DATA_BLOCK_HEADER_SIZE: usize = 2;
199const PER_DATA_BLOCK_SEEK_ENTRY_SIZE: usize = 2;
200
201struct KeyOnlyIterator<'iter, K: Key, V: LayerValue> {
203 buffer: BufferCursor,
205
206 layer: &'iter PersistentLayer<K, V>,
207
208 pos: u64,
210
211 item_index: u16,
213
214 item_count: u16,
216
217 key: Option<K>,
219
220 value_deserialized: bool,
223}
224
225impl<K: Key, V: LayerValue> KeyOnlyIterator<'_, K, V> {
226 fn new<'iter>(layer: &'iter PersistentLayer<K, V>, pos: u64) -> KeyOnlyIterator<'iter, K, V> {
227 assert!(pos % layer.block_size == 0);
228 KeyOnlyIterator {
229 layer,
230 buffer: BufferCursor { chunk: None, pos: pos as usize % CHUNK_SIZE },
231 pos,
232 item_index: 0,
233 item_count: 0,
234 key: None,
235 value_deserialized: false,
236 }
237 }
238
239 fn seek_to_block_item(&mut self, index: u16) -> Result<(), Error> {
243 ensure!(index < self.item_count, FxfsError::OutOfRange);
244 if index == self.item_index && self.value_deserialized {
245 return Ok(());
248 }
249 let offset_in_block = if index == 0 {
250 PER_DATA_BLOCK_HEADER_SIZE
253 } else {
254 let old_buffer_pos = self.buffer.pos;
255 self.buffer.pos = round_up(self.buffer.pos, self.layer.block_size as usize).unwrap()
256 - (PER_DATA_BLOCK_SEEK_ENTRY_SIZE * (usize::from(self.item_count - index)));
257 let res = self.buffer.read_u16::<LittleEndian>();
258 self.buffer.pos = old_buffer_pos;
259 let offset_in_block = res.context("Failed to read offset")? as usize;
260 if offset_in_block >= self.layer.block_size as usize
261 || offset_in_block <= PER_DATA_BLOCK_HEADER_SIZE
262 {
263 return Err(anyhow!(FxfsError::Inconsistent))
264 .context(format!("Offset {} is out of valid range.", offset_in_block));
265 }
266 offset_in_block
267 };
268 self.item_index = index;
269 self.buffer.pos =
270 round_down(self.buffer.pos, self.layer.block_size as usize) + offset_in_block;
271 Ok(())
272 }
273
274 async fn advance(&mut self) -> Result<(), Error> {
275 if self.item_index >= self.item_count {
276 if self.pos >= self.layer.data_offset() + self.layer.data_size {
277 self.key = None;
278 return Ok(());
279 }
280 if self.buffer.chunk.is_none() || self.pos as usize % CHUNK_SIZE == 0 {
281 self.buffer.chunk = Some(
282 self.layer
283 .caching_object_handle
284 .read(self.pos as usize)
285 .await
286 .context("Reading during advance")?,
287 );
288 }
289 self.buffer.pos = self.pos as usize % CHUNK_SIZE;
290 self.item_count = self.buffer.read_u16::<LittleEndian>()?;
291 if self.item_count == 0 {
292 bail!(
293 "Read block with zero item count (object: {}, offset: {})",
294 self.layer.object_handle.object_id(),
295 self.pos
296 );
297 }
298 debug!(
299 pos = self.pos,
300 buf:? = self.buffer,
301 object_size = self.layer.data_offset() + self.layer.data_size,
302 oid = self.layer.object_handle.object_id();
303 ""
304 );
305 self.pos += self.layer.block_size;
306 self.item_index = 0;
307 self.value_deserialized = true;
308 }
309 self.seek_to_block_item(self.item_index)?;
310 self.key = Some(
311 K::deserialize_from_version(self.buffer.by_ref(), self.layer.version)
312 .context("Corrupt layer (key)")?,
313 );
314 self.item_index += 1;
315 self.value_deserialized = false;
316 Ok(())
317 }
318
319 fn get(&self) -> Option<&K> {
320 self.key.as_ref()
321 }
322}
323
324struct Iterator<'iter, K: Key, V: LayerValue> {
325 inner: KeyOnlyIterator<'iter, K, V>,
326 item: Option<Item<K, V>>,
328}
329
330impl<'iter, K: Key, V: LayerValue> Iterator<'iter, K, V> {
331 fn new(mut seek_iterator: KeyOnlyIterator<'iter, K, V>) -> Result<Self, Error> {
332 let key = std::mem::take(&mut seek_iterator.key);
333 let item = if let Some(key) = key {
334 seek_iterator.value_deserialized = true;
335 Some(Item {
336 key,
337 value: V::deserialize_from_version(
338 seek_iterator.buffer.by_ref(),
339 seek_iterator.layer.version,
340 )
341 .context("Corrupt layer (value)")?,
342 sequence: seek_iterator
343 .buffer
344 .read_u64::<LittleEndian>()
345 .context("Corrupt layer (seq)")?,
346 })
347 } else {
348 None
349 };
350 Ok(Self { inner: seek_iterator, item })
351 }
352}
353
354#[async_trait]
355impl<'iter, K: Key, V: LayerValue> LayerIterator<K, V> for Iterator<'iter, K, V> {
356 async fn advance(&mut self) -> Result<(), Error> {
357 self.inner.advance().await?;
358 let key = std::mem::take(&mut self.inner.key);
359 self.item = if let Some(key) = key {
360 self.inner.value_deserialized = true;
361 Some(Item {
362 key,
363 value: V::deserialize_from_version(
364 self.inner.buffer.by_ref(),
365 self.inner.layer.version,
366 )
367 .context("Corrupt layer (value)")?,
368 sequence: self
369 .inner
370 .buffer
371 .read_u64::<LittleEndian>()
372 .context("Corrupt layer (seq)")?,
373 })
374 } else {
375 None
376 };
377 Ok(())
378 }
379
380 fn get(&self) -> Option<ItemRef<'_, K, V>> {
381 self.item.as_ref().map(<&Item<K, V>>::into)
382 }
383}
384
385fn seek_table_size(num_data_blocks: u64) -> usize {
387 let seek_table_entries = num_data_blocks.saturating_sub(1) as usize;
389 if seek_table_entries == 0 {
390 return 0;
391 }
392 let entry_size = std::mem::size_of::<u64>();
393 seek_table_entries * entry_size
394}
395
396async fn load_seek_table(
397 object_handle: &(impl ReadObjectHandle + 'static),
398 seek_table_offset: u64,
399 num_data_blocks: u64,
400) -> Result<Vec<u64>, Error> {
401 let seek_table_size = seek_table_size(num_data_blocks);
402 if seek_table_size == 0 {
403 return Ok(vec![]);
404 }
405 if seek_table_size > MAX_SEEK_TABLE_SIZE {
406 return Err(anyhow!(FxfsError::NotSupported)).context("Seek table too large");
407 }
408 let mut buffer = object_handle.allocate_buffer(seek_table_size).await;
409 let bytes_read = object_handle
410 .read(seek_table_offset, buffer.as_mut())
411 .await
412 .context("Reading seek table blocks")?;
413 ensure!(bytes_read == seek_table_size, "Short read");
414
415 let mut seek_table = Vec::with_capacity(num_data_blocks as usize);
416 seek_table.push(0);
418 let mut prev = 0;
419 for chunk in buffer.as_slice().chunks_exact(std::mem::size_of::<u64>()) {
420 let next = LittleEndian::read_u64(chunk);
421 if prev > next {
424 return Err(anyhow!(FxfsError::Inconsistent))
425 .context(format!("Seek table entry out of order, {:?} > {:?}", prev, next));
426 }
427 prev = next;
428 seek_table.push(next);
429 }
430 Ok(seek_table)
431}
432
433async fn load_bloom_filter<K: FuzzyHash>(
434 handle: &(impl ReadObjectHandle + 'static),
435 bloom_filter_offset: u64,
436 layer_info: &LayerInfo,
437) -> Result<Option<BloomFilterReader<K>>, Error> {
438 if layer_info.bloom_filter_size_bytes == 0 {
439 return Ok(None);
440 }
441 if layer_info.bloom_filter_size_bytes > MAX_BLOOM_FILTER_SIZE {
442 return Err(anyhow!(FxfsError::NotSupported)).context("Bloom filter too large");
443 }
444 let mut buffer = handle.allocate_buffer(layer_info.bloom_filter_size_bytes).await;
445 handle.read(bloom_filter_offset, buffer.as_mut()).await.context("Failed to read")?;
446 Ok(Some(BloomFilterReader::read(
447 buffer.as_slice(),
448 layer_info.bloom_filter_seed,
449 layer_info.bloom_filter_num_nonces,
450 )?))
451}
452
453impl<K: Key, V: LayerValue> PersistentLayer<K, V> {
454 pub async fn open(handle: impl ReadObjectHandle + 'static) -> Result<Arc<Self>, Error> {
455 let bs = handle.block_size();
456 let mut buffer = handle.allocate_buffer(bs as usize).await;
457 handle.read(0, buffer.as_mut()).await.context("Failed to read first block")?;
458 let mut cursor = std::io::Cursor::new(buffer.as_slice());
459 let version = Version::deserialize_from(&mut cursor)?;
460
461 ensure!(version <= LATEST_VERSION, FxfsError::InvalidVersion);
462 let header = LayerHeader::deserialize_from_version(&mut cursor, version)
463 .context("Failed to deserialize header")?;
464 if &header.magic != PERSISTENT_LAYER_MAGIC {
465 return Err(anyhow!(FxfsError::Inconsistent).context("Invalid layer file magic"));
466 }
467 if header.block_size == 0 || !header.block_size.is_power_of_two() {
468 return Err(anyhow!(FxfsError::Inconsistent))
469 .context(format!("Invalid block size {}", header.block_size));
470 }
471 ensure!(header.block_size > 0, FxfsError::Inconsistent);
472 ensure!(header.block_size <= MAX_BLOCK_SIZE, FxfsError::NotSupported);
473 let physical_block_size = handle.block_size();
474 if header.block_size % physical_block_size != 0 {
475 return Err(anyhow!(FxfsError::Inconsistent)).context(format!(
476 "{} not a multiple of physical block size {}",
477 header.block_size, physical_block_size
478 ));
479 }
480 std::mem::drop(cursor);
481
482 let bs = header.block_size as usize;
483 if handle.get_size() < MINIMUM_LAYER_FILE_BLOCKS * bs as u64 {
484 return Err(anyhow!(FxfsError::Inconsistent).context("Layer file too short"));
485 }
486
487 let layer_info = {
488 let last_block_offset = handle
489 .get_size()
490 .checked_sub(header.block_size)
491 .ok_or(FxfsError::Inconsistent)
492 .context("Layer file unexpectedly short")?;
493 handle
494 .read(last_block_offset, buffer.subslice_mut(0..header.block_size as usize))
495 .await
496 .context("Failed to read layer info")?;
497 let layer_info_len =
498 LittleEndian::read_u64(&buffer.as_slice()[bs - std::mem::size_of::<u64>()..]);
499 let layer_info_offset = bs
500 .checked_sub(std::mem::size_of::<u64>() + layer_info_len as usize)
501 .ok_or(FxfsError::Inconsistent)
502 .context("Invalid layer info length")?;
503 let mut cursor = std::io::Cursor::new(&buffer.as_slice()[layer_info_offset..]);
504 LayerInfo::deserialize_from_version(&mut cursor, version)
505 .context("Failed to deserialize LayerInfo")?
506 };
507 std::mem::drop(buffer);
508 if layer_info.num_items == 0 && layer_info.num_data_blocks > 0 {
509 return Err(anyhow!(FxfsError::Inconsistent))
510 .context("Invalid num_items/num_data_blocks");
511 }
512 let total_blocks = handle.get_size() / header.block_size;
513 let bloom_filter_blocks =
514 round_up(layer_info.bloom_filter_size_bytes as u64, header.block_size)
515 .unwrap_or(layer_info.bloom_filter_size_bytes as u64)
516 / header.block_size;
517 if layer_info.num_data_blocks + bloom_filter_blocks
518 > total_blocks - MINIMUM_LAYER_FILE_BLOCKS
519 {
520 return Err(anyhow!(FxfsError::Inconsistent)).context("Invalid number of blocks");
521 }
522
523 let bloom_filter_offset =
524 header.block_size * (NUM_HEADER_BLOCKS + layer_info.num_data_blocks);
525 let bloom_filter = load_bloom_filter(&handle, bloom_filter_offset, &layer_info)
526 .await
527 .context("Failed to load bloom filter")?;
528 let bloom_filter_stats = bloom_filter.as_ref().map(|b| b.stats());
529
530 let seek_offset = header.block_size
531 * (NUM_HEADER_BLOCKS + layer_info.num_data_blocks + bloom_filter_blocks);
532 let seek_table = load_seek_table(&handle, seek_offset, layer_info.num_data_blocks)
533 .await
534 .context("Failed to load seek table")?;
535
536 let object_handle = Arc::new(handle) as Arc<dyn ReadObjectHandle>;
537 let caching_object_handle = CachingObjectHandle::new(object_handle.clone());
538 Ok(Arc::new(PersistentLayer {
539 object_handle,
540 caching_object_handle,
541 version,
542 block_size: header.block_size,
543 data_size: layer_info.num_data_blocks * header.block_size,
544 seek_table,
545 num_items: Some(layer_info.num_items),
546 bloom_filter,
547 bloom_filter_stats,
548 close_event: Mutex::new(Some(Arc::new(DropEvent::new()))),
549 _value_type: PhantomData::default(),
550 }))
551 }
552
553 fn data_offset(&self) -> u64 {
554 NUM_HEADER_BLOCKS * self.block_size
555 }
556}
557
558#[async_trait]
559impl<K: Key, V: LayerValue> Layer<K, V> for PersistentLayer<K, V> {
560 fn handle(&self) -> Option<&dyn ReadObjectHandle> {
561 Some(&self.object_handle)
562 }
563
564 fn purge_cached_data(&self) {
565 self.caching_object_handle.purge();
566 }
567
568 async fn seek<'a>(&'a self, bound: Bound<&K>) -> Result<BoxedLayerIterator<'a, K, V>, Error> {
569 let (key, excluded) = match bound {
570 Bound::Unbounded => {
571 let mut iterator = Iterator::new(KeyOnlyIterator::new(self, self.data_offset()))?;
572 iterator.advance().await.context("Unbounded seek advance")?;
573 return Ok(Box::new(iterator));
574 }
575 Bound::Included(k) => (k, false),
576 Bound::Excluded(k) => (k, true),
577 };
578 let first_data_block_index = self.data_offset() / self.block_size;
579
580 let (mut left_offset, mut right_offset) = {
581 let target = key.get_leading_u64();
586 let right_index = self.seek_table.as_slice().partition_point(|&x| x <= target) as u64;
588 let left_index = self.seek_table.as_slice()[..right_index as usize]
591 .partition_point(|&x| x < target)
592 .saturating_sub(1) as u64;
593
594 (
595 (left_index + first_data_block_index) * self.block_size,
596 (right_index + first_data_block_index) * self.block_size,
597 )
598 };
599 let mut left = KeyOnlyIterator::new(self, left_offset);
600 left.advance().await.context("Initial seek advance")?;
601 match left.get() {
602 None => return Ok(Box::new(Iterator::new(left)?)),
603 Some(left_key) => match left_key.cmp_upper_bound(key) {
604 Ordering::Greater => return Ok(Box::new(Iterator::new(left)?)),
605 Ordering::Equal => {
606 if excluded {
607 left.advance().await?;
608 }
609 return Ok(Box::new(Iterator::new(left)?));
610 }
611 Ordering::Less => {}
612 },
613 }
614 let mut right = None;
615 while right_offset - left_offset > self.block_size {
616 let mid_offset =
618 round_down(left_offset + (right_offset - left_offset) / 2, self.block_size);
619 let mut iterator = KeyOnlyIterator::new(self, mid_offset);
620 iterator.advance().await?;
621 let iter_key: &K = iterator.get().unwrap();
622 match iter_key.cmp_upper_bound(key) {
623 Ordering::Greater => {
624 right_offset = mid_offset;
625 right = Some(iterator);
626 }
627 Ordering::Equal => {
628 if excluded {
629 iterator.advance().await?;
630 }
631 return Ok(Box::new(Iterator::new(iterator)?));
632 }
633 Ordering::Less => {
634 left_offset = mid_offset;
635 left = iterator;
636 }
637 }
638 }
639
640 let mut left_index = 0;
642 let mut right_index = left.item_count;
643 while left_index < (right_index - 1) {
645 let mid_index = left_index + ((right_index - left_index) / 2);
646 left.seek_to_block_item(mid_index).context("Read index offset for binary search")?;
647 left.advance().await?;
648 match left.get().unwrap().cmp_upper_bound(key) {
649 Ordering::Greater => {
650 right_index = mid_index;
651 }
652 Ordering::Equal => {
653 if excluded {
654 left.advance().await?;
655 }
656 return Ok(Box::new(Iterator::new(left)?));
657 }
658 Ordering::Less => {
659 left_index = mid_index;
660 }
661 }
662 }
663 if right_index < left.item_count {
668 left.seek_to_block_item(right_index)
669 .context("Read index for offset of right pointer")?;
670 } else if let Some(right) = right {
671 return Ok(Box::new(Iterator::new(right)?));
672 } else {
673 }
679 left.advance().await?;
680 return Ok(Box::new(Iterator::new(left)?));
681 }
682
683 fn estimated_len(&self) -> ItemCount {
684 match self.num_items {
685 Some(num_items) => ItemCount::Precise(num_items),
686 None => {
687 const ITEM_SIZE_ESTIMATE: usize = 32;
691 ItemCount::Estimate(self.object_handle.get_size() as usize / ITEM_SIZE_ESTIMATE)
692 }
693 }
694 }
695
696 fn maybe_contains_key(&self, key: &K) -> bool {
697 self.bloom_filter.as_ref().map_or(true, |f| f.maybe_contains(key))
698 }
699
700 fn lock(&self) -> Option<Arc<DropEvent>> {
701 self.close_event.lock().clone()
702 }
703
704 async fn close(&self) {
705 let listener = self.close_event.lock().take().expect("close already called").listen();
706 listener.await;
707 }
708
709 fn get_version(&self) -> Version {
710 return self.version;
711 }
712
713 fn record_inspect_data(self: Arc<Self>, node: &fuchsia_inspect::Node) {
714 node.record_bool("persistent", true);
715 node.record_uint("size", self.object_handle.get_size());
716 if let Some(stats) = self.bloom_filter_stats.as_ref() {
717 node.record_child("bloom_filter", move |node| {
718 node.record_uint("size", stats.size as u64);
719 node.record_uint("num_nonces", stats.num_nonces as u64);
720 node.record_uint("fill_percentage", stats.fill_percentage as u64);
721 });
722 }
723 if let Some(items) = self.num_items {
724 node.record_uint("num_items", items as u64);
725 }
726 }
727}
728
729const_assert!(MAX_BLOCK_SIZE <= u16::MAX as u64 + 1);
731
732pub struct PersistentLayerWriter<W: WriteBytes, K: Key, V: LayerValue> {
735 writer: W,
736 block_size: u64,
737 buf: Vec<u8>,
738 buf_item_count: u16,
739 item_count: usize,
740 block_offsets: Vec<u16>,
741 block_keys: Vec<u64>,
742 bloom_filter: BloomFilterWriter<K>,
743 _value: PhantomData<V>,
744}
745
746impl<W: WriteBytes, K: Key, V: LayerValue> PersistentLayerWriter<W, K, V> {
747 pub async fn new(
755 mut writer: W,
756 estimated_num_items: usize,
757 block_size: u64,
758 ) -> Result<Self, Error> {
759 ensure!(block_size <= MAX_BLOCK_SIZE, FxfsError::NotSupported);
760 ensure!(block_size >= MIN_BLOCK_SIZE, FxfsError::NotSupported);
761
762 let header = LayerHeader { magic: PERSISTENT_LAYER_MAGIC.clone(), block_size };
764 let mut buf = vec![0u8; block_size as usize];
765 {
766 let mut cursor = std::io::Cursor::new(&mut buf[..]);
767 header.serialize_with_version(&mut cursor)?;
768 }
769 writer.write_bytes(&buf[..]).await?;
770
771 let nonce: u64 = rand::thread_rng().gen();
772 Ok(PersistentLayerWriter {
773 writer,
774 block_size,
775 buf: Vec::new(),
776 buf_item_count: 0,
777 item_count: 0,
778 block_offsets: Vec::new(),
779 block_keys: Vec::new(),
780 bloom_filter: BloomFilterWriter::new(nonce, estimated_num_items),
781 _value: PhantomData,
782 })
783 }
784
785 async fn write_block(&mut self, len: usize) -> Result<(), Error> {
790 if self.buf_item_count == 0 {
791 return Ok(());
792 }
793 let seek_table_size = self.block_offsets.len() * PER_DATA_BLOCK_SEEK_ENTRY_SIZE;
794 assert!(PER_DATA_BLOCK_HEADER_SIZE + seek_table_size + len <= self.block_size as usize);
795 let mut cursor = std::io::Cursor::new(vec![0u8; self.block_size as usize]);
796 cursor.write_u16::<LittleEndian>(self.buf_item_count)?;
797 cursor.write_all(self.buf.drain(..len).as_ref())?;
798 cursor.set_position(self.block_size - seek_table_size as u64);
799 for &offset in &self.block_offsets {
801 cursor.write_u16::<LittleEndian>(offset)?;
802 }
803 self.writer.write_bytes(cursor.get_ref()).await?;
804 debug!(item_count = self.buf_item_count, byte_count = len; "wrote items");
805 self.buf_item_count = 0;
806 self.block_offsets.clear();
807 Ok(())
808 }
809
810 async fn write_seek_table(&mut self) -> Result<usize, Error> {
815 if self.block_keys.len() == 0 {
816 return Ok(0);
817 }
818 let size = self.block_keys.len() * std::mem::size_of::<u64>();
819 self.buf.resize(size, 0);
820 let mut len = 0;
821 for key in &self.block_keys {
822 LittleEndian::write_u64(&mut self.buf[len..len + std::mem::size_of::<u64>()], *key);
823 len += std::mem::size_of::<u64>();
824 }
825 self.writer.write_bytes(&self.buf).await?;
826 Ok(size)
827 }
828
829 async fn write_info(
832 &mut self,
833 num_data_blocks: u64,
834 bloom_filter_size_bytes: usize,
835 seek_table_len: usize,
836 ) -> Result<(), Error> {
837 let block_size = self.writer.block_size() as usize;
838 let layer_info = LayerInfo {
839 num_items: self.item_count,
840 num_data_blocks,
841 bloom_filter_size_bytes,
842 bloom_filter_seed: self.bloom_filter.seed(),
843 bloom_filter_num_nonces: self.bloom_filter.num_nonces(),
844 };
845 let actual_len = {
846 let mut cursor = std::io::Cursor::new(&mut self.buf);
847 layer_info.serialize_into(&mut cursor)?;
848 let layer_info_len = cursor.position();
849 cursor.write_u64::<LittleEndian>(layer_info_len)?;
850 cursor.position() as usize
851 };
852
853 let avail_in_block = block_size - (seek_table_len % block_size);
856 let to_skip = if avail_in_block < actual_len {
857 block_size + avail_in_block - actual_len
858 } else {
859 avail_in_block - actual_len
860 } as u64;
861 self.writer.skip(to_skip).await?;
862 self.writer.write_bytes(&self.buf[..actual_len]).await?;
863 Ok(())
864 }
865
866 async fn write_bloom_filter(&mut self) -> Result<usize, Error> {
869 if self.data_blocks() < MINIMUM_DATA_BLOCKS_FOR_BLOOM_FILTER {
870 return Ok(0);
871 }
872 let size = round_up(self.bloom_filter.serialized_size(), self.block_size as usize).unwrap();
874 self.buf.resize(size, 0);
875 let mut cursor = std::io::Cursor::new(&mut self.buf);
876 self.bloom_filter.write(&mut cursor)?;
877 self.writer.write_bytes(&self.buf).await?;
878 Ok(self.bloom_filter.serialized_size())
879 }
880
881 fn data_blocks(&self) -> usize {
882 if self.item_count == 0 {
883 0
884 } else {
885 self.block_keys.len() + 1
886 }
887 }
888}
889
890impl<W: WriteBytes + Send, K: Key, V: LayerValue> LayerWriter<K, V>
891 for PersistentLayerWriter<W, K, V>
892{
893 async fn write(&mut self, item: ItemRef<'_, K, V>) -> Result<(), Error> {
894 let len = self.buf.len();
896 item.key.serialize_into(&mut self.buf)?;
897 item.value.serialize_into(&mut self.buf)?;
898 self.buf.write_u64::<LittleEndian>(item.sequence)?;
899 let mut added_offset = false;
900 if self.buf_item_count > 0 {
902 self.block_offsets.push(u16::try_from(len + PER_DATA_BLOCK_HEADER_SIZE).unwrap());
903 added_offset = true;
904 }
905
906 if PER_DATA_BLOCK_HEADER_SIZE
909 + self.buf.len()
910 + (self.block_offsets.len() * PER_DATA_BLOCK_SEEK_ENTRY_SIZE)
911 > self.block_size as usize - 1
912 {
913 if added_offset {
914 self.block_offsets.pop();
917 }
918 self.write_block(len).await?;
919
920 self.block_keys.push(item.key.get_leading_u64());
922 }
923
924 self.bloom_filter.insert(&item.key);
925 self.buf_item_count += 1;
926 self.item_count += 1;
927 Ok(())
928 }
929
930 async fn flush(&mut self) -> Result<(), Error> {
931 self.write_block(self.buf.len()).await?;
932 let data_blocks = self.data_blocks() as u64;
933 let bloom_filter_len = self.write_bloom_filter().await?;
934 let seek_table_len = self.write_seek_table().await?;
935 self.write_info(data_blocks, bloom_filter_len, seek_table_len).await?;
936 self.writer.complete().await
937 }
938}
939
940impl<W: WriteBytes, K: Key, V: LayerValue> Drop for PersistentLayerWriter<W, K, V> {
941 fn drop(&mut self) {
942 if self.buf_item_count > 0 {
943 warn!("Dropping unwritten items; did you forget to flush?");
944 }
945 }
946}
947
948#[cfg(test)]
949mod tests {
950 use super::{PersistentLayer, PersistentLayerWriter};
951 use crate::filesystem::MAX_BLOCK_SIZE;
952 use crate::lsm_tree::types::{
953 DefaultOrdUpperBound, FuzzyHash, Item, ItemRef, Layer, LayerKey, LayerWriter, MergeType,
954 SortByU64,
955 };
956 use crate::lsm_tree::LayerIterator;
957 use crate::object_handle::WriteBytes;
958 use crate::round::round_up;
959 use crate::serialized_types::{
960 versioned_type, Version, Versioned, VersionedLatest, LATEST_VERSION,
961 };
962 use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
963 use crate::testing::writer::Writer;
964 use fprint::TypeFingerprint;
965 use fxfs_macros::FuzzyHash;
966 use std::fmt::Debug;
967 use std::hash::Hash;
968 use std::ops::{Bound, Range};
969 use std::sync::Arc;
970
971 impl<W: WriteBytes> Debug for PersistentLayerWriter<W, i32, i32> {
972 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
973 f.debug_struct("rPersistentLayerWriter")
974 .field("block_size", &self.block_size)
975 .field("item_count", &self.buf_item_count)
976 .finish()
977 }
978 }
979
980 #[fuchsia::test]
981 async fn test_iterate_after_write() {
982 const BLOCK_SIZE: u64 = 512;
983 const ITEM_COUNT: i32 = 10000;
984
985 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
986 {
987 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
988 Writer::new(&handle).await,
989 ITEM_COUNT as usize * 4,
990 BLOCK_SIZE,
991 )
992 .await
993 .expect("writer new");
994 for i in 0..ITEM_COUNT {
995 writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
996 }
997 writer.flush().await.expect("flush failed");
998 }
999 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1000 let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
1001 for i in 0..ITEM_COUNT {
1002 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1003 assert_eq!((key, value), (&i, &i));
1004 iterator.advance().await.expect("failed to advance");
1005 }
1006 assert!(iterator.get().is_none());
1007 }
1008
1009 #[fuchsia::test]
1010 async fn test_seek_after_write() {
1011 const BLOCK_SIZE: u64 = 512;
1012 const ITEM_COUNT: i32 = 5000;
1013
1014 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1015 {
1016 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1017 Writer::new(&handle).await,
1018 ITEM_COUNT as usize * 18,
1019 BLOCK_SIZE,
1020 )
1021 .await
1022 .expect("writer new");
1023 for i in 0..ITEM_COUNT {
1024 writer.write(Item::new(i * 2, i * 2).as_item_ref()).await.expect("write failed");
1026 }
1027 writer.flush().await.expect("flush failed");
1028 }
1029 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1030 for i in 0..ITEM_COUNT * 2 {
1032 let expected = round_up(i, 2).unwrap();
1035 let mut iterator = layer.seek(Bound::Included(&i)).await.expect("failed to seek");
1036 if i >= (ITEM_COUNT * 2) - 1 {
1039 assert!(iterator.get().is_none());
1040 } else {
1041 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1042 assert_eq!((key, value), (&expected, &expected));
1043 }
1044
1045 iterator.advance().await.expect("failed to advance");
1047 if i >= (ITEM_COUNT * 2) - 3 {
1051 assert!(iterator.get().is_none());
1052 } else {
1053 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1054 let next = expected + 2;
1055 assert_eq!((key, value), (&next, &next));
1056 }
1057 }
1058 }
1059
1060 #[fuchsia::test]
1061 async fn test_seek_unbounded() {
1062 const BLOCK_SIZE: u64 = 512;
1063 const ITEM_COUNT: i32 = 1000;
1064
1065 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1066 {
1067 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1068 Writer::new(&handle).await,
1069 ITEM_COUNT as usize * 18,
1070 BLOCK_SIZE,
1071 )
1072 .await
1073 .expect("writer new");
1074 for i in 0..ITEM_COUNT {
1075 writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1076 }
1077 writer.flush().await.expect("flush failed");
1078 }
1079 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1080 let mut iterator = layer.seek(Bound::Unbounded).await.expect("failed to seek");
1081 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1082 assert_eq!((key, value), (&0, &0));
1083
1084 iterator.advance().await.expect("failed to advance");
1086 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1087 assert_eq!((key, value), (&1, &1));
1088 }
1089
1090 #[fuchsia::test]
1091 async fn test_zero_items() {
1092 const BLOCK_SIZE: u64 = 512;
1093
1094 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1095 {
1096 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1097 Writer::new(&handle).await,
1098 0,
1099 BLOCK_SIZE,
1100 )
1101 .await
1102 .expect("writer new");
1103 writer.flush().await.expect("flush failed");
1104 }
1105
1106 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1107 let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1108 .seek(Bound::Unbounded)
1109 .await
1110 .expect("seek failed");
1111 assert!(iterator.get().is_none())
1112 }
1113
1114 #[fuchsia::test]
1115 async fn test_one_item() {
1116 const BLOCK_SIZE: u64 = 512;
1117
1118 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1119 {
1120 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1121 Writer::new(&handle).await,
1122 1,
1123 BLOCK_SIZE,
1124 )
1125 .await
1126 .expect("writer new");
1127 writer.write(Item::new(42, 42).as_item_ref()).await.expect("write failed");
1128 writer.flush().await.expect("flush failed");
1129 }
1130
1131 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1132 {
1133 let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1134 .seek(Bound::Unbounded)
1135 .await
1136 .expect("seek failed");
1137 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1138 assert_eq!((key, value), (&42, &42));
1139 iterator.advance().await.expect("failed to advance");
1140 assert!(iterator.get().is_none())
1141 }
1142 {
1143 let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1144 .seek(Bound::Included(&30))
1145 .await
1146 .expect("seek failed");
1147 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1148 assert_eq!((key, value), (&42, &42));
1149 iterator.advance().await.expect("failed to advance");
1150 assert!(iterator.get().is_none())
1151 }
1152 {
1153 let mut iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1154 .seek(Bound::Included(&42))
1155 .await
1156 .expect("seek failed");
1157 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1158 assert_eq!((key, value), (&42, &42));
1159 iterator.advance().await.expect("failed to advance");
1160 assert!(iterator.get().is_none())
1161 }
1162 {
1163 let iterator = (layer.as_ref() as &dyn Layer<i32, i32>)
1164 .seek(Bound::Included(&43))
1165 .await
1166 .expect("seek failed");
1167 assert!(iterator.get().is_none())
1168 }
1169 }
1170
1171 #[fuchsia::test]
1172 async fn test_large_block_size() {
1173 const BLOCK_SIZE: u64 = MAX_BLOCK_SIZE;
1175 const ITEM_COUNT: i32 = ((BLOCK_SIZE as i32) / 18) * 3;
1177
1178 let handle =
1179 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1180 {
1181 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1182 Writer::new(&handle).await,
1183 ITEM_COUNT as usize * 18,
1184 BLOCK_SIZE,
1185 )
1186 .await
1187 .expect("writer new");
1188 for i in 2000000000..(2000000000 + ITEM_COUNT) {
1190 writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1191 }
1192 writer.flush().await.expect("flush failed");
1193 }
1194
1195 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1196 let mut iterator = layer.seek(Bound::Unbounded).await.expect("seek failed");
1197 for i in 2000000000..(2000000000 + ITEM_COUNT) {
1198 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1199 assert_eq!((key, value), (&i, &i));
1200 iterator.advance().await.expect("failed to advance");
1201 }
1202 assert!(iterator.get().is_none());
1203 }
1204
1205 #[fuchsia::test]
1206 async fn test_overlarge_block_size() {
1207 const BLOCK_SIZE: u64 = MAX_BLOCK_SIZE * 2;
1209
1210 let handle =
1211 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1212 PersistentLayerWriter::<_, i32, i32>::new(Writer::new(&handle).await, 0, BLOCK_SIZE)
1213 .await
1214 .expect_err("Creating writer with overlarge block size.");
1215 }
1216
1217 #[fuchsia::test]
1218 async fn test_seek_bound_excluded() {
1219 const BLOCK_SIZE: u64 = 512;
1220 const ITEM_COUNT: i32 = 10000;
1221
1222 let handle = FakeObjectHandle::new(Arc::new(FakeObject::new()));
1223 {
1224 let mut writer = PersistentLayerWriter::<_, i32, i32>::new(
1225 Writer::new(&handle).await,
1226 ITEM_COUNT as usize * 18,
1227 BLOCK_SIZE,
1228 )
1229 .await
1230 .expect("writer new");
1231 for i in 0..ITEM_COUNT {
1232 writer.write(Item::new(i, i).as_item_ref()).await.expect("write failed");
1233 }
1234 writer.flush().await.expect("flush failed");
1235 }
1236 let layer = PersistentLayer::<i32, i32>::open(handle).await.expect("new failed");
1237
1238 for i in 9982..ITEM_COUNT {
1239 let mut iterator = layer.seek(Bound::Excluded(&i)).await.expect("failed to seek");
1240 let i_plus_one = i + 1;
1241 if i_plus_one < ITEM_COUNT {
1242 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1243
1244 assert_eq!((key, value), (&i_plus_one, &i_plus_one));
1245
1246 iterator.advance().await.expect("failed to advance");
1248 let i_plus_two = i + 2;
1249 if i_plus_two < ITEM_COUNT {
1250 let ItemRef { key, value, .. } = iterator.get().expect("missing item");
1251 assert_eq!((key, value), (&i_plus_two, &i_plus_two));
1252 } else {
1253 assert!(iterator.get().is_none());
1254 }
1255 } else {
1256 assert!(iterator.get().is_none());
1257 }
1258 }
1259 }
1260
1261 #[derive(
1262 Clone,
1263 Eq,
1264 Hash,
1265 FuzzyHash,
1266 PartialEq,
1267 Debug,
1268 serde::Serialize,
1269 serde::Deserialize,
1270 TypeFingerprint,
1271 Versioned,
1272 )]
1273 struct TestKey(Range<u64>);
1274 versioned_type! { 1.. => TestKey }
1275 impl SortByU64 for TestKey {
1276 fn get_leading_u64(&self) -> u64 {
1277 self.0.start
1278 }
1279 }
1280 impl LayerKey for TestKey {
1281 fn merge_type(&self) -> crate::lsm_tree::types::MergeType {
1282 MergeType::OptimizedMerge
1283 }
1284
1285 fn next_key(&self) -> Option<Self> {
1286 Some(TestKey(self.0.end..self.0.end + 1))
1287 }
1288 }
1289 impl Ord for TestKey {
1290 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1291 self.0.start.cmp(&other.0.start).then(self.0.end.cmp(&other.0.end))
1292 }
1293 }
1294 impl PartialOrd for TestKey {
1295 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1296 Some(self.cmp(other))
1297 }
1298 }
1299 impl DefaultOrdUpperBound for TestKey {}
1300
1301 #[fuchsia::test]
1304 async fn test_block_seek_duplicate_keys() {
1305 const BLOCK_SIZE: u64 = 512;
1307 const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;
1311
1312 let mut to_find = Vec::new();
1313
1314 let handle =
1315 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1316 {
1317 let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
1318 Writer::new(&handle).await,
1319 3 * BLOCK_SIZE as usize,
1320 BLOCK_SIZE,
1321 )
1322 .await
1323 .expect("writer new");
1324
1325 let mut current_value = u32::MAX as u64 + 1;
1327
1328 {
1331 let items = ITEMS_TO_FILL_BLOCK * 3;
1332 for i in 0..items {
1333 writer
1334 .write(
1335 Item::new(TestKey(current_value..current_value + i), current_value)
1336 .as_item_ref(),
1337 )
1338 .await
1339 .expect("write failed");
1340 }
1341 to_find.push(TestKey(current_value..current_value));
1342 to_find.push(TestKey(current_value..(current_value + (items / 2))));
1343 to_find.push(TestKey(current_value..current_value + (items - 1)));
1344 current_value += 1;
1345 }
1346
1347 {
1349 let items = ITEMS_TO_FILL_BLOCK * 3;
1350 for _ in 0..items {
1351 writer
1352 .write(
1353 Item::new(TestKey(current_value..current_value), current_value)
1354 .as_item_ref(),
1355 )
1356 .await
1357 .expect("write failed");
1358 current_value += 1;
1359 }
1360 }
1361
1362 {
1365 let items = ITEMS_TO_FILL_BLOCK * 3;
1366 for i in 0..items {
1367 writer
1368 .write(
1369 Item::new(TestKey(current_value..current_value + i), current_value)
1370 .as_item_ref(),
1371 )
1372 .await
1373 .expect("write failed");
1374 }
1375 to_find.push(TestKey(current_value..current_value));
1376 to_find.push(TestKey(current_value..(current_value + (items / 2))));
1377 to_find.push(TestKey(current_value..current_value + (items - 1)));
1378 current_value += 1;
1379 }
1380
1381 {
1383 let items = ITEMS_TO_FILL_BLOCK * 3;
1384 for _ in 0..items {
1385 writer
1386 .write(
1387 Item::new(TestKey(current_value..current_value), current_value)
1388 .as_item_ref(),
1389 )
1390 .await
1391 .expect("write failed");
1392 current_value += 1;
1393 }
1394 }
1395
1396 {
1399 let items = ITEMS_TO_FILL_BLOCK * 3;
1400 for i in 0..items {
1401 writer
1402 .write(
1403 Item::new(TestKey(current_value..current_value + i), current_value)
1404 .as_item_ref(),
1405 )
1406 .await
1407 .expect("write failed");
1408 }
1409 to_find.push(TestKey(current_value..current_value));
1410 to_find.push(TestKey(current_value..(current_value + (items / 2))));
1411 to_find.push(TestKey(current_value..current_value + (items - 1)));
1412 }
1413
1414 writer.flush().await.expect("flush failed");
1415 }
1416
1417 let layer = PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
1418 for target in to_find {
1419 let iterator: Box<dyn LayerIterator<TestKey, u64>> =
1420 layer.seek(Bound::Included(&target)).await.expect("failed to seek");
1421 let ItemRef { key, .. } = iterator.get().expect("missing item");
1422 assert_eq!(&target, key);
1423 }
1424 }
1425
1426 #[fuchsia::test]
1427 async fn test_two_seek_blocks() {
1428 const BLOCK_SIZE: u64 = 512;
1430 const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;
1434 const ITEM_COUNT: u64 = ITEMS_TO_FILL_BLOCK * ((BLOCK_SIZE / 8) + 2);
1438
1439 let mut to_find = Vec::new();
1440
1441 let handle =
1442 FakeObjectHandle::new_with_block_size(Arc::new(FakeObject::new()), BLOCK_SIZE as usize);
1443 {
1444 let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
1445 Writer::new(&handle).await,
1446 ITEM_COUNT as usize * 18,
1447 BLOCK_SIZE,
1448 )
1449 .await
1450 .expect("writer new");
1451
1452 let initial_value = u32::MAX as u64 + 1;
1454 for i in 0..ITEM_COUNT {
1455 writer
1456 .write(
1457 Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1458 .as_item_ref(),
1459 )
1460 .await
1461 .expect("write failed");
1462 }
1463 to_find.push(TestKey(initial_value..initial_value));
1465 let middle = initial_value + ITEM_COUNT / 2;
1466 to_find.push(TestKey(middle..middle));
1467 let end = initial_value + ITEM_COUNT - 1;
1468 to_find.push(TestKey(end..end));
1469
1470 writer.flush().await.expect("flush failed");
1471 }
1472
1473 let layer = PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
1474 for target in to_find {
1475 let iterator: Box<dyn LayerIterator<TestKey, u64>> =
1476 layer.seek(Bound::Included(&target)).await.expect("failed to seek");
1477 let ItemRef { key, .. } = iterator.get().expect("missing item");
1478 assert_eq!(&target, key);
1479 }
1480 }
1481
1482 #[fuchsia::test]
1485 async fn test_full_seek_block() {
1486 const BLOCK_SIZE: u64 = 512;
1487
1488 const ITEMS_TO_FILL_BLOCK: u64 = BLOCK_SIZE / 37;
1492
1493 const SEEK_TABLE_ENTRIES: u64 = BLOCK_SIZE / 8;
1495
1496 const START_ENTRIES_COUNT: u64 = ITEMS_TO_FILL_BLOCK * SEEK_TABLE_ENTRIES;
1500
1501 for entries in START_ENTRIES_COUNT..START_ENTRIES_COUNT + (ITEMS_TO_FILL_BLOCK * 2) {
1502 let handle = FakeObjectHandle::new_with_block_size(
1503 Arc::new(FakeObject::new()),
1504 BLOCK_SIZE as usize,
1505 );
1506 {
1507 let mut writer = PersistentLayerWriter::<_, TestKey, u64>::new(
1508 Writer::new(&handle).await,
1509 entries as usize,
1510 BLOCK_SIZE,
1511 )
1512 .await
1513 .expect("writer new");
1514
1515 let initial_value = u32::MAX as u64 + 1;
1517 for i in 0..entries {
1518 writer
1519 .write(
1520 Item::new(TestKey(initial_value + i..initial_value + i), initial_value)
1521 .as_item_ref(),
1522 )
1523 .await
1524 .expect("write failed");
1525 }
1526
1527 writer.flush().await.expect("flush failed");
1528 }
1529 PersistentLayer::<TestKey, u64>::open(handle).await.expect("new failed");
1530 }
1531 }
1532}