mod bootstrap_handle;
mod checksum_list;
mod reader;
pub mod super_block;
mod writer;

use crate::checksum::{Checksum, Checksums, ChecksumsV38};
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::types::Layer;
use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
use crate::object_store::allocator::Allocator;
use crate::object_store::data_object_handle::OverwriteOptions;
use crate::object_store::extent_record::{
    DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
};
use crate::object_store::graveyard::Graveyard;
use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
use crate::object_store::journal::checksum_list::ChecksumList;
use crate::object_store::journal::reader::{JournalReader, ReadResult};
use crate::object_store::journal::super_block::{
    SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
};
use crate::object_store::journal::writer::JournalWriter;
use crate::object_store::object_manager::ObjectManager;
use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
use crate::object_store::transaction::{
    AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
    MutationV47, MutationV49, MutationV50, MutationV54, MutationV55, ObjectStoreMutation, Options,
    TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
};
use crate::object_store::{
    AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
    NewChildStoreOptions, ObjectStore, ReservedId,
};
use crate::range::RangeExt;
use crate::round::{round_div, round_down};
use crate::serialized_types::{
    LATEST_VERSION, Migrate, Version, Versioned, migrate_nodefault, migrate_to_version,
};
use anyhow::{Context, Error, anyhow, bail, ensure};
use event_listener::Event;
use fprint::TypeFingerprint;
use fuchsia_inspect::NumericProperty;
use fuchsia_sync::Mutex;
use futures::FutureExt as _;
use futures::future::poll_fn;
use rustc_hash::FxHashMap as HashMap;
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::clone::Clone;
use std::collections::HashSet;
use std::num::NonZero;
use std::ops::{Bound, Range};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use std::task::{Poll, Waker};
use storage_device::Device;

/// The journal is written in blocks of this size.
pub const BLOCK_SIZE: u64 = 4096;

/// The journal file is preallocated (and grown) in chunks of this size.
const CHUNK_SIZE: u64 = 131_072;
// A chunk must be able to hold at least one maximally-sized transaction.
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

/// Default for `JournalOptions::reclaim_size`: once the journal exceeds roughly this size,
/// compactions are triggered so that journal space can be reclaimed.
pub const DEFAULT_RECLAIM_SIZE: u64 = 524_288;

/// Space that must be kept in reserve so the journal can always make progress.
pub const RESERVED_SPACE: u64 = 1_048_576;

/// After replay, the writer's checksum seed is XORed with this value so that records written
/// before the reset cannot be mistaken for valid records written after it.
const RESET_XOR: u64 = 0xffffffffffffffff;

pub type JournalCheckpoint = JournalCheckpointV32;

/// A checkpoint identifies a point in the journal stream.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    /// Offset within the journal file.
    pub file_offset: u64,

    /// Checksum seed for the block that contains `file_offset`.
    pub checksum: Checksum,

    /// Version of the journal stream at this point; this can change across reset events.
    pub version: Version,
}

pub type JournalRecord = JournalRecordV55;

#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV55 {
    /// Marks the end of a block; the reader skips to the next block boundary.
    EndBlock,
    /// A mutation for the identified object store.
    Mutation {
        object_id: u64,
        mutation: MutationV55,
    },
    /// Commits the preceding mutations as a single transaction.
    Commit,
    /// Discards all journal contents from the given journal offset onwards.
    Discard(u64),
    /// Records that the device was flushed up to the given journal offset.
    DidFlushDevice(u64),
    /// Checksums for a device range written by the transaction; the bool records whether this
    /// was the first write to the range.
    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV55)]
#[migrate_nodefault]
pub enum JournalRecordV54 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV54 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV54)]
#[migrate_nodefault]
pub enum JournalRecordV50 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV50 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}

#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

impl From<JournalRecordV41> for JournalRecordV42 {
    fn from(record: JournalRecordV41) -> Self {
        match record {
            JournalRecordV41::EndBlock => Self::EndBlock,
            JournalRecordV41::Mutation { object_id, mutation } => {
                Self::Mutation { object_id, mutation: mutation.into() }
            }
            JournalRecordV41::Commit => Self::Commit,
            JournalRecordV41::Discard(offset) => Self::Discard(offset),
            JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
            JournalRecordV41::DataChecksums(range, sums) => {
                // V41 had no first_write flag; migrated records assume `true`.
                Self::DataChecksums(range, sums, true)
            }
        }
    }
}

#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}

pub(super) fn journal_handle_options() -> HandleOptions {
    HandleOptions { skip_journal_checks: true, ..Default::default() }
}

/// The journal records all mutations to the filesystem so that they can be replayed after an
/// unclean shutdown; it is also responsible for writing the super-blocks that anchor replay.
pub struct Journal {
    objects: Arc<ObjectManager>,
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    super_block_manager: SuperBlockManager,
    inner: Mutex<Inner>,
    writer_mutex: Mutex<()>,
    sync_mutex: futures::lock::Mutex<()>,
    trace: AtomicBool,

    /// Notified when a compaction might have freed up journal space.
    reclaim_event: Event,
}

struct Inner {
    super_block_header: SuperBlockHeader,

    /// If set, the journal file should be zeroed up to this offset; the data is stale.
    zero_offset: Option<u64>,

    /// The journal offset up to which the device has been flushed.
    device_flushed_offset: u64,

    /// If true, a DidFlushDevice record is due to be written.
    needs_did_flush_device: bool,

    /// The writer for the journal.
    writer: JournalWriter,

    /// Set after replay; a version record should be written before the next journal record.
    output_reset_version: bool,

    /// Waker for the flush task.
    flush_waker: Option<Waker>,

    /// Indicates the journal has been terminated.
    terminate: bool,

    /// The first error that caused termination, if any.
    terminate_reason: Option<Error>,

    /// Disables compactions (e.g. during shutdown).
    disable_compactions: bool,

    /// True while a compaction is running.
    compaction_running: bool,

    /// Waker for callers waiting in flush_device.
    sync_waker: Option<Waker>,

    /// The journal offset that has been flushed to the journal file.
    flushed_offset: u64,

    /// The journal offset up to which journal contents are known to be valid; used to bound
    /// reads of the journal after replay.
    valid_to: u64,

    /// If set, a Discard record for this offset should be written before the next transaction.
    discard_offset: Option<u64>,

    /// See JournalOptions.
    reclaim_size: u64,

    /// When set, the journal is in image-builder mode and this is the super-block instance that
    /// will be written.
    image_builder_mode: Option<SuperBlockInstance>,

    /// See JournalOptions.
    barriers_enabled: bool,

    /// True if a data write has occurred since the last journal flush, in which case the next
    /// journal write must be preceded by a barrier (when barriers are enabled).
    needs_barrier: bool,

    /// True while an explicitly requested (forced) compaction is running.
    forced_compaction: bool,
}

impl Inner {
    fn terminate(&mut self, reason: Option<Error>) {
        self.terminate = true;

        if let Some(err) = reason {
            error!(error:? = err; "Terminating journal");
            // Only the first termination reason is kept; later ones are just logged.
            if let Some(prev_err) = self.terminate_reason.as_ref() {
                error!(error:? = prev_err; "Journal previously terminated");
            } else {
                self.terminate_reason = Some(err);
            }
        }

        if let Some(waker) = self.flush_waker.take() {
            waker.wake();
        }
        if let Some(waker) = self.sync_waker.take() {
            waker.wake();
        }
    }
}

pub struct JournalOptions {
    /// Compactions are triggered, and the journal reclaimed, once it exceeds roughly this size.
    pub reclaim_size: u64,

    /// If true, journal writes that follow data writes are preceded by a write barrier so the
    /// device preserves the required ordering.
    pub barriers_enabled: bool,
}

impl Default for JournalOptions {
    fn default() -> Self {
        JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
    }
}
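
// Usage sketch (hypothetical values): callers normally take `JournalOptions::default()` and
// override individual fields, e.g. to force more frequent compactions in a test:
//
//     let options = JournalOptions { reclaim_size: 65_536, ..Default::default() };
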
struct JournaledTransactions {
    transactions: Vec<JournaledTransaction>,
    device_flushed_offset: u64,
}

#[derive(Debug, Default)]
pub struct JournaledTransaction {
    pub checkpoint: JournalCheckpoint,
    pub root_parent_mutations: Vec<Mutation>,
    pub root_mutations: Vec<Mutation>,
    /// Mutations for all other stores, paired with the store's object id.
    pub non_root_mutations: Vec<(u64, Mutation)>,
    pub end_offset: u64,
    pub checksums: Vec<JournaledChecksums>,

    /// Set when the transaction contains an EndFlush mutation: the flushed store's object id
    /// and the journal offset from which its mutations must still be applied.
    pub end_flush: Option<(u64, u64)>,

    /// Set to the store's object id when the transaction contains a DeleteVolume mutation.
    pub volume_deleted: Option<u64>,
}

impl JournaledTransaction {
    fn new(checkpoint: JournalCheckpoint) -> Self {
        Self { checkpoint, ..Default::default() }
    }
}

/// Sentinel journal offset recorded for stores that have been deleted.
const VOLUME_DELETED: u64 = u64::MAX;

#[derive(Debug)]
pub struct JournaledChecksums {
    pub device_range: Range<u64>,
    pub checksums: Checksums,
    pub first_write: bool,
}

/// A handle to the journal file which can have extents pushed into it as they are discovered
/// during replay.
pub trait JournalHandle: ReadObjectHandle {
    /// The furthest known offset of the journal file, if bounded; `None` means the handle
    /// already knows about all extents.
    fn end_offset(&self) -> Option<u64>;
    /// Adds an extent backing the journal file. `added_offset` is the journal offset at which
    /// the extent was added, which is used to support discarding extents again.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Discards any extents that were added at journal offsets at or beyond `discard_offset`.
    fn discard_extents(&mut self, discard_offset: u64);
}

// For DataObjectHandle, extents are tracked by the object store, so these methods have nothing
// to do.
impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
    fn end_offset(&self) -> Option<u64> {
        None
    }
    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
        // Nothing to do; the object store already tracks the extents.
    }
    fn discard_extents(&mut self, _discard_offset: u64) {
        // Nothing to do; the object store already tracks the extents.
    }
}

#[fxfs_trace::trace]
impl Journal {
    pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
        // Seed the writer with a random non-zero starting checksum.
        let starting_checksum = rand::random_range(1..u64::MAX);
        Journal {
            objects,
            handle: OnceLock::new(),
            super_block_manager: SuperBlockManager::new(),
            inner: Mutex::new(Inner {
                super_block_header: SuperBlockHeader::default(),
                zero_offset: None,
                device_flushed_offset: 0,
                needs_did_flush_device: false,
                writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
                output_reset_version: false,
                flush_waker: None,
                terminate: false,
                terminate_reason: None,
                disable_compactions: false,
                compaction_running: false,
                sync_waker: None,
                flushed_offset: 0,
                valid_to: 0,
                discard_offset: None,
                reclaim_size: options.reclaim_size,
                image_builder_mode: None,
                barriers_enabled: options.barriers_enabled,
                needs_barrier: false,
                forced_compaction: false,
            }),
            writer_mutex: Mutex::new(()),
            sync_mutex: futures::lock::Mutex::new(()),
            trace: AtomicBool::new(false),
            reclaim_event: Event::new(),
        }
    }
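
    // Construction sketch (hypothetical wiring): the filesystem creates the journal at mount
    // time and hands it the shared object manager:
    //
    //     let journal = Journal::new(objects.clone(), JournalOptions::default());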

    pub fn set_trace(&self, trace: bool) {
        let old_value = self.trace.swap(trace, Ordering::Relaxed);
        if trace != old_value {
            info!(trace; "J: trace");
        }
    }

    pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
        self.inner.lock().image_builder_mode = mode;
        if let Some(instance) = mode {
            *self.super_block_manager.next_instance.lock() = instance;
        }
    }

    pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
        self.inner.lock().image_builder_mode
    }

    #[cfg(feature = "migration")]
    pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
        ensure!(
            self.inner.lock().image_builder_mode.is_some(),
            "Can only set filesystem uuid in image builder mode."
        );
        self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
        Ok(())
    }

    pub(crate) async fn read_superblocks(
        &self,
        device: Arc<dyn Device>,
        block_size: u64,
    ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
        self.super_block_manager.load(device, block_size).await
    }

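    // `validate_mutation` sanity-checks mutations read back during replay before they are
    // applied. As a worked example of the COW checksum invariant below (assuming the usual
    // 4096-byte block size): a 16 KiB extent carrying 4 checksums covers 16384 / 4 = 4096
    // bytes per checksum, which is block-aligned, so it passes; the same extent with 3
    // checksums fails the divisibility check.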
    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
        // Returns false if the mutation is obviously malformed: bad alignment, out-of-range
        // device offsets, or invalid owner object ids.
        match mutation {
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    Item {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(
                                        _,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                                ..
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
                        ..
                    },
                ..
            }) => {
                if range.is_empty() || !range.is_aligned(block_size) {
                    return false;
                }
                let len = range.length().unwrap();
                if let ExtentMode::Cow(checksums) = mode {
                    if checksums.len() > 0 {
                        // Each checksum must cover a whole, block-aligned slice of the extent.
                        if len % checksums.len() as u64 != 0 {
                            return false;
                        }
                        if (len / checksums.len() as u64) % block_size != 0 {
                            return false;
                        }
                    }
                }
                if *device_offset % block_size != 0
                    || *device_offset >= device_size
                    || device_size - *device_offset < len
                {
                    return false;
                }
            }
            Mutation::ObjectStore(_) => {}
            Mutation::EncryptedObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::BeginFlush => {}
            Mutation::EndFlush => {}
            Mutation::DeleteVolume => {}
            Mutation::UpdateBorrowed(_) => {}
            Mutation::UpdateMutationsKey(_) => {}
            Mutation::CreateInternalDir(owner_object_id) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
        }
        true
    }

    // Records checksum bookkeeping implied by a replayed mutation; in particular, deallocated
    // ranges must not have their old checksums verified.
    fn update_checksum_list(
        &self,
        journal_offset: u64,
        mutation: &Mutation,
        checksum_list: &mut ChecksumList,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::ObjectStore(_) => {}
            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
                checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
            }
            _ => {}
        }
        Ok(())
    }

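    // Replay proceeds in phases: load the newest super-block, bootstrap a handle to the
    // journal file from the extents recorded in the root parent store, read and validate every
    // journaled transaction (stopping at the first bad mutation or checksum mismatch), then
    // apply mutations in dependency order: the root parent store first, then the root store,
    // then all other stores. Finally, the writer is positioned at a fresh block past the
    // replayed data.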
    #[trace]
    pub async fn replay(
        &self,
        filesystem: Arc<FxFilesystem>,
        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    ) -> Result<(), Error> {
        let block_size = filesystem.block_size();

        let (super_block, root_parent) =
            self.super_block_manager.load(filesystem.device(), block_size).await?;

        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

        self.objects.set_root_parent_store(root_parent.clone());
        let allocator =
            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
        if let Some(on_new_allocator) = on_new_allocator {
            on_new_allocator(allocator.clone());
        }
        self.objects.set_allocator(allocator.clone());
        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = super_block.clone();
        }

        let device = filesystem.device();

        // Build a bootstrap handle to the journal file from the extents recorded in the root
        // parent store, starting at the block containing the super-block's checkpoint.
        let mut handle;
        {
            let root_parent_layer = root_parent.tree().mutable_layer();
            let mut iter = root_parent_layer
                .seek(Bound::Included(&ObjectKey::attribute(
                    super_block.journal_object_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
                        super_block.journal_checkpoint.file_offset,
                        BLOCK_SIZE,
                    ))),
                )))
                .await
                .context("Failed to seek root parent store")?;
            let start_offset = if let Some(ItemRef {
                key:
                    ObjectKey {
                        data:
                            ObjectKeyData::Attribute(
                                DEFAULT_DATA_ATTRIBUTE_ID,
                                AttributeKey::Extent(ExtentKey { range }),
                            ),
                        ..
                    },
                ..
            }) = iter.get()
            {
                range.start
            } else {
                0
            };
            handle = BootstrapObjectHandle::new_with_start_offset(
                super_block.journal_object_id,
                device.clone(),
                start_offset,
            );
            while let Some(item) = iter.get() {
                if !match item.into() {
                    Some((
                        object_id,
                        DEFAULT_DATA_ATTRIBUTE_ID,
                        ExtentKey { range },
                        ExtentValue::Some { device_offset, .. },
                    )) if object_id == super_block.journal_object_id => {
                        if let Some(end_offset) = handle.end_offset() {
                            if range.start != end_offset {
                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                    "Unexpected journal extent {:?}, expected start: {}",
                                    item, end_offset
                                )));
                            }
                        }
                        handle.push_extent(
                            // Extents recorded in the super-block are never discarded.
                            0,
                            *device_offset
                                ..*device_offset + range.length().context("Invalid extent")?,
                        );
                        true
                    }
                    _ => false,
                } {
                    break;
                }
                iter.advance().await.context("Failed to advance root parent store iterator")?;
            }
        }

        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
        let JournaledTransactions { mut transactions, device_flushed_offset } = self
            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
            .await
            .context("Reading transactions for replay")?;

        // Validate the mutations and collect the data checksums before applying anything.
        let mut checksum_list = ChecksumList::new(device_flushed_offset);
        let mut valid_to = reader.journal_file_checkpoint().file_offset;
        let device_size = device.size();
        'bad_replay: for JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            root_mutations,
            non_root_mutations,
            checksums,
            ..
        } in &transactions
        {
            for JournaledChecksums { device_range, checksums, first_write } in checksums {
                checksum_list
                    .push(
                        checkpoint.file_offset,
                        device_range.clone(),
                        checksums.maybe_as_ref().context("Malformed checksums")?,
                        *first_write,
                    )
                    .context("Pushing journal checksum records to checksum list")?;
            }
            for mutation in root_parent_mutations
                .iter()
                .chain(root_mutations)
                .chain(non_root_mutations.iter().map(|(_, m)| m))
            {
                if !self.validate_mutation(mutation, block_size, device_size) {
                    info!(mutation:?; "Stopping replay at bad mutation");
                    valid_to = checkpoint.file_offset;
                    break 'bad_replay;
                }
                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
            }
        }

        // Verify the data checksums; this can move `valid_to` backwards if a data write never
        // made it to the device.
        let valid_to = checksum_list
            .verify(device.as_ref(), valid_to)
            .await
            .context("Failed to validate checksums")?;

        // Apply the root parent store's mutations first; they are needed to open everything
        // else. `journal_offsets` tracks, per store, the offset from which mutations must be
        // applied (updated by EndFlush and DeleteVolume).
        let mut last_checkpoint = reader.journal_file_checkpoint();
        let mut journal_offsets = super_block.journal_file_offsets.clone();

        for (
            index,
            JournaledTransaction {
                checkpoint,
                root_parent_mutations,
                end_flush,
                volume_deleted,
                ..
            },
        ) in transactions.iter_mut().enumerate()
        {
            if checkpoint.file_offset >= valid_to {
                last_checkpoint = checkpoint.clone();

                // Discard this and all later transactions; they aren't valid.
                transactions.truncate(index);
                break;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_parent_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_parent_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root parent store mutations")?;
            }

            if let Some((object_id, journal_offset)) = end_flush {
                journal_offsets.insert(*object_id, *journal_offset);
            }

            if let Some(object_id) = volume_deleted {
                journal_offsets.insert(*object_id, VOLUME_DELETED);
            }
        }

        // The root parent store is now up to date, so the root store can be opened.
        let root_store = ObjectStore::open(
            &root_parent,
            super_block.root_store_object_id,
            Box::new(NullCache {}),
        )
        .await
        .context("Unable to open root store")?;

        ensure!(
            !root_store.is_encrypted(),
            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
        );
        self.objects.set_root_store(root_store);

        let root_store_offset =
            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

        // Now replay the root store's mutations.
        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
            if checkpoint.file_offset < root_store_offset {
                continue;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root store mutations")?;
            }
        }

        allocator.open().await.context("Failed to open allocator")?;

        // Finally, replay the mutations for all the other stores.
        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
        {
            self.objects
                .replay_mutations(
                    non_root_mutations,
                    &journal_offsets,
                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                    end_offset,
                )
                .await
                .context("Failed to replay mutations")?;
        }

        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

        let discarded_to =
            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
                Some(reader.journal_file_checkpoint().file_offset)
            } else {
                None
            };

        // Configure the journal writer so that writing can continue at the next block boundary
        // past the replayed data.
        {
            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                    "journal replay cut short; journal finishes at {}, but super-block was \
                     written at {}",
                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
                )));
            }
            let handle = ObjectStore::open_object(
                &root_parent,
                super_block.journal_object_id,
                journal_handle_options(),
                None,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to open journal file (object id: {})",
                    super_block.journal_object_id
                )
            })?;
            let _ = self.handle.set(handle);
            let mut inner = self.inner.lock();
            reader.skip_to_end_of_block();
            let mut writer_checkpoint = reader.journal_file_checkpoint();

            std::mem::drop(reader);

            // Apply a reset so that records before this point can't be mistaken for new ones.
            writer_checkpoint.checksum ^= RESET_XOR;
            writer_checkpoint.version = LATEST_VERSION;
            inner.flushed_offset = writer_checkpoint.file_offset;

            // After replay, treat the device as flushed up to the new writer position.
            inner.device_flushed_offset = inner.flushed_offset;

            inner.writer.seek(writer_checkpoint);
            inner.output_reset_version = true;
            inner.valid_to = last_checkpoint.file_offset;
            if last_checkpoint.file_offset < inner.flushed_offset {
                inner.discard_offset = Some(last_checkpoint.file_offset);
            }
        }

        self.objects
            .on_replay_complete()
            .await
            .context("Failed to complete replay for object manager")?;

        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
        Ok(())
    }

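    // `read_transactions` turns the raw record stream into a list of committed transactions:
    // records accumulate into the current transaction until a Commit seals it; a Reset or a
    // trailing checksum mismatch drops a partially-read transaction; and Discard(offset)
    // removes every transaction whose checkpoint is at or beyond `offset`, mirroring what the
    // writer threw away.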
    async fn read_transactions(
        &self,
        reader: &mut JournalReader,
        end_offset: Option<u64>,
        object_id_filter: u64,
    ) -> Result<JournaledTransactions, Error> {
        let mut transactions = Vec::new();
        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
            let super_block = &self.inner.lock().super_block_header;
            (
                super_block.super_block_journal_file_offset,
                super_block.root_parent_store_object_id,
                super_block.root_store_object_id,
            )
        };
        let mut current_transaction = None;
        let mut begin_flush_offsets = HashMap::default();
        let mut stores_deleted = HashSet::new();
        loop {
            // Take the checkpoint before deserializing so it points at the record's start.
            let checkpoint = reader.journal_file_checkpoint();
            if let Some(end_offset) = end_offset {
                if checkpoint.file_offset >= end_offset {
                    break;
                }
            }
            let result =
                reader.deserialize().await.context("Failed to deserialize journal record")?;
            match result {
                ReadResult::Reset(_) => {
                    // Drop any partially-read transaction.
                    if current_transaction.is_some() {
                        current_transaction = None;
                        transactions.pop();
                    }
                    // Treat the reset point as flushed.
                    let offset = reader.journal_file_checkpoint().file_offset;
                    if offset > device_flushed_offset {
                        device_flushed_offset = offset;
                    }
                }
                ReadResult::Some(record) => {
                    match record {
                        JournalRecord::EndBlock => {
                            reader.skip_to_end_of_block();
                        }
                        JournalRecord::Mutation { object_id, mutation } => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };

                            if stores_deleted.contains(&object_id) {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Encountered mutations for deleted store")
                                );
                            }

                            match &mutation {
                                Mutation::BeginFlush => {
                                    begin_flush_offsets.insert(
                                        object_id,
                                        current_transaction.checkpoint.file_offset,
                                    );
                                }
                                Mutation::EndFlush => {
                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
                                        if let Some(deleted_volume) =
                                            &current_transaction.volume_deleted
                                        {
                                            if *deleted_volume == object_id {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    "Multiple EndFlush/DeleteVolume mutations in a \
                                                     single transaction for the same object"
                                                ));
                                            }
                                        }
                                        // Replay for this store resumes just past the BeginFlush
                                        // offset.
                                        if current_transaction
                                            .end_flush
                                            .replace((object_id, offset + 1))
                                            .is_some()
                                        {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush mutations in a \
                                                 single transaction"
                                            ));
                                        }
                                    }
                                }
                                Mutation::DeleteVolume => {
                                    if let Some((flushed_object, _)) =
                                        &current_transaction.end_flush
                                    {
                                        if *flushed_object == object_id {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush/DeleteVolume mutations in a \
                                                 single transaction for the same object"
                                            ));
                                        }
                                    }
                                    if current_transaction
                                        .volume_deleted
                                        .replace(object_id)
                                        .is_some()
                                    {
                                        bail!(anyhow!(FxfsError::Inconsistent).context(
                                            "Multiple DeleteVolume mutations in a single \
                                             transaction"
                                        ));
                                    }
                                    stores_deleted.insert(object_id);
                                }
                                _ => {}
                            }

                            // Bucket the mutation by store so that replay can apply them in
                            // dependency order.
                            if (object_id_filter == INVALID_OBJECT_ID
                                || object_id_filter == object_id)
                                && self.should_apply(object_id, &current_transaction.checkpoint)
                            {
                                if object_id == root_parent_store_object_id {
                                    current_transaction.root_parent_mutations.push(mutation);
                                } else if object_id == root_store_object_id {
                                    current_transaction.root_mutations.push(mutation);
                                } else {
                                    current_transaction
                                        .non_root_mutations
                                        .push((object_id, mutation));
                                }
                            }
                        }
                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };
                            current_transaction.checksums.push(JournaledChecksums {
                                device_range,
                                checksums,
                                first_write,
                            });
                        }
                        JournalRecord::Commit => {
                            if let Some(&mut JournaledTransaction {
                                ref checkpoint,
                                ref root_parent_mutations,
                                ref mut end_offset,
                                ..
                            }) = current_transaction.take()
                            {
                                for mutation in root_parent_mutations {
                                    // Look for extents being added to the journal file itself
                                    // so the reader's handle can keep reading.
                                    if let Mutation::ObjectStore(ObjectStoreMutation {
                                        item:
                                            Item {
                                                key:
                                                    ObjectKey {
                                                        object_id,
                                                        data:
                                                            ObjectKeyData::Attribute(
                                                                DEFAULT_DATA_ATTRIBUTE_ID,
                                                                AttributeKey::Extent(ExtentKey {
                                                                    range,
                                                                }),
                                                            ),
                                                        ..
                                                    },
                                                value:
                                                    ObjectValue::Extent(ExtentValue::Some {
                                                        device_offset,
                                                        ..
                                                    }),
                                                ..
                                            },
                                        ..
                                    }) = mutation
                                    {
                                        // Only extents for the journal file itself matter here.
                                        let handle = reader.handle();
                                        if *object_id != handle.object_id() {
                                            continue;
                                        }
                                        if let Some(end_offset) = handle.end_offset() {
                                            if range.start != end_offset {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    format!(
                                                        "Unexpected journal extent {:?} -> {}, \
                                                         expected start: {}",
                                                        range, device_offset, end_offset,
                                                    )
                                                ));
                                            }
                                        }
                                        handle.push_extent(
                                            checkpoint.file_offset,
                                            *device_offset
                                                ..*device_offset
                                                    + range.length().context("Invalid extent")?,
                                        );
                                    }
                                }
                                *end_offset = reader.journal_file_checkpoint().file_offset;
                            }
                        }
                        JournalRecord::Discard(offset) => {
                            if offset == 0 {
                                bail!(
                                    anyhow!(FxfsError::Inconsistent)
                                        .context("Invalid offset for Discard")
                                );
                            }
                            if let Some(transaction) = current_transaction.as_ref() {
                                if transaction.checkpoint.file_offset < offset {
                                    // The current transaction predates the discard point, so
                                    // it survives.
                                    continue;
                                }
                            }
                            current_transaction = None;
                            while let Some(transaction) = transactions.last() {
                                if transaction.checkpoint.file_offset < offset {
                                    break;
                                }
                                transactions.pop();
                            }
                            reader.handle().discard_extents(offset);
                        }
                        JournalRecord::DidFlushDevice(offset) => {
                            if offset > device_flushed_offset {
                                device_flushed_offset = offset;
                            }
                        }
                    }
                }
                // Expected when we run off the end of valid journal data.
                ReadResult::ChecksumMismatch => break,
            }
        }

        // A transaction with no Commit record is discarded.
        if current_transaction.is_some() {
            transactions.pop();
        }

        Ok(JournaledTransactions { transactions, device_flushed_offset })
    }

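    // Formatting sketch: `init_empty` builds the minimal object graph by hand in a single
    // transaction. The store and allocator object ids below start at 3; the super-block
    // objects claim their own ids via SuperBlockInstance::{A, B}.object_id().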
    /// Creates an empty filesystem with the minimum viable objects: a root parent store, a root
    /// store, an allocator, the super-block objects, and the journal file itself.
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        let mut current_generation = 1;
        if filesystem.options().image_builder_mode.is_some() {
            // When rebuilding an image in place, a valid super-block may already exist; bump
            // the generation so the new super-block supersedes it.
            let block_size = filesystem.block_size();
            match self.read_superblocks(filesystem.device(), block_size).await {
                Ok((super_block, _)) => {
                    log::info!(
                        "Found existing superblock with generation {}. Bumping by 1.",
                        super_block.generation
                    );
                    current_generation = super_block.generation.wrapping_add(1);
                }
                Err(_) => {
                    // No valid existing super-block; start from generation 1.
                }
            }
        }

        let root_parent = ObjectStore::new_empty(
            None,
            INIT_ROOT_PARENT_STORE_OBJECT_ID,
            filesystem.clone(),
            Box::new(NullCache {}),
        );
        self.objects.set_root_parent_store(root_parent.clone());

        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
        self.objects.set_allocator(allocator.clone());
        self.objects.init_metadata_reservation()?;

        let journal_handle;
        let super_block_a_handle;
        let super_block_b_handle;
        let root_store;
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        root_store = root_parent
            .new_child_store(
                &mut transaction,
                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
                Box::new(NullCache {}),
            )
            .await
            .context("new_child_store")?;
        self.objects.set_root_store(root_store.clone());

        allocator.create(&mut transaction).await?;

        // The super-blocks live in the root store at fixed object ids with fixed first extents.
        super_block_a_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::A.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
        super_block_a_handle
            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
            .await
            .context("extend super block")?;
        super_block_b_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::B.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
        super_block_b_handle
            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
            .await
            .context("extend super block")?;

        journal_handle = ObjectStore::create_object(
            &root_parent,
            &mut transaction,
            journal_handle_options(),
            None,
        )
        .await
        .context("create journal")?;
        if self.inner.lock().image_builder_mode.is_none() {
            let mut file_range = 0..self.chunk_size();
            journal_handle
                .preallocate_range(&mut transaction, &mut file_range)
                .await
                .context("preallocate journal")?;
            if file_range.start < file_range.end {
                bail!("preallocate_range returned too little space");
            }
        }

        root_store.create(&mut transaction).await?;

        root_parent.set_graveyard_directory_object_id(
            Graveyard::create(&mut transaction, &root_parent).await?,
        );

        transaction.commit().await?;

        self.inner.lock().super_block_header = SuperBlockHeader::new(
            current_generation,
            root_parent.store_object_id(),
            root_parent.graveyard_directory_object_id(),
            root_store.store_object_id(),
            allocator.object_id(),
            journal_handle.object_id(),
            checkpoint,
            LATEST_VERSION,
        );

        let _ = self.handle.set(journal_handle);
        Ok(())
    }

    /// Preallocates the first extent of the journal file; used when `init_empty` skipped the
    /// preallocation (image-builder mode).
    pub async fn allocate_journal(&self) -> Result<(), Error> {
        let handle = self.handle.get().unwrap();
        let filesystem = handle.store().filesystem();
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        let mut file_range = 0..self.chunk_size();
        self.handle
            .get()
            .unwrap()
            .preallocate_range(&mut transaction, &mut file_range)
            .await
            .context("preallocate journal")?;
        if file_range.start < file_range.end {
            bail!("preallocate_range returned too little space");
        }
        transaction.commit().await?;
        Ok(())
    }

    pub async fn init_superblocks(&self) -> Result<(), Error> {
        // Write the super-block twice so that both instances are valid from the start.
        for _ in 0..2 {
            self.write_super_block().await?;
        }
        Ok(())
    }

    /// Reads the journaled transactions that affect `object_id`, from the object's last-known
    /// journal checkpoint up to the end of valid journal data.
    pub async fn read_transactions_for_object(
        &self,
        object_id: u64,
    ) -> Result<Vec<JournaledTransaction>, Error> {
        let handle = self.handle.get().expect("No journal handle");
        // Open a fresh handle; the reader takes ownership of it.
        let handle = ObjectStore::open_object(
            handle.owner(),
            handle.object_id(),
            journal_handle_options(),
            None,
        )
        .await?;

        let checkpoint = match self.objects.journal_checkpoint(object_id) {
            Some(checkpoint) => checkpoint,
            None => return Ok(vec![]),
        };
        let mut reader = JournalReader::new(handle, &checkpoint);
        // Stop at `valid_to`; anything beyond might not be valid journal contents.
        let end_offset = self.inner.lock().valid_to;
        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
    }

    /// Commits a transaction: writes its mutations to the journal and applies them.
    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
        if transaction.is_empty() {
            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
        }

        self.pre_commit(transaction).await?;
        Ok(self.write_and_apply_mutations(transaction))
    }

    // Before committing a transaction, the journal file might need extending or zeroing, and
    // pending Discard/DidFlushDevice records might need writing; that housekeeping requires a
    // transaction of its own.
    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
        let handle;

        let (size, zero_offset) = {
            let mut inner = self.inner.lock();

            // If this is the first write after a reset, record the new version.
            if std::mem::take(&mut inner.output_reset_version) {
                LATEST_VERSION.serialize_into(&mut inner.writer)?;
            }

            if let Some(discard_offset) = inner.discard_offset {
                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
                inner.discard_offset = None;
            }

            if inner.needs_did_flush_device {
                let offset = inner.device_flushed_offset;
                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
                inner.needs_did_flush_device = false;
            }

            handle = match self.handle.get() {
                None => return Ok(()),
                Some(x) => x,
            };

            let file_offset = inner.writer.journal_file_checkpoint().file_offset;

            let size = handle.get_size();
            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };

            if size.is_none()
                && inner.zero_offset.is_none()
                && !self.objects.needs_borrow_for_journal(file_offset)
            {
                return Ok(());
            }

            (size, inner.zero_offset)
        };

        let mut transaction = handle
            .new_transaction_with_options(Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.objects.metadata_reservation()),
                txn_guard: Some(transaction.txn_guard()),
                ..Default::default()
            })
            .await?;
        if let Some(size) = size {
            handle
                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
                .await?;
        }
        if let Some(zero_offset) = zero_offset {
            handle.zero(&mut transaction, 0..zero_offset).await?;
        }

        // Commit this transaction now so it lands in the journal ahead of the transaction that
        // triggered it.
        self.write_and_apply_mutations(&mut transaction);

        let mut inner = self.inner.lock();

        // The extension transaction must itself have fit within the already-allocated space.
        if let Some(size) = size {
            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
        }

        // Only clear zero_offset if no new zero request arrived in the meantime.
        if inner.zero_offset == zero_offset {
            inner.zero_offset = None;
        }

        Ok(())
    }

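    // A mutation is only reapplied if it is not already captured on disk: `should_apply`
    // replays a mutation when its journal offset is at or past the offset recorded for its
    // store in the super-block (falling back to the super-block's own offset). For example
    // (hypothetical numbers), if store 7 was flushed at journal offset 10_000, a mutation for
    // it at offset 9_500 is skipped while one at 10_500 is applied.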
    fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
        let super_block_header = &self.inner.lock().super_block_header;
        let offset = super_block_header
            .journal_file_offsets
            .get(&object_id)
            .cloned()
            .unwrap_or(super_block_header.super_block_journal_file_offset);
        journal_file_checkpoint.file_offset >= offset
    }

    // Writes a super-block capturing the current state, then arranges for the journal data it
    // supersedes to be zeroed.
    async fn write_super_block(&self) -> Result<(), Error> {
        let root_parent_store = self.objects.root_parent_store();

        let old_layers;
        let old_super_block_offset;
        let mut new_super_block_header;
        let checkpoint;
        let borrowed;

        {
            // Snapshot the root parent store and pad the journal under the writer lock so the
            // super-block describes a consistent point, then make sure that point is on the
            // device before the super-block can reference it.
            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
            {
                let _write_guard = self.writer_mutex.lock();
                (checkpoint, borrowed) = self.pad_to_block()?;
                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
            }
            self.flush_device(checkpoint.file_offset)
                .await
                .context("flush failed when writing superblock")?;
        }

        new_super_block_header = self.inner.lock().super_block_header.clone();

        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;

        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();

        new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
        new_super_block_header.journal_file_offsets = journal_file_offsets;
        new_super_block_header.borrowed_metadata_space = borrowed;

        self.super_block_manager
            .save(
                new_super_block_header.clone(),
                self.objects.root_parent_store().filesystem(),
                old_layers,
            )
            .await?;
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = new_super_block_header;
            // Journal data prior to the old super-block is now reclaimable.
            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
        }

        Ok(())
    }

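    // Usage sketch for `sync` (hypothetical caller): flush everything through to the device and
    // observe the resulting checkpoint and borrowed metadata space:
    //
    //     if let Some((checkpoint, borrowed)) =
    //         journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await?
    //     {
    //         info!(offset = checkpoint.file_offset, borrowed; "synced");
    //     }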
    /// Flushes buffered journal data to the journal file and, if `options.flush_device` is set,
    /// through to the device. Returns the checkpoint of the flushed data together with the
    /// amount of borrowed metadata space at that point, or `None` if `options.precondition`
    /// declined the sync.
    pub async fn sync(
        &self,
        options: SyncOptions<'_>,
    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());

        let (checkpoint, borrowed) = {
            if let Some(precondition) = options.precondition {
                if !precondition() {
                    return Ok(None);
                }
            }

            // This guard is required so that an EndBlock record can't be inserted in the middle
            // of another transaction's records.
            let _guard = self.writer_mutex.lock();

            self.pad_to_block()?
        };

        if options.flush_device {
            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
        }

        Ok(Some((checkpoint, borrowed)))
    }

    // Pads the journal out to a block boundary (writing an EndBlock record if needed) and
    // returns the checkpoint plus the current amount of borrowed metadata space.
    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
        let mut inner = self.inner.lock();
        let checkpoint = inner.writer.journal_file_checkpoint();
        if checkpoint.file_offset % BLOCK_SIZE != 0 {
            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
            inner.writer.pad_to_block()?;
            if let Some(waker) = inner.flush_waker.take() {
                waker.wake();
            }
        }
        Ok((checkpoint, self.objects.borrowed_metadata_space()))
    }

    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
        assert!(
            self.inner.lock().image_builder_mode.is_none(),
            "flush_device called in image builder mode"
        );
        // Wait for the journal data up to `checkpoint_offset` to land in the journal file.
        debug_assert_not_too_long!(poll_fn(|ctx| {
            let mut inner = self.inner.lock();
            if inner.flushed_offset >= checkpoint_offset {
                Poll::Ready(Ok(()))
            } else if inner.terminate {
                let context = inner
                    .terminate_reason
                    .as_ref()
                    .map(|e| format!("Journal closed with error: {:?}", e))
                    .unwrap_or_else(|| "Journal closed".to_string());
                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
            } else {
                inner.sync_waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }))?;

        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
        if needs_flush {
            let trace = self.trace.load(Ordering::Relaxed);
            if trace {
                info!("J: start flush device");
            }
            self.handle.get().unwrap().flush_device().await?;
            if trace {
                info!("J: end flush device");
            }

            {
                // Record that the flush happened; a DidFlushDevice record must eventually be
                // journaled so that, after replay, we know data up to this offset reached the
                // device.
                let mut inner = self.inner.lock();
                inner.device_flushed_offset = checkpoint_offset;
                inner.needs_did_flush_device = true;
            }

            // Tell the allocator the flush succeeded so it can reuse ranges that were
            // deallocated before the flush.
            self.objects.allocator().did_flush_device(checkpoint_offset);
            if trace {
                info!("J: did flush device");
            }
        }

        Ok(())
    }

    /// Returns a copy of the current super-block header.
    pub fn super_block_header(&self) -> SuperBlockHeader {
        self.inner.lock().super_block_header.clone()
    }

    /// Waits until there is sufficient space in the journal.
    pub async fn check_journal_space(&self) -> Result<(), Error> {
        loop {
            debug_assert_not_too_long!({
                let inner = self.inner.lock();
                if inner.terminate {
                    // A terminated journal will never free up space.
                    let context = inner
                        .terminate_reason
                        .as_ref()
                        .map(|e| format!("Journal closed with error: {:?}", e))
                        .unwrap_or_else(|| "Journal closed".to_string());
                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
                }
                if self.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset
                    < inner.reclaim_size
                {
                    break Ok(());
                }
                if inner.image_builder_mode.is_some() {
                    break Ok(());
                }
                if inner.disable_compactions {
                    break Err(
                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
                    );
                }
                self.reclaim_event.listen()
            });
        }
    }

    fn chunk_size(&self) -> u64 {
        CHUNK_SIZE
    }

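    // `write_and_apply_mutations` is the commit path: under the writer mutex it serializes
    // every mutation as a Mutation record, applies the transaction to the in-memory trees, and
    // then serializes any volume data checksums followed by a Commit record. Keeping
    // serialization and application under one lock guarantees that journal order matches apply
    // order.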
    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
        let checkpoint_before;
        let checkpoint_after;
        {
            let _guard = self.writer_mutex.lock();
            checkpoint_before = {
                let mut inner = self.inner.lock();
                if transaction.includes_write() {
                    inner.needs_barrier = true;
                }
                let checkpoint = inner.writer.journal_file_checkpoint();
                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
                    self.objects.write_mutation(
                        *object_id,
                        mutation,
                        Writer(*object_id, &mut inner.writer),
                    );
                }
                checkpoint
            };
            let maybe_mutation =
                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
                    "apply_transaction should not fail in live mode; \
                     filesystem will be in an inconsistent state",
                );
            checkpoint_after = {
                let mut inner = self.inner.lock();
                if let Some(mutation) = maybe_mutation {
                    inner
                        .writer
                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
                        .unwrap();
                }
                for (device_range, checksums, first_write) in
                    transaction.take_checksums().into_iter()
                {
                    inner
                        .writer
                        .write_record(&JournalRecord::DataChecksums(
                            device_range,
                            Checksums::fletcher(checksums),
                            first_write,
                        ))
                        .unwrap();
                }
                inner.writer.write_record(&JournalRecord::Commit).unwrap();

                inner.writer.journal_file_checkpoint()
            };
        }
        self.objects.did_commit_transaction(
            transaction,
            &checkpoint_before,
            checkpoint_after.file_offset,
        );

        if let Some(waker) = self.inner.lock().flush_waker.take() {
            waker.wake();
        }

        checkpoint_before.file_offset
    }

    /// The task that flushes journal writes to the journal file and triggers compactions when
    /// the journal grows too big; it runs until the journal is terminated and any in-flight
    /// flush or compaction has finished.
    pub async fn flush_task(self: Arc<Self>) {
        let mut flush_fut = None;
        let mut compact_fut = None;
        let mut flush_error = false;
        poll_fn(|ctx| {
            loop {
                {
                    let mut inner = self.inner.lock();
                    if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
                        let flushable = inner.writer.flushable_bytes();
                        if flushable > 0 {
                            flush_fut = Some(Box::pin(self.flush(flushable)));
                        }
                    }
                    if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
                        return Poll::Ready(());
                    }
                    // Start a compaction once more than half the reclaim size has accumulated
                    // since the last super-block.
                    if compact_fut.is_none()
                        && !inner.terminate
                        && !inner.disable_compactions
                        && inner.image_builder_mode.is_none()
                        && self.objects.last_end_offset()
                            - inner.super_block_header.journal_checkpoint.file_offset
                            > inner.reclaim_size / 2
                    {
                        compact_fut = Some(Box::pin(self.compact()));
                        inner.compaction_running = true;
                    }
                    inner.flush_waker = Some(ctx.waker().clone());
                }
                let mut pending = true;
                if let Some(fut) = flush_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        if let Err(e) = result {
                            self.inner.lock().terminate(Some(e.context("Flush error")));
                            self.reclaim_event.notify(usize::MAX);
                            flush_error = true;
                        }
                        flush_fut = None;
                        pending = false;
                    }
                }
                if let Some(fut) = compact_fut.as_mut() {
                    if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                        let mut inner = self.inner.lock();
                        if let Err(e) = result {
                            inner.terminate(Some(e.context("Compaction error")));
                        }
                        compact_fut = None;
                        inner.compaction_running = false;
                        self.reclaim_event.notify(usize::MAX);
                        pending = false;
                    }
                }
                if pending {
                    return Poll::Pending;
                }
            }
        })
        .await;
    }

    /// Returns a yielder that lets compactions back off when the system is busy.
    pub fn get_compaction_yielder(&self) -> CompactionYielder<'_> {
        CompactionYielder::new(self)
    }

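    // When barriers are enabled, the first device write of a journal flush carries a barrier
    // whenever user data has been written since the previous journal flush; this orders the
    // data writes ahead of the journal records that reference them without the cost of a full
    // device flush.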
1882 async fn flush(&self, amount: usize) -> Result<(), Error> {
1883 let handle = self.handle.get().unwrap();
1884 let mut buf = handle.allocate_buffer(amount).await;
1885 let (offset, len, barrier_on_first_write) = {
1886 let mut inner = self.inner.lock();
1887 let offset = inner.writer.take_flushable(buf.as_mut());
1888 let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1889 inner.needs_barrier = false;
1892 (offset, buf.len() as u64, barrier_on_first_write)
1893 };
1894 self.handle
1895 .get()
1896 .unwrap()
1897 .overwrite(
1898 offset,
1899 buf.as_mut(),
1900 OverwriteOptions { barrier_on_first_write, ..Default::default() },
1901 )
1902 .await?;
1903
1904 let mut inner = self.inner.lock();
1905 if let Some(waker) = inner.sync_waker.take() {
1906 waker.wake();
1907 }
1908 inner.flushed_offset = offset + len;
1909 inner.valid_to = inner.flushed_offset;
1910 Ok(())
1911 }
1912
1913 #[trace]
1914 async fn compact(&self) -> Result<(), Error> {
1915 assert!(
1916 self.inner.lock().image_builder_mode.is_none(),
1917 "compact called in image builder mode"
1918 );
1919 let bytes_before = self.objects.compaction_bytes_written();
1920 let _measure = crate::metrics::DurationMeasureScope::new(
1921 &crate::metrics::lsm_tree_metrics().journal_compaction_time,
1922 );
1923 crate::metrics::lsm_tree_metrics().journal_compactions_total.add(1);
1924 let trace = self.trace.load(Ordering::Relaxed);
1925 debug!("Compaction starting");
1926 if trace {
1927 info!("J: start compaction");
1928 }
1929 let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1930 self.inner.lock().super_block_header.earliest_version = earliest_version;
1931 self.write_super_block().await.context("Failed to write superblock")?;
1932 if trace {
1933 info!("J: end compaction");
1934 }
1935 debug!("Compaction finished");
1936 let bytes_after = self.objects.compaction_bytes_written();
1937 crate::metrics::lsm_tree_metrics()
1938 .journal_compaction_bytes_written
1939 .add(bytes_after.saturating_sub(bytes_before));
1940 Ok(())
1941 }
1942
1943 pub async fn force_compact(&self) -> Result<(), Error> {
1946 self.inner.lock().forced_compaction = true;
1947 scopeguard::defer! { self.inner.lock().forced_compaction = false; }
1948 self.compact().await
1949 }
1950
1951 pub async fn stop_compactions(&self) {
1952 loop {
1953 debug_assert_not_too_long!({
1954 let mut inner = self.inner.lock();
1955 inner.disable_compactions = true;
1956 if !inner.compaction_running {
1957 return;
1958 }
1959 self.reclaim_event.listen()
1960 });
1961 }
1962 }
1963
1964 pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1967 let this = Arc::downgrade(self);
1968 parent.record_lazy_child(name, move || {
1969 let this_clone = this.clone();
1970 async move {
1971 let inspector = fuchsia_inspect::Inspector::default();
1972 if let Some(this) = this_clone.upgrade() {
1973 let (journal_min, journal_max, journal_reclaim_size) = {
1974 let inner = this.inner.lock();
1976 (
1977 round_down(
1978 inner.super_block_header.journal_checkpoint.file_offset,
1979 BLOCK_SIZE,
1980 ),
1981 inner.flushed_offset,
1982 inner.reclaim_size,
1983 )
1984 };
1985 let root = inspector.root();
1986 root.record_uint("journal_min_offset", journal_min);
1987 root.record_uint("journal_max_offset", journal_max);
1988 root.record_uint("journal_size", journal_max - journal_min);
1989 root.record_uint("journal_reclaim_size", journal_reclaim_size);
1990
1991 if let Some(x) = round_div(
1993 100 * (journal_max - journal_min),
1994 this.objects.allocator().get_disk_bytes(),
1995 ) {
1996 root.record_uint("journal_size_to_disk_size_percent", x);
1997 }
1998 }
1999 Ok(inspector)
2000 }
2001 .boxed()
2002 });
2003 }
2004
2005 pub fn terminate(&self) {
2007 self.inner.lock().terminate(None);
2008 self.reclaim_event.notify(usize::MAX);
2009 }
2010}
2011
2012pub struct Writer<'a>(u64, &'a mut JournalWriter);
2014
2015impl Writer<'_> {
2016 pub fn write(&mut self, mutation: Mutation) {
2017 self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
2018 }
2019}
2020
2021#[cfg(target_os = "fuchsia")]
2022mod yielder {
2023 use super::Journal;
2024 use crate::lsm_tree::Yielder;
2025 use fuchsia_async as fasync;
2026
2027 pub struct CompactionYielder<'a> {
2031 journal: &'a Journal,
2032 low_priority_task: Option<fasync::LowPriorityTask>,
2033 }
2034
2035 impl<'a> CompactionYielder<'a> {
2036 pub fn new(journal: &'a Journal) -> Self {
2037 Self { journal, low_priority_task: None }
2038 }
2039 }
2040
2041 impl Yielder for CompactionYielder<'_> {
2042 async fn yield_now(&mut self) {
2043 const IDLE_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(4);
2047 const MAX_YIELD_DURATION: zx::MonotonicDuration =
2048 zx::MonotonicDuration::from_millis(16);
2049
2050 {
2051 let inner = self.journal.inner.lock();
2052 if inner.forced_compaction {
2053 return;
2054 }
2055 let outstanding = self.journal.objects.last_end_offset()
2056 - inner.super_block_header.journal_checkpoint.file_offset;
2057 let half_reclaim_size = inner.reclaim_size / 2;
2058 if outstanding
2059 .checked_sub(half_reclaim_size)
2060 .is_some_and(|x| x >= half_reclaim_size / 2)
2061 {
2062 self.low_priority_task = None;
2066 return;
2067 }
2068 }
2069
2070 self.low_priority_task
2071 .get_or_insert_with(|| fasync::LowPriorityTask::new())
2072 .wait_until_idle_for(
2073 IDLE_PERIOD,
2074 fasync::MonotonicInstant::after(MAX_YIELD_DURATION),
2075 )
2076 .await;
2077 }
2078 }
2079}
2080
2081#[cfg(not(target_os = "fuchsia"))]
2082mod yielder {
2083 use super::Journal;
2084 use crate::lsm_tree::Yielder;
2085
2086 #[expect(dead_code)]
2087 pub struct CompactionYielder<'a>(&'a Journal);
2088
2089 impl<'a> CompactionYielder<'a> {
2090 pub fn new(journal: &'a Journal) -> Self {
2091 Self(journal)
2092 }
2093 }
2094
2095 impl Yielder for CompactionYielder<'_> {
2096 async fn yield_now(&mut self) {}
2097 }
2098}
2099
2100pub use yielder::*;
2101
2102#[cfg(test)]
2103mod tests {
2104 use super::SuperBlockInstance;
2105 use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
2106 use crate::fsck::fsck;
2107 use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
2108 use crate::object_store::directory::Directory;
2109 use crate::object_store::transaction::Options;
2110 use crate::object_store::volume::root_volume;
2111 use crate::object_store::{
2112 HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
2113 };
2114 use fuchsia_async as fasync;
2115 use fuchsia_async::MonotonicDuration;
2116 use storage_device::DeviceHolder;
2117 use storage_device::fake_device::FakeDevice;
2118
2119 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2120
2121 #[fuchsia::test]
2122 async fn test_replay() {
2123 const TEST_DATA: &[u8] = b"hello";
2124
2125 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2126
2127 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2128
2129 let object_id = {
2130 let root_store = fs.root_store();
2131 let root_directory =
2132 Directory::open(&root_store, root_store.root_directory_object_id())
2133 .await
2134 .expect("open failed");
2135 let mut transaction = fs
2136 .clone()
2137 .new_transaction(
2138 lock_keys![LockKey::object(
2139 root_store.store_object_id(),
2140 root_store.root_directory_object_id(),
2141 )],
2142 Options::default(),
2143 )
2144 .await
2145 .expect("new_transaction failed");
2146 let handle = root_directory
2147 .create_child_file(&mut transaction, "test")
2148 .await
2149 .expect("create_child_file failed");
2150
2151 transaction.commit().await.expect("commit failed");
2152 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2153 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2154 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2155 fs.sync(SyncOptions::default()).await.expect("sync failed");
2158 handle.object_id()
2159 };
2160
2161 {
2162 fs.close().await.expect("Close failed");
2163 let device = fs.take_device().await;
2164 device.reopen(false);
2165 let fs = FxFilesystem::open(device).await.expect("open failed");
2166 let handle = ObjectStore::open_object(
2167 &fs.root_store(),
2168 object_id,
2169 HandleOptions::default(),
2170 None,
2171 )
2172 .await
2173 .expect("open_object failed");
2174 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2175 assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
2176 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2177 fsck(fs.clone()).await.expect("fsck failed");
2178 fs.close().await.expect("close failed");
2179 }
2180 }
2181
2182 #[fuchsia::test]
2183 async fn test_reset() {
2184 const TEST_DATA: &[u8] = b"hello";
2185
2186 let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
2187
2188 let mut object_ids = Vec::new();
2189
2190 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2191 {
2192 let root_store = fs.root_store();
2193 let root_directory =
2194 Directory::open(&root_store, root_store.root_directory_object_id())
2195 .await
2196 .expect("open failed");
2197 let mut transaction = fs
2198 .clone()
2199 .new_transaction(
2200 lock_keys![LockKey::object(
2201 root_store.store_object_id(),
2202 root_store.root_directory_object_id(),
2203 )],
2204 Options::default(),
2205 )
2206 .await
2207 .expect("new_transaction failed");
2208 let handle = root_directory
2209 .create_child_file(&mut transaction, "test")
2210 .await
2211 .expect("create_child_file failed");
2212 transaction.commit().await.expect("commit failed");
2213 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2214 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2215 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2216 fs.sync(SyncOptions::default()).await.expect("sync failed");
2217 object_ids.push(handle.object_id());
2218
2219 for i in 0..1000 {
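2220 // Each iteration commits another transaction without syncing;
2221 // enough of these force the journal to wrap and be reset.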
2222 let mut transaction = fs
2223 .clone()
2224 .new_transaction(
2225 lock_keys![LockKey::object(
2226 root_store.store_object_id(),
2227 root_store.root_directory_object_id(),
2228 )],
2229 Options::default(),
2230 )
2231 .await
2232 .expect("new_transaction failed");
2233 let handle = root_directory
2234 .create_child_file(&mut transaction, &format!("{}", i))
2235 .await
2236 .expect("create_child_file failed");
2237 transaction.commit().await.expect("commit failed");
2238 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2239 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2240 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2241 object_ids.push(handle.object_id());
2242 }
2243 }
2244 fs.close().await.expect("close failed");
2245 let device = fs.take_device().await;
2246 device.reopen(false);
2247 let fs = FxFilesystem::open(device).await.expect("open failed");
2248 fsck(fs.clone()).await.expect("fsck failed");
2249 {
2250 let root_store = fs.root_store();
2251 for &object_id in &object_ids[0..1] {
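2252 // The first object was synced, so it must still be readable.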
2253 let handle = ObjectStore::open_object(
2254 &root_store,
2255 object_id,
2256 HandleOptions::default(),
2257 None,
2258 )
2259 .await
2260 .expect("open_object failed");
2261 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2262 assert_eq!(
2263 handle.read(0, buf.as_mut()).await.expect("read failed"),
2264 TEST_DATA.len()
2265 );
2266 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2267 }
2268
2269 let root_directory =
2271 Directory::open(&root_store, root_store.root_directory_object_id())
2272 .await
2273 .expect("open failed");
2274 let mut transaction = fs
2275 .clone()
2276 .new_transaction(
2277 lock_keys![LockKey::object(
2278 root_store.store_object_id(),
2279 root_store.root_directory_object_id(),
2280 )],
2281 Options::default(),
2282 )
2283 .await
2284 .expect("new_transaction failed");
2285 let handle = root_directory
2286 .create_child_file(&mut transaction, "test2")
2287 .await
2288 .expect("create_child_file failed");
2289 transaction.commit().await.expect("commit failed");
2290 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2291 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2292 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2293 fs.sync(SyncOptions::default()).await.expect("sync failed");
2294 object_ids.push(handle.object_id());
2295 }
2296
2297 fs.close().await.expect("close failed");
2298 let device = fs.take_device().await;
2299 device.reopen(false);
2300 let fs = FxFilesystem::open(device).await.expect("open failed");
2301 {
2302 fsck(fs.clone()).await.expect("fsck failed");
2303
2304 for &object_id in object_ids[0..1].iter().chain(object_ids.last()) {
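2305 // Check the first object and the one created after the first replay.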
2306 let handle = ObjectStore::open_object(
2307 &fs.root_store(),
2308 object_id,
2309 HandleOptions::default(),
2310 None,
2311 )
2312 .await
2313 .unwrap_or_else(|e| {
2314 panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2315 });
2316 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2317 assert_eq!(
2318 handle.read(0, buf.as_mut()).await.expect("read failed"),
2319 TEST_DATA.len()
2320 );
2321 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2322 }
2323 }
2324 fs.close().await.expect("close failed");
2325 }
2326
2327 #[fuchsia::test]
2328 async fn test_discard() {
2329 let device = {
2330 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2331 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2332 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2333
2334 let store = root_volume
2335 .new_volume("test", NewChildStoreOptions::default())
2336 .await
2337 .expect("new_volume failed");
2338 let root_directory = Directory::open(&store, store.root_directory_object_id())
2339 .await
2340 .expect("open failed");
2341
2342 let mut i = 0;
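2343 // Commit transactions until the journal has grown past one chunk.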
2344 loop {
2345 let mut transaction = fs
2346 .clone()
2347 .new_transaction(
2348 lock_keys![LockKey::object(
2349 store.store_object_id(),
2350 store.root_directory_object_id()
2351 )],
2352 Options::default(),
2353 )
2354 .await
2355 .expect("new_transaction failed");
2356 root_directory
2357 .create_child_file(&mut transaction, &format!("a {i}"))
2358 .await
2359 .expect("create_child_file failed");
2360 if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2361 break;
2362 }
2363 i += 1;
2364 }
2365
2366 fs.journal().force_compact().await.expect("compact failed");
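2367 // Stop compactions so the journal tail keeps growing from here on.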
2368 fs.journal().stop_compactions().await;
2369
2370 let mut i = 0;
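2371 // Write more than two chunks' worth of records that will never be compacted.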
2372 loop {
2373 let mut transaction = fs
2374 .clone()
2375 .new_transaction(
2376 lock_keys![LockKey::object(
2377 store.store_object_id(),
2378 store.root_directory_object_id()
2379 )],
2380 Options::default(),
2381 )
2382 .await
2383 .expect("new_transaction failed");
2384 root_directory
2385 .create_child_file(&mut transaction, &format!("b {i}"))
2386 .await
2387 .expect("create_child_file failed");
2388 if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2389 break;
2390 }
2391 i += 1;
2392 }
2393
2394 fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
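2395 // Snapshot the device while the tail of the journal is still in flight.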
2396 fs.device().snapshot().expect("snapshot failed")
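2397 // Replaying this snapshot should discard the incomplete records at
2398 // the end of the journal rather than failing.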
2399 };
2400
2401 let fs = FxFilesystem::open(device).await.expect("open failed");
2402
2403 {
2404 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2405
2406 let store =
2407 root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2408
2409 let root_directory = Directory::open(&store, store.root_directory_object_id())
2410 .await
2411 .expect("open failed");
2412
2413 let mut transaction = fs
2415 .clone()
2416 .new_transaction(
2417 lock_keys![LockKey::object(
2418 store.store_object_id(),
2419 store.root_directory_object_id()
2420 )],
2421 Options::default(),
2422 )
2423 .await
2424 .expect("new_transaction failed");
2425 root_directory
2426 .create_child_file(&mut transaction, "d")
2427 .await
2428 .expect("create_child_file failed");
2429 transaction.commit().await.expect("commit failed");
2430 }
2431
2432 fs.close().await.expect("close failed");
2433 let device = fs.take_device().await;
2434 device.reopen(false);
2435
2436 let fs = FxFilesystem::open(device).await.expect("open failed");
2437 fsck(fs.clone()).await.expect("fsck failed");
2438 fs.close().await.expect("close failed");
2439 }
2440
2441 #[fuchsia::test]
2442 async fn test_use_existing_generation() {
2443 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2444
2445 let fs = FxFilesystemBuilder::new()
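2446 // Format in image builder mode, targeting super block A.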
2447 .format(true)
2448 .image_builder_mode(Some(SuperBlockInstance::A))
2449 .open(device)
2450 .await
2451 .expect("open failed");
2452 fs.enable_allocations();
2453 let generation0 = fs.super_block_header().generation;
2454 assert_eq!(generation0, 1);
2455 fs.close().await.expect("close failed");
2456 let device = fs.take_device().await;
2457 device.reopen(false);
2458
2459 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
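2460 // Remember the generation assigned by the regular format.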
2461 let generation1 = fs.super_block_header().generation;
2462 {
2463 let root_volume = crate::object_store::volume::root_volume(fs.clone())
2464 .await
2465 .expect("root_volume failed");
2466 root_volume
2467 .new_volume("test", crate::object_store::NewChildStoreOptions::default())
2468 .await
2469 .expect("new_volume failed");
2470 }
2471 fs.close().await.expect("close failed");
2472 let device = fs.take_device().await;
2473 device.reopen(false);
2474
2475 let fs = FxFilesystemBuilder::new()
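2476 // Format again in image builder mode on top of the existing image.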
2477 .format(true)
2478 .image_builder_mode(Some(SuperBlockInstance::A))
2479 .open(device)
2480 .await
2481 .expect("open failed");
2482 fs.enable_allocations();
2483 let generation2 = fs.super_block_header().generation;
2484 assert!(
2485 generation2 > generation1,
2486 "generation2 ({}) should be greater than generation1 ({})",
2487 generation2,
2488 generation1
2489 );
2490 fs.close().await.expect("close failed");
2491 }
2492
2493 #[fuchsia::test]
2494 async fn test_image_builder_mode_generation_bump_512_byte_block() {
2495 let device = DeviceHolder::new(FakeDevice::new(16384, 512));
2496
2497 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
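2498 // Remember the generation assigned by the initial format.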
2499 let generation1 = fs.super_block_header().generation;
2500 fs.close().await.expect("close failed");
2501 let device = fs.take_device().await;
2502 device.reopen(false);
2503
2504 let fs = FxFilesystemBuilder::new()
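2505 // Reformat in image builder mode on top of the existing filesystem.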
2506 .format(true)
2507 .image_builder_mode(Some(SuperBlockInstance::A))
2508 .open(device)
2509 .await
2510 .expect("open failed");
2511
2512 fs.enable_allocations();
2513 let generation2 = fs.super_block_header().generation;
2514 assert!(
2515 generation2 > generation1,
2516 "Expected generation bump, got {} vs {}",
2517 generation2,
2518 generation1
2519 );
2520 fs.close().await.expect("close failed");
2521 }
2522
2523 #[fuchsia::test(allow_stalls = false)]
2524 #[cfg(target_os = "fuchsia")]
2525 async fn test_low_priority_compaction() {
2526 use fuchsia_async::TestExecutor;
2527 use std::sync::Arc;
2528 use std::sync::atomic::{AtomicBool, Ordering};
2529
2530 let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
2531 let fs = FxFilesystemBuilder::new()
2532 .journal_options(super::JournalOptions { reclaim_size: 65536, ..Default::default() })
2533 .format(true)
2534 .open(device)
2535 .await
2536 .expect("open failed");
2537
2538 let _low = fasync::LowPriorityTask::new();
2539
2540 {
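2541 // Create some files so that compaction has work to do.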
2542 let root_store = fs.root_store();
2543 let root_directory =
2544 Directory::open(&root_store, root_store.root_directory_object_id())
2545 .await
2546 .expect("open failed");
2547 for i in 0..100 {
2548 let mut transaction = fs
2549 .clone()
2550 .new_transaction(
2551 lock_keys![LockKey::object(
2552 root_store.store_object_id(),
2553 root_store.root_directory_object_id(),
2554 )],
2555 Options::default(),
2556 )
2557 .await
2558 .expect("new_transaction failed");
2559 root_directory
2560 .create_child_file(&mut transaction, &format!("test{}", i))
2561 .await
2562 .expect("create_child_file failed");
2563 transaction.commit().await.expect("commit failed");
2564 }
2565 }
2566
2567 let stop = Arc::new(AtomicBool::new(false));
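2568 // Spawn a normal priority task that keeps the executor busy.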
2569 let stop_clone = stop.clone();
2570 let _normal_task = fasync::Task::spawn(async move {
2571 while !stop_clone.load(Ordering::Relaxed) {
2572 fasync::Timer::new(fasync::MonotonicInstant::after(
2573 MonotonicDuration::from_millis(1),
2574 ))
2575 .await;
2576 }
2577 });
2578
2579 {
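2580 // Keep committing transactions until compaction starts; it should
2581 // run on the low priority task and yield to the task above.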
2582 let root_store = fs.root_store();
2583 let root_directory =
2584 Directory::open(&root_store, root_store.root_directory_object_id())
2585 .await
2586 .expect("open failed");
2587 let mut i = 0;
2588 loop {
2589 let mut transaction = fs
2590 .clone()
2591 .new_transaction(
2592 lock_keys![LockKey::object(
2593 root_store.store_object_id(),
2594 root_store.root_directory_object_id(),
2595 )],
2596 Options::default(),
2597 )
2598 .await
2599 .expect("new_transaction failed");
2600 root_directory
2601 .create_child_file(&mut transaction, &format!("trigger{i}"))
2602 .await
2603 .expect("create_child_file failed");
2604 transaction.commit().await.expect("commit failed");
2605
2606 if fs.journal().inner.lock().compaction_running {
2607 break;
2608 }
2609 TestExecutor::advance_to(fasync::MonotonicInstant::after(
2610 MonotonicDuration::from_millis(1),
2611 ))
2612 .await;
2613 i += 1;
2614 }
2615 }
2616
2617 for _ in 0..10 {
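2618 // While the normal priority task stays busy, compaction keeps
2619 // yielding and so should still be running.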
2620 TestExecutor::advance_to(fasync::MonotonicInstant::after(
2621 MonotonicDuration::from_millis(1),
2622 ))
2623 .await;
2624 assert!(fs.journal().inner.lock().compaction_running);
2625 }
2626
2627 stop.store(true, Ordering::Relaxed);
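2628 // Give the normal priority task a chance to observe the stop flag.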
2629 TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
2630 1,
2631 )))
2632 .await;
2633
2634 assert!(fs.journal().inner.lock().compaction_running);
2637
2638 for _ in 0..3 {
2640 TestExecutor::advance_to(fasync::MonotonicInstant::after(
2641 MonotonicDuration::from_millis(1),
2642 ))
2643 .await;
2644 assert!(fs.journal().inner.lock().compaction_running);
2645 }
2646
2647 TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
2649 1,
2650 )))
2651 .await;
2652
2653 let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
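2654 // Once the executor goes idle, compaction should run to completion.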
2655 assert!(!fs.journal().inner.lock().compaction_running);
2656
2657 fs.close().await.expect("close failed");
2658 }
2659
2660 #[fuchsia::test(allow_stalls = false)]
2661 #[cfg(target_os = "fuchsia")]
2662 async fn test_low_priority_compaction_deadline() {
2663 use fuchsia_async::TestExecutor;
2664 use std::sync::Arc;
2665 use std::sync::atomic::{AtomicBool, Ordering};
2666
2667 let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
2668 let fs = FxFilesystemBuilder::new()
2669 .journal_options(super::JournalOptions { reclaim_size: 65536, ..Default::default() })
2670 .format(true)
2671 .open(device)
2672 .await
2673 .expect("open failed");
2674
2675 let _low = fasync::LowPriorityTask::new();
2676
2677 {
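2678 // Seed the filesystem with a few files.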
2679 let root_store = fs.root_store();
2680 let root_directory =
2681 Directory::open(&root_store, root_store.root_directory_object_id())
2682 .await
2683 .expect("open failed");
2684 for i in 0..10 {
2685 let mut transaction = fs
2686 .clone()
2687 .new_transaction(
2688 lock_keys![LockKey::object(
2689 root_store.store_object_id(),
2690 root_store.root_directory_object_id(),
2691 )],
2692 Options::default(),
2693 )
2694 .await
2695 .expect("new_transaction failed");
2696 root_directory
2697 .create_child_file(&mut transaction, &format!("test{}", i))
2698 .await
2699 .expect("create_child_file failed");
2700 transaction.commit().await.expect("commit failed");
2701 }
2702 }
2703
2704 let stop = Arc::new(AtomicBool::new(false));
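2705 // A normal priority task that wakes every few milliseconds.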
2706 let stop_clone = stop.clone();
2707 let _normal_task = fasync::Task::spawn(async move {
2708 while !stop_clone.load(Ordering::Relaxed) {
2709 fasync::Timer::new(fasync::MonotonicInstant::after(
2710 MonotonicDuration::from_millis(3),
2711 ))
2712 .await;
2713 }
2714 });
2715
2716 {
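2717 // Commit transactions until compaction starts.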
2718 let root_store = fs.root_store();
2719 let root_directory =
2720 Directory::open(&root_store, root_store.root_directory_object_id())
2721 .await
2722 .expect("open failed");
2723 let mut i = 0;
2724 loop {
2725 let mut transaction = fs
2726 .clone()
2727 .new_transaction(
2728 lock_keys![LockKey::object(
2729 root_store.store_object_id(),
2730 root_store.root_directory_object_id(),
2731 )],
2732 Options::default(),
2733 )
2734 .await
2735 .expect("new_transaction failed");
2736 root_directory
2737 .create_child_file(&mut transaction, &format!("trigger{i}"))
2738 .await
2739 .expect("create_child_file failed");
2740 transaction.commit().await.expect("commit failed");
2741
2742 if fs.journal().inner.lock().compaction_running {
2743 break;
2744 }
2745 TestExecutor::advance_to(fasync::MonotonicInstant::after(
2746 MonotonicDuration::from_millis(1),
2747 ))
2748 .await;
2749 i += 1;
2750 }
2751 }
2752
2753 let mut count = 0;
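2754 // Advance time in steps larger than MAX_YIELD_DURATION and count
2755 // how many are needed before compaction completes.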
2756 for _ in 0..1000 {
2757 TestExecutor::advance_to(fasync::MonotonicInstant::after(
2758 MonotonicDuration::from_millis(20),
2759 ))
2760 .await;
2761 if !fs.journal().inner.lock().compaction_running {
2762 break;
2763 }
2764 count += 1;
2765 }
2766 assert!(!fs.journal().inner.lock().compaction_running);
2767
2768 assert!(count > 200);
2771
2772 stop.store(true, Ordering::Relaxed);
2773 fs.close().await.expect("close failed");
2774 }
2775
2776 #[fuchsia::test(allow_stalls = false)]
2777 #[cfg(target_os = "fuchsia")]
2778 async fn test_low_priority_compaction_no_yielding_when_full() {
2779 use fuchsia_async::TestExecutor;
2780 use std::sync::Arc;
2781 use std::sync::atomic::{AtomicBool, Ordering};
2782
2783 let reclaim_size = 65536;
2784 let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
2785 let fs = FxFilesystemBuilder::new()
2786 .journal_options(super::JournalOptions { reclaim_size, ..Default::default() })
2787 .format(true)
2788 .open(device)
2789 .await
2790 .expect("open failed");
2791
2792 let _low = fasync::LowPriorityTask::new();
2793
2794 {
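2795 // Seed the filesystem with a few files.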
2796 let root_store = fs.root_store();
2797 let root_directory =
2798 Directory::open(&root_store, root_store.root_directory_object_id())
2799 .await
2800 .expect("open failed");
2801 for i in 0..10 {
2802 let mut transaction = fs
2803 .clone()
2804 .new_transaction(
2805 lock_keys![LockKey::object(
2806 root_store.store_object_id(),
2807 root_store.root_directory_object_id(),
2808 )],
2809 Options::default(),
2810 )
2811 .await
2812 .expect("new_transaction failed");
2813 root_directory
2814 .create_child_file(&mut transaction, &format!("test{}", i))
2815 .await
2816 .expect("create_child_file failed");
2817 transaction.commit().await.expect("commit failed");
2818 }
2819 }
2820
2821 let stop = Arc::new(AtomicBool::new(false));
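2822 // Keep a normal priority task running so compaction would normally yield.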
2823 let stop_clone = stop.clone();
2824 let _normal_task = fasync::Task::spawn(async move {
2825 while !stop_clone.load(Ordering::Relaxed) {
2826 fasync::Timer::new(fasync::MonotonicInstant::after(
2827 MonotonicDuration::from_millis(3),
2828 ))
2829 .await;
2830 }
2831 });
2832
2833 {
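2834 // Commit transactions until the journal is close to its reclaim size.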
2835 let root_store = fs.root_store();
2836 let root_directory =
2837 Directory::open(&root_store, root_store.root_directory_object_id())
2838 .await
2839 .expect("open failed");
2840 let mut i = 0;
2841 loop {
2842 let mut transaction = fs
2843 .clone()
2844 .new_transaction(
2845 lock_keys![LockKey::object(
2846 root_store.store_object_id(),
2847 root_store.root_directory_object_id(),
2848 )],
2849 Options::default(),
2850 )
2851 .await
2852 .expect("new_transaction failed");
2853 root_directory
2854 .create_child_file(&mut transaction, &format!("trigger{i}"))
2855 .await
2856 .expect("create_child_file failed");
2857 transaction.commit().await.expect("commit failed");
2858
2859 let outstanding = {
2860 let inner = fs.journal().inner.lock();
2861 fs.journal().objects.last_end_offset()
2862 - inner.super_block_header.journal_checkpoint.file_offset
2863 };
2864 if outstanding >= reclaim_size * 7 / 8 {
2865 break;
2866 }
2867 i += 1;
2869 }
2870 }
2871
2872 TestExecutor::advance_to(fasync::MonotonicInstant::after(MonotonicDuration::from_millis(
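2873 // Long enough for the normal priority task's 3ms timer to fire.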
2874 4,
2875 )))
2876 .await;
2877
2878 let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
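2879 // With the journal nearly full, compaction should complete without yielding.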
2880 assert!(!fs.journal().inner.lock().compaction_running);
2881
2882 stop.store(true, Ordering::Relaxed);
2883 fs.close().await.expect("close failed");
2884 }
2885}
2886
2887#[cfg(fuzz)]
2888mod fuzz {
2889 use fuzz::fuzz;
2890
2891 #[fuzz]
2892 fn fuzz_journal_bytes(input: Vec<u8>) {
2893 use crate::filesystem::FxFilesystem;
2894 use fuchsia_async as fasync;
2895 use std::io::Write;
2896 use storage_device::DeviceHolder;
2897 use storage_device::fake_device::FakeDevice;
2898
2899 fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2900 let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2901 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2902 fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
2903 fs.close().await.expect("close failed");
2904 let device = fs.take_device().await;
2905 device.reopen(false);
2906 if let Ok(fs) = FxFilesystem::open(device).await {
2907 let _ = fs.close().await;
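2908 // Replay of fuzzed journal bytes may legitimately fail; the test
2909 // only requires that it does not crash or hang.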
2910 }
2911 });
2912 }
2913
2914 #[fuzz]
2915 fn fuzz_journal(input: Vec<super::JournalRecord>) {
2916 use crate::filesystem::FxFilesystem;
2917 use fuchsia_async as fasync;
2918 use storage_device::DeviceHolder;
2919 use storage_device::fake_device::FakeDevice;
2920
2921 fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2922 let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2923 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2924 {
2925 let mut inner = fs.journal().inner.lock();
2926 for record in &input {
2927 let _ = inner.writer.write_record(record);
2928 }
2929 }
2930 fs.close().await.expect("close failed");
2931 let device = fs.take_device().await;
2932 device.reopen(false);
2933 if let Ok(fs) = FxFilesystem::open(device).await {
2934 let _ = fs.close().await;
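2935 // As above, replay of fuzzed records may fail; it just must not
2936 // crash or hang.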
2937 }
2938 });
2939 }
2940}