mod bootstrap_handle;
mod checksum_list;
mod reader;
pub mod super_block;
mod writer;

use crate::checksum::{Checksum, Checksums, ChecksumsV38};
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::types::Layer;
use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
use crate::object_store::allocator::Allocator;
use crate::object_store::data_object_handle::OverwriteOptions;
use crate::object_store::extent_record::{
    DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
};
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
use crate::object_store::journal::checksum_list::ChecksumList;
use crate::object_store::journal::reader::{JournalReader, ReadResult};
use crate::object_store::journal::super_block::{
    SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
};
use crate::object_store::journal::writer::JournalWriter;
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
use crate::object_store::transaction::{
    AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
    MutationV47, MutationV49, MutationV50, MutationV54, ObjectStoreMutation, Options,
    TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
};
use crate::object_store::{
    AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
    NewChildStoreOptions, ObjectStore, ReservedId,
};
use crate::range::RangeExt;
use crate::round::{round_div, round_down};
use crate::serialized_types::{
    LATEST_VERSION, Migrate, Version, Versioned, migrate_nodefault, migrate_to_version,
};
use anyhow::{Context, Error, anyhow, bail, ensure};
use event_listener::Event;
use fprint::TypeFingerprint;
use fuchsia_inspect::NumericProperty;
use fuchsia_sync::Mutex;
use futures::FutureExt as _;
use futures::future::poll_fn;
use rustc_hash::FxHashMap as HashMap;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::clone::Clone;
use std::collections::HashSet;
use std::num::NonZero;
use std::ops::{Bound, Range};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use std::task::{Poll, Waker};
use storage_device::Device;
/// The journal file is read and written in blocks of this size.
pub const BLOCK_SIZE: u64 = 4096;

/// The journal file is extended in chunks of this size. A chunk must be big enough to hold at
/// least one maximally-sized transaction (the assertion below encodes that requirement).
const CHUNK_SIZE: u64 = 131_072;
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

/// See `JournalOptions::reclaim_size`.
pub const DEFAULT_RECLAIM_SIZE: u64 = 524_288;

/// Space reserved for the journal so that it can always make progress.
pub const RESERVED_SPACE: u64 = 1_048_576;

/// When the journal is reset, the running checksum is XOR'd with this constant so that blocks
/// written before the reset no longer validate.
const RESET_XOR: u64 = 0xffffffffffffffff;

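// A hedged sketch of the reset mechanism (the authoritative logic lives in `JournalReader` and
// `JournalWriter`): journal blocks carry chained checksums, so when the journal is reset after
// replay the writer XORs its running checksum with `RESET_XOR`. Blocks written under the old
// seed no longer validate, while a reader that recognises the XOR'd seed can report
// `ReadResult::Reset` instead of `ReadResult::ChecksumMismatch` and keep reading the new epoch.
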
pub type JournalCheckpoint = JournalCheckpointV32;

/// A checkpoint contains enough information to resume reading the journal stream from a given
/// point.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    pub file_offset: u64,

    /// The starting checksum for the block that contains `file_offset`.
    pub checksum: Checksum,

    /// The version of the journal stream at this offset, i.e. the version that records at and
    /// after this offset were serialized with.
    pub version: Version,
}

pub type JournalRecord = JournalRecordV54;

#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV54 {
    /// Indicates no more records in this block.
    EndBlock,
    /// A mutation to be applied to the object identified by `object_id`.
    Mutation { object_id: u64, mutation: MutationV54 },
    /// Commits the mutations recorded since the last commit as a single transaction.
    Commit,
    /// Discards all journaled mutations from the given file offset onwards.
    Discard(u64),
    /// Indicates the device was flushed; everything journaled before the given offset is
    /// persistent.
    DidFlushDevice(u64),
    /// Checksums for a device range written by the transaction; the bool records whether this
    /// was the first write to that range.
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV54)]
#[migrate_nodefault]
pub enum JournalRecordV50 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV50 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

impl From<JournalRecordV41> for JournalRecordV42 {
    fn from(record: JournalRecordV41) -> Self {
        match record {
            JournalRecordV41::EndBlock => Self::EndBlock,
            JournalRecordV41::Mutation { object_id, mutation } => {
                Self::Mutation { object_id, mutation: mutation.into() }
            }
            JournalRecordV41::Commit => Self::Commit,
            JournalRecordV41::Discard(offset) => Self::Discard(offset),
            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
            JournalRecordV41::DataChecksums(range, sums) => {
                Self::DataChecksums(range, sums, true)
            }
        }
    }
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

pub(super) fn journal_handle_options() -> HandleOptions {
    HandleOptions { skip_journal_checks: true, ..Default::default() }
}

/// The journal records mutations to the filesystem as variable-length records in an
/// ever-extending file, so that they can be replayed after an unclean shutdown. It also manages
/// super-block writes and journal compaction.
pub struct Journal {
    objects: Arc<ObjectManager>,
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    super_block_manager: SuperBlockManager,
    inner: Mutex<Inner>,
    writer_mutex: Mutex<()>,
    sync_mutex: futures::lock::Mutex<()>,
    trace: AtomicBool,

    /// Notified when compactions make progress, to wake anyone waiting on journal space.
    reclaim_event: Event,
}

struct Inner {
    super_block_header: SuperBlockHeader,

    /// If set, the journal file can be zeroed up to this offset; that space is no longer needed.
    zero_offset: Option<u64>,

    /// The journal offset up to which the device is known to have been flushed.
    device_flushed_offset: u64,

    /// True if a `DidFlushDevice` record still needs to be written to the journal.
    needs_did_flush_device: bool,

    /// The writer for the journal.
    writer: JournalWriter,

    /// Set after replaying a reset journal: the next write must be preceded by the latest
    /// version so that readers know which version the records that follow were written with.
    output_reset_version: bool,

    /// Waker for the flush task.
    flush_waker: Option<Waker>,

    /// Indicates the journal has been terminated and no further writes are allowed.
    terminate: bool,

    /// The first error that caused termination, if any.
    terminate_reason: Option<Error>,

    /// Disables compactions; attempts to reclaim journal space will fail instead.
    disable_compactions: bool,

    /// True while a compaction is running.
    compaction_running: bool,

    /// Waker for tasks waiting in `sync` for the flusher to catch up.
    sync_waker: Option<Waker>,

    /// The offset in the journal file up to which data has been written out.
    flushed_offset: u64,

    /// The journal offset up to which records are known to be valid; reads of the journal
    /// (e.g. `read_transactions_for_object`) must not go past this.
    valid_to: u64,

    /// If set, a `Discard` record for this offset must be written before any new records,
    /// because the tail of the journal was discarded during replay.
    discard_offset: Option<u64>,

    /// A compaction is triggered once the journal exceeds this size.
    reclaim_size: u64,

    /// When set, the journal is being used to build an image offline; normal journaling and
    /// device flushing are suppressed and the value selects which super-block to write.
    image_builder_mode: Option<SuperBlockInstance>,

    /// Whether to issue write barriers for data writes instead of relying solely on device
    /// flushes.
    barriers_enabled: bool,

    /// True if a data write requiring a barrier has occurred since the last journal write.
    needs_barrier: bool,

    /// True while a compaction triggered by `force_compact` is running; such compactions do not
    /// yield.
    forced_compaction: bool,
}

impl Inner {
    fn terminate(&mut self, reason: Option<Error>) {
        self.terminate = true;

        if let Some(err) = reason {
            error!(error:? = err; "Terminating journal");
            // Only the first termination reason is kept; later errors are just logged.
            if let Some(prev_err) = self.terminate_reason.as_ref() {
                error!(error:? = prev_err; "Journal previously terminated");
            } else {
                self.terminate_reason = Some(err);
            }
        }

        if let Some(waker) = self.flush_waker.take() {
            waker.wake();
        }
        if let Some(waker) = self.sync_waker.take() {
            waker.wake();
        }
    }
}

pub struct JournalOptions {
    /// Once the journal file grows past this many bytes, a compaction is triggered so that the
    /// journal can be trimmed.
    pub reclaim_size: u64,

    /// Whether to issue write barriers for data writes instead of relying solely on device
    /// flushes.
    pub barriers_enabled: bool,
}

impl Default for JournalOptions {
    fn default() -> Self {
        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
    }
}
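
// A hedged usage sketch (not part of this module's API surface): callers that want to let the
// journal grow larger before compaction kicks in can override `reclaim_size`. The surrounding
// setup (an `Arc<ObjectManager>` in `objects`) is assumed.
//
//     let journal = Journal::new(objects, JournalOptions {
//         reclaim_size: 2 * DEFAULT_RECLAIM_SIZE,
//         ..Default::default()
//     });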

struct JournaledTransactions {
    transactions: Vec<JournaledTransaction>,
    device_flushed_offset: u64,
}

#[derive(Debug, Default)]
pub struct JournaledTransaction {
    pub checkpoint: JournalCheckpoint,
    pub root_parent_mutations: Vec<Mutation>,
    pub root_mutations: Vec<Mutation>,
    /// Mutations for stores other than the root and root parent stores:
    /// (store_object_id, mutation).
    pub non_root_mutations: Vec<(u64, Mutation)>,
    pub end_offset: u64,
    pub checksums: Vec<JournaledChecksums>,

    /// Set if the transaction contains an `EndFlush` mutation: (store_object_id, offset just
    /// past the start of the transaction containing the matching `BeginFlush`).
    pub end_flush: Option<(u64, u64)>,

    /// Set to the store's object ID if the transaction deletes a volume.
    pub volume_deleted: Option<u64>,
}

impl JournaledTransaction {
    fn new(checkpoint: JournalCheckpoint) -> Self {
        Self { checkpoint, ..Default::default() }
    }
}

const VOLUME_DELETED: u64 = u64::MAX;

#[derive(Debug)]
pub struct JournaledChecksums {
    pub device_range: Range<u64>,
    pub checksums: Checksums,
    pub first_write: bool,
}

/// Handles for journal-like objects expose extra extent bookkeeping, because during replay the
/// journal must be readable before the object stores are fully open.
pub trait JournalHandle: ReadObjectHandle {
    /// The end offset of the last extent added via `push_extent`, or None if the handle does not
    /// track extents (in which case bounds checking is the caller's responsibility).
    fn end_offset(&self) -> Option<u64>;
    /// Adds an extent covering `device_range` to the handle. `added_offset` is the journal
    /// offset at which the extent was added, so it can later be removed via `discard_extents`.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Discards all extents that were added at journal offset `discard_offset` or later.
    fn discard_extents(&mut self, discard_offset: u64);
}

// Data object handles are backed by an object store which already tracks their extents, so no
// additional bookkeeping is needed here.
impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
    fn end_offset(&self) -> Option<u64> {
        None
    }
    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {}
    fn discard_extents(&mut self, _discard_offset: u64) {}
}

#[fxfs_trace::trace]
impl Journal {
    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
        let starting_checksum = rand::random_range(1..u64::MAX);
        Journal {
            objects,
            handle: OnceLock::new(),
            super_block_manager: SuperBlockManager::new(),
            inner: Mutex::new(Inner {
                super_block_header: SuperBlockHeader::default(),
                zero_offset: None,
                device_flushed_offset: 0,
                needs_did_flush_device: false,
                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
                output_reset_version: false,
                flush_waker: None,
                terminate: false,
                terminate_reason: None,
                disable_compactions: false,
                compaction_running: false,
                sync_waker: None,
                flushed_offset: 0,
                valid_to: 0,
                discard_offset: None,
                reclaim_size: options.reclaim_size,
                image_builder_mode: None,
                barriers_enabled: options.barriers_enabled,
                needs_barrier: false,
                forced_compaction: false,
            }),
            writer_mutex: Mutex::new(()),
            sync_mutex: futures::lock::Mutex::new(()),
            trace: AtomicBool::new(false),
            reclaim_event: Event::new(),
        }
    }

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(trace; "J: trace");
        }
    }

    pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
        self.inner.lock().image_builder_mode = mode;
        if let Some(instance) = mode {
            *self.super_block_manager.next_instance.lock() = instance;
        }
    }

    pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
        self.inner.lock().image_builder_mode
    }

    #[cfg(feature = "migration")]
    pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
        ensure!(
            self.inner.lock().image_builder_mode.is_some(),
            "Can only set filesystem uuid in image builder mode."
        );
        self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
        Ok(())
    }

    pub(crate) async fn read_superblocks(
        &self,
        device: Arc<dyn Device>,
        block_size: u64,
    ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
        self.super_block_manager.load(device, block_size).await
    }

    /// Performs a limited sanity check of the given mutation (e.g. misaligned or out-of-range
    /// extents). Returns false if the mutation is invalid and replay should stop.
    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
        match mutation {
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    Item {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(
                                        _,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                                ..
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
                        ..
                    },
                ..
            }) => {
                if range.is_empty() || !range.is_aligned(block_size) {
                    return false;
                }
                let len = range.length().unwrap();
                if let ExtentMode::Cow(checksums) = mode {
                    if checksums.len() > 0 {
                        if len % checksums.len() as u64 != 0 {
                            return false;
                        }
                        if (len / checksums.len() as u64) % block_size != 0 {
                            return false;
                        }
                    }
                }
                if *device_offset % block_size != 0
                    || *device_offset >= device_size
                    || device_size - *device_offset < len
                {
                    return false;
                }
            }
            Mutation::ObjectStore(_) => {}
            Mutation::EncryptedObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::BeginFlush => {}
            Mutation::EndFlush => {}
            Mutation::DeleteVolume => {}
            Mutation::UpdateBorrowed(_) => {}
            Mutation::UpdateMutationsKey(_) => {}
            Mutation::CreateInternalDir(owner_object_id) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
        }
        true
    }

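    // Worked example for the COW checksum validation above (illustrative numbers only): with
    // `block_size` = 4096 and an extent of length 16384, `Cow` with 4 checksums passes
    // (16384 / 4 = 4096 bytes per checksum, a whole block); 3 checksums fails the
    // `len % checksums.len()` test (16384 % 3 != 0); and 8 checksums fails the alignment test
    // (16384 / 8 = 2048, which is not a multiple of 4096).
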
    /// Records the effect of the given mutation on `checksum_list`, e.g. deallocated ranges no
    /// longer need their checksums verified.
    fn update_checksum_list(
        &self,
        journal_offset: u64,
        mutation: &Mutation,
        checksum_list: &mut ChecksumList,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::ObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
            }
            _ => {}
        }
        Ok(())
    }

    /// Reads the latest super-block and then replays journaled transactions.
    #[trace]
    pub async fn replay(
        &self,
        filesystem: Arc<FxFilesystem>,
        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    ) -> Result<(), Error> {
        let block_size = filesystem.block_size();

        let (super_block, root_parent) =
            self.super_block_manager.load(filesystem.device(), block_size).await?;

        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

        self.objects.set_root_parent_store(root_parent.clone());
        let allocator =
            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
        if let Some(on_new_allocator) = on_new_allocator {
            on_new_allocator(allocator.clone());
        }
        self.objects.set_allocator(allocator.clone());
        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = super_block.clone();
        }

        let device = filesystem.device();

        // Build a bootstrap handle for the journal file from the extents recorded in the root
        // parent store, since the journal object cannot be opened normally yet.
        let mut handle;
        {
            let root_parent_layer = root_parent.tree().mutable_layer();
            let mut iter = root_parent_layer
                .seek(Bound::Included(&ObjectKey::attribute(
                    super_block.journal_object_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
                        super_block.journal_checkpoint.file_offset,
                        BLOCK_SIZE,
                    ))),
                )))
                .await
                .context("Failed to seek root parent store")?;
            let start_offset = if let Some(ItemRef {
                key:
                    ObjectKey {
                        data:
                            ObjectKeyData::Attribute(
                                DEFAULT_DATA_ATTRIBUTE_ID,
                                AttributeKey::Extent(ExtentKey { range }),
                            ),
                        ..
                    },
                ..
            }) = iter.get()
            {
                range.start
            } else {
                0
            };
            handle = BootstrapObjectHandle::new_with_start_offset(
                super_block.journal_object_id,
                device.clone(),
                start_offset,
            );
            while let Some(item) = iter.get() {
                if !match item.into() {
                    Some((
                        object_id,
                        DEFAULT_DATA_ATTRIBUTE_ID,
                        ExtentKey { range },
                        ExtentValue::Some { device_offset, .. },
                    )) if object_id == super_block.journal_object_id => {
                        if let Some(end_offset) = handle.end_offset() {
                            if range.start != end_offset {
                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                    "Unexpected journal extent {:?}, expected start: {}",
                                    item, end_offset
                                )));
                            }
                        }
                        handle.push_extent(
                            0, // These extents are never discarded, so the offset is unused.
                            *device_offset
                                ..*device_offset + range.length().context("Invalid extent")?,
                        );
                        true
                    }
                    _ => false,
                } {
                    break;
                }
                iter.advance().await.context("Failed to advance root parent store iterator")?;
            }
        }

        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
        let JournaledTransactions { mut transactions, device_flushed_offset } = self
            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
            .await
            .context("Reading transactions for replay")?;

        // Gather all the checksum records and validate the mutations as we go.
        let mut checksum_list = ChecksumList::new(device_flushed_offset);
        let mut valid_to = reader.journal_file_checkpoint().file_offset;
        let device_size = device.size();
        'bad_replay: for JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            root_mutations,
            non_root_mutations,
            checksums,
            ..
        } in &transactions
        {
            for JournaledChecksums { device_range, checksums, first_write } in checksums {
                checksum_list
                    .push(
                        checkpoint.file_offset,
                        device_range.clone(),
                        checksums.maybe_as_ref().context("Malformed checksums")?,
                        *first_write,
                    )
                    .context("Pushing journal checksum records to checksum list")?;
            }
            for mutation in root_parent_mutations
                .iter()
                .chain(root_mutations)
                .chain(non_root_mutations.iter().map(|(_, m)| m))
            {
                if !self.validate_mutation(mutation, block_size, device_size) {
                    info!(mutation:?; "Stopping replay at bad mutation");
                    valid_to = checkpoint.file_offset;
                    break 'bad_replay;
                }
                self.update_checksum_list(checkpoint.file_offset, mutation, &mut checksum_list)?;
            }
        }

        // Validate the data checksums.
        let valid_to = checksum_list
            .verify(device.as_ref(), valid_to)
            .await
            .context("Failed to validate checksums")?;

        let mut last_checkpoint = reader.journal_file_checkpoint();
        let mut journal_offsets = super_block.journal_file_offsets.clone();

        // Apply the root parent store mutations first, gathering the journal offsets of the
        // latest flushes and volume deletions as we go.
        for (
            index,
            JournaledTransaction {
                checkpoint,
                root_parent_mutations,
                end_flush,
                volume_deleted,
                ..
            },
        ) in transactions.iter_mut().enumerate()
        {
            if checkpoint.file_offset >= valid_to {
                last_checkpoint = checkpoint.clone();

                // Discard this and all subsequent transactions.
                transactions.truncate(index);
                break;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_parent_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_parent_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root parent store mutations")?;
            }

            if let Some((object_id, journal_offset)) = end_flush {
                journal_offsets.insert(*object_id, *journal_offset);
            }

            if let Some(object_id) = volume_deleted {
                journal_offsets.insert(*object_id, VOLUME_DELETED);
            }
        }

        let root_store = ObjectStore::open(
            &root_parent,
            super_block.root_store_object_id,
            Box::new(NullCache {}),
        )
        .await
        .context("Unable to open root store")?;

        ensure!(
            !root_store.is_encrypted(),
            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
        );
        self.objects.set_root_store(root_store);

        let root_store_offset =
            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

        // Now replay the root store mutations that postdate the root store's last flush.
        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
            if checkpoint.file_offset < root_store_offset {
                continue;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root store mutations")?;
            }
        }

        allocator.open().await.context("Failed to open allocator")?;

        // Finally, replay all remaining mutations.
        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
        {
            self.objects
                .replay_mutations(
                    non_root_mutations,
                    &journal_offsets,
                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                    end_offset,
                )
                .await
                .context("Failed to replay mutations")?;
        }

        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

        let discarded_to =
            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
                Some(reader.journal_file_checkpoint().file_offset)
            } else {
                None
            };

        {
            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                    "journal replay cut short; journal finishes at {}, but super-block was \
                     written at {}",
                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
                )));
            }
            let handle = ObjectStore::open_object(
                &root_parent,
                super_block.journal_object_id,
                journal_handle_options(),
                None,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to open journal file (object id: {})",
                    super_block.journal_object_id
                )
            })?;
            let _ = self.handle.set(handle);
            let mut inner = self.inner.lock();
            reader.skip_to_end_of_block();
            let mut writer_checkpoint = reader.journal_file_checkpoint();

            std::mem::drop(reader);

            // XOR-ing the checksum marks a reset: blocks written before this point no longer
            // validate against the new seed, and future replays will see a reset here.
            writer_checkpoint.checksum ^= RESET_XOR;
            writer_checkpoint.version = LATEST_VERSION;
            inner.flushed_offset = writer_checkpoint.file_offset;

            // Everything up to `flushed_offset` was just read from the device, so it is by
            // definition flushed.
            inner.device_flushed_offset = inner.flushed_offset;

            inner.writer.seek(writer_checkpoint);
            inner.output_reset_version = true;
            inner.valid_to = last_checkpoint.file_offset;
            if last_checkpoint.file_offset < inner.flushed_offset {
                inner.discard_offset = Some(last_checkpoint.file_offset);
            }
        }

        self.objects
            .on_replay_complete()
            .await
            .context("Failed to complete replay for object manager")?;

        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
        Ok(())
    }

    async fn read_transactions(
        &self,
        reader: &mut JournalReader,
        end_offset: Option<u64>,
        object_id_filter: u64,
    ) -> Result<JournaledTransactions, Error> {
        let mut transactions = Vec::new();
        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
            let super_block = &self.inner.lock().super_block_header;
            (
                super_block.super_block_journal_file_offset,
                super_block.root_parent_store_object_id,
                super_block.root_store_object_id,
            )
        };
        let mut current_transaction = None;
        let mut begin_flush_offsets = HashMap::default();
        let mut stores_deleted = HashSet::new();
        loop {
            let checkpoint = reader.journal_file_checkpoint();
            if let Some(end_offset) = end_offset {
                if checkpoint.file_offset >= end_offset {
                    break;
                }
            }
            let result =
                reader.deserialize().await.context("Failed to deserialize journal record")?;
            match result {
                ReadResult::Reset(_) => {
                    if current_transaction.is_some() {
                        current_transaction = None;
                        transactions.pop();
                    }
                    let offset = reader.journal_file_checkpoint().file_offset;
                    if offset > device_flushed_offset {
                        device_flushed_offset = offset;
                    }
                }
                ReadResult::Some(record) => {
                    match record {
                        JournalRecord::EndBlock => {
                            reader.skip_to_end_of_block();
                        }
                        JournalRecord::Mutation { object_id, mutation } => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };

                            if stores_deleted.contains(&object_id) {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Encountered mutations for deleted store")
                                );
                            }

                            match &mutation {
                                Mutation::BeginFlush => {
                                    begin_flush_offsets.insert(
                                        object_id,
                                        current_transaction.checkpoint.file_offset,
                                    );
                                }
                                Mutation::EndFlush => {
                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
                                        if let Some(deleted_volume) =
                                            &current_transaction.volume_deleted
                                        {
                                            if *deleted_volume == object_id {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    "Multiple EndFlush/DeleteVolume mutations in a \
                                                     single transaction for the same object"
                                                ));
                                            }
                                        }
                                        // `offset + 1` is just past the start of the transaction
                                        // containing the BeginFlush, so that transaction is
                                        // itself treated as flushed.
                                        if current_transaction
                                            .end_flush
                                            .replace((object_id, offset + 1))
                                            .is_some()
                                        {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush mutations in a \
                                                 single transaction"
                                            ));
                                        }
                                    }
                                }
                                Mutation::DeleteVolume => {
                                    if let Some((flushed_object, _)) =
                                        &current_transaction.end_flush
                                    {
                                        if *flushed_object == object_id {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush/DeleteVolume mutations in a \
                                                 single transaction for the same object"
                                            ));
                                        }
                                    }
                                    if current_transaction
                                        .volume_deleted
                                        .replace(object_id)
                                        .is_some()
                                    {
                                        bail!(anyhow!(FxfsError::Inconsistent).context(
                                            "Multiple DeleteVolume mutations in a single \
                                             transaction"
                                        ));
                                    }
                                    stores_deleted.insert(object_id);
                                }
                                _ => {}
                            }

                            // Queue the mutation against the store it belongs to, unless a
                            // filter is in effect.
                            if (object_id_filter == INVALID_OBJECT_ID
                                || object_id_filter == object_id)
                                && self.should_apply(object_id, &current_transaction.checkpoint)
                            {
                                if object_id == root_parent_store_object_id {
                                    current_transaction.root_parent_mutations.push(mutation);
                                } else if object_id == root_store_object_id {
                                    current_transaction.root_mutations.push(mutation);
                                } else {
                                    current_transaction
                                        .non_root_mutations
                                        .push((object_id, mutation));
                                }
                            }
                        }
                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };
                            current_transaction.checksums.push(JournaledChecksums {
                                device_range,
                                checksums,
                                first_write,
                            });
                        }
                        JournalRecord::Commit => {
                            if let Some(&mut JournaledTransaction {
                                ref checkpoint,
                                ref root_parent_mutations,
                                ref mut end_offset,
                                ..
                            }) = current_transaction.take()
                            {
                                for mutation in root_parent_mutations {
                                    // Watch for extents belonging to the journal file itself so
                                    // that the reader can follow the journal as it grows.
                                    if let Mutation::ObjectStore(ObjectStoreMutation {
                                        item:
                                            Item {
                                                key:
                                                    ObjectKey {
                                                        object_id,
                                                        data:
                                                            ObjectKeyData::Attribute(
                                                                DEFAULT_DATA_ATTRIBUTE_ID,
                                                                AttributeKey::Extent(ExtentKey {
                                                                    range,
                                                                }),
                                                            ),
                                                        ..
                                                    },
                                                value:
                                                    ObjectValue::Extent(ExtentValue::Some {
                                                        device_offset,
                                                        ..
                                                    }),
                                                ..
                                            },
                                        ..
                                    }) = mutation
                                    {
                                        let handle = reader.handle();
                                        if *object_id != handle.object_id() {
                                            continue;
                                        }
                                        if let Some(end_offset) = handle.end_offset() {
                                            if range.start != end_offset {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    format!(
                                                        "Unexpected journal extent {:?} -> {}, \
                                                         expected start: {}",
                                                        range, device_offset, end_offset,
                                                    )
                                                ));
                                            }
                                        }
                                        handle.push_extent(
                                            checkpoint.file_offset,
                                            *device_offset
                                                ..*device_offset
                                                    + range.length().context("Invalid extent")?,
                                        );
                                    }
                                }
                                *end_offset = reader.journal_file_checkpoint().file_offset;
                            }
                        }
                        JournalRecord::Discard(offset) => {
                            if offset == 0 {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Invalid offset for Discard")
                                );
                            }
                            if let Some(transaction) = current_transaction.as_ref() {
                                if transaction.checkpoint.file_offset < offset {
                                    // The current transaction started before the discard point,
                                    // so it survives.
                                    continue;
                                }
                            }
                            current_transaction = None;
                            while let Some(transaction) = transactions.last() {
                                if transaction.checkpoint.file_offset < offset {
                                    break;
                                }
                                transactions.pop();
                            }
                            reader.handle().discard_extents(offset);
                        }
                        JournalRecord::DidFlushDevice(offset) => {
                            if offset > device_flushed_offset {
                                device_flushed_offset = offset;
                            }
                        }
                    }
                }
                // This is expected when we reach the end of the journal stream.
                ReadResult::ChecksumMismatch => break,
            }
        }

        // Discard any uncommitted trailing transaction.
        if current_transaction.is_some() {
            transactions.pop();
        }

        Ok(JournaledTransactions { transactions, device_flushed_offset })
    }

    /// Creates an empty filesystem with the minimum set of objects (root parent store, root
    /// store, allocator, super-blocks and journal file).
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        // Object IDs for the initial structures. The super-block object IDs (see
        // `SuperBlockInstance`) come first, so these start after them.
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        let mut current_generation = 1;
        if filesystem.options().image_builder_mode.is_some() {
            // When building an image over an existing filesystem, bump the generation past the
            // old super-block's so that the new super-block takes precedence.
            let block_size = filesystem.block_size();
            match self.read_superblocks(filesystem.device(), block_size).await {
                Ok((super_block, _)) => {
                    log::info!(
                        "Found existing superblock with generation {}. Bumping by 1.",
                        super_block.generation
                    );
                    current_generation = super_block.generation.wrapping_add(1);
                }
                Err(_) => {
                    // No readable super-block; assume a fresh device and start at generation 1.
                }
            }
        }

        let root_parent = ObjectStore::new_empty(
            None,
            INIT_ROOT_PARENT_STORE_OBJECT_ID,
            filesystem.clone(),
            Box::new(NullCache {}),
        );
        self.objects.set_root_parent_store(root_parent.clone());

        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
        self.objects.set_allocator(allocator.clone());
        self.objects.init_metadata_reservation()?;

        let journal_handle;
        let super_block_a_handle;
        let super_block_b_handle;
        let root_store;
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        root_store = root_parent
            .new_child_store(
                &mut transaction,
                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
                Box::new(NullCache {}),
            )
            .await
            .context("new_child_store")?;
        self.objects.set_root_store(root_store.clone());

        allocator.create(&mut transaction).await?;

        // Create the super-block objects...
        super_block_a_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::A.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
        super_block_a_handle
            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
            .await
            .context("extend super block")?;
        super_block_b_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::B.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
        super_block_b_handle
            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
            .await
            .context("extend super block")?;

        // ... and the journal file.
        journal_handle = ObjectStore::create_object(
            &root_parent,
            &mut transaction,
            journal_handle_options(),
            None,
        )
        .await
        .context("create journal")?;
        if self.inner.lock().image_builder_mode.is_none() {
            let mut file_range = 0..self.chunk_size();
            journal_handle
                .preallocate_range(&mut transaction, &mut file_range)
                .await
                .context("preallocate journal")?;
            if file_range.start < file_range.end {
                bail!("preallocate_range returned too little space");
            }
        }

        // Write the root store object info.
        root_store.create(&mut transaction).await?;

        // Create the root parent's graveyard.
        root_parent.set_graveyard_directory_object_id(
            Graveyard::create(&mut transaction, &root_parent).await?,
        );

        transaction.commit().await?;

        self.inner.lock().super_block_header = SuperBlockHeader::new(
            current_generation,
            root_parent.store_object_id(),
            root_parent.graveyard_directory_object_id(),
            root_store.store_object_id(),
            allocator.object_id(),
            journal_handle.object_id(),
            checkpoint,
            LATEST_VERSION,
        );

        let _ = self.handle.set(journal_handle);
        Ok(())
    }

    /// Allocates the journal file's initial chunk. Used when allocation was deferred in
    /// `init_empty` (i.e. in image-builder mode).
    pub async fn allocate_journal(&self) -> Result<(), Error> {
        let handle = self.handle.get().unwrap();
        let filesystem = handle.store().filesystem();
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id())],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        let mut file_range = 0..self.chunk_size();
        self.handle
            .get()
            .unwrap()
            .preallocate_range(&mut transaction, &mut file_range)
            .await
            .context("preallocate journal")?;
        if file_range.start < file_range.end {
            bail!("preallocate_range returned too little space");
        }
        transaction.commit().await?;
        Ok(())
    }

    pub async fn init_superblocks(&self) -> Result<(), Error> {
        // Write both super-blocks so that both slots hold valid data.
        for _ in 0..2 {
            self.write_super_block().await?;
        }
        Ok(())
    }

    /// Returns the journaled transactions for the given object that postdate its last flush.
    pub async fn read_transactions_for_object(
        &self,
        object_id: u64,
    ) -> Result<Vec<JournaledTransaction>, Error> {
        let handle = self.handle.get().expect("No journal handle");
        // Reopen the journal object to get an independent read cursor.
        let handle = ObjectStore::open_object(
            handle.owner(),
            handle.object_id(),
            journal_handle_options(),
            None,
        )
        .await?;

        let checkpoint = match self.objects.journal_checkpoint(object_id) {
            Some(checkpoint) => checkpoint,
            None => return Ok(vec![]),
        };
        let mut reader = JournalReader::new(handle, &checkpoint);
        // Stop reading at the offset known to be valid so we don't read partially-flushed data.
        let end_offset = self.inner.lock().valid_to;
        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
    }

    /// Commits a transaction, returning the checkpoint file offset it was written at.
    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
        if transaction.is_empty() {
            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
        }

        self.pre_commit(transaction).await?;
        Ok(self.write_and_apply_mutations(transaction))
    }

    /// Before writing a transaction, writes any pending housekeeping records (version resets,
    /// discards, device-flush notices) and makes sure the journal file has enough space,
    /// extending and zeroing it as required.
    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
        let handle;

        let (size, zero_offset) = {
            let mut inner = self.inner.lock();

            // If this is the first write after a journal reset, the new version must come first.
            if std::mem::take(&mut inner.output_reset_version) {
                LATEST_VERSION.serialize_into(&mut inner.writer)?;
            }

            if let Some(discard_offset) = inner.discard_offset {
                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
                inner.discard_offset = None;
            }

            if inner.needs_did_flush_device {
                let offset = inner.device_flushed_offset;
                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
                inner.needs_did_flush_device = false;
            }

            handle = match self.handle.get() {
                None => return Ok(()),
                Some(x) => x,
            };

            let file_offset = inner.writer.journal_file_checkpoint().file_offset;

            let size = handle.get_size();
            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };

            if size.is_none()
                && inner.zero_offset.is_none()
                && !self.objects.needs_borrow_for_journal(file_offset)
            {
                return Ok(());
            }

            (size, inner.zero_offset)
        };

        let mut transaction = handle
            .new_transaction_with_options(Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.objects.metadata_reservation()),
                txn_guard: Some(transaction.txn_guard()),
                ..Default::default()
            })
            .await?;
        if let Some(size) = size {
            handle
                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
                .await?;
        }
        if let Some(zero_offset) = zero_offset {
            handle.zero(&mut transaction, 0..zero_offset).await?;
        }

        // We can't use the regular transaction commit path here because of re-entrancy, so apply
        // the transaction ourselves.
        self.write_and_apply_mutations(&mut transaction);

        let mut inner = self.inner.lock();

        // The records for the extension transaction itself must have fit within the previously
        // allocated space.
        if let Some(size) = size {
            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
        }

        if inner.zero_offset == zero_offset {
            inner.zero_offset = None;
        }

        Ok(())
    }

    /// Returns whether a mutation at the given checkpoint should be applied for `object_id`: it
    /// must postdate the object's last recorded flush (or the super-block, if none is recorded).
    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
        let super_block_header = &self.inner.lock().super_block_header;
        let offset = super_block_header
            .journal_file_offsets
            .get(&object_id)
            .cloned()
            .unwrap_or(super_block_header.super_block_journal_file_offset);
        journal_file_checkpoint.file_offset >= offset
    }

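    // Example: if the super-block records that store 42 was last flushed at journal offset
    // 65536, a mutation for store 42 journaled at offset 4096 is skipped during replay (its
    // effect is already captured in the store's persisted layers), while a mutation at offset
    // 65536 or later is applied.
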
    async fn write_super_block(&self) -> Result<(), Error> {
        let root_parent_store = self.objects.root_parent_store();

        // Previous writes must reach the device before the new super-block is written (it relies
        // on that data being observable), and the writer must be locked so that no new records
        // land while the root parent store is being captured.
        let old_layers;
        let old_super_block_offset;
        let mut new_super_block_header;
        let checkpoint;
        let borrowed;

        {
            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
            {
                let _write_guard = self.writer_mutex.lock();
                (checkpoint, borrowed) = self.pad_to_block()?;
                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
            }
            self.flush_device(checkpoint.file_offset)
                .await
                .context("flush failed when writing superblock")?;
        }

        new_super_block_header = self.inner.lock().super_block_header.clone();

        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;

        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();

        new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
        new_super_block_header.journal_file_offsets = journal_file_offsets;
        new_super_block_header.borrowed_metadata_space = borrowed;

        self.super_block_manager
            .save(
                new_super_block_header.clone(),
                self.objects.root_parent_store().filesystem(),
                old_layers,
            )
            .await?;
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = new_super_block_header;
            // The journal before the old super-block's checkpoint is no longer needed and can be
            // zeroed lazily.
            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
        }

        Ok(())
    }

    /// Flushes any buffered journal data to the journal file and, if `options.flush_device` is
    /// set, to the device. If `options.precondition` is supplied and returns false, the sync is
    /// skipped and `None` is returned; otherwise returns the checkpoint that was flushed and the
    /// amount of borrowed metadata space at that point.
    pub async fn sync(
        &self,
        options: SyncOptions<'_>,
    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());

        let (checkpoint, borrowed) = {
            if let Some(precondition) = options.precondition {
                if !precondition() {
                    return Ok(None);
                }
            }

            // This guard is required so that we don't insert an EndBlock record in the middle of
            // another transaction.
            let _guard = self.writer_mutex.lock();

            self.pad_to_block()?
        };

        if options.flush_device {
            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
        }

        Ok(Some((checkpoint, borrowed)))
    }

    /// Pads the journal out to the next block boundary (an EndBlock record followed by zero
    /// padding) and returns the checkpoint and the amount of borrowed metadata space.
    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
        let mut inner = self.inner.lock();
        let checkpoint = inner.writer.journal_file_checkpoint();
        if checkpoint.file_offset % BLOCK_SIZE != 0 {
            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
            inner.writer.pad_to_block()?;
            if let Some(waker) = inner.flush_waker.take() {
                waker.wake();
            }
        }
        Ok((checkpoint, self.objects.borrowed_metadata_space()))
    }

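    // Example of the padding arithmetic: with BLOCK_SIZE = 4096, a writer at file offset 8292
    // gets an EndBlock record plus zero padding so that the next record starts at 12288; a
    // writer already at 8192 (block-aligned) is left untouched.
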
    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
        assert!(
            self.inner.lock().image_builder_mode.is_none(),
            "flush_device called in image builder mode"
        );
        // Wait for the flush task to have written the journal out to the checkpoint.
        debug_assert_not_too_long!(poll_fn(|ctx| {
            let mut inner = self.inner.lock();
            if inner.flushed_offset >= checkpoint_offset {
                Poll::Ready(Ok(()))
            } else if inner.terminate {
                let context = inner
                    .terminate_reason
                    .as_ref()
                    .map(|e| format!("Journal closed with error: {:?}", e))
                    .unwrap_or_else(|| "Journal closed".to_string());
                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
            } else {
                inner.sync_waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }))?;

        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
        if needs_flush {
            let trace = self.trace.load(Ordering::Relaxed);
            if trace {
                info!("J: start flush device");
            }
            self.handle.get().unwrap().flush_device().await?;
            if trace {
                info!("J: end flush device");
            }

            {
                let mut inner = self.inner.lock();
                inner.device_flushed_offset = checkpoint_offset;
                // A DidFlushDevice record will be written with the next transaction.
                inner.needs_did_flush_device = true;
            }

            // Tell the allocator that we flushed the device so that it can now start using
            // space that was deallocated.
            self.objects.allocator().did_flush_device(checkpoint_offset);
            if trace {
                info!("J: did flush device");
            }
        }

        Ok(())
    }

    /// Returns a copy of the super-block header.
    pub fn super_block_header(&self) -> SuperBlockHeader {
        self.inner.lock().super_block_header.clone()
    }

    /// Returns once there is sufficient space in the journal, waiting for compactions to reclaim
    /// space if necessary.
    pub async fn check_journal_space(&self) -> Result<(), Error> {
        loop {
            debug_assert_not_too_long!({
                let inner = self.inner.lock();
                if inner.terminate {
                    let context = inner
                        .terminate_reason
                        .as_ref()
                        .map(|e| format!("Journal closed with error: {:?}", e))
                        .unwrap_or_else(|| "Journal closed".to_string());
                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
                }
                if self.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset
                    < inner.reclaim_size
                {
                    break Ok(());
                }
                if inner.image_builder_mode.is_some() {
                    break Ok(());
                }
                if inner.disable_compactions {
                    break Err(
                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
                    );
                }
                self.reclaim_event.listen()
            });
        }
    }

    fn chunk_size(&self) -> u64 {
        CHUNK_SIZE
    }

    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
        let checkpoint_before;
        let checkpoint_after;
        {
            let _guard = self.writer_mutex.lock();
            checkpoint_before = {
                let mut inner = self.inner.lock();
                if transaction.includes_write() {
                    inner.needs_barrier = true;
                }
                let checkpoint = inner.writer.journal_file_checkpoint();
                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
                    self.objects.write_mutation(
                        *object_id,
                        mutation,
                        Writer(*object_id, &mut inner.writer),
                    );
                }
                checkpoint
            };
            let maybe_mutation =
                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
                    "apply_transaction should not fail in live mode; \
                     filesystem will be in an inconsistent state",
                );
            checkpoint_after = {
                let mut inner = self.inner.lock();
                if let Some(mutation) = maybe_mutation {
                    inner
                        .writer
                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
                        .unwrap();
                }
                for (device_range, checksums, first_write) in
                    transaction.take_checksums().into_iter()
                {
                    inner
                        .writer
                        .write_record(&JournalRecord::DataChecksums(
                            device_range,
                            Checksums::fletcher(checksums),
                            first_write,
                        ))
                        .unwrap();
                }
                inner.writer.write_record(&JournalRecord::Commit).unwrap();

                inner.writer.journal_file_checkpoint()
            };
        }
        self.objects.did_commit_transaction(
            transaction,
            &checkpoint_before,
            checkpoint_after.file_offset,
        );

        if let Some(waker) = self.inner.lock().flush_waker.take() {
            waker.wake();
        }

        checkpoint_before.file_offset
    }

    /// Runs for the life of the filesystem: flushes buffered journal data to the journal file
    /// and kicks off compactions when the journal grows too large.
    pub async fn flush_task(self: Arc<Self>) {
        let mut flush_fut = None;
        let mut compact_fut = None;
        let mut flush_error = false;
        poll_fn(|ctx| {
            loop {
                {
                    let mut inner = self.inner.lock();
                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
                        let flushable = inner.writer.flushable_bytes();
                        if flushable > 0 {
                            flush_fut = Some(Box::pin(self.flush(flushable)));
                        }
                    }
                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
                        return Poll::Ready(());
                    }
                    // Start compacting at half the reclaim size; the space cannot actually be
                    // reclaimed until a subsequent device flush.
                    if compact_fut.is_none()
                        && !inner.terminate
                        && !inner.disable_compactions
                        && inner.image_builder_mode.is_none()
                        && self.objects.last_end_offset()
                            - inner.super_block_header.journal_checkpoint.file_offset
                            > inner.reclaim_size / 2
                    {
                        compact_fut = Some(Box::pin(self.compact()));
                        inner.compaction_running = true;
                    }
                    inner.flush_waker = Some(ctx.waker().clone());
                }
                let mut pending = true;
                if let Some(fut) = flush_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        if let Err(e) = result {
                            self.inner.lock().terminate(Some(e.context("Flush error")));
                            self.reclaim_event.notify(usize::MAX);
                            flush_error = true;
                        }
                        flush_fut = None;
                        pending = false;
                    }
                }
                if let Some(fut) = compact_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        let mut inner = self.inner.lock();
                        if let Err(e) = result {
                            inner.terminate(Some(e.context("Compaction error")));
                        }
                        compact_fut = None;
                        inner.compaction_running = false;
                        self.reclaim_event.notify(usize::MAX);
                        pending = false;
                    }
                }
                if pending {
                    return Poll::Pending;
                }
            }
        })
        .await;
    }

    pub fn get_compaction_yielder(&self) -> CompactionYielder<'_> {
        CompactionYielder::new(self)
    }

    async fn flush(&self, amount: usize) -> Result<(), Error> {
        let handle = self.handle.get().unwrap();
        let mut buf = handle.allocate_buffer(amount).await;
        let (offset, len, barrier_on_first_write) = {
            let mut inner = self.inner.lock();
            let offset = inner.writer.take_flushable(buf.as_mut());
            let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
            // The barrier issued with this write orders it after all preceding data writes, so
            // the flag can be cleared now.
            inner.needs_barrier = false;
            (offset, buf.len() as u64, barrier_on_first_write)
        };
        self.handle
            .get()
            .unwrap()
            .overwrite(
                offset,
                buf.as_mut(),
                OverwriteOptions { barrier_on_first_write, ..Default::default() },
            )
            .await?;

        let mut inner = self.inner.lock();
        if let Some(waker) = inner.sync_waker.take() {
            waker.wake();
        }
        inner.flushed_offset = offset + len;
        inner.valid_to = inner.flushed_offset;
        Ok(())
    }

    #[trace]
    async fn compact(&self) -> Result<(), Error> {
        assert!(
            self.inner.lock().image_builder_mode.is_none(),
            "compact called in image builder mode"
        );
        let bytes_before = self.objects.compaction_bytes_written();
        let _measure = crate::metrics::DurationMeasureScope::new(
            &crate::metrics::lsm_tree_metrics().journal_compaction_time,
        );
        crate::metrics::lsm_tree_metrics().journal_compactions_total.add(1);
        let trace = self.trace.load(Ordering::Relaxed);
        debug!("Compaction starting");
        if trace {
            info!("J: start compaction");
        }
        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
        self.inner.lock().super_block_header.earliest_version = earliest_version;
        self.write_super_block().await.context("Failed to write superblock")?;
        if trace {
            info!("J: end compaction");
        }
        debug!("Compaction finished");
        let bytes_after = self.objects.compaction_bytes_written();
        crate::metrics::lsm_tree_metrics()
            .journal_compaction_bytes_written
            .add(bytes_after.saturating_sub(bytes_before));
        Ok(())
    }

    /// Runs a compaction to completion without yielding.
    pub async fn force_compact(&self) -> Result<(), Error> {
        self.inner.lock().forced_compaction = true;
        scopeguard::defer! { self.inner.lock().forced_compaction = false; }
        self.compact().await
    }

    /// Waits for any in-flight compaction to finish and prevents any new compactions.
    pub async fn stop_compactions(&self) {
        loop {
            debug_assert_not_too_long!({
                let mut inner = self.inner.lock();
                inner.disable_compactions = true;
                if !inner.compaction_running {
                    return;
                }
                self.reclaim_event.listen()
            });
        }
    }

    /// Creates a lazy inspect node named `name` under `parent` which yields journal statistics
    /// when queried.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    let (journal_min, journal_max, journal_reclaim_size) = {
                        let inner = this.inner.lock();
                        (
                            round_down(
                                inner.super_block_header.journal_checkpoint.file_offset,
                                BLOCK_SIZE,
                            ),
                            inner.flushed_offset,
                            inner.reclaim_size,
                        )
                    };
                    let root = inspector.root();
                    root.record_uint("journal_min_offset", journal_min);
                    root.record_uint("journal_max_offset", journal_max);
                    root.record_uint("journal_size", journal_max - journal_min);
                    root.record_uint("journal_reclaim_size", journal_reclaim_size);

                    if let Some(x) = round_div(
                        100 * (journal_max - journal_min),
                        this.objects.allocator().get_disk_bytes(),
                    ) {
                        root.record_uint("journal_size_to_disk_size_percent", x);
                    }
                }
                Ok(inspector)
            }
            .boxed()
        });
    }

    /// Terminates all journal activity.
    pub fn terminate(&self) {
        self.inner.lock().terminate(None);
        self.reclaim_event.notify(usize::MAX);
    }
}

/// Wrapper that lets object managers journal mutations against a particular object ID.
pub struct Writer<'a>(u64, &'a mut JournalWriter);

impl Writer<'_> {
    pub fn write(&mut self, mutation: Mutation) {
        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
    }
}

#[cfg(target_os = "fuchsia")]
mod yielder {
    use super::Journal;
    use crate::lsm_tree::Yielder;
    use fuchsia_async as fasync;

    /// Yields during compactions when the system is busy, so that compactions don't starve
    /// higher-priority work, unless journal space is running low.
    pub struct CompactionYielder<'a> {
        journal: &'a Journal,
        low_priority_task: Option<fasync::LowPriorityTask>,
    }

    impl<'a> CompactionYielder<'a> {
        pub fn new(journal: &'a Journal) -> Self {
            Self { journal, low_priority_task: None }
        }
    }

    impl Yielder for CompactionYielder<'_> {
        async fn yield_now(&mut self) {
            // How long the system must be idle before the yield resolves, and an upper bound on
            // how long a single yield may take.
            const IDLE_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(4);
            const MAX_YIELD_DURATION: zx::MonotonicDuration =
                zx::MonotonicDuration::from_millis(16);

            {
                let inner = self.journal.inner.lock();
                if inner.forced_compaction {
                    return;
                }
                let outstanding = self.journal.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset;
                let half_reclaim_size = inner.reclaim_size / 2;
                if outstanding
                    .checked_sub(half_reclaim_size)
                    .is_some_and(|x| x >= half_reclaim_size / 2)
                {
                    // The journal is well past the compaction trigger point; stop yielding and
                    // let the compaction run at normal priority.
                    self.low_priority_task = None;
                    return;
                }
            }

            self.low_priority_task
                .get_or_insert_with(|| fasync::LowPriorityTask::new())
                .wait_until_idle_for(
                    IDLE_PERIOD,
                    fasync::MonotonicInstant::after(MAX_YIELD_DURATION),
                )
                .await;
        }
    }
}

#[cfg(not(target_os = "fuchsia"))]
mod yielder {
    use super::Journal;
    use crate::lsm_tree::Yielder;

    #[expect(dead_code)]
    pub struct CompactionYielder<'a>(&'a Journal);

    impl<'a> CompactionYielder<'a> {
        pub fn new(journal: &'a Journal) -> Self {
            Self(journal)
        }
    }

    impl Yielder for CompactionYielder<'_> {
        async fn yield_now(&mut self) {}
    }
}

pub use yielder::*;

2089#[cfg(test)]
2090mod tests {
2091 use super::SuperBlockInstance;
2092 use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
2093 use crate::fsck::fsck;
2094 use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
2095 use crate::object_store::directory::Directory;
2096 use crate::object_store::transaction::Options;
2097 use crate::object_store::volume::root_volume;
2098 use crate::object_store::{
2099 HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
2100 };
2101 use fuchsia_async as fasync;
2102 use fuchsia_async::MonotonicDuration;
2103 use storage_device::DeviceHolder;
2104 use storage_device::fake_device::FakeDevice;
2105
2106 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2107
2108 #[fuchsia::test]
2109 async fn test_replay() {
2110 const TEST_DATA: &[u8] = b"hello";
2111
2112 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2113
2114 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2115
2116 let object_id = {
2117 let root_store = fs.root_store();
2118 let root_directory =
2119 Directory::open(&root_store, root_store.root_directory_object_id())
2120 .await
2121 .expect("open failed");
2122 let mut transaction = fs
2123 .clone()
2124 .new_transaction(
2125 lock_keys![LockKey::object(
2126 root_store.store_object_id(),
2127 root_store.root_directory_object_id(),
2128 )],
2129 Options::default(),
2130 )
2131 .await
2132 .expect("new_transaction failed");
2133 let handle = root_directory
2134 .create_child_file(&mut transaction, "test")
2135 .await
2136 .expect("create_child_file failed");
2137
2138 transaction.commit().await.expect("commit failed");
2139 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2140 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2141 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2142 fs.sync(SyncOptions::default()).await.expect("sync failed");
2145 handle.object_id()
2146 };
2147
2148 {
2149 fs.close().await.expect("Close failed");
2150 let device = fs.take_device().await;
2151 device.reopen(false);
2152 let fs = FxFilesystem::open(device).await.expect("open failed");
2153 let handle = ObjectStore::open_object(
2154 &fs.root_store(),
2155 object_id,
2156 HandleOptions::default(),
2157 None,
2158 )
2159 .await
2160 .expect("open_object failed");
2161 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2162 assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
2163 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2164 fsck(fs.clone()).await.expect("fsck failed");
2165 fs.close().await.expect("Close failed");
2166 }
2167 }
2168
    #[fuchsia::test]
    async fn test_reset() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));

        let mut object_ids = Vec::new();

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());

            for i in 0..1000 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let handle = root_directory
                    .create_child_file(&mut transaction, &format!("{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                object_ids.push(handle.object_id());
            }
        }
        fs.close().await.expect("fs close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        {
            let root_store = fs.root_store();
            for &object_id in &object_ids[0..1] {
                let handle = ObjectStore::open_object(
                    &root_store,
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .expect("open_object failed");
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }

            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test2")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        {
            fsck(fs.clone()).await.expect("fsck failed");

            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
                let handle = ObjectStore::open_object(
                    &fs.root_store(),
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .unwrap_or_else(|e| {
                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
                });
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }
        }
        fs.close().await.expect("close failed");
    }
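    /// Snapshots the device while journal writes beyond the last compaction may still
    /// be in flight, then reopens the snapshot and writes again, exercising replay of
    /// a journal whose tail has to be discarded; fsck verifies the result.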
    #[fuchsia::test]
    async fn test_discard() {
        let device = {
            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let store = root_volume
                .new_volume("test", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("a {i}"))
                    .await
                    .expect("create_child_file failed");
                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
                    break;
                }
                i += 1;
            }

            fs.journal().force_compact().await.expect("compact failed");
            fs.journal().stop_compactions().await;

            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("b {i}"))
                    .await
                    .expect("create_child_file failed");
                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
                    break;
                }
                i += 1;
            }

            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
            fs.device().snapshot().expect("snapshot failed")
        };

        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let store =
                root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");

            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, "d")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }
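    /// Verifies super-block generation handling in image-builder mode: a freshly built
    /// image starts at generation 1, while rebuilding over an existing filesystem must
    /// pick a generation greater than the one already on disk.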
    #[fuchsia::test]
    async fn test_use_existing_generation() {
        let device = DeviceHolder::new(FakeDevice::new(8192, 4096));

        let fs = FxFilesystemBuilder::new()
            .format(true)
            .image_builder_mode(Some(SuperBlockInstance::A))
            .open(device)
            .await
            .expect("open failed");
        fs.enable_allocations();
        let generation0 = fs.super_block_header().generation;
        assert_eq!(generation0, 1);
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let generation1 = fs.super_block_header().generation;
        {
            let root_volume = crate::object_store::volume::root_volume(fs.clone())
                .await
                .expect("root_volume failed");
            root_volume
                .new_volume("test", crate::object_store::NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");
        }
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystemBuilder::new()
            .format(true)
            .image_builder_mode(Some(SuperBlockInstance::A))
            .open(device)
            .await
            .expect("open failed");
        fs.enable_allocations();
        let generation2 = fs.super_block_header().generation;
        assert!(
            generation2 > generation1,
            "generation2 ({}) should be greater than generation1 ({})",
            generation2,
            generation1
        );
        fs.close().await.expect("close failed");
    }
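    /// Same generation-bump check as `test_use_existing_generation`, but on a device
    /// with 512-byte blocks rather than 4096-byte blocks.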
    #[fuchsia::test]
    async fn test_image_builder_mode_generation_bump_512_byte_block() {
        let device = DeviceHolder::new(FakeDevice::new(16384, 512));

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let generation1 = fs.super_block_header().generation;
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystemBuilder::new()
            .format(true)
            .image_builder_mode(Some(SuperBlockInstance::A))
            .open(device)
            .await
            .expect("open failed");

        fs.enable_allocations();
        let generation2 = fs.super_block_header().generation;
        assert!(
            generation2 > generation1,
            "Expected generation bump, got {} vs {}",
            generation2,
            generation1
        );
        fs.close().await.expect("close failed");
    }
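    /// Fills the journal until background compaction kicks in while a normal-priority
    /// task stays busy, then checks that low-priority compaction keeps yielding while
    /// the executor is busy and only completes once the system goes idle.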
    #[fuchsia::test(allow_stalls = false)]
    #[cfg(target_os = "fuchsia")]
    async fn test_low_priority_compaction() {
        use fuchsia_async::TestExecutor;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .journal_options(super::JournalOptions { reclaim_size: 65536, ..Default::default() })
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        let _low = fasync::LowPriorityTask::new();

        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..100 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
            }
        });

        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                if fs.journal().inner.lock().compaction_running {
                    break;
                }
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                i += 1;
            }
        }

        for _ in 0..10 {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;
            assert!(fs.journal().inner.lock().compaction_running);
        }

        stop.store(true, Ordering::Relaxed);
        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
            1,
        )))
        .await;

        assert!(fs.journal().inner.lock().compaction_running);

        for _ in 0..3 {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;
            assert!(fs.journal().inner.lock().compaction_running);
        }

        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
            1,
        )))
        .await;

        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
        assert!(!fs.journal().inner.lock().compaction_running);

        fs.close().await.expect("Close failed");
    }
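    /// Like `test_low_priority_compaction`, but the normal-priority task never goes
    /// idle; compaction should still finish (after many yield periods) because each
    /// yield is bounded by a deadline rather than waiting indefinitely for idleness.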
    #[fuchsia::test(allow_stalls = false)]
    #[cfg(target_os = "fuchsia")]
    async fn test_low_priority_compaction_deadline() {
        use fuchsia_async::TestExecutor;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .journal_options(super::JournalOptions { reclaim_size: 65536, ..Default::default() })
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        let _low = fasync::LowPriorityTask::new();

        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..10 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(3),
                ))
                .await;
            }
        });

        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                if fs.journal().inner.lock().compaction_running {
                    break;
                }
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                i += 1;
            }
        }

        let mut count = 0;
        for _ in 0..1000 {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(20),
            ))
            .await;
            if !fs.journal().inner.lock().compaction_running {
                break;
            }
            count += 1;
        }
        assert!(!fs.journal().inner.lock().compaction_running);

        assert!(count > 200);

        stop.store(true, Ordering::Relaxed);
        fs.close().await.expect("Close failed");
    }
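    /// Checks that once the outstanding journal data approaches the reclaim threshold,
    /// compaction stops yielding and runs to completion even though the system is busy.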
    #[fuchsia::test(allow_stalls = false)]
    #[cfg(target_os = "fuchsia")]
    async fn test_low_priority_compaction_no_yielding_when_full() {
        use fuchsia_async::TestExecutor;
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let reclaim_size = 65536;
        let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystemBuilder::new()
            .journal_options(super::JournalOptions { reclaim_size, ..Default::default() })
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        let _low = fasync::LowPriorityTask::new();

        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..10 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(3),
                ))
                .await;
            }
        });

        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut i = 0;
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                let outstanding = {
                    let inner = fs.journal().inner.lock();
                    fs.journal().objects.last_end_offset()
                        - inner.super_block_header.journal_checkpoint.file_offset
                };
                if outstanding >= reclaim_size * 7 / 8 {
                    break;
                }
                i += 1;
            }
        }

        TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
            4,
        )))
        .await;

        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
        assert!(!fs.journal().inner.lock().compaction_running);

        stop.store(true, Ordering::Relaxed);
        fs.close().await.expect("Close failed");
    }
}
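// Fuzzers for journal replay: one feeds raw bytes and the other arbitrary (but
// structurally valid) `JournalRecord`s into the journal stream, then checks that
// replaying the resulting image does not panic, whether or not the filesystem
// opens successfully.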
#[cfg(fuzz)]
mod fuzz {
    use fuzz::fuzz;

    #[fuzz]
    fn fuzz_journal_bytes(input: Vec<u8>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use std::io::Write;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            if let Ok(fs) = FxFilesystem::open(device).await {
                let _ = fs.close().await;
            }
        });
    }

    #[fuzz]
    fn fuzz_journal(input: Vec<super::JournalRecord>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            {
                let mut inner = fs.journal().inner.lock();
                for record in &input {
                    let _ = inner.writer.write_record(record);
                }
            }
            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            if let Ok(fs) = FxFilesystem::open(device).await {
                let _ = fs.close().await;
            }
        });
    }
}