1pub mod merge;
90pub mod strategy;
91
92use crate::drop_event::DropEvent;
93use crate::errors::FxfsError;
94use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
95use crate::log::*;
96use crate::lsm_tree::cache::NullCache;
97use crate::lsm_tree::skip_list_layer::SkipListLayer;
98use crate::lsm_tree::types::{
99 FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
100 OrdUpperBound, SortByU64, Value,
101};
102use crate::lsm_tree::{LSMTree, Query, compact_with_iterator, layers_from_handles};
103use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
104use crate::object_store::object_manager::ReservationUpdate;
105use crate::object_store::transaction::{
106 AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
107};
108use crate::object_store::{
109 DataObjectHandle, DirectWriter, Extent, HandleOptions, ObjectStore, ReservedId, tree,
110};
111use crate::range::RangeExt;
112use crate::round::{round_div, round_down, round_up};
113use crate::serialized_types::{
114 DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
115};
116use anyhow::{Context, Error, anyhow, bail, ensure};
117use async_trait::async_trait;
118use either::Either::{Left, Right};
119use event_listener::EventListener;
120use fprint::TypeFingerprint;
121use fuchsia_inspect::HistogramProperty;
122use fuchsia_sync::Mutex;
123use futures::FutureExt;
124use fxfs_macros::SerializeKey;
125use merge::{filter_marked_for_deletion, filter_tombstones, merge};
126use serde::{Deserialize, Serialize};
127use std::borrow::Borrow;
128use std::collections::{BTreeMap, HashSet, VecDeque};
129use std::hash::Hash;
130use std::marker::PhantomData;
131use std::num::{NonZero, Saturating};
132use std::ops::Range;
133use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
134use std::sync::{Arc, Weak};
135
/// Something that reservations are drawn from, and which must be told when reserved bytes are
/// handed back to it.
pub trait ReservationOwner: Send + Sync {
    /// Returns `amount` reserved bytes to this owner. `owner_object_id` is the object the bytes
    /// were attributed to, or `None` when they were unattributed.
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
}
143
/// A reservation of bytes drawn from `owner`.
///
/// Portions of a reservation can be lent out as sub-reservations ("holds") via `reserve` /
/// `reserve_with`. Whatever amount remains when the reservation is dropped is returned to `owner`
/// through `ReservationOwner::release_reservation`.
pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
    /// Where the reserved bytes came from and where unused bytes are returned on drop.
    owner: T,
    /// The object this reservation is attributed to; `None` means unattributed.
    owner_object_id: Option<u64>,
    /// The mutable counters (total amount and the portion lent out to holds).
    inner: Mutex<ReservationInner>,
    phantom: PhantomData<U>,
}
157
/// Counters guarded by `ReservationImpl::inner`.
#[derive(Debug, Default)]
struct ReservationInner {
    /// Total number of bytes held by the reservation, including any lent out to holds.
    amount: u64,

    /// The portion of `amount` currently lent out to holds. Methods such as `forget_some` and
    /// `give_back` assert that this never exceeds `amount`, and `Drop` asserts it is zero.
    reserved: u64,
}
166
167impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 self.inner.lock().fmt(f)
170 }
171}
172
173impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
174 pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
175 Self {
176 owner,
177 owner_object_id,
178 inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
179 phantom: PhantomData,
180 }
181 }
182
183 pub fn owner_object_id(&self) -> Option<u64> {
184 self.owner_object_id
185 }
186
187 pub fn amount(&self) -> u64 {
189 self.inner.lock().amount
190 }
191
192 pub fn add(&self, amount: u64) {
194 self.inner.lock().amount += amount;
195 }
196
197 pub fn forget(&self) -> u64 {
201 let mut inner = self.inner.lock();
202 assert_eq!(inner.reserved, 0);
203 std::mem::take(&mut inner.amount)
204 }
205
206 pub fn forget_some(&self, amount: u64) {
210 let mut inner = self.inner.lock();
211 inner.amount -= amount;
212 assert!(inner.reserved <= inner.amount);
213 }
214
215 fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
218 let mut inner = self.inner.lock();
219 let taken = amount(inner.amount - inner.reserved);
220 inner.reserved += taken;
221 ReservationImpl::new(self, self.owner_object_id, taken)
222 }
223
224 pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
226 let mut inner = self.inner.lock();
227 if inner.amount - inner.reserved < amount {
228 None
229 } else {
230 inner.reserved += amount;
231 Some(ReservationImpl::new(self, self.owner_object_id, amount))
232 }
233 }
234
235 pub fn commit(&self, amount: u64) {
238 let mut inner = self.inner.lock();
239 inner.reserved -= amount;
240 inner.amount -= amount;
241 }
242
243 pub fn give_back(&self, amount: u64) {
245 self.owner.borrow().release_reservation(self.owner_object_id, amount);
246 let mut inner = self.inner.lock();
247 inner.amount -= amount;
248 assert!(inner.reserved <= inner.amount);
249 }
250
251 pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
253 &self,
254 other: &ReservationImpl<V, W>,
255 amount: u64,
256 ) {
257 assert_eq!(self.owner_object_id, other.owner_object_id());
258 let mut inner = self.inner.lock();
259 if let Some(amount) = inner.amount.checked_sub(amount) {
260 inner.amount = amount;
261 } else {
262 std::mem::drop(inner);
263 panic!("Insufficient reservation space");
264 }
265 other.add(amount);
266 }
267}
268
269impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
270 fn drop(&mut self) {
271 let inner = self.inner.get_mut();
272 assert_eq!(inner.reserved, 0);
273 let owner_object_id = self.owner_object_id;
274 if inner.amount > 0 {
275 self.owner
276 .borrow()
277 .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
278 }
279 }
280}
281
282impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
283 for ReservationImpl<T, U>
284{
285 fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
286 assert_eq!(owner_object_id, self.owner_object_id);
288 let mut inner = self.inner.lock();
289 assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
290 inner.reserved -= amount;
291 }
292}
293
/// A reservation whose owner is a type-erased, shared `ReservationOwner`.
pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;

/// A hold: a sub-reservation lent out from a `Reservation` (the type returned by `reserve` /
/// `reserve_with` on a `Reservation`).
pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;
297
/// The allocator's record key; an alias for the current key version, `AllocatorKeyV32`.
pub type AllocatorKey = AllocatorKeyV32;

/// Key for an allocator record: the device byte range the record covers.
#[derive(
    Clone,
    Debug,
    Deserialize,
    Eq,
    Hash,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize,
    TypeFingerprint,
    SerializeKey,
    Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct AllocatorKeyV32 {
    /// The device byte range covered by the record. Search keys use a range that starts at 0
    /// (see `LayerKey::is_search_key`).
    pub device_range: Extent,
}
320
321impl SortByU64 for AllocatorKey {
322 fn get_leading_u64(&self) -> u64 {
323 self.device_range.end
324 }
325}
326
327const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;
328
329pub struct AllocatorKeyPartitionIterator {
330 device_range: Range<u64>,
331}
332
333impl Iterator for AllocatorKeyPartitionIterator {
334 type Item = u64;
335
336 fn next(&mut self) -> Option<Self::Item> {
337 if self.device_range.start >= self.device_range.end {
338 None
339 } else {
340 let start = self.device_range.start;
341 self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
342 let end = std::cmp::min(self.device_range.start, self.device_range.end);
343 let key = AllocatorKey { device_range: Extent(start..end) };
344 let hash = crate::stable_hash::stable_hash(key);
345 Some(hash)
346 }
347 }
348}
349
350impl FuzzyHash for AllocatorKey {
351 fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
352 AllocatorKeyPartitionIterator {
353 device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
354 ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
355 }
356 }
357
358 fn is_range_key(&self) -> bool {
359 true
360 }
361}
362
363impl AllocatorKey {
364 pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
366 AllocatorKey { device_range: Extent(0..self.device_range.start) }
367 }
368}
369
370impl LayerKey for AllocatorKey {
371 fn merge_type(&self) -> MergeType {
372 MergeType::OptimizedMerge
373 }
374
375 fn next_key(&self) -> Option<Self> {
376 Some(Self { device_range: Extent(0..self.device_range.end + 1) })
377 }
378
379 fn search_key(&self) -> Option<Self> {
380 Some(Self { device_range: Extent(0..self.device_range.start + 1) })
381 }
382
383 fn is_search_key(&self) -> bool {
384 self.device_range.start == 0
385 }
386
387 fn overlaps(&self, other: &Self) -> bool {
388 self.device_range.overlap(&other.device_range).is_some()
389 }
390}
391
392impl OrdUpperBound for AllocatorKey {
393 fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
394 self.device_range
395 .end
396 .cmp(&other.device_range.end)
397 .then(self.device_range.start.cmp(&other.device_range.start))
398 }
399}
400
401impl OrdLowerBound for AllocatorKey {
402 fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
403 self.device_range.cmp(&other.device_range)
408 }
409}
410
/// The allocator's record value; an alias for the current version, `AllocatorValueV32`.
pub type AllocatorValue = AllocatorValueV32;
impl Value for AllocatorValue {
    /// `None` is the value that marks a record as deleted.
    const DELETED_MARKER: Self = Self::None;
}
417
/// Value stored for an allocator record.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorValueV32 {
    /// No allocation; also used as the deleted marker (`Value::DELETED_MARKER`).
    None,
    /// An allocation owned by `owner_object_id`. NOTE(review): `count` looks like a reference
    /// count (trim batches insert `count: 1`) — confirm.
    Abs { count: u64, owner_object_id: u64 },
}
428
/// An allocator record: a device-range key paired with its allocation value.
pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;

/// The allocator's persisted summary; an alias for the current version, `AllocatorInfoV32`.
pub type AllocatorInfo = AllocatorInfoV32;
433
/// Summary state stored in the allocator's metadata object and read back by `Allocator::open`.
///
/// The serde derive serializes fields in declaration order, so do not reorder them.
#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
pub struct AllocatorInfoV32 {
    /// Object IDs (in the root store) of the layer files that back the allocator's tree.
    pub layers: Vec<u64>,
    /// Allocated byte counts keyed by owner object ID. Their sum must not exceed the device size.
    pub allocated_bytes: BTreeMap<u64, u64>,
    /// Owner object IDs whose records are filtered out as deleted.
    pub marked_for_deletion: HashSet<u64>,
    /// Per-owner byte limits keyed by owner object ID; owners with no entry are unlimited.
    pub limit_bytes: BTreeMap<u64, u64>,
}
447
/// Maximum number of bytes read from the allocator object when loading `AllocatorInfo` in `open`.
const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;
449
450pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
452 block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
457}
458
/// Flush statistics reported through inspect by `Allocator::track_statistics`.
#[derive(Default)]
struct AllocatorCounters {
    /// Reported as `num_flushes`.
    num_flushes: u64,
    /// Reported (as milliseconds since the Unix epoch) as `last_flush_time_ms` when set.
    last_flush_time: Option<std::time::SystemTime>,
}
464
/// The filesystem's device-space allocator.
pub struct Allocator {
    /// The owning filesystem, held weakly.
    filesystem: Weak<FxFilesystem>,
    /// Allocation granularity in bytes; allocation lengths must be multiples of it.
    block_size: u64,
    /// Usable device size in bytes: the device size rounded down to `block_size`.
    device_size: u64,
    /// Object ID of the allocator's metadata object in the root store.
    object_id: u64,
    /// Upper bound on the length of a single allocation (see `max_extent_size_for_block_size`).
    max_extent_size_bytes: u64,
    /// The allocator's persistent records.
    tree: LSMTree<AllocatorKey, AllocatorValue>,
    /// In-memory allocation records kept outside `tree`. Extents held out for trimming are
    /// inserted here, and free-space scans include this layer so such extents are not treated
    /// as free.
    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    /// Mutable allocator state.
    inner: Mutex<Inner>,
    /// Async mutex serializing trim batching, strategy rebuilds and the sync check in
    /// `allocate`.
    allocation_mutex: futures::lock::Mutex<()>,
    /// Flush statistics reported through inspect.
    counters: Mutex<AllocatorCounters>,
    /// NOTE(review): presumably the highest device offset handed out so far — confirm.
    maximum_offset: AtomicU64,
    /// Whether `allocate` may proceed. Starts out false in image-builder mode until
    /// `enable_allocations` is called.
    allocations_allowed: AtomicBool,
}
485
/// Per-owner byte accounting kept by the allocator.
#[derive(Debug, Default, PartialEq)]
struct ByteTracking {
    /// Bytes allocated to the owner (loaded from `AllocatorInfo::allocated_bytes` on open).
    allocated_bytes: Saturating<u64>,

    /// NOTE(review): presumably bytes allocated by transactions that have not committed yet —
    /// confirm.
    uncommitted_allocated_bytes: u64,

    /// Bytes reserved for the owner but not yet allocated. They count toward `used_bytes` but
    /// not toward `unavailable_bytes`.
    reserved_bytes: u64,

    /// Deallocated bytes that stay unavailable until a sync: they count toward
    /// `unavailable_bytes` but not toward `unavailable_after_sync_bytes`.
    committed_deallocated_bytes: u64,
}

impl ByteTracking {
    /// Bytes charged against the owner's limit: allocated, uncommitted and reserved.
    ///
    /// Every term is added with saturating arithmetic, so the total clamps at `u64::MAX`
    /// instead of overflowing.
    fn used_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.reserved_bytes)
    }

    /// Bytes that cannot currently be allocated: allocated, uncommitted, and deallocated bytes
    /// still waiting for a sync.
    fn unavailable_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.committed_deallocated_bytes)
    }

    /// Bytes that would remain unavailable after a sync, i.e. excluding committed
    /// deallocations.
    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
    }
}
533
/// A deallocation whose transaction has committed. NOTE(review): presumably its range is kept
/// out of circulation until the journal moves past `log_file_offset` — confirm.
#[derive(Debug)]
struct CommittedDeallocation {
    /// NOTE(review): presumably the journal offset at which the deallocation committed — confirm.
    log_file_offset: u64,
    /// The deallocated device range.
    range: Range<u64>,
    /// The owner the range was deallocated from.
    owner_object_id: u64,
}
543
/// Mutable allocator state, guarded by `Allocator::inner`.
struct Inner {
    /// The persisted summary (layer files, per-owner allocated bytes, deletions, limits).
    info: AllocatorInfo,

    /// Set once by `create` or `on_replay_complete`; both assert it was previously unset.
    opened: bool,

    /// NOTE(review): ranges of temporary allocations that have been dropped; the consumer is not
    /// visible here — confirm.
    dropped_temporary_allocations: Vec<Range<u64>>,

    /// Per-owner byte accounting keyed by owner object ID.
    owner_bytes: BTreeMap<u64, ByteTracking>,

    /// Reserved bytes not attributed to any owner.
    unattributed_reserved_bytes: u64,

    /// Deallocations that have committed. NOTE(review): presumably released for reuse after a
    /// later sync — confirm.
    committed_deallocated: VecDeque<CommittedDeallocation>,

    /// Bytes held out of the free pool by the outstanding trim batch (0 when none).
    trim_reserved_bytes: u64,

    /// Fires when the outstanding trim batch (`TrimmableExtents`) is dropped. `allocate` takes
    /// it and waits on it when it needs the trimmed space back.
    trim_listener: Option<EventListener>,

    /// The free-space allocation strategy.
    strategy: strategy::BestFit,

    /// Count of allocations by length in blocks; bucket 63 also collects longer allocations.
    allocation_size_histogram: [u64; 64],
    /// NOTE(review): strategy-rebuild counts bucketed by some trigger value — confirm.
    rebuild_strategy_trigger_histogram: [u64; 64],

    /// In-memory set of owner IDs whose records are filtered out as deleted.
    marked_for_deletion: HashSet<u64>,

    /// Owner IDs of deleted volumes that `allocate` moves into `marked_for_deletion` after
    /// forcing a sync.
    volumes_deleted_pending_sync: HashSet<u64>,
}
616
617impl Inner {
618 fn allocated_bytes(&self) -> Saturating<u64> {
619 let mut total = Saturating(0);
620 for (_, bytes) in &self.owner_bytes {
621 total += bytes.allocated_bytes;
622 }
623 total
624 }
625
626 fn uncommitted_allocated_bytes(&self) -> u64 {
627 self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
628 }
629
630 fn reserved_bytes(&self) -> u64 {
631 self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
632 + self.unattributed_reserved_bytes
633 }
634
635 fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
636 match self.info.limit_bytes.get(&owner_object_id) {
637 Some(v) => *v,
638 None => u64::MAX,
639 }
640 }
641
642 fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
643 let limit = self.owner_id_limit_bytes(owner_object_id);
644 let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
645 (Saturating(limit) - used).0
646 }
647
648 fn unavailable_bytes(&self) -> Saturating<u64> {
653 let mut total = Saturating(0);
654 for (_, bytes) in &self.owner_bytes {
655 total += bytes.unavailable_bytes();
656 }
657 total
658 }
659
660 fn used_bytes(&self) -> Saturating<u64> {
663 let mut total = Saturating(0);
664 for (_, bytes) in &self.owner_bytes {
665 total += bytes.used_bytes();
666 }
667 total + Saturating(self.unattributed_reserved_bytes)
668 }
669
670 fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
673 let mut total = Saturating(0);
674 for (_, bytes) in &self.owner_bytes {
675 total += bytes.unavailable_after_sync_bytes();
676 }
677 total
678 }
679
680 fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
683 device_size
684 .checked_sub(
685 (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
686 )
687 .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
688 }
689
690 fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
691 match owner_object_id {
692 Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
693 None => self.unattributed_reserved_bytes += amount,
694 };
695 }
696
697 fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
698 match owner_object_id {
699 Some(owner) => {
700 let owner_entry = self.owner_bytes.entry(owner).or_default();
701 assert!(
702 owner_entry.reserved_bytes >= amount,
703 "{} >= {}",
704 owner_entry.reserved_bytes,
705 amount
706 );
707 owner_entry.reserved_bytes -= amount;
708 }
709 None => {
710 assert!(
711 self.unattributed_reserved_bytes >= amount,
712 "{} >= {}",
713 self.unattributed_reserved_bytes,
714 amount
715 );
716 self.unattributed_reserved_bytes -= amount
717 }
718 };
719 }
720}
721
/// A batch of free extents taken out of the allocator by `Allocator::take_for_trimming`.
///
/// While the batch exists, its extents are removed from the allocation strategy and recorded as
/// temporary allocations. Dropping it puts them back and clears the trim reservation.
pub struct TrimmableExtents<'a> {
    allocator: &'a Allocator,
    /// The extents in this batch.
    extents: Vec<Range<u64>>,
    /// Paired with the `EventListener` returned by `new`, which the allocator stores as its trim
    /// listener. NOTE(review): presumably signals that listener when this batch is dropped —
    /// confirm against `DropEvent`.
    _drop_event: DropEvent,
}
732
733impl<'a> TrimmableExtents<'a> {
734 pub fn extents(&self) -> &Vec<Range<u64>> {
735 &self.extents
736 }
737
738 fn new(allocator: &'a Allocator) -> (Self, EventListener) {
740 let drop_event = DropEvent::new();
741 let listener = drop_event.listen();
742 (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
743 }
744
745 fn add_extent(&mut self, extent: Range<u64>) {
746 self.extents.push(extent);
747 }
748}
749
750impl<'a> Drop for TrimmableExtents<'a> {
751 fn drop(&mut self) {
752 let mut inner = self.allocator.inner.lock();
753 for device_range in std::mem::take(&mut self.extents) {
754 inner.strategy.free(device_range.clone()).expect("drop trim extent");
755 self.allocator
756 .temporary_allocations
757 .erase(&AllocatorKey { device_range: Extent(device_range.clone()) });
758 }
759 inner.trim_reserved_bytes = 0;
760 }
761}
762
763impl Allocator {
764 pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
765 let block_size = filesystem.block_size();
766 let device_size = round_down(filesystem.device().size(), block_size);
768 if device_size != filesystem.device().size() {
769 warn!("Device size is not block aligned. Rounding down.");
770 }
771 let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
772 let mut strategy = strategy::BestFit::default();
773 strategy.free(0..device_size).expect("new fs");
774 Allocator {
775 filesystem: Arc::downgrade(&filesystem),
776 block_size,
777 device_size,
778 object_id,
779 max_extent_size_bytes,
780 tree: LSMTree::new(merge, Box::new(NullCache {})),
781 temporary_allocations: SkipListLayer::new(1024),
782 inner: Mutex::new(Inner {
783 info: AllocatorInfo::default(),
784 opened: false,
785 dropped_temporary_allocations: Vec::new(),
786 owner_bytes: BTreeMap::new(),
787 unattributed_reserved_bytes: 0,
788 committed_deallocated: VecDeque::new(),
789 trim_reserved_bytes: 0,
790 trim_listener: None,
791 strategy,
792 allocation_size_histogram: [0; 64],
793 rebuild_strategy_trigger_histogram: [0; 64],
794 marked_for_deletion: HashSet::new(),
795 volumes_deleted_pending_sync: HashSet::new(),
796 }),
797 allocation_mutex: futures::lock::Mutex::new(()),
798 counters: Mutex::new(AllocatorCounters::default()),
799 maximum_offset: AtomicU64::new(0),
800 allocations_allowed: AtomicBool::new(filesystem.options().image_builder_mode.is_none()),
801 }
802 }
803
804 pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
805 &self.tree
806 }
807
808 pub fn enable_allocations(&self) {
811 self.allocations_allowed.store(true, Ordering::SeqCst);
812 }
813
814 pub async fn filter(
819 &self,
820 iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
821 committed_marked_for_deletion: bool,
822 ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
823 let marked_for_deletion = {
824 let inner = self.inner.lock();
825 if committed_marked_for_deletion {
826 &inner.info.marked_for_deletion
827 } else {
828 &inner.marked_for_deletion
829 }
830 .clone()
831 };
832 let iter =
833 filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
834 Ok(iter)
835 }
836
837 pub fn allocation_size_histogram(&self) -> [u64; 64] {
841 self.inner.lock().allocation_size_histogram
842 }
843
844 pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
846 assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
849
850 let filesystem = self.filesystem.upgrade().unwrap();
851 let root_store = filesystem.root_store();
852 ObjectStore::create_object_with_id(
853 &root_store,
854 transaction,
855 ReservedId::new(&root_store, NonZero::new(self.object_id()).unwrap()),
856 HandleOptions::default(),
857 None,
858 )?;
859 root_store.update_last_object_id(self.object_id());
860 Ok(())
861 }
862
863 pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
866 let filesystem = self.filesystem.upgrade().unwrap();
867 let root_store = filesystem.root_store();
868
869 self.inner.lock().strategy = strategy::BestFit::default();
870
871 let handle =
872 ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
873 .await
874 .context("Failed to open allocator object")?;
875
876 if handle.get_size() > 0 {
877 let serialized_info = handle
878 .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
879 .await
880 .context("Failed to read AllocatorInfo")?;
881 let mut cursor = std::io::Cursor::new(serialized_info);
882 let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
883 .context("Failed to deserialize AllocatorInfo")?;
884
885 let mut handles = Vec::new();
886 let mut total_size = 0;
887 for object_id in &info.layers {
888 let handle = ObjectStore::open_object(
889 &root_store,
890 *object_id,
891 HandleOptions::default(),
892 None,
893 )
894 .await
895 .context("Failed to open allocator layer file")?;
896
897 let size = handle.get_size();
898 total_size += size;
899 handles.push(handle);
900 }
901
902 {
903 let mut inner = self.inner.lock();
904
905 let mut device_bytes = self.device_size;
907 for (&owner_object_id, &bytes) in &info.allocated_bytes {
908 ensure!(
909 bytes <= device_bytes,
910 anyhow!(FxfsError::Inconsistent).context(format!(
911 "Allocated bytes exceeds device size: {:?}",
912 info.allocated_bytes
913 ))
914 );
915 device_bytes -= bytes;
916
917 inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
918 Saturating(bytes);
919 }
920
921 inner.info = info;
922 }
923
924 self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
925 self.filesystem.upgrade().unwrap().object_manager().update_reservation(
926 self.object_id,
927 tree::reservation_amount_from_layer_size(total_size),
928 );
929 }
930
931 Ok(())
932 }
933
934 pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
935 {
937 let mut inner = self.inner.lock();
938 inner.volumes_deleted_pending_sync.clear();
939 inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
940 }
941
942 if !self.rebuild_strategy().await.context("Build free extents")? {
946 if self.filesystem.upgrade().unwrap().options().read_only {
947 info!("Device contains no free space (read-only mode).");
948 } else {
949 info!("Device contains no free space.");
950 return Err(FxfsError::Inconsistent)
951 .context("Device appears to contain no free space");
952 }
953 }
954
955 assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
956 Ok(())
957 }
958
959 async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
964 let mut changed = false;
965 let mut layer_set = self.tree.empty_layer_set();
966 layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
967 self.tree.add_all_layers_to_layer_set(&mut layer_set);
968
969 let overflow_markers = self.inner.lock().strategy.overflow_markers();
970 self.inner.lock().strategy.reset_overflow_markers();
971
972 let mut to_add = Vec::new();
973 let mut merger = layer_set.merger();
974 let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
975 let mut last_offset = 0;
976 while last_offset < self.device_size {
977 let next_range = match iter.get() {
978 None => {
979 assert!(last_offset <= self.device_size);
980 let range = last_offset..self.device_size;
981 last_offset = self.device_size;
982 iter.advance().await?;
983 range
984 }
985 Some(ItemRef { key, .. }) => {
986 let device_range = &key.device_range;
987 if device_range.end < last_offset {
988 iter.advance().await?;
989 continue;
990 }
991 if device_range.start <= last_offset {
992 last_offset = device_range.end;
993 iter.advance().await?;
994 continue;
995 }
996 let range = last_offset..device_range.start;
997 last_offset = device_range.end;
998 iter.advance().await?;
999 range
1000 }
1001 };
1002 to_add.push(next_range);
1003 if to_add.len() > 100 {
1005 let mut inner = self.inner.lock();
1006 for range in to_add.drain(..) {
1007 changed |= inner.strategy.force_free(range)?;
1008 }
1009 }
1010 }
1011 let mut inner = self.inner.lock();
1012 for range in to_add {
1013 changed |= inner.strategy.force_free(range)?;
1014 }
1015 if overflow_markers != inner.strategy.overflow_markers() {
1016 changed = true;
1017 }
1018 Ok(changed)
1019 }
1020
1021 pub async fn take_for_trimming(
1025 &self,
1026 offset: u64,
1027 max_extent_size: usize,
1028 extents_per_batch: usize,
1029 ) -> Result<TrimmableExtents<'_>, Error> {
1030 let _guard = self.allocation_mutex.lock().await;
1031
1032 let (mut result, listener) = TrimmableExtents::new(self);
1033 if offset >= self.device_size {
1034 return Ok(result);
1035 }
1036 let mut bytes = 0;
1037
1038 let mut layer_set = self.tree.empty_layer_set();
1041 layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
1042 self.tree.add_all_layers_to_layer_set(&mut layer_set);
1043 let mut merger = layer_set.merger();
1044 let mut iter = self
1045 .filter(
1046 merger
1047 .query(Query::FullRange(&AllocatorKey {
1048 device_range: Extent::search_key_from_offset(offset),
1049 }))
1050 .await?,
1051 false,
1052 )
1053 .await?;
1054 let mut last_offset = offset;
1055 'outer: while last_offset < self.device_size {
1056 let mut range = match iter.get() {
1057 None => {
1058 assert!(last_offset <= self.device_size);
1059 let range = last_offset..self.device_size;
1060 last_offset = self.device_size;
1061 iter.advance().await?;
1062 range
1063 }
1064 Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
1065 if device_range.end <= last_offset {
1066 iter.advance().await?;
1067 continue;
1068 }
1069 if device_range.start <= last_offset {
1070 last_offset = device_range.end;
1071 iter.advance().await?;
1072 continue;
1073 }
1074 let range = last_offset..device_range.start;
1075 last_offset = device_range.end;
1076 iter.advance().await?;
1077 range
1078 }
1079 };
1080 if range.start < offset {
1081 continue;
1082 }
1083
1084 let mut inner = self.inner.lock();
1087
1088 while range.start < range.end {
1089 let prefix =
1090 range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
1091 range = prefix.end..range.end;
1092 bytes += prefix.length()?;
1093 inner.strategy.remove(prefix.clone());
1096 self.temporary_allocations.insert(AllocatorItem {
1097 key: AllocatorKey { device_range: Extent(prefix.clone()) },
1098 value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
1099 })?;
1100 result.add_extent(prefix);
1101 if result.extents.len() == extents_per_batch {
1102 break 'outer;
1103 }
1104 }
1105 if result.extents.len() == extents_per_batch {
1106 break 'outer;
1107 }
1108 }
1109 {
1110 let mut inner = self.inner.lock();
1111
1112 assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
1113 inner.trim_listener = Some(listener);
1114 inner.trim_reserved_bytes = bytes;
1115 debug_assert!(
1116 (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
1117 <= self.device_size
1118 );
1119 }
1120 Ok(result)
1121 }
1122
1123 pub fn parent_objects(&self) -> Vec<u64> {
1125 self.inner.lock().info.layers.clone()
1128 }
1129
1130 pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
1132 self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
1133 }
1134
1135 pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
1137 let inner = self.inner.lock();
1138 (
1139 inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
1140 inner.info.limit_bytes.get(&owner_object_id).copied(),
1141 )
1142 }
1143
1144 pub fn owner_bytes_debug(&self) -> String {
1146 format!("{:?}", self.inner.lock().owner_bytes)
1147 }
1148
1149 fn needs_sync(&self) -> bool {
1150 let inner = self.inner.lock();
1155 inner.unavailable_bytes().0 >= self.device_size
1156 }
1157
1158 fn is_system_store(&self, owner_object_id: u64) -> bool {
1159 let fs = self.filesystem.upgrade().unwrap();
1160 owner_object_id == fs.object_manager().root_store_object_id()
1161 || owner_object_id == fs.object_manager().root_parent_store_object_id()
1162 }
1163
1164 pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
1167 if old_owner_object_id.is_none() || amount == 0 {
1168 return;
1169 }
1170 let mut inner = self.inner.lock();
1172 inner.remove_reservation(old_owner_object_id, amount);
1173 inner.add_reservation(None, amount);
1174 }
1175
1176 pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1179 let this = Arc::downgrade(self);
1180 parent.record_lazy_child(name, move || {
1181 let this_clone = this.clone();
1182 async move {
1183 let inspector = fuchsia_inspect::Inspector::default();
1184 if let Some(this) = this_clone.upgrade() {
1185 let counters = this.counters.lock();
1186 let root = inspector.root();
1187 root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
1188 root.record_uint("bytes_total", this.device_size);
1189 let (allocated, reserved, used, unavailable) = {
1190 let inner = this.inner.lock();
1192 (
1193 inner.allocated_bytes().0,
1194 inner.reserved_bytes(),
1195 inner.used_bytes().0,
1196 inner.unavailable_bytes().0,
1197 )
1198 };
1199 root.record_uint("bytes_allocated", allocated);
1200 root.record_uint("bytes_reserved", reserved);
1201 root.record_uint("bytes_used", used);
1202 root.record_uint("bytes_unavailable", unavailable);
1203
1204 if let Some(x) = round_div(100 * allocated, this.device_size) {
1207 root.record_uint("bytes_allocated_percent", x);
1208 }
1209 if let Some(x) = round_div(100 * reserved, this.device_size) {
1210 root.record_uint("bytes_reserved_percent", x);
1211 }
1212 if let Some(x) = round_div(100 * used, this.device_size) {
1213 root.record_uint("bytes_used_percent", x);
1214 }
1215 if let Some(x) = round_div(100 * unavailable, this.device_size) {
1216 root.record_uint("bytes_unavailable_percent", x);
1217 }
1218
1219 root.record_uint("num_flushes", counters.num_flushes);
1220 if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
1221 root.record_uint(
1222 "last_flush_time_ms",
1223 last_flush_time
1224 .duration_since(std::time::UNIX_EPOCH)
1225 .unwrap_or(std::time::Duration::ZERO)
1226 .as_millis()
1227 .try_into()
1228 .unwrap_or(0u64),
1229 );
1230 }
1231
1232 let data = this.allocation_size_histogram();
1233 let alloc_sizes = root.create_uint_linear_histogram(
1234 "allocation_size_histogram",
1235 fuchsia_inspect::LinearHistogramParams {
1236 floor: 1,
1237 step_size: 1,
1238 buckets: 64,
1239 },
1240 );
1241 for (i, count) in data.iter().enumerate() {
1242 if i != 0 {
1243 alloc_sizes.insert_multiple(i as u64, *count as usize);
1244 }
1245 }
1246 root.record(alloc_sizes);
1247
1248 let data = this.inner.lock().rebuild_strategy_trigger_histogram;
1249 let triggers = root.create_uint_linear_histogram(
1250 "rebuild_strategy_triggers",
1251 fuchsia_inspect::LinearHistogramParams {
1252 floor: 1,
1253 step_size: 1,
1254 buckets: 64,
1255 },
1256 );
1257 for (i, count) in data.iter().enumerate() {
1258 if i != 0 {
1259 triggers.insert_multiple(i as u64, *count as usize);
1260 }
1261 }
1262 root.record(triggers);
1263
1264 let allocator_ref = this.clone();
1265 root.record_child("lsm_tree", move |node| {
1266 allocator_ref.tree.record_inspect_data(node);
1267 });
1268 }
1269 Ok(inspector)
1270 }
1271 .boxed()
1272 });
1273 }
1274
1275 pub fn maximum_offset(&self) -> u64 {
1280 self.maximum_offset.load(Ordering::Relaxed)
1281 }
1282}
1283
1284impl Drop for Allocator {
1285 fn drop(&mut self) {
1286 let inner = self.inner.lock();
1287 assert_eq!(inner.uncommitted_allocated_bytes(), 0);
1289 assert_eq!(inner.reserved_bytes(), 0);
1290 }
1291}
1292
#[fxfs_trace::trace]
impl Allocator {
    /// Returns the object ID of this allocator. Its mutations are added to transactions under
    /// this ID.
    pub fn object_id(&self) -> u64 {
        self.object_id
    }

    /// Returns a clone of the allocator's current `AllocatorInfo`.
    pub fn info(&self) -> AllocatorInfo {
        self.inner.lock().info.clone()
    }

    /// Allocates a device range of at most `len` bytes for `owner_object_id` and adds an
    /// `AllocatorMutation::Allocate` for it to `transaction`.
    ///
    /// The returned range can be shorter than `len`. The request is first capped at
    /// `max_extent_size_bytes`. It is then capped by what the transaction's
    /// `allocator_reservation` can provide, rounded down to the block size. When the
    /// transaction has no reservation, it is instead capped by the owner's remaining limit and
    /// the device's unused space.
    ///
    /// Until the mutation is applied or dropped, the range is tracked in
    /// `temporary_allocations`.
    ///
    /// # Errors
    ///
    /// * `FxfsError::Unavailable` if allocations are currently disallowed.
    /// * `FxfsError::NoSpace` if nothing could be reserved, or if 10 sync attempts did not
    ///   clear the need for a sync.
    /// * `FxfsError::Inconsistent` if the free-space accounting is inconsistent, or if
    ///   rebuilding the strategy finds no additional free space.
    /// * Any error from syncing the filesystem or rebuilding the strategy.
    ///
    /// # Panics
    ///
    /// * If `len` is not a multiple of the block size.
    /// * If the transaction's reservation belongs to a different owner.
    /// * If the reservation has no owner and `owner_object_id` is not a system store.
    /// * When the transaction has no reservation, if the allocator is not opened.
    #[trace]
    pub async fn allocate(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        mut len: u64,
    ) -> Result<Range<u64>, Error> {
        ensure!(self.allocations_allowed.load(Ordering::SeqCst), FxfsError::Unavailable);
        assert_eq!(len % self.block_size, 0);
        len = std::cmp::min(len, self.max_extent_size_bytes);
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);

        // Reserve the space before searching for it. `len` is trimmed to whatever could
        // actually be reserved.
        let reservation = if let Some(reservation) = transaction.allocator_reservation {
            match reservation.owner_object_id {
                // An owner-less reservation may only be used for system stores.
                None => assert!(self.is_system_store(owner_object_id)),
                // An owned reservation must belong to the owner being allocated for.
                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
            };
            // The reservation's limit is rounded down so that `len` stays block-aligned.
            let r = reservation
                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
            len = r.amount();
            Left(r)
        } else {
            let mut inner = self.inner.lock();
            assert!(inner.opened);
            // Cap by both the owner's remaining limit and the device's unused space.
            let device_used = inner.used_bytes();
            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let limit =
                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
            len = round_down(std::cmp::min(len, limit), self.block_size);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.reserved_bytes += len;
            // A local reservation for this call only. NOTE(review): presumably dropping it on an
            // early return releases the bytes via `ReservationOwner::release_reservation` —
            // confirm.
            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
        };

        ensure!(len > 0, FxfsError::NoSpace);

        // Snapshot the volumes whose deletion has not yet been synced, if there are any.
        let volumes_deleted = {
            let inner = self.inner.lock();
            (!inner.volumes_deleted_pending_sync.is_empty())
                .then(|| inner.volumes_deleted_pending_sync.clone())
        };

        if let Some(volumes_deleted) = volumes_deleted {
            // Sync, flushing the device, before deleted volumes' space is considered. The
            // precondition re-checks the pending set. NOTE(review): presumably `sync` skips the
            // work when the precondition is false (e.g. another caller already synced) —
            // confirm.
            self.filesystem
                .upgrade()
                .unwrap()
                .sync(SyncOptions {
                    flush_device: true,
                    precondition: Some(Box::new(|| {
                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
                    })),
                    ..Default::default()
                })
                .await?;

            // The deletions are now synced. Move them from "pending sync" to
            // `marked_for_deletion`.
            {
                let mut inner = self.inner.lock();
                for owner_id in volumes_deleted {
                    inner.volumes_deleted_pending_sync.remove(&owner_id);
                    inner.marked_for_deletion.insert(owner_id);
                }
            }

            // Rebuild the allocation strategy while holding the allocation mutex.
            let _guard = self.allocation_mutex.lock().await;
            self.rebuild_strategy().await?;
        }

        // Wait until no sync is needed, then keep holding the allocation mutex for the rest of
        // the call. `break 'sync guard` exits with the guard. Falling out of the `for` means 10
        // syncs were not enough.
        #[allow(clippy::never_loop)] // The labelled loop stands in for a for/else.
        let _guard = 'sync: loop {
            for _ in 0..10 {
                {
                    let guard = self.allocation_mutex.lock().await;

                    if !self.needs_sync() {
                        break 'sync guard;
                    }
                }

                // The allocation mutex is released (end of the scope above) before syncing.
                // NOTE(review): presumably because the sync path can take that mutex itself,
                // so holding it here could deadlock — confirm.
                self.filesystem
                    .upgrade()
                    .unwrap()
                    .sync(SyncOptions {
                        flush_device: true,
                        precondition: Some(Box::new(|| self.needs_sync())),
                        ..Default::default()
                    })
                    .await?;
            }
            bail!(
                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
            );
        };

        let mut trim_listener = None;
        {
            let mut inner = self.inner.lock();
            // Histogram of requested sizes, in blocks. Bucket 63 collects everything larger.
            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;

            // If the bytes not currently being trimmed cannot cover this request (bounded by
            // what is available at all), take the trim listener so we can wait for trimming.
            let avail = self
                .device_size
                .checked_sub(inner.unavailable_bytes().0)
                .ok_or(FxfsError::Inconsistent)?;
            let free_and_not_being_trimmed =
                inner.bytes_available_not_being_trimmed(self.device_size)?;
            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
                debug_assert!(inner.trim_reserved_bytes > 0);
                trim_listener = std::mem::take(&mut inner.trim_listener);
            }
        }

        // Wait outside the `inner` lock.
        if let Some(listener) = trim_listener {
            listener.await;
        }

        let result = loop {
            {
                let mut inner = self.inner.lock();

                // Erase temporary allocations queued for removal. For example, `apply_mutation`
                // queues the range once an `Allocate` is applied. That is safe to do here because
                // `allocation_mutex` is held.
                for device_range in inner.dropped_temporary_allocations.drain(..) {
                    self.temporary_allocations
                        .erase(&AllocatorKey { device_range: Extent(device_range) });
                }

                match inner.strategy.allocate(len) {
                    Err(FxfsError::NotFound) => {
                        // No suitable free range in the strategy. Record it in the histogram
                        // (bucketed like above) and fall through to rebuild the strategy.
                        inner.rebuild_strategy_trigger_histogram
                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
                    }
                    Err(err) => {
                        error!(err:%; "Likely filesystem corruption.");
                        return Err(err.into());
                    }
                    Ok(x) => {
                        break x;
                    }
                }
            }
            // A `false` result means no additional free space was found, even though the
            // accounting above said there should be some.
            if !self.rebuild_strategy().await? {
                error!("Cannot find additional free space. Corruption?");
                return Err(FxfsError::Inconsistent.into());
            }
        };

        debug!(device_range:? = result; "allocate");

        // `result` may be shorter than the reserved `len`. From here on, `len` is the actual
        // length.
        let len = result.length().unwrap();
        // Left is the transaction's reservation; Right is the local one created above.
        // NOTE(review): `forget_some(len)` presumably keeps `len` charged to the reservation
        // (rather than releasing it) until the mutation is applied or dropped — confirm.
        let reservation_owner = reservation.either(
            |l| {
                l.forget_some(len);
                l.owner_object_id()
            },
            |r| {
                r.forget_some(len);
                r.owner_object_id()
            },
        );

        {
            let mut inner = self.inner.lock();
            // Move `len` from the allocator's reserved bytes to uncommitted allocated bytes.
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.uncommitted_allocated_bytes += len;
            // An owned reservation must match the allocating owner.
            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
            inner.remove_reservation(reservation_owner, len);
            // Keep the range out of use until the mutation is applied or dropped.
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: Extent(result.clone()) },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
            })?;
        }

        // NOTE(review): `add` presumably returns `Some` when an equal mutation is already in the
        // transaction, which would mean the same range was handed out twice — confirm.
        let mutation =
            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());

        Ok(result)
    }

    /// Marks the caller-chosen `device_range` as allocated to `owner_object_id` and adds an
    /// `AllocatorMutation::Allocate` for it to `transaction`.
    ///
    /// Unlike `allocate`, this does no free-space search: the range is removed from the
    /// allocation strategy directly and tracked in `temporary_allocations`.
    ///
    /// NOTE(review): `or_default()` runs before the `ensure!`, so a failed call can leave an
    /// empty `owner_bytes` entry behind. Also, unlike `allocate`, this does not call
    /// `remove_reservation` after taking `len` from the transaction's reservation. Confirm the
    /// reserved/uncommitted accounting balances when a reservation is present.
    ///
    /// # Errors
    ///
    /// * `FxfsError::InvalidArgs` if `device_range` is not a valid range.
    /// * `FxfsError::NoSpace` in any of these cases:
    ///   * the range extends past the end of the device;
    ///   * the device lacks `len` unused bytes;
    ///   * the owner's limit lacks `len` bytes;
    ///   * the transaction's reservation cannot cover `len`.
    /// * Any error from inserting into `temporary_allocations`.
    #[trace]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
        {
            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;

            let mut inner = self.inner.lock();
            let device_used = inner.used_bytes();
            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            ensure!(
                device_range.end <= self.device_size
                    && (Saturating(self.device_size) - device_used).0 >= len
                    && owner_id_bytes_left >= len,
                FxfsError::NoSpace
            );
            if let Some(reservation) = &mut transaction.allocator_reservation {
                // Take `len` from the transaction's reservation and forget the hold, so the
                // amount is not handed back when the hold goes out of scope.
                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
            }
            owner_entry.uncommitted_allocated_bytes += len;
            inner.strategy.remove(device_range.clone());
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: Extent(device_range.clone()) },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
            })?;
        }
        let mutation =
            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));
        Ok(())
    }

    /// Adds an `AllocatorMutation::SetLimit` to `transaction`. Once applied, it limits
    /// `owner_object_id` to `bytes` bytes.
    ///
    /// # Panics
    ///
    /// If `owner_object_id` is a system store.
    pub fn set_bytes_limit(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        bytes: u64,
    ) -> Result<(), Error> {
        assert!(!self.is_system_store(owner_object_id));
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
        );
        Ok(())
    }

    /// Returns the byte limit for `owner_object_id`, or `None` if no limit is set.
    pub fn get_owner_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
    }

    /// Returns the bytes used by `owner_object_id`, or 0 if the owner has no entry.
    pub fn get_owner_bytes_used(&self, owner_object_id: u64) -> u64 {
        self.inner.lock().owner_bytes.get(&owner_object_id).map_or(0, |info| info.used_bytes().0)
    }

    /// Adds an `AllocatorMutation::Deallocate` for `dealloc_range` to `transaction`. Returns
    /// the number of bytes deallocated, which is the length of the range.
    ///
    /// This does not check that the range is currently allocated. The range is also inserted
    /// into `temporary_allocations` so it is not handed out again before it is freed:
    /// * `did_flush_device` erases it once the committed deallocation is flushed;
    /// * `drop_mutation` erases it if the mutation is dropped.
    ///
    /// # Errors
    ///
    /// * `FxfsError::InvalidArgs` if `dealloc_range` is not a valid range.
    /// * An error with context "tracking deallocated" if inserting into
    ///   `temporary_allocations` fails.
    #[trace]
    pub async fn deallocate(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        dealloc_range: Range<u64>,
    ) -> Result<u64, Error> {
        debug!(device_range:? = dealloc_range; "deallocate");
        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
        let deallocated = dealloc_range.end - dealloc_range.start;
        let mutation = AllocatorMutation::Deallocate {
            device_range: dealloc_range.clone().into(),
            owner_object_id,
        };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));

        // `temporary_allocations` is modified below while holding `allocation_mutex`.
        let _guard = self.allocation_mutex.lock().await;

        let mut inner = self.inner.lock();
        // Erase temporary allocations queued for removal by code that could not do it directly.
        for device_range in inner.dropped_temporary_allocations.drain(..) {
            self.temporary_allocations.erase(&AllocatorKey { device_range: Extent(device_range) });
        }

        self.temporary_allocations
            .insert(AllocatorItem {
                key: AllocatorKey { device_range: Extent(dealloc_range.clone()) },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
            })
            .context("tracking deallocated")?;

        Ok(deallocated)
    }

    /// Adds an `AllocatorMutation::MarkForDeletion` for `owner_object_id` to `transaction`.
    ///
    /// When applied, the owner's byte counters and limit are dropped, and the owner is
    /// recorded as marked for deletion and as pending a sync.
    pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
        );
    }

    /// Notification that the device was flushed when the journal was at `flush_log_offset`.
    ///
    /// Each committed deallocation recorded before `flush_log_offset` is processed as follows:
    /// * it is returned to the allocation strategy;
    /// * it is erased from `temporary_allocations`;
    /// * its length is subtracted from the owner's `committed_deallocated_bytes`.
    ///
    /// # Panics
    ///
    /// * If the strategy rejects a freed range.
    /// * If a freed range's owner has no `owner_bytes` entry and is not pending deletion.
    pub fn did_flush_device(&self, flush_log_offset: u64) {
        // Split off the deallocations that are now flushed. The scan stops at the first entry
        // at or past `flush_log_offset`. NOTE(review): this assumes `committed_deallocated` is
        // in ascending log-offset order — confirm. If no such entry exists, everything is
        // taken.
        #[allow(clippy::never_loop)] // The labelled loop stands in for a for/else.
        let deallocs = 'deallocs_outer: loop {
            let mut inner = self.inner.lock();
            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
                if dealloc.log_file_offset >= flush_log_offset {
                    let mut deallocs = inner.committed_deallocated.split_off(index);
                    // `split_off` returns the tail. Swap so the not-yet-flushed tail stays
                    // queued and the flushed head is freed below.
                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
                    break 'deallocs_outer deallocs;
                }
            }
            break std::mem::take(&mut inner.committed_deallocated);
        };

        let mut inner = self.inner.lock();
        // Free each range and total up the freed bytes per owner.
        let mut totals = BTreeMap::<u64, u64>::new();
        for dealloc in deallocs {
            *(totals.entry(dealloc.owner_object_id).or_default()) +=
                dealloc.range.length().unwrap();
            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
            self.temporary_allocations
                .erase(&AllocatorKey { device_range: Extent(dealloc.range.clone()) });
        }

        // The counters are adjusted only after all ranges are back in the strategy.
        for (owner_object_id, total) in totals {
            match inner.owner_bytes.get_mut(&owner_object_id) {
                Some(counters) => counters.committed_deallocated_bytes -= total,
                None => {
                    // `MarkForDeletion` removes the owner's entry. That is only expected while
                    // the deletion is still pending a sync.
                    assert!(inner.volumes_deleted_pending_sync.contains(&owner_object_id));
                }
            }
        }
    }

    /// Reserves `amount` bytes for `owner_object_id`, or for no particular owner when it is
    /// `None`.
    ///
    /// Returns `None`, reserving nothing, if `amount` exceeds the device's unused space or, for
    /// an owner, the owner's remaining limit.
    pub fn reserve(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: u64,
    ) -> Option<Reservation> {
        {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let limit = match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            };
            if limit < amount {
                return None;
            }
            inner.add_reservation(owner_object_id, amount);
        }
        Some(Reservation::new(self, owner_object_id, amount))
    }

    /// Like `reserve`, but always succeeds.
    ///
    /// The amount to reserve comes from calling `amount` with the available limit, which is the
    /// same limit `reserve` checks against. The value returned by the closure is reserved as-is
    /// and is not checked against that limit.
    pub fn reserve_with(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: impl FnOnce(u64) -> u64,
    ) -> Reservation {
        let amount = {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let amount = amount(match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            });

            inner.add_reservation(owner_object_id, amount);

            amount
        };

        Reservation::new(self, owner_object_id, amount)
    }

    /// Returns the total number of allocated bytes.
    pub fn get_allocated_bytes(&self) -> u64 {
        self.inner.lock().allocated_bytes().0
    }

    /// Returns the size of the device, in bytes.
    pub fn get_disk_bytes(&self) -> u64 {
        self.device_size
    }

    /// Returns a map from owner ID to allocated bytes, covering every owner that has an entry.
    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
    }

    /// Returns the number of used bytes.
    pub fn get_used_bytes(&self) -> Saturating<u64> {
        let inner = self.inner.lock();
        inner.used_bytes()
    }
}
1780
1781impl ReservationOwner for Allocator {
1782 fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
1783 self.inner.lock().remove_reservation(owner_object_id, amount);
1784 }
1785}
1786
1787#[async_trait]
1788impl JournalingObject for Allocator {
1789 fn apply_mutation(
1790 &self,
1791 mutation: Mutation,
1792 context: &ApplyContext<'_, '_>,
1793 _assoc_obj: AssocObj<'_>,
1794 ) -> Result<(), Error> {
1795 match mutation {
1796 Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
1797 let mut inner = self.inner.lock();
1798 inner.owner_bytes.remove(&owner_object_id);
1799
1800 inner.info.marked_for_deletion.insert(owner_object_id);
1805 inner.volumes_deleted_pending_sync.insert(owner_object_id);
1806
1807 inner.info.limit_bytes.remove(&owner_object_id);
1808 }
1809 Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1810 self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
1811 let item = AllocatorItem {
1812 key: AllocatorKey { device_range: Extent(device_range.0.clone()) },
1813 value: AllocatorValue::Abs { count: 1, owner_object_id },
1814 };
1815 let len = item.key.device_range.length().unwrap();
1816 let lower_bound = item.key.lower_bound_for_merge_into();
1817 self.tree.merge_into(item, &lower_bound);
1818 let mut inner = self.inner.lock();
1819 let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1820 entry.allocated_bytes += len;
1821 if let ApplyMode::Live(transaction) = context.mode {
1822 entry.uncommitted_allocated_bytes -= len;
1823 inner.dropped_temporary_allocations.push(device_range.0);
1828 if let Some(reservation) = transaction.allocator_reservation {
1829 reservation.commit(len);
1830 }
1831 }
1832 }
1833 Mutation::Allocator(AllocatorMutation::Deallocate {
1834 device_range,
1835 owner_object_id,
1836 }) => {
1837 let item = AllocatorItem {
1838 key: AllocatorKey { device_range: Extent(device_range.0) },
1839 value: AllocatorValue::None,
1840 };
1841 let len = item.key.device_range.length().unwrap();
1842
1843 {
1844 let mut inner = self.inner.lock();
1845 {
1846 let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1847 entry.allocated_bytes -= len;
1848 if context.mode.is_live() {
1849 entry.committed_deallocated_bytes += len;
1850 }
1851 }
1852 if context.mode.is_live() {
1853 inner.committed_deallocated.push_back(CommittedDeallocation {
1854 log_file_offset: context.checkpoint.file_offset,
1855 range: (*item.key.device_range).clone(),
1856 owner_object_id,
1857 });
1858 }
1859 if let ApplyMode::Live(Transaction {
1860 allocator_reservation: Some(reservation),
1861 ..
1862 }) = context.mode
1863 {
1864 inner.add_reservation(reservation.owner_object_id(), len);
1865 reservation.add(len);
1866 }
1867 }
1868 let lower_bound = item.key.lower_bound_for_merge_into();
1869 self.tree.merge_into(item, &lower_bound);
1870 }
1871 Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
1872 self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
1877 }
1878 Mutation::BeginFlush => {
1879 self.tree.seal();
1880 let mut inner = self.inner.lock();
1883 let allocated_bytes =
1884 inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
1885 inner.info.allocated_bytes = allocated_bytes;
1886 }
1887 Mutation::EndFlush => {}
1888 _ => bail!("unexpected mutation: {:?}", mutation),
1889 }
1890 Ok(())
1891 }
1892
1893 fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
1894 match mutation {
1895 Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1896 let len = device_range.length().unwrap();
1897 let mut inner = self.inner.lock();
1898 inner
1899 .owner_bytes
1900 .entry(owner_object_id)
1901 .or_default()
1902 .uncommitted_allocated_bytes -= len;
1903 if let Some(reservation) = transaction.allocator_reservation {
1904 let res_owner = reservation.owner_object_id();
1905 inner.add_reservation(res_owner, len);
1906 reservation.release_reservation(res_owner, len);
1907 }
1908 inner.strategy.free(device_range.0.clone()).expect("drop mutaton");
1909 self.temporary_allocations
1910 .erase(&AllocatorKey { device_range: Extent(device_range.0) });
1911 }
1912 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
1913 self.temporary_allocations
1914 .erase(&AllocatorKey { device_range: Extent(device_range.0) });
1915 }
1916 _ => {}
1917 }
1918 }
1919
1920 async fn flush(&self) -> Result<Version, Error> {
1921 let filesystem = self.filesystem.upgrade().unwrap();
1922 let object_manager = filesystem.object_manager();
1923 let earliest_version = self.tree.get_earliest_version();
1924 if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
1925 return Ok(earliest_version);
1927 }
1928
1929 let fs = self.filesystem.upgrade().unwrap();
1930 let mut flusher = Flusher::new(self, &fs).await;
1931 let (new_layer_file, info) = flusher.start().await?;
1932 flusher.finish(new_layer_file, info).await
1933 }
1934}
1935
1936pub struct CoalescingIterator<I> {
1949 iter: I,
1950 item: Option<AllocatorItem>,
1951}
1952
1953impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
1954 pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
1955 let mut iter = Self { iter, item: None };
1956 iter.advance().await?;
1957 Ok(iter)
1958 }
1959}
1960
1961#[async_trait]
1962impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
1963 for CoalescingIterator<I>
1964{
1965 async fn advance(&mut self) -> Result<(), Error> {
1966 self.item = self.iter.get().map(|x| x.cloned());
1967 if self.item.is_none() {
1968 return Ok(());
1969 }
1970 let left = self.item.as_mut().unwrap();
1971 loop {
1972 self.iter.advance().await?;
1973 match self.iter.get() {
1974 None => return Ok(()),
1975 Some(right) => {
1976 ensure!(
1978 left.key.device_range.end <= right.key.device_range.start,
1979 FxfsError::Inconsistent
1980 );
1981 if left.key.device_range.end < right.key.device_range.start
1983 || left.value != *right.value
1984 {
1985 return Ok(());
1986 }
1987 left.key.device_range.end = right.key.device_range.end;
1988 }
1989 }
1990 }
1991 }
1992
1993 fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
1994 self.item.as_ref().map(|x| x.as_item_ref())
1995 }
1996}
1997
/// Writes the allocator's immutable layers to a new layer file and switches the allocator's
/// tree over to it.
///
/// The flush runs in two phases: `start`, then `finish`. A `Flusher` holds the write lock on the
/// allocator's flush lock key for its whole lifetime.
struct Flusher<'a> {
    allocator: &'a Allocator,
    fs: &'a Arc<FxFilesystem>,
    // Held only so that the flush lock is released when the `Flusher` is dropped.
    _guard: WriteGuard<'a>,
}

impl<'a> Flusher<'a> {
    /// Waits for the flush lock for `allocator`, then returns a `Flusher` holding it.
    async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
        let keys = lock_keys![LockKey::flush(allocator.object_id())];
        Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
    }

    /// Builds the transaction options used by every flush transaction:
    /// * journal checks are skipped;
    /// * metadata space may be borrowed;
    /// * allocations are charged to `allocator_reservation`.
    fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
        Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(allocator_reservation),
            ..Default::default()
        }
    }

    /// First phase of the flush. In a single transaction, it:
    /// * creates the object for the new layer file in the root store;
    /// * adds that object to the graveyard (`finish` removes it again when the layer is
    ///   installed — NOTE(review): presumably so an interrupted flush does not leak the
    ///   object; confirm);
    /// * adds `BeginFlush`, which seals the tree when applied.
    ///
    /// Returns the new layer file's handle and a clone of `AllocatorInfo`. The clone is taken
    /// in the commit callback, which presumably runs after `BeginFlush` has been applied.
    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
        let object_manager = self.fs.object_manager();
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
            .await?;

        let root_store = self.fs.root_store();
        let layer_object_handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions { skip_journal_checks: true, ..Default::default() },
            None,
        )
        .await?;
        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
        let info = transaction
            .commit_with_callback(|_| {
                self.allocator.inner.lock().info.clone()
            })
            .await?;
        Ok((layer_object_handle, info))
    }

    /// Second phase of the flush.
    ///
    /// 1. Compacts the immutable layers (filtered, then coalesced) into `layer_object_handle`.
    /// 2. In one transaction:
    ///    * adds the old layer files listed in `info` to the graveyard;
    ///    * rewrites the allocator's info object so `info.layers` names only the new file;
    ///    * adds `EndFlush`, carrying a reservation update sized for the new layer;
    ///    * removes the new file from the graveyard.
    /// 3. In the commit callback, the tree switches to the new layer, and the owners that were
    ///    marked for deletion in `info` are removed from both marked-for-deletion sets.
    /// 4. Closes the old layers and tombstones their objects.
    ///
    /// Returns the earliest version still present in the tree.
    async fn finish(
        self,
        layer_object_handle: DataObjectHandle<ObjectStore>,
        mut info: AllocatorInfo,
    ) -> Result<Version, Error> {
        let object_manager = self.fs.object_manager();
        let txn_options = Self::txn_options(object_manager.metadata_reservation());

        let layer_set = self.allocator.tree.immutable_layer_set();
        let total_len = layer_set.sum_len();
        {
            let start_time = std::time::Instant::now();
            let merged_layer_count = layer_set.layers.len();
            let mut merger = layer_set.merger();
            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
            // Merge adjacent equal records so the new layer is as compact as possible.
            let iter = CoalescingIterator::new(iter).await?;
            let bytes_written = compact_with_iterator(
                iter,
                total_len,
                DirectWriter::new(&layer_object_handle, txn_options).await,
                layer_object_handle.block_size(),
                Some(self.fs.journal().get_compaction_yielder()),
            )
            .await?;

            self.allocator.tree.report_compaction_metrics(
                bytes_written,
                start_time.elapsed(),
                merged_layer_count,
            );
        }

        let root_store = self.fs.root_store();

        // These are declared before `transaction` so they are dropped after it (locals drop in
        // reverse declaration order). `transaction` borrows `reservation_update` through
        // `AssocObj::Borrowed` below.
        let object_handle;
        let reservation_update;
        // Lock the allocator's info object, which is rewritten below.
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    self.allocator.object_id()
                )],
                txn_options,
            )
            .await?;
        let mut serialized_info = Vec::new();

        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
        object_handle = ObjectStore::open_object(
            &root_store,
            self.allocator.object_id(),
            HandleOptions::default(),
            None,
        )
        .await?;

        // Old layer files go into the graveyard in the same transaction. Their objects are
        // tombstoned after the commit, below.
        for object_id in &info.layers {
            root_store.add_to_graveyard(&mut transaction, *object_id);
        }

        // Taken out of `info` before serialization, so the persisted info no longer lists
        // these owners. They are removed from the in-memory sets in the commit callback.
        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);

        info.layers = vec![layer_object_handle.object_id()];

        info.serialize_with_version(&mut serialized_info)?;

        // Write the serialized info at offset 0 of the allocator's object.
        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;

        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
            layer_object_handle.get_size(),
        ));

        transaction.add_with_object(
            self.allocator.object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );
        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());

        let layers = layers_from_handles([layer_object_handle]).await?;
        transaction
            .commit_with_callback(|_| {
                self.allocator.tree.set_layers(layers);

                let mut inner = self.allocator.inner.lock();
                inner.info.layers = info.layers;
                for owner_id in marked_for_deletion {
                    inner.marked_for_deletion.remove(&owner_id);
                    inner.info.marked_for_deletion.remove(&owner_id);
                }
            })
            .await?;

        // Close each old layer and tombstone its backing object, if it has one.
        for layer in layer_set.layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                root_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        let mut counters = self.allocator.counters.lock();
        counters.num_flushes += 1;
        counters.last_flush_time = Some(std::time::SystemTime::now());
        Ok(self.allocator.tree.get_earliest_version())
    }
}
2179
2180#[cfg(test)]
2181mod tests {
2182 use crate::filesystem::{
2183 FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2184 };
2185 use crate::fsck::fsck;
2186 use crate::lsm_tree::cache::NullCache;
2187 use crate::lsm_tree::skip_list_layer::SkipListLayer;
2188 use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
2189 use crate::lsm_tree::{LSMTree, Query};
2190 use crate::object_handle::ObjectHandle;
2191 use crate::object_store::allocator::merge::merge;
2192 use crate::object_store::allocator::{
2193 Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2194 };
2195 use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
2196 use crate::object_store::volume::root_volume;
2197 use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
2198 use crate::range::RangeExt;
2199 use crate::round::round_up;
2200 use crate::serialized_types::{LATEST_VERSION, Versioned};
2201 use crate::testing;
2202 use bincode::Options as _;
2203 use fuchsia_async as fasync;
2204 use fuchsia_sync::Mutex;
2205 use std::cmp::{max, min};
2206 use std::ops::{Bound, Range};
2207 use std::sync::Arc;
2208 use storage_device::DeviceHolder;
2209 use storage_device::fake_device::FakeDevice;
2210
2211 #[test]
2212 fn test_allocator_key_is_range_based() {
2213 assert!(AllocatorKey { device_range: (0..100).into() }.is_range_key());
2215 }
2216
2217 #[test]
2218 fn test_allocator_key_serialization_compatibility() {
2219 let range = 1024..2048;
2220 let key = AllocatorKey { device_range: range.clone().into() };
2221
2222 let options = bincode::DefaultOptions::new().allow_trailing_bytes();
2225 let expected_bytes = options.serialize(&range).unwrap();
2226
2227 let mut key_bytes = Vec::new();
2229 key.serialize_into(&mut key_bytes).unwrap();
2230
2231 assert_eq!(key_bytes, expected_bytes);
2233
2234 let deserialized_key =
2237 AllocatorKey::deserialize_from(&mut &expected_bytes[..], LATEST_VERSION).unwrap();
2238 assert_eq!(deserialized_key, key);
2239 }
2240
2241 #[fuchsia::test]
2242 async fn test_coalescing_iterator() {
2243 let skip_list = SkipListLayer::new(100);
2244 let items = [
2245 Item::new(
2246 AllocatorKey { device_range: (0..100 * 512).into() },
2247 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2248 ),
2249 Item::new(
2250 AllocatorKey { device_range: (100 * 512..200 * 512).into() },
2251 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2252 ),
2253 ];
2254 skip_list.insert(items[1].clone()).expect("insert error");
2255 skip_list.insert(items[0].clone()).expect("insert error");
2256 let mut iter =
2257 CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
2258 .await
2259 .expect("new failed");
2260 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2261 assert_eq!(
2262 (key, value),
2263 (
2264 &AllocatorKey { device_range: (0..200 * 512).into() },
2265 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2266 )
2267 );
2268 iter.advance().await.expect("advance failed");
2269 assert!(iter.get().is_none());
2270 }
2271
2272 #[fuchsia::test]
2273 async fn test_merge_and_coalesce_across_three_layers() {
2274 let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2275 lsm_tree
2276 .insert(Item::new(
2277 AllocatorKey { device_range: (100 * 512..200 * 512).into() },
2278 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2279 ))
2280 .expect("insert error");
2281 lsm_tree.seal();
2282 lsm_tree
2283 .insert(Item::new(
2284 AllocatorKey { device_range: (0..100 * 512).into() },
2285 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2286 ))
2287 .expect("insert error");
2288
2289 let layer_set = lsm_tree.layer_set();
2290 let mut merger = layer_set.merger();
2291 let mut iter =
2292 CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2293 .await
2294 .expect("new failed");
2295 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2296 assert_eq!(
2297 (key, value),
2298 (
2299 &AllocatorKey { device_range: (0..200 * 512).into() },
2300 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2301 )
2302 );
2303 iter.advance().await.expect("advance failed");
2304 assert!(iter.get().is_none());
2305 }
2306
2307 #[fuchsia::test]
2308 async fn test_merge_and_coalesce_wont_merge_across_object_id() {
2309 let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2310 lsm_tree
2311 .insert(Item::new(
2312 AllocatorKey { device_range: (100 * 512..200 * 512).into() },
2313 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2314 ))
2315 .expect("insert error");
2316 lsm_tree.seal();
2317 lsm_tree
2318 .insert(Item::new(
2319 AllocatorKey { device_range: (0..100 * 512).into() },
2320 AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2321 ))
2322 .expect("insert error");
2323
2324 let layer_set = lsm_tree.layer_set();
2325 let mut merger = layer_set.merger();
2326 let mut iter =
2327 CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2328 .await
2329 .expect("new failed");
2330 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2331 assert_eq!(
2332 (key, value),
2333 (
2334 &AllocatorKey { device_range: (0..100 * 512).into() },
2335 &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2336 )
2337 );
2338 iter.advance().await.expect("advance failed");
2339 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2340 assert_eq!(
2341 (key, value),
2342 (
2343 &AllocatorKey { device_range: (100 * 512..200 * 512).into() },
2344 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2345 )
2346 );
2347 iter.advance().await.expect("advance failed");
2348 assert!(iter.get().is_none());
2349 }
2350
2351 fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
2352 if a.end > b.start && a.start < b.end {
2353 min(a.end, b.end) - max(a.start, b.start)
2354 } else {
2355 0
2356 }
2357 }
2358
2359 async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
2360 let layer_set = allocator.tree.layer_set();
2361 let mut merger = layer_set.merger();
2362 let mut iter = allocator
2363 .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2364 .await
2365 .expect("build iterator");
2366 let mut allocations: Vec<Range<u64>> = Vec::new();
2367 while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2368 if let Some(r) = allocations.last() {
2369 assert!(device_range.start >= r.end);
2370 }
2371 allocations.push(device_range.clone().into());
2372 iter.advance().await.expect("advance failed");
2373 }
2374 allocations
2375 }
2376
2377 async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
2378 let layer_set = allocator.tree.layer_set();
2379 let mut merger = layer_set.merger();
2380 let mut iter = allocator
2381 .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2382 .await
2383 .expect("build iterator");
2384 let mut found = 0;
2385 while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2386 let mut l = device_range.length().expect("Invalid range");
2387 found += l;
2388 for range in expected_allocations {
2391 l -= overlap(range, device_range);
2392 if l == 0 {
2393 break;
2394 }
2395 }
2396 assert_eq!(l, 0, "range {:?} not covered by expectations", device_range);
2397 iter.advance().await.expect("advance failed");
2398 }
2399 assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
2401 }
2402
2403 async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
2404 let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
2405 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2406 let allocator = fs.allocator();
2407 (fs, allocator)
2408 }
2409
2410 #[fuchsia::test]
2411 async fn test_allocations() {
2412 const STORE_OBJECT_ID: u64 = 99;
2413 let (fs, allocator) = test_fs().await;
2414 let mut transaction =
2415 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2416 let mut device_ranges = collect_allocations(&allocator).await;
2417
2418 let expected = vec![
2420 0..4096, 4096..139264, 139264..204800, 204800..335872, 335872..401408, 524288..528384, ];
2427 assert_eq!(device_ranges, expected);
2428 device_ranges.push(
2429 allocator
2430 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2431 .await
2432 .expect("allocate failed"),
2433 );
2434 assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2435 device_ranges.push(
2436 allocator
2437 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2438 .await
2439 .expect("allocate failed"),
2440 );
2441 assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2442 assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
2443 transaction.commit().await.expect("commit failed");
2444 let mut transaction =
2445 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2446 device_ranges.push(
2447 allocator
2448 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2449 .await
2450 .expect("allocate failed"),
2451 );
2452 assert_eq!(device_ranges[7].length().expect("Invalid range"), fs.block_size());
2453 assert_eq!(overlap(&device_ranges[5], &device_ranges[7]), 0);
2454 assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
2455 transaction.commit().await.expect("commit failed");
2456
2457 check_allocations(&allocator, &device_ranges).await;
2458 }
2459
2460 #[fuchsia::test]
2461 async fn test_allocate_more_than_max_size() {
2462 const STORE_OBJECT_ID: u64 = 99;
2463 let (fs, allocator) = test_fs().await;
2464 let mut transaction =
2465 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2466 let mut device_ranges = collect_allocations(&allocator).await;
2467 device_ranges.push(
2468 allocator
2469 .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
2470 .await
2471 .expect("allocate failed"),
2472 );
2473 assert_eq!(
2474 device_ranges.last().unwrap().length().expect("Invalid range"),
2475 allocator.max_extent_size_bytes
2476 );
2477 transaction.commit().await.expect("commit failed");
2478
2479 check_allocations(&allocator, &device_ranges).await;
2480 }
2481
2482 #[fuchsia::test]
2483 async fn test_deallocations() {
2484 const STORE_OBJECT_ID: u64 = 99;
2485 let (fs, allocator) = test_fs().await;
2486 let initial_allocations = collect_allocations(&allocator).await;
2487
2488 let mut transaction =
2489 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2490 let device_range1 = allocator
2491 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2492 .await
2493 .expect("allocate failed");
2494 assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
2495 transaction.commit().await.expect("commit failed");
2496
2497 let mut transaction =
2498 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2499 allocator
2500 .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
2501 .await
2502 .expect("deallocate failed");
2503 transaction.commit().await.expect("commit failed");
2504
2505 check_allocations(&allocator, &initial_allocations).await;
2506 }
2507
2508 #[fuchsia::test]
2509 async fn test_mark_allocated() {
2510 const STORE_OBJECT_ID: u64 = 99;
2511 let (fs, allocator) = test_fs().await;
2512 let mut device_ranges = collect_allocations(&allocator).await;
2513 let range = {
2514 let mut transaction = fs
2515 .clone()
2516 .new_transaction(lock_keys![], Options::default())
2517 .await
2518 .expect("new failed");
2519 allocator
2521 .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
2522 .await
2523 .expect("allocate failed")
2524 };
2526
2527 let mut transaction =
2528 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2529
2530 device_ranges.push(
2533 allocator
2534 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2535 .await
2536 .expect("allocate failed"),
2537 );
2538
2539 assert_eq!(device_ranges.last().unwrap().start, range.start);
2540
2541 let mut range2 = range.clone();
2543 range2.start += fs.block_size();
2544 allocator
2545 .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
2546 .expect("mark_allocated failed");
2547 device_ranges.push(range2);
2548
2549 device_ranges.push(
2551 allocator
2552 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2553 .await
2554 .expect("allocate failed"),
2555 );
2556 let last_range = device_ranges.last().unwrap();
2557 assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
2558 assert_eq!(overlap(last_range, &range), 0);
2559 transaction.commit().await.expect("commit failed");
2560
2561 check_allocations(&allocator, &device_ranges).await;
2562 }
2563
2564 #[fuchsia::test]
2565 async fn test_mark_for_deletion() {
2566 const STORE_OBJECT_ID: u64 = 99;
2567 let (fs, allocator) = test_fs().await;
2568
2569 let initial_allocated_bytes = allocator.get_allocated_bytes();
2571 let mut device_ranges = collect_allocations(&allocator).await;
2572 let mut transaction =
2573 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2574 for _ in 0..15 {
2576 device_ranges.push(
2577 allocator
2578 .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2579 .await
2580 .expect("allocate failed"),
2581 );
2582 device_ranges.push(
2583 allocator
2584 .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2585 .await
2586 .expect("allocate2 failed"),
2587 );
2588 }
2589 transaction.commit().await.expect("commit failed");
2590 check_allocations(&allocator, &device_ranges).await;
2591
2592 assert_eq!(
2593 allocator.get_allocated_bytes(),
2594 initial_allocated_bytes + fs.block_size() * 3000
2595 );
2596
2597 let mut transaction =
2599 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2600 allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
2601 transaction.commit().await.expect("commit failed");
2602
2603 assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
2605 check_allocations(&allocator, &device_ranges).await;
2606
2607 device_ranges.clear();
2610
2611 let mut transaction =
2612 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2613 let target_bytes = 1500 * fs.block_size();
2614 while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
2615 let len = std::cmp::min(
2616 target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
2617 100 * fs.block_size(),
2618 );
2619 device_ranges.push(
2620 allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
2621 );
2622 }
2623 transaction.commit().await.expect("commit failed");
2624
2625 allocator.flush().await.expect("flush failed");
2627
2628 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2632 assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes,);
2633 }
2634
2635 async fn create_file(store: &Arc<ObjectStore>, size: usize) {
2636 let root_directory =
2637 Directory::open(store, store.root_directory_object_id()).await.expect("open failed");
2638
2639 let mut transaction = store
2640 .filesystem()
2641 .new_transaction(
2642 lock_keys![LockKey::object(
2643 store.store_object_id(),
2644 store.root_directory_object_id()
2645 )],
2646 Options::default(),
2647 )
2648 .await
2649 .expect("new_transaction failed");
2650 let file = root_directory
2651 .create_child_file(&mut transaction, &format!("foo {}", size))
2652 .await
2653 .expect("create_child_file failed");
2654 transaction.commit().await.expect("commit failed");
2655
2656 let buffer = file.allocate_buffer(size).await;
2657
2658 let mut transaction = file
2660 .new_transaction_with_options(Options {
2661 borrow_metadata_space: true,
2662 ..Default::default()
2663 })
2664 .await
2665 .expect("new_transaction_with_options failed");
2666 file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2667 transaction.commit().await.expect("commit failed");
2668 }
2669
2670 #[fuchsia::test]
2671 async fn test_replay_with_deleted_store_and_compaction() {
2672 let (fs, _) = test_fs().await;
2673
2674 const FILE_SIZE: usize = 10_000_000;
2675
2676 let mut store_id = {
2677 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2678 let store = root_vol
2679 .new_volume("vol", NewChildStoreOptions::default())
2680 .await
2681 .expect("new_volume failed");
2682
2683 create_file(&store, FILE_SIZE).await;
2684 store.store_object_id()
2685 };
2686
2687 fs.close().await.expect("close failed");
2688 let device = fs.take_device().await;
2689 device.reopen(false);
2690
2691 let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2692
2693 fs.journal().force_compact().await.expect("compact failed");
2696
2697 for _ in 0..2 {
2698 {
2699 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2700
2701 let transaction = fs
2702 .clone()
2703 .new_transaction(
2704 lock_keys![
2705 LockKey::object(
2706 root_vol.volume_directory().store().store_object_id(),
2707 root_vol.volume_directory().object_id(),
2708 ),
2709 LockKey::flush(store_id)
2710 ],
2711 Options { borrow_metadata_space: true, ..Default::default() },
2712 )
2713 .await
2714 .expect("new_transaction failed");
2715 root_vol
2716 .delete_volume("vol", transaction, || {})
2717 .await
2718 .expect("delete_volume failed");
2719
2720 let store = root_vol
2721 .new_volume("vol", NewChildStoreOptions::default())
2722 .await
2723 .expect("new_volume failed");
2724 create_file(&store, FILE_SIZE).await;
2725 store_id = store.store_object_id();
2726 }
2727
2728 fs.close().await.expect("close failed");
2729 let device = fs.take_device().await;
2730 device.reopen(false);
2731
2732 fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2733 }
2734
2735 fsck(fs.clone()).await.expect("fsck failed");
2736 fs.close().await.expect("close failed");
2737 }
2738
2739 #[fuchsia::test(threads = 4)]
2740 async fn test_compaction_delete_race() {
2741 let (fs, _allocator) = test_fs().await;
2742
2743 {
2744 const FILE_SIZE: usize = 10_000_000;
2745
2746 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2747 let store = root_vol
2748 .new_volume("vol", NewChildStoreOptions::default())
2749 .await
2750 .expect("new_volume failed");
2751
2752 create_file(&store, FILE_SIZE).await;
2753
2754 let fs_clone = fs.clone();
2756
2757 let executor_tasks = testing::force_executor_threads_to_run(4).await;
2760
2761 let task = fasync::Task::spawn(async move {
2762 fs_clone.journal().force_compact().await.expect("compact failed");
2763 });
2764
2765 drop(executor_tasks);
2767
2768 let sleep = rand::random_range(3000..6000);
2771 std::thread::sleep(std::time::Duration::from_micros(sleep));
2772 log::info!("sleep {sleep}us");
2773
2774 let transaction = fs
2775 .clone()
2776 .new_transaction(
2777 lock_keys![
2778 LockKey::object(
2779 root_vol.volume_directory().store().store_object_id(),
2780 root_vol.volume_directory().object_id(),
2781 ),
2782 LockKey::flush(store.store_object_id())
2783 ],
2784 Options { borrow_metadata_space: true, ..Default::default() },
2785 )
2786 .await
2787 .expect("new_transaction failed");
2788 root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");
2789
2790 task.await;
2791 }
2792
2793 fs.journal().force_compact().await.expect("compact failed");
2794 fs.close().await.expect("close failed");
2795
2796 let device = fs.take_device().await;
2797 device.reopen(false);
2798
2799 let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2800 fsck(fs.clone()).await.expect("fsck failed");
2801 fs.close().await.expect("close failed");
2802 }
2803
2804 #[fuchsia::test]
2805 async fn test_delete_multiple_volumes() {
2806 let (mut fs, _) = test_fs().await;
2807
2808 for _ in 0..50 {
2809 {
2810 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2811 let store = root_vol
2812 .new_volume("vol", NewChildStoreOptions::default())
2813 .await
2814 .expect("new_volume failed");
2815
2816 create_file(&store, 1_000_000).await;
2817
2818 let transaction = fs
2819 .clone()
2820 .new_transaction(
2821 lock_keys![
2822 LockKey::object(
2823 root_vol.volume_directory().store().store_object_id(),
2824 root_vol.volume_directory().object_id(),
2825 ),
2826 LockKey::flush(store.store_object_id())
2827 ],
2828 Options { borrow_metadata_space: true, ..Default::default() },
2829 )
2830 .await
2831 .expect("new_transaction failed");
2832 root_vol
2833 .delete_volume("vol", transaction, || {})
2834 .await
2835 .expect("delete_volume failed");
2836
2837 fs.allocator().flush().await.expect("flush failed");
2838 }
2839
2840 fs.close().await.expect("close failed");
2841 let device = fs.take_device().await;
2842 device.reopen(false);
2843
2844 fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2845 }
2846
2847 fsck(fs.clone()).await.expect("fsck failed");
2848 fs.close().await.expect("close failed");
2849 }
2850
2851 #[fuchsia::test]
2852 async fn test_allocate_free_reallocate() {
2853 const STORE_OBJECT_ID: u64 = 99;
2854 let (fs, allocator) = test_fs().await;
2855
2856 let mut device_ranges = Vec::new();
2858 let mut transaction =
2859 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2860 for _ in 0..30 {
2861 device_ranges.push(
2862 allocator
2863 .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2864 .await
2865 .expect("allocate failed"),
2866 );
2867 }
2868 transaction.commit().await.expect("commit failed");
2869
2870 assert_eq!(
2871 fs.block_size() * 3000,
2872 *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2873 );
2874
2875 let mut transaction =
2877 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2878 for range in std::mem::replace(&mut device_ranges, Vec::new()) {
2879 allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
2880 }
2881 transaction.commit().await.expect("commit failed");
2882
2883 assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());
2884
2885 let mut transaction =
2888 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2889 let target_len = 1500 * fs.block_size();
2890 while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
2891 let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
2892 device_ranges.push(
2893 allocator
2894 .allocate(&mut transaction, STORE_OBJECT_ID, len)
2895 .await
2896 .expect("allocate failed"),
2897 );
2898 }
2899 transaction.commit().await.expect("commit failed");
2900
2901 assert_eq!(
2902 fs.block_size() * 1500,
2903 *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2904 );
2905 }
2906
2907 #[fuchsia::test]
2908 async fn test_flush() {
2909 const STORE_OBJECT_ID: u64 = 99;
2910
2911 let mut device_ranges = Vec::new();
2912 let device = {
2913 let (fs, allocator) = test_fs().await;
2914 let mut transaction = fs
2915 .clone()
2916 .new_transaction(lock_keys![], Options::default())
2917 .await
2918 .expect("new failed");
2919 device_ranges.push(
2920 allocator
2921 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2922 .await
2923 .expect("allocate failed"),
2924 );
2925 device_ranges.push(
2926 allocator
2927 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2928 .await
2929 .expect("allocate failed"),
2930 );
2931 device_ranges.push(
2932 allocator
2933 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2934 .await
2935 .expect("allocate failed"),
2936 );
2937 transaction.commit().await.expect("commit failed");
2938
2939 allocator.flush().await.expect("flush failed");
2940
2941 fs.close().await.expect("close failed");
2942 fs.take_device().await
2943 };
2944
2945 device.reopen(false);
2946 let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2947 let allocator = fs.allocator();
2948
2949 let allocated = collect_allocations(&allocator).await;
2950
2951 for i in &device_ranges {
2953 let mut overlapping = 0;
2954 for j in &allocated {
2955 overlapping += overlap(i, j);
2956 }
2957 assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
2958 }
2959
2960 let mut transaction =
2961 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2962 let range = allocator
2963 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2964 .await
2965 .expect("allocate failed");
2966
2967 for r in &allocated {
2969 assert_eq!(overlap(r, &range), 0);
2970 }
2971 transaction.commit().await.expect("commit failed");
2972 }
2973
2974 #[fuchsia::test]
2975 async fn test_dropped_transaction() {
2976 const STORE_OBJECT_ID: u64 = 99;
2977 let (fs, allocator) = test_fs().await;
2978 let allocated_range = {
2979 let mut transaction = fs
2980 .clone()
2981 .new_transaction(lock_keys![], Options::default())
2982 .await
2983 .expect("new_transaction failed");
2984 allocator
2985 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2986 .await
2987 .expect("allocate failed")
2988 };
2989 let mut transaction = fs
2992 .clone()
2993 .new_transaction(lock_keys![], Options::default())
2994 .await
2995 .expect("new_transaction failed");
2996 assert_eq!(
2997 allocator
2998 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2999 .await
3000 .expect("allocate failed"),
3001 allocated_range
3002 );
3003 }
3004
3005 #[fuchsia::test]
3006 async fn test_cleanup_removed_owner() {
3007 const STORE_OBJECT_ID: u64 = 99;
3008 let device = {
3009 let (fs, allocator) = test_fs().await;
3010
3011 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3012 {
3013 let mut transaction =
3014 fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
3015 allocator
3016 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3017 .await
3018 .expect("Allocating");
3019 transaction.commit().await.expect("Committing.");
3020 }
3021 allocator.flush().await.expect("Flushing");
3022 assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3023 {
3024 let mut transaction =
3025 fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
3026 allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
3027 transaction.commit().await.expect("Committing.");
3028 }
3029 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3030 fs.close().await.expect("Closing");
3031 fs.take_device().await
3032 };
3033
3034 device.reopen(false);
3035 let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
3036 let allocator = fs.allocator();
3037 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3038 }
3039
3040 #[fuchsia::test]
3041 async fn test_allocated_bytes() {
3042 const STORE_OBJECT_ID: u64 = 99;
3043 let (fs, allocator) = test_fs().await;
3044
3045 let initial_allocated_bytes = allocator.get_allocated_bytes();
3046
3047 let allocated_bytes = initial_allocated_bytes + fs.block_size();
3049 let allocated_range = {
3050 let mut transaction = fs
3051 .clone()
3052 .new_transaction(lock_keys![], Options::default())
3053 .await
3054 .expect("new_transaction failed");
3055 let range = allocator
3056 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3057 .await
3058 .expect("allocate failed");
3059 transaction.commit().await.expect("commit failed");
3060 assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3061 range
3062 };
3063
3064 {
3065 let mut transaction = fs
3066 .clone()
3067 .new_transaction(lock_keys![], Options::default())
3068 .await
3069 .expect("new_transaction failed");
3070 allocator
3071 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3072 .await
3073 .expect("allocate failed");
3074
3075 assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3077 }
3078
3079 assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3081
3082 let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
3084 let mut transaction =
3085 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3086 allocator
3087 .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
3088 .await
3089 .expect("deallocate failed");
3090
3091 assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3093
3094 transaction.commit().await.expect("commit failed");
3095
3096 assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
3098 }
3099
3100 #[fuchsia::test]
3101 async fn test_persist_bytes_limit() {
3102 const LIMIT: u64 = 12345;
3103 const OWNER_ID: u64 = 12;
3104
3105 let (fs, allocator) = test_fs().await;
3106 {
3107 let mut transaction = fs
3108 .clone()
3109 .new_transaction(lock_keys![], Options::default())
3110 .await
3111 .expect("new_transaction failed");
3112 allocator
3113 .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
3114 .expect("Failed to set limit.");
3115 assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
3116 transaction.commit().await.expect("Failed to commit transaction");
3117 let bytes: u64 = *allocator
3118 .inner
3119 .lock()
3120 .info
3121 .limit_bytes
3122 .get(&OWNER_ID)
3123 .expect("Failed to find limit");
3124 assert_eq!(LIMIT, bytes);
3125 }
3126 }
3127
/// Merges consecutive ranges where one ends exactly where the next begins.
///
/// Input order is preserved. Only exact adjacency (`prev.end == next.start`) is merged;
/// overlapping or out-of-order ranges are emitted unchanged.
fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
    let mut merged: Vec<Range<u64>> = Vec::with_capacity(ranges.len());
    for next in ranges {
        match merged.last_mut() {
            Some(tail) if tail.end == next.start => tail.end = next.end,
            _ => merged.push(next),
        }
    }
    merged
}
3151
3152 #[fuchsia::test]
3153 async fn test_take_for_trimming() {
3154 const STORE_OBJECT_ID: u64 = 99;
3155
3156 let allocated_range;
3159 let expected_free_ranges;
3160 let device = {
3161 let (fs, allocator) = test_fs().await;
3162 let bs = fs.block_size();
3163 let mut transaction = fs
3164 .clone()
3165 .new_transaction(lock_keys![], Options::default())
3166 .await
3167 .expect("new failed");
3168 allocated_range = allocator
3169 .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
3170 .await
3171 .expect("allocate failed");
3172 transaction.commit().await.expect("commit failed");
3173
3174 let mut transaction = fs
3175 .clone()
3176 .new_transaction(lock_keys![], Options::default())
3177 .await
3178 .expect("new failed");
3179 let base = allocated_range.start;
3180 expected_free_ranges = vec![
3181 base..(base + (bs * 1)),
3182 (base + (bs * 2))..(base + (bs * 3)),
3183 (base + (bs * 4))..(base + (bs * 8)),
3187 (base + (bs * 8))..(base + (bs * 12)),
3188 (base + (bs * 12))..(base + (bs * 13)),
3189 (base + (bs * 29))..(base + (bs * 30)),
3190 ];
3191 for range in &expected_free_ranges {
3192 allocator
3193 .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
3194 .await
3195 .expect("deallocate failed");
3196 }
3197 transaction.commit().await.expect("commit failed");
3198
3199 allocator.flush().await.expect("flush failed");
3200
3201 fs.close().await.expect("close failed");
3202 fs.take_device().await
3203 };
3204
3205 device.reopen(false);
3206 let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
3207 let allocator = fs.allocator();
3208
3209 let max_extent_size = fs.block_size() as usize * 4;
3213 const EXTENTS_PER_BATCH: usize = 2;
3214 let mut free_ranges = vec![];
3215 let mut offset = allocated_range.start;
3216 while offset < allocated_range.end {
3217 let free = allocator
3218 .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
3219 .await
3220 .expect("take_for_trimming failed");
3221 free_ranges.extend(
3222 free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
3223 );
3224 offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
3225 }
3226 let coalesced_free_ranges = coalesce_ranges(free_ranges);
3229 let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);
3230
3231 assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
3232 }
3233
3234 #[fuchsia::test]
3235 async fn test_allocations_wait_for_free_extents() {
3236 const STORE_OBJECT_ID: u64 = 99;
3237 let (fs, allocator) = test_fs().await;
3238 let allocator_clone = allocator.clone();
3239
3240 let mut transaction =
3241 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3242
3243 let max_extent_size = fs.device().size() as usize;
3245 const EXTENTS_PER_BATCH: usize = usize::MAX;
3246
3247 let trim_done = Arc::new(Mutex::new(false));
3253 let trimmable_extents = allocator
3254 .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
3255 .await
3256 .expect("take_for_trimming failed");
3257
3258 let trim_done_clone = trim_done.clone();
3259 let bs = fs.block_size();
3260 let alloc_task = fasync::Task::spawn(async move {
3261 allocator_clone
3262 .allocate(&mut transaction, STORE_OBJECT_ID, bs)
3263 .await
3264 .expect("allocate should fail");
3265 {
3266 assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
3267 }
3268 transaction.commit().await.expect("commit failed");
3269 });
3270
3271 fasync::Timer::new(std::time::Duration::from_millis(100)).await;
3274
3275 {
3277 let mut trim_done = trim_done.lock();
3278 std::mem::drop(trimmable_extents);
3279 *trim_done = true;
3280 }
3281
3282 alloc_task.await;
3283 }
3284
3285 #[fuchsia::test]
3286 async fn test_allocation_with_reservation_not_multiple_of_block_size() {
3287 const STORE_OBJECT_ID: u64 = 99;
3288 let (fs, allocator) = test_fs().await;
3289
3290 const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
3292 let reservation =
3293 allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();
3294
3295 let mut transaction = fs
3296 .clone()
3297 .new_transaction(
3298 lock_keys![],
3299 Options { allocator_reservation: Some(&reservation), ..Options::default() },
3300 )
3301 .await
3302 .expect("new failed");
3303
3304 let range = allocator
3305 .allocate(
3306 &mut transaction,
3307 STORE_OBJECT_ID,
3308 round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
3309 )
3310 .await
3311 .expect("allocate faiiled");
3312 assert_eq!((range.end - range.start) % fs.block_size(), 0);
3313
3314 println!("{}", range.end - range.start);
3315 }
3316}