1use crate::filesystem::FxFilesystem;
6use crate::fsck::errors::{FsckError, FsckFatal, FsckIssue, FsckWarning};
7use crate::log::*;
8use crate::lsm_tree::Query;
9use crate::lsm_tree::skip_list_layer::SkipListLayer;
10use crate::lsm_tree::types::{
11 BoxedLayerIterator, Item, Key, Layer, LayerIterator, LayerKey, Value,
12};
13use crate::object_handle::INVALID_OBJECT_ID;
14use crate::object_store::allocator::{AllocatorKey, AllocatorValue, CoalescingIterator};
15use crate::object_store::journal::super_block::SuperBlockInstance;
16use crate::object_store::load_store_info;
17
18use crate::object_store::volume::root_volume;
19use anyhow::{Context, Error, anyhow};
20use futures::try_join;
21use fxfs_crypto::Crypt;
22use rustc_hash::FxHashSet as HashSet;
23use std::collections::BTreeMap;
24use std::iter::zip;
25use std::ops::Bound;
26use std::sync::Arc;
27use std::sync::atomic::{AtomicU64, Ordering};
28
29pub mod errors;
30
31mod store_scanner;
32
33#[cfg(test)]
34mod tests;
35
/// Number of slots in each [`FragmentationStats`] histogram.
pub const NUM_FRAGMENTATION_HISTOGRAM_SLOTS: usize = 12;

/// Fragmentation histograms gathered during an fsck run.
///
/// Each histogram has [`NUM_FRAGMENTATION_HISTOGRAM_SLOTS`] slots. The associated functions on
/// this type map a byte size or a count to a slot index.
#[derive(Default, Debug)]
pub struct FragmentationStats {
    /// Histogram keyed by extent size.
    // NOTE(review): not written anywhere in this file; presumably filled in by the store
    // scanner using `get_histogram_bucket_for_size` -- confirm.
    pub extent_size: [u64; NUM_FRAGMENTATION_HISTOGRAM_SLOTS],
    /// Histogram keyed by extent count.
    // NOTE(review): not written anywhere in this file; presumably filled in by the store
    // scanner using `get_histogram_bucket_for_count` -- confirm.
    pub extent_count: [u64; NUM_FRAGMENTATION_HISTOGRAM_SLOTS],
    /// Histogram of free-space gap sizes in bytes. Allocation verification adds one sample per
    /// gap found between consecutive allocated ranges, plus one for the tail of the device.
    pub free_space: [u64; NUM_FRAGMENTATION_HISTOGRAM_SLOTS],
}

impl FragmentationStats {
    /// Returns the histogram slot for a size given in bytes.
    ///
    /// The size is converted to a number of whole 4096-byte units (rounding down) and then
    /// bucketed by [`Self::get_histogram_bucket_for_count`]. Consequently sizes below 8192 bytes
    /// map to slot 0 and sizes of 8 MiB or more map to the last slot.
    pub fn get_histogram_bucket_for_size(size: u64) -> usize {
        Self::get_histogram_bucket_for_count(size / 4096)
    }

    /// Returns the histogram slot for a count.
    ///
    /// Slots are logarithmic: counts 0 and 1 map to slot 0, and a count in `[2^i, 2^(i + 1))`
    /// maps to slot `i`. Anything that would land beyond the final slot is clamped into it.
    pub fn get_histogram_bucket_for_count(count: u64) -> usize {
        // Bit length of `count`: 0 for zero, otherwise floor(log2(count)) + 1.
        let log_count = (64 - count.leading_zeros()) as usize;
        // Clamp before subtracting so that a zero count cannot underflow.
        log_count.clamp(1, NUM_FRAGMENTATION_HISTOGRAM_SLOTS) - 1
    }
}
59
60#[derive(Default, Debug)]
62pub struct FsckResult {
63 pub fragmentation: FragmentationStats,
64}
65
66pub struct FsckOptions<'a> {
67 pub fail_on_warning: bool,
69 pub halt_on_error: bool,
71 pub do_slow_passes: bool,
73 pub on_error: Box<dyn Fn(&FsckIssue) + Send + Sync + 'a>,
75 pub quiet: bool,
77 pub verbose: bool,
79 pub no_lock: bool,
81}
82
83impl Default for FsckOptions<'_> {
84 fn default() -> Self {
85 Self {
86 fail_on_warning: false,
87 halt_on_error: false,
88 do_slow_passes: true,
89 on_error: Box::new(FsckIssue::log),
90 quiet: false,
91 verbose: false,
92 no_lock: false,
93 }
94 }
95}
96
97pub async fn fsck(filesystem: Arc<FxFilesystem>) -> Result<FsckResult, Error> {
105 fsck_with_options(filesystem, &FsckOptions::default()).await
106}
107
108pub async fn fsck_with_options(
109 filesystem: Arc<FxFilesystem>,
110 options: &FsckOptions<'_>,
111) -> Result<FsckResult, Error> {
112 let mut result = FsckResult::default();
113
114 if !options.quiet {
115 info!("Starting fsck");
116 }
117
118 let _guard = if options.no_lock { None } else { Some(filesystem.lock_commits().await) };
119
120 let mut fsck = Fsck::new(options);
121
122 let object_manager = filesystem.object_manager();
123 let super_block_header = filesystem.super_block_header();
124
125 let mut journal_checkpoint_ids: HashSet<u64> = HashSet::default();
128 journal_checkpoint_ids.insert(super_block_header.allocator_object_id);
129 journal_checkpoint_ids.insert(super_block_header.root_store_object_id);
130
131 let mut root_objects =
133 vec![super_block_header.root_store_object_id, super_block_header.journal_object_id];
134 root_objects.append(&mut object_manager.root_store().parent_objects());
135 fsck.verbose("Scanning root parent store...");
136 store_scanner::scan_store(
137 &fsck,
138 object_manager.root_parent_store().as_ref(),
139 &root_objects,
140 &mut result,
141 )
142 .await?;
143 fsck.verbose("Scanning root parent store done");
144
145 let root_store = &object_manager.root_store();
146 let mut root_store_root_objects = Vec::new();
147 root_store_root_objects.append(&mut vec![
148 super_block_header.allocator_object_id,
149 SuperBlockInstance::A.object_id(),
150 SuperBlockInstance::B.object_id(),
151 ]);
152 root_store_root_objects.append(&mut root_store.root_objects());
153
154 let root_volume = root_volume(filesystem.clone()).await?;
155 let volume_directory = root_volume.volume_directory();
156 let layer_set = volume_directory.store().tree().layer_set();
157 let mut merger = layer_set.merger();
158 let mut iter = volume_directory.iter(&mut merger).await?;
159
160 while let Some((_, store_id, _)) = iter.get() {
162 journal_checkpoint_ids.insert(store_id);
163 fsck.check_child_store_metadata(
164 filesystem.as_ref(),
165 store_id,
166 &mut root_store_root_objects,
167 )
168 .await?;
169 iter.advance().await?;
170 }
171
172 let allocator = filesystem.allocator();
173 root_store_root_objects.append(&mut allocator.parent_objects());
174
175 if fsck.options.do_slow_passes {
176 let layer_set = root_store.tree().immutable_layer_set();
178 fsck.verbose(format!("Checking {} layers for root store...", layer_set.layers.len()));
179 for layer in layer_set.layers {
180 if let Some(handle) = layer.handle() {
181 fsck.verbose(format!(
182 "Layer file {} for root_store is {} bytes",
183 handle.object_id(),
184 handle.get_size()
185 ));
186 }
187 fsck.check_layer_file_contents(
188 root_store.store_object_id(),
189 layer.handle().map(|h| h.object_id()).unwrap_or(INVALID_OBJECT_ID),
190 layer.clone(),
191 )
192 .await?;
193 }
194
195 let layer_set = allocator.tree().immutable_layer_set();
197 fsck.verbose(format!("Checking {} layers for allocator...", layer_set.layers.len()));
198 for layer in layer_set.layers {
199 if let Some(handle) = layer.handle() {
200 fsck.verbose(format!(
201 "Layer file {} for allocator is {} bytes",
202 handle.object_id(),
203 handle.get_size()
204 ));
205 }
206 fsck.check_layer_file_contents(
207 allocator.object_id(),
208 layer.handle().map(|h| h.object_id()).unwrap_or(INVALID_OBJECT_ID),
209 layer.clone(),
210 )
211 .await?;
212 }
213 fsck.verbose("Checking layers done");
214 }
215
216 fsck.verbose("Scanning root object store...");
218 store_scanner::scan_store(&fsck, root_store.as_ref(), &root_store_root_objects, &mut result)
219 .await?;
220 fsck.verbose("Scanning root object store done");
221
222 fsck.verbose("Verifying allocations...");
224 let mut store_ids = HashSet::default();
225 store_ids.insert(root_store.store_object_id());
226 store_ids.insert(object_manager.root_parent_store().store_object_id());
227 fsck.verify_allocations(filesystem.as_ref(), &store_ids, &mut result).await?;
228 fsck.verbose("Verifying allocations done");
229
230 for object_id in object_manager.journal_file_offsets().0.keys() {
235 if !journal_checkpoint_ids.contains(object_id) {
236 fsck.error(FsckError::UnexpectedJournalFileOffset(*object_id))?;
237 }
238 }
239
240 let errors = fsck.errors();
241 let warnings = fsck.warnings();
242 if errors > 0 || (fsck.options.fail_on_warning && warnings > 0) {
243 Err(anyhow!("Fsck encountered {} errors, {} warnings", errors, warnings))
244 } else {
245 if warnings > 0 {
246 warn!(count = warnings; "Fsck encountered warnings");
247 } else {
248 if !options.quiet {
249 info!("No issues detected");
250 }
251 }
252 Ok(result)
253 }
254}
255
256pub async fn fsck_volume(
260 filesystem: &FxFilesystem,
261 store_id: u64,
262 crypt: Option<Arc<dyn Crypt>>,
263) -> Result<FsckResult, Error> {
264 fsck_volume_with_options(filesystem, &FsckOptions::default(), store_id, crypt).await
265}
266
267pub async fn fsck_volume_with_options(
268 filesystem: &FxFilesystem,
269 options: &FsckOptions<'_>,
270 store_id: u64,
271 crypt: Option<Arc<dyn Crypt>>,
272) -> Result<FsckResult, Error> {
273 let mut result = FsckResult::default();
274 if !options.quiet {
275 info!(store_id:?; "Starting volume fsck");
276 }
277
278 let _guard = if options.no_lock { None } else { Some(filesystem.lock_commits().await) };
279
280 let mut fsck = Fsck::new(options);
281 fsck.check_child_store(filesystem, store_id, crypt, &mut result).await?;
282 let mut store_ids = HashSet::default();
283 store_ids.insert(store_id);
284 fsck.verify_allocations(filesystem, &store_ids, &mut result).await?;
285
286 let errors = fsck.errors();
287 let warnings = fsck.warnings();
288 if errors > 0 || (fsck.options.fail_on_warning && warnings > 0) {
289 Err(anyhow!("Volume fsck encountered {} errors, {} warnings", errors, warnings))
290 } else {
291 if warnings > 0 {
292 warn!(count = warnings; "Volume fsck encountered warnings");
293 } else {
294 if !options.quiet {
295 info!("No issues detected");
296 }
297 }
298 Ok(result)
299 }
300}
301
302struct Fsck<'a> {
303 options: &'a FsckOptions<'a>,
304 allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
306 errors: AtomicU64,
307 warnings: AtomicU64,
308}
309
310impl<'a> Fsck<'a> {
311 fn new(options: &'a FsckOptions<'a>) -> Self {
312 Fsck {
313 options,
314 allocations: SkipListLayer::new(2048),
316 errors: AtomicU64::new(0),
317 warnings: AtomicU64::new(0),
318 }
319 }
320
321 fn verbose(&self, message: impl AsRef<str>) {
323 if self.options.verbose {
324 info!(message = message.as_ref(); "fsck");
325 }
326 }
327
328 fn errors(&self) -> u64 {
329 self.errors.load(Ordering::Relaxed)
330 }
331
332 fn warnings(&self) -> u64 {
333 self.warnings.load(Ordering::Relaxed)
334 }
335
336 fn assert<V>(&self, res: Result<V, Error>, error: FsckFatal) -> Result<V, Error> {
337 if res.is_err() {
338 (self.options.on_error)(&FsckIssue::Fatal(error.clone()));
339 return Err(anyhow!("{:?}", error)).context(res.err().unwrap());
340 }
341 res
342 }
343
344 fn warning(&self, error: FsckWarning) -> Result<(), Error> {
345 (self.options.on_error)(&FsckIssue::Warning(error));
346 self.warnings.fetch_add(1, Ordering::Relaxed);
347 Ok(())
348 }
349
350 fn error(&self, error: FsckError) -> Result<(), Error> {
351 (self.options.on_error)(&FsckIssue::Error(error.clone()));
352 self.errors.fetch_add(1, Ordering::Relaxed);
353 if self.options.halt_on_error { Err(anyhow!("{:?}", error)) } else { Ok(()) }
354 }
355
356 fn fatal(&self, error: FsckFatal) -> Result<(), Error> {
357 (self.options.on_error)(&FsckIssue::Fatal(error.clone()));
358 Err(anyhow!("{:?}", error))
359 }
360
361 async fn check_child_store_metadata(
363 &mut self,
364 filesystem: &FxFilesystem,
365 store_id: u64,
366 root_store_root_objects: &mut Vec<u64>,
367 ) -> Result<(), Error> {
368 let root_store = filesystem.root_store();
369
370 let info = self.assert(
372 load_store_info(&root_store, store_id).await,
373 FsckFatal::MalformedStore(store_id),
374 )?;
375 root_store_root_objects.append(&mut info.parent_objects());
376 Ok(())
377 }
378
379 async fn check_child_store(
380 &mut self,
381 filesystem: &FxFilesystem,
382 store_id: u64,
383 crypt: Option<Arc<dyn Crypt>>,
384 result: &mut FsckResult,
385 ) -> Result<(), Error> {
386 let store =
387 filesystem.object_manager().store(store_id).context("open_store failed").unwrap();
388
389 let _relock_guard;
390 if store.is_locked() {
391 if let Some(crypt) = &crypt {
392 store.unlock_read_only(crypt.clone()).await?;
393 _relock_guard = scopeguard::guard(store.clone(), |store| {
394 store.lock_read_only();
395 });
396 } else {
397 return Err(anyhow!("Invalid key"));
398 }
399 }
400
401 if self.options.do_slow_passes {
402 let layer_set = store.tree().immutable_layer_set();
403 for layer in layer_set.layers {
404 let (layer_object_id, layer_size) = if let Some(h) = layer.handle() {
405 (h.object_id(), h.get_size())
406 } else {
407 (0, 0)
408 };
409 self.verbose(format!(
410 "Layer file {} for store {} is {} bytes",
411 layer_object_id, store_id, layer_size,
412 ));
413 self.check_layer_file_contents(store_id, layer_object_id, layer.clone()).await?
414 }
415 }
416
417 store_scanner::scan_store(self, store.as_ref(), &store.root_objects(), result)
418 .await
419 .context("scan_store failed")
420 }
421
422 async fn check_layer_file_contents<K: Key + LayerKey, V: Value>(
423 &self,
424 allocator_or_store_object_id: u64,
426 layer_file_object_id: u64,
427 layer: Arc<dyn Layer<K, V>>,
428 ) -> Result<(), Error> {
429 let mut iter: BoxedLayerIterator<'_, K, V> = self.assert(
430 layer.seek(Bound::Unbounded).await,
431 FsckFatal::MalformedLayerFile(allocator_or_store_object_id, layer_file_object_id),
432 )?;
433
434 let mut last_item: Option<Item<K, V>> = None;
435 while let Some(item) = iter.get() {
436 if let Some(last) = last_item {
437 if !last.key.cmp_upper_bound(&item.key).is_le() {
438 self.fatal(FsckFatal::MisOrderedLayerFile(
439 allocator_or_store_object_id,
440 layer_file_object_id,
441 ))?;
442 }
443 if last.key.overlaps(&item.key) {
444 self.fatal(FsckFatal::OverlappingKeysInLayerFile(
445 allocator_or_store_object_id,
446 layer_file_object_id,
447 item.into(),
448 last.as_item_ref().into(),
449 ))?;
450 }
451 }
452 if !layer.maybe_contains_key(item.key) {
453 self.fatal(FsckFatal::InvalidBloomFilter(
455 allocator_or_store_object_id,
456 layer_file_object_id,
457 item.into(),
458 ))?;
459 }
460 last_item = Some(item.cloned());
461 self.assert(
462 iter.advance().await,
463 FsckFatal::MalformedLayerFile(allocator_or_store_object_id, layer_file_object_id),
464 )?;
465 }
466 Ok(())
467 }
468
469 async fn verify_allocations(
471 &self,
472 filesystem: &FxFilesystem,
473 store_object_ids: &HashSet<u64>,
474 result: &mut FsckResult,
475 ) -> Result<(), Error> {
476 let allocator = filesystem.allocator();
477 let layer_set = allocator.tree().layer_set();
478 let mut merger = layer_set.merger();
479 let mut stored_allocations = CoalescingIterator::new(
480 allocator.filter(merger.query(Query::FullScan).await?, true).await?,
481 )
482 .await
483 .expect("filter failed");
484 let mut observed_allocations =
485 CoalescingIterator::new(self.allocations.seek(Bound::Unbounded).await?).await?;
486 let mut observed_owner_allocated_bytes = BTreeMap::new();
487 let mut extra_allocations: Vec<errors::Allocation> = vec![];
488 let bs = filesystem.block_size();
489 let mut previous_allocation_end = 0;
490 while let Some(allocation) = stored_allocations.get() {
491 if allocation.key.device_range.start % bs > 0
492 || allocation.key.device_range.end % bs > 0
493 {
494 self.error(FsckError::MisalignedAllocation(allocation.into()))?;
495 } else if allocation.key.device_range.start >= allocation.key.device_range.end {
496 self.error(FsckError::MalformedAllocation(allocation.into()))?;
497 }
498 let owner_object_id = match allocation.value {
499 AllocatorValue::None => INVALID_OBJECT_ID,
500 AllocatorValue::Abs { owner_object_id, .. } => *owner_object_id,
501 };
502 let r = &allocation.key.device_range;
503
504 if allocation.value != &AllocatorValue::None {
506 if r.start > previous_allocation_end {
507 let size = r.start - previous_allocation_end;
508 result.fragmentation.free_space
509 [FragmentationStats::get_histogram_bucket_for_size(size)] += 1;
510 }
511 previous_allocation_end = r.end;
512 }
513
514 *observed_owner_allocated_bytes.entry(owner_object_id).or_insert(0) += r.end - r.start;
515 if !store_object_ids.contains(&owner_object_id) {
516 if filesystem.object_manager().store(owner_object_id).is_none() {
517 self.error(FsckError::AllocationForNonexistentOwner(allocation.into()))?;
518 }
519 stored_allocations.advance().await?;
520 continue;
521 }
522 match observed_allocations.get() {
524 None => extra_allocations.push(allocation.into()),
525 Some(observed_allocation) => {
526 if allocation.key.device_range.end <= observed_allocation.key.device_range.start
527 {
528 extra_allocations.push(allocation.into());
529 stored_allocations.advance().await?;
530 continue;
531 }
532 if observed_allocation.key.device_range.end <= allocation.key.device_range.start
533 {
534 self.error(FsckError::MissingAllocation(observed_allocation.into()))?;
535 observed_allocations.advance().await?;
536 continue;
537 }
538 if allocation.key != observed_allocation.key
540 || allocation.value != observed_allocation.value
541 {
542 self.error(FsckError::AllocationMismatch(
543 observed_allocation.into(),
544 allocation.into(),
545 ))?;
546 stored_allocations.advance().await?;
547 continue;
548 }
549 }
550 }
551 try_join!(stored_allocations.advance(), observed_allocations.advance())?;
552 }
553 let device_size =
554 filesystem.device().block_count() * filesystem.device().block_size() as u64;
555 if previous_allocation_end < device_size {
556 let size = device_size - previous_allocation_end;
557 result.fragmentation.free_space
558 [FragmentationStats::get_histogram_bucket_for_size(size)] += 1;
559 }
560 while let Some(allocation) = observed_allocations.get() {
561 self.error(FsckError::MissingAllocation(allocation.into()))?;
562 observed_allocations.advance().await?;
563 continue;
564 }
565 let expected_allocated_bytes = observed_owner_allocated_bytes.values().sum::<u64>();
566 self.verbose(format!(
567 "Found {} bytes allocated (expected {} bytes). Total device size is {} bytes.",
568 allocator.get_allocated_bytes(),
569 expected_allocated_bytes,
570 device_size,
571 ));
572 if !extra_allocations.is_empty() {
573 self.error(FsckError::ExtraAllocations(extra_allocations))?;
574 }
575 let owner_allocated_bytes = allocator
579 .get_owner_allocated_bytes()
580 .into_iter()
581 .filter(|(_, v)| *v > 0)
582 .collect::<BTreeMap<_, _>>();
583 if expected_allocated_bytes != allocator.get_allocated_bytes()
584 || observed_owner_allocated_bytes.len() != owner_allocated_bytes.len()
585 || zip(observed_owner_allocated_bytes.iter(), owner_allocated_bytes.iter())
586 .filter(|((k1, v1), (k2, v2))| (*k1, *v1) != (*k2, *v2))
587 .count()
588 != 0
589 {
590 self.error(FsckError::AllocatedBytesMismatch(
591 observed_owner_allocated_bytes.iter().map(|(k, v)| (*k, *v)).collect(),
592 owner_allocated_bytes.iter().map(|(k, v)| (*k, *v)).collect(),
593 ))?;
594 }
595 for (k, v) in allocator.owner_byte_limits() {
596 if !owner_allocated_bytes.contains_key(&k) {
597 self.warning(FsckWarning::LimitForNonExistentStore(k, v))?;
598 }
599 }
600 Ok(())
601 }
602}