1mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::errors::FxfsError;
27use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
28use crate::log::*;
29use crate::lsm_tree::cache::NullCache;
30use crate::lsm_tree::types::Layer;
31use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
32use crate::object_store::allocator::Allocator;
33use crate::object_store::data_object_handle::OverwriteOptions;
34use crate::object_store::extent_record::{
35 DEFAULT_DATA_ATTRIBUTE_ID, ExtentKey, ExtentMode, ExtentValue,
36};
37use crate::object_store::graveyard::Graveyard;
38use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
39use crate::object_store::journal::checksum_list::ChecksumList;
40use crate::object_store::journal::reader::{JournalReader, ReadResult};
41use crate::object_store::journal::super_block::{
42 SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
43};
44use crate::object_store::journal::writer::JournalWriter;
45use crate::object_store::object_manager::ObjectManager;
46use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
47use crate::object_store::transaction::{
48 AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43, MutationV46,
49 MutationV47, MutationV49, MutationV50, ObjectStoreMutation, Options,
50 TRANSACTION_MAX_JOURNAL_USAGE, Transaction, TxnMutation, lock_keys,
51};
52use crate::object_store::{
53 AssocObj, DataObjectHandle, HandleOptions, HandleOwner, INVALID_OBJECT_ID, Item, ItemRef,
54 NewChildStoreOptions, ObjectStore, ReservedId,
55};
56use crate::range::RangeExt;
57use crate::round::{round_div, round_down};
58use crate::serialized_types::{LATEST_VERSION, Migrate, Version, Versioned, migrate_to_version};
59use anyhow::{Context, Error, anyhow, bail, ensure};
60use event_listener::Event;
61use fprint::TypeFingerprint;
62use fuchsia_sync::Mutex;
63use futures::FutureExt as _;
64use futures::future::poll_fn;
65use rustc_hash::FxHashMap as HashMap;
66use serde::{Deserialize, Serialize};
67use static_assertions::const_assert;
68use std::clone::Clone;
69use std::collections::HashSet;
70use std::num::NonZero;
71use std::ops::{Bound, Range};
72use std::sync::atomic::{AtomicBool, Ordering};
73use std::sync::{Arc, OnceLock};
74use std::task::{Poll, Waker};
75use storage_device::Device;
76
77pub const BLOCK_SIZE: u64 = 4096;
79
80const CHUNK_SIZE: u64 = 131_072;
82const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);
83
84pub const DEFAULT_RECLAIM_SIZE: u64 = 262_144;
86
87pub const RESERVED_SPACE: u64 = 1_048_576;
90
91const RESET_XOR: u64 = 0xffffffffffffffff;
96
97pub type JournalCheckpoint = JournalCheckpointV32;
101
102#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
103pub struct JournalCheckpointV32 {
104 pub file_offset: u64,
105
106 pub checksum: Checksum,
109
110 pub version: Version,
114}
115
116pub type JournalRecord = JournalRecordV50;
117
118#[allow(clippy::large_enum_variant)]
119#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
120#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
121pub enum JournalRecordV50 {
122 EndBlock,
124 Mutation { object_id: u64, mutation: MutationV50 },
127 Commit,
129 Discard(u64),
131 DidFlushDevice(u64),
141 DataChecksums(Range<u64>, ChecksumsV38, bool),
149}
150
151#[allow(clippy::large_enum_variant)]
152#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
153#[migrate_to_version(JournalRecordV50)]
154pub enum JournalRecordV49 {
155 EndBlock,
156 Mutation { object_id: u64, mutation: MutationV49 },
157 Commit,
158 Discard(u64),
159 DidFlushDevice(u64),
160 DataChecksums(Range<u64>, ChecksumsV38, bool),
161}
162
163#[allow(clippy::large_enum_variant)]
164#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
165#[migrate_to_version(JournalRecordV49)]
166pub enum JournalRecordV47 {
167 EndBlock,
168 Mutation { object_id: u64, mutation: MutationV47 },
169 Commit,
170 Discard(u64),
171 DidFlushDevice(u64),
172 DataChecksums(Range<u64>, ChecksumsV38, bool),
173}
174
175#[allow(clippy::large_enum_variant)]
176#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
177#[migrate_to_version(JournalRecordV47)]
178pub enum JournalRecordV46 {
179 EndBlock,
180 Mutation { object_id: u64, mutation: MutationV46 },
181 Commit,
182 Discard(u64),
183 DidFlushDevice(u64),
184 DataChecksums(Range<u64>, ChecksumsV38, bool),
185}
186
187#[allow(clippy::large_enum_variant)]
188#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
189#[migrate_to_version(JournalRecordV46)]
190pub enum JournalRecordV43 {
191 EndBlock,
192 Mutation { object_id: u64, mutation: MutationV43 },
193 Commit,
194 Discard(u64),
195 DidFlushDevice(u64),
196 DataChecksums(Range<u64>, ChecksumsV38, bool),
197}
198
199#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
200#[migrate_to_version(JournalRecordV43)]
201pub enum JournalRecordV42 {
202 EndBlock,
203 Mutation { object_id: u64, mutation: MutationV41 },
204 Commit,
205 Discard(u64),
206 DidFlushDevice(u64),
207 DataChecksums(Range<u64>, ChecksumsV38, bool),
208}
209
210#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
211pub enum JournalRecordV41 {
212 EndBlock,
213 Mutation { object_id: u64, mutation: MutationV41 },
214 Commit,
215 Discard(u64),
216 DidFlushDevice(u64),
217 DataChecksums(Range<u64>, ChecksumsV38),
218}
219
220impl From<JournalRecordV41> for JournalRecordV42 {
221 fn from(record: JournalRecordV41) -> Self {
222 match record {
223 JournalRecordV41::EndBlock => Self::EndBlock,
224 JournalRecordV41::Mutation { object_id, mutation } => {
225 Self::Mutation { object_id, mutation: mutation.into() }
226 }
227 JournalRecordV41::Commit => Self::Commit,
228 JournalRecordV41::Discard(offset) => Self::Discard(offset),
229 JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
230 JournalRecordV41::DataChecksums(range, sums) => {
231 Self::DataChecksums(range, sums, true)
234 }
235 }
236 }
237}
238
239#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
240#[migrate_to_version(JournalRecordV41)]
241pub enum JournalRecordV40 {
242 EndBlock,
243 Mutation { object_id: u64, mutation: MutationV40 },
244 Commit,
245 Discard(u64),
246 DidFlushDevice(u64),
247 DataChecksums(Range<u64>, ChecksumsV38),
248}
249
250pub(super) fn journal_handle_options() -> HandleOptions {
251 HandleOptions { skip_journal_checks: true, ..Default::default() }
252}
253
254pub struct Journal {
260 objects: Arc<ObjectManager>,
261 handle: OnceLock<DataObjectHandle<ObjectStore>>,
262 super_block_manager: SuperBlockManager,
263 inner: Mutex<Inner>,
264 writer_mutex: Mutex<()>,
265 sync_mutex: futures::lock::Mutex<()>,
266 trace: AtomicBool,
267
268 reclaim_event: Event,
270}
271
272struct Inner {
273 super_block_header: SuperBlockHeader,
274
275 zero_offset: Option<u64>,
277
278 device_flushed_offset: u64,
280
281 needs_did_flush_device: bool,
283
284 writer: JournalWriter,
286
287 output_reset_version: bool,
290
291 flush_waker: Option<Waker>,
293
294 terminate: bool,
296
297 terminate_reason: Option<Error>,
299
300 disable_compactions: bool,
302
303 compaction_running: bool,
305
306 sync_waker: Option<Waker>,
308
309 flushed_offset: u64,
311
312 valid_to: u64,
317
318 discard_offset: Option<u64>,
322
323 reclaim_size: u64,
327
328 image_builder_mode: Option<SuperBlockInstance>,
329
330 barriers_enabled: bool,
334
335 needs_barrier: bool,
338}
339
340impl Inner {
341 fn terminate(&mut self, reason: Option<Error>) {
342 self.terminate = true;
343
344 if let Some(err) = reason {
345 error!(error:? = err; "Terminating journal");
346 if let Some(prev_err) = self.terminate_reason.as_ref() {
348 error!(error:? = prev_err; "Journal previously terminated");
349 } else {
350 self.terminate_reason = Some(err);
351 }
352 }
353
354 if let Some(waker) = self.flush_waker.take() {
355 waker.wake();
356 }
357 if let Some(waker) = self.sync_waker.take() {
358 waker.wake();
359 }
360 }
361}
362
363pub struct JournalOptions {
364 pub reclaim_size: u64,
368
369 pub barriers_enabled: bool,
373}
374
375impl Default for JournalOptions {
376 fn default() -> Self {
377 JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE, barriers_enabled: false }
378 }
379}
380
381struct JournaledTransactions {
382 transactions: Vec<JournaledTransaction>,
383 device_flushed_offset: u64,
384}
385
386#[derive(Debug, Default)]
387pub struct JournaledTransaction {
388 pub checkpoint: JournalCheckpoint,
389 pub root_parent_mutations: Vec<Mutation>,
390 pub root_mutations: Vec<Mutation>,
391 pub non_root_mutations: Vec<(u64, Mutation)>,
393 pub end_offset: u64,
394 pub checksums: Vec<JournaledChecksums>,
395
396 pub end_flush: Option<(u64, u64)>,
400
401 pub volume_deleted: Option<u64>,
403}
404
405impl JournaledTransaction {
406 fn new(checkpoint: JournalCheckpoint) -> Self {
407 Self { checkpoint, ..Default::default() }
408 }
409}
410
411const VOLUME_DELETED: u64 = u64::MAX;
412
413#[derive(Debug)]
414pub struct JournaledChecksums {
415 pub device_range: Range<u64>,
416 pub checksums: Checksums,
417 pub first_write: bool,
418}
419
420pub trait JournalHandle: ReadObjectHandle {
423 fn end_offset(&self) -> Option<u64>;
427 fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
431 fn discard_extents(&mut self, discard_offset: u64);
433}
434
435impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
439 fn end_offset(&self) -> Option<u64> {
440 None
441 }
442 fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
443 }
445 fn discard_extents(&mut self, _discard_offset: u64) {
446 }
448}
449
450#[fxfs_trace::trace]
451impl Journal {
452 pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
453 let starting_checksum = rand::random_range(1..u64::MAX);
454 Journal {
455 objects: objects,
456 handle: OnceLock::new(),
457 super_block_manager: SuperBlockManager::new(),
458 inner: Mutex::new(Inner {
459 super_block_header: SuperBlockHeader::default(),
460 zero_offset: None,
461 device_flushed_offset: 0,
462 needs_did_flush_device: false,
463 writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
464 output_reset_version: false,
465 flush_waker: None,
466 terminate: false,
467 terminate_reason: None,
468 disable_compactions: false,
469 compaction_running: false,
470 sync_waker: None,
471 flushed_offset: 0,
472 valid_to: 0,
473 discard_offset: None,
474 reclaim_size: options.reclaim_size,
475 image_builder_mode: None,
476 barriers_enabled: options.barriers_enabled,
477 needs_barrier: false,
478 }),
479 writer_mutex: Mutex::new(()),
480 sync_mutex: futures::lock::Mutex::new(()),
481 trace: AtomicBool::new(false),
482 reclaim_event: Event::new(),
483 }
484 }
485
486 pub fn set_trace(&self, trace: bool) {
487 let old_value = self.trace.swap(trace, Ordering::Relaxed);
488 if trace != old_value {
489 info!(trace; "J: trace");
490 }
491 }
492
493 pub fn set_image_builder_mode(&self, mode: Option<SuperBlockInstance>) {
494 self.inner.lock().image_builder_mode = mode;
495 if let Some(instance) = mode {
496 *self.super_block_manager.next_instance.lock() = instance;
497 }
498 }
499
500 pub fn image_builder_mode(&self) -> Option<SuperBlockInstance> {
501 self.inner.lock().image_builder_mode
502 }
503
504 #[cfg(feature = "migration")]
505 pub fn set_filesystem_uuid(&self, uuid: &[u8; 16]) -> Result<(), Error> {
506 ensure!(
507 self.inner.lock().image_builder_mode.is_some(),
508 "Can only set filesystem uuid in image builder mode."
509 );
510 self.inner.lock().super_block_header.guid.0 = uuid::Uuid::from_bytes(*uuid);
511 Ok(())
512 }
513
514 pub(crate) async fn read_superblocks(
515 &self,
516 device: Arc<dyn Device>,
517 block_size: u64,
518 ) -> Result<(SuperBlockHeader, ObjectStore), Error> {
519 self.super_block_manager.load(device, block_size).await
520 }
521
522 fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
526 match mutation {
527 Mutation::ObjectStore(ObjectStoreMutation {
528 item:
529 Item {
530 key:
531 ObjectKey {
532 data:
533 ObjectKeyData::Attribute(
534 _,
535 AttributeKey::Extent(ExtentKey { range }),
536 ),
537 ..
538 },
539 value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
540 ..
541 },
542 ..
543 }) => {
544 if range.is_empty() || !range.is_aligned(block_size) {
545 return false;
546 }
547 let len = range.length().unwrap();
548 if let ExtentMode::Cow(checksums) = mode {
549 if checksums.len() > 0 {
550 if len % checksums.len() as u64 != 0 {
551 return false;
552 }
553 if (len / checksums.len() as u64) % block_size != 0 {
554 return false;
555 }
556 }
557 }
558 if *device_offset % block_size != 0
559 || *device_offset >= device_size
560 || device_size - *device_offset < len
561 {
562 return false;
563 }
564 }
565 Mutation::ObjectStore(_) => {}
566 Mutation::EncryptedObjectStore(_) => {}
567 Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
568 return !device_range.is_empty()
569 && *owner_object_id != INVALID_OBJECT_ID
570 && device_range.end <= device_size;
571 }
572 Mutation::Allocator(AllocatorMutation::Deallocate {
573 device_range,
574 owner_object_id,
575 }) => {
576 return !device_range.is_empty()
577 && *owner_object_id != INVALID_OBJECT_ID
578 && device_range.end <= device_size;
579 }
580 Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
581 return *owner_object_id != INVALID_OBJECT_ID;
582 }
583 Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
584 return *owner_object_id != INVALID_OBJECT_ID;
585 }
586 Mutation::BeginFlush => {}
587 Mutation::EndFlush => {}
588 Mutation::DeleteVolume => {}
589 Mutation::UpdateBorrowed(_) => {}
590 Mutation::UpdateMutationsKey(_) => {}
591 Mutation::CreateInternalDir(owner_object_id) => {
592 return *owner_object_id != INVALID_OBJECT_ID;
593 }
594 }
595 true
596 }
597
598 fn update_checksum_list(
600 &self,
601 journal_offset: u64,
602 mutation: &Mutation,
603 checksum_list: &mut ChecksumList,
604 ) -> Result<(), Error> {
605 match mutation {
606 Mutation::ObjectStore(_) => {}
607 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
608 checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
609 }
610 _ => {}
611 }
612 Ok(())
613 }
614
615 #[trace]
617 pub async fn replay(
618 &self,
619 filesystem: Arc<FxFilesystem>,
620 on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
621 ) -> Result<(), Error> {
622 let block_size = filesystem.block_size();
623
624 let (super_block, root_parent) =
625 self.super_block_manager.load(filesystem.device(), block_size).await?;
626
627 let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));
628
629 self.objects.set_root_parent_store(root_parent.clone());
630 let allocator =
631 Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
632 if let Some(on_new_allocator) = on_new_allocator {
633 on_new_allocator(allocator.clone());
634 }
635 self.objects.set_allocator(allocator.clone());
636 self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
637 self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
638 {
639 let mut inner = self.inner.lock();
640 inner.super_block_header = super_block.clone();
641 }
642
643 let device = filesystem.device();
644
645 let mut handle;
646 {
647 let root_parent_layer = root_parent.tree().mutable_layer();
648 let mut iter = root_parent_layer
649 .seek(Bound::Included(&ObjectKey::attribute(
650 super_block.journal_object_id,
651 DEFAULT_DATA_ATTRIBUTE_ID,
652 AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
653 super_block.journal_checkpoint.file_offset,
654 BLOCK_SIZE,
655 ))),
656 )))
657 .await
658 .context("Failed to seek root parent store")?;
659 let start_offset = if let Some(ItemRef {
660 key:
661 ObjectKey {
662 data:
663 ObjectKeyData::Attribute(
664 DEFAULT_DATA_ATTRIBUTE_ID,
665 AttributeKey::Extent(ExtentKey { range }),
666 ),
667 ..
668 },
669 ..
670 }) = iter.get()
671 {
672 range.start
673 } else {
674 0
675 };
676 handle = BootstrapObjectHandle::new_with_start_offset(
677 super_block.journal_object_id,
678 device.clone(),
679 start_offset,
680 );
681 while let Some(item) = iter.get() {
682 if !match item.into() {
683 Some((
684 object_id,
685 DEFAULT_DATA_ATTRIBUTE_ID,
686 ExtentKey { range },
687 ExtentValue::Some { device_offset, .. },
688 )) if object_id == super_block.journal_object_id => {
689 if let Some(end_offset) = handle.end_offset() {
690 if range.start != end_offset {
691 bail!(anyhow!(FxfsError::Inconsistent).context(format!(
692 "Unexpected journal extent {:?}, expected start: {}",
693 item, end_offset
694 )));
695 }
696 }
697 handle.push_extent(
698 0, *device_offset
700 ..*device_offset + range.length().context("Invalid extent")?,
701 );
702 true
703 }
704 _ => false,
705 } {
706 break;
707 }
708 iter.advance().await.context("Failed to advance root parent store iterator")?;
709 }
710 }
711
712 let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
713 let JournaledTransactions { mut transactions, device_flushed_offset } = self
714 .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
715 .await
716 .context("Reading transactions for replay")?;
717
718 let mut checksum_list = ChecksumList::new(device_flushed_offset);
720 let mut valid_to = reader.journal_file_checkpoint().file_offset;
721 let device_size = device.size();
722 'bad_replay: for JournaledTransaction {
723 checkpoint,
724 root_parent_mutations,
725 root_mutations,
726 non_root_mutations,
727 checksums,
728 ..
729 } in &transactions
730 {
731 for JournaledChecksums { device_range, checksums, first_write } in checksums {
732 checksum_list
733 .push(
734 checkpoint.file_offset,
735 device_range.clone(),
736 checksums.maybe_as_ref().context("Malformed checksums")?,
737 *first_write,
738 )
739 .context("Pushing journal checksum records to checksum list")?;
740 }
741 for mutation in root_parent_mutations
742 .iter()
743 .chain(root_mutations)
744 .chain(non_root_mutations.iter().map(|(_, m)| m))
745 {
746 if !self.validate_mutation(mutation, block_size, device_size) {
747 info!(mutation:?; "Stopping replay at bad mutation");
748 valid_to = checkpoint.file_offset;
749 break 'bad_replay;
750 }
751 self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
752 }
753 }
754
755 let valid_to = checksum_list
758 .verify(device.as_ref(), valid_to)
759 .await
760 .context("Failed to validate checksums")?;
761
762 let mut last_checkpoint = reader.journal_file_checkpoint();
765 let mut journal_offsets = super_block.journal_file_offsets.clone();
766
767 for (
770 index,
771 JournaledTransaction {
772 checkpoint,
773 root_parent_mutations,
774 end_flush,
775 volume_deleted,
776 ..
777 },
778 ) in transactions.iter_mut().enumerate()
779 {
780 if checkpoint.file_offset >= valid_to {
781 last_checkpoint = checkpoint.clone();
782
783 transactions.truncate(index);
785 break;
786 }
787
788 let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
789 for mutation in root_parent_mutations.drain(..) {
790 self.objects
791 .apply_mutation(
792 super_block.root_parent_store_object_id,
793 mutation,
794 &context,
795 AssocObj::None,
796 )
797 .context("Failed to replay root parent store mutations")?;
798 }
799
800 if let Some((object_id, journal_offset)) = end_flush {
801 journal_offsets.insert(*object_id, *journal_offset);
802 }
803
804 if let Some(object_id) = volume_deleted {
805 journal_offsets.insert(*object_id, VOLUME_DELETED);
806 }
807 }
808
809 let root_store = ObjectStore::open(
811 &root_parent,
812 super_block.root_store_object_id,
813 Box::new(NullCache {}),
814 )
815 .await
816 .context("Unable to open root store")?;
817
818 ensure!(
819 !root_store.is_encrypted(),
820 anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
821 );
822 self.objects.set_root_store(root_store);
823
824 let root_store_offset =
825 journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);
826
827 for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
829 if checkpoint.file_offset < root_store_offset {
830 continue;
831 }
832
833 let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
834 for mutation in root_mutations.drain(..) {
835 self.objects
836 .apply_mutation(
837 super_block.root_store_object_id,
838 mutation,
839 &context,
840 AssocObj::None,
841 )
842 .context("Failed to replay root store mutations")?;
843 }
844 }
845
846 allocator.open().await.context("Failed to open allocator")?;
848
849 for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
851 {
852 self.objects
853 .replay_mutations(
854 non_root_mutations,
855 &journal_offsets,
856 &ApplyContext { mode: ApplyMode::Replay, checkpoint },
857 end_offset,
858 )
859 .await
860 .context("Failed to replay mutations")?;
861 }
862
863 allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;
864
865 let discarded_to =
866 if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
867 Some(reader.journal_file_checkpoint().file_offset)
868 } else {
869 None
870 };
871
872 {
874 if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
875 return Err(anyhow!(FxfsError::Inconsistent).context(format!(
876 "journal replay cut short; journal finishes at {}, but super-block was \
877 written at {}",
878 last_checkpoint.file_offset, super_block.super_block_journal_file_offset
879 )));
880 }
881 let handle = ObjectStore::open_object(
882 &root_parent,
883 super_block.journal_object_id,
884 journal_handle_options(),
885 None,
886 )
887 .await
888 .with_context(|| {
889 format!(
890 "Failed to open journal file (object id: {})",
891 super_block.journal_object_id
892 )
893 })?;
894 let _ = self.handle.set(handle);
895 let mut inner = self.inner.lock();
896 reader.skip_to_end_of_block();
897 let mut writer_checkpoint = reader.journal_file_checkpoint();
898
899 std::mem::drop(reader);
901
902 writer_checkpoint.checksum ^= RESET_XOR;
904 writer_checkpoint.version = LATEST_VERSION;
905 inner.flushed_offset = writer_checkpoint.file_offset;
906
907 inner.device_flushed_offset = inner.flushed_offset;
909
910 inner.writer.seek(writer_checkpoint);
911 inner.output_reset_version = true;
912 inner.valid_to = last_checkpoint.file_offset;
913 if last_checkpoint.file_offset < inner.flushed_offset {
914 inner.discard_offset = Some(last_checkpoint.file_offset);
915 }
916 }
917
918 self.objects
919 .on_replay_complete()
920 .await
921 .context("Failed to complete replay for object manager")?;
922
923 info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
924 Ok(())
925 }
926
927 async fn read_transactions(
928 &self,
929 reader: &mut JournalReader,
930 end_offset: Option<u64>,
931 object_id_filter: u64,
932 ) -> Result<JournaledTransactions, Error> {
933 let mut transactions = Vec::new();
934 let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
935 let super_block = &self.inner.lock().super_block_header;
936 (
937 super_block.super_block_journal_file_offset,
938 super_block.root_parent_store_object_id,
939 super_block.root_store_object_id,
940 )
941 };
942 let mut current_transaction = None;
943 let mut begin_flush_offsets = HashMap::default();
944 let mut stores_deleted = HashSet::new();
945 loop {
946 let checkpoint = reader.journal_file_checkpoint();
948 if let Some(end_offset) = end_offset {
949 if checkpoint.file_offset >= end_offset {
950 break;
951 }
952 }
953 let result =
954 reader.deserialize().await.context("Failed to deserialize journal record")?;
955 match result {
956 ReadResult::Reset(_) => {
957 if current_transaction.is_some() {
958 current_transaction = None;
959 transactions.pop();
960 }
961 let offset = reader.journal_file_checkpoint().file_offset;
962 if offset > device_flushed_offset {
963 device_flushed_offset = offset;
964 }
965 }
966 ReadResult::Some(record) => {
967 match record {
968 JournalRecord::EndBlock => {
969 reader.skip_to_end_of_block();
970 }
971 JournalRecord::Mutation { object_id, mutation } => {
972 let current_transaction = match current_transaction.as_mut() {
973 None => {
974 transactions.push(JournaledTransaction::new(checkpoint));
975 current_transaction = transactions.last_mut();
976 current_transaction.as_mut().unwrap()
977 }
978 Some(transaction) => transaction,
979 };
980
981 if stores_deleted.contains(&object_id) {
982 bail!(
983 anyhow!(FxfsError::Inconsistent)
984 .context("Encountered mutations for deleted store")
985 );
986 }
987
988 match &mutation {
989 Mutation::BeginFlush => {
990 begin_flush_offsets.insert(
991 object_id,
992 current_transaction.checkpoint.file_offset,
993 );
994 }
995 Mutation::EndFlush => {
996 if let Some(offset) = begin_flush_offsets.remove(&object_id) {
997 if let Some(deleted_volume) =
998 ¤t_transaction.volume_deleted
999 {
1000 if *deleted_volume == object_id {
1001 bail!(anyhow!(FxfsError::Inconsistent).context(
1002 "Multiple EndFlush/DeleteVolume mutations in a \
1003 single transaction for the same object"
1004 ));
1005 }
1006 }
1007 if current_transaction
1010 .end_flush
1011 .replace((object_id, offset + 1))
1012 .is_some()
1013 {
1014 bail!(anyhow!(FxfsError::Inconsistent).context(
1015 "Multiple EndFlush mutations in a \
1016 single transaction"
1017 ));
1018 }
1019 }
1020 }
1021 Mutation::DeleteVolume => {
1022 if let Some((flushed_object, _)) =
1023 ¤t_transaction.end_flush
1024 {
1025 if *flushed_object == object_id {
1026 bail!(anyhow!(FxfsError::Inconsistent).context(
1027 "Multiple EndFlush/DeleteVolume mutations in a \
1028 single transaction for the same object"
1029 ));
1030 }
1031 }
1032 if current_transaction
1033 .volume_deleted
1034 .replace(object_id)
1035 .is_some()
1036 {
1037 bail!(anyhow!(FxfsError::Inconsistent).context(
1038 "Multiple DeleteVolume mutations in a single \
1039 transaction"
1040 ));
1041 }
1042 stores_deleted.insert(object_id);
1043 }
1044 _ => {}
1045 }
1046
1047 if (object_id_filter == INVALID_OBJECT_ID
1050 || object_id_filter == object_id)
1051 && self.should_apply(object_id, ¤t_transaction.checkpoint)
1052 {
1053 if object_id == root_parent_store_object_id {
1054 current_transaction.root_parent_mutations.push(mutation);
1055 } else if object_id == root_store_object_id {
1056 current_transaction.root_mutations.push(mutation);
1057 } else {
1058 current_transaction
1059 .non_root_mutations
1060 .push((object_id, mutation));
1061 }
1062 }
1063 }
1064 JournalRecord::DataChecksums(device_range, checksums, first_write) => {
1065 let current_transaction = match current_transaction.as_mut() {
1066 None => {
1067 transactions.push(JournaledTransaction::new(checkpoint));
1068 current_transaction = transactions.last_mut();
1069 current_transaction.as_mut().unwrap()
1070 }
1071 Some(transaction) => transaction,
1072 };
1073 current_transaction.checksums.push(JournaledChecksums {
1074 device_range,
1075 checksums,
1076 first_write,
1077 });
1078 }
1079 JournalRecord::Commit => {
1080 if let Some(&mut JournaledTransaction {
1081 ref checkpoint,
1082 ref root_parent_mutations,
1083 ref mut end_offset,
1084 ..
1085 }) = current_transaction.take()
1086 {
1087 for mutation in root_parent_mutations {
1088 if let Mutation::ObjectStore(ObjectStoreMutation {
1092 item:
1093 Item {
1094 key:
1095 ObjectKey {
1096 object_id,
1097 data:
1098 ObjectKeyData::Attribute(
1099 DEFAULT_DATA_ATTRIBUTE_ID,
1100 AttributeKey::Extent(ExtentKey {
1101 range,
1102 }),
1103 ),
1104 ..
1105 },
1106 value:
1107 ObjectValue::Extent(ExtentValue::Some {
1108 device_offset,
1109 ..
1110 }),
1111 ..
1112 },
1113 ..
1114 }) = mutation
1115 {
1116 let handle = reader.handle();
1119 if *object_id != handle.object_id() {
1120 continue;
1121 }
1122 if let Some(end_offset) = handle.end_offset() {
1123 if range.start != end_offset {
1124 bail!(anyhow!(FxfsError::Inconsistent).context(
1125 format!(
1126 "Unexpected journal extent {:?} -> {}, \
1127 expected start: {}",
1128 range, device_offset, end_offset,
1129 )
1130 ));
1131 }
1132 }
1133 handle.push_extent(
1134 checkpoint.file_offset,
1135 *device_offset
1136 ..*device_offset
1137 + range.length().context("Invalid extent")?,
1138 );
1139 }
1140 }
1141 *end_offset = reader.journal_file_checkpoint().file_offset;
1142 }
1143 }
1144 JournalRecord::Discard(offset) => {
1145 if offset == 0 {
1146 bail!(
1147 anyhow!(FxfsError::Inconsistent)
1148 .context("Invalid offset for Discard")
1149 );
1150 }
1151 if let Some(transaction) = current_transaction.as_ref() {
1152 if transaction.checkpoint.file_offset < offset {
1153 continue;
1155 }
1156 }
1157 current_transaction = None;
1158 while let Some(transaction) = transactions.last() {
1159 if transaction.checkpoint.file_offset < offset {
1160 break;
1161 }
1162 transactions.pop();
1163 }
1164 reader.handle().discard_extents(offset);
1165 }
1166 JournalRecord::DidFlushDevice(offset) => {
1167 if offset > device_flushed_offset {
1168 device_flushed_offset = offset;
1169 }
1170 }
1171 }
1172 }
1173 ReadResult::ChecksumMismatch => break,
1175 }
1176 }
1177
1178 if current_transaction.is_some() {
1180 transactions.pop();
1181 }
1182
1183 Ok(JournaledTransactions { transactions, device_flushed_offset })
1184 }
1185
1186 pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
1189 const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
1195 const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
1196 const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;
1197
1198 info!(device_size = filesystem.device().size(); "Formatting");
1199
1200 let checkpoint = JournalCheckpoint {
1201 version: LATEST_VERSION,
1202 ..self.inner.lock().writer.journal_file_checkpoint()
1203 };
1204
1205 let mut current_generation = 1;
1206 if filesystem.options().image_builder_mode.is_some() {
1207 let block_size = filesystem.block_size();
1215 match self.read_superblocks(filesystem.device(), block_size).await {
1216 Ok((super_block, _)) => {
1217 log::info!(
1218 "Found existing superblock with generation {}. Bumping by 1.",
1219 super_block.generation
1220 );
1221 current_generation = super_block.generation.wrapping_add(1);
1222 }
1223 Err(_) => {
1224 }
1228 }
1229 }
1230
1231 let root_parent = ObjectStore::new_empty(
1232 None,
1233 INIT_ROOT_PARENT_STORE_OBJECT_ID,
1234 filesystem.clone(),
1235 Box::new(NullCache {}),
1236 );
1237 self.objects.set_root_parent_store(root_parent.clone());
1238
1239 let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
1240 self.objects.set_allocator(allocator.clone());
1241 self.objects.init_metadata_reservation()?;
1242
1243 let journal_handle;
1244 let super_block_a_handle;
1245 let super_block_b_handle;
1246 let root_store;
1247 let mut transaction = filesystem
1248 .clone()
1249 .new_transaction(
1250 lock_keys![],
1251 Options { skip_journal_checks: true, ..Default::default() },
1252 )
1253 .await?;
1254 root_store = root_parent
1255 .new_child_store(
1256 &mut transaction,
1257 NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
1258 Box::new(NullCache {}),
1259 )
1260 .await
1261 .context("new_child_store")?;
1262 self.objects.set_root_store(root_store.clone());
1263
1264 allocator.create(&mut transaction).await?;
1265
1266 super_block_a_handle = ObjectStore::create_object_with_id(
1268 &root_store,
1269 &mut transaction,
1270 ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::A.object_id()).unwrap()),
1271 HandleOptions::default(),
1272 None,
1273 )
1274 .context("create super block")?;
1275 root_store.update_last_object_id(SuperBlockInstance::A.object_id());
1276 super_block_a_handle
1277 .extend(&mut transaction, SuperBlockInstance::A.first_extent())
1278 .await
1279 .context("extend super block")?;
1280 super_block_b_handle = ObjectStore::create_object_with_id(
1281 &root_store,
1282 &mut transaction,
1283 ReservedId::new(&root_store, NonZero::new(SuperBlockInstance::B.object_id()).unwrap()),
1284 HandleOptions::default(),
1285 None,
1286 )
1287 .context("create super block")?;
1288 root_store.update_last_object_id(SuperBlockInstance::B.object_id());
1289 super_block_b_handle
1290 .extend(&mut transaction, SuperBlockInstance::B.first_extent())
1291 .await
1292 .context("extend super block")?;
1293
1294 journal_handle = ObjectStore::create_object(
1296 &root_parent,
1297 &mut transaction,
1298 journal_handle_options(),
1299 None,
1300 )
1301 .await
1302 .context("create journal")?;
1303 if self.inner.lock().image_builder_mode.is_none() {
1304 let mut file_range = 0..self.chunk_size();
1305 journal_handle
1306 .preallocate_range(&mut transaction, &mut file_range)
1307 .await
1308 .context("preallocate journal")?;
1309 if file_range.start < file_range.end {
1310 bail!("preallocate_range returned too little space");
1311 }
1312 }
1313
1314 root_store.create(&mut transaction).await?;
1316
1317 root_parent.set_graveyard_directory_object_id(
1319 Graveyard::create(&mut transaction, &root_parent).await?,
1320 );
1321
1322 transaction.commit().await?;
1323
1324 self.inner.lock().super_block_header = SuperBlockHeader::new(
1325 current_generation,
1326 root_parent.store_object_id(),
1327 root_parent.graveyard_directory_object_id(),
1328 root_store.store_object_id(),
1329 allocator.object_id(),
1330 journal_handle.object_id(),
1331 checkpoint,
1332 LATEST_VERSION,
1333 );
1334
1335 let _ = self.handle.set(journal_handle);
1337 Ok(())
1338 }
1339
1340 pub async fn allocate_journal(&self) -> Result<(), Error> {
1343 let handle = self.handle.get().unwrap();
1344 let filesystem = handle.store().filesystem();
1345 let mut transaction = filesystem
1346 .clone()
1347 .new_transaction(
1348 lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1349 Options { skip_journal_checks: true, ..Default::default() },
1350 )
1351 .await?;
1352 let mut file_range = 0..self.chunk_size();
1353 self.handle
1354 .get()
1355 .unwrap()
1356 .preallocate_range(&mut transaction, &mut file_range)
1357 .await
1358 .context("preallocate journal")?;
1359 if file_range.start < file_range.end {
1360 bail!("preallocate_range returned too little space");
1361 }
1362 transaction.commit().await?;
1363 Ok(())
1364 }
1365
1366 pub async fn init_superblocks(&self) -> Result<(), Error> {
1367 for _ in 0..2 {
1369 self.write_super_block().await?;
1370 }
1371 Ok(())
1372 }
1373
1374 pub async fn read_transactions_for_object(
1380 &self,
1381 object_id: u64,
1382 ) -> Result<Vec<JournaledTransaction>, Error> {
1383 let handle = self.handle.get().expect("No journal handle");
1384 let handle = ObjectStore::open_object(
1386 handle.owner(),
1387 handle.object_id(),
1388 journal_handle_options(),
1389 None,
1390 )
1391 .await?;
1392
1393 let checkpoint = match self.objects.journal_checkpoint(object_id) {
1394 Some(checkpoint) => checkpoint,
1395 None => return Ok(vec![]),
1396 };
1397 let mut reader = JournalReader::new(handle, &checkpoint);
1398 let end_offset = self.inner.lock().valid_to;
1401 Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
1402 }
1403
1404 pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
1406 if transaction.is_empty() {
1407 return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
1408 }
1409
1410 self.pre_commit(transaction).await?;
1411 Ok(self.write_and_apply_mutations(transaction))
1412 }
1413
1414 async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
1417 let handle;
1418
1419 let (size, zero_offset) = {
1420 let mut inner = self.inner.lock();
1421
1422 if std::mem::take(&mut inner.output_reset_version) {
1424 LATEST_VERSION.serialize_into(&mut inner.writer)?;
1425 }
1426
1427 if let Some(discard_offset) = inner.discard_offset {
1428 JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
1429 inner.discard_offset = None;
1430 }
1431
1432 if inner.needs_did_flush_device {
1433 let offset = inner.device_flushed_offset;
1434 JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
1435 inner.needs_did_flush_device = false;
1436 }
1437
1438 handle = match self.handle.get() {
1439 None => return Ok(()),
1440 Some(x) => x,
1441 };
1442
1443 let file_offset = inner.writer.journal_file_checkpoint().file_offset;
1444
1445 let size = handle.get_size();
1446 let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };
1447
1448 if size.is_none()
1449 && inner.zero_offset.is_none()
1450 && !self.objects.needs_borrow_for_journal(file_offset)
1451 {
1452 return Ok(());
1453 }
1454
1455 (size, inner.zero_offset)
1456 };
1457
1458 let mut transaction = handle
1459 .new_transaction_with_options(Options {
1460 skip_journal_checks: true,
1461 borrow_metadata_space: true,
1462 allocator_reservation: Some(self.objects.metadata_reservation()),
1463 txn_guard: Some(transaction.txn_guard()),
1464 ..Default::default()
1465 })
1466 .await?;
1467 if let Some(size) = size {
1468 handle
1469 .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
1470 .await?;
1471 }
1472 if let Some(zero_offset) = zero_offset {
1473 handle.zero(&mut transaction, 0..zero_offset).await?;
1474 }
1475
1476 self.write_and_apply_mutations(&mut transaction);
1479
1480 let mut inner = self.inner.lock();
1481
1482 if let Some(size) = size {
1485 assert!(inner.writer.journal_file_checkpoint().file_offset < size);
1486 }
1487
1488 if inner.zero_offset == zero_offset {
1489 inner.zero_offset = None;
1490 }
1491
1492 Ok(())
1493 }
1494
1495 fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1499 let super_block_header = &self.inner.lock().super_block_header;
1500 let offset = super_block_header
1501 .journal_file_offsets
1502 .get(&object_id)
1503 .cloned()
1504 .unwrap_or(super_block_header.super_block_journal_file_offset);
1505 journal_file_checkpoint.file_offset >= offset
1506 }
1507
1508 async fn write_super_block(&self) -> Result<(), Error> {
1511 let root_parent_store = self.objects.root_parent_store();
1512
1513 let old_layers;
1518 let old_super_block_offset;
1519 let mut new_super_block_header;
1520 let checkpoint;
1521 let borrowed;
1522
1523 {
1524 let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1525 {
1526 let _write_guard = self.writer_mutex.lock();
1527 (checkpoint, borrowed) = self.pad_to_block()?;
1528 old_layers = super_block::compact_root_parent(&*root_parent_store)?;
1529 }
1530 self.flush_device(checkpoint.file_offset)
1531 .await
1532 .context("flush failed when writing superblock")?;
1533 }
1534
1535 new_super_block_header = self.inner.lock().super_block_header.clone();
1536
1537 old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;
1538
1539 let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();
1540
1541 new_super_block_header.generation = new_super_block_header.generation.wrapping_add(1);
1542 new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
1543 new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
1544 new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
1545 new_super_block_header.journal_file_offsets = journal_file_offsets;
1546 new_super_block_header.borrowed_metadata_space = borrowed;
1547
1548 self.super_block_manager
1549 .save(
1550 new_super_block_header.clone(),
1551 self.objects.root_parent_store().filesystem(),
1552 old_layers,
1553 )
1554 .await?;
1555 {
1556 let mut inner = self.inner.lock();
1557 inner.super_block_header = new_super_block_header;
1558 inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
1559 }
1560
1561 Ok(())
1562 }
1563
1564 pub async fn sync(
1571 &self,
1572 options: SyncOptions<'_>,
1573 ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
1574 let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());
1575
1576 let (checkpoint, borrowed) = {
1577 if let Some(precondition) = options.precondition {
1578 if !precondition() {
1579 return Ok(None);
1580 }
1581 }
1582
1583 let _guard = self.writer_mutex.lock();
1586
1587 self.pad_to_block()?
1588 };
1589
1590 if options.flush_device {
1591 self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
1592 }
1593
1594 Ok(Some((checkpoint, borrowed)))
1595 }
1596
1597 fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
1601 let mut inner = self.inner.lock();
1602 let checkpoint = inner.writer.journal_file_checkpoint();
1603 if checkpoint.file_offset % BLOCK_SIZE != 0 {
1604 JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
1605 inner.writer.pad_to_block()?;
1606 if let Some(waker) = inner.flush_waker.take() {
1607 waker.wake();
1608 }
1609 }
1610 Ok((checkpoint, self.objects.borrowed_metadata_space()))
1611 }
1612
1613 async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
1614 assert!(
1615 self.inner.lock().image_builder_mode.is_none(),
1616 "flush_device called in image builder mode"
1617 );
1618 debug_assert_not_too_long!(poll_fn(|ctx| {
1619 let mut inner = self.inner.lock();
1620 if inner.flushed_offset >= checkpoint_offset {
1621 Poll::Ready(Ok(()))
1622 } else if inner.terminate {
1623 let context = inner
1624 .terminate_reason
1625 .as_ref()
1626 .map(|e| format!("Journal closed with error: {:?}", e))
1627 .unwrap_or_else(|| "Journal closed".to_string());
1628 Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
1629 } else {
1630 inner.sync_waker = Some(ctx.waker().clone());
1631 Poll::Pending
1632 }
1633 }))?;
1634
1635 let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
1636 if needs_flush {
1637 let trace = self.trace.load(Ordering::Relaxed);
1638 if trace {
1639 info!("J: start flush device");
1640 }
1641 self.handle.get().unwrap().flush_device().await?;
1642 if trace {
1643 info!("J: end flush device");
1644 }
1645
1646 {
1654 let mut inner = self.inner.lock();
1655 inner.device_flushed_offset = checkpoint_offset;
1656 inner.needs_did_flush_device = true;
1657 }
1658
1659 self.objects.allocator().did_flush_device(checkpoint_offset);
1662 if trace {
1663 info!("J: did flush device");
1664 }
1665 }
1666
1667 Ok(())
1668 }
1669
1670 pub fn super_block_header(&self) -> SuperBlockHeader {
1672 self.inner.lock().super_block_header.clone()
1673 }
1674
1675 pub async fn check_journal_space(&self) -> Result<(), Error> {
1677 loop {
1678 debug_assert_not_too_long!({
1679 let inner = self.inner.lock();
1680 if inner.terminate {
1681 let context = inner
1684 .terminate_reason
1685 .as_ref()
1686 .map(|e| format!("Journal closed with error: {:?}", e))
1687 .unwrap_or_else(|| "Journal closed".to_string());
1688 break Err(anyhow!(FxfsError::JournalFlushError).context(context));
1689 }
1690 if self.objects.last_end_offset()
1691 - inner.super_block_header.journal_checkpoint.file_offset
1692 < inner.reclaim_size
1693 {
1694 break Ok(());
1695 }
1696 if inner.image_builder_mode.is_some() {
1697 break Ok(());
1698 }
1699 if inner.disable_compactions {
1700 break Err(
1701 anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
1702 );
1703 }
1704 self.reclaim_event.listen()
1705 });
1706 }
1707 }
1708
1709 fn chunk_size(&self) -> u64 {
1710 CHUNK_SIZE
1711 }
1712
1713 fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
1714 let checkpoint_before;
1715 let checkpoint_after;
1716 {
1717 let _guard = self.writer_mutex.lock();
1718 checkpoint_before = {
1719 let mut inner = self.inner.lock();
1720 if transaction.includes_write() {
1721 inner.needs_barrier = true;
1722 }
1723 let checkpoint = inner.writer.journal_file_checkpoint();
1724 for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
1725 self.objects.write_mutation(
1726 *object_id,
1727 mutation,
1728 Writer(*object_id, &mut inner.writer),
1729 );
1730 }
1731 checkpoint
1732 };
1733 let maybe_mutation =
1734 self.objects.apply_transaction(transaction, &checkpoint_before).expect(
1735 "apply_transaction should not fail in live mode; \
1736 filesystem will be in an inconsistent state",
1737 );
1738 checkpoint_after = {
1739 let mut inner = self.inner.lock();
1740 if let Some(mutation) = maybe_mutation {
1741 inner
1742 .writer
1743 .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
1744 .unwrap();
1745 }
1746 for (device_range, checksums, first_write) in
1747 transaction.take_checksums().into_iter()
1748 {
1749 inner
1750 .writer
1751 .write_record(&JournalRecord::DataChecksums(
1752 device_range,
1753 Checksums::fletcher(checksums),
1754 first_write,
1755 ))
1756 .unwrap();
1757 }
1758 inner.writer.write_record(&JournalRecord::Commit).unwrap();
1759
1760 inner.writer.journal_file_checkpoint()
1761 };
1762 }
1763 self.objects.did_commit_transaction(
1764 transaction,
1765 &checkpoint_before,
1766 checkpoint_after.file_offset,
1767 );
1768
1769 if let Some(waker) = self.inner.lock().flush_waker.take() {
1770 waker.wake();
1771 }
1772
1773 checkpoint_before.file_offset
1774 }
1775
1776 pub async fn flush_task(self: Arc<Self>) {
1780 let mut flush_fut = None;
1781 let mut compact_fut = None;
1782 let mut flush_error = false;
1783 poll_fn(|ctx| {
1784 loop {
1785 {
1786 let mut inner = self.inner.lock();
1787 if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
1788 let flushable = inner.writer.flushable_bytes();
1789 if flushable > 0 {
1790 flush_fut = Some(Box::pin(self.flush(flushable)));
1791 }
1792 }
1793 if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
1794 return Poll::Ready(());
1795 }
1796 if compact_fut.is_none()
1800 && !inner.terminate
1801 && !inner.disable_compactions
1802 && inner.image_builder_mode.is_none()
1803 && self.objects.last_end_offset()
1804 - inner.super_block_header.journal_checkpoint.file_offset
1805 > inner.reclaim_size / 2
1806 {
1807 compact_fut = Some(Box::pin(self.compact()));
1808 inner.compaction_running = true;
1809 }
1810 inner.flush_waker = Some(ctx.waker().clone());
1811 }
1812 let mut pending = true;
1813 if let Some(fut) = flush_fut.as_mut() {
1814 if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1815 if let Err(e) = result {
1816 self.inner.lock().terminate(Some(e.context("Flush error")));
1817 self.reclaim_event.notify(usize::MAX);
1818 flush_error = true;
1819 }
1820 flush_fut = None;
1821 pending = false;
1822 }
1823 }
1824 if let Some(fut) = compact_fut.as_mut() {
1825 if let Poll::Ready(result) = fut.poll_unpin(ctx) {
1826 let mut inner = self.inner.lock();
1827 if let Err(e) = result {
1828 inner.terminate(Some(e.context("Compaction error")));
1829 }
1830 compact_fut = None;
1831 inner.compaction_running = false;
1832 self.reclaim_event.notify(usize::MAX);
1833 pending = false;
1834 }
1835 }
1836 if pending {
1837 return Poll::Pending;
1838 }
1839 }
1840 })
1841 .await;
1842 }
1843
1844 async fn flush(&self, amount: usize) -> Result<(), Error> {
1845 let handle = self.handle.get().unwrap();
1846 let mut buf = handle.allocate_buffer(amount).await;
1847 let (offset, len, barrier_on_first_write) = {
1848 let mut inner = self.inner.lock();
1849 let offset = inner.writer.take_flushable(buf.as_mut());
1850 let barrier_on_first_write = inner.needs_barrier && inner.barriers_enabled;
1851 inner.needs_barrier = false;
1854 (offset, buf.len() as u64, barrier_on_first_write)
1855 };
1856 self.handle
1857 .get()
1858 .unwrap()
1859 .overwrite(
1860 offset,
1861 buf.as_mut(),
1862 OverwriteOptions { barrier_on_first_write, ..Default::default() },
1863 )
1864 .await?;
1865
1866 let mut inner = self.inner.lock();
1867 if let Some(waker) = inner.sync_waker.take() {
1868 waker.wake();
1869 }
1870 inner.flushed_offset = offset + len;
1871 inner.valid_to = inner.flushed_offset;
1872 Ok(())
1873 }
1874
1875 #[trace]
1878 pub async fn compact(&self) -> Result<(), Error> {
1879 assert!(
1880 self.inner.lock().image_builder_mode.is_none(),
1881 "compact called in image builder mode"
1882 );
1883 let trace = self.trace.load(Ordering::Relaxed);
1884 debug!("Compaction starting");
1885 if trace {
1886 info!("J: start compaction");
1887 }
1888 let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
1889 self.inner.lock().super_block_header.earliest_version = earliest_version;
1890 self.write_super_block().await.context("Failed to write superblock")?;
1891 if trace {
1892 info!("J: end compaction");
1893 }
1894 debug!("Compaction finished");
1895 Ok(())
1896 }
1897
1898 pub async fn stop_compactions(&self) {
1899 loop {
1900 debug_assert_not_too_long!({
1901 let mut inner = self.inner.lock();
1902 inner.disable_compactions = true;
1903 if !inner.compaction_running {
1904 return;
1905 }
1906 self.reclaim_event.listen()
1907 });
1908 }
1909 }
1910
1911 pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1914 let this = Arc::downgrade(self);
1915 parent.record_lazy_child(name, move || {
1916 let this_clone = this.clone();
1917 async move {
1918 let inspector = fuchsia_inspect::Inspector::default();
1919 if let Some(this) = this_clone.upgrade() {
1920 let (journal_min, journal_max, journal_reclaim_size) = {
1921 let inner = this.inner.lock();
1923 (
1924 round_down(
1925 inner.super_block_header.journal_checkpoint.file_offset,
1926 BLOCK_SIZE,
1927 ),
1928 inner.flushed_offset,
1929 inner.reclaim_size,
1930 )
1931 };
1932 let root = inspector.root();
1933 root.record_uint("journal_min_offset", journal_min);
1934 root.record_uint("journal_max_offset", journal_max);
1935 root.record_uint("journal_size", journal_max - journal_min);
1936 root.record_uint("journal_reclaim_size", journal_reclaim_size);
1937
1938 if let Some(x) = round_div(
1940 100 * (journal_max - journal_min),
1941 this.objects.allocator().get_disk_bytes(),
1942 ) {
1943 root.record_uint("journal_size_to_disk_size_percent", x);
1944 }
1945 }
1946 Ok(inspector)
1947 }
1948 .boxed()
1949 });
1950 }
1951
1952 pub fn terminate(&self) {
1954 self.inner.lock().terminate(None);
1955 self.reclaim_event.notify(usize::MAX);
1956 }
1957}
1958
1959pub struct Writer<'a>(u64, &'a mut JournalWriter);
1961
1962impl Writer<'_> {
1963 pub fn write(&mut self, mutation: Mutation) {
1964 self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
1965 }
1966}
1967
1968#[cfg(test)]
1969mod tests {
1970 use crate::filesystem::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
1971 use crate::fsck::fsck;
1972 use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
1973 use crate::object_store::directory::Directory;
1974 use crate::object_store::journal::SuperBlockInstance;
1975 use crate::object_store::transaction::Options;
1976 use crate::object_store::volume::root_volume;
1977 use crate::object_store::{
1978 HandleOptions, LockKey, NewChildStoreOptions, ObjectStore, StoreOptions, lock_keys,
1979 };
1980 use fuchsia_async as fasync;
1981 use fuchsia_async::MonotonicDuration;
1982 use storage_device::DeviceHolder;
1983 use storage_device::fake_device::FakeDevice;
1984
1985 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1986
1987 #[fuchsia::test]
1988 async fn test_replay() {
1989 const TEST_DATA: &[u8] = b"hello";
1990
1991 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1992
1993 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1994
1995 let object_id = {
1996 let root_store = fs.root_store();
1997 let root_directory =
1998 Directory::open(&root_store, root_store.root_directory_object_id())
1999 .await
2000 .expect("open failed");
2001 let mut transaction = fs
2002 .clone()
2003 .new_transaction(
2004 lock_keys![LockKey::object(
2005 root_store.store_object_id(),
2006 root_store.root_directory_object_id(),
2007 )],
2008 Options::default(),
2009 )
2010 .await
2011 .expect("new_transaction failed");
2012 let handle = root_directory
2013 .create_child_file(&mut transaction, "test")
2014 .await
2015 .expect("create_child_file failed");
2016
2017 transaction.commit().await.expect("commit failed");
2018 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2019 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2020 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2021 fs.sync(SyncOptions::default()).await.expect("sync failed");
2024 handle.object_id()
2025 };
2026
2027 {
2028 fs.close().await.expect("Close failed");
2029 let device = fs.take_device().await;
2030 device.reopen(false);
2031 let fs = FxFilesystem::open(device).await.expect("open failed");
2032 let handle = ObjectStore::open_object(
2033 &fs.root_store(),
2034 object_id,
2035 HandleOptions::default(),
2036 None,
2037 )
2038 .await
2039 .expect("open_object failed");
2040 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2041 assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
2042 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2043 fsck(fs.clone()).await.expect("fsck failed");
2044 fs.close().await.expect("Close failed");
2045 }
2046 }
2047
2048 #[fuchsia::test]
2049 async fn test_reset() {
2050 const TEST_DATA: &[u8] = b"hello";
2051
2052 let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));
2053
2054 let mut object_ids = Vec::new();
2055
2056 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2057 {
2058 let root_store = fs.root_store();
2059 let root_directory =
2060 Directory::open(&root_store, root_store.root_directory_object_id())
2061 .await
2062 .expect("open failed");
2063 let mut transaction = fs
2064 .clone()
2065 .new_transaction(
2066 lock_keys![LockKey::object(
2067 root_store.store_object_id(),
2068 root_store.root_directory_object_id(),
2069 )],
2070 Options::default(),
2071 )
2072 .await
2073 .expect("new_transaction failed");
2074 let handle = root_directory
2075 .create_child_file(&mut transaction, "test")
2076 .await
2077 .expect("create_child_file failed");
2078 transaction.commit().await.expect("commit failed");
2079 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2080 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2081 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2082 fs.sync(SyncOptions::default()).await.expect("sync failed");
2083 object_ids.push(handle.object_id());
2084
2085 for i in 0..1000 {
2088 let mut transaction = fs
2089 .clone()
2090 .new_transaction(
2091 lock_keys![LockKey::object(
2092 root_store.store_object_id(),
2093 root_store.root_directory_object_id(),
2094 )],
2095 Options::default(),
2096 )
2097 .await
2098 .expect("new_transaction failed");
2099 let handle = root_directory
2100 .create_child_file(&mut transaction, &format!("{}", i))
2101 .await
2102 .expect("create_child_file failed");
2103 transaction.commit().await.expect("commit failed");
2104 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2105 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2106 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2107 object_ids.push(handle.object_id());
2108 }
2109 }
2110 fs.close().await.expect("fs close failed");
2111 let device = fs.take_device().await;
2112 device.reopen(false);
2113 let fs = FxFilesystem::open(device).await.expect("open failed");
2114 fsck(fs.clone()).await.expect("fsck failed");
2115 {
2116 let root_store = fs.root_store();
2117 for &object_id in &object_ids[0..1] {
2119 let handle = ObjectStore::open_object(
2120 &root_store,
2121 object_id,
2122 HandleOptions::default(),
2123 None,
2124 )
2125 .await
2126 .expect("open_object failed");
2127 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2128 assert_eq!(
2129 handle.read(0, buf.as_mut()).await.expect("read failed"),
2130 TEST_DATA.len()
2131 );
2132 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2133 }
2134
2135 let root_directory =
2137 Directory::open(&root_store, root_store.root_directory_object_id())
2138 .await
2139 .expect("open failed");
2140 let mut transaction = fs
2141 .clone()
2142 .new_transaction(
2143 lock_keys![LockKey::object(
2144 root_store.store_object_id(),
2145 root_store.root_directory_object_id(),
2146 )],
2147 Options::default(),
2148 )
2149 .await
2150 .expect("new_transaction failed");
2151 let handle = root_directory
2152 .create_child_file(&mut transaction, "test2")
2153 .await
2154 .expect("create_child_file failed");
2155 transaction.commit().await.expect("commit failed");
2156 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
2157 buf.as_mut_slice().copy_from_slice(TEST_DATA);
2158 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
2159 fs.sync(SyncOptions::default()).await.expect("sync failed");
2160 object_ids.push(handle.object_id());
2161 }
2162
2163 fs.close().await.expect("close failed");
2164 let device = fs.take_device().await;
2165 device.reopen(false);
2166 let fs = FxFilesystem::open(device).await.expect("open failed");
2167 {
2168 fsck(fs.clone()).await.expect("fsck failed");
2169
2170 for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
2172 let handle = ObjectStore::open_object(
2173 &fs.root_store(),
2174 object_id,
2175 HandleOptions::default(),
2176 None,
2177 )
2178 .await
2179 .unwrap_or_else(|e| {
2180 panic!("open_object failed (object_id: {}): {:?}", object_id, e)
2181 });
2182 let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
2183 assert_eq!(
2184 handle.read(0, buf.as_mut()).await.expect("read failed"),
2185 TEST_DATA.len()
2186 );
2187 assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
2188 }
2189 }
2190 fs.close().await.expect("close failed");
2191 }
2192
2193 #[fuchsia::test]
2194 async fn test_discard() {
2195 let device = {
2196 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2197 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2198 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2199
2200 let store = root_volume
2201 .new_volume("test", NewChildStoreOptions::default())
2202 .await
2203 .expect("new_volume failed");
2204 let root_directory = Directory::open(&store, store.root_directory_object_id())
2205 .await
2206 .expect("open failed");
2207
2208 let mut i = 0;
2210 loop {
2211 let mut transaction = fs
2212 .clone()
2213 .new_transaction(
2214 lock_keys![LockKey::object(
2215 store.store_object_id(),
2216 store.root_directory_object_id()
2217 )],
2218 Options::default(),
2219 )
2220 .await
2221 .expect("new_transaction failed");
2222 root_directory
2223 .create_child_file(&mut transaction, &format!("a {i}"))
2224 .await
2225 .expect("create_child_file failed");
2226 if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
2227 break;
2228 }
2229 i += 1;
2230 }
2231
2232 fs.journal().compact().await.expect("compact failed");
2234 fs.journal().stop_compactions().await;
2235
2236 let mut i = 0;
2238 loop {
2239 let mut transaction = fs
2240 .clone()
2241 .new_transaction(
2242 lock_keys![LockKey::object(
2243 store.store_object_id(),
2244 store.root_directory_object_id()
2245 )],
2246 Options::default(),
2247 )
2248 .await
2249 .expect("new_transaction failed");
2250 root_directory
2251 .create_child_file(&mut transaction, &format!("b {i}"))
2252 .await
2253 .expect("create_child_file failed");
2254 if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
2255 break;
2256 }
2257 i += 1;
2258 }
2259
2260 fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
2262 fs.device().snapshot().expect("snapshot failed")
2265 };
2266
2267 let fs = FxFilesystem::open(device).await.expect("open failed");
2268
2269 {
2270 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
2271
2272 let store =
2273 root_volume.volume("test", StoreOptions::default()).await.expect("volume failed");
2274
2275 let root_directory = Directory::open(&store, store.root_directory_object_id())
2276 .await
2277 .expect("open failed");
2278
2279 let mut transaction = fs
2281 .clone()
2282 .new_transaction(
2283 lock_keys![LockKey::object(
2284 store.store_object_id(),
2285 store.root_directory_object_id()
2286 )],
2287 Options::default(),
2288 )
2289 .await
2290 .expect("new_transaction failed");
2291 root_directory
2292 .create_child_file(&mut transaction, &format!("d"))
2293 .await
2294 .expect("create_child_file failed");
2295 transaction.commit().await.expect("commit failed");
2296 }
2297
2298 fs.close().await.expect("close failed");
2299 let device = fs.take_device().await;
2300 device.reopen(false);
2301
2302 let fs = FxFilesystem::open(device).await.expect("open failed");
2303 fsck(fs.clone()).await.expect("fsck failed");
2304 fs.close().await.expect("close failed");
2305 }
2306
2307 #[fuchsia::test]
2308 async fn test_use_existing_generation() {
2309 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
2310
2311 let fs = FxFilesystemBuilder::new()
2313 .format(true)
2314 .image_builder_mode(Some(SuperBlockInstance::A))
2315 .open(device)
2316 .await
2317 .expect("open failed");
2318 fs.enable_allocations();
2319 let generation0 = fs.super_block_header().generation;
2320 assert_eq!(generation0, 1);
2321 fs.close().await.expect("close failed");
2322 let device = fs.take_device().await;
2323 device.reopen(false);
2324
2325 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2327 let generation1 = fs.super_block_header().generation;
2328 {
2329 let root_volume = crate::object_store::volume::root_volume(fs.clone())
2330 .await
2331 .expect("root_volume failed");
2332 root_volume
2333 .new_volume("test", crate::object_store::NewChildStoreOptions::default())
2334 .await
2335 .expect("new_volume failed");
2336 }
2337 fs.close().await.expect("close failed");
2338 let device = fs.take_device().await;
2339 device.reopen(false);
2340
2341 let fs = FxFilesystemBuilder::new()
2343 .format(true)
2344 .image_builder_mode(Some(SuperBlockInstance::A))
2345 .open(device)
2346 .await
2347 .expect("open failed");
2348 fs.enable_allocations();
2349 let generation2 = fs.super_block_header().generation;
2350 assert!(
2351 generation2 > generation1,
2352 "generation2 ({}) should be greater than generation1 ({})",
2353 generation2,
2354 generation1
2355 );
2356 fs.close().await.expect("close failed");
2357 }
2358
2359 #[fuchsia::test]
2360 async fn test_image_builder_mode_generation_bump_512_byte_block() {
2361 let device = DeviceHolder::new(FakeDevice::new(16384, 512));
2362
2363 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2365 let generation1 = fs.super_block_header().generation;
2366 fs.close().await.expect("close failed");
2367 let device = fs.take_device().await;
2368 device.reopen(false);
2369
2370 let fs = FxFilesystemBuilder::new()
2372 .format(true)
2373 .image_builder_mode(Some(SuperBlockInstance::A))
2374 .open(device)
2375 .await
2376 .expect("open failed");
2377
2378 fs.enable_allocations();
2379 let generation2 = fs.super_block_header().generation;
2380 assert!(
2381 generation2 > generation1,
2382 "Expected generation bump, got {} vs {}",
2383 generation2,
2384 generation1
2385 );
2386 fs.close().await.expect("close failed");
2387 }
2388}
2389
2390#[cfg(fuzz)]
2391mod fuzz {
2392 use fuzz::fuzz;
2393
2394 #[fuzz]
2395 fn fuzz_journal_bytes(input: Vec<u8>) {
2396 use crate::filesystem::FxFilesystem;
2397 use fuchsia_async as fasync;
2398 use std::io::Write;
2399 use storage_device::DeviceHolder;
2400 use storage_device::fake_device::FakeDevice;
2401
2402 fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2403 let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2404 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2405 fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
2406 fs.close().await.expect("close failed");
2407 let device = fs.take_device().await;
2408 device.reopen(false);
2409 if let Ok(fs) = FxFilesystem::open(device).await {
2410 let _ = fs.close().await;
2413 }
2414 });
2415 }
2416
2417 #[fuzz]
2418 fn fuzz_journal(input: Vec<super::JournalRecord>) {
2419 use crate::filesystem::FxFilesystem;
2420 use fuchsia_async as fasync;
2421 use storage_device::DeviceHolder;
2422 use storage_device::fake_device::FakeDevice;
2423
2424 fasync::SendExecutorBuilder::new().num_threads(4).build().run(async move {
2425 let device = DeviceHolder::new(FakeDevice::new(32768, 512));
2426 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2427 {
2428 let mut inner = fs.journal().inner.lock();
2429 for record in &input {
2430 let _ = inner.writer.write_record(record);
2431 }
2432 }
2433 fs.close().await.expect("close failed");
2434 let device = fs.take_device().await;
2435 device.reopen(false);
2436 if let Ok(fs) = FxFilesystem::open(device).await {
2437 let _ = fs.close().await;
2440 }
2441 });
2442 }
2443}