pub mod merge;
pub mod strategy;

use crate::drop_event::DropEvent;
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::skip_list_layer::SkipListLayer;
use crate::lsm_tree::types::{
    FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
    OrdUpperBound, RangeKey, SortByU64, Value,
};
use crate::lsm_tree::{LSMTree, Query, layers_from_handles};
use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
use crate::object_store::object_manager::ReservationUpdate;
use crate::object_store::transaction::{
    AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
};
use crate::object_store::{DataObjectHandle, DirectWriter, HandleOptions, ObjectStore, tree};
use crate::range::RangeExt;
use crate::round::{round_div, round_down, round_up};
use crate::serialized_types::{
    DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use either::Either::{Left, Right};
use event_listener::EventListener;
use fprint::TypeFingerprint;
use fuchsia_inspect::HistogramProperty;
use fuchsia_sync::Mutex;
use futures::FutureExt;
use merge::{filter_marked_for_deletion, filter_tombstones, merge};
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::hash::Hash;
use std::marker::PhantomData;
use std::num::Saturating;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};

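/// Implemented by types that space can be reserved against: the `Allocator` itself, and
/// reservations (which support sub-reservation). Reserved bytes are handed back via
/// `release_reservation`.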
pub trait ReservationOwner: Send + Sync {
    /// Called to return `amount` reserved bytes to the owner. `owner_object_id` identifies the
    /// owner the bytes were attributed to, if any.
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
}

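/// Tracks an amount of reserved space. The space is returned to `owner` (via
/// `ReservationOwner::release_reservation`) when this is dropped, except for any amount taken
/// with `forget`/`forget_some`. Parts of a reservation can be temporarily sub-reserved via
/// `reserve`/`reserve_with`; sub-reservations must be settled before this is dropped.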
pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
    owner: T,
    owner_object_id: Option<u64>,
    inner: Mutex<ReservationInner>,
    phantom: PhantomData<U>,
}

#[derive(Debug, Default)]
struct ReservationInner {
    // The amount currently held by this reservation.
    amount: u64,

    // The amount temporarily taken out of this reservation via `reserve`/`reserve_with`.
    reserved: u64,
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner.lock().fmt(f)
    }
}

impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
    pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
        Self {
            owner,
            owner_object_id,
            inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
            phantom: PhantomData,
        }
    }

    pub fn owner_object_id(&self) -> Option<u64> {
        self.owner_object_id
    }

    pub fn amount(&self) -> u64 {
        self.inner.lock().amount
    }

    pub fn add(&self, amount: u64) {
        self.inner.lock().amount += amount;
    }

    /// Returns the entire amount of the reservation, with the expectation that the caller takes
    /// ownership of it and it will not be returned to the owner. Nothing may be sub-reserved.
    pub fn forget(&self) -> u64 {
        let mut inner = self.inner.lock();
        assert_eq!(inner.reserved, 0);
        std::mem::take(&mut inner.amount)
    }

    /// Takes `amount` out of the reservation without returning it to the owner.
    pub fn forget_some(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Sub-reserves some of this reservation. `amount` is passed the currently available amount
    /// and should return the amount to take.
    fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
        let mut inner = self.inner.lock();
        let taken = amount(inner.amount - inner.reserved);
        inner.reserved += taken;
        ReservationImpl::new(self, self.owner_object_id, taken)
    }

    pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
        let mut inner = self.inner.lock();
        if inner.amount - inner.reserved < amount {
            None
        } else {
            inner.reserved += amount;
            Some(ReservationImpl::new(self, self.owner_object_id, amount))
        }
    }

    /// Commits a previously sub-reserved amount, consuming it from this reservation.
    pub fn commit(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.reserved -= amount;
        inner.amount -= amount;
    }

    /// Returns `amount` of this reservation back to the owner.
    pub fn give_back(&self, amount: u64) {
        self.owner.borrow().release_reservation(self.owner_object_id, amount);
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Moves `amount` from this reservation to `other`. Both reservations must have the same
    /// owner.
    pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
        &self,
        other: &ReservationImpl<V, W>,
        amount: u64,
    ) {
        assert_eq!(self.owner_object_id, other.owner_object_id());
        let mut inner = self.inner.lock();
        if let Some(amount) = inner.amount.checked_sub(amount) {
            inner.amount = amount;
        } else {
            std::mem::drop(inner);
            panic!("Insufficient reservation space");
        }
        other.add(amount);
    }
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
    fn drop(&mut self) {
        let inner = self.inner.get_mut();
        assert_eq!(inner.reserved, 0);
        let owner_object_id = self.owner_object_id;
        if inner.amount > 0 {
            self.owner
                .borrow()
                .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
        }
    }
}

impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
    for ReservationImpl<T, U>
{
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
        assert_eq!(owner_object_id, self.owner_object_id);
        let mut inner = self.inner.lock();
        assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
        inner.reserved -= amount;
    }
}

pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;

pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;

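/// Keys for the allocator's LSM tree. Each record covers a range of the device; records are
/// ordered by the start of the range (see the `SortByU64` implementation below).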
pub type AllocatorKey = AllocatorKeyV32;

#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct AllocatorKeyV32 {
    pub device_range: Range<u64>,
}

impl SortByU64 for AllocatorKey {
    fn get_leading_u64(&self) -> u64 {
        self.device_range.start
    }
}

const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;

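/// Partitions a device range into `EXTENT_HASH_BUCKET_SIZE`-sized buckets, yielding a stable
/// hash per bucket. Used by the `FuzzyHash` implementation below so that overlapping ranges
/// always share at least one hash bucket.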
pub struct AllocatorKeyPartitionIterator {
    device_range: Range<u64>,
}

impl Iterator for AllocatorKeyPartitionIterator {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        if self.device_range.start >= self.device_range.end {
            None
        } else {
            let start = self.device_range.start;
            self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
            let end = std::cmp::min(self.device_range.start, self.device_range.end);
            let key = AllocatorKey { device_range: start..end };
            let hash = crate::stable_hash::stable_hash(key);
            Some(hash)
        }
    }
}

impl FuzzyHash for AllocatorKey {
    fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
        AllocatorKeyPartitionIterator {
            device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
                ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
        }
    }

    fn is_range_key(&self) -> bool {
        true
    }
}

impl AllocatorKey {
    /// Returns a new key that is a lower bound suitable for use with merge_into.
    pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
        AllocatorKey { device_range: 0..self.device_range.start }
    }
}

impl LayerKey for AllocatorKey {
    fn merge_type(&self) -> MergeType {
        MergeType::OptimizedMerge
    }
}

impl OrdUpperBound for AllocatorKey {
    fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range.end.cmp(&other.device_range.end)
    }
}

impl OrdLowerBound for AllocatorKey {
    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
    }
}

impl Ord for AllocatorKey {
    fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
    }
}

impl PartialOrd for AllocatorKey {
    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl RangeKey for AllocatorKey {
    fn overlaps(&self, other: &Self) -> bool {
        self.device_range.start < other.device_range.end
            && self.device_range.end > other.device_range.start
    }
}

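/// Values for the allocator's LSM tree. Allocations are attributed to a single owning object
/// store (`owner_object_id`) and carry a reference count; `None` acts as the deleted marker.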
pub type AllocatorValue = AllocatorValueV32;

impl Value for AllocatorValue {
    const DELETED_MARKER: Self = Self::None;
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorValueV32 {
    /// The deleted marker: no allocation.
    None,
    /// An allocation with an absolute reference count, attributed to a single owner.
    Abs { count: u64, owner_object_id: u64 },
}

pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;

pub type AllocatorInfo = AllocatorInfoV32;

#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
pub struct AllocatorInfoV32 {
    /// Object ids of the LSM tree layer files.
    pub layers: Vec<u64>,
    /// Maps from owner_object_id (an object store) to the number of bytes allocated for it.
    pub allocated_bytes: BTreeMap<u64, u64>,
    /// Owners whose allocations should be filtered out (i.e. deleted volumes).
    pub marked_for_deletion: HashSet<u64>,
    /// Per-owner limits on allocated bytes.
    pub limit_bytes: BTreeMap<u64, u64>,
}

const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;

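/// Returns the maximum extent size such that a serialized allocator record stays within
/// `DEFAULT_MAX_SERIALIZED_RECORD_SIZE`. (The `- 64` and `/ 9` factors appear to be rough
/// per-record and per-block serialization overheads.)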
pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
    block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
}

#[derive(Default)]
struct AllocatorCounters {
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}

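/// Manages allocation of the device's space. Committed allocations are tracked in an LSM tree;
/// in-flight state (temporary allocations, reservations and pending deallocations) is held in
/// memory alongside it.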
pub struct Allocator {
    filesystem: Weak<FxFilesystem>,
    block_size: u64,
    device_size: u64,
    object_id: u64,
    max_extent_size_bytes: u64,
    tree: LSMTree<AllocatorKey, AllocatorValue>,
    // A layer of in-flight ranges (uncommitted allocations, pending deallocations and extents
    // reserved for trimming) that must not be handed out while they are in this set.
    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    inner: Mutex<Inner>,
    allocation_mutex: futures::lock::Mutex<()>,
    counters: Mutex<AllocatorCounters>,
    maximum_offset: AtomicU64,
}

#[derive(Debug, Default, PartialEq)]
struct ByteTracking {
    // Bytes allocated and committed for this owner.
    allocated_bytes: Saturating<u64>,

    // Bytes allocated in transactions that have not yet committed.
    uncommitted_allocated_bytes: u64,

    // Bytes reserved for future allocation.
    reserved_bytes: u64,

    // Bytes deallocated in committed transactions which cannot be reused until the device has
    // been flushed.
    committed_deallocated_bytes: u64,
}

impl ByteTracking {
    // Returns the total bytes taken, whether by reservation, allocation or uncommitted
    // allocation.
    fn used_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
    }

    // Returns the bytes not currently available for allocation, including committed
    // deallocations that are awaiting a device flush.
    fn unavailable_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.committed_deallocated_bytes)
    }

    // Returns the bytes that will still be unavailable after the next device flush.
    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
    }
}

#[derive(Debug)]
struct CommittedDeallocation {
    // The journal offset at which the deallocation was committed.
    log_file_offset: u64,
    // The deallocated device range.
    range: Range<u64>,
    // The store the range belonged to.
    owner_object_id: u64,
}

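/// In-memory allocator state, guarded by `Allocator::inner`.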
struct Inner {
    info: AllocatorInfo,

    // True once the allocator has been opened (or created).
    opened: bool,

    // Temporary allocations that can be erased from `temporary_allocations` the next time
    // `allocation_mutex` is held.
    dropped_temporary_allocations: Vec<Range<u64>>,

    // Per-owner byte tracking.
    owner_bytes: BTreeMap<u64, ByteTracking>,

    // Bytes reserved without a specific owner.
    unattributed_reserved_bytes: u64,

    // Deallocations committed to the journal but not yet flushed to the device.
    committed_deallocated: VecDeque<CommittedDeallocation>,

    // Bytes reserved by the in-flight trim batch, if any.
    trim_reserved_bytes: u64,

    // Fires when the in-flight trim batch is dropped.
    trim_listener: Option<EventListener>,

    // The in-memory index of free extents.
    strategy: strategy::BestFit,

    // Histogram of allocation request sizes (in blocks, capped at 63).
    allocation_size_histogram: [u64; 64],
    // Histogram of allocation sizes that triggered a rebuild of `strategy`.
    rebuild_strategy_trigger_histogram: [u64; 64],

    // Deleted volumes whose records are filtered out of queries.
    marked_for_deletion: HashSet<u64>,

    // Deleted volumes that require a sync before they join `marked_for_deletion`.
    volumes_deleted_pending_sync: HashSet<u64>,
}

impl Inner {
    fn allocated_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.allocated_bytes;
        }
        total
    }

    fn uncommitted_allocated_bytes(&self) -> u64 {
        self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
    }

    fn reserved_bytes(&self) -> u64 {
        self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
            + self.unattributed_reserved_bytes
    }

    fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
        match self.info.limit_bytes.get(&owner_object_id) {
            Some(v) => *v,
            None => u64::MAX,
        }
    }

    fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
        let limit = self.owner_id_limit_bytes(owner_object_id);
        let used =
            self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
        (Saturating(limit) - used).0
    }

    // Returns the total bytes not currently available for allocation (see
    // ByteTracking::unavailable_bytes).
    fn unavailable_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.unavailable_bytes();
        }
        total
    }

    // Returns the total bytes taken across all owners, plus unattributed reservations.
    fn used_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.used_bytes();
        }
        total + Saturating(self.unattributed_reserved_bytes)
    }

    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.unavailable_after_sync_bytes();
        }
        total
    }

    // Returns the bytes that would be free after a sync, excluding anything reserved for an
    // in-flight trim.
    fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
        device_size
            .checked_sub(
                (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
            )
            .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
    }

    fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
        match owner_object_id {
            Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
            None => self.unattributed_reserved_bytes += amount,
        };
    }

    fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
        match owner_object_id {
            Some(owner) => {
                let owner_entry = self.owner_bytes.entry(owner).or_default();
                assert!(
                    owner_entry.reserved_bytes >= amount,
                    "{} >= {}",
                    owner_entry.reserved_bytes,
                    amount
                );
                owner_entry.reserved_bytes -= amount;
            }
            None => {
                assert!(
                    self.unattributed_reserved_bytes >= amount,
                    "{} >= {}",
                    self.unattributed_reserved_bytes,
                    amount
                );
                self.unattributed_reserved_bytes -= amount
            }
        };
    }
}

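/// A batch of free extents reserved for trimming. While this is alive the extents are marked as
/// temporarily allocated so they will not be handed out; dropping it returns the extents to the
/// free pool and signals the listener held by the allocator.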
pub struct TrimmableExtents<'a> {
    allocator: &'a Allocator,
    extents: Vec<Range<u64>>,
    // The allocator holds the corresponding listener, which fires when this is dropped.
    _drop_event: DropEvent,
}

impl<'a> TrimmableExtents<'a> {
    pub fn extents(&self) -> &Vec<Range<u64>> {
        &self.extents
    }

    fn new(allocator: &'a Allocator) -> (Self, EventListener) {
        let drop_event = DropEvent::new();
        let listener = drop_event.listen();
        (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
    }

    fn add_extent(&mut self, extent: Range<u64>) {
        self.extents.push(extent);
    }
}

impl<'a> Drop for TrimmableExtents<'a> {
    fn drop(&mut self) {
        let mut inner = self.allocator.inner.lock();
        for device_range in std::mem::take(&mut self.extents) {
            inner.strategy.free(device_range.clone()).expect("drop trim extent");
            self.allocator
                .temporary_allocations
                .erase(&AllocatorKey { device_range: device_range.clone() });
        }
        inner.trim_reserved_bytes = 0;
    }
}

impl Allocator {
    pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
        let block_size = filesystem.block_size();
        let device_size = round_down(filesystem.device().size(), block_size);
        if device_size != filesystem.device().size() {
            warn!("Device size is not block aligned. Rounding down.");
        }
        let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
        let mut strategy = strategy::BestFit::default();
        strategy.free(0..device_size).expect("new fs");
        Allocator {
            filesystem: Arc::downgrade(&filesystem),
            block_size,
            device_size,
            object_id,
            max_extent_size_bytes,
            tree: LSMTree::new(merge, Box::new(NullCache {})),
            temporary_allocations: SkipListLayer::new(1024),
            inner: Mutex::new(Inner {
                info: AllocatorInfo::default(),
                opened: false,
                dropped_temporary_allocations: Vec::new(),
                owner_bytes: BTreeMap::new(),
                unattributed_reserved_bytes: 0,
                committed_deallocated: VecDeque::new(),
                trim_reserved_bytes: 0,
                trim_listener: None,
                strategy,
                allocation_size_histogram: [0; 64],
                rebuild_strategy_trigger_histogram: [0; 64],
                marked_for_deletion: HashSet::new(),
                volumes_deleted_pending_sync: HashSet::new(),
            }),
            allocation_mutex: futures::lock::Mutex::new(()),
            counters: Mutex::new(AllocatorCounters::default()),
            maximum_offset: AtomicU64::new(0),
        }
    }

    pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
        &self.tree
    }

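    /// Returns an iterator over the allocator's records with tombstones and extents belonging to
    /// deleted volumes filtered out. `committed_marked_for_deletion` selects the set of deleted
    /// volumes as of the last flush, rather than the live in-memory set.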
    pub async fn filter(
        &self,
        iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
        committed_marked_for_deletion: bool,
    ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
        let marked_for_deletion = {
            let inner = self.inner.lock();
            if committed_marked_for_deletion {
                &inner.info.marked_for_deletion
            } else {
                &inner.marked_for_deletion
            }
            .clone()
        };
        let iter =
            filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
        Ok(iter)
    }

    /// Returns the number of allocations of each size (in blocks, capped at 63) made so far.
    pub fn allocation_size_histogram(&self) -> [u64; 64] {
        self.inner.lock().allocation_size_histogram
    }

    /// Creates a new (empty) allocator.
    pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
        // The allocator must not have been opened already.
        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);

        let filesystem = self.filesystem.upgrade().unwrap();
        let root_store = filesystem.root_store();
        ObjectStore::create_object_with_id(
            &root_store,
            transaction,
            self.object_id(),
            HandleOptions::default(),
            None,
        )?;
        root_store.update_last_object_id(self.object_id());
        Ok(())
    }

    /// Opens the allocator, reading its info file and layer files. This is not thread-safe and
    /// should be called before replaying the allocator's mutations.
    pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
        let filesystem = self.filesystem.upgrade().unwrap();
        let root_store = filesystem.root_store();

        self.inner.lock().strategy = strategy::BestFit::default();

        let handle =
            ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
                .await
                .context("Failed to open allocator object")?;

        if handle.get_size() > 0 {
            let serialized_info = handle
                .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
                .await
                .context("Failed to read AllocatorInfo")?;
            let mut cursor = std::io::Cursor::new(serialized_info);
            let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
                .context("Failed to deserialize AllocatorInfo")?;

            let mut handles = Vec::new();
            let mut total_size = 0;
            for object_id in &info.layers {
                let handle = ObjectStore::open_object(
                    &root_store,
                    *object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .context("Failed to open allocator layer file")?;

                let size = handle.get_size();
                total_size += size;
                handles.push(handle);
            }

            {
                let mut inner = self.inner.lock();

                // Check that the claimed allocated bytes fit within the device.
                let mut device_bytes = self.device_size;
                for (&owner_object_id, &bytes) in &info.allocated_bytes {
                    ensure!(
                        bytes <= device_bytes,
                        anyhow!(FxfsError::Inconsistent).context(format!(
                            "Allocated bytes exceeds device size: {:?}",
                            info.allocated_bytes
                        ))
                    );
                    device_bytes -= bytes;

                    inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
                        Saturating(bytes);
                }

                inner.info = info;
            }

            self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
            self.filesystem.upgrade().unwrap().object_manager().update_reservation(
                self.object_id,
                tree::reservation_amount_from_layer_size(total_size),
            );
        }

        Ok(())
    }

    pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
        {
            let mut inner = self.inner.lock();
            inner.volumes_deleted_pending_sync.clear();
            inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
        }

        // Build the in-memory index of free extents from the now fully-replayed tree.
        if !self.rebuild_strategy().await.context("Build free extents")? {
            error!("Device contains no free space.");
            return Err(FxfsError::Inconsistent).context("Device appears to contain no free space");
        }

        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
        Ok(())
    }

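    /// Walks the tree and the temporary allocations to rebuild the in-memory index of free
    /// extents (`Inner::strategy`), returning true if the index changed. Apart from during
    /// replay, callers should hold `allocation_mutex`.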
    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
        let mut changed = false;
        let mut layer_set = self.tree.empty_layer_set();
        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
        self.tree.add_all_layers_to_layer_set(&mut layer_set);

        let overflow_markers = self.inner.lock().strategy.overflow_markers();
        self.inner.lock().strategy.reset_overflow_markers();

        let mut to_add = Vec::new();
        let mut merger = layer_set.merger();
        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
        let mut last_offset = 0;
        while last_offset < self.device_size {
            let next_range = match iter.get() {
                None => {
                    assert!(last_offset <= self.device_size);
                    let range = last_offset..self.device_size;
                    last_offset = self.device_size;
                    iter.advance().await?;
                    range
                }
                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                    if device_range.end < last_offset {
                        iter.advance().await?;
                        continue;
                    }
                    if device_range.start <= last_offset {
                        last_offset = device_range.end;
                        iter.advance().await?;
                        continue;
                    }
                    let range = last_offset..device_range.start;
                    last_offset = device_range.end;
                    iter.advance().await?;
                    range
                }
            };
            to_add.push(next_range);
            // Batch updates so we don't take the lock on every free range.
            if to_add.len() > 100 {
                let mut inner = self.inner.lock();
                for range in to_add.drain(..) {
                    changed |= inner.strategy.force_free(range)?;
                }
            }
        }
        let mut inner = self.inner.lock();
        for range in to_add {
            changed |= inner.strategy.force_free(range)?;
        }
        if overflow_markers != inner.strategy.overflow_markers() {
            changed = true;
        }
        Ok(changed)
    }

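    /// Collects up to `extents_per_batch` free extents (each capped at `max_extent_size` bytes)
    /// starting from `offset`, reserving them for trimming. Only one trim batch may be
    /// outstanding at a time.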
    pub async fn take_for_trimming(
        &self,
        offset: u64,
        max_extent_size: usize,
        extents_per_batch: usize,
    ) -> Result<TrimmableExtents<'_>, Error> {
        let _guard = self.allocation_mutex.lock().await;

        let (mut result, listener) = TrimmableExtents::new(self);
        let mut bytes = 0;

        // Include the temporary allocations layer so in-flight ranges aren't offered up for
        // trimming.
        let mut layer_set = self.tree.empty_layer_set();
        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
        self.tree.add_all_layers_to_layer_set(&mut layer_set);
        let mut merger = layer_set.merger();
        let mut iter = self
            .filter(
                merger.query(Query::FullRange(&AllocatorKey { device_range: offset..0 })).await?,
                false,
            )
            .await?;
        let mut last_offset = offset;
        'outer: while last_offset < self.device_size {
            let mut range = match iter.get() {
                None => {
                    assert!(last_offset <= self.device_size);
                    let range = last_offset..self.device_size;
                    last_offset = self.device_size;
                    iter.advance().await?;
                    range
                }
                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                    if device_range.end <= last_offset {
                        iter.advance().await?;
                        continue;
                    }
                    if device_range.start <= last_offset {
                        last_offset = device_range.end;
                        iter.advance().await?;
                        continue;
                    }
                    let range = last_offset..device_range.start;
                    last_offset = device_range.end;
                    iter.advance().await?;
                    range
                }
            };
            if range.start < offset {
                continue;
            }

            let mut inner = self.inner.lock();

            // Carve the free range into extents of at most `max_extent_size` and take them out
            // of the free pool while they are being trimmed.
            while range.start < range.end {
                let prefix =
                    range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
                range = prefix.end..range.end;
                bytes += prefix.length()?;
                inner.strategy.remove(prefix.clone());
                self.temporary_allocations.insert(AllocatorItem {
                    key: AllocatorKey { device_range: prefix.clone() },
                    value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
                    sequence: 0,
                })?;
                result.add_extent(prefix);
                if result.extents.len() == extents_per_batch {
                    break 'outer;
                }
            }
            if result.extents.len() == extents_per_batch {
                break 'outer;
            }
        }
        {
            let mut inner = self.inner.lock();

            assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
            inner.trim_listener = Some(listener);
            inner.trim_reserved_bytes = bytes;
            debug_assert!(
                (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
                    <= self.device_size
            );
        }
        Ok(result)
    }

    /// Returns the object ids of the layer files this allocator currently depends on.
    pub fn parent_objects(&self) -> Vec<u64> {
        self.inner.lock().info.layers.clone()
    }

    /// Returns the current per-owner byte limits as `(owner_object_id, bytes)` pairs.
    pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
        self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
    }

    /// Returns `(used_bytes, bytes_limit)` for the given owner.
    pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
        let inner = self.inner.lock();
        (
            inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
            inner.info.limit_bytes.get(&owner_object_id).copied(),
        )
    }

    /// Returns a debug string describing the per-owner byte tracking.
    pub fn owner_bytes_debug(&self) -> String {
        format!("{:?}", self.inner.lock().owner_bytes)
    }

    fn needs_sync(&self) -> bool {
        let inner = self.inner.lock();
        // If the whole device is allocated or unavailable, a sync (which flushes the device and
        // frees committed deallocations) is needed before an allocation can succeed.
        inner.unavailable_bytes().0 >= self.device_size
    }

    fn is_system_store(&self, owner_object_id: u64) -> bool {
        let fs = self.filesystem.upgrade().unwrap();
        owner_object_id == fs.object_manager().root_store_object_id()
            || owner_object_id == fs.object_manager().root_parent_store_object_id()
    }

    /// Moves `amount` of reservation from `old_owner_object_id` to the unattributed pool.
    pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
        if old_owner_object_id.is_none() || amount == 0 {
            return;
        }
        // Hold the lock over both operations so they behave as one atomic mutation.
        let mut inner = self.inner.lock();
        inner.remove_reservation(old_owner_object_id, amount);
        inner.add_reservation(None, amount);
    }

    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for
    /// the allocator when queried.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    let counters = this.counters.lock();
                    let root = inspector.root();
                    root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
                    root.record_uint("bytes_total", this.device_size);
                    let (allocated, reserved, used, unavailable) = {
                        // Scoped so the lock is dropped before the records are added.
                        let inner = this.inner.lock();
                        (
                            inner.allocated_bytes().0,
                            inner.reserved_bytes(),
                            inner.used_bytes().0,
                            inner.unavailable_bytes().0,
                        )
                    };
                    root.record_uint("bytes_allocated", allocated);
                    root.record_uint("bytes_reserved", reserved);
                    root.record_uint("bytes_used", used);
                    root.record_uint("bytes_unavailable", unavailable);

                    if let Some(x) = round_div(100 * allocated, this.device_size) {
                        root.record_uint("bytes_allocated_percent", x);
                    }
                    if let Some(x) = round_div(100 * reserved, this.device_size) {
                        root.record_uint("bytes_reserved_percent", x);
                    }
                    if let Some(x) = round_div(100 * used, this.device_size) {
                        root.record_uint("bytes_used_percent", x);
                    }
                    if let Some(x) = round_div(100 * unavailable, this.device_size) {
                        root.record_uint("bytes_unavailable_percent", x);
                    }

                    root.record_uint("num_flushes", counters.num_flushes);
                    if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
                        root.record_uint(
                            "last_flush_time_ms",
                            last_flush_time
                                .duration_since(std::time::UNIX_EPOCH)
                                .unwrap_or(std::time::Duration::ZERO)
                                .as_millis()
                                .try_into()
                                .unwrap_or(0u64),
                        );
                    }

                    let data = this.allocation_size_histogram();
                    let alloc_sizes = root.create_uint_linear_histogram(
                        "allocation_size_histogram",
                        fuchsia_inspect::LinearHistogramParams {
                            floor: 1,
                            step_size: 1,
                            buckets: 64,
                        },
                    );
                    for (i, count) in data.iter().enumerate() {
                        if i != 0 {
                            alloc_sizes.insert_multiple(i as u64, *count as usize);
                        }
                    }
                    root.record(alloc_sizes);

                    let data = this.inner.lock().rebuild_strategy_trigger_histogram;
                    let triggers = root.create_uint_linear_histogram(
                        "rebuild_strategy_triggers",
                        fuchsia_inspect::LinearHistogramParams {
                            floor: 1,
                            step_size: 1,
                            buckets: 64,
                        },
                    );
                    for (i, count) in data.iter().enumerate() {
                        if i != 0 {
                            triggers.insert_multiple(i as u64, *count as usize);
                        }
                    }
                    root.record(triggers);
                }
                Ok(inspector)
            }
            .boxed()
        });
    }

    /// Returns the maximum end offset of any allocation applied so far.
    pub fn maximum_offset(&self) -> u64 {
        self.maximum_offset.load(Ordering::Relaxed)
    }
}

impl Drop for Allocator {
    fn drop(&mut self) {
        let inner = self.inner.lock();
        // Uncommitted allocations and reservations should all have been released by now.
        assert_eq!(inner.uncommitted_allocated_bytes(), 0);
        assert_eq!(inner.reserved_bytes(), 0);
    }
}

#[fxfs_trace::trace]
impl Allocator {
    /// Returns the object ID for the allocator.
    pub fn object_id(&self) -> u64 {
        self.object_id
    }

    /// Returns information about the allocator, such as the layer files storing persisted
    /// allocations.
    pub fn info(&self) -> AllocatorInfo {
        self.inner.lock().info.clone()
    }

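    /// Tries to allocate space on behalf of `owner_object_id`, returning an allocated device
    /// range of at most `len` bytes (block aligned). The allocation is recorded in `transaction`
    /// and is unavailable to others until the transaction commits or is dropped.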
    #[trace]
    pub async fn allocate(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        mut len: u64,
    ) -> Result<Range<u64>, Error> {
        assert_eq!(len % self.block_size, 0);
        len = std::cmp::min(len, self.max_extent_size_bytes);
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);

        // Make sure the space is reserved before we go looking for it.
        let reservation = if let Some(reservation) = transaction.allocator_reservation {
            match reservation.owner_object_id {
                None => assert!(self.is_system_store(owner_object_id)),
                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
            };
            let r = reservation
                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
            len = r.amount();
            Left(r)
        } else {
            let mut inner = self.inner.lock();
            assert!(inner.opened);
            // Do not exceed the device size nor the per-owner limit (if any).
            let device_used = inner.used_bytes();
            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let limit =
                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
            len = round_down(std::cmp::min(len, limit), self.block_size);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.reserved_bytes += len;
            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
        };

        ensure!(len > 0, FxfsError::NoSpace);

        let volumes_deleted = {
            let inner = self.inner.lock();
            (!inner.volumes_deleted_pending_sync.is_empty())
                .then(|| inner.volumes_deleted_pending_sync.clone())
        };

        if let Some(volumes_deleted) = volumes_deleted {
            // Volume deletions are pending; flush the device so the deleted volumes' extents can
            // safely be reused.
            self.filesystem
                .upgrade()
                .unwrap()
                .sync(SyncOptions {
                    flush_device: true,
                    precondition: Some(Box::new(|| {
                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
                    })),
                    ..Default::default()
                })
                .await?;

            {
                let mut inner = self.inner.lock();
                for owner_id in volumes_deleted {
                    inner.volumes_deleted_pending_sync.remove(&owner_id);
                    inner.marked_for_deletion.insert(owner_id);
                }
            }

            let _guard = self.allocation_mutex.lock().await;
            self.rebuild_strategy().await?;
        }

        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
        let _guard = 'sync: loop {
            // Cap the number of attempts so we don't sync forever.
            for _ in 0..10 {
                {
                    let guard = self.allocation_mutex.lock().await;

                    if !self.needs_sync() {
                        break 'sync guard;
                    }
                }

                // All the free space is currently tied up with committed deallocations, so we
                // need to flush the device to free that up.
                self.filesystem
                    .upgrade()
                    .unwrap()
                    .sync(SyncOptions {
                        flush_device: true,
                        precondition: Some(Box::new(|| self.needs_sync())),
                        ..Default::default()
                    })
                    .await?;
            }
            bail!(
                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
            );
        };

        let mut trim_listener = None;
        {
            let mut inner = self.inner.lock();
            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;

            // If an in-flight trim is the only thing standing in the way of this allocation, wait
            // for it to finish.
            let avail = self
                .device_size
                .checked_sub(inner.unavailable_bytes().0)
                .ok_or(FxfsError::Inconsistent)?;
            let free_and_not_being_trimmed =
                inner.bytes_available_not_being_trimmed(self.device_size)?;
            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
                debug_assert!(inner.trim_reserved_bytes > 0);
                trim_listener = std::mem::take(&mut inner.trim_listener);
            }
        }

        if let Some(listener) = trim_listener {
            listener.await;
        }

        let result = loop {
            {
                let mut inner = self.inner.lock();

                // While we know rebuild_strategy can't be running (we hold allocation_mutex),
                // clean up any dropped temporary allocations.
                for device_range in inner.dropped_temporary_allocations.drain(..) {
                    self.temporary_allocations.erase(&AllocatorKey { device_range });
                }

                match inner.strategy.allocate(len) {
                    Err(FxfsError::NotFound) => {
                        // The in-memory index has run dry; fall through and rebuild it.
                        inner.rebuild_strategy_trigger_histogram
                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
                    }
                    Err(err) => {
                        error!(err:%; "Likely filesystem corruption.");
                        return Err(err.into());
                    }
                    Ok(x) => {
                        break x;
                    }
                }
            }
            // There may be more free extents on disk than are held in RAM, so rebuild and retry.
            if !self.rebuild_strategy().await? {
                error!("Cannot find additional free space. Corruption?");
                return Err(FxfsError::Inconsistent.into());
            }
        };

        debug!(device_range:? = result; "allocate");

        let len = result.length().unwrap();
        let reservation_owner = reservation.either(
            // Left: a reservation supplied by the caller via the transaction.
            |l| {
                l.forget_some(len);
                l.owner_object_id()
            },
            // Right: our own temporary reservation.
            |r| {
                r.forget_some(len);
                r.owner_object_id()
            },
        );

        {
            let mut inner = self.inner.lock();
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.uncommitted_allocated_bytes += len;
            // If the reservation has an owner, it must match the allocation's owner.
            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
            inner.remove_reservation(reservation_owner, len);
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: result.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })?;
        }

        let mutation =
            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());

        Ok(result)
    }

    /// Marks the given device range as allocated for `owner_object_id` (used for regions, such
    /// as the super-block, that must live at fixed device offsets).
    #[trace]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
        {
            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;

            let mut inner = self.inner.lock();
            let device_used = inner.used_bytes();
            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            ensure!(
                device_range.end <= self.device_size
                    && (Saturating(self.device_size) - device_used).0 >= len
                    && owner_id_bytes_left >= len,
                FxfsError::NoSpace
            );
            if let Some(reservation) = &mut transaction.allocator_reservation {
                // The transaction takes ownership of this hold.
                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
            }
            owner_entry.uncommitted_allocated_bytes += len;
            inner.strategy.remove(device_range.clone());
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: device_range.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })?;
        }
        let mutation =
            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));
        Ok(())
    }

    /// Sets the byte limit for an owner object.
    pub fn set_bytes_limit(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        bytes: u64,
    ) -> Result<(), Error> {
        // System stores cannot be limited.
        assert!(!self.is_system_store(owner_object_id));
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
        );
        Ok(())
    }

    /// Gets the byte limit for an owner object, if one is set.
    pub fn get_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
    }

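    /// Adds a mutation to `transaction` deallocating `dealloc_range` for `owner_object_id` and
    /// returns the number of bytes deallocated. The space cannot be reused until the
    /// deallocation is committed and the device has been flushed (see `did_flush_device`).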
    #[trace]
    pub async fn deallocate(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        dealloc_range: Range<u64>,
    ) -> Result<u64, Error> {
        debug!(device_range:? = dealloc_range; "deallocate");
        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
        // Sharing of allocations is not currently supported (value.count is always 1), so as long
        // as the range really is allocated, the deallocated length is just the range's length.
        let deallocated = dealloc_range.end - dealloc_range.start;
        let mutation = AllocatorMutation::Deallocate {
            device_range: dealloc_range.clone().into(),
            owner_object_id,
        };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));

        let _guard = self.allocation_mutex.lock().await;

        let mut inner = self.inner.lock();
        // While we know rebuild_strategy can't be running (we hold allocation_mutex), clean up
        // any dropped temporary allocations.
        for device_range in inner.dropped_temporary_allocations.drain(..) {
            self.temporary_allocations.erase(&AllocatorKey { device_range });
        }

        // Track the deallocation in `temporary_allocations` so the range can't be reused until
        // it is safe to do so.
        self.temporary_allocations
            .insert(AllocatorItem {
                key: AllocatorKey { device_range: dealloc_range.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })
            .context("tracking deallocated")?;

        Ok(deallocated)
    }

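    /// Marks all allocations belonging to `owner_object_id` for deletion (used when deleting a
    /// volume). The records are cleaned up lazily: they are filtered out of queries and removed
    /// from the tree at the next major compaction.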
    pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
        );
    }

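    /// Called when the device has been flushed up to `flush_log_offset`. Deallocations committed
    /// at or before that offset can no longer be needed by journal replay, so their ranges are
    /// returned to the free pool.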
    pub fn did_flush_device(&self, flush_log_offset: u64) {
        // First take out the deallocations that we now know to be flushed. The list is kept in
        // journal-offset order, so we can stop at the first entry that isn't flushed yet.
        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
        let deallocs = 'deallocs_outer: loop {
            let mut inner = self.inner.lock();
            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
                if dealloc.log_file_offset >= flush_log_offset {
                    let mut deallocs = inner.committed_deallocated.split_off(index);
                    // split_off returns the unflushed tail; swap so `deallocs` holds the flushed
                    // head and the tail stays in place.
                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
                    break 'deallocs_outer deallocs;
                }
            }
            break std::mem::take(&mut inner.committed_deallocated);
        };

        // Now the flushed ranges can be returned to the free pool.
        let mut inner = self.inner.lock();
        let mut totals = BTreeMap::<u64, u64>::new();
        for dealloc in deallocs {
            *(totals.entry(dealloc.owner_object_id).or_default()) +=
                dealloc.range.length().unwrap();
            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
            self.temporary_allocations.erase(&AllocatorKey { device_range: dealloc.range.clone() });
        }

        // Decrement the per-owner committed-deallocated counters now that the ranges are free.
        for (owner_object_id, total) in totals {
            match inner.owner_bytes.get_mut(&owner_object_id) {
                Some(counters) => counters.committed_deallocated_bytes -= total,
                None => panic!("Failed to decrement for unknown owner: {}", owner_object_id),
            }
        }
    }

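    /// Attempts to reserve `amount` bytes, counted against `owner_object_id`'s limit if one is
    /// given. Returns None if there is insufficient free space.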
    pub fn reserve(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: u64,
    ) -> Option<Reservation> {
        {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let limit = match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            };
            if limit < amount {
                return None;
            }
            inner.add_reservation(owner_object_id, amount);
        }
        Some(Reservation::new(self, owner_object_id, amount))
    }

    /// Like `reserve`, but takes a callback which is passed the maximum amount available and
    /// returns the amount to reserve, so the reservation cannot fail.
    pub fn reserve_with(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: impl FnOnce(u64) -> u64,
    ) -> Reservation {
        let amount = {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let amount = amount(match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            });

            inner.add_reservation(owner_object_id, amount);

            amount
        };

        Reservation::new(self, owner_object_id, amount)
    }

    /// Returns the total number of allocated and committed bytes.
    pub fn get_allocated_bytes(&self) -> u64 {
        self.inner.lock().allocated_bytes().0
    }

    /// Returns the size of the device.
    pub fn get_disk_bytes(&self) -> u64 {
        self.device_size
    }

    /// Returns the number of allocated and committed bytes per owner.
    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
    }

    /// Returns the total number of bytes allocated, reserved or uncommitted.
    pub fn get_used_bytes(&self) -> Saturating<u64> {
        let inner = self.inner.lock();
        inner.used_bytes()
    }
}

impl ReservationOwner for Allocator {
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
        self.inner.lock().remove_reservation(owner_object_id, amount);
    }
}

#[async_trait]
impl JournalingObject for Allocator {
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        _assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                let mut inner = self.inner.lock();
                inner.owner_bytes.remove(&owner_object_id);

                // The records themselves are removed at the next major compaction; until then
                // they must be filtered out of queries.
                inner.info.marked_for_deletion.insert(owner_object_id);
                inner.volumes_deleted_pending_sync.insert(owner_object_id);

                inner.info.limit_bytes.remove(&owner_object_id);
            }
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: device_range.clone().into() },
                    value: AllocatorValue::Abs { count: 1, owner_object_id },
                    sequence: context.checkpoint.file_offset,
                };
                let len = item.key.device_range.length().unwrap();
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
                let mut inner = self.inner.lock();
                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                entry.allocated_bytes += len;
                if let ApplyMode::Live(transaction) = context.mode {
                    entry.uncommitted_allocated_bytes -= len;
                    // The allocation is now in the tree, so the temporary record can go; it is
                    // queued for removal the next time `allocation_mutex` is held.
                    inner.dropped_temporary_allocations.push(device_range.into());
                    if let Some(reservation) = transaction.allocator_reservation {
                        reservation.commit(len);
                    }
                }
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: device_range.into() },
                    value: AllocatorValue::None,
                    sequence: context.checkpoint.file_offset,
                };
                let len = item.key.device_range.length().unwrap();

                {
                    let mut inner = self.inner.lock();
                    {
                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                        entry.allocated_bytes -= len;
                        if context.mode.is_live() {
                            entry.committed_deallocated_bytes += len;
                        }
                    }
                    if context.mode.is_live() {
                        inner.committed_deallocated.push_back(CommittedDeallocation {
                            log_file_offset: context.checkpoint.file_offset,
                            range: item.key.device_range.clone(),
                            owner_object_id,
                        });
                    }
                    if let ApplyMode::Live(Transaction {
                        allocator_reservation: Some(reservation),
                        ..
                    }) = context.mode
                    {
                        inner.add_reservation(reservation.owner_object_id(), len);
                        reservation.add(len);
                    }
                }
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
            }
            Mutation::BeginFlush => {
                self.tree.seal();
                // Transfer the running count of allocated_bytes so that it gets written to the
                // new info file when the flush completes.
                let mut inner = self.inner.lock();
                let allocated_bytes =
                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
                inner.info.allocated_bytes = allocated_bytes;
            }
            Mutation::EndFlush => {}
            _ => bail!("unexpected mutation: {:?}", mutation),
        }
        Ok(())
    }

    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
        match mutation {
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                let len = device_range.length().unwrap();
                let mut inner = self.inner.lock();
                inner
                    .owner_bytes
                    .entry(owner_object_id)
                    .or_default()
                    .uncommitted_allocated_bytes -= len;
                if let Some(reservation) = transaction.allocator_reservation {
                    let res_owner = reservation.owner_object_id();
                    inner.add_reservation(res_owner, len);
                    reservation.release_reservation(res_owner, len);
                }
                inner.strategy.free(device_range.clone().into()).expect("drop mutation");
                self.temporary_allocations
                    .erase(&AllocatorKey { device_range: device_range.into() });
            }
            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
                self.temporary_allocations
                    .erase(&AllocatorKey { device_range: device_range.into() });
            }
            _ => {}
        }
    }

    async fn flush(&self) -> Result<Version, Error> {
        let filesystem = self.filesystem.upgrade().unwrap();
        let object_manager = filesystem.object_manager();
        let earliest_version = self.tree.get_earliest_version();
        if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
            // Early exit, but still return the earliest version used by a struct in the tree.
            return Ok(earliest_version);
        }

        let fs = self.filesystem.upgrade().unwrap();
        let mut flusher = Flusher::new(self, &fs).await;
        let (new_layer_file, info) = flusher.start().await?;
        flusher.finish(new_layer_file, info).await
    }
}

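/// Wraps an iterator and coalesces adjacent records that touch and have identical values, so
/// that a major compaction writes the minimal number of extent records.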
pub struct CoalescingIterator<I> {
    iter: I,
    item: Option<AllocatorItem>,
}

impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
    pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
        let mut iter = Self { iter, item: None };
        iter.advance().await?;
        Ok(iter)
    }
}

#[async_trait]
impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
    for CoalescingIterator<I>
{
    async fn advance(&mut self) -> Result<(), Error> {
        self.item = self.iter.get().map(|x| x.cloned());
        if self.item.is_none() {
            return Ok(());
        }
        let left = self.item.as_mut().unwrap();
        loop {
            self.iter.advance().await?;
            match self.iter.get() {
                None => return Ok(()),
                Some(right) => {
                    // The two records must not overlap.
                    ensure!(
                        left.key.device_range.end <= right.key.device_range.start,
                        FxfsError::Inconsistent
                    );
                    // Records can only be coalesced if they touch and have the same value.
                    if left.key.device_range.end < right.key.device_range.start
                        || left.value != *right.value
                    {
                        return Ok(());
                    }
                    left.key.device_range.end = right.key.device_range.end;
                }
            }
        }
    }

    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
        self.item.as_ref().map(|x| x.as_item_ref())
    }
}

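/// Encapsulates a flush of the allocator: takes the flush lock, seals the in-memory tree
/// (`start`), and compacts all layers into a newly written layer file (`finish`).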
struct Flusher<'a> {
    allocator: &'a Allocator,
    fs: &'a Arc<FxFilesystem>,
    _guard: WriteGuard<'a>,
}

impl<'a> Flusher<'a> {
    async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
        let keys = lock_keys![LockKey::flush(allocator.object_id())];
        Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
    }

    fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
        Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(allocator_reservation),
            ..Default::default()
        }
    }

    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
        let object_manager = self.fs.object_manager();
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
            .await?;

        let root_store = self.fs.root_store();
        let layer_object_handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions { skip_journal_checks: true, ..Default::default() },
            None,
        )
        .await?;
        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
        // BeginFlush acts as the snapshot point for mutations to the tree, so it's important that
        // this transaction contains no other allocator mutations: mutations within the same
        // transaction can be applied in any order relative to the seal and could be lost on
        // replay.
        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
        let info = transaction
            .commit_with_callback(|_| {
                // Take a snapshot of the allocator info as of the BeginFlush commit.
                self.allocator.inner.lock().info.clone()
            })
            .await?;
        Ok((layer_object_handle, info))
    }

    async fn finish(
        self,
        layer_object_handle: DataObjectHandle<ObjectStore>,
        mut info: AllocatorInfo,
    ) -> Result<Version, Error> {
        let object_manager = self.fs.object_manager();
        let txn_options = Self::txn_options(object_manager.metadata_reservation());

        let layer_set = self.allocator.tree.immutable_layer_set();
        let total_len = layer_set.sum_len();
        {
            let mut merger = layer_set.merger();
            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
            let iter = CoalescingIterator::new(iter).await?;
            self.allocator
                .tree
                .compact_with_iterator(
                    iter,
                    total_len,
                    DirectWriter::new(&layer_object_handle, txn_options).await,
                    layer_object_handle.block_size(),
                )
                .await?;
        }

        let root_store = self.fs.root_store();

        // These must outlive the transaction below.
        let object_handle;
        let reservation_update;
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    self.allocator.object_id()
                )],
                txn_options,
            )
            .await?;
        let mut serialized_info = Vec::new();

        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
        object_handle = ObjectStore::open_object(
            &root_store,
            self.allocator.object_id(),
            HandleOptions::default(),
            None,
        )
        .await?;

        // Move the old layer files to the graveyard; they are purged once no longer in use.
        for object_id in &info.layers {
            root_store.add_to_graveyard(&mut transaction, *object_id);
        }

        // The new layer file was written with the marked-for-deletion owners filtered out, so
        // those owners can be dropped from the info we persist.
        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);

        info.layers = vec![layer_object_handle.object_id()];

        info.serialize_with_version(&mut serialized_info)?;

        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;

        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
            layer_object_handle.get_size(),
        ));

        // EndFlush is in the same transaction as the AllocatorInfo write so the two commit
        // atomically.
        transaction.add_with_object(
            self.allocator.object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );
        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());

        let layers = layers_from_handles([layer_object_handle]).await?;
        transaction
            .commit_with_callback(|_| {
                self.allocator.tree.set_layers(layers);

                // The new layer file is now committed, so the in-memory info can switch over and
                // the marked-for-deletion owners can be cleared.
                let mut inner = self.allocator.inner.lock();
                inner.info.layers = info.layers;
                for owner_id in marked_for_deletion {
                    inner.marked_for_deletion.remove(&owner_id);
                    inner.info.marked_for_deletion.remove(&owner_id);
                }
            })
            .await?;

        // Close the old layers and purge them.
        for layer in layer_set.layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                root_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        let mut counters = self.allocator.counters.lock();
        counters.num_flushes += 1;
        counters.last_flush_time = Some(std::time::SystemTime::now());
        // Return the earliest version used by a struct in the tree.
        Ok(self.allocator.tree.get_earliest_version())
    }
}

2130#[cfg(test)]
2131mod tests {
2132 use crate::filesystem::{
2133 FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2134 };
2135 use crate::fsck::fsck;
2136 use crate::lsm_tree::cache::NullCache;
2137 use crate::lsm_tree::skip_list_layer::SkipListLayer;
2138 use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
2139 use crate::lsm_tree::{LSMTree, Query};
2140 use crate::object_handle::ObjectHandle;
2141 use crate::object_store::allocator::merge::merge;
2142 use crate::object_store::allocator::{
2143 Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2144 };
2145 use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
2146 use crate::object_store::volume::root_volume;
2147 use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
2148 use crate::range::RangeExt;
2149 use crate::round::round_up;
2150 use crate::testing;
2151 use fuchsia_async as fasync;
2152 use fuchsia_sync::Mutex;
2153 use std::cmp::{max, min};
2154 use std::ops::{Bound, Range};
2155 use std::sync::Arc;
2156 use storage_device::DeviceHolder;
2157 use storage_device::fake_device::FakeDevice;
2158
2159 #[test]
2160 fn test_allocator_key_is_range_based() {
2161 assert!(AllocatorKey { device_range: 0..100 }.is_range_key());
2163 }
2164
2165 #[fuchsia::test]
2166 async fn test_coalescing_iterator() {
2167 let skip_list = SkipListLayer::new(100);
2168 let items = [
2169 Item::new(
2170 AllocatorKey { device_range: 0..100 },
2171 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2172 ),
2173 Item::new(
2174 AllocatorKey { device_range: 100..200 },
2175 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2176 ),
2177 ];
2178 skip_list.insert(items[1].clone()).expect("insert error");
2179 skip_list.insert(items[0].clone()).expect("insert error");
2180 let mut iter =
2181 CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
2182 .await
2183 .expect("new failed");
2184 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2185 assert_eq!(
2186 (key, value),
2187 (
2188 &AllocatorKey { device_range: 0..200 },
2189 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2190 )
2191 );
2192 iter.advance().await.expect("advance failed");
2193 assert!(iter.get().is_none());
2194 }
2195
2196 #[fuchsia::test]
2197 async fn test_merge_and_coalesce_across_three_layers() {
2198 let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2199 lsm_tree
2200 .insert(Item::new(
2201 AllocatorKey { device_range: 100..200 },
2202 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2203 ))
2204 .expect("insert error");
2205 lsm_tree.seal();
2206 lsm_tree
2207 .insert(Item::new(
2208 AllocatorKey { device_range: 0..100 },
2209 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2210 ))
2211 .expect("insert error");
2212
2213 let layer_set = lsm_tree.layer_set();
2214 let mut merger = layer_set.merger();
2215 let mut iter =
2216 CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2217 .await
2218 .expect("new failed");
2219 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2220 assert_eq!(
2221 (key, value),
2222 (
2223 &AllocatorKey { device_range: 0..200 },
2224 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2225 )
2226 );
2227 iter.advance().await.expect("advance failed");
2228 assert!(iter.get().is_none());
2229 }
2230
    #[fuchsia::test]
    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 100..200 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 0..100 },
                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 0..100 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            )
        );
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 100..200 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

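    // Returns the number of bytes by which `a` and `b` overlap.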
    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
        if a.end > b.start && a.start < b.end {
            min(a.end, b.end) - max(a.start, b.start)
        } else {
            0
        }
    }

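    // Returns all allocated ranges, in order, asserting that none of them overlap.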
    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut allocations: Vec<Range<u64>> = Vec::new();
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            if let Some(r) = allocations.last() {
                assert!(device_range.start >= r.end);
            }
            allocations.push(device_range.clone());
            iter.advance().await.expect("advance failed");
        }
        allocations
    }

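    // Asserts that the allocated ranges exactly match `expected_allocations`.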
    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut found = 0;
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            let mut l = device_range.length().expect("Invalid range");
            found += l;
            for range in expected_allocations {
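                // The expected ranges don't overlap, so subtracting the overlap with each of them
                // should fully account for this allocated range.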
                l -= overlap(range, device_range);
                if l == 0 {
                    break;
                }
            }
            assert_eq!(l, 0, "range {device_range:?} not covered by expectations");
            iter.advance().await.expect("advance failed");
        }
        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
    }

    async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
        let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let allocator = fs.allocator();
        (fs, allocator)
    }

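    // New allocations must be the requested size and must not overlap one another or the
    // filesystem's own allocations.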
    #[fuchsia::test]
    async fn test_allocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;

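        // A freshly formatted filesystem already has a handful of allocations of its own
        // (presumably superblocks, journal and other metadata; the exact ranges depend on the
        // formatting code).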
        let expected = vec![
            0..4096,
            4096..139264,
            139264..204800,
            204800..335872,
            335872..401408,
            524288..528384,
        ];
        assert_eq!(device_ranges, expected);
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        let n = device_ranges.len();
        assert_eq!(overlap(&device_ranges[n - 2], &device_ranges[n - 1]), 0);
        transaction.commit().await.expect("commit failed");
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        let n = device_ranges.len();
        assert_eq!(device_ranges[n - 1].length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[n - 3], &device_ranges[n - 1]), 0);
        assert_eq!(overlap(&device_ranges[n - 2], &device_ranges[n - 1]), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

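    // A request for more than the allocator's maximum extent size gets clamped to that size.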
    #[fuchsia::test]
    async fn test_allocate_more_than_max_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(
            device_ranges.last().unwrap().length().expect("Invalid range"),
            allocator.max_extent_size_bytes
        );
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

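    // Deallocating a range should return the volume to its initial set of allocations.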
    #[fuchsia::test]
    async fn test_deallocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let initial_allocations = collect_allocations(&allocator).await;

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let device_range1 = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");
        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
        transaction.commit().await.expect("commit failed");

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
            .await
            .expect("deallocate failed");
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &initial_allocations).await;
    }

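    // mark_allocated reserves a specific device range so the allocator won't hand it out.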
    #[fuchsia::test]
    async fn test_mark_allocated() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut device_ranges = collect_allocations(&allocator).await;
        let range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
                .await
                .expect("allocate failed")
        };

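        // The transaction above was dropped without committing, so `range` is free again and the
        // allocator should hand out its start to the next allocation.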
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );

        assert_eq!(device_ranges.last().unwrap().start, range.start);

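        // Mark the second block of the dropped range as allocated; the next allocation must then
        // avoid the whole of `range`.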
        let mut range2 = range.clone();
        range2.start += fs.block_size();
        allocator
            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
            .expect("mark_allocated failed");
        device_ranges.push(range2);

        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        let last_range = device_ranges.last().unwrap();
        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(last_range, &range), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

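    // mark_for_deletion drops an owner's byte counts immediately; the extents themselves are only
    // reclaimed on the next flush.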
    #[fuchsia::test]
    async fn test_mark_for_deletion() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();
        let mut device_ranges = collect_allocations(&allocator).await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for _ in 0..15 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate2 failed"),
            );
        }
        transaction.commit().await.expect("commit failed");
        check_allocations(&allocator, &device_ranges).await;

        assert_eq!(
            allocator.get_allocated_bytes(),
            initial_allocated_bytes + fs.block_size() * 3000
        );

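        // Mark the store for deletion: its bytes should immediately stop being counted, even
        // though the extents stay on disk until the next flush.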
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
        transaction.commit().await.expect("commit failed");

        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
        check_allocations(&allocator, &device_ranges).await;

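        // Now allocate some space for a different owner; after the flush below, only that owner's
        // extents should remain.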
        device_ranges.clear();

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_bytes = 1500 * fs.block_size();
        while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
            let len = std::cmp::min(
                target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
                100 * fs.block_size(),
            );
            device_ranges.push(
                allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        allocator.flush().await.expect("flush failed");

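        // The flush rewrites the layer files, at which point the deleted owner's extents are gone
        // for good and the owner is no longer tracked.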
        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
        assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes);
    }

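    // Creates a child file in the store's root directory and writes `size` bytes to it.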
    async fn create_file(store: &Arc<ObjectStore>, size: usize) {
        let root_directory =
            Directory::open(store, store.root_directory_object_id()).await.expect("open failed");

        let mut transaction = store
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let file = root_directory
            .create_child_file(&mut transaction, &format!("foo {}", size))
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        let buffer = file.allocate_buffer(size).await;

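        // `borrow_metadata_space` lets this transaction borrow from the metadata reservation
        // rather than setting aside space of its own.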
        let mut transaction = file
            .new_transaction_with_options(Options {
                borrow_metadata_space: true,
                ..Default::default()
            })
            .await
            .expect("new_transaction_with_options failed");
        file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
        transaction.commit().await.expect("commit failed");
    }

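    // Journal replay that includes a deleted store, followed by compaction, should leave the
    // filesystem in a state that passes fsck.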
    #[fuchsia::test]
    async fn test_replay_with_deleted_store_and_compaction() {
        let (fs, _) = test_fs().await;

        const FILE_SIZE: usize = 10_000_000;

        let mut store_id = {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;
            store.store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");

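        // Compact now so that the replay below starts from the compacted state.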
        fs.journal().compact().await.expect("compact failed");

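        // Delete and recreate the volume a couple of times, remounting in between, so that replay
        // has to cope with a store that has come and gone.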
        for _ in 0..2 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store_id)
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");
                create_file(&store, FILE_SIZE).await;
                store_id = store.store_object_id();
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

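    // Races journal compaction against volume deletion; whichever way the race falls, fsck should
    // pass after a remount.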
    #[fuchsia::test(threads = 4)]
    async fn test_compaction_delete_race() {
        let (fs, _allocator) = test_fs().await;

        {
            const FILE_SIZE: usize = 10_000_000;

            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;

            let fs_clone = fs.clone();

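            // Make sure the executor's threads are all actually running, then kick off a
            // compaction in the background.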
            let executor_tasks = testing::force_executor_threads_to_run(4).await;

            let task = fasync::Task::spawn(async move {
                fs_clone.journal().compact().await.expect("compact failed");
            });

            drop(executor_tasks);

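            // Sleep for a random interval so that the deletion below lands at a different point
            // in the compaction on each run.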
            let sleep = rand::random_range(3000..6000);
            std::thread::sleep(std::time::Duration::from_micros(sleep));
            log::info!("sleep {sleep}us");

            let transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(
                            root_vol.volume_directory().store().store_object_id(),
                            root_vol.volume_directory().object_id(),
                        ),
                        LockKey::flush(store.store_object_id())
                    ],
                    Options { borrow_metadata_space: true, ..Default::default() },
                )
                .await
                .expect("new_transaction failed");
            root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");

            task.await;
        }

        fs.journal().compact().await.expect("compact failed");
        fs.close().await.expect("close failed");

        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

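    // Creating and deleting volumes over and over (remounting each time) shouldn't leak space or
    // corrupt anything.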
    #[fuchsia::test]
    async fn test_delete_multiple_volumes() {
        let (mut fs, _) = test_fs().await;

        for _ in 0..50 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");

                create_file(&store, 1_000_000).await;

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store.store_object_id())
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                fs.allocator().flush().await.expect("flush failed");
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

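    // Freed space should be available for reuse by later allocations.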
    #[fuchsia::test]
    async fn test_allocate_free_reallocate() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let mut device_ranges = Vec::new();
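        // Allocate 30 chunks of 100 blocks each.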
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for _ in 0..30 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 3000,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );

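        // Free everything again.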
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for range in std::mem::take(&mut device_ranges) {
            allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());

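        // Reallocate half as much; the allocator should be able to reuse the freed space.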
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_len = 1500 * fs.block_size();
        while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
            let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, len)
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 1500,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );
    }

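    // Allocations made before a flush should survive a remount, and fresh allocations must not
    // overlap them.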
    #[fuchsia::test]
    async fn test_flush() {
        const STORE_OBJECT_ID: u64 = 99;

        let mut device_ranges = Vec::new();
        let device = {
            let (fs, allocator) = test_fs().await;
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

        let allocated = collect_allocations(&allocator).await;

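        // Everything we allocated before the flush should still be allocated after the remount.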
        for i in &device_ranges {
            let mut overlapping = 0;
            for j in &allocated {
                overlapping += overlap(i, j);
            }
            assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
        }

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let range = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");

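        // A fresh allocation must not overlap anything that was allocated before the remount.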
        for r in &allocated {
            assert_eq!(overlap(r, &range), 0);
        }
        transaction.commit().await.expect("commit failed");
    }

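    // An allocation made in a transaction that is dropped without committing should be rolled
    // back, so the same range comes straight back from the next allocation.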
    #[fuchsia::test]
    async fn test_dropped_transaction() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed")
        };
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        assert_eq!(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
            allocated_range
        );
    }

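    // Per-owner byte counts should vanish when the owner is removed, and stay gone across a
    // remount.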
    #[fuchsia::test]
    async fn test_cleanup_removed_owner() {
        const STORE_OBJECT_ID: u64 = 99;
        let device = {
            let (fs, allocator) = test_fs().await;

            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("Allocating");
                transaction.commit().await.expect("Committing.");
            }
            allocator.flush().await.expect("Flushing");
            assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
                transaction.commit().await.expect("Committing.");
            }
            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            fs.close().await.expect("Closing");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();
        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
    }

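    // get_allocated_bytes should change only when transactions commit, not while they are pending
    // or when they are dropped.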
    #[fuchsia::test]
    async fn test_allocated_bytes() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();

        let allocated_bytes = initial_allocated_bytes + fs.block_size();
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");
            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
            range
        };

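        // Allocate again, but drop the transaction this time: the count must not change, neither
        // while the transaction is live nor after it is dropped.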
        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");

            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
        }

        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

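        // Deallocate most of the committed range; the count only drops once the transaction
        // commits.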
        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
            .await
            .expect("deallocate failed");

        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

        transaction.commit().await.expect("commit failed");

        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
    }

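    // Byte limits set for an owner should only appear in the persisted AllocatorInfo once the
    // transaction commits.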
    #[fuchsia::test]
    async fn test_persist_bytes_limit() {
        const LIMIT: u64 = 12345;
        const OWNER_ID: u64 = 12;

        let (fs, allocator) = test_fs().await;
        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
                .expect("Failed to set limit.");
            assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
            transaction.commit().await.expect("Failed to commit transaction");
            let bytes: u64 = *allocator
                .inner
                .lock()
                .info
                .limit_bytes
                .get(&OWNER_ID)
                .expect("Failed to find limit");
            assert_eq!(LIMIT, bytes);
        }
    }

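    // Merges adjacent ranges so that comparisons don't depend on how extents happened to be
    // split.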
    fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
        let mut coalesced = Vec::new();
        let mut prev: Option<Range<u64>> = None;
        for range in ranges {
            if let Some(prev_range) = &mut prev {
                if range.start == prev_range.end {
                    prev_range.end = range.end;
                } else {
                    coalesced.push(prev_range.clone());
                    prev = Some(range);
                }
            } else {
                prev = Some(range);
            }
        }
        if let Some(prev_range) = prev {
            coalesced.push(prev_range);
        }
        coalesced
    }

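    // Allocate a contiguous chunk, free assorted pieces of it, and check that take_for_trimming
    // hands back exactly those pieces.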
    #[fuchsia::test]
    async fn test_take_for_trimming() {
        const STORE_OBJECT_ID: u64 = 99;

        let allocated_range;
        let expected_free_ranges;
        let device = {
            let (fs, allocator) = test_fs().await;
            let bs = fs.block_size();
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocated_range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");

            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            let base = allocated_range.start;
            expected_free_ranges = vec![
                base..(base + (bs * 1)),
                (base + (bs * 2))..(base + (bs * 3)),
                (base + (bs * 4))..(base + (bs * 8)),
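                // The following ranges are contiguous with the one above; take_for_trimming may
                // carve the combined extent up differently, which is why both sides get coalesced
                // before comparing.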
                (base + (bs * 8))..(base + (bs * 12)),
                (base + (bs * 12))..(base + (bs * 13)),
                (base + (bs * 29))..(base + (bs * 30)),
            ];
            for range in &expected_free_ranges {
                allocator
                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
                    .await
                    .expect("deallocate failed");
            }
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

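        // Use a small maximum extent size and batch size so that the free extents come back over
        // several batches.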
        let max_extent_size = fs.block_size() as usize * 4;
        const EXTENTS_PER_BATCH: usize = 2;
        let mut free_ranges = vec![];
        let mut offset = allocated_range.start;
        while offset < allocated_range.end {
            let free = allocator
                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
                .await
                .expect("take_for_trimming failed");
            free_ranges.extend(
                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
            );
            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
        }
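        // The batches may split extents differently from how they were freed, so coalesce both
        // sides before comparing.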
        let coalesced_free_ranges = coalesce_ranges(free_ranges);
        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);

        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
    }

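    // If every free extent is temporarily reserved for trimming, allocations must block until the
    // trim batch is released.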
    #[fuchsia::test]
    async fn test_allocations_wait_for_free_extents() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocator_clone = allocator.clone();

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

        let max_extent_size = fs.device().size() as usize;
        const EXTENTS_PER_BATCH: usize = usize::MAX;

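        // Reserve every free extent for trimming; `trim_done` records when they are given back.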
        let trim_done = Arc::new(Mutex::new(false));
        let trimmable_extents = allocator
            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
            .await
            .expect("take_for_trimming failed");

        let trim_done_clone = trim_done.clone();
        let bs = fs.block_size();
        let alloc_task = fasync::Task::spawn(async move {
            allocator_clone
                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
                .await
                .expect("allocate failed");
            {
                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
            }
            transaction.commit().await.expect("commit failed");
        });

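        // Give the allocation plenty of time to run; it should be blocked behind the trim.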
        fasync::Timer::new(std::time::Duration::from_millis(100)).await;

        {
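            // Release the trim batch and set the flag while holding the lock, so the allocation
            // task can't observe the drop before the flag is set.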
            let mut trim_done = trim_done.lock();
            std::mem::drop(trimmable_extents);
            *trim_done = true;
        }

        alloc_task.await;
    }

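    // Reservations needn't be block-aligned, but the extents actually allocated against them must
    // be.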
    #[fuchsia::test]
    async fn test_allocation_with_reservation_not_multiple_of_block_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

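        // Reserve a little more than a whole number of blocks.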
        const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
        let reservation =
            allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![],
                Options { allocator_reservation: Some(&reservation), ..Options::default() },
            )
            .await
            .expect("new failed");

        let range = allocator
            .allocate(
                &mut transaction,
                STORE_OBJECT_ID,
                round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
            )
            .await
            .expect("allocate failed");
        assert_eq!((range.end - range.start) % fs.block_size(), 0);

        println!("{}", range.end - range.start);
    }
}