fxfs/object_store/journal/checksum_list.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::checksum::{fletcher64, Checksum};
use crate::errors::FxfsError;
use crate::range::RangeExt;
use anyhow::{anyhow, ensure, Error};
use std::collections::BTreeMap;
use std::ops::Range;
use storage_device::Device;

#[derive(Clone, Debug, Eq, PartialEq)]
enum ChecksumState {
    Unverified(Vec<Checksum>),
    Valid,
    Invalid,
}

impl ChecksumState {
    fn checksum(&mut self) -> Option<&mut Vec<Checksum>> {
        if let ChecksumState::Unverified(c) = self {
            Some(c)
        } else {
            None
        }
    }
}

#[derive(Clone, Debug)]
struct ChecksumEntry {
    // |start_journal_offset| is the journal_offset at which this range was written.
    start_journal_offset: u64,
    device_range: Range<u64>,
    // Holds checksums that cover |device_range| and that should be valid from
    // start_journal_offset..end_journal_offset.
    // |end_journal_offset| is the journal_offset at which the checksum range was deallocated.
    // |end_journal_offset| defaults to u64::MAX but may be lowered if a range is deallocated.
    checksums: Vec<(ChecksumState, /* end_journal_offset */ u64)>,
}

#[derive(Clone, Default)]
pub struct ChecksumList {
    // The offset that is known to have been flushed to the device.  Any entries in the journal that
    // are prior to this point are ignored since there is no need to verify those checksums.
    flushed_offset: u64,

    // This is a list of checksums that we might need to verify, in journal order.
    checksum_entries: Vec<ChecksumEntry>,

    // Records a mapping from the *ending* offset of the device range, to the entry index.
    device_offset_to_checksum_entry: BTreeMap<u64, usize>,

    // The maximum chunk size within checksum_entries which determines the size of the buffer we
    // need to allocate during verification.
    max_chunk_size: u64,
}

impl ChecksumList {
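    /// Creates a list for use during journal replay.  Data writes are queued with `push` and
    /// deallocations with `mark_deallocated`, in journal order, and `verify` is then called to
    /// find a safe replay point.  Anything prior to `flushed_offset` is ignored since it is known
    /// to have reached the device.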
    pub fn new(flushed_offset: u64) -> Self {
        ChecksumList { flushed_offset, ..Default::default() }
    }

    /// Adds an extent whose checksums might need to be verified.  Extents must be pushed in
    /// journal-offset order.
    pub fn push(
        &mut self,
        journal_offset: u64,
        mut device_range: Range<u64>,
        mut checksums: &[u64],
        first_write: bool,
    ) -> Result<(), Error> {
        if journal_offset < self.flushed_offset {
            // Ignore anything that was prior to being flushed.
            return Ok(());
        }
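        // Each checksum covers an equally-sized chunk of |device_range|; track the largest chunk
        // seen so that verify() can size its read buffer.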
        let chunk_size = device_range.length().unwrap() / checksums.len() as u64;
        if chunk_size > self.max_chunk_size {
            self.max_chunk_size = chunk_size;
        }

        // Copy on write extents will always write to new device ranges. However, overwrite extents
        // will continue to write to the same device range repeatedly. On every write, a new
        // checksum is emitted for each block in the range.
        //
        // When replaying the journal, it's valid for any of the possible checksums to match,
        // because any or all of the future writes to the range may have gone through. The only
        // time that matters is when the first write to that block has happened during this replay,
        // since that's the only thing we can actually roll back. On the first write, we create the
        // checksum entry, and if any further checksums come in, we add them to the existing list.
        //
        // Fxfs currently doesn't support cloning, but if and when it does, this code may need to
        // be revisited.
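        //
        // For example, if the same block is written at journal offsets 1 and 3 with checksums c1
        // and c3, the replayed image is consistent if the block matches either c1 or c3, since
        // the later write may or may not have reached the device.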
        let mut gap_entries = Vec::new();
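        // Walk the existing entries that might overlap |device_range|.  The map is keyed by each
        // entry's *end* offset, so starting the range at device_range.start + 1 skips entries
        // that end at or before the start of the new range.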
        let mut r = self.device_offset_to_checksum_entry.range(device_range.start + 1..);
        while let Some((_, index)) = r.next() {
            let entry = &mut self.checksum_entries[*index];
            if entry.device_range.start >= device_range.end {
                break;
            }

            // Put the gap aside for later.
            let gap = device_range.start..entry.device_range.start;
            let gap_checksums = if gap.is_empty() {
                &[]
            } else {
                let (head, tail) =
                    checksums.split_at(((gap.end - gap.start) / chunk_size) as usize);
                checksums = tail;
                head
            };

            // Check that the chunk size matches that of the existing entry.
            ensure!(
                (entry.device_range.end - entry.device_range.start) / entry.checksums.len() as u64
                    == chunk_size,
                anyhow!(FxfsError::Inconsistent)
                    .context("Wrong number of checksums for device range"),
            );

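            // Append the incoming checksums to the per-block candidate lists for the overlapping
            // portion; verify() accepts a block if any one of its candidates matches.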
            let overlap = std::cmp::max(device_range.start, entry.device_range.start)
                ..std::cmp::min(device_range.end, entry.device_range.end);
            let entry_checksums = &mut entry.checksums[((overlap.start - entry.device_range.start)
                / chunk_size) as usize
                ..((overlap.end - entry.device_range.start) / chunk_size) as usize];

            let (head, tail) = checksums.split_at(entry_checksums.len());
            checksums = tail;
            for ((checksum_state, _), new_checksum) in entry_checksums.iter_mut().zip(head.iter()) {
                checksum_state.checksum().unwrap().push(*new_checksum);
            }
            device_range.start = overlap.end;

            // Now that we no longer need entry, we can insert the gap into checksum_entries, but we
            // can't touch device_offset_to_checksum_entry until after the loop.
            if !gap.is_empty() && first_write {
                gap_entries.push((gap.end, self.checksum_entries.len()));
                self.checksum_entries.push(ChecksumEntry {
                    start_journal_offset: journal_offset,
                    device_range: gap,
                    checksums: gap_checksums
                        .into_iter()
                        .map(|c| (ChecksumState::Unverified(vec![*c]), u64::MAX))
                        .collect(),
                });
            }

            if device_range.is_empty() {
                break;
            }
        }

        // Add the gap entries we couldn't add earlier.
        self.device_offset_to_checksum_entry.extend(gap_entries);

        // Add any remainder.
        if !device_range.is_empty() && first_write {
            self.device_offset_to_checksum_entry
                .insert(device_range.end, self.checksum_entries.len());
            self.checksum_entries.push(ChecksumEntry {
                start_journal_offset: journal_offset,
                device_range,
                checksums: checksums
                    .iter()
                    .map(|c| (ChecksumState::Unverified(vec![*c]), u64::MAX))
                    .collect(),
            });
        }

        Ok(())
    }

    /// Marks an extent as deallocated.  If this journal-offset ends up being replayed, it means
    /// that we can skip a previously queued checksum.
    pub fn mark_deallocated(&mut self, journal_offset: u64, mut device_range: Range<u64>) {
        if journal_offset < self.flushed_offset {
            // Ignore anything that was prior to being flushed.
            return;
        }
        let mut r = self.device_offset_to_checksum_entry.range(device_range.start + 1..);
        while let Some((_, index)) = r.next() {
            let entry = &mut self.checksum_entries[*index];
            if entry.device_range.start >= device_range.end {
                break;
            }
            let chunk_size =
                (entry.device_range.length().unwrap() / entry.checksums.len() as u64) as usize;
            let checksum_index_start = if device_range.start < entry.device_range.start {
                0
            } else {
                (device_range.start - entry.device_range.start) as usize / chunk_size
            };
            // Figure out the overlap.
            if entry.device_range.end >= device_range.end {
                let checksum_index_end =
                    (device_range.end - entry.device_range.start) as usize / chunk_size;
                // This entry covers the remainder.
                entry.checksums[checksum_index_start..checksum_index_end]
                    .iter_mut()
                    .for_each(|c| c.1 = journal_offset);
                break;
            }
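            // The deallocated range extends beyond this entry, so mark the rest of this entry and
            // move on to the next one.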
            entry.checksums[checksum_index_start..].iter_mut().for_each(|c| c.1 = journal_offset);
            device_range.start = entry.device_range.end;
        }
    }

    /// Verifies the checksums in the list.  `journal_offset` should indicate the last journal
    /// offset read, and `verify` will return the journal offset that it is safe to replay up to.
    /// Entries prior to the flushed offset passed to `new` were never queued (see `push`), so they
    /// do not need to be verified.
    pub async fn verify(
        &mut self,
        device: &dyn Device,
        mut journal_offset: u64,
    ) -> Result<u64, Error> {
        let mut buf = device.allocate_buffer(self.max_chunk_size as usize).await;
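        // If a checksum fails, journal_offset is pulled back to just before the offending entry.
        // That can mean entries previously skipped (because their deallocations would have been
        // replayed) now need checking, so verification restarts from the beginning.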
        'try_again: loop {
            for e in &mut self.checksum_entries {
                if e.start_journal_offset >= journal_offset {
                    break;
                }
                let chunk_size =
                    (e.device_range.length().unwrap() / e.checksums.len() as u64) as usize;
                let mut offset = e.device_range.start;
                for (checksum_state, dependency) in e.checksums.iter_mut() {
                    // We only need to verify the checksum if we know the dependency (the
                    // deallocation offset) isn't going to be replayed, and we can skip
                    // verifications that were already done on a previous iteration of the loop.
                    if *dependency >= journal_offset {
                        if let Some(checksums) = checksum_state.checksum() {
                            device.read(offset, buf.subslice_mut(0..chunk_size)).await?;
                            let found_checksum = fletcher64(&buf.as_slice()[0..chunk_size], 0);
                            if checksums
                                .iter()
                                .find(|&&checksum| checksum == found_checksum)
                                .is_some()
                            {
                                *checksum_state = ChecksumState::Valid;
                            } else {
                                *checksum_state = ChecksumState::Invalid;
                            }
                        }
                        if *checksum_state == ChecksumState::Invalid {
                            // Verification failed, so we need to reset the journal_offset to
                            // before this entry and try again.
                            journal_offset = e.start_journal_offset;
                            continue 'try_again;
                        }
                    }
                    offset += chunk_size as u64;
                }
            }
            return Ok(journal_offset);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::ChecksumList;
    use crate::checksum::fletcher64;
    use storage_device::fake_device::FakeDevice;
    use storage_device::Device;

    #[fuchsia::test]
    async fn test_verify() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(2048).await;
        let mut list = ChecksumList::new(0);

        buffer.as_mut_slice()[0..512].copy_from_slice(&[1; 512]);
        buffer.as_mut_slice()[512..1024].copy_from_slice(&[2; 512]);
        buffer.as_mut_slice()[1024..1536].copy_from_slice(&[3; 512]);
        buffer.as_mut_slice()[1536..2048].copy_from_slice(&[4; 512]);
        device.write(512, buffer.as_ref()).await.expect("write failed");
        list.push(
            1,
            512..2048,
            &[fletcher64(&[1; 512], 0), fletcher64(&[2; 512], 0), fletcher64(&[3; 512], 0)],
            true,
        )
        .unwrap();

        // All entries should pass.
        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 10);

        // Corrupt the middle of the three 512 byte blocks.
        buffer.as_mut_slice()[512] = 0;
        device.write(512, buffer.as_ref()).await.expect("write failed");

        // Verification should fail now.
        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 1);

        // Mark the middle block as deallocated and then it should pass again.
        list.mark_deallocated(2, 1024..1536);
        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 10);

        // Add another entry followed by a deallocation.
        list.push(3, 2048..2560, &[fletcher64(&[4; 512], 0)], true).unwrap();
        list.mark_deallocated(4, 1536..2048);

        // All entries should validate.
        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 10);

        // Now corrupt the block at 2048.
        buffer.as_mut_slice()[1536] = 0;
        device.write(512, buffer.as_ref()).await.expect("write failed");

        // This should only validate up to journal offset 3.
        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 3);

        // Corrupt the block that was marked as deallocated in #4.
        buffer.as_mut_slice()[1024] = 0;
        device.write(512, buffer.as_ref()).await.expect("write failed");

        // The deallocation in #4 should be ignored and so validation should only succeed up
        // to offset 1.
        assert_eq!(list.verify(&device, 10).await.expect("verify failed"), 1);
    }

    #[fuchsia::test]
    async fn test_verify_entry_prior_to_flushed_offset_is_ignored() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(2048).await;
        let mut list = ChecksumList::new(2);

        buffer.as_mut_slice()[0..512].copy_from_slice(&[1; 512]);
        buffer.as_mut_slice()[512..1024].copy_from_slice(&[2; 512]);
        device.write(512, buffer.as_ref()).await.expect("write failed");

        // This entry has the wrong checksum and would fail, but it should be ignored anyway
        // because it is prior to the flushed offset.
        list.push(1, 512..1024, &[fletcher64(&[2; 512], 0)], true).unwrap();

        list.push(2, 1024..1536, &[fletcher64(&[2; 512], 0)], true).unwrap();

        assert_eq!(list.verify(&device, 10).await.expect("verify failed"), 10);
    }

    #[fuchsia::test]
    async fn test_deallocate_overlap() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(512).await;
        let mut list = ChecksumList::new(1);

        buffer.as_mut_slice().copy_from_slice(&[2; 512]);
        device.write(2560, buffer.as_ref()).await.expect("write failed");

        list.push(2, 512..1024, &[fletcher64(&[1; 512], 0)], true).unwrap();
        list.mark_deallocated(3, 0..1024);
        list.push(4, 2048..3072, &[fletcher64(&[2; 512], 0); 2], true).unwrap();
        list.mark_deallocated(5, 1536..2560);

        assert_eq!(list.verify(&device, 10).await.expect("verify failed"), 10);
    }

    #[fuchsia::test]
    async fn test_different_chunk_size() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(1024).await;
        let mut list = ChecksumList::new(1);

        buffer.as_mut_slice().copy_from_slice(&[2; 1024]);
        device.write(1024, buffer.as_ref()).await.expect("write failed");

        let c0 = fletcher64(&[0; 512], 0);
        let c2 = fletcher64(&[2; 512], 0);

        list.push(1, 1024..2048, &[c2, c2], true).unwrap();

        list.push(2, 1024..2048, &[c0], false)
            .expect_err("Expected failure due to different chunk size");
        assert_eq!(list.verify(&device, 2).await.expect("verify failed"), 2);
    }

    #[fuchsia::test]
    async fn test_drop_unneeded_checksums_after_first_write() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(1024).await;
        let mut list = ChecksumList::new(1);

        buffer.as_mut_slice().copy_from_slice(&[2; 1024]);
        device.write(1024, buffer.as_ref()).await.expect("write failed");

        let c1 = fletcher64(&[1; 512], 0);
        let c2 = fletcher64(&[2; 512], 0);

        list.push(1, 1024..2048, &[c2, c2], true).unwrap();
        list.push(2, 0..1024, &[c2, c1], false).unwrap();
        assert_eq!(list.verify(&device, 2).await.expect("verify failed"), 2);
    }

    #[fuchsia::test]
    async fn test_overlapping_checksums() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(1024).await;
        let mut list = ChecksumList::new(1);

        buffer.as_mut_slice().copy_from_slice(&[2; 1024]);
        device.write(1024, buffer.as_ref()).await.expect("write failed");

        let c1 = fletcher64(&[1; 512], 0);
        let c2 = fletcher64(&[2; 512], 0);

        list.push(1, 1024..2048, &[c2, c2], true).unwrap();

        list.push(2, 512..2560, &[c1, c1, c1, c1], false).unwrap();
        assert_eq!(list.verify(&device, 2).await.expect("verify failed"), 2);

        // Changing the data to ones should be fine now because the other checksums will match.
        buffer.as_mut_slice().copy_from_slice(&[1; 1024]);
        device.write(1024, buffer.as_ref()).await.expect("write failed");
        assert_eq!(list.verify(&device, 4).await.expect("verify failed"), 4);
    }
}