1mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::errors::FxfsError;
27use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
28use crate::log::*;
29use crate::lsm_tree::cache::NullCache;
30use crate::lsm_tree::types::Layer;
31use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
32use crate::object_store::allocator::Allocator;
33use crate::object_store::data_object_handle::OverwriteOptions;
34use crate::object_store::extent_record::{
35 DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
36};
37use crate::object_store::graveyard::Graveyard;
38use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
39use crate::object_store::journal::checksum_list::ChecksumList;
40use crate::object_store::journal::reader::{JournalReader, ReadResult};
41use crate::object_store::journal::super_block::{
42 MIN_SUPER_BLOCK_SIZE, SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
43};
44use crate::object_store::journal::writer::JournalWriter;
45use crate::object_store::object_manager::ObjectManager;
46use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
47use crate::object_store::transaction::{
48 AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
49 MutationV47, MutationV49, MutationV50, ObjectStoreMutation, Options,
50 TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
51};
52use crate::object_store::{
53 AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
54 NewChildStoreOptions, ObjectStore,
55};
56use crate::range::RangeExt;
57use crate::round::{round_div, round_down};
58use crate::serialized_types::{LATEST_VERSION, Migrate, Version, Versioned, migrate_to_version};
59use anyhow::{Context, Error, anyhow, bail, ensure};
60use event_listener::Event;
61use fprint::TypeFingerprint;
62use fuchsia_sync::Mutex;
63use futures::FutureExt as _;
64use futures::future::poll_fn;
65use rustc_hash::FxHashMap as HashMap;
66use serde::{Deserialize, Serialize};
67use static_assertions::const_assert;
68use std::clone::Clone;
69use std::collections::HashSet;
70use std::ops::{Bound, Range};
71use std::sync::atomic::{AtomicBool, Ordering};
72use std::sync::{Arc, OnceLock};
73use std::task::{Poll, Waker};
74use storage_device::Device;
75
/// Block size used by the journal, in bytes (4 KiB).
pub const BLOCK_SIZE: u64 = 4 * 1024;

/// Journal chunk size, in bytes (128 KiB). The maximum journal usage of a single transaction
/// must be strictly smaller than this (enforced by a compile-time assertion).
const CHUNK_SIZE: u64 = 128 * 1024;
// Compile-time check: TRANSACTION_MAX_JOURNAL_USAGE must be strictly smaller than CHUNK_SIZE,
// i.e. one transaction's maximum journal usage always fits inside a single chunk.
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);
82
/// Default reclaim size, in bytes (256 KiB); the value `JournalOptions::default()` uses for
/// `reclaim_size`.
pub const DEFAULT_RECLAIM_SIZE: u64 = 256 * 1024;

/// Reserved space, in bytes (1 MiB). NOTE(review): its consumers are outside this file section.
pub const RESERVED_SPACE: u64 = 1024 * 1024;

/// All-ones mask XOR-ed into the writer's checkpoint checksum when `replay` resets the journal
/// writer, so new records do not chain onto the old checksum sequence.
const RESET_XOR: u64 = u64::MAX;
95
/// The current version of the journal checkpoint type.
pub type JournalCheckpoint = JournalCheckpointV32;

/// A position in the journal file, together with the checksum and record format version in effect
/// at that position.
///
/// This type is serialized; field order and names must not change.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    /// Byte offset within the journal file.
    pub file_offset: u64,

    /// Checksum at `file_offset`. NOTE(review): presumably the running checksum that chains
    /// journal blocks together; confirm against `JournalReader`/`JournalWriter`.
    pub checksum: Checksum,

    /// Serialization version of the journal records at this checkpoint (new checkpoints use
    /// `LATEST_VERSION`).
    pub version: Version,
}
114
/// The current (latest) version of the journal record type.
pub type JournalRecord = JournalRecordV50;

/// A single record in the journal (format version 50).
///
/// This enum is serialized; variant order and payloads must not change. Older layouts are
/// migrated forward to this one.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV50 {
    /// No more records in the current block; the reader skips to the start of the next block.
    EndBlock,
    /// A mutation belonging to the object (store) identified by `object_id`.
    Mutation { object_id: u64, mutation: MutationV50 },
    /// Commits all mutations and checksums read since the previous commit as one transaction.
    Commit,
    /// Discards any transaction that starts at or after the given journal offset.
    Discard(u64),
    /// Records that the device has been flushed up to the given journal offset.
    DidFlushDevice(u64),
    /// Checksums for data written to the given device range; the `bool` is the `first_write`
    /// flag.
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
149
/// Journal record layout for format version 49 (mutations are `MutationV49`). Migrated forward to
/// [`JournalRecordV50`]. Serialized: do not reorder variants.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
161
/// Journal record layout for format version 47 (mutations are `MutationV47`). Migrated forward to
/// [`JournalRecordV49`]. Serialized: do not reorder variants.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
173
/// Journal record layout for format version 46 (mutations are `MutationV46`). Migrated forward to
/// [`JournalRecordV47`]. Serialized: do not reorder variants.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
185
/// Journal record layout for format version 43 (mutations are `MutationV43`). Migrated forward to
/// [`JournalRecordV46`]. Serialized: do not reorder variants.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
197
/// Journal record layout for format version 42: still uses `MutationV41`, but `DataChecksums`
/// gains the trailing `bool` (`first_write`) flag. Migrated forward to [`JournalRecordV43`].
/// Serialized: do not reorder variants.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
208
/// Journal record layout for format version 41. `DataChecksums` has no `first_write` flag, so
/// there is no derived migration; the hand-written `From<JournalRecordV41> for JournalRecordV42`
/// impl upgrades it instead. Serialized: do not reorder variants.
#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
218
219impl From<JournalRecordV41> for JournalRecordV42 {
220 fn from(record: JournalRecordV41) -> Self {
221 match record {
222 JournalRecordV41::EndBlock => Self::EndBlock,
223 JournalRecordV41::Mutation { object_id, mutation } => {
224 Self::Mutation { object_id, mutation: mutation.into() }
225 }
226 JournalRecordV41::Commit => Self::Commit,
227 JournalRecordV41::Discard(offset) => Self::Discard(offset),
228 JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
229 JournalRecordV41::DataChecksums(range, sums) => {
230 Self::DataChecksums(range, sums, true)
233 }
234 }
235 }
236}
237
/// Journal record layout for format version 40 (mutations are `MutationV40`). Migrated forward to
/// [`JournalRecordV41`]. Serialized: do not reorder variants.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
248
249pub(super) fn journal_handle_options() -> HandleOptions {
250 HandleOptions { skip_journal_checks: true, ..Default::default() }
251}
252
/// The filesystem journal. It replays committed transactions at mount time (`replay`), creates
/// the initial on-disk structures for a new filesystem (`init_empty`), and owns the state used to
/// write new journal records.
pub struct Journal {
    /// Object manager that replayed mutations are applied to.
    objects: Arc<ObjectManager>,
    /// Handle to the journal file; set exactly once, by `replay` or `init_empty`.
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    /// Loads super-blocks and tracks which instance is next (see `set_image_builder_mode`).
    super_block_manager: SuperBlockManager,
    /// Mutable journal state; see [`Inner`].
    inner: Mutex<Inner>,
    /// NOTE(review): used outside this view; presumably serialises journal writers.
    writer_mutex: Mutex<()>,
    /// Async mutex (may be held across `.await`); used outside this view, presumably for sync.
    sync_mutex: futures::lock::Mutex<()>,
    /// Whether verbose journal tracing is enabled; toggled by `set_trace`.
    trace: AtomicBool,

    /// NOTE(review): presumably signalled when journal space is reclaimed; confirm with users.
    reclaim_event: Event,
}
270
/// Mutable journal state, guarded by `Journal::inner`.
struct Inner {
    /// Current super-block header. Set by `replay` and `init_empty`; `read_transactions` reads
    /// the root store object ids and starting device-flushed offset from it.
    super_block_header: SuperBlockHeader,

    /// NOTE(review): not used in the code visible here; meaning unconfirmed.
    zero_offset: Option<u64>,

    /// Journal offset up to which the device is known to be flushed. `replay` sets it equal to
    /// `flushed_offset`.
    device_flushed_offset: u64,

    /// NOTE(review): presumably set when a `DidFlushDevice` record still needs to be written.
    needs_did_flush_device: bool,

    /// Writer for new journal records. `replay` seeks it to just past the replayed data.
    writer: JournalWriter,

    /// Set by `replay` after it resets the writer's checksum. NOTE(review): presumably makes the
    /// writer emit a reset/version marker before further records; confirm.
    output_reset_version: bool,

    /// Waker for a task waiting on the flush path; woken by `Inner::terminate`.
    flush_waker: Option<Waker>,

    /// Set once the journal has been terminated (see `Inner::terminate`).
    terminate: bool,

    /// The first error passed to `Inner::terminate`, if any; later errors are only logged.
    terminate_reason: Option<Error>,

    /// NOTE(review): presumably prevents new compactions from starting; confirm.
    disable_compactions: bool,

    /// NOTE(review): presumably true while a compaction is in progress; confirm.
    compaction_running: bool,

    /// Waker for a task waiting on sync; woken by `Inner::terminate`.
    sync_waker: Option<Waker>,

    /// Journal offset of data already flushed to the journal file (presumably). `replay` sets
    /// it to the writer's starting offset.
    flushed_offset: u64,

    /// Journal offset up to which the journal contents are valid. `replay` sets it to the offset
    /// at which replay stopped.
    valid_to: u64,

    /// If set, the journal offset from which already-written data must be discarded. `replay`
    /// sets it when `valid_to` is behind `flushed_offset`.
    discard_offset: Option<u64>,

    /// Reclaim size in bytes, taken from `JournalOptions::reclaim_size`.
    reclaim_size: u64,

    /// Super-block instance to use when building an image, or `None` outside image-builder mode;
    /// set through `Journal::set_image_builder_mode`.
    image_builder_mode: Option<SuperBlockInstance>,

    /// Taken from `JournalOptions::barriers_enabled`.
    barriers_enabled: bool,

    /// NOTE(review): presumably set when the next write must be preceded by a barrier; confirm.
    needs_barrier: bool,
}
338
339impl Inner {
340 fn terminate(&mut self, reason: Option<Error>) {
341 self.terminate = true;
342
343 if let Some(err) = reason {
344 error!(error:? = err; "Terminating journal");
345 if let Some(prev_err) = self.terminate_reason.as_ref() {
347 error!(error:? = prev_err; "Journal previously terminated");
348 } else {
349 self.terminate_reason = Some(err);
350 }
351 }
352
353 if let Some(waker) = self.flush_waker.take() {
354 waker.wake();
355 }
356 if let Some(waker) = self.sync_waker.take() {
357 waker.wake();
358 }
359 }
360}
361
362pub struct JournalOptions {
363 pub reclaim_size: u64,
367
368 pub barriers_enabled: bool,
372}
373
374impl Default for JournalOptions {
375 fn default() -> Self {
376 JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
377 }
378}
379
/// Result of `Journal::read_transactions`.
struct JournaledTransactions {
    /// Committed transactions, in journal order.
    transactions: Vec<JournaledTransaction>,
    /// Highest journal offset known to be flushed to the device. This is the maximum of the
    /// super-block's journal offset and any `Reset`/`DidFlushDevice` offsets seen while reading.
    device_flushed_offset: u64,
}
384
/// One committed transaction read back from the journal, with its mutations grouped by store.
#[derive(Debug, Default)]
pub struct JournaledTransaction {
    /// Journal checkpoint at the transaction's first record.
    pub checkpoint: JournalCheckpoint,
    /// Mutations for the root parent store.
    pub root_parent_mutations: Vec<Mutation>,
    /// Mutations for the root store.
    pub root_mutations: Vec<Mutation>,
    /// Mutations for every other store, paired with that store's object id.
    pub non_root_mutations: Vec<(u64, Mutation)>,
    /// Journal offset just past the transaction's `Commit` record.
    pub end_offset: u64,
    /// Data checksums recorded within the transaction.
    pub checksums: Vec<JournaledChecksums>,

    /// `(object_id, offset)` if the transaction contains an `EndFlush` that matches an earlier
    /// `BeginFlush`. `offset` is one past the checkpoint offset of the transaction holding that
    /// `BeginFlush`.
    pub end_flush: Option<(u64, u64)>,

    /// Object id of the store deleted by a `DeleteVolume` mutation in this transaction, if any.
    pub volume_deleted: Option<u64>,
}
403
404impl JournaledTransaction {
405 fn new(checkpoint: JournalCheckpoint) -> Self {
406 Self { checkpoint, ..Default::default() }
407 }
408}
409
/// Sentinel journal offset (all bits set) that `replay` stores in its per-store journal-offset
/// map for a store deleted by a `DeleteVolume` mutation.
const VOLUME_DELETED: u64 = !0;
411
/// Payload of a `DataChecksums` journal record, attached to the transaction it appeared in.
#[derive(Debug)]
pub struct JournaledChecksums {
    /// Device byte range the checksums cover.
    pub device_range: Range<u64>,
    /// The recorded checksums.
    pub checksums: Checksums,
    /// The record's `first_write` flag (forced to `true` for records upgraded from V41).
    pub first_write: bool,
}
418
/// A readable handle over the journal file that can optionally track the file's extents itself.
/// This is what lets the journal be read before the object stores are open.
pub trait JournalHandle: ReadObjectHandle {
    /// File offset at which the handle's tracked extents end, or `None` if the handle does not
    /// track extents. Used to check that newly found journal extents are contiguous.
    fn end_offset(&self) -> Option<u64>;
    /// Appends an extent mapping to `device_range`. `added_offset` is the journal offset at which
    /// the extent was added (the committing transaction's checkpoint offset, or `0` for extents
    /// already known at replay start).
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Called when a `Discard` record discards journal data from `discard_offset` onwards.
    /// NOTE(review): presumably drops extents added at or after `discard_offset`; confirm.
    fn discard_extents(&mut self, discard_offset: u64);
}
433
434impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
438 fn end_offset(&self) -> Option<u64> {
439 None
440 }
441 fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
442 }
444 fn discard_extents(&mut self, _discard_offset: u64) {
445 }
447}
448
449#[fxfs_trace::trace]
450impl Journal {
451 pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
452 let starting_checksum = rand::random_range(1..u64::MAX);
453 Journal {
454 objects: objects,
455 handle: OnceLock::new(),
456 super_block_manager: SuperBlockManager::new(),
457 inner: Mutex::new(Inner {
458 super_block_header: SuperBlockHeader::default(),
459 zero_offset: None,
460 device_flushed_offset: 0,
461 needs_did_flush_device: false,
462 writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
463 output_reset_version: false,
464 flush_waker: None,
465 terminate: false,
466 terminate_reason: None,
467 disable_compactions: false,
468 compaction_running: false,
469 sync_waker: None,
470 flushed_offset: 0,
471 valid_to: 0,
472 discard_offset: None,
473 reclaim_size: options.reclaim_size,
474 image_builder_mode: None,
475 barriers_enabled: options.barriers_enabled,
476 needs_barrier: false,
477 }),
478 writer_mutex: Mutex::new(()),
479 sync_mutex: futures::lock::Mutex::new(()),
480 trace: AtomicBool::new(false),
481 reclaim_event: Event::new(),
482 }
483 }
484
485 pub fn set_trace(&self, trace: bool) {
486 let old_value = self.trace.swap(trace, Ordering::Relaxed);
487 if trace != old_value {
488 info!(trace; "J: trace");
489 }
490 }
491
492 pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
493 self.inner.lock().image_builder_mode = mode;
494 if let Some(instance) = mode {
495 *self.super_block_manager.next_instance.lock() = instance;
496 }
497 }
498
499 pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
500 self.inner.lock().image_builder_mode
501 }
502
503 #[cfg(feature = "migration")]
504 pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
505 ensure!(
506 self.inner.lock().image_builder_mode.is_some(),
507 "Can only set filesystem uuid in image builder mode."
508 );
509 self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
510 Ok(())
511 }
512
513 pub(crate) async fn read_superblocks(
514 &self,
515 device: Arc<dyn Device>,
516 block_size: u64,
517 ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
518 self.super_block_manager.load(device, block_size).await
519 }
520
521 fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
525 match mutation {
526 Mutation::ObjectStore(ObjectStoreMutation {
527 item:
528 Item {
529 key:
530 ObjectKey {
531 data:
532 ObjectKeyData::Attribute(
533 _,
534 AttributeKey::Extent(ExtentKey { range }),
535 ),
536 ..
537 },
538 value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
539 ..
540 },
541 ..
542 }) => {
543 if range.is_empty() || !range.is_aligned(block_size) {
544 return false;
545 }
546 let len = range.length().unwrap();
547 if let ExtentMode::Cow(checksums) = mode {
548 if checksums.len() > 0 {
549 if len % checksums.len() as u64 != 0 {
550 return false;
551 }
552 if (len / checksums.len() as u64) % block_size != 0 {
553 return false;
554 }
555 }
556 }
557 if *device_offset % block_size != 0
558 || *device_offset >= device_size
559 || device_size - *device_offset < len
560 {
561 return false;
562 }
563 }
564 Mutation::ObjectStore(_) => {}
565 Mutation::EncryptedObjectStore(_) => {}
566 Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
567 return !device_range.is_empty()
568 && *owner_object_id != INVALID_OBJECT_ID
569 && device_range.end <= device_size;
570 }
571 Mutation::Allocator(AllocatorMutation::Deallocate {
572 device_range,
573 owner_object_id,
574 }) => {
575 return !device_range.is_empty()
576 && *owner_object_id != INVALID_OBJECT_ID
577 && device_range.end <= device_size;
578 }
579 Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
580 return *owner_object_id != INVALID_OBJECT_ID;
581 }
582 Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
583 return *owner_object_id != INVALID_OBJECT_ID;
584 }
585 Mutation::BeginFlush => {}
586 Mutation::EndFlush => {}
587 Mutation::DeleteVolume => {}
588 Mutation::UpdateBorrowed(_) => {}
589 Mutation::UpdateMutationsKey(_) => {}
590 Mutation::CreateInternalDir(owner_object_id) => {
591 return *owner_object_id != INVALID_OBJECT_ID;
592 }
593 }
594 true
595 }
596
597 fn update_checksum_list(
599 &self,
600 journal_offset: u64,
601 mutation: &Mutation,
602 checksum_list: &mut ChecksumList,
603 ) -> Result<(), Error> {
604 match mutation {
605 Mutation::ObjectStore(_) => {}
606 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
607 checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
608 }
609 _ => {}
610 }
611 Ok(())
612 }
613
    /// Replays the journal at mount time, bringing the object stores and allocator up to date
    /// with everything committed since the last super-block was written.
    ///
    /// Steps, in order:
    /// 1. Load the super-block and root parent store. Register them with the object manager,
    ///    along with a new allocator (also passed to `on_new_allocator`, if given) and the
    ///    borrowed-metadata-space and last-end-offset values. Record the header in `Inner`.
    /// 2. Build a bootstrap handle for the journal file from its extents in the root parent
    ///    store's mutable layer, starting at the extent that contains the checkpoint's block.
    /// 3. Read all committed transactions from the checkpoint onwards.
    /// 4. Validate every mutation and collect journalled data checksums. Replay is cut at the
    ///    first transaction holding an invalid mutation, or earlier if `ChecksumList::verify`
    ///    reports a lower valid offset.
    /// 5. Apply root parent store mutations, then open the root store. Apply root store
    ///    mutations not already covered by a flush, open the allocator, and replay the remaining
    ///    mutations through the object manager.
    /// 6. Open the journal file and position the writer at the end of the last block read, with
    ///    its checksum XOR-ed with `RESET_XOR`. If the valid data ends before that point, record
    ///    it in `discard_offset`.
    ///
    /// # Errors
    /// Returns an error if loading, reading or applying fails. Returns `FxfsError::Inconsistent`
    /// if the journal's extents are not contiguous, if the root store is encrypted, or if the
    /// journal ends before the offset at which the super-block was written.
    #[trace]
    pub async fn replay(
        &self,
        filesystem: Arc<FxFilesystem>,
        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    ) -> Result<(), Error> {
        let block_size = filesystem.block_size();

        let (super_block, root_parent) =
            self.super_block_manager.load(filesystem.device(), block_size).await?;

        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

        self.objects.set_root_parent_store(root_parent.clone());
        let allocator =
            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
        if let Some(on_new_allocator) = on_new_allocator {
            on_new_allocator(allocator.clone());
        }
        self.objects.set_allocator(allocator.clone());
        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = super_block.clone();
        }

        let device = filesystem.device();

        // Build a handle that can read the journal file directly from the device, using the
        // journal's extents as recorded in the root parent store's mutable layer.
        let mut handle;
        {
            let root_parent_layer = root_parent.tree().mutable_layer();
            let mut iter = root_parent_layer
                .seek(Bound::Included(&ObjectKey::attribute(
                    super_block.journal_object_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
                        super_block.journal_checkpoint.file_offset,
                        BLOCK_SIZE,
                    ))),
                )))
                .await
                .context("Failed to seek root parent store")?;
            // The first extent found determines the file offset the bootstrap handle starts at.
            let start_offset = if let Some(ItemRef {
                key:
                    ObjectKey {
                        data:
                            ObjectKeyData::Attribute(
                                DEFAULT_DATA_ATTRIBUTE_ID,
                                AttributeKey::Extent(ExtentKey { range }),
                            ),
                        ..
                    },
                ..
            }) = iter.get()
            {
                range.start
            } else {
                0
            };
            handle = BootstrapObjectHandle::new_with_start_offset(
                super_block.journal_object_id,
                device.clone(),
                start_offset,
            );
            // Collect consecutive journal extents; stop at the first item that isn't one.
            while let Some(item) = iter.get() {
                if !match item.into() {
                    Some((
                        object_id,
                        DEFAULT_DATA_ATTRIBUTE_ID,
                        ExtentKey { range },
                        ExtentValue::Some { device_offset, .. },
                    )) if object_id == super_block.journal_object_id => {
                        // Extents must be contiguous in the journal file.
                        if let Some(end_offset) = handle.end_offset() {
                            if range.start != end_offset {
                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                    "Unexpected journal extent {:?}, expected start: {}",
                                    item, end_offset
                                )));
                            }
                        }
                        handle.push_extent(
                            0, // Extents that exist before replay starts are recorded at offset 0.
                            *device_offset
                                ..*device_offset + range.length().context("Invalid extent")?,
                        );
                        true
                    }
                    _ => false,
                } {
                    break;
                }
                iter.advance().await.context("Failed to advance root parent store iterator")?;
            }
        }

        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
        let JournaledTransactions { mut transactions, device_flushed_offset } = self
            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
            .await
            .context("Reading transactions for replay")?;

        // Validate mutations and gather checksums. `valid_to` starts at the end of what was
        // read and moves back to the first transaction holding an invalid mutation.
        let mut checksum_list = ChecksumList::new(device_flushed_offset);
        let mut valid_to = reader.journal_file_checkpoint().file_offset;
        let device_size = device.size();
        'bad_replay: for JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            root_mutations,
            non_root_mutations,
            checksums,
            ..
        } in &transactions
        {
            for JournaledChecksums { device_range, checksums, first_write } in checksums {
                checksum_list
                    .push(
                        checkpoint.file_offset,
                        device_range.clone(),
                        checksums.maybe_as_ref().context("Malformed checksums")?,
                        *first_write,
                    )
                    .context("Pushing journal checksum records to checksum list")?;
            }
            for mutation in root_parent_mutations
                .iter()
                .chain(root_mutations)
                .chain(non_root_mutations.iter().map(|(_, m)| m))
            {
                if !self.validate_mutation(mutation, block_size, device_size) {
                    info!(mutation:?; "Stopping replay at bad mutation");
                    valid_to = checkpoint.file_offset;
                    break 'bad_replay;
                }
                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
            }
        }

        // Checksum verification may move `valid_to` further back.
        let valid_to = checksum_list
            .verify(device.as_ref(), valid_to)
            .await
            .context("Failed to validate checksums")?;

        // Apply root parent store mutations and note per-store flush/deletion offsets. Any
        // transaction at or beyond `valid_to` (and all after it) is dropped here.
        let mut last_checkpoint = reader.journal_file_checkpoint();
        let mut journal_offsets = super_block.journal_file_offsets.clone();

        for (
            index,
            JournaledTransaction {
                checkpoint,
                root_parent_mutations,
                end_flush,
                volume_deleted,
                ..
            },
        ) in transactions.iter_mut().enumerate()
        {
            if checkpoint.file_offset >= valid_to {
                last_checkpoint = checkpoint.clone();

                transactions.truncate(index);
                break;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_parent_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_parent_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root parent store mutations")?;
            }

            if let Some((object_id, journal_offset)) = end_flush {
                journal_offsets.insert(*object_id, *journal_offset);
            }

            if let Some(object_id) = volume_deleted {
                journal_offsets.insert(*object_id, VOLUME_DELETED);
            }
        }

        let root_store = ObjectStore::open(
            &root_parent,
            super_block.root_store_object_id,
            Box::new(NullCache {}),
        )
        .await
        .context("Unable to open root store")?;

        ensure!(
            !root_store.is_encrypted(),
            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
        );
        self.objects.set_root_store(root_store);

        // Root store mutations from before its last recorded flush offset are skipped.
        let root_store_offset =
            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
            if checkpoint.file_offset < root_store_offset {
                continue;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root store mutations")?;
            }
        }

        allocator.open().await.context("Failed to open allocator")?;

        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
        {
            self.objects
                .replay_mutations(
                    non_root_mutations,
                    &journal_offsets,
                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                    end_offset,
                )
                .await
                .context("Failed to replay mutations")?;
        }

        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

        // Only used for the log line below: where reading stopped, if that was past the last
        // valid checkpoint.
        let discarded_to =
            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
                Some(reader.journal_file_checkpoint().file_offset)
            } else {
                None
            };

        {
            // The journal must reach at least the point where the super-block was written.
            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                    "journal replay cut short; journal finishes at {}, but super-block was \
                     written at {}",
                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
                )));
            }
            let handle = ObjectStore::open_object(
                &root_parent,
                super_block.journal_object_id,
                journal_handle_options(),
                None,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to open journal file (object id: {})",
                    super_block.journal_object_id
                )
            })?;
            let _ = self.handle.set(handle);
            let mut inner = self.inner.lock();
            // New records start at the next block boundary after what was read.
            reader.skip_to_end_of_block();
            let mut writer_checkpoint = reader.journal_file_checkpoint();

            std::mem::drop(reader);

            // Reset the checksum chain for records written from here on.
            writer_checkpoint.checksum ^= RESET_XOR;
            writer_checkpoint.version = LATEST_VERSION;
            inner.flushed_offset = writer_checkpoint.file_offset;

            inner.device_flushed_offset = inner.flushed_offset;

            inner.writer.seek(writer_checkpoint);
            inner.output_reset_version = true;
            inner.valid_to = last_checkpoint.file_offset;
            // Data between the last valid checkpoint and the new write position must be discarded.
            if last_checkpoint.file_offset < inner.flushed_offset {
                inner.discard_offset = Some(last_checkpoint.file_offset);
            }
        }

        self.objects
            .on_replay_complete()
            .await
            .context("Failed to complete replay for object manager")?;

        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
        Ok(())
    }
925
926 async fn read_transactions(
927 &self,
928 reader: &mut JournalReader,
929 end_offset: Option<u64>,
930 object_id_filter: u64,
931 ) -> Result<JournaledTransactions, Error> {
932 let mut transactions = Vec::new();
933 let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
934 let super_block = &self.inner.lock().super_block_header;
935 (
936 super_block.super_block_journal_file_offset,
937 super_block.root_parent_store_object_id,
938 super_block.root_store_object_id,
939 )
940 };
941 let mut current_transaction = None;
942 let mut begin_flush_offsets = HashMap::default();
943 let mut stores_deleted = HashSet::new();
944 loop {
945 let checkpoint = reader.journal_file_checkpoint();
947 if let Some(end_offset) = end_offset {
948 if checkpoint.file_offset >= end_offset {
949 break;
950 }
951 }
952 let result =
953 reader.deserialize().await.context("Failed to deserialize journal record")?;
954 match result {
955 ReadResult::Reset(_) => {
956 if current_transaction.is_some() {
957 current_transaction = None;
958 transactions.pop();
959 }
960 let offset = reader.journal_file_checkpoint().file_offset;
961 if offset > device_flushed_offset {
962 device_flushed_offset = offset;
963 }
964 }
965 ReadResult::Some(record) => {
966 match record {
967 JournalRecord::EndBlock => {
968 reader.skip_to_end_of_block();
969 }
970 JournalRecord::Mutation { object_id, mutation } => {
971 let current_transaction = match current_transaction.as_mut() {
972 None => {
973 transactions.push(JournaledTransaction::new(checkpoint));
974 current_transaction = transactions.last_mut();
975 current_transaction.as_mut().unwrap()
976 }
977 Some(transaction) => transaction,
978 };
979
980 if stores_deleted.contains(&object_id) {
981 bail!(
982 anyhow!(FxfsError::Inconsistent)
983 .context("Encountered mutations for deleted store")
984 );
985 }
986
987 match &mutation {
988 Mutation::BeginFlush => {
989 begin_flush_offsets.insert(
990 object_id,
991 current_transaction.checkpoint.file_offset,
992 );
993 }
994 Mutation::EndFlush => {
995 if let Some(offset) = begin_flush_offsets.remove(&object_id) {
996 if let Some(deleted_volume) =
997 ¤t_transaction.volume_deleted
998 {
999 if *deleted_volume == object_id {
1000 bail!(anyhow!(FxfsError::Inconsistent).context(
1001 "Multiple EndFlush/DeleteVolume mutations in a \
1002 single transaction for the same object"
1003 ));
1004 }
1005 }
1006 if current_transaction
1009 .end_flush
1010 .replace((object_id, offset + 1))
1011 .is_some()
1012 {
1013 bail!(anyhow!(FxfsError::Inconsistent).context(
1014 "Multiple EndFlush mutations in a \
1015 single transaction"
1016 ));
1017 }
1018 }
1019 }
1020 Mutation::DeleteVolume => {
1021 if let Some((flushed_object, _)) =
1022 ¤t_transaction.end_flush
1023 {
1024 if *flushed_object == object_id {
1025 bail!(anyhow!(FxfsError::Inconsistent).context(
1026 "Multiple EndFlush/DeleteVolume mutations in a \
1027 single transaction for the same object"
1028 ));
1029 }
1030 }
1031 if current_transaction
1032 .volume_deleted
1033 .replace(object_id)
1034 .is_some()
1035 {
1036 bail!(anyhow!(FxfsError::Inconsistent).context(
1037 "Multiple DeleteVolume mutations in a single \
1038 transaction"
1039 ));
1040 }
1041 stores_deleted.insert(object_id);
1042 }
1043 _ => {}
1044 }
1045
1046 if (object_id_filter == INVALID_OBJECT_ID
1049 || object_id_filter == object_id)
1050 && self.should_apply(object_id, ¤t_transaction.checkpoint)
1051 {
1052 if object_id == root_parent_store_object_id {
1053 current_transaction.root_parent_mutations.push(mutation);
1054 } else if object_id == root_store_object_id {
1055 current_transaction.root_mutations.push(mutation);
1056 } else {
1057 current_transaction
1058 .non_root_mutations
1059 .push((object_id, mutation));
1060 }
1061 }
1062 }
1063 JournalRecord::DataChecksums(device_range, checksums, first_write) => {
1064 let current_transaction = match current_transaction.as_mut() {
1065 None => {
1066 transactions.push(JournaledTransaction::new(checkpoint));
1067 current_transaction = transactions.last_mut();
1068 current_transaction.as_mut().unwrap()
1069 }
1070 Some(transaction) => transaction,
1071 };
1072 current_transaction.checksums.push(JournaledChecksums {
1073 device_range,
1074 checksums,
1075 first_write,
1076 });
1077 }
1078 JournalRecord::Commit => {
1079 if let Some(&mut JournaledTransaction {
1080 ref checkpoint,
1081 ref root_parent_mutations,
1082 ref mut end_offset,
1083 ..
1084 }) = current_transaction.take()
1085 {
1086 for mutation in root_parent_mutations {
1087 if let Mutation::ObjectStore(ObjectStoreMutation {
1091 item:
1092 Item {
1093 key:
1094 ObjectKey {
1095 object_id,
1096 data:
1097 ObjectKeyData::Attribute(
1098 DEFAULT_DATA_ATTRIBUTE_ID,
1099 AttributeKey::Extent(ExtentKey {
1100 range,
1101 }),
1102 ),
1103 ..
1104 },
1105 value:
1106 ObjectValue::Extent(ExtentValue::Some {
1107 device_offset,
1108 ..
1109 }),
1110 ..
1111 },
1112 ..
1113 }) = mutation
1114 {
1115 let handle = reader.handle();
1118 if *object_id != handle.object_id() {
1119 continue;
1120 }
1121 if let Some(end_offset) = handle.end_offset() {
1122 if range.start != end_offset {
1123 bail!(anyhow!(FxfsError::Inconsistent).context(
1124 format!(
1125 "Unexpected journal extent {:?} -> {}, \
1126 expected start: {}",
1127 range, device_offset, end_offset,
1128 )
1129 ));
1130 }
1131 }
1132 handle.push_extent(
1133 checkpoint.file_offset,
1134 *device_offset
1135 ..*device_offset
1136 + range.length().context("Invalid extent")?,
1137 );
1138 }
1139 }
1140 *end_offset = reader.journal_file_checkpoint().file_offset;
1141 }
1142 }
1143 JournalRecord::Discard(offset) => {
1144 if offset == 0 {
1145 bail!(
1146 anyhow!(FxfsError::Inconsistent)
1147 .context("Invalid offset for Discard")
1148 );
1149 }
1150 if let Some(transaction) = current_transaction.as_ref() {
1151 if transaction.checkpoint.file_offset < offset {
1152 continue;
1154 }
1155 }
1156 current_transaction = None;
1157 while let Some(transaction) = transactions.last() {
1158 if transaction.checkpoint.file_offset < offset {
1159 break;
1160 }
1161 transactions.pop();
1162 }
1163 reader.handle().discard_extents(offset);
1164 }
1165 JournalRecord::DidFlushDevice(offset) => {
1166 if offset > device_flushed_offset {
1167 device_flushed_offset = offset;
1168 }
1169 }
1170 }
1171 }
1172 ReadResult::ChecksumMismatch => break,
1174 }
1175 }
1176
1177 if current_transaction.is_some() {
1179 transactions.pop();
1180 }
1181
1182 Ok(JournaledTransactions { transactions, device_flushed_offset })
1183 }
1184
    /// Formats a new, empty filesystem.
    ///
    /// In a single transaction this creates the root parent store, the allocator, the root store,
    /// both super-block objects (A and B, each extended to its first extent), the journal file
    /// and the graveyard. It then records a new super-block header in `Inner` and stores the
    /// journal handle.
    ///
    /// The new header uses generation 1, except when the filesystem options enable image-builder
    /// mode, the device block size equals `MIN_SUPER_BLOCK_SIZE`, and an existing super-block can
    /// be read. In that case the generation is the old one plus one (wrapping). The first
    /// `chunk_size()` bytes of the journal are preallocated unless `Inner::image_builder_mode` is
    /// set.
    ///
    /// # Errors
    /// Propagates any failure from creating the objects or committing the transaction. Fails if
    /// preallocation returns less than the requested range.
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        // Fixed object ids for the initial root parent store, root store and allocator.
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        // The writer's current position becomes the super-block's journal checkpoint.
        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        let mut current_generation = 1;
        if filesystem.options().image_builder_mode.is_some() {
            let block_size = filesystem.device().block_size();
            // Only look for an existing super-block when the device block size matches the
            // minimum super-block size.
            if block_size as u64 == MIN_SUPER_BLOCK_SIZE {
                match self.read_superblocks(filesystem.device(), block_size.into()).await {
                    Ok((super_block, _)) => {
                        log::info!(
                            "Found existing superblock with generation {}. Bumping by 1.",
                            super_block.generation
                        );
                        current_generation = super_block.generation.wrapping_add(1);
                    }
                    Err(_) => {
                        // No readable existing super-block: deliberately ignored, and the
                        // generation stays at 1.
                    }
                }
            }
        }

        let root_parent = ObjectStore::new_empty(
            None,
            INIT_ROOT_PARENT_STORE_OBJECT_ID,
            filesystem.clone(),
            Box::new(NullCache {}),
        );
        self.objects.set_root_parent_store(root_parent.clone());

        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
        self.objects.set_allocator(allocator.clone());
        self.objects.init_metadata_reservation()?;

        let journal_handle;
        let super_block_a_handle;
        let super_block_b_handle;
        let root_store;
        // Everything below is created in this one transaction; the order of creation calls
        // affects which object ids get assigned.
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        root_store = root_parent
            .new_child_store(
                &mut transaction,
                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
                Box::new(NullCache {}),
            )
            .await
            .context("new_child_store")?;
        self.objects.set_root_store(root_store.clone());

        allocator.create(&mut transaction).await?;

        // Super-block objects use fixed ids, so bump the root store's last object id past them.
        super_block_a_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            SuperBlockInstance::A.object_id(),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
        super_block_a_handle
            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
            .await
            .context("extend super block")?;
        super_block_b_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            SuperBlockInstance::B.object_id(),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
        super_block_b_handle
            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
            .await
            .context("extend super block")?;

        // The journal file lives in the root parent store.
        journal_handle = ObjectStore::create_object(
            &root_parent,
            &mut transaction,
            journal_handle_options(),
            None,
        )
        .await
        .context("create journal")?;
        if self.inner.lock().image_builder_mode.is_none() {
            let mut file_range = 0..self.chunk_size();
            journal_handle
                .preallocate_range(&mut transaction, &mut file_range)
                .await
                .context("preallocate journal")?;
            // Any range left unconsumed means less space was preallocated than requested.
            if file_range.start < file_range.end {
                bail!("preallocate_range returned too little space");
            }
        }

        root_store.create(&mut transaction).await?;

        root_parent
            .set_graveyard_directory_object_id(Graveyard::create(&mut transaction, &root_parent));

        transaction.commit().await?;

        self.inner.lock().super_block_header = SuperBlockHeader::new(
            current_generation,
            root_parent.store_object_id(),
            root_parent.graveyard_directory_object_id(),
            root_store.store_object_id(),
            allocator.object_id(),
            journal_handle.object_id(),
            checkpoint,
            LATEST_VERSION,
        );

        let _ = self.handle.set(journal_handle);
        Ok(())
    }
1336
1337 pub async fn allocate_journal(&self) -> Result<(), Error> {
1340 let handle = self.handle.get().unwrap();
1341 let filesystem = handle.store().filesystem();
1342 let mut transaction = filesystem
1343 .clone()
1344 .new_transaction(
1345 lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1346 Options { skip_journal_checks: true, ..Default::default() },
1347 )
1348 .await?;
1349 let mut file_range = 0..self.chunk_size();
1350 self.handle
1351 .get()
1352 .unwrap()
1353 .preallocate_range(&mut transaction, &mut file_range)
1354 .await
1355 .context("preallocate journal")?;
1356 if file_range.start < file_range.end {
1357 bail!("preallocate_range returned too little space");
1358 }
1359 transaction.commit().await?;
1360 Ok(())
1361 }
1362
1363 pub async fn init_superblocks(&self) -> Result<(), Error> {
1364 for _ in 0..2 {
1366 self.write_super_block().await?;
1367 }
1368 Ok(())
1369 }
1370
1371 pub async fn read_transactions_for_object(
1377 &self,
1378 object_id: u64,
1379 ) -> Result<Vec<JournaledTransaction>, Error> {
1380 let handle = self.handle.get().expect("No journal handle");
1381 let handle = ObjectStore::open_object(
1383 handle.owner(),
1384 handle.object_id(),
1385 journal_handle_options(),
1386 None,
1387 )
1388 .await?;
1389
1390 let checkpoint = match self.objects.journal_checkpoint(object_id) {
1391 Some(checkpoint) => checkpoint,
1392 None => return Ok(vec![]),
1393 };
1394 let mut reader = JournalReader::new(handle, &checkpoint);
1395 let end_offset = self.inner.lock().valid_to;
1398 Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1399 }
1400
1401 pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1403 if transaction.is_empty() {
1404 return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1405 }
1406
1407 self.pre_commit(transaction).await?;
1408 Ok(self.write_and_apply_mutations(transaction))
1409 }
1410
    /// Performs the journal housekeeping that must precede writing `transaction`'s mutations.
    ///
    /// First, while holding the `inner` lock, it serializes any pending records into the journal
    /// writer:
    /// - the version, if `output_reset_version` was set;
    /// - a `Discard` record, if `discard_offset` is set;
    /// - a `DidFlushDevice` record, if `needs_did_flush_device` is set.
    ///
    /// If the journal handle has not been set, it returns at that point.
    ///
    /// Otherwise it builds a separate maintenance transaction and applies it directly, but only
    /// if at least one of the following holds:
    /// - the writer is within one chunk of the end of the journal file. In that case another
    ///   `chunk_size()` bytes are preallocated after the current size.
    /// - `zero_offset` is set. In that case `0..zero_offset` of the journal file is zeroed.
    /// - `self.objects.needs_borrow_for_journal(file_offset)` returns true.
    ///
    /// # Panics
    ///
    /// Panics if the maintenance records did not end before the old journal file size (see the
    /// `assert!` below).
    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
        let handle;

        let (size, zero_offset) = {
            let mut inner = self.inner.lock();

            // `take` clears the flag, so the version is written at most once per reset.
            if std::mem::take(&mut inner.output_reset_version) {
                LATEST_VERSION.serialize_into(&mut inner.writer)?;
            }

            if let Some(discard_offset) = inner.discard_offset {
                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
                inner.discard_offset = None;
            }

            // Record the offset up to which the device was last flushed (set by `flush_device`).
            if inner.needs_did_flush_device {
                let offset = inner.device_flushed_offset;
                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
                inner.needs_did_flush_device = false;
            }

            handle = match self.handle.get() {
                None => return Ok(()),
                Some(x) => x,
            };

            let file_offset = inner.writer.journal_file_checkpoint().file_offset;

            // `Some(old size)` when less than one chunk remains after the writer's position.
            let size = handle.get_size();
            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };

            // Fast path: nothing to extend, nothing to zero, and no metadata space to borrow.
            if size.is_none()
                && inner.zero_offset.is_none()
                && !self.objects.needs_borrow_for_journal(file_offset)
            {
                return Ok(());
            }

            (size, inner.zero_offset)
        };

        // The maintenance transaction borrows metadata space and shares the caller's txn guard.
        let mut transaction = handle
            .new_transaction_with_options(Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.objects.metadata_reservation()),
                txn_guard: Some(transaction.txn_guard()),
                ..Default::default()
            })
            .await?;
        if let Some(size) = size {
            handle
                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
                .await?;
        }
        if let Some(zero_offset) = zero_offset {
            handle.zero(&mut transaction, 0..zero_offset).await?;
        }

        // Applied directly rather than via `Transaction::commit`.
        // NOTE(review): presumably to avoid re-entering the journal's commit path (and thus
        // `pre_commit`) — confirm.
        self.write_and_apply_mutations(&mut transaction);

        let mut inner = self.inner.lock();

        // The records just written (including the extension) must end within the old size.
        // NOTE(review): presumably so that replay can read them without already knowing about
        // the newly preallocated range — confirm.
        if let Some(size) = size {
            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
        }

        // Only clear `zero_offset` if nothing (e.g. `write_super_block`) changed it while the lock
        // was released.
        if inner.zero_offset == zero_offset {
            inner.zero_offset = None;
        }

        Ok(())
    }
1491
1492 fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1496 let super_block_header = &self.inner.lock().super_block_header;
1497 let offset = super_block_header
1498 .journal_file_offsets
1499 .get(&object_id)
1500 .cloned()
1501 .unwrap_or(super_block_header.super_block_journal_file_offset);
1502 journal_file_checkpoint.file_offset >= offset
1503 }
1504
    /// Writes a new super-block and updates the in-memory header to match.
    ///
    /// The steps are:
    /// 1. While holding `sync_mutex`, pad the journal to a block boundary and compact the root
    ///    parent store. `writer_mutex` is also held for this step.
    /// 2. Still holding `sync_mutex`, wait for the journal to be flushed (and the device flushed)
    ///    up to the padded checkpoint.
    /// 3. Build the new header from the current one. The generation is bumped, and the checkpoint
    ///    fields and per-object journal offsets are refreshed.
    /// 4. Save the header with `super_block_manager`.
    ///
    /// Finally, `zero_offset` is set to the old header's journal checkpoint offset, rounded down
    /// to `BLOCK_SIZE`. A later `pre_commit` then zeroes the journal file below that offset.
    async fn write_super_block(&self) -> Result<(), Error> {
        let root_parent_store = self.objects.root_parent_store();

        let old_layers;
        let old_super_block_offset;
        let mut new_super_block_header;
        let checkpoint;
        let borrowed;

        {
            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
            {
                // Hold the writer lock so nothing is written to the journal between padding and
                // compacting the root parent store.
                let _write_guard = self.writer_mutex.lock();
                (checkpoint, borrowed) = self.pad_to_block()?;
                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
            }
            self.flush_device(checkpoint.file_offset)
                .await
                .context("flush failed when writing superblock")?;
        }

        new_super_block_header = self.inner.lock().super_block_header.clone();

        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;

        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();

        new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
        // Replay must start at the earliest checkpoint any object still needs, or at the padded
        // checkpoint if `journal_file_offsets` reports none.
        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
        new_super_block_header.journal_file_offsets = journal_file_offsets;
        new_super_block_header.borrowed_metadata_space = borrowed;

        self.super_block_manager
            .save(
                new_super_block_header.clone(),
                self.objects.root_parent_store().filesystem(),
                old_layers,
            )
            .await?;
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = new_super_block_header;
            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
        }

        Ok(())
    }
1560
    /// Pads the journal to a block boundary and, optionally, waits for it to reach the device.
    ///
    /// Returns `Ok(None)` without doing anything if `options.precondition` is set and returns
    /// false.
    ///
    /// Otherwise returns the journal checkpoint captured before padding, together with the
    /// borrowed metadata space; both come from `pad_to_block`. If `options.flush_device` is set,
    /// it first waits (via `flush_device`) for the journal to be written up to that checkpoint
    /// and for the device to be flushed.
    ///
    /// `sync_mutex` is held for the whole call. `writer_mutex` is held only while padding.
    pub async fn sync(
        &self,
        options: SyncOptions<'_>,
    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());

        let (checkpoint, borrowed) = {
            if let Some(precondition) = options.precondition {
                if !precondition() {
                    return Ok(None);
                }
            }

            // Scoped to this block: released before waiting on the flush below.
            let _guard = self.writer_mutex.lock();

            self.pad_to_block()?
        };

        if options.flush_device {
            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
        }

        Ok(Some((checkpoint, borrowed)))
    }
1593
1594 fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1598 let mut inner = self.inner.lock();
1599 let checkpoint = inner.writer.journal_file_checkpoint();
1600 if checkpoint.file_offset % BLOCK_SIZE != 0 {
1601 JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1602 inner.writer.pad_to_block()?;
1603 if let Some(waker) = inner.flush_waker.take() {
1604 waker.wake();
1605 }
1606 }
1607 Ok((checkpoint, self.objects.borrowed_metadata_space()))
1608 }
1609
    /// Waits for the journal to be flushed to at least `checkpoint_offset`, then flushes the
    /// device if that has not already been done up to that offset.
    ///
    /// After a device flush it:
    /// - records `device_flushed_offset`;
    /// - sets `needs_did_flush_device`, so the next `pre_commit` writes a `DidFlushDevice` record;
    /// - notifies the allocator via `did_flush_device`.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::JournalFlushError` if the journal is terminated before reaching
    /// `checkpoint_offset`. Also propagates errors from the device flush.
    ///
    /// # Panics
    ///
    /// Panics if a device flush is needed and the journal handle has not been set.
    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
        debug_assert_not_too_long!(poll_fn(|ctx| {
            let mut inner = self.inner.lock();
            if inner.flushed_offset >= checkpoint_offset {
                Poll::Ready(Ok(()))
            } else if inner.terminate {
                let context = inner
                    .terminate_reason
                    .as_ref()
                    .map(|e| format!("Journal closed with error: {:?}", e))
                    .unwrap_or_else(|| "Journal closed".to_string());
                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
            } else {
                // `flush` wakes this waker after it advances `flushed_offset`.
                inner.sync_waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }))?;

        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
        if needs_flush {
            let trace = self.trace.load(Ordering::Relaxed);
            if trace {
                info!("J: start flush device");
            }
            self.handle.get().unwrap().flush_device().await?;
            if trace {
                info!("J: end flush device");
            }

            {
                let mut inner = self.inner.lock();
                inner.device_flushed_offset = checkpoint_offset;
                inner.needs_did_flush_device = true;
            }

            self.objects.allocator().did_flush_device(checkpoint_offset);
            if trace {
                info!("J: did flush device");
            }
        }

        Ok(())
    }
1662
1663 pub fn super_block_header(&self) -> SuperBlockHeader {
1665 self.inner.lock().super_block_header.clone()
1666 }
1667
    /// Waits until the journal has room for more transactions.
    ///
    /// Returns `Ok(())` once the space used since the super-block's journal checkpoint is below
    /// `reclaim_size`, or immediately in image-builder mode. Otherwise it waits on `reclaim_event`
    /// and checks again.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::JournalFlushError` in two cases:
    /// - the journal has been terminated;
    /// - space is exhausted while compactions are disabled, since no compaction would free it.
    pub async fn check_journal_space(&self) -> Result<(), Error> {
        loop {
            debug_assert_not_too_long!({
                let inner = self.inner.lock();
                if inner.terminate {
                    let context = inner
                        .terminate_reason
                        .as_ref()
                        .map(|e| format!("Journal closed with error: {:?}", e))
                        .unwrap_or_else(|| "Journal closed".to_string());
                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
                }
                if self.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset
                    < inner.reclaim_size
                {
                    break Ok(());
                }
                if inner.image_builder_mode.is_some() {
                    break Ok(());
                }
                if inner.disable_compactions {
                    break Err(
                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
                    );
                }
                // Notified when a compaction finishes, or on flush error / termination.
                self.reclaim_event.listen()
            });
        }
    }
1701
1702 fn chunk_size(&self) -> u64 {
1703 CHUNK_SIZE
1704 }
1705
    /// Serializes `transaction`'s mutations into the journal writer, applies them, and writes
    /// the transaction's trailing records.
    ///
    /// While holding `writer_mutex`, it:
    /// 1. sets `needs_barrier` if the transaction includes a write, captures the writer's
    ///    checkpoint (`checkpoint_before`), and writes each mutation via
    ///    `self.objects.write_mutation`;
    /// 2. applies the transaction with `self.objects.apply_transaction`;
    /// 3. writes, in order:
    ///    - any extra mutation returned by `apply_transaction` (under object ID 0);
    ///    - one `DataChecksums` record per checksum entry taken from the transaction;
    ///    - a `Commit` record.
    ///
    /// After releasing the mutex, it calls `did_commit_transaction` and wakes the flush task.
    ///
    /// Returns the journal file offset of `checkpoint_before`, i.e. where this transaction's
    /// records begin.
    ///
    /// # Panics
    ///
    /// Panics if `apply_transaction` fails or if writing any trailing record fails.
    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
        let checkpoint_before;
        let checkpoint_after;
        {
            let _guard = self.writer_mutex.lock();
            checkpoint_before = {
                let mut inner = self.inner.lock();
                if transaction.includes_write() {
                    inner.needs_barrier = true;
                }
                let checkpoint = inner.writer.journal_file_checkpoint();
                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
                    self.objects.write_mutation(
                        *object_id,
                        mutation,
                        Writer(*object_id, &mut inner.writer),
                    );
                }
                checkpoint
            };
            // NOTE(review): `inner` is released around `apply_transaction`, presumably because
            // applying may need to take the lock itself — confirm. `writer_mutex` is still held.
            let maybe_mutation =
                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
                    "apply_transaction should not fail in live mode; \
                     filesystem will be in an inconsistent state",
                );
            checkpoint_after = {
                let mut inner = self.inner.lock();
                if let Some(mutation) = maybe_mutation {
                    inner
                        .writer
                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
                        .unwrap();
                }
                for (device_range, checksums, first_write) in
                    transaction.take_checksums().into_iter()
                {
                    inner
                        .writer
                        .write_record(&JournalRecord::DataChecksums(
                            device_range,
                            Checksums::fletcher(checksums),
                            first_write,
                        ))
                        .unwrap();
                }
                inner.writer.write_record(&JournalRecord::Commit).unwrap();

                inner.writer.journal_file_checkpoint()
            };
        }
        self.objects.did_commit_transaction(
            transaction,
            &checkpoint_before,
            checkpoint_after.file_offset,
        );

        // New bytes are flushable; wake `flush_task`.
        if let Some(waker) = self.inner.lock().flush_waker.take() {
            waker.wake();
        }

        checkpoint_before.file_offset
    }
1768
    /// Long-running task that writes buffered journal data to disk and triggers compactions.
    ///
    /// Each time it is woken (via `flush_waker`), it does the following:
    /// - Starts a flush of all currently flushable bytes, if no flush is in flight, no flush has
    ///   failed, and the journal handle is set.
    /// - Starts a compaction if all of these hold: none is running, the journal is not
    ///   terminated, compactions are not disabled, image-builder mode is off, and more than half
    ///   of `reclaim_size` has been used since the super-block's journal checkpoint.
    /// - Polls whichever of those futures are in flight.
    ///
    /// A failed flush or compaction terminates the journal. The task completes once the journal
    /// is terminated and neither future is in flight.
    pub async fn flush_task(self: Arc<Self>) {
        let mut flush_fut = None;
        let mut compact_fut = None;
        // Once a flush fails, no further flushes are started.
        let mut flush_error = false;
        poll_fn(|ctx| {
            loop {
                {
                    let mut inner = self.inner.lock();
                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
                        let flushable = inner.writer.flushable_bytes();
                        if flushable > 0 {
                            flush_fut = Some(Box::pin(self.flush(flushable)));
                        }
                    }
                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
                        return Poll::Ready(());
                    }
                    if compact_fut.is_none()
                        && !inner.terminate
                        && !inner.disable_compactions
                        && inner.image_builder_mode.is_none()
                        && self.objects.last_end_offset()
                            - inner.super_block_header.journal_checkpoint.file_offset
                            > inner.reclaim_size / 2
                    {
                        compact_fut = Some(Box::pin(self.compact()));
                        inner.compaction_running = true;
                    }
                    // Woken by writers (`pad_to_block`, `write_and_apply_mutations`) when new
                    // bytes become flushable.
                    inner.flush_waker = Some(ctx.waker().clone());
                }
                // Loop again whenever a future completed, since that may allow new work to start.
                let mut pending = true;
                if let Some(fut) = flush_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        if let Err(e) = result {
                            self.inner.lock().terminate(Some(e.context("Flush error")));
                            self.reclaim_event.notify(usize::MAX);
                            flush_error = true;
                        }
                        flush_fut = None;
                        pending = false;
                    }
                }
                if let Some(fut) = compact_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        let mut inner = self.inner.lock();
                        if let Err(e) = result {
                            inner.terminate(Some(e.context("Compaction error")));
                        }
                        compact_fut = None;
                        inner.compaction_running = false;
                        // Wakes `check_journal_space` and `stop_compactions` waiters.
                        self.reclaim_event.notify(usize::MAX);
                        pending = false;
                    }
                }
                if pending {
                    return Poll::Pending;
                }
            }
        })
        .await;
    }
1836
1837 async fn flush(&self, amount: usize) -> Result<(), Error> {
1838 let handle = self.handle.get().unwrap();
1839 let mut buf = handle.allocate_buffer(amount).await;
1840 let (offset, len, barrier_on_first_write) = {
1841 let mut inner = self.inner.lock();
1842 let offset = inner.writer.take_flushable(buf.as_mut());
1843 let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1844 inner.needs_barrier = false;
1847 (offset, buf.len() as u64, barrier_on_first_write)
1848 };
1849 self.handle
1850 .get()
1851 .unwrap()
1852 .overwrite(
1853 offset,
1854 buf.as_mut(),
1855 OverwriteOptions { barrier_on_first_write, ..Default::default() },
1856 )
1857 .await?;
1858
1859 let mut inner = self.inner.lock();
1860 if let Some(waker) = inner.sync_waker.take() {
1861 waker.wake();
1862 }
1863 inner.flushed_offset = offset + len;
1864 inner.valid_to = inner.flushed_offset;
1865 Ok(())
1866 }
1867
1868 #[trace]
1871 pub async fn compact(&self) -> Result<(), Error> {
1872 let trace = self.trace.load(Ordering::Relaxed);
1873 debug!("Compaction starting");
1874 if trace {
1875 info!("J: start compaction");
1876 }
1877 let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1878 self.inner.lock().super_block_header.earliest_version = earliest_version;
1879 self.write_super_block().await.context("Failed to write superblock")?;
1880 if trace {
1881 info!("J: end compaction");
1882 }
1883 debug!("Compaction finished");
1884 Ok(())
1885 }
1886
    /// Disables future compactions and waits for any in-flight compaction to finish.
    ///
    /// `disable_compactions` is set on every iteration, so a compaction started concurrently is
    /// still waited for via `reclaim_event`.
    pub async fn stop_compactions(&self) {
        loop {
            debug_assert_not_too_long!({
                let mut inner = self.inner.lock();
                inner.disable_compactions = true;
                if !inner.compaction_running {
                    return;
                }
                // `flush_task` notifies this event when a compaction completes.
                self.reclaim_event.listen()
            });
        }
    }
1899
1900 pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1903 let this = Arc::downgrade(self);
1904 parent.record_lazy_child(name, move || {
1905 let this_clone = this.clone();
1906 async move {
1907 let inspector = fuchsia_inspect::Inspector::default();
1908 if let Some(this) = this_clone.upgrade() {
1909 let (journal_min, journal_max, journal_reclaim_size) = {
1910 let inner = this.inner.lock();
1912 (
1913 round_down(
1914 inner.super_block_header.journal_checkpoint.file_offset,
1915 BLOCK_SIZE,
1916 ),
1917 inner.flushed_offset,
1918 inner.reclaim_size,
1919 )
1920 };
1921 let root = inspector.root();
1922 root.record_uint("journal_min_offset", journal_min);
1923 root.record_uint("journal_max_offset", journal_max);
1924 root.record_uint("journal_size", journal_max - journal_min);
1925 root.record_uint("journal_reclaim_size", journal_reclaim_size);
1926
1927 if let Some(x) = round_div(
1929 100 * (journal_max - journal_min),
1930 this.objects.allocator().get_disk_bytes(),
1931 ) {
1932 root.record_uint("journal_size_to_disk_size_percent", x);
1933 }
1934 }
1935 Ok(inspector)
1936 }
1937 .boxed()
1938 });
1939 }
1940
1941 pub fn terminate(&self) {
1943 self.inner.lock().terminate(None);
1944 self.reclaim_event.notify(usize::MAX);
1945 }
1946}
1947
1948pub struct Writer<'a>(u64, &'a mut JournalWriter);
1950
1951impl Writer<'_> {
1952 pub fn write(&mut self, mutation: Mutation) {
1953 self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
1954 }
1955}
1956
#[cfg(test)]
mod tests {
    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
    use crate::fsck::fsck;
    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
    use crate::object_store::directory::Directory;
    use crate::object_store::journal::SuperBlockInstance;
    use crate::object_store::transaction::Options;
    use crate::object_store::volume::root_volume;
    use crate::object_store::{
        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
    };
    use fuchsia_async as fasync;
    use fuchsia_async::MonotonicDuration;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    /// Writes a file, syncs, then closes and reopens the filesystem.
    ///
    /// Checks that the data is readable after reopening (i.e. after journal replay) and that
    /// fsck passes.
    #[fuchsia::test]
    async fn test_replay() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        let object_id = {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");

            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            handle.object_id()
        };

        {
            fs.close().await.expect("Close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            let fs = FxFilesystem::open(device).await.expect("open failed");
            let handle = ObjectStore::open_object(
                &fs.root_store(),
                object_id,
                HandleOptions::default(),
                None,
            )
            .await
            .expect("open_object failed");
            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            fsck(fs.clone()).await.expect("fsck failed");
            fs.close().await.expect("Close failed");
        }
    }

    /// Creates one synced file followed by 1000 more files with no final sync, then reopens.
    ///
    /// After reopening it checks fsck and the first file's contents. It then writes and syncs a
    /// new file, reopens again, and checks the first and last files.
    #[fuchsia::test]
    async fn test_reset() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));

        let mut object_ids = Vec::new();

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());

            // Many more files, written without an explicit sync afterwards.
            for i in 0..1000 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let handle = root_directory
                    .create_child_file(&mut transaction, &format!("{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                object_ids.push(handle.object_id());
            }
        }
        fs.close().await.expect("fs close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        {
            let root_store = fs.root_store();
            // Only the first (explicitly synced) object is checked here.
            for &object_id in &object_ids[0..1] {
                let handle = ObjectStore::open_object(
                    &root_store,
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .expect("open_object failed");
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }

            // Write and sync one more file after the reopen.
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test2")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        {
            fsck(fs.clone()).await.expect("fsck failed");

            // Check the first object and the file written after the first reopen.
            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
                let handle = ObjectStore::open_object(
                    &fs.root_store(),
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .unwrap_or_else(|e| {
                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
                });
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }
        }
        fs.close().await.expect("close failed");
    }

    /// Exercises the journal across a compaction while compactions are then stopped.
    ///
    /// Files are created until the journal passes `CHUNK_SIZE`. The journal is then compacted,
    /// further compactions are stopped, and more files are created until it passes
    /// `2 * CHUNK_SIZE`. A device snapshot is opened and written to; it is then reopened and
    /// checked with fsck.
    #[fuchsia::test]
    async fn test_discard() {
        let device = {
            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let store = root_volume
                .new_volume("test", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            // `commit` returns the journal offset of the transaction; stop after one chunk.
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("a {i}"))
                    .await
                    .expect("create_child_file failed");
                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
                    break;
                }
                i += 1;
            }

            fs.journal().compact().await.expect("compact failed");
            fs.journal().stop_compactions().await;

            // With compactions stopped, write until the journal passes two chunks.
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("b {i}"))
                    .await
                    .expect("create_child_file failed");
                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
                    break;
                }
                i += 1;
            }

            // NOTE(review): presumably gives the flush task time to write out the journal before
            // the snapshot — confirm.
            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
            fs.device().snapshot().expect("snapshot failed")
        };

        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let store =
                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");

            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, &format!("d"))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

    /// Checks that reformatting in image-builder mode picks a super-block generation greater than
    /// the one already on the device.
    ///
    /// NOTE(review): relies on the device block size (4096) equalling `MIN_SUPER_BLOCK_SIZE`,
    /// which is the condition under which the existing super-block is read during
    /// initialisation — confirm.
    #[fuchsia::test]
    async fn test_use_existing_generation() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 4096));

        let fs = FxFilesystemBuilder::new()
            .format(true)
            .image_builder_mode(Some(SuperBlockInstance::A))
            .open(device)
            .await
            .expect("open failed");
        let generation0 = fs.super_block_header().generation;
        assert_eq!(generation0, 1);
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let generation1 = fs.super_block_header().generation;
        {
            let root_volume = crate::object_store::volume::root_volume(fs.clone())
                .await
                .expect("root_volume failed");
            root_volume
                .new_volume("test", crate::object_store::NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");
        }
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystemBuilder::new()
            .format(true)
            .image_builder_mode(Some(SuperBlockInstance::A))
            .open(device)
            .await
            .expect("open failed");
        let generation2 = fs.super_block_header().generation;
        assert!(
            generation2 > generation1,
            "generation2 ({}) should be greater than generation1 ({})",
            generation2,
            generation1
        );
        fs.close().await.expect("close failed");
    }
}
2346
#[cfg(fuzz)]
mod fuzz {
    use fuzz::fuzz;

    /// Appends arbitrary bytes to the journal writer of a fresh filesystem, closes it, and
    /// attempts to reopen (replay) it. Open failures are tolerated; the fuzzer is looking for
    /// panics and hangs.
    #[fuzz]
    fn fuzz_journal_bytes(input: Vec<u8>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use std::io::Write;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            if let Ok(fs) = FxFilesystem::open(device).await {
                let _ = fs.close().await;
            }
        });
    }

    /// Writes arbitrary (structurally valid) journal records into a fresh filesystem's journal,
    /// closes it, and attempts to reopen (replay) it. Per-record write errors and open failures
    /// are ignored.
    #[fuzz]
    fn fuzz_journal(input: Vec<super::JournalRecord>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            {
                let mut inner = fs.journal().inner.lock();
                for record in &input {
                    let _ = inner.writer.write_record(record);
                }
            }
            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            if let Ok(fs) = FxFilesystem::open(device).await {
                let _ = fs.close().await;
            }
        });
    }
}