1mod bootstrap_handle;
20mod checksum_list;
21mod reader;
22pub mod super_block;
23mod writer;
24
25use crate::checksum::{Checksum, Checksums, ChecksumsV38};
26use crate::debug_assert_not_too_long;
27use crate::errors::FxfsError;
28use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, SyncOptions};
29use crate::log::*;
30use crate::lsm_tree::cache::NullCache;
31use crate::lsm_tree::types::Layer;
32use crate::object_handle::{ObjectHandle as _, ReadObjectHandle};
33use crate::object_store::allocator::Allocator;
34use crate::object_store::extent_record::{
35 ExtentKey, ExtentMode, ExtentValue, DEFAULT_DATA_ATTRIBUTE_ID,
36};
37use crate::object_store::graveyard::Graveyard;
38use crate::object_store::journal::bootstrap_handle::BootstrapObjectHandle;
39use crate::object_store::journal::checksum_list::ChecksumList;
40use crate::object_store::journal::reader::{JournalReader, ReadResult};
41use crate::object_store::journal::super_block::{
42 SuperBlockHeader, SuperBlockInstance, SuperBlockManager,
43};
44use crate::object_store::journal::writer::JournalWriter;
45use crate::object_store::object_manager::ObjectManager;
46use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue};
47use crate::object_store::transaction::{
48 lock_keys, AllocatorMutation, LockKey, Mutation, MutationV40, MutationV41, MutationV43,
49 ObjectStoreMutation, Options, Transaction, TxnMutation, TRANSACTION_MAX_JOURNAL_USAGE,
50};
51use crate::object_store::{
52 AssocObj, DataObjectHandle, HandleOptions, HandleOwner, Item, ItemRef, NewChildStoreOptions,
53 ObjectStore, INVALID_OBJECT_ID,
54};
55use crate::range::RangeExt;
56use crate::round::{round_div, round_down};
57use crate::serialized_types::{migrate_to_version, Migrate, Version, Versioned, LATEST_VERSION};
58use anyhow::{anyhow, bail, ensure, Context, Error};
59use event_listener::Event;
60use fprint::TypeFingerprint;
61use fuchsia_sync::Mutex;
62use futures::future::poll_fn;
63use futures::FutureExt as _;
64use once_cell::sync::OnceCell;
65use rand::Rng;
66use rustc_hash::FxHashMap as HashMap;
67use serde::{Deserialize, Serialize};
68use static_assertions::const_assert;
69use std::clone::Clone;
70use std::collections::HashSet;
71use std::ops::{Bound, Range};
72use std::sync::atomic::{AtomicBool, Ordering};
73use std::sync::Arc;
74use std::task::{Poll, Waker};
75
/// The journal file is read, written and checksummed in blocks of this size.
pub const BLOCK_SIZE: u64 = 4096;

/// Amount by which the journal file is preallocated (extended) at a time.
const CHUNK_SIZE: u64 = 131_072;
// A freshly preallocated chunk must always be able to hold at least one maximal transaction.
const_assert!(CHUNK_SIZE > TRANSACTION_MAX_JOURNAL_USAGE);

/// Default for `JournalOptions::reclaim_size` — presumably the journal size at which space
/// reclamation is attempted (the trigger itself is not in this chunk — TODO confirm).
pub const DEFAULT_RECLAIM_SIZE: u64 = 262_144;

/// Reserved space constant (1 MiB); its consumer is not visible in this chunk — TODO confirm
/// where it is applied.
pub const RESERVED_SPACE: u64 = 1_048_576;

/// Mask XOR-ed into the writer checksum when the journal stream is reset after replay (see
/// `replay`), so blocks written before the reset cannot validate against the new stream.
const RESET_XOR: u64 = 0xffffffffffffffff;
95
/// Current on-disk checkpoint type.
pub type JournalCheckpoint = JournalCheckpointV32;

/// A point in the journal stream from which reading can start.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint)]
pub struct JournalCheckpointV32 {
    /// Byte offset within the journal file.
    pub file_offset: u64,

    /// Checksum in effect at `file_offset`, used to seed validation of subsequent blocks.
    pub checksum: Checksum,

    /// Serialization version of the records that follow this checkpoint.
    pub version: Version,
}
114
/// Current journal record format.
pub type JournalRecord = JournalRecordV43;

#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum JournalRecordV43 {
    /// Marks the end of a journal block; readers skip to the next block boundary.
    EndBlock,
    /// A mutation for the store identified by `object_id`.
    Mutation { object_id: u64, mutation: MutationV43 },
    /// Commits the mutations recorded since the last commit.
    Commit,
    /// Discards journaled transactions whose checkpoint is at or after the given journal
    /// file offset.
    Discard(u64),
    /// Records that the device has been flushed up to the given journal file offset.
    DidFlushDevice(u64),
    /// Checksums covering a data write to the given device range; the bool is `first_write`
    /// (see `JournaledChecksums`).
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
149
/// Previous (V42) journal record format; auto-migrated to [`JournalRecordV43`].
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV43)]
pub enum JournalRecordV42 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38, bool),
}
160
/// V41 journal record format.  `DataChecksums` lacked the `first_write` flag; the manual
/// `From` impl below supplies `true` when migrating.
#[derive(Serialize, Deserialize, TypeFingerprint, Versioned)]
pub enum JournalRecordV41 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV41 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
170
171impl From<JournalRecordV41> for JournalRecordV42 {
172 fn from(record: JournalRecordV41) -> Self {
173 match record {
174 JournalRecordV41::EndBlock => Self::EndBlock,
175 JournalRecordV41::Mutation { object_id, mutation } => {
176 Self::Mutation { object_id, mutation: mutation.into() }
177 }
178 JournalRecordV41::Commit => Self::Commit,
179 JournalRecordV41::Discard(offset) => Self::Discard(offset),
180 JournalRecordV41::DidFlushDevice(offset) => Self::DidFlushDevice(offset),
181 JournalRecordV41::DataChecksums(range, sums) => {
182 Self::DataChecksums(range, sums, true)
185 }
186 }
187 }
188}
189
/// V40 journal record format; auto-migrated to [`JournalRecordV41`].
#[derive(Migrate, Serialize, Deserialize, TypeFingerprint, Versioned)]
#[migrate_to_version(JournalRecordV41)]
pub enum JournalRecordV40 {
    EndBlock,
    Mutation { object_id: u64, mutation: MutationV40 },
    Commit,
    Discard(u64),
    DidFlushDevice(u64),
    DataChecksums(Range<u64>, ChecksumsV38),
}
200
201pub(super) fn journal_handle_options() -> HandleOptions {
202 HandleOptions { skip_journal_checks: true, ..Default::default() }
203}
204
/// The journal: records and replays filesystem mutations.
pub struct Journal {
    /// Object manager through which mutations are applied and replayed.
    objects: Arc<ObjectManager>,
    /// Handle to the journal file; set exactly once, by `init_empty` or `replay`.
    handle: OnceCell<DataObjectHandle<ObjectStore>>,
    /// Loads and writes the super-block instances.
    super_block_manager: SuperBlockManager,
    /// Mutable journal state; held only for short, non-await sections.
    inner: Mutex<Inner>,
    /// Serializes journal writers (usage not visible in this chunk — TODO confirm).
    writer_mutex: Mutex<()>,
    /// Serializes sync operations; async so it can be held across await points.
    sync_mutex: futures::lock::Mutex<()>,
    /// Whether trace logging is enabled (see `set_trace`).
    trace: AtomicBool,

    /// Event presumably notified when journal space is reclaimed (listeners not visible in
    /// this chunk — TODO confirm).
    reclaim_event: Event,
}
222
/// State protected by `Journal::inner`.
struct Inner {
    /// The most recently loaded or written super-block header.
    super_block_header: SuperBlockHeader,

    /// If set, the journal file should be zeroed from 0 up to this offset; consumed by
    /// `pre_commit`.
    zero_offset: Option<u64>,

    /// Journal offset up to which the device is known to have been flushed.
    device_flushed_offset: u64,

    /// Whether a `DidFlushDevice` record still needs to be written (done in `pre_commit`).
    needs_did_flush_device: bool,

    /// The buffered journal stream writer.
    writer: JournalWriter,

    /// Whether `LATEST_VERSION` must be emitted before the next record (set after the stream
    /// is reset in `replay`).
    output_reset_version: bool,

    /// Waker for a task waiting on flush progress; woken on termination.
    flush_waker: Option<Waker>,

    /// Set when the journal is shutting down (possibly due to an error).
    terminate: bool,

    /// The first error that caused termination, if any.
    terminate_reason: Option<Error>,

    /// Disables compactions (consumers not visible in this chunk — TODO confirm).
    disable_compactions: bool,

    /// Whether a compaction is currently running (not visible in this chunk — TODO confirm).
    compaction_running: bool,

    /// Waker for a task waiting in sync; woken on termination.
    sync_waker: Option<Waker>,

    /// Journal offset up to which writes have been flushed to the journal file.
    flushed_offset: u64,

    /// Journal offset up to which records were validated during replay.
    valid_to: u64,

    /// If set, a `Discard` record for this offset needs to be journaled (consumed by
    /// `pre_commit`); set when replay finds data past the last valid transaction.
    discard_offset: Option<u64>,

    /// Journal size threshold for reclamation, from `JournalOptions::reclaim_size`.
    reclaim_size: u64,

    /// When true, the filesystem is being built as an image (e.g. journal preallocation is
    /// skipped in `init_empty`).
    image_builder_mode: bool,
}
281
impl Inner {
    /// Marks the journal as terminated and wakes any flush/sync waiters so they observe
    /// `terminate` rather than waiting forever.
    ///
    /// Only the first error is retained in `terminate_reason` (it is usually the root cause);
    /// later errors are only logged.
    fn terminate(&mut self, reason: Option<Error>) {
        self.terminate = true;

        if let Some(err) = reason {
            error!(error:? = err; "Terminating journal");
            if let Some(prev_err) = self.terminate_reason.as_ref() {
                error!(error:? = prev_err; "Journal previously terminated");
            } else {
                self.terminate_reason = Some(err);
            }
        }

        if let Some(waker) = self.flush_waker.take() {
            waker.wake();
        }
        if let Some(waker) = self.sync_waker.take() {
            waker.wake();
        }
    }
}
304
/// Tunable journal parameters.
pub struct JournalOptions {
    /// Journal size threshold used for reclamation; defaults to `DEFAULT_RECLAIM_SIZE`.
    pub reclaim_size: u64,
}
311
312impl Default for JournalOptions {
313 fn default() -> Self {
314 JournalOptions { reclaim_size: DEFAULT_RECLAIM_SIZE }
315 }
316}
317
/// Result of scanning the journal stream: the decoded transactions plus the offset up to
/// which the device is known to have been flushed.
struct JournaledTransactions {
    transactions: Vec<JournaledTransaction>,
    device_flushed_offset: u64,
}
322
/// One committed transaction decoded from the journal.
#[derive(Debug, Default)]
pub struct JournaledTransaction {
    /// Where the transaction begins in the journal stream.
    pub checkpoint: JournalCheckpoint,
    /// Mutations destined for the root parent store.
    pub root_parent_mutations: Vec<Mutation>,
    /// Mutations destined for the root store.
    pub root_mutations: Vec<Mutation>,
    /// Mutations for all other stores, paired with their store object id.
    pub non_root_mutations: Vec<(u64, Mutation)>,
    /// Journal offset just after this transaction's Commit record.
    pub end_offset: u64,
    /// Data checksums recorded within this transaction.
    pub checksums: Vec<JournaledChecksums>,

    /// Set when this transaction ends a flush: `(store_object_id, journal_offset)`, where the
    /// offset is just past the matching BeginFlush, or `STORE_DELETED` if the store was
    /// deleted via `Mutation::DeleteVolume`.
    pub end_flush: Option<(u64, u64)>,
}
338
339impl JournaledTransaction {
340 fn new(checkpoint: JournalCheckpoint) -> Self {
341 Self { checkpoint, ..Default::default() }
342 }
343}
344
/// Sentinel journal offset used in `JournaledTransaction::end_flush` to mark a store that was
/// deleted (`Mutation::DeleteVolume`) rather than flushed.
const STORE_DELETED: u64 = u64::MAX;
346
/// Checksums journaled for a data write, decoded from a `JournalRecord::DataChecksums`.
#[derive(Debug)]
pub struct JournaledChecksums {
    /// Device byte range the checksums cover.
    pub device_range: Range<u64>,
    /// Checksums over equally-sized sub-ranges of `device_range`.
    pub checksums: Checksums,
    /// Whether this was the first write to the range (the record's third field).
    pub first_write: bool,
}
353
/// Handle abstraction the journal reads through, both during bootstrap (raw device extents)
/// and after replay (a regular object handle).
pub trait JournalHandle: ReadObjectHandle {
    /// End offset (in journal file space) of the extents added so far, or `None` if this
    /// handle does not track extents (see the `DataObjectHandle` impl below).
    fn end_offset(&self) -> Option<u64>;
    /// Adds a device extent backing the journal file.  `added_offset` records the journal
    /// offset at which the extent became known, so it can later be dropped by
    /// `discard_extents`.
    fn push_extent(&mut self, added_offset: u64, device_range: Range<u64>);
    /// Discards extents that were added at journal offset `discard_offset` or later.
    fn discard_extents(&mut self, discard_offset: u64);
}
368
// For regular data object handles, extent tracking is unnecessary (extents presumably come
// from the store's own metadata — TODO confirm), so these are all no-ops.
impl<S: HandleOwner> JournalHandle for DataObjectHandle<S> {
    fn end_offset(&self) -> Option<u64> {
        None
    }
    fn push_extent(&mut self, _added_offset: u64, _device_range: Range<u64>) {
        // Intentional no-op.
    }
    fn discard_extents(&mut self, _discard_offset: u64) {
        // Intentional no-op.
    }
}
383
384#[fxfs_trace::trace]
385impl Journal {
386 pub fn new(objects: Arc<ObjectManager>, options: JournalOptions) -> Journal {
387 let starting_checksum = rand::thread_rng().gen_range(1..u64::MAX);
388 Journal {
389 objects: objects,
390 handle: OnceCell::new(),
391 super_block_manager: SuperBlockManager::new(),
392 inner: Mutex::new(Inner {
393 super_block_header: SuperBlockHeader::default(),
394 zero_offset: None,
395 device_flushed_offset: 0,
396 needs_did_flush_device: false,
397 writer: JournalWriter::new(BLOCK_SIZE as usize, starting_checksum),
398 output_reset_version: false,
399 flush_waker: None,
400 terminate: false,
401 terminate_reason: None,
402 disable_compactions: false,
403 compaction_running: false,
404 sync_waker: None,
405 flushed_offset: 0,
406 valid_to: 0,
407 discard_offset: None,
408 reclaim_size: options.reclaim_size,
409 image_builder_mode: false,
410 }),
411 writer_mutex: Mutex::new(()),
412 sync_mutex: futures::lock::Mutex::new(()),
413 trace: AtomicBool::new(false),
414 reclaim_event: Event::new(),
415 }
416 }
417
418 pub fn set_trace(&self, trace: bool) {
419 let old_value = self.trace.swap(trace, Ordering::Relaxed);
420 if trace != old_value {
421 info!(trace; "J: trace");
422 }
423 }
424
425 pub fn set_image_builder_mode(&self, enabled: bool) {
426 self.inner.lock().image_builder_mode = enabled;
427 }
428
429 pub fn image_builder_mode(&self) -> bool {
430 self.inner.lock().image_builder_mode
431 }
432
    /// Sanity-checks a journaled mutation before it is replayed.
    ///
    /// Returns `false` for structurally invalid mutations: empty or unaligned extent ranges,
    /// checksum counts that do not evenly divide the extent into block-multiple pieces,
    /// device ranges outside the device, or invalid owner object ids.  Replay stops at the
    /// first invalid mutation (see `replay`).
    fn validate_mutation(&self, mutation: &Mutation, block_size: u64, device_size: u64) -> bool {
        match mutation {
            // Extent records: check range alignment, checksum granularity and device bounds.
            Mutation::ObjectStore(ObjectStoreMutation {
                item:
                    Item {
                        key:
                            ObjectKey {
                                data:
                                    ObjectKeyData::Attribute(
                                        _,
                                        AttributeKey::Extent(ExtentKey { range }),
                                    ),
                                ..
                            },
                        value: ObjectValue::Extent(ExtentValue::Some { device_offset, mode, .. }),
                        ..
                    },
                ..
            }) => {
                if range.is_empty() || !range.is_aligned(block_size) {
                    return false;
                }
                let len = range.length().unwrap();
                if let ExtentMode::Cow(checksums) = mode {
                    if checksums.len() > 0 {
                        // Each checksum must cover an equal, block-aligned slice of the range.
                        if len % checksums.len() as u64 != 0 {
                            return false;
                        }
                        if (len / checksums.len() as u64) % block_size != 0 {
                            return false;
                        }
                    }
                }
                // The referenced device range must be aligned and lie entirely on-device.
                if *device_offset % block_size != 0
                    || *device_offset >= device_size
                    || device_size - *device_offset < len
                {
                    return false;
                }
            }
            Mutation::ObjectStore(_) => {}
            Mutation::EncryptedObjectStore(_) => {}
            // Allocator mutations: the owner must be valid and the range non-empty/on-device.
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                return !device_range.is_empty()
                    && *owner_object_id != INVALID_OBJECT_ID
                    && device_range.end <= device_size;
            }
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, .. }) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
            Mutation::BeginFlush => {}
            Mutation::EndFlush => {}
            Mutation::DeleteVolume => {}
            Mutation::UpdateBorrowed(_) => {}
            Mutation::UpdateMutationsKey(_) => {}
            Mutation::CreateInternalDir(owner_object_id) => {
                return *owner_object_id != INVALID_OBJECT_ID;
            }
        }
        true
    }
508
509 fn update_checksum_list(
511 &self,
512 journal_offset: u64,
513 mutation: &Mutation,
514 checksum_list: &mut ChecksumList,
515 ) -> Result<(), Error> {
516 match mutation {
517 Mutation::ObjectStore(_) => {}
518 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
519 checksum_list.mark_deallocated(journal_offset, device_range.clone().into());
520 }
521 _ => {}
522 }
523 Ok(())
524 }
525
    /// Replays the journal to reconstruct in-memory filesystem state after mount.
    ///
    /// Phases: load the super block and root parent store; bootstrap a handle to the journal
    /// file from the root parent store's extent records; decode all journaled transactions;
    /// validate mutations and data checksums to find the valid end of the journal; apply the
    /// root-parent, root-store and remaining mutations (in that order); then reset the writer
    /// to continue appending after the replayed region.
    ///
    /// `on_new_allocator` is invoked (if given) as soon as the allocator is constructed,
    /// before it is opened.
    #[trace]
    pub async fn replay(
        &self,
        filesystem: Arc<FxFilesystem>,
        on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    ) -> Result<(), Error> {
        let block_size = filesystem.block_size();

        let (super_block, root_parent) =
            self.super_block_manager.load(filesystem.device(), block_size).await?;

        let root_parent = Arc::new(ObjectStore::attach_filesystem(root_parent, filesystem.clone()));

        self.objects.set_root_parent_store(root_parent.clone());
        let allocator =
            Arc::new(Allocator::new(filesystem.clone(), super_block.allocator_object_id));
        if let Some(on_new_allocator) = on_new_allocator {
            on_new_allocator(allocator.clone());
        }
        self.objects.set_allocator(allocator.clone());
        self.objects.set_borrowed_metadata_space(super_block.borrowed_metadata_space);
        self.objects.set_last_end_offset(super_block.super_block_journal_file_offset);
        {
            let mut inner = self.inner.lock();
            inner.super_block_header = super_block.clone();
        }

        let device = filesystem.device();

        // Build a bootstrap handle over the journal file by walking the journal object's
        // extent records in the root parent store, starting at the super block's checkpoint.
        let mut handle;
        {
            let root_parent_layer = root_parent.tree().mutable_layer();
            let mut iter = root_parent_layer
                .seek(Bound::Included(&ObjectKey::attribute(
                    super_block.journal_object_id,
                    DEFAULT_DATA_ATTRIBUTE_ID,
                    AttributeKey::Extent(ExtentKey::search_key_from_offset(round_down(
                        super_block.journal_checkpoint.file_offset,
                        BLOCK_SIZE,
                    ))),
                )))
                .await
                .context("Failed to seek root parent store")?;
            let start_offset = if let Some(ItemRef {
                key:
                    ObjectKey {
                        data:
                            ObjectKeyData::Attribute(
                                DEFAULT_DATA_ATTRIBUTE_ID,
                                AttributeKey::Extent(ExtentKey { range }),
                            ),
                        ..
                    },
                ..
            }) = iter.get()
            {
                range.start
            } else {
                0
            };
            handle = BootstrapObjectHandle::new_with_start_offset(
                super_block.journal_object_id,
                device.clone(),
                start_offset,
            );
            // Collect the journal object's contiguous extents; extents must abut in file
            // space, otherwise the store is inconsistent.
            while let Some(item) = iter.get() {
                if !match item.into() {
                    Some((
                        object_id,
                        DEFAULT_DATA_ATTRIBUTE_ID,
                        ExtentKey { range },
                        ExtentValue::Some { device_offset, .. },
                    )) if object_id == super_block.journal_object_id => {
                        if let Some(end_offset) = handle.end_offset() {
                            if range.start != end_offset {
                                bail!(anyhow!(FxfsError::Inconsistent).context(format!(
                                    "Unexpected journal extent {:?}, expected start: {}",
                                    item, end_offset
                                )));
                            }
                        }
                        handle.push_extent(
                            0, *device_offset
                                ..*device_offset + range.length().context("Invalid extent")?,
                        );
                        true
                    }
                    _ => false,
                } {
                    break;
                }
                iter.advance().await.context("Failed to advance root parent store iterator")?;
            }
        }

        let mut reader = JournalReader::new(handle, &super_block.journal_checkpoint);
        let JournaledTransactions { mut transactions, device_flushed_offset } = self
            .read_transactions(&mut reader, None, INVALID_OBJECT_ID)
            .await
            .context("Reading transactions for replay")?;

        // Validate every decoded transaction; replay only up to the first bad mutation or
        // checksum failure.
        let mut checksum_list = ChecksumList::new(device_flushed_offset);
        let mut valid_to = reader.journal_file_checkpoint().file_offset;
        let device_size = device.size();
        'bad_replay: for JournaledTransaction {
            checkpoint,
            root_parent_mutations,
            root_mutations,
            non_root_mutations,
            checksums,
            ..
        } in &transactions
        {
            for JournaledChecksums { device_range, checksums, first_write } in checksums {
                checksum_list
                    .push(
                        checkpoint.file_offset,
                        device_range.clone(),
                        checksums.maybe_as_ref().context("Malformed checksums")?,
                        *first_write,
                    )
                    .context("Pushing journal checksum records to checksum list")?;
            }
            for mutation in root_parent_mutations
                .iter()
                .chain(root_mutations)
                .chain(non_root_mutations.iter().map(|(_, m)| m))
            {
                if !self.validate_mutation(mutation, block_size, device_size) {
                    info!(mutation:?; "Stopping replay at bad mutation");
                    valid_to = checkpoint.file_offset;
                    break 'bad_replay;
                }
                self.update_checksum_list(checkpoint.file_offset, &mutation, &mut checksum_list)?;
            }
        }

        // Verify the journaled data checksums against the device; this may move `valid_to`
        // back further.
        let valid_to = checksum_list
            .verify(device.as_ref(), valid_to)
            .await
            .context("Failed to validate checksums")?;

        let mut last_checkpoint = reader.journal_file_checkpoint();
        let mut journal_offsets = super_block.journal_file_offsets.clone();

        // First pass: apply root parent store mutations and accumulate per-store flush
        // offsets (from EndFlush/DeleteVolume records); drop transactions past `valid_to`.
        for (index, JournaledTransaction { checkpoint, root_parent_mutations, end_flush, .. }) in
            transactions.iter_mut().enumerate()
        {
            if checkpoint.file_offset >= valid_to {
                last_checkpoint = checkpoint.clone();

                transactions.truncate(index);
                break;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_parent_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_parent_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root parent store mutations")?;
            }

            if let Some((object_id, journal_offset)) = end_flush {
                journal_offsets.insert(*object_id, *journal_offset);
            }
        }

        // The root store can now be opened (its metadata lives in the root parent store).
        let root_store = ObjectStore::open(
            &root_parent,
            super_block.root_store_object_id,
            Box::new(NullCache {}),
        )
        .await
        .context("Unable to open root store")?;

        ensure!(
            !root_store.is_encrypted(),
            anyhow!(FxfsError::Inconsistent).context("Root store is encrypted")
        );
        self.objects.set_root_store(root_store);

        let root_store_offset =
            journal_offsets.get(&super_block.root_store_object_id).copied().unwrap_or(0);

        // Second pass: apply root store mutations, skipping any that were already captured by
        // a flush of the root store.
        for JournaledTransaction { checkpoint, root_mutations, .. } in &mut transactions {
            if checkpoint.file_offset < root_store_offset {
                continue;
            }

            let context = ApplyContext { mode: ApplyMode::Replay, checkpoint: checkpoint.clone() };
            for mutation in root_mutations.drain(..) {
                self.objects
                    .apply_mutation(
                        super_block.root_store_object_id,
                        mutation,
                        &context,
                        AssocObj::None,
                    )
                    .context("Failed to replay root store mutations")?;
            }
        }

        allocator.open().await.context("Failed to open allocator")?;

        // Final pass: everything else (non-root stores), via the object manager.
        for JournaledTransaction { checkpoint, non_root_mutations, end_offset, .. } in transactions
        {
            self.objects
                .replay_mutations(
                    non_root_mutations,
                    &journal_offsets,
                    &ApplyContext { mode: ApplyMode::Replay, checkpoint },
                    end_offset,
                )
                .await
                .context("Failed to replay mutations")?;
        }

        allocator.on_replay_complete().await.context("Failed to complete replay for allocator")?;

        // Anything between the last valid transaction and the reader position was discarded.
        let discarded_to =
            if last_checkpoint.file_offset != reader.journal_file_checkpoint().file_offset {
                Some(reader.journal_file_checkpoint().file_offset)
            } else {
                None
            };

        {
            // The super block must not claim a position past what we managed to replay.
            if last_checkpoint.file_offset < super_block.super_block_journal_file_offset {
                return Err(anyhow!(FxfsError::Inconsistent).context(format!(
                    "journal replay cut short; journal finishes at {}, but super-block was \
                    written at {}",
                    last_checkpoint.file_offset, super_block.super_block_journal_file_offset
                )));
            }
            // From here on the journal file can be accessed through a regular object handle.
            let handle = ObjectStore::open_object(
                &root_parent,
                super_block.journal_object_id,
                journal_handle_options(),
                None,
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to open journal file (object id: {})",
                    super_block.journal_object_id
                )
            })?;
            let _ = self.handle.set(handle);
            let mut inner = self.inner.lock();
            reader.skip_to_end_of_block();
            let mut writer_checkpoint = reader.journal_file_checkpoint();

            std::mem::drop(reader);

            // Reset the stream: flip the checksum so pre-reset blocks can't validate, and
            // continue writing with the latest version.
            writer_checkpoint.checksum ^= RESET_XOR;
            writer_checkpoint.version = LATEST_VERSION;
            inner.flushed_offset = writer_checkpoint.file_offset;

            inner.device_flushed_offset = inner.flushed_offset;

            inner.writer.seek(writer_checkpoint);
            inner.output_reset_version = true;
            inner.valid_to = last_checkpoint.file_offset;
            // If invalid data follows the last good transaction, journal a Discard for it.
            if last_checkpoint.file_offset < inner.flushed_offset {
                inner.discard_offset = Some(last_checkpoint.file_offset);
            }
        }

        self.objects
            .on_replay_complete()
            .await
            .context("Failed to complete replay for object manager")?;

        info!(checkpoint = last_checkpoint.file_offset, discarded_to; "replay complete");
        Ok(())
    }
824
    /// Decodes journal records from `reader` into transactions.
    ///
    /// Reads until a checksum mismatch (normal end of journal) or, if `end_offset` is given,
    /// until the reader reaches that offset.  `object_id_filter` limits which mutations are
    /// collected: `INVALID_OBJECT_ID` collects everything, otherwise only mutations for that
    /// object id.  Flush bookkeeping (BeginFlush/EndFlush/DeleteVolume) and journal-extent
    /// tracking are always processed regardless of the filter.
    async fn read_transactions(
        &self,
        reader: &mut JournalReader,
        end_offset: Option<u64>,
        object_id_filter: u64,
    ) -> Result<JournaledTransactions, Error> {
        let mut transactions = Vec::new();
        let (mut device_flushed_offset, root_parent_store_object_id, root_store_object_id) = {
            let super_block = &self.inner.lock().super_block_header;
            (
                super_block.super_block_journal_file_offset,
                super_block.root_parent_store_object_id,
                super_block.root_store_object_id,
            )
        };
        // Points at the transaction currently being accumulated (the last of `transactions`).
        let mut current_transaction = None;
        // object_id -> journal offset of the most recent BeginFlush for that store.
        let mut begin_flush_offsets = HashMap::default();
        let mut stores_deleted = HashSet::new();
        loop {
            let checkpoint = reader.journal_file_checkpoint();
            if let Some(end_offset) = end_offset {
                if checkpoint.file_offset >= end_offset {
                    break;
                }
            }
            let result =
                reader.deserialize().await.context("Failed to deserialize journal record")?;
            match result {
                ReadResult::Reset(_) => {
                    // A reset implies anything before it was flushed, and abandons any
                    // partially-read transaction.
                    if current_transaction.is_some() {
                        current_transaction = None;
                        transactions.pop();
                    }
                    let offset = reader.journal_file_checkpoint().file_offset;
                    if offset > device_flushed_offset {
                        device_flushed_offset = offset;
                    }
                }
                ReadResult::Some(record) => {
                    match record {
                        JournalRecord::EndBlock => {
                            reader.skip_to_end_of_block();
                        }
                        JournalRecord::Mutation { object_id, mutation } => {
                            // Start a new transaction at this checkpoint if none is open.
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };

                            if stores_deleted.contains(&object_id) {
                                bail!(anyhow!(FxfsError::Inconsistent)
                                    .context("Encountered mutations for deleted store"));
                            }

                            // Track flush lifecycles so replay knows which journal offsets
                            // are already captured by flushed layer files.
                            match &mutation {
                                Mutation::BeginFlush => {
                                    begin_flush_offsets.insert(
                                        object_id,
                                        current_transaction.checkpoint.file_offset,
                                    );
                                }
                                Mutation::EndFlush => {
                                    if let Some(offset) = begin_flush_offsets.remove(&object_id) {
                                        // +1: mutations *at* the BeginFlush offset are
                                        // included in the flush.
                                        if current_transaction
                                            .end_flush
                                            .replace((object_id, offset + 1))
                                            .is_some()
                                        {
                                            bail!(anyhow!(FxfsError::Inconsistent).context(
                                                "Multiple EndFlush/DeleteVolume mutations in a \
                                                 single transaction"
                                            ));
                                        }
                                    }
                                }
                                Mutation::DeleteVolume => {
                                    if current_transaction
                                        .end_flush
                                        .replace((object_id, STORE_DELETED))
                                        .is_some()
                                    {
                                        bail!(anyhow!(FxfsError::Inconsistent).context(
                                            "Multiple EndFlush/DeleteVolume mutations in a single \
                                             transaction"
                                        ));
                                    }
                                    stores_deleted.insert(object_id);
                                }
                                _ => {}
                            }

                            // Collect the mutation if it passes the filter and still needs to
                            // be applied.
                            if (object_id_filter == INVALID_OBJECT_ID
                                || object_id_filter == object_id)
                                && self.should_apply(object_id, &current_transaction.checkpoint)
                            {
                                if object_id == root_parent_store_object_id {
                                    current_transaction.root_parent_mutations.push(mutation);
                                } else if object_id == root_store_object_id {
                                    current_transaction.root_mutations.push(mutation);
                                } else {
                                    current_transaction
                                        .non_root_mutations
                                        .push((object_id, mutation));
                                }
                            }
                        }
                        JournalRecord::DataChecksums(device_range, checksums, first_write) => {
                            let current_transaction = match current_transaction.as_mut() {
                                None => {
                                    transactions.push(JournaledTransaction::new(checkpoint));
                                    current_transaction = transactions.last_mut();
                                    current_transaction.as_mut().unwrap()
                                }
                                Some(transaction) => transaction,
                            };
                            current_transaction.checksums.push(JournaledChecksums {
                                device_range,
                                checksums,
                                first_write,
                            });
                        }
                        JournalRecord::Commit => {
                            if let Some(JournaledTransaction {
                                ref checkpoint,
                                ref root_parent_mutations,
                                ref mut end_offset,
                                ..
                            }) = current_transaction.take()
                            {
                                // Committed extent mutations for the journal file itself must
                                // be fed back to the handle so we can keep reading past them.
                                for mutation in root_parent_mutations {
                                    if let Mutation::ObjectStore(ObjectStoreMutation {
                                        item:
                                            Item {
                                                key:
                                                    ObjectKey {
                                                        object_id,
                                                        data:
                                                            ObjectKeyData::Attribute(
                                                                DEFAULT_DATA_ATTRIBUTE_ID,
                                                                AttributeKey::Extent(ExtentKey {
                                                                    range,
                                                                }),
                                                            ),
                                                        ..
                                                    },
                                                value:
                                                    ObjectValue::Extent(ExtentValue::Some {
                                                        device_offset,
                                                        ..
                                                    }),
                                                ..
                                            },
                                        ..
                                    }) = mutation
                                    {
                                        let handle = reader.handle();
                                        if *object_id != handle.object_id() {
                                            continue;
                                        }
                                        // Journal extents must be contiguous in file space.
                                        if let Some(end_offset) = handle.end_offset() {
                                            if range.start != end_offset {
                                                bail!(anyhow!(FxfsError::Inconsistent).context(
                                                    format!(
                                                        "Unexpected journal extent {:?} -> {}, \
                                                         expected start: {}",
                                                        range, device_offset, end_offset,
                                                    )
                                                ));
                                            }
                                        }
                                        handle.push_extent(
                                            checkpoint.file_offset,
                                            *device_offset
                                                ..*device_offset
                                                    + range.length().context("Invalid extent")?,
                                        );
                                    }
                                }
                                *end_offset = reader.journal_file_checkpoint().file_offset;
                            }
                        }
                        JournalRecord::Discard(offset) => {
                            if offset == 0 {
                                bail!(anyhow!(FxfsError::Inconsistent)
                                    .context("Invalid offset for Discard"));
                            }
                            if let Some(transaction) = current_transaction.as_ref() {
                                // The open transaction began before the discard point, so the
                                // discard does not affect it; nothing to do for this record.
                                if transaction.checkpoint.file_offset < offset {
                                    continue;
                                }
                            }
                            // Drop the open transaction and all completed transactions at or
                            // after `offset`.
                            current_transaction = None;
                            while let Some(transaction) = transactions.last() {
                                if transaction.checkpoint.file_offset < offset {
                                    break;
                                }
                                transactions.pop();
                            }
                            reader.handle().discard_extents(offset);
                        }
                        JournalRecord::DidFlushDevice(offset) => {
                            if offset > device_flushed_offset {
                                device_flushed_offset = offset;
                            }
                        }
                    }
                }
                // A checksum mismatch marks the end of the valid journal stream.
                ReadResult::ChecksumMismatch => break,
            }
        }

        // An uncommitted trailing transaction is discarded.
        if current_transaction.is_some() {
            transactions.pop();
        }

        Ok(JournaledTransactions { transactions, device_flushed_offset })
    }
1059
    /// Formats a fresh filesystem: creates the root parent store, root store, allocator,
    /// super-block objects and the journal file itself, all within one initial transaction.
    ///
    /// In image-builder mode, preallocation of the journal file is deferred (see
    /// `allocate_journal`).
    pub async fn init_empty(&self, filesystem: Arc<FxFilesystem>) -> Result<(), Error> {
        // Object ids for the initial objects; they only need to be distinct, non-zero values
        // that the respective stores will then count up from.
        const INIT_ROOT_PARENT_STORE_OBJECT_ID: u64 = 3;
        const INIT_ROOT_STORE_OBJECT_ID: u64 = 4;
        const INIT_ALLOCATOR_OBJECT_ID: u64 = 5;

        info!(device_size = filesystem.device().size(); "Formatting");

        // Remember where the journal starts; recorded in the super block below.
        let checkpoint = JournalCheckpoint {
            version: LATEST_VERSION,
            ..self.inner.lock().writer.journal_file_checkpoint()
        };

        let root_parent = ObjectStore::new_empty(
            None,
            INIT_ROOT_PARENT_STORE_OBJECT_ID,
            filesystem.clone(),
            Box::new(NullCache {}),
        );
        self.objects.set_root_parent_store(root_parent.clone());

        let allocator = Arc::new(Allocator::new(filesystem.clone(), INIT_ALLOCATOR_OBJECT_ID));
        self.objects.set_allocator(allocator.clone());
        self.objects.init_metadata_reservation()?;

        let journal_handle;
        let super_block_a_handle;
        let super_block_b_handle;
        let root_store;
        let mut transaction = filesystem
            .clone()
            .new_transaction(
                lock_keys![],
                Options { skip_journal_checks: true, ..Default::default() },
            )
            .await?;
        root_store = root_parent
            .new_child_store(
                &mut transaction,
                NewChildStoreOptions { object_id: INIT_ROOT_STORE_OBJECT_ID, ..Default::default() },
                Box::new(NullCache {}),
            )
            .await
            .context("new_child_store")?;
        self.objects.set_root_store(root_store.clone());

        allocator.create(&mut transaction).await?;

        // Create both super-block objects at their fixed ids/extents.
        super_block_a_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            SuperBlockInstance::A.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::A.object_id());
        super_block_a_handle
            .extend(&mut transaction, SuperBlockInstance::A.first_extent())
            .await
            .context("extend super block")?;
        super_block_b_handle = ObjectStore::create_object_with_id(
            &root_store,
            &mut transaction,
            SuperBlockInstance::B.object_id(),
            HandleOptions::default(),
            None,
        )
        .await
        .context("create super block")?;
        root_store.update_last_object_id(SuperBlockInstance::B.object_id());
        super_block_b_handle
            .extend(&mut transaction, SuperBlockInstance::B.first_extent())
            .await
            .context("extend super block")?;

        // The journal file lives in the root parent store.
        journal_handle = ObjectStore::create_object(
            &root_parent,
            &mut transaction,
            journal_handle_options(),
            None,
        )
        .await
        .context("create journal")?;
        if !self.inner.lock().image_builder_mode {
            let mut file_range = 0..self.chunk_size();
            journal_handle
                .preallocate_range(&mut transaction, &mut file_range)
                .await
                .context("preallocate journal")?;
            // preallocate_range advances file_range.start by what it allocated.
            if file_range.start < file_range.end {
                bail!("preallocate_range returned too little space");
            }
        }

        root_store.create(&mut transaction).await?;

        root_parent
            .set_graveyard_directory_object_id(Graveyard::create(&mut transaction, &root_parent));

        transaction.commit().await?;

        self.inner.lock().super_block_header = SuperBlockHeader::new(
            root_parent.store_object_id(),
            root_parent.graveyard_directory_object_id(),
            root_store.store_object_id(),
            allocator.object_id(),
            journal_handle.object_id(),
            checkpoint,
            LATEST_VERSION,
        );

        let _ = self.handle.set(journal_handle);
        Ok(())
    }
1187
1188 pub async fn allocate_journal(&self) -> Result<(), Error> {
1191 let handle = self.handle.get().unwrap();
1192 let filesystem = handle.store().filesystem();
1193 let mut transaction = filesystem
1194 .clone()
1195 .new_transaction(
1196 lock_keys![LockKey::object(handle.store().store_object_id(), handle.object_id()),],
1197 Options { skip_journal_checks: true, ..Default::default() },
1198 )
1199 .await?;
1200 let mut file_range = 0..self.chunk_size();
1201 self.handle
1202 .get()
1203 .unwrap()
1204 .preallocate_range(&mut transaction, &mut file_range)
1205 .await
1206 .context("preallocate journal")?;
1207 if file_range.start < file_range.end {
1208 bail!("preallocate_range returned too little space");
1209 }
1210 transaction.commit().await?;
1211 Ok(())
1212 }
1213
1214 pub async fn init_superblocks(&self) -> Result<(), Error> {
1215 for _ in 0..2 {
1217 self.write_super_block().await?;
1218 }
1219 Ok(())
1220 }
1221
    /// Reads back the journaled transactions relevant to `object_id`, from that object's
    /// recorded journal checkpoint up to the validated end of the journal (`valid_to`).
    ///
    /// Returns an empty vector if the object manager has no checkpoint for the object.
    pub async fn read_transactions_for_object(
        &self,
        object_id: u64,
    ) -> Result<Vec<JournaledTransaction>, Error> {
        let handle = self.handle.get().expect("No journal handle");
        // Reopen the journal file so we get an independent handle to read through.
        let handle = ObjectStore::open_object(
            handle.owner(),
            handle.object_id(),
            journal_handle_options(),
            None,
        )
        .await?;

        let checkpoint = match self.objects.journal_checkpoint(object_id) {
            Some(checkpoint) => checkpoint,
            // Nothing journaled for this object.
            None => return Ok(vec![]),
        };
        let mut reader = JournalReader::new(handle, &checkpoint);
        let end_offset = self.inner.lock().valid_to;
        Ok(self.read_transactions(&mut reader, Some(end_offset), object_id).await?.transactions)
    }
1251
    /// Commits a transaction: journals its mutations and applies them in memory.
    ///
    /// Returns the journal file offset at the end of the transaction.  Empty transactions
    /// write nothing and just return the current writer offset.
    pub async fn commit(&self, transaction: &mut Transaction<'_>) -> Result<u64, Error> {
        if transaction.is_empty() {
            return Ok(self.inner.lock().writer.journal_file_checkpoint().file_offset);
        }

        self.pre_commit(transaction).await?;
        Ok(self.write_and_apply_mutations(transaction))
    }
1261
    /// Prepares the journal before `transaction` is written.
    ///
    /// Flushes pending stream records (version marker after a reset, Discard,
    /// DidFlushDevice), and — if the journal file is within a chunk of its current size,
    /// needs zeroing, or the upcoming offset requires borrowed metadata space — runs a
    /// separate internal transaction to extend and/or zero the journal file.
    async fn pre_commit(&self, transaction: &Transaction<'_>) -> Result<(), Error> {
        let handle;

        let (size, zero_offset) = {
            let mut inner = self.inner.lock();

            // After a stream reset (e.g. replay), a version marker must precede new records.
            if std::mem::take(&mut inner.output_reset_version) {
                LATEST_VERSION.serialize_into(&mut inner.writer)?;
            }

            if let Some(discard_offset) = inner.discard_offset {
                JournalRecord::Discard(discard_offset).serialize_into(&mut inner.writer)?;
                inner.discard_offset = None;
            }

            if inner.needs_did_flush_device {
                let offset = inner.device_flushed_offset;
                JournalRecord::DidFlushDevice(offset).serialize_into(&mut inner.writer)?;
                inner.needs_did_flush_device = false;
            }

            handle = match self.handle.get() {
                // No journal file yet (e.g. during initial format) — nothing more to do.
                None => return Ok(()),
                Some(x) => x,
            };

            let file_offset = inner.writer.journal_file_checkpoint().file_offset;

            // If the writer is within one chunk of the file's end, we need to extend it.
            let size = handle.get_size();
            let size = if file_offset + self.chunk_size() > size { Some(size) } else { None };

            if size.is_none()
                && inner.zero_offset.is_none()
                && !self.objects.needs_borrow_for_journal(file_offset)
            {
                return Ok(());
            }

            (size, inner.zero_offset)
        };
        // NB: `inner` is unlocked here; other writers may update zero_offset concurrently.

        let mut transaction = handle
            .new_transaction_with_options(Options {
                skip_journal_checks: true,
                borrow_metadata_space: true,
                allocator_reservation: Some(self.objects.metadata_reservation()),
                txn_guard: Some(transaction.txn_guard()),
                ..Default::default()
            })
            .await?;
        if let Some(size) = size {
            // Extend the journal file by another chunk starting at its old size.
            handle
                .preallocate_range(&mut transaction, &mut (size..size + self.chunk_size()))
                .await?;
        }
        if let Some(zero_offset) = zero_offset {
            handle.zero(&mut transaction, 0..zero_offset).await?;
        }

        self.write_and_apply_mutations(&mut transaction);

        let mut inner = self.inner.lock();

        // The writer must not have passed the old end of the file while we were extending it.
        if let Some(size) = size {
            assert!(inner.writer.journal_file_checkpoint().file_offset < size);
        }

        // Only clear zero_offset if it wasn't changed while we had `inner` unlocked.
        if inner.zero_offset == zero_offset {
            inner.zero_offset = None;
        }

        Ok(())
    }
1342
1343 fn should_apply(&self, object_id: u64, journal_file_checkpoint: &JournalCheckpoint) -> bool {
1347 let super_block_header = &self.inner.lock().super_block_header;
1348 let offset = super_block_header
1349 .journal_file_offsets
1350 .get(&object_id)
1351 .cloned()
1352 .unwrap_or(super_block_header.super_block_journal_file_offset);
1353 journal_file_checkpoint.file_offset >= offset
1354 }
1355
    /// Writes a new super-block: pads the journal to a block boundary, compacts the
    /// root parent store, flushes the device up to the pad checkpoint, then saves a
    /// header with an incremented generation.  On success the journal region before
    /// the old super-block offset is marked for zeroing (picked up by `pre_commit`).
    async fn write_super_block(&self) -> Result<(), Error> {
        let root_parent_store = self.objects.root_parent_store();

        let old_layers;
        let old_super_block_offset;
        let mut new_super_block_header;
        let checkpoint;
        let borrowed;

        {
            // Hold the sync lock so a concurrent sync can't interleave with the
            // pad + flush sequence.
            let _sync_guard = debug_assert_not_too_long!(self.sync_mutex.lock());
            {
                // Pad the journal and snapshot the root parent store atomically with
                // respect to other journal writers.
                let _write_guard = self.writer_mutex.lock();
                (checkpoint, borrowed) = self.pad_to_block()?;
                old_layers = super_block::compact_root_parent(&*root_parent_store)?;
            }
            self.flush_device(checkpoint.file_offset)
                .await
                .context("flush failed when writing superblock")?;
        }

        new_super_block_header = self.inner.lock().super_block_header.clone();

        old_super_block_offset = new_super_block_header.journal_checkpoint.file_offset;

        let (journal_file_offsets, min_checkpoint) = self.objects.journal_file_offsets();

        // Overflowing the generation counter would be a filesystem inconsistency.
        new_super_block_header.generation =
            new_super_block_header.generation.checked_add(1).ok_or(FxfsError::Inconsistent)?;
        new_super_block_header.super_block_journal_file_offset = checkpoint.file_offset;
        // Use the minimum checkpoint reported by the object manager, if any,
        // falling back to the pad checkpoint.
        new_super_block_header.journal_checkpoint = min_checkpoint.unwrap_or(checkpoint);
        new_super_block_header.journal_checkpoint.version = LATEST_VERSION;
        new_super_block_header.journal_file_offsets = journal_file_offsets;
        new_super_block_header.borrowed_metadata_space = borrowed;

        self.super_block_manager
            .save(
                new_super_block_header.clone(),
                self.objects.root_parent_store().filesystem(),
                old_layers,
            )
            .await?;
        {
            // Publish the new header and mark the stale journal prefix for zeroing.
            let mut inner = self.inner.lock();
            inner.super_block_header = new_super_block_header;
            inner.zero_offset = Some(round_down(old_super_block_offset, BLOCK_SIZE));
        }

        Ok(())
    }
1412
    /// Pads the journal to a block boundary and optionally flushes the device.
    /// Returns the checkpoint of the padded position along with the amount of
    /// borrowed metadata space, or `None` if `options.precondition` reported there
    /// was nothing to do.
    pub async fn sync(
        &self,
        options: SyncOptions<'_>,
    ) -> Result<Option<(JournalCheckpoint, u64)>, Error> {
        let _guard = debug_assert_not_too_long!(self.sync_mutex.lock());

        let (checkpoint, borrowed) = {
            // The precondition is evaluated under the sync lock, so it can check
            // state that concurrent syncs might otherwise change.
            if let Some(precondition) = options.precondition {
                if !precondition() {
                    return Ok(None);
                }
            }

            let _guard = self.writer_mutex.lock();

            self.pad_to_block()?
        };

        if options.flush_device {
            self.flush_device(checkpoint.file_offset).await.context("sync: flush failed")?;
        }

        Ok(Some((checkpoint, borrowed)))
    }
1445
    /// If the journal is mid-block, writes an `EndBlock` record, pads out to the next
    /// block boundary, and wakes the flush task.  Returns the checkpoint taken before
    /// padding together with the current amount of borrowed metadata space.
    fn pad_to_block(&self) -> Result<(JournalCheckpoint, u64), Error> {
        let mut inner = self.inner.lock();
        let checkpoint = inner.writer.journal_file_checkpoint();
        if checkpoint.file_offset % BLOCK_SIZE != 0 {
            JournalRecord::EndBlock.serialize_into(&mut inner.writer)?;
            inner.writer.pad_to_block()?;
            // Padding produced flushable bytes; wake the flush task if it's waiting.
            if let Some(waker) = inner.flush_waker.take() {
                waker.wake();
            }
        }
        Ok((checkpoint, self.objects.borrowed_metadata_space()))
    }
1461
    /// Waits until the journal has been written out to at least `checkpoint_offset`,
    /// then, if the device hasn't already been flushed past that point, issues a
    /// device flush and records it (a `DidFlushDevice` record is emitted later by
    /// `pre_commit`).
    async fn flush_device(&self, checkpoint_offset: u64) -> Result<(), Error> {
        // Wait for the journal writer to have written up to the checkpoint; bail out
        // with an error if the journal terminates in the meantime.
        debug_assert_not_too_long!(poll_fn(|ctx| {
            let mut inner = self.inner.lock();
            if inner.flushed_offset >= checkpoint_offset {
                Poll::Ready(Ok(()))
            } else if inner.terminate {
                let context = inner
                    .terminate_reason
                    .as_ref()
                    .map(|e| format!("Journal closed with error: {:?}", e))
                    .unwrap_or_else(|| "Journal closed".to_string());
                Poll::Ready(Err(anyhow!(FxfsError::JournalFlushError).context(context)))
            } else {
                // `flush` wakes this waker once more bytes land.
                inner.sync_waker = Some(ctx.waker().clone());
                Poll::Pending
            }
        }))?;

        let needs_flush = self.inner.lock().device_flushed_offset < checkpoint_offset;
        if needs_flush {
            let trace = self.trace.load(Ordering::Relaxed);
            if trace {
                info!("J: start flush device");
            }
            self.handle.get().unwrap().flush_device().await?;
            if trace {
                info!("J: end flush device");
            }

            {
                // Note the flush; `pre_commit` will journal a DidFlushDevice record.
                let mut inner = self.inner.lock();
                inner.device_flushed_offset = checkpoint_offset;
                inner.needs_did_flush_device = true;
            }

            // Notify the allocator of the device flush.
            self.objects.allocator().did_flush_device(checkpoint_offset).await;
            if trace {
                info!("J: did flush device");
            }
        }

        Ok(())
    }
1514
1515 pub fn super_block_header(&self) -> SuperBlockHeader {
1517 self.inner.lock().super_block_header.clone()
1518 }
1519
    /// Blocks until the amount of outstanding journal data has dropped below the
    /// reclaim threshold (or the check is moot, e.g. image-builder mode).  Returns an
    /// error if the journal has terminated or if compactions are disabled (since
    /// space would then never be reclaimed).
    pub async fn check_journal_space(&self) -> Result<(), Error> {
        loop {
            debug_assert_not_too_long!({
                let inner = self.inner.lock();
                if inner.terminate {
                    // Include the recorded termination reason, if any.
                    let context = inner
                        .terminate_reason
                        .as_ref()
                        .map(|e| format!("Journal closed with error: {:?}", e))
                        .unwrap_or_else(|| "Journal closed".to_string());
                    break Err(anyhow!(FxfsError::JournalFlushError).context(context));
                }
                // There's space if the data past the super-block checkpoint is below
                // the reclaim threshold.
                if self.objects.last_end_offset()
                    - inner.super_block_header.journal_checkpoint.file_offset
                    < inner.reclaim_size
                {
                    break Ok(());
                }
                // Image-builder mode never compacts (see `flush_task`), so don't wait.
                if inner.image_builder_mode {
                    break Ok(());
                }
                if inner.disable_compactions {
                    // Waiting would hang forever since nothing will reclaim space.
                    break Err(
                        anyhow!(FxfsError::JournalFlushError).context("Compactions disabled")
                    );
                }
                // Wait for a compaction to complete, then re-check.
                self.reclaim_event.listen()
            });
        }
    }
1553
    /// The granularity by which the journal file is preallocated/extended.
    fn chunk_size(&self) -> u64 {
        CHUNK_SIZE
    }
1557
    /// Serializes the transaction's mutations into the journal, applies them via the
    /// object manager, journals any checksums and the Commit record, and returns the
    /// journal file offset at the start of the transaction.
    fn write_and_apply_mutations(&self, transaction: &mut Transaction<'_>) -> u64 {
        let checkpoint_before;
        let checkpoint_after;
        {
            // The writer mutex serializes whole transactions in the journal stream.
            let _guard = self.writer_mutex.lock();
            checkpoint_before = {
                let mut inner = self.inner.lock();
                let checkpoint = inner.writer.journal_file_checkpoint();
                // Each store serializes its own mutations through a `Writer`.
                for TxnMutation { object_id, mutation, .. } in transaction.mutations() {
                    self.objects.write_mutation(
                        *object_id,
                        mutation,
                        Writer(*object_id, &mut inner.writer),
                    );
                }
                checkpoint
            };
            // Applying the transaction may yield one extra mutation which must also
            // be journaled (tagged with object_id 0).
            let maybe_mutation =
                self.objects.apply_transaction(transaction, &checkpoint_before).expect(
                    "apply_transaction should not fail in live mode; \
                     filesystem will be in an inconsistent state",
                );
            checkpoint_after = {
                let mut inner = self.inner.lock();
                if let Some(mutation) = maybe_mutation {
                    inner
                        .writer
                        .write_record(&JournalRecord::Mutation { object_id: 0, mutation })
                        .unwrap();
                }
                // Journal any data checksums gathered during the transaction.
                for (device_range, checksums, first_write) in
                    transaction.take_checksums().into_iter()
                {
                    inner
                        .writer
                        .write_record(&JournalRecord::DataChecksums(
                            device_range,
                            Checksums::fletcher(checksums),
                            first_write,
                        ))
                        .unwrap();
                }
                inner.writer.write_record(&JournalRecord::Commit).unwrap();

                inner.writer.journal_file_checkpoint()
            };
        }
        self.objects.did_commit_transaction(
            transaction,
            &checkpoint_before,
            checkpoint_after.file_offset,
        );

        // Wake the flush task so the newly written records get persisted.
        if let Some(waker) = self.inner.lock().flush_waker.take() {
            waker.wake();
        }

        checkpoint_before.file_offset
    }
1617
    /// Background task: writes flushable journal bytes out to the journal file and
    /// starts compactions once the journal grows beyond half the reclaim size.
    /// Exits only after the journal terminates and all in-flight work has finished.
    pub async fn flush_task(self: Arc<Self>) {
        let mut flush_fut = None;
        let mut compact_fut = None;
        let mut flush_error = false;
        poll_fn(|ctx| loop {
            {
                let mut inner = self.inner.lock();
                // Start a flush if there are flushable bytes, no flush is in flight,
                // and we haven't previously hit a flush error.
                if flush_fut.is_none() && !flush_error && self.handle.get().is_some() {
                    let flushable = inner.writer.flushable_bytes();
                    if flushable > 0 {
                        flush_fut = Some(self.flush(flushable).boxed());
                    }
                }
                // Only exit once outstanding futures have completed.
                if inner.terminate && flush_fut.is_none() && compact_fut.is_none() {
                    return Poll::Ready(());
                }
                // Kick off a compaction once the journal passes half the reclaim
                // threshold (but never when terminating, disabled, or building an
                // image).
                if compact_fut.is_none()
                    && !inner.terminate
                    && !inner.disable_compactions
                    && !inner.image_builder_mode
                    && self.objects.last_end_offset()
                        - inner.super_block_header.journal_checkpoint.file_offset
                        > inner.reclaim_size / 2
                {
                    compact_fut = Some(self.compact().boxed());
                    inner.compaction_running = true;
                }
                // Register for wake-ups from writers (see write_and_apply_mutations
                // and pad_to_block).
                inner.flush_waker = Some(ctx.waker().clone());
            }
            let mut pending = true;
            if let Some(fut) = flush_fut.as_mut() {
                if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                    if let Err(e) = result {
                        // A flush failure terminates the journal.
                        self.inner.lock().terminate(Some(e.context("Flush error")));
                        self.reclaim_event.notify(usize::MAX);
                        flush_error = true;
                    }
                    flush_fut = None;
                    pending = false;
                }
            }
            if let Some(fut) = compact_fut.as_mut() {
                if let Poll::Ready(result) = fut.poll_unpin(ctx) {
                    let mut inner = self.inner.lock();
                    if let Err(e) = result {
                        inner.terminate(Some(e.context("Compaction error")));
                    }
                    compact_fut = None;
                    inner.compaction_running = false;
                    // Wake anything waiting for journal space to be reclaimed.
                    self.reclaim_event.notify(usize::MAX);
                    pending = false;
                }
            }
            // If either future completed, loop to see whether more work can start.
            if pending {
                return Poll::Pending;
            }
        })
        .await;
    }
1683
1684 async fn flush(&self, amount: usize) -> Result<(), Error> {
1685 let handle = self.handle.get().unwrap();
1686 let mut buf = handle.allocate_buffer(amount).await;
1687 let offset = self.inner.lock().writer.take_flushable(buf.as_mut());
1688 let len = buf.len() as u64;
1689 self.handle.get().unwrap().overwrite(offset, buf.as_mut(), false).await?;
1690 let mut inner = self.inner.lock();
1691 if let Some(waker) = inner.sync_waker.take() {
1692 waker.wake();
1693 }
1694 inner.flushed_offset = offset + len;
1695 inner.valid_to = inner.flushed_offset;
1696 Ok(())
1697 }
1698
    /// Compacts the journal: flushes all object stores and then writes a new
    /// super-block, after which the preceding journal data becomes reclaimable.
    #[trace]
    pub async fn compact(&self) -> Result<(), Error> {
        let trace = self.trace.load(Ordering::Relaxed);
        debug!("Compaction starting");
        if trace {
            info!("J: start compaction");
        }
        // Record the earliest version reported by the flush in the super-block
        // header before persisting it.
        let earliest_version = self.objects.flush().await.context("Failed to flush objects")?;
        self.inner.lock().super_block_header.earliest_version = earliest_version;
        self.write_super_block().await.context("Failed to write superblock")?;
        if trace {
            info!("J: end compaction");
        }
        debug!("Compaction finished");
        Ok(())
    }
1717
    /// Disables future compactions and waits for any in-flight compaction to finish.
    pub async fn stop_compactions(&self) {
        loop {
            debug_assert_not_too_long!({
                let mut inner = self.inner.lock();
                inner.disable_compactions = true;
                if !inner.compaction_running {
                    return;
                }
                // The flush task notifies this event when a compaction completes.
                self.reclaim_event.listen()
            });
        }
    }
1730
    /// Registers a lazy inspect node named `name` under `parent` which reports the
    /// journal's offsets and size statistics when read.  Holds only a weak reference
    /// to the journal, so it does not keep it alive.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    // Snapshot values under the lock; record them after releasing it.
                    let (journal_min, journal_max, journal_reclaim_size) = {
                        let inner = this.inner.lock();
                        (
                            round_down(
                                inner.super_block_header.journal_checkpoint.file_offset,
                                BLOCK_SIZE,
                            ),
                            inner.flushed_offset,
                            inner.reclaim_size,
                        )
                    };
                    let root = inspector.root();
                    root.record_uint("journal_min_offset", journal_min);
                    root.record_uint("journal_max_offset", journal_max);
                    root.record_uint("journal_size", journal_max - journal_min);
                    root.record_uint("journal_reclaim_size", journal_reclaim_size);

                    // Journal size as a percentage of disk size, when computable
                    // (round_div returns an Option).
                    if let Some(x) = round_div(
                        100 * (journal_max - journal_min),
                        this.objects.allocator().get_disk_bytes(),
                    ) {
                        root.record_uint("journal_size_to_disk_size_percent", x);
                    }
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
1771
    /// Terminates the journal (with no error recorded) and wakes anything waiting on
    /// the reclaim event so it can observe the termination.
    pub fn terminate(&self) {
        self.inner.lock().terminate(None);
        self.reclaim_event.notify(usize::MAX);
    }
1777}
1778
/// Wraps the journal writer for serializing mutation records on behalf of a single
/// object; the `u64` is the object id recorded in each mutation (see `write`).
pub struct Writer<'a>(u64, &'a mut JournalWriter);
1781
impl Writer<'_> {
    /// Serializes `mutation` as a `JournalRecord::Mutation` tagged with this
    /// writer's object id.
    pub fn write(&mut self, mutation: Mutation) {
        self.1.write_record(&JournalRecord::Mutation { object_id: self.0, mutation }).unwrap();
    }
}
1787
#[cfg(test)]
mod tests {
    use crate::filesystem::{FxFilesystem, SyncOptions};
    use crate::fsck::fsck;
    use crate::object_handle::{ObjectHandle, ReadObjectHandle, WriteObjectHandle};
    use crate::object_store::directory::Directory;
    use crate::object_store::transaction::Options;
    use crate::object_store::volume::root_volume;
    use crate::object_store::{lock_keys, HandleOptions, LockKey, ObjectStore, NO_OWNER};
    use fuchsia_async as fasync;
    use fuchsia_async::MonotonicDuration;
    use storage_device::fake_device::FakeDevice;
    use storage_device::DeviceHolder;

    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;

    // Checks that a file written and synced before the device is reopened survives
    // journal replay.
    #[fuchsia::test]
    async fn test_replay() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        let object_id = {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");

            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            // Sync so the write ends up in the (replayable) journal.
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            handle.object_id()
        };

        {
            fs.close().await.expect("Close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            // Reopening the filesystem replays the journal.
            let fs = FxFilesystem::open(device).await.expect("open failed");
            let handle = ObjectStore::open_object(
                &fs.root_store(),
                object_id,
                HandleOptions::default(),
                None,
            )
            .await
            .expect("open_object failed");
            let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
            assert_eq!(handle.read(0, buf.as_mut()).await.expect("read failed"), TEST_DATA.len());
            assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            fsck(fs.clone()).await.expect("fsck failed");
            fs.close().await.expect("Close failed");
        }
    }

    // Creates enough files to grow the journal substantially, then verifies the
    // filesystem remains consistent across two reopen cycles.
    #[fuchsia::test]
    async fn test_reset() {
        const TEST_DATA: &[u8] = b"hello";

        let device = DeviceHolder::new(FakeDevice::new(32768, TEST_DEVICE_BLOCK_SIZE));

        let mut object_ids = Vec::new();

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        {
            let root_store = fs.root_store();
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());

            // Create many more files (not individually synced) to grow the journal.
            for i in 0..1000 {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_store.store_object_id(),
                            root_store.root_directory_object_id(),
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                let handle = root_directory
                    .create_child_file(&mut transaction, &format!("{}", i))
                    .await
                    .expect("create_child_file failed");
                transaction.commit().await.expect("commit failed");
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                object_ids.push(handle.object_id());
            }
        }
        fs.close().await.expect("fs close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        {
            let root_store = fs.root_store();
            // Only check the first object — the one written before the explicit sync.
            for &object_id in &object_ids[0..1] {
                let handle = ObjectStore::open_object(
                    &root_store,
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .expect("open_object failed");
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }

            // Write another file after the reopen and sync it.
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .expect("open failed");
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_store.root_directory_object_id(),
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, "test2")
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
            buf.as_mut_slice().copy_from_slice(TEST_DATA);
            handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
            fs.sync(SyncOptions::default()).await.expect("sync failed");
            object_ids.push(handle.object_id());
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);
        let fs = FxFilesystem::open(device).await.expect("open failed");
        {
            fsck(fs.clone()).await.expect("fsck failed");

            // The first (pre-reset) object and the last (post-reset) object should
            // both be readable after the second replay.
            for &object_id in object_ids[0..1].iter().chain(object_ids.last().cloned().iter()) {
                let handle = ObjectStore::open_object(
                    &fs.root_store(),
                    object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .unwrap_or_else(|e| {
                    panic!("open_object failed (object_id: {}): {:?}", object_id, e)
                });
                let mut buf = handle.allocate_buffer(TEST_DEVICE_BLOCK_SIZE as usize).await;
                assert_eq!(
                    handle.read(0, buf.as_mut()).await.expect("read failed"),
                    TEST_DATA.len()
                );
                assert_eq!(&buf.as_slice()[..TEST_DATA.len()], TEST_DATA);
            }
        }
        fs.close().await.expect("close failed");
    }

    // Exercises journal discard: writes past a chunk, compacts, stops compactions,
    // writes past another chunk, then snapshots the device without a clean close
    // and checks the filesystem recovers.
    #[fuchsia::test]
    async fn test_discard() {
        let device = {
            let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let store =
                root_volume.new_volume("test", NO_OWNER, None).await.expect("new_volume failed");
            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            let mut i = 0;
            // Create files until the journal passes one chunk.
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("a {i}"))
                    .await
                    .expect("create_child_file failed");
                if transaction.commit().await.expect("commit failed") > super::CHUNK_SIZE {
                    break;
                }
                i += 1;
            }

            // Compact once, then prevent any further compactions.
            fs.journal().compact().await.expect("compact failed");
            fs.journal().stop_compactions().await;

            let mut i = 0;
            // With compactions stopped, write past a second chunk boundary.
            loop {
                let mut transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            store.store_object_id(),
                            store.root_directory_object_id()
                        )],
                        Options::default(),
                    )
                    .await
                    .expect("new_transaction failed");
                root_directory
                    .create_child_file(&mut transaction, &format!("b {i}"))
                    .await
                    .expect("create_child_file failed");
                if transaction.commit().await.expect("commit failed") > 2 * super::CHUNK_SIZE {
                    break;
                }
                i += 1;
            }

            fasync::Timer::new(MonotonicDuration::from_millis(10)).await;
            // Snapshot without a clean close, leaving the journal tail as-is.
            fs.device().snapshot().expect("snapshot failed")
        };

        let fs = FxFilesystem::open(device).await.expect("open failed");

        {
            let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");

            let store = root_volume.volume("test", NO_OWNER, None).await.expect("volume failed");

            let root_directory = Directory::open(&store, store.root_directory_object_id())
                .await
                .expect("open failed");

            // A new write after replay should succeed.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        store.store_object_id(),
                        store.root_directory_object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            root_directory
                .create_child_file(&mut transaction, &format!("d"))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystem::open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }
}
2121
#[cfg(fuzz)]
mod fuzz {
    use fuzz::fuzz;

    // Feeds arbitrary raw bytes into the journal writer and then reopens the
    // filesystem; opening may legitimately fail, but it must not crash.
    #[fuzz]
    fn fuzz_journal_bytes(input: Vec<u8>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use std::io::Write;
        use storage_device::fake_device::FakeDevice;
        use storage_device::DeviceHolder;

        fasync::SendExecutor::new(4).run(async move {
            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            fs.journal().inner.lock().writer.write_all(&input).expect("write failed");
            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            if let Ok(fs) = FxFilesystem::open(device).await {
                let _ = fs.close().await;
            }
        });
    }

    // Same as fuzz_journal_bytes, but writes structured journal records instead of
    // raw bytes (write errors are deliberately ignored).
    #[fuzz]
    fn fuzz_journal(input: Vec<super::JournalRecord>) {
        use crate::filesystem::FxFilesystem;
        use fuchsia_async as fasync;
        use storage_device::fake_device::FakeDevice;
        use storage_device::DeviceHolder;

        fasync::SendExecutor::new(4).run(async move {
            let device = DeviceHolder::new(FakeDevice::new(32768, 512));
            let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
            {
                let mut inner = fs.journal().inner.lock();
                for record in &input {
                    let _ = inner.writer.write_record(record);
                }
            }
            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);
            if let Ok(fs) = FxFilesystem::open(device).await {
                let _ = fs.close().await;
            }
        });
    }
}
2175}