1use crate::errors::FxfsError;
6use crate::fsck::{fsck_volume_with_options, fsck_with_options, FsckOptions};
7use crate::log::*;
8use crate::object_store::allocator::{Allocator, Hold, Reservation};
9use crate::object_store::directory::Directory;
10use crate::object_store::graveyard::Graveyard;
11use crate::object_store::journal::super_block::SuperBlockHeader;
12use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
13use crate::object_store::object_manager::ObjectManager;
14use crate::object_store::transaction::{
15 self, lock_keys, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation,
16 ReadGuard, Transaction, TRANSACTION_METADATA_MAX_AMOUNT,
17};
18use crate::object_store::volume::{root_volume, VOLUMES_DIRECTORY};
19use crate::object_store::{ObjectStore, NO_OWNER};
20use crate::range::RangeExt;
21use crate::serialized_types::{Version, LATEST_VERSION};
22use crate::{debug_assert_not_too_long, metrics};
23use anyhow::{bail, ensure, Context, Error};
24use async_trait::async_trait;
25use event_listener::Event;
26use fuchsia_async as fasync;
27use fuchsia_inspect::{NumericProperty as _, UintProperty};
28use fuchsia_sync::Mutex;
29use futures::FutureExt;
30use fxfs_crypto::Crypt;
31use once_cell::sync::OnceCell;
32use static_assertions::const_assert;
33use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
34use std::sync::{Arc, Weak};
35use storage_device::{Device, DeviceHolder};
36
/// Smallest filesystem block size, in bytes.  `FxFilesystemBuilder::open` uses the larger of the
/// device's block size and this value.
pub const MIN_BLOCK_SIZE: u64 = 4096;
/// Largest filesystem block size, in bytes (64 KiB); `FxFilesystemBuilder::open` asserts that the
/// chosen block size does not exceed it.
pub const MAX_BLOCK_SIZE: u64 = u16::MAX as u64 + 1;

/// Largest supported file size, in bytes: `i64::MAX` less 4095, which is `i64::MAX` rounded down
/// to a multiple of 4096.
pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
// Compile-time check that pins the exact numeric value of `MAX_FILE_SIZE`.
const_assert!(9223372036854771712 == MAX_FILE_SIZE);

/// Number of in-flight transactions above which `FxFilesystem::add_transaction` makes callers
/// wait (unless they skip journal checks).
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

/// Default delay before the first trim (one hour); first element of the default
/// `Options::trim_config`.
const TRIM_AFTER_BOOT_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60);

/// Default interval between subsequent trims (24 hours); second element of the default
/// `Options::trim_config`.
const TRIM_INTERVAL_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24);
57
/// Space-usage summary returned by `FxFilesystem::get_info`.
///
/// Plain data, so it derives the usual value-type traits to let callers log, copy and compare it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct Info {
    /// Size reported by the underlying device.
    pub total_bytes: u64,
    /// Bytes the allocator reports as in use.
    pub used_bytes: u64,
}
63
/// Optional hook that `FxFilesystem::commit_transaction` calls after a transaction has been
/// committed; the future it returns is awaited before `commit_transaction` returns.
pub type PostCommitHook =
    Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;

/// Optional hook that `FxFilesystem::commit_transaction` calls with the transaction before
/// committing it; an error returned by the hook is propagated and the commit does not proceed.
pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
68
/// Filesystem-wide options, normally populated through `FxFilesystemBuilder`.
pub struct Options {
    /// When true the filesystem is opened read-only: `FxFilesystemBuilder::open` refuses to
    /// format, and skips the initial device flush, graveyard reaping and the trim task.
    pub read_only: bool,

    /// Byte count associated with rolling the metadata key (default 128 MiB).  Not consumed in
    /// this part of the file — presumably read by the journal/object stores; confirm.
    pub roll_metadata_key_byte_count: u64,

    /// Hook run at the start of `commit_transaction`; see `PreCommitHook`.
    pub pre_commit_hook: PreCommitHook,

    /// Hook run at the end of `commit_transaction`; see `PostCommitHook`.
    pub post_commit_hook: PostCommitHook,

    /// Requests that the initial reap be skipped.  Not consulted by `FxFilesystemBuilder::open`
    /// here — presumably used when volumes are opened; confirm.
    pub skip_initial_reap: bool,

    /// `(delay before first trim, interval between trims)`.  `None` means no trim task is
    /// started by `FxFilesystemBuilder::open`.
    pub trim_config: Option<(std::time::Duration, std::time::Duration)>,

    /// When true, `open` puts the journal into image-builder mode and does not start background
    /// tasks; `OpenFxFilesystem::finalize` is expected to be called before the handle is dropped.
    pub image_builder_mode: bool,
}
99
100impl Default for Options {
101 fn default() -> Self {
102 Options {
103 roll_metadata_key_byte_count: 128 * 1024 * 1024,
104 read_only: false,
105 pre_commit_hook: None,
106 post_commit_hook: None,
107 skip_initial_reap: false,
108 trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
109 image_builder_mode: false,
110 }
111 }
112}
113
/// Context passed to `JournalingObject::apply_mutation` alongside the mutation itself.
pub struct ApplyContext<'a, 'b> {
    /// Whether the mutation is being applied during replay or as part of a live transaction.
    pub mode: ApplyMode<'a, 'b>,

    /// Journal checkpoint associated with the mutation (presumably that of the transaction that
    /// contains it — confirm against the journal).
    pub checkpoint: JournalCheckpoint,
}
122
/// Describes why a mutation is being applied.
pub enum ApplyMode<'a, 'b> {
    /// No live transaction is available (presumably the journal is being replayed — confirm).
    Replay,
    /// The mutation belongs to the referenced live transaction.
    Live(&'a Transaction<'b>),
}
129
130impl ApplyMode<'_, '_> {
131 pub fn is_replay(&self) -> bool {
132 matches!(self, ApplyMode::Replay)
133 }
134
135 pub fn is_live(&self) -> bool {
136 matches!(self, ApplyMode::Live(_))
137 }
138}
139
/// Implemented by objects whose state changes are expressed as `Mutation`s written to the
/// journal.
#[async_trait]
pub trait JournalingObject: Send + Sync {
    /// Applies `mutation` to the object.  `context` says whether this is replay or a live
    /// transaction and carries the journal checkpoint; `assoc_obj` is the object associated with
    /// the mutation, if any.
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error>;

    /// Called with a mutation that belongs to `transaction` but will not be applied (presumably
    /// because the transaction was dropped without being committed — confirm).
    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);

    /// Flushes the object and returns a `Version` (presumably the earliest version still in use
    /// by the object after the flush — confirm).
    async fn flush(&self) -> Result<Version, Error>;

    /// Writes `mutation` to the journal via `writer`.  The default implementation writes a clone
    /// of the mutation unchanged; implementers may override it.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        writer.write(mutation.clone());
    }
}
168
/// Options for `FxFilesystem::sync` (forwarded to the journal's `sync`).
#[derive(Default)]
pub struct SyncOptions<'a> {
    /// Whether the device should also be flushed.  `FxFilesystem::close` sets this to true.
    pub flush_device: bool,

    /// Optional one-shot predicate handed to the journal (presumably the sync only proceeds if
    /// it returns true — confirm against `Journal::sync`).
    pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
}
183
/// Handle to an opened filesystem.  Dereferences to `Arc<FxFilesystem>` and logs an error on drop
/// if a writable filesystem was not closed first (or was left in image-builder mode).
pub struct OpenFxFilesystem(Arc<FxFilesystem>);
185
186impl OpenFxFilesystem {
187 pub async fn take_device(self) -> DeviceHolder {
190 let fut = self.device.take_when_dropped();
191 std::mem::drop(self);
192 debug_assert_not_too_long!(fut)
193 }
194
195 pub async fn finalize(self) -> Result<(DeviceHolder, u64), Error> {
199 ensure!(
200 self.journal().image_builder_mode(),
201 "finalize() only valid in image_builder_mode."
202 );
203 self.journal().allocate_journal().await?;
204 self.journal().set_image_builder_mode(false);
205 self.journal().compact().await?;
206 let actual_size = self.allocator().maximum_offset();
207 self.close().await?;
208 Ok((self.take_device().await, actual_size))
209 }
210}
211
212impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
213 fn from(fs: Arc<FxFilesystem>) -> Self {
214 Self(fs)
215 }
216}
217
218impl Drop for OpenFxFilesystem {
219 fn drop(&mut self) {
220 if self.options.image_builder_mode && self.journal().image_builder_mode() {
221 error!("OpenFxFilesystem in image_builder_mode dropped without calling finalize().");
222 }
223 if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
224 error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
225 }
226 }
227}
228
229impl std::ops::Deref for OpenFxFilesystem {
230 type Target = Arc<FxFilesystem>;
231
232 fn deref(&self) -> &Self::Target {
233 &self.0
234 }
235}
236
/// Builder that collects settings and then opens (or formats) a filesystem via `open`.
pub struct FxFilesystemBuilder {
    // Format the device instead of replaying an existing journal.
    format: bool,
    // Forwarded to the journal and root store via `set_trace`, and stored on the filesystem.
    trace: bool,
    // Becomes `FxFilesystem::options`.
    options: Options,
    // Passed to `Journal::new`.
    journal_options: JournalOptions,
    // Passed to `Journal::replay` (only used when not formatting).
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    // Passed to `ObjectManager::new`.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
    // Install a post-commit hook that runs fsck after each transaction.
    fsck_after_every_transaction: bool,
}
246
247impl FxFilesystemBuilder {
248 pub fn new() -> Self {
249 Self {
250 format: false,
251 trace: false,
252 options: Options::default(),
253 journal_options: JournalOptions::default(),
254 on_new_allocator: None,
255 on_new_store: None,
256 fsck_after_every_transaction: false,
257 }
258 }
259
260 pub fn format(mut self, format: bool) -> Self {
262 self.format = format;
263 self
264 }
265
266 pub fn trace(mut self, trace: bool) -> Self {
268 self.trace = trace;
269 self
270 }
271
272 pub fn read_only(mut self, read_only: bool) -> Self {
275 self.options.read_only = read_only;
276 self
277 }
278
279 pub fn image_builder_mode(mut self, enabled: bool) -> Self {
285 self.options.image_builder_mode = enabled;
286 self
287 }
288
289 pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
291 self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
292 self
293 }
294
295 pub fn pre_commit_hook(
297 mut self,
298 hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
299 ) -> Self {
300 self.options.pre_commit_hook = Some(Box::new(hook));
301 self
302 }
303
304 pub fn post_commit_hook(
307 mut self,
308 hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
309 ) -> Self {
310 self.options.post_commit_hook = Some(Box::new(hook));
311 self
312 }
313
314 pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
317 self.options.skip_initial_reap = skip_initial_reap;
318 self
319 }
320
321 pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
323 self.journal_options = journal_options;
324 self
325 }
326
327 pub fn on_new_allocator(
329 mut self,
330 on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
331 ) -> Self {
332 self.on_new_allocator = Some(Box::new(on_new_allocator));
333 self
334 }
335
336 pub fn on_new_store(
338 mut self,
339 on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
340 ) -> Self {
341 self.on_new_store = Some(Box::new(on_new_store));
342 self
343 }
344
345 pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
347 self.fsck_after_every_transaction = fsck_after_every_transaction;
348 self
349 }
350
351 pub fn trim_config(
352 mut self,
353 delay_and_interval: Option<(std::time::Duration, std::time::Duration)>,
354 ) -> Self {
355 self.options.trim_config = delay_and_interval;
356 self
357 }
358
    /// Opens the filesystem on `device`, formatting it first if `format(true)` was requested.
    ///
    /// Returns an error if formatting is combined with read-only, or if flushing the device,
    /// initialising the journal, creating the volumes directory, replaying the journal or the
    /// initial graveyard reap fails.
    ///
    /// # Panics
    ///
    /// Panics if the effective block size is not a multiple of `MIN_BLOCK_SIZE` or exceeds
    /// `MAX_BLOCK_SIZE`.
    pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        let read_only = self.options.read_only;
        if self.format && read_only {
            bail!("Cannot initialize a filesystem as read-only");
        }

        let objects = Arc::new(ObjectManager::new(self.on_new_store));
        let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));

        let image_builder_mode = self.options.image_builder_mode;

        // Use the device's block size, but never less than MIN_BLOCK_SIZE.
        let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
        assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
        assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");

        let mut fsck_after_every_transaction = None;
        let mut filesystem_options = self.options;
        if self.fsck_after_every_transaction {
            // The fsck instance takes over any existing post-commit hook (it runs it after the
            // checks) and is itself installed as the post-commit hook.
            let instance =
                FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
            fsck_after_every_transaction = Some(instance.clone());
            filesystem_options.post_commit_hook =
                Some(Box::new(move || instance.clone().run().boxed()));
        }

        // Only for writable opens of an existing filesystem (presumably to make earlier writes
        // durable before replay — confirm).
        if !read_only && !self.format {
            device.flush().await.context("Device flush failed")?;
        }

        // `closed` starts out true and is only cleared once initialisation has succeeded.
        let filesystem = Arc::new(FxFilesystem {
            device,
            block_size,
            objects: objects.clone(),
            journal,
            commit_mutex: futures::lock::Mutex::new(()),
            lock_manager: LockManager::new(),
            flush_task: Mutex::new(None),
            trim_task: Mutex::new(None),
            closed: AtomicBool::new(true),
            shutdown_event: Event::new(),
            trace: self.trace,
            graveyard: Graveyard::new(objects.clone()),
            completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
            options: filesystem_options,
            in_flight_transactions: AtomicU64::new(0),
            transaction_limit_event: Event::new(),
        });

        if image_builder_mode {
            filesystem.journal().set_image_builder_mode(true);
        }

        filesystem.journal.set_trace(self.trace);
        if self.format {
            filesystem.journal.init_empty(filesystem.clone()).await?;
            // In image-builder mode the superblocks are not written here.
            if !image_builder_mode {
                filesystem.journal.init_superblocks().await?;

                // NOTE(review): reap_async is called again further down for writable,
                // non-image-builder filesystems; presumably a second call is harmless — confirm.
                filesystem.graveyard.clone().reap_async();
            }

            // Create the directory that holds volumes inside the root store's root directory.
            let root_store = filesystem.root_store();
            root_store.set_trace(self.trace);
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .context("Unable to open root volume directory")?;
            let mut transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    transaction::Options::default(),
                )
                .await?;
            let volume_directory =
                root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
            transaction.commit().await?;
            objects.set_volume_directory(volume_directory);
        } else {
            filesystem
                .journal
                .replay(filesystem.clone(), self.on_new_allocator)
                .await
                .context("Journal replay failed")?;
            filesystem.root_store().set_trace(self.trace);

            if !read_only {
                // Run the graveyard's initial reap for every unlocked store before the
                // background reaper is started below.
                for store in objects.unlocked_stores() {
                    filesystem.graveyard.initial_reap(&store).await?;
                }
            }
        }

        if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
            // The hook only gets a weak reference to the filesystem; `fs` has not been set
            // before, so `set` cannot fail here.
            fsck_after_every_transaction
                .fs
                .set(Arc::downgrade(&filesystem))
                .unwrap_or_else(|_| unreachable!());
        }

        filesystem.closed.store(false, Ordering::SeqCst);

        if !read_only && !image_builder_mode {
            // Start the background tasks: graveyard reaping and, if configured, trimming.
            filesystem.graveyard.clone().reap_async();

            if let Some((delay, interval)) = filesystem.options.trim_config.clone() {
                filesystem.start_trim_task(delay, interval);
            }
        }

        Ok(filesystem.into())
    }
486}
487
/// An Fxfs filesystem instance; created through `FxFilesystemBuilder::open` and normally used
/// via `OpenFxFilesystem`.
pub struct FxFilesystem {
    // max(device block size, MIN_BLOCK_SIZE).
    block_size: u64,
    objects: Arc<ObjectManager>,
    journal: Arc<Journal>,
    // Held for the duration of `commit_transaction`'s commit, callback and post-commit hook.
    commit_mutex: futures::lock::Mutex<()>,
    lock_manager: LockManager,
    // Journal flush task, started lazily by `maybe_start_flush_task` and joined in `close`.
    flush_task: Mutex<Option<fasync::Task<()>>>,
    // Periodic trim task, started by `start_trim_task` and joined in `close`.
    trim_task: Mutex<Option<fasync::Task<()>>>,
    // True until `open` finishes initialising, and again once `close` has been called.
    closed: AtomicBool,
    // Notified by `close` so that background tasks (e.g. the trim task) can stop waiting.
    shutdown_event: Event,
    trace: bool,
    graveyard: Arc<Graveyard>,
    // Inspect counter incremented once per committed transaction.
    completed_transactions: UintProperty,
    options: Options,

    // Number of transactions currently counted by `add_transaction`/`sub_transaction`.
    in_flight_transactions: AtomicU64,

    // Notified by `sub_transaction` to wake callers waiting in `add_transaction`.
    transaction_limit_event: Event,

    device: DeviceHolder,
}
515
516#[fxfs_trace::trace]
517impl FxFilesystem {
518 pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
519 FxFilesystemBuilder::new().format(true).open(device).await
520 }
521
522 pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
523 FxFilesystemBuilder::new().open(device).await
524 }
525
    /// Returns the root parent store held by the object manager.
    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
        self.objects.root_parent_store()
    }
529
    /// Shuts the filesystem down: stops background work, syncs the journal (flushing the
    /// device), terminates the journal and closes the device.
    ///
    /// The device is closed even if the sync fails.  Returns the device-close error if closing
    /// fails, otherwise the sync error if syncing failed.
    ///
    /// # Panics
    ///
    /// Panics if the filesystem is already marked closed.
    pub async fn close(&self) -> Result<(), Error> {
        assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
        // Wake everything listening for shutdown (e.g. the trim task).
        self.shutdown_event.notify(usize::MAX);
        debug_assert_not_too_long!(self.graveyard.wait_for_reap());
        // Take the task out first so the mutex guard is not held while waiting for it.
        let trim_task = self.trim_task.lock().take();
        if let Some(task) = trim_task {
            debug_assert_not_too_long!(task);
        }
        self.journal.stop_compactions().await;
        let sync_status =
            self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await;
        match &sync_status {
            Ok(checkpoint) => info!(
                "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
                 reservation_required={}, borrowed={})",
                checkpoint.as_ref().unwrap().0.file_offset,
                self.object_manager().metadata_reservation(),
                self.object_manager().required_reservation(),
                self.object_manager().borrowed_metadata_space(),
            ),
            Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
        }
        self.journal.terminate();
        let flush_task = self.flush_task.lock().take();
        if let Some(task) = flush_task {
            debug_assert_not_too_long!(task);
        }
        // Close the device regardless of whether the sync succeeded; the sync result is
        // reported afterwards.
        self.device().close().await.context("Failed to close device")?;
        sync_status.map(|_| ())
    }
562
    /// Returns a new shared reference to the underlying device.
    pub fn device(&self) -> Arc<dyn Device> {
        Arc::clone(&self.device)
    }
566
    /// Returns the root store held by the object manager.
    pub fn root_store(&self) -> Arc<ObjectStore> {
        self.objects.root_store()
    }
570
    /// Returns the allocator held by the object manager.
    pub fn allocator(&self) -> Arc<Allocator> {
        self.objects.allocator()
    }
574
    /// Returns the filesystem's object manager.
    pub fn object_manager(&self) -> &Arc<ObjectManager> {
        &self.objects
    }
578
    /// Returns the filesystem's journal.
    pub fn journal(&self) -> &Arc<Journal> {
        &self.journal
    }
582
583 pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
584 self.journal.sync(options).await.map(|_| ())
585 }
586
    /// Returns the filesystem block size chosen when the filesystem was opened.
    pub fn block_size(&self) -> u64 {
        self.block_size
    }
590
591 pub fn get_info(&self) -> Info {
592 Info {
593 total_bytes: self.device.size(),
594 used_bytes: self.object_manager().allocator().get_used_bytes().0,
595 }
596 }
597
    /// Returns the super-block header reported by the journal.
    pub fn super_block_header(&self) -> SuperBlockHeader {
        self.journal.super_block_header()
    }
601
    /// Returns the filesystem's graveyard.
    pub fn graveyard(&self) -> &Arc<Graveyard> {
        &self.graveyard
    }
605
    /// Returns the trace flag the filesystem was opened with.
    pub fn trace(&self) -> bool {
        self.trace
    }
609
    /// Returns the options the filesystem was opened with.
    pub fn options(&self) -> &Options {
        &self.options
    }
613
614 pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
621 TxnGuard::Owned(
622 self.lock_manager
623 .read_lock(lock_keys!(LockKey::Filesystem))
624 .await
625 .into_owned(self.clone()),
626 )
627 }
628
629 pub async fn new_transaction<'a>(
630 self: Arc<Self>,
631 locks: LockKeys,
632 options: transaction::Options<'a>,
633 ) -> Result<Transaction<'a>, Error> {
634 let guard = if let Some(guard) = options.txn_guard.as_ref() {
635 TxnGuard::Borrowed(guard)
636 } else {
637 self.txn_guard().await
638 };
639 Transaction::new(guard, options, locks).await
640 }
641
    /// Commits `transaction` and returns the journal offset of the commit (always 0 in
    /// image-builder mode).
    ///
    /// `callback` is invoked with the journal offset once the commit has succeeded.  Both it and
    /// the post-commit hook run while the commit mutex is still held, because `_guard` lives
    /// until the end of the function.  An error from the pre-commit hook or from the journal
    /// commit is returned without invoking `callback`.
    #[trace]
    pub async fn commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        callback: &mut (dyn FnMut(u64) + Send),
    ) -> Result<u64, Error> {
        if let Some(hook) = self.options.pre_commit_hook.as_ref() {
            hook(transaction)?;
        }
        debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
        self.maybe_start_flush_task();
        // Serialises commits.
        let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
        let journal_offset = if self.journal().image_builder_mode() {
            // In image-builder mode the journal is bypassed: the transaction is applied
            // directly to the object manager with a zeroed checkpoint.
            let journal_checkpoint =
                JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
            let maybe_mutation = self
                .object_manager()
                .apply_transaction(transaction, &journal_checkpoint)
                .expect("Transactions must not fail in image_builder_mode");
            // The only mutation `apply_transaction` may hand back here is `UpdateBorrowed`.
            if let Some(mutation) = maybe_mutation {
                assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
            }
            self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
            0
        } else {
            self.journal.commit(transaction).await?
        };
        self.completed_transactions.add(1);

        callback(journal_offset);

        if let Some(hook) = self.options.post_commit_hook.as_ref() {
            hook().await;
        }

        Ok(journal_offset)
    }
686
    /// Returns the filesystem's lock manager.
    pub fn lock_manager(&self) -> &LockManager {
        &self.lock_manager
    }
690
    /// Releases everything a transaction holds when it is dropped: its slot in the in-flight
    /// count, any metadata hold, its state in the object manager, and its locks.
    pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        // Only transactions with a metadata reservation other than `None` are un-counted here.
        if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
            self.sub_transaction();
        }
        // Reset the reservation to `None`; if it was a hold, add the held amount to a
        // zero-sized hold on the transaction's allocator reservation (presumably this hands the
        // space back to that reservation — confirm against `Hold::add`).
        if let MetadataReservation::Hold(hold_amount) =
            std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
        {
            let hold = transaction
                .allocator_reservation
                .unwrap()
                .reserve(0)
                .expect("Zero should always succeed.");
            hold.add(hold_amount);
        }
        self.objects.drop_transaction(transaction);
        self.lock_manager.drop_transaction(transaction);
    }
709
710 fn maybe_start_flush_task(&self) {
711 let mut flush_task = self.flush_task.lock();
712 if flush_task.is_none() {
713 let journal = self.journal.clone();
714 *flush_task = Some(fasync::Task::spawn(journal.flush_task()));
715 }
716 }
717
718 async fn do_trim(&self) -> Result<usize, Error> {
720 const MAX_EXTENTS_PER_BATCH: usize = 8;
721 const MAX_EXTENT_SIZE: usize = 256 * 1024;
722 let mut offset = 0;
723 let mut bytes_trimmed = 0;
724 loop {
725 if self.closed.load(Ordering::Relaxed) {
726 info!("Filesystem is closed, nothing to trim");
727 return Ok(bytes_trimmed);
728 }
729 let allocator = self.allocator();
730 let trimmable_extents =
731 allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
732 for device_range in trimmable_extents.extents() {
733 self.device.trim(device_range.clone()).await?;
734 bytes_trimmed += device_range.length()? as usize;
735 }
736 if let Some(device_range) = trimmable_extents.extents().last() {
737 offset = device_range.end;
738 } else {
739 break;
740 }
741 }
742 Ok(bytes_trimmed)
743 }
744
745 fn start_trim_task(
746 self: &Arc<Self>,
747 delay: std::time::Duration,
748 interval: std::time::Duration,
749 ) {
750 if !self.device.supports_trim() {
751 info!("Device does not support trim; not scheduling trimming");
752 return;
753 }
754 let this = self.clone();
755 let mut next_timer = delay;
756 *self.trim_task.lock() = Some(fasync::Task::spawn(async move {
757 loop {
758 let shutdown_listener = this.shutdown_event.listen();
759 if this.closed.load(Ordering::SeqCst) {
765 return;
766 }
767 futures::select!(
768 () = fasync::Timer::new(next_timer.clone()).fuse() => {},
769 () = shutdown_listener.fuse() => return,
770 );
771 let start_time = std::time::Instant::now();
772 let res = this.do_trim().await;
773 let duration = std::time::Instant::now() - start_time;
774 next_timer = interval.clone();
775 match res {
776 Ok(bytes_trimmed) => info!(
777 "Trimmed {bytes_trimmed} bytes in {duration:?}. Next trim in \
778 {next_timer:?}",
779 ),
780 Err(e) => error!(e:?; "Failed to trim"),
781 }
782 }
783 }));
784 }
785
786 pub(crate) async fn reservation_for_transaction<'a>(
787 self: &Arc<Self>,
788 options: transaction::Options<'a>,
789 ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
790 if self.options.image_builder_mode {
791 return Ok((MetadataReservation::Borrowed, None, None));
794 }
795 if !options.skip_journal_checks {
796 self.maybe_start_flush_task();
797 self.journal.check_journal_space().await?;
798 }
799
800 let mut hold = None;
814 let metadata_reservation = if options.borrow_metadata_space {
815 MetadataReservation::Borrowed
816 } else {
817 match options.allocator_reservation {
818 Some(reservation) => {
819 hold = Some(
820 reservation
821 .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
822 .ok_or(FxfsError::NoSpace)?,
823 );
824 MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
825 }
826 None => {
827 let reservation = self
828 .allocator()
829 .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
830 .ok_or(FxfsError::NoSpace)?;
831 MetadataReservation::Reservation(reservation)
832 }
833 }
834 };
835 Ok((metadata_reservation, options.allocator_reservation, hold))
836 }
837
838 pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
839 if skip_journal_checks {
840 self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
841 } else {
842 let inc = || {
843 let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
844 while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
845 match self.in_flight_transactions.compare_exchange_weak(
846 in_flights,
847 in_flights + 1,
848 Ordering::Relaxed,
849 Ordering::Relaxed,
850 ) {
851 Ok(_) => return true,
852 Err(x) => in_flights = x,
853 }
854 }
855 return false;
856 };
857 while !inc() {
858 let listener = self.transaction_limit_event.listen();
859 if inc() {
860 break;
861 }
862 listener.await;
863 }
864 }
865 }
866
    /// Un-counts an in-flight transaction and wakes callers waiting in `add_transaction`.
    ///
    /// # Panics
    ///
    /// Panics if the counter was already zero.
    pub(crate) fn sub_transaction(&self) {
        let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
        assert!(old != 0);
        // The counter can exceed the limit (transactions that skip journal checks are not
        // limited); waiters can only make progress once the previous value was at or below it.
        if old <= MAX_IN_FLIGHT_TRANSACTIONS {
            self.transaction_limit_event.notify(usize::MAX);
        }
    }
874}
875
/// Guard held by a transaction: either an owned read lock (as produced by
/// `FxFilesystem::txn_guard`) or a borrow of another `TxnGuard`.
pub enum TxnGuard<'a> {
    /// Borrows an existing guard, e.g. one supplied via `transaction::Options::txn_guard`.
    Borrowed(&'a TxnGuard<'a>),
    /// Owns the read guard.
    Owned(ReadGuard<'static>),
}
880
881impl TxnGuard<'_> {
882 pub fn fs(&self) -> &Arc<FxFilesystem> {
883 match self {
884 TxnGuard::Borrowed(b) => b.fs(),
885 TxnGuard::Owned(o) => o.fs().unwrap(),
886 }
887 }
888}
889
890pub async fn mkfs(device: DeviceHolder) -> Result<(), Error> {
892 let fs = FxFilesystem::new_empty(device).await?;
893 fs.close().await
894}
895
896pub async fn mkfs_with_volume(
900 device: DeviceHolder,
901 volume_name: &str,
902 crypt: Option<Arc<dyn Crypt>>,
903) -> Result<(), Error> {
904 let fs = FxFilesystem::new_empty(device).await?;
905 {
906 let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
909 root_volume.new_volume(volume_name, NO_OWNER, crypt).await.expect("Create volume failed");
910 }
911 fs.close().await?;
912 Ok(())
913}
914
/// State for the post-commit hook that `FxFilesystemBuilder::fsck_after_every_transaction`
/// installs.
struct FsckAfterEveryTransaction {
    // Weak reference to the filesystem; set once by `FxFilesystemBuilder::open` after the
    // filesystem has been constructed.
    fs: OnceCell<Weak<FxFilesystem>>,
    // The post-commit hook that was configured before this one replaced it, if any.
    old_hook: PostCommitHook,
}
919
920impl FsckAfterEveryTransaction {
921 fn new(old_hook: PostCommitHook) -> Arc<Self> {
922 Arc::new(Self { fs: OnceCell::new(), old_hook })
923 }
924
925 async fn run(self: Arc<Self>) {
926 if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
927 let options = FsckOptions {
928 fail_on_warning: true,
929 no_lock: true,
930 quiet: true,
931 ..Default::default()
932 };
933 fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
934 let object_manager = fs.object_manager();
935 for store in object_manager.unlocked_stores() {
936 let store_id = store.store_object_id();
937 if !object_manager.is_system_store(store_id) {
938 fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
939 .await
940 .expect("fsck_volume_with_options failed");
941 }
942 }
943 }
944 if let Some(old_hook) = self.old_hook.as_ref() {
945 old_hook().await;
946 }
947 }
948}
949
950#[cfg(test)]
951mod tests {
952 use super::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
953 use crate::fsck::{fsck, fsck_volume};
954 use crate::log::*;
955 use crate::lsm_tree::types::Item;
956 use crate::lsm_tree::Operation;
957 use crate::object_handle::{
958 ObjectHandle, ReadObjectHandle, WriteObjectHandle, INVALID_OBJECT_ID,
959 };
960 use crate::object_store::directory::{replace_child, Directory};
961 use crate::object_store::journal::JournalOptions;
962 use crate::object_store::transaction::{lock_keys, LockKey, Options};
963 use crate::object_store::volume::root_volume;
964 use crate::object_store::{HandleOptions, ObjectDescriptor, ObjectStore, NO_OWNER};
965 use crate::range::RangeExt;
966 use fuchsia_async as fasync;
967 use fuchsia_sync::Mutex;
968 use futures::future::join_all;
969 use futures::stream::{FuturesUnordered, TryStreamExt};
970 use fxfs_insecure_crypto::InsecureCrypt;
971 use rustc_hash::FxHashMap as HashMap;
972 use std::ops::Range;
973 use std::sync::Arc;
974 use std::time::Duration;
975 use storage_device::fake_device::FakeDevice;
976 use storage_device::DeviceHolder;
977
    // Block size passed to `FakeDevice::new` by the tests below.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
979
    // Creates two files and overwrites each 1500 times from concurrent tasks, then syncs and
    // checks that fsck passes.  (Per the test name, the volume of writes is presumably enough
    // to trigger journal compaction — confirm.)
    #[fuchsia::test(threads = 10)]
    async fn test_compaction() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut tasks = Vec::new();
        for i in 0..2 {
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, &format!("{}", i))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            // Each task repeatedly rewrites the same five bytes at offset 0 of its own file.
            tasks.push(fasync::Task::spawn(async move {
                const TEST_DATA: &[u8] = b"hello";
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                for _ in 0..1500 {
                    handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                }
            }));
        }
        join_all(tasks).await;
        fs.sync(SyncOptions::default()).await.expect("sync failed");

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }
1024
    // Records every mutation applied to the store trees and the allocator tree during one
    // session, then reopens the filesystem and checks that the second session records exactly
    // the same sequence of mutations, and ends up with the same metadata reservation amount.
    #[fuchsia::test(threads = 10)]
    async fn test_replay_is_identical() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        // Close and reopen the freshly formatted filesystem so that the recording callbacks
        // below are installed from the start of the session.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // Thread-safe log of (operation, item) pairs.
        struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);

        impl<K: Clone, V: Clone> Mutations<K, V> {
            fn new() -> Self {
                Mutations(Mutex::new(Vec::new()))
            }

            fn push(&self, operation: Operation, item: &Item<K, V>) {
                self.0.lock().push((operation, item.clone()));
            }
        }

        // Opens the filesystem with callbacks that log allocator-tree mutations into
        // `allocator_mutations` and each store's tree mutations into `object_mutations`, keyed
        // by store object id.  `reclaim_size: u64::MAX` presumably keeps the journal from being
        // reclaimed during the test — confirm.
        let open_fs = |device,
                       object_mutations: Arc<Mutex<HashMap<_, _>>>,
                       allocator_mutations: Arc<Mutations<_, _>>| async {
            FxFilesystemBuilder::new()
                .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
                .on_new_allocator(move |allocator| {
                    let allocator_mutations = allocator_mutations.clone();
                    allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        allocator_mutations.push(op, item)
                    })));
                })
                .on_new_store(move |store| {
                    let mutations = Arc::new(Mutations::new());
                    object_mutations.lock().insert(store.store_object_id(), mutations.clone());
                    store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        mutations.push(op, item)
                    })));
                })
                .open(device)
                .await
                .expect("open failed")
        };

        let allocator_mutations = Arc::new(Mutations::new());
        let object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;

        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    root_directory.object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let object = root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // Exercise a variety of mutations: a write, an overlapping write, a truncate, an
        // unlink and a tombstone.
        let buf = object.allocate_buffer(10000).await;
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");

        object.truncate(3000).await.expect("truncate failed");

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![
                    LockKey::object(root_store.store_object_id(), root_directory.object_id()),
                    LockKey::object(root_store.store_object_id(), object.object_id()),
                ],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        replace_child(&mut transaction, None, (&root_directory, "test"))
            .await
            .expect("replace_child failed");

        transaction.commit().await.expect("commit failed");

        root_store
            .tombstone_object(object.object_id(), Options::default())
            .await
            .expect("tombstone failed");

        fs.close().await.expect("close failed");

        // Remember the reservation so it can be compared with the reopened filesystem's.
        let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();

        let device = fs.take_device().await;
        device.reopen(false);

        let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let replayed_allocator_mutations = Arc::new(Mutations::new());
        let fs = open_fs(
            device,
            replayed_object_mutations.clone(),
            replayed_allocator_mutations.clone(),
        )
        .await;

        // Per-store mutations must match one-for-one, in order.
        let m1 = object_mutations.lock();
        let m2 = replayed_object_mutations.lock();
        assert_eq!(m1.len(), m2.len());
        for (store_id, mutations) in &*m1 {
            let mutations = mutations.0.lock();
            let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
            assert_eq!(mutations.len(), replayed.len());
            for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
                assert_eq!(op1, op2);
                assert_eq!(i1.key, i2.key);
                assert_eq!(i1.value, i2.value);
                assert_eq!(i1.sequence, i2.sequence);
            }
        }

        // And so must the allocator's.
        let a1 = allocator_mutations.0.lock();
        let a2 = replayed_allocator_mutations.0.lock();
        assert_eq!(a1.len(), a2.len());
        for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
            assert_eq!(op1, op2);
            assert_eq!(i1.key, i2.key);
            assert_eq!(i1.value, i2.value);
            assert_eq!(i1.sequence, i2.sequence);
        }

        assert_eq!(
            fs.object_manager().metadata_reservation().amount(),
            metadata_reservation_amount
        );
    }
1179
    // Checks that once MAX_IN_FLIGHT_TRANSACTIONS transactions are open, creating another one
    // stays pending until one of the open transactions is dropped.
    #[fuchsia::test]
    async fn test_max_in_flight_transactions() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        // Fill every available slot.
        let transactions = FuturesUnordered::new();
        for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
            transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
        }
        let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();

        // One more than the limit must not complete yet.
        let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
        assert!(futures::poll!(&mut fut).is_pending());

        // Dropping a transaction frees a slot...
        transactions.pop();

        // ...which lets the pending transaction proceed.
        assert!(futures::poll!(&mut fut).is_ready());
    }
1200
1201 #[fuchsia::test(threads = 10)]
1203 async fn test_continuously_trim() {
1204 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1205 let fs = FxFilesystemBuilder::new()
1206 .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1207 .format(true)
1208 .open(device)
1209 .await
1210 .expect("open failed");
1211 fasync::Timer::new(Duration::from_millis(10)).await;
1213
1214 let root_store = fs.root_store();
1217 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1218 .await
1219 .expect("open failed");
1220 for _ in 0..100 {
1221 let mut transaction = fs
1222 .clone()
1223 .new_transaction(
1224 lock_keys![LockKey::object(
1225 root_store.store_object_id(),
1226 root_directory.object_id()
1227 )],
1228 Options::default(),
1229 )
1230 .await
1231 .expect("new_transaction failed");
1232 let object = root_directory
1233 .create_child_file(&mut transaction, "test")
1234 .await
1235 .expect("create_child_file failed");
1236 transaction.commit().await.expect("commit failed");
1237
1238 {
1239 let buf = object.allocate_buffer(1024).await;
1240 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1241 }
1242 std::mem::drop(object);
1243
1244 let mut transaction = root_directory
1245 .acquire_context_for_replace(None, "test", true)
1246 .await
1247 .expect("acquire_context_for_replace failed")
1248 .transaction;
1249 replace_child(&mut transaction, None, (&root_directory, "test"))
1250 .await
1251 .expect("replace_child failed");
1252 transaction.commit().await.expect("commit failed");
1253 }
1254 fs.close().await.expect("close failed");
1255 }
1256
1257 #[fuchsia::test]
1258 async fn test_power_fail() {
1259 for _ in 0..10 {
1262 let (store_id, device, test_file_object_id) = {
1263 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1264 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1265 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1266
1267 fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1268 .await
1269 .expect("sync failed");
1270
1271 let store = root_volume
1272 .new_volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
1273 .await
1274 .expect("new_volume failed");
1275 let root_directory = Directory::open(&store, store.root_directory_object_id())
1276 .await
1277 .expect("open failed");
1278
1279 async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1281 let fs = store.filesystem();
1282 let root_directory = Directory::open(store, store.root_directory_object_id())
1283 .await
1284 .expect("open failed");
1285 for i in 0..100 {
1286 let mut transaction = fs
1287 .clone()
1288 .new_transaction(
1289 lock_keys![LockKey::object(
1290 store.store_object_id(),
1291 store.root_directory_object_id()
1292 )],
1293 Options::default(),
1294 )
1295 .await
1296 .expect("new_transaction failed");
1297 root_directory
1298 .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1299 .await
1300 .expect("create_child_file failed");
1301 transaction.commit().await.expect("commit failed");
1302 }
1303 }
1304
1305 create_files(&store, "A").await;
1307
1308 let mut transaction = fs
1311 .clone()
1312 .new_transaction(
1313 lock_keys![LockKey::object(
1314 store.store_object_id(),
1315 store.root_directory_object_id()
1316 )],
1317 Options::default(),
1318 )
1319 .await
1320 .expect("new_transaction failed");
1321 let object = root_directory
1322 .create_child_file(&mut transaction, "test")
1323 .await
1324 .expect("create_child_file failed");
1325 transaction.commit().await.expect("commit failed");
1326
1327 let mut transaction =
1328 object.new_transaction().await.expect("new_transaction failed");
1329 let mut buffer = object.allocate_buffer(4096).await;
1330 buffer.as_mut_slice().fill(0xed);
1331 object
1332 .txn_write(&mut transaction, 0, buffer.as_ref())
1333 .await
1334 .expect("txn_write failed");
1335 transaction.commit().await.expect("commit failed");
1336
1337 create_files(&store, "B").await;
1339
1340 fs.sync(SyncOptions::default()).await.expect("sync failed");
1343
1344 fasync::Timer::new(Duration::from_millis(10)).await;
1350
1351 (
1352 store.store_object_id(),
1353 fs.device().snapshot().expect("snapshot failed"),
1354 object.object_id(),
1355 )
1356 };
1357
1358 device
1361 .discard_random_since_last_flush()
1362 .expect("discard_random_since_last_flush failed");
1363
1364 let fs = FxFilesystem::open(device).await.expect("open failed");
1365 fsck(fs.clone()).await.expect("fsck failed");
1366
1367 let mut check_test_file = false;
1368
1369 let object_id = if fs.object_manager().store(store_id).is_some() {
1372 fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
1373 .await
1374 .expect("fsck_volume failed");
1375
1376 let store = root_volume(fs.clone())
1380 .await
1381 .expect("root_volume failed")
1382 .volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
1383 .await
1384 .expect("volume failed");
1385
1386 let root_directory = Directory::open(&store, store.root_directory_object_id())
1387 .await
1388 .expect("open failed");
1389
1390 let mut transaction = fs
1391 .clone()
1392 .new_transaction(
1393 lock_keys![LockKey::object(
1394 store.store_object_id(),
1395 store.root_directory_object_id()
1396 )],
1397 Options::default(),
1398 )
1399 .await
1400 .expect("new_transaction failed");
1401 let object = root_directory
1402 .create_child_file(&mut transaction, &format!("C"))
1403 .await
1404 .expect("create_child_file failed");
1405 transaction.commit().await.expect("commit failed");
1406
1407 if let Ok(test_file) = ObjectStore::open_object(
1409 &store,
1410 test_file_object_id,
1411 HandleOptions::default(),
1412 None,
1413 )
1414 .await
1415 {
1416 let mut buffer = test_file.allocate_buffer(4096).await;
1418 let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1419 if bytes == 4096 {
1420 let expected = [0xed; 4096];
1421 assert_eq!(buffer.as_slice(), &expected);
1422 } else {
1423 assert_eq!(bytes, 0);
1425 }
1426
1427 let mut transaction =
1429 test_file.new_transaction().await.expect("new_transaction failed");
1430 buffer.as_mut_slice().fill(0x37);
1431 test_file
1432 .txn_write(&mut transaction, 0, buffer.as_ref())
1433 .await
1434 .expect("txn_write failed");
1435 transaction.commit().await.expect("commit failed");
1436 check_test_file = true;
1437 }
1438
1439 object.object_id()
1440 } else {
1441 INVALID_OBJECT_ID
1442 };
1443
1444 fs.close().await.expect("close failed");
1446 let device = fs.take_device().await;
1447 device.reopen(false);
1448
1449 let fs = FxFilesystem::open(device).await.expect("open failed");
1450 fsck(fs.clone()).await.expect("fsck failed");
1451
1452 if object_id != INVALID_OBJECT_ID {
1455 fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
1456 .await
1457 .expect("fsck_volume failed");
1458
1459 let store = root_volume(fs.clone())
1460 .await
1461 .expect("root_volume failed")
1462 .volume("test", NO_OWNER, Some(Arc::new(InsecureCrypt::new())))
1463 .await
1464 .expect("volume failed");
1465 ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1467 .await
1468 .expect("open_object failed");
1469
1470 if check_test_file {
1472 info!("Checking test file for modification");
1473 let test_file = ObjectStore::open_object(
1474 &store,
1475 test_file_object_id,
1476 HandleOptions::default(),
1477 None,
1478 )
1479 .await
1480 .expect("open_object failed");
1481 let mut buffer = test_file.allocate_buffer(4096).await;
1482 assert_eq!(
1483 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1484 4096
1485 );
1486 let expected = [0x37; 4096];
1487 assert_eq!(buffer.as_slice(), &expected);
1488 }
1489 }
1490
1491 fs.close().await.expect("close failed");
1492 }
1493 }
1494
1495 #[fuchsia::test]
1496 async fn test_image_builder_mode_no_early_writes() {
1497 const BLOCK_SIZE: u32 = 4096;
1498 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
1499 device.reopen(true);
1500 let fs = FxFilesystemBuilder::new()
1501 .format(true)
1502 .image_builder_mode(true)
1503 .open(device)
1504 .await
1505 .expect("open failed");
1506 fs.close().await.expect("closed");
1509 }
1510
1511 #[fuchsia::test]
1512 async fn test_image_builder_mode() {
1513 const BLOCK_SIZE: u32 = 4096;
1514 const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
1515 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
1516
1517 {
1519 let mut write_buf =
1520 device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
1521 write_buf.as_mut_slice().fill(0xf0);
1522 device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
1523 }
1524
1525 device.reopen(true);
1526
1527 let device = {
1528 let fs = FxFilesystemBuilder::new()
1529 .format(true)
1530 .image_builder_mode(true)
1531 .open(device)
1532 .await
1533 .expect("open failed");
1534 {
1535 let root_store = fs.root_store();
1536 let root_directory =
1537 Directory::open(&root_store, root_store.root_directory_object_id())
1538 .await
1539 .expect("open failed");
1540 let handle;
1542 {
1543 let mut transaction = fs
1544 .clone()
1545 .new_transaction(
1546 lock_keys![LockKey::object(
1547 root_directory.store().store_object_id(),
1548 root_directory.object_id()
1549 )],
1550 Options::default(),
1551 )
1552 .await
1553 .expect("new transaction");
1554 handle = root_directory
1555 .create_child_file(&mut transaction, "test")
1556 .await
1557 .expect("create file");
1558 handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
1559 transaction.commit().await.expect("commit");
1560 }
1561 }
1562 fs.device().reopen(false);
1563 let (device, _size) = fs.finalize().await.expect("finalize");
1564 device
1565 };
1566 device.reopen(false);
1567 let fs = FxFilesystem::open(device).await.expect("open failed");
1568 fsck(fs.clone()).await.expect("fsck failed");
1569
1570 let root_store = fs.root_store();
1572 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1573 .await
1574 .expect("open failed");
1575 let (object_id, descriptor) =
1576 root_directory.lookup("test").await.expect("lookup failed").unwrap();
1577 assert_eq!(descriptor, ObjectDescriptor::File);
1578 let test_file =
1579 ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
1580 .await
1581 .expect("open failed");
1582 let mut read_buf =
1583 test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
1584 test_file.read(0, read_buf.as_mut()).await.expect("read failed");
1585 assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
1586 fs.close().await.expect("closed");
1587 }
1588}