pub mod merge;
pub mod strategy;

use crate::drop_event::DropEvent;
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::skip_list_layer::SkipListLayer;
use crate::lsm_tree::types::{
    FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
    OrdUpperBound, RangeKey, SortByU64, Value,
};
use crate::lsm_tree::{LSMTree, Query, layers_from_handles};
use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
use crate::object_store::object_manager::ReservationUpdate;
use crate::object_store::transaction::{
    AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
};
use crate::object_store::{
    DataObjectHandle, DirectWriter, HandleOptions, ObjectStore, ReservedId, tree,
};
use crate::range::RangeExt;
use crate::round::{round_div, round_down, round_up};
use crate::serialized_types::{
    DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use either::Either::{Left, Right};
use event_listener::EventListener;
use fprint::TypeFingerprint;
use fuchsia_inspect::HistogramProperty;
use fuchsia_sync::Mutex;
use futures::FutureExt;
use merge::{filter_marked_for_deletion, filter_tombstones, merge};
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::hash::Hash;
use std::marker::PhantomData;
use std::num::{NonZero, Saturating};
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};

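/// A trait for objects that account for reserved bytes (the allocator itself, or an outer
/// reservation) and can accept their return.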
pub trait ReservationOwner: Send + Sync {
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
}

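/// A reservation of device space.  Nothing is allocated until a transaction commits; a
/// reservation simply guarantees that space will be available.  Reservations nest: `reserve`
/// carves a sub-reservation (a [`Hold`]) out of this one, and any amount neither committed nor
/// forgotten is returned to the owner when the reservation is dropped.
///
/// A minimal usage sketch (`allocator` and `owner_object_id` are assumed to be in scope):
///
/// ```ignore
/// let reservation = allocator.clone().reserve(Some(owner_object_id), 8192).unwrap();
/// // Carve 4096 bytes out of the parent; they count as `reserved` on the parent.
/// let hold = reservation.reserve(4096).unwrap();
/// drop(hold); // Returns the 4096 bytes to the parent reservation.
/// drop(reservation); // Returns the full 8192 bytes to the allocator.
/// ```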
pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
    owner: T,
    owner_object_id: Option<u64>,
    inner: Mutex<ReservationInner>,
    phantom: PhantomData<U>,
}

#[derive(Debug, Default)]
struct ReservationInner {
    amount: u64,
    reserved: u64,
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner.lock().fmt(f)
    }
}

impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
    pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
        Self {
            owner,
            owner_object_id,
            inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
            phantom: PhantomData,
        }
    }

    pub fn owner_object_id(&self) -> Option<u64> {
        self.owner_object_id
    }

    pub fn amount(&self) -> u64 {
        self.inner.lock().amount
    }

    pub fn add(&self, amount: u64) {
        self.inner.lock().amount += amount;
    }

    pub fn forget(&self) -> u64 {
        let mut inner = self.inner.lock();
        assert_eq!(inner.reserved, 0);
        std::mem::take(&mut inner.amount)
    }

    pub fn forget_some(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
        let mut inner = self.inner.lock();
        let taken = amount(inner.amount - inner.reserved);
        inner.reserved += taken;
        ReservationImpl::new(self, self.owner_object_id, taken)
    }

    pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
        let mut inner = self.inner.lock();
        if inner.amount - inner.reserved < amount {
            None
        } else {
            inner.reserved += amount;
            Some(ReservationImpl::new(self, self.owner_object_id, amount))
        }
    }

    pub fn commit(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.reserved -= amount;
        inner.amount -= amount;
    }

    pub fn give_back(&self, amount: u64) {
        self.owner.borrow().release_reservation(self.owner_object_id, amount);
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
        &self,
        other: &ReservationImpl<V, W>,
        amount: u64,
    ) {
        assert_eq!(self.owner_object_id, other.owner_object_id());
        let mut inner = self.inner.lock();
        if let Some(amount) = inner.amount.checked_sub(amount) {
            inner.amount = amount;
        } else {
            std::mem::drop(inner);
            panic!("Insufficient reservation space");
        }
        other.add(amount);
    }
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
    fn drop(&mut self) {
        let inner = self.inner.get_mut();
        assert_eq!(inner.reserved, 0);
        let owner_object_id = self.owner_object_id;
        if inner.amount > 0 {
            self.owner
                .borrow()
                .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
        }
    }
}

impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
    for ReservationImpl<T, U>
{
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
        assert_eq!(owner_object_id, self.owner_object_id);
        let mut inner = self.inner.lock();
        assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
        inner.reserved -= amount;
    }
}

pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;

pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;

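/// The key for an allocator record: a byte range on the device.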
pub type AllocatorKey = AllocatorKeyV32;

#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct AllocatorKeyV32 {
    pub device_range: Range<u64>,
}

impl SortByU64 for AllocatorKey {
    fn get_leading_u64(&self) -> u64 {
        self.device_range.start
    }
}

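// The bucket size used by `fuzzy_hash` to partition keys.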
const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;

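/// Yields one stable hash per `EXTENT_HASH_BUCKET_SIZE` bucket overlapped by a key's
/// `device_range`.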
pub struct AllocatorKeyPartitionIterator {
    device_range: Range<u64>,
}

impl Iterator for AllocatorKeyPartitionIterator {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        if self.device_range.start >= self.device_range.end {
            None
        } else {
            let start = self.device_range.start;
            self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
            let end = std::cmp::min(self.device_range.start, self.device_range.end);
            let key = AllocatorKey { device_range: start..end };
            let hash = crate::stable_hash::stable_hash(key);
            Some(hash)
        }
    }
}

impl FuzzyHash for AllocatorKey {
    fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
        AllocatorKeyPartitionIterator {
            device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
                ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
        }
    }

    fn is_range_key(&self) -> bool {
        true
    }
}

impl AllocatorKey {
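    /// Returns the lower-bound key to use with `merge_into`: the range is extended back to zero
    /// so that the merge considers any earlier record that might abut or overlap this one.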
    pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
        AllocatorKey { device_range: 0..self.device_range.start }
    }
}

impl LayerKey for AllocatorKey {
    fn merge_type(&self) -> MergeType {
        MergeType::OptimizedMerge
    }
}

impl OrdUpperBound for AllocatorKey {
    fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range.end.cmp(&other.device_range.end)
    }
}

impl OrdLowerBound for AllocatorKey {
    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
    }
}

impl Ord for AllocatorKey {
    fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
    }
}

impl PartialOrd for AllocatorKey {
    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl RangeKey for AllocatorKey {
    fn overlaps(&self, other: &Self) -> bool {
        self.device_range.start < other.device_range.end
            && self.device_range.end > other.device_range.start
    }
}

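/// The value of an allocator record.  `None` (the deleted marker) means the range is free;
/// `Abs` records an absolute reference `count` for the extent along with the object store that
/// owns it.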
pub type AllocatorValue = AllocatorValueV32;

impl Value for AllocatorValue {
    const DELETED_MARKER: Self = Self::None;
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorValueV32 {
    None,
    Abs { count: u64, owner_object_id: u64 },
}

pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;

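/// Serialized allocator metadata, stored in the allocator's own object: the object ids of the
/// layer files, per-owner allocated byte counts, owners marked for deletion, and per-owner byte
/// limits.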
pub type AllocatorInfo = AllocatorInfoV32;

#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
pub struct AllocatorInfoV32 {
    pub layers: Vec<u64>,
    pub allocated_bytes: BTreeMap<u64, u64>,
    pub marked_for_deletion: HashSet<u64>,
    pub limit_bytes: BTreeMap<u64, u64>,
}

const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;

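/// Computes the largest extent size the allocator will hand out for a given block size.  The
/// bound appears chosen so that serialized extent records stay under
/// `DEFAULT_MAX_SERIALIZED_RECORD_SIZE`; the constants here are an upper-bound estimate of the
/// serialization overhead.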
pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
    block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
}

#[derive(Default)]
struct AllocatorCounters {
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}

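/// An allocator for device extents, backed by an LSM tree.  `temporary_allocations` is a
/// transient layer holding in-flight allocations and deallocations that are not yet reflected in
/// the tree, and `inner` holds all mutable book-keeping.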
pub struct Allocator {
    filesystem: Weak<FxFilesystem>,
    block_size: u64,
    device_size: u64,
    object_id: u64,
    max_extent_size_bytes: u64,
    tree: LSMTree<AllocatorKey, AllocatorValue>,
    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    inner: Mutex<Inner>,
    allocation_mutex: futures::lock::Mutex<()>,
    counters: Mutex<AllocatorCounters>,
    maximum_offset: AtomicU64,
    allocations_allowed: AtomicBool,
}

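/// Per-owner byte accounting: bytes committed to the tree (`allocated_bytes`), bytes allocated in
/// transactions that have not yet committed (`uncommitted_allocated_bytes`), reserved bytes, and
/// bytes deallocated in committed transactions that cannot be reused until the device is flushed
/// (`committed_deallocated_bytes`).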
#[derive(Debug, Default, PartialEq)]
struct ByteTracking {
    allocated_bytes: Saturating<u64>,
    uncommitted_allocated_bytes: u64,
    reserved_bytes: u64,
    committed_deallocated_bytes: u64,
}

impl ByteTracking {
    fn used_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
    }

    fn unavailable_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.committed_deallocated_bytes)
    }

    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
    }
}

#[derive(Debug)]
struct CommittedDeallocation {
    log_file_offset: u64,
    range: Range<u64>,
    owner_object_id: u64,
}

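/// Mutable allocator state, guarded by `Allocator::inner`.  Byte counts are tracked per owner
/// (object store) in `owner_bytes`; reservations not attributed to a specific owner are counted
/// in `unattributed_reserved_bytes`.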
struct Inner {
    info: AllocatorInfo,
    opened: bool,
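    // Device ranges whose temporary allocations are no longer needed.  They are erased from
    // `temporary_allocations` under the allocation lock by the next call to `allocate` or
    // `deallocate`.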
    dropped_temporary_allocations: Vec<Range<u64>>,
    owner_bytes: BTreeMap<u64, ByteTracking>,
    unattributed_reserved_bytes: u64,
    committed_deallocated: VecDeque<CommittedDeallocation>,
    trim_reserved_bytes: u64,
    trim_listener: Option<EventListener>,
    strategy: strategy::BestFit,
    allocation_size_histogram: [u64; 64],
    rebuild_strategy_trigger_histogram: [u64; 64],
    marked_for_deletion: HashSet<u64>,
    volumes_deleted_pending_sync: HashSet<u64>,
}
608
609impl Inner {
610 fn allocated_bytes(&self) -> Saturating<u64> {
611 let mut total = Saturating(0);
612 for (_, bytes) in &self.owner_bytes {
613 total += bytes.allocated_bytes;
614 }
615 total
616 }
617
618 fn uncommitted_allocated_bytes(&self) -> u64 {
619 self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
620 }
621
622 fn reserved_bytes(&self) -> u64 {
623 self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
624 + self.unattributed_reserved_bytes
625 }
626
627 fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
628 match self.info.limit_bytes.get(&owner_object_id) {
629 Some(v) => *v,
630 None => u64::MAX,
631 }
632 }
633
634 fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
635 let limit = self.owner_id_limit_bytes(owner_object_id);
636 let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
637 (Saturating(limit) - used).0
638 }
639
640 fn unavailable_bytes(&self) -> Saturating<u64> {
645 let mut total = Saturating(0);
646 for (_, bytes) in &self.owner_bytes {
647 total += bytes.unavailable_bytes();
648 }
649 total
650 }
651
652 fn used_bytes(&self) -> Saturating<u64> {
655 let mut total = Saturating(0);
656 for (_, bytes) in &self.owner_bytes {
657 total += bytes.used_bytes();
658 }
659 total + Saturating(self.unattributed_reserved_bytes)
660 }
661
662 fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
665 let mut total = Saturating(0);
666 for (_, bytes) in &self.owner_bytes {
667 total += bytes.unavailable_after_sync_bytes();
668 }
669 total
670 }
671
672 fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
675 device_size
676 .checked_sub(
677 (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
678 )
679 .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
680 }
681
682 fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
683 match owner_object_id {
684 Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
685 None => self.unattributed_reserved_bytes += amount,
686 };
687 }
688
689 fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
690 match owner_object_id {
691 Some(owner) => {
692 let owner_entry = self.owner_bytes.entry(owner).or_default();
693 assert!(
694 owner_entry.reserved_bytes >= amount,
695 "{} >= {}",
696 owner_entry.reserved_bytes,
697 amount
698 );
699 owner_entry.reserved_bytes -= amount;
700 }
701 None => {
702 assert!(
703 self.unattributed_reserved_bytes >= amount,
704 "{} >= {}",
705 self.unattributed_reserved_bytes,
706 amount
707 );
708 self.unattributed_reserved_bytes -= amount
709 }
710 };
711 }
712}
713
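/// A batch of free extents handed out for trimming.  While this is alive the extents are withheld
/// from the free pool (via `temporary_allocations`); dropping it returns them and notifies the
/// listener the allocator registered in `take_for_trimming`.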
pub struct TrimmableExtents<'a> {
    allocator: &'a Allocator,
    extents: Vec<Range<u64>>,
    _drop_event: DropEvent,
}

impl<'a> TrimmableExtents<'a> {
    pub fn extents(&self) -> &Vec<Range<u64>> {
        &self.extents
    }

    fn new(allocator: &'a Allocator) -> (Self, EventListener) {
        let drop_event = DropEvent::new();
        let listener = drop_event.listen();
        (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
    }

    fn add_extent(&mut self, extent: Range<u64>) {
        self.extents.push(extent);
    }
}

impl<'a> Drop for TrimmableExtents<'a> {
    fn drop(&mut self) {
        let mut inner = self.allocator.inner.lock();
        for device_range in std::mem::take(&mut self.extents) {
            inner.strategy.free(device_range.clone()).expect("drop trim extent");
            self.allocator
                .temporary_allocations
                .erase(&AllocatorKey { device_range: device_range.clone() });
        }
        inner.trim_reserved_bytes = 0;
    }
}

impl Allocator {
    pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
        let block_size = filesystem.block_size();
        let device_size = round_down(filesystem.device().size(), block_size);
        if device_size != filesystem.device().size() {
            warn!("Device size is not block aligned. Rounding down.");
        }
        let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
        let mut strategy = strategy::BestFit::default();
        strategy.free(0..device_size).expect("new fs");
        Allocator {
            filesystem: Arc::downgrade(&filesystem),
            block_size,
            device_size,
            object_id,
            max_extent_size_bytes,
            tree: LSMTree::new(merge, Box::new(NullCache {})),
            temporary_allocations: SkipListLayer::new(1024),
            inner: Mutex::new(Inner {
                info: AllocatorInfo::default(),
                opened: false,
                dropped_temporary_allocations: Vec::new(),
                owner_bytes: BTreeMap::new(),
                unattributed_reserved_bytes: 0,
                committed_deallocated: VecDeque::new(),
                trim_reserved_bytes: 0,
                trim_listener: None,
                strategy,
                allocation_size_histogram: [0; 64],
                rebuild_strategy_trigger_histogram: [0; 64],
                marked_for_deletion: HashSet::new(),
                volumes_deleted_pending_sync: HashSet::new(),
            }),
            allocation_mutex: futures::lock::Mutex::new(()),
            counters: Mutex::new(AllocatorCounters::default()),
            maximum_offset: AtomicU64::new(0),
            allocations_allowed: AtomicBool::new(filesystem.options().image_builder_mode.is_none()),
        }
    }

    pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
        &self.tree
    }

    pub fn enable_allocations(&self) {
        self.allocations_allowed.store(true, Ordering::SeqCst);
    }

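    /// Wraps `iter`, skipping deleted records and extents owned by volumes that are marked for
    /// deletion.  If `committed_marked_for_deletion` is true, the committed set
    /// (`AllocatorInfo::marked_for_deletion`) is used instead of the in-memory one.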
    pub async fn filter(
        &self,
        iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
        committed_marked_for_deletion: bool,
    ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
        let marked_for_deletion = {
            let inner = self.inner.lock();
            if committed_marked_for_deletion {
                &inner.info.marked_for_deletion
            } else {
                &inner.marked_for_deletion
            }
            .clone()
        };
        let iter =
            filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
        Ok(iter)
    }

    pub fn allocation_size_histogram(&self) -> [u64; 64] {
        self.inner.lock().allocation_size_histogram
    }

    pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);

        let filesystem = self.filesystem.upgrade().unwrap();
        let root_store = filesystem.root_store();
        ObjectStore::create_object_with_id(
            &root_store,
            transaction,
            ReservedId::new(&root_store, NonZero::new(self.object_id()).unwrap()),
            HandleOptions::default(),
            None,
        )?;
        root_store.update_last_object_id(self.object_id());
        Ok(())
    }

    pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
        let filesystem = self.filesystem.upgrade().unwrap();
        let root_store = filesystem.root_store();

        self.inner.lock().strategy = strategy::BestFit::default();

        let handle =
            ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
                .await
                .context("Failed to open allocator object")?;

        if handle.get_size() > 0 {
            let serialized_info = handle
                .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
                .await
                .context("Failed to read AllocatorInfo")?;
            let mut cursor = std::io::Cursor::new(serialized_info);
            let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
                .context("Failed to deserialize AllocatorInfo")?;

            let mut handles = Vec::new();
            let mut total_size = 0;
            for object_id in &info.layers {
                let handle = ObjectStore::open_object(
                    &root_store,
                    *object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .context("Failed to open allocator layer file")?;

                let size = handle.get_size();
                total_size += size;
                handles.push(handle);
            }

            {
                let mut inner = self.inner.lock();

                let mut device_bytes = self.device_size;
                for (&owner_object_id, &bytes) in &info.allocated_bytes {
                    ensure!(
                        bytes <= device_bytes,
                        anyhow!(FxfsError::Inconsistent).context(format!(
                            "Allocated bytes exceeds device size: {:?}",
                            info.allocated_bytes
                        ))
                    );
                    device_bytes -= bytes;

                    inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
                        Saturating(bytes);
                }

                inner.info = info;
            }

            self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
            self.filesystem.upgrade().unwrap().object_manager().update_reservation(
                self.object_id,
                tree::reservation_amount_from_layer_size(total_size),
            );
        }

        Ok(())
    }

    pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
        {
            let mut inner = self.inner.lock();
            inner.volumes_deleted_pending_sync.clear();
            inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
        }

        if !self.rebuild_strategy().await.context("Build free extents")? {
            if self.filesystem.upgrade().unwrap().options().read_only {
                info!("Device contains no free space (read-only mode).");
            } else {
                info!("Device contains no free space.");
                return Err(FxfsError::Inconsistent)
                    .context("Device appears to contain no free space");
            }
        }

        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
        Ok(())
    }

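    /// Rebuilds the free-extent strategy by scanning for the gaps between allocated records
    /// (including temporary allocations).  Returns true if the set of free extents changed.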
    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
        let mut changed = false;
        let mut layer_set = self.tree.empty_layer_set();
        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
        self.tree.add_all_layers_to_layer_set(&mut layer_set);

        let overflow_markers = self.inner.lock().strategy.overflow_markers();
        self.inner.lock().strategy.reset_overflow_markers();

        let mut to_add = Vec::new();
        let mut merger = layer_set.merger();
        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
        let mut last_offset = 0;
        while last_offset < self.device_size {
            let next_range = match iter.get() {
                None => {
                    assert!(last_offset <= self.device_size);
                    let range = last_offset..self.device_size;
                    last_offset = self.device_size;
                    iter.advance().await?;
                    range
                }
                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                    if device_range.end < last_offset {
                        iter.advance().await?;
                        continue;
                    }
                    if device_range.start <= last_offset {
                        last_offset = device_range.end;
                        iter.advance().await?;
                        continue;
                    }
                    let range = last_offset..device_range.start;
                    last_offset = device_range.end;
                    iter.advance().await?;
                    range
                }
            };
            to_add.push(next_range);
            if to_add.len() > 100 {
                let mut inner = self.inner.lock();
                for range in to_add.drain(..) {
                    changed |= inner.strategy.force_free(range)?;
                }
            }
        }
        let mut inner = self.inner.lock();
        for range in to_add {
            changed |= inner.strategy.force_free(range)?;
        }
        if overflow_markers != inner.strategy.overflow_markers() {
            changed = true;
        }
        Ok(changed)
    }

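    /// Takes a batch of free extents for trimming, starting at `offset`.  Extents are capped at
    /// `max_extent_size` bytes and at most `extents_per_batch` are returned; they are withheld
    /// from allocation until the returned `TrimmableExtents` is dropped.  Only one trim batch may
    /// be outstanding at a time.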
    pub async fn take_for_trimming(
        &self,
        offset: u64,
        max_extent_size: usize,
        extents_per_batch: usize,
    ) -> Result<TrimmableExtents<'_>, Error> {
        let _guard = self.allocation_mutex.lock().await;

        let (mut result, listener) = TrimmableExtents::new(self);
        let mut bytes = 0;

        let mut layer_set = self.tree.empty_layer_set();
        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
        self.tree.add_all_layers_to_layer_set(&mut layer_set);
        let mut merger = layer_set.merger();
        let mut iter = self
            .filter(
                merger.query(Query::FullRange(&AllocatorKey { device_range: offset..0 })).await?,
                false,
            )
            .await?;
        let mut last_offset = offset;
        'outer: while last_offset < self.device_size {
            let mut range = match iter.get() {
                None => {
                    assert!(last_offset <= self.device_size);
                    let range = last_offset..self.device_size;
                    last_offset = self.device_size;
                    iter.advance().await?;
                    range
                }
                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                    if device_range.end <= last_offset {
                        iter.advance().await?;
                        continue;
                    }
                    if device_range.start <= last_offset {
                        last_offset = device_range.end;
                        iter.advance().await?;
                        continue;
                    }
                    let range = last_offset..device_range.start;
                    last_offset = device_range.end;
                    iter.advance().await?;
                    range
                }
            };
            if range.start < offset {
                continue;
            }

            let mut inner = self.inner.lock();

            while range.start < range.end {
                let prefix =
                    range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
                range = prefix.end..range.end;
                bytes += prefix.length()?;
                inner.strategy.remove(prefix.clone());
                self.temporary_allocations.insert(AllocatorItem {
                    key: AllocatorKey { device_range: prefix.clone() },
                    value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
                    sequence: 0,
                })?;
                result.add_extent(prefix);
                if result.extents.len() == extents_per_batch {
                    break 'outer;
                }
            }
            if result.extents.len() == extents_per_batch {
                break 'outer;
            }
        }
        {
            let mut inner = self.inner.lock();

            assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
            inner.trim_listener = Some(listener);
            inner.trim_reserved_bytes = bytes;
            debug_assert!(
                (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
                    <= self.device_size
            );
        }
        Ok(result)
    }

    pub fn parent_objects(&self) -> Vec<u64> {
        self.inner.lock().info.layers.clone()
    }

    pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
        self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
    }

    pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
        let inner = self.inner.lock();
        (
            inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
            inner.info.limit_bytes.get(&owner_object_id).copied(),
        )
    }

    pub fn owner_bytes_debug(&self) -> String {
        format!("{:?}", self.inner.lock().owner_bytes)
    }

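    /// Returns true if the entire device is unavailable, in which case a journal sync (with a
    /// device flush) is needed before committed deallocations can be reused.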
    fn needs_sync(&self) -> bool {
        let inner = self.inner.lock();
        inner.unavailable_bytes().0 >= self.device_size
    }

    fn is_system_store(&self, owner_object_id: u64) -> bool {
        let fs = self.filesystem.upgrade().unwrap();
        owner_object_id == fs.object_manager().root_store_object_id()
            || owner_object_id == fs.object_manager().root_parent_store_object_id()
    }

    pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
        if old_owner_object_id.is_none() || amount == 0 {
            return;
        }
        let mut inner = self.inner.lock();
        inner.remove_reservation(old_owner_object_id, amount);
        inner.add_reservation(None, amount);
    }

    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    let counters = this.counters.lock();
                    let root = inspector.root();
                    root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
                    root.record_uint("bytes_total", this.device_size);
                    let (allocated, reserved, used, unavailable) = {
                        let inner = this.inner.lock();
                        (
                            inner.allocated_bytes().0,
                            inner.reserved_bytes(),
                            inner.used_bytes().0,
                            inner.unavailable_bytes().0,
                        )
                    };
                    root.record_uint("bytes_allocated", allocated);
                    root.record_uint("bytes_reserved", reserved);
                    root.record_uint("bytes_used", used);
                    root.record_uint("bytes_unavailable", unavailable);

                    if let Some(x) = round_div(100 * allocated, this.device_size) {
                        root.record_uint("bytes_allocated_percent", x);
                    }
                    if let Some(x) = round_div(100 * reserved, this.device_size) {
                        root.record_uint("bytes_reserved_percent", x);
                    }
                    if let Some(x) = round_div(100 * used, this.device_size) {
                        root.record_uint("bytes_used_percent", x);
                    }
                    if let Some(x) = round_div(100 * unavailable, this.device_size) {
                        root.record_uint("bytes_unavailable_percent", x);
                    }

                    root.record_uint("num_flushes", counters.num_flushes);
                    if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
                        root.record_uint(
                            "last_flush_time_ms",
                            last_flush_time
                                .duration_since(std::time::UNIX_EPOCH)
                                .unwrap_or(std::time::Duration::ZERO)
                                .as_millis()
                                .try_into()
                                .unwrap_or(0u64),
                        );
                    }

                    let data = this.allocation_size_histogram();
                    let alloc_sizes = root.create_uint_linear_histogram(
                        "allocation_size_histogram",
                        fuchsia_inspect::LinearHistogramParams {
                            floor: 1,
                            step_size: 1,
                            buckets: 64,
                        },
                    );
                    for (i, count) in data.iter().enumerate() {
                        if i != 0 {
                            alloc_sizes.insert_multiple(i as u64, *count as usize);
                        }
                    }
                    root.record(alloc_sizes);

                    let data = this.inner.lock().rebuild_strategy_trigger_histogram;
                    let triggers = root.create_uint_linear_histogram(
                        "rebuild_strategy_triggers",
                        fuchsia_inspect::LinearHistogramParams {
                            floor: 1,
                            step_size: 1,
                            buckets: 64,
                        },
                    );
                    for (i, count) in data.iter().enumerate() {
                        if i != 0 {
                            triggers.insert_multiple(i as u64, *count as usize);
                        }
                    }
                    root.record(triggers);
                }
                Ok(inspector)
            }
            .boxed()
        });
    }

    pub fn maximum_offset(&self) -> u64 {
        self.maximum_offset.load(Ordering::Relaxed)
    }
}

impl Drop for Allocator {
    fn drop(&mut self) {
        let inner = self.inner.lock();
        assert_eq!(inner.uncommitted_allocated_bytes(), 0);
        assert_eq!(inner.reserved_bytes(), 0);
    }
}

#[fxfs_trace::trace]
impl Allocator {
    pub fn object_id(&self) -> u64 {
        self.object_id
    }

    pub fn info(&self) -> AllocatorInfo {
        self.inner.lock().info.clone()
    }

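    /// Allocates and returns a contiguous region of up to `len` bytes (which must be a multiple
    /// of the block size).  The returned range may be shorter than requested: `len` is capped by
    /// `max_extent_size_bytes`, by the transaction's reservation (if any) and by the owner's byte
    /// limit.  This may trigger a journal sync when free space is tied up in committed
    /// deallocations, and may wait for an in-flight trim batch to release extents.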
    #[trace]
    pub async fn allocate(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        mut len: u64,
    ) -> Result<Range<u64>, Error> {
        ensure!(self.allocations_allowed.load(Ordering::SeqCst), FxfsError::Unavailable);
        assert_eq!(len % self.block_size, 0);
        len = std::cmp::min(len, self.max_extent_size_bytes);
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);

        let reservation = if let Some(reservation) = transaction.allocator_reservation {
            match reservation.owner_object_id {
                None => assert!(self.is_system_store(owner_object_id)),
                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
            };
            let r = reservation
                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
            len = r.amount();
            Left(r)
        } else {
            let mut inner = self.inner.lock();
            assert!(inner.opened);
            let device_used = inner.used_bytes();
            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let limit =
                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
            len = round_down(std::cmp::min(len, limit), self.block_size);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.reserved_bytes += len;
            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
        };

        ensure!(len > 0, FxfsError::NoSpace);

        let volumes_deleted = {
            let inner = self.inner.lock();
            (!inner.volumes_deleted_pending_sync.is_empty())
                .then(|| inner.volumes_deleted_pending_sync.clone())
        };

        if let Some(volumes_deleted) = volumes_deleted {
            self.filesystem
                .upgrade()
                .unwrap()
                .sync(SyncOptions {
                    flush_device: true,
                    precondition: Some(Box::new(|| {
                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
                    })),
                    ..Default::default()
                })
                .await?;

            {
                let mut inner = self.inner.lock();
                for owner_id in volumes_deleted {
                    inner.volumes_deleted_pending_sync.remove(&owner_id);
                    inner.marked_for_deletion.insert(owner_id);
                }
            }

            let _guard = self.allocation_mutex.lock().await;
            self.rebuild_strategy().await?;
        }

        #[allow(clippy::never_loop)]
        let _guard = 'sync: loop {
            for _ in 0..10 {
                {
                    let guard = self.allocation_mutex.lock().await;

                    if !self.needs_sync() {
                        break 'sync guard;
                    }
                }

                self.filesystem
                    .upgrade()
                    .unwrap()
                    .sync(SyncOptions {
                        flush_device: true,
                        precondition: Some(Box::new(|| self.needs_sync())),
                        ..Default::default()
                    })
                    .await?;
            }
            bail!(
                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
            );
        };

        let mut trim_listener = None;
        {
            let mut inner = self.inner.lock();
            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;

            let avail = self
                .device_size
                .checked_sub(inner.unavailable_bytes().0)
                .ok_or(FxfsError::Inconsistent)?;
            let free_and_not_being_trimmed =
                inner.bytes_available_not_being_trimmed(self.device_size)?;
            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
                debug_assert!(inner.trim_reserved_bytes > 0);
                trim_listener = std::mem::take(&mut inner.trim_listener);
            }
        }

        if let Some(listener) = trim_listener {
            listener.await;
        }

        let result = loop {
            {
                let mut inner = self.inner.lock();

                for device_range in inner.dropped_temporary_allocations.drain(..) {
                    self.temporary_allocations.erase(&AllocatorKey { device_range });
                }

                match inner.strategy.allocate(len) {
                    Err(FxfsError::NotFound) => {
                        inner.rebuild_strategy_trigger_histogram
                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
                    }
                    Err(err) => {
                        error!(err:%; "Likely filesystem corruption.");
                        return Err(err.into());
                    }
                    Ok(x) => {
                        break x;
                    }
                }
            }
            if !self.rebuild_strategy().await? {
                error!("Cannot find additional free space. Corruption?");
                return Err(FxfsError::Inconsistent.into());
            }
        };

        debug!(device_range:? = result; "allocate");

        let len = result.length().unwrap();
        let reservation_owner = reservation.either(
            |l| {
                l.forget_some(len);
                l.owner_object_id()
            },
            |r| {
                r.forget_some(len);
                r.owner_object_id()
            },
        );

        {
            let mut inner = self.inner.lock();
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.uncommitted_allocated_bytes += len;
            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
            inner.remove_reservation(reservation_owner, len);
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: result.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })?;
        }

        let mutation =
            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());

        Ok(result)
    }

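    /// Marks `device_range` as allocated to `owner_object_id`; the caller is responsible for
    /// choosing a range that is actually free.  Fails with `NoSpace` if the range lies beyond the
    /// device or would exceed the owner's byte limit.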
    #[trace]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
        {
            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;

            let mut inner = self.inner.lock();
            let device_used = inner.used_bytes();
            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            ensure!(
                device_range.end <= self.device_size
                    && (Saturating(self.device_size) - device_used).0 >= len
                    && owner_id_bytes_left >= len,
                FxfsError::NoSpace
            );
            if let Some(reservation) = &mut transaction.allocator_reservation {
                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
            }
            owner_entry.uncommitted_allocated_bytes += len;
            inner.strategy.remove(device_range.clone());
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: device_range.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })?;
        }
        let mutation =
            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));
        Ok(())
    }

    pub fn set_bytes_limit(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        bytes: u64,
    ) -> Result<(), Error> {
        assert!(!self.is_system_store(owner_object_id));
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
        );
        Ok(())
    }

    pub fn get_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
    }

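    /// Deallocates `dealloc_range` on behalf of `owner_object_id`, returning the number of bytes
    /// deallocated.  The range is tracked in `temporary_allocations` and only becomes reusable
    /// once the device has been flushed (see `did_flush_device`).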
    #[trace]
    pub async fn deallocate(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        dealloc_range: Range<u64>,
    ) -> Result<u64, Error> {
        debug!(device_range:? = dealloc_range; "deallocate");
        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
        let deallocated = dealloc_range.end - dealloc_range.start;
        let mutation = AllocatorMutation::Deallocate {
            device_range: dealloc_range.clone().into(),
            owner_object_id,
        };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));

        let _guard = self.allocation_mutex.lock().await;

        let mut inner = self.inner.lock();
        for device_range in inner.dropped_temporary_allocations.drain(..) {
            self.temporary_allocations.erase(&AllocatorKey { device_range });
        }

        self.temporary_allocations
            .insert(AllocatorItem {
                key: AllocatorKey { device_range: dealloc_range.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })
            .context("tracking deallocated")?;

        Ok(deallocated)
    }

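    /// Marks all extents belonging to `owner_object_id` for deletion.  Byte accounting is
    /// adjusted when the mutation is applied, but the extents themselves are reclaimed lazily:
    /// they are filtered out of queries and dropped when the tree is next flushed.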
    pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
        );
    }

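    /// Called when the device has been flushed up to `flush_log_offset`.  Deallocations committed
    /// before that offset are returned to the free pool and their `temporary_allocations` entries
    /// erased.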
    pub fn did_flush_device(&self, flush_log_offset: u64) {
        #[allow(clippy::never_loop)]
        let deallocs = 'deallocs_outer: loop {
            let mut inner = self.inner.lock();
            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
                if dealloc.log_file_offset >= flush_log_offset {
                    let mut deallocs = inner.committed_deallocated.split_off(index);
                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
                    break 'deallocs_outer deallocs;
                }
            }
            break std::mem::take(&mut inner.committed_deallocated);
        };

        let mut inner = self.inner.lock();
        let mut totals = BTreeMap::<u64, u64>::new();
        for dealloc in deallocs {
            *(totals.entry(dealloc.owner_object_id).or_default()) +=
                dealloc.range.length().unwrap();
            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
            self.temporary_allocations.erase(&AllocatorKey { device_range: dealloc.range.clone() });
        }

        for (owner_object_id, total) in totals {
            match inner.owner_bytes.get_mut(&owner_object_id) {
                Some(counters) => counters.committed_deallocated_bytes -= total,
                None => panic!("Failed to decrement for unknown owner: {}", owner_object_id),
            }
        }
    }

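    /// Reserves `amount` bytes, attributed to `owner_object_id` if given.  Returns `None` if
    /// there is insufficient free space or the owner's byte limit would be exceeded.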
    pub fn reserve(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: u64,
    ) -> Option<Reservation> {
        {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let limit = match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            };
            if limit < amount {
                return None;
            }
            inner.add_reservation(owner_object_id, amount);
        }
        Some(Reservation::new(self, owner_object_id, amount))
    }

    pub fn reserve_with(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: impl FnOnce(u64) -> u64,
    ) -> Reservation {
        let amount = {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let amount = amount(match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            });

            inner.add_reservation(owner_object_id, amount);

            amount
        };

        Reservation::new(self, owner_object_id, amount)
    }

    pub fn get_allocated_bytes(&self) -> u64 {
        self.inner.lock().allocated_bytes().0
    }

    pub fn get_disk_bytes(&self) -> u64 {
        self.device_size
    }

    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
    }

    pub fn get_used_bytes(&self) -> Saturating<u64> {
        let inner = self.inner.lock();
        inner.used_bytes()
    }
}

impl ReservationOwner for Allocator {
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
        self.inner.lock().remove_reservation(owner_object_id, amount);
    }
}

#[async_trait]
impl JournalingObject for Allocator {
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        _assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                let mut inner = self.inner.lock();
                inner.owner_bytes.remove(&owner_object_id);
                inner.info.marked_for_deletion.insert(owner_object_id);
                inner.volumes_deleted_pending_sync.insert(owner_object_id);
                inner.info.limit_bytes.remove(&owner_object_id);
            }
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: device_range.clone().into() },
                    value: AllocatorValue::Abs { count: 1, owner_object_id },
                    sequence: context.checkpoint.file_offset,
                };
                let len = item.key.device_range.length().unwrap();
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
                let mut inner = self.inner.lock();
                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                entry.allocated_bytes += len;
                if let ApplyMode::Live(transaction) = context.mode {
                    entry.uncommitted_allocated_bytes -= len;
                    inner.dropped_temporary_allocations.push(device_range.into());
                    if let Some(reservation) = transaction.allocator_reservation {
                        reservation.commit(len);
                    }
                }
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: device_range.into() },
                    value: AllocatorValue::None,
                    sequence: context.checkpoint.file_offset,
                };
                let len = item.key.device_range.length().unwrap();

                {
                    let mut inner = self.inner.lock();
                    {
                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                        entry.allocated_bytes -= len;
                        if context.mode.is_live() {
                            entry.committed_deallocated_bytes += len;
                        }
                    }
                    if context.mode.is_live() {
                        inner.committed_deallocated.push_back(CommittedDeallocation {
                            log_file_offset: context.checkpoint.file_offset,
                            range: item.key.device_range.clone(),
                            owner_object_id,
                        });
                    }
                    if let ApplyMode::Live(Transaction {
                        allocator_reservation: Some(reservation),
                        ..
                    }) = context.mode
                    {
                        inner.add_reservation(reservation.owner_object_id(), len);
                        reservation.add(len);
                    }
                }
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
            }
            Mutation::BeginFlush => {
                self.tree.seal();
                let mut inner = self.inner.lock();
                let allocated_bytes =
                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
                inner.info.allocated_bytes = allocated_bytes;
            }
            Mutation::EndFlush => {}
            _ => bail!("unexpected mutation: {:?}", mutation),
        }
        Ok(())
    }

    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
        match mutation {
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                let len = device_range.length().unwrap();
                let mut inner = self.inner.lock();
                inner
                    .owner_bytes
                    .entry(owner_object_id)
                    .or_default()
                    .uncommitted_allocated_bytes -= len;
                if let Some(reservation) = transaction.allocator_reservation {
                    let res_owner = reservation.owner_object_id();
                    inner.add_reservation(res_owner, len);
                    reservation.release_reservation(res_owner, len);
                }
                inner.strategy.free(device_range.clone().into()).expect("drop mutation");
                self.temporary_allocations
                    .erase(&AllocatorKey { device_range: device_range.into() });
            }
            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
                self.temporary_allocations
                    .erase(&AllocatorKey { device_range: device_range.into() });
            }
            _ => {}
        }
    }

    async fn flush(&self) -> Result<Version, Error> {
        let filesystem = self.filesystem.upgrade().unwrap();
        let object_manager = filesystem.object_manager();
        let earliest_version = self.tree.get_earliest_version();
        if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
            return Ok(earliest_version);
        }

        let fs = self.filesystem.upgrade().unwrap();
        let mut flusher = Flusher::new(self, &fs).await;
        let (new_layer_file, info) = flusher.start().await?;
        flusher.finish(new_layer_file, info).await
    }
}

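/// An iterator adapter that coalesces adjacent records with identical values into a single
/// record; used when writing a compacted layer file during flush.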
pub struct CoalescingIterator<I> {
    iter: I,
    item: Option<AllocatorItem>,
}

impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
    pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
        let mut iter = Self { iter, item: None };
        iter.advance().await?;
        Ok(iter)
    }
}

#[async_trait]
impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
    for CoalescingIterator<I>
{
    async fn advance(&mut self) -> Result<(), Error> {
        self.item = self.iter.get().map(|x| x.cloned());
        if self.item.is_none() {
            return Ok(());
        }
        let left = self.item.as_mut().unwrap();
        loop {
            self.iter.advance().await?;
            match self.iter.get() {
                None => return Ok(()),
                Some(right) => {
                    ensure!(
                        left.key.device_range.end <= right.key.device_range.start,
                        FxfsError::Inconsistent
                    );
                    if left.key.device_range.end < right.key.device_range.start
                        || left.value != *right.value
                    {
                        return Ok(());
                    }
                    left.key.device_range.end = right.key.device_range.end;
                }
            }
        }
    }

    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
        self.item.as_ref().map(|x| x.as_item_ref())
    }
}

struct Flusher<'a> {
    allocator: &'a Allocator,
    fs: &'a Arc<FxFilesystem>,
    _guard: WriteGuard<'a>,
}

impl<'a> Flusher<'a> {
    async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
        let keys = lock_keys![LockKey::flush(allocator.object_id())];
        Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
    }

    fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
        Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(allocator_reservation),
            ..Default::default()
        }
    }

    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
        let object_manager = self.fs.object_manager();
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
            .await?;

        let root_store = self.fs.root_store();
        let layer_object_handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions { skip_journal_checks: true, ..Default::default() },
            None,
        )
        .await?;
        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
        let info = transaction
            .commit_with_callback(|_| {
                self.allocator.inner.lock().info.clone()
            })
            .await?;
        Ok((layer_object_handle, info))
    }

    async fn finish(
        self,
        layer_object_handle: DataObjectHandle<ObjectStore>,
        mut info: AllocatorInfo,
    ) -> Result<Version, Error> {
        let object_manager = self.fs.object_manager();
        let txn_options = Self::txn_options(object_manager.metadata_reservation());

        let layer_set = self.allocator.tree.immutable_layer_set();
        let total_len = layer_set.sum_len();
        {
            let mut merger = layer_set.merger();
            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
            let iter = CoalescingIterator::new(iter).await?;
            self.allocator
                .tree
                .compact_with_iterator(
                    iter,
                    total_len,
                    DirectWriter::new(&layer_object_handle, txn_options).await,
                    layer_object_handle.block_size(),
                )
                .await?;
        }

        let root_store = self.fs.root_store();

        let object_handle;
        let reservation_update;
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    self.allocator.object_id()
                )],
                txn_options,
            )
            .await?;
        let mut serialized_info = Vec::new();

        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
        object_handle = ObjectStore::open_object(
            &root_store,
            self.allocator.object_id(),
            HandleOptions::default(),
            None,
        )
        .await?;

        for object_id in &info.layers {
            root_store.add_to_graveyard(&mut transaction, *object_id);
        }

        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);

        info.layers = vec![layer_object_handle.object_id()];

        info.serialize_with_version(&mut serialized_info)?;

        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;

        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
            layer_object_handle.get_size(),
        ));

        transaction.add_with_object(
            self.allocator.object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );
        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());

        let layers = layers_from_handles([layer_object_handle]).await?;
        transaction
            .commit_with_callback(|_| {
                self.allocator.tree.set_layers(layers);

                let mut inner = self.allocator.inner.lock();
                inner.info.layers = info.layers;
                for owner_id in marked_for_deletion {
                    inner.marked_for_deletion.remove(&owner_id);
                    inner.info.marked_for_deletion.remove(&owner_id);
                }
            })
            .await?;

        for layer in layer_set.layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                root_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        let mut counters = self.allocator.counters.lock();
        counters.num_flushes += 1;
        counters.last_flush_time = Some(std::time::SystemTime::now());
        Ok(self.allocator.tree.get_earliest_version())
    }
}

2146#[cfg(test)]
2147mod tests {
2148 use crate::filesystem::{
2149 FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2150 };
2151 use crate::fsck::fsck;
2152 use crate::lsm_tree::cache::NullCache;
2153 use crate::lsm_tree::skip_list_layer::SkipListLayer;
2154 use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
2155 use crate::lsm_tree::{LSMTree, Query};
2156 use crate::object_handle::ObjectHandle;
2157 use crate::object_store::allocator::merge::merge;
2158 use crate::object_store::allocator::{
2159 Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2160 };
2161 use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
2162 use crate::object_store::volume::root_volume;
2163 use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
2164 use crate::range::RangeExt;
2165 use crate::round::round_up;
2166 use crate::testing;
2167 use fuchsia_async as fasync;
2168 use fuchsia_sync::Mutex;
2169 use std::cmp::{max, min};
2170 use std::ops::{Bound, Range};
2171 use std::sync::Arc;
2172 use storage_device::DeviceHolder;
2173 use storage_device::fake_device::FakeDevice;

    #[test]
    fn test_allocator_key_is_range_based() {
        assert!(AllocatorKey { device_range: 0..100 }.is_range_key());
    }

    #[fuchsia::test]
    async fn test_coalescing_iterator() {
        let skip_list = SkipListLayer::new(100);
        let items = [
            Item::new(
                AllocatorKey { device_range: 0..100 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ),
            Item::new(
                AllocatorKey { device_range: 100..200 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ),
        ];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let mut iter =
            CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 0..200 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_merge_and_coalesce_across_three_layers() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 100..200 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 0..100 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 0..200 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 100..200 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 0..100 },
                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 0..100 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            )
        );
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 100..200 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

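    /// Returns the number of bytes that ranges `a` and `b` have in common (0 if disjoint).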
    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
        if a.end > b.start && a.start < b.end {
            min(a.end, b.end) - max(a.start, b.start)
        } else {
            0
        }
    }

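    /// Walks the allocator's tree and returns every allocated range, asserting that the ranges
    /// come back ordered and non-overlapping.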
    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut allocations: Vec<Range<u64>> = Vec::new();
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            if let Some(r) = allocations.last() {
                assert!(device_range.start >= r.end);
            }
            allocations.push(device_range.clone());
            iter.advance().await.expect("advance failed");
        }
        allocations
    }

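    /// Asserts that the allocated ranges in the allocator's tree are exactly covered by
    /// `expected_allocations` (which need not be sorted, but must not overlap).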
    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut found = 0;
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            let mut l = device_range.length().expect("Invalid range");
            found += l;
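            // Subtract the coverage of each expected range; since expected ranges must be
            // disjoint, every allocated range should be driven down to zero.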
            for range in expected_allocations {
                l -= overlap(range, device_range);
                if l == 0 {
                    break;
                }
            }
            assert_eq!(l, 0, "range {device_range:?} not covered by expectations");
            iter.advance().await.expect("advance failed");
        }
        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
    }

    async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
        let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let allocator = fs.allocator();
        (fs, allocator)
    }

    #[fuchsia::test]
    async fn test_allocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;

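        // These are the allocations a freshly-formatted filesystem is expected to contain
        // (superblock and journal metadata); the exact ranges depend on the filesystem layout.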
        let expected = vec![
            0..4096,
            4096..139264,
            139264..204800,
            204800..335872,
            335872..401408,
            524288..528384,
        ];
        assert_eq!(device_ranges, expected);
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
        transaction.commit().await.expect("commit failed");
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges[7].length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[5], &device_ranges[7]), 0);
        assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_allocate_more_than_max_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(
            device_ranges.last().unwrap().length().expect("Invalid range"),
            allocator.max_extent_size_bytes
        );
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_deallocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let initial_allocations = collect_allocations(&allocator).await;

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let device_range1 = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");
        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
        transaction.commit().await.expect("commit failed");

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
            .await
            .expect("deallocate failed");
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &initial_allocations).await;
    }

    #[fuchsia::test]
    async fn test_mark_allocated() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut device_ranges = collect_allocations(&allocator).await;
        let range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
                .await
                .expect("allocate failed")
        };
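        // The transaction above went out of scope without being committed, so `range` is no
        // longer actually allocated.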

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

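        // Since the earlier transaction was dropped, this allocation should be handed the
        // start of the dropped range again.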
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );

        assert_eq!(device_ranges.last().unwrap().start, range.start);

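        // Mark the second block of the dropped range as allocated; mark_allocated must succeed
        // even though nothing currently owns that range.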
        let mut range2 = range.clone();
        range2.start += fs.block_size();
        allocator
            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
            .expect("mark_allocated failed");
        device_ranges.push(range2);

        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        let last_range = device_ranges.last().unwrap();
        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(last_range, &range), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_mark_for_deletion() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();
        let mut device_ranges = collect_allocations(&allocator).await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
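        // Allocate 3000 blocks in total (15 iterations, two 100-block extents each).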
        for _ in 0..15 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate2 failed"),
            );
        }
        transaction.commit().await.expect("commit failed");
        check_allocations(&allocator, &device_ranges).await;

        assert_eq!(
            allocator.get_allocated_bytes(),
            initial_allocated_bytes + fs.block_size() * 3000
        );

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
        transaction.commit().await.expect("commit failed");

        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
        check_allocations(&allocator, &device_ranges).await;

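        // Now allocate 1500 blocks to a different owner (100). The device is small enough that
        // much of this must land on space previously owned by STORE_OBJECT_ID, so the extents
        // marked for deletion must already be reusable.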
        device_ranges.clear();

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_bytes = 1500 * fs.block_size();
        while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
            let len = std::cmp::min(
                target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
                100 * fs.block_size(),
            );
            device_ranges.push(
                allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        allocator.flush().await.expect("flush failed");

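        // After the flush, the tombstoned owner should disappear from the per-owner tally
        // entirely, while owner 100 retains exactly what it allocated.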
        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
        assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes);
    }

    async fn create_file(store: &Arc<ObjectStore>, size: usize) {
        let root_directory =
            Directory::open(store, store.root_directory_object_id()).await.expect("open failed");

        let mut transaction = store
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let file = root_directory
            .create_child_file(&mut transaction, &format!("foo {}", size))
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        let buffer = file.allocate_buffer(size).await;

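        // Write the whole buffer so the file actually consumes space on the device; the
        // contents don't matter for these tests.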
        let mut transaction = file
            .new_transaction_with_options(Options {
                borrow_metadata_space: true,
                ..Default::default()
            })
            .await
            .expect("new_transaction_with_options failed");
        file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
        transaction.commit().await.expect("commit failed");
    }

    #[fuchsia::test]
    async fn test_replay_with_deleted_store_and_compaction() {
        let (fs, _) = test_fs().await;

        const FILE_SIZE: usize = 10_000_000;

        let mut store_id = {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;
            store.store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");

        fs.journal().compact().await.expect("compact failed");

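        // Delete and recreate the volume twice, remounting each time, so that replay and
        // compaction have to cope with mutations referring to a store that no longer exists.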
        for _ in 0..2 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store_id)
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");
                create_file(&store, FILE_SIZE).await;
                store_id = store.store_object_id();
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test(threads = 4)]
    async fn test_compaction_delete_race() {
        let (fs, _allocator) = test_fs().await;

        {
            const FILE_SIZE: usize = 10_000_000;

            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;

            let fs_clone = fs.clone();

            let executor_tasks = testing::force_executor_threads_to_run(4).await;

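            // With all executor threads running, race a journal compaction against deletion of
            // the volume.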
            let task = fasync::Task::spawn(async move {
                fs_clone.journal().compact().await.expect("compact failed");
            });

            drop(executor_tasks);

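            // Sleep a random amount so that repeated runs vary how far compaction has
            // progressed by the time the volume is deleted.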
            let sleep = rand::random_range(3000..6000);
            std::thread::sleep(std::time::Duration::from_micros(sleep));
            log::info!("sleep {sleep}us");

            let transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(
                            root_vol.volume_directory().store().store_object_id(),
                            root_vol.volume_directory().object_id(),
                        ),
                        LockKey::flush(store.store_object_id())
                    ],
                    Options { borrow_metadata_space: true, ..Default::default() },
                )
                .await
                .expect("new_transaction failed");
            root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");

            task.await;
        }

        fs.journal().compact().await.expect("compact failed");
        fs.close().await.expect("close failed");

        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test]
    async fn test_delete_multiple_volumes() {
        let (mut fs, _) = test_fs().await;

        for _ in 0..50 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");

                create_file(&store, 1_000_000).await;

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store.store_object_id())
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                fs.allocator().flush().await.expect("flush failed");
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test]
    async fn test_allocate_free_reallocate() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let mut device_ranges = Vec::new();
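        // Allocate 3000 blocks (30 extents of 100 blocks each).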
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for _ in 0..30 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 3000,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );

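        // Free everything again.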
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for range in std::mem::replace(&mut device_ranges, Vec::new()) {
            allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());

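        // Reallocate half of the space; the freed extents should be available for reuse.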
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_len = 1500 * fs.block_size();
        while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
            let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, len)
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 1500,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );
    }

    #[fuchsia::test]
    async fn test_flush() {
        const STORE_OBJECT_ID: u64 = 99;

        let mut device_ranges = Vec::new();
        let device = {
            let (fs, allocator) = test_fs().await;
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

        let allocated = collect_allocations(&allocator).await;

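        // Make sure the ranges we allocated before the flush and remount are still allocated.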
        for i in &device_ranges {
            let mut overlapping = 0;
            for j in &allocated {
                overlapping += overlap(i, j);
            }
            assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
        }

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let range = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");

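        // A fresh allocation must not overlap anything that was already allocated.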
        for r in &allocated {
            assert_eq!(overlap(r, &range), 0);
        }
        transaction.commit().await.expect("commit failed");
    }

    #[fuchsia::test]
    async fn test_dropped_transaction() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed")
        };
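        // The transaction above was dropped without being committed, so the same range should
        // be handed out again.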
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        assert_eq!(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
            allocated_range
        );
    }

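    // Verifies that an owner whose allocations are marked for deletion disappears from the
    // per-owner accounting and stays gone across a remount.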
    #[fuchsia::test]
    async fn test_cleanup_removed_owner() {
        const STORE_OBJECT_ID: u64 = 99;
        let device = {
            let (fs, allocator) = test_fs().await;

            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("Allocating");
                transaction.commit().await.expect("Committing.");
            }
            allocator.flush().await.expect("Flushing");
            assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
                transaction.commit().await.expect("Committing.");
            }
            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            fs.close().await.expect("Closing");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();
        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
    }

    #[fuchsia::test]
    async fn test_allocated_bytes() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();

        let allocated_bytes = initial_allocated_bytes + fs.block_size();
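        // Expected tally once a single block has been allocated and committed.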
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");
            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
            range
        };

        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");

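            // This allocation hasn't been committed, so it shouldn't count towards the tally.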
            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
        }

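        // Dropping the transaction should have released the uncommitted allocation, leaving
        // the tally unchanged.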
        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

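        // Deallocate a sub-range that is deliberately not block-aligned, leaving 20 bytes
        // allocated at each end.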
        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
            .await
            .expect("deallocate failed");

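        // Nothing is freed until the deallocation is committed.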
        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

        transaction.commit().await.expect("commit failed");

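        // Once committed, only the 40 unaligned bytes at the ends remain allocated.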
        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
    }

    #[fuchsia::test]
    async fn test_persist_bytes_limit() {
        const LIMIT: u64 = 12345;
        const OWNER_ID: u64 = 12;

        let (fs, allocator) = test_fs().await;
        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
                .expect("Failed to set limit.");
            assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
            transaction.commit().await.expect("Failed to commit transaction");
            let bytes: u64 = *allocator
                .inner
                .lock()
                .info
                .limit_bytes
                .get(&OWNER_ID)
                .expect("Failed to find limit");
            assert_eq!(LIMIT, bytes);
        }
    }

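    /// Collapses a sorted list of ranges, merging any that are exactly adjacent.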
    fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
        let mut coalesced = Vec::new();
        let mut prev: Option<Range<u64>> = None;
        for range in ranges {
            if let Some(prev_range) = &mut prev {
                if range.start == prev_range.end {
                    prev_range.end = range.end;
                } else {
                    coalesced.push(prev_range.clone());
                    prev = Some(range);
                }
            } else {
                prev = Some(range);
            }
        }
        if let Some(prev_range) = prev {
            coalesced.push(prev_range);
        }
        coalesced
    }

    #[fuchsia::test]
    async fn test_take_for_trimming() {
        const STORE_OBJECT_ID: u64 = 99;

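        // Allocate a contiguous region, punch some holes in it, remount, and then verify that
        // take_for_trimming hands back exactly the holes.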
        let allocated_range;
        let expected_free_ranges;
        let device = {
            let (fs, allocator) = test_fs().await;
            let bs = fs.block_size();
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocated_range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");

            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            let base = allocated_range.start;
            expected_free_ranges = vec![
                base..(base + (bs * 1)),
                (base + (bs * 2))..(base + (bs * 3)),
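                // The next three ranges form a contiguous run (blocks 4 through 13); it is
                // recorded pre-split here because extents returned by take_for_trimming are
                // capped at max_extent_size.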
                (base + (bs * 4))..(base + (bs * 8)),
                (base + (bs * 8))..(base + (bs * 12)),
                (base + (bs * 12))..(base + (bs * 13)),
                (base + (bs * 29))..(base + (bs * 30)),
            ];
            for range in &expected_free_ranges {
                allocator
                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
                    .await
                    .expect("deallocate failed");
            }
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

        let max_extent_size = fs.block_size() as usize * 4;
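        // Keep the extent size and batch size small so that several take_for_trimming calls
        // are needed to walk the whole region.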
        const EXTENTS_PER_BATCH: usize = 2;
        let mut free_ranges = vec![];
        let mut offset = allocated_range.start;
        while offset < allocated_range.end {
            let free = allocator
                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
                .await
                .expect("take_for_trimming failed");
            free_ranges.extend(
                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
            );
            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
        }
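        // The free extents may be split up differently than when they were deallocated, so
        // coalesce both sides before comparing.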
        let coalesced_free_ranges = coalesce_ranges(free_ranges);
        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);

        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
    }

    #[fuchsia::test]
    async fn test_allocations_wait_for_free_extents() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocator_clone = allocator.clone();

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

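        // Claim every free extent on the device for trimming in a single batch.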
        let max_extent_size = fs.device().size() as usize;
        const EXTENTS_PER_BATCH: usize = usize::MAX;

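        // While the trim is in progress, allocations should block rather than fail. The
        // trim_done flag lets the allocation task assert that it only completed after the
        // trimmable extents were returned.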
        let trim_done = Arc::new(Mutex::new(false));
        let trimmable_extents = allocator
            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
            .await
            .expect("take_for_trimming failed");

        let trim_done_clone = trim_done.clone();
        let bs = fs.block_size();
        let alloc_task = fasync::Task::spawn(async move {
            allocator_clone
                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
                .await
                .expect("allocate failed");
            {
                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
            }
            transaction.commit().await.expect("commit failed");
        });

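        // Give the allocation task a chance to run; it should block waiting for the trim to
        // complete.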
        fasync::Timer::new(std::time::Duration::from_millis(100)).await;

        {
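            // Drop the trimmable extents while holding the lock so the blocked allocation
            // cannot complete before trim_done is set.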
            let mut trim_done = trim_done.lock();
            std::mem::drop(trimmable_extents);
            *trim_done = true;
        }

        alloc_task.await;
    }

    #[fuchsia::test]
    async fn test_allocation_with_reservation_not_multiple_of_block_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
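        // The 5000-byte excess is deliberately not a multiple of the block size; allocations
        // drawn from the reservation must still be block-aligned.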
        let reservation =
            allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![],
                Options { allocator_reservation: Some(&reservation), ..Options::default() },
            )
            .await
            .expect("new failed");

        let range = allocator
            .allocate(
                &mut transaction,
                STORE_OBJECT_ID,
                round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
            )
            .await
            .expect("allocate failed");
        assert_eq!((range.end - range.start) % fs.block_size(), 0);

        println!("{}", range.end - range.start);
    }
}