fxfs/object_store/journal/
reader.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::checksum::{fletcher64, Checksum};
6use crate::object_store::journal::{JournalCheckpoint, JournalHandle, BLOCK_SIZE, RESET_XOR};
7use crate::serialized_types::{Version, Versioned, VersionedLatest};
8use anyhow::{bail, Context, Error};
9use byteorder::{ByteOrder, LittleEndian};
10
11/// JournalReader supports reading from a journal file which consist of blocks that have a trailing
12/// fletcher64 checksum in the last 8 bytes of each block.  The checksum takes the check-sum of the
13/// preceding block as an input to the next block so that merely copying a block to a different
14/// location will cause a checksum failure.  The serialization of a single record *must* fit within
15/// a single block.
16pub struct JournalReader {
17    // The handle of the journal file that we are reading.
18    handle: Box<dyn JournalHandle>,
19
20    // The currently buffered data.
21    buf: Vec<u8>,
22
23    // The range within the buffer containing outstanding data.
24    buf_range: std::ops::Range<usize>,
25
26    // The next offset we should read from in handle.
27    read_offset: u64,
28
29    // The file offset for next record due to be deserialized.
30    buf_file_offset: u64,
31
32    // The checksums required for the currently buffered data.  The first checksum is for
33    // the *preceding* block since that is what is required to generate a checkpoint.
34    checksums: Vec<Checksum>,
35
36    // The version of types to deserialize.
37    version: Version,
38
39    // Indicates a bad checksum has been detected and no more data can be read.
40    bad_checksum: bool,
41
42    // Indicates a reset checksum has been detected.
43    found_reset: bool,
44
45    // Indicates whether the next read is the first read.
46    first_read: bool,
47
48    /// For the filesystem journal, we should never hit the end of the file but
49    /// when initially reading the superblock, we only know the first block.
50    /// In this case, we don't treat EOF in fill_buf as an error.
51    eof_ok: bool,
52}
53
54impl JournalReader {
55    pub(super) fn new(handle: impl JournalHandle, checkpoint: &JournalCheckpoint) -> Self {
56        JournalReader {
57            handle: Box::new(handle),
58            buf: Vec::new(),
59            buf_range: 0..0,
60            read_offset: checkpoint.file_offset - checkpoint.file_offset % BLOCK_SIZE,
61            buf_file_offset: checkpoint.file_offset,
62            checksums: vec![checkpoint.checksum],
63            version: checkpoint.version,
64            bad_checksum: false,
65            found_reset: false,
66            first_read: true,
67            eof_ok: false,
68        }
69    }
70
71    pub(super) fn journal_file_checkpoint(&self) -> JournalCheckpoint {
72        JournalCheckpoint {
73            file_offset: self.buf_file_offset,
74            checksum: self.checksums[0],
75            version: self.version,
76        }
77    }
78
79    fn last_read_checksum(&self) -> Checksum {
80        *self.checksums.last().unwrap()
81    }
82
83    /// Sets the version of Key and Value to deserialize.
84    /// This can change across Journal RESET boundaries when the tail of the journal is written to
85    /// by newer versions of the code.
86    pub fn set_version(&mut self, version: Version) {
87        self.version = version;
88
89        // Reset bad_checksum in case this version change triggers a change in the block size.
90        self.bad_checksum = false;
91    }
92
93    /// Used by superblock reader where we expect to hit EOF until we hit the first extent record.
94    pub fn set_eof_ok(&mut self) {
95        self.eof_ok = true;
96    }
97
98    /// To allow users to flush a block, they can store a record that indicates the rest of the
99    /// block should be skipped.  When that record is read, users should call this function.
100    pub fn skip_to_end_of_block(&mut self) {
101        let bs = self.block_size();
102        let block_offset = self.buf_file_offset % bs;
103        if block_offset > 0 {
104            self.consume((bs - block_offset) as usize - std::mem::size_of::<Checksum>());
105        }
106    }
107
108    pub fn handle(&mut self) -> &mut dyn JournalHandle {
109        self.handle.as_mut()
110    }
111
112    /// Tries to deserialize a record of type T from the journal stream.  It might return
113    /// ReadResult::Reset if a reset marker is encountered, or ReadResult::ChecksumMismatch (which
114    /// is expected at the end of the journal stream).
115    pub async fn deserialize<T: VersionedLatest>(&mut self) -> Result<ReadResult<T>, Error>
116    where
117        T: Versioned,
118    {
119        self.fill_buf().await?;
120        let mut cursor = std::io::Cursor::new(self.buffer());
121        let version = self.version;
122        match T::deserialize_from_version(&mut cursor, version) {
123            Ok(record) => {
124                let consumed = cursor.position() as usize;
125                self.consume(consumed);
126                Ok(ReadResult::Some(record))
127            }
128            Err(e) => {
129                if self.found_reset {
130                    self.found_reset = false;
131
132                    // Fix up buf_range now...
133                    self.consume(self.buf_range.end - self.buf_range.start);
134                    self.buf_range =
135                        self.buf_range.end..self.buf.len() - std::mem::size_of::<Checksum>();
136                    let (version, to_consume) = {
137                        let mut cursor = std::io::Cursor::new(self.buffer());
138                        let version = Version::deserialize_from(&mut cursor)
139                            .context("Failed to deserialize version")?;
140                        (version, cursor.position() as usize)
141                    };
142                    self.version = version.clone();
143                    self.consume(to_consume);
144                    return Ok(ReadResult::Reset(version));
145                } else if let Some(io_error) = e.downcast_ref::<std::io::Error>() {
146                    if io_error.kind() == std::io::ErrorKind::UnexpectedEof && self.bad_checksum {
147                        return Ok(ReadResult::ChecksumMismatch);
148                    }
149                }
150                Err(e.into())
151            }
152        }
153    }
154
155    // Fills the buffer so that at least one blocks worth of data is contained within the buffer.
156    // After reading a block, it verifies the checksum.  Once done, it should be possible to
157    // deserialize records.
158    pub async fn fill_buf(&mut self) -> Result<(), Error> {
159        let bs = self.block_size() as usize;
160        let min_required = bs - std::mem::size_of::<Checksum>();
161
162        if self.found_reset
163            || self.bad_checksum
164            || self.buf_range.end - self.buf_range.start >= min_required
165        {
166            return Ok(());
167        }
168
169        // Before we read, shuffle existing data to the beginning of the buffer.
170        self.buf.copy_within(self.buf_range.clone(), 0);
171        self.buf_range = 0..self.buf_range.end - self.buf_range.start;
172
173        while self.buf_range.end - self.buf_range.start < min_required {
174            self.buf.resize(self.buf_range.end + bs, 0);
175            let last_read_checksum = self.last_read_checksum();
176
177            // Read the next block's worth, verify its checksum, and append it to |buf|.
178            let mut buffer = self.handle.allocate_buffer(bs).await;
179            assert!(self.read_offset % bs as u64 == 0);
180            let bytes_read = self.handle.read(self.read_offset, buffer.as_mut()).await?;
181            if bytes_read != bs {
182                if self.eof_ok {
183                    return Ok(());
184                } else {
185                    bail!("unexpected end of journal file");
186                }
187            }
188            self.buf.as_mut_slice()[self.buf_range.end..].copy_from_slice(buffer.as_slice());
189
190            let (contents_slice, checksum_slice) =
191                buffer.as_slice().split_at(bs - std::mem::size_of::<Checksum>());
192            let stored_checksum = LittleEndian::read_u64(checksum_slice);
193
194            match verify_checksum(
195                contents_slice,
196                stored_checksum,
197                last_read_checksum,
198                !self.first_read,
199            ) {
200                ChecksumResult::Ok => {}
201                ChecksumResult::Reset => {
202                    // Record that we've encountered a reset in the stream (a point where the
203                    // journal wasn't cleanly closed in the past) and it starts afresh in this
204                    // block.  We don't adjust buf_range until the reset has been processed, but
205                    // we do push the checksum.
206                    self.found_reset = true;
207                    // We need to adjust the last pushed checksum since it could possibly be
208                    // for checkpoints later (see Self::journal_file_checkpoint), and
209                    // checkpoints should always use the correct seed.
210                    if let Some(checksum) = self.checksums.last_mut() {
211                        *checksum ^= RESET_XOR;
212                    }
213                    self.checksums.push(stored_checksum);
214                    self.read_offset += bs as u64;
215                    return Ok(());
216                }
217                ChecksumResult::Bad => {
218                    self.bad_checksum = true;
219                    return Ok(());
220                }
221            }
222
223            self.first_read = false;
224            self.checksums.push(stored_checksum);
225
226            // If this was our first read, our buffer offset might be somewhere within the middle of
227            // the block, so we need to adjust buf_range accordingly.
228            if self.buf_file_offset > self.read_offset {
229                assert!(self.buf_range.start == 0);
230                self.buf_range = (self.buf_file_offset - self.read_offset) as usize
231                    ..self.buf.len() - std::mem::size_of::<Checksum>();
232            } else {
233                self.buf_range =
234                    self.buf_range.start..self.buf.len() - std::mem::size_of::<Checksum>();
235            }
236            self.read_offset += bs as u64;
237        }
238        Ok(())
239    }
240
241    // Used after deserializing to indicate how many bytes were deserialized and adjusts the buffer
242    // pointers accordingly.
243    pub fn consume(&mut self, amount: usize) {
244        let bs = self.block_size();
245        assert!(amount < bs as usize);
246        let block_offset_before = self.buf_file_offset % bs;
247        self.buf_file_offset += amount as u64;
248        // If we crossed a block boundary, then the file offset needs be incremented by the size of
249        // the checksum.
250        if block_offset_before + amount as u64 >= bs - std::mem::size_of::<Checksum>() as u64 {
251            self.buf_file_offset += std::mem::size_of::<Checksum>() as u64;
252            self.checksums.drain(0..1);
253        }
254        self.buf_range.start += amount
255    }
256
257    pub fn buffer(&self) -> &[u8] {
258        &self.buf[self.buf_range.clone()]
259    }
260
261    fn block_size(&self) -> u64 {
262        BLOCK_SIZE
263    }
264}
265
266enum ChecksumResult {
267    Ok,
268    Reset,
269    Bad,
270}
271
272fn verify_checksum(
273    contents: &[u8],
274    stored_checksum: u64,
275    last_read_checksum: u64,
276    allow_reset: bool,
277) -> ChecksumResult {
278    if stored_checksum == fletcher64(contents, last_read_checksum) {
279        ChecksumResult::Ok
280    } else {
281        if allow_reset {
282            if stored_checksum == fletcher64(contents, last_read_checksum ^ RESET_XOR) {
283                return ChecksumResult::Reset;
284            }
285        }
286        ChecksumResult::Bad
287    }
288}
289
290#[derive(Debug, Eq, PartialEq)]
291pub enum ReadResult<T> {
292    Reset(Version),
293    Some(T),
294    ChecksumMismatch,
295}
296
297#[cfg(test)]
298mod tests {
299    // The following tests use JournalWriter to test our reader implementation. This works so long
300    // as JournalWriter doesn't use JournalReader to test its implementation.
301    use super::{JournalReader, ReadResult, BLOCK_SIZE};
302    use crate::object_handle::{ObjectHandle, WriteObjectHandle};
303    use crate::object_store::journal::writer::JournalWriter;
304    use crate::object_store::journal::{Checksum, JournalCheckpoint, RESET_XOR};
305    use crate::serialized_types::{Version, VersionedLatest, LATEST_VERSION};
306    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
307    use std::io::Write;
308    use std::sync::Arc;
309
310    async fn write_items<T: VersionedLatest + std::fmt::Debug>(
311        handle: FakeObjectHandle,
312        items: &[T],
313    ) {
314        let mut writer = JournalWriter::new(BLOCK_SIZE as usize, 0);
315        for item in items {
316            writer.write_record(item).unwrap();
317        }
318        writer.pad_to_block().expect("pad_to_block failed");
319        let mut buf = handle.allocate_buffer(writer.flushable_bytes()).await;
320        let offset = writer.take_flushable(buf.as_mut());
321        handle.write_or_append(Some(offset), buf.as_ref()).await.expect("overwrite failed");
322    }
323
324    #[fuchsia::test]
325    async fn test_read_single_record() {
326        let object = Arc::new(FakeObject::new());
327        let handle = FakeObjectHandle::new(object.clone());
328        // Make the journal file a minimum of two blocks since reading to EOF is an error.
329        let len = BLOCK_SIZE as usize * 2;
330        let mut buf = handle.allocate_buffer(len).await;
331        buf.as_mut_slice().fill(0u8);
332        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
333        write_items(FakeObjectHandle::new(object.clone()), &[4u32]).await;
334
335        let mut reader = JournalReader::new(
336            FakeObjectHandle::new(object.clone()),
337            &JournalCheckpoint { version: LATEST_VERSION, ..JournalCheckpoint::default() },
338        );
339        let value = reader.deserialize().await.expect("deserialize failed");
340        assert_eq!(value, ReadResult::Some(4u32));
341    }
342
343    #[fuchsia::test]
344    async fn test_journal_file_checkpoint() {
345        let object = Arc::new(FakeObject::new());
346        let checkpoint =
347            JournalCheckpoint { version: LATEST_VERSION, ..JournalCheckpoint::default() };
348        let mut reader = JournalReader::new(FakeObjectHandle::new(object.clone()), &checkpoint);
349        assert_eq!(reader.journal_file_checkpoint(), checkpoint);
350        // Make the journal file a minimum of two blocks since reading to EOF is an error.
351        let handle = FakeObjectHandle::new(object.clone());
352        let len = BLOCK_SIZE as usize * 2;
353        let mut buf = handle.allocate_buffer(len).await;
354        buf.as_mut_slice().fill(0u8);
355        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
356        write_items(FakeObjectHandle::new(object.clone()), &[4u32, 7u32]).await;
357
358        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(4u32));
359
360        // If we take the checkpoint here and then create another reader, we should see the second
361        // item.
362        let checkpoint = reader.journal_file_checkpoint();
363        let mut reader = JournalReader::new(FakeObjectHandle::new(object.clone()), &checkpoint);
364        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(7u32));
365    }
366
367    #[fuchsia::test]
368    async fn test_skip_to_end_of_block() {
369        let object = Arc::new(FakeObject::new());
370        // Make the journal file a minimum of two blocks since reading to EOF is an error.
371        let handle = FakeObjectHandle::new(object.clone());
372        let len = BLOCK_SIZE as usize * 3;
373        let mut buf = handle.allocate_buffer(len).await;
374        buf.as_mut_slice().fill(0u8);
375        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
376        let mut writer = JournalWriter::new(BLOCK_SIZE as usize, 0);
377        writer.write_record(&4u32).unwrap();
378        writer.pad_to_block().expect("pad_to_block failed");
379        writer.write_record(&7u32).unwrap();
380        writer.pad_to_block().expect("pad_to_block failed");
381        let mut buf = handle.allocate_buffer(writer.flushable_bytes()).await;
382        let offset = writer.take_flushable(buf.as_mut());
383        handle.write_or_append(Some(offset), buf.as_ref()).await.expect("overwrite failed");
384        let mut reader = JournalReader::new(
385            FakeObjectHandle::new(object.clone()),
386            &JournalCheckpoint { version: LATEST_VERSION, ..JournalCheckpoint::default() },
387        );
388        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(4u32));
389        reader.skip_to_end_of_block();
390        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(7u32));
391    }
392
393    #[fuchsia::test]
394    async fn test_handle() {
395        let object = Arc::new(FakeObject::new());
396        // Make the journal file a minimum of two blocks since reading to EOF is an error.
397        let handle = FakeObjectHandle::new(object.clone());
398        let len = BLOCK_SIZE as usize * 3;
399        let mut buf = handle.allocate_buffer(len).await;
400        buf.as_mut_slice().fill(0u8);
401        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
402        let mut reader = JournalReader::new(
403            FakeObjectHandle::new(object.clone()),
404            &JournalCheckpoint { version: LATEST_VERSION, ..JournalCheckpoint::default() },
405        );
406        assert_eq!(reader.handle().get_size(), BLOCK_SIZE * 3);
407    }
408
409    #[fuchsia::test]
410    async fn test_item_spanning_block() {
411        let object = Arc::new(FakeObject::new());
412        // Make the journal file a minimum of two blocks since reading to EOF is an error.
413        let handle = FakeObjectHandle::new(object.clone());
414        let len = BLOCK_SIZE as usize * 3;
415        let mut buf = handle.allocate_buffer(len).await;
416        buf.as_mut_slice().fill(0u8);
417        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
418        let mut writer = JournalWriter::new(BLOCK_SIZE as usize, 0);
419        // Write one byte so that everything else is misaligned.
420        writer.write_record(&4u8).unwrap();
421        let mut count: i32 = 0;
422        while writer.journal_file_checkpoint().file_offset < BLOCK_SIZE {
423            writer.write_record(&12345678u32).unwrap();
424            count += 1;
425        }
426        // Check that writing didn't end up being aligned on a block.
427        assert_ne!(writer.journal_file_checkpoint().file_offset, BLOCK_SIZE);
428        writer.pad_to_block().expect("pad_to_block failed");
429        let mut buf = handle.allocate_buffer(writer.flushable_bytes()).await;
430        let offset = writer.take_flushable(buf.as_mut());
431        handle.write_or_append(Some(offset), buf.as_ref()).await.expect("overwrite failed");
432
433        let mut reader = JournalReader::new(
434            FakeObjectHandle::new(object.clone()),
435            &JournalCheckpoint { version: LATEST_VERSION, ..JournalCheckpoint::default() },
436        );
437        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(4u8));
438        for _ in 0..count {
439            assert_eq!(
440                reader.deserialize().await.expect("deserialize failed"),
441                ReadResult::Some(12345678u32)
442            );
443        }
444    }
445
446    #[fuchsia::test]
447    async fn test_reset() {
448        let object = Arc::new(FakeObject::new());
449        // Make the journal file a minimum of two blocks since reading to EOF is an error.
450        let handle = FakeObjectHandle::new(object.clone());
451        let len = BLOCK_SIZE as usize * 3;
452        let mut buf = handle.allocate_buffer(len).await;
453        buf.as_mut_slice().fill(0u8);
454        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
455        write_items(FakeObjectHandle::new(object.clone()), &[4u32, 7u32]).await;
456
457        let mut reader = JournalReader::new(
458            FakeObjectHandle::new(object.clone()),
459            &JournalCheckpoint { version: LATEST_VERSION, ..JournalCheckpoint::default() },
460        );
461        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(4u32));
462        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(7u32));
463        reader.skip_to_end_of_block();
464        assert_eq!(
465            reader.deserialize::<u32>().await.expect("deserialize failed"),
466            ReadResult::ChecksumMismatch
467        );
468
469        let mut writer = JournalWriter::new(BLOCK_SIZE as usize, 0);
470        let new_version = Version { minor: LATEST_VERSION.minor + 1, ..LATEST_VERSION };
471        writer.seek(JournalCheckpoint {
472            file_offset: BLOCK_SIZE,
473            checksum: reader.last_read_checksum() ^ RESET_XOR,
474            version: new_version,
475        });
476        new_version.serialize_into(&mut writer).expect("write version failed");
477        writer.write_record(&13u32).unwrap();
478        let checkpoint = writer.journal_file_checkpoint();
479        writer.write_record(&78u32).unwrap();
480        writer.pad_to_block().expect("pad_to_block failed");
481        writer.write_record(&90u32).unwrap();
482        writer.pad_to_block().expect("pad_to_block failed");
483        let mut buf = handle.allocate_buffer(writer.flushable_bytes()).await;
484        let offset = writer.take_flushable(buf.as_mut());
485        handle.write_or_append(Some(offset), buf.as_ref()).await.expect("overwrite failed");
486
487        let mut reader = JournalReader::new(
488            FakeObjectHandle::new(object.clone()),
489            &JournalCheckpoint { version: new_version, ..JournalCheckpoint::default() },
490        );
491        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(4u32));
492        assert_eq!(reader.deserialize().await.expect("deserialize failed"), ReadResult::Some(7u32));
493        reader.skip_to_end_of_block();
494        assert_eq!(
495            reader.deserialize::<u32>().await.expect("deserialize failed"),
496            ReadResult::Reset(new_version),
497        );
498        assert_eq!(
499            reader.deserialize().await.expect("deserialize failed"),
500            ReadResult::Some(13u32)
501        );
502        assert_eq!(reader.journal_file_checkpoint(), checkpoint);
503        assert_eq!(
504            reader.deserialize().await.expect("deserialize failed"),
505            ReadResult::Some(78u32)
506        );
507        reader.skip_to_end_of_block();
508        assert_eq!(
509            reader.deserialize().await.expect("deserialize failed"),
510            ReadResult::Some(90u32)
511        );
512
513        // Make sure a reader can start from the middle of a reset block.
514        let mut reader = JournalReader::new(FakeObjectHandle::new(object.clone()), &checkpoint);
515        assert_eq!(
516            reader.deserialize().await.expect("deserialize failed"),
517            ReadResult::Some(78u32)
518        );
519        reader.skip_to_end_of_block();
520        assert_eq!(
521            reader.deserialize().await.expect("deserialize failed"),
522            ReadResult::Some(90u32)
523        );
524    }
525
526    #[fuchsia::test]
527    async fn test_reader_starting_near_end_of_block() {
528        let object = Arc::new(FakeObject::new());
529        // Make the journal file a minimum of two blocks since reading to EOF is an error.
530        let handle = FakeObjectHandle::new(object.clone());
531        let len = BLOCK_SIZE as usize * 3;
532        let mut buf = handle.allocate_buffer(len).await;
533        buf.as_mut_slice().fill(0u8);
534        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
535        let mut writer = JournalWriter::new(BLOCK_SIZE as usize, 0);
536        let len = 2 * (BLOCK_SIZE as usize - std::mem::size_of::<Checksum>());
537        assert_eq!(writer.write(&vec![78u8; len]).expect("write failed"), len);
538        let mut buf = handle.allocate_buffer(writer.flushable_bytes()).await;
539        let offset = writer.take_flushable(buf.as_mut());
540        handle.write_or_append(Some(offset), buf.as_ref()).await.expect("overwrite failed");
541
542        let checkpoint = JournalCheckpoint {
543            file_offset: BLOCK_SIZE - std::mem::size_of::<Checksum>() as u64 - 1,
544            checksum: 0,
545            version: LATEST_VERSION,
546        };
547        let mut reader = JournalReader::new(FakeObjectHandle::new(object.clone()), &checkpoint);
548        let mut offset = checkpoint.file_offset as usize;
549        while offset < len {
550            reader.fill_buf().await.expect("fill_buf failed");
551            let mut buf = reader.buffer();
552            let amount = std::cmp::min(buf.len(), len - offset);
553            buf = &buf[..amount];
554            assert_eq!(&buf[..amount], vec![78; amount]);
555            offset += amount;
556            reader.consume(amount);
557        }
558    }
559
560    #[fuchsia::test]
561    async fn test_write_to_block_boundary() {
562        let object = Arc::new(FakeObject::new());
563        // Make the journal file a minimum of two blocks since reading to EOF is an error.
564        let handle = FakeObjectHandle::new(object.clone());
565        let len = BLOCK_SIZE as usize * 3;
566        let mut buf = handle.allocate_buffer(len).await;
567        buf.as_mut_slice().fill(0u8);
568        handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
569        let mut writer = JournalWriter::new(BLOCK_SIZE as usize, 0);
570        let len = BLOCK_SIZE as usize - std::mem::size_of::<Checksum>();
571        assert_eq!(writer.write(&vec![78u8; len]).expect("write failed"), len);
572        let mut buf = handle.allocate_buffer(writer.flushable_bytes()).await;
573        let offset = writer.take_flushable(buf.as_mut());
574        handle.write_or_append(Some(offset), buf.as_ref()).await.expect("overwrite failed");
575        assert_eq!(offset, 0);
576
577        // The checkpoint should be at the start of the next block.
578        let writer_checkpoint = writer.journal_file_checkpoint();
579        assert_eq!(writer_checkpoint.file_offset, BLOCK_SIZE);
580
581        let mut reader = JournalReader::new(
582            FakeObjectHandle::new(object.clone()),
583            &JournalCheckpoint::default(),
584        );
585        reader.set_version(LATEST_VERSION);
586
587        reader.fill_buf().await.expect("fill_buf failed");
588
589        assert_eq!(&reader.buffer()[..len], &buf.as_slice()[..len]);
590
591        reader.consume(len);
592
593        assert_eq!(reader.journal_file_checkpoint(), writer_checkpoint);
594    }
595}