1use crate::errors::FxfsError;
6use crate::fsck::{FsckOptions, fsck_volume_with_options, fsck_with_options};
7use crate::log::*;
8use crate::metrics;
9use crate::object_store::allocator::{Allocator, Hold, Reservation};
10use crate::object_store::directory::Directory;
11use crate::object_store::graveyard::Graveyard;
12use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
13use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
14use crate::object_store::object_manager::ObjectManager;
15use crate::object_store::transaction::{
16 self, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation, ReadGuard,
17 TRANSACTION_METADATA_MAX_AMOUNT, Transaction, WriteGuard, lock_keys,
18};
19use crate::object_store::volume::{VOLUMES_DIRECTORY, root_volume};
20use crate::object_store::{NewChildStoreOptions, ObjectStore, StoreOptions};
21use crate::range::RangeExt;
22use crate::serialized_types::{LATEST_VERSION, Version};
23use anyhow::{Context, Error, anyhow, bail};
24use async_trait::async_trait;
25use event_listener::Event;
26use fuchsia_async as fasync;
27use fuchsia_async::condition::Condition;
28use fuchsia_inspect::{Inspector, LazyNode, NumericProperty as _, UintProperty};
29use fuchsia_sync::Mutex;
30use futures::{FutureExt, Stream};
31use fxfs_crypto::Crypt;
32use fxfs_trace::{TraceFutureExt, trace_future_args};
33use static_assertions::const_assert;
34use std::pin::pin;
35use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
36use std::sync::{Arc, OnceLock, Weak};
37use std::task::Poll;
38use std::time::{Duration, Instant};
39use storage_device::{Device, DeviceHolder};
40
41pub const MIN_BLOCK_SIZE: u64 = 4096;
42pub const MAX_BLOCK_SIZE: u64 = u16::MAX as u64 + 1;
43
44pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
49const_assert!(9223372036854771712 == MAX_FILE_SIZE);
50
51use futures::stream::StreamExt;
52
53const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;
55
56const TRIM_AFTER_BOOT_TIMER: Duration = Duration::from_secs(60 * 60);
60
61const TRIM_INTERVAL_TIMER: Duration = Duration::from_secs(60 * 60 * 24);
63
64const CLEAN_TRANSFER_BUFFER_INTERVAL: Duration = Duration::from_secs(60);
67
68#[cfg(target_os = "fuchsia")]
69pub type WakeLease = zx::NullableHandle;
70
71#[cfg(not(target_os = "fuchsia"))]
72pub type WakeLease = fasync::emulated_handle::Handle;
73
74pub trait PowerManager: Send + Sync {
75 fn watch_battery(self: Arc<Self>) -> futures::stream::BoxStream<'static, (bool, WakeLease)>;
77}
78
79pub struct Info {
81 pub total_bytes: u64,
82 pub used_bytes: u64,
83}
84
85pub type PostCommitHook =
86 Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;
87
88pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
89
90pub struct Options {
91 pub read_only: bool,
93
94 pub roll_metadata_key_byte_count: u64,
98
99 pub pre_commit_hook: PreCommitHook,
102
103 pub post_commit_hook: PostCommitHook,
106
107 pub skip_initial_reap: bool,
110
111 pub trim_config: Option<(Duration, Duration)>,
116
117 pub image_builder_mode: Option<SuperBlockInstance>,
120
121 pub inline_crypto_enabled: bool,
128
129 pub barriers_enabled: bool,
134
135 pub power_manager: Option<Arc<dyn PowerManager>>,
137
138 pub trim_charger_wait: Duration,
140}
141
142impl Default for Options {
143 fn default() -> Self {
144 Options {
145 roll_metadata_key_byte_count: 128 * 1024 * 1024,
146 read_only: false,
147 pre_commit_hook: None,
148 post_commit_hook: None,
149 skip_initial_reap: false,
150 trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
151 image_builder_mode: None,
152 inline_crypto_enabled: false,
153 barriers_enabled: false,
154 power_manager: None,
155 trim_charger_wait: Duration::from_secs(10),
156 }
157 }
158}
159
160pub struct ApplyContext<'a, 'b> {
162 pub mode: ApplyMode<'a, 'b>,
164
165 pub checkpoint: JournalCheckpoint,
167}
168
169pub enum ApplyMode<'a, 'b> {
172 Replay,
173 Live(&'a Transaction<'b>),
174}
175
176impl ApplyMode<'_, '_> {
177 pub fn is_replay(&self) -> bool {
178 matches!(self, ApplyMode::Replay)
179 }
180
181 pub fn is_live(&self) -> bool {
182 matches!(self, ApplyMode::Live(_))
183 }
184}
185
186#[async_trait]
189pub trait JournalingObject: Send + Sync {
190 fn apply_mutation(
194 &self,
195 mutation: Mutation,
196 context: &ApplyContext<'_, '_>,
197 assoc_obj: AssocObj<'_>,
198 ) -> Result<(), Error>;
199
200 fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);
202
203 async fn flush(&self) -> Result<Version, Error>;
207
208 fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
211 writer.write(mutation.clone());
212 }
213}
214
215#[derive(Default)]
216pub struct SyncOptions<'a> {
217 pub flush_device: bool,
224
225 pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
228}
229
230pub struct OpenFxFilesystem(Arc<FxFilesystem>);
231
232impl OpenFxFilesystem {
233 pub async fn take_device(self) -> DeviceHolder {
236 let fut = self.device.take_when_dropped();
237 std::mem::drop(self);
238 debug_assert_not_too_long!(fut)
239 }
240}
241
242impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
243 fn from(fs: Arc<FxFilesystem>) -> Self {
244 Self(fs)
245 }
246}
247
248impl Drop for OpenFxFilesystem {
249 fn drop(&mut self) {
250 if self.options.image_builder_mode.is_some()
251 && self.journal().image_builder_mode().is_some()
252 {
253 error!("OpenFxFilesystem in image_builder_mode dropped without calling close().");
254 }
255 if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
256 error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
257 }
258 }
259}
260
261impl std::ops::Deref for OpenFxFilesystem {
262 type Target = Arc<FxFilesystem>;
263
264 fn deref(&self) -> &Self::Target {
265 &self.0
266 }
267}
268
269pub struct FxFilesystemBuilder {
270 format: bool,
271 trace: bool,
272 options: Options,
273 journal_options: JournalOptions,
274 on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
275 on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
276 fsck_after_every_transaction: bool,
277}
278
279impl FxFilesystemBuilder {
280 pub fn new() -> Self {
281 Self {
282 format: false,
283 trace: false,
284 options: Options::default(),
285 journal_options: JournalOptions::default(),
286 on_new_allocator: None,
287 on_new_store: None,
288 fsck_after_every_transaction: false,
289 }
290 }
291
292 pub fn format(mut self, format: bool) -> Self {
294 self.format = format;
295 self
296 }
297
298 pub fn trace(mut self, trace: bool) -> Self {
300 self.trace = trace;
301 self
302 }
303
304 pub fn read_only(mut self, read_only: bool) -> Self {
307 self.options.read_only = read_only;
308 self
309 }
310
311 pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
318 self.options.image_builder_mode = mode;
319 self
320 }
321
322 pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
324 self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
325 self
326 }
327
328 pub fn pre_commit_hook(
330 mut self,
331 hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
332 ) -> Self {
333 self.options.pre_commit_hook = Some(Box::new(hook));
334 self
335 }
336
337 pub fn post_commit_hook(
340 mut self,
341 hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
342 ) -> Self {
343 self.options.post_commit_hook = Some(Box::new(hook));
344 self
345 }
346
347 pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
350 self.options.skip_initial_reap = skip_initial_reap;
351 self
352 }
353
354 pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
356 self.journal_options = journal_options;
357 self
358 }
359
360 pub fn on_new_allocator(
362 mut self,
363 on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
364 ) -> Self {
365 self.on_new_allocator = Some(Box::new(on_new_allocator));
366 self
367 }
368
369 pub fn on_new_store(
371 mut self,
372 on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
373 ) -> Self {
374 self.on_new_store = Some(Box::new(on_new_store));
375 self
376 }
377
378 pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
380 self.fsck_after_every_transaction = fsck_after_every_transaction;
381 self
382 }
383
384 pub fn trim_config(mut self, delay_and_interval: Option<(Duration, Duration)>) -> Self {
385 self.options.trim_config = delay_and_interval;
386 self
387 }
388
389 pub fn power_manager(mut self, power_manager: Arc<dyn PowerManager>) -> Self {
390 self.options.power_manager = Some(power_manager);
391 self
392 }
393
394 pub fn trim_charger_wait(mut self, wait: Duration) -> Self {
395 self.options.trim_charger_wait = wait;
396 self
397 }
398
399 pub fn inline_crypto_enabled(mut self, inline_crypto_enabled: bool) -> Self {
401 self.options.inline_crypto_enabled = inline_crypto_enabled;
402 self
403 }
404
405 pub fn barriers_enabled(mut self, barriers_enabled: bool) -> Self {
408 self.options.barriers_enabled = barriers_enabled;
409 self.journal_options.barriers_enabled = barriers_enabled;
410 self
411 }
412
413 pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
415 let read_only = self.options.read_only;
416 if self.format && read_only {
417 bail!("Cannot initialize a filesystem as read-only");
418 }
419
420 if self.options.inline_crypto_enabled && !self.options.barriers_enabled {
422 bail!("A filesystem using inline encryption requires barriers");
423 }
424
425 let objects = Arc::new(ObjectManager::new(self.on_new_store));
426 let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));
427
428 let image_builder_mode = self.options.image_builder_mode;
429
430 let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
431 assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
432 assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");
433
434 let mut fsck_after_every_transaction = None;
435 let mut filesystem_options = self.options;
436 if self.fsck_after_every_transaction {
437 let instance =
438 FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
439 fsck_after_every_transaction = Some(instance.clone());
440 filesystem_options.post_commit_hook =
441 Some(Box::new(move || Box::pin(instance.clone().run())));
442 }
443
444 if !read_only && !self.format {
445 device.flush().await.context("Device flush failed")?;
448 }
449
450 let filesystem = Arc::new_cyclic(|weak: &Weak<FxFilesystem>| {
451 let weak = weak.clone();
452 FxFilesystem {
453 device,
454 block_size,
455 objects: objects.clone(),
456 journal,
457 commit_mutex: futures::lock::Mutex::new(()),
458 lock_manager: LockManager::new(),
459 flush_task: Mutex::new(None),
460 background_tasks: fasync::Scope::new(),
461 closed: AtomicBool::new(true),
462 trace: self.trace,
463 graveyard: Graveyard::new(objects.clone()),
464 completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
465 options: filesystem_options,
466 in_flight_transactions: AtomicU64::new(0),
467 transaction_limit_event: Event::new(),
468 _stores_node: metrics::register_fs(move || {
469 let weak = weak.clone();
470 Box::pin(async move {
471 if let Some(fs) = weak.upgrade() {
472 fs.populate_stores_node().await
473 } else {
474 Err(anyhow!("Filesystem has been dropped"))
475 }
476 })
477 }),
478 }
479 });
480
481 filesystem.journal().set_image_builder_mode(image_builder_mode);
482
483 filesystem.journal.set_trace(self.trace);
484 if self.format {
485 filesystem.journal.init_empty(filesystem.clone()).await?;
486 if image_builder_mode.is_none() {
487 filesystem.journal.init_superblocks().await?;
490
491 filesystem.graveyard.clone().reap_async();
493 }
494
495 let root_store = filesystem.root_store();
497 root_store.set_trace(self.trace);
498 let root_directory =
499 Directory::open(&root_store, root_store.root_directory_object_id())
500 .await
501 .context("Unable to open root volume directory")?;
502 let mut transaction = filesystem
503 .clone()
504 .new_transaction(
505 lock_keys![LockKey::object(
506 root_store.store_object_id(),
507 root_directory.object_id()
508 )],
509 transaction::Options::default(),
510 )
511 .await?;
512 let volume_directory =
513 root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
514 transaction.commit().await?;
515 objects.set_volume_directory(volume_directory);
516 } else {
517 filesystem
518 .journal
519 .replay(filesystem.clone(), self.on_new_allocator)
520 .await
521 .context("Journal replay failed")?;
522 filesystem.root_store().set_trace(self.trace);
523
524 if !read_only {
525 for store in objects.unlocked_stores() {
529 filesystem.graveyard.initial_reap(&store).await?;
530 }
531 }
532 }
533
534 if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
536 fsck_after_every_transaction
537 .fs
538 .set(Arc::downgrade(&filesystem))
539 .unwrap_or_else(|_| unreachable!());
540 }
541
542 filesystem.closed.store(false, Ordering::SeqCst);
543
544 if !read_only && image_builder_mode.is_none() {
545 filesystem.graveyard.clone().reap_async();
547
548 if filesystem.options.trim_config.is_some() {
549 filesystem.start_trim_task();
550 }
551 filesystem.start_clean_transfer_buffer_task();
552 }
553
554 Ok(filesystem.into())
555 }
556}
557
558pub struct FxFilesystem {
559 block_size: u64,
560 objects: Arc<ObjectManager>,
561 journal: Arc<Journal>,
562 commit_mutex: futures::lock::Mutex<()>,
563 lock_manager: LockManager,
564 flush_task: Mutex<Option<fasync::Task<()>>>,
565 background_tasks: fasync::Scope,
566 closed: AtomicBool,
567 trace: bool,
569 graveyard: Arc<Graveyard>,
570 completed_transactions: UintProperty,
571 options: Options,
572
573 in_flight_transactions: AtomicU64,
575
576 transaction_limit_event: Event,
579
580 device: DeviceHolder,
583
584 _stores_node: LazyNode,
586}
587
588#[fxfs_trace::trace]
589impl FxFilesystem {
590 pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
591 FxFilesystemBuilder::new().format(true).open(device).await
592 }
593
594 pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
595 FxFilesystemBuilder::new().open(device).await
596 }
597
598 pub fn root_parent_store(&self) -> Arc<ObjectStore> {
599 self.objects.root_parent_store()
600 }
601
602 pub async fn close(&self) -> Result<(), Error> {
603 if self.journal().image_builder_mode().is_some() {
604 self.journal().allocate_journal().await?;
605 self.journal().set_image_builder_mode(None);
606 self.journal().force_compact().await?;
607 }
608 assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
609 debug_assert_not_too_long!(self.graveyard.wait_for_reap());
610 debug_assert_not_too_long!(self.background_tasks.clone().cancel());
611 self.journal.stop_compactions().await;
612 let sync_status =
613 if self.journal().image_builder_mode().is_some() || self.options().read_only {
614 Ok(None)
615 } else {
616 self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await
617 };
618 match &sync_status {
619 Ok(None) => {}
620 Ok(checkpoint) => info!(
621 "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
622 reservation_required={}, borrowed={})",
623 checkpoint.as_ref().unwrap().0.file_offset,
624 self.object_manager().metadata_reservation(),
625 self.object_manager().required_reservation(),
626 self.object_manager().borrowed_metadata_space(),
627 ),
628 Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
629 }
630 self.journal.terminate();
631 let flush_task = self.flush_task.lock().take();
632 if let Some(task) = flush_task {
633 debug_assert_not_too_long!(task);
634 }
635 self.device().close().await.context("Failed to close device")?;
638 sync_status.map(|_| ())
639 }
640
641 pub fn device(&self) -> Arc<dyn Device> {
642 Arc::clone(&self.device)
643 }
644
645 pub fn root_store(&self) -> Arc<ObjectStore> {
646 self.objects.root_store()
647 }
648
649 pub fn allocator(&self) -> Arc<Allocator> {
650 self.objects.allocator()
651 }
652
653 pub fn enable_allocations(&self) {
657 self.allocator().enable_allocations();
658 }
659
660 pub fn object_manager(&self) -> &Arc<ObjectManager> {
661 &self.objects
662 }
663
664 pub fn journal(&self) -> &Arc<Journal> {
665 &self.journal
666 }
667
668 pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
669 self.journal.sync(options).await.map(|_| ())
670 }
671
672 pub fn block_size(&self) -> u64 {
673 self.block_size
674 }
675
676 pub fn get_info(&self) -> Info {
677 Info {
678 total_bytes: self.device.size(),
679 used_bytes: self.object_manager().allocator().get_used_bytes().0,
680 }
681 }
682
683 pub fn super_block_header(&self) -> SuperBlockHeader {
684 self.journal.super_block_header()
685 }
686
687 pub fn graveyard(&self) -> &Arc<Graveyard> {
688 &self.graveyard
689 }
690
691 pub fn trace(&self) -> bool {
692 self.trace
693 }
694
695 pub fn options(&self) -> &Options {
696 &self.options
697 }
698
699 pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
706 TxnGuard::Owned(
707 self.lock_manager
708 .read_lock(lock_keys!(LockKey::Filesystem))
709 .await
710 .into_owned(self.clone()),
711 )
712 }
713
714 pub async fn new_transaction<'a>(
715 self: Arc<Self>,
716 locks: LockKeys,
717 options: transaction::Options<'a>,
718 ) -> Result<Transaction<'a>, Error> {
719 let guard = if let Some(guard) = options.txn_guard.as_ref() {
720 TxnGuard::Borrowed(guard)
721 } else {
722 self.txn_guard().await
723 };
724 Transaction::new(guard, options, locks).await
725 }
726
727 #[trace]
728 pub async fn commit_transaction(
729 &self,
730 transaction: &mut Transaction<'_>,
731 callback: &mut (dyn FnMut(u64) + Send),
732 ) -> Result<u64, Error> {
733 if let Some(hook) = self.options.pre_commit_hook.as_ref() {
734 hook(transaction)?;
735 }
736 debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
737 self.maybe_start_flush_task();
738 let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
739 let journal_offset = if self.journal().image_builder_mode().is_some() {
740 let journal_checkpoint =
741 JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
742 let maybe_mutation = self
743 .object_manager()
744 .apply_transaction(transaction, &journal_checkpoint)
745 .expect("Transactions must not fail in image_builder_mode");
746 if let Some(mutation) = maybe_mutation {
747 assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
748 }
752 self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
753 0
754 } else {
755 self.journal.commit(transaction).await?
756 };
757 self.completed_transactions.add(1);
758
759 callback(journal_offset);
764
765 if let Some(hook) = self.options.post_commit_hook.as_ref() {
766 hook().await;
767 }
768
769 Ok(journal_offset)
770 }
771
772 pub fn lock_manager(&self) -> &LockManager {
773 &self.lock_manager
774 }
775
776 pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
777 if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
778 self.sub_transaction();
779 }
780 if let MetadataReservation::Hold(hold_amount) =
782 std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
783 {
784 let hold = transaction
785 .allocator_reservation
786 .unwrap()
787 .reserve(0)
788 .expect("Zero should always succeed.");
789 hold.add(hold_amount);
790 }
791 self.objects.drop_transaction(transaction);
792 self.lock_manager.drop_transaction(transaction);
793 }
794
795 fn maybe_start_flush_task(&self) {
796 if self.journal.image_builder_mode().is_some() {
797 return;
798 }
799 let mut flush_task = self.flush_task.lock();
800 if flush_task.is_none() {
801 let journal = self.journal.clone();
802 *flush_task = Some(fasync::Task::spawn(
803 journal.flush_task().trace(trace_future_args!("Journal::flush_task")),
804 ));
805 }
806 }
807
808 fn start_trim_task(self: &Arc<Self>) {
809 if !self.device.supports_trim() {
810 info!("Device does not support trim; not scheduling trimming");
811 return;
812 }
813 let this = self.clone();
814 self.background_tasks
815 .spawn(this.trim_task().trace(trace_future_args!("Filesystem::trim_task")));
816 }
817
818 async fn trim_task(self: Arc<Self>) {
819 let Some((mut next_timer, _)) = self.options.trim_config else { return };
821 loop {
822 fasync::Timer::new(next_timer.clone()).await;
823
824 let start = Instant::now();
827 let result = if let Some(pm) = &self.options.power_manager {
828 let mut watcher = pm.clone().watch_battery();
829
830 let pauser = Pauser::new(self.options.trim_charger_wait);
832
833 let mut pause_future = pin!(
834 async {
835 let mut wake_lease = WakeLease::invalid();
836 loop {
837 let Some((using_battery, new_lease)) = watcher.next_latest().await
838 else {
839 pauser.set_pause(false);
842 drop(wake_lease); return;
844 };
845
846 pauser.set_pause(using_battery);
848
849 if using_battery {
852 wake_lease = WakeLease::invalid();
853 } else if !new_lease.is_invalid() {
854 wake_lease = new_lease;
855 }
856 }
857 }
858 .fuse()
859 );
860
861 let mut do_trim = pin!(self.do_trim(Some(&pauser)).fuse());
862
863 loop {
864 futures::select! {
865 _ = pause_future => {}
866 result = do_trim => break result,
867 }
868 }
869
870 } else {
873 self.do_trim(None).await
874 };
875
876 let duration = start.elapsed();
877 match result {
878 Ok(bytes_trimmed) => info!(
879 "Trimmed {bytes_trimmed} bytes in {duration:?}. Next trim in \
880 {next_timer:?}",
881 ),
882 Err(error) => error!(error:?; "Failed to trim"),
883 }
884
885 let Some((_, interval)) = self.options.trim_config else { return };
886 next_timer = interval;
887 if next_timer.is_zero() {
888 fasync::yield_now().await;
889 }
890 }
891 }
892
893 async fn do_trim(&self, pauser: Option<&Pauser>) -> Result<usize, Error> {
895 const MAX_EXTENTS_PER_BATCH: usize = 8;
896 const MAX_EXTENT_SIZE: usize = 256 * 1024;
897 let mut offset = 0;
898 let mut bytes_trimmed = 0;
899 loop {
900 let allocator = self.allocator();
901 if let Some(pauser) = pauser {
902 pauser.maybe_pause().await;
903 }
904 let trimmable_extents =
905 allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
906 for device_range in trimmable_extents.extents() {
907 self.device.trim(device_range.clone()).await?;
908 bytes_trimmed += device_range.length()? as usize;
909 }
910 if let Some(device_range) = trimmable_extents.extents().last() {
911 offset = device_range.end;
912 } else {
913 break;
914 }
915 }
916 Ok(bytes_trimmed)
917 }
918
919 fn start_clean_transfer_buffer_task(self: &Arc<Self>) {
920 let this = self.clone();
921 self.background_tasks.spawn(
922 async move {
923 loop {
924 fasync::Timer::new(CLEAN_TRANSFER_BUFFER_INTERVAL).await;
925 this.device().clean_transfer_buffer();
926 }
927 }
928 .trace(trace_future_args!("Filesystem::clean_transfer_buffer_task")),
929 );
930 }
931
932 pub(crate) async fn reservation_for_transaction<'a>(
933 self: &Arc<Self>,
934 options: transaction::Options<'a>,
935 ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
936 if self.options.image_builder_mode.is_some() {
937 return Ok((MetadataReservation::Borrowed, None, None));
940 }
941 if !options.skip_journal_checks {
942 self.maybe_start_flush_task();
943 self.journal.check_journal_space().await?;
944 }
945
946 let mut hold = None;
960 let metadata_reservation = if options.borrow_metadata_space {
961 MetadataReservation::Borrowed
962 } else {
963 match options.allocator_reservation {
964 Some(reservation) => {
965 hold = Some(
966 reservation
967 .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
968 .ok_or(FxfsError::NoSpace)?,
969 );
970 MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
971 }
972 None => {
973 let reservation = self
974 .allocator()
975 .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
976 .ok_or(FxfsError::NoSpace)?;
977 MetadataReservation::Reservation(reservation)
978 }
979 }
980 };
981 Ok((metadata_reservation, options.allocator_reservation, hold))
982 }
983
984 pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
985 if skip_journal_checks {
986 self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
987 } else {
988 let inc = || {
989 let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
990 while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
991 match self.in_flight_transactions.compare_exchange_weak(
992 in_flights,
993 in_flights + 1,
994 Ordering::Relaxed,
995 Ordering::Relaxed,
996 ) {
997 Ok(_) => return true,
998 Err(x) => in_flights = x,
999 }
1000 }
1001 return false;
1002 };
1003 while !inc() {
1004 let listener = self.transaction_limit_event.listen();
1005 if inc() {
1006 break;
1007 }
1008 listener.await;
1009 }
1010 }
1011 }
1012
1013 pub(crate) fn sub_transaction(&self) {
1014 let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
1015 assert!(old != 0);
1016 if old <= MAX_IN_FLIGHT_TRANSACTIONS {
1017 self.transaction_limit_event.notify(usize::MAX);
1018 }
1019 }
1020
1021 pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
1022 let keys = lock_keys![LockKey::truncate(store_id, object_id,)];
1023 TruncateGuard(self.lock_manager().write_lock(keys).await)
1024 }
1025
1026 async fn populate_stores_node(&self) -> Result<Inspector, Error> {
1027 let inspector = fuchsia_inspect::Inspector::default();
1028 let root = inspector.root();
1029 root.record_child("__root", |n| self.root_store().record_data(n));
1030 root.record_child("__root_parent", |n| self.root_parent_store().record_data(n));
1031 let object_manager = self.object_manager();
1032 let volume_directory = object_manager.volume_directory();
1033 let layer_set = volume_directory.store().tree().layer_set();
1034 let mut merger = layer_set.merger();
1035 let mut iter = volume_directory.iter(&mut merger).await?;
1036 while let Some((name, id, _)) = iter.get() {
1037 if let Some(store) = object_manager.store(id) {
1038 root.record_child(name.to_string(), |n| store.record_data(n));
1039 }
1040 iter.advance().await?;
1041 }
1042 Ok(inspector)
1043 }
1044}
1045
1046pub enum TxnGuard<'a> {
1047 Borrowed(&'a TxnGuard<'a>),
1048 Owned(ReadGuard<'static>),
1049}
1050
1051impl TxnGuard<'_> {
1052 pub fn fs(&self) -> &Arc<FxFilesystem> {
1053 match self {
1054 TxnGuard::Borrowed(b) => b.fs(),
1055 TxnGuard::Owned(o) => o.fs().unwrap(),
1056 }
1057 }
1058}
1059
1060#[allow(dead_code)]
1062pub struct TruncateGuard<'a>(WriteGuard<'a>);
1063
1064pub async fn mkfs(device: DeviceHolder) -> Result<DeviceHolder, Error> {
1066 let fs = FxFilesystem::new_empty(device).await?;
1067 fs.close().await?;
1068 Ok(fs.take_device().await)
1069}
1070
1071pub async fn mkfs_with_volume(
1075 device: DeviceHolder,
1076 volume_name: &str,
1077 crypt: Option<Arc<dyn Crypt>>,
1078) -> Result<DeviceHolder, Error> {
1079 let fs = FxFilesystem::new_empty(device).await?;
1080 {
1081 let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
1084 root_volume
1085 .new_volume(
1086 volume_name,
1087 NewChildStoreOptions {
1088 options: StoreOptions { crypt, ..StoreOptions::default() },
1089 ..Default::default()
1090 },
1091 )
1092 .await
1093 .expect("Create volume failed");
1094 }
1095 fs.close().await?;
1096 Ok(fs.take_device().await)
1097}
1098
1099struct FsckAfterEveryTransaction {
1100 fs: OnceLock<Weak<FxFilesystem>>,
1101 old_hook: PostCommitHook,
1102}
1103
1104impl FsckAfterEveryTransaction {
1105 fn new(old_hook: PostCommitHook) -> Arc<Self> {
1106 Arc::new(Self { fs: OnceLock::new(), old_hook })
1107 }
1108
1109 async fn run(self: Arc<Self>) {
1110 if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
1111 let options = FsckOptions {
1112 fail_on_warning: true,
1113 no_lock: true,
1114 quiet: true,
1115 ..Default::default()
1116 };
1117 fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
1118 let object_manager = fs.object_manager();
1119 for store in object_manager.unlocked_stores() {
1120 let store_id = store.store_object_id();
1121 if !object_manager.is_system_store(store_id) {
1122 fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
1123 .await
1124 .expect("fsck_volume_with_options failed");
1125 }
1126 }
1127 }
1128 if let Some(old_hook) = self.old_hook.as_ref() {
1129 old_hook().await;
1130 }
1131 }
1132}
1133
1134struct Pauser {
1135 pause: Condition<bool>,
1136 bounce_delay: Duration,
1137}
1138
1139impl Pauser {
1140 fn new(bounce_delay: Duration) -> Self {
1142 Self { pause: Condition::new(true), bounce_delay }
1143 }
1144
1145 async fn maybe_pause(&self) {
1146 loop {
1147 if !*self.pause.lock() {
1148 return;
1149 }
1150 self.pause.when(|p| if *p { Poll::Pending } else { Poll::Ready(()) }).await;
1151 fasync::Timer::new(self.bounce_delay).await;
1152 }
1153 }
1154
1155 fn set_pause(&self, v: bool) {
1156 let mut guard = self.pause.lock();
1157 if *guard == v {
1158 return;
1159 }
1160 *guard = v;
1161 for waker in guard.drain_wakers() {
1162 waker.wake();
1163 }
1164 }
1165}
1166
1167trait NextLatest: Stream + Unpin {
1168 async fn next_latest(&mut self) -> Option<Self::Item> {
1170 let Some(mut next) = self.next().await else { return None };
1171
1172 loop {
1174 match self.next().now_or_never() {
1175 None => return Some(next),
1176 Some(None) => return None,
1177 Some(Some(n)) => next = n,
1178 }
1179 }
1180 }
1181}
1182
1183impl<T: ?Sized + Unpin> NextLatest for T where T: Stream {}
1184
1185#[cfg(test)]
1186mod tests {
1187 use super::{FxFilesystem, FxFilesystemBuilder, FxfsError, SyncOptions};
1188 use crate::fsck::{fsck, fsck_volume};
1189 use crate::log::*;
1190 use crate::lsm_tree::Operation;
1191 use crate::lsm_tree::types::Item;
1192 use crate::object_handle::{
1193 INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
1194 };
1195 use crate::object_store::directory::{Directory, replace_child};
1196 use crate::object_store::journal::JournalOptions;
1197 use crate::object_store::journal::super_block::SuperBlockInstance;
1198 use crate::object_store::transaction::{LockKey, Options, lock_keys};
1199 use crate::object_store::volume::root_volume;
1200 use crate::object_store::{
1201 HandleOptions, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions,
1202 };
1203 use crate::range::RangeExt;
1204 use fuchsia_async as fasync;
1205 use fuchsia_sync::Mutex;
1206 use futures::future::join_all;
1207 use futures::stream::{FuturesUnordered, TryStreamExt};
1208 use fxfs_insecure_crypto::new_insecure_crypt;
1209 use rustc_hash::FxHashMap as HashMap;
1210 use std::ops::Range;
1211 use std::sync::Arc;
1212 use std::sync::atomic::{AtomicU32, Ordering};
1213 use std::time::Duration;
1214 use storage_device::DeviceHolder;
1215 use storage_device::fake_device::{self, FakeDevice};
1216 use test_case::test_case;
1217
1218 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1219
1220 #[fuchsia::test(threads = 10)]
1221 async fn test_compaction() {
1222 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1223
1224 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1226 let root_store = fs.root_store();
1227 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1228 .await
1229 .expect("open failed");
1230
1231 let mut tasks = Vec::new();
1232 for i in 0..2 {
1233 let mut transaction = fs
1234 .clone()
1235 .new_transaction(
1236 lock_keys![LockKey::object(
1237 root_store.store_object_id(),
1238 root_directory.object_id()
1239 )],
1240 Options::default(),
1241 )
1242 .await
1243 .expect("new_transaction failed");
1244 let handle = root_directory
1245 .create_child_file(&mut transaction, &format!("{}", i))
1246 .await
1247 .expect("create_child_file failed");
1248 transaction.commit().await.expect("commit failed");
1249 tasks.push(fasync::Task::spawn(async move {
1250 const TEST_DATA: &[u8] = b"hello";
1251 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1252 buf.as_mut_slice().copy_from_slice(TEST_DATA);
1253 for _ in 0..1500 {
1254 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1255 }
1256 }));
1257 }
1258 join_all(tasks).await;
1259 fs.sync(SyncOptions::default()).await.expect("sync failed");
1260
1261 fsck(fs.clone()).await.expect("fsck failed");
1262 fs.close().await.expect("Close failed");
1263 }
1264
1265 #[fuchsia::test]
1266 async fn test_enable_allocations() {
1267 {
1269 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1270 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1271 fs.enable_allocations();
1272 let root_store = fs.root_store();
1273 let root_directory =
1274 Directory::open(&root_store, root_store.root_directory_object_id())
1275 .await
1276 .expect("open failed");
1277 let mut transaction = fs
1278 .clone()
1279 .new_transaction(
1280 lock_keys![LockKey::object(
1281 root_store.store_object_id(),
1282 root_directory.object_id()
1283 )],
1284 Options::default(),
1285 )
1286 .await
1287 .expect("new_transaction failed");
1288 root_directory
1289 .create_child_file(&mut transaction, "test")
1290 .await
1291 .expect("create_child_file failed");
1292 transaction.commit().await.expect("commit failed");
1293 fs.close().await.expect("close failed");
1294 }
1295
1296 {
1298 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1299 let fs = FxFilesystemBuilder::new()
1300 .format(true)
1301 .image_builder_mode(Some(SuperBlockInstance::A))
1302 .open(device)
1303 .await
1304 .expect("open failed");
1305 let root_store = fs.root_store();
1306 let root_directory =
1307 Directory::open(&root_store, root_store.root_directory_object_id())
1308 .await
1309 .expect("open failed");
1310
1311 let mut transaction = fs
1312 .clone()
1313 .new_transaction(
1314 lock_keys![LockKey::object(
1315 root_store.store_object_id(),
1316 root_directory.object_id()
1317 )],
1318 Options::default(),
1319 )
1320 .await
1321 .expect("new_transaction failed");
1322 let handle = root_directory
1323 .create_child_file(&mut transaction, "test_fail")
1324 .await
1325 .expect("create_child_file failed");
1326 transaction.commit().await.expect("commit failed");
1327
1328 assert!(
1330 FxfsError::Unavailable
1331 .matches(&handle.allocate(0..4096).await.expect_err("allocate should fail"))
1332 );
1333
1334 fs.enable_allocations();
1336 handle.allocate(0..4096).await.expect("allocate should work after enable_allocations");
1337
1338 fs.close().await.expect("close failed");
1342 }
1343 }
1347
1348 #[fuchsia::test(threads = 10)]
1349 async fn test_replay_is_identical() {
1350 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1351 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1352
1353 fs.close().await.expect("close failed");
1356 let device = fs.take_device().await;
1357 device.reopen(false);
1358
1359 struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);
1360
1361 impl<K: Clone, V: Clone> Mutations<K, V> {
1362 fn new() -> Self {
1363 Mutations(Mutex::new(Vec::new()))
1364 }
1365
1366 fn push(&self, operation: Operation, item: &Item<K, V>) {
1367 self.0.lock().push((operation, item.clone()));
1368 }
1369 }
1370
1371 let open_fs = |device,
1372 object_mutations: Arc<Mutex<HashMap<_, _>>>,
1373 allocator_mutations: Arc<Mutations<_, _>>| async {
1374 FxFilesystemBuilder::new()
1375 .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
1376 .on_new_allocator(move |allocator| {
1377 let allocator_mutations = allocator_mutations.clone();
1378 allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1379 allocator_mutations.push(op, item)
1380 })));
1381 })
1382 .on_new_store(move |store| {
1383 let mutations = Arc::new(Mutations::new());
1384 object_mutations.lock().insert(store.store_object_id(), mutations.clone());
1385 store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1386 mutations.push(op, item)
1387 })));
1388 })
1389 .open(device)
1390 .await
1391 .expect("open failed")
1392 };
1393
1394 let allocator_mutations = Arc::new(Mutations::new());
1395 let object_mutations = Arc::new(Mutex::new(HashMap::default()));
1396 let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;
1397
1398 let root_store = fs.root_store();
1399 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1400 .await
1401 .expect("open failed");
1402
1403 let mut transaction = fs
1404 .clone()
1405 .new_transaction(
1406 lock_keys![LockKey::object(
1407 root_store.store_object_id(),
1408 root_directory.object_id()
1409 )],
1410 Options::default(),
1411 )
1412 .await
1413 .expect("new_transaction failed");
1414 let object = root_directory
1415 .create_child_file(&mut transaction, "test")
1416 .await
1417 .expect("create_child_file failed");
1418 transaction.commit().await.expect("commit failed");
1419
1420 let buf = object.allocate_buffer(10000).await;
1422 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1423
1424 object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");
1426
1427 object.truncate(3000).await.expect("truncate failed");
1429
1430 let mut transaction = fs
1432 .clone()
1433 .new_transaction(
1434 lock_keys![
1435 LockKey::object(root_store.store_object_id(), root_directory.object_id()),
1436 LockKey::object(root_store.store_object_id(), object.object_id()),
1437 ],
1438 Options::default(),
1439 )
1440 .await
1441 .expect("new_transaction failed");
1442
1443 replace_child(&mut transaction, None, (&root_directory, "test"))
1444 .await
1445 .expect("replace_child failed");
1446
1447 transaction.commit().await.expect("commit failed");
1448
1449 root_store
1451 .tombstone_object(object.object_id(), Options::default())
1452 .await
1453 .expect("tombstone failed");
1454
1455 fs.close().await.expect("close failed");
1457
1458 let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();
1459
1460 let device = fs.take_device().await;
1461 device.reopen(false);
1462
1463 let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
1464 let replayed_allocator_mutations = Arc::new(Mutations::new());
1465 let fs = open_fs(
1466 device,
1467 replayed_object_mutations.clone(),
1468 replayed_allocator_mutations.clone(),
1469 )
1470 .await;
1471
1472 let m1 = object_mutations.lock();
1473 let m2 = replayed_object_mutations.lock();
1474 assert_eq!(m1.len(), m2.len());
1475 for (store_id, mutations) in &*m1 {
1476 let mutations = mutations.0.lock();
1477 let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
1478 assert_eq!(mutations.len(), replayed.len());
1479 for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
1480 assert_eq!(op1, op2);
1481 assert_eq!(i1.key, i2.key);
1482 assert_eq!(i1.value, i2.value);
1483 assert_eq!(i1.sequence, i2.sequence);
1484 }
1485 }
1486
1487 let a1 = allocator_mutations.0.lock();
1488 let a2 = replayed_allocator_mutations.0.lock();
1489 assert_eq!(a1.len(), a2.len());
1490 for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
1491 assert_eq!(op1, op2);
1492 assert_eq!(i1.key, i2.key);
1493 assert_eq!(i1.value, i2.value);
1494 assert_eq!(i1.sequence, i2.sequence);
1495 }
1496
1497 assert_eq!(
1498 fs.object_manager().metadata_reservation().amount(),
1499 metadata_reservation_amount
1500 );
1501 }
1502
1503 #[fuchsia::test]
1504 async fn test_max_in_flight_transactions() {
1505 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1506 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1507
1508 let transactions = FuturesUnordered::new();
1509 for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
1510 transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
1511 }
1512 let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();
1513
1514 let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
1516 assert!(futures::poll!(&mut fut).is_pending());
1517
1518 transactions.pop();
1520
1521 assert!(futures::poll!(&mut fut).is_ready());
1522 }
1523
1524 #[fuchsia::test(threads = 10)]
1526 async fn test_continuously_trim() {
1527 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1528 let fs = FxFilesystemBuilder::new()
1529 .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1530 .format(true)
1531 .open(device)
1532 .await
1533 .expect("open failed");
1534 fasync::Timer::new(Duration::from_millis(10)).await;
1536
1537 let root_store = fs.root_store();
1540 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1541 .await
1542 .expect("open failed");
1543 for _ in 0..100 {
1544 let mut transaction = fs
1545 .clone()
1546 .new_transaction(
1547 lock_keys![LockKey::object(
1548 root_store.store_object_id(),
1549 root_directory.object_id()
1550 )],
1551 Options::default(),
1552 )
1553 .await
1554 .expect("new_transaction failed");
1555 let object = root_directory
1556 .create_child_file(&mut transaction, "test")
1557 .await
1558 .expect("create_child_file failed");
1559 transaction.commit().await.expect("commit failed");
1560
1561 {
1562 let buf = object.allocate_buffer(1024).await;
1563 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1564 }
1565 std::mem::drop(object);
1566
1567 let mut transaction = root_directory
1568 .acquire_context_for_replace(None, "test", true)
1569 .await
1570 .expect("acquire_context_for_replace failed")
1571 .transaction;
1572 replace_child(&mut transaction, None, (&root_directory, "test"))
1573 .await
1574 .expect("replace_child failed");
1575 transaction.commit().await.expect("commit failed");
1576 }
1577 fs.close().await.expect("close failed");
1578 }
1579
1580 #[test_case(true; "test power fail with barriers")]
1581 #[test_case(false; "test power fail with checksums")]
1582 #[fuchsia::test]
1583 async fn test_power_fail(barriers_enabled: bool) {
1584 for _ in 0..10 {
1587 let (store_id, device, test_file_object_id) = {
1588 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1589 let fs = if barriers_enabled {
1590 FxFilesystemBuilder::new()
1591 .barriers_enabled(true)
1592 .format(true)
1593 .open(device)
1594 .await
1595 .expect("new filesystem failed")
1596 } else {
1597 FxFilesystem::new_empty(device).await.expect("new_empty failed")
1598 };
1599 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1600
1601 fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1602 .await
1603 .expect("sync failed");
1604
1605 let store = root_volume
1606 .new_volume(
1607 "test",
1608 NewChildStoreOptions {
1609 options: StoreOptions {
1610 crypt: Some(Arc::new(new_insecure_crypt())),
1611 ..StoreOptions::default()
1612 },
1613 ..Default::default()
1614 },
1615 )
1616 .await
1617 .expect("new_volume failed");
1618 let root_directory = Directory::open(&store, store.root_directory_object_id())
1619 .await
1620 .expect("open failed");
1621
1622 async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1624 let fs = store.filesystem();
1625 let root_directory = Directory::open(store, store.root_directory_object_id())
1626 .await
1627 .expect("open failed");
1628 for i in 0..100 {
1629 let mut transaction = fs
1630 .clone()
1631 .new_transaction(
1632 lock_keys![LockKey::object(
1633 store.store_object_id(),
1634 store.root_directory_object_id()
1635 )],
1636 Options::default(),
1637 )
1638 .await
1639 .expect("new_transaction failed");
1640 root_directory
1641 .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1642 .await
1643 .expect("create_child_file failed");
1644 transaction.commit().await.expect("commit failed");
1645 }
1646 }
1647
1648 create_files(&store, "A").await;
1650
1651 let mut transaction = fs
1654 .clone()
1655 .new_transaction(
1656 lock_keys![LockKey::object(
1657 store.store_object_id(),
1658 store.root_directory_object_id()
1659 )],
1660 Options::default(),
1661 )
1662 .await
1663 .expect("new_transaction failed");
1664 let object = root_directory
1665 .create_child_file(&mut transaction, "test")
1666 .await
1667 .expect("create_child_file failed");
1668 transaction.commit().await.expect("commit failed");
1669
1670 let mut transaction =
1671 object.new_transaction().await.expect("new_transaction failed");
1672 let mut buffer = object.allocate_buffer(4096).await;
1673 buffer.as_mut_slice().fill(0xed);
1674 object
1675 .txn_write(&mut transaction, 0, buffer.as_ref())
1676 .await
1677 .expect("txn_write failed");
1678 transaction.commit().await.expect("commit failed");
1679
1680 create_files(&store, "B").await;
1682
1683 fs.sync(SyncOptions::default()).await.expect("sync failed");
1686
1687 fasync::Timer::new(Duration::from_millis(10)).await;
1693
1694 (
1695 store.store_object_id(),
1696 fs.device().snapshot().expect("snapshot failed"),
1697 object.object_id(),
1698 )
1699 };
1700
1701 device
1704 .discard_random_since_last_flush()
1705 .expect("discard_random_since_last_flush failed");
1706
1707 let fs = FxFilesystem::open(device).await.expect("open failed");
1708 fsck(fs.clone()).await.expect("fsck failed");
1709
1710 let mut check_test_file = false;
1711
1712 let object_id = if fs.object_manager().store(store_id).is_some() {
1715 fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1716 .await
1717 .expect("fsck_volume failed");
1718
1719 let store = root_volume(fs.clone())
1723 .await
1724 .expect("root_volume failed")
1725 .volume(
1726 "test",
1727 StoreOptions {
1728 crypt: Some(Arc::new(new_insecure_crypt())),
1729 ..StoreOptions::default()
1730 },
1731 )
1732 .await
1733 .expect("volume failed");
1734
1735 let root_directory = Directory::open(&store, store.root_directory_object_id())
1736 .await
1737 .expect("open failed");
1738
1739 let mut transaction = fs
1740 .clone()
1741 .new_transaction(
1742 lock_keys![LockKey::object(
1743 store.store_object_id(),
1744 store.root_directory_object_id()
1745 )],
1746 Options::default(),
1747 )
1748 .await
1749 .expect("new_transaction failed");
1750 let object = root_directory
1751 .create_child_file(&mut transaction, &format!("C"))
1752 .await
1753 .expect("create_child_file failed");
1754 transaction.commit().await.expect("commit failed");
1755
1756 if let Ok(test_file) = ObjectStore::open_object(
1758 &store,
1759 test_file_object_id,
1760 HandleOptions::default(),
1761 None,
1762 )
1763 .await
1764 {
1765 let mut buffer = test_file.allocate_buffer(4096).await;
1767 let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1768 if bytes == 4096 {
1769 let expected = [0xed; 4096];
1770 assert_eq!(buffer.as_slice(), &expected);
1771 } else {
1772 assert_eq!(bytes, 0);
1774 }
1775
1776 let mut transaction =
1778 test_file.new_transaction().await.expect("new_transaction failed");
1779 buffer.as_mut_slice().fill(0x37);
1780 test_file
1781 .txn_write(&mut transaction, 0, buffer.as_ref())
1782 .await
1783 .expect("txn_write failed");
1784 transaction.commit().await.expect("commit failed");
1785 check_test_file = true;
1786 }
1787
1788 object.object_id()
1789 } else {
1790 INVALID_OBJECT_ID
1791 };
1792
1793 fs.close().await.expect("close failed");
1795 let device = fs.take_device().await;
1796 device.reopen(false);
1797
1798 let fs = FxFilesystem::open(device).await.expect("open failed");
1799 fsck(fs.clone()).await.expect("fsck failed");
1800
1801 if object_id != INVALID_OBJECT_ID {
1804 fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1805 .await
1806 .expect("fsck_volume failed");
1807
1808 let store = root_volume(fs.clone())
1809 .await
1810 .expect("root_volume failed")
1811 .volume(
1812 "test",
1813 StoreOptions {
1814 crypt: Some(Arc::new(new_insecure_crypt())),
1815 ..StoreOptions::default()
1816 },
1817 )
1818 .await
1819 .expect("volume failed");
1820 ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1822 .await
1823 .expect("open_object failed");
1824
1825 if check_test_file {
1827 info!("Checking test file for modification");
1828 let test_file = ObjectStore::open_object(
1829 &store,
1830 test_file_object_id,
1831 HandleOptions::default(),
1832 None,
1833 )
1834 .await
1835 .expect("open_object failed");
1836 let mut buffer = test_file.allocate_buffer(4096).await;
1837 assert_eq!(
1838 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1839 4096
1840 );
1841 let expected = [0x37; 4096];
1842 assert_eq!(buffer.as_slice(), &expected);
1843 }
1844 }
1845
1846 fs.close().await.expect("close failed");
1847 }
1848 }
1849
1850 #[fuchsia::test]
1851 async fn test_barrier_not_emitted_when_transaction_has_no_data() {
1852 let barrier_count = Arc::new(AtomicU32::new(0));
1853
1854 struct Observer(Arc<AtomicU32>);
1855
1856 impl fake_device::Observer for Observer {
1857 fn barrier(&self) {
1858 self.0.fetch_add(1, Ordering::Relaxed);
1859 }
1860 }
1861
1862 let mut fake_device = FakeDevice::new(8192, 4096);
1863 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1864 let device = DeviceHolder::new(fake_device);
1865 let fs = FxFilesystemBuilder::new()
1866 .barriers_enabled(true)
1867 .format(true)
1868 .open(device)
1869 .await
1870 .expect("new filesystem failed");
1871
1872 {
1873 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1874 root_vol
1875 .new_volume(
1876 "test",
1877 NewChildStoreOptions {
1878 options: StoreOptions {
1879 crypt: Some(Arc::new(new_insecure_crypt())),
1880 ..StoreOptions::default()
1881 },
1882 ..NewChildStoreOptions::default()
1883 },
1884 )
1885 .await
1886 .expect("there is no test volume");
1887 fs.close().await.expect("close failed");
1888 }
1889 let device = fs.take_device().await;
1892 device.reopen(false);
1893 let fs = FxFilesystemBuilder::new()
1894 .barriers_enabled(true)
1895 .open(device)
1896 .await
1897 .expect("new filesystem failed");
1898 let expected_barrier_count = barrier_count.load(Ordering::Relaxed);
1899
1900 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1901 let store = root_vol
1902 .volume(
1903 "test",
1904 StoreOptions {
1905 crypt: Some(Arc::new(new_insecure_crypt())),
1906 ..StoreOptions::default()
1907 },
1908 )
1909 .await
1910 .expect("there is no test volume");
1911
1912 let fs = store.filesystem();
1914 let root_directory =
1915 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1916 for i in 0..100 {
1917 let mut transaction = fs
1918 .clone()
1919 .new_transaction(
1920 lock_keys![LockKey::object(
1921 store.store_object_id(),
1922 store.root_directory_object_id()
1923 )],
1924 Options::default(),
1925 )
1926 .await
1927 .expect("new_transaction failed");
1928 root_directory
1929 .create_child_file(&mut transaction, &format!("A {i}"))
1930 .await
1931 .expect("create_child_file failed");
1932 transaction.commit().await.expect("commit failed");
1933 }
1934
1935 fs.close().await.expect("close failed");
1937 assert_eq!(expected_barrier_count, barrier_count.load(Ordering::Relaxed));
1939 }
1940
1941 #[fuchsia::test]
1942 async fn test_barrier_emitted_when_transaction_includes_data() {
1943 let barrier_count = Arc::new(AtomicU32::new(0));
1944
1945 struct Observer(Arc<AtomicU32>);
1946
1947 impl fake_device::Observer for Observer {
1948 fn barrier(&self) {
1949 self.0.fetch_add(1, Ordering::Relaxed);
1950 }
1951 }
1952
1953 let mut fake_device = FakeDevice::new(8192, 4096);
1954 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1955 let device = DeviceHolder::new(fake_device);
1956 let fs = FxFilesystemBuilder::new()
1957 .barriers_enabled(true)
1958 .format(true)
1959 .open(device)
1960 .await
1961 .expect("new filesystem failed");
1962
1963 {
1964 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1965 root_vol
1966 .new_volume(
1967 "test",
1968 NewChildStoreOptions {
1969 options: StoreOptions {
1970 crypt: Some(Arc::new(new_insecure_crypt())),
1971 ..StoreOptions::default()
1972 },
1973 ..NewChildStoreOptions::default()
1974 },
1975 )
1976 .await
1977 .expect("there is no test volume");
1978 fs.close().await.expect("close failed");
1979 }
1980 let device = fs.take_device().await;
1983 device.reopen(false);
1984 let fs = FxFilesystemBuilder::new()
1985 .barriers_enabled(true)
1986 .open(device)
1987 .await
1988 .expect("new filesystem failed");
1989 let expected_barrier_count = barrier_count.load(Ordering::Relaxed);
1990
1991 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1992 let store = root_vol
1993 .volume(
1994 "test",
1995 StoreOptions {
1996 crypt: Some(Arc::new(new_insecure_crypt())),
1997 ..StoreOptions::default()
1998 },
1999 )
2000 .await
2001 .expect("there is no test volume");
2002
2003 let fs: Arc<FxFilesystem> = store.filesystem();
2005 let root_directory =
2006 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2007
2008 let mut transaction = fs
2009 .clone()
2010 .new_transaction(
2011 lock_keys![LockKey::object(
2012 store.store_object_id(),
2013 store.root_directory_object_id()
2014 )],
2015 Options::default(),
2016 )
2017 .await
2018 .expect("new_transaction failed");
2019 let object = root_directory
2020 .create_child_file(&mut transaction, "test")
2021 .await
2022 .expect("create_child_file failed");
2023 transaction.commit().await.expect("commit failed");
2024
2025 let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2026 let mut buffer = object.allocate_buffer(4096).await;
2027 buffer.as_mut_slice().fill(0xed);
2028 object.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2029 transaction.commit().await.expect("commit failed");
2030
2031 fs.close().await.expect("close failed");
2033 assert!(expected_barrier_count < barrier_count.load(Ordering::Relaxed));
2035 }
2036
2037 #[test_case(true; "fail when original filesystem has barriers enabled")]
2038 #[test_case(false; "fail when original filesystem has barriers disabled")]
2039 #[fuchsia::test]
2040 async fn test_switching_barrier_mode_on_existing_filesystem(original_barrier_mode: bool) {
2041 let crypt = Some(Arc::new(new_insecure_crypt()) as Arc<dyn fxfs_crypto::Crypt>);
2042 let fake_device = FakeDevice::new(8192, 4096);
2043 let device = DeviceHolder::new(fake_device);
2044 let fs: super::OpenFxFilesystem = FxFilesystemBuilder::new()
2045 .barriers_enabled(original_barrier_mode)
2046 .format(true)
2047 .open(device)
2048 .await
2049 .expect("new filesystem failed");
2050
2051 {
2053 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2054 let store = root_vol
2055 .new_volume(
2056 "test",
2057 NewChildStoreOptions {
2058 options: StoreOptions { crypt: crypt.clone(), ..Default::default() },
2059 ..Default::default()
2060 },
2061 )
2062 .await
2063 .expect("creating test volume");
2064 let root_dir = Directory::open(&store, store.root_directory_object_id())
2065 .await
2066 .expect("open failed");
2067 let mut transaction = fs
2068 .clone()
2069 .new_transaction(
2070 lock_keys![LockKey::object(
2071 store.store_object_id(),
2072 store.root_directory_object_id()
2073 )],
2074 Default::default(),
2075 )
2076 .await
2077 .expect("new_transaction failed");
2078 let object = root_dir
2079 .create_child_file(&mut transaction, "file")
2080 .await
2081 .expect("create_child_file failed");
2082 transaction.commit().await.expect("commit failed");
2083 let mut buffer = object.allocate_buffer(4096).await;
2084 buffer.as_mut_slice().fill(0xA7);
2085 let new_size = object.write_or_append(None, buffer.as_ref()).await.unwrap();
2086 assert_eq!(new_size, 4096);
2087 }
2088
2089 fs.close().await.expect("close failed");
2091 let device = fs.take_device().await;
2092 device.reopen(false);
2093 let fs = FxFilesystemBuilder::new()
2094 .barriers_enabled(!original_barrier_mode)
2095 .open(device)
2096 .await
2097 .expect("new filesystem failed");
2098 {
2099 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2100 let store = root_vol
2101 .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
2102 .await
2103 .expect("opening test volume");
2104 let root_dir = Directory::open(&store, store.root_directory_object_id())
2105 .await
2106 .expect("open failed");
2107 let (object_id, _, _) =
2108 root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
2109 let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
2110 .await
2111 .expect("open failed");
2112 let mut buffer = test_file.allocate_buffer(4096).await;
2114 buffer.as_mut_slice().fill(0xA8);
2115 let new_size = test_file.write_or_append(None, buffer.as_ref()).await.unwrap();
2116 assert_eq!(new_size, 8192);
2117 }
2118
2119 fs.close().await.expect("close failed");
2122 let device = fs.take_device().await;
2123 device.reopen(false);
2124 let fs = FxFilesystemBuilder::new()
2125 .barriers_enabled(original_barrier_mode)
2126 .open(device)
2127 .await
2128 .expect("new filesystem failed");
2129 {
2130 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2131 let store = root_vol
2132 .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
2133 .await
2134 .expect("opening test volume");
2135 let root_dir = Directory::open(&store, store.root_directory_object_id())
2136 .await
2137 .expect("open failed");
2138 let (object_id, _, _) =
2139 root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
2140 let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
2141 .await
2142 .expect("open failed");
2143 let mut buffer = test_file.allocate_buffer(8192).await;
2144 assert_eq!(
2145 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
2146 8192,
2147 "short read"
2148 );
2149 assert_eq!(buffer.as_slice()[0..4096], [0xA7; 4096]);
2150 assert_eq!(buffer.as_slice()[4096..8192], [0xA8; 4096]);
2151 }
2152 fs.close().await.expect("close failed");
2153 }
2154
2155 #[fuchsia::test]
2156 async fn test_image_builder_mode_no_early_writes() {
2157 const BLOCK_SIZE: u32 = 4096;
2158 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2159 device.reopen(true);
2160 let fs = FxFilesystemBuilder::new()
2161 .format(true)
2162 .image_builder_mode(Some(SuperBlockInstance::A))
2163 .open(device)
2164 .await
2165 .expect("open failed");
2166 fs.enable_allocations();
2167 fs.device().reopen(false);
2169 fs.close().await.expect("closed");
2170 }
2171
2172 #[fuchsia::test]
2173 async fn test_image_builder_mode() {
2174 const BLOCK_SIZE: u32 = 4096;
2175 const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
2176 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2177
2178 {
2180 let mut write_buf =
2181 device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2182 write_buf.as_mut_slice().fill(0xf0);
2183 device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
2184 }
2185
2186 device.reopen(true);
2187
2188 let device = {
2189 let fs = FxFilesystemBuilder::new()
2190 .format(true)
2191 .image_builder_mode(Some(SuperBlockInstance::B))
2192 .open(device)
2193 .await
2194 .expect("open failed");
2195 fs.enable_allocations();
2196 {
2197 let root_store = fs.root_store();
2198 let root_directory =
2199 Directory::open(&root_store, root_store.root_directory_object_id())
2200 .await
2201 .expect("open failed");
2202 let handle;
2204 {
2205 let mut transaction = fs
2206 .clone()
2207 .new_transaction(
2208 lock_keys![LockKey::object(
2209 root_directory.store().store_object_id(),
2210 root_directory.object_id()
2211 )],
2212 Options::default(),
2213 )
2214 .await
2215 .expect("new transaction");
2216 handle = root_directory
2217 .create_child_file(&mut transaction, "test")
2218 .await
2219 .expect("create file");
2220 handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
2221 transaction.commit().await.expect("commit");
2222 }
2223 }
2224 fs.device().reopen(false);
2225 fs.close().await.expect("close");
2226 fs.take_device().await
2227 };
2228 device.reopen(false);
2229 let fs = FxFilesystem::open(device).await.expect("open failed");
2230 fsck(fs.clone()).await.expect("fsck failed");
2231
2232 let root_store = fs.root_store();
2234 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2235 .await
2236 .expect("open failed");
2237 let (object_id, descriptor, _) =
2238 root_directory.lookup("test").await.expect("lookup failed").unwrap();
2239 assert_eq!(descriptor, ObjectDescriptor::File);
2240 let test_file =
2241 ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
2242 .await
2243 .expect("open failed");
2244 let mut read_buf =
2245 test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2246 test_file.read(0, read_buf.as_mut()).await.expect("read failed");
2247 assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
2248 fs.close().await.expect("closed");
2249 }
2250
2251 #[fuchsia::test]
2252 async fn test_read_only_mount_on_full_filesystem() {
2253 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2254 let fs =
2255 FxFilesystemBuilder::new().format(true).open(device).await.expect("new_empty failed");
2256 let root_store = fs.root_store();
2257 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2258 .await
2259 .expect("open failed");
2260
2261 let mut transaction = fs
2262 .clone()
2263 .new_transaction(
2264 lock_keys![LockKey::object(
2265 root_store.store_object_id(),
2266 root_directory.object_id()
2267 )],
2268 Options::default(),
2269 )
2270 .await
2271 .expect("new_transaction failed");
2272 let handle = root_directory
2273 .create_child_file(&mut transaction, "test")
2274 .await
2275 .expect("create_child_file failed");
2276 transaction.commit().await.expect("commit failed");
2277
2278 let mut buf = handle.allocate_buffer(4096).await;
2279 buf.as_mut_slice().fill(0xaa);
2280 loop {
2281 if handle.write_or_append(None, buf.as_ref()).await.is_err() {
2282 break;
2283 }
2284 }
2285
2286 let max_offset = fs.allocator().maximum_offset();
2287 fs.close().await.expect("Close failed");
2288
2289 let device = fs.take_device().await;
2290 device.reopen(false);
2291 let mut buffer = device
2292 .allocate_buffer(
2293 crate::round::round_up(max_offset, TEST_DEVICE_BLOCK_SIZE).unwrap() as usize
2294 )
2295 .await;
2296 device.read(0, buffer.as_mut()).await.expect("read failed");
2297
2298 let device = DeviceHolder::new(
2299 FakeDevice::from_image(&buffer.as_slice()[..], TEST_DEVICE_BLOCK_SIZE)
2300 .expect("from_image failed"),
2301 );
2302 let fs =
2303 FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
2304 fs.close().await.expect("Close failed");
2305 }
2306
2307 #[test_case(SuperBlockInstance::A; "Superblock instance A")]
2308 #[test_case(SuperBlockInstance::B; "Superblock instance B")]
2309 #[fuchsia::test]
2310 async fn test_image_builder_mode_flush_on_close_sb_a(target_sb: SuperBlockInstance) {
2311 const BLOCK_SIZE: u32 = 4096;
2312 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2313
2314 device.reopen(true);
2316 let fs = FxFilesystemBuilder::new()
2317 .format(true)
2318 .image_builder_mode(Some(target_sb))
2319 .open(device)
2320 .await
2321 .expect("open failed");
2322
2323 fs.enable_allocations();
2324
2325 fs.device().reopen(false);
2327
2328 {
2330 let root_store = fs.root_store();
2331 let root_directory =
2332 Directory::open(&root_store, root_store.root_directory_object_id())
2333 .await
2334 .expect("open failed");
2335
2336 let mut transaction = fs
2337 .clone()
2338 .new_transaction(
2339 lock_keys![LockKey::object(
2340 root_directory.store().store_object_id(),
2341 root_directory.object_id()
2342 )],
2343 Options::default(),
2344 )
2345 .await
2346 .expect("new transaction");
2347 let handle = root_directory
2348 .create_child_file(&mut transaction, "post_finalize_file")
2349 .await
2350 .expect("create file");
2351 transaction.commit().await.expect("commit");
2352
2353 let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
2354 buf.as_mut_slice().fill(0xaa);
2355 handle.write_or_append(None, buf.as_ref()).await.expect("write failed");
2356 }
2357
2358 fs.close().await.expect("close failed");
2360
2361 let other_sb = target_sb.next();
2362
2363 let device = fs.take_device().await;
2365 device.reopen(true); let mut buf = device.allocate_buffer(BLOCK_SIZE as usize).await;
2367
2368 device.read(target_sb.first_extent().start, buf.as_mut()).await.expect("read target_sb");
2369 assert_eq!(&buf.as_slice()[..8], b"FxfsSupr", "target_sb should have magic bytes");
2370
2371 buf.as_mut_slice().fill(0); device.read(other_sb.first_extent().start, buf.as_mut()).await.expect("read other_sb");
2373 assert_eq!(buf.as_slice(), &[0; 4096], "other_sb should be zeroed");
2375 }
2376
2377 #[cfg(target_os = "fuchsia")]
2378 #[fuchsia::test(allow_stalls = false)]
2379 async fn test_trim_with_power_manager() {
2380 use anyhow::Error;
2381 use async_trait::async_trait;
2382 use fuchsia_async::TestExecutor;
2383 use futures::StreamExt;
2384 use zx::HandleBased;
2385
2386 TestExecutor::advance_to(fasync::MonotonicInstant::ZERO).await;
2387
2388 #[derive(Default)]
2389 struct MockPowerManager {
2390 on_battery: Mutex<bool>,
2391 event: event_listener::Event,
2392 wake_lease: Mutex<Option<zx::EventPair>>,
2393 }
2394
2395 impl MockPowerManager {
2396 fn set_on_battery(&self, v: bool) {
2397 *self.on_battery.lock() = v;
2398 self.event.notify(usize::MAX);
2399 }
2400
2401 fn is_lease_held(&self) -> bool {
2402 self.wake_lease.lock().as_ref().is_some_and(|handle| {
2403 handle
2404 .wait_one(
2405 zx::Signals::EVENTPAIR_PEER_CLOSED,
2406 zx::MonotonicInstant::INFINITE_PAST,
2407 )
2408 .is_err()
2409 })
2410 }
2411 }
2412
2413 impl super::PowerManager for MockPowerManager {
2414 fn watch_battery(
2415 self: Arc<Self>,
2416 ) -> futures::stream::BoxStream<'static, (bool, super::WakeLease)> {
2417 futures::stream::unfold(true, move |first| {
2418 let this = self.clone();
2419 async move {
2420 if !first {
2421 this.event.listen().await;
2422 }
2423 let val = *this.on_battery.lock();
2424 let handle = if val {
2425 zx::NullableHandle::invalid()
2426 } else {
2427 let (h1, h2) = zx::EventPair::create();
2428 *this.wake_lease.lock() = Some(h2);
2429 h1.into_handle()
2431 };
2432 Some(((val, handle), false))
2433 }
2434 })
2435 .boxed()
2436 }
2437 }
2438
2439 let trim_count = Arc::new(AtomicU32::new(0));
2440
2441 struct TrimTrackingDevice {
2442 inner: DeviceHolder,
2443 trim_count: Arc<AtomicU32>,
2444 power_manager: Arc<MockPowerManager>,
2445 }
2446
2447 #[async_trait]
2448 impl storage_device::Device for TrimTrackingDevice {
2449 fn allocate_buffer(&self, size: usize) -> storage_device::buffer::BufferFuture<'_> {
2450 self.inner.allocate_buffer(size)
2451 }
2452 fn block_size(&self) -> u32 {
2453 self.inner.block_size()
2454 }
2455 fn block_count(&self) -> u64 {
2456 self.inner.block_count()
2457 }
2458 async fn read_with_opts(
2459 &self,
2460 offset: u64,
2461 buffer: storage_device::buffer::MutableBufferRef<'_>,
2462 opts: storage_device::ReadOptions,
2463 ) -> Result<(), Error> {
2464 self.inner.read_with_opts(offset, buffer, opts).await
2465 }
2466 async fn write_with_opts(
2467 &self,
2468 offset: u64,
2469 buffer: storage_device::buffer::BufferRef<'_>,
2470 opts: storage_device::WriteOptions,
2471 ) -> Result<(), Error> {
2472 self.inner.write_with_opts(offset, buffer, opts).await
2473 }
2474 async fn trim(&self, range: std::ops::Range<u64>) -> Result<(), Error> {
2475 assert!(self.power_manager.is_lease_held());
2476 self.trim_count.fetch_add(1, Ordering::SeqCst);
2477 self.inner.trim(range).await
2478 }
2479 async fn flush(&self) -> Result<(), Error> {
2480 self.inner.flush().await
2481 }
2482 async fn close(&self) -> Result<(), Error> {
2483 self.inner.close().await
2484 }
2485 fn barrier(&self) {
2486 self.inner.barrier()
2487 }
2488 fn supports_trim(&self) -> bool {
2489 true
2490 }
2491 fn is_read_only(&self) -> bool {
2492 self.inner.is_read_only()
2493 }
2494 fn snapshot(&self) -> Result<DeviceHolder, Error> {
2495 Ok(DeviceHolder::new(TrimTrackingDevice {
2496 inner: self.inner.snapshot()?,
2497 trim_count: self.trim_count.clone(),
2498 power_manager: self.power_manager.clone(),
2499 }))
2500 }
2501 fn reopen(&self, read_only: bool) {
2502 self.inner.reopen(read_only)
2503 }
2504 }
2505
2506 let pm = Arc::new(MockPowerManager::default());
2507
2508 pm.set_on_battery(true);
2510
2511 let fake_device = FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE);
2512 let device = DeviceHolder::new(TrimTrackingDevice {
2513 inner: DeviceHolder::new(fake_device),
2514 trim_count: trim_count.clone(),
2515 power_manager: pm.clone(),
2516 });
2517
2518 let fs = FxFilesystemBuilder::new()
2519 .format(true)
2520 .power_manager(pm.clone())
2521 .trim_config(Some((Duration::ZERO, Duration::from_millis(100))))
2522 .trim_charger_wait(Duration::from_millis(10))
2523 .open(device)
2524 .await
2525 .expect("open failed");
2526
2527 TestExecutor::advance_to(fasync::MonotonicInstant::after(
2529 Duration::from_millis(500).into(),
2530 ))
2531 .await;
2532 let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
2533
2534 assert_eq!(trim_count.load(Ordering::SeqCst), 0);
2535
2536 {
2538 let root_store = fs.root_store();
2539 let root_directory =
2540 Directory::open(&root_store, root_store.root_directory_object_id())
2541 .await
2542 .expect("open failed");
2543 let mut transaction = fs
2544 .clone()
2545 .new_transaction(
2546 lock_keys![LockKey::object(
2547 root_store.store_object_id(),
2548 root_directory.object_id()
2549 )],
2550 Options::default(),
2551 )
2552 .await
2553 .expect("new_transaction failed");
2554 let handle = root_directory
2555 .create_child_file(&mut transaction, "test")
2556 .await
2557 .expect("create_child_file failed");
2558 transaction.commit().await.expect("commit failed");
2559 handle.allocate(0..4096).await.expect("allocate failed");
2560 let mut transaction = fs
2562 .clone()
2563 .new_transaction(
2564 lock_keys![
2565 LockKey::object(root_store.store_object_id(), root_directory.object_id()),
2566 LockKey::object(root_store.store_object_id(), handle.object_id()),
2567 ],
2568 Options::default(),
2569 )
2570 .await
2571 .expect("new_transaction failed");
2572 replace_child(&mut transaction, None, (&root_directory, "test"))
2573 .await
2574 .expect("delete failed");
2575 transaction.commit().await.expect("commit failed");
2576 fs.root_store()
2577 .tombstone_object(handle.object_id(), Options::default())
2578 .await
2579 .expect("tombstone failed");
2580 }
2581
2582 pm.set_on_battery(false);
2584
2585 TestExecutor::advance_to(fasync::MonotonicInstant::after(Duration::from_millis(10).into()))
2587 .await;
2588
2589 let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
2590
2591 assert!(trim_count.load(Ordering::SeqCst) > 0);
2592
2593 trim_count.store(0, Ordering::SeqCst);
2595 pm.set_on_battery(true);
2596
2597 TestExecutor::advance_to(fasync::MonotonicInstant::after(
2599 Duration::from_millis(500).into(),
2600 ))
2601 .await;
2602
2603 let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
2604
2605 assert_eq!(trim_count.load(Ordering::SeqCst), 0);
2606
2607 fs.close().await.expect("close failed");
2608 }
2609}