Skip to main content

fxfs/object_store/
allocator.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! # The Allocator
6//!
7//! The allocator in Fxfs is filesystem-wide entity responsible for managing the allocation of
8//! regions of the device to "owners" (which are `ObjectStore`).
9//!
10//! Allocations are tracked in an LSMTree with coalescing used to merge neighboring allocations
11//! with the same properties (owner and reference count). As of writing, reference counting is not
12//! used. (Reference counts are intended for future use if/when snapshot support is implemented.)
13//!
14//! There are a number of important implementation features of the allocator that warrant further
15//! documentation.
16//!
17//! ## Byte limits
18//!
19//! Fxfs is a multi-volume filesystem. Fuchsia with fxblob currently uses two primary volumes -
20//! an unencrypted volume for blob storage and an encrypted volume for data storage.
21//! Byte limits ensure that no one volume can consume all available storage. This is important
22//! as software updates must always be possible (blobs) and conversely configuration data should
23//! always be writable (data).
24//!
25//! ## Reservation tracking
26//!
27//! Fxfs on Fuchsia leverages write-back caching which allows us to buffer writes in RAM until
28//! memory pressure, an explicit flush or a file close requires us to persist data to storage.
29//!
30//! To ensure that we do not over-commit in such cases (e.g. by writing many files to RAM only to
31//! find out tha there is insufficient disk to store them all), the allocator includes a simple
32//! reservation tracking mechanism.
33//!
34//! Reservation tracking is implemented hierarchically such that a reservation can portion out
35//! sub-reservations, each of which may be "reserved" or "forgotten" when it comes time to actually
36//! allocate space.
37//!
38//! ## Fixed locations
39//!
40//! The only fixed location files in Fxfs are the first 512kiB extents of the two superblocks that
41//! exist as the first things on the disk (i.e. The first 1MiB). The allocator manages these
42//! locations, but does so using a 'mark_allocated' method distinct from all other allocations in
43//! which the location is left up to the allocator.
44//!
45//! ## Deallocated unusable regions
46//!
47//! It is not legal to reuse a deallocated disk region until after a flush. Transactions
48//! are not guaranteed to be persisted until after a successful flush so any reuse of
49//! a region before then followed by power loss may lead to corruption.
50//!
51//! e.g. Consider if we deallocate a file, reuse its extent for another file, then crash after
52//! writing to the new file but not yet flushing the journal. At next mount we will have lost the
53//! transaction despite overwriting the original file's data, leading to inconsistency errors (data
54//! corruption).
55//!
56//! These regions are currently tracked in RAM in the allocator.
57//!
58//! ## Allocated but uncommitted regions
59//!
60//! These are allocated regions that are not (yet) persisted to disk. They are regular
61//! file allocations, but are not stored on persistent storage until their transaction is committed
62//! and safely flushed to disk.
63//!
64//! ## TRIMed unusable regions
65//!
66//! We periodically TRIM unallocated regions to give the SSD controller insight into which
67//! parts of the device contain data. We must avoid using temporary TRIM allocations that are held
68//! while we perform these operations.
69//!
70//! ## Volume deletion
71//!
72//! We make use of an optimisation in the case where an entire volume is deleted. In such cases,
73//! rather than individually deallocate all disk regions associated with that volume, we make
74//! note of the deletion and perform special merge operation on the next LSMTree compaction that
75//! filters out allocations for the deleted volume.
76//!
77//! This is designed to make dropping of volumes significantly cheaper, but it does add some
78//! additional complexity if implementing an allocator that implements data structures to track
79//! free space (rather than just allocated space).
80//!
81//! ## Image generation
82//!
83//! The Fuchsia build process requires building an initial filesystem image. In the case of
84//! fxblob-based boards, this is an Fxfs filesystem containing a volume with the base set of
85//! blobs required to bootstrap the system. When we build such an image, we want it to be as compact
86//! as possible as we're potentially packaging it up for distribution. To that end, our allocation
87//! strategy (or at least the strategy used for image generation) should prefer to allocate from the
88//! start of the device wherever possible.
89pub mod merge;
90pub mod strategy;
91
92use crate::drop_event::DropEvent;
93use crate::errors::FxfsError;
94use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
95use crate::log::*;
96use crate::lsm_tree::cache::NullCache;
97use crate::lsm_tree::skip_list_layer::SkipListLayer;
98use crate::lsm_tree::types::{
99    FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
100    OrdUpperBound, RangeKey, SortByU64, Value,
101};
102use crate::lsm_tree::{LSMTree, Query, compact_with_iterator, layers_from_handles};
103use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
104use crate::object_store::object_manager::ReservationUpdate;
105use crate::object_store::transaction::{
106    AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
107};
108use crate::object_store::{
109    DataObjectHandle, DirectWriter, HandleOptions, ObjectStore, ReservedId, tree,
110};
111use crate::range::RangeExt;
112use crate::round::{round_div, round_down, round_up};
113use crate::serialized_types::{
114    DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
115};
116use anyhow::{Context, Error, anyhow, bail, ensure};
117use async_trait::async_trait;
118use either::Either::{Left, Right};
119use event_listener::EventListener;
120use fprint::TypeFingerprint;
121use fuchsia_inspect::HistogramProperty;
122use fuchsia_sync::Mutex;
123use futures::FutureExt;
124use merge::{filter_marked_for_deletion, filter_tombstones, merge};
125use serde::{Deserialize, Serialize};
126use std::borrow::Borrow;
127use std::collections::{BTreeMap, HashSet, VecDeque};
128use std::hash::Hash;
129use std::marker::PhantomData;
130use std::num::{NonZero, Saturating};
131use std::ops::Range;
132use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
133use std::sync::{Arc, Weak};
134
135/// This trait is implemented by things that own reservations.
136pub trait ReservationOwner: Send + Sync {
137    /// Report that bytes are being released from the reservation back to the the |ReservationOwner|
138    /// where |owner_object_id| is the owner under the root object store associated with the
139    /// reservation.
140    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
141}
142
143/// A reservation guarantees that when it comes time to actually allocate, it will not fail due to
144/// lack of space.  Sub-reservations (a.k.a. holds) are possible which effectively allows part of a
145/// reservation to be set aside until it's time to commit.  Reservations do offer some
146/// thread-safety, but some responsibility is born by the caller: e.g. calling `forget` and
147/// `reserve` at the same time from different threads is unsafe. Reservations are have an
148/// |owner_object_id| which associates it with an object under the root object store that the
149/// reservation is accounted against.
150pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
151    owner: T,
152    owner_object_id: Option<u64>,
153    inner: Mutex<ReservationInner>,
154    phantom: PhantomData<U>,
155}
156
157#[derive(Debug, Default)]
158struct ReservationInner {
159    // Amount currently held by this reservation.
160    amount: u64,
161
162    // Amount reserved by sub-reservations.
163    reserved: u64,
164}
165
166impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        self.inner.lock().fmt(f)
169    }
170}
171
172impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
173    pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
174        Self {
175            owner,
176            owner_object_id,
177            inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
178            phantom: PhantomData,
179        }
180    }
181
182    pub fn owner_object_id(&self) -> Option<u64> {
183        self.owner_object_id
184    }
185
186    /// Returns the total amount of the reservation, not accounting for anything that might be held.
187    pub fn amount(&self) -> u64 {
188        self.inner.lock().amount
189    }
190
191    /// Adds more to the reservation.
192    pub fn add(&self, amount: u64) {
193        self.inner.lock().amount += amount;
194    }
195
196    /// Returns the entire amount of the reservation.  The caller is responsible for maintaining
197    /// consistency, i.e. updating counters, etc, and there can be no sub-reservations (an assert
198    /// will fire otherwise).
199    pub fn forget(&self) -> u64 {
200        let mut inner = self.inner.lock();
201        assert_eq!(inner.reserved, 0);
202        std::mem::take(&mut inner.amount)
203    }
204
205    /// Takes some of the reservation.  The caller is responsible for maintaining consistency,
206    /// i.e. updating counters, etc.  This will assert that the amount being forgotten does not
207    /// exceed the available reservation amount; the caller should ensure that this is the case.
208    pub fn forget_some(&self, amount: u64) {
209        let mut inner = self.inner.lock();
210        inner.amount -= amount;
211        assert!(inner.reserved <= inner.amount);
212    }
213
214    /// Returns a partial amount of the reservation. |amount| is passed the |limit| and should
215    /// return the amount, which can be zero.
216    fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
217        let mut inner = self.inner.lock();
218        let taken = amount(inner.amount - inner.reserved);
219        inner.reserved += taken;
220        ReservationImpl::new(self, self.owner_object_id, taken)
221    }
222
223    /// Reserves *exactly* amount if possible.
224    pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
225        let mut inner = self.inner.lock();
226        if inner.amount - inner.reserved < amount {
227            None
228        } else {
229            inner.reserved += amount;
230            Some(ReservationImpl::new(self, self.owner_object_id, amount))
231        }
232    }
233
234    /// Commits a previously reserved amount from this reservation.  The caller is responsible for
235    /// ensuring the amount was reserved.
236    pub fn commit(&self, amount: u64) {
237        let mut inner = self.inner.lock();
238        inner.reserved -= amount;
239        inner.amount -= amount;
240    }
241
242    /// Returns some of the reservation.
243    pub fn give_back(&self, amount: u64) {
244        self.owner.borrow().release_reservation(self.owner_object_id, amount);
245        let mut inner = self.inner.lock();
246        inner.amount -= amount;
247        assert!(inner.reserved <= inner.amount);
248    }
249
250    /// Moves `amount` from this reservation to another reservation.
251    pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
252        &self,
253        other: &ReservationImpl<V, W>,
254        amount: u64,
255    ) {
256        assert_eq!(self.owner_object_id, other.owner_object_id());
257        let mut inner = self.inner.lock();
258        if let Some(amount) = inner.amount.checked_sub(amount) {
259            inner.amount = amount;
260        } else {
261            std::mem::drop(inner);
262            panic!("Insufficient reservation space");
263        }
264        other.add(amount);
265    }
266}
267
268impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
269    fn drop(&mut self) {
270        let inner = self.inner.get_mut();
271        assert_eq!(inner.reserved, 0);
272        let owner_object_id = self.owner_object_id;
273        if inner.amount > 0 {
274            self.owner
275                .borrow()
276                .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
277        }
278    }
279}
280
281impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
282    for ReservationImpl<T, U>
283{
284    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
285        // Sub-reservations should belong to the same owner (or lack thereof).
286        assert_eq!(owner_object_id, self.owner_object_id);
287        let mut inner = self.inner.lock();
288        assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
289        inner.reserved -= amount;
290    }
291}
292
293pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;
294
295pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;
296
297/// Our allocator implementation tracks extents with a reference count.  At time of writing, these
298/// reference counts should never exceed 1, but that might change with snapshots and clones.
299pub type AllocatorKey = AllocatorKeyV32;
300
301#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, TypeFingerprint, Versioned)]
302#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
303pub struct AllocatorKeyV32 {
304    pub device_range: Range<u64>,
305}
306
307impl SortByU64 for AllocatorKey {
308    fn get_leading_u64(&self) -> u64 {
309        self.device_range.start
310    }
311}
312
313const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;
314
315pub struct AllocatorKeyPartitionIterator {
316    device_range: Range<u64>,
317}
318
319impl Iterator for AllocatorKeyPartitionIterator {
320    type Item = u64;
321
322    fn next(&mut self) -> Option<Self::Item> {
323        if self.device_range.start >= self.device_range.end {
324            None
325        } else {
326            let start = self.device_range.start;
327            self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
328            let end = std::cmp::min(self.device_range.start, self.device_range.end);
329            let key = AllocatorKey { device_range: start..end };
330            let hash = crate::stable_hash::stable_hash(key);
331            Some(hash)
332        }
333    }
334}
335
336impl FuzzyHash for AllocatorKey {
337    fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
338        AllocatorKeyPartitionIterator {
339            device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
340                ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
341        }
342    }
343
344    fn is_range_key(&self) -> bool {
345        true
346    }
347}
348
349impl AllocatorKey {
350    /// Returns a new key that is a lower bound suitable for use with merge_into.
351    pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
352        AllocatorKey { device_range: 0..self.device_range.start }
353    }
354}
355
356impl LayerKey for AllocatorKey {
357    fn merge_type(&self) -> MergeType {
358        MergeType::OptimizedMerge
359    }
360}
361
362impl OrdUpperBound for AllocatorKey {
363    fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
364        self.device_range.end.cmp(&other.device_range.end)
365    }
366}
367
368impl OrdLowerBound for AllocatorKey {
369    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
370        // The ordering over range.end is significant here as it is used in
371        // the heap ordering that feeds into our merge function and
372        // a total ordering over range lets us remove a symmetry case from
373        // the allocator merge function.
374        self.device_range
375            .start
376            .cmp(&other.device_range.start)
377            .then(self.device_range.end.cmp(&other.device_range.end))
378    }
379}
380
381impl Ord for AllocatorKey {
382    fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
383        self.device_range
384            .start
385            .cmp(&other.device_range.start)
386            .then(self.device_range.end.cmp(&other.device_range.end))
387    }
388}
389
390impl PartialOrd for AllocatorKey {
391    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
392        Some(self.cmp(other))
393    }
394}
395
396impl RangeKey for AllocatorKey {
397    fn overlaps(&self, other: &Self) -> bool {
398        self.device_range.start < other.device_range.end
399            && self.device_range.end > other.device_range.start
400    }
401}
402
403/// Allocations are "owned" by a single ObjectStore and are reference counted
404/// (for future snapshot/clone support).
405pub type AllocatorValue = AllocatorValueV32;
406impl Value for AllocatorValue {
407    const DELETED_MARKER: Self = Self::None;
408}
409
410#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
411#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
412pub enum AllocatorValueV32 {
413    // Tombstone variant indicating an extent is no longer allocated.
414    None,
415    // Used when we know there are no possible allocations below us in the stack.
416    // This is currently all the time. We used to have a related Delta type but
417    // it has been removed due to correctness issues (https://fxbug.dev/42179428).
418    Abs { count: u64, owner_object_id: u64 },
419}
420
421pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;
422
423/// Serialized information about the allocator.
424pub type AllocatorInfo = AllocatorInfoV32;
425
426#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
427pub struct AllocatorInfoV32 {
428    /// Holds the set of layer file object_id for the LSM tree (newest first).
429    pub layers: Vec<u64>,
430    /// Maps from owner_object_id to bytes allocated.
431    pub allocated_bytes: BTreeMap<u64, u64>,
432    /// Set of owner_object_id that we should ignore if found in layer files.  For now, this should
433    /// always be empty on-disk because we always do full compactions.
434    pub marked_for_deletion: HashSet<u64>,
435    /// The limit for the number of allocates bytes per `owner_object_id` whereas the value. If
436    /// there is no limit present here for an `owner_object_id` assume it is max u64.
437    pub limit_bytes: BTreeMap<u64, u64>,
438}
439
440const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;
441
442/// Computes the target maximum extent size based on the block size of the allocator.
443pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
444    // Each block in an extent contains an 8-byte checksum (which due to varint encoding is 9
445    // bytes), and a given extent record must be no larger DEFAULT_MAX_SERIALIZED_RECORD_SIZE.  We
446    // also need to leave a bit of room (arbitrarily, 64 bytes) for the rest of the extent's
447    // metadata.
448    block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
449}
450
451#[derive(Default)]
452struct AllocatorCounters {
453    num_flushes: u64,
454    last_flush_time: Option<std::time::SystemTime>,
455}
456
457pub struct Allocator {
458    filesystem: Weak<FxFilesystem>,
459    block_size: u64,
460    device_size: u64,
461    object_id: u64,
462    max_extent_size_bytes: u64,
463    tree: LSMTree<AllocatorKey, AllocatorValue>,
464    // A list of allocations which are temporary; i.e. they are not actually stored in the LSM tree,
465    // but we still don't want to be able to allocate over them.  This layer is merged into the
466    // allocator's LSM tree whilst reading it, so the allocations appear to exist in the LSM tree.
467    // This is used in a few places, for example to hold back allocations that have been added to a
468    // transaction but are not yet committed yet, or to prevent the allocation of a deleted extent
469    // until the device is flushed.
470    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
471    inner: Mutex<Inner>,
472    allocation_mutex: futures::lock::Mutex<()>,
473    counters: Mutex<AllocatorCounters>,
474    maximum_offset: AtomicU64,
475    allocations_allowed: AtomicBool,
476}
477
478/// Tracks the different stages of byte allocations for an individual owner.
479#[derive(Debug, Default, PartialEq)]
480struct ByteTracking {
481    /// This value is the up-to-date count of the number of allocated bytes per owner_object_id
482    /// whereas the value in `Info::allocated_bytes` is the value as it was when we last flushed.
483    /// This field should be regarded as *untrusted*; it can be invalid due to filesystem
484    /// inconsistencies, and this is why it is `Saturating<u64>` rather than just `u64`.  Any
485    /// computations that use this value should typically propagate `Saturating` so that it is clear
486    /// the value is *untrusted*.
487    allocated_bytes: Saturating<u64>,
488
489    /// This value is the number of bytes allocated to uncommitted allocations.
490    /// (Bytes allocated, but not yet persisted to disk)
491    uncommitted_allocated_bytes: u64,
492
493    /// This value is the number of bytes allocated to reservations.
494    reserved_bytes: u64,
495
496    /// Committed deallocations that we cannot use until they are flushed to the device.  Each entry
497    /// in this list is the log file offset at which it was committed and an array of deallocations
498    /// that occurred at that time.
499    committed_deallocated_bytes: u64,
500}
501
502impl ByteTracking {
503    // Returns the total number of bytes that are taken either from reservations, allocations or
504    // uncommitted allocations.
505    fn used_bytes(&self) -> Saturating<u64> {
506        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
507    }
508
509    // Returns the amount that is not available to be allocated, which includes actually allocated
510    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
511    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
512    // reuse those bytes yet.
513    fn unavailable_bytes(&self) -> Saturating<u64> {
514        self.allocated_bytes
515            + Saturating(self.uncommitted_allocated_bytes)
516            + Saturating(self.committed_deallocated_bytes)
517    }
518
519    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
520    // require a device flush to be reused.
521    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
522        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
523    }
524}
525
526#[derive(Debug)]
527struct CommittedDeallocation {
528    // The offset at which this deallocation was committed.
529    log_file_offset: u64,
530    // The device range being deallocated.
531    range: Range<u64>,
532    // The owning object id which originally allocated it.
533    owner_object_id: u64,
534}
535
536struct Inner {
537    info: AllocatorInfo,
538
539    /// The allocator can only be opened if there have been no allocations and it has not already
540    /// been opened or initialized.
541    opened: bool,
542
543    /// When we allocate a range from RAM, we add it to Allocator::temporary_allocations.
544    /// This layer is added to layer_set in rebuild_strategy and take_for trim so we don't assume
545    /// the range is available.
546    /// If we apply the mutation, we move it from `temporary_allocations` to the LSMTree.
547    /// If we drop the mutation, we just delete it from `temporary_allocations` and call `free`.
548    ///
549    /// We need to be very careful about races when we move the allocation into the LSMTree.
550    /// A layer_set is a snapshot in time of a set of layers. Say:
551    ///  1. `rebuild_strategy` takes a layer_set that includes the mutable layer.
552    ///  2. A compaction operation seals that mutable layer and creates a new mutable layer.
553    ///     Note that the layer_set in rebuild_strategy doesn't have this new mutable layer.
554    ///  3. An apply_mutation operation adds the allocation to the new mutable layer.
555    ///     It then removes the allocation from `temporary_allocations`.
556    ///  4. `rebuild_strategy` iterates the layer_set, missing both the temporary mutation AND
557    ///     the record in the new mutable layer, making the allocated range available for double
558    ///     allocation and future filesystem corruption.
559    ///
560    /// We don't have (or want) a means to lock compctions during rebuild_strategy operations so
561    /// to avoid this scenario, we can't remove from temporary_allocations when we apply a mutation.
562    /// Instead we add them to dropped_temporary_allocations and make the actual removals from
563    /// `temporary_allocations` while holding the allocator lock. Because rebuild_strategy only
564    /// runs with this lock held, this guarantees that we don't remove entries from temporary
565    /// mutations while also iterating over it.
566    ///
567    /// A related issue happens when we deallocate a range. We use temporary_allocations this time
568    /// to prevent reuse until after the deallocation has been successfully flushed.
569    /// In this scenario we don't want to call 'free()' until the range is ready to use again.
570    dropped_temporary_allocations: Vec<Range<u64>>,
571
572    /// The per-owner counters for bytes at various stages of the data life-cycle. From initial
573    /// reservation through until the bytes are unallocated and eventually uncommitted.
574    owner_bytes: BTreeMap<u64, ByteTracking>,
575
576    /// This value is the number of bytes allocated to reservations but not tracked as part of a
577    /// particular volume.
578    unattributed_reserved_bytes: u64,
579
580    /// Committed deallocations that we cannot use until they are flushed to the device.
581    committed_deallocated: VecDeque<CommittedDeallocation>,
582
583    /// Bytes which are currently being trimmed.  These can still be allocated from, but the
584    /// allocation will block until the current batch of trimming is finished.
585    trim_reserved_bytes: u64,
586
587    /// While a trim is being performed, this listener is set.  When the current batch of extents
588    /// being trimmed have been released (and trim_reserved_bytes is 0), this is signaled.
589    /// This should only be listened to while the allocation_mutex is held.
590    trim_listener: Option<EventListener>,
591
592    /// This controls how we allocate our free space to manage fragmentation.
593    strategy: strategy::BestFit,
594
595    /// Tracks the number of allocations of size 1,2,...63,>=64.
596    allocation_size_histogram: [u64; 64],
597    /// Tracks which size bucket triggers rebuild_strategy.
598    rebuild_strategy_trigger_histogram: [u64; 64],
599
600    /// This is distinct from the set contained in `info`.  New entries are inserted *after* a
601    /// device has been flushed (it is not safe to reuse the space taken by a deleted volume prior
602    /// to this) and are removed after a major compaction.
603    marked_for_deletion: HashSet<u64>,
604
605    /// The set of volumes deleted that are waiting for a sync.
606    volumes_deleted_pending_sync: HashSet<u64>,
607}
608
609impl Inner {
610    fn allocated_bytes(&self) -> Saturating<u64> {
611        let mut total = Saturating(0);
612        for (_, bytes) in &self.owner_bytes {
613            total += bytes.allocated_bytes;
614        }
615        total
616    }
617
618    fn uncommitted_allocated_bytes(&self) -> u64 {
619        self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
620    }
621
622    fn reserved_bytes(&self) -> u64 {
623        self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
624            + self.unattributed_reserved_bytes
625    }
626
627    fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
628        match self.info.limit_bytes.get(&owner_object_id) {
629            Some(v) => *v,
630            None => u64::MAX,
631        }
632    }
633
634    fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
635        let limit = self.owner_id_limit_bytes(owner_object_id);
636        let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
637        (Saturating(limit) - used).0
638    }
639
640    // Returns the amount that is not available to be allocated, which includes actually allocated
641    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
642    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
643    // reuse those bytes yet.
644    fn unavailable_bytes(&self) -> Saturating<u64> {
645        let mut total = Saturating(0);
646        for (_, bytes) in &self.owner_bytes {
647            total += bytes.unavailable_bytes();
648        }
649        total
650    }
651
652    // Returns the total number of bytes that are taken either from reservations, allocations or
653    // uncommitted allocations.
654    fn used_bytes(&self) -> Saturating<u64> {
655        let mut total = Saturating(0);
656        for (_, bytes) in &self.owner_bytes {
657            total += bytes.used_bytes();
658        }
659        total + Saturating(self.unattributed_reserved_bytes)
660    }
661
662    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
663    // require a device flush to be reused.
664    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
665        let mut total = Saturating(0);
666        for (_, bytes) in &self.owner_bytes {
667            total += bytes.unavailable_after_sync_bytes();
668        }
669        total
670    }
671
672    // Returns the number of bytes which will be available after the current batch of trimming
673    // completes.
674    fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
675        device_size
676            .checked_sub(
677                (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
678            )
679            .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
680    }
681
682    fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
683        match owner_object_id {
684            Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
685            None => self.unattributed_reserved_bytes += amount,
686        };
687    }
688
689    fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
690        match owner_object_id {
691            Some(owner) => {
692                let owner_entry = self.owner_bytes.entry(owner).or_default();
693                assert!(
694                    owner_entry.reserved_bytes >= amount,
695                    "{} >= {}",
696                    owner_entry.reserved_bytes,
697                    amount
698                );
699                owner_entry.reserved_bytes -= amount;
700            }
701            None => {
702                assert!(
703                    self.unattributed_reserved_bytes >= amount,
704                    "{} >= {}",
705                    self.unattributed_reserved_bytes,
706                    amount
707                );
708                self.unattributed_reserved_bytes -= amount
709            }
710        };
711    }
712}
713
714/// A container for a set of extents which are known to be free and can be trimmed.  Returned by
715/// `take_for_trimming`.
716pub struct TrimmableExtents<'a> {
717    allocator: &'a Allocator,
718    extents: Vec<Range<u64>>,
719    // The allocator can subscribe to this event to wait until these extents are dropped.  This way,
720    // we don't fail an allocation attempt if blocks are tied up for trimming; rather, we just wait
721    // until the batch is finished with and then proceed.
722    _drop_event: DropEvent,
723}
724
725impl<'a> TrimmableExtents<'a> {
726    pub fn extents(&self) -> &Vec<Range<u64>> {
727        &self.extents
728    }
729
730    // Also returns an EventListener which is signaled when this is dropped.
731    fn new(allocator: &'a Allocator) -> (Self, EventListener) {
732        let drop_event = DropEvent::new();
733        let listener = drop_event.listen();
734        (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
735    }
736
737    fn add_extent(&mut self, extent: Range<u64>) {
738        self.extents.push(extent);
739    }
740}
741
742impl<'a> Drop for TrimmableExtents<'a> {
743    fn drop(&mut self) {
744        let mut inner = self.allocator.inner.lock();
745        for device_range in std::mem::take(&mut self.extents) {
746            inner.strategy.free(device_range.clone()).expect("drop trim extent");
747            self.allocator
748                .temporary_allocations
749                .erase(&AllocatorKey { device_range: device_range.clone() });
750        }
751        inner.trim_reserved_bytes = 0;
752    }
753}
754
755impl Allocator {
756    pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
757        let block_size = filesystem.block_size();
758        // We expect device size to be a multiple of block size. Throw away any tail.
759        let device_size = round_down(filesystem.device().size(), block_size);
760        if device_size != filesystem.device().size() {
761            warn!("Device size is not block aligned. Rounding down.");
762        }
763        let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
764        let mut strategy = strategy::BestFit::default();
765        strategy.free(0..device_size).expect("new fs");
766        Allocator {
767            filesystem: Arc::downgrade(&filesystem),
768            block_size,
769            device_size,
770            object_id,
771            max_extent_size_bytes,
772            tree: LSMTree::new(merge, Box::new(NullCache {})),
773            temporary_allocations: SkipListLayer::new(1024),
774            inner: Mutex::new(Inner {
775                info: AllocatorInfo::default(),
776                opened: false,
777                dropped_temporary_allocations: Vec::new(),
778                owner_bytes: BTreeMap::new(),
779                unattributed_reserved_bytes: 0,
780                committed_deallocated: VecDeque::new(),
781                trim_reserved_bytes: 0,
782                trim_listener: None,
783                strategy,
784                allocation_size_histogram: [0; 64],
785                rebuild_strategy_trigger_histogram: [0; 64],
786                marked_for_deletion: HashSet::new(),
787                volumes_deleted_pending_sync: HashSet::new(),
788            }),
789            allocation_mutex: futures::lock::Mutex::new(()),
790            counters: Mutex::new(AllocatorCounters::default()),
791            maximum_offset: AtomicU64::new(0),
792            allocations_allowed: AtomicBool::new(filesystem.options().image_builder_mode.is_none()),
793        }
794    }
795
796    pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
797        &self.tree
798    }
799
800    /// Enables allocations.
801    /// This is only valid to call if the allocator was created in image builder mode.
802    pub fn enable_allocations(&self) {
803        self.allocations_allowed.store(true, Ordering::SeqCst);
804    }
805
806    /// Returns an iterator that yields all allocations, filtering out tombstones and any
807    /// owner_object_id that have been marked as deleted.  If `committed_marked_for_deletion` is
808    /// true, then filter using the committed volumes marked for deletion rather than the in-memory
809    /// copy which excludes volumes that have been deleted but there hasn't been a sync yet.
810    pub async fn filter(
811        &self,
812        iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
813        committed_marked_for_deletion: bool,
814    ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
815        let marked_for_deletion = {
816            let inner = self.inner.lock();
817            if committed_marked_for_deletion {
818                &inner.info.marked_for_deletion
819            } else {
820                &inner.marked_for_deletion
821            }
822            .clone()
823        };
824        let iter =
825            filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
826        Ok(iter)
827    }
828
829    /// A histogram of allocation request sizes.
830    /// The index into the array is 'number of blocks'.
831    /// The last bucket is a catch-all for larger allocation requests.
832    pub fn allocation_size_histogram(&self) -> [u64; 64] {
833        self.inner.lock().allocation_size_histogram
834    }
835
836    /// Creates a new (empty) allocator.
837    pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
838        // Mark the allocator as opened before creating the file because creating a new
839        // transaction requires a reservation.
840        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
841
842        let filesystem = self.filesystem.upgrade().unwrap();
843        let root_store = filesystem.root_store();
844        ObjectStore::create_object_with_id(
845            &root_store,
846            transaction,
847            ReservedId::new(&root_store, NonZero::new(self.object_id()).unwrap()),
848            HandleOptions::default(),
849            None,
850        )?;
851        root_store.update_last_object_id(self.object_id());
852        Ok(())
853    }
854
855    // Opens the allocator.  This is not thread-safe; this should be called prior to
856    // replaying the allocator's mutations.
857    pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
858        let filesystem = self.filesystem.upgrade().unwrap();
859        let root_store = filesystem.root_store();
860
861        self.inner.lock().strategy = strategy::BestFit::default();
862
863        let handle =
864            ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
865                .await
866                .context("Failed to open allocator object")?;
867
868        if handle.get_size() > 0 {
869            let serialized_info = handle
870                .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
871                .await
872                .context("Failed to read AllocatorInfo")?;
873            let mut cursor = std::io::Cursor::new(serialized_info);
874            let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
875                .context("Failed to deserialize AllocatorInfo")?;
876
877            let mut handles = Vec::new();
878            let mut total_size = 0;
879            for object_id in &info.layers {
880                let handle = ObjectStore::open_object(
881                    &root_store,
882                    *object_id,
883                    HandleOptions::default(),
884                    None,
885                )
886                .await
887                .context("Failed to open allocator layer file")?;
888
889                let size = handle.get_size();
890                total_size += size;
891                handles.push(handle);
892            }
893
894            {
895                let mut inner = self.inner.lock();
896
897                // Check allocated_bytes fits within the device.
898                let mut device_bytes = self.device_size;
899                for (&owner_object_id, &bytes) in &info.allocated_bytes {
900                    ensure!(
901                        bytes <= device_bytes,
902                        anyhow!(FxfsError::Inconsistent).context(format!(
903                            "Allocated bytes exceeds device size: {:?}",
904                            info.allocated_bytes
905                        ))
906                    );
907                    device_bytes -= bytes;
908
909                    inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
910                        Saturating(bytes);
911                }
912
913                inner.info = info;
914            }
915
916            self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
917            self.filesystem.upgrade().unwrap().object_manager().update_reservation(
918                self.object_id,
919                tree::reservation_amount_from_layer_size(total_size),
920            );
921        }
922
923        Ok(())
924    }
925
926    pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
927        // We can assume the device has been flushed.
928        {
929            let mut inner = self.inner.lock();
930            inner.volumes_deleted_pending_sync.clear();
931            inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
932        }
933
934        // Build free extent structure from disk. For now, we expect disks to have *some* free
935        // space at all times. This may change if we ever support mounting of read-only
936        // redistributable filesystem images.
937        if !self.rebuild_strategy().await.context("Build free extents")? {
938            if self.filesystem.upgrade().unwrap().options().read_only {
939                info!("Device contains no free space (read-only mode).");
940            } else {
941                info!("Device contains no free space.");
942                return Err(FxfsError::Inconsistent)
943                    .context("Device appears to contain no free space");
944            }
945        }
946
947        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
948        Ok(())
949    }
950
951    /// Walk all allocations to generate the set of free regions between allocations.
952    /// It is safe to re-run this on live filesystems but it should not be called concurrently
953    /// with allocations, trims or other rebuild_strategy invocations -- use allocation_mutex.
954    /// Returns true if the rebuild made changes to either the free ranges or overflow markers.
955    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
956        let mut changed = false;
957        let mut layer_set = self.tree.empty_layer_set();
958        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
959        self.tree.add_all_layers_to_layer_set(&mut layer_set);
960
961        let overflow_markers = self.inner.lock().strategy.overflow_markers();
962        self.inner.lock().strategy.reset_overflow_markers();
963
964        let mut to_add = Vec::new();
965        let mut merger = layer_set.merger();
966        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
967        let mut last_offset = 0;
968        while last_offset < self.device_size {
969            let next_range = match iter.get() {
970                None => {
971                    assert!(last_offset <= self.device_size);
972                    let range = last_offset..self.device_size;
973                    last_offset = self.device_size;
974                    iter.advance().await?;
975                    range
976                }
977                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
978                    if device_range.end < last_offset {
979                        iter.advance().await?;
980                        continue;
981                    }
982                    if device_range.start <= last_offset {
983                        last_offset = device_range.end;
984                        iter.advance().await?;
985                        continue;
986                    }
987                    let range = last_offset..device_range.start;
988                    last_offset = device_range.end;
989                    iter.advance().await?;
990                    range
991                }
992            };
993            to_add.push(next_range);
994            // Avoid taking a lock on inner for every free range.
995            if to_add.len() > 100 {
996                let mut inner = self.inner.lock();
997                for range in to_add.drain(..) {
998                    changed |= inner.strategy.force_free(range)?;
999                }
1000            }
1001        }
1002        let mut inner = self.inner.lock();
1003        for range in to_add {
1004            changed |= inner.strategy.force_free(range)?;
1005        }
1006        if overflow_markers != inner.strategy.overflow_markers() {
1007            changed = true;
1008        }
1009        Ok(changed)
1010    }
1011
1012    /// Collects up to `extents_per_batch` free extents of size up to `max_extent_size` from
1013    /// `offset`.  The extents will be reserved.
1014    /// Note that only one `FreeExtents` can exist for the allocator at any time.
1015    pub async fn take_for_trimming(
1016        &self,
1017        offset: u64,
1018        max_extent_size: usize,
1019        extents_per_batch: usize,
1020    ) -> Result<TrimmableExtents<'_>, Error> {
1021        let _guard = self.allocation_mutex.lock().await;
1022
1023        let (mut result, listener) = TrimmableExtents::new(self);
1024        let mut bytes = 0;
1025
1026        // We can't just use self.strategy here because it doesn't necessarily hold all
1027        // free extent metadata in RAM.
1028        let mut layer_set = self.tree.empty_layer_set();
1029        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
1030        self.tree.add_all_layers_to_layer_set(&mut layer_set);
1031        let mut merger = layer_set.merger();
1032        let mut iter = self
1033            .filter(
1034                merger.query(Query::FullRange(&AllocatorKey { device_range: offset..0 })).await?,
1035                false,
1036            )
1037            .await?;
1038        let mut last_offset = offset;
1039        'outer: while last_offset < self.device_size {
1040            let mut range = match iter.get() {
1041                None => {
1042                    assert!(last_offset <= self.device_size);
1043                    let range = last_offset..self.device_size;
1044                    last_offset = self.device_size;
1045                    iter.advance().await?;
1046                    range
1047                }
1048                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
1049                    if device_range.end <= last_offset {
1050                        iter.advance().await?;
1051                        continue;
1052                    }
1053                    if device_range.start <= last_offset {
1054                        last_offset = device_range.end;
1055                        iter.advance().await?;
1056                        continue;
1057                    }
1058                    let range = last_offset..device_range.start;
1059                    last_offset = device_range.end;
1060                    iter.advance().await?;
1061                    range
1062                }
1063            };
1064            if range.start < offset {
1065                continue;
1066            }
1067
1068            // 'range' is based on the on-disk LSM tree. We need to check against any uncommitted
1069            // allocations and remove temporarily allocate the range for ourselves.
1070            let mut inner = self.inner.lock();
1071
1072            while range.start < range.end {
1073                let prefix =
1074                    range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
1075                range = prefix.end..range.end;
1076                bytes += prefix.length()?;
1077                // We can assume the following are safe because we've checked both LSM tree and
1078                // temporary_allocations while holding the lock.
1079                inner.strategy.remove(prefix.clone());
1080                self.temporary_allocations.insert(AllocatorItem {
1081                    key: AllocatorKey { device_range: prefix.clone() },
1082                    value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
1083                })?;
1084                result.add_extent(prefix);
1085                if result.extents.len() == extents_per_batch {
1086                    break 'outer;
1087                }
1088            }
1089            if result.extents.len() == extents_per_batch {
1090                break 'outer;
1091            }
1092        }
1093        {
1094            let mut inner = self.inner.lock();
1095
1096            assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
1097            inner.trim_listener = Some(listener);
1098            inner.trim_reserved_bytes = bytes;
1099            debug_assert!(
1100                (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
1101                    <= self.device_size
1102            );
1103        }
1104        Ok(result)
1105    }
1106
1107    /// Returns all objects that exist in the parent store that pertain to this allocator.
1108    pub fn parent_objects(&self) -> Vec<u64> {
1109        // The allocator tree needs to store a file for each of the layers in the tree, so we return
1110        // those, since nothing else references them.
1111        self.inner.lock().info.layers.clone()
1112    }
1113
1114    /// Returns all the current owner byte limits (in pairs of `(owner_object_id, bytes)`).
1115    pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
1116        self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
1117    }
1118
1119    /// Returns (allocated_bytes, byte_limit) for the given owner.
1120    pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
1121        let inner = self.inner.lock();
1122        (
1123            inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
1124            inner.info.limit_bytes.get(&owner_object_id).copied(),
1125        )
1126    }
1127
1128    /// Returns owner bytes debug information.
1129    pub fn owner_bytes_debug(&self) -> String {
1130        format!("{:?}", self.inner.lock().owner_bytes)
1131    }
1132
1133    fn needs_sync(&self) -> bool {
1134        // TODO(https://fxbug.dev/42178048): This will only trigger if *all* free space is taken up with
1135        // committed deallocated bytes, but we might want to trigger a sync if we're low and there
1136        // happens to be a lot of deallocated bytes as that might mean we can fully satisfy
1137        // allocation requests.
1138        let inner = self.inner.lock();
1139        inner.unavailable_bytes().0 >= self.device_size
1140    }
1141
1142    fn is_system_store(&self, owner_object_id: u64) -> bool {
1143        let fs = self.filesystem.upgrade().unwrap();
1144        owner_object_id == fs.object_manager().root_store_object_id()
1145            || owner_object_id == fs.object_manager().root_parent_store_object_id()
1146    }
1147
1148    /// Updates the accounting to track that a byte reservation has been moved out of an owner to
1149    /// the unattributed pool.
1150    pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
1151        if old_owner_object_id.is_none() || amount == 0 {
1152            return;
1153        }
1154        // These 2 mutations should behave as though they're a single atomic mutation.
1155        let mut inner = self.inner.lock();
1156        inner.remove_reservation(old_owner_object_id, amount);
1157        inner.add_reservation(None, amount);
1158    }
1159
1160    /// Creates a lazy inspect node named `str` under `parent` which will yield statistics for the
1161    /// allocator when queried.
1162    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1163        let this = Arc::downgrade(self);
1164        parent.record_lazy_child(name, move || {
1165            let this_clone = this.clone();
1166            async move {
1167                let inspector = fuchsia_inspect::Inspector::default();
1168                if let Some(this) = this_clone.upgrade() {
1169                    let counters = this.counters.lock();
1170                    let root = inspector.root();
1171                    root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
1172                    root.record_uint("bytes_total", this.device_size);
1173                    let (allocated, reserved, used, unavailable) = {
1174                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1175                        let inner = this.inner.lock();
1176                        (
1177                            inner.allocated_bytes().0,
1178                            inner.reserved_bytes(),
1179                            inner.used_bytes().0,
1180                            inner.unavailable_bytes().0,
1181                        )
1182                    };
1183                    root.record_uint("bytes_allocated", allocated);
1184                    root.record_uint("bytes_reserved", reserved);
1185                    root.record_uint("bytes_used", used);
1186                    root.record_uint("bytes_unavailable", unavailable);
1187
1188                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing
1189                    // metrics.
1190                    if let Some(x) = round_div(100 * allocated, this.device_size) {
1191                        root.record_uint("bytes_allocated_percent", x);
1192                    }
1193                    if let Some(x) = round_div(100 * reserved, this.device_size) {
1194                        root.record_uint("bytes_reserved_percent", x);
1195                    }
1196                    if let Some(x) = round_div(100 * used, this.device_size) {
1197                        root.record_uint("bytes_used_percent", x);
1198                    }
1199                    if let Some(x) = round_div(100 * unavailable, this.device_size) {
1200                        root.record_uint("bytes_unavailable_percent", x);
1201                    }
1202
1203                    root.record_uint("num_flushes", counters.num_flushes);
1204                    if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
1205                        root.record_uint(
1206                            "last_flush_time_ms",
1207                            last_flush_time
1208                                .duration_since(std::time::UNIX_EPOCH)
1209                                .unwrap_or(std::time::Duration::ZERO)
1210                                .as_millis()
1211                                .try_into()
1212                                .unwrap_or(0u64),
1213                        );
1214                    }
1215
1216                    let data = this.allocation_size_histogram();
1217                    let alloc_sizes = root.create_uint_linear_histogram(
1218                        "allocation_size_histogram",
1219                        fuchsia_inspect::LinearHistogramParams {
1220                            floor: 1,
1221                            step_size: 1,
1222                            buckets: 64,
1223                        },
1224                    );
1225                    for (i, count) in data.iter().enumerate() {
1226                        if i != 0 {
1227                            alloc_sizes.insert_multiple(i as u64, *count as usize);
1228                        }
1229                    }
1230                    root.record(alloc_sizes);
1231
1232                    let data = this.inner.lock().rebuild_strategy_trigger_histogram;
1233                    let triggers = root.create_uint_linear_histogram(
1234                        "rebuild_strategy_triggers",
1235                        fuchsia_inspect::LinearHistogramParams {
1236                            floor: 1,
1237                            step_size: 1,
1238                            buckets: 64,
1239                        },
1240                    );
1241                    for (i, count) in data.iter().enumerate() {
1242                        if i != 0 {
1243                            triggers.insert_multiple(i as u64, *count as usize);
1244                        }
1245                    }
1246                    root.record(triggers);
1247
1248                    let allocator_ref = this.clone();
1249                    root.record_child("lsm_tree", move |node| {
1250                        allocator_ref.tree.record_inspect_data(node);
1251                    });
1252                }
1253                Ok(inspector)
1254            }
1255            .boxed()
1256        });
1257    }
1258
1259    /// Returns the offset of the first byte which has not been used by the allocator since its
1260    /// creation.
1261    /// NB: This does *not* take into account existing allocations.  This is only reliable when the
1262    /// allocator was created from scratch, without any pre-existing allocations.
1263    pub fn maximum_offset(&self) -> u64 {
1264        self.maximum_offset.load(Ordering::Relaxed)
1265    }
1266}
1267
1268impl Drop for Allocator {
1269    fn drop(&mut self) {
1270        let inner = self.inner.lock();
1271        // Uncommitted and reserved should be released back using RAII, so they should be zero.
1272        assert_eq!(inner.uncommitted_allocated_bytes(), 0);
1273        assert_eq!(inner.reserved_bytes(), 0);
1274    }
1275}
1276
1277#[fxfs_trace::trace]
1278impl Allocator {
1279    /// Returns the object ID for the allocator.
1280    pub fn object_id(&self) -> u64 {
1281        self.object_id
1282    }
1283
1284    /// Returns information about the allocator such as the layer files storing persisted
1285    /// allocations.
1286    pub fn info(&self) -> AllocatorInfo {
1287        self.inner.lock().info.clone()
1288    }
1289
1290    /// Tries to allocate enough space for |object_range| in the specified object and returns the
1291    /// device range allocated.
1292    /// The allocated range may be short (e.g. due to fragmentation), in which case the caller can
1293    /// simply call allocate again until they have enough blocks.
1294    ///
1295    /// We also store the object store ID of the store that the allocation should be assigned to so
1296    /// that we have a means to delete encrypted stores without needing the encryption key.
1297    #[trace]
1298    pub async fn allocate(
1299        self: &Arc<Self>,
1300        transaction: &mut Transaction<'_>,
1301        owner_object_id: u64,
1302        mut len: u64,
1303    ) -> Result<Range<u64>, Error> {
1304        ensure!(self.allocations_allowed.load(Ordering::SeqCst), FxfsError::Unavailable);
1305        assert_eq!(len % self.block_size, 0);
1306        len = std::cmp::min(len, self.max_extent_size_bytes);
1307        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1308
1309        // Make sure we have space reserved before we try and find the space.
1310        let reservation = if let Some(reservation) = transaction.allocator_reservation {
1311            match reservation.owner_object_id {
1312                // If there is no owner, this must be a system store that we're allocating for.
1313                None => assert!(self.is_system_store(owner_object_id)),
1314                // If it has an owner, it should not be different than the allocating owner.
1315                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
1316            };
1317            // Reservation limits aren't necessarily a multiple of the block size.
1318            let r = reservation
1319                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
1320            len = r.amount();
1321            Left(r)
1322        } else {
1323            let mut inner = self.inner.lock();
1324            assert!(inner.opened);
1325            // Do not exceed the limit for the owner or the device.
1326            let device_used = inner.used_bytes();
1327            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1328            // We must take care not to use up space that might be reserved.
1329            let limit =
1330                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
1331            len = round_down(std::cmp::min(len, limit), self.block_size);
1332            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1333            owner_entry.reserved_bytes += len;
1334            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
1335        };
1336
1337        ensure!(len > 0, FxfsError::NoSpace);
1338
1339        // If volumes have been deleted, flush the device so that we can use any of the freed space.
1340        let volumes_deleted = {
1341            let inner = self.inner.lock();
1342            (!inner.volumes_deleted_pending_sync.is_empty())
1343                .then(|| inner.volumes_deleted_pending_sync.clone())
1344        };
1345
1346        if let Some(volumes_deleted) = volumes_deleted {
1347            // No locks are held here, so in theory, there could be unnecessary syncs, but it
1348            // should be sufficiently rare that it won't matter.
1349            self.filesystem
1350                .upgrade()
1351                .unwrap()
1352                .sync(SyncOptions {
1353                    flush_device: true,
1354                    precondition: Some(Box::new(|| {
1355                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
1356                    })),
1357                    ..Default::default()
1358                })
1359                .await?;
1360
1361            {
1362                let mut inner = self.inner.lock();
1363                for owner_id in volumes_deleted {
1364                    inner.volumes_deleted_pending_sync.remove(&owner_id);
1365                    inner.marked_for_deletion.insert(owner_id);
1366                }
1367            }
1368
1369            let _guard = self.allocation_mutex.lock().await;
1370            self.rebuild_strategy().await?;
1371        }
1372
1373        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
1374        let _guard = 'sync: loop {
1375            // Cap number of sync attempts before giving up on finding free space.
1376            for _ in 0..10 {
1377                {
1378                    let guard = self.allocation_mutex.lock().await;
1379
1380                    if !self.needs_sync() {
1381                        break 'sync guard;
1382                    }
1383                }
1384
1385                // All the free space is currently tied up with deallocations, so we need to sync
1386                // and flush the device to free that up.
1387                //
1388                // We can't hold the allocation lock whilst we sync here because the allocation lock
1389                // is also taken in apply_mutations, which is called when journal locks are held,
1390                // and we call sync here which takes those same locks, so it would have the
1391                // potential to result in a deadlock.  Sync holds its own lock to guard against
1392                // multiple syncs occurring at the same time, and we can supply a precondition that
1393                // is evaluated under that lock to ensure we don't sync twice if we don't need to.
1394                self.filesystem
1395                    .upgrade()
1396                    .unwrap()
1397                    .sync(SyncOptions {
1398                        flush_device: true,
1399                        precondition: Some(Box::new(|| self.needs_sync())),
1400                        ..Default::default()
1401                    })
1402                    .await?;
1403            }
1404            bail!(
1405                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
1406            );
1407        };
1408
1409        let mut trim_listener = None;
1410        {
1411            let mut inner = self.inner.lock();
1412            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;
1413
1414            // If trimming would be the reason that this allocation gets cut short, wait for
1415            // trimming to complete before proceeding.
1416            let avail = self
1417                .device_size
1418                .checked_sub(inner.unavailable_bytes().0)
1419                .ok_or(FxfsError::Inconsistent)?;
1420            let free_and_not_being_trimmed =
1421                inner.bytes_available_not_being_trimmed(self.device_size)?;
1422            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
1423                debug_assert!(inner.trim_reserved_bytes > 0);
1424                trim_listener = std::mem::take(&mut inner.trim_listener);
1425            }
1426        }
1427
1428        if let Some(listener) = trim_listener {
1429            listener.await;
1430        }
1431
1432        let result = loop {
1433            {
1434                let mut inner = self.inner.lock();
1435
1436                // While we know rebuild_strategy and take_for_trim are not running (we hold guard),
1437                // apply temporary_allocation removals.
1438                for device_range in inner.dropped_temporary_allocations.drain(..) {
1439                    self.temporary_allocations.erase(&AllocatorKey { device_range });
1440                }
1441
1442                match inner.strategy.allocate(len) {
1443                    Err(FxfsError::NotFound) => {
1444                        // Overflow. Fall through and rebuild
1445                        inner.rebuild_strategy_trigger_histogram
1446                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
1447                    }
1448                    Err(err) => {
1449                        error!(err:%; "Likely filesystem corruption.");
1450                        return Err(err.into());
1451                    }
1452                    Ok(x) => {
1453                        break x;
1454                    }
1455                }
1456            }
1457            // We've run out of extents of the requested length in RAM but there
1458            // exists more of this size on device. Rescan device and circle back.
1459            // We already hold the allocation_mutex, so exclusive access is guaranteed.
1460            if !self.rebuild_strategy().await? {
1461                error!("Cannot find additional free space. Corruption?");
1462                return Err(FxfsError::Inconsistent.into());
1463            }
1464        };
1465
1466        debug!(device_range:? = result; "allocate");
1467
1468        let len = result.length().unwrap();
1469        let reservation_owner = reservation.either(
1470            // Left means we got an outside reservation.
1471            |l| {
1472                l.forget_some(len);
1473                l.owner_object_id()
1474            },
1475            |r| {
1476                r.forget_some(len);
1477                r.owner_object_id()
1478            },
1479        );
1480
1481        {
1482            let mut inner = self.inner.lock();
1483            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1484            owner_entry.uncommitted_allocated_bytes += len;
1485            // If the reservation has an owner, ensure they are the same.
1486            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
1487            inner.remove_reservation(reservation_owner, len);
1488            self.temporary_allocations.insert(AllocatorItem {
1489                key: AllocatorKey { device_range: result.clone() },
1490                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1491            })?;
1492        }
1493
1494        let mutation =
1495            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
1496        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());
1497
1498        Ok(result)
1499    }
1500
1501    /// Marks the given device range as allocated.  The main use case for this at this time is for
1502    /// the super-block which needs to be at a fixed location on the device.
1503    #[trace]
1504    pub fn mark_allocated(
1505        &self,
1506        transaction: &mut Transaction<'_>,
1507        owner_object_id: u64,
1508        device_range: Range<u64>,
1509    ) -> Result<(), Error> {
1510        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1511        {
1512            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;
1513
1514            let mut inner = self.inner.lock();
1515            let device_used = inner.used_bytes();
1516            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1517            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1518            ensure!(
1519                device_range.end <= self.device_size
1520                    && (Saturating(self.device_size) - device_used).0 >= len
1521                    && owner_id_bytes_left >= len,
1522                FxfsError::NoSpace
1523            );
1524            if let Some(reservation) = &mut transaction.allocator_reservation {
1525                // The transaction takes ownership of this hold.
1526                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
1527            }
1528            owner_entry.uncommitted_allocated_bytes += len;
1529            inner.strategy.remove(device_range.clone());
1530            self.temporary_allocations.insert(AllocatorItem {
1531                key: AllocatorKey { device_range: device_range.clone() },
1532                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1533            })?;
1534        }
1535        let mutation =
1536            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
1537        transaction.add(self.object_id(), Mutation::Allocator(mutation));
1538        Ok(())
1539    }
1540
1541    /// Sets the limits for an owner object in terms of usage.
1542    pub fn set_bytes_limit(
1543        &self,
1544        transaction: &mut Transaction<'_>,
1545        owner_object_id: u64,
1546        bytes: u64,
1547    ) -> Result<(), Error> {
1548        // System stores cannot be given limits.
1549        assert!(!self.is_system_store(owner_object_id));
1550        transaction.add(
1551            self.object_id(),
1552            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
1553        );
1554        Ok(())
1555    }
1556
1557    /// Gets the bytes limit for an owner object.
1558    pub fn get_owner_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
1559        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
1560    }
1561
1562    /// Gets the number of bytes currently used by allocations, reservations or uncommitted
1563    /// allocations.
1564    pub fn get_owner_bytes_used(&self, owner_object_id: u64) -> u64 {
1565        self.inner.lock().owner_bytes.get(&owner_object_id).map_or(0, |info| info.used_bytes().0)
1566    }
1567
1568    /// Deallocates the given device range for the specified object.
1569    #[trace]
1570    pub async fn deallocate(
1571        &self,
1572        transaction: &mut Transaction<'_>,
1573        owner_object_id: u64,
1574        dealloc_range: Range<u64>,
1575    ) -> Result<u64, Error> {
1576        debug!(device_range:? = dealloc_range; "deallocate");
1577        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
1578        // We don't currently support sharing of allocations (value.count always equals 1), so as
1579        // long as we can assume the deallocated range is actually allocated, we can avoid device
1580        // access.
1581        let deallocated = dealloc_range.end - dealloc_range.start;
1582        let mutation = AllocatorMutation::Deallocate {
1583            device_range: dealloc_range.clone().into(),
1584            owner_object_id,
1585        };
1586        transaction.add(self.object_id(), Mutation::Allocator(mutation));
1587
1588        let _guard = self.allocation_mutex.lock().await;
1589
1590        // We use `dropped_temporary_allocations` to defer removals from `temporary_allocations` in
1591        // places where we can't execute async code or take locks.
1592        //
1593        // It's important we don't ever remove entries from `temporary_allocations` without
1594        // holding the `allocation_mutex` lock or else we may end up with an inconsistent view of
1595        // available disk space when we combine temporary_allocations with the LSMTree.
1596        // This is normally done in `allocate()` but we also need to apply these here because
1597        // `temporary_allocations` is also used to track deallocated space until it has been
1598        // flushed (see comment below). A user may allocate and then deallocate space before calling
1599        // allocate() a second time, so if we do not clean up here, we may end up with the same
1600        // range in temporary_allocations twice (once for allocate, once for deallocate).
1601        let mut inner = self.inner.lock();
1602        for device_range in inner.dropped_temporary_allocations.drain(..) {
1603            self.temporary_allocations.erase(&AllocatorKey { device_range });
1604        }
1605
1606        // We can't reuse deallocated space immediately because failure to successfully flush will
1607        // mean that on next mount, we may find this space is still assigned to the deallocated
1608        // region. To avoid immediate reuse, we hold these regions in 'temporary_allocations' until
1609        // after a successful flush so we know the region is safe to reuse.
1610        self.temporary_allocations
1611            .insert(AllocatorItem {
1612                key: AllocatorKey { device_range: dealloc_range.clone() },
1613                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1614            })
1615            .context("tracking deallocated")?;
1616
1617        Ok(deallocated)
1618    }
1619
1620    /// Marks allocations associated with a given |owner_object_id| for deletion.
1621    ///
1622    /// This is used as part of deleting encrypted volumes (ObjectStore) without having the keys.
1623    ///
1624    /// MarkForDeletion mutations eventually manipulates allocator metadata (AllocatorInfo) instead
1625    /// of the mutable layer but we must be careful not to do this too early and risk premature
1626    /// reuse of extents.
1627    ///
1628    /// Replay is not guaranteed until the *device* gets flushed, so we cannot reuse the deleted
1629    /// extents until we've flushed the device.
1630    ///
1631    /// TODO(b/316827348): Consider removing the use of mark_for_deletion in AllocatorInfo and
1632    /// just compacting?
1633    ///
1634    /// After an allocator.flush() (i.e. a major compaction), we know that there is no data left
1635    /// in the layer files for this owner_object_id and we are able to clear `marked_for_deletion`.
1636    pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
1637        // Note that because the actual time of deletion (the next major compaction) is undefined,
1638        // |owner_object_id| should not be reused after this call.
1639        transaction.add(
1640            self.object_id(),
1641            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
1642        );
1643    }
1644
1645    /// Called when the device has been flush and indicates what the journal log offset was when
1646    /// that happened.
1647    pub fn did_flush_device(&self, flush_log_offset: u64) {
1648        // First take out the deallocations that we now know to be flushed.  The list is maintained
1649        // in order, so we can stop on the first entry that we find that should not be unreserved
1650        // yet.
1651        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
1652        let deallocs = 'deallocs_outer: loop {
1653            let mut inner = self.inner.lock();
1654            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
1655                if dealloc.log_file_offset >= flush_log_offset {
1656                    let mut deallocs = inner.committed_deallocated.split_off(index);
1657                    // Swap because we want the opposite of what split_off does.
1658                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
1659                    break 'deallocs_outer deallocs;
1660                }
1661            }
1662            break std::mem::take(&mut inner.committed_deallocated);
1663        };
1664
1665        // Now we can free those elements.
1666        let mut inner = self.inner.lock();
1667        let mut totals = BTreeMap::<u64, u64>::new();
1668        for dealloc in deallocs {
1669            *(totals.entry(dealloc.owner_object_id).or_default()) +=
1670                dealloc.range.length().unwrap();
1671            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
1672            self.temporary_allocations.erase(&AllocatorKey { device_range: dealloc.range.clone() });
1673        }
1674
1675        // This *must* come after we've removed the records from reserved reservations because the
1676        // allocator uses this value to decide whether or not a device-flush is required and it must
1677        // be possible to find free space if it thinks no device-flush is required.
1678        for (owner_object_id, total) in totals {
1679            match inner.owner_bytes.get_mut(&owner_object_id) {
1680                Some(counters) => counters.committed_deallocated_bytes -= total,
1681                None => {
1682                    // This should only be possible if the volume has been deleted and a sync is
1683                    // pending.
1684                    assert!(inner.volumes_deleted_pending_sync.contains(&owner_object_id));
1685                }
1686            }
1687        }
1688    }
1689
1690    /// Returns a reservation that can be used later, or None if there is insufficient space. The
1691    /// |owner_object_id| indicates which object in the root object store the reservation is for.
1692    pub fn reserve(
1693        self: Arc<Self>,
1694        owner_object_id: Option<u64>,
1695        amount: u64,
1696    ) -> Option<Reservation> {
1697        {
1698            let mut inner = self.inner.lock();
1699
1700            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1701
1702            let limit = match owner_object_id {
1703                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1704                None => device_free,
1705            };
1706            if limit < amount {
1707                return None;
1708            }
1709            inner.add_reservation(owner_object_id, amount);
1710        }
1711        Some(Reservation::new(self, owner_object_id, amount))
1712    }
1713
1714    /// Like reserve, but takes a callback is passed the |limit| and should return the amount,
1715    /// which can be zero.
1716    pub fn reserve_with(
1717        self: Arc<Self>,
1718        owner_object_id: Option<u64>,
1719        amount: impl FnOnce(u64) -> u64,
1720    ) -> Reservation {
1721        let amount = {
1722            let mut inner = self.inner.lock();
1723
1724            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1725
1726            let amount = amount(match owner_object_id {
1727                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1728                None => device_free,
1729            });
1730
1731            inner.add_reservation(owner_object_id, amount);
1732
1733            amount
1734        };
1735
1736        Reservation::new(self, owner_object_id, amount)
1737    }
1738
1739    /// Returns the total number of allocated bytes.
1740    pub fn get_allocated_bytes(&self) -> u64 {
1741        self.inner.lock().allocated_bytes().0
1742    }
1743
1744    /// Returns the size of bytes available to allocate.
1745    pub fn get_disk_bytes(&self) -> u64 {
1746        self.device_size
1747    }
1748
1749    /// Returns the total number of allocated bytes per owner_object_id.
1750    /// Note that this is quite an expensive operation as it copies the collection.
1751    /// This is intended for use in fsck() and friends, not general use code.
1752    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
1753        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
1754    }
1755
1756    /// Returns the number of allocated and reserved bytes.
1757    pub fn get_used_bytes(&self) -> Saturating<u64> {
1758        let inner = self.inner.lock();
1759        inner.used_bytes()
1760    }
1761}
1762
1763impl ReservationOwner for Allocator {
1764    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
1765        self.inner.lock().remove_reservation(owner_object_id, amount);
1766    }
1767}
1768
1769#[async_trait]
1770impl JournalingObject for Allocator {
1771    fn apply_mutation(
1772        &self,
1773        mutation: Mutation,
1774        context: &ApplyContext<'_, '_>,
1775        _assoc_obj: AssocObj<'_>,
1776    ) -> Result<(), Error> {
1777        match mutation {
1778            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
1779                let mut inner = self.inner.lock();
1780                inner.owner_bytes.remove(&owner_object_id);
1781
1782                // We use `info.marked_for_deletion` to track the committed state and
1783                // `inner.marked_for_deletion` to track volumes marked for deletion *after* we have
1784                // flushed the device.  It is not safe to use extents belonging to deleted volumes
1785                // until after we have flushed the device.
1786                inner.info.marked_for_deletion.insert(owner_object_id);
1787                inner.volumes_deleted_pending_sync.insert(owner_object_id);
1788
1789                inner.info.limit_bytes.remove(&owner_object_id);
1790            }
1791            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1792                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
1793                let item = AllocatorItem {
1794                    key: AllocatorKey { device_range: device_range.clone().into() },
1795                    value: AllocatorValue::Abs { count: 1, owner_object_id },
1796                };
1797                let len = item.key.device_range.length().unwrap();
1798                let lower_bound = item.key.lower_bound_for_merge_into();
1799                self.tree.merge_into(item, &lower_bound);
1800                let mut inner = self.inner.lock();
1801                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1802                entry.allocated_bytes += len;
1803                if let ApplyMode::Live(transaction) = context.mode {
1804                    entry.uncommitted_allocated_bytes -= len;
1805                    // Note that we cannot drop entries from temporary_allocations without holding
1806                    // the allocation_mutex as it may introduce races. We instead add the range to
1807                    // a Vec that can be applied later when we hold the lock (See comment on
1808                    // `dropped_temporary_allocations` above).
1809                    inner.dropped_temporary_allocations.push(device_range.into());
1810                    if let Some(reservation) = transaction.allocator_reservation {
1811                        reservation.commit(len);
1812                    }
1813                }
1814            }
1815            Mutation::Allocator(AllocatorMutation::Deallocate {
1816                device_range,
1817                owner_object_id,
1818            }) => {
1819                let item = AllocatorItem {
1820                    key: AllocatorKey { device_range: device_range.into() },
1821                    value: AllocatorValue::None,
1822                };
1823                let len = item.key.device_range.length().unwrap();
1824
1825                {
1826                    let mut inner = self.inner.lock();
1827                    {
1828                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1829                        entry.allocated_bytes -= len;
1830                        if context.mode.is_live() {
1831                            entry.committed_deallocated_bytes += len;
1832                        }
1833                    }
1834                    if context.mode.is_live() {
1835                        inner.committed_deallocated.push_back(CommittedDeallocation {
1836                            log_file_offset: context.checkpoint.file_offset,
1837                            range: item.key.device_range.clone(),
1838                            owner_object_id,
1839                        });
1840                    }
1841                    if let ApplyMode::Live(Transaction {
1842                        allocator_reservation: Some(reservation),
1843                        ..
1844                    }) = context.mode
1845                    {
1846                        inner.add_reservation(reservation.owner_object_id(), len);
1847                        reservation.add(len);
1848                    }
1849                }
1850                let lower_bound = item.key.lower_bound_for_merge_into();
1851                self.tree.merge_into(item, &lower_bound);
1852            }
1853            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
1854                // Journal replay is ordered and each of these calls is idempotent. So the last one
1855                // will be respected, it doesn't matter if the value is already set, or gets changed
1856                // multiple times during replay. When it gets opened it will be merged in with the
1857                // snapshot.
1858                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
1859            }
1860            Mutation::BeginFlush => {
1861                self.tree.seal();
1862                // Transfer our running count for allocated_bytes so that it gets written to the new
1863                // info file when flush completes.
1864                let mut inner = self.inner.lock();
1865                let allocated_bytes =
1866                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
1867                inner.info.allocated_bytes = allocated_bytes;
1868            }
1869            Mutation::EndFlush => {}
1870            _ => bail!("unexpected mutation: {:?}", mutation),
1871        }
1872        Ok(())
1873    }
1874
1875    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
1876        match mutation {
1877            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1878                let len = device_range.length().unwrap();
1879                let mut inner = self.inner.lock();
1880                inner
1881                    .owner_bytes
1882                    .entry(owner_object_id)
1883                    .or_default()
1884                    .uncommitted_allocated_bytes -= len;
1885                if let Some(reservation) = transaction.allocator_reservation {
1886                    let res_owner = reservation.owner_object_id();
1887                    inner.add_reservation(res_owner, len);
1888                    reservation.release_reservation(res_owner, len);
1889                }
1890                inner.strategy.free(device_range.clone().into()).expect("drop mutaton");
1891                self.temporary_allocations
1892                    .erase(&AllocatorKey { device_range: device_range.into() });
1893            }
1894            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
1895                self.temporary_allocations
1896                    .erase(&AllocatorKey { device_range: device_range.into() });
1897            }
1898            _ => {}
1899        }
1900    }
1901
1902    async fn flush(&self) -> Result<Version, Error> {
1903        let filesystem = self.filesystem.upgrade().unwrap();
1904        let object_manager = filesystem.object_manager();
1905        let earliest_version = self.tree.get_earliest_version();
1906        if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
1907            // Early exit, but still return the earliest version used by a struct in the tree
1908            return Ok(earliest_version);
1909        }
1910
1911        let fs = self.filesystem.upgrade().unwrap();
1912        let mut flusher = Flusher::new(self, &fs).await;
1913        let (new_layer_file, info) = flusher.start().await?;
1914        flusher.finish(new_layer_file, info).await
1915    }
1916}
1917
1918// The merger is unable to merge extents that exist like the following:
1919//
1920//     |----- +1 -----|
1921//                    |----- -1 -----|
1922//                    |----- +2 -----|
1923//
1924// It cannot coalesce them because it has to emit the +1 record so that it can move on and merge the
1925// -1 and +2 records. To address this, we add another stage that applies after merging which
1926// coalesces records after they have been emitted.  This is a bit simpler than merging because the
1927// records cannot overlap, so it's just a question of merging adjacent records if they happen to
1928// have the same delta and object_id.
1929
1930pub struct CoalescingIterator<I> {
1931    iter: I,
1932    item: Option<AllocatorItem>,
1933}
1934
1935impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
1936    pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
1937        let mut iter = Self { iter, item: None };
1938        iter.advance().await?;
1939        Ok(iter)
1940    }
1941}
1942
1943#[async_trait]
1944impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
1945    for CoalescingIterator<I>
1946{
1947    async fn advance(&mut self) -> Result<(), Error> {
1948        self.item = self.iter.get().map(|x| x.cloned());
1949        if self.item.is_none() {
1950            return Ok(());
1951        }
1952        let left = self.item.as_mut().unwrap();
1953        loop {
1954            self.iter.advance().await?;
1955            match self.iter.get() {
1956                None => return Ok(()),
1957                Some(right) => {
1958                    // The two records cannot overlap.
1959                    ensure!(
1960                        left.key.device_range.end <= right.key.device_range.start,
1961                        FxfsError::Inconsistent
1962                    );
1963                    // We can only coalesce records if they are touching and have the same value.
1964                    if left.key.device_range.end < right.key.device_range.start
1965                        || left.value != *right.value
1966                    {
1967                        return Ok(());
1968                    }
1969                    left.key.device_range.end = right.key.device_range.end;
1970                }
1971            }
1972        }
1973    }
1974
1975    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
1976        self.item.as_ref().map(|x| x.as_item_ref())
1977    }
1978}
1979
1980struct Flusher<'a> {
1981    allocator: &'a Allocator,
1982    fs: &'a Arc<FxFilesystem>,
1983    _guard: WriteGuard<'a>,
1984}
1985
1986impl<'a> Flusher<'a> {
1987    async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
1988        let keys = lock_keys![LockKey::flush(allocator.object_id())];
1989        Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
1990    }
1991
1992    fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
1993        Options {
1994            skip_journal_checks: true,
1995            borrow_metadata_space: true,
1996            allocator_reservation: Some(allocator_reservation),
1997            ..Default::default()
1998        }
1999    }
2000
2001    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
2002        let object_manager = self.fs.object_manager();
2003        let mut transaction = self
2004            .fs
2005            .clone()
2006            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
2007            .await?;
2008
2009        let root_store = self.fs.root_store();
2010        let layer_object_handle = ObjectStore::create_object(
2011            &root_store,
2012            &mut transaction,
2013            HandleOptions { skip_journal_checks: true, ..Default::default() },
2014            None,
2015        )
2016        .await?;
2017        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
2018        // It's important that this transaction does not include any allocations because we use
2019        // BeginFlush as a snapshot point for mutations to the tree: other allocator mutations
2020        // within this transaction might get applied before seal (which would be OK), but they could
2021        // equally get applied afterwards (since Transaction makes no guarantees about the order in
2022        // which mutations are applied whilst committing), in which case they'd get lost on replay
2023        // because the journal will only send mutations that follow this transaction.
2024        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
2025        let info = transaction
2026            .commit_with_callback(|_| {
2027                // We must capture `info` as it is when we start flushing. Subsequent transactions
2028                // can end up modifying `info` and we shouldn't capture those here.
2029                self.allocator.inner.lock().info.clone()
2030            })
2031            .await?;
2032        Ok((layer_object_handle, info))
2033    }
2034
2035    async fn finish(
2036        self,
2037        layer_object_handle: DataObjectHandle<ObjectStore>,
2038        mut info: AllocatorInfo,
2039    ) -> Result<Version, Error> {
2040        let object_manager = self.fs.object_manager();
2041        let txn_options = Self::txn_options(object_manager.metadata_reservation());
2042
2043        let layer_set = self.allocator.tree.immutable_layer_set();
2044        let total_len = layer_set.sum_len();
2045        {
2046            let start_time = std::time::Instant::now();
2047            let merged_layer_count = layer_set.layers.len();
2048            let mut merger = layer_set.merger();
2049            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
2050            let iter = CoalescingIterator::new(iter).await?;
2051            let bytes_written = compact_with_iterator(
2052                iter,
2053                total_len,
2054                DirectWriter::new(&layer_object_handle, txn_options).await,
2055                layer_object_handle.block_size(),
2056                Some(self.fs.journal().get_compaction_yielder()),
2057            )
2058            .await?;
2059
2060            self.allocator.tree.report_compaction_metrics(
2061                bytes_written,
2062                start_time.elapsed(),
2063                merged_layer_count,
2064            );
2065        }
2066
2067        let root_store = self.fs.root_store();
2068
2069        // Both of these forward-declared variables need to outlive the transaction.
2070        let object_handle;
2071        let reservation_update;
2072        let mut transaction = self
2073            .fs
2074            .clone()
2075            .new_transaction(
2076                lock_keys![LockKey::object(
2077                    root_store.store_object_id(),
2078                    self.allocator.object_id()
2079                )],
2080                txn_options,
2081            )
2082            .await?;
2083        let mut serialized_info = Vec::new();
2084
2085        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
2086        object_handle = ObjectStore::open_object(
2087            &root_store,
2088            self.allocator.object_id(),
2089            HandleOptions::default(),
2090            None,
2091        )
2092        .await?;
2093
2094        // Move all the existing layers to the graveyard.
2095        for object_id in &info.layers {
2096            root_store.add_to_graveyard(&mut transaction, *object_id);
2097        }
2098
2099        // Write out updated info.
2100
2101        // After successfully flushing, all the stores that were marked for deletion at the time of
2102        // the BeginFlush transaction, no longer need to be marked for deletion.  There can be
2103        // stores that have been deleted since the BeginFlush transaction, but they will be covered
2104        // by a MarkForDeletion mutation.
2105        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);
2106
2107        info.layers = vec![layer_object_handle.object_id()];
2108
2109        info.serialize_with_version(&mut serialized_info)?;
2110
2111        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
2112        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
2113        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;
2114
2115        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
2116            layer_object_handle.get_size(),
2117        ));
2118
2119        // It's important that EndFlush is in the same transaction that we write AllocatorInfo,
2120        // because we use EndFlush to make the required adjustments to allocated_bytes.
2121        transaction.add_with_object(
2122            self.allocator.object_id(),
2123            Mutation::EndFlush,
2124            AssocObj::Borrowed(&reservation_update),
2125        );
2126        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());
2127
2128        let layers = layers_from_handles([layer_object_handle]).await?;
2129        transaction
2130            .commit_with_callback(|_| {
2131                self.allocator.tree.set_layers(layers);
2132
2133                // At this point we've committed the new layers to disk so we can start using them.
2134                // This means we can also switch to the new AllocatorInfo which clears
2135                // marked_for_deletion.
2136                let mut inner = self.allocator.inner.lock();
2137                inner.info.layers = info.layers;
2138                for owner_id in marked_for_deletion {
2139                    inner.marked_for_deletion.remove(&owner_id);
2140                    inner.info.marked_for_deletion.remove(&owner_id);
2141                }
2142            })
2143            .await?;
2144
2145        // Now close the layers and purge them.
2146        for layer in layer_set.layers {
2147            let object_id = layer.handle().map(|h| h.object_id());
2148            layer.close_layer().await;
2149            if let Some(object_id) = object_id {
2150                root_store.tombstone_object(object_id, txn_options).await?;
2151            }
2152        }
2153
2154        let mut counters = self.allocator.counters.lock();
2155        counters.num_flushes += 1;
2156        counters.last_flush_time = Some(std::time::SystemTime::now());
2157        // Return the earliest version used by a struct in the tree
2158        Ok(self.allocator.tree.get_earliest_version())
2159    }
2160}
2161
2162#[cfg(test)]
2163mod tests {
2164    use crate::filesystem::{
2165        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2166    };
2167    use crate::fsck::fsck;
2168    use crate::lsm_tree::cache::NullCache;
2169    use crate::lsm_tree::skip_list_layer::SkipListLayer;
2170    use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
2171    use crate::lsm_tree::{LSMTree, Query};
2172    use crate::object_handle::ObjectHandle;
2173    use crate::object_store::allocator::merge::merge;
2174    use crate::object_store::allocator::{
2175        Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2176    };
2177    use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
2178    use crate::object_store::volume::root_volume;
2179    use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
2180    use crate::range::RangeExt;
2181    use crate::round::round_up;
2182    use crate::testing;
2183    use fuchsia_async as fasync;
2184    use fuchsia_sync::Mutex;
2185    use std::cmp::{max, min};
2186    use std::ops::{Bound, Range};
2187    use std::sync::Arc;
2188    use storage_device::DeviceHolder;
2189    use storage_device::fake_device::FakeDevice;
2190
2191    #[test]
2192    fn test_allocator_key_is_range_based() {
2193        // Make sure we disallow using allocator keys with point queries.
2194        assert!(AllocatorKey { device_range: 0..100 }.is_range_key());
2195    }
2196
2197    #[fuchsia::test]
2198    async fn test_coalescing_iterator() {
2199        let skip_list = SkipListLayer::new(100);
2200        let items = [
2201            Item::new(
2202                AllocatorKey { device_range: 0..100 },
2203                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2204            ),
2205            Item::new(
2206                AllocatorKey { device_range: 100..200 },
2207                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2208            ),
2209        ];
2210        skip_list.insert(items[1].clone()).expect("insert error");
2211        skip_list.insert(items[0].clone()).expect("insert error");
2212        let mut iter =
2213            CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
2214                .await
2215                .expect("new failed");
2216        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2217        assert_eq!(
2218            (key, value),
2219            (
2220                &AllocatorKey { device_range: 0..200 },
2221                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2222            )
2223        );
2224        iter.advance().await.expect("advance failed");
2225        assert!(iter.get().is_none());
2226    }
2227
2228    #[fuchsia::test]
2229    async fn test_merge_and_coalesce_across_three_layers() {
2230        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2231        lsm_tree
2232            .insert(Item::new(
2233                AllocatorKey { device_range: 100..200 },
2234                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2235            ))
2236            .expect("insert error");
2237        lsm_tree.seal();
2238        lsm_tree
2239            .insert(Item::new(
2240                AllocatorKey { device_range: 0..100 },
2241                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2242            ))
2243            .expect("insert error");
2244
2245        let layer_set = lsm_tree.layer_set();
2246        let mut merger = layer_set.merger();
2247        let mut iter =
2248            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2249                .await
2250                .expect("new failed");
2251        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2252        assert_eq!(
2253            (key, value),
2254            (
2255                &AllocatorKey { device_range: 0..200 },
2256                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2257            )
2258        );
2259        iter.advance().await.expect("advance failed");
2260        assert!(iter.get().is_none());
2261    }
2262
2263    #[fuchsia::test]
2264    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
2265        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2266        lsm_tree
2267            .insert(Item::new(
2268                AllocatorKey { device_range: 100..200 },
2269                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2270            ))
2271            .expect("insert error");
2272        lsm_tree.seal();
2273        lsm_tree
2274            .insert(Item::new(
2275                AllocatorKey { device_range: 0..100 },
2276                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2277            ))
2278            .expect("insert error");
2279
2280        let layer_set = lsm_tree.layer_set();
2281        let mut merger = layer_set.merger();
2282        let mut iter =
2283            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2284                .await
2285                .expect("new failed");
2286        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2287        assert_eq!(
2288            (key, value),
2289            (
2290                &AllocatorKey { device_range: 0..100 },
2291                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2292            )
2293        );
2294        iter.advance().await.expect("advance failed");
2295        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2296        assert_eq!(
2297            (key, value),
2298            (
2299                &AllocatorKey { device_range: 100..200 },
2300                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2301            )
2302        );
2303        iter.advance().await.expect("advance failed");
2304        assert!(iter.get().is_none());
2305    }
2306
2307    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
2308        if a.end > b.start && a.start < b.end {
2309            min(a.end, b.end) - max(a.start, b.start)
2310        } else {
2311            0
2312        }
2313    }
2314
2315    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
2316        let layer_set = allocator.tree.layer_set();
2317        let mut merger = layer_set.merger();
2318        let mut iter = allocator
2319            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2320            .await
2321            .expect("build iterator");
2322        let mut allocations: Vec<Range<u64>> = Vec::new();
2323        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2324            if let Some(r) = allocations.last() {
2325                assert!(device_range.start >= r.end);
2326            }
2327            allocations.push(device_range.clone());
2328            iter.advance().await.expect("advance failed");
2329        }
2330        allocations
2331    }
2332
2333    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
2334        let layer_set = allocator.tree.layer_set();
2335        let mut merger = layer_set.merger();
2336        let mut iter = allocator
2337            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2338            .await
2339            .expect("build iterator");
2340        let mut found = 0;
2341        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2342            let mut l = device_range.length().expect("Invalid range");
2343            found += l;
2344            // Make sure that the entire range we have found completely overlaps with all the
2345            // allocations we expect to find.
2346            for range in expected_allocations {
2347                l -= overlap(range, device_range);
2348                if l == 0 {
2349                    break;
2350                }
2351            }
2352            assert_eq!(l, 0, "range {device_range:?} not covered by expectations");
2353            iter.advance().await.expect("advance failed");
2354        }
2355        // Make sure the total we found adds up to what we expect.
2356        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
2357    }
2358
2359    async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
2360        let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
2361        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2362        let allocator = fs.allocator();
2363        (fs, allocator)
2364    }
2365
2366    #[fuchsia::test]
2367    async fn test_allocations() {
2368        const STORE_OBJECT_ID: u64 = 99;
2369        let (fs, allocator) = test_fs().await;
2370        let mut transaction =
2371            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2372        let mut device_ranges = collect_allocations(&allocator).await;
2373
2374        // Expected extents:
2375        let expected = vec![
2376            0..4096,        // Superblock A (4k)
2377            4096..139264,   // root_store layer files, StoreInfo.. (33x4k blocks)
2378            139264..204800, // Superblock A extension (64k)
2379            204800..335872, // Initial Journal (128k)
2380            335872..401408, // Superblock B extension (64k)
2381            524288..528384, // Superblock B (4k)
2382        ];
2383        assert_eq!(device_ranges, expected);
2384        device_ranges.push(
2385            allocator
2386                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2387                .await
2388                .expect("allocate failed"),
2389        );
2390        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2391        device_ranges.push(
2392            allocator
2393                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2394                .await
2395                .expect("allocate failed"),
2396        );
2397        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2398        assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
2399        transaction.commit().await.expect("commit failed");
2400        let mut transaction =
2401            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2402        device_ranges.push(
2403            allocator
2404                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2405                .await
2406                .expect("allocate failed"),
2407        );
2408        assert_eq!(device_ranges[7].length().expect("Invalid range"), fs.block_size());
2409        assert_eq!(overlap(&device_ranges[5], &device_ranges[7]), 0);
2410        assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
2411        transaction.commit().await.expect("commit failed");
2412
2413        check_allocations(&allocator, &device_ranges).await;
2414    }
2415
2416    #[fuchsia::test]
2417    async fn test_allocate_more_than_max_size() {
2418        const STORE_OBJECT_ID: u64 = 99;
2419        let (fs, allocator) = test_fs().await;
2420        let mut transaction =
2421            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2422        let mut device_ranges = collect_allocations(&allocator).await;
2423        device_ranges.push(
2424            allocator
2425                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
2426                .await
2427                .expect("allocate failed"),
2428        );
2429        assert_eq!(
2430            device_ranges.last().unwrap().length().expect("Invalid range"),
2431            allocator.max_extent_size_bytes
2432        );
2433        transaction.commit().await.expect("commit failed");
2434
2435        check_allocations(&allocator, &device_ranges).await;
2436    }
2437
2438    #[fuchsia::test]
2439    async fn test_deallocations() {
2440        const STORE_OBJECT_ID: u64 = 99;
2441        let (fs, allocator) = test_fs().await;
2442        let initial_allocations = collect_allocations(&allocator).await;
2443
2444        let mut transaction =
2445            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2446        let device_range1 = allocator
2447            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2448            .await
2449            .expect("allocate failed");
2450        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
2451        transaction.commit().await.expect("commit failed");
2452
2453        let mut transaction =
2454            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2455        allocator
2456            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
2457            .await
2458            .expect("deallocate failed");
2459        transaction.commit().await.expect("commit failed");
2460
2461        check_allocations(&allocator, &initial_allocations).await;
2462    }
2463
2464    #[fuchsia::test]
2465    async fn test_mark_allocated() {
2466        const STORE_OBJECT_ID: u64 = 99;
2467        let (fs, allocator) = test_fs().await;
2468        let mut device_ranges = collect_allocations(&allocator).await;
2469        let range = {
2470            let mut transaction = fs
2471                .clone()
2472                .new_transaction(lock_keys![], Options::default())
2473                .await
2474                .expect("new failed");
2475            // First, allocate 2 blocks.
2476            allocator
2477                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
2478                .await
2479                .expect("allocate failed")
2480            // Let the transaction drop.
2481        };
2482
2483        let mut transaction =
2484            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2485
2486        // If we allocate 1 block, the two blocks that were allocated earlier should be available,
2487        // and this should return the first of them.
2488        device_ranges.push(
2489            allocator
2490                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2491                .await
2492                .expect("allocate failed"),
2493        );
2494
2495        assert_eq!(device_ranges.last().unwrap().start, range.start);
2496
2497        // Mark the second block as allocated.
2498        let mut range2 = range.clone();
2499        range2.start += fs.block_size();
2500        allocator
2501            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
2502            .expect("mark_allocated failed");
2503        device_ranges.push(range2);
2504
2505        // This should avoid the range we marked as allocated.
2506        device_ranges.push(
2507            allocator
2508                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2509                .await
2510                .expect("allocate failed"),
2511        );
2512        let last_range = device_ranges.last().unwrap();
2513        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
2514        assert_eq!(overlap(last_range, &range), 0);
2515        transaction.commit().await.expect("commit failed");
2516
2517        check_allocations(&allocator, &device_ranges).await;
2518    }
2519
2520    #[fuchsia::test]
2521    async fn test_mark_for_deletion() {
2522        const STORE_OBJECT_ID: u64 = 99;
2523        let (fs, allocator) = test_fs().await;
2524
2525        // Allocate some stuff.
2526        let initial_allocated_bytes = allocator.get_allocated_bytes();
2527        let mut device_ranges = collect_allocations(&allocator).await;
2528        let mut transaction =
2529            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2530        // Note we have a cap on individual allocation length so we allocate over multiple mutation.
2531        for _ in 0..15 {
2532            device_ranges.push(
2533                allocator
2534                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2535                    .await
2536                    .expect("allocate failed"),
2537            );
2538            device_ranges.push(
2539                allocator
2540                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2541                    .await
2542                    .expect("allocate2 failed"),
2543            );
2544        }
2545        transaction.commit().await.expect("commit failed");
2546        check_allocations(&allocator, &device_ranges).await;
2547
2548        assert_eq!(
2549            allocator.get_allocated_bytes(),
2550            initial_allocated_bytes + fs.block_size() * 3000
2551        );
2552
2553        // Mark for deletion.
2554        let mut transaction =
2555            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2556        allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
2557        transaction.commit().await.expect("commit failed");
2558
2559        // Expect that allocated bytes is updated immediately but device ranges are still allocated.
2560        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
2561        check_allocations(&allocator, &device_ranges).await;
2562
2563        // Allocate more space than we have until we deallocate the mark_for_deletion space.
2564        // This should force a flush on allocate(). (1500 * 3 > test_fs size of 4096 blocks).
2565        device_ranges.clear();
2566
2567        let mut transaction =
2568            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2569        let target_bytes = 1500 * fs.block_size();
2570        while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
2571            let len = std::cmp::min(
2572                target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
2573                100 * fs.block_size(),
2574            );
2575            device_ranges.push(
2576                allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
2577            );
2578        }
2579        transaction.commit().await.expect("commit failed");
2580
2581        // Have the deleted ranges cleaned up.
2582        allocator.flush().await.expect("flush failed");
2583
2584        // The flush above seems to trigger an allocation for the allocator itself.
2585        // We will just check that we have the right size for the owner we care about.
2586
2587        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2588        assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes,);
2589    }
2590
2591    async fn create_file(store: &Arc<ObjectStore>, size: usize) {
2592        let root_directory =
2593            Directory::open(store, store.root_directory_object_id()).await.expect("open failed");
2594
2595        let mut transaction = store
2596            .filesystem()
2597            .new_transaction(
2598                lock_keys![LockKey::object(
2599                    store.store_object_id(),
2600                    store.root_directory_object_id()
2601                )],
2602                Options::default(),
2603            )
2604            .await
2605            .expect("new_transaction failed");
2606        let file = root_directory
2607            .create_child_file(&mut transaction, &format!("foo {}", size))
2608            .await
2609            .expect("create_child_file failed");
2610        transaction.commit().await.expect("commit failed");
2611
2612        let buffer = file.allocate_buffer(size).await;
2613
2614        // Append some data to it.
2615        let mut transaction = file
2616            .new_transaction_with_options(Options {
2617                borrow_metadata_space: true,
2618                ..Default::default()
2619            })
2620            .await
2621            .expect("new_transaction_with_options failed");
2622        file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2623        transaction.commit().await.expect("commit failed");
2624    }
2625
2626    #[fuchsia::test]
2627    async fn test_replay_with_deleted_store_and_compaction() {
2628        let (fs, _) = test_fs().await;
2629
2630        const FILE_SIZE: usize = 10_000_000;
2631
2632        let mut store_id = {
2633            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2634            let store = root_vol
2635                .new_volume("vol", NewChildStoreOptions::default())
2636                .await
2637                .expect("new_volume failed");
2638
2639            create_file(&store, FILE_SIZE).await;
2640            store.store_object_id()
2641        };
2642
2643        fs.close().await.expect("close failed");
2644        let device = fs.take_device().await;
2645        device.reopen(false);
2646
2647        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2648
2649        // Compact so that when we replay the transaction to delete the store won't find any
2650        // mutations.
2651        fs.journal().force_compact().await.expect("compact failed");
2652
2653        for _ in 0..2 {
2654            {
2655                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2656
2657                let transaction = fs
2658                    .clone()
2659                    .new_transaction(
2660                        lock_keys![
2661                            LockKey::object(
2662                                root_vol.volume_directory().store().store_object_id(),
2663                                root_vol.volume_directory().object_id(),
2664                            ),
2665                            LockKey::flush(store_id)
2666                        ],
2667                        Options { borrow_metadata_space: true, ..Default::default() },
2668                    )
2669                    .await
2670                    .expect("new_transaction failed");
2671                root_vol
2672                    .delete_volume("vol", transaction, || {})
2673                    .await
2674                    .expect("delete_volume failed");
2675
2676                let store = root_vol
2677                    .new_volume("vol", NewChildStoreOptions::default())
2678                    .await
2679                    .expect("new_volume failed");
2680                create_file(&store, FILE_SIZE).await;
2681                store_id = store.store_object_id();
2682            }
2683
2684            fs.close().await.expect("close failed");
2685            let device = fs.take_device().await;
2686            device.reopen(false);
2687
2688            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2689        }
2690
2691        fsck(fs.clone()).await.expect("fsck failed");
2692        fs.close().await.expect("close failed");
2693    }
2694
2695    #[fuchsia::test(threads = 4)]
2696    async fn test_compaction_delete_race() {
2697        let (fs, _allocator) = test_fs().await;
2698
2699        {
2700            const FILE_SIZE: usize = 10_000_000;
2701
2702            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2703            let store = root_vol
2704                .new_volume("vol", NewChildStoreOptions::default())
2705                .await
2706                .expect("new_volume failed");
2707
2708            create_file(&store, FILE_SIZE).await;
2709
2710            // Race compaction with deleting a store.
2711            let fs_clone = fs.clone();
2712
2713            // Even though the executor has 4 threads, it's hard to get it to run with
2714            // multiple threads.
2715            let executor_tasks = testing::force_executor_threads_to_run(4).await;
2716
2717            let task = fasync::Task::spawn(async move {
2718                fs_clone.journal().force_compact().await.expect("compact failed");
2719            });
2720
2721            // We don't need the executor tasks any more.
2722            drop(executor_tasks);
2723
2724            // This range is chosen such that it caused this test to fail after quite a low number
2725            // of iterations for the bug that this test was introduced for.
2726            let sleep = rand::random_range(3000..6000);
2727            std::thread::sleep(std::time::Duration::from_micros(sleep));
2728            log::info!("sleep {sleep}us");
2729
2730            let transaction = fs
2731                .clone()
2732                .new_transaction(
2733                    lock_keys![
2734                        LockKey::object(
2735                            root_vol.volume_directory().store().store_object_id(),
2736                            root_vol.volume_directory().object_id(),
2737                        ),
2738                        LockKey::flush(store.store_object_id())
2739                    ],
2740                    Options { borrow_metadata_space: true, ..Default::default() },
2741                )
2742                .await
2743                .expect("new_transaction failed");
2744            root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");
2745
2746            task.await;
2747        }
2748
2749        fs.journal().force_compact().await.expect("compact failed");
2750        fs.close().await.expect("close failed");
2751
2752        let device = fs.take_device().await;
2753        device.reopen(false);
2754
2755        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2756        fsck(fs.clone()).await.expect("fsck failed");
2757        fs.close().await.expect("close failed");
2758    }
2759
2760    #[fuchsia::test]
2761    async fn test_delete_multiple_volumes() {
2762        let (mut fs, _) = test_fs().await;
2763
2764        for _ in 0..50 {
2765            {
2766                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2767                let store = root_vol
2768                    .new_volume("vol", NewChildStoreOptions::default())
2769                    .await
2770                    .expect("new_volume failed");
2771
2772                create_file(&store, 1_000_000).await;
2773
2774                let transaction = fs
2775                    .clone()
2776                    .new_transaction(
2777                        lock_keys![
2778                            LockKey::object(
2779                                root_vol.volume_directory().store().store_object_id(),
2780                                root_vol.volume_directory().object_id(),
2781                            ),
2782                            LockKey::flush(store.store_object_id())
2783                        ],
2784                        Options { borrow_metadata_space: true, ..Default::default() },
2785                    )
2786                    .await
2787                    .expect("new_transaction failed");
2788                root_vol
2789                    .delete_volume("vol", transaction, || {})
2790                    .await
2791                    .expect("delete_volume failed");
2792
2793                fs.allocator().flush().await.expect("flush failed");
2794            }
2795
2796            fs.close().await.expect("close failed");
2797            let device = fs.take_device().await;
2798            device.reopen(false);
2799
2800            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2801        }
2802
2803        fsck(fs.clone()).await.expect("fsck failed");
2804        fs.close().await.expect("close failed");
2805    }
2806
2807    #[fuchsia::test]
2808    async fn test_allocate_free_reallocate() {
2809        const STORE_OBJECT_ID: u64 = 99;
2810        let (fs, allocator) = test_fs().await;
2811
2812        // Allocate some stuff.
2813        let mut device_ranges = Vec::new();
2814        let mut transaction =
2815            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2816        for _ in 0..30 {
2817            device_ranges.push(
2818                allocator
2819                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2820                    .await
2821                    .expect("allocate failed"),
2822            );
2823        }
2824        transaction.commit().await.expect("commit failed");
2825
2826        assert_eq!(
2827            fs.block_size() * 3000,
2828            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2829        );
2830
2831        // Delete it all.
2832        let mut transaction =
2833            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2834        for range in std::mem::replace(&mut device_ranges, Vec::new()) {
2835            allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
2836        }
2837        transaction.commit().await.expect("commit failed");
2838
2839        assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());
2840
2841        // Allocate some more stuff. Due to storage pressure, this requires us to flush device
2842        // before reusing the above space
2843        let mut transaction =
2844            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2845        let target_len = 1500 * fs.block_size();
2846        while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
2847            let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
2848            device_ranges.push(
2849                allocator
2850                    .allocate(&mut transaction, STORE_OBJECT_ID, len)
2851                    .await
2852                    .expect("allocate failed"),
2853            );
2854        }
2855        transaction.commit().await.expect("commit failed");
2856
2857        assert_eq!(
2858            fs.block_size() * 1500,
2859            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2860        );
2861    }
2862
2863    #[fuchsia::test]
2864    async fn test_flush() {
2865        const STORE_OBJECT_ID: u64 = 99;
2866
2867        let mut device_ranges = Vec::new();
2868        let device = {
2869            let (fs, allocator) = test_fs().await;
2870            let mut transaction = fs
2871                .clone()
2872                .new_transaction(lock_keys![], Options::default())
2873                .await
2874                .expect("new failed");
2875            device_ranges.push(
2876                allocator
2877                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2878                    .await
2879                    .expect("allocate failed"),
2880            );
2881            device_ranges.push(
2882                allocator
2883                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2884                    .await
2885                    .expect("allocate failed"),
2886            );
2887            device_ranges.push(
2888                allocator
2889                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2890                    .await
2891                    .expect("allocate failed"),
2892            );
2893            transaction.commit().await.expect("commit failed");
2894
2895            allocator.flush().await.expect("flush failed");
2896
2897            fs.close().await.expect("close failed");
2898            fs.take_device().await
2899        };
2900
2901        device.reopen(false);
2902        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2903        let allocator = fs.allocator();
2904
2905        let allocated = collect_allocations(&allocator).await;
2906
2907        // Make sure the ranges we allocated earlier are still allocated.
2908        for i in &device_ranges {
2909            let mut overlapping = 0;
2910            for j in &allocated {
2911                overlapping += overlap(i, j);
2912            }
2913            assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
2914        }
2915
2916        let mut transaction =
2917            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2918        let range = allocator
2919            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2920            .await
2921            .expect("allocate failed");
2922
2923        // Make sure the range just allocated doesn't overlap any other allocated ranges.
2924        for r in &allocated {
2925            assert_eq!(overlap(r, &range), 0);
2926        }
2927        transaction.commit().await.expect("commit failed");
2928    }
2929
2930    #[fuchsia::test]
2931    async fn test_dropped_transaction() {
2932        const STORE_OBJECT_ID: u64 = 99;
2933        let (fs, allocator) = test_fs().await;
2934        let allocated_range = {
2935            let mut transaction = fs
2936                .clone()
2937                .new_transaction(lock_keys![], Options::default())
2938                .await
2939                .expect("new_transaction failed");
2940            allocator
2941                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2942                .await
2943                .expect("allocate failed")
2944        };
2945        // After dropping the transaction and attempting to allocate again, we should end up with
2946        // the same range because the reservation should have been released.
2947        let mut transaction = fs
2948            .clone()
2949            .new_transaction(lock_keys![], Options::default())
2950            .await
2951            .expect("new_transaction failed");
2952        assert_eq!(
2953            allocator
2954                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2955                .await
2956                .expect("allocate failed"),
2957            allocated_range
2958        );
2959    }
2960
2961    #[fuchsia::test]
2962    async fn test_cleanup_removed_owner() {
2963        const STORE_OBJECT_ID: u64 = 99;
2964        let device = {
2965            let (fs, allocator) = test_fs().await;
2966
2967            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2968            {
2969                let mut transaction =
2970                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
2971                allocator
2972                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2973                    .await
2974                    .expect("Allocating");
2975                transaction.commit().await.expect("Committing.");
2976            }
2977            allocator.flush().await.expect("Flushing");
2978            assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2979            {
2980                let mut transaction =
2981                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
2982                allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
2983                transaction.commit().await.expect("Committing.");
2984            }
2985            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2986            fs.close().await.expect("Closing");
2987            fs.take_device().await
2988        };
2989
2990        device.reopen(false);
2991        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2992        let allocator = fs.allocator();
2993        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2994    }
2995
2996    #[fuchsia::test]
2997    async fn test_allocated_bytes() {
2998        const STORE_OBJECT_ID: u64 = 99;
2999        let (fs, allocator) = test_fs().await;
3000
3001        let initial_allocated_bytes = allocator.get_allocated_bytes();
3002
3003        // Verify allocated_bytes reflects allocation changes.
3004        let allocated_bytes = initial_allocated_bytes + fs.block_size();
3005        let allocated_range = {
3006            let mut transaction = fs
3007                .clone()
3008                .new_transaction(lock_keys![], Options::default())
3009                .await
3010                .expect("new_transaction failed");
3011            let range = allocator
3012                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3013                .await
3014                .expect("allocate failed");
3015            transaction.commit().await.expect("commit failed");
3016            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3017            range
3018        };
3019
3020        {
3021            let mut transaction = fs
3022                .clone()
3023                .new_transaction(lock_keys![], Options::default())
3024                .await
3025                .expect("new_transaction failed");
3026            allocator
3027                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3028                .await
3029                .expect("allocate failed");
3030
3031            // Prior to committing, the count of allocated bytes shouldn't change.
3032            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3033        }
3034
3035        // After dropping the prior transaction, the allocated bytes still shouldn't have changed.
3036        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3037
3038        // Verify allocated_bytes reflects deallocations.
3039        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
3040        let mut transaction =
3041            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3042        allocator
3043            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
3044            .await
3045            .expect("deallocate failed");
3046
3047        // Before committing, there should be no change.
3048        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3049
3050        transaction.commit().await.expect("commit failed");
3051
3052        // After committing, all but 40 bytes should remain allocated.
3053        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
3054    }
3055
3056    #[fuchsia::test]
3057    async fn test_persist_bytes_limit() {
3058        const LIMIT: u64 = 12345;
3059        const OWNER_ID: u64 = 12;
3060
3061        let (fs, allocator) = test_fs().await;
3062        {
3063            let mut transaction = fs
3064                .clone()
3065                .new_transaction(lock_keys![], Options::default())
3066                .await
3067                .expect("new_transaction failed");
3068            allocator
3069                .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
3070                .expect("Failed to set limit.");
3071            assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
3072            transaction.commit().await.expect("Failed to commit transaction");
3073            let bytes: u64 = *allocator
3074                .inner
3075                .lock()
3076                .info
3077                .limit_bytes
3078                .get(&OWNER_ID)
3079                .expect("Failed to find limit");
3080            assert_eq!(LIMIT, bytes);
3081        }
3082    }
3083
3084    /// Given a sorted list of non-overlapping ranges, this will coalesce adjacent ranges.
3085    /// This allows comparison of equivalent sets of ranges which may occur due to differences
3086    /// across allocator strategies.
3087    fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
3088        let mut coalesced = Vec::new();
3089        let mut prev: Option<Range<u64>> = None;
3090        for range in ranges {
3091            if let Some(prev_range) = &mut prev {
3092                if range.start == prev_range.end {
3093                    prev_range.end = range.end;
3094                } else {
3095                    coalesced.push(prev_range.clone());
3096                    prev = Some(range);
3097                }
3098            } else {
3099                prev = Some(range);
3100            }
3101        }
3102        if let Some(prev_range) = prev {
3103            coalesced.push(prev_range);
3104        }
3105        coalesced
3106    }
3107
3108    #[fuchsia::test]
3109    async fn test_take_for_trimming() {
3110        const STORE_OBJECT_ID: u64 = 99;
3111
3112        // Allocate a large chunk, then free a few bits of it, so we have free chunks interleaved
3113        // with allocated chunks.
3114        let allocated_range;
3115        let expected_free_ranges;
3116        let device = {
3117            let (fs, allocator) = test_fs().await;
3118            let bs = fs.block_size();
3119            let mut transaction = fs
3120                .clone()
3121                .new_transaction(lock_keys![], Options::default())
3122                .await
3123                .expect("new failed");
3124            allocated_range = allocator
3125                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
3126                .await
3127                .expect("allocate failed");
3128            transaction.commit().await.expect("commit failed");
3129
3130            let mut transaction = fs
3131                .clone()
3132                .new_transaction(lock_keys![], Options::default())
3133                .await
3134                .expect("new failed");
3135            let base = allocated_range.start;
3136            expected_free_ranges = vec![
3137                base..(base + (bs * 1)),
3138                (base + (bs * 2))..(base + (bs * 3)),
3139                // Note that the next three ranges are adjacent and will be treated as one free
3140                // range once applied.  We separate them here to exercise the handling of "large"
3141                // free ranges.
3142                (base + (bs * 4))..(base + (bs * 8)),
3143                (base + (bs * 8))..(base + (bs * 12)),
3144                (base + (bs * 12))..(base + (bs * 13)),
3145                (base + (bs * 29))..(base + (bs * 30)),
3146            ];
3147            for range in &expected_free_ranges {
3148                allocator
3149                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
3150                    .await
3151                    .expect("deallocate failed");
3152            }
3153            transaction.commit().await.expect("commit failed");
3154
3155            allocator.flush().await.expect("flush failed");
3156
3157            fs.close().await.expect("close failed");
3158            fs.take_device().await
3159        };
3160
3161        device.reopen(false);
3162        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
3163        let allocator = fs.allocator();
3164
3165        // These values were picked so that each of them would be the reason why
3166        // collect_free_extents finished, and so we would return after partially processing one of
3167        // the free extents.
3168        let max_extent_size = fs.block_size() as usize * 4;
3169        const EXTENTS_PER_BATCH: usize = 2;
3170        let mut free_ranges = vec![];
3171        let mut offset = allocated_range.start;
3172        while offset < allocated_range.end {
3173            let free = allocator
3174                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
3175                .await
3176                .expect("take_for_trimming failed");
3177            free_ranges.extend(
3178                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
3179            );
3180            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
3181        }
3182        // Coalesce adjacent free ranges because the allocator may return smaller aligned chunks
3183        // but the overall range should always be equivalent.
3184        let coalesced_free_ranges = coalesce_ranges(free_ranges);
3185        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);
3186
3187        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
3188    }
3189
3190    #[fuchsia::test]
3191    async fn test_allocations_wait_for_free_extents() {
3192        const STORE_OBJECT_ID: u64 = 99;
3193        let (fs, allocator) = test_fs().await;
3194        let allocator_clone = allocator.clone();
3195
3196        let mut transaction =
3197            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3198
3199        // Tie up all of the free extents on the device, and make sure allocations block.
3200        let max_extent_size = fs.device().size() as usize;
3201        const EXTENTS_PER_BATCH: usize = usize::MAX;
3202
3203        // HACK: Treat `trimmable_extents` as being locked by `trim_done` (i.e. it should only be
3204        // accessed whilst `trim_done` is locked). We can't combine them into the same mutex,
3205        // because the inner type would be "poisoned" by the lifetime parameter of
3206        // `trimmable_extents` (which is in the lifetime of `allocator`), and then we can't move it
3207        // into `alloc_task` which would require a `'static` lifetime.
3208        let trim_done = Arc::new(Mutex::new(false));
3209        let trimmable_extents = allocator
3210            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
3211            .await
3212            .expect("take_for_trimming failed");
3213
3214        let trim_done_clone = trim_done.clone();
3215        let bs = fs.block_size();
3216        let alloc_task = fasync::Task::spawn(async move {
3217            allocator_clone
3218                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
3219                .await
3220                .expect("allocate should fail");
3221            {
3222                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
3223            }
3224            transaction.commit().await.expect("commit failed");
3225        });
3226
3227        // Add a small delay to simulate the trim taking some nonzero amount of time.  Otherwise,
3228        // this will almost certainly always beat the allocation attempt.
3229        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
3230
3231        // Once the free extents are released, the task should unblock.
3232        {
3233            let mut trim_done = trim_done.lock();
3234            std::mem::drop(trimmable_extents);
3235            *trim_done = true;
3236        }
3237
3238        alloc_task.await;
3239    }
3240
3241    #[fuchsia::test]
3242    async fn test_allocation_with_reservation_not_multiple_of_block_size() {
3243        const STORE_OBJECT_ID: u64 = 99;
3244        let (fs, allocator) = test_fs().await;
3245
3246        // Reserve an amount that isn't a multiple of the block size.
3247        const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
3248        let reservation =
3249            allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();
3250
3251        let mut transaction = fs
3252            .clone()
3253            .new_transaction(
3254                lock_keys![],
3255                Options { allocator_reservation: Some(&reservation), ..Options::default() },
3256            )
3257            .await
3258            .expect("new failed");
3259
3260        let range = allocator
3261            .allocate(
3262                &mut transaction,
3263                STORE_OBJECT_ID,
3264                round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
3265            )
3266            .await
3267            .expect("allocate faiiled");
3268        assert_eq!((range.end - range.start) % fs.block_size(), 0);
3269
3270        println!("{}", range.end - range.start);
3271    }
3272}