pub mod merge;
pub mod strategy;

use crate::drop_event::DropEvent;
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::skip_list_layer::SkipListLayer;
use crate::lsm_tree::types::{
    FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
    OrdUpperBound, RangeKey, SortByU64, Value,
};
use crate::lsm_tree::{LSMTree, Query, layers_from_handles};
use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
use crate::object_store::object_manager::ReservationUpdate;
use crate::object_store::transaction::{
    AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
};
use crate::object_store::{DataObjectHandle, DirectWriter, HandleOptions, ObjectStore, tree};
use crate::range::RangeExt;
use crate::round::{round_div, round_down, round_up};
use crate::serialized_types::{
    DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use either::Either::{Left, Right};
use event_listener::EventListener;
use fprint::TypeFingerprint;
use fuchsia_inspect::HistogramProperty;
use fuchsia_sync::Mutex;
use futures::FutureExt;
use merge::{filter_marked_for_deletion, filter_tombstones, merge};
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::hash::Hash;
use std::marker::PhantomData;
use std::num::Saturating;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
/// This trait is implemented by things that own reservations.
pub trait ReservationOwner: Send + Sync {
    /// Reports that bytes are being released from the reservation back to the |ReservationOwner|,
    /// where |owner_object_id| is the owner under the root object store associated with the
    /// reservation.
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
}

/// A reservation guarantees that, when it comes time to actually allocate, the allocation will
/// not fail due to lack of space.  Sub-reservations (a.k.a. holds) are possible, which
/// effectively allows part of a reservation to be set aside until it's time to commit.
pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
    owner: T,
    owner_object_id: Option<u64>,
    inner: Mutex<ReservationInner>,
    phantom: PhantomData<U>,
}

#[derive(Debug, Default)]
struct ReservationInner {
    // The total amount of the reservation.
    amount: u64,

    // The amount currently taken by sub-reservations.
    reserved: u64,
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner.lock().fmt(f)
    }
}

impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
    pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
        Self {
            owner,
            owner_object_id,
            inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
            phantom: PhantomData,
        }
    }

    pub fn owner_object_id(&self) -> Option<u64> {
        self.owner_object_id
    }

    /// Returns the total amount of the reservation, not accounting for anything that might be
    /// held by sub-reservations.
    pub fn amount(&self) -> u64 {
        self.inner.lock().amount
    }

    /// Adds more to the reservation.
    pub fn add(&self, amount: u64) {
        self.inner.lock().amount += amount;
    }

    /// Returns the entire amount of the reservation.  The caller is responsible for maintaining
    /// consistency, i.e. updating counters, etc.  There must be no outstanding sub-reservations
    /// (an assertion fires otherwise).
    pub fn forget(&self) -> u64 {
        let mut inner = self.inner.lock();
        assert_eq!(inner.reserved, 0);
        std::mem::take(&mut inner.amount)
    }

    /// Takes some of the reservation.  The caller is responsible for maintaining consistency,
    /// i.e. updating counters, etc.
    pub fn forget_some(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Returns a partial amount of the reservation.  |amount| is passed the remaining amount and
    /// should return the amount to take, which can be zero.
    fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
        let mut inner = self.inner.lock();
        let taken = amount(inner.amount - inner.reserved);
        inner.reserved += taken;
        ReservationImpl::new(self, self.owner_object_id, taken)
    }

    /// Reserves *exactly* `amount`, returning None if there is insufficient remaining amount.
    pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
        let mut inner = self.inner.lock();
        if inner.amount - inner.reserved < amount {
            None
        } else {
            inner.reserved += amount;
            Some(ReservationImpl::new(self, self.owner_object_id, amount))
        }
    }

    /// Commits a previously reserved amount from this reservation.  The caller is responsible for
    /// ensuring the amount was reserved.
    pub fn commit(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.reserved -= amount;
        inner.amount -= amount;
    }

    /// Returns some of the reservation to the owner.
    pub fn give_back(&self, amount: u64) {
        self.owner.borrow().release_reservation(self.owner_object_id, amount);
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Moves `amount` from this reservation to another reservation with the same owner.
    pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
        &self,
        other: &ReservationImpl<V, W>,
        amount: u64,
    ) {
        assert_eq!(self.owner_object_id, other.owner_object_id());
        let mut inner = self.inner.lock();
        if let Some(amount) = inner.amount.checked_sub(amount) {
            inner.amount = amount;
        } else {
            std::mem::drop(inner);
            panic!("Insufficient reservation space");
        }
        other.add(amount);
    }
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
    fn drop(&mut self) {
        let inner = self.inner.get_mut();
        assert_eq!(inner.reserved, 0);
        let owner_object_id = self.owner_object_id;
        if inner.amount > 0 {
            self.owner
                .borrow()
                .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
        }
    }
}

impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
    for ReservationImpl<T, U>
{
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
        // Sub-reservations should belong to the same owner (or lack thereof).
        assert_eq!(owner_object_id, self.owner_object_id);
        let mut inner = self.inner.lock();
        assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
        inner.reserved -= amount;
    }
}

pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;

pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;

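// Illustrative sketch (not part of the API surface): an `Allocator` (defined below) hands
// out `Reservation`s, and a `Hold` is a sub-reservation taken against one that is released
// back when dropped.  `allocator` and `owner_object_id` are assumed to be in scope.
//
// ```ignore
// let reservation: Reservation =
//     allocator.clone().reserve(Some(owner_object_id), 8192).expect("no space");
// {
//     // Set aside half of the reservation; it becomes reservable again on drop.
//     let hold: Hold<'_> = reservation.reserve(4096).expect("hold");
//     assert_eq!(hold.amount(), 4096);
// }
// assert_eq!(reservation.amount(), 8192);
// ```
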
/// The allocator tracks extents of the device with a reference count (at the time of writing the
/// count is not expected to exceed one, but the format allows for more).
pub type AllocatorKey = AllocatorKeyV32;

#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct AllocatorKeyV32 {
    pub device_range: Range<u64>,
}

impl SortByU64 for AllocatorKey {
    fn get_leading_u64(&self) -> u64 {
        self.device_range.start
    }
}

// Extent keys are fuzzy-hashed in buckets of this size; see the FuzzyHash impl below.
const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;
pub struct AllocatorKeyPartitionIterator {
    device_range: Range<u64>,
}

impl Iterator for AllocatorKeyPartitionIterator {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        if self.device_range.start >= self.device_range.end {
            None
        } else {
            let start = self.device_range.start;
            self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
            let end = std::cmp::min(self.device_range.start, self.device_range.end);
            let key = AllocatorKey { device_range: start..end };
            let hash = crate::stable_hash::stable_hash(key);
            Some(hash)
        }
    }
}

impl FuzzyHash for AllocatorKey {
    fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
        AllocatorKeyPartitionIterator {
            device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
                ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
        }
    }

    fn is_range_key(&self) -> bool {
        true
    }
}
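
// Sketch of the bucketing above: a key's range is rounded out to EXTENT_HASH_BUCKET_SIZE
// boundaries and one stable hash is produced per bucket touched, so overlapping range
// queries probe the same buckets.
//
// ```ignore
// let key = AllocatorKey { device_range: 512 * 1024..(2 * 1024 * 1024 + 1) };
// // Rounded out to 0..3MiB => buckets [0..1MiB), [1MiB..2MiB), [2MiB..3MiB).
// assert_eq!(key.fuzzy_hash().count(), 3);
// ```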

impl AllocatorKey {
    /// Returns a new key that is a lower bound suitable for use with merge_into.
    pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
        AllocatorKey { device_range: 0..self.device_range.start }
    }
}

impl LayerKey for AllocatorKey {
    fn merge_type(&self) -> MergeType {
        MergeType::OptimizedMerge
    }
}

impl OrdUpperBound for AllocatorKey {
    fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range.end.cmp(&other.device_range.end)
    }
}

impl OrdLowerBound for AllocatorKey {
    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        // Order by range start, then by range end; a total order over ranges keeps the merger
        // deterministic.
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
    }
}

impl Ord for AllocatorKey {
    fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
    }
}

impl PartialOrd for AllocatorKey {
    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl RangeKey for AllocatorKey {
    fn overlaps(&self, other: &Self) -> bool {
        self.device_range.start < other.device_range.end
            && self.device_range.end > other.device_range.start
    }
}

pub type AllocatorValue = AllocatorValueV32;

impl Value for AllocatorValue {
    const DELETED_MARKER: Self = Self::None;
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorValueV32 {
    // A tombstone: the extent is no longer allocated.
    None,
    // The extent is allocated with `count` references and is owned by `owner_object_id`.
    Abs { count: u64, owner_object_id: u64 },
}

pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;

pub type AllocatorInfo = AllocatorInfoV32;

#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
pub struct AllocatorInfoV32 {
    /// Holds the object ids of the layer files for the LSM tree.
    pub layers: Vec<u64>,
    /// Maps from owner_object_id to bytes allocated.
    pub allocated_bytes: BTreeMap<u64, u64>,
    /// Set of owner_object_id that should be ignored if found in layer files.
    pub marked_for_deletion: HashSet<u64>,
    /// The limit, in bytes, of allocations per owner_object_id.  Owners without an entry here are
    /// unlimited.
    pub limit_bytes: BTreeMap<u64, u64>,
}

const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;

/// Computes the target maximum extent size based on the block size of the allocator.
pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
    // Each block in an extent contributes an 8-byte checksum, which costs up to 9 bytes once
    // varint-encoded, and a serialized extent record must fit within
    // DEFAULT_MAX_SERIALIZED_RECORD_SIZE, leaving some headroom (64 bytes) for the rest of the
    // record.
    block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
}
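
// Worked example (illustrative only): if DEFAULT_MAX_SERIALIZED_RECORD_SIZE were 4096 and the
// block size 4096, this yields 4096 * (4096 - 64) / 9 = 4096 * 448 bytes, i.e. extents are
// capped at 448 blocks so that their per-block checksums fit within a single record.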

#[derive(Default)]
struct AllocatorCounters {
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}

pub struct Allocator {
    filesystem: Weak<FxFilesystem>,
    block_size: u64,
    device_size: u64,
    object_id: u64,
    max_extent_size_bytes: u64,
    tree: LSMTree<AllocatorKey, AllocatorValue>,
    // A layer of in-flight allocations and deallocations that have not yet been committed (or,
    // for deallocations, not yet flushed to the device).  It sits on top of the LSM tree so that
    // the space cannot be handed out again prematurely.
    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    inner: Mutex<Inner>,
    allocation_mutex: futures::lock::Mutex<()>,
    counters: Mutex<AllocatorCounters>,
    maximum_offset: AtomicU64,
}

#[derive(Debug, Default, PartialEq)]
struct ByteTracking {
    // The number of bytes allocated and committed.
    allocated_bytes: Saturating<u64>,

    // The number of bytes allocated by transactions that have not yet committed.
    uncommitted_allocated_bytes: u64,

    // The number of bytes currently held by reservations.
    reserved_bytes: u64,

    // Committed deallocations that cannot be reused until flushed to the device.
    committed_deallocated_bytes: u64,
}

impl ByteTracking {
    // Returns the total number of bytes that are taken either from reservations, allocations or
    // uncommitted allocations.
    fn used_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
    }

    // Returns the amount that is not available to be allocated, which includes actually allocated
    // bytes and deallocated bytes that are yet to be flushed to the device.
    fn unavailable_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.committed_deallocated_bytes)
    }

    // Like `unavailable_bytes`, but assumes a device flush has occurred, which makes committed
    // deallocations available again.
    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
    }
}
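
// Illustrative accounting (hypothetical numbers): with allocated = 100, uncommitted = 20,
// reserved = 30 and committed-but-unflushed deallocations = 10:
//   used_bytes()                   == 100 + 20 + 30 == 150  (space spoken for)
//   unavailable_bytes()            == 100 + 20 + 10 == 130  (not allocatable right now)
//   unavailable_after_sync_bytes() == 100 + 20      == 120  (after a device flush)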

#[derive(Debug)]
struct CommittedDeallocation {
    // The offset at which this deallocation was committed to the journal.
    log_file_offset: u64,
    // The device range being deallocated.
    range: Range<u64>,
    // The owning object id which originally allocated it.
    owner_object_id: u64,
}

struct Inner {
    info: AllocatorInfo,

    // The allocator can only be used once it has been opened (or created).
    opened: bool,

    // When a transaction is dropped we need to release its temporary allocations, but we cannot
    // take the allocation lock there, so the ranges are queued here and cleaned up the next time
    // the lock is taken.
    dropped_temporary_allocations: Vec<Range<u64>>,

    // Per-owner byte accounting (see ByteTracking).
    owner_bytes: BTreeMap<u64, ByteTracking>,

    // Bytes reserved without being attributed to a specific owner.
    unattributed_reserved_bytes: u64,

    // Committed deallocations awaiting a device flush, in journal-offset order.
    committed_deallocated: VecDeque<CommittedDeallocation>,

    // Bytes currently set aside by an in-progress trim batch.
    trim_reserved_bytes: u64,

    // Fires when the in-progress trim batch (if any) is dropped and its bytes become free again.
    trim_listener: Option<EventListener>,

    // The allocation strategy used to pick free extents.
    strategy: strategy::BestFit,

    // Histogram of allocation request sizes in blocks (capped at 64).
    allocation_size_histogram: [u64; 64],
    // Histogram of the request sizes (in blocks) that triggered a rebuild of the strategy.
    rebuild_strategy_trigger_histogram: [u64; 64],

    // Owners whose extents should be filtered out of reads because their volume has been deleted.
    // The extents are physically dropped at the next major compaction.
    marked_for_deletion: HashSet<u64>,

    // Owners deleted in this session that require a device sync before their space can be reused.
    volumes_deleted_pending_sync: HashSet<u64>,
}

impl Inner {
    fn allocated_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.allocated_bytes;
        }
        total
    }

    fn uncommitted_allocated_bytes(&self) -> u64 {
        self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
    }

    fn reserved_bytes(&self) -> u64 {
        self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
            + self.unattributed_reserved_bytes
    }

    fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
        match self.info.limit_bytes.get(&owner_object_id) {
            Some(v) => *v,
            None => u64::MAX,
        }
    }

    fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
        let limit = self.owner_id_limit_bytes(owner_object_id);
        let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
        (Saturating(limit) - used).0
    }

    // Returns the total number of bytes that are not available to be allocated, including
    // committed deallocations that are yet to be flushed to the device.
    fn unavailable_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.unavailable_bytes();
        }
        total
    }

    // Returns the total number of bytes that are taken either from reservations, allocations or
    // uncommitted allocations.
    fn used_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.used_bytes();
        }
        total + Saturating(self.unattributed_reserved_bytes)
    }

    // Like `unavailable_bytes`, but assumes a device flush, which makes committed deallocations
    // available again.
    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.unavailable_after_sync_bytes();
        }
        total
    }

    // Returns the number of bytes which will be available after the in-progress trim completes,
    // or an error if the accounting is inconsistent.
    fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
        device_size
            .checked_sub(
                (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
            )
            .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
    }

    fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
        match owner_object_id {
            Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
            None => self.unattributed_reserved_bytes += amount,
        };
    }

    fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
        match owner_object_id {
            Some(owner) => {
                let owner_entry = self.owner_bytes.entry(owner).or_default();
                assert!(
                    owner_entry.reserved_bytes >= amount,
                    "{} >= {}",
                    owner_entry.reserved_bytes,
                    amount
                );
                owner_entry.reserved_bytes -= amount;
            }
            None => {
                assert!(
                    self.unattributed_reserved_bytes >= amount,
                    "{} >= {}",
                    self.unattributed_reserved_bytes,
                    amount
                );
                self.unattributed_reserved_bytes -= amount
            }
        };
    }
}

/// A container for a set of extents which are known to be free and can be trimmed.  Returned by
/// `take_for_trimming`.
pub struct TrimmableExtents<'a> {
    allocator: &'a Allocator,
    extents: Vec<Range<u64>>,
    // Dropping this signals to the allocator that the trim batch is done and its extents are
    // free to be allocated again.
    _drop_event: DropEvent,
}

impl<'a> TrimmableExtents<'a> {
    pub fn extents(&self) -> &Vec<Range<u64>> {
        &self.extents
    }

    // Also returns an EventListener which is signaled when this object is dropped.
    fn new(allocator: &'a Allocator) -> (Self, EventListener) {
        let drop_event = DropEvent::new();
        let listener = drop_event.listen();
        (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
    }

    fn add_extent(&mut self, extent: Range<u64>) {
        self.extents.push(extent);
    }
}

impl<'a> Drop for TrimmableExtents<'a> {
    fn drop(&mut self) {
        let mut inner = self.allocator.inner.lock();
        for device_range in std::mem::take(&mut self.extents) {
            inner.strategy.free(device_range.clone()).expect("drop trim extent");
            self.allocator
                .temporary_allocations
                .erase(&AllocatorKey { device_range: device_range.clone() });
        }
        inner.trim_reserved_bytes = 0;
    }
}

impl Allocator {
    pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
        let block_size = filesystem.block_size();
        let device_size = round_down(filesystem.device().size(), block_size);
        if device_size != filesystem.device().size() {
            warn!("Device size is not block aligned. Rounding down.");
        }
        let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
        let mut strategy = strategy::BestFit::default();
        strategy.free(0..device_size).expect("new fs");
        Allocator {
            filesystem: Arc::downgrade(&filesystem),
            block_size,
            device_size,
            object_id,
            max_extent_size_bytes,
            tree: LSMTree::new(merge, Box::new(NullCache {})),
            temporary_allocations: SkipListLayer::new(1024),
            inner: Mutex::new(Inner {
                info: AllocatorInfo::default(),
                opened: false,
                dropped_temporary_allocations: Vec::new(),
                owner_bytes: BTreeMap::new(),
                unattributed_reserved_bytes: 0,
                committed_deallocated: VecDeque::new(),
                trim_reserved_bytes: 0,
                trim_listener: None,
                strategy,
                allocation_size_histogram: [0; 64],
                rebuild_strategy_trigger_histogram: [0; 64],
                marked_for_deletion: HashSet::new(),
                volumes_deleted_pending_sync: HashSet::new(),
            }),
            allocation_mutex: futures::lock::Mutex::new(()),
            counters: Mutex::new(AllocatorCounters::default()),
            maximum_offset: AtomicU64::new(0),
        }
    }

    pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
        &self.tree
    }

    /// Returns an iterator over allocations, filtering out tombstones and any records whose
    /// owner has been marked for deletion.  If `committed_marked_for_deletion` is true, the set
    /// persisted in AllocatorInfo is used; otherwise the in-memory set is used, which excludes
    /// owners whose deletion is still pending a device sync.
    pub async fn filter(
        &self,
        iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
        committed_marked_for_deletion: bool,
    ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
        let marked_for_deletion = {
            let inner = self.inner.lock();
            if committed_marked_for_deletion {
                &inner.info.marked_for_deletion
            } else {
                &inner.marked_for_deletion
            }
            .clone()
        };
        let iter =
            filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
        Ok(iter)
    }

    /// Returns the histogram of allocation request sizes (in blocks, capped at 64).
    pub fn allocation_size_histogram(&self) -> [u64; 64] {
        self.inner.lock().allocation_size_histogram
    }

    pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
        // Mark the allocator as opened first, since creating the object uses the allocator.
        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);

        let filesystem = self.filesystem.upgrade().unwrap();
        let root_store = filesystem.root_store();
        ObjectStore::create_object_with_id(
            &root_store,
            transaction,
            self.object_id(),
            HandleOptions::default(),
            None,
        )?;
        root_store.update_last_object_id(self.object_id());
        Ok(())
    }

    /// Opens the allocator by loading AllocatorInfo and the layer files it references.
    pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
        let filesystem = self.filesystem.upgrade().unwrap();
        let root_store = filesystem.root_store();

        self.inner.lock().strategy = strategy::BestFit::default();

        let handle =
            ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
                .await
                .context("Failed to open allocator object")?;

        if handle.get_size() > 0 {
            let serialized_info = handle
                .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
                .await
                .context("Failed to read AllocatorInfo")?;
            let mut cursor = std::io::Cursor::new(serialized_info);
            let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
                .context("Failed to deserialize AllocatorInfo")?;

            let mut handles = Vec::new();
            let mut total_size = 0;
            for object_id in &info.layers {
                let handle = ObjectStore::open_object(
                    &root_store,
                    *object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .context("Failed to open allocator layer file")?;

                let size = handle.get_size();
                total_size += size;
                handles.push(handle);
            }

            {
                let mut inner = self.inner.lock();

                // Check that the allocated-bytes accounting is plausible for the device size.
                let mut device_bytes = self.device_size;
                for (&owner_object_id, &bytes) in &info.allocated_bytes {
                    ensure!(
                        bytes <= device_bytes,
                        anyhow!(FxfsError::Inconsistent).context(format!(
                            "Allocated bytes exceeds device size: {:?}",
                            info.allocated_bytes
                        ))
                    );
                    device_bytes -= bytes;

                    inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
                        Saturating(bytes);
                }

                inner.info = info;
            }

            self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
            self.filesystem.upgrade().unwrap().object_manager().update_reservation(
                self.object_id,
                tree::reservation_amount_from_layer_size(total_size),
            );
        }

        Ok(())
    }

    pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
        {
            let mut inner = self.inner.lock();
            inner.volumes_deleted_pending_sync.clear();
            inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
        }

        // Build the free-extent structure from the LSM tree.  If there is no free space at all,
        // the filesystem is unusable (and most likely inconsistent).
        if !self.rebuild_strategy().await.context("Build free extents")? {
            error!("Device contains no free space.");
            return Err(FxfsError::Inconsistent).context("Device appears to contain no free space");
        }

        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
        Ok(())
    }

    // Walks the entire allocation tree to rebuild the set of free regions maintained by the
    // allocation strategy.  Returns true if the set of free regions changed.
    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
        let mut changed = false;
        let mut layer_set = self.tree.empty_layer_set();
        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
        self.tree.add_all_layers_to_layer_set(&mut layer_set);

        let overflow_markers = self.inner.lock().strategy.overflow_markers();
        self.inner.lock().strategy.reset_overflow_markers();

        let mut to_add = Vec::new();
        let mut merger = layer_set.merger();
        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
        let mut last_offset = 0;
        while last_offset < self.device_size {
            let next_range = match iter.get() {
                None => {
                    assert!(last_offset <= self.device_size);
                    let range = last_offset..self.device_size;
                    last_offset = self.device_size;
                    iter.advance().await?;
                    range
                }
                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                    if device_range.end < last_offset {
                        iter.advance().await?;
                        continue;
                    }
                    if device_range.start <= last_offset {
                        last_offset = device_range.end;
                        iter.advance().await?;
                        continue;
                    }
                    let range = last_offset..device_range.start;
                    last_offset = device_range.end;
                    iter.advance().await?;
                    range
                }
            };
            to_add.push(next_range);
            // Avoid taking and releasing the lock for every free range; batch them up.
            if to_add.len() > 100 {
                let mut inner = self.inner.lock();
                for range in to_add.drain(..) {
                    changed |= inner.strategy.force_free(range)?;
                }
            }
        }
        let mut inner = self.inner.lock();
        for range in to_add {
            changed |= inner.strategy.force_free(range)?;
        }
        if overflow_markers != inner.strategy.overflow_markers() {
            changed = true;
        }
        Ok(changed)
    }

    /// Collects up to `extents_per_batch` free extents of size up to `max_extent_size` bytes,
    /// starting at `offset`.  The extents are set aside (not allocatable) until the returned
    /// `TrimmableExtents` is dropped.
    pub async fn take_for_trimming(
        &self,
        offset: u64,
        max_extent_size: usize,
        extents_per_batch: usize,
    ) -> Result<TrimmableExtents<'_>, Error> {
        let _guard = self.allocation_mutex.lock().await;

        let (mut result, listener) = TrimmableExtents::new(self);
        let mut bytes = 0;

        // Include the temporary allocations layer so that in-flight allocations and pending
        // deallocations are not offered up for trimming.
        let mut layer_set = self.tree.empty_layer_set();
        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
        self.tree.add_all_layers_to_layer_set(&mut layer_set);
        let mut merger = layer_set.merger();
        let mut iter = self
            .filter(
                merger.query(Query::FullRange(&AllocatorKey { device_range: offset..0 })).await?,
                false,
            )
            .await?;
        let mut last_offset = offset;
        'outer: while last_offset < self.device_size {
            let mut range = match iter.get() {
                None => {
                    assert!(last_offset <= self.device_size);
                    let range = last_offset..self.device_size;
                    last_offset = self.device_size;
                    iter.advance().await?;
                    range
                }
                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                    if device_range.end <= last_offset {
                        iter.advance().await?;
                        continue;
                    }
                    if device_range.start <= last_offset {
                        last_offset = device_range.end;
                        iter.advance().await?;
                        continue;
                    }
                    let range = last_offset..device_range.start;
                    last_offset = device_range.end;
                    iter.advance().await?;
                    range
                }
            };
            if range.start < offset {
                continue;
            }

            let mut inner = self.inner.lock();

            while range.start < range.end {
                let prefix =
                    range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
                range = prefix.end..range.end;
                bytes += prefix.length()?;
                // Remove the extent from the free set and shadow it with a temporary allocation
                // so it cannot be handed out while the trim is in progress.
                inner.strategy.remove(prefix.clone());
                self.temporary_allocations.insert(AllocatorItem {
                    key: AllocatorKey { device_range: prefix.clone() },
                    value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
                    sequence: 0,
                })?;
                result.add_extent(prefix);
                if result.extents.len() == extents_per_batch {
                    break 'outer;
                }
            }
            if result.extents.len() == extents_per_batch {
                break 'outer;
            }
        }
        {
            let mut inner = self.inner.lock();

            assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
            inner.trim_listener = Some(listener);
            inner.trim_reserved_bytes = bytes;
            debug_assert!(
                (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
                    <= self.device_size
            );
        }
        Ok(result)
    }
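
    // Illustrative use of `take_for_trimming` (hypothetical `allocator` and `device` handles):
    //
    // ```ignore
    // let batch = allocator.take_for_trimming(0, 1 << 20, 16).await?;
    // for extent in batch.extents() {
    //     device.trim(extent.clone()).await?; // hypothetical device call
    // }
    // drop(batch); // the extents become allocatable again
    // ```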

    /// Returns the objects in the root store that this allocator depends on (its layer files).
    pub fn parent_objects(&self) -> Vec<u64> {
        self.inner.lock().info.layers.clone()
    }

    /// Returns the per-owner byte limits as (owner_object_id, bytes) pairs.
    pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
        self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
    }

    /// Returns (bytes used, optional byte limit) for the given owner.
    pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
        let inner = self.inner.lock();
        (
            inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
            inner.info.limit_bytes.get(&owner_object_id).copied(),
        )
    }

    /// Returns a debug representation of the per-owner byte accounting.
    pub fn owner_bytes_debug(&self) -> String {
        format!("{:?}", self.inner.lock().owner_bytes)
    }

    // Returns true if all remaining free space is tied up in deallocations that have been
    // committed but not yet flushed, in which case a sync of the device will free up space.
    fn needs_sync(&self) -> bool {
        let inner = self.inner.lock();
        inner.unavailable_bytes().0 >= self.device_size
    }

    fn is_system_store(&self, owner_object_id: u64) -> bool {
        let fs = self.filesystem.upgrade().unwrap();
        owner_object_id == fs.object_manager().root_store_object_id()
            || owner_object_id == fs.object_manager().root_parent_store_object_id()
    }

    /// Moves a reservation from `old_owner_object_id` to the unattributed pool, e.g. when the
    /// owning volume is being deleted.
    pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
        if old_owner_object_id.is_none() || amount == 0 {
            return;
        }
        let mut inner = self.inner.lock();
        inner.remove_reservation(old_owner_object_id, amount);
        inner.add_reservation(None, amount);
    }

    /// Creates a lazy inspect node named `name` under `parent` which yields allocator statistics
    /// when queried.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    let counters = this.counters.lock();
                    let root = inspector.root();
                    root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
                    root.record_uint("bytes_total", this.device_size);
                    let (allocated, reserved, used, unavailable) = {
                        let inner = this.inner.lock();
                        (
                            inner.allocated_bytes().0,
                            inner.reserved_bytes(),
                            inner.used_bytes().0,
                            inner.unavailable_bytes().0,
                        )
                    };
                    root.record_uint("bytes_allocated", allocated);
                    root.record_uint("bytes_reserved", reserved);
                    root.record_uint("bytes_used", used);
                    root.record_uint("bytes_unavailable", unavailable);

                    // Record percentages rounded to the nearest integer.
                    if let Some(x) = round_div(100 * allocated, this.device_size) {
                        root.record_uint("bytes_allocated_percent", x);
                    }
                    if let Some(x) = round_div(100 * reserved, this.device_size) {
                        root.record_uint("bytes_reserved_percent", x);
                    }
                    if let Some(x) = round_div(100 * used, this.device_size) {
                        root.record_uint("bytes_used_percent", x);
                    }
                    if let Some(x) = round_div(100 * unavailable, this.device_size) {
                        root.record_uint("bytes_unavailable_percent", x);
                    }

                    root.record_uint("num_flushes", counters.num_flushes);
                    if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
                        root.record_uint(
                            "last_flush_time_ms",
                            last_flush_time
                                .duration_since(std::time::UNIX_EPOCH)
                                .unwrap_or(std::time::Duration::ZERO)
                                .as_millis()
                                .try_into()
                                .unwrap_or(0u64),
                        );
                    }

                    let data = this.allocation_size_histogram();
                    let alloc_sizes = root.create_uint_linear_histogram(
                        "allocation_size_histogram",
                        fuchsia_inspect::LinearHistogramParams {
                            floor: 1,
                            step_size: 1,
                            buckets: 64,
                        },
                    );
                    for (i, count) in data.iter().enumerate() {
                        if i != 0 {
                            alloc_sizes.insert_multiple(i as u64, *count as usize);
                        }
                    }
                    root.record(alloc_sizes);

                    let data = this.inner.lock().rebuild_strategy_trigger_histogram;
                    let triggers = root.create_uint_linear_histogram(
                        "rebuild_strategy_triggers",
                        fuchsia_inspect::LinearHistogramParams {
                            floor: 1,
                            step_size: 1,
                            buckets: 64,
                        },
                    );
                    for (i, count) in data.iter().enumerate() {
                        if i != 0 {
                            triggers.insert_multiple(i as u64, *count as usize);
                        }
                    }
                    root.record(triggers);
                }
                Ok(inspector)
            }
            .boxed()
        });
    }

    /// Returns the maximum device offset reached by any allocation (a high-water mark).
    pub fn maximum_offset(&self) -> u64 {
        self.maximum_offset.load(Ordering::Relaxed)
    }
}

impl Drop for Allocator {
    fn drop(&mut self) {
        let inner = self.inner.lock();
        // Uncommitted and reserved bytes are released via RAII, so they should be zero by now.
        assert_eq!(inner.uncommitted_allocated_bytes(), 0);
        assert_eq!(inner.reserved_bytes(), 0);
    }
}

#[fxfs_trace::trace]
impl Allocator {
    /// Returns the object ID for the allocator.
    pub fn object_id(&self) -> u64 {
        self.object_id
    }

    /// Returns information about the allocator, such as the layer files storing persisted
    /// allocations.
    pub fn info(&self) -> AllocatorInfo {
        self.inner.lock().info.clone()
    }

    /// Tries to allocate enough space for `len` bytes (which must be a multiple of the block
    /// size) and returns the device range allocated.  The range returned may be shorter than
    /// requested (e.g. due to fragmentation), in which case the caller should simply call
    /// allocate again until it has enough space.
    #[trace]
    pub async fn allocate(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        mut len: u64,
    ) -> Result<Range<u64>, Error> {
        assert_eq!(len % self.block_size, 0);
        len = std::cmp::min(len, self.max_extent_size_bytes);
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);

        // If the transaction carries a reservation, draw from it; otherwise take an implicit
        // reservation against the owner's remaining budget and the free space on the device.
        let reservation = if let Some(reservation) = transaction.allocator_reservation {
            // Reservations are either unattributed (only permitted for system stores) or tied to
            // the same owner.
            match reservation.owner_object_id {
                None => assert!(self.is_system_store(owner_object_id)),
                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
            };
            let r = reservation
                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
            len = r.amount();
            Left(r)
        } else {
            let mut inner = self.inner.lock();
            assert!(inner.opened);
            let device_used = inner.used_bytes();
            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let limit =
                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
            len = round_down(std::cmp::min(len, limit), self.block_size);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.reserved_bytes += len;
            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
        };

        ensure!(len > 0, FxfsError::NoSpace);

        let volumes_deleted = {
            let inner = self.inner.lock();
            (!inner.volumes_deleted_pending_sync.is_empty())
                .then(|| inner.volumes_deleted_pending_sync.clone())
        };

        if let Some(volumes_deleted) = volumes_deleted {
            // If any volumes have been deleted since the last sync, flush the device before
            // allowing their space to be reused.
            self.filesystem
                .upgrade()
                .unwrap()
                .sync(SyncOptions {
                    flush_device: true,
                    precondition: Some(Box::new(|| {
                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
                    })),
                    ..Default::default()
                })
                .await?;

            {
                let mut inner = self.inner.lock();
                for owner_id in volumes_deleted {
                    inner.volumes_deleted_pending_sync.remove(&owner_id);
                    inner.marked_for_deletion.insert(owner_id);
                }
            }

            let _guard = self.allocation_mutex.lock().await;
            self.rebuild_strategy().await?;
        }

        // If all free space is tied up in committed-but-unflushed deallocations, syncing the
        // filesystem will make it available; retry a few times before giving up.
        #[allow(clippy::never_loop)]
        let _guard = 'sync: loop {
            for _ in 0..10 {
                {
                    let guard = self.allocation_mutex.lock().await;

                    if !self.needs_sync() {
                        break 'sync guard;
                    }
                }

                self.filesystem
                    .upgrade()
                    .unwrap()
                    .sync(SyncOptions {
                        flush_device: true,
                        precondition: Some(Box::new(|| self.needs_sync())),
                        ..Default::default()
                    })
                    .await?;
            }
            bail!(
                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
            );
        };

        let mut trim_listener = None;
        {
            let mut inner = self.inner.lock();
            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;

            // If an in-progress trim has set aside the space we need, wait for it to finish.
            let avail = self
                .device_size
                .checked_sub(inner.unavailable_bytes().0)
                .ok_or(FxfsError::Inconsistent)?;
            let free_and_not_being_trimmed =
                inner.bytes_available_not_being_trimmed(self.device_size)?;
            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
                debug_assert!(inner.trim_reserved_bytes > 0);
                trim_listener = std::mem::take(&mut inner.trim_listener);
            }
        }

        if let Some(listener) = trim_listener {
            listener.await;
        }

        let result = loop {
            {
                let mut inner = self.inner.lock();

                // Clean up temporary allocations dropped by earlier transactions now that we
                // hold the allocation lock.
                for device_range in inner.dropped_temporary_allocations.drain(..) {
                    self.temporary_allocations.erase(&AllocatorKey { device_range });
                }

                match inner.strategy.allocate(len) {
                    Err(FxfsError::NotFound) => {
                        // The strategy has no extent of this size on hand; fall through and
                        // rebuild its view of free space from the tree.
                        inner.rebuild_strategy_trigger_histogram
                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
                    }
                    Err(err) => {
                        error!(err:%; "Likely filesystem corruption.");
                        return Err(err.into());
                    }
                    Ok(x) => {
                        break x;
                    }
                }
            }
            // If the rebuild didn't turn up any new free space, we genuinely have none left.
            if !self.rebuild_strategy().await? {
                error!("Cannot find additional free space. Corruption?");
                return Err(FxfsError::Inconsistent.into());
            }
        };

        debug!(device_range:? = result; "allocate");

        let len = result.length().unwrap();
        // The allocated bytes are tracked as uncommitted allocations from here on, so detach
        // them from the reservation they were drawn from.
        let reservation_owner = reservation.either(
            |l| {
                l.forget_some(len);
                l.owner_object_id()
            },
            |r| {
                r.forget_some(len);
                r.owner_object_id()
            },
        );

        {
            let mut inner = self.inner.lock();
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.uncommitted_allocated_bytes += len;
            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
            inner.remove_reservation(reservation_owner, len);
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: result.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })?;
        }

        let mutation =
            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());

        Ok(result)
    }

    /// Marks the given device range as allocated to `owner_object_id`.  The main use case for
    /// this is objects which must live at a fixed device offset (e.g. the super-block).
    #[trace]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
        {
            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;

            let mut inner = self.inner.lock();
            let device_used = inner.used_bytes();
            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            ensure!(
                device_range.end <= self.device_size
                    && (Saturating(self.device_size) - device_used).0 >= len
                    && owner_id_bytes_left >= len,
                FxfsError::NoSpace
            );
            if let Some(reservation) = &mut transaction.allocator_reservation {
                // The transaction takes ownership of this hold.
                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
            }
            owner_entry.uncommitted_allocated_bytes += len;
            inner.strategy.remove(device_range.clone());
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: device_range.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })?;
        }
        let mutation =
            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));
        Ok(())
    }

    /// Sets the per-owner byte limit for `owner_object_id`.
    pub fn set_bytes_limit(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        bytes: u64,
    ) -> Result<(), Error> {
        // System stores cannot be given limits.
        assert!(!self.is_system_store(owner_object_id));
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
        );
        Ok(())
    }

    /// Gets the per-owner byte limit for `owner_object_id`, if one is set.
    pub fn get_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
    }

    /// Deallocates the given device range for the specified owner and returns the number of
    /// bytes deallocated.  The space does not become reusable until the deallocation has been
    /// flushed to the device.
    #[trace]
    pub async fn deallocate(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        dealloc_range: Range<u64>,
    ) -> Result<u64, Error> {
        debug!(device_range:? = dealloc_range; "deallocate");
        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
        let deallocated = dealloc_range.end - dealloc_range.start;
        let mutation = AllocatorMutation::Deallocate {
            device_range: dealloc_range.clone().into(),
            owner_object_id,
        };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));

        let _guard = self.allocation_mutex.lock().await;

        // While we hold the allocation lock, clean up any dropped temporary allocations ...
        let mut inner = self.inner.lock();
        for device_range in inner.dropped_temporary_allocations.drain(..) {
            self.temporary_allocations.erase(&AllocatorKey { device_range });
        }

        // ... and record a temporary allocation covering the deallocated range so it cannot be
        // reused until after the next device flush.
        self.temporary_allocations
            .insert(AllocatorItem {
                key: AllocatorKey { device_range: dealloc_range.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })
            .context("tracking deallocated")?;

        Ok(deallocated)
    }

    /// Marks all allocations belonging to `owner_object_id` for deletion, e.g. when its volume
    /// is deleted.  Rather than touching each record individually, the records are filtered out
    /// of reads and physically dropped at the next major compaction; the device must be flushed
    /// before the space can be reused.
    pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
        );
    }

    /// Called when the device has been flushed; deallocations committed to the journal up to
    /// `flush_log_offset` can now actually be freed for reuse.
    pub fn did_flush_device(&self, flush_log_offset: u64) {
        // Find the point in the queue of committed deallocations where a deallocation has not
        // yet been flushed; everything before it can be freed.
        #[allow(clippy::never_loop)]
        let deallocs = 'deallocs_outer: loop {
            let mut inner = self.inner.lock();
            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
                if dealloc.log_file_offset >= flush_log_offset {
                    let mut deallocs = inner.committed_deallocated.split_off(index);
                    // After the swap, `deallocs` contains the flushed deallocations.
                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
                    break 'deallocs_outer deallocs;
                }
            }
            break std::mem::take(&mut inner.committed_deallocated);
        };

        // Now the flushed deallocations can be returned to the free set.
        let mut inner = self.inner.lock();
        let mut totals = BTreeMap::<u64, u64>::new();
        for dealloc in deallocs {
            *(totals.entry(dealloc.owner_object_id).or_default()) +=
                dealloc.range.length().unwrap();
            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
            self.temporary_allocations.erase(&AllocatorKey { device_range: dealloc.range.clone() });
        }

        // Update the per-owner counts of committed-but-unflushed deallocations.
        for (owner_object_id, total) in totals {
            match inner.owner_bytes.get_mut(&owner_object_id) {
                Some(counters) => counters.committed_deallocated_bytes -= total,
                None => panic!("Failed to decrement for unknown owner: {}", owner_object_id),
            }
        }
    }

    /// Returns a reservation that can be used later, or None if there is insufficient space.
    /// The |owner_object_id| associates the reservation with an object under the root object
    /// store whose limits, if any, are honoured.
    pub fn reserve(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: u64,
    ) -> Option<Reservation> {
        {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let limit = match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            };
            if limit < amount {
                return None;
            }
            inner.add_reservation(owner_object_id, amount);
        }
        Some(Reservation::new(self, owner_object_id, amount))
    }

    /// Like reserve, but takes a callback which is passed the maximum amount available and must
    /// return the amount to reserve; this cannot fail, but the amount reserved can be zero.
    pub fn reserve_with(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: impl FnOnce(u64) -> u64,
    ) -> Reservation {
        let amount = {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let amount = amount(match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            });

            inner.add_reservation(owner_object_id, amount);

            amount
        };

        Reservation::new(self, owner_object_id, amount)
    }

    /// Returns the total number of allocated bytes.
    pub fn get_allocated_bytes(&self) -> u64 {
        self.inner.lock().allocated_bytes().0
    }

    /// Returns the size of the device that this allocator manages.
    pub fn get_disk_bytes(&self) -> u64 {
        self.device_size
    }

    /// Returns the total number of allocated bytes per owner.
    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
    }

    /// Returns the number of allocated and reserved bytes.
    pub fn get_used_bytes(&self) -> Saturating<u64> {
        let inner = self.inner.lock();
        inner.used_bytes()
    }
}

impl ReservationOwner for Allocator {
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
        self.inner.lock().remove_reservation(owner_object_id, amount);
    }
}

#[async_trait]
impl JournalingObject for Allocator {
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        _assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                let mut inner = self.inner.lock();
                inner.owner_bytes.remove(&owner_object_id);

                // The owner's records are filtered out of reads lazily and dropped at the next
                // major compaction; a device sync is required before the space can be reused,
                // which is why the owner is also queued on `volumes_deleted_pending_sync`.
                inner.info.marked_for_deletion.insert(owner_object_id);
                inner.volumes_deleted_pending_sync.insert(owner_object_id);

                inner.info.limit_bytes.remove(&owner_object_id);
            }
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: device_range.clone().into() },
                    value: AllocatorValue::Abs { count: 1, owner_object_id },
                    sequence: context.checkpoint.file_offset,
                };
                let len = item.key.device_range.length().unwrap();
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
                let mut inner = self.inner.lock();
                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                entry.allocated_bytes += len;
                if let ApplyMode::Live(transaction) = context.mode {
                    entry.uncommitted_allocated_bytes -= len;
                    // We can't remove the entry from `temporary_allocations` here because we
                    // don't hold the allocation lock, so queue its removal for the next time the
                    // lock is held.
                    inner.dropped_temporary_allocations.push(device_range.into());
                    if let Some(reservation) = transaction.allocator_reservation {
                        reservation.commit(len);
                    }
                }
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: device_range.into() },
                    value: AllocatorValue::None,
                    sequence: context.checkpoint.file_offset,
                };
                let len = item.key.device_range.length().unwrap();

                {
                    let mut inner = self.inner.lock();
                    {
                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                        entry.allocated_bytes -= len;
                        if context.mode.is_live() {
                            entry.committed_deallocated_bytes += len;
                        }
                    }
                    if context.mode.is_live() {
                        inner.committed_deallocated.push_back(CommittedDeallocation {
                            log_file_offset: context.checkpoint.file_offset,
                            range: item.key.device_range.clone(),
                            owner_object_id,
                        });
                    }
                    if let ApplyMode::Live(Transaction {
                        allocator_reservation: Some(reservation),
                        ..
                    }) = context.mode
                    {
                        inner.add_reservation(reservation.owner_object_id(), len);
                        reservation.add(len);
                    }
                }
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
                // The limit is persisted in AllocatorInfo when the allocator next flushes.
                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
            }
            Mutation::BeginFlush => {
                self.tree.seal();
                // Transfer our running count for allocated_bytes so that it gets written to the
                // new info file when the flush completes.
                let mut inner = self.inner.lock();
                let allocated_bytes =
                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
                inner.info.allocated_bytes = allocated_bytes;
            }
            Mutation::EndFlush => {}
            _ => bail!("unexpected mutation: {:?}", mutation),
        }
        Ok(())
    }

    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
        match mutation {
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                let len = device_range.length().unwrap();
                let mut inner = self.inner.lock();
                inner
                    .owner_bytes
                    .entry(owner_object_id)
                    .or_default()
                    .uncommitted_allocated_bytes -= len;
                if let Some(reservation) = transaction.allocator_reservation {
                    let res_owner = reservation.owner_object_id();
                    inner.add_reservation(res_owner, len);
                    reservation.release_reservation(res_owner, len);
                }
                inner.strategy.free(device_range.clone().into()).expect("drop mutation");
                self.temporary_allocations
                    .erase(&AllocatorKey { device_range: device_range.into() });
            }
            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
                self.temporary_allocations
                    .erase(&AllocatorKey { device_range: device_range.into() });
            }
            _ => {}
        }
    }

    async fn flush(&self) -> Result<Version, Error> {
        let filesystem = self.filesystem.upgrade().unwrap();
        let object_manager = filesystem.object_manager();
        let earliest_version = self.tree.get_earliest_version();
        if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
            // Early exit, but still return the earliest version used by a struct in the tree.
            return Ok(earliest_version);
        }

        let fs = self.filesystem.upgrade().unwrap();
        let mut flusher = Flusher::new(self, &fs).await;
        let (new_layer_file, info) = flusher.start().await?;
        flusher.finish(new_layer_file, info).await
    }
}

/// Wraps a LayerIterator to coalesce adjacent extents with identical values.  The merger cannot
/// always merge extents that only become adjacent after earlier merges, so this performs a final
/// pass, holding on to the previous item and extending it while its neighbour abuts it.
pub struct CoalescingIterator<I> {
    iter: I,
    item: Option<AllocatorItem>,
}

impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
    pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
        let mut iter = Self { iter, item: None };
        iter.advance().await?;
        Ok(iter)
    }
}

#[async_trait]
impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
    for CoalescingIterator<I>
{
    async fn advance(&mut self) -> Result<(), Error> {
        self.item = self.iter.get().map(|x| x.cloned());
        if self.item.is_none() {
            return Ok(());
        }
        let left = self.item.as_mut().unwrap();
        loop {
            self.iter.advance().await?;
            match self.iter.get() {
                None => return Ok(()),
                Some(right) => {
                    // Overlapping extents should not be possible in a well-formed tree.
                    ensure!(
                        left.key.device_range.end <= right.key.device_range.start,
                        FxfsError::Inconsistent
                    );
                    // Stop if the extents are not adjacent or their values differ.
                    if left.key.device_range.end < right.key.device_range.start
                        || left.value != *right.value
                    {
                        return Ok(());
                    }
                    left.key.device_range.end = right.key.device_range.end;
                }
            }
        }
    }

    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
        self.item.as_ref().map(|x| x.as_item_ref())
    }
}
struct Flusher<'a> {
    allocator: &'a Allocator,
    fs: &'a Arc<FxFilesystem>,
    _guard: WriteGuard<'a>,
}

impl<'a> Flusher<'a> {
    async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
        let keys = lock_keys![LockKey::flush(allocator.object_id())];
        Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
    }

    fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
        Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(allocator_reservation),
            ..Default::default()
        }
    }

    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
        let object_manager = self.fs.object_manager();
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
            .await?;

        let root_store = self.fs.root_store();
        let layer_object_handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions { skip_journal_checks: true, ..Default::default() },
            None,
        )
        .await?;
        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
        // BeginFlush is the snapshot point for mutations to the tree, so this transaction must
        // not include any allocator mutations of its own.
        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
        let info = transaction
            .commit_with_callback(|_| {
                // Snapshot AllocatorInfo as at the point BeginFlush was committed.
                self.allocator.inner.lock().info.clone()
            })
            .await?;
        Ok((layer_object_handle, info))
    }

    async fn finish(
        self,
        layer_object_handle: DataObjectHandle<ObjectStore>,
        mut info: AllocatorInfo,
    ) -> Result<Version, Error> {
        let object_manager = self.fs.object_manager();
        let txn_options = Self::txn_options(object_manager.metadata_reservation());

        let layer_set = self.allocator.tree.immutable_layer_set();
        let total_len = layer_set.sum_len();
        {
            let mut merger = layer_set.merger();
            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
            let iter = CoalescingIterator::new(iter).await?;
            self.allocator
                .tree
                .compact_with_iterator(
                    iter,
                    total_len,
                    DirectWriter::new(&layer_object_handle, txn_options).await,
                    layer_object_handle.block_size(),
                )
                .await?;
        }

        let root_store = self.fs.root_store();

        let object_handle;
        let reservation_update;
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    self.allocator.object_id()
                )],
                txn_options,
            )
            .await?;
        let mut serialized_info = Vec::new();

        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
        object_handle = ObjectStore::open_object(
            &root_store,
            self.allocator.object_id(),
            HandleOptions::default(),
            None,
        )
        .await?;

        // Move the old layer files to the graveyard; they are tombstoned once the flush commits.
        for object_id in &info.layers {
            root_store.add_to_graveyard(&mut transaction, *object_id);
        }

        // This was a full (major) compaction, so the new layer has no marked-for-deletion
        // records left and the set can be cleared in the newly written info.
        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);

        info.layers = vec![layer_object_handle.object_id()];

        info.serialize_with_version(&mut serialized_info)?;

        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;

        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
            layer_object_handle.get_size(),
        ));

        // It's important that EndFlush is in the same transaction that writes AllocatorInfo,
        // because EndFlush is used to make the required adjustments to allocated_bytes.
        transaction.add_with_object(
            self.allocator.object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );
        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());

        let layers = layers_from_handles([layer_object_handle]).await?;
        transaction
            .commit_with_callback(|_| {
                self.allocator.tree.set_layers(layers);

                // The new layer is now committed, so switch over to it and clear the owners that
                // the compaction removed.
                let mut inner = self.allocator.inner.lock();
                inner.info.layers = info.layers;
                for owner_id in marked_for_deletion {
                    inner.marked_for_deletion.remove(&owner_id);
                    inner.info.marked_for_deletion.remove(&owner_id);
                }
            })
            .await?;

        // Now close and tombstone the old layers.
        for layer in layer_set.layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                root_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        let mut counters = self.allocator.counters.lock();
        counters.num_flushes += 1;
        counters.last_flush_time = Some(std::time::SystemTime::now());
        Ok(self.allocator.tree.get_earliest_version())
    }
}
2130#[cfg(test)]
2131mod tests {
2132 use crate::filesystem::{
2133 FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2134 };
2135 use crate::fsck::fsck;
2136 use crate::lsm_tree::cache::NullCache;
2137 use crate::lsm_tree::skip_list_layer::SkipListLayer;
2138 use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
2139 use crate::lsm_tree::{LSMTree, Query};
2140 use crate::object_handle::ObjectHandle;
2141 use crate::object_store::allocator::merge::merge;
2142 use crate::object_store::allocator::{
2143 Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2144 };
2145 use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
2146 use crate::object_store::volume::root_volume;
2147 use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
2148 use crate::range::RangeExt;
2149 use crate::round::round_up;
2150 use fuchsia_async as fasync;
2151 use fuchsia_sync::Mutex;
2152 use std::cmp::{max, min};
2153 use std::ops::{Bound, Range};
2154 use std::sync::Arc;
2155 use storage_device::DeviceHolder;
2156 use storage_device::fake_device::FakeDevice;
2157
2158 #[test]
2159 fn test_allocator_key_is_range_based() {
2160 assert!(AllocatorKey { device_range: 0..100 }.is_range_key());
2162 }
2163
2164 #[fuchsia::test]
2165 async fn test_coalescing_iterator() {
2166 let skip_list = SkipListLayer::new(100);
2167 let items = [
2168 Item::new(
2169 AllocatorKey { device_range: 0..100 },
2170 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2171 ),
2172 Item::new(
2173 AllocatorKey { device_range: 100..200 },
2174 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2175 ),
2176 ];
2177 skip_list.insert(items[1].clone()).expect("insert error");
2178 skip_list.insert(items[0].clone()).expect("insert error");
2179 let mut iter =
2180 CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
2181 .await
2182 .expect("new failed");
2183 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2184 assert_eq!(
2185 (key, value),
2186 (
2187 &AllocatorKey { device_range: 0..200 },
2188 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2189 )
2190 );
2191 iter.advance().await.expect("advance failed");
2192 assert!(iter.get().is_none());
2193 }
2194
2195 #[fuchsia::test]
2196 async fn test_merge_and_coalesce_across_three_layers() {
2197 let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2198 lsm_tree
2199 .insert(Item::new(
2200 AllocatorKey { device_range: 100..200 },
2201 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2202 ))
2203 .expect("insert error");
2204 lsm_tree.seal();
2205 lsm_tree
2206 .insert(Item::new(
2207 AllocatorKey { device_range: 0..100 },
2208 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2209 ))
2210 .expect("insert error");
2211
2212 let layer_set = lsm_tree.layer_set();
2213 let mut merger = layer_set.merger();
2214 let mut iter =
2215 CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2216 .await
2217 .expect("new failed");
2218 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2219 assert_eq!(
2220 (key, value),
2221 (
2222 &AllocatorKey { device_range: 0..200 },
2223 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2224 )
2225 );
2226 iter.advance().await.expect("advance failed");
2227 assert!(iter.get().is_none());
2228 }
2229
    #[fuchsia::test]
    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 100..200 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 0..100 },
                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 0..100 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            )
        );
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 100..200 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

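    // Returns the number of bytes by which two device ranges overlap (zero if disjoint).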
    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
        if a.end > b.start && a.start < b.end {
            min(a.end, b.end) - max(a.start, b.start)
        } else {
            0
        }
    }

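    // Walks the allocator's tree and returns all allocated extents, asserting along the
    // way that the iterator yields them in order and without overlap.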
    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut allocations: Vec<Range<u64>> = Vec::new();
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            if let Some(r) = allocations.last() {
                assert!(device_range.start >= r.end);
            }
            allocations.push(device_range.clone());
            iter.advance().await.expect("advance failed");
        }
        allocations
    }

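    // Asserts that the allocated extents in the tree exactly cover
    // `expected_allocations`: every allocated byte must fall inside an expected range,
    // and the totals must match.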
    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut found = 0;
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            let mut l = device_range.length().expect("Invalid range");
            found += l;
            for range in expected_allocations {
                l -= overlap(range, device_range);
                if l == 0 {
                    break;
                }
            }
            assert_eq!(l, 0, "range {device_range:?} not covered by expectations");
            iter.advance().await.expect("advance failed");
        }
        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
    }

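    // Creates an empty filesystem on a fake device (4096 blocks of 4096 bytes) and
    // returns it along with its allocator.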
    async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
        let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let allocator = fs.allocator();
        (fs, allocator)
    }

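    // Basic allocation: fresh allocations must be block-sized, mutually disjoint, and
    // disjoint from the extents the newly-created filesystem already owns.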
    #[fuchsia::test]
    async fn test_allocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;

        let expected = vec![
            0..4096,
            4096..139264,
            139264..204800,
            204800..335872,
            335872..401408,
            524288..528384,
        ];
        assert_eq!(device_ranges, expected);
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        // The two allocations just made must not overlap each other.
        assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
        transaction.commit().await.expect("commit failed");
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        // The allocation made in the new transaction must be block-sized and disjoint
        // from the two allocations made in the committed transaction.
        assert_eq!(device_ranges[8].length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[6], &device_ranges[8]), 0);
        assert_eq!(overlap(&device_ranges[7], &device_ranges[8]), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

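    // Requests larger than max_extent_size_bytes are clamped: a single allocate call
    // returns at most one extent of the maximum size.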
    #[fuchsia::test]
    async fn test_allocate_more_than_max_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(
            device_ranges.last().unwrap().length().expect("Invalid range"),
            allocator.max_extent_size_bytes
        );
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

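    // Allocating and then deallocating an extent should return the allocator to its
    // initial state.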
    #[fuchsia::test]
    async fn test_deallocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let initial_allocations = collect_allocations(&allocator).await;

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let device_range1 = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");
        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
        transaction.commit().await.expect("commit failed");

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
            .await
            .expect("deallocate failed");
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &initial_allocations).await;
    }

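    // mark_allocated records a specific device range as allocated without going through
    // allocate; subsequent allocations must steer around it.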
    #[fuchsia::test]
    async fn test_mark_allocated() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut device_ranges = collect_allocations(&allocator).await;
        let range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            // This transaction is dropped without committing, so `range` ends up free
            // again.
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
                .await
                .expect("allocate failed")
        };

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );

        assert_eq!(device_ranges.last().unwrap().start, range.start);

        let mut range2 = range.clone();
        range2.start += fs.block_size();
        allocator
            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
            .expect("mark_allocated failed");
        device_ranges.push(range2);

        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        let last_range = device_ranges.last().unwrap();
        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(last_range, &range), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

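    // mark_for_deletion drops all of an owner's allocations in one mutation: the bytes
    // are reclaimed immediately for accounting purposes, while the records themselves
    // are filtered lazily and only removed on the next flush.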
    #[fuchsia::test]
    async fn test_mark_for_deletion() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();
        let mut device_ranges = collect_allocations(&allocator).await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for _ in 0..15 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate2 failed"),
            );
        }
        transaction.commit().await.expect("commit failed");
        check_allocations(&allocator, &device_ranges).await;

        assert_eq!(
            allocator.get_allocated_bytes(),
            initial_allocated_bytes + fs.block_size() * 3000
        );

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
        transaction.commit().await.expect("commit failed");

        // The owner's bytes are reclaimed immediately, but the extent records remain
        // until the next flush.
        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
        check_allocations(&allocator, &device_ranges).await;

        device_ranges.clear();

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_bytes = 1500 * fs.block_size();
        while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
            let len = std::cmp::min(
                target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
                100 * fs.block_size(),
            );
            device_ranges.push(
                allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        allocator.flush().await.expect("flush failed");

        // After the flush, owner 99's records are gone and only owner 100's remain.
        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
        assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes);
    }

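    // Creates a file of `size` bytes in `store` and writes to it so that the store
    // holds real allocated extents.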
    async fn create_file(store: &Arc<ObjectStore>, size: usize) {
        let root_directory =
            Directory::open(store, store.root_directory_object_id()).await.expect("open failed");

        let mut transaction = store
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let file = root_directory
            .create_child_file(&mut transaction, &format!("foo {}", size))
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        let buffer = file.allocate_buffer(size).await;

        let mut transaction = file
            .new_transaction_with_options(Options {
                borrow_metadata_space: true,
                ..Default::default()
            })
            .await
            .expect("new_transaction_with_options failed");
        file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
        transaction.commit().await.expect("commit failed");
    }

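    // Delete a volume, recreate it, and compact across journal replays; fsck must still
    // pass at the end.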
    #[fuchsia::test]
    async fn test_replay_with_deleted_store_and_compaction() {
        let (fs, _) = test_fs().await;

        const FILE_SIZE: usize = 10_000_000;

        let mut store_id = {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;
            store.store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");

        fs.journal().compact().await.expect("compact failed");

        for _ in 0..2 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store_id)
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");
                create_file(&store, FILE_SIZE).await;
                store_id = store.store_object_id();
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

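    // Attempts to provoke a race between journal compaction and volume deletion by
    // starting compaction on another task and issuing the deletion after a random
    // delay; fsck verifies the on-disk result either way.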
    #[fuchsia::test(threads = 4)]
    async fn test_compaction_delete_race() {
        let (fs, _allocator) = test_fs().await;

        {
            const FILE_SIZE: usize = 10_000_000;

            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;

            let fs_clone = fs.clone();

            // Keep one executor thread busy yielding so that task scheduling gets
            // shuffled around before compaction starts.
            let yield_task = fasync::Task::spawn(async {
                loop {
                    fuchsia_async::yield_now().await;
                }
            });

            for _ in 0..100 {
                fuchsia_async::yield_now().await;
            }

            let task = fasync::Task::spawn(async move {
                fs_clone.journal().compact().await.expect("compact failed");
            });

            let _ = yield_task.abort();

            // Sleep for a random interval so the deletion lands at a different point
            // within the compaction on each run.
            let sleep = rand::random_range(3000..6000);
            std::thread::sleep(std::time::Duration::from_micros(sleep));
            log::info!("sleep {sleep}us");

            let transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(
                            root_vol.volume_directory().store().store_object_id(),
                            root_vol.volume_directory().object_id(),
                        ),
                        LockKey::flush(store.store_object_id())
                    ],
                    Options { borrow_metadata_space: true, ..Default::default() },
                )
                .await
                .expect("new_transaction failed");
            root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");

            task.await;
        }

        fs.journal().compact().await.expect("compact failed");
        fs.close().await.expect("close failed");

        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

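    // Repeatedly create, populate, and delete volumes (flushing the allocator each
    // time) across filesystem reopens, then fsck.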
    #[fuchsia::test]
    async fn test_delete_multiple_volumes() {
        let (mut fs, _) = test_fs().await;

        for _ in 0..50 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");

                create_file(&store, 1_000_000).await;

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store.store_object_id())
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                fs.allocator().flush().await.expect("flush failed");
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

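    // Per-owner accounting should track a full cycle: allocate, free everything, then
    // reallocate a smaller amount.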
    #[fuchsia::test]
    async fn test_allocate_free_reallocate() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let mut device_ranges = Vec::new();
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for _ in 0..30 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 3000,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for range in std::mem::replace(&mut device_ranges, Vec::new()) {
            allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_len = 1500 * fs.block_size();
        while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
            let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, len)
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 1500,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );
    }

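    // Allocations made before a flush must survive remount, and fresh allocations on
    // the reopened filesystem must not collide with them.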
    #[fuchsia::test]
    async fn test_flush() {
        const STORE_OBJECT_ID: u64 = 99;

        let mut device_ranges = Vec::new();
        let device = {
            let (fs, allocator) = test_fs().await;
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

        let allocated = collect_allocations(&allocator).await;

        // Every range allocated before the remount must still be fully allocated.
        for i in &device_ranges {
            let mut overlapping = 0;
            for j in &allocated {
                overlapping += overlap(i, j);
            }
            assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
        }

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let range = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");

        // A new allocation must not overlap anything that was already allocated.
        for r in &allocated {
            assert_eq!(overlap(r, &range), 0);
        }
        transaction.commit().await.expect("commit failed");
    }

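    // Dropping a transaction without committing must return its tentative allocation
    // to the free pool.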
    #[fuchsia::test]
    async fn test_dropped_transaction() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed")
        };
        // The transaction above was dropped, so the same range should be handed out
        // again here.
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        assert_eq!(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
            allocated_range
        );
    }

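    // Once an owner is marked for deletion and the allocator is flushed, no trace of it
    // should remain in per-owner accounting, even after a remount.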
    #[fuchsia::test]
    async fn test_cleanup_removed_owner() {
        const STORE_OBJECT_ID: u64 = 99;
        let device = {
            let (fs, allocator) = test_fs().await;

            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("Allocating");
                transaction.commit().await.expect("Committing.");
            }
            allocator.flush().await.expect("Flushing");
            assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
                transaction.commit().await.expect("Committing.");
            }
            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            fs.close().await.expect("Closing");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();
        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
    }

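    // get_allocated_bytes should reflect only committed state: uncommitted allocations
    // and deallocations must not move the counter until their transaction commits.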
    #[fuchsia::test]
    async fn test_allocated_bytes() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();

        let allocated_bytes = initial_allocated_bytes + fs.block_size();
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");
            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
            range
        };

        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");

            // This transaction is about to be dropped, so the allocation should never
            // be reflected in the count.
            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
        }

        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
            .await
            .expect("deallocate failed");

        // The deallocation isn't counted until the transaction commits.
        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

        transaction.commit().await.expect("commit failed");

        // 40 bytes of the original block (20 at each end) remain allocated.
        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
    }

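    // set_bytes_limit only takes effect on commit: the limit must be absent before the
    // transaction commits and recorded in the allocator's info afterwards.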
    #[fuchsia::test]
    async fn test_persist_bytes_limit() {
        const LIMIT: u64 = 12345;
        const OWNER_ID: u64 = 12;

        let (fs, allocator) = test_fs().await;
        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
                .expect("Failed to set limit.");
            assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
            transaction.commit().await.expect("Failed to commit transaction");
            let bytes: u64 = *allocator
                .inner
                .lock()
                .info
                .limit_bytes
                .get(&OWNER_ID)
                .expect("Failed to find limit");
            assert_eq!(LIMIT, bytes);
        }
    }

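    // Merges adjacent ranges in a sorted list so that comparisons below don't depend on
    // how the free space happened to be split into extents.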
    fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
        let mut coalesced = Vec::new();
        let mut prev: Option<Range<u64>> = None;
        for range in ranges {
            if let Some(prev_range) = &mut prev {
                if range.start == prev_range.end {
                    prev_range.end = range.end;
                } else {
                    coalesced.push(prev_range.clone());
                    prev = Some(range);
                }
            } else {
                prev = Some(range);
            }
        }
        if let Some(prev_range) = prev {
            coalesced.push(prev_range);
        }
        coalesced
    }

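    // take_for_trimming should hand back exactly the freed ranges, batched according to
    // the max-extent-size and extents-per-batch limits.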
    #[fuchsia::test]
    async fn test_take_for_trimming() {
        const STORE_OBJECT_ID: u64 = 99;

        let allocated_range;
        let expected_free_ranges;
        let device = {
            let (fs, allocator) = test_fs().await;
            let bs = fs.block_size();
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocated_range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");

            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            let base = allocated_range.start;
            expected_free_ranges = vec![
                base..(base + (bs * 1)),
                (base + (bs * 2))..(base + (bs * 3)),
                // These three adjacent ranges coalesce in the tree but exceed the
                // max extent size used below, so trimming hands them back split up.
                (base + (bs * 4))..(base + (bs * 8)),
                (base + (bs * 8))..(base + (bs * 12)),
                (base + (bs * 12))..(base + (bs * 13)),
                (base + (bs * 29))..(base + (bs * 30)),
            ];
            for range in &expected_free_ranges {
                allocator
                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
                    .await
                    .expect("deallocate failed");
            }
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

        // Trim in small batches: extents capped at four blocks, two extents per batch.
        let max_extent_size = fs.block_size() as usize * 4;
        const EXTENTS_PER_BATCH: usize = 2;
        let mut free_ranges = vec![];
        let mut offset = allocated_range.start;
        while offset < allocated_range.end {
            let free = allocator
                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
                .await
                .expect("take_for_trimming failed");
            free_ranges.extend(
                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
            );
            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
        }
        // The batching limits may split extents differently from how they were freed,
        // so compare the coalesced forms.
        let coalesced_free_ranges = coalesce_ranges(free_ranges);
        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);

        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
    }

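    // While extents are checked out for trimming, an allocation that needs them must
    // block until the trim batch is dropped.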
    #[fuchsia::test]
    async fn test_allocations_wait_for_free_extents() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocator_clone = allocator.clone();

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

        let max_extent_size = fs.device().size() as usize;
        const EXTENTS_PER_BATCH: usize = usize::MAX;

        // Hold all free extents for trimming so the allocation below has to wait.
        let trim_done = Arc::new(Mutex::new(false));
        let trimmable_extents = allocator
            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
            .await
            .expect("take_for_trimming failed");

        let trim_done_clone = trim_done.clone();
        let bs = fs.block_size();
        let alloc_task = fasync::Task::spawn(async move {
            allocator_clone
                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
                .await
                .expect("allocate failed");
            {
                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
            }
            transaction.commit().await.expect("commit failed");
        });

        // Give the allocation task a chance to run (and block) before finishing the trim.
        fasync::Timer::new(std::time::Duration::from_millis(100)).await;

        // Drop the trim batch while holding the lock so the flag is set by the time the
        // allocation can proceed.
        {
            let mut trim_done = trim_done.lock();
            std::mem::drop(trimmable_extents);
            *trim_done = true;
        }

        alloc_task.await;
    }

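    // Reservations needn't be block-aligned, but the extents allocated against them
    // must still be multiples of the block size.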
    #[fuchsia::test]
    async fn test_allocation_with_reservation_not_multiple_of_block_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        // Reserve enough for transaction metadata plus a deliberately unaligned amount.
        const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
        let reservation =
            allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![],
                Options { allocator_reservation: Some(&reservation), ..Options::default() },
            )
            .await
            .expect("new failed");

        let range = allocator
            .allocate(
                &mut transaction,
                STORE_OBJECT_ID,
                round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
            )
            .await
            .expect("allocate failed");
        assert_eq!((range.end - range.start) % fs.block_size(), 0);

        println!("{}", range.end - range.start);
    }
}