// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
45use crate::checksum::{fletcher64, Checksum};
6use crate::log::*;
7use crate::metrics;
8use crate::object_store::journal::JournalCheckpoint;
9use crate::serialized_types::{Versioned, LATEST_VERSION};
10use anyhow::{anyhow, Error};
11use byteorder::{LittleEndian, WriteBytesExt};
12use fuchsia_inspect::{Property as _, UintProperty};
13use std::cmp::min;
14use std::io::Write;
15use storage_device::buffer::MutableBufferRef;
1617/// JournalWriter is responsible for writing log records to a journal file. Each block contains a
18/// fletcher64 checksum at the end of the block. This is used by both the main journal file and the
19/// super-block.
20pub struct JournalWriter {
21// The block size used for this journal file.
22block_size: usize,
2324// The checkpoint of the last write.
25checkpoint: JournalCheckpoint,
2627// The last checksum we wrote to the buffer.
28last_checksum: Checksum,
2930// The buffered data for the current block.
31buf: Vec<u8>,
3233// Current journal offset based on the checkpoint of the last write
34journal_checkpoint_offset: UintProperty,
35}
3637impl JournalWriter {
38pub fn new(block_size: usize, last_checksum: u64) -> Self {
39// We must set the correct version here because the journal is written to when
40 // formatting as part of creating the allocator and must be ready to go.
41let checkpoint =
42 JournalCheckpoint { version: LATEST_VERSION, ..JournalCheckpoint::default() };
43 JournalWriter {
44 block_size,
45 checkpoint,
46 last_checksum,
47 buf: Vec::new(),
48 journal_checkpoint_offset: metrics::detail()
49 .create_uint("journal_checkpoint_offset", 0),
50 }
51 }
5253/// Serializes a new journal record to the journal stream.
54pub fn write_record<T: Versioned + std::fmt::Debug>(
55&mut self,
56 record: &T,
57 ) -> Result<(), Error> {
58let buf_len = self.buf.len();
59 record.serialize_into(&mut *self).unwrap(); // Our write implementation cannot fail at the
60 // moment.
6162 // For now, our reader cannot handle records that are bigger than a block.
63if self.buf.len() - buf_len <= self.block_size {
64Ok(())
65 } else {
66Err(anyhow!(
67"Serialized record too big ({} bytes): {:?}",
68self.buf.len() - buf_len,
69 record
70 ))
71 }
72 }
7374/// Pads from the current offset in the buffer to the end of the block.
75pub fn pad_to_block(&mut self) -> std::io::Result<()> {
76let align = self.buf.len() % self.block_size;
77if align > 0 {
78self.write_all(&vec![0; self.block_size - std::mem::size_of::<Checksum>() - align])?;
79 }
80Ok(())
81 }
8283/// Returns the checkpoint that corresponds to the current location in the journal stream
84 /// assuming that it has been flushed.
85pub(super) fn journal_file_checkpoint(&self) -> JournalCheckpoint {
86 JournalCheckpoint {
87 file_offset: self.checkpoint.file_offset + self.buf.len() as u64,
88 checksum: self.last_checksum,
89 version: self.checkpoint.version,
90 }
91 }
9293/// Returns the number of bytes that are ready to be flushed.
94pub fn flushable_bytes(&self) -> usize {
95self.buf.len() - self.buf.len() % self.block_size
96 }
9798/// Fills `buf` with as many outstanding complete blocks from the journal object as possible, so
99 /// they can be flushed. Part blocks can be flushed by calling pad_to_block first. Returns the
100 /// checkpoint of the last flushed bit of data. The caller must ensure there's data available
101 /// to flush by first checking `Self::flushable_bytes`.
102 /// If `buf` is smaller than the available blocks, more data will still be available.
103pub fn take_flushable<'a>(&mut self, mut buf: MutableBufferRef<'a>) -> u64 {
104// The buffer should always be completely filled.
105assert!(self.flushable_bytes() >= buf.len());
106let len = buf.len();
107debug_assert!(len % self.block_size == 0);
108 buf.as_mut_slice().copy_from_slice(&self.buf[..len]);
109let offset = self.checkpoint.file_offset;
110self.journal_checkpoint_offset.set(offset);
111self.buf.drain(..len);
112self.checkpoint.file_offset += len as u64;
113self.checkpoint.checksum = self.last_checksum;
114 offset
115 }
116117/// Seeks to the given offset in the journal file -- to be used once replay has finished.
118 ///
119 /// Note that the Journal expects a 4-byte record version to be the first thing after a
120 /// "RESET" event, which is generally the only time we would use seek.
121 ///
122 /// The offset of |checkpoint| must be block-aligned. This is because the writer should never
123 /// be configured to start overwriting a partially written block -- doing so could leave
124 /// previously journaled data in an inconsistent state. The writer must always start at a fresh
125 /// block, and a reset marker should be used to terminate the previous block.
126pub fn seek(&mut self, checkpoint: JournalCheckpoint) {
127assert!(self.buf.is_empty());
128assert!(checkpoint.file_offset % self.block_size as u64 == 0);
129self.checkpoint = checkpoint;
130self.last_checksum = self.checkpoint.checksum;
131self.journal_checkpoint_offset.set(self.checkpoint.file_offset);
132 }
133}
134135impl std::io::Write for JournalWriter {
136fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
137let mut offset = 0;
138while offset < buf.len() {
139let space = self.block_size
140 - std::mem::size_of::<Checksum>()
141 - self.buf.len() % self.block_size;
142let to_copy = min(space, buf.len() - offset);
143self.buf.write_all(&buf[offset..offset + to_copy])?;
144if to_copy == space {
145let end = self.buf.len();
146let start = end + std::mem::size_of::<Checksum>() - self.block_size;
147self.last_checksum = fletcher64(&self.buf[start..end], self.last_checksum);
148self.buf.write_u64::<LittleEndian>(self.last_checksum)?;
149 }
150 offset += to_copy;
151 }
152Ok(buf.len())
153 }
154155// This does nothing because it's sync. Users must call the async flush_buffer function to
156 // flush outstanding data.
157fn flush(&mut self) -> std::io::Result<()> {
158Ok(())
159 }
160}
161162impl Drop for JournalWriter {
163fn drop(&mut self) {
164// If this message is logged it means we forgot to call flush_buffer(), which in turn might
165 // mean Journal::sync() was not called.
166if self.buf.len() > 0 {
167warn!("journal data dropped!");
168 }
169 }
170}
#[cfg(test)]
mod tests {
    use super::JournalWriter;
    use crate::checksum::{fletcher64, Checksum};
    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
    use crate::object_store::journal::JournalCheckpoint;
    use crate::serialized_types::*;
    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
    use byteorder::{ByteOrder, LittleEndian};
    use std::sync::Arc;

    const TEST_BLOCK_SIZE: usize = 512;

    /// Flushes everything flushable from `writer` into `object`, then reads the whole object
    /// back and returns its contents. Asserts that exactly one block was stored.
    async fn flush_and_read_back(writer: &mut JournalWriter, object: &Arc<FakeObject>) -> Vec<u8> {
        let write_handle = FakeObjectHandle::new(object.clone());
        let mut outgoing = write_handle.allocate_buffer(writer.flushable_bytes()).await;
        let offset = writer.take_flushable(outgoing.as_mut());
        write_handle
            .write_or_append(Some(offset), outgoing.as_ref())
            .await
            .expect("overwrite failed");

        let read_handle = FakeObjectHandle::new(object.clone());
        let mut incoming = read_handle.allocate_buffer(object.get_size() as usize).await;
        assert_eq!(incoming.len(), TEST_BLOCK_SIZE);
        read_handle.read(0, incoming.as_mut()).await.expect("read failed");
        incoming.as_slice().to_vec()
    }

    /// A single padded record should round-trip, and the block should end with a valid
    /// checksum that is also reflected in the writer's checkpoint.
    #[fuchsia::test]
    async fn test_write_single_record_and_pad() {
        let object = Arc::new(FakeObject::new());
        let mut writer = JournalWriter::new(TEST_BLOCK_SIZE, 0);
        writer.write_record(&4u32).unwrap();
        writer.pad_to_block().expect("pad_to_block failed");

        let block = flush_and_read_back(&mut writer, &object).await;

        let mut cursor = std::io::Cursor::new(block.as_slice());
        let value: u32 =
            u32::deserialize_from(&mut cursor, LATEST_VERSION).expect("deserialize_from failed");
        assert_eq!(value, 4u32);

        let checksum_start = block.len() - std::mem::size_of::<Checksum>();
        let payload = &block[..checksum_start];
        let checksum = LittleEndian::read_u64(&block[checksum_start..]);
        assert_eq!(checksum, fletcher64(payload, 0));

        let expected = JournalCheckpoint {
            file_offset: TEST_BLOCK_SIZE as u64,
            checksum,
            version: LATEST_VERSION,
        };
        assert_eq!(writer.journal_file_checkpoint(), expected);
    }

    /// A checkpoint taken between two records should point at the start of the second record.
    #[fuchsia::test]
    async fn test_journal_file_checkpoint() {
        let object = Arc::new(FakeObject::new());
        let mut writer = JournalWriter::new(TEST_BLOCK_SIZE, 0);
        writer.write_record(&4u32).unwrap();
        let checkpoint = writer.journal_file_checkpoint();
        assert_eq!(checkpoint.checksum, 0);
        writer.write_record(&17u64).unwrap();
        writer.pad_to_block().expect("pad_to_block failed");

        let block = flush_and_read_back(&mut writer, &object).await;

        let second_record = &block[checkpoint.file_offset as usize..];
        let mut cursor = std::io::Cursor::new(second_record);
        let value: u64 =
            u64::deserialize_from(&mut cursor, LATEST_VERSION).expect("deserialize_from failed");
        assert_eq!(value, 17);
    }
}