1pub mod merge;
90pub mod strategy;
91
92use crate::drop_event::DropEvent;
93use crate::errors::FxfsError;
94use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
95use crate::log::*;
96use crate::lsm_tree::cache::NullCache;
97use crate::lsm_tree::skip_list_layer::SkipListLayer;
98use crate::lsm_tree::types::{
99 FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
100 OrdUpperBound, RangeKey, SortByU64, Value,
101};
102use crate::lsm_tree::{LSMTree, Query, layers_from_handles};
103use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
104use crate::object_store::object_manager::ReservationUpdate;
105use crate::object_store::transaction::{
106 AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
107};
108use crate::object_store::{
109 DataObjectHandle, DirectWriter, HandleOptions, ObjectStore, ReservedId, tree,
110};
111use crate::range::RangeExt;
112use crate::round::{round_div, round_down, round_up};
113use crate::serialized_types::{
114 DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
115};
116use anyhow::{Context, Error, anyhow, bail, ensure};
117use async_trait::async_trait;
118use either::Either::{Left, Right};
119use event_listener::EventListener;
120use fprint::TypeFingerprint;
121use fuchsia_inspect::HistogramProperty;
122use fuchsia_sync::Mutex;
123use futures::FutureExt;
124use merge::{filter_marked_for_deletion, filter_tombstones, merge};
125use serde::{Deserialize, Serialize};
126use std::borrow::Borrow;
127use std::collections::{BTreeMap, HashSet, VecDeque};
128use std::hash::Hash;
129use std::marker::PhantomData;
130use std::num::{NonZero, Saturating};
131use std::ops::Range;
132use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
133use std::sync::{Arc, Weak};
134
135pub trait ReservationOwner: Send + Sync {
137 fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
141}
142
143pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
151 owner: T,
152 owner_object_id: Option<u64>,
153 inner: Mutex<ReservationInner>,
154 phantom: PhantomData<U>,
155}
156
157#[derive(Debug, Default)]
158struct ReservationInner {
159 amount: u64,
161
162 reserved: u64,
164}
165
166impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 self.inner.lock().fmt(f)
169 }
170}
171
172impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
173 pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
174 Self {
175 owner,
176 owner_object_id,
177 inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
178 phantom: PhantomData,
179 }
180 }
181
182 pub fn owner_object_id(&self) -> Option<u64> {
183 self.owner_object_id
184 }
185
186 pub fn amount(&self) -> u64 {
188 self.inner.lock().amount
189 }
190
191 pub fn add(&self, amount: u64) {
193 self.inner.lock().amount += amount;
194 }
195
196 pub fn forget(&self) -> u64 {
200 let mut inner = self.inner.lock();
201 assert_eq!(inner.reserved, 0);
202 std::mem::take(&mut inner.amount)
203 }
204
205 pub fn forget_some(&self, amount: u64) {
209 let mut inner = self.inner.lock();
210 inner.amount -= amount;
211 assert!(inner.reserved <= inner.amount);
212 }
213
214 fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
217 let mut inner = self.inner.lock();
218 let taken = amount(inner.amount - inner.reserved);
219 inner.reserved += taken;
220 ReservationImpl::new(self, self.owner_object_id, taken)
221 }
222
223 pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
225 let mut inner = self.inner.lock();
226 if inner.amount - inner.reserved < amount {
227 None
228 } else {
229 inner.reserved += amount;
230 Some(ReservationImpl::new(self, self.owner_object_id, amount))
231 }
232 }
233
234 pub fn commit(&self, amount: u64) {
237 let mut inner = self.inner.lock();
238 inner.reserved -= amount;
239 inner.amount -= amount;
240 }
241
242 pub fn give_back(&self, amount: u64) {
244 self.owner.borrow().release_reservation(self.owner_object_id, amount);
245 let mut inner = self.inner.lock();
246 inner.amount -= amount;
247 assert!(inner.reserved <= inner.amount);
248 }
249
250 pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
252 &self,
253 other: &ReservationImpl<V, W>,
254 amount: u64,
255 ) {
256 assert_eq!(self.owner_object_id, other.owner_object_id());
257 let mut inner = self.inner.lock();
258 if let Some(amount) = inner.amount.checked_sub(amount) {
259 inner.amount = amount;
260 } else {
261 std::mem::drop(inner);
262 panic!("Insufficient reservation space");
263 }
264 other.add(amount);
265 }
266}
267
268impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
269 fn drop(&mut self) {
270 let inner = self.inner.get_mut();
271 assert_eq!(inner.reserved, 0);
272 let owner_object_id = self.owner_object_id;
273 if inner.amount > 0 {
274 self.owner
275 .borrow()
276 .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
277 }
278 }
279}
280
281impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
282 for ReservationImpl<T, U>
283{
284 fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
285 assert_eq!(owner_object_id, self.owner_object_id);
287 let mut inner = self.inner.lock();
288 assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
289 inner.reserved -= amount;
290 }
291}
292
293pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;
294
295pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;
296
297pub type AllocatorKey = AllocatorKeyV32;
300
301#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, TypeFingerprint, Versioned)]
302#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
303pub struct AllocatorKeyV32 {
304 pub device_range: Range<u64>,
305}
306
307impl SortByU64 for AllocatorKey {
308 fn get_leading_u64(&self) -> u64 {
309 self.device_range.start
310 }
311}
312
313const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;
314
315pub struct AllocatorKeyPartitionIterator {
316 device_range: Range<u64>,
317}
318
319impl Iterator for AllocatorKeyPartitionIterator {
320 type Item = u64;
321
322 fn next(&mut self) -> Option<Self::Item> {
323 if self.device_range.start >= self.device_range.end {
324 None
325 } else {
326 let start = self.device_range.start;
327 self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
328 let end = std::cmp::min(self.device_range.start, self.device_range.end);
329 let key = AllocatorKey { device_range: start..end };
330 let hash = crate::stable_hash::stable_hash(key);
331 Some(hash)
332 }
333 }
334}
335
336impl FuzzyHash for AllocatorKey {
337 fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
338 AllocatorKeyPartitionIterator {
339 device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
340 ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
341 }
342 }
343
344 fn is_range_key(&self) -> bool {
345 true
346 }
347}
348
349impl AllocatorKey {
350 pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
352 AllocatorKey { device_range: 0..self.device_range.start }
353 }
354}
355
356impl LayerKey for AllocatorKey {
357 fn merge_type(&self) -> MergeType {
358 MergeType::OptimizedMerge
359 }
360}
361
362impl OrdUpperBound for AllocatorKey {
363 fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
364 self.device_range.end.cmp(&other.device_range.end)
365 }
366}
367
368impl OrdLowerBound for AllocatorKey {
369 fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
370 self.device_range
375 .start
376 .cmp(&other.device_range.start)
377 .then(self.device_range.end.cmp(&other.device_range.end))
378 }
379}
380
381impl Ord for AllocatorKey {
382 fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
383 self.device_range
384 .start
385 .cmp(&other.device_range.start)
386 .then(self.device_range.end.cmp(&other.device_range.end))
387 }
388}
389
390impl PartialOrd for AllocatorKey {
391 fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
392 Some(self.cmp(other))
393 }
394}
395
396impl RangeKey for AllocatorKey {
397 fn overlaps(&self, other: &Self) -> bool {
398 self.device_range.start < other.device_range.end
399 && self.device_range.end > other.device_range.start
400 }
401}
402
403pub type AllocatorValue = AllocatorValueV32;
406impl Value for AllocatorValue {
407 const DELETED_MARKER: Self = Self::None;
408}
409
410#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
411#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
412pub enum AllocatorValueV32 {
413 None,
415 Abs { count: u64, owner_object_id: u64 },
419}
420
421pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;
422
423pub type AllocatorInfo = AllocatorInfoV32;
425
426#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
427pub struct AllocatorInfoV32 {
428 pub layers: Vec<u64>,
430 pub allocated_bytes: BTreeMap<u64, u64>,
432 pub marked_for_deletion: HashSet<u64>,
435 pub limit_bytes: BTreeMap<u64, u64>,
438}
439
440const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;
441
442pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
444 block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
449}
450
451#[derive(Default)]
452struct AllocatorCounters {
453 num_flushes: u64,
454 last_flush_time: Option<std::time::SystemTime>,
455}
456
457pub struct Allocator {
458 filesystem: Weak<FxFilesystem>,
459 block_size: u64,
460 device_size: u64,
461 object_id: u64,
462 max_extent_size_bytes: u64,
463 tree: LSMTree<AllocatorKey, AllocatorValue>,
464 temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
471 inner: Mutex<Inner>,
472 allocation_mutex: futures::lock::Mutex<()>,
473 counters: Mutex<AllocatorCounters>,
474 maximum_offset: AtomicU64,
475 allocations_allowed: AtomicBool,
476}
477
478#[derive(Debug, Default, PartialEq)]
480struct ByteTracking {
481 allocated_bytes: Saturating<u64>,
488
489 uncommitted_allocated_bytes: u64,
492
493 reserved_bytes: u64,
495
496 committed_deallocated_bytes: u64,
500}
501
502impl ByteTracking {
503 fn used_bytes(&self) -> Saturating<u64> {
506 self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
507 }
508
509 fn unavailable_bytes(&self) -> Saturating<u64> {
514 self.allocated_bytes
515 + Saturating(self.uncommitted_allocated_bytes)
516 + Saturating(self.committed_deallocated_bytes)
517 }
518
519 fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
522 self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
523 }
524}
525
526#[derive(Debug)]
527struct CommittedDeallocation {
528 log_file_offset: u64,
530 range: Range<u64>,
532 owner_object_id: u64,
534}
535
536struct Inner {
537 info: AllocatorInfo,
538
539 opened: bool,
542
543 dropped_temporary_allocations: Vec<Range<u64>>,
571
572 owner_bytes: BTreeMap<u64, ByteTracking>,
575
576 unattributed_reserved_bytes: u64,
579
580 committed_deallocated: VecDeque<CommittedDeallocation>,
582
583 trim_reserved_bytes: u64,
586
587 trim_listener: Option<EventListener>,
591
592 strategy: strategy::BestFit,
594
595 allocation_size_histogram: [u64; 64],
597 rebuild_strategy_trigger_histogram: [u64; 64],
599
600 marked_for_deletion: HashSet<u64>,
604
605 volumes_deleted_pending_sync: HashSet<u64>,
607}
608
609impl Inner {
610 fn allocated_bytes(&self) -> Saturating<u64> {
611 let mut total = Saturating(0);
612 for (_, bytes) in &self.owner_bytes {
613 total += bytes.allocated_bytes;
614 }
615 total
616 }
617
618 fn uncommitted_allocated_bytes(&self) -> u64 {
619 self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
620 }
621
622 fn reserved_bytes(&self) -> u64 {
623 self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
624 + self.unattributed_reserved_bytes
625 }
626
627 fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
628 match self.info.limit_bytes.get(&owner_object_id) {
629 Some(v) => *v,
630 None => u64::MAX,
631 }
632 }
633
634 fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
635 let limit = self.owner_id_limit_bytes(owner_object_id);
636 let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
637 (Saturating(limit) - used).0
638 }
639
640 fn unavailable_bytes(&self) -> Saturating<u64> {
645 let mut total = Saturating(0);
646 for (_, bytes) in &self.owner_bytes {
647 total += bytes.unavailable_bytes();
648 }
649 total
650 }
651
652 fn used_bytes(&self) -> Saturating<u64> {
655 let mut total = Saturating(0);
656 for (_, bytes) in &self.owner_bytes {
657 total += bytes.used_bytes();
658 }
659 total + Saturating(self.unattributed_reserved_bytes)
660 }
661
662 fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
665 let mut total = Saturating(0);
666 for (_, bytes) in &self.owner_bytes {
667 total += bytes.unavailable_after_sync_bytes();
668 }
669 total
670 }
671
672 fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
675 device_size
676 .checked_sub(
677 (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
678 )
679 .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
680 }
681
682 fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
683 match owner_object_id {
684 Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
685 None => self.unattributed_reserved_bytes += amount,
686 };
687 }
688
689 fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
690 match owner_object_id {
691 Some(owner) => {
692 let owner_entry = self.owner_bytes.entry(owner).or_default();
693 assert!(
694 owner_entry.reserved_bytes >= amount,
695 "{} >= {}",
696 owner_entry.reserved_bytes,
697 amount
698 );
699 owner_entry.reserved_bytes -= amount;
700 }
701 None => {
702 assert!(
703 self.unattributed_reserved_bytes >= amount,
704 "{} >= {}",
705 self.unattributed_reserved_bytes,
706 amount
707 );
708 self.unattributed_reserved_bytes -= amount
709 }
710 };
711 }
712}
713
714pub struct TrimmableExtents<'a> {
717 allocator: &'a Allocator,
718 extents: Vec<Range<u64>>,
719 _drop_event: DropEvent,
723}
724
725impl<'a> TrimmableExtents<'a> {
726 pub fn extents(&self) -> &Vec<Range<u64>> {
727 &self.extents
728 }
729
730 fn new(allocator: &'a Allocator) -> (Self, EventListener) {
732 let drop_event = DropEvent::new();
733 let listener = drop_event.listen();
734 (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
735 }
736
737 fn add_extent(&mut self, extent: Range<u64>) {
738 self.extents.push(extent);
739 }
740}
741
742impl<'a> Drop for TrimmableExtents<'a> {
743 fn drop(&mut self) {
744 let mut inner = self.allocator.inner.lock();
745 for device_range in std::mem::take(&mut self.extents) {
746 inner.strategy.free(device_range.clone()).expect("drop trim extent");
747 self.allocator
748 .temporary_allocations
749 .erase(&AllocatorKey { device_range: device_range.clone() });
750 }
751 inner.trim_reserved_bytes = 0;
752 }
753}
754
755impl Allocator {
756 pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
757 let block_size = filesystem.block_size();
758 let device_size = round_down(filesystem.device().size(), block_size);
760 if device_size != filesystem.device().size() {
761 warn!("Device size is not block aligned. Rounding down.");
762 }
763 let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
764 let mut strategy = strategy::BestFit::default();
765 strategy.free(0..device_size).expect("new fs");
766 Allocator {
767 filesystem: Arc::downgrade(&filesystem),
768 block_size,
769 device_size,
770 object_id,
771 max_extent_size_bytes,
772 tree: LSMTree::new(merge, Box::new(NullCache {})),
773 temporary_allocations: SkipListLayer::new(1024),
774 inner: Mutex::new(Inner {
775 info: AllocatorInfo::default(),
776 opened: false,
777 dropped_temporary_allocations: Vec::new(),
778 owner_bytes: BTreeMap::new(),
779 unattributed_reserved_bytes: 0,
780 committed_deallocated: VecDeque::new(),
781 trim_reserved_bytes: 0,
782 trim_listener: None,
783 strategy,
784 allocation_size_histogram: [0; 64],
785 rebuild_strategy_trigger_histogram: [0; 64],
786 marked_for_deletion: HashSet::new(),
787 volumes_deleted_pending_sync: HashSet::new(),
788 }),
789 allocation_mutex: futures::lock::Mutex::new(()),
790 counters: Mutex::new(AllocatorCounters::default()),
791 maximum_offset: AtomicU64::new(0),
792 allocations_allowed: AtomicBool::new(filesystem.options().image_builder_mode.is_none()),
793 }
794 }
795
796 pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
797 &self.tree
798 }
799
800 pub fn enable_allocations(&self) {
803 self.allocations_allowed.store(true, Ordering::SeqCst);
804 }
805
806 pub async fn filter(
811 &self,
812 iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
813 committed_marked_for_deletion: bool,
814 ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
815 let marked_for_deletion = {
816 let inner = self.inner.lock();
817 if committed_marked_for_deletion {
818 &inner.info.marked_for_deletion
819 } else {
820 &inner.marked_for_deletion
821 }
822 .clone()
823 };
824 let iter =
825 filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
826 Ok(iter)
827 }
828
829 pub fn allocation_size_histogram(&self) -> [u64; 64] {
833 self.inner.lock().allocation_size_histogram
834 }
835
836 pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
838 assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
841
842 let filesystem = self.filesystem.upgrade().unwrap();
843 let root_store = filesystem.root_store();
844 ObjectStore::create_object_with_id(
845 &root_store,
846 transaction,
847 ReservedId::new(&root_store, NonZero::new(self.object_id()).unwrap()),
848 HandleOptions::default(),
849 None,
850 )?;
851 root_store.update_last_object_id(self.object_id());
852 Ok(())
853 }
854
855 pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
858 let filesystem = self.filesystem.upgrade().unwrap();
859 let root_store = filesystem.root_store();
860
861 self.inner.lock().strategy = strategy::BestFit::default();
862
863 let handle =
864 ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
865 .await
866 .context("Failed to open allocator object")?;
867
868 if handle.get_size() > 0 {
869 let serialized_info = handle
870 .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
871 .await
872 .context("Failed to read AllocatorInfo")?;
873 let mut cursor = std::io::Cursor::new(serialized_info);
874 let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
875 .context("Failed to deserialize AllocatorInfo")?;
876
877 let mut handles = Vec::new();
878 let mut total_size = 0;
879 for object_id in &info.layers {
880 let handle = ObjectStore::open_object(
881 &root_store,
882 *object_id,
883 HandleOptions::default(),
884 None,
885 )
886 .await
887 .context("Failed to open allocator layer file")?;
888
889 let size = handle.get_size();
890 total_size += size;
891 handles.push(handle);
892 }
893
894 {
895 let mut inner = self.inner.lock();
896
897 let mut device_bytes = self.device_size;
899 for (&owner_object_id, &bytes) in &info.allocated_bytes {
900 ensure!(
901 bytes <= device_bytes,
902 anyhow!(FxfsError::Inconsistent).context(format!(
903 "Allocated bytes exceeds device size: {:?}",
904 info.allocated_bytes
905 ))
906 );
907 device_bytes -= bytes;
908
909 inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
910 Saturating(bytes);
911 }
912
913 inner.info = info;
914 }
915
916 self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
917 self.filesystem.upgrade().unwrap().object_manager().update_reservation(
918 self.object_id,
919 tree::reservation_amount_from_layer_size(total_size),
920 );
921 }
922
923 Ok(())
924 }
925
926 pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
927 {
929 let mut inner = self.inner.lock();
930 inner.volumes_deleted_pending_sync.clear();
931 inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
932 }
933
934 if !self.rebuild_strategy().await.context("Build free extents")? {
938 if self.filesystem.upgrade().unwrap().options().read_only {
939 info!("Device contains no free space (read-only mode).");
940 } else {
941 info!("Device contains no free space.");
942 return Err(FxfsError::Inconsistent)
943 .context("Device appears to contain no free space");
944 }
945 }
946
947 assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
948 Ok(())
949 }
950
951 async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
956 let mut changed = false;
957 let mut layer_set = self.tree.empty_layer_set();
958 layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
959 self.tree.add_all_layers_to_layer_set(&mut layer_set);
960
961 let overflow_markers = self.inner.lock().strategy.overflow_markers();
962 self.inner.lock().strategy.reset_overflow_markers();
963
964 let mut to_add = Vec::new();
965 let mut merger = layer_set.merger();
966 let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
967 let mut last_offset = 0;
968 while last_offset < self.device_size {
969 let next_range = match iter.get() {
970 None => {
971 assert!(last_offset <= self.device_size);
972 let range = last_offset..self.device_size;
973 last_offset = self.device_size;
974 iter.advance().await?;
975 range
976 }
977 Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
978 if device_range.end < last_offset {
979 iter.advance().await?;
980 continue;
981 }
982 if device_range.start <= last_offset {
983 last_offset = device_range.end;
984 iter.advance().await?;
985 continue;
986 }
987 let range = last_offset..device_range.start;
988 last_offset = device_range.end;
989 iter.advance().await?;
990 range
991 }
992 };
993 to_add.push(next_range);
994 if to_add.len() > 100 {
996 let mut inner = self.inner.lock();
997 for range in to_add.drain(..) {
998 changed |= inner.strategy.force_free(range)?;
999 }
1000 }
1001 }
1002 let mut inner = self.inner.lock();
1003 for range in to_add {
1004 changed |= inner.strategy.force_free(range)?;
1005 }
1006 if overflow_markers != inner.strategy.overflow_markers() {
1007 changed = true;
1008 }
1009 Ok(changed)
1010 }
1011
1012 pub async fn take_for_trimming(
1016 &self,
1017 offset: u64,
1018 max_extent_size: usize,
1019 extents_per_batch: usize,
1020 ) -> Result<TrimmableExtents<'_>, Error> {
1021 let _guard = self.allocation_mutex.lock().await;
1022
1023 let (mut result, listener) = TrimmableExtents::new(self);
1024 let mut bytes = 0;
1025
1026 let mut layer_set = self.tree.empty_layer_set();
1029 layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
1030 self.tree.add_all_layers_to_layer_set(&mut layer_set);
1031 let mut merger = layer_set.merger();
1032 let mut iter = self
1033 .filter(
1034 merger.query(Query::FullRange(&AllocatorKey { device_range: offset..0 })).await?,
1035 false,
1036 )
1037 .await?;
1038 let mut last_offset = offset;
1039 'outer: while last_offset < self.device_size {
1040 let mut range = match iter.get() {
1041 None => {
1042 assert!(last_offset <= self.device_size);
1043 let range = last_offset..self.device_size;
1044 last_offset = self.device_size;
1045 iter.advance().await?;
1046 range
1047 }
1048 Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
1049 if device_range.end <= last_offset {
1050 iter.advance().await?;
1051 continue;
1052 }
1053 if device_range.start <= last_offset {
1054 last_offset = device_range.end;
1055 iter.advance().await?;
1056 continue;
1057 }
1058 let range = last_offset..device_range.start;
1059 last_offset = device_range.end;
1060 iter.advance().await?;
1061 range
1062 }
1063 };
1064 if range.start < offset {
1065 continue;
1066 }
1067
1068 let mut inner = self.inner.lock();
1071
1072 while range.start < range.end {
1073 let prefix =
1074 range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
1075 range = prefix.end..range.end;
1076 bytes += prefix.length()?;
1077 inner.strategy.remove(prefix.clone());
1080 self.temporary_allocations.insert(AllocatorItem {
1081 key: AllocatorKey { device_range: prefix.clone() },
1082 value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
1083 sequence: 0,
1084 })?;
1085 result.add_extent(prefix);
1086 if result.extents.len() == extents_per_batch {
1087 break 'outer;
1088 }
1089 }
1090 if result.extents.len() == extents_per_batch {
1091 break 'outer;
1092 }
1093 }
1094 {
1095 let mut inner = self.inner.lock();
1096
1097 assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
1098 inner.trim_listener = Some(listener);
1099 inner.trim_reserved_bytes = bytes;
1100 debug_assert!(
1101 (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
1102 <= self.device_size
1103 );
1104 }
1105 Ok(result)
1106 }
1107
1108 pub fn parent_objects(&self) -> Vec<u64> {
1110 self.inner.lock().info.layers.clone()
1113 }
1114
1115 pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
1117 self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
1118 }
1119
1120 pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
1122 let inner = self.inner.lock();
1123 (
1124 inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
1125 inner.info.limit_bytes.get(&owner_object_id).copied(),
1126 )
1127 }
1128
1129 pub fn owner_bytes_debug(&self) -> String {
1131 format!("{:?}", self.inner.lock().owner_bytes)
1132 }
1133
1134 fn needs_sync(&self) -> bool {
1135 let inner = self.inner.lock();
1140 inner.unavailable_bytes().0 >= self.device_size
1141 }
1142
1143 fn is_system_store(&self, owner_object_id: u64) -> bool {
1144 let fs = self.filesystem.upgrade().unwrap();
1145 owner_object_id == fs.object_manager().root_store_object_id()
1146 || owner_object_id == fs.object_manager().root_parent_store_object_id()
1147 }
1148
1149 pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
1152 if old_owner_object_id.is_none() || amount == 0 {
1153 return;
1154 }
1155 let mut inner = self.inner.lock();
1157 inner.remove_reservation(old_owner_object_id, amount);
1158 inner.add_reservation(None, amount);
1159 }
1160
1161 pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1164 let this = Arc::downgrade(self);
1165 parent.record_lazy_child(name, move || {
1166 let this_clone = this.clone();
1167 async move {
1168 let inspector = fuchsia_inspect::Inspector::default();
1169 if let Some(this) = this_clone.upgrade() {
1170 let counters = this.counters.lock();
1171 let root = inspector.root();
1172 root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
1173 root.record_uint("bytes_total", this.device_size);
1174 let (allocated, reserved, used, unavailable) = {
1175 let inner = this.inner.lock();
1177 (
1178 inner.allocated_bytes().0,
1179 inner.reserved_bytes(),
1180 inner.used_bytes().0,
1181 inner.unavailable_bytes().0,
1182 )
1183 };
1184 root.record_uint("bytes_allocated", allocated);
1185 root.record_uint("bytes_reserved", reserved);
1186 root.record_uint("bytes_used", used);
1187 root.record_uint("bytes_unavailable", unavailable);
1188
1189 if let Some(x) = round_div(100 * allocated, this.device_size) {
1192 root.record_uint("bytes_allocated_percent", x);
1193 }
1194 if let Some(x) = round_div(100 * reserved, this.device_size) {
1195 root.record_uint("bytes_reserved_percent", x);
1196 }
1197 if let Some(x) = round_div(100 * used, this.device_size) {
1198 root.record_uint("bytes_used_percent", x);
1199 }
1200 if let Some(x) = round_div(100 * unavailable, this.device_size) {
1201 root.record_uint("bytes_unavailable_percent", x);
1202 }
1203
1204 root.record_uint("num_flushes", counters.num_flushes);
1205 if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
1206 root.record_uint(
1207 "last_flush_time_ms",
1208 last_flush_time
1209 .duration_since(std::time::UNIX_EPOCH)
1210 .unwrap_or(std::time::Duration::ZERO)
1211 .as_millis()
1212 .try_into()
1213 .unwrap_or(0u64),
1214 );
1215 }
1216
1217 let data = this.allocation_size_histogram();
1218 let alloc_sizes = root.create_uint_linear_histogram(
1219 "allocation_size_histogram",
1220 fuchsia_inspect::LinearHistogramParams {
1221 floor: 1,
1222 step_size: 1,
1223 buckets: 64,
1224 },
1225 );
1226 for (i, count) in data.iter().enumerate() {
1227 if i != 0 {
1228 alloc_sizes.insert_multiple(i as u64, *count as usize);
1229 }
1230 }
1231 root.record(alloc_sizes);
1232
1233 let data = this.inner.lock().rebuild_strategy_trigger_histogram;
1234 let triggers = root.create_uint_linear_histogram(
1235 "rebuild_strategy_triggers",
1236 fuchsia_inspect::LinearHistogramParams {
1237 floor: 1,
1238 step_size: 1,
1239 buckets: 64,
1240 },
1241 );
1242 for (i, count) in data.iter().enumerate() {
1243 if i != 0 {
1244 triggers.insert_multiple(i as u64, *count as usize);
1245 }
1246 }
1247 root.record(triggers);
1248 }
1249 Ok(inspector)
1250 }
1251 .boxed()
1252 });
1253 }
1254
1255 pub fn maximum_offset(&self) -> u64 {
1260 self.maximum_offset.load(Ordering::Relaxed)
1261 }
1262}
1263
1264impl Drop for Allocator {
1265 fn drop(&mut self) {
1266 let inner = self.inner.lock();
1267 assert_eq!(inner.uncommitted_allocated_bytes(), 0);
1269 assert_eq!(inner.reserved_bytes(), 0);
1270 }
1271}
1272
1273#[fxfs_trace::trace]
1274impl Allocator {
1275 pub fn object_id(&self) -> u64 {
1277 self.object_id
1278 }
1279
1280 pub fn info(&self) -> AllocatorInfo {
1283 self.inner.lock().info.clone()
1284 }
1285
1286 #[trace]
1294 pub async fn allocate(
1295 self: &Arc<Self>,
1296 transaction: &mut Transaction<'_>,
1297 owner_object_id: u64,
1298 mut len: u64,
1299 ) -> Result<Range<u64>, Error> {
1300 ensure!(self.allocations_allowed.load(Ordering::SeqCst), FxfsError::Unavailable);
1301 assert_eq!(len % self.block_size, 0);
1302 len = std::cmp::min(len, self.max_extent_size_bytes);
1303 debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1304
1305 let reservation = if let Some(reservation) = transaction.allocator_reservation {
1307 match reservation.owner_object_id {
1308 None => assert!(self.is_system_store(owner_object_id)),
1310 Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
1312 };
1313 let r = reservation
1315 .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
1316 len = r.amount();
1317 Left(r)
1318 } else {
1319 let mut inner = self.inner.lock();
1320 assert!(inner.opened);
1321 let device_used = inner.used_bytes();
1323 let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1324 let limit =
1326 std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
1327 len = round_down(std::cmp::min(len, limit), self.block_size);
1328 let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1329 owner_entry.reserved_bytes += len;
1330 Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
1331 };
1332
1333 ensure!(len > 0, FxfsError::NoSpace);
1334
1335 let volumes_deleted = {
1337 let inner = self.inner.lock();
1338 (!inner.volumes_deleted_pending_sync.is_empty())
1339 .then(|| inner.volumes_deleted_pending_sync.clone())
1340 };
1341
1342 if let Some(volumes_deleted) = volumes_deleted {
1343 self.filesystem
1346 .upgrade()
1347 .unwrap()
1348 .sync(SyncOptions {
1349 flush_device: true,
1350 precondition: Some(Box::new(|| {
1351 !self.inner.lock().volumes_deleted_pending_sync.is_empty()
1352 })),
1353 ..Default::default()
1354 })
1355 .await?;
1356
1357 {
1358 let mut inner = self.inner.lock();
1359 for owner_id in volumes_deleted {
1360 inner.volumes_deleted_pending_sync.remove(&owner_id);
1361 inner.marked_for_deletion.insert(owner_id);
1362 }
1363 }
1364
1365 let _guard = self.allocation_mutex.lock().await;
1366 self.rebuild_strategy().await?;
1367 }
1368
1369 #[allow(clippy::never_loop)] let _guard = 'sync: loop {
1371 for _ in 0..10 {
1373 {
1374 let guard = self.allocation_mutex.lock().await;
1375
1376 if !self.needs_sync() {
1377 break 'sync guard;
1378 }
1379 }
1380
1381 self.filesystem
1391 .upgrade()
1392 .unwrap()
1393 .sync(SyncOptions {
1394 flush_device: true,
1395 precondition: Some(Box::new(|| self.needs_sync())),
1396 ..Default::default()
1397 })
1398 .await?;
1399 }
1400 bail!(
1401 anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
1402 );
1403 };
1404
1405 let mut trim_listener = None;
1406 {
1407 let mut inner = self.inner.lock();
1408 inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;
1409
1410 let avail = self
1413 .device_size
1414 .checked_sub(inner.unavailable_bytes().0)
1415 .ok_or(FxfsError::Inconsistent)?;
1416 let free_and_not_being_trimmed =
1417 inner.bytes_available_not_being_trimmed(self.device_size)?;
1418 if free_and_not_being_trimmed < std::cmp::min(len, avail) {
1419 debug_assert!(inner.trim_reserved_bytes > 0);
1420 trim_listener = std::mem::take(&mut inner.trim_listener);
1421 }
1422 }
1423
1424 if let Some(listener) = trim_listener {
1425 listener.await;
1426 }
1427
1428 let result = loop {
1429 {
1430 let mut inner = self.inner.lock();
1431
1432 for device_range in inner.dropped_temporary_allocations.drain(..) {
1435 self.temporary_allocations.erase(&AllocatorKey { device_range });
1436 }
1437
1438 match inner.strategy.allocate(len) {
1439 Err(FxfsError::NotFound) => {
1440 inner.rebuild_strategy_trigger_histogram
1442 [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
1443 }
1444 Err(err) => {
1445 error!(err:%; "Likely filesystem corruption.");
1446 return Err(err.into());
1447 }
1448 Ok(x) => {
1449 break x;
1450 }
1451 }
1452 }
1453 if !self.rebuild_strategy().await? {
1457 error!("Cannot find additional free space. Corruption?");
1458 return Err(FxfsError::Inconsistent.into());
1459 }
1460 };
1461
1462 debug!(device_range:? = result; "allocate");
1463
1464 let len = result.length().unwrap();
1465 let reservation_owner = reservation.either(
1466 |l| {
1468 l.forget_some(len);
1469 l.owner_object_id()
1470 },
1471 |r| {
1472 r.forget_some(len);
1473 r.owner_object_id()
1474 },
1475 );
1476
1477 {
1478 let mut inner = self.inner.lock();
1479 let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1480 owner_entry.uncommitted_allocated_bytes += len;
1481 assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
1483 inner.remove_reservation(reservation_owner, len);
1484 self.temporary_allocations.insert(AllocatorItem {
1485 key: AllocatorKey { device_range: result.clone() },
1486 value: AllocatorValue::Abs { owner_object_id, count: 1 },
1487 sequence: 0,
1488 })?;
1489 }
1490
1491 let mutation =
1492 AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
1493 assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());
1494
1495 Ok(result)
1496 }
1497
1498 #[trace]
1501 pub fn mark_allocated(
1502 &self,
1503 transaction: &mut Transaction<'_>,
1504 owner_object_id: u64,
1505 device_range: Range<u64>,
1506 ) -> Result<(), Error> {
1507 debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1508 {
1509 let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;
1510
1511 let mut inner = self.inner.lock();
1512 let device_used = inner.used_bytes();
1513 let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1514 let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1515 ensure!(
1516 device_range.end <= self.device_size
1517 && (Saturating(self.device_size) - device_used).0 >= len
1518 && owner_id_bytes_left >= len,
1519 FxfsError::NoSpace
1520 );
1521 if let Some(reservation) = &mut transaction.allocator_reservation {
1522 reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
1524 }
1525 owner_entry.uncommitted_allocated_bytes += len;
1526 inner.strategy.remove(device_range.clone());
1527 self.temporary_allocations.insert(AllocatorItem {
1528 key: AllocatorKey { device_range: device_range.clone() },
1529 value: AllocatorValue::Abs { owner_object_id, count: 1 },
1530 sequence: 0,
1531 })?;
1532 }
1533 let mutation =
1534 AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
1535 transaction.add(self.object_id(), Mutation::Allocator(mutation));
1536 Ok(())
1537 }
1538
1539 pub fn set_bytes_limit(
1541 &self,
1542 transaction: &mut Transaction<'_>,
1543 owner_object_id: u64,
1544 bytes: u64,
1545 ) -> Result<(), Error> {
1546 assert!(!self.is_system_store(owner_object_id));
1548 transaction.add(
1549 self.object_id(),
1550 Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
1551 );
1552 Ok(())
1553 }
1554
1555 pub fn get_owner_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
1557 self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
1558 }
1559
1560 pub fn get_owner_bytes_used(&self, owner_object_id: u64) -> u64 {
1563 self.inner.lock().owner_bytes.get(&owner_object_id).map_or(0, |info| info.used_bytes().0)
1564 }
1565
1566 #[trace]
1568 pub async fn deallocate(
1569 &self,
1570 transaction: &mut Transaction<'_>,
1571 owner_object_id: u64,
1572 dealloc_range: Range<u64>,
1573 ) -> Result<u64, Error> {
1574 debug!(device_range:? = dealloc_range; "deallocate");
1575 ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
1576 let deallocated = dealloc_range.end - dealloc_range.start;
1580 let mutation = AllocatorMutation::Deallocate {
1581 device_range: dealloc_range.clone().into(),
1582 owner_object_id,
1583 };
1584 transaction.add(self.object_id(), Mutation::Allocator(mutation));
1585
1586 let _guard = self.allocation_mutex.lock().await;
1587
1588 let mut inner = self.inner.lock();
1600 for device_range in inner.dropped_temporary_allocations.drain(..) {
1601 self.temporary_allocations.erase(&AllocatorKey { device_range });
1602 }
1603
1604 self.temporary_allocations
1609 .insert(AllocatorItem {
1610 key: AllocatorKey { device_range: dealloc_range.clone() },
1611 value: AllocatorValue::Abs { owner_object_id, count: 1 },
1612 sequence: 0,
1613 })
1614 .context("tracking deallocated")?;
1615
1616 Ok(deallocated)
1617 }
1618
1619 pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
1636 transaction.add(
1639 self.object_id(),
1640 Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
1641 );
1642 }
1643
1644 pub fn did_flush_device(&self, flush_log_offset: u64) {
1647 #[allow(clippy::never_loop)] let deallocs = 'deallocs_outer: loop {
1652 let mut inner = self.inner.lock();
1653 for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
1654 if dealloc.log_file_offset >= flush_log_offset {
1655 let mut deallocs = inner.committed_deallocated.split_off(index);
1656 std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
1658 break 'deallocs_outer deallocs;
1659 }
1660 }
1661 break std::mem::take(&mut inner.committed_deallocated);
1662 };
1663
1664 let mut inner = self.inner.lock();
1666 let mut totals = BTreeMap::<u64, u64>::new();
1667 for dealloc in deallocs {
1668 *(totals.entry(dealloc.owner_object_id).or_default()) +=
1669 dealloc.range.length().unwrap();
1670 inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
1671 self.temporary_allocations.erase(&AllocatorKey { device_range: dealloc.range.clone() });
1672 }
1673
1674 for (owner_object_id, total) in totals {
1678 match inner.owner_bytes.get_mut(&owner_object_id) {
1679 Some(counters) => counters.committed_deallocated_bytes -= total,
1680 None => panic!("Failed to decrement for unknown owner: {}", owner_object_id),
1681 }
1682 }
1683 }
1684
1685 pub fn reserve(
1688 self: Arc<Self>,
1689 owner_object_id: Option<u64>,
1690 amount: u64,
1691 ) -> Option<Reservation> {
1692 {
1693 let mut inner = self.inner.lock();
1694
1695 let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1696
1697 let limit = match owner_object_id {
1698 Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1699 None => device_free,
1700 };
1701 if limit < amount {
1702 return None;
1703 }
1704 inner.add_reservation(owner_object_id, amount);
1705 }
1706 Some(Reservation::new(self, owner_object_id, amount))
1707 }
1708
1709 pub fn reserve_with(
1712 self: Arc<Self>,
1713 owner_object_id: Option<u64>,
1714 amount: impl FnOnce(u64) -> u64,
1715 ) -> Reservation {
1716 let amount = {
1717 let mut inner = self.inner.lock();
1718
1719 let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1720
1721 let amount = amount(match owner_object_id {
1722 Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1723 None => device_free,
1724 });
1725
1726 inner.add_reservation(owner_object_id, amount);
1727
1728 amount
1729 };
1730
1731 Reservation::new(self, owner_object_id, amount)
1732 }
1733
1734 pub fn get_allocated_bytes(&self) -> u64 {
1736 self.inner.lock().allocated_bytes().0
1737 }
1738
1739 pub fn get_disk_bytes(&self) -> u64 {
1741 self.device_size
1742 }
1743
1744 pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
1748 self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
1749 }
1750
1751 pub fn get_used_bytes(&self) -> Saturating<u64> {
1753 let inner = self.inner.lock();
1754 inner.used_bytes()
1755 }
1756}
1757
1758impl ReservationOwner for Allocator {
1759 fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
1760 self.inner.lock().remove_reservation(owner_object_id, amount);
1761 }
1762}
1763
1764#[async_trait]
1765impl JournalingObject for Allocator {
1766 fn apply_mutation(
1767 &self,
1768 mutation: Mutation,
1769 context: &ApplyContext<'_, '_>,
1770 _assoc_obj: AssocObj<'_>,
1771 ) -> Result<(), Error> {
1772 match mutation {
1773 Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
1774 let mut inner = self.inner.lock();
1775 inner.owner_bytes.remove(&owner_object_id);
1776
1777 inner.info.marked_for_deletion.insert(owner_object_id);
1782 inner.volumes_deleted_pending_sync.insert(owner_object_id);
1783
1784 inner.info.limit_bytes.remove(&owner_object_id);
1785 }
1786 Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1787 self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
1788 let item = AllocatorItem {
1789 key: AllocatorKey { device_range: device_range.clone().into() },
1790 value: AllocatorValue::Abs { count: 1, owner_object_id },
1791 sequence: context.checkpoint.file_offset,
1792 };
1793 let len = item.key.device_range.length().unwrap();
1794 let lower_bound = item.key.lower_bound_for_merge_into();
1795 self.tree.merge_into(item, &lower_bound);
1796 let mut inner = self.inner.lock();
1797 let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1798 entry.allocated_bytes += len;
1799 if let ApplyMode::Live(transaction) = context.mode {
1800 entry.uncommitted_allocated_bytes -= len;
1801 inner.dropped_temporary_allocations.push(device_range.into());
1806 if let Some(reservation) = transaction.allocator_reservation {
1807 reservation.commit(len);
1808 }
1809 }
1810 }
1811 Mutation::Allocator(AllocatorMutation::Deallocate {
1812 device_range,
1813 owner_object_id,
1814 }) => {
1815 let item = AllocatorItem {
1816 key: AllocatorKey { device_range: device_range.into() },
1817 value: AllocatorValue::None,
1818 sequence: context.checkpoint.file_offset,
1819 };
1820 let len = item.key.device_range.length().unwrap();
1821
1822 {
1823 let mut inner = self.inner.lock();
1824 {
1825 let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1826 entry.allocated_bytes -= len;
1827 if context.mode.is_live() {
1828 entry.committed_deallocated_bytes += len;
1829 }
1830 }
1831 if context.mode.is_live() {
1832 inner.committed_deallocated.push_back(CommittedDeallocation {
1833 log_file_offset: context.checkpoint.file_offset,
1834 range: item.key.device_range.clone(),
1835 owner_object_id,
1836 });
1837 }
1838 if let ApplyMode::Live(Transaction {
1839 allocator_reservation: Some(reservation),
1840 ..
1841 }) = context.mode
1842 {
1843 inner.add_reservation(reservation.owner_object_id(), len);
1844 reservation.add(len);
1845 }
1846 }
1847 let lower_bound = item.key.lower_bound_for_merge_into();
1848 self.tree.merge_into(item, &lower_bound);
1849 }
1850 Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
1851 self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
1856 }
1857 Mutation::BeginFlush => {
1858 self.tree.seal();
1859 let mut inner = self.inner.lock();
1862 let allocated_bytes =
1863 inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
1864 inner.info.allocated_bytes = allocated_bytes;
1865 }
1866 Mutation::EndFlush => {}
1867 _ => bail!("unexpected mutation: {:?}", mutation),
1868 }
1869 Ok(())
1870 }
1871
1872 fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
1873 match mutation {
1874 Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1875 let len = device_range.length().unwrap();
1876 let mut inner = self.inner.lock();
1877 inner
1878 .owner_bytes
1879 .entry(owner_object_id)
1880 .or_default()
1881 .uncommitted_allocated_bytes -= len;
1882 if let Some(reservation) = transaction.allocator_reservation {
1883 let res_owner = reservation.owner_object_id();
1884 inner.add_reservation(res_owner, len);
1885 reservation.release_reservation(res_owner, len);
1886 }
1887 inner.strategy.free(device_range.clone().into()).expect("drop mutaton");
1888 self.temporary_allocations
1889 .erase(&AllocatorKey { device_range: device_range.into() });
1890 }
1891 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
1892 self.temporary_allocations
1893 .erase(&AllocatorKey { device_range: device_range.into() });
1894 }
1895 _ => {}
1896 }
1897 }
1898
1899 async fn flush(&self) -> Result<Version, Error> {
1900 let filesystem = self.filesystem.upgrade().unwrap();
1901 let object_manager = filesystem.object_manager();
1902 let earliest_version = self.tree.get_earliest_version();
1903 if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
1904 return Ok(earliest_version);
1906 }
1907
1908 let fs = self.filesystem.upgrade().unwrap();
1909 let mut flusher = Flusher::new(self, &fs).await;
1910 let (new_layer_file, info) = flusher.start().await?;
1911 flusher.finish(new_layer_file, info).await
1912 }
1913}
1914
1915pub struct CoalescingIterator<I> {
1928 iter: I,
1929 item: Option<AllocatorItem>,
1930}
1931
1932impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
1933 pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
1934 let mut iter = Self { iter, item: None };
1935 iter.advance().await?;
1936 Ok(iter)
1937 }
1938}
1939
1940#[async_trait]
1941impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
1942 for CoalescingIterator<I>
1943{
1944 async fn advance(&mut self) -> Result<(), Error> {
1945 self.item = self.iter.get().map(|x| x.cloned());
1946 if self.item.is_none() {
1947 return Ok(());
1948 }
1949 let left = self.item.as_mut().unwrap();
1950 loop {
1951 self.iter.advance().await?;
1952 match self.iter.get() {
1953 None => return Ok(()),
1954 Some(right) => {
1955 ensure!(
1957 left.key.device_range.end <= right.key.device_range.start,
1958 FxfsError::Inconsistent
1959 );
1960 if left.key.device_range.end < right.key.device_range.start
1962 || left.value != *right.value
1963 {
1964 return Ok(());
1965 }
1966 left.key.device_range.end = right.key.device_range.end;
1967 }
1968 }
1969 }
1970 }
1971
1972 fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
1973 self.item.as_ref().map(|x| x.as_item_ref())
1974 }
1975}
1976
1977struct Flusher<'a> {
1978 allocator: &'a Allocator,
1979 fs: &'a Arc<FxFilesystem>,
1980 _guard: WriteGuard<'a>,
1981}
1982
1983impl<'a> Flusher<'a> {
1984 async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
1985 let keys = lock_keys![LockKey::flush(allocator.object_id())];
1986 Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
1987 }
1988
1989 fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
1990 Options {
1991 skip_journal_checks: true,
1992 borrow_metadata_space: true,
1993 allocator_reservation: Some(allocator_reservation),
1994 ..Default::default()
1995 }
1996 }
1997
1998 async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
1999 let object_manager = self.fs.object_manager();
2000 let mut transaction = self
2001 .fs
2002 .clone()
2003 .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
2004 .await?;
2005
2006 let root_store = self.fs.root_store();
2007 let layer_object_handle = ObjectStore::create_object(
2008 &root_store,
2009 &mut transaction,
2010 HandleOptions { skip_journal_checks: true, ..Default::default() },
2011 None,
2012 )
2013 .await?;
2014 root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
2015 transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
2022 let info = transaction
2023 .commit_with_callback(|_| {
2024 self.allocator.inner.lock().info.clone()
2027 })
2028 .await?;
2029 Ok((layer_object_handle, info))
2030 }
2031
2032 async fn finish(
2033 self,
2034 layer_object_handle: DataObjectHandle<ObjectStore>,
2035 mut info: AllocatorInfo,
2036 ) -> Result<Version, Error> {
2037 let object_manager = self.fs.object_manager();
2038 let txn_options = Self::txn_options(object_manager.metadata_reservation());
2039
2040 let layer_set = self.allocator.tree.immutable_layer_set();
2041 let total_len = layer_set.sum_len();
2042 {
2043 let mut merger = layer_set.merger();
2044 let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
2045 let iter = CoalescingIterator::new(iter).await?;
2046 self.allocator
2047 .tree
2048 .compact_with_iterator(
2049 iter,
2050 total_len,
2051 DirectWriter::new(&layer_object_handle, txn_options).await,
2052 layer_object_handle.block_size(),
2053 )
2054 .await?;
2055 }
2056
2057 let root_store = self.fs.root_store();
2058
2059 let object_handle;
2061 let reservation_update;
2062 let mut transaction = self
2063 .fs
2064 .clone()
2065 .new_transaction(
2066 lock_keys![LockKey::object(
2067 root_store.store_object_id(),
2068 self.allocator.object_id()
2069 )],
2070 txn_options,
2071 )
2072 .await?;
2073 let mut serialized_info = Vec::new();
2074
2075 debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
2076 object_handle = ObjectStore::open_object(
2077 &root_store,
2078 self.allocator.object_id(),
2079 HandleOptions::default(),
2080 None,
2081 )
2082 .await?;
2083
2084 for object_id in &info.layers {
2086 root_store.add_to_graveyard(&mut transaction, *object_id);
2087 }
2088
2089 let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);
2096
2097 info.layers = vec![layer_object_handle.object_id()];
2098
2099 info.serialize_with_version(&mut serialized_info)?;
2100
2101 let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
2102 buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
2103 object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;
2104
2105 reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
2106 layer_object_handle.get_size(),
2107 ));
2108
2109 transaction.add_with_object(
2112 self.allocator.object_id(),
2113 Mutation::EndFlush,
2114 AssocObj::Borrowed(&reservation_update),
2115 );
2116 root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());
2117
2118 let layers = layers_from_handles([layer_object_handle]).await?;
2119 transaction
2120 .commit_with_callback(|_| {
2121 self.allocator.tree.set_layers(layers);
2122
2123 let mut inner = self.allocator.inner.lock();
2127 inner.info.layers = info.layers;
2128 for owner_id in marked_for_deletion {
2129 inner.marked_for_deletion.remove(&owner_id);
2130 inner.info.marked_for_deletion.remove(&owner_id);
2131 }
2132 })
2133 .await?;
2134
2135 for layer in layer_set.layers {
2137 let object_id = layer.handle().map(|h| h.object_id());
2138 layer.close_layer().await;
2139 if let Some(object_id) = object_id {
2140 root_store.tombstone_object(object_id, txn_options).await?;
2141 }
2142 }
2143
2144 let mut counters = self.allocator.counters.lock();
2145 counters.num_flushes += 1;
2146 counters.last_flush_time = Some(std::time::SystemTime::now());
2147 Ok(self.allocator.tree.get_earliest_version())
2149 }
2150}
2151
2152#[cfg(test)]
2153mod tests {
2154 use crate::filesystem::{
2155 FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2156 };
2157 use crate::fsck::fsck;
2158 use crate::lsm_tree::cache::NullCache;
2159 use crate::lsm_tree::skip_list_layer::SkipListLayer;
2160 use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
2161 use crate::lsm_tree::{LSMTree, Query};
2162 use crate::object_handle::ObjectHandle;
2163 use crate::object_store::allocator::merge::merge;
2164 use crate::object_store::allocator::{
2165 Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2166 };
2167 use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
2168 use crate::object_store::volume::root_volume;
2169 use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
2170 use crate::range::RangeExt;
2171 use crate::round::round_up;
2172 use crate::testing;
2173 use fuchsia_async as fasync;
2174 use fuchsia_sync::Mutex;
2175 use std::cmp::{max, min};
2176 use std::ops::{Bound, Range};
2177 use std::sync::Arc;
2178 use storage_device::DeviceHolder;
2179 use storage_device::fake_device::FakeDevice;
2180
2181 #[test]
2182 fn test_allocator_key_is_range_based() {
2183 assert!(AllocatorKey { device_range: 0..100 }.is_range_key());
2185 }
2186
2187 #[fuchsia::test]
2188 async fn test_coalescing_iterator() {
2189 let skip_list = SkipListLayer::new(100);
2190 let items = [
2191 Item::new(
2192 AllocatorKey { device_range: 0..100 },
2193 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2194 ),
2195 Item::new(
2196 AllocatorKey { device_range: 100..200 },
2197 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2198 ),
2199 ];
2200 skip_list.insert(items[1].clone()).expect("insert error");
2201 skip_list.insert(items[0].clone()).expect("insert error");
2202 let mut iter =
2203 CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
2204 .await
2205 .expect("new failed");
2206 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2207 assert_eq!(
2208 (key, value),
2209 (
2210 &AllocatorKey { device_range: 0..200 },
2211 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2212 )
2213 );
2214 iter.advance().await.expect("advance failed");
2215 assert!(iter.get().is_none());
2216 }
2217
2218 #[fuchsia::test]
2219 async fn test_merge_and_coalesce_across_three_layers() {
2220 let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2221 lsm_tree
2222 .insert(Item::new(
2223 AllocatorKey { device_range: 100..200 },
2224 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2225 ))
2226 .expect("insert error");
2227 lsm_tree.seal();
2228 lsm_tree
2229 .insert(Item::new(
2230 AllocatorKey { device_range: 0..100 },
2231 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2232 ))
2233 .expect("insert error");
2234
2235 let layer_set = lsm_tree.layer_set();
2236 let mut merger = layer_set.merger();
2237 let mut iter =
2238 CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2239 .await
2240 .expect("new failed");
2241 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2242 assert_eq!(
2243 (key, value),
2244 (
2245 &AllocatorKey { device_range: 0..200 },
2246 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2247 )
2248 );
2249 iter.advance().await.expect("advance failed");
2250 assert!(iter.get().is_none());
2251 }
2252
    #[fuchsia::test]
    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 100..200 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 0..100 },
                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 0..100 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            )
        );
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 100..200 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

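    // Returns the number of bytes by which two ranges overlap (zero if disjoint).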
    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
        if a.end > b.start && a.start < b.end {
            min(a.end, b.end) - max(a.start, b.start)
        } else {
            0
        }
    }

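    // Collects every allocated range in the allocator's tree, asserting that the
    // ranges come back sorted and non-overlapping.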
    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut allocations: Vec<Range<u64>> = Vec::new();
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            if let Some(r) = allocations.last() {
                assert!(device_range.start >= r.end);
            }
            allocations.push(device_range.clone());
            iter.advance().await.expect("advance failed");
        }
        allocations
    }

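    // Asserts that the allocated ranges in the tree exactly cover
    // `expected_allocations`, no more and no less.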
    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut found = 0;
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            let mut l = device_range.length().expect("Invalid range");
            found += l;
            for range in expected_allocations {
                l -= overlap(range, device_range);
                if l == 0 {
                    break;
                }
            }
            assert_eq!(l, 0, "range {device_range:?} not covered by expectations");
            iter.advance().await.expect("advance failed");
        }
        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
    }

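    // Creates an empty filesystem on a fake device and returns it with its allocator.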
    async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
        let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let allocator = fs.allocator();
        (fs, allocator)
    }

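    // Basic allocation: successive allocations should be block-sized and must not
    // overlap each other or anything the filesystem already allocated.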
    #[fuchsia::test]
    async fn test_allocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;

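        // A newly-formatted filesystem already has allocations of its own
        // (e.g. superblocks, journal and other metadata).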
        let expected = vec![
            0..4096,
            4096..139264,
            139264..204800,
            204800..335872,
            335872..401408,
            524288..528384,
        ];
        assert_eq!(device_ranges, expected);
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
        transaction.commit().await.expect("commit failed");
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges[7].length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[5], &device_ranges[7]), 0);
        assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

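    // Requests bigger than the allocator's maximum extent size should be capped at
    // max_extent_size_bytes rather than failing.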
    #[fuchsia::test]
    async fn test_allocate_more_than_max_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(
            device_ranges.last().unwrap().length().expect("Invalid range"),
            allocator.max_extent_size_bytes
        );
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

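    // Deallocating an allocated range should return the allocator to its initial state.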
    #[fuchsia::test]
    async fn test_deallocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let initial_allocations = collect_allocations(&allocator).await;

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let device_range1 = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");
        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
        transaction.commit().await.expect("commit failed");

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
            .await
            .expect("deallocate failed");
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &initial_allocations).await;
    }

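    // mark_allocated reserves a specific range, taking it out of the free pool.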
    #[fuchsia::test]
    async fn test_mark_allocated() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut device_ranges = collect_allocations(&allocator).await;
        let range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
                .await
                .expect("allocate failed")
        };

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

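        // The transaction above was dropped, so the two blocks in `range` went back to
        // the free pool; the next allocation should start at the same offset.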
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );

        assert_eq!(device_ranges.last().unwrap().start, range.start);

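        // Explicitly mark the second block of the dropped range as allocated.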
        let mut range2 = range.clone();
        range2.start += fs.block_size();
        allocator
            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
            .expect("mark_allocated failed");
        device_ranges.push(range2);

        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        let last_range = device_ranges.last().unwrap();
        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(last_range, &range), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

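    // Marking an owner for deletion should immediately stop counting its bytes, while
    // the underlying records are reclaimed lazily by compaction.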
    #[fuchsia::test]
    async fn test_mark_for_deletion() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();
        let mut device_ranges = collect_allocations(&allocator).await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for _ in 0..15 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate2 failed"),
            );
        }
        transaction.commit().await.expect("commit failed");
        check_allocations(&allocator, &device_ranges).await;

        assert_eq!(
            allocator.get_allocated_bytes(),
            initial_allocated_bytes + fs.block_size() * 3000
        );

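        // Mark all of STORE_OBJECT_ID's allocations for deletion.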
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
        transaction.commit().await.expect("commit failed");

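        // The owner's bytes stop being counted right away, but the records remain in
        // the tree until a flush compacts them away.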
        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
        check_allocations(&allocator, &device_ranges).await;

        device_ranges.clear();

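        // Allocate 1500 blocks to a different owner.  There isn't that much space left
        // outside the 3000 blocks above, so this has to reuse space that was just
        // marked for deletion.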
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_bytes = 1500 * fs.block_size();
        while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
            let len = std::cmp::min(
                target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
                100 * fs.block_size(),
            );
            device_ranges.push(
                allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        allocator.flush().await.expect("flush failed");

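        // After the flush, the deleted owner should be gone from the per-owner counts
        // and the new owner should account for exactly what it allocated.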
        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
        assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes);
    }

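    // Creates a file of `size` bytes in `store` and writes its contents.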
    async fn create_file(store: &Arc<ObjectStore>, size: usize) {
        let root_directory =
            Directory::open(store, store.root_directory_object_id()).await.expect("open failed");

        let mut transaction = store
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let file = root_directory
            .create_child_file(&mut transaction, &format!("foo {}", size))
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        let buffer = file.allocate_buffer(size).await;

        let mut transaction = file
            .new_transaction_with_options(Options {
                borrow_metadata_space: true,
                ..Default::default()
            })
            .await
            .expect("new_transaction_with_options failed");
        file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
        transaction.commit().await.expect("commit failed");
    }

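    // Deleting a store and then compacting must not lose track of the deleted store's
    // allocations across journal replay.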
    #[fuchsia::test]
    async fn test_replay_with_deleted_store_and_compaction() {
        let (fs, _) = test_fs().await;

        const FILE_SIZE: usize = 10_000_000;

        let mut store_id = {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;
            store.store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");

        fs.journal().compact().await.expect("compact failed");

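        // Delete and recreate the volume twice, remounting in between, so that replay
        // has to cope with mutations referring to a store that no longer exists.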
        for _ in 0..2 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store_id)
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");
                create_file(&store, FILE_SIZE).await;
                store_id = store.store_object_id();
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

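    // Races a volume deletion against a concurrent compaction and then verifies the
    // result with fsck after a remount.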
    #[fuchsia::test(threads = 4)]
    async fn test_compaction_delete_race() {
        let (fs, _allocator) = test_fs().await;

        {
            const FILE_SIZE: usize = 10_000_000;

            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;

            let fs_clone = fs.clone();

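            // Make sure the executor's threads are all running so the compaction task
            // genuinely runs in parallel with the deletion below.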
            let executor_tasks = testing::force_executor_threads_to_run(4).await;

            let task = fasync::Task::spawn(async move {
                fs_clone.journal().compact().await.expect("compact failed");
            });

            drop(executor_tasks);

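            // Sleep a random amount so that repeated runs interleave the compaction
            // and the volume deletion differently.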
            let sleep = rand::random_range(3000..6000);
            std::thread::sleep(std::time::Duration::from_micros(sleep));
            log::info!("sleep {sleep}us");

            let transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(
                            root_vol.volume_directory().store().store_object_id(),
                            root_vol.volume_directory().object_id(),
                        ),
                        LockKey::flush(store.store_object_id())
                    ],
                    Options { borrow_metadata_space: true, ..Default::default() },
                )
                .await
                .expect("new_transaction failed");
            root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");

            task.await;
        }

        fs.journal().compact().await.expect("compact failed");
        fs.close().await.expect("close failed");

        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

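    // Repeatedly creates and deletes volumes, flushing the allocator and remounting
    // each time, to check that deleted volumes' space is reclaimed.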
    #[fuchsia::test]
    async fn test_delete_multiple_volumes() {
        let (mut fs, _) = test_fs().await;

        for _ in 0..50 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");

                create_file(&store, 1_000_000).await;

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store.store_object_id())
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                fs.allocator().flush().await.expect("flush failed");
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

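    // Allocate, free everything, then reallocate, checking the per-owner byte
    // accounting at each step.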
    #[fuchsia::test]
    async fn test_allocate_free_reallocate() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let mut device_ranges = Vec::new();
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for _ in 0..30 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 3000,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for range in std::mem::take(&mut device_ranges) {
            allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());

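        // Reallocate half of what was freed; the owner's count should match exactly.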
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_len = 1500 * fs.block_size();
        while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
            let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, len)
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 1500,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );
    }

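    // Allocations made before a flush must survive a remount, and new allocations
    // must not overlap them.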
    #[fuchsia::test]
    async fn test_flush() {
        const STORE_OBJECT_ID: u64 = 99;

        let mut device_ranges = Vec::new();
        let device = {
            let (fs, allocator) = test_fs().await;
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

        let allocated = collect_allocations(&allocator).await;

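        // Every range we allocated before the remount must still be fully allocated.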
        for i in &device_ranges {
            let mut overlapping = 0;
            for j in &allocated {
                overlapping += overlap(i, j);
            }
            assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
        }

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let range = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");

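        // A new allocation must not overlap anything that was already allocated.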
        for r in &allocated {
            assert_eq!(overlap(r, &range), 0);
        }
        transaction.commit().await.expect("commit failed");
    }

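    // A dropped (uncommitted) transaction must return its allocation to the free pool.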
    #[fuchsia::test]
    async fn test_dropped_transaction() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed")
        };
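        // The transaction above was dropped without committing, so allocating again
        // should hand back exactly the same range.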
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        assert_eq!(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
            allocated_range
        );
    }

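    // Once an owner has been marked for deletion and flushed, it should not reappear
    // in the per-owner accounting, even after a remount.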
    #[fuchsia::test]
    async fn test_cleanup_removed_owner() {
        const STORE_OBJECT_ID: u64 = 99;
        let device = {
            let (fs, allocator) = test_fs().await;

            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("Allocating");
                transaction.commit().await.expect("Committing.");
            }
            allocator.flush().await.expect("Flushing");
            assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
                transaction.commit().await.expect("Committing.");
            }
            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            fs.close().await.expect("Closing");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();
        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
    }

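    // get_allocated_bytes should reflect committed state only: uncommitted allocations
    // and deallocations must not change the count until their transaction commits.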
    #[fuchsia::test]
    async fn test_allocated_bytes() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();

        let allocated_bytes = initial_allocated_bytes + fs.block_size();
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");
            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
            range
        };

        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");

            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
        }

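        // The transaction was dropped, so the in-flight allocation was rolled back.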
        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

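        // Deallocate the middle of the committed range, leaving 20 bytes at each end.
        // The count only changes once the transaction commits.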
        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
            .await
            .expect("deallocate failed");

        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

        transaction.commit().await.expect("commit failed");

        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
    }

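    // Per-owner byte limits should only become visible once the transaction commits.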
    #[fuchsia::test]
    async fn test_persist_bytes_limit() {
        const LIMIT: u64 = 12345;
        const OWNER_ID: u64 = 12;

        let (fs, allocator) = test_fs().await;
        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
                .expect("Failed to set limit.");
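            // The new limit must not be visible until the transaction commits.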
            assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
            transaction.commit().await.expect("Failed to commit transaction");
            let bytes: u64 = *allocator
                .inner
                .lock()
                .info
                .limit_bytes
                .get(&OWNER_ID)
                .expect("Failed to find limit");
            assert_eq!(LIMIT, bytes);
        }
    }

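    // Merges adjacent ranges.  Assumes `ranges` is sorted by start and non-overlapping.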
    fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
        let mut coalesced = Vec::new();
        let mut prev: Option<Range<u64>> = None;
        for range in ranges {
            if let Some(prev_range) = &mut prev {
                if range.start == prev_range.end {
                    prev_range.end = range.end;
                } else {
                    coalesced.push(prev_range.clone());
                    prev = Some(range);
                }
            } else {
                prev = Some(range);
            }
        }
        if let Some(prev_range) = prev {
            coalesced.push(prev_range);
        }
        coalesced
    }

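    // take_for_trimming should hand back exactly the free sub-ranges of a region,
    // split according to the extent-size and extents-per-batch limits.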
    #[fuchsia::test]
    async fn test_take_for_trimming() {
        const STORE_OBJECT_ID: u64 = 99;

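        // Allocate a contiguous run of blocks, then free a scattering of sub-ranges
        // (including three contiguous ones, which exercise splitting at the maximum
        // extent size used below).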
        let allocated_range;
        let expected_free_ranges;
        let device = {
            let (fs, allocator) = test_fs().await;
            let bs = fs.block_size();
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocated_range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");

            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            let base = allocated_range.start;
            expected_free_ranges = vec![
                base..(base + (bs * 1)),
                (base + (bs * 2))..(base + (bs * 3)),
                (base + (bs * 4))..(base + (bs * 8)),
                (base + (bs * 8))..(base + (bs * 12)),
                (base + (bs * 12))..(base + (bs * 13)),
                (base + (bs * 29))..(base + (bs * 30)),
            ];
            for range in &expected_free_ranges {
                allocator
                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
                    .await
                    .expect("deallocate failed");
            }
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

        let max_extent_size = fs.block_size() as usize * 4;
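        // Fetch the free extents in batches small enough that both limits kick in.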
        const EXTENTS_PER_BATCH: usize = 2;
        let mut free_ranges = vec![];
        let mut offset = allocated_range.start;
        while offset < allocated_range.end {
            let free = allocator
                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
                .await
                .expect("take_for_trimming failed");
            free_ranges.extend(
                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
            );
            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
        }
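        // The free extents may be split differently from how they were deallocated,
        // so compare the coalesced forms.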
        let coalesced_free_ranges = coalesce_ranges(free_ranges);
        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);

        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
    }

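    // While extents are held for trimming they are unavailable, so an allocation that
    // needs them must block until the trim handle is dropped.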
    #[fuchsia::test]
    async fn test_allocations_wait_for_free_extents() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocator_clone = allocator.clone();

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

        let max_extent_size = fs.device().size() as usize;
        const EXTENTS_PER_BATCH: usize = usize::MAX;

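        // Take every free extent for trimming; the allocation spawned below cannot
        // make progress until they are released.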
        let trim_done = Arc::new(Mutex::new(false));
        let trimmable_extents = allocator
            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
            .await
            .expect("take_for_trimming failed");

        let trim_done_clone = trim_done.clone();
        let bs = fs.block_size();
        let alloc_task = fasync::Task::spawn(async move {
            allocator_clone
                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
                .await
                .expect("allocate failed");
            {
                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
            }
            transaction.commit().await.expect("commit failed");
        });

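        // Give the allocation task a chance to run; it should block on the extents we
        // are holding for trimming.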
        fasync::Timer::new(std::time::Duration::from_millis(100)).await;

        {
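            // Release the extents while holding the lock, so `trim_done` is set before
            // the allocation task can observe the extents becoming free.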
            let mut trim_done = trim_done.lock();
            std::mem::drop(trimmable_extents);
            *trim_done = true;
        }

        alloc_task.await;
    }

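    // Reservations needn't be multiples of the block size, but allocations made
    // against them must still come back block-aligned.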
    #[fuchsia::test]
    async fn test_allocation_with_reservation_not_multiple_of_block_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

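        // Reserve an amount that is deliberately not block-aligned.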
        const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
        let reservation =
            allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![],
                Options { allocator_reservation: Some(&reservation), ..Options::default() },
            )
            .await
            .expect("new failed");

        let range = allocator
            .allocate(
                &mut transaction,
                STORE_OBJECT_ID,
                round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
            )
            .await
            .expect("allocate failed");
        assert_eq!((range.end - range.start) % fs.block_size(), 0);

        println!("{}", range.end - range.start);
    }
}