1use crate::errors::FxfsError;
6use crate::fsck::{FsckOptions, fsck_volume_with_options, fsck_with_options};
7use crate::log::*;
8use crate::metrics;
9use crate::object_store::allocator::{Allocator, Hold, Reservation};
10use crate::object_store::directory::Directory;
11use crate::object_store::graveyard::Graveyard;
12use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
13use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
14use crate::object_store::object_manager::ObjectManager;
15use crate::object_store::transaction::{
16 self, AssocObj, LockKey, LockKeys, LockManager, MetadataReservation, Mutation, ReadGuard,
17 TRANSACTION_METADATA_MAX_AMOUNT, Transaction, WriteGuard, lock_keys,
18};
19use crate::object_store::volume::{VOLUMES_DIRECTORY, root_volume};
20use crate::object_store::{NewChildStoreOptions, ObjectStore, StoreOptions};
21use crate::range::RangeExt;
22use crate::serialized_types::{LATEST_VERSION, Version};
23use anyhow::{Context, Error, anyhow, bail};
24use async_trait::async_trait;
25use event_listener::Event;
26use fuchsia_async as fasync;
27use fuchsia_async::condition::Condition;
28use fuchsia_inspect::{Inspector, LazyNode, NumericProperty as _, UintProperty};
29use fuchsia_sync::Mutex;
30use futures::{FutureExt, Stream};
31use fxfs_crypto::Crypt;
32use fxfs_trace::{TraceFutureExt, trace_future_args};
33use static_assertions::const_assert;
34use std::pin::pin;
35use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
36use std::sync::{Arc, OnceLock, Weak};
37use std::task::Poll;
38use std::time::{Duration, Instant};
39use storage_device::{Device, DeviceHolder};
40
41pub const MIN_BLOCK_SIZE: u64 = 4096;
42pub const MAX_BLOCK_SIZE: u64 = u16::MAX as u64 + 1;
43
44pub const MAX_FILE_SIZE: u64 = i64::MAX as u64 - 4095;
49const_assert!(9223372036854771712 == MAX_FILE_SIZE);
50
51use futures::stream::StreamExt;
52
53const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;
55
56const TRIM_AFTER_BOOT_TIMER: Duration = Duration::from_secs(60 * 60);
60
61const TRIM_INTERVAL_TIMER: Duration = Duration::from_secs(60 * 60 * 24);
63
64const CLEAN_TRANSFER_BUFFER_INTERVAL: Duration = Duration::from_secs(60);
67
68#[cfg(target_os = "fuchsia")]
69pub type WakeLease = zx::NullableHandle;
70
71#[cfg(not(target_os = "fuchsia"))]
72pub type WakeLease = fasync::emulated_handle::Handle;
73
74pub trait PowerManager: Send + Sync {
75 fn watch_battery(self: Arc<Self>) -> futures::stream::BoxStream<'static, (bool, WakeLease)>;
77}
78
79pub struct Info {
81 pub total_bytes: u64,
82 pub used_bytes: u64,
83}
84
85pub type PostCommitHook =
86 Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;
87
88pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
89
90pub struct Options {
91 pub read_only: bool,
93
94 pub roll_metadata_key_byte_count: u64,
98
99 pub pre_commit_hook: PreCommitHook,
102
103 pub post_commit_hook: PostCommitHook,
106
107 pub skip_initial_reap: bool,
110
111 pub trim_config: Option<(Duration, Duration)>,
116
117 pub image_builder_mode: Option<SuperBlockInstance>,
120
121 pub inline_crypto_enabled: bool,
128
129 pub barriers_enabled: bool,
134
135 pub power_manager: Option<Arc<dyn PowerManager>>,
137
138 pub trim_charger_wait: Duration,
140}
141
142impl Default for Options {
143 fn default() -> Self {
144 Options {
145 roll_metadata_key_byte_count: 128 * 1024 * 1024,
146 read_only: false,
147 pre_commit_hook: None,
148 post_commit_hook: None,
149 skip_initial_reap: false,
150 trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
151 image_builder_mode: None,
152 inline_crypto_enabled: false,
153 barriers_enabled: false,
154 power_manager: None,
155 trim_charger_wait: Duration::from_secs(10),
156 }
157 }
158}
159
160pub struct ApplyContext<'a, 'b> {
162 pub mode: ApplyMode<'a, 'b>,
164
165 pub checkpoint: JournalCheckpoint,
167}
168
169pub enum ApplyMode<'a, 'b> {
172 Replay,
173 Live(&'a Transaction<'b>),
174}
175
176impl ApplyMode<'_, '_> {
177 pub fn is_replay(&self) -> bool {
178 matches!(self, ApplyMode::Replay)
179 }
180
181 pub fn is_live(&self) -> bool {
182 matches!(self, ApplyMode::Live(_))
183 }
184}
185
186#[async_trait]
189pub trait JournalingObject: Send + Sync {
190 fn apply_mutation(
194 &self,
195 mutation: Mutation,
196 context: &ApplyContext<'_, '_>,
197 assoc_obj: AssocObj<'_>,
198 ) -> Result<(), Error>;
199
200 fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);
202
203 async fn flush(&self) -> Result<Version, Error>;
207
208 fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
211 writer.write(mutation.clone());
212 }
213}
214
215#[derive(Default)]
216pub struct SyncOptions<'a> {
217 pub flush_device: bool,
224
225 pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
228}
229
230pub struct OpenFxFilesystem(Arc<FxFilesystem>);
231
232impl OpenFxFilesystem {
233 pub async fn take_device(self) -> DeviceHolder {
236 let fut = self.device.take_when_dropped();
237 std::mem::drop(self);
238 debug_assert_not_too_long!(fut)
239 }
240}
241
242impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
243 fn from(fs: Arc<FxFilesystem>) -> Self {
244 Self(fs)
245 }
246}
247
248impl Drop for OpenFxFilesystem {
249 fn drop(&mut self) {
250 if self.options.image_builder_mode.is_some()
251 && self.journal().image_builder_mode().is_some()
252 {
253 error!("OpenFxFilesystem in image_builder_mode dropped without calling close().");
254 }
255 if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
256 error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
257 }
258 }
259}
260
261impl std::ops::Deref for OpenFxFilesystem {
262 type Target = Arc<FxFilesystem>;
263
264 fn deref(&self) -> &Self::Target {
265 &self.0
266 }
267}
268
269pub struct FxFilesystemBuilder {
270 format: bool,
271 trace: bool,
272 options: Options,
273 journal_options: JournalOptions,
274 on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
275 on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
276 fsck_after_every_transaction: bool,
277}
278
279impl FxFilesystemBuilder {
280 pub fn new() -> Self {
281 Self {
282 format: false,
283 trace: false,
284 options: Options::default(),
285 journal_options: JournalOptions::default(),
286 on_new_allocator: None,
287 on_new_store: None,
288 fsck_after_every_transaction: false,
289 }
290 }
291
292 pub fn format(mut self, format: bool) -> Self {
294 self.format = format;
295 self
296 }
297
298 pub fn trace(mut self, trace: bool) -> Self {
300 self.trace = trace;
301 self
302 }
303
304 pub fn read_only(mut self, read_only: bool) -> Self {
307 self.options.read_only = read_only;
308 self
309 }
310
311 pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
318 self.options.image_builder_mode = mode;
319 self
320 }
321
322 pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
324 self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
325 self
326 }
327
328 pub fn pre_commit_hook(
330 mut self,
331 hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
332 ) -> Self {
333 self.options.pre_commit_hook = Some(Box::new(hook));
334 self
335 }
336
337 pub fn post_commit_hook(
340 mut self,
341 hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
342 ) -> Self {
343 self.options.post_commit_hook = Some(Box::new(hook));
344 self
345 }
346
347 pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
350 self.options.skip_initial_reap = skip_initial_reap;
351 self
352 }
353
354 pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
356 self.journal_options = journal_options;
357 self
358 }
359
360 pub fn on_new_allocator(
362 mut self,
363 on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
364 ) -> Self {
365 self.on_new_allocator = Some(Box::new(on_new_allocator));
366 self
367 }
368
369 pub fn on_new_store(
371 mut self,
372 on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
373 ) -> Self {
374 self.on_new_store = Some(Box::new(on_new_store));
375 self
376 }
377
378 pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
380 self.fsck_after_every_transaction = fsck_after_every_transaction;
381 self
382 }
383
384 pub fn trim_config(mut self, delay_and_interval: Option<(Duration, Duration)>) -> Self {
385 self.options.trim_config = delay_and_interval;
386 self
387 }
388
389 pub fn power_manager(mut self, power_manager: Arc<dyn PowerManager>) -> Self {
390 self.options.power_manager = Some(power_manager);
391 self
392 }
393
394 pub fn trim_charger_wait(mut self, wait: Duration) -> Self {
395 self.options.trim_charger_wait = wait;
396 self
397 }
398
399 pub fn inline_crypto_enabled(mut self, inline_crypto_enabled: bool) -> Self {
401 self.options.inline_crypto_enabled = inline_crypto_enabled;
402 self
403 }
404
405 pub fn barriers_enabled(mut self, barriers_enabled: bool) -> Self {
408 self.options.barriers_enabled = barriers_enabled;
409 self.journal_options.barriers_enabled = barriers_enabled;
410 self
411 }
412
413 pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
415 let read_only = self.options.read_only;
416 if self.format && read_only {
417 bail!("Cannot initialize a filesystem as read-only");
418 }
419
420 if self.options.inline_crypto_enabled && !self.options.barriers_enabled {
422 bail!("A filesystem using inline encryption requires barriers");
423 }
424
425 let objects = Arc::new(ObjectManager::new(self.on_new_store));
426 let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));
427
428 let image_builder_mode = self.options.image_builder_mode;
429
430 let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
431 assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
432 assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");
433
434 let mut fsck_after_every_transaction = None;
435 let mut filesystem_options = self.options;
436 if self.fsck_after_every_transaction {
437 let instance =
438 FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
439 fsck_after_every_transaction = Some(instance.clone());
440 filesystem_options.post_commit_hook =
441 Some(Box::new(move || Box::pin(instance.clone().run())));
442 }
443
444 if !read_only && !self.format {
445 device.flush().await.context("Device flush failed")?;
448 }
449
450 let filesystem = Arc::new_cyclic(|weak: &Weak<FxFilesystem>| {
451 let weak = weak.clone();
452 FxFilesystem {
453 device,
454 block_size,
455 objects: objects.clone(),
456 journal,
457 commit_mutex: futures::lock::Mutex::new(()),
458 lock_manager: LockManager::new(),
459 flush_task: Mutex::new(None),
460 background_tasks: fasync::Scope::new(),
461 closed: AtomicBool::new(true),
462 trace: self.trace,
463 graveyard: Graveyard::new(objects.clone()),
464 completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
465 options: filesystem_options,
466 in_flight_transactions: AtomicU64::new(0),
467 transaction_limit_event: Event::new(),
468 _stores_node: metrics::register_fs(move || {
469 let weak = weak.clone();
470 Box::pin(async move {
471 if let Some(fs) = weak.upgrade() {
472 fs.populate_stores_node().await
473 } else {
474 Err(anyhow!("Filesystem has been dropped"))
475 }
476 })
477 }),
478 }
479 });
480
481 filesystem.journal().set_image_builder_mode(image_builder_mode);
482
483 filesystem.journal.set_trace(self.trace);
484 if self.format {
485 filesystem.journal.init_empty(filesystem.clone()).await?;
486 if image_builder_mode.is_none() {
487 filesystem.journal.init_superblocks().await?;
490
491 filesystem.graveyard.clone().reap_async();
493 }
494
495 let root_store = filesystem.root_store();
497 root_store.set_trace(self.trace);
498 let root_directory =
499 Directory::open(&root_store, root_store.root_directory_object_id())
500 .await
501 .context("Unable to open root volume directory")?;
502 let mut transaction = filesystem
503 .clone()
504 .new_transaction(
505 lock_keys![LockKey::object(
506 root_store.store_object_id(),
507 root_directory.object_id()
508 )],
509 transaction::Options::default(),
510 )
511 .await?;
512 let volume_directory =
513 root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
514 transaction.commit().await?;
515 objects.set_volume_directory(volume_directory);
516 } else {
517 filesystem
518 .journal
519 .replay(filesystem.clone(), self.on_new_allocator)
520 .await
521 .context("Journal replay failed")?;
522 filesystem.root_store().set_trace(self.trace);
523
524 if !read_only {
525 for store in objects.unlocked_stores() {
529 filesystem.graveyard.initial_reap(&store).await?;
530 }
531 }
532 }
533
534 if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
536 fsck_after_every_transaction
537 .fs
538 .set(Arc::downgrade(&filesystem))
539 .unwrap_or_else(|_| unreachable!());
540 }
541
542 filesystem.closed.store(false, Ordering::SeqCst);
543
544 if !read_only && image_builder_mode.is_none() {
545 filesystem.graveyard.clone().reap_async();
547
548 if filesystem.options.trim_config.is_some() {
549 filesystem.start_trim_task();
550 }
551 filesystem.start_clean_transfer_buffer_task();
552 }
553
554 Ok(filesystem.into())
555 }
556}
557
558pub struct FxFilesystem {
559 block_size: u64,
560 objects: Arc<ObjectManager>,
561 journal: Arc<Journal>,
562 commit_mutex: futures::lock::Mutex<()>,
563 lock_manager: LockManager,
564 flush_task: Mutex<Option<fasync::Task<()>>>,
565 background_tasks: fasync::Scope,
566 closed: AtomicBool,
567 trace: bool,
569 graveyard: Arc<Graveyard>,
570 completed_transactions: UintProperty,
571 options: Options,
572
573 in_flight_transactions: AtomicU64,
575
576 transaction_limit_event: Event,
579
580 device: DeviceHolder,
583
584 _stores_node: LazyNode,
586}
587
588#[fxfs_trace::trace]
589impl FxFilesystem {
590 pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
591 FxFilesystemBuilder::new().format(true).open(device).await
592 }
593
594 pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
595 FxFilesystemBuilder::new().open(device).await
596 }
597
598 pub fn root_parent_store(&self) -> Arc<ObjectStore> {
599 self.objects.root_parent_store()
600 }
601
602 pub async fn close(&self) -> Result<(), Error> {
603 if self.journal().image_builder_mode().is_some() {
604 self.journal().allocate_journal().await?;
605 self.journal().set_image_builder_mode(None);
606 self.journal().force_compact().await?;
607 }
608 assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
609 debug_assert_not_too_long!(self.graveyard.wait_for_reap());
610 debug_assert_not_too_long!(self.background_tasks.clone().cancel());
611 self.journal.stop_compactions().await;
612 let sync_status =
613 if self.journal().image_builder_mode().is_some() || self.options().read_only {
614 Ok(None)
615 } else {
616 self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await
617 };
618 match &sync_status {
619 Ok(None) => {}
620 Ok(checkpoint) => info!(
621 "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
622 reservation_required={}, borrowed={})",
623 checkpoint.as_ref().unwrap().0.file_offset,
624 self.object_manager().metadata_reservation(),
625 self.object_manager().required_reservation(),
626 self.object_manager().borrowed_metadata_space(),
627 ),
628 Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
629 }
630 self.journal.terminate();
631 let flush_task = self.flush_task.lock().take();
632 if let Some(task) = flush_task {
633 debug_assert_not_too_long!(task);
634 }
635 self.device().close().await.context("Failed to close device")?;
638 sync_status.map(|_| ())
639 }
640
641 pub fn device(&self) -> Arc<dyn Device> {
642 Arc::clone(&self.device)
643 }
644
645 pub fn root_store(&self) -> Arc<ObjectStore> {
646 self.objects.root_store()
647 }
648
649 pub fn allocator(&self) -> Arc<Allocator> {
650 self.objects.allocator()
651 }
652
653 pub fn enable_allocations(&self) {
657 self.allocator().enable_allocations();
658 }
659
660 pub fn object_manager(&self) -> &Arc<ObjectManager> {
661 &self.objects
662 }
663
664 pub fn journal(&self) -> &Arc<Journal> {
665 &self.journal
666 }
667
668 pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
669 self.journal.sync(options).await.map(|_| ())
670 }
671
672 pub fn block_size(&self) -> u64 {
673 self.block_size
674 }
675
676 pub fn get_info(&self) -> Info {
677 Info {
678 total_bytes: self.device.size(),
679 used_bytes: self.object_manager().allocator().get_used_bytes().0,
680 }
681 }
682
683 pub fn super_block_header(&self) -> SuperBlockHeader {
684 self.journal.super_block_header()
685 }
686
687 pub fn graveyard(&self) -> &Arc<Graveyard> {
688 &self.graveyard
689 }
690
691 pub fn trace(&self) -> bool {
692 self.trace
693 }
694
695 pub fn options(&self) -> &Options {
696 &self.options
697 }
698
699 pub async fn txn_guard(self: Arc<Self>) -> TxnGuard<'static> {
706 TxnGuard::Owned(
707 self.lock_manager
708 .read_lock(lock_keys!(LockKey::Filesystem))
709 .await
710 .into_owned(self.clone()),
711 )
712 }
713
714 pub async fn new_transaction<'a>(
715 self: Arc<Self>,
716 locks: LockKeys,
717 options: transaction::Options<'a>,
718 ) -> Result<Transaction<'a>, Error> {
719 let guard = if let Some(guard) = options.txn_guard.as_ref() {
720 TxnGuard::Borrowed(guard)
721 } else {
722 self.txn_guard().await
723 };
724 Transaction::new(guard, options, locks).await
725 }
726
727 #[trace]
728 pub async fn commit_transaction<R: Send>(
729 &self,
730 transaction: &mut Transaction<'_>,
731 callback: impl FnOnce(u64) -> R + Send,
732 ) -> Result<R, Error> {
733 if let Some(hook) = self.options.pre_commit_hook.as_ref() {
734 hook(transaction)?;
735 }
736 debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));
737 self.maybe_start_flush_task();
738 let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
739 let journal_offset = if self.journal().image_builder_mode().is_some() {
740 let journal_checkpoint =
741 JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
742 let maybe_mutation = self
743 .object_manager()
744 .apply_transaction(transaction, &journal_checkpoint)
745 .expect("Transactions must not fail in image_builder_mode");
746 if let Some(mutation) = maybe_mutation {
747 assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
748 }
752 self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
753 0
754 } else {
755 self.journal.commit(transaction).await?
756 };
757 self.completed_transactions.add(1);
758
759 let result = callback(journal_offset);
764
765 if let Some(hook) = self.options.post_commit_hook.as_ref() {
766 hook().await;
767 }
768
769 Ok(result)
770 }
771
772 pub fn lock_manager(&self) -> &LockManager {
773 &self.lock_manager
774 }
775
776 pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
777 if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
778 self.sub_transaction();
779 }
780 if let MetadataReservation::Hold(hold_amount) =
782 std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
783 {
784 let hold = transaction
785 .allocator_reservation
786 .unwrap()
787 .reserve(0)
788 .expect("Zero should always succeed.");
789 hold.add(hold_amount);
790 }
791 self.objects.drop_transaction(transaction);
792 self.lock_manager.drop_transaction(transaction);
793 }
794
795 fn maybe_start_flush_task(&self) {
796 if self.journal.image_builder_mode().is_some() {
797 return;
798 }
799 let mut flush_task = self.flush_task.lock();
800 if flush_task.is_none() {
801 let journal = self.journal.clone();
802 *flush_task = Some(fasync::Task::spawn(
803 journal.flush_task().trace(trace_future_args!("Journal::flush_task")),
804 ));
805 }
806 }
807
808 fn start_trim_task(self: &Arc<Self>) {
809 if !self.device.supports_trim() {
810 info!("Device does not support trim; not scheduling trimming");
811 return;
812 }
813 let this = self.clone();
814 self.background_tasks
815 .spawn(this.trim_task().trace(trace_future_args!("Filesystem::trim_task")));
816 }
817
818 async fn trim_task(self: Arc<Self>) {
819 let Some((mut next_timer, _)) = self.options.trim_config else { return };
821 loop {
822 fasync::Timer::new(next_timer.clone()).await;
823
824 let start = Instant::now();
827 let result = if let Some(pm) = &self.options.power_manager {
828 let mut watcher = pm.clone().watch_battery();
829
830 let pauser = Pauser::new(self.options.trim_charger_wait);
832
833 let mut pause_future = pin!(
834 async {
835 let mut wake_lease = WakeLease::invalid();
836 loop {
837 let Some((using_battery, new_lease)) = watcher.next_latest().await
838 else {
839 pauser.set_pause(false);
842 drop(wake_lease); return;
844 };
845
846 pauser.set_pause(using_battery);
848
849 if using_battery {
852 wake_lease = WakeLease::invalid();
853 } else if !new_lease.is_invalid() {
854 wake_lease = new_lease;
855 }
856 }
857 }
858 .fuse()
859 );
860
861 let mut do_trim = pin!(self.do_trim(Some(&pauser)).fuse());
862
863 loop {
864 futures::select! {
865 _ = pause_future => {}
866 result = do_trim => break result,
867 }
868 }
869
870 } else {
873 self.do_trim(None).await
874 };
875
876 let duration = start.elapsed();
877 match result {
878 Ok(bytes_trimmed) => info!(
879 "Trimmed {bytes_trimmed} bytes in {duration:?}. Next trim in \
880 {next_timer:?}",
881 ),
882 Err(error) => error!(error:?; "Failed to trim"),
883 }
884
885 let Some((_, interval)) = self.options.trim_config else { return };
886 next_timer = interval;
887 if next_timer.is_zero() {
888 fasync::yield_now().await;
889 }
890 }
891 }
892
893 async fn do_trim(&self, pauser: Option<&Pauser>) -> Result<usize, Error> {
895 const MAX_EXTENTS_PER_BATCH: usize = 8;
896 const MAX_EXTENT_SIZE: usize = 256 * 1024;
897 let mut offset = 0;
898 let mut bytes_trimmed = 0;
899 loop {
900 let allocator = self.allocator();
901 if let Some(pauser) = pauser {
902 pauser.maybe_pause().await;
903 }
904 let trimmable_extents =
905 allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
906 for device_range in trimmable_extents.extents() {
907 self.device.trim(device_range.clone()).await?;
908 bytes_trimmed += device_range.length()? as usize;
909 }
910 if let Some(device_range) = trimmable_extents.extents().last() {
911 offset = device_range.end;
912 } else {
913 break;
914 }
915 }
916 Ok(bytes_trimmed)
917 }
918
919 fn start_clean_transfer_buffer_task(self: &Arc<Self>) {
920 let this = self.clone();
921 self.background_tasks.spawn(
922 async move {
923 loop {
924 fasync::Timer::new(CLEAN_TRANSFER_BUFFER_INTERVAL).await;
925 this.device().clean_transfer_buffer();
926 }
927 }
928 .trace(trace_future_args!("Filesystem::clean_transfer_buffer_task")),
929 );
930 }
931
932 pub(crate) async fn reservation_for_transaction<'a>(
933 self: &Arc<Self>,
934 options: transaction::Options<'a>,
935 ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
936 if self.options.image_builder_mode.is_some() {
937 return Ok((MetadataReservation::Borrowed, None, None));
940 }
941 if !options.skip_journal_checks {
942 self.maybe_start_flush_task();
943 self.journal.check_journal_space().await?;
944 }
945
946 let mut hold = None;
960 let metadata_reservation = if options.borrow_metadata_space {
961 MetadataReservation::Borrowed
962 } else {
963 match options.allocator_reservation {
964 Some(reservation) => {
965 hold = Some(
966 reservation
967 .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
968 .ok_or(FxfsError::NoSpace)?,
969 );
970 MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
971 }
972 None => {
973 let reservation = self
974 .allocator()
975 .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
976 .ok_or(FxfsError::NoSpace)?;
977 MetadataReservation::Reservation(reservation)
978 }
979 }
980 };
981 Ok((metadata_reservation, options.allocator_reservation, hold))
982 }
983
984 pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
985 if skip_journal_checks {
986 self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
987 } else {
988 let inc = || {
989 let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
990 while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
991 match self.in_flight_transactions.compare_exchange_weak(
992 in_flights,
993 in_flights + 1,
994 Ordering::Relaxed,
995 Ordering::Relaxed,
996 ) {
997 Ok(_) => return true,
998 Err(x) => in_flights = x,
999 }
1000 }
1001 return false;
1002 };
1003 while !inc() {
1004 let listener = self.transaction_limit_event.listen();
1005 if inc() {
1006 break;
1007 }
1008 listener.await;
1009 }
1010 }
1011 }
1012
1013 pub(crate) fn sub_transaction(&self) {
1014 let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
1015 assert!(old != 0);
1016 if old <= MAX_IN_FLIGHT_TRANSACTIONS {
1017 self.transaction_limit_event.notify(usize::MAX);
1018 }
1019 }
1020
1021 pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
1022 let keys = lock_keys![LockKey::truncate(store_id, object_id,)];
1023 TruncateGuard(self.lock_manager().write_lock(keys).await)
1024 }
1025
1026 async fn populate_stores_node(&self) -> Result<Inspector, Error> {
1027 let inspector = fuchsia_inspect::Inspector::default();
1028 let root = inspector.root();
1029 root.record_child("__root", |n| self.root_store().record_data(n));
1030 root.record_child("__root_parent", |n| self.root_parent_store().record_data(n));
1031 let object_manager = self.object_manager();
1032 let volume_directory = object_manager.volume_directory();
1033 let layer_set = volume_directory.store().tree().layer_set();
1034 let mut merger = layer_set.merger();
1035 let mut iter = volume_directory.iter(&mut merger).await?;
1036 while let Some((name, id, _)) = iter.get() {
1037 if let Some(store) = object_manager.store(id) {
1038 root.record_child(name.to_string(), |n| store.record_data(n));
1039 }
1040 iter.advance().await?;
1041 }
1042 Ok(inspector)
1043 }
1044}
1045
1046pub enum TxnGuard<'a> {
1047 Borrowed(&'a TxnGuard<'a>),
1048 Owned(ReadGuard<'static>),
1049}
1050
1051impl TxnGuard<'_> {
1052 pub fn fs(&self) -> &Arc<FxFilesystem> {
1053 match self {
1054 TxnGuard::Borrowed(b) => b.fs(),
1055 TxnGuard::Owned(o) => o.fs().unwrap(),
1056 }
1057 }
1058}
1059
1060#[allow(dead_code)]
1062pub struct TruncateGuard<'a>(WriteGuard<'a>);
1063
1064pub async fn mkfs(device: DeviceHolder) -> Result<DeviceHolder, Error> {
1066 let fs = FxFilesystem::new_empty(device).await?;
1067 fs.close().await?;
1068 Ok(fs.take_device().await)
1069}
1070
1071pub async fn mkfs_with_volume(
1075 device: DeviceHolder,
1076 volume_name: &str,
1077 crypt: Option<Arc<dyn Crypt>>,
1078) -> Result<DeviceHolder, Error> {
1079 let fs = FxFilesystem::new_empty(device).await?;
1080 {
1081 let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
1084 root_volume
1085 .new_volume(
1086 volume_name,
1087 NewChildStoreOptions {
1088 options: StoreOptions { crypt, ..StoreOptions::default() },
1089 ..Default::default()
1090 },
1091 )
1092 .await
1093 .expect("Create volume failed");
1094 }
1095 fs.close().await?;
1096 Ok(fs.take_device().await)
1097}
1098
1099struct FsckAfterEveryTransaction {
1100 fs: OnceLock<Weak<FxFilesystem>>,
1101 old_hook: PostCommitHook,
1102}
1103
1104impl FsckAfterEveryTransaction {
1105 fn new(old_hook: PostCommitHook) -> Arc<Self> {
1106 Arc::new(Self { fs: OnceLock::new(), old_hook })
1107 }
1108
1109 async fn run(self: Arc<Self>) {
1110 if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
1111 let options = FsckOptions {
1112 fail_on_warning: true,
1113 no_lock: true,
1114 quiet: true,
1115 ..Default::default()
1116 };
1117 fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
1118 let object_manager = fs.object_manager();
1119 for store in object_manager.unlocked_stores() {
1120 let store_id = store.store_object_id();
1121 if !object_manager.is_system_store(store_id) {
1122 fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
1123 .await
1124 .expect("fsck_volume_with_options failed");
1125 }
1126 }
1127 }
1128 if let Some(old_hook) = self.old_hook.as_ref() {
1129 old_hook().await;
1130 }
1131 }
1132}
1133
1134struct Pauser {
1135 pause: Condition<bool>,
1136 bounce_delay: Duration,
1137}
1138
1139impl Pauser {
1140 fn new(bounce_delay: Duration) -> Self {
1142 Self { pause: Condition::new(true), bounce_delay }
1143 }
1144
1145 async fn maybe_pause(&self) {
1146 loop {
1147 if !*self.pause.lock() {
1148 return;
1149 }
1150 self.pause.when(|p| if *p { Poll::Pending } else { Poll::Ready(()) }).await;
1151 fasync::Timer::new(self.bounce_delay).await;
1152 }
1153 }
1154
1155 fn set_pause(&self, v: bool) {
1156 let mut guard = self.pause.lock();
1157 if *guard == v {
1158 return;
1159 }
1160 *guard = v;
1161 for waker in guard.drain_wakers() {
1162 waker.wake();
1163 }
1164 }
1165}
1166
1167trait NextLatest: Stream + Unpin {
1168 async fn next_latest(&mut self) -> Option<Self::Item> {
1170 let Some(mut next) = self.next().await else { return None };
1171
1172 loop {
1174 match self.next().now_or_never() {
1175 None => return Some(next),
1176 Some(None) => return None,
1177 Some(Some(n)) => next = n,
1178 }
1179 }
1180 }
1181}
1182
1183impl<T: ?Sized + Unpin> NextLatest for T where T: Stream {}
1184
1185#[cfg(test)]
1186mod tests {
1187 use super::{FxFilesystem, FxFilesystemBuilder, FxfsError, SyncOptions};
1188 use crate::fsck::{fsck, fsck_volume};
1189 use crate::log::*;
1190 use crate::lsm_tree::Operation;
1191 use crate::lsm_tree::types::Item;
1192 use crate::object_handle::{
1193 INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
1194 };
1195 use crate::object_store::directory::{Directory, replace_child};
1196 use crate::object_store::journal::JournalOptions;
1197 use crate::object_store::journal::super_block::SuperBlockInstance;
1198 use crate::object_store::transaction::{LockKey, Options, lock_keys};
1199 use crate::object_store::volume::root_volume;
1200 use crate::object_store::{
1201 HandleOptions, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions,
1202 };
1203 use crate::range::RangeExt;
1204 use fuchsia_async as fasync;
1205 use fuchsia_sync::Mutex;
1206 use futures::future::join_all;
1207 use futures::stream::{FuturesUnordered, TryStreamExt};
1208 use fxfs_insecure_crypto::new_insecure_crypt;
1209 use rustc_hash::FxHashMap as HashMap;
1210 use std::ops::Range;
1211 use std::sync::Arc;
1212 use std::sync::atomic::{AtomicU32, Ordering};
1213 use std::time::Duration;
1214 use storage_device::DeviceHolder;
1215 use storage_device::fake_device::{self, FakeDevice};
1216 use test_case::test_case;
1217
1218 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1219
1220 #[fuchsia::test(threads = 10)]
1221 async fn test_compaction() {
1222 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1223
1224 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1226 let root_store = fs.root_store();
1227 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1228 .await
1229 .expect("open failed");
1230
1231 let mut tasks = Vec::new();
1232 for i in 0..2 {
1233 let mut transaction = fs
1234 .clone()
1235 .new_transaction(
1236 lock_keys![LockKey::object(
1237 root_store.store_object_id(),
1238 root_directory.object_id()
1239 )],
1240 Options::default(),
1241 )
1242 .await
1243 .expect("new_transaction failed");
1244 let handle = root_directory
1245 .create_child_file(&mut transaction, &format!("{}", i))
1246 .await
1247 .expect("create_child_file failed");
1248 transaction.commit().await.expect("commit failed");
1249 tasks.push(fasync::Task::spawn(async move {
1250 const TEST_DATA: &[u8] = b"hello";
1251 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1252 buf.as_mut_slice().copy_from_slice(TEST_DATA);
1253 for _ in 0..1500 {
1254 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1255 }
1256 }));
1257 }
1258 join_all(tasks).await;
1259 fs.sync(SyncOptions::default()).await.expect("sync failed");
1260
1261 fsck(fs.clone()).await.expect("fsck failed");
1262 fs.close().await.expect("Close failed");
1263 }
1264
1265 #[fuchsia::test]
1266 async fn test_enable_allocations() {
1267 {
1269 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1270 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1271 fs.enable_allocations();
1272 let root_store = fs.root_store();
1273 let root_directory =
1274 Directory::open(&root_store, root_store.root_directory_object_id())
1275 .await
1276 .expect("open failed");
1277 let mut transaction = fs
1278 .clone()
1279 .new_transaction(
1280 lock_keys![LockKey::object(
1281 root_store.store_object_id(),
1282 root_directory.object_id()
1283 )],
1284 Options::default(),
1285 )
1286 .await
1287 .expect("new_transaction failed");
1288 root_directory
1289 .create_child_file(&mut transaction, "test")
1290 .await
1291 .expect("create_child_file failed");
1292 transaction.commit().await.expect("commit failed");
1293 fs.close().await.expect("close failed");
1294 }
1295
1296 {
1298 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1299 let fs = FxFilesystemBuilder::new()
1300 .format(true)
1301 .image_builder_mode(Some(SuperBlockInstance::A))
1302 .open(device)
1303 .await
1304 .expect("open failed");
1305 let root_store = fs.root_store();
1306 let root_directory =
1307 Directory::open(&root_store, root_store.root_directory_object_id())
1308 .await
1309 .expect("open failed");
1310
1311 let mut transaction = fs
1312 .clone()
1313 .new_transaction(
1314 lock_keys![LockKey::object(
1315 root_store.store_object_id(),
1316 root_directory.object_id()
1317 )],
1318 Options::default(),
1319 )
1320 .await
1321 .expect("new_transaction failed");
1322 let handle = root_directory
1323 .create_child_file(&mut transaction, "test_fail")
1324 .await
1325 .expect("create_child_file failed");
1326 transaction.commit().await.expect("commit failed");
1327
1328 assert!(
1330 FxfsError::Unavailable
1331 .matches(&handle.allocate(0..4096).await.expect_err("allocate should fail"))
1332 );
1333
1334 fs.enable_allocations();
1336 handle.allocate(0..4096).await.expect("allocate should work after enable_allocations");
1337
1338 fs.close().await.expect("close failed");
1342 }
1343 }
1347
1348 #[fuchsia::test(threads = 10)]
1349 async fn test_replay_is_identical() {
1350 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1351 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1352
1353 fs.close().await.expect("close failed");
1356 let device = fs.take_device().await;
1357 device.reopen(false);
1358
1359 struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);
1360
1361 impl<K: Clone, V: Clone> Mutations<K, V> {
1362 fn new() -> Self {
1363 Mutations(Mutex::new(Vec::new()))
1364 }
1365
1366 fn push(&self, operation: Operation, item: &Item<K, V>) {
1367 self.0.lock().push((operation, item.clone()));
1368 }
1369 }
1370
1371 let open_fs = |device,
1372 object_mutations: Arc<Mutex<HashMap<_, _>>>,
1373 allocator_mutations: Arc<Mutations<_, _>>| async {
1374 FxFilesystemBuilder::new()
1375 .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
1376 .on_new_allocator(move |allocator| {
1377 let allocator_mutations = allocator_mutations.clone();
1378 allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1379 allocator_mutations.push(op, item)
1380 })));
1381 })
1382 .on_new_store(move |store| {
1383 let mutations = Arc::new(Mutations::new());
1384 object_mutations.lock().insert(store.store_object_id(), mutations.clone());
1385 store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
1386 mutations.push(op, item)
1387 })));
1388 })
1389 .open(device)
1390 .await
1391 .expect("open failed")
1392 };
1393
1394 let allocator_mutations = Arc::new(Mutations::new());
1395 let object_mutations = Arc::new(Mutex::new(HashMap::default()));
1396 let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;
1397
1398 let root_store = fs.root_store();
1399 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1400 .await
1401 .expect("open failed");
1402
1403 let mut transaction = fs
1404 .clone()
1405 .new_transaction(
1406 lock_keys![LockKey::object(
1407 root_store.store_object_id(),
1408 root_directory.object_id()
1409 )],
1410 Options::default(),
1411 )
1412 .await
1413 .expect("new_transaction failed");
1414 let object = root_directory
1415 .create_child_file(&mut transaction, "test")
1416 .await
1417 .expect("create_child_file failed");
1418 transaction.commit().await.expect("commit failed");
1419
1420 let buf = object.allocate_buffer(10000).await;
1422 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1423
1424 object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");
1426
1427 object.truncate(3000).await.expect("truncate failed");
1429
1430 let mut transaction = fs
1432 .clone()
1433 .new_transaction(
1434 lock_keys![
1435 LockKey::object(root_store.store_object_id(), root_directory.object_id()),
1436 LockKey::object(root_store.store_object_id(), object.object_id()),
1437 ],
1438 Options::default(),
1439 )
1440 .await
1441 .expect("new_transaction failed");
1442
1443 replace_child(&mut transaction, None, (&root_directory, "test"))
1444 .await
1445 .expect("replace_child failed");
1446
1447 transaction.commit().await.expect("commit failed");
1448
1449 root_store
1451 .tombstone_object(object.object_id(), Options::default())
1452 .await
1453 .expect("tombstone failed");
1454
1455 fs.close().await.expect("close failed");
1457
1458 let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();
1459
1460 let device = fs.take_device().await;
1461 device.reopen(false);
1462
1463 let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
1464 let replayed_allocator_mutations = Arc::new(Mutations::new());
1465 let fs = open_fs(
1466 device,
1467 replayed_object_mutations.clone(),
1468 replayed_allocator_mutations.clone(),
1469 )
1470 .await;
1471
1472 let m1 = object_mutations.lock();
1473 let m2 = replayed_object_mutations.lock();
1474 assert_eq!(m1.len(), m2.len());
1475 for (store_id, mutations) in &*m1 {
1476 let mutations = mutations.0.lock();
1477 let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
1478 assert_eq!(mutations.len(), replayed.len());
1479 for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
1480 assert_eq!(op1, op2);
1481 assert_eq!(i1.key, i2.key);
1482 assert_eq!(i1.value, i2.value);
1483 }
1484 }
1485
1486 let a1 = allocator_mutations.0.lock();
1487 let a2 = replayed_allocator_mutations.0.lock();
1488 assert_eq!(a1.len(), a2.len());
1489 for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
1490 assert_eq!(op1, op2);
1491 assert_eq!(i1.key, i2.key);
1492 assert_eq!(i1.value, i2.value);
1493 }
1494
1495 assert_eq!(
1496 fs.object_manager().metadata_reservation().amount(),
1497 metadata_reservation_amount
1498 );
1499 }
1500
1501 #[fuchsia::test]
1502 async fn test_max_in_flight_transactions() {
1503 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1504 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1505
1506 let transactions = FuturesUnordered::new();
1507 for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
1508 transactions.push(fs.clone().new_transaction(lock_keys![], Options::default()));
1509 }
1510 let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();
1511
1512 let mut fut = std::pin::pin!(fs.clone().new_transaction(lock_keys![], Options::default()));
1514 assert!(futures::poll!(&mut fut).is_pending());
1515
1516 transactions.pop();
1518
1519 assert!(futures::poll!(&mut fut).is_ready());
1520 }
1521
1522 #[fuchsia::test(threads = 10)]
1524 async fn test_continuously_trim() {
1525 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1526 let fs = FxFilesystemBuilder::new()
1527 .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1528 .format(true)
1529 .open(device)
1530 .await
1531 .expect("open failed");
1532 fasync::Timer::new(Duration::from_millis(10)).await;
1534
1535 let root_store = fs.root_store();
1538 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1539 .await
1540 .expect("open failed");
1541 for _ in 0..100 {
1542 let mut transaction = fs
1543 .clone()
1544 .new_transaction(
1545 lock_keys![LockKey::object(
1546 root_store.store_object_id(),
1547 root_directory.object_id()
1548 )],
1549 Options::default(),
1550 )
1551 .await
1552 .expect("new_transaction failed");
1553 let object = root_directory
1554 .create_child_file(&mut transaction, "test")
1555 .await
1556 .expect("create_child_file failed");
1557 transaction.commit().await.expect("commit failed");
1558
1559 {
1560 let buf = object.allocate_buffer(1024).await;
1561 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1562 }
1563 std::mem::drop(object);
1564
1565 let mut transaction = root_directory
1566 .acquire_context_for_replace(None, "test", true)
1567 .await
1568 .expect("acquire_context_for_replace failed")
1569 .transaction;
1570 replace_child(&mut transaction, None, (&root_directory, "test"))
1571 .await
1572 .expect("replace_child failed");
1573 transaction.commit().await.expect("commit failed");
1574 }
1575 fs.close().await.expect("close failed");
1576 }
1577
1578 #[test_case(true; "test power fail with barriers")]
1579 #[test_case(false; "test power fail with checksums")]
1580 #[fuchsia::test]
1581 async fn test_power_fail(barriers_enabled: bool) {
1582 for _ in 0..10 {
1585 let (store_id, device, test_file_object_id) = {
1586 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1587 let fs = if barriers_enabled {
1588 FxFilesystemBuilder::new()
1589 .barriers_enabled(true)
1590 .format(true)
1591 .open(device)
1592 .await
1593 .expect("new filesystem failed")
1594 } else {
1595 FxFilesystem::new_empty(device).await.expect("new_empty failed")
1596 };
1597 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1598
1599 fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1600 .await
1601 .expect("sync failed");
1602
1603 let store = root_volume
1604 .new_volume(
1605 "test",
1606 NewChildStoreOptions {
1607 options: StoreOptions {
1608 crypt: Some(Arc::new(new_insecure_crypt())),
1609 ..StoreOptions::default()
1610 },
1611 ..Default::default()
1612 },
1613 )
1614 .await
1615 .expect("new_volume failed");
1616 let root_directory = Directory::open(&store, store.root_directory_object_id())
1617 .await
1618 .expect("open failed");
1619
1620 async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1622 let fs = store.filesystem();
1623 let root_directory = Directory::open(store, store.root_directory_object_id())
1624 .await
1625 .expect("open failed");
1626 for i in 0..100 {
1627 let mut transaction = fs
1628 .clone()
1629 .new_transaction(
1630 lock_keys![LockKey::object(
1631 store.store_object_id(),
1632 store.root_directory_object_id()
1633 )],
1634 Options::default(),
1635 )
1636 .await
1637 .expect("new_transaction failed");
1638 root_directory
1639 .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1640 .await
1641 .expect("create_child_file failed");
1642 transaction.commit().await.expect("commit failed");
1643 }
1644 }
1645
1646 create_files(&store, "A").await;
1648
1649 let mut transaction = fs
1652 .clone()
1653 .new_transaction(
1654 lock_keys![LockKey::object(
1655 store.store_object_id(),
1656 store.root_directory_object_id()
1657 )],
1658 Options::default(),
1659 )
1660 .await
1661 .expect("new_transaction failed");
1662 let object = root_directory
1663 .create_child_file(&mut transaction, "test")
1664 .await
1665 .expect("create_child_file failed");
1666 transaction.commit().await.expect("commit failed");
1667
1668 let mut transaction =
1669 object.new_transaction().await.expect("new_transaction failed");
1670 let mut buffer = object.allocate_buffer(4096).await;
1671 buffer.as_mut_slice().fill(0xed);
1672 object
1673 .txn_write(&mut transaction, 0, buffer.as_ref())
1674 .await
1675 .expect("txn_write failed");
1676 transaction.commit().await.expect("commit failed");
1677
1678 create_files(&store, "B").await;
1680
1681 fs.sync(SyncOptions::default()).await.expect("sync failed");
1684
1685 fasync::Timer::new(Duration::from_millis(10)).await;
1691
1692 (
1693 store.store_object_id(),
1694 fs.device().snapshot().expect("snapshot failed"),
1695 object.object_id(),
1696 )
1697 };
1698
1699 device
1702 .discard_random_since_last_flush()
1703 .expect("discard_random_since_last_flush failed");
1704
1705 let fs = FxFilesystem::open(device).await.expect("open failed");
1706 fsck(fs.clone()).await.expect("fsck failed");
1707
1708 let mut check_test_file = false;
1709
1710 let object_id = if fs.object_manager().store(store_id).is_some() {
1713 fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1714 .await
1715 .expect("fsck_volume failed");
1716
1717 let store = root_volume(fs.clone())
1721 .await
1722 .expect("root_volume failed")
1723 .volume(
1724 "test",
1725 StoreOptions {
1726 crypt: Some(Arc::new(new_insecure_crypt())),
1727 ..StoreOptions::default()
1728 },
1729 )
1730 .await
1731 .expect("volume failed");
1732
1733 let root_directory = Directory::open(&store, store.root_directory_object_id())
1734 .await
1735 .expect("open failed");
1736
1737 let mut transaction = fs
1738 .clone()
1739 .new_transaction(
1740 lock_keys![LockKey::object(
1741 store.store_object_id(),
1742 store.root_directory_object_id()
1743 )],
1744 Options::default(),
1745 )
1746 .await
1747 .expect("new_transaction failed");
1748 let object = root_directory
1749 .create_child_file(&mut transaction, &format!("C"))
1750 .await
1751 .expect("create_child_file failed");
1752 transaction.commit().await.expect("commit failed");
1753
1754 if let Ok(test_file) = ObjectStore::open_object(
1756 &store,
1757 test_file_object_id,
1758 HandleOptions::default(),
1759 None,
1760 )
1761 .await
1762 {
1763 let mut buffer = test_file.allocate_buffer(4096).await;
1765 let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1766 if bytes == 4096 {
1767 let expected = [0xed; 4096];
1768 assert_eq!(buffer.as_slice(), &expected);
1769 } else {
1770 assert_eq!(bytes, 0);
1772 }
1773
1774 let mut transaction =
1776 test_file.new_transaction().await.expect("new_transaction failed");
1777 buffer.as_mut_slice().fill(0x37);
1778 test_file
1779 .txn_write(&mut transaction, 0, buffer.as_ref())
1780 .await
1781 .expect("txn_write failed");
1782 transaction.commit().await.expect("commit failed");
1783 check_test_file = true;
1784 }
1785
1786 object.object_id()
1787 } else {
1788 INVALID_OBJECT_ID
1789 };
1790
1791 fs.close().await.expect("close failed");
1793 let device = fs.take_device().await;
1794 device.reopen(false);
1795
1796 let fs = FxFilesystem::open(device).await.expect("open failed");
1797 fsck(fs.clone()).await.expect("fsck failed");
1798
1799 if object_id != INVALID_OBJECT_ID {
1802 fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1803 .await
1804 .expect("fsck_volume failed");
1805
1806 let store = root_volume(fs.clone())
1807 .await
1808 .expect("root_volume failed")
1809 .volume(
1810 "test",
1811 StoreOptions {
1812 crypt: Some(Arc::new(new_insecure_crypt())),
1813 ..StoreOptions::default()
1814 },
1815 )
1816 .await
1817 .expect("volume failed");
1818 ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1820 .await
1821 .expect("open_object failed");
1822
1823 if check_test_file {
1825 info!("Checking test file for modification");
1826 let test_file = ObjectStore::open_object(
1827 &store,
1828 test_file_object_id,
1829 HandleOptions::default(),
1830 None,
1831 )
1832 .await
1833 .expect("open_object failed");
1834 let mut buffer = test_file.allocate_buffer(4096).await;
1835 assert_eq!(
1836 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1837 4096
1838 );
1839 let expected = [0x37; 4096];
1840 assert_eq!(buffer.as_slice(), &expected);
1841 }
1842 }
1843
1844 fs.close().await.expect("close failed");
1845 }
1846 }
1847
1848 #[fuchsia::test]
1849 async fn test_barrier_not_emitted_when_transaction_has_no_data() {
1850 let barrier_count = Arc::new(AtomicU32::new(0));
1851
1852 struct Observer(Arc<AtomicU32>);
1853
1854 impl fake_device::Observer for Observer {
1855 fn barrier(&self) {
1856 self.0.fetch_add(1, Ordering::Relaxed);
1857 }
1858 }
1859
1860 let mut fake_device = FakeDevice::new(8192, 4096);
1861 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1862 let device = DeviceHolder::new(fake_device);
1863 let fs = FxFilesystemBuilder::new()
1864 .barriers_enabled(true)
1865 .format(true)
1866 .open(device)
1867 .await
1868 .expect("new filesystem failed");
1869
1870 {
1871 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1872 root_vol
1873 .new_volume(
1874 "test",
1875 NewChildStoreOptions {
1876 options: StoreOptions {
1877 crypt: Some(Arc::new(new_insecure_crypt())),
1878 ..StoreOptions::default()
1879 },
1880 ..NewChildStoreOptions::default()
1881 },
1882 )
1883 .await
1884 .expect("there is no test volume");
1885 fs.close().await.expect("close failed");
1886 }
1887 let device = fs.take_device().await;
1890 device.reopen(false);
1891 let fs = FxFilesystemBuilder::new()
1892 .barriers_enabled(true)
1893 .open(device)
1894 .await
1895 .expect("new filesystem failed");
1896 let expected_barrier_count = barrier_count.load(Ordering::Relaxed);
1897
1898 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1899 let store = root_vol
1900 .volume(
1901 "test",
1902 StoreOptions {
1903 crypt: Some(Arc::new(new_insecure_crypt())),
1904 ..StoreOptions::default()
1905 },
1906 )
1907 .await
1908 .expect("there is no test volume");
1909
1910 let fs = store.filesystem();
1912 let root_directory =
1913 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1914 for i in 0..100 {
1915 let mut transaction = fs
1916 .clone()
1917 .new_transaction(
1918 lock_keys![LockKey::object(
1919 store.store_object_id(),
1920 store.root_directory_object_id()
1921 )],
1922 Options::default(),
1923 )
1924 .await
1925 .expect("new_transaction failed");
1926 root_directory
1927 .create_child_file(&mut transaction, &format!("A {i}"))
1928 .await
1929 .expect("create_child_file failed");
1930 transaction.commit().await.expect("commit failed");
1931 }
1932
1933 fs.close().await.expect("close failed");
1935 assert_eq!(expected_barrier_count, barrier_count.load(Ordering::Relaxed));
1937 }
1938
1939 #[fuchsia::test]
1940 async fn test_barrier_emitted_when_transaction_includes_data() {
1941 let barrier_count = Arc::new(AtomicU32::new(0));
1942
1943 struct Observer(Arc<AtomicU32>);
1944
1945 impl fake_device::Observer for Observer {
1946 fn barrier(&self) {
1947 self.0.fetch_add(1, Ordering::Relaxed);
1948 }
1949 }
1950
1951 let mut fake_device = FakeDevice::new(8192, 4096);
1952 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1953 let device = DeviceHolder::new(fake_device);
1954 let fs = FxFilesystemBuilder::new()
1955 .barriers_enabled(true)
1956 .format(true)
1957 .open(device)
1958 .await
1959 .expect("new filesystem failed");
1960
1961 {
1962 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1963 root_vol
1964 .new_volume(
1965 "test",
1966 NewChildStoreOptions {
1967 options: StoreOptions {
1968 crypt: Some(Arc::new(new_insecure_crypt())),
1969 ..StoreOptions::default()
1970 },
1971 ..NewChildStoreOptions::default()
1972 },
1973 )
1974 .await
1975 .expect("there is no test volume");
1976 fs.close().await.expect("close failed");
1977 }
1978 let device = fs.take_device().await;
1981 device.reopen(false);
1982 let fs = FxFilesystemBuilder::new()
1983 .barriers_enabled(true)
1984 .open(device)
1985 .await
1986 .expect("new filesystem failed");
1987 let expected_barrier_count = barrier_count.load(Ordering::Relaxed);
1988
1989 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1990 let store = root_vol
1991 .volume(
1992 "test",
1993 StoreOptions {
1994 crypt: Some(Arc::new(new_insecure_crypt())),
1995 ..StoreOptions::default()
1996 },
1997 )
1998 .await
1999 .expect("there is no test volume");
2000
2001 let fs: Arc<FxFilesystem> = store.filesystem();
2003 let root_directory =
2004 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2005
2006 let mut transaction = fs
2007 .clone()
2008 .new_transaction(
2009 lock_keys![LockKey::object(
2010 store.store_object_id(),
2011 store.root_directory_object_id()
2012 )],
2013 Options::default(),
2014 )
2015 .await
2016 .expect("new_transaction failed");
2017 let object = root_directory
2018 .create_child_file(&mut transaction, "test")
2019 .await
2020 .expect("create_child_file failed");
2021 transaction.commit().await.expect("commit failed");
2022
2023 let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2024 let mut buffer = object.allocate_buffer(4096).await;
2025 buffer.as_mut_slice().fill(0xed);
2026 object.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2027 transaction.commit().await.expect("commit failed");
2028
2029 fs.close().await.expect("close failed");
2031 assert!(expected_barrier_count < barrier_count.load(Ordering::Relaxed));
2033 }
2034
2035 #[test_case(true; "fail when original filesystem has barriers enabled")]
2036 #[test_case(false; "fail when original filesystem has barriers disabled")]
2037 #[fuchsia::test]
2038 async fn test_switching_barrier_mode_on_existing_filesystem(original_barrier_mode: bool) {
2039 let crypt = Some(Arc::new(new_insecure_crypt()) as Arc<dyn fxfs_crypto::Crypt>);
2040 let fake_device = FakeDevice::new(8192, 4096);
2041 let device = DeviceHolder::new(fake_device);
2042 let fs: super::OpenFxFilesystem = FxFilesystemBuilder::new()
2043 .barriers_enabled(original_barrier_mode)
2044 .format(true)
2045 .open(device)
2046 .await
2047 .expect("new filesystem failed");
2048
2049 {
2051 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2052 let store = root_vol
2053 .new_volume(
2054 "test",
2055 NewChildStoreOptions {
2056 options: StoreOptions { crypt: crypt.clone(), ..Default::default() },
2057 ..Default::default()
2058 },
2059 )
2060 .await
2061 .expect("creating test volume");
2062 let root_dir = Directory::open(&store, store.root_directory_object_id())
2063 .await
2064 .expect("open failed");
2065 let mut transaction = fs
2066 .clone()
2067 .new_transaction(
2068 lock_keys![LockKey::object(
2069 store.store_object_id(),
2070 store.root_directory_object_id()
2071 )],
2072 Default::default(),
2073 )
2074 .await
2075 .expect("new_transaction failed");
2076 let object = root_dir
2077 .create_child_file(&mut transaction, "file")
2078 .await
2079 .expect("create_child_file failed");
2080 transaction.commit().await.expect("commit failed");
2081 let mut buffer = object.allocate_buffer(4096).await;
2082 buffer.as_mut_slice().fill(0xA7);
2083 let new_size = object.write_or_append(None, buffer.as_ref()).await.unwrap();
2084 assert_eq!(new_size, 4096);
2085 }
2086
2087 fs.close().await.expect("close failed");
2089 let device = fs.take_device().await;
2090 device.reopen(false);
2091 let fs = FxFilesystemBuilder::new()
2092 .barriers_enabled(!original_barrier_mode)
2093 .open(device)
2094 .await
2095 .expect("new filesystem failed");
2096 {
2097 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2098 let store = root_vol
2099 .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
2100 .await
2101 .expect("opening test volume");
2102 let root_dir = Directory::open(&store, store.root_directory_object_id())
2103 .await
2104 .expect("open failed");
2105 let (object_id, _, _) =
2106 root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
2107 let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
2108 .await
2109 .expect("open failed");
2110 let mut buffer = test_file.allocate_buffer(4096).await;
2112 buffer.as_mut_slice().fill(0xA8);
2113 let new_size = test_file.write_or_append(None, buffer.as_ref()).await.unwrap();
2114 assert_eq!(new_size, 8192);
2115 }
2116
2117 fs.close().await.expect("close failed");
2120 let device = fs.take_device().await;
2121 device.reopen(false);
2122 let fs = FxFilesystemBuilder::new()
2123 .barriers_enabled(original_barrier_mode)
2124 .open(device)
2125 .await
2126 .expect("new filesystem failed");
2127 {
2128 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2129 let store = root_vol
2130 .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
2131 .await
2132 .expect("opening test volume");
2133 let root_dir = Directory::open(&store, store.root_directory_object_id())
2134 .await
2135 .expect("open failed");
2136 let (object_id, _, _) =
2137 root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
2138 let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
2139 .await
2140 .expect("open failed");
2141 let mut buffer = test_file.allocate_buffer(8192).await;
2142 assert_eq!(
2143 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
2144 8192,
2145 "short read"
2146 );
2147 assert_eq!(buffer.as_slice()[0..4096], [0xA7; 4096]);
2148 assert_eq!(buffer.as_slice()[4096..8192], [0xA8; 4096]);
2149 }
2150 fs.close().await.expect("close failed");
2151 }
2152
2153 #[fuchsia::test]
2154 async fn test_image_builder_mode_no_early_writes() {
2155 const BLOCK_SIZE: u32 = 4096;
2156 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2157 device.reopen(true);
2158 let fs = FxFilesystemBuilder::new()
2159 .format(true)
2160 .image_builder_mode(Some(SuperBlockInstance::A))
2161 .open(device)
2162 .await
2163 .expect("open failed");
2164 fs.enable_allocations();
2165 fs.device().reopen(false);
2167 fs.close().await.expect("closed");
2168 }
2169
2170 #[fuchsia::test]
2171 async fn test_image_builder_mode() {
2172 const BLOCK_SIZE: u32 = 4096;
2173 const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
2174 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2175
2176 {
2178 let mut write_buf =
2179 device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2180 write_buf.as_mut_slice().fill(0xf0);
2181 device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
2182 }
2183
2184 device.reopen(true);
2185
2186 let device = {
2187 let fs = FxFilesystemBuilder::new()
2188 .format(true)
2189 .image_builder_mode(Some(SuperBlockInstance::B))
2190 .open(device)
2191 .await
2192 .expect("open failed");
2193 fs.enable_allocations();
2194 {
2195 let root_store = fs.root_store();
2196 let root_directory =
2197 Directory::open(&root_store, root_store.root_directory_object_id())
2198 .await
2199 .expect("open failed");
2200 let handle;
2202 {
2203 let mut transaction = fs
2204 .clone()
2205 .new_transaction(
2206 lock_keys![LockKey::object(
2207 root_directory.store().store_object_id(),
2208 root_directory.object_id()
2209 )],
2210 Options::default(),
2211 )
2212 .await
2213 .expect("new transaction");
2214 handle = root_directory
2215 .create_child_file(&mut transaction, "test")
2216 .await
2217 .expect("create file");
2218 handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
2219 transaction.commit().await.expect("commit");
2220 }
2221 }
2222 fs.device().reopen(false);
2223 fs.close().await.expect("close");
2224 fs.take_device().await
2225 };
2226 device.reopen(false);
2227 let fs = FxFilesystem::open(device).await.expect("open failed");
2228 fsck(fs.clone()).await.expect("fsck failed");
2229
2230 let root_store = fs.root_store();
2232 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2233 .await
2234 .expect("open failed");
2235 let (object_id, descriptor, _) =
2236 root_directory.lookup("test").await.expect("lookup failed").unwrap();
2237 assert_eq!(descriptor, ObjectDescriptor::File);
2238 let test_file =
2239 ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
2240 .await
2241 .expect("open failed");
2242 let mut read_buf =
2243 test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2244 test_file.read(0, read_buf.as_mut()).await.expect("read failed");
2245 assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
2246 fs.close().await.expect("closed");
2247 }
2248
2249 #[fuchsia::test]
2250 async fn test_read_only_mount_on_full_filesystem() {
2251 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2252 let fs =
2253 FxFilesystemBuilder::new().format(true).open(device).await.expect("new_empty failed");
2254 let root_store = fs.root_store();
2255 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2256 .await
2257 .expect("open failed");
2258
2259 let mut transaction = fs
2260 .clone()
2261 .new_transaction(
2262 lock_keys![LockKey::object(
2263 root_store.store_object_id(),
2264 root_directory.object_id()
2265 )],
2266 Options::default(),
2267 )
2268 .await
2269 .expect("new_transaction failed");
2270 let handle = root_directory
2271 .create_child_file(&mut transaction, "test")
2272 .await
2273 .expect("create_child_file failed");
2274 transaction.commit().await.expect("commit failed");
2275
2276 let mut buf = handle.allocate_buffer(4096).await;
2277 buf.as_mut_slice().fill(0xaa);
2278 loop {
2279 if handle.write_or_append(None, buf.as_ref()).await.is_err() {
2280 break;
2281 }
2282 }
2283
2284 let max_offset = fs.allocator().maximum_offset();
2285 fs.close().await.expect("Close failed");
2286
2287 let device = fs.take_device().await;
2288 device.reopen(false);
2289 let mut buffer = device
2290 .allocate_buffer(
2291 crate::round::round_up(max_offset, TEST_DEVICE_BLOCK_SIZE).unwrap() as usize
2292 )
2293 .await;
2294 device.read(0, buffer.as_mut()).await.expect("read failed");
2295
2296 let device = DeviceHolder::new(
2297 FakeDevice::from_image(&buffer.as_slice()[..], TEST_DEVICE_BLOCK_SIZE)
2298 .expect("from_image failed"),
2299 );
2300 let fs =
2301 FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
2302 fs.close().await.expect("Close failed");
2303 }
2304
2305 #[test_case(SuperBlockInstance::A; "Superblock instance A")]
2306 #[test_case(SuperBlockInstance::B; "Superblock instance B")]
2307 #[fuchsia::test]
2308 async fn test_image_builder_mode_flush_on_close_sb_a(target_sb: SuperBlockInstance) {
2309 const BLOCK_SIZE: u32 = 4096;
2310 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2311
2312 device.reopen(true);
2314 let fs = FxFilesystemBuilder::new()
2315 .format(true)
2316 .image_builder_mode(Some(target_sb))
2317 .open(device)
2318 .await
2319 .expect("open failed");
2320
2321 fs.enable_allocations();
2322
2323 fs.device().reopen(false);
2325
2326 {
2328 let root_store = fs.root_store();
2329 let root_directory =
2330 Directory::open(&root_store, root_store.root_directory_object_id())
2331 .await
2332 .expect("open failed");
2333
2334 let mut transaction = fs
2335 .clone()
2336 .new_transaction(
2337 lock_keys![LockKey::object(
2338 root_directory.store().store_object_id(),
2339 root_directory.object_id()
2340 )],
2341 Options::default(),
2342 )
2343 .await
2344 .expect("new transaction");
2345 let handle = root_directory
2346 .create_child_file(&mut transaction, "post_finalize_file")
2347 .await
2348 .expect("create file");
2349 transaction.commit().await.expect("commit");
2350
2351 let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
2352 buf.as_mut_slice().fill(0xaa);
2353 handle.write_or_append(None, buf.as_ref()).await.expect("write failed");
2354 }
2355
2356 fs.close().await.expect("close failed");
2358
2359 let other_sb = target_sb.next();
2360
2361 let device = fs.take_device().await;
2363 device.reopen(true); let mut buf = device.allocate_buffer(BLOCK_SIZE as usize).await;
2365
2366 device.read(target_sb.first_extent().start, buf.as_mut()).await.expect("read target_sb");
2367 assert_eq!(&buf.as_slice()[..8], b"FxfsSupr", "target_sb should have magic bytes");
2368
2369 buf.as_mut_slice().fill(0); device.read(other_sb.first_extent().start, buf.as_mut()).await.expect("read other_sb");
2371 assert_eq!(buf.as_slice(), &[0; 4096], "other_sb should be zeroed");
2373 }
2374
2375 #[cfg(target_os = "fuchsia")]
2376 #[fuchsia::test(allow_stalls = false)]
2377 async fn test_trim_with_power_manager() {
2378 use anyhow::Error;
2379 use async_trait::async_trait;
2380 use fuchsia_async::TestExecutor;
2381 use futures::StreamExt;
2382 use zx::HandleBased;
2383
2384 TestExecutor::advance_to(fasync::MonotonicInstant::ZERO).await;
2385
2386 #[derive(Default)]
2387 struct MockPowerManager {
2388 on_battery: Mutex<bool>,
2389 event: event_listener::Event,
2390 wake_lease: Mutex<Option<zx::EventPair>>,
2391 }
2392
2393 impl MockPowerManager {
2394 fn set_on_battery(&self, v: bool) {
2395 *self.on_battery.lock() = v;
2396 self.event.notify(usize::MAX);
2397 }
2398
2399 fn is_lease_held(&self) -> bool {
2400 self.wake_lease.lock().as_ref().is_some_and(|handle| {
2401 handle
2402 .wait_one(
2403 zx::Signals::EVENTPAIR_PEER_CLOSED,
2404 zx::MonotonicInstant::INFINITE_PAST,
2405 )
2406 .is_err()
2407 })
2408 }
2409 }
2410
2411 impl super::PowerManager for MockPowerManager {
2412 fn watch_battery(
2413 self: Arc<Self>,
2414 ) -> futures::stream::BoxStream<'static, (bool, super::WakeLease)> {
2415 futures::stream::unfold(true, move |first| {
2416 let this = self.clone();
2417 async move {
2418 if !first {
2419 this.event.listen().await;
2420 }
2421 let val = *this.on_battery.lock();
2422 let handle = if val {
2423 zx::NullableHandle::invalid()
2424 } else {
2425 let (h1, h2) = zx::EventPair::create();
2426 *this.wake_lease.lock() = Some(h2);
2427 h1.into_handle()
2429 };
2430 Some(((val, handle), false))
2431 }
2432 })
2433 .boxed()
2434 }
2435 }
2436
2437 let trim_count = Arc::new(AtomicU32::new(0));
2438
2439 struct TrimTrackingDevice {
2440 inner: DeviceHolder,
2441 trim_count: Arc<AtomicU32>,
2442 power_manager: Arc<MockPowerManager>,
2443 }
2444
2445 #[async_trait]
2446 impl storage_device::Device for TrimTrackingDevice {
2447 fn allocate_buffer(&self, size: usize) -> storage_device::buffer::BufferFuture<'_> {
2448 self.inner.allocate_buffer(size)
2449 }
2450 fn block_size(&self) -> u32 {
2451 self.inner.block_size()
2452 }
2453 fn block_count(&self) -> u64 {
2454 self.inner.block_count()
2455 }
2456 async fn read_with_opts(
2457 &self,
2458 offset: u64,
2459 buffer: storage_device::buffer::MutableBufferRef<'_>,
2460 opts: storage_device::ReadOptions,
2461 ) -> Result<(), Error> {
2462 self.inner.read_with_opts(offset, buffer, opts).await
2463 }
2464 async fn write_with_opts(
2465 &self,
2466 offset: u64,
2467 buffer: storage_device::buffer::BufferRef<'_>,
2468 opts: storage_device::WriteOptions,
2469 ) -> Result<(), Error> {
2470 self.inner.write_with_opts(offset, buffer, opts).await
2471 }
2472 async fn trim(&self, range: std::ops::Range<u64>) -> Result<(), Error> {
2473 assert!(self.power_manager.is_lease_held());
2474 self.trim_count.fetch_add(1, Ordering::SeqCst);
2475 self.inner.trim(range).await
2476 }
2477 async fn flush(&self) -> Result<(), Error> {
2478 self.inner.flush().await
2479 }
2480 async fn close(&self) -> Result<(), Error> {
2481 self.inner.close().await
2482 }
2483 fn barrier(&self) {
2484 self.inner.barrier()
2485 }
2486 fn supports_trim(&self) -> bool {
2487 true
2488 }
2489 fn is_read_only(&self) -> bool {
2490 self.inner.is_read_only()
2491 }
2492 fn snapshot(&self) -> Result<DeviceHolder, Error> {
2493 Ok(DeviceHolder::new(TrimTrackingDevice {
2494 inner: self.inner.snapshot()?,
2495 trim_count: self.trim_count.clone(),
2496 power_manager: self.power_manager.clone(),
2497 }))
2498 }
2499 fn reopen(&self, read_only: bool) {
2500 self.inner.reopen(read_only)
2501 }
2502 }
2503
2504 let pm = Arc::new(MockPowerManager::default());
2505
2506 pm.set_on_battery(true);
2508
2509 let fake_device = FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE);
2510 let device = DeviceHolder::new(TrimTrackingDevice {
2511 inner: DeviceHolder::new(fake_device),
2512 trim_count: trim_count.clone(),
2513 power_manager: pm.clone(),
2514 });
2515
2516 let fs = FxFilesystemBuilder::new()
2517 .format(true)
2518 .power_manager(pm.clone())
2519 .trim_config(Some((Duration::ZERO, Duration::from_millis(100))))
2520 .trim_charger_wait(Duration::from_millis(10))
2521 .open(device)
2522 .await
2523 .expect("open failed");
2524
2525 TestExecutor::advance_to(fasync::MonotonicInstant::after(
2527 Duration::from_millis(500).into(),
2528 ))
2529 .await;
2530 let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
2531
2532 assert_eq!(trim_count.load(Ordering::SeqCst), 0);
2533
2534 {
2536 let root_store = fs.root_store();
2537 let root_directory =
2538 Directory::open(&root_store, root_store.root_directory_object_id())
2539 .await
2540 .expect("open failed");
2541 let mut transaction = fs
2542 .clone()
2543 .new_transaction(
2544 lock_keys![LockKey::object(
2545 root_store.store_object_id(),
2546 root_directory.object_id()
2547 )],
2548 Options::default(),
2549 )
2550 .await
2551 .expect("new_transaction failed");
2552 let handle = root_directory
2553 .create_child_file(&mut transaction, "test")
2554 .await
2555 .expect("create_child_file failed");
2556 transaction.commit().await.expect("commit failed");
2557 handle.allocate(0..4096).await.expect("allocate failed");
2558 let mut transaction = fs
2560 .clone()
2561 .new_transaction(
2562 lock_keys![
2563 LockKey::object(root_store.store_object_id(), root_directory.object_id()),
2564 LockKey::object(root_store.store_object_id(), handle.object_id()),
2565 ],
2566 Options::default(),
2567 )
2568 .await
2569 .expect("new_transaction failed");
2570 replace_child(&mut transaction, None, (&root_directory, "test"))
2571 .await
2572 .expect("delete failed");
2573 transaction.commit().await.expect("commit failed");
2574 fs.root_store()
2575 .tombstone_object(handle.object_id(), Options::default())
2576 .await
2577 .expect("tombstone failed");
2578 }
2579
2580 pm.set_on_battery(false);
2582
2583 TestExecutor::advance_to(fasync::MonotonicInstant::after(Duration::from_millis(10).into()))
2585 .await;
2586
2587 let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
2588
2589 assert!(trim_count.load(Ordering::SeqCst) > 0);
2590
2591 trim_count.store(0, Ordering::SeqCst);
2593 pm.set_on_battery(true);
2594
2595 TestExecutor::advance_to(fasync::MonotonicInstant::after(
2597 Duration::from_millis(500).into(),
2598 ))
2599 .await;
2600
2601 let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
2602
2603 assert_eq!(trim_count.load(Ordering::SeqCst), 0);
2604
2605 fs.close().await.expect("close failed");
2606 }
2607}