pub mod merge;
pub mod strategy;

use crate::drop_event::DropEvent;
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::skip_list_layer::SkipListLayer;
use crate::lsm_tree::types::{
    FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
    OrdUpperBound, RangeKey, SortByU64, Value,
};
use crate::lsm_tree::{layers_from_handles, LSMTree, Query};
use crate::object_handle::{ObjectHandle, ReadObjectHandle, INVALID_OBJECT_ID};
use crate::object_store::object_manager::ReservationUpdate;
use crate::object_store::transaction::{
    lock_keys, AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard,
};
use crate::object_store::{tree, DataObjectHandle, DirectWriter, HandleOptions, ObjectStore};
use crate::range::RangeExt;
use crate::round::{round_div, round_down, round_up};
use crate::serialized_types::{
    Version, Versioned, VersionedLatest, DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION,
};
use anyhow::{anyhow, bail, ensure, Context, Error};
use async_trait::async_trait;
use either::Either::{Left, Right};
use event_listener::EventListener;
use fprint::TypeFingerprint;
use fuchsia_inspect::{ArrayProperty, HistogramProperty};
use fuchsia_sync::Mutex;
use futures::FutureExt;
use merge::{filter_marked_for_deletion, filter_tombstones, merge};
use rustc_hash::FxHasher;
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::hash::{Hash, Hasher as _};
use std::marker::PhantomData;
use std::num::Saturating;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};

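/// Implemented by types that hand out reservations (the [`Allocator`] itself, or a parent
/// reservation), so that unused reserved bytes can be returned to whatever granted them.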
pub trait ReservationOwner: Send + Sync {
    /// Releases `amount` bytes back to this owner.  `owner_object_id` identifies the store the
    /// reservation was attributed to, if any.
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
}

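/// A reservation of space.  While a reservation is held, the reserved bytes cannot be taken by
/// anyone else, so a later allocation against it cannot fail for lack of space; any unused amount
/// is released back to the owner on drop.
///
/// A minimal usage sketch (hypothetical values; `allocator` is assumed to be an
/// `Arc<Allocator>` and `store_id` a volume's store object ID):
///
/// ```ignore
/// let reservation = allocator.clone().reserve(Some(store_id), 8192).expect("no space");
/// assert_eq!(reservation.amount(), 8192);
/// drop(reservation); // The 8192 bytes are returned to the allocator here.
/// ```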
pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
    owner: T,
    owner_object_id: Option<u64>,
    inner: Mutex<ReservationInner>,
    phantom: PhantomData<U>,
}

#[derive(Debug, Default)]
struct ReservationInner {
    // The total amount of space this reservation currently holds.
    amount: u64,

    // The portion of `amount` that has been temporarily taken via `reserve`/`reserve_with`.
    reserved: u64,
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner.lock().fmt(f)
    }
}

impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
    pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
        Self {
            owner,
            owner_object_id,
            inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
            phantom: PhantomData,
        }
    }

    pub fn owner_object_id(&self) -> Option<u64> {
        self.owner_object_id
    }

    pub fn amount(&self) -> u64 {
        self.inner.lock().amount
    }

    pub fn add(&self, amount: u64) {
        self.inner.lock().amount += amount;
    }

    /// Takes the entire amount out of the reservation; the space will not be returned to the
    /// owner when this reservation is dropped.
    pub fn forget(&self) -> u64 {
        let mut inner = self.inner.lock();
        assert_eq!(inner.reserved, 0);
        std::mem::take(&mut inner.amount)
    }

    /// Takes `amount` out of the reservation without returning it to the owner.
    pub fn forget_some(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Reserves the amount returned by the `amount` callback, which is passed the maximum that
    /// is currently free in this reservation.
    fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
        let mut inner = self.inner.lock();
        let taken = amount(inner.amount - inner.reserved);
        inner.reserved += taken;
        ReservationImpl::new(self, self.owner_object_id, taken)
    }

    /// Reserves exactly `amount` bytes from this reservation, returning `None` if there is
    /// insufficient free space.
    pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
        let mut inner = self.inner.lock();
        if inner.amount - inner.reserved < amount {
            None
        } else {
            inner.reserved += amount;
            Some(ReservationImpl::new(self, self.owner_object_id, amount))
        }
    }

    /// Commits a previously reserved amount from this reservation.
    pub fn commit(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.reserved -= amount;
        inner.amount -= amount;
    }

    /// Returns `amount` of the reservation back to the owner.
    pub fn give_back(&self, amount: u64) {
        self.owner.borrow().release_reservation(self.owner_object_id, amount);
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Moves `amount` of this reservation to `other`; both reservations must have the same owner.
    pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
        &self,
        other: &ReservationImpl<V, W>,
        amount: u64,
    ) {
        assert_eq!(self.owner_object_id, other.owner_object_id());
        let mut inner = self.inner.lock();
        if let Some(amount) = inner.amount.checked_sub(amount) {
            inner.amount = amount;
        } else {
            // Release the lock before panicking.
            std::mem::drop(inner);
            panic!("Insufficient reservation space");
        }
        other.add(amount);
    }
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
    fn drop(&mut self) {
        let inner = self.inner.get_mut();
        assert_eq!(inner.reserved, 0);
        let owner_object_id = self.owner_object_id;
        if inner.amount > 0 {
            self.owner
                .borrow()
                .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
        }
    }
}

impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
    for ReservationImpl<T, U>
{
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
        assert_eq!(owner_object_id, self.owner_object_id);
        let mut inner = self.inner.lock();
        assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
        inner.reserved -= amount;
    }
}

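/// A reservation of space held directly against the [`Allocator`].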
pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;

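/// A short-lived sub-reservation borrowed from a [`Reservation`].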
pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;

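/// The key for records in the allocator's LSM tree: a range of bytes on the device.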
pub type AllocatorKey = AllocatorKeyV32;

#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct AllocatorKeyV32 {
    pub device_range: Range<u64>,
}

impl SortByU64 for AllocatorKey {
    fn get_leading_u64(&self) -> u64 {
        self.device_range.start
    }
}

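// Allocator keys are fuzzy-hashed in fixed-size buckets of this size.  For example, with 1MiB
// buckets a key covering 1.5MiB..2.5MiB contributes one hash for the 1MiB..2MiB bucket and one
// for the 2MiB..3MiB bucket.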
const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;

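/// Iterates over the hash of each fixed-size bucket that an [`AllocatorKey`]'s range overlaps.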
pub struct AllocatorKeyPartitionIterator {
    device_range: Range<u64>,
}

impl Iterator for AllocatorKeyPartitionIterator {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        if self.device_range.start >= self.device_range.end {
            None
        } else {
            let start = self.device_range.start;
            self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
            let end = std::cmp::min(self.device_range.start, self.device_range.end);
            let key = AllocatorKey { device_range: start..end };
            let mut hasher = FxHasher::default();
            key.hash(&mut hasher);
            Some(hasher.finish())
        }
    }
}

impl FuzzyHash for AllocatorKey {
    type Iter = AllocatorKeyPartitionIterator;

    fn fuzzy_hash(&self) -> Self::Iter {
        AllocatorKeyPartitionIterator {
            device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
                ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
        }
    }
}

impl AllocatorKey {
    /// Returns a new key that is a lower bound suitable for merging this key into the LSM tree
    /// (see `LSMTree::merge_into`).
    pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
        AllocatorKey { device_range: 0..self.device_range.start }
    }
}

impl LayerKey for AllocatorKey {
    fn merge_type(&self) -> MergeType {
        MergeType::OptimizedMerge
    }
}

impl OrdUpperBound for AllocatorKey {
    fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range.end.cmp(&other.device_range.end)
    }
}

impl OrdLowerBound for AllocatorKey {
    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
    }
}

impl Ord for AllocatorKey {
    fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
    }
}

impl PartialOrd for AllocatorKey {
    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl RangeKey for AllocatorKey {
    fn overlaps(&self, other: &Self) -> bool {
        self.device_range.start < other.device_range.end
            && self.device_range.end > other.device_range.start
    }
}

/// The value for records in the allocator's LSM tree: either free, or owned (with a reference
/// count) by a single object store.
pub type AllocatorValue = AllocatorValueV32;

impl Value for AllocatorValue {
    const DELETED_MARKER: Self = Self::None;
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorValueV32 {
    /// The range is not allocated (the deleted marker).
    None,
    /// The range is allocated with an absolute reference `count` on behalf of `owner_object_id`.
    Abs { count: u64, owner_object_id: u64 },
}

pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;

/// Serialized information about the allocator, stored in the allocator's own object.
pub type AllocatorInfo = AllocatorInfoV32;

#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
pub struct AllocatorInfoV32 {
    /// Object IDs of the persistent layer files for the allocator's LSM tree.
    pub layers: Vec<u64>,
    /// Allocated bytes, per owning object store.
    pub allocated_bytes: BTreeMap<u64, u64>,
    /// Owners that have been deleted but whose extents have not yet been compacted away.
    pub marked_for_deletion: HashSet<u64>,
    /// Byte limits, per owning object store.
    pub limit_bytes: BTreeMap<u64, u64>,
}

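// Limit applied when reading AllocatorInfo back from disk; a healthy AllocatorInfo is far smaller
// than this, so the cap guards against reading a corrupt, oversized one.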
const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;

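/// Computes the maximum extent size the allocator will hand out for a given block size.  Each
/// block in an extent may carry a serialized checksum of up to 9 bytes (8 bytes of checksum plus
/// varint overhead), and 64 bytes are left for the remainder of the record's metadata, which
/// keeps any single extent record within `DEFAULT_MAX_SERIALIZED_RECORD_SIZE`.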
pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
    block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
}

#[derive(Default)]
struct AllocatorCounters {
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
    persistent_layer_file_sizes: Vec<u64>,
}

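/// The allocator tracks the set of allocated (and therefore free) ranges of the device.
/// Committed allocations live as records in an LSM tree, while in-flight allocations,
/// deallocations awaiting a device flush, and extents being trimmed are held in
/// `temporary_allocations`.  Free-space selection is delegated to the strategy held in `inner`.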
pub struct Allocator {
    filesystem: Weak<FxFilesystem>,
    block_size: u64,
    device_size: u64,
    object_id: u64,
    max_extent_size_bytes: u64,
    tree: LSMTree<AllocatorKey, AllocatorValue>,
    // Ranges that are temporarily unavailable: in-flight allocations, deallocations awaiting a
    // device flush, and extents being trimmed.  This is layered over `tree` when scanning for
    // free space so that such ranges are not handed out twice.
    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    inner: Mutex<Inner>,
    allocation_mutex: futures::lock::Mutex<()>,
    counters: Mutex<AllocatorCounters>,
    maximum_offset: AtomicU64,
}

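/// Per-owner byte accounting.  `used_bytes` (allocated + uncommitted + reserved) is what counts
/// against an owner's limit; `unavailable_bytes` (allocated + uncommitted +
/// committed-deallocated) is what cannot be handed out again until the device is flushed.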
#[derive(Debug, Default, PartialEq)]
struct ByteTracking {
    // Bytes allocated and committed in the LSM tree for this owner.
    allocated_bytes: Saturating<u64>,

    // Bytes allocated in transactions that have not yet committed.
    uncommitted_allocated_bytes: u64,

    // Bytes reserved for this owner but not yet allocated.
    reserved_bytes: u64,

    // Bytes deallocated in committed transactions; these cannot be reused until the device has
    // been flushed.
    committed_deallocated_bytes: u64,
}

impl ByteTracking {
    fn used_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
    }

    fn unavailable_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.committed_deallocated_bytes)
    }

    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
    }
}

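/// A deallocation that has been committed to the journal but not yet flushed to the device.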
#[derive(Debug)]
struct CommittedDeallocation {
    // The journal file offset at which the deallocation was committed.
    log_file_offset: u64,
    // The device range being deallocated.
    range: Range<u64>,
    // The store to which the deallocated range belonged.
    owner_object_id: u64,
}

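/// Allocator state protected by a mutex.  `dropped_temporary_allocations` queues ranges whose
/// entries in `temporary_allocations` are no longer needed; erasing them is deferred until
/// `allocation_mutex` is next held so that concurrent readers which have layered the skip list
/// into a merge are not disturbed.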
struct Inner {
    info: AllocatorInfo,

    // Whether the allocator has been opened (or created) and is ready for use.
    opened: bool,

    // Ranges to be erased from `temporary_allocations` the next time `allocation_mutex` is held.
    dropped_temporary_allocations: Vec<Range<u64>>,

    // Byte accounting, per owning object store.
    owner_bytes: BTreeMap<u64, ByteTracking>,

    // Reserved bytes that are not attributed to any particular owner.
    unattributed_reserved_bytes: u64,

    // Deallocations committed to the journal but not yet flushed to the device.
    committed_deallocated: VecDeque<CommittedDeallocation>,

    // Bytes currently held out of the free pool by an outstanding trim batch.
    trim_reserved_bytes: u64,

    // Signalled when the outstanding trim batch is dropped and its extents are freed again; used
    // by `allocate` to wait for extents that are being trimmed.
    trim_listener: Option<EventListener>,

    // The strategy used to pick free ranges for allocation.
    strategy: Box<strategy::BestFit>,

    // Histogram of allocation request sizes (in blocks, capped at 64).
    allocation_size_histogram: [u64; 64],
    // Histogram of the request sizes (in blocks) that triggered a rebuild of `strategy`.
    rebuild_strategy_trigger_histogram: [u64; 64],

    // Owners whose deletion is durable on the device; their extents are filtered out of merges
    // and so are treated as free.
    marked_for_deletion: HashSet<u64>,

    // Owners that have been deleted but whose deletion has not yet been synced to the device;
    // their extents must not be reused until then.
    volumes_deleted_pending_sync: HashSet<u64>,
}

impl Inner {
    fn allocated_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.allocated_bytes;
        }
        total
    }

    fn uncommitted_allocated_bytes(&self) -> u64 {
        self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
    }

    fn reserved_bytes(&self) -> u64 {
        self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
            + self.unattributed_reserved_bytes
    }

    fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
        match self.info.limit_bytes.get(&owner_object_id) {
            Some(v) => *v,
            None => u64::MAX,
        }
    }

    fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
        let limit = self.owner_id_limit_bytes(owner_object_id);
        let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
        (Saturating(limit) - used).0
    }

    fn unavailable_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.unavailable_bytes();
        }
        total
    }

    fn used_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.used_bytes();
        }
        total + Saturating(self.unattributed_reserved_bytes)
    }

    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.unavailable_after_sync_bytes();
        }
        total
    }

    // Returns the number of bytes that are free and not tied up in an outstanding trim.
    fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
        device_size
            .checked_sub(
                (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
            )
            .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
    }

    fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
        match owner_object_id {
            Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
            None => self.unattributed_reserved_bytes += amount,
        };
    }

    fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
        match owner_object_id {
            Some(owner) => {
                let owner_entry = self.owner_bytes.entry(owner).or_default();
                assert!(
                    owner_entry.reserved_bytes >= amount,
                    "{} >= {}",
                    owner_entry.reserved_bytes,
                    amount
                );
                owner_entry.reserved_bytes -= amount;
            }
            None => {
                assert!(
                    self.unattributed_reserved_bytes >= amount,
                    "{} >= {}",
                    self.unattributed_reserved_bytes,
                    amount
                );
                self.unattributed_reserved_bytes -= amount
            }
        };
    }
}

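/// A batch of free extents handed out for trimming.  While this object is alive the extents are
/// held out of the free pool (via `temporary_allocations`); dropping it returns them to the pool
/// and signals any waiters.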
pub struct TrimmableExtents<'a> {
    allocator: &'a Allocator,
    extents: Vec<Range<u64>>,
    // Signals a listener (see `Inner::trim_listener`) when this batch is dropped.
    _drop_event: DropEvent,
}

impl<'a> TrimmableExtents<'a> {
    pub fn extents(&self) -> &Vec<Range<u64>> {
        &self.extents
    }

    fn new(allocator: &'a Allocator) -> (Self, EventListener) {
        let drop_event = DropEvent::new();
        let listener = drop_event.listen();
        (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
    }

    fn add_extent(&mut self, extent: Range<u64>) {
        self.extents.push(extent);
    }
}

impl<'a> Drop for TrimmableExtents<'a> {
    fn drop(&mut self) {
        let mut inner = self.allocator.inner.lock();
        for device_range in std::mem::take(&mut self.extents) {
            inner.strategy.free(device_range.clone()).expect("drop trim extent");
            self.allocator
                .temporary_allocations
                .erase(&AllocatorKey { device_range: device_range.clone() });
        }
        inner.trim_reserved_bytes = 0;
    }
}

impl Allocator {
    pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
        let block_size = filesystem.block_size();
        let device_size = round_down(filesystem.device().size(), block_size);
        if device_size != filesystem.device().size() {
            warn!("Device size is not block aligned. Rounding down.");
        }
        let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
        let mut strategy = Box::new(strategy::BestFit::default());
        strategy.free(0..device_size).expect("new fs");
        Allocator {
            filesystem: Arc::downgrade(&filesystem),
            block_size,
            device_size,
            object_id,
            max_extent_size_bytes,
            tree: LSMTree::new(merge, Box::new(NullCache {})),
            temporary_allocations: SkipListLayer::new(1024),
            inner: Mutex::new(Inner {
                info: AllocatorInfo::default(),
                opened: false,
                dropped_temporary_allocations: Vec::new(),
                owner_bytes: BTreeMap::new(),
                unattributed_reserved_bytes: 0,
                committed_deallocated: VecDeque::new(),
                trim_reserved_bytes: 0,
                trim_listener: None,
                strategy,
                allocation_size_histogram: [0; 64],
                rebuild_strategy_trigger_histogram: [0; 64],
                marked_for_deletion: HashSet::new(),
                volumes_deleted_pending_sync: HashSet::new(),
            }),
            allocation_mutex: futures::lock::Mutex::new(()),
            counters: Mutex::new(AllocatorCounters::default()),
            maximum_offset: AtomicU64::new(0),
        }
    }

    pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
        &self.tree
    }

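    /// Returns an iterator over the given layer iterator with tombstones and extents belonging
    /// to deleted owners filtered out.  `committed_marked_for_deletion` selects the set of
    /// deleted owners recorded in the committed `AllocatorInfo` rather than the in-memory set.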
    pub async fn filter(
        &self,
        iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
        committed_marked_for_deletion: bool,
    ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
        let marked_for_deletion = {
            let inner = self.inner.lock();
            if committed_marked_for_deletion {
                &inner.info.marked_for_deletion
            } else {
                &inner.marked_for_deletion
            }
            .clone()
        };
        let iter =
            filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
        Ok(iter)
    }

    /// Returns the histogram of allocation request sizes, in blocks.
    pub fn allocation_size_histogram(&self) -> [u64; 64] {
        self.inner.lock().allocation_size_histogram
    }

    /// Creates the allocator's object in the root store; used when the filesystem is first
    /// formatted.
    pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);

        let filesystem = self.filesystem.upgrade().unwrap();
        let root_store = filesystem.root_store();
        ObjectStore::create_object_with_id(
            &root_store,
            transaction,
            self.object_id(),
            HandleOptions::default(),
            None,
        )
        .await?;
        root_store.update_last_object_id(self.object_id());
        Ok(())
    }

    /// Opens a persisted allocator: reads `AllocatorInfo` and appends the recorded layer files
    /// to the LSM tree.
    pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
        let filesystem = self.filesystem.upgrade().unwrap();
        let root_store = filesystem.root_store();

        {
            let mut inner = self.inner.lock();
            inner.strategy = Box::new(strategy::BestFit::default());
        }

        let handle =
            ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
                .await
                .context("Failed to open allocator object")?;

        if handle.get_size() > 0 {
            let serialized_info = handle
                .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
                .await
                .context("Failed to read AllocatorInfo")?;
            let mut cursor = std::io::Cursor::new(serialized_info);
            let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
                .context("Failed to deserialize AllocatorInfo")?;

            let mut handles = Vec::new();
            let mut layer_file_sizes = Vec::new();
            let mut total_size = 0;
            for object_id in &info.layers {
                let handle = ObjectStore::open_object(
                    &root_store,
                    *object_id,
                    HandleOptions::default(),
                    None,
                )
                .await
                .context("Failed to open allocator layer file")?;

                let size = handle.get_size();
                layer_file_sizes.push(size);
                total_size += size;
                handles.push(handle);
            }

            {
                let mut inner = self.inner.lock();

                // Sanity-check that the per-owner allocated-byte counts are possible.
                let mut device_bytes = self.device_size;
                for (&owner_object_id, &bytes) in &info.allocated_bytes {
                    ensure!(
                        bytes <= device_bytes,
                        anyhow!(FxfsError::Inconsistent).context(format!(
                            "Allocated bytes exceeds device size: {:?}",
                            info.allocated_bytes
                        ))
                    );
                    device_bytes -= bytes;

                    inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
                        Saturating(bytes);
                }

                inner.info = info;
            }

            self.counters.lock().persistent_layer_file_sizes = layer_file_sizes;
            self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
            self.filesystem.upgrade().unwrap().object_manager().update_reservation(
                self.object_id,
                tree::reservation_amount_from_layer_size(total_size),
            );
        }

        Ok(())
    }

    pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
        {
            let mut inner = self.inner.lock();
            inner.volumes_deleted_pending_sync.clear();
            inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
        }

        if !self.rebuild_strategy().await.context("Build free extents")? {
            error!("Device contains no free space.");
            return Err(FxfsError::Inconsistent).context("Device appears to contain no free space");
        }

        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
        Ok(())
    }

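    /// Walks the LSM tree (with `temporary_allocations` layered on top) and rebuilds the
    /// free-space strategy from the gaps between allocated ranges.  Returns true if the
    /// strategy's set of free ranges changed.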
    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
        let mut changed = false;
        let mut layer_set = self.tree.empty_layer_set();
        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
        self.tree.add_all_layers_to_layer_set(&mut layer_set);

        let overflow_markers = self.inner.lock().strategy.overflow_markers();
        self.inner.lock().strategy.reset_overflow_markers();

        let mut to_add = Vec::new();
        let mut merger = layer_set.merger();
        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
        let mut last_offset = 0;
        while last_offset < self.device_size {
            let next_range = match iter.get() {
                None => {
                    assert!(last_offset <= self.device_size);
                    let range = last_offset..self.device_size;
                    last_offset = self.device_size;
                    iter.advance().await?;
                    range
                }
                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                    if device_range.end < last_offset {
                        iter.advance().await?;
                        continue;
                    }
                    if device_range.start <= last_offset {
                        last_offset = device_range.end;
                        iter.advance().await?;
                        continue;
                    }
                    let range = last_offset..device_range.start;
                    last_offset = device_range.end;
                    iter.advance().await?;
                    range
                }
            };
            to_add.push(next_range);
            // Avoid holding the inner lock across await points by freeing ranges in batches.
            if to_add.len() > 100 {
                let mut inner = self.inner.lock();
                for range in to_add.drain(..) {
                    changed |= inner.strategy.force_free(range)?;
                }
            }
        }
        let mut inner = self.inner.lock();
        for range in to_add {
            changed |= inner.strategy.force_free(range)?;
        }
        if overflow_markers != inner.strategy.overflow_markers() {
            changed = true;
        }
        Ok(changed)
    }

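    /// Collects up to `extents_per_batch` free extents (each at most `max_extent_size` bytes)
    /// starting at `offset`, holding them out of the free pool so they can be trimmed.  The
    /// extents are returned to the pool when the returned `TrimmableExtents` is dropped.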
    pub async fn take_for_trimming(
        &self,
        offset: u64,
        max_extent_size: usize,
        extents_per_batch: usize,
    ) -> Result<TrimmableExtents<'_>, Error> {
        let _guard = self.allocation_mutex.lock().await;

        let (mut result, listener) = TrimmableExtents::new(self);
        let mut bytes = 0;

        // Scan with `temporary_allocations` layered in so that in-flight allocations are not
        // offered up for trimming.
        let mut layer_set = self.tree.empty_layer_set();
        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
        self.tree.add_all_layers_to_layer_set(&mut layer_set);
        let mut merger = layer_set.merger();
        let mut iter = self
            .filter(
                merger.query(Query::FullRange(&AllocatorKey { device_range: offset..0 })).await?,
                false,
            )
            .await?;
        let mut last_offset = offset;
        'outer: while last_offset < self.device_size {
            let mut range = match iter.get() {
                None => {
                    assert!(last_offset <= self.device_size);
                    let range = last_offset..self.device_size;
                    last_offset = self.device_size;
                    iter.advance().await?;
                    range
                }
                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
                    if device_range.end <= last_offset {
                        iter.advance().await?;
                        continue;
                    }
                    if device_range.start <= last_offset {
                        last_offset = device_range.end;
                        iter.advance().await?;
                        continue;
                    }
                    let range = last_offset..device_range.start;
                    last_offset = device_range.end;
                    iter.advance().await?;
                    range
                }
            };
            if range.start < offset {
                continue;
            }

            let mut inner = self.inner.lock();

            while range.start < range.end {
                let prefix =
                    range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
                range = prefix.end..range.end;
                bytes += prefix.length()?;
                // Mark the extent as temporarily allocated so nobody else can take it while it
                // is being trimmed.
                inner.strategy.remove(prefix.clone());
                self.temporary_allocations.insert(AllocatorItem {
                    key: AllocatorKey { device_range: prefix.clone() },
                    value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
                    sequence: 0,
                })?;
                result.add_extent(prefix);
                if result.extents.len() == extents_per_batch {
                    break 'outer;
                }
            }
            if result.extents.len() == extents_per_batch {
                break 'outer;
            }
        }
        {
            let mut inner = self.inner.lock();

            assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
            inner.trim_listener = Some(listener);
            inner.trim_reserved_bytes = bytes;
            debug_assert!(
                (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
                    <= self.device_size
            );
        }
        Ok(result)
    }

    /// Returns all the objects in the parent store that pertain to this allocator (its layer
    /// files).
    pub fn parent_objects(&self) -> Vec<u64> {
        self.inner.lock().info.layers.clone()
    }

    /// Returns the (owner object id, byte limit) pairs that have been configured.
    pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
        self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
    }

    /// Returns the bytes used by `owner_object_id` along with its limit, if one is set.
    pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
        let inner = self.inner.lock();
        (
            inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
            inner.info.limit_bytes.get(&owner_object_id).copied(),
        )
    }

    pub fn owner_bytes_debug(&self) -> String {
        format!("{:?}", self.inner.lock().owner_bytes)
    }

    // Returns true if a journal sync and device flush is needed before more space can be
    // allocated (i.e. all remaining space is tied up in committed deallocations).
    fn needs_sync(&self) -> bool {
        let inner = self.inner.lock();
        inner.unavailable_bytes().0 >= self.device_size
    }

    fn is_system_store(&self, owner_object_id: u64) -> bool {
        let fs = self.filesystem.upgrade().unwrap();
        owner_object_id == fs.object_manager().root_store_object_id()
            || owner_object_id == fs.object_manager().root_parent_store_object_id()
    }

    /// Moves a reservation previously attributed to `old_owner_object_id` to the unattributed
    /// pool.
    pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
        if old_owner_object_id.is_none() || amount == 0 {
            return;
        }
        let mut inner = self.inner.lock();
        inner.remove_reservation(old_owner_object_id, amount);
        inner.add_reservation(None, amount);
    }

    /// Records allocator statistics as a lazy child of the given inspect node.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    let counters = this.counters.lock();
                    let root = inspector.root();
                    root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
                    root.record_uint("bytes_total", this.device_size);
                    let (allocated, reserved, used, unavailable) = {
                        let inner = this.inner.lock();
                        (
                            inner.allocated_bytes().0,
                            inner.reserved_bytes(),
                            inner.used_bytes().0,
                            inner.unavailable_bytes().0,
                        )
                    };
                    root.record_uint("bytes_allocated", allocated);
                    root.record_uint("bytes_reserved", reserved);
                    root.record_uint("bytes_used", used);
                    root.record_uint("bytes_unavailable", unavailable);

                    if let Some(x) = round_div(100 * allocated, this.device_size) {
                        root.record_uint("bytes_allocated_percent", x);
                    }
                    if let Some(x) = round_div(100 * reserved, this.device_size) {
                        root.record_uint("bytes_reserved_percent", x);
                    }
                    if let Some(x) = round_div(100 * used, this.device_size) {
                        root.record_uint("bytes_used_percent", x);
                    }
                    if let Some(x) = round_div(100 * unavailable, this.device_size) {
                        root.record_uint("bytes_unavailable_percent", x);
                    }

                    root.record_uint("num_flushes", counters.num_flushes);
                    if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
                        root.record_uint(
                            "last_flush_time_ms",
                            last_flush_time
                                .duration_since(std::time::UNIX_EPOCH)
                                .unwrap_or(std::time::Duration::ZERO)
                                .as_millis()
                                .try_into()
                                .unwrap_or(0u64),
                        );
                    }
                    let sizes = root.create_uint_array(
                        "persistent_layer_file_sizes",
                        counters.persistent_layer_file_sizes.len(),
                    );
                    for i in 0..counters.persistent_layer_file_sizes.len() {
                        sizes.set(i, counters.persistent_layer_file_sizes[i]);
                    }
                    root.record(sizes);

                    let data = this.allocation_size_histogram();
                    let alloc_sizes = root.create_uint_linear_histogram(
                        "allocation_size_histogram",
                        fuchsia_inspect::LinearHistogramParams {
                            floor: 1,
                            step_size: 1,
                            buckets: 64,
                        },
                    );
                    for (i, count) in data.iter().enumerate() {
                        if i != 0 {
                            alloc_sizes.insert_multiple(i as u64, *count as usize);
                        }
                    }
                    root.record(alloc_sizes);

                    let data = this.inner.lock().rebuild_strategy_trigger_histogram;
                    let triggers = root.create_uint_linear_histogram(
                        "rebuild_strategy_triggers",
                        fuchsia_inspect::LinearHistogramParams {
                            floor: 1,
                            step_size: 1,
                            buckets: 64,
                        },
                    );
                    for (i, count) in data.iter().enumerate() {
                        if i != 0 {
                            triggers.insert_multiple(i as u64, *count as usize);
                        }
                    }
                    root.record(triggers);
                }
                Ok(inspector)
            }
            .boxed()
        });
    }

    /// Returns the offset of the first byte that has not been used by any allocation since the
    /// device was formatted.
    pub fn maximum_offset(&self) -> u64 {
        self.maximum_offset.load(Ordering::Relaxed)
    }
}

impl Drop for Allocator {
    fn drop(&mut self) {
        let inner = self.inner.lock();
        // Uncommitted and reserved bytes are released via RAII, so they should be zero by now.
        assert_eq!(inner.uncommitted_allocated_bytes(), 0);
        assert_eq!(inner.reserved_bytes(), 0);
    }
}

#[fxfs_trace::trace]
impl Allocator {
    /// Returns the object ID for the allocator.
    pub fn object_id(&self) -> u64 {
        self.object_id
    }

    /// Returns information about the allocator, such as the layer files storing persisted
    /// allocations.
    pub fn info(&self) -> AllocatorInfo {
        self.inner.lock().info.clone()
    }

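    /// Allocates a contiguous range of up to `len` bytes on behalf of `owner_object_id`.  The
    /// returned range may be shorter than requested (it is capped by the maximum extent size and
    /// by the space or reservation available), but never empty.  This may trigger a journal sync
    /// and device flush if the remaining free space is tied up in committed deallocations.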
    #[trace]
    pub async fn allocate(
        self: &Arc<Self>,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        mut len: u64,
    ) -> Result<Range<u64>, Error> {
        assert_eq!(len % self.block_size, 0);
        len = std::cmp::min(len, self.max_extent_size_bytes);
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);

        // Make sure the space is reserved before we go looking for it.
        let reservation = if let Some(reservation) = transaction.allocator_reservation {
            match reservation.owner_object_id {
                None => assert!(self.is_system_store(owner_object_id)),
                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
            };
            let r = reservation
                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
            len = r.amount();
            Left(r)
        } else {
            let mut inner = self.inner.lock();
            assert!(inner.opened);
            let device_used = inner.used_bytes();
            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let limit =
                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
            len = round_down(std::cmp::min(len, limit), self.block_size);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.reserved_bytes += len;
            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
        };

        ensure!(len > 0, FxfsError::NoSpace);

        let volumes_deleted = {
            let inner = self.inner.lock();
            (!inner.volumes_deleted_pending_sync.is_empty())
                .then(|| inner.volumes_deleted_pending_sync.clone())
        };

        if let Some(volumes_deleted) = volumes_deleted {
            // Sync the device so that the volume deletions are durable before any of the space
            // they occupied is reused.
            self.filesystem
                .upgrade()
                .unwrap()
                .sync(SyncOptions {
                    flush_device: true,
                    precondition: Some(Box::new(|| {
                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
                    })),
                    ..Default::default()
                })
                .await?;

            {
                let mut inner = self.inner.lock();
                for owner_id in volumes_deleted {
                    inner.volumes_deleted_pending_sync.remove(&owner_id);
                    inner.marked_for_deletion.insert(owner_id);
                }
            }

            let _guard = self.allocation_mutex.lock().await;
            self.rebuild_strategy().await?;
        }

        #[allow(clippy::never_loop)] // The loop is used as a block with an early exit.
        let _guard = 'sync: loop {
            // Cap the number of sync attempts before giving up on finding free space.
            for _ in 0..10 {
                {
                    let guard = self.allocation_mutex.lock().await;

                    if !self.needs_sync() {
                        break 'sync guard;
                    }
                }

                // All the free space is currently tied up with deallocations, so sync and flush
                // the device to free that up.
                self.filesystem
                    .upgrade()
                    .unwrap()
                    .sync(SyncOptions {
                        flush_device: true,
                        precondition: Some(Box::new(|| self.needs_sync())),
                        ..Default::default()
                    })
                    .await?;
            }
            bail!(
                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
            );
        };

        let mut trim_listener = None;
        {
            let mut inner = self.inner.lock();
            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;

            // If an outstanding trim is the only thing standing in the way of this allocation,
            // wait for it to complete.
            let avail = self
                .device_size
                .checked_sub(inner.unavailable_bytes().0)
                .ok_or(FxfsError::Inconsistent)?;
            let free_and_not_being_trimmed =
                inner.bytes_available_not_being_trimmed(self.device_size)?;
            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
                debug_assert!(inner.trim_reserved_bytes > 0);
                trim_listener = std::mem::take(&mut inner.trim_listener);
            }
        }

        if let Some(listener) = trim_listener {
            listener.await;
        }

        let result = loop {
            {
                let mut inner = self.inner.lock();

                // While the allocation mutex is held, apply any pending erasures from
                // `temporary_allocations`.
                for device_range in inner.dropped_temporary_allocations.drain(..) {
                    self.temporary_allocations.erase(&AllocatorKey { device_range });
                }

                match inner.strategy.allocate(len) {
                    Err(FxfsError::NotFound) => {
                        // The strategy ran out of tracked free extents; fall through and rebuild
                        // it from the tree.
                        inner.rebuild_strategy_trigger_histogram
                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
                    }
                    Err(err) => {
                        error!(err:%; "Likely filesystem corruption.");
                        return Err(err.into());
                    }
                    Ok(x) => {
                        break x;
                    }
                }
            }
            if !self.rebuild_strategy().await? {
                error!("Cannot find additional free space. Corruption?");
                return Err(FxfsError::Inconsistent.into());
            }
        };

        debug!(device_range:? = result; "allocate");

        let len = result.length().unwrap();
        let reservation_owner = reservation.either(
            // Left means the transaction supplied an external reservation.
            |l| {
                l.forget_some(len);
                l.owner_object_id()
            },
            |r| {
                r.forget_some(len);
                r.owner_object_id()
            },
        );

        {
            let mut inner = self.inner.lock();
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.uncommitted_allocated_bytes += len;
            // If the reservation has an owner, it must match the allocation's owner.
            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
            inner.remove_reservation(reservation_owner, len);
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: result.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })?;
        }

        let mutation =
            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());

        Ok(result)
    }

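    /// Marks `device_range` as allocated to `owner_object_id`, bypassing free-space selection;
    /// intended for extents that must live at a fixed device offset (the super-block, for
    /// instance).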
    #[trace]
    pub async fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
        {
            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;

            let mut inner = self.inner.lock();
            let device_used = inner.used_bytes();
            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            ensure!(
                device_range.end <= self.device_size
                    && (Saturating(self.device_size) - device_used).0 >= len
                    && owner_id_bytes_left >= len,
                FxfsError::NoSpace
            );
            if let Some(reservation) = &mut transaction.allocator_reservation {
                // The transaction takes ownership of this hold.
                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
            }
            owner_entry.uncommitted_allocated_bytes += len;
            inner.strategy.remove(device_range.clone());
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: device_range.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })?;
        }
        let mutation =
            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));
        Ok(())
    }

    /// Sets the byte limit for `owner_object_id`; its total usage is capped at `bytes`.
    pub async fn set_bytes_limit(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        bytes: u64,
    ) -> Result<(), Error> {
        // System stores cannot be given byte limits.
        assert!(!self.is_system_store(owner_object_id));
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
        );
        Ok(())
    }

    /// Gets the byte limit for `owner_object_id`, if one is set.
    pub async fn get_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
    }

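    /// Deallocates the given device range for the given owner, returning the number of bytes
    /// deallocated.  The range stays out of the free pool until the journal has been flushed to
    /// the device.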
    #[trace]
    pub async fn deallocate(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        dealloc_range: Range<u64>,
    ) -> Result<u64, Error> {
        debug!(device_range:? = dealloc_range; "deallocate");
        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
        let deallocated = dealloc_range.end - dealloc_range.start;
        let mutation = AllocatorMutation::Deallocate {
            device_range: dealloc_range.clone().into(),
            owner_object_id,
        };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));

        let _guard = self.allocation_mutex.lock().await;

        let mut inner = self.inner.lock();
        // While the allocation mutex is held, apply any pending erasures from
        // `temporary_allocations`.
        for device_range in inner.dropped_temporary_allocations.drain(..) {
            self.temporary_allocations.erase(&AllocatorKey { device_range });
        }

        // Use `temporary_allocations` to hold the deallocated range out of the free pool until
        // the deallocation has been committed and flushed to the device.
        self.temporary_allocations
            .insert(AllocatorItem {
                key: AllocatorKey { device_range: dealloc_range.clone() },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
                sequence: 0,
            })
            .context("tracking deallocated")?;

        Ok(deallocated)
    }

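    /// Marks all extents owned by `owner_object_id` for deletion.  The extents are not freed
    /// immediately; the owner is recorded in `AllocatorInfo::marked_for_deletion` and its
    /// records are dropped on the next major compaction, once the deletion is durable on the
    /// device.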
    pub async fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
        );
    }

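    /// Called when the device has been flushed up to `flush_log_offset`.  Deallocations committed
    /// at earlier journal offsets are now safe to reuse, so they are returned to the free pool.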
    pub async fn did_flush_device(&self, flush_log_offset: u64) {
        // Take out the deallocations that are covered by the flush.
        #[allow(clippy::never_loop)] // The loop is used as a block with an early exit.
        let deallocs = 'deallocs_outer: loop {
            let mut inner = self.inner.lock();
            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
                if dealloc.log_file_offset >= flush_log_offset {
                    let mut deallocs = inner.committed_deallocated.split_off(index);
                    // Swap because we want the opposite of what split_off does.
                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
                    break 'deallocs_outer deallocs;
                }
            }
            break std::mem::take(&mut inner.committed_deallocated);
        };

        let mut inner = self.inner.lock();
        let mut totals = BTreeMap::<u64, u64>::new();
        for dealloc in deallocs {
            *(totals.entry(dealloc.owner_object_id).or_default()) +=
                dealloc.range.length().unwrap();
            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
            self.temporary_allocations.erase(&AllocatorKey { device_range: dealloc.range.clone() });
        }

        // Decrement the counters last: `needs_sync` uses them to decide whether a device flush is
        // required, so they must not drop before the ranges above are actually free.
        for (owner_object_id, total) in totals {
            match inner.owner_bytes.get_mut(&owner_object_id) {
                Some(counters) => counters.committed_deallocated_bytes -= total,
                None => panic!("Failed to decrement for unknown owner: {}", owner_object_id),
            }
        }
    }

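    /// Reserves `amount` bytes (attributed to `owner_object_id` if provided), returning `None`
    /// if there is insufficient space or the owner's byte limit would be exceeded.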
    pub fn reserve(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: u64,
    ) -> Option<Reservation> {
        {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let limit = match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            };
            if limit < amount {
                return None;
            }
            inner.add_reservation(owner_object_id, amount);
        }
        Some(Reservation::new(self, owner_object_id, amount))
    }

    /// Like `reserve`, but takes a callback which is passed the maximum amount available and
    /// returns the amount to reserve, so the reservation cannot fail.
    pub fn reserve_with(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: impl FnOnce(u64) -> u64,
    ) -> Reservation {
        let amount = {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let amount = amount(match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            });

            inner.add_reservation(owner_object_id, amount);

            amount
        };

        Reservation::new(self, owner_object_id, amount)
    }

    /// Returns the total number of allocated bytes.
    pub fn get_allocated_bytes(&self) -> u64 {
        self.inner.lock().allocated_bytes().0
    }

    /// Returns the size of the device.
    pub fn get_disk_bytes(&self) -> u64 {
        self.device_size
    }

    /// Returns the number of allocated bytes, per owner.
    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
    }

    /// Returns the number of bytes that are in use or reserved.
    pub fn get_used_bytes(&self) -> Saturating<u64> {
        let inner = self.inner.lock();
        inner.used_bytes()
    }
}

impl ReservationOwner for Allocator {
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
        self.inner.lock().remove_reservation(owner_object_id, amount);
    }
}

#[async_trait]
impl JournalingObject for Allocator {
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        _assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                let mut inner = self.inner.lock();
                inner.owner_bytes.remove(&owner_object_id);

                // The deletion is not durable until the device has been flushed, so remember the
                // owner in `volumes_deleted_pending_sync` until then (see `allocate`).
                inner.info.marked_for_deletion.insert(owner_object_id);
                inner.volumes_deleted_pending_sync.insert(owner_object_id);

                inner.info.limit_bytes.remove(&owner_object_id);
            }
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: device_range.clone().into() },
                    value: AllocatorValue::Abs { count: 1, owner_object_id },
                    sequence: context.checkpoint.file_offset,
                };
                let len = item.key.device_range.length().unwrap();
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
                let mut inner = self.inner.lock();
                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                entry.allocated_bytes += len;
                if let ApplyMode::Live(transaction) = context.mode {
                    entry.uncommitted_allocated_bytes -= len;
                    // The range is now recorded in the tree, so the placeholder in
                    // `temporary_allocations` can go; erasure is deferred until the allocation
                    // mutex is next held.
                    inner.dropped_temporary_allocations.push(device_range.into());
                    if let Some(reservation) = transaction.allocator_reservation {
                        reservation.commit(len);
                    }
                }
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: device_range.into() },
                    value: AllocatorValue::None,
                    sequence: context.checkpoint.file_offset,
                };
                let len = item.key.device_range.length().unwrap();

                {
                    let mut inner = self.inner.lock();
                    {
                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                        entry.allocated_bytes -= len;
                        if context.mode.is_live() {
                            entry.committed_deallocated_bytes += len;
                        }
                    }
                    if context.mode.is_live() {
                        inner.committed_deallocated.push_back(CommittedDeallocation {
                            log_file_offset: context.checkpoint.file_offset,
                            range: item.key.device_range.clone(),
                            owner_object_id,
                        });
                    }
                    if let ApplyMode::Live(Transaction {
                        allocator_reservation: Some(reservation),
                        ..
                    }) = context.mode
                    {
                        inner.add_reservation(reservation.owner_object_id(), len);
                        reservation.add(len);
                    }
                }
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
                // Journal replay is ordered and each of these calls is idempotent, so this is
                // safe to apply in or out of replay.
                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
            }
            Mutation::BeginFlush => {
                self.tree.seal();
                // Transfer the running allocated_bytes counts so that they get written to the
                // new info file when the flush completes.
                let mut inner = self.inner.lock();
                let allocated_bytes =
                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
                inner.info.allocated_bytes = allocated_bytes;
            }
            Mutation::EndFlush => {}
            _ => bail!("unexpected mutation: {:?}", mutation),
        }
        Ok(())
    }

    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
        match mutation {
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                let len = device_range.length().unwrap();
                let mut inner = self.inner.lock();
                inner
                    .owner_bytes
                    .entry(owner_object_id)
                    .or_default()
                    .uncommitted_allocated_bytes -= len;
                if let Some(reservation) = transaction.allocator_reservation {
                    let res_owner = reservation.owner_object_id();
                    inner.add_reservation(res_owner, len);
                    reservation.release_reservation(res_owner, len);
                }
                inner.strategy.free(device_range.clone().into()).expect("drop mutation");
                self.temporary_allocations
                    .erase(&AllocatorKey { device_range: device_range.into() });
            }
            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
                self.temporary_allocations
                    .erase(&AllocatorKey { device_range: device_range.into() });
            }
            _ => {}
        }
    }

    async fn flush(&self) -> Result<Version, Error> {
        let filesystem = self.filesystem.upgrade().unwrap();
        let object_manager = filesystem.object_manager();
        let earliest_version = self.tree.get_earliest_version();
        if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
            // Early exit, but still return the earliest version used by a struct in the tree.
            return Ok(earliest_version);
        }

        let fs = self.filesystem.upgrade().unwrap();
        let mut flusher = Flusher::new(self, &fs).await;
        let new_layer_file = flusher.start().await?;
        flusher.finish(new_layer_file).await
    }
}

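/// Wraps a layer iterator and coalesces adjacent extents that have identical values, e.g.
/// `0..100` and `100..200`, both `Abs { count: 1, owner_object_id: 99 }`, come out as a single
/// `0..200` record.  Used when compacting the allocator's tree.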
pub struct CoalescingIterator<I> {
    iter: I,
    item: Option<AllocatorItem>,
}

impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
    pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
        let mut iter = Self { iter, item: None };
        iter.advance().await?;
        Ok(iter)
    }
}

#[async_trait]
impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
    for CoalescingIterator<I>
{
    async fn advance(&mut self) -> Result<(), Error> {
        self.item = self.iter.get().map(|x| x.cloned());
        if self.item.is_none() {
            return Ok(());
        }
        let left = self.item.as_mut().unwrap();
        loop {
            self.iter.advance().await?;
            match self.iter.get() {
                None => return Ok(()),
                Some(right) => {
                    // The ranges must be in order and must not overlap.
                    ensure!(
                        left.key.device_range.end <= right.key.device_range.start,
                        FxfsError::Inconsistent
                    );
                    // Stop if the records are not adjacent or have different values.
                    if left.key.device_range.end < right.key.device_range.start
                        || left.value != *right.value
                    {
                        return Ok(());
                    }
                    left.key.device_range.end = right.key.device_range.end;
                }
            }
        }
    }

    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
        self.item.as_ref().map(|x| x.as_item_ref())
    }
}

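/// Helper which compacts the allocator's tree into a new layer file, swaps it in, and cleans up
/// the old layer files.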
1972struct Flusher<'a> {
1973 allocator: &'a Allocator,
1974 fs: &'a Arc<FxFilesystem>,
1975 _guard: WriteGuard<'a>,
1976}
1977
1978impl<'a> Flusher<'a> {
1979 async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
1980 let keys = lock_keys![LockKey::flush(allocator.object_id())];
1981 Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
1982 }
1983
1984 fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
1985 Options {
1986 skip_journal_checks: true,
1987 borrow_metadata_space: true,
1988 allocator_reservation: Some(allocator_reservation),
1989 ..Default::default()
1990 }
1991 }
1992
1993 async fn start(&mut self) -> Result<DataObjectHandle<ObjectStore>, Error> {
1994 let object_manager = self.fs.object_manager();
1995 let mut transaction = self
1996 .fs
1997 .clone()
1998 .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
1999 .await?;
2000
2001 let root_store = self.fs.root_store();
2002 let layer_object_handle = ObjectStore::create_object(
2003 &root_store,
2004 &mut transaction,
2005 HandleOptions { skip_journal_checks: true, ..Default::default() },
2006 None,
2007 )
2008 .await?;
2009 root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
2010 transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
2017 transaction.commit().await?;
2018 Ok(layer_object_handle)
2019 }
2020
2021 async fn finish(
2022 self,
2023 layer_object_handle: DataObjectHandle<ObjectStore>,
2024 ) -> Result<Version, Error> {
2025 let object_manager = self.fs.object_manager();
2026 let txn_options = Self::txn_options(object_manager.metadata_reservation());
2027
2028 let layer_set = self.allocator.tree.immutable_layer_set();
2029 let total_len = layer_set.sum_len();
2030 {
2031 let mut merger = layer_set.merger();
2032 let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
2033 let iter = CoalescingIterator::new(iter).await?;
2034 self.allocator
2035 .tree
2036 .compact_with_iterator(
2037 iter,
2038 total_len,
2039 DirectWriter::new(&layer_object_handle, txn_options).await,
2040 layer_object_handle.block_size(),
2041 )
2042 .await?;
2043 }
2044
2045 let root_store = self.fs.root_store();
2046
2047 let object_handle;
2049 let reservation_update;
2050 let mut transaction = self
2051 .fs
2052 .clone()
2053 .new_transaction(
2054 lock_keys![LockKey::object(
2055 root_store.store_object_id(),
2056 self.allocator.object_id()
2057 )],
2058 txn_options,
2059 )
2060 .await?;
2061 let mut serialized_info = Vec::new();
2062
2063 debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
2064 object_handle = ObjectStore::open_object(
2065 &root_store,
2066 self.allocator.object_id(),
2067 HandleOptions::default(),
2068 None,
2069 )
2070 .await?;
2071
2072 let (new_info, marked_for_deletion) = {
2078 let mut info = self.allocator.inner.lock().info.clone();
2079
2080 let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);
2083
2084 for object_id in &info.layers {
2086 root_store.add_to_graveyard(&mut transaction, *object_id);
2087 }
2088
2089 info.layers = vec![layer_object_handle.object_id()];
2090
2091 (info, marked_for_deletion)
2092 };
2093 new_info.serialize_with_version(&mut serialized_info)?;
2094
2095 let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
2096 buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
2097 object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;
2098
2099 reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
2100 layer_object_handle.get_size(),
2101 ));
2102 let layer_file_sizes = vec![layer_object_handle.get_size()];
2103
2104 transaction.add_with_object(
2107 self.allocator.object_id(),
2108 Mutation::EndFlush,
2109 AssocObj::Borrowed(&reservation_update),
2110 );
2111 root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());
2112
2113 let layers = layers_from_handles([layer_object_handle]).await?;
2114 transaction
2115 .commit_with_callback(|_| {
2116 self.allocator.tree.set_layers(layers);
2117
2118 let mut inner = self.allocator.inner.lock();
2122 inner.info = new_info;
2123 for owner_id in marked_for_deletion {
2124 inner.marked_for_deletion.remove(&owner_id);
2125 }
2126 })
2127 .await?;
2128
2129 for layer in layer_set.layers {
2131 let object_id = layer.handle().map(|h| h.object_id());
2132 layer.close_layer().await;
2133 if let Some(object_id) = object_id {
2134 root_store.tombstone_object(object_id, txn_options).await?;
2135 }
2136 }
2137
2138 let mut counters = self.allocator.counters.lock();
2139 counters.num_flushes += 1;
2140 counters.last_flush_time = Some(std::time::SystemTime::now());
2141 counters.persistent_layer_file_sizes = layer_file_sizes;
2142 Ok(self.allocator.tree.get_earliest_version())
2144 }
2145}
2146
2147#[cfg(test)]
2148mod tests {
2149 use crate::filesystem::{
2150 FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2151 };
2152 use crate::fsck::fsck;
2153 use crate::lsm_tree::cache::NullCache;
2154 use crate::lsm_tree::skip_list_layer::SkipListLayer;
2155 use crate::lsm_tree::types::{Item, ItemRef, Layer, LayerIterator};
2156 use crate::lsm_tree::{LSMTree, Query};
2157 use crate::object_handle::ObjectHandle;
2158 use crate::object_store::allocator::merge::merge;
2159 use crate::object_store::allocator::{
2160 Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2161 };
2162 use crate::object_store::transaction::{lock_keys, Options, TRANSACTION_METADATA_MAX_AMOUNT};
2163 use crate::object_store::volume::root_volume;
2164 use crate::object_store::{Directory, LockKey, ObjectStore, NO_OWNER};
2165 use crate::range::RangeExt;
2166 use crate::round::round_up;
2167 use fuchsia_async as fasync;
2168 use fuchsia_sync::Mutex;
2169 use std::cmp::{max, min};
2170 use std::ops::{Bound, Range};
2171 use std::sync::Arc;
2172 use storage_device::fake_device::FakeDevice;
2173 use storage_device::DeviceHolder;
2174
2175 #[fuchsia::test]
2176 async fn test_coalescing_iterator() {
2177 let skip_list = SkipListLayer::new(100);
2178 let items = [
2179 Item::new(
2180 AllocatorKey { device_range: 0..100 },
2181 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2182 ),
2183 Item::new(
2184 AllocatorKey { device_range: 100..200 },
2185 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2186 ),
2187 ];
2188 skip_list.insert(items[1].clone()).expect("insert error");
2189 skip_list.insert(items[0].clone()).expect("insert error");
2190 let mut iter =
2191 CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
2192 .await
2193 .expect("new failed");
2194 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2195 assert_eq!(
2196 (key, value),
2197 (
2198 &AllocatorKey { device_range: 0..200 },
2199 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2200 )
2201 );
2202 iter.advance().await.expect("advance failed");
2203 assert!(iter.get().is_none());
2204 }
2205
2206 #[fuchsia::test]
2207 async fn test_merge_and_coalesce_across_three_layers() {
2208 let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2209 lsm_tree
2210 .insert(Item::new(
2211 AllocatorKey { device_range: 100..200 },
2212 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2213 ))
2214 .expect("insert error");
2215 lsm_tree.seal();
2216 lsm_tree
2217 .insert(Item::new(
2218 AllocatorKey { device_range: 0..100 },
2219 AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2220 ))
2221 .expect("insert error");
2222
2223 let layer_set = lsm_tree.layer_set();
2224 let mut merger = layer_set.merger();
2225 let mut iter =
2226 CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2227 .await
2228 .expect("new failed");
2229 let ItemRef { key, value, .. } = iter.get().expect("get failed");
2230 assert_eq!(
2231 (key, value),
2232 (
2233 &AllocatorKey { device_range: 0..200 },
2234 &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2235 )
2236 );
2237 iter.advance().await.expect("advance failed");
2238 assert!(iter.get().is_none());
2239 }

    #[fuchsia::test]
    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 100..200 },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: 0..100 },
                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 0..100 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            )
        );
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: 100..200 },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

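    // Returns the number of bytes by which ranges `a` and `b` overlap (0 if disjoint).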
    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
        if a.end > b.start && a.start < b.end {
            min(a.end, b.end) - max(a.start, b.start)
        } else {
            0
        }
    }

    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut allocations: Vec<Range<u64>> = Vec::new();
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            if let Some(r) = allocations.last() {
                assert!(device_range.start >= r.end);
            }
            allocations.push(device_range.clone());
            iter.advance().await.expect("advance failed");
        }
        allocations
    }

    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut found = 0;
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            let mut l = device_range.length().expect("Invalid range");
            found += l;
            for range in expected_allocations {
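                // Subtract whatever part of this range is covered by the expected
                // allocations; every allocated byte must be accounted for.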
                l -= overlap(range, device_range);
                if l == 0 {
                    break;
                }
            }
            assert_eq!(l, 0, "range {device_range:?} not covered by expectations");
            iter.advance().await.expect("advance failed");
        }
        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
    }

    async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
        let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let allocator = fs.allocator();
        (fs, allocator)
    }

    #[fuchsia::test]
    async fn test_allocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;

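        // The allocations we expect to find in a freshly-formatted filesystem (superblocks,
        // journal, etc.); this list will change if the initial layout changes.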
        let expected = vec![
            0..4096,
            4096..139264,
            139264..204800,
            204800..335872,
            335872..401408,
            524288..528384,
        ];
        assert_eq!(device_ranges, expected);
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
        transaction.commit().await.expect("commit failed");
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges[8].length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[6], &device_ranges[8]), 0);
        assert_eq!(overlap(&device_ranges[7], &device_ranges[8]), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_allocate_more_than_max_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(
            device_ranges.last().unwrap().length().expect("Invalid range"),
            allocator.max_extent_size_bytes
        );
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_deallocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let initial_allocations = collect_allocations(&allocator).await;

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let device_range1 = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");
        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
        transaction.commit().await.expect("commit failed");

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
            .await
            .expect("deallocate failed");
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &initial_allocations).await;
    }

    #[fuchsia::test]
    async fn test_mark_allocated() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut device_ranges = collect_allocations(&allocator).await;
        let range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
                .await
                .expect("allocate failed")
        };
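        // The transaction is dropped here without being committed, so the two blocks
        // allocated above are freed again.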

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

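        // This allocation should be placed at the start of the range that was just rolled
        // back.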
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );

        assert_eq!(device_ranges.last().unwrap().start, range.start);

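        // Explicitly mark the second block of the rolled-back range as allocated.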
        let mut range2 = range.clone();
        range2.start += fs.block_size();
        allocator
            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
            .await
            .expect("mark_allocated failed");
        device_ranges.push(range2);

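        // A further allocation must avoid both the block allocated above and the marked
        // range.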
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        let last_range = device_ranges.last().unwrap();
        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(last_range, &range), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_mark_for_deletion() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();
        let mut device_ranges = collect_allocations(&allocator).await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
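        // Allocate 30 ranges of 100 blocks each (3000 blocks in total) for STORE_OBJECT_ID.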
        for _ in 0..15 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate2 failed"),
            );
        }
        transaction.commit().await.expect("commit failed");
        check_allocations(&allocator, &device_ranges).await;

        assert_eq!(
            allocator.get_allocated_bytes(),
            initial_allocated_bytes + fs.block_size() * 3000
        );

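        // Drop all of STORE_OBJECT_ID's allocations at once.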
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID).await;
        transaction.commit().await.expect("commit failed");

        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
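        // The records still exist in the tree (and are visible to iteration) until compaction
        // removes them, even though they no longer count towards allocated bytes.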
        check_allocations(&allocator, &device_ranges).await;

        device_ranges.clear();
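        // Now allocate 1500 blocks for a different owner. The device is too small for this to
        // succeed unless the space marked for deletion above can be reused.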

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_bytes = 1500 * fs.block_size();
        while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
            let len = std::cmp::min(
                target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
                100 * fs.block_size(),
            );
            device_ranges.push(
                allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

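        // Flush the allocator; compaction should drop the records belonging to the owner that
        // was marked for deletion.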
        allocator.flush().await.expect("flush failed");

        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
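        // Only the new owner's accounting should remain.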
        assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes);
    }

    async fn create_file(store: &Arc<ObjectStore>, size: usize) {
        let root_directory =
            Directory::open(store, store.root_directory_object_id()).await.expect("open failed");

        let mut transaction = store
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let file = root_directory
            .create_child_file(&mut transaction, &format!("foo {}", size))
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        let buffer = file.allocate_buffer(size).await;

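        // Write the whole buffer in one transaction; borrow_metadata_space lets the
        // transaction proceed without taking its own metadata reservation.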
        let mut transaction = file
            .new_transaction_with_options(Options {
                borrow_metadata_space: true,
                ..Default::default()
            })
            .await
            .expect("new_transaction_with_options failed");
        file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
        transaction.commit().await.expect("commit failed");
    }

    #[fuchsia::test]
    async fn test_replay_with_deleted_store_and_compaction() {
        let (fs, _) = test_fs().await;

        const FILE_SIZE: usize = 10_000_000;

        {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store =
                root_vol.new_volume("vol", NO_OWNER, None).await.expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;
        }

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");

        fs.journal().compact().await.expect("compact failed");

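        // Delete and recreate the volume a couple of times, remounting in between, so that
        // replay has to cope with mutations referring to a store that has since been deleted.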
        for _ in 0..2 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_vol.volume_directory().store().store_object_id(),
                            root_vol.volume_directory().object_id(),
                        )],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                let store =
                    root_vol.new_volume("vol", NO_OWNER, None).await.expect("new_volume failed");
                create_file(&store, FILE_SIZE).await;
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test]
    async fn test_delete_multiple_volumes() {
        let (mut fs, _) = test_fs().await;

        for _ in 0..50 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                let store =
                    root_vol.new_volume("vol", NO_OWNER, None).await.expect("new_volume failed");

                create_file(&store, 1_000_000).await;

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![LockKey::object(
                            root_vol.volume_directory().store().store_object_id(),
                            root_vol.volume_directory().object_id(),
                        )],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                fs.allocator().flush().await.expect("flush failed");
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test]
    async fn test_allocate_free_reallocate() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let mut device_ranges = Vec::new();
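        // Allocate 30 ranges of 100 blocks each (3000 blocks in total).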
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for _ in 0..30 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 3000,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );

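        // Free everything again.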
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for range in std::mem::replace(&mut device_ranges, Vec::new()) {
            allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());

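        // Reallocate 1500 blocks and check that the owner's accounting tracks it exactly.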
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_len = 1500 * fs.block_size();
        while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
            let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, len)
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 1500,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );
    }

    #[fuchsia::test]
    async fn test_flush() {
        const STORE_OBJECT_ID: u64 = 99;

        let mut device_ranges = Vec::new();
        let device = {
            let (fs, allocator) = test_fs().await;
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

        let allocated = collect_allocations(&allocator).await;

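        // Everything allocated before the remount should still be allocated afterwards.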
        for i in &device_ranges {
            let mut overlapping = 0;
            for j in &allocated {
                overlapping += overlap(i, j);
            }
            assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
        }

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let range = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");

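        // A fresh allocation must not overlap anything that was already allocated.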
        for r in &allocated {
            assert_eq!(overlap(r, &range), 0);
        }
        transaction.commit().await.expect("commit failed");
    }

    #[fuchsia::test]
    async fn test_dropped_transaction() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed")
        };
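        // The transaction above was dropped, not committed, so the allocation was rolled back
        // and the same range should be handed out again.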
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        assert_eq!(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
            allocated_range
        );
    }

    #[fuchsia::test]
    async fn test_cleanup_removed_owner() {
        const STORE_OBJECT_ID: u64 = 99;
        let device = {
            let (fs, allocator) = test_fs().await;

            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("Allocating");
                transaction.commit().await.expect("Committing.");
            }
            allocator.flush().await.expect("Flushing");
            assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID).await;
                transaction.commit().await.expect("Committing.");
            }
            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            fs.close().await.expect("Closing");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();
        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
    }

    #[fuchsia::test]
    async fn test_allocated_bytes() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();

        let allocated_bytes = initial_allocated_bytes + fs.block_size();
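        // The allocated byte count should update as soon as the transaction commits.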
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");
            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
            range
        };

        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");

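            // This transaction is dropped at the end of the scope, so the allocation above
            // never commits and must not affect the allocated byte count.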
            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
        }

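        // After the transaction is dropped, the count is still unchanged.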
        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

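        // Deallocate a sub-range whose bounds are deliberately not block-aligned.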
        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
            .await
            .expect("deallocate failed");

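        // Nothing changes until the deallocation commits.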
        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

        transaction.commit().await.expect("commit failed");

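        // Only the 20 bytes at each end of the original range (40 in total) remain allocated.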
        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
    }

    #[fuchsia::test]
    async fn test_persist_bytes_limit() {
        const LIMIT: u64 = 12345;
        const OWNER_ID: u64 = 12;

        let (fs, allocator) = test_fs().await;
        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
                .await
                .expect("Failed to set limit.");
            assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
            transaction.commit().await.expect("Failed to commit transaction");
            let bytes: u64 = *allocator
                .inner
                .lock()
                .info
                .limit_bytes
                .get(&OWNER_ID)
                .expect("Failed to find limit");
            assert_eq!(LIMIT, bytes);
        }
    }

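    // Merges contiguous ranges; assumes the input is sorted by start and non-overlapping.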
    fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
        let mut coalesced = Vec::new();
        let mut prev: Option<Range<u64>> = None;
        for range in ranges {
            if let Some(prev_range) = &mut prev {
                if range.start == prev_range.end {
                    prev_range.end = range.end;
                } else {
                    coalesced.push(prev_range.clone());
                    prev = Some(range);
                }
            } else {
                prev = Some(range);
            }
        }
        if let Some(prev_range) = prev {
            coalesced.push(prev_range);
        }
        coalesced
    }

    #[fuchsia::test]
    async fn test_take_for_trimming() {
        const STORE_OBJECT_ID: u64 = 99;

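        // Allocate a large range and then free selected sub-ranges of it, so the device ends
        // up with a known set of free extents inside that range.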
        let allocated_range;
        let expected_free_ranges;
        let device = {
            let (fs, allocator) = test_fs().await;
            let bs = fs.block_size();
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocated_range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");

            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            let base = allocated_range.start;
            expected_free_ranges = vec![
                base..(base + (bs * 1)),
                (base + (bs * 2))..(base + (bs * 3)),
                (base + (bs * 4))..(base + (bs * 8)),
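                // This entry and the next two form one contiguous free region; it is listed
                // in pieces because take_for_trimming caps extents at max_extent_size.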
                (base + (bs * 8))..(base + (bs * 12)),
                (base + (bs * 12))..(base + (bs * 13)),
                (base + (bs * 29))..(base + (bs * 30)),
            ];
            for range in &expected_free_ranges {
                allocator
                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
                    .await
                    .expect("deallocate failed");
            }
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

        let max_extent_size = fs.block_size() as usize * 4;
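        // With a small batch size, several take_for_trimming calls are needed to cover the
        // device, which exercises the batching logic.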
        const EXTENTS_PER_BATCH: usize = 2;
        let mut free_ranges = vec![];
        let mut offset = allocated_range.start;
        while offset < allocated_range.end {
            let free = allocator
                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
                .await
                .expect("take_for_trimming failed");
            free_ranges.extend(
                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
            );
            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
        }
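        // The trim batches may split extents differently to expected_free_ranges, so coalesce
        // both lists before comparing them.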
        let coalesced_free_ranges = coalesce_ranges(free_ranges);
        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);

        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
    }

    #[fuchsia::test]
    async fn test_allocations_wait_for_free_extents() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocator_clone = allocator.clone();

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

        let max_extent_size = fs.device().size() as usize;
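        // Grab every free extent in a single batch, leaving nothing to allocate from.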
        const EXTENTS_PER_BATCH: usize = usize::MAX;

        let trim_done = Arc::new(Mutex::new(false));
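        // While `trimmable_extents` is alive, its extents are reserved for trimming, so the
        // allocation in the task below must block until it is dropped; `trim_done` lets the
        // task assert that it didn't complete early.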
        let trimmable_extents = allocator
            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
            .await
            .expect("take_for_trimming failed");

        let trim_done_clone = trim_done.clone();
        let bs = fs.block_size();
        let alloc_task = fasync::Task::spawn(async move {
            allocator_clone
                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
                .await
                .expect("allocate failed");
            {
                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
            }
            transaction.commit().await.expect("commit failed");
        });

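        // Give the allocation task a chance to run; it should block rather than complete.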
        fasync::Timer::new(std::time::Duration::from_millis(100)).await;

        {
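            // Drop the trim batch and set `trim_done` while holding the lock, so the
            // unblocked allocation cannot observe `trim_done == false`.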
            let mut trim_done = trim_done.lock();
            std::mem::drop(trimmable_extents);
            *trim_done = true;
        }

        alloc_task.await;
    }

    #[fuchsia::test]
    async fn test_allocation_with_reservation_not_multiple_of_block_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
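        // Deliberately reserve an amount that is not a multiple of the block size.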
        let reservation =
            allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![],
                Options { allocator_reservation: Some(&reservation), ..Options::default() },
            )
            .await
            .expect("new failed");

        let range = allocator
            .allocate(
                &mut transaction,
                STORE_OBJECT_ID,
                round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
            )
            .await
            .expect("allocate failed");
        assert_eq!((range.end - range.start) % fs.block_size(), 0);

        println!("{}", range.end - range.start);
    }
}