1pub mod merge;
90pub mod strategy;
91
92use crate::drop_event::DropEvent;
93use crate::errors::FxfsError;
94use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
95use crate::log::*;
96use crate::lsm_tree::cache::NullCache;
97use crate::lsm_tree::skip_list_layer::SkipListLayer;
98use crate::lsm_tree::types::{
99 FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
100 OrdUpperBound, SortByU64, Value,
101};
102use crate::lsm_tree::{LSMTree, Query, compact_with_iterator, layers_from_handles};
103use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
104use crate::object_store::object_manager::ReservationUpdate;
105use crate::object_store::transaction::{
106 AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
107};
108use crate::object_store::{
109 DataObjectHandle, DirectWriter, Extent, HandleOptions, ObjectStore, ReservedId, tree,
110};
111use crate::range::RangeExt;
112use crate::round::{round_div, round_down, round_up};
113use crate::serialized_types::{
114 DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
115};
116use anyhow::{Context, Error, anyhow, bail, ensure};
117use async_trait::async_trait;
118use either::Either::{Left, Right};
119use event_listener::EventListener;
120use fprint::TypeFingerprint;
121use fuchsia_inspect::HistogramProperty;
122use fuchsia_sync::Mutex;
123use futures::FutureExt;
124use fxfs_macros::SerializeKey;
125use merge::{filter_marked_for_deletion, filter_tombstones, merge};
126use serde::{Deserialize, Serialize};
127use std::borrow::Borrow;
128use std::collections::{BTreeMap, HashSet, VecDeque};
129use std::hash::Hash;
130use std::marker::PhantomData;
131use std::num::{NonZero, Saturating};
132use std::ops::Range;
133use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
134use std::sync::{Arc, Weak};
135
/// Something that reservations can be drawn from.
///
/// A [`ReservationImpl`] calls `release_reservation` on its owner whenever it hands bytes back:
/// explicitly through `give_back`, or implicitly when it is dropped with a non-zero amount.
pub trait ReservationOwner: Send + Sync {
    /// Returns `amount` bytes, previously reserved on behalf of `owner_object_id` (`None` when
    /// the reservation is not attributed to an owner object), to this owner.
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
}
143
/// A counted amount of bytes drawn from (and eventually released back to) an owner `T` that
/// borrows as `U`.
///
/// Part of the amount can be lent out again as child reservations via `reserve` /
/// `reserve_with`. Those children use this reservation as their owner, which is why
/// `ReservationImpl` itself implements [`ReservationOwner`]. Whatever amount remains when the
/// reservation is dropped is released back to `owner`.
pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
    /// The owner that bytes are released to.
    owner: T,
    /// The object the reserved bytes are attributed to, if any. It is passed through to `owner`
    /// on release and must match between reservations in `move_to`.
    owner_object_id: Option<u64>,
    /// Byte counters, behind a lock so the reservation can be used through a shared reference.
    inner: Mutex<ReservationInner>,
    /// Ties the (possibly unsized) owner type `U` to the struct without storing one.
    phantom: PhantomData<U>,
}

/// Mutable counters of a [`ReservationImpl`].
#[derive(Debug, Default)]
struct ReservationInner {
    /// Total bytes currently held by the reservation.
    amount: u64,

    /// Portion of `amount` currently lent out to child reservations. It must be zero when the
    /// reservation is dropped.
    reserved: u64,
}
166
167impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 self.inner.lock().fmt(f)
170 }
171}
172
173impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
174 pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
175 Self {
176 owner,
177 owner_object_id,
178 inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
179 phantom: PhantomData,
180 }
181 }
182
183 pub fn owner_object_id(&self) -> Option<u64> {
184 self.owner_object_id
185 }
186
187 pub fn amount(&self) -> u64 {
189 self.inner.lock().amount
190 }
191
192 pub fn add(&self, amount: u64) {
194 self.inner.lock().amount += amount;
195 }
196
197 pub fn forget(&self) -> u64 {
201 let mut inner = self.inner.lock();
202 assert_eq!(inner.reserved, 0);
203 std::mem::take(&mut inner.amount)
204 }
205
206 pub fn forget_some(&self, amount: u64) {
210 let mut inner = self.inner.lock();
211 inner.amount -= amount;
212 assert!(inner.reserved <= inner.amount);
213 }
214
215 fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
218 let mut inner = self.inner.lock();
219 let taken = amount(inner.amount - inner.reserved);
220 inner.reserved += taken;
221 ReservationImpl::new(self, self.owner_object_id, taken)
222 }
223
224 pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
226 let mut inner = self.inner.lock();
227 if inner.amount - inner.reserved < amount {
228 None
229 } else {
230 inner.reserved += amount;
231 Some(ReservationImpl::new(self, self.owner_object_id, amount))
232 }
233 }
234
235 pub fn commit(&self, amount: u64) {
238 let mut inner = self.inner.lock();
239 inner.reserved -= amount;
240 inner.amount -= amount;
241 }
242
243 pub fn give_back(&self, amount: u64) {
245 self.owner.borrow().release_reservation(self.owner_object_id, amount);
246 let mut inner = self.inner.lock();
247 inner.amount -= amount;
248 assert!(inner.reserved <= inner.amount);
249 }
250
251 pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
253 &self,
254 other: &ReservationImpl<V, W>,
255 amount: u64,
256 ) {
257 assert_eq!(self.owner_object_id, other.owner_object_id());
258 let mut inner = self.inner.lock();
259 if let Some(amount) = inner.amount.checked_sub(amount) {
260 inner.amount = amount;
261 } else {
262 std::mem::drop(inner);
263 panic!("Insufficient reservation space");
264 }
265 other.add(amount);
266 }
267}
268
269impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
270 fn drop(&mut self) {
271 let inner = self.inner.get_mut();
272 assert_eq!(inner.reserved, 0);
273 let owner_object_id = self.owner_object_id;
274 if inner.amount > 0 {
275 self.owner
276 .borrow()
277 .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
278 }
279 }
280}
281
282impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
283 for ReservationImpl<T, U>
284{
285 fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
286 assert_eq!(owner_object_id, self.owner_object_id);
288 let mut inner = self.inner.lock();
289 assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
290 inner.reserved -= amount;
291 }
292}
293
/// A reservation whose owner is a shared, dynamically-typed [`ReservationOwner`].
pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;

/// A child reservation borrowed from a [`Reservation`] (what `reserve` / `reserve_with` return
/// for it). Dropping it releases its remaining bytes back to the parent's unlent pool.
pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;
297
/// Current version of the allocator tree key.
pub type AllocatorKey = AllocatorKeyV32;

/// Allocator tree key: a byte range on the device.
///
/// This type is serialized, so its field layout is part of the on-disk format.
#[derive(
    Clone,
    Debug,
    Deserialize,
    Eq,
    Hash,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize,
    TypeFingerprint,
    SerializeKey,
    Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct AllocatorKeyV32 {
    /// The device byte range this record covers.
    pub device_range: Extent,
}
320
321impl SortByU64 for AllocatorKey {
322 fn get_leading_u64(&self) -> u64 {
323 self.device_range.end
324 }
325}
326
/// Granularity (1 MiB) at which device ranges are partitioned for fuzzy hashing.
const EXTENT_HASH_BUCKET_SIZE: u64 = 1 << 20;
328
/// Iterator over the stable hashes of the `EXTENT_HASH_BUCKET_SIZE`-sized slices of a device
/// range, as produced by `AllocatorKey::fuzzy_hash`.
pub struct AllocatorKeyPartitionIterator {
    /// The part of the range not yet hashed; `start` advances one bucket per item.
    device_range: Range<u64>,
}
332
333impl Iterator for AllocatorKeyPartitionIterator {
334 type Item = u64;
335
336 fn next(&mut self) -> Option<Self::Item> {
337 if self.device_range.start >= self.device_range.end {
338 None
339 } else {
340 let start = self.device_range.start;
341 self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
342 let end = std::cmp::min(self.device_range.start, self.device_range.end);
343 let key = AllocatorKey { device_range: Extent(start..end) };
344 let hash = crate::stable_hash::stable_hash(key);
345 Some(hash)
346 }
347 }
348}
349
350impl FuzzyHash for AllocatorKey {
351 fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
352 AllocatorKeyPartitionIterator {
353 device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
354 ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
355 }
356 }
357
358 fn is_range_key(&self) -> bool {
359 true
360 }
361}
362
363impl AllocatorKey {
364 pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
366 AllocatorKey { device_range: Extent(0..self.device_range.start) }
367 }
368}
369
370impl LayerKey for AllocatorKey {
371 fn merge_type(&self) -> MergeType {
372 MergeType::OptimizedMerge
373 }
374
375 fn next_key(&self) -> Option<Self> {
376 Some(Self { device_range: Extent(0..self.device_range.end + 1) })
377 }
378
379 fn search_key(&self) -> Option<Self> {
380 Some(Self { device_range: Extent(0..self.device_range.start + 1) })
381 }
382
383 fn is_search_key(&self) -> bool {
384 self.device_range.start == 0
385 }
386
387 fn overlaps(&self, other: &Self) -> bool {
388 self.device_range.overlap(&other.device_range).is_some()
389 }
390}
391
392impl OrdUpperBound for AllocatorKey {
393 fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
394 self.device_range
395 .end
396 .cmp(&other.device_range.end)
397 .then(self.device_range.start.cmp(&other.device_range.start))
398 }
399}
400
401impl OrdLowerBound for AllocatorKey {
402 fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
403 self.device_range.cmp(&other.device_range)
408 }
409}
410
/// Current version of the allocator tree value.
pub type AllocatorValue = AllocatorValueV32;
/// `AllocatorValue::None` marks a deleted record.
impl Value for AllocatorValue {
    const DELETED_MARKER: Self = Self::None;
}
417
/// Allocator tree value. This type is serialized, so its variants and fields are part of the
/// on-disk format.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorValueV32 {
    /// No allocation; used as the deletion marker.
    None,
    /// The range is allocated to `owner_object_id`. `take_for_trimming` writes `count: 1` with
    /// `INVALID_OBJECT_ID`. NOTE(review): `count` presumably is a reference count — confirm.
    Abs { count: u64, owner_object_id: u64 },
}
428
/// A key/value record in the allocator tree.
pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;

/// Current version of the persisted allocator info.
pub type AllocatorInfo = AllocatorInfoV32;
433
/// Allocator metadata stored in the allocator object, read back by `Allocator::open`. This type
/// is serialized, so its field layout is part of the on-disk format.
#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
pub struct AllocatorInfoV32 {
    /// Object ids (in the root store) of the tree's layer files.
    pub layers: Vec<u64>,
    /// Allocated bytes per owner object id.
    pub allocated_bytes: BTreeMap<u64, u64>,
    /// Owners whose records are filtered out when `filter` is asked for the committed set.
    pub marked_for_deletion: HashSet<u64>,
    /// Per-owner byte limits. Owners without an entry are unlimited.
    pub limit_bytes: BTreeMap<u64, u64>,
}
447
/// Upper bound (128 KiB) on how much of the allocator object `open` reads when deserializing
/// `AllocatorInfo`.
const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 128 * 1024;
449
450pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
452 block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
457}
458
/// Flush statistics reported to inspect by `track_statistics`.
#[derive(Default)]
struct AllocatorCounters {
    /// Reported as `num_flushes`.
    num_flushes: u64,
    /// Reported as `last_flush_time_ms` (milliseconds since the Unix epoch) when set.
    last_flush_time: Option<std::time::SystemTime>,
}
464
/// Tracks which device byte ranges are allocated, and to which owner.
///
/// Allocation records are persisted in an LSM tree. Free space is tracked in memory by a
/// `BestFit` strategy that `rebuild_strategy` can rebuild from the tree.
pub struct Allocator {
    /// Back-reference to the filesystem; upgraded (and unwrapped) where needed.
    filesystem: Weak<FxFilesystem>,
    block_size: u64,
    /// Device size rounded down to a multiple of `block_size`.
    device_size: u64,
    /// Id of the allocator's own object in the root store.
    object_id: u64,
    /// Cap on the length of a single allocation (see `max_extent_size_for_block_size`).
    max_extent_size_bytes: u64,
    /// Persistent allocation records.
    tree: LSMTree<AllocatorKey, AllocatorValue>,
    /// In-memory records (for example extents taken for trimming) that are merged into scans
    /// alongside the tree, so the ranges they cover are treated as allocated.
    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    /// Mutable state.
    inner: Mutex<Inner>,
    /// Async mutex held by `take_for_trimming` and by `allocate` while it rebuilds the strategy
    /// or checks whether a sync is needed.
    allocation_mutex: futures::lock::Mutex<()>,
    /// Flush statistics for inspect.
    counters: Mutex<AllocatorCounters>,
    /// Returned by `maximum_offset()`; not written in the code visible here.
    maximum_offset: AtomicU64,
    /// Starts false in image-builder mode until `enable_allocations`; `allocate` fails with
    /// `FxfsError::Unavailable` while it is false.
    allocations_allowed: AtomicBool,
}
485
/// Per-owner byte accounting kept by the allocator.
#[derive(Debug, Default, PartialEq)]
struct ByteTracking {
    /// Bytes allocated to the owner (initialized from the persisted `AllocatorInfo` on open).
    allocated_bytes: Saturating<u64>,

    /// NOTE(review): presumably bytes allocated in transactions that have not committed yet —
    /// confirm against the rest of the file.
    uncommitted_allocated_bytes: u64,

    /// Bytes currently reserved for the owner.
    reserved_bytes: u64,

    /// Deallocated bytes that are committed but still unavailable until the next sync. They
    /// count towards `unavailable_bytes` but not `unavailable_after_sync_bytes`.
    committed_deallocated_bytes: u64,
}

impl ByteTracking {
    /// Bytes charged against the owner: allocated, uncommitted, and reserved.
    ///
    /// Every term is added with saturating arithmetic, so overflow clamps to `u64::MAX` instead
    /// of panicking (debug) or wrapping (release). Previously the two raw `u64` terms were summed
    /// before being wrapped in `Saturating`.
    fn used_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.reserved_bytes)
    }

    /// Bytes that cannot be handed out right now: allocated, uncommitted, and committed
    /// deallocations that have not been synced yet.
    fn unavailable_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.committed_deallocated_bytes)
    }

    /// Bytes that will still be unavailable after a sync frees committed deallocations.
    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
    }
}
533
/// A deallocation that has been committed. It is queued in `Inner::committed_deallocated`;
/// that queue is not consumed in the code visible here.
#[derive(Debug)]
struct CommittedDeallocation {
    /// NOTE(review): presumably the journal offset at which the deallocation was committed —
    /// confirm.
    log_file_offset: u64,
    /// The device range that was deallocated.
    range: Range<u64>,
    /// The owner the range was deallocated from.
    owner_object_id: u64,
}
543
/// Mutable allocator state, guarded by `Allocator::inner`.
struct Inner {
    /// The persisted allocator info (layer files, per-owner allocated bytes and limits, and
    /// owners marked for deletion), as loaded by `open`.
    info: AllocatorInfo,

    /// Set exactly once, by `create` or `on_replay_complete`; both assert it was unset.
    opened: bool,

    /// Not read or written in the code visible here. NOTE(review): presumably ranges whose
    /// temporary allocations were dropped — confirm.
    dropped_temporary_allocations: Vec<Range<u64>>,

    /// Per-owner byte accounting, keyed by owner object id.
    owner_bytes: BTreeMap<u64, ByteTracking>,

    /// Reserved bytes not attributed to any owner (`add_reservation(None, ..)`).
    unattributed_reserved_bytes: u64,

    /// Queue of committed deallocations; not used in the code visible here.
    committed_deallocated: VecDeque<CommittedDeallocation>,

    /// Bytes taken out of the free pool by `take_for_trimming`. Reset to zero when the
    /// corresponding `TrimmableExtents` is dropped.
    trim_reserved_bytes: u64,

    /// Listener for the drop of the outstanding `TrimmableExtents`. `allocate` takes it and
    /// waits on it when the free space that is not being trimmed is insufficient.
    trim_listener: Option<EventListener>,

    /// In-memory tracker of free extents.
    strategy: strategy::BestFit,

    /// Allocation counts bucketed by length in blocks; bucket 63 collects everything of 63 or
    /// more blocks. Reported to inspect.
    allocation_size_histogram: [u64; 64],
    /// Reported to inspect as `rebuild_strategy_triggers`; not updated in the code visible here.
    rebuild_strategy_trigger_histogram: [u64; 64],

    /// In-memory set of owners whose records are filtered out of scans. It is initialized from
    /// `info.marked_for_deletion` when replay completes and extended by `allocate` once pending
    /// volume deletions have been synced.
    marked_for_deletion: HashSet<u64>,

    /// Owners whose deletion must be synced before they join `marked_for_deletion`.
    volumes_deleted_pending_sync: HashSet<u64>,
}
616
617impl Inner {
618 fn allocated_bytes(&self) -> Saturating<u64> {
619 let mut total = Saturating(0);
620 for (_, bytes) in &self.owner_bytes {
621 total += bytes.allocated_bytes;
622 }
623 total
624 }
625
626 fn uncommitted_allocated_bytes(&self) -> u64 {
627 self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
628 }
629
630 fn reserved_bytes(&self) -> u64 {
631 self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
632 + self.unattributed_reserved_bytes
633 }
634
635 fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
636 match self.info.limit_bytes.get(&owner_object_id) {
637 Some(v) => *v,
638 None => u64::MAX,
639 }
640 }
641
642 fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
643 let limit = self.owner_id_limit_bytes(owner_object_id);
644 let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
645 (Saturating(limit) - used).0
646 }
647
648 fn unavailable_bytes(&self) -> Saturating<u64> {
653 let mut total = Saturating(0);
654 for (_, bytes) in &self.owner_bytes {
655 total += bytes.unavailable_bytes();
656 }
657 total
658 }
659
660 fn used_bytes(&self) -> Saturating<u64> {
663 let mut total = Saturating(0);
664 for (_, bytes) in &self.owner_bytes {
665 total += bytes.used_bytes();
666 }
667 total + Saturating(self.unattributed_reserved_bytes)
668 }
669
670 fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
673 let mut total = Saturating(0);
674 for (_, bytes) in &self.owner_bytes {
675 total += bytes.unavailable_after_sync_bytes();
676 }
677 total
678 }
679
680 fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
683 device_size
684 .checked_sub(
685 (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
686 )
687 .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
688 }
689
690 fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
691 match owner_object_id {
692 Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
693 None => self.unattributed_reserved_bytes += amount,
694 };
695 }
696
697 fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
698 match owner_object_id {
699 Some(owner) => {
700 let owner_entry = self.owner_bytes.entry(owner).or_default();
701 assert!(
702 owner_entry.reserved_bytes >= amount,
703 "{} >= {}",
704 owner_entry.reserved_bytes,
705 amount
706 );
707 owner_entry.reserved_bytes -= amount;
708 }
709 None => {
710 assert!(
711 self.unattributed_reserved_bytes >= amount,
712 "{} >= {}",
713 self.unattributed_reserved_bytes,
714 amount
715 );
716 self.unattributed_reserved_bytes -= amount
717 }
718 };
719 }
720}
721
/// A batch of free extents handed out by `Allocator::take_for_trimming`.
///
/// While this value is alive its extents are held out of the allocation strategy and recorded
/// as temporary allocations. Dropping it returns them and clears `trim_reserved_bytes`.
pub struct TrimmableExtents<'a> {
    allocator: &'a Allocator,
    extents: Vec<Range<u64>>,
    /// Paired with the listener returned by `new`, which `allocate` may await.
    /// NOTE(review): `DropEvent` presumably notifies its listeners when dropped — confirm.
    _drop_event: DropEvent,
}
732
733impl<'a> TrimmableExtents<'a> {
734 pub fn extents(&self) -> &Vec<Range<u64>> {
735 &self.extents
736 }
737
738 fn new(allocator: &'a Allocator) -> (Self, EventListener) {
740 let drop_event = DropEvent::new();
741 let listener = drop_event.listen();
742 (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
743 }
744
745 fn add_extent(&mut self, extent: Range<u64>) {
746 self.extents.push(extent);
747 }
748}
749
750impl<'a> Drop for TrimmableExtents<'a> {
751 fn drop(&mut self) {
752 let mut inner = self.allocator.inner.lock();
753 for device_range in std::mem::take(&mut self.extents) {
754 inner.strategy.free(device_range.clone()).expect("drop trim extent");
755 self.allocator
756 .temporary_allocations
757 .erase(&AllocatorKey { device_range: Extent(device_range.clone()) });
758 }
759 inner.trim_reserved_bytes = 0;
760 }
761}
762
763impl Allocator {
764 pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
765 let block_size = filesystem.block_size();
766 let device_size = round_down(filesystem.device().size(), block_size);
768 if device_size != filesystem.device().size() {
769 warn!("Device size is not block aligned. Rounding down.");
770 }
771 let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
772 let mut strategy = strategy::BestFit::default();
773 strategy.free(0..device_size).expect("new fs");
774 Allocator {
775 filesystem: Arc::downgrade(&filesystem),
776 block_size,
777 device_size,
778 object_id,
779 max_extent_size_bytes,
780 tree: LSMTree::new(merge, Box::new(NullCache {})),
781 temporary_allocations: SkipListLayer::new(1024),
782 inner: Mutex::new(Inner {
783 info: AllocatorInfo::default(),
784 opened: false,
785 dropped_temporary_allocations: Vec::new(),
786 owner_bytes: BTreeMap::new(),
787 unattributed_reserved_bytes: 0,
788 committed_deallocated: VecDeque::new(),
789 trim_reserved_bytes: 0,
790 trim_listener: None,
791 strategy,
792 allocation_size_histogram: [0; 64],
793 rebuild_strategy_trigger_histogram: [0; 64],
794 marked_for_deletion: HashSet::new(),
795 volumes_deleted_pending_sync: HashSet::new(),
796 }),
797 allocation_mutex: futures::lock::Mutex::new(()),
798 counters: Mutex::new(AllocatorCounters::default()),
799 maximum_offset: AtomicU64::new(0),
800 allocations_allowed: AtomicBool::new(filesystem.options().image_builder_mode.is_none()),
801 }
802 }
803
804 pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
805 &self.tree
806 }
807
808 pub fn enable_allocations(&self) {
811 self.allocations_allowed.store(true, Ordering::SeqCst);
812 }
813
814 pub async fn filter(
819 &self,
820 iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
821 committed_marked_for_deletion: bool,
822 ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
823 let marked_for_deletion = {
824 let inner = self.inner.lock();
825 if committed_marked_for_deletion {
826 &inner.info.marked_for_deletion
827 } else {
828 &inner.marked_for_deletion
829 }
830 .clone()
831 };
832 let iter =
833 filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
834 Ok(iter)
835 }
836
837 pub fn allocation_size_histogram(&self) -> [u64; 64] {
841 self.inner.lock().allocation_size_histogram
842 }
843
844 pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
846 assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
849
850 let filesystem = self.filesystem.upgrade().unwrap();
851 let root_store = filesystem.root_store();
852 ObjectStore::create_object_with_id(
853 &root_store,
854 transaction,
855 ReservedId::new(&root_store, NonZero::new(self.object_id()).unwrap()),
856 HandleOptions::default(),
857 None,
858 )?;
859 root_store.update_last_object_id(self.object_id());
860 Ok(())
861 }
862
863 pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
866 let filesystem = self.filesystem.upgrade().unwrap();
867 let root_store = filesystem.root_store();
868
869 self.inner.lock().strategy = strategy::BestFit::default();
870
871 let handle =
872 ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
873 .await
874 .context("Failed to open allocator object")?;
875
876 if handle.get_size() > 0 {
877 let serialized_info = handle
878 .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
879 .await
880 .context("Failed to read AllocatorInfo")?;
881 let mut cursor = std::io::Cursor::new(serialized_info);
882 let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
883 .context("Failed to deserialize AllocatorInfo")?;
884
885 let mut handles = Vec::new();
886 let mut total_size = 0;
887 for object_id in &info.layers {
888 let handle = ObjectStore::open_object(
889 &root_store,
890 *object_id,
891 HandleOptions::default(),
892 None,
893 )
894 .await
895 .context("Failed to open allocator layer file")?;
896
897 let size = handle.get_size();
898 total_size += size;
899 handles.push(handle);
900 }
901
902 {
903 let mut inner = self.inner.lock();
904
905 let mut device_bytes = self.device_size;
907 for (&owner_object_id, &bytes) in &info.allocated_bytes {
908 ensure!(
909 bytes <= device_bytes,
910 anyhow!(FxfsError::Inconsistent).context(format!(
911 "Allocated bytes exceeds device size: {:?}",
912 info.allocated_bytes
913 ))
914 );
915 device_bytes -= bytes;
916
917 inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
918 Saturating(bytes);
919 }
920
921 inner.info = info;
922 }
923
924 self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
925 self.filesystem.upgrade().unwrap().object_manager().update_reservation(
926 self.object_id,
927 tree::reservation_amount_from_layer_size(total_size),
928 );
929 }
930
931 Ok(())
932 }
933
934 pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
935 {
937 let mut inner = self.inner.lock();
938 inner.volumes_deleted_pending_sync.clear();
939 inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
940 }
941
942 if !self.rebuild_strategy().await.context("Build free extents")? {
946 if self.filesystem.upgrade().unwrap().options().read_only {
947 info!("Device contains no free space (read-only mode).");
948 } else {
949 info!("Device contains no free space.");
950 return Err(FxfsError::Inconsistent)
951 .context("Device appears to contain no free space");
952 }
953 }
954
955 assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
956 Ok(())
957 }
958
    /// Rebuilds the free-space strategy from a full scan of the allocator records.
    ///
    /// Scans every tree layer plus `temporary_allocations`, skipping tombstones and owners in
    /// the in-memory `marked_for_deletion` set. Each gap between allocated ranges, up to
    /// `device_size`, is passed to `strategy.force_free`.
    ///
    /// Returns `true` if any `force_free` call returned `true` (presumably meaning it changed
    /// the strategy — confirm in `strategy`), or if the strategy's overflow markers after the
    /// scan differ from those before it.
    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
        let mut changed = false;
        let mut layer_set = self.tree.empty_layer_set();
        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
        self.tree.add_all_layers_to_layer_set(&mut layer_set);

        // Snapshot the overflow markers, then reset them; the snapshot is compared with the
        // markers once the scan is done.
        let overflow_markers = self.inner.lock().strategy.overflow_markers();
        self.inner.lock().strategy.reset_overflow_markers();

        let mut to_add = Vec::new();
        let mut merger = layer_set.merger();
        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
        // Everything below `last_offset` has already been classified as allocated or free.
        let mut last_offset = 0;
        while last_offset < self.device_size {
            let next_range = match iter.get() {
                // No more records: the rest of the device is free.
                None => {
                    assert!(last_offset <= self.device_size);
                    let range = last_offset..self.device_size;
                    last_offset = self.device_size;
                    iter.advance().await?;
                    range
                }
                Some(ItemRef { key, .. }) => {
                    let device_range = &key.device_range;
                    // Record lies entirely before the scanned position; nothing new.
                    if device_range.end < last_offset {
                        iter.advance().await?;
                        continue;
                    }
                    // Record starts at or before the scanned position: extend the allocated
                    // span to its end.
                    if device_range.start <= last_offset {
                        last_offset = device_range.end;
                        iter.advance().await?;
                        continue;
                    }
                    // The gap before this record is free.
                    let range = last_offset..device_range.start;
                    last_offset = device_range.end;
                    iter.advance().await?;
                    range
                }
            };
            to_add.push(next_range);
            // Hand ranges to the strategy in batches so `to_add` stays bounded.
            if to_add.len() > 100 {
                let mut inner = self.inner.lock();
                for range in to_add.drain(..) {
                    changed |= inner.strategy.force_free(range)?;
                }
            }
        }
        let mut inner = self.inner.lock();
        for range in to_add {
            changed |= inner.strategy.force_free(range)?;
        }
        if overflow_markers != inner.strategy.overflow_markers() {
            changed = true;
        }
        Ok(changed)
    }
1020
1021 pub async fn take_for_trimming(
1025 &self,
1026 offset: u64,
1027 max_extent_size: usize,
1028 extents_per_batch: usize,
1029 ) -> Result<TrimmableExtents<'_>, Error> {
1030 let _guard = self.allocation_mutex.lock().await;
1031
1032 let (mut result, listener) = TrimmableExtents::new(self);
1033 if offset >= self.device_size {
1034 return Ok(result);
1035 }
1036 let mut bytes = 0;
1037
1038 let mut layer_set = self.tree.empty_layer_set();
1041 layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
1042 self.tree.add_all_layers_to_layer_set(&mut layer_set);
1043 let mut merger = layer_set.merger();
1044 let mut iter = self
1045 .filter(
1046 merger
1047 .query(Query::FullRange(&AllocatorKey {
1048 device_range: Extent::search_key_from_offset(offset),
1049 }))
1050 .await?,
1051 false,
1052 )
1053 .await?;
1054 let mut last_offset = offset;
1055 'outer: while last_offset < self.device_size {
1056 let mut range = match iter.get() {
1057 None => {
1058 assert!(last_offset <= self.device_size);
1059 let range = last_offset..self.device_size;
1060 last_offset = self.device_size;
1061 iter.advance().await?;
1062 range
1063 }
1064 Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
1065 if device_range.end <= last_offset {
1066 iter.advance().await?;
1067 continue;
1068 }
1069 if device_range.start <= last_offset {
1070 last_offset = device_range.end;
1071 iter.advance().await?;
1072 continue;
1073 }
1074 let range = last_offset..device_range.start;
1075 last_offset = device_range.end;
1076 iter.advance().await?;
1077 range
1078 }
1079 };
1080 if range.start < offset {
1081 continue;
1082 }
1083
1084 let mut inner = self.inner.lock();
1087
1088 while range.start < range.end {
1089 let prefix =
1090 range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
1091 range = prefix.end..range.end;
1092 bytes += prefix.length()?;
1093 inner.strategy.remove(prefix.clone());
1096 self.temporary_allocations.insert(AllocatorItem {
1097 key: AllocatorKey { device_range: Extent(prefix.clone()) },
1098 value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
1099 })?;
1100 result.add_extent(prefix);
1101 if result.extents.len() == extents_per_batch {
1102 break 'outer;
1103 }
1104 }
1105 if result.extents.len() == extents_per_batch {
1106 break 'outer;
1107 }
1108 }
1109 {
1110 let mut inner = self.inner.lock();
1111
1112 assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
1113 inner.trim_listener = Some(listener);
1114 inner.trim_reserved_bytes = bytes;
1115 debug_assert!(
1116 (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
1117 <= self.device_size
1118 );
1119 }
1120 Ok(result)
1121 }
1122
1123 pub fn parent_objects(&self) -> Vec<u64> {
1125 self.inner.lock().info.layers.clone()
1128 }
1129
1130 pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
1132 self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
1133 }
1134
1135 pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
1137 let inner = self.inner.lock();
1138 (
1139 inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
1140 inner.info.limit_bytes.get(&owner_object_id).copied(),
1141 )
1142 }
1143
1144 pub fn owner_bytes_debug(&self) -> String {
1146 format!("{:?}", self.inner.lock().owner_bytes)
1147 }
1148
1149 fn needs_sync(&self) -> bool {
1150 let inner = self.inner.lock();
1155 inner.unavailable_bytes().0 >= self.device_size
1156 }
1157
1158 fn is_system_store(&self, owner_object_id: u64) -> bool {
1159 let fs = self.filesystem.upgrade().unwrap();
1160 owner_object_id == fs.object_manager().root_store_object_id()
1161 || owner_object_id == fs.object_manager().root_parent_store_object_id()
1162 }
1163
1164 pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
1167 if old_owner_object_id.is_none() || amount == 0 {
1168 return;
1169 }
1170 let mut inner = self.inner.lock();
1172 inner.remove_reservation(old_owner_object_id, amount);
1173 inner.add_reservation(None, amount);
1174 }
1175
1176 pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1179 let this = Arc::downgrade(self);
1180 parent.record_lazy_child(name, move || {
1181 let this_clone = this.clone();
1182 async move {
1183 let inspector = fuchsia_inspect::Inspector::default();
1184 if let Some(this) = this_clone.upgrade() {
1185 let counters = this.counters.lock();
1186 let root = inspector.root();
1187 root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
1188 root.record_uint("bytes_total", this.device_size);
1189 let (allocated, reserved, used, unavailable) = {
1190 let inner = this.inner.lock();
1192 (
1193 inner.allocated_bytes().0,
1194 inner.reserved_bytes(),
1195 inner.used_bytes().0,
1196 inner.unavailable_bytes().0,
1197 )
1198 };
1199 root.record_uint("bytes_allocated", allocated);
1200 root.record_uint("bytes_reserved", reserved);
1201 root.record_uint("bytes_used", used);
1202 root.record_uint("bytes_unavailable", unavailable);
1203
1204 if let Some(x) = round_div(100 * allocated, this.device_size) {
1207 root.record_uint("bytes_allocated_percent", x);
1208 }
1209 if let Some(x) = round_div(100 * reserved, this.device_size) {
1210 root.record_uint("bytes_reserved_percent", x);
1211 }
1212 if let Some(x) = round_div(100 * used, this.device_size) {
1213 root.record_uint("bytes_used_percent", x);
1214 }
1215 if let Some(x) = round_div(100 * unavailable, this.device_size) {
1216 root.record_uint("bytes_unavailable_percent", x);
1217 }
1218
1219 root.record_uint("num_flushes", counters.num_flushes);
1220 if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
1221 root.record_uint(
1222 "last_flush_time_ms",
1223 last_flush_time
1224 .duration_since(std::time::UNIX_EPOCH)
1225 .unwrap_or(std::time::Duration::ZERO)
1226 .as_millis()
1227 .try_into()
1228 .unwrap_or(0u64),
1229 );
1230 }
1231
1232 let data = this.allocation_size_histogram();
1233 let alloc_sizes = root.create_uint_linear_histogram(
1234 "allocation_size_histogram",
1235 fuchsia_inspect::LinearHistogramParams {
1236 floor: 1,
1237 step_size: 1,
1238 buckets: 64,
1239 },
1240 );
1241 for (i, count) in data.iter().enumerate() {
1242 if i != 0 {
1243 alloc_sizes.insert_multiple(i as u64, *count as usize);
1244 }
1245 }
1246 root.record(alloc_sizes);
1247
1248 let data = this.inner.lock().rebuild_strategy_trigger_histogram;
1249 let triggers = root.create_uint_linear_histogram(
1250 "rebuild_strategy_triggers",
1251 fuchsia_inspect::LinearHistogramParams {
1252 floor: 1,
1253 step_size: 1,
1254 buckets: 64,
1255 },
1256 );
1257 for (i, count) in data.iter().enumerate() {
1258 if i != 0 {
1259 triggers.insert_multiple(i as u64, *count as usize);
1260 }
1261 }
1262 root.record(triggers);
1263
1264 let allocator_ref = this.clone();
1265 root.record_child("lsm_tree", move |node| {
1266 allocator_ref.tree.record_inspect_data(node);
1267 });
1268 }
1269 Ok(inspector)
1270 }
1271 .boxed()
1272 });
1273 }
1274
1275 pub fn maximum_offset(&self) -> u64 {
1280 self.maximum_offset.load(Ordering::Relaxed)
1281 }
1282}
1283
1284impl Drop for Allocator {
1285 fn drop(&mut self) {
1286 let inner = self.inner.lock();
1287 assert_eq!(inner.uncommitted_allocated_bytes(), 0);
1289 assert_eq!(inner.reserved_bytes(), 0);
1290 }
1291}
1292
#[fxfs_trace::trace]
impl Allocator {
    /// Returns the object ID of this allocator. The allocator's own mutations are added to
    /// transactions under this ID.
    pub fn object_id(&self) -> u64 {
        self.object_id
    }

    /// Returns a clone of the current `AllocatorInfo`, taken while holding the inner lock.
    pub fn info(&self) -> AllocatorInfo {
        self.inner.lock().info.clone()
    }

    /// Allocates up to `len` bytes of device space on behalf of `owner_object_id`.
    ///
    /// An `AllocatorMutation::Allocate` for the returned range is added to `transaction`.
    ///
    /// `len` must be a multiple of the block size. The request is capped at
    /// `max_extent_size_bytes`. It may be reduced further to what can be reserved, or to what
    /// the strategy hands out, so the returned range can be shorter than `len`.
    ///
    /// Space is charged to `transaction.allocator_reservation` when the transaction has one.
    /// Otherwise a temporary reservation is taken directly against this allocator. Until the
    /// mutation is applied or dropped, the range is kept in `temporary_allocations` and counted
    /// in the owner's `uncommitted_allocated_bytes`.
    ///
    /// # Errors
    ///
    /// * `FxfsError::Unavailable` if allocations are currently disallowed.
    /// * `FxfsError::NoSpace` if nothing could be reserved, or if ten sync attempts did not
    ///   clear the need for a sync.
    /// * `FxfsError::Inconsistent` if the strategy cannot be rebuilt with more free space.
    ///   Other strategy errors and sync errors are propagated as-is.
    ///
    /// # Panics
    ///
    /// Panics in any of these cases:
    /// * `len` is not block aligned.
    /// * No transaction reservation is supplied and the allocator is not open.
    /// * The transaction's reservation has no owner and `owner_object_id` is not a system store.
    /// * The transaction's reservation belongs to a different owner.
    #[trace]
    pub async fn allocate(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        mut len: u64,
    ) -> Result<Range<u64>, Error> {
        ensure!(self.allocations_allowed.load(Ordering::SeqCst), FxfsError::Unavailable);
        assert_eq!(len % self.block_size, 0);
        len = std::cmp::min(len, self.max_extent_size_bytes);
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);

        // Reserve the space before searching for a free extent. `len` may shrink here to what
        // can actually be reserved.
        let reservation = if let Some(reservation) = transaction.allocator_reservation {
            match reservation.owner_object_id {
                // A reservation without an owner may only be used by system stores.
                None => assert!(self.is_system_store(owner_object_id)),
                // A reservation with an owner must belong to the allocating owner.
                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
            };
            // Take at most `len` from the reservation. Its limit is rounded down to whole
            // blocks first.
            let r = reservation
                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
            len = r.amount();
            Left(r)
        } else {
            // No transaction reservation, so take a temporary one directly against this
            // allocator. It is bounded by the owner's remaining limit and the unused device
            // space.
            let mut inner = self.inner.lock();
            assert!(inner.opened);
            let device_used = inner.used_bytes();
            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let limit =
                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
            len = round_down(std::cmp::min(len, limit), self.block_size);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.reserved_bytes += len;
            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
        };

        ensure!(len > 0, FxfsError::NoSpace);

        // Snapshot the volumes whose deletion has been applied but not yet followed by the
        // sync below.
        let volumes_deleted = {
            let inner = self.inner.lock();
            (!inner.volumes_deleted_pending_sync.is_empty())
                .then(|| inner.volumes_deleted_pending_sync.clone())
        };

        if let Some(volumes_deleted) = volumes_deleted {
            // Sync with a device flush. The precondition presumably lets `sync` skip the work
            // if the pending set has already been emptied; NOTE(review): confirm.
            // No allocator locks are held across this await.
            self.filesystem
                .upgrade()
                .unwrap()
                .sync(SyncOptions {
                    flush_device: true,
                    precondition: Some(Box::new(|| {
                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
                    })),
                    ..Default::default()
                })
                .await?;

            // After the sync, move those volumes from "pending sync" to `marked_for_deletion`.
            {
                let mut inner = self.inner.lock();
                for owner_id in volumes_deleted {
                    inner.volumes_deleted_pending_sync.remove(&owner_id);
                    inner.marked_for_deletion.insert(owner_id);
                }
            }

            // Rebuild the strategy while holding `allocation_mutex`.
            let _guard = self.allocation_mutex.lock().await;
            self.rebuild_strategy().await?;
        }

        // This loop is used as a `for ... else`. It breaks out with the `allocation_mutex`
        // guard once no sync is needed. It fails with `NoSpace` after ten sync attempts.
        #[allow(clippy::never_loop)]
        let _guard = 'sync: loop {
            for _ in 0..10 {
                {
                    let guard = self.allocation_mutex.lock().await;

                    if !self.needs_sync() {
                        break 'sync guard;
                    }
                }

                // The `allocation_mutex` guard was dropped at the end of the block above, so
                // it is not held across this sync. NOTE(review): presumably this avoids
                // deadlocking with locks taken inside `sync`; confirm before widening the lock
                // scope. The precondition re-checks `needs_sync()`.
                self.filesystem
                    .upgrade()
                    .unwrap()
                    .sync(SyncOptions {
                        flush_device: true,
                        precondition: Some(Box::new(|| self.needs_sync())),
                        ..Default::default()
                    })
                    .await?;
            }
            bail!(
                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
            );
        };

        let mut trim_listener = None;
        {
            let mut inner = self.inner.lock();
            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;

            // Free space that is not being trimmed may be unable to satisfy `len` (capped at
            // the available bytes). In that case take the trim listener so we can wait for
            // trimming outside the lock.
            let avail = self
                .device_size
                .checked_sub(inner.unavailable_bytes().0)
                .ok_or(FxfsError::Inconsistent)?;
            let free_and_not_being_trimmed =
                inner.bytes_available_not_being_trimmed(self.device_size)?;
            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
                debug_assert!(inner.trim_reserved_bytes > 0);
                trim_listener = std::mem::take(&mut inner.trim_listener);
            }
        }

        if let Some(listener) = trim_listener {
            listener.await;
        }

        let result = loop {
            {
                let mut inner = self.inner.lock();

                // Apply the `temporary_allocations` removals that `apply_mutation` deferred into
                // `dropped_temporary_allocations`. `allocation_mutex` (`_guard`) is held here.
                for device_range in inner.dropped_temporary_allocations.drain(..) {
                    self.temporary_allocations
                        .erase(&AllocatorKey { device_range: Extent(device_range) });
                }

                match inner.strategy.allocate(len) {
                    Err(FxfsError::NotFound) => {
                        // The strategy has no suitable free extent. Record it and fall
                        // through to rebuild the strategy.
                        inner.rebuild_strategy_trigger_histogram
                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
                    }
                    Err(err) => {
                        error!(err:%; "Likely filesystem corruption.");
                        return Err(err.into());
                    }
                    Ok(x) => {
                        break x;
                    }
                }
            }
            // Space was reserved, so if the rebuild yields nothing more the accounting is
            // inconsistent. NOTE(review): `false` presumably means no further free extents
            // were found.
            if !self.rebuild_strategy().await? {
                error!("Cannot find additional free space. Corruption?");
                return Err(FxfsError::Inconsistent.into());
            }
        };

        debug!(device_range:? = result; "allocate");

        // Consume exactly the allocated length from whichever reservation was used and note
        // its owner. `Left` is the transaction's reservation; `Right` is the temporary one.
        let len = result.length().unwrap();
        let reservation_owner = reservation.either(
            |l| {
                l.forget_some(len);
                l.owner_object_id()
            },
            |r| {
                r.forget_some(len);
                r.owner_object_id()
            },
        );

        {
            let mut inner = self.inner.lock();
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.uncommitted_allocated_bytes += len;
            // If the reservation has an owner, it must be the allocating owner.
            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
            // The bytes move from "reserved" to "uncommitted allocated".
            inner.remove_reservation(reservation_owner, len);
            // Track the range until the mutation is applied or dropped.
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: Extent(result.clone()) },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
            })?;
        }

        let mutation =
            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());

        Ok(result)
    }

    /// Marks the caller-chosen `device_range` as allocated to `owner_object_id`.
    ///
    /// An `AllocatorMutation::Allocate` for the range is added to `transaction`. The range is
    /// removed from the allocation strategy and kept in `temporary_allocations` until the
    /// mutation is applied or dropped.
    ///
    /// # Errors
    ///
    /// * `FxfsError::InvalidArgs` if `device_range.length()` fails.
    /// * `FxfsError::NoSpace` in any of these cases:
    ///   * The range ends past the device.
    ///   * Fewer than `len` unused bytes remain on the device or under the owner's limit.
    ///   * The transaction's reservation cannot cover `len`.
    ///
    /// Errors from `temporary_allocations.insert` are also propagated.
    #[trace]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
        {
            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;

            let mut inner = self.inner.lock();
            let device_used = inner.used_bytes();
            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            ensure!(
                device_range.end <= self.device_size
                    && (Saturating(self.device_size) - device_used).0 >= len
                    && owner_id_bytes_left >= len,
                FxfsError::NoSpace
            );
            if let Some(reservation) = &mut transaction.allocator_reservation {
                // Take `len` from the transaction's reservation. NOTE(review): `forget()`
                // presumably hands the hold over to the transaction instead of releasing it on
                // drop.
                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
            }
            owner_entry.uncommitted_allocated_bytes += len;
            inner.strategy.remove(device_range.clone());
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: Extent(device_range.clone()) },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
            })?;
        }
        let mutation =
            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));
        Ok(())
    }

    /// Adds a mutation to `transaction` that sets the byte limit for `owner_object_id` to
    /// `bytes`. The limit is recorded when the mutation is applied.
    ///
    /// # Panics
    ///
    /// Panics if `owner_object_id` is a system store.
    pub fn set_bytes_limit(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        bytes: u64,
    ) -> Result<(), Error> {
        assert!(!self.is_system_store(owner_object_id));
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
        );
        Ok(())
    }

    /// Returns the byte limit recorded for `owner_object_id`, or `None` if it has no limit.
    pub fn get_owner_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
    }

    /// Returns the bytes `owner_object_id` is using, as computed by its counters'
    /// `used_bytes()`. Returns 0 if the allocator has no counters for that owner.
    pub fn get_owner_bytes_used(&self, owner_object_id: u64) -> u64 {
        self.inner.lock().owner_bytes.get(&owner_object_id).map_or(0, |info| info.used_bytes().0)
    }

    /// Deallocates `dealloc_range`, owned by `owner_object_id`, and returns its length.
    ///
    /// An `AllocatorMutation::Deallocate` for the range is added to `transaction`. The range is
    /// also inserted into `temporary_allocations`. `did_flush_device` erases such entries once
    /// the deallocation has been flushed; `drop_mutation` erases them if the mutation is
    /// dropped. NOTE(review): presumably this keeps the range from being reused until then.
    ///
    /// # Errors
    ///
    /// `FxfsError::InvalidArgs` if `dealloc_range` is not valid. Errors from
    /// `temporary_allocations.insert` are returned with the context "tracking deallocated".
    #[trace]
    pub async fn deallocate(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        dealloc_range: Range<u64>,
    ) -> Result<u64, Error> {
        debug!(device_range:? = dealloc_range; "deallocate");
        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
        // The returned count is the full range length. The range is not looked up on the
        // device; it is assumed to be allocated.
        let deallocated = dealloc_range.end - dealloc_range.start;
        let mutation = AllocatorMutation::Deallocate {
            device_range: dealloc_range.clone().into(),
            owner_object_id,
        };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));

        let _guard = self.allocation_mutex.lock().await;

        // Apply the `temporary_allocations` removals that `apply_mutation` deferred into
        // `dropped_temporary_allocations`. This is done while holding `allocation_mutex`.
        let mut inner = self.inner.lock();
        for device_range in inner.dropped_temporary_allocations.drain(..) {
            self.temporary_allocations.erase(&AllocatorKey { device_range: Extent(device_range) });
        }

        self.temporary_allocations
            .insert(AllocatorItem {
                key: AllocatorKey { device_range: Extent(dealloc_range.clone()) },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
            })
            .context("tracking deallocated")?;

        Ok(deallocated)
    }

    /// Adds an `AllocatorMutation::MarkForDeletion` for `owner_object_id` to `transaction`.
    pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
        );
    }

    /// Releases committed deallocations whose journal offset is below `flush_log_offset`.
    ///
    /// NOTE(review): presumably called once the device has been flushed up to that journal
    /// offset; confirm against callers.
    ///
    /// For each released range, this method:
    /// * returns the range to the allocation strategy,
    /// * erases it from `temporary_allocations`,
    /// * subtracts it from the owner's `committed_deallocated_bytes`.
    ///
    /// # Panics
    ///
    /// Panics if a range cannot be returned to the strategy. Also panics if an owner has no
    /// counters and is not in `volumes_deleted_pending_sync`.
    pub fn did_flush_device(&self, flush_log_offset: u64) {
        // Split off the released deallocations. The scan stops at the first entry at or
        // beyond `flush_log_offset`, which assumes `committed_deallocated` is ordered by
        // `log_file_offset`. The loop is used as a `for ... else`: the inner `break` returns
        // the prefix, and the trailing `break` takes everything when no such entry exists.
        #[allow(clippy::never_loop)]
        let deallocs = 'deallocs_outer: loop {
            let mut inner = self.inner.lock();
            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
                if dealloc.log_file_offset >= flush_log_offset {
                    let mut deallocs = inner.committed_deallocated.split_off(index);
                    // `split_off` returns the tail. Swap so `deallocs` holds the released
                    // prefix and the unreleased tail stays queued.
                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
                    break 'deallocs_outer deallocs;
                }
            }
            break std::mem::take(&mut inner.committed_deallocated);
        };

        // Free the released ranges and total the bytes per owner.
        let mut inner = self.inner.lock();
        let mut totals = BTreeMap::<u64, u64>::new();
        for dealloc in deallocs {
            *(totals.entry(dealloc.owner_object_id).or_default()) +=
                dealloc.range.length().unwrap();
            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
            self.temporary_allocations
                .erase(&AllocatorKey { device_range: Extent(dealloc.range.clone()) });
        }

        // Counters are adjusted only after every range has been returned to the strategy.
        // An owner without counters must be a deleted volume that is still pending a sync
        // (`MarkForDeletion` removes its counters).
        for (owner_object_id, total) in totals {
            match inner.owner_bytes.get_mut(&owner_object_id) {
                Some(counters) => counters.committed_deallocated_bytes -= total,
                None => {
                    assert!(inner.volumes_deleted_pending_sync.contains(&owner_object_id));
                }
            }
        }
    }

    /// Reserves exactly `amount` bytes for `owner_object_id`, or for no particular owner when
    /// `None`.
    ///
    /// Returns `None` if `amount` exceeds the unused device space. When an owner is given, it
    /// also returns `None` if `amount` exceeds that owner's remaining limit.
    pub fn reserve(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: u64,
    ) -> Option<Reservation> {
        {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let limit = match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            };
            if limit < amount {
                return None;
            }
            inner.add_reservation(owner_object_id, amount);
        }
        Some(Reservation::new(self, owner_object_id, amount))
    }

    /// Like `reserve`, but the `amount` callback chooses how much to reserve.
    ///
    /// The callback receives the limit: the unused device space, further capped by the owner's
    /// remaining limit when an owner is given. This never fails.
    /// NOTE(review): the value the callback returns is not checked against the limit here.
    pub fn reserve_with(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: impl FnOnce(u64) -> u64,
    ) -> Reservation {
        let amount = {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let amount = amount(match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            });

            inner.add_reservation(owner_object_id, amount);

            amount
        };

        Reservation::new(self, owner_object_id, amount)
    }

    /// Returns the total allocated bytes as computed by the inner state.
    pub fn get_allocated_bytes(&self) -> u64 {
        self.inner.lock().allocated_bytes().0
    }

    /// Returns the size of the device in bytes.
    pub fn get_disk_bytes(&self) -> u64 {
        self.device_size
    }

    /// Returns a map from owner object ID to that owner's allocated bytes.
    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
    }

    /// Returns the bytes counted as used by the inner state's `used_bytes()`.
    pub fn get_used_bytes(&self) -> Saturating<u64> {
        let inner = self.inner.lock();
        inner.used_bytes()
    }
}
1780
1781impl ReservationOwner for Allocator {
1782 fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
1783 self.inner.lock().remove_reservation(owner_object_id, amount);
1784 }
1785}
1786
1787#[async_trait]
1788impl JournalingObject for Allocator {
1789 fn apply_mutation(
1790 &self,
1791 mutation: Mutation,
1792 context: &ApplyContext<'_, '_>,
1793 _assoc_obj: AssocObj<'_>,
1794 ) -> Result<(), Error> {
1795 match mutation {
1796 Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
1797 let mut inner = self.inner.lock();
1798 inner.owner_bytes.remove(&owner_object_id);
1799
1800 inner.info.marked_for_deletion.insert(owner_object_id);
1805 inner.volumes_deleted_pending_sync.insert(owner_object_id);
1806
1807 inner.info.limit_bytes.remove(&owner_object_id);
1808 }
1809 Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1810 self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
1811 let item = AllocatorItem {
1812 key: AllocatorKey { device_range: Extent(device_range.0.clone()) },
1813 value: AllocatorValue::Abs { count: 1, owner_object_id },
1814 };
1815 let len = item.key.device_range.length().unwrap();
1816 let lower_bound = item.key.lower_bound_for_merge_into();
1817 self.tree.merge_into(item, &lower_bound);
1818 let mut inner = self.inner.lock();
1819 let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1820 entry.allocated_bytes += len;
1821 if let ApplyMode::Live(transaction) = context.mode {
1822 entry.uncommitted_allocated_bytes -= len;
1823 inner.dropped_temporary_allocations.push(device_range.0);
1828 if let Some(reservation) = transaction.allocator_reservation {
1829 reservation.commit(len);
1830 }
1831 }
1832 }
1833 Mutation::Allocator(AllocatorMutation::Deallocate {
1834 device_range,
1835 owner_object_id,
1836 }) => {
1837 let item = AllocatorItem {
1838 key: AllocatorKey { device_range: Extent(device_range.0) },
1839 value: AllocatorValue::None,
1840 };
1841 let len = item.key.device_range.length().unwrap();
1842
1843 {
1844 let mut inner = self.inner.lock();
1845 {
1846 let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1847 entry.allocated_bytes -= len;
1848 if context.mode.is_live() {
1849 entry.committed_deallocated_bytes += len;
1850 }
1851 }
1852 if context.mode.is_live() {
1853 inner.committed_deallocated.push_back(CommittedDeallocation {
1854 log_file_offset: context.checkpoint.file_offset,
1855 range: (*item.key.device_range).clone(),
1856 owner_object_id,
1857 });
1858 }
1859 if let ApplyMode::Live(Transaction {
1860 allocator_reservation: Some(reservation),
1861 ..
1862 }) = context.mode
1863 {
1864 inner.add_reservation(reservation.owner_object_id(), len);
1865 reservation.add(len);
1866 }
1867 }
1868 let lower_bound = item.key.lower_bound_for_merge_into();
1869 self.tree.merge_into(item, &lower_bound);
1870 }
1871 Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
1872 self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
1877 }
1878 Mutation::BeginFlush => {
1879 self.tree.seal();
1880 let mut inner = self.inner.lock();
1883 let allocated_bytes =
1884 inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
1885 inner.info.allocated_bytes = allocated_bytes;
1886 }
1887 Mutation::EndFlush => {}
1888 _ => bail!("unexpected mutation: {:?}", mutation),
1889 }
1890 Ok(())
1891 }
1892
1893 fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
1894 match mutation {
1895 Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1896 let len = device_range.length().unwrap();
1897 let mut inner = self.inner.lock();
1898 inner
1899 .owner_bytes
1900 .entry(owner_object_id)
1901 .or_default()
1902 .uncommitted_allocated_bytes -= len;
1903 if let Some(reservation) = transaction.allocator_reservation {
1904 let res_owner = reservation.owner_object_id();
1905 inner.add_reservation(res_owner, len);
1906 reservation.release_reservation(res_owner, len);
1907 }
1908 inner.strategy.free(device_range.0.clone()).expect("drop mutaton");
1909 self.temporary_allocations
1910 .erase(&AllocatorKey { device_range: Extent(device_range.0) });
1911 }
1912 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
1913 self.temporary_allocations
1914 .erase(&AllocatorKey { device_range: Extent(device_range.0) });
1915 }
1916 _ => {}
1917 }
1918 }
1919
1920 async fn flush(&self) -> Result<Version, Error> {
1921 let filesystem = self.filesystem.upgrade().unwrap();
1922 let object_manager = filesystem.object_manager();
1923 let earliest_version = self.tree.get_earliest_version();
1924 if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
1925 return Ok(earliest_version);
1927 }
1928
1929 let fs = self.filesystem.upgrade().unwrap();
1930 let mut flusher = Flusher::new(self, &fs).await;
1931 let (new_layer_file, info) = flusher.start().await?;
1932 flusher.finish(new_layer_file, info).await
1933 }
1934}
1935
1936pub struct CoalescingIterator<I> {
1949 iter: I,
1950 item: Option<AllocatorItem>,
1951}
1952
1953impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
1954 pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
1955 let mut iter = Self { iter, item: None };
1956 iter.advance().await?;
1957 Ok(iter)
1958 }
1959}
1960
1961#[async_trait]
1962impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
1963 for CoalescingIterator<I>
1964{
1965 async fn advance(&mut self) -> Result<(), Error> {
1966 self.item = self.iter.get().map(|x| x.cloned());
1967 if self.item.is_none() {
1968 return Ok(());
1969 }
1970 let left = self.item.as_mut().unwrap();
1971 loop {
1972 self.iter.advance().await?;
1973 match self.iter.get() {
1974 None => return Ok(()),
1975 Some(right) => {
1976 ensure!(
1978 left.key.device_range.end <= right.key.device_range.start,
1979 FxfsError::Inconsistent
1980 );
1981 if left.key.device_range.end < right.key.device_range.start
1983 || left.value != *right.value
1984 {
1985 return Ok(());
1986 }
1987 left.key.device_range.end = right.key.device_range.end;
1988 }
1989 }
1990 }
1991 }
1992
1993 fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
1994 self.item.as_ref().map(|x| x.as_item_ref())
1995 }
1996}
1997
/// Drives one flush of the allocator's LSM tree. `start` and `finish` are the two phases.
struct Flusher<'a> {
    /// The allocator being flushed.
    allocator: &'a Allocator,
    /// The filesystem that owns the allocator; used for the root store, object manager and
    /// journal.
    fs: &'a Arc<FxFilesystem>,
    /// Write lock on `LockKey::flush(allocator.object_id())`, taken in `Flusher::new` and
    /// held for as long as the flusher lives.
    _guard: WriteGuard<'a>,
}
2003
impl<'a> Flusher<'a> {
    /// Creates a flusher for `allocator`. Waits for the allocator's flush write lock and holds
    /// it until the flusher is dropped.
    async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
        let keys = lock_keys![LockKey::flush(allocator.object_id())];
        Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
    }

    /// Transaction options shared by the flush transactions:
    /// * journal checks are skipped;
    /// * metadata space may be borrowed;
    /// * allocations are charged to `allocator_reservation`.
    fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
        Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(allocator_reservation),
            ..Default::default()
        }
    }

    /// First phase of a flush. It commits one transaction that:
    /// * creates the object that will hold the new layer file;
    /// * adds that object to the root store's graveyard (`finish` removes it again when the
    ///   new layer is committed);
    /// * adds `Mutation::BeginFlush`, which seals the mutable layer.
    ///
    /// Returns the new layer's handle and a clone of the allocator info captured in the commit
    /// callback.
    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
        let object_manager = self.fs.object_manager();
        let root_store = self.fs.root_store();
        let mut transaction = root_store
            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
            .await?;
        let layer_object_handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions { skip_journal_checks: true, ..Default::default() },
            None,
        )
        .await?;
        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
        // NOTE(review): this assumes the commit callback runs after `BeginFlush` has been
        // applied, so the captured info includes the per-owner `allocated_bytes` snapshot.
        // Confirm.
        let info = transaction
            .commit_with_callback(|_| {
                self.allocator.inner.lock().info.clone()
            })
            .await?;
        Ok((layer_object_handle, info))
    }

    /// Second phase of a flush.
    ///
    /// First it compacts the sealed (immutable) layers into `layer_object_handle`. Records go
    /// through `Allocator::filter` and then `CoalescingIterator`, which merges adjacent equal
    /// records.
    ///
    /// Then one transaction:
    /// * moves the old layer objects to the graveyard;
    /// * writes the updated `AllocatorInfo`, listing only the new layer, to the allocator
    ///   object;
    /// * adds `Mutation::EndFlush` with a reservation update sized from the new layer;
    /// * removes the new layer from the graveyard.
    ///
    /// When that transaction commits, the tree switches to the new layer and the owners taken
    /// from `info.marked_for_deletion` are cleared from both marked-for-deletion sets. Finally
    /// the old layers are closed and tombstoned.
    ///
    /// Returns the earliest serialization version still used in the tree.
    async fn finish(
        self,
        layer_object_handle: DataObjectHandle<ObjectStore>,
        mut info: AllocatorInfo,
    ) -> Result<Version, Error> {
        let object_manager = self.fs.object_manager();
        let txn_options = Self::txn_options(object_manager.metadata_reservation());

        let layer_set = self.allocator.tree.immutable_layer_set();
        let total_len = layer_set.sum_len();
        {
            let start_time = std::time::Instant::now();
            let merged_layer_count = layer_set.layers.len();
            let mut merger = layer_set.merger();
            // NOTE(review): the meaning of `filter`'s `true` argument is defined elsewhere.
            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
            let iter = CoalescingIterator::new(iter).await?;
            let bytes_written = compact_with_iterator(
                iter,
                total_len,
                DirectWriter::new(&layer_object_handle, txn_options).await,
                layer_object_handle.block_size(),
                Some(self.fs.journal().get_compaction_yielder()),
            )
            .await?;

            self.allocator.tree.report_compaction_metrics(
                bytes_written,
                start_time.elapsed(),
                merged_layer_count,
            );
        }

        let root_store = self.fs.root_store();

        // Declared before the transaction so both outlive it. `reservation_update` is borrowed
        // by the transaction through `AssocObj::Borrowed`. `object_handle` is presumably
        // declared here for the same reason.
        let object_handle;
        let reservation_update;
        let mut transaction = root_store
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    self.allocator.object_id()
                )],
                txn_options,
            )
            .await?;
        let mut serialized_info = Vec::new();

        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
        object_handle = ObjectStore::open_object(
            &root_store,
            self.allocator.object_id(),
            HandleOptions::default(),
            None,
        )
        .await?;

        // Move all of the existing layers to the graveyard.
        for object_id in &info.layers {
            root_store.add_to_graveyard(&mut transaction, *object_id);
        }

        // Take `marked_for_deletion` out of the info before serializing, so it is not written.
        // It is used below to clear the in-memory sets once the transaction commits.
        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);

        info.layers = vec![layer_object_handle.object_id()];

        info.serialize_with_version(&mut serialized_info)?;

        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;

        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
            layer_object_handle.get_size(),
        ));

        // `EndFlush` goes in the same transaction that writes the new `AllocatorInfo`.
        transaction.add_with_object(
            self.allocator.object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );
        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());

        let layers = layers_from_handles([layer_object_handle]).await?;
        transaction
            .commit_with_callback(|_| {
                self.allocator.tree.set_layers(layers);

                // The new layer is committed, so switch to it and clear the owners that were
                // marked for deletion when this flush started.
                let mut inner = self.allocator.inner.lock();
                inner.info.layers = info.layers;
                for owner_id in marked_for_deletion {
                    inner.marked_for_deletion.remove(&owner_id);
                    inner.info.marked_for_deletion.remove(&owner_id);
                }
            })
            .await?;

        // Close the old layers and tombstone their backing objects.
        for layer in layer_set.layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                root_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        let mut counters = self.allocator.counters.lock();
        counters.num_flushes += 1;
        counters.last_flush_time = Some(std::time::SystemTime::now());
        Ok(self.allocator.tree.get_earliest_version())
    }
}
2174
2175#[cfg(test)]
2176mod tests {
2177 use crate::filesystem::{
2178 FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2179 };
2180 use crate::fsck::fsck;
2181 use crate::lsm_tree::cache::NullCache;
2182 use crate::lsm_tree::skip_list_layer::SkipListLayer;
2183 use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
2184 use crate::lsm_tree::{LSMTree, Query};
2185 use crate::object_handle::ObjectHandle;
2186 use crate::object_store::allocator::merge::merge;
2187 use crate::object_store::allocator::{
2188 Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2189 };
2190 use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
2191 use crate::object_store::volume::root_volume;
2192 use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
2193 use crate::range::RangeExt;
2194 use crate::round::round_up;
2195 use crate::serialized_types::{LATEST_VERSION, Versioned};
2196 use crate::testing;
2197 use bincode::Options as _;
2198 use fuchsia_async as fasync;
2199 use fuchsia_sync::Mutex;
2200 use std::cmp::{max, min};
2201 use std::ops::{Bound, Range};
2202 use std::sync::Arc;
2203 use storage_device::DeviceHolder;
2204 use storage_device::fake_device::FakeDevice;
2205
2206 #[test]
2207 fn test_allocator_key_is_range_based() {
2208 assert!(AllocatorKey { device_range: (0..100).into() }.is_range_key());
2210 }
2211
2212 #[test]
2213 fn test_allocator_key_serialization_compatibility() {
2214 let range = 1024..2048;
2215 let key = AllocatorKey { device_range: range.clone().into() };
2216
2217 let options = bincode::DefaultOptions::new().allow_trailing_bytes();
2220 let expected_bytes = options.serialize(&range).unwrap();
2221
2222 let mut key_bytes = Vec::new();
2224 key.serialize_into(&mut key_bytes).unwrap();
2225
2226 assert_eq!(key_bytes, expected_bytes);
2228
2229 let deserialized_key =
2232 AllocatorKey::deserialize_from(&mut &expected_bytes[..], LATEST_VERSION).unwrap();
2233 assert_eq!(deserialized_key, key);
2234 }
2235
2236 #[fuchsia::test]
2237 async fn test_coalescing_iterator() {
2238 let skip_list = SkipListLayer::new(100);
2239 let items = [
2240 Item::new(
2241 AllocatorKey { device_range: (0..100 * 512).into() },
2242 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2243 ),
2244 Item::new(
2245 AllocatorKey { device_range: (100 * 512..200 * 512).into() },
2246 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2247 ),
2248 ];
2249 skip_list.insert(items[1].clone()).expect("insert error");
2250 skip_list.insert(items[0].clone()).expect("insert error");
2251 let mut iter =
2252 CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
2253 .await
2254 .expect("new failed");
2255 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2256 assert_eq!(
2257 (key, value),
2258 (
2259 &AllocatorKey { device_range: (0..200 * 512).into() },
2260 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2261 )
2262 );
2263 iter.advance().await.expect("advance failed");
2264 assert!(iter.get().is_none());
2265 }
2266
2267 #[fuchsia::test]
2268 async fn test_merge_and_coalesce_across_three_layers() {
2269 let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2270 lsm_tree
2271 .insert(Item::new(
2272 AllocatorKey { device_range: (100 * 512..200 * 512).into() },
2273 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2274 ))
2275 .expect("insert error");
2276 lsm_tree.seal();
2277 lsm_tree
2278 .insert(Item::new(
2279 AllocatorKey { device_range: (0..100 * 512).into() },
2280 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2281 ))
2282 .expect("insert error");
2283
2284 let layer_set = lsm_tree.layer_set();
2285 let mut merger = layer_set.merger();
2286 let mut iter =
2287 CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2288 .await
2289 .expect("new failed");
2290 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2291 assert_eq!(
2292 (key, value),
2293 (
2294 &AllocatorKey { device_range: (0..200 * 512).into() },
2295 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2296 )
2297 );
2298 iter.advance().await.expect("advance failed");
2299 assert!(iter.get().is_none());
2300 }
2301
2302 #[fuchsia::test]
2303 async fn test_merge_and_coalesce_wont_merge_across_object_id() {
2304 let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2305 lsm_tree
2306 .insert(Item::new(
2307 AllocatorKey { device_range: (100 * 512..200 * 512).into() },
2308 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2309 ))
2310 .expect("insert error");
2311 lsm_tree.seal();
2312 lsm_tree
2313 .insert(Item::new(
2314 AllocatorKey { device_range: (0..100 * 512).into() },
2315 AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2316 ))
2317 .expect("insert error");
2318
2319 let layer_set = lsm_tree.layer_set();
2320 let mut merger = layer_set.merger();
2321 let mut iter =
2322 CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2323 .await
2324 .expect("new failed");
2325 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2326 assert_eq!(
2327 (key, value),
2328 (
2329 &AllocatorKey { device_range: (0..100 * 512).into() },
2330 &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2331 )
2332 );
2333 iter.advance().await.expect("advance failed");
2334 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2335 assert_eq!(
2336 (key, value),
2337 (
2338 &AllocatorKey { device_range: (100 * 512..200 * 512).into() },
2339 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2340 )
2341 );
2342 iter.advance().await.expect("advance failed");
2343 assert!(iter.get().is_none());
2344 }
2345
2346 fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
2347 if a.end > b.start && a.start < b.end {
2348 min(a.end, b.end) - max(a.start, b.start)
2349 } else {
2350 0
2351 }
2352 }
2353
2354 async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
2355 let layer_set = allocator.tree.layer_set();
2356 let mut merger = layer_set.merger();
2357 let mut iter = allocator
2358 .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2359 .await
2360 .expect("build iterator");
2361 let mut allocations: Vec<Range<u64>> = Vec::new();
2362 while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2363 if let Some(r) = allocations.last() {
2364 assert!(device_range.start >= r.end);
2365 }
2366 allocations.push(device_range.clone().into());
2367 iter.advance().await.expect("advance failed");
2368 }
2369 allocations
2370 }
2371
2372 async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
2373 let layer_set = allocator.tree.layer_set();
2374 let mut merger = layer_set.merger();
2375 let mut iter = allocator
2376 .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2377 .await
2378 .expect("build iterator");
2379 let mut found = 0;
2380 while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2381 let mut l = device_range.length().expect("Invalid range");
2382 found += l;
2383 for range in expected_allocations {
2386 l -= overlap(range, device_range);
2387 if l == 0 {
2388 break;
2389 }
2390 }
2391 assert_eq!(l, 0, "range {:?} not covered by expectations", device_range);
2392 iter.advance().await.expect("advance failed");
2393 }
2394 assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
2396 }
2397
2398 async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
2399 let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
2400 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2401 let allocator = fs.allocator();
2402 (fs, allocator)
2403 }
2404
2405 #[fuchsia::test]
2406 async fn test_allocations() {
2407 const STORE_OBJECT_ID: u64 = 99;
2408 let (fs, allocator) = test_fs().await;
2409 let mut transaction = fs
2410 .root_store()
2411 .new_transaction(lock_keys![], Options::default())
2412 .await
2413 .expect("new failed");
2414 let mut device_ranges = collect_allocations(&allocator).await;
2415
2416 let expected = vec![
2418 0..4096, 4096..139264, 139264..204800, 204800..335872, 335872..401408, 524288..528384, ];
2425 assert_eq!(device_ranges, expected);
2426 device_ranges.push(
2427 allocator
2428 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2429 .await
2430 .expect("allocate failed"),
2431 );
2432 assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2433 device_ranges.push(
2434 allocator
2435 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2436 .await
2437 .expect("allocate failed"),
2438 );
2439 assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2440 assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
2441 transaction.commit().await.expect("commit failed");
2442 let mut transaction = fs
2443 .root_store()
2444 .new_transaction(lock_keys![], Options::default())
2445 .await
2446 .expect("new failed");
2447 device_ranges.push(
2448 allocator
2449 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2450 .await
2451 .expect("allocate failed"),
2452 );
2453 assert_eq!(device_ranges[7].length().expect("Invalid range"), fs.block_size());
2454 assert_eq!(overlap(&device_ranges[5], &device_ranges[7]), 0);
2455 assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
2456 transaction.commit().await.expect("commit failed");
2457
2458 check_allocations(&allocator, &device_ranges).await;
2459 }
2460
2461 #[fuchsia::test]
2462 async fn test_allocate_more_than_max_size() {
2463 const STORE_OBJECT_ID: u64 = 99;
2464 let (fs, allocator) = test_fs().await;
2465 let mut transaction = fs
2466 .root_store()
2467 .new_transaction(lock_keys![], Options::default())
2468 .await
2469 .expect("new failed");
2470 let mut device_ranges = collect_allocations(&allocator).await;
2471 device_ranges.push(
2472 allocator
2473 .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
2474 .await
2475 .expect("allocate failed"),
2476 );
2477 assert_eq!(
2478 device_ranges.last().unwrap().length().expect("Invalid range"),
2479 allocator.max_extent_size_bytes
2480 );
2481 transaction.commit().await.expect("commit failed");
2482
2483 check_allocations(&allocator, &device_ranges).await;
2484 }
2485
2486 #[fuchsia::test]
2487 async fn test_deallocations() {
2488 const STORE_OBJECT_ID: u64 = 99;
2489 let (fs, allocator) = test_fs().await;
2490 let initial_allocations = collect_allocations(&allocator).await;
2491
2492 let mut transaction = fs
2493 .root_store()
2494 .new_transaction(lock_keys![], Options::default())
2495 .await
2496 .expect("new failed");
2497 let device_range1 = allocator
2498 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2499 .await
2500 .expect("allocate failed");
2501 assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
2502 transaction.commit().await.expect("commit failed");
2503
2504 let mut transaction = fs
2505 .root_store()
2506 .new_transaction(lock_keys![], Options::default())
2507 .await
2508 .expect("new failed");
2509 allocator
2510 .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
2511 .await
2512 .expect("deallocate failed");
2513 transaction.commit().await.expect("commit failed");
2514
2515 check_allocations(&allocator, &initial_allocations).await;
2516 }
2517
2518 #[fuchsia::test]
2519 async fn test_mark_allocated() {
2520 const STORE_OBJECT_ID: u64 = 99;
2521 let (fs, allocator) = test_fs().await;
2522 let mut device_ranges = collect_allocations(&allocator).await;
2523 let range = {
2524 let mut transaction = fs
2525 .root_store()
2526 .new_transaction(lock_keys![], Options::default())
2527 .await
2528 .expect("new failed");
2529 allocator
2531 .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
2532 .await
2533 .expect("allocate failed")
2534 };
2536
2537 let mut transaction = fs
2538 .root_store()
2539 .new_transaction(lock_keys![], Options::default())
2540 .await
2541 .expect("new failed");
2542
2543 device_ranges.push(
2546 allocator
2547 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2548 .await
2549 .expect("allocate failed"),
2550 );
2551
2552 assert_eq!(device_ranges.last().unwrap().start, range.start);
2553
2554 let mut range2 = range.clone();
2556 range2.start += fs.block_size();
2557 allocator
2558 .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
2559 .expect("mark_allocated failed");
2560 device_ranges.push(range2);
2561
2562 device_ranges.push(
2564 allocator
2565 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2566 .await
2567 .expect("allocate failed"),
2568 );
2569 let last_range = device_ranges.last().unwrap();
2570 assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
2571 assert_eq!(overlap(last_range, &range), 0);
2572 transaction.commit().await.expect("commit failed");
2573
2574 check_allocations(&allocator, &device_ranges).await;
2575 }
2576
2577 #[fuchsia::test]
2578 async fn test_mark_for_deletion() {
2579 const STORE_OBJECT_ID: u64 = 99;
2580 let (fs, allocator) = test_fs().await;
2581
2582 let initial_allocated_bytes = allocator.get_allocated_bytes();
2584 let mut device_ranges = collect_allocations(&allocator).await;
2585 let mut transaction = fs
2586 .root_store()
2587 .new_transaction(lock_keys![], Options::default())
2588 .await
2589 .expect("new failed");
2590 for _ in 0..15 {
2592 device_ranges.push(
2593 allocator
2594 .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2595 .await
2596 .expect("allocate failed"),
2597 );
2598 device_ranges.push(
2599 allocator
2600 .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2601 .await
2602 .expect("allocate2 failed"),
2603 );
2604 }
2605 transaction.commit().await.expect("commit failed");
2606 check_allocations(&allocator, &device_ranges).await;
2607
2608 assert_eq!(
2609 allocator.get_allocated_bytes(),
2610 initial_allocated_bytes + fs.block_size() * 3000
2611 );
2612
2613 let mut transaction = fs
2615 .root_store()
2616 .new_transaction(lock_keys![], Options::default())
2617 .await
2618 .expect("new failed");
2619 allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
2620 transaction.commit().await.expect("commit failed");
2621
2622 assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
2624 check_allocations(&allocator, &device_ranges).await;
2625
2626 device_ranges.clear();
2629
2630 let mut transaction = fs
2631 .root_store()
2632 .new_transaction(lock_keys![], Options::default())
2633 .await
2634 .expect("new failed");
2635 let target_bytes = 1500 * fs.block_size();
2636 while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
2637 let len = std::cmp::min(
2638 target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
2639 100 * fs.block_size(),
2640 );
2641 device_ranges.push(
2642 allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
2643 );
2644 }
2645 transaction.commit().await.expect("commit failed");
2646
2647 allocator.flush().await.expect("flush failed");
2649
2650 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2654 assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes,);
2655 }
2656
2657 async fn create_file(store: &Arc<ObjectStore>, size: usize) {
2658 let root_directory =
2659 Directory::open(store, store.root_directory_object_id()).await.expect("open failed");
2660
2661 let mut transaction = store
2662 .filesystem()
2663 .root_store()
2664 .new_transaction(
2665 lock_keys![LockKey::object(
2666 store.store_object_id(),
2667 store.root_directory_object_id()
2668 )],
2669 Options::default(),
2670 )
2671 .await
2672 .expect("new_transaction failed");
2673 let file = root_directory
2674 .create_child_file(&mut transaction, &format!("foo {}", size))
2675 .await
2676 .expect("create_child_file failed");
2677 transaction.commit().await.expect("commit failed");
2678
2679 let buffer = file.allocate_buffer(size).await;
2680
2681 let mut transaction = file
2683 .new_transaction_with_options(Options {
2684 borrow_metadata_space: true,
2685 ..Default::default()
2686 })
2687 .await
2688 .expect("new_transaction_with_options failed");
2689 file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2690 transaction.commit().await.expect("commit failed");
2691 }
2692
2693 #[fuchsia::test]
2694 async fn test_replay_with_deleted_store_and_compaction() {
2695 let (fs, _) = test_fs().await;
2696
2697 const FILE_SIZE: usize = 10_000_000;
2698
2699 let mut store_id = {
2700 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2701 let store = root_vol
2702 .new_volume("vol", NewChildStoreOptions::default())
2703 .await
2704 .expect("new_volume failed");
2705
2706 create_file(&store, FILE_SIZE).await;
2707 store.store_object_id()
2708 };
2709
2710 fs.close().await.expect("close failed");
2711 let device = fs.take_device().await;
2712 device.reopen(false);
2713
2714 let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2715
2716 fs.journal().force_compact().await.expect("compact failed");
2719
2720 for _ in 0..2 {
2721 {
2722 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2723
2724 let transaction = fs
2725 .root_store()
2726 .new_transaction(
2727 lock_keys![
2728 LockKey::object(
2729 root_vol.volume_directory().store().store_object_id(),
2730 root_vol.volume_directory().object_id(),
2731 ),
2732 LockKey::flush(store_id)
2733 ],
2734 Options { borrow_metadata_space: true, ..Default::default() },
2735 )
2736 .await
2737 .expect("new_transaction failed");
2738 root_vol
2739 .delete_volume("vol", transaction, || {})
2740 .await
2741 .expect("delete_volume failed");
2742
2743 let store = root_vol
2744 .new_volume("vol", NewChildStoreOptions::default())
2745 .await
2746 .expect("new_volume failed");
2747 create_file(&store, FILE_SIZE).await;
2748 store_id = store.store_object_id();
2749 }
2750
2751 fs.close().await.expect("close failed");
2752 let device = fs.take_device().await;
2753 device.reopen(false);
2754
2755 fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2756 }
2757
2758 fsck(fs.clone()).await.expect("fsck failed");
2759 fs.close().await.expect("close failed");
2760 }
2761
2762 #[fuchsia::test(threads = 4)]
2763 async fn test_compaction_delete_race() {
2764 let (fs, _allocator) = test_fs().await;
2765
2766 {
2767 const FILE_SIZE: usize = 10_000_000;
2768
2769 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2770 let store = root_vol
2771 .new_volume("vol", NewChildStoreOptions::default())
2772 .await
2773 .expect("new_volume failed");
2774
2775 create_file(&store, FILE_SIZE).await;
2776
2777 let fs_clone = fs.clone();
2779
2780 let executor_tasks = testing::force_executor_threads_to_run(4).await;
2783
2784 let task = fasync::Task::spawn(async move {
2785 fs_clone.journal().force_compact().await.expect("compact failed");
2786 });
2787
2788 drop(executor_tasks);
2790
2791 let sleep = rand::random_range(3000..6000);
2794 std::thread::sleep(std::time::Duration::from_micros(sleep));
2795 log::info!("sleep {sleep}us");
2796
2797 let transaction = fs
2798 .root_store()
2799 .new_transaction(
2800 lock_keys![
2801 LockKey::object(
2802 root_vol.volume_directory().store().store_object_id(),
2803 root_vol.volume_directory().object_id(),
2804 ),
2805 LockKey::flush(store.store_object_id())
2806 ],
2807 Options { borrow_metadata_space: true, ..Default::default() },
2808 )
2809 .await
2810 .expect("new_transaction failed");
2811 root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");
2812
2813 task.await;
2814 }
2815
2816 fs.journal().force_compact().await.expect("compact failed");
2817 fs.close().await.expect("close failed");
2818
2819 let device = fs.take_device().await;
2820 device.reopen(false);
2821
2822 let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2823 fsck(fs.clone()).await.expect("fsck failed");
2824 fs.close().await.expect("close failed");
2825 }
2826
2827 #[fuchsia::test]
2828 async fn test_delete_multiple_volumes() {
2829 let (mut fs, _) = test_fs().await;
2830
2831 for _ in 0..50 {
2832 {
2833 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2834 let store = root_vol
2835 .new_volume("vol", NewChildStoreOptions::default())
2836 .await
2837 .expect("new_volume failed");
2838
2839 create_file(&store, 1_000_000).await;
2840
2841 let transaction = fs
2842 .root_store()
2843 .new_transaction(
2844 lock_keys![
2845 LockKey::object(
2846 root_vol.volume_directory().store().store_object_id(),
2847 root_vol.volume_directory().object_id(),
2848 ),
2849 LockKey::flush(store.store_object_id())
2850 ],
2851 Options { borrow_metadata_space: true, ..Default::default() },
2852 )
2853 .await
2854 .expect("new_transaction failed");
2855 root_vol
2856 .delete_volume("vol", transaction, || {})
2857 .await
2858 .expect("delete_volume failed");
2859
2860 fs.allocator().flush().await.expect("flush failed");
2861 }
2862
2863 fs.close().await.expect("close failed");
2864 let device = fs.take_device().await;
2865 device.reopen(false);
2866
2867 fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2868 }
2869
2870 fsck(fs.clone()).await.expect("fsck failed");
2871 fs.close().await.expect("close failed");
2872 }
2873
2874 #[fuchsia::test]
2875 async fn test_allocate_free_reallocate() {
2876 const STORE_OBJECT_ID: u64 = 99;
2877 let (fs, allocator) = test_fs().await;
2878
2879 let mut device_ranges = Vec::new();
2881 let mut transaction = fs
2882 .root_store()
2883 .new_transaction(lock_keys![], Options::default())
2884 .await
2885 .expect("new failed");
2886 for _ in 0..30 {
2887 device_ranges.push(
2888 allocator
2889 .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2890 .await
2891 .expect("allocate failed"),
2892 );
2893 }
2894 transaction.commit().await.expect("commit failed");
2895
2896 assert_eq!(
2897 fs.block_size() * 3000,
2898 *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2899 );
2900
2901 let mut transaction = fs
2903 .root_store()
2904 .new_transaction(lock_keys![], Options::default())
2905 .await
2906 .expect("new failed");
2907 for range in std::mem::replace(&mut device_ranges, Vec::new()) {
2908 allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
2909 }
2910 transaction.commit().await.expect("commit failed");
2911
2912 assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());
2913
2914 let mut transaction = fs
2917 .root_store()
2918 .new_transaction(lock_keys![], Options::default())
2919 .await
2920 .expect("new failed");
2921 let target_len = 1500 * fs.block_size();
2922 while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
2923 let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
2924 device_ranges.push(
2925 allocator
2926 .allocate(&mut transaction, STORE_OBJECT_ID, len)
2927 .await
2928 .expect("allocate failed"),
2929 );
2930 }
2931 transaction.commit().await.expect("commit failed");
2932
2933 assert_eq!(
2934 fs.block_size() * 1500,
2935 *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2936 );
2937 }
2938
2939 #[fuchsia::test]
2940 async fn test_flush() {
2941 const STORE_OBJECT_ID: u64 = 99;
2942
2943 let mut device_ranges = Vec::new();
2944 let device = {
2945 let (fs, allocator) = test_fs().await;
2946 let mut transaction = fs
2947 .root_store()
2948 .new_transaction(lock_keys![], Options::default())
2949 .await
2950 .expect("new failed");
2951 device_ranges.push(
2952 allocator
2953 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2954 .await
2955 .expect("allocate failed"),
2956 );
2957 device_ranges.push(
2958 allocator
2959 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2960 .await
2961 .expect("allocate failed"),
2962 );
2963 device_ranges.push(
2964 allocator
2965 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2966 .await
2967 .expect("allocate failed"),
2968 );
2969 transaction.commit().await.expect("commit failed");
2970
2971 allocator.flush().await.expect("flush failed");
2972
2973 fs.close().await.expect("close failed");
2974 fs.take_device().await
2975 };
2976
2977 device.reopen(false);
2978 let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2979 let allocator = fs.allocator();
2980
2981 let allocated = collect_allocations(&allocator).await;
2982
2983 for i in &device_ranges {
2985 let mut overlapping = 0;
2986 for j in &allocated {
2987 overlapping += overlap(i, j);
2988 }
2989 assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
2990 }
2991
2992 let mut transaction = fs
2993 .root_store()
2994 .new_transaction(lock_keys![], Options::default())
2995 .await
2996 .expect("new failed");
2997 let range = allocator
2998 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2999 .await
3000 .expect("allocate failed");
3001
3002 for r in &allocated {
3004 assert_eq!(overlap(r, &range), 0);
3005 }
3006 transaction.commit().await.expect("commit failed");
3007 }
3008
3009 #[fuchsia::test]
3010 async fn test_dropped_transaction() {
3011 const STORE_OBJECT_ID: u64 = 99;
3012 let (fs, allocator) = test_fs().await;
3013 let allocated_range = {
3014 let mut transaction = fs
3015 .root_store()
3016 .new_transaction(lock_keys![], Options::default())
3017 .await
3018 .expect("new_transaction failed");
3019 allocator
3020 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3021 .await
3022 .expect("allocate failed")
3023 };
3024 let mut transaction = fs
3027 .root_store()
3028 .new_transaction(lock_keys![], Options::default())
3029 .await
3030 .expect("new_transaction failed");
3031 assert_eq!(
3032 allocator
3033 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3034 .await
3035 .expect("allocate failed"),
3036 allocated_range
3037 );
3038 }
3039
3040 #[fuchsia::test]
3041 async fn test_cleanup_removed_owner() {
3042 const STORE_OBJECT_ID: u64 = 99;
3043 let device = {
3044 let (fs, allocator) = test_fs().await;
3045
3046 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3047 {
3048 let mut transaction = fs
3049 .root_store()
3050 .new_transaction(lock_keys![], Options::default())
3051 .await
3052 .unwrap();
3053 allocator
3054 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3055 .await
3056 .expect("Allocating");
3057 transaction.commit().await.expect("Committing.");
3058 }
3059 allocator.flush().await.expect("Flushing");
3060 assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3061 {
3062 let mut transaction = fs
3063 .root_store()
3064 .new_transaction(lock_keys![], Options::default())
3065 .await
3066 .unwrap();
3067 allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
3068 transaction.commit().await.expect("Committing.");
3069 }
3070 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3071 fs.close().await.expect("Closing");
3072 fs.take_device().await
3073 };
3074
3075 device.reopen(false);
3076 let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
3077 let allocator = fs.allocator();
3078 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3079 }
3080
3081 #[fuchsia::test]
3082 async fn test_allocated_bytes() {
3083 const STORE_OBJECT_ID: u64 = 99;
3084 let (fs, allocator) = test_fs().await;
3085
3086 let initial_allocated_bytes = allocator.get_allocated_bytes();
3087
3088 let allocated_bytes = initial_allocated_bytes + fs.block_size();
3090 let allocated_range = {
3091 let mut transaction = fs
3092 .root_store()
3093 .new_transaction(lock_keys![], Options::default())
3094 .await
3095 .expect("new_transaction failed");
3096 let range = allocator
3097 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3098 .await
3099 .expect("allocate failed");
3100 transaction.commit().await.expect("commit failed");
3101 assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3102 range
3103 };
3104
3105 {
3106 let mut transaction = fs
3107 .root_store()
3108 .new_transaction(lock_keys![], Options::default())
3109 .await
3110 .expect("new_transaction failed");
3111 allocator
3112 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3113 .await
3114 .expect("allocate failed");
3115
3116 assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3118 }
3119
3120 assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3122
3123 let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
3125 let mut transaction = fs
3126 .root_store()
3127 .new_transaction(lock_keys![], Options::default())
3128 .await
3129 .expect("new failed");
3130 allocator
3131 .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
3132 .await
3133 .expect("deallocate failed");
3134
3135 assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3137
3138 transaction.commit().await.expect("commit failed");
3139
3140 assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
3142 }
3143
3144 #[fuchsia::test]
3145 async fn test_persist_bytes_limit() {
3146 const LIMIT: u64 = 12345;
3147 const OWNER_ID: u64 = 12;
3148
3149 let (fs, allocator) = test_fs().await;
3150 {
3151 let mut transaction = fs
3152 .root_store()
3153 .new_transaction(lock_keys![], Options::default())
3154 .await
3155 .expect("new_transaction failed");
3156 allocator
3157 .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
3158 .expect("Failed to set limit.");
3159 assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
3160 transaction.commit().await.expect("Failed to commit transaction");
3161 let bytes: u64 = *allocator
3162 .inner
3163 .lock()
3164 .info
3165 .limit_bytes
3166 .get(&OWNER_ID)
3167 .expect("Failed to find limit");
3168 assert_eq!(LIMIT, bytes);
3169 }
3170 }
3171
/// Merges runs of abutting ranges into single ranges, keeping the input order.
///
/// Two consecutive ranges abut when the earlier one's `end` equals the later one's `start`.
/// Ranges that overlap or are out of order are left as they are.
fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
    let mut merged: Vec<Range<u64>> = Vec::new();
    for next in ranges {
        match merged.last_mut() {
            // `next` continues the current tail: extend the tail instead of pushing.
            Some(tail) if tail.end == next.start => tail.end = next.end,
            _ => merged.push(next),
        }
    }
    merged
}
3195
3196 #[fuchsia::test]
3197 async fn test_take_for_trimming() {
3198 const STORE_OBJECT_ID: u64 = 99;
3199
3200 let allocated_range;
3203 let expected_free_ranges;
3204 let device = {
3205 let (fs, allocator) = test_fs().await;
3206 let bs = fs.block_size();
3207 let mut transaction = fs
3208 .root_store()
3209 .new_transaction(lock_keys![], Options::default())
3210 .await
3211 .expect("new failed");
3212 allocated_range = allocator
3213 .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
3214 .await
3215 .expect("allocate failed");
3216 transaction.commit().await.expect("commit failed");
3217
3218 let mut transaction = fs
3219 .root_store()
3220 .new_transaction(lock_keys![], Options::default())
3221 .await
3222 .expect("new failed");
3223 let base = allocated_range.start;
3224 expected_free_ranges = vec![
3225 base..(base + (bs * 1)),
3226 (base + (bs * 2))..(base + (bs * 3)),
3227 (base + (bs * 4))..(base + (bs * 8)),
3231 (base + (bs * 8))..(base + (bs * 12)),
3232 (base + (bs * 12))..(base + (bs * 13)),
3233 (base + (bs * 29))..(base + (bs * 30)),
3234 ];
3235 for range in &expected_free_ranges {
3236 allocator
3237 .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
3238 .await
3239 .expect("deallocate failed");
3240 }
3241 transaction.commit().await.expect("commit failed");
3242
3243 allocator.flush().await.expect("flush failed");
3244
3245 fs.close().await.expect("close failed");
3246 fs.take_device().await
3247 };
3248
3249 device.reopen(false);
3250 let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
3251 let allocator = fs.allocator();
3252
3253 let max_extent_size = fs.block_size() as usize * 4;
3257 const EXTENTS_PER_BATCH: usize = 2;
3258 let mut free_ranges = vec![];
3259 let mut offset = allocated_range.start;
3260 while offset < allocated_range.end {
3261 let free = allocator
3262 .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
3263 .await
3264 .expect("take_for_trimming failed");
3265 free_ranges.extend(
3266 free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
3267 );
3268 offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
3269 }
3270 let coalesced_free_ranges = coalesce_ranges(free_ranges);
3273 let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);
3274
3275 assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
3276 }
3277
3278 #[fuchsia::test]
3279 async fn test_allocations_wait_for_free_extents() {
3280 const STORE_OBJECT_ID: u64 = 99;
3281 let (fs, allocator) = test_fs().await;
3282 let allocator_clone = allocator.clone();
3283
3284 let mut transaction = fs
3285 .root_store()
3286 .new_transaction(lock_keys![], Options::default())
3287 .await
3288 .expect("new failed");
3289
3290 let max_extent_size = fs.device().size() as usize;
3292 const EXTENTS_PER_BATCH: usize = usize::MAX;
3293
3294 let trim_done = Arc::new(Mutex::new(false));
3300 let trimmable_extents = allocator
3301 .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
3302 .await
3303 .expect("take_for_trimming failed");
3304
3305 let trim_done_clone = trim_done.clone();
3306 let bs = fs.block_size();
3307 let alloc_task = fasync::Task::spawn(async move {
3308 allocator_clone
3309 .allocate(&mut transaction, STORE_OBJECT_ID, bs)
3310 .await
3311 .expect("allocate should fail");
3312 {
3313 assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
3314 }
3315 transaction.commit().await.expect("commit failed");
3316 });
3317
3318 fasync::Timer::new(std::time::Duration::from_millis(100)).await;
3321
3322 {
3324 let mut trim_done = trim_done.lock();
3325 std::mem::drop(trimmable_extents);
3326 *trim_done = true;
3327 }
3328
3329 alloc_task.await;
3330 }
3331
3332 #[fuchsia::test]
3333 async fn test_allocation_with_reservation_not_multiple_of_block_size() {
3334 const STORE_OBJECT_ID: u64 = 99;
3335 let (fs, allocator) = test_fs().await;
3336
3337 const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
3339 let reservation =
3340 allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();
3341
3342 let mut transaction = fs
3343 .root_store()
3344 .new_transaction(
3345 lock_keys![],
3346 Options { allocator_reservation: Some(&reservation), ..Options::default() },
3347 )
3348 .await
3349 .expect("new failed");
3350
3351 let range = allocator
3352 .allocate(
3353 &mut transaction,
3354 STORE_OBJECT_ID,
3355 round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
3356 )
3357 .await
3358 .expect("allocate faiiled");
3359 assert_eq!((range.end - range.start) % fs.block_size(), 0);
3360
3361 println!("{}", range.end - range.start);
3362 }
3363}