fxfs/object_store/journal/writer.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

5use crate::checksum::{fletcher64, Checksum};
6use crate::log::*;
7use crate::metrics;
8use crate::object_store::journal::JournalCheckpoint;
9use crate::serialized_types::{Versioned, LATEST_VERSION};
10use anyhow::{anyhow, Error};
11use byteorder::{LittleEndian, WriteBytesExt};
12use fuchsia_inspect::{Property as _, UintProperty};
13use std::cmp::min;
14use std::io::Write;
15use storage_device::buffer::MutableBufferRef;
16
17/// JournalWriter is responsible for writing log records to a journal file.  Each block contains a
18/// fletcher64 checksum at the end of the block.  This is used by both the main journal file and the
19/// super-block.
20pub struct JournalWriter {
21    // The block size used for this journal file.
22    block_size: usize,
23
24    // The checkpoint of the last write.
25    checkpoint: JournalCheckpoint,
26
27    // The last checksum we wrote to the buffer.
28    last_checksum: Checksum,
29
30    // The buffered data for the current block.
31    buf: Vec<u8>,
32
33    // Current journal offset based on the checkpoint of the last write
34    journal_checkpoint_offset: UintProperty,
35}
36
37impl JournalWriter {
38    pub fn new(block_size: usize, last_checksum: u64) -> Self {
39        // We must set the correct version here because the journal is written to when
40        // formatting as part of creating the allocator and must be ready to go.
41        let checkpoint =
42            JournalCheckpoint { version: LATEST_VERSION, ..JournalCheckpoint::default() };
43        JournalWriter {
44            block_size,
45            checkpoint,
46            last_checksum,
47            buf: Vec::new(),
48            journal_checkpoint_offset: metrics::detail()
49                .create_uint("journal_checkpoint_offset", 0),
50        }
51    }
52
53    /// Serializes a new journal record to the journal stream.
54    pub fn write_record<T: Versioned + std::fmt::Debug>(
55        &mut self,
56        record: &T,
57    ) -> Result<(), Error> {
58        let buf_len = self.buf.len();
59        record.serialize_into(&mut *self).unwrap(); // Our write implementation cannot fail at the
60                                                    // moment.
61
62        // For now, our reader cannot handle records that are bigger than a block.
63        if self.buf.len() - buf_len <= self.block_size {
64            Ok(())
65        } else {
66            Err(anyhow!(
67                "Serialized record too big ({} bytes): {:?}",
68                self.buf.len() - buf_len,
69                record
70            ))
71        }
72    }
73
74    /// Pads from the current offset in the buffer to the end of the block.
75    pub fn pad_to_block(&mut self) -> std::io::Result<()> {
76        let align = self.buf.len() % self.block_size;
77        if align > 0 {
78            self.write_all(&vec![0; self.block_size - std::mem::size_of::<Checksum>() - align])?;
79        }
80        Ok(())
81    }
82
83    /// Returns the checkpoint that corresponds to the current location in the journal stream
84    /// assuming that it has been flushed.
85    pub(super) fn journal_file_checkpoint(&self) -> JournalCheckpoint {
86        JournalCheckpoint {
87            file_offset: self.checkpoint.file_offset + self.buf.len() as u64,
88            checksum: self.last_checksum,
89            version: self.checkpoint.version,
90        }
91    }
92
93    /// Returns the number of bytes that are ready to be flushed.
94    pub fn flushable_bytes(&self) -> usize {
95        self.buf.len() - self.buf.len() % self.block_size
96    }
97
98    /// Fills `buf` with as many outstanding complete blocks from the journal object as possible, so
99    /// they can be flushed.  Part blocks can be flushed by calling pad_to_block first.  Returns the
100    /// checkpoint of the last flushed bit of data.  The caller must ensure there's data available
101    /// to flush by first checking `Self::flushable_bytes`.
102    /// If `buf` is smaller than the available blocks, more data will still be available.
103    pub fn take_flushable<'a>(&mut self, mut buf: MutableBufferRef<'a>) -> u64 {
104        // The buffer should always be completely filled.
105        assert!(self.flushable_bytes() >= buf.len());
106        let len = buf.len();
107        debug_assert!(len % self.block_size == 0);
108        buf.as_mut_slice().copy_from_slice(&self.buf[..len]);
109        let offset = self.checkpoint.file_offset;
110        self.journal_checkpoint_offset.set(offset);
111        self.buf.drain(..len);
112        self.checkpoint.file_offset += len as u64;
113        self.checkpoint.checksum = self.last_checksum;
114        offset
115    }
116
117    /// Seeks to the given offset in the journal file -- to be used once replay has finished.
118    ///
119    /// Note that the Journal expects a 4-byte record version to be the first thing after a
120    /// "RESET" event, which is generally the only time we would use seek.
121    ///
122    /// The offset of |checkpoint| must be block-aligned.  This is because the writer should never
123    /// be configured to start overwriting a partially written block -- doing so could leave
124    /// previously journaled data in an inconsistent state.  The writer must always start at a fresh
125    /// block, and a reset marker should be used to terminate the previous block.
126    pub fn seek(&mut self, checkpoint: JournalCheckpoint) {
127        assert!(self.buf.is_empty());
128        assert!(checkpoint.file_offset % self.block_size as u64 == 0);
129        self.checkpoint = checkpoint;
130        self.last_checksum = self.checkpoint.checksum;
131        self.journal_checkpoint_offset.set(self.checkpoint.file_offset);
132    }
133}
134
135impl std::io::Write for JournalWriter {
136    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
137        let mut offset = 0;
138        while offset < buf.len() {
139            let space = self.block_size
140                - std::mem::size_of::<Checksum>()
141                - self.buf.len() % self.block_size;
142            let to_copy = min(space, buf.len() - offset);
143            self.buf.write_all(&buf[offset..offset + to_copy])?;
144            if to_copy == space {
145                let end = self.buf.len();
146                let start = end + std::mem::size_of::<Checksum>() - self.block_size;
147                self.last_checksum = fletcher64(&self.buf[start..end], self.last_checksum);
148                self.buf.write_u64::<LittleEndian>(self.last_checksum)?;
149            }
150            offset += to_copy;
151        }
152        Ok(buf.len())
153    }
154
155    // This does nothing because it's sync.  Users must call the async flush_buffer function to
156    // flush outstanding data.
157    fn flush(&mut self) -> std::io::Result<()> {
158        Ok(())
159    }
160}
161
162impl Drop for JournalWriter {
163    fn drop(&mut self) {
164        // If this message is logged it means we forgot to call flush_buffer(), which in turn might
165        // mean Journal::sync() was not called.
166        if self.buf.len() > 0 {
167            warn!("journal data dropped!");
168        }
169    }
170}
171
#[cfg(test)]
mod tests {
    use super::JournalWriter;
    use crate::checksum::{fletcher64, Checksum};
    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
    use crate::object_store::journal::JournalCheckpoint;
    use crate::serialized_types::*;
    use crate::testing::fake_object::{FakeObject, FakeObjectHandle};
    use byteorder::{ByteOrder, LittleEndian};
    use std::sync::Arc;

    const TEST_BLOCK_SIZE: usize = 512;

    /// Takes every flushable block out of `writer` and writes it to `object` at the offset the
    /// writer reports.
    async fn flush_to_object(writer: &mut JournalWriter, object: &Arc<FakeObject>) {
        let handle = FakeObjectHandle::new(Arc::clone(object));
        let mut buf = handle.allocate_buffer(writer.flushable_bytes()).await;
        let offset = writer.take_flushable(buf.as_mut());
        handle.write_or_append(Some(offset), buf.as_ref()).await.expect("overwrite failed");
    }

    #[fuchsia::test]
    async fn test_write_single_record_and_pad() {
        let object = Arc::new(FakeObject::new());
        let mut writer = JournalWriter::new(TEST_BLOCK_SIZE, 0);
        writer.write_record(&4u32).unwrap();
        writer.pad_to_block().expect("pad_to_block failed");
        flush_to_object(&mut writer, &object).await;

        let handle = FakeObjectHandle::new(Arc::clone(&object));
        let mut buf = handle.allocate_buffer(object.get_size() as usize).await;
        assert_eq!(buf.len(), TEST_BLOCK_SIZE);
        handle.read(0, buf.as_mut()).await.expect("read failed");
        let contents = buf.as_slice();

        let mut cursor = std::io::Cursor::new(contents);
        let value: u32 =
            u32::deserialize_from(&mut cursor, LATEST_VERSION).expect("deserialize_from failed");
        assert_eq!(value, 4u32);

        // The block ends with the little-endian fletcher64 of everything before it.
        let trailer_start = buf.len() - std::mem::size_of::<Checksum>();
        let (payload, trailer) = contents.split_at(trailer_start);
        let checksum = LittleEndian::read_u64(trailer);
        assert_eq!(checksum, fletcher64(payload, 0));

        let expected = JournalCheckpoint {
            file_offset: TEST_BLOCK_SIZE as u64,
            checksum,
            version: LATEST_VERSION,
        };
        assert_eq!(writer.journal_file_checkpoint(), expected);
    }

    #[fuchsia::test]
    async fn test_journal_file_checkpoint() {
        let object = Arc::new(FakeObject::new());
        let mut writer = JournalWriter::new(TEST_BLOCK_SIZE, 0);
        writer.write_record(&4u32).unwrap();
        // No block has been completed yet, so the checkpoint still carries the seed checksum.
        let checkpoint = writer.journal_file_checkpoint();
        assert_eq!(checkpoint.checksum, 0);
        writer.write_record(&17u64).unwrap();
        writer.pad_to_block().expect("pad_to_block failed");
        flush_to_object(&mut writer, &object).await;

        let handle = FakeObjectHandle::new(Arc::clone(&object));
        let mut buf = handle.allocate_buffer(object.get_size() as usize).await;
        assert_eq!(buf.len(), TEST_BLOCK_SIZE);
        handle.read(0, buf.as_mut()).await.expect("read failed");

        // Reading from the checkpoint taken between the two records yields the second record.
        let start = checkpoint.file_offset as usize;
        let mut cursor = std::io::Cursor::new(&buf.as_slice()[start..]);
        let value: u64 =
            u64::deserialize_from(&mut cursor, LATEST_VERSION).expect("deserialize_from failed");
        assert_eq!(value, 17);
    }
}
241}