1use crate::checksum::{fletcher64, Checksum};
6use crate::object_store::journal::{JournalCheckpoint, JournalHandle, BLOCK_SIZE, RESET_XOR};
7use crate::serialized_types::{Version, Versioned, VersionedLatest};
8use anyhow::{bail, Context, Error};
9use byteorder::{ByteOrder, LittleEndian};
10
11pub struct JournalReader {
17 handle: Box<dyn JournalHandle>,
19
20 buf: Vec<u8>,
22
23 buf_range: std::ops::Range<usize>,
25
26 read_offset: u64,
28
29 buf_file_offset: u64,
31
32 checksums: Vec<Checksum>,
35
36 version: Version,
38
39 bad_checksum: bool,
41
42 found_reset: bool,
44
45 first_read: bool,
47
48 eof_ok: bool,
52}
53
54impl JournalReader {
55 pub(super) fn new(handle: impl JournalHandle, checkpoint: &JournalCheckpoint) -> Self {
56 JournalReader {
57 handle: Box::new(handle),
58 buf: Vec::new(),
59 buf_range: 0..0,
60 read_offset: checkpoint.file_offset - checkpoint.file_offset % BLOCK_SIZE,
61 buf_file_offset: checkpoint.file_offset,
62 checksums: vec![checkpoint.checksum],
63 version: checkpoint.version,
64 bad_checksum: false,
65 found_reset: false,
66 first_read: true,
67 eof_ok: false,
68 }
69 }
70
71 pub(super) fn journal_file_checkpoint(&self) -> JournalCheckpoint {
72 JournalCheckpoint {
73 file_offset: self.buf_file_offset,
74 checksum: self.checksums[0],
75 version: self.version,
76 }
77 }
78
79 fn last_read_checksum(&self) -> Checksum {
80 *self.checksums.last().unwrap()
81 }
82
83 pub fn set_version(&mut self, version: Version) {
87 self.version = version;
88
89 self.bad_checksum = false;
91 }
92
93 pub fn set_eof_ok(&mut self) {
95 self.eof_ok = true;
96 }
97
98 pub fn skip_to_end_of_block(&mut self) {
101 let bs = self.block_size();
102 let block_offset = self.buf_file_offset % bs;
103 if block_offset > 0 {
104 self.consume((bs - block_offset) as usize - std::mem::size_of::<Checksum>());
105 }
106 }
107
108 pub fn handle(&mut self) -> &mut dyn JournalHandle {
109 self.handle.as_mut()
110 }
111
112 pub async fn deserialize<T: VersionedLatest>(&mut self) -> Result<ReadResult<T>, Error>
116 where
117 T: Versioned,
118 {
119 self.fill_buf().await?;
120 let mut cursor = std::io::Cursor::new(self.buffer());
121 let version = self.version;
122 match T::deserialize_from_version(&mut cursor, version) {
123 Ok(record) => {
124 let consumed = cursor.position() as usize;
125 self.consume(consumed);
126 Ok(ReadResult::Some(record))
127 }
128 Err(e) => {
129 if self.found_reset {
130 self.found_reset = false;
131
132 self.consume(self.buf_range.end - self.buf_range.start);
134 self.buf_range =
135 self.buf_range.end..self.buf.len() - std::mem::size_of::<Checksum>();
136 let (version, to_consume) = {
137 let mut cursor = std::io::Cursor::new(self.buffer());
138 let version = Version::deserialize_from(&mut cursor)
139 .context("Failed to deserialize version")?;
140 (version, cursor.position() as usize)
141 };
142 self.version = version.clone();
143 self.consume(to_consume);
144 return Ok(ReadResult::Reset(version));
145 } else if let Some(io_error) = e.downcast_ref::<std::io::Error>() {
146 if io_error.kind() == std::io::ErrorKind::UnexpectedEof && self.bad_checksum {
147 return Ok(ReadResult::ChecksumMismatch);
148 }
149 }
150 Err(e.into())
151 }
152 }
153 }
154
155 pub async fn fill_buf(&mut self) -> Result<(), Error> {
159 let bs = self.block_size() as usize;
160 let min_required = bs - std::mem::size_of::<Checksum>();
161
162 if self.found_reset
163 || self.bad_checksum
164 || self.buf_range.end - self.buf_range.start >= min_required
165 {
166 return Ok(());
167 }
168
169 self.buf.copy_within(self.buf_range.clone(), 0);
171 self.buf_range = 0..self.buf_range.end - self.buf_range.start;
172
173 while self.buf_range.end - self.buf_range.start < min_required {
174 self.buf.resize(self.buf_range.end + bs, 0);
175 let last_read_checksum = self.last_read_checksum();
176
177 let mut buffer = self.handle.allocate_buffer(bs).await;
179 assert!(self.read_offset % bs as u64 == 0);
180 let bytes_read = self.handle.read(self.read_offset, buffer.as_mut()).await?;
181 if bytes_read != bs {
182 if self.eof_ok {
183 return Ok(());
184 } else {
185 bail!("unexpected end of journal file");
186 }
187 }
188 self.buf.as_mut_slice()[self.buf_range.end..].copy_from_slice(buffer.as_slice());
189
190 let (contents_slice, checksum_slice) =
191 buffer.as_slice().split_at(bs - std::mem::size_of::<Checksum>());
192 let stored_checksum = LittleEndian::read_u64(checksum_slice);
193
194 match verify_checksum(
195 contents_slice,
196 stored_checksum,
197 last_read_checksum,
198 !self.first_read,
199 ) {
200 ChecksumResult::Ok => {}
201 ChecksumResult::Reset => {
202 self.found_reset = true;
207 if let Some(checksum) = self.checksums.last_mut() {
211 *checksum ^= RESET_XOR;
212 }
213 self.checksums.push(stored_checksum);
214 self.read_offset += bs as u64;
215 return Ok(());
216 }
217 ChecksumResult::Bad => {
218 self.bad_checksum = true;
219 return Ok(());
220 }
221 }
222
223 self.first_read = false;
224 self.checksums.push(stored_checksum);
225
226 if self.buf_file_offset > self.read_offset {
229 assert!(self.buf_range.start == 0);
230 self.buf_range = (self.buf_file_offset - self.read_offset) as usize
231 ..self.buf.len() - std::mem::size_of::<Checksum>();
232 } else {
233 self.buf_range =
234 self.buf_range.start..self.buf.len() - std::mem::size_of::<Checksum>();
235 }
236 self.read_offset += bs as u64;
237 }
238 Ok(())
239 }
240
241 pub fn consume(&mut self, amount: usize) {
244 let bs = self.block_size();
245 assert!(amount < bs as usize);
246 let block_offset_before = self.buf_file_offset % bs;
247 self.buf_file_offset += amount as u64;
248 if block_offset_before + amount as u64 >= bs - std::mem::size_of::<Checksum>() as u64 {
251 self.buf_file_offset += std::mem::size_of::<Checksum>() as u64;
252 self.checksums.drain(0..1);
253 }
254 self.buf_range.start += amount
255 }
256
257 pub fn buffer(&self) -> &[u8] {
258 &self.buf[self.buf_range.clone()]
259 }
260
261 fn block_size(&self) -> u64 {
262 BLOCK_SIZE
263 }
264}
265
/// Outcome of verifying one block's stored checksum.
enum ChecksumResult {
    /// The stored checksum matches the one computed from the previous checksum.
    Ok,
    /// The stored checksum only matches when the previous checksum is XORed with the reset value.
    Reset,
    /// The stored checksum matches neither.
    Bad,
}
271
272fn verify_checksum(
273 contents: &[u8],
274 stored_checksum: u64,
275 last_read_checksum: u64,
276 allow_reset: bool,
277) -> ChecksumResult {
278 if stored_checksum == fletcher64(contents, last_read_checksum) {
279 ChecksumResult::Ok
280 } else {
281 if allow_reset {
282 if stored_checksum == fletcher64(contents, last_read_checksum ^ RESET_XOR) {
283 return ChecksumResult::Reset;
284 }
285 }
286 ChecksumResult::Bad
287 }
288}
289
290#[derive(Debug, Eq, PartialEq)]
291pub enum ReadResult<T> {
292 Reset(Version),
293 Some(T),
294 ChecksumMismatch,
295}
296
#[cfg(test)]
mod tests {
    use super::{JournalReader, ReadResult, BLOCK_SIZE};
    use crate::object_handle::{ObjectHandle, WriteObjectHandle};
    use crate::object_store::journal::writer::JournalWriter;
    use crate::object_store::journal::{Checksum, JournalCheckpoint, RESET_XOR};
    use crate::serialized_types::{Version, VersionedLatest, LATEST_VERSION};
    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
    use std::io::Write;
    use std::sync::Arc;

    /// Overwrites the first `block_count` blocks of the object behind `handle` with zeros.
    async fn zero_fill(handle: &FakeObjectHandle, block_count: usize) {
        let mut zeros = handle.allocate_buffer(BLOCK_SIZE as usize * block_count).await;
        zeros.as_mut_slice().fill(0u8);
        handle.write_or_append(Some(0), zeros.as_ref()).await.expect("write failed");
    }

    /// Writes everything `writer` has ready to flush to `handle` at the offset the writer reports.
    async fn flush_writer(handle: &FakeObjectHandle, writer: &mut JournalWriter) {
        let mut pending = handle.allocate_buffer(writer.flushable_bytes()).await;
        let flush_offset = writer.take_flushable(pending.as_mut());
        handle.write_or_append(Some(flush_offset), pending.as_ref()).await.expect("overwrite failed");
    }

    /// A default checkpoint (start of the journal) at the latest version.
    fn latest_checkpoint() -> JournalCheckpoint {
        JournalCheckpoint { version: LATEST_VERSION, ..JournalCheckpoint::default() }
    }

    /// Builds a reader over `object` that starts at `checkpoint`.
    fn new_reader(object: &Arc<FakeObject>, checkpoint: &JournalCheckpoint) -> JournalReader {
        JournalReader::new(FakeObjectHandle::new(object.clone()), checkpoint)
    }

    /// Writes `items` as records into a single padded block at the start of the journal.
    async fn write_items<T: VersionedLatest + std::fmt::Debug>(
        handle: FakeObjectHandle,
        items: &[T],
    ) {
        let mut writer = JournalWriter::new(BLOCK_SIZE as usize, 0);
        for item in items {
            writer.write_record(item).unwrap();
        }
        writer.pad_to_block().expect("pad_to_block failed");
        flush_writer(&handle, &mut writer).await;
    }

    #[fuchsia::test]
    async fn test_read_single_record() {
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        zero_fill(&handle, 2).await;
        write_items(FakeObjectHandle::new(object.clone()), &[4u32]).await;

        let mut reader = new_reader(&object, &latest_checkpoint());
        let value = reader.deserialize().await.expect("deserialize failed");
        assert_eq!(value, ReadResult::Some(4u32));
    }

    #[fuchsia::test]
    async fn test_journal_file_checkpoint() {
        let object = Arc::new(FakeObject::new());
        let initial = latest_checkpoint();
        let mut reader = new_reader(&object, &initial);
        assert_eq!(reader.journal_file_checkpoint(), initial);

        let handle = FakeObjectHandle::new(object.clone());
        zero_fill(&handle, 2).await;
        write_items(FakeObjectHandle::new(object.clone()), &[4u32, 7u32]).await;

        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(4u32));

        // A second reader started from the first reader's checkpoint resumes at the next record.
        let resume_from = reader.journal_file_checkpoint();
        let mut reader = new_reader(&object, &resume_from);
        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(7u32));
    }

    #[fuchsia::test]
    async fn test_skip_to_end_of_block() {
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        zero_fill(&handle, 3).await;

        // One record per block.
        let mut writer = JournalWriter::new(BLOCK_SIZE as usize, 0);
        writer.write_record(&4u32).unwrap();
        writer.pad_to_block().expect("pad_to_block failed");
        writer.write_record(&7u32).unwrap();
        writer.pad_to_block().expect("pad_to_block failed");
        flush_writer(&handle, &mut writer).await;

        let mut reader = new_reader(&object, &latest_checkpoint());
        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(4u32));
        reader.skip_to_end_of_block();
        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(7u32));
    }

    #[fuchsia::test]
    async fn test_handle() {
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        zero_fill(&handle, 3).await;

        let mut reader = new_reader(&object, &latest_checkpoint());
        assert_eq!(reader.handle().get_size(), BLOCK_SIZE * 3);
    }

    #[fuchsia::test]
    async fn test_item_spanning_block() {
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        zero_fill(&handle, 3).await;

        let mut writer = JournalWriter::new(BLOCK_SIZE as usize, 0);
        // A single leading byte-sized record is written first; the assertion after the loop
        // checks that the u32 records did not end exactly on the block boundary.
        writer.write_record(&4u8).unwrap();
        let mut count: i32 = 0;
        while writer.journal_file_checkpoint().file_offset < BLOCK_SIZE {
            writer.write_record(&12345678u32).unwrap();
            count += 1;
        }
        assert_ne!(writer.journal_file_checkpoint().file_offset, BLOCK_SIZE);
        writer.pad_to_block().expect("pad_to_block failed");
        flush_writer(&handle, &mut writer).await;

        let mut reader = new_reader(&object, &latest_checkpoint());
        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(4u8));
        for _ in 0..count {
            assert_eq!(
                reader.deserialize().await.expect("deserialize failed"),
                ReadResult::Some(12345678u32)
            );
        }
    }

    #[fuchsia::test]
    async fn test_reset() {
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        zero_fill(&handle, 3).await;
        write_items(FakeObjectHandle::new(object.clone()), &[4u32, 7u32]).await;

        // The second block is still zeros, so reading past the first block is a mismatch.
        let mut reader = new_reader(&object, &latest_checkpoint());
        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(4u32));
        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(7u32));
        reader.skip_to_end_of_block();
        assert_eq!(
            reader.deserialize::<u32>().await.expect("deserialize failed"),
            ReadResult::ChecksumMismatch
        );

        // Write a reset at the second block: seed the writer with the last good checksum XORed
        // with RESET_XOR and start the block with the new version.
        let mut writer = JournalWriter::new(BLOCK_SIZE as usize, 0);
        let new_version = Version { minor: LATEST_VERSION.minor + 1, ..LATEST_VERSION };
        writer.seek(JournalCheckpoint {
            file_offset: BLOCK_SIZE,
            checksum: reader.last_read_checksum() ^ RESET_XOR,
            version: new_version,
        });
        new_version.serialize_into(&mut writer).expect("write version failed");
        writer.write_record(&13u32).unwrap();
        let after_first_record = writer.journal_file_checkpoint();
        writer.write_record(&78u32).unwrap();
        writer.pad_to_block().expect("pad_to_block failed");
        writer.write_record(&90u32).unwrap();
        writer.pad_to_block().expect("pad_to_block failed");
        flush_writer(&handle, &mut writer).await;

        let mut reader = new_reader(
            &object,
            &JournalCheckpoint { version: new_version, ..JournalCheckpoint::default() },
        );
        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(4u32));
        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(7u32));
        reader.skip_to_end_of_block();
        assert_eq!(
            reader.deserialize::<u32>().await.expect("deserialize failed"),
            ReadResult::Reset(new_version),
        );
        assert_eq!(
            reader.deserialize().await.expect("deserialize failed"),
            ReadResult::Some(13u32)
        );
        assert_eq!(reader.journal_file_checkpoint(), after_first_record);
        assert_eq!(
            reader.deserialize().await.expect("deserialize failed"),
            ReadResult::Some(78u32)
        );
        reader.skip_to_end_of_block();
        assert_eq!(
            reader.deserialize().await.expect("deserialize failed"),
            ReadResult::Some(90u32)
        );

        // A reader started from the checkpoint taken after the reset reads the same tail.
        let mut reader = new_reader(&object, &after_first_record);
        assert_eq!(
            reader.deserialize().await.expect("deserialize failed"),
            ReadResult::Some(78u32)
        );
        reader.skip_to_end_of_block();
        assert_eq!(
            reader.deserialize().await.expect("deserialize failed"),
            ReadResult::Some(90u32)
        );
    }

    #[fuchsia::test]
    async fn test_reader_starting_near_end_of_block() {
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        zero_fill(&handle, 3).await;

        // Fill exactly two blocks' worth of payload with a known byte.
        let mut writer = JournalWriter::new(BLOCK_SIZE as usize, 0);
        let payload_len = 2 * (BLOCK_SIZE as usize - std::mem::size_of::<Checksum>());
        assert_eq!(writer.write(&vec![78u8; payload_len]).expect("write failed"), payload_len);
        flush_writer(&handle, &mut writer).await;

        // Start one payload byte before the end of the first block.
        let checkpoint = JournalCheckpoint {
            file_offset: BLOCK_SIZE - std::mem::size_of::<Checksum>() as u64 - 1,
            checksum: 0,
            version: LATEST_VERSION,
        };
        let mut reader = new_reader(&object, &checkpoint);
        let mut offset = checkpoint.file_offset as usize;
        while offset < payload_len {
            reader.fill_buf().await.expect("fill_buf failed");
            let available = reader.buffer();
            let amount = std::cmp::min(available.len(), payload_len - offset);
            assert_eq!(&available[..amount], vec![78; amount]);
            offset += amount;
            reader.consume(amount);
        }
    }

    #[fuchsia::test]
    async fn test_write_to_block_boundary() {
        let object = Arc::new(FakeObject::new());
        let handle = FakeObjectHandle::new(object.clone());
        zero_fill(&handle, 3).await;

        // Write exactly one block's worth of payload.
        let mut writer = JournalWriter::new(BLOCK_SIZE as usize, 0);
        let payload_len = BLOCK_SIZE as usize - std::mem::size_of::<Checksum>();
        assert_eq!(writer.write(&vec![78u8; payload_len]).expect("write failed"), payload_len);
        let mut written = handle.allocate_buffer(writer.flushable_bytes()).await;
        let flush_offset = writer.take_flushable(written.as_mut());
        handle.write_or_append(Some(flush_offset), written.as_ref()).await.expect("overwrite failed");
        assert_eq!(flush_offset, 0);

        // The writer's checkpoint has already moved past the block's checksum.
        let writer_checkpoint = writer.journal_file_checkpoint();
        assert_eq!(writer_checkpoint.file_offset, BLOCK_SIZE);

        let mut reader = new_reader(&object, &JournalCheckpoint::default());
        reader.set_version(LATEST_VERSION);

        reader.fill_buf().await.expect("fill_buf failed");

        assert_eq!(&reader.buffer()[..payload_len], &written.as_slice()[..payload_len]);

        reader.consume(payload_len);

        // After consuming the whole block the reader agrees with the writer.
        assert_eq!(reader.journal_file_checkpoint(), writer_checkpoint);
    }
}