1use crate::errors::FxfsError;
6use crate::fsck::{FsckOptions, fsck_volume_with_options, fsck_with_options};
7use crate::log::*;
8use crate::metrics;
9use crate::object_store::allocator::{Allocator, Hold, Reservation};
10use crate::object_store::directory::Directory;
11use crate::object_store::graveyard::Graveyard;
12use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
13use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
14use crate::object_store::object_manager::ObjectManager;
15use crate::object_store::transaction::{
16 self, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation, ReadGuard,
17 TRANSACTION_METADATA_MAX_AMOUNT, Transaction, WriteGuard, lock_keys,
18};
19use crate::object_store::volume::{VOLUMES_DIRECTORY, root_volume};
20use crate::object_store::{NewChildStoreOptions, ObjectStore, StoreOptions};
21use crate::range::RangeExt;
22use crate::serialized_types::{LATEST_VERSION, Version};
23use anyhow::{Context, Error, anyhow, bail, ensure};
24use async_trait::async_trait;
25use event_listener::Event;
26use fuchsia_async as fasync;
27use fuchsia_inspect::{Inspector, LazyNode, NumericProperty as _, UintProperty};
28use fuchsia_sync::Mutex;
29use futures::FutureExt;
30use fxfs_crypto::Crypt;
31use once_cell::sync::OnceCell;
32use static_assertions::const_assert;
33use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
34use std::sync::{Arc, Weak};
35use storage_device::{Device, DeviceHolder};
36
/// Smallest block size the filesystem operates with.  `FxFilesystemBuilder::open` rounds a
/// smaller device block size up to this value.
pub const MIN_BLOCK_SIZE: u64 = 4096;
/// Largest supported block size: `u16::MAX + 1` bytes (64 KiB).
pub const MAX_BLOCK_SIZE: u64 = u16::MAX as u64 + 1;

/// Largest supported file size: `2^63 - 4096`, i.e. the largest multiple of 4096 that still fits
/// in an `i64`.
pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
// Pin the literal value so that an accidental change to the expression above fails to compile.
const_assert!(9223372036854771712 == MAX_FILE_SIZE);

/// Upper bound on concurrently in-flight transactions enforced by
/// `FxFilesystem::add_transaction`.  Callers that skip journal checks bypass the limit.
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

/// Default delay before the first background trim: one hour.
const TRIM_AFTER_BOOT_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60);

/// Default interval between subsequent background trims: 24 hours.
const TRIM_INTERVAL_TIMER: std::time::Duration = std::time::Duration::from_secs(60 * 60 * 24);
57
/// Space usage summary returned by `FxFilesystem::get_info`.
pub struct Info {
    /// Size reported by the underlying device (`Device::size()`).
    pub total_bytes: u64,
    /// Number of bytes the allocator reports as used.
    pub used_bytes: u64,
}
63
/// Optional callback that `FxFilesystem::commit_transaction` invokes, and awaits, after a
/// transaction has been committed.
pub type PostCommitHook =
    Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;

/// Optional callback that `FxFilesystem::commit_transaction` invokes with the transaction before
/// committing it.  Returning an error aborts the commit and propagates the error to the caller.
pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
68
/// Options that control how a filesystem instance behaves once opened.
pub struct Options {
    /// When true the filesystem is opened read-only: `FxFilesystemBuilder::open` refuses to
    /// format, and skips the pre-replay device flush, graveyard reaping and the trim task.
    pub read_only: bool,

    /// Defaults to 128 MiB.  NOTE(review): presumably the number of bytes written with a metadata
    /// key before it is rolled; the consumer is not in this file -- confirm.
    pub roll_metadata_key_byte_count: u64,

    /// Called with the transaction at the start of `commit_transaction`; an error aborts the
    /// commit.
    pub pre_commit_hook: PreCommitHook,

    /// Called and awaited at the end of `commit_transaction`, after the commit callback has run.
    pub post_commit_hook: PostCommitHook,

    /// NOTE(review): presumably suppresses the initial graveyard reap at mount time; the flag is
    /// not read in this file -- confirm against the graveyard implementation.
    pub skip_initial_reap: bool,

    /// `(delay, interval)` for the background trim task: the first trim runs after `delay` and
    /// later ones every `interval`.  `None` disables the task.
    pub trim_config: Option<(std::time::Duration, std::time::Duration)>,

    /// When set, the journal is put into image-builder mode at open time; in that mode
    /// `commit_transaction` applies transactions directly instead of going through
    /// `Journal::commit`, and `OpenFxFilesystem::finalize` must be called before dropping.
    pub image_builder_mode: Option<SuperBlockInstance>,

    /// Enables inline encryption.  `FxFilesystemBuilder::open` rejects this unless
    /// `barriers_enabled` is also set.
    pub inline_crypto_enabled: bool,

    /// Enables barriers.  `FxFilesystemBuilder::barriers_enabled` mirrors this flag into the
    /// journal options as well.
    pub barriers_enabled: bool,
}
114
115impl Default for Options {
116 fn default() -> Self {
117 Options {
118 roll_metadata_key_byte_count: 128 * 1024 * 1024,
119 read_only: false,
120 pre_commit_hook: None,
121 post_commit_hook: None,
122 skip_initial_reap: false,
123 trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
124 image_builder_mode: None,
125 inline_crypto_enabled: false,
126 barriers_enabled: false,
127 }
128 }
129}
130
/// Context handed to `JournalingObject::apply_mutation`.
pub struct ApplyContext<'a, 'b> {
    /// Whether the mutation is being replayed or applied as part of a live transaction.
    pub mode: ApplyMode<'a, 'b>,

    /// Journal checkpoint associated with the mutation being applied.
    pub checkpoint: JournalCheckpoint,
}
139
/// Distinguishes the two situations in which a mutation can be applied.
pub enum ApplyMode<'a, 'b> {
    /// The mutation is being replayed (no live transaction is available).
    Replay,
    /// The mutation belongs to the given live transaction.
    Live(&'a Transaction<'b>),
}
146
147impl ApplyMode<'_, '_> {
148 pub fn is_replay(&self) -> bool {
149 matches!(self, ApplyMode::Replay)
150 }
151
152 pub fn is_live(&self) -> bool {
153 matches!(self, ApplyMode::Live(_))
154 }
155}
156
/// Interface for objects that take part in journaling.
///
/// NOTE(review): implementors and call sites live outside this file; the per-method notes below
/// describe only what the signatures show.
#[async_trait]
pub trait JournalingObject: Send + Sync {
    /// Applies `mutation` to the object.  `context` says whether this is a replay or a live
    /// transaction and carries the associated journal checkpoint; `assoc_obj` is the object
    /// associated with the mutation, if any.
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error>;

    /// Called with a mutation belonging to `transaction` that will not be applied.
    /// NOTE(review): presumably invoked when a transaction is dropped uncommitted -- confirm.
    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);

    /// Flushes the object, returning a `Version` on success.
    /// NOTE(review): the meaning of the returned version is not visible here -- confirm.
    async fn flush(&self) -> Result<Version, Error>;

    /// Writes `mutation` to the journal `writer`.  The default implementation writes a clone of
    /// the mutation unchanged.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        writer.write(mutation.clone());
    }
}
185
/// Options passed to `FxFilesystem::sync`, which forwards them to `Journal::sync`.
#[derive(Default)]
pub struct SyncOptions<'a> {
    /// Requests that the device also be flushed.  `FxFilesystem::close` sets this.
    pub flush_device: bool,

    /// Optional predicate evaluated by the journal.
    /// NOTE(review): presumably the sync is skipped when it returns false; the evaluation happens
    /// outside this file -- confirm.
    pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
}
200
/// Handle to an opened filesystem, as returned by `FxFilesystemBuilder::open`.  Dereferences to
/// `Arc<FxFilesystem>` and logs an error on drop if a writable filesystem was never closed.
pub struct OpenFxFilesystem(Arc<FxFilesystem>);
202
203impl OpenFxFilesystem {
204 pub async fn take_device(self) -> DeviceHolder {
207 let fut = self.device.take_when_dropped();
208 std::mem::drop(self);
209 debug_assert_not_too_long!(fut)
210 }
211
212 pub async fn finalize(&self) -> Result<(), Error> {
214 ensure!(
215 self.journal().image_builder_mode().is_some(),
216 "finalize() only valid in image_builder_mode."
217 );
218 self.journal().allocate_journal().await?;
219 self.journal().set_image_builder_mode(None);
220 self.journal().compact().await?;
221 Ok(())
222 }
223}
224
225impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
226 fn from(fs: Arc<FxFilesystem>) -> Self {
227 Self(fs)
228 }
229}
230
231impl Drop for OpenFxFilesystem {
232 fn drop(&mut self) {
233 if self.options.image_builder_mode.is_some()
234 && self.journal().image_builder_mode().is_some()
235 {
236 error!("OpenFxFilesystem in image_builder_mode dropped without calling finalize().");
237 }
238 if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
239 error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
240 }
241 }
242}
243
244impl std::ops::Deref for OpenFxFilesystem {
245 type Target = Arc<FxFilesystem>;
246
247 fn deref(&self) -> &Self::Target {
248 &self.0
249 }
250}
251
/// Builder used to configure and open (or format) a filesystem.
pub struct FxFilesystemBuilder {
    // Format a new filesystem instead of replaying an existing journal.
    format: bool,
    // Forwarded to the journal and the root store via `set_trace`.
    trace: bool,
    // Options stored on the resulting `FxFilesystem`.
    options: Options,
    // Options handed to `Journal::new`.
    journal_options: JournalOptions,
    // Passed to `Journal::replay`; not used when formatting.
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    // Passed to `ObjectManager::new`.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
    // Install a post-commit hook that runs fsck after every transaction.
    fsck_after_every_transaction: bool,
}
261
impl FxFilesystemBuilder {
    /// Creates a builder with default options: open an existing filesystem (no format), no
    /// tracing, no callbacks and no fsck after every transaction.
    pub fn new() -> Self {
        Self {
            format: false,
            trace: false,
            options: Options::default(),
            journal_options: JournalOptions::default(),
            on_new_allocator: None,
            on_new_store: None,
            fsck_after_every_transaction: false,
        }
    }

    /// When `format` is true, `open` initialises a new, empty filesystem on the device instead of
    /// replaying the journal.
    pub fn format(mut self, format: bool) -> Self {
        self.format = format;
        self
    }

    /// Enables or disables tracing; the flag is forwarded to the journal and the root store.
    pub fn trace(mut self, trace: bool) -> Self {
        self.trace = trace;
        self
    }

    /// Opens the filesystem read-only.  Cannot be combined with `format(true)`; `open` fails in
    /// that case.
    pub fn read_only(mut self, read_only: bool) -> Self {
        self.options.read_only = read_only;
        self
    }

    /// Sets image-builder mode.  With `Some(..)`, the journal is put into image-builder mode
    /// during `open` and the caller is expected to call `OpenFxFilesystem::finalize` before
    /// dropping the filesystem.
    pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
        self.options.image_builder_mode = mode;
        self
    }

    /// Sets `Options::roll_metadata_key_byte_count`.
    pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
        self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
        self
    }

    /// Installs a hook that runs at the start of every commit; an error from the hook aborts the
    /// commit.
    pub fn pre_commit_hook(
        mut self,
        hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
    ) -> Self {
        self.options.pre_commit_hook = Some(Box::new(hook));
        self
    }

    /// Installs a hook whose future is awaited at the end of every commit.
    pub fn post_commit_hook(
        mut self,
        hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
    ) -> Self {
        self.options.post_commit_hook = Some(Box::new(hook));
        self
    }

    /// Sets `Options::skip_initial_reap`.
    pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
        self.options.skip_initial_reap = skip_initial_reap;
        self
    }

    /// Replaces the journal options wholesale.  Note that this also overwrites the
    /// `barriers_enabled` value that an earlier call to [`Self::barriers_enabled`] mirrored into
    /// the journal options.
    pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
        self.journal_options = journal_options;
        self
    }

    /// Registers a callback that is handed to `Journal::replay`.  It is not used when
    /// formatting.
    pub fn on_new_allocator(
        mut self,
        on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_allocator = Some(Box::new(on_new_allocator));
        self
    }

    /// Registers a callback that is handed to `ObjectManager::new`.
    pub fn on_new_store(
        mut self,
        on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
    ) -> Self {
        self.on_new_store = Some(Box::new(on_new_store));
        self
    }

    /// When true, `open` installs a post-commit hook that runs fsck after every transaction and
    /// then chains to any previously configured post-commit hook.
    pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
        self.fsck_after_every_transaction = fsck_after_every_transaction;
        self
    }

    /// Sets the `(delay, interval)` of the background trim task; `None` disables trimming.
    pub fn trim_config(
        mut self,
        delay_and_interval: Option<(std::time::Duration, std::time::Duration)>,
    ) -> Self {
        self.options.trim_config = delay_and_interval;
        self
    }

    /// Enables inline encryption.  `open` fails unless barriers are enabled as well.
    pub fn inline_crypto_enabled(mut self, inline_crypto_enabled: bool) -> Self {
        self.options.inline_crypto_enabled = inline_crypto_enabled;
        self
    }

    /// Enables barriers in both the filesystem options and the journal options.
    pub fn barriers_enabled(mut self, barriers_enabled: bool) -> Self {
        self.options.barriers_enabled = barriers_enabled;
        self.journal_options.barriers_enabled = barriers_enabled;
        self
    }

    /// Constructs the filesystem on `device`, either formatting it or replaying its journal.
    ///
    /// # Errors
    ///
    /// Fails if formatting was requested together with read-only, if inline crypto is enabled
    /// without barriers, or if any of the device flush, journal initialisation/replay, initial
    /// reap or root volume directory creation steps fail.
    ///
    /// # Panics
    ///
    /// Panics if the effective block size is not a multiple of `MIN_BLOCK_SIZE` or exceeds
    /// `MAX_BLOCK_SIZE`.
    pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        let read_only = self.options.read_only;
        if self.format && read_only {
            bail!("Cannot initialize a filesystem as read-only");
        }

        if self.options.inline_crypto_enabled && !self.options.barriers_enabled {
            bail!("A filesystem using inline encryption requires barriers");
        }

        let objects = Arc::new(ObjectManager::new(self.on_new_store));
        let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));

        let image_builder_mode = self.options.image_builder_mode;

        // Never go below MIN_BLOCK_SIZE, even if the device's own block size is smaller.
        let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
        assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
        assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");

        // If requested, wrap any existing post-commit hook in one that runs fsck first.  The
        // hook's reference to the filesystem is filled in further down, once it exists.
        let mut fsck_after_every_transaction = None;
        let mut filesystem_options = self.options;
        if self.fsck_after_every_transaction {
            let instance =
                FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
            fsck_after_every_transaction = Some(instance.clone());
            filesystem_options.post_commit_hook =
                Some(Box::new(move || instance.clone().run().boxed()));
        }

        // Flush the device before replaying an existing, writable filesystem.
        if !read_only && !self.format {
            device.flush().await.context("Device flush failed")?;
        }

        // `new_cyclic` lets the lazily-populated inspect node hold a weak reference back to the
        // filesystem that owns it.
        let filesystem = Arc::new_cyclic(|weak: &Weak<FxFilesystem>| {
            let weak = weak.clone();
            FxFilesystem {
                device,
                block_size,
                objects: objects.clone(),
                journal,
                commit_mutex: futures::lock::Mutex::new(()),
                lock_manager: LockManager::new(),
                flush_task: Mutex::new(None),
                trim_task: Mutex::new(None),
                // Considered closed until initialisation below has succeeded.
                closed: AtomicBool::new(true),
                shutdown_event: Event::new(),
                trace: self.trace,
                graveyard: Graveyard::new(objects.clone()),
                completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
                options: filesystem_options,
                in_flight_transactions: AtomicU64::new(0),
                transaction_limit_event: Event::new(),
                _stores_node: metrics::register_fs(move || {
                    let weak = weak.clone();
                    Box::pin(async move {
                        if let Some(fs) = weak.upgrade() {
                            fs.populate_stores_node().await
                        } else {
                            Err(anyhow!("Filesystem has been dropped"))
                        }
                    })
                }),
            }
        });

        filesystem.journal().set_image_builder_mode(image_builder_mode);

        filesystem.journal.set_trace(self.trace);
        if self.format {
            filesystem.journal.init_empty(filesystem.clone()).await?;
            if image_builder_mode.is_none() {
                filesystem.journal.init_superblocks().await?;

                // Start the graveyard's background reaping.
                filesystem.graveyard.clone().reap_async();
            }

            // Create the volumes directory under the root store's root directory.
            let root_store = filesystem.root_store();
            root_store.set_trace(self.trace);
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .context("Unable to open root volume directory")?;
            let mut transaction = filesystem
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    transaction::Options::default(),
                )
                .await?;
            let volume_directory =
                root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
            transaction.commit().await?;
            objects.set_volume_directory(volume_directory);
        } else {
            filesystem
                .journal
                .replay(filesystem.clone(), self.on_new_allocator)
                .await
                .context("Journal replay failed")?;
            filesystem.root_store().set_trace(self.trace);

            if !read_only {
                // Run the initial reap for every unlocked store.  The background reaper is only
                // started further down, after this has completed.
                for store in objects.unlocked_stores() {
                    filesystem.graveyard.initial_reap(&store).await?;
                }
            }
        }

        // Only now give the fsck hook its filesystem reference, so it stays inert while the
        // filesystem is being formatted or replayed above.
        if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
            fsck_after_every_transaction
                .fs
                .set(Arc::downgrade(&filesystem))
                .unwrap_or_else(|_| unreachable!());
        }

        filesystem.closed.store(false, Ordering::SeqCst);

        if !read_only && image_builder_mode.is_none() {
            // Start the background tasks: graveyard reaping and, if configured, trimming.
            filesystem.graveyard.clone().reap_async();

            if let Some((delay, interval)) = filesystem.options.trim_config.clone() {
                filesystem.start_trim_task(delay, interval);
            }
        }

        Ok(filesystem.into())
    }
}
533
/// An Fxfs filesystem instance.  Constructed by `FxFilesystemBuilder::open` and normally accessed
/// through `OpenFxFilesystem`.
pub struct FxFilesystem {
    // Effective block size: the device block size, but never less than MIN_BLOCK_SIZE.
    block_size: u64,
    objects: Arc<ObjectManager>,
    journal: Arc<Journal>,
    // Held for the duration of each commit in `commit_transaction`.
    commit_mutex: futures::lock::Mutex<()>,
    lock_manager: LockManager,
    // Journal flush task; started lazily by `maybe_start_flush_task` and awaited in `close`.
    flush_task: Mutex<Option<fasync::Task<()>>>,
    // Background trim task; started by `start_trim_task` and awaited in `close`.
    trim_task: Mutex<Option<fasync::Task<()>>>,
    // True until `open` has finished initialising and again once `close` has been called.
    closed: AtomicBool,
    // Notified by `close`; wakes the trim task so that it can exit.
    shutdown_event: Event,
    trace: bool,
    graveyard: Arc<Graveyard>,
    // Inspect counter incremented once per committed transaction.
    completed_transactions: UintProperty,
    options: Options,

    // Number of transactions currently in flight; see `add_transaction` / `sub_transaction`.
    in_flight_transactions: AtomicU64,

    // Notified by `sub_transaction` so that waiters in `add_transaction` can retry.
    transaction_limit_event: Event,

    // The underlying device.
    device: DeviceHolder,

    // Lazily populated inspect node; held only to keep the registration alive.
    _stores_node: LazyNode,
}
564
565#[fxfs_trace::trace]
566impl FxFilesystem {
567 pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
568 FxFilesystemBuilder::new().format(true).open(device).await
569 }
570
571 pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
572 FxFilesystemBuilder::new().open(device).await
573 }
574
575 pub fn root_parent_store(&self) -> Arc<ObjectStore> {
576 self.objects.root_parent_store()
577 }
578
579 pub async fn close(&self) -> Result<(), Error> {
580 assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
581 self.shutdown_event.notify(usize::MAX);
582 debug_assert_not_too_long!(self.graveyard.wait_for_reap());
583 let trim_task = self.trim_task.lock().take();
584 if let Some(task) = trim_task {
585 debug_assert_not_too_long!(task);
586 }
587 self.journal.stop_compactions().await;
588 let sync_status =
589 self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await;
590 match &sync_status {
591 Ok(checkpoint) => info!(
592 "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
593 reservation_required={}, borrowed={})",
594 checkpoint.as_ref().unwrap().0.file_offset,
595 self.object_manager().metadata_reservation(),
596 self.object_manager().required_reservation(),
597 self.object_manager().borrowed_metadata_space(),
598 ),
599 Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
600 }
601 self.journal.terminate();
602 let flush_task = self.flush_task.lock().take();
603 if let Some(task) = flush_task {
604 debug_assert_not_too_long!(task);
605 }
606 self.device().close().await.context("Failed to close device")?;
609 sync_status.map(|_| ())
610 }
611
612 pub fn device(&self) -> Arc<dyn Device> {
613 Arc::clone(&self.device)
614 }
615
616 pub fn root_store(&self) -> Arc<ObjectStore> {
617 self.objects.root_store()
618 }
619
620 pub fn allocator(&self) -> Arc<Allocator> {
621 self.objects.allocator()
622 }
623
624 pub fn object_manager(&self) -> &Arc<ObjectManager> {
625 &self.objects
626 }
627
628 pub fn journal(&self) -> &Arc<Journal> {
629 &self.journal
630 }
631
632 pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
633 self.journal.sync(options).await.map(|_| ())
634 }
635
636 pub fn block_size(&self) -> u64 {
637 self.block_size
638 }
639
640 pub fn get_info(&self) -> Info {
641 Info {
642 total_bytes: self.device.size(),
643 used_bytes: self.object_manager().allocator().get_used_bytes().0,
644 }
645 }
646
647 pub fn super_block_header(&self) -> SuperBlockHeader {
648 self.journal.super_block_header()
649 }
650
651 pub fn graveyard(&self) -> &Arc<Graveyard> {
652 &self.graveyard
653 }
654
655 pub fn trace(&self) -> bool {
656 self.trace
657 }
658
659 pub fn options(&self) -> &Options {
660 &self.options
661 }
662
663 pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
670 TxnGuard::Owned(
671 self.lock_manager
672 .read_lock(lock_keys!(LockKey::Filesystem))
673 .await
674 .into_owned(self.clone()),
675 )
676 }
677
678 pub async fn new_transaction<'a>(
679 self: Arc<Self>,
680 locks: LockKeys,
681 options: transaction::Options<'a>,
682 ) -> Result<Transaction<'a>, Error> {
683 let guard = if let Some(guard) = options.txn_guard.as_ref() {
684 TxnGuard::Borrowed(guard)
685 } else {
686 self.txn_guard().await
687 };
688 Transaction::new(guard, options, locks).await
689 }
690
691 #[trace]
692 pub async fn commit_transaction(
693 &self,
694 transaction: &mut Transaction<'_>,
695 callback: &mut (dyn FnMut(u64) + Send),
696 ) -> Result<u64, Error> {
697 if let Some(hook) = self.options.pre_commit_hook.as_ref() {
698 hook(transaction)?;
699 }
700 debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
701 self.maybe_start_flush_task();
702 let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
703 let journal_offset = if self.journal().image_builder_mode().is_some() {
704 let journal_checkpoint =
705 JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
706 let maybe_mutation = self
707 .object_manager()
708 .apply_transaction(transaction, &journal_checkpoint)
709 .expect("Transactions must not fail in image_builder_mode");
710 if let Some(mutation) = maybe_mutation {
711 assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
712 }
716 self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
717 0
718 } else {
719 self.journal.commit(transaction).await?
720 };
721 self.completed_transactions.add(1);
722
723 callback(journal_offset);
728
729 if let Some(hook) = self.options.post_commit_hook.as_ref() {
730 hook().await;
731 }
732
733 Ok(journal_offset)
734 }
735
736 pub fn lock_manager(&self) -> &LockManager {
737 &self.lock_manager
738 }
739
740 pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
741 if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
742 self.sub_transaction();
743 }
744 if let MetadataReservation::Hold(hold_amount) =
746 std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
747 {
748 let hold = transaction
749 .allocator_reservation
750 .unwrap()
751 .reserve(0)
752 .expect("Zero should always succeed.");
753 hold.add(hold_amount);
754 }
755 self.objects.drop_transaction(transaction);
756 self.lock_manager.drop_transaction(transaction);
757 }
758
759 fn maybe_start_flush_task(&self) {
760 let mut flush_task = self.flush_task.lock();
761 if flush_task.is_none() {
762 let journal = self.journal.clone();
763 *flush_task = Some(fasync::Task::spawn(journal.flush_task()));
764 }
765 }
766
767 async fn do_trim(&self) -> Result<usize, Error> {
769 const MAX_EXTENTS_PER_BATCH: usize = 8;
770 const MAX_EXTENT_SIZE: usize = 256 * 1024;
771 let mut offset = 0;
772 let mut bytes_trimmed = 0;
773 loop {
774 if self.closed.load(Ordering::Relaxed) {
775 info!("Filesystem is closed, nothing to trim");
776 return Ok(bytes_trimmed);
777 }
778 let allocator = self.allocator();
779 let trimmable_extents =
780 allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
781 for device_range in trimmable_extents.extents() {
782 self.device.trim(device_range.clone()).await?;
783 bytes_trimmed += device_range.length()? as usize;
784 }
785 if let Some(device_range) = trimmable_extents.extents().last() {
786 offset = device_range.end;
787 } else {
788 break;
789 }
790 }
791 Ok(bytes_trimmed)
792 }
793
794 fn start_trim_task(
795 self: &Arc<Self>,
796 delay: std::time::Duration,
797 interval: std::time::Duration,
798 ) {
799 if !self.device.supports_trim() {
800 info!("Device does not support trim; not scheduling trimming");
801 return;
802 }
803 let this = self.clone();
804 let mut next_timer = delay;
805 *self.trim_task.lock() = Some(fasync::Task::spawn(async move {
806 loop {
807 let shutdown_listener = this.shutdown_event.listen();
808 if this.closed.load(Ordering::SeqCst) {
814 return;
815 }
816 futures::select!(
817 () = fasync::Timer::new(next_timer.clone()).fuse() => {},
818 () = shutdown_listener.fuse() => return,
819 );
820 let start_time = std::time::Instant::now();
821 let res = this.do_trim().await;
822 let duration = std::time::Instant::now() - start_time;
823 next_timer = interval.clone();
824 match res {
825 Ok(bytes_trimmed) => info!(
826 "Trimmed {bytes_trimmed} bytes in {duration:?}. Next trim in \
827 {next_timer:?}",
828 ),
829 Err(e) => error!(e:?; "Failed to trim"),
830 }
831 }
832 }));
833 }
834
835 pub(crate) async fn reservation_for_transaction<'a>(
836 self: &Arc<Self>,
837 options: transaction::Options<'a>,
838 ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
839 if self.options.image_builder_mode.is_some() {
840 return Ok((MetadataReservation::Borrowed, None, None));
843 }
844 if !options.skip_journal_checks {
845 self.maybe_start_flush_task();
846 self.journal.check_journal_space().await?;
847 }
848
849 let mut hold = None;
863 let metadata_reservation = if options.borrow_metadata_space {
864 MetadataReservation::Borrowed
865 } else {
866 match options.allocator_reservation {
867 Some(reservation) => {
868 hold = Some(
869 reservation
870 .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
871 .ok_or(FxfsError::NoSpace)?,
872 );
873 MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
874 }
875 None => {
876 let reservation = self
877 .allocator()
878 .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
879 .ok_or(FxfsError::NoSpace)?;
880 MetadataReservation::Reservation(reservation)
881 }
882 }
883 };
884 Ok((metadata_reservation, options.allocator_reservation, hold))
885 }
886
887 pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
888 if skip_journal_checks {
889 self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
890 } else {
891 let inc = || {
892 let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
893 while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
894 match self.in_flight_transactions.compare_exchange_weak(
895 in_flights,
896 in_flights + 1,
897 Ordering::Relaxed,
898 Ordering::Relaxed,
899 ) {
900 Ok(_) => return true,
901 Err(x) => in_flights = x,
902 }
903 }
904 return false;
905 };
906 while !inc() {
907 let listener = self.transaction_limit_event.listen();
908 if inc() {
909 break;
910 }
911 listener.await;
912 }
913 }
914 }
915
916 pub(crate) fn sub_transaction(&self) {
917 let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
918 assert!(old != 0);
919 if old <= MAX_IN_FLIGHT_TRANSACTIONS {
920 self.transaction_limit_event.notify(usize::MAX);
921 }
922 }
923
924 pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
925 let keys = lock_keys![LockKey::truncate(store_id, object_id,)];
926 TruncateGuard(self.lock_manager().write_lock(keys).await)
927 }
928
929 async fn populate_stores_node(&self) -> Result<Inspector, Error> {
930 let inspector = fuchsia_inspect::Inspector::default();
931 let root = inspector.root();
932 root.record_child("__root", |n| self.root_store().record_data(n));
933 let object_manager = self.object_manager();
934 let volume_directory = object_manager.volume_directory();
935 let layer_set = volume_directory.store().tree().layer_set();
936 let mut merger = layer_set.merger();
937 let mut iter = volume_directory.iter(&mut merger).await?;
938 while let Some((name, id, _)) = iter.get() {
939 if let Some(store) = object_manager.store(id) {
940 root.record_child(name.to_string(), |n| store.record_data(n));
941 }
942 iter.advance().await?;
943 }
944 Ok(inspector)
945 }
946}
947
/// Guard held for the lifetime of a transaction; the owned form wraps a read lock on
/// `LockKey::Filesystem` (see `FxFilesystem::txn_guard`).
pub enum TxnGuard<'a> {
    /// Borrows a guard that the caller already holds.
    Borrowed(&'a TxnGuard<'a>),
    /// Owns the read lock.
    Owned(ReadGuard<'static>),
}
952
953impl TxnGuard<'_> {
954 pub fn fs(&self) -> &Arc<FxFilesystem> {
955 match self {
956 TxnGuard::Borrowed(b) => b.fs(),
957 TxnGuard::Owned(o) => o.fs().unwrap(),
958 }
959 }
960}
961
/// Guard returned by `FxFilesystem::truncate_guard`; holds the truncate write lock for an object
/// until dropped.
// The wrapped guard is never read; it exists only to keep the lock held.
#[allow(dead_code)]
pub struct TruncateGuard<'a>(WriteGuard<'a>);
965
966pub async fn mkfs(device: DeviceHolder) -> Result<DeviceHolder, Error> {
968 let fs = FxFilesystem::new_empty(device).await?;
969 fs.close().await?;
970 Ok(fs.take_device().await)
971}
972
973pub async fn mkfs_with_volume(
977 device: DeviceHolder,
978 volume_name: &str,
979 crypt: Option<Arc<dyn Crypt>>,
980) -> Result<DeviceHolder, Error> {
981 let fs = FxFilesystem::new_empty(device).await?;
982 {
983 let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
986 root_volume
987 .new_volume(
988 volume_name,
989 NewChildStoreOptions {
990 options: StoreOptions { crypt, ..StoreOptions::default() },
991 ..Default::default()
992 },
993 )
994 .await
995 .expect("Create volume failed");
996 }
997 fs.close().await?;
998 Ok(fs.take_device().await)
999}
1000
/// State behind the post-commit hook installed by
/// `FxFilesystemBuilder::fsck_after_every_transaction`.
struct FsckAfterEveryTransaction {
    // Set once by `FxFilesystemBuilder::open` after the filesystem has been formatted or
    // replayed; until then `run` skips fsck.
    fs: OnceCell<Weak<FxFilesystem>>,
    // The post-commit hook that was configured before this one replaced it; still invoked.
    old_hook: PostCommitHook,
}
1005
1006impl FsckAfterEveryTransaction {
1007 fn new(old_hook: PostCommitHook) -> Arc<Self> {
1008 Arc::new(Self { fs: OnceCell::new(), old_hook })
1009 }
1010
1011 async fn run(self: Arc<Self>) {
1012 if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
1013 let options = FsckOptions {
1014 fail_on_warning: true,
1015 no_lock: true,
1016 quiet: true,
1017 ..Default::default()
1018 };
1019 fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
1020 let object_manager = fs.object_manager();
1021 for store in object_manager.unlocked_stores() {
1022 let store_id = store.store_object_id();
1023 if !object_manager.is_system_store(store_id) {
1024 fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
1025 .await
1026 .expect("fsck_volume_with_options failed");
1027 }
1028 }
1029 }
1030 if let Some(old_hook) = self.old_hook.as_ref() {
1031 old_hook().await;
1032 }
1033 }
1034}
1035
1036#[cfg(test)]
1037mod tests {
1038 use super::{FxFilesystem, FxFilesystemBuilder, SyncOptions};
1039 use crate::fsck::{fsck, fsck_volume};
1040 use crate::log::*;
1041 use crate::lsm_tree::Operation;
1042 use crate::lsm_tree::types::Item;
1043 use crate::object_handle::{
1044 INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
1045 };
1046 use crate::object_store::directory::{Directory, replace_child};
1047 use crate::object_store::journal::JournalOptions;
1048 use crate::object_store::journal::super_block::SuperBlockInstance;
1049 use crate::object_store::transaction::{LockKey, Options, lock_keys};
1050 use crate::object_store::volume::root_volume;
1051 use crate::object_store::{
1052 HandleOptions, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions,
1053 };
1054 use crate::range::RangeExt;
1055 use fuchsia_async as fasync;
1056 use fuchsia_sync::Mutex;
1057 use futures::future::join_all;
1058 use futures::stream::{FuturesUnordered, TryStreamExt};
1059 use fxfs_insecure_crypto::InsecureCrypt;
1060 use rustc_hash::FxHashMap as HashMap;
1061 use std::ops::Range;
1062 use std::sync::Arc;
1063 use std::sync::atomic::{self, AtomicU32};
1064 use std::time::Duration;
1065 use storage_device::DeviceHolder;
1066 use storage_device::fake_device::{self, FakeDevice};
1067 use test_case::test_case;
1068
    // Block size passed to `FakeDevice::new` by the tests below.
    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1070
    // Creates two files and overwrites the start of each 1500 times from concurrent tasks, then
    // syncs, runs fsck and closes.  NOTE(review): presumably the write volume is what triggers
    // journal compaction; the test itself only asserts that nothing fails.
    #[fuchsia::test(threads = 10)]
    async fn test_compaction() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));

        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        let mut tasks = Vec::new();
        for i in 0..2 {
            // Create file `i` in the root directory.
            let mut transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    Options::default(),
                )
                .await
                .expect("new_transaction failed");
            let handle = root_directory
                .create_child_file(&mut transaction, &format!("{}", i))
                .await
                .expect("create_child_file failed");
            transaction.commit().await.expect("commit failed");
            // Repeatedly overwrite the first bytes of the file in a separate task.
            tasks.push(fasync::Task::spawn(async move {
                const TEST_DATA: &[u8] = b"hello";
                let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
                buf.as_mut_slice().copy_from_slice(TEST_DATA);
                for _ in 0..1500 {
                    handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
                }
            }));
        }
        join_all(tasks).await;
        fs.sync(SyncOptions::default()).await.expect("sync failed");

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("Close failed");
    }
1115
    // Records every object-store and allocator tree mutation made while the filesystem is live,
    // then reopens the device and checks that journal replay produces exactly the same mutations
    // (same operation, key, value and sequence, in the same order) and the same metadata
    // reservation amount.
    #[fuchsia::test(threads = 10)]
    async fn test_replay_is_identical() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        // Close and reopen so that the recording callbacks below are attached at open time.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // Thread-safe log of the mutations applied to one tree.
        struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);

        impl<K: Clone, V: Clone> Mutations<K, V> {
            fn new() -> Self {
                Mutations(Mutex::new(Vec::new()))
            }

            fn push(&self, operation: Operation, item: &Item<K, V>) {
                self.0.lock().push((operation, item.clone()));
            }
        }

        // Opens the filesystem with mutation callbacks installed on the allocator tree and on
        // every store's tree.  `reclaim_size` is set to `u64::MAX`.
        // NOTE(review): presumably this keeps the journal from being reclaimed so that replay
        // sees every mutation -- confirm.
        let open_fs = |device,
                       object_mutations: Arc<Mutex<HashMap<_, _>>>,
                       allocator_mutations: Arc<Mutations<_, _>>| async {
            FxFilesystemBuilder::new()
                .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
                .on_new_allocator(move |allocator| {
                    let allocator_mutations = allocator_mutations.clone();
                    allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        allocator_mutations.push(op, item)
                    })));
                })
                .on_new_store(move |store| {
                    let mutations = Arc::new(Mutations::new());
                    object_mutations.lock().insert(store.store_object_id(), mutations.clone());
                    store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        mutations.push(op, item)
                    })));
                })
                .open(device)
                .await
                .expect("open failed")
        };

        let allocator_mutations = Arc::new(Mutations::new());
        let object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;

        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        // Generate a variety of mutations: create a file...
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    root_directory.object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let object = root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // ...write to it...
        let buf = object.allocate_buffer(10000).await;
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // ...write again at an offset that overlaps the first write...
        object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");

        // ...truncate it...
        object.truncate(3000).await.expect("truncate failed");

        // ...remove it from the directory...
        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![
                    LockKey::object(root_store.store_object_id(), root_directory.object_id()),
                    LockKey::object(root_store.store_object_id(), object.object_id()),
                ],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        replace_child(&mut transaction, None, (&root_directory, "test"))
            .await
            .expect("replace_child failed");

        transaction.commit().await.expect("commit failed");

        // ...and finally tombstone it.
        root_store
            .tombstone_object(object.object_id(), Options::default())
            .await
            .expect("tombstone failed");

        fs.close().await.expect("close failed");

        // Remember the reservation so that it can be compared after replay.
        let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();

        let device = fs.take_device().await;
        device.reopen(false);

        // Reopen with fresh recorders; this time the callbacks observe the replayed mutations.
        let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let replayed_allocator_mutations = Arc::new(Mutations::new());
        let fs = open_fs(
            device,
            replayed_object_mutations.clone(),
            replayed_allocator_mutations.clone(),
        )
        .await;

        // Per-store mutations must match exactly.
        let m1 = object_mutations.lock();
        let m2 = replayed_object_mutations.lock();
        assert_eq!(m1.len(), m2.len());
        for (store_id, mutations) in &*m1 {
            let mutations = mutations.0.lock();
            let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
            assert_eq!(mutations.len(), replayed.len());
            for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
                assert_eq!(op1, op2);
                assert_eq!(i1.key, i2.key);
                assert_eq!(i1.value, i2.value);
                assert_eq!(i1.sequence, i2.sequence);
            }
        }

        // Allocator mutations must match exactly too.
        let a1 = allocator_mutations.0.lock();
        let a2 = replayed_allocator_mutations.0.lock();
        assert_eq!(a1.len(), a2.len());
        for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
            assert_eq!(op1, op2);
            assert_eq!(i1.key, i2.key);
            assert_eq!(i1.value, i2.value);
            assert_eq!(i1.sequence, i2.sequence);
        }

        assert_eq!(
            fs.object_manager().metadata_reservation().amount(),
            metadata_reservation_amount
        );
    }
1270
1271 #[fuchsia::test]
1272 async fn test_max_in_flight_transactions() {
1273 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1274 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1275
1276 let transactions = FuturesUnordered::new();
1277 for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
1278 transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
1279 }
1280 let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();
1281
1282 let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
1284 assert!(futures::poll!(&mut fut).is_pending());
1285
1286 transactions.pop();
1288
1289 assert!(futures::poll!(&mut fut).is_ready());
1290 }
1291
1292 #[fuchsia::test(threads = 10)]
1294 async fn test_continuously_trim() {
1295 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1296 let fs = FxFilesystemBuilder::new()
1297 .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1298 .format(true)
1299 .open(device)
1300 .await
1301 .expect("open failed");
1302 fasync::Timer::new(Duration::from_millis(10)).await;
1304
1305 let root_store = fs.root_store();
1308 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1309 .await
1310 .expect("open failed");
1311 for _ in 0..100 {
1312 let mut transaction = fs
1313 .clone()
1314 .new_transaction(
1315 lock_keys![LockKey::object(
1316 root_store.store_object_id(),
1317 root_directory.object_id()
1318 )],
1319 Options::default(),
1320 )
1321 .await
1322 .expect("new_transaction failed");
1323 let object = root_directory
1324 .create_child_file(&mut transaction, "test")
1325 .await
1326 .expect("create_child_file failed");
1327 transaction.commit().await.expect("commit failed");
1328
1329 {
1330 let buf = object.allocate_buffer(1024).await;
1331 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1332 }
1333 std::mem::drop(object);
1334
1335 let mut transaction = root_directory
1336 .acquire_context_for_replace(None, "test", true)
1337 .await
1338 .expect("acquire_context_for_replace failed")
1339 .transaction;
1340 replace_child(&mut transaction, None, (&root_directory, "test"))
1341 .await
1342 .expect("replace_child failed");
1343 transaction.commit().await.expect("commit failed");
1344 }
1345 fs.close().await.expect("close failed");
1346 }
1347
1348 #[test_case(true; "test power fail with barriers")]
1349 #[test_case(false; "test power fail with checksums")]
1350 #[fuchsia::test]
1351 async fn test_power_fail(barriers_enabled: bool) {
1352 for _ in 0..10 {
1355 let (store_id, device, test_file_object_id) = {
1356 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1357 let fs = if barriers_enabled {
1358 FxFilesystemBuilder::new()
1359 .barriers_enabled(true)
1360 .format(true)
1361 .open(device)
1362 .await
1363 .expect("new filesystem failed")
1364 } else {
1365 FxFilesystem::new_empty(device).await.expect("new_empty failed")
1366 };
1367 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1368
1369 fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1370 .await
1371 .expect("sync failed");
1372
1373 let store = root_volume
1374 .new_volume(
1375 "test",
1376 NewChildStoreOptions {
1377 options: StoreOptions {
1378 crypt: Some(Arc::new(InsecureCrypt::new())),
1379 ..StoreOptions::default()
1380 },
1381 ..Default::default()
1382 },
1383 )
1384 .await
1385 .expect("new_volume failed");
1386 let root_directory = Directory::open(&store, store.root_directory_object_id())
1387 .await
1388 .expect("open failed");
1389
1390 async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1392 let fs = store.filesystem();
1393 let root_directory = Directory::open(store, store.root_directory_object_id())
1394 .await
1395 .expect("open failed");
1396 for i in 0..100 {
1397 let mut transaction = fs
1398 .clone()
1399 .new_transaction(
1400 lock_keys![LockKey::object(
1401 store.store_object_id(),
1402 store.root_directory_object_id()
1403 )],
1404 Options::default(),
1405 )
1406 .await
1407 .expect("new_transaction failed");
1408 root_directory
1409 .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1410 .await
1411 .expect("create_child_file failed");
1412 transaction.commit().await.expect("commit failed");
1413 }
1414 }
1415
1416 create_files(&store, "A").await;
1418
1419 let mut transaction = fs
1422 .clone()
1423 .new_transaction(
1424 lock_keys![LockKey::object(
1425 store.store_object_id(),
1426 store.root_directory_object_id()
1427 )],
1428 Options::default(),
1429 )
1430 .await
1431 .expect("new_transaction failed");
1432 let object = root_directory
1433 .create_child_file(&mut transaction, "test")
1434 .await
1435 .expect("create_child_file failed");
1436 transaction.commit().await.expect("commit failed");
1437
1438 let mut transaction =
1439 object.new_transaction().await.expect("new_transaction failed");
1440 let mut buffer = object.allocate_buffer(4096).await;
1441 buffer.as_mut_slice().fill(0xed);
1442 object
1443 .txn_write(&mut transaction, 0, buffer.as_ref())
1444 .await
1445 .expect("txn_write failed");
1446 transaction.commit().await.expect("commit failed");
1447
1448 create_files(&store, "B").await;
1450
1451 fs.sync(SyncOptions::default()).await.expect("sync failed");
1454
1455 fasync::Timer::new(Duration::from_millis(10)).await;
1461
1462 (
1463 store.store_object_id(),
1464 fs.device().snapshot().expect("snapshot failed"),
1465 object.object_id(),
1466 )
1467 };
1468
1469 device
1472 .discard_random_since_last_flush()
1473 .expect("discard_random_since_last_flush failed");
1474
1475 let fs = FxFilesystem::open(device).await.expect("open failed");
1476 fsck(fs.clone()).await.expect("fsck failed");
1477
1478 let mut check_test_file = false;
1479
1480 let object_id = if fs.object_manager().store(store_id).is_some() {
1483 fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
1484 .await
1485 .expect("fsck_volume failed");
1486
1487 let store = root_volume(fs.clone())
1491 .await
1492 .expect("root_volume failed")
1493 .volume(
1494 "test",
1495 StoreOptions {
1496 crypt: Some(Arc::new(InsecureCrypt::new())),
1497 ..StoreOptions::default()
1498 },
1499 )
1500 .await
1501 .expect("volume failed");
1502
1503 let root_directory = Directory::open(&store, store.root_directory_object_id())
1504 .await
1505 .expect("open failed");
1506
1507 let mut transaction = fs
1508 .clone()
1509 .new_transaction(
1510 lock_keys![LockKey::object(
1511 store.store_object_id(),
1512 store.root_directory_object_id()
1513 )],
1514 Options::default(),
1515 )
1516 .await
1517 .expect("new_transaction failed");
1518 let object = root_directory
1519 .create_child_file(&mut transaction, &format!("C"))
1520 .await
1521 .expect("create_child_file failed");
1522 transaction.commit().await.expect("commit failed");
1523
1524 if let Ok(test_file) = ObjectStore::open_object(
1526 &store,
1527 test_file_object_id,
1528 HandleOptions::default(),
1529 None,
1530 )
1531 .await
1532 {
1533 let mut buffer = test_file.allocate_buffer(4096).await;
1535 let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1536 if bytes == 4096 {
1537 let expected = [0xed; 4096];
1538 assert_eq!(buffer.as_slice(), &expected);
1539 } else {
1540 assert_eq!(bytes, 0);
1542 }
1543
1544 let mut transaction =
1546 test_file.new_transaction().await.expect("new_transaction failed");
1547 buffer.as_mut_slice().fill(0x37);
1548 test_file
1549 .txn_write(&mut transaction, 0, buffer.as_ref())
1550 .await
1551 .expect("txn_write failed");
1552 transaction.commit().await.expect("commit failed");
1553 check_test_file = true;
1554 }
1555
1556 object.object_id()
1557 } else {
1558 INVALID_OBJECT_ID
1559 };
1560
1561 fs.close().await.expect("close failed");
1563 let device = fs.take_device().await;
1564 device.reopen(false);
1565
1566 let fs = FxFilesystem::open(device).await.expect("open failed");
1567 fsck(fs.clone()).await.expect("fsck failed");
1568
1569 if object_id != INVALID_OBJECT_ID {
1572 fsck_volume(&fs, store_id, Some(Arc::new(InsecureCrypt::new())))
1573 .await
1574 .expect("fsck_volume failed");
1575
1576 let store = root_volume(fs.clone())
1577 .await
1578 .expect("root_volume failed")
1579 .volume(
1580 "test",
1581 StoreOptions {
1582 crypt: Some(Arc::new(InsecureCrypt::new())),
1583 ..StoreOptions::default()
1584 },
1585 )
1586 .await
1587 .expect("volume failed");
1588 ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1590 .await
1591 .expect("open_object failed");
1592
1593 if check_test_file {
1595 info!("Checking test file for modification");
1596 let test_file = ObjectStore::open_object(
1597 &store,
1598 test_file_object_id,
1599 HandleOptions::default(),
1600 None,
1601 )
1602 .await
1603 .expect("open_object failed");
1604 let mut buffer = test_file.allocate_buffer(4096).await;
1605 assert_eq!(
1606 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1607 4096
1608 );
1609 let expected = [0x37; 4096];
1610 assert_eq!(buffer.as_slice(), &expected);
1611 }
1612 }
1613
1614 fs.close().await.expect("close failed");
1615 }
1616 }
1617
1618 #[fuchsia::test]
1619 async fn test_barrier_not_emitted_when_transaction_has_no_data() {
1620 let barrier_count = Arc::new(AtomicU32::new(0));
1621
1622 struct Observer(Arc<AtomicU32>);
1623
1624 impl fake_device::Observer for Observer {
1625 fn barrier(&self) {
1626 self.0.fetch_add(1, atomic::Ordering::Relaxed);
1627 }
1628 }
1629
1630 let mut fake_device = FakeDevice::new(8192, 4096);
1631 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1632 let device = DeviceHolder::new(fake_device);
1633 let fs = FxFilesystemBuilder::new()
1634 .barriers_enabled(true)
1635 .format(true)
1636 .open(device)
1637 .await
1638 .expect("new filesystem failed");
1639
1640 {
1641 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1642 root_vol
1643 .new_volume(
1644 "test",
1645 NewChildStoreOptions {
1646 options: StoreOptions {
1647 crypt: Some(Arc::new(InsecureCrypt::new())),
1648 ..StoreOptions::default()
1649 },
1650 ..NewChildStoreOptions::default()
1651 },
1652 )
1653 .await
1654 .expect("there is no test volume");
1655 fs.close().await.expect("close failed");
1656 }
1657 let device = fs.take_device().await;
1660 device.reopen(false);
1661 let fs = FxFilesystemBuilder::new()
1662 .barriers_enabled(true)
1663 .open(device)
1664 .await
1665 .expect("new filesystem failed");
1666 let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1667
1668 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1669 let store = root_vol
1670 .volume(
1671 "test",
1672 StoreOptions {
1673 crypt: Some(Arc::new(InsecureCrypt::new())),
1674 ..StoreOptions::default()
1675 },
1676 )
1677 .await
1678 .expect("there is no test volume");
1679
1680 let fs = store.filesystem();
1682 let root_directory =
1683 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1684 for i in 0..100 {
1685 let mut transaction = fs
1686 .clone()
1687 .new_transaction(
1688 lock_keys![LockKey::object(
1689 store.store_object_id(),
1690 store.root_directory_object_id()
1691 )],
1692 Options::default(),
1693 )
1694 .await
1695 .expect("new_transaction failed");
1696 root_directory
1697 .create_child_file(&mut transaction, &format!("A {i}"))
1698 .await
1699 .expect("create_child_file failed");
1700 transaction.commit().await.expect("commit failed");
1701 }
1702
1703 fs.close().await.expect("close failed");
1705 assert_eq!(expected_barrier_count, barrier_count.load(atomic::Ordering::Relaxed));
1707 }
1708
1709 #[fuchsia::test]
1710 async fn test_barrier_emitted_when_transaction_includes_data() {
1711 let barrier_count = Arc::new(AtomicU32::new(0));
1712
1713 struct Observer(Arc<AtomicU32>);
1714
1715 impl fake_device::Observer for Observer {
1716 fn barrier(&self) {
1717 self.0.fetch_add(1, atomic::Ordering::Relaxed);
1718 }
1719 }
1720
1721 let mut fake_device = FakeDevice::new(8192, 4096);
1722 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1723 let device = DeviceHolder::new(fake_device);
1724 let fs = FxFilesystemBuilder::new()
1725 .barriers_enabled(true)
1726 .format(true)
1727 .open(device)
1728 .await
1729 .expect("new filesystem failed");
1730
1731 {
1732 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1733 root_vol
1734 .new_volume(
1735 "test",
1736 NewChildStoreOptions {
1737 options: StoreOptions {
1738 crypt: Some(Arc::new(InsecureCrypt::new())),
1739 ..StoreOptions::default()
1740 },
1741 ..NewChildStoreOptions::default()
1742 },
1743 )
1744 .await
1745 .expect("there is no test volume");
1746 fs.close().await.expect("close failed");
1747 }
1748 let device = fs.take_device().await;
1751 device.reopen(false);
1752 let fs = FxFilesystemBuilder::new()
1753 .barriers_enabled(true)
1754 .open(device)
1755 .await
1756 .expect("new filesystem failed");
1757 let expected_barrier_count = barrier_count.load(atomic::Ordering::Relaxed);
1758
1759 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1760 let store = root_vol
1761 .volume(
1762 "test",
1763 StoreOptions {
1764 crypt: Some(Arc::new(InsecureCrypt::new())),
1765 ..StoreOptions::default()
1766 },
1767 )
1768 .await
1769 .expect("there is no test volume");
1770
1771 let fs: Arc<FxFilesystem> = store.filesystem();
1773 let root_directory =
1774 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1775
1776 let mut transaction = fs
1777 .clone()
1778 .new_transaction(
1779 lock_keys![LockKey::object(
1780 store.store_object_id(),
1781 store.root_directory_object_id()
1782 )],
1783 Options::default(),
1784 )
1785 .await
1786 .expect("new_transaction failed");
1787 let object = root_directory
1788 .create_child_file(&mut transaction, "test")
1789 .await
1790 .expect("create_child_file failed");
1791 transaction.commit().await.expect("commit failed");
1792
1793 let mut transaction = object.new_transaction().await.expect("new_transaction failed");
1794 let mut buffer = object.allocate_buffer(4096).await;
1795 buffer.as_mut_slice().fill(0xed);
1796 object.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
1797 transaction.commit().await.expect("commit failed");
1798
1799 fs.close().await.expect("close failed");
1801 assert!(expected_barrier_count < barrier_count.load(atomic::Ordering::Relaxed));
1803 }
1804
1805 #[fuchsia::test]
1806 async fn test_image_builder_mode_no_early_writes() {
1807 const BLOCK_SIZE: u32 = 4096;
1808 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
1809 device.reopen(true);
1810 let fs = FxFilesystemBuilder::new()
1811 .format(true)
1812 .image_builder_mode(Some(SuperBlockInstance::A))
1813 .open(device)
1814 .await
1815 .expect("open failed");
1816 fs.close().await.expect("closed");
1819 }
1820
1821 #[fuchsia::test]
1822 async fn test_image_builder_mode() {
1823 const BLOCK_SIZE: u32 = 4096;
1824 const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
1825 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
1826
1827 {
1829 let mut write_buf =
1830 device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
1831 write_buf.as_mut_slice().fill(0xf0);
1832 device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
1833 }
1834
1835 device.reopen(true);
1836
1837 let device = {
1838 let fs = FxFilesystemBuilder::new()
1839 .format(true)
1840 .image_builder_mode(Some(SuperBlockInstance::B))
1841 .open(device)
1842 .await
1843 .expect("open failed");
1844 {
1845 let root_store = fs.root_store();
1846 let root_directory =
1847 Directory::open(&root_store, root_store.root_directory_object_id())
1848 .await
1849 .expect("open failed");
1850 let handle;
1852 {
1853 let mut transaction = fs
1854 .clone()
1855 .new_transaction(
1856 lock_keys![LockKey::object(
1857 root_directory.store().store_object_id(),
1858 root_directory.object_id()
1859 )],
1860 Options::default(),
1861 )
1862 .await
1863 .expect("new transaction");
1864 handle = root_directory
1865 .create_child_file(&mut transaction, "test")
1866 .await
1867 .expect("create file");
1868 handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
1869 transaction.commit().await.expect("commit");
1870 }
1871 }
1872 fs.device().reopen(false);
1873 fs.finalize().await.expect("finalize");
1874 fs.close().await.expect("close");
1875 fs.take_device().await
1876 };
1877 device.reopen(false);
1878 let fs = FxFilesystem::open(device).await.expect("open failed");
1879 fsck(fs.clone()).await.expect("fsck failed");
1880
1881 let root_store = fs.root_store();
1883 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1884 .await
1885 .expect("open failed");
1886 let (object_id, descriptor, _) =
1887 root_directory.lookup("test").await.expect("lookup failed").unwrap();
1888 assert_eq!(descriptor, ObjectDescriptor::File);
1889 let test_file =
1890 ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
1891 .await
1892 .expect("open failed");
1893 let mut read_buf =
1894 test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
1895 test_file.read(0, read_buf.as_mut()).await.expect("read failed");
1896 assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
1897 fs.close().await.expect("closed");
1898 }
1899}