1use crate::errors::FxfsError;
6use crate::fsck::{FsckOptions, fsck_volume_with_options, fsck_with_options};
7use crate::log::*;
8use crate::metrics;
9use crate::object_store::allocator::{Allocator, Hold, Reservation};
10use crate::object_store::directory::Directory;
11use crate::object_store::graveyard::Graveyard;
12use crate::object_store::journal::super_block::{SuperBlockHeader, SuperBlockInstance};
13use crate::object_store::journal::{self, Journal, JournalCheckpoint, JournalOptions};
14use crate::object_store::object_manager::ObjectManager;
15use crate::object_store::transaction::{
16 self, AssocObj, LockKey, LockManager, MetadataReservation, Mutation,
17 TRANSACTION_METADATA_MAX_AMOUNT, Transaction, WriteGuard, lock_keys,
18};
19use crate::object_store::volume::{VOLUMES_DIRECTORY, root_volume};
20use crate::object_store::{NewChildStoreOptions, ObjectStore, StoreOptions};
21use crate::range::RangeExt;
22use crate::serialized_types::{LATEST_VERSION, Version};
23use anyhow::{Context, Error, anyhow, bail};
24use async_trait::async_trait;
25use event_listener::Event;
26use fuchsia_async as fasync;
27use fuchsia_async::condition::Condition;
28use fuchsia_inspect::{Inspector, LazyNode, NumericProperty as _, UintProperty};
29use fuchsia_sync::Mutex;
30use futures::{FutureExt, Stream};
31use fxfs_crypto::Crypt;
32use fxfs_trace::{TraceFutureExt, trace_future_args};
33use static_assertions::const_assert;
34use std::pin::pin;
35use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
36use std::sync::{Arc, OnceLock, Weak};
37use std::task::Poll;
38use std::time::{Duration, Instant};
39use storage_device::{Device, DeviceHolder};
40
/// Smallest block size the filesystem operates with; `FxFilesystemBuilder::open` rounds a
/// smaller device block size up to this value.
pub const MIN_BLOCK_SIZE: u64 = 4096;
/// Largest supported block size: 64 KiB, i.e. one more than `u16::MAX`.
pub const MAX_BLOCK_SIZE: u64 = 1 << 16;

/// Largest permitted file size: `i64::MAX` with its low 12 bits cleared (`i64::MAX - 4095`).
pub const MAX_FILE_SIZE: u64 = (i64::MAX as u64) & !4095;
// Compile-time check that pins the exact numeric value of `MAX_FILE_SIZE`.
const_assert!(9223372036854771712 == MAX_FILE_SIZE);
50
51use futures::stream::StreamExt;
52
/// Upper bound on concurrently in-flight transactions for callers that do not skip journal
/// checks (see `FxFilesystem::add_transaction`).
const MAX_IN_FLIGHT_TRANSACTIONS: u64 = 4;

/// Default delay before the first trim pass: one hour.
const TRIM_AFTER_BOOT_TIMER: Duration = Duration::from_secs(3_600);

/// Default interval between subsequent trim passes: one day.
const TRIM_INTERVAL_TIMER: Duration = Duration::from_secs(24 * 3_600);

/// Period of the background task that calls `clean_transfer_buffer` on the device: one minute.
const CLEAN_TRANSFER_BUFFER_INTERVAL: Duration = Duration::from_secs(60);
67
/// Handle type delivered alongside battery updates by [`PowerManager`] on Fuchsia targets.
#[cfg(target_os = "fuchsia")]
pub type WakeLease = zx::NullableHandle;

/// Handle type delivered alongside battery updates by [`PowerManager`] on non-Fuchsia targets,
/// where an emulated handle stands in for the real one.
#[cfg(not(target_os = "fuchsia"))]
pub type WakeLease = fasync::emulated_handle::Handle;
73
/// Source of battery-state updates.  The trim task uses it to pause trimming while the device
/// is running on battery.
pub trait PowerManager: Send + Sync {
    /// Returns a stream of `(using_battery, wake_lease)` updates.
    fn watch_battery(self: Arc<Self>) -> futures::stream::BoxStream<'static, (bool, WakeLease)>;
}
78
/// Space usage summary returned by `FxFilesystem::get_info`.
pub struct Info {
    /// Size of the underlying device, as reported by the device.
    pub total_bytes: u64,
    /// Bytes currently in use, as reported by the allocator.
    pub used_bytes: u64,
}
84
/// Optional callback producing a future that `commit_transaction` awaits after every
/// successful commit.
pub type PostCommitHook =
    Option<Box<dyn Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync>>;

/// Optional callback invoked with the transaction at the start of `commit_transaction`;
/// returning an error aborts the commit.
pub type PreCommitHook = Option<Box<dyn Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync>>;
89
/// Filesystem-wide options, normally assembled through [`FxFilesystemBuilder`].
pub struct Options {
    /// Open without writing: formatting is rejected, the graveyard reap and background tasks
    /// are not started, and `close` skips the final journal sync.
    pub read_only: bool,

    /// Defaults to 128 MiB.
    /// NOTE(review): the consumer is not visible in this file; presumably the number of bytes
    /// after which the metadata key is rolled — confirm.
    pub roll_metadata_key_byte_count: u64,

    /// Called with the transaction at the start of every commit; an error aborts the commit.
    pub pre_commit_hook: PreCommitHook,

    /// Awaited after every successful commit.
    pub post_commit_hook: PostCommitHook,

    /// NOTE(review): not consulted in this file; presumably suppresses the initial graveyard
    /// reap at open time — confirm against the graveyard implementation.
    pub skip_initial_reap: bool,

    /// `(delay before the first trim, interval between later trims)`.  `None` disables the
    /// background trim task.
    pub trim_config: Option<(Duration, Duration)>,

    /// When set, the journal is put into image-builder mode at open: transactions are applied
    /// directly rather than committed through the journal, until `close` allocates the
    /// journal and compacts.
    pub image_builder_mode: Option<SuperBlockInstance>,

    /// Requires `barriers_enabled`; `open` fails otherwise.
    pub inline_crypto_enabled: bool,

    /// Whether barriers are enabled.  The builder mirrors this flag into the journal options.
    pub barriers_enabled: bool,

    /// Optional battery watcher; when present the trim task pauses while on battery.
    pub power_manager: Option<Arc<dyn PowerManager>>,

    /// Extra delay the trim task waits after being unpaused before it re-checks the pause
    /// state and resumes.  Defaults to 10 seconds.
    pub trim_charger_wait: Duration,
}
141
142impl Default for Options {
143 fn default() -> Self {
144 Options {
145 roll_metadata_key_byte_count: 128 * 1024 * 1024,
146 read_only: false,
147 pre_commit_hook: None,
148 post_commit_hook: None,
149 skip_initial_reap: false,
150 trim_config: Some((TRIM_AFTER_BOOT_TIMER, TRIM_INTERVAL_TIMER)),
151 image_builder_mode: None,
152 inline_crypto_enabled: false,
153 barriers_enabled: false,
154 power_manager: None,
155 trim_charger_wait: Duration::from_secs(10),
156 }
157 }
158}
159
/// Context handed to `JournalingObject::apply_mutation`.
pub struct ApplyContext<'a, 'b> {
    /// Whether the mutation comes from journal replay or from a live transaction.
    pub mode: ApplyMode<'a, 'b>,

    /// Journal checkpoint associated with the mutation being applied.
    pub checkpoint: JournalCheckpoint,
}
168
/// Distinguishes the two situations in which a mutation can be applied.
pub enum ApplyMode<'a, 'b> {
    /// The mutation is being applied during replay; no transaction is attached.
    Replay,
    /// The mutation belongs to the given live transaction.
    Live(&'a Transaction<'b>),
}
175
176impl ApplyMode<'_, '_> {
177 pub fn is_replay(&self) -> bool {
178 matches!(self, ApplyMode::Replay)
179 }
180
181 pub fn is_live(&self) -> bool {
182 matches!(self, ApplyMode::Live(_))
183 }
184}
185
/// Interface for objects whose mutations travel through transactions and the journal.
#[async_trait]
pub trait JournalingObject: Send + Sync {
    /// Applies `mutation` to the object.  `context` says whether this is replay or a live
    /// transaction and carries the associated journal checkpoint.
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error>;

    /// Called for a mutation that is being discarded instead of applied.
    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>);

    /// Hook run by `commit_transaction` once per object touched by the transaction, before the
    /// commit itself.  May return a write guard which the filesystem holds until the commit
    /// has been applied.  The default does nothing.
    async fn prepare_commit<'a>(
        &self,
        _filesystem: &'a FxFilesystem,
        _transaction: &Transaction<'_>,
    ) -> Result<Option<WriteGuard<'a>>, Error> {
        Ok(None)
    }

    /// Flushes the object, returning a `Version` on success.
    async fn flush(&self) -> Result<Version, Error>;

    /// Writes `mutation` to the journal `writer`.  The default writes a clone of the mutation
    /// unchanged.
    fn write_mutation(&self, mutation: &Mutation, mut writer: journal::Writer<'_>) {
        writer.write(mutation.clone());
    }
}
224
/// Options accepted by `FxFilesystem::sync` and forwarded to the journal.
#[derive(Default)]
pub struct SyncOptions<'a> {
    /// `close` sets this for its final sync.
    /// NOTE(review): presumably requests a device flush after the journal is written —
    /// confirm in the journal implementation.
    pub flush_device: bool,

    /// Optional one-shot predicate.
    /// NOTE(review): presumably the sync is skipped when it returns false — confirm in the
    /// journal implementation.
    pub precondition: Option<Box<dyn FnOnce() -> bool + 'a + Send>>,
}
239
/// Owning wrapper around an opened filesystem.  Dereferences to `Arc<FxFilesystem>` and logs an
/// error on drop if the filesystem was not closed first.
pub struct OpenFxFilesystem(Arc<FxFilesystem>);
241
242impl OpenFxFilesystem {
243 pub async fn take_device(self) -> DeviceHolder {
246 let fut = self.device.take_when_dropped();
247 std::mem::drop(self);
248 debug_assert_not_too_long!(fut)
249 }
250}
251
252impl From<Arc<FxFilesystem>> for OpenFxFilesystem {
253 fn from(fs: Arc<FxFilesystem>) -> Self {
254 Self(fs)
255 }
256}
257
258impl Drop for OpenFxFilesystem {
259 fn drop(&mut self) {
260 if self.options.image_builder_mode.is_some()
261 && self.journal().image_builder_mode().is_some()
262 {
263 error!("OpenFxFilesystem in image_builder_mode dropped without calling close().");
264 }
265 if !self.options.read_only && !self.closed.load(Ordering::SeqCst) {
266 error!("OpenFxFilesystem dropped without first being closed. Data loss may occur.");
267 }
268 }
269}
270
271impl std::ops::Deref for OpenFxFilesystem {
272 type Target = Arc<FxFilesystem>;
273
274 fn deref(&self) -> &Self::Target {
275 &self.0
276 }
277}
278
/// Builder that collects options and then formats or opens a filesystem via `open`.
pub struct FxFilesystemBuilder {
    // Format (initialise an empty filesystem) instead of replaying an existing one.
    format: bool,
    // Tracing flag passed to the filesystem, the journal and the root store.
    trace: bool,
    // Filesystem options handed to the resulting `FxFilesystem`.
    options: Options,
    // Options used to construct the journal.
    journal_options: JournalOptions,
    // Callback passed to journal replay.
    on_new_allocator: Option<Box<dyn Fn(Arc<Allocator>) + Send + Sync>>,
    // Callback passed to the object manager.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
    // When set, a post-commit hook that runs fsck is installed.
    fsck_after_every_transaction: bool,
}
288
289impl FxFilesystemBuilder {
290 pub fn new() -> Self {
291 Self {
292 format: false,
293 trace: false,
294 options: Options::default(),
295 journal_options: JournalOptions::default(),
296 on_new_allocator: None,
297 on_new_store: None,
298 fsck_after_every_transaction: false,
299 }
300 }
301
302 pub fn format(mut self, format: bool) -> Self {
304 self.format = format;
305 self
306 }
307
308 pub fn trace(mut self, trace: bool) -> Self {
310 self.trace = trace;
311 self
312 }
313
314 pub fn read_only(mut self, read_only: bool) -> Self {
317 self.options.read_only = read_only;
318 self
319 }
320
321 pub fn image_builder_mode(mut self, mode: Option<SuperBlockInstance>) -> Self {
328 self.options.image_builder_mode = mode;
329 self
330 }
331
332 pub fn roll_metadata_key_byte_count(mut self, roll_metadata_key_byte_count: u64) -> Self {
334 self.options.roll_metadata_key_byte_count = roll_metadata_key_byte_count;
335 self
336 }
337
338 pub fn pre_commit_hook(
340 mut self,
341 hook: impl Fn(&Transaction<'_>) -> Result<(), Error> + Send + Sync + 'static,
342 ) -> Self {
343 self.options.pre_commit_hook = Some(Box::new(hook));
344 self
345 }
346
347 pub fn post_commit_hook(
350 mut self,
351 hook: impl Fn() -> futures::future::BoxFuture<'static, ()> + Send + Sync + 'static,
352 ) -> Self {
353 self.options.post_commit_hook = Some(Box::new(hook));
354 self
355 }
356
357 pub fn skip_initial_reap(mut self, skip_initial_reap: bool) -> Self {
360 self.options.skip_initial_reap = skip_initial_reap;
361 self
362 }
363
364 pub fn journal_options(mut self, journal_options: JournalOptions) -> Self {
366 self.journal_options = journal_options;
367 self
368 }
369
370 pub fn on_new_allocator(
372 mut self,
373 on_new_allocator: impl Fn(Arc<Allocator>) + Send + Sync + 'static,
374 ) -> Self {
375 self.on_new_allocator = Some(Box::new(on_new_allocator));
376 self
377 }
378
379 pub fn on_new_store(
381 mut self,
382 on_new_store: impl Fn(&ObjectStore) + Send + Sync + 'static,
383 ) -> Self {
384 self.on_new_store = Some(Box::new(on_new_store));
385 self
386 }
387
388 pub fn fsck_after_every_transaction(mut self, fsck_after_every_transaction: bool) -> Self {
390 self.fsck_after_every_transaction = fsck_after_every_transaction;
391 self
392 }
393
394 pub fn trim_config(mut self, delay_and_interval: Option<(Duration, Duration)>) -> Self {
395 self.options.trim_config = delay_and_interval;
396 self
397 }
398
399 pub fn power_manager(mut self, power_manager: Arc<dyn PowerManager>) -> Self {
400 self.options.power_manager = Some(power_manager);
401 self
402 }
403
404 pub fn trim_charger_wait(mut self, wait: Duration) -> Self {
405 self.options.trim_charger_wait = wait;
406 self
407 }
408
409 pub fn inline_crypto_enabled(mut self, inline_crypto_enabled: bool) -> Self {
411 self.options.inline_crypto_enabled = inline_crypto_enabled;
412 self
413 }
414
415 pub fn barriers_enabled(mut self, barriers_enabled: bool) -> Self {
418 self.options.barriers_enabled = barriers_enabled;
419 self.journal_options.barriers_enabled = barriers_enabled;
420 self
421 }
422
    /// Formats or opens a filesystem on `device` according to the collected options.
    ///
    /// Fails if formatting is combined with read-only, if inline crypto is requested without
    /// barriers, or if any of the initialisation / replay steps fail.  Panics if the effective
    /// block size is not a multiple of `MIN_BLOCK_SIZE` or exceeds `MAX_BLOCK_SIZE`.
    pub async fn open(self, device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
        let read_only = self.options.read_only;
        if self.format && read_only {
            bail!("Cannot initialize a filesystem as read-only");
        }

        if self.options.inline_crypto_enabled && !self.options.barriers_enabled {
            bail!("A filesystem using inline encryption requires barriers");
        }

        let objects = Arc::new(ObjectManager::new(self.on_new_store));
        let journal = Arc::new(Journal::new(objects.clone(), self.journal_options));

        let image_builder_mode = self.options.image_builder_mode;

        // The filesystem never uses a block size below MIN_BLOCK_SIZE, whatever the device
        // reports.
        let block_size = std::cmp::max(device.block_size().into(), MIN_BLOCK_SIZE);
        assert_eq!(block_size % MIN_BLOCK_SIZE, 0);
        assert!(block_size <= MAX_BLOCK_SIZE, "Max supported block size is 64KiB");

        let mut fsck_after_every_transaction = None;
        let mut filesystem_options = self.options;
        if self.fsck_after_every_transaction {
            // Wrap any existing post-commit hook: the fsck hook runs first and then calls the
            // hook it replaced.
            let instance =
                FsckAfterEveryTransaction::new(filesystem_options.post_commit_hook.take());
            fsck_after_every_transaction = Some(instance.clone());
            filesystem_options.post_commit_hook =
                Some(Box::new(move || Box::pin(instance.clone().run())));
        }

        // Writable open of an existing filesystem: flush the device before replaying.
        if !read_only && !self.format {
            device.flush().await.context("Device flush failed")?;
        }

        // `new_cyclic` provides a weak self-reference for the inspect callback.
        let filesystem = Arc::new_cyclic(|weak: &Weak<FxFilesystem>| {
            let weak = weak.clone();
            FxFilesystem {
                device,
                block_size,
                objects: objects.clone(),
                journal,
                commit_mutex: futures::lock::Mutex::new(()),
                lock_manager: LockManager::new(),
                flush_task: Mutex::new(None),
                background_tasks: fasync::Scope::new(),
                // Marked closed until initialisation below has succeeded.
                closed: AtomicBool::new(true),
                trace: self.trace,
                graveyard: Graveyard::new(objects.clone()),
                completed_transactions: metrics::detail().create_uint("completed_transactions", 0),
                options: filesystem_options,
                in_flight_transactions: AtomicU64::new(0),
                transaction_limit_event: Event::new(),
                _stores_node: metrics::register_fs(move || {
                    let weak = weak.clone();
                    Box::pin(async move {
                        if let Some(fs) = weak.upgrade() {
                            fs.populate_stores_node().await
                        } else {
                            Err(anyhow!("Filesystem has been dropped"))
                        }
                    })
                }),
            }
        });

        filesystem.journal().set_image_builder_mode(image_builder_mode);

        filesystem.journal.set_trace(self.trace);
        if self.format {
            filesystem.journal.init_empty(filesystem.clone()).await?;
            // Superblocks are only written here when not in image-builder mode.
            if image_builder_mode.is_none() {
                filesystem.journal.init_superblocks().await?;

                // NOTE(review): `reap_async` is called again further down for writable,
                // non-image-builder opens — confirm it tolerates being called twice.
                filesystem.graveyard.clone().reap_async();
            }

            // Create the volumes directory under the root store's root directory.
            let root_store = filesystem.root_store();
            root_store.set_trace(self.trace);
            let root_directory =
                Directory::open(&root_store, root_store.root_directory_object_id())
                    .await
                    .context("Unable to open root volume directory")?;
            let mut transaction = root_store
                .new_transaction(
                    lock_keys![LockKey::object(
                        root_store.store_object_id(),
                        root_directory.object_id()
                    )],
                    transaction::Options::default(),
                )
                .await?;
            let volume_directory =
                root_directory.create_child_dir(&mut transaction, VOLUMES_DIRECTORY).await?;
            transaction.commit().await?;
            objects.set_volume_directory(volume_directory);
        } else {
            filesystem
                .journal
                .replay(filesystem.clone(), self.on_new_allocator)
                .await
                .context("Journal replay failed")?;
            filesystem.root_store().set_trace(self.trace);

            if !read_only {
                // Run the graveyard's initial reap for every unlocked store.
                for store in objects.unlocked_stores() {
                    filesystem.graveyard.initial_reap(&store).await?;
                }
            }
        }

        if let Some(fsck_after_every_transaction) = fsck_after_every_transaction {
            // Hand the fsck hook its (weak) filesystem reference now that one exists.  The
            // cell is set exactly once, so a failure here is impossible.
            fsck_after_every_transaction
                .fs
                .set(Arc::downgrade(&filesystem))
                .unwrap_or_else(|_| unreachable!());
        }

        // Initialisation succeeded; from here on `close` is expected before drop.
        filesystem.closed.store(false, Ordering::SeqCst);

        if !read_only && image_builder_mode.is_none() {
            // Start background work: graveyard reaping, trimming (if configured) and periodic
            // transfer-buffer cleaning.
            filesystem.graveyard.clone().reap_async();

            if filesystem.options.trim_config.is_some() {
                filesystem.start_trim_task();
            }
            filesystem.start_clean_transfer_buffer_task();
        }

        Ok(filesystem.into())
    }
565}
566
/// An Fxfs filesystem instance: the device, journal, object manager and associated state.
pub struct FxFilesystem {
    // Effective block size: the device block size, but at least MIN_BLOCK_SIZE.
    block_size: u64,
    objects: Arc<ObjectManager>,
    journal: Arc<Journal>,
    // Held by `commit_transaction` while committing; also exposed through `lock_commits`.
    commit_mutex: futures::lock::Mutex<()>,
    lock_manager: LockManager,
    // The journal flush task, started lazily by `maybe_start_flush_task`.
    flush_task: Mutex<Option<fasync::Task<()>>>,
    // Scope for the trim and transfer-buffer cleaning tasks; cancelled in `close`.
    background_tasks: fasync::Scope,
    // True before `open` finishes and after `close` has been called.
    closed: AtomicBool,
    trace: bool,
    graveyard: Arc<Graveyard>,
    // Incremented once per committed transaction.
    completed_transactions: UintProperty,
    options: Options,

    // Number of transactions currently counted as in flight.
    in_flight_transactions: AtomicU64,

    // Notified by `sub_transaction` so waiters in `add_transaction` can retry.
    transaction_limit_event: Event,

    // NOTE(review): field position may be deliberate (fields drop in declaration order) —
    // confirm before reordering.
    device: DeviceHolder,

    // Lazily populated inspect node; see `populate_stores_node`.
    _stores_node: LazyNode,
}
596
597#[fxfs_trace::trace]
598impl FxFilesystem {
599 pub async fn new_empty(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
600 FxFilesystemBuilder::new().format(true).open(device).await
601 }
602
603 pub async fn open(device: DeviceHolder) -> Result<OpenFxFilesystem, Error> {
604 FxFilesystemBuilder::new().open(device).await
605 }
606
    /// Returns the root parent store held by the object manager.
    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
        self.objects.root_parent_store()
    }
610
611 pub async fn close(&self) -> Result<(), Error> {
612 if self.journal().image_builder_mode().is_some() {
613 self.journal().allocate_journal().await?;
614 self.journal().set_image_builder_mode(None);
615 self.journal().force_compact().await?;
616 }
617 assert_eq!(self.closed.swap(true, Ordering::SeqCst), false);
618 debug_assert_not_too_long!(self.graveyard.wait_for_reap());
619 debug_assert_not_too_long!(self.background_tasks.clone().cancel());
620 self.journal.stop_compactions().await;
621 let sync_status =
622 if self.journal().image_builder_mode().is_some() || self.options().read_only {
623 Ok(None)
624 } else {
625 self.journal.sync(SyncOptions { flush_device: true, ..Default::default() }).await
626 };
627 match &sync_status {
628 Ok(None) => {}
629 Ok(checkpoint) => info!(
630 "Filesystem closed (checkpoint={}, metadata_reservation={:?}, \
631 reservation_required={}, borrowed={})",
632 checkpoint.as_ref().unwrap().0.file_offset,
633 self.object_manager().metadata_reservation(),
634 self.object_manager().required_reservation(),
635 self.object_manager().borrowed_metadata_space(),
636 ),
637 Err(e) => error!(error:? = e; "Failed to sync filesystem; data may be lost"),
638 }
639 self.journal.terminate();
640 let flush_task = self.flush_task.lock().take();
641 if let Some(task) = flush_task {
642 debug_assert_not_too_long!(task);
643 }
644 self.device().close().await.context("Failed to close device")?;
647 sync_status.map(|_| ())
648 }
649
    /// Returns a new `Arc` handle to the underlying device.
    pub fn device(&self) -> Arc<dyn Device> {
        Arc::clone(&self.device)
    }
653
    /// Returns the root store held by the object manager.
    pub fn root_store(&self) -> Arc<ObjectStore> {
        self.objects.root_store()
    }
657
    /// Returns the allocator held by the object manager.
    pub fn allocator(&self) -> Arc<Allocator> {
        self.objects.allocator()
    }
661
    /// Forwards to `enable_allocations` on this filesystem's allocator.
    pub fn enable_allocations(&self) {
        self.allocator().enable_allocations();
    }
668
    /// Borrows the object manager.
    pub fn object_manager(&self) -> &Arc<ObjectManager> {
        &self.objects
    }
672
    /// Borrows the journal.
    pub fn journal(&self) -> &Arc<Journal> {
        &self.journal
    }
676
677 pub async fn sync(&self, options: SyncOptions<'_>) -> Result<(), Error> {
678 self.journal.sync(options).await.map(|_| ())
679 }
680
    /// Returns the filesystem block size chosen at open time.
    pub fn block_size(&self) -> u64 {
        self.block_size
    }
684
685 pub fn get_info(&self) -> Info {
686 Info {
687 total_bytes: self.device.size(),
688 used_bytes: self.object_manager().allocator().get_used_bytes().0,
689 }
690 }
691
    /// Returns the super-block header as reported by the journal.
    pub fn super_block_header(&self) -> SuperBlockHeader {
        self.journal.super_block_header()
    }
695
    /// Borrows the graveyard.
    pub fn graveyard(&self) -> &Arc<Graveyard> {
        &self.graveyard
    }
699
    /// Returns the tracing flag the filesystem was opened with.
    pub fn trace(&self) -> bool {
        self.trace
    }
703
    /// Borrows the options the filesystem was opened with.
    pub fn options(&self) -> &Options {
        &self.options
    }
707
    /// Acquires the commit mutex.  `commit_transaction` takes the same mutex before
    /// committing, so no transaction can commit while the returned guard is held.
    pub async fn lock_commits(&self) -> futures::lock::MutexGuard<'_, ()> {
        self.commit_mutex.lock().await
    }
717
    /// Commits `transaction` and returns the result of `callback`, which is invoked with the
    /// journal offset of the commit (0 in image-builder mode).
    ///
    /// Fails if the pre-commit hook, an object's `prepare_commit`, or the journal commit
    /// fails.
    #[trace]
    pub async fn commit_transaction<R: Send>(
        &self,
        transaction: &mut Transaction<'_>,
        callback: impl FnOnce(u64) -> R + Send,
    ) -> Result<R, Error> {
        if let Some(hook) = self.options.pre_commit_hook.as_ref() {
            hook(transaction)?;
        }
        debug_assert_not_too_long!(self.lock_manager.commit_prepare(&transaction));

        // Give each object touched by the transaction a chance to prepare, once per object.
        // Any guards returned are kept until the commit has been applied.
        let mut guards = Vec::new();
        let mut last_object_id = 0;
        for mutation in transaction.mutations() {
            let object_id = mutation.object_id;

            // Flush markers do not trigger `prepare_commit`.
            if matches!(mutation.mutation, Mutation::BeginFlush | Mutation::EndFlush) {
                continue;
            }

            if object_id == last_object_id {
                continue;
            }
            // Mutations are expected in ascending object-id order; this is what makes the
            // de-duplication above sufficient.
            assert!(object_id > last_object_id);
            last_object_id = object_id;

            if let Some(obj) = self.object_manager().journaling_object(object_id) {
                if let Some(guard) = obj.prepare_commit(self, transaction).await? {
                    guards.push(guard);
                }
            }
        }

        self.maybe_start_flush_task();
        // Held until this function returns, i.e. across the callback and post-commit hook.
        let _guard = debug_assert_not_too_long!(self.commit_mutex.lock());
        let journal_offset = if self.journal().image_builder_mode().is_some() {
            // Image-builder mode: apply the transaction directly with a zeroed checkpoint
            // instead of committing through the journal.
            let journal_checkpoint =
                JournalCheckpoint { file_offset: 0, checksum: 0, version: LATEST_VERSION };
            let maybe_mutation = self
                .object_manager()
                .apply_transaction(transaction, &journal_checkpoint)
                .expect("Transactions must not fail in image_builder_mode");
            if let Some(mutation) = maybe_mutation {
                assert!(matches!(mutation, Mutation::UpdateBorrowed(_)));
            }
            self.object_manager().did_commit_transaction(transaction, &journal_checkpoint, 0);
            0
        } else {
            self.journal.commit(transaction).await?
        };

        std::mem::drop(guards);
        self.completed_transactions.add(1);

        let result = callback(journal_offset);

        if let Some(hook) = self.options.post_commit_hook.as_ref() {
            hook().await;
        }

        Ok(result)
    }
792
    /// Borrows the lock manager.
    pub fn lock_manager(&self) -> &LockManager {
        &self.lock_manager
    }
796
    /// Releases everything associated with `transaction`: its in-flight slot, any held
    /// metadata amount, and its state in the object manager and lock manager.
    pub(crate) fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
        // A transaction with a metadata reservation other than `None` is counted as in
        // flight, so give its slot back.
        if !matches!(transaction.metadata_reservation, MetadataReservation::None) {
            self.sub_transaction();
        }
        // Take the reservation out of the transaction (leaving `None`); only the `Hold`
        // variant needs extra work.
        if let MetadataReservation::Hold(hold_amount) =
            std::mem::replace(&mut transaction.metadata_reservation, MetadataReservation::None)
        {
            // Create an empty hold on the transaction's allocator reservation and add the
            // held amount to it.
            // NOTE(review): presumably dropping `hold` then returns the amount to the
            // reservation — confirm against the allocator's `Hold` type.
            let hold = transaction
                .allocator_reservation
                .unwrap()
                .reserve(0)
                .expect("Zero should always succeed.");
            hold.add(hold_amount);
        }
        self.objects.drop_transaction(transaction);
        self.lock_manager.drop_transaction(transaction);
    }
815
816 fn maybe_start_flush_task(&self) {
817 if self.journal.image_builder_mode().is_some() {
818 return;
819 }
820 let mut flush_task = self.flush_task.lock();
821 if flush_task.is_none() {
822 let journal = self.journal.clone();
823 *flush_task = Some(fasync::Task::spawn(
824 journal.flush_task().trace(trace_future_args!("Journal::flush_task")),
825 ));
826 }
827 }
828
829 fn start_trim_task(self: &Arc<Self>) {
830 if !self.device.supports_trim() {
831 info!("Device does not support trim; not scheduling trimming");
832 return;
833 }
834 let this = self.clone();
835 self.background_tasks
836 .spawn(this.trim_task().trace(trace_future_args!("Filesystem::trim_task")));
837 }
838
839 async fn trim_task(self: Arc<Self>) {
840 let Some((mut next_timer, _)) = self.options.trim_config else { return };
842 loop {
843 fasync::Timer::new(next_timer.clone()).await;
844
845 let start = Instant::now();
848 let result = if let Some(pm) = &self.options.power_manager {
849 let mut watcher = pm.clone().watch_battery();
850
851 let pauser = Pauser::new(self.options.trim_charger_wait);
853
854 let mut pause_future = pin!(
855 async {
856 let mut wake_lease = WakeLease::invalid();
857 loop {
858 let Some((using_battery, new_lease)) = watcher.next_latest().await
859 else {
860 pauser.set_pause(false);
863 drop(wake_lease); return;
865 };
866
867 pauser.set_pause(using_battery);
869
870 if using_battery {
873 wake_lease = WakeLease::invalid();
874 } else if !new_lease.is_invalid() {
875 wake_lease = new_lease;
876 }
877 }
878 }
879 .fuse()
880 );
881
882 let mut do_trim = pin!(self.do_trim(Some(&pauser)).fuse());
883
884 loop {
885 futures::select! {
886 _ = pause_future => {}
887 result = do_trim => break result,
888 }
889 }
890
891 } else {
894 self.do_trim(None).await
895 };
896
897 let duration = start.elapsed();
898 match result {
899 Ok(bytes_trimmed) => info!(
900 "Trimmed {bytes_trimmed} bytes in {duration:?}. Next trim in \
901 {next_timer:?}",
902 ),
903 Err(error) => error!(error:?; "Failed to trim"),
904 }
905
906 let Some((_, interval)) = self.options.trim_config else { return };
907 next_timer = interval;
908 if next_timer.is_zero() {
909 fasync::yield_now().await;
910 }
911 }
912 }
913
914 async fn do_trim(&self, pauser: Option<&Pauser>) -> Result<usize, Error> {
916 const MAX_EXTENTS_PER_BATCH: usize = 8;
917 const MAX_EXTENT_SIZE: usize = 256 * 1024;
918 let mut offset = 0;
919 let mut bytes_trimmed = 0;
920 loop {
921 let allocator = self.allocator();
922 if let Some(pauser) = pauser {
923 pauser.maybe_pause().await;
924 }
925 let trimmable_extents =
926 allocator.take_for_trimming(offset, MAX_EXTENT_SIZE, MAX_EXTENTS_PER_BATCH).await?;
927 for device_range in trimmable_extents.extents() {
928 self.device.trim(device_range.clone()).await?;
929 bytes_trimmed += device_range.length()? as usize;
930 }
931 if let Some(device_range) = trimmable_extents.extents().last() {
932 offset = device_range.end;
933 } else {
934 break;
935 }
936 }
937 Ok(bytes_trimmed)
938 }
939
940 fn start_clean_transfer_buffer_task(self: &Arc<Self>) {
941 let this = self.clone();
942 self.background_tasks.spawn(
943 async move {
944 loop {
945 fasync::Timer::new(CLEAN_TRANSFER_BUFFER_INTERVAL).await;
946 this.device().clean_transfer_buffer();
947 }
948 }
949 .trace(trace_future_args!("Filesystem::clean_transfer_buffer_task")),
950 );
951 }
952
    /// Works out the metadata reservation for a new transaction.
    ///
    /// Returns the metadata reservation, the caller-supplied allocator reservation (if any),
    /// and a hold taken from that reservation (if any).  Fails with `FxfsError::NoSpace` when
    /// the metadata amount cannot be reserved, or with the journal's error when the journal
    /// space check fails.
    pub(crate) async fn reservation_for_transaction<'a>(
        self: &Arc<Self>,
        options: transaction::Options<'a>,
    ) -> Result<(MetadataReservation, Option<&'a Reservation>, Option<Hold<'a>>), Error> {
        // In image-builder mode nothing is reserved; metadata space is simply borrowed.
        if self.options.image_builder_mode.is_some() {
            return Ok((MetadataReservation::Borrowed, None, None));
        }
        if !options.skip_journal_checks {
            self.maybe_start_flush_task();
            self.journal.check_journal_space().await?;
        }

        let mut hold = None;
        let metadata_reservation = if options.borrow_metadata_space {
            MetadataReservation::Borrowed
        } else {
            match options.allocator_reservation {
                // Carve the metadata amount out of the caller's reservation.
                Some(reservation) => {
                    hold = Some(
                        reservation
                            .reserve(TRANSACTION_METADATA_MAX_AMOUNT)
                            .ok_or(FxfsError::NoSpace)?,
                    );
                    MetadataReservation::Hold(TRANSACTION_METADATA_MAX_AMOUNT)
                }
                // No caller reservation: reserve the metadata amount from the allocator.
                None => {
                    let reservation = self
                        .allocator()
                        .reserve(None, TRANSACTION_METADATA_MAX_AMOUNT)
                        .ok_or(FxfsError::NoSpace)?;
                    MetadataReservation::Reservation(reservation)
                }
            }
        };
        Ok((metadata_reservation, options.allocator_reservation, hold))
    }
1004
1005 pub(crate) async fn add_transaction(&self, skip_journal_checks: bool) {
1006 if skip_journal_checks {
1007 self.in_flight_transactions.fetch_add(1, Ordering::Relaxed);
1008 } else {
1009 let inc = || {
1010 let mut in_flights = self.in_flight_transactions.load(Ordering::Relaxed);
1011 while in_flights < MAX_IN_FLIGHT_TRANSACTIONS {
1012 match self.in_flight_transactions.compare_exchange_weak(
1013 in_flights,
1014 in_flights + 1,
1015 Ordering::Relaxed,
1016 Ordering::Relaxed,
1017 ) {
1018 Ok(_) => return true,
1019 Err(x) => in_flights = x,
1020 }
1021 }
1022 return false;
1023 };
1024 while !inc() {
1025 let listener = self.transaction_limit_event.listen();
1026 if inc() {
1027 break;
1028 }
1029 listener.await;
1030 }
1031 }
1032 }
1033
1034 pub(crate) fn sub_transaction(&self) {
1035 let old = self.in_flight_transactions.fetch_sub(1, Ordering::Relaxed);
1036 assert!(old != 0);
1037 if old <= MAX_IN_FLIGHT_TRANSACTIONS {
1038 self.transaction_limit_event.notify(usize::MAX);
1039 }
1040 }
1041
    /// Takes a write lock on the truncate lock key for `(store_id, object_id)` and returns it
    /// wrapped in a `TruncateGuard`.
    pub async fn truncate_guard(&self, store_id: u64, object_id: u64) -> TruncateGuard<'_> {
        let keys = lock_keys![LockKey::truncate(store_id, object_id,)];
        TruncateGuard(self.lock_manager().write_lock(keys).await)
    }
1046
    /// Builds an inspector with one child node per store: `__root`, `__root_parent`, and one
    /// node for each entry in the volume directory that the object manager has a store for,
    /// named after the directory entry.
    async fn populate_stores_node(&self) -> Result<Inspector, Error> {
        let inspector = fuchsia_inspect::Inspector::default();
        let root = inspector.root();
        root.record_child("__root", |n| self.root_store().record_data(n));
        root.record_child("__root_parent", |n| self.root_parent_store().record_data(n));
        let object_manager = self.object_manager();
        let volume_directory = object_manager.volume_directory();
        let layer_set = volume_directory.store().tree().layer_set();
        let mut merger = layer_set.merger();
        let mut iter = volume_directory.iter(&mut merger).await?;
        while let Some((name, id, _)) = iter.get() {
            // Entries without a store in the object manager are skipped.
            if let Some(store) = object_manager.store(id) {
                root.record_child(name.to_string(), |n| store.record_data(n));
            }
            iter.advance().await?;
        }
        Ok(inspector)
    }
1065}
1066
/// Guard returned by `FxFilesystem::truncate_guard`; holds the write lock for as long as it
/// lives.  The wrapped guard is never read, hence the `dead_code` allowance.
#[allow(dead_code)]
pub struct TruncateGuard<'a>(WriteGuard<'a>);
1070
1071pub async fn mkfs(device: DeviceHolder) -> Result<DeviceHolder, Error> {
1073 let fs = FxFilesystem::new_empty(device).await?;
1074 fs.close().await?;
1075 Ok(fs.take_device().await)
1076}
1077
/// Formats `device`, creates a single volume named `volume_name` (using `crypt` in its store
/// options), closes the filesystem and hands the device back.
///
/// # Panics
///
/// Panics if the root volume cannot be opened or the volume cannot be created.
pub async fn mkfs_with_volume(
    device: DeviceHolder,
    volume_name: &str,
    crypt: Option<Arc<dyn Crypt>>,
) -> Result<DeviceHolder, Error> {
    let fs = FxFilesystem::new_empty(device).await?;
    {
        // These failures panic instead of propagating.
        // NOTE(review): presumably so that `fs` is not dropped without `close` on the error
        // path — confirm.
        let root_volume = root_volume(fs.clone()).await.expect("Open root_volume failed");
        root_volume
            .new_volume(
                volume_name,
                NewChildStoreOptions {
                    options: StoreOptions { crypt, ..StoreOptions::default() },
                    ..Default::default()
                },
            )
            .await
            .expect("Create volume failed");
    }
    fs.close().await?;
    Ok(fs.take_device().await)
}
1105
/// State for the post-commit hook that runs fsck after every transaction.
struct FsckAfterEveryTransaction {
    // Set exactly once by the builder's `open`, after the filesystem has been constructed.
    fs: OnceLock<Weak<FxFilesystem>>,
    // The post-commit hook this one replaced; run after fsck.
    old_hook: PostCommitHook,
}
1110
1111impl FsckAfterEveryTransaction {
1112 fn new(old_hook: PostCommitHook) -> Arc<Self> {
1113 Arc::new(Self { fs: OnceLock::new(), old_hook })
1114 }
1115
1116 async fn run(self: Arc<Self>) {
1117 if let Some(fs) = self.fs.get().and_then(Weak::upgrade) {
1118 let options = FsckOptions {
1119 fail_on_warning: true,
1120 no_lock: true,
1121 quiet: true,
1122 ..Default::default()
1123 };
1124 fsck_with_options(fs.clone(), &options).await.expect("fsck failed");
1125 let object_manager = fs.object_manager();
1126 for store in object_manager.unlocked_stores() {
1127 let store_id = store.store_object_id();
1128 if !object_manager.is_system_store(store_id) {
1129 fsck_volume_with_options(fs.as_ref(), &options, store_id, None)
1130 .await
1131 .expect("fsck_volume_with_options failed");
1132 }
1133 }
1134 }
1135 if let Some(old_hook) = self.old_hook.as_ref() {
1136 old_hook().await;
1137 }
1138 }
1139}
1140
/// Lets one task pause and resume another; used by the trim task to stop trimming while on
/// battery.
struct Pauser {
    // True while paused.
    pause: Condition<bool>,
    // Extra delay waited after being unpaused before the pause state is re-checked.
    bounce_delay: Duration,
}
1145
1146impl Pauser {
1147 fn new(bounce_delay: Duration) -> Self {
1149 Self { pause: Condition::new(true), bounce_delay }
1150 }
1151
1152 async fn maybe_pause(&self) {
1153 loop {
1154 if !*self.pause.lock() {
1155 return;
1156 }
1157 self.pause.when(|p| if **p { Poll::Pending } else { Poll::Ready(()) }).await;
1158 fasync::Timer::new(self.bounce_delay).await;
1159 }
1160 }
1161
1162 fn set_pause(&self, v: bool) {
1163 let mut guard = self.pause.lock();
1164 if *guard == v {
1165 return;
1166 }
1167 *guard = v;
1168 for waker in guard.drain_wakers() {
1169 waker.wake();
1170 }
1171 }
1172}
1173
1174trait NextLatest: Stream + Unpin {
1175 async fn next_latest(&mut self) -> Option<Self::Item> {
1177 let Some(mut next) = self.next().await else { return None };
1178
1179 loop {
1181 match self.next().now_or_never() {
1182 None => return Some(next),
1183 Some(None) => return None,
1184 Some(Some(n)) => next = n,
1185 }
1186 }
1187 }
1188}
1189
// Blanket implementation: every `Unpin` stream (sized or not) gets `next_latest`.
impl<T: ?Sized + Unpin> NextLatest for T where T: Stream {}
1191
1192#[cfg(test)]
1193mod tests {
1194 use super::{FxFilesystem, FxFilesystemBuilder, FxfsError, SyncOptions};
1195 use crate::fsck::{fsck, fsck_volume};
1196 use crate::log::*;
1197 use crate::lsm_tree::Operation;
1198 use crate::lsm_tree::types::Item;
1199 use crate::object_handle::{
1200 INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle, WriteObjectHandle,
1201 };
1202 use crate::object_store::directory::{Directory, replace_child};
1203 use crate::object_store::journal::JournalOptions;
1204 use crate::object_store::journal::super_block::SuperBlockInstance;
1205 use crate::object_store::transaction::{LockKey, Options, lock_keys};
1206 use crate::object_store::volume::root_volume;
1207 use crate::object_store::{
1208 HandleOptions, NewChildStoreOptions, ObjectDescriptor, ObjectStore, StoreOptions,
1209 };
1210 use crate::range::RangeExt;
1211 use fuchsia_async as fasync;
1212 use fuchsia_sync::Mutex;
1213 use futures::future::join_all;
1214 use futures::stream::{FuturesUnordered, TryStreamExt};
1215 use fxfs_insecure_crypto::new_insecure_crypt;
1216 use rustc_hash::FxHashMap as HashMap;
1217 use std::ops::Range;
1218 use std::sync::Arc;
1219 use std::sync::atomic::{AtomicU32, Ordering};
1220 use std::time::Duration;
1221 use storage_device::DeviceHolder;
1222 use storage_device::fake_device::{self, FakeDevice};
1223 use test_case::test_case;
1224
1225 const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
1226
1227 #[fuchsia::test(threads = 10)]
1228 async fn test_compaction() {
1229 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1230
1231 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1233 let root_store = fs.root_store();
1234 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1235 .await
1236 .expect("open failed");
1237
1238 let mut tasks = Vec::new();
1239 for i in 0..2 {
1240 let mut transaction = fs
1241 .root_store()
1242 .new_transaction(
1243 lock_keys![LockKey::object(
1244 root_store.store_object_id(),
1245 root_directory.object_id()
1246 )],
1247 Options::default(),
1248 )
1249 .await
1250 .expect("new_transaction failed");
1251 let handle = root_directory
1252 .create_child_file(&mut transaction, &format!("{}", i))
1253 .await
1254 .expect("create_child_file failed");
1255 transaction.commit().await.expect("commit failed");
1256 tasks.push(fasync::Task::spawn(async move {
1257 const TEST_DATA: &[u8] = b"hello";
1258 let mut buf = handle.allocate_buffer(TEST_DATA.len()).await;
1259 buf.as_mut_slice().copy_from_slice(TEST_DATA);
1260 for _ in 0..1500 {
1261 handle.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1262 }
1263 }));
1264 }
1265 join_all(tasks).await;
1266 fs.sync(SyncOptions::default()).await.expect("sync failed");
1267
1268 fsck(fs.clone()).await.expect("fsck failed");
1269 fs.close().await.expect("Close failed");
1270 }
1271
1272 #[fuchsia::test]
1273 async fn test_enable_allocations() {
1274 {
1276 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1277 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1278 fs.enable_allocations();
1279 let root_store = fs.root_store();
1280 let root_directory =
1281 Directory::open(&root_store, root_store.root_directory_object_id())
1282 .await
1283 .expect("open failed");
1284 let mut transaction = fs
1285 .root_store()
1286 .new_transaction(
1287 lock_keys![LockKey::object(
1288 root_store.store_object_id(),
1289 root_directory.object_id()
1290 )],
1291 Options::default(),
1292 )
1293 .await
1294 .expect("new_transaction failed");
1295 root_directory
1296 .create_child_file(&mut transaction, "test")
1297 .await
1298 .expect("create_child_file failed");
1299 transaction.commit().await.expect("commit failed");
1300 fs.close().await.expect("close failed");
1301 }
1302
1303 {
1305 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1306 let fs = FxFilesystemBuilder::new()
1307 .format(true)
1308 .image_builder_mode(Some(SuperBlockInstance::A))
1309 .open(device)
1310 .await
1311 .expect("open failed");
1312 let root_store = fs.root_store();
1313 let root_directory =
1314 Directory::open(&root_store, root_store.root_directory_object_id())
1315 .await
1316 .expect("open failed");
1317
1318 let mut transaction = fs
1319 .root_store()
1320 .new_transaction(
1321 lock_keys![LockKey::object(
1322 root_store.store_object_id(),
1323 root_directory.object_id()
1324 )],
1325 Options::default(),
1326 )
1327 .await
1328 .expect("new_transaction failed");
1329 let handle = root_directory
1330 .create_child_file(&mut transaction, "test_fail")
1331 .await
1332 .expect("create_child_file failed");
1333 transaction.commit().await.expect("commit failed");
1334
1335 assert!(
1337 FxfsError::Unavailable
1338 .matches(&handle.allocate(0..4096).await.expect_err("allocate should fail"))
1339 );
1340
1341 fs.enable_allocations();
1343 handle.allocate(0..4096).await.expect("allocate should work after enable_allocations");
1344
1345 fs.close().await.expect("close failed");
1349 }
1350 }
1354
    // Records every mutation applied to the allocator tree and to each object store tree while
    // a series of operations runs, then reopens the filesystem and checks that the mutations
    // recorded during the second open match the first recording exactly (same operations, keys
    // and values, in the same order) and that the metadata reservation amount is unchanged.
    #[fuchsia::test(threads = 10)]
    async fn test_replay_is_identical() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");

        // Close the freshly formatted filesystem and take the device back so it can be reopened
        // below with the mutation callbacks installed.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        // Ordered log of (operation, item) pairs observed through a tree's mutation callback.
        struct Mutations<K, V>(Mutex<Vec<(Operation, Item<K, V>)>>);

        impl<K: Clone, V: Clone> Mutations<K, V> {
            fn new() -> Self {
                Mutations(Mutex::new(Vec::new()))
            }

            // Appends a copy of `item` together with the operation that was applied to it.
            fn push(&self, operation: Operation, item: &Item<K, V>) {
                self.0.lock().push((operation, item.clone()));
            }
        }

        // Opens the filesystem with callbacks that log allocator mutations into
        // `allocator_mutations` and per-store mutations into `object_mutations`, keyed by store
        // object id.
        // NOTE(review): `reclaim_size: u64::MAX` presumably keeps the journal from being
        // reclaimed so that everything is still there to replay — confirm.
        let open_fs = |device,
                       object_mutations: Arc<Mutex<HashMap<_, _>>>,
                       allocator_mutations: Arc<Mutations<_, _>>| async {
            FxFilesystemBuilder::new()
                .journal_options(JournalOptions { reclaim_size: u64::MAX, ..Default::default() })
                .on_new_allocator(move |allocator| {
                    let allocator_mutations = allocator_mutations.clone();
                    allocator.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        allocator_mutations.push(op, item)
                    })));
                })
                .on_new_store(move |store| {
                    // Each store gets its own log, registered under the store's object id.
                    let mutations = Arc::new(Mutations::new());
                    object_mutations.lock().insert(store.store_object_id(), mutations.clone());
                    store.tree().set_mutation_callback(Some(Box::new(move |op, item| {
                        mutations.push(op, item)
                    })));
                })
                .open(device)
                .await
                .expect("open failed")
        };

        let allocator_mutations = Arc::new(Mutations::new());
        let object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let fs = open_fs(device, object_mutations.clone(), allocator_mutations.clone()).await;

        let root_store = fs.root_store();
        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");

        // Create a file...
        let mut transaction = fs
            .root_store()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    root_directory.object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let object = root_directory
            .create_child_file(&mut transaction, "test")
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        // ...write 10000 bytes at offset 0...
        let buf = object.allocate_buffer(10000).await;
        object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");

        // ...write again at offset 5000, overlapping the first write...
        object.write_or_append(Some(5000), buf.as_ref()).await.expect("write failed");

        // ...shrink the file to 3000 bytes...
        object.truncate(3000).await.expect("truncate failed");

        // ...remove the directory entry...
        let mut transaction = fs
            .root_store()
            .new_transaction(
                lock_keys![
                    LockKey::object(root_store.store_object_id(), root_directory.object_id()),
                    LockKey::object(root_store.store_object_id(), object.object_id()),
                ],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");

        replace_child(&mut transaction, None, (&root_directory, "test"))
            .await
            .expect("replace_child failed");

        transaction.commit().await.expect("commit failed");

        // ...and finally tombstone the object itself.
        root_store
            .tombstone_object(object.object_id(), Options::default())
            .await
            .expect("tombstone failed");

        fs.close().await.expect("close failed");

        // Remember the reservation after close so it can be compared with the reopened state.
        let metadata_reservation_amount = fs.object_manager().metadata_reservation().amount();

        let device = fs.take_device().await;
        device.reopen(false);

        // Reopen with fresh logs; whatever is recorded now comes from opening the filesystem.
        let replayed_object_mutations = Arc::new(Mutex::new(HashMap::default()));
        let replayed_allocator_mutations = Arc::new(Mutations::new());
        let fs = open_fs(
            device,
            replayed_object_mutations.clone(),
            replayed_allocator_mutations.clone(),
        )
        .await;

        // Every store seen originally must be present after reopening, with an identical log.
        let m1 = object_mutations.lock();
        let m2 = replayed_object_mutations.lock();
        assert_eq!(m1.len(), m2.len());
        for (store_id, mutations) in &*m1 {
            let mutations = mutations.0.lock();
            let replayed = m2.get(&store_id).expect("Found unexpected store").0.lock();
            assert_eq!(mutations.len(), replayed.len());
            for ((op1, i1), (op2, i2)) in mutations.iter().zip(replayed.iter()) {
                assert_eq!(op1, op2);
                assert_eq!(i1.key, i2.key);
                assert_eq!(i1.value, i2.value);
            }
        }

        // The allocator log must match entry for entry as well.
        let a1 = allocator_mutations.0.lock();
        let a2 = replayed_allocator_mutations.0.lock();
        assert_eq!(a1.len(), a2.len());
        for ((op1, i1), (op2, i2)) in a1.iter().zip(a2.iter()) {
            assert_eq!(op1, op2);
            assert_eq!(i1.key, i2.key);
            assert_eq!(i1.value, i2.value);
        }

        assert_eq!(
            fs.object_manager().metadata_reservation().amount(),
            metadata_reservation_amount
        );
    }
1507
1508 #[fuchsia::test]
1509 async fn test_max_in_flight_transactions() {
1510 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1511 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
1512
1513 let store = fs.root_store();
1514 let transactions = FuturesUnordered::new();
1515 for _ in 0..super::MAX_IN_FLIGHT_TRANSACTIONS {
1516 transactions.push(store.new_transaction(lock_keys![], Options::default()));
1517 }
1518 let mut transactions: Vec<_> = transactions.try_collect().await.unwrap();
1519
1520 let mut fut = std::pin::pin!(store.new_transaction(lock_keys![], Options::default()));
1522 assert!(futures::poll!(&mut fut).is_pending());
1523
1524 transactions.pop();
1526
1527 assert!(futures::poll!(&mut fut).is_ready());
1528 }
1529
1530 #[fuchsia::test(threads = 10)]
1532 async fn test_continuously_trim() {
1533 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
1534 let fs = FxFilesystemBuilder::new()
1535 .trim_config(Some((Duration::ZERO, Duration::ZERO)))
1536 .format(true)
1537 .open(device)
1538 .await
1539 .expect("open failed");
1540 fasync::Timer::new(Duration::from_millis(10)).await;
1542
1543 let root_store = fs.root_store();
1546 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
1547 .await
1548 .expect("open failed");
1549 for _ in 0..100 {
1550 let mut transaction = fs
1551 .root_store()
1552 .new_transaction(
1553 lock_keys![LockKey::object(
1554 root_store.store_object_id(),
1555 root_directory.object_id()
1556 )],
1557 Options::default(),
1558 )
1559 .await
1560 .expect("new_transaction failed");
1561 let object = root_directory
1562 .create_child_file(&mut transaction, "test")
1563 .await
1564 .expect("create_child_file failed");
1565 transaction.commit().await.expect("commit failed");
1566
1567 {
1568 let buf = object.allocate_buffer(1024).await;
1569 object.write_or_append(Some(0), buf.as_ref()).await.expect("write failed");
1570 }
1571 std::mem::drop(object);
1572
1573 let mut transaction = root_directory
1574 .acquire_context_for_replace(None, "test", true)
1575 .await
1576 .expect("acquire_context_for_replace failed")
1577 .transaction;
1578 replace_child(&mut transaction, None, (&root_directory, "test"))
1579 .await
1580 .expect("replace_child failed");
1581 transaction.commit().await.expect("commit failed");
1582 }
1583 fs.close().await.expect("close failed");
1584 }
1585
1586 #[test_case(true; "test power fail with barriers")]
1587 #[test_case(false; "test power fail with checksums")]
1588 #[fuchsia::test]
1589 async fn test_power_fail(barriers_enabled: bool) {
1590 for _ in 0..10 {
1593 let (store_id, device, test_file_object_id) = {
1594 let device = DeviceHolder::new(FakeDevice::new(8192, 4096));
1595 let fs = if barriers_enabled {
1596 FxFilesystemBuilder::new()
1597 .barriers_enabled(true)
1598 .format(true)
1599 .open(device)
1600 .await
1601 .expect("new filesystem failed")
1602 } else {
1603 FxFilesystem::new_empty(device).await.expect("new_empty failed")
1604 };
1605 let root_volume = root_volume(fs.clone()).await.expect("root_volume failed");
1606
1607 fs.sync(SyncOptions { flush_device: true, ..SyncOptions::default() })
1608 .await
1609 .expect("sync failed");
1610
1611 let store = root_volume
1612 .new_volume(
1613 "test",
1614 NewChildStoreOptions {
1615 options: StoreOptions {
1616 crypt: Some(Arc::new(new_insecure_crypt())),
1617 ..StoreOptions::default()
1618 },
1619 ..Default::default()
1620 },
1621 )
1622 .await
1623 .expect("new_volume failed");
1624 let root_directory = Directory::open(&store, store.root_directory_object_id())
1625 .await
1626 .expect("open failed");
1627
1628 async fn create_files(store: &Arc<ObjectStore>, prefix: &str) {
1630 let fs = store.filesystem();
1631 let root_directory = Directory::open(store, store.root_directory_object_id())
1632 .await
1633 .expect("open failed");
1634 for i in 0..100 {
1635 let mut transaction = fs
1636 .root_store()
1637 .new_transaction(
1638 lock_keys![LockKey::object(
1639 store.store_object_id(),
1640 store.root_directory_object_id()
1641 )],
1642 Options::default(),
1643 )
1644 .await
1645 .expect("new_transaction failed");
1646 root_directory
1647 .create_child_file(&mut transaction, &format!("{prefix} {i}"))
1648 .await
1649 .expect("create_child_file failed");
1650 transaction.commit().await.expect("commit failed");
1651 }
1652 }
1653
1654 create_files(&store, "A").await;
1656
1657 let mut transaction = fs
1660 .root_store()
1661 .new_transaction(
1662 lock_keys![LockKey::object(
1663 store.store_object_id(),
1664 store.root_directory_object_id()
1665 )],
1666 Options::default(),
1667 )
1668 .await
1669 .expect("new_transaction failed");
1670 let object = root_directory
1671 .create_child_file(&mut transaction, "test")
1672 .await
1673 .expect("create_child_file failed");
1674 transaction.commit().await.expect("commit failed");
1675
1676 let mut transaction =
1677 object.new_transaction().await.expect("new_transaction failed");
1678 let mut buffer = object.allocate_buffer(4096).await;
1679 buffer.as_mut_slice().fill(0xed);
1680 object
1681 .txn_write(&mut transaction, 0, buffer.as_ref())
1682 .await
1683 .expect("txn_write failed");
1684 transaction.commit().await.expect("commit failed");
1685
1686 create_files(&store, "B").await;
1688
1689 fs.sync(SyncOptions::default()).await.expect("sync failed");
1692
1693 fasync::Timer::new(Duration::from_millis(10)).await;
1699
1700 (
1701 store.store_object_id(),
1702 fs.device().snapshot().expect("snapshot failed"),
1703 object.object_id(),
1704 )
1705 };
1706
1707 device
1710 .discard_random_since_last_flush()
1711 .expect("discard_random_since_last_flush failed");
1712
1713 let fs = FxFilesystem::open(device).await.expect("open failed");
1714 fsck(fs.clone()).await.expect("fsck failed");
1715
1716 let mut check_test_file = false;
1717
1718 let object_id = if fs.object_manager().store(store_id).is_some() {
1721 fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1722 .await
1723 .expect("fsck_volume failed");
1724
1725 let store = root_volume(fs.clone())
1729 .await
1730 .expect("root_volume failed")
1731 .volume(
1732 "test",
1733 StoreOptions {
1734 crypt: Some(Arc::new(new_insecure_crypt())),
1735 ..StoreOptions::default()
1736 },
1737 )
1738 .await
1739 .expect("volume failed");
1740
1741 let root_directory = Directory::open(&store, store.root_directory_object_id())
1742 .await
1743 .expect("open failed");
1744
1745 let mut transaction = fs
1746 .root_store()
1747 .new_transaction(
1748 lock_keys![LockKey::object(
1749 store.store_object_id(),
1750 store.root_directory_object_id()
1751 )],
1752 Options::default(),
1753 )
1754 .await
1755 .expect("new_transaction failed");
1756 let object = root_directory
1757 .create_child_file(&mut transaction, &format!("C"))
1758 .await
1759 .expect("create_child_file failed");
1760 transaction.commit().await.expect("commit failed");
1761
1762 if let Ok(test_file) = ObjectStore::open_object(
1764 &store,
1765 test_file_object_id,
1766 HandleOptions::default(),
1767 None,
1768 )
1769 .await
1770 {
1771 let mut buffer = test_file.allocate_buffer(4096).await;
1773 let bytes = test_file.read(0, buffer.as_mut()).await.expect("read failed");
1774 if bytes == 4096 {
1775 let expected = [0xed; 4096];
1776 assert_eq!(buffer.as_slice(), &expected);
1777 } else {
1778 assert_eq!(bytes, 0);
1780 }
1781
1782 let mut transaction =
1784 test_file.new_transaction().await.expect("new_transaction failed");
1785 buffer.as_mut_slice().fill(0x37);
1786 test_file
1787 .txn_write(&mut transaction, 0, buffer.as_ref())
1788 .await
1789 .expect("txn_write failed");
1790 transaction.commit().await.expect("commit failed");
1791 check_test_file = true;
1792 }
1793
1794 object.object_id()
1795 } else {
1796 INVALID_OBJECT_ID
1797 };
1798
1799 fs.close().await.expect("close failed");
1801 let device = fs.take_device().await;
1802 device.reopen(false);
1803
1804 let fs = FxFilesystem::open(device).await.expect("open failed");
1805 fsck(fs.clone()).await.expect("fsck failed");
1806
1807 if object_id != INVALID_OBJECT_ID {
1810 fsck_volume(&fs, store_id, Some(Arc::new(new_insecure_crypt())))
1811 .await
1812 .expect("fsck_volume failed");
1813
1814 let store = root_volume(fs.clone())
1815 .await
1816 .expect("root_volume failed")
1817 .volume(
1818 "test",
1819 StoreOptions {
1820 crypt: Some(Arc::new(new_insecure_crypt())),
1821 ..StoreOptions::default()
1822 },
1823 )
1824 .await
1825 .expect("volume failed");
1826 ObjectStore::open_object(&store, object_id, HandleOptions::default(), None)
1828 .await
1829 .expect("open_object failed");
1830
1831 if check_test_file {
1833 info!("Checking test file for modification");
1834 let test_file = ObjectStore::open_object(
1835 &store,
1836 test_file_object_id,
1837 HandleOptions::default(),
1838 None,
1839 )
1840 .await
1841 .expect("open_object failed");
1842 let mut buffer = test_file.allocate_buffer(4096).await;
1843 assert_eq!(
1844 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
1845 4096
1846 );
1847 let expected = [0x37; 4096];
1848 assert_eq!(buffer.as_slice(), &expected);
1849 }
1850 }
1851
1852 fs.close().await.expect("close failed");
1853 }
1854 }
1855
1856 #[fuchsia::test]
1857 async fn test_barrier_not_emitted_when_transaction_has_no_data() {
1858 let barrier_count = Arc::new(AtomicU32::new(0));
1859
1860 struct Observer(Arc<AtomicU32>);
1861
1862 impl fake_device::Observer for Observer {
1863 fn barrier(&self) {
1864 self.0.fetch_add(1, Ordering::Relaxed);
1865 }
1866 }
1867
1868 let mut fake_device = FakeDevice::new(8192, 4096);
1869 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1870 let device = DeviceHolder::new(fake_device);
1871 let fs = FxFilesystemBuilder::new()
1872 .barriers_enabled(true)
1873 .format(true)
1874 .open(device)
1875 .await
1876 .expect("new filesystem failed");
1877
1878 {
1879 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1880 root_vol
1881 .new_volume(
1882 "test",
1883 NewChildStoreOptions {
1884 options: StoreOptions {
1885 crypt: Some(Arc::new(new_insecure_crypt())),
1886 ..StoreOptions::default()
1887 },
1888 ..NewChildStoreOptions::default()
1889 },
1890 )
1891 .await
1892 .expect("there is no test volume");
1893 fs.close().await.expect("close failed");
1894 }
1895 let device = fs.take_device().await;
1898 device.reopen(false);
1899 let fs = FxFilesystemBuilder::new()
1900 .barriers_enabled(true)
1901 .open(device)
1902 .await
1903 .expect("new filesystem failed");
1904 let expected_barrier_count = barrier_count.load(Ordering::Relaxed);
1905
1906 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1907 let store = root_vol
1908 .volume(
1909 "test",
1910 StoreOptions {
1911 crypt: Some(Arc::new(new_insecure_crypt())),
1912 ..StoreOptions::default()
1913 },
1914 )
1915 .await
1916 .expect("there is no test volume");
1917
1918 let fs = store.filesystem();
1920 let root_directory =
1921 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
1922 for i in 0..100 {
1923 let mut transaction = fs
1924 .root_store()
1925 .new_transaction(
1926 lock_keys![LockKey::object(
1927 store.store_object_id(),
1928 store.root_directory_object_id()
1929 )],
1930 Options::default(),
1931 )
1932 .await
1933 .expect("new_transaction failed");
1934 root_directory
1935 .create_child_file(&mut transaction, &format!("A {i}"))
1936 .await
1937 .expect("create_child_file failed");
1938 transaction.commit().await.expect("commit failed");
1939 }
1940
1941 fs.close().await.expect("close failed");
1943 assert_eq!(expected_barrier_count, barrier_count.load(Ordering::Relaxed));
1945 }
1946
1947 #[fuchsia::test]
1948 async fn test_barrier_emitted_when_transaction_includes_data() {
1949 let barrier_count = Arc::new(AtomicU32::new(0));
1950
1951 struct Observer(Arc<AtomicU32>);
1952
1953 impl fake_device::Observer for Observer {
1954 fn barrier(&self) {
1955 self.0.fetch_add(1, Ordering::Relaxed);
1956 }
1957 }
1958
1959 let mut fake_device = FakeDevice::new(8192, 4096);
1960 fake_device.set_observer(Box::new(Observer(barrier_count.clone())));
1961 let device = DeviceHolder::new(fake_device);
1962 let fs = FxFilesystemBuilder::new()
1963 .barriers_enabled(true)
1964 .format(true)
1965 .open(device)
1966 .await
1967 .expect("new filesystem failed");
1968
1969 {
1970 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1971 root_vol
1972 .new_volume(
1973 "test",
1974 NewChildStoreOptions {
1975 options: StoreOptions {
1976 crypt: Some(Arc::new(new_insecure_crypt())),
1977 ..StoreOptions::default()
1978 },
1979 ..NewChildStoreOptions::default()
1980 },
1981 )
1982 .await
1983 .expect("there is no test volume");
1984 fs.close().await.expect("close failed");
1985 }
1986 let device = fs.take_device().await;
1989 device.reopen(false);
1990 let fs = FxFilesystemBuilder::new()
1991 .barriers_enabled(true)
1992 .open(device)
1993 .await
1994 .expect("new filesystem failed");
1995 let expected_barrier_count = barrier_count.load(Ordering::Relaxed);
1996
1997 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
1998 let store = root_vol
1999 .volume(
2000 "test",
2001 StoreOptions {
2002 crypt: Some(Arc::new(new_insecure_crypt())),
2003 ..StoreOptions::default()
2004 },
2005 )
2006 .await
2007 .expect("there is no test volume");
2008
2009 let fs: Arc<FxFilesystem> = store.filesystem();
2011 let root_directory =
2012 Directory::open(&store, store.root_directory_object_id()).await.expect("open failed");
2013
2014 let mut transaction = fs
2015 .root_store()
2016 .new_transaction(
2017 lock_keys![LockKey::object(
2018 store.store_object_id(),
2019 store.root_directory_object_id()
2020 )],
2021 Options::default(),
2022 )
2023 .await
2024 .expect("new_transaction failed");
2025 let object = root_directory
2026 .create_child_file(&mut transaction, "test")
2027 .await
2028 .expect("create_child_file failed");
2029 transaction.commit().await.expect("commit failed");
2030
2031 let mut transaction = object.new_transaction().await.expect("new_transaction failed");
2032 let mut buffer = object.allocate_buffer(4096).await;
2033 buffer.as_mut_slice().fill(0xed);
2034 object.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2035 transaction.commit().await.expect("commit failed");
2036
2037 fs.close().await.expect("close failed");
2039 assert!(expected_barrier_count < barrier_count.load(Ordering::Relaxed));
2041 }
2042
2043 #[test_case(true; "fail when original filesystem has barriers enabled")]
2044 #[test_case(false; "fail when original filesystem has barriers disabled")]
2045 #[fuchsia::test]
2046 async fn test_switching_barrier_mode_on_existing_filesystem(original_barrier_mode: bool) {
2047 let crypt = Some(Arc::new(new_insecure_crypt()) as Arc<dyn fxfs_crypto::Crypt>);
2048 let fake_device = FakeDevice::new(8192, 4096);
2049 let device = DeviceHolder::new(fake_device);
2050 let fs: super::OpenFxFilesystem = FxFilesystemBuilder::new()
2051 .barriers_enabled(original_barrier_mode)
2052 .format(true)
2053 .open(device)
2054 .await
2055 .expect("new filesystem failed");
2056
2057 {
2059 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2060 let store = root_vol
2061 .new_volume(
2062 "test",
2063 NewChildStoreOptions {
2064 options: StoreOptions { crypt: crypt.clone(), ..Default::default() },
2065 ..Default::default()
2066 },
2067 )
2068 .await
2069 .expect("creating test volume");
2070 let root_dir = Directory::open(&store, store.root_directory_object_id())
2071 .await
2072 .expect("open failed");
2073 let mut transaction = fs
2074 .root_store()
2075 .new_transaction(
2076 lock_keys![LockKey::object(
2077 store.store_object_id(),
2078 store.root_directory_object_id()
2079 )],
2080 Default::default(),
2081 )
2082 .await
2083 .expect("new_transaction failed");
2084 let object = root_dir
2085 .create_child_file(&mut transaction, "file")
2086 .await
2087 .expect("create_child_file failed");
2088 transaction.commit().await.expect("commit failed");
2089 let mut buffer = object.allocate_buffer(4096).await;
2090 buffer.as_mut_slice().fill(0xA7);
2091 let new_size = object.write_or_append(None, buffer.as_ref()).await.unwrap();
2092 assert_eq!(new_size, 4096);
2093 }
2094
2095 fs.close().await.expect("close failed");
2097 let device = fs.take_device().await;
2098 device.reopen(false);
2099 let fs = FxFilesystemBuilder::new()
2100 .barriers_enabled(!original_barrier_mode)
2101 .open(device)
2102 .await
2103 .expect("new filesystem failed");
2104 {
2105 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2106 let store = root_vol
2107 .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
2108 .await
2109 .expect("opening test volume");
2110 let root_dir = Directory::open(&store, store.root_directory_object_id())
2111 .await
2112 .expect("open failed");
2113 let (object_id, _, _) =
2114 root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
2115 let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
2116 .await
2117 .expect("open failed");
2118 let mut buffer = test_file.allocate_buffer(4096).await;
2120 buffer.as_mut_slice().fill(0xA8);
2121 let new_size = test_file.write_or_append(None, buffer.as_ref()).await.unwrap();
2122 assert_eq!(new_size, 8192);
2123 }
2124
2125 fs.close().await.expect("close failed");
2128 let device = fs.take_device().await;
2129 device.reopen(false);
2130 let fs = FxFilesystemBuilder::new()
2131 .barriers_enabled(original_barrier_mode)
2132 .open(device)
2133 .await
2134 .expect("new filesystem failed");
2135 {
2136 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2137 let store = root_vol
2138 .volume("test", StoreOptions { crypt: crypt.clone(), ..Default::default() })
2139 .await
2140 .expect("opening test volume");
2141 let root_dir = Directory::open(&store, store.root_directory_object_id())
2142 .await
2143 .expect("open failed");
2144 let (object_id, _, _) =
2145 root_dir.lookup("file").await.expect("lookup failed").expect("missing file");
2146 let test_file = ObjectStore::open_object(&store, object_id, Default::default(), None)
2147 .await
2148 .expect("open failed");
2149 let mut buffer = test_file.allocate_buffer(8192).await;
2150 assert_eq!(
2151 test_file.read(0, buffer.as_mut()).await.expect("read failed"),
2152 8192,
2153 "short read"
2154 );
2155 assert_eq!(buffer.as_slice()[0..4096], [0xA7; 4096]);
2156 assert_eq!(buffer.as_slice()[4096..8192], [0xA8; 4096]);
2157 }
2158 fs.close().await.expect("close failed");
2159 }
2160
2161 #[fuchsia::test]
2162 async fn test_image_builder_mode_no_early_writes() {
2163 const BLOCK_SIZE: u32 = 4096;
2164 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2165 device.reopen(true);
2166 let fs = FxFilesystemBuilder::new()
2167 .format(true)
2168 .image_builder_mode(Some(SuperBlockInstance::A))
2169 .open(device)
2170 .await
2171 .expect("open failed");
2172 fs.enable_allocations();
2173 fs.device().reopen(false);
2175 fs.close().await.expect("closed");
2176 }
2177
2178 #[fuchsia::test]
2179 async fn test_image_builder_mode() {
2180 const BLOCK_SIZE: u32 = 4096;
2181 const EXISTING_FILE_RANGE: Range<u64> = 4096 * 1024..4096 * 1025;
2182 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2183
2184 {
2186 let mut write_buf =
2187 device.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2188 write_buf.as_mut_slice().fill(0xf0);
2189 device.write(EXISTING_FILE_RANGE.start, write_buf.as_ref()).await.expect("write");
2190 }
2191
2192 device.reopen(true);
2193
2194 let device = {
2195 let fs = FxFilesystemBuilder::new()
2196 .format(true)
2197 .image_builder_mode(Some(SuperBlockInstance::B))
2198 .open(device)
2199 .await
2200 .expect("open failed");
2201 fs.enable_allocations();
2202 {
2203 let root_store = fs.root_store();
2204 let root_directory =
2205 Directory::open(&root_store, root_store.root_directory_object_id())
2206 .await
2207 .expect("open failed");
2208 let handle;
2210 {
2211 let mut transaction = fs
2212 .root_store()
2213 .new_transaction(
2214 lock_keys![LockKey::object(
2215 root_directory.store().store_object_id(),
2216 root_directory.object_id()
2217 )],
2218 Options::default(),
2219 )
2220 .await
2221 .expect("new transaction");
2222 handle = root_directory
2223 .create_child_file(&mut transaction, "test")
2224 .await
2225 .expect("create file");
2226 handle.extend(&mut transaction, EXISTING_FILE_RANGE).await.expect("extend");
2227 transaction.commit().await.expect("commit");
2228 }
2229 }
2230 fs.device().reopen(false);
2231 fs.close().await.expect("close");
2232 fs.take_device().await
2233 };
2234 device.reopen(false);
2235 let fs = FxFilesystem::open(device).await.expect("open failed");
2236 fsck(fs.clone()).await.expect("fsck failed");
2237
2238 let root_store = fs.root_store();
2240 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2241 .await
2242 .expect("open failed");
2243 let (object_id, descriptor, _) =
2244 root_directory.lookup("test").await.expect("lookup failed").unwrap();
2245 assert_eq!(descriptor, ObjectDescriptor::File);
2246 let test_file =
2247 ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
2248 .await
2249 .expect("open failed");
2250 let mut read_buf =
2251 test_file.allocate_buffer(EXISTING_FILE_RANGE.length().unwrap() as usize).await;
2252 test_file.read(0, read_buf.as_mut()).await.expect("read failed");
2253 assert_eq!(read_buf.as_slice(), [0xf0; 4096]);
2254 fs.close().await.expect("closed");
2255 }
2256
2257 #[fuchsia::test]
2258 async fn test_read_only_mount_on_full_filesystem() {
2259 let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
2260 let fs =
2261 FxFilesystemBuilder::new().format(true).open(device).await.expect("new_empty failed");
2262 let root_store = fs.root_store();
2263 let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
2264 .await
2265 .expect("open failed");
2266
2267 let mut transaction = fs
2268 .root_store()
2269 .new_transaction(
2270 lock_keys![LockKey::object(
2271 root_store.store_object_id(),
2272 root_directory.object_id()
2273 )],
2274 Options::default(),
2275 )
2276 .await
2277 .expect("new_transaction failed");
2278 let handle = root_directory
2279 .create_child_file(&mut transaction, "test")
2280 .await
2281 .expect("create_child_file failed");
2282 transaction.commit().await.expect("commit failed");
2283
2284 let mut buf = handle.allocate_buffer(4096).await;
2285 buf.as_mut_slice().fill(0xaa);
2286 loop {
2287 if handle.write_or_append(None, buf.as_ref()).await.is_err() {
2288 break;
2289 }
2290 }
2291
2292 let max_offset = fs.allocator().maximum_offset();
2293 fs.close().await.expect("Close failed");
2294
2295 let device = fs.take_device().await;
2296 device.reopen(false);
2297 let mut buffer = device
2298 .allocate_buffer(
2299 crate::round::round_up(max_offset, TEST_DEVICE_BLOCK_SIZE).unwrap() as usize
2300 )
2301 .await;
2302 device.read(0, buffer.as_mut()).await.expect("read failed");
2303
2304 let device = DeviceHolder::new(
2305 FakeDevice::from_image(&buffer.as_slice()[..], TEST_DEVICE_BLOCK_SIZE)
2306 .expect("from_image failed"),
2307 );
2308 let fs =
2309 FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
2310 fs.close().await.expect("Close failed");
2311 }
2312
2313 #[test_case(SuperBlockInstance::A; "Superblock instance A")]
2314 #[test_case(SuperBlockInstance::B; "Superblock instance B")]
2315 #[fuchsia::test]
2316 async fn test_image_builder_mode_flush_on_close_sb_a(target_sb: SuperBlockInstance) {
2317 const BLOCK_SIZE: u32 = 4096;
2318 let device = DeviceHolder::new(FakeDevice::new(2048, BLOCK_SIZE));
2319
2320 device.reopen(true);
2322 let fs = FxFilesystemBuilder::new()
2323 .format(true)
2324 .image_builder_mode(Some(target_sb))
2325 .open(device)
2326 .await
2327 .expect("open failed");
2328
2329 fs.enable_allocations();
2330
2331 fs.device().reopen(false);
2333
2334 {
2336 let root_store = fs.root_store();
2337 let root_directory =
2338 Directory::open(&root_store, root_store.root_directory_object_id())
2339 .await
2340 .expect("open failed");
2341
2342 let mut transaction = fs
2343 .root_store()
2344 .new_transaction(
2345 lock_keys![LockKey::object(
2346 root_directory.store().store_object_id(),
2347 root_directory.object_id()
2348 )],
2349 Options::default(),
2350 )
2351 .await
2352 .expect("new transaction");
2353 let handle = root_directory
2354 .create_child_file(&mut transaction, "post_finalize_file")
2355 .await
2356 .expect("create file");
2357 transaction.commit().await.expect("commit");
2358
2359 let mut buf = handle.allocate_buffer(BLOCK_SIZE as usize).await;
2360 buf.as_mut_slice().fill(0xaa);
2361 handle.write_or_append(None, buf.as_ref()).await.expect("write failed");
2362 }
2363
2364 fs.close().await.expect("close failed");
2366
2367 let other_sb = target_sb.next();
2368
2369 let device = fs.take_device().await;
2371 device.reopen(true); let mut buf = device.allocate_buffer(BLOCK_SIZE as usize).await;
2373
2374 device.read(target_sb.first_extent().start, buf.as_mut()).await.expect("read target_sb");
2375 assert_eq!(&buf.as_slice()[..8], b"FxfsSupr", "target_sb should have magic bytes");
2376
2377 buf.as_mut_slice().fill(0); device.read(other_sb.first_extent().start, buf.as_mut()).await.expect("read other_sb");
2379 assert_eq!(buf.as_slice(), &[0; 4096], "other_sb should be zeroed");
2381 }
2382
// Drives the filesystem with a mock power manager and a device wrapper that counts trims.
// The test checks three things, all under fake time:
//   * no trim is issued while the mock reports "on battery";
//   * at least one trim is issued once the mock reports "not on battery", and every trim
//     happens while the wake lease handed out by the mock is still held;
//   * no further trim is issued after the mock switches back to "on battery".
#[cfg(target_os = "fuchsia")]
#[fuchsia::test(allow_stalls = false)]
async fn test_trim_with_power_manager() {
    use anyhow::Error;
    use async_trait::async_trait;
    use fuchsia_async::TestExecutor;
    use futures::StreamExt;

    // Moves fake time forward by `delta`, then polls the executor until nothing else can make
    // progress (the pending future never completes, so the poll result is discarded).
    async fn advance_and_settle(delta: Duration) {
        TestExecutor::advance_to(fasync::MonotonicInstant::after(delta.into())).await;
        let _ = TestExecutor::poll_until_stalled(std::future::pending::<()>()).await;
    }

    TestExecutor::advance_to(fasync::MonotonicInstant::ZERO).await;

    // Power manager double: the battery state is set by the test, and the most recently
    // issued wake lease is remembered so the test can tell whether it is still held.
    #[derive(Default)]
    struct MockPowerManager {
        on_battery: Mutex<bool>,
        event: event_listener::Event,
        wake_lease: Mutex<Option<zx::EventPair>>,
    }

    impl MockPowerManager {
        // Records the new battery state and wakes every listener on `event`.
        fn set_on_battery(&self, v: bool) {
            *self.on_battery.lock() = v;
            self.event.notify(usize::MAX);
        }

        // Returns true when a lease has been issued and its peer end has not been closed.
        // The wait uses a deadline in the infinite past, so it only samples the signal; an
        // error result means PEER_CLOSED was not observed.
        fn is_lease_held(&self) -> bool {
            match self.wake_lease.lock().as_ref() {
                Some(retained_end) => retained_end
                    .wait_one(
                        zx::Signals::EVENTPAIR_PEER_CLOSED,
                        zx::MonotonicInstant::INFINITE_PAST,
                    )
                    .is_err(),
                None => false,
            }
        }
    }

    impl super::PowerManager for MockPowerManager {
        // Yields the current battery state immediately, and afterwards once per notification
        // on `event`. When not on battery, a fresh event pair is created: one end is yielded
        // as the wake lease and the other end is kept in `wake_lease`.
        fn watch_battery(
            self: Arc<Self>,
        ) -> futures::stream::BoxStream<'static, (bool, super::WakeLease)> {
            futures::stream::unfold(true, move |is_first| {
                let mock = self.clone();
                async move {
                    if !is_first {
                        mock.event.listen().await;
                    }
                    let battery_now = *mock.on_battery.lock();
                    let lease = if !battery_now {
                        let (issued_end, retained_end) = zx::EventPair::create();
                        *mock.wake_lease.lock() = Some(retained_end);
                        issued_end.into_handle()
                    } else {
                        zx::NullableHandle::invalid()
                    };
                    Some(((battery_now, lease), false))
                }
            })
            .boxed()
        }
    }

    // Device wrapper that forwards everything to `inner`, but always advertises trim support,
    // counts trim calls, and asserts that the wake lease is held whenever a trim arrives.
    struct TrimTrackingDevice {
        inner: DeviceHolder,
        trim_count: Arc<AtomicU32>,
        power_manager: Arc<MockPowerManager>,
    }

    #[async_trait]
    impl storage_device::Device for TrimTrackingDevice {
        fn allocate_buffer(&self, size: usize) -> storage_device::buffer::BufferFuture<'_> {
            self.inner.allocate_buffer(size)
        }

        fn block_size(&self) -> u32 {
            self.inner.block_size()
        }

        fn block_count(&self) -> u64 {
            self.inner.block_count()
        }

        async fn read_with_opts(
            &self,
            offset: u64,
            buffer: storage_device::buffer::MutableBufferRef<'_>,
            opts: storage_device::ReadOptions,
        ) -> Result<(), Error> {
            self.inner.read_with_opts(offset, buffer, opts).await
        }

        async fn write_with_opts(
            &self,
            offset: u64,
            buffer: storage_device::buffer::BufferRef<'_>,
            opts: storage_device::WriteOptions,
        ) -> Result<(), Error> {
            self.inner.write_with_opts(offset, buffer, opts).await
        }

        async fn trim(&self, range: std::ops::Range<u64>) -> Result<(), Error> {
            // A trim without a live wake lease is a test failure.
            assert!(self.power_manager.is_lease_held());
            self.trim_count.fetch_add(1, Ordering::SeqCst);
            self.inner.trim(range).await
        }

        async fn flush(&self) -> Result<(), Error> {
            self.inner.flush().await
        }

        async fn close(&self) -> Result<(), Error> {
            self.inner.close().await
        }

        fn barrier(&self) {
            self.inner.barrier()
        }

        fn supports_trim(&self) -> bool {
            true
        }

        fn is_read_only(&self) -> bool {
            self.inner.is_read_only()
        }

        fn snapshot(&self) -> Result<DeviceHolder, Error> {
            // The snapshot shares the same counter and power manager as this wrapper.
            let inner = self.inner.snapshot()?;
            Ok(DeviceHolder::new(TrimTrackingDevice {
                inner,
                trim_count: Arc::clone(&self.trim_count),
                power_manager: Arc::clone(&self.power_manager),
            }))
        }

        fn reopen(&self, read_only: bool) {
            self.inner.reopen(read_only)
        }
    }

    let trim_count = Arc::new(AtomicU32::new(0));

    // Start out on battery.
    let pm = Arc::new(MockPowerManager::default());
    pm.set_on_battery(true);

    let device = DeviceHolder::new(TrimTrackingDevice {
        inner: DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE)),
        trim_count: trim_count.clone(),
        power_manager: pm.clone(),
    });

    // NOTE(review): the trim_config tuple is presumably (initial delay, interval) and
    // trim_charger_wait the time to stay off battery before trimming -- confirm against the
    // builder.
    let fs = FxFilesystemBuilder::new()
        .format(true)
        .power_manager(pm.clone())
        .trim_config(Some((Duration::ZERO, Duration::from_millis(100))))
        .trim_charger_wait(Duration::from_millis(10))
        .open(device)
        .await
        .expect("open failed");

    // On battery: half a second of fake time must not produce any trim.
    advance_and_settle(Duration::from_millis(500)).await;
    assert_eq!(trim_count.load(Ordering::SeqCst), 0);

    // Create a file, allocate 4096 bytes for it, then unlink and tombstone it.
    // NOTE(review): presumably this leaves freed space for a later trim to find -- confirm.
    {
        let root_store = fs.root_store();
        let root_dir = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .expect("open failed");
        let store_id = root_store.store_object_id();
        let dir_id = root_dir.object_id();

        let mut create_txn = fs
            .root_store()
            .new_transaction(lock_keys![LockKey::object(store_id, dir_id)], Options::default())
            .await
            .expect("new_transaction failed");
        let file = root_dir
            .create_child_file(&mut create_txn, "test")
            .await
            .expect("create_child_file failed");
        create_txn.commit().await.expect("commit failed");

        file.allocate(0..4096).await.expect("allocate failed");
        let file_id = file.object_id();

        let mut unlink_txn = fs
            .root_store()
            .new_transaction(
                lock_keys![
                    LockKey::object(store_id, dir_id),
                    LockKey::object(store_id, file_id),
                ],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        replace_child(&mut unlink_txn, None, (&root_dir, "test")).await.expect("delete failed");
        unlink_txn.commit().await.expect("commit failed");

        fs.root_store()
            .tombstone_object(file_id, Options::default())
            .await
            .expect("tombstone failed");
    }

    // Off battery: after 10ms of fake time at least one trim must have been issued.
    pm.set_on_battery(false);
    advance_and_settle(Duration::from_millis(10)).await;
    assert!(trim_count.load(Ordering::SeqCst) > 0);

    // Back on battery: reset the counter and verify that no further trim is issued.
    trim_count.store(0, Ordering::SeqCst);
    pm.set_on_battery(true);
    advance_and_settle(Duration::from_millis(500)).await;
    assert_eq!(trim_count.load(Ordering::SeqCst), 0);

    fs.close().await.expect("close failed");
}
2614}