1mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::errors::FxfsError;
27use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
28use crate::log::*;
29use crate::lsm_tree::cache::NullCache;
30use crate::lsm_tree::types::Layer;
31use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
32use crate::object_store::allocator::Allocator;
33use crate::object_store::data_object_handle::OverwriteOptions;
34use crate::object_store::extent_record::{ExtentMode, ExtentValue};
35use crate::object_store::graveyard::Graveyard;
36use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
37use crate::object_store::journal::checksum_list::ChecksumList;
38use crate::object_store::journal::reader::{JournalReader, ReadResult};
39use crate::object_store::journal::super_block::{
40 SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
41};
42use crate::object_store::journal::writer::JournalWriter;
43use crate::object_store::object_manager::ObjectManager;
44use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
45use crate::object_store::transaction::{
46 AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
47 MutationV47, MutationV49, MutationV50, MutationV54, MutationV55, ObjectStoreMutation, Options,
48 TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
49};
50use crate::object_store::{
51 AssocObj, AttributeId, DataObjectHandle, Extent, HandleOptions, HandleOwner, INVALID_OBJECT_ID,
52 Item, ItemRef, NewChildStoreOptions, ObjectStore, ReservedId,
53};
54use crate::range::RangeExt;
55use crate::round::{round_div, round_down};
56use crate::serialized_types::{
57 LATEST_VERSION, Migrate, Version, Versioned, migrate_nodefault, migrate_to_version,
58};
59use anyhow::{Context, Error, anyhow, bail, ensure};
60use event_listener::Event;
61use fprint::TypeFingerprint;
62use fuchsia_inspect::NumericProperty;
63use fuchsia_sync::Mutex;
64use futures::FutureExt as _;
65use futures::future::poll_fn;
66use rustc_hash::FxHashMap as HashMap;
67use serde::{Deserialize, Serialize};
68use static_assertions::const_assert;
69use std::clone::Clone;
70use std::collections::HashSet;
71use std::num::NonZero;
72use std::ops::{Bound, Range};
73use std::sync::atomic::{AtomicBool, Ordering};
74use std::sync::{Arc, OnceLock};
75use std::task::{Poll, Waker};
76use storage_device::Device;
77
/// Block size, in bytes, used by the journal. The journal writer is constructed with this block
/// size, and replay rounds the super-block's journal checkpoint offset down to a multiple of it
/// when locating the first journal extent.
pub const BLOCK_SIZE: u64 = 4096;

/// Journal chunk size in bytes (128 KiB).
/// NOTE(review): presumably the granularity at which the journal file is extended/preallocated;
/// the code that consumes it is outside this view — confirm.
const CHUNK_SIZE: u64 = 131_072;
// Compile-time guarantee that a chunk is strictly larger than the maximum journal usage of a
// single transaction.
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

/// Value used for `JournalOptions::reclaim_size` by `JournalOptions::default()` (512 KiB).
pub const DEFAULT_RECLAIM_SIZE: u64 = 524_288;

/// 1 MiB.
/// NOTE(review): not referenced in the visible part of this file; presumably space set aside for
/// the journal by other components — confirm against its users.
pub const RESERVED_SPACE: u64 = 1_048_576;

/// Mask XORed into the journal checkpoint's checksum when the writer is re-seeded at the end of
/// `Journal::replay`. With all bits set, this inverts the checksum.
const RESET_XOR: u64 = 0xffffffffffffffff;
97
/// Alias for the checkpoint layout currently in use.
pub type JournalCheckpoint = JournalCheckpointV32;

/// A position in the journal file together with the state needed to resume reading or writing
/// from that position.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    /// Byte offset within the journal file.
    pub file_offset: u64,

    /// Checksum value associated with `file_offset`. `Journal::replay` XORs it with `RESET_XOR`
    /// before handing the checkpoint to the writer.
    pub checksum: Checksum,

    /// Serialization version associated with this position. `Journal::replay` sets it to
    /// `LATEST_VERSION` on the checkpoint it seeds the writer with.
    pub version: Version,
}
116
/// Alias for the journal record layout currently in use.
pub type JournalRecord = JournalRecordV55;

/// A single record in the journal stream, as consumed by `Journal::read_transactions`.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV55 {
    /// On read, the reader skips to the end of the current block.
    EndBlock,
    /// A mutation belonging to the transaction currently being read. `object_id` is compared
    /// against the root-parent and root store object IDs to decide which list the mutation is
    /// filed under.
    Mutation {
        object_id: u64,
        mutation: MutationV55,
    },
    /// Ends the transaction currently being read.
    Commit,
    /// On read, drops every transaction whose checkpoint offset is at or after the given journal
    /// offset and discards the journal handle's extents from that offset. An offset of zero is
    /// rejected as inconsistent.
    Discard(u64),
    /// On read, raises the tracked device-flushed journal offset to the given value if it is
    /// greater than the current one.
    DidFlushDevice(u64),
    /// Checksums for data written to the given device range. The final `bool` is surfaced as
    /// `JournaledChecksums::first_write`.
    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
}
151
/// Earlier layout of the journal record, carrying `MutationV54`. The `Migrate` derive targets
/// `JournalRecordV55`.
/// NOTE(review): presumably kept only so older journals remain readable — the variant list should
/// be treated as frozen.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV55)]
#[migrate_nodefault]
pub enum JournalRecordV54 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV54 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
}
164
/// Earlier layout of the journal record, carrying `MutationV50`. The `Migrate` derive targets
/// `JournalRecordV54`.
/// NOTE(review): presumably kept only so older journals remain readable — the variant list should
/// be treated as frozen.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV54)]
#[migrate_nodefault]
pub enum JournalRecordV50 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV50 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
177
/// Earlier layout of the journal record, carrying `MutationV49`. The `Migrate` derive targets
/// `JournalRecordV50`.
/// NOTE(review): presumably kept only so older journals remain readable — the variant list should
/// be treated as frozen.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
189
/// Earlier layout of the journal record, carrying `MutationV47`. The `Migrate` derive targets
/// `JournalRecordV49`.
/// NOTE(review): presumably kept only so older journals remain readable — the variant list should
/// be treated as frozen.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
201
/// Earlier layout of the journal record, carrying `MutationV46`. The `Migrate` derive targets
/// `JournalRecordV47`.
/// NOTE(review): presumably kept only so older journals remain readable — the variant list should
/// be treated as frozen.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
213
/// Earlier layout of the journal record, carrying `MutationV43`. The `Migrate` derive targets
/// `JournalRecordV46`.
/// NOTE(review): presumably kept only so older journals remain readable — the variant list should
/// be treated as frozen.
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
225
/// Earlier layout of the journal record. It carries `MutationV41` (the same mutation type as
/// `JournalRecordV41`) and is the first layout here whose `DataChecksums` variant has the
/// trailing `bool`. The `Migrate` derive targets `JournalRecordV43`.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
236
/// Earlier layout of the journal record, carrying `MutationV41`. Its `DataChecksums` variant has
/// no trailing `bool`. Unlike the neighbouring versions it does not derive `Migrate`; the upgrade
/// to `JournalRecordV42` is the hand-written `From` implementation in this file.
#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
246
247impl From<JournalRecordV41> for JournalRecordV42 {
248 fn from(record: JournalRecordV41) -> Self {
249 match record {
250 JournalRecordV41::EndBlock => Self::EndBlock,
251 JournalRecordV41::Mutation { object_id, mutation } => {
252 Self::Mutation { object_id, mutation: mutation.into() }
253 }
254 JournalRecordV41::Commit => Self::Commit,
255 JournalRecordV41::Discard(offset) => Self::Discard(offset),
256 JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
257 JournalRecordV41::DataChecksums(range, sums) => {
258 Self::DataChecksums(range, sums, true)
261 }
262 }
263 }
264}
265
/// Oldest journal record layout defined in this file, carrying `MutationV40`. The `Migrate`
/// derive targets `JournalRecordV41`.
/// NOTE(review): presumably kept only so older journals remain readable — the variant list should
/// be treated as frozen.
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
276
277pub(super) fn journal_handle_options() -> HandleOptions {
278 HandleOptions { skip_journal_checks: true, ..Default::default() }
279}
280
/// The filesystem journal: owns the journal writer state, the super-block manager, and the
/// handle to the journal file, and drives replay of journaled transactions at mount time.
pub struct Journal {
    /// Object manager that mutations are applied to during replay, and on which the root parent
    /// store, root store and allocator are registered.
    objects: Arc<ObjectManager>,
    /// Handle to the journal file. Unset at construction; `replay` sets it once the journal
    /// object has been opened.
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    /// Loads super-blocks; also records which super-block instance is used next.
    super_block_manager: SuperBlockManager,
    /// Mutable journal state, guarded by a synchronous mutex.
    inner: Mutex<Inner>,
    /// NOTE(review): not used in the visible code; presumably serializes journal writers — confirm.
    writer_mutex: Mutex<()>,
    /// NOTE(review): not used in the visible code; presumably serializes sync operations (it is
    /// an async-aware mutex) — confirm.
    sync_mutex: futures::lock::Mutex<()>,
    /// Trace flag, toggled through `set_trace`.
    trace: AtomicBool,

    /// NOTE(review): not used in the visible code; presumably signalled when journal space is
    /// reclaimed — confirm.
    reclaim_event: Event,
}
298
/// Mutable journal state held behind `Journal::inner`.
///
/// Several fields are only written at construction in the visible part of this file; their
/// descriptions are marked as assumptions where the consuming code is out of view.
struct Inner {
    /// The super-block header. `replay` stores the header it loaded here, and
    /// `read_transactions` reads the store object IDs and journal offset from it.
    super_block_header: SuperBlockHeader,

    /// NOTE(review): starts as `None`; usage is outside this view — confirm meaning.
    zero_offset: Option<u64>,

    /// Journal offset regarded as flushed to the device. `replay` sets it equal to
    /// `flushed_offset` once replay completes.
    device_flushed_offset: u64,

    /// NOTE(review): presumably set when a `DidFlushDevice` record still needs to be written —
    /// confirm.
    needs_did_flush_device: bool,

    /// The journal writer. Created with `BLOCK_SIZE` and a random starting checksum; `replay`
    /// seeks it to the checkpoint at which writing resumes.
    writer: JournalWriter,

    /// Set to `true` by `replay` after the writer has been re-seeded.
    /// NOTE(review): presumably tells the writer to emit a reset/version marker next — confirm.
    output_reset_version: bool,

    /// Waker that `terminate` takes and wakes, if present.
    flush_waker: Option<Waker>,

    /// Set to `true` by `terminate`.
    terminate: bool,

    /// The first error passed to `terminate`; later errors are logged but do not replace it.
    terminate_reason: Option<Error>,

    /// NOTE(review): starts as `false`; usage is outside this view — confirm meaning.
    disable_compactions: bool,

    /// NOTE(review): starts as `false`; usage is outside this view — confirm meaning.
    compaction_running: bool,

    /// Waker that `terminate` takes and wakes, if present.
    sync_waker: Option<Waker>,

    /// Journal offset up to which data has been flushed. `replay` initializes it to the offset
    /// of the checkpoint the writer is seeded with.
    flushed_offset: u64,

    /// Set by `replay` to the file offset of the last valid checkpoint.
    valid_to: u64,

    /// Set by `replay` to the last valid checkpoint's offset when that offset is below
    /// `flushed_offset`, i.e. when the tail of the journal was not replayed.
    discard_offset: Option<u64>,

    /// Copied from `JournalOptions::reclaim_size`.
    reclaim_size: u64,

    /// Image builder mode, set through `Journal::set_image_builder_mode`.
    image_builder_mode: Option<SuperBlockInstance>,

    /// Copied from `JournalOptions::barriers_enabled`.
    barriers_enabled: bool,

    /// NOTE(review): starts as `false`; usage is outside this view — confirm meaning.
    needs_barrier: bool,

    /// NOTE(review): starts as `false`; usage is outside this view — confirm meaning.
    forced_compaction: bool,
}
369
370impl Inner {
371 fn terminate(&mut self, reason: Option<Error>) {
372 self.terminate = true;
373
374 if let Some(err) = reason {
375 error!(error:? = err; "Terminating journal");
376 if let Some(prev_err) = self.terminate_reason.as_ref() {
378 error!(error:? = prev_err; "Journal previously terminated");
379 } else {
380 self.terminate_reason = Some(err);
381 }
382 }
383
384 if let Some(waker) = self.flush_waker.take() {
385 waker.wake();
386 }
387 if let Some(waker) = self.sync_waker.take() {
388 waker.wake();
389 }
390 }
391}
392
/// Construction-time options for `Journal`.
pub struct JournalOptions {
    /// Stored as the journal's reclaim size. Defaults to `DEFAULT_RECLAIM_SIZE`.
    /// NOTE(review): presumably the journal size at which space reclamation starts — confirm.
    pub reclaim_size: u64,

    /// Stored as the journal's `barriers_enabled` flag. Defaults to `false`.
    /// NOTE(review): presumably selects write barriers over full device flushes — confirm.
    pub barriers_enabled: bool,
}
404
405impl Default for JournalOptions {
406 fn default() -> Self {
407 JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
408 }
409}
410
/// Result of `Journal::read_transactions`.
struct JournaledTransactions {
    /// The complete transactions that were read, in journal order.
    transactions: Vec<JournaledTransaction>,
    /// The highest device-flushed journal offset observed. It starts from the super-block's
    /// journal file offset and is raised by `DidFlushDevice` records and journal resets.
    device_flushed_offset: u64,
}
415
/// A single transaction read back from the journal, with its mutations grouped by the store
/// they target.
#[derive(Debug, Default)]
pub struct JournaledTransaction {
    /// Journal checkpoint taken immediately before the transaction's first record.
    pub checkpoint: JournalCheckpoint,
    /// Mutations whose object ID is the root parent store.
    pub root_parent_mutations: Vec<Mutation>,
    /// Mutations whose object ID is the root store.
    pub root_mutations: Vec<Mutation>,
    /// Mutations for every other object, paired with the object ID they target.
    pub non_root_mutations: Vec<(u64, Mutation)>,
    /// Journal file offset just after the transaction's `Commit` record.
    pub end_offset: u64,
    /// `DataChecksums` records seen within the transaction.
    pub checksums: Vec<JournaledChecksums>,

    /// Set when the transaction contains an `EndFlush` that matches an earlier `BeginFlush`:
    /// `(object_id, offset)`, where `offset` is one past the checkpoint offset of the transaction
    /// that held the `BeginFlush`.
    pub end_flush: Option<(u64, u64)>,

    /// Set to the object ID of the store when the transaction contains a `DeleteVolume`.
    pub volume_deleted: Option<u64>,
}
433
434impl JournaledTransaction {
435 fn new(checkpoint: JournalCheckpoint) -> Self {
436 Self { checkpoint, ..Default::default() }
437 }
438}
439
/// Sentinel journal offset that `Journal::replay` records in its per-object journal-offset map
/// for a store whose volume was deleted during the replayed transactions.
const VOLUME_DELETED: u64 = u64::MAX;
441
/// The payload of a `JournalRecord::DataChecksums` record, captured while reading a transaction.
#[derive(Debug)]
pub struct JournaledChecksums {
    /// Device byte range the checksums cover.
    pub device_range: Range<u64>,
    /// Checksums for `device_range`.
    pub checksums: Checksums,
    /// The boolean carried by the record; passed through to the checksum list during replay.
    pub first_write: bool,
}
448
/// A readable handle to the journal file whose extent list can be built up and trimmed while the
/// journal is being read.
pub trait JournalHandle: ReadObjectHandle {
    /// The file offset at which the handle's known extents end, or `None` if the handle does not
    /// track this. Callers use it to check that a new extent starts exactly where the previous
    /// one finished.
    fn end_offset(&self) -> Option<u64>;
    /// Appends `device_range` to the handle's extents. `added_offset` is the journal offset of
    /// the transaction that added the extent (zero for extents that pre-date replay).
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Handles a `Discard` record for `discard_offset`.
    /// NOTE(review): presumably drops extents added at or after that journal offset — confirm
    /// against the implementors.
    fn discard_extents(&mut self, discard_offset: u64);
}
463
464impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
468 fn end_offset(&self) -> Option<u64> {
469 None
470 }
471 fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
472 }
474 fn discard_extents(&mut self, _discard_offset: u64) {
475 }
477}
478
479#[fxfs_trace::trace]
480impl Journal {
481 pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
482 let starting_checksum = rand::random_range(1..u64::MAX);
483 Journal {
484 objects: objects,
485 handle: OnceLock::new(),
486 super_block_manager: SuperBlockManager::new(),
487 inner: Mutex::new(Inner {
488 super_block_header: SuperBlockHeader::default(),
489 zero_offset: None,
490 device_flushed_offset: 0,
491 needs_did_flush_device: false,
492 writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
493 output_reset_version: false,
494 flush_waker: None,
495 terminate: false,
496 terminate_reason: None,
497 disable_compactions: false,
498 compaction_running: false,
499 sync_waker: None,
500 flushed_offset: 0,
501 valid_to: 0,
502 discard_offset: None,
503 reclaim_size: options.reclaim_size,
504 image_builder_mode: None,
505 barriers_enabled: options.barriers_enabled,
506 needs_barrier: false,
507 forced_compaction: false,
508 }),
509 writer_mutex: Mutex::new(()),
510 sync_mutex: futures::lock::Mutex::new(()),
511 trace: AtomicBool::new(false),
512 reclaim_event: Event::new(),
513 }
514 }
515
516 pub fn set_trace(&self, trace: bool) {
517 let old_value = self.trace.swap(trace, Ordering::Relaxed);
518 if trace != old_value {
519 info!(trace; "J: trace");
520 }
521 }
522
523 pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
524 self.inner.lock().image_builder_mode = mode;
525 if let Some(instance) = mode {
526 *self.super_block_manager.next_instance.lock() = instance;
527 }
528 }
529
530 pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
531 self.inner.lock().image_builder_mode
532 }
533
/// Overrides the GUID stored in the in-memory super-block header.
///
/// # Errors
///
/// Fails if the journal is not in image builder mode.
#[cfg(feature = "migration")]
pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
    // Hold a single guard across the check and the write so the mode cannot change in between.
    let mut inner = self.inner.lock();
    ensure!(
        inner.image_builder_mode.is_some(),
        "Can only set filesystem uuid in image builder mode."
    );
    inner.super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
    Ok(())
}
543
544 pub(crate) async fn read_superblocks(
545 &self,
546 device: Arc<dyn Device>,
547 block_size: u64,
548 ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
549 self.super_block_manager.load(device, block_size).await
550 }
551
552 fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
556 match mutation {
557 Mutation::ObjectStore(ObjectStoreMutation {
558 item:
559 Item {
560 key:
561 ObjectKey {
562 data: ObjectKeyData::Attribute(_, AttributeKey::Extent(extent)),
563 ..
564 },
565 value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
566 ..
567 },
568 ..
569 }) => {
570 if extent.is_empty() || !extent.is_aligned(block_size) {
571 return false;
572 }
573 let len = extent.length().unwrap();
574 if let ExtentMode::Cow(checksums) = mode {
575 if checksums.len() > 0 {
576 if len % checksums.len() as u64 != 0 {
577 return false;
578 }
579 if (len / checksums.len() as u64) % block_size != 0 {
580 return false;
581 }
582 }
583 }
584 if *device_offset % block_size != 0
585 || *device_offset >= device_size
586 || device_size - *device_offset < len
587 {
588 return false;
589 }
590 }
591 Mutation::ObjectStore(_) => {}
592 Mutation::EncryptedObjectStore(_) => {}
593 Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
594 return !device_range.is_empty()
595 && *owner_object_id != INVALID_OBJECT_ID
596 && device_range.end <= device_size;
597 }
598 Mutation::Allocator(AllocatorMutation::Deallocate {
599 device_range,
600 owner_object_id,
601 }) => {
602 return !device_range.is_empty()
603 && *owner_object_id != INVALID_OBJECT_ID
604 && device_range.end <= device_size;
605 }
606 Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
607 return *owner_object_id != INVALID_OBJECT_ID;
608 }
609 Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
610 return *owner_object_id != INVALID_OBJECT_ID;
611 }
612 Mutation::BeginFlush => {}
613 Mutation::EndFlush => {}
614 Mutation::DeleteVolume => {}
615 Mutation::UpdateBorrowed(_) => {}
616 Mutation::UpdateMutationsKey(_) => {}
617 Mutation::CreateInternalDir(owner_object_id) => {
618 return *owner_object_id != INVALID_OBJECT_ID;
619 }
620 }
621 true
622 }
623
624 fn update_checksum_list(
626 &self,
627 journal_offset: u64,
628 mutation: &Mutation,
629 checksum_list: &mut ChecksumList,
630 ) -> Result<(), Error> {
631 match mutation {
632 Mutation::ObjectStore(_) => {}
633 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
634 checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
635 }
636 _ => {}
637 }
638 Ok(())
639 }
640
#[trace]
/// Loads the super-block and replays the journaled transactions that follow it.
///
/// The steps, in order:
/// 1. Load the super-block; register the root parent store and a new allocator with the object
///    manager. `on_new_allocator`, if given, is called with the allocator before registration.
/// 2. Build a bootstrap handle for the journal file from the extents recorded in the root parent
///    store and read all transactions from the super-block's journal checkpoint onwards.
/// 3. Validate mutations and checksums to find the offset up to which the journal is valid.
/// 4. Apply root parent store mutations, open the root store, apply root store mutations, open
///    the allocator, and then replay the remaining mutations.
/// 5. Open the real journal file handle and position the writer so new records follow the
///    replayed ones.
///
/// # Errors
///
/// Returns an error if the super-block cannot be loaded, the journal is inconsistent (for
/// example non-contiguous journal extents, or replay ending before the offset at which the
/// super-block was written), checksum verification fails, the root store is encrypted, or any
/// mutation fails to apply.
pub async fn replay(
    &self,
    filesystem: Arc<FxFilesystem>,
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
) -> Result<(), Error> {
    let block_size = filesystem.block_size();

    let (super_block, root_parent) =
        self.super_block_manager.load(filesystem.device(), block_size).await?;

    let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

    self.objects.set_root_parent_store(root_parent.clone());
    let allocator =
        Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
    if let Some(on_new_allocator) = on_new_allocator {
        on_new_allocator(allocator.clone());
    }
    self.objects.set_allocator(allocator.clone());
    self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
    self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
    {
        let mut inner = self.inner.lock();
        inner.super_block_header = super_block.clone();
    }

    let device = filesystem.device();

    let mut handle;
    {
        // Seed the bootstrap handle with the journal extents already present in the root
        // parent store, starting with the extent found by seeking to the (block-aligned)
        // checkpoint offset.
        let root_parent_layer = root_parent.tree().mutable_layer();
        let mut iter = root_parent_layer
            .seek(Bound::Included(&ObjectKey::attribute(
                super_block.journal_object_id,
                AttributeId::DATA,
                AttributeKey::Extent(Extent::search_key_from_offset(round_down(
                    super_block.journal_checkpoint.file_offset,
                    BLOCK_SIZE,
                ))),
            )))
            .await
            .context("Failed to seek root parent store")?;
        let start_offset = if let Some(ItemRef {
            key:
                ObjectKey {
                    data:
                        ObjectKeyData::Attribute(AttributeId::DATA, AttributeKey::Extent(extent)),
                    ..
                },
            ..
        }) = iter.get()
        {
            extent.start
        } else {
            0
        };
        handle = BootstrapObjectHandle::new_with_start_offset(
            super_block.journal_object_id,
            device.clone(),
            start_offset,
        );
        while let Some(item) = iter.get() {
            // The match yields `true` while the item is a journal data extent; the first item
            // that is anything else ends the scan.
            if !match item.into() {
                Some((
                    object_id,
                    AttributeId::DATA,
                    extent,
                    ExtentValue::Some { device_offset, .. },
                )) if object_id == super_block.journal_object_id => {
                    // Journal extents must be contiguous in file-offset space.
                    if let Some(end_offset) = handle.end_offset() {
                        if extent.start != end_offset {
                            bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                "Unexpected journal extent {:?}, expected start: {}",
                                item, end_offset
                            )));
                        }
                    }
                    handle.push_extent(
                        0,
                        *device_offset
                            ..*device_offset + extent.length().context("Invalid extent")?,
                    );
                    true
                }
                _ => false,
            } {
                break;
            }
            iter.advance().await.context("Failed to advance root parent store iterator")?;
        }
    }

    let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
    let JournaledTransactions { mut transactions, device_flushed_offset } = self
        .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
        .await
        .context("Reading transactions for replay")?;

    // Validate every mutation and gather checksums. `valid_to` starts at the reader's current
    // position and is pulled back to the start of the first transaction holding a bad mutation.
    let mut checksum_list = ChecksumList::new(device_flushed_offset);
    let mut valid_to = reader.journal_file_checkpoint().file_offset;
    let device_size = device.size();
    'bad_replay: for JournaledTransaction {
        checkpoint,
        root_parent_mutations,
        root_mutations,
        non_root_mutations,
        checksums,
        ..
    } in &transactions
    {
        for JournaledChecksums { device_range, checksums, first_write } in checksums {
            checksum_list
                .push(
                    checkpoint.file_offset,
                    device_range.clone(),
                    checksums.maybe_as_ref().context("Malformed checksums")?,
                    *first_write,
                )
                .context("Pushing journal checksum records to checksum list")?;
        }
        for mutation in root_parent_mutations
            .iter()
            .chain(root_mutations)
            .chain(non_root_mutations.iter().map(|(_, m)| m))
        {
            if !self.validate_mutation(mutation, block_size, device_size) {
                info!(mutation:?; "Stopping replay at bad mutation");
                valid_to = checkpoint.file_offset;
                break 'bad_replay;
            }
            self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
        }
    }

    // Checksum verification returns the offset up to which the journal can be trusted.
    let valid_to = checksum_list
        .verify(device.as_ref(), valid_to)
        .await
        .context("Failed to validate checksums")?;

    let mut last_checkpoint = reader.journal_file_checkpoint();
    let mut journal_offsets = super_block.journal_file_offsets.clone();

    // First pass: apply root parent store mutations and record, per object, the journal offset
    // from its latest flush (or `VOLUME_DELETED`).
    for (
        index,
        JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            end_flush,
            volume_deleted,
            ..
        },
    ) in transactions.iter_mut().enumerate()
    {
        if checkpoint.file_offset >= valid_to {
            last_checkpoint = checkpoint.clone();

            // Drop this and all later transactions so the following passes never see them.
            // Truncating while iterating is fine only because the loop exits immediately.
            transactions.truncate(index);
            break;
        }

        let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
        for mutation in root_parent_mutations.drain(..) {
            self.objects
                .apply_mutation(
                    super_block.root_parent_store_object_id,
                    mutation,
                    &context,
                    AssocObj::None,
                )
                .context("Failed to replay root parent store mutations")?;
        }

        if let Some((object_id, journal_offset)) = end_flush {
            journal_offsets.insert(*object_id, *journal_offset);
        }

        if let Some(object_id) = volume_deleted {
            journal_offsets.insert(*object_id, VOLUME_DELETED);
        }
    }

    // The root store is opened only after the root parent store mutations have been applied.
    let root_store = ObjectStore::open(
        &root_parent,
        super_block.root_store_object_id,
        Box::new(NullCache {}),
    )
    .await
    .context("Unable to open root store")?;

    ensure!(
        !root_store.is_encrypted(),
        anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
    );
    self.objects.set_root_store(root_store);

    let root_store_offset =
        journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

    // Second pass: apply root store mutations, skipping transactions that start before the
    // root store's recorded journal offset.
    for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
        if checkpoint.file_offset < root_store_offset {
            continue;
        }

        let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
        for mutation in root_mutations.drain(..) {
            self.objects
                .apply_mutation(
                    super_block.root_store_object_id,
                    mutation,
                    &context,
                    AssocObj::None,
                )
                .context("Failed to replay root store mutations")?;
        }
    }

    // The allocator is opened after the root store mutations have been applied.
    allocator.open().await.context("Failed to open allocator")?;

    // Third pass: replay the mutations for all remaining objects.
    for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
    {
        self.objects
            .replay_mutations(
                non_root_mutations,
                &journal_offsets,
                &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                end_offset,
            )
            .await
            .context("Failed to replay mutations")?;
    }

    allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

    // Only reported in the final log line: the offset the reader reached, when replay stopped
    // short of it.
    let discarded_to =
        if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
            Some(reader.journal_file_checkpoint().file_offset)
        } else {
            None
        };

    {
        // Replay must at least reach the point in the journal at which the super-block was
        // written.
        if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
            return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                "journal replay cut short; journal finishes at {}, but super-block was \
                 written at {}",
                last_checkpoint.file_offset, super_block.super_block_journal_file_offset
            )));
        }
        let handle = ObjectStore::open_object(
            &root_parent,
            super_block.journal_object_id,
            journal_handle_options(),
            None,
        )
        .await
        .with_context(|| {
            format!(
                "Failed to open journal file (object id: {})",
                super_block.journal_object_id
            )
        })?;
        // A failure to set (handle already set) is deliberately ignored.
        let _ = self.handle.set(handle);
        let mut inner = self.inner.lock();
        // New records are written from the start of the block after the one the reader
        // stopped in.
        reader.skip_to_end_of_block();
        let mut writer_checkpoint = reader.journal_file_checkpoint();

        std::mem::drop(reader);

        // Seed the writer with the reader's checksum XORed with `RESET_XOR`, at the latest
        // version.
        writer_checkpoint.checksum ^= RESET_XOR;
        writer_checkpoint.version = LATEST_VERSION;
        inner.flushed_offset = writer_checkpoint.file_offset;

        inner.device_flushed_offset = inner.flushed_offset;

        inner.writer.seek(writer_checkpoint);
        inner.output_reset_version = true;
        inner.valid_to = last_checkpoint.file_offset;
        // If replay stopped before the point where writing resumes, record where the
        // unreplayed tail of the journal begins.
        if last_checkpoint.file_offset < inner.flushed_offset {
            inner.discard_offset = Some(last_checkpoint.file_offset);
        }
    }

    self.objects
        .on_replay_complete()
        .await
        .context("Failed to complete replay for object manager")?;

    info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
    Ok(())
}
949
950 async fn read_transactions(
951 &self,
952 reader: &mut JournalReader,
953 end_offset: Option<u64>,
954 object_id_filter: u64,
955 ) -> Result<JournaledTransactions, Error> {
956 let mut transactions = Vec::new();
957 let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
958 let super_block = &self.inner.lock().super_block_header;
959 (
960 super_block.super_block_journal_file_offset,
961 super_block.root_parent_store_object_id,
962 super_block.root_store_object_id,
963 )
964 };
965 let mut current_transaction = None;
966 let mut begin_flush_offsets = HashMap::default();
967 let mut stores_deleted = HashSet::new();
968 loop {
969 let checkpoint = reader.journal_file_checkpoint();
971 if let Some(end_offset) = end_offset {
972 if checkpoint.file_offset >= end_offset {
973 break;
974 }
975 }
976 let result =
977 reader.deserialize().await.context("Failed to deserialize journal record")?;
978 match result {
979 ReadResult::Reset(_) => {
980 if current_transaction.is_some() {
981 current_transaction = None;
982 transactions.pop();
983 }
984 let offset = reader.journal_file_checkpoint().file_offset;
985 if offset > device_flushed_offset {
986 device_flushed_offset = offset;
987 }
988 }
989 ReadResult::Some(record) => {
990 match record {
991 JournalRecord::EndBlock => {
992 reader.skip_to_end_of_block();
993 }
994 JournalRecord::Mutation { object_id, mutation } => {
995 let current_transaction = match current_transaction.as_mut() {
996 None => {
997 transactions.push(JournaledTransaction::new(checkpoint));
998 current_transaction = transactions.last_mut();
999 current_transaction.as_mut().unwrap()
1000 }
1001 Some(transaction) => transaction,
1002 };
1003
1004 if stores_deleted.contains(&object_id) {
1005 bail!(
1006 anyhow!(FxfsError::Inconsistent)
1007 .context("Encountered mutations for deleted store")
1008 );
1009 }
1010
1011 match &mutation {
1012 Mutation::BeginFlush => {
1013 begin_flush_offsets.insert(
1014 object_id,
1015 current_transaction.checkpoint.file_offset,
1016 );
1017 }
1018 Mutation::EndFlush => {
1019 if let Some(offset) = begin_flush_offsets.remove(&object_id) {
1020 if let Some(deleted_volume) =
1021 ¤t_transaction.volume_deleted
1022 {
1023 if *deleted_volume == object_id {
1024 bail!(anyhow!(FxfsError::Inconsistent).context(
1025 "Multiple EndFlush/DeleteVolume mutations in a \
1026 single transaction for the same object"
1027 ));
1028 }
1029 }
1030 if current_transaction
1034 .end_flush
1035 .replace((object_id, offset + 1))
1036 .is_some()
1037 {
1038 bail!(anyhow!(FxfsError::Inconsistent).context(
1039 "Multiple EndFlush mutations in a \
1040 single transaction"
1041 ));
1042 }
1043 }
1044 }
1045 Mutation::DeleteVolume => {
1046 if let Some((flushed_object, _)) =
1047 ¤t_transaction.end_flush
1048 {
1049 if *flushed_object == object_id {
1050 bail!(anyhow!(FxfsError::Inconsistent).context(
1051 "Multiple EndFlush/DeleteVolume mutations in a \
1052 single transaction for the same object"
1053 ));
1054 }
1055 }
1056 if current_transaction
1057 .volume_deleted
1058 .replace(object_id)
1059 .is_some()
1060 {
1061 bail!(anyhow!(FxfsError::Inconsistent).context(
1062 "Multiple DeleteVolume mutations in a single \
1063 transaction"
1064 ));
1065 }
1066 stores_deleted.insert(object_id);
1067 }
1068 _ => {}
1069 }
1070
1071 if (object_id_filter == INVALID_OBJECT_ID
1074 || object_id_filter == object_id)
1075 && self.should_apply(object_id, ¤t_transaction.checkpoint)
1076 {
1077 if object_id == root_parent_store_object_id {
1078 current_transaction.root_parent_mutations.push(mutation);
1079 } else if object_id == root_store_object_id {
1080 current_transaction.root_mutations.push(mutation);
1081 } else {
1082 current_transaction
1083 .non_root_mutations
1084 .push((object_id, mutation));
1085 }
1086 }
1087 }
1088 JournalRecord::DataChecksums(device_range, checksums, first_write) => {
1089 let current_transaction = match current_transaction.as_mut() {
1090 None => {
1091 transactions.push(JournaledTransaction::new(checkpoint));
1092 current_transaction = transactions.last_mut();
1093 current_transaction.as_mut().unwrap()
1094 }
1095 Some(transaction) => transaction,
1096 };
1097 current_transaction.checksums.push(JournaledChecksums {
1098 device_range,
1099 checksums,
1100 first_write,
1101 });
1102 }
1103 JournalRecord::Commit => {
1104 if let Some(&mut JournaledTransaction {
1105 ref checkpoint,
1106 ref root_parent_mutations,
1107 ref mut end_offset,
1108 ..
1109 }) = current_transaction.take()
1110 {
1111 for mutation in root_parent_mutations {
1112 if let Mutation::ObjectStore(ObjectStoreMutation {
1116 item:
1117 Item {
1118 key:
1119 ObjectKey {
1120 object_id,
1121 data:
1122 ObjectKeyData::Attribute(
1123 AttributeId::DATA,
1124 AttributeKey::Extent(extent),
1125 ),
1126 ..
1127 },
1128 value:
1129 ObjectValue::Extent(ExtentValue::Some {
1130 device_offset,
1131 ..
1132 }),
1133 ..
1134 },
1135 ..
1136 }) = mutation
1137 {
1138 let handle = reader.handle();
1141 if *object_id != handle.object_id() {
1142 continue;
1143 }
1144 if let Some(end_offset) = handle.end_offset() {
1145 if extent.start != end_offset {
1146 bail!(anyhow!(FxfsError::Inconsistent).context(
1147 format!(
1148 "Unexpected journal extent {:?} -> {}, \
1149 expected start: {}",
1150 *extent, device_offset, end_offset,
1151 )
1152 ));
1153 }
1154 }
1155 handle.push_extent(
1156 checkpoint.file_offset,
1157 *device_offset
1158 ..*device_offset
1159 + extent.length().context("Invalid extent")?,
1160 );
1161 }
1162 }
1163 *end_offset = reader.journal_file_checkpoint().file_offset;
1164 }
1165 }
1166 JournalRecord::Discard(offset) => {
1167 if offset == 0 {
1168 bail!(
1169 anyhow!(FxfsError::Inconsistent)
1170 .context("Invalid offset for Discard")
1171 );
1172 }
1173 if let Some(transaction) = current_transaction.as_ref() {
1174 if transaction.checkpoint.file_offset < offset {
1175 continue;
1177 }
1178 }
1179 current_transaction = None;
1180 while let Some(transaction) = transactions.last() {
1181 if transaction.checkpoint.file_offset < offset {
1182 break;
1183 }
1184 transactions.pop();
1185 }
1186 reader.handle().discard_extents(offset);
1187 }
1188 JournalRecord::DidFlushDevice(offset) => {
1189 if offset > device_flushed_offset {
1190 device_flushed_offset = offset;
1191 }
1192 }
1193 }
1194 }
1195 ReadResult::ChecksumMismatch => break,
1197 }
1198 }
1199
1200 if current_transaction.is_some() {
1202 transactions.pop();
1203 }
1204
1205 Ok(JournaledTransactions { transactions, device_flushed_offset })
1206 }
1207
1208 pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
1211 const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
1217 const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
1218 const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;
1219
1220 info!(device_size = filesystem.device().size(); "Formatting");
1221
1222 let checkpoint = JournalCheckpoint {
1223 version: LATEST_VERSION,
1224 ..self.inner.lock().writer.journal_file_checkpoint()
1225 };
1226
1227 let mut current_generation = 1;
1228 if filesystem.options().image_builder_mode.is_some() {
1229 let block_size = filesystem.block_size();
1237 match self.read_superblocks(filesystem.device(), block_size).await {
1238 Ok((super_block, _)) => {
1239 log::info!(
1240 "Found existing superblock with generation {}. Bumping by 1.",
1241 super_block.generation
1242 );
1243 current_generation = super_block.generation.wrapping_add(1);
1244 }
1245 Err(_) => {
1246 }
1250 }
1251 }
1252
1253 let root_parent = ObjectStore::new_empty(
1254 None,
1255 INIT_ROOT_PARENT_STORE_OBJECT_ID,
1256 filesystem.clone(),
1257 Box::new(NullCache {}),
1258 );
1259 self.objects.set_root_parent_store(root_parent.clone());
1260
1261 let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
1262 self.objects.set_allocator(allocator.clone());
1263 self.objects.init_metadata_reservation()?;
1264
1265 let journal_handle;
1266 let super_block_a_handle;
1267 let super_block_b_handle;
1268 let root_store;
1269 let mut transaction = root_parent
1270 .new_transaction(
1271 lock_keys![],
1272 Options { skip_journal_checks: true, ..Default::default() },
1273 )
1274 .await?;
1275 root_store = root_parent
1276 .new_child_store(
1277 &mut transaction,
1278 NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
1279 Box::new(NullCache {}),
1280 )
1281 .await
1282 .context("new_child_store")?;
1283 self.objects.set_root_store(root_store.clone());
1284
1285 allocator.create(&mut transaction).await?;
1286
1287 super_block_a_handle = ObjectStore::create_object_with_id(
1289 &root_store,
1290 &mut transaction,
1291 ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::A.object_id()).unwrap()),
1292 HandleOptions::default(),
1293 None,
1294 )
1295 .context("create super block")?;
1296 root_store.update_last_object_id(SuperBlockInstance::A.object_id());
1297 super_block_a_handle
1298 .extend(&mut transaction, SuperBlockInstance::A.first_extent())
1299 .await
1300 .context("extend super block")?;
1301 super_block_b_handle = ObjectStore::create_object_with_id(
1302 &root_store,
1303 &mut transaction,
1304 ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::B.object_id()).unwrap()),
1305 HandleOptions::default(),
1306 None,
1307 )
1308 .context("create super block")?;
1309 root_store.update_last_object_id(SuperBlockInstance::B.object_id());
1310 super_block_b_handle
1311 .extend(&mut transaction, SuperBlockInstance::B.first_extent())
1312 .await
1313 .context("extend super block")?;
1314
1315 journal_handle = ObjectStore::create_object(
1317 &root_parent,
1318 &mut transaction,
1319 journal_handle_options(),
1320 None,
1321 )
1322 .await
1323 .context("create journal")?;
1324 if self.inner.lock().image_builder_mode.is_none() {
1325 let mut file_range = 0..self.chunk_size();
1326 journal_handle
1327 .preallocate_range(&mut transaction, &mut file_range)
1328 .await
1329 .context("preallocate journal")?;
1330 if file_range.start < file_range.end {
1331 bail!("preallocate_range returned too little space");
1332 }
1333 }
1334
1335 root_store.create(&mut transaction).await?;
1337
1338 root_parent.set_graveyard_directory_object_id(
1340 Graveyard::create(&mut transaction, &root_parent).await?,
1341 );
1342
1343 transaction.commit().await?;
1344
1345 self.inner.lock().super_block_header = SuperBlockHeader::new(
1346 current_generation,
1347 root_parent.store_object_id(),
1348 root_parent.graveyard_directory_object_id(),
1349 root_store.store_object_id(),
1350 allocator.object_id(),
1351 journal_handle.object_id(),
1352 checkpoint,
1353 LATEST_VERSION,
1354 );
1355
1356 let _ = self.handle.set(journal_handle);
1358 Ok(())
1359 }
1360
1361 pub async fn allocate_journal(&self) -> Result<(), Error> {
1364 let handle = self.handle.get().unwrap();
1365 let mut transaction = handle
1366 .store()
1367 .new_transaction(
1368 lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1369 Options { skip_journal_checks: true, ..Default::default() },
1370 )
1371 .await?;
1372 let mut file_range = 0..self.chunk_size();
1373 self.handle
1374 .get()
1375 .unwrap()
1376 .preallocate_range(&mut transaction, &mut file_range)
1377 .await
1378 .context("preallocate journal")?;
1379 if file_range.start < file_range.end {
1380 bail!("preallocate_range returned too little space");
1381 }
1382 transaction.commit().await?;
1383 Ok(())
1384 }
1385
1386 pub async fn init_superblocks(&self) -> Result<(), Error> {
1387 for _ in 0..2 {
1389 self.write_super_block().await?;
1390 }
1391 Ok(())
1392 }
1393
1394 pub async fn read_transactions_for_object(
1400 &self,
1401 object_id: u64,
1402 ) -> Result<Vec<JournaledTransaction>, Error> {
1403 let handle = self.handle.get().expect("No journal handle");
1404 let handle = ObjectStore::open_object(
1406 handle.owner(),
1407 handle.object_id(),
1408 journal_handle_options(),
1409 None,
1410 )
1411 .await?;
1412
1413 let checkpoint = match self.objects.journal_checkpoint(object_id) {
1414 Some(checkpoint) => checkpoint,
1415 None => return Ok(vec![]),
1416 };
1417 let mut reader = JournalReader::new(handle, &checkpoint);
1418 let end_offset = self.inner.lock().valid_to;
1421 Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1422 }
1423
1424 pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1426 if transaction.is_empty() {
1427 return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1428 }
1429
1430 self.pre_commit(transaction).await?;
1431 Ok(self.write_and_apply_mutations(transaction))
1432 }
1433
1434 async fn pre_commit(&self, _transaction: &Transaction<'_>) -> Result<(), Error> {
1437 let handle;
1438
1439 let (size, zero_offset) = {
1440 let mut inner = self.inner.lock();
1441
1442 if std::mem::take(&mut inner.output_reset_version) {
1444 LATEST_VERSION.serialize_into(&mut inner.writer)?;
1445 }
1446
1447 if let Some(discard_offset) = inner.discard_offset {
1448 JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
1449 inner.discard_offset = None;
1450 }
1451
1452 if inner.needs_did_flush_device {
1453 let offset = inner.device_flushed_offset;
1454 JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
1455 inner.needs_did_flush_device = false;
1456 }
1457
1458 handle = match self.handle.get() {
1459 None => return Ok(()),
1460 Some(x) => x,
1461 };
1462
1463 let file_offset = inner.writer.journal_file_checkpoint().file_offset;
1464
1465 let size = handle.get_size();
1466 let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };
1467
1468 if size.is_none()
1469 && inner.zero_offset.is_none()
1470 && !self.objects.needs_borrow_for_journal(file_offset)
1471 {
1472 return Ok(());
1473 }
1474
1475 (size, inner.zero_offset)
1476 };
1477
1478 let mut transaction = handle
1479 .new_transaction_with_options(Options {
1480 skip_journal_checks: true,
1481 borrow_metadata_space: true,
1482 allocator_reservation: Some(self.objects.metadata_reservation()),
1483 ..Default::default()
1484 })
1485 .await?;
1486 if let Some(size) = size {
1487 handle
1488 .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
1489 .await?;
1490 }
1491 if let Some(zero_offset) = zero_offset {
1492 handle.zero(&mut transaction, 0..zero_offset).await?;
1493 }
1494
1495 self.write_and_apply_mutations(&mut transaction);
1498
1499 let mut inner = self.inner.lock();
1500
1501 if let Some(size) = size {
1504 assert!(inner.writer.journal_file_checkpoint().file_offset < size);
1505 }
1506
1507 if inner.zero_offset == zero_offset {
1508 inner.zero_offset = None;
1509 }
1510
1511 Ok(())
1512 }
1513
1514 fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1518 let super_block_header = &self.inner.lock().super_block_header;
1519 let offset = super_block_header
1520 .journal_file_offsets
1521 .get(&object_id)
1522 .cloned()
1523 .unwrap_or(super_block_header.super_block_journal_file_offset);
1524 journal_file_checkpoint.file_offset >= offset
1525 }
1526
1527 async fn write_super_block(&self) -> Result<(), Error> {
1530 let root_parent_store = self.objects.root_parent_store();
1531
1532 let old_layers;
1537 let old_super_block_offset;
1538 let mut new_super_block_header;
1539 let checkpoint;
1540 let borrowed;
1541
1542 {
1543 let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1544 {
1545 let _write_guard = self.writer_mutex.lock();
1546 (checkpoint, borrowed) = self.pad_to_block()?;
1547 old_layers = super_block::compact_root_parent(&*root_parent_store)?;
1548 }
1549 self.flush_device(checkpoint.file_offset)
1550 .await
1551 .context("flush failed when writing superblock")?;
1552 }
1553
1554 new_super_block_header = self.inner.lock().super_block_header.clone();
1555
1556 old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;
1557
1558 let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();
1559
1560 new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
1561 new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
1562 new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
1563 new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
1564 new_super_block_header.journal_file_offsets = journal_file_offsets;
1565 new_super_block_header.borrowed_metadata_space = borrowed;
1566
1567 self.super_block_manager
1568 .save(
1569 new_super_block_header.clone(),
1570 self.objects.root_parent_store().filesystem(),
1571 old_layers,
1572 )
1573 .await?;
1574 {
1575 let mut inner = self.inner.lock();
1576 inner.super_block_header = new_super_block_header;
1577 inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
1578 }
1579
1580 Ok(())
1581 }
1582
1583 pub async fn sync(
1590 &self,
1591 options: SyncOptions<'_>,
1592 ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1593 let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1594
1595 let (checkpoint, borrowed) = {
1596 if let Some(precondition) = options.precondition {
1597 if !precondition() {
1598 return Ok(None);
1599 }
1600 }
1601
1602 let _guard = self.writer_mutex.lock();
1605
1606 self.pad_to_block()?
1607 };
1608
1609 if options.flush_device {
1610 self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1611 }
1612
1613 Ok(Some((checkpoint, borrowed)))
1614 }
1615
1616 fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1620 let mut inner = self.inner.lock();
1621 let checkpoint = inner.writer.journal_file_checkpoint();
1622 if checkpoint.file_offset % BLOCK_SIZE != 0 {
1623 JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1624 inner.writer.pad_to_block()?;
1625 if let Some(waker) = inner.flush_waker.take() {
1626 waker.wake();
1627 }
1628 }
1629 Ok((checkpoint, self.objects.borrowed_metadata_space()))
1630 }
1631
1632 async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
1633 assert!(
1634 self.inner.lock().image_builder_mode.is_none(),
1635 "flush_device called in image builder mode"
1636 );
1637 debug_assert_not_too_long!(poll_fn(|ctx| {
1638 let mut inner = self.inner.lock();
1639 if inner.flushed_offset >= checkpoint_offset {
1640 Poll::Ready(Ok(()))
1641 } else if inner.terminate {
1642 let context = inner
1643 .terminate_reason
1644 .as_ref()
1645 .map(|e| format!("Journal closed with error: {:?}", e))
1646 .unwrap_or_else(|| "Journal closed".to_string());
1647 Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
1648 } else {
1649 inner.sync_waker = Some(ctx.waker().clone());
1650 Poll::Pending
1651 }
1652 }))?;
1653
1654 let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
1655 if needs_flush {
1656 let trace = self.trace.load(Ordering::Relaxed);
1657 if trace {
1658 info!("J: start flush device");
1659 }
1660 self.handle.get().unwrap().flush_device().await?;
1661 if trace {
1662 info!("J: end flush device");
1663 }
1664
1665 {
1673 let mut inner = self.inner.lock();
1674 inner.device_flushed_offset = checkpoint_offset;
1675 inner.needs_did_flush_device = true;
1676 }
1677
1678 self.objects.allocator().did_flush_device(checkpoint_offset);
1681 if trace {
1682 info!("J: did flush device");
1683 }
1684 }
1685
1686 Ok(())
1687 }
1688
1689 pub fn super_block_header(&self) -> SuperBlockHeader {
1691 self.inner.lock().super_block_header.clone()
1692 }
1693
1694 pub async fn check_journal_space(&self) -> Result<(), Error> {
1696 loop {
1697 debug_assert_not_too_long!({
1698 let inner = self.inner.lock();
1699 if inner.terminate {
1700 let context = inner
1703 .terminate_reason
1704 .as_ref()
1705 .map(|e| format!("Journal closed with error: {:?}", e))
1706 .unwrap_or_else(|| "Journal closed".to_string());
1707 break Err(anyhow!(FxfsError::JournalFlushError).context(context));
1708 }
1709 if self.objects.last_end_offset()
1710 - inner.super_block_header.journal_checkpoint.file_offset
1711 < inner.reclaim_size
1712 {
1713 break Ok(());
1714 }
1715 if inner.image_builder_mode.is_some() {
1716 break Ok(());
1717 }
1718 if inner.disable_compactions {
1719 break Err(
1720 anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
1721 );
1722 }
1723 self.reclaim_event.listen()
1724 });
1725 }
1726 }
1727
1728 fn chunk_size(&self) -> u64 {
1729 CHUNK_SIZE
1730 }
1731
1732 fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
1733 let checkpoint_before;
1734 let checkpoint_after;
1735 {
1736 let _guard = self.writer_mutex.lock();
1737 checkpoint_before = {
1738 let mut inner = self.inner.lock();
1739 if transaction.includes_write() {
1740 inner.needs_barrier = true;
1741 }
1742 let checkpoint = inner.writer.journal_file_checkpoint();
1743 for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
1744 self.objects.write_mutation(
1745 *object_id,
1746 mutation,
1747 Writer(*object_id, &mut inner.writer),
1748 );
1749 }
1750 checkpoint
1751 };
1752 let maybe_mutation =
1753 self.objects.apply_transaction(transaction, &checkpoint_before).expect(
1754 "apply_transaction should not fail in live mode; \
1755 filesystem will be in an inconsistent state",
1756 );
1757 checkpoint_after = {
1758 let mut inner = self.inner.lock();
1759 if let Some(mutation) = maybe_mutation {
1760 inner
1761 .writer
1762 .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
1763 .unwrap();
1764 }
1765 for (device_range, checksums, first_write) in
1766 transaction.take_checksums().into_iter()
1767 {
1768 inner
1769 .writer
1770 .write_record(&JournalRecord::DataChecksums(
1771 device_range,
1772 Checksums::fletcher(checksums),
1773 first_write,
1774 ))
1775 .unwrap();
1776 }
1777 inner.writer.write_record(&JournalRecord::Commit).unwrap();
1778
1779 inner.writer.journal_file_checkpoint()
1780 };
1781 }
1782 self.objects.did_commit_transaction(
1783 transaction,
1784 &checkpoint_before,
1785 checkpoint_after.file_offset,
1786 );
1787
1788 if let Some(waker) = self.inner.lock().flush_waker.take() {
1789 waker.wake();
1790 }
1791
1792 checkpoint_before.file_offset
1793 }
1794
1795 pub async fn flush_task(self: Arc<Self>) {
1799 let mut flush_fut = None;
1800 let mut compact_fut = None;
1801 let mut flush_error = false;
1802 poll_fn(|ctx| {
1803 loop {
1804 {
1805 let mut inner = self.inner.lock();
1806 if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
1807 let flushable = inner.writer.flushable_bytes();
1808 if flushable > 0 {
1809 flush_fut = Some(Box::pin(self.flush(flushable)));
1810 }
1811 }
1812 if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
1813 return Poll::Ready(());
1814 }
1815 let journal_bytes = self.objects.last_end_offset()
1820 - inner.super_block_header.journal_checkpoint.file_offset;
1821 fxfs_trace::counter!("journal-bytes", 0, "total" => journal_bytes);
1822 if compact_fut.is_none()
1826 && !inner.terminate
1827 && !inner.disable_compactions
1828 && inner.image_builder_mode.is_none()
1829 && journal_bytes > inner.reclaim_size / 2
1830 {
1831 compact_fut = Some(Box::pin(self.compact()));
1832 inner.compaction_running = true;
1833 }
1834 inner.flush_waker = Some(ctx.waker().clone());
1835 }
1836 let mut pending = true;
1837 if let Some(fut) = flush_fut.as_mut() {
1838 if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1839 if let Err(e) = result {
1840 self.inner.lock().terminate(Some(e.context("Flush error")));
1841 self.reclaim_event.notify(usize::MAX);
1842 flush_error = true;
1843 }
1844 flush_fut = None;
1845 pending = false;
1846 }
1847 }
1848 if let Some(fut) = compact_fut.as_mut() {
1849 if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1850 let mut inner = self.inner.lock();
1851 if let Err(e) = result {
1852 inner.terminate(Some(e.context("Compaction error")));
1853 }
1854 compact_fut = None;
1855 inner.compaction_running = false;
1856 self.reclaim_event.notify(usize::MAX);
1857 pending = false;
1858 fxfs_trace::counter!(
1859 "journal-bytes",
1860 0,
1861 "total" => self.objects.last_end_offset()
1862 - inner.super_block_header.journal_checkpoint.file_offset
1863 );
1864 }
1865 }
1866 if pending {
1867 return Poll::Pending;
1868 }
1869 }
1870 })
1871 .await;
1872 }
1873
1874 pub fn get_compaction_yielder(&self) -> CompactionYielder<'_> {
1876 CompactionYielder::new(self)
1877 }
1878
1879 async fn flush(&self, amount: usize) -> Result<(), Error> {
1880 let handle = self.handle.get().unwrap();
1881 let mut buf = handle.allocate_buffer(amount).await;
1882 let (offset, len, barrier_on_first_write) = {
1883 let mut inner = self.inner.lock();
1884 let offset = inner.writer.take_flushable(buf.as_mut());
1885 let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1886 inner.needs_barrier = false;
1889 (offset, buf.len() as u64, barrier_on_first_write)
1890 };
1891 self.handle
1892 .get()
1893 .unwrap()
1894 .overwrite(
1895 offset,
1896 buf.as_mut(),
1897 OverwriteOptions { barrier_on_first_write, ..Default::default() },
1898 )
1899 .await?;
1900
1901 let mut inner = self.inner.lock();
1902 if let Some(waker) = inner.sync_waker.take() {
1903 waker.wake();
1904 }
1905 inner.flushed_offset = offset + len;
1906 inner.valid_to = inner.flushed_offset;
1907 Ok(())
1908 }
1909
1910 #[trace]
1911 async fn compact(&self) -> Result<(), Error> {
1912 assert!(
1913 self.inner.lock().image_builder_mode.is_none(),
1914 "compact called in image builder mode"
1915 );
1916 let bytes_before = self.objects.compaction_bytes_written();
1917 let _measure = crate::metrics::DurationMeasureScope::new(
1918 &crate::metrics::lsm_tree_metrics().journal_compaction_time,
1919 );
1920 crate::metrics::lsm_tree_metrics().journal_compactions_total.add(1);
1921 let trace = self.trace.load(Ordering::Relaxed);
1922 debug!("Compaction starting");
1923 if trace {
1924 info!("J: start compaction");
1925 }
1926 let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1927 self.inner.lock().super_block_header.earliest_version = earliest_version;
1928 self.write_super_block().await.context("Failed to write superblock")?;
1929 if trace {
1930 info!("J: end compaction");
1931 }
1932 debug!("Compaction finished");
1933 let bytes_after = self.objects.compaction_bytes_written();
1934 crate::metrics::lsm_tree_metrics()
1935 .journal_compaction_bytes_written
1936 .add(bytes_after.saturating_sub(bytes_before));
1937 Ok(())
1938 }
1939
1940 pub async fn force_compact(&self) -> Result<(), Error> {
1943 self.inner.lock().forced_compaction = true;
1944 scopeguard::defer! { self.inner.lock().forced_compaction = false; }
1945 self.compact().await
1946 }
1947
1948 pub async fn stop_compactions(&self) {
1949 loop {
1950 debug_assert_not_too_long!({
1951 let mut inner = self.inner.lock();
1952 inner.disable_compactions = true;
1953 if !inner.compaction_running {
1954 return;
1955 }
1956 self.reclaim_event.listen()
1957 });
1958 }
1959 }
1960
1961 pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1964 let this = Arc::downgrade(self);
1965 parent.record_lazy_child(name, move || {
1966 let this_clone = this.clone();
1967 async move {
1968 let inspector = fuchsia_inspect::Inspector::default();
1969 if let Some(this) = this_clone.upgrade() {
1970 let (journal_min, journal_max, journal_reclaim_size) = {
1971 let inner = this.inner.lock();
1973 (
1974 round_down(
1975 inner.super_block_header.journal_checkpoint.file_offset,
1976 BLOCK_SIZE,
1977 ),
1978 inner.flushed_offset,
1979 inner.reclaim_size,
1980 )
1981 };
1982 let root = inspector.root();
1983 root.record_uint("journal_min_offset", journal_min);
1984 root.record_uint("journal_max_offset", journal_max);
1985 root.record_uint("journal_size", journal_max - journal_min);
1986 root.record_uint("journal_reclaim_size", journal_reclaim_size);
1987
1988 if let Some(x) = round_div(
1990 100 * (journal_max - journal_min),
1991 this.objects.allocator().get_disk_bytes(),
1992 ) {
1993 root.record_uint("journal_size_to_disk_size_percent", x);
1994 }
1995 }
1996 Ok(inspector)
1997 }
1998 .boxed()
1999 });
2000 }
2001
2002 pub fn terminate(&self) {
2004 self.inner.lock().terminate(None);
2005 self.reclaim_event.notify(usize::MAX);
2006 }
2007}
2008
2009pub struct Writer<'a>(u64, &'a mut JournalWriter);
2011
2012impl Writer<'_> {
2013 pub fn write(&mut self, mutation: Mutation) {
2014 self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
2015 }
2016}
2017
#[cfg(target_os = "fuchsia")]
mod yielder {
    use super::Journal;
    use crate::lsm_tree::Yielder;
    use fuchsia_async as fasync;

    /// A `Yielder` for compactions that decides, based on the state of the journal, whether to
    /// pause between units of work.
    pub struct CompactionYielder<'a> {
        journal: &'a Journal,
        // Created on the first yield that actually waits; dropped again when yielding is skipped
        // because the journal is filling up.
        low_priority_task: Option<fasync::LowPriorityTask>,
    }

    impl<'a> CompactionYielder<'a> {
        /// Creates a yielder for `journal` with no low-priority task yet.
        pub fn new(journal: &'a Journal) -> Self {
            CompactionYielder { journal, low_priority_task: None }
        }
    }

    impl Yielder for CompactionYielder<'_> {
        /// Yields unless the compaction needs to make progress urgently.
        ///
        /// Returns immediately when the compaction was forced, or when the outstanding journal
        /// exceeds half of `reclaim_size` by at least a quarter of `reclaim_size`.  Otherwise
        /// waits on the low-priority task with a 4ms idle period and a deadline 16ms from now.
        async fn yield_now(&mut self) {
            const IDLE_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(4);
            const MAX_YIELD_DURATION: zx::MonotonicDuration =
                zx::MonotonicDuration::from_millis(16);

            {
                let inner = self.journal.inner.lock();
                if inner.forced_compaction {
                    return;
                }
                let outstanding = self.journal.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset;
                let half_reclaim_size = inner.reclaim_size / 2;
                let falling_behind = matches!(
                    outstanding.checked_sub(half_reclaim_size),
                    Some(excess) if excess >= half_reclaim_size / 2
                );
                if falling_behind {
                    self.low_priority_task = None;
                    return;
                }
            }

            let task = self.low_priority_task.get_or_insert_with(fasync::LowPriorityTask::new);
            let deadline = fasync::MonotonicInstant::after(MAX_YIELD_DURATION);
            task.wait_until_idle_for(IDLE_PERIOD, deadline).await;
        }
    }
}
2077
2078#[cfg(not(target_os = "fuchsia"))]
2079mod yielder {
2080 use super::Journal;
2081 use crate::lsm_tree::Yielder;
2082
2083 #[expect(dead_code)]
2084 pub struct CompactionYielder<'a>(&'a Journal);
2085
2086 impl<'a> CompactionYielder<'a> {
2087 pub fn new(journal: &'a Journal) -> Self {
2088 Self(journal)
2089 }
2090 }
2091
2092 impl Yielder for CompactionYielder<'_> {
2093 async fn yield_now(&mut self) {}
2094 }
2095}
2096
2097pub use yielder::*;
2098
2099#[cfg(test)]
2100mod tests {
2101 use super::SuperBlockInstance;
2102 use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
2103 use crate::fsck::fsck;
2104 use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
2105 use crate::object_store::directory::Directory;
2106 use crate::object_store::transaction::Options;
2107 use crate::object_store::volume::root_volume;
2108 use crate::object_store::{
2109 HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
2110 };
2111 #[cfg(target_os = "fuchsia")]
2112 use fuchsia_async::TestExecutor;
2113 use fuchsia_async::{self as fasync, MonotonicDuration};
2114 use storage_device::DeviceHolder;
2115 use storage_device::fake_device::FakeDevice;
2116
2117 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2118
2119 #[fuchsia::test]
2120 async fn test_replay() {
2121 const TEST_DATA: &[u8] = b"hello";
2122
2123 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2124
2125 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2126
2127 let object_id = {
2128 let root_store = fs.root_store();
2129 let root_directory =
2130 Directory::open(&root_store, root_store.root_directory_object_id())
2131 .await
2132 .expect("open failed");
2133 let mut transaction = fs
2134 .root_store()
2135 .new_transaction(
2136 lock_keys![LockKey::object(
2137 root_store.store_object_id(),
2138 root_store.root_directory_object_id(),
2139 )],
2140 Options::default(),
2141 )
2142 .await
2143 .expect("new_transaction failed");
2144 let handle = root_directory
2145 .create_child_file(&mut transaction, "test")
2146 .await
2147 .expect("create_child_file failed");
2148
2149 transaction.commit().await.expect("commit failed");
2150 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2151 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2152 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2153 fs.sync(SyncOptions::default()).await.expect("sync failed");
2156 handle.object_id()
2157 };
2158
2159 {
2160 fs.close().await.expect("Close failed");
2161 let device = fs.take_device().await;
2162 device.reopen(false);
2163 let fs = FxFilesystem::open(device).await.expect("open failed");
2164 let handle = ObjectStore::open_object(
2165 &fs.root_store(),
2166 object_id,
2167 HandleOptions::default(),
2168 None,
2169 )
2170 .await
2171 .expect("open_object failed");
2172 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2173 assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
2174 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2175 fsck(fs.clone()).await.expect("fsck failed");
2176 fs.close().await.expect("Close failed");
2177 }
2178 }
2179
2180 #[fuchsia::test]
2181 async fn test_reset() {
2182 const TEST_DATA: &[u8] = b"hello";
2183
2184 let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
2185
2186 let mut object_ids = Vec::new();
2187
2188 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2189 {
2190 let root_store = fs.root_store();
2191 let root_directory =
2192 Directory::open(&root_store, root_store.root_directory_object_id())
2193 .await
2194 .expect("open failed");
2195 let mut transaction = fs
2196 .root_store()
2197 .new_transaction(
2198 lock_keys![LockKey::object(
2199 root_store.store_object_id(),
2200 root_store.root_directory_object_id(),
2201 )],
2202 Options::default(),
2203 )
2204 .await
2205 .expect("new_transaction failed");
2206 let handle = root_directory
2207 .create_child_file(&mut transaction, "test")
2208 .await
2209 .expect("create_child_file failed");
2210 transaction.commit().await.expect("commit failed");
2211 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2212 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2213 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2214 fs.sync(SyncOptions::default()).await.expect("sync failed");
2215 object_ids.push(handle.object_id());
2216
2217 for i in 0..1000 {
2220 let mut transaction = fs
2221 .root_store()
2222 .new_transaction(
2223 lock_keys![LockKey::object(
2224 root_store.store_object_id(),
2225 root_store.root_directory_object_id(),
2226 )],
2227 Options::default(),
2228 )
2229 .await
2230 .expect("new_transaction failed");
2231 let handle = root_directory
2232 .create_child_file(&mut transaction, &format!("{}", i))
2233 .await
2234 .expect("create_child_file failed");
2235 transaction.commit().await.expect("commit failed");
2236 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2237 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2238 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2239 object_ids.push(handle.object_id());
2240 }
2241 }
2242 fs.close().await.expect("fs close failed");
2243 let device = fs.take_device().await;
2244 device.reopen(false);
2245 let fs = FxFilesystem::open(device).await.expect("open failed");
2246 fsck(fs.clone()).await.expect("fsck failed");
2247 {
2248 let root_store = fs.root_store();
2249 for &object_id in &object_ids[0..1] {
2251 let handle = ObjectStore::open_object(
2252 &root_store,
2253 object_id,
2254 HandleOptions::default(),
2255 None,
2256 )
2257 .await
2258 .expect("open_object failed");
2259 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2260 assert_eq!(
2261 handle.read(0, buf.as_mut()).await.expect("read failed"),
2262 TEST_DATA.len()
2263 );
2264 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2265 }
2266
2267 let root_directory =
2269 Directory::open(&root_store, root_store.root_directory_object_id())
2270 .await
2271 .expect("open failed");
2272 let mut transaction = fs
2273 .root_store()
2274 .new_transaction(
2275 lock_keys![LockKey::object(
2276 root_store.store_object_id(),
2277 root_store.root_directory_object_id(),
2278 )],
2279 Options::default(),
2280 )
2281 .await
2282 .expect("new_transaction failed");
2283 let handle = root_directory
2284 .create_child_file(&mut transaction, "test2")
2285 .await
2286 .expect("create_child_file failed");
2287 transaction.commit().await.expect("commit failed");
2288 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2289 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2290 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2291 fs.sync(SyncOptions::default()).await.expect("sync failed");
2292 object_ids.push(handle.object_id());
2293 }
2294
2295 fs.close().await.expect("close failed");
2296 let device = fs.take_device().await;
2297 device.reopen(false);
2298 let fs = FxFilesystem::open(device).await.expect("open failed");
2299 {
2300 fsck(fs.clone()).await.expect("fsck failed");
2301
2302 for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
2304 let handle = ObjectStore::open_object(
2305 &fs.root_store(),
2306 object_id,
2307 HandleOptions::default(),
2308 None,
2309 )
2310 .await
2311 .unwrap_or_else(|e| {
2312 panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2313 });
2314 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2315 assert_eq!(
2316 handle.read(0, buf.as_mut()).await.expect("read failed"),
2317 TEST_DATA.len()
2318 );
2319 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2320 }
2321 }
2322 fs.close().await.expect("close failed");
2323 }
2324
2325 #[fuchsia::test]
2326 async fn test_discard() {
2327 let device = {
2328 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2329 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2330 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2331
2332 let store = root_volume
2333 .new_volume("test", NewChildStoreOptions::default())
2334 .await
2335 .expect("new_volume failed");
2336 let root_directory = Directory::open(&store, store.root_directory_object_id())
2337 .await
2338 .expect("open failed");
2339
2340 let mut i = 0;
2342 loop {
2343 let mut transaction = fs
2344 .root_store()
2345 .new_transaction(
2346 lock_keys![LockKey::object(
2347 store.store_object_id(),
2348 store.root_directory_object_id()
2349 )],
2350 Options::default(),
2351 )
2352 .await
2353 .expect("new_transaction failed");
2354 root_directory
2355 .create_child_file(&mut transaction, &format!("a {i}"))
2356 .await
2357 .expect("create_child_file failed");
2358 if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2359 break;
2360 }
2361 i += 1;
2362 }
2363
2364 fs.journal().force_compact().await.expect("compact failed");
2366 fs.journal().stop_compactions().await;
2367
2368 let mut i = 0;
2370 loop {
2371 let mut transaction = fs
2372 .root_store()
2373 .new_transaction(
2374 lock_keys![LockKey::object(
2375 store.store_object_id(),
2376 store.root_directory_object_id()
2377 )],
2378 Options::default(),
2379 )
2380 .await
2381 .expect("new_transaction failed");
2382 root_directory
2383 .create_child_file(&mut transaction, &format!("b {i}"))
2384 .await
2385 .expect("create_child_file failed");
2386 if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2387 break;
2388 }
2389 i += 1;
2390 }
2391
2392 fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2394 fs.device().snapshot().expect("snapshot failed")
2397 };
2398
2399 let fs = FxFilesystem::open(device).await.expect("open failed");
2400
2401 {
2402 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2403
2404 let store =
2405 root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2406
2407 let root_directory = Directory::open(&store, store.root_directory_object_id())
2408 .await
2409 .expect("open failed");
2410
2411 let mut transaction = fs
2413 .root_store()
2414 .new_transaction(
2415 lock_keys![LockKey::object(
2416 store.store_object_id(),
2417 store.root_directory_object_id()
2418 )],
2419 Options::default(),
2420 )
2421 .await
2422 .expect("new_transaction failed");
2423 root_directory
2424 .create_child_file(&mut transaction, &format!("d"))
2425 .await
2426 .expect("create_child_file failed");
2427 transaction.commit().await.expect("commit failed");
2428 }
2429
2430 fs.close().await.expect("close failed");
2431 let device = fs.take_device().await;
2432 device.reopen(false);
2433
2434 let fs = FxFilesystem::open(device).await.expect("open failed");
2435 fsck(fs.clone()).await.expect("fsck failed");
2436 fs.close().await.expect("close failed");
2437 }
2438
2439 #[fuchsia::test]
2440 async fn test_use_existing_generation() {
2441 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2442
2443 let fs = FxFilesystemBuilder::new()
2445 .format(true)
2446 .image_builder_mode(Some(SuperBlockInstance::A))
2447 .open(device)
2448 .await
2449 .expect("open failed");
2450 fs.enable_allocations();
2451 let generation0 = fs.super_block_header().generation;
2452 assert_eq!(generation0, 1);
2453 fs.close().await.expect("close failed");
2454 let device = fs.take_device().await;
2455 device.reopen(false);
2456
2457 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2459 let generation1 = fs.super_block_header().generation;
2460 {
2461 let root_volume = crate::object_store::volume::root_volume(fs.clone())
2462 .await
2463 .expect("root_volume failed");
2464 root_volume
2465 .new_volume("test", crate::object_store::NewChildStoreOptions::default())
2466 .await
2467 .expect("new_volume failed");
2468 }
2469 fs.close().await.expect("close failed");
2470 let device = fs.take_device().await;
2471 device.reopen(false);
2472
2473 let fs = FxFilesystemBuilder::new()
2475 .format(true)
2476 .image_builder_mode(Some(SuperBlockInstance::A))
2477 .open(device)
2478 .await
2479 .expect("open failed");
2480 fs.enable_allocations();
2481 let generation2 = fs.super_block_header().generation;
2482 assert!(
2483 generation2 > generation1,
2484 "generation2 ({}) should be greater than generation1 ({})",
2485 generation2,
2486 generation1
2487 );
2488 fs.close().await.expect("close failed");
2489 }
2490
2491 #[fuchsia::test]
2492 async fn test_image_builder_mode_generation_bump_512_byte_block() {
2493 let device = DeviceHolder::new(FakeDevice::new(16384, 512));
2494
2495 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2497 let generation1 = fs.super_block_header().generation;
2498 fs.close().await.expect("close failed");
2499 let device = fs.take_device().await;
2500 device.reopen(false);
2501
2502 let fs = FxFilesystemBuilder::new()
2504 .format(true)
2505 .image_builder_mode(Some(SuperBlockInstance::A))
2506 .open(device)
2507 .await
2508 .expect("open failed");
2509
2510 fs.enable_allocations();
2511 let generation2 = fs.super_block_header().generation;
2512 assert!(
2513 generation2 > generation1,
2514 "Expected generation bump, got {} vs {}",
2515 generation2,
2516 generation1
2517 );
2518 fs.close().await.expect("close failed");
2519 }
2520
/// Drives a journal compaction under fake time while a `LowPriorityTask` guard is held and
/// a separately spawned task keeps re-arming a 1ms timer.
///
/// The test asserts that `compaction_running` stays set for ten 1ms steps while the spawned
/// task is active, for four more 1ms steps after that task is told to stop, and that it is
/// cleared after one further step followed by polling until stalled.
///
/// NOTE(review): presumably the compaction yields to normal-priority work and only
/// finishes after that work has been quiet for a while -- confirm against the compaction
/// implementation.
#[fuchsia::test]
#[cfg(target_os = "fuchsia")]
fn test_low_priority_compaction() {
    let mut executor = TestExecutor::new_with_fake_time();
    let mut fut = std::pin::pin!(async {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        // Moves the fake clock forward by `millis` relative to the current fake time.
        let advance_by_ms = |millis| {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(millis),
            ))
        };

        let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
        let journal_options = super::JournalOptions { reclaim_size: 65536, ..Default::default() };
        let fs = FxFilesystemBuilder::new()
            .journal_options(journal_options)
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        // Held for the remainder of the test.
        let _low = fasync::LowPriorityTask::new();

        // Seed the root directory with 100 files.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..100 {
                let mut transaction = fs
                    .root_store()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        // A regular task that wakes every millisecond until asked to stop.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
            }
        });

        // Keep committing (advancing 1ms between commits) until a compaction is running.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0.. {
                let mut transaction = fs
                    .root_store()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                if fs.journal().inner.lock().compaction_running {
                    break;
                }
                advance_by_ms(1).await;
            }
        }

        // While the regular task is active the compaction must still be in progress.
        for _ in 0..10 {
            advance_by_ms(1).await;
            assert!(fs.journal().inner.lock().compaction_running);
        }

        stop.store(true, Ordering::Relaxed);

        // For four more 1ms steps after the stop request the compaction is still running.
        for _ in 0..4 {
            advance_by_ms(1).await;
            assert!(fs.journal().inner.lock().compaction_running);
        }

        // One more step, then let every runnable task make progress: compaction must be done.
        advance_by_ms(1).await;

        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
        assert!(!fs.journal().inner.lock().compaction_running);

        fs.close().await.expect("Close failed");
    });
    assert!(executor.run_until_stalled(&mut fut).is_ready());
}
2663
/// Drives a journal compaction under fake time while a `LowPriorityTask` guard is held and
/// a spawned task re-arms a 3ms timer for the whole duration.
///
/// Once a compaction is running, the clock is advanced in 20ms steps (at most 1000) until
/// `compaction_running` clears. The test requires that the compaction does finish and that
/// more than 200 steps elapsed while it was still running.
///
/// NOTE(review): presumably this shows that compaction eventually completes after some
/// deadline even though the regular task never goes quiet -- confirm against the
/// compaction implementation.
#[fuchsia::test]
#[cfg(target_os = "fuchsia")]
fn test_low_priority_compaction_deadline() {
    let mut executor = TestExecutor::new_with_fake_time();
    let mut fut = std::pin::pin!(async {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        // Moves the fake clock forward by `millis` relative to the current fake time.
        let advance_by_ms = |millis| {
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(millis),
            ))
        };

        let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
        let journal_options = super::JournalOptions { reclaim_size: 65536, ..Default::default() };
        let fs = FxFilesystemBuilder::new()
            .journal_options(journal_options)
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        // Held for the remainder of the test.
        let _low = fasync::LowPriorityTask::new();

        // Seed the root directory with 10 files.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..10 {
                let mut transaction = fs
                    .root_store()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        // A regular task that wakes every 3ms until asked to stop.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(3),
                ))
                .await;
            }
        });

        // Keep committing (advancing 1ms between commits) until a compaction is running.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0.. {
                let mut transaction = fs
                    .root_store()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                if fs.journal().inner.lock().compaction_running {
                    break;
                }
                advance_by_ms(1).await;
            }
        }

        // Count how many 20ms steps pass with the compaction still in progress.
        let mut count = 0;
        for _ in 0..1000 {
            advance_by_ms(20).await;
            if !fs.journal().inner.lock().compaction_running {
                break;
            }
            count += 1;
        }
        // The compaction must have finished within the step budget...
        assert!(!fs.journal().inner.lock().compaction_running);

        // ...but only after more than 200 steps of waiting.
        assert!(count > 200);

        stop.store(true, Ordering::Relaxed);
        fs.close().await.expect("Close failed");
    });
    assert!(executor.run_until_stalled(&mut fut).is_ready());
}
2785
/// Fills the journal, without advancing the clock, until the bytes outstanding since the
/// last checkpoint reach 7/8 of `reclaim_size`, then checks that no compaction is running
/// after a single 4ms step plus polling until stalled.
///
/// A `LowPriorityTask` guard is held and a spawned task re-arms a 3ms timer throughout.
///
/// NOTE(review): going by the test name, compaction presumably stops yielding to
/// normal-priority work once the journal is nearly full -- confirm against the compaction
/// implementation.
#[fuchsia::test]
#[cfg(target_os = "fuchsia")]
fn test_low_priority_compaction_no_yielding_when_full() {
    let mut executor = TestExecutor::new_with_fake_time();
    let mut fut = std::pin::pin!(async {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicBool, Ordering};

        let reclaim_size = 65536;
        let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
        let journal_options = super::JournalOptions { reclaim_size, ..Default::default() };
        let fs = FxFilesystemBuilder::new()
            .journal_options(journal_options)
            .format(true)
            .open(device)
            .await
            .expect("open failed");

        // Held for the remainder of the test.
        let _low = fasync::LowPriorityTask::new();

        // Seed the root directory with 10 files.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0..10 {
                let mut transaction = fs
                    .root_store()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("test{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
            }
        }

        // A regular task that wakes every 3ms until asked to stop.
        let stop = Arc::new(AtomicBool::new(false));
        let stop_clone = stop.clone();
        let _normal_task = fasync::Task::spawn(async move {
            while !stop_clone.load(Ordering::Relaxed) {
                fasync::Timer::new(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(3),
                ))
                .await;
            }
        });

        // Commit back-to-back (the fake clock is not advanced) until the journal holds at
        // least 7/8 of `reclaim_size` past the checkpoint recorded in the super-block header.
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            for i in 0.. {
                let mut transaction = fs
                    .root_store()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("trigger{i}"))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");

                let outstanding = {
                    // The journal lock is taken before the end offset is read, as before.
                    let journal_inner = fs.journal().inner.lock();
                    fs.journal().objects.last_end_offset()
                        - journal_inner.super_block_header.journal_checkpoint.file_offset
                };
                if outstanding >= reclaim_size * 7 / 8 {
                    break;
                }
            }
        }

        // A single 4ms step, then let every runnable task make progress.
        let wake_at = fasync::MonotonicInstant::after(MonotonicDuration::from_millis(4));
        TestExecutor::advance_to(wake_at).await;

        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
        assert!(!fs.journal().inner.lock().compaction_running);

        stop.store(true, Ordering::Relaxed);
        fs.close().await.expect("Close failed");
    });
    assert!(executor.run_until_stalled(&mut fut).is_ready());
}
2898}
2899
/// Fuzz targets that inject fuzzer-provided data into the journal of a freshly formatted
/// filesystem and then attempt to reopen it.
#[cfg(fuzz)]
mod fuzz {
    use fuzz::fuzz;

    /// Writes the raw fuzzer bytes straight into the journal writer, closes the filesystem,
    /// and then tries to open it again.
    ///
    /// A failure to reopen is tolerated, and so is a failure of the final `close()`.
    #[fuzz]
    fn fuzz_journal_bytes(input: Vec<u8>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use std::io::Write;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let blank = DeviceHolder::new(FakeDevice::new(32768, 512));
            let filesystem = FxFilesystem::new_empty(blank).await.expect("new_empty failed");
            // Inject the fuzzer input verbatim into the journal stream.
            filesystem.journal().inner.lock().writer.write_all(&input).expect("write failed");
            filesystem.close().await.expect("close failed");

            let reopened_device = filesystem.take_device().await;
            reopened_device.reopen(false);
            match FxFilesystem::open(reopened_device).await {
                Ok(replayed) => {
                    // The close result is intentionally ignored.
                    let _ = replayed.close().await;
                }
                Err(_) => {}
            }
        });
    }

    /// Appends each fuzzer-generated `JournalRecord` to the journal writer (ignoring
    /// per-record write errors), closes the filesystem, and then tries to open it again.
    ///
    /// A failure to reopen is tolerated, and so is a failure of the final `close()`.
    #[fuzz]
    fn fuzz_journal(input: Vec<super::JournalRecord>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use storage_device::DeviceHolder;
        use storage_device::fake_device::FakeDevice;

        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let blank = DeviceHolder::new(FakeDevice::new(32768, 512));
            let filesystem = FxFilesystem::new_empty(blank).await.expect("new_empty failed");
            {
                // Hold the journal lock only while the records are appended.
                let mut journal_inner = filesystem.journal().inner.lock();
                for record in input.iter() {
                    let _ = journal_inner.writer.write_record(record);
                }
            }
            filesystem.close().await.expect("close failed");

            let reopened_device = filesystem.take_device().await;
            reopened_device.reopen(false);
            match FxFilesystem::open(reopened_device).await {
                Ok(replayed) => {
                    // The close result is intentionally ignored.
                    let _ = replayed.close().await;
                }
                Err(_) => {}
            }
        });
    }
}