pub mod merge;
pub mod strategy;

use crate::drop_event::DropEvent;
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::skip_list_layer::SkipListLayer;
use crate::lsm_tree::types::{
    FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
    OrdUpperBound, RangeKey, SortByU64, Value,
};
use crate::lsm_tree::{LSMTree, Query, compact_with_iterator, layers_from_handles};
use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
use crate::object_store::object_manager::ReservationUpdate;
use crate::object_store::transaction::{
    AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
};
use crate::object_store::{
    DataObjectHandle, DirectWriter, HandleOptions, ObjectStore, ReservedId, tree,
};
use crate::range::RangeExt;
use crate::round::{round_div, round_down, round_up};
use crate::serialized_types::{
    DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use either::Either::{Left, Right};
use event_listener::EventListener;
use fprint::TypeFingerprint;
use fuchsia_inspect::HistogramProperty;
use fuchsia_sync::Mutex;
use futures::FutureExt;
use merge::{filter_marked_for_deletion, filter_tombstones, merge};
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::hash::Hash;
use std::marker::PhantomData;
use std::num::{NonZero, Saturating};
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
/// Implemented by types that hand out reservations (such as the allocator itself) and must be
/// notified when reserved bytes are released back to them.
pub trait ReservationOwner: Send + Sync {
    /// Releases `amount` bytes back to the owner; `owner_object_id` identifies the store the
    /// reservation was attributed to, if any.
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
}

/// A reservation of space on the device. Any unused amount is released back to the owner when
/// the reservation is dropped.
pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
    owner: T,
    owner_object_id: Option<u64>,
    inner: Mutex<ReservationInner>,
    phantom: PhantomData<U>,
}

#[derive(Debug, Default)]
struct ReservationInner {
    // The total amount of space held by this reservation.
    amount: u64,

    // The portion of `amount` that has been handed out to sub-reservations (see `reserve`).
    reserved: u64,
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner.lock().fmt(f)
    }
}

impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
    pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
        Self {
            owner,
            owner_object_id,
            inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
            phantom: PhantomData,
        }
    }

    pub fn owner_object_id(&self) -> Option<u64> {
        self.owner_object_id
    }

    /// Returns the total amount held by this reservation.
    pub fn amount(&self) -> u64 {
        self.inner.lock().amount
    }

    /// Adds `amount` to this reservation.
    pub fn add(&self, amount: u64) {
        self.inner.lock().amount += amount;
    }

    /// Takes the entire amount, leaving the reservation empty so that nothing is released on
    /// drop; the caller becomes responsible for the space. Asserts that there are no outstanding
    /// sub-reservations.
    pub fn forget(&self) -> u64 {
        let mut inner = self.inner.lock();
        assert_eq!(inner.reserved, 0);
        std::mem::take(&mut inner.amount)
    }

    /// Takes `amount` out of the reservation without releasing it to the owner; the caller
    /// becomes responsible for that space.
    pub fn forget_some(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Creates a sub-reservation. The `amount` callback is passed the currently unreserved
    /// portion and returns how much to take, which may be zero.
    fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
        let mut inner = self.inner.lock();
        let taken = amount(inner.amount - inner.reserved);
        inner.reserved += taken;
        ReservationImpl::new(self, self.owner_object_id, taken)
    }

    /// Creates a sub-reservation of exactly `amount` bytes, or returns None if there is
    /// insufficient unreserved space.
    pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
        let mut inner = self.inner.lock();
        if inner.amount - inner.reserved < amount {
            None
        } else {
            inner.reserved += amount;
            Some(ReservationImpl::new(self, self.owner_object_id, amount))
        }
    }

    /// Commits a previously reserved amount, permanently consuming it from this reservation.
    pub fn commit(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.reserved -= amount;
        inner.amount -= amount;
    }

    /// Returns `amount` back to the owner.
    pub fn give_back(&self, amount: u64) {
        self.owner.borrow().release_reservation(self.owner_object_id, amount);
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Moves `amount` from this reservation to `other`, which must have the same owner. Panics
    /// if this reservation does not hold `amount`.
    pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
        &self,
        other: &ReservationImpl<V, W>,
        amount: u64,
    ) {
        assert_eq!(self.owner_object_id, other.owner_object_id());
        let mut inner = self.inner.lock();
        if let Some(amount) = inner.amount.checked_sub(amount) {
            inner.amount = amount;
        } else {
            std::mem::drop(inner);
            panic!("Insufficient reservation space");
        }
        other.add(amount);
    }
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
    fn drop(&mut self) {
        let inner = self.inner.get_mut();
        assert_eq!(inner.reserved, 0);
        let owner_object_id = self.owner_object_id;
        if inner.amount > 0 {
            self.owner
                .borrow()
                .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
        }
    }
}

impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
    for ReservationImpl<T, U>
{
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
        // Sub-reservations always carry the same owner attribution as their parent.
        assert_eq!(owner_object_id, self.owner_object_id);
        let mut inner = self.inner.lock();
        assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
        inner.reserved -= amount;
    }
}

pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;

pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;
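
// A minimal usage sketch (illustrative only, not part of the module): a `Reservation` is taken
// from a `ReservationOwner` such as the allocator, sub-amounts are held and then either
// consumed or returned, and whatever remains is released back to the owner on drop.
//
//     let reservation: Reservation = allocator.clone().reserve(None, 8192).expect("no space");
//     if let Some(hold) = reservation.reserve(4096) {
//         hold.forget(); // Keep the bytes earmarked on `reservation`...
//         reservation.commit(4096); // ...and consume them once the work lands.
//     }
//     drop(reservation); // The remaining 4096 bytes go back to the allocator.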
pub type AllocatorKey = AllocatorKeyV32;

#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct AllocatorKeyV32 {
    pub device_range: Range<u64>,
}

impl SortByU64 for AllocatorKey {
    fn get_leading_u64(&self) -> u64 {
        self.device_range.start
    }
}

// The size of the buckets that device ranges are partitioned into for fuzzy hashing.
const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;

pub struct AllocatorKeyPartitionIterator {
    device_range: Range<u64>,
}

impl Iterator for AllocatorKeyPartitionIterator {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        if self.device_range.start >= self.device_range.end {
            None
        } else {
            let start = self.device_range.start;
            self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
            let end = std::cmp::min(self.device_range.start, self.device_range.end);
            let key = AllocatorKey { device_range: start..end };
            let hash = crate::stable_hash::stable_hash(key);
            Some(hash)
        }
    }
}

impl FuzzyHash for AllocatorKey {
    fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
        AllocatorKeyPartitionIterator {
            device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
                ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
        }
    }

    fn is_range_key(&self) -> bool {
        true
    }
}
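
// Illustrative only: a key whose range straddles a bucket boundary hashes into every 1 MiB
// bucket it overlaps, so a lookup for any sub-range of an extent can find it. For example, the
// key 0x0f_0000..0x20_0000 rounds out to 0x00_0000..0x20_0000 and therefore yields two hash
// values, one for the bucket 0x00_0000..0x10_0000 and one for 0x10_0000..0x20_0000.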

impl AllocatorKey {
    /// Returns a lower bound that covers all keys whose ranges could merge with this one,
    /// suitable for use with `merge_into`.
    pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
        AllocatorKey { device_range: 0..self.device_range.start }
    }
}

impl LayerKey for AllocatorKey {
    fn merge_type(&self) -> MergeType {
        MergeType::OptimizedMerge
    }
}

impl OrdUpperBound for AllocatorKey {
    fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range.end.cmp(&other.device_range.end)
    }
}

impl OrdLowerBound for AllocatorKey {
    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        // Compare by start, then by end, so overlapping and adjacent ranges sort stably.
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
    }
}

impl Ord for AllocatorKey {
    fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
    }
}

impl PartialOrd for AllocatorKey {
    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl RangeKey for AllocatorKey {
    fn overlaps(&self, other: &Self) -> bool {
        self.device_range.start < other.device_range.end
            && self.device_range.end > other.device_range.start
    }
}

pub type AllocatorValue = AllocatorValueV32;

impl Value for AllocatorValue {
    const DELETED_MARKER: Self = Self::None;
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorValueV32 {
    // A tombstone: the range is not allocated.
    None,
    // Records the absolute reference count for the range along with the store that owns it.
    Abs { count: u64, owner_object_id: u64 },
}

pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;

pub type AllocatorInfo = AllocatorInfoV32;

#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
pub struct AllocatorInfoV32 {
    /// The object IDs of the LSM tree's layer files.
    pub layers: Vec<u64>,
    /// Maps owner_object_id to the number of bytes allocated to that owner.
    pub allocated_bytes: BTreeMap<u64, u64>,
    /// Owners that have been deleted but whose extents have not yet been compacted away.
    pub marked_for_deletion: HashSet<u64>,
    /// Maps owner_object_id to a limit, in bytes, on how much space that owner may use.
    pub limit_bytes: BTreeMap<u64, u64>,
}

const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;

/// Returns the maximum extent size for a given `block_size`: allocations are capped at this
/// length so that a serialized record always fits within `DEFAULT_MAX_SERIALIZED_RECORD_SIZE`.
/// The formula reserves 64 bytes of record overhead and budgets 9 serialized bytes per block.
pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
    block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
}
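
// Illustrative arithmetic only (the real DEFAULT_MAX_SERIALIZED_RECORD_SIZE is defined in
// serialized_types, not here): if that constant were 4096 and block_size were 4096, the cap
// would be 4096 * (4096 - 64) / 9 = 1,835,008 bytes, i.e. 448 blocks per extent.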

#[derive(Default)]
struct AllocatorCounters {
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}

pub struct Allocator {
    filesystem: Weak<FxFilesystem>,
    block_size: u64,
    device_size: u64,
    object_id: u64,
    max_extent_size_bytes: u64,
    tree: LSMTree<AllocatorKey, AllocatorValue>,
    // Ranges that must be kept out of the free pool even though the LSM tree does not (yet)
    // record them as allocated: uncommitted allocations, deallocations awaiting a device flush
    // and ranges reserved for trimming. This layer is merged into iterators when scanning for
    // free space.
    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    inner: Mutex<Inner>,
    allocation_mutex: futures::lock::Mutex<()>,
    counters: Mutex<AllocatorCounters>,
    maximum_offset: AtomicU64,
    allocations_allowed: AtomicBool,
}

#[derive(Debug, Default, PartialEq)]
struct ByteTracking {
    // The number of bytes allocated in committed transactions.
    allocated_bytes: Saturating<u64>,

    // The number of bytes allocated in uncommitted (in-flight) transactions.
    uncommitted_allocated_bytes: u64,

    // The number of bytes held by outstanding reservations.
    reserved_bytes: u64,

    // The number of bytes deallocated in committed transactions that cannot be reused until the
    // device has been flushed.
    committed_deallocated_bytes: u64,
}

impl ByteTracking {
    // The total charged to the owner: committed and uncommitted allocations plus reservations.
    fn used_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
    }

    // Bytes that cannot currently be allocated from the device.
    fn unavailable_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.committed_deallocated_bytes)
    }

    // Bytes that will still be unavailable once the device has been flushed (at which point
    // committed deallocations become reusable).
    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
    }
}
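
// Illustrative only: an owner with allocated_bytes = 100, uncommitted_allocated_bytes = 20,
// reserved_bytes = 30 and committed_deallocated_bytes = 40 reports used_bytes() = 150,
// unavailable_bytes() = 160 and unavailable_after_sync_bytes() = 120; flushing the device is
// what turns the 40 committed-deallocated bytes back into free space.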

#[derive(Debug)]
struct CommittedDeallocation {
    // The journal file offset at which the deallocation was committed.
    log_file_offset: u64,
    // The deallocated device range.
    range: Range<u64>,
    // The store that owned the range.
    owner_object_id: u64,
}

struct Inner {
    info: AllocatorInfo,

    // Whether the allocator has been opened (or created). Allocations assert this.
    opened: bool,

    // Ranges whose temporary-allocation entries are no longer needed; they are removed from
    // `temporary_allocations` lazily, under the allocation lock.
    dropped_temporary_allocations: Vec<Range<u64>>,

    // Per-owner byte accounting.
    owner_bytes: BTreeMap<u64, ByteTracking>,

    // Bytes reserved without being attributed to a particular owner.
    unattributed_reserved_bytes: u64,

    // Committed deallocations, in journal-offset order, waiting for a device flush.
    committed_deallocated: VecDeque<CommittedDeallocation>,

    // Bytes set aside for the in-flight trim batch (see `take_for_trimming`).
    trim_reserved_bytes: u64,

    // Fires when the current trim batch is dropped; allocations that need that space wait on it.
    trim_listener: Option<EventListener>,

    // The free-extent allocation strategy.
    strategy: strategy::BestFit,

    // Histogram of allocation sizes in blocks (the last bucket also counts larger allocations).
    allocation_size_histogram: [u64; 64],

    // Histogram, by requested size in blocks, of allocations that forced a rebuild of the
    // strategy's free list.
    rebuild_strategy_trigger_histogram: [u64; 64],

    // Owners that are marked for deletion and whose deletion has been synced to the device, so
    // their extents can be treated as free.
    marked_for_deletion: HashSet<u64>,

    // Owners that are marked for deletion but whose deletion has not yet been synced; their
    // extents must not be reused until a sync completes.
    volumes_deleted_pending_sync: HashSet<u64>,
}

impl Inner {
    fn allocated_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.allocated_bytes;
        }
        total
    }

    fn uncommitted_allocated_bytes(&self) -> u64 {
        self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
    }

    fn reserved_bytes(&self) -> u64 {
        self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
            + self.unattributed_reserved_bytes
    }

    fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
        match self.info.limit_bytes.get(&owner_object_id) {
            Some(v) => *v,
            None => u64::MAX,
        }
    }

    fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
        let limit = self.owner_id_limit_bytes(owner_object_id);
        let used =
            self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
        (Saturating(limit) - used).0
    }

    // The sum of `unavailable_bytes` across all owners.
    fn unavailable_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.unavailable_bytes();
        }
        total
    }

    // The sum of `used_bytes` across all owners, plus unattributed reservations.
    fn used_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.used_bytes();
        }
        total + Saturating(self.unattributed_reserved_bytes)
    }

    // The sum of `unavailable_after_sync_bytes` across all owners.
    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.unavailable_after_sync_bytes();
        }
        total
    }

    // Bytes that will be free after a device flush, excluding bytes reserved for trimming.
    // Fails if the accounting exceeds the device size, which indicates corruption.
    fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
        device_size
            .checked_sub(
                (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
            )
            .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
    }

    fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
        match owner_object_id {
            Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
            None => self.unattributed_reserved_bytes += amount,
        };
    }

    fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
        match owner_object_id {
            Some(owner) => {
                let owner_entry = self.owner_bytes.entry(owner).or_default();
                assert!(
                    owner_entry.reserved_bytes >= amount,
                    "{} >= {}",
                    owner_entry.reserved_bytes,
                    amount
                );
                owner_entry.reserved_bytes -= amount;
            }
            None => {
                assert!(
                    self.unattributed_reserved_bytes >= amount,
                    "{} >= {}",
                    self.unattributed_reserved_bytes,
                    amount
                );
                self.unattributed_reserved_bytes -= amount
            }
        };
    }
}
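
// Illustrative only: reservation accounting is two-sided. `add_reservation(Some(id), n)` bumps
// that owner's `reserved_bytes` (which `used_bytes` counts against the owner's limit), while
// `add_reservation(None, n)` bumps `unattributed_reserved_bytes`; `disown_reservation` below
// moves bytes from the former to the latter when an owner goes away.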

/// A batch of extents that are being trimmed. While this object lives, the extents it holds are
/// recorded in `temporary_allocations` so they cannot be handed out; when it is dropped, the
/// extents are freed back into the allocator and the trim reservation is released.
pub struct TrimmableExtents<'a> {
    allocator: &'a Allocator,
    extents: Vec<Range<u64>>,
    // Dropping this signals the EventListener returned from `new`, which waiting allocations
    // listen on.
    _drop_event: DropEvent,
}

impl<'a> TrimmableExtents<'a> {
    pub fn extents(&self) -> &Vec<Range<u64>> {
        &self.extents
    }

    // Also returns an EventListener which is signaled when this object is dropped.
    fn new(allocator: &'a Allocator) -> (Self, EventListener) {
        let drop_event = DropEvent::new();
        let listener = drop_event.listen();
        (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
    }

    fn add_extent(&mut self, extent: Range<u64>) {
        self.extents.push(extent);
    }
}

impl<'a> Drop for TrimmableExtents<'a> {
    fn drop(&mut self) {
        let mut inner = self.allocator.inner.lock();
        for device_range in std::mem::take(&mut self.extents) {
            inner.strategy.free(device_range.clone()).expect("drop trim extent");
            self.allocator
                .temporary_allocations
                .erase(&AllocatorKey { device_range: device_range.clone() });
        }
        inner.trim_reserved_bytes = 0;
    }
}
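
// Illustrative only: a trim pass repeatedly takes batches of free extents, issues trim commands
// for them, and drops the batch so the space becomes allocatable again. The device call below is
// a placeholder for whatever trim mechanism the caller uses.
//
//     let batch = allocator.take_for_trimming(offset, max_extent_size, extents_per_batch).await?;
//     for range in batch.extents() {
//         device.trim(range.clone()).await?; // Hypothetical trim call.
//     }
//     drop(batch); // Frees the extents back to the allocator and wakes waiting allocations.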

impl Allocator {
    pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
        let block_size = filesystem.block_size();
        // The allocator only deals in whole blocks; ignore any trailing partial block.
        let device_size = round_down(filesystem.device().size(), block_size);
        if device_size != filesystem.device().size() {
            warn!("Device size is not block aligned. Rounding down.");
        }
        let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
        let mut strategy = strategy::BestFit::default();
        strategy.free(0..device_size).expect("new fs");
        Allocator {
            filesystem: Arc::downgrade(&filesystem),
            block_size,
            device_size,
            object_id,
            max_extent_size_bytes,
            tree: LSMTree::new(merge, Box::new(NullCache {})),
            temporary_allocations: SkipListLayer::new(1024),
            inner: Mutex::new(Inner {
                info: AllocatorInfo::default(),
                opened: false,
                dropped_temporary_allocations: Vec::new(),
                owner_bytes: BTreeMap::new(),
                unattributed_reserved_bytes: 0,
                committed_deallocated: VecDeque::new(),
                trim_reserved_bytes: 0,
                trim_listener: None,
                strategy,
                allocation_size_histogram: [0; 64],
                rebuild_strategy_trigger_histogram: [0; 64],
                marked_for_deletion: HashSet::new(),
                volumes_deleted_pending_sync: HashSet::new(),
            }),
            allocation_mutex: futures::lock::Mutex::new(()),
            counters: Mutex::new(AllocatorCounters::default()),
            maximum_offset: AtomicU64::new(0),
            allocations_allowed: AtomicBool::new(filesystem.options().image_builder_mode.is_none()),
        }
    }

    pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
        &self.tree
    }

    /// Enables allocations; allocations are disallowed while in image-builder mode.
    pub fn enable_allocations(&self) {
        self.allocations_allowed.store(true, Ordering::SeqCst);
    }

    /// Wraps `iter` so that tombstones and extents belonging to deleted owners are filtered out.
    /// If `committed_marked_for_deletion` is true, the set of deleted owners recorded in the
    /// committed `AllocatorInfo` is used rather than the in-memory set.
    pub async fn filter(
        &self,
        iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
        committed_marked_for_deletion: bool,
    ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
        let marked_for_deletion = {
            let inner = self.inner.lock();
            if committed_marked_for_deletion {
                &inner.info.marked_for_deletion
            } else {
                &inner.marked_for_deletion
            }
            .clone()
        };
        let iter =
            filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
        Ok(iter)
    }

    /// Returns the allocation-size histogram, bucketed by size in blocks.
    pub fn allocation_size_histogram(&self) -> [u64; 64] {
        self.inner.lock().allocation_size_histogram
    }

    /// Creates the allocator's root-store object. Only used when creating a new filesystem.
    pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
        // The allocator must not already have been opened.
        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);

        let filesystem = self.filesystem.upgrade().unwrap();
        let root_store = filesystem.root_store();
        ObjectStore::create_object_with_id(
            &root_store,
            transaction,
            ReservedId::new(&root_store, NonZero::new(self.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )?;
        root_store.update_last_object_id(self.object_id());
        Ok(())
    }

    /// Opens the allocator from its serialized `AllocatorInfo`, appending the recorded layer
    /// files to the LSM tree. `on_replay_complete` must be called before allocating.
    pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
        let filesystem = self.filesystem.upgrade().unwrap();
        let root_store = filesystem.root_store();

        self.inner.lock().strategy = strategy::BestFit::default();

        let handle =
            ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
                .await
                .context("Failed to open allocator object")?;

        if handle.get_size() > 0 {
            let serialized_info = handle
                .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
                .await
                .context("Failed to read AllocatorInfo")?;
            let mut cursor = std::io::Cursor::new(serialized_info);
            let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
                .context("Failed to deserialize AllocatorInfo")?;

            let mut handles = Vec::new();
            let mut total_size = 0;
            for object_id in &info.layers {
                let handle = ObjectStore::open_object(
                    &root_store,
                    *object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .context("Failed to open allocator layer file")?;

                let size = handle.get_size();
                total_size += size;
                handles.push(handle);
            }

            {
                let mut inner = self.inner.lock();

                // Check that the recorded allocated_bytes are plausible before adopting them.
                let mut device_bytes = self.device_size;
                for (&owner_object_id, &bytes) in &info.allocated_bytes {
                    ensure!(
                        bytes <= device_bytes,
                        anyhow!(FxfsError::Inconsistent).context(format!(
                            "Allocated bytes exceeds device size: {:?}",
                            info.allocated_bytes
                        ))
                    );
                    device_bytes -= bytes;

                    inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
                        Saturating(bytes);
                }

                inner.info = info;
            }

            self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
            self.filesystem.upgrade().unwrap().object_manager().update_reservation(
                self.object_id,
                tree::reservation_amount_from_layer_size(total_size),
            );
        }

        Ok(())
    }

    /// Called when journal replay completes; rebuilds the free-space strategy and marks the
    /// allocator as opened.
    pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
        {
            let mut inner = self.inner.lock();
            inner.volumes_deleted_pending_sync.clear();
            inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
        }

        if !self.rebuild_strategy().await.context("Build free extents")? {
            // A device with no free space at all is only acceptable in read-only mode.
            if self.filesystem.upgrade().unwrap().options().read_only {
                info!("Device contains no free space (read-only mode).");
            } else {
                info!("Device contains no free space.");
                return Err(FxfsError::Inconsistent)
                    .context("Device appears to contain no free space");
            }
        }

        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
        Ok(())
    }

    /// Walks the LSM tree (merged with `temporary_allocations`) and rebuilds the strategy's free
    /// list from the gaps between allocated ranges. Returns true if the set of free ranges
    /// changed.
    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
        let mut changed = false;
        let mut layer_set = self.tree.empty_layer_set();
        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
        self.tree.add_all_layers_to_layer_set(&mut layer_set);

        let overflow_markers = self.inner.lock().strategy.overflow_markers();
        self.inner.lock().strategy.reset_overflow_markers();

        let mut to_add = Vec::new();
        let mut merger = layer_set.merger();
        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
        let mut last_offset = 0;
        while last_offset < self.device_size {
            let next_range = match iter.get() {
                None => {
                    assert!(last_offset <= self.device_size);
                    let range = last_offset..self.device_size;
                    last_offset = self.device_size;
                    iter.advance().await?;
                    range
                }
                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                    if device_range.end < last_offset {
                        iter.advance().await?;
                        continue;
                    }
                    if device_range.start <= last_offset {
                        last_offset = device_range.end;
                        iter.advance().await?;
                        continue;
                    }
                    let range = last_offset..device_range.start;
                    last_offset = device_range.end;
                    iter.advance().await?;
                    range
                }
            };
            to_add.push(next_range);
            // Apply the free ranges in batches to limit how long we hold the lock.
            if to_add.len() > 100 {
                let mut inner = self.inner.lock();
                for range in to_add.drain(..) {
                    changed |= inner.strategy.force_free(range)?;
                }
            }
        }
        let mut inner = self.inner.lock();
        for range in to_add {
            changed |= inner.strategy.force_free(range)?;
        }
        if overflow_markers != inner.strategy.overflow_markers() {
            changed = true;
        }
        Ok(changed)
    }

    /// Collects up to `extents_per_batch` free extents, each at most `max_extent_size` bytes,
    /// starting at `offset`. The extents are reserved until the returned `TrimmableExtents` is
    /// dropped, so the allocator will not hand them out while the caller trims them.
    pub async fn take_for_trimming(
        &self,
        offset: u64,
        max_extent_size: usize,
        extents_per_batch: usize,
    ) -> Result<TrimmableExtents<'_>, Error> {
        let _guard = self.allocation_mutex.lock().await;

        let (mut result, listener) = TrimmableExtents::new(self);
        let mut bytes = 0;

        // Scan for free ranges in the gaps between allocated extents, including temporary
        // allocations so in-flight ranges aren't offered up for trimming.
        let mut layer_set = self.tree.empty_layer_set();
        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
        self.tree.add_all_layers_to_layer_set(&mut layer_set);
        let mut merger = layer_set.merger();
        let mut iter = self
            .filter(
                merger.query(Query::FullRange(&AllocatorKey { device_range: offset..0 })).await?,
                false,
            )
            .await?;
        let mut last_offset = offset;
        'outer: while last_offset < self.device_size {
            let mut range = match iter.get() {
                None => {
                    assert!(last_offset <= self.device_size);
                    let range = last_offset..self.device_size;
                    last_offset = self.device_size;
                    iter.advance().await?;
                    range
                }
                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                    if device_range.end <= last_offset {
                        iter.advance().await?;
                        continue;
                    }
                    if device_range.start <= last_offset {
                        last_offset = device_range.end;
                        iter.advance().await?;
                        continue;
                    }
                    let range = last_offset..device_range.start;
                    last_offset = device_range.end;
                    iter.advance().await?;
                    range
                }
            };
            if range.start < offset {
                continue;
            }

            let mut inner = self.inner.lock();

            // Split the free range into extents no larger than max_extent_size and reserve each
            // one so it cannot be allocated while the trim is in flight.
            while range.start < range.end {
                let prefix =
                    range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
                range = prefix.end..range.end;
                bytes += prefix.length()?;
                inner.strategy.remove(prefix.clone());
                self.temporary_allocations.insert(AllocatorItem {
                    key: AllocatorKey { device_range: prefix.clone() },
                    value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
                })?;
                result.add_extent(prefix);
                if result.extents.len() == extents_per_batch {
                    break 'outer;
                }
            }
            if result.extents.len() == extents_per_batch {
                break 'outer;
            }
        }
        {
            let mut inner = self.inner.lock();

            assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
            inner.trim_listener = Some(listener);
            inner.trim_reserved_bytes = bytes;
            debug_assert!(
                (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
                    <= self.device_size
            );
        }
        Ok(result)
    }

    /// Returns the object IDs of the layer files backing the LSM tree.
    pub fn parent_objects(&self) -> Vec<u64> {
        self.inner.lock().info.layers.clone()
    }

    /// Returns all owner byte limits as (owner_object_id, bytes) pairs.
    pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
        self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
    }

    /// Returns (bytes used, byte limit) for the given owner.
    pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
        let inner = self.inner.lock();
        (
            inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
            inner.info.limit_bytes.get(&owner_object_id).copied(),
        )
    }

    /// Returns a debug-formatted dump of the per-owner byte tracking.
    pub fn owner_bytes_debug(&self) -> String {
        format!("{:?}", self.inner.lock().owner_bytes)
    }

    fn needs_sync(&self) -> bool {
        // A sync is needed once everything on the device is unavailable, because committed
        // deallocations can only be reused after a flush.
        let inner = self.inner.lock();
        inner.unavailable_bytes().0 >= self.device_size
    }

    fn is_system_store(&self, owner_object_id: u64) -> bool {
        let fs = self.filesystem.upgrade().unwrap();
        owner_object_id == fs.object_manager().root_store_object_id()
            || owner_object_id == fs.object_manager().root_parent_store_object_id()
    }

    /// Moves a reservation's attribution from `old_owner_object_id` to unattributed.
    pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
        if old_owner_object_id.is_none() || amount == 0 {
            return;
        }
        let mut inner = self.inner.lock();
        inner.remove_reservation(old_owner_object_id, amount);
        inner.add_reservation(None, amount);
    }

    /// Records lazy inspect data for the allocator under `parent`.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    let counters = this.counters.lock();
                    let root = inspector.root();
                    root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
                    root.record_uint("bytes_total", this.device_size);
                    let (allocated, reserved, used, unavailable) = {
                        let inner = this.inner.lock();
                        (
                            inner.allocated_bytes().0,
                            inner.reserved_bytes(),
                            inner.used_bytes().0,
                            inner.unavailable_bytes().0,
                        )
                    };
                    root.record_uint("bytes_allocated", allocated);
                    root.record_uint("bytes_reserved", reserved);
                    root.record_uint("bytes_used", used);
                    root.record_uint("bytes_unavailable", unavailable);

                    if let Some(x) = round_div(100 * allocated, this.device_size) {
                        root.record_uint("bytes_allocated_percent", x);
                    }
                    if let Some(x) = round_div(100 * reserved, this.device_size) {
                        root.record_uint("bytes_reserved_percent", x);
                    }
                    if let Some(x) = round_div(100 * used, this.device_size) {
                        root.record_uint("bytes_used_percent", x);
                    }
                    if let Some(x) = round_div(100 * unavailable, this.device_size) {
                        root.record_uint("bytes_unavailable_percent", x);
                    }

                    root.record_uint("num_flushes", counters.num_flushes);
                    if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
                        root.record_uint(
                            "last_flush_time_ms",
                            last_flush_time
                                .duration_since(std::time::UNIX_EPOCH)
                                .unwrap_or(std::time::Duration::ZERO)
                                .as_millis()
                                .try_into()
                                .unwrap_or(0u64),
                        );
                    }

                    let data = this.allocation_size_histogram();
                    let alloc_sizes = root.create_uint_linear_histogram(
                        "allocation_size_histogram",
                        fuchsia_inspect::LinearHistogramParams {
                            floor: 1,
                            step_size: 1,
                            buckets: 64,
                        },
                    );
                    for (i, count) in data.iter().enumerate() {
                        if i != 0 {
                            alloc_sizes.insert_multiple(i as u64, *count as usize);
                        }
                    }
                    root.record(alloc_sizes);

                    let data = this.inner.lock().rebuild_strategy_trigger_histogram;
                    let triggers = root.create_uint_linear_histogram(
                        "rebuild_strategy_triggers",
                        fuchsia_inspect::LinearHistogramParams {
                            floor: 1,
                            step_size: 1,
                            buckets: 64,
                        },
                    );
                    for (i, count) in data.iter().enumerate() {
                        if i != 0 {
                            triggers.insert_multiple(i as u64, *count as usize);
                        }
                    }
                    root.record(triggers);

                    let allocator_ref = this.clone();
                    root.record_child("lsm_tree", move |node| {
                        allocator_ref.tree.record_inspect_data(node);
                    });
                }
                Ok(inspector)
            }
            .boxed()
        });
    }

    /// Returns the largest allocated end offset seen since the allocator was opened or replayed.
    pub fn maximum_offset(&self) -> u64 {
        self.maximum_offset.load(Ordering::Relaxed)
    }
}

impl Drop for Allocator {
    fn drop(&mut self) {
        let inner = self.inner.lock();
        // All allocations should have been committed or dropped, and all reservations released,
        // by the time the allocator is dropped.
        assert_eq!(inner.uncommitted_allocated_bytes(), 0);
        assert_eq!(inner.reserved_bytes(), 0);
    }
}

#[fxfs_trace::trace]
impl Allocator {
    pub fn object_id(&self) -> u64 {
        self.object_id
    }

    /// Returns a copy of the current `AllocatorInfo`.
    pub fn info(&self) -> AllocatorInfo {
        self.inner.lock().info.clone()
    }

    /// Tries to allocate space for `len` bytes (which must be block-aligned; requests are capped
    /// at `max_extent_size_bytes`) on behalf of `owner_object_id` and returns the allocated
    /// device range. The range may be shorter than requested but is never empty; if no space is
    /// available at all, this fails with `FxfsError::NoSpace`.
    #[trace]
    pub async fn allocate(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        mut len: u64,
    ) -> Result<Range<u64>, Error> {
        ensure!(self.allocations_allowed.load(Ordering::SeqCst), FxfsError::Unavailable);
        assert_eq!(len % self.block_size, 0);
        len = std::cmp::min(len, self.max_extent_size_bytes);
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);

        // Take the allocation out of the transaction's reservation if one exists; otherwise make
        // a temporary reservation against the owner's quota and the device's free space.
        let reservation = if let Some(reservation) = transaction.allocator_reservation {
            match reservation.owner_object_id {
                None => assert!(self.is_system_store(owner_object_id)),
                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
            };
            let r = reservation
                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
            len = r.amount();
            Left(r)
        } else {
            let mut inner = self.inner.lock();
            assert!(inner.opened);
            let device_used = inner.used_bytes();
            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let limit =
                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
            len = round_down(std::cmp::min(len, limit), self.block_size);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.reserved_bytes += len;
            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
        };

        ensure!(len > 0, FxfsError::NoSpace);

        // If any volume deletions are pending a sync, sync now so their extents become reusable.
        let volumes_deleted = {
            let inner = self.inner.lock();
            (!inner.volumes_deleted_pending_sync.is_empty())
                .then(|| inner.volumes_deleted_pending_sync.clone())
        };

        if let Some(volumes_deleted) = volumes_deleted {
            self.filesystem
                .upgrade()
                .unwrap()
                .sync(SyncOptions {
                    flush_device: true,
                    precondition: Some(Box::new(|| {
                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
                    })),
                    ..Default::default()
                })
                .await?;

            {
                let mut inner = self.inner.lock();
                for owner_id in volumes_deleted {
                    inner.volumes_deleted_pending_sync.remove(&owner_id);
                    inner.marked_for_deletion.insert(owner_id);
                }
            }

            let _guard = self.allocation_mutex.lock().await;
            self.rebuild_strategy().await?;
        }

        // If all free space is tied up in committed deallocations, sync the device to release
        // it. Retry a few times in case other tasks consume the freed space first.
        #[allow(clippy::never_loop)]
        let _guard = 'sync: loop {
            for _ in 0..10 {
                {
                    let guard = self.allocation_mutex.lock().await;

                    if !self.needs_sync() {
                        break 'sync guard;
                    }
                }

                self.filesystem
                    .upgrade()
                    .unwrap()
                    .sync(SyncOptions {
                        flush_device: true,
                        precondition: Some(Box::new(|| self.needs_sync())),
                        ..Default::default()
                    })
                    .await?;
            }
            bail!(
                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
            );
        };

        // If the space we need is currently being trimmed, wait for the trim batch to be
        // dropped.
        let mut trim_listener = None;
        {
            let mut inner = self.inner.lock();
            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;

            let avail = self
                .device_size
                .checked_sub(inner.unavailable_bytes().0)
                .ok_or(FxfsError::Inconsistent)?;
            let free_and_not_being_trimmed =
                inner.bytes_available_not_being_trimmed(self.device_size)?;
            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
                debug_assert!(inner.trim_reserved_bytes > 0);
                trim_listener = std::mem::take(&mut inner.trim_listener);
            }
        }

        if let Some(listener) = trim_listener {
            listener.await;
        }

        let result = loop {
            {
                let mut inner = self.inner.lock();

                // Deferred removal of dropped temporary allocations happens here, under the
                // allocation lock.
                for device_range in inner.dropped_temporary_allocations.drain(..) {
                    self.temporary_allocations.erase(&AllocatorKey { device_range });
                }

                match inner.strategy.allocate(len) {
                    Err(FxfsError::NotFound) => {
                        // The strategy ran out of cached free extents; fall through and rebuild.
                        inner.rebuild_strategy_trigger_histogram
                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
                    }
                    Err(err) => {
                        error!(err:%; "Likely filesystem corruption.");
                        return Err(err.into());
                    }
                    Ok(x) => {
                        break x;
                    }
                }
            }
            if !self.rebuild_strategy().await? {
                error!("Cannot find additional free space. Corruption?");
                return Err(FxfsError::Inconsistent.into());
            }
        };

        debug!(device_range:? = result; "allocate");

        let len = result.length().unwrap();
        // Detach the allocated amount from whichever reservation backed it; the accounting is
        // finalized when the transaction commits (see `apply_mutation`).
        let reservation_owner = reservation.either(
            |l| {
                l.forget_some(len);
                l.owner_object_id()
            },
            |r| {
                r.forget_some(len);
                r.owner_object_id()
            },
        );

        {
            let mut inner = self.inner.lock();
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.uncommitted_allocated_bytes += len;
            // The reservation is either unattributed or attributed to this owner.
            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
            inner.remove_reservation(reservation_owner, len);
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: result.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
            })?;
        }

        let mutation =
            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());

        Ok(result)
    }
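
    // Illustrative only: a typical caller allocates inside a transaction, and the accounting
    // moves from reserved, to uncommitted, to committed as the transaction lands.
    //
    //     let mut transaction =
    //         fs.clone().new_transaction(lock_keys![], Options::default()).await?;
    //     let device_range = allocator.allocate(&mut transaction, store_id, 8192).await?;
    //     transaction.commit().await?; // `store_id` is a placeholder owner object ID.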

    /// Marks `device_range` as allocated to `owner_object_id`. Used when the caller requires a
    /// specific location rather than letting the allocator choose one.
    #[trace]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
        {
            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;

            let mut inner = self.inner.lock();
            let device_used = inner.used_bytes();
            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            ensure!(
                device_range.end <= self.device_size
                    && (Saturating(self.device_size) - device_used).0 >= len
                    && owner_id_bytes_left >= len,
                FxfsError::NoSpace
            );
            if let Some(reservation) = &mut transaction.allocator_reservation {
                // The transaction takes ownership of this hold.
                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
            }
            owner_entry.uncommitted_allocated_bytes += len;
            inner.strategy.remove(device_range.clone());
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: device_range.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
            })?;
        }
        let mutation =
            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));
        Ok(())
    }

    /// Sets the limit, in bytes, on the space `owner_object_id` may use. System stores cannot be
    /// given limits.
    pub fn set_bytes_limit(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        bytes: u64,
    ) -> Result<(), Error> {
        assert!(!self.is_system_store(owner_object_id));
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
        );
        Ok(())
    }

    /// Returns the byte limit for `owner_object_id`, if one is set.
    pub fn get_owner_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
    }

    /// Returns the number of bytes used by `owner_object_id` (committed, uncommitted and
    /// reserved).
    pub fn get_owner_bytes_used(&self, owner_object_id: u64) -> u64 {
        self.inner.lock().owner_bytes.get(&owner_object_id).map_or(0, |info| info.used_bytes().0)
    }

    /// Deallocates `dealloc_range` for `owner_object_id`, returning the number of bytes
    /// deallocated. The range cannot be reused until the deallocation has been committed and the
    /// device flushed.
    #[trace]
    pub async fn deallocate(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        dealloc_range: Range<u64>,
    ) -> Result<u64, Error> {
        debug!(device_range:? = dealloc_range; "deallocate");
        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
        let deallocated = dealloc_range.end - dealloc_range.start;
        let mutation = AllocatorMutation::Deallocate {
            device_range: dealloc_range.clone().into(),
            owner_object_id,
        };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));

        let _guard = self.allocation_mutex.lock().await;

        // Opportunistically clean up dropped temporary allocations while we hold the lock.
        let mut inner = self.inner.lock();
        for device_range in inner.dropped_temporary_allocations.drain(..) {
            self.temporary_allocations.erase(&AllocatorKey { device_range });
        }

        // Record the range as temporarily allocated so it is not handed out again before the
        // deallocation has been committed and the device flushed.
        self.temporary_allocations
            .insert(AllocatorItem {
                key: AllocatorKey { device_range: dealloc_range.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
            })
            .context("tracking deallocated")?;

        Ok(deallocated)
    }

    /// Marks all extents belonging to `owner_object_id` for deletion. The extents are reclaimed
    /// lazily: they are filtered out of iterators and dropped for good when the tree is next
    /// compacted.
    pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
        );
    }

    /// Called when the device has been flushed up to `flush_log_offset`. Deallocations committed
    /// at earlier journal offsets can now be reused.
    pub fn did_flush_device(&self, flush_log_offset: u64) {
        // Take the deallocations that are now safe to reuse.
        #[allow(clippy::never_loop)]
        let deallocs = 'deallocs_outer: loop {
            let mut inner = self.inner.lock();
            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
                if dealloc.log_file_offset >= flush_log_offset {
                    let mut deallocs = inner.committed_deallocated.split_off(index);
                    // Swap because we want the opposite of what split_off does.
                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
                    break 'deallocs_outer deallocs;
                }
            }
            break std::mem::take(&mut inner.committed_deallocated);
        };

        let mut inner = self.inner.lock();
        let mut totals = BTreeMap::<u64, u64>::new();
        for dealloc in deallocs {
            *(totals.entry(dealloc.owner_object_id).or_default()) +=
                dealloc.range.length().unwrap();
            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
            self.temporary_allocations.erase(&AllocatorKey { device_range: dealloc.range.clone() });
        }

        // Update the per-owner counters; owners deleted in the interim no longer have an entry.
        for (owner_object_id, total) in totals {
            match inner.owner_bytes.get_mut(&owner_object_id) {
                Some(counters) => counters.committed_deallocated_bytes -= total,
                None => {
                    assert!(inner.volumes_deleted_pending_sync.contains(&owner_object_id));
                }
            }
        }
    }

    /// Reserves `amount` bytes, attributed to `owner_object_id` if given. Returns None if there
    /// is insufficient free space or the owner's limit would be exceeded.
    pub fn reserve(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: u64,
    ) -> Option<Reservation> {
        {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let limit = match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            };
            if limit < amount {
                return None;
            }
            inner.add_reservation(owner_object_id, amount);
        }
        Some(Reservation::new(self, owner_object_id, amount))
    }

    /// Like `reserve`, but infallible: the `amount` callback is passed the maximum amount
    /// available and returns the amount to reserve, which may be zero.
    pub fn reserve_with(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: impl FnOnce(u64) -> u64,
    ) -> Reservation {
        let amount = {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let amount = amount(match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            });

            inner.add_reservation(owner_object_id, amount);

            amount
        };

        Reservation::new(self, owner_object_id, amount)
    }

    /// Returns the total number of allocated bytes.
    pub fn get_allocated_bytes(&self) -> u64 {
        self.inner.lock().allocated_bytes().0
    }

    /// Returns the size of the device.
    pub fn get_disk_bytes(&self) -> u64 {
        self.device_size
    }

    /// Returns the number of allocated bytes per owner.
    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
    }

    /// Returns the total number of used bytes (allocated, uncommitted and reserved).
    pub fn get_used_bytes(&self) -> Saturating<u64> {
        let inner = self.inner.lock();
        inner.used_bytes()
    }
}
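
// Illustrative only: because `reserve_with` never fails, passing the identity closure takes
// everything currently available to the given owner, which suits best-effort reservations.
//
//     let reservation = allocator.clone().reserve_with(None, |available| available);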

impl ReservationOwner for Allocator {
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
        self.inner.lock().remove_reservation(owner_object_id, amount);
    }
}

#[async_trait]
impl JournalingObject for Allocator {
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        _assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                let mut inner = self.inner.lock();
                inner.owner_bytes.remove(&owner_object_id);

                // The deleted extents are not reusable until a sync completes, so record the
                // owner as pending; `allocate` forces a sync before touching its space.
                inner.info.marked_for_deletion.insert(owner_object_id);
                inner.volumes_deleted_pending_sync.insert(owner_object_id);

                inner.info.limit_bytes.remove(&owner_object_id);
            }
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: device_range.clone().into() },
                    value: AllocatorValue::Abs { count: 1, owner_object_id },
                };
                let len = item.key.device_range.length().unwrap();
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
                let mut inner = self.inner.lock();
                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                entry.allocated_bytes += len;
                if let ApplyMode::Live(transaction) = context.mode {
                    entry.uncommitted_allocated_bytes -= len;
                    // The range now appears in the mutable layer, so the temporary-allocation
                    // entry is redundant; queue it for removal under the allocation lock.
                    inner.dropped_temporary_allocations.push(device_range.into());
                    if let Some(reservation) = transaction.allocator_reservation {
                        reservation.commit(len);
                    }
                }
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: device_range.into() },
                    value: AllocatorValue::None,
                };
                let len = item.key.device_range.length().unwrap();

                {
                    let mut inner = self.inner.lock();
                    {
                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                        entry.allocated_bytes -= len;
                        if context.mode.is_live() {
                            entry.committed_deallocated_bytes += len;
                        }
                    }
                    if context.mode.is_live() {
                        inner.committed_deallocated.push_back(CommittedDeallocation {
                            log_file_offset: context.checkpoint.file_offset,
                            range: item.key.device_range.clone(),
                            owner_object_id,
                        });
                    }
                    // Deallocating in a transaction that holds a reservation gives the freed
                    // bytes back to that reservation.
                    if let ApplyMode::Live(Transaction {
                        allocator_reservation: Some(reservation),
                        ..
                    }) = context.mode
                    {
                        inner.add_reservation(reservation.owner_object_id(), len);
                        reservation.add(len);
                    }
                }
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
            }
            Mutation::BeginFlush => {
                self.tree.seal();
                // Snapshot the committed allocated bytes; this is what gets written into the new
                // AllocatorInfo.
                let mut inner = self.inner.lock();
                let allocated_bytes =
                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
                inner.info.allocated_bytes = allocated_bytes;
            }
            Mutation::EndFlush => {}
            _ => bail!("unexpected mutation: {:?}", mutation),
        }
        Ok(())
    }

    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
        match mutation {
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                let len = device_range.length().unwrap();
                let mut inner = self.inner.lock();
                inner
                    .owner_bytes
                    .entry(owner_object_id)
                    .or_default()
                    .uncommitted_allocated_bytes -= len;
                if let Some(reservation) = transaction.allocator_reservation {
                    // Return the bytes to the reservation they were taken from.
                    let res_owner = reservation.owner_object_id();
                    inner.add_reservation(res_owner, len);
                    reservation.release_reservation(res_owner, len);
                }
                inner.strategy.free(device_range.clone().into()).expect("drop mutation");
                self.temporary_allocations
                    .erase(&AllocatorKey { device_range: device_range.into() });
            }
            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
                self.temporary_allocations
                    .erase(&AllocatorKey { device_range: device_range.into() });
            }
            _ => {}
        }
    }

    async fn flush(&self) -> Result<Version, Error> {
        let filesystem = self.filesystem.upgrade().unwrap();
        let object_manager = filesystem.object_manager();
        let earliest_version = self.tree.get_earliest_version();
        if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
            // Early exit, but still return the earliest version used by a struct in the tree.
            return Ok(earliest_version);
        }

        let fs = self.filesystem.upgrade().unwrap();
        let mut flusher = Flusher::new(self, &fs).await;
        let (new_layer_file, info) = flusher.start().await?;
        flusher.finish(new_layer_file, info).await
    }
}

/// Wraps another LayerIterator and coalesces adjacent items that have the same value into a
/// single item with an extended range. Used when writing layer files to minimize the number of
/// records.
pub struct CoalescingIterator<I> {
    iter: I,
    item: Option<AllocatorItem>,
}

impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
    pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
        let mut iter = Self { iter, item: None };
        iter.advance().await?;
        Ok(iter)
    }
}

#[async_trait]
impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
    for CoalescingIterator<I>
{
    async fn advance(&mut self) -> Result<(), Error> {
        self.item = self.iter.get().map(|x| x.cloned());
        if self.item.is_none() {
            return Ok(());
        }
        let left = self.item.as_mut().unwrap();
        loop {
            self.iter.advance().await?;
            match self.iter.get() {
                None => return Ok(()),
                Some(right) => {
                    // The ranges must be in order and non-overlapping.
                    ensure!(
                        left.key.device_range.end <= right.key.device_range.start,
                        FxfsError::Inconsistent
                    );
                    // Stop coalescing at a gap or at a change in value.
                    if left.key.device_range.end < right.key.device_range.start
                        || left.value != *right.value
                    {
                        return Ok(());
                    }
                    left.key.device_range.end = right.key.device_range.end;
                }
            }
        }
    }

    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
        self.item.as_ref().map(|x| x.as_item_ref())
    }
}
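
// Illustrative only: given adjacent records 0..100 and 100..200 that both carry
// Abs { count: 1, owner_object_id: 99 }, the iterator yields a single 0..200 record; a gap or a
// differing value (e.g. another owner) ends the run. `test_coalescing_iterator` and
// `test_merge_and_coalesce_wont_merge_across_object_id` below exercise both cases.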

struct Flusher<'a> {
    allocator: &'a Allocator,
    fs: &'a Arc<FxFilesystem>,
    _guard: WriteGuard<'a>,
}

impl<'a> Flusher<'a> {
    async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
        let keys = lock_keys![LockKey::flush(allocator.object_id())];
        Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
    }

    fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
        Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(allocator_reservation),
            ..Default::default()
        }
    }

    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
        let object_manager = self.fs.object_manager();
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
            .await?;

        let root_store = self.fs.root_store();
        let layer_object_handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions { skip_journal_checks: true, ..Default::default() },
            None,
        )
        .await?;
        // Add the new layer file to the graveyard so it is cleaned up if we crash before the
        // flush completes.
        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
        // Seal the tree, and snapshot AllocatorInfo atomically with the commit so that the
        // snapshot matches the sealed layers exactly.
        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
        let info = transaction
            .commit_with_callback(|_| self.allocator.inner.lock().info.clone())
            .await?;
        Ok((layer_object_handle, info))
    }

    async fn finish(
        self,
        layer_object_handle: DataObjectHandle<ObjectStore>,
        mut info: AllocatorInfo,
    ) -> Result<Version, Error> {
        let object_manager = self.fs.object_manager();
        let txn_options = Self::txn_options(object_manager.metadata_reservation());

        let layer_set = self.allocator.tree.immutable_layer_set();
        let total_len = layer_set.sum_len();
        {
            let start_time = std::time::Instant::now();
            let merged_layer_count = layer_set.layers.len();
            let mut merger = layer_set.merger();
            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
            let iter = CoalescingIterator::new(iter).await?;
            let bytes_written = compact_with_iterator(
                iter,
                total_len,
                DirectWriter::new(&layer_object_handle, txn_options).await,
                layer_object_handle.block_size(),
                Some(self.fs.journal().get_compaction_yielder()),
            )
            .await?;

            self.allocator.tree.report_compaction_metrics(
                bytes_written,
                start_time.elapsed(),
                merged_layer_count,
            );
        }

        let root_store = self.fs.root_store();

        let object_handle;
        let reservation_update;
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    self.allocator.object_id()
                )],
                txn_options,
            )
            .await?;
        let mut serialized_info = Vec::new();

        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
        object_handle = ObjectStore::open_object(
            &root_store,
            self.allocator.object_id(),
            HandleOptions::default(),
            None,
        )
        .await?;

        // Move the old layer files into the graveyard; they are tombstoned once the new layer
        // set has been committed.
        for object_id in &info.layers {
            root_store.add_to_graveyard(&mut transaction, *object_id);
        }

        // The compaction above filtered out extents belonging to deleted owners, so those owners
        // can be dropped from marked_for_deletion once this transaction commits.
        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);

        info.layers = vec![layer_object_handle.object_id()];

        info.serialize_with_version(&mut serialized_info)?;

        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;

        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
            layer_object_handle.get_size(),
        ));

        transaction.add_with_object(
            self.allocator.object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );
        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());

        let layers = layers_from_handles([layer_object_handle]).await?;
        transaction
            .commit_with_callback(|_| {
                self.allocator.tree.set_layers(layers);

                let mut inner = self.allocator.inner.lock();
                inner.info.layers = info.layers;
                for owner_id in marked_for_deletion {
                    inner.marked_for_deletion.remove(&owner_id);
                    inner.info.marked_for_deletion.remove(&owner_id);
                }
            })
            .await?;

        // Now that the new layer set is committed, the old layer files can be reclaimed.
        for layer in layer_set.layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                root_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        let mut counters = self.allocator.counters.lock();
        counters.num_flushes += 1;
        counters.last_flush_time = Some(std::time::SystemTime::now());
        // Return the earliest version used by a struct in the tree.
        Ok(self.allocator.tree.get_earliest_version())
    }
}
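
// Illustrative summary only: the flush protocol above spans two transactions. `start` seals the
// in-memory layer with a BeginFlush mutation and snapshots AllocatorInfo; `finish` compacts all
// sealed layers (coalescing adjacent records) into the new layer file and commits it together
// with an EndFlush mutation, after which the old layer files are tombstoned.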
2161
2162#[cfg(test)]
2163mod tests {
2164 use crate::filesystem::{
2165 FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2166 };
2167 use crate::fsck::fsck;
2168 use crate::lsm_tree::cache::NullCache;
2169 use crate::lsm_tree::skip_list_layer::SkipListLayer;
2170 use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
2171 use crate::lsm_tree::{LSMTree, Query};
2172 use crate::object_handle::ObjectHandle;
2173 use crate::object_store::allocator::merge::merge;
2174 use crate::object_store::allocator::{
2175 Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2176 };
2177 use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
2178 use crate::object_store::volume::root_volume;
2179 use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
2180 use crate::range::RangeExt;
2181 use crate::round::round_up;
2182 use crate::testing;
2183 use fuchsia_async as fasync;
2184 use fuchsia_sync::Mutex;
2185 use std::cmp::{max, min};
2186 use std::ops::{Bound, Range};
2187 use std::sync::Arc;
2188 use storage_device::DeviceHolder;
2189 use storage_device::fake_device::FakeDevice;
2190
2191 #[test]
2192 fn test_allocator_key_is_range_based() {
2193 assert!(AllocatorKey { device_range: 0..100 }.is_range_key());
2195 }
2196
2197 #[fuchsia::test]
2198 async fn test_coalescing_iterator() {
2199 let skip_list = SkipListLayer::new(100);
2200 let items = [
2201 Item::new(
2202 AllocatorKey { device_range: 0..100 },
2203 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2204 ),
2205 Item::new(
2206 AllocatorKey { device_range: 100..200 },
2207 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2208 ),
2209 ];
2210 skip_list.insert(items[1].clone()).expect("insert error");
2211 skip_list.insert(items[0].clone()).expect("insert error");
2212 let mut iter =
2213 CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
2214 .await
2215 .expect("new failed");
2216 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2217 assert_eq!(
2218 (key, value),
2219 (
2220 &AllocatorKey { device_range: 0..200 },
2221 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2222 )
2223 );
2224 iter.advance().await.expect("advance failed");
2225 assert!(iter.get().is_none());
2226 }
2227
    #[fuchsia::test]
    async fn test_merge_and_coalesce_across_three_layers() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 100..200 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 0..100 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 0..200 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 100..200 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 0..100 },
                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 0..100 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            )
        );
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 100..200 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

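    // Returns the number of bytes by which ranges `a` and `b` overlap (zero if disjoint).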
    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
        if a.end > b.start && a.start < b.end {
            min(a.end, b.end) - max(a.start, b.start)
        } else {
            0
        }
    }

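    // Reads back all allocated ranges from the allocator, asserting that they come back in
    // order and without overlaps.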
    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut allocations: Vec<Range<u64>> = Vec::new();
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            if let Some(r) = allocations.last() {
                assert!(device_range.start >= r.end);
            }
            allocations.push(device_range.clone());
            iter.advance().await.expect("advance failed");
        }
        allocations
    }

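    // Asserts that the allocator's records cover exactly the ranges in `expected_allocations`
    // (which need not be sorted).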
    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut found = 0;
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            let mut l = device_range.length().expect("Invalid range");
            found += l;
            for range in expected_allocations {
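                // Subtract out whatever part of this range is covered by an expected
                // allocation; by the end of the loop the whole range must be accounted for.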
                l -= overlap(range, device_range);
                if l == 0 {
                    break;
                }
            }
            assert_eq!(l, 0, "range {device_range:?} not covered by expectations");
            iter.advance().await.expect("advance failed");
        }
        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
    }

    async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
        let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let allocator = fs.allocator();
        (fs, allocator)
    }

    #[fuchsia::test]
    async fn test_allocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;

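        // These should be the initial allocations of a freshly formatted filesystem
        // (superblocks, journal, and initial layer files). The exact ranges depend on the
        // current on-disk layout, so this list is brittle.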
        let expected = vec![
            0..4096,
            4096..139264,
            139264..204800,
            204800..335872,
            335872..401408,
            524288..528384,
        ];
        assert_eq!(device_ranges, expected);
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(
            device_ranges.last().unwrap().length().expect("Invalid range"),
            fs.block_size()
        );
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(
            device_ranges.last().unwrap().length().expect("Invalid range"),
            fs.block_size()
        );
        assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
        transaction.commit().await.expect("commit failed");
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        let last_range = device_ranges.last().unwrap();
        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[6], last_range), 0);
        assert_eq!(overlap(&device_ranges[7], last_range), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

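    // An allocation request larger than the allocator's maximum extent size should be
    // truncated to `max_extent_size_bytes`.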
    #[fuchsia::test]
    async fn test_allocate_more_than_max_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(
            device_ranges.last().unwrap().length().expect("Invalid range"),
            allocator.max_extent_size_bytes
        );
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_deallocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let initial_allocations = collect_allocations(&allocator).await;

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let device_range1 = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");
        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
        transaction.commit().await.expect("commit failed");

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
            .await
            .expect("deallocate failed");
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &initial_allocations).await;
    }

    #[fuchsia::test]
    async fn test_mark_allocated() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut device_ranges = collect_allocations(&allocator).await;
        let range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
                .await
                .expect("allocate failed")
        };

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

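        // The transaction above was dropped without being committed, so its allocation should
        // have been rolled back; this allocation should reuse the same start offset.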
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );

        assert_eq!(device_ranges.last().unwrap().start, range.start);

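        // Mark the second block of the rolled-back range as allocated.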
        let mut range2 = range.clone();
        range2.start += fs.block_size();
        allocator
            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
            .expect("mark_allocated failed");
        device_ranges.push(range2);

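        // A fresh allocation must avoid the range that was just marked allocated.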
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        let last_range = device_ranges.last().unwrap();
        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(last_range, &range), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_mark_for_deletion() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();
        let mut device_ranges = collect_allocations(&allocator).await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
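        // Allocate a bunch of ranges owned by STORE_OBJECT_ID.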
        for _ in 0..15 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate2 failed"),
            );
        }
        transaction.commit().await.expect("commit failed");
        check_allocations(&allocator, &device_ranges).await;

        assert_eq!(
            allocator.get_allocated_bytes(),
            initial_allocated_bytes + fs.block_size() * 3000
        );

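        // Mark all of STORE_OBJECT_ID's allocations for deletion.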
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
        transaction.commit().await.expect("commit failed");

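        // The owner's bytes are deducted from the accounting immediately, but the records
        // stay in the tree until they are compacted away.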
        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
        check_allocations(&allocator, &device_ranges).await;

        device_ranges.clear();

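        // Allocate some of the freed space to a different owner (100), then flush; the flush
        // should drop the marked-for-deletion records so the old owner disappears entirely.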
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_bytes = 1500 * fs.block_size();
        while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
            let len = std::cmp::min(
                target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
                100 * fs.block_size(),
            );
            device_ranges.push(
                allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        allocator.flush().await.expect("flush failed");

        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
        assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes);
    }

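    // Creates a child file under the root directory of `store` and writes `size` bytes to it.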
    async fn create_file(store: &Arc<ObjectStore>, size: usize) {
        let root_directory =
            Directory::open(store, store.root_directory_object_id()).await.expect("open failed");

        let mut transaction = store
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let file = root_directory
            .create_child_file(&mut transaction, &format!("foo {}", size))
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        let buffer = file.allocate_buffer(size).await;

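        // Borrow metadata space so the write isn't contingent on reserving metadata space.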
        let mut transaction = file
            .new_transaction_with_options(Options {
                borrow_metadata_space: true,
                ..Default::default()
            })
            .await
            .expect("new_transaction_with_options failed");
        file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
        transaction.commit().await.expect("commit failed");
    }

    #[fuchsia::test]
    async fn test_replay_with_deleted_store_and_compaction() {
        let (fs, _) = test_fs().await;

        const FILE_SIZE: usize = 10_000_000;

        let mut store_id = {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;
            store.store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");

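        // Compact now so that the replays below start from compacted layer files.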
        fs.journal().force_compact().await.expect("compact failed");

        for _ in 0..2 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store_id)
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");
                create_file(&store, FILE_SIZE).await;
                store_id = store.store_object_id();
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test(threads = 4)]
    async fn test_compaction_delete_race() {
        let (fs, _allocator) = test_fs().await;

        {
            const FILE_SIZE: usize = 10_000_000;

            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;

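            // Race a journal compaction against deletion of the volume.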
            let fs_clone = fs.clone();

            let executor_tasks = testing::force_executor_threads_to_run(4).await;

            let task = fasync::Task::spawn(async move {
                fs_clone.journal().force_compact().await.expect("compact failed");
            });

            drop(executor_tasks);

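            // Give the compaction a random head start so the delete lands at a different
            // point in the compaction from run to run.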
            let sleep = rand::random_range(3000..6000);
            std::thread::sleep(std::time::Duration::from_micros(sleep));
            log::info!("sleep {sleep}us");

            let transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(
                            root_vol.volume_directory().store().store_object_id(),
                            root_vol.volume_directory().object_id(),
                        ),
                        LockKey::flush(store.store_object_id())
                    ],
                    Options { borrow_metadata_space: true, ..Default::default() },
                )
                .await
                .expect("new_transaction failed");
            root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");

            task.await;
        }

        fs.journal().force_compact().await.expect("compact failed");
        fs.close().await.expect("close failed");

        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test]
    async fn test_delete_multiple_volumes() {
        let (mut fs, _) = test_fs().await;

        for _ in 0..50 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");

                create_file(&store, 1_000_000).await;

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store.store_object_id())
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                fs.allocator().flush().await.expect("flush failed");
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test]
    async fn test_allocate_free_reallocate() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let mut device_ranges = Vec::new();
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for _ in 0..30 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 3000,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );

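        // Deallocate everything.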
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for range in std::mem::replace(&mut device_ranges, Vec::new()) {
            allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());

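        // Reallocate half of it; the allocator should reuse the freed space.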
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_len = 1500 * fs.block_size();
        while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
            let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, len)
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 1500,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );
    }

    #[fuchsia::test]
    async fn test_flush() {
        const STORE_OBJECT_ID: u64 = 99;

        let mut device_ranges = Vec::new();
        let device = {
            let (fs, allocator) = test_fs().await;
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

        let allocated = collect_allocations(&allocator).await;

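        // Every range allocated before the flush must still be allocated after remounting.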
        for i in &device_ranges {
            let mut overlapping = 0;
            for j in &allocated {
                overlapping += overlap(i, j);
            }
            assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
        }

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let range = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");

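        // And a fresh allocation must not overlap anything that was already allocated.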
        for r in &allocated {
            assert_eq!(overlap(r, &range), 0);
        }
        transaction.commit().await.expect("commit failed");
    }

    #[fuchsia::test]
    async fn test_dropped_transaction() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed")
        };
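        // The transaction above was dropped without committing, so its allocation should be
        // rolled back and the same range handed out again.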
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        assert_eq!(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
            allocated_range
        );
    }

    #[fuchsia::test]
    async fn test_cleanup_removed_owner() {
        const STORE_OBJECT_ID: u64 = 99;
        let device = {
            let (fs, allocator) = test_fs().await;

            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("Allocating");
                transaction.commit().await.expect("Committing.");
            }
            allocator.flush().await.expect("Flushing");
            assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
                transaction.commit().await.expect("Committing.");
            }
            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            fs.close().await.expect("Closing");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();
        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
    }

    #[fuchsia::test]
    async fn test_allocated_bytes() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();

        let allocated_bytes = initial_allocated_bytes + fs.block_size();
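        // get_allocated_bytes should reflect committed allocations only; allocations in a
        // transaction that is dropped without committing should not count.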
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");
            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
            range
        };

        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");

            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
        }

        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

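        // Deallocate the middle of the committed range, leaving 20 bytes allocated at each
        // end (hence the `+ 40` below).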
        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
            .await
            .expect("deallocate failed");

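        // The deallocated bytes are not returned to the free pool until the commit below.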
        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

        transaction.commit().await.expect("commit failed");

        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
    }

    #[fuchsia::test]
    async fn test_persist_bytes_limit() {
        const LIMIT: u64 = 12345;
        const OWNER_ID: u64 = 12;

        let (fs, allocator) = test_fs().await;
        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
                .expect("Failed to set limit.");
            assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
            transaction.commit().await.expect("Failed to commit transaction");
            let bytes: u64 = *allocator
                .inner
                .lock()
                .info
                .limit_bytes
                .get(&OWNER_ID)
                .expect("Failed to find limit");
            assert_eq!(LIMIT, bytes);
        }
    }

    fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
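        // The input ranges must already be sorted by start; adjacent ranges are merged so
        // comparisons don't depend on how the free space happened to be chunked.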
        let mut coalesced = Vec::new();
        let mut prev: Option<Range<u64>> = None;
        for range in ranges {
            if let Some(prev_range) = &mut prev {
                if range.start == prev_range.end {
                    prev_range.end = range.end;
                } else {
                    coalesced.push(prev_range.clone());
                    prev = Some(range);
                }
            } else {
                prev = Some(range);
            }
        }
        if let Some(prev_range) = prev {
            coalesced.push(prev_range);
        }
        coalesced
    }

    #[fuchsia::test]
    async fn test_take_for_trimming() {
        const STORE_OBJECT_ID: u64 = 99;

        let allocated_range;
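        // The ranges deallocated below, recorded so they can be compared against what
        // take_for_trimming returns after remounting.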
        let expected_free_ranges;
        let device = {
            let (fs, allocator) = test_fs().await;
            let bs = fs.block_size();
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocated_range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");

            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            let base = allocated_range.start;
            expected_free_ranges = vec![
                base..(base + (bs * 1)),
                (base + (bs * 2))..(base + (bs * 3)),
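                // These three ranges are contiguous, so they coalesce into a single free
                // extent larger than the max extent size used below and must be split up.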
                (base + (bs * 4))..(base + (bs * 8)),
                (base + (bs * 8))..(base + (bs * 12)),
                (base + (bs * 12))..(base + (bs * 13)),
                (base + (bs * 29))..(base + (bs * 30)),
            ];
            for range in &expected_free_ranges {
                allocator
                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
                    .await
                    .expect("deallocate failed");
            }
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

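        // Use a small maximum extent size and batch size so the free ranges come back over
        // several take_for_trimming batches.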
        let max_extent_size = fs.block_size() as usize * 4;
        const EXTENTS_PER_BATCH: usize = 2;
        let mut free_ranges = vec![];
        let mut offset = allocated_range.start;
        while offset < allocated_range.end {
            let free = allocator
                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
                .await
                .expect("take_for_trimming failed");
            free_ranges.extend(
                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
            );
            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
        }
        let coalesced_free_ranges = coalesce_ranges(free_ranges);
        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);

        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
    }

    #[fuchsia::test]
    async fn test_allocations_wait_for_free_extents() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocator_clone = allocator.clone();

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

        let max_extent_size = fs.device().size() as usize;
        const EXTENTS_PER_BATCH: usize = usize::MAX;

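        // Take every free extent on the device for trimming in a single batch. While
        // `trimmable_extents` is alive, allocations needing that space must block, so the
        // allocation below can only complete once the trim handle is dropped.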
        let trim_done = Arc::new(Mutex::new(false));
        let trimmable_extents = allocator
            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
            .await
            .expect("take_for_trimming failed");

        let trim_done_clone = trim_done.clone();
        let bs = fs.block_size();
        let alloc_task = fasync::Task::spawn(async move {
            allocator_clone
                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
                .await
                .expect("allocate failed");
            {
                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
            }
            transaction.commit().await.expect("commit failed");
        });

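        // Give the allocation task a chance to run up to the point where it blocks on the
        // in-progress trim.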
        fasync::Timer::new(std::time::Duration::from_millis(100)).await;

        {
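            // Drop the trim handle and set the flag under the same lock so the allocation
            // cannot observe one without the other.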
            let mut trim_done = trim_done.lock();
            std::mem::drop(trimmable_extents);
            *trim_done = true;
        }

        alloc_task.await;
    }

    #[fuchsia::test]
    async fn test_allocation_with_reservation_not_multiple_of_block_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
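        // The reservation is deliberately not block-aligned; the allocation drawn from it
        // must still be block-aligned.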
        let reservation =
            allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![],
                Options { allocator_reservation: Some(&reservation), ..Options::default() },
            )
            .await
            .expect("new failed");

        let range = allocator
            .allocate(
                &mut transaction,
                STORE_OBJECT_ID,
                round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
            )
            .await
            .expect("allocate failed");
        assert_eq!((range.end - range.start) % fs.block_size(), 0);

        println!("{}", range.end - range.start);
    }
}