1pub mod merge;
90pub mod strategy;
91
92use crate::drop_event::DropEvent;
93use crate::errors::FxfsError;
94use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
95use crate::log::*;
96use crate::lsm_tree::cache::NullCache;
97use crate::lsm_tree::skip_list_layer::SkipListLayer;
98use crate::lsm_tree::types::{
99 FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
100 OrdUpperBound, RangeKey, SortByU64, Value,
101};
102use crate::lsm_tree::{LSMTree, Query, compact_with_iterator, layers_from_handles};
103use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
104use crate::object_store::object_manager::ReservationUpdate;
105use crate::object_store::transaction::{
106 AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
107};
108use crate::object_store::{
109 DataObjectHandle, DirectWriter, HandleOptions, ObjectStore, ReservedId, tree,
110};
111use crate::range::RangeExt;
112use crate::round::{round_div, round_down, round_up};
113use crate::serialized_types::{
114 DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
115};
116use anyhow::{Context, Error, anyhow, bail, ensure};
117use async_trait::async_trait;
118use either::Either::{Left, Right};
119use event_listener::EventListener;
120use fprint::TypeFingerprint;
121use fuchsia_inspect::HistogramProperty;
122use fuchsia_sync::Mutex;
123use futures::FutureExt;
124use merge::{filter_marked_for_deletion, filter_tombstones, merge};
125use serde::{Deserialize, Serialize};
126use std::borrow::Borrow;
127use std::collections::{BTreeMap, HashSet, VecDeque};
128use std::hash::Hash;
129use std::marker::PhantomData;
130use std::num::{NonZero, Saturating};
131use std::ops::Range;
132use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
133use std::sync::{Arc, Weak};
134
/// Implemented by things that own reserved disk space (e.g. the allocator, or a reservation that
/// hands out sub-reservations); called back when a reservation returns bytes.
pub trait ReservationOwner: Send + Sync {
    /// Called when a reservation (or part of one) is released so the owner can return `amount`
    /// bytes to its pool.  `owner_object_id` is the store the bytes were attributed to, if any.
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
}
142
/// A reservation of bytes held against a `ReservationOwner`.  Any bytes remaining when the
/// reservation is dropped are returned to the owner via `release_reservation`.  `T` is how the
/// owner is held (owned, `Arc`, or borrowed) and `U` is the owner's (possibly unsized) type; see
/// the `Reservation` and `Hold` aliases.
pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
    // The owner the reserved bytes were drawn from and are returned to.
    owner: T,
    // The store this reservation is attributed to, or None if unattributed.
    owner_object_id: Option<u64>,
    // Mutable byte counts, guarded by a mutex so reservations can be shared.
    inner: Mutex<ReservationInner>,
    phantom: PhantomData<U>,
}

#[derive(Debug, Default)]
struct ReservationInner {
    // Total bytes currently held by this reservation.
    amount: u64,

    // Bytes of `amount` currently handed out to sub-reservations (see `reserve` /
    // `reserve_with`); methods assert `reserved <= amount`.
    reserved: u64,
}
165
impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
    // Debug output shows only the inner byte counts; the owner itself need not be Debug.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner.lock().fmt(f)
    }
}
171
impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
    /// Creates a reservation of `amount` bytes drawn from `owner`, attributed to
    /// `owner_object_id` (or unattributed if `None`).  The caller must already have accounted
    /// for the bytes with the owner.
    pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
        Self {
            owner,
            owner_object_id,
            inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
            phantom: PhantomData,
        }
    }

    /// The object ID this reservation is attributed to, if any.
    pub fn owner_object_id(&self) -> Option<u64> {
        self.owner_object_id
    }

    /// Total bytes currently held by this reservation.
    pub fn amount(&self) -> u64 {
        self.inner.lock().amount
    }

    /// Adds `amount` bytes to the reservation.  The caller must have accounted for the bytes
    /// with the owner already.
    pub fn add(&self, amount: u64) {
        self.inner.lock().amount += amount;
    }

    /// Takes the entire amount out of the reservation *without* returning it to the owner; the
    /// caller becomes responsible for the bytes.  Panics if sub-reservations are outstanding.
    pub fn forget(&self) -> u64 {
        let mut inner = self.inner.lock();
        assert_eq!(inner.reserved, 0);
        std::mem::take(&mut inner.amount)
    }

    /// Like `forget` but for only `amount` bytes.  Panics if that would leave less than the
    /// outstanding sub-reserved amount (or underflow).
    pub fn forget_some(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Carves a sub-reservation out of this one.  The `amount` closure is passed the number of
    /// bytes still available (`amount - reserved`) and returns how many of them to take.
    fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
        let mut inner = self.inner.lock();
        let taken = amount(inner.amount - inner.reserved);
        inner.reserved += taken;
        ReservationImpl::new(self, self.owner_object_id, taken)
    }

    /// Attempts to carve an exact `amount`-byte sub-reservation out of this one; returns `None`
    /// if insufficient bytes remain available.
    pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
        let mut inner = self.inner.lock();
        if inner.amount - inner.reserved < amount {
            None
        } else {
            inner.reserved += amount;
            Some(ReservationImpl::new(self, self.owner_object_id, amount))
        }
    }

    /// Commits `amount` previously sub-reserved bytes: they leave both the sub-reserved count
    /// and the reservation itself (the space has been used for real).
    pub fn commit(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.reserved -= amount;
        inner.amount -= amount;
    }

    /// Returns `amount` bytes to the owner.  Panics if that would leave less than the
    /// outstanding sub-reserved amount.
    pub fn give_back(&self, amount: u64) {
        // NOTE(review): the owner is notified before our own counters are updated, presumably
        // to avoid calling out while holding our lock — confirm callers tolerate the brief
        // window where both sides count the bytes.
        self.owner.borrow().release_reservation(self.owner_object_id, amount);
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Moves `amount` bytes from this reservation to `other`.  Both must be attributed to the
    /// same owner object.  Panics if this reservation holds fewer than `amount` bytes.
    pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
        &self,
        other: &ReservationImpl<V, W>,
        amount: u64,
    ) {
        assert_eq!(self.owner_object_id, other.owner_object_id());
        let mut inner = self.inner.lock();
        // Note: the `amount` bound inside the `if let` is the *remaining* amount (shadowing);
        // the `other.add(amount)` below uses the original parameter.
        if let Some(amount) = inner.amount.checked_sub(amount) {
            inner.amount = amount;
        } else {
            // Release the guard before panicking so the mutex isn't held during unwind.
            std::mem::drop(inner);
            panic!("Insufficient reservation space");
        }
        other.add(amount);
    }
}
267
268impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
269 fn drop(&mut self) {
270 let inner = self.inner.get_mut();
271 assert_eq!(inner.reserved, 0);
272 let owner_object_id = self.owner_object_id;
273 if inner.amount > 0 {
274 self.owner
275 .borrow()
276 .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
277 }
278 }
279}
280
281impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
282 for ReservationImpl<T, U>
283{
284 fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
285 assert_eq!(owner_object_id, self.owner_object_id);
287 let mut inner = self.inner.lock();
288 assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
289 inner.reserved -= amount;
290 }
291}
292
/// A reservation held against an `Arc`'d owner (typically the allocator itself).
pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;

/// A sub-reservation borrowed out of an existing `Reservation`.
pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;

pub type AllocatorKey = AllocatorKeyV32;

/// LSM-tree key for the allocator: a (half-open) range of bytes on the device.
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct AllocatorKeyV32 {
    pub device_range: Range<u64>,
}
306
impl SortByU64 for AllocatorKey {
    // Keys sort primarily by the start of the device range.
    fn get_leading_u64(&self) -> u64 {
        self.device_range.start
    }
}
312
/// Granularity (in bytes) at which allocator keys are bucketed for fuzzy hashing: each
/// bucket-aligned slice that a key's range overlaps contributes one hash (see
/// `FuzzyHash for AllocatorKey`).  The `1 *` identity factor was dropped (clippy `identity_op`).
const EXTENT_HASH_BUCKET_SIZE: u64 = 1024 * 1024;
314
/// Iterator yielding one stable hash per `EXTENT_HASH_BUCKET_SIZE` bucket overlapped by a key's
/// device range (used by `FuzzyHash for AllocatorKey`).
pub struct AllocatorKeyPartitionIterator {
    // Remaining bucket-aligned range still to hash; `start` advances one bucket per `next`.
    device_range: Range<u64>,
}
318
319impl Iterator for AllocatorKeyPartitionIterator {
320 type Item = u64;
321
322 fn next(&mut self) -> Option<Self::Item> {
323 if self.device_range.start >= self.device_range.end {
324 None
325 } else {
326 let start = self.device_range.start;
327 self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
328 let end = std::cmp::min(self.device_range.start, self.device_range.end);
329 let key = AllocatorKey { device_range: start..end };
330 let hash = crate::stable_hash::stable_hash(key);
331 Some(hash)
332 }
333 }
334}
335
impl FuzzyHash for AllocatorKey {
    /// Hashes the key into one value per `EXTENT_HASH_BUCKET_SIZE` bucket its range overlaps,
    /// so overlapping ranges always share at least one hash.
    fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
        AllocatorKeyPartitionIterator {
            // Align the range outwards to bucket boundaries; clamp to u64::MAX if rounding up
            // would overflow.
            device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
                ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
        }
    }

    // Allocator keys are ranges, not points.
    fn is_range_key(&self) -> bool {
        true
    }
}
348
impl AllocatorKey {
    /// Returns the lowest key whose range could merge into this one: the range from offset 0 up
    /// to this key's start.
    pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
        AllocatorKey { device_range: 0..self.device_range.start }
    }
}

impl LayerKey for AllocatorKey {
    fn merge_type(&self) -> MergeType {
        MergeType::OptimizedMerge
    }
}
361
impl OrdUpperBound for AllocatorKey {
    // Upper-bound ordering compares only the range ends.
    fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range.end.cmp(&other.device_range.end)
    }
}

impl OrdLowerBound for AllocatorKey {
    // Lower-bound ordering compares starts first, ties broken by ends (same as the full `Ord`).
    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
    }
}
380
381impl Ord for AllocatorKey {
382 fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
383 self.device_range
384 .start
385 .cmp(&other.device_range.start)
386 .then(self.device_range.end.cmp(&other.device_range.end))
387 }
388}
389
impl PartialOrd for AllocatorKey {
    // Delegates to the total order above.
    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
395
impl RangeKey for AllocatorKey {
    /// True if the two device ranges intersect.  Ranges are half-open, so merely touching
    /// ranges do not overlap.
    fn overlaps(&self, other: &Self) -> bool {
        self.device_range.start < other.device_range.end
            && self.device_range.end > other.device_range.start
    }
}
402
pub type AllocatorValue = AllocatorValueV32;
impl Value for AllocatorValue {
    // `None` records act as tombstones and are filtered out of merged iterators.
    const DELETED_MARKER: Self = Self::None;
}

/// LSM-tree value for the allocator.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorValueV32 {
    // The key's range is not allocated (deletion marker; see DELETED_MARKER above).
    None,
    // The key's range is allocated and attributed to `owner_object_id`.  `count` is presumably
    // a reference count for the range — confirm against the merge rules in `merge`.
    Abs { count: u64, owner_object_id: u64 },
}
420
pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;

pub type AllocatorInfo = AllocatorInfoV32;

/// Persisted allocator metadata, serialized into the allocator's object in the root store
/// (see `open`).
#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
pub struct AllocatorInfoV32 {
    // Object IDs of the allocator's persistent LSM-tree layer files.
    pub layers: Vec<u64>,
    // Committed allocated byte counts, keyed by owner object ID.
    pub allocated_bytes: BTreeMap<u64, u64>,
    // Owner object IDs whose allocations are marked for (deferred) deletion.
    pub marked_for_deletion: HashSet<u64>,
    // Per-owner byte limits, keyed by owner object ID (absent means unlimited).
    pub limit_bytes: BTreeMap<u64, u64>,
}
439
// Maximum number of bytes read when deserializing `AllocatorInfo` (see `open`).
const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;

/// Returns the maximum extent size (in bytes) the allocator will hand out for `block_size`.
/// The cap keeps serialized records bounded by `DEFAULT_MAX_SERIALIZED_RECORD_SIZE`; the `- 64`
/// headroom and `/ 9` factor presumably reflect the record encoding overhead — confirm against
/// the serialization format before changing either constant.
pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
    block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
}
450
/// Flush statistics surfaced through inspect (see `track_statistics`).
#[derive(Default)]
struct AllocatorCounters {
    // Number of flushes performed so far.
    num_flushes: u64,
    // Wall-clock time of the most recent flush, if any.
    last_flush_time: Option<std::time::SystemTime>,
}
456
/// The filesystem's allocator: tracks which device byte ranges are in use via an LSM tree and
/// hands out free space.
pub struct Allocator {
    filesystem: Weak<FxFilesystem>,
    // Filesystem block size; allocations are multiples of this.
    block_size: u64,
    // Device size, rounded down to a block multiple.
    device_size: u64,
    // Object ID of the allocator's metadata object in the root store.
    object_id: u64,
    // Upper bound on a single allocation (see `max_extent_size_for_block_size`).
    max_extent_size_bytes: u64,
    // Persistent record of allocations.
    tree: LSMTree<AllocatorKey, AllocatorValue>,
    // In-memory layer of short-lived entries (e.g. ranges pulled out for trimming) that must be
    // treated as allocated even though they are not (yet) in `tree`.
    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    inner: Mutex<Inner>,
    // Serializes allocation and related operations (trim batches, strategy rebuilds).
    allocation_mutex: futures::lock::Mutex<()>,
    counters: Mutex<AllocatorCounters>,
    maximum_offset: AtomicU64,
    // When false (image-builder mode), `allocate` fails with `Unavailable` until
    // `enable_allocations` is called.
    allocations_allowed: AtomicBool,
}
477
/// Per-owner byte accounting.
#[derive(Debug, Default, PartialEq)]
struct ByteTracking {
    // Bytes allocated and committed.
    allocated_bytes: Saturating<u64>,

    // Bytes allocated in transactions that have not yet committed.
    uncommitted_allocated_bytes: u64,

    // Bytes reserved for future allocation (see `ReservationImpl`).
    reserved_bytes: u64,

    // Bytes deallocated in committed transactions but not yet reusable — see
    // `unavailable_bytes` vs `unavailable_after_sync_bytes`.
    committed_deallocated_bytes: u64,
}
501
impl ByteTracking {
    // Bytes counted against the owner's quota: everything allocated (committed or not) plus
    // reservations.
    fn used_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
    }

    // Bytes that cannot be allocated right now: allocated bytes (committed or not) plus
    // committed deallocations still awaiting a sync.  Excludes `reserved_bytes`.
    fn unavailable_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.committed_deallocated_bytes)
    }

    // Like `unavailable_bytes`, but assuming a sync has completed, which makes committed
    // deallocations reusable.
    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
    }
}
525
/// A committed deallocation whose space cannot be reused yet (queued on
/// `Inner::committed_deallocated`).
#[derive(Debug)]
struct CommittedDeallocation {
    // Journal offset at which the deallocation was committed.
    log_file_offset: u64,
    // The deallocated device range.
    range: Range<u64>,
    // The store the bytes were attributed to.
    owner_object_id: u64,
}

/// Mutable allocator state, guarded by `Allocator::inner`.
struct Inner {
    // In-memory copy of the persisted allocator metadata.
    info: AllocatorInfo,

    // Set once the allocator has been created or opened; `allocate` asserts this.
    opened: bool,

    // Ranges whose temporary allocations have been dropped but not yet erased from
    // `temporary_allocations` (drained under the allocation lock in `allocate`).
    dropped_temporary_allocations: Vec<Range<u64>>,

    // Per-owner byte accounting (see `ByteTracking`).
    owner_bytes: BTreeMap<u64, ByteTracking>,

    // Reserved bytes not attributed to any particular owner.
    unattributed_reserved_bytes: u64,

    // Committed deallocations awaiting a sync before their space becomes reusable.
    committed_deallocated: VecDeque<CommittedDeallocation>,

    // Bytes currently set aside for an in-flight trim batch (see `take_for_trimming`).
    trim_reserved_bytes: u64,

    // Fires when the current trim batch is dropped; `allocate` waits on this when free space is
    // mostly tied up in the trim.
    trim_listener: Option<EventListener>,

    // In-memory free-space index used to find allocatable ranges.
    strategy: strategy::BestFit,

    // Histogram of allocation sizes in blocks (bucket 63 collects everything larger).
    allocation_size_histogram: [u64; 64],
    // Diagnostic histogram for rebuild_strategy triggers (exported via inspect).
    rebuild_strategy_trigger_histogram: [u64; 64],

    // Owners whose data is being deleted; their records are filtered out of iterators.
    marked_for_deletion: HashSet<u64>,

    // Owners deleted but whose deletion still needs a device flush before they can move into
    // `marked_for_deletion` (see `allocate`).
    volumes_deleted_pending_sync: HashSet<u64>,
}
608
609impl Inner {
610 fn allocated_bytes(&self) -> Saturating<u64> {
611 let mut total = Saturating(0);
612 for (_, bytes) in &self.owner_bytes {
613 total += bytes.allocated_bytes;
614 }
615 total
616 }
617
618 fn uncommitted_allocated_bytes(&self) -> u64 {
619 self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
620 }
621
622 fn reserved_bytes(&self) -> u64 {
623 self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
624 + self.unattributed_reserved_bytes
625 }
626
627 fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
628 match self.info.limit_bytes.get(&owner_object_id) {
629 Some(v) => *v,
630 None => u64::MAX,
631 }
632 }
633
634 fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
635 let limit = self.owner_id_limit_bytes(owner_object_id);
636 let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
637 (Saturating(limit) - used).0
638 }
639
640 fn unavailable_bytes(&self) -> Saturating<u64> {
645 let mut total = Saturating(0);
646 for (_, bytes) in &self.owner_bytes {
647 total += bytes.unavailable_bytes();
648 }
649 total
650 }
651
652 fn used_bytes(&self) -> Saturating<u64> {
655 let mut total = Saturating(0);
656 for (_, bytes) in &self.owner_bytes {
657 total += bytes.used_bytes();
658 }
659 total + Saturating(self.unattributed_reserved_bytes)
660 }
661
662 fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
665 let mut total = Saturating(0);
666 for (_, bytes) in &self.owner_bytes {
667 total += bytes.unavailable_after_sync_bytes();
668 }
669 total
670 }
671
672 fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
675 device_size
676 .checked_sub(
677 (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
678 )
679 .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
680 }
681
682 fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
683 match owner_object_id {
684 Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
685 None => self.unattributed_reserved_bytes += amount,
686 };
687 }
688
689 fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
690 match owner_object_id {
691 Some(owner) => {
692 let owner_entry = self.owner_bytes.entry(owner).or_default();
693 assert!(
694 owner_entry.reserved_bytes >= amount,
695 "{} >= {}",
696 owner_entry.reserved_bytes,
697 amount
698 );
699 owner_entry.reserved_bytes -= amount;
700 }
701 None => {
702 assert!(
703 self.unattributed_reserved_bytes >= amount,
704 "{} >= {}",
705 self.unattributed_reserved_bytes,
706 amount
707 );
708 self.unattributed_reserved_bytes -= amount
709 }
710 };
711 }
712}
713
/// A batch of free extents temporarily removed from the free pool so they can be trimmed.
/// Dropping the batch returns the extents to the pool and (via the contained `DropEvent`)
/// signals anyone waiting for the trim to finish.
pub struct TrimmableExtents<'a> {
    allocator: &'a Allocator,
    extents: Vec<Range<u64>>,
    // Fires the listener handed out by `new` when this batch is dropped.
    _drop_event: DropEvent,
}
724
725impl<'a> TrimmableExtents<'a> {
726 pub fn extents(&self) -> &Vec<Range<u64>> {
727 &self.extents
728 }
729
730 fn new(allocator: &'a Allocator) -> (Self, EventListener) {
732 let drop_event = DropEvent::new();
733 let listener = drop_event.listen();
734 (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
735 }
736
737 fn add_extent(&mut self, extent: Range<u64>) {
738 self.extents.push(extent);
739 }
740}
741
impl<'a> Drop for TrimmableExtents<'a> {
    fn drop(&mut self) {
        let mut inner = self.allocator.inner.lock();
        // Return each extent to the free pool and remove the temporary allocation that was
        // shielding it from being allocated while the trim was in flight.
        for device_range in std::mem::take(&mut self.extents) {
            inner.strategy.free(device_range.clone()).expect("drop trim extent");
            self.allocator
                .temporary_allocations
                .erase(&AllocatorKey { device_range: device_range.clone() });
        }
        // The whole trim reservation is released, allowing the next trim batch to start.
        inner.trim_reserved_bytes = 0;
    }
}
754
755impl Allocator {
    /// Creates a new allocator backed by object `object_id` in the root store of `filesystem`.
    /// Initially the entire (block-aligned) device is treated as free; `open` and replay
    /// establish the real allocation state.
    pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
        let block_size = filesystem.block_size();
        let device_size = round_down(filesystem.device().size(), block_size);
        if device_size != filesystem.device().size() {
            warn!("Device size is not block aligned. Rounding down.");
        }
        let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
        let mut strategy = strategy::BestFit::default();
        strategy.free(0..device_size).expect("new fs");
        Allocator {
            filesystem: Arc::downgrade(&filesystem),
            block_size,
            device_size,
            object_id,
            max_extent_size_bytes,
            tree: LSMTree::new(merge, Box::new(NullCache {})),
            temporary_allocations: SkipListLayer::new(1024),
            inner: Mutex::new(Inner {
                info: AllocatorInfo::default(),
                opened: false,
                dropped_temporary_allocations: Vec::new(),
                owner_bytes: BTreeMap::new(),
                unattributed_reserved_bytes: 0,
                committed_deallocated: VecDeque::new(),
                trim_reserved_bytes: 0,
                trim_listener: None,
                strategy,
                allocation_size_histogram: [0; 64],
                rebuild_strategy_trigger_histogram: [0; 64],
                marked_for_deletion: HashSet::new(),
                volumes_deleted_pending_sync: HashSet::new(),
            }),
            allocation_mutex: futures::lock::Mutex::new(()),
            counters: Mutex::new(AllocatorCounters::default()),
            maximum_offset: AtomicU64::new(0),
            // In image-builder mode allocations stay disabled until `enable_allocations`.
            allocations_allowed: AtomicBool::new(filesystem.options().image_builder_mode.is_none()),
        }
    }
795
    /// Returns the allocator's LSM tree.
    pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
        &self.tree
    }

    /// Permits allocations; used to leave image-builder mode (see `allocations_allowed`).
    pub fn enable_allocations(&self) {
        self.allocations_allowed.store(true, Ordering::SeqCst);
    }
805
    /// Wraps `iter` so that tombstones and records belonging to deleted volumes are skipped.
    /// With `committed_marked_for_deletion` set, the persisted (committed) deleted-volume set
    /// is used; otherwise the in-memory set, which may also contain volumes whose deletion has
    /// not yet been synced, is used.
    pub async fn filter(
        &self,
        iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
        committed_marked_for_deletion: bool,
    ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
        // Clone the set so the lock isn't held across the await points below.
        let marked_for_deletion = {
            let inner = self.inner.lock();
            if committed_marked_for_deletion {
                &inner.info.marked_for_deletion
            } else {
                &inner.marked_for_deletion
            }
            .clone()
        };
        let iter =
            filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
        Ok(iter)
    }
828
    /// Returns a copy of the histogram of allocation sizes (in blocks; populated by `allocate`).
    pub fn allocation_size_histogram(&self) -> [u64; 64] {
        self.inner.lock().allocation_size_histogram
    }
835
    /// Creates the allocator's metadata object in the root store (for a freshly-made
    /// filesystem).  Panics if the allocator was already created or opened.
    pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
        // Mark the allocator opened; it must not have been opened before.
        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);

        let filesystem = self.filesystem.upgrade().unwrap();
        let root_store = filesystem.root_store();
        // The allocator's object ID is fixed ahead of time, so reserve exactly that ID.
        ObjectStore::create_object_with_id(
            &root_store,
            transaction,
            ReservedId::new(&root_store, NonZero::new(self.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )?;
        root_store.update_last_object_id(self.object_id());
        Ok(())
    }
854
    /// Opens an existing allocator: reads and deserializes `AllocatorInfo`, opens the referenced
    /// layer files, and primes the per-owner accounting.  Does not build the free-space index;
    /// that happens later (see `on_replay_complete`).
    pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
        let filesystem = self.filesystem.upgrade().unwrap();
        let root_store = filesystem.root_store();

        // Start from an empty free-space index; it gets rebuilt after replay.
        self.inner.lock().strategy = strategy::BestFit::default();

        let handle =
            ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
                .await
                .context("Failed to open allocator object")?;

        // A zero-length object means a freshly-created allocator with nothing persisted yet.
        if handle.get_size() > 0 {
            let serialized_info = handle
                .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
                .await
                .context("Failed to read AllocatorInfo")?;
            let mut cursor = std::io::Cursor::new(serialized_info);
            let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
                .context("Failed to deserialize AllocatorInfo")?;

            let mut handles = Vec::new();
            let mut total_size = 0;
            for object_id in &info.layers {
                let handle = ObjectStore::open_object(
                    &root_store,
                    *object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .context("Failed to open allocator layer file")?;

                let size = handle.get_size();
                total_size += size;
                handles.push(handle);
            }

            {
                let mut inner = self.inner.lock();

                // Sanity check: the per-owner allocated byte counts must collectively fit
                // within the device.
                let mut device_bytes = self.device_size;
                for (&owner_object_id, &bytes) in &info.allocated_bytes {
                    ensure!(
                        bytes <= device_bytes,
                        anyhow!(FxfsError::Inconsistent).context(format!(
                            "Allocated bytes exceeds device size: {:?}",
                            info.allocated_bytes
                        ))
                    );
                    device_bytes -= bytes;

                    inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
                        Saturating(bytes);
                }

                inner.info = info;
            }

            self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
            // NOTE(review): the reservation is presumably sized to guarantee future compaction
            // of these layers — see `tree::reservation_amount_from_layer_size`.
            self.filesystem.upgrade().unwrap().object_manager().update_reservation(
                self.object_id,
                tree::reservation_amount_from_layer_size(total_size),
            );
        }

        Ok(())
    }
925
    /// Called once journal replay has finished: adopts the committed deleted-volume set,
    /// rebuilds the free-space index, and marks the allocator opened.
    pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
        {
            let mut inner = self.inner.lock();
            inner.volumes_deleted_pending_sync.clear();
            inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
        }

        // `rebuild_strategy` returns whether any free ranges were found.  None at all is only
        // acceptable in read-only mode; otherwise treat it as corruption.
        if !self.rebuild_strategy().await.context("Build free extents")? {
            if self.filesystem.upgrade().unwrap().options().read_only {
                info!("Device contains no free space (read-only mode).");
            } else {
                info!("Device contains no free space.");
                return Err(FxfsError::Inconsistent)
                    .context("Device appears to contain no free space");
            }
        }

        // Must not have been opened already.
        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
        Ok(())
    }
950
    /// Rebuilds the in-memory free-space index by scanning the LSM tree (with temporary
    /// allocations overlaid) and force-freeing every gap between allocated ranges.  Returns
    /// true if the index changed.  Callers either hold `allocation_mutex` or run before the
    /// allocator is opened.
    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
        let mut changed = false;
        // Include `temporary_allocations` so in-flight ranges are not treated as free.
        let mut layer_set = self.tree.empty_layer_set();
        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
        self.tree.add_all_layers_to_layer_set(&mut layer_set);

        let overflow_markers = self.inner.lock().strategy.overflow_markers();
        self.inner.lock().strategy.reset_overflow_markers();

        let mut to_add = Vec::new();
        let mut merger = layer_set.merger();
        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
        // `last_offset` tracks the end of the last allocated range seen; gaps before the next
        // allocated range (or the device end) are free.
        let mut last_offset = 0;
        while last_offset < self.device_size {
            let next_range = match iter.get() {
                None => {
                    // No more allocated ranges: everything to the end of the device is free.
                    assert!(last_offset <= self.device_size);
                    let range = last_offset..self.device_size;
                    last_offset = self.device_size;
                    iter.advance().await?;
                    range
                }
                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                    if device_range.end < last_offset {
                        iter.advance().await?;
                        continue;
                    }
                    if device_range.start <= last_offset {
                        // Overlaps/abuts the area already covered; just extend the cursor.
                        last_offset = device_range.end;
                        iter.advance().await?;
                        continue;
                    }
                    // The gap before this allocated range is free.
                    let range = last_offset..device_range.start;
                    last_offset = device_range.end;
                    iter.advance().await?;
                    range
                }
            };
            to_add.push(next_range);
            // Flush in batches so the `inner` lock isn't held for the entire scan.
            if to_add.len() > 100 {
                let mut inner = self.inner.lock();
                for range in to_add.drain(..) {
                    changed |= inner.strategy.force_free(range)?;
                }
            }
        }
        let mut inner = self.inner.lock();
        for range in to_add {
            changed |= inner.strategy.force_free(range)?;
        }
        if overflow_markers != inner.strategy.overflow_markers() {
            changed = true;
        }
        Ok(changed)
    }
1011
    /// Collects up to `extents_per_batch` free extents (each at most `max_extent_size` bytes)
    /// at or after device `offset`, removing them from the free pool so they can be trimmed.
    /// They are returned to the pool when the returned `TrimmableExtents` is dropped.  Only one
    /// trim batch may be outstanding at a time.
    pub async fn take_for_trimming(
        &self,
        offset: u64,
        max_extent_size: usize,
        extents_per_batch: usize,
    ) -> Result<TrimmableExtents<'_>, Error> {
        let _guard = self.allocation_mutex.lock().await;

        let (mut result, listener) = TrimmableExtents::new(self);
        let mut bytes = 0;

        // Scan the tree (with temporary allocations overlaid) for gaps between allocated
        // ranges, same technique as `rebuild_strategy`.
        let mut layer_set = self.tree.empty_layer_set();
        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
        self.tree.add_all_layers_to_layer_set(&mut layer_set);
        let mut merger = layer_set.merger();
        let mut iter = self
            .filter(
                merger.query(Query::FullRange(&AllocatorKey { device_range: offset..0 })).await?,
                false,
            )
            .await?;
        let mut last_offset = offset;
        'outer: while last_offset < self.device_size {
            let mut range = match iter.get() {
                None => {
                    assert!(last_offset <= self.device_size);
                    let range = last_offset..self.device_size;
                    last_offset = self.device_size;
                    iter.advance().await?;
                    range
                }
                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                    if device_range.end <= last_offset {
                        iter.advance().await?;
                        continue;
                    }
                    if device_range.start <= last_offset {
                        last_offset = device_range.end;
                        iter.advance().await?;
                        continue;
                    }
                    let range = last_offset..device_range.start;
                    last_offset = device_range.end;
                    iter.advance().await?;
                    range
                }
            };
            if range.start < offset {
                continue;
            }

            let mut inner = self.inner.lock();

            // Split the free range into `max_extent_size` pieces; each piece is pulled out of
            // the free pool and shielded with a temporary allocation so nothing can allocate it
            // while the trim is in flight.
            while range.start < range.end {
                let prefix =
                    range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
                range = prefix.end..range.end;
                bytes += prefix.length()?;
                inner.strategy.remove(prefix.clone());
                self.temporary_allocations.insert(AllocatorItem {
                    key: AllocatorKey { device_range: prefix.clone() },
                    value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
                    sequence: 0,
                })?;
                result.add_extent(prefix);
                if result.extents.len() == extents_per_batch {
                    break 'outer;
                }
            }
            if result.extents.len() == extents_per_batch {
                break 'outer;
            }
        }
        {
            let mut inner = self.inner.lock();

            assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
            inner.trim_listener = Some(listener);
            inner.trim_reserved_bytes = bytes;
            debug_assert!(
                (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
                    <= self.device_size
            );
        }
        Ok(result)
    }
1107
    /// Object IDs of the allocator's layer files (the objects the allocator depends on).
    pub fn parent_objects(&self) -> Vec<u64> {
        self.inner.lock().info.layers.clone()
    }

    /// All configured (owner object ID, byte limit) pairs.
    pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
        self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
    }

    /// Returns `owner_object_id`'s used byte count and its configured limit (if any).
    pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
        let inner = self.inner.lock();
        (
            inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
            inner.info.limit_bytes.get(&owner_object_id).copied(),
        )
    }

    /// Debug dump of the per-owner byte accounting.
    pub fn owner_bytes_debug(&self) -> String {
        format!("{:?}", self.inner.lock().owner_bytes)
    }
1133
    // True when all device bytes are unavailable, i.e. an allocation can only succeed after a
    // sync makes committed deallocations reusable.
    fn needs_sync(&self) -> bool {
        let inner = self.inner.lock();
        inner.unavailable_bytes().0 >= self.device_size
    }

    // True if `owner_object_id` is the root or root-parent store (a system store).
    fn is_system_store(&self, owner_object_id: u64) -> bool {
        let fs = self.filesystem.upgrade().unwrap();
        owner_object_id == fs.object_manager().root_store_object_id()
            || owner_object_id == fs.object_manager().root_parent_store_object_id()
    }
1148
    /// Re-attributes `amount` reserved bytes from `old_owner_object_id` to the unattributed
    /// pool.  No-op for already-unattributed or zero-byte reservations.
    pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
        if old_owner_object_id.is_none() || amount == 0 {
            return;
        }
        // Move the bytes between pools under a single lock so the total stays constant.
        let mut inner = self.inner.lock();
        inner.remove_reservation(old_owner_object_id, amount);
        inner.add_reservation(None, amount);
    }
1160
    /// Registers a lazy inspect child named `name` under `parent` that reports allocator
    /// statistics (byte counters, flush counters, and histograms) each time it is read.  Only a
    /// weak reference to the allocator is captured.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    let counters = this.counters.lock();
                    let root = inspector.root();
                    root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
                    root.record_uint("bytes_total", this.device_size);
                    // Snapshot all byte counters under one lock acquisition so they are
                    // mutually consistent.
                    let (allocated, reserved, used, unavailable) = {
                        let inner = this.inner.lock();
                        (
                            inner.allocated_bytes().0,
                            inner.reserved_bytes(),
                            inner.used_bytes().0,
                            inner.unavailable_bytes().0,
                        )
                    };
                    root.record_uint("bytes_allocated", allocated);
                    root.record_uint("bytes_reserved", reserved);
                    root.record_uint("bytes_used", used);
                    root.record_uint("bytes_unavailable", unavailable);

                    // Percentages are skipped (None) when the divisor would make them
                    // meaningless, e.g. a zero-sized device.
                    if let Some(x) = round_div(100 * allocated, this.device_size) {
                        root.record_uint("bytes_allocated_percent", x);
                    }
                    if let Some(x) = round_div(100 * reserved, this.device_size) {
                        root.record_uint("bytes_reserved_percent", x);
                    }
                    if let Some(x) = round_div(100 * used, this.device_size) {
                        root.record_uint("bytes_used_percent", x);
                    }
                    if let Some(x) = round_div(100 * unavailable, this.device_size) {
                        root.record_uint("bytes_unavailable_percent", x);
                    }

                    root.record_uint("num_flushes", counters.num_flushes);
                    if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
                        root.record_uint(
                            "last_flush_time_ms",
                            last_flush_time
                                .duration_since(std::time::UNIX_EPOCH)
                                .unwrap_or(std::time::Duration::ZERO)
                                .as_millis()
                                .try_into()
                                .unwrap_or(0u64),
                        );
                    }

                    let data = this.allocation_size_histogram();
                    let alloc_sizes = root.create_uint_linear_histogram(
                        "allocation_size_histogram",
                        fuchsia_inspect::LinearHistogramParams {
                            floor: 1,
                            step_size: 1,
                            buckets: 64,
                        },
                    );
                    // Bucket 0 is unused — allocations are at least one block.
                    for (i, count) in data.iter().enumerate() {
                        if i != 0 {
                            alloc_sizes.insert_multiple(i as u64, *count as usize);
                        }
                    }
                    root.record(alloc_sizes);

                    let data = this.inner.lock().rebuild_strategy_trigger_histogram;
                    let triggers = root.create_uint_linear_histogram(
                        "rebuild_strategy_triggers",
                        fuchsia_inspect::LinearHistogramParams {
                            floor: 1,
                            step_size: 1,
                            buckets: 64,
                        },
                    );
                    for (i, count) in data.iter().enumerate() {
                        if i != 0 {
                            triggers.insert_multiple(i as u64, *count as usize);
                        }
                    }
                    root.record(triggers);

                    let allocator_ref = this.clone();
                    root.record_child("lsm_tree", move |node| {
                        allocator_ref.tree.record_inspect_data(node);
                    });
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
1259
    // NOTE(review): presumably the highest device offset ever allocated (exclusive) — confirm
    // with the sites that store to `maximum_offset` (not visible in this chunk).
    pub fn maximum_offset(&self) -> u64 {
        self.maximum_offset.load(Ordering::Relaxed)
    }
1267}
1268
impl Drop for Allocator {
    fn drop(&mut self) {
        let inner = self.inner.lock();
        // By the time the allocator goes away there should be no uncommitted allocations and no
        // outstanding reservations.
        assert_eq!(inner.uncommitted_allocated_bytes(), 0);
        assert_eq!(inner.reserved_bytes(), 0);
    }
}
1277
1278#[fxfs_trace::trace]
1279impl Allocator {
    /// The allocator's object ID in the root store.
    pub fn object_id(&self) -> u64 {
        self.object_id
    }

    /// A snapshot of the allocator's persisted metadata.
    pub fn info(&self) -> AllocatorInfo {
        self.inner.lock().info.clone()
    }
1290
    #[trace]
    /// Allocates a contiguous device range of at most `len` bytes (clamped to
    /// `max_extent_size_bytes`) on behalf of `owner_object_id`, adds an `Allocate` mutation to
    /// `transaction` and returns the range.
    ///
    /// `len` must be a multiple of the block size (asserted).
    ///
    /// # Errors
    /// * `FxfsError::Unavailable` if allocations are currently disabled.
    /// * `FxfsError::NoSpace` if no space can be found, even after syncing the journal.
    pub async fn allocate(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        mut len: u64,
    ) -> Result<Range<u64>, Error> {
        ensure!(self.allocations_allowed.load(Ordering::SeqCst), FxfsError::Unavailable);
        assert_eq!(len % self.block_size, 0);
        len = std::cmp::min(len, self.max_extent_size_bytes);
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);

        // Take bytes out of the transaction's reservation if it has one; otherwise reserve
        // directly against the owner's remaining quota and the device's free space.  Either way
        // `len` may be reduced to whatever could actually be reserved.
        let reservation = if let Some(reservation) = transaction.allocator_reservation {
            // A reservation with no owner is only valid for system stores (see assertion).
            match reservation.owner_object_id {
                None => assert!(self.is_system_store(owner_object_id)),
                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
            };
            let r = reservation
                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
            len = r.amount();
            Left(r)
        } else {
            let mut inner = self.inner.lock();
            assert!(inner.opened);
            let device_used = inner.used_bytes();
            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            // Cap by both the per-owner limit and whatever the device has left.
            let limit =
                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
            len = round_down(std::cmp::min(len, limit), self.block_size);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.reserved_bytes += len;
            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
        };

        ensure!(len > 0, FxfsError::NoSpace);

        // If any volume deletions are awaiting a device flush, sync first so their space can be
        // safely reused.  Snapshot the set under the lock, then sync outside it.
        let volumes_deleted = {
            let inner = self.inner.lock();
            (!inner.volumes_deleted_pending_sync.is_empty())
                .then(|| inner.volumes_deleted_pending_sync.clone())
        };

        if let Some(volumes_deleted) = volumes_deleted {
            // The precondition lets sync() skip the flush if another task already did it.
            self.filesystem
                .upgrade()
                .unwrap()
                .sync(SyncOptions {
                    flush_device: true,
                    precondition: Some(Box::new(|| {
                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
                    })),
                    ..Default::default()
                })
                .await?;

            {
                let mut inner = self.inner.lock();
                for owner_id in volumes_deleted {
                    inner.volumes_deleted_pending_sync.remove(&owner_id);
                    inner.marked_for_deletion.insert(owner_id);
                }
            }

            // Rebuild the free-space strategy now that deleted volumes' extents are reclaimable.
            let _guard = self.allocation_mutex.lock().await;
            self.rebuild_strategy().await?;
        }

        // Acquire the allocation mutex, syncing the journal (bounded retries) if a sync is needed
        // to make deallocated-but-not-yet-flushed space available again.
        #[allow(clippy::never_loop)] let _guard = 'sync: loop {
            for _ in 0..10 {
                {
                    let guard = self.allocation_mutex.lock().await;

                    if !self.needs_sync() {
                        break 'sync guard;
                    }
                }

                // Drop the mutex while syncing; recheck on the next pass.
                self.filesystem
                    .upgrade()
                    .unwrap()
                    .sync(SyncOptions {
                        flush_device: true,
                        precondition: Some(Box::new(|| self.needs_sync())),
                        ..Default::default()
                    })
                    .await?;
            }
            bail!(
                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
            );
        };

        // If the requested bytes are free but currently being trimmed, wait for the trim to
        // finish rather than failing.
        let mut trim_listener = None;
        {
            let mut inner = self.inner.lock();
            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;

            let avail = self
                .device_size
                .checked_sub(inner.unavailable_bytes().0)
                .ok_or(FxfsError::Inconsistent)?;
            let free_and_not_being_trimmed =
                inner.bytes_available_not_being_trimmed(self.device_size)?;
            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
                debug_assert!(inner.trim_reserved_bytes > 0);
                trim_listener = std::mem::take(&mut inner.trim_listener);
            }
        }

        if let Some(listener) = trim_listener {
            listener.await;
        }

        // Ask the strategy for a range, rebuilding its free-extent cache on a miss.
        let result = loop {
            {
                let mut inner = self.inner.lock();

                // Lazily purge temporary-allocation records dropped by earlier transactions.
                for device_range in inner.dropped_temporary_allocations.drain(..) {
                    self.temporary_allocations.erase(&AllocatorKey { device_range });
                }

                match inner.strategy.allocate(len) {
                    Err(FxfsError::NotFound) => {
                        // Cache miss; record it and fall through to rebuild_strategy below.
                        inner.rebuild_strategy_trigger_histogram
                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
                    }
                    Err(err) => {
                        error!(err:%; "Likely filesystem corruption.");
                        return Err(err.into());
                    }
                    Ok(x) => {
                        break x;
                    }
                }
            }
            // If the rebuild found nothing new either, the accounting said there was space but
            // none could be located: treat as corruption.
            if !self.rebuild_strategy().await? {
                error!("Cannot find additional free space. Corruption?");
                return Err(FxfsError::Inconsistent.into());
            }
        };

        debug!(device_range:? = result; "allocate");

        let len = result.length().unwrap();
        // Convert the reserved bytes into an actual allocation: the reservation "forgets" them so
        // they aren't released on drop.
        let reservation_owner = reservation.either(
            |l| {
                l.forget_some(len);
                l.owner_object_id()
            },
            |r| {
                r.forget_some(len);
                r.owner_object_id()
            },
        );

        {
            let mut inner = self.inner.lock();
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.uncommitted_allocated_bytes += len;
            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
            inner.remove_reservation(reservation_owner, len);
            // Track the range as temporarily allocated until the mutation commits.
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: result.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })?;
        }

        let mutation =
            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());

        Ok(result)
    }
1502
    #[trace]
    /// Marks `device_range` as allocated to `owner_object_id` without consulting the allocation
    /// strategy (used when the range is already known, e.g. during replay/image generation —
    /// TODO(review): confirm callers).  Adds an `Allocate` mutation to `transaction`.
    ///
    /// # Errors
    /// * `FxfsError::InvalidArgs` if the range is invalid.
    /// * `FxfsError::NoSpace` if the device, owner quota, or reservation cannot cover the range.
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
        {
            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;

            let mut inner = self.inner.lock();
            let device_used = inner.used_bytes();
            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            // The range must be on-device and affordable for both the device and the owner.
            ensure!(
                device_range.end <= self.device_size
                    && (Saturating(self.device_size) - device_used).0 >= len
                    && owner_id_bytes_left >= len,
                FxfsError::NoSpace
            );
            if let Some(reservation) = &mut transaction.allocator_reservation {
                // Take the bytes out of the reservation permanently; they become an allocation.
                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
            }
            owner_entry.uncommitted_allocated_bytes += len;
            // Remove from the free-space strategy so it can't be handed out again.
            inner.strategy.remove(device_range.clone());
            // Track as temporarily allocated until the mutation commits.
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: device_range.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })?;
        }
        let mutation =
            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));
        Ok(())
    }
1543
1544 pub fn set_bytes_limit(
1546 &self,
1547 transaction: &mut Transaction<'_>,
1548 owner_object_id: u64,
1549 bytes: u64,
1550 ) -> Result<(), Error> {
1551 assert!(!self.is_system_store(owner_object_id));
1553 transaction.add(
1554 self.object_id(),
1555 Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
1556 );
1557 Ok(())
1558 }
1559
1560 pub fn get_owner_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
1562 self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
1563 }
1564
1565 pub fn get_owner_bytes_used(&self, owner_object_id: u64) -> u64 {
1568 self.inner.lock().owner_bytes.get(&owner_object_id).map_or(0, |info| info.used_bytes().0)
1569 }
1570
    #[trace]
    /// Queues a `Deallocate` mutation for `dealloc_range` and returns the number of bytes freed.
    /// The space does not become reusable until the mutation commits and the device is flushed
    /// (see `did_flush_device`).
    ///
    /// # Errors
    /// `FxfsError::InvalidArgs` if the range is invalid.
    pub async fn deallocate(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        dealloc_range: Range<u64>,
    ) -> Result<u64, Error> {
        debug!(device_range:? = dealloc_range; "deallocate");
        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
        let deallocated = dealloc_range.end - dealloc_range.start;
        let mutation = AllocatorMutation::Deallocate {
            device_range: dealloc_range.clone().into(),
            owner_object_id,
        };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));

        // The allocation mutex serialises against allocate() while we update
        // temporary_allocations below.
        let _guard = self.allocation_mutex.lock().await;

        let mut inner = self.inner.lock();
        // Lazily purge temporary-allocation records dropped by earlier transactions.
        for device_range in inner.dropped_temporary_allocations.drain(..) {
            self.temporary_allocations.erase(&AllocatorKey { device_range });
        }

        // Record the range as temporarily unavailable so it isn't re-allocated before the
        // deallocation is durable.
        self.temporary_allocations
            .insert(AllocatorItem {
                key: AllocatorKey { device_range: dealloc_range.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })
            .context("tracking deallocated")?;

        Ok(deallocated)
    }
1623
1624 pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
1641 transaction.add(
1644 self.object_id(),
1645 Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
1646 );
1647 }
1648
    /// Called after the device has been flushed up to `flush_log_offset`.  Returns committed
    /// deallocations whose log offset is now durable to the free pool, and updates per-owner
    /// accounting.
    pub fn did_flush_device(&self, flush_log_offset: u64) {
        // Split the committed-deallocations queue at the first entry that is NOT yet durable:
        // everything before that point (`deallocs`) can now be freed.  The labelled loop never
        // iterates; it only exists so we can `break` a value out of the `for`.
        #[allow(clippy::never_loop)] let deallocs = 'deallocs_outer: loop {
            let mut inner = self.inner.lock();
            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
                if dealloc.log_file_offset >= flush_log_offset {
                    // Keep the tail (not yet durable) in `committed_deallocated` and hand back
                    // the head.
                    let mut deallocs = inner.committed_deallocated.split_off(index);
                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
                    break 'deallocs_outer deallocs;
                }
            }
            // All entries are durable.
            break std::mem::take(&mut inner.committed_deallocated);
        };

        let mut inner = self.inner.lock();
        // Per-owner totals of the bytes being returned.
        let mut totals = BTreeMap::<u64, u64>::new();
        for dealloc in deallocs {
            *(totals.entry(dealloc.owner_object_id).or_default()) +=
                dealloc.range.length().unwrap();
            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
            self.temporary_allocations.erase(&AllocatorKey { device_range: dealloc.range.clone() });
        }

        for (owner_object_id, total) in totals {
            match inner.owner_bytes.get_mut(&owner_object_id) {
                Some(counters) => counters.committed_deallocated_bytes -= total,
                None => {
                    // The owner's accounting was already removed — this is only legitimate if
                    // the volume is pending deletion.
                    assert!(inner.volumes_deleted_pending_sync.contains(&owner_object_id));
                }
            }
        }
    }
1693
1694 pub fn reserve(
1697 self: Arc<Self>,
1698 owner_object_id: Option<u64>,
1699 amount: u64,
1700 ) -> Option<Reservation> {
1701 {
1702 let mut inner = self.inner.lock();
1703
1704 let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1705
1706 let limit = match owner_object_id {
1707 Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1708 None => device_free,
1709 };
1710 if limit < amount {
1711 return None;
1712 }
1713 inner.add_reservation(owner_object_id, amount);
1714 }
1715 Some(Reservation::new(self, owner_object_id, amount))
1716 }
1717
1718 pub fn reserve_with(
1721 self: Arc<Self>,
1722 owner_object_id: Option<u64>,
1723 amount: impl FnOnce(u64) -> u64,
1724 ) -> Reservation {
1725 let amount = {
1726 let mut inner = self.inner.lock();
1727
1728 let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1729
1730 let amount = amount(match owner_object_id {
1731 Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1732 None => device_free,
1733 });
1734
1735 inner.add_reservation(owner_object_id, amount);
1736
1737 amount
1738 };
1739
1740 Reservation::new(self, owner_object_id, amount)
1741 }
1742
1743 pub fn get_allocated_bytes(&self) -> u64 {
1745 self.inner.lock().allocated_bytes().0
1746 }
1747
1748 pub fn get_disk_bytes(&self) -> u64 {
1750 self.device_size
1751 }
1752
1753 pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
1757 self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
1758 }
1759
1760 pub fn get_used_bytes(&self) -> Saturating<u64> {
1762 let inner = self.inner.lock();
1763 inner.used_bytes()
1764 }
1765}
1766
1767impl ReservationOwner for Allocator {
1768 fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
1769 self.inner.lock().remove_reservation(owner_object_id, amount);
1770 }
1771}
1772
1773#[async_trait]
1774impl JournalingObject for Allocator {
    /// Applies an allocator mutation, either live (during a transaction commit) or during
    /// journal replay (`context.mode`).  Updates both the LSM tree and the in-memory accounting.
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        _assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                let mut inner = self.inner.lock();
                // Drop all in-memory accounting for the owner; its extents are reclaimed after
                // the next device sync (see volumes_deleted_pending_sync handling in allocate).
                inner.owner_bytes.remove(&owner_object_id);

                inner.info.marked_for_deletion.insert(owner_object_id);
                inner.volumes_deleted_pending_sync.insert(owner_object_id);

                inner.info.limit_bytes.remove(&owner_object_id);
            }
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: device_range.clone().into() },
                    value: AllocatorValue::Abs { count: 1, owner_object_id },
                    sequence: context.checkpoint.file_offset,
                };
                let len = item.key.device_range.length().unwrap();
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
                let mut inner = self.inner.lock();
                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                entry.allocated_bytes += len;
                if let ApplyMode::Live(transaction) = context.mode {
                    // The bytes move from "uncommitted" to "allocated"; the temporary
                    // allocation record is now redundant and queued for removal.
                    entry.uncommitted_allocated_bytes -= len;
                    inner.dropped_temporary_allocations.push(device_range.into());
                    if let Some(reservation) = transaction.allocator_reservation {
                        reservation.commit(len);
                    }
                }
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: device_range.into() },
                    value: AllocatorValue::None,
                    sequence: context.checkpoint.file_offset,
                };
                let len = item.key.device_range.length().unwrap();

                {
                    let mut inner = self.inner.lock();
                    {
                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                        entry.allocated_bytes -= len;
                        if context.mode.is_live() {
                            entry.committed_deallocated_bytes += len;
                        }
                    }
                    if context.mode.is_live() {
                        // Remember the deallocation so the space is only freed once the journal
                        // is flushed to this offset (see did_flush_device).
                        inner.committed_deallocated.push_back(CommittedDeallocation {
                            log_file_offset: context.checkpoint.file_offset,
                            range: item.key.device_range.clone(),
                            owner_object_id,
                        });
                    }
                    if let ApplyMode::Live(Transaction {
                        allocator_reservation: Some(reservation),
                        ..
                    }) = context.mode
                    {
                        // Deallocated bytes are handed back to the transaction's reservation.
                        inner.add_reservation(reservation.owner_object_id(), len);
                        reservation.add(len);
                    }
                }
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
            }
            Mutation::BeginFlush => {
                // Seal the mutable layer and snapshot per-owner allocated bytes into info so
                // the flushed AllocatorInfo is consistent with the sealed layers.
                self.tree.seal();
                let mut inner = self.inner.lock();
                let allocated_bytes =
                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
                inner.info.allocated_bytes = allocated_bytes;
            }
            Mutation::EndFlush => {}
            _ => bail!("unexpected mutation: {:?}", mutation),
        }
        Ok(())
    }
1880
1881 fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
1882 match mutation {
1883 Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1884 let len = device_range.length().unwrap();
1885 let mut inner = self.inner.lock();
1886 inner
1887 .owner_bytes
1888 .entry(owner_object_id)
1889 .or_default()
1890 .uncommitted_allocated_bytes -= len;
1891 if let Some(reservation) = transaction.allocator_reservation {
1892 let res_owner = reservation.owner_object_id();
1893 inner.add_reservation(res_owner, len);
1894 reservation.release_reservation(res_owner, len);
1895 }
1896 inner.strategy.free(device_range.clone().into()).expect("drop mutaton");
1897 self.temporary_allocations
1898 .erase(&AllocatorKey { device_range: device_range.into() });
1899 }
1900 Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
1901 self.temporary_allocations
1902 .erase(&AllocatorKey { device_range: device_range.into() });
1903 }
1904 _ => {}
1905 }
1906 }
1907
1908 async fn flush(&self) -> Result<Version, Error> {
1909 let filesystem = self.filesystem.upgrade().unwrap();
1910 let object_manager = filesystem.object_manager();
1911 let earliest_version = self.tree.get_earliest_version();
1912 if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
1913 return Ok(earliest_version);
1915 }
1916
1917 let fs = self.filesystem.upgrade().unwrap();
1918 let mut flusher = Flusher::new(self, &fs).await;
1919 let (new_layer_file, info) = flusher.start().await?;
1920 flusher.finish(new_layer_file, info).await
1921 }
1922}
1923
/// Wraps a `LayerIterator` over allocator records and merges physically adjacent records that
/// carry the same value into a single record (see the `LayerIterator` impl below).
pub struct CoalescingIterator<I> {
    // The underlying iterator being coalesced.
    iter: I,
    // The current (possibly merged) item, or `None` when exhausted.
    item: Option<AllocatorItem>,
}
1940
1941impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
1942 pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
1943 let mut iter = Self { iter, item: None };
1944 iter.advance().await?;
1945 Ok(iter)
1946 }
1947}
1948
#[async_trait]
impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
    for CoalescingIterator<I>
{
    /// Advances to the next record, greedily absorbing any immediately-adjacent records with an
    /// equal value into one wider record.
    async fn advance(&mut self) -> Result<(), Error> {
        self.item = self.iter.get().map(|x| x.cloned());
        if self.item.is_none() {
            return Ok(());
        }
        let left = self.item.as_mut().unwrap();
        loop {
            self.iter.advance().await?;
            match self.iter.get() {
                None => return Ok(()),
                Some(right) => {
                    // Records must be sorted and non-overlapping; overlap implies corruption.
                    ensure!(
                        left.key.device_range.end <= right.key.device_range.start,
                        FxfsError::Inconsistent
                    );
                    // Stop coalescing at a gap or a change of value.
                    if left.key.device_range.end < right.key.device_range.start
                        || left.value != *right.value
                    {
                        return Ok(());
                    }
                    // Adjacent and equal-valued: extend the current record and keep going.
                    left.key.device_range.end = right.key.device_range.end;
                }
            }
        }
    }

    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
        self.item.as_ref().map(|x| x.as_item_ref())
    }
}
1985
/// Drives a flush of the allocator's LSM tree to a new layer file.  Holds the allocator's flush
/// write-lock for its entire lifetime so at most one flush runs at a time.
struct Flusher<'a> {
    allocator: &'a Allocator,
    fs: &'a Arc<FxFilesystem>,
    // Flush write-lock guard; released when the Flusher is dropped.
    _guard: WriteGuard<'a>,
}
1991
1992impl<'a> Flusher<'a> {
1993 async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
1994 let keys = lock_keys![LockKey::flush(allocator.object_id())];
1995 Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
1996 }
1997
1998 fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
1999 Options {
2000 skip_journal_checks: true,
2001 borrow_metadata_space: true,
2002 allocator_reservation: Some(allocator_reservation),
2003 ..Default::default()
2004 }
2005 }
2006
    /// Begins the flush: creates the new layer file (initially in the graveyard, so it is
    /// cleaned up if we crash before the flush completes), commits a `BeginFlush` mutation, and
    /// returns the layer handle together with a snapshot of `AllocatorInfo` taken atomically
    /// with the commit.
    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
        let object_manager = self.fs.object_manager();
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
            .await?;

        let root_store = self.fs.root_store();
        let layer_object_handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions { skip_journal_checks: true, ..Default::default() },
            None,
        )
        .await?;
        // Graveyard membership makes the half-written layer file reclaimable after a crash.
        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
        // Snapshot info from within the commit callback so it is consistent with BeginFlush.
        let info = transaction
            .commit_with_callback(|_| {
                self.allocator.inner.lock().info.clone()
            })
            .await?;
        Ok((layer_object_handle, info))
    }
2040
    /// Completes the flush: compacts all immutable layers into `layer_object_handle`, persists
    /// the updated `AllocatorInfo`, swaps the tree's layers over atomically with the commit, and
    /// finally tombstones the old layer files.
    async fn finish(
        self,
        layer_object_handle: DataObjectHandle<ObjectStore>,
        mut info: AllocatorInfo,
    ) -> Result<Version, Error> {
        let object_manager = self.fs.object_manager();
        let txn_options = Self::txn_options(object_manager.metadata_reservation());

        let layer_set = self.allocator.tree.immutable_layer_set();
        let total_len = layer_set.sum_len();
        {
            let start_time = std::time::Instant::now();
            let merged_layer_count = layer_set.layers.len();
            let mut merger = layer_set.merger();
            // Filter out tombstones/deleted-volume records, then coalesce adjacent runs so the
            // new layer is as compact as possible.
            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
            let iter = CoalescingIterator::new(iter).await?;
            let bytes_written = compact_with_iterator(
                iter,
                total_len,
                DirectWriter::new(&layer_object_handle, txn_options).await,
                layer_object_handle.block_size(),
                Some(self.fs.journal().get_compaction_yielder()),
            )
            .await?;

            self.allocator.tree.report_compaction_metrics(
                bytes_written,
                start_time.elapsed(),
                merged_layer_count,
            );
        }

        let root_store = self.fs.root_store();

        let object_handle;
        let reservation_update;
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    self.allocator.object_id()
                )],
                txn_options,
            )
            .await?;
        let mut serialized_info = Vec::new();

        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
        object_handle = ObjectStore::open_object(
            &root_store,
            self.allocator.object_id(),
            HandleOptions::default(),
            None,
        )
        .await?;

        // The superseded layer files go into the graveyard; they are tombstoned below once the
        // new layer set has been committed.
        for object_id in &info.layers {
            root_store.add_to_graveyard(&mut transaction, *object_id);
        }

        // Deleted volumes' records were dropped by the compaction filter above, so their
        // marked_for_deletion entries can be cleared (applied in the commit callback below).
        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);

        info.layers = vec![layer_object_handle.object_id()];

        info.serialize_with_version(&mut serialized_info)?;

        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;

        // Reserve metadata space proportional to the new layer file's size.
        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
            layer_object_handle.get_size(),
        ));

        transaction.add_with_object(
            self.allocator.object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );
        // The new layer file is now referenced by info, so it must survive.
        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());

        let layers = layers_from_handles([layer_object_handle]).await?;
        // Swap in the new layer set atomically with the commit.
        transaction
            .commit_with_callback(|_| {
                self.allocator.tree.set_layers(layers);

                let mut inner = self.allocator.inner.lock();
                inner.info.layers = info.layers;
                for owner_id in marked_for_deletion {
                    inner.marked_for_deletion.remove(&owner_id);
                    inner.info.marked_for_deletion.remove(&owner_id);
                }
            })
            .await?;

        // The old layer files are no longer referenced; close and tombstone them.
        for layer in layer_set.layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                root_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        let mut counters = self.allocator.counters.lock();
        counters.num_flushes += 1;
        counters.last_flush_time = Some(std::time::SystemTime::now());
        Ok(self.allocator.tree.get_earliest_version())
    }
2166}
2167
2168#[cfg(test)]
2169mod tests {
2170 use crate::filesystem::{
2171 FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2172 };
2173 use crate::fsck::fsck;
2174 use crate::lsm_tree::cache::NullCache;
2175 use crate::lsm_tree::skip_list_layer::SkipListLayer;
2176 use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
2177 use crate::lsm_tree::{LSMTree, Query};
2178 use crate::object_handle::ObjectHandle;
2179 use crate::object_store::allocator::merge::merge;
2180 use crate::object_store::allocator::{
2181 Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2182 };
2183 use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
2184 use crate::object_store::volume::root_volume;
2185 use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
2186 use crate::range::RangeExt;
2187 use crate::round::round_up;
2188 use crate::testing;
2189 use fuchsia_async as fasync;
2190 use fuchsia_sync::Mutex;
2191 use std::cmp::{max, min};
2192 use std::ops::{Bound, Range};
2193 use std::sync::Arc;
2194 use storage_device::DeviceHolder;
2195 use storage_device::fake_device::FakeDevice;
2196
2197 #[test]
2198 fn test_allocator_key_is_range_based() {
2199 assert!(AllocatorKey { device_range: 0..100 }.is_range_key());
2201 }
2202
    /// Two adjacent records with equal values in a single layer coalesce into one record.
    #[fuchsia::test]
    async fn test_coalescing_iterator() {
        let skip_list = SkipListLayer::new(100);
        let items = [
            Item::new(
                AllocatorKey { device_range: 0..100 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ),
            Item::new(
                AllocatorKey { device_range: 100..200 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ),
        ];
        // Insert out of order; the layer keeps them sorted.
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let mut iter =
            CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        // The two adjacent ranges merge into 0..200.
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 0..200 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }
2233
    /// Adjacent equal-valued records coalesce even when they live in different LSM layers.
    #[fuchsia::test]
    async fn test_merge_and_coalesce_across_three_layers() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 100..200 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        // Seal so the second insert lands in a new mutable layer.
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 0..100 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        // Cross-layer adjacency still coalesces into a single 0..200 record.
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 0..200 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }
2268
    /// Adjacent records with DIFFERENT owners must NOT be coalesced.
    #[fuchsia::test]
    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 100..200 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        lsm_tree.seal();
        // Adjacent range, but owned by a different object (98 vs 99).
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 0..100 },
                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        // Both records survive separately.
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 0..100 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            )
        );
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 100..200 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }
2312
2313 fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
2314 if a.end > b.start && a.start < b.end {
2315 min(a.end, b.end) - max(a.start, b.start)
2316 } else {
2317 0
2318 }
2319 }
2320
    /// Returns every allocated range recorded in the allocator's tree, in ascending order,
    /// asserting along the way that the ranges are sorted and non-overlapping.
    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut allocations: Vec<Range<u64>> = Vec::new();
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            if let Some(r) = allocations.last() {
                // Ranges must come out sorted and disjoint.
                assert!(device_range.start >= r.end);
            }
            allocations.push(device_range.clone());
            iter.advance().await.expect("advance failed");
        }
        allocations
    }
2338
    /// Asserts that the set of allocated ranges in the tree exactly covers
    /// `expected_allocations` (every recorded byte is expected, and the totals match).
    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut found = 0;
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            let mut l = device_range.length().expect("Invalid range");
            found += l;
            // Subtract off every expected range that overlaps this record; anything left over
            // was allocated but not expected.
            for range in expected_allocations {
                l -= overlap(range, device_range);
                if l == 0 {
                    break;
                }
            }
            assert_eq!(l, 0, "range {device_range:?} not covered by expectations");
            iter.advance().await.expect("advance failed");
        }
        // Totals must match too, so no expected range can be missing from the tree.
        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
    }
2364
2365 async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
2366 let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
2367 let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2368 let allocator = fs.allocator();
2369 (fs, allocator)
2370 }
2371
    /// Basic allocation: new ranges are block-sized, mutually disjoint, and all visible in the
    /// tree after commit.
    #[fuchsia::test]
    async fn test_allocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;

        // The ranges a freshly-formatted filesystem is expected to have allocated.
        let expected = vec![
            0..4096,
            4096..139264,
            139264..204800,
            204800..335872,
            335872..401408,
            524288..528384,
        ];
        assert_eq!(device_ranges, expected);
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
        transaction.commit().await.expect("commit failed");
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        // Index 7 is the range allocated just above; 5 and 6 were allocated pre-commit.
        assert_eq!(device_ranges[7].length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[5], &device_ranges[7]), 0);
        assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }
2421
    /// Requesting more than `max_extent_size_bytes` is clamped to that maximum.
    #[fuchsia::test]
    async fn test_allocate_more_than_max_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;
        // Ask for the entire device; far more than any single extent may be.
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(
            device_ranges.last().unwrap().length().expect("Invalid range"),
            allocator.max_extent_size_bytes
        );
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }
2443
    /// Allocating and then deallocating a range restores the original allocation state.
    #[fuchsia::test]
    async fn test_deallocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let initial_allocations = collect_allocations(&allocator).await;

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let device_range1 = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");
        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
        transaction.commit().await.expect("commit failed");

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
            .await
            .expect("deallocate failed");
        transaction.commit().await.expect("commit failed");

        // Back to the state before our allocation.
        check_allocations(&allocator, &initial_allocations).await;
    }
2469
    /// Verifies `mark_allocated`: space observed in a dropped (uncommitted)
    /// transaction becomes free again, but can be explicitly pinned with
    /// `mark_allocated` so later allocations avoid it.
    #[fuchsia::test]
    async fn test_mark_allocated() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut device_ranges = collect_allocations(&allocator).await;
        // Allocate two blocks in a transaction that is dropped without being
        // committed, so the allocator should treat `range` as free again.
        let range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
                .await
                .expect("allocate failed")
        };

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

        // The rolled-back space is expected to be handed out again first.
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );

        assert_eq!(device_ranges.last().unwrap().start, range.start);

        // Explicitly mark the second block of the rolled-back range as
        // allocated.
        let mut range2 = range.clone();
        range2.start += fs.block_size();
        allocator
            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
            .expect("mark_allocated failed");
        device_ranges.push(range2);

        // A further allocation must now steer clear of `range` entirely.
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        let last_range = device_ranges.last().unwrap();
        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(last_range, &range), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }
2525
2526 #[fuchsia::test]
2527 async fn test_mark_for_deletion() {
2528 const STORE_OBJECT_ID: u64 = 99;
2529 let (fs, allocator) = test_fs().await;
2530
2531 let initial_allocated_bytes = allocator.get_allocated_bytes();
2533 let mut device_ranges = collect_allocations(&allocator).await;
2534 let mut transaction =
2535 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2536 for _ in 0..15 {
2538 device_ranges.push(
2539 allocator
2540 .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2541 .await
2542 .expect("allocate failed"),
2543 );
2544 device_ranges.push(
2545 allocator
2546 .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2547 .await
2548 .expect("allocate2 failed"),
2549 );
2550 }
2551 transaction.commit().await.expect("commit failed");
2552 check_allocations(&allocator, &device_ranges).await;
2553
2554 assert_eq!(
2555 allocator.get_allocated_bytes(),
2556 initial_allocated_bytes + fs.block_size() * 3000
2557 );
2558
2559 let mut transaction =
2561 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2562 allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
2563 transaction.commit().await.expect("commit failed");
2564
2565 assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
2567 check_allocations(&allocator, &device_ranges).await;
2568
2569 device_ranges.clear();
2572
2573 let mut transaction =
2574 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2575 let target_bytes = 1500 * fs.block_size();
2576 while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
2577 let len = std::cmp::min(
2578 target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
2579 100 * fs.block_size(),
2580 );
2581 device_ranges.push(
2582 allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
2583 );
2584 }
2585 transaction.commit().await.expect("commit failed");
2586
2587 allocator.flush().await.expect("flush failed");
2589
2590 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2594 assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes,);
2595 }
2596
2597 async fn create_file(store: &Arc<ObjectStore>, size: usize) {
2598 let root_directory =
2599 Directory::open(store, store.root_directory_object_id()).await.expect("open failed");
2600
2601 let mut transaction = store
2602 .filesystem()
2603 .new_transaction(
2604 lock_keys![LockKey::object(
2605 store.store_object_id(),
2606 store.root_directory_object_id()
2607 )],
2608 Options::default(),
2609 )
2610 .await
2611 .expect("new_transaction failed");
2612 let file = root_directory
2613 .create_child_file(&mut transaction, &format!("foo {}", size))
2614 .await
2615 .expect("create_child_file failed");
2616 transaction.commit().await.expect("commit failed");
2617
2618 let buffer = file.allocate_buffer(size).await;
2619
2620 let mut transaction = file
2622 .new_transaction_with_options(Options {
2623 borrow_metadata_space: true,
2624 ..Default::default()
2625 })
2626 .await
2627 .expect("new_transaction_with_options failed");
2628 file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2629 transaction.commit().await.expect("commit failed");
2630 }
2631
    /// Deletes and recreates a volume across remounts with forced journal
    /// compaction in between; fsck at the end verifies that replay handles a
    /// deleted store correctly.
    #[fuchsia::test]
    async fn test_replay_with_deleted_store_and_compaction() {
        let (fs, _) = test_fs().await;

        const FILE_SIZE: usize = 10_000_000;

        // Create a volume containing a large file so its store holds real
        // extents.
        let mut store_id = {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;
            store.store_object_id()
        };

        // Remount from the same device, then force a journal compaction.
        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");

        fs.journal().force_compact().await.expect("compact failed");

        // Twice: delete the volume, recreate it with fresh contents, then
        // remount so the delete + recreate is replayed from the journal.
        for _ in 0..2 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");

                // Deleting a volume takes the volume-directory object lock
                // plus a flush lock on the store being removed.
                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store_id)
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                // Recreate the volume and populate it again; remember the new
                // store id for the next iteration's flush lock.
                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");
                create_file(&store, FILE_SIZE).await;
                store_id = store.store_object_id();
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }
2700
    /// Races a forced journal compaction against a volume deletion.  A random
    /// sleep varies the interleaving between runs; fsck afterwards verifies
    /// consistency regardless of which side wins the race.
    #[fuchsia::test(threads = 4)]
    async fn test_compaction_delete_race() {
        let (fs, _allocator) = test_fs().await;

        {
            const FILE_SIZE: usize = 10_000_000;

            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;

            let fs_clone = fs.clone();

            // Make sure all executor threads are running so the compaction
            // task can genuinely execute in parallel with this one.
            let executor_tasks = testing::force_executor_threads_to_run(4).await;

            // Kick off compaction on a separate task...
            let task = fasync::Task::spawn(async move {
                fs_clone.journal().force_compact().await.expect("compact failed");
            });

            drop(executor_tasks);

            // ...then sleep a random 3-6ms so the deletion below lands at a
            // varying point within the compaction.
            let sleep = rand::random_range(3000..6000);
            std::thread::sleep(std::time::Duration::from_micros(sleep));
            log::info!("sleep {sleep}us");

            // Delete the volume while compaction may still be in flight.
            let transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(
                            root_vol.volume_directory().store().store_object_id(),
                            root_vol.volume_directory().object_id(),
                        ),
                        LockKey::flush(store.store_object_id())
                    ],
                    Options { borrow_metadata_space: true, ..Default::default() },
                )
                .await
                .expect("new_transaction failed");
            root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");

            task.await;
        }

        // Compact again, remount from the same device, and fsck to verify
        // the on-disk state survived the race.
        fs.journal().force_compact().await.expect("compact failed");
        fs.close().await.expect("close failed");

        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }
2765
2766 #[fuchsia::test]
2767 async fn test_delete_multiple_volumes() {
2768 let (mut fs, _) = test_fs().await;
2769
2770 for _ in 0..50 {
2771 {
2772 let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2773 let store = root_vol
2774 .new_volume("vol", NewChildStoreOptions::default())
2775 .await
2776 .expect("new_volume failed");
2777
2778 create_file(&store, 1_000_000).await;
2779
2780 let transaction = fs
2781 .clone()
2782 .new_transaction(
2783 lock_keys![
2784 LockKey::object(
2785 root_vol.volume_directory().store().store_object_id(),
2786 root_vol.volume_directory().object_id(),
2787 ),
2788 LockKey::flush(store.store_object_id())
2789 ],
2790 Options { borrow_metadata_space: true, ..Default::default() },
2791 )
2792 .await
2793 .expect("new_transaction failed");
2794 root_vol
2795 .delete_volume("vol", transaction, || {})
2796 .await
2797 .expect("delete_volume failed");
2798
2799 fs.allocator().flush().await.expect("flush failed");
2800 }
2801
2802 fs.close().await.expect("close failed");
2803 let device = fs.take_device().await;
2804 device.reopen(false);
2805
2806 fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2807 }
2808
2809 fsck(fs.clone()).await.expect("fsck failed");
2810 fs.close().await.expect("close failed");
2811 }
2812
2813 #[fuchsia::test]
2814 async fn test_allocate_free_reallocate() {
2815 const STORE_OBJECT_ID: u64 = 99;
2816 let (fs, allocator) = test_fs().await;
2817
2818 let mut device_ranges = Vec::new();
2820 let mut transaction =
2821 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2822 for _ in 0..30 {
2823 device_ranges.push(
2824 allocator
2825 .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2826 .await
2827 .expect("allocate failed"),
2828 );
2829 }
2830 transaction.commit().await.expect("commit failed");
2831
2832 assert_eq!(
2833 fs.block_size() * 3000,
2834 *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2835 );
2836
2837 let mut transaction =
2839 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2840 for range in std::mem::replace(&mut device_ranges, Vec::new()) {
2841 allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
2842 }
2843 transaction.commit().await.expect("commit failed");
2844
2845 assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());
2846
2847 let mut transaction =
2850 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2851 let target_len = 1500 * fs.block_size();
2852 while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
2853 let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
2854 device_ranges.push(
2855 allocator
2856 .allocate(&mut transaction, STORE_OBJECT_ID, len)
2857 .await
2858 .expect("allocate failed"),
2859 );
2860 }
2861 transaction.commit().await.expect("commit failed");
2862
2863 assert_eq!(
2864 fs.block_size() * 1500,
2865 *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2866 );
2867 }
2868
2869 #[fuchsia::test]
2870 async fn test_flush() {
2871 const STORE_OBJECT_ID: u64 = 99;
2872
2873 let mut device_ranges = Vec::new();
2874 let device = {
2875 let (fs, allocator) = test_fs().await;
2876 let mut transaction = fs
2877 .clone()
2878 .new_transaction(lock_keys![], Options::default())
2879 .await
2880 .expect("new failed");
2881 device_ranges.push(
2882 allocator
2883 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2884 .await
2885 .expect("allocate failed"),
2886 );
2887 device_ranges.push(
2888 allocator
2889 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2890 .await
2891 .expect("allocate failed"),
2892 );
2893 device_ranges.push(
2894 allocator
2895 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2896 .await
2897 .expect("allocate failed"),
2898 );
2899 transaction.commit().await.expect("commit failed");
2900
2901 allocator.flush().await.expect("flush failed");
2902
2903 fs.close().await.expect("close failed");
2904 fs.take_device().await
2905 };
2906
2907 device.reopen(false);
2908 let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2909 let allocator = fs.allocator();
2910
2911 let allocated = collect_allocations(&allocator).await;
2912
2913 for i in &device_ranges {
2915 let mut overlapping = 0;
2916 for j in &allocated {
2917 overlapping += overlap(i, j);
2918 }
2919 assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
2920 }
2921
2922 let mut transaction =
2923 fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2924 let range = allocator
2925 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2926 .await
2927 .expect("allocate failed");
2928
2929 for r in &allocated {
2931 assert_eq!(overlap(r, &range), 0);
2932 }
2933 transaction.commit().await.expect("commit failed");
2934 }
2935
2936 #[fuchsia::test]
2937 async fn test_dropped_transaction() {
2938 const STORE_OBJECT_ID: u64 = 99;
2939 let (fs, allocator) = test_fs().await;
2940 let allocated_range = {
2941 let mut transaction = fs
2942 .clone()
2943 .new_transaction(lock_keys![], Options::default())
2944 .await
2945 .expect("new_transaction failed");
2946 allocator
2947 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2948 .await
2949 .expect("allocate failed")
2950 };
2951 let mut transaction = fs
2954 .clone()
2955 .new_transaction(lock_keys![], Options::default())
2956 .await
2957 .expect("new_transaction failed");
2958 assert_eq!(
2959 allocator
2960 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2961 .await
2962 .expect("allocate failed"),
2963 allocated_range
2964 );
2965 }
2966
2967 #[fuchsia::test]
2968 async fn test_cleanup_removed_owner() {
2969 const STORE_OBJECT_ID: u64 = 99;
2970 let device = {
2971 let (fs, allocator) = test_fs().await;
2972
2973 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2974 {
2975 let mut transaction =
2976 fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
2977 allocator
2978 .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2979 .await
2980 .expect("Allocating");
2981 transaction.commit().await.expect("Committing.");
2982 }
2983 allocator.flush().await.expect("Flushing");
2984 assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2985 {
2986 let mut transaction =
2987 fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
2988 allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
2989 transaction.commit().await.expect("Committing.");
2990 }
2991 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2992 fs.close().await.expect("Closing");
2993 fs.take_device().await
2994 };
2995
2996 device.reopen(false);
2997 let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2998 let allocator = fs.allocator();
2999 assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3000 }
3001
    /// Verifies `get_allocated_bytes` accounting: allocations count only once
    /// committed, a dropped transaction rolls back, and deallocations take
    /// effect only at commit time.
    #[fuchsia::test]
    async fn test_allocated_bytes() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();

        // Expected total once one block has been committed.
        let allocated_bytes = initial_allocated_bytes + fs.block_size();
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");
            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
            range
        };

        // Allocate again in a transaction that is dropped without committing;
        // the count must not change while it is pending...
        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");

            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
        }

        // ...nor after the transaction has been dropped.
        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

        // Deallocate the interior of the committed range, leaving 20 bytes at
        // each end.
        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
            .await
            .expect("deallocate failed");

        // The deallocation is not reflected until the transaction commits.
        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

        transaction.commit().await.expect("commit failed");

        // Only the 2 x 20 bytes at the ends of the range remain allocated.
        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
    }
3061
3062 #[fuchsia::test]
3063 async fn test_persist_bytes_limit() {
3064 const LIMIT: u64 = 12345;
3065 const OWNER_ID: u64 = 12;
3066
3067 let (fs, allocator) = test_fs().await;
3068 {
3069 let mut transaction = fs
3070 .clone()
3071 .new_transaction(lock_keys![], Options::default())
3072 .await
3073 .expect("new_transaction failed");
3074 allocator
3075 .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
3076 .expect("Failed to set limit.");
3077 assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
3078 transaction.commit().await.expect("Failed to commit transaction");
3079 let bytes: u64 = *allocator
3080 .inner
3081 .lock()
3082 .info
3083 .limit_bytes
3084 .get(&OWNER_ID)
3085 .expect("Failed to find limit");
3086 assert_eq!(LIMIT, bytes);
3087 }
3088 }
3089
3090 fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
3094 let mut coalesced = Vec::new();
3095 let mut prev: Option<Range<u64>> = None;
3096 for range in ranges {
3097 if let Some(prev_range) = &mut prev {
3098 if range.start == prev_range.end {
3099 prev_range.end = range.end;
3100 } else {
3101 coalesced.push(prev_range.clone());
3102 prev = Some(range);
3103 }
3104 } else {
3105 prev = Some(range);
3106 }
3107 }
3108 if let Some(prev_range) = prev {
3109 coalesced.push(prev_range);
3110 }
3111 coalesced
3112 }
3113
    /// Exercises `take_for_trimming`: after freeing a known set of ranges and
    /// remounting, walking the trimmable extents in small batches must yield
    /// exactly the freed space.
    #[fuchsia::test]
    async fn test_take_for_trimming() {
        const STORE_OBJECT_ID: u64 = 99;

        // Captured before the remount and compared after it.
        let allocated_range;
        let expected_free_ranges;
        let device = {
            let (fs, allocator) = test_fs().await;
            let bs = fs.block_size();
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocated_range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");

            // Free a scattering of sub-ranges inside the 32-block allocation.
            // NOTE(review): the three adjacent entries covering blocks 4..13
            // appear intended to exercise splitting at `max_extent_size` (4
            // blocks, below) — confirm.
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            let base = allocated_range.start;
            expected_free_ranges = vec![
                base..(base + (bs * 1)),
                (base + (bs * 2))..(base + (bs * 3)),
                (base + (bs * 4))..(base + (bs * 8)),
                (base + (bs * 8))..(base + (bs * 12)),
                (base + (bs * 12))..(base + (bs * 13)),
                (base + (bs * 29))..(base + (bs * 30)),
            ];
            for range in &expected_free_ranges {
                allocator
                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
                    .await
                    .expect("deallocate failed");
            }
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

        // Walk the free extents in batches of at most 2 extents of at most 4
        // blocks each, clipping to the region allocated above.
        let max_extent_size = fs.block_size() as usize * 4;
        const EXTENTS_PER_BATCH: usize = 2;
        let mut free_ranges = vec![];
        let mut offset = allocated_range.start;
        while offset < allocated_range.end {
            let free = allocator
                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
                .await
                .expect("take_for_trimming failed");
            free_ranges.extend(
                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
            );
            // Resume the next batch after the last extent seen.
            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
        }
        // Batching may split extents at arbitrary points, so compare the
        // coalesced forms of both sides.
        let coalesced_free_ranges = coalesce_ranges(free_ranges);
        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);

        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
    }
3195
    /// While extents are held out for trimming, an allocation that needs them
    /// must block until the trim batch is dropped.
    #[fuchsia::test]
    async fn test_allocations_wait_for_free_extents() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocator_clone = allocator.clone();

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

        // Take every free extent on the device for trimming, so any
        // allocation attempt has to wait for the batch to be released.
        let max_extent_size = fs.device().size() as usize;
        const EXTENTS_PER_BATCH: usize = usize::MAX;

        // `trim_done` is set to true (under its lock) just before the
        // trimmable extents are dropped; the allocating task asserts the flag
        // is set, proving the allocation blocked until the release.
        let trim_done = Arc::new(Mutex::new(false));
        let trimmable_extents = allocator
            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
            .await
            .expect("take_for_trimming failed");

        let trim_done_clone = trim_done.clone();
        let bs = fs.block_size();
        let alloc_task = fasync::Task::spawn(async move {
            allocator_clone
                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
                .await
                .expect("allocate should fail");
            {
                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
            }
            transaction.commit().await.expect("commit failed");
        });

        // Give the allocation task a chance to start (and block).
        fasync::Timer::new(std::time::Duration::from_millis(100)).await;

        // Release the extents and set the flag within the same lock scope, so
        // the allocating task cannot observe the release before the flag.
        {
            let mut trim_done = trim_done.lock();
            std::mem::drop(trimmable_extents);
            *trim_done = true;
        }

        alloc_task.await;
    }
3246
3247 #[fuchsia::test]
3248 async fn test_allocation_with_reservation_not_multiple_of_block_size() {
3249 const STORE_OBJECT_ID: u64 = 99;
3250 let (fs, allocator) = test_fs().await;
3251
3252 const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
3254 let reservation =
3255 allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();
3256
3257 let mut transaction = fs
3258 .clone()
3259 .new_transaction(
3260 lock_keys![],
3261 Options { allocator_reservation: Some(&reservation), ..Options::default() },
3262 )
3263 .await
3264 .expect("new failed");
3265
3266 let range = allocator
3267 .allocate(
3268 &mut transaction,
3269 STORE_OBJECT_ID,
3270 round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
3271 )
3272 .await
3273 .expect("allocate faiiled");
3274 assert_eq!((range.end - range.start) % fs.block_size(), 0);
3275
3276 println!("{}", range.end - range.start);
3277 }
3278}