1mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::errors::FxfsError;
27use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
28use crate::log::*;
29use crate::lsm_tree::cache::NullCache;
30use crate::lsm_tree::types::Layer;
31use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
32use crate::object_store::allocator::Allocator;
33use crate::object_store::data_object_handle::OverwriteOptions;
34use crate::object_store::extent_record::{ExtentMode, ExtentValue};
35use crate::object_store::graveyard::Graveyard;
36use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
37use crate::object_store::journal::checksum_list::ChecksumList;
38use crate::object_store::journal::reader::{JournalReader, ReadResult};
39use crate::object_store::journal::super_block::{
40 SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
41};
42use crate::object_store::journal::writer::JournalWriter;
43use crate::object_store::object_manager::ObjectManager;
44use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
45use crate::object_store::transaction::{
46 AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
47 MutationV47, MutationV49, MutationV50, MutationV54, MutationV55, ObjectStoreMutation, Options,
48 TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
49};
50use crate::object_store::{
51 AssocObj, AttributeId, DataObjectHandle, Extent, HandleOptions, HandleOwner, INVALID_OBJECT_ID,
52 Item, ItemRef, NewChildStoreOptions, ObjectStore, ReservedId,
53};
54use crate::range::RangeExt;
55use crate::round::{round_div, round_down};
56use crate::serialized_types::{
57 LATEST_VERSION, Migrate, Version, Versioned, migrate_nodefault, migrate_to_version,
58};
59use anyhow::{Context, Error, anyhow, bail, ensure};
60use event_listener::Event;
61use fprint::TypeFingerprint;
62use fuchsia_inspect::NumericProperty;
63use fuchsia_sync::Mutex;
64use futures::FutureExt as _;
65use futures::future::poll_fn;
66use rustc_hash::FxHashMap as HashMap;
67use serde::{Deserialize, Serialize};
68use static_assertions::const_assert;
69use std::clone::Clone;
70use std::collections::HashSet;
71use std::num::NonZero;
72use std::ops::{Bound, Range};
73use std::sync::atomic::{AtomicBool, Ordering};
74use std::sync::{Arc, OnceLock};
75use std::task::{Poll, Waker};
76use storage_device::Device;
77
/// Size in bytes of a journal block (4 KiB). The journal writer is created with this block
/// size, and replay rounds the checkpoint offset down to it when locating the journal's extents.
pub const BLOCK_SIZE: u64 = 4 * 1024;

/// Size in bytes of a journal chunk (128 KiB).
const CHUNK_SIZE: u64 = 128 * 1024;
// Compile-time check: a chunk must be strictly larger than TRANSACTION_MAX_JOURNAL_USAGE (by its
// name, the most journal space a single transaction may consume).
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);
84
/// Default for `JournalOptions::reclaim_size`, in bytes (512 KiB).
pub const DEFAULT_RECLAIM_SIZE: u64 = 512 * 1024;

/// Reserved space in bytes (1 MiB).
/// NOTE(review): not referenced in this part of the file; confirm its role at the use sites.
pub const RESERVED_SPACE: u64 = 1024 * 1024;

/// All-ones mask XOR-ed into the writer's starting checksum when replay finishes. Flipping every
/// bit guarantees the seed differs from the checksum the reader ended on.
const RESET_XOR: u64 = u64::MAX;
97
/// The current journal checkpoint type.
pub type JournalCheckpoint = JournalCheckpointV32;

/// A position in the journal file, together with the checksum and format version associated
/// with that position.
// Serialized type: changing fields or their order changes the on-disk format / fingerprint.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    /// Byte offset within the journal file.
    pub file_offset: u64,

    /// Checksum associated with this position. Replay XORs it with `RESET_XOR` before seeding
    /// the writer.
    /// NOTE(review): presumably the running record checksum at `file_offset` — confirm in the
    /// reader/writer.
    pub checksum: Checksum,

    /// Serialization version of the records at this position. Replay and formatting set it to
    /// `LATEST_VERSION` for newly written records.
    pub version: Version,
}
116
/// The current journal record type.
pub type JournalRecord = JournalRecordV55;

/// A single record in the journal stream.
///
/// During replay, `Mutation` and `DataChecksums` records accumulate into the currently open
/// transaction, and `Commit` closes it.
// Serialized type: changing variants or their order changes the on-disk format / fingerprint.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV55 {
    /// End of the current journal block; the reader skips to the start of the next block.
    EndBlock,
    /// A mutation for the object (or store) identified by `object_id`.
    Mutation {
        object_id: u64,
        mutation: MutationV55,
    },
    /// Commits the transaction formed by the preceding `Mutation`/`DataChecksums` records.
    Commit,
    /// Discards transactions whose checkpoint is at or after the given journal offset. An
    /// offset of 0 is rejected as inconsistent.
    Discard(u64),
    /// Records the journal offset up to which the device has been flushed.
    DidFlushDevice(u64),
    /// Checksums for a device range written by the transaction. The `bool` is the
    /// `first_write` flag.
    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
}
151
/// Journal record format at version 54. `#[migrate_to_version]` names [`JournalRecordV55`] as
/// its migration target. Differs from V55 only in the mutation type (`MutationV54`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV55)]
#[migrate_nodefault]
pub enum JournalRecordV54 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV54 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, crate::checksum::ChecksumsV38, bool),
}
164
/// Journal record format at version 50, migrating to [`JournalRecordV54`]. Differs from V54 only
/// in the mutation type (`MutationV50`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV54)]
#[migrate_nodefault]
pub enum JournalRecordV50 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV50 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
177
/// Journal record format at version 49, migrating to [`JournalRecordV50`]. Differs from V50 only
/// in the mutation type (`MutationV49`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV50)]
pub enum JournalRecordV49 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV49 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
189
/// Journal record format at version 47, migrating to [`JournalRecordV49`]. Differs from V49 only
/// in the mutation type (`MutationV47`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV49)]
pub enum JournalRecordV47 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV47 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
201
/// Journal record format at version 46, migrating to [`JournalRecordV47`]. Differs from V47 only
/// in the mutation type (`MutationV46`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV47)]
pub enum JournalRecordV46 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV46 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
213
/// Journal record format at version 43, migrating to [`JournalRecordV46`]. Differs from V46 only
/// in the mutation type (`MutationV43`).
#[allow(clippy::large_enum_variant)]
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV46)]
pub enum JournalRecordV43 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV43 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
225
/// Journal record format at version 42, migrating to [`JournalRecordV43`]. Same as
/// [`JournalRecordV41`] except that `DataChecksums` carries an extra `bool` (`first_write`).
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
236
/// Journal record format at version 41. It does not derive `Migrate`; it is upgraded to
/// [`JournalRecordV42`] by a hand-written `From` impl, because `DataChecksums` has no
/// `first_write` flag here.
#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
246
247impl From<JournalRecordV41> for JournalRecordV42 {
248 fn from(record: JournalRecordV41) -> Self {
249 match record {
250 JournalRecordV41::EndBlock => Self::EndBlock,
251 JournalRecordV41::Mutation { object_id, mutation } => {
252 Self::Mutation { object_id, mutation: mutation.into() }
253 }
254 JournalRecordV41::Commit => Self::Commit,
255 JournalRecordV41::Discard(offset) => Self::Discard(offset),
256 JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
257 JournalRecordV41::DataChecksums(range, sums) => {
258 Self::DataChecksums(range, sums, true)
261 }
262 }
263 }
264}
265
/// Journal record format at version 40, migrating to [`JournalRecordV41`]. Differs from V41 only
/// in the mutation type (`MutationV40`).
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
276
277pub(super) fn journal_handle_options() -> HandleOptions {
278 HandleOptions { skip_journal_checks: true, ..Default::default() }
279}
280
/// The filesystem journal.
///
/// Holds the journal writer (inside `inner`) and replays journaled transactions into the object
/// manager (see `Journal::replay`).
pub struct Journal {
    /// Object manager that replayed mutations are applied to.
    objects: Arc<ObjectManager>,
    /// Handle to the journal file. Set once replay has opened it.
    handle: OnceLock<DataObjectHandle<ObjectStore>>,
    /// Loads super-blocks and holds the next super-block instance to use.
    super_block_manager: SuperBlockManager,
    /// Mutable journal state.
    inner: Mutex<Inner>,
    /// NOTE(review): not used in the visible code; presumably serializes journal writes.
    writer_mutex: Mutex<()>,
    /// NOTE(review): not used in the visible code; presumably serializes sync operations.
    sync_mutex: futures::lock::Mutex<()>,
    /// Whether journal tracing is enabled (see `Journal::set_trace`).
    trace: AtomicBool,

    /// NOTE(review): not used in the visible code; presumably signalled around journal reclaim.
    reclaim_event: Event,
}
298
/// Mutable journal state, protected by `Journal::inner`.
struct Inner {
    /// Header of the super-block the journal was replayed from (stored by `Journal::replay`).
    super_block_header: SuperBlockHeader,

    /// NOTE(review): initialized to `None`; not read or written in the visible code.
    zero_offset: Option<u64>,

    /// Journal offset up to which the device is known to be flushed. Set equal to
    /// `flushed_offset` at the end of replay.
    device_flushed_offset: u64,

    /// NOTE(review): presumably whether a `DidFlushDevice` record still needs to be written —
    /// confirm at the use sites.
    needs_did_flush_device: bool,

    /// Writer for journal records. Replay repositions it to just after the last block read.
    writer: JournalWriter,

    /// Set to `true` at the end of replay, right after the writer's checksum is XOR-ed with
    /// `RESET_XOR`.
    output_reset_version: bool,

    /// Stored waker. `Inner::terminate` takes and wakes it.
    flush_waker: Option<Waker>,

    /// Set once the journal has been terminated.
    terminate: bool,

    /// The first error passed to `Inner::terminate`, if any.
    terminate_reason: Option<Error>,

    /// NOTE(review): presumably suppresses compactions — confirm at the use sites.
    disable_compactions: bool,

    /// NOTE(review): presumably true while a compaction is in progress — confirm.
    compaction_running: bool,

    /// Stored waker. `Inner::terminate` takes and wakes it.
    sync_waker: Option<Waker>,

    /// Set to the writer's starting file offset at the end of replay.
    /// NOTE(review): presumably the offset up to which journal data has been flushed.
    flushed_offset: u64,

    /// File offset of the last valid checkpoint found by replay.
    valid_to: u64,

    /// Start of journal data that replay found invalid. Set only when the last valid checkpoint
    /// precedes `flushed_offset`.
    discard_offset: Option<u64>,

    /// Copied from `JournalOptions::reclaim_size`.
    reclaim_size: u64,

    /// Super-block instance used in image-builder mode, if that mode is enabled.
    image_builder_mode: Option<SuperBlockInstance>,

    /// Copied from `JournalOptions::barriers_enabled`.
    barriers_enabled: bool,

    /// NOTE(review): not used in the visible code; presumably set when a barrier is pending.
    needs_barrier: bool,

    /// NOTE(review): not used in the visible code; presumably requests a compaction.
    forced_compaction: bool,
}
369
370impl Inner {
371 fn terminate(&mut self, reason: Option<Error>) {
372 self.terminate = true;
373
374 if let Some(err) = reason {
375 error!(error:? = err; "Terminating journal");
376 if let Some(prev_err) = self.terminate_reason.as_ref() {
378 error!(error:? = prev_err; "Journal previously terminated");
379 } else {
380 self.terminate_reason = Some(err);
381 }
382 }
383
384 if let Some(waker) = self.flush_waker.take() {
385 waker.wake();
386 }
387 if let Some(waker) = self.sync_waker.take() {
388 waker.wake();
389 }
390 }
391}
392
/// Options for constructing a [`Journal`].
pub struct JournalOptions {
    /// Reclaim size in bytes. Defaults to `DEFAULT_RECLAIM_SIZE`.
    pub reclaim_size: u64,

    /// Whether barriers are enabled. Defaults to `false`.
    pub barriers_enabled: bool,
}
404
405impl Default for JournalOptions {
406 fn default() -> Self {
407 JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
408 }
409}
410
/// Result of `Journal::read_transactions`.
struct JournaledTransactions {
    /// Committed transactions, in journal order.
    transactions: Vec<JournaledTransaction>,
    /// Highest device-flushed journal offset observed while reading. It starts from the
    /// super-block's journal offset.
    device_flushed_offset: u64,
}
415
/// A committed transaction read from the journal, with its mutations split by target store.
#[derive(Debug, Default)]
pub struct JournaledTransaction {
    /// Journal position of the transaction's first record.
    pub checkpoint: JournalCheckpoint,
    /// Mutations targeting the root parent store.
    pub root_parent_mutations: Vec<Mutation>,
    /// Mutations targeting the root store.
    pub root_mutations: Vec<Mutation>,
    /// Mutations for every other object, each paired with its object id.
    pub non_root_mutations: Vec<(u64, Mutation)>,
    /// Journal offset reached immediately after reading the `Commit` record.
    pub end_offset: u64,
    /// Data checksums recorded by the transaction.
    pub checksums: Vec<JournaledChecksums>,

    /// `(object_id, offset)` recorded for an `EndFlush` mutation. `offset` is one past the
    /// checkpoint offset of the transaction that contained the matching `BeginFlush`.
    pub end_flush: Option<(u64, u64)>,

    /// Object id of the store deleted by a `DeleteVolume` mutation in this transaction.
    pub volume_deleted: Option<u64>,
}
433
434impl JournaledTransaction {
435 fn new(checkpoint: JournalCheckpoint) -> Self {
436 Self { checkpoint, ..Default::default() }
437 }
438}
439
/// Sentinel journal offset that replay records for a store deleted by a `DeleteVolume` mutation.
const VOLUME_DELETED: u64 = u64::MAX;
441
/// Data checksums journaled (via a `DataChecksums` record) for a range of the device.
#[derive(Debug)]
pub struct JournaledChecksums {
    /// Device range covered by `checksums`.
    pub device_range: Range<u64>,
    /// The checksums for `device_range`.
    pub checksums: Checksums,
    /// The `first_write` flag carried by the `DataChecksums` record.
    pub first_write: bool,
}
448
/// A readable handle to the journal file that can optionally track the file's extents during
/// replay.
pub trait JournalHandle: ReadObjectHandle {
    /// Returns the file offset where the extents pushed so far end, or `None` if this handle
    /// does not track extents. Replay requires each newly pushed extent to start here.
    fn end_offset(&self) -> Option<u64>;
    /// Appends an extent covering `device_range`. `added_offset` is the journal offset of the
    /// transaction that added it (replay passes 0 for extents found in the root parent store).
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Called when a `Discard(discard_offset)` record is replayed.
    /// NOTE(review): presumably drops extents added at or after `discard_offset` — confirm in
    /// the implementations.
    fn discard_extents(&mut self, discard_offset: u64);
}
463
464impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
468 fn end_offset(&self) -> Option<u64> {
469 None
470 }
471 fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
472 }
474 fn discard_extents(&mut self, _discard_offset: u64) {
475 }
477}
478
479#[fxfs_trace::trace]
480impl Journal {
481 pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
482 let starting_checksum = rand::random_range(1..u64::MAX);
483 Journal {
484 objects: objects,
485 handle: OnceLock::new(),
486 super_block_manager: SuperBlockManager::new(),
487 inner: Mutex::new(Inner {
488 super_block_header: SuperBlockHeader::default(),
489 zero_offset: None,
490 device_flushed_offset: 0,
491 needs_did_flush_device: false,
492 writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
493 output_reset_version: false,
494 flush_waker: None,
495 terminate: false,
496 terminate_reason: None,
497 disable_compactions: false,
498 compaction_running: false,
499 sync_waker: None,
500 flushed_offset: 0,
501 valid_to: 0,
502 discard_offset: None,
503 reclaim_size: options.reclaim_size,
504 image_builder_mode: None,
505 barriers_enabled: options.barriers_enabled,
506 needs_barrier: false,
507 forced_compaction: false,
508 }),
509 writer_mutex: Mutex::new(()),
510 sync_mutex: futures::lock::Mutex::new(()),
511 trace: AtomicBool::new(false),
512 reclaim_event: Event::new(),
513 }
514 }
515
516 pub fn set_trace(&self, trace: bool) {
517 let old_value = self.trace.swap(trace, Ordering::Relaxed);
518 if trace != old_value {
519 info!(trace; "J: trace");
520 }
521 }
522
523 pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
524 self.inner.lock().image_builder_mode = mode;
525 if let Some(instance) = mode {
526 *self.super_block_manager.next_instance.lock() = instance;
527 }
528 }
529
530 pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
531 self.inner.lock().image_builder_mode
532 }
533
/// Sets the filesystem GUID recorded in the in-memory super-block header.
///
/// # Errors
/// Fails if image-builder mode is not enabled.
#[cfg(feature = "migration")]
pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
    // Check the mode and update the header under one lock acquisition, so image-builder mode
    // cannot be turned off between the check and the write.
    let mut inner = self.inner.lock();
    ensure!(
        inner.image_builder_mode.is_some(),
        "Can only set filesystem uuid in image builder mode."
    );
    inner.super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
    Ok(())
}
543
544 pub(crate) async fn read_superblocks(
545 &self,
546 device: Arc<dyn Device>,
547 block_size: u64,
548 ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
549 self.super_block_manager.load(device, block_size).await
550 }
551
552 fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
556 match mutation {
557 Mutation::ObjectStore(ObjectStoreMutation {
558 item:
559 Item {
560 key:
561 ObjectKey {
562 data: ObjectKeyData::Attribute(_, AttributeKey::Extent(extent)),
563 ..
564 },
565 value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
566 ..
567 },
568 ..
569 }) => {
570 if extent.is_empty() || !extent.is_aligned(block_size) {
571 return false;
572 }
573 let len = extent.length().unwrap();
574 if let ExtentMode::Cow(checksums) = mode {
575 if checksums.len() > 0 {
576 if len % checksums.len() as u64 != 0 {
577 return false;
578 }
579 if (len / checksums.len() as u64) % block_size != 0 {
580 return false;
581 }
582 }
583 }
584 if *device_offset % block_size != 0
585 || *device_offset >= device_size
586 || device_size - *device_offset < len
587 {
588 return false;
589 }
590 }
591 Mutation::ObjectStore(_) => {}
592 Mutation::EncryptedObjectStore(_) => {}
593 Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
594 return !device_range.is_empty()
595 && *owner_object_id != INVALID_OBJECT_ID
596 && device_range.end <= device_size;
597 }
598 Mutation::Allocator(AllocatorMutation::Deallocate {
599 device_range,
600 owner_object_id,
601 }) => {
602 return !device_range.is_empty()
603 && *owner_object_id != INVALID_OBJECT_ID
604 && device_range.end <= device_size;
605 }
606 Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
607 return *owner_object_id != INVALID_OBJECT_ID;
608 }
609 Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
610 return *owner_object_id != INVALID_OBJECT_ID;
611 }
612 Mutation::BeginFlush => {}
613 Mutation::EndFlush => {}
614 Mutation::DeleteVolume => {}
615 Mutation::UpdateBorrowed(_) => {}
616 Mutation::UpdateMutationsKey(_) => {}
617 Mutation::CreateInternalDir(owner_object_id) => {
618 return *owner_object_id != INVALID_OBJECT_ID;
619 }
620 }
621 true
622 }
623
624 fn update_checksum_list(
626 &self,
627 journal_offset: u64,
628 mutation: &Mutation,
629 checksum_list: &mut ChecksumList,
630 ) -> Result<(), Error> {
631 match mutation {
632 Mutation::ObjectStore(_) => {}
633 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
634 checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
635 }
636 _ => {}
637 }
638 Ok(())
639 }
640
/// Replays the journal, rebuilding in-memory filesystem state from the last super-block.
///
/// Steps, in order:
/// 1. Load the super-block and root parent store, then install a new allocator (passing it to
///    `on_new_allocator`, if given).
/// 2. Build a bootstrap handle for the journal file from the journal object's extents in the
///    root parent store's mutable layer, then read all committed transactions.
/// 3. Validate the mutations and data checksums, and cut replay off at the first invalid
///    point.
/// 4. Apply root-parent-store mutations, open the root store and apply its mutations, open
///    the allocator, then replay all remaining mutations.
/// 5. Open the real journal handle and position the writer just after the last block read.
///
/// # Errors
/// Fails if a store, the allocator, or the journal file cannot be loaded or opened, if a
/// mutation fails to apply, or if the journal is inconsistent. Inconsistencies include
/// non-contiguous journal extents, an encrypted root store, and replay ending before the
/// offset at which the super-block was written.
#[trace]
pub async fn replay(
    &self,
    filesystem: Arc<FxFilesystem>,
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
) -> Result<(), Error> {
    let block_size = filesystem.block_size();

    let (super_block, root_parent) =
        self.super_block_manager.load(filesystem.device(), block_size).await?;

    let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

    self.objects.set_root_parent_store(root_parent.clone());
    let allocator =
        Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
    if let Some(on_new_allocator) = on_new_allocator {
        on_new_allocator(allocator.clone());
    }
    self.objects.set_allocator(allocator.clone());
    self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
    self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
    {
        let mut inner = self.inner.lock();
        inner.super_block_header = super_block.clone();
    }

    let device = filesystem.device();

    // Build a bootstrap handle for the journal file. Seek to the journal object's extent that
    // contains the checkpoint offset (rounded down to a journal block).
    let mut handle;
    {
        let root_parent_layer = root_parent.tree().mutable_layer();
        let mut iter = root_parent_layer
            .seek(Bound::Included(&ObjectKey::attribute(
                super_block.journal_object_id,
                AttributeId::DATA,
                AttributeKey::Extent(Extent::search_key_from_offset(round_down(
                    super_block.journal_checkpoint.file_offset,
                    BLOCK_SIZE,
                ))),
            )))
            .await
            .context("Failed to seek root parent store")?;
        // The handle's logical start is the start of the first data extent found, or 0.
        let start_offset = if let Some(ItemRef {
            key:
                ObjectKey {
                    data:
                        ObjectKeyData::Attribute(AttributeId::DATA, AttributeKey::Extent(extent)),
                    ..
                },
            ..
        }) = iter.get()
        {
            extent.start
        } else {
            0
        };
        handle = BootstrapObjectHandle::new_with_start_offset(
            super_block.journal_object_id,
            device.clone(),
            start_offset,
        );
        // Push each consecutive data extent of the journal object. Stop at the first item that
        // is not one. The extents must be contiguous in file-offset space.
        while let Some(item) = iter.get() {
            if !match item.into() {
                Some((
                    object_id,
                    AttributeId::DATA,
                    extent,
                    ExtentValue::Some { device_offset, .. },
                )) if object_id == super_block.journal_object_id => {
                    if let Some(end_offset) = handle.end_offset() {
                        if extent.start != end_offset {
                            bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                "Unexpected journal extent {:?}, expected start: {}",
                                item, end_offset
                            )));
                        }
                    }
                    handle.push_extent(
                        // Added offset 0: `Discard(0)` is rejected as inconsistent, so these
                        // extents presumably can never be discarded.
                        0,
                        *device_offset
                            ..*device_offset + extent.length().context("Invalid extent")?,
                    );
                    true
                }
                _ => false,
            } {
                break;
            }
            iter.advance().await.context("Failed to advance root parent store iterator")?;
        }
    }

    let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
    let JournaledTransactions { mut transactions, device_flushed_offset } = self
        .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
        .await
        .context("Reading transactions for replay")?;

    // Validate each transaction's mutations and collect its data checksums. On the first invalid
    // mutation, `valid_to` is pulled back to that transaction's checkpoint.
    let mut checksum_list = ChecksumList::new(device_flushed_offset);
    let mut valid_to = reader.journal_file_checkpoint().file_offset;
    let device_size = device.size();
    'bad_replay: for JournaledTransaction {
        checkpoint,
        root_parent_mutations,
        root_mutations,
        non_root_mutations,
        checksums,
        ..
    } in &transactions
    {
        for JournaledChecksums { device_range, checksums, first_write } in checksums {
            checksum_list
                .push(
                    checkpoint.file_offset,
                    device_range.clone(),
                    checksums.maybe_as_ref().context("Malformed checksums")?,
                    *first_write,
                )
                .context("Pushing journal checksum records to checksum list")?;
        }
        for mutation in root_parent_mutations
            .iter()
            .chain(root_mutations)
            .chain(non_root_mutations.iter().map(|(_, m)| m))
        {
            if !self.validate_mutation(mutation, block_size, device_size) {
                info!(mutation:?; "Stopping replay at bad mutation");
                valid_to = checkpoint.file_offset;
                break 'bad_replay;
            }
            self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
        }
    }

    // Check the collected checksums against the device. The result is the offset replay
    // treats as valid. NOTE(review): presumably never greater than the `valid_to` passed in.
    let valid_to = checksum_list
        .verify(device.as_ref(), valid_to)
        .await
        .context("Failed to validate checksums")?;

    // Apply root parent store mutations for transactions before `valid_to`, and record the
    // per-object journal offsets implied by EndFlush/DeleteVolume. The first transaction at or
    // beyond `valid_to` becomes `last_checkpoint`, and it and everything after it are dropped.
    let mut last_checkpoint = reader.journal_file_checkpoint();
    let mut journal_offsets = super_block.journal_file_offsets.clone();

    for (
        index,
        JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            end_flush,
            volume_deleted,
            ..
        },
    ) in transactions.iter_mut().enumerate()
    {
        if checkpoint.file_offset >= valid_to {
            last_checkpoint = checkpoint.clone();

            transactions.truncate(index);
            break;
        }

        let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
        for mutation in root_parent_mutations.drain(..) {
            self.objects
                .apply_mutation(
                    super_block.root_parent_store_object_id,
                    mutation,
                    &context,
                    AssocObj::None,
                )
                .context("Failed to replay root parent store mutations")?;
        }

        if let Some((object_id, journal_offset)) = end_flush {
            journal_offsets.insert(*object_id, *journal_offset);
        }

        if let Some(object_id) = volume_deleted {
            journal_offsets.insert(*object_id, VOLUME_DELETED);
        }
    }

    let root_store = ObjectStore::open(
        &root_parent,
        super_block.root_store_object_id,
        Box::new(NullCache {}),
    )
    .await
    .context("Unable to open root store")?;

    ensure!(
        !root_store.is_encrypted(),
        anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
    );
    self.objects.set_root_store(root_store);

    // Skip root store mutations from transactions that start before the root store's recorded
    // journal offset.
    let root_store_offset =
        journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

    for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
        if checkpoint.file_offset < root_store_offset {
            continue;
        }

        let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
        for mutation in root_mutations.drain(..) {
            self.objects
                .apply_mutation(
                    super_block.root_store_object_id,
                    mutation,
                    &context,
                    AssocObj::None,
                )
                .context("Failed to replay root store mutations")?;
        }
    }

    allocator.open().await.context("Failed to open allocator")?;

    // Replay the remaining (non-root) mutations, with the per-object journal offsets gathered
    // above.
    for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
    {
        self.objects
            .replay_mutations(
                non_root_mutations,
                &journal_offsets,
                &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                end_offset,
            )
            .await
            .context("Failed to replay mutations")?;
    }

    allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

    // Used only in the log line below: where the reader stopped, if that differs from the last
    // valid checkpoint.
    let discarded_to =
        if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
            Some(reader.journal_file_checkpoint().file_offset)
        } else {
            None
        };

    {
        // Replay must reach at least the point at which the super-block was written.
        if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
            return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                "journal replay cut short; journal finishes at {}, but super-block was \
                 written at {}",
                last_checkpoint.file_offset, super_block.super_block_journal_file_offset
            )));
        }
        let handle = ObjectStore::open_object(
            &root_parent,
            super_block.journal_object_id,
            journal_handle_options(),
            None,
        )
        .await
        .with_context(|| {
            format!(
                "Failed to open journal file (object id: {})",
                super_block.journal_object_id
            )
        })?;
        let _ = self.handle.set(handle);
        let mut inner = self.inner.lock();
        reader.skip_to_end_of_block();
        let mut writer_checkpoint = reader.journal_file_checkpoint();

        // The reader is not used past this point.
        std::mem::drop(reader);

        // Seed the writer with a checksum that differs from the reader's (all bits flipped),
        // and write new records at the latest version.
        writer_checkpoint.checksum ^= RESET_XOR;
        writer_checkpoint.version = LATEST_VERSION;
        inner.flushed_offset = writer_checkpoint.file_offset;

        inner.device_flushed_offset = inner.flushed_offset;

        inner.writer.seek(writer_checkpoint);
        inner.output_reset_version = true;
        inner.valid_to = last_checkpoint.file_offset;
        // Journal data between the last valid checkpoint and the writer position is invalid.
        // Remember where it starts (presumably so it can be discarded later).
        if last_checkpoint.file_offset < inner.flushed_offset {
            inner.discard_offset = Some(last_checkpoint.file_offset);
        }
    }

    self.objects
        .on_replay_complete()
        .await
        .context("Failed to complete replay for object manager")?;

    info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
    Ok(())
}
949
950 async fn read_transactions(
951 &self,
952 reader: &mut JournalReader,
953 end_offset: Option<u64>,
954 object_id_filter: u64,
955 ) -> Result<JournaledTransactions, Error> {
956 let mut transactions = Vec::new();
957 let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
958 let super_block = &self.inner.lock().super_block_header;
959 (
960 super_block.super_block_journal_file_offset,
961 super_block.root_parent_store_object_id,
962 super_block.root_store_object_id,
963 )
964 };
965 let mut current_transaction = None;
966 let mut begin_flush_offsets = HashMap::default();
967 let mut stores_deleted = HashSet::new();
968 loop {
969 let checkpoint = reader.journal_file_checkpoint();
971 if let Some(end_offset) = end_offset {
972 if checkpoint.file_offset >= end_offset {
973 break;
974 }
975 }
976 let result =
977 reader.deserialize().await.context("Failed to deserialize journal record")?;
978 match result {
979 ReadResult::Reset(_) => {
980 if current_transaction.is_some() {
981 current_transaction = None;
982 transactions.pop();
983 }
984 let offset = reader.journal_file_checkpoint().file_offset;
985 if offset > device_flushed_offset {
986 device_flushed_offset = offset;
987 }
988 }
989 ReadResult::Some(record) => {
990 match record {
991 JournalRecord::EndBlock => {
992 reader.skip_to_end_of_block();
993 }
994 JournalRecord::Mutation { object_id, mutation } => {
995 let current_transaction = match current_transaction.as_mut() {
996 None => {
997 transactions.push(JournaledTransaction::new(checkpoint));
998 current_transaction = transactions.last_mut();
999 current_transaction.as_mut().unwrap()
1000 }
1001 Some(transaction) => transaction,
1002 };
1003
1004 if stores_deleted.contains(&object_id) {
1005 bail!(
1006 anyhow!(FxfsError::Inconsistent)
1007 .context("Encountered mutations for deleted store")
1008 );
1009 }
1010
1011 match &mutation {
1012 Mutation::BeginFlush => {
1013 begin_flush_offsets.insert(
1014 object_id,
1015 current_transaction.checkpoint.file_offset,
1016 );
1017 }
1018 Mutation::EndFlush => {
1019 if let Some(offset) = begin_flush_offsets.remove(&object_id) {
1020 if let Some(deleted_volume) =
1021 ¤t_transaction.volume_deleted
1022 {
1023 if *deleted_volume == object_id {
1024 bail!(anyhow!(FxfsError::Inconsistent).context(
1025 "Multiple EndFlush/DeleteVolume mutations in a \
1026 single transaction for the same object"
1027 ));
1028 }
1029 }
1030 if current_transaction
1034 .end_flush
1035 .replace((object_id, offset + 1))
1036 .is_some()
1037 {
1038 bail!(anyhow!(FxfsError::Inconsistent).context(
1039 "Multiple EndFlush mutations in a \
1040 single transaction"
1041 ));
1042 }
1043 }
1044 }
1045 Mutation::DeleteVolume => {
1046 if let Some((flushed_object, _)) =
1047 ¤t_transaction.end_flush
1048 {
1049 if *flushed_object == object_id {
1050 bail!(anyhow!(FxfsError::Inconsistent).context(
1051 "Multiple EndFlush/DeleteVolume mutations in a \
1052 single transaction for the same object"
1053 ));
1054 }
1055 }
1056 if current_transaction
1057 .volume_deleted
1058 .replace(object_id)
1059 .is_some()
1060 {
1061 bail!(anyhow!(FxfsError::Inconsistent).context(
1062 "Multiple DeleteVolume mutations in a single \
1063 transaction"
1064 ));
1065 }
1066 stores_deleted.insert(object_id);
1067 }
1068 _ => {}
1069 }
1070
1071 if (object_id_filter == INVALID_OBJECT_ID
1074 || object_id_filter == object_id)
1075 && self.should_apply(object_id, ¤t_transaction.checkpoint)
1076 {
1077 if object_id == root_parent_store_object_id {
1078 current_transaction.root_parent_mutations.push(mutation);
1079 } else if object_id == root_store_object_id {
1080 current_transaction.root_mutations.push(mutation);
1081 } else {
1082 current_transaction
1083 .non_root_mutations
1084 .push((object_id, mutation));
1085 }
1086 }
1087 }
1088 JournalRecord::DataChecksums(device_range, checksums, first_write) => {
1089 let current_transaction = match current_transaction.as_mut() {
1090 None => {
1091 transactions.push(JournaledTransaction::new(checkpoint));
1092 current_transaction = transactions.last_mut();
1093 current_transaction.as_mut().unwrap()
1094 }
1095 Some(transaction) => transaction,
1096 };
1097 current_transaction.checksums.push(JournaledChecksums {
1098 device_range,
1099 checksums,
1100 first_write,
1101 });
1102 }
1103 JournalRecord::Commit => {
1104 if let Some(&mut JournaledTransaction {
1105 ref checkpoint,
1106 ref root_parent_mutations,
1107 ref mut end_offset,
1108 ..
1109 }) = current_transaction.take()
1110 {
1111 for mutation in root_parent_mutations {
1112 if let Mutation::ObjectStore(ObjectStoreMutation {
1116 item:
1117 Item {
1118 key:
1119 ObjectKey {
1120 object_id,
1121 data:
1122 ObjectKeyData::Attribute(
1123 AttributeId::DATA,
1124 AttributeKey::Extent(extent),
1125 ),
1126 ..
1127 },
1128 value:
1129 ObjectValue::Extent(ExtentValue::Some {
1130 device_offset,
1131 ..
1132 }),
1133 ..
1134 },
1135 ..
1136 }) = mutation
1137 {
1138 let handle = reader.handle();
1141 if *object_id != handle.object_id() {
1142 continue;
1143 }
1144 if let Some(end_offset) = handle.end_offset() {
1145 if extent.start != end_offset {
1146 bail!(anyhow!(FxfsError::Inconsistent).context(
1147 format!(
1148 "Unexpected journal extent {:?} -> {}, \
1149 expected start: {}",
1150 *extent, device_offset, end_offset,
1151 )
1152 ));
1153 }
1154 }
1155 handle.push_extent(
1156 checkpoint.file_offset,
1157 *device_offset
1158 ..*device_offset
1159 + extent.length().context("Invalid extent")?,
1160 );
1161 }
1162 }
1163 *end_offset = reader.journal_file_checkpoint().file_offset;
1164 }
1165 }
1166 JournalRecord::Discard(offset) => {
1167 if offset == 0 {
1168 bail!(
1169 anyhow!(FxfsError::Inconsistent)
1170 .context("Invalid offset for Discard")
1171 );
1172 }
1173 if let Some(transaction) = current_transaction.as_ref() {
1174 if transaction.checkpoint.file_offset < offset {
1175 continue;
1177 }
1178 }
1179 current_transaction = None;
1180 while let Some(transaction) = transactions.last() {
1181 if transaction.checkpoint.file_offset < offset {
1182 break;
1183 }
1184 transactions.pop();
1185 }
1186 reader.handle().discard_extents(offset);
1187 }
1188 JournalRecord::DidFlushDevice(offset) => {
1189 if offset > device_flushed_offset {
1190 device_flushed_offset = offset;
1191 }
1192 }
1193 }
1194 }
1195 ReadResult::ChecksumMismatch => break,
1197 }
1198 }
1199
1200 if current_transaction.is_some() {
1202 transactions.pop();
1203 }
1204
1205 Ok(JournaledTransactions { transactions, device_flushed_offset })
1206 }
1207
/// Formats a brand-new, empty filesystem.
///
/// In one transaction this creates the root parent store, the allocator, the root store, the
/// two superblock objects (A and B, with fixed object IDs and their first extents), the journal
/// object (in the root parent store) and the root parent store's graveyard. It then records a
/// fresh superblock header in memory and stores the journal handle.
///
/// In image builder mode, if a superblock can already be read from the device, the new
/// generation is that superblock's generation plus one (wrapping); otherwise generation 1 is
/// used.
///
/// This only sets the in-memory superblock header; it does not itself write either superblock
/// to disk.
///
/// # Errors
/// Propagates any failure from creating the transaction, the stores, the objects or the
/// graveyard, from preallocating the journal, or from committing.
pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
    // Fixed object IDs used for the core objects of a freshly formatted filesystem.
    const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
    const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
    const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

    info!(device_size = filesystem.device().size(); "Formatting");

    // The journal starts at the writer's current position, stamped with the latest version.
    let checkpoint = JournalCheckpoint {
        version: LATEST_VERSION,
        ..self.inner.lock().writer.journal_file_checkpoint()
    };

    let mut current_generation = 1;
    if filesystem.options().image_builder_mode.is_some() {
        // Continue from any superblock already on the device so the new image's generation
        // is newer than the existing one.
        let block_size = filesystem.block_size();
        match self.read_superblocks(filesystem.device(), block_size).await {
            Ok((super_block, _)) => {
                log::info!(
                    "Found existing superblock with generation {}. Bumping by 1.",
                    super_block.generation
                );
                current_generation = super_block.generation.wrapping_add(1);
            }
            Err(_) => {
                // No readable superblock: keep the default generation of 1.
            }
        }
    }

    let root_parent = ObjectStore::new_empty(
        None,
        INIT_ROOT_PARENT_STORE_OBJECT_ID,
        filesystem.clone(),
        Box::new(NullCache {}),
    );
    self.objects.set_root_parent_store(root_parent.clone());

    let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
    self.objects.set_allocator(allocator.clone());
    self.objects.init_metadata_reservation()?;

    let journal_handle;
    let super_block_a_handle;
    let super_block_b_handle;
    let root_store;
    let mut transaction = filesystem
        .clone()
        .new_transaction(
            lock_keys![],
            Options { skip_journal_checks: true, ..Default::default() },
        )
        .await?;
    root_store = root_parent
        .new_child_store(
            &mut transaction,
            NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
            Box::new(NullCache {}),
        )
        .await
        .context("new_child_store")?;
    self.objects.set_root_store(root_store.clone());

    allocator.create(&mut transaction).await?;

    // Superblock A: fixed object ID in the root store, extended to its first extent.
    super_block_a_handle = ObjectStore::create_object_with_id(
        &root_store,
        &mut transaction,
        ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::A.object_id()).unwrap()),
        HandleOptions::default(),
        None,
    )
    .context("create super block")?;
    // Record the fixed ID as used (presumably so later allocations do not reuse it).
    root_store.update_last_object_id(SuperBlockInstance::A.object_id());
    super_block_a_handle
        .extend(&mut transaction, SuperBlockInstance::A.first_extent())
        .await
        .context("extend super block")?;
    // Superblock B: same treatment as A.
    super_block_b_handle = ObjectStore::create_object_with_id(
        &root_store,
        &mut transaction,
        ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::B.object_id()).unwrap()),
        HandleOptions::default(),
        None,
    )
    .context("create super block")?;
    root_store.update_last_object_id(SuperBlockInstance::B.object_id());
    super_block_b_handle
        .extend(&mut transaction, SuperBlockInstance::B.first_extent())
        .await
        .context("extend super block")?;

    // The journal object lives in the root parent store.
    journal_handle = ObjectStore::create_object(
        &root_parent,
        &mut transaction,
        journal_handle_options(),
        None,
    )
    .await
    .context("create journal")?;
    if self.inner.lock().image_builder_mode.is_none() {
        // Outside image builder mode, preallocate the journal's first chunk. Any range left
        // over after `preallocate_range` means less than a full chunk was allocated.
        let mut file_range = 0..self.chunk_size();
        journal_handle
            .preallocate_range(&mut transaction, &mut file_range)
            .await
            .context("preallocate journal")?;
        if file_range.start < file_range.end {
            bail!("preallocate_range returned too little space");
        }
    }

    root_store.create(&mut transaction).await?;

    root_parent.set_graveyard_directory_object_id(
        Graveyard::create(&mut transaction, &root_parent).await?,
    );

    transaction.commit().await?;

    self.inner.lock().super_block_header = SuperBlockHeader::new(
        current_generation,
        root_parent.store_object_id(),
        root_parent.graveyard_directory_object_id(),
        root_store.store_object_id(),
        allocator.object_id(),
        journal_handle.object_id(),
        checkpoint,
        LATEST_VERSION,
    );

    // `set` fails if a handle was already installed; that result is deliberately ignored.
    let _ = self.handle.set(journal_handle);
    Ok(())
}
1361
1362 pub async fn allocate_journal(&self) -> Result<(), Error> {
1365 let handle = self.handle.get().unwrap();
1366 let filesystem = handle.store().filesystem();
1367 let mut transaction = filesystem
1368 .clone()
1369 .new_transaction(
1370 lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1371 Options { skip_journal_checks: true, ..Default::default() },
1372 )
1373 .await?;
1374 let mut file_range = 0..self.chunk_size();
1375 self.handle
1376 .get()
1377 .unwrap()
1378 .preallocate_range(&mut transaction, &mut file_range)
1379 .await
1380 .context("preallocate journal")?;
1381 if file_range.start < file_range.end {
1382 bail!("preallocate_range returned too little space");
1383 }
1384 transaction.commit().await?;
1385 Ok(())
1386 }
1387
1388 pub async fn init_superblocks(&self) -> Result<(), Error> {
1389 for _ in 0..2 {
1391 self.write_super_block().await?;
1392 }
1393 Ok(())
1394 }
1395
1396 pub async fn read_transactions_for_object(
1402 &self,
1403 object_id: u64,
1404 ) -> Result<Vec<JournaledTransaction>, Error> {
1405 let handle = self.handle.get().expect("No journal handle");
1406 let handle = ObjectStore::open_object(
1408 handle.owner(),
1409 handle.object_id(),
1410 journal_handle_options(),
1411 None,
1412 )
1413 .await?;
1414
1415 let checkpoint = match self.objects.journal_checkpoint(object_id) {
1416 Some(checkpoint) => checkpoint,
1417 None => return Ok(vec![]),
1418 };
1419 let mut reader = JournalReader::new(handle, &checkpoint);
1420 let end_offset = self.inner.lock().valid_to;
1423 Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1424 }
1425
1426 pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1428 if transaction.is_empty() {
1429 return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1430 }
1431
1432 self.pre_commit(transaction).await?;
1433 Ok(self.write_and_apply_mutations(transaction))
1434 }
1435
/// Prepares the journal before `transaction` is written.
///
/// Under the `inner` lock, this first emits any pending out-of-band records into the journal
/// writer:
/// - the latest version, when `output_reset_version` is set;
/// - a deferred `Discard` record;
/// - a `DidFlushDevice` record for the last device flush.
///
/// It returns early if the journal handle is not set yet, or if no maintenance is needed.
///
/// Otherwise it runs a separate maintenance transaction. That transaction shares
/// `transaction`'s guard, borrows metadata space from the metadata reservation, and is written
/// and applied immediately. It may:
/// - extend the journal file by `chunk_size()` bytes when the writer is within one chunk of
///   the current file size;
/// - zero the journal file over `0..zero_offset` when a zero offset is pending.
///
/// It also runs when `ObjectManager::needs_borrow_for_journal` reports it is needed.
async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
    let handle;

    let (size, zero_offset) = {
        let mut inner = self.inner.lock();

        // The output was reset: re-emit the version (presumably so readers know the format
        // of the records that follow).
        if std::mem::take(&mut inner.output_reset_version) {
            LATEST_VERSION.serialize_into(&mut inner.writer)?;
        }

        if let Some(discard_offset) = inner.discard_offset {
            JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
            inner.discard_offset = None;
        }

        // Record how far the device has been flushed (requested by `flush_device`).
        if inner.needs_did_flush_device {
            let offset = inner.device_flushed_offset;
            JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
            inner.needs_did_flush_device = false;
        }

        handle = match self.handle.get() {
            None => return Ok(()),
            Some(x) => x,
        };

        let file_offset = inner.writer.journal_file_checkpoint().file_offset;

        // `size` is `Some(current file size)` when the writer is within one chunk of the end
        // of the journal file, i.e. the file needs to grow.
        let size = handle.get_size();
        let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };

        if size.is_none()
            && inner.zero_offset.is_none()
            && !self.objects.needs_borrow_for_journal(file_offset)
        {
            return Ok(());
        }

        (size, inner.zero_offset)
    };

    let mut transaction = handle
        .new_transaction_with_options(Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(self.objects.metadata_reservation()),
            txn_guard: Some(transaction.txn_guard()),
            ..Default::default()
        })
        .await?;
    if let Some(size) = size {
        handle
            .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
            .await?;
    }
    if let Some(zero_offset) = zero_offset {
        handle.zero(&mut transaction, 0..zero_offset).await?;
    }

    // Written and applied directly rather than via `commit`, so `pre_commit` is not re-entered.
    self.write_and_apply_mutations(&mut transaction);

    let mut inner = self.inner.lock();

    // Sanity check: the writer must still be below the file size observed before the
    // extension above.
    if let Some(size) = size {
        assert!(inner.writer.journal_file_checkpoint().file_offset < size);
    }

    // Only clear the pending zero offset if it was not replaced (e.g. by `write_super_block`)
    // while this maintenance transaction ran.
    if inner.zero_offset == zero_offset {
        inner.zero_offset = None;
    }

    Ok(())
}
1516
1517 fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1521 let super_block_header = &self.inner.lock().super_block_header;
1522 let offset = super_block_header
1523 .journal_file_offsets
1524 .get(&object_id)
1525 .cloned()
1526 .unwrap_or(super_block_header.super_block_journal_file_offset);
1527 journal_file_checkpoint.file_offset >= offset
1528 }
1529
/// Writes a new superblock reflecting the current journal state.
///
/// Holding `sync_mutex` (and, briefly, `writer_mutex`), this:
/// - pads the journal to a block boundary;
/// - compacts the root parent store's layers;
/// - waits until the journal and the device are flushed up to the padded checkpoint.
///
/// It then saves a header with:
/// - the generation bumped (wrapping);
/// - the journal checkpoint set to the minimum checkpoint reported by the object manager
///   (or the padded checkpoint if there is none);
/// - the per-object journal file offsets;
/// - the borrowed metadata space.
///
/// Finally it installs the new header in memory and sets `zero_offset` to the block-aligned
/// checkpoint of the previous superblock, so a later `pre_commit` zeroes the journal below it.
async fn write_super_block(&self) -> Result<(), Error> {
    let root_parent_store = self.objects.root_parent_store();

    // Declared up front because they are assigned inside the locked scopes below.
    let old_layers;
    let old_super_block_offset;
    let mut new_super_block_header;
    let checkpoint;
    let borrowed;

    {
        let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
        {
            // Pad and compact atomically with respect to other journal writers.
            let _write_guard = self.writer_mutex.lock();
            (checkpoint, borrowed) = self.pad_to_block()?;
            old_layers = super_block::compact_root_parent(&*root_parent_store)?;
        }
        self.flush_device(checkpoint.file_offset)
            .await
            .context("flush failed when writing superblock")?;
    }

    new_super_block_header = self.inner.lock().super_block_header.clone();

    old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;

    let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();

    new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
    new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
    new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
    new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
    new_super_block_header.journal_file_offsets = journal_file_offsets;
    new_super_block_header.borrowed_metadata_space = borrowed;

    self.super_block_manager
        .save(
            new_super_block_header.clone(),
            self.objects.root_parent_store().filesystem(),
            old_layers,
        )
        .await?;
    {
        let mut inner = self.inner.lock();
        inner.super_block_header = new_super_block_header;
        // Journal space before the previous superblock's checkpoint can now be zeroed.
        inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
    }

    Ok(())
}
1585
1586 pub async fn sync(
1593 &self,
1594 options: SyncOptions<'_>,
1595 ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1596 let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1597
1598 let (checkpoint, borrowed) = {
1599 if let Some(precondition) = options.precondition {
1600 if !precondition() {
1601 return Ok(None);
1602 }
1603 }
1604
1605 let _guard = self.writer_mutex.lock();
1608
1609 self.pad_to_block()?
1610 };
1611
1612 if options.flush_device {
1613 self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1614 }
1615
1616 Ok(Some((checkpoint, borrowed)))
1617 }
1618
1619 fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1623 let mut inner = self.inner.lock();
1624 let checkpoint = inner.writer.journal_file_checkpoint();
1625 if checkpoint.file_offset % BLOCK_SIZE != 0 {
1626 JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1627 inner.writer.pad_to_block()?;
1628 if let Some(waker) = inner.flush_waker.take() {
1629 waker.wake();
1630 }
1631 }
1632 Ok((checkpoint, self.objects.borrowed_metadata_space()))
1633 }
1634
/// Waits until the journal has been written up to `checkpoint_offset`, then flushes the device
/// if it has not already been flushed that far.
///
/// After a device flush, this:
/// - advances `device_flushed_offset`;
/// - sets `needs_did_flush_device`, so the next `pre_commit` records a `DidFlushDevice` entry;
/// - notifies the allocator.
///
/// # Errors
/// Returns `FxfsError::JournalFlushError` if the journal is terminated before reaching
/// `checkpoint_offset`, with the termination reason as context when there is one. Also
/// propagates any error from the device flush.
///
/// # Panics
/// Panics in image builder mode, or if a device flush is needed and the journal handle is not
/// set.
async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
    assert!(
        self.inner.lock().image_builder_mode.is_none(),
        "flush_device called in image builder mode"
    );
    // Wait for `flush` to advance `flushed_offset` past `checkpoint_offset`; `flush` wakes
    // `sync_waker` each time it completes a write.
    debug_assert_not_too_long!(poll_fn(|ctx| {
        let mut inner = self.inner.lock();
        if inner.flushed_offset >= checkpoint_offset {
            Poll::Ready(Ok(()))
        } else if inner.terminate {
            let context = inner
                .terminate_reason
                .as_ref()
                .map(|e| format!("Journal closed with error: {:?}", e))
                .unwrap_or_else(|| "Journal closed".to_string());
            Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
        } else {
            inner.sync_waker = Some(ctx.waker().clone());
            Poll::Pending
        }
    }))?;

    // Skip the device flush if an earlier one already covered this offset.
    let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
    if needs_flush {
        let trace = self.trace.load(Ordering::Relaxed);
        if trace {
            info!("J: start flush device");
        }
        self.handle.get().unwrap().flush_device().await?;
        if trace {
            info!("J: end flush device");
        }

        {
            // Have the next `pre_commit` journal a `DidFlushDevice` for this offset.
            let mut inner = self.inner.lock();
            inner.device_flushed_offset = checkpoint_offset;
            inner.needs_did_flush_device = true;
        }

        self.objects.allocator().did_flush_device(checkpoint_offset);
        if trace {
            info!("J: did flush device");
        }
    }

    Ok(())
}
1691
1692 pub fn super_block_header(&self) -> SuperBlockHeader {
1694 self.inner.lock().super_block_header.clone()
1695 }
1696
/// Waits until the journal has room for more transactions.
///
/// Returns `Ok(())` once the bytes between the superblock's journal checkpoint and the end of
/// the last transaction drop below `reclaim_size`, or at once in image builder mode.
///
/// Otherwise it waits on `reclaim_event` and re-checks. That event is notified when a
/// compaction finishes, when a flush fails, or when the journal is terminated.
///
/// # Errors
/// Returns `FxfsError::JournalFlushError` if:
/// - the journal has been terminated;
/// - compactions are disabled while the journal is still too full.
pub async fn check_journal_space(&self) -> Result<(), Error> {
    loop {
        debug_assert_not_too_long!({
            let inner = self.inner.lock();
            if inner.terminate {
                let context = inner
                    .terminate_reason
                    .as_ref()
                    .map(|e| format!("Journal closed with error: {:?}", e))
                    .unwrap_or_else(|| "Journal closed".to_string());
                break Err(anyhow!(FxfsError::JournalFlushError).context(context));
            }
            if self.objects.last_end_offset()
                - inner.super_block_header.journal_checkpoint.file_offset
                < inner.reclaim_size
            {
                break Ok(());
            }
            if inner.image_builder_mode.is_some() {
                break Ok(());
            }
            // No compaction will free space, so waiting would never finish.
            if inner.disable_compactions {
                break Err(
                    anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
                );
            }
            // The listener is created before `inner` is released, presumably so that a
            // notification issued after the checks above cannot be missed.
            self.reclaim_event.listen()
        });
    }
}
1730
/// Number of bytes (`CHUNK_SIZE`) by which the journal file is preallocated and grown.
fn chunk_size(&self) -> u64 {
    CHUNK_SIZE
}
1734
/// Serializes `transaction` into the journal and applies it to the in-memory objects.
///
/// Returns the journal file offset at which the transaction's records begin.
///
/// While holding `writer_mutex`, this:
/// - writes every mutation via `ObjectManager::write_mutation`;
/// - applies the transaction;
/// - appends any extra mutation returned by `apply_transaction`, the transaction's data
///   checksums, and a final `Commit` record.
///
/// Afterwards, outside the mutex, it notifies the object manager and wakes the flush task.
///
/// # Panics
/// Panics if applying the transaction or writing a record fails. The expect message for
/// `apply_transaction` notes the filesystem would otherwise be left inconsistent.
fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
    let checkpoint_before;
    let checkpoint_after;
    {
        let _guard = self.writer_mutex.lock();
        checkpoint_before = {
            let mut inner = self.inner.lock();
            // Ask the next `flush` to issue a barrier on its first write.
            if transaction.includes_write() {
                inner.needs_barrier = true;
            }
            // Offset at which this transaction's records start.
            let checkpoint = inner.writer.journal_file_checkpoint();
            for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
                self.objects.write_mutation(
                    *object_id,
                    mutation,
                    Writer(*object_id, &mut inner.writer),
                );
            }
            checkpoint
        };
        let maybe_mutation =
            self.objects.apply_transaction(transaction, &checkpoint_before).expect(
                "apply_transaction should not fail in live mode; \
                 filesystem will be in an inconsistent state",
            );
        checkpoint_after = {
            let mut inner = self.inner.lock();
            // An extra mutation produced while applying is journaled under object ID 0.
            if let Some(mutation) = maybe_mutation {
                inner
                    .writer
                    .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
                    .unwrap();
            }
            for (device_range, checksums, first_write) in
                transaction.take_checksums().into_iter()
            {
                inner
                    .writer
                    .write_record(&JournalRecord::DataChecksums(
                        device_range,
                        Checksums::fletcher(checksums),
                        first_write,
                    ))
                    .unwrap();
            }
            // Terminate the transaction with a `Commit` record.
            inner.writer.write_record(&JournalRecord::Commit).unwrap();

            inner.writer.journal_file_checkpoint()
        };
    }
    self.objects.did_commit_transaction(
        transaction,
        &checkpoint_before,
        checkpoint_after.file_offset,
    );

    // New bytes are buffered; let the flush task write them out.
    if let Some(waker) = self.inner.lock().flush_waker.take() {
        waker.wake();
    }

    checkpoint_before.file_offset
}
1797
/// Background task that drives journal flushes and compactions until the journal terminates.
///
/// Each time it is woken (via `flush_waker`), it:
/// - starts a `flush` of all currently flushable bytes, if none is running, the handle is set
///   and no flush has failed;
/// - starts a `compact` when none is running, the journal is not terminating, compactions
///   are enabled, it is not in image builder mode, and more than half of `reclaim_size` is
///   outstanding;
/// - polls the in-flight futures.
///
/// A flush error terminates the journal and stops further flushes. A compaction error
/// terminates the journal. Both flush errors and compaction completion notify
/// `reclaim_event` waiters.
///
/// The task resolves once the journal is terminated and no flush or compaction is in flight.
pub async fn flush_task(self: Arc<Self>) {
    let mut flush_fut = None;
    let mut compact_fut = None;
    // Once a flush fails, no further flushes are attempted.
    let mut flush_error = false;
    poll_fn(|ctx| {
        loop {
            {
                let mut inner = self.inner.lock();
                if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
                    let flushable = inner.writer.flushable_bytes();
                    if flushable > 0 {
                        flush_fut = Some(Box::pin(self.flush(flushable)));
                    }
                }
                // Exit only once nothing is in flight.
                if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
                    return Poll::Ready(());
                }
                // Journal bytes between the superblock's checkpoint and the last transaction.
                let journal_bytes = self.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset;
                fxfs_trace::counter!("journal-bytes", 0, "total" => journal_bytes);
                if compact_fut.is_none()
                    && !inner.terminate
                    && !inner.disable_compactions
                    && inner.image_builder_mode.is_none()
                    && journal_bytes > inner.reclaim_size / 2
                {
                    compact_fut = Some(Box::pin(self.compact()));
                    inner.compaction_running = true;
                }
                // Re-register for wake-ups from commits and `pad_to_block`.
                inner.flush_waker = Some(ctx.waker().clone());
            }
            // Stays true only if neither future completed during this pass.
            let mut pending = true;
            if let Some(fut) = flush_fut.as_mut() {
                if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                    if let Err(e) = result {
                        self.inner.lock().terminate(Some(e.context("Flush error")));
                        self.reclaim_event.notify(usize::MAX);
                        flush_error = true;
                    }
                    flush_fut = None;
                    pending = false;
                }
            }
            if let Some(fut) = compact_fut.as_mut() {
                if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                    let mut inner = self.inner.lock();
                    if let Err(e) = result {
                        inner.terminate(Some(e.context("Compaction error")));
                    }
                    compact_fut = None;
                    inner.compaction_running = false;
                    // Wake `check_journal_space` and `stop_compactions` waiters.
                    self.reclaim_event.notify(usize::MAX);
                    pending = false;
                    fxfs_trace::counter!(
                        "journal-bytes",
                        0,
                        "total" => self.objects.last_end_offset()
                            - inner.super_block_header.journal_checkpoint.file_offset
                    );
                }
            }
            if pending {
                return Poll::Pending;
            }
            // Something completed; loop around to start any new work.
        }
    })
    .await;
}
1876
/// Returns a `CompactionYielder` through which compaction can cooperatively yield.
///
/// Outside Fuchsia the yielder never waits.
pub fn get_compaction_yielder(&self) -> CompactionYielder<'_> {
    CompactionYielder::new(self)
}
1881
1882 async fn flush(&self, amount: usize) -> Result<(), Error> {
1883 let handle = self.handle.get().unwrap();
1884 let mut buf = handle.allocate_buffer(amount).await;
1885 let (offset, len, barrier_on_first_write) = {
1886 let mut inner = self.inner.lock();
1887 let offset = inner.writer.take_flushable(buf.as_mut());
1888 let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1889 inner.needs_barrier = false;
1892 (offset, buf.len() as u64, barrier_on_first_write)
1893 };
1894 self.handle
1895 .get()
1896 .unwrap()
1897 .overwrite(
1898 offset,
1899 buf.as_mut(),
1900 OverwriteOptions { barrier_on_first_write, ..Default::default() },
1901 )
1902 .await?;
1903
1904 let mut inner = self.inner.lock();
1905 if let Some(waker) = inner.sync_waker.take() {
1906 waker.wake();
1907 }
1908 inner.flushed_offset = offset + len;
1909 inner.valid_to = inner.flushed_offset;
1910 Ok(())
1911 }
1912
1913 #[trace]
1914 async fn compact(&self) -> Result<(), Error> {
1915 assert!(
1916 self.inner.lock().image_builder_mode.is_none(),
1917 "compact called in image builder mode"
1918 );
1919 let bytes_before = self.objects.compaction_bytes_written();
1920 let _measure = crate::metrics::DurationMeasureScope::new(
1921 &crate::metrics::lsm_tree_metrics().journal_compaction_time,
1922 );
1923 crate::metrics::lsm_tree_metrics().journal_compactions_total.add(1);
1924 let trace = self.trace.load(Ordering::Relaxed);
1925 debug!("Compaction starting");
1926 if trace {
1927 info!("J: start compaction");
1928 }
1929 let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1930 self.inner.lock().super_block_header.earliest_version = earliest_version;
1931 self.write_super_block().await.context("Failed to write superblock")?;
1932 if trace {
1933 info!("J: end compaction");
1934 }
1935 debug!("Compaction finished");
1936 let bytes_after = self.objects.compaction_bytes_written();
1937 crate::metrics::lsm_tree_metrics()
1938 .journal_compaction_bytes_written
1939 .add(bytes_after.saturating_sub(bytes_before));
1940 Ok(())
1941 }
1942
1943 pub async fn force_compact(&self) -> Result<(), Error> {
1946 self.inner.lock().forced_compaction = true;
1947 scopeguard::defer! { self.inner.lock().forced_compaction = false; }
1948 self.compact().await
1949 }
1950
/// Disables further compactions and waits for any running compaction to finish.
///
/// Once disabled:
/// - `flush_task` no longer starts compactions;
/// - `check_journal_space` fails instead of waiting when the journal is full.
///
/// This function does not re-enable compactions.
pub async fn stop_compactions(&self) {
    loop {
        debug_assert_not_too_long!({
            let mut inner = self.inner.lock();
            inner.disable_compactions = true;
            if !inner.compaction_running {
                return;
            }
            // `flush_task` notifies `reclaim_event` after clearing `compaction_running`.
            self.reclaim_event.listen()
        });
    }
}
1963
1964 pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1967 let this = Arc::downgrade(self);
1968 parent.record_lazy_child(name, move || {
1969 let this_clone = this.clone();
1970 async move {
1971 let inspector = fuchsia_inspect::Inspector::default();
1972 if let Some(this) = this_clone.upgrade() {
1973 let (journal_min, journal_max, journal_reclaim_size) = {
1974 let inner = this.inner.lock();
1976 (
1977 round_down(
1978 inner.super_block_header.journal_checkpoint.file_offset,
1979 BLOCK_SIZE,
1980 ),
1981 inner.flushed_offset,
1982 inner.reclaim_size,
1983 )
1984 };
1985 let root = inspector.root();
1986 root.record_uint("journal_min_offset", journal_min);
1987 root.record_uint("journal_max_offset", journal_max);
1988 root.record_uint("journal_size", journal_max - journal_min);
1989 root.record_uint("journal_reclaim_size", journal_reclaim_size);
1990
1991 if let Some(x) = round_div(
1993 100 * (journal_max - journal_min),
1994 this.objects.allocator().get_disk_bytes(),
1995 ) {
1996 root.record_uint("journal_size_to_disk_size_percent", x);
1997 }
1998 }
1999 Ok(inspector)
2000 }
2001 .boxed()
2002 });
2003 }
2004
2005 pub fn terminate(&self) {
2007 self.inner.lock().terminate(None);
2008 self.reclaim_event.notify(usize::MAX);
2009 }
2010}
2011
2012pub struct Writer<'a>(u64, &'a mut JournalWriter);
2014
2015impl Writer<'_> {
2016 pub fn write(&mut self, mutation: Mutation) {
2017 self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
2018 }
2019}
2020
#[cfg(target_os = "fuchsia")]
mod yielder {
    use super::Journal;
    use crate::lsm_tree::Yielder;
    use fuchsia_async as fasync;

    /// Lets a running journal compaction step aside for other work on Fuchsia.
    ///
    /// Each `yield_now` waits on a lazily created `fasync::LowPriorityTask`, unless:
    /// - the compaction was forced, or
    /// - the journal is close to full.
    ///
    /// NOTE(review): presumably the wait lasts until the executor has been idle briefly,
    /// bounded by a maximum wait; see `wait_until_idle_for`.
    pub struct CompactionYielder<'a> {
        journal: &'a Journal,
        // Created on first use; dropped again once compaction becomes urgent.
        low_priority_task: Option<fasync::LowPriorityTask>,
    }

    impl<'a> CompactionYielder<'a> {
        /// Creates a yielder for `journal`; no low-priority task exists until the first yield.
        pub fn new(journal: &'a Journal) -> Self {
            CompactionYielder { low_priority_task: None, journal }
        }
    }

    impl Yielder for CompactionYielder<'_> {
        async fn yield_now(&mut self) {
            const IDLE_PERIOD: zx::MonotonicDuration = zx::MonotonicDuration::from_millis(4);
            const MAX_YIELD_DURATION: zx::MonotonicDuration =
                zx::MonotonicDuration::from_millis(16);

            let skip_yield = {
                let inner = self.journal.inner.lock();
                if inner.forced_compaction {
                    // Forced compactions never yield.
                    true
                } else {
                    let outstanding = self.journal.objects.last_end_offset()
                        - inner.super_block_header.journal_checkpoint.file_offset;
                    let half_reclaim_size = inner.reclaim_size / 2;
                    // Urgent once `outstanding` exceeds half of `reclaim_size` by at least a
                    // further quarter (roughly three quarters of `reclaim_size`).
                    let urgent = match outstanding.checked_sub(half_reclaim_size) {
                        Some(excess) => excess >= half_reclaim_size / 2,
                        None => false,
                    };
                    if urgent {
                        // Stop running at low priority.
                        self.low_priority_task = None;
                    }
                    urgent
                }
            };
            if skip_yield {
                return;
            }

            self.low_priority_task
                .get_or_insert_with(|| fasync::LowPriorityTask::new())
                .wait_until_idle_for(
                    IDLE_PERIOD,
                    fasync::MonotonicInstant::after(MAX_YIELD_DURATION),
                )
                .await;
        }
    }
}
2080
2081#[cfg(not(target_os = "fuchsia"))]
2082mod yielder {
2083 use super::Journal;
2084 use crate::lsm_tree::Yielder;
2085
2086 #[expect(dead_code)]
2087 pub struct CompactionYielder<'a>(&'a Journal);
2088
2089 impl<'a> CompactionYielder<'a> {
2090 pub fn new(journal: &'a Journal) -> Self {
2091 Self(journal)
2092 }
2093 }
2094
2095 impl Yielder for CompactionYielder<'_> {
2096 async fn yield_now(&mut self) {}
2097 }
2098}
2099
2100pub use yielder::*;
2101
2102#[cfg(test)]
2103mod tests {
2104 use super::SuperBlockInstance;
2105 use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
2106 use crate::fsck::fsck;
2107 use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
2108 use crate::object_store::directory::Directory;
2109 use crate::object_store::transaction::Options;
2110 use crate::object_store::volume::root_volume;
2111 use crate::object_store::{
2112 HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
2113 };
2114 #[cfg(target_os = "fuchsia")]
2115 use fuchsia_async::TestExecutor;
2116 use fuchsia_async::{self as fasync, MonotonicDuration};
2117 use storage_device::DeviceHolder;
2118 use storage_device::fake_device::FakeDevice;
2119
2120 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
2121
2122 #[fuchsia::test]
2123 async fn test_replay() {
2124 const TEST_DATA: &[u8] = b"hello";
2125
2126 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2127
2128 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2129
2130 let object_id = {
2131 let root_store = fs.root_store();
2132 let root_directory =
2133 Directory::open(&root_store, root_store.root_directory_object_id())
2134 .await
2135 .expect("open failed");
2136 let mut transaction = fs
2137 .clone()
2138 .new_transaction(
2139 lock_keys![LockKey::object(
2140 root_store.store_object_id(),
2141 root_store.root_directory_object_id(),
2142 )],
2143 Options::default(),
2144 )
2145 .await
2146 .expect("new_transaction failed");
2147 let handle = root_directory
2148 .create_child_file(&mut transaction, "test")
2149 .await
2150 .expect("create_child_file failed");
2151
2152 transaction.commit().await.expect("commit failed");
2153 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2154 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2155 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2156 fs.sync(SyncOptions::default()).await.expect("sync failed");
2159 handle.object_id()
2160 };
2161
2162 {
2163 fs.close().await.expect("Close failed");
2164 let device = fs.take_device().await;
2165 device.reopen(false);
2166 let fs = FxFilesystem::open(device).await.expect("open failed");
2167 let handle = ObjectStore::open_object(
2168 &fs.root_store(),
2169 object_id,
2170 HandleOptions::default(),
2171 None,
2172 )
2173 .await
2174 .expect("open_object failed");
2175 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2176 assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
2177 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2178 fsck(fs.clone()).await.expect("fsck failed");
2179 fs.close().await.expect("Close failed");
2180 }
2181 }
2182
2183 #[fuchsia::test]
2184 async fn test_reset() {
2185 const TEST_DATA: &[u8] = b"hello";
2186
2187 let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
2188
2189 let mut object_ids = Vec::new();
2190
2191 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2192 {
2193 let root_store = fs.root_store();
2194 let root_directory =
2195 Directory::open(&root_store, root_store.root_directory_object_id())
2196 .await
2197 .expect("open failed");
2198 let mut transaction = fs
2199 .clone()
2200 .new_transaction(
2201 lock_keys![LockKey::object(
2202 root_store.store_object_id(),
2203 root_store.root_directory_object_id(),
2204 )],
2205 Options::default(),
2206 )
2207 .await
2208 .expect("new_transaction failed");
2209 let handle = root_directory
2210 .create_child_file(&mut transaction, "test")
2211 .await
2212 .expect("create_child_file failed");
2213 transaction.commit().await.expect("commit failed");
2214 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2215 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2216 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2217 fs.sync(SyncOptions::default()).await.expect("sync failed");
2218 object_ids.push(handle.object_id());
2219
2220 for i in 0..1000 {
2223 let mut transaction = fs
2224 .clone()
2225 .new_transaction(
2226 lock_keys![LockKey::object(
2227 root_store.store_object_id(),
2228 root_store.root_directory_object_id(),
2229 )],
2230 Options::default(),
2231 )
2232 .await
2233 .expect("new_transaction failed");
2234 let handle = root_directory
2235 .create_child_file(&mut transaction, &format!("{}", i))
2236 .await
2237 .expect("create_child_file failed");
2238 transaction.commit().await.expect("commit failed");
2239 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2240 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2241 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2242 object_ids.push(handle.object_id());
2243 }
2244 }
2245 fs.close().await.expect("fs close failed");
2246 let device = fs.take_device().await;
2247 device.reopen(false);
2248 let fs = FxFilesystem::open(device).await.expect("open failed");
2249 fsck(fs.clone()).await.expect("fsck failed");
2250 {
2251 let root_store = fs.root_store();
2252 for &object_id in &object_ids[0..1] {
2254 let handle = ObjectStore::open_object(
2255 &root_store,
2256 object_id,
2257 HandleOptions::default(),
2258 None,
2259 )
2260 .await
2261 .expect("open_object failed");
2262 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2263 assert_eq!(
2264 handle.read(0, buf.as_mut()).await.expect("read failed"),
2265 TEST_DATA.len()
2266 );
2267 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2268 }
2269
2270 let root_directory =
2272 Directory::open(&root_store, root_store.root_directory_object_id())
2273 .await
2274 .expect("open failed");
2275 let mut transaction = fs
2276 .clone()
2277 .new_transaction(
2278 lock_keys![LockKey::object(
2279 root_store.store_object_id(),
2280 root_store.root_directory_object_id(),
2281 )],
2282 Options::default(),
2283 )
2284 .await
2285 .expect("new_transaction failed");
2286 let handle = root_directory
2287 .create_child_file(&mut transaction, "test2")
2288 .await
2289 .expect("create_child_file failed");
2290 transaction.commit().await.expect("commit failed");
2291 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2292 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2293 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2294 fs.sync(SyncOptions::default()).await.expect("sync failed");
2295 object_ids.push(handle.object_id());
2296 }
2297
2298 fs.close().await.expect("close failed");
2299 let device = fs.take_device().await;
2300 device.reopen(false);
2301 let fs = FxFilesystem::open(device).await.expect("open failed");
2302 {
2303 fsck(fs.clone()).await.expect("fsck failed");
2304
2305 for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
2307 let handle = ObjectStore::open_object(
2308 &fs.root_store(),
2309 object_id,
2310 HandleOptions::default(),
2311 None,
2312 )
2313 .await
2314 .unwrap_or_else(|e| {
2315 panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2316 });
2317 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2318 assert_eq!(
2319 handle.read(0, buf.as_mut()).await.expect("read failed"),
2320 TEST_DATA.len()
2321 );
2322 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2323 }
2324 }
2325 fs.close().await.expect("close failed");
2326 }
2327
2328 #[fuchsia::test]
2329 async fn test_discard() {
2330 let device = {
2331 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2332 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2333 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2334
2335 let store = root_volume
2336 .new_volume("test", NewChildStoreOptions::default())
2337 .await
2338 .expect("new_volume failed");
2339 let root_directory = Directory::open(&store, store.root_directory_object_id())
2340 .await
2341 .expect("open failed");
2342
2343 let mut i = 0;
2345 loop {
2346 let mut transaction = fs
2347 .clone()
2348 .new_transaction(
2349 lock_keys![LockKey::object(
2350 store.store_object_id(),
2351 store.root_directory_object_id()
2352 )],
2353 Options::default(),
2354 )
2355 .await
2356 .expect("new_transaction failed");
2357 root_directory
2358 .create_child_file(&mut transaction, &format!("a {i}"))
2359 .await
2360 .expect("create_child_file failed");
2361 if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2362 break;
2363 }
2364 i += 1;
2365 }
2366
2367 fs.journal().force_compact().await.expect("compact failed");
2369 fs.journal().stop_compactions().await;
2370
2371 let mut i = 0;
2373 loop {
2374 let mut transaction = fs
2375 .clone()
2376 .new_transaction(
2377 lock_keys![LockKey::object(
2378 store.store_object_id(),
2379 store.root_directory_object_id()
2380 )],
2381 Options::default(),
2382 )
2383 .await
2384 .expect("new_transaction failed");
2385 root_directory
2386 .create_child_file(&mut transaction, &format!("b {i}"))
2387 .await
2388 .expect("create_child_file failed");
2389 if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2390 break;
2391 }
2392 i += 1;
2393 }
2394
2395 fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2397 fs.device().snapshot().expect("snapshot failed")
2400 };
2401
2402 let fs = FxFilesystem::open(device).await.expect("open failed");
2403
2404 {
2405 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2406
2407 let store =
2408 root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2409
2410 let root_directory = Directory::open(&store, store.root_directory_object_id())
2411 .await
2412 .expect("open failed");
2413
2414 let mut transaction = fs
2416 .clone()
2417 .new_transaction(
2418 lock_keys![LockKey::object(
2419 store.store_object_id(),
2420 store.root_directory_object_id()
2421 )],
2422 Options::default(),
2423 )
2424 .await
2425 .expect("new_transaction failed");
2426 root_directory
2427 .create_child_file(&mut transaction, &format!("d"))
2428 .await
2429 .expect("create_child_file failed");
2430 transaction.commit().await.expect("commit failed");
2431 }
2432
2433 fs.close().await.expect("close failed");
2434 let device = fs.take_device().await;
2435 device.reopen(false);
2436
2437 let fs = FxFilesystem::open(device).await.expect("open failed");
2438 fsck(fs.clone()).await.expect("fsck failed");
2439 fs.close().await.expect("close failed");
2440 }
2441
2442 #[fuchsia::test]
2443 async fn test_use_existing_generation() {
2444 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2445
2446 let fs = FxFilesystemBuilder::new()
2448 .format(true)
2449 .image_builder_mode(Some(SuperBlockInstance::A))
2450 .open(device)
2451 .await
2452 .expect("open failed");
2453 fs.enable_allocations();
2454 let generation0 = fs.super_block_header().generation;
2455 assert_eq!(generation0, 1);
2456 fs.close().await.expect("close failed");
2457 let device = fs.take_device().await;
2458 device.reopen(false);
2459
2460 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2462 let generation1 = fs.super_block_header().generation;
2463 {
2464 let root_volume = crate::object_store::volume::root_volume(fs.clone())
2465 .await
2466 .expect("root_volume failed");
2467 root_volume
2468 .new_volume("test", crate::object_store::NewChildStoreOptions::default())
2469 .await
2470 .expect("new_volume failed");
2471 }
2472 fs.close().await.expect("close failed");
2473 let device = fs.take_device().await;
2474 device.reopen(false);
2475
2476 let fs = FxFilesystemBuilder::new()
2478 .format(true)
2479 .image_builder_mode(Some(SuperBlockInstance::A))
2480 .open(device)
2481 .await
2482 .expect("open failed");
2483 fs.enable_allocations();
2484 let generation2 = fs.super_block_header().generation;
2485 assert!(
2486 generation2 > generation1,
2487 "generation2 ({}) should be greater than generation1 ({})",
2488 generation2,
2489 generation1
2490 );
2491 fs.close().await.expect("close failed");
2492 }
2493
2494 #[fuchsia::test]
2495 async fn test_image_builder_mode_generation_bump_512_byte_block() {
2496 let device = DeviceHolder::new(FakeDevice::new(16384, 512));
2497
2498 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2500 let generation1 = fs.super_block_header().generation;
2501 fs.close().await.expect("close failed");
2502 let device = fs.take_device().await;
2503 device.reopen(false);
2504
2505 let fs = FxFilesystemBuilder::new()
2507 .format(true)
2508 .image_builder_mode(Some(SuperBlockInstance::A))
2509 .open(device)
2510 .await
2511 .expect("open failed");
2512
2513 fs.enable_allocations();
2514 let generation2 = fs.super_block_header().generation;
2515 assert!(
2516 generation2 > generation1,
2517 "Expected generation bump, got {} vs {}",
2518 generation2,
2519 generation1
2520 );
2521 fs.close().await.expect("close failed");
2522 }
2523
    /// Checks the following behaviour for a compaction started while the test task holds a
    /// `fasync::LowPriorityTask` guard:
    /// - it does not finish while a normal-priority task keeps waking up every millisecond;
    /// - it does finish a few milliseconds after that task stops.
    ///
    /// Runs on a fake-time executor, so every `advance_to` below moves the clock
    /// deterministically. NOTE(review): the mechanism that holds compaction back is not
    /// visible here — presumably low-priority compaction defers to normal-priority work.
    #[fuchsia::test]
    #[cfg(target_os = "fuchsia")]
    fn test_low_priority_compaction() {
        let mut executor = TestExecutor::new_with_fake_time();
        let mut fut = std::pin::pin!(async {
            use std::sync::Arc;
            use std::sync::atomic::{AtomicBool, Ordering};

            // Small reclaim size; presumably this makes compaction trigger after relatively
            // little journal traffic — TODO confirm against `JournalOptions`.
            let device = DeviceHolder::new(FakeDevice::new(16384, TEST_DEVICE_BLOCK_SIZE));
            let fs = FxFilesystemBuilder::new()
                .journal_options(super::JournalOptions {
                    reclaim_size: 65536,
                    ..Default::default()
                })
                .format(true)
                .open(device)
                .await
                .expect("open failed");

            // Kept alive for the rest of the test. Judging by the name, it marks this task as
            // low-priority — TODO confirm.
            let _low = fasync::LowPriorityTask::new();

            // Seed the journal with 100 file creations in the root store's root directory.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                for i in 0..100 {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("test{}", i))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");
                }
            }

            // A spawned task that wakes every 1ms until `stop` is set.
            let stop = Arc::new(AtomicBool::new(false));
            let stop_clone = stop.clone();
            let _normal_task = fasync::Task::spawn(async move {
                while !stop_clone.load(Ordering::Relaxed) {
                    fasync::Timer::new(fasync::MonotonicInstant::after(
                        MonotonicDuration::from_millis(1),
                    ))
                    .await;
                }
            });

            // Keep committing transactions, advancing fake time 1ms after each, until the
            // journal reports that a compaction is running.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                let mut i = 0;
                loop {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("trigger{i}"))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");

                    if fs.journal().inner.lock().compaction_running {
                        break;
                    }
                    TestExecutor::advance_to(fasync::MonotonicInstant::after(
                        MonotonicDuration::from_millis(1),
                    ))
                    .await;
                    i += 1;
                }
            }

            // While the spawned task is still ticking, compaction must remain unfinished for
            // each of the next 10ms.
            for _ in 0..10 {
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                assert!(fs.journal().inner.lock().compaction_running);
            }

            // Ask the spawned task to stop; it checks the flag on its next 1ms wake-up.
            stop.store(true, Ordering::Relaxed);
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;

            // Compaction is still running right after the spawned task exits...
            assert!(fs.journal().inner.lock().compaction_running);

            // ...and for a further 3ms. Presumably some idle period must elapse before the
            // low-priority work resumes — TODO confirm the exact period.
            for _ in 0..3 {
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(1),
                ))
                .await;
                assert!(fs.journal().inner.lock().compaction_running);
            }

            // Advance one more millisecond, then poll until the executor stalls; by then the
            // compaction must have finished.
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(1),
            ))
            .await;

            let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
            assert!(!fs.journal().inner.lock().compaction_running);

            fs.close().await.expect("Close failed");
        });
        // The whole test must complete within a single `run_until_stalled` call.
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }
2666
    /// Checks the following behaviour for a compaction started while the test task holds a
    /// `fasync::LowPriorityTask` guard, with a normal-priority task waking every 3ms:
    /// - it eventually finishes, within 1000 steps of 20ms of fake time;
    /// - it stays running for more than 200 of those steps (over 4 seconds of fake time).
    ///
    /// NOTE(review): presumably this exercises a deadline that forces throttled low-priority
    /// compaction to complete — TODO confirm against the compaction scheduling code.
    #[fuchsia::test]
    #[cfg(target_os = "fuchsia")]
    fn test_low_priority_compaction_deadline() {
        let mut executor = TestExecutor::new_with_fake_time();
        let mut fut = std::pin::pin!(async {
            use std::sync::Arc;
            use std::sync::atomic::{AtomicBool, Ordering};

            let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
            let fs = FxFilesystemBuilder::new()
                .journal_options(super::JournalOptions {
                    reclaim_size: 65536,
                    ..Default::default()
                })
                .format(true)
                .open(device)
                .await
                .expect("open failed");

            // Kept alive for the rest of the test. Judging by the name, it marks this task as
            // low-priority — TODO confirm.
            let _low = fasync::LowPriorityTask::new();

            // Seed the journal with 10 file creations in the root store's root directory.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                for i in 0..10 {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("test{}", i))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");
                }
            }

            // A spawned task that wakes every 3ms until `stop` is set.
            let stop = Arc::new(AtomicBool::new(false));
            let stop_clone = stop.clone();
            let _normal_task = fasync::Task::spawn(async move {
                while !stop_clone.load(Ordering::Relaxed) {
                    fasync::Timer::new(fasync::MonotonicInstant::after(
                        MonotonicDuration::from_millis(3),
                    ))
                    .await;
                }
            });

            // Keep committing transactions, advancing fake time 1ms after each, until the
            // journal reports that a compaction is running.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                let mut i = 0;
                loop {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("trigger{i}"))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");

                    if fs.journal().inner.lock().compaction_running {
                        break;
                    }
                    TestExecutor::advance_to(fasync::MonotonicInstant::after(
                        MonotonicDuration::from_millis(1),
                    ))
                    .await;
                    i += 1;
                }
            }

            // Advance in 20ms steps until the compaction finishes (at most 1000 steps).
            // `count` is the number of steps after which it was still running.
            let mut count = 0;
            for _ in 0..1000 {
                TestExecutor::advance_to(fasync::MonotonicInstant::after(
                    MonotonicDuration::from_millis(20),
                ))
                .await;
                if !fs.journal().inner.lock().compaction_running {
                    break;
                }
                count += 1;
            }
            // It must have finished within the loop...
            assert!(!fs.journal().inner.lock().compaction_running);

            // ...but only after being held back for more than 200 steps (> 4s of fake time).
            assert!(count > 200);

            stop.store(true, Ordering::Relaxed);
            fs.close().await.expect("Close failed");
        });
        // The whole test must complete within a single `run_until_stalled` call.
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }
2788
    /// Checks that when the journal is nearly full, a compaction run under a
    /// `fasync::LowPriorityTask` guard finishes promptly, even though a normal-priority task is
    /// still waking every 3ms. "Nearly full" means at least 7/8 of `reclaim_size` is
    /// outstanding since the checkpoint.
    ///
    /// NOTE(review): the final `!compaction_running` assertion would also pass if no
    /// compaction had been started at all. Consider additionally asserting that the journal
    /// checkpoint advanced.
    #[fuchsia::test]
    #[cfg(target_os = "fuchsia")]
    fn test_low_priority_compaction_no_yielding_when_full() {
        let mut executor = TestExecutor::new_with_fake_time();
        let mut fut = std::pin::pin!(async {
            use std::sync::Arc;
            use std::sync::atomic::{AtomicBool, Ordering};

            let reclaim_size = 65536;
            let device = DeviceHolder::new(FakeDevice::new(216384, TEST_DEVICE_BLOCK_SIZE));
            let fs = FxFilesystemBuilder::new()
                .journal_options(super::JournalOptions { reclaim_size, ..Default::default() })
                .format(true)
                .open(device)
                .await
                .expect("open failed");

            // Kept alive for the rest of the test. Judging by the name, it marks this task as
            // low-priority — TODO confirm.
            let _low = fasync::LowPriorityTask::new();

            // Seed the journal with 10 file creations in the root store's root directory.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                for i in 0..10 {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("test{}", i))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");
                }
            }

            // A spawned task that wakes every 3ms until `stop` is set.
            let stop = Arc::new(AtomicBool::new(false));
            let stop_clone = stop.clone();
            let _normal_task = fasync::Task::spawn(async move {
                while !stop_clone.load(Ordering::Relaxed) {
                    fasync::Timer::new(fasync::MonotonicInstant::after(
                        MonotonicDuration::from_millis(3),
                    ))
                    .await;
                }
            });

            // Commit transactions (without advancing fake time) until the outstanding journal
            // bytes reach 7/8 of `reclaim_size`.
            {
                let root_store = fs.root_store();
                let root_directory =
                    Directory::open(&root_store, root_store.root_directory_object_id())
                        .await
                        .expect("open failed");
                let mut i = 0;
                loop {
                    let mut transaction = fs
                        .clone()
                        .new_transaction(
                            lock_keys![LockKey::object(
                                root_store.store_object_id(),
                                root_store.root_directory_object_id(),
                            )],
                            Options::default(),
                        )
                        .await
                        .expect("new_transaction failed");
                    root_directory
                        .create_child_file(&mut transaction, &format!("trigger{i}"))
                        .await
                        .expect("create_child_file failed");
                    transaction.commit().await.expect("commit failed");

                    // `last_end_offset()` minus the checkpoint's file offset recorded in the
                    // in-memory superblock header. This is presumably the journal bytes not
                    // yet covered by a checkpoint.
                    let outstanding = {
                        let inner = fs.journal().inner.lock();
                        fs.journal().objects.last_end_offset()
                            - inner.super_block_header.journal_checkpoint.file_offset
                    };
                    if outstanding >= reclaim_size * 7 / 8 {
                        break;
                    }
                    i += 1;
                }
            }

            // Advance 4ms (longer than the spawned task's 3ms period, so it wakes at least
            // once), then poll until the executor stalls. Compaction must not still be running.
            TestExecutor::advance_to(fasync::MonotonicInstant::after(
                MonotonicDuration::from_millis(4),
            ))
            .await;

            let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
            assert!(!fs.journal().inner.lock().compaction_running);

            stop.store(true, Ordering::Relaxed);
            fs.close().await.expect("Close failed");
        });
        // The whole test must complete within a single `run_until_stalled` call.
        assert!(executor.run_until_stalled(&mut fut).is_ready());
    }
2901}
2902
#[cfg(fuzz)]
mod fuzz {
    use crate::filesystem::FxFilesystem;
    use fuchsia_async as fasync;
    use fuzz::fuzz;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    /// Shared driver for the journal fuzz targets. It performs these steps in order:
    /// 1. Formats a fresh filesystem on a 32768-block, 512-byte-block fake device, using a
    ///    4-thread executor.
    /// 2. Hands the live filesystem to `inject` so it can tamper with the journal writer.
    /// 3. Closes the filesystem.
    /// 4. Attempts to remount the device.
    ///
    /// A failed remount is tolerated and, if the remount succeeds, a failed close is ignored.
    fn inject_and_remount<F>(inject: F)
    where
        F: FnOnce(&FxFilesystem) + Send + 'static,
    {
        fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            inject(&fs);
            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            if let Ok(fs) = FxFilesystem::open(device).await {
                let _ = fs.close().await;
            }
        });
    }

    /// Appends raw fuzzer bytes directly to the journal writer before remounting.
    #[fuzz]
    fn fuzz_journal_bytes(input: Vec<u8>) {
        use std::io::Write;

        inject_and_remount(move |fs| {
            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
        });
    }

    /// Writes fuzzer-generated journal records, ignoring per-record write errors, before
    /// remounting.
    #[fuzz]
    fn fuzz_journal(input: Vec<super::JournalRecord>) {
        inject_and_remount(move |fs| {
            let mut inner = fs.journal().inner.lock();
            for record in &input {
                let _ = inner.writer.write_record(record);
            }
        });
    }
}