use crate::filesystem::FxFilesystem;
use crate::fsck::errors::{FsckError, FsckFatal, FsckIssue, FsckWarning};
use crate::log::*;
use crate::lsm_tree::Query;
use crate::lsm_tree::skip_list_layer::SkipListLayer;
use crate::lsm_tree::types::{
    BoxedLayerIterator, Item, Key, Layer, LayerIterator, OrdUpperBound, RangeKey, Value,
};
use crate::object_handle::INVALID_OBJECT_ID;
use crate::object_store::allocator::{AllocatorKey, AllocatorValue, CoalescingIterator};
use crate::object_store::journal::super_block::SuperBlockInstance;
use crate::object_store::load_store_info;
use crate::object_store::transaction::{LockKey, lock_keys};
use crate::object_store::volume::root_volume;
use anyhow::{Context, Error, anyhow};
use futures::try_join;
use fxfs_crypto::Crypt;
use rustc_hash::FxHashSet as HashSet;
use std::collections::BTreeMap;
use std::iter::zip;
use std::ops::Bound;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

pub mod errors;

mod store_scanner;

#[cfg(test)]
mod tests;

pub const NUM_FRAGMENTATION_HISTOGRAM_SLOTS: usize = 12;
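
/// Fragmentation statistics gathered while scanning, expressed as power-of-two histograms
/// (see [`FragmentationStats::get_histogram_bucket_for_count`] for the bucket boundaries).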
#[derive(Default, Debug)]
pub struct FragmentationStats {
    /// Histogram of allocated extent sizes, in bytes.
    pub extent_size: [u64; NUM_FRAGMENTATION_HISTOGRAM_SLOTS],
    /// Histogram of the number of extents per object.
    pub extent_count: [u64; NUM_FRAGMENTATION_HISTOGRAM_SLOTS],
    /// Histogram of free (unallocated) region sizes, in bytes.
    pub free_space: [u64; NUM_FRAGMENTATION_HISTOGRAM_SLOTS],
}

impl FragmentationStats {
    /// Returns the histogram bucket for a size in bytes, bucketing by the number of 4KiB
    /// blocks the size covers.
    pub fn get_histogram_bucket_for_size(size: u64) -> usize {
        Self::get_histogram_bucket_for_count(size / 4096)
    }
    /// Returns the histogram bucket for a count: counts of 0 and 1 share bucket 0, a larger
    /// count lands in bucket `floor(log2(count))` (e.g. counts 4..=7 land in bucket 2), and
    /// everything from `2^(NUM_FRAGMENTATION_HISTOGRAM_SLOTS - 1)` up is clamped into the
    /// last bucket.
    pub fn get_histogram_bucket_for_count(count: u64) -> usize {
        let log_count = (64 - count.leading_zeros()) as usize;
        log_count.clamp(1, NUM_FRAGMENTATION_HISTOGRAM_SLOTS) - 1
    }
}
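
/// The result of an fsck run: currently just the fragmentation statistics gathered while
/// scanning.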
#[derive(Default, Debug)]
pub struct FsckResult {
    pub fragmentation: FragmentationStats,
}

pub struct FsckOptions<'a> {
    /// Treat warnings as errors (fsck fails if any warnings were found).
    pub fail_on_warning: bool,
    /// Stop at the first error encountered rather than checking the rest of the filesystem.
    pub halt_on_error: bool,
    /// Perform the slower passes which scan every layer file of each LSM tree.
    pub do_slow_passes: bool,
    /// Callback invoked for each issue found.
    pub on_error: Box<dyn Fn(&FsckIssue) + Send + Sync + 'a>,
    /// Suppress informational log messages.
    pub quiet: bool,
    /// Log verbose progress messages.
    pub verbose: bool,
    /// Skip taking the filesystem write lock (the caller must ensure nothing else is mutating
    /// the filesystem).
    pub no_lock: bool,
}

impl Default for FsckOptions<'_> {
    fn default() -> Self {
        Self {
            fail_on_warning: false,
            halt_on_error: false,
            do_slow_passes: true,
            on_error: Box::new(FsckIssue::log),
            quiet: false,
            verbose: false,
            no_lock: false,
        }
    }
}
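
/// Verifies the integrity of the filesystem using default options; see [`fsck_with_options`].
/// This scans the root parent store and the root store, checks the metadata of each child
/// store (the contents of child stores are checked separately via [`fsck_volume`]), and
/// verifies that the allocator's records agree with the extents observed during the scan.
/// A filesystem write lock is held for the duration unless `FsckOptions::no_lock` is set.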
pub async fn fsck(filesystem: Arc<FxFilesystem>) -> Result<FsckResult, Error> {
    fsck_with_options(filesystem, &FsckOptions::default()).await
}

pub async fn fsck_with_options(
    filesystem: Arc<FxFilesystem>,
    options: &FsckOptions<'_>,
) -> Result<FsckResult, Error> {
    let mut result = FsckResult::default();

    if !options.quiet {
        info!("Starting fsck");
    }

    let _guard = if options.no_lock {
        None
    } else {
        Some(filesystem.lock_manager().write_lock(lock_keys![LockKey::Filesystem]).await)
    };

    let mut fsck = Fsck::new(options);

    let object_manager = filesystem.object_manager();
    let super_block_header = filesystem.super_block_header();

    // The set of objects the journal may hold checkpoints for: the allocator, the root store,
    // and (added below) every child store listed in the volume directory.
    let mut journal_checkpoint_ids: HashSet<u64> = HashSet::default();
    journal_checkpoint_ids.insert(super_block_header.allocator_object_id);
    journal_checkpoint_ids.insert(super_block_header.root_store_object_id);

    // Scan the root parent store.
    let mut root_objects =
        vec![super_block_header.root_store_object_id, super_block_header.journal_object_id];
    root_objects.append(&mut object_manager.root_store().parent_objects());
    fsck.verbose("Scanning root parent store...");
    store_scanner::scan_store(
        &fsck,
        object_manager.root_parent_store().as_ref(),
        &root_objects,
        &mut result,
    )
    .await?;
    fsck.verbose("Scanning root parent store done");

    let root_store = &object_manager.root_store();
    let mut root_store_root_objects = Vec::new();
    root_store_root_objects.append(&mut vec![
        super_block_header.allocator_object_id,
        SuperBlockInstance::A.object_id(),
        SuperBlockInstance::B.object_id(),
    ]);
    root_store_root_objects.append(&mut root_store.root_objects());

    let root_volume = root_volume(filesystem.clone()).await?;
    let volume_directory = root_volume.volume_directory();
    let layer_set = volume_directory.store().tree().layer_set();
    let mut merger = layer_set.merger();
    let mut iter = volume_directory.iter(&mut merger).await?;

    // Check the metadata of each child store.  (The contents of child stores are checked
    // separately by `fsck_volume`.)
    while let Some((_, store_id, _)) = iter.get() {
        journal_checkpoint_ids.insert(store_id);
        fsck.check_child_store_metadata(
            filesystem.as_ref(),
            store_id,
            &mut root_store_root_objects,
        )
        .await?;
        iter.advance().await?;
    }

    let allocator = filesystem.allocator();
    root_store_root_objects.append(&mut allocator.parent_objects());

    if fsck.options.do_slow_passes {
        let layer_set = root_store.tree().immutable_layer_set();
        fsck.verbose(format!("Checking {} layers for root store...", layer_set.layers.len()));
        for layer in layer_set.layers {
            if let Some(handle) = layer.handle() {
                fsck.verbose(format!(
                    "Layer file {} for root_store is {} bytes",
                    handle.object_id(),
                    handle.get_size()
                ));
            }
            fsck.check_layer_file_contents(
                root_store.store_object_id(),
                layer.handle().map(|h| h.object_id()).unwrap_or(INVALID_OBJECT_ID),
                layer.clone(),
            )
            .await?;
        }

        let layer_set = allocator.tree().immutable_layer_set();
        fsck.verbose(format!("Checking {} layers for allocator...", layer_set.layers.len()));
        for layer in layer_set.layers {
            if let Some(handle) = layer.handle() {
                fsck.verbose(format!(
                    "Layer file {} for allocator is {} bytes",
                    handle.object_id(),
                    handle.get_size()
                ));
            }
            fsck.check_layer_file_contents(
                allocator.object_id(),
                layer.handle().map(|h| h.object_id()).unwrap_or(INVALID_OBJECT_ID),
                layer.clone(),
            )
            .await?;
        }
        fsck.verbose("Checking layers done");
    }

    fsck.verbose("Scanning root object store...");
    store_scanner::scan_store(&fsck, root_store.as_ref(), &root_store_root_objects, &mut result)
        .await?;
    fsck.verbose("Scanning root object store done");

    fsck.verbose("Verifying allocations...");
    let mut store_ids = HashSet::default();
    store_ids.insert(root_store.store_object_id());
    store_ids.insert(object_manager.root_parent_store().store_object_id());
    fsck.verify_allocations(filesystem.as_ref(), &store_ids, &mut result).await?;
    fsck.verbose("Verifying allocations done");

    // Every object with a recorded journal checkpoint must be one we know about; anything
    // else indicates a stale or corrupt journal record.
    for object_id in object_manager.journal_file_offsets().0.keys() {
        if !journal_checkpoint_ids.contains(object_id) {
            fsck.error(FsckError::UnexpectedJournalFileOffset(*object_id))?;
        }
    }

    let errors = fsck.errors();
    let warnings = fsck.warnings();
    if errors > 0 || (fsck.options.fail_on_warning && warnings > 0) {
        Err(anyhow!("Fsck encountered {} errors, {} warnings", errors, warnings))
    } else {
        if warnings > 0 {
            warn!(count = warnings; "Fsck encountered warnings");
        } else if !options.quiet {
            info!("No issues detected");
        }
        Ok(result)
    }
}
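
/// Verifies the integrity of a single volume (child store) with default options. If the
/// volume is encrypted, `crypt` must be supplied so that the store can be unlocked
/// (read-only) for the duration of the check.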
pub async fn fsck_volume(
    filesystem: &FxFilesystem,
    store_id: u64,
    crypt: Option<Arc<dyn Crypt>>,
) -> Result<FsckResult, Error> {
    fsck_volume_with_options(filesystem, &FsckOptions::default(), store_id, crypt).await
}

pub async fn fsck_volume_with_options(
    filesystem: &FxFilesystem,
    options: &FsckOptions<'_>,
    store_id: u64,
    crypt: Option<Arc<dyn Crypt>>,
) -> Result<FsckResult, Error> {
    let mut result = FsckResult::default();
    if !options.quiet {
        info!(store_id:?; "Starting volume fsck");
    }

    let _guard = if options.no_lock {
        None
    } else {
        Some(filesystem.lock_manager().write_lock(lock_keys![LockKey::Filesystem]).await)
    };

    let mut fsck = Fsck::new(options);
    fsck.check_child_store(filesystem, store_id, crypt, &mut result).await?;
    let mut store_ids = HashSet::default();
    store_ids.insert(store_id);
    fsck.verify_allocations(filesystem, &store_ids, &mut result).await?;

    let errors = fsck.errors();
    let warnings = fsck.warnings();
    if errors > 0 || (fsck.options.fail_on_warning && warnings > 0) {
        Err(anyhow!("Volume fsck encountered {} errors, {} warnings", errors, warnings))
    } else {
        if warnings > 0 {
            warn!(count = warnings; "Volume fsck encountered warnings");
        } else if !options.quiet {
            info!("No issues detected");
        }
        Ok(result)
    }
}

trait KeyExt: PartialEq {
    fn overlaps(&self, other: &Self) -> bool;
}

impl<K: RangeKey + PartialEq> KeyExt for K {
    fn overlaps(&self, other: &Self) -> bool {
        RangeKey::overlaps(self, other)
    }
}
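
/// Holds the state of a single fsck run: the options in effect, every allocation observed
/// while scanning stores (later compared against the allocator in `verify_allocations`), and
/// counts of the issues found so far.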
struct Fsck<'a> {
    options: &'a FsckOptions<'a>,
    allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    // Running totals, used for the summary at the end of the run.
    errors: AtomicU64,
    warnings: AtomicU64,
}

impl<'a> Fsck<'a> {
    fn new(options: &'a FsckOptions<'a>) -> Self {
        Fsck {
            options,
            allocations: SkipListLayer::new(2048),
            errors: AtomicU64::new(0),
            warnings: AtomicU64::new(0),
        }
    }

    // Logs a progress message, but only if the verbose option is set.
    fn verbose(&self, message: impl AsRef<str>) {
        if self.options.verbose {
            info!(message = message.as_ref(); "fsck");
        }
    }

    fn errors(&self) -> u64 {
        self.errors.load(Ordering::Relaxed)
    }

    fn warnings(&self) -> u64 {
        self.warnings.load(Ordering::Relaxed)
    }

    // Upgrades a failed `res` into a fatal issue: reports `error` via `on_error` and bails out.
    fn assert<V>(&self, res: Result<V, Error>, error: FsckFatal) -> Result<V, Error> {
        if res.is_err() {
            (self.options.on_error)(&FsckIssue::Fatal(error.clone()));
            return Err(anyhow!("{:?}", error)).context(res.err().unwrap());
        }
        res
    }

    fn warning(&self, error: FsckWarning) -> Result<(), Error> {
        (self.options.on_error)(&FsckIssue::Warning(error));
        self.warnings.fetch_add(1, Ordering::Relaxed);
        Ok(())
    }

    fn error(&self, error: FsckError) -> Result<(), Error> {
        (self.options.on_error)(&FsckIssue::Error(error.clone()));
        self.errors.fetch_add(1, Ordering::Relaxed);
        if self.options.halt_on_error { Err(anyhow!("{:?}", error)) } else { Ok(()) }
    }

    fn fatal(&self, error: FsckFatal) -> Result<(), Error> {
        (self.options.on_error)(&FsckIssue::Fatal(error.clone()));
        Err(anyhow!("{:?}", error))
    }
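
    /// Checks that a child store's metadata (its StoreInfo) can be loaded, and records the
    /// store's parent objects as roots of the root store, without opening the store itself.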
    async fn check_child_store_metadata(
        &mut self,
        filesystem: &FxFilesystem,
        store_id: u64,
        root_store_root_objects: &mut Vec<u64>,
    ) -> Result<(), Error> {
        let root_store = filesystem.root_store();

        let info = self.assert(
            load_store_info(&root_store, store_id).await,
            FsckFatal::MalformedStore(store_id),
        )?;
        root_store_root_objects.append(&mut info.parent_objects());
        Ok(())
    }
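
    /// Fully scans a child store, unlocking it (read-only) first if necessary; `crypt` must
    /// be provided for locked stores.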
    async fn check_child_store(
        &mut self,
        filesystem: &FxFilesystem,
        store_id: u64,
        crypt: Option<Arc<dyn Crypt>>,
        result: &mut FsckResult,
    ) -> Result<(), Error> {
        let store = filesystem.object_manager().store(store_id).context("open_store failed")?;

        // If the store is locked, unlock it read-only for the duration of the scan and relock
        // it on the way out.
        let _relock_guard;
        if store.is_locked() {
            if let Some(crypt) = &crypt {
                store.unlock_read_only(crypt.clone()).await?;
                _relock_guard = scopeguard::guard(store.clone(), |store| {
                    store.lock_read_only();
                });
            } else {
                return Err(anyhow!("Invalid key"));
            }
        }

        if self.options.do_slow_passes {
            let layer_set = store.tree().immutable_layer_set();
            for layer in layer_set.layers {
                let (layer_object_id, layer_size) = if let Some(h) = layer.handle() {
                    (h.object_id(), h.get_size())
                } else {
                    (0, 0)
                };
                self.verbose(format!(
                    "Layer file {} for store {} is {} bytes",
                    layer_object_id, store_id, layer_size,
                ));
                self.check_layer_file_contents(store_id, layer_object_id, layer.clone()).await?;
            }
        }

        store_scanner::scan_store(self, store.as_ref(), &store.root_objects(), result)
            .await
            .context("scan_store failed")
    }
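
    /// Scans an entire layer file, checking that its keys are ordered, non-overlapping, and
    /// present in the layer's bloom filter.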
    async fn check_layer_file_contents<
        K: Key + KeyExt + OrdUpperBound + std::fmt::Debug,
        V: Value + std::fmt::Debug,
    >(
        &self,
        // The ID of the store or allocator that owns the layer file (for error reporting).
        allocator_or_store_object_id: u64,
        layer_file_object_id: u64,
        layer: Arc<dyn Layer<K, V>>,
    ) -> Result<(), Error> {
        let mut iter: BoxedLayerIterator<'_, K, V> = self.assert(
            layer.seek(Bound::Unbounded).await,
            FsckFatal::MalformedLayerFile(allocator_or_store_object_id, layer_file_object_id),
        )?;

        let mut last_item: Option<Item<K, V>> = None;
        while let Some(item) = iter.get() {
            if let Some(last) = last_item {
                if !last.key.cmp_upper_bound(&item.key).is_le() {
                    self.fatal(FsckFatal::MisOrderedLayerFile(
                        allocator_or_store_object_id,
                        layer_file_object_id,
                    ))?;
                }
                if last.key.overlaps(&item.key) {
                    self.fatal(FsckFatal::OverlappingKeysInLayerFile(
                        allocator_or_store_object_id,
                        layer_file_object_id,
                        item.into(),
                        last.as_item_ref().into(),
                    ))?;
                }
            }
            // A bloom filter may yield false positives but never false negatives, so every
            // key actually present in the layer must pass `maybe_contains_key`.
            if !layer.maybe_contains_key(item.key) {
                self.fatal(FsckFatal::InvalidBloomFilter(
                    allocator_or_store_object_id,
                    layer_file_object_id,
                    item.into(),
                ))?;
            }
            last_item = Some(item.cloned());
            self.assert(
                iter.advance().await,
                FsckFatal::MalformedLayerFile(allocator_or_store_object_id, layer_file_object_id),
            )?;
        }
        Ok(())
    }
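
    /// Compares the allocator's records against the allocations observed while scanning the
    /// stores in `store_object_ids`, reporting malformed, missing, extra, and mismatched
    /// allocations, and checks that the per-owner allocated byte counts agree. Also fills in
    /// the free-space fragmentation histogram as a side effect of walking the allocations.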
    async fn verify_allocations(
        &self,
        filesystem: &FxFilesystem,
        store_object_ids: &HashSet<u64>,
        result: &mut FsckResult,
    ) -> Result<(), Error> {
        let allocator = filesystem.allocator();
        let layer_set = allocator.tree().layer_set();
        let mut merger = layer_set.merger();
        let mut stored_allocations = CoalescingIterator::new(
            allocator.filter(merger.query(Query::FullScan).await?, true).await?,
        )
        .await
        .expect("filter failed");
        let mut observed_allocations =
            CoalescingIterator::new(self.allocations.seek(Bound::Unbounded).await?).await?;
        let mut observed_owner_allocated_bytes = BTreeMap::new();
        let mut extra_allocations: Vec<errors::Allocation> = vec![];
        let bs = filesystem.block_size();
        let mut previous_allocation_end = 0;
        while let Some(allocation) = stored_allocations.get() {
            if allocation.key.device_range.start % bs > 0
                || allocation.key.device_range.end % bs > 0
            {
                self.error(FsckError::MisalignedAllocation(allocation.into()))?;
            } else if allocation.key.device_range.start >= allocation.key.device_range.end {
                self.error(FsckError::MalformedAllocation(allocation.into()))?;
            }
            let owner_object_id = match allocation.value {
                AllocatorValue::None => INVALID_OBJECT_ID,
                AllocatorValue::Abs { owner_object_id, .. } => *owner_object_id,
            };
            let r = &allocation.key.device_range;

            if allocation.value != &AllocatorValue::None {
                // Gaps between allocated ranges are free-space fragments; bucket them by size.
                if r.start > previous_allocation_end {
                    let size = r.start - previous_allocation_end;
                    result.fragmentation.free_space
                        [FragmentationStats::get_histogram_bucket_for_size(size)] += 1;
                }
                previous_allocation_end = r.end;
            }

            *observed_owner_allocated_bytes.entry(owner_object_id).or_insert(0) += r.end - r.start;
            if !store_object_ids.contains(&owner_object_id) {
                if filesystem.object_manager().store(owner_object_id).is_none() {
                    self.error(FsckError::AllocationForNonexistentOwner(allocation.into()))?;
                }
                stored_allocations.advance().await?;
                continue;
            }
            // Cross-check the allocator's record against what we observed while scanning.
            match observed_allocations.get() {
                None => extra_allocations.push(allocation.into()),
                Some(observed_allocation) => {
                    if allocation.key.device_range.end <= observed_allocation.key.device_range.start
                    {
                        extra_allocations.push(allocation.into());
                        stored_allocations.advance().await?;
                        continue;
                    }
                    if observed_allocation.key.device_range.end <= allocation.key.device_range.start
                    {
                        self.error(FsckError::MissingAllocation(observed_allocation.into()))?;
                        observed_allocations.advance().await?;
                        continue;
                    }
                    if allocation.key != observed_allocation.key
                        || allocation.value != observed_allocation.value
                    {
                        self.error(FsckError::AllocationMismatch(
                            observed_allocation.into(),
                            allocation.into(),
                        ))?;
                        stored_allocations.advance().await?;
                        continue;
                    }
                }
            }
            try_join!(stored_allocations.advance(), observed_allocations.advance())?;
        }
        // Anything between the last allocation and the end of the device is one more free
        // fragment.
        let device_size =
            filesystem.device().block_count() * filesystem.device().block_size() as u64;
        if previous_allocation_end < device_size {
            let size = device_size - previous_allocation_end;
            result.fragmentation.free_space
                [FragmentationStats::get_histogram_bucket_for_size(size)] += 1;
        }
        // Any observed allocations left over have no corresponding entry in the allocator.
        while let Some(allocation) = observed_allocations.get() {
            self.error(FsckError::MissingAllocation(allocation.into()))?;
            observed_allocations.advance().await?;
        }
        let expected_allocated_bytes = observed_owner_allocated_bytes.values().sum::<u64>();
        self.verbose(format!(
            "Found {} bytes allocated (expected {} bytes). Total device size is {} bytes.",
            allocator.get_allocated_bytes(),
            expected_allocated_bytes,
            device_size,
        ));
        if !extra_allocations.is_empty() {
            self.error(FsckError::ExtraAllocations(extra_allocations))?;
        }
        // Drop zero entries before comparing, since owners with nothing allocated may or may
        // not appear in the set we observed.
        let owner_allocated_bytes = allocator
            .get_owner_allocated_bytes()
            .into_iter()
            .filter(|(_, v)| *v > 0)
            .collect::<BTreeMap<_, _>>();
        if expected_allocated_bytes != allocator.get_allocated_bytes()
            || observed_owner_allocated_bytes.len() != owner_allocated_bytes.len()
            || zip(observed_owner_allocated_bytes.iter(), owner_allocated_bytes.iter())
                .filter(|((k1, v1), (k2, v2))| (*k1, *v1) != (*k2, *v2))
                .count()
                != 0
        {
            self.error(FsckError::AllocatedBytesMismatch(
                observed_owner_allocated_bytes.iter().map(|(k, v)| (*k, *v)).collect(),
                owner_allocated_bytes.iter().map(|(k, v)| (*k, *v)).collect(),
            ))?;
        }
        for (k, v) in allocator.owner_byte_limits() {
            if !owner_allocated_bytes.contains_key(&k) {
                self.warning(FsckWarning::LimitForNonExistentStore(k, v))?;
            }
        }
        Ok(())
    }
}