fxfs/object_store/journal/checksum_list.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::checksum::{Checksum, fletcher64};
use crate::errors::FxfsError;
use crate::range::RangeExt;
use anyhow::{Error, anyhow, ensure};
use std::collections::BTreeMap;
use std::ops::Range;
use storage_device::Device;

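// Per-chunk verification state.  A chunk that is still `Unverified` carries every candidate
// checksum recorded for it; during verification any one of them matching is enough, since any
// subset of the overlapping writes may have reached the device before the crash.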
#[derive(Clone, Debug, Eq, PartialEq)]
enum ChecksumState {
    Unverified(Vec<Checksum>),
    Valid,
    Invalid,
}

impl ChecksumState {
    // Returns the candidate checksums if this chunk has not been verified yet.
    fn checksum(&mut self) -> Option<&mut Vec<Checksum>> {
        if let ChecksumState::Unverified(c) = self { Some(c) } else { None }
    }
}

#[derive(Clone, Debug)]
struct ChecksumEntry {
    // |start_journal_offset| is the journal offset at which this range was written.
    start_journal_offset: u64,
    device_range: Range<u64>,
    // Holds checksums that cover |device_range| and that should be valid from
    // start_journal_offset..end_journal_offset.
    // |end_journal_offset| is the journal offset at which the checksummed range was deallocated.
    // It defaults to u64::MAX but is lowered if the range is deallocated.
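    // For example (illustrative values): a chunk whose checksum was recorded at journal offset 1
    // and whose range was later deallocated at journal offset 4 is stored as
    // (Unverified(vec![checksum]), 4); it only has to verify if replay does not proceed past the
    // deallocation at offset 4.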
    checksums: Vec<(ChecksumState, /* end_journal_offset */ u64)>,
}

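/// During journal replay, a `ChecksumList` is built up from journal records and then verified
/// against the device to work out how far it is safe to replay.  A minimal sketch of that flow
/// (the device, offsets and checksums below are illustrative, not real journal contents):
///
/// ```ignore
/// let mut list = ChecksumList::new(flushed_offset);
/// // A data write recorded at journal offset 1, covering device bytes 4096..6144 in two
/// // 1024-byte chunks.
/// list.push(1, 4096..6144, &[checksum_a, checksum_b], /* first_write: */ true)?;
/// // The same range is deallocated at journal offset 5, so its checksums only need to hold if
/// // the deallocation itself is not replayed.
/// list.mark_deallocated(5, 4096..6144);
/// // Returns the journal offset that it is safe to replay up to.
/// let safe_offset = list.verify(device, last_journal_offset_read).await?;
/// ```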
#[derive(Clone, Default)]
pub struct ChecksumList {
    // The offset that is known to have been flushed to the device.  Any entries in the journal
    // that are prior to this point are ignored since there is no need to verify those checksums.
    flushed_offset: u64,

    // This is a list of checksums that we might need to verify, in journal order.
    checksum_entries: Vec<ChecksumEntry>,

    // Records a mapping from the *ending* offset of each device range to the entry index.
    device_offset_to_checksum_entry: BTreeMap<u64, usize>,

    // The maximum chunk size within checksum_entries, which determines the size of the buffer we
    // need to allocate during verification.
    max_chunk_size: u64,
}

impl ChecksumList {
    pub fn new(flushed_offset: u64) -> Self {
        ChecksumList { flushed_offset, ..Default::default() }
    }

    /// Adds an extent that might need its checksums verified.  Extents must be pushed in
    /// journal-offset order.
    pub fn push(
        &mut self,
        journal_offset: u64,
        mut device_range: Range<u64>,
        mut checksums: &[u64],
        first_write: bool,
    ) -> Result<(), Error> {
        if journal_offset < self.flushed_offset {
            // Ignore anything that was prior to being flushed.
            return Ok(());
        }
        let chunk_size = device_range.length().unwrap() / checksums.len() as u64;
        if chunk_size > self.max_chunk_size {
            self.max_chunk_size = chunk_size;
        }

        // Copy-on-write extents will always write to new device ranges. However, overwrite extents
        // will continue to write to the same device range repeatedly. On every write, a new
        // checksum is emitted for each block in the range.
        //
        // When replaying the journal, it's valid for any of the possible checksums to match,
        // because any or all of the later writes to the range may have gone through. The only
        // write that matters is the first write to a block during this replay, since that's the
        // only one we can actually roll back. On the first write, we create the checksum entry,
        // and if any further checksums come in, we add them to the existing list.
        //
        // Fxfs currently doesn't support cloning, but if and when it does, this code may need to
        // be revisited.
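        // For example (illustrative values): if journal offset 1 records a first write to
        // 1024..2048 with checksums [a, a] and journal offset 2 records an overwrite of the same
        // range with checksums [b, b], each chunk ends up with the candidates [a, b], and either
        // one matching is accepted during verification because the second write may or may not
        // have reached the device.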
        let mut gap_entries = Vec::new();
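        // The map is keyed by the *ending* device offset of each entry, so scanning from
        // device_range.start + 1 visits (in ascending end-offset order) every existing entry that
        // could overlap the new range.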
        let mut r = self.device_offset_to_checksum_entry.range(device_range.start + 1..);
        while let Some((_, index)) = r.next() {
            let entry = &mut self.checksum_entries[*index];
            if entry.device_range.start >= device_range.end {
                break;
            }

            // Put the gap aside for later.
            let gap = device_range.start..entry.device_range.start;
            let gap_checksums = if gap.is_empty() {
                &[]
            } else {
                let (head, tail) =
                    checksums.split_at(((gap.end - gap.start) / chunk_size) as usize);
                checksums = tail;
                head
            };

            // Check that the chunk size matches the existing entry's.
            ensure!(
                (entry.device_range.end - entry.device_range.start) / entry.checksums.len() as u64
                    == chunk_size,
                anyhow!(FxfsError::Inconsistent)
                    .context("Wrong number of checksums for device range"),
            );

            let overlap = std::cmp::max(device_range.start, entry.device_range.start)
                ..std::cmp::min(device_range.end, entry.device_range.end);
            let entry_checksums = &mut entry.checksums[((overlap.start - entry.device_range.start)
                / chunk_size) as usize
                ..((overlap.end - entry.device_range.start) / chunk_size) as usize];

            let (head, tail) = checksums.split_at(entry_checksums.len());
            checksums = tail;
            for ((checksum_state, _), new_checksum) in entry_checksums.iter_mut().zip(head.iter()) {
                checksum_state.checksum().unwrap().push(*new_checksum);
            }
            device_range.start = overlap.end;

            // Now that we no longer need `entry`, we can insert the gap into checksum_entries, but
            // we can't touch device_offset_to_checksum_entry until after the loop.
            if !gap.is_empty() && first_write {
                gap_entries.push((gap.end, self.checksum_entries.len()));
                self.checksum_entries.push(ChecksumEntry {
                    start_journal_offset: journal_offset,
                    device_range: gap,
                    checksums: gap_checksums
                        .into_iter()
                        .map(|c| (ChecksumState::Unverified(vec![*c]), u64::MAX))
                        .collect(),
                });
            }

            if device_range.is_empty() {
                break;
            }
        }

        // Add the gap entries we couldn't add earlier.
        self.device_offset_to_checksum_entry.extend(gap_entries);

        // Add any remainder.
        if !device_range.is_empty() && first_write {
            self.device_offset_to_checksum_entry
                .insert(device_range.end, self.checksum_entries.len());
            self.checksum_entries.push(ChecksumEntry {
                start_journal_offset: journal_offset,
                device_range,
                checksums: checksums
                    .iter()
                    .map(|c| (ChecksumState::Unverified(vec![*c]), u64::MAX))
                    .collect(),
            });
        }

        Ok(())
    }

    /// Marks an extent as deallocated.  If this journal offset ends up being replayed, it means
    /// that we can skip any previously queued checksums for the range.
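    ///
    /// For example (illustrative offsets): if a range's checksums were recorded at journal offset
    /// 2 and the range was deallocated at journal offset 3, a replay that includes offset 3 frees
    /// the range anyway, so those checksums no longer need to match.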
    pub fn mark_deallocated(&mut self, journal_offset: u64, mut device_range: Range<u64>) {
        if journal_offset < self.flushed_offset {
            // Ignore anything that was prior to being flushed.
            return;
        }
        let mut r = self.device_offset_to_checksum_entry.range(device_range.start + 1..);
        while let Some((_, index)) = r.next() {
            let entry = &mut self.checksum_entries[*index];
            if entry.device_range.start >= device_range.end {
                break;
            }
            let chunk_size =
                (entry.device_range.length().unwrap() / entry.checksums.len() as u64) as usize;
            let checksum_index_start = if device_range.start < entry.device_range.start {
                0
            } else {
                (device_range.start - entry.device_range.start) as usize / chunk_size
            };
            // Figure out the overlap.
            if entry.device_range.end >= device_range.end {
                let checksum_index_end =
                    (device_range.end - entry.device_range.start) as usize / chunk_size;
                // This entry covers the remainder.
                entry.checksums[checksum_index_start..checksum_index_end]
                    .iter_mut()
                    .for_each(|c| c.1 = journal_offset);
                break;
            }
            entry.checksums[checksum_index_start..].iter_mut().for_each(|c| c.1 = journal_offset);
            device_range.start = entry.device_range.end;
        }
    }

    /// Verifies the checksums in the list.  `journal_offset` should indicate the last journal
    /// offset read; verify returns the journal offset that it is safe to replay up to.  Entries
    /// recorded prior to the `flushed_offset` passed to `new` are known to have been flushed, so
    /// they are never verified.
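    ///
    /// If a chunk fails verification, the replay target is wound back to the start of the entry
    /// containing that chunk and the remaining entries are re-checked against the shorter replay.
    /// For example (illustrative offsets): if entries recorded at journal offsets 1 and 3 are
    /// queued and the entry from offset 3 fails, `verify` returns 3 (or earlier if more failures
    /// follow).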
    pub async fn verify(
        &mut self,
        device: &dyn Device,
        mut journal_offset: u64,
    ) -> Result<u64, Error> {
        let mut buf = device.allocate_buffer(self.max_chunk_size as usize).await;
        'try_again: loop {
            for e in &mut self.checksum_entries {
                if e.start_journal_offset >= journal_offset {
                    break;
                }
                let chunk_size =
                    (e.device_range.length().unwrap() / e.checksums.len() as u64) as usize;
                let mut offset = e.device_range.start;
                for (checksum_state, dependency) in e.checksums.iter_mut() {
                    // We only need to verify the checksum if we know the dependency isn't going to
                    // be replayed, and we can skip verifications that we know were done on a
                    // previous iteration of the loop.
                    if *dependency >= journal_offset {
                        if let Some(checksums) = checksum_state.checksum() {
                            device.read(offset, buf.subslice_mut(0..chunk_size)).await?;
                            let found_checksum = fletcher64(&buf.as_slice()[0..chunk_size], 0);
                            if checksums.contains(&found_checksum) {
                                *checksum_state = ChecksumState::Valid;
                            } else {
                                *checksum_state = ChecksumState::Invalid;
                            }
                        }
                        if *checksum_state == ChecksumState::Invalid {
                            // Verification failed, so we need to reset journal_offset so that
                            // replay stops before this entry, and try again.
                            journal_offset = e.start_journal_offset;
                            continue 'try_again;
                        }
                    }
                    offset += chunk_size as u64;
                }
            }
            return Ok(journal_offset);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::ChecksumList;
    use crate::checksum::fletcher64;
    use storage_device::Device;
    use storage_device::fake_device::FakeDevice;

    #[fuchsia::test]
    async fn test_verify() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(2048).await;
        let mut list = ChecksumList::new(0);

        buffer.as_mut_slice()[0..512].copy_from_slice(&[1; 512]);
        buffer.as_mut_slice()[512..1024].copy_from_slice(&[2; 512]);
        buffer.as_mut_slice()[1024..1536].copy_from_slice(&[3; 512]);
        buffer.as_mut_slice()[1536..2048].copy_from_slice(&[4; 512]);
        device.write(512, buffer.as_ref()).await.expect("write failed");
        list.push(
            1,
            512..2048,
            &[fletcher64(&[1; 512], 0), fletcher64(&[2; 512], 0), fletcher64(&[3; 512], 0)],
            true,
        )
        .unwrap();

        // All entries should pass.
        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 10);

        // Corrupt the middle of the three 512-byte blocks.
        buffer.as_mut_slice()[512] = 0;
        device.write(512, buffer.as_ref()).await.expect("write failed");

        // Verification should fail now.
        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 1);

        // Mark the middle block as deallocated and then it should pass again.
        list.mark_deallocated(2, 1024..1536);
        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 10);

        // Add another entry followed by a deallocation.
        list.push(3, 2048..2560, &[fletcher64(&[4; 512], 0)], true).unwrap();
        list.mark_deallocated(4, 1536..2048);

        // All entries should validate.
        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 10);

        // Now corrupt the block at 2048.
        buffer.as_mut_slice()[1536] = 0;
        device.write(512, buffer.as_ref()).await.expect("write failed");

        // This should only validate up to journal offset 3.
        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 3);

        // Corrupt the block that was marked as deallocated in #4.
        buffer.as_mut_slice()[1024] = 0;
        device.write(512, buffer.as_ref()).await.expect("write failed");

        // The deallocation in #4 should be ignored and so validation should only succeed up
        // to offset 1.
        assert_eq!(list.verify(&device, 10).await.expect("verify failed"), 1);
    }

    #[fuchsia::test]
    async fn test_verify_entry_prior_to_flushed_offset_is_ignored() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(2048).await;
        let mut list = ChecksumList::new(2);

        buffer.as_mut_slice()[0..512].copy_from_slice(&[1; 512]);
        buffer.as_mut_slice()[512..1024].copy_from_slice(&[2; 512]);
        device.write(512, buffer.as_ref()).await.expect("write failed");

        // This entry has the wrong checksum and would fail, but it should be ignored anyway
        // because it is prior to the flushed offset.
        list.push(1, 512..1024, &[fletcher64(&[2; 512], 0)], true).unwrap();

        list.push(2, 1024..1536, &[fletcher64(&[2; 512], 0)], true).unwrap();

        assert_eq!(list.verify(&device, 10).await.expect("verify failed"), 10);
    }

    #[fuchsia::test]
    async fn test_deallocate_overlap() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(512).await;
        let mut list = ChecksumList::new(1);

        buffer.as_mut_slice().copy_from_slice(&[2; 512]);
        device.write(2560, buffer.as_ref()).await.expect("write failed");

        list.push(2, 512..1024, &[fletcher64(&[1; 512], 0)], true).unwrap();
        list.mark_deallocated(3, 0..1024);
        list.push(4, 2048..3072, &[fletcher64(&[2; 512], 0); 2], true).unwrap();
        list.mark_deallocated(5, 1536..2560);

        assert_eq!(list.verify(&device, 10).await.expect("verify failed"), 10);
    }

    #[fuchsia::test]
    async fn test_different_chunk_size() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(1024).await;
        let mut list = ChecksumList::new(1);

        buffer.as_mut_slice().copy_from_slice(&[2; 1024]);
        device.write(1024, buffer.as_ref()).await.expect("write failed");

        let c0 = fletcher64(&[0; 512], 0);
        let c2 = fletcher64(&[2; 512], 0);

        list.push(1, 1024..2048, &[c2, c2], true).unwrap();

        list.push(2, 1024..2048, &[c0], false)
            .expect_err("Expected failure due to different chunk size");
        assert_eq!(list.verify(&device, 2).await.expect("verify failed"), 2);
    }

    #[fuchsia::test]
    async fn test_drop_unneeded_checksums_after_first_write() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(1024).await;
        let mut list = ChecksumList::new(1);

        buffer.as_mut_slice().copy_from_slice(&[2; 1024]);
        device.write(1024, buffer.as_ref()).await.expect("write failed");

        let c1 = fletcher64(&[1; 512], 0);
        let c2 = fletcher64(&[2; 512], 0);

        list.push(1, 1024..2048, &[c2, c2], true).unwrap();
        list.push(2, 0..1024, &[c2, c1], false).unwrap();
        assert_eq!(list.verify(&device, 2).await.expect("verify failed"), 2);
    }

    #[fuchsia::test]
    async fn test_overlapping_checksums() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(1024).await;
        let mut list = ChecksumList::new(1);

        buffer.as_mut_slice().copy_from_slice(&[2; 1024]);
        device.write(1024, buffer.as_ref()).await.expect("write failed");

        let c1 = fletcher64(&[1; 512], 0);
        let c2 = fletcher64(&[2; 512], 0);

        list.push(1, 1024..2048, &[c2, c2], true).unwrap();

        list.push(2, 512..2560, &[c1, c1, c1, c1], false).unwrap();
        assert_eq!(list.verify(&device, 2).await.expect("verify failed"), 2);

        // Changing the data to ones should be fine now because the other checksums will match.
        buffer.as_mut_slice().copy_from_slice(&[1; 1024]);
        device.write(1024, buffer.as_ref()).await.expect("write failed");
        assert_eq!(list.verify(&device, 4).await.expect("verify failed"), 4);
    }
}