fxfs/object_store/allocator.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! # The Allocator
//!
//! The allocator in Fxfs is a filesystem-wide entity responsible for managing the allocation of
//! regions of the device to "owners" (which are `ObjectStore`s).
//!
//! Allocations are tracked in an LSMTree with coalescing used to merge neighboring allocations
//! with the same properties (owner and reference count). At the time of writing, reference
//! counting is not used. (Reference counts are intended for future use if/when snapshot support
//! is implemented.)
//!
//! There are a number of important implementation features of the allocator that warrant further
//! documentation.
//!
//! ## Byte limits
//!
//! Fxfs is a multi-volume filesystem. Fuchsia with fxblob currently uses two primary volumes -
//! an unencrypted volume for blob storage and an encrypted volume for data storage.
//! Byte limits ensure that no one volume can consume all available storage. This is important
//! because software updates must always be possible (blobs) and, conversely, configuration data
//! must always be writable (data).
//!
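//! The per-owner accounting behind this is simple; conceptually, the bytes still available to an
//! owner are just its configured limit minus everything it currently has in use (see
//! `Inner::owner_id_bytes_left` below). A minimal sketch of that calculation:
//!
//! ```ignore
//! fn bytes_left(limit_bytes: u64, used_bytes: u64) -> u64 {
//!     // `used_bytes` covers allocated, uncommitted and reserved bytes for the owner.
//!     limit_bytes.saturating_sub(used_bytes)
//! }
//! ```
//!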
//! ## Reservation tracking
//!
//! Fxfs on Fuchsia leverages write-back caching, which allows us to buffer writes in RAM until
//! memory pressure, an explicit flush or a file close requires us to persist data to storage.
//!
//! To ensure that we do not over-commit in such cases (e.g. by writing many files to RAM only to
//! find out that there is insufficient disk to store them all), the allocator includes a simple
//! reservation tracking mechanism.
//!
//! Reservation tracking is implemented hierarchically such that a reservation can portion out
//! sub-reservations, each of which may be "reserved" or "forgotten" when it comes time to
//! actually allocate space.
//!
//! ## Fixed locations
//!
//! The only fixed-location files in Fxfs are the first 512KiB extents of the two superblocks,
//! which are the first things on the disk (i.e. the first 1MiB). The allocator manages these
//! locations, but does so using a `mark_allocated` method, distinct from all other allocations
//! for which the location is left up to the allocator.
//!
//! ## Deallocated unusable regions
//!
//! It is not legal to reuse a deallocated disk region until after a flush. Transactions
//! are not guaranteed to be persisted until after a successful flush, so any reuse of
//! a region before then, followed by power loss, may lead to corruption.
//!
//! e.g. Consider if we deallocate a file, reuse its extent for another file, then crash after
//! writing to the new file but before flushing the journal. At the next mount we will have lost
//! the transaction despite having overwritten the original file's data, leading to inconsistency
//! errors (data corruption).
//!
//! These regions are currently tracked in RAM in the allocator.
//!
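//! The sketch below illustrates the bookkeeping this requires (a simplified stand-in for the
//! `committed_deallocated` queue kept by the allocator): deallocated ranges are remembered along
//! with the journal offset of the deallocating transaction and only become reusable once the
//! journal has been flushed past that offset. Names here are illustrative, not the real API.
//!
//! ```ignore
//! use std::ops::Range;
//!
//! #[derive(Default)]
//! struct PendingDeallocations {
//!     // (journal offset at which the deallocation was committed, deallocated device range).
//!     pending: Vec<(u64, Range<u64>)>,
//! }
//!
//! impl PendingDeallocations {
//!     fn deallocate(&mut self, journal_offset: u64, range: Range<u64>) {
//!         self.pending.push((journal_offset, range));
//!     }
//!
//!     // Called after the journal has been flushed up to `flushed_offset`; returns the ranges
//!     // that are now safe to hand back to the free-space strategy.
//!     fn take_reusable(&mut self, flushed_offset: u64) -> Vec<Range<u64>> {
//!         let (ready, keep): (Vec<_>, Vec<_>) =
//!             self.pending.drain(..).partition(|(offset, _)| *offset <= flushed_offset);
//!         self.pending = keep;
//!         ready.into_iter().map(|(_, range)| range).collect()
//!     }
//! }
//! ```
//!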
//! ## Allocated but uncommitted regions
//!
//! These are allocated regions that are not (yet) persisted to disk. They are regular
//! file allocations, but are not stored on persistent storage until their transaction is committed
//! and safely flushed to disk.
//!
//! ## TRIMed unusable regions
//!
//! We periodically TRIM unallocated regions to give the SSD controller insight into which
//! parts of the device contain data. While these operations are in progress, the affected regions
//! are held as temporary allocations so that they cannot be allocated from.
//!
//! ## Volume deletion
//!
//! We make use of an optimisation in the case where an entire volume is deleted. In such cases,
//! rather than individually deallocate all disk regions associated with that volume, we make
//! note of the deletion and perform a special merge operation on the next LSMTree compaction that
//! filters out allocations for the deleted volume.
//!
//! This is designed to make dropping volumes significantly cheaper, but it does add some
//! complexity to any allocator implementation that maintains data structures tracking free space
//! (rather than just allocated space).
//!
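//! The merge-time filter is conceptually just a predicate over the owner of each allocation
//! record; a minimal sketch (assuming records are `(owner_object_id, device_range)` pairs, which
//! is a simplification of the real `AllocatorKey`/`AllocatorValue` items):
//!
//! ```ignore
//! use std::collections::HashSet;
//! use std::ops::Range;
//!
//! fn filter_deleted_volumes<'a>(
//!     records: impl Iterator<Item = (u64, Range<u64>)> + 'a,
//!     marked_for_deletion: &'a HashSet<u64>,
//! ) -> impl Iterator<Item = (u64, Range<u64>)> + 'a {
//!     // Records owned by a deleted volume are simply not written to the new layer file.
//!     records.filter(move |(owner_object_id, _)| !marked_for_deletion.contains(owner_object_id))
//! }
//! ```
//!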
//! ## Image generation
//!
//! The Fuchsia build process requires building an initial filesystem image. In the case of
//! fxblob-based boards, this is an Fxfs filesystem containing a volume with the base set of
//! blobs required to bootstrap the system. When we build such an image, we want it to be as
//! compact as possible, as we're potentially packaging it up for distribution. To that end, our
//! allocation strategy (or at least the strategy used for image generation) should prefer to
//! allocate from the start of the device wherever possible.
89pub mod merge;
90pub mod strategy;
91
92use crate::drop_event::DropEvent;
93use crate::errors::FxfsError;
94use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
95use crate::log::*;
96use crate::lsm_tree::cache::NullCache;
97use crate::lsm_tree::skip_list_layer::SkipListLayer;
98use crate::lsm_tree::types::{
99    FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
100    OrdUpperBound, RangeKey, SortByU64, Value,
101};
102use crate::lsm_tree::{layers_from_handles, LSMTree, Query};
103use crate::object_handle::{ObjectHandle, ReadObjectHandle, INVALID_OBJECT_ID};
104use crate::object_store::object_manager::ReservationUpdate;
105use crate::object_store::transaction::{
106    lock_keys, AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard,
107};
108use crate::object_store::{tree, DataObjectHandle, DirectWriter, HandleOptions, ObjectStore};
109use crate::range::RangeExt;
110use crate::round::{round_div, round_down, round_up};
111use crate::serialized_types::{
112    Version, Versioned, VersionedLatest, DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION,
113};
114use anyhow::{anyhow, bail, ensure, Context, Error};
115use async_trait::async_trait;
116use either::Either::{Left, Right};
117use event_listener::EventListener;
118use fprint::TypeFingerprint;
119use fuchsia_inspect::{ArrayProperty, HistogramProperty};
120use fuchsia_sync::Mutex;
121use futures::FutureExt;
122use merge::{filter_marked_for_deletion, filter_tombstones, merge};
123use rustc_hash::FxHasher;
124use serde::{Deserialize, Serialize};
125use std::borrow::Borrow;
126use std::collections::{BTreeMap, HashSet, VecDeque};
127use std::hash::{Hash, Hasher as _};
128use std::marker::PhantomData;
129use std::num::Saturating;
130use std::ops::Range;
131use std::sync::atomic::{AtomicU64, Ordering};
132use std::sync::{Arc, Weak};
133
134/// This trait is implemented by things that own reservations.
135pub trait ReservationOwner: Send + Sync {
    /// Report that bytes are being released from the reservation back to the |ReservationOwner|
137    /// where |owner_object_id| is the owner under the root object store associated with the
138    /// reservation.
139    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
140}
141
/// A reservation guarantees that when it comes time to actually allocate, it will not fail due to
/// lack of space.  Sub-reservations (a.k.a. holds) are possible which effectively allows part of a
/// reservation to be set aside until it's time to commit.  Reservations do offer some
/// thread-safety, but some responsibility is borne by the caller: e.g. calling `forget` and
/// `reserve` at the same time from different threads is unsafe. Reservations have an
/// |owner_object_id| which associates them with an object under the root object store that the
/// reservation is accounted against.
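///
/// A minimal sketch of how a reservation and a sub-reservation (hold) interact, using a no-op
/// owner purely for illustration (`NoopOwner` is not part of the real code):
///
/// ```ignore
/// struct NoopOwner;
/// impl ReservationOwner for NoopOwner {
///     fn release_reservation(&self, _owner_object_id: Option<u64>, _amount: u64) {}
/// }
///
/// let owner: Arc<dyn ReservationOwner> = Arc::new(NoopOwner);
/// let reservation = Reservation::new(owner, None, 8192);
/// assert_eq!(reservation.amount(), 8192);
/// {
///     // Set part of the reservation aside; `reserve` returns None if not enough is available.
///     let hold = reservation.reserve(4096).expect("enough space reserved");
///     assert_eq!(hold.amount(), 4096);
/// } // Dropping the hold releases its 4096 bytes back to the parent reservation.
/// assert_eq!(reservation.amount(), 8192);
/// ```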
149pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
150    owner: T,
151    owner_object_id: Option<u64>,
152    inner: Mutex<ReservationInner>,
153    phantom: PhantomData<U>,
154}
155
156#[derive(Debug, Default)]
157struct ReservationInner {
158    // Amount currently held by this reservation.
159    amount: u64,
160
161    // Amount reserved by sub-reservations.
162    reserved: u64,
163}
164
165impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
166    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
167        self.inner.lock().fmt(f)
168    }
169}
170
171impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
172    pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
173        Self {
174            owner,
175            owner_object_id,
176            inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
177            phantom: PhantomData,
178        }
179    }
180
181    pub fn owner_object_id(&self) -> Option<u64> {
182        self.owner_object_id
183    }
184
185    /// Returns the total amount of the reservation, not accounting for anything that might be held.
186    pub fn amount(&self) -> u64 {
187        self.inner.lock().amount
188    }
189
190    /// Adds more to the reservation.
191    pub fn add(&self, amount: u64) {
192        self.inner.lock().amount += amount;
193    }
194
195    /// Returns the entire amount of the reservation.  The caller is responsible for maintaining
196    /// consistency, i.e. updating counters, etc, and there can be no sub-reservations (an assert
197    /// will fire otherwise).
198    pub fn forget(&self) -> u64 {
199        let mut inner = self.inner.lock();
200        assert_eq!(inner.reserved, 0);
201        std::mem::take(&mut inner.amount)
202    }
203
204    /// Takes some of the reservation.  The caller is responsible for maintaining consistency,
205    /// i.e. updating counters, etc.  This will assert that the amount being forgotten does not
206    /// exceed the available reservation amount; the caller should ensure that this is the case.
207    pub fn forget_some(&self, amount: u64) {
208        let mut inner = self.inner.lock();
209        inner.amount -= amount;
210        assert!(inner.reserved <= inner.amount);
211    }
212
    /// Returns a partial amount of the reservation.  The `amount` callback is passed the maximum
    /// available amount and should return the amount to take, which can be zero.
215    fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
216        let mut inner = self.inner.lock();
217        let taken = amount(inner.amount - inner.reserved);
218        inner.reserved += taken;
219        ReservationImpl::new(self, self.owner_object_id, taken)
220    }
221
222    /// Reserves *exactly* amount if possible.
223    pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
224        let mut inner = self.inner.lock();
225        if inner.amount - inner.reserved < amount {
226            None
227        } else {
228            inner.reserved += amount;
229            Some(ReservationImpl::new(self, self.owner_object_id, amount))
230        }
231    }
232
233    /// Commits a previously reserved amount from this reservation.  The caller is responsible for
234    /// ensuring the amount was reserved.
235    pub fn commit(&self, amount: u64) {
236        let mut inner = self.inner.lock();
237        inner.reserved -= amount;
238        inner.amount -= amount;
239    }
240
241    /// Returns some of the reservation.
242    pub fn give_back(&self, amount: u64) {
243        self.owner.borrow().release_reservation(self.owner_object_id, amount);
244        let mut inner = self.inner.lock();
245        inner.amount -= amount;
246        assert!(inner.reserved <= inner.amount);
247    }
248
249    /// Moves `amount` from this reservation to another reservation.
250    pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
251        &self,
252        other: &ReservationImpl<V, W>,
253        amount: u64,
254    ) {
255        assert_eq!(self.owner_object_id, other.owner_object_id());
256        let mut inner = self.inner.lock();
257        if let Some(amount) = inner.amount.checked_sub(amount) {
258            inner.amount = amount;
259        } else {
260            std::mem::drop(inner);
261            panic!("Insufficient reservation space");
262        }
263        other.add(amount);
264    }
265}
266
267impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
268    fn drop(&mut self) {
269        let inner = self.inner.get_mut();
270        assert_eq!(inner.reserved, 0);
271        let owner_object_id = self.owner_object_id;
272        if inner.amount > 0 {
273            self.owner
274                .borrow()
275                .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
276        }
277    }
278}
279
280impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
281    for ReservationImpl<T, U>
282{
283    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
284        // Sub-reservations should belong to the same owner (or lack thereof).
285        assert_eq!(owner_object_id, self.owner_object_id);
286        let mut inner = self.inner.lock();
287        assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
288        inner.reserved -= amount;
289    }
290}
291
292pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;
293
294pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;
295
296/// Our allocator implementation tracks extents with a reference count.  At time of writing, these
297/// reference counts should never exceed 1, but that might change with snapshots and clones.
298pub type AllocatorKey = AllocatorKeyV32;
299
300#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, TypeFingerprint, Versioned)]
301#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
302pub struct AllocatorKeyV32 {
303    pub device_range: Range<u64>,
304}
305
306impl SortByU64 for AllocatorKey {
307    fn get_leading_u64(&self) -> u64 {
308        self.device_range.start
309    }
310}
311
312const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;
313
314pub struct AllocatorKeyPartitionIterator {
315    device_range: Range<u64>,
316}
317
318impl Iterator for AllocatorKeyPartitionIterator {
319    type Item = u64;
320
321    fn next(&mut self) -> Option<Self::Item> {
322        if self.device_range.start >= self.device_range.end {
323            None
324        } else {
325            let start = self.device_range.start;
326            self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
327            let end = std::cmp::min(self.device_range.start, self.device_range.end);
328            let key = AllocatorKey { device_range: start..end };
329            let mut hasher = FxHasher::default();
330            key.hash(&mut hasher);
331            Some(hasher.finish())
332        }
333    }
334}
335
336impl FuzzyHash for AllocatorKey {
337    type Iter = AllocatorKeyPartitionIterator;
338
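    /// For example, a key with device_range 0x180000..0x310000 yields one hash per overlapped
    /// 1MiB-aligned bucket (0x100000..0x200000, 0x200000..0x300000 and 0x300000..0x400000), so
    /// any other key overlapping one of those buckets produces a matching hash for it.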
339    fn fuzzy_hash(&self) -> Self::Iter {
340        AllocatorKeyPartitionIterator {
341            device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
342                ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
343        }
344    }
345}
346
347impl AllocatorKey {
348    /// Returns a new key that is a lower bound suitable for use with merge_into.
349    pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
350        AllocatorKey { device_range: 0..self.device_range.start }
351    }
352}
353
354impl LayerKey for AllocatorKey {
355    fn merge_type(&self) -> MergeType {
356        MergeType::OptimizedMerge
357    }
358}
359
360impl OrdUpperBound for AllocatorKey {
361    fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
362        self.device_range.end.cmp(&other.device_range.end)
363    }
364}
365
366impl OrdLowerBound for AllocatorKey {
367    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
368        // The ordering over range.end is significant here as it is used in
369        // the heap ordering that feeds into our merge function and
370        // a total ordering over range lets us remove a symmetry case from
371        // the allocator merge function.
372        self.device_range
373            .start
374            .cmp(&other.device_range.start)
375            .then(self.device_range.end.cmp(&other.device_range.end))
376    }
377}
378
379impl Ord for AllocatorKey {
380    fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
381        self.device_range
382            .start
383            .cmp(&other.device_range.start)
384            .then(self.device_range.end.cmp(&other.device_range.end))
385    }
386}
387
388impl PartialOrd for AllocatorKey {
389    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
390        Some(self.cmp(other))
391    }
392}
393
394impl RangeKey for AllocatorKey {
395    fn overlaps(&self, other: &Self) -> bool {
396        self.device_range.start < other.device_range.end
397            && self.device_range.end > other.device_range.start
398    }
399}
400
401/// Allocations are "owned" by a single ObjectStore and are reference counted
402/// (for future snapshot/clone support).
403pub type AllocatorValue = AllocatorValueV32;
404impl Value for AllocatorValue {
405    const DELETED_MARKER: Self = Self::None;
406}
407
408#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
409#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
410pub enum AllocatorValueV32 {
411    // Tombstone variant indicating an extent is no longer allocated.
412    None,
413    // Used when we know there are no possible allocations below us in the stack.
414    // This is currently all the time. We used to have a related Delta type but
415    // it has been removed due to correctness issues (https://fxbug.dev/42179428).
416    Abs { count: u64, owner_object_id: u64 },
417}
418
419pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;
420
421/// Serialized information about the allocator.
422pub type AllocatorInfo = AllocatorInfoV32;
423
424#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
425pub struct AllocatorInfoV32 {
426    /// Holds the set of layer file object_id for the LSM tree (newest first).
427    pub layers: Vec<u64>,
428    /// Maps from owner_object_id to bytes allocated.
429    pub allocated_bytes: BTreeMap<u64, u64>,
430    /// Set of owner_object_id that we should ignore if found in layer files.  For now, this should
431    /// always be empty on-disk because we always do full compactions.
432    pub marked_for_deletion: HashSet<u64>,
    /// The limit, in bytes, on allocations for each `owner_object_id`.  If there is no limit
    /// present here for an `owner_object_id`, assume it is u64::MAX.
435    pub limit_bytes: BTreeMap<u64, u64>,
436}
437
438const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;
439
440/// Computes the target maximum extent size based on the block size of the allocator.
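///
/// As a worked example (assuming the current value of `DEFAULT_MAX_SERIALIZED_RECORD_SIZE` is
/// 4096 bytes; see `serialized_types`), a 4096-byte block size yields:
///
/// ```text
/// 4096 * (4096 - 64) / 9 = 1,835,008 bytes (1.75 MiB) per extent
/// ```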
441pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
    // Each block in an extent contains an 8-byte checksum (which due to varint encoding is 9
    // bytes), and a given extent record must be no larger than
    // DEFAULT_MAX_SERIALIZED_RECORD_SIZE.  We also need to leave a bit of room (arbitrarily,
    // 64 bytes) for the rest of the extent's metadata.
446    block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
447}
448
449#[derive(Default)]
450struct AllocatorCounters {
451    num_flushes: u64,
452    last_flush_time: Option<std::time::SystemTime>,
453    persistent_layer_file_sizes: Vec<u64>,
454}
455
456pub struct Allocator {
457    filesystem: Weak<FxFilesystem>,
458    block_size: u64,
459    device_size: u64,
460    object_id: u64,
461    max_extent_size_bytes: u64,
462    tree: LSMTree<AllocatorKey, AllocatorValue>,
463    // A list of allocations which are temporary; i.e. they are not actually stored in the LSM tree,
464    // but we still don't want to be able to allocate over them.  This layer is merged into the
465    // allocator's LSM tree whilst reading it, so the allocations appear to exist in the LSM tree.
466    // This is used in a few places, for example to hold back allocations that have been added to a
    // transaction but are not yet committed, or to prevent the allocation of a deleted extent
468    // until the device is flushed.
469    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
470    inner: Mutex<Inner>,
471    allocation_mutex: futures::lock::Mutex<()>,
472    counters: Mutex<AllocatorCounters>,
473    maximum_offset: AtomicU64,
474}
475
476/// Tracks the different stages of byte allocations for an individual owner.
477#[derive(Debug, Default, PartialEq)]
478struct ByteTracking {
479    /// This value is the up-to-date count of the number of allocated bytes per owner_object_id
480    /// whereas the value in `Info::allocated_bytes` is the value as it was when we last flushed.
481    /// This field should be regarded as *untrusted*; it can be invalid due to filesystem
482    /// inconsistencies, and this is why it is `Saturating<u64>` rather than just `u64`.  Any
483    /// computations that use this value should typically propagate `Saturating` so that it is clear
484    /// the value is *untrusted*.
485    allocated_bytes: Saturating<u64>,
486
487    /// This value is the number of bytes allocated to uncommitted allocations.
488    /// (Bytes allocated, but not yet persisted to disk)
489    uncommitted_allocated_bytes: u64,
490
491    /// This value is the number of bytes allocated to reservations.
492    reserved_bytes: u64,
493
    /// The number of bytes in committed deallocations that cannot be used until they are flushed
    /// to the device.
497    committed_deallocated_bytes: u64,
498}
499
500impl ByteTracking {
501    // Returns the total number of bytes that are taken either from reservations, allocations or
502    // uncommitted allocations.
503    fn used_bytes(&self) -> Saturating<u64> {
504        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
505    }
506
507    // Returns the amount that is not available to be allocated, which includes actually allocated
508    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
509    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
510    // reuse those bytes yet.
511    fn unavailable_bytes(&self) -> Saturating<u64> {
512        self.allocated_bytes
513            + Saturating(self.uncommitted_allocated_bytes)
514            + Saturating(self.committed_deallocated_bytes)
515    }
516
517    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
518    // require a device flush to be reused.
519    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
520        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
521    }
522}
523
524#[derive(Debug)]
525struct CommittedDeallocation {
526    // The offset at which this deallocation was committed.
527    log_file_offset: u64,
528    // The device range being deallocated.
529    range: Range<u64>,
530    // The owning object id which originally allocated it.
531    owner_object_id: u64,
532}
533
534struct Inner {
535    info: AllocatorInfo,
536
537    /// The allocator can only be opened if there have been no allocations and it has not already
538    /// been opened or initialized.
539    opened: bool,
540
541    /// When we allocate a range from RAM, we add it to Allocator::temporary_allocations.
    /// This layer is added to the layer_set in rebuild_strategy and take_for_trimming so we don't
    /// assume the range is available.
544    /// If we apply the mutation, we move it from `temporary_allocations` to the LSMTree.
545    /// If we drop the mutation, we just delete it from `temporary_allocations` and call `free`.
546    ///
547    /// We need to be very careful about races when we move the allocation into the LSMTree.
548    /// A layer_set is a snapshot in time of a set of layers. Say:
549    ///  1. `rebuild_strategy` takes a layer_set that includes the mutable layer.
550    ///  2. A compaction operation seals that mutable layer and creates a new mutable layer.
551    ///     Note that the layer_set in rebuild_strategy doesn't have this new mutable layer.
552    ///  3. An apply_mutation operation adds the allocation to the new mutable layer.
553    ///     It then removes the allocation from `temporary_allocations`.
554    ///  4. `rebuild_strategy` iterates the layer_set, missing both the temporary mutation AND
555    ///     the record in the new mutable layer, making the allocated range available for double
556    ///     allocation and future filesystem corruption.
557    ///
    /// We don't have (or want) a means to lock compactions during rebuild_strategy operations, so
    /// to avoid this scenario, we can't remove from temporary_allocations when we apply a mutation.
    /// Instead we add them to dropped_temporary_allocations and make the actual removals from
    /// `temporary_allocations` while holding the allocator lock. Because rebuild_strategy only
    /// runs with this lock held, this guarantees that we don't remove entries from
    /// `temporary_allocations` while also iterating over it.
564    ///
565    /// A related issue happens when we deallocate a range. We use temporary_allocations this time
566    /// to prevent reuse until after the deallocation has been successfully flushed.
567    /// In this scenario we don't want to call 'free()' until the range is ready to use again.
568    dropped_temporary_allocations: Vec<Range<u64>>,
569
    /// The per-owner counters for bytes at various stages of the data life-cycle, from initial
    /// reservation through until the bytes are deallocated and eventually freed after a flush.
572    owner_bytes: BTreeMap<u64, ByteTracking>,
573
574    /// This value is the number of bytes allocated to reservations but not tracked as part of a
575    /// particular volume.
576    unattributed_reserved_bytes: u64,
577
578    /// Committed deallocations that we cannot use until they are flushed to the device.
579    committed_deallocated: VecDeque<CommittedDeallocation>,
580
581    /// Bytes which are currently being trimmed.  These can still be allocated from, but the
582    /// allocation will block until the current batch of trimming is finished.
583    trim_reserved_bytes: u64,
584
585    /// While a trim is being performed, this listener is set.  When the current batch of extents
586    /// being trimmed have been released (and trim_reserved_bytes is 0), this is signaled.
587    /// This should only be listened to while the allocation_mutex is held.
588    trim_listener: Option<EventListener>,
589
590    /// This controls how we allocate our free space to manage fragmentation.
591    strategy: Box<strategy::BestFit>,
592
593    /// Tracks the number of allocations of size 1,2,...63,>=64.
594    allocation_size_histogram: [u64; 64],
595    /// Tracks which size bucket triggers rebuild_strategy.
596    rebuild_strategy_trigger_histogram: [u64; 64],
597
598    /// This is distinct from the set contained in `info`.  New entries are inserted *after* a
599    /// device has been flushed (it is not safe to reuse the space taken by a deleted volume prior
600    /// to this) and are removed after a major compaction.
601    marked_for_deletion: HashSet<u64>,
602
603    /// The set of volumes deleted that are waiting for a sync.
604    volumes_deleted_pending_sync: HashSet<u64>,
605}
606
607impl Inner {
608    fn allocated_bytes(&self) -> Saturating<u64> {
609        let mut total = Saturating(0);
610        for (_, bytes) in &self.owner_bytes {
611            total += bytes.allocated_bytes;
612        }
613        total
614    }
615
616    fn uncommitted_allocated_bytes(&self) -> u64 {
617        self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
618    }
619
620    fn reserved_bytes(&self) -> u64 {
621        self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
622            + self.unattributed_reserved_bytes
623    }
624
625    fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
626        match self.info.limit_bytes.get(&owner_object_id) {
627            Some(v) => *v,
628            None => u64::MAX,
629        }
630    }
631
632    fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
633        let limit = self.owner_id_limit_bytes(owner_object_id);
634        let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
635        (Saturating(limit) - used).0
636    }
637
638    // Returns the amount that is not available to be allocated, which includes actually allocated
639    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
640    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
641    // reuse those bytes yet.
642    fn unavailable_bytes(&self) -> Saturating<u64> {
643        let mut total = Saturating(0);
644        for (_, bytes) in &self.owner_bytes {
645            total += bytes.unavailable_bytes();
646        }
647        total
648    }
649
650    // Returns the total number of bytes that are taken either from reservations, allocations or
651    // uncommitted allocations.
652    fn used_bytes(&self) -> Saturating<u64> {
653        let mut total = Saturating(0);
654        for (_, bytes) in &self.owner_bytes {
655            total += bytes.used_bytes();
656        }
657        total + Saturating(self.unattributed_reserved_bytes)
658    }
659
660    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
661    // require a device flush to be reused.
662    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
663        let mut total = Saturating(0);
664        for (_, bytes) in &self.owner_bytes {
665            total += bytes.unavailable_after_sync_bytes();
666        }
667        total
668    }
669
670    // Returns the number of bytes which will be available after the current batch of trimming
671    // completes.
672    fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
673        device_size
674            .checked_sub(
675                (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
676            )
677            .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
678    }
679
680    fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
681        match owner_object_id {
682            Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
683            None => self.unattributed_reserved_bytes += amount,
684        };
685    }
686
687    fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
688        match owner_object_id {
689            Some(owner) => {
690                let owner_entry = self.owner_bytes.entry(owner).or_default();
691                assert!(
692                    owner_entry.reserved_bytes >= amount,
693                    "{} >= {}",
694                    owner_entry.reserved_bytes,
695                    amount
696                );
697                owner_entry.reserved_bytes -= amount;
698            }
699            None => {
700                assert!(
701                    self.unattributed_reserved_bytes >= amount,
702                    "{} >= {}",
703                    self.unattributed_reserved_bytes,
704                    amount
705                );
706                self.unattributed_reserved_bytes -= amount
707            }
708        };
709    }
710}
711
712/// A container for a set of extents which are known to be free and can be trimmed.  Returned by
713/// `take_for_trimming`.
714pub struct TrimmableExtents<'a> {
715    allocator: &'a Allocator,
716    extents: Vec<Range<u64>>,
717    // The allocator can subscribe to this event to wait until these extents are dropped.  This way,
718    // we don't fail an allocation attempt if blocks are tied up for trimming; rather, we just wait
719    // until the batch is finished with and then proceed.
720    _drop_event: DropEvent,
721}
722
723impl<'a> TrimmableExtents<'a> {
724    pub fn extents(&self) -> &Vec<Range<u64>> {
725        &self.extents
726    }
727
728    // Also returns an EventListener which is signaled when this is dropped.
729    fn new(allocator: &'a Allocator) -> (Self, EventListener) {
730        let drop_event = DropEvent::new();
731        let listener = drop_event.listen();
732        (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
733    }
734
735    fn add_extent(&mut self, extent: Range<u64>) {
736        self.extents.push(extent);
737    }
738}
739
740impl<'a> Drop for TrimmableExtents<'a> {
741    fn drop(&mut self) {
742        let mut inner = self.allocator.inner.lock();
743        for device_range in std::mem::take(&mut self.extents) {
744            inner.strategy.free(device_range.clone()).expect("drop trim extent");
745            self.allocator
746                .temporary_allocations
747                .erase(&AllocatorKey { device_range: device_range.clone() });
748        }
749        inner.trim_reserved_bytes = 0;
750    }
751}
752
753impl Allocator {
754    pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
755        let block_size = filesystem.block_size();
756        // We expect device size to be a multiple of block size. Throw away any tail.
757        let device_size = round_down(filesystem.device().size(), block_size);
758        if device_size != filesystem.device().size() {
759            warn!("Device size is not block aligned. Rounding down.");
760        }
761        let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
762        let mut strategy = Box::new(strategy::BestFit::default());
763        strategy.free(0..device_size).expect("new fs");
764        Allocator {
765            filesystem: Arc::downgrade(&filesystem),
766            block_size,
767            device_size,
768            object_id,
769            max_extent_size_bytes,
770            tree: LSMTree::new(merge, Box::new(NullCache {})),
771            temporary_allocations: SkipListLayer::new(1024),
772            inner: Mutex::new(Inner {
773                info: AllocatorInfo::default(),
774                opened: false,
775                dropped_temporary_allocations: Vec::new(),
776                owner_bytes: BTreeMap::new(),
777                unattributed_reserved_bytes: 0,
778                committed_deallocated: VecDeque::new(),
779                trim_reserved_bytes: 0,
780                trim_listener: None,
781                strategy,
782                allocation_size_histogram: [0; 64],
783                rebuild_strategy_trigger_histogram: [0; 64],
784                marked_for_deletion: HashSet::new(),
785                volumes_deleted_pending_sync: HashSet::new(),
786            }),
787            allocation_mutex: futures::lock::Mutex::new(()),
788            counters: Mutex::new(AllocatorCounters::default()),
789            maximum_offset: AtomicU64::new(0),
790        }
791    }
792
793    pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
794        &self.tree
795    }
796
797    /// Returns an iterator that yields all allocations, filtering out tombstones and any
798    /// owner_object_id that have been marked as deleted.  If `committed_marked_for_deletion` is
799    /// true, then filter using the committed volumes marked for deletion rather than the in-memory
    /// copy, which excludes volumes that have been deleted but not yet synced.
801    pub async fn filter(
802        &self,
803        iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
804        committed_marked_for_deletion: bool,
805    ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
806        let marked_for_deletion = {
807            let inner = self.inner.lock();
808            if committed_marked_for_deletion {
809                &inner.info.marked_for_deletion
810            } else {
811                &inner.marked_for_deletion
812            }
813            .clone()
814        };
815        let iter =
816            filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
817        Ok(iter)
818    }
819
820    /// A histogram of allocation request sizes.
821    /// The index into the array is 'number of blocks'.
822    /// The last bucket is a catch-all for larger allocation requests.
823    pub fn allocation_size_histogram(&self) -> [u64; 64] {
824        self.inner.lock().allocation_size_histogram
825    }
826
827    /// Creates a new (empty) allocator.
828    pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
829        // Mark the allocator as opened before creating the file because creating a new
830        // transaction requires a reservation.
831        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
832
833        let filesystem = self.filesystem.upgrade().unwrap();
834        let root_store = filesystem.root_store();
835        ObjectStore::create_object_with_id(
836            &root_store,
837            transaction,
838            self.object_id(),
839            HandleOptions::default(),
840            None,
841        )
842        .await?;
843        root_store.update_last_object_id(self.object_id());
844        Ok(())
845    }
846
847    // Opens the allocator.  This is not thread-safe; this should be called prior to
848    // replaying the allocator's mutations.
849    pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
850        let filesystem = self.filesystem.upgrade().unwrap();
851        let root_store = filesystem.root_store();
852
853        {
854            let mut inner = self.inner.lock();
855
856            inner.strategy = Box::new(strategy::BestFit::default());
857        }
858
859        let handle =
860            ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
861                .await
862                .context("Failed to open allocator object")?;
863
864        if handle.get_size() > 0 {
865            let serialized_info = handle
866                .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
867                .await
868                .context("Failed to read AllocatorInfo")?;
869            let mut cursor = std::io::Cursor::new(serialized_info);
870            let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
871                .context("Failed to deserialize AllocatorInfo")?;
872
873            let mut handles = Vec::new();
874            let mut layer_file_sizes = Vec::new();
875            let mut total_size = 0;
876            for object_id in &info.layers {
877                let handle = ObjectStore::open_object(
878                    &root_store,
879                    *object_id,
880                    HandleOptions::default(),
881                    None,
882                )
883                .await
884                .context("Failed to open allocator layer file")?;
885
886                let size = handle.get_size();
887                layer_file_sizes.push(size);
888                total_size += size;
889                handles.push(handle);
890            }
891
892            {
893                let mut inner = self.inner.lock();
894
895                // Check allocated_bytes fits within the device.
896                let mut device_bytes = self.device_size;
897                for (&owner_object_id, &bytes) in &info.allocated_bytes {
898                    ensure!(
899                        bytes <= device_bytes,
900                        anyhow!(FxfsError::Inconsistent).context(format!(
901                            "Allocated bytes exceeds device size: {:?}",
902                            info.allocated_bytes
903                        ))
904                    );
905                    device_bytes -= bytes;
906
907                    inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
908                        Saturating(bytes);
909                }
910
911                inner.info = info;
912            }
913
914            self.counters.lock().persistent_layer_file_sizes = layer_file_sizes;
915            self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
916            self.filesystem.upgrade().unwrap().object_manager().update_reservation(
917                self.object_id,
918                tree::reservation_amount_from_layer_size(total_size),
919            );
920        }
921
922        Ok(())
923    }
924
925    pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
926        // We can assume the device has been flushed.
927        {
928            let mut inner = self.inner.lock();
929            inner.volumes_deleted_pending_sync.clear();
930            inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
931        }
932
933        // Build free extent structure from disk. For now, we expect disks to have *some* free
934        // space at all times. This may change if we ever support mounting of read-only
935        // redistributable filesystem images.
936        if !self.rebuild_strategy().await.context("Build free extents")? {
937            error!("Device contains no free space.");
938            return Err(FxfsError::Inconsistent).context("Device appears to contain no free space");
939        }
940
941        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
942        Ok(())
943    }
944
945    /// Walk all allocations to generate the set of free regions between allocations.
946    /// It is safe to re-run this on live filesystems but it should not be called concurrently
947    /// with allocations, trims or other rebuild_strategy invocations -- use allocation_mutex.
948    /// Returns true if the rebuild made changes to either the free ranges or overflow markers.
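    ///
    /// Conceptually, the scan derives the gaps between allocated extents (plus the tail up to the
    /// device size); a simplified sketch of that core step, ignoring the merging in of temporary
    /// allocations and the batched locking done by the real implementation:
    ///
    /// ```ignore
    /// fn free_gaps(allocated: &[Range<u64>], device_size: u64) -> Vec<Range<u64>> {
    ///     let mut free = Vec::new();
    ///     let mut last_offset = 0;
    ///     // `allocated` is assumed sorted and non-overlapping, as produced by the merged layers.
    ///     for range in allocated {
    ///         if range.start > last_offset {
    ///             free.push(last_offset..range.start);
    ///         }
    ///         last_offset = std::cmp::max(last_offset, range.end);
    ///     }
    ///     if last_offset < device_size {
    ///         free.push(last_offset..device_size);
    ///     }
    ///     free
    /// }
    /// ```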
949    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
950        let mut changed = false;
951        let mut layer_set = self.tree.empty_layer_set();
952        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
953        self.tree.add_all_layers_to_layer_set(&mut layer_set);
954
955        let overflow_markers = self.inner.lock().strategy.overflow_markers();
956        self.inner.lock().strategy.reset_overflow_markers();
957
958        let mut to_add = Vec::new();
959        let mut merger = layer_set.merger();
960        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
961        let mut last_offset = 0;
962        while last_offset < self.device_size {
963            let next_range = match iter.get() {
964                None => {
965                    assert!(last_offset <= self.device_size);
966                    let range = last_offset..self.device_size;
967                    last_offset = self.device_size;
968                    iter.advance().await?;
969                    range
970                }
971                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
972                    if device_range.end < last_offset {
973                        iter.advance().await?;
974                        continue;
975                    }
976                    if device_range.start <= last_offset {
977                        last_offset = device_range.end;
978                        iter.advance().await?;
979                        continue;
980                    }
981                    let range = last_offset..device_range.start;
982                    last_offset = device_range.end;
983                    iter.advance().await?;
984                    range
985                }
986            };
987            to_add.push(next_range);
988            // Avoid taking a lock on inner for every free range.
989            if to_add.len() > 100 {
990                let mut inner = self.inner.lock();
991                for range in to_add.drain(..) {
992                    changed |= inner.strategy.force_free(range)?;
993                }
994            }
995        }
996        let mut inner = self.inner.lock();
997        for range in to_add {
998            changed |= inner.strategy.force_free(range)?;
999        }
1000        if overflow_markers != inner.strategy.overflow_markers() {
1001            changed = true;
1002        }
1003        Ok(changed)
1004    }
1005
1006    /// Collects up to `extents_per_batch` free extents of size up to `max_extent_size` from
1007    /// `offset`.  The extents will be reserved.
    /// Note that only one `TrimmableExtents` can exist for the allocator at any time.
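    ///
    /// A sketch of the intended usage (the device-side TRIM call is not part of the allocator and
    /// is only indicated by a comment here):
    ///
    /// ```ignore
    /// let batch = allocator.take_for_trimming(offset, max_extent_size, extents_per_batch).await?;
    /// for device_range in batch.extents() {
    ///     // Issue a TRIM/UNMAP for `device_range` to the underlying device here.
    /// }
    /// // Dropping the batch returns the extents to the allocator and unblocks any allocations
    /// // that were waiting for the trim to finish.
    /// drop(batch);
    /// ```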
1009    pub async fn take_for_trimming(
1010        &self,
1011        offset: u64,
1012        max_extent_size: usize,
1013        extents_per_batch: usize,
1014    ) -> Result<TrimmableExtents<'_>, Error> {
1015        let _guard = self.allocation_mutex.lock().await;
1016
1017        let (mut result, listener) = TrimmableExtents::new(self);
1018        let mut bytes = 0;
1019
1020        // We can't just use self.strategy here because it doesn't necessarily hold all
1021        // free extent metadata in RAM.
1022        let mut layer_set = self.tree.empty_layer_set();
1023        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
1024        self.tree.add_all_layers_to_layer_set(&mut layer_set);
1025        let mut merger = layer_set.merger();
1026        let mut iter = self
1027            .filter(
1028                merger.query(Query::FullRange(&AllocatorKey { device_range: offset..0 })).await?,
1029                false,
1030            )
1031            .await?;
1032        let mut last_offset = offset;
1033        'outer: while last_offset < self.device_size {
1034            let mut range = match iter.get() {
1035                None => {
1036                    assert!(last_offset <= self.device_size);
1037                    let range = last_offset..self.device_size;
1038                    last_offset = self.device_size;
1039                    iter.advance().await?;
1040                    range
1041                }
1042                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
1043                    if device_range.end <= last_offset {
1044                        iter.advance().await?;
1045                        continue;
1046                    }
1047                    if device_range.start <= last_offset {
1048                        last_offset = device_range.end;
1049                        iter.advance().await?;
1050                        continue;
1051                    }
1052                    let range = last_offset..device_range.start;
1053                    last_offset = device_range.end;
1054                    iter.advance().await?;
1055                    range
1056                }
1057            };
1058            if range.start < offset {
1059                continue;
1060            }
1061
            // 'range' is based on the on-disk LSM tree. We need to check against any uncommitted
            // allocations and temporarily allocate the range for ourselves (removing it from the
            // free-space strategy).
1064            let mut inner = self.inner.lock();
1065
1066            while range.start < range.end {
1067                let prefix =
1068                    range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
1069                range = prefix.end..range.end;
1070                bytes += prefix.length()?;
1071                // We can assume the following are safe because we've checked both LSM tree and
1072                // temporary_allocations while holding the lock.
1073                inner.strategy.remove(prefix.clone());
1074                self.temporary_allocations.insert(AllocatorItem {
1075                    key: AllocatorKey { device_range: prefix.clone() },
1076                    value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
1077                    sequence: 0,
1078                })?;
1079                result.add_extent(prefix);
1080                if result.extents.len() == extents_per_batch {
1081                    break 'outer;
1082                }
1083            }
1084            if result.extents.len() == extents_per_batch {
1085                break 'outer;
1086            }
1087        }
1088        {
1089            let mut inner = self.inner.lock();
1090
1091            assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
1092            inner.trim_listener = Some(listener);
1093            inner.trim_reserved_bytes = bytes;
1094            debug_assert!(
1095                (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
1096                    <= self.device_size
1097            );
1098        }
1099        Ok(result)
1100    }
1101
1102    /// Returns all objects that exist in the parent store that pertain to this allocator.
1103    pub fn parent_objects(&self) -> Vec<u64> {
1104        // The allocator tree needs to store a file for each of the layers in the tree, so we return
1105        // those, since nothing else references them.
1106        self.inner.lock().info.layers.clone()
1107    }
1108
1109    /// Returns all the current owner byte limits (in pairs of `(owner_object_id, bytes)`).
1110    pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
1111        self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
1112    }
1113
1114    /// Returns (allocated_bytes, byte_limit) for the given owner.
1115    pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
1116        let inner = self.inner.lock();
1117        (
1118            inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
1119            inner.info.limit_bytes.get(&owner_object_id).copied(),
1120        )
1121    }
1122
1123    /// Returns owner bytes debug information.
1124    pub fn owner_bytes_debug(&self) -> String {
1125        format!("{:?}", self.inner.lock().owner_bytes)
1126    }
1127
1128    fn needs_sync(&self) -> bool {
1129        // TODO(https://fxbug.dev/42178048): This will only trigger if *all* free space is taken up with
1130        // committed deallocated bytes, but we might want to trigger a sync if we're low and there
1131        // happens to be a lot of deallocated bytes as that might mean we can fully satisfy
1132        // allocation requests.
1133        let inner = self.inner.lock();
1134        inner.unavailable_bytes().0 >= self.device_size
1135    }
1136
1137    fn is_system_store(&self, owner_object_id: u64) -> bool {
1138        let fs = self.filesystem.upgrade().unwrap();
1139        owner_object_id == fs.object_manager().root_store_object_id()
1140            || owner_object_id == fs.object_manager().root_parent_store_object_id()
1141    }
1142
1143    /// Updates the accounting to track that a byte reservation has been moved out of an owner to
1144    /// the unattributed pool.
1145    pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
1146        if old_owner_object_id.is_none() || amount == 0 {
1147            return;
1148        }
1149        // These 2 mutations should behave as though they're a single atomic mutation.
1150        let mut inner = self.inner.lock();
1151        inner.remove_reservation(old_owner_object_id, amount);
1152        inner.add_reservation(None, amount);
1153    }
1154
    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
1156    /// allocator when queried.
1157    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1158        let this = Arc::downgrade(self);
1159        parent.record_lazy_child(name, move || {
1160            let this_clone = this.clone();
1161            async move {
1162                let inspector = fuchsia_inspect::Inspector::default();
1163                if let Some(this) = this_clone.upgrade() {
1164                    let counters = this.counters.lock();
1165                    let root = inspector.root();
1166                    root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
1167                    root.record_uint("bytes_total", this.device_size);
1168                    let (allocated, reserved, used, unavailable) = {
1169                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1170                        let inner = this.inner.lock();
1171                        (
1172                            inner.allocated_bytes().0,
1173                            inner.reserved_bytes(),
1174                            inner.used_bytes().0,
1175                            inner.unavailable_bytes().0,
1176                        )
1177                    };
1178                    root.record_uint("bytes_allocated", allocated);
1179                    root.record_uint("bytes_reserved", reserved);
1180                    root.record_uint("bytes_used", used);
1181                    root.record_uint("bytes_unavailable", unavailable);
1182
1183                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing
1184                    // metrics.
1185                    if let Some(x) = round_div(100 * allocated, this.device_size) {
1186                        root.record_uint("bytes_allocated_percent", x);
1187                    }
1188                    if let Some(x) = round_div(100 * reserved, this.device_size) {
1189                        root.record_uint("bytes_reserved_percent", x);
1190                    }
1191                    if let Some(x) = round_div(100 * used, this.device_size) {
1192                        root.record_uint("bytes_used_percent", x);
1193                    }
1194                    if let Some(x) = round_div(100 * unavailable, this.device_size) {
1195                        root.record_uint("bytes_unavailable_percent", x);
1196                    }
1197
1198                    root.record_uint("num_flushes", counters.num_flushes);
1199                    if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
1200                        root.record_uint(
1201                            "last_flush_time_ms",
1202                            last_flush_time
1203                                .duration_since(std::time::UNIX_EPOCH)
1204                                .unwrap_or(std::time::Duration::ZERO)
1205                                .as_millis()
1206                                .try_into()
1207                                .unwrap_or(0u64),
1208                        );
1209                    }
1210                    let sizes = root.create_uint_array(
1211                        "persistent_layer_file_sizes",
1212                        counters.persistent_layer_file_sizes.len(),
1213                    );
1214                    for i in 0..counters.persistent_layer_file_sizes.len() {
1215                        sizes.set(i, counters.persistent_layer_file_sizes[i]);
1216                    }
1217                    root.record(sizes);
1218
1219                    let data = this.allocation_size_histogram();
1220                    let alloc_sizes = root.create_uint_linear_histogram(
1221                        "allocation_size_histogram",
1222                        fuchsia_inspect::LinearHistogramParams {
1223                            floor: 1,
1224                            step_size: 1,
1225                            buckets: 64,
1226                        },
1227                    );
1228                    for (i, count) in data.iter().enumerate() {
1229                        if i != 0 {
1230                            alloc_sizes.insert_multiple(i as u64, *count as usize);
1231                        }
1232                    }
1233                    root.record(alloc_sizes);
1234
1235                    let data = this.inner.lock().rebuild_strategy_trigger_histogram;
1236                    let triggers = root.create_uint_linear_histogram(
1237                        "rebuild_strategy_triggers",
1238                        fuchsia_inspect::LinearHistogramParams {
1239                            floor: 1,
1240                            step_size: 1,
1241                            buckets: 64,
1242                        },
1243                    );
1244                    for (i, count) in data.iter().enumerate() {
1245                        if i != 0 {
1246                            triggers.insert_multiple(i as u64, *count as usize);
1247                        }
1248                    }
1249                    root.record(triggers);
1250                }
1251                Ok(inspector)
1252            }
1253            .boxed()
1254        });
1255    }
1256
1257    /// Returns the offset of the first byte which has not been used by the allocator since its
1258    /// creation.
1259    /// NB: This does *not* take into account existing allocations.  This is only reliable when the
1260    /// allocator was created from scratch, without any pre-existing allocations.
1261    pub fn maximum_offset(&self) -> u64 {
1262        self.maximum_offset.load(Ordering::Relaxed)
1263    }
1264}
1265
1266impl Drop for Allocator {
1267    fn drop(&mut self) {
1268        let inner = self.inner.lock();
1269        // Uncommitted allocations and reservations are released via RAII, so both should be zero.
1270        assert_eq!(inner.uncommitted_allocated_bytes(), 0);
1271        assert_eq!(inner.reserved_bytes(), 0);
1272    }
1273}
1274
1275#[fxfs_trace::trace]
1276impl Allocator {
1277    /// Returns the object ID for the allocator.
1278    pub fn object_id(&self) -> u64 {
1279        self.object_id
1280    }
1281
1282    /// Returns information about the allocator such as the layer files storing persisted
1283    /// allocations.
1284    pub fn info(&self) -> AllocatorInfo {
1285        self.inner.lock().info.clone()
1286    }
1287
1288    /// Tries to allocate |len| bytes for the store identified by |owner_object_id| and returns
1289    /// the device range allocated.
1290    /// The allocated range may be short (e.g. due to fragmentation), in which case the caller can
1291    /// simply call allocate again until they have enough blocks.
1292    ///
1293    /// We also store the object store ID of the store that the allocation should be assigned to so
1294    /// that we have a means to delete encrypted stores without needing the encryption key.
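    ///
    /// A minimal usage sketch (illustrative only, not compiled; it mirrors the tests at the bottom
    /// of this file). `fs` is assumed to be an open `FxFilesystem` and `STORE_OBJECT_ID` a
    /// placeholder owner id:
    /// ```ignore
    /// let allocator = fs.allocator();
    /// let mut transaction =
    ///     fs.clone().new_transaction(lock_keys![], Options::default()).await?;
    /// // The returned range may be shorter than requested; callers loop until satisfied.
    /// let device_range = allocator
    ///     .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
    ///     .await?;
    /// transaction.commit().await?;
    /// ```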
1295    #[trace]
1296    pub async fn allocate(
1297        self: &Arc<Self>,
1298        transaction: &mut Transaction<'_>,
1299        owner_object_id: u64,
1300        mut len: u64,
1301    ) -> Result<Range<u64>, Error> {
1302        assert_eq!(len % self.block_size, 0);
1303        len = std::cmp::min(len, self.max_extent_size_bytes);
1304        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1305
1306        // Make sure we have space reserved before we try and find the space.
1307        let reservation = if let Some(reservation) = transaction.allocator_reservation {
1308            match reservation.owner_object_id {
1309                // If there is no owner, this must be a system store that we're allocating for.
1310                None => assert!(self.is_system_store(owner_object_id)),
1311                // If it has an owner, it must match the allocating owner.
1312                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
1313            };
1314            // Reservation limits aren't necessarily a multiple of the block size.
1315            let r = reservation
1316                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
1317            len = r.amount();
1318            Left(r)
1319        } else {
1320            let mut inner = self.inner.lock();
1321            assert!(inner.opened);
1322            // Do not exceed the limit for the owner or the device.
1323            let device_used = inner.used_bytes();
1324            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1325            // We must take care not to use up space that might be reserved.
1326            let limit =
1327                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
1328            len = round_down(std::cmp::min(len, limit), self.block_size);
1329            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1330            owner_entry.reserved_bytes += len;
1331            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
1332        };
1333
1334        ensure!(len > 0, FxfsError::NoSpace);
1335
1336        // If volumes have been deleted, flush the device so that we can use any of the freed space.
1337        let volumes_deleted = {
1338            let inner = self.inner.lock();
1339            (!inner.volumes_deleted_pending_sync.is_empty())
1340                .then(|| inner.volumes_deleted_pending_sync.clone())
1341        };
1342
1343        if let Some(volumes_deleted) = volumes_deleted {
1344            // No locks are held here, so in theory, there could be unnecessary syncs, but it
1345            // should be sufficiently rare that it won't matter.
1346            self.filesystem
1347                .upgrade()
1348                .unwrap()
1349                .sync(SyncOptions {
1350                    flush_device: true,
1351                    precondition: Some(Box::new(|| {
1352                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
1353                    })),
1354                    ..Default::default()
1355                })
1356                .await?;
1357
1358            {
1359                let mut inner = self.inner.lock();
1360                for owner_id in volumes_deleted {
1361                    inner.volumes_deleted_pending_sync.remove(&owner_id);
1362                    inner.marked_for_deletion.insert(owner_id);
1363                }
1364            }
1365
1366            let _guard = self.allocation_mutex.lock().await;
1367            self.rebuild_strategy().await?;
1368        }
1369
1370        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
1371        let _guard = 'sync: loop {
1372            // Cap number of sync attempts before giving up on finding free space.
1373            for _ in 0..10 {
1374                {
1375                    let guard = self.allocation_mutex.lock().await;
1376
1377                    if !self.needs_sync() {
1378                        break 'sync guard;
1379                    }
1380                }
1381
1382                // All the free space is currently tied up with deallocations, so we need to sync
1383                // and flush the device to free that up.
1384                //
1385                // We can't hold the allocation lock whilst we sync here because the allocation lock
1386                // is also taken in apply_mutations, which is called when journal locks are held,
1387                // and we call sync here which takes those same locks, so it would have the
1388                // potential to result in a deadlock.  Sync holds its own lock to guard against
1389                // multiple syncs occurring at the same time, and we can supply a precondition that
1390                // is evaluated under that lock to ensure we don't sync twice if we don't need to.
1391                self.filesystem
1392                    .upgrade()
1393                    .unwrap()
1394                    .sync(SyncOptions {
1395                        flush_device: true,
1396                        precondition: Some(Box::new(|| self.needs_sync())),
1397                        ..Default::default()
1398                    })
1399                    .await?;
1400            }
1401            bail!(
1402                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
1403            );
1404        };
1405
1406        let mut trim_listener = None;
1407        {
1408            let mut inner = self.inner.lock();
1409            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;
1410
1411            // If trimming would be the reason that this allocation gets cut short, wait for
1412            // trimming to complete before proceeding.
1413            let avail = self
1414                .device_size
1415                .checked_sub(inner.unavailable_bytes().0)
1416                .ok_or(FxfsError::Inconsistent)?;
1417            let free_and_not_being_trimmed =
1418                inner.bytes_available_not_being_trimmed(self.device_size)?;
1419            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
1420                debug_assert!(inner.trim_reserved_bytes > 0);
1421                trim_listener = std::mem::take(&mut inner.trim_listener);
1422            }
1423        }
1424
1425        if let Some(listener) = trim_listener {
1426            listener.await;
1427        }
1428
1429        let result = loop {
1430            {
1431                let mut inner = self.inner.lock();
1432
1433                // While we know rebuild_strategy and take_for_trim are not running (we hold guard),
1434                // apply temporary_allocation removals.
1435                for device_range in inner.dropped_temporary_allocations.drain(..) {
1436                    self.temporary_allocations.erase(&AllocatorKey { device_range });
1437                }
1438
1439                match inner.strategy.allocate(len) {
1440                    Err(FxfsError::NotFound) => {
1441                        // Overflow. Fall through and rebuild
1442                        inner.rebuild_strategy_trigger_histogram
1443                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
1444                    }
1445                    Err(err) => {
1446                        error!(err:%; "Likely filesystem corruption.");
1447                        return Err(err.into());
1448                    }
1449                    Ok(x) => {
1450                        break x;
1451                    }
1452                }
1453            }
1454            // We've run out of extents of the requested length in RAM, but more free space of
1455            // this size may exist on the device. Rescan the device and circle back.
1456            // We already hold the allocation_mutex, so exclusive access is guaranteed.
1457            if !self.rebuild_strategy().await? {
1458                error!("Cannot find additional free space. Corruption?");
1459                return Err(FxfsError::Inconsistent.into());
1460            }
1461        };
1462
1463        debug!(device_range:? = result; "allocate");
1464
1465        let len = result.length().unwrap();
1466        let reservation_owner = reservation.either(
1467            // Left means we got an outside reservation.
1468            |l| {
1469                l.forget_some(len);
1470                l.owner_object_id()
1471            },
1472            |r| {
1473                r.forget_some(len);
1474                r.owner_object_id()
1475            },
1476        );
1477
1478        {
1479            let mut inner = self.inner.lock();
1480            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1481            owner_entry.uncommitted_allocated_bytes += len;
1482            // If the reservation has an owner, ensure they are the same.
1483            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
1484            inner.remove_reservation(reservation_owner, len);
1485            self.temporary_allocations.insert(AllocatorItem {
1486                key: AllocatorKey { device_range: result.clone() },
1487                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1488                sequence: 0,
1489            })?;
1490        }
1491
1492        let mutation =
1493            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
1494        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());
1495
1496        Ok(result)
1497    }
1498
1499    /// Marks the given device range as allocated.  The main use case for this is the superblock,
1500    /// which needs to be at a fixed location on the device.
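    ///
    /// Illustrative sketch (not compiled); `device_range` is assumed to be a block-aligned range
    /// that is known to be free, as in `test_mark_allocated` below:
    /// ```ignore
    /// allocator
    ///     .mark_allocated(&mut transaction, STORE_OBJECT_ID, device_range)
    ///     .await?;
    /// transaction.commit().await?;
    /// ```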
1501    #[trace]
1502    pub async fn mark_allocated(
1503        &self,
1504        transaction: &mut Transaction<'_>,
1505        owner_object_id: u64,
1506        device_range: Range<u64>,
1507    ) -> Result<(), Error> {
1508        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1509        {
1510            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;
1511
1512            let mut inner = self.inner.lock();
1513            let device_used = inner.used_bytes();
1514            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1515            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1516            ensure!(
1517                device_range.end <= self.device_size
1518                    && (Saturating(self.device_size) - device_used).0 >= len
1519                    && owner_id_bytes_left >= len,
1520                FxfsError::NoSpace
1521            );
1522            if let Some(reservation) = &mut transaction.allocator_reservation {
1523                // The transaction takes ownership of this hold.
1524                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
1525            }
1526            owner_entry.uncommitted_allocated_bytes += len;
1527            inner.strategy.remove(device_range.clone());
1528            self.temporary_allocations.insert(AllocatorItem {
1529                key: AllocatorKey { device_range: device_range.clone() },
1530                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1531                sequence: 0,
1532            })?;
1533        }
1534        let mutation =
1535            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
1536        transaction.add(self.object_id(), Mutation::Allocator(mutation));
1537        Ok(())
1538    }
1539
1540    /// Sets the limit, in bytes, on how much space an owner object may use.
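    ///
    /// Illustrative sketch (not compiled); the 1 GiB value is an arbitrary placeholder:
    /// ```ignore
    /// allocator.set_bytes_limit(&mut transaction, owner_object_id, 1 << 30).await?;
    /// transaction.commit().await?;
    /// ```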
1541    pub async fn set_bytes_limit(
1542        &self,
1543        transaction: &mut Transaction<'_>,
1544        owner_object_id: u64,
1545        bytes: u64,
1546    ) -> Result<(), Error> {
1547        // System stores cannot be given limits.
1548        assert!(!self.is_system_store(owner_object_id));
1549        transaction.add(
1550            self.object_id(),
1551            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
1552        );
1553        Ok(())
1554    }
1555
1556    /// Gets the bytes limit for an owner object.
1557    pub async fn get_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
1558        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
1559    }
1560
1561    /// Deallocates the given device range for the specified object.
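    ///
    /// Illustrative sketch (not compiled), mirroring `test_deallocations` below; `device_range`
    /// is assumed to have been returned by an earlier, committed `allocate` call:
    /// ```ignore
    /// let bytes_freed = allocator
    ///     .deallocate(&mut transaction, STORE_OBJECT_ID, device_range)
    ///     .await?;
    /// transaction.commit().await?;
    /// ```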
1562    #[trace]
1563    pub async fn deallocate(
1564        &self,
1565        transaction: &mut Transaction<'_>,
1566        owner_object_id: u64,
1567        dealloc_range: Range<u64>,
1568    ) -> Result<u64, Error> {
1569        debug!(device_range:? = dealloc_range; "deallocate");
1570        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
1571        // We don't currently support sharing of allocations (value.count always equals 1), so as
1572        // long as we can assume the deallocated range is actually allocated, we can avoid device
1573        // access.
1574        let deallocated = dealloc_range.end - dealloc_range.start;
1575        let mutation = AllocatorMutation::Deallocate {
1576            device_range: dealloc_range.clone().into(),
1577            owner_object_id,
1578        };
1579        transaction.add(self.object_id(), Mutation::Allocator(mutation));
1580
1581        let _guard = self.allocation_mutex.lock().await;
1582
1583        // We use `dropped_temporary_allocations` to defer removals from `temporary_allocations` in
1584        // places where we can't execute async code or take locks.
1585        //
1586        // It's important we don't ever remove entries from `temporary_allocations` without
1587        // holding the `allocation_mutex` lock or else we may end up with an inconsistent view of
1588        // available disk space when we combine temporary_allocations with the LSMTree.
1589        // This is normally done in `allocate()` but we also need to apply these here because
1590        // `temporary_allocations` is also used to track deallocated space until it has been
1591        // flushed (see comment below). A user may allocate and then deallocate space before calling
1592        // allocate() a second time, so if we do not clean up here, we may end up with the same
1593        // range in temporary_allocations twice (once for allocate, once for deallocate).
1594        let mut inner = self.inner.lock();
1595        for device_range in inner.dropped_temporary_allocations.drain(..) {
1596            self.temporary_allocations.erase(&AllocatorKey { device_range });
1597        }
1598
1599        // We can't reuse deallocated space immediately because failure to successfully flush will
1600        // mean that on next mount, we may find this space is still assigned to the deallocated
1601        // region. To avoid immediate reuse, we hold these regions in 'temporary_allocations' until
1602        // after a successful flush so we know the region is safe to reuse.
1603        self.temporary_allocations
1604            .insert(AllocatorItem {
1605                key: AllocatorKey { device_range: dealloc_range.clone() },
1606                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1607                sequence: 0,
1608            })
1609            .context("tracking deallocated")?;
1610
1611        Ok(deallocated)
1612    }
1613
1614    /// Marks allocations associated with a given |owner_object_id| for deletion.
1615    ///
1616    /// This is used as part of deleting encrypted volumes (ObjectStore) without having the keys.
1617    ///
1618    /// MarkForDeletion mutations eventually manipulate allocator metadata (AllocatorInfo) instead
1619    /// of the mutable layer, but we must be careful not to do this too early and risk premature
1620    /// reuse of extents.
1621    ///
1622    /// Replay is not guaranteed until the *device* gets flushed, so we cannot reuse the deleted
1623    /// extents until we've flushed the device.
1624    ///
1625    /// TODO(b/316827348): Consider removing the use of mark_for_deletion in AllocatorInfo and
1626    /// just compacting?
1627    ///
1628    /// After an allocator.flush() (i.e. a major compaction), we know that there is no data left
1629    /// in the layer files for this owner_object_id and we are able to clear `marked_for_deletion`.
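    ///
    /// Illustrative sketch (not compiled); `owner_object_id` is the volume being deleted and must
    /// not be reused afterwards:
    /// ```ignore
    /// allocator.mark_for_deletion(&mut transaction, owner_object_id).await;
    /// transaction.commit().await?;
    /// ```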
1630    pub async fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
1631        // Note that because the actual time of deletion (the next major compaction) is undefined,
1632        // |owner_object_id| should not be reused after this call.
1633        transaction.add(
1634            self.object_id(),
1635            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
1636        );
1637    }
1638
1639    /// Called when the device has been flushed and indicates what the journal log offset was when
1640    /// that happened.
1641    pub async fn did_flush_device(&self, flush_log_offset: u64) {
1642        // First take out the deallocations that we now know to be flushed.  The list is maintained
1643        // in order, so we can stop on the first entry that we find that should not be unreserved
1644        // yet.
1645        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
1646        let deallocs = 'deallocs_outer: loop {
1647            let mut inner = self.inner.lock();
1648            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
1649                if dealloc.log_file_offset >= flush_log_offset {
1650                    let mut deallocs = inner.committed_deallocated.split_off(index);
1651                    // Swap because we want the opposite of what split_off does.
1652                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
1653                    break 'deallocs_outer deallocs;
1654                }
1655            }
1656            break std::mem::take(&mut inner.committed_deallocated);
1657        };
1658
1659        // Now we can free those elements.
1660        let mut inner = self.inner.lock();
1661        let mut totals = BTreeMap::<u64, u64>::new();
1662        for dealloc in deallocs {
1663            *(totals.entry(dealloc.owner_object_id).or_default()) +=
1664                dealloc.range.length().unwrap();
1665            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
1666            self.temporary_allocations.erase(&AllocatorKey { device_range: dealloc.range.clone() });
1667        }
1668
1669        // This *must* come after we've removed the records from reserved reservations because the
1670        // allocator uses this value to decide whether or not a device-flush is required and it must
1671        // be possible to find free space if it thinks no device-flush is required.
1672        for (owner_object_id, total) in totals {
1673            match inner.owner_bytes.get_mut(&owner_object_id) {
1674                Some(counters) => counters.committed_deallocated_bytes -= total,
1675                None => panic!("Failed to decrement for unknown owner: {}", owner_object_id),
1676            }
1677        }
1678    }
1679
1680    /// Returns a reservation that can be used later, or None if there is insufficient space. The
1681    /// |owner_object_id| indicates which object in the root object store the reservation is for.
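    ///
    /// Illustrative sketch (not compiled); the amount is an arbitrary placeholder and the
    /// reservation is released via RAII if it is dropped unused:
    /// ```ignore
    /// let reservation = allocator
    ///     .clone()
    ///     .reserve(Some(owner_object_id), 10 * block_size)
    ///     .ok_or(FxfsError::NoSpace)?;
    /// ```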
1682    pub fn reserve(
1683        self: Arc<Self>,
1684        owner_object_id: Option<u64>,
1685        amount: u64,
1686    ) -> Option<Reservation> {
1687        {
1688            let mut inner = self.inner.lock();
1689
1690            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1691
1692            let limit = match owner_object_id {
1693                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1694                None => device_free,
1695            };
1696            if limit < amount {
1697                return None;
1698            }
1699            inner.add_reservation(owner_object_id, amount);
1700        }
1701        Some(Reservation::new(self, owner_object_id, amount))
1702    }
1703
1704    /// Like reserve, but takes a callback which is passed the |limit| and should return the
1705    /// amount to reserve, which can be zero.
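    ///
    /// Illustrative sketch (not compiled); `wanted_bytes` is a placeholder upper bound:
    /// ```ignore
    /// let reservation = allocator
    ///     .clone()
    ///     .reserve_with(Some(owner_object_id), |limit| std::cmp::min(limit, wanted_bytes));
    /// ```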
1706    pub fn reserve_with(
1707        self: Arc<Self>,
1708        owner_object_id: Option<u64>,
1709        amount: impl FnOnce(u64) -> u64,
1710    ) -> Reservation {
1711        let amount = {
1712            let mut inner = self.inner.lock();
1713
1714            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1715
1716            let amount = amount(match owner_object_id {
1717                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1718                None => device_free,
1719            });
1720
1721            inner.add_reservation(owner_object_id, amount);
1722
1723            amount
1724        };
1725
1726        Reservation::new(self, owner_object_id, amount)
1727    }
1728
1729    /// Returns the total number of allocated bytes.
1730    pub fn get_allocated_bytes(&self) -> u64 {
1731        self.inner.lock().allocated_bytes().0
1732    }
1733
1734    /// Returns the total size, in bytes, of the device managed by the allocator.
1735    pub fn get_disk_bytes(&self) -> u64 {
1736        self.device_size
1737    }
1738
1739    /// Returns the total number of allocated bytes per owner_object_id.
1740    /// Note that this is quite an expensive operation as it copies the collection.
1741    /// This is intended for use in fsck() and friends, not general use code.
1742    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
1743        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
1744    }
1745
1746    /// Returns the number of allocated and reserved bytes.
1747    pub fn get_used_bytes(&self) -> Saturating<u64> {
1748        let inner = self.inner.lock();
1749        inner.used_bytes()
1750    }
1751}
1752
1753impl ReservationOwner for Allocator {
1754    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
1755        self.inner.lock().remove_reservation(owner_object_id, amount);
1756    }
1757}
1758
1759#[async_trait]
1760impl JournalingObject for Allocator {
1761    fn apply_mutation(
1762        &self,
1763        mutation: Mutation,
1764        context: &ApplyContext<'_, '_>,
1765        _assoc_obj: AssocObj<'_>,
1766    ) -> Result<(), Error> {
1767        match mutation {
1768            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
1769                let mut inner = self.inner.lock();
1770                inner.owner_bytes.remove(&owner_object_id);
1771
1772                // We use `info.marked_for_deletion` to track the committed state and
1773                // `inner.marked_for_deletion` to track volumes marked for deletion *after* we have
1774                // flushed the device.  It is not safe to use extents belonging to deleted volumes
1775                // until after we have flushed the device.
1776                inner.info.marked_for_deletion.insert(owner_object_id);
1777                inner.volumes_deleted_pending_sync.insert(owner_object_id);
1778
1779                inner.info.limit_bytes.remove(&owner_object_id);
1780            }
1781            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1782                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
1783                let item = AllocatorItem {
1784                    key: AllocatorKey { device_range: device_range.clone().into() },
1785                    value: AllocatorValue::Abs { count: 1, owner_object_id },
1786                    sequence: context.checkpoint.file_offset,
1787                };
1788                let len = item.key.device_range.length().unwrap();
1789                let lower_bound = item.key.lower_bound_for_merge_into();
1790                self.tree.merge_into(item, &lower_bound);
1791                let mut inner = self.inner.lock();
1792                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1793                entry.allocated_bytes += len;
1794                if let ApplyMode::Live(transaction) = context.mode {
1795                    entry.uncommitted_allocated_bytes -= len;
1796                    // Note that we cannot drop entries from temporary_allocations without holding
1797                    // the allocation_mutex as it may introduce races. We instead add the range to
1798                    // a Vec that can be applied later when we hold the lock (See comment on
1799                    // `dropped_temporary_allocations` above).
1800                    inner.dropped_temporary_allocations.push(device_range.into());
1801                    if let Some(reservation) = transaction.allocator_reservation {
1802                        reservation.commit(len);
1803                    }
1804                }
1805            }
1806            Mutation::Allocator(AllocatorMutation::Deallocate {
1807                device_range,
1808                owner_object_id,
1809            }) => {
1810                let item = AllocatorItem {
1811                    key: AllocatorKey { device_range: device_range.into() },
1812                    value: AllocatorValue::None,
1813                    sequence: context.checkpoint.file_offset,
1814                };
1815                let len = item.key.device_range.length().unwrap();
1816
1817                {
1818                    let mut inner = self.inner.lock();
1819                    {
1820                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1821                        entry.allocated_bytes -= len;
1822                        if context.mode.is_live() {
1823                            entry.committed_deallocated_bytes += len;
1824                        }
1825                    }
1826                    if context.mode.is_live() {
1827                        inner.committed_deallocated.push_back(CommittedDeallocation {
1828                            log_file_offset: context.checkpoint.file_offset,
1829                            range: item.key.device_range.clone(),
1830                            owner_object_id,
1831                        });
1832                    }
1833                    if let ApplyMode::Live(Transaction {
1834                        allocator_reservation: Some(reservation),
1835                        ..
1836                    }) = context.mode
1837                    {
1838                        inner.add_reservation(reservation.owner_object_id(), len);
1839                        reservation.add(len);
1840                    }
1841                }
1842                let lower_bound = item.key.lower_bound_for_merge_into();
1843                self.tree.merge_into(item, &lower_bound);
1844            }
1845            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
1846                // Journal replay is ordered and each of these calls is idempotent, so the last one
1847                // wins; it doesn't matter if the value is already set or gets changed multiple
1848                // times during replay. When the allocator is opened, this will be merged in with
1849                // the snapshot.
1850                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
1851            }
1852            Mutation::BeginFlush => {
1853                self.tree.seal();
1854                // Transfer our running count for allocated_bytes so that it gets written to the new
1855                // info file when flush completes.
1856                let mut inner = self.inner.lock();
1857                let allocated_bytes =
1858                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
1859                inner.info.allocated_bytes = allocated_bytes;
1860            }
1861            Mutation::EndFlush => {}
1862            _ => bail!("unexpected mutation: {:?}", mutation),
1863        }
1864        Ok(())
1865    }
1866
1867    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
1868        match mutation {
1869            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1870                let len = device_range.length().unwrap();
1871                let mut inner = self.inner.lock();
1872                inner
1873                    .owner_bytes
1874                    .entry(owner_object_id)
1875                    .or_default()
1876                    .uncommitted_allocated_bytes -= len;
1877                if let Some(reservation) = transaction.allocator_reservation {
1878                    let res_owner = reservation.owner_object_id();
1879                    inner.add_reservation(res_owner, len);
1880                    reservation.release_reservation(res_owner, len);
1881                }
1882                inner.strategy.free(device_range.clone().into()).expect("drop mutation");
1883                self.temporary_allocations
1884                    .erase(&AllocatorKey { device_range: device_range.into() });
1885            }
1886            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
1887                self.temporary_allocations
1888                    .erase(&AllocatorKey { device_range: device_range.into() });
1889            }
1890            _ => {}
1891        }
1892    }
1893
1894    async fn flush(&self) -> Result<Version, Error> {
1895        let filesystem = self.filesystem.upgrade().unwrap();
1896        let object_manager = filesystem.object_manager();
1897        let earliest_version = self.tree.get_earliest_version();
1898        if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
1899            // Early exit, but still return the earliest version used by a struct in the tree
1900            return Ok(earliest_version);
1901        }
1902
1903        let fs = self.filesystem.upgrade().unwrap();
1904        let mut flusher = Flusher::new(self, &fs).await;
1905        let new_layer_file = flusher.start().await?;
1906        flusher.finish(new_layer_file).await
1907    }
1908}
1909
1910// The merger is unable to merge extents that exist like the following:
1911//
1912//     |----- +1 -----|
1913//                    |----- -1 -----|
1914//                    |----- +2 -----|
1915//
1916// It cannot coalesce them because it has to emit the +1 record so that it can move on and merge the
1917// -1 and +2 records. To address this, we add another stage that applies after merging which
1918// coalesces records after they have been emitted.  This is a bit simpler than merging because the
1919// records cannot overlap, so it's just a question of merging adjacent records if they happen to
1920// have the same delta and object_id.
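//
// For example (illustrative, using the same owner id as the tests below), two emitted records
//
//     0..100   => Abs { count: 1, owner_object_id: 99 }
//     100..200 => Abs { count: 1, owner_object_id: 99 }
//
// are touching and carry the same value, so the CoalescingIterator yields a single 0..200 record;
// records with different values, or with a gap between them, are left as-is.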
1921
1922pub struct CoalescingIterator<I> {
1923    iter: I,
1924    item: Option<AllocatorItem>,
1925}
1926
1927impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
1928    pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
1929        let mut iter = Self { iter, item: None };
1930        iter.advance().await?;
1931        Ok(iter)
1932    }
1933}
1934
1935#[async_trait]
1936impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
1937    for CoalescingIterator<I>
1938{
1939    async fn advance(&mut self) -> Result<(), Error> {
1940        self.item = self.iter.get().map(|x| x.cloned());
1941        if self.item.is_none() {
1942            return Ok(());
1943        }
1944        let left = self.item.as_mut().unwrap();
1945        loop {
1946            self.iter.advance().await?;
1947            match self.iter.get() {
1948                None => return Ok(()),
1949                Some(right) => {
1950                    // The two records cannot overlap.
1951                    ensure!(
1952                        left.key.device_range.end <= right.key.device_range.start,
1953                        FxfsError::Inconsistent
1954                    );
1955                    // We can only coalesce records if they are touching and have the same value.
1956                    if left.key.device_range.end < right.key.device_range.start
1957                        || left.value != *right.value
1958                    {
1959                        return Ok(());
1960                    }
1961                    left.key.device_range.end = right.key.device_range.end;
1962                }
1963            }
1964        }
1965    }
1966
1967    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
1968        self.item.as_ref().map(|x| x.as_item_ref())
1969    }
1970}
1971
1972struct Flusher<'a> {
1973    allocator: &'a Allocator,
1974    fs: &'a Arc<FxFilesystem>,
1975    _guard: WriteGuard<'a>,
1976}
1977
1978impl<'a> Flusher<'a> {
1979    async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
1980        let keys = lock_keys![LockKey::flush(allocator.object_id())];
1981        Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
1982    }
1983
1984    fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
1985        Options {
1986            skip_journal_checks: true,
1987            borrow_metadata_space: true,
1988            allocator_reservation: Some(allocator_reservation),
1989            ..Default::default()
1990        }
1991    }
1992
1993    async fn start(&mut self) -> Result<DataObjectHandle<ObjectStore>, Error> {
1994        let object_manager = self.fs.object_manager();
1995        let mut transaction = self
1996            .fs
1997            .clone()
1998            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
1999            .await?;
2000
2001        let root_store = self.fs.root_store();
2002        let layer_object_handle = ObjectStore::create_object(
2003            &root_store,
2004            &mut transaction,
2005            HandleOptions { skip_journal_checks: true, ..Default::default() },
2006            None,
2007        )
2008        .await?;
2009        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
2010        // It's important that this transaction does not include any allocations because we use
2011        // BeginFlush as a snapshot point for mutations to the tree: other allocator mutations
2012        // within this transaction might get applied before seal (which would be OK), but they could
2013        // equally get applied afterwards (since Transaction makes no guarantees about the order in
2014        // which mutations are applied whilst committing), in which case they'd get lost on replay
2015        // because the journal will only send mutations that follow this transaction.
2016        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
2017        transaction.commit().await?;
2018        Ok(layer_object_handle)
2019    }
2020
2021    async fn finish(
2022        self,
2023        layer_object_handle: DataObjectHandle<ObjectStore>,
2024    ) -> Result<Version, Error> {
2025        let object_manager = self.fs.object_manager();
2026        let txn_options = Self::txn_options(object_manager.metadata_reservation());
2027
2028        let layer_set = self.allocator.tree.immutable_layer_set();
2029        let total_len = layer_set.sum_len();
2030        {
2031            let mut merger = layer_set.merger();
2032            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
2033            let iter = CoalescingIterator::new(iter).await?;
2034            self.allocator
2035                .tree
2036                .compact_with_iterator(
2037                    iter,
2038                    total_len,
2039                    DirectWriter::new(&layer_object_handle, txn_options).await,
2040                    layer_object_handle.block_size(),
2041                )
2042                .await?;
2043        }
2044
2045        let root_store = self.fs.root_store();
2046
2047        // Both of these forward-declared variables need to outlive the transaction.
2048        let object_handle;
2049        let reservation_update;
2050        let mut transaction = self
2051            .fs
2052            .clone()
2053            .new_transaction(
2054                lock_keys![LockKey::object(
2055                    root_store.store_object_id(),
2056                    self.allocator.object_id()
2057                )],
2058                txn_options,
2059            )
2060            .await?;
2061        let mut serialized_info = Vec::new();
2062
2063        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
2064        object_handle = ObjectStore::open_object(
2065            &root_store,
2066            self.allocator.object_id(),
2067            HandleOptions::default(),
2068            None,
2069        )
2070        .await?;
2071
2072        // We must be careful to take a copy of AllocatorInfo here rather than manipulate the
2073        // live one. If we remove marked_for_deletion entries prematurely, we may fail any
2074        // allocate() calls that are performed before the new version makes it to disk.
2075        // Specifically, txn_write() below must allocate space and may fail if we prematurely
2076        // clear marked_for_deletion.
2077        let (new_info, marked_for_deletion) = {
2078            let mut info = self.allocator.inner.lock().info.clone();
2079
2080            // After compaction, since we always do a major compaction, all new layers have
2081            // marked_for_deletion objects removed.
2082            let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);
2083
2084            // Move all the existing layers to the graveyard.
2085            for object_id in &info.layers {
2086                root_store.add_to_graveyard(&mut transaction, *object_id);
2087            }
2088
2089            info.layers = vec![layer_object_handle.object_id()];
2090
2091            (info, marked_for_deletion)
2092        };
2093        new_info.serialize_with_version(&mut serialized_info)?;
2094
2095        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
2096        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
2097        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;
2098
2099        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
2100            layer_object_handle.get_size(),
2101        ));
2102        let layer_file_sizes = vec![layer_object_handle.get_size()];
2103
2104        // It's important that EndFlush is in the same transaction that we write AllocatorInfo,
2105        // because we use EndFlush to make the required adjustments to allocated_bytes.
2106        transaction.add_with_object(
2107            self.allocator.object_id(),
2108            Mutation::EndFlush,
2109            AssocObj::Borrowed(&reservation_update),
2110        );
2111        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());
2112
2113        let layers = layers_from_handles([layer_object_handle]).await?;
2114        transaction
2115            .commit_with_callback(|_| {
2116                self.allocator.tree.set_layers(layers);
2117
2118                // At this point we've committed the new layers to disk so we can start using them.
2119                // This means we can also switch to the new AllocatorInfo which clears
2120                // marked_for_deletion.
2121                let mut inner = self.allocator.inner.lock();
2122                inner.info = new_info;
2123                for owner_id in marked_for_deletion {
2124                    inner.marked_for_deletion.remove(&owner_id);
2125                }
2126            })
2127            .await?;
2128
2129        // Now close the layers and purge them.
2130        for layer in layer_set.layers {
2131            let object_id = layer.handle().map(|h| h.object_id());
2132            layer.close_layer().await;
2133            if let Some(object_id) = object_id {
2134                root_store.tombstone_object(object_id, txn_options).await?;
2135            }
2136        }
2137
2138        let mut counters = self.allocator.counters.lock();
2139        counters.num_flushes += 1;
2140        counters.last_flush_time = Some(std::time::SystemTime::now());
2141        counters.persistent_layer_file_sizes = layer_file_sizes;
2142        // Return the earliest version used by a struct in the tree
2143        Ok(self.allocator.tree.get_earliest_version())
2144    }
2145}
2146
2147#[cfg(test)]
2148mod tests {
2149    use crate::filesystem::{
2150        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2151    };
2152    use crate::fsck::fsck;
2153    use crate::lsm_tree::cache::NullCache;
2154    use crate::lsm_tree::skip_list_layer::SkipListLayer;
2155    use crate::lsm_tree::types::{Item, ItemRef, Layer, LayerIterator};
2156    use crate::lsm_tree::{LSMTree, Query};
2157    use crate::object_handle::ObjectHandle;
2158    use crate::object_store::allocator::merge::merge;
2159    use crate::object_store::allocator::{
2160        Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2161    };
2162    use crate::object_store::transaction::{lock_keys, Options, TRANSACTION_METADATA_MAX_AMOUNT};
2163    use crate::object_store::volume::root_volume;
2164    use crate::object_store::{Directory, LockKey, ObjectStore, NO_OWNER};
2165    use crate::range::RangeExt;
2166    use crate::round::round_up;
2167    use fuchsia_async as fasync;
2168    use fuchsia_sync::Mutex;
2169    use std::cmp::{max, min};
2170    use std::ops::{Bound, Range};
2171    use std::sync::Arc;
2172    use storage_device::fake_device::FakeDevice;
2173    use storage_device::DeviceHolder;
2174
2175    #[fuchsia::test]
2176    async fn test_coalescing_iterator() {
2177        let skip_list = SkipListLayer::new(100);
2178        let items = [
2179            Item::new(
2180                AllocatorKey { device_range: 0..100 },
2181                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2182            ),
2183            Item::new(
2184                AllocatorKey { device_range: 100..200 },
2185                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2186            ),
2187        ];
2188        skip_list.insert(items[1].clone()).expect("insert error");
2189        skip_list.insert(items[0].clone()).expect("insert error");
2190        let mut iter =
2191            CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
2192                .await
2193                .expect("new failed");
2194        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2195        assert_eq!(
2196            (key, value),
2197            (
2198                &AllocatorKey { device_range: 0..200 },
2199                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2200            )
2201        );
2202        iter.advance().await.expect("advance failed");
2203        assert!(iter.get().is_none());
2204    }
2205
2206    #[fuchsia::test]
2207    async fn test_merge_and_coalesce_across_three_layers() {
2208        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2209        lsm_tree
2210            .insert(Item::new(
2211                AllocatorKey { device_range: 100..200 },
2212                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2213            ))
2214            .expect("insert error");
2215        lsm_tree.seal();
2216        lsm_tree
2217            .insert(Item::new(
2218                AllocatorKey { device_range: 0..100 },
2219                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2220            ))
2221            .expect("insert error");
2222
2223        let layer_set = lsm_tree.layer_set();
2224        let mut merger = layer_set.merger();
2225        let mut iter =
2226            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2227                .await
2228                .expect("new failed");
2229        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2230        assert_eq!(
2231            (key, value),
2232            (
2233                &AllocatorKey { device_range: 0..200 },
2234                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2235            )
2236        );
2237        iter.advance().await.expect("advance failed");
2238        assert!(iter.get().is_none());
2239    }
2240
2241    #[fuchsia::test]
2242    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
2243        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2244        lsm_tree
2245            .insert(Item::new(
2246                AllocatorKey { device_range: 100..200 },
2247                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2248            ))
2249            .expect("insert error");
2250        lsm_tree.seal();
2251        lsm_tree
2252            .insert(Item::new(
2253                AllocatorKey { device_range: 0..100 },
2254                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2255            ))
2256            .expect("insert error");
2257
2258        let layer_set = lsm_tree.layer_set();
2259        let mut merger = layer_set.merger();
2260        let mut iter =
2261            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2262                .await
2263                .expect("new failed");
2264        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2265        assert_eq!(
2266            (key, value),
2267            (
2268                &AllocatorKey { device_range: 0..100 },
2269                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2270            )
2271        );
2272        iter.advance().await.expect("advance failed");
2273        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2274        assert_eq!(
2275            (key, value),
2276            (
2277                &AllocatorKey { device_range: 100..200 },
2278                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2279            )
2280        );
2281        iter.advance().await.expect("advance failed");
2282        assert!(iter.get().is_none());
2283    }
2284
2285    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
2286        if a.end > b.start && a.start < b.end {
2287            min(a.end, b.end) - max(a.start, b.start)
2288        } else {
2289            0
2290        }
2291    }
2292
2293    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
2294        let layer_set = allocator.tree.layer_set();
2295        let mut merger = layer_set.merger();
2296        let mut iter = allocator
2297            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2298            .await
2299            .expect("build iterator");
2300        let mut allocations: Vec<Range<u64>> = Vec::new();
2301        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2302            if let Some(r) = allocations.last() {
2303                assert!(device_range.start >= r.end);
2304            }
2305            allocations.push(device_range.clone());
2306            iter.advance().await.expect("advance failed");
2307        }
2308        allocations
2309    }
2310
2311    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
2312        let layer_set = allocator.tree.layer_set();
2313        let mut merger = layer_set.merger();
2314        let mut iter = allocator
2315            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2316            .await
2317            .expect("build iterator");
2318        let mut found = 0;
2319        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2320            let mut l = device_range.length().expect("Invalid range");
2321            found += l;
2322            // Make sure that the entire range we have found completely overlaps with all the
2323            // allocations we expect to find.
2324            for range in expected_allocations {
2325                l -= overlap(range, device_range);
2326                if l == 0 {
2327                    break;
2328                }
2329            }
2330            assert_eq!(l, 0, "range {device_range:?} not covered by expectations");
2331            iter.advance().await.expect("advance failed");
2332        }
2333        // Make sure the total we found adds up to what we expect.
2334        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
2335    }
2336
2337    async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
2338        let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
2339        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2340        let allocator = fs.allocator();
2341        (fs, allocator)
2342    }
2343
2344    #[fuchsia::test]
2345    async fn test_allocations() {
2346        const STORE_OBJECT_ID: u64 = 99;
2347        let (fs, allocator) = test_fs().await;
2348        let mut transaction =
2349            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2350        let mut device_ranges = collect_allocations(&allocator).await;
2351
2352        // Expected extents:
2353        let expected = vec![
2354            0..4096,        // Superblock A (4k)
2355            4096..139264,   // root_store layer files, StoreInfo.. (33x4k blocks)
2356            139264..204800, // Superblock A extension (64k)
2357            204800..335872, // Initial Journal (128k)
2358            335872..401408, // Superblock B extension (64k)
2359            524288..528384, // Superblock B (4k)
2360        ];
2361        assert_eq!(device_ranges, expected);
2362        device_ranges.push(
2363            allocator
2364                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2365                .await
2366                .expect("allocate failed"),
2367        );
2368        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2369        device_ranges.push(
2370            allocator
2371                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2372                .await
2373                .expect("allocate failed"),
2374        );
2375        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2376        assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
2377        transaction.commit().await.expect("commit failed");
2378        let mut transaction =
2379            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2380        device_ranges.push(
2381            allocator
2382                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2383                .await
2384                .expect("allocate failed"),
2385        );
2386        assert_eq!(device_ranges[7].length().expect("Invalid range"), fs.block_size());
2387        assert_eq!(overlap(&device_ranges[5], &device_ranges[7]), 0);
2388        assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
2389        transaction.commit().await.expect("commit failed");
2390
2391        check_allocations(&allocator, &device_ranges).await;
2392    }
2393
2394    #[fuchsia::test]
2395    async fn test_allocate_more_than_max_size() {
2396        const STORE_OBJECT_ID: u64 = 99;
2397        let (fs, allocator) = test_fs().await;
2398        let mut transaction =
2399            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2400        let mut device_ranges = collect_allocations(&allocator).await;
2401        device_ranges.push(
2402            allocator
2403                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
2404                .await
2405                .expect("allocate failed"),
2406        );
2407        assert_eq!(
2408            device_ranges.last().unwrap().length().expect("Invalid range"),
2409            allocator.max_extent_size_bytes
2410        );
2411        transaction.commit().await.expect("commit failed");
2412
2413        check_allocations(&allocator, &device_ranges).await;
2414    }
2415
2416    #[fuchsia::test]
2417    async fn test_deallocations() {
2418        const STORE_OBJECT_ID: u64 = 99;
2419        let (fs, allocator) = test_fs().await;
2420        let initial_allocations = collect_allocations(&allocator).await;
2421
2422        let mut transaction =
2423            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2424        let device_range1 = allocator
2425            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2426            .await
2427            .expect("allocate failed");
2428        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
2429        transaction.commit().await.expect("commit failed");
2430
2431        let mut transaction =
2432            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2433        allocator
2434            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
2435            .await
2436            .expect("deallocate failed");
2437        transaction.commit().await.expect("commit failed");
2438
2439        check_allocations(&allocator, &initial_allocations).await;
2440    }
2441
2442    #[fuchsia::test]
2443    async fn test_mark_allocated() {
2444        const STORE_OBJECT_ID: u64 = 99;
2445        let (fs, allocator) = test_fs().await;
2446        let mut device_ranges = collect_allocations(&allocator).await;
2447        let range = {
2448            let mut transaction = fs
2449                .clone()
2450                .new_transaction(lock_keys![], Options::default())
2451                .await
2452                .expect("new failed");
2453            // First, allocate 2 blocks.
2454            allocator
2455                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
2456                .await
2457                .expect("allocate failed")
2458            // Let the transaction drop.
2459        };
2460
2461        let mut transaction =
2462            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2463
2464        // If we allocate 1 block, the two blocks that were allocated earlier should be available,
2465        // and this should return the first of them.
2466        device_ranges.push(
2467            allocator
2468                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2469                .await
2470                .expect("allocate failed"),
2471        );
2472
2473        assert_eq!(device_ranges.last().unwrap().start, range.start);
2474
2475        // Mark the second block as allocated.
2476        let mut range2 = range.clone();
2477        range2.start += fs.block_size();
2478        allocator
2479            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
2480            .await
2481            .expect("mark_allocated failed");
2482        device_ranges.push(range2);
2483
2484        // This should avoid the range we marked as allocated.
2485        device_ranges.push(
2486            allocator
2487                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2488                .await
2489                .expect("allocate failed"),
2490        );
2491        let last_range = device_ranges.last().unwrap();
2492        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
2493        assert_eq!(overlap(last_range, &range), 0);
2494        transaction.commit().await.expect("commit failed");
2495
2496        check_allocations(&allocator, &device_ranges).await;
2497    }
2498
2499    #[fuchsia::test]
2500    async fn test_mark_for_deletion() {
2501        const STORE_OBJECT_ID: u64 = 99;
2502        let (fs, allocator) = test_fs().await;
2503
2504        // Allocate some stuff.
2505        let initial_allocated_bytes = allocator.get_allocated_bytes();
2506        let mut device_ranges = collect_allocations(&allocator).await;
2507        let mut transaction =
2508            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2509        // Note we have a cap on individual allocation lengths, so we allocate over multiple mutations.
2510        for _ in 0..15 {
2511            device_ranges.push(
2512                allocator
2513                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2514                    .await
2515                    .expect("allocate failed"),
2516            );
2517            device_ranges.push(
2518                allocator
2519                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2520                    .await
2521                    .expect("allocate2 failed"),
2522            );
2523        }
2524        transaction.commit().await.expect("commit failed");
2525        check_allocations(&allocator, &device_ranges).await;
2526
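        // 15 iterations x 2 allocations of 100 blocks each = 3000 blocks allocated above.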
2527        assert_eq!(
2528            allocator.get_allocated_bytes(),
2529            initial_allocated_bytes + fs.block_size() * 3000
2530        );
2531
2532        // Mark for deletion.
2533        let mut transaction =
2534            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2535        allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID).await;
2536        transaction.commit().await.expect("commit failed");
2537
2538        // Expect that the allocated byte count is updated immediately, but the device ranges are still allocated.
2539        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
2540        check_allocations(&allocator, &device_ranges).await;
2541
2542        // Allocate more space than is available until the mark_for_deletion space is reclaimed.
2543        // This should force a flush on allocate(). (3000 already allocated + 1500 new > test_fs size of 4096 blocks.)
2544        device_ranges.clear();
2545
2546        let mut transaction =
2547            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2548        let target_bytes = 1500 * fs.block_size();
2549        while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
2550            let len = std::cmp::min(
2551                target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
2552                100 * fs.block_size(),
2553            );
2554            device_ranges.push(
2555                allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
2556            );
2557        }
2558        transaction.commit().await.expect("commit failed");
2559
2560        // Have the deleted ranges cleaned up.
2561        allocator.flush().await.expect("flush failed");
2562
2563        // The flush above seems to trigger an allocation for the allocator itself.
2564        // We will just check that we have the right size for the owner we care about.
2565
2566        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2567        assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes);
2568    }
2569
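    /// Creates a child file named "foo {size}" under `store`'s root directory and writes `size`
    /// bytes of data to it.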
2570    async fn create_file(store: &Arc<ObjectStore>, size: usize) {
2571        let root_directory =
2572            Directory::open(store, store.root_directory_object_id()).await.expect("open failed");
2573
2574        let mut transaction = store
2575            .filesystem()
2576            .new_transaction(
2577                lock_keys![LockKey::object(
2578                    store.store_object_id(),
2579                    store.root_directory_object_id()
2580                )],
2581                Options::default(),
2582            )
2583            .await
2584            .expect("new_transaction failed");
2585        let file = root_directory
2586            .create_child_file(&mut transaction, &format!("foo {}", size))
2587            .await
2588            .expect("create_child_file failed");
2589        transaction.commit().await.expect("commit failed");
2590
2591        let buffer = file.allocate_buffer(size).await;
2592
2593        // Write some data to it.
2594        let mut transaction = file
2595            .new_transaction_with_options(Options {
2596                borrow_metadata_space: true,
2597                ..Default::default()
2598            })
2599            .await
2600            .expect("new_transaction_with_options failed");
2601        file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2602        transaction.commit().await.expect("commit failed");
2603    }
2604
2605    #[fuchsia::test]
2606    async fn test_replay_with_deleted_store_and_compaction() {
2607        let (fs, _) = test_fs().await;
2608
2609        const FILE_SIZE: usize = 10_000_000;
2610
2611        {
2612            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2613            let store =
2614                root_vol.new_volume("vol", NO_OWNER, None).await.expect("new_volume failed");
2615
2616            create_file(&store, FILE_SIZE).await;
2617        }
2618
2619        fs.close().await.expect("close failed");
2620        let device = fs.take_device().await;
2621        device.reopen(false);
2622
2623        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2624
2625        // Compact so that, when we replay, the transaction that deletes the store won't find
2626        // any mutations.
2627        fs.journal().compact().await.expect("compact failed");
2628
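        // Delete and recreate the volume a couple of times, remounting after each pass, so
        // replay has to handle the deleted store.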
2629        for _ in 0..2 {
2630            {
2631                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2632
2633                let transaction = fs
2634                    .clone()
2635                    .new_transaction(
2636                        lock_keys![LockKey::object(
2637                            root_vol.volume_directory().store().store_object_id(),
2638                            root_vol.volume_directory().object_id(),
2639                        )],
2640                        Options { borrow_metadata_space: true, ..Default::default() },
2641                    )
2642                    .await
2643                    .expect("new_transaction failed");
2644                root_vol
2645                    .delete_volume("vol", transaction, || {})
2646                    .await
2647                    .expect("delete_volume failed");
2648
2649                let store =
2650                    root_vol.new_volume("vol", NO_OWNER, None).await.expect("new_volume failed");
2651                create_file(&store, FILE_SIZE).await;
2652            }
2653
2654            fs.close().await.expect("close failed");
2655            let device = fs.take_device().await;
2656            device.reopen(false);
2657
2658            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2659        }
2660
2661        fsck(fs.clone()).await.expect("fsck failed");
2662        fs.close().await.expect("close failed");
2663    }
2664
2665    #[fuchsia::test]
2666    async fn test_delete_multiple_volumes() {
2667        let (mut fs, _) = test_fs().await;
2668
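        // Repeatedly create, populate, and delete a volume, remounting each time.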
2669        for _ in 0..50 {
2670            {
2671                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2672                let store =
2673                    root_vol.new_volume("vol", NO_OWNER, None).await.expect("new_volume failed");
2674
2675                create_file(&store, 1_000_000).await;
2676
2677                let transaction = fs
2678                    .clone()
2679                    .new_transaction(
2680                        lock_keys![LockKey::object(
2681                            root_vol.volume_directory().store().store_object_id(),
2682                            root_vol.volume_directory().object_id(),
2683                        )],
2684                        Options { borrow_metadata_space: true, ..Default::default() },
2685                    )
2686                    .await
2687                    .expect("new_transaction failed");
2688                root_vol
2689                    .delete_volume("vol", transaction, || {})
2690                    .await
2691                    .expect("delete_volume failed");
2692
2693                fs.allocator().flush().await.expect("flush failed");
2694            }
2695
2696            fs.close().await.expect("close failed");
2697            let device = fs.take_device().await;
2698            device.reopen(false);
2699
2700            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2701        }
2702
2703        fsck(fs.clone()).await.expect("fsck failed");
2704        fs.close().await.expect("close failed");
2705    }
2706
2707    #[fuchsia::test]
2708    async fn test_allocate_free_reallocate() {
2709        const STORE_OBJECT_ID: u64 = 99;
2710        let (fs, allocator) = test_fs().await;
2711
2712        // Allocate some stuff.
2713        let mut device_ranges = Vec::new();
2714        let mut transaction =
2715            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2716        for _ in 0..30 {
2717            device_ranges.push(
2718                allocator
2719                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2720                    .await
2721                    .expect("allocate failed"),
2722            );
2723        }
2724        transaction.commit().await.expect("commit failed");
2725
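        // 30 allocations of 100 blocks each = 3000 blocks.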
2726        assert_eq!(
2727            fs.block_size() * 3000,
2728            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2729        );
2730
2731        // Delete it all.
2732        let mut transaction =
2733            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2734        for range in std::mem::replace(&mut device_ranges, Vec::new()) {
2735            allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
2736        }
2737        transaction.commit().await.expect("commit failed");
2738
2739        assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());
2740
2741        // Allocate some more. Due to storage pressure, this requires us to flush the device
2742        // before the space freed above can be reused.
2743        let mut transaction =
2744            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2745        let target_len = 1500 * fs.block_size();
2746        while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
2747            let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
2748            device_ranges.push(
2749                allocator
2750                    .allocate(&mut transaction, STORE_OBJECT_ID, len)
2751                    .await
2752                    .expect("allocate failed"),
2753            );
2754        }
2755        transaction.commit().await.expect("commit failed");
2756
2757        assert_eq!(
2758            fs.block_size() * 1500,
2759            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2760        );
2761    }
2762
2763    #[fuchsia::test]
2764    async fn test_flush() {
2765        const STORE_OBJECT_ID: u64 = 99;
2766
2767        let mut device_ranges = Vec::new();
2768        let device = {
2769            let (fs, allocator) = test_fs().await;
2770            let mut transaction = fs
2771                .clone()
2772                .new_transaction(lock_keys![], Options::default())
2773                .await
2774                .expect("new failed");
2775            device_ranges.push(
2776                allocator
2777                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2778                    .await
2779                    .expect("allocate failed"),
2780            );
2781            device_ranges.push(
2782                allocator
2783                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2784                    .await
2785                    .expect("allocate failed"),
2786            );
2787            device_ranges.push(
2788                allocator
2789                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2790                    .await
2791                    .expect("allocate failed"),
2792            );
2793            transaction.commit().await.expect("commit failed");
2794
2795            allocator.flush().await.expect("flush failed");
2796
2797            fs.close().await.expect("close failed");
2798            fs.take_device().await
2799        };
2800
2801        device.reopen(false);
2802        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2803        let allocator = fs.allocator();
2804
2805        let allocated = collect_allocations(&allocator).await;
2806
2807        // Make sure the ranges we allocated earlier are still allocated.
2808        for i in &device_ranges {
2809            let mut overlapping = 0;
2810            for j in &allocated {
2811                overlapping += overlap(i, j);
2812            }
2813            assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
2814        }
2815
2816        let mut transaction =
2817            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2818        let range = allocator
2819            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2820            .await
2821            .expect("allocate failed");
2822
2823        // Make sure the range just allocated doesn't overlap any other allocated ranges.
2824        for r in &allocated {
2825            assert_eq!(overlap(r, &range), 0);
2826        }
2827        transaction.commit().await.expect("commit failed");
2828    }
2829
2830    #[fuchsia::test]
2831    async fn test_dropped_transaction() {
2832        const STORE_OBJECT_ID: u64 = 99;
2833        let (fs, allocator) = test_fs().await;
2834        let allocated_range = {
2835            let mut transaction = fs
2836                .clone()
2837                .new_transaction(lock_keys![], Options::default())
2838                .await
2839                .expect("new_transaction failed");
2840            allocator
2841                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2842                .await
2843                .expect("allocate failed")
2844        };
2845        // After dropping the transaction and attempting to allocate again, we should end up with
2846        // the same range because the reservation should have been released.
2847        let mut transaction = fs
2848            .clone()
2849            .new_transaction(lock_keys![], Options::default())
2850            .await
2851            .expect("new_transaction failed");
2852        assert_eq!(
2853            allocator
2854                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2855                .await
2856                .expect("allocate failed"),
2857            allocated_range
2858        );
2859    }
2860
2861    #[fuchsia::test]
2862    async fn test_cleanup_removed_owner() {
2863        const STORE_OBJECT_ID: u64 = 99;
2864        let device = {
2865            let (fs, allocator) = test_fs().await;
2866
2867            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2868            {
2869                let mut transaction =
2870                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
2871                allocator
2872                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2873                    .await
2874                    .expect("Allocating");
2875                transaction.commit().await.expect("Committing.");
2876            }
2877            allocator.flush().await.expect("Flushing");
2878            assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2879            {
2880                let mut transaction =
2881                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
2882                allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID).await;
2883                transaction.commit().await.expect("Committing.");
2884            }
2885            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2886            fs.close().await.expect("Closing");
2887            fs.take_device().await
2888        };
2889
2890        device.reopen(false);
2891        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2892        let allocator = fs.allocator();
2893        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2894    }
2895
2896    #[fuchsia::test]
2897    async fn test_allocated_bytes() {
2898        const STORE_OBJECT_ID: u64 = 99;
2899        let (fs, allocator) = test_fs().await;
2900
2901        let initial_allocated_bytes = allocator.get_allocated_bytes();
2902
2903        // Verify allocated_bytes reflects allocation changes.
2904        let allocated_bytes = initial_allocated_bytes + fs.block_size();
2905        let allocated_range = {
2906            let mut transaction = fs
2907                .clone()
2908                .new_transaction(lock_keys![], Options::default())
2909                .await
2910                .expect("new_transaction failed");
2911            let range = allocator
2912                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2913                .await
2914                .expect("allocate failed");
2915            transaction.commit().await.expect("commit failed");
2916            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
2917            range
2918        };
2919
2920        {
2921            let mut transaction = fs
2922                .clone()
2923                .new_transaction(lock_keys![], Options::default())
2924                .await
2925                .expect("new_transaction failed");
2926            allocator
2927                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2928                .await
2929                .expect("allocate failed");
2930
2931            // Prior to committing, the count of allocated bytes shouldn't change.
2932            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
2933        }
2934
2935        // After dropping the prior transaction, the allocated bytes still shouldn't have changed.
2936        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
2937
2938        // Verify allocated_bytes reflects deallocations.
2939        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
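        // This leaves 20 bytes allocated at each end of the block (40 bytes in total).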
2940        let mut transaction =
2941            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2942        allocator
2943            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
2944            .await
2945            .expect("deallocate failed");
2946
2947        // Before committing, there should be no change.
2948        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
2949
2950        transaction.commit().await.expect("commit failed");
2951
2952        // After committing, only 40 bytes of that allocation should remain allocated.
2953        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
2954    }
2955
2956    #[fuchsia::test]
2957    async fn test_persist_bytes_limit() {
2958        const LIMIT: u64 = 12345;
2959        const OWNER_ID: u64 = 12;
2960
2961        let (fs, allocator) = test_fs().await;
2962        {
2963            let mut transaction = fs
2964                .clone()
2965                .new_transaction(lock_keys![], Options::default())
2966                .await
2967                .expect("new_transaction failed");
2968            allocator
2969                .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
2970                .await
2971                .expect("Failed to set limit.");
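            // The new limit should not be visible in the allocator's info until the transaction commits.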
2972            assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
2973            transaction.commit().await.expect("Failed to commit transaction");
2974            let bytes: u64 = *allocator
2975                .inner
2976                .lock()
2977                .info
2978                .limit_bytes
2979                .get(&OWNER_ID)
2980                .expect("Failed to find limit");
2981            assert_eq!(LIMIT, bytes);
2982        }
2983    }
2984
2985    /// Given a sorted list of non-overlapping ranges, this will coalesce adjacent ranges.
2986    /// This allows comparison of equivalent sets of ranges which may occur due to differences
2987    /// across allocator strategies.
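    /// e.g. [0..10, 10..20, 30..40] coalesces to [0..20, 30..40].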
2988    fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
2989        let mut coalesced = Vec::new();
2990        let mut prev: Option<Range<u64>> = None;
2991        for range in ranges {
2992            if let Some(prev_range) = &mut prev {
2993                if range.start == prev_range.end {
2994                    prev_range.end = range.end;
2995                } else {
2996                    coalesced.push(prev_range.clone());
2997                    prev = Some(range);
2998                }
2999            } else {
3000                prev = Some(range);
3001            }
3002        }
3003        if let Some(prev_range) = prev {
3004            coalesced.push(prev_range);
3005        }
3006        coalesced
3007    }
3008
3009    #[fuchsia::test]
3010    async fn test_take_for_trimming() {
3011        const STORE_OBJECT_ID: u64 = 99;
3012
3013        // Allocate a large chunk, then free a few bits of it, so we have free chunks interleaved
3014        // with allocated chunks.
3015        let allocated_range;
3016        let expected_free_ranges;
3017        let device = {
3018            let (fs, allocator) = test_fs().await;
3019            let bs = fs.block_size();
3020            let mut transaction = fs
3021                .clone()
3022                .new_transaction(lock_keys![], Options::default())
3023                .await
3024                .expect("new failed");
3025            allocated_range = allocator
3026                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
3027                .await
3028                .expect("allocate failed");
3029            transaction.commit().await.expect("commit failed");
3030
3031            let mut transaction = fs
3032                .clone()
3033                .new_transaction(lock_keys![], Options::default())
3034                .await
3035                .expect("new failed");
3036            let base = allocated_range.start;
3037            expected_free_ranges = vec![
3038                base..(base + (bs * 1)),
3039                (base + (bs * 2))..(base + (bs * 3)),
3040                // Note that the next three ranges are adjacent and will be treated as one free
3041                // range once applied.  We separate them here to exercise the handling of "large"
3042                // free ranges.
3043                (base + (bs * 4))..(base + (bs * 8)),
3044                (base + (bs * 8))..(base + (bs * 12)),
3045                (base + (bs * 12))..(base + (bs * 13)),
3046                (base + (bs * 29))..(base + (bs * 30)),
3047            ];
3048            for range in &expected_free_ranges {
3049                allocator
3050                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
3051                    .await
3052                    .expect("deallocate failed");
3053            }
3054            transaction.commit().await.expect("commit failed");
3055
3056            allocator.flush().await.expect("flush failed");
3057
3058            fs.close().await.expect("close failed");
3059            fs.take_device().await
3060        };
3061
3062        device.reopen(false);
3063        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
3064        let allocator = fs.allocator();
3065
3066        // These values were picked so that each of them would be the reason why
3067        // collect_free_extents finished, and so we would return after partially processing one of
3068        // the free extents.
3069        let max_extent_size = fs.block_size() as usize * 4;
3070        const EXTENTS_PER_BATCH: usize = 2;
3071        let mut free_ranges = vec![];
3072        let mut offset = allocated_range.start;
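        // Walk the allocated range batch by batch, collecting the free extents within it.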
3073        while offset < allocated_range.end {
3074            let free = allocator
3075                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
3076                .await
3077                .expect("take_for_trimming failed");
3078            free_ranges.extend(
3079                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
3080            );
3081            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
3082        }
3083        // Coalesce adjacent free ranges because the allocator may return smaller aligned chunks,
3084        // but the overall set of ranges should always be equivalent.
3085        let coalesced_free_ranges = coalesce_ranges(free_ranges);
3086        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);
3087
3088        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
3089    }
3090
3091    #[fuchsia::test]
3092    async fn test_allocations_wait_for_free_extents() {
3093        const STORE_OBJECT_ID: u64 = 99;
3094        let (fs, allocator) = test_fs().await;
3095        let allocator_clone = allocator.clone();
3096
3097        let mut transaction =
3098            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3099
3100        // Tie up all of the free extents on the device, and make sure allocations block.
3101        let max_extent_size = fs.device().size() as usize;
3102        const EXTENTS_PER_BATCH: usize = usize::MAX;
3103
3104        // HACK: Treat `trimmable_extents` as being locked by `trim_done` (i.e. it should only be
3105        // accessed whilst `trim_done` is locked). We can't combine them into the same mutex,
3106        // because the inner type would be "poisoned" by the lifetime parameter of
3107        // `trimmable_extents` (which is tied to the lifetime of `allocator`), and then we can't move it
3108        // into `alloc_task` which would require a `'static` lifetime.
3109        let trim_done = Arc::new(Mutex::new(false));
3110        let trimmable_extents = allocator
3111            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
3112            .await
3113            .expect("take_for_trimming failed");
3114
3115        let trim_done_clone = trim_done.clone();
3116        let bs = fs.block_size();
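        // This allocation should block until `trimmable_extents` is dropped below.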
3117        let alloc_task = fasync::Task::spawn(async move {
3118            allocator_clone
3119                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
3120                .await
3121                .expect("allocate failed");
3122            {
3123                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
3124            }
3125            transaction.commit().await.expect("commit failed");
3126        });
3127
3128        // Add a small delay to simulate the trim taking some nonzero amount of time.  Otherwise,
3129        // releasing the extents will almost certainly beat the allocation attempt.
3130        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
3131
3132        // Once the free extents are released, the task should unblock.
3133        {
3134            let mut trim_done = trim_done.lock();
3135            std::mem::drop(trimmable_extents);
3136            *trim_done = true;
3137        }
3138
3139        alloc_task.await;
3140    }
3141
3142    #[fuchsia::test]
3143    async fn test_allocation_with_reservation_not_multiple_of_block_size() {
3144        const STORE_OBJECT_ID: u64 = 99;
3145        let (fs, allocator) = test_fs().await;
3146
3147        // Reserve an amount that isn't a multiple of the block size.
3148        const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
3149        let reservation =
3150            allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();
3151
3152        let mut transaction = fs
3153            .clone()
3154            .new_transaction(
3155                lock_keys![],
3156                Options { allocator_reservation: Some(&reservation), ..Options::default() },
3157            )
3158            .await
3159            .expect("new failed");
3160
3161        let range = allocator
3162            .allocate(
3163                &mut transaction,
3164                STORE_OBJECT_ID,
3165                round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
3166            )
3167            .await
3168            .expect("allocate failed");
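        // Even though the reservation isn't block-aligned, the allocated range must cover whole blocks.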
3169        assert_eq!((range.end - range.start) % fs.block_size(), 0);
3170
3171        println!("{}", range.end - range.start);
3172    }
3173}