1mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::errors::FxfsError;
27use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
28use crate::log::*;
29use crate::lsm_tree::cache::NullCache;
30use crate::lsm_tree::types::Layer;
31use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
32use crate::object_store::allocator::Allocator;
33use crate::object_store::data_object_handle::OverwriteOptions;
34use crate::object_store::extent_record::{
35 DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
36};
37use crate::object_store::graveyard::Graveyard;
38use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
39use crate::object_store::journal::checksum_list::ChecksumList;
40use crate::object_store::journal::reader::{JournalReader, ReadResult};
41use crate::object_store::journal::super_block::{
42 SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
43};
44use crate::object_store::journal::writer::JournalWriter;
45use crate::object_store::object_manager::ObjectManager;
46use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
47use crate::object_store::transaction::{
48 AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
49 MutationV47, MutationV49, ObjectStoreMutation, Options, TRANSACTION_MAX_JOURNAL_USAGE,
50 Transaction, TxnMutation, lock_keys,
51};
52use crate::object_store::{
53 AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
54 NewChildStoreOptions, ObjectStore,
55};
56use crate::range::RangeExt;
57use crate::round::{round_div, round_down};
58use crate::serialized_types::{LATEST_VERSION, Migrate, Version, Versioned, migrate_to_version};
59use anyhow::{Context, Error, anyhow, bail, ensure};
60use event_listener::Event;
61use fprint::TypeFingerprint;
62use fuchsia_sync::Mutex;
63use futures::FutureExt as _;
64use futures::future::poll_fn;
65use once_cell::sync::OnceCell;
66use rustc_hash::FxHashMap as HashMap;
67use serde::{Deserialize, Serialize};
68use static_assertions::const_assert;
69use std::clone::Clone;
70use std::collections::HashSet;
71use std::ops::{Bound, Range};
72use std::sync::Arc;
73use std::sync::atomic::{AtomicBool, Ordering};
74use std::task::{Poll, Waker};
75
76pub const BLOCK_SIZE: u64 = 4096;
78
79const CHUNK_SIZE: u64 = 131_072;
81const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);
82
83pub const DEFAULT_RECLAIM_SIZE: u64 = 262_144;
85
86pub const RESERVED_SPACE: u64 = 1_048_576;
89
90const RESET_XOR: u64 = 0xffffffffffffffff;
95
/// The current journal checkpoint type (alias of the latest versioned layout).
pub type JournalCheckpoint = JournalCheckpointV32;

/// A position in the journal file.
///
/// This type is serialized, so its field order is part of the on-disk format.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    /// Offset into the journal file.
    pub file_offset: u64,

    /// Checksum associated with this position.
    // NOTE(review): presumably the running checksum of the journal stream at `file_offset`;
    // replay XORs it with `RESET_XOR` before seeking the writer — confirm.
    pub checksum: Checksum,

    /// Format version associated with this position (`init_empty` and `replay` set it to
    /// `LATEST_VERSION` for the writer).
    pub version: Version,
}
114
/// The current journal record type (alias of the latest versioned layout).
pub type JournalRecord = JournalRecordV49;

/// A single record in the journal stream.
///
/// This type is serialized; variant order is part of the on-disk format.
// NOTE(review): `large_enum_variant` is allowed presumably because `Mutation` is much larger
// than the other variants.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV49 {
    /// No more records in the current block; the reader skips to the end of the block.
    EndBlock,
    /// A mutation for the object `object_id`. Mutations (and data checksums) accumulate into
    /// the current transaction until a `Commit` is read.
    Mutation { object_id: u64, mutation: MutationV49 },
    /// Commits the transaction accumulated since the last commit.
    Commit,
    /// Discards transactions that start at or after the given journal offset.
    Discard(u64),
    /// Records that the device has been flushed up to the given journal offset.
    DidFlushDevice(u64),
    /// Checksums for a device range written by the current transaction: the device range, the
    /// checksums, and the flag surfaced on replay as `JournaledChecksums::first_write`.
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
149
// Legacy on-disk record layout; migrated to `JournalRecordV49` via `#[migrate_to_version]`.
// Variant order must not change.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
161
// Legacy on-disk record layout; migrated to `JournalRecordV47` via `#[migrate_to_version]`.
// Variant order must not change.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
173
// Legacy on-disk record layout; migrated to `JournalRecordV46` via `#[migrate_to_version]`.
// Variant order must not change.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
185
// Legacy on-disk record layout; migrated to `JournalRecordV43` via `#[migrate_to_version]`.
// It still carries `MutationV41`. Variant order must not change.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
196
// Legacy on-disk record layout. `DataChecksums` has no trailing bool flag in this version, so
// it is converted to `JournalRecordV42` by the hand-written `From` impl rather than
// `#[migrate_to_version]`. Variant order must not change.
#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
206
207impl From<JournalRecordV41> for JournalRecordV42 {
208 fn from(record: JournalRecordV41) -> Self {
209 match record {
210 JournalRecordV41::EndBlock => Self::EndBlock,
211 JournalRecordV41::Mutation { object_id, mutation } => {
212 Self::Mutation { object_id, mutation: mutation.into() }
213 }
214 JournalRecordV41::Commit => Self::Commit,
215 JournalRecordV41::Discard(offset) => Self::Discard(offset),
216 JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
217 JournalRecordV41::DataChecksums(range, sums) => {
218 Self::DataChecksums(range, sums, true)
221 }
222 }
223 }
224}
225
// Legacy on-disk record layout; migrated to `JournalRecordV41` via `#[migrate_to_version]`.
// Variant order must not change.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
236
237pub(super) fn journal_handle_options() -> HandleOptions {
238 HandleOptions { skip_journal_checks: true, ..Default::default() }
239}
240
/// The filesystem journal: replays committed transactions at mount time and owns the state
/// used to write new journal records.
// Field order also determines drop order.
pub struct Journal {
    /// Object manager into which replayed mutations are applied.
    objects: Arc<ObjectManager>,
    /// Handle to the journal file; set by `init_empty` or `replay`.
    handle: OnceCell<DataObjectHandle<ObjectStore>>,
    /// Loads the super-block during `replay`; `set_image_builder_mode` sets its
    /// `next_instance`.
    super_block_manager: SuperBlockManager,
    /// Mutable journal state.
    inner: Mutex<Inner>,
    // NOTE(review): `writer_mutex`, `sync_mutex` and `reclaim_event` are not used in this chunk;
    // presumably they serialize writers, serialize syncs, and signal reclaim progress — confirm.
    writer_mutex: Mutex<()>,
    sync_mutex: futures::lock::Mutex<()>,
    /// Whether journal tracing is enabled (see `set_trace`).
    trace: AtomicBool,

    reclaim_event: Event,
}
258
/// Mutable journal state, guarded by `Journal::inner`.
struct Inner {
    /// Super-block header loaded by `replay` or built by `init_empty`.
    super_block_header: SuperBlockHeader,

    // NOTE(review): not used in this chunk — confirm its meaning where it is set.
    zero_offset: Option<u64>,

    /// Journal offset up to which the device is known to be flushed; `replay` sets it equal to
    /// `flushed_offset`.
    device_flushed_offset: u64,

    // NOTE(review): presumably set when a `DidFlushDevice` record still needs to be written.
    needs_did_flush_device: bool,

    /// Serializes records into the journal; `replay` seeks it to just past the last block read.
    writer: JournalWriter,

    /// Set to `true` by `replay` after repositioning the writer.
    // NOTE(review): presumably makes the writer emit a reset/version marker first — confirm.
    output_reset_version: bool,

    /// Woken (and cleared) by `Inner::terminate`.
    flush_waker: Option<Waker>,

    /// Set once the journal has been terminated.
    terminate: bool,

    /// First error passed to `Inner::terminate`; later errors are only logged.
    terminate_reason: Option<Error>,

    // NOTE(review): compaction control flags; not used in this chunk.
    disable_compactions: bool,

    compaction_running: bool,

    /// Woken (and cleared) by `Inner::terminate`.
    sync_waker: Option<Waker>,

    /// Journal offset up to which records have been flushed; `replay` sets it to the writer's
    /// starting offset.
    flushed_offset: u64,

    /// End of the valid portion of the journal as determined by `replay`.
    valid_to: u64,

    /// Set by `replay` to the end of the valid journal when that is before `flushed_offset`.
    // NOTE(review): presumably a `Discard` record for this offset is written later — confirm.
    discard_offset: Option<u64>,

    /// Reclaim size in bytes, copied from `JournalOptions`.
    reclaim_size: u64,

    /// Image builder mode, if any; `init_empty` skips journal preallocation when it is set.
    image_builder_mode: Option<SuperBlockInstance>,

    /// Copied from `JournalOptions::barriers_enabled`.
    barriers_enabled: bool,

    // NOTE(review): not used in this chunk; presumably records that a write barrier is pending.
    needs_barrier: bool,
}
326
327impl Inner {
328 fn terminate(&mut self, reason: Option<Error>) {
329 self.terminate = true;
330
331 if let Some(err) = reason {
332 error!(error:? = err; "Terminating journal");
333 if let Some(prev_err) = self.terminate_reason.as_ref() {
335 error!(error:? = prev_err; "Journal previously terminated");
336 } else {
337 self.terminate_reason = Some(err);
338 }
339 }
340
341 if let Some(waker) = self.flush_waker.take() {
342 waker.wake();
343 }
344 if let Some(waker) = self.sync_waker.take() {
345 waker.wake();
346 }
347 }
348}
349
/// Options used when constructing a [`Journal`].
pub struct JournalOptions {
    /// Reclaim size in bytes; defaults to `DEFAULT_RECLAIM_SIZE`.
    // NOTE(review): how this drives journal reclamation is not visible in this chunk.
    pub reclaim_size: u64,

    /// Whether barriers are enabled; defaults to `false`.
    pub barriers_enabled: bool,
}
361
362impl Default for JournalOptions {
363 fn default() -> Self {
364 JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
365 }
366}
367
/// The output of `Journal::read_transactions`.
struct JournaledTransactions {
    /// Committed transactions, in journal order.
    transactions: Vec<JournaledTransaction>,
    /// Highest journal offset known to be flushed to the device. It starts at the super-block's
    /// journal file offset and is raised by `Reset` results and `DidFlushDevice` records.
    device_flushed_offset: u64,
}
372
/// A committed transaction read back from the journal, with its mutations grouped by target.
#[derive(Debug, Default)]
pub struct JournaledTransaction {
    /// Journal checkpoint taken just before the transaction's first record.
    pub checkpoint: JournalCheckpoint,
    /// Mutations whose object id is the root parent store's.
    pub root_parent_mutations: Vec<Mutation>,
    /// Mutations whose object id is the root store's.
    pub root_mutations: Vec<Mutation>,
    /// All other mutations, each paired with its object id.
    pub non_root_mutations: Vec<(u64, Mutation)>,
    /// Journal file offset just after the transaction's `Commit` record.
    pub end_offset: u64,
    /// Data checksums recorded by this transaction.
    pub checksums: Vec<JournaledChecksums>,

    /// `(object_id, offset)` when this transaction contains an `EndFlush` matching an earlier
    /// `BeginFlush`; `offset` is the `BeginFlush` transaction's checkpoint offset plus one.
    pub end_flush: Option<(u64, u64)>,

    /// Object id of the store deleted by a `DeleteVolume` mutation in this transaction.
    pub volume_deleted: Option<u64>,
}
391
392impl JournaledTransaction {
393 fn new(checkpoint: JournalCheckpoint) -> Self {
394 Self { checkpoint, ..Default::default() }
395 }
396}
397
/// Sentinel journal offset recorded for a store deleted by a `DeleteVolume` mutation during
/// replay (see `Journal::replay`).
const VOLUME_DELETED: u64 = u64::MAX;
399
/// Data checksums for a device range, as read from a `DataChecksums` journal record.
#[derive(Debug)]
pub struct JournaledChecksums {
    /// The device range the checksums cover.
    pub device_range: Range<u64>,
    /// The checksums for `device_range`.
    pub checksums: Checksums,
    /// The record's flag. Records upgraded from `JournalRecordV41` always have `true`.
    pub first_write: bool,
}
406
/// A readable handle to the journal file that can also track the journal file's extents as
/// replay discovers them.
pub trait JournalHandle: ReadObjectHandle {
    /// End offset of the extents tracked so far, or `None` if this handle does not track
    /// extents. Replay requires each newly discovered journal extent to start exactly here.
    fn end_offset(&self) -> Option<u64>;
    /// Appends `device_range` as the next extent of the journal file. `added_offset` is the
    /// journal offset of the transaction that added it (replay passes 0 for extents found in
    /// the root parent store's mutable layer).
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Called when a `Discard(discard_offset)` record is replayed.
    // NOTE(review): presumably drops extents pushed with `added_offset >= discard_offset`.
    fn discard_extents(&mut self, discard_offset: u64);
}
421
422impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
426 fn end_offset(&self) -> Option<u64> {
427 None
428 }
429 fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
430 }
432 fn discard_extents(&mut self, _discard_offset: u64) {
433 }
435}
436
437#[fxfs_trace::trace]
438impl Journal {
439 pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
440 let starting_checksum = rand::random_range(1..u64::MAX);
441 Journal {
442 objects: objects,
443 handle: OnceCell::new(),
444 super_block_manager: SuperBlockManager::new(),
445 inner: Mutex::new(Inner {
446 super_block_header: SuperBlockHeader::default(),
447 zero_offset: None,
448 device_flushed_offset: 0,
449 needs_did_flush_device: false,
450 writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
451 output_reset_version: false,
452 flush_waker: None,
453 terminate: false,
454 terminate_reason: None,
455 disable_compactions: false,
456 compaction_running: false,
457 sync_waker: None,
458 flushed_offset: 0,
459 valid_to: 0,
460 discard_offset: None,
461 reclaim_size: options.reclaim_size,
462 image_builder_mode: None,
463 barriers_enabled: options.barriers_enabled,
464 needs_barrier: false,
465 }),
466 writer_mutex: Mutex::new(()),
467 sync_mutex: futures::lock::Mutex::new(()),
468 trace: AtomicBool::new(false),
469 reclaim_event: Event::new(),
470 }
471 }
472
473 pub fn set_trace(&self, trace: bool) {
474 let old_value = self.trace.swap(trace, Ordering::Relaxed);
475 if trace != old_value {
476 info!(trace; "J: trace");
477 }
478 }
479
480 pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
481 self.inner.lock().image_builder_mode = mode;
482 if let Some(instance) = mode {
483 *self.super_block_manager.next_instance.lock() = instance;
484 }
485 }
486
487 pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
488 self.inner.lock().image_builder_mode
489 }
490
491 #[cfg(feature = "migration")]
492 pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
493 ensure!(
494 self.inner.lock().image_builder_mode.is_some(),
495 "Can only set filesystem uuid in image builder mode."
496 );
497 self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
498 Ok(())
499 }
500
    /// Performs basic sanity checks on a mutation read from the journal.
    ///
    /// Returns `false` if the mutation is malformed; `replay` treats that as the end of the
    /// valid journal. Checks performed:
    /// - data extents (`ExtentValue::Some`): the file range is non-empty and
    ///   `block_size`-aligned. For copy-on-write extents with checksums, the range length
    ///   divides evenly by the checksum count into `block_size`-multiple pieces. The device
    ///   offset is `block_size`-aligned and the whole device range fits within `device_size`.
    /// - allocator `Allocate`/`Deallocate`: non-empty device range ending within `device_size`,
    ///   with a valid owner object id.
    /// - allocator `MarkForDeletion`/`SetLimit` and `CreateInternalDir`: a valid owner object id.
    ///
    /// All other mutations are accepted.
    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
        match mutation {
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    Item {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(
                                        _,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                                ..
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
                        ..
                    },
                ..
            }) => {
                if range.is_empty() || !range.is_aligned(block_size) {
                    return false;
                }
                // The range was just checked to be non-empty.
                // NOTE(review): assumes `length` only fails for inverted ranges.
                let len = range.length().unwrap();
                if let ExtentMode::Cow(checksums) = mode {
                    if checksums.len() > 0 {
                        // Each checksum must cover an equal share of the extent...
                        if len % checksums.len() as u64 != 0 {
                            return false;
                        }
                        // ...and that share must be a whole number of blocks.
                        if (len / checksums.len() as u64) % block_size != 0 {
                            return false;
                        }
                    }
                }
                // Checking `device_offset >= device_size` first keeps the subtraction below
                // from underflowing.
                if *device_offset % block_size != 0
                    || *device_offset >= device_size
                    || device_size - *device_offset < len
                {
                    return false;
                }
            }
            Mutation::ObjectStore(_) => {}
            Mutation::EncryptedObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::BeginFlush => {}
            Mutation::EndFlush => {}
            Mutation::DeleteVolume => {}
            Mutation::UpdateBorrowed(_) => {}
            Mutation::UpdateMutationsKey(_) => {}
            Mutation::CreateInternalDir(owner_object_id) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
        }
        true
    }
576
577 fn update_checksum_list(
579 &self,
580 journal_offset: u64,
581 mutation: &Mutation,
582 checksum_list: &mut ChecksumList,
583 ) -> Result<(), Error> {
584 match mutation {
585 Mutation::ObjectStore(_) => {}
586 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
587 checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
588 }
589 _ => {}
590 }
591 Ok(())
592 }
593
    /// Replays the journal, rebuilding in-memory state from the super-block and the committed
    /// transactions that follow it.
    ///
    /// Outline:
    /// 1. Load the super-block and root parent store. Install the root parent store and a new
    ///    allocator into the object manager; `on_new_allocator`, if given, sees the allocator
    ///    before it is installed.
    /// 2. Build a `BootstrapObjectHandle` over the journal file's extents found in the root
    ///    parent store's mutable layer, then read all committed transactions.
    /// 3. Validate each mutation and the recorded data checksums. Transactions at or after the
    ///    first failure are not replayed.
    /// 4. Apply root parent store mutations, open the root store and apply its mutations, open
    ///    the allocator, then replay the remaining mutations.
    /// 5. Open the real journal handle and position the writer after the last block read.
    ///
    /// # Errors
    ///
    /// Fails if any load, read, or apply step fails, if the journal's extents are not
    /// contiguous, if the root store is encrypted, or if the valid journal ends before the
    /// offset at which the super-block was written.
    #[trace]
    pub async fn replay(
        &self,
        filesystem: Arc<FxFilesystem>,
        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    ) -> Result<(), Error> {
        let block_size = filesystem.block_size();

        let (super_block, root_parent) =
            self.super_block_manager.load(filesystem.device(), block_size).await?;

        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

        self.objects.set_root_parent_store(root_parent.clone());
        let allocator =
            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
        if let Some(on_new_allocator) = on_new_allocator {
            on_new_allocator(allocator.clone());
        }
        self.objects.set_allocator(allocator.clone());
        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = super_block.clone();
        }

        let device = filesystem.device();

        // Build a bootstrap handle for the journal file from the extents recorded in the root
        // parent store's mutable layer, starting at the block containing the checkpoint.
        let mut handle;
        {
            let root_parent_layer = root_parent.tree().mutable_layer();
            let mut iter = root_parent_layer
                .seek(Bound::Included(&ObjectKey::attribute(
                    super_block.journal_object_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
                        super_block.journal_checkpoint.file_offset,
                        BLOCK_SIZE,
                    ))),
                )))
                .await
                .context("Failed to seek root parent store")?;
            let start_offset = if let Some(ItemRef {
                key:
                    ObjectKey {
                        data:
                            ObjectKeyData::Attribute(
                                DEFAULT_DATA_ATTRIBUTE_ID,
                                AttributeKey::Extent(ExtentKey { range }),
                            ),
                        ..
                    },
                ..
            }) = iter.get()
            {
                range.start
            } else {
                0
            };
            handle = BootstrapObjectHandle::new_with_start_offset(
                super_block.journal_object_id,
                device.clone(),
                start_offset,
            );
            // Collect consecutive journal-file extents until an item for something else is hit.
            while let Some(item) = iter.get() {
                if !match item.into() {
                    Some((
                        object_id,
                        DEFAULT_DATA_ATTRIBUTE_ID,
                        ExtentKey { range },
                        ExtentValue::Some { device_offset, .. },
                    )) if object_id == super_block.journal_object_id => {
                        // Journal extents must be contiguous in file offset.
                        if let Some(end_offset) = handle.end_offset() {
                            if range.start != end_offset {
                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                    "Unexpected journal extent {:?}, expected start: {}",
                                    item, end_offset
                                )));
                            }
                        }
                        handle.push_extent(
                            // NOTE(review): `added_offset` 0, presumably so that replayed
                            // `Discard` records never drop these extents — confirm.
                            0,
                            *device_offset
                                ..*device_offset + range.length().context("Invalid extent")?,
                        );
                        true
                    }
                    _ => false,
                } {
                    break;
                }
                iter.advance().await.context("Failed to advance root parent store iterator")?;
            }
        }

        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
        let JournaledTransactions { mut transactions, device_flushed_offset } = self
            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
            .await
            .context("Reading transactions for replay")?;

        // Validate mutations and gather data checksums. `valid_to` is pulled back to the start
        // of the first transaction containing an invalid mutation.
        let mut checksum_list = ChecksumList::new(device_flushed_offset);
        let mut valid_to = reader.journal_file_checkpoint().file_offset;
        let device_size = device.size();
        'bad_replay: for JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            root_mutations,
            non_root_mutations,
            checksums,
            ..
        } in &transactions
        {
            for JournaledChecksums { device_range, checksums, first_write } in checksums {
                checksum_list
                    .push(
                        checkpoint.file_offset,
                        device_range.clone(),
                        checksums.maybe_as_ref().context("Malformed checksums")?,
                        *first_write,
                    )
                    .context("Pushing journal checksum records to checksum list")?;
            }
            for mutation in root_parent_mutations
                .iter()
                .chain(root_mutations)
                .chain(non_root_mutations.iter().map(|(_, m)| m))
            {
                if !self.validate_mutation(mutation, block_size, device_size) {
                    info!(mutation:?; "Stopping replay at bad mutation");
                    valid_to = checkpoint.file_offset;
                    break 'bad_replay;
                }
                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
            }
        }

        // Checksum verification may pull `valid_to` back further.
        let valid_to = checksum_list
            .verify(device.as_ref(), valid_to)
            .await
            .context("Failed to validate checksums")?;

        let mut last_checkpoint = reader.journal_file_checkpoint();
        let mut journal_offsets = super_block.journal_file_offsets.clone();

        // First pass: apply root parent store mutations and record flush/deletion offsets.
        for (
            index,
            JournaledTransaction {
                checkpoint,
                root_parent_mutations,
                end_flush,
                volume_deleted,
                ..
            },
        ) in transactions.iter_mut().enumerate()
        {
            if checkpoint.file_offset >= valid_to {
                last_checkpoint = checkpoint.clone();

                // Drop this and all later transactions so later passes skip them.
                transactions.truncate(index);
                break;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_parent_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_parent_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root parent store mutations")?;
            }

            if let Some((object_id, journal_offset)) = end_flush {
                journal_offsets.insert(*object_id, *journal_offset);
            }

            if let Some(object_id) = volume_deleted {
                journal_offsets.insert(*object_id, VOLUME_DELETED);
            }
        }

        let root_store = ObjectStore::open(
            &root_parent,
            super_block.root_store_object_id,
            Box::new(NullCache {}),
        )
        .await
        .context("Unable to open root store")?;

        ensure!(
            !root_store.is_encrypted(),
            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
        );
        self.objects.set_root_store(root_store);

        let root_store_offset =
            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

        // Second pass: apply root store mutations, skipping transactions before the root
        // store's recorded journal offset.
        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
            if checkpoint.file_offset < root_store_offset {
                continue;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root store mutations")?;
            }
        }

        allocator.open().await.context("Failed to open allocator")?;

        // Final pass: replay everything else.
        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
        {
            self.objects
                .replay_mutations(
                    non_root_mutations,
                    &journal_offsets,
                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                    end_offset,
                )
                .await
                .context("Failed to replay mutations")?;
        }

        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

        // Only reported in the log: where reading stopped, if beyond the last valid checkpoint.
        let discarded_to =
            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
                Some(reader.journal_file_checkpoint().file_offset)
            } else {
                None
            };

        {
            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                    "journal replay cut short; journal finishes at {}, but super-block was \
                     written at {}",
                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
                )));
            }
            let handle = ObjectStore::open_object(
                &root_parent,
                super_block.journal_object_id,
                journal_handle_options(),
                None,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to open journal file (object id: {})",
                    super_block.journal_object_id
                )
            })?;
            let _ = self.handle.set(handle);
            let mut inner = self.inner.lock();
            // New records start on a fresh block after everything that was read.
            reader.skip_to_end_of_block();
            let mut writer_checkpoint = reader.journal_file_checkpoint();

            std::mem::drop(reader);

            writer_checkpoint.checksum ^= RESET_XOR;
            writer_checkpoint.version = LATEST_VERSION;
            inner.flushed_offset = writer_checkpoint.file_offset;

            inner.device_flushed_offset = inner.flushed_offset;

            inner.writer.seek(writer_checkpoint);
            inner.output_reset_version = true;
            inner.valid_to = last_checkpoint.file_offset;
            if last_checkpoint.file_offset < inner.flushed_offset {
                inner.discard_offset = Some(last_checkpoint.file_offset);
            }
        }

        self.objects
            .on_replay_complete()
            .await
            .context("Failed to complete replay for object manager")?;

        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
        Ok(())
    }
905
906 async fn read_transactions(
907 &self,
908 reader: &mut JournalReader,
909 end_offset: Option<u64>,
910 object_id_filter: u64,
911 ) -> Result<JournaledTransactions, Error> {
912 let mut transactions = Vec::new();
913 let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
914 let super_block = &self.inner.lock().super_block_header;
915 (
916 super_block.super_block_journal_file_offset,
917 super_block.root_parent_store_object_id,
918 super_block.root_store_object_id,
919 )
920 };
921 let mut current_transaction = None;
922 let mut begin_flush_offsets = HashMap::default();
923 let mut stores_deleted = HashSet::new();
924 loop {
925 let checkpoint = reader.journal_file_checkpoint();
927 if let Some(end_offset) = end_offset {
928 if checkpoint.file_offset >= end_offset {
929 break;
930 }
931 }
932 let result =
933 reader.deserialize().await.context("Failed to deserialize journal record")?;
934 match result {
935 ReadResult::Reset(_) => {
936 if current_transaction.is_some() {
937 current_transaction = None;
938 transactions.pop();
939 }
940 let offset = reader.journal_file_checkpoint().file_offset;
941 if offset > device_flushed_offset {
942 device_flushed_offset = offset;
943 }
944 }
945 ReadResult::Some(record) => {
946 match record {
947 JournalRecord::EndBlock => {
948 reader.skip_to_end_of_block();
949 }
950 JournalRecord::Mutation { object_id, mutation } => {
951 let current_transaction = match current_transaction.as_mut() {
952 None => {
953 transactions.push(JournaledTransaction::new(checkpoint));
954 current_transaction = transactions.last_mut();
955 current_transaction.as_mut().unwrap()
956 }
957 Some(transaction) => transaction,
958 };
959
960 if stores_deleted.contains(&object_id) {
961 bail!(
962 anyhow!(FxfsError::Inconsistent)
963 .context("Encountered mutations for deleted store")
964 );
965 }
966
967 match &mutation {
968 Mutation::BeginFlush => {
969 begin_flush_offsets.insert(
970 object_id,
971 current_transaction.checkpoint.file_offset,
972 );
973 }
974 Mutation::EndFlush => {
975 if let Some(offset) = begin_flush_offsets.remove(&object_id) {
976 if let Some(deleted_volume) =
977 ¤t_transaction.volume_deleted
978 {
979 if *deleted_volume == object_id {
980 bail!(anyhow!(FxfsError::Inconsistent).context(
981 "Multiple EndFlush/DeleteVolume mutations in a \
982 single transaction for the same object"
983 ));
984 }
985 }
986 if current_transaction
989 .end_flush
990 .replace((object_id, offset + 1))
991 .is_some()
992 {
993 bail!(anyhow!(FxfsError::Inconsistent).context(
994 "Multiple EndFlush mutations in a \
995 single transaction"
996 ));
997 }
998 }
999 }
1000 Mutation::DeleteVolume => {
1001 if let Some((flushed_object, _)) =
1002 ¤t_transaction.end_flush
1003 {
1004 if *flushed_object == object_id {
1005 bail!(anyhow!(FxfsError::Inconsistent).context(
1006 "Multiple EndFlush/DeleteVolume mutations in a \
1007 single transaction for the same object"
1008 ));
1009 }
1010 }
1011 if current_transaction
1012 .volume_deleted
1013 .replace(object_id)
1014 .is_some()
1015 {
1016 bail!(anyhow!(FxfsError::Inconsistent).context(
1017 "Multiple DeleteVolume mutations in a single \
1018 transaction"
1019 ));
1020 }
1021 stores_deleted.insert(object_id);
1022 }
1023 _ => {}
1024 }
1025
1026 if (object_id_filter == INVALID_OBJECT_ID
1029 || object_id_filter == object_id)
1030 && self.should_apply(object_id, ¤t_transaction.checkpoint)
1031 {
1032 if object_id == root_parent_store_object_id {
1033 current_transaction.root_parent_mutations.push(mutation);
1034 } else if object_id == root_store_object_id {
1035 current_transaction.root_mutations.push(mutation);
1036 } else {
1037 current_transaction
1038 .non_root_mutations
1039 .push((object_id, mutation));
1040 }
1041 }
1042 }
1043 JournalRecord::DataChecksums(device_range, checksums, first_write) => {
1044 let current_transaction = match current_transaction.as_mut() {
1045 None => {
1046 transactions.push(JournaledTransaction::new(checkpoint));
1047 current_transaction = transactions.last_mut();
1048 current_transaction.as_mut().unwrap()
1049 }
1050 Some(transaction) => transaction,
1051 };
1052 current_transaction.checksums.push(JournaledChecksums {
1053 device_range,
1054 checksums,
1055 first_write,
1056 });
1057 }
1058 JournalRecord::Commit => {
1059 if let Some(&mut JournaledTransaction {
1060 ref checkpoint,
1061 ref root_parent_mutations,
1062 ref mut end_offset,
1063 ..
1064 }) = current_transaction.take()
1065 {
1066 for mutation in root_parent_mutations {
1067 if let Mutation::ObjectStore(ObjectStoreMutation {
1071 item:
1072 Item {
1073 key:
1074 ObjectKey {
1075 object_id,
1076 data:
1077 ObjectKeyData::Attribute(
1078 DEFAULT_DATA_ATTRIBUTE_ID,
1079 AttributeKey::Extent(ExtentKey {
1080 range,
1081 }),
1082 ),
1083 ..
1084 },
1085 value:
1086 ObjectValue::Extent(ExtentValue::Some {
1087 device_offset,
1088 ..
1089 }),
1090 ..
1091 },
1092 ..
1093 }) = mutation
1094 {
1095 let handle = reader.handle();
1098 if *object_id != handle.object_id() {
1099 continue;
1100 }
1101 if let Some(end_offset) = handle.end_offset() {
1102 if range.start != end_offset {
1103 bail!(anyhow!(FxfsError::Inconsistent).context(
1104 format!(
1105 "Unexpected journal extent {:?} -> {}, \
1106 expected start: {}",
1107 range, device_offset, end_offset,
1108 )
1109 ));
1110 }
1111 }
1112 handle.push_extent(
1113 checkpoint.file_offset,
1114 *device_offset
1115 ..*device_offset
1116 + range.length().context("Invalid extent")?,
1117 );
1118 }
1119 }
1120 *end_offset = reader.journal_file_checkpoint().file_offset;
1121 }
1122 }
1123 JournalRecord::Discard(offset) => {
1124 if offset == 0 {
1125 bail!(
1126 anyhow!(FxfsError::Inconsistent)
1127 .context("Invalid offset for Discard")
1128 );
1129 }
1130 if let Some(transaction) = current_transaction.as_ref() {
1131 if transaction.checkpoint.file_offset < offset {
1132 continue;
1134 }
1135 }
1136 current_transaction = None;
1137 while let Some(transaction) = transactions.last() {
1138 if transaction.checkpoint.file_offset < offset {
1139 break;
1140 }
1141 transactions.pop();
1142 }
1143 reader.handle().discard_extents(offset);
1144 }
1145 JournalRecord::DidFlushDevice(offset) => {
1146 if offset > device_flushed_offset {
1147 device_flushed_offset = offset;
1148 }
1149 }
1150 }
1151 }
1152 ReadResult::ChecksumMismatch => break,
1154 }
1155 }
1156
1157 if current_transaction.is_some() {
1159 transactions.pop();
1160 }
1161
1162 Ok(JournaledTransactions { transactions, device_flushed_offset })
1163 }
1164
    /// Formats a new, empty filesystem.
    ///
    /// The following are created in a single transaction:
    /// - the root parent store, the root store, and the allocator;
    /// - both super-block objects (A and B), each extended to its first extent;
    /// - the journal file (preallocated to `chunk_size()` unless in image builder mode);
    /// - the graveyard.
    ///
    /// After the commit, records the new super-block header and the journal handle.
    ///
    /// # Errors
    ///
    /// Propagates errors from any creation step or from the commit. Also fails if journal
    /// preallocation returns less space than requested.
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        // Fixed object ids used only when formatting.
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        // The journal starts at the writer's current position, stamped with the latest version.
        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        let root_parent = ObjectStore::new_empty(
            None,
            INIT_ROOT_PARENT_STORE_OBJECT_ID,
            filesystem.clone(),
            Box::new(NullCache {}),
        );
        self.objects.set_root_parent_store(root_parent.clone());

        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
        self.objects.set_allocator(allocator.clone());
        self.objects.init_metadata_reservation()?;

        let journal_handle;
        let super_block_a_handle;
        let super_block_b_handle;
        let root_store;
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        root_store = root_parent
            .new_child_store(
                &mut transaction,
                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
                Box::new(NullCache {}),
            )
            .await
            .context("new_child_store")?;
        self.objects.set_root_store(root_store.clone());

        allocator.create(&mut transaction).await?;

        // Super-block objects live in the root store at fixed object ids.
        super_block_a_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            SuperBlockInstance::A.object_id(),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
        super_block_a_handle
            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
            .await
            .context("extend super block")?;
        super_block_b_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            SuperBlockInstance::B.object_id(),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
        super_block_b_handle
            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
            .await
            .context("extend super block")?;

        // The journal file lives in the root parent store.
        journal_handle = ObjectStore::create_object(
            &root_parent,
            &mut transaction,
            journal_handle_options(),
            None,
        )
        .await
        .context("create journal")?;
        if self.inner.lock().image_builder_mode.is_none() {
            let mut file_range = 0..self.chunk_size();
            journal_handle
                .preallocate_range(&mut transaction, &mut file_range)
                .await
                .context("preallocate journal")?;
            // Any remaining range means preallocation did not cover the whole chunk.
            if file_range.start < file_range.end {
                bail!("preallocate_range returned too little space");
            }
        }

        root_store.create(&mut transaction).await?;

        root_parent
            .set_graveyard_directory_object_id(Graveyard::create(&mut transaction, &root_parent));

        transaction.commit().await?;

        self.inner.lock().super_block_header = SuperBlockHeader::new(
            root_parent.store_object_id(),
            root_parent.graveyard_directory_object_id(),
            root_store.store_object_id(),
            allocator.object_id(),
            journal_handle.object_id(),
            checkpoint,
            LATEST_VERSION,
        );

        let _ = self.handle.set(journal_handle);
        Ok(())
    }
1290
1291 pub async fn allocate_journal(&self) -> Result<(), Error> {
1294 let handle = self.handle.get().unwrap();
1295 let filesystem = handle.store().filesystem();
1296 let mut transaction = filesystem
1297 .clone()
1298 .new_transaction(
1299 lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1300 Options { skip_journal_checks: true, ..Default::default() },
1301 )
1302 .await?;
1303 let mut file_range = 0..self.chunk_size();
1304 self.handle
1305 .get()
1306 .unwrap()
1307 .preallocate_range(&mut transaction, &mut file_range)
1308 .await
1309 .context("preallocate journal")?;
1310 if file_range.start < file_range.end {
1311 bail!("preallocate_range returned too little space");
1312 }
1313 transaction.commit().await?;
1314 Ok(())
1315 }
1316
1317 pub async fn init_superblocks(&self) -> Result<(), Error> {
1318 for _ in 0..2 {
1320 self.write_super_block().await?;
1321 }
1322 Ok(())
1323 }
1324
1325 pub async fn read_transactions_for_object(
1331 &self,
1332 object_id: u64,
1333 ) -> Result<Vec<JournaledTransaction>, Error> {
1334 let handle = self.handle.get().expect("No journal handle");
1335 let handle = ObjectStore::open_object(
1337 handle.owner(),
1338 handle.object_id(),
1339 journal_handle_options(),
1340 None,
1341 )
1342 .await?;
1343
1344 let checkpoint = match self.objects.journal_checkpoint(object_id) {
1345 Some(checkpoint) => checkpoint,
1346 None => return Ok(vec![]),
1347 };
1348 let mut reader = JournalReader::new(handle, &checkpoint);
1349 let end_offset = self.inner.lock().valid_to;
1352 Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1353 }
1354
1355 pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1357 if transaction.is_empty() {
1358 return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1359 }
1360
1361 self.pre_commit(transaction).await?;
1362 Ok(self.write_and_apply_mutations(transaction))
1363 }
1364
    /// Emits pending journal bookkeeping records. If needed, it also commits a nested
    /// transaction that extends or zeroes the journal file before `transaction` is written.
    ///
    /// Pending records, all serialized while holding the `inner` lock:
    /// - the current version, if `output_reset_version` was set;
    /// - a `Discard` record, if `discard_offset` is set;
    /// - a `DidFlushDevice` record, if `needs_did_flush_device` is set.
    ///
    /// If the journal handle is not yet set, the function returns after writing those records.
    /// Otherwise a nested transaction is committed when any of the following holds:
    /// - fewer than `chunk_size()` bytes remain before the current end of the journal file, in
    ///   which case the next chunk is preallocated;
    /// - `zero_offset` is set, in which case `0..zero_offset` of the journal is zeroed;
    /// - `objects.needs_borrow_for_journal(file_offset)` returns true.
    ///
    /// The nested transaction skips journal checks, borrows metadata space, uses the metadata
    /// reservation, and shares `transaction`'s txn guard.
    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
        let handle;

        let (size, zero_offset) = {
            let mut inner = self.inner.lock();

            // `take` clears the flag, so the version record is emitted only once per reset.
            if std::mem::take(&mut inner.output_reset_version) {
                LATEST_VERSION.serialize_into(&mut inner.writer)?;
            }

            if let Some(discard_offset) = inner.discard_offset {
                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
                inner.discard_offset = None;
            }

            // Set by `flush_device` after a device flush completes.
            if inner.needs_did_flush_device {
                let offset = inner.device_flushed_offset;
                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
                inner.needs_did_flush_device = false;
            }

            // Without a journal handle there is no file to extend or zero.
            handle = match self.handle.get() {
                None => return Ok(()),
                Some(x) => x,
            };

            let file_offset = inner.writer.journal_file_checkpoint().file_offset;

            // `size` is `Some(current file size)` only when less than a chunk remains, i.e. when
            // the journal must be extended.
            let size = handle.get_size();
            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };

            if size.is_none()
                && inner.zero_offset.is_none()
                && !self.objects.needs_borrow_for_journal(file_offset)
            {
                return Ok(());
            }

            (size, inner.zero_offset)
        };

        // The `inner` guard went out of scope above, before any of the awaits below.
        let mut transaction = handle
            .new_transaction_with_options(Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.objects.metadata_reservation()),
                txn_guard: Some(transaction.txn_guard()),
                ..Default::default()
            })
            .await?;
        if let Some(size) = size {
            handle
                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
                .await?;
        }
        if let Some(zero_offset) = zero_offset {
            handle.zero(&mut transaction, 0..zero_offset).await?;
        }

        // Written and applied directly: going through `commit` would re-enter `pre_commit`.
        self.write_and_apply_mutations(&mut transaction);

        let mut inner = self.inner.lock();

        // The extension was triggered while less than a chunk remained. The writer must still be
        // inside the old file size after writing the nested transaction.
        if let Some(size) = size {
            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
        }

        // Only clear `zero_offset` if it was not replaced while the lock was released
        // (`write_super_block` sets a new value).
        if inner.zero_offset == zero_offset {
            inner.zero_offset = None;
        }

        Ok(())
    }
1445
1446 fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1450 let super_block_header = &self.inner.lock().super_block_header;
1451 let offset = super_block_header
1452 .journal_file_offsets
1453 .get(&object_id)
1454 .cloned()
1455 .unwrap_or(super_block_header.super_block_journal_file_offset);
1456 journal_file_checkpoint.file_offset >= offset
1457 }
1458
    /// Writes a new super block that reflects the current journal state.
    ///
    /// 1. While holding `sync_mutex` (and `writer_mutex` for the first two steps):
    ///    - pad the journal to a block boundary;
    ///    - collect the root parent store's layers via `super_block::compact_root_parent`;
    ///    - wait in `flush_device` for the journal to be written up to the padded checkpoint.
    /// 2. Derive a new header from the current one:
    ///    - bump `generation`;
    ///    - record the padded checkpoint as `super_block_journal_file_offset`;
    ///    - set `journal_checkpoint` to the minimum per-object checkpoint, or to the padded
    ///      checkpoint if there is none.
    /// 3. Save the header, along with the collected layers, through the super block manager.
    /// 4. Install the new header in memory and set `zero_offset` to the previous header's
    ///    checkpoint rounded down to a block.
    ///
    /// # Errors
    ///
    /// Fails if padding, layer collection, the flush, or the save fails. Fails with
    /// `FxfsError::Inconsistent` if `generation` would overflow.
    async fn write_super_block(&self) -> Result<(), Error> {
        let root_parent_store = self.objects.root_parent_store();

        let old_layers;
        let old_super_block_offset;
        let mut new_super_block_header;
        let checkpoint;
        let borrowed;

        {
            // Shared with `sync`, so super block writes and syncs are serialized.
            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
            {
                // Padding and layer collection happen with journal writers excluded.
                let _write_guard = self.writer_mutex.lock();
                (checkpoint, borrowed) = self.pad_to_block()?;
                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
            }
            // The writer lock is released before awaiting the flush.
            self.flush_device(checkpoint.file_offset)
                .await
                .context("flush failed when writing superblock")?;
        }

        new_super_block_header = self.inner.lock().super_block_header.clone();

        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;

        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();

        new_super_block_header.generation =
            new_super_block_header.generation.checked_add(1).ok_or(FxfsError::Inconsistent)?;
        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
        new_super_block_header.journal_file_offsets = journal_file_offsets;
        new_super_block_header.borrowed_metadata_space = borrowed;

        self.super_block_manager
            .save(
                new_super_block_header.clone(),
                self.objects.root_parent_store().filesystem(),
                old_layers,
            )
            .await?;
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = new_super_block_header;
            // The next `pre_commit` zeroes `0..zero_offset` of the journal. Presumably the data
            // before the old checkpoint is no longer needed once the new super block is saved.
            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
        }

        Ok(())
    }
1515
1516 pub async fn sync(
1523 &self,
1524 options: SyncOptions<'_>,
1525 ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1526 let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1527
1528 let (checkpoint, borrowed) = {
1529 if let Some(precondition) = options.precondition {
1530 if !precondition() {
1531 return Ok(None);
1532 }
1533 }
1534
1535 let _guard = self.writer_mutex.lock();
1538
1539 self.pad_to_block()?
1540 };
1541
1542 if options.flush_device {
1543 self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1544 }
1545
1546 Ok(Some((checkpoint, borrowed)))
1547 }
1548
1549 fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1553 let mut inner = self.inner.lock();
1554 let checkpoint = inner.writer.journal_file_checkpoint();
1555 if checkpoint.file_offset % BLOCK_SIZE != 0 {
1556 JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1557 inner.writer.pad_to_block()?;
1558 if let Some(waker) = inner.flush_waker.take() {
1559 waker.wake();
1560 }
1561 }
1562 Ok((checkpoint, self.objects.borrowed_metadata_space()))
1563 }
1564
    /// Waits until the journal has been written up to `checkpoint_offset`, then flushes the
    /// device if it has not already been flushed that far.
    ///
    /// After a device flush:
    /// - `device_flushed_offset` is set to `checkpoint_offset`;
    /// - `needs_did_flush_device` is set, so the next `pre_commit` journals a `DidFlushDevice`
    ///   record;
    /// - the allocator is notified through `did_flush_device`.
    ///
    /// NOTE: both callers visible here (`sync` and `write_super_block`) hold `sync_mutex`,
    /// which keeps `device_flushed_offset` updates from racing.
    ///
    /// # Errors
    ///
    /// Returns `FxfsError::JournalFlushError` (with the termination reason as context) if the
    /// journal terminates before reaching `checkpoint_offset`. Also propagates any error from
    /// the device flush.
    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
        // `flush` wakes `sync_waker` after each write, so this is re-polled as
        // `flushed_offset` advances.
        debug_assert_not_too_long!(poll_fn(|ctx| {
            let mut inner = self.inner.lock();
            if inner.flushed_offset >= checkpoint_offset {
                Poll::Ready(Ok(()))
            } else if inner.terminate {
                let context = inner
                    .terminate_reason
                    .as_ref()
                    .map(|e| format!("Journal closed with error: {:?}", e))
                    .unwrap_or_else(|| "Journal closed".to_string());
                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
            } else {
                inner.sync_waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }))?;

        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
        if needs_flush {
            let trace = self.trace.load(Ordering::Relaxed);
            if trace {
                info!("J: start flush device");
            }
            self.handle.get().unwrap().flush_device().await?;
            if trace {
                info!("J: end flush device");
            }

            {
                let mut inner = self.inner.lock();
                inner.device_flushed_offset = checkpoint_offset;
                // Picked up by `pre_commit`, which writes the `DidFlushDevice` record.
                inner.needs_did_flush_device = true;
            }

            self.objects.allocator().did_flush_device(checkpoint_offset);
            if trace {
                info!("J: did flush device");
            }
        }

        Ok(())
    }
1617
1618 pub fn super_block_header(&self) -> SuperBlockHeader {
1620 self.inner.lock().super_block_header.clone()
1621 }
1622
    /// Waits until the journal has room for more transactions.
    ///
    /// Re-checks each time `reclaim_event` fires, and stops on the first of:
    /// - the journal is terminated: `Err(JournalFlushError)` with the termination reason as
    ///   context;
    /// - `objects.last_end_offset()` is less than `reclaim_size` past the super block's journal
    ///   checkpoint: `Ok(())`;
    /// - image-builder mode is active: `Ok(())`;
    /// - compactions are disabled: `Err(JournalFlushError)` ("Compactions disabled"). Presumably
    ///   this is because waiting would not free space without a compaction.
    pub async fn check_journal_space(&self) -> Result<(), Error> {
        loop {
            debug_assert_not_too_long!({
                let inner = self.inner.lock();
                if inner.terminate {
                    let context = inner
                        .terminate_reason
                        .as_ref()
                        .map(|e| format!("Journal closed with error: {:?}", e))
                        .unwrap_or_else(|| "Journal closed".to_string());
                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
                }
                if self.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset
                    < inner.reclaim_size
                {
                    break Ok(());
                }
                if inner.image_builder_mode.is_some() {
                    break Ok(());
                }
                if inner.disable_compactions {
                    break Err(
                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
                    );
                }
                // The listener is registered while `inner` is still locked. A notification that
                // follows a state change made under this lock therefore cannot slip between the
                // checks above and the wait.
                self.reclaim_event.listen()
            });
        }
    }
1656
1657 fn chunk_size(&self) -> u64 {
1658 CHUNK_SIZE
1659 }
1660
    /// Serializes `transaction` into the journal and applies it in memory.
    ///
    /// While holding `writer_mutex`:
    /// 1. Each mutation is written through `objects.write_mutation`. A barrier is requested if
    ///    the transaction includes a data write.
    /// 2. The transaction is applied with `objects.apply_transaction`. Any mutation it returns
    ///    is journaled with `object_id: 0`.
    /// 3. The transaction's data checksums are journaled, followed by a `Commit` record.
    ///
    /// Afterwards `objects.did_commit_transaction` is called and the flush task is woken.
    ///
    /// Returns the journal file offset at which the transaction's records begin.
    ///
    /// # Panics
    ///
    /// Panics if `apply_transaction` fails or if a record cannot be written.
    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
        let checkpoint_before;
        let checkpoint_after;
        {
            // Held across write and apply. Presumably this keeps journal order and apply order
            // identical across concurrent committers.
            let _guard = self.writer_mutex.lock();
            checkpoint_before = {
                let mut inner = self.inner.lock();
                // Consumed by `flush` to set `barrier_on_first_write`.
                if transaction.includes_write() {
                    inner.needs_barrier = true;
                }
                let checkpoint = inner.writer.journal_file_checkpoint();
                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
                    self.objects.write_mutation(
                        *object_id,
                        mutation,
                        Writer(*object_id, &mut inner.writer),
                    );
                }
                checkpoint
            };
            // `inner` is released here; `apply_transaction` runs without it.
            let maybe_mutation =
                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
                    "apply_transaction should not fail in live mode; \
                     filesystem will be in an inconsistent state",
                );
            checkpoint_after = {
                let mut inner = self.inner.lock();
                if let Some(mutation) = maybe_mutation {
                    inner
                        .writer
                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
                        .unwrap();
                }
                for (device_range, checksums, first_write) in
                    transaction.take_checksums().into_iter()
                {
                    inner
                        .writer
                        .write_record(&JournalRecord::DataChecksums(
                            device_range,
                            Checksums::fletcher(checksums),
                            first_write,
                        ))
                        .unwrap();
                }
                inner.writer.write_record(&JournalRecord::Commit).unwrap();

                inner.writer.journal_file_checkpoint()
            };
        }
        self.objects.did_commit_transaction(
            transaction,
            &checkpoint_before,
            checkpoint_after.file_offset,
        );

        // Let the flush task pick up the newly buffered bytes.
        if let Some(waker) = self.inner.lock().flush_waker.take() {
            waker.wake();
        }

        checkpoint_before.file_offset
    }
1723
    /// Background task that drives journal flushes and compactions until the journal
    /// terminates.
    ///
    /// Each time it is polled:
    /// - It starts a `flush` of all flushable bytes if none is running, no flush has failed, and
    ///   the journal handle is set.
    /// - It returns `Ready` once the journal is terminated and neither a flush nor a compaction
    ///   is in flight.
    /// - It starts a `compact` when all of these hold: none is running, the journal is not
    ///   terminated, compactions are enabled, image-builder mode is off, and the journal extends
    ///   more than `reclaim_size / 2` past the super block's checkpoint.
    /// - It registers itself as `flush_waker`, then polls the running futures.
    ///
    /// A flush error terminates the journal and stops further flushes. A compaction error also
    /// terminates the journal. `reclaim_event` listeners are notified when a flush fails or a
    /// compaction finishes.
    pub async fn flush_task(self: Arc<Self>) {
        let mut flush_fut = None;
        let mut compact_fut = None;
        let mut flush_error = false;
        poll_fn(|ctx| {
            loop {
                {
                    let mut inner = self.inner.lock();
                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
                        let flushable = inner.writer.flushable_bytes();
                        if flushable > 0 {
                            flush_fut = Some(self.flush(flushable).boxed());
                        }
                    }
                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
                        return Poll::Ready(());
                    }
                    if compact_fut.is_none()
                        && !inner.terminate
                        && !inner.disable_compactions
                        && inner.image_builder_mode.is_none()
                        && self.objects.last_end_offset()
                            - inner.super_block_header.journal_checkpoint.file_offset
                            > inner.reclaim_size / 2
                    {
                        compact_fut = Some(self.compact().boxed());
                        // Observed by `stop_compactions`.
                        inner.compaction_running = true;
                    }
                    // Commits and padding wake this waker when new bytes become flushable.
                    inner.flush_waker = Some(ctx.waker().clone());
                }
                let mut pending = true;
                if let Some(fut) = flush_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        if let Err(e) = result {
                            self.inner.lock().terminate(Some(e.context("Flush error")));
                            self.reclaim_event.notify(usize::MAX);
                            flush_error = true;
                        }
                        flush_fut = None;
                        pending = false;
                    }
                }
                if let Some(fut) = compact_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        let mut inner = self.inner.lock();
                        if let Err(e) = result {
                            inner.terminate(Some(e.context("Compaction error")));
                        }
                        compact_fut = None;
                        inner.compaction_running = false;
                        self.reclaim_event.notify(usize::MAX);
                        pending = false;
                    }
                }
                // If something completed, loop again so new work can be started immediately.
                if pending {
                    return Poll::Pending;
                }
            }
        })
        .await;
    }
1791
1792 async fn flush(&self, amount: usize) -> Result<(), Error> {
1793 let handle = self.handle.get().unwrap();
1794 let mut buf = handle.allocate_buffer(amount).await;
1795 let (offset, len, barrier_on_first_write) = {
1796 let mut inner = self.inner.lock();
1797 let offset = inner.writer.take_flushable(buf.as_mut());
1798 let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1799 inner.needs_barrier = false;
1802 (offset, buf.len() as u64, barrier_on_first_write)
1803 };
1804 self.handle
1805 .get()
1806 .unwrap()
1807 .overwrite(
1808 offset,
1809 buf.as_mut(),
1810 OverwriteOptions { barrier_on_first_write, ..Default::default() },
1811 )
1812 .await?;
1813
1814 let mut inner = self.inner.lock();
1815 if let Some(waker) = inner.sync_waker.take() {
1816 waker.wake();
1817 }
1818 inner.flushed_offset = offset + len;
1819 inner.valid_to = inner.flushed_offset;
1820 Ok(())
1821 }
1822
1823 #[trace]
1826 pub async fn compact(&self) -> Result<(), Error> {
1827 let trace = self.trace.load(Ordering::Relaxed);
1828 debug!("Compaction starting");
1829 if trace {
1830 info!("J: start compaction");
1831 }
1832 let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1833 self.inner.lock().super_block_header.earliest_version = earliest_version;
1834 self.write_super_block().await.context("Failed to write superblock")?;
1835 if trace {
1836 info!("J: end compaction");
1837 }
1838 debug!("Compaction finished");
1839 Ok(())
1840 }
1841
    /// Disables future compactions and waits for any compaction already in flight to finish.
    ///
    /// Afterwards, `flush_task` no longer starts compactions, and `check_journal_space` returns
    /// an error instead of waiting when the journal is full.
    pub async fn stop_compactions(&self) {
        loop {
            debug_assert_not_too_long!({
                let mut inner = self.inner.lock();
                inner.disable_compactions = true;
                if !inner.compaction_running {
                    return;
                }
                // `flush_task` clears `compaction_running` and notifies `reclaim_event` under
                // this lock when a compaction finishes.
                self.reclaim_event.listen()
            });
        }
    }
1854
1855 pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1858 let this = Arc::downgrade(self);
1859 parent.record_lazy_child(name, move || {
1860 let this_clone = this.clone();
1861 async move {
1862 let inspector = fuchsia_inspect::Inspector::default();
1863 if let Some(this) = this_clone.upgrade() {
1864 let (journal_min, journal_max, journal_reclaim_size) = {
1865 let inner = this.inner.lock();
1867 (
1868 round_down(
1869 inner.super_block_header.journal_checkpoint.file_offset,
1870 BLOCK_SIZE,
1871 ),
1872 inner.flushed_offset,
1873 inner.reclaim_size,
1874 )
1875 };
1876 let root = inspector.root();
1877 root.record_uint("journal_min_offset", journal_min);
1878 root.record_uint("journal_max_offset", journal_max);
1879 root.record_uint("journal_size", journal_max - journal_min);
1880 root.record_uint("journal_reclaim_size", journal_reclaim_size);
1881
1882 if let Some(x) = round_div(
1884 100 * (journal_max - journal_min),
1885 this.objects.allocator().get_disk_bytes(),
1886 ) {
1887 root.record_uint("journal_size_to_disk_size_percent", x);
1888 }
1889 }
1890 Ok(inspector)
1891 }
1892 .boxed()
1893 });
1894 }
1895
1896 pub fn terminate(&self) {
1898 self.inner.lock().terminate(None);
1899 self.reclaim_event.notify(usize::MAX);
1900 }
1901}
1902
1903pub struct Writer<'a>(u64, &'a mut JournalWriter);
1905
1906impl Writer<'_> {
1907 pub fn write(&mut self, mutation: Mutation) {
1908 self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
1909 }
1910}
1911
#[cfg(test)]
mod tests {
    use crate::filesystem::{FxFilesystem, SyncOptions};
    use crate::fsck::fsck;
    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
    use crate::object_store::directory::Directory;
    use crate::object_store::transaction::Options;
    use crate::object_store::volume::root_volume;
    use crate::object_store::{
        HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
    };
    use fuchsia_async as fasync;
    use fuchsia_async::MonotonicDuration;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    /// Block size of the fake devices used by `test_replay` and `test_reset`.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    /// Writes a small file and syncs, then remounts. Checks that the data reads back and that
    /// fsck passes.
    #[fuchsia::test]
    async fn test_replay() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        let object_id = {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");

            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            handle.object_id()
        };

        {
            fs.close().await.expect("Close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            let fs = FxFilesystem::open(device).await.expect("open failed");
            let handle = ObjectStore::open_object(
                &fs.root_store(),
                object_id,
                HandleOptions::default(),
                None,
            )
            .await
            .expect("open_object failed");
            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            fsck(fs.clone()).await.expect("fsck failed");
            fs.close().await.expect("Close failed");
        }
    }

    /// Runs three mount cycles against the same device and verifies data and fsck after each
    /// remount:
    /// 1. Create one synced file, then 1000 more files.
    /// 2. Remount, check the first file, and add a new synced file.
    /// 3. Remount and check the first and the newest files.
    #[fuchsia::test]
    async fn test_reset() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));

        let mut object_ids = Vec::new();

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());

            // Generate a large amount of journal traffic after the sync.
            for i in 0..1000 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let handle = root_directory
                    .create_child_file(&mut transaction, &i.to_string())
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                object_ids.push(handle.object_id());
            }
        }
        fs.close().await.expect("fs close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        {
            let root_store = fs.root_store();
            // Only the first (synced) file is checked after the first remount.
            for &object_id in &object_ids[0..1] {
                let handle = ObjectStore::open_object(
                    &root_store,
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .expect("open_object failed");
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }

            // Write more data after the remount.
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test2")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        {
            fsck(fs.clone()).await.expect("fsck failed");

            // Check the first file and the file written after the first remount.
            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
                let handle = ObjectStore::open_object(
                    &fs.root_store(),
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .unwrap_or_else(|e| {
                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
                });
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }
        }
        fs.close().await.expect("close failed");
    }

    /// Grows the journal past one chunk and compacts, then disables compactions and grows it
    /// past two chunks. Finally snapshots the device without closing the filesystem. The
    /// snapshot must then mount, accept a new file, remount, and pass fsck.
    #[fuchsia::test]
    async fn test_discard() {
        let device = {
            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let store = root_volume
                .new_volume("test", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            // Commit until the journal offset passes one chunk.
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("a {i}"))
                    .await
                    .expect("create_child_file failed");
                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
                    break;
                }
                i += 1;
            }

            // Write a new super block, then keep the journal from being compacted again.
            fs.journal().compact().await.expect("compact failed");
            fs.journal().stop_compactions().await;

            // Commit until the journal offset passes two chunks.
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("b {i}"))
                    .await
                    .expect("create_child_file failed");
                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
                    break;
                }
                i += 1;
            }

            // NOTE(review): presumably gives the flush task time to write buffered journal data
            // before the snapshot is taken.
            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
            fs.device().snapshot().expect("snapshot failed")
        };

        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let store =
                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");

            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, "d")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }
}
2250
#[cfg(fuzz)]
mod fuzz {
    use fuzz::fuzz;

    /// Appends the raw `input` bytes to a fresh filesystem's journal writer, closes it, and
    /// tries to remount. A failed open is tolerated; a successful one is closed again.
    #[fuzz]
    fn fuzz_journal_bytes(input: Vec<u8>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use std::io::Write;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let fresh_device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(fresh_device).await.expect("new_empty failed");
            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
            fs.close().await.expect("close failed");

            let used_device = fs.take_device().await;
            used_device.reopen(false);
            if let Ok(remounted) = FxFilesystem::open(used_device).await {
                let _ = remounted.close().await;
            }
        });
    }

    /// Serializes each fuzzer-provided journal record into a fresh filesystem's journal
    /// (ignoring per-record write errors), closes it, and tries to remount. A failed open is
    /// tolerated.
    #[fuzz]
    fn fuzz_journal(input: Vec<super::JournalRecord>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let fresh_device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(fresh_device).await.expect("new_empty failed");
            {
                // Scoped so the journal lock is released before `close`.
                let mut inner = fs.journal().inner.lock();
                input.iter().for_each(|record| {
                    let _ = inner.writer.write_record(record);
                });
            }
            fs.close().await.expect("close failed");

            let used_device = fs.take_device().await;
            used_device.reopen(false);
            if let Ok(remounted) = FxFilesystem::open(used_device).await {
                let _ = remounted.close().await;
            }
        });
    }
}