fxfs/object_store/journal/
checksum_list.rs1use crate::checksum::{Checksum, fletcher64};
6use crate::errors::FxfsError;
7use crate::range::RangeExt;
8use anyhow::{Error, anyhow, ensure};
9use std::collections::BTreeMap;
10use std::ops::Range;
11use storage_device::Device;
12
13#[derive(Clone, Debug, Eq, PartialEq)]
14enum ChecksumState {
15 Unverified(Vec<Checksum>),
16 Valid,
17 Invalid,
18}
19
20impl ChecksumState {
21 fn checksum(&mut self) -> Option<&mut Vec<Checksum>> {
22 if let ChecksumState::Unverified(c) = self { Some(c) } else { None }
23 }
24}
25
26#[derive(Clone, Debug)]
27struct ChecksumEntry {
28 start_journal_offset: u64,
30 device_range: Range<u64>,
31 checksums: Vec<(ChecksumState, u64)>,
36}
37
38#[derive(Clone, Default)]
39pub struct ChecksumList {
40 flushed_offset: u64,
43
44 checksum_entries: Vec<ChecksumEntry>,
46
47 device_offset_to_checksum_entry: BTreeMap<u64, usize>,
49
50 max_chunk_size: u64,
53}
54
55impl ChecksumList {
56 pub fn new(flushed_offset: u64) -> Self {
57 ChecksumList { flushed_offset, ..Default::default() }
58 }
59
60 pub fn push(
63 &mut self,
64 journal_offset: u64,
65 mut device_range: Range<u64>,
66 mut checksums: &[u64],
67 first_write: bool,
68 ) -> Result<(), Error> {
69 if journal_offset < self.flushed_offset {
70 return Ok(());
72 }
73 let chunk_size = device_range.length().unwrap() / checksums.len() as u64;
74 if chunk_size > self.max_chunk_size {
75 self.max_chunk_size = chunk_size;
76 }
77
78 let mut gap_entries = Vec::new();
91 let mut r = self.device_offset_to_checksum_entry.range(device_range.start + 1..);
92 while let Some((_, index)) = r.next() {
93 let entry = &mut self.checksum_entries[*index];
94 if entry.device_range.start >= device_range.end {
95 break;
96 }
97
98 let gap = device_range.start..entry.device_range.start;
100 let gap_checksums = if gap.is_empty() {
101 &[]
102 } else {
103 let (head, tail) =
104 checksums.split_at(((gap.end - gap.start) / chunk_size) as usize);
105 checksums = tail;
106 head
107 };
108
109 ensure!(
111 (entry.device_range.end - entry.device_range.start) / entry.checksums.len() as u64
112 == chunk_size,
113 anyhow!(FxfsError::Inconsistent)
114 .context("Wrong number of checksums for device range"),
115 );
116
117 let overlap = std::cmp::max(device_range.start, entry.device_range.start)
118 ..std::cmp::min(device_range.end, entry.device_range.end);
119 let entry_checksums = &mut entry.checksums[((overlap.start - entry.device_range.start)
120 / chunk_size) as usize
121 ..((overlap.end - entry.device_range.start) / chunk_size) as usize];
122
123 let (head, tail) = checksums.split_at(entry_checksums.len());
124 checksums = tail;
125 for ((checksum_state, _), new_checksum) in entry_checksums.iter_mut().zip(head.iter()) {
126 checksum_state.checksum().unwrap().push(*new_checksum);
127 }
128 device_range.start = overlap.end;
129
130 if !gap.is_empty() && first_write {
133 gap_entries.push((gap.end, self.checksum_entries.len()));
134 self.checksum_entries.push(ChecksumEntry {
135 start_journal_offset: journal_offset,
136 device_range: gap,
137 checksums: gap_checksums
138 .into_iter()
139 .map(|c| (ChecksumState::Unverified(vec![*c]), u64::MAX))
140 .collect(),
141 });
142 }
143
144 if device_range.is_empty() {
145 break;
146 }
147 }
148
149 self.device_offset_to_checksum_entry.extend(gap_entries);
151
152 if !device_range.is_empty() && first_write {
154 self.device_offset_to_checksum_entry
155 .insert(device_range.end, self.checksum_entries.len());
156 self.checksum_entries.push(ChecksumEntry {
157 start_journal_offset: journal_offset,
158 device_range,
159 checksums: checksums
160 .iter()
161 .map(|c| (ChecksumState::Unverified(vec![*c]), u64::MAX))
162 .collect(),
163 });
164 }
165
166 Ok(())
167 }
168
169 pub fn mark_deallocated(&mut self, journal_offset: u64, mut device_range: Range<u64>) {
172 if journal_offset < self.flushed_offset {
173 return;
175 }
176 let mut r = self.device_offset_to_checksum_entry.range(device_range.start + 1..);
177 while let Some((_, index)) = r.next() {
178 let entry = &mut self.checksum_entries[*index];
179 if entry.device_range.start >= device_range.end {
180 break;
181 }
182 let chunk_size =
183 (entry.device_range.length().unwrap() / entry.checksums.len() as u64) as usize;
184 let checksum_index_start = if device_range.start < entry.device_range.start {
185 0
186 } else {
187 (device_range.start - entry.device_range.start) as usize / chunk_size
188 };
189 if entry.device_range.end >= device_range.end {
191 let checksum_index_end =
192 (device_range.end - entry.device_range.start) as usize / chunk_size;
193 entry.checksums[checksum_index_start..checksum_index_end]
195 .iter_mut()
196 .for_each(|c| c.1 = journal_offset);
197 break;
198 }
199 entry.checksums[checksum_index_start..].iter_mut().for_each(|c| c.1 = journal_offset);
200 device_range.start = entry.device_range.end;
201 }
202 }
203
204 pub async fn verify(
209 &mut self,
210 device: &dyn Device,
211 mut journal_offset: u64,
212 ) -> Result<u64, Error> {
213 let mut buf = device.allocate_buffer(self.max_chunk_size as usize).await;
214 'try_again: loop {
215 for e in &mut self.checksum_entries {
216 if e.start_journal_offset >= journal_offset {
217 break;
218 }
219 let chunk_size =
220 (e.device_range.length().unwrap() / e.checksums.len() as u64) as usize;
221 let mut offset = e.device_range.start;
222 for (checksum_state, dependency) in e.checksums.iter_mut() {
223 if *dependency >= journal_offset {
227 if let Some(checksums) = checksum_state.checksum() {
228 device.read(offset, buf.subslice_mut(0..chunk_size)).await?;
229 let found_checksum = fletcher64(&buf.as_slice()[0..chunk_size], 0);
230 if checksums
231 .iter()
232 .find(|&&checksum| checksum == found_checksum)
233 .is_some()
234 {
235 *checksum_state = ChecksumState::Valid;
236 } else {
237 *checksum_state = ChecksumState::Invalid;
238 }
239 }
240 if *checksum_state == ChecksumState::Invalid {
241 journal_offset = e.start_journal_offset;
244 continue 'try_again;
245 }
246 }
247 offset += chunk_size as u64;
248 }
249 }
250 return Ok(journal_offset);
251 }
252 }
253}
254
#[cfg(test)]
mod tests {
    use super::ChecksumList;
    use crate::checksum::fletcher64;
    use storage_device::Device;
    use storage_device::fake_device::FakeDevice;

    #[fuchsia::test]
    async fn test_verify() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(2048).await;
        let mut list = ChecksumList::new(0);

        // Device offsets 512..2560 hold four blocks filled with 1, 2, 3 and 4; the entry pushed
        // at journal offset 1 covers the first three of them.
        buffer.as_mut_slice()[0..512].copy_from_slice(&[1; 512]);
        buffer.as_mut_slice()[512..1024].copy_from_slice(&[2; 512]);
        buffer.as_mut_slice()[1024..1536].copy_from_slice(&[3; 512]);
        buffer.as_mut_slice()[1536..2048].copy_from_slice(&[4; 512]);
        device.write(512, buffer.as_ref()).await.expect("write failed");
        list.push(
            1,
            512..2048,
            &[fletcher64(&[1; 512], 0), fletcher64(&[2; 512], 0), fletcher64(&[3; 512], 0)],
            true,
        )
        .unwrap();

        // Verification runs on clones so that the results are not cached in `list`.
        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 10);

        // Corrupt the block at device offset 1024, the second chunk of the entry.
        buffer.as_mut_slice()[512] = 0;
        device.write(512, buffer.as_ref()).await.expect("write failed");

        // Verification falls back to the journal offset of the entry.
        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 1);

        // Once the corrupt chunk is marked deallocated at offset 2 it is no longer checked.
        list.mark_deallocated(2, 1024..1536);
        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 10);

        // Add a second entry at journal offset 3 and deallocate the third chunk of the first
        // entry at journal offset 4.
        list.push(3, 2048..2560, &[fletcher64(&[4; 512], 0)], true).unwrap();
        list.mark_deallocated(4, 1536..2048);

        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 10);

        // Corrupt the block at device offset 2048, which belongs to the second entry.
        buffer.as_mut_slice()[1536] = 0;
        device.write(512, buffer.as_ref()).await.expect("write failed");

        assert_eq!(list.clone().verify(&device, 10).await.expect("verify failed"), 3);

        // Corrupt the block at device offset 1536.  It is deallocated at offset 4, so it is
        // checked again once verification has fallen back to offset 3.
        buffer.as_mut_slice()[1024] = 0;
        device.write(512, buffer.as_ref()).await.expect("write failed");

        assert_eq!(list.verify(&device, 10).await.expect("verify failed"), 1);
    }

    #[fuchsia::test]
    async fn test_verify_entry_prior_to_flushed_offset_is_ignored() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(2048).await;
        let mut list = ChecksumList::new(2);

        buffer.as_mut_slice()[0..512].copy_from_slice(&[1; 512]);
        buffer.as_mut_slice()[512..1024].copy_from_slice(&[2; 512]);
        device.write(512, buffer.as_ref()).await.expect("write failed");

        // This checksum does not match the device, but journal offset 1 is below the flushed
        // offset of 2, so the push is ignored.
        list.push(1, 512..1024, &[fletcher64(&[2; 512], 0)], true).unwrap();

        // This one is recorded and matches the device.
        list.push(2, 1024..1536, &[fletcher64(&[2; 512], 0)], true).unwrap();

        assert_eq!(list.verify(&device, 10).await.expect("verify failed"), 10);
    }

    #[fuchsia::test]
    async fn test_deallocate_overlap() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(512).await;
        let mut list = ChecksumList::new(1);

        buffer.as_mut_slice().copy_from_slice(&[2; 512]);
        device.write(2560, buffer.as_ref()).await.expect("write failed");

        // Each deallocation only partially overlaps the entry pushed before it, and covers
        // exactly the chunks whose checksums do not match the device.
        list.push(2, 512..1024, &[fletcher64(&[1; 512], 0)], true).unwrap();
        list.mark_deallocated(3, 0..1024);
        list.push(4, 2048..3072, &[fletcher64(&[2; 512], 0); 2], true).unwrap();
        list.mark_deallocated(5, 1536..2560);

        assert_eq!(list.verify(&device, 10).await.expect("verify failed"), 10);
    }

    #[fuchsia::test]
    async fn test_different_chunk_size() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(1024).await;
        let mut list = ChecksumList::new(1);

        buffer.as_mut_slice().copy_from_slice(&[2; 1024]);
        device.write(1024, buffer.as_ref()).await.expect("write failed");

        let c0 = fletcher64(&[0; 512], 0);
        let c2 = fletcher64(&[2; 512], 0);

        list.push(1, 1024..2048, &[c2, c2], true).unwrap();

        // The same range with a single checksum implies a chunk size of 1024 instead of 512.
        list.push(2, 1024..2048, &[c0], false)
            .expect_err("Expected failure due to different chunk size");
        assert_eq!(list.verify(&device, 2).await.expect("verify failed"), 2);
    }

    #[fuchsia::test]
    async fn test_drop_unneeded_checksums_after_first_write() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(1024).await;
        let mut list = ChecksumList::new(1);

        buffer.as_mut_slice().copy_from_slice(&[2; 1024]);
        device.write(1024, buffer.as_ref()).await.expect("write failed");

        let c1 = fletcher64(&[1; 512], 0);
        let c2 = fletcher64(&[2; 512], 0);

        list.push(1, 1024..2048, &[c2, c2], true).unwrap();
        // This range overlaps no existing entry and is not a first write, so its checksums
        // (which do not match the device) are dropped rather than recorded.
        list.push(2, 0..1024, &[c2, c1], false).unwrap();
        assert_eq!(list.verify(&device, 2).await.expect("verify failed"), 2);
    }

    #[fuchsia::test]
    async fn test_overlapping_checksums() {
        let device = FakeDevice::new(2048, 512);
        let mut buffer = device.allocate_buffer(1024).await;
        let mut list = ChecksumList::new(1);

        buffer.as_mut_slice().copy_from_slice(&[2; 1024]);
        device.write(1024, buffer.as_ref()).await.expect("write failed");

        let c1 = fletcher64(&[1; 512], 0);
        let c2 = fletcher64(&[2; 512], 0);

        list.push(1, 1024..2048, &[c2, c2], true).unwrap();

        // A later write over the same range adds a second acceptable checksum to each chunk.
        list.push(2, 512..2560, &[c1, c1, c1, c1], false).unwrap();
        // Verify a clone so that the chunks stay unverified in `list`; otherwise the second
        // verification below would reuse the cached result and never read the new data.
        assert_eq!(list.clone().verify(&device, 2).await.expect("verify failed"), 2);

        // After the new data is written, the chunks match the checksums from the second push.
        buffer.as_mut_slice().copy_from_slice(&[1; 1024]);
        device.write(1024, buffer.as_ref()).await.expect("write failed");
        assert_eq!(list.verify(&device, 4).await.expect("verify failed"), 4);
    }
}