1use crate::errors::FxfsError;
6use crate::fsck::{FsckOptions, fsck_volume_with_options, fsck_with_options};
7use crate::log::*;
8use crate::metrics;
9use crate::object_store::allocator::{Allocator, Hold, Reservation};
10use crate::object_store::directory::Directory;
11use crate::object_store::graveyard::Graveyard;
12use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
13use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
14use crate::object_store::object_manager::ObjectManager;
15use crate::object_store::transaction::{
16 self, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation, ReadGuard,
17 TRANSACTION_METADATA_MAX_AMOUNT, Transaction, WriteGuard, lock_keys,
18};
19use crate::object_store::volume::{VOLUMES_DIRECTORY, root_volume};
20use crate::object_store::{NewChildStoreOptions, ObjectStore, StoreOptions};
21use crate::range::RangeExt;
22use crate::serialized_types::{LATEST_VERSION, Version};
23use anyhow::{Context, Error, anyhow, bail};
24use async_trait::async_trait;
25use event_listener::Event;
26use fuchsia_async as fasync;
27use fuchsia_inspect::{Inspector, LazyNode, NumericProperty as _, UintProperty};
28use fuchsia_sync::Mutex;
29use futures::FutureExt;
30use fxfs_crypto::Crypt;
31use fxfs_trace::{TraceFutureExt, trace_future_args};
32use static_assertions::const_assert;
33use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
34use std::sync::{Arc, OnceLock, Weak};
35use storage_device::{Device, DeviceHolder};
36
/// Smallest block size the filesystem works with, in bytes. A device reporting a smaller block
/// size is treated as having this block size when the filesystem is opened.
pub const MIN_BLOCK_SIZE: u64 = 4096;
/// Largest supported block size, in bytes (64 KiB).
pub const MAX_BLOCK_SIZE: u64 = 1 << 16;

/// Largest supported file size: `i64::MAX` rounded down to a multiple of 4096 (2^63 - 4096).
pub const MAX_FILE_SIZE: u64 = (i64::MAX as u64) - 4095;
45const_assert!(9223372036854771712 == MAX_FILE_SIZE);
46
/// Upper bound on concurrently in-flight transactions enforced by `add_transaction`; transactions
/// created with `skip_journal_checks` bypass the limit.
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

/// Default delay before the first background trim pass (one hour).
const TRIM_AFTER_BOOT_TIMER: std::time::Duration = std::time::Duration::from_secs(3_600);

/// Default interval between subsequent trim passes (one day).
const TRIM_INTERVAL_TIMER: std::time::Duration = std::time::Duration::from_secs(86_400);

/// How often the background task asks the device to clean its transfer buffer (one minute).
const CLEAN_TRANSFER_BUFFER_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
61
/// Space usage snapshot returned by `FxFilesystem::get_info`.
pub struct Info {
    /// Size of the underlying device, in bytes.
    pub total_bytes: u64,
    /// Bytes the allocator currently reports as in use.
    pub used_bytes: u64,
}
67
68pub type PostCommitHook =
69 Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;
70
71pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
72
73pub struct Options {
74 pub read_only: bool,
76
77 pub roll_metadata_key_byte_count: u64,
81
82 pub pre_commit_hook: PreCommitHook,
85
86 pub post_commit_hook: PostCommitHook,
89
90 pub skip_initial_reap: bool,
93
94 pub trim_config: Option<(std::time::Duration, std::time::Duration)>,
99
100 pub image_builder_mode: Option<SuperBlockInstance>,
103
104 pub inline_crypto_enabled: bool,
111
112 pub barriers_enabled: bool,
117}
118
119impl Default for Options {
120 fn default() -> Self {
121 Options {
122 roll_metadata_key_byte_count: 128 * 1024 * 1024,
123 read_only: false,
124 pre_commit_hook: None,
125 post_commit_hook: None,
126 skip_initial_reap: false,
127 trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
128 image_builder_mode: None,
129 inline_crypto_enabled: false,
130 barriers_enabled: false,
131 }
132 }
133}
134
135pub struct ApplyContext<'a, 'b> {
137 pub mode: ApplyMode<'a, 'b>,
139
140 pub checkpoint: JournalCheckpoint,
142}
143
144pub enum ApplyMode<'a, 'b> {
147 Replay,
148 Live(&'a Transaction<'b>),
149}
150
151impl ApplyMode<'_, '_> {
152 pub fn is_replay(&self) -> bool {
153 matches!(self, ApplyMode::Replay)
154 }
155
156 pub fn is_live(&self) -> bool {
157 matches!(self, ApplyMode::Live(_))
158 }
159}
160
161#[async_trait]
164pub trait JournalingObject: Send + Sync {
165 fn apply_mutation(
169 &self,
170 mutation: Mutation,
171 context: &ApplyContext<'_, '_>,
172 assoc_obj: AssocObj<'_>,
173 ) -> Result<(), Error>;
174
175 fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);
177
178 async fn flush(&self) -> Result<Version, Error>;
182
183 fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
186 writer.write(mutation.clone());
187 }
188}
189
/// Options accepted by `FxFilesystem::sync` and passed through to the journal's sync.
#[derive(Default)]
pub struct SyncOptions<'a> {
    /// Request that the device also be flushed as part of the sync.
    pub flush_device: bool,

    /// Optional predicate consulted by the journal sync.
    /// NOTE(review): presumably the sync is skipped when this returns `false`; confirm in
    /// `Journal::sync`.
    pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
}
204
205pub struct OpenFxFilesystem(Arc<FxFilesystem>);
206
207impl OpenFxFilesystem {
208 pub async fn take_device(self) -> DeviceHolder {
211 let fut = self.device.take_when_dropped();
212 std::mem::drop(self);
213 debug_assert_not_too_long!(fut)
214 }
215}
216
217impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
218 fn from(fs: Arc<FxFilesystem>) -> Self {
219 Self(fs)
220 }
221}
222
223impl Drop for OpenFxFilesystem {
224 fn drop(&mut self) {
225 if self.options.image_builder_mode.is_some()
226 && self.journal().image_builder_mode().is_some()
227 {
228 error!("OpenFxFilesystem in image_builder_mode dropped without calling close().");
229 }
230 if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
231 error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
232 }
233 }
234}
235
236impl std::ops::Deref for OpenFxFilesystem {
237 type Target = Arc<FxFilesystem>;
238
239 fn deref(&self) -> &Self::Target {
240 &self.0
241 }
242}
243
244pub struct FxFilesystemBuilder {
245 format: bool,
246 trace: bool,
247 options: Options,
248 journal_options: JournalOptions,
249 on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
250 on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
251 fsck_after_every_transaction: bool,
252}
253
254impl FxFilesystemBuilder {
255 pub fn new() -> Self {
256 Self {
257 format: false,
258 trace: false,
259 options: Options::default(),
260 journal_options: JournalOptions::default(),
261 on_new_allocator: None,
262 on_new_store: None,
263 fsck_after_every_transaction: false,
264 }
265 }
266
267 pub fn format(mut self, format: bool) -> Self {
269 self.format = format;
270 self
271 }
272
273 pub fn trace(mut self, trace: bool) -> Self {
275 self.trace = trace;
276 self
277 }
278
279 pub fn read_only(mut self, read_only: bool) -> Self {
282 self.options.read_only = read_only;
283 self
284 }
285
286 pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
293 self.options.image_builder_mode = mode;
294 self
295 }
296
297 pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
299 self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
300 self
301 }
302
303 pub fn pre_commit_hook(
305 mut self,
306 hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
307 ) -> Self {
308 self.options.pre_commit_hook = Some(Box::new(hook));
309 self
310 }
311
312 pub fn post_commit_hook(
315 mut self,
316 hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
317 ) -> Self {
318 self.options.post_commit_hook = Some(Box::new(hook));
319 self
320 }
321
322 pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
325 self.options.skip_initial_reap = skip_initial_reap;
326 self
327 }
328
329 pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
331 self.journal_options = journal_options;
332 self
333 }
334
335 pub fn on_new_allocator(
337 mut self,
338 on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
339 ) -> Self {
340 self.on_new_allocator = Some(Box::new(on_new_allocator));
341 self
342 }
343
344 pub fn on_new_store(
346 mut self,
347 on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
348 ) -> Self {
349 self.on_new_store = Some(Box::new(on_new_store));
350 self
351 }
352
353 pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
355 self.fsck_after_every_transaction = fsck_after_every_transaction;
356 self
357 }
358
359 pub fn trim_config(
360 mut self,
361 delay_and_interval: Option<(std::time::Duration, std::time::Duration)>,
362 ) -> Self {
363 self.options.trim_config = delay_and_interval;
364 self
365 }
366
367 pub fn inline_crypto_enabled(mut self, inline_crypto_enabled: bool) -> Self {
369 self.options.inline_crypto_enabled = inline_crypto_enabled;
370 self
371 }
372
373 pub fn barriers_enabled(mut self, barriers_enabled: bool) -> Self {
376 self.options.barriers_enabled = barriers_enabled;
377 self.journal_options.barriers_enabled = barriers_enabled;
378 self
379 }
380
381 pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
383 let read_only = self.options.read_only;
384 if self.format && read_only {
385 bail!("Cannot initialize a filesystem as read-only");
386 }
387
388 if self.options.inline_crypto_enabled && !self.options.barriers_enabled {
390 bail!("A filesystem using inline encryption requires barriers");
391 }
392
393 let objects = Arc::new(ObjectManager::new(self.on_new_store));
394 let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));
395
396 let image_builder_mode = self.options.image_builder_mode;
397
398 let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
399 assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
400 assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");
401
402 let mut fsck_after_every_transaction = None;
403 let mut filesystem_options = self.options;
404 if self.fsck_after_every_transaction {
405 let instance =
406 FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
407 fsck_after_every_transaction = Some(instance.clone());
408 filesystem_options.post_commit_hook =
409 Some(Box::new(move || Box::pin(instance.clone().run())));
410 }
411
412 if !read_only && !self.format {
413 device.flush().await.context("Device flush failed")?;
416 }
417
418 let filesystem = Arc::new_cyclic(|weak: &Weak<FxFilesystem>| {
419 let weak = weak.clone();
420 FxFilesystem {
421 device,
422 block_size,
423 objects: objects.clone(),
424 journal,
425 commit_mutex: futures::lock::Mutex::new(()),
426 lock_manager: LockManager::new(),
427 flush_task: Mutex::new(None),
428 trim_task: Mutex::new(None),
429 clean_transfer_buffer_task: Mutex::new(None),
430 closed: AtomicBool::new(true),
431 shutdown_event: Event::new(),
432 trace: self.trace,
433 graveyard: Graveyard::new(objects.clone()),
434 completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
435 options: filesystem_options,
436 in_flight_transactions: AtomicU64::new(0),
437 transaction_limit_event: Event::new(),
438 _stores_node: metrics::register_fs(move || {
439 let weak = weak.clone();
440 Box::pin(async move {
441 if let Some(fs) = weak.upgrade() {
442 fs.populate_stores_node().await
443 } else {
444 Err(anyhow!("Filesystem has been dropped"))
445 }
446 })
447 }),
448 }
449 });
450
451 filesystem.journal().set_image_builder_mode(image_builder_mode);
452
453 filesystem.journal.set_trace(self.trace);
454 if self.format {
455 filesystem.journal.init_empty(filesystem.clone()).await?;
456 if image_builder_mode.is_none() {
457 filesystem.journal.init_superblocks().await?;
460
461 filesystem.graveyard.clone().reap_async();
463 }
464
465 let root_store = filesystem.root_store();
467 root_store.set_trace(self.trace);
468 let root_directory =
469 Directory::open(&root_store, root_store.root_directory_object_id())
470 .await
471 .context("Unable to open root volume directory")?;
472 let mut transaction = filesystem
473 .clone()
474 .new_transaction(
475 lock_keys![LockKey::object(
476 root_store.store_object_id(),
477 root_directory.object_id()
478 )],
479 transaction::Options::default(),
480 )
481 .await?;
482 let volume_directory =
483 root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
484 transaction.commit().await?;
485 objects.set_volume_directory(volume_directory);
486 } else {
487 filesystem
488 .journal
489 .replay(filesystem.clone(), self.on_new_allocator)
490 .await
491 .context("Journal replay failed")?;
492 filesystem.root_store().set_trace(self.trace);
493
494 if !read_only {
495 for store in objects.unlocked_stores() {
499 filesystem.graveyard.initial_reap(&store).await?;
500 }
501 }
502 }
503
504 if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
506 fsck_after_every_transaction
507 .fs
508 .set(Arc::downgrade(&filesystem))
509 .unwrap_or_else(|_| unreachable!());
510 }
511
512 filesystem.closed.store(false, Ordering::SeqCst);
513
514 if !read_only && image_builder_mode.is_none() {
515 filesystem.graveyard.clone().reap_async();
517
518 if let Some((delay, interval)) = filesystem.options.trim_config.clone() {
519 filesystem.start_trim_task(delay, interval);
520 }
521 filesystem.start_clean_transfer_buffer_task();
522 }
523
524 Ok(filesystem.into())
525 }
526}
527
528pub struct FxFilesystem {
529 block_size: u64,
530 objects: Arc<ObjectManager>,
531 journal: Arc<Journal>,
532 commit_mutex: futures::lock::Mutex<()>,
533 lock_manager: LockManager,
534 flush_task: Mutex<Option<fasync::Task<()>>>,
535 trim_task: Mutex<Option<fasync::Task<()>>>,
536 clean_transfer_buffer_task: Mutex<Option<fasync::Task<()>>>,
537 closed: AtomicBool,
538 shutdown_event: Event,
540 trace: bool,
541 graveyard: Arc<Graveyard>,
542 completed_transactions: UintProperty,
543 options: Options,
544
545 in_flight_transactions: AtomicU64,
547
548 transaction_limit_event: Event,
551
552 device: DeviceHolder,
555
556 _stores_node: LazyNode,
558}
559
560#[fxfs_trace::trace]
561impl FxFilesystem {
562 pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
563 FxFilesystemBuilder::new().format(true).open(device).await
564 }
565
566 pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
567 FxFilesystemBuilder::new().open(device).await
568 }
569
570 pub fn root_parent_store(&self) -> Arc<ObjectStore> {
571 self.objects.root_parent_store()
572 }
573
574 pub async fn close(&self) -> Result<(), Error> {
575 if self.journal().image_builder_mode().is_some() {
576 self.journal().allocate_journal().await?;
577 self.journal().set_image_builder_mode(None);
578 self.journal().force_compact().await?;
579 }
580 assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
581 self.shutdown_event.notify(usize::MAX);
582 debug_assert_not_too_long!(self.graveyard.wait_for_reap());
583 let trim_task = self.trim_task.lock().take();
584 if let Some(task) = trim_task {
585 debug_assert_not_too_long!(task);
586 }
587 let clean_transfer_buffer_task = self.clean_transfer_buffer_task.lock().take();
588 if let Some(task) = clean_transfer_buffer_task {
589 debug_assert_not_too_long!(task);
590 }
591 self.journal.stop_compactions().await;
592 let sync_status =
593 if self.journal().image_builder_mode().is_some() || self.options().read_only {
594 Ok(None)
595 } else {
596 self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await
597 };
598 match &sync_status {
599 Ok(None) => {}
600 Ok(checkpoint) => info!(
601 "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
602 reservation_required={}, borrowed={})",
603 checkpoint.as_ref().unwrap().0.file_offset,
604 self.object_manager().metadata_reservation(),
605 self.object_manager().required_reservation(),
606 self.object_manager().borrowed_metadata_space(),
607 ),
608 Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
609 }
610 self.journal.terminate();
611 let flush_task = self.flush_task.lock().take();
612 if let Some(task) = flush_task {
613 debug_assert_not_too_long!(task);
614 }
615 self.device().close().await.context("Failed to close device")?;
618 sync_status.map(|_| ())
619 }
620
621 pub fn device(&self) -> Arc<dyn Device> {
622 Arc::clone(&self.device)
623 }
624
625 pub fn root_store(&self) -> Arc<ObjectStore> {
626 self.objects.root_store()
627 }
628
629 pub fn allocator(&self) -> Arc<Allocator> {
630 self.objects.allocator()
631 }
632
633 pub fn enable_allocations(&self) {
637 self.allocator().enable_allocations();
638 }
639
640 pub fn object_manager(&self) -> &Arc<ObjectManager> {
641 &self.objects
642 }
643
644 pub fn journal(&self) -> &Arc<Journal> {
645 &self.journal
646 }
647
648 pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
649 self.journal.sync(options).await.map(|_| ())
650 }
651
652 pub fn block_size(&self) -> u64 {
653 self.block_size
654 }
655
656 pub fn get_info(&self) -> Info {
657 Info {
658 total_bytes: self.device.size(),
659 used_bytes: self.object_manager().allocator().get_used_bytes().0,
660 }
661 }
662
663 pub fn super_block_header(&self) -> SuperBlockHeader {
664 self.journal.super_block_header()
665 }
666
667 pub fn graveyard(&self) -> &Arc<Graveyard> {
668 &self.graveyard
669 }
670
671 pub fn trace(&self) -> bool {
672 self.trace
673 }
674
675 pub fn options(&self) -> &Options {
676 &self.options
677 }
678
679 pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
686 TxnGuard::Owned(
687 self.lock_manager
688 .read_lock(lock_keys!(LockKey::Filesystem))
689 .await
690 .into_owned(self.clone()),
691 )
692 }
693
694 pub async fn new_transaction<'a>(
695 self: Arc<Self>,
696 locks: LockKeys,
697 options: transaction::Options<'a>,
698 ) -> Result<Transaction<'a>, Error> {
699 let guard = if let Some(guard) = options.txn_guard.as_ref() {
700 TxnGuard::Borrowed(guard)
701 } else {
702 self.txn_guard().await
703 };
704 Transaction::new(guard, options, locks).await
705 }
706
707 #[trace]
708 pub async fn commit_transaction(
709 &self,
710 transaction: &mut Transaction<'_>,
711 callback: &mut (dyn FnMut(u64) + Send),
712 ) -> Result<u64, Error> {
713 if let Some(hook) = self.options.pre_commit_hook.as_ref() {
714 hook(transaction)?;
715 }
716 debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
717 self.maybe_start_flush_task();
718 let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
719 let journal_offset = if self.journal().image_builder_mode().is_some() {
720 let journal_checkpoint =
721 JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
722 let maybe_mutation = self
723 .object_manager()
724 .apply_transaction(transaction, &journal_checkpoint)
725 .expect("Transactions must not fail in image_builder_mode");
726 if let Some(mutation) = maybe_mutation {
727 assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
728 }
732 self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
733 0
734 } else {
735 self.journal.commit(transaction).await?
736 };
737 self.completed_transactions.add(1);
738
739 callback(journal_offset);
744
745 if let Some(hook) = self.options.post_commit_hook.as_ref() {
746 hook().await;
747 }
748
749 Ok(journal_offset)
750 }
751
752 pub fn lock_manager(&self) -> &LockManager {
753 &self.lock_manager
754 }
755
756 pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
757 if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
758 self.sub_transaction();
759 }
760 if let MetadataReservation::Hold(hold_amount) =
762 std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
763 {
764 let hold = transaction
765 .allocator_reservation
766 .unwrap()
767 .reserve(0)
768 .expect("Zero should always succeed.");
769 hold.add(hold_amount);
770 }
771 self.objects.drop_transaction(transaction);
772 self.lock_manager.drop_transaction(transaction);
773 }
774
775 fn maybe_start_flush_task(&self) {
776 if self.journal.image_builder_mode().is_some() {
777 return;
778 }
779 let mut flush_task = self.flush_task.lock();
780 if flush_task.is_none() {
781 let journal = self.journal.clone();
782 *flush_task = Some(fasync::Task::spawn(
783 journal.flush_task().trace(trace_future_args!("Journal::flush_task")),
784 ));
785 }
786 }
787
788 async fn do_trim(&self) -> Result<usize, Error> {
790 const MAX_EXTENTS_PER_BATCH: usize = 8;
791 const MAX_EXTENT_SIZE: usize = 256 * 1024;
792 let mut offset = 0;
793 let mut bytes_trimmed = 0;
794 loop {
795 if self.closed.load(Ordering::Relaxed) {
796 info!("Filesystem is closed, nothing to trim");
797 return Ok(bytes_trimmed);
798 }
799 let allocator = self.allocator();
800 let trimmable_extents =
801 allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
802 for device_range in trimmable_extents.extents() {
803 self.device.trim(device_range.clone()).await?;
804 bytes_trimmed += device_range.length()? as usize;
805 }
806 if let Some(device_range) = trimmable_extents.extents().last() {
807 offset = device_range.end;
808 } else {
809 break;
810 }
811 }
812 Ok(bytes_trimmed)
813 }
814
815 fn start_trim_task(
816 self: &Arc<Self>,
817 delay: std::time::Duration,
818 interval: std::time::Duration,
819 ) {
820 if !self.device.supports_trim() {
821 info!("Device does not support trim; not scheduling trimming");
822 return;
823 }
824 let this = self.clone();
825 let mut next_timer = delay;
826 *self.trim_task.lock() = Some(fasync::Task::spawn(
827 async move {
828 loop {
829 let shutdown_listener = this.shutdown_event.listen();
830 if this.closed.load(Ordering::SeqCst) {
836 return;
837 }
838 futures::select!(
839 () = fasync::Timer::new(next_timer.clone()).fuse() => {},
840 () = shutdown_listener.fuse() => return,
841 );
842 let start_time = std::time::Instant::now();
843 let res = this.do_trim().await;
844 let duration = std::time::Instant::now() - start_time;
845 next_timer = interval.clone();
846 match res {
847 Ok(bytes_trimmed) => info!(
848 "Trimmed {bytes_trimmed} bytes in {duration:?}. Next trim in \
849 {next_timer:?}",
850 ),
851 Err(e) => error!(e:?; "Failed to trim"),
852 }
853 }
854 }
855 .trace(trace_future_args!("Filesystem::trim_task")),
856 ));
857 }
858
859 fn start_clean_transfer_buffer_task(self: &Arc<Self>) {
860 let this = self.clone();
861 *self.clean_transfer_buffer_task.lock() = Some(fasync::Task::spawn(
862 async move {
863 loop {
864 let shutdown_listener = this.shutdown_event.listen();
865 if this.closed.load(Ordering::SeqCst) {
866 return;
867 }
868 futures::select!(
869 () = fasync::Timer::new(CLEAN_TRANSFER_BUFFER_INTERVAL).fuse() => {},
870 () = shutdown_listener.fuse() => return,
871 );
872 this.device().clean_transfer_buffer();
873 }
874 }
875 .trace(trace_future_args!("Filesystem::clean_transfer_buffer_task")),
876 ));
877 }
878
879 pub(crate) async fn reservation_for_transaction<'a>(
880 self: &Arc<Self>,
881 options: transaction::Options<'a>,
882 ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
883 if self.options.image_builder_mode.is_some() {
884 return Ok((MetadataReservation::Borrowed, None, None));
887 }
888 if !options.skip_journal_checks {
889 self.maybe_start_flush_task();
890 self.journal.check_journal_space().await?;
891 }
892
893 let mut hold = None;
907 let metadata_reservation = if options.borrow_metadata_space {
908 MetadataReservation::Borrowed
909 } else {
910 match options.allocator_reservation {
911 Some(reservation) => {
912 hold = Some(
913 reservation
914 .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
915 .ok_or(FxfsError::NoSpace)?,
916 );
917 MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
918 }
919 None => {
920 let reservation = self
921 .allocator()
922 .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
923 .ok_or(FxfsError::NoSpace)?;
924 MetadataReservation::Reservation(reservation)
925 }
926 }
927 };
928 Ok((metadata_reservation, options.allocator_reservation, hold))
929 }
930
931 pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
932 if skip_journal_checks {
933 self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
934 } else {
935 let inc = || {
936 let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
937 while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
938 match self.in_flight_transactions.compare_exchange_weak(
939 in_flights,
940 in_flights + 1,
941 Ordering::Relaxed,
942 Ordering::Relaxed,
943 ) {
944 Ok(_) => return true,
945 Err(x) => in_flights = x,
946 }
947 }
948 return false;
949 };
950 while !inc() {
951 let listener = self.transaction_limit_event.listen();
952 if inc() {
953 break;
954 }
955 listener.await;
956 }
957 }
958 }
959
960 pub(crate) fn sub_transaction(&self) {
961 let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
962 assert!(old != 0);
963 if old <= MAX_IN_FLIGHT_TRANSACTIONS {
964 self.transaction_limit_event.notify(usize::MAX);
965 }
966 }
967
968 pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
969 let keys = lock_keys![LockKey::truncate(store_id, object_id,)];
970 TruncateGuard(self.lock_manager().write_lock(keys).await)
971 }
972
973 async fn populate_stores_node(&self) -> Result<Inspector, Error> {
974 let inspector = fuchsia_inspect::Inspector::default();
975 let root = inspector.root();
976 root.record_child("__root", |n| self.root_store().record_data(n));
977 root.record_child("__root_parent", |n| self.root_parent_store().record_data(n));
978 let object_manager = self.object_manager();
979 let volume_directory = object_manager.volume_directory();
980 let layer_set = volume_directory.store().tree().layer_set();
981 let mut merger = layer_set.merger();
982 let mut iter = volume_directory.iter(&mut merger).await?;
983 while let Some((name, id, _)) = iter.get() {
984 if let Some(store) = object_manager.store(id) {
985 root.record_child(name.to_string(), |n| store.record_data(n));
986 }
987 iter.advance().await?;
988 }
989 Ok(inspector)
990 }
991}
992
993pub enum TxnGuard<'a> {
994 Borrowed(&'a TxnGuard<'a>),
995 Owned(ReadGuard<'static>),
996}
997
998impl TxnGuard<'_> {
999 pub fn fs(&self) -> &Arc<FxFilesystem> {
1000 match self {
1001 TxnGuard::Borrowed(b) => b.fs(),
1002 TxnGuard::Owned(o) => o.fs().unwrap(),
1003 }
1004 }
1005}
1006
1007#[allow(dead_code)]
1009pub struct TruncateGuard<'a>(WriteGuard<'a>);
1010
1011pub async fn mkfs(device: DeviceHolder) -> Result<DeviceHolder, Error> {
1013 let fs = FxFilesystem::new_empty(device).await?;
1014 fs.close().await?;
1015 Ok(fs.take_device().await)
1016}
1017
1018pub async fn mkfs_with_volume(
1022 device: DeviceHolder,
1023 volume_name: &str,
1024 crypt: Option<Arc<dyn Crypt>>,
1025) -> Result<DeviceHolder, Error> {
1026 let fs = FxFilesystem::new_empty(device).await?;
1027 {
1028 let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
1031 root_volume
1032 .new_volume(
1033 volume_name,
1034 NewChildStoreOptions {
1035 options: StoreOptions { crypt, ..StoreOptions::default() },
1036 ..Default::default()
1037 },
1038 )
1039 .await
1040 .expect("Create volume failed");
1041 }
1042 fs.close().await?;
1043 Ok(fs.take_device().await)
1044}
1045
1046struct FsckAfterEveryTransaction {
1047 fs: OnceLock<Weak<FxFilesystem>>,
1048 old_hook: PostCommitHook,
1049}
1050
1051impl FsckAfterEveryTransaction {
1052 fn new(old_hook: PostCommitHook) -> Arc<Self> {
1053 Arc::new(Self { fs: OnceLock::new(), old_hook })
1054 }
1055
1056 async fn run(self: Arc<Self>) {
1057 if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
1058 let options = FsckOptions {
1059 fail_on_warning: true,
1060 no_lock: true,
1061 quiet: true,
1062 ..Default::default()
1063 };
1064 fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
1065 let object_manager = fs.object_manager();
1066 for store in object_manager.unlocked_stores() {
1067 let store_id = store.store_object_id();
1068 if !object_manager.is_system_store(store_id) {
1069 fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
1070 .await
1071 .expect("fsck_volume_with_options failed");
1072 }
1073 }
1074 }
1075 if let Some(old_hook) = self.old_hook.as_ref() {
1076 old_hook().await;
1077 }
1078 }
1079}
1080
1081#[cfg(test)]
1082mod tests {
1083 use super::{FxFilesystem, FxFilesystemBuilder, FxfsError, SyncOptions};
1084 use crate::fsck::{fsck, fsck_volume};
1085 use crate::log::*;
1086 use crate::lsm_tree::Operation;
1087 use crate::lsm_tree::types::Item;
1088 use crate::object_handle::{
1089 INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
1090 };
1091 use crate::object_store::directory::{Directory, replace_child};
1092 use crate::object_store::journal::JournalOptions;
1093 use crate::object_store::journal::super_block::SuperBlockInstance;
1094 use crate::object_store::transaction::{LockKey, Options, lock_keys};
1095 use crate::object_store::volume::root_volume;
1096 use crate::object_store::{
1097 HandleOptions, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions,
1098 };
1099 use crate::range::RangeExt;
1100 use fuchsia_async as fasync;
1101 use fuchsia_sync::Mutex;
1102 use futures::future::join_all;
1103 use futures::stream::{FuturesUnordered, TryStreamExt};
1104 use fxfs_insecure_crypto::new_insecure_crypt;
1105 use rustc_hash::FxHashMap as HashMap;
1106 use std::ops::Range;
1107 use std::sync::Arc;
1108 use std::sync::atomic::{self, AtomicU32};
1109 use std::time::Duration;
1110 use storage_device::DeviceHolder;
1111 use storage_device::fake_device::{self, FakeDevice};
1112 use test_case::test_case;
1113
1114 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1115
1116 #[fuchsia::test(threads = 10)]
1117 async fn test_compaction() {
1118 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1119
1120 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1122 let root_store = fs.root_store();
1123 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1124 .await
1125 .expect("open failed");
1126
1127 let mut tasks = Vec::new();
1128 for i in 0..2 {
1129 let mut transaction = fs
1130 .clone()
1131 .new_transaction(
1132 lock_keys![LockKey::object(
1133 root_store.store_object_id(),
1134 root_directory.object_id()
1135 )],
1136 Options::default(),
1137 )
1138 .await
1139 .expect("new_transaction failed");
1140 let handle = root_directory
1141 .create_child_file(&mut transaction, &format!("{}", i))
1142 .await
1143 .expect("create_child_file failed");
1144 transaction.commit().await.expect("commit failed");
1145 tasks.push(fasync::Task::spawn(async move {
1146 const TEST_DATA: &[u8] = b"hello";
1147 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1148 buf.as_mut_slice().copy_from_slice(TEST_DATA);
1149 for _ in 0..1500 {
1150 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1151 }
1152 }));
1153 }
1154 join_all(tasks).await;
1155 fs.sync(SyncOptions::default()).await.expect("sync failed");
1156
1157 fsck(fs.clone()).await.expect("fsck failed");
1158 fs.close().await.expect("Close failed");
1159 }
1160
1161 #[fuchsia::test]
1162 async fn test_enable_allocations() {
1163 {
1165 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1166 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1167 fs.enable_allocations();
1168 let root_store = fs.root_store();
1169 let root_directory =
1170 Directory::open(&root_store, root_store.root_directory_object_id())
1171 .await
1172 .expect("open failed");
1173 let mut transaction = fs
1174 .clone()
1175 .new_transaction(
1176 lock_keys![LockKey::object(
1177 root_store.store_object_id(),
1178 root_directory.object_id()
1179 )],
1180 Options::default(),
1181 )
1182 .await
1183 .expect("new_transaction failed");
1184 root_directory
1185 .create_child_file(&mut transaction, "test")
1186 .await
1187 .expect("create_child_file failed");
1188 transaction.commit().await.expect("commit failed");
1189 fs.close().await.expect("close failed");
1190 }
1191
1192 {
1194 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1195 let fs = FxFilesystemBuilder::new()
1196 .format(true)
1197 .image_builder_mode(Some(SuperBlockInstance::A))
1198 .open(device)
1199 .await
1200 .expect("open failed");
1201 let root_store = fs.root_store();
1202 let root_directory =
1203 Directory::open(&root_store, root_store.root_directory_object_id())
1204 .await
1205 .expect("open failed");
1206
1207 let mut transaction = fs
1208 .clone()
1209 .new_transaction(
1210 lock_keys![LockKey::object(
1211 root_store.store_object_id(),
1212 root_directory.object_id()
1213 )],
1214 Options::default(),
1215 )
1216 .await
1217 .expect("new_transaction failed");
1218 let handle = root_directory
1219 .create_child_file(&mut transaction, "test_fail")
1220 .await
1221 .expect("create_child_file failed");
1222 transaction.commit().await.expect("commit failed");
1223
1224 assert!(
1226 FxfsError::Unavailable
1227 .matches(&handle.allocate(0..4096).await.expect_err("allocate should fail"))
1228 );
1229
1230 fs.enable_allocations();
1232 handle.allocate(0..4096).await.expect("allocate should work after enable_allocations");
1233
1234 fs.close().await.expect("close failed");
1238 }
1239 }
1243
1244 #[fuchsia::test(threads = 10)]
1245 async fn test_replay_is_identical() {
1246 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1247 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1248
1249 fs.close().await.expect("close failed");
1252 let device = fs.take_device().await;
1253 device.reopen(false);
1254
1255 struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);
1256
1257 impl<K: Clone, V: Clone> Mutations<K, V> {
1258 fn new() -> Self {
1259 Mutations(Mutex::new(Vec::new()))
1260 }
1261
1262 fn push(&self, operation: Operation, item: &Item<K, V>) {
1263 self.0.lock().push((operation, item.clone()));
1264 }
1265 }
1266
1267 let open_fs = |device,
1268 object_mutations: Arc<Mutex<HashMap<_, _>>>,
1269 allocator_mutations: Arc<Mutations<_, _>>| async {
1270 FxFilesystemBuilder::new()
1271 .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
1272 .on_new_allocator(move |allocator| {
1273 let allocator_mutations = allocator_mutations.clone();
1274 allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1275 allocator_mutations.push(op, item)
1276 })));
1277 })
1278 .on_new_store(move |store| {
1279 let mutations = Arc::new(Mutations::new());
1280 object_mutations.lock().insert(store.store_object_id(), mutations.clone());
1281 store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1282 mutations.push(op, item)
1283 })));
1284 })
1285 .open(device)
1286 .await
1287 .expect("open failed")
1288 };
1289
1290 let allocator_mutations = Arc::new(Mutations::new());
1291 let object_mutations = Arc::new(Mutex::new(HashMap::default()));
1292 let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;
1293
1294 let root_store = fs.root_store();
1295 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1296 .await
1297 .expect("open failed");
1298
1299 let mut transaction = fs
1300 .clone()
1301 .new_transaction(
1302 lock_keys![LockKey::object(
1303 root_store.store_object_id(),
1304 root_directory.object_id()
1305 )],
1306 Options::default(),
1307 )
1308 .await
1309 .expect("new_transaction failed");
1310 let object = root_directory
1311 .create_child_file(&mut transaction, "test")
1312 .await
1313 .expect("create_child_file failed");
1314 transaction.commit().await.expect("commit failed");
1315
1316 let buf = object.allocate_buffer(10000).await;
1318 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1319
1320 object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");
1322
1323 object.truncate(3000).await.expect("truncate failed");
1325
1326 let mut transaction = fs
1328 .clone()
1329 .new_transaction(
1330 lock_keys![
1331 LockKey::object(root_store.store_object_id(), root_directory.object_id()),
1332 LockKey::object(root_store.store_object_id(), object.object_id()),
1333 ],
1334 Options::default(),
1335 )
1336 .await
1337 .expect("new_transaction failed");
1338
1339 replace_child(&mut transaction, None, (&root_directory, "test"))
1340 .await
1341 .expect("replace_child failed");
1342
1343 transaction.commit().await.expect("commit failed");
1344
1345 root_store
1347 .tombstone_object(object.object_id(), Options::default())
1348 .await
1349 .expect("tombstone failed");
1350
1351 fs.close().await.expect("close failed");
1353
1354 let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();
1355
1356 let device = fs.take_device().await;
1357 device.reopen(false);
1358
1359 let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
1360 let replayed_allocator_mutations = Arc::new(Mutations::new());
1361 let fs = open_fs(
1362 device,
1363 replayed_object_mutations.clone(),
1364 replayed_allocator_mutations.clone(),
1365 )
1366 .await;
1367
1368 let m1 = object_mutations.lock();
1369 let m2 = replayed_object_mutations.lock();
1370 assert_eq!(m1.len(), m2.len());
1371 for (store_id, mutations) in &*m1 {
1372 let mutations = mutations.0.lock();
1373 let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
1374 assert_eq!(mutations.len(), replayed.len());
1375 for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
1376 assert_eq!(op1, op2);
1377 assert_eq!(i1.key, i2.key);
1378 assert_eq!(i1.value, i2.value);
1379 assert_eq!(i1.sequence, i2.sequence);
1380 }
1381 }
1382
1383 let a1 = allocator_mutations.0.lock();
1384 let a2 = replayed_allocator_mutations.0.lock();
1385 assert_eq!(a1.len(), a2.len());
1386 for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
1387 assert_eq!(op1, op2);
1388 assert_eq!(i1.key, i2.key);
1389 assert_eq!(i1.value, i2.value);
1390 assert_eq!(i1.sequence, i2.sequence);
1391 }
1392
1393 assert_eq!(
1394 fs.object_manager().metadata_reservation().amount(),
1395 metadata_reservation_amount
1396 );
1397 }
1398
1399 #[fuchsia::test]
1400 async fn test_max_in_flight_transactions() {
1401 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1402 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1403
1404 let transactions = FuturesUnordered::new();
1405 for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
1406 transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
1407 }
1408 let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();
1409
1410 let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
1412 assert!(futures::poll!(&mut fut).is_pending());
1413
1414 transactions.pop();
1416
1417 assert!(futures::poll!(&mut fut).is_ready());
1418 }
1419
1420 #[fuchsia::test(threads = 10)]
1422 async fn test_continuously_trim() {
1423 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1424 let fs = FxFilesystemBuilder::new()
1425 .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1426 .format(true)
1427 .open(device)
1428 .await
1429 .expect("open failed");
1430 fasync::Timer::new(Duration::from_millis(10)).await;
1432
1433 let root_store = fs.root_store();
1436 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1437 .await
1438 .expect("open failed");
1439 for _ in 0..100 {
1440 let mut transaction = fs
1441 .clone()
1442 .new_transaction(
1443 lock_keys![LockKey::object(
1444 root_store.store_object_id(),
1445 root_directory.object_id()
1446 )],
1447 Options::default(),
1448 )
1449 .await
1450 .expect("new_transaction failed");
1451 let object = root_directory
1452 .create_child_file(&mut transaction, "test")
1453 .await
1454 .expect("create_child_file failed");
1455 transaction.commit().await.expect("commit failed");
1456
1457 {
1458 let buf = object.allocate_buffer(1024).await;
1459 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1460 }
1461 std::mem::drop(object);
1462
1463 let mut transaction = root_directory
1464 .acquire_context_for_replace(None, "test", true)
1465 .await
1466 .expect("acquire_context_for_replace failed")
1467 .transaction;
1468 replace_child(&mut transaction, None, (&root_directory, "test"))
1469 .await
1470 .expect("replace_child failed");
1471 transaction.commit().await.expect("commit failed");
1472 }
1473 fs.close().await.expect("close failed");
1474 }
1475
1476 #[test_case(true; "test power fail with barriers")]
1477 #[test_case(false; "test power fail with checksums")]
1478 #[fuchsia::test]
1479 async fn test_power_fail(barriers_enabled: bool) {
1480 for _ in 0..10 {
1483 let (store_id, device, test_file_object_id) = {
1484 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1485 let fs = if barriers_enabled {
1486 FxFilesystemBuilder::new()
1487 .barriers_enabled(true)
1488 .format(true)
1489 .open(device)
1490 .await
1491 .expect("new filesystem failed")
1492 } else {
1493 FxFilesystem::new_empty(device).await.expect("new_empty failed")
1494 };
1495 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1496
1497 fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1498 .await
1499 .expect("sync failed");
1500
1501 let store = root_volume
1502 .new_volume(
1503 "test",
1504 NewChildStoreOptions {
1505 options: StoreOptions {
1506 crypt: Some(Arc::new(new_insecure_crypt())),
1507 ..StoreOptions::default()
1508 },
1509 ..Default::default()
1510 },
1511 )
1512 .await
1513 .expect("new_volume failed");
1514 let root_directory = Directory::open(&store, store.root_directory_object_id())
1515 .await
1516 .expect("open failed");
1517
1518 async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1520 let fs = store.filesystem();
1521 let root_directory = Directory::open(store, store.root_directory_object_id())
1522 .await
1523 .expect("open failed");
1524 for i in 0..100 {
1525 let mut transaction = fs
1526 .clone()
1527 .new_transaction(
1528 lock_keys![LockKey::object(
1529 store.store_object_id(),
1530 store.root_directory_object_id()
1531 )],
1532 Options::default(),
1533 )
1534 .await
1535 .expect("new_transaction failed");
1536 root_directory
1537 .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1538 .await
1539 .expect("create_child_file failed");
1540 transaction.commit().await.expect("commit failed");
1541 }
1542 }
1543
1544 create_files(&store, "A").await;
1546
1547 let mut transaction = fs
1550 .clone()
1551 .new_transaction(
1552 lock_keys![LockKey::object(
1553 store.store_object_id(),
1554 store.root_directory_object_id()
1555 )],
1556 Options::default(),
1557 )
1558 .await
1559 .expect("new_transaction failed");
1560 let object = root_directory
1561 .create_child_file(&mut transaction, "test")
1562 .await
1563 .expect("create_child_file failed");
1564 transaction.commit().await.expect("commit failed");
1565
1566 let mut transaction =
1567 object.new_transaction().await.expect("new_transaction failed");
1568 let mut buffer = object.allocate_buffer(4096).await;
1569 buffer.as_mut_slice().fill(0xed);
1570 object
1571 .txn_write(&mut transaction, 0, buffer.as_ref())
1572 .await
1573 .expect("txn_write failed");
1574 transaction.commit().await.expect("commit failed");
1575
1576 create_files(&store, "B").await;
1578
1579 fs.sync(SyncOptions::default()).await.expect("sync failed");
1582
1583 fasync::Timer::new(Duration::from_millis(10)).await;
1589
1590 (
1591 store.store_object_id(),
1592 fs.device().snapshot().expect("snapshot failed"),
1593 object.object_id(),
1594 )
1595 };
1596
1597 device
1600 .discard_random_since_last_flush()
1601 .expect("discard_random_since_last_flush failed");
1602
1603 let fs = FxFilesystem::open(device).await.expect("open failed");
1604 fsck(fs.clone()).await.expect("fsck failed");
1605
1606 let mut check_test_file = false;
1607
1608 let object_id = if fs.object_manager().store(store_id).is_some() {
1611 fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1612 .await
1613 .expect("fsck_volume failed");
1614
1615 let store = root_volume(fs.clone())
1619 .await
1620 .expect("root_volume failed")
1621 .volume(
1622 "test",
1623 StoreOptions {
1624 crypt: Some(Arc::new(new_insecure_crypt())),
1625 ..StoreOptions::default()
1626 },
1627 )
1628 .await
1629 .expect("volume failed");
1630
1631 let root_directory = Directory::open(&store, store.root_directory_object_id())
1632 .await
1633 .expect("open failed");
1634
1635 let mut transaction = fs
1636 .clone()
1637 .new_transaction(
1638 lock_keys![LockKey::object(
1639 store.store_object_id(),
1640 store.root_directory_object_id()
1641 )],
1642 Options::default(),
1643 )
1644 .await
1645 .expect("new_transaction failed");
1646 let object = root_directory
1647 .create_child_file(&mut transaction, &format!("C"))
1648 .await
1649 .expect("create_child_file failed");
1650 transaction.commit().await.expect("commit failed");
1651
1652 if let Ok(test_file) = ObjectStore::open_object(
1654 &store,
1655 test_file_object_id,
1656 HandleOptions::default(),
1657 None,
1658 )
1659 .await
1660 {
1661 let mut buffer = test_file.allocate_buffer(4096).await;
1663 let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1664 if bytes == 4096 {
1665 let expected = [0xed; 4096];
1666 assert_eq!(buffer.as_slice(), &expected);
1667 } else {
1668 assert_eq!(bytes, 0);
1670 }
1671
1672 let mut transaction =
1674 test_file.new_transaction().await.expect("new_transaction failed");
1675 buffer.as_mut_slice().fill(0x37);
1676 test_file
1677 .txn_write(&mut transaction, 0, buffer.as_ref())
1678 .await
1679 .expect("txn_write failed");
1680 transaction.commit().await.expect("commit failed");
1681 check_test_file = true;
1682 }
1683
1684 object.object_id()
1685 } else {
1686 INVALID_OBJECT_ID
1687 };
1688
1689 fs.close().await.expect("close failed");
1691 let device = fs.take_device().await;
1692 device.reopen(false);
1693
1694 let fs = FxFilesystem::open(device).await.expect("open failed");
1695 fsck(fs.clone()).await.expect("fsck failed");
1696
1697 if object_id != INVALID_OBJECT_ID {
1700 fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1701 .await
1702 .expect("fsck_volume failed");
1703
1704 let store = root_volume(fs.clone())
1705 .await
1706 .expect("root_volume failed")
1707 .volume(
1708 "test",
1709 StoreOptions {
1710 crypt: Some(Arc::new(new_insecure_crypt())),
1711 ..StoreOptions::default()
1712 },
1713 )
1714 .await
1715 .expect("volume failed");
1716 ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1718 .await
1719 .expect("open_object failed");
1720
1721 if check_test_file {
1723 info!("Checking test file for modification");
1724 let test_file = ObjectStore::open_object(
1725 &store,
1726 test_file_object_id,
1727 HandleOptions::default(),
1728 None,
1729 )
1730 .await
1731 .expect("open_object failed");
1732 let mut buffer = test_file.allocate_buffer(4096).await;
1733 assert_eq!(
1734 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1735 4096
1736 );
1737 let expected = [0x37; 4096];
1738 assert_eq!(buffer.as_slice(), &expected);
1739 }
1740 }
1741
1742 fs.close().await.expect("close failed");
1743 }
1744 }
1745
1746 #[fuchsia::test]
1747 async fn test_barrier_not_emitted_when_transaction_has_no_data() {
1748 let barrier_count = Arc::new(AtomicU32::new(0));
1749
1750 struct Observer(Arc<AtomicU32>);
1751
1752 impl fake_device::Observer for Observer {
1753 fn barrier(&self) {
1754 self.0.fetch_add(1, atomic::Ordering::Relaxed);
1755 }
1756 }
1757
1758 let mut fake_device = FakeDevice::new(8192, 4096);
1759 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1760 let device = DeviceHolder::new(fake_device);
1761 let fs = FxFilesystemBuilder::new()
1762 .barriers_enabled(true)
1763 .format(true)
1764 .open(device)
1765 .await
1766 .expect("new filesystem failed");
1767
1768 {
1769 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1770 root_vol
1771 .new_volume(
1772 "test",
1773 NewChildStoreOptions {
1774 options: StoreOptions {
1775 crypt: Some(Arc::new(new_insecure_crypt())),
1776 ..StoreOptions::default()
1777 },
1778 ..NewChildStoreOptions::default()
1779 },
1780 )
1781 .await
1782 .expect("there is no test volume");
1783 fs.close().await.expect("close failed");
1784 }
1785 let device = fs.take_device().await;
1788 device.reopen(false);
1789 let fs = FxFilesystemBuilder::new()
1790 .barriers_enabled(true)
1791 .open(device)
1792 .await
1793 .expect("new filesystem failed");
1794 let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1795
1796 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1797 let store = root_vol
1798 .volume(
1799 "test",
1800 StoreOptions {
1801 crypt: Some(Arc::new(new_insecure_crypt())),
1802 ..StoreOptions::default()
1803 },
1804 )
1805 .await
1806 .expect("there is no test volume");
1807
1808 let fs = store.filesystem();
1810 let root_directory =
1811 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1812 for i in 0..100 {
1813 let mut transaction = fs
1814 .clone()
1815 .new_transaction(
1816 lock_keys![LockKey::object(
1817 store.store_object_id(),
1818 store.root_directory_object_id()
1819 )],
1820 Options::default(),
1821 )
1822 .await
1823 .expect("new_transaction failed");
1824 root_directory
1825 .create_child_file(&mut transaction, &format!("A {i}"))
1826 .await
1827 .expect("create_child_file failed");
1828 transaction.commit().await.expect("commit failed");
1829 }
1830
1831 fs.close().await.expect("close failed");
1833 assert_eq!(expected_barrier_count, barrier_count.load(atomic::Ordering::Relaxed));
1835 }
1836
1837 #[fuchsia::test]
1838 async fn test_barrier_emitted_when_transaction_includes_data() {
1839 let barrier_count = Arc::new(AtomicU32::new(0));
1840
1841 struct Observer(Arc<AtomicU32>);
1842
1843 impl fake_device::Observer for Observer {
1844 fn barrier(&self) {
1845 self.0.fetch_add(1, atomic::Ordering::Relaxed);
1846 }
1847 }
1848
1849 let mut fake_device = FakeDevice::new(8192, 4096);
1850 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1851 let device = DeviceHolder::new(fake_device);
1852 let fs = FxFilesystemBuilder::new()
1853 .barriers_enabled(true)
1854 .format(true)
1855 .open(device)
1856 .await
1857 .expect("new filesystem failed");
1858
1859 {
1860 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1861 root_vol
1862 .new_volume(
1863 "test",
1864 NewChildStoreOptions {
1865 options: StoreOptions {
1866 crypt: Some(Arc::new(new_insecure_crypt())),
1867 ..StoreOptions::default()
1868 },
1869 ..NewChildStoreOptions::default()
1870 },
1871 )
1872 .await
1873 .expect("there is no test volume");
1874 fs.close().await.expect("close failed");
1875 }
1876 let device = fs.take_device().await;
1879 device.reopen(false);
1880 let fs = FxFilesystemBuilder::new()
1881 .barriers_enabled(true)
1882 .open(device)
1883 .await
1884 .expect("new filesystem failed");
1885 let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1886
1887 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1888 let store = root_vol
1889 .volume(
1890 "test",
1891 StoreOptions {
1892 crypt: Some(Arc::new(new_insecure_crypt())),
1893 ..StoreOptions::default()
1894 },
1895 )
1896 .await
1897 .expect("there is no test volume");
1898
1899 let fs: Arc<FxFilesystem> = store.filesystem();
1901 let root_directory =
1902 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1903
1904 let mut transaction = fs
1905 .clone()
1906 .new_transaction(
1907 lock_keys![LockKey::object(
1908 store.store_object_id(),
1909 store.root_directory_object_id()
1910 )],
1911 Options::default(),
1912 )
1913 .await
1914 .expect("new_transaction failed");
1915 let object = root_directory
1916 .create_child_file(&mut transaction, "test")
1917 .await
1918 .expect("create_child_file failed");
1919 transaction.commit().await.expect("commit failed");
1920
1921 let mut transaction = object.new_transaction().await.expect("new_transaction failed");
1922 let mut buffer = object.allocate_buffer(4096).await;
1923 buffer.as_mut_slice().fill(0xed);
1924 object.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
1925 transaction.commit().await.expect("commit failed");
1926
1927 fs.close().await.expect("close failed");
1929 assert!(expected_barrier_count < barrier_count.load(atomic::Ordering::Relaxed));
1931 }
1932
1933 #[test_case(true; "fail when original filesystem has barriers enabled")]
1934 #[test_case(false; "fail when original filesystem has barriers disabled")]
1935 #[fuchsia::test]
1936 async fn test_switching_barrier_mode_on_existing_filesystem(original_barrier_mode: bool) {
1937 let crypt = Some(Arc::new(new_insecure_crypt()) as Arc<dyn fxfs_crypto::Crypt>);
1938 let fake_device = FakeDevice::new(8192, 4096);
1939 let device = DeviceHolder::new(fake_device);
1940 let fs: super::OpenFxFilesystem = FxFilesystemBuilder::new()
1941 .barriers_enabled(original_barrier_mode)
1942 .format(true)
1943 .open(device)
1944 .await
1945 .expect("new filesystem failed");
1946
1947 {
1949 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1950 let store = root_vol
1951 .new_volume(
1952 "test",
1953 NewChildStoreOptions {
1954 options: StoreOptions { crypt: crypt.clone(), ..Default::default() },
1955 ..Default::default()
1956 },
1957 )
1958 .await
1959 .expect("creating test volume");
1960 let root_dir = Directory::open(&store, store.root_directory_object_id())
1961 .await
1962 .expect("open failed");
1963 let mut transaction = fs
1964 .clone()
1965 .new_transaction(
1966 lock_keys![LockKey::object(
1967 store.store_object_id(),
1968 store.root_directory_object_id()
1969 )],
1970 Default::default(),
1971 )
1972 .await
1973 .expect("new_transaction failed");
1974 let object = root_dir
1975 .create_child_file(&mut transaction, "file")
1976 .await
1977 .expect("create_child_file failed");
1978 transaction.commit().await.expect("commit failed");
1979 let mut buffer = object.allocate_buffer(4096).await;
1980 buffer.as_mut_slice().fill(0xA7);
1981 let new_size = object.write_or_append(None, buffer.as_ref()).await.unwrap();
1982 assert_eq!(new_size, 4096);
1983 }
1984
1985 fs.close().await.expect("close failed");
1987 let device = fs.take_device().await;
1988 device.reopen(false);
1989 let fs = FxFilesystemBuilder::new()
1990 .barriers_enabled(!original_barrier_mode)
1991 .open(device)
1992 .await
1993 .expect("new filesystem failed");
1994 {
1995 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1996 let store = root_vol
1997 .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
1998 .await
1999 .expect("opening test volume");
2000 let root_dir = Directory::open(&store, store.root_directory_object_id())
2001 .await
2002 .expect("open failed");
2003 let (object_id, _, _) =
2004 root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
2005 let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
2006 .await
2007 .expect("open failed");
2008 let mut buffer = test_file.allocate_buffer(4096).await;
2010 buffer.as_mut_slice().fill(0xA8);
2011 let new_size = test_file.write_or_append(None, buffer.as_ref()).await.unwrap();
2012 assert_eq!(new_size, 8192);
2013 }
2014
2015 fs.close().await.expect("close failed");
2018 let device = fs.take_device().await;
2019 device.reopen(false);
2020 let fs = FxFilesystemBuilder::new()
2021 .barriers_enabled(original_barrier_mode)
2022 .open(device)
2023 .await
2024 .expect("new filesystem failed");
2025 {
2026 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2027 let store = root_vol
2028 .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
2029 .await
2030 .expect("opening test volume");
2031 let root_dir = Directory::open(&store, store.root_directory_object_id())
2032 .await
2033 .expect("open failed");
2034 let (object_id, _, _) =
2035 root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
2036 let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
2037 .await
2038 .expect("open failed");
2039 let mut buffer = test_file.allocate_buffer(8192).await;
2040 assert_eq!(
2041 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
2042 8192,
2043 "short read"
2044 );
2045 assert_eq!(buffer.as_slice()[0..4096], [0xA7; 4096]);
2046 assert_eq!(buffer.as_slice()[4096..8192], [0xA8; 4096]);
2047 }
2048 fs.close().await.expect("close failed");
2049 }
2050
2051 #[fuchsia::test]
2052 async fn test_image_builder_mode_no_early_writes() {
2053 const BLOCK_SIZE: u32 = 4096;
2054 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2055 device.reopen(true);
2056 let fs = FxFilesystemBuilder::new()
2057 .format(true)
2058 .image_builder_mode(Some(SuperBlockInstance::A))
2059 .open(device)
2060 .await
2061 .expect("open failed");
2062 fs.enable_allocations();
2063 fs.device().reopen(false);
2065 fs.close().await.expect("closed");
2066 }
2067
2068 #[fuchsia::test]
2069 async fn test_image_builder_mode() {
2070 const BLOCK_SIZE: u32 = 4096;
2071 const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
2072 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2073
2074 {
2076 let mut write_buf =
2077 device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2078 write_buf.as_mut_slice().fill(0xf0);
2079 device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
2080 }
2081
2082 device.reopen(true);
2083
2084 let device = {
2085 let fs = FxFilesystemBuilder::new()
2086 .format(true)
2087 .image_builder_mode(Some(SuperBlockInstance::B))
2088 .open(device)
2089 .await
2090 .expect("open failed");
2091 fs.enable_allocations();
2092 {
2093 let root_store = fs.root_store();
2094 let root_directory =
2095 Directory::open(&root_store, root_store.root_directory_object_id())
2096 .await
2097 .expect("open failed");
2098 let handle;
2100 {
2101 let mut transaction = fs
2102 .clone()
2103 .new_transaction(
2104 lock_keys![LockKey::object(
2105 root_directory.store().store_object_id(),
2106 root_directory.object_id()
2107 )],
2108 Options::default(),
2109 )
2110 .await
2111 .expect("new transaction");
2112 handle = root_directory
2113 .create_child_file(&mut transaction, "test")
2114 .await
2115 .expect("create file");
2116 handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
2117 transaction.commit().await.expect("commit");
2118 }
2119 }
2120 fs.device().reopen(false);
2121 fs.close().await.expect("close");
2122 fs.take_device().await
2123 };
2124 device.reopen(false);
2125 let fs = FxFilesystem::open(device).await.expect("open failed");
2126 fsck(fs.clone()).await.expect("fsck failed");
2127
2128 let root_store = fs.root_store();
2130 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2131 .await
2132 .expect("open failed");
2133 let (object_id, descriptor, _) =
2134 root_directory.lookup("test").await.expect("lookup failed").unwrap();
2135 assert_eq!(descriptor, ObjectDescriptor::File);
2136 let test_file =
2137 ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
2138 .await
2139 .expect("open failed");
2140 let mut read_buf =
2141 test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2142 test_file.read(0, read_buf.as_mut()).await.expect("read failed");
2143 assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
2144 fs.close().await.expect("closed");
2145 }
2146
2147 #[fuchsia::test]
2148 async fn test_read_only_mount_on_full_filesystem() {
2149 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2150 let fs =
2151 FxFilesystemBuilder::new().format(true).open(device).await.expect("new_empty failed");
2152 let root_store = fs.root_store();
2153 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2154 .await
2155 .expect("open failed");
2156
2157 let mut transaction = fs
2158 .clone()
2159 .new_transaction(
2160 lock_keys![LockKey::object(
2161 root_store.store_object_id(),
2162 root_directory.object_id()
2163 )],
2164 Options::default(),
2165 )
2166 .await
2167 .expect("new_transaction failed");
2168 let handle = root_directory
2169 .create_child_file(&mut transaction, "test")
2170 .await
2171 .expect("create_child_file failed");
2172 transaction.commit().await.expect("commit failed");
2173
2174 let mut buf = handle.allocate_buffer(4096).await;
2175 buf.as_mut_slice().fill(0xaa);
2176 loop {
2177 if handle.write_or_append(None, buf.as_ref()).await.is_err() {
2178 break;
2179 }
2180 }
2181
2182 let max_offset = fs.allocator().maximum_offset();
2183 fs.close().await.expect("Close failed");
2184
2185 let device = fs.take_device().await;
2186 device.reopen(false);
2187 let mut buffer = device
2188 .allocate_buffer(
2189 crate::round::round_up(max_offset, TEST_DEVICE_BLOCK_SIZE).unwrap() as usize
2190 )
2191 .await;
2192 device.read(0, buffer.as_mut()).await.expect("read failed");
2193
2194 let device = DeviceHolder::new(
2195 FakeDevice::from_image(&buffer.as_slice()[..], TEST_DEVICE_BLOCK_SIZE)
2196 .expect("from_image failed"),
2197 );
2198 let fs =
2199 FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
2200 fs.close().await.expect("Close failed");
2201 }
2202
2203 #[test_case(SuperBlockInstance::A; "Superblock instance A")]
2204 #[test_case(SuperBlockInstance::B; "Superblock instance B")]
2205 #[fuchsia::test]
2206 async fn test_image_builder_mode_flush_on_close_sb_a(target_sb: SuperBlockInstance) {
2207 const BLOCK_SIZE: u32 = 4096;
2208 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2209
2210 device.reopen(true);
2212 let fs = FxFilesystemBuilder::new()
2213 .format(true)
2214 .image_builder_mode(Some(target_sb))
2215 .open(device)
2216 .await
2217 .expect("open failed");
2218
2219 fs.enable_allocations();
2220
2221 fs.device().reopen(false);
2223
2224 {
2226 let root_store = fs.root_store();
2227 let root_directory =
2228 Directory::open(&root_store, root_store.root_directory_object_id())
2229 .await
2230 .expect("open failed");
2231
2232 let mut transaction = fs
2233 .clone()
2234 .new_transaction(
2235 lock_keys![LockKey::object(
2236 root_directory.store().store_object_id(),
2237 root_directory.object_id()
2238 )],
2239 Options::default(),
2240 )
2241 .await
2242 .expect("new transaction");
2243 let handle = root_directory
2244 .create_child_file(&mut transaction, "post_finalize_file")
2245 .await
2246 .expect("create file");
2247 transaction.commit().await.expect("commit");
2248
2249 let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
2250 buf.as_mut_slice().fill(0xaa);
2251 handle.write_or_append(None, buf.as_ref()).await.expect("write failed");
2252 }
2253
2254 fs.close().await.expect("close failed");
2256
2257 let other_sb = target_sb.next();
2258
2259 let device = fs.take_device().await;
2261 device.reopen(true); let mut buf = device.allocate_buffer(BLOCK_SIZE as usize).await;
2263
2264 device.read(target_sb.first_extent().start, buf.as_mut()).await.expect("read target_sb");
2265 assert_eq!(&buf.as_slice()[..8], b"FxfsSupr", "target_sb should have magic bytes");
2266
2267 buf.as_mut_slice().fill(0); device.read(other_sb.first_extent().start, buf.as_mut()).await.expect("read other_sb");
2269 assert_eq!(buf.as_slice(), &[0; 4096], "other_sb should be zeroed");
2271 }
2272}