// fxfs/object_store/allocator.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! # The Allocator
//!
//! The allocator in Fxfs is a filesystem-wide entity responsible for managing the allocation of
//! regions of the device to "owners" (which are `ObjectStore`s).
//!
//! Allocations are tracked in an LSMTree with coalescing used to merge neighboring allocations
//! with the same properties (owner and reference count). As of writing, reference counting is not
//! used. (Reference counts are intended for future use if/when snapshot support is implemented.)
//!
//! There are a number of important implementation features of the allocator that warrant further
//! documentation.
//!
//! ## Byte limits
//!
//! Fxfs is a multi-volume filesystem. Fuchsia with fxblob currently uses two primary volumes:
//! an unencrypted volume for blob storage and an encrypted volume for data storage.
//! Byte limits ensure that no one volume can consume all available storage. This is important
//! as software updates must always be possible (blobs) and conversely configuration data should
//! always be writable (data).
//!
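/*!
A minimal sketch of how a per-volume byte limit could be checked, written as a standalone
illustration rather than the allocator's actual code. `bytes_left` and both maps are hypothetical
names; the only behaviour taken from this module is that a volume without a configured limit is
treated as having a limit of `u64::MAX`.

```rust
use std::collections::BTreeMap;

/// Returns how many more bytes `owner` may consume before hitting its limit.
/// Both maps are keyed by the owning volume's object id (hypothetical layout).
fn bytes_left(limits: &BTreeMap<u64, u64>, used: &BTreeMap<u64, u64>, owner: u64) -> u64 {
    // A missing entry means "no limit", modelled here as u64::MAX.
    let limit = limits.get(&owner).copied().unwrap_or(u64::MAX);
    let used = used.get(&owner).copied().unwrap_or(0);
    // Saturate so that inconsistent accounting can never underflow.
    limit.saturating_sub(used)
}
```

With a limit of 1000 bytes and 300 bytes used, `bytes_left` reports 700; a volume that has
somehow exceeded its limit reports 0 rather than wrapping around.
*/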
//! ## Reservation tracking
//!
//! Fxfs on Fuchsia leverages write-back caching which allows us to buffer writes in RAM until
//! memory pressure, an explicit flush or a file close requires us to persist data to storage.
//!
//! To ensure that we do not over-commit in such cases (e.g. by writing many files to RAM only to
//! find out that there is insufficient disk to store them all), the allocator includes a simple
//! reservation tracking mechanism.
//!
//! Reservation tracking is implemented hierarchically such that a reservation can portion out
//! sub-reservations, each of which may be "reserved" or "forgotten" when it comes time to actually
//! allocate space.
//!
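/*!
The bookkeeping behind a reservation and its sub-reservations can be sketched with two counters.
This is a simplified, single-threaded model under stated assumptions: `SketchReservation` and its
methods `hold`, `commit` and `release` are hypothetical names, and locking and owner accounting
are left out entirely.

```rust
/// Simplified model of a reservation (hypothetical; not the Fxfs type).
#[derive(Debug, Default)]
struct SketchReservation {
    /// Bytes currently held by the reservation.
    amount: u64,
    /// Bytes set aside by outstanding sub-reservations.
    reserved: u64,
}

impl SketchReservation {
    /// Sets aside exactly `bytes`, or returns false if too little is unreserved.
    fn hold(&mut self, bytes: u64) -> bool {
        if self.amount - self.reserved < bytes {
            return false;
        }
        self.reserved += bytes;
        true
    }

    /// The held bytes were really allocated, so they leave the reservation for good.
    fn commit(&mut self, bytes: u64) {
        self.reserved -= bytes;
        self.amount -= bytes;
    }

    /// The hold was dropped unused, so its bytes become available to other holds.
    fn release(&mut self, bytes: u64) {
        self.reserved -= bytes;
    }
}
```

The sketch relies on `reserved <= amount` always holding, which is why the subtraction in `hold`
cannot underflow as long as callers only commit or release bytes they previously held.
*/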
//! ## Fixed locations
//!
//! The only fixed location files in Fxfs are the first 512 KiB extents of the two superblocks that
//! exist as the first things on the disk (i.e. the first 1 MiB). The allocator manages these
//! locations, but does so using a 'mark_allocated' method distinct from all other allocations in
//! which the location is left up to the allocator.
//!
//! ## Deallocated unusable regions
//!
//! It is not legal to reuse a deallocated disk region until after a flush. Transactions
//! are not guaranteed to be persisted until after a successful flush so any reuse of
//! a region before then followed by power loss may lead to corruption.
//!
//! For example, consider what happens if we deallocate a file, reuse its extent for another file,
//! then crash after writing to the new file but before flushing the journal. At next mount we will
//! have lost the transaction despite overwriting the original file's data, leading to
//! inconsistency errors (data corruption).
//!
//! These regions are currently tracked in RAM in the allocator.
//!
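/*!
One way to picture the in-RAM tracking is a queue of deallocations tagged with the journal offset
at which each was committed. The sketch below is an illustration only: `PendingFrees`,
`deallocate` and `on_flush` are hypothetical names, and treating a deallocation as reusable when
its offset is strictly below the flushed offset is an assumption about the boundary condition.

```rust
use std::collections::VecDeque;
use std::ops::Range;

/// Deallocated ranges that must not be reused until the journal is flushed past them.
#[derive(Default)]
struct PendingFrees {
    /// (journal offset at commit time, device range), in commit order.
    queue: VecDeque<(u64, Range<u64>)>,
}

impl PendingFrees {
    /// Records a committed deallocation; the range stays unusable for now.
    fn deallocate(&mut self, log_file_offset: u64, range: Range<u64>) {
        self.queue.push_back((log_file_offset, range));
    }

    /// Returns the ranges that became safe to reuse once the journal has been
    /// flushed up to `flushed_offset`.
    fn on_flush(&mut self, flushed_offset: u64) -> Vec<Range<u64>> {
        let mut reusable = Vec::new();
        while self.queue.front().map_or(false, |entry| entry.0 < flushed_offset) {
            if let Some((_, range)) = self.queue.pop_front() {
                reusable.push(range);
            }
        }
        reusable
    }
}
```

Because entries are queued in commit order, a flush only ever needs to inspect the front of the
queue.
*/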
//! ## Allocated but uncommitted regions
//!
//! These are allocated regions that are not (yet) persisted to disk. They are regular
//! file allocations, but are not stored on persistent storage until their transaction is committed
//! and safely flushed to disk.
//!
//! ## TRIMed unusable regions
//!
//! We periodically TRIM unallocated regions to give the SSD controller insight into which
//! parts of the device contain data. We must avoid allocating from the temporary TRIM allocations
//! that are held while we perform these operations.
//!
//! ## Volume deletion
//!
//! We make use of an optimisation in the case where an entire volume is deleted. In such cases,
//! rather than individually deallocate all disk regions associated with that volume, we make
//! note of the deletion and perform a special merge operation on the next LSMTree compaction that
//! filters out allocations for the deleted volume.
//!
//! This is designed to make dropping of volumes significantly cheaper, but it does add some
//! additional complexity if implementing an allocator that implements data structures to track
//! free space (rather than just allocated space).
//!
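/*!
The effect of that filtering merge can be sketched as a plain filter over allocation records.
This is not the LSM tree merge itself: `SketchAllocation` and `compact` are hypothetical names,
and a `Vec` stands in for the layer files.

```rust
use std::collections::HashSet;
use std::ops::Range;

/// A stand-in for one allocation record (hypothetical; not the Fxfs item type).
#[derive(Clone, Debug, PartialEq)]
struct SketchAllocation {
    device_range: Range<u64>,
    owner_object_id: u64,
}

/// Rewrites the records, dropping every allocation owned by a deleted volume.
fn compact(
    items: Vec<SketchAllocation>,
    marked_for_deletion: &HashSet<u64>,
) -> Vec<SketchAllocation> {
    items
        .into_iter()
        .filter(|item| !marked_for_deletion.contains(&item.owner_object_id))
        .collect()
}
```

Deleting a volume then costs one set insertion up front; the per-extent work is folded into a
compaction that would have rewritten the records anyway.
*/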
//! ## Image generation
//!
//! The Fuchsia build process requires building an initial filesystem image. In the case of
//! fxblob-based boards, this is an Fxfs filesystem containing a volume with the base set of
//! blobs required to bootstrap the system. When we build such an image, we want it to be as compact
//! as possible as we're potentially packaging it up for distribution. To that end, our allocation
//! strategy (or at least the strategy used for image generation) should prefer to allocate from the
//! start of the device wherever possible.
pub mod merge;
pub mod strategy;

use crate::drop_event::DropEvent;
use crate::errors::FxfsError;
use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
use crate::log::*;
use crate::lsm_tree::cache::NullCache;
use crate::lsm_tree::skip_list_layer::SkipListLayer;
use crate::lsm_tree::types::{
    FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
    OrdUpperBound, SortByU64, Value,
};
use crate::lsm_tree::{LSMTree, Query, compact_with_iterator, layers_from_handles};
use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
use crate::object_store::object_manager::ReservationUpdate;
use crate::object_store::transaction::{
    AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
};
use crate::object_store::{
    DataObjectHandle, DirectWriter, Extent, HandleOptions, ObjectStore, ReservedId, tree,
};
use crate::range::RangeExt;
use crate::round::{round_div, round_down, round_up};
use crate::serialized_types::{
    DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
};
use anyhow::{Context, Error, anyhow, bail, ensure};
use async_trait::async_trait;
use either::Either::{Left, Right};
use event_listener::EventListener;
use fprint::TypeFingerprint;
use fuchsia_inspect::HistogramProperty;
use fuchsia_sync::Mutex;
use futures::FutureExt;
use fxfs_macros::SerializeKey;
use merge::{filter_marked_for_deletion, filter_tombstones, merge};
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::collections::{BTreeMap, HashSet, VecDeque};
use std::hash::Hash;
use std::marker::PhantomData;
use std::num::{NonZero, Saturating};
use std::ops::Range;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};

/// This trait is implemented by things that own reservations.
pub trait ReservationOwner: Send + Sync {
    /// Report that bytes are being released from the reservation back to the |ReservationOwner|
    /// where |owner_object_id| is the owner under the root object store associated with the
    /// reservation.
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
}

/// A reservation guarantees that when it comes time to actually allocate, it will not fail due to
/// lack of space.  Sub-reservations (a.k.a. holds) are possible which effectively allows part of a
/// reservation to be set aside until it's time to commit.  Reservations do offer some
/// thread-safety, but some responsibility is borne by the caller: e.g. calling `forget` and
/// `reserve` at the same time from different threads is unsafe. Reservations have an
/// |owner_object_id| which associates them with an object under the root object store that the
/// reservation is accounted against.
pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
    owner: T,
    owner_object_id: Option<u64>,
    inner: Mutex<ReservationInner>,
    phantom: PhantomData<U>,
}

#[derive(Debug, Default)]
struct ReservationInner {
    // Amount currently held by this reservation.
    amount: u64,

    // Amount reserved by sub-reservations.
    reserved: u64,
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.inner.lock().fmt(f)
    }
}

impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
    pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
        Self {
            owner,
            owner_object_id,
            inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
            phantom: PhantomData,
        }
    }

    pub fn owner_object_id(&self) -> Option<u64> {
        self.owner_object_id
    }

    /// Returns the total amount of the reservation, not accounting for anything that might be held.
    pub fn amount(&self) -> u64 {
        self.inner.lock().amount
    }

    /// Adds more to the reservation.
    pub fn add(&self, amount: u64) {
        self.inner.lock().amount += amount;
    }

    /// Returns the entire amount of the reservation.  The caller is responsible for maintaining
    /// consistency, i.e. updating counters, etc, and there can be no sub-reservations (an assert
    /// will fire otherwise).
    pub fn forget(&self) -> u64 {
        let mut inner = self.inner.lock();
        assert_eq!(inner.reserved, 0);
        std::mem::take(&mut inner.amount)
    }

    /// Takes some of the reservation.  The caller is responsible for maintaining consistency,
    /// i.e. updating counters, etc.  This will assert that the amount being forgotten does not
    /// exceed the available reservation amount; the caller should ensure that this is the case.
    pub fn forget_some(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Returns a partial amount of the reservation. |amount| is passed the |limit| and should
    /// return the amount, which can be zero.
    fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
        let mut inner = self.inner.lock();
        let taken = amount(inner.amount - inner.reserved);
        inner.reserved += taken;
        ReservationImpl::new(self, self.owner_object_id, taken)
    }

    /// Reserves *exactly* amount if possible.
    pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
        let mut inner = self.inner.lock();
        if inner.amount - inner.reserved < amount {
            None
        } else {
            inner.reserved += amount;
            Some(ReservationImpl::new(self, self.owner_object_id, amount))
        }
    }

    /// Commits a previously reserved amount from this reservation.  The caller is responsible for
    /// ensuring the amount was reserved.
    pub fn commit(&self, amount: u64) {
        let mut inner = self.inner.lock();
        inner.reserved -= amount;
        inner.amount -= amount;
    }

    /// Returns some of the reservation.
    pub fn give_back(&self, amount: u64) {
        self.owner.borrow().release_reservation(self.owner_object_id, amount);
        let mut inner = self.inner.lock();
        inner.amount -= amount;
        assert!(inner.reserved <= inner.amount);
    }

    /// Moves `amount` from this reservation to another reservation.
    pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
        &self,
        other: &ReservationImpl<V, W>,
        amount: u64,
    ) {
        assert_eq!(self.owner_object_id, other.owner_object_id());
        let mut inner = self.inner.lock();
        if let Some(amount) = inner.amount.checked_sub(amount) {
            inner.amount = amount;
        } else {
            std::mem::drop(inner);
            panic!("Insufficient reservation space");
        }
        other.add(amount);
    }
}

impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
    fn drop(&mut self) {
        let inner = self.inner.get_mut();
        assert_eq!(inner.reserved, 0);
        let owner_object_id = self.owner_object_id;
        if inner.amount > 0 {
            self.owner
                .borrow()
                .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
        }
    }
}

impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
    for ReservationImpl<T, U>
{
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
        // Sub-reservations should belong to the same owner (or lack thereof).
        assert_eq!(owner_object_id, self.owner_object_id);
        let mut inner = self.inner.lock();
        assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
        inner.reserved -= amount;
    }
}

pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;

pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;

/// Our allocator implementation tracks extents with a reference count.  At time of writing, these
/// reference counts should never exceed 1, but that might change with snapshots and clones.
pub type AllocatorKey = AllocatorKeyV32;

#[derive(
    Clone,
    Debug,
    Deserialize,
    Eq,
    Hash,
    Ord,
    PartialEq,
    PartialOrd,
    Serialize,
    TypeFingerprint,
    SerializeKey,
    Versioned,
)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub struct AllocatorKeyV32 {
    pub device_range: Extent,
}

impl SortByU64 for AllocatorKey {
    fn get_leading_u64(&self) -> u64 {
        self.device_range.end
    }
}

const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;

pub struct AllocatorKeyPartitionIterator {
    device_range: Range<u64>,
}

impl Iterator for AllocatorKeyPartitionIterator {
    type Item = u64;

    fn next(&mut self) -> Option<Self::Item> {
        if self.device_range.start >= self.device_range.end {
            None
        } else {
            let start = self.device_range.start;
            self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
            let end = std::cmp::min(self.device_range.start, self.device_range.end);
            let key = AllocatorKey { device_range: Extent(start..end) };
            let hash = crate::stable_hash::stable_hash(key);
            Some(hash)
        }
    }
}

impl FuzzyHash for AllocatorKey {
    fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
        AllocatorKeyPartitionIterator {
            device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
                ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
        }
    }

    fn is_range_key(&self) -> bool {
        true
    }
}

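/*
The fuzzy hash above hashes one bucket-aligned range per 1 MiB bucket that an extent touches. The
standalone sketch below shows only the partitioning step and leaves the hashing out.
`covering_buckets` is a hypothetical helper, and `BUCKET_SIZE` mirrors
`EXTENT_HASH_BUCKET_SIZE` for illustration.

```rust
use std::ops::Range;

const BUCKET_SIZE: u64 = 1024 * 1024;

/// Returns the bucket-aligned ranges that together cover `range`.
fn covering_buckets(range: Range<u64>) -> Vec<Range<u64>> {
    let mut buckets = Vec::new();
    // Round the start down to a bucket boundary.
    let mut start = range.start / BUCKET_SIZE * BUCKET_SIZE;
    while start < range.end {
        // Saturate so that a range near u64::MAX cannot overflow.
        let end = start.saturating_add(BUCKET_SIZE);
        buckets.push(start..end);
        start = end;
    }
    buckets
}
```

An extent spanning 1.5 MiB to 2.5 MiB yields the buckets for 1 MiB to 2 MiB and 2 MiB to 3 MiB,
so any two extents that overlap share at least one bucket.
*/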
impl AllocatorKey {
    /// Returns a new key that is a lower bound suitable for use with merge_into.
    pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
        AllocatorKey { device_range: Extent(0..self.device_range.start) }
    }
}

impl LayerKey for AllocatorKey {
    fn merge_type(&self) -> MergeType {
        MergeType::OptimizedMerge
    }

    fn next_key(&self) -> Option<Self> {
        Some(Self { device_range: Extent(0..self.device_range.end + 1) })
    }

    fn search_key(&self) -> Option<Self> {
        Some(Self { device_range: Extent(0..self.device_range.start + 1) })
    }

    fn is_search_key(&self) -> bool {
        self.device_range.start == 0
    }

    fn overlaps(&self, other: &Self) -> bool {
        self.device_range.overlap(&other.device_range).is_some()
    }
}

impl OrdUpperBound for AllocatorKey {
    fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        self.device_range
            .end
            .cmp(&other.device_range.end)
            .then(self.device_range.start.cmp(&other.device_range.start))
    }
}

impl OrdLowerBound for AllocatorKey {
    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
        // The ordering over range.end is significant here as it is used in
        // the heap ordering that feeds into our merge function and
        // a total ordering over range lets us remove a symmetry case from
        // the allocator merge function.
        self.device_range.cmp(&other.device_range)
    }
}

/// Allocations are "owned" by a single ObjectStore and are reference counted
/// (for future snapshot/clone support).
pub type AllocatorValue = AllocatorValueV32;
impl Value for AllocatorValue {
    const DELETED_MARKER: Self = Self::None;
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
pub enum AllocatorValueV32 {
    // Tombstone variant indicating an extent is no longer allocated.
    None,
    // Used when we know there are no possible allocations below us in the stack.
    // This is currently all the time. We used to have a related Delta type but
    // it has been removed due to correctness issues (https://fxbug.dev/42179428).
    Abs { count: u64, owner_object_id: u64 },
}

pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;

/// Serialized information about the allocator.
pub type AllocatorInfo = AllocatorInfoV32;

#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
pub struct AllocatorInfoV32 {
    /// Holds the set of layer file object_id for the LSM tree (newest first).
    pub layers: Vec<u64>,
    /// Maps from owner_object_id to bytes allocated.
    pub allocated_bytes: BTreeMap<u64, u64>,
    /// Set of owner_object_id that we should ignore if found in layer files.  For now, this should
    /// always be empty on-disk because we always do full compactions.
    pub marked_for_deletion: HashSet<u64>,
    /// Maps from `owner_object_id` to the limit on the number of allocated bytes for that owner.
    /// If there is no limit present here for an `owner_object_id` assume it is max u64.
    pub limit_bytes: BTreeMap<u64, u64>,
}

const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;

/// Computes the target maximum extent size based on the block size of the allocator.
pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
    // Each block in an extent contains an 8-byte checksum (which due to varint encoding is 9
    // bytes), and a given extent record must be no larger than DEFAULT_MAX_SERIALIZED_RECORD_SIZE.
    // We also need to leave a bit of room (arbitrarily, 64 bytes) for the rest of the extent's
    // metadata.
    block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
}

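/*
A worked instance of the formula above, as a standalone sketch. `ASSUMED_MAX_RECORD_SIZE` is a
placeholder chosen purely for illustration; the real `DEFAULT_MAX_SERIALIZED_RECORD_SIZE` is
defined in `serialized_types` and may differ. `sketch_max_extent_size` is a hypothetical name.

```rust
/// Illustrative stand-in only; not necessarily the real record size limit.
const ASSUMED_MAX_RECORD_SIZE: u64 = 4096;

/// Same arithmetic as the function above, with the record size passed in.
fn sketch_max_extent_size(block_size: u64, max_record_size: u64) -> u64 {
    // Bytes left in the record for checksums after reserving room for metadata.
    const METADATA_HEADROOM: u64 = 64;
    // Serialized size of one per-block checksum.
    const CHECKSUM_BYTES: u64 = 9;
    block_size * (max_record_size - METADATA_HEADROOM) / CHECKSUM_BYTES
}
```

Under that assumption a 4096-byte block size leaves room for (4096 - 64) / 9 = 448 checksums, so
`sketch_max_extent_size(4096, ASSUMED_MAX_RECORD_SIZE)` evaluates to 448 * 4096 = 1,835,008
bytes.
*/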
#[derive(Default)]
struct AllocatorCounters {
    num_flushes: u64,
    last_flush_time: Option<std::time::SystemTime>,
}

pub struct Allocator {
    filesystem: Weak<FxFilesystem>,
    block_size: u64,
    device_size: u64,
    object_id: u64,
    max_extent_size_bytes: u64,
    tree: LSMTree<AllocatorKey, AllocatorValue>,
    // A list of allocations which are temporary; i.e. they are not actually stored in the LSM tree,
    // but we still don't want to be able to allocate over them.  This layer is merged into the
    // allocator's LSM tree whilst reading it, so the allocations appear to exist in the LSM tree.
    // This is used in a few places, for example to hold back allocations that have been added to a
    // transaction but are not yet committed, or to prevent the allocation of a deleted extent
    // until the device is flushed.
    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
    inner: Mutex<Inner>,
    allocation_mutex: futures::lock::Mutex<()>,
    counters: Mutex<AllocatorCounters>,
    maximum_offset: AtomicU64,
    allocations_allowed: AtomicBool,
}

/// Tracks the different stages of byte allocations for an individual owner.
#[derive(Debug, Default, PartialEq)]
struct ByteTracking {
    /// This value is the up-to-date count of the number of allocated bytes per owner_object_id
    /// whereas the value in `Info::allocated_bytes` is the value as it was when we last flushed.
    /// This field should be regarded as *untrusted*; it can be invalid due to filesystem
    /// inconsistencies, and this is why it is `Saturating<u64>` rather than just `u64`.  Any
    /// computations that use this value should typically propagate `Saturating` so that it is clear
    /// the value is *untrusted*.
    allocated_bytes: Saturating<u64>,

    /// This value is the number of bytes allocated to uncommitted allocations.
    /// (Bytes allocated, but not yet persisted to disk)
    uncommitted_allocated_bytes: u64,

    /// This value is the number of bytes allocated to reservations.
    reserved_bytes: u64,

    /// This value is the number of bytes in committed deallocations that we cannot use until they
    /// are flushed to the device.
    committed_deallocated_bytes: u64,
}

impl ByteTracking {
    // Returns the total number of bytes that are taken either from reservations, allocations or
    // uncommitted allocations.
    fn used_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
    }

    // Returns the amount that is not available to be allocated, which includes actually allocated
    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
    // reuse those bytes yet.
    fn unavailable_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes
            + Saturating(self.uncommitted_allocated_bytes)
            + Saturating(self.committed_deallocated_bytes)
    }

    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
    // require a device flush to be reused.
    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
    }
}

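/*
A short standalone sketch of why untrusted counters are carried as `Saturating<u64>`: sums clamp
at `u64::MAX` instead of wrapping or panicking, and the point where a trusted answer is needed
can detect the inconsistency with a checked operation. `total_allocated` and `free_bytes` are
hypothetical helpers, not part of this module.

```rust
use std::num::Saturating;

/// Sums per-owner byte counts that may be corrupt, clamping at u64::MAX.
fn total_allocated(per_owner: &[u64]) -> Saturating<u64> {
    let mut total = Saturating(0u64);
    for bytes in per_owner {
        total += Saturating(*bytes);
    }
    total
}

/// Bytes still free on the device, or None if the accounting is inconsistent.
fn free_bytes(device_size: u64, allocated: Saturating<u64>) -> Option<u64> {
    device_size.checked_sub(allocated.0)
}
```

Keeping the `Saturating` wrapper in the function signatures makes it visible to callers that the
value may have been clamped and should be validated before use.
*/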
#[derive(Debug)]
struct CommittedDeallocation {
    // The offset at which this deallocation was committed.
    log_file_offset: u64,
    // The device range being deallocated.
    range: Range<u64>,
    // The owning object id which originally allocated it.
    owner_object_id: u64,
}

struct Inner {
    info: AllocatorInfo,

    /// The allocator can only be opened if there have been no allocations and it has not already
    /// been opened or initialized.
    opened: bool,

    /// When we allocate a range from RAM, we add it to Allocator::temporary_allocations.
    /// This layer is added to layer_set in rebuild_strategy and take_for_trimming so we don't
    /// assume the range is available.
    /// If we apply the mutation, we move it from `temporary_allocations` to the LSMTree.
    /// If we drop the mutation, we just delete it from `temporary_allocations` and call `free`.
    ///
    /// We need to be very careful about races when we move the allocation into the LSMTree.
    /// A layer_set is a snapshot in time of a set of layers. Say:
    ///  1. `rebuild_strategy` takes a layer_set that includes the mutable layer.
    ///  2. A compaction operation seals that mutable layer and creates a new mutable layer.
    ///     Note that the layer_set in rebuild_strategy doesn't have this new mutable layer.
    ///  3. An apply_mutation operation adds the allocation to the new mutable layer.
    ///     It then removes the allocation from `temporary_allocations`.
    ///  4. `rebuild_strategy` iterates the layer_set, missing both the temporary mutation AND
    ///     the record in the new mutable layer, making the allocated range available for double
    ///     allocation and future filesystem corruption.
    ///
    /// We don't have (or want) a means to lock compactions during rebuild_strategy operations so
    /// to avoid this scenario, we can't remove from temporary_allocations when we apply a mutation.
    /// Instead we add them to dropped_temporary_allocations and make the actual removals from
    /// `temporary_allocations` while holding the allocator lock. Because rebuild_strategy only
    /// runs with this lock held, this guarantees that we don't remove entries from
    /// `temporary_allocations` while also iterating over it.
    ///
    /// A related issue happens when we deallocate a range. We use temporary_allocations this time
    /// to prevent reuse until after the deallocation has been successfully flushed.
    /// In this scenario we don't want to call 'free()' until the range is ready to use again.
    dropped_temporary_allocations: Vec<Range<u64>>,

    /// The per-owner counters for bytes at various stages of the data life-cycle. From initial
    /// reservation through until the bytes are unallocated and eventually uncommitted.
    owner_bytes: BTreeMap<u64, ByteTracking>,

    /// This value is the number of bytes allocated to reservations but not tracked as part of a
    /// particular volume.
    unattributed_reserved_bytes: u64,

    /// Committed deallocations that we cannot use until they are flushed to the device.
    committed_deallocated: VecDeque<CommittedDeallocation>,

    /// Bytes which are currently being trimmed.  These can still be allocated from, but the
    /// allocation will block until the current batch of trimming is finished.
    trim_reserved_bytes: u64,

    /// While a trim is being performed, this listener is set.  When the current batch of extents
    /// being trimmed have been released (and trim_reserved_bytes is 0), this is signaled.
    /// This should only be listened to while the allocation_mutex is held.
    trim_listener: Option<EventListener>,

    /// This controls how we allocate our free space to manage fragmentation.
    strategy: strategy::BestFit,

    /// Tracks the number of allocations of size 1,2,...63,>=64.
    allocation_size_histogram: [u64; 64],
    /// Tracks which size bucket triggers rebuild_strategy.
    rebuild_strategy_trigger_histogram: [u64; 64],

    /// This is distinct from the set contained in `info`.  New entries are inserted *after* a
    /// device has been flushed (it is not safe to reuse the space taken by a deleted volume prior
    /// to this) and are removed after a major compaction.
    marked_for_deletion: HashSet<u64>,

    /// The set of volumes deleted that are waiting for a sync.
    volumes_deleted_pending_sync: HashSet<u64>,
}

617impl Inner {
618    fn allocated_bytes(&self) -> Saturating<u64> {
619        let mut total = Saturating(0);
620        for (_, bytes) in &self.owner_bytes {
621            total += bytes.allocated_bytes;
622        }
623        total
624    }
625
626    fn uncommitted_allocated_bytes(&self) -> u64 {
627        self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
628    }
629
630    fn reserved_bytes(&self) -> u64 {
631        self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
632            + self.unattributed_reserved_bytes
633    }
634
635    fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
636        match self.info.limit_bytes.get(&owner_object_id) {
637            Some(v) => *v,
638            None => u64::MAX,
639        }
640    }
641
642    fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
643        let limit = self.owner_id_limit_bytes(owner_object_id);
644        let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
645        (Saturating(limit) - used).0
646    }
647
648    // Returns the amount that is not available to be allocated, which includes actually allocated
649    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
650    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
651    // reuse those bytes yet.
652    fn unavailable_bytes(&self) -> Saturating<u64> {
653        let mut total = Saturating(0);
654        for (_, bytes) in &self.owner_bytes {
655            total += bytes.unavailable_bytes();
656        }
657        total
658    }
659
660    // Returns the total number of bytes that are taken either from reservations, allocations or
661    // uncommitted allocations.
662    fn used_bytes(&self) -> Saturating<u64> {
663        let mut total = Saturating(0);
664        for (_, bytes) in &self.owner_bytes {
665            total += bytes.used_bytes();
666        }
667        total + Saturating(self.unattributed_reserved_bytes)
668    }
669
670    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
671    // require a device flush to be reused.
672    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
673        let mut total = Saturating(0);
674        for (_, bytes) in &self.owner_bytes {
675            total += bytes.unavailable_after_sync_bytes();
676        }
677        total
678    }
679
680    // Returns the number of bytes which will be available after the current batch of trimming
681    // completes.
682    fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
683        device_size
684            .checked_sub(
685                (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
686            )
687            .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
688    }
689
690    fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
691        match owner_object_id {
692            Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
693            None => self.unattributed_reserved_bytes += amount,
694        };
695    }
696
697    fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
698        match owner_object_id {
699            Some(owner) => {
700                let owner_entry = self.owner_bytes.entry(owner).or_default();
701                assert!(
702                    owner_entry.reserved_bytes >= amount,
703                    "{} >= {}",
704                    owner_entry.reserved_bytes,
705                    amount
706                );
707                owner_entry.reserved_bytes -= amount;
708            }
709            None => {
710                assert!(
711                    self.unattributed_reserved_bytes >= amount,
712                    "{} >= {}",
713                    self.unattributed_reserved_bytes,
714                    amount
715                );
716                self.unattributed_reserved_bytes -= amount
717            }
718        };
719    }
720}
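
// Illustrative sketch only (not part of Fxfs): the reservation bookkeeping above keeps one
// counter per owner plus an unattributed pool, and asserts that a release never exceeds what
// was reserved.  `ReservationBook` and its methods are hypothetical names, and plain `u64`
// arithmetic stands in for the real per-owner byte tracking.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified stand-in for the allocator's reservation bookkeeping.
#[derive(Default)]
struct ReservationBook {
    // Bytes reserved per owner, keyed by owner object id.
    owner_reserved: BTreeMap<u64, u64>,
    // Bytes reserved without an owner.
    unattributed_reserved: u64,
}

impl ReservationBook {
    /// Adds `amount` bytes to the owner's counter, or to the unattributed pool.
    fn add(&mut self, owner: Option<u64>, amount: u64) {
        match owner {
            Some(id) => *self.owner_reserved.entry(id).or_default() += amount,
            None => self.unattributed_reserved += amount,
        }
    }

    /// Releases `amount` bytes; panics if more is released than was reserved.
    fn remove(&mut self, owner: Option<u64>, amount: u64) {
        let slot = match owner {
            Some(id) => self.owner_reserved.entry(id).or_default(),
            None => &mut self.unattributed_reserved,
        };
        assert!(*slot >= amount, "{} >= {}", *slot, amount);
        *slot -= amount;
    }

    /// Moves a reservation from an owner to the unattributed pool.
    fn disown(&mut self, owner: u64, amount: u64) {
        self.remove(Some(owner), amount);
        self.add(None, amount);
    }

    /// Total bytes reserved across all owners and the unattributed pool.
    fn total(&self) -> u64 {
        self.owner_reserved.values().sum::<u64>() + self.unattributed_reserved
    }
}
```

// Moving bytes with `disown` leaves `total()` unchanged, which is the property the paired
// remove/add calls rely on when they are made under a single lock.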
721
722/// A container for a set of extents which are known to be free and can be trimmed.  Returned by
723/// `take_for_trimming`.
724pub struct TrimmableExtents<'a> {
725    allocator: &'a Allocator,
726    extents: Vec<Range<u64>>,
727    // The allocator can subscribe to this event to wait until these extents are dropped.  This way,
728    // we don't fail an allocation attempt if blocks are tied up for trimming; rather, we just wait
729    // until the batch is finished with and then proceed.
730    _drop_event: DropEvent,
731}
732
733impl<'a> TrimmableExtents<'a> {
734    pub fn extents(&self) -> &Vec<Range<u64>> {
735        &self.extents
736    }
737
738    // Also returns an EventListener which is signaled when this is dropped.
739    fn new(allocator: &'a Allocator) -> (Self, EventListener) {
740        let drop_event = DropEvent::new();
741        let listener = drop_event.listen();
742        (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
743    }
744
745    fn add_extent(&mut self, extent: Range<u64>) {
746        self.extents.push(extent);
747    }
748}
749
750impl<'a> Drop for TrimmableExtents<'a> {
751    fn drop(&mut self) {
752        let mut inner = self.allocator.inner.lock();
753        for device_range in std::mem::take(&mut self.extents) {
754            inner.strategy.free(device_range.clone()).expect("drop trim extent");
755            self.allocator
756                .temporary_allocations
757                .erase(&AllocatorKey { device_range: Extent(device_range.clone()) });
758        }
759        inner.trim_reserved_bytes = 0;
760    }
761}
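
// Illustrative sketch only (not part of Fxfs): the Drop implementation above hands a batch of
// extents back to the allocator and clears the trim reservation.  The same RAII shape is shown
// here with hypothetical `FreeList` and `Batch` types, using `RefCell` where the real code uses
// a mutex and leaving out the drop-event notification.

```rust
use std::cell::RefCell;
use std::ops::Range;

/// Hypothetical free list standing in for the allocator's strategy.
#[derive(Default)]
struct FreeList {
    free: RefCell<Vec<Range<u64>>>,
    trim_reserved_bytes: RefCell<u64>,
}

/// Extents borrowed from a `FreeList`; they are handed back when the batch is dropped.
struct Batch<'a> {
    owner: &'a FreeList,
    extents: Vec<Range<u64>>,
}

impl FreeList {
    /// Removes up to `max` extents from the free list and records their total size as reserved.
    fn take(&self, max: usize) -> Batch<'_> {
        let mut free = self.free.borrow_mut();
        let n = max.min(free.len());
        let extents: Vec<Range<u64>> = free.drain(..n).collect();
        let bytes = extents.iter().map(|r| r.end - r.start).sum::<u64>();
        *self.trim_reserved_bytes.borrow_mut() = bytes;
        Batch { owner: self, extents }
    }
}

impl Drop for Batch<'_> {
    fn drop(&mut self) {
        // Return every extent to the free list and clear the reservation.
        self.owner.free.borrow_mut().extend(std::mem::take(&mut self.extents));
        *self.owner.trim_reserved_bytes.borrow_mut() = 0;
    }
}
```

// Because the hand-back happens in `drop`, a caller cannot forget to release the extents, even
// on an early return.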
762
763impl Allocator {
764    pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
765        let block_size = filesystem.block_size();
766        // We expect device size to be a multiple of block size. Throw away any tail.
767        let device_size = round_down(filesystem.device().size(), block_size);
768        if device_size != filesystem.device().size() {
769            warn!("Device size is not block aligned. Rounding down.");
770        }
771        let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
772        let mut strategy = strategy::BestFit::default();
773        strategy.free(0..device_size).expect("new fs");
774        Allocator {
775            filesystem: Arc::downgrade(&filesystem),
776            block_size,
777            device_size,
778            object_id,
779            max_extent_size_bytes,
780            tree: LSMTree::new(merge, Box::new(NullCache {})),
781            temporary_allocations: SkipListLayer::new(1024),
782            inner: Mutex::new(Inner {
783                info: AllocatorInfo::default(),
784                opened: false,
785                dropped_temporary_allocations: Vec::new(),
786                owner_bytes: BTreeMap::new(),
787                unattributed_reserved_bytes: 0,
788                committed_deallocated: VecDeque::new(),
789                trim_reserved_bytes: 0,
790                trim_listener: None,
791                strategy,
792                allocation_size_histogram: [0; 64],
793                rebuild_strategy_trigger_histogram: [0; 64],
794                marked_for_deletion: HashSet::new(),
795                volumes_deleted_pending_sync: HashSet::new(),
796            }),
797            allocation_mutex: futures::lock::Mutex::new(()),
798            counters: Mutex::new(AllocatorCounters::default()),
799            maximum_offset: AtomicU64::new(0),
800            allocations_allowed: AtomicBool::new(filesystem.options().image_builder_mode.is_none()),
801        }
802    }
803
804    pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
805        &self.tree
806    }
807
808    /// Enables allocations.
809    /// This is only valid to call if the allocator was created in image builder mode.
810    pub fn enable_allocations(&self) {
811        self.allocations_allowed.store(true, Ordering::SeqCst);
812    }
813
814    /// Returns an iterator that yields all allocations, filtering out tombstones and allocations
815    /// whose owner has been marked for deletion.  If `committed_marked_for_deletion` is true,
816    /// filter using the committed set of volumes marked for deletion rather than the in-memory
817    /// copy, which excludes volumes that have been deleted but not yet synced.
818    pub async fn filter(
819        &self,
820        iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
821        committed_marked_for_deletion: bool,
822    ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
823        let marked_for_deletion = {
824            let inner = self.inner.lock();
825            if committed_marked_for_deletion {
826                &inner.info.marked_for_deletion
827            } else {
828                &inner.marked_for_deletion
829            }
830            .clone()
831        };
832        let iter =
833            filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
834        Ok(iter)
835    }
836
837    /// A histogram of allocation request sizes.
838    /// The index into the array is 'number of blocks'.
839    /// The last bucket is a catch-all for larger allocation requests.
840    pub fn allocation_size_histogram(&self) -> [u64; 64] {
841        self.inner.lock().allocation_size_histogram
842    }
843
844    /// Creates a new (empty) allocator.
845    pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
846        // Mark the allocator as opened before creating the file because creating a new
847        // transaction requires a reservation.
848        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
849
850        let filesystem = self.filesystem.upgrade().unwrap();
851        let root_store = filesystem.root_store();
852        ObjectStore::create_object_with_id(
853            &root_store,
854            transaction,
855            ReservedId::new(&root_store, NonZero::new(self.object_id()).unwrap()),
856            HandleOptions::default(),
857            None,
858        )?;
859        root_store.update_last_object_id(self.object_id());
860        Ok(())
861    }
862
863    // Opens the allocator.  This is not thread-safe; this should be called prior to
864    // replaying the allocator's mutations.
865    pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
866        let filesystem = self.filesystem.upgrade().unwrap();
867        let root_store = filesystem.root_store();
868
869        self.inner.lock().strategy = strategy::BestFit::default();
870
871        let handle =
872            ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
873                .await
874                .context("Failed to open allocator object")?;
875
876        if handle.get_size() > 0 {
877            let serialized_info = handle
878                .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
879                .await
880                .context("Failed to read AllocatorInfo")?;
881            let mut cursor = std::io::Cursor::new(serialized_info);
882            let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
883                .context("Failed to deserialize AllocatorInfo")?;
884
885            let mut handles = Vec::new();
886            let mut total_size = 0;
887            for object_id in &info.layers {
888                let handle = ObjectStore::open_object(
889                    &root_store,
890                    *object_id,
891                    HandleOptions::default(),
892                    None,
893                )
894                .await
895                .context("Failed to open allocator layer file")?;
896
897                let size = handle.get_size();
898                total_size += size;
899                handles.push(handle);
900            }
901
902            {
903                let mut inner = self.inner.lock();
904
905                // Check allocated_bytes fits within the device.
906                let mut device_bytes = self.device_size;
907                for (&owner_object_id, &bytes) in &info.allocated_bytes {
908                    ensure!(
909                        bytes <= device_bytes,
910                        anyhow!(FxfsError::Inconsistent).context(format!(
911                            "Allocated bytes exceeds device size: {:?}",
912                            info.allocated_bytes
913                        ))
914                    );
915                    device_bytes -= bytes;
916
917                    inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
918                        Saturating(bytes);
919                }
920
921                inner.info = info;
922            }
923
924            self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
925            self.filesystem.upgrade().unwrap().object_manager().update_reservation(
926                self.object_id,
927                tree::reservation_amount_from_layer_size(total_size),
928            );
929        }
930
931        Ok(())
932    }
933
934    pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
935        // We can assume the device has been flushed.
936        {
937            let mut inner = self.inner.lock();
938            inner.volumes_deleted_pending_sync.clear();
939            inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
940        }
941
942        // Build free extent structure from disk. For now, we expect disks to have *some* free
943        // space at all times. This may change if we ever support mounting of read-only
944        // redistributable filesystem images.
945        if !self.rebuild_strategy().await.context("Build free extents")? {
946            if self.filesystem.upgrade().unwrap().options().read_only {
947                info!("Device contains no free space (read-only mode).");
948            } else {
949                info!("Device contains no free space.");
950                return Err(FxfsError::Inconsistent)
951                    .context("Device appears to contain no free space");
952            }
953        }
954
955        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
956        Ok(())
957    }
958
959    /// Walk all allocations to generate the set of free regions between allocations.
960    /// It is safe to re-run this on live filesystems but it should not be called concurrently
961    /// with allocations, trims or other rebuild_strategy invocations -- use allocation_mutex.
962    /// Returns true if the rebuild made changes to either the free ranges or overflow markers.
963    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
964        let mut changed = false;
965        let mut layer_set = self.tree.empty_layer_set();
966        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
967        self.tree.add_all_layers_to_layer_set(&mut layer_set);
968
969        let overflow_markers = self.inner.lock().strategy.overflow_markers();
970        self.inner.lock().strategy.reset_overflow_markers();
971
972        let mut to_add = Vec::new();
973        let mut merger = layer_set.merger();
974        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
975        let mut last_offset = 0;
976        while last_offset < self.device_size {
977            let next_range = match iter.get() {
978                None => {
979                    assert!(last_offset <= self.device_size);
980                    let range = last_offset..self.device_size;
981                    last_offset = self.device_size;
982                    iter.advance().await?;
983                    range
984                }
985                Some(ItemRef { key, .. }) => {
986                    let device_range = &key.device_range;
987                    if device_range.end < last_offset {
988                        iter.advance().await?;
989                        continue;
990                    }
991                    if device_range.start <= last_offset {
992                        last_offset = device_range.end;
993                        iter.advance().await?;
994                        continue;
995                    }
996                    let range = last_offset..device_range.start;
997                    last_offset = device_range.end;
998                    iter.advance().await?;
999                    range
1000                }
1001            };
1002            to_add.push(next_range);
1003            // Avoid taking a lock on inner for every free range.
1004            if to_add.len() > 100 {
1005                let mut inner = self.inner.lock();
1006                for range in to_add.drain(..) {
1007                    changed |= inner.strategy.force_free(range)?;
1008                }
1009            }
1010        }
1011        let mut inner = self.inner.lock();
1012        for range in to_add {
1013            changed |= inner.strategy.force_free(range)?;
1014        }
1015        if overflow_markers != inner.strategy.overflow_markers() {
1016            changed = true;
1017        }
1018        Ok(changed)
1019    }
1020
1021    /// Collects up to `extents_per_batch` free extents of size up to `max_extent_size` from
1022    /// `offset`.  The extents will be reserved.
1023    /// Note that only one `TrimmableExtents` can exist for the allocator at any time.
1024    pub async fn take_for_trimming(
1025        &self,
1026        offset: u64,
1027        max_extent_size: usize,
1028        extents_per_batch: usize,
1029    ) -> Result<TrimmableExtents<'_>, Error> {
1030        let _guard = self.allocation_mutex.lock().await;
1031
1032        let (mut result, listener) = TrimmableExtents::new(self);
1033        if offset >= self.device_size {
1034            return Ok(result);
1035        }
1036        let mut bytes = 0;
1037
1038        // We can't just use self.strategy here because it doesn't necessarily hold all
1039        // free extent metadata in RAM.
1040        let mut layer_set = self.tree.empty_layer_set();
1041        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
1042        self.tree.add_all_layers_to_layer_set(&mut layer_set);
1043        let mut merger = layer_set.merger();
1044        let mut iter = self
1045            .filter(
1046                merger
1047                    .query(Query::FullRange(&AllocatorKey {
1048                        device_range: Extent::search_key_from_offset(offset),
1049                    }))
1050                    .await?,
1051                false,
1052            )
1053            .await?;
1054        let mut last_offset = offset;
1055        'outer: while last_offset < self.device_size {
1056            let mut range = match iter.get() {
1057                None => {
1058                    assert!(last_offset <= self.device_size);
1059                    let range = last_offset..self.device_size;
1060                    last_offset = self.device_size;
1061                    iter.advance().await?;
1062                    range
1063                }
1064                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
1065                    if device_range.end <= last_offset {
1066                        iter.advance().await?;
1067                        continue;
1068                    }
1069                    if device_range.start <= last_offset {
1070                        last_offset = device_range.end;
1071                        iter.advance().await?;
1072                        continue;
1073                    }
1074                    let range = last_offset..device_range.start;
1075                    last_offset = device_range.end;
1076                    iter.advance().await?;
1077                    range
1078                }
1079            };
1080            if range.start < offset {
1081                continue;
1082            }
1083
1084            // 'range' is based on the on-disk LSM tree. We need to check against any uncommitted
1085            // allocations and temporarily allocate the range for ourselves.
1086            let mut inner = self.inner.lock();
1087
1088            while range.start < range.end {
1089                let prefix =
1090                    range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
1091                range = prefix.end..range.end;
1092                bytes += prefix.length()?;
1093                // We can assume the following are safe because we've checked both LSM tree and
1094                // temporary_allocations while holding the lock.
1095                inner.strategy.remove(prefix.clone());
1096                self.temporary_allocations.insert(AllocatorItem {
1097                    key: AllocatorKey { device_range: Extent(prefix.clone()) },
1098                    value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
1099                })?;
1100                result.add_extent(prefix);
1101                if result.extents.len() == extents_per_batch {
1102                    break 'outer;
1103                }
1104            }
1105            if result.extents.len() == extents_per_batch {
1106                break 'outer;
1107            }
1108        }
1109        {
1110            let mut inner = self.inner.lock();
1111
1112            assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
1113            inner.trim_listener = Some(listener);
1114            inner.trim_reserved_bytes = bytes;
1115            debug_assert!(
1116                (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
1117                    <= self.device_size
1118            );
1119        }
1120        Ok(result)
1121    }
1122
1123    /// Returns all objects that exist in the parent store that pertain to this allocator.
1124    pub fn parent_objects(&self) -> Vec<u64> {
1125        // The allocator tree needs to store a file for each of the layers in the tree, so we return
1126        // those, since nothing else references them.
1127        self.inner.lock().info.layers.clone()
1128    }
1129
1130    /// Returns all the current owner byte limits (in pairs of `(owner_object_id, bytes)`).
1131    pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
1132        self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
1133    }
1134
1135    /// Returns (allocated_bytes, byte_limit) for the given owner.
1136    pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
1137        let inner = self.inner.lock();
1138        (
1139            inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
1140            inner.info.limit_bytes.get(&owner_object_id).copied(),
1141        )
1142    }
1143
1144    /// Returns owner bytes debug information.
1145    pub fn owner_bytes_debug(&self) -> String {
1146        format!("{:?}", self.inner.lock().owner_bytes)
1147    }
1148
1149    fn needs_sync(&self) -> bool {
1150        // TODO(https://fxbug.dev/42178048): This will only trigger if *all* free space is taken up with
1151        // committed deallocated bytes, but we might want to trigger a sync if we're low and there
1152        // happens to be a lot of deallocated bytes as that might mean we can fully satisfy
1153        // allocation requests.
1154        let inner = self.inner.lock();
1155        inner.unavailable_bytes().0 >= self.device_size
1156    }
1157
1158    fn is_system_store(&self, owner_object_id: u64) -> bool {
1159        let fs = self.filesystem.upgrade().unwrap();
1160        owner_object_id == fs.object_manager().root_store_object_id()
1161            || owner_object_id == fs.object_manager().root_parent_store_object_id()
1162    }
1163
1164    /// Updates the accounting to track that a byte reservation has been moved out of an owner to
1165    /// the unattributed pool.
1166    pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
1167        if old_owner_object_id.is_none() || amount == 0 {
1168            return;
1169        }
1170        // These 2 mutations should behave as though they're a single atomic mutation.
1171        let mut inner = self.inner.lock();
1172        inner.remove_reservation(old_owner_object_id, amount);
1173        inner.add_reservation(None, amount);
1174    }
1175
1176    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
1177    /// allocator when queried.
1178    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1179        let this = Arc::downgrade(self);
1180        parent.record_lazy_child(name, move || {
1181            let this_clone = this.clone();
1182            async move {
1183                let inspector = fuchsia_inspect::Inspector::default();
1184                if let Some(this) = this_clone.upgrade() {
1185                    let counters = this.counters.lock();
1186                    let root = inspector.root();
1187                    root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
1188                    root.record_uint("bytes_total", this.device_size);
1189                    let (allocated, reserved, used, unavailable) = {
1190                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1191                        let inner = this.inner.lock();
1192                        (
1193                            inner.allocated_bytes().0,
1194                            inner.reserved_bytes(),
1195                            inner.used_bytes().0,
1196                            inner.unavailable_bytes().0,
1197                        )
1198                    };
1199                    root.record_uint("bytes_allocated", allocated);
1200                    root.record_uint("bytes_reserved", reserved);
1201                    root.record_uint("bytes_used", used);
1202                    root.record_uint("bytes_unavailable", unavailable);
1203
1204                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing
1205                    // metrics.
1206                    if let Some(x) = round_div(100 * allocated, this.device_size) {
1207                        root.record_uint("bytes_allocated_percent", x);
1208                    }
1209                    if let Some(x) = round_div(100 * reserved, this.device_size) {
1210                        root.record_uint("bytes_reserved_percent", x);
1211                    }
1212                    if let Some(x) = round_div(100 * used, this.device_size) {
1213                        root.record_uint("bytes_used_percent", x);
1214                    }
1215                    if let Some(x) = round_div(100 * unavailable, this.device_size) {
1216                        root.record_uint("bytes_unavailable_percent", x);
1217                    }
1218
1219                    root.record_uint("num_flushes", counters.num_flushes);
1220                    if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
1221                        root.record_uint(
1222                            "last_flush_time_ms",
1223                            last_flush_time
1224                                .duration_since(std::time::UNIX_EPOCH)
1225                                .unwrap_or(std::time::Duration::ZERO)
1226                                .as_millis()
1227                                .try_into()
1228                                .unwrap_or(0u64),
1229                        );
1230                    }
1231
1232                    let data = this.allocation_size_histogram();
1233                    let alloc_sizes = root.create_uint_linear_histogram(
1234                        "allocation_size_histogram",
1235                        fuchsia_inspect::LinearHistogramParams {
1236                            floor: 1,
1237                            step_size: 1,
1238                            buckets: 64,
1239                        },
1240                    );
1241                    for (i, count) in data.iter().enumerate() {
1242                        if i != 0 {
1243                            alloc_sizes.insert_multiple(i as u64, *count as usize);
1244                        }
1245                    }
1246                    root.record(alloc_sizes);
1247
1248                    let data = this.inner.lock().rebuild_strategy_trigger_histogram;
1249                    let triggers = root.create_uint_linear_histogram(
1250                        "rebuild_strategy_triggers",
1251                        fuchsia_inspect::LinearHistogramParams {
1252                            floor: 1,
1253                            step_size: 1,
1254                            buckets: 64,
1255                        },
1256                    );
1257                    for (i, count) in data.iter().enumerate() {
1258                        if i != 0 {
1259                            triggers.insert_multiple(i as u64, *count as usize);
1260                        }
1261                    }
1262                    root.record(triggers);
1263
1264                    let allocator_ref = this.clone();
1265                    root.record_child("lsm_tree", move |node| {
1266                        allocator_ref.tree.record_inspect_data(node);
1267                    });
1268                }
1269                Ok(inspector)
1270            }
1271            .boxed()
1272        });
1273    }
1274
1275    /// Returns the offset of the first byte which has not been used by the allocator since its
1276    /// creation.
1277    /// NB: This does *not* take into account existing allocations.  This is only reliable when the
1278    /// allocator was created from scratch, without any pre-existing allocations.
1279    pub fn maximum_offset(&self) -> u64 {
1280        self.maximum_offset.load(Ordering::Relaxed)
1281    }
1282}
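
// Illustrative sketch only (not part of Fxfs): `rebuild_strategy` and `take_for_trimming` both
// walk allocations in offset order and emit the gaps between them as free ranges.  The
// hypothetical `free_ranges` helper below shows that gap walk over an in-memory slice.  It
// assumes the input is sorted by start offset and, as a simplification, tracks the furthest
// end seen so far instead of advancing an LSM tree iterator.

```rust
use std::ops::Range;

/// Returns the gaps between `allocated` ranges within `0..device_size`.
/// Assumes `allocated` is sorted by start offset; ranges may touch or overlap.
fn free_ranges(allocated: &[Range<u64>], device_size: u64) -> Vec<Range<u64>> {
    let mut free = Vec::new();
    let mut last_offset = 0;
    for range in allocated {
        if last_offset >= device_size {
            break;
        }
        if range.start > last_offset {
            // Everything between the previous allocation and this one is free.
            free.push(last_offset..range.start.min(device_size));
        }
        last_offset = last_offset.max(range.end);
    }
    if last_offset < device_size {
        // The tail of the device after the last allocation is free.
        free.push(last_offset..device_size);
    }
    free
}
```

// For example, allocations at 4096..8192 and 8192..12288 on a 32768-byte device leave the free
// ranges 0..4096 and 12288..32768.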
1283
1284impl Drop for Allocator {
1285    fn drop(&mut self) {
1286        let inner = self.inner.lock();
1287        // Uncommitted and reserved should be released back using RAII, so they should be zero.
1288        assert_eq!(inner.uncommitted_allocated_bytes(), 0);
1289        assert_eq!(inner.reserved_bytes(), 0);
1290    }
1291}
1292
1293#[fxfs_trace::trace]
1294impl Allocator {
1295    /// Returns the object ID for the allocator.
1296    pub fn object_id(&self) -> u64 {
1297        self.object_id
1298    }
1299
1300    /// Returns information about the allocator such as the layer files storing persisted
1301    /// allocations.
1302    pub fn info(&self) -> AllocatorInfo {
1303        self.inner.lock().info.clone()
1304    }
1305
1306    /// Tries to allocate up to `len` bytes for the owner `owner_object_id` and returns the device
1307    /// range allocated.
1308    /// The allocated range may be short (e.g. due to fragmentation), in which case the caller can
1309    /// simply call allocate again until they have enough blocks.
1310    ///
1311    /// We also store the object store ID of the store that the allocation should be assigned to so
1312    /// that we have a means to delete encrypted stores without needing the encryption key.
1313    #[trace]
1314    pub async fn allocate(
1315        self: &Arc<Self>,
1316        transaction: &mut Transaction<'_>,
1317        owner_object_id: u64,
1318        mut len: u64,
1319    ) -> Result<Range<u64>, Error> {
1320        ensure!(self.allocations_allowed.load(Ordering::SeqCst), FxfsError::Unavailable);
1321        assert_eq!(len % self.block_size, 0);
1322        len = std::cmp::min(len, self.max_extent_size_bytes);
1323        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1324
1325        // Make sure we have space reserved before we try and find the space.
1326        let reservation = if let Some(reservation) = transaction.allocator_reservation {
1327            match reservation.owner_object_id {
1328                // If there is no owner, this must be a system store that we're allocating for.
1329                None => assert!(self.is_system_store(owner_object_id)),
1330                // If it has an owner, it should not be different than the allocating owner.
1331                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
1332            };
1333            // Reservation limits aren't necessarily a multiple of the block size.
1334            let r = reservation
1335                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
1336            len = r.amount();
1337            Left(r)
1338        } else {
1339            let mut inner = self.inner.lock();
1340            assert!(inner.opened);
1341            // Do not exceed the limit for the owner or the device.
1342            let device_used = inner.used_bytes();
1343            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1344            // We must take care not to use up space that might be reserved.
1345            let limit =
1346                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
1347            len = round_down(std::cmp::min(len, limit), self.block_size);
1348            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1349            owner_entry.reserved_bytes += len;
1350            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
1351        };
1352
1353        ensure!(len > 0, FxfsError::NoSpace);
1354
1355        // If volumes have been deleted, flush the device so that we can use any of the freed space.
1356        let volumes_deleted = {
1357            let inner = self.inner.lock();
1358            (!inner.volumes_deleted_pending_sync.is_empty())
1359                .then(|| inner.volumes_deleted_pending_sync.clone())
1360        };
1361
1362        if let Some(volumes_deleted) = volumes_deleted {
1363            // No locks are held here, so in theory, there could be unnecessary syncs, but it
1364            // should be sufficiently rare that it won't matter.
1365            self.filesystem
1366                .upgrade()
1367                .unwrap()
1368                .sync(SyncOptions {
1369                    flush_device: true,
1370                    precondition: Some(Box::new(|| {
1371                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
1372                    })),
1373                    ..Default::default()
1374                })
1375                .await?;
1376
1377            {
1378                let mut inner = self.inner.lock();
1379                for owner_id in volumes_deleted {
1380                    inner.volumes_deleted_pending_sync.remove(&owner_id);
1381                    inner.marked_for_deletion.insert(owner_id);
1382                }
1383            }
1384
1385            let _guard = self.allocation_mutex.lock().await;
1386            self.rebuild_strategy().await?;
1387        }
1388
        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
        let _guard = 'sync: loop {
            // Cap number of sync attempts before giving up on finding free space.
            for _ in 0..10 {
                {
                    let guard = self.allocation_mutex.lock().await;

                    if !self.needs_sync() {
                        break 'sync guard;
                    }
                }

                // All the free space is currently tied up with deallocations, so we need to sync
                // and flush the device to free that up.
                //
                // We can't hold the allocation lock whilst we sync here because the allocation lock
                // is also taken in apply_mutations, which is called when journal locks are held,
                // and we call sync here which takes those same locks, so it would have the
                // potential to result in a deadlock.  Sync holds its own lock to guard against
                // multiple syncs occurring at the same time, and we can supply a precondition that
                // is evaluated under that lock to ensure we don't sync twice if we don't need to.
                self.filesystem
                    .upgrade()
                    .unwrap()
                    .sync(SyncOptions {
                        flush_device: true,
                        precondition: Some(Box::new(|| self.needs_sync())),
                        ..Default::default()
                    })
                    .await?;
            }
            bail!(
                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
            );
        };

        let mut trim_listener = None;
        {
            let mut inner = self.inner.lock();
            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;

            // If trimming would be the reason that this allocation gets cut short, wait for
            // trimming to complete before proceeding.
            let avail = self
                .device_size
                .checked_sub(inner.unavailable_bytes().0)
                .ok_or(FxfsError::Inconsistent)?;
            let free_and_not_being_trimmed =
                inner.bytes_available_not_being_trimmed(self.device_size)?;
            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
                debug_assert!(inner.trim_reserved_bytes > 0);
                trim_listener = std::mem::take(&mut inner.trim_listener);
            }
        }

        if let Some(listener) = trim_listener {
            listener.await;
        }

        let result = loop {
            {
                let mut inner = self.inner.lock();

                // While we know rebuild_strategy and take_for_trim are not running (we hold guard),
                // apply temporary_allocation removals.
                for device_range in inner.dropped_temporary_allocations.drain(..) {
                    self.temporary_allocations
                        .erase(&AllocatorKey { device_range: Extent(device_range) });
                }

                match inner.strategy.allocate(len) {
                    Err(FxfsError::NotFound) => {
                        // Overflow. Fall through and rebuild
                        inner.rebuild_strategy_trigger_histogram
                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
                    }
                    Err(err) => {
                        error!(err:%; "Likely filesystem corruption.");
                        return Err(err.into());
                    }
                    Ok(x) => {
                        break x;
                    }
                }
            }
            // We've run out of extents of the requested length in RAM but there
            // exists more of this size on device. Rescan device and circle back.
            // We already hold the allocation_mutex, so exclusive access is guaranteed.
            if !self.rebuild_strategy().await? {
                error!("Cannot find additional free space. Corruption?");
                return Err(FxfsError::Inconsistent.into());
            }
        };

        debug!(device_range:? = result; "allocate");

        let len = result.length().unwrap();
        let reservation_owner = reservation.either(
            // Left means we got an outside reservation.
            |l| {
                l.forget_some(len);
                l.owner_object_id()
            },
            |r| {
                r.forget_some(len);
                r.owner_object_id()
            },
        );

        {
            let mut inner = self.inner.lock();
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            owner_entry.uncommitted_allocated_bytes += len;
            // If the reservation has an owner, ensure they are the same.
            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
            inner.remove_reservation(reservation_owner, len);
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: Extent(result.clone()) },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
            })?;
        }

        let mutation =
            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());

        Ok(result)
    }

    /// Marks the given device range as allocated.  The main use case for this at this time is for
    /// the super-block which needs to be at a fixed location on the device.
    #[trace]
    pub fn mark_allocated(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        device_range: Range<u64>,
    ) -> Result<(), Error> {
        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
        {
            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;

            let mut inner = self.inner.lock();
            let device_used = inner.used_bytes();
            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
            ensure!(
                device_range.end <= self.device_size
                    && (Saturating(self.device_size) - device_used).0 >= len
                    && owner_id_bytes_left >= len,
                FxfsError::NoSpace
            );
            if let Some(reservation) = &mut transaction.allocator_reservation {
                // The transaction takes ownership of this hold.
                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
            }
            owner_entry.uncommitted_allocated_bytes += len;
            inner.strategy.remove(device_range.clone());
            self.temporary_allocations.insert(AllocatorItem {
                key: AllocatorKey { device_range: Extent(device_range.clone()) },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
            })?;
        }
        let mutation =
            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));
        Ok(())
    }

    /// Sets the limits for an owner object in terms of usage.
    pub fn set_bytes_limit(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        bytes: u64,
    ) -> Result<(), Error> {
        // System stores cannot be given limits.
        assert!(!self.is_system_store(owner_object_id));
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
        );
        Ok(())
    }

    /// Gets the bytes limit for an owner object.
    pub fn get_owner_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
    }

    /// Gets the number of bytes currently used by allocations, reservations or uncommitted
    /// allocations.
    pub fn get_owner_bytes_used(&self, owner_object_id: u64) -> u64 {
        self.inner.lock().owner_bytes.get(&owner_object_id).map_or(0, |info| info.used_bytes().0)
    }

    /// Deallocates the given device range for the specified object.
    #[trace]
    pub async fn deallocate(
        &self,
        transaction: &mut Transaction<'_>,
        owner_object_id: u64,
        dealloc_range: Range<u64>,
    ) -> Result<u64, Error> {
        debug!(device_range:? = dealloc_range; "deallocate");
        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
        // We don't currently support sharing of allocations (value.count always equals 1), so as
        // long as we can assume the deallocated range is actually allocated, we can avoid device
        // access.
        let deallocated = dealloc_range.end - dealloc_range.start;
        let mutation = AllocatorMutation::Deallocate {
            device_range: dealloc_range.clone().into(),
            owner_object_id,
        };
        transaction.add(self.object_id(), Mutation::Allocator(mutation));

        let _guard = self.allocation_mutex.lock().await;

        // We use `dropped_temporary_allocations` to defer removals from `temporary_allocations` in
        // places where we can't execute async code or take locks.
        //
        // It's important we don't ever remove entries from `temporary_allocations` without
        // holding the `allocation_mutex` lock or else we may end up with an inconsistent view of
        // available disk space when we combine temporary_allocations with the LSMTree.
        // This is normally done in `allocate()` but we also need to apply these here because
        // `temporary_allocations` is also used to track deallocated space until it has been
        // flushed (see comment below). A user may allocate and then deallocate space before calling
        // allocate() a second time, so if we do not clean up here, we may end up with the same
        // range in temporary_allocations twice (once for allocate, once for deallocate).
        let mut inner = self.inner.lock();
        for device_range in inner.dropped_temporary_allocations.drain(..) {
            self.temporary_allocations.erase(&AllocatorKey { device_range: Extent(device_range) });
        }

        // We can't reuse deallocated space immediately because failure to successfully flush will
        // mean that on next mount, we may find this space is still assigned to the deallocated
        // region. To avoid immediate reuse, we hold these regions in 'temporary_allocations' until
        // after a successful flush so we know the region is safe to reuse.
        self.temporary_allocations
            .insert(AllocatorItem {
                key: AllocatorKey { device_range: Extent(dealloc_range.clone()) },
                value: AllocatorValue::Abs { owner_object_id, count: 1 },
            })
            .context("tracking deallocated")?;

        Ok(deallocated)
    }

    /// Marks allocations associated with a given |owner_object_id| for deletion.
    ///
    /// This is used as part of deleting encrypted volumes (ObjectStore) without having the keys.
    ///
    /// MarkForDeletion mutations eventually manipulate allocator metadata (AllocatorInfo) instead
    /// of the mutable layer but we must be careful not to do this too early and risk premature
    /// reuse of extents.
    ///
    /// Replay is not guaranteed until the *device* gets flushed, so we cannot reuse the deleted
    /// extents until we've flushed the device.
    ///
    /// TODO(b/316827348): Consider removing the use of mark_for_deletion in AllocatorInfo and
    /// just compacting?
    ///
    /// After an allocator.flush() (i.e. a major compaction), we know that there is no data left
    /// in the layer files for this owner_object_id and we are able to clear `marked_for_deletion`.
    pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
        // Note that because the actual time of deletion (the next major compaction) is undefined,
        // |owner_object_id| should not be reused after this call.
        transaction.add(
            self.object_id(),
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
        );
    }

    /// Called when the device has been flushed and indicates what the journal log offset was when
    /// that happened.
    pub fn did_flush_device(&self, flush_log_offset: u64) {
        // First take out the deallocations that we now know to be flushed.  The list is maintained
        // in order, so we can stop on the first entry that we find that should not be unreserved
        // yet.
        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
        let deallocs = 'deallocs_outer: loop {
            let mut inner = self.inner.lock();
            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
                if dealloc.log_file_offset >= flush_log_offset {
                    let mut deallocs = inner.committed_deallocated.split_off(index);
                    // Swap because we want the opposite of what split_off does.
                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
                    break 'deallocs_outer deallocs;
                }
            }
            break std::mem::take(&mut inner.committed_deallocated);
        };

        // Now we can free those elements.
        let mut inner = self.inner.lock();
        let mut totals = BTreeMap::<u64, u64>::new();
        for dealloc in deallocs {
            *(totals.entry(dealloc.owner_object_id).or_default()) +=
                dealloc.range.length().unwrap();
            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
            self.temporary_allocations
                .erase(&AllocatorKey { device_range: Extent(dealloc.range.clone()) });
        }

        // This *must* come after we've removed the records from reserved reservations because the
        // allocator uses this value to decide whether or not a device-flush is required and it must
        // be possible to find free space if it thinks no device-flush is required.
        for (owner_object_id, total) in totals {
            match inner.owner_bytes.get_mut(&owner_object_id) {
                Some(counters) => counters.committed_deallocated_bytes -= total,
                None => {
                    // This should only be possible if the volume has been deleted and a sync is
                    // pending.
                    assert!(inner.volumes_deleted_pending_sync.contains(&owner_object_id));
                }
            }
        }
    }

    /// Returns a reservation that can be used later, or None if there is insufficient space. The
    /// |owner_object_id| indicates which object in the root object store the reservation is for.
    pub fn reserve(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: u64,
    ) -> Option<Reservation> {
        {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let limit = match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            };
            if limit < amount {
                return None;
            }
            inner.add_reservation(owner_object_id, amount);
        }
        Some(Reservation::new(self, owner_object_id, amount))
    }

    /// Like reserve, but takes a callback that is passed the |limit| and should return the amount,
    /// which can be zero.
    pub fn reserve_with(
        self: Arc<Self>,
        owner_object_id: Option<u64>,
        amount: impl FnOnce(u64) -> u64,
    ) -> Reservation {
        let amount = {
            let mut inner = self.inner.lock();

            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;

            let amount = amount(match owner_object_id {
                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
                None => device_free,
            });

            inner.add_reservation(owner_object_id, amount);

            amount
        };

        Reservation::new(self, owner_object_id, amount)
    }

    /// Returns the total number of allocated bytes.
    pub fn get_allocated_bytes(&self) -> u64 {
        self.inner.lock().allocated_bytes().0
    }

    /// Returns the size of the device in bytes, which bounds the space available to allocate.
    pub fn get_disk_bytes(&self) -> u64 {
        self.device_size
    }

    /// Returns the total number of allocated bytes per owner_object_id.
    /// Note that this is quite an expensive operation as it copies the collection.
    /// This is intended for use in fsck() and friends, not general use code.
    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
    }

    /// Returns the number of allocated and reserved bytes.
    pub fn get_used_bytes(&self) -> Saturating<u64> {
        let inner = self.inner.lock();
        inner.used_bytes()
    }
}
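
/*
Illustrative sketch, not part of the allocator: `did_flush_device` above removes the leading
entries of an ordered list with `split_off` followed by a swap. The standalone function below
shows the same idiom on a plain `VecDeque<u64>` of log offsets. The name `take_flushed` and the
choice of `VecDeque` are assumptions made for this example only.

```rust
use std::collections::VecDeque;

// Removes and returns the leading entries whose offset is below `flushed_offset`.
// `pending` is assumed to be sorted by offset, oldest first.
pub fn take_flushed(pending: &mut VecDeque<u64>, flushed_offset: u64) -> VecDeque<u64> {
    match pending.iter().position(|&offset| offset >= flushed_offset) {
        Some(index) => {
            // `split_off` returns the tail `[index..)` and leaves the head in `pending`.
            let mut tail = pending.split_off(index);
            // We want the opposite: keep the tail pending and hand back the head.
            std::mem::swap(pending, &mut tail);
            tail
        }
        // Every entry is older than the flush point, so take the whole list.
        None => std::mem::take(pending),
    }
}
```

With offsets 10, 20, 30, 40 pending and a flush offset of 30, the call hands back 10 and 20 and
leaves 30 and 40 pending.
*/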

impl ReservationOwner for Allocator {
    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
        self.inner.lock().remove_reservation(owner_object_id, amount);
    }
}
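
/*
Illustrative sketch, not part of the allocator: `reserve` and `reserve_with` above cap a
reservation at the smaller of the device's free space and the owner's remaining byte limit.
The helpers below restate that arithmetic on plain integers. `reservable_limit` and
`try_reserve` are hypothetical names, and `owner_bytes_left` stands in for whatever
`owner_id_bytes_left` would return.

```rust
use std::num::Saturating;

// Largest amount that may be reserved. Saturating subtraction keeps the result at zero
// if `used` ever exceeds `device_size`.
pub fn reservable_limit(device_size: u64, used: u64, owner_bytes_left: Option<u64>) -> u64 {
    let device_free = (Saturating(device_size) - Saturating(used)).0;
    match owner_bytes_left {
        Some(left) => std::cmp::min(left, device_free),
        None => device_free,
    }
}

// Returns the reserved amount, or None if the request exceeds the limit.
pub fn try_reserve(
    device_size: u64,
    used: u64,
    owner_bytes_left: Option<u64>,
    amount: u64,
) -> Option<u64> {
    (amount <= reservable_limit(device_size, used, owner_bytes_left)).then_some(amount)
}
```

A request with no owner is bounded only by device free space. A request with an owner is
bounded by whichever of the two limits is tighter.
*/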

#[async_trait]
impl JournalingObject for Allocator {
    fn apply_mutation(
        &self,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        _assoc_obj: AssocObj<'_>,
    ) -> Result<(), Error> {
        match mutation {
            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
                let mut inner = self.inner.lock();
                inner.owner_bytes.remove(&owner_object_id);

                // We use `info.marked_for_deletion` to track the committed state and
                // `inner.marked_for_deletion` to track volumes marked for deletion *after* we have
                // flushed the device.  It is not safe to use extents belonging to deleted volumes
                // until after we have flushed the device.
                inner.info.marked_for_deletion.insert(owner_object_id);
                inner.volumes_deleted_pending_sync.insert(owner_object_id);

                inner.info.limit_bytes.remove(&owner_object_id);
            }
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: Extent(device_range.0.clone()) },
                    value: AllocatorValue::Abs { count: 1, owner_object_id },
                };
                let len = item.key.device_range.length().unwrap();
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
                let mut inner = self.inner.lock();
                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                entry.allocated_bytes += len;
                if let ApplyMode::Live(transaction) = context.mode {
                    entry.uncommitted_allocated_bytes -= len;
                    // Note that we cannot drop entries from temporary_allocations without holding
                    // the allocation_mutex as it may introduce races. We instead add the range to
                    // a Vec that can be applied later when we hold the lock (See comment on
                    // `dropped_temporary_allocations` above).
                    inner.dropped_temporary_allocations.push(device_range.0);
                    if let Some(reservation) = transaction.allocator_reservation {
                        reservation.commit(len);
                    }
                }
            }
            Mutation::Allocator(AllocatorMutation::Deallocate {
                device_range,
                owner_object_id,
            }) => {
                let item = AllocatorItem {
                    key: AllocatorKey { device_range: Extent(device_range.0) },
                    value: AllocatorValue::None,
                };
                let len = item.key.device_range.length().unwrap();

                {
                    let mut inner = self.inner.lock();
                    {
                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
                        entry.allocated_bytes -= len;
                        if context.mode.is_live() {
                            entry.committed_deallocated_bytes += len;
                        }
                    }
                    if context.mode.is_live() {
                        inner.committed_deallocated.push_back(CommittedDeallocation {
                            log_file_offset: context.checkpoint.file_offset,
                            range: (*item.key.device_range).clone(),
                            owner_object_id,
                        });
                    }
                    if let ApplyMode::Live(Transaction {
                        allocator_reservation: Some(reservation),
                        ..
                    }) = context.mode
                    {
                        inner.add_reservation(reservation.owner_object_id(), len);
                        reservation.add(len);
                    }
                }
                let lower_bound = item.key.lower_bound_for_merge_into();
                self.tree.merge_into(item, &lower_bound);
            }
            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
                // Journal replay is ordered and each of these calls is idempotent. So the last one
                // will be respected, it doesn't matter if the value is already set, or gets changed
                // multiple times during replay. When it gets opened it will be merged in with the
                // snapshot.
                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
            }
            Mutation::BeginFlush => {
                self.tree.seal();
                // Transfer our running count for allocated_bytes so that it gets written to the new
                // info file when flush completes.
                let mut inner = self.inner.lock();
                let allocated_bytes =
                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
                inner.info.allocated_bytes = allocated_bytes;
            }
            Mutation::EndFlush => {}
            _ => bail!("unexpected mutation: {:?}", mutation),
        }
        Ok(())
    }

    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
        match mutation {
            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
                let len = device_range.length().unwrap();
                let mut inner = self.inner.lock();
                inner
                    .owner_bytes
                    .entry(owner_object_id)
                    .or_default()
                    .uncommitted_allocated_bytes -= len;
                if let Some(reservation) = transaction.allocator_reservation {
                    let res_owner = reservation.owner_object_id();
                    inner.add_reservation(res_owner, len);
                    reservation.release_reservation(res_owner, len);
                }
                inner.strategy.free(device_range.0.clone()).expect("drop mutation");
                self.temporary_allocations
                    .erase(&AllocatorKey { device_range: Extent(device_range.0) });
            }
            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
                self.temporary_allocations
                    .erase(&AllocatorKey { device_range: Extent(device_range.0) });
            }
            _ => {}
        }
    }

    async fn flush(&self) -> Result<Version, Error> {
        let filesystem = self.filesystem.upgrade().unwrap();
        let object_manager = filesystem.object_manager();
        let earliest_version = self.tree.get_earliest_version();
        if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
            // Early exit, but still return the earliest version used by a struct in the tree
            return Ok(earliest_version);
        }

        let fs = self.filesystem.upgrade().unwrap();
        let mut flusher = Flusher::new(self, &fs).await;
        let (new_layer_file, info) = flusher.start().await?;
        flusher.finish(new_layer_file, info).await
    }
}
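
/*
Illustrative sketch, not part of the allocator: the `Allocate` arm of `apply_mutation` queues
ranges in `dropped_temporary_allocations` rather than erasing them from `temporary_allocations`
directly, and a later caller that holds `allocation_mutex` drains the queue. The toy type
below shows that deferral pattern with `std::sync::Mutex`. `Tracker` and all of its methods are
hypothetical, and entries are keyed by range start purely to keep the example short.

```rust
use std::collections::BTreeSet;
use std::sync::Mutex;

#[derive(Default)]
pub struct Tracker {
    // Stand-in for `temporary_allocations`, keyed by range start.
    temporary: Mutex<BTreeSet<u64>>,
    // Removals queued by code paths that must not modify `temporary` themselves.
    dropped: Mutex<Vec<u64>>,
}

impl Tracker {
    pub fn track(&self, start: u64) {
        self.temporary.lock().unwrap().insert(start);
    }

    // Called where the coordinating lock is not held: only record the removal.
    pub fn defer_removal(&self, start: u64) {
        self.dropped.lock().unwrap().push(start);
    }

    // Called by whoever holds the coordinating lock: apply every queued removal and
    // return how many entries were actually removed.
    pub fn apply_deferred(&self) -> usize {
        let drained: Vec<u64> = self.dropped.lock().unwrap().drain(..).collect();
        let mut temporary = self.temporary.lock().unwrap();
        drained.iter().filter(|start| temporary.remove(*start)).count()
    }

    pub fn is_tracked(&self, start: u64) -> bool {
        self.temporary.lock().unwrap().contains(&start)
    }
}
```

An entry stays visible after `defer_removal` and disappears only once `apply_deferred` runs, so
readers never see a removal that raced with the lock holder.
*/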

// The merger is unable to merge extents that exist like the following:
//
//     |----- +1 -----|
//                    |----- -1 -----|
//                    |----- +2 -----|
//
// It cannot coalesce them because it has to emit the +1 record so that it can move on and merge the
// -1 and +2 records. To address this, we add another stage that applies after merging which
// coalesces records after they have been emitted.  This is a bit simpler than merging because the
// records cannot overlap, so it's just a question of merging adjacent records if they happen to
// have the same delta and object_id.

pub struct CoalescingIterator<I> {
    iter: I,
    item: Option<AllocatorItem>,
}

impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
    pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
        let mut iter = Self { iter, item: None };
        iter.advance().await?;
        Ok(iter)
    }
}

#[async_trait]
impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
    for CoalescingIterator<I>
{
    async fn advance(&mut self) -> Result<(), Error> {
        self.item = self.iter.get().map(|x| x.cloned());
        if self.item.is_none() {
            return Ok(());
        }
        let left = self.item.as_mut().unwrap();
        loop {
            self.iter.advance().await?;
            match self.iter.get() {
                None => return Ok(()),
                Some(right) => {
                    // The two records cannot overlap.
                    ensure!(
                        left.key.device_range.end <= right.key.device_range.start,
                        FxfsError::Inconsistent
                    );
                    // We can only coalesce records if they are touching and have the same value.
                    if left.key.device_range.end < right.key.device_range.start
                        || left.value != *right.value
                    {
                        return Ok(());
                    }
                    left.key.device_range.end = right.key.device_range.end;
                }
            }
        }
    }

    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
        self.item.as_ref().map(|x| x.as_item_ref())
    }
}
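
/*
Illustrative sketch, not part of the allocator: the coalescing stage above merges records that
touch and carry the same value, and treats overlapping records as an inconsistency. The
synchronous function below applies the same rule to a slice. `Record` and `coalesce` are
hypothetical simplifications, with a plain `u32` standing in for `AllocatorValue`.

```rust
use std::ops::Range;

#[derive(Clone, Debug, PartialEq)]
pub struct Record {
    pub range: Range<u64>,
    pub value: u32,
}

// Coalesces records that are sorted by start offset. Touching neighbours with equal
// values are merged into one record. Overlapping or out-of-order input is an error.
pub fn coalesce(records: &[Record]) -> Result<Vec<Record>, String> {
    let mut out: Vec<Record> = Vec::new();
    for right in records {
        if let Some(left) = out.last_mut() {
            // The two records cannot overlap.
            if left.range.end > right.range.start {
                return Err(format!("overlap: {:?} and {:?}", left.range, right.range));
            }
            // Extend the previous record only if the two touch and share a value.
            if left.range.end == right.range.start && left.value == right.value {
                left.range.end = right.range.end;
                continue;
            }
        }
        out.push(right.clone());
    }
    Ok(out)
}
```

Records 0..10 and 10..20 with the same value become 0..20, while a following 20..30 record with
a different value, or a 40..50 record separated by a gap, is emitted unchanged.
*/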

struct Flusher<'a> {
    allocator: &'a Allocator,
    fs: &'a Arc<FxFilesystem>,
    _guard: WriteGuard<'a>,
}

impl<'a> Flusher<'a> {
    async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
        let keys = lock_keys![LockKey::flush(allocator.object_id())];
        Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
    }

    fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
        Options {
            skip_journal_checks: true,
            borrow_metadata_space: true,
            allocator_reservation: Some(allocator_reservation),
            ..Default::default()
        }
    }

    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
        let object_manager = self.fs.object_manager();
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
            .await?;

        let root_store = self.fs.root_store();
        let layer_object_handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions { skip_journal_checks: true, ..Default::default() },
            None,
        )
        .await?;
        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
        // It's important that this transaction does not include any allocations because we use
        // BeginFlush as a snapshot point for mutations to the tree: other allocator mutations
        // within this transaction might get applied before seal (which would be OK), but they could
        // equally get applied afterwards (since Transaction makes no guarantees about the order in
        // which mutations are applied whilst committing), in which case they'd get lost on replay
        // because the journal will only send mutations that follow this transaction.
        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
        let info = transaction
            .commit_with_callback(|_| {
                // We must capture `info` as it is when we start flushing. Subsequent transactions
                // can end up modifying `info` and we shouldn't capture those here.
                self.allocator.inner.lock().info.clone()
            })
            .await?;
        Ok((layer_object_handle, info))
    }

    async fn finish(
        self,
        layer_object_handle: DataObjectHandle<ObjectStore>,
        mut info: AllocatorInfo,
    ) -> Result<Version, Error> {
        let object_manager = self.fs.object_manager();
        let txn_options = Self::txn_options(object_manager.metadata_reservation());

        let layer_set = self.allocator.tree.immutable_layer_set();
        let total_len = layer_set.sum_len();
        {
            let start_time = std::time::Instant::now();
            let merged_layer_count = layer_set.layers.len();
            let mut merger = layer_set.merger();
            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
            let iter = CoalescingIterator::new(iter).await?;
            let bytes_written = compact_with_iterator(
                iter,
                total_len,
                DirectWriter::new(&layer_object_handle, txn_options).await,
                layer_object_handle.block_size(),
                Some(self.fs.journal().get_compaction_yielder()),
            )
            .await?;

            self.allocator.tree.report_compaction_metrics(
                bytes_written,
                start_time.elapsed(),
                merged_layer_count,
            );
        }

        let root_store = self.fs.root_store();

        // Both of these forward-declared variables need to outlive the transaction.
        let object_handle;
        let reservation_update;
        let mut transaction = self
            .fs
            .clone()
            .new_transaction(
                lock_keys![LockKey::object(
                    root_store.store_object_id(),
                    self.allocator.object_id()
                )],
                txn_options,
            )
            .await?;
        let mut serialized_info = Vec::new();

        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
        object_handle = ObjectStore::open_object(
            &root_store,
            self.allocator.object_id(),
            HandleOptions::default(),
            None,
        )
        .await?;

        // Move all the existing layers to the graveyard.
        for object_id in &info.layers {
            root_store.add_to_graveyard(&mut transaction, *object_id);
        }

        // Write out updated info.

        // After successfully flushing, the stores that were marked for deletion at the time of the
        // BeginFlush transaction no longer need to be marked for deletion. There can be stores
        // that have been deleted since the BeginFlush transaction, but they will be covered by a
        // MarkForDeletion mutation.
        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);

        info.layers = vec![layer_object_handle.object_id()];

        info.serialize_with_version(&mut serialized_info)?;

        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;

        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
            layer_object_handle.get_size(),
        ));

        // It's important that EndFlush is in the same transaction that we write AllocatorInfo,
        // because we use EndFlush to make the required adjustments to allocated_bytes.
        transaction.add_with_object(
            self.allocator.object_id(),
            Mutation::EndFlush,
            AssocObj::Borrowed(&reservation_update),
        );
        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());

        let layers = layers_from_handles([layer_object_handle]).await?;
        transaction
            .commit_with_callback(|_| {
                self.allocator.tree.set_layers(layers);

                // At this point we've committed the new layers to disk so we can start using them.
                // This means we can also switch to the new AllocatorInfo which clears
                // marked_for_deletion.
                let mut inner = self.allocator.inner.lock();
                inner.info.layers = info.layers;
                for owner_id in marked_for_deletion {
                    inner.marked_for_deletion.remove(&owner_id);
                    inner.info.marked_for_deletion.remove(&owner_id);
                }
            })
            .await?;

        // Now close the old layers and purge them.
        for layer in layer_set.layers {
            let object_id = layer.handle().map(|h| h.object_id());
            layer.close_layer().await;
            if let Some(object_id) = object_id {
                root_store.tombstone_object(object_id, txn_options).await?;
            }
        }

        let mut counters = self.allocator.counters.lock();
        counters.num_flushes += 1;
        counters.last_flush_time = Some(std::time::SystemTime::now());
        // Return the earliest version used by a struct in the tree.
        Ok(self.allocator.tree.get_earliest_version())
    }
}

#[cfg(test)]
mod tests {
    use crate::filesystem::{
        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
    };
    use crate::fsck::fsck;
    use crate::lsm_tree::cache::NullCache;
    use crate::lsm_tree::skip_list_layer::SkipListLayer;
    use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
    use crate::lsm_tree::{LSMTree, Query};
    use crate::object_handle::ObjectHandle;
    use crate::object_store::allocator::merge::merge;
    use crate::object_store::allocator::{
        Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
    };
    use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
    use crate::object_store::volume::root_volume;
    use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
    use crate::range::RangeExt;
    use crate::round::round_up;
    use crate::serialized_types::{LATEST_VERSION, Versioned};
    use crate::testing;
    use bincode::Options as _;
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use std::cmp::{max, min};
    use std::ops::{Bound, Range};
    use std::sync::Arc;
    use storage_device::DeviceHolder;
    use storage_device::fake_device::FakeDevice;

    #[test]
    fn test_allocator_key_is_range_based() {
        // Make sure we disallow using allocator keys with point queries.
        assert!(AllocatorKey { device_range: (0..100).into() }.is_range_key());
    }

    #[test]
    fn test_allocator_key_serialization_compatibility() {
        let range = 1024..2048;
        let key = AllocatorKey { device_range: range.clone().into() };

        // 1. Serialize the range directly using bincode (which is what the manual
        //    Versioned implementation did)
        let options = bincode::DefaultOptions::new().allow_trailing_bytes();
        let expected_bytes = options.serialize(&range).unwrap();

        // 2. Serialize AllocatorKey via our derived Versioned implementation
        let mut key_bytes = Vec::new();
        key.serialize_into(&mut key_bytes).unwrap();

        // Verify they are byte-compatible
        assert_eq!(key_bytes, expected_bytes);

        // 3. Deserialize a Range<u64> as AllocatorKey via our derived Versioned
        //    implementation
        let deserialized_key =
            AllocatorKey::deserialize_from(&mut &expected_bytes[..], LATEST_VERSION).unwrap();
        assert_eq!(deserialized_key, key);
    }

    #[fuchsia::test]
    async fn test_coalescing_iterator() {
        let skip_list = SkipListLayer::new(100);
        let items = [
            Item::new(
                AllocatorKey { device_range: (0..100 * 512).into() },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ),
            Item::new(
                AllocatorKey { device_range: (100 * 512..200 * 512).into() },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ),
        ];
        skip_list.insert(items[1].clone()).expect("insert error");
        skip_list.insert(items[0].clone()).expect("insert error");
        let mut iter =
            CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: (0..200 * 512).into() },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

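    // Illustrative sketch only (not part of Fxfs): a std-only model of the merge rule exercised by
    // `test_coalescing_iterator`. `coalesce_adjacent_sketch` is a hypothetical helper. It assumes
    // its input is sorted by range start and non-overlapping, and it uses an `(owner, count)`
    // tuple as a stand-in for `AllocatorValue::Abs`. The real `CoalescingIterator` works lazily
    // over a layer iterator; this eager version only shows when two neighbours are merged.
    fn coalesce_adjacent_sketch(
        items: &[(std::ops::Range<u64>, (u64, u64))],
    ) -> Vec<(std::ops::Range<u64>, (u64, u64))> {
        let mut out: Vec<(std::ops::Range<u64>, (u64, u64))> = Vec::new();
        for (range, value) in items {
            if let Some((last_range, last_value)) = out.last_mut() {
                // Extend the previous entry when this one starts exactly where it ends and
                // carries the same properties.
                if last_range.end == range.start && *last_value == *value {
                    last_range.end = range.end;
                    continue;
                }
            }
            out.push((range.clone(), *value));
        }
        out
    }

    #[test]
    fn test_coalesce_adjacent_sketch() {
        let merged = coalesce_adjacent_sketch(&[
            (0..100 * 512, (99, 1)),
            (100 * 512..200 * 512, (99, 1)),
            (200 * 512..300 * 512, (98, 1)),
        ]);
        // The first two entries share an owner and touch, so they merge; the third has a
        // different owner and stays separate.
        assert_eq!(merged, vec![(0..200 * 512, (99, 1)), (200 * 512..300 * 512, (98, 1))]);
    }
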
    #[fuchsia::test]
    async fn test_merge_and_coalesce_across_three_layers() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: (100 * 512..200 * 512).into() },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: (0..100 * 512).into() },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: (0..200 * 512).into() },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: (100 * 512..200 * 512).into() },
                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
            ))
            .expect("insert error");
        lsm_tree.seal();
        lsm_tree
            .insert(Item::new(
                AllocatorKey { device_range: (0..100 * 512).into() },
                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            ))
            .expect("insert error");

        let layer_set = lsm_tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter =
            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
                .await
                .expect("new failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: (0..100 * 512).into() },
                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
            )
        );
        iter.advance().await.expect("advance failed");
        let ItemRef { key, value, .. } = iter.get().expect("get failed");
        assert_eq!(
            (key, value),
            (
                &AllocatorKey { device_range: (100 * 512..200 * 512).into() },
                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
            )
        );
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    /// Returns the number of bytes shared by `a` and `b`, or 0 if the ranges are disjoint.
    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
        if a.end > b.start && a.start < b.end {
            min(a.end, b.end) - max(a.start, b.start)
        } else {
            0
        }
    }

    /// Returns every allocated device range in the allocator's tree, in ascending order,
    /// asserting that consecutive ranges do not overlap.
    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut allocations: Vec<Range<u64>> = Vec::new();
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            if let Some(r) = allocations.last() {
                assert!(device_range.start >= r.end);
            }
            allocations.push(device_range.clone().into());
            iter.advance().await.expect("advance failed");
        }
        allocations
    }

    /// Asserts that the allocator's tree holds exactly the bytes in `expected_allocations`: every
    /// range found must be covered by the expectations and the total lengths must match.
    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
        let layer_set = allocator.tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = allocator
            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
            .await
            .expect("build iterator");
        let mut found = 0;
        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
            let mut l = device_range.length().expect("Invalid range");
            found += l;
            // Make sure that the entire range we have found is covered by the allocations we
            // expect to find.
            for range in expected_allocations {
                l -= overlap(range, device_range);
                if l == 0 {
                    break;
                }
            }
            assert_eq!(l, 0, "range {:?} not covered by expectations", device_range);
            iter.advance().await.expect("advance failed");
        }
        // Make sure the total we found adds up to what we expect.
        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
    }

    async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
        let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let allocator = fs.allocator();
        (fs, allocator)
    }

    #[fuchsia::test]
    async fn test_allocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;

        // Expected extents:
        let expected = vec![
            0..4096,        // Superblock A (4k)
            4096..139264,   // root_store layer files, StoreInfo.. (33x4k blocks)
            139264..204800, // Superblock A extension (64k)
            204800..335872, // Initial Journal (128k)
            335872..401408, // Superblock B extension (64k)
            524288..528384, // Superblock B (4k)
        ];
        assert_eq!(device_ranges, expected);
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
        // Indices 0..=5 hold the initial extents, so the two new allocations are at 6 and 7.
        assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
        transaction.commit().await.expect("commit failed");
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        // The third allocation is at index 8; it must not overlap either of the first two.
        assert_eq!(device_ranges[8].length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(&device_ranges[6], &device_ranges[8]), 0);
        assert_eq!(overlap(&device_ranges[7], &device_ranges[8]), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_allocate_more_than_max_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let mut device_ranges = collect_allocations(&allocator).await;
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
                .await
                .expect("allocate failed"),
        );
        assert_eq!(
            device_ranges.last().unwrap().length().expect("Invalid range"),
            allocator.max_extent_size_bytes
        );
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_deallocations() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let initial_allocations = collect_allocations(&allocator).await;

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let device_range1 = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");
        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
        transaction.commit().await.expect("commit failed");

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
            .await
            .expect("deallocate failed");
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &initial_allocations).await;
    }

    #[fuchsia::test]
    async fn test_mark_allocated() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let mut device_ranges = collect_allocations(&allocator).await;
        let range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            // First, allocate 2 blocks.
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
                .await
                .expect("allocate failed")
            // Let the transaction drop.
        };

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

        // If we allocate 1 block, the two blocks that were allocated earlier should be available,
        // and this should return the first of them.
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );

        assert_eq!(device_ranges.last().unwrap().start, range.start);

        // Mark the second block as allocated.
        let mut range2 = range.clone();
        range2.start += fs.block_size();
        allocator
            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
            .expect("mark_allocated failed");
        device_ranges.push(range2);

        // This should avoid the range we marked as allocated.
        device_ranges.push(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
        );
        let last_range = device_ranges.last().unwrap();
        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
        assert_eq!(overlap(last_range, &range), 0);
        transaction.commit().await.expect("commit failed");

        check_allocations(&allocator, &device_ranges).await;
    }

    #[fuchsia::test]
    async fn test_mark_for_deletion() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        // Allocate some stuff.
        let initial_allocated_bytes = allocator.get_allocated_bytes();
        let mut device_ranges = collect_allocations(&allocator).await;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        // Note we have a cap on individual allocation length, so we allocate over multiple
        // mutations (15 iterations x 2 allocations x 100 blocks = 3000 blocks).
        for _ in 0..15 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate2 failed"),
            );
        }
        transaction.commit().await.expect("commit failed");
        check_allocations(&allocator, &device_ranges).await;

        assert_eq!(
            allocator.get_allocated_bytes(),
            initial_allocated_bytes + fs.block_size() * 3000
        );

        // Mark for deletion.
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
        transaction.commit().await.expect("commit failed");

        // Expect that allocated bytes is updated immediately but device ranges are still allocated.
        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
        check_allocations(&allocator, &device_ranges).await;

        // Allocate more space than is available until the mark_for_deletion space is deallocated.
        // This should force a flush on allocate(): 3000 + 1500 = 4500 blocks, which exceeds the
        // test_fs size of 4096 blocks.
        device_ranges.clear();

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_bytes = 1500 * fs.block_size();
        while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
            let len = std::cmp::min(
                target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
                100 * fs.block_size(),
            );
            device_ranges.push(
                allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        // Have the deleted ranges cleaned up.
        allocator.flush().await.expect("flush failed");

        // The flush above seems to trigger an allocation for the allocator itself.
        // We will just check that we have the right size for the owner we care about.

        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
        assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes);
    }

    async fn create_file(store: &Arc<ObjectStore>, size: usize) {
        let root_directory =
            Directory::open(store, store.root_directory_object_id()).await.expect("open failed");

        let mut transaction = store
            .filesystem()
            .new_transaction(
                lock_keys![LockKey::object(
                    store.store_object_id(),
                    store.root_directory_object_id()
                )],
                Options::default(),
            )
            .await
            .expect("new_transaction failed");
        let file = root_directory
            .create_child_file(&mut transaction, &format!("foo {}", size))
            .await
            .expect("create_child_file failed");
        transaction.commit().await.expect("commit failed");

        let buffer = file.allocate_buffer(size).await;

        // Append some data to it.
        let mut transaction = file
            .new_transaction_with_options(Options {
                borrow_metadata_space: true,
                ..Default::default()
            })
            .await
            .expect("new_transaction_with_options failed");
        file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
        transaction.commit().await.expect("commit failed");
    }

    #[fuchsia::test]
    async fn test_replay_with_deleted_store_and_compaction() {
        let (fs, _) = test_fs().await;

        const FILE_SIZE: usize = 10_000_000;

        let mut store_id = {
            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;
            store.store_object_id()
        };

        fs.close().await.expect("close failed");
        let device = fs.take_device().await;
        device.reopen(false);

        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");

        // Compact so that when we replay, the transaction to delete the store won't find any
        // mutations.
        fs.journal().force_compact().await.expect("compact failed");

        for _ in 0..2 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store_id)
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");
                create_file(&store, FILE_SIZE).await;
                store_id = store.store_object_id();
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test(threads = 4)]
    async fn test_compaction_delete_race() {
        let (fs, _allocator) = test_fs().await;

        {
            const FILE_SIZE: usize = 10_000_000;

            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
            let store = root_vol
                .new_volume("vol", NewChildStoreOptions::default())
                .await
                .expect("new_volume failed");

            create_file(&store, FILE_SIZE).await;

            // Race compaction with deleting a store.
            let fs_clone = fs.clone();

            // Even though the executor has 4 threads, it's hard to get it to run with
            // multiple threads.
            let executor_tasks = testing::force_executor_threads_to_run(4).await;

            let task = fasync::Task::spawn(async move {
                fs_clone.journal().force_compact().await.expect("compact failed");
            });

            // We don't need the executor tasks any more.
            drop(executor_tasks);

            // This range was chosen because it made this test fail after a fairly low number of
            // iterations for the bug that this test was introduced for.
            let sleep = rand::random_range(3000..6000);
            std::thread::sleep(std::time::Duration::from_micros(sleep));
            log::info!("sleep {sleep}us");

            let transaction = fs
                .clone()
                .new_transaction(
                    lock_keys![
                        LockKey::object(
                            root_vol.volume_directory().store().store_object_id(),
                            root_vol.volume_directory().object_id(),
                        ),
                        LockKey::flush(store.store_object_id())
                    ],
                    Options { borrow_metadata_space: true, ..Default::default() },
                )
                .await
                .expect("new_transaction failed");
            root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");

            task.await;
        }

        fs.journal().force_compact().await.expect("compact failed");
        fs.close().await.expect("close failed");

        let device = fs.take_device().await;
        device.reopen(false);

        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test]
    async fn test_delete_multiple_volumes() {
        let (mut fs, _) = test_fs().await;

        for _ in 0..50 {
            {
                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
                let store = root_vol
                    .new_volume("vol", NewChildStoreOptions::default())
                    .await
                    .expect("new_volume failed");

                create_file(&store, 1_000_000).await;

                let transaction = fs
                    .clone()
                    .new_transaction(
                        lock_keys![
                            LockKey::object(
                                root_vol.volume_directory().store().store_object_id(),
                                root_vol.volume_directory().object_id(),
                            ),
                            LockKey::flush(store.store_object_id())
                        ],
                        Options { borrow_metadata_space: true, ..Default::default() },
                    )
                    .await
                    .expect("new_transaction failed");
                root_vol
                    .delete_volume("vol", transaction, || {})
                    .await
                    .expect("delete_volume failed");

                fs.allocator().flush().await.expect("flush failed");
            }

            fs.close().await.expect("close failed");
            let device = fs.take_device().await;
            device.reopen(false);

            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        }

        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("close failed");
    }

    #[fuchsia::test]
    async fn test_allocate_free_reallocate() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        // Allocate some stuff.
        let mut device_ranges = Vec::new();
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for _ in 0..30 {
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 3000,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );

        // Delete it all.
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        for range in std::mem::take(&mut device_ranges) {
            allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());

        // Allocate some more stuff. Due to storage pressure, this requires us to flush the device
        // before reusing the above space.
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let target_len = 1500 * fs.block_size();
        while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
            let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, len)
                    .await
                    .expect("allocate failed"),
            );
        }
        transaction.commit().await.expect("commit failed");

        assert_eq!(
            fs.block_size() * 1500,
            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
        );
    }

    #[fuchsia::test]
    async fn test_flush() {
        const STORE_OBJECT_ID: u64 = 99;

        let mut device_ranges = Vec::new();
        let device = {
            let (fs, allocator) = test_fs().await;
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            device_ranges.push(
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("allocate failed"),
            );
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

        let allocated = collect_allocations(&allocator).await;

        // Make sure the ranges we allocated earlier are still allocated.
        for i in &device_ranges {
            let mut overlapping = 0;
            for j in &allocated {
                overlapping += overlap(i, j);
            }
            assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
        }

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        let range = allocator
            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
            .await
            .expect("allocate failed");

        // Make sure the range just allocated doesn't overlap any other allocated ranges.
        for r in &allocated {
            assert_eq!(overlap(r, &range), 0);
        }
        transaction.commit().await.expect("commit failed");
    }

    #[fuchsia::test]
    async fn test_dropped_transaction() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed")
        };
        // After dropping the transaction and attempting to allocate again, we should end up with
        // the same range because the reservation should have been released.
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        assert_eq!(
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed"),
            allocated_range
        );
    }

    #[fuchsia::test]
    async fn test_cleanup_removed_owner() {
        const STORE_OBJECT_ID: u64 = 99;
        let device = {
            let (fs, allocator) = test_fs().await;

            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator
                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                    .await
                    .expect("Allocating");
                transaction.commit().await.expect("Committing.");
            }
            allocator.flush().await.expect("Flushing");
            assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            {
                let mut transaction =
                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
                allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
                transaction.commit().await.expect("Committing.");
            }
            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
            fs.close().await.expect("Closing");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();
        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
    }

    #[fuchsia::test]
    async fn test_allocated_bytes() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        let initial_allocated_bytes = allocator.get_allocated_bytes();

        // Verify allocated_bytes reflects allocation changes.
        let allocated_bytes = initial_allocated_bytes + fs.block_size();
        let allocated_range = {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");
            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
            range
        };

        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
                .await
                .expect("allocate failed");

            // Prior to committing, the count of allocated bytes shouldn't change.
            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
        }

        // After dropping the prior transaction, the allocated bytes still shouldn't have changed.
        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

        // Verify allocated_bytes reflects deallocations.
        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
        allocator
            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
            .await
            .expect("deallocate failed");

        // Before committing, there should be no change.
        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);

        transaction.commit().await.expect("commit failed");

        // After committing, only the 40 bytes outside the deallocated range (20 at each end)
        // should remain allocated on top of the initial amount.
        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
    }

    #[fuchsia::test]
    async fn test_persist_bytes_limit() {
        const LIMIT: u64 = 12345;
        const OWNER_ID: u64 = 12;

        let (fs, allocator) = test_fs().await;
        {
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            allocator
                .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
                .expect("Failed to set limit.");
            assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
            transaction.commit().await.expect("Failed to commit transaction");
            let bytes: u64 = *allocator
                .inner
                .lock()
                .info
                .limit_bytes
                .get(&OWNER_ID)
                .expect("Failed to find limit");
            assert_eq!(LIMIT, bytes);
        }
    }

    /// Given a sorted list of non-overlapping ranges, this will coalesce adjacent ranges.
    /// This allows comparison of equivalent sets of ranges which may occur due to differences
    /// across allocator strategies.
    fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
        let mut coalesced = Vec::new();
        let mut prev: Option<Range<u64>> = None;
        for range in ranges {
            if let Some(prev_range) = &mut prev {
                if range.start == prev_range.end {
                    prev_range.end = range.end;
                } else {
                    coalesced.push(prev_range.clone());
                    prev = Some(range);
                }
            } else {
                prev = Some(range);
            }
        }
        if let Some(prev_range) = prev {
            coalesced.push(prev_range);
        }
        coalesced
    }
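
    // A minimal sketch of the behaviour documented on `coalesce_ranges`, assuming the input is
    // sorted and non-overlapping as that helper requires. This test and its input values are
    // illustrative assumptions and are not part of the original suite.
    #[test]
    fn test_coalesce_ranges_example() {
        // Runs of adjacent ranges collapse into one; ranges separated by a gap stay distinct.
        let input: Vec<Range<u64>> = vec![0..4, 4..8, 8..9, 16..20, 20..24, 32..33];
        assert_eq!(coalesce_ranges(input), vec![0..9, 16..24, 32..33]);

        // An empty input yields an empty output.
        assert!(coalesce_ranges(Vec::new()).is_empty());
    }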

    #[fuchsia::test]
    async fn test_take_for_trimming() {
        const STORE_OBJECT_ID: u64 = 99;

        // Allocate a large chunk, then free a few bits of it, so we have free chunks interleaved
        // with allocated chunks.
        let allocated_range;
        let expected_free_ranges;
        let device = {
            let (fs, allocator) = test_fs().await;
            let bs = fs.block_size();
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            allocated_range = allocator
                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
                .await
                .expect("allocate failed");
            transaction.commit().await.expect("commit failed");

            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new failed");
            let base = allocated_range.start;
            expected_free_ranges = vec![
                base..(base + (bs * 1)),
                (base + (bs * 2))..(base + (bs * 3)),
                // Note that the next three ranges are adjacent and will be treated as one free
                // range once applied.  We separate them here to exercise the handling of "large"
                // free ranges.
                (base + (bs * 4))..(base + (bs * 8)),
                (base + (bs * 8))..(base + (bs * 12)),
                (base + (bs * 12))..(base + (bs * 13)),
                (base + (bs * 29))..(base + (bs * 30)),
            ];
            for range in &expected_free_ranges {
                allocator
                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
                    .await
                    .expect("deallocate failed");
            }
            transaction.commit().await.expect("commit failed");

            allocator.flush().await.expect("flush failed");

            fs.close().await.expect("close failed");
            fs.take_device().await
        };

        device.reopen(false);
        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
        let allocator = fs.allocator();

        // These values were picked so that each of them would be the reason why
        // collect_free_extents finished, and so we would return after partially processing one of
        // the free extents.
        let max_extent_size = fs.block_size() as usize * 4;
        const EXTENTS_PER_BATCH: usize = 2;
        let mut free_ranges = vec![];
        let mut offset = allocated_range.start;
        while offset < allocated_range.end {
            let free = allocator
                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
                .await
                .expect("take_for_trimming failed");
            free_ranges.extend(
                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
            );
            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
        }
        // Coalesce adjacent free ranges because the allocator may return smaller aligned chunks
        // but the overall range should always be equivalent.
        let coalesced_free_ranges = coalesce_ranges(free_ranges);
        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);

        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
    }

    #[fuchsia::test]
    async fn test_allocations_wait_for_free_extents() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;
        let allocator_clone = allocator.clone();

        let mut transaction =
            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");

        // Tie up all of the free extents on the device, and make sure allocations block.
        let max_extent_size = fs.device().size() as usize;
        const EXTENTS_PER_BATCH: usize = usize::MAX;

        // HACK: Treat `trimmable_extents` as being locked by `trim_done` (i.e. it should only be
        // accessed whilst `trim_done` is locked). We can't combine them into the same mutex,
        // because the inner type would be "poisoned" by the lifetime parameter of
        // `trimmable_extents` (which is in the lifetime of `allocator`), and then we can't move it
        // into `alloc_task` which would require a `'static` lifetime.
        let trim_done = Arc::new(Mutex::new(false));
        let trimmable_extents = allocator
            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
            .await
            .expect("take_for_trimming failed");

        let trim_done_clone = trim_done.clone();
        let bs = fs.block_size();
        let alloc_task = fasync::Task::spawn(async move {
            // The allocation should block until the trim releases its extents, then succeed.
            allocator_clone
                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
                .await
                .expect("allocate failed");
            {
                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
            }
            transaction.commit().await.expect("commit failed");
        });

        // Add a small delay to simulate the trim taking some nonzero amount of time.  Otherwise,
        // this will almost certainly always beat the allocation attempt.
        fasync::Timer::new(std::time::Duration::from_millis(100)).await;

        // Once the free extents are released, the task should unblock.
        {
            let mut trim_done = trim_done.lock();
            std::mem::drop(trimmable_extents);
            *trim_done = true;
        }

        alloc_task.await;
    }

    #[fuchsia::test]
    async fn test_allocation_with_reservation_not_multiple_of_block_size() {
        const STORE_OBJECT_ID: u64 = 99;
        let (fs, allocator) = test_fs().await;

        // Reserve an amount that isn't a multiple of the block size.
        const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
        let reservation =
            allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();

        let mut transaction = fs
            .clone()
            .new_transaction(
                lock_keys![],
                Options { allocator_reservation: Some(&reservation), ..Options::default() },
            )
            .await
            .expect("new failed");

        let range = allocator
            .allocate(
                &mut transaction,
                STORE_OBJECT_ID,
                round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
            )
            .await
            .expect("allocate failed");
        assert_eq!((range.end - range.start) % fs.block_size(), 0);

        println!("{}", range.end - range.start);
    }
}