// fxfs/object_store/allocator.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! # The Allocator
//!
//! The allocator in Fxfs is a filesystem-wide entity responsible for managing the allocation of
//! regions of the device to "owners" (which are `ObjectStore`s).
//!
//! Allocations are tracked in an LSMTree with coalescing used to merge neighboring allocations
//! with the same properties (owner and reference count). At the time of writing, reference
//! counting is not used. (Reference counts are intended for future use if/when snapshot support
//! is implemented.)
//!
//! There are a number of important implementation features of the allocator that warrant further
//! documentation.
//!
//! ## Byte limits
//!
//! Fxfs is a multi-volume filesystem. Fuchsia with fxblob currently uses two primary volumes:
//! an unencrypted volume for blob storage and an encrypted volume for data storage.
//! Byte limits ensure that no one volume can consume all available storage. This is important
//! as software updates must always be possible (blobs) and conversely configuration data should
//! always be writable (data).
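/*!
The per-volume limit check can be sketched as follows. This is a minimal illustration under
stated assumptions: `OwnerUsage` and `bytes_left` are hypothetical names, and the accounting is
reduced to two counters per owner. An owner with no configured limit is treated as unlimited.

```rust
use std::collections::BTreeMap;
use std::num::Saturating;

/// Hypothetical, simplified per-owner accounting (not the real Fxfs types).
struct OwnerUsage {
    allocated: u64,
    reserved: u64,
}

/// Returns the bytes `owner` may still consume. Owners without an entry in `limits` are
/// treated as having a limit of `u64::MAX`.
fn bytes_left(
    limits: &BTreeMap<u64, u64>,
    usage: &BTreeMap<u64, OwnerUsage>,
    owner: u64,
) -> u64 {
    let limit = limits.get(&owner).copied().unwrap_or(u64::MAX);
    let used = usage
        .get(&owner)
        .map_or(Saturating(0u64), |u| Saturating(u.allocated) + Saturating(u.reserved));
    // Saturating subtraction clamps at zero if usage already exceeds the limit.
    (Saturating(limit) - used).0
}
```

Saturating arithmetic is used here so that inconsistent counters clamp to zero or `u64::MAX`
rather than panic on overflow.
*/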
//!
//! ## Reservation tracking
//!
//! Fxfs on Fuchsia leverages write-back caching which allows us to buffer writes in RAM until
//! memory pressure, an explicit flush or a file close requires us to persist data to storage.
//!
//! To ensure that we do not over-commit in such cases (e.g. by writing many files to RAM only to
//! find out that there is insufficient disk space to store them all), the allocator includes a
//! simple reservation tracking mechanism.
//!
//! Reservation tracking is implemented hierarchically such that a reservation can portion out
//! sub-reservations, each of which may be "reserved" or "forgotten" when it comes time to actually
//! allocate space.
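/*!
The hierarchy described above can be sketched as follows. This is a minimal, single-threaded
illustration only: `SketchReservation` and its methods are hypothetical names rather than the
`ReservationImpl` API defined in this file, and owners and locking are omitted.

```rust
/// Hypothetical, single-threaded stand-in for a reservation.
#[derive(Debug)]
struct SketchReservation {
    amount: u64,   // Bytes currently held by this reservation.
    reserved: u64, // Bytes set aside by outstanding sub-reservations.
}

impl SketchReservation {
    fn new(amount: u64) -> Self {
        Self { amount, reserved: 0 }
    }

    /// Sets aside exactly `amount` bytes; returns false if too little is unreserved.
    fn reserve(&mut self, amount: u64) -> bool {
        if self.amount - self.reserved < amount {
            return false;
        }
        self.reserved += amount;
        true
    }

    /// The sub-reservation was used for a real allocation: the bytes leave the reservation.
    fn commit(&mut self, amount: u64) {
        assert!(amount <= self.reserved);
        self.reserved -= amount;
        self.amount -= amount;
    }

    /// The sub-reservation was dropped unused: the bytes can be reserved again.
    fn release(&mut self, amount: u64) {
        assert!(amount <= self.reserved);
        self.reserved -= amount;
    }
}
```

A sub-reservation that is used for an allocation is committed (both counters shrink), whereas one
that is dropped unused is released (only `reserved` shrinks).
*/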
//!
//! ## Fixed locations
//!
//! The only fixed location files in Fxfs are the first 512 KiB extents of the two superblocks that
//! exist as the first things on the disk (i.e. the first 1 MiB). The allocator manages these
//! locations, but does so using a `mark_allocated` method distinct from all other allocations in
//! which the location is left up to the allocator.
//!
//! ## Deallocated unusable regions
//!
//! It is not legal to reuse a deallocated disk region until after a flush. Transactions
//! are not guaranteed to be persisted until after a successful flush so any reuse of
//! a region before then followed by power loss may lead to corruption.
//!
//! For example, consider what happens if we deallocate a file, reuse its extent for another file,
//! then crash after writing to the new file but before flushing the journal. At next mount we will
//! have lost the transaction despite overwriting the original file's data, leading to
//! inconsistency errors (data corruption).
//!
//! These regions are currently tracked in RAM in the allocator.
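/*!
One way to picture the "no reuse until flushed" rule is a queue of deallocations tagged with the
journal offset at which they were committed. The sketch below is an assumption-laden
illustration: `PendingFrees`, `deallocate` and `flushed` are hypothetical names, and the
convention that a flush persists everything strictly before `flushed_offset` is chosen for the
example.

```rust
use std::collections::VecDeque;
use std::ops::Range;

/// Hypothetical tracker for deallocated ranges that must not be reused yet.
#[derive(Default)]
struct PendingFrees {
    // (journal offset at which the deallocation was committed, device range)
    pending: VecDeque<(u64, Range<u64>)>,
}

impl PendingFrees {
    /// Records a committed deallocation. Offsets are assumed to be non-decreasing.
    fn deallocate(&mut self, log_offset: u64, range: Range<u64>) {
        self.pending.push_back((log_offset, range));
    }

    /// Called after a flush that persisted the journal strictly before `flushed_offset`.
    /// Returns the ranges that are now safe to reuse.
    fn flushed(&mut self, flushed_offset: u64) -> Vec<Range<u64>> {
        let mut reusable = Vec::new();
        while self.pending.front().is_some_and(|(offset, _)| *offset < flushed_offset) {
            if let Some((_, range)) = self.pending.pop_front() {
                reusable.push(range);
            }
        }
        reusable
    }
}
```

Until `flushed` returns a range, an allocator following this scheme would treat it as still in
use.
*/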
//!
//! ## Allocated but uncommitted regions
//!
//! These are allocated regions that are not (yet) persisted to disk. They are regular
//! file allocations, but are not stored on persistent storage until their transaction is committed
//! and safely flushed to disk.
//!
//! ## TRIMed unusable regions
//!
//! We periodically TRIM unallocated regions to give the SSD controller insight into which
//! parts of the device contain data. We must avoid using temporary TRIM allocations that are held
//! while we perform these operations.
//!
//! ## Volume deletion
//!
//! We make use of an optimisation in the case where an entire volume is deleted. In such cases,
//! rather than individually deallocate all disk regions associated with that volume, we make
//! note of the deletion and perform a special merge operation on the next LSMTree compaction that
//! filters out allocations for the deleted volume.
//!
//! This is designed to make dropping of volumes significantly cheaper, but it does add some
//! additional complexity if implementing an allocator that implements data structures to track
//! free space (rather than just allocated space).
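/*!
The filtering idea can be sketched as below. This is only an illustration: `Record` and
`compact` are hypothetical, and a flat vector stands in for the LSM tree layers that a real
compaction would merge.

```rust
use std::collections::HashSet;

/// Hypothetical flattened allocation record: (device start, device end, owner object id).
type Record = (u64, u64, u64);

/// Drops every record owned by a volume that has been marked for deletion.
fn compact(records: Vec<Record>, marked_for_deletion: &HashSet<u64>) -> Vec<Record> {
    records
        .into_iter()
        .filter(|(_, _, owner)| !marked_for_deletion.contains(owner))
        .collect()
}
```

Deleting a volume then costs one set insertion up front, with the per-extent work deferred to a
compaction that has to read those records anyway.
*/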
//!
//! ## Image generation
//!
//! The Fuchsia build process requires building an initial filesystem image. In the case of
//! fxblob-based boards, this is an Fxfs filesystem containing a volume with the base set of
//! blobs required to bootstrap the system. When we build such an image, we want it to be as compact
//! as possible as we're potentially packaging it up for distribution. To that end, our allocation
//! strategy (or at least the strategy used for image generation) should prefer to allocate from the
//! start of the device wherever possible.
pub mod merge;
pub mod strategy;

use crate::drop_event::DropEvent;
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::skip_list_layer::SkipListLayer;
use crate::lsm_tree::types::{
    FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
    OrdUpperBound, SortByU64, Value,
};
use crate::lsm_tree::{LSMTree, Query, compact_with_iterator, layers_from_handles};
use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
use crate::object_store::object_manager::ReservationUpdate;
use crate::object_store::transaction::{
    AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
};
use crate::object_store::{
    DataObjectHandle, DirectWriter, ExtentKey, HandleOptions, ObjectStore, ReservedId, tree,
};
use crate::range::RangeExt;
use crate::round::{round_div, round_down, round_up};
use crate::serialized_types::{
    DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use either::Either::{Left, Right};
use event_listener::EventListener;
use fprint::TypeFingerprint;
use fuchsia_inspect::HistogramProperty;
use fuchsia_sync::Mutex;
use futures::FutureExt;
use fxfs_macros::SerializeKey;
use merge::{filter_marked_for_deletion, filter_tombstones, merge};
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::hash::Hash;
use std::marker::PhantomData;
use std::num::{NonZero, Saturating};
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};

/// This trait is implemented by things that own reservations.
pub trait ReservationOwner: Send + Sync {
    /// Report that bytes are being released from the reservation back to the |ReservationOwner|
    /// where |owner_object_id| is the owner under the root object store associated with the
    /// reservation.
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
}

/// A reservation guarantees that when it comes time to actually allocate, it will not fail due to
/// lack of space.  Sub-reservations (a.k.a. holds) are possible which effectively allows part of a
/// reservation to be set aside until it's time to commit.  Reservations do offer some
/// thread-safety, but some responsibility is borne by the caller: e.g. calling `forget` and
/// `reserve` at the same time from different threads is unsafe. Reservations have an
/// |owner_object_id| which associates them with an object under the root object store that the
/// reservation is accounted against.
pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
    owner: T,
    owner_object_id: Option<u64>,
    inner: Mutex<ReservationInner>,
    phantom: PhantomData<U>,
}

#[derive(Debug, Default)]
struct ReservationInner {
    // Amount currently held by this reservation.
    amount: u64,

    // Amount reserved by sub-reservations.
    reserved: u64,
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner.lock().fmt(f)
    }
}

impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
    pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
        Self {
            owner,
            owner_object_id,
            inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
            phantom: PhantomData,
        }
    }

    pub fn owner_object_id(&self) -> Option<u64> {
        self.owner_object_id
    }

    /// Returns the total amount of the reservation, not accounting for anything that might be held.
    pub fn amount(&self) -> u64 {
        self.inner.lock().amount
    }

    /// Adds more to the reservation.
    pub fn add(&self, amount: u64) {
        self.inner.lock().amount += amount;
    }

    /// Returns the entire amount of the reservation.  The caller is responsible for maintaining
    /// consistency, i.e. updating counters, etc, and there can be no sub-reservations (an assert
    /// will fire otherwise).
    pub fn forget(&self) -> u64 {
        let mut inner = self.inner.lock();
        assert_eq!(inner.reserved, 0);
        std::mem::take(&mut inner.amount)
    }

    /// Takes some of the reservation.  The caller is responsible for maintaining consistency,
    /// i.e. updating counters, etc.  This will assert that the amount being forgotten does not
    /// exceed the available reservation amount; the caller should ensure that this is the case.
    pub fn forget_some(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Returns a partial amount of the reservation. |amount| is called with the limit (the amount
    /// not already reserved) and should return the amount to take, which can be zero.
    fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
        let mut inner = self.inner.lock();
        let taken = amount(inner.amount - inner.reserved);
        inner.reserved += taken;
        ReservationImpl::new(self, self.owner_object_id, taken)
    }

    /// Reserves *exactly* amount if possible.
    pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
        let mut inner = self.inner.lock();
        if inner.amount - inner.reserved < amount {
            None
        } else {
            inner.reserved += amount;
            Some(ReservationImpl::new(self, self.owner_object_id, amount))
        }
    }

    /// Commits a previously reserved amount from this reservation.  The caller is responsible for
    /// ensuring the amount was reserved.
    pub fn commit(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.reserved -= amount;
        inner.amount -= amount;
    }

    /// Returns some of the reservation.
    pub fn give_back(&self, amount: u64) {
        self.owner.borrow().release_reservation(self.owner_object_id, amount);
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Moves `amount` from this reservation to another reservation.
    pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
        &self,
        other: &ReservationImpl<V, W>,
        amount: u64,
    ) {
        assert_eq!(self.owner_object_id, other.owner_object_id());
        let mut inner = self.inner.lock();
        if let Some(amount) = inner.amount.checked_sub(amount) {
            inner.amount = amount;
        } else {
            std::mem::drop(inner);
            panic!("Insufficient reservation space");
        }
        other.add(amount);
    }
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
    fn drop(&mut self) {
        let inner = self.inner.get_mut();
        assert_eq!(inner.reserved, 0);
        let owner_object_id = self.owner_object_id;
        if inner.amount > 0 {
            self.owner
                .borrow()
                .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
        }
    }
}

impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
    for ReservationImpl<T, U>
{
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
        // Sub-reservations should belong to the same owner (or lack thereof).
        assert_eq!(owner_object_id, self.owner_object_id);
        let mut inner = self.inner.lock();
        assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
        inner.reserved -= amount;
    }
}

pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;

pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;

/// Our allocator implementation tracks extents with a reference count.  At time of writing, these
/// reference counts should never exceed 1, but that might change with snapshots and clones.
pub type AllocatorKey = AllocatorKeyV32;

#[derive(
    Clone,
    Debug,
    Deserialize,
    Eq,
    Hash,
    PartialEq,
    Serialize,
    TypeFingerprint,
    SerializeKey,
    Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct AllocatorKeyV32 {
    pub device_range: ExtentKey,
}

impl SortByU64 for AllocatorKey {
    fn get_leading_u64(&self) -> u64 {
        self.device_range.end
    }
}

const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;

pub struct AllocatorKeyPartitionIterator {
    device_range: Range<u64>,
}

impl Iterator for AllocatorKeyPartitionIterator {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        if self.device_range.start >= self.device_range.end {
            None
        } else {
            let start = self.device_range.start;
            self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
            let end = std::cmp::min(self.device_range.start, self.device_range.end);
            let key = AllocatorKey { device_range: ExtentKey::new(start..end) };
            let hash = crate::stable_hash::stable_hash(key);
            Some(hash)
        }
    }
}

impl FuzzyHash for AllocatorKey {
    fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
        AllocatorKeyPartitionIterator {
            device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
                ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
        }
    }

    fn is_range_key(&self) -> bool {
        true
    }
}
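/*
The bucketing behind `fuzzy_hash` above can be sketched without the LSM tree types. This is a
minimal illustration under stated assumptions: `BUCKET` and `bucket_ranges` are hypothetical
names, and where the real iterator yields a stable hash of each bucket-aligned key, this sketch
returns the bucket ranges themselves.

```rust
use std::ops::Range;

/// Bucket size used by this sketch (1 MiB, like the constant above).
const BUCKET: u64 = 1024 * 1024;

/// Splits `range` into bucket-aligned sub-ranges, one per bucket that it touches.
fn bucket_ranges(range: Range<u64>) -> Vec<Range<u64>> {
    // Round the start down and the end up to bucket boundaries.
    let mut start = range.start / BUCKET * BUCKET;
    let end = range.end.div_ceil(BUCKET).saturating_mul(BUCKET);
    let mut out = Vec::new();
    while start < end {
        let next = start.saturating_add(BUCKET);
        out.push(start..next.min(end));
        start = next;
    }
    out
}
```

Two device ranges that overlap always touch at least one common bucket, so hashing each bucket
gives them at least one hash value in common.
*/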

impl AllocatorKey {
    /// Returns a new key that is a lower bound suitable for use with merge_into.
    pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
        AllocatorKey { device_range: ExtentKey::new(0..self.device_range.start) }
    }
}

impl LayerKey for AllocatorKey {
    fn merge_type(&self) -> MergeType {
        MergeType::OptimizedMerge
    }

    fn next_key(&self) -> Option<Self> {
        Some(Self { device_range: ExtentKey::new(0..self.device_range.end + 1) })
    }

    fn search_key(&self) -> Option<Self> {
        Some(Self { device_range: ExtentKey::new(0..self.device_range.start + 1) })
    }

    fn is_search_key(&self) -> bool {
        self.device_range.start == 0
    }

    fn overlaps(&self, other: &Self) -> bool {
        self.device_range.start < other.device_range.end
            && self.device_range.end > other.device_range.start
    }
}

impl OrdUpperBound for AllocatorKey {
    fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .end
            .cmp(&other.device_range.end)
            .then(self.device_range.start.cmp(&other.device_range.start))
    }
}

impl OrdLowerBound for AllocatorKey {
    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        // The ordering over range.end is significant here as it is used in
        // the heap ordering that feeds into our merge function and
        // a total ordering over range lets us remove a symmetry case from
        // the allocator merge function.
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
    }
}

impl Ord for AllocatorKey {
    fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .start
            .cmp(&other.device_range.start)
            .then(self.device_range.end.cmp(&other.device_range.end))
    }
}

impl PartialOrd for AllocatorKey {
    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

/// Allocations are "owned" by a single ObjectStore and are reference counted
/// (for future snapshot/clone support).
pub type AllocatorValue = AllocatorValueV32;
impl Value for AllocatorValue {
    const DELETED_MARKER: Self = Self::None;
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorValueV32 {
    // Tombstone variant indicating an extent is no longer allocated.
    None,
    // Used when we know there are no possible allocations below us in the stack.
    // This is currently all the time. We used to have a related Delta type but
    // it has been removed due to correctness issues (https://fxbug.dev/42179428).
    Abs { count: u64, owner_object_id: u64 },
}

pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;

/// Serialized information about the allocator.
pub type AllocatorInfo = AllocatorInfoV32;

#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
pub struct AllocatorInfoV32 {
    /// Holds the set of layer file object_id for the LSM tree (newest first).
    pub layers: Vec<u64>,
    /// Maps from owner_object_id to bytes allocated.
    pub allocated_bytes: BTreeMap<u64, u64>,
    /// Set of owner_object_id that we should ignore if found in layer files.  For now, this should
    /// always be empty on-disk because we always do full compactions.
    pub marked_for_deletion: HashSet<u64>,
    /// Maps from owner_object_id to the limit on the number of allocated bytes for that owner.  If
    /// there is no limit present here for an `owner_object_id`, assume it is max u64.
    pub limit_bytes: BTreeMap<u64, u64>,
}

const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;

/// Computes the target maximum extent size based on the block size of the allocator.
pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
    // Each block in an extent contains an 8-byte checksum (which due to varint encoding is 9
    // bytes), and a given extent record must be no larger than DEFAULT_MAX_SERIALIZED_RECORD_SIZE.
    // We also need to leave a bit of room (arbitrarily, 64 bytes) for the rest of the extent's
    // metadata.
    block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
}
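/*
A worked instance of the formula above. This is only a sketch: `max_extent_size` is a
hypothetical helper that takes the record-size limit as a parameter, and the 4096-byte limit used
in the note below is an assumed value for illustration, not necessarily the value of
`DEFAULT_MAX_SERIALIZED_RECORD_SIZE`.

```rust
/// Same arithmetic as above, with the record-size limit passed in explicitly.
fn max_extent_size(block_size: u64, max_record_size: u64) -> u64 {
    // Set aside 64 bytes for other metadata, then budget 9 checksum bytes per block.
    block_size * (max_record_size - 64) / 9
}
```

With 4096-byte blocks and an assumed 4096-byte record limit, this gives
4096 * 4032 / 9 = 1_835_008 bytes, i.e. 448 blocks per extent.
*/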

#[derive(Default)]
struct AllocatorCounters {
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}

pub struct Allocator {
    filesystem: Weak<FxFilesystem>,
    block_size: u64,
    device_size: u64,
    object_id: u64,
    max_extent_size_bytes: u64,
    tree: LSMTree<AllocatorKey, AllocatorValue>,
    // A list of allocations which are temporary; i.e. they are not actually stored in the LSM tree,
    // but we still don't want to be able to allocate over them.  This layer is merged into the
    // allocator's LSM tree whilst reading it, so the allocations appear to exist in the LSM tree.
    // This is used in a few places, for example to hold back allocations that have been added to a
    // transaction but are not yet committed, or to prevent the allocation of a deleted extent
    // until the device is flushed.
    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    inner: Mutex<Inner>,
    allocation_mutex: futures::lock::Mutex<()>,
    counters: Mutex<AllocatorCounters>,
    maximum_offset: AtomicU64,
    allocations_allowed: AtomicBool,
}

/// Tracks the different stages of byte allocations for an individual owner.
#[derive(Debug, Default, PartialEq)]
struct ByteTracking {
    /// This value is the up-to-date count of the number of allocated bytes per owner_object_id
    /// whereas the value in `Info::allocated_bytes` is the value as it was when we last flushed.
    /// This field should be regarded as *untrusted*; it can be invalid due to filesystem
    /// inconsistencies, and this is why it is `Saturating<u64>` rather than just `u64`.  Any
    /// computations that use this value should typically propagate `Saturating` so that it is clear
    /// the value is *untrusted*.
    allocated_bytes: Saturating<u64>,

    /// This value is the number of bytes allocated to uncommitted allocations.
    /// (Bytes allocated, but not yet persisted to disk)
    uncommitted_allocated_bytes: u64,

    /// This value is the number of bytes allocated to reservations.
    reserved_bytes: u64,

    /// This value is the number of bytes in committed deallocations that we cannot reuse until
    /// they are flushed to the device.
    committed_deallocated_bytes: u64,
}

impl ByteTracking {
    // Returns the total number of bytes that are taken either from reservations, allocations or
    // uncommitted allocations.
    fn used_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
    }

    // Returns the amount that is not available to be allocated, which includes actually allocated
    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
    // reuse those bytes yet.
    fn unavailable_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.committed_deallocated_bytes)
    }

    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
    // require a device flush to be reused.
    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
    }
}

#[derive(Debug)]
struct CommittedDeallocation {
    // The offset at which this deallocation was committed.
    log_file_offset: u64,
    // The device range being deallocated.
    range: Range<u64>,
    // The owning object id which originally allocated it.
    owner_object_id: u64,
}

struct Inner {
    info: AllocatorInfo,

    /// The allocator can only be opened if there have been no allocations and it has not already
    /// been opened or initialized.
    opened: bool,

    /// When we allocate a range from RAM, we add it to Allocator::temporary_allocations.
    /// This layer is added to layer_set in rebuild_strategy and take_for_trimming so we don't
    /// assume the range is available.
    /// If we apply the mutation, we move it from `temporary_allocations` to the LSMTree.
    /// If we drop the mutation, we just delete it from `temporary_allocations` and call `free`.
    ///
    /// We need to be very careful about races when we move the allocation into the LSMTree.
    /// A layer_set is a snapshot in time of a set of layers. Say:
    ///  1. `rebuild_strategy` takes a layer_set that includes the mutable layer.
    ///  2. A compaction operation seals that mutable layer and creates a new mutable layer.
    ///     Note that the layer_set in rebuild_strategy doesn't have this new mutable layer.
    ///  3. An apply_mutation operation adds the allocation to the new mutable layer.
    ///     It then removes the allocation from `temporary_allocations`.
    ///  4. `rebuild_strategy` iterates the layer_set, missing both the temporary mutation AND
    ///     the record in the new mutable layer, making the allocated range available for double
    ///     allocation and future filesystem corruption.
    ///
    /// We don't have (or want) a means to lock compactions during rebuild_strategy operations so
    /// to avoid this scenario, we can't remove from temporary_allocations when we apply a mutation.
    /// Instead we add them to dropped_temporary_allocations and make the actual removals from
    /// `temporary_allocations` while holding the allocator lock. Because rebuild_strategy only
    /// runs with this lock held, this guarantees that we don't remove entries from
    /// `temporary_allocations` while also iterating over it.
    ///
    /// A related issue happens when we deallocate a range. We use temporary_allocations this time
    /// to prevent reuse until after the deallocation has been successfully flushed.
    /// In this scenario we don't want to call 'free()' until the range is ready to use again.
    dropped_temporary_allocations: Vec<Range<u64>>,

    /// The per-owner counters for bytes at various stages of the data life-cycle. From initial
    /// reservation through until the bytes are unallocated and eventually uncommitted.
    owner_bytes: BTreeMap<u64, ByteTracking>,

    /// This value is the number of bytes allocated to reservations but not tracked as part of a
    /// particular volume.
    unattributed_reserved_bytes: u64,

    /// Committed deallocations that we cannot use until they are flushed to the device.
    committed_deallocated: VecDeque<CommittedDeallocation>,

    /// Bytes which are currently being trimmed.  These can still be allocated from, but the
    /// allocation will block until the current batch of trimming is finished.
    trim_reserved_bytes: u64,

    /// While a trim is being performed, this listener is set.  When the current batch of extents
    /// being trimmed have been released (and trim_reserved_bytes is 0), this is signaled.
    /// This should only be listened to while the allocation_mutex is held.
    trim_listener: Option<EventListener>,

    /// This controls how we allocate our free space to manage fragmentation.
    strategy: strategy::BestFit,

    /// Tracks the number of allocations of size 1,2,...63,>=64.
    allocation_size_histogram: [u64; 64],
    /// Tracks which size bucket triggers rebuild_strategy.
    rebuild_strategy_trigger_histogram: [u64; 64],

    /// This is distinct from the set contained in `info`.  New entries are inserted *after* a
    /// device has been flushed (it is not safe to reuse the space taken by a deleted volume prior
    /// to this) and are removed after a major compaction.
    marked_for_deletion: HashSet<u64>,

    /// The set of volumes deleted that are waiting for a sync.
    volumes_deleted_pending_sync: HashSet<u64>,
}

impl Inner {
    fn allocated_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.allocated_bytes;
        }
        total
    }

    fn uncommitted_allocated_bytes(&self) -> u64 {
        self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
    }

    fn reserved_bytes(&self) -> u64 {
        self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
            + self.unattributed_reserved_bytes
    }

    fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
        match self.info.limit_bytes.get(&owner_object_id) {
            Some(v) => *v,
            None => u64::MAX,
        }
    }

    fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
        let limit = self.owner_id_limit_bytes(owner_object_id);
        let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
        (Saturating(limit) - used).0
    }

    // Returns the amount that is not available to be allocated, which includes actually allocated
    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
    // reuse those bytes yet.
    fn unavailable_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.unavailable_bytes();
        }
        total
    }

    // Returns the total number of bytes that are taken either from reservations, allocations or
    // uncommitted allocations.
    fn used_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.used_bytes();
        }
        total + Saturating(self.unattributed_reserved_bytes)
    }

    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
    // require a device flush to be reused.
    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        let mut total = Saturating(0);
        for (_, bytes) in &self.owner_bytes {
            total += bytes.unavailable_after_sync_bytes();
        }
        total
    }

    // Returns the number of bytes which will be available after the current batch of trimming
    // completes.
    fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
        device_size
            .checked_sub(
                (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
            )
            .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
    }

    fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
        match owner_object_id {
            Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
            None => self.unattributed_reserved_bytes += amount,
        };
    }

    fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
        match owner_object_id {
            Some(owner) => {
                let owner_entry = self.owner_bytes.entry(owner).or_default();
                assert!(
                    owner_entry.reserved_bytes >= amount,
                    "{} >= {}",
                    owner_entry.reserved_bytes,
                    amount
                );
                owner_entry.reserved_bytes -= amount;
            }
            None => {
                assert!(
                    self.unattributed_reserved_bytes >= amount,
                    "{} >= {}",
                    self.unattributed_reserved_bytes,
                    amount
                );
                self.unattributed_reserved_bytes -= amount
            }
        };
    }
}
738
739/// A container for a set of extents which are known to be free and can be trimmed.  Returned by
740/// `take_for_trimming`.
741pub struct TrimmableExtents<'a> {
742    allocator: &'a Allocator,
743    extents: Vec<Range<u64>>,
744    // The allocator can subscribe to this event to wait until these extents are dropped.  This way,
745    // we don't fail an allocation attempt if blocks are tied up for trimming; rather, we just wait
746    // until the batch is finished with and then proceed.
747    _drop_event: DropEvent,
748}
749
750impl<'a> TrimmableExtents<'a> {
751    pub fn extents(&self) -> &Vec<Range<u64>> {
752        &self.extents
753    }
754
755    // Also returns an EventListener which is signaled when this is dropped.
756    fn new(allocator: &'a Allocator) -> (Self, EventListener) {
757        let drop_event = DropEvent::new();
758        let listener = drop_event.listen();
759        (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
760    }
761
762    fn add_extent(&mut self, extent: Range<u64>) {
763        self.extents.push(extent);
764    }
765}
766
767impl<'a> Drop for TrimmableExtents<'a> {
768    fn drop(&mut self) {
769        let mut inner = self.allocator.inner.lock();
770        for device_range in std::mem::take(&mut self.extents) {
771            inner.strategy.free(device_range.clone()).expect("drop trim extent");
772            self.allocator
773                .temporary_allocations
774                .erase(&AllocatorKey { device_range: ExtentKey::new(device_range.clone()) });
775        }
776        inner.trim_reserved_bytes = 0;
777    }
778}
779
780impl Allocator {
781    pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
782        let block_size = filesystem.block_size();
783        // We expect device size to be a multiple of block size. Throw away any tail.
784        let device_size = round_down(filesystem.device().size(), block_size);
785        if device_size != filesystem.device().size() {
786            warn!("Device size is not block aligned. Rounding down.");
787        }
788        let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
789        let mut strategy = strategy::BestFit::default();
790        strategy.free(0..device_size).expect("new fs");
791        Allocator {
792            filesystem: Arc::downgrade(&filesystem),
793            block_size,
794            device_size,
795            object_id,
796            max_extent_size_bytes,
797            tree: LSMTree::new(merge, Box::new(NullCache {})),
798            temporary_allocations: SkipListLayer::new(1024),
799            inner: Mutex::new(Inner {
800                info: AllocatorInfo::default(),
801                opened: false,
802                dropped_temporary_allocations: Vec::new(),
803                owner_bytes: BTreeMap::new(),
804                unattributed_reserved_bytes: 0,
805                committed_deallocated: VecDeque::new(),
806                trim_reserved_bytes: 0,
807                trim_listener: None,
808                strategy,
809                allocation_size_histogram: [0; 64],
810                rebuild_strategy_trigger_histogram: [0; 64],
811                marked_for_deletion: HashSet::new(),
812                volumes_deleted_pending_sync: HashSet::new(),
813            }),
814            allocation_mutex: futures::lock::Mutex::new(()),
815            counters: Mutex::new(AllocatorCounters::default()),
816            maximum_offset: AtomicU64::new(0),
817            allocations_allowed: AtomicBool::new(filesystem.options().image_builder_mode.is_none()),
818        }
819    }
820
821    pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
822        &self.tree
823    }
824
825    /// Enables allocations.
826    /// This is only valid to call if the allocator was created in image builder mode.
827    pub fn enable_allocations(&self) {
828        self.allocations_allowed.store(true, Ordering::SeqCst);
829    }
830
831    /// Returns an iterator that yields all allocations, filtering out tombstones and allocations
832    /// whose owner has been marked for deletion.  If `committed_marked_for_deletion` is true,
833    /// filter using the committed set of volumes marked for deletion rather than the in-memory
834    /// copy, which excludes volumes that have been deleted but not yet synced.
835    pub async fn filter(
836        &self,
837        iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
838        committed_marked_for_deletion: bool,
839    ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
840        let marked_for_deletion = {
841            let inner = self.inner.lock();
842            if committed_marked_for_deletion {
843                &inner.info.marked_for_deletion
844            } else {
845                &inner.marked_for_deletion
846            }
847            .clone()
848        };
849        let iter =
850            filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
851        Ok(iter)
852    }
853
854    /// A histogram of allocation request sizes.
855    /// The index into the array is 'number of blocks'.
856    /// The last bucket is a catch-all for larger allocation requests.
857    pub fn allocation_size_histogram(&self) -> [u64; 64] {
858        self.inner.lock().allocation_size_histogram
859    }
860
861    /// Creates a new (empty) allocator.
862    pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
863        // Mark the allocator as opened before creating the file because creating a new
864        // transaction requires a reservation.
865        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
866
867        let filesystem = self.filesystem.upgrade().unwrap();
868        let root_store = filesystem.root_store();
869        ObjectStore::create_object_with_id(
870            &root_store,
871            transaction,
872            ReservedId::new(&root_store, NonZero::new(self.object_id()).unwrap()),
873            HandleOptions::default(),
874            None,
875        )?;
876        root_store.update_last_object_id(self.object_id());
877        Ok(())
878    }
879
880    // Opens the allocator.  This is not thread-safe; this should be called prior to
881    // replaying the allocator's mutations.
882    pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
883        let filesystem = self.filesystem.upgrade().unwrap();
884        let root_store = filesystem.root_store();
885
886        self.inner.lock().strategy = strategy::BestFit::default();
887
888        let handle =
889            ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
890                .await
891                .context("Failed to open allocator object")?;
892
893        if handle.get_size() > 0 {
894            let serialized_info = handle
895                .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
896                .await
897                .context("Failed to read AllocatorInfo")?;
898            let mut cursor = std::io::Cursor::new(serialized_info);
899            let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
900                .context("Failed to deserialize AllocatorInfo")?;
901
902            let mut handles = Vec::new();
903            let mut total_size = 0;
904            for object_id in &info.layers {
905                let handle = ObjectStore::open_object(
906                    &root_store,
907                    *object_id,
908                    HandleOptions::default(),
909                    None,
910                )
911                .await
912                .context("Failed to open allocator layer file")?;
913
914                let size = handle.get_size();
915                total_size += size;
916                handles.push(handle);
917            }
918
919            {
920                let mut inner = self.inner.lock();
921
922                // Check allocated_bytes fits within the device.
923                let mut device_bytes = self.device_size;
924                for (&owner_object_id, &bytes) in &info.allocated_bytes {
925                    ensure!(
926                        bytes <= device_bytes,
927                        anyhow!(FxfsError::Inconsistent).context(format!(
928                            "Allocated bytes exceeds device size: {:?}",
929                            info.allocated_bytes
930                        ))
931                    );
932                    device_bytes -= bytes;
933
934                    inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
935                        Saturating(bytes);
936                }
937
938                inner.info = info;
939            }
940
941            self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
942            self.filesystem.upgrade().unwrap().object_manager().update_reservation(
943                self.object_id,
944                tree::reservation_amount_from_layer_size(total_size),
945            );
946        }
947
948        Ok(())
949    }
950
951    pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
952        // We can assume the device has been flushed.
953        {
954            let mut inner = self.inner.lock();
955            inner.volumes_deleted_pending_sync.clear();
956            inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
957        }
958
959        // Build free extent structure from disk. For now, we expect disks to have *some* free
960        // space at all times. This may change if we ever support mounting of read-only
961        // redistributable filesystem images.
962        if !self.rebuild_strategy().await.context("Build free extents")? {
963            if self.filesystem.upgrade().unwrap().options().read_only {
964                info!("Device contains no free space (read-only mode).");
965            } else {
966                info!("Device contains no free space.");
967                return Err(FxfsError::Inconsistent)
968                    .context("Device appears to contain no free space");
969            }
970        }
971
972        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
973        Ok(())
974    }
975
976    /// Walk all allocations to generate the set of free regions between allocations.
977    /// It is safe to re-run this on live filesystems but it should not be called concurrently
978    /// with allocations, trims or other rebuild_strategy invocations -- use allocation_mutex.
979    /// Returns true if the rebuild made changes to either the free ranges or overflow markers.
980    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
981        let mut changed = false;
982        let mut layer_set = self.tree.empty_layer_set();
983        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
984        self.tree.add_all_layers_to_layer_set(&mut layer_set);
985
986        let overflow_markers = self.inner.lock().strategy.overflow_markers();
987        self.inner.lock().strategy.reset_overflow_markers();
988
989        let mut to_add = Vec::new();
990        let mut merger = layer_set.merger();
991        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
992        let mut last_offset = 0;
993        while last_offset < self.device_size {
994            let next_range = match iter.get() {
995                None => {
996                    assert!(last_offset <= self.device_size);
997                    let range = last_offset..self.device_size;
998                    last_offset = self.device_size;
999                    iter.advance().await?;
1000                    range
1001                }
1002                Some(ItemRef { key, .. }) => {
1003                    let device_range = &key.device_range;
1004                    if device_range.end < last_offset {
1005                        iter.advance().await?;
1006                        continue;
1007                    }
1008                    if device_range.start <= last_offset {
1009                        last_offset = device_range.end;
1010                        iter.advance().await?;
1011                        continue;
1012                    }
1013                    let range = last_offset..device_range.start;
1014                    last_offset = device_range.end;
1015                    iter.advance().await?;
1016                    range
1017                }
1018            };
1019            to_add.push(next_range);
1020            // Avoid taking a lock on inner for every free range.
1021            if to_add.len() > 100 {
1022                let mut inner = self.inner.lock();
1023                for range in to_add.drain(..) {
1024                    changed |= inner.strategy.force_free(range)?;
1025                }
1026            }
1027        }
1028        let mut inner = self.inner.lock();
1029        for range in to_add {
1030            changed |= inner.strategy.force_free(range)?;
1031        }
1032        if overflow_markers != inner.strategy.overflow_markers() {
1033            changed = true;
1034        }
1035        Ok(changed)
1036    }
1037
1038    /// Collects up to `extents_per_batch` free extents of size up to `max_extent_size` from
1039    /// `offset`.  The extents will be reserved.
1040    /// Note that only one `TrimmableExtents` can exist for the allocator at any time.
1041    pub async fn take_for_trimming(
1042        &self,
1043        offset: u64,
1044        max_extent_size: usize,
1045        extents_per_batch: usize,
1046    ) -> Result<TrimmableExtents<'_>, Error> {
1047        let _guard = self.allocation_mutex.lock().await;
1048
1049        let (mut result, listener) = TrimmableExtents::new(self);
1050        let mut bytes = 0;
1051
1052        // We can't just use self.strategy here because it doesn't necessarily hold all
1053        // free extent metadata in RAM.
1054        let mut layer_set = self.tree.empty_layer_set();
1055        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
1056        self.tree.add_all_layers_to_layer_set(&mut layer_set);
1057        let mut merger = layer_set.merger();
1058        let mut iter = self
1059            .filter(
1060                merger
1061                    .query(Query::FullRange(&AllocatorKey {
1062                        device_range: ExtentKey::new(0..offset + 1),
1063                    }))
1064                    .await?,
1065                false,
1066            )
1067            .await?;
1068        let mut last_offset = offset;
1069        'outer: while last_offset < self.device_size {
1070            let mut range = match iter.get() {
1071                None => {
1072                    assert!(last_offset <= self.device_size);
1073                    let range = last_offset..self.device_size;
1074                    last_offset = self.device_size;
1075                    iter.advance().await?;
1076                    range
1077                }
1078                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
1079                    if device_range.end <= last_offset {
1080                        iter.advance().await?;
1081                        continue;
1082                    }
1083                    if device_range.start <= last_offset {
1084                        last_offset = device_range.end;
1085                        iter.advance().await?;
1086                        continue;
1087                    }
1088                    let range = last_offset..device_range.start;
1089                    last_offset = device_range.end;
1090                    iter.advance().await?;
1091                    range
1092                }
1093            };
1094            if range.start < offset {
1095                continue;
1096            }
1097
1098            // 'range' is based on the on-disk LSM tree. We need to check against any uncommitted
1099            // allocations and temporarily allocate the range for ourselves.
1100            let mut inner = self.inner.lock();
1101
1102            while range.start < range.end {
1103                let prefix =
1104                    range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
1105                range = prefix.end..range.end;
1106                bytes += prefix.length()?;
1107                // We can assume the following are safe because we've checked both LSM tree and
1108                // temporary_allocations while holding the lock.
1109                inner.strategy.remove(prefix.clone());
1110                self.temporary_allocations.insert(AllocatorItem {
1111                    key: AllocatorKey { device_range: ExtentKey::new(prefix.clone()) },
1112                    value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
1113                })?;
1114                result.add_extent(prefix);
1115                if result.extents.len() == extents_per_batch {
1116                    break 'outer;
1117                }
1118            }
1119            if result.extents.len() == extents_per_batch {
1120                break 'outer;
1121            }
1122        }
1123        {
1124            let mut inner = self.inner.lock();
1125
1126            assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
1127            inner.trim_listener = Some(listener);
1128            inner.trim_reserved_bytes = bytes;
1129            debug_assert!(
1130                (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
1131                    <= self.device_size
1132            );
1133        }
1134        Ok(result)
1135    }
1136
1137    /// Returns all objects that exist in the parent store that pertain to this allocator.
1138    pub fn parent_objects(&self) -> Vec<u64> {
1139        // The allocator tree needs to store a file for each of the layers in the tree, so we return
1140        // those, since nothing else references them.
1141        self.inner.lock().info.layers.clone()
1142    }
1143
1144    /// Returns all the current owner byte limits (in pairs of `(owner_object_id, bytes)`).
1145    pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
1146        self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
1147    }
1148
1149    /// Returns (used_bytes, byte_limit) for the given owner.
1150    pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
1151        let inner = self.inner.lock();
1152        (
1153            inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
1154            inner.info.limit_bytes.get(&owner_object_id).copied(),
1155        )
1156    }
1157
1158    /// Returns owner bytes debug information.
1159    pub fn owner_bytes_debug(&self) -> String {
1160        format!("{:?}", self.inner.lock().owner_bytes)
1161    }
1162
1163    fn needs_sync(&self) -> bool {
1164        // TODO(https://fxbug.dev/42178048): This will only trigger if *all* free space is taken up with
1165        // committed deallocated bytes, but we might want to trigger a sync if we're low and there
1166        // happens to be a lot of deallocated bytes as that might mean we can fully satisfy
1167        // allocation requests.
1168        let inner = self.inner.lock();
1169        inner.unavailable_bytes().0 >= self.device_size
1170    }
1171
1172    fn is_system_store(&self, owner_object_id: u64) -> bool {
1173        let fs = self.filesystem.upgrade().unwrap();
1174        owner_object_id == fs.object_manager().root_store_object_id()
1175            || owner_object_id == fs.object_manager().root_parent_store_object_id()
1176    }
1177
1178    /// Updates the accounting to track that a byte reservation has been moved out of an owner to
1179    /// the unattributed pool.
1180    pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
1181        if old_owner_object_id.is_none() || amount == 0 {
1182            return;
1183        }
1184        // These 2 mutations should behave as though they're a single atomic mutation.
1185        let mut inner = self.inner.lock();
1186        inner.remove_reservation(old_owner_object_id, amount);
1187        inner.add_reservation(None, amount);
1188    }
1189
1190    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
1191    /// allocator when queried.
1192    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1193        let this = Arc::downgrade(self);
1194        parent.record_lazy_child(name, move || {
1195            let this_clone = this.clone();
1196            async move {
1197                let inspector = fuchsia_inspect::Inspector::default();
1198                if let Some(this) = this_clone.upgrade() {
1199                    let counters = this.counters.lock();
1200                    let root = inspector.root();
1201                    root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
1202                    root.record_uint("bytes_total", this.device_size);
1203                    let (allocated, reserved, used, unavailable) = {
1204                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1205                        let inner = this.inner.lock();
1206                        (
1207                            inner.allocated_bytes().0,
1208                            inner.reserved_bytes(),
1209                            inner.used_bytes().0,
1210                            inner.unavailable_bytes().0,
1211                        )
1212                    };
1213                    root.record_uint("bytes_allocated", allocated);
1214                    root.record_uint("bytes_reserved", reserved);
1215                    root.record_uint("bytes_used", used);
1216                    root.record_uint("bytes_unavailable", unavailable);
1217
1218                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing
1219                    // metrics.
1220                    if let Some(x) = round_div(100 * allocated, this.device_size) {
1221                        root.record_uint("bytes_allocated_percent", x);
1222                    }
1223                    if let Some(x) = round_div(100 * reserved, this.device_size) {
1224                        root.record_uint("bytes_reserved_percent", x);
1225                    }
1226                    if let Some(x) = round_div(100 * used, this.device_size) {
1227                        root.record_uint("bytes_used_percent", x);
1228                    }
1229                    if let Some(x) = round_div(100 * unavailable, this.device_size) {
1230                        root.record_uint("bytes_unavailable_percent", x);
1231                    }
1232
1233                    root.record_uint("num_flushes", counters.num_flushes);
1234                    if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
1235                        root.record_uint(
1236                            "last_flush_time_ms",
1237                            last_flush_time
1238                                .duration_since(std::time::UNIX_EPOCH)
1239                                .unwrap_or(std::time::Duration::ZERO)
1240                                .as_millis()
1241                                .try_into()
1242                                .unwrap_or(0u64),
1243                        );
1244                    }
1245
1246                    let data = this.allocation_size_histogram();
1247                    let alloc_sizes = root.create_uint_linear_histogram(
1248                        "allocation_size_histogram",
1249                        fuchsia_inspect::LinearHistogramParams {
1250                            floor: 1,
1251                            step_size: 1,
1252                            buckets: 64,
1253                        },
1254                    );
1255                    for (i, count) in data.iter().enumerate() {
1256                        if i != 0 {
1257                            alloc_sizes.insert_multiple(i as u64, *count as usize);
1258                        }
1259                    }
1260                    root.record(alloc_sizes);
1261
1262                    let data = this.inner.lock().rebuild_strategy_trigger_histogram;
1263                    let triggers = root.create_uint_linear_histogram(
1264                        "rebuild_strategy_triggers",
1265                        fuchsia_inspect::LinearHistogramParams {
1266                            floor: 1,
1267                            step_size: 1,
1268                            buckets: 64,
1269                        },
1270                    );
1271                    for (i, count) in data.iter().enumerate() {
1272                        if i != 0 {
1273                            triggers.insert_multiple(i as u64, *count as usize);
1274                        }
1275                    }
1276                    root.record(triggers);
1277
1278                    let allocator_ref = this.clone();
1279                    root.record_child("lsm_tree", move |node| {
1280                        allocator_ref.tree.record_inspect_data(node);
1281                    });
1282                }
1283                Ok(inspector)
1284            }
1285            .boxed()
1286        });
1287    }
1288
1289    /// Returns the offset of the first byte which has not been used by the allocator since its
1290    /// creation.
1291    /// NB: This does *not* take into account existing allocations.  This is only reliable when the
1292    /// allocator was created from scratch, without any pre-existing allocations.
1293    pub fn maximum_offset(&self) -> u64 {
1294        self.maximum_offset.load(Ordering::Relaxed)
1295    }
1296}
1297
1298impl Drop for Allocator {
1299    fn drop(&mut self) {
1300        let inner = self.inner.lock();
1301        // Uncommitted and reserved should be released back using RAII, so they should be zero.
1302        assert_eq!(inner.uncommitted_allocated_bytes(), 0);
1303        assert_eq!(inner.reserved_bytes(), 0);
1304    }
1305}
1306
1307#[fxfs_trace::trace]
1308impl Allocator {
1309    /// Returns the object ID for the allocator.
1310    pub fn object_id(&self) -> u64 {
1311        self.object_id
1312    }
1313
1314    /// Returns information about the allocator such as the layer files storing persisted
1315    /// allocations.
1316    pub fn info(&self) -> AllocatorInfo {
1317        self.inner.lock().info.clone()
1318    }
1319
1320    /// Tries to allocate up to `len` bytes on behalf of `owner_object_id` and returns the device
1321    /// range allocated.
1322    /// The allocated range may be short (e.g. due to fragmentation), in which case the caller can
1323    /// simply call allocate again until they have enough blocks.
1324    ///
1325    /// We also store the object store ID of the store that the allocation should be assigned to so
1326    /// that we have a means to delete encrypted stores without needing the encryption key.
1327    #[trace]
1328    pub async fn allocate(
1329        self: &Arc<Self>,
1330        transaction: &mut Transaction<'_>,
1331        owner_object_id: u64,
1332        mut len: u64,
1333    ) -> Result<Range<u64>, Error> {
1334        ensure!(self.allocations_allowed.load(Ordering::SeqCst), FxfsError::Unavailable);
1335        assert_eq!(len % self.block_size, 0);
1336        len = std::cmp::min(len, self.max_extent_size_bytes);
1337        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1338
1339        // Make sure we have space reserved before we try and find the space.
1340        let reservation = if let Some(reservation) = transaction.allocator_reservation {
1341            match reservation.owner_object_id {
1342                // If there is no owner, this must be a system store that we're allocating for.
1343                None => assert!(self.is_system_store(owner_object_id)),
1344                // If it has an owner, it should not be different than the allocating owner.
1345                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
1346            };
1347            // Reservation limits aren't necessarily a multiple of the block size.
1348            let r = reservation
1349                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
1350            len = r.amount();
1351            Left(r)
1352        } else {
1353            let mut inner = self.inner.lock();
1354            assert!(inner.opened);
1355            // Do not exceed the limit for the owner or the device.
1356            let device_used = inner.used_bytes();
1357            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1358            // We must take care not to use up space that might be reserved.
1359            let limit =
1360                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
1361            len = round_down(std::cmp::min(len, limit), self.block_size);
1362            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1363            owner_entry.reserved_bytes += len;
1364            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
1365        };
1366
1367        ensure!(len > 0, FxfsError::NoSpace);
1368
1369        // If volumes have been deleted, flush the device so that we can use any of the freed space.
1370        let volumes_deleted = {
1371            let inner = self.inner.lock();
1372            (!inner.volumes_deleted_pending_sync.is_empty())
1373                .then(|| inner.volumes_deleted_pending_sync.clone())
1374        };
1375
1376        if let Some(volumes_deleted) = volumes_deleted {
1377            // No locks are held here, so in theory, there could be unnecessary syncs, but it
1378            // should be sufficiently rare that it won't matter.
1379            self.filesystem
1380                .upgrade()
1381                .unwrap()
1382                .sync(SyncOptions {
1383                    flush_device: true,
1384                    precondition: Some(Box::new(|| {
1385                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
1386                    })),
1387                    ..Default::default()
1388                })
1389                .await?;
1390
1391            {
1392                let mut inner = self.inner.lock();
1393                for owner_id in volumes_deleted {
1394                    inner.volumes_deleted_pending_sync.remove(&owner_id);
1395                    inner.marked_for_deletion.insert(owner_id);
1396                }
1397            }
1398
1399            let _guard = self.allocation_mutex.lock().await;
1400            self.rebuild_strategy().await?;
1401        }
1402
1403        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
1404        let _guard = 'sync: loop {
1405            // Cap number of sync attempts before giving up on finding free space.
1406            for _ in 0..10 {
1407                {
1408                    let guard = self.allocation_mutex.lock().await;
1409
1410                    if !self.needs_sync() {
1411                        break 'sync guard;
1412                    }
1413                }
1414
1415                // All the free space is currently tied up with deallocations, so we need to sync
1416                // and flush the device to free that up.
1417                //
1418                // We can't hold the allocation lock whilst we sync here because the allocation lock
1419                // is also taken in apply_mutations, which is called when journal locks are held,
1420                // and we call sync here which takes those same locks, so it would have the
1421                // potential to result in a deadlock.  Sync holds its own lock to guard against
1422                // multiple syncs occurring at the same time, and we can supply a precondition that
1423                // is evaluated under that lock to ensure we don't sync twice if we don't need to.
1424                self.filesystem
1425                    .upgrade()
1426                    .unwrap()
1427                    .sync(SyncOptions {
1428                        flush_device: true,
1429                        precondition: Some(Box::new(|| self.needs_sync())),
1430                        ..Default::default()
1431                    })
1432                    .await?;
1433            }
1434            bail!(
1435                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
1436            );
1437        };
1438
1439        let mut trim_listener = None;
1440        {
1441            let mut inner = self.inner.lock();
1442            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;
1443
1444            // If trimming would be the reason that this allocation gets cut short, wait for
1445            // trimming to complete before proceeding.
1446            let avail = self
1447                .device_size
1448                .checked_sub(inner.unavailable_bytes().0)
1449                .ok_or(FxfsError::Inconsistent)?;
1450            let free_and_not_being_trimmed =
1451                inner.bytes_available_not_being_trimmed(self.device_size)?;
1452            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
1453                debug_assert!(inner.trim_reserved_bytes > 0);
1454                trim_listener = std::mem::take(&mut inner.trim_listener);
1455            }
1456        }
1457
1458        if let Some(listener) = trim_listener {
1459            listener.await;
1460        }
1461
1462        let result = loop {
1463            {
1464                let mut inner = self.inner.lock();
1465
1466                // While we know rebuild_strategy and take_for_trim are not running (we hold guard),
1467                // apply temporary_allocation removals.
1468                for device_range in inner.dropped_temporary_allocations.drain(..) {
1469                    self.temporary_allocations
1470                        .erase(&AllocatorKey { device_range: ExtentKey::new(device_range) });
1471                }
1472
1473                match inner.strategy.allocate(len) {
1474                    Err(FxfsError::NotFound) => {
1475                        // Overflow. Fall through and rebuild
1476                        inner.rebuild_strategy_trigger_histogram
1477                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
1478                    }
1479                    Err(err) => {
1480                        error!(err:%; "Likely filesystem corruption.");
1481                        return Err(err.into());
1482                    }
1483                    Ok(x) => {
1484                        break x;
1485                    }
1486                }
1487            }
1488            // We've run out of extents of the requested length in RAM but there
1489            // exists more of this size on device. Rescan device and circle back.
1490            // We already hold the allocation_mutex, so exclusive access is guaranteed.
1491            if !self.rebuild_strategy().await? {
1492                error!("Cannot find additional free space. Corruption?");
1493                return Err(FxfsError::Inconsistent.into());
1494            }
1495        };
1496
1497        debug!(device_range:? = result; "allocate");
1498
1499        let len = result.length().unwrap();
1500        let reservation_owner = reservation.either(
1501            // Left means we got an outside reservation.
1502            |l| {
1503                l.forget_some(len);
1504                l.owner_object_id()
1505            },
1506            |r| {
1507                r.forget_some(len);
1508                r.owner_object_id()
1509            },
1510        );
1511
1512        {
1513            let mut inner = self.inner.lock();
1514            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1515            owner_entry.uncommitted_allocated_bytes += len;
1516            // If the reservation has an owner, ensure they are the same.
1517            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
1518            inner.remove_reservation(reservation_owner, len);
1519            self.temporary_allocations.insert(AllocatorItem {
1520                key: AllocatorKey { device_range: ExtentKey::new(result.clone()) },
1521                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1522            })?;
1523        }
1524
1525        let mutation =
1526            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
1527        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());
1528
1529        Ok(result)
1530    }
1531
1532    /// Marks the given device range as allocated.  The main use case for this at this time is for
1533    /// the super-block which needs to be at a fixed location on the device.
1534    #[trace]
1535    pub fn mark_allocated(
1536        &self,
1537        transaction: &mut Transaction<'_>,
1538        owner_object_id: u64,
1539        device_range: Range<u64>,
1540    ) -> Result<(), Error> {
1541        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1542        {
1543            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;
1544
1545            let mut inner = self.inner.lock();
1546            let device_used = inner.used_bytes();
1547            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1548            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1549            ensure!(
1550                device_range.end <= self.device_size
1551                    && (Saturating(self.device_size) - device_used).0 >= len
1552                    && owner_id_bytes_left >= len,
1553                FxfsError::NoSpace
1554            );
1555            if let Some(reservation) = &mut transaction.allocator_reservation {
1556                // The transaction takes ownership of this hold.
1557                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
1558            }
1559            owner_entry.uncommitted_allocated_bytes += len;
1560            inner.strategy.remove(device_range.clone());
1561            self.temporary_allocations.insert(AllocatorItem {
1562                key: AllocatorKey { device_range: ExtentKey::new(device_range.clone()) },
1563                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1564            })?;
1565        }
1566        let mutation =
1567            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
1568        transaction.add(self.object_id(), Mutation::Allocator(mutation));
1569        Ok(())
1570    }
1571
1572    /// Sets the limits for an owner object in terms of usage.
1573    pub fn set_bytes_limit(
1574        &self,
1575        transaction: &mut Transaction<'_>,
1576        owner_object_id: u64,
1577        bytes: u64,
1578    ) -> Result<(), Error> {
1579        // System stores cannot be given limits.
1580        assert!(!self.is_system_store(owner_object_id));
1581        transaction.add(
1582            self.object_id(),
1583            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
1584        );
1585        Ok(())
1586    }
1587
1588    /// Gets the bytes limit for an owner object.
1589    pub fn get_owner_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
1590        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
1591    }
1592
1593    /// Gets the number of bytes currently used by allocations, reservations or uncommitted
1594    /// allocations.
1595    pub fn get_owner_bytes_used(&self, owner_object_id: u64) -> u64 {
1596        self.inner.lock().owner_bytes.get(&owner_object_id).map_or(0, |info| info.used_bytes().0)
1597    }
1598
1599    /// Deallocates the given device range for the specified object.
1600    #[trace]
1601    pub async fn deallocate(
1602        &self,
1603        transaction: &mut Transaction<'_>,
1604        owner_object_id: u64,
1605        dealloc_range: Range<u64>,
1606    ) -> Result<u64, Error> {
1607        debug!(device_range:? = dealloc_range; "deallocate");
1608        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
1609        // We don't currently support sharing of allocations (value.count always equals 1), so as
1610        // long as we can assume the deallocated range is actually allocated, we can avoid device
1611        // access.
1612        let deallocated = dealloc_range.end - dealloc_range.start;
1613        let mutation = AllocatorMutation::Deallocate {
1614            device_range: dealloc_range.clone().into(),
1615            owner_object_id,
1616        };
1617        transaction.add(self.object_id(), Mutation::Allocator(mutation));
1618
1619        let _guard = self.allocation_mutex.lock().await;
1620
1621        // We use `dropped_temporary_allocations` to defer removals from `temporary_allocations` in
1622        // places where we can't execute async code or take locks.
1623        //
1624        // It's important we don't ever remove entries from `temporary_allocations` without
1625        // holding the `allocation_mutex` lock or else we may end up with an inconsistent view of
1626        // available disk space when we combine temporary_allocations with the LSMTree.
1627        // This is normally done in `allocate()` but we also need to apply these here because
1628        // `temporary_allocations` is also used to track deallocated space until it has been
1629        // flushed (see comment below). A user may allocate and then deallocate space before calling
1630        // allocate() a second time, so if we do not clean up here, we may end up with the same
1631        // range in temporary_allocations twice (once for allocate, once for deallocate).
1632        let mut inner = self.inner.lock();
1633        for device_range in inner.dropped_temporary_allocations.drain(..) {
1634            self.temporary_allocations
1635                .erase(&AllocatorKey { device_range: ExtentKey::new(device_range) });
1636        }
1637
1638        // We can't reuse deallocated space immediately because failure to successfully flush will
1639        // mean that on next mount, we may find this space is still assigned to the deallocated
1640        // region. To avoid immediate reuse, we hold these regions in 'temporary_allocations' until
1641        // after a successful flush so we know the region is safe to reuse.
1642        self.temporary_allocations
1643            .insert(AllocatorItem {
1644                key: AllocatorKey { device_range: ExtentKey::new(dealloc_range.clone()) },
1645                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1646            })
1647            .context("tracking deallocated")?;
1648
1649        Ok(deallocated)
1650    }
1651
1652    /// Marks allocations associated with a given |owner_object_id| for deletion.
1653    ///
1654    /// This is used as part of deleting encrypted volumes (ObjectStore) without having the keys.
1655    ///
1656    /// MarkForDeletion mutations eventually manipulate allocator metadata (AllocatorInfo) instead
1657    /// of the mutable layer but we must be careful not to do this too early and risk premature
1658    /// reuse of extents.
1659    ///
1660    /// Replay is not guaranteed until the *device* gets flushed, so we cannot reuse the deleted
1661    /// extents until we've flushed the device.
1662    ///
1663    /// TODO(b/316827348): Consider removing the use of mark_for_deletion in AllocatorInfo and
1664    /// just compacting?
1665    ///
1666    /// After an allocator.flush() (i.e. a major compaction), we know that there is no data left
1667    /// in the layer files for this owner_object_id and we are able to clear `marked_for_deletion`.
1668    pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
1669        // Note that because the actual time of deletion (the next major compaction) is undefined,
1670        // |owner_object_id| should not be reused after this call.
1671        transaction.add(
1672            self.object_id(),
1673            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
1674        );
1675    }
1676
1677    /// Called when the device has been flushed; `flush_log_offset` is the journal log offset at
1678    /// the time of the flush.
1679    pub fn did_flush_device(&self, flush_log_offset: u64) {
1680        // First take out the deallocations that we now know to be flushed.  The list is maintained
1681        // in order, so we can stop on the first entry that we find that should not be unreserved
1682        // yet.
1683        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
1684        let deallocs = 'deallocs_outer: loop {
1685            let mut inner = self.inner.lock();
1686            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
1687                if dealloc.log_file_offset >= flush_log_offset {
1688                    let mut deallocs = inner.committed_deallocated.split_off(index);
1689                    // Swap because we want the opposite of what split_off does.
1690                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
1691                    break 'deallocs_outer deallocs;
1692                }
1693            }
1694            break std::mem::take(&mut inner.committed_deallocated);
1695        };
1696
1697        // Now we can free those elements.
1698        let mut inner = self.inner.lock();
1699        let mut totals = BTreeMap::<u64, u64>::new();
1700        for dealloc in deallocs {
1701            *(totals.entry(dealloc.owner_object_id).or_default()) +=
1702                dealloc.range.length().unwrap();
1703            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
1704            self.temporary_allocations
1705                .erase(&AllocatorKey { device_range: ExtentKey::new(dealloc.range.clone()) });
1706        }
1707
1708        // This *must* come after we've removed the records from reserved reservations because the
1709        // allocator uses this value to decide whether or not a device-flush is required and it must
1710        // be possible to find free space if it thinks no device-flush is required.
1711        for (owner_object_id, total) in totals {
1712            match inner.owner_bytes.get_mut(&owner_object_id) {
1713                Some(counters) => counters.committed_deallocated_bytes -= total,
1714                None => {
1715                    // This should only be possible if the volume has been deleted and a sync is
1716                    // pending.
1717                    assert!(inner.volumes_deleted_pending_sync.contains(&owner_object_id));
1718                }
1719            }
1720        }
1721    }
1722
1723    /// Returns a reservation that can be used later, or None if there is insufficient space. The
1724    /// |owner_object_id| indicates which object in the root object store the reservation is for.
1725    pub fn reserve(
1726        self: Arc<Self>,
1727        owner_object_id: Option<u64>,
1728        amount: u64,
1729    ) -> Option<Reservation> {
1730        {
1731            let mut inner = self.inner.lock();
1732
1733            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1734
1735            let limit = match owner_object_id {
1736                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1737                None => device_free,
1738            };
1739            if limit < amount {
1740                return None;
1741            }
1742            inner.add_reservation(owner_object_id, amount);
1743        }
1744        Some(Reservation::new(self, owner_object_id, amount))
1745    }
1746
1747    /// Like reserve, but takes a callback that is passed the |limit| and should return the
1748    /// amount to reserve, which can be zero.
1749    pub fn reserve_with(
1750        self: Arc<Self>,
1751        owner_object_id: Option<u64>,
1752        amount: impl FnOnce(u64) -> u64,
1753    ) -> Reservation {
1754        let amount = {
1755            let mut inner = self.inner.lock();
1756
1757            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1758
1759            let amount = amount(match owner_object_id {
1760                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1761                None => device_free,
1762            });
1763
1764            inner.add_reservation(owner_object_id, amount);
1765
1766            amount
1767        };
1768
1769        Reservation::new(self, owner_object_id, amount)
1770    }
1771
1772    /// Returns the total number of allocated bytes.
1773    pub fn get_allocated_bytes(&self) -> u64 {
1774        self.inner.lock().allocated_bytes().0
1775    }
1776
1777    /// Returns the total size of the device in bytes.
1778    pub fn get_disk_bytes(&self) -> u64 {
1779        self.device_size
1780    }
1781
1782    /// Returns the total number of allocated bytes per owner_object_id.
1783    /// Note that this is quite an expensive operation as it copies the collection.
1784    /// This is intended for use in fsck() and friends, not general use code.
1785    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
1786        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
1787    }
1788
1789    /// Returns the number of allocated and reserved bytes.
1790    pub fn get_used_bytes(&self) -> Saturating<u64> {
1791        let inner = self.inner.lock();
1792        inner.used_bytes()
1793    }
1794}
1795
1796impl ReservationOwner for Allocator {
1797    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
1798        self.inner.lock().remove_reservation(owner_object_id, amount);
1799    }
1800}
1801
1802#[async_trait]
1803impl JournalingObject for Allocator {
1804    fn apply_mutation(
1805        &self,
1806        mutation: Mutation,
1807        context: &ApplyContext<'_, '_>,
1808        _assoc_obj: AssocObj<'_>,
1809    ) -> Result<(), Error> {
1810        match mutation {
1811            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
1812                let mut inner = self.inner.lock();
1813                inner.owner_bytes.remove(&owner_object_id);
1814
1815                // We use `info.marked_for_deletion` to track the committed state and
1816                // `inner.marked_for_deletion` to track volumes marked for deletion *after* we have
1817                // flushed the device.  It is not safe to use extents belonging to deleted volumes
1818                // until after we have flushed the device.
1819                inner.info.marked_for_deletion.insert(owner_object_id);
1820                inner.volumes_deleted_pending_sync.insert(owner_object_id);
1821
1822                inner.info.limit_bytes.remove(&owner_object_id);
1823            }
1824            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1825                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
1826                let item = AllocatorItem {
1827                    key: AllocatorKey { device_range: ExtentKey::new(device_range.clone().into()) },
1828                    value: AllocatorValue::Abs { count: 1, owner_object_id },
1829                };
1830                let len = item.key.device_range.length().unwrap();
1831                let lower_bound = item.key.lower_bound_for_merge_into();
1832                self.tree.merge_into(item, &lower_bound);
1833                let mut inner = self.inner.lock();
1834                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1835                entry.allocated_bytes += len;
1836                if let ApplyMode::Live(transaction) = context.mode {
1837                    entry.uncommitted_allocated_bytes -= len;
1838                    // Note that we cannot drop entries from temporary_allocations without holding
1839                    // the allocation_mutex as it may introduce races. We instead add the range to
1840                    // a Vec that can be applied later when we hold the lock (See comment on
1841                    // `dropped_temporary_allocations` above).
1842                    inner.dropped_temporary_allocations.push(device_range.into());
1843                    if let Some(reservation) = transaction.allocator_reservation {
1844                        reservation.commit(len);
1845                    }
1846                }
1847            }
1848            Mutation::Allocator(AllocatorMutation::Deallocate {
1849                device_range,
1850                owner_object_id,
1851            }) => {
1852                let item = AllocatorItem {
1853                    key: AllocatorKey { device_range: ExtentKey::new(device_range.into()) },
1854                    value: AllocatorValue::None,
1855                };
1856                let len = item.key.device_range.length().unwrap();
1857
1858                {
1859                    let mut inner = self.inner.lock();
1860                    {
1861                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1862                        entry.allocated_bytes -= len;
1863                        if context.mode.is_live() {
1864                            entry.committed_deallocated_bytes += len;
1865                        }
1866                    }
1867                    if context.mode.is_live() {
1868                        inner.committed_deallocated.push_back(CommittedDeallocation {
1869                            log_file_offset: context.checkpoint.file_offset,
1870                            range: (*item.key.device_range).clone(),
1871                            owner_object_id,
1872                        });
1873                    }
1874                    if let ApplyMode::Live(Transaction {
1875                        allocator_reservation: Some(reservation),
1876                        ..
1877                    }) = context.mode
1878                    {
1879                        inner.add_reservation(reservation.owner_object_id(), len);
1880                        reservation.add(len);
1881                    }
1882                }
1883                let lower_bound = item.key.lower_bound_for_merge_into();
1884                self.tree.merge_into(item, &lower_bound);
1885            }
1886            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
1887                // Journal replay is ordered and each of these calls is idempotent. So the last one
1888                // will be respected, it doesn't matter if the value is already set, or gets changed
1889                // multiple times during replay. When it gets opened it will be merged in with the
1890                // snapshot.
1891                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
1892            }
1893            Mutation::BeginFlush => {
1894                self.tree.seal();
1895                // Transfer our running count for allocated_bytes so that it gets written to the new
1896                // info file when flush completes.
1897                let mut inner = self.inner.lock();
1898                let allocated_bytes =
1899                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
1900                inner.info.allocated_bytes = allocated_bytes;
1901            }
1902            Mutation::EndFlush => {}
1903            _ => bail!("unexpected mutation: {:?}", mutation),
1904        }
1905        Ok(())
1906    }
1907
1908    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
1909        match mutation {
1910            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1911                let len = device_range.length().unwrap();
1912                let mut inner = self.inner.lock();
1913                inner
1914                    .owner_bytes
1915                    .entry(owner_object_id)
1916                    .or_default()
1917                    .uncommitted_allocated_bytes -= len;
1918                if let Some(reservation) = transaction.allocator_reservation {
1919                    let res_owner = reservation.owner_object_id();
1920                    inner.add_reservation(res_owner, len);
1921                    reservation.release_reservation(res_owner, len);
1922                }
1923                inner.strategy.free(device_range.clone().into()).expect("drop mutation");
1924                self.temporary_allocations
1925                    .erase(&AllocatorKey { device_range: ExtentKey::new(device_range.into()) });
1926            }
1927            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
1928                self.temporary_allocations
1929                    .erase(&AllocatorKey { device_range: ExtentKey::new(device_range.into()) });
1930            }
1931            _ => {}
1932        }
1933    }
1934
1935    async fn flush(&self) -> Result<Version, Error> {
1936        let filesystem = self.filesystem.upgrade().unwrap();
1937        let object_manager = filesystem.object_manager();
1938        let earliest_version = self.tree.get_earliest_version();
1939        if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
1940            // Early exit, but still return the earliest version used by a struct in the tree
1941            return Ok(earliest_version);
1942        }
1943
1944        let fs = self.filesystem.upgrade().unwrap();
1945        let mut flusher = Flusher::new(self, &fs).await;
1946        let (new_layer_file, info) = flusher.start().await?;
1947        flusher.finish(new_layer_file, info).await
1948    }
1949}
1950
1951// The merger is unable to merge extents that exist like the following:
1952//
1953//     |----- +1 -----|
1954//                    |----- -1 -----|
1955//                    |----- +2 -----|
1956//
1957// It cannot coalesce them because it has to emit the +1 record so that it can move on and merge the
1958// -1 and +2 records. To address this, we add another stage that applies after merging which
1959// coalesces records after they have been emitted.  This is a bit simpler than merging because the
1960// records cannot overlap, so it's just a question of merging adjacent records if they happen to
1961// have the same delta and object_id.
1962
1963pub struct CoalescingIterator<I> {
1964    iter: I,
1965    item: Option<AllocatorItem>,
1966}
1967
1968impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
1969    pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
1970        let mut iter = Self { iter, item: None };
1971        iter.advance().await?;
1972        Ok(iter)
1973    }
1974}
1975
1976#[async_trait]
1977impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
1978    for CoalescingIterator<I>
1979{
1980    async fn advance(&mut self) -> Result<(), Error> {
1981        self.item = self.iter.get().map(|x| x.cloned());
1982        if self.item.is_none() {
1983            return Ok(());
1984        }
1985        let left = self.item.as_mut().unwrap();
1986        loop {
1987            self.iter.advance().await?;
1988            match self.iter.get() {
1989                None => return Ok(()),
1990                Some(right) => {
1991                    // The two records cannot overlap.
1992                    ensure!(
1993                        left.key.device_range.end <= right.key.device_range.start,
1994                        FxfsError::Inconsistent
1995                    );
1996                    // We can only coalesce records if they are touching and have the same value.
1997                    if left.key.device_range.end < right.key.device_range.start
1998                        || left.value != *right.value
1999                    {
2000                        return Ok(());
2001                    }
2002                    left.key.device_range.end = right.key.device_range.end;
2003                }
2004            }
2005        }
2006    }
2007
2008    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
2009        self.item.as_ref().map(|x| x.as_item_ref())
2010    }
2011}
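
// Illustrative sketch only; not part of the original source. It restates the coalescing rule used
// by `CoalescingIterator::advance` above on plain in-memory records: a record is merged into its
// predecessor only when the two touch and carry the same value. `coalesce_adjacent_sketch` and
// its `(range, value)` tuple records are hypothetical stand-ins for `AllocatorItem`, and `None`
// stands in for `FxfsError::Inconsistent` when records overlap.
#[allow(dead_code)]
fn coalesce_adjacent_sketch<V: PartialEq + Clone>(
    records: &[(std::ops::Range<u64>, V)],
) -> Option<Vec<(std::ops::Range<u64>, V)>> {
    let mut out: Vec<(std::ops::Range<u64>, V)> = Vec::new();
    for (range, value) in records {
        if let Some((last_range, last_value)) = out.last_mut() {
            // Records are assumed sorted by range; overlapping records are rejected.
            if last_range.end > range.start {
                return None;
            }
            // Touching records with equal values extend the previous record in place.
            if last_range.end == range.start && *last_value == *value {
                last_range.end = range.end;
                continue;
            }
        }
        // Either the first record, a gap, or a different value: start a new record.
        out.push((range.clone(), value.clone()));
    }
    Some(out)
}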
2012
2013struct Flusher<'a> {
2014    allocator: &'a Allocator,
2015    fs: &'a Arc<FxFilesystem>,
2016    _guard: WriteGuard<'a>,
2017}
2018
2019impl<'a> Flusher<'a> {
2020    async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
2021        let keys = lock_keys![LockKey::flush(allocator.object_id())];
2022        Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
2023    }
2024
2025    fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
2026        Options {
2027            skip_journal_checks: true,
2028            borrow_metadata_space: true,
2029            allocator_reservation: Some(allocator_reservation),
2030            ..Default::default()
2031        }
2032    }
2033
2034    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
        let object_manager = self.fs.object_manager();
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
            .await?;

        let root_store = self.fs.root_store();
        let layer_object_handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions { skip_journal_checks: true, ..Default::default() },
            None,
        )
        .await?;
        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
        // It's important that this transaction does not include any allocations because we use
        // BeginFlush as a snapshot point for mutations to the tree: other allocator mutations
        // within this transaction might get applied before seal (which would be OK), but they could
        // equally get applied afterwards (since Transaction makes no guarantees about the order in
        // which mutations are applied whilst committing), in which case they'd get lost on replay
        // because the journal will only send mutations that follow this transaction.
        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
        let info = transaction
            .commit_with_callback(|_| {
                // We must capture `info` as it is when we start flushing. Subsequent transactions
                // can end up modifying `info` and we shouldn't capture those here.
                self.allocator.inner.lock().info.clone()
            })
            .await?;
        Ok((layer_object_handle, info))
    }

    async fn finish(
        self,
        layer_object_handle: DataObjectHandle<ObjectStore>,
        mut info: AllocatorInfo,
    ) -> Result<Version, Error> {
        let object_manager = self.fs.object_manager();
        let txn_options = Self::txn_options(object_manager.metadata_reservation());

        let layer_set = self.allocator.tree.immutable_layer_set();
        let total_len = layer_set.sum_len();
        {
            let start_time = std::time::Instant::now();
            let merged_layer_count = layer_set.layers.len();
            let mut merger = layer_set.merger();
            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
            let iter = CoalescingIterator::new(iter).await?;
            let bytes_written = compact_with_iterator(
                iter,
                total_len,
                DirectWriter::new(&layer_object_handle, txn_options).await,
                layer_object_handle.block_size(),
                Some(self.fs.journal().get_compaction_yielder()),
            )
            .await?;

            self.allocator.tree.report_compaction_metrics(
                bytes_written,
                start_time.elapsed(),
                merged_layer_count,
            );
        }

        let root_store = self.fs.root_store();

        // Both of these forward-declared variables need to outlive the transaction.
        let object_handle;
        let reservation_update;
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    self.allocator.object_id()
                )],
                txn_options,
            )
            .await?;
        let mut serialized_info = Vec::new();

        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
        object_handle = ObjectStore::open_object(
            &root_store,
            self.allocator.object_id(),
            HandleOptions::default(),
            None,
        )
        .await?;

        // Move all the existing layers to the graveyard.
        for object_id in &info.layers {
            root_store.add_to_graveyard(&mut transaction, *object_id);
        }

        // Write out updated info.

        // After successfully flushing, all the stores that were marked for deletion at the time of
        // the BeginFlush transaction no longer need to be marked for deletion.  There can be
        // stores that have been deleted since the BeginFlush transaction, but they will be covered
        // by a MarkForDeletion mutation.
        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);

        info.layers = vec![layer_object_handle.object_id()];

        info.serialize_with_version(&mut serialized_info)?;

        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;

        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
            layer_object_handle.get_size(),
        ));

        // It's important that EndFlush is in the same transaction that we write AllocatorInfo,
        // because we use EndFlush to make the required adjustments to allocated_bytes.
        transaction.add_with_object(
            self.allocator.object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );
        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());

        let layers = layers_from_handles([layer_object_handle]).await?;
        transaction
            .commit_with_callback(|_| {
                self.allocator.tree.set_layers(layers);

                // At this point we've committed the new layers to disk so we can start using them.
                // This means we can also switch to the new AllocatorInfo which clears
                // marked_for_deletion.
                let mut inner = self.allocator.inner.lock();
                inner.info.layers = info.layers;
                for owner_id in marked_for_deletion {
                    inner.marked_for_deletion.remove(&owner_id);
                    inner.info.marked_for_deletion.remove(&owner_id);
                }
            })
            .await?;

        // Now close the layers and purge them.
        for layer in layer_set.layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                root_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        let mut counters = self.allocator.counters.lock();
        counters.num_flushes += 1;
        counters.last_flush_time = Some(std::time::SystemTime::now());
        // Return the earliest version used by a struct in the tree
        Ok(self.allocator.tree.get_earliest_version())
    }
}

#[cfg(test)]
mod tests {
    use crate::filesystem::{
        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
    };
    use crate::fsck::fsck;
    use crate::lsm_tree::cache::NullCache;
    use crate::lsm_tree::skip_list_layer::SkipListLayer;
    use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
    use crate::lsm_tree::{LSMTree, Query};
    use crate::object_handle::ObjectHandle;
    use crate::object_store::allocator::merge::merge;
    use crate::object_store::allocator::{
        Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
    };
    use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
    use crate::object_store::volume::root_volume;
    use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
    use crate::range::RangeExt;
    use crate::round::round_up;
    use crate::serialized_types::{LATEST_VERSION, Versioned};
    use crate::testing;
    use bincode::Options as _;
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use std::cmp::{max, min};
    use std::ops::{Bound, Range};
    use std::sync::Arc;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    #[test]
    fn test_allocator_key_is_range_based() {
        // Make sure we disallow using allocator keys with point queries.
        assert!(AllocatorKey { device_range: (0..100).into() }.is_range_key());
    }

    #[test]
    fn test_allocator_key_serialization_compatibility() {
        let range = 1024..2048;
        let key = AllocatorKey { device_range: range.clone().into() };

        // 1. Serialize the range directly using bincode (which is what the manual
        //    Versioned implementation did)
        let options = bincode::DefaultOptions::new().allow_trailing_bytes();
        let expected_bytes = options.serialize(&range).unwrap();

        // 2. Serialize AllocatorKey via our derived Versioned implementation
        let mut key_bytes = Vec::new();
        key.serialize_into(&mut key_bytes).unwrap();

        // Verify they are byte-compatible
        assert_eq!(key_bytes, expected_bytes);

        // 3. Deserialize a Range<u64> as AllocatorKey via our derived Versioned
        //    implementation
        let deserialized_key =
            AllocatorKey::deserialize_from(&mut &expected_bytes[..], LATEST_VERSION).unwrap();
        assert_eq!(deserialized_key, key);
    }

    #[fuchsia::test]
    async fn test_coalescing_iterator() {
        let skip_list = SkipListLayer::new(100);
        let items = [
            Item::new(
                AllocatorKey { device_range: (0..100 * 512).into() },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ),
            Item::new(
                AllocatorKey { device_range: (100 * 512..200 * 512).into() },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ),
        ];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let mut iter =
            CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: (0..200 * 512).into() },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_merge_and_coalesce_across_three_layers() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: (100 * 512..200 * 512).into() },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: (0..100 * 512).into() },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: (0..200 * 512).into() },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: (100 * 512..200 * 512).into() },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: (0..100 * 512).into() },
                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: (0..100 * 512).into() },
                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            )
        );
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: (100 * 512..200 * 512).into() },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
        if a.end > b.start && a.start < b.end {
            min(a.end, b.end) - max(a.start, b.start)
        } else {
            0
        }
    }

    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut allocations: Vec<Range<u64>> = Vec::new();
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            if let Some(r) = allocations.last() {
                assert!(device_range.range.start >= r.end);
            }
            allocations.push(device_range.range.clone());
            iter.advance().await.expect("advance failed");
        }
        allocations
    }

    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut found = 0;
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            let mut l = device_range.range.length().expect("Invalid range");
            found += l;
            // Make sure that the entire range we have found is completely covered by the
            // allocations we expect to find.
            for range in expected_allocations {
                l -= overlap(range, &device_range.range);
                if l == 0 {
                    break;
                }
            }
            assert_eq!(l, 0, "range {:?} not covered by expectations", device_range.range);
            iter.advance().await.expect("advance failed");
        }
        // Make sure the total we found adds up to what we expect.
        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
    }

    async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
        let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let allocator = fs.allocator();
        (fs, allocator)
    }

    #[fuchsia::test]
    async fn test_allocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;

        // Expected extents:
        let expected = vec![
            0..4096,        // Superblock A (4k)
            4096..139264,   // root_store layer files, StoreInfo.. (33x4k blocks)
            139264..204800, // Superblock A extension (64k)
            204800..335872, // Initial Journal (128k)
            335872..401408, // Superblock B extension (64k)
            524288..528384, // Superblock B (4k)
        ];
        assert_eq!(device_ranges, expected);
        // The six initial extents occupy indices 0..=5, so the allocations made below land at
        // indices 6, 7 and 8.
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
        transaction.commit().await.expect("commit failed");
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges[8].length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[6], &device_ranges[8]), 0);
        assert_eq!(overlap(&device_ranges[7], &device_ranges[8]), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_allocate_more_than_max_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(
            device_ranges.last().unwrap().length().expect("Invalid range"),
            allocator.max_extent_size_bytes
        );
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_deallocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let initial_allocations = collect_allocations(&allocator).await;

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let device_range1 = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");
        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
        transaction.commit().await.expect("commit failed");

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
            .await
            .expect("deallocate failed");
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &initial_allocations).await;
    }

    #[fuchsia::test]
    async fn test_mark_allocated() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut device_ranges = collect_allocations(&allocator).await;
        let range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            // First, allocate 2 blocks.
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
                .await
                .expect("allocate failed")
            // Let the transaction drop.
        };

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

        // If we allocate 1 block, the two blocks that were allocated earlier should be available,
        // and this should return the first of them.
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );

        assert_eq!(device_ranges.last().unwrap().start, range.start);

        // Mark the second block as allocated.
        let mut range2 = range.clone();
        range2.start += fs.block_size();
        allocator
            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
            .expect("mark_allocated failed");
        device_ranges.push(range2);

        // This should avoid the range we marked as allocated.
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        let last_range = device_ranges.last().unwrap();
        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(last_range, &range), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_mark_for_deletion() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        // Allocate some stuff.
        let initial_allocated_bytes = allocator.get_allocated_bytes();
        let mut device_ranges = collect_allocations(&allocator).await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        // Note we have a cap on individual allocation length, so we allocate over multiple
        // mutations.
        for _ in 0..15 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate2 failed"),
            );
        }
        transaction.commit().await.expect("commit failed");
        check_allocations(&allocator, &device_ranges).await;

        assert_eq!(
            allocator.get_allocated_bytes(),
            initial_allocated_bytes + fs.block_size() * 3000
        );

        // Mark for deletion.
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
        transaction.commit().await.expect("commit failed");

        // Expect that allocated bytes is updated immediately but device ranges are still allocated.
        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
        check_allocations(&allocator, &device_ranges).await;

        // Allocate more space than is available until the mark_for_deletion space is deallocated.
        // This should force a flush on allocate(): the 3000 blocks above plus the 1500 blocks
        // below exceed the test_fs size of 4096 blocks.
        device_ranges.clear();

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_bytes = 1500 * fs.block_size();
        while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
            let len = std::cmp::min(
                target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
                100 * fs.block_size(),
            );
            device_ranges.push(
                allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        // Have the deleted ranges cleaned up.
        allocator.flush().await.expect("flush failed");

        // The flush above seems to trigger an allocation for the allocator itself.
        // We will just check that we have the right size for the owner we care about.

        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
        assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes,);
    }

    async fn create_file(store: &Arc<ObjectStore>, size: usize) {
        let root_directory =
            Directory::open(store, store.root_directory_object_id()).await.expect("open failed");

        let mut transaction = store
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let file = root_directory
            .create_child_file(&mut transaction, &format!("foo {}", size))
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        let buffer = file.allocate_buffer(size).await;

        // Append some data to it.
        let mut transaction = file
            .new_transaction_with_options(Options {
                borrow_metadata_space: true,
                ..Default::default()
            })
            .await
            .expect("new_transaction_with_options failed");
        file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
        transaction.commit().await.expect("commit failed");
    }

    #[fuchsia::test]
    async fn test_replay_with_deleted_store_and_compaction() {
        let (fs, _) = test_fs().await;

        const FILE_SIZE: usize = 10_000_000;

        let mut store_id = {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;
            store.store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");

        // Compact so that when we replay, the transaction to delete the store won't find any
        // mutations.
        fs.journal().force_compact().await.expect("compact failed");

        for _ in 0..2 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store_id)
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");
                create_file(&store, FILE_SIZE).await;
                store_id = store.store_object_id();
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test(threads = 4)]
2755    async fn test_compaction_delete_race() {
2756        let (fs, _allocator) = test_fs().await;
2757
2758        {
2759            const FILE_SIZE: usize = 10_000_000;
2760
2761            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2762            let store = root_vol
2763                .new_volume("vol", NewChildStoreOptions::default())
2764                .await
2765                .expect("new_volume failed");
2766
2767            create_file(&store, FILE_SIZE).await;
2768
2769            // Race compaction with deleting a store.
2770            let fs_clone = fs.clone();
2771
2772            // Even though the executor has 4 threads, it's hard to get it to run with
2773            // multiple threads.
2774            let executor_tasks = testing::force_executor_threads_to_run(4).await;
2775
2776            let task = fasync::Task::spawn(async move {
2777                fs_clone.journal().force_compact().await.expect("compact failed");
2778            });
2779
2780            // We don't need the executor tasks any more.
2781            drop(executor_tasks);
2782
2783            // This range was chosen because it made this test fail after only a few iterations
2784            // with the bug that this test was introduced to catch.
2785            let sleep = rand::random_range(3000..6000);
2786            std::thread::sleep(std::time::Duration::from_micros(sleep));
2787            log::info!("sleep {sleep}us");
2788
2789            let transaction = fs
2790                .clone()
2791                .new_transaction(
2792                    lock_keys![
2793                        LockKey::object(
2794                            root_vol.volume_directory().store().store_object_id(),
2795                            root_vol.volume_directory().object_id(),
2796                        ),
2797                        LockKey::flush(store.store_object_id())
2798                    ],
2799                    Options { borrow_metadata_space: true, ..Default::default() },
2800                )
2801                .await
2802                .expect("new_transaction failed");
2803            root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");
2804
2805            task.await;
2806        }
2807
2808        fs.journal().force_compact().await.expect("compact failed");
2809        fs.close().await.expect("close failed");
2810
2811        let device = fs.take_device().await;
2812        device.reopen(false);
2813
2814        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2815        fsck(fs.clone()).await.expect("fsck failed");
2816        fs.close().await.expect("close failed");
2817    }
2818
2819    #[fuchsia::test]
2820    async fn test_delete_multiple_volumes() {
2821        let (mut fs, _) = test_fs().await;
2822
2823        for _ in 0..50 {
2824            {
2825                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2826                let store = root_vol
2827                    .new_volume("vol", NewChildStoreOptions::default())
2828                    .await
2829                    .expect("new_volume failed");
2830
2831                create_file(&store, 1_000_000).await;
2832
2833                let transaction = fs
2834                    .clone()
2835                    .new_transaction(
2836                        lock_keys![
2837                            LockKey::object(
2838                                root_vol.volume_directory().store().store_object_id(),
2839                                root_vol.volume_directory().object_id(),
2840                            ),
2841                            LockKey::flush(store.store_object_id())
2842                        ],
2843                        Options { borrow_metadata_space: true, ..Default::default() },
2844                    )
2845                    .await
2846                    .expect("new_transaction failed");
2847                root_vol
2848                    .delete_volume("vol", transaction, || {})
2849                    .await
2850                    .expect("delete_volume failed");
2851
2852                fs.allocator().flush().await.expect("flush failed");
2853            }
2854
2855            fs.close().await.expect("close failed");
2856            let device = fs.take_device().await;
2857            device.reopen(false);
2858
2859            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2860        }
2861
2862        fsck(fs.clone()).await.expect("fsck failed");
2863        fs.close().await.expect("close failed");
2864    }
2865
2866    #[fuchsia::test]
2867    async fn test_allocate_free_reallocate() {
2868        const STORE_OBJECT_ID: u64 = 99;
2869        let (fs, allocator) = test_fs().await;
2870
2871        // Allocate some stuff.
2872        let mut device_ranges = Vec::new();
2873        let mut transaction =
2874            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2875        for _ in 0..30 {
2876            device_ranges.push(
2877                allocator
2878                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2879                    .await
2880                    .expect("allocate failed"),
2881            );
2882        }
2883        transaction.commit().await.expect("commit failed");
2884
2885        assert_eq!(
2886            fs.block_size() * 3000,
2887            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2888        );
2889
2890        // Delete it all.
2891        let mut transaction =
2892            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2893        for range in std::mem::take(&mut device_ranges) {
2894            allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
2895        }
2896        transaction.commit().await.expect("commit failed");
2897
2898        assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());
2899
2900        // Allocate some more stuff. Due to storage pressure, this requires us to flush the
2901        // device before reusing the above space.
2902        let mut transaction =
2903            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2904        let target_len = 1500 * fs.block_size();
2905        while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
2906            let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
2907            device_ranges.push(
2908                allocator
2909                    .allocate(&mut transaction, STORE_OBJECT_ID, len)
2910                    .await
2911                    .expect("allocate failed"),
2912            );
2913        }
2914        transaction.commit().await.expect("commit failed");
2915
2916        assert_eq!(
2917            fs.block_size() * 1500,
2918            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2919        );
2920    }
2921
2922    #[fuchsia::test]
2923    async fn test_flush() {
2924        const STORE_OBJECT_ID: u64 = 99;
2925
2926        let mut device_ranges = Vec::new();
2927        let device = {
2928            let (fs, allocator) = test_fs().await;
2929            let mut transaction = fs
2930                .clone()
2931                .new_transaction(lock_keys![], Options::default())
2932                .await
2933                .expect("new failed");
2934            device_ranges.push(
2935                allocator
2936                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2937                    .await
2938                    .expect("allocate failed"),
2939            );
2940            device_ranges.push(
2941                allocator
2942                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2943                    .await
2944                    .expect("allocate failed"),
2945            );
2946            device_ranges.push(
2947                allocator
2948                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2949                    .await
2950                    .expect("allocate failed"),
2951            );
2952            transaction.commit().await.expect("commit failed");
2953
2954            allocator.flush().await.expect("flush failed");
2955
2956            fs.close().await.expect("close failed");
2957            fs.take_device().await
2958        };
2959
2960        device.reopen(false);
2961        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2962        let allocator = fs.allocator();
2963
2964        let allocated = collect_allocations(&allocator).await;
2965
2966        // Make sure the ranges we allocated earlier are still allocated.
2967        for i in &device_ranges {
2968            let mut overlapping = 0;
2969            for j in &allocated {
2970                overlapping += overlap(i, j);
2971            }
2972            assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
2973        }
2974
2975        let mut transaction =
2976            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2977        let range = allocator
2978            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2979            .await
2980            .expect("allocate failed");
2981
2982        // Make sure the range just allocated doesn't overlap any other allocated ranges.
2983        for r in &allocated {
2984            assert_eq!(overlap(r, &range), 0);
2985        }
2986        transaction.commit().await.expect("commit failed");
2987    }
2988
2989    #[fuchsia::test]
2990    async fn test_dropped_transaction() {
2991        const STORE_OBJECT_ID: u64 = 99;
2992        let (fs, allocator) = test_fs().await;
2993        let allocated_range = {
2994            let mut transaction = fs
2995                .clone()
2996                .new_transaction(lock_keys![], Options::default())
2997                .await
2998                .expect("new_transaction failed");
2999            allocator
3000                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3001                .await
3002                .expect("allocate failed")
3003        };
3004        // After dropping the transaction and attempting to allocate again, we should end up with
3005        // the same range because the reservation should have been released.
3006        let mut transaction = fs
3007            .clone()
3008            .new_transaction(lock_keys![], Options::default())
3009            .await
3010            .expect("new_transaction failed");
3011        assert_eq!(
3012            allocator
3013                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3014                .await
3015                .expect("allocate failed"),
3016            allocated_range
3017        );
3018    }
3019
3020    #[fuchsia::test]
3021    async fn test_cleanup_removed_owner() {
3022        const STORE_OBJECT_ID: u64 = 99;
3023        let device = {
3024            let (fs, allocator) = test_fs().await;
3025
3026            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3027            {
3028                let mut transaction =
3029                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
3030                allocator
3031                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3032                    .await
3033                    .expect("Allocating");
3034                transaction.commit().await.expect("Committing.");
3035            }
3036            allocator.flush().await.expect("Flushing");
3037            assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3038            {
3039                let mut transaction =
3040                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
3041                allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
3042                transaction.commit().await.expect("Committing.");
3043            }
3044            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3045            fs.close().await.expect("Closing");
3046            fs.take_device().await
3047        };
3048
3049        device.reopen(false);
3050        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
3051        let allocator = fs.allocator();
3052        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
3053    }
3054
3055    #[fuchsia::test]
3056    async fn test_allocated_bytes() {
3057        const STORE_OBJECT_ID: u64 = 99;
3058        let (fs, allocator) = test_fs().await;
3059
3060        let initial_allocated_bytes = allocator.get_allocated_bytes();
3061
3062        // Verify allocated_bytes reflects allocation changes.
3063        let allocated_bytes = initial_allocated_bytes + fs.block_size();
3064        let allocated_range = {
3065            let mut transaction = fs
3066                .clone()
3067                .new_transaction(lock_keys![], Options::default())
3068                .await
3069                .expect("new_transaction failed");
3070            let range = allocator
3071                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3072                .await
3073                .expect("allocate failed");
3074            transaction.commit().await.expect("commit failed");
3075            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3076            range
3077        };
3078
3079        {
3080            let mut transaction = fs
3081                .clone()
3082                .new_transaction(lock_keys![], Options::default())
3083                .await
3084                .expect("new_transaction failed");
3085            allocator
3086                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3087                .await
3088                .expect("allocate failed");
3089
3090            // Prior to committing, the count of allocated bytes shouldn't change.
3091            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3092        }
3093
3094        // After dropping the prior transaction, the allocated bytes still shouldn't have changed.
3095        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3096
3097        // Verify allocated_bytes reflects deallocations.
3098        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
3099        let mut transaction =
3100            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3101        allocator
3102            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
3103            .await
3104            .expect("deallocate failed");
3105
3106        // Before committing, there should be no change.
3107        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3108
3109        transaction.commit().await.expect("commit failed");
3110
3111        // After committing, only 40 bytes of the range should remain allocated.
3112        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
3113    }
3114
3115    #[fuchsia::test]
3116    async fn test_persist_bytes_limit() {
3117        const LIMIT: u64 = 12345;
3118        const OWNER_ID: u64 = 12;
3119
3120        let (fs, allocator) = test_fs().await;
3121        {
3122            let mut transaction = fs
3123                .clone()
3124                .new_transaction(lock_keys![], Options::default())
3125                .await
3126                .expect("new_transaction failed");
3127            allocator
3128                .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
3129                .expect("Failed to set limit.");
3130            assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
3131            transaction.commit().await.expect("Failed to commit transaction");
3132            let bytes: u64 = *allocator
3133                .inner
3134                .lock()
3135                .info
3136                .limit_bytes
3137                .get(&OWNER_ID)
3138                .expect("Failed to find limit");
3139            assert_eq!(LIMIT, bytes);
3140        }
3141    }
3142
3143    /// Given a sorted list of non-overlapping ranges, this will coalesce adjacent ranges.
3144    /// This allows comparison of equivalent sets of ranges which may occur due to differences
3145    /// across allocator strategies.
3146    fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
3147        let mut coalesced = Vec::new();
3148        let mut prev: Option<Range<u64>> = None;
3149        for range in ranges {
3150            if let Some(prev_range) = &mut prev {
3151                if range.start == prev_range.end {
3152                    prev_range.end = range.end;
3153                } else {
3154                    coalesced.push(prev_range.clone());
3155                    prev = Some(range);
3156                }
3157            } else {
3158                prev = Some(range);
3159            }
3160        }
3161        if let Some(prev_range) = prev {
3162            coalesced.push(prev_range);
3163        }
3164        coalesced
3165    }
3166
3167    #[fuchsia::test]
3168    async fn test_take_for_trimming() {
3169        const STORE_OBJECT_ID: u64 = 99;
3170
3171        // Allocate a large chunk, then free a few bits of it, so we have free chunks interleaved
3172        // with allocated chunks.
3173        let allocated_range;
3174        let expected_free_ranges;
3175        let device = {
3176            let (fs, allocator) = test_fs().await;
3177            let bs = fs.block_size();
3178            let mut transaction = fs
3179                .clone()
3180                .new_transaction(lock_keys![], Options::default())
3181                .await
3182                .expect("new failed");
3183            allocated_range = allocator
3184                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
3185                .await
3186                .expect("allocate failed");
3187            transaction.commit().await.expect("commit failed");
3188
3189            let mut transaction = fs
3190                .clone()
3191                .new_transaction(lock_keys![], Options::default())
3192                .await
3193                .expect("new failed");
3194            let base = allocated_range.start;
3195            expected_free_ranges = vec![
3196                base..(base + (bs * 1)),
3197                (base + (bs * 2))..(base + (bs * 3)),
3198                // Note that the next three ranges are adjacent and will be treated as one free
3199                // range once applied.  We separate them here to exercise the handling of "large"
3200                // free ranges.
3201                (base + (bs * 4))..(base + (bs * 8)),
3202                (base + (bs * 8))..(base + (bs * 12)),
3203                (base + (bs * 12))..(base + (bs * 13)),
3204                (base + (bs * 29))..(base + (bs * 30)),
3205            ];
3206            for range in &expected_free_ranges {
3207                allocator
3208                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
3209                    .await
3210                    .expect("deallocate failed");
3211            }
3212            transaction.commit().await.expect("commit failed");
3213
3214            allocator.flush().await.expect("flush failed");
3215
3216            fs.close().await.expect("close failed");
3217            fs.take_device().await
3218        };
3219
3220        device.reopen(false);
3221        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
3222        let allocator = fs.allocator();
3223
3224        // These values were picked so that each of them would be the reason why
3225        // collect_free_extents finished, and so we would return after partially processing one of
3226        // the free extents.
3227        let max_extent_size = fs.block_size() as usize * 4;
3228        const EXTENTS_PER_BATCH: usize = 2;
3229        let mut free_ranges = vec![];
3230        let mut offset = allocated_range.start;
3231        while offset < allocated_range.end {
3232            let free = allocator
3233                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
3234                .await
3235                .expect("take_for_trimming failed");
3236            free_ranges.extend(
3237                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
3238            );
3239            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
3240        }
3241        // Coalesce adjacent free ranges because the allocator may return smaller aligned chunks
3242        // but the overall range should always be equivalent.
3243        let coalesced_free_ranges = coalesce_ranges(free_ranges);
3244        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);
3245
3246        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
3247    }
3248
3249    #[fuchsia::test]
3250    async fn test_allocations_wait_for_free_extents() {
3251        const STORE_OBJECT_ID: u64 = 99;
3252        let (fs, allocator) = test_fs().await;
3253        let allocator_clone = allocator.clone();
3254
3255        let mut transaction =
3256            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3257
3258        // Tie up all of the free extents on the device, and make sure allocations block.
3259        let max_extent_size = fs.device().size() as usize;
3260        const EXTENTS_PER_BATCH: usize = usize::MAX;
3261
3262        // HACK: Treat `trimmable_extents` as being locked by `trim_done` (i.e. it should only be
3263        // accessed whilst `trim_done` is locked). We can't combine them into the same mutex,
3264        // because the inner type would be "poisoned" by the lifetime parameter of
3265        // `trimmable_extents` (which is in the lifetime of `allocator`), and then we can't move it
3266        // into `alloc_task` which would require a `'static` lifetime.
3267        let trim_done = Arc::new(Mutex::new(false));
3268        let trimmable_extents = allocator
3269            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
3270            .await
3271            .expect("take_for_trimming failed");
3272
3273        let trim_done_clone = trim_done.clone();
3274        let bs = fs.block_size();
3275        let alloc_task = fasync::Task::spawn(async move {
3276            allocator_clone
3277                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
3278                .await
3279                .expect("allocate failed");
3280            {
3281                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
3282            }
3283            transaction.commit().await.expect("commit failed");
3284        });
3285
3286        // Add a small delay to simulate the trim taking some nonzero amount of time.  Otherwise,
3287        // this will almost certainly always beat the allocation attempt.
3288        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
3289
3290        // Once the free extents are released, the task should unblock.
3291        {
3292            let mut trim_done = trim_done.lock();
3293            std::mem::drop(trimmable_extents);
3294            *trim_done = true;
3295        }
3296
3297        alloc_task.await;
3298    }
3299
3300    #[fuchsia::test]
3301    async fn test_allocation_with_reservation_not_multiple_of_block_size() {
3302        const STORE_OBJECT_ID: u64 = 99;
3303        let (fs, allocator) = test_fs().await;
3304
3305        // Reserve an amount that isn't a multiple of the block size.
3306        const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
3307        let reservation =
3308            allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();
3309
3310        let mut transaction = fs
3311            .clone()
3312            .new_transaction(
3313                lock_keys![],
3314                Options { allocator_reservation: Some(&reservation), ..Options::default() },
3315            )
3316            .await
3317            .expect("new failed");
3318
3319        let range = allocator
3320            .allocate(
3321                &mut transaction,
3322                STORE_OBJECT_ID,
3323                round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
3324            )
3325            .await
3326            .expect("allocate failed");
3327        assert_eq!((range.end - range.start) % fs.block_size(), 0);
3328
3329        println!("{}", range.end - range.start);
3330    }
3331}