1mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::errors::FxfsError;
27use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
28use crate::log::*;
29use crate::lsm_tree::cache::NullCache;
30use crate::lsm_tree::types::Layer;
31use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
32use crate::object_store::allocator::Allocator;
33use crate::object_store::data_object_handle::OverwriteOptions;
34use crate::object_store::extent_record::{
35 DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
36};
37use crate::object_store::graveyard::Graveyard;
38use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
39use crate::object_store::journal::checksum_list::ChecksumList;
40use crate::object_store::journal::reader::{JournalReader, ReadResult};
41use crate::object_store::journal::super_block::{
42 SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
43};
44use crate::object_store::journal::writer::JournalWriter;
45use crate::object_store::object_manager::ObjectManager;
46use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
47use crate::object_store::transaction::{
48 AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
49 MutationV47, MutationV49, MutationV50, ObjectStoreMutation, Options,
50 TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
51};
52use crate::object_store::{
53 AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
54 NewChildStoreOptions, ObjectStore,
55};
56use crate::range::RangeExt;
57use crate::round::{round_div, round_down};
58use crate::serialized_types::{LATEST_VERSION, Migrate, Version, Versioned, migrate_to_version};
59use anyhow::{Context, Error, anyhow, bail, ensure};
60use event_listener::Event;
61use fprint::TypeFingerprint;
62use fuchsia_sync::Mutex;
63use futures::FutureExt as _;
64use futures::future::poll_fn;
65use rustc_hash::FxHashMap as HashMap;
66use serde::{Deserialize, Serialize};
67use static_assertions::const_assert;
68use std::clone::Clone;
69use std::collections::HashSet;
70use std::ops::{Bound, Range};
71use std::sync::atomic::{AtomicBool, Ordering};
72use std::sync::{Arc, OnceLock};
73use std::task::{Poll, Waker};
74
/// Block size, in bytes, handed to the `JournalWriter` when a `Journal` is constructed, and the
/// alignment used when searching for the journal extent that contains the replay checkpoint.
pub const BLOCK_SIZE: u64 = 4096;

// 128 KiB.
// NOTE(review): presumably the granularity in which journal file space is preallocated (see
// `chunk_size()`, defined outside this view) — confirm.
const CHUNK_SIZE: u64 = 131_072;
// Compile-time check: a chunk must be strictly larger than the maximum journal usage of a single
// transaction.
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

/// Default value of `JournalOptions::reclaim_size` (256 KiB).
pub const DEFAULT_RECLAIM_SIZE: u64 = 262_144;

/// 1 MiB.
/// NOTE(review): presumably the amount of space kept in reserve for the journal — its users are
/// outside this view; confirm.
pub const RESERVED_SPACE: u64 = 1_048_576;

// All bits set.  At the end of replay this is XORed into the checkpoint checksum before the
// writer is seeked to that checkpoint.
const RESET_XOR: u64 = 0xffffffffffffffff;
94
/// The current version of the journal checkpoint.
pub type JournalCheckpoint = JournalCheckpointV32;

/// A position within the journal stream.  `JournalReader::new` is given a checkpoint to start
/// from, and `JournalWriter::seek` is given one to continue writing from.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    /// Byte offset within the journal file.
    pub file_offset: u64,

    /// Checksum associated with this position in the stream.
    // NOTE(review): presumably the running checksum of the preceding block — confirm against the
    // reader/writer.
    pub checksum: Checksum,

    /// Version of the records that follow this checkpoint.  Set to `LATEST_VERSION` whenever this
    /// code creates a checkpoint to write from.
    pub version: Version,
}
113
/// The current version of the journal record.
pub type JournalRecord = JournalRecordV50;

/// A single record in the journal stream, as handled by `Journal::read_transactions`.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV50 {
    /// On reading this record the reader skips to the end of the current block.
    EndBlock,
    /// A mutation for the object `object_id`.  The first `Mutation` or `DataChecksums` record
    /// seen outside a transaction starts a new transaction.
    Mutation { object_id: u64, mutation: MutationV50 },
    /// Ends the current transaction.
    Commit,
    /// Drops every already-read transaction whose checkpoint offset is at or after the given
    /// journal offset.  An offset of zero is rejected as inconsistent.
    Discard(u64),
    /// Raises the device-flushed offset tracked while reading to the given journal offset, if it
    /// is larger than the current value.
    DidFlushDevice(u64),
    /// Checksums covering a device range, plus a flag that is surfaced as
    /// `JournaledChecksums::first_write`.
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
148
/// Version 49 of the journal record.  Differs from `JournalRecordV50` only in the mutation
/// version it carries; upgraded to `JournalRecordV50` through the `Migrate` derive.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
160
/// Version 47 of the journal record (carries `MutationV47`); upgraded to `JournalRecordV49`
/// through the `Migrate` derive.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
172
/// Version 46 of the journal record (carries `MutationV46`); upgraded to `JournalRecordV47`
/// through the `Migrate` derive.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
184
/// Version 43 of the journal record (carries `MutationV43`); upgraded to `JournalRecordV46`
/// through the `Migrate` derive.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
196
/// Version 42 of the journal record.  Carries `MutationV41` like `JournalRecordV41`, but its
/// `DataChecksums` variant has the extra `bool`.  Upgraded to `JournalRecordV43` through the
/// `Migrate` derive.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
207
/// Version 41 of the journal record.  Its `DataChecksums` variant has no `bool`, so it does not
/// derive `Migrate`; the upgrade to `JournalRecordV42` is the hand-written `From` impl in this
/// file.
#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
217
218impl From<JournalRecordV41> for JournalRecordV42 {
219 fn from(record: JournalRecordV41) -> Self {
220 match record {
221 JournalRecordV41::EndBlock => Self::EndBlock,
222 JournalRecordV41::Mutation { object_id, mutation } => {
223 Self::Mutation { object_id, mutation: mutation.into() }
224 }
225 JournalRecordV41::Commit => Self::Commit,
226 JournalRecordV41::Discard(offset) => Self::Discard(offset),
227 JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
228 JournalRecordV41::DataChecksums(range, sums) => {
229 Self::DataChecksums(range, sums, true)
232 }
233 }
234 }
235}
236
/// Version 40 of the journal record (carries `MutationV40`); upgraded to `JournalRecordV41`
/// through the `Migrate` derive.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
247
248pub(super) fn journal_handle_options() -> HandleOptions {
249 HandleOptions { skip_journal_checks: true, ..Default::default() }
250}
251
/// The journal.  Owns the journal writer state (behind `inner`), the handle to the journal file,
/// and the super-block manager, and drives replay at mount time and formatting of empty
/// filesystems.
pub struct Journal {
    /// Object manager that receives the stores, the allocator and replayed mutations.
    objects: Arc<ObjectManager>,
    /// Handle to the journal file.  Set once, at the end of `replay` or `init_empty`.
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    /// Loads the super-block during replay and tracks which instance is written next.
    super_block_manager: SuperBlockManager,
    /// Mutable journal state.
    inner: Mutex<Inner>,
    // NOTE(review): not used in the visible part of this file; presumably serialises writers —
    // confirm.
    writer_mutex: Mutex<()>,
    // NOTE(review): not used in the visible part of this file; presumably serialises syncs, and
    // is an async mutex so that it can be held across awaits — confirm.
    sync_mutex: futures::lock::Mutex<()>,
    /// Tracing flag, toggled by `set_trace`.
    trace: AtomicBool,

    // NOTE(review): not used in the visible part of this file; presumably signalled when journal
    // space is reclaimed — confirm.
    reclaim_event: Event,
}
269
/// Mutable journal state, kept behind `Journal::inner`.
struct Inner {
    /// The super-block header: the one loaded by `replay`, or the one built by `init_empty`.
    super_block_header: SuperBlockHeader,

    // Starts as `None`.  NOTE(review): presumably the offset from which the journal file still
    // needs zeroing — its users are outside this view; confirm.
    zero_offset: Option<u64>,

    /// Journal offset up to which the device is considered flushed.  Set to `flushed_offset` at
    /// the end of replay.
    device_flushed_offset: u64,

    // NOTE(review): presumably set when a `DidFlushDevice` record still needs to be written —
    // confirm.
    needs_did_flush_device: bool,

    /// The writer that buffers and serialises outgoing journal records.
    writer: JournalWriter,

    /// Set to `true` at the end of replay, after the writer's checkpoint has been reset.
    // NOTE(review): presumably makes the next write emit the reset/version marker — confirm.
    output_reset_version: bool,

    /// Waker that is woken (and cleared) when the journal terminates.
    // NOTE(review): presumably belongs to the flush task — confirm.
    flush_waker: Option<Waker>,

    /// Set once `terminate` has been called.
    terminate: bool,

    /// The first error passed to `terminate`; later errors are logged but do not replace it.
    terminate_reason: Option<Error>,

    // NOTE(review): presumably suppresses compactions when set — confirm.
    disable_compactions: bool,

    // NOTE(review): presumably true while a compaction is in progress — confirm.
    compaction_running: bool,

    /// Waker that is woken (and cleared) when the journal terminates.
    // NOTE(review): presumably belongs to a task waiting on a sync — confirm.
    sync_waker: Option<Waker>,

    /// Journal file offset that has been flushed.  After replay this is the offset the writer
    /// resumes from.
    flushed_offset: u64,

    /// Offset of the last checkpoint accepted by replay.
    valid_to: u64,

    /// Set by replay to the last accepted checkpoint offset when that offset is below
    /// `flushed_offset`, i.e. when trailing journal content was rejected.
    // NOTE(review): presumably causes a `Discard` record to be written — confirm.
    discard_offset: Option<u64>,

    /// Copied from `JournalOptions::reclaim_size`.
    reclaim_size: u64,

    /// When `Some`, the journal is in image builder mode; `init_empty` then skips preallocating
    /// the journal file.
    image_builder_mode: Option<SuperBlockInstance>,

    /// Copied from `JournalOptions::barriers_enabled`.
    barriers_enabled: bool,

    // NOTE(review): presumably set when the next device write must be preceded by a barrier —
    // confirm.
    needs_barrier: bool,
}
337
338impl Inner {
339 fn terminate(&mut self, reason: Option<Error>) {
340 self.terminate = true;
341
342 if let Some(err) = reason {
343 error!(error:? = err; "Terminating journal");
344 if let Some(prev_err) = self.terminate_reason.as_ref() {
346 error!(error:? = prev_err; "Journal previously terminated");
347 } else {
348 self.terminate_reason = Some(err);
349 }
350 }
351
352 if let Some(waker) = self.flush_waker.take() {
353 waker.wake();
354 }
355 if let Some(waker) = self.sync_waker.take() {
356 waker.wake();
357 }
358 }
359}
360
/// Options supplied when constructing a `Journal`.
pub struct JournalOptions {
    /// Copied into the journal's internal state on construction; defaults to
    /// `DEFAULT_RECLAIM_SIZE`.
    // NOTE(review): presumably the amount of journal that may accumulate before space is
    // reclaimed — confirm.
    pub reclaim_size: u64,

    /// Copied into the journal's internal state on construction; defaults to `false`.
    // NOTE(review): presumably selects device barriers instead of checksum verification for
    // ordering — confirm.
    pub barriers_enabled: bool,
}
372
373impl Default for JournalOptions {
374 fn default() -> Self {
375 JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
376 }
377}
378
/// Result of `Journal::read_transactions`.
struct JournaledTransactions {
    /// The committed transactions that were read, in journal order.
    transactions: Vec<JournaledTransaction>,
    /// The largest device-flushed offset seen: starts at the super-block's journal file offset
    /// and is raised by `DidFlushDevice` records and by stream resets.
    device_flushed_offset: u64,
}
383
/// One transaction read back from the journal, with its mutations bucketed by target store.
#[derive(Debug, Default)]
pub struct JournaledTransaction {
    /// Checkpoint taken immediately before the transaction's first record was read.
    pub checkpoint: JournalCheckpoint,
    /// Mutations whose object id is the root parent store's.
    pub root_parent_mutations: Vec<Mutation>,
    /// Mutations whose object id is the root store's.
    pub root_mutations: Vec<Mutation>,
    /// All other mutations, paired with their object id.
    pub non_root_mutations: Vec<(u64, Mutation)>,
    /// Journal file offset immediately after the transaction's `Commit` record.
    pub end_offset: u64,
    /// Data checksum records that were part of the transaction.
    pub checksums: Vec<JournaledChecksums>,

    /// Set when the transaction contains an `EndFlush` with a matching earlier `BeginFlush`:
    /// the object id and the journal offset one past the checkpoint of the transaction that
    /// contained the `BeginFlush`.
    pub end_flush: Option<(u64, u64)>,

    /// Set to the object id when the transaction contains a `DeleteVolume` mutation.
    pub volume_deleted: Option<u64>,
}
402
403impl JournaledTransaction {
404 fn new(checkpoint: JournalCheckpoint) -> Self {
405 Self { checkpoint, ..Default::default() }
406 }
407}
408
// Sentinel journal offset recorded during replay for a store whose volume was deleted.
const VOLUME_DELETED: u64 = u64::MAX;
410
/// A `DataChecksums` journal record in decoded form.
#[derive(Debug)]
pub struct JournaledChecksums {
    /// The device range that the checksums cover.
    pub device_range: Range<u64>,
    /// The checksums for `device_range`.
    pub checksums: Checksums,
    /// The boolean carried by the `DataChecksums` record; passed through to the checksum list
    /// during replay.
    pub first_write: bool,
}
417
/// A readable handle that the journal reader can use for the journal file, with hooks for
/// tracking the file's extents as they are discovered while reading.
pub trait JournalHandle: ReadObjectHandle {
    /// Returns the file offset at which the extents known so far end, or `None` if the handle
    /// does not track that.  Callers use it to check that a new extent is contiguous.
    fn end_offset(&self) -> Option<u64>;
    /// Records another extent of the journal file.  `added_offset` is the journal offset at
    /// which the extent was added (callers pass 0 for extents found in the root parent store),
    /// and `device_range` is where the extent lives on the device.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Called when a `Discard` record for `discard_offset` is read.
    // NOTE(review): presumably drops extents that were added at or after `discard_offset` —
    // confirm against the implementations.
    fn discard_extents(&mut self, discard_offset: u64);
}
432
433impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
437 fn end_offset(&self) -> Option<u64> {
438 None
439 }
440 fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
441 }
443 fn discard_extents(&mut self, _discard_offset: u64) {
444 }
446}
447
448#[fxfs_trace::trace]
449impl Journal {
450 pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
451 let starting_checksum = rand::random_range(1..u64::MAX);
452 Journal {
453 objects: objects,
454 handle: OnceLock::new(),
455 super_block_manager: SuperBlockManager::new(),
456 inner: Mutex::new(Inner {
457 super_block_header: SuperBlockHeader::default(),
458 zero_offset: None,
459 device_flushed_offset: 0,
460 needs_did_flush_device: false,
461 writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
462 output_reset_version: false,
463 flush_waker: None,
464 terminate: false,
465 terminate_reason: None,
466 disable_compactions: false,
467 compaction_running: false,
468 sync_waker: None,
469 flushed_offset: 0,
470 valid_to: 0,
471 discard_offset: None,
472 reclaim_size: options.reclaim_size,
473 image_builder_mode: None,
474 barriers_enabled: options.barriers_enabled,
475 needs_barrier: false,
476 }),
477 writer_mutex: Mutex::new(()),
478 sync_mutex: futures::lock::Mutex::new(()),
479 trace: AtomicBool::new(false),
480 reclaim_event: Event::new(),
481 }
482 }
483
484 pub fn set_trace(&self, trace: bool) {
485 let old_value = self.trace.swap(trace, Ordering::Relaxed);
486 if trace != old_value {
487 info!(trace; "J: trace");
488 }
489 }
490
491 pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
492 self.inner.lock().image_builder_mode = mode;
493 if let Some(instance) = mode {
494 *self.super_block_manager.next_instance.lock() = instance;
495 }
496 }
497
498 pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
499 self.inner.lock().image_builder_mode
500 }
501
/// Overwrites the GUID in the in-memory super-block header with `uuid`.
///
/// # Errors
///
/// Fails unless the journal is in image builder mode.
#[cfg(feature = "migration")]
pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
    // The mode check and the update below each take the lock separately.
    let in_image_builder_mode = self.inner.lock().image_builder_mode.is_some();
    ensure!(in_image_builder_mode, "Can only set filesystem uuid in image builder mode.");

    let guid = uuid::Uuid::from_bytes(*uuid);
    self.inner.lock().super_block_header.guid.0 = guid;
    Ok(())
}
511
512 fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
516 match mutation {
517 Mutation::ObjectStore(ObjectStoreMutation {
518 item:
519 Item {
520 key:
521 ObjectKey {
522 data:
523 ObjectKeyData::Attribute(
524 _,
525 AttributeKey::Extent(ExtentKey { range }),
526 ),
527 ..
528 },
529 value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
530 ..
531 },
532 ..
533 }) => {
534 if range.is_empty() || !range.is_aligned(block_size) {
535 return false;
536 }
537 let len = range.length().unwrap();
538 if let ExtentMode::Cow(checksums) = mode {
539 if checksums.len() > 0 {
540 if len % checksums.len() as u64 != 0 {
541 return false;
542 }
543 if (len / checksums.len() as u64) % block_size != 0 {
544 return false;
545 }
546 }
547 }
548 if *device_offset % block_size != 0
549 || *device_offset >= device_size
550 || device_size - *device_offset < len
551 {
552 return false;
553 }
554 }
555 Mutation::ObjectStore(_) => {}
556 Mutation::EncryptedObjectStore(_) => {}
557 Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
558 return !device_range.is_empty()
559 && *owner_object_id != INVALID_OBJECT_ID
560 && device_range.end <= device_size;
561 }
562 Mutation::Allocator(AllocatorMutation::Deallocate {
563 device_range,
564 owner_object_id,
565 }) => {
566 return !device_range.is_empty()
567 && *owner_object_id != INVALID_OBJECT_ID
568 && device_range.end <= device_size;
569 }
570 Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
571 return *owner_object_id != INVALID_OBJECT_ID;
572 }
573 Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
574 return *owner_object_id != INVALID_OBJECT_ID;
575 }
576 Mutation::BeginFlush => {}
577 Mutation::EndFlush => {}
578 Mutation::DeleteVolume => {}
579 Mutation::UpdateBorrowed(_) => {}
580 Mutation::UpdateMutationsKey(_) => {}
581 Mutation::CreateInternalDir(owner_object_id) => {
582 return *owner_object_id != INVALID_OBJECT_ID;
583 }
584 }
585 true
586 }
587
588 fn update_checksum_list(
590 &self,
591 journal_offset: u64,
592 mutation: &Mutation,
593 checksum_list: &mut ChecksumList,
594 ) -> Result<(), Error> {
595 match mutation {
596 Mutation::ObjectStore(_) => {}
597 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
598 checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
599 }
600 _ => {}
601 }
602 Ok(())
603 }
604
#[trace]
/// Loads the super-block and replays the journal, leaving the journal ready for writing.
///
/// Steps, in order:
///  1. Load the super-block and root parent store; register the root parent store and a new
///     allocator with the object manager (`on_new_allocator`, if supplied, is called with the
///     allocator before it is registered).
///  2. Build a bootstrap handle for the journal file from the extents found in the root parent
///     store, and read all transactions from the super-block's journal checkpoint.
///  3. Validate mutations and checksums to decide how far the journal can be trusted.
///  4. Apply root parent store mutations, open the root store and apply its mutations, open
///     the allocator, then replay everything else.
///  5. Open the real journal file handle and position the writer after the journal's end.
///
/// # Errors
///
/// Fails if any store, the allocator or the journal file cannot be loaded or opened, if the
/// journal's extents are not contiguous, if the root store is encrypted, or if the usable
/// part of the journal ends before the offset at which the super-block was written.
pub async fn replay(
    &self,
    filesystem: Arc<FxFilesystem>,
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
) -> Result<(), Error> {
    let block_size = filesystem.block_size();

    let (super_block, root_parent) =
        self.super_block_manager.load(filesystem.device(), block_size).await?;

    let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

    self.objects.set_root_parent_store(root_parent.clone());
    let allocator =
        Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
    if let Some(on_new_allocator) = on_new_allocator {
        on_new_allocator(allocator.clone());
    }
    self.objects.set_allocator(allocator.clone());
    self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
    self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
    {
        let mut inner = self.inner.lock();
        inner.super_block_header = super_block.clone();
    }

    let device = filesystem.device();

    // Build a bootstrap handle for the journal file from the journal's extent records in the
    // root parent store, starting at the extent found for the block-aligned checkpoint offset.
    let mut handle;
    {
        let root_parent_layer = root_parent.tree().mutable_layer();
        let mut iter = root_parent_layer
            .seek(Bound::Included(&ObjectKey::attribute(
                super_block.journal_object_id,
                DEFAULT_DATA_ATTRIBUTE_ID,
                AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
                    super_block.journal_checkpoint.file_offset,
                    BLOCK_SIZE,
                ))),
            )))
            .await
            .context("Failed to seek root parent store")?;
        // The handle starts at the first extent found, or at 0 if the iterator is not
        // positioned on an extent of a default data attribute.
        let start_offset = if let Some(ItemRef {
            key:
                ObjectKey {
                    data:
                        ObjectKeyData::Attribute(
                            DEFAULT_DATA_ATTRIBUTE_ID,
                            AttributeKey::Extent(ExtentKey { range }),
                        ),
                    ..
                },
            ..
        }) = iter.get()
        {
            range.start
        } else {
            0
        };
        handle = BootstrapObjectHandle::new_with_start_offset(
            super_block.journal_object_id,
            device.clone(),
            start_offset,
        );
        // Push consecutive journal extents until an item that is not an extent of the journal
        // object is reached.  Extents must be contiguous.
        while let Some(item) = iter.get() {
            if !match item.into() {
                Some((
                    object_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    ExtentKey { range },
                    ExtentValue::Some { device_offset, .. },
                )) if object_id == super_block.journal_object_id => {
                    if let Some(end_offset) = handle.end_offset() {
                        if range.start != end_offset {
                            bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                "Unexpected journal extent {:?}, expected start: {}",
                                item, end_offset
                            )));
                        }
                    }
                    // Extents that come from the root parent store are added at offset 0.
                    handle.push_extent(
                        0,
                        *device_offset
                            ..*device_offset + range.length().context("Invalid extent")?,
                    );
                    true
                }
                _ => false,
            } {
                break;
            }
            iter.advance().await.context("Failed to advance root parent store iterator")?;
        }
    }

    // Read every transaction from the super-block's checkpoint onwards (no end offset, no
    // object filter).
    let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
    let JournaledTransactions { mut transactions, device_flushed_offset } = self
        .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
        .await
        .context("Reading transactions for replay")?;

    // First pass: collect checksums and validate mutations.  `valid_to` starts at the end of
    // what was read and is pulled back to the first transaction holding a bad mutation.
    let mut checksum_list = ChecksumList::new(device_flushed_offset);
    let mut valid_to = reader.journal_file_checkpoint().file_offset;
    let device_size = device.size();
    'bad_replay: for JournaledTransaction {
        checkpoint,
        root_parent_mutations,
        root_mutations,
        non_root_mutations,
        checksums,
        ..
    } in &transactions
    {
        for JournaledChecksums { device_range, checksums, first_write } in checksums {
            checksum_list
                .push(
                    checkpoint.file_offset,
                    device_range.clone(),
                    checksums.maybe_as_ref().context("Malformed checksums")?,
                    *first_write,
                )
                .context("Pushing journal checksum records to checksum list")?;
        }
        for mutation in root_parent_mutations
            .iter()
            .chain(root_mutations)
            .chain(non_root_mutations.iter().map(|(_, m)| m))
        {
            if !self.validate_mutation(mutation, block_size, device_size) {
                info!(mutation:?; "Stopping replay at bad mutation");
                valid_to = checkpoint.file_offset;
                break 'bad_replay;
            }
            self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
        }
    }

    // Verify the collected checksums against the device; the result is the offset up to which
    // the journal is accepted.
    let valid_to = checksum_list
        .verify(device.as_ref(), valid_to)
        .await
        .context("Failed to validate checksums")?;

    let mut last_checkpoint = reader.journal_file_checkpoint();
    let mut journal_offsets = super_block.journal_file_offsets.clone();

    // Second pass: apply root parent store mutations and work out, per object, the journal
    // offset from which its mutations should be replayed.
    for (
        index,
        JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            end_flush,
            volume_deleted,
            ..
        },
    ) in transactions.iter_mut().enumerate()
    {
        if checkpoint.file_offset >= valid_to {
            last_checkpoint = checkpoint.clone();

            // Drop this and all later transactions so the following passes never see them.
            transactions.truncate(index);
            break;
        }

        let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
        for mutation in root_parent_mutations.drain(..) {
            self.objects
                .apply_mutation(
                    super_block.root_parent_store_object_id,
                    mutation,
                    &context,
                    AssocObj::None,
                )
                .context("Failed to replay root parent store mutations")?;
        }

        if let Some((object_id, journal_offset)) = end_flush {
            journal_offsets.insert(*object_id, *journal_offset);
        }

        if let Some(object_id) = volume_deleted {
            journal_offsets.insert(*object_id, VOLUME_DELETED);
        }
    }

    // The root parent store is now up to date, so the root store can be opened from it.
    let root_store = ObjectStore::open(
        &root_parent,
        super_block.root_store_object_id,
        Box::new(NullCache {}),
    )
    .await
    .context("Unable to open root store")?;

    ensure!(
        !root_store.is_encrypted(),
        anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
    );
    self.objects.set_root_store(root_store);

    let root_store_offset =
        journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

    // Third pass: apply root store mutations, skipping transactions that start before the
    // root store's journal offset.
    for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
        if checkpoint.file_offset < root_store_offset {
            continue;
        }

        let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
        for mutation in root_mutations.drain(..) {
            self.objects
                .apply_mutation(
                    super_block.root_store_object_id,
                    mutation,
                    &context,
                    AssocObj::None,
                )
                .context("Failed to replay root store mutations")?;
        }
    }

    // The allocator is opened only after the root store mutations have been applied.
    allocator.open().await.context("Failed to open allocator")?;

    // Final pass: hand the remaining mutations of each transaction to the object manager.
    for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
    {
        self.objects
            .replay_mutations(
                non_root_mutations,
                &journal_offsets,
                &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                end_offset,
            )
            .await
            .context("Failed to replay mutations")?;
    }

    allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

    // Only used for the final log line: set when replay stopped before the reader's position.
    let discarded_to =
        if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
            Some(reader.journal_file_checkpoint().file_offset)
        } else {
            None
        };

    // Set up the writer so that journalling can continue from here.
    {
        if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
            return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                "journal replay cut short; journal finishes at {}, but super-block was \
 written at {}",
                last_checkpoint.file_offset, super_block.super_block_journal_file_offset
            )));
        }
        let handle = ObjectStore::open_object(
            &root_parent,
            super_block.journal_object_id,
            journal_handle_options(),
            None,
        )
        .await
        .with_context(|| {
            format!(
                "Failed to open journal file (object id: {})",
                super_block.journal_object_id
            )
        })?;
        let _ = self.handle.set(handle);
        let mut inner = self.inner.lock();
        reader.skip_to_end_of_block();
        let mut writer_checkpoint = reader.journal_file_checkpoint();

        // The reader must not be used past this point.
        std::mem::drop(reader);

        // Flip every checksum bit and stamp the latest version before resuming writes.
        writer_checkpoint.checksum ^= RESET_XOR;
        writer_checkpoint.version = LATEST_VERSION;
        inner.flushed_offset = writer_checkpoint.file_offset;

        inner.device_flushed_offset = inner.flushed_offset;

        inner.writer.seek(writer_checkpoint);
        inner.output_reset_version = true;
        inner.valid_to = last_checkpoint.file_offset;
        // Remember where the rejected tail begins, if there is one.
        if last_checkpoint.file_offset < inner.flushed_offset {
            inner.discard_offset = Some(last_checkpoint.file_offset);
        }
    }

    self.objects
        .on_replay_complete()
        .await
        .context("Failed to complete replay for object manager")?;

    info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
    Ok(())
}
916
917 async fn read_transactions(
918 &self,
919 reader: &mut JournalReader,
920 end_offset: Option<u64>,
921 object_id_filter: u64,
922 ) -> Result<JournaledTransactions, Error> {
923 let mut transactions = Vec::new();
924 let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
925 let super_block = &self.inner.lock().super_block_header;
926 (
927 super_block.super_block_journal_file_offset,
928 super_block.root_parent_store_object_id,
929 super_block.root_store_object_id,
930 )
931 };
932 let mut current_transaction = None;
933 let mut begin_flush_offsets = HashMap::default();
934 let mut stores_deleted = HashSet::new();
935 loop {
936 let checkpoint = reader.journal_file_checkpoint();
938 if let Some(end_offset) = end_offset {
939 if checkpoint.file_offset >= end_offset {
940 break;
941 }
942 }
943 let result =
944 reader.deserialize().await.context("Failed to deserialize journal record")?;
945 match result {
946 ReadResult::Reset(_) => {
947 if current_transaction.is_some() {
948 current_transaction = None;
949 transactions.pop();
950 }
951 let offset = reader.journal_file_checkpoint().file_offset;
952 if offset > device_flushed_offset {
953 device_flushed_offset = offset;
954 }
955 }
956 ReadResult::Some(record) => {
957 match record {
958 JournalRecord::EndBlock => {
959 reader.skip_to_end_of_block();
960 }
961 JournalRecord::Mutation { object_id, mutation } => {
962 let current_transaction = match current_transaction.as_mut() {
963 None => {
964 transactions.push(JournaledTransaction::new(checkpoint));
965 current_transaction = transactions.last_mut();
966 current_transaction.as_mut().unwrap()
967 }
968 Some(transaction) => transaction,
969 };
970
971 if stores_deleted.contains(&object_id) {
972 bail!(
973 anyhow!(FxfsError::Inconsistent)
974 .context("Encountered mutations for deleted store")
975 );
976 }
977
978 match &mutation {
979 Mutation::BeginFlush => {
980 begin_flush_offsets.insert(
981 object_id,
982 current_transaction.checkpoint.file_offset,
983 );
984 }
985 Mutation::EndFlush => {
986 if let Some(offset) = begin_flush_offsets.remove(&object_id) {
987 if let Some(deleted_volume) =
988 ¤t_transaction.volume_deleted
989 {
990 if *deleted_volume == object_id {
991 bail!(anyhow!(FxfsError::Inconsistent).context(
992 "Multiple EndFlush/DeleteVolume mutations in a \
993 single transaction for the same object"
994 ));
995 }
996 }
997 if current_transaction
1000 .end_flush
1001 .replace((object_id, offset + 1))
1002 .is_some()
1003 {
1004 bail!(anyhow!(FxfsError::Inconsistent).context(
1005 "Multiple EndFlush mutations in a \
1006 single transaction"
1007 ));
1008 }
1009 }
1010 }
1011 Mutation::DeleteVolume => {
1012 if let Some((flushed_object, _)) =
1013 ¤t_transaction.end_flush
1014 {
1015 if *flushed_object == object_id {
1016 bail!(anyhow!(FxfsError::Inconsistent).context(
1017 "Multiple EndFlush/DeleteVolume mutations in a \
1018 single transaction for the same object"
1019 ));
1020 }
1021 }
1022 if current_transaction
1023 .volume_deleted
1024 .replace(object_id)
1025 .is_some()
1026 {
1027 bail!(anyhow!(FxfsError::Inconsistent).context(
1028 "Multiple DeleteVolume mutations in a single \
1029 transaction"
1030 ));
1031 }
1032 stores_deleted.insert(object_id);
1033 }
1034 _ => {}
1035 }
1036
1037 if (object_id_filter == INVALID_OBJECT_ID
1040 || object_id_filter == object_id)
1041 && self.should_apply(object_id, ¤t_transaction.checkpoint)
1042 {
1043 if object_id == root_parent_store_object_id {
1044 current_transaction.root_parent_mutations.push(mutation);
1045 } else if object_id == root_store_object_id {
1046 current_transaction.root_mutations.push(mutation);
1047 } else {
1048 current_transaction
1049 .non_root_mutations
1050 .push((object_id, mutation));
1051 }
1052 }
1053 }
1054 JournalRecord::DataChecksums(device_range, checksums, first_write) => {
1055 let current_transaction = match current_transaction.as_mut() {
1056 None => {
1057 transactions.push(JournaledTransaction::new(checkpoint));
1058 current_transaction = transactions.last_mut();
1059 current_transaction.as_mut().unwrap()
1060 }
1061 Some(transaction) => transaction,
1062 };
1063 current_transaction.checksums.push(JournaledChecksums {
1064 device_range,
1065 checksums,
1066 first_write,
1067 });
1068 }
1069 JournalRecord::Commit => {
1070 if let Some(&mut JournaledTransaction {
1071 ref checkpoint,
1072 ref root_parent_mutations,
1073 ref mut end_offset,
1074 ..
1075 }) = current_transaction.take()
1076 {
1077 for mutation in root_parent_mutations {
1078 if let Mutation::ObjectStore(ObjectStoreMutation {
1082 item:
1083 Item {
1084 key:
1085 ObjectKey {
1086 object_id,
1087 data:
1088 ObjectKeyData::Attribute(
1089 DEFAULT_DATA_ATTRIBUTE_ID,
1090 AttributeKey::Extent(ExtentKey {
1091 range,
1092 }),
1093 ),
1094 ..
1095 },
1096 value:
1097 ObjectValue::Extent(ExtentValue::Some {
1098 device_offset,
1099 ..
1100 }),
1101 ..
1102 },
1103 ..
1104 }) = mutation
1105 {
1106 let handle = reader.handle();
1109 if *object_id != handle.object_id() {
1110 continue;
1111 }
1112 if let Some(end_offset) = handle.end_offset() {
1113 if range.start != end_offset {
1114 bail!(anyhow!(FxfsError::Inconsistent).context(
1115 format!(
1116 "Unexpected journal extent {:?} -> {}, \
1117 expected start: {}",
1118 range, device_offset, end_offset,
1119 )
1120 ));
1121 }
1122 }
1123 handle.push_extent(
1124 checkpoint.file_offset,
1125 *device_offset
1126 ..*device_offset
1127 + range.length().context("Invalid extent")?,
1128 );
1129 }
1130 }
1131 *end_offset = reader.journal_file_checkpoint().file_offset;
1132 }
1133 }
1134 JournalRecord::Discard(offset) => {
1135 if offset == 0 {
1136 bail!(
1137 anyhow!(FxfsError::Inconsistent)
1138 .context("Invalid offset for Discard")
1139 );
1140 }
1141 if let Some(transaction) = current_transaction.as_ref() {
1142 if transaction.checkpoint.file_offset < offset {
1143 continue;
1145 }
1146 }
1147 current_transaction = None;
1148 while let Some(transaction) = transactions.last() {
1149 if transaction.checkpoint.file_offset < offset {
1150 break;
1151 }
1152 transactions.pop();
1153 }
1154 reader.handle().discard_extents(offset);
1155 }
1156 JournalRecord::DidFlushDevice(offset) => {
1157 if offset > device_flushed_offset {
1158 device_flushed_offset = offset;
1159 }
1160 }
1161 }
1162 }
1163 ReadResult::ChecksumMismatch => break,
1165 }
1166 }
1167
1168 if current_transaction.is_some() {
1170 transactions.pop();
1171 }
1172
1173 Ok(JournaledTransactions { transactions, device_flushed_offset })
1174 }
1175
/// Formats an empty filesystem: creates the root parent store, the root store, the allocator,
/// both super-block objects and the journal file, all in one transaction, then records the
/// resulting super-block header in memory and stores the journal file handle.
///
/// Unless the journal is in image builder mode, the journal file is preallocated with
/// `chunk_size()` bytes.
///
/// # Errors
///
/// Fails if any of the objects cannot be created or extended, if preallocation returns less
/// space than requested, or if the transaction cannot be committed.
pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
    // Fixed object ids used for the objects created while formatting.
    const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
    const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
    const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

    info!(device_size = filesystem.device().size(); "Formatting");

    // The writer's current checkpoint, stamped with the latest version; it becomes the
    // super-block's journal checkpoint below.
    let checkpoint = JournalCheckpoint {
        version: LATEST_VERSION,
        ..self.inner.lock().writer.journal_file_checkpoint()
    };

    let root_parent = ObjectStore::new_empty(
        None,
        INIT_ROOT_PARENT_STORE_OBJECT_ID,
        filesystem.clone(),
        Box::new(NullCache {}),
    );
    self.objects.set_root_parent_store(root_parent.clone());

    let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
    self.objects.set_allocator(allocator.clone());
    self.objects.init_metadata_reservation()?;

    let journal_handle;
    let super_block_a_handle;
    let super_block_b_handle;
    let root_store;
    // Everything below is created in one transaction, which skips journal checks.
    let mut transaction = filesystem
        .clone()
        .new_transaction(
            lock_keys![],
            Options { skip_journal_checks: true, ..Default::default() },
        )
        .await?;
    root_store = root_parent
        .new_child_store(
            &mut transaction,
            NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
            Box::new(NullCache {}),
        )
        .await
        .context("new_child_store")?;
    self.objects.set_root_store(root_store.clone());

    allocator.create(&mut transaction).await?;

    // Create both super-block objects in the root store with their fixed object ids and
    // their first extents.
    super_block_a_handle = ObjectStore::create_object_with_id(
        &root_store,
        &mut transaction,
        SuperBlockInstance::A.object_id(),
        HandleOptions::default(),
        None,
    )
    .context("create super block")?;
    root_store.update_last_object_id(SuperBlockInstance::A.object_id());
    super_block_a_handle
        .extend(&mut transaction, SuperBlockInstance::A.first_extent())
        .await
        .context("extend super block")?;
    super_block_b_handle = ObjectStore::create_object_with_id(
        &root_store,
        &mut transaction,
        SuperBlockInstance::B.object_id(),
        HandleOptions::default(),
        None,
    )
    .context("create super block")?;
    root_store.update_last_object_id(SuperBlockInstance::B.object_id());
    super_block_b_handle
        .extend(&mut transaction, SuperBlockInstance::B.first_extent())
        .await
        .context("extend super block")?;

    // The journal file lives in the root parent store.
    journal_handle = ObjectStore::create_object(
        &root_parent,
        &mut transaction,
        journal_handle_options(),
        None,
    )
    .await
    .context("create journal")?;
    // Image builder mode skips preallocation here.
    if self.inner.lock().image_builder_mode.is_none() {
        let mut file_range = 0..self.chunk_size();
        journal_handle
            .preallocate_range(&mut transaction, &mut file_range)
            .await
            .context("preallocate journal")?;
        // A non-empty range left over means less space was preallocated than requested.
        if file_range.start < file_range.end {
            bail!("preallocate_range returned too little space");
        }
    }

    root_store.create(&mut transaction).await?;

    root_parent
        .set_graveyard_directory_object_id(Graveyard::create(&mut transaction, &root_parent));

    transaction.commit().await?;

    // Record the header that describes the objects just created.
    self.inner.lock().super_block_header = SuperBlockHeader::new(
        root_parent.store_object_id(),
        root_parent.graveyard_directory_object_id(),
        root_store.store_object_id(),
        allocator.object_id(),
        journal_handle.object_id(),
        checkpoint,
        LATEST_VERSION,
    );

    // The result of `set` is ignored, so an already-set handle is left in place.
    let _ = self.handle.set(journal_handle);
    Ok(())
}
1301
1302 pub async fn allocate_journal(&self) -> Result<(), Error> {
1305 let handle = self.handle.get().unwrap();
1306 let filesystem = handle.store().filesystem();
1307 let mut transaction = filesystem
1308 .clone()
1309 .new_transaction(
1310 lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1311 Options { skip_journal_checks: true, ..Default::default() },
1312 )
1313 .await?;
1314 let mut file_range = 0..self.chunk_size();
1315 self.handle
1316 .get()
1317 .unwrap()
1318 .preallocate_range(&mut transaction, &mut file_range)
1319 .await
1320 .context("preallocate journal")?;
1321 if file_range.start < file_range.end {
1322 bail!("preallocate_range returned too little space");
1323 }
1324 transaction.commit().await?;
1325 Ok(())
1326 }
1327
1328 pub async fn init_superblocks(&self) -> Result<(), Error> {
1329 for _ in 0..2 {
1331 self.write_super_block().await?;
1332 }
1333 Ok(())
1334 }
1335
1336 pub async fn read_transactions_for_object(
1342 &self,
1343 object_id: u64,
1344 ) -> Result<Vec<JournaledTransaction>, Error> {
1345 let handle = self.handle.get().expect("No journal handle");
1346 let handle = ObjectStore::open_object(
1348 handle.owner(),
1349 handle.object_id(),
1350 journal_handle_options(),
1351 None,
1352 )
1353 .await?;
1354
1355 let checkpoint = match self.objects.journal_checkpoint(object_id) {
1356 Some(checkpoint) => checkpoint,
1357 None => return Ok(vec![]),
1358 };
1359 let mut reader = JournalReader::new(handle, &checkpoint);
1360 let end_offset = self.inner.lock().valid_to;
1363 Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1364 }
1365
1366 pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1368 if transaction.is_empty() {
1369 return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1370 }
1371
1372 self.pre_commit(transaction).await?;
1373 Ok(self.write_and_apply_mutations(transaction))
1374 }
1375
/// Performs journal housekeeping that must happen before `transaction` is written.
///
/// Under the `inner` lock this emits any pending bookkeeping records (version reset, discard,
/// did-flush-device). Then, if the journal file needs extending, has a pending `zero_offset`,
/// or `needs_borrow_for_journal` says so, it builds a separate internal transaction that
/// preallocates another chunk and/or zeroes the start of the journal file, and applies that
/// transaction directly with `write_and_apply_mutations`.
///
/// Returns `Ok(())` without further work if there is no journal handle yet or if none of the
/// conditions above hold.
///
/// # Errors
///
/// Fails if serializing a record, creating the internal transaction, preallocating or zeroing
/// fails.
///
/// # Panics
///
/// Panics if, after extending, the writer's offset is not below the pre-extension file size.
async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
    // Declared outside the locked scope so the borrowed handle can be used after the lock is
    // released.
    let handle;

    let (size, zero_offset) = {
        let mut inner = self.inner.lock();

        // `take` reads the flag and resets it to false in one step.
        if std::mem::take(&mut inner.output_reset_version) {
            LATEST_VERSION.serialize_into(&mut inner.writer)?;
        }

        // Emit a pending discard record, if any, and clear it.
        if let Some(discard_offset) = inner.discard_offset {
            JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
            inner.discard_offset = None;
        }

        // Record the offset up to which the device has been flushed, once per flush.
        if inner.needs_did_flush_device {
            let offset = inner.device_flushed_offset;
            JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
            inner.needs_did_flush_device = false;
        }

        // Without a journal handle there is nothing to extend or zero.
        handle = match self.handle.get() {
            None => return Ok(()),
            Some(x) => x,
        };

        let file_offset = inner.writer.journal_file_checkpoint().file_offset;

        // `size` becomes `Some(current file size)` only when less than one chunk remains
        // between the write position and the end of the file.
        let size = handle.get_size();
        let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };

        // Fast path: no extension, no zeroing, and no metadata borrow required.
        if size.is_none()
            && inner.zero_offset.is_none()
            && !self.objects.needs_borrow_for_journal(file_offset)
        {
            return Ok(());
        }

        // Snapshot `zero_offset`; it is compared against the live value again below.
        (size, inner.zero_offset)
    };

    // Internal transaction (shadows the caller's). It shares the caller's transaction guard and
    // draws on the metadata reservation.
    let mut transaction = handle
        .new_transaction_with_options(Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(self.objects.metadata_reservation()),
            txn_guard: Some(transaction.txn_guard()),
            ..Default::default()
        })
        .await?;
    if let Some(size) = size {
        // Extend the journal by one chunk past its current end.
        handle
            .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
            .await?;
    }
    if let Some(zero_offset) = zero_offset {
        handle.zero(&mut transaction, 0..zero_offset).await?;
    }

    // Applied directly instead of via `transaction.commit()`.
    // NOTE(review): presumably to avoid re-entering the commit path — confirm.
    self.write_and_apply_mutations(&mut transaction);

    let mut inner = self.inner.lock();

    if let Some(size) = size {
        // The extension transaction itself must have been written within the old file size.
        assert!(inner.writer.journal_file_checkpoint().file_offset < size);
    }

    // Only clear `zero_offset` if it still holds the value that was just zeroed; a different
    // value means it was updated in the meantime and must be kept.
    if inner.zero_offset == zero_offset {
        inner.zero_offset = None;
    }

    Ok(())
}
1456
1457 fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1461 let super_block_header = &self.inner.lock().super_block_header;
1462 let offset = super_block_header
1463 .journal_file_offsets
1464 .get(&object_id)
1465 .cloned()
1466 .unwrap_or(super_block_header.super_block_journal_file_offset);
1467 journal_file_checkpoint.file_offset >= offset
1468 }
1469
/// Builds an updated super-block header and saves it via the super-block manager.
///
/// Steps, in order: pad the journal to a block boundary and compact the root parent store
/// (both under the writer mutex), flush the journal up to the padded checkpoint (under the
/// sync mutex), derive the new header from the current one, save it, and finally install the
/// new header and schedule zeroing of the journal up to the previous header's checkpoint.
///
/// # Errors
///
/// Fails if padding, compaction of the root parent store, flushing, or saving fails, or with
/// `FxfsError::Inconsistent` if the generation counter would overflow.
async fn write_super_block(&self) -> Result<(), Error> {
    let root_parent_store = self.objects.root_parent_store();

    // These are assigned inside the guarded scopes below but are needed after the guards drop.
    let old_layers;
    let old_super_block_offset;
    let mut new_super_block_header;
    let checkpoint;
    let borrowed;

    {
        let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
        {
            // Padding and compacting the root parent store happen under the writer mutex.
            let _write_guard = self.writer_mutex.lock();
            (checkpoint, borrowed) = self.pad_to_block()?;
            old_layers = super_block::compact_root_parent(&*root_parent_store)?;
        }
        // The writer mutex is released here; only the sync mutex is held across the flush.
        self.flush_device(checkpoint.file_offset)
            .await
            .context("flush failed when writing superblock")?;
    }

    // Start from a copy of the current header.
    new_super_block_header = self.inner.lock().super_block_header.clone();

    // Remember where the previous header's journal checkpoint was; used for `zero_offset`.
    old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;

    let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();

    new_super_block_header.generation =
        new_super_block_header.generation.checked_add(1).ok_or(FxfsError::Inconsistent)?;
    new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
    // Use the minimum checkpoint reported by the object manager, or the padded checkpoint if
    // there is none.
    new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
    new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
    new_super_block_header.journal_file_offsets = journal_file_offsets;
    new_super_block_header.borrowed_metadata_space = borrowed;

    self.super_block_manager
        .save(
            new_super_block_header.clone(),
            self.objects.root_parent_store().filesystem(),
            old_layers,
        )
        .await?;
    {
        // Publish the new header only after it has been saved successfully.
        let mut inner = self.inner.lock();
        inner.super_block_header = new_super_block_header;
        // `pre_commit` zeroes the journal file from 0 up to this block-aligned offset.
        inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
    }

    Ok(())
}
1526
1527 pub async fn sync(
1534 &self,
1535 options: SyncOptions<'_>,
1536 ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1537 let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1538
1539 let (checkpoint, borrowed) = {
1540 if let Some(precondition) = options.precondition {
1541 if !precondition() {
1542 return Ok(None);
1543 }
1544 }
1545
1546 let _guard = self.writer_mutex.lock();
1549
1550 self.pad_to_block()?
1551 };
1552
1553 if options.flush_device {
1554 self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1555 }
1556
1557 Ok(Some((checkpoint, borrowed)))
1558 }
1559
1560 fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1564 let mut inner = self.inner.lock();
1565 let checkpoint = inner.writer.journal_file_checkpoint();
1566 if checkpoint.file_offset % BLOCK_SIZE != 0 {
1567 JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1568 inner.writer.pad_to_block()?;
1569 if let Some(waker) = inner.flush_waker.take() {
1570 waker.wake();
1571 }
1572 }
1573 Ok((checkpoint, self.objects.borrowed_metadata_space()))
1574 }
1575
1576 async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
1577 debug_assert_not_too_long!(poll_fn(|ctx| {
1578 let mut inner = self.inner.lock();
1579 if inner.flushed_offset >= checkpoint_offset {
1580 Poll::Ready(Ok(()))
1581 } else if inner.terminate {
1582 let context = inner
1583 .terminate_reason
1584 .as_ref()
1585 .map(|e| format!("Journal closed with error: {:?}", e))
1586 .unwrap_or_else(|| "Journal closed".to_string());
1587 Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
1588 } else {
1589 inner.sync_waker = Some(ctx.waker().clone());
1590 Poll::Pending
1591 }
1592 }))?;
1593
1594 let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
1595 if needs_flush {
1596 let trace = self.trace.load(Ordering::Relaxed);
1597 if trace {
1598 info!("J: start flush device");
1599 }
1600 self.handle.get().unwrap().flush_device().await?;
1601 if trace {
1602 info!("J: end flush device");
1603 }
1604
1605 {
1613 let mut inner = self.inner.lock();
1614 inner.device_flushed_offset = checkpoint_offset;
1615 inner.needs_did_flush_device = true;
1616 }
1617
1618 self.objects.allocator().did_flush_device(checkpoint_offset);
1621 if trace {
1622 info!("J: did flush device");
1623 }
1624 }
1625
1626 Ok(())
1627 }
1628
1629 pub fn super_block_header(&self) -> SuperBlockHeader {
1631 self.inner.lock().super_block_header.clone()
1632 }
1633
1634 pub async fn check_journal_space(&self) -> Result<(), Error> {
1636 loop {
1637 debug_assert_not_too_long!({
1638 let inner = self.inner.lock();
1639 if inner.terminate {
1640 let context = inner
1643 .terminate_reason
1644 .as_ref()
1645 .map(|e| format!("Journal closed with error: {:?}", e))
1646 .unwrap_or_else(|| "Journal closed".to_string());
1647 break Err(anyhow!(FxfsError::JournalFlushError).context(context));
1648 }
1649 if self.objects.last_end_offset()
1650 - inner.super_block_header.journal_checkpoint.file_offset
1651 < inner.reclaim_size
1652 {
1653 break Ok(());
1654 }
1655 if inner.image_builder_mode.is_some() {
1656 break Ok(());
1657 }
1658 if inner.disable_compactions {
1659 break Err(
1660 anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
1661 );
1662 }
1663 self.reclaim_event.listen()
1664 });
1665 }
1666 }
1667
/// Returns `CHUNK_SIZE`, the granularity in bytes used when preallocating and extending the
/// journal file.
fn chunk_size(&self) -> u64 {
    CHUNK_SIZE
}
1671
/// Writes `transaction`'s mutations to the journal writer, applies the transaction to the
/// object manager, and terminates it with a `Commit` record.
///
/// Everything up to and including the `Commit` record happens under the writer mutex. After
/// that the object manager is told the transaction was committed and the flush waker is woken.
///
/// Returns the journal file offset captured before the first mutation was written.
///
/// # Panics
///
/// Panics if applying the transaction fails or if any journal record cannot be written.
fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
    let checkpoint_before;
    let checkpoint_after;
    {
        // Held until after the `Commit` record has been written.
        let _guard = self.writer_mutex.lock();
        checkpoint_before = {
            let mut inner = self.inner.lock();
            // `flush` reads and clears `needs_barrier` on its next run.
            if transaction.includes_write() {
                inner.needs_barrier = true;
            }
            let checkpoint = inner.writer.journal_file_checkpoint();
            for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
                self.objects.write_mutation(
                    *object_id,
                    mutation,
                    Writer(*object_id, &mut inner.writer),
                );
            }
            checkpoint
        };
        // The `inner` lock is not held while the transaction is applied.
        let maybe_mutation =
            self.objects.apply_transaction(transaction, &checkpoint_before).expect(
                "apply_transaction should not fail in live mode; \
                 filesystem will be in an inconsistent state",
            );
        checkpoint_after = {
            let mut inner = self.inner.lock();
            // Applying the transaction may yield one extra mutation, journaled with object id 0.
            if let Some(mutation) = maybe_mutation {
                inner
                    .writer
                    .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
                    .unwrap();
            }
            // Journal the checksums the transaction collected, one record per entry.
            for (device_range, checksums, first_write) in
                transaction.take_checksums().into_iter()
            {
                inner
                    .writer
                    .write_record(&JournalRecord::DataChecksums(
                        device_range,
                        Checksums::fletcher(checksums),
                        first_write,
                    ))
                    .unwrap();
            }
            inner.writer.write_record(&JournalRecord::Commit).unwrap();

            inner.writer.journal_file_checkpoint()
        };
    }
    self.objects.did_commit_transaction(
        transaction,
        &checkpoint_before,
        checkpoint_after.file_offset,
    );

    // New bytes are available for the flush task.
    if let Some(waker) = self.inner.lock().flush_waker.take() {
        waker.wake();
    }

    checkpoint_before.file_offset
}
1734
/// Long-running task that drives journal flushes and compactions.
///
/// At most one flush future and one compaction future are in flight at a time; both are polled
/// by hand from a single `poll_fn`. The task completes once the journal is terminated and
/// neither future is outstanding.
pub async fn flush_task(self: Arc<Self>) {
    let mut flush_fut = None;
    let mut compact_fut = None;
    // Set after a failed flush; prevents any further flushes from being started.
    let mut flush_error = false;
    poll_fn(|ctx| {
        loop {
            {
                let mut inner = self.inner.lock();
                // Start a flush if none is running, none has failed, and a handle exists.
                if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
                    let flushable = inner.writer.flushable_bytes();
                    if flushable > 0 {
                        flush_fut = Some(self.flush(flushable).boxed());
                    }
                }
                // Finished: terminated with nothing in flight.
                if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
                    return Poll::Ready(());
                }
                // Start a compaction once the journal span exceeds half of `reclaim_size`.
                if compact_fut.is_none()
                    && !inner.terminate
                    && !inner.disable_compactions
                    && inner.image_builder_mode.is_none()
                    && self.objects.last_end_offset()
                        - inner.super_block_header.journal_checkpoint.file_offset
                        > inner.reclaim_size / 2
                {
                    compact_fut = Some(self.compact().boxed());
                    inner.compaction_running = true;
                }
                // Registered on every pass so writers can wake this task.
                inner.flush_waker = Some(ctx.waker().clone());
            }
            // Stays true unless one of the futures completes during this pass.
            let mut pending = true;
            if let Some(fut) = flush_fut.as_mut() {
                if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                    if let Err(e) = result {
                        // A flush failure terminates the journal and wakes reclaim waiters.
                        self.inner.lock().terminate(Some(e.context("Flush error")));
                        self.reclaim_event.notify(usize::MAX);
                        flush_error = true;
                    }
                    flush_fut = None;
                    pending = false;
                }
            }
            if let Some(fut) = compact_fut.as_mut() {
                if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                    let mut inner = self.inner.lock();
                    if let Err(e) = result {
                        inner.terminate(Some(e.context("Compaction error")));
                    }
                    compact_fut = None;
                    inner.compaction_running = false;
                    // Waiters on `reclaim_event` are notified whether or not compaction
                    // succeeded.
                    self.reclaim_event.notify(usize::MAX);
                    pending = false;
                }
            }
            // If something completed, loop again immediately to re-evaluate the state;
            // otherwise yield until woken.
            if pending {
                return Poll::Pending;
            }
        }
    })
    .await;
}
1802
1803 async fn flush(&self, amount: usize) -> Result<(), Error> {
1804 let handle = self.handle.get().unwrap();
1805 let mut buf = handle.allocate_buffer(amount).await;
1806 let (offset, len, barrier_on_first_write) = {
1807 let mut inner = self.inner.lock();
1808 let offset = inner.writer.take_flushable(buf.as_mut());
1809 let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1810 inner.needs_barrier = false;
1813 (offset, buf.len() as u64, barrier_on_first_write)
1814 };
1815 self.handle
1816 .get()
1817 .unwrap()
1818 .overwrite(
1819 offset,
1820 buf.as_mut(),
1821 OverwriteOptions { barrier_on_first_write, ..Default::default() },
1822 )
1823 .await?;
1824
1825 let mut inner = self.inner.lock();
1826 if let Some(waker) = inner.sync_waker.take() {
1827 waker.wake();
1828 }
1829 inner.flushed_offset = offset + len;
1830 inner.valid_to = inner.flushed_offset;
1831 Ok(())
1832 }
1833
1834 #[trace]
1837 pub async fn compact(&self) -> Result<(), Error> {
1838 let trace = self.trace.load(Ordering::Relaxed);
1839 debug!("Compaction starting");
1840 if trace {
1841 info!("J: start compaction");
1842 }
1843 let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1844 self.inner.lock().super_block_header.earliest_version = earliest_version;
1845 self.write_super_block().await.context("Failed to write superblock")?;
1846 if trace {
1847 info!("J: end compaction");
1848 }
1849 debug!("Compaction finished");
1850 Ok(())
1851 }
1852
1853 pub async fn stop_compactions(&self) {
1854 loop {
1855 debug_assert_not_too_long!({
1856 let mut inner = self.inner.lock();
1857 inner.disable_compactions = true;
1858 if !inner.compaction_running {
1859 return;
1860 }
1861 self.reclaim_event.listen()
1862 });
1863 }
1864 }
1865
1866 pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1869 let this = Arc::downgrade(self);
1870 parent.record_lazy_child(name, move || {
1871 let this_clone = this.clone();
1872 async move {
1873 let inspector = fuchsia_inspect::Inspector::default();
1874 if let Some(this) = this_clone.upgrade() {
1875 let (journal_min, journal_max, journal_reclaim_size) = {
1876 let inner = this.inner.lock();
1878 (
1879 round_down(
1880 inner.super_block_header.journal_checkpoint.file_offset,
1881 BLOCK_SIZE,
1882 ),
1883 inner.flushed_offset,
1884 inner.reclaim_size,
1885 )
1886 };
1887 let root = inspector.root();
1888 root.record_uint("journal_min_offset", journal_min);
1889 root.record_uint("journal_max_offset", journal_max);
1890 root.record_uint("journal_size", journal_max - journal_min);
1891 root.record_uint("journal_reclaim_size", journal_reclaim_size);
1892
1893 if let Some(x) = round_div(
1895 100 * (journal_max - journal_min),
1896 this.objects.allocator().get_disk_bytes(),
1897 ) {
1898 root.record_uint("journal_size_to_disk_size_percent", x);
1899 }
1900 }
1901 Ok(inspector)
1902 }
1903 .boxed()
1904 });
1905 }
1906
1907 pub fn terminate(&self) {
1909 self.inner.lock().terminate(None);
1910 self.reclaim_event.notify(usize::MAX);
1911 }
1912}
1913
1914pub struct Writer<'a>(u64, &'a mut JournalWriter);
1916
1917impl Writer<'_> {
1918 pub fn write(&mut self, mutation: Mutation) {
1919 self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
1920 }
1921}
1922
1923#[cfg(test)]
1924mod tests {
1925 use crate::filesystem::{FxFilesystem, SyncOptions};
1926 use crate::fsck::fsck;
1927 use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
1928 use crate::object_store::directory::Directory;
1929 use crate::object_store::transaction::Options;
1930 use crate::object_store::volume::root_volume;
1931 use crate::object_store::{
1932 HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
1933 };
1934 use fuchsia_async as fasync;
1935 use fuchsia_async::MonotonicDuration;
1936 use storage_device::DeviceHolder;
1937 use storage_device::fake_device::FakeDevice;
1938
1939 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1940
1941 #[fuchsia::test]
1942 async fn test_replay() {
1943 const TEST_DATA: &[u8] = b"hello";
1944
1945 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1946
1947 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1948
1949 let object_id = {
1950 let root_store = fs.root_store();
1951 let root_directory =
1952 Directory::open(&root_store, root_store.root_directory_object_id())
1953 .await
1954 .expect("open failed");
1955 let mut transaction = fs
1956 .clone()
1957 .new_transaction(
1958 lock_keys![LockKey::object(
1959 root_store.store_object_id(),
1960 root_store.root_directory_object_id(),
1961 )],
1962 Options::default(),
1963 )
1964 .await
1965 .expect("new_transaction failed");
1966 let handle = root_directory
1967 .create_child_file(&mut transaction, "test")
1968 .await
1969 .expect("create_child_file failed");
1970
1971 transaction.commit().await.expect("commit failed");
1972 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1973 buf.as_mut_slice().copy_from_slice(TEST_DATA);
1974 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1975 fs.sync(SyncOptions::default()).await.expect("sync failed");
1978 handle.object_id()
1979 };
1980
1981 {
1982 fs.close().await.expect("Close failed");
1983 let device = fs.take_device().await;
1984 device.reopen(false);
1985 let fs = FxFilesystem::open(device).await.expect("open failed");
1986 let handle = ObjectStore::open_object(
1987 &fs.root_store(),
1988 object_id,
1989 HandleOptions::default(),
1990 None,
1991 )
1992 .await
1993 .expect("open_object failed");
1994 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
1995 assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
1996 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
1997 fsck(fs.clone()).await.expect("fsck failed");
1998 fs.close().await.expect("Close failed");
1999 }
2000 }
2001
2002 #[fuchsia::test]
2003 async fn test_reset() {
2004 const TEST_DATA: &[u8] = b"hello";
2005
2006 let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
2007
2008 let mut object_ids = Vec::new();
2009
2010 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2011 {
2012 let root_store = fs.root_store();
2013 let root_directory =
2014 Directory::open(&root_store, root_store.root_directory_object_id())
2015 .await
2016 .expect("open failed");
2017 let mut transaction = fs
2018 .clone()
2019 .new_transaction(
2020 lock_keys![LockKey::object(
2021 root_store.store_object_id(),
2022 root_store.root_directory_object_id(),
2023 )],
2024 Options::default(),
2025 )
2026 .await
2027 .expect("new_transaction failed");
2028 let handle = root_directory
2029 .create_child_file(&mut transaction, "test")
2030 .await
2031 .expect("create_child_file failed");
2032 transaction.commit().await.expect("commit failed");
2033 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2034 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2035 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2036 fs.sync(SyncOptions::default()).await.expect("sync failed");
2037 object_ids.push(handle.object_id());
2038
2039 for i in 0..1000 {
2042 let mut transaction = fs
2043 .clone()
2044 .new_transaction(
2045 lock_keys![LockKey::object(
2046 root_store.store_object_id(),
2047 root_store.root_directory_object_id(),
2048 )],
2049 Options::default(),
2050 )
2051 .await
2052 .expect("new_transaction failed");
2053 let handle = root_directory
2054 .create_child_file(&mut transaction, &format!("{}", i))
2055 .await
2056 .expect("create_child_file failed");
2057 transaction.commit().await.expect("commit failed");
2058 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2059 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2060 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2061 object_ids.push(handle.object_id());
2062 }
2063 }
2064 fs.close().await.expect("fs close failed");
2065 let device = fs.take_device().await;
2066 device.reopen(false);
2067 let fs = FxFilesystem::open(device).await.expect("open failed");
2068 fsck(fs.clone()).await.expect("fsck failed");
2069 {
2070 let root_store = fs.root_store();
2071 for &object_id in &object_ids[0..1] {
2073 let handle = ObjectStore::open_object(
2074 &root_store,
2075 object_id,
2076 HandleOptions::default(),
2077 None,
2078 )
2079 .await
2080 .expect("open_object failed");
2081 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2082 assert_eq!(
2083 handle.read(0, buf.as_mut()).await.expect("read failed"),
2084 TEST_DATA.len()
2085 );
2086 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2087 }
2088
2089 let root_directory =
2091 Directory::open(&root_store, root_store.root_directory_object_id())
2092 .await
2093 .expect("open failed");
2094 let mut transaction = fs
2095 .clone()
2096 .new_transaction(
2097 lock_keys![LockKey::object(
2098 root_store.store_object_id(),
2099 root_store.root_directory_object_id(),
2100 )],
2101 Options::default(),
2102 )
2103 .await
2104 .expect("new_transaction failed");
2105 let handle = root_directory
2106 .create_child_file(&mut transaction, "test2")
2107 .await
2108 .expect("create_child_file failed");
2109 transaction.commit().await.expect("commit failed");
2110 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2111 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2112 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2113 fs.sync(SyncOptions::default()).await.expect("sync failed");
2114 object_ids.push(handle.object_id());
2115 }
2116
2117 fs.close().await.expect("close failed");
2118 let device = fs.take_device().await;
2119 device.reopen(false);
2120 let fs = FxFilesystem::open(device).await.expect("open failed");
2121 {
2122 fsck(fs.clone()).await.expect("fsck failed");
2123
2124 for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
2126 let handle = ObjectStore::open_object(
2127 &fs.root_store(),
2128 object_id,
2129 HandleOptions::default(),
2130 None,
2131 )
2132 .await
2133 .unwrap_or_else(|e| {
2134 panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2135 });
2136 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2137 assert_eq!(
2138 handle.read(0, buf.as_mut()).await.expect("read failed"),
2139 TEST_DATA.len()
2140 );
2141 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2142 }
2143 }
2144 fs.close().await.expect("close failed");
2145 }
2146
2147 #[fuchsia::test]
2148 async fn test_discard() {
2149 let device = {
2150 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2151 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2152 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2153
2154 let store = root_volume
2155 .new_volume("test", NewChildStoreOptions::default())
2156 .await
2157 .expect("new_volume failed");
2158 let root_directory = Directory::open(&store, store.root_directory_object_id())
2159 .await
2160 .expect("open failed");
2161
2162 let mut i = 0;
2164 loop {
2165 let mut transaction = fs
2166 .clone()
2167 .new_transaction(
2168 lock_keys![LockKey::object(
2169 store.store_object_id(),
2170 store.root_directory_object_id()
2171 )],
2172 Options::default(),
2173 )
2174 .await
2175 .expect("new_transaction failed");
2176 root_directory
2177 .create_child_file(&mut transaction, &format!("a {i}"))
2178 .await
2179 .expect("create_child_file failed");
2180 if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2181 break;
2182 }
2183 i += 1;
2184 }
2185
2186 fs.journal().compact().await.expect("compact failed");
2188 fs.journal().stop_compactions().await;
2189
2190 let mut i = 0;
2192 loop {
2193 let mut transaction = fs
2194 .clone()
2195 .new_transaction(
2196 lock_keys![LockKey::object(
2197 store.store_object_id(),
2198 store.root_directory_object_id()
2199 )],
2200 Options::default(),
2201 )
2202 .await
2203 .expect("new_transaction failed");
2204 root_directory
2205 .create_child_file(&mut transaction, &format!("b {i}"))
2206 .await
2207 .expect("create_child_file failed");
2208 if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2209 break;
2210 }
2211 i += 1;
2212 }
2213
2214 fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2216 fs.device().snapshot().expect("snapshot failed")
2219 };
2220
2221 let fs = FxFilesystem::open(device).await.expect("open failed");
2222
2223 {
2224 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2225
2226 let store =
2227 root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2228
2229 let root_directory = Directory::open(&store, store.root_directory_object_id())
2230 .await
2231 .expect("open failed");
2232
2233 let mut transaction = fs
2235 .clone()
2236 .new_transaction(
2237 lock_keys![LockKey::object(
2238 store.store_object_id(),
2239 store.root_directory_object_id()
2240 )],
2241 Options::default(),
2242 )
2243 .await
2244 .expect("new_transaction failed");
2245 root_directory
2246 .create_child_file(&mut transaction, &format!("d"))
2247 .await
2248 .expect("create_child_file failed");
2249 transaction.commit().await.expect("commit failed");
2250 }
2251
2252 fs.close().await.expect("close failed");
2253 let device = fs.take_device().await;
2254 device.reopen(false);
2255
2256 let fs = FxFilesystem::open(device).await.expect("open failed");
2257 fsck(fs.clone()).await.expect("fsck failed");
2258 fs.close().await.expect("close failed");
2259 }
2260}
2261
2262#[cfg(fuzz)]
2263mod fuzz {
2264 use fuzz::fuzz;
2265
2266 #[fuzz]
2267 fn fuzz_journal_bytes(input: Vec<u8>) {
2268 use crate::filesystem::FxFilesystem;
2269 use fuchsia_async as fasync;
2270 use std::io::Write;
2271 use storage_device::DeviceHolder;
2272 use storage_device::fake_device::FakeDevice;
2273
2274 fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2275 let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2276 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2277 fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
2278 fs.close().await.expect("close failed");
2279 let device = fs.take_device().await;
2280 device.reopen(false);
2281 if let Ok(fs) = FxFilesystem::open(device).await {
2282 let _ = fs.close().await;
2285 }
2286 });
2287 }
2288
2289 #[fuzz]
2290 fn fuzz_journal(input: Vec<super::JournalRecord>) {
2291 use crate::filesystem::FxFilesystem;
2292 use fuchsia_async as fasync;
2293 use storage_device::DeviceHolder;
2294 use storage_device::fake_device::FakeDevice;
2295
2296 fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2297 let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2298 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2299 {
2300 let mut inner = fs.journal().inner.lock();
2301 for record in &input {
2302 let _ = inner.writer.write_record(record);
2303 }
2304 }
2305 fs.close().await.expect("close failed");
2306 let device = fs.take_device().await;
2307 device.reopen(false);
2308 if let Ok(fs) = FxFilesystem::open(device).await {
2309 let _ = fs.close().await;
2312 }
2313 });
2314 }
2315}