1mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::errors::FxfsError;
27use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
28use crate::log::*;
29use crate::lsm_tree::cache::NullCache;
30use crate::lsm_tree::types::Layer;
31use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
32use crate::object_store::allocator::Allocator;
33use crate::object_store::data_object_handle::OverwriteOptions;
34use crate::object_store::extent_record::{
35 DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
36};
37use crate::object_store::graveyard::Graveyard;
38use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
39use crate::object_store::journal::checksum_list::ChecksumList;
40use crate::object_store::journal::reader::{JournalReader, ReadResult};
41use crate::object_store::journal::super_block::{
42 SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
43};
44use crate::object_store::journal::writer::JournalWriter;
45use crate::object_store::object_manager::ObjectManager;
46use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
47use crate::object_store::transaction::{
48 AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
49 MutationV47, MutationV49, MutationV50, ObjectStoreMutation, Options,
50 TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
51};
52use crate::object_store::{
53 AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
54 NewChildStoreOptions, ObjectStore, ReservedId,
55};
56use crate::range::RangeExt;
57use crate::round::{round_div, round_down};
58use crate::serialized_types::{LATEST_VERSION, Migrate, Version, Versioned, migrate_to_version};
59use anyhow::{Context, Error, anyhow, bail, ensure};
60use event_listener::Event;
61use fprint::TypeFingerprint;
62use fuchsia_inspect::NumericProperty;
63use fuchsia_sync::Mutex;
64use futures::FutureExt as _;
65use futures::future::poll_fn;
66use rustc_hash::FxHashMap as HashMap;
67use serde::{Deserialize, Serialize};
68use static_assertions::const_assert;
69use std::clone::Clone;
70use std::collections::HashSet;
71use std::num::NonZero;
72use std::ops::{Bound, Range};
73use std::sync::atomic::{AtomicBool, Ordering};
74use std::sync::{Arc, OnceLock};
75use std::task::{Poll, Waker};
76use storage_device::Device;
77
/// The journal is written in blocks of this size.
pub const BLOCK_SIZE: u64 = 4096;

/// Granularity at which the journal file grows -- presumably used by `chunk_size()`
/// (defined outside this excerpt); `init_empty` preallocates one chunk.
const CHUNK_SIZE: u64 = 131_072;
// A single transaction must always fit within one chunk.
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

/// Default for `JournalOptions::reclaim_size`: journal size at which reclaim is triggered.
pub const DEFAULT_RECLAIM_SIZE: u64 = 524_288;

/// Space reserved for the journal -- NOTE(review): consumed outside this excerpt; confirm
/// exact semantics against the allocator/reservation paths.
pub const RESERVED_SPACE: u64 = 1_048_576;

/// XOR mask applied to the running checksum when the journal stream is reset (see `replay`,
/// which applies it to the writer checkpoint after recovery).
const RESET_XOR: u64 = 0xffffffffffffffff;
97
pub type JournalCheckpoint = JournalCheckpointV32;

/// A position within the journal stream from which reading (or writing) can resume.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    /// Byte offset within the journal file.
    pub file_offset: u64,

    /// The running checksum at this position -- presumably the seed used to validate the
    /// next block; verify against `JournalReader`/`JournalWriter`.
    pub checksum: Checksum,

    /// Serialization version in effect at this position in the stream.
    pub version: Version,
}
116
pub type JournalRecord = JournalRecordV50;

/// A single record in the journal stream (latest wire format).
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV50 {
    /// Marks the end of a journal block; the reader skips to the next block boundary.
    EndBlock,
    /// A mutation to be applied to the store identified by `object_id`.
    Mutation { object_id: u64, mutation: MutationV50 },
    /// Commits the records accumulated since the previous Commit/reset as one transaction.
    Commit,
    /// Discards journaled transactions whose checkpoint is at or beyond the given offset.
    Discard(u64),
    /// The device was flushed up to the given journal offset.
    DidFlushDevice(u64),
    /// Checksums for a device range written by this transaction; the bool indicates whether
    /// this was the first write to that range (see `JournaledChecksums::first_write`).
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
151
/// Previous wire format; migrated to [`JournalRecordV50`]. Variant meanings are unchanged.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
163
/// Previous wire format; migrated to [`JournalRecordV49`]. Variant meanings are unchanged.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
175
/// Previous wire format; migrated to [`JournalRecordV47`]. Variant meanings are unchanged.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
187
/// Previous wire format; migrated to [`JournalRecordV46`]. Variant meanings are unchanged.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
199
/// Previous wire format; migrated to [`JournalRecordV43`]. Note this version carries a
/// `MutationV41` (the mutation format did not change between records V41 and V42).
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
210
/// Previous wire format. `DataChecksums` lacks the `first_write` flag added in V42, so the
/// migration is hand-written (see the `From` impl below) rather than derived.
#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
220
221impl From<JournalRecordV41> for JournalRecordV42 {
222 fn from(record: JournalRecordV41) -> Self {
223 match record {
224 JournalRecordV41::EndBlock => Self::EndBlock,
225 JournalRecordV41::Mutation { object_id, mutation } => {
226 Self::Mutation { object_id, mutation: mutation.into() }
227 }
228 JournalRecordV41::Commit => Self::Commit,
229 JournalRecordV41::Discard(offset) => Self::Discard(offset),
230 JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
231 JournalRecordV41::DataChecksums(range, sums) => {
232 Self::DataChecksums(range, sums, true)
235 }
236 }
237 }
238}
239
/// Oldest wire format in this file; migrated to [`JournalRecordV41`].
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
250
251pub(super) fn journal_handle_options() -> HandleOptions {
252 HandleOptions { skip_journal_checks: true, ..Default::default() }
253}
254
/// The journal: mutations are serialized here before being applied, and on mount `replay`
/// re-applies any journaled transactions that postdate the most recent super-block.
pub struct Journal {
    // Registry of stores and the allocator that journaled mutations are applied to.
    objects: Arc<ObjectManager>,
    // Handle to the journal file; set once, after `replay` or `init_empty`.
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    // Reads/writes the A/B super-block instances.
    super_block_manager: SuperBlockManager,
    // Mutable journal state; see `Inner`.
    inner: Mutex<Inner>,
    // Serializes writers -- NOTE(review): consumed outside this excerpt.
    writer_mutex: Mutex<()>,
    // Serializes sync callers (async mutex: held across await points).
    sync_mutex: futures::lock::Mutex<()>,
    // Enables verbose trace logging; see `set_trace`.
    trace: AtomicBool,

    // Notified around journal reclaim -- NOTE(review): consumed outside this excerpt.
    reclaim_event: Event,
}
272
/// Mutable journal state, guarded by `Journal::inner`.
struct Inner {
    // Snapshot of the most recently loaded/written super-block header.
    super_block_header: SuperBlockHeader,

    // If set, the journal file should be zeroed from this offset -- TODO(review): confirm;
    // the field is not consumed within this excerpt.
    zero_offset: Option<u64>,

    // Journal offset up to which the device is known to have been flushed (advanced by
    // `DidFlushDevice` records and stream resets during replay).
    device_flushed_offset: u64,

    // Whether a `DidFlushDevice` record still needs to be written -- presumably set by the
    // flush path (outside this excerpt); verify there.
    needs_did_flush_device: bool,

    // Serializes and checksums journal records into blocks.
    writer: JournalWriter,

    // Set after replay resets the stream so the writer emits the latest version -- see
    // `replay`, which sets this alongside seeking the writer.
    output_reset_version: bool,

    // Waker for the flush task; woken on termination.
    flush_waker: Option<Waker>,

    // True once the journal has been terminated; waiters should bail out.
    terminate: bool,

    // The first error that caused termination, if any (later errors are only logged).
    terminate_reason: Option<Error>,

    // Compaction control flags -- consumed outside this excerpt.
    disable_compactions: bool,

    compaction_running: bool,

    // Waker for a task waiting in sync; woken on termination.
    sync_waker: Option<Waker>,

    // Journal offset up to which data has been persisted (set from the reader position at
    // the end of replay).
    flushed_offset: u64,

    // Journal offset up to which records are known valid (checksum-verified).
    valid_to: u64,

    // If set, replay found journal content past the last valid transaction; a Discard for
    // this offset should be emitted.
    discard_offset: Option<u64>,

    // Journal size at which reclaim should be triggered (from `JournalOptions`).
    reclaim_size: u64,

    // When set, the filesystem is being built offline; selects the super-block instance to
    // write next (see `set_image_builder_mode`).
    image_builder_mode: Option<SuperBlockInstance>,

    // Whether device barriers are used (from `JournalOptions`).
    barriers_enabled: bool,

    // Whether a barrier is needed before the next write -- TODO(review): confirm against the
    // write path, which is outside this excerpt.
    needs_barrier: bool,

    // Whether a compaction was explicitly requested -- consumed outside this excerpt.
    forced_compaction: bool,
}
343
impl Inner {
    /// Marks the journal as terminated and wakes any tasks waiting on flush or sync.
    ///
    /// Only the *first* termination reason is retained so the original cause is preserved;
    /// subsequent reasons are logged and dropped.
    fn terminate(&mut self, reason: Option<Error>) {
        self.terminate = true;

        if let Some(err) = reason {
            error!(error:? = err; "Terminating journal");
            // Keep the first reason; just log later ones.
            if let Some(prev_err) = self.terminate_reason.as_ref() {
                error!(error:? = prev_err; "Journal previously terminated");
            } else {
                self.terminate_reason = Some(err);
            }
        }

        // Wake waiters so they can observe `terminate` and bail out.
        if let Some(waker) = self.flush_waker.take() {
            waker.wake();
        }
        if let Some(waker) = self.sync_waker.take() {
            waker.wake();
        }
    }
}
366
/// Tunable parameters for the journal.
pub struct JournalOptions {
    /// Journal size (bytes) at which reclaim/compaction should be triggered; defaults to
    /// [`DEFAULT_RECLAIM_SIZE`].
    pub reclaim_size: u64,

    /// Whether to use device barriers -- TODO(review): exact semantics live in the write
    /// path, outside this excerpt. Defaults to `false`.
    pub barriers_enabled: bool,
}
378
379impl Default for JournalOptions {
380 fn default() -> Self {
381 JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
382 }
383}
384
/// Result of `read_transactions`: the decoded transactions plus the furthest journal offset
/// at which the device is known to have been flushed.
struct JournaledTransactions {
    transactions: Vec<JournaledTransaction>,
    device_flushed_offset: u64,
}
389
/// One committed transaction decoded from the journal, with its mutations bucketed by which
/// store they apply to (the buckets are replayed in separate passes; see `replay`).
#[derive(Debug, Default)]
pub struct JournaledTransaction {
    /// Checkpoint at the start of the transaction's first record.
    pub checkpoint: JournalCheckpoint,
    /// Mutations for the root parent store.
    pub root_parent_mutations: Vec<Mutation>,
    /// Mutations for the root store.
    pub root_mutations: Vec<Mutation>,
    /// Mutations for all other stores, tagged with their store object id.
    pub non_root_mutations: Vec<(u64, Mutation)>,
    /// Journal offset just after this transaction's Commit record.
    pub end_offset: u64,
    /// Data checksums recorded by this transaction.
    pub checksums: Vec<JournaledChecksums>,

    /// If this transaction ended a flush: (store object id, journal offset just past the
    /// matching BeginFlush).  At most one per transaction.
    pub end_flush: Option<(u64, u64)>,

    /// If this transaction deleted a volume, its store object id.  At most one per
    /// transaction.
    pub volume_deleted: Option<u64>,
}
407
408impl JournaledTransaction {
409 fn new(checkpoint: JournalCheckpoint) -> Self {
410 Self { checkpoint, ..Default::default() }
411 }
412}
413
/// Sentinel journal offset stored in the per-store offset map to mark a store as deleted
/// (see `replay`, which inserts it for `volume_deleted` transactions).
const VOLUME_DELETED: u64 = u64::MAX;
415
/// Checksums recorded in the journal for a device range written by a transaction.
#[derive(Debug)]
pub struct JournaledChecksums {
    /// The device byte range the checksums cover.
    pub device_range: Range<u64>,
    /// The checksums themselves.
    pub checksums: Checksums,
    /// Whether this was the first write to the range (older journal versions did not record
    /// this and migrate to `true`; see `From<JournalRecordV41>`).
    pub first_write: bool,
}
422
/// Abstraction over handles the journal reads through; implemented both by the bootstrap
/// handle used during replay (which tracks extents explicitly) and by ordinary
/// `DataObjectHandle`s (which do not).
pub trait JournalHandle: ReadObjectHandle {
    /// End offset of the last extent the handle knows about, or `None` if the handle does
    /// not track extents itself.
    fn end_offset(&self) -> Option<u64>;
    /// Registers a device extent backing the journal file.  `added_offset` is the journal
    /// offset at which the extent was added -- presumably so it can be discarded again by
    /// `discard_extents`; confirm against `BootstrapObjectHandle`.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Discards extents previously pushed at journal offsets >= `discard_offset` --
    /// TODO(review): confirm inclusive/exclusive bound against `BootstrapObjectHandle`.
    fn discard_extents(&mut self, discard_offset: u64);
}
437
// `DataObjectHandle` resolves extents through the object store, so the extent-tracking
// methods are no-ops and `end_offset` reports "not tracked".
impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
    fn end_offset(&self) -> Option<u64> {
        None
    }
    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
        // Nothing to do: extents are maintained by the object store.
    }
    fn discard_extents(&mut self, _discard_offset: u64) {
        // Nothing to do: extents are maintained by the object store.
    }
}
452
453#[fxfs_trace::trace]
454impl Journal {
455 pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
456 let starting_checksum = rand::random_range(1..u64::MAX);
457 Journal {
458 objects: objects,
459 handle: OnceLock::new(),
460 super_block_manager: SuperBlockManager::new(),
461 inner: Mutex::new(Inner {
462 super_block_header: SuperBlockHeader::default(),
463 zero_offset: None,
464 device_flushed_offset: 0,
465 needs_did_flush_device: false,
466 writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
467 output_reset_version: false,
468 flush_waker: None,
469 terminate: false,
470 terminate_reason: None,
471 disable_compactions: false,
472 compaction_running: false,
473 sync_waker: None,
474 flushed_offset: 0,
475 valid_to: 0,
476 discard_offset: None,
477 reclaim_size: options.reclaim_size,
478 image_builder_mode: None,
479 barriers_enabled: options.barriers_enabled,
480 needs_barrier: false,
481 forced_compaction: false,
482 }),
483 writer_mutex: Mutex::new(()),
484 sync_mutex: futures::lock::Mutex::new(()),
485 trace: AtomicBool::new(false),
486 reclaim_event: Event::new(),
487 }
488 }
489
490 pub fn set_trace(&self, trace: bool) {
491 let old_value = self.trace.swap(trace, Ordering::Relaxed);
492 if trace != old_value {
493 info!(trace; "J: trace");
494 }
495 }
496
497 pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
498 self.inner.lock().image_builder_mode = mode;
499 if let Some(instance) = mode {
500 *self.super_block_manager.next_instance.lock() = instance;
501 }
502 }
503
504 pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
505 self.inner.lock().image_builder_mode
506 }
507
508 #[cfg(feature = "migration")]
509 pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
510 ensure!(
511 self.inner.lock().image_builder_mode.is_some(),
512 "Can only set filesystem uuid in image builder mode."
513 );
514 self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
515 Ok(())
516 }
517
    /// Loads the super-block (and the root parent store serialized with it) from `device`,
    /// delegating to the [`SuperBlockManager`].
    pub(crate) async fn read_superblocks(
        &self,
        device: Arc<dyn Device>,
        block_size: u64,
    ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
        self.super_block_manager.load(device, block_size).await
    }
525
    /// Sanity-checks a journaled mutation before replay.
    ///
    /// Journal contents are read from disk and may be corrupt, so structural invariants are
    /// verified here instead of being trusted.  Returns `false` if the mutation is invalid
    /// (replay stops at the first such mutation).
    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
        match mutation {
            // Extent records: the file range must be non-empty and block-aligned; COW
            // checksums (if present) must evenly divide the range into whole blocks; and the
            // device range must be block-aligned and lie entirely within the device.
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    Item {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(
                                        _,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                                ..
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
                        ..
                    },
                ..
            }) => {
                if range.is_empty() || !range.is_aligned(block_size) {
                    return false;
                }
                // Safe: non-empty, aligned range has a length.
                let len = range.length().unwrap();
                if let ExtentMode::Cow(checksums) = mode {
                    if checksums.len() > 0 {
                        // Each checksum must cover an equal, block-multiple slice.
                        if len % checksums.len() as u64 != 0 {
                            return false;
                        }
                        if (len / checksums.len() as u64) % block_size != 0 {
                            return false;
                        }
                    }
                }
                if *device_offset % block_size != 0
                    || *device_offset >= device_size
                    || device_size - *device_offset < len
                {
                    return false;
                }
            }
            // Other object-store mutations carry no invariants checked here.
            Mutation::ObjectStore(_) => {}
            Mutation::EncryptedObjectStore(_) => {}
            // Allocator mutations: ranges must be non-empty and on-device, and owners valid.
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::BeginFlush => {}
            Mutation::EndFlush => {}
            Mutation::DeleteVolume => {}
            Mutation::UpdateBorrowed(_) => {}
            Mutation::UpdateMutationsKey(_) => {}
            Mutation::CreateInternalDir(owner_object_id) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
        }
        true
    }
601
602 fn update_checksum_list(
604 &self,
605 journal_offset: u64,
606 mutation: &Mutation,
607 checksum_list: &mut ChecksumList,
608 ) -> Result<(), Error> {
609 match mutation {
610 Mutation::ObjectStore(_) => {}
611 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
612 checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
613 }
614 _ => {}
615 }
616 Ok(())
617 }
618
    /// Replays the journal on mount.
    ///
    /// Loads the newest super-block, reconstructs the journal file's extents from the root
    /// parent store, reads and validates all journaled transactions, then applies them in
    /// three passes (root parent store, root store, everything else) before positioning the
    /// writer just past the end of the valid stream.
    #[trace]
    pub async fn replay(
        &self,
        filesystem: Arc<FxFilesystem>,
        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    ) -> Result<(), Error> {
        let block_size = filesystem.block_size();

        let (super_block, root_parent) =
            self.super_block_manager.load(filesystem.device(), block_size).await?;

        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

        self.objects.set_root_parent_store(root_parent.clone());
        let allocator =
            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
        // Test hook: lets callers observe the allocator before replay begins.
        if let Some(on_new_allocator) = on_new_allocator {
            on_new_allocator(allocator.clone());
        }
        self.objects.set_allocator(allocator.clone());
        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = super_block.clone();
        }

        let device = filesystem.device();

        // Build a bootstrap handle for the journal file by walking its extents directly out
        // of the (just deserialized) root parent store -- the journal can't be opened
        // normally until replay has finished.
        let mut handle;
        {
            let root_parent_layer = root_parent.tree().mutable_layer();
            let mut iter = root_parent_layer
                .seek(Bound::Included(&ObjectKey::attribute(
                    super_block.journal_object_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
                        super_block.journal_checkpoint.file_offset,
                        BLOCK_SIZE,
                    ))),
                )))
                .await
                .context("Failed to seek root parent store")?;
            // The first matching extent fixes the journal-file offset the handle starts at.
            let start_offset = if let Some(ItemRef {
                key:
                    ObjectKey {
                        data:
                            ObjectKeyData::Attribute(
                                DEFAULT_DATA_ATTRIBUTE_ID,
                                AttributeKey::Extent(ExtentKey { range }),
                            ),
                        ..
                    },
                ..
            }) = iter.get()
            {
                range.start
            } else {
                0
            };
            handle = BootstrapObjectHandle::new_with_start_offset(
                super_block.journal_object_id,
                device.clone(),
                start_offset,
            );
            // Collect contiguous extents for the journal object; stop at the first record
            // that isn't the next journal extent.
            while let Some(item) = iter.get() {
                if !match item.into() {
                    Some((
                        object_id,
                        DEFAULT_DATA_ATTRIBUTE_ID,
                        ExtentKey { range },
                        ExtentValue::Some { device_offset, .. },
                    )) if object_id == super_block.journal_object_id => {
                        // Extents must be contiguous in file space.
                        if let Some(end_offset) = handle.end_offset() {
                            if range.start != end_offset {
                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                    "Unexpected journal extent {:?}, expected start: {}",
                                    item, end_offset
                                )));
                            }
                        }
                        handle.push_extent(
                            0, // Journal offset not needed for replay extents.
                            *device_offset
                                ..*device_offset + range.length().context("Invalid extent")?,
                        );
                        true
                    }
                    _ => false,
                } {
                    break;
                }
                iter.advance().await.context("Failed to advance root parent store iterator")?;
            }
        }

        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
        let JournaledTransactions { mut transactions, device_flushed_offset } = self
            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
            .await
            .context("Reading transactions for replay")?;

        // Validate every mutation and gather data checksums; replay must stop at the first
        // bad mutation since the journal may be truncated/corrupt past that point.
        let mut checksum_list = ChecksumList::new(device_flushed_offset);
        let mut valid_to = reader.journal_file_checkpoint().file_offset;
        let device_size = device.size();
        'bad_replay: for JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            root_mutations,
            non_root_mutations,
            checksums,
            ..
        } in &transactions
        {
            for JournaledChecksums { device_range, checksums, first_write } in checksums {
                checksum_list
                    .push(
                        checkpoint.file_offset,
                        device_range.clone(),
                        checksums.maybe_as_ref().context("Malformed checksums")?,
                        *first_write,
                    )
                    .context("Pushing journal checksum records to checksum list")?;
            }
            for mutation in root_parent_mutations
                .iter()
                .chain(root_mutations)
                .chain(non_root_mutations.iter().map(|(_, m)| m))
            {
                if !self.validate_mutation(mutation, block_size, device_size) {
                    info!(mutation:?; "Stopping replay at bad mutation");
                    valid_to = checkpoint.file_offset;
                    break 'bad_replay;
                }
                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
            }
        }

        // Verify data checksums against the device; this can shrink `valid_to` further if a
        // journaled write never made it to disk.
        let valid_to = checksum_list
            .verify(device.as_ref(), valid_to)
            .await
            .context("Failed to validate checksums")?;

        // Pass 1: root parent store mutations, also accumulating per-store flush offsets.
        let mut last_checkpoint = reader.journal_file_checkpoint();
        let mut journal_offsets = super_block.journal_file_offsets.clone();

        for (
            index,
            JournaledTransaction {
                checkpoint,
                root_parent_mutations,
                end_flush,
                volume_deleted,
                ..
            },
        ) in transactions.iter_mut().enumerate()
        {
            if checkpoint.file_offset >= valid_to {
                last_checkpoint = checkpoint.clone();

                // Drop this and all later (invalid) transactions.
                // NOTE(review): `truncate` here overlaps the `iter_mut` borrow of
                // `transactions` -- verify this compiles as written; if not, record the
                // index and truncate after the loop.
                transactions.truncate(index);
                break;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_parent_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_parent_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root parent store mutations")?;
            }

            if let Some((object_id, journal_offset)) = end_flush {
                journal_offsets.insert(*object_id, *journal_offset);
            }

            if let Some(object_id) = volume_deleted {
                journal_offsets.insert(*object_id, VOLUME_DELETED);
            }
        }

        // The root store lives in the root parent store, so it can only be opened after
        // pass 1 has brought the root parent store up to date.
        let root_store = ObjectStore::open(
            &root_parent,
            super_block.root_store_object_id,
            Box::new(NullCache {}),
        )
        .await
        .context("Unable to open root store")?;

        ensure!(
            !root_store.is_encrypted(),
            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
        );
        self.objects.set_root_store(root_store);

        let root_store_offset =
            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

        // Pass 2: root store mutations, skipping those already covered by a flush.
        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
            if checkpoint.file_offset < root_store_offset {
                continue;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root store mutations")?;
            }
        }

        allocator.open().await.context("Failed to open allocator")?;

        // Pass 3: everything else, filtered per store by the flush offsets gathered above.
        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
        {
            self.objects
                .replay_mutations(
                    non_root_mutations,
                    &journal_offsets,
                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                    end_offset,
                )
                .await
                .context("Failed to replay mutations")?;
        }

        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

        // If the stream extended past the last valid transaction, note where we discarded.
        let discarded_to =
            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
                Some(reader.journal_file_checkpoint().file_offset)
            } else {
                None
            };

        // Set up the writer to continue from just past the end of the (block-aligned)
        // replayed stream.
        {
            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                    "journal replay cut short; journal finishes at {}, but super-block was \
                     written at {}",
                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
                )));
            }
            // The journal file can now be opened normally.
            let handle = ObjectStore::open_object(
                &root_parent,
                super_block.journal_object_id,
                journal_handle_options(),
                None,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to open journal file (object id: {})",
                    super_block.journal_object_id
                )
            })?;
            let _ = self.handle.set(handle);
            let mut inner = self.inner.lock();
            reader.skip_to_end_of_block();
            let mut writer_checkpoint = reader.journal_file_checkpoint();

            std::mem::drop(reader);

            // Reset the stream: flip the checksum and bump to the latest version so old
            // blocks past this point cannot be mistaken for valid records.
            writer_checkpoint.checksum ^= RESET_XOR;
            writer_checkpoint.version = LATEST_VERSION;
            inner.flushed_offset = writer_checkpoint.file_offset;

            inner.device_flushed_offset = inner.flushed_offset;

            inner.writer.seek(writer_checkpoint);
            inner.output_reset_version = true;
            inner.valid_to = last_checkpoint.file_offset;
            if last_checkpoint.file_offset < inner.flushed_offset {
                inner.discard_offset = Some(last_checkpoint.file_offset);
            }
        }

        self.objects
            .on_replay_complete()
            .await
            .context("Failed to complete replay for object manager")?;

        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
        Ok(())
    }
930
    /// Reads transactions from `reader` until the stream ends (checksum mismatch) or
    /// `end_offset` is reached.
    ///
    /// Mutations are bucketed per transaction into root-parent-store, root-store, and
    /// other-store lists.  If `object_id_filter` is not `INVALID_OBJECT_ID`, only mutations
    /// for that store are collected.  Also tracks the furthest journal offset at which the
    /// device is known flushed (from resets and `DidFlushDevice` records).
    async fn read_transactions(
        &self,
        reader: &mut JournalReader,
        end_offset: Option<u64>,
        object_id_filter: u64,
    ) -> Result<JournaledTransactions, Error> {
        let mut transactions = Vec::new();
        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
            let super_block = &self.inner.lock().super_block_header;
            (
                super_block.super_block_journal_file_offset,
                super_block.root_parent_store_object_id,
                super_block.root_store_object_id,
            )
        };
        // Transaction being accumulated (a `&mut` into `transactions`); `None` between a
        // Commit/Reset and the next record.
        let mut current_transaction = None;
        // store object id -> journal offset of that store's most recent BeginFlush.
        let mut begin_flush_offsets = HashMap::default();
        let mut stores_deleted = HashSet::new();
        loop {
            // Capture the checkpoint *before* deserializing so it names the start of the
            // record about to be read.
            let checkpoint = reader.journal_file_checkpoint();
            if let Some(end_offset) = end_offset {
                if checkpoint.file_offset >= end_offset {
                    break;
                }
            }
            let result =
                reader.deserialize().await.context("Failed to deserialize journal record")?;
            match result {
                ReadResult::Reset(_) => {
                    // A reset discards any partially-accumulated transaction, and implies
                    // everything before it reached the device.
                    if current_transaction.is_some() {
                        current_transaction = None;
                        transactions.pop();
                    }
                    let offset = reader.journal_file_checkpoint().file_offset;
                    if offset > device_flushed_offset {
                        device_flushed_offset = offset;
                    }
                }
                ReadResult::Some(record) => {
                    match record {
                        JournalRecord::EndBlock => {
                            reader.skip_to_end_of_block();
                        }
                        JournalRecord::Mutation { object_id, mutation } => {
                            // First record since the last Commit/Reset starts a transaction.
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };

                            if stores_deleted.contains(&object_id) {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Encountered mutations for deleted store")
                                );
                            }

                            match &mutation {
                                Mutation::BeginFlush => {
                                    begin_flush_offsets.insert(
                                        object_id,
                                        current_transaction.checkpoint.file_offset,
                                    );
                                }
                                Mutation::EndFlush => {
                                    // Pair with the BeginFlush: the store is then flushed up
                                    // to just past that offset.  At most one EndFlush (and
                                    // no DeleteVolume for the same store) per transaction.
                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
                                        if let Some(deleted_volume) =
                                            &current_transaction.volume_deleted
                                        {
                                            if *deleted_volume == object_id {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    "Multiple EndFlush/DeleteVolume mutations in a \
                                                     single transaction for the same object"
                                                ));
                                            }
                                        }
                                        if current_transaction
                                            .end_flush
                                            .replace((object_id, offset + 1))
                                            .is_some()
                                        {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush mutations in a \
                                                 single transaction"
                                            ));
                                        }
                                    }
                                }
                                Mutation::DeleteVolume => {
                                    // Symmetric checks: at most one DeleteVolume, and not
                                    // combined with an EndFlush for the same store.
                                    if let Some((flushed_object, _)) =
                                        &current_transaction.end_flush
                                    {
                                        if *flushed_object == object_id {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush/DeleteVolume mutations in a \
                                                 single transaction for the same object"
                                            ));
                                        }
                                    }
                                    if current_transaction
                                        .volume_deleted
                                        .replace(object_id)
                                        .is_some()
                                    {
                                        bail!(anyhow!(FxfsError::Inconsistent).context(
                                            "Multiple DeleteVolume mutations in a single \
                                             transaction"
                                        ));
                                    }
                                    stores_deleted.insert(object_id);
                                }
                                _ => {}
                            }

                            // Bucket the mutation, unless filtered out or superseded by a
                            // later flush (`should_apply`).
                            if (object_id_filter == INVALID_OBJECT_ID
                                || object_id_filter == object_id)
                                && self.should_apply(object_id, &current_transaction.checkpoint)
                            {
                                if object_id == root_parent_store_object_id {
                                    current_transaction.root_parent_mutations.push(mutation);
                                } else if object_id == root_store_object_id {
                                    current_transaction.root_mutations.push(mutation);
                                } else {
                                    current_transaction
                                        .non_root_mutations
                                        .push((object_id, mutation));
                                }
                            }
                        }
                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
                            // Checksums can also open a transaction.
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };
                            current_transaction.checksums.push(JournaledChecksums {
                                device_range,
                                checksums,
                                first_write,
                            });
                        }
                        JournalRecord::Commit => {
                            if let Some(&mut JournaledTransaction {
                                ref checkpoint,
                                ref root_parent_mutations,
                                ref mut end_offset,
                                ..
                            }) = current_transaction.take()
                            {
                                // If this transaction extended the journal file itself,
                                // register the new extents with the bootstrap handle so the
                                // reader can keep following the stream.
                                for mutation in root_parent_mutations {
                                    if let Mutation::ObjectStore(ObjectStoreMutation {
                                        item:
                                            Item {
                                                key:
                                                    ObjectKey {
                                                        object_id,
                                                        data:
                                                            ObjectKeyData::Attribute(
                                                                DEFAULT_DATA_ATTRIBUTE_ID,
                                                                AttributeKey::Extent(ExtentKey {
                                                                    range,
                                                                }),
                                                            ),
                                                        ..
                                                    },
                                                value:
                                                    ObjectValue::Extent(ExtentValue::Some {
                                                        device_offset,
                                                        ..
                                                    }),
                                                ..
                                            },
                                        ..
                                    }) = mutation
                                    {
                                        let handle = reader.handle();
                                        if *object_id != handle.object_id() {
                                            continue;
                                        }
                                        // New extents must be contiguous in file space.
                                        if let Some(end_offset) = handle.end_offset() {
                                            if range.start != end_offset {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    format!(
                                                        "Unexpected journal extent {:?} -> {}, \
                                                         expected start: {}",
                                                        range, device_offset, end_offset,
                                                    )
                                                ));
                                            }
                                        }
                                        handle.push_extent(
                                            checkpoint.file_offset,
                                            *device_offset
                                                ..*device_offset
                                                    + range.length().context("Invalid extent")?,
                                        );
                                    }
                                }
                                *end_offset = reader.journal_file_checkpoint().file_offset;
                            }
                        }
                        JournalRecord::Discard(offset) => {
                            if offset == 0 {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Invalid offset for Discard")
                                );
                            }
                            if let Some(transaction) = current_transaction.as_ref() {
                                // An in-progress transaction that started before the discard
                                // point is unaffected; ignore this record.
                                if transaction.checkpoint.file_offset < offset {
                                    continue;
                                }
                            }
                            current_transaction = None;
                            // Drop buffered transactions at or after the discard offset.
                            while let Some(transaction) = transactions.last() {
                                if transaction.checkpoint.file_offset < offset {
                                    break;
                                }
                                transactions.pop();
                            }
                            reader.handle().discard_extents(offset);
                        }
                        JournalRecord::DidFlushDevice(offset) => {
                            if offset > device_flushed_offset {
                                device_flushed_offset = offset;
                            }
                        }
                    }
                }
                // End of the valid stream.
                ReadResult::ChecksumMismatch => break,
            }
        }

        // Drop any trailing transaction with no Commit record.
        if current_transaction.is_some() {
            transactions.pop();
        }

        Ok(JournaledTransactions { transactions, device_flushed_offset })
    }
1190
    /// Formats a fresh filesystem.
    ///
    /// Creates the root parent store, root store, allocator, both super-block files (at
    /// fixed object ids/extents; see `SuperBlockInstance`) and the journal file in a single
    /// transaction, then records the resulting layout in the in-memory super-block header.
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        // Well-known object ids for the initial stores; these are recorded in the
        // super-block header below.
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        // Checkpoint to store in the super-block: the current writer position, stamped with
        // the latest serialization version.
        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        // When building an image over an existing one, the new super-block must carry a
        // newer generation so it wins over any stale copy.
        let mut current_generation = 1;
        if filesystem.options().image_builder_mode.is_some() {
            let block_size = filesystem.block_size();
            match self.read_superblocks(filesystem.device(), block_size).await {
                Ok((super_block, _)) => {
                    log::info!(
                        "Found existing superblock with generation {}. Bumping by 1.",
                        super_block.generation
                    );
                    current_generation = super_block.generation.wrapping_add(1);
                }
                Err(_) => {
                    // No readable super-block; keep generation 1.
                }
            }
        }

        let root_parent = ObjectStore::new_empty(
            None,
            INIT_ROOT_PARENT_STORE_OBJECT_ID,
            filesystem.clone(),
            Box::new(NullCache {}),
        );
        self.objects.set_root_parent_store(root_parent.clone());

        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
        self.objects.set_allocator(allocator.clone());
        self.objects.init_metadata_reservation()?;

        let journal_handle;
        let super_block_a_handle;
        let super_block_b_handle;
        let root_store;
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        root_store = root_parent
            .new_child_store(
                &mut transaction,
                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
                Box::new(NullCache {}),
            )
            .await
            .context("new_child_store")?;
        self.objects.set_root_store(root_store.clone());

        allocator.create(&mut transaction).await?;

        // The super-block files live at fixed object ids and fixed first extents so they
        // can be located without any other metadata.
        super_block_a_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::A.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
        super_block_a_handle
            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
            .await
            .context("extend super block")?;
        super_block_b_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::B.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
        super_block_b_handle
            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
            .await
            .context("extend super block")?;

        journal_handle = ObjectStore::create_object(
            &root_parent,
            &mut transaction,
            journal_handle_options(),
            None,
        )
        .await
        .context("create journal")?;
        // In image-builder mode the journal is not written, so no space is preallocated.
        if self.inner.lock().image_builder_mode.is_none() {
            let mut file_range = 0..self.chunk_size();
            journal_handle
                .preallocate_range(&mut transaction, &mut file_range)
                .await
                .context("preallocate journal")?;
            // `preallocate_range` advances `file_range.start` past what it allocated.
            if file_range.start < file_range.end {
                bail!("preallocate_range returned too little space");
            }
        }

        root_store.create(&mut transaction).await?;

        root_parent.set_graveyard_directory_object_id(
            Graveyard::create(&mut transaction, &root_parent).await?,
        );

        transaction.commit().await?;

        // Record the layout just created; this is what gets written to the super-block.
        self.inner.lock().super_block_header = SuperBlockHeader::new(
            current_generation,
            root_parent.store_object_id(),
            root_parent.graveyard_directory_object_id(),
            root_store.store_object_id(),
            allocator.object_id(),
            journal_handle.object_id(),
            checkpoint,
            LATEST_VERSION,
        );

        let _ = self.handle.set(journal_handle);
        Ok(())
    }
1344
1345 pub async fn allocate_journal(&self) -> Result<(), Error> {
1348 let handle = self.handle.get().unwrap();
1349 let filesystem = handle.store().filesystem();
1350 let mut transaction = filesystem
1351 .clone()
1352 .new_transaction(
1353 lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1354 Options { skip_journal_checks: true, ..Default::default() },
1355 )
1356 .await?;
1357 let mut file_range = 0..self.chunk_size();
1358 self.handle
1359 .get()
1360 .unwrap()
1361 .preallocate_range(&mut transaction, &mut file_range)
1362 .await
1363 .context("preallocate journal")?;
1364 if file_range.start < file_range.end {
1365 bail!("preallocate_range returned too little space");
1366 }
1367 transaction.commit().await?;
1368 Ok(())
1369 }
1370
1371 pub async fn init_superblocks(&self) -> Result<(), Error> {
1372 for _ in 0..2 {
1374 self.write_super_block().await?;
1375 }
1376 Ok(())
1377 }
1378
    /// Reads the journaled transactions relevant to `object_id`, starting from
    /// that object's recorded journal checkpoint and stopping at `valid_to`
    /// (the last journal offset known to have been flushed).
    ///
    /// Returns an empty vector when the object has no journal checkpoint.
    pub async fn read_transactions_for_object(
        &self,
        object_id: u64,
    ) -> Result<Vec<JournaledTransaction>, Error> {
        let handle = self.handle.get().expect("No journal handle");
        // Open a fresh handle onto the journal object (shadowing the cached
        // one) so this read is independent of the live writer's handle.
        let handle = ObjectStore::open_object(
            handle.owner(),
            handle.object_id(),
            journal_handle_options(),
            None,
        )
        .await?;

        let checkpoint = match self.objects.journal_checkpoint(object_id) {
            Some(checkpoint) => checkpoint,
            None => return Ok(vec![]),
        };
        let mut reader = JournalReader::new(handle, &checkpoint);
        let end_offset = self.inner.lock().valid_to;
        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
    }
1408
1409 pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1411 if transaction.is_empty() {
1412 return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1413 }
1414
1415 self.pre_commit(transaction).await?;
1416 Ok(self.write_and_apply_mutations(transaction))
1417 }
1418
    /// Housekeeping performed before journalling a transaction: emits any
    /// pending version-reset / discard / did-flush-device records and, when
    /// necessary, runs an internal transaction to grow the journal file
    /// and/or zero space freed by an old superblock.
    ///
    /// Returns early with `Ok(())` when no internal transaction is needed.
    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
        let handle;

        let (size, zero_offset) = {
            let mut inner = self.inner.lock();

            // After a reset, the stream restarts with the latest version tag.
            if std::mem::take(&mut inner.output_reset_version) {
                LATEST_VERSION.serialize_into(&mut inner.writer)?;
            }

            if let Some(discard_offset) = inner.discard_offset {
                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
                inner.discard_offset = None;
            }

            if inner.needs_did_flush_device {
                let offset = inner.device_flushed_offset;
                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
                inner.needs_did_flush_device = false;
            }

            // No journal handle yet (e.g. during format/replay) — nothing
            // more to do here.
            handle = match self.handle.get() {
                None => return Ok(()),
                Some(x) => x,
            };

            let file_offset = inner.writer.journal_file_checkpoint().file_offset;

            // Grow the journal when the write position is within one chunk of
            // the file's current size.
            let size = handle.get_size();
            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };

            if size.is_none()
                && inner.zero_offset.is_none()
                && !self.objects.needs_borrow_for_journal(file_offset)
            {
                return Ok(());
            }

            (size, inner.zero_offset)
        };

        // Internal transaction; borrows metadata space and is tied to the
        // caller's transaction guard.
        let mut transaction = handle
            .new_transaction_with_options(Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.objects.metadata_reservation()),
                txn_guard: Some(transaction.txn_guard()),
                ..Default::default()
            })
            .await?;
        if let Some(size) = size {
            handle
                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
                .await?;
        }
        if let Some(zero_offset) = zero_offset {
            handle.zero(&mut transaction, 0..zero_offset).await?;
        }

        self.write_and_apply_mutations(&mut transaction);

        let mut inner = self.inner.lock();

        // The extension must have landed before the journal write position
        // reaches the old end of file.
        if let Some(size) = size {
            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
        }

        // Only clear zero_offset if no new zero request appeared meanwhile.
        if inner.zero_offset == zero_offset {
            inner.zero_offset = None;
        }

        Ok(())
    }
1499
1500 fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1504 let super_block_header = &self.inner.lock().super_block_header;
1505 let offset = super_block_header
1506 .journal_file_offsets
1507 .get(&object_id)
1508 .cloned()
1509 .unwrap_or(super_block_header.super_block_journal_file_offset);
1510 journal_file_checkpoint.file_offset >= offset
1511 }
1512
    /// Writes a new superblock reflecting the current state of the journal
    /// and object stores, then schedules the old superblock's journal region
    /// for zeroing.
    ///
    /// Takes `sync_mutex` (so no concurrent sync) and, within it,
    /// `writer_mutex` while padding and snapshotting the root parent store's
    /// layers so the superblock captures a consistent cut of the journal.
    async fn write_super_block(&self) -> Result<(), Error> {
        let root_parent_store = self.objects.root_parent_store();

        let old_layers;
        let old_super_block_offset;
        let mut new_super_block_header;
        let checkpoint;
        let borrowed;

        {
            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
            {
                // Pad and snapshot atomically with respect to journal writes.
                let _write_guard = self.writer_mutex.lock();
                (checkpoint, borrowed) = self.pad_to_block()?;
                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
            }
            // Everything up to the checkpoint must be durable before the new
            // superblock can reference it.
            self.flush_device(checkpoint.file_offset)
                .await
                .context("flush failed when writing superblock")?;
        }

        new_super_block_header = self.inner.lock().super_block_header.clone();

        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;

        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();

        // Generation uses wrapping arithmetic; comparisons elsewhere must
        // tolerate wraparound.
        new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
        new_super_block_header.journal_file_offsets = journal_file_offsets;
        new_super_block_header.borrowed_metadata_space = borrowed;

        self.super_block_manager
            .save(
                new_super_block_header.clone(),
                self.objects.root_parent_store().filesystem(),
                old_layers,
            )
            .await?;
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = new_super_block_header;
            // The region before the old superblock's checkpoint is no longer
            // needed and can be zeroed on the next pre-commit.
            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
        }

        Ok(())
    }
1568
    /// Pads the journal out to a block boundary and, if requested, flushes the
    /// device.
    ///
    /// `options.precondition` is evaluated under `sync_mutex` but before
    /// `writer_mutex`; returning false aborts the sync with `Ok(None)`.
    /// On success returns the checkpoint at which padding began together with
    /// the borrowed metadata space at that point.
    pub async fn sync(
        &self,
        options: SyncOptions<'_>,
    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());

        let (checkpoint, borrowed) = {
            if let Some(precondition) = options.precondition {
                if !precondition() {
                    return Ok(None);
                }
            }

            // Hold the writer lock while padding so records from concurrent
            // transactions cannot interleave with the padding.
            let _guard = self.writer_mutex.lock();

            self.pad_to_block()?
        };

        if options.flush_device {
            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
        }

        Ok(Some((checkpoint, borrowed)))
    }
1601
    /// Pads the journal stream out to the next block boundary (when not
    /// already aligned) by writing an `EndBlock` record, then wakes the flush
    /// task so the padded block gets written out.
    ///
    /// Returns the checkpoint prior to padding and the currently borrowed
    /// metadata space.
    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
        let mut inner = self.inner.lock();
        let checkpoint = inner.writer.journal_file_checkpoint();
        if checkpoint.file_offset % BLOCK_SIZE != 0 {
            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
            inner.writer.pad_to_block()?;
            if let Some(waker) = inner.flush_waker.take() {
                waker.wake();
            }
        }
        Ok((checkpoint, self.objects.borrowed_metadata_space()))
    }
1617
    /// Waits until the journal has been written out to at least
    /// `checkpoint_offset`, then flushes the device if that offset hasn't
    /// already been covered by a previous flush.
    ///
    /// On a successful flush, records the offset so that a `DidFlushDevice`
    /// record is journalled on the next commit and notifies the allocator so
    /// it can release deallocated ranges.  Errors out if the journal
    /// terminates while waiting.  Must not be called in image-builder mode.
    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
        assert!(
            self.inner.lock().image_builder_mode.is_none(),
            "flush_device called in image builder mode"
        );
        // Wait for the flush task to write the journal up to the checkpoint.
        debug_assert_not_too_long!(poll_fn(|ctx| {
            let mut inner = self.inner.lock();
            if inner.flushed_offset >= checkpoint_offset {
                Poll::Ready(Ok(()))
            } else if inner.terminate {
                let context = inner
                    .terminate_reason
                    .as_ref()
                    .map(|e| format!("Journal closed with error: {:?}", e))
                    .unwrap_or_else(|| "Journal closed".to_string());
                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
            } else {
                // Register the waker while holding the lock so a concurrent
                // flush completion cannot be missed.
                inner.sync_waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }))?;

        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
        if needs_flush {
            let trace = self.trace.load(Ordering::Relaxed);
            if trace {
                info!("J: start flush device");
            }
            self.handle.get().unwrap().flush_device().await?;
            if trace {
                info!("J: end flush device");
            }

            {
                let mut inner = self.inner.lock();
                inner.device_flushed_offset = checkpoint_offset;
                // Ensure the flush gets recorded in the journal stream.
                inner.needs_did_flush_device = true;
            }

            // The allocator may now free ranges deallocated before the flush.
            self.objects.allocator().did_flush_device(checkpoint_offset);
            if trace {
                info!("J: did flush device");
            }
        }

        Ok(())
    }
1674
1675 pub fn super_block_header(&self) -> SuperBlockHeader {
1677 self.inner.lock().super_block_header.clone()
1678 }
1679
    /// Blocks until the journal has room for more transactions, i.e. the span
    /// between the superblock's journal checkpoint and the last committed
    /// offset is below `reclaim_size`.
    ///
    /// Returns immediately in image-builder mode.  Fails if the journal has
    /// terminated or compactions are disabled (space could then never be
    /// reclaimed).
    pub async fn check_journal_space(&self) -> Result<(), Error> {
        loop {
            debug_assert_not_too_long!({
                let inner = self.inner.lock();
                if inner.terminate {
                    let context = inner
                        .terminate_reason
                        .as_ref()
                        .map(|e| format!("Journal closed with error: {:?}", e))
                        .unwrap_or_else(|| "Journal closed".to_string());
                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
                }
                if self.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset
                    < inner.reclaim_size
                {
                    break Ok(());
                }
                if inner.image_builder_mode.is_some() {
                    break Ok(());
                }
                if inner.disable_compactions {
                    break Err(
                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
                    );
                }
                // Create the listener while still holding the lock so a
                // reclaim notification cannot slip through unobserved.
                self.reclaim_event.listen()
            });
        }
    }
1713
    /// Granularity at which journal file space is preallocated/extended.
    fn chunk_size(&self) -> u64 {
        CHUNK_SIZE
    }
1717
    /// Serialises the transaction's mutations into the journal and applies
    /// them to the in-memory stores, returning the journal file offset at
    /// which the transaction's records begin.
    ///
    /// The whole write/apply sequence runs under `writer_mutex` so records
    /// from concurrent transactions cannot interleave in the journal stream.
    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
        let checkpoint_before;
        let checkpoint_after;
        {
            let _guard = self.writer_mutex.lock();
            checkpoint_before = {
                let mut inner = self.inner.lock();
                // A data write means the next journal write needs a barrier.
                if transaction.includes_write() {
                    inner.needs_barrier = true;
                }
                let checkpoint = inner.writer.journal_file_checkpoint();
                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
                    self.objects.write_mutation(
                        *object_id,
                        mutation,
                        Writer(*object_id, &mut inner.writer),
                    );
                }
                checkpoint
            };
            // Applying may yield an extra mutation (journalled with object_id
            // 0) that must land in the same transaction.
            let maybe_mutation =
                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
                    "apply_transaction should not fail in live mode; \
                    filesystem will be in an inconsistent state",
                );
            checkpoint_after = {
                let mut inner = self.inner.lock();
                if let Some(mutation) = maybe_mutation {
                    inner
                        .writer
                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
                        .unwrap();
                }
                // Data checksums are journalled so replay can validate writes.
                for (device_range, checksums, first_write) in
                    transaction.take_checksums().into_iter()
                {
                    inner
                        .writer
                        .write_record(&JournalRecord::DataChecksums(
                            device_range,
                            Checksums::fletcher(checksums),
                            first_write,
                        ))
                        .unwrap();
                }
                inner.writer.write_record(&JournalRecord::Commit).unwrap();

                inner.writer.journal_file_checkpoint()
            };
        }
        self.objects.did_commit_transaction(
            transaction,
            &checkpoint_before,
            checkpoint_after.file_offset,
        );

        // Wake the flush task so the new records get written to disk.
        if let Some(waker) = self.inner.lock().flush_waker.take() {
            waker.wake();
        }

        checkpoint_before.file_offset
    }
1780
    /// Background task that drains buffered journal bytes to disk and starts
    /// compactions when the journal grows past half the reclaim size.
    ///
    /// Runs until `terminate` is set and any in-flight flush/compaction has
    /// finished.  A flush error terminates the journal and permanently stops
    /// further flush attempts (`flush_error`).
    pub async fn flush_task(self: Arc<Self>) {
        let mut flush_fut = None;
        let mut compact_fut = None;
        let mut flush_error = false;
        poll_fn(|ctx| {
            loop {
                {
                    let mut inner = self.inner.lock();
                    // Start a flush if there are buffered bytes, no flush is
                    // running, and we haven't previously failed.
                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
                        let flushable = inner.writer.flushable_bytes();
                        if flushable > 0 {
                            flush_fut = Some(Box::pin(self.flush(flushable)));
                        }
                    }
                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
                        return Poll::Ready(());
                    }
                    // Kick off a compaction once the live journal region
                    // exceeds half of the reclaim threshold.
                    if compact_fut.is_none()
                        && !inner.terminate
                        && !inner.disable_compactions
                        && inner.image_builder_mode.is_none()
                        && self.objects.last_end_offset()
                            - inner.super_block_header.journal_checkpoint.file_offset
                            > inner.reclaim_size / 2
                    {
                        compact_fut = Some(Box::pin(self.compact()));
                        inner.compaction_running = true;
                    }
                    // Register for wake-ups before polling the futures so a
                    // notification can't be lost.
                    inner.flush_waker = Some(ctx.waker().clone());
                }
                let mut pending = true;
                if let Some(fut) = flush_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        if let Err(e) = result {
                            self.inner.lock().terminate(Some(e.context("Flush error")));
                            self.reclaim_event.notify(usize::MAX);
                            flush_error = true;
                        }
                        flush_fut = None;
                        pending = false;
                    }
                }
                if let Some(fut) = compact_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        let mut inner = self.inner.lock();
                        if let Err(e) = result {
                            inner.terminate(Some(e.context("Compaction error")));
                        }
                        compact_fut = None;
                        inner.compaction_running = false;
                        self.reclaim_event.notify(usize::MAX);
                        pending = false;
                    }
                }
                if pending {
                    return Poll::Pending;
                }
            }
        })
        .await;
    }
1848
    /// Returns a yielder that compaction uses to back off in favour of
    /// higher-priority work (a no-op on non-Fuchsia targets).
    pub fn get_compaction_yielder(&self) -> CompactionYielder<'_> {
        CompactionYielder::new(self)
    }
1853
1854 async fn flush(&self, amount: usize) -> Result<(), Error> {
1855 let handle = self.handle.get().unwrap();
1856 let mut buf = handle.allocate_buffer(amount).await;
1857 let (offset, len, barrier_on_first_write) = {
1858 let mut inner = self.inner.lock();
1859 let offset = inner.writer.take_flushable(buf.as_mut());
1860 let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1861 inner.needs_barrier = false;
1864 (offset, buf.len() as u64, barrier_on_first_write)
1865 };
1866 self.handle
1867 .get()
1868 .unwrap()
1869 .overwrite(
1870 offset,
1871 buf.as_mut(),
1872 OverwriteOptions { barrier_on_first_write, ..Default::default() },
1873 )
1874 .await?;
1875
1876 let mut inner = self.inner.lock();
1877 if let Some(waker) = inner.sync_waker.take() {
1878 waker.wake();
1879 }
1880 inner.flushed_offset = offset + len;
1881 inner.valid_to = inner.flushed_offset;
1882 Ok(())
1883 }
1884
    /// Flushes all object stores (compacting their in-memory layers out to
    /// disk) and then writes a new superblock so earlier journal space can be
    /// reclaimed.  Records compaction metrics (count, duration, bytes
    /// written).  Must not be called in image-builder mode.
    #[trace]
    async fn compact(&self) -> Result<(), Error> {
        assert!(
            self.inner.lock().image_builder_mode.is_none(),
            "compact called in image builder mode"
        );
        let bytes_before = self.objects.compaction_bytes_written();
        // Dropped at the end of the function, recording the elapsed time.
        let _measure = crate::metrics::DurationMeasureScope::new(
            &crate::metrics::lsm_tree_metrics().journal_compaction_time,
        );
        crate::metrics::lsm_tree_metrics().journal_compactions_total.add(1);
        let trace = self.trace.load(Ordering::Relaxed);
        debug!("Compaction starting");
        if trace {
            info!("J: start compaction");
        }
        // `flush` returns the earliest version of data still on disk, which
        // is persisted in the superblock for upgrade/compatibility tracking.
        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
        self.inner.lock().super_block_header.earliest_version = earliest_version;
        self.write_super_block().await.context("Failed to write superblock")?;
        if trace {
            info!("J: end compaction");
        }
        debug!("Compaction finished");
        let bytes_after = self.objects.compaction_bytes_written();
        crate::metrics::lsm_tree_metrics()
            .journal_compaction_bytes_written
            .add(bytes_after.saturating_sub(bytes_before));
        Ok(())
    }
1914
    /// Runs a compaction immediately, with `forced_compaction` set so the
    /// compaction yielder does not throttle it.  The flag is cleared on exit
    /// (even on error) via the scope guard.
    pub async fn force_compact(&self) -> Result<(), Error> {
        self.inner.lock().forced_compaction = true;
        scopeguard::defer! { self.inner.lock().forced_compaction = false; }
        self.compact().await
    }
1922
    /// Disables future compactions and waits for any in-flight compaction to
    /// complete (completion is signalled via `reclaim_event`).
    pub async fn stop_compactions(&self) {
        loop {
            debug_assert_not_too_long!({
                let mut inner = self.inner.lock();
                inner.disable_compactions = true;
                if !inner.compaction_running {
                    return;
                }
                // Create the listener while holding the lock so the
                // completion notification cannot be missed.
                self.reclaim_event.listen()
            });
        }
    }
1935
    /// Registers a lazy inspect node under `parent` that reports journal
    /// offset/size statistics on demand.  Only a weak reference to the
    /// journal is captured so inspection does not keep it alive.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    // Snapshot everything we need in one short lock scope.
                    let (journal_min, journal_max, journal_reclaim_size) = {
                        let inner = this.inner.lock();
                        (
                            round_down(
                                inner.super_block_header.journal_checkpoint.file_offset,
                                BLOCK_SIZE,
                            ),
                            inner.flushed_offset,
                            inner.reclaim_size,
                        )
                    };
                    let root = inspector.root();
                    root.record_uint("journal_min_offset", journal_min);
                    root.record_uint("journal_max_offset", journal_max);
                    root.record_uint("journal_size", journal_max - journal_min);
                    root.record_uint("journal_reclaim_size", journal_reclaim_size);

                    // Journal size as a percentage of allocated disk bytes;
                    // `round_div` yields None when the divisor is zero.
                    if let Some(x) = round_div(
                        100 * (journal_max - journal_min),
                        this.objects.allocator().get_disk_bytes(),
                    ) {
                        root.record_uint("journal_size_to_disk_size_percent", x);
                    }
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
1976
1977 pub fn terminate(&self) {
1979 self.inner.lock().terminate(None);
1980 self.reclaim_event.notify(usize::MAX);
1981 }
1982}
1983
/// Handed to object stores during commit so they can append mutation records
/// for a specific object (field 0) into the journal writer (field 1).
pub struct Writer<'a>(u64, &'a mut JournalWriter);

impl Writer<'_> {
    /// Serialises `mutation` as a `JournalRecord::Mutation` tagged with this
    /// writer's object id.  The unwrap reflects that writing to the in-memory
    /// journal buffer is not expected to fail.
    pub fn write(&mut self, mutation: Mutation) {
        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
    }
}
1992
#[cfg(target_os = "fuchsia")]
mod yielder {
    use super::Journal;
    use crate::lsm_tree::Yielder;
    use fuchsia_async as fasync;

    /// Throttles compaction by yielding to higher-priority work while the
    /// journal is not close to its reclaim threshold.
    pub struct CompactionYielder<'a> {
        journal: &'a Journal,
        // Created lazily on first yield; dropped when compaction becomes
        // urgent again.
        low_priority_task: Option<fasync::LowPriorityTask>,
    }

    impl<'a> CompactionYielder<'a> {
        pub fn new(journal: &'a Journal) -> Self {
            Self { journal, low_priority_task: None }
        }
    }

    impl Yielder for CompactionYielder<'_> {
        async fn yield_now(&mut self) {
            const IDLE_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(4);
            const MAX_YIELD_DURATION: zx::MonotonicDuration =
                zx::MonotonicDuration::from_millis(16);

            {
                let inner = self.journal.inner.lock();
                // Forced compactions are never throttled.
                if inner.forced_compaction {
                    return;
                }
                let outstanding = self.journal.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset;
                let half_reclaim_size = inner.reclaim_size / 2;
                // If the journal is well past the compaction trigger point,
                // stop throttling so compaction can catch up at full speed.
                if outstanding
                    .checked_sub(half_reclaim_size)
                    .is_some_and(|x| x >= half_reclaim_size / 2)
                {
                    self.low_priority_task = None;
                    return;
                }
            }

            self.low_priority_task
                .get_or_insert_with(|| fasync::LowPriorityTask::new())
                .wait_until_idle_for(
                    IDLE_PERIOD,
                    fasync::MonotonicInstant::after(MAX_YIELD_DURATION),
                )
                .await;
        }
    }
}
2052
#[cfg(not(target_os = "fuchsia"))]
mod yielder {
    use super::Journal;
    use crate::lsm_tree::Yielder;

    /// No-op stand-in for host builds, where there is no low-priority task
    /// mechanism to defer to.
    #[expect(dead_code)]
    pub struct CompactionYielder<'a>(&'a Journal);

    impl<'a> CompactionYielder<'a> {
        pub fn new(journal: &'a Journal) -> Self {
            Self(journal)
        }
    }

    impl Yielder for CompactionYielder<'_> {
        // Yielding is a no-op off-Fuchsia.
        async fn yield_now(&mut self) {}
    }
}
2071
2072pub use yielder::*;
2073
2074#[cfg(test)]
2075mod tests {
2076 use super::SuperBlockInstance;
2077 use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
2078 use crate::fsck::fsck;
2079 use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
2080 use crate::object_store::directory::Directory;
2081 use crate::object_store::transaction::Options;
2082 use crate::object_store::volume::root_volume;
2083 use crate::object_store::{
2084 HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
2085 };
2086 use fuchsia_async as fasync;
2087 use fuchsia_async::MonotonicDuration;
2088 use storage_device::DeviceHolder;
2089 use storage_device::fake_device::FakeDevice;
2090
2091 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2092
    /// Writes a file, syncs, then reopens the device and verifies the file's
    /// contents survived journal replay and that fsck passes.
    #[fuchsia::test]
    async fn test_replay() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        let object_id = {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");

            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            // Sync so the write is journalled before the device is reopened.
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            handle.object_id()
        };

        {
            fs.close().await.expect("Close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            // Reopening forces a journal replay.
            let fs = FxFilesystem::open(device).await.expect("open failed");
            let handle = ObjectStore::open_object(
                &fs.root_store(),
                object_id,
                HandleOptions::default(),
                None,
            )
            .await
            .expect("open_object failed");
            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            fsck(fs.clone()).await.expect("fsck failed");
            fs.close().await.expect("Close failed");
        }
    }
2153
    /// Exercises journal reset: writes enough transactions to wrap/reset the
    /// journal, reopens (forcing replay), writes more, and verifies data and
    /// fsck at each stage.
    #[fuchsia::test]
    async fn test_reset() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));

        let mut object_ids = Vec::new();

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());

            // Create many files to produce enough journal traffic to trigger
            // journal resets/compactions.
            for i in 0..1000 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let handle = root_directory
                    .create_child_file(&mut transaction, &format!("{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                object_ids.push(handle.object_id());
            }
        }
        fs.close().await.expect("fs close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        {
            let root_store = fs.root_store();
            // Check the first file survived.
            for &object_id in &object_ids[0..1] {
                let handle = ObjectStore::open_object(
                    &root_store,
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .expect("open_object failed");
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }

            // Write another file after replay to exercise post-reset commits.
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test2")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        {
            fsck(fs.clone()).await.expect("fsck failed");

            // Verify both the first and the post-reset file after a second
            // replay.
            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
                let handle = ObjectStore::open_object(
                    &fs.root_store(),
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .unwrap_or_else(|e| {
                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
                });
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }
        }
        fs.close().await.expect("close failed");
    }
2298
    /// Exercises journal discard: writes past a compaction point, snapshots
    /// the device mid-stream (so the journal tail may be incomplete), then
    /// reopens and verifies replay discards the tail and fsck passes.
    #[fuchsia::test]
    async fn test_discard() {
        let device = {
            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let store = root_volume
                .new_volume("test", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            // Fill the journal past one chunk.
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("a {i}"))
                    .await
                    .expect("create_child_file failed");
                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
                    break;
                }
                i += 1;
            }

            fs.journal().force_compact().await.expect("compact failed");
            fs.journal().stop_compactions().await;

            // Keep writing (without compaction) until the journal extends
            // past two chunks.
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("b {i}"))
                    .await
                    .expect("create_child_file failed");
                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
                    break;
                }
                i += 1;
            }

            // Snapshot without closing, so the device captures whatever made
            // it to disk at this instant.
            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
            fs.device().snapshot().expect("snapshot failed")
        };

        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let store =
                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");

            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            // Writing after replay exercises the Discard path in the journal.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, &format!("d"))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }
2412
2413 #[fuchsia::test]
2414 async fn test_use_existing_generation() {
2415 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2416
2417 let fs = FxFilesystemBuilder::new()
2419 .format(true)
2420 .image_builder_mode(Some(SuperBlockInstance::A))
2421 .open(device)
2422 .await
2423 .expect("open failed");
2424 fs.enable_allocations();
2425 let generation0 = fs.super_block_header().generation;
2426 assert_eq!(generation0, 1);
2427 fs.close().await.expect("close failed");
2428 let device = fs.take_device().await;
2429 device.reopen(false);
2430
2431 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2433 let generation1 = fs.super_block_header().generation;
2434 {
2435 let root_volume = crate::object_store::volume::root_volume(fs.clone())
2436 .await
2437 .expect("root_volume failed");
2438 root_volume
2439 .new_volume("test", crate::object_store::NewChildStoreOptions::default())
2440 .await
2441 .expect("new_volume failed");
2442 }
2443 fs.close().await.expect("close failed");
2444 let device = fs.take_device().await;
2445 device.reopen(false);
2446
2447 let fs = FxFilesystemBuilder::new()
2449 .format(true)
2450 .image_builder_mode(Some(SuperBlockInstance::A))
2451 .open(device)
2452 .await
2453 .expect("open failed");
2454 fs.enable_allocations();
2455 let generation2 = fs.super_block_header().generation;
2456 assert!(
2457 generation2 > generation1,
2458 "generation2 ({}) should be greater than generation1 ({})",
2459 generation2,
2460 generation1
2461 );
2462 fs.close().await.expect("close failed");
2463 }
2464
2465 #[fuchsia::test]
2466 async fn test_image_builder_mode_generation_bump_512_byte_block() {
2467 let device = DeviceHolder::new(FakeDevice::new(16384, 512));
2468
2469 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2471 let generation1 = fs.super_block_header().generation;
2472 fs.close().await.expect("close failed");
2473 let device = fs.take_device().await;
2474 device.reopen(false);
2475
2476 let fs = FxFilesystemBuilder::new()
2478 .format(true)
2479 .image_builder_mode(Some(SuperBlockInstance::A))
2480 .open(device)
2481 .await
2482 .expect("open failed");
2483
2484 fs.enable_allocations();
2485 let generation2 = fs.super_block_header().generation;
2486 assert!(
2487 generation2 > generation1,
2488 "Expected generation bump, got {} vs {}",
2489 generation2,
2490 generation1
2491 );
2492 fs.close().await.expect("close failed");
2493 }
2494
    // Verifies that journal compaction scheduled as a low-priority task keeps running (without
    // being starved or cancelled) while a normal-priority task is active, and eventually
    // completes once it gets cycles.  Runs with `allow_stalls = false` so the test fully
    // controls time via `TestExecutor::advance_to`.
    #[fuchsia::test(allow_stalls = false)]
    #[cfg(target_os = "fuchsia")]
    async fn test_low_priority_compaction() {
        use fuchsia_async::TestExecutor;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        // Small reclaim_size so journal usage crosses the compaction threshold quickly.
        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .journal_options(super::JournalOptions { reclaim_size: 65536, ..Default::default() })
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        // Keep a low-priority task slot alive so compaction runs at low priority.
        let _low = fasync::LowPriorityTask::new();

        // Generate journal traffic: 100 file creations in the root store.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..100 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        // A normal-priority task that wakes every 1ms; it competes with the low-priority
        // compaction for executor time until `stop` is set.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
            }
        });

        // Keep committing transactions (advancing time 1ms per iteration) until compaction
        // is observed to have started.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                if fs.journal().inner.lock().compaction_running {
                    break;
                }
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                i += 1;
            }
        }

        // While the normal-priority task is busy, compaction should remain in progress
        // (i.e. it yields to the busy task rather than completing immediately).
        for _ in 0..10 {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;
            assert!(fs.journal().inner.lock().compaction_running);
        }

        // Stop the competing task.  Compaction doesn't finish instantly...
        stop.store(true, Ordering::Relaxed);
        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
            1,
        )))
        .await;

        assert!(fs.journal().inner.lock().compaction_running);

        // ...and is still running over the next few ticks (presumably the low-priority yield
        // interval — TODO confirm against the compaction scheduling code).
        for _ in 0..3 {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;
            assert!(fs.journal().inner.lock().compaction_running);
        }

        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
            1,
        )))
        .await;

        // Drain all pending work; with no competition left, compaction must have completed.
        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
        assert!(!fs.journal().inner.lock().compaction_running);

        fs.close().await.expect("Close failed");
    }
2631
    // Verifies that low-priority compaction makes forward progress even under sustained
    // normal-priority load: it keeps yielding for a long while (count > 200 iterations of
    // 20ms), but eventually finishes rather than being starved forever.
    #[fuchsia::test(allow_stalls = false)]
    #[cfg(target_os = "fuchsia")]
    async fn test_low_priority_compaction_deadline() {
        use fuchsia_async::TestExecutor;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .journal_options(super::JournalOptions { reclaim_size: 65536, ..Default::default() })
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        // Keep a low-priority task slot alive so compaction runs at low priority.
        let _low = fasync::LowPriorityTask::new();

        // Seed some journal traffic.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..10 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        // A normal-priority task that wakes every 3ms for the whole test, continuously
        // competing with compaction.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(3),
                ))
                .await;
            }
        });

        // Commit transactions (advancing time 1ms per iteration) until compaction starts.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                if fs.journal().inner.lock().compaction_running {
                    break;
                }
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                i += 1;
            }
        }

        // Advance time in 20ms steps until compaction completes, counting how long it took.
        let mut count = 0;
        for _ in 0..1000 {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(20),
            ))
            .await;
            if !fs.journal().inner.lock().compaction_running {
                break;
            }
            count += 1;
        }
        // Compaction must have finished within the 1000-iteration budget...
        assert!(!fs.journal().inner.lock().compaction_running);

        // ...but only after yielding for a substantial amount of time (> 200 * 20ms),
        // demonstrating it genuinely ran at low priority under load.
        assert!(count > 200);

        stop.store(true, Ordering::Relaxed);
        fs.close().await.expect("Close failed");
    }
2747
    // Verifies that when the journal is nearly full (outstanding data >= 7/8 of reclaim_size),
    // compaction completes promptly — i.e. it does not keep yielding to normal-priority work
    // while the journal is under space pressure.
    #[fuchsia::test(allow_stalls = false)]
    #[cfg(target_os = "fuchsia")]
    async fn test_low_priority_compaction_no_yielding_when_full() {
        use fuchsia_async::TestExecutor;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let reclaim_size = 65536;
        let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .journal_options(super::JournalOptions { reclaim_size, ..Default::default() })
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        // Keep a low-priority task slot alive so compaction runs at low priority.
        let _low = fasync::LowPriorityTask::new();

        // Seed some journal traffic.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..10 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        // A normal-priority task waking every 3ms that compaction could, in principle,
        // yield to.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(3),
                ))
                .await;
            }
        });

        // Commit transactions (without advancing time) until the un-compacted portion of the
        // journal reaches 7/8 of reclaim_size, i.e. the journal is effectively full.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                // Bytes written to the journal beyond the last super-block checkpoint.
                let outstanding = {
                    let inner = fs.journal().inner.lock();
                    fs.journal().objects.last_end_offset()
                        - inner.super_block_header.journal_checkpoint.file_offset
                };
                if outstanding >= reclaim_size * 7 / 8 {
                    break;
                }
                i += 1;
            }
        }

        // Give the executor a small amount of time; despite the competing 3ms task,
        // compaction should finish quickly because the journal is full.
        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
            4,
        )))
        .await;

        // Drain remaining ready work and verify compaction has completed.
        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
        assert!(!fs.journal().inner.lock().compaction_running);

        stop.store(true, Ordering::Relaxed);
        fs.close().await.expect("Close failed");
    }
2857}
2858
#[cfg(fuzz)]
mod fuzz {
    use fuzz::fuzz;

    /// Appends fuzzer-supplied raw bytes to the journal stream, then reopens the filesystem.
    /// Replaying the mangled journal must never panic; a clean `Err` from `open` is fine.
    #[fuzz]
    fn fuzz_journal_bytes(input: Vec<u8>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use std::io::Write;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let holder = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(holder).await.expect("new_empty failed");
            // Inject the arbitrary bytes directly into the journal writer.
            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
            fs.close().await.expect("close failed");
            let reopened = fs.take_device().await;
            reopened.reopen(false);
            // Replay must not panic; if it succeeds, close the filesystem again.
            match FxFilesystem::open(reopened).await {
                Ok(fs) => {
                    let _ = fs.close().await;
                }
                Err(_) => {}
            }
        });
    }

    /// Like `fuzz_journal_bytes`, but feeds structured `JournalRecord`s through the writer
    /// instead of raw bytes, exercising record-level (de)serialization during replay.
    #[fuzz]
    fn fuzz_journal(input: Vec<super::JournalRecord>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let holder = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(holder).await.expect("new_empty failed");
            {
                // Write every fuzzer-provided record; individual write errors are ignored
                // since malformed records are part of the fuzz surface.
                let mut journal_inner = fs.journal().inner.lock();
                for record in input.iter() {
                    let _ = journal_inner.writer.write_record(record);
                }
            }
            fs.close().await.expect("close failed");
            let reopened = fs.take_device().await;
            reopened.reopen(false);
            // Replay must not panic; if it succeeds, close the filesystem again.
            match FxFilesystem::open(reopened).await {
                Ok(fs) => {
                    let _ = fs.close().await;
                }
                Err(_) => {}
            }
        });
    }
}