1pub mod merge;
90pub mod strategy;
91
92use crate::drop_event::DropEvent;
93use crate::errors::FxfsError;
94use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
95use crate::log::*;
96use crate::lsm_tree::cache::NullCache;
97use crate::lsm_tree::skip_list_layer::SkipListLayer;
98use crate::lsm_tree::types::{
99 FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
100 OrdUpperBound, SortByU64, Value,
101};
102use crate::lsm_tree::{LSMTree, Query, compact_with_iterator, layers_from_handles};
103use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
104use crate::object_store::object_manager::ReservationUpdate;
105use crate::object_store::transaction::{
106 AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
107};
108use crate::object_store::{
109 DataObjectHandle, DirectWriter, ExtentKey, HandleOptions, ObjectStore, ReservedId, tree,
110};
111use crate::range::RangeExt;
112use crate::round::{round_div, round_down, round_up};
113use crate::serialized_types::{
114 DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
115};
116use anyhow::{Context, Error, anyhow, bail, ensure};
117use async_trait::async_trait;
118use either::Either::{Left, Right};
119use event_listener::EventListener;
120use fprint::TypeFingerprint;
121use fuchsia_inspect::HistogramProperty;
122use fuchsia_sync::Mutex;
123use futures::FutureExt;
124use fxfs_macros::SerializeKey;
125use merge::{filter_marked_for_deletion, filter_tombstones, merge};
126use serde::{Deserialize, Serialize};
127use std::borrow::Borrow;
128use std::collections::{BTreeMap, HashSet, VecDeque};
129use std::hash::Hash;
130use std::marker::PhantomData;
131use std::num::{NonZero, Saturating};
132use std::ops::Range;
133use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
134use std::sync::{Arc, Weak};
135
/// An object that byte reservations are drawn from and returned to.
pub trait ReservationOwner: Send + Sync {
    /// Returns `amount` bytes to this owner. `owner_object_id` identifies the object the bytes
    /// were attributed to (`None` if unattributed).
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
}
143
/// A reservation of bytes drawn from `owner`.
///
/// Sub-reservations can be lent out of it (see `reserve` / `reserve_with`); while they are alive
/// their bytes are counted in `ReservationInner::reserved`. On drop, whatever is still held is
/// handed back to `owner` via `release_reservation`.
pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
    // Where bytes are returned to.
    owner: T,
    // The object the reserved bytes are attributed to; `None` means unattributed.
    owner_object_id: Option<u64>,
    inner: Mutex<ReservationInner>,
    // Only present so that `U` is used by the type.
    phantom: PhantomData<U>,
}
157
#[derive(Debug, Default)]
struct ReservationInner {
    // Total bytes held by the reservation, including any lent out to sub-reservations.
    amount: u64,

    // Portion of `amount` currently lent out to sub-reservations; expected to stay <= `amount`.
    reserved: u64,
}
166
167impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 self.inner.lock().fmt(f)
170 }
171}
172
173impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
174 pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
175 Self {
176 owner,
177 owner_object_id,
178 inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
179 phantom: PhantomData,
180 }
181 }
182
183 pub fn owner_object_id(&self) -> Option<u64> {
184 self.owner_object_id
185 }
186
187 pub fn amount(&self) -> u64 {
189 self.inner.lock().amount
190 }
191
192 pub fn add(&self, amount: u64) {
194 self.inner.lock().amount += amount;
195 }
196
197 pub fn forget(&self) -> u64 {
201 let mut inner = self.inner.lock();
202 assert_eq!(inner.reserved, 0);
203 std::mem::take(&mut inner.amount)
204 }
205
206 pub fn forget_some(&self, amount: u64) {
210 let mut inner = self.inner.lock();
211 inner.amount -= amount;
212 assert!(inner.reserved <= inner.amount);
213 }
214
215 fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
218 let mut inner = self.inner.lock();
219 let taken = amount(inner.amount - inner.reserved);
220 inner.reserved += taken;
221 ReservationImpl::new(self, self.owner_object_id, taken)
222 }
223
224 pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
226 let mut inner = self.inner.lock();
227 if inner.amount - inner.reserved < amount {
228 None
229 } else {
230 inner.reserved += amount;
231 Some(ReservationImpl::new(self, self.owner_object_id, amount))
232 }
233 }
234
235 pub fn commit(&self, amount: u64) {
238 let mut inner = self.inner.lock();
239 inner.reserved -= amount;
240 inner.amount -= amount;
241 }
242
243 pub fn give_back(&self, amount: u64) {
245 self.owner.borrow().release_reservation(self.owner_object_id, amount);
246 let mut inner = self.inner.lock();
247 inner.amount -= amount;
248 assert!(inner.reserved <= inner.amount);
249 }
250
251 pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
253 &self,
254 other: &ReservationImpl<V, W>,
255 amount: u64,
256 ) {
257 assert_eq!(self.owner_object_id, other.owner_object_id());
258 let mut inner = self.inner.lock();
259 if let Some(amount) = inner.amount.checked_sub(amount) {
260 inner.amount = amount;
261 } else {
262 std::mem::drop(inner);
263 panic!("Insufficient reservation space");
264 }
265 other.add(amount);
266 }
267}
268
269impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
270 fn drop(&mut self) {
271 let inner = self.inner.get_mut();
272 assert_eq!(inner.reserved, 0);
273 let owner_object_id = self.owner_object_id;
274 if inner.amount > 0 {
275 self.owner
276 .borrow()
277 .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
278 }
279 }
280}
281
282impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
283 for ReservationImpl<T, U>
284{
285 fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
286 assert_eq!(owner_object_id, self.owner_object_id);
288 let mut inner = self.inner.lock();
289 assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
290 inner.reserved -= amount;
291 }
292}
293
/// A top-level reservation whose bytes are returned to a type-erased owner.
pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;

/// A sub-reservation lent out of (and returned to) a [`Reservation`].
pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;
297
/// Key type for allocator records; an alias for the latest versioned layout.
pub type AllocatorKey = AllocatorKeyV32;

#[derive(
    Clone,
    Debug,
    Deserialize,
    Eq,
    Hash,
    PartialEq,
    Serialize,
    TypeFingerprint,
    SerializeKey,
    Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct AllocatorKeyV32 {
    // The device byte range the record covers (half-open: `start..end`).
    pub device_range: ExtentKey,
}
318
319impl SortByU64 for AllocatorKey {
320 fn get_leading_u64(&self) -> u64 {
321 self.device_range.end
322 }
323}
324
/// Width in bytes (1 MiB) of the aligned buckets used when fuzzy-hashing allocator keys.
const EXTENT_HASH_BUCKET_SIZE: u64 = 1 << 20;
326
/// Walks a device range in `EXTENT_HASH_BUCKET_SIZE` steps, yielding one hash per step.
pub struct AllocatorKeyPartitionIterator {
    // The part of the range not yet visited; `start` advances one bucket per `next` call.
    device_range: Range<u64>,
}
330
331impl Iterator for AllocatorKeyPartitionIterator {
332 type Item = u64;
333
334 fn next(&mut self) -> Option<Self::Item> {
335 if self.device_range.start >= self.device_range.end {
336 None
337 } else {
338 let start = self.device_range.start;
339 self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
340 let end = std::cmp::min(self.device_range.start, self.device_range.end);
341 let key = AllocatorKey { device_range: ExtentKey::new(start..end) };
342 let hash = crate::stable_hash::stable_hash(key);
343 Some(hash)
344 }
345 }
346}
347
348impl FuzzyHash for AllocatorKey {
349 fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
350 AllocatorKeyPartitionIterator {
351 device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
352 ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
353 }
354 }
355
356 fn is_range_key(&self) -> bool {
357 true
358 }
359}
360
361impl AllocatorKey {
362 pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
364 AllocatorKey { device_range: ExtentKey::new(0..self.device_range.start) }
365 }
366}
367
368impl LayerKey for AllocatorKey {
369 fn merge_type(&self) -> MergeType {
370 MergeType::OptimizedMerge
371 }
372
373 fn next_key(&self) -> Option<Self> {
374 Some(Self { device_range: ExtentKey::new(0..self.device_range.end + 1) })
375 }
376
377 fn search_key(&self) -> Option<Self> {
378 Some(Self { device_range: ExtentKey::new(0..self.device_range.start + 1) })
379 }
380
381 fn is_search_key(&self) -> bool {
382 self.device_range.start == 0
383 }
384
385 fn overlaps(&self, other: &Self) -> bool {
386 self.device_range.start < other.device_range.end
387 && self.device_range.end > other.device_range.start
388 }
389}
390
391impl OrdUpperBound for AllocatorKey {
392 fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
393 self.device_range
394 .end
395 .cmp(&other.device_range.end)
396 .then(self.device_range.start.cmp(&other.device_range.start))
397 }
398}
399
400impl OrdLowerBound for AllocatorKey {
401 fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
402 self.device_range
407 .start
408 .cmp(&other.device_range.start)
409 .then(self.device_range.end.cmp(&other.device_range.end))
410 }
411}
412
413impl Ord for AllocatorKey {
414 fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
415 self.device_range
416 .start
417 .cmp(&other.device_range.start)
418 .then(self.device_range.end.cmp(&other.device_range.end))
419 }
420}
421
422impl PartialOrd for AllocatorKey {
423 fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
424 Some(self.cmp(other))
425 }
426}
427
/// Value type for allocator records; an alias for the latest versioned layout.
pub type AllocatorValue = AllocatorValueV32;
impl Value for AllocatorValue {
    // `None` is the value that marks a record as deleted.
    const DELETED_MARKER: Self = Self::None;
}
434
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorValueV32 {
    // No allocation; also used as the deletion marker (`Value::DELETED_MARKER`).
    None,
    // The range is allocated and attributed to `owner_object_id`.
    // NOTE(review): `count` looks like a reference count (trim records use 1) — confirm.
    Abs { count: u64, owner_object_id: u64 },
}
445
/// A single allocator record: a device-range key with its allocation value.
pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;
447
/// Allocator metadata stored in the allocator's own object; an alias for the latest layout.
pub type AllocatorInfo = AllocatorInfoV32;

#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
pub struct AllocatorInfoV32 {
    // Object IDs of the LSM tree layer files; `Allocator::open` opens them in this order.
    pub layers: Vec<u64>,
    // Allocated byte count per owner object ID.
    pub allocated_bytes: BTreeMap<u64, u64>,
    // Owner object IDs whose records are filtered out when the committed set is requested.
    pub marked_for_deletion: HashSet<u64>,
    // Per-owner byte limits; owners without an entry are treated as unlimited.
    pub limit_bytes: BTreeMap<u64, u64>,
}
464
/// Upper bound (128 KiB) on the serialized `AllocatorInfo` read back by `Allocator::open`.
const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 128 * 1024;
466
467pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
469 block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
474}
475
/// Flush statistics reported through inspect.
#[derive(Default)]
struct AllocatorCounters {
    // Number of flushes recorded so far.
    num_flushes: u64,
    // Wall-clock time of the most recent flush, if any.
    last_flush_time: Option<std::time::SystemTime>,
}
481
/// The filesystem's device-space allocator.
pub struct Allocator {
    filesystem: Weak<FxFilesystem>,
    // Allocation granularity in bytes; `allocate` asserts lengths are multiples of it.
    block_size: u64,
    // Usable device size in bytes, rounded down to a multiple of `block_size`.
    device_size: u64,
    // Object ID of the allocator's own object in the root store.
    object_id: u64,
    // Upper bound on the length handed out by a single `allocate` call.
    max_extent_size_bytes: u64,
    // Persistent allocation records.
    tree: LSMTree<AllocatorKey, AllocatorValue>,
    // In-memory allocation records (e.g. extents taken for trimming). Scans for free space layer
    // these on top of `tree`.
    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    inner: Mutex<Inner>,
    // Serializes allocation against free-space scans (trimming, strategy rebuilds).
    allocation_mutex: futures::lock::Mutex<()>,
    counters: Mutex<AllocatorCounters>,
    // NOTE(review): presumably the highest device offset handed out — updated outside this view.
    maximum_offset: AtomicU64,
    // False in image-builder mode until `enable_allocations` is called.
    allocations_allowed: AtomicBool,
}
502
/// Per-owner byte accounting kept by the allocator.
#[derive(Debug, Default, PartialEq)]
struct ByteTracking {
    /// Bytes recorded as allocated to the owner (seeded from `AllocatorInfo::allocated_bytes`).
    allocated_bytes: Saturating<u64>,

    /// Bytes allocated to the owner that are not yet committed.
    uncommitted_allocated_bytes: u64,

    /// Bytes set aside for the owner by reservations but not yet allocated.
    reserved_bytes: u64,

    /// Bytes deallocated by committed transactions that stay unavailable until a sync.
    committed_deallocated_bytes: u64,
}

impl ByteTracking {
    /// Bytes counted against the owner's limit: allocated, uncommitted and reserved.
    ///
    /// Every term saturates at `u64::MAX`. Previously the uncommitted and reserved counts were
    /// added as plain `u64` first, which could overflow.
    fn used_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.reserved_bytes)
    }

    /// Bytes that cannot be handed out right now: allocated, uncommitted, and deallocated but not
    /// yet reusable. Reserved bytes are not included.
    fn unavailable_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.committed_deallocated_bytes)
    }

    /// Bytes that will remain unavailable after a sync (committed deallocations are excluded).
    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
    }
}
550
/// A committed deallocation.
// NOTE(review): presumably queued until the range can safely be reused — the code that
// consumes `Inner::committed_deallocated` is outside this view; confirm.
#[derive(Debug)]
struct CommittedDeallocation {
    // NOTE(review): presumably the journal offset of the committing transaction — confirm.
    log_file_offset: u64,
    // The device range that was deallocated.
    range: Range<u64>,
    // The owner the range was attributed to.
    owner_object_id: u64,
}
560
/// Mutable allocator state, guarded by `Allocator::inner`.
struct Inner {
    // Persisted allocator metadata, loaded by `Allocator::open`.
    info: AllocatorInfo,

    // Set exactly once, by `Allocator::create` or `Allocator::on_replay_complete`; both assert it
    // was previously unset.
    opened: bool,

    // NOTE(review): presumably ranges of dropped temporary allocations awaiting processing —
    // usage is outside this view; confirm.
    dropped_temporary_allocations: Vec<Range<u64>>,

    // Byte accounting per owner object ID.
    owner_bytes: BTreeMap<u64, ByteTracking>,

    // Reserved bytes that are not attributed to any owner.
    unattributed_reserved_bytes: u64,

    // NOTE(review): presumably committed deallocations in commit order, awaiting release —
    // usage is outside this view; confirm.
    committed_deallocated: VecDeque<CommittedDeallocation>,

    // Bytes taken by the outstanding trim batch; set by `Allocator::take_for_trimming` and
    // reset to zero when the `TrimmableExtents` is dropped.
    trim_reserved_bytes: u64,

    // Listener for the outstanding trim batch's drop event, stored by `take_for_trimming`.
    trim_listener: Option<EventListener>,

    // Tracks free device ranges available for allocation.
    strategy: strategy::BestFit,

    // Allocation counts bucketed by length in blocks; lengths of 63+ blocks share the last bucket.
    allocation_size_histogram: [u64; 64],
    // NOTE(review): histogram of strategy-rebuild triggers, updated outside this view.
    rebuild_strategy_trigger_histogram: [u64; 64],

    // Working set of owners whose records are filtered out of scans; reset from
    // `info.marked_for_deletion` in `on_replay_complete`.
    marked_for_deletion: HashSet<u64>,

    // Owners whose deletion needs a sync before they move into `marked_for_deletion`
    // (handled in `Allocator::allocate`).
    volumes_deleted_pending_sync: HashSet<u64>,
}
633
634impl Inner {
635 fn allocated_bytes(&self) -> Saturating<u64> {
636 let mut total = Saturating(0);
637 for (_, bytes) in &self.owner_bytes {
638 total += bytes.allocated_bytes;
639 }
640 total
641 }
642
643 fn uncommitted_allocated_bytes(&self) -> u64 {
644 self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
645 }
646
647 fn reserved_bytes(&self) -> u64 {
648 self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
649 + self.unattributed_reserved_bytes
650 }
651
652 fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
653 match self.info.limit_bytes.get(&owner_object_id) {
654 Some(v) => *v,
655 None => u64::MAX,
656 }
657 }
658
659 fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
660 let limit = self.owner_id_limit_bytes(owner_object_id);
661 let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
662 (Saturating(limit) - used).0
663 }
664
665 fn unavailable_bytes(&self) -> Saturating<u64> {
670 let mut total = Saturating(0);
671 for (_, bytes) in &self.owner_bytes {
672 total += bytes.unavailable_bytes();
673 }
674 total
675 }
676
677 fn used_bytes(&self) -> Saturating<u64> {
680 let mut total = Saturating(0);
681 for (_, bytes) in &self.owner_bytes {
682 total += bytes.used_bytes();
683 }
684 total + Saturating(self.unattributed_reserved_bytes)
685 }
686
687 fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
690 let mut total = Saturating(0);
691 for (_, bytes) in &self.owner_bytes {
692 total += bytes.unavailable_after_sync_bytes();
693 }
694 total
695 }
696
697 fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
700 device_size
701 .checked_sub(
702 (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
703 )
704 .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
705 }
706
707 fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
708 match owner_object_id {
709 Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
710 None => self.unattributed_reserved_bytes += amount,
711 };
712 }
713
714 fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
715 match owner_object_id {
716 Some(owner) => {
717 let owner_entry = self.owner_bytes.entry(owner).or_default();
718 assert!(
719 owner_entry.reserved_bytes >= amount,
720 "{} >= {}",
721 owner_entry.reserved_bytes,
722 amount
723 );
724 owner_entry.reserved_bytes -= amount;
725 }
726 None => {
727 assert!(
728 self.unattributed_reserved_bytes >= amount,
729 "{} >= {}",
730 self.unattributed_reserved_bytes,
731 amount
732 );
733 self.unattributed_reserved_bytes -= amount
734 }
735 };
736 }
737}
738
/// A batch of free extents taken out of the allocation strategy so they can be trimmed.
///
/// While the batch exists, its extents are recorded in the allocator's temporary allocations.
/// Dropping it returns them to the strategy and erases those records.
pub struct TrimmableExtents<'a> {
    allocator: &'a Allocator,
    extents: Vec<Range<u64>>,
    // Paired with the listener returned by `new`.
    // NOTE(review): presumably fires when this batch is dropped — confirm `DropEvent` semantics.
    _drop_event: DropEvent,
}
749
750impl<'a> TrimmableExtents<'a> {
751 pub fn extents(&self) -> &Vec<Range<u64>> {
752 &self.extents
753 }
754
755 fn new(allocator: &'a Allocator) -> (Self, EventListener) {
757 let drop_event = DropEvent::new();
758 let listener = drop_event.listen();
759 (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
760 }
761
762 fn add_extent(&mut self, extent: Range<u64>) {
763 self.extents.push(extent);
764 }
765}
766
767impl<'a> Drop for TrimmableExtents<'a> {
768 fn drop(&mut self) {
769 let mut inner = self.allocator.inner.lock();
770 for device_range in std::mem::take(&mut self.extents) {
771 inner.strategy.free(device_range.clone()).expect("drop trim extent");
772 self.allocator
773 .temporary_allocations
774 .erase(&AllocatorKey { device_range: ExtentKey::new(device_range.clone()) });
775 }
776 inner.trim_reserved_bytes = 0;
777 }
778}
779
780impl Allocator {
781 pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
782 let block_size = filesystem.block_size();
783 let device_size = round_down(filesystem.device().size(), block_size);
785 if device_size != filesystem.device().size() {
786 warn!("Device size is not block aligned. Rounding down.");
787 }
788 let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
789 let mut strategy = strategy::BestFit::default();
790 strategy.free(0..device_size).expect("new fs");
791 Allocator {
792 filesystem: Arc::downgrade(&filesystem),
793 block_size,
794 device_size,
795 object_id,
796 max_extent_size_bytes,
797 tree: LSMTree::new(merge, Box::new(NullCache {})),
798 temporary_allocations: SkipListLayer::new(1024),
799 inner: Mutex::new(Inner {
800 info: AllocatorInfo::default(),
801 opened: false,
802 dropped_temporary_allocations: Vec::new(),
803 owner_bytes: BTreeMap::new(),
804 unattributed_reserved_bytes: 0,
805 committed_deallocated: VecDeque::new(),
806 trim_reserved_bytes: 0,
807 trim_listener: None,
808 strategy,
809 allocation_size_histogram: [0; 64],
810 rebuild_strategy_trigger_histogram: [0; 64],
811 marked_for_deletion: HashSet::new(),
812 volumes_deleted_pending_sync: HashSet::new(),
813 }),
814 allocation_mutex: futures::lock::Mutex::new(()),
815 counters: Mutex::new(AllocatorCounters::default()),
816 maximum_offset: AtomicU64::new(0),
817 allocations_allowed: AtomicBool::new(filesystem.options().image_builder_mode.is_none()),
818 }
819 }
820
821 pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
822 &self.tree
823 }
824
825 pub fn enable_allocations(&self) {
828 self.allocations_allowed.store(true, Ordering::SeqCst);
829 }
830
831 pub async fn filter(
836 &self,
837 iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
838 committed_marked_for_deletion: bool,
839 ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
840 let marked_for_deletion = {
841 let inner = self.inner.lock();
842 if committed_marked_for_deletion {
843 &inner.info.marked_for_deletion
844 } else {
845 &inner.marked_for_deletion
846 }
847 .clone()
848 };
849 let iter =
850 filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
851 Ok(iter)
852 }
853
854 pub fn allocation_size_histogram(&self) -> [u64; 64] {
858 self.inner.lock().allocation_size_histogram
859 }
860
861 pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
863 assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
866
867 let filesystem = self.filesystem.upgrade().unwrap();
868 let root_store = filesystem.root_store();
869 ObjectStore::create_object_with_id(
870 &root_store,
871 transaction,
872 ReservedId::new(&root_store, NonZero::new(self.object_id()).unwrap()),
873 HandleOptions::default(),
874 None,
875 )?;
876 root_store.update_last_object_id(self.object_id());
877 Ok(())
878 }
879
880 pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
883 let filesystem = self.filesystem.upgrade().unwrap();
884 let root_store = filesystem.root_store();
885
886 self.inner.lock().strategy = strategy::BestFit::default();
887
888 let handle =
889 ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
890 .await
891 .context("Failed to open allocator object")?;
892
893 if handle.get_size() > 0 {
894 let serialized_info = handle
895 .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
896 .await
897 .context("Failed to read AllocatorInfo")?;
898 let mut cursor = std::io::Cursor::new(serialized_info);
899 let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
900 .context("Failed to deserialize AllocatorInfo")?;
901
902 let mut handles = Vec::new();
903 let mut total_size = 0;
904 for object_id in &info.layers {
905 let handle = ObjectStore::open_object(
906 &root_store,
907 *object_id,
908 HandleOptions::default(),
909 None,
910 )
911 .await
912 .context("Failed to open allocator layer file")?;
913
914 let size = handle.get_size();
915 total_size += size;
916 handles.push(handle);
917 }
918
919 {
920 let mut inner = self.inner.lock();
921
922 let mut device_bytes = self.device_size;
924 for (&owner_object_id, &bytes) in &info.allocated_bytes {
925 ensure!(
926 bytes <= device_bytes,
927 anyhow!(FxfsError::Inconsistent).context(format!(
928 "Allocated bytes exceeds device size: {:?}",
929 info.allocated_bytes
930 ))
931 );
932 device_bytes -= bytes;
933
934 inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
935 Saturating(bytes);
936 }
937
938 inner.info = info;
939 }
940
941 self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
942 self.filesystem.upgrade().unwrap().object_manager().update_reservation(
943 self.object_id,
944 tree::reservation_amount_from_layer_size(total_size),
945 );
946 }
947
948 Ok(())
949 }
950
951 pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
952 {
954 let mut inner = self.inner.lock();
955 inner.volumes_deleted_pending_sync.clear();
956 inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
957 }
958
959 if !self.rebuild_strategy().await.context("Build free extents")? {
963 if self.filesystem.upgrade().unwrap().options().read_only {
964 info!("Device contains no free space (read-only mode).");
965 } else {
966 info!("Device contains no free space.");
967 return Err(FxfsError::Inconsistent)
968 .context("Device appears to contain no free space");
969 }
970 }
971
972 assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
973 Ok(())
974 }
975
976 async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
981 let mut changed = false;
982 let mut layer_set = self.tree.empty_layer_set();
983 layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
984 self.tree.add_all_layers_to_layer_set(&mut layer_set);
985
986 let overflow_markers = self.inner.lock().strategy.overflow_markers();
987 self.inner.lock().strategy.reset_overflow_markers();
988
989 let mut to_add = Vec::new();
990 let mut merger = layer_set.merger();
991 let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
992 let mut last_offset = 0;
993 while last_offset < self.device_size {
994 let next_range = match iter.get() {
995 None => {
996 assert!(last_offset <= self.device_size);
997 let range = last_offset..self.device_size;
998 last_offset = self.device_size;
999 iter.advance().await?;
1000 range
1001 }
1002 Some(ItemRef { key, .. }) => {
1003 let device_range = &key.device_range;
1004 if device_range.end < last_offset {
1005 iter.advance().await?;
1006 continue;
1007 }
1008 if device_range.start <= last_offset {
1009 last_offset = device_range.end;
1010 iter.advance().await?;
1011 continue;
1012 }
1013 let range = last_offset..device_range.start;
1014 last_offset = device_range.end;
1015 iter.advance().await?;
1016 range
1017 }
1018 };
1019 to_add.push(next_range);
1020 if to_add.len() > 100 {
1022 let mut inner = self.inner.lock();
1023 for range in to_add.drain(..) {
1024 changed |= inner.strategy.force_free(range)?;
1025 }
1026 }
1027 }
1028 let mut inner = self.inner.lock();
1029 for range in to_add {
1030 changed |= inner.strategy.force_free(range)?;
1031 }
1032 if overflow_markers != inner.strategy.overflow_markers() {
1033 changed = true;
1034 }
1035 Ok(changed)
1036 }
1037
1038 pub async fn take_for_trimming(
1042 &self,
1043 offset: u64,
1044 max_extent_size: usize,
1045 extents_per_batch: usize,
1046 ) -> Result<TrimmableExtents<'_>, Error> {
1047 let _guard = self.allocation_mutex.lock().await;
1048
1049 let (mut result, listener) = TrimmableExtents::new(self);
1050 let mut bytes = 0;
1051
1052 let mut layer_set = self.tree.empty_layer_set();
1055 layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
1056 self.tree.add_all_layers_to_layer_set(&mut layer_set);
1057 let mut merger = layer_set.merger();
1058 let mut iter = self
1059 .filter(
1060 merger
1061 .query(Query::FullRange(&AllocatorKey {
1062 device_range: ExtentKey::new(0..offset + 1),
1063 }))
1064 .await?,
1065 false,
1066 )
1067 .await?;
1068 let mut last_offset = offset;
1069 'outer: while last_offset < self.device_size {
1070 let mut range = match iter.get() {
1071 None => {
1072 assert!(last_offset <= self.device_size);
1073 let range = last_offset..self.device_size;
1074 last_offset = self.device_size;
1075 iter.advance().await?;
1076 range
1077 }
1078 Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
1079 if device_range.end <= last_offset {
1080 iter.advance().await?;
1081 continue;
1082 }
1083 if device_range.start <= last_offset {
1084 last_offset = device_range.end;
1085 iter.advance().await?;
1086 continue;
1087 }
1088 let range = last_offset..device_range.start;
1089 last_offset = device_range.end;
1090 iter.advance().await?;
1091 range
1092 }
1093 };
1094 if range.start < offset {
1095 continue;
1096 }
1097
1098 let mut inner = self.inner.lock();
1101
1102 while range.start < range.end {
1103 let prefix =
1104 range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
1105 range = prefix.end..range.end;
1106 bytes += prefix.length()?;
1107 inner.strategy.remove(prefix.clone());
1110 self.temporary_allocations.insert(AllocatorItem {
1111 key: AllocatorKey { device_range: ExtentKey::new(prefix.clone()) },
1112 value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
1113 })?;
1114 result.add_extent(prefix);
1115 if result.extents.len() == extents_per_batch {
1116 break 'outer;
1117 }
1118 }
1119 if result.extents.len() == extents_per_batch {
1120 break 'outer;
1121 }
1122 }
1123 {
1124 let mut inner = self.inner.lock();
1125
1126 assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
1127 inner.trim_listener = Some(listener);
1128 inner.trim_reserved_bytes = bytes;
1129 debug_assert!(
1130 (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
1131 <= self.device_size
1132 );
1133 }
1134 Ok(result)
1135 }
1136
1137 pub fn parent_objects(&self) -> Vec<u64> {
1139 self.inner.lock().info.layers.clone()
1142 }
1143
1144 pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
1146 self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
1147 }
1148
1149 pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
1151 let inner = self.inner.lock();
1152 (
1153 inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
1154 inner.info.limit_bytes.get(&owner_object_id).copied(),
1155 )
1156 }
1157
1158 pub fn owner_bytes_debug(&self) -> String {
1160 format!("{:?}", self.inner.lock().owner_bytes)
1161 }
1162
1163 fn needs_sync(&self) -> bool {
1164 let inner = self.inner.lock();
1169 inner.unavailable_bytes().0 >= self.device_size
1170 }
1171
1172 fn is_system_store(&self, owner_object_id: u64) -> bool {
1173 let fs = self.filesystem.upgrade().unwrap();
1174 owner_object_id == fs.object_manager().root_store_object_id()
1175 || owner_object_id == fs.object_manager().root_parent_store_object_id()
1176 }
1177
1178 pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
1181 if old_owner_object_id.is_none() || amount == 0 {
1182 return;
1183 }
1184 let mut inner = self.inner.lock();
1186 inner.remove_reservation(old_owner_object_id, amount);
1187 inner.add_reservation(None, amount);
1188 }
1189
1190 pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1193 let this = Arc::downgrade(self);
1194 parent.record_lazy_child(name, move || {
1195 let this_clone = this.clone();
1196 async move {
1197 let inspector = fuchsia_inspect::Inspector::default();
1198 if let Some(this) = this_clone.upgrade() {
1199 let counters = this.counters.lock();
1200 let root = inspector.root();
1201 root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
1202 root.record_uint("bytes_total", this.device_size);
1203 let (allocated, reserved, used, unavailable) = {
1204 let inner = this.inner.lock();
1206 (
1207 inner.allocated_bytes().0,
1208 inner.reserved_bytes(),
1209 inner.used_bytes().0,
1210 inner.unavailable_bytes().0,
1211 )
1212 };
1213 root.record_uint("bytes_allocated", allocated);
1214 root.record_uint("bytes_reserved", reserved);
1215 root.record_uint("bytes_used", used);
1216 root.record_uint("bytes_unavailable", unavailable);
1217
1218 if let Some(x) = round_div(100 * allocated, this.device_size) {
1221 root.record_uint("bytes_allocated_percent", x);
1222 }
1223 if let Some(x) = round_div(100 * reserved, this.device_size) {
1224 root.record_uint("bytes_reserved_percent", x);
1225 }
1226 if let Some(x) = round_div(100 * used, this.device_size) {
1227 root.record_uint("bytes_used_percent", x);
1228 }
1229 if let Some(x) = round_div(100 * unavailable, this.device_size) {
1230 root.record_uint("bytes_unavailable_percent", x);
1231 }
1232
1233 root.record_uint("num_flushes", counters.num_flushes);
1234 if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
1235 root.record_uint(
1236 "last_flush_time_ms",
1237 last_flush_time
1238 .duration_since(std::time::UNIX_EPOCH)
1239 .unwrap_or(std::time::Duration::ZERO)
1240 .as_millis()
1241 .try_into()
1242 .unwrap_or(0u64),
1243 );
1244 }
1245
1246 let data = this.allocation_size_histogram();
1247 let alloc_sizes = root.create_uint_linear_histogram(
1248 "allocation_size_histogram",
1249 fuchsia_inspect::LinearHistogramParams {
1250 floor: 1,
1251 step_size: 1,
1252 buckets: 64,
1253 },
1254 );
1255 for (i, count) in data.iter().enumerate() {
1256 if i != 0 {
1257 alloc_sizes.insert_multiple(i as u64, *count as usize);
1258 }
1259 }
1260 root.record(alloc_sizes);
1261
1262 let data = this.inner.lock().rebuild_strategy_trigger_histogram;
1263 let triggers = root.create_uint_linear_histogram(
1264 "rebuild_strategy_triggers",
1265 fuchsia_inspect::LinearHistogramParams {
1266 floor: 1,
1267 step_size: 1,
1268 buckets: 64,
1269 },
1270 );
1271 for (i, count) in data.iter().enumerate() {
1272 if i != 0 {
1273 triggers.insert_multiple(i as u64, *count as usize);
1274 }
1275 }
1276 root.record(triggers);
1277
1278 let allocator_ref = this.clone();
1279 root.record_child("lsm_tree", move |node| {
1280 allocator_ref.tree.record_inspect_data(node);
1281 });
1282 }
1283 Ok(inspector)
1284 }
1285 .boxed()
1286 });
1287 }
1288
1289 pub fn maximum_offset(&self) -> u64 {
1294 self.maximum_offset.load(Ordering::Relaxed)
1295 }
1296}
1297
1298impl Drop for Allocator {
1299 fn drop(&mut self) {
1300 let inner = self.inner.lock();
1301 assert_eq!(inner.uncommitted_allocated_bytes(), 0);
1303 assert_eq!(inner.reserved_bytes(), 0);
1304 }
1305}
1306
1307#[fxfs_trace::trace]
1308impl Allocator {
    /// Object ID of the allocator's own object in the root store.
    pub fn object_id(&self) -> u64 {
        self.object_id
    }
1313
1314 pub fn info(&self) -> AllocatorInfo {
1317 self.inner.lock().info.clone()
1318 }
1319
    #[trace]
    /// Allocates device space for `owner_object_id` and records an `Allocate` mutation for it in
    /// `transaction`.
    ///
    /// `len` must be a multiple of the block size.  The request is clamped to
    /// `max_extent_size_bytes` and to whatever can be reserved (from the transaction's
    /// reservation, or from the device/owner limits), so the returned range may be shorter than
    /// `len`.
    ///
    /// # Errors
    /// * `FxfsError::Unavailable` if allocations are currently disallowed.
    /// * `FxfsError::NoSpace` if nothing could be reserved, or if repeated syncs did not free
    ///   enough space.
    /// * `FxfsError::Inconsistent` if accounting does not add up or no free extent can be found
    ///   after rebuilding the strategy.
    /// * Errors propagated from syncing, rebuilding the strategy, or the strategy itself.
    ///
    /// # Panics
    /// If `len` is not block aligned, if the reservation's owner does not match
    /// `owner_object_id`, or if the allocator has not been opened.
    pub async fn allocate(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        mut len: u64,
    ) -> Result<Range<u64>, Error> {
        ensure!(self.allocations_allowed.load(Ordering::SeqCst), FxfsError::Unavailable);
        assert_eq!(len % self.block_size, 0);
        len = std::cmp::min(len, self.max_extent_size_bytes);
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);

        // Reserve the space up front: `Left` borrows from the transaction's reservation, `Right`
        // is a reservation made directly against the owner/device limits.
        let reservation = if let Some(reservation) = transaction.allocator_reservation {
            // An owner-less reservation may only be used by system stores; otherwise the
            // reservation must belong to the same owner as the allocation.
            match reservation.owner_object_id {
                None => assert!(self.is_system_store(owner_object_id)),
                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
            };
            // Take as much of `len` as the reservation can cover, in whole blocks.
            let r = reservation
                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
            len = r.amount();
            Left(r)
        } else {
            let mut inner = self.inner.lock();
            assert!(inner.opened);
            let device_used = inner.used_bytes();
            // Stay within both the owner's remaining limit and the device's unused space.
            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let limit =
                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
            len = round_down(std::cmp::min(len, limit), self.block_size);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.reserved_bytes += len;
            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
        };

        ensure!(len > 0, FxfsError::NoSpace);

        // Snapshot the volumes whose deletion has been applied but not yet synced.
        let volumes_deleted = {
            let inner = self.inner.lock();
            (!inner.volumes_deleted_pending_sync.is_empty())
                .then(|| inner.volumes_deleted_pending_sync.clone())
        };

        if let Some(volumes_deleted) = volumes_deleted {
            // Sync (flushing the device) so those deletions become durable.  The precondition
            // skips the sync if the pending set has been emptied in the meantime.
            self.filesystem
                .upgrade()
                .unwrap()
                .sync(SyncOptions {
                    flush_device: true,
                    precondition: Some(Box::new(|| {
                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
                    })),
                    ..Default::default()
                })
                .await?;

            // Move the now-synced volumes from "pending sync" to "marked for deletion".
            {
                let mut inner = self.inner.lock();
                for owner_id in volumes_deleted {
                    inner.volumes_deleted_pending_sync.remove(&owner_id);
                    inner.marked_for_deletion.insert(owner_id);
                }
            }

            // Rebuild the free-space strategy under the allocation mutex so the change in
            // `marked_for_deletion` is reflected in it.
            let _guard = self.allocation_mutex.lock().await;
            self.rebuild_strategy().await?;
        }

        // The outer `loop` never repeats: it lets the `for` `break 'sync` with the mutex guard,
        // and falling out of the `for` (10 syncs without success) bails with NoSpace.
        #[allow(clippy::never_loop)]
        let _guard = 'sync: loop {
            for _ in 0..10 {
                {
                    let guard = self.allocation_mutex.lock().await;

                    if !self.needs_sync() {
                        break 'sync guard;
                    }
                }

                // The allocation mutex is released (the inner scope ended) before syncing.
                // `needs_sync` is re-evaluated as the sync precondition.
                self.filesystem
                    .upgrade()
                    .unwrap()
                    .sync(SyncOptions {
                        flush_device: true,
                        precondition: Some(Box::new(|| self.needs_sync())),
                        ..Default::default()
                    })
                    .await?;
            }
            bail!(
                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
            );
        };

        let mut trim_listener = None;
        {
            let mut inner = self.inner.lock();
            // Histogram of requested sizes, in blocks, capped at bucket 63.
            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;

            // If space currently being trimmed is what would make this allocation come up short,
            // take the trim listener so we can wait for trimming to finish.
            let avail = self
                .device_size
                .checked_sub(inner.unavailable_bytes().0)
                .ok_or(FxfsError::Inconsistent)?;
            let free_and_not_being_trimmed =
                inner.bytes_available_not_being_trimmed(self.device_size)?;
            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
                debug_assert!(inner.trim_reserved_bytes > 0);
                trim_listener = std::mem::take(&mut inner.trim_listener);
            }
        }

        if let Some(listener) = trim_listener {
            listener.await;
        }

        let result = loop {
            {
                let mut inner = self.inner.lock();

                // Apply the `temporary_allocations` removals that `apply_mutation` deferred into
                // `dropped_temporary_allocations`; the allocation mutex is held here (`_guard`).
                for device_range in inner.dropped_temporary_allocations.drain(..) {
                    self.temporary_allocations
                        .erase(&AllocatorKey { device_range: ExtentKey::new(device_range) });
                }

                match inner.strategy.allocate(len) {
                    Err(FxfsError::NotFound) => {
                        // The in-memory strategy has no suitable extent; record it and rebuild.
                        inner.rebuild_strategy_trigger_histogram
                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
                    }
                    Err(err) => {
                        error!(err:%; "Likely filesystem corruption.");
                        return Err(err.into());
                    }
                    Ok(x) => {
                        break x;
                    }
                }
            }
            // Rebuild the strategy from the tree and retry.  If the rebuild reports nothing new,
            // the space we reserved cannot be found, which indicates inconsistency.
            if !self.rebuild_strategy().await? {
                error!("Cannot find additional free space. Corruption?");
                return Err(FxfsError::Inconsistent.into());
            }
        };

        debug!(device_range:? = result; "allocate");

        // The strategy may return less than `len`; use the actual length from here on.
        let len = result.length().unwrap();
        let reservation_owner = reservation.either(
            |l| {
                l.forget_some(len);
                l.owner_object_id()
            },
            |r| {
                r.forget_some(len);
                r.owner_object_id()
            },
        );

        {
            let mut inner = self.inner.lock();
            // Move the bytes from "reserved" to "uncommitted allocated" for this owner.
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.uncommitted_allocated_bytes += len;
            // If the reservation has an owner it must be the allocation's owner.
            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
            inner.remove_reservation(reservation_owner, len);
            // Track the range as in use until the mutation is applied or dropped.
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: ExtentKey::new(result.clone()) },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
            })?;
        }

        let mutation =
            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());

        Ok(result)
    }
1531
1532 #[trace]
1535 pub fn mark_allocated(
1536 &self,
1537 transaction: &mut Transaction<'_>,
1538 owner_object_id: u64,
1539 device_range: Range<u64>,
1540 ) -> Result<(), Error> {
1541 debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1542 {
1543 let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;
1544
1545 let mut inner = self.inner.lock();
1546 let device_used = inner.used_bytes();
1547 let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1548 let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1549 ensure!(
1550 device_range.end <= self.device_size
1551 && (Saturating(self.device_size) - device_used).0 >= len
1552 && owner_id_bytes_left >= len,
1553 FxfsError::NoSpace
1554 );
1555 if let Some(reservation) = &mut transaction.allocator_reservation {
1556 reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
1558 }
1559 owner_entry.uncommitted_allocated_bytes += len;
1560 inner.strategy.remove(device_range.clone());
1561 self.temporary_allocations.insert(AllocatorItem {
1562 key: AllocatorKey { device_range: ExtentKey::new(device_range.clone()) },
1563 value: AllocatorValue::Abs { owner_object_id, count: 1 },
1564 })?;
1565 }
1566 let mutation =
1567 AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
1568 transaction.add(self.object_id(), Mutation::Allocator(mutation));
1569 Ok(())
1570 }
1571
1572 pub fn set_bytes_limit(
1574 &self,
1575 transaction: &mut Transaction<'_>,
1576 owner_object_id: u64,
1577 bytes: u64,
1578 ) -> Result<(), Error> {
1579 assert!(!self.is_system_store(owner_object_id));
1581 transaction.add(
1582 self.object_id(),
1583 Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
1584 );
1585 Ok(())
1586 }
1587
1588 pub fn get_owner_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
1590 self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
1591 }
1592
1593 pub fn get_owner_bytes_used(&self, owner_object_id: u64) -> u64 {
1596 self.inner.lock().owner_bytes.get(&owner_object_id).map_or(0, |info| info.used_bytes().0)
1597 }
1598
1599 #[trace]
1601 pub async fn deallocate(
1602 &self,
1603 transaction: &mut Transaction<'_>,
1604 owner_object_id: u64,
1605 dealloc_range: Range<u64>,
1606 ) -> Result<u64, Error> {
1607 debug!(device_range:? = dealloc_range; "deallocate");
1608 ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
1609 let deallocated = dealloc_range.end - dealloc_range.start;
1613 let mutation = AllocatorMutation::Deallocate {
1614 device_range: dealloc_range.clone().into(),
1615 owner_object_id,
1616 };
1617 transaction.add(self.object_id(), Mutation::Allocator(mutation));
1618
1619 let _guard = self.allocation_mutex.lock().await;
1620
1621 let mut inner = self.inner.lock();
1633 for device_range in inner.dropped_temporary_allocations.drain(..) {
1634 self.temporary_allocations
1635 .erase(&AllocatorKey { device_range: ExtentKey::new(device_range) });
1636 }
1637
1638 self.temporary_allocations
1643 .insert(AllocatorItem {
1644 key: AllocatorKey { device_range: ExtentKey::new(dealloc_range.clone()) },
1645 value: AllocatorValue::Abs { owner_object_id, count: 1 },
1646 })
1647 .context("tracking deallocated")?;
1648
1649 Ok(deallocated)
1650 }
1651
1652 pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
1669 transaction.add(
1672 self.object_id(),
1673 Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
1674 );
1675 }
1676
1677 pub fn did_flush_device(&self, flush_log_offset: u64) {
1680 #[allow(clippy::never_loop)] let deallocs = 'deallocs_outer: loop {
1685 let mut inner = self.inner.lock();
1686 for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
1687 if dealloc.log_file_offset >= flush_log_offset {
1688 let mut deallocs = inner.committed_deallocated.split_off(index);
1689 std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
1691 break 'deallocs_outer deallocs;
1692 }
1693 }
1694 break std::mem::take(&mut inner.committed_deallocated);
1695 };
1696
1697 let mut inner = self.inner.lock();
1699 let mut totals = BTreeMap::<u64, u64>::new();
1700 for dealloc in deallocs {
1701 *(totals.entry(dealloc.owner_object_id).or_default()) +=
1702 dealloc.range.length().unwrap();
1703 inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
1704 self.temporary_allocations
1705 .erase(&AllocatorKey { device_range: ExtentKey::new(dealloc.range.clone()) });
1706 }
1707
1708 for (owner_object_id, total) in totals {
1712 match inner.owner_bytes.get_mut(&owner_object_id) {
1713 Some(counters) => counters.committed_deallocated_bytes -= total,
1714 None => {
1715 assert!(inner.volumes_deleted_pending_sync.contains(&owner_object_id));
1718 }
1719 }
1720 }
1721 }
1722
1723 pub fn reserve(
1726 self: Arc<Self>,
1727 owner_object_id: Option<u64>,
1728 amount: u64,
1729 ) -> Option<Reservation> {
1730 {
1731 let mut inner = self.inner.lock();
1732
1733 let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1734
1735 let limit = match owner_object_id {
1736 Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1737 None => device_free,
1738 };
1739 if limit < amount {
1740 return None;
1741 }
1742 inner.add_reservation(owner_object_id, amount);
1743 }
1744 Some(Reservation::new(self, owner_object_id, amount))
1745 }
1746
1747 pub fn reserve_with(
1750 self: Arc<Self>,
1751 owner_object_id: Option<u64>,
1752 amount: impl FnOnce(u64) -> u64,
1753 ) -> Reservation {
1754 let amount = {
1755 let mut inner = self.inner.lock();
1756
1757 let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1758
1759 let amount = amount(match owner_object_id {
1760 Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1761 None => device_free,
1762 });
1763
1764 inner.add_reservation(owner_object_id, amount);
1765
1766 amount
1767 };
1768
1769 Reservation::new(self, owner_object_id, amount)
1770 }
1771
1772 pub fn get_allocated_bytes(&self) -> u64 {
1774 self.inner.lock().allocated_bytes().0
1775 }
1776
1777 pub fn get_disk_bytes(&self) -> u64 {
1779 self.device_size
1780 }
1781
1782 pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
1786 self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
1787 }
1788
1789 pub fn get_used_bytes(&self) -> Saturating<u64> {
1791 let inner = self.inner.lock();
1792 inner.used_bytes()
1793 }
1794}
1795
1796impl ReservationOwner for Allocator {
1797 fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
1798 self.inner.lock().remove_reservation(owner_object_id, amount);
1799 }
1800}
1801
1802#[async_trait]
1803impl JournalingObject for Allocator {
    /// Applies an allocator-related mutation to the tree and to the in-memory counters.
    ///
    /// `ApplyMode::Live` carries the originating transaction.  The live-only bookkeeping below
    /// (uncommitted bytes, committed deallocations, reservation adjustments) is skipped in other
    /// modes; presumably those cover journal replay (TODO confirm).
    ///
    /// # Errors
    /// Any mutation this object does not handle is rejected with an error.
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        _assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                let mut inner = self.inner.lock();
                // Drop the owner's byte counters.
                inner.owner_bytes.remove(&owner_object_id);

                // Remember the owner both in the persisted info and as "pending sync".
                // `allocate` later syncs and moves pending owners into `marked_for_deletion`.
                inner.info.marked_for_deletion.insert(owner_object_id);
                inner.volumes_deleted_pending_sync.insert(owner_object_id);

                inner.info.limit_bytes.remove(&owner_object_id);
            }
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: ExtentKey::new(device_range.clone().into()) },
                    value: AllocatorValue::Abs { count: 1, owner_object_id },
                };
                let len = item.key.device_range.length().unwrap();
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
                let mut inner = self.inner.lock();
                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                entry.allocated_bytes += len;
                if let ApplyMode::Live(transaction) = context.mode {
                    // `allocate`/`mark_allocated` counted these bytes as uncommitted; they are
                    // committed now.
                    entry.uncommitted_allocated_bytes -= len;
                    // Removal from `temporary_allocations` is deferred: `allocate` and
                    // `deallocate` drain this list while holding `allocation_mutex`.
                    inner.dropped_temporary_allocations.push(device_range.into());
                    if let Some(reservation) = transaction.allocator_reservation {
                        reservation.commit(len);
                    }
                }
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                // A `None` value records the range as no longer allocated.
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: ExtentKey::new(device_range.into()) },
                    value: AllocatorValue::None,
                };
                let len = item.key.device_range.length().unwrap();

                {
                    let mut inner = self.inner.lock();
                    {
                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                        entry.allocated_bytes -= len;
                        if context.mode.is_live() {
                            entry.committed_deallocated_bytes += len;
                        }
                    }
                    if context.mode.is_live() {
                        // The freed space is queued, keyed by journal offset.
                        // `did_flush_device` returns it to the strategy once the device has been
                        // flushed past this offset.
                        inner.committed_deallocated.push_back(CommittedDeallocation {
                            log_file_offset: context.checkpoint.file_offset,
                            range: (*item.key.device_range).clone(),
                            owner_object_id,
                        });
                    }
                    // Credit the freed bytes back to the transaction's reservation, if any.
                    if let ApplyMode::Live(Transaction {
                        allocator_reservation: Some(reservation),
                        ..
                    }) = context.mode
                    {
                        inner.add_reservation(reservation.owner_object_id(), len);
                        reservation.add(len);
                    }
                }
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
            }
            Mutation::BeginFlush => {
                // Seal the mutable layer and snapshot per-owner allocated bytes into `info` for
                // the flush that is starting.
                self.tree.seal();
                let mut inner = self.inner.lock();
                let allocated_bytes =
                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
                inner.info.allocated_bytes = allocated_bytes;
            }
            Mutation::EndFlush => {}
            _ => bail!("unexpected mutation: {:?}", mutation),
        }
        Ok(())
    }
1907
1908 fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
1909 match mutation {
1910 Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1911 let len = device_range.length().unwrap();
1912 let mut inner = self.inner.lock();
1913 inner
1914 .owner_bytes
1915 .entry(owner_object_id)
1916 .or_default()
1917 .uncommitted_allocated_bytes -= len;
1918 if let Some(reservation) = transaction.allocator_reservation {
1919 let res_owner = reservation.owner_object_id();
1920 inner.add_reservation(res_owner, len);
1921 reservation.release_reservation(res_owner, len);
1922 }
1923 inner.strategy.free(device_range.clone().into()).expect("drop mutaton");
1924 self.temporary_allocations
1925 .erase(&AllocatorKey { device_range: ExtentKey::new(device_range.into()) });
1926 }
1927 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
1928 self.temporary_allocations
1929 .erase(&AllocatorKey { device_range: ExtentKey::new(device_range.into()) });
1930 }
1931 _ => {}
1932 }
1933 }
1934
1935 async fn flush(&self) -> Result<Version, Error> {
1936 let filesystem = self.filesystem.upgrade().unwrap();
1937 let object_manager = filesystem.object_manager();
1938 let earliest_version = self.tree.get_earliest_version();
1939 if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
1940 return Ok(earliest_version);
1942 }
1943
1944 let fs = self.filesystem.upgrade().unwrap();
1945 let mut flusher = Flusher::new(self, &fs).await;
1946 let (new_layer_file, info) = flusher.start().await?;
1947 flusher.finish(new_layer_file, info).await
1948 }
1949}
1950
/// Wraps an allocator-record iterator.  Records that touch (one's range ends where the next
/// begins) and carry equal values are merged into a single record spanning both ranges.
pub struct CoalescingIterator<I> {
    // The wrapped iterator, positioned at the first record not merged into `item`.
    iter: I,
    // The current (possibly merged) record; `None` once the wrapped iterator is exhausted.
    item: Option<AllocatorItem>,
}
1967
1968impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
1969 pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
1970 let mut iter = Self { iter, item: None };
1971 iter.advance().await?;
1972 Ok(iter)
1973 }
1974}
1975
1976#[async_trait]
1977impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
1978 for CoalescingIterator<I>
1979{
1980 async fn advance(&mut self) -> Result<(), Error> {
1981 self.item = self.iter.get().map(|x| x.cloned());
1982 if self.item.is_none() {
1983 return Ok(());
1984 }
1985 let left = self.item.as_mut().unwrap();
1986 loop {
1987 self.iter.advance().await?;
1988 match self.iter.get() {
1989 None => return Ok(()),
1990 Some(right) => {
1991 ensure!(
1993 left.key.device_range.end <= right.key.device_range.start,
1994 FxfsError::Inconsistent
1995 );
1996 if left.key.device_range.end < right.key.device_range.start
1998 || left.value != *right.value
1999 {
2000 return Ok(());
2001 }
2002 left.key.device_range.end = right.key.device_range.end;
2003 }
2004 }
2005 }
2006 }
2007
2008 fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
2009 self.item.as_ref().map(|x| x.as_item_ref())
2010 }
2011}
2012
/// Drives a flush of the allocator's LSM tree into a new layer file (`start`, then `finish`).
struct Flusher<'a> {
    allocator: &'a Allocator,
    fs: &'a Arc<FxFilesystem>,
    // Write lock on `LockKey::flush(allocator.object_id())`, held for the flusher's lifetime.
    _guard: WriteGuard<'a>,
}
2018
2019impl<'a> Flusher<'a> {
2020 async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
2021 let keys = lock_keys![LockKey::flush(allocator.object_id())];
2022 Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
2023 }
2024
2025 fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
2026 Options {
2027 skip_journal_checks: true,
2028 borrow_metadata_space: true,
2029 allocator_reservation: Some(allocator_reservation),
2030 ..Default::default()
2031 }
2032 }
2033
    /// First phase of a flush.
    ///
    /// Creates the object that will hold the new layer file and puts it in the root store's
    /// graveyard; `finish` removes it again.  Presumably this is so it is cleaned up if the flush
    /// never completes (TODO confirm).  `BeginFlush` is then committed, which seals the mutable
    /// layer when applied.  Returns the new layer handle and a copy of `AllocatorInfo` taken in
    /// the commit callback.
    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
        let object_manager = self.fs.object_manager();
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
            .await?;

        let root_store = self.fs.root_store();
        let layer_object_handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions { skip_journal_checks: true, ..Default::default() },
            None,
        )
        .await?;
        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
        // Snapshot `info` inside the commit callback, i.e. as part of committing `BeginFlush`.
        let info = transaction
            .commit_with_callback(|_| {
                self.allocator.inner.lock().info.clone()
            })
            .await?;
        Ok((layer_object_handle, info))
    }
2067
    /// Second phase of a flush.
    ///
    /// Compacts the immutable layers into `layer_object_handle`.  It then commits one
    /// transaction that:
    /// * moves the old layer files to the graveyard,
    /// * writes the updated `AllocatorInfo` (pointing at the new layer),
    /// * adds `EndFlush`, and
    /// * removes the new layer file from the graveyard.
    ///
    /// After the commit the old layers are closed and tombstoned.  Returns the earliest version
    /// still used by a struct in the tree.
    async fn finish(
        self,
        layer_object_handle: DataObjectHandle<ObjectStore>,
        mut info: AllocatorInfo,
    ) -> Result<Version, Error> {
        let object_manager = self.fs.object_manager();
        let txn_options = Self::txn_options(object_manager.metadata_reservation());

        let layer_set = self.allocator.tree.immutable_layer_set();
        let total_len = layer_set.sum_len();
        {
            let start_time = std::time::Instant::now();
            let merged_layer_count = layer_set.layers.len();
            let mut merger = layer_set.merger();
            // Merge all immutable layers, filter the result via `Allocator::filter`, and coalesce
            // adjacent equal records.  The meaning of the `true` argument is defined by
            // `Allocator::filter` (not visible here).
            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
            let iter = CoalescingIterator::new(iter).await?;
            let bytes_written = compact_with_iterator(
                iter,
                total_len,
                DirectWriter::new(&layer_object_handle, txn_options).await,
                layer_object_handle.block_size(),
                Some(self.fs.journal().get_compaction_yielder()),
            )
            .await?;

            self.allocator.tree.report_compaction_metrics(
                bytes_written,
                start_time.elapsed(),
                merged_layer_count,
            );
        }

        let root_store = self.fs.root_store();

        // Declared before `transaction` so they outlive it.  The transaction borrows
        // `reservation_update` (`AssocObj::Borrowed` below), and `object_handle` is written
        // through it.
        let object_handle;
        let reservation_update;
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    self.allocator.object_id()
                )],
                txn_options,
            )
            .await?;
        let mut serialized_info = Vec::new();

        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
        object_handle = ObjectStore::open_object(
            &root_store,
            self.allocator.object_id(),
            HandleOptions::default(),
            None,
        )
        .await?;

        // The previous layer files are no longer needed once this transaction commits.
        for object_id in &info.layers {
            root_store.add_to_graveyard(&mut transaction, *object_id);
        }

        // The persisted info is written with an empty `marked_for_deletion`.  These owners are
        // cleared from the in-memory sets in the commit callback below.
        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);

        info.layers = vec![layer_object_handle.object_id()];

        info.serialize_with_version(&mut serialized_info)?;

        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;

        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
            layer_object_handle.get_size(),
        ));

        // `EndFlush` goes in the same transaction that writes the new `AllocatorInfo`.
        transaction.add_with_object(
            self.allocator.object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );
        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());

        let layers = layers_from_handles([layer_object_handle]).await?;
        transaction
            .commit_with_callback(|_| {
                // Switch the tree and in-memory info to the new layer as part of the commit.
                self.allocator.tree.set_layers(layers);

                let mut inner = self.allocator.inner.lock();
                inner.info.layers = info.layers;
                for owner_id in marked_for_deletion {
                    inner.marked_for_deletion.remove(&owner_id);
                    inner.info.marked_for_deletion.remove(&owner_id);
                }
            })
            .await?;

        // Close the old layers and tombstone their backing objects.
        for layer in layer_set.layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                root_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        let mut counters = self.allocator.counters.lock();
        counters.num_flushes += 1;
        counters.last_flush_time = Some(std::time::SystemTime::now());
        Ok(self.allocator.tree.get_earliest_version())
    }
2193}
2194
2195#[cfg(test)]
2196mod tests {
2197 use crate::filesystem::{
2198 FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2199 };
2200 use crate::fsck::fsck;
2201 use crate::lsm_tree::cache::NullCache;
2202 use crate::lsm_tree::skip_list_layer::SkipListLayer;
2203 use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
2204 use crate::lsm_tree::{LSMTree, Query};
2205 use crate::object_handle::ObjectHandle;
2206 use crate::object_store::allocator::merge::merge;
2207 use crate::object_store::allocator::{
2208 Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2209 };
2210 use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
2211 use crate::object_store::volume::root_volume;
2212 use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
2213 use crate::range::RangeExt;
2214 use crate::round::round_up;
2215 use crate::serialized_types::{LATEST_VERSION, Versioned};
2216 use crate::testing;
2217 use bincode::Options as _;
2218 use fuchsia_async as fasync;
2219 use fuchsia_sync::Mutex;
2220 use std::cmp::{max, min};
2221 use std::ops::{Bound, Range};
2222 use std::sync::Arc;
2223 use storage_device::DeviceHolder;
2224 use storage_device::fake_device::FakeDevice;
2225
2226 #[test]
2227 fn test_allocator_key_is_range_based() {
2228 assert!(AllocatorKey { device_range: (0..100).into() }.is_range_key());
2230 }
2231
2232 #[test]
2233 fn test_allocator_key_serialization_compatibility() {
2234 let range = 1024..2048;
2235 let key = AllocatorKey { device_range: range.clone().into() };
2236
2237 let options = bincode::DefaultOptions::new().allow_trailing_bytes();
2240 let expected_bytes = options.serialize(&range).unwrap();
2241
2242 let mut key_bytes = Vec::new();
2244 key.serialize_into(&mut key_bytes).unwrap();
2245
2246 assert_eq!(key_bytes, expected_bytes);
2248
2249 let deserialized_key =
2252 AllocatorKey::deserialize_from(&mut &expected_bytes[..], LATEST_VERSION).unwrap();
2253 assert_eq!(deserialized_key, key);
2254 }
2255
2256 #[fuchsia::test]
2257 async fn test_coalescing_iterator() {
2258 let skip_list = SkipListLayer::new(100);
2259 let items = [
2260 Item::new(
2261 AllocatorKey { device_range: (0..100 * 512).into() },
2262 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2263 ),
2264 Item::new(
2265 AllocatorKey { device_range: (100 * 512..200 * 512).into() },
2266 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2267 ),
2268 ];
2269 skip_list.insert(items[1].clone()).expect("insert error");
2270 skip_list.insert(items[0].clone()).expect("insert error");
2271 let mut iter =
2272 CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
2273 .await
2274 .expect("new failed");
2275 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2276 assert_eq!(
2277 (key, value),
2278 (
2279 &AllocatorKey { device_range: (0..200 * 512).into() },
2280 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2281 )
2282 );
2283 iter.advance().await.expect("advance failed");
2284 assert!(iter.get().is_none());
2285 }
2286
2287 #[fuchsia::test]
2288 async fn test_merge_and_coalesce_across_three_layers() {
2289 let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2290 lsm_tree
2291 .insert(Item::new(
2292 AllocatorKey { device_range: (100 * 512..200 * 512).into() },
2293 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2294 ))
2295 .expect("insert error");
2296 lsm_tree.seal();
2297 lsm_tree
2298 .insert(Item::new(
2299 AllocatorKey { device_range: (0..100 * 512).into() },
2300 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2301 ))
2302 .expect("insert error");
2303
2304 let layer_set = lsm_tree.layer_set();
2305 let mut merger = layer_set.merger();
2306 let mut iter =
2307 CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2308 .await
2309 .expect("new failed");
2310 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2311 assert_eq!(
2312 (key, value),
2313 (
2314 &AllocatorKey { device_range: (0..200 * 512).into() },
2315 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2316 )
2317 );
2318 iter.advance().await.expect("advance failed");
2319 assert!(iter.get().is_none());
2320 }
2321
2322 #[fuchsia::test]
2323 async fn test_merge_and_coalesce_wont_merge_across_object_id() {
2324 let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2325 lsm_tree
2326 .insert(Item::new(
2327 AllocatorKey { device_range: (100 * 512..200 * 512).into() },
2328 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2329 ))
2330 .expect("insert error");
2331 lsm_tree.seal();
2332 lsm_tree
2333 .insert(Item::new(
2334 AllocatorKey { device_range: (0..100 * 512).into() },
2335 AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2336 ))
2337 .expect("insert error");
2338
2339 let layer_set = lsm_tree.layer_set();
2340 let mut merger = layer_set.merger();
2341 let mut iter =
2342 CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2343 .await
2344 .expect("new failed");
2345 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2346 assert_eq!(
2347 (key, value),
2348 (
2349 &AllocatorKey { device_range: (0..100 * 512).into() },
2350 &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2351 )
2352 );
2353 iter.advance().await.expect("advance failed");
2354 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2355 assert_eq!(
2356 (key, value),
2357 (
2358 &AllocatorKey { device_range: (100 * 512..200 * 512).into() },
2359 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2360 )
2361 );
2362 iter.advance().await.expect("advance failed");
2363 assert!(iter.get().is_none());
2364 }
2365
2366 fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
2367 if a.end > b.start && a.start < b.end {
2368 min(a.end, b.end) - max(a.start, b.start)
2369 } else {
2370 0
2371 }
2372 }
2373
2374 async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
2375 let layer_set = allocator.tree.layer_set();
2376 let mut merger = layer_set.merger();
2377 let mut iter = allocator
2378 .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2379 .await
2380 .expect("build iterator");
2381 let mut allocations: Vec<Range<u64>> = Vec::new();
2382 while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2383 if let Some(r) = allocations.last() {
2384 assert!(device_range.range.start >= r.end);
2385 }
2386 allocations.push(device_range.range.clone());
2387 iter.advance().await.expect("advance failed");
2388 }
2389 allocations
2390 }
2391
2392 async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
2393 let layer_set = allocator.tree.layer_set();
2394 let mut merger = layer_set.merger();
2395 let mut iter = allocator
2396 .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2397 .await
2398 .expect("build iterator");
2399 let mut found = 0;
2400 while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2401 let mut l = device_range.range.length().expect("Invalid range");
2402 found += l;
2403 for range in expected_allocations {
2406 l -= overlap(range, &device_range.range);
2407 if l == 0 {
2408 break;
2409 }
2410 }
2411 assert_eq!(l, 0, "range {:?} not covered by expectations", device_range.range);
2412 iter.advance().await.expect("advance failed");
2413 }
2414 assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
2416 }
2417
2418 async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
2419 let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
2420 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2421 let allocator = fs.allocator();
2422 (fs, allocator)
2423 }
2424
2425 #[fuchsia::test]
2426 async fn test_allocations() {
2427 const STORE_OBJECT_ID: u64 = 99;
2428 let (fs, allocator) = test_fs().await;
2429 let mut transaction =
2430 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2431 let mut device_ranges = collect_allocations(&allocator).await;
2432
2433 let expected = vec![
2435 0..4096, 4096..139264, 139264..204800, 204800..335872, 335872..401408, 524288..528384, ];
2442 assert_eq!(device_ranges, expected);
2443 device_ranges.push(
2444 allocator
2445 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2446 .await
2447 .expect("allocate failed"),
2448 );
2449 assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2450 device_ranges.push(
2451 allocator
2452 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2453 .await
2454 .expect("allocate failed"),
2455 );
2456 assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2457 assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
2458 transaction.commit().await.expect("commit failed");
2459 let mut transaction =
2460 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2461 device_ranges.push(
2462 allocator
2463 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2464 .await
2465 .expect("allocate failed"),
2466 );
2467 assert_eq!(device_ranges[7].length().expect("Invalid range"), fs.block_size());
2468 assert_eq!(overlap(&device_ranges[5], &device_ranges[7]), 0);
2469 assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
2470 transaction.commit().await.expect("commit failed");
2471
2472 check_allocations(&allocator, &device_ranges).await;
2473 }
2474
2475 #[fuchsia::test]
2476 async fn test_allocate_more_than_max_size() {
2477 const STORE_OBJECT_ID: u64 = 99;
2478 let (fs, allocator) = test_fs().await;
2479 let mut transaction =
2480 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2481 let mut device_ranges = collect_allocations(&allocator).await;
2482 device_ranges.push(
2483 allocator
2484 .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
2485 .await
2486 .expect("allocate failed"),
2487 );
2488 assert_eq!(
2489 device_ranges.last().unwrap().length().expect("Invalid range"),
2490 allocator.max_extent_size_bytes
2491 );
2492 transaction.commit().await.expect("commit failed");
2493
2494 check_allocations(&allocator, &device_ranges).await;
2495 }
2496
2497 #[fuchsia::test]
2498 async fn test_deallocations() {
2499 const STORE_OBJECT_ID: u64 = 99;
2500 let (fs, allocator) = test_fs().await;
2501 let initial_allocations = collect_allocations(&allocator).await;
2502
2503 let mut transaction =
2504 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2505 let device_range1 = allocator
2506 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2507 .await
2508 .expect("allocate failed");
2509 assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
2510 transaction.commit().await.expect("commit failed");
2511
2512 let mut transaction =
2513 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2514 allocator
2515 .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
2516 .await
2517 .expect("deallocate failed");
2518 transaction.commit().await.expect("commit failed");
2519
2520 check_allocations(&allocator, &initial_allocations).await;
2521 }
2522
2523 #[fuchsia::test]
2524 async fn test_mark_allocated() {
2525 const STORE_OBJECT_ID: u64 = 99;
2526 let (fs, allocator) = test_fs().await;
2527 let mut device_ranges = collect_allocations(&allocator).await;
2528 let range = {
2529 let mut transaction = fs
2530 .clone()
2531 .new_transaction(lock_keys![], Options::default())
2532 .await
2533 .expect("new failed");
2534 allocator
2536 .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
2537 .await
2538 .expect("allocate failed")
2539 };
2541
2542 let mut transaction =
2543 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2544
2545 device_ranges.push(
2548 allocator
2549 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2550 .await
2551 .expect("allocate failed"),
2552 );
2553
2554 assert_eq!(device_ranges.last().unwrap().start, range.start);
2555
2556 let mut range2 = range.clone();
2558 range2.start += fs.block_size();
2559 allocator
2560 .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
2561 .expect("mark_allocated failed");
2562 device_ranges.push(range2);
2563
2564 device_ranges.push(
2566 allocator
2567 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2568 .await
2569 .expect("allocate failed"),
2570 );
2571 let last_range = device_ranges.last().unwrap();
2572 assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
2573 assert_eq!(overlap(last_range, &range), 0);
2574 transaction.commit().await.expect("commit failed");
2575
2576 check_allocations(&allocator, &device_ranges).await;
2577 }
2578
2579 #[fuchsia::test]
2580 async fn test_mark_for_deletion() {
2581 const STORE_OBJECT_ID: u64 = 99;
2582 let (fs, allocator) = test_fs().await;
2583
2584 let initial_allocated_bytes = allocator.get_allocated_bytes();
2586 let mut device_ranges = collect_allocations(&allocator).await;
2587 let mut transaction =
2588 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2589 for _ in 0..15 {
2591 device_ranges.push(
2592 allocator
2593 .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2594 .await
2595 .expect("allocate failed"),
2596 );
2597 device_ranges.push(
2598 allocator
2599 .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2600 .await
2601 .expect("allocate2 failed"),
2602 );
2603 }
2604 transaction.commit().await.expect("commit failed");
2605 check_allocations(&allocator, &device_ranges).await;
2606
2607 assert_eq!(
2608 allocator.get_allocated_bytes(),
2609 initial_allocated_bytes + fs.block_size() * 3000
2610 );
2611
2612 let mut transaction =
2614 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2615 allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
2616 transaction.commit().await.expect("commit failed");
2617
2618 assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
2620 check_allocations(&allocator, &device_ranges).await;
2621
2622 device_ranges.clear();
2625
2626 let mut transaction =
2627 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2628 let target_bytes = 1500 * fs.block_size();
2629 while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
2630 let len = std::cmp::min(
2631 target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
2632 100 * fs.block_size(),
2633 );
2634 device_ranges.push(
2635 allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
2636 );
2637 }
2638 transaction.commit().await.expect("commit failed");
2639
2640 allocator.flush().await.expect("flush failed");
2642
2643 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2647 assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes,);
2648 }
2649
2650 async fn create_file(store: &Arc<ObjectStore>, size: usize) {
2651 let root_directory =
2652 Directory::open(store, store.root_directory_object_id()).await.expect("open failed");
2653
2654 let mut transaction = store
2655 .filesystem()
2656 .new_transaction(
2657 lock_keys![LockKey::object(
2658 store.store_object_id(),
2659 store.root_directory_object_id()
2660 )],
2661 Options::default(),
2662 )
2663 .await
2664 .expect("new_transaction failed");
2665 let file = root_directory
2666 .create_child_file(&mut transaction, &format!("foo {}", size))
2667 .await
2668 .expect("create_child_file failed");
2669 transaction.commit().await.expect("commit failed");
2670
2671 let buffer = file.allocate_buffer(size).await;
2672
2673 let mut transaction = file
2675 .new_transaction_with_options(Options {
2676 borrow_metadata_space: true,
2677 ..Default::default()
2678 })
2679 .await
2680 .expect("new_transaction_with_options failed");
2681 file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2682 transaction.commit().await.expect("commit failed");
2683 }
2684
// Creates a volume containing a large file, remounts, and forces a journal compaction. Then,
// twice, it deletes and recreates the volume (with a new file) and remounts. Finally it runs
// fsck. NOTE(review): judging by the name, this targets journal replay over a store deleted
// after a compaction. Confirm intent.
#[fuchsia::test]
async fn test_replay_with_deleted_store_and_compaction() {
    let (fs, _) = test_fs().await;

    const FILE_SIZE: usize = 10_000_000;

    // Initial volume; its store id is kept because deleting the volume below takes that
    // store's flush lock.
    let mut store_id = {
        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol
            .new_volume("vol", NewChildStoreOptions::default())
            .await
            .expect("new_volume failed");

        create_file(&store, FILE_SIZE).await;
        store.store_object_id()
    };

    // Unmount and reopen from the same device.
    fs.close().await.expect("close failed");
    let device = fs.take_device().await;
    device.reopen(false);

    let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");

    fs.journal().force_compact().await.expect("compact failed");

    for _ in 0..2 {
        {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");

            // Lock the volume directory object and the current store's flush lock, then
            // delete the volume.
            let transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(
                            root_vol.volume_directory().store().store_object_id(),
                            root_vol.volume_directory().object_id(),
                        ),
                        LockKey::flush(store_id)
                    ],
                    Options { borrow_metadata_space: true, ..Default::default() },
                )
                .await
                .expect("new_transaction failed");
            root_vol
                .delete_volume("vol", transaction, || {})
                .await
                .expect("delete_volume failed");

            // Recreate a volume under the same name and remember its new store id for the
            // next iteration's flush lock.
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");
            create_file(&store, FILE_SIZE).await;
            store_id = store.store_object_id();
        }

        // Unmount and reopen before the next iteration (and before the final fsck).
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
    }

    fsck(fs.clone()).await.expect("fsck failed");
    fs.close().await.expect("close failed");
}
2753
// Races a forced journal compaction, running in a spawned task, against deletion of a
// volume. Afterwards it compacts again, remounts, and runs fsck.
#[fuchsia::test(threads = 4)]
async fn test_compaction_delete_race() {
    let (fs, _allocator) = test_fs().await;

    {
        const FILE_SIZE: usize = 10_000_000;

        let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
        let store = root_vol
            .new_volume("vol", NewChildStoreOptions::default())
            .await
            .expect("new_volume failed");

        create_file(&store, FILE_SIZE).await;

        let fs_clone = fs.clone();

        // NOTE(review): presumably keeps the executor threads busy so the compaction task
        // spawned below starts promptly on another thread once `executor_tasks` is dropped.
        // Confirm against `testing::force_executor_threads_to_run`.
        let executor_tasks = testing::force_executor_threads_to_run(4).await;

        let task = fasync::Task::spawn(async move {
            fs_clone.journal().force_compact().await.expect("compact failed");
        });

        drop(executor_tasks);

        // Block this thread for a random 3000..6000 microseconds before starting the deletion,
        // so the point where deletion and compaction interleave varies between runs. A
        // blocking sleep (not an async timer) appears deliberate: it keeps this thread from
        // running other work in the meantime.
        let sleep = rand::random_range(3000..6000);
        std::thread::sleep(std::time::Duration::from_micros(sleep));
        log::info!("sleep {sleep}us");

        // Delete the volume while the compaction may still be in progress.
        let transaction = fs
            .clone()
            .new_transaction(
                lock_keys![
                    LockKey::object(
                        root_vol.volume_directory().store().store_object_id(),
                        root_vol.volume_directory().object_id(),
                    ),
                    LockKey::flush(store.store_object_id())
                ],
                Options { borrow_metadata_space: true, ..Default::default() },
            )
            .await
            .expect("new_transaction failed");
        root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");

        task.await;
    }

    // Compact once more, then verify the on-disk state survives a remount and fsck.
    fs.journal().force_compact().await.expect("compact failed");
    fs.close().await.expect("close failed");

    let device = fs.take_device().await;
    device.reopen(false);

    let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
    fsck(fs.clone()).await.expect("fsck failed");
    fs.close().await.expect("close failed");
}
2818
2819 #[fuchsia::test]
2820 async fn test_delete_multiple_volumes() {
2821 let (mut fs, _) = test_fs().await;
2822
2823 for _ in 0..50 {
2824 {
2825 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2826 let store = root_vol
2827 .new_volume("vol", NewChildStoreOptions::default())
2828 .await
2829 .expect("new_volume failed");
2830
2831 create_file(&store, 1_000_000).await;
2832
2833 let transaction = fs
2834 .clone()
2835 .new_transaction(
2836 lock_keys![
2837 LockKey::object(
2838 root_vol.volume_directory().store().store_object_id(),
2839 root_vol.volume_directory().object_id(),
2840 ),
2841 LockKey::flush(store.store_object_id())
2842 ],
2843 Options { borrow_metadata_space: true, ..Default::default() },
2844 )
2845 .await
2846 .expect("new_transaction failed");
2847 root_vol
2848 .delete_volume("vol", transaction, || {})
2849 .await
2850 .expect("delete_volume failed");
2851
2852 fs.allocator().flush().await.expect("flush failed");
2853 }
2854
2855 fs.close().await.expect("close failed");
2856 let device = fs.take_device().await;
2857 device.reopen(false);
2858
2859 fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2860 }
2861
2862 fsck(fs.clone()).await.expect("fsck failed");
2863 fs.close().await.expect("close failed");
2864 }
2865
2866 #[fuchsia::test]
2867 async fn test_allocate_free_reallocate() {
2868 const STORE_OBJECT_ID: u64 = 99;
2869 let (fs, allocator) = test_fs().await;
2870
2871 let mut device_ranges = Vec::new();
2873 let mut transaction =
2874 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2875 for _ in 0..30 {
2876 device_ranges.push(
2877 allocator
2878 .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2879 .await
2880 .expect("allocate failed"),
2881 );
2882 }
2883 transaction.commit().await.expect("commit failed");
2884
2885 assert_eq!(
2886 fs.block_size() * 3000,
2887 *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2888 );
2889
2890 let mut transaction =
2892 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2893 for range in std::mem::replace(&mut device_ranges, Vec::new()) {
2894 allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
2895 }
2896 transaction.commit().await.expect("commit failed");
2897
2898 assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());
2899
2900 let mut transaction =
2903 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2904 let target_len = 1500 * fs.block_size();
2905 while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
2906 let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
2907 device_ranges.push(
2908 allocator
2909 .allocate(&mut transaction, STORE_OBJECT_ID, len)
2910 .await
2911 .expect("allocate failed"),
2912 );
2913 }
2914 transaction.commit().await.expect("commit failed");
2915
2916 assert_eq!(
2917 fs.block_size() * 1500,
2918 *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2919 );
2920 }
2921
2922 #[fuchsia::test]
2923 async fn test_flush() {
2924 const STORE_OBJECT_ID: u64 = 99;
2925
2926 let mut device_ranges = Vec::new();
2927 let device = {
2928 let (fs, allocator) = test_fs().await;
2929 let mut transaction = fs
2930 .clone()
2931 .new_transaction(lock_keys![], Options::default())
2932 .await
2933 .expect("new failed");
2934 device_ranges.push(
2935 allocator
2936 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2937 .await
2938 .expect("allocate failed"),
2939 );
2940 device_ranges.push(
2941 allocator
2942 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2943 .await
2944 .expect("allocate failed"),
2945 );
2946 device_ranges.push(
2947 allocator
2948 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2949 .await
2950 .expect("allocate failed"),
2951 );
2952 transaction.commit().await.expect("commit failed");
2953
2954 allocator.flush().await.expect("flush failed");
2955
2956 fs.close().await.expect("close failed");
2957 fs.take_device().await
2958 };
2959
2960 device.reopen(false);
2961 let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2962 let allocator = fs.allocator();
2963
2964 let allocated = collect_allocations(&allocator).await;
2965
2966 for i in &device_ranges {
2968 let mut overlapping = 0;
2969 for j in &allocated {
2970 overlapping += overlap(i, j);
2971 }
2972 assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
2973 }
2974
2975 let mut transaction =
2976 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2977 let range = allocator
2978 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2979 .await
2980 .expect("allocate failed");
2981
2982 for r in &allocated {
2984 assert_eq!(overlap(r, &range), 0);
2985 }
2986 transaction.commit().await.expect("commit failed");
2987 }
2988
2989 #[fuchsia::test]
2990 async fn test_dropped_transaction() {
2991 const STORE_OBJECT_ID: u64 = 99;
2992 let (fs, allocator) = test_fs().await;
2993 let allocated_range = {
2994 let mut transaction = fs
2995 .clone()
2996 .new_transaction(lock_keys![], Options::default())
2997 .await
2998 .expect("new_transaction failed");
2999 allocator
3000 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3001 .await
3002 .expect("allocate failed")
3003 };
3004 let mut transaction = fs
3007 .clone()
3008 .new_transaction(lock_keys![], Options::default())
3009 .await
3010 .expect("new_transaction failed");
3011 assert_eq!(
3012 allocator
3013 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3014 .await
3015 .expect("allocate failed"),
3016 allocated_range
3017 );
3018 }
3019
3020 #[fuchsia::test]
3021 async fn test_cleanup_removed_owner() {
3022 const STORE_OBJECT_ID: u64 = 99;
3023 let device = {
3024 let (fs, allocator) = test_fs().await;
3025
3026 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3027 {
3028 let mut transaction =
3029 fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
3030 allocator
3031 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3032 .await
3033 .expect("Allocating");
3034 transaction.commit().await.expect("Committing.");
3035 }
3036 allocator.flush().await.expect("Flushing");
3037 assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3038 {
3039 let mut transaction =
3040 fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
3041 allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
3042 transaction.commit().await.expect("Committing.");
3043 }
3044 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3045 fs.close().await.expect("Closing");
3046 fs.take_device().await
3047 };
3048
3049 device.reopen(false);
3050 let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
3051 let allocator = fs.allocator();
3052 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3053 }
3054
3055 #[fuchsia::test]
3056 async fn test_allocated_bytes() {
3057 const STORE_OBJECT_ID: u64 = 99;
3058 let (fs, allocator) = test_fs().await;
3059
3060 let initial_allocated_bytes = allocator.get_allocated_bytes();
3061
3062 let allocated_bytes = initial_allocated_bytes + fs.block_size();
3064 let allocated_range = {
3065 let mut transaction = fs
3066 .clone()
3067 .new_transaction(lock_keys![], Options::default())
3068 .await
3069 .expect("new_transaction failed");
3070 let range = allocator
3071 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3072 .await
3073 .expect("allocate failed");
3074 transaction.commit().await.expect("commit failed");
3075 assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3076 range
3077 };
3078
3079 {
3080 let mut transaction = fs
3081 .clone()
3082 .new_transaction(lock_keys![], Options::default())
3083 .await
3084 .expect("new_transaction failed");
3085 allocator
3086 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3087 .await
3088 .expect("allocate failed");
3089
3090 assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3092 }
3093
3094 assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3096
3097 let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
3099 let mut transaction =
3100 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3101 allocator
3102 .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
3103 .await
3104 .expect("deallocate failed");
3105
3106 assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3108
3109 transaction.commit().await.expect("commit failed");
3110
3111 assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
3113 }
3114
3115 #[fuchsia::test]
3116 async fn test_persist_bytes_limit() {
3117 const LIMIT: u64 = 12345;
3118 const OWNER_ID: u64 = 12;
3119
3120 let (fs, allocator) = test_fs().await;
3121 {
3122 let mut transaction = fs
3123 .clone()
3124 .new_transaction(lock_keys![], Options::default())
3125 .await
3126 .expect("new_transaction failed");
3127 allocator
3128 .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
3129 .expect("Failed to set limit.");
3130 assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
3131 transaction.commit().await.expect("Failed to commit transaction");
3132 let bytes: u64 = *allocator
3133 .inner
3134 .lock()
3135 .info
3136 .limit_bytes
3137 .get(&OWNER_ID)
3138 .expect("Failed to find limit");
3139 assert_eq!(LIMIT, bytes);
3140 }
3141 }
3142
/// Merges neighbouring ranges that abut exactly (one range's `end` equals the next range's
/// `start`) into a single range.
///
/// Only consecutive elements are compared, so the input should already be sorted. Ranges
/// that overlap without abutting exactly are left as they are.
fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
    let mut merged: Vec<Range<u64>> = Vec::with_capacity(ranges.len());
    for next in ranges {
        match merged.last_mut() {
            Some(last) if last.end == next.start => last.end = next.end,
            _ => merged.push(next),
        }
    }
    merged
}
3166
// Frees a known pattern of blocks inside a 32-block allocation and remounts. It then checks
// that repeated `take_for_trimming` calls, using a small extent size and batch size, report
// exactly those free ranges. The comparison is made after coalescing adjacent ranges.
#[fuchsia::test]
async fn test_take_for_trimming() {
    const STORE_OBJECT_ID: u64 = 99;

    // Both are initialised inside the block below and are still needed after the remount.
    let allocated_range;
    let expected_free_ranges;
    let device = {
        let (fs, allocator) = test_fs().await;
        let bs = fs.block_size();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new failed");
        allocated_range = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
            .await
            .expect("allocate failed");
        transaction.commit().await.expect("commit failed");

        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new failed");
        let base = allocated_range.start;
        // Blocks to free, relative to the start of the allocation: single blocks, a run of
        // adjacent ranges (blocks 4..13), and an isolated block near the end.
        expected_free_ranges = vec![
            base..(base + (bs * 1)),
            (base + (bs * 2))..(base + (bs * 3)),
            (base + (bs * 4))..(base + (bs * 8)),
            (base + (bs * 8))..(base + (bs * 12)),
            (base + (bs * 12))..(base + (bs * 13)),
            (base + (bs * 29))..(base + (bs * 30)),
        ];
        for range in &expected_free_ranges {
            allocator
                .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
                .await
                .expect("deallocate failed");
        }
        transaction.commit().await.expect("commit failed");

        allocator.flush().await.expect("flush failed");

        fs.close().await.expect("close failed");
        fs.take_device().await
    };

    device.reopen(false);
    let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
    let allocator = fs.allocator();

    // Small limits (4-block extents, 2 extents per batch) so several calls are needed to walk
    // the allocated range.
    let max_extent_size = fs.block_size() as usize * 4;
    const EXTENTS_PER_BATCH: usize = 2;
    let mut free_ranges = vec![];
    let mut offset = allocated_range.start;
    while offset < allocated_range.end {
        let free = allocator
            .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
            .await
            .expect("take_for_trimming failed");
        // Keep only extents that end within the allocated range.
        free_ranges.extend(
            free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
        );
        // Resume after the last extent returned by this batch.
        offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
    }
    // A free run may come back split into several extents (presumably at most
    // `max_extent_size` each), and the expected list itself contains adjacent ranges, so
    // both sides are compared in coalesced form.
    let coalesced_free_ranges = coalesce_ranges(free_ranges);
    let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);

    assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
}
3248
3249 #[fuchsia::test]
3250 async fn test_allocations_wait_for_free_extents() {
3251 const STORE_OBJECT_ID: u64 = 99;
3252 let (fs, allocator) = test_fs().await;
3253 let allocator_clone = allocator.clone();
3254
3255 let mut transaction =
3256 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3257
3258 let max_extent_size = fs.device().size() as usize;
3260 const EXTENTS_PER_BATCH: usize = usize::MAX;
3261
3262 let trim_done = Arc::new(Mutex::new(false));
3268 let trimmable_extents = allocator
3269 .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
3270 .await
3271 .expect("take_for_trimming failed");
3272
3273 let trim_done_clone = trim_done.clone();
3274 let bs = fs.block_size();
3275 let alloc_task = fasync::Task::spawn(async move {
3276 allocator_clone
3277 .allocate(&mut transaction, STORE_OBJECT_ID, bs)
3278 .await
3279 .expect("allocate should fail");
3280 {
3281 assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
3282 }
3283 transaction.commit().await.expect("commit failed");
3284 });
3285
3286 fasync::Timer::new(std::time::Duration::from_millis(100)).await;
3289
3290 {
3292 let mut trim_done = trim_done.lock();
3293 std::mem::drop(trimmable_extents);
3294 *trim_done = true;
3295 }
3296
3297 alloc_task.await;
3298 }
3299
3300 #[fuchsia::test]
3301 async fn test_allocation_with_reservation_not_multiple_of_block_size() {
3302 const STORE_OBJECT_ID: u64 = 99;
3303 let (fs, allocator) = test_fs().await;
3304
3305 const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
3307 let reservation =
3308 allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();
3309
3310 let mut transaction = fs
3311 .clone()
3312 .new_transaction(
3313 lock_keys![],
3314 Options { allocator_reservation: Some(&reservation), ..Options::default() },
3315 )
3316 .await
3317 .expect("new failed");
3318
3319 let range = allocator
3320 .allocate(
3321 &mut transaction,
3322 STORE_OBJECT_ID,
3323 round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
3324 )
3325 .await
3326 .expect("allocate faiiled");
3327 assert_eq!((range.end - range.start) % fs.block_size(), 0);
3328
3329 println!("{}", range.end - range.start);
3330 }
3331}