fxfs/object_store/allocator.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! # The Allocator
6//!
7//! The allocator in Fxfs is a filesystem-wide entity responsible for managing the allocation of
8//! regions of the device to "owners" (which are `ObjectStore`s).
9//!
10//! Allocations are tracked in an LSMTree with coalescing used to merge neighboring allocations
11//! with the same properties (owner and reference count). As of writing, reference counting is not
12//! used. (Reference counts are intended for future use if/when snapshot support is implemented.)
13//!
14//! There are a number of important implementation features of the allocator that warrant further
15//! documentation.
16//!
17//! ## Byte limits
18//!
19//! Fxfs is a multi-volume filesystem. Fuchsia with fxblob currently uses two primary volumes -
20//! an unencrypted volume for blob storage and an encrypted volume for data storage.
21//! Byte limits ensure that no one volume can consume all available storage. This is important
22//! as software updates must always be possible (blobs) and conversely configuration data should
23//! always be writable (data).
24//!
25//! ## Reservation tracking
26//!
27//! Fxfs on Fuchsia leverages write-back caching which allows us to buffer writes in RAM until
28//! memory pressure, an explicit flush or a file close requires us to persist data to storage.
29//!
30//! To ensure that we do not over-commit in such cases (e.g. by writing many files to RAM only to
31//! find out that there is insufficient disk space to store them all), the allocator includes a simple
32//! reservation tracking mechanism.
33//!
34//! Reservation tracking is implemented hierarchically such that a reservation can portion out
35//! sub-reservations, each of which may be "reserved" or "forgotten" when it comes time to actually
36//! allocate space.
37//!
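//! A minimal sketch of how the reservation types defined below fit together (illustrative only;
//! `owner` is assumed to be something implementing `ReservationOwner`, and `owner_object_id` a
//! volume's object ID):
//!
//! ```ignore
//! // A 1 MiB reservation accounted against `owner_object_id`.
//! let reservation = Reservation::new(owner, Some(owner_object_id), 1 << 20);
//! // Portion out a 64 KiB sub-reservation (a `Hold`) for a single write-back operation.
//! let hold: Hold<'_> = reservation.reserve(64 << 10).expect("reservation exhausted");
//! // If the operation is abandoned, dropping the hold hands the bytes back to the parent;
//! // otherwise the caller consumes it (e.g. via `forget`) once the space is actually allocated.
//! drop(hold);
//! assert_eq!(reservation.amount(), 1 << 20);
//! ```
//!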
38//! ## Fixed locations
39//!
40//! The only fixed-location files in Fxfs are the first 512KiB extents of the two superblocks,
41//! which sit at the very start of the disk (i.e. the first 1MiB). The allocator manages these
42//! locations, but does so via a 'mark_allocated' method, distinct from all other allocations,
43//! for which the location is left up to the allocator.
44//!
45//! ## Deallocated unusable regions
46//!
47//! It is not legal to reuse a deallocated disk region until after a flush. Transactions
48//! are not guaranteed to be persisted until after a successful flush so any reuse of
49//! a region before then followed by power loss may lead to corruption.
50//!
51//! e.g. Consider if we deallocate a file, reuse its extent for another file, then crash after
52//! writing to the new file but not yet flushing the journal. At next mount we will have lost the
53//! transaction despite overwriting the original file's data, leading to inconsistency errors (data
54//! corruption).
55//!
56//! These regions are currently tracked in RAM in the allocator.
57//!
58//! ## Allocated but uncommitted regions
59//!
60//! These are allocated regions that are not (yet) persisted to disk. They are regular
61//! file allocations, but are not stored on persistent storage until their transaction is committed
62//! and safely flushed to disk.
63//!
64//! ## TRIMed unusable regions
65//!
66//! We periodically TRIM unallocated regions to give the SSD controller insight into which
67//! parts of the device contain data. While a TRIM batch is in flight, the affected regions are
68//! held as temporary allocations so that they cannot be reused until the operation completes.
69//!
70//! ## Volume deletion
71//!
72//! We make use of an optimisation in the case where an entire volume is deleted. In such cases,
73//! rather than individually deallocate all disk regions associated with that volume, we make
74//! note of the deletion and perform a special merge operation on the next LSMTree compaction that
75//! filters out allocations for the deleted volume.
76//!
77//! This is designed to make dropping of volumes significantly cheaper, but it does add some
78//! additional complexity to any allocator implementation that maintains data structures tracking
79//! free space (rather than just allocated space).
80//!
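//! On the read side this shows up in `Allocator::filter` (defined below), which skips both
//! tombstones and any allocations belonging to owners in the marked-for-deletion set, so a
//! deleted volume's extents behave as free space even before compaction physically drops them:
//!
//! ```ignore
//! let iter =
//!     filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
//! ```
//!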
81//! ## Image generation
82//!
83//! The Fuchsia build process requires building an initial filesystem image. In the case of
84//! fxblob-based boards, this is an Fxfs filesystem containing a volume with the base set of
85//! blobs required to bootstrap the system. When we build such an image, we want it to be as compact
86//! as possible as we're potentially packaging it up for distribution. To that end, our allocation
87//! strategy (or at least the strategy used for image generation) should prefer to allocate from the
88//! start of the device wherever possible.
89pub mod merge;
90pub mod strategy;
91
92use crate::drop_event::DropEvent;
93use crate::errors::FxfsError;
94use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
95use crate::log::*;
96use crate::lsm_tree::cache::NullCache;
97use crate::lsm_tree::skip_list_layer::SkipListLayer;
98use crate::lsm_tree::types::{
99    FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
100    OrdUpperBound, RangeKey, SortByU64, Value,
101};
102use crate::lsm_tree::{LSMTree, Query, layers_from_handles};
103use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
104use crate::object_store::object_manager::ReservationUpdate;
105use crate::object_store::transaction::{
106    AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
107};
108use crate::object_store::{
109    DataObjectHandle, DirectWriter, HandleOptions, ObjectStore, ReservedId, tree,
110};
111use crate::range::RangeExt;
112use crate::round::{round_div, round_down, round_up};
113use crate::serialized_types::{
114    DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
115};
116use anyhow::{Context, Error, anyhow, bail, ensure};
117use async_trait::async_trait;
118use either::Either::{Left, Right};
119use event_listener::EventListener;
120use fprint::TypeFingerprint;
121use fuchsia_inspect::HistogramProperty;
122use fuchsia_sync::Mutex;
123use futures::FutureExt;
124use merge::{filter_marked_for_deletion, filter_tombstones, merge};
125use serde::{Deserialize, Serialize};
126use std::borrow::Borrow;
127use std::collections::{BTreeMap, HashSet, VecDeque};
128use std::hash::Hash;
129use std::marker::PhantomData;
130use std::num::{NonZero, Saturating};
131use std::ops::Range;
132use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
133use std::sync::{Arc, Weak};
134
135/// This trait is implemented by things that own reservations.
136pub trait ReservationOwner: Send + Sync {
137    /// Report that bytes are being released from the reservation back to the |ReservationOwner|
138    /// where |owner_object_id| is the owner under the root object store associated with the
139    /// reservation.
140    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
141}
142
143/// A reservation guarantees that when it comes time to actually allocate, it will not fail due to
144/// lack of space.  Sub-reservations (a.k.a. holds) are possible which effectively allows part of a
145/// reservation to be set aside until it's time to commit.  Reservations do offer some
146/// thread-safety, but some responsibility is borne by the caller: e.g. calling `forget` and
147/// `reserve` at the same time from different threads is unsafe. Reservations have an
148/// |owner_object_id| which associates them with an object under the root object store that the
149/// reservation is accounted against.
150pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
151    owner: T,
152    owner_object_id: Option<u64>,
153    inner: Mutex<ReservationInner>,
154    phantom: PhantomData<U>,
155}
156
157#[derive(Debug, Default)]
158struct ReservationInner {
159    // Amount currently held by this reservation.
160    amount: u64,
161
162    // Amount reserved by sub-reservations.
163    reserved: u64,
164}
165
166impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        self.inner.lock().fmt(f)
169    }
170}
171
172impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
173    pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
174        Self {
175            owner,
176            owner_object_id,
177            inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
178            phantom: PhantomData,
179        }
180    }
181
182    pub fn owner_object_id(&self) -> Option<u64> {
183        self.owner_object_id
184    }
185
186    /// Returns the total amount of the reservation, not accounting for anything that might be held.
187    pub fn amount(&self) -> u64 {
188        self.inner.lock().amount
189    }
190
191    /// Adds more to the reservation.
192    pub fn add(&self, amount: u64) {
193        self.inner.lock().amount += amount;
194    }
195
196    /// Returns the entire amount of the reservation.  The caller is responsible for maintaining
197    /// consistency, i.e. updating counters, etc, and there can be no sub-reservations (an assert
198    /// will fire otherwise).
199    pub fn forget(&self) -> u64 {
200        let mut inner = self.inner.lock();
201        assert_eq!(inner.reserved, 0);
202        std::mem::take(&mut inner.amount)
203    }
204
205    /// Takes some of the reservation.  The caller is responsible for maintaining consistency,
206    /// i.e. updating counters, etc.  This will assert that the amount being forgotten does not
207    /// exceed the available reservation amount; the caller should ensure that this is the case.
208    pub fn forget_some(&self, amount: u64) {
209        let mut inner = self.inner.lock();
210        inner.amount -= amount;
211        assert!(inner.reserved <= inner.amount);
212    }
213
214    /// Returns a partial amount of the reservation. The |amount| callback is passed the maximum
215    /// available amount and should return how much to take, which can be zero.
216    fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
217        let mut inner = self.inner.lock();
218        let taken = amount(inner.amount - inner.reserved);
219        inner.reserved += taken;
220        ReservationImpl::new(self, self.owner_object_id, taken)
221    }
222
223    /// Reserves *exactly* amount if possible.
224    pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
225        let mut inner = self.inner.lock();
226        if inner.amount - inner.reserved < amount {
227            None
228        } else {
229            inner.reserved += amount;
230            Some(ReservationImpl::new(self, self.owner_object_id, amount))
231        }
232    }
233
234    /// Commits a previously reserved amount from this reservation.  The caller is responsible for
235    /// ensuring the amount was reserved.
236    pub fn commit(&self, amount: u64) {
237        let mut inner = self.inner.lock();
238        inner.reserved -= amount;
239        inner.amount -= amount;
240    }
241
242    /// Returns some of the reservation.
243    pub fn give_back(&self, amount: u64) {
244        self.owner.borrow().release_reservation(self.owner_object_id, amount);
245        let mut inner = self.inner.lock();
246        inner.amount -= amount;
247        assert!(inner.reserved <= inner.amount);
248    }
249
250    /// Moves `amount` from this reservation to another reservation.
251    pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
252        &self,
253        other: &ReservationImpl<V, W>,
254        amount: u64,
255    ) {
256        assert_eq!(self.owner_object_id, other.owner_object_id());
257        let mut inner = self.inner.lock();
258        if let Some(amount) = inner.amount.checked_sub(amount) {
259            inner.amount = amount;
260        } else {
261            std::mem::drop(inner);
262            panic!("Insufficient reservation space");
263        }
264        other.add(amount);
265    }
266}
267
268impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
269    fn drop(&mut self) {
270        let inner = self.inner.get_mut();
271        assert_eq!(inner.reserved, 0);
272        let owner_object_id = self.owner_object_id;
273        if inner.amount > 0 {
274            self.owner
275                .borrow()
276                .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
277        }
278    }
279}
280
281impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
282    for ReservationImpl<T, U>
283{
284    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
285        // Sub-reservations should belong to the same owner (or lack thereof).
286        assert_eq!(owner_object_id, self.owner_object_id);
287        let mut inner = self.inner.lock();
288        assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
289        inner.reserved -= amount;
290    }
291}
292
293pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;
294
295pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;
296
297/// Our allocator implementation tracks extents with a reference count.  At time of writing, these
298/// reference counts should never exceed 1, but that might change with snapshots and clones.
299pub type AllocatorKey = AllocatorKeyV32;
300
301#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, TypeFingerprint, Versioned)]
302#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
303pub struct AllocatorKeyV32 {
304    pub device_range: Range<u64>,
305}
306
307impl SortByU64 for AllocatorKey {
308    fn get_leading_u64(&self) -> u64 {
309        self.device_range.start
310    }
311}
312
313const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;
314
315pub struct AllocatorKeyPartitionIterator {
316    device_range: Range<u64>,
317}
318
319impl Iterator for AllocatorKeyPartitionIterator {
320    type Item = u64;
321
322    fn next(&mut self) -> Option<Self::Item> {
323        if self.device_range.start >= self.device_range.end {
324            None
325        } else {
326            let start = self.device_range.start;
327            self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
328            let end = std::cmp::min(self.device_range.start, self.device_range.end);
329            let key = AllocatorKey { device_range: start..end };
330            let hash = crate::stable_hash::stable_hash(key);
331            Some(hash)
332        }
333    }
334}
335
336impl FuzzyHash for AllocatorKey {
337    fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
338        AllocatorKeyPartitionIterator {
339            device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
340                ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
341        }
342    }
343
344    fn is_range_key(&self) -> bool {
345        true
346    }
347}
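// Illustrative example (not part of the original file): `fuzzy_hash` buckets a key's device
// range into `EXTENT_HASH_BUCKET_SIZE` (1 MiB) chunks and yields one hash per bucket touched.
#[cfg(test)]
mod fuzzy_hash_example {
    use super::*;

    #[test]
    fn range_spanning_three_buckets_yields_three_hashes() {
        // 500_000..2_500_000 rounds out to 0..3 MiB, i.e. three 1 MiB buckets.
        let key = AllocatorKey { device_range: 500_000..2_500_000 };
        assert_eq!(key.fuzzy_hash().count(), 3);
    }
}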
348
349impl AllocatorKey {
350    /// Returns a new key that is a lower bound suitable for use with merge_into.
351    pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
352        AllocatorKey { device_range: 0..self.device_range.start }
353    }
354}
355
356impl LayerKey for AllocatorKey {
357    fn merge_type(&self) -> MergeType {
358        MergeType::OptimizedMerge
359    }
360}
361
362impl OrdUpperBound for AllocatorKey {
363    fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
364        self.device_range.end.cmp(&other.device_range.end)
365    }
366}
367
368impl OrdLowerBound for AllocatorKey {
369    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
370        // The ordering over range.end is significant here as it is used in
371        // the heap ordering that feeds into our merge function and
372        // a total ordering over range lets us remove a symmetry case from
373        // the allocator merge function.
374        self.device_range
375            .start
376            .cmp(&other.device_range.start)
377            .then(self.device_range.end.cmp(&other.device_range.end))
378    }
379}
380
381impl Ord for AllocatorKey {
382    fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
383        self.device_range
384            .start
385            .cmp(&other.device_range.start)
386            .then(self.device_range.end.cmp(&other.device_range.end))
387    }
388}
389
390impl PartialOrd for AllocatorKey {
391    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
392        Some(self.cmp(other))
393    }
394}
395
396impl RangeKey for AllocatorKey {
397    fn overlaps(&self, other: &Self) -> bool {
398        self.device_range.start < other.device_range.end
399            && self.device_range.end > other.device_range.start
400    }
401}
402
403/// Allocations are "owned" by a single ObjectStore and are reference counted
404/// (for future snapshot/clone support).
405pub type AllocatorValue = AllocatorValueV32;
406impl Value for AllocatorValue {
407    const DELETED_MARKER: Self = Self::None;
408}
409
410#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
411#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
412pub enum AllocatorValueV32 {
413    // Tombstone variant indicating an extent is no longer allocated.
414    None,
415    // Used when we know there are no possible allocations below us in the stack.
416    // This is currently all the time. We used to have a related Delta type but
417    // it has been removed due to correctness issues (https://fxbug.dev/42179428).
418    Abs { count: u64, owner_object_id: u64 },
419}
420
421pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;
422
423/// Serialized information about the allocator.
424pub type AllocatorInfo = AllocatorInfoV32;
425
426#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
427pub struct AllocatorInfoV32 {
428    /// Holds the set of layer file object_id for the LSM tree (newest first).
429    pub layers: Vec<u64>,
430    /// Maps from owner_object_id to bytes allocated.
431    pub allocated_bytes: BTreeMap<u64, u64>,
432    /// Set of owner_object_id that we should ignore if found in layer files.  For now, this should
433    /// always be empty on-disk because we always do full compactions.
434    pub marked_for_deletion: HashSet<u64>,
435    /// The limit, in bytes, on allocations for each `owner_object_id`.  If there is no limit
436    /// present here for an `owner_object_id`, assume it is u64::MAX.
437    pub limit_bytes: BTreeMap<u64, u64>,
438}
439
440const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;
441
442/// Computes the target maximum extent size based on the block size of the allocator.
443pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
444    // Each block in an extent contains an 8-byte checksum (which due to varint encoding is 9
445    // bytes), and a given extent record must be no larger than DEFAULT_MAX_SERIALIZED_RECORD_SIZE.  We
446    // also need to leave a bit of room (arbitrarily, 64 bytes) for the rest of the extent's
447    // metadata.
448    block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
449}
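// Worked example (illustrative; the real value of DEFAULT_MAX_SERIALIZED_RECORD_SIZE lives in
// `serialized_types`): if that constant were 4096 bytes, a 4 KiB block size would give
// 4096 * (4096 - 64) / 9 = 1,835,008 bytes, i.e. a maximum extent size of 1.75 MiB (448 blocks).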
450
451#[derive(Default)]
452struct AllocatorCounters {
453    num_flushes: u64,
454    last_flush_time: Option<std::time::SystemTime>,
455}
456
457pub struct Allocator {
458    filesystem: Weak<FxFilesystem>,
459    block_size: u64,
460    device_size: u64,
461    object_id: u64,
462    max_extent_size_bytes: u64,
463    tree: LSMTree<AllocatorKey, AllocatorValue>,
464    // A list of allocations which are temporary; i.e. they are not actually stored in the LSM tree,
465    // but we still don't want to be able to allocate over them.  This layer is merged into the
466    // allocator's LSM tree whilst reading it, so the allocations appear to exist in the LSM tree.
467    // This is used in a few places, for example to hold back allocations that have been added to a
468    // transaction but are not yet committed, or to prevent the reuse of a deallocated extent
469    // until the device is flushed.
470    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
471    inner: Mutex<Inner>,
472    allocation_mutex: futures::lock::Mutex<()>,
473    counters: Mutex<AllocatorCounters>,
474    maximum_offset: AtomicU64,
475    allocations_allowed: AtomicBool,
476}
477
478/// Tracks the different stages of byte allocations for an individual owner.
479#[derive(Debug, Default, PartialEq)]
480struct ByteTracking {
481    /// This value is the up-to-date count of the number of allocated bytes per owner_object_id
482    /// whereas the value in `Info::allocated_bytes` is the value as it was when we last flushed.
483    /// This field should be regarded as *untrusted*; it can be invalid due to filesystem
484    /// inconsistencies, and this is why it is `Saturating<u64>` rather than just `u64`.  Any
485    /// computations that use this value should typically propagate `Saturating` so that it is clear
486    /// the value is *untrusted*.
487    allocated_bytes: Saturating<u64>,
488
489    /// This value is the number of bytes allocated to uncommitted allocations.
490    /// (Bytes allocated, but not yet persisted to disk)
491    uncommitted_allocated_bytes: u64,
492
493    /// This value is the number of bytes allocated to reservations.
494    reserved_bytes: u64,
495
496    /// The number of bytes in committed deallocations that we cannot use until they are
497    /// flushed to the device.  (The per-entry bookkeeping, including log offsets, lives in
498    /// `Inner::committed_deallocated`.)
499    committed_deallocated_bytes: u64,
500}
501
502impl ByteTracking {
503    // Returns the total number of bytes that are taken either from reservations, allocations or
504    // uncommitted allocations.
505    fn used_bytes(&self) -> Saturating<u64> {
506        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
507    }
508
509    // Returns the amount that is not available to be allocated, which includes actually allocated
510    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
511    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
512    // reuse those bytes yet.
513    fn unavailable_bytes(&self) -> Saturating<u64> {
514        self.allocated_bytes
515            + Saturating(self.uncommitted_allocated_bytes)
516            + Saturating(self.committed_deallocated_bytes)
517    }
518
519    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
520    // require a device flush to be reused.
521    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
522        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
523    }
524}
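// Illustrative example (not part of the original file) of how the ByteTracking views relate.
#[cfg(test)]
mod byte_tracking_example {
    use super::*;

    #[test]
    fn accounting_views() {
        let tracking = ByteTracking {
            allocated_bytes: Saturating(100),
            uncommitted_allocated_bytes: 10,
            reserved_bytes: 5,
            committed_deallocated_bytes: 20,
        };
        // Reservations count towards usage but not towards unavailability...
        assert_eq!(tracking.used_bytes(), Saturating(115));
        // ...whereas committed-but-unflushed deallocations count the other way around.
        assert_eq!(tracking.unavailable_bytes(), Saturating(130));
        // After a device flush, the deallocated bytes become available again.
        assert_eq!(tracking.unavailable_after_sync_bytes(), Saturating(110));
    }
}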
525
526#[derive(Debug)]
527struct CommittedDeallocation {
528    // The offset at which this deallocation was committed.
529    log_file_offset: u64,
530    // The device range being deallocated.
531    range: Range<u64>,
532    // The owning object id which originally allocated it.
533    owner_object_id: u64,
534}
535
536struct Inner {
537    info: AllocatorInfo,
538
539    /// The allocator can only be opened if there have been no allocations and it has not already
540    /// been opened or initialized.
541    opened: bool,
542
543    /// When we allocate a range from RAM, we add it to Allocator::temporary_allocations.
544    /// This layer is added to layer_set in rebuild_strategy and take_for_trimming so we don't assume
545    /// the range is available.
546    /// If we apply the mutation, we move it from `temporary_allocations` to the LSMTree.
547    /// If we drop the mutation, we just delete it from `temporary_allocations` and call `free`.
548    ///
549    /// We need to be very careful about races when we move the allocation into the LSMTree.
550    /// A layer_set is a snapshot in time of a set of layers. Say:
551    ///  1. `rebuild_strategy` takes a layer_set that includes the mutable layer.
552    ///  2. A compaction operation seals that mutable layer and creates a new mutable layer.
553    ///     Note that the layer_set in rebuild_strategy doesn't have this new mutable layer.
554    ///  3. An apply_mutation operation adds the allocation to the new mutable layer.
555    ///     It then removes the allocation from `temporary_allocations`.
556    ///  4. `rebuild_strategy` iterates the layer_set, missing both the temporary mutation AND
557    ///     the record in the new mutable layer, making the allocated range available for double
558    ///     allocation and future filesystem corruption.
559    ///
560    /// We don't have (or want) a means to lock compactions during rebuild_strategy operations, so
561    /// to avoid this scenario, we can't remove from temporary_allocations when we apply a mutation.
562    /// Instead we add them to dropped_temporary_allocations and make the actual removals from
563    /// `temporary_allocations` while holding the allocator lock. Because rebuild_strategy only
564    /// runs with this lock held, this guarantees that we don't remove entries from
565    /// `temporary_allocations` while also iterating over it.
566    ///
567    /// A related issue happens when we deallocate a range. We use temporary_allocations this time
568    /// to prevent reuse until after the deallocation has been successfully flushed.
569    /// In this scenario we don't want to call 'free()' until the range is ready to use again.
570    dropped_temporary_allocations: Vec<Range<u64>>,
571
572    /// The per-owner counters for bytes at various stages of the data life-cycle, from initial
573    /// reservation through allocation until the bytes are deallocated and eventually freed for reuse.
574    owner_bytes: BTreeMap<u64, ByteTracking>,
575
576    /// This value is the number of bytes allocated to reservations but not tracked as part of a
577    /// particular volume.
578    unattributed_reserved_bytes: u64,
579
580    /// Committed deallocations that we cannot use until they are flushed to the device.
581    committed_deallocated: VecDeque<CommittedDeallocation>,
582
583    /// Bytes which are currently being trimmed.  These can still be allocated from, but the
584    /// allocation will block until the current batch of trimming is finished.
585    trim_reserved_bytes: u64,
586
587    /// While a trim is being performed, this listener is set.  When the current batch of extents
588    /// being trimmed have been released (and trim_reserved_bytes is 0), this is signaled.
589    /// This should only be listened to while the allocation_mutex is held.
590    trim_listener: Option<EventListener>,
591
592    /// This controls how we allocate our free space to manage fragmentation.
593    strategy: strategy::BestFit,
594
595    /// Tracks the number of allocations of size 1,2,...63,>=64.
596    allocation_size_histogram: [u64; 64],
597    /// Tracks which size bucket triggers rebuild_strategy.
598    rebuild_strategy_trigger_histogram: [u64; 64],
599
600    /// This is distinct from the set contained in `info`.  New entries are inserted *after* a
601    /// device has been flushed (it is not safe to reuse the space taken by a deleted volume prior
602    /// to this) and are removed after a major compaction.
603    marked_for_deletion: HashSet<u64>,
604
605    /// The set of volumes deleted that are waiting for a sync.
606    volumes_deleted_pending_sync: HashSet<u64>,
607}
608
609impl Inner {
610    fn allocated_bytes(&self) -> Saturating<u64> {
611        let mut total = Saturating(0);
612        for (_, bytes) in &self.owner_bytes {
613            total += bytes.allocated_bytes;
614        }
615        total
616    }
617
618    fn uncommitted_allocated_bytes(&self) -> u64 {
619        self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
620    }
621
622    fn reserved_bytes(&self) -> u64 {
623        self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
624            + self.unattributed_reserved_bytes
625    }
626
627    fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
628        match self.info.limit_bytes.get(&owner_object_id) {
629            Some(v) => *v,
630            None => u64::MAX,
631        }
632    }
633
634    fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
635        let limit = self.owner_id_limit_bytes(owner_object_id);
636        let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
637        (Saturating(limit) - used).0
638    }
639
640    // Returns the amount that is not available to be allocated, which includes actually allocated
641    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
642    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
643    // reuse those bytes yet.
644    fn unavailable_bytes(&self) -> Saturating<u64> {
645        let mut total = Saturating(0);
646        for (_, bytes) in &self.owner_bytes {
647            total += bytes.unavailable_bytes();
648        }
649        total
650    }
651
652    // Returns the total number of bytes that are taken either from reservations, allocations or
653    // uncommitted allocations.
654    fn used_bytes(&self) -> Saturating<u64> {
655        let mut total = Saturating(0);
656        for (_, bytes) in &self.owner_bytes {
657            total += bytes.used_bytes();
658        }
659        total + Saturating(self.unattributed_reserved_bytes)
660    }
661
662    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
663    // require a device flush to be reused.
664    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
665        let mut total = Saturating(0);
666        for (_, bytes) in &self.owner_bytes {
667            total += bytes.unavailable_after_sync_bytes();
668        }
669        total
670    }
671
672    // Returns the number of bytes which will be available after the current batch of trimming
673    // completes.
674    fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
675        device_size
676            .checked_sub(
677                (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
678            )
679            .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
680    }
681
682    fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
683        match owner_object_id {
684            Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
685            None => self.unattributed_reserved_bytes += amount,
686        };
687    }
688
689    fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
690        match owner_object_id {
691            Some(owner) => {
692                let owner_entry = self.owner_bytes.entry(owner).or_default();
693                assert!(
694                    owner_entry.reserved_bytes >= amount,
695                    "{} >= {}",
696                    owner_entry.reserved_bytes,
697                    amount
698                );
699                owner_entry.reserved_bytes -= amount;
700            }
701            None => {
702                assert!(
703                    self.unattributed_reserved_bytes >= amount,
704                    "{} >= {}",
705                    self.unattributed_reserved_bytes,
706                    amount
707                );
708                self.unattributed_reserved_bytes -= amount
709            }
710        };
711    }
712}
713
714/// A container for a set of extents which are known to be free and can be trimmed.  Returned by
715/// `take_for_trimming`.
716pub struct TrimmableExtents<'a> {
717    allocator: &'a Allocator,
718    extents: Vec<Range<u64>>,
719    // The allocator can subscribe to this event to wait until these extents are dropped.  This way,
720    // we don't fail an allocation attempt if blocks are tied up for trimming; rather, we just wait
721    // until the batch is finished with and then proceed.
722    _drop_event: DropEvent,
723}
724
725impl<'a> TrimmableExtents<'a> {
726    pub fn extents(&self) -> &Vec<Range<u64>> {
727        &self.extents
728    }
729
730    // Also returns an EventListener which is signaled when this is dropped.
731    fn new(allocator: &'a Allocator) -> (Self, EventListener) {
732        let drop_event = DropEvent::new();
733        let listener = drop_event.listen();
734        (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
735    }
736
737    fn add_extent(&mut self, extent: Range<u64>) {
738        self.extents.push(extent);
739    }
740}
741
742impl<'a> Drop for TrimmableExtents<'a> {
743    fn drop(&mut self) {
744        let mut inner = self.allocator.inner.lock();
745        for device_range in std::mem::take(&mut self.extents) {
746            inner.strategy.free(device_range.clone()).expect("drop trim extent");
747            self.allocator
748                .temporary_allocations
749                .erase(&AllocatorKey { device_range: device_range.clone() });
750        }
751        inner.trim_reserved_bytes = 0;
752    }
753}
754
755impl Allocator {
756    pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
757        let block_size = filesystem.block_size();
758        // We expect device size to be a multiple of block size. Throw away any tail.
759        let device_size = round_down(filesystem.device().size(), block_size);
760        if device_size != filesystem.device().size() {
761            warn!("Device size is not block aligned. Rounding down.");
762        }
763        let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
764        let mut strategy = strategy::BestFit::default();
765        strategy.free(0..device_size).expect("new fs");
766        Allocator {
767            filesystem: Arc::downgrade(&filesystem),
768            block_size,
769            device_size,
770            object_id,
771            max_extent_size_bytes,
772            tree: LSMTree::new(merge, Box::new(NullCache {})),
773            temporary_allocations: SkipListLayer::new(1024),
774            inner: Mutex::new(Inner {
775                info: AllocatorInfo::default(),
776                opened: false,
777                dropped_temporary_allocations: Vec::new(),
778                owner_bytes: BTreeMap::new(),
779                unattributed_reserved_bytes: 0,
780                committed_deallocated: VecDeque::new(),
781                trim_reserved_bytes: 0,
782                trim_listener: None,
783                strategy,
784                allocation_size_histogram: [0; 64],
785                rebuild_strategy_trigger_histogram: [0; 64],
786                marked_for_deletion: HashSet::new(),
787                volumes_deleted_pending_sync: HashSet::new(),
788            }),
789            allocation_mutex: futures::lock::Mutex::new(()),
790            counters: Mutex::new(AllocatorCounters::default()),
791            maximum_offset: AtomicU64::new(0),
792            allocations_allowed: AtomicBool::new(filesystem.options().image_builder_mode.is_none()),
793        }
794    }
795
796    pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
797        &self.tree
798    }
799
800    /// Enables allocations.
801    /// This is only valid to call if the allocator was created in image builder mode.
802    pub fn enable_allocations(&self) {
803        self.allocations_allowed.store(true, Ordering::SeqCst);
804    }
805
806    /// Returns an iterator that yields all allocations, filtering out tombstones and any
807    /// owner_object_id that have been marked as deleted.  If `committed_marked_for_deletion` is
808    /// true, then filter using the committed volumes marked for deletion rather than the in-memory
809    /// copy which excludes volumes that have been deleted but there hasn't been a sync yet.
810    pub async fn filter(
811        &self,
812        iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
813        committed_marked_for_deletion: bool,
814    ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
815        let marked_for_deletion = {
816            let inner = self.inner.lock();
817            if committed_marked_for_deletion {
818                &inner.info.marked_for_deletion
819            } else {
820                &inner.marked_for_deletion
821            }
822            .clone()
823        };
824        let iter =
825            filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
826        Ok(iter)
827    }
828
829    /// A histogram of allocation request sizes.
830    /// The index into the array is 'number of blocks'.
831    /// The last bucket is a catch-all for larger allocation requests.
832    pub fn allocation_size_histogram(&self) -> [u64; 64] {
833        self.inner.lock().allocation_size_histogram
834    }
835
836    /// Creates a new (empty) allocator.
837    pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
838        // Mark the allocator as opened before creating the file because creating a new
839        // transaction requires a reservation.
840        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
841
842        let filesystem = self.filesystem.upgrade().unwrap();
843        let root_store = filesystem.root_store();
844        ObjectStore::create_object_with_id(
845            &root_store,
846            transaction,
847            ReservedId::new(&root_store, NonZero::new(self.object_id()).unwrap()),
848            HandleOptions::default(),
849            None,
850        )?;
851        root_store.update_last_object_id(self.object_id());
852        Ok(())
853    }
854
855    // Opens the allocator.  This is not thread-safe; this should be called prior to
856    // replaying the allocator's mutations.
857    pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
858        let filesystem = self.filesystem.upgrade().unwrap();
859        let root_store = filesystem.root_store();
860
861        self.inner.lock().strategy = strategy::BestFit::default();
862
863        let handle =
864            ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
865                .await
866                .context("Failed to open allocator object")?;
867
868        if handle.get_size() > 0 {
869            let serialized_info = handle
870                .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
871                .await
872                .context("Failed to read AllocatorInfo")?;
873            let mut cursor = std::io::Cursor::new(serialized_info);
874            let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
875                .context("Failed to deserialize AllocatorInfo")?;
876
877            let mut handles = Vec::new();
878            let mut total_size = 0;
879            for object_id in &info.layers {
880                let handle = ObjectStore::open_object(
881                    &root_store,
882                    *object_id,
883                    HandleOptions::default(),
884                    None,
885                )
886                .await
887                .context("Failed to open allocator layer file")?;
888
889                let size = handle.get_size();
890                total_size += size;
891                handles.push(handle);
892            }
893
894            {
895                let mut inner = self.inner.lock();
896
897                // Check allocated_bytes fits within the device.
898                let mut device_bytes = self.device_size;
899                for (&owner_object_id, &bytes) in &info.allocated_bytes {
900                    ensure!(
901                        bytes <= device_bytes,
902                        anyhow!(FxfsError::Inconsistent).context(format!(
903                            "Allocated bytes exceeds device size: {:?}",
904                            info.allocated_bytes
905                        ))
906                    );
907                    device_bytes -= bytes;
908
909                    inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
910                        Saturating(bytes);
911                }
912
913                inner.info = info;
914            }
915
916            self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
917            self.filesystem.upgrade().unwrap().object_manager().update_reservation(
918                self.object_id,
919                tree::reservation_amount_from_layer_size(total_size),
920            );
921        }
922
923        Ok(())
924    }
925
926    pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
927        // We can assume the device has been flushed.
928        {
929            let mut inner = self.inner.lock();
930            inner.volumes_deleted_pending_sync.clear();
931            inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
932        }
933
934        // Build free extent structure from disk. For now, we expect disks to have *some* free
935        // space at all times. This may change if we ever support mounting of read-only
936        // redistributable filesystem images.
937        if !self.rebuild_strategy().await.context("Build free extents")? {
938            if self.filesystem.upgrade().unwrap().options().read_only {
939                info!("Device contains no free space (read-only mode).");
940            } else {
941                info!("Device contains no free space.");
942                return Err(FxfsError::Inconsistent)
943                    .context("Device appears to contain no free space");
944            }
945        }
946
947        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
948        Ok(())
949    }
950
951    /// Walk all allocations to generate the set of free regions between allocations.
952    /// It is safe to re-run this on live filesystems but it should not be called concurrently
953    /// with allocations, trims or other rebuild_strategy invocations -- use allocation_mutex.
954    /// Returns true if the rebuild made changes to either the free ranges or overflow markers.
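    ///
    /// Worked example (illustrative): on a 16 KiB device with allocations covering 0..4096 and
    /// 8192..12288, the walk below finds the gaps 4096..8192 and 12288..16384 and force-frees
    /// them into the allocation strategy.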
955    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
956        let mut changed = false;
957        let mut layer_set = self.tree.empty_layer_set();
958        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
959        self.tree.add_all_layers_to_layer_set(&mut layer_set);
960
961        let overflow_markers = self.inner.lock().strategy.overflow_markers();
962        self.inner.lock().strategy.reset_overflow_markers();
963
964        let mut to_add = Vec::new();
965        let mut merger = layer_set.merger();
966        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
967        let mut last_offset = 0;
968        while last_offset < self.device_size {
969            let next_range = match iter.get() {
970                None => {
971                    assert!(last_offset <= self.device_size);
972                    let range = last_offset..self.device_size;
973                    last_offset = self.device_size;
974                    iter.advance().await?;
975                    range
976                }
977                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
978                    if device_range.end < last_offset {
979                        iter.advance().await?;
980                        continue;
981                    }
982                    if device_range.start <= last_offset {
983                        last_offset = device_range.end;
984                        iter.advance().await?;
985                        continue;
986                    }
987                    let range = last_offset..device_range.start;
988                    last_offset = device_range.end;
989                    iter.advance().await?;
990                    range
991                }
992            };
993            to_add.push(next_range);
994            // Avoid taking a lock on inner for every free range.
995            if to_add.len() > 100 {
996                let mut inner = self.inner.lock();
997                for range in to_add.drain(..) {
998                    changed |= inner.strategy.force_free(range)?;
999                }
1000            }
1001        }
1002        let mut inner = self.inner.lock();
1003        for range in to_add {
1004            changed |= inner.strategy.force_free(range)?;
1005        }
1006        if overflow_markers != inner.strategy.overflow_markers() {
1007            changed = true;
1008        }
1009        Ok(changed)
1010    }
1011
1012    /// Collects up to `extents_per_batch` free extents of size up to `max_extent_size` from
1013    /// `offset`.  The extents will be reserved.
1014    /// Note that only one `TrimmableExtents` can exist for the allocator at any time.
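    ///
    /// Usage sketch (illustrative; `allocator` and `device.trim` are assumptions, not APIs defined
    /// in this file):
    /// ```ignore
    /// let batch = allocator.take_for_trimming(offset, max_extent_size, extents_per_batch).await?;
    /// for device_range in batch.extents() {
    ///     device.trim(device_range.clone()).await?; // hypothetical trim call on the device
    /// }
    /// drop(batch); // releases the extents and unblocks any allocation waiting on the trim
    /// ```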
1015    pub async fn take_for_trimming(
1016        &self,
1017        offset: u64,
1018        max_extent_size: usize,
1019        extents_per_batch: usize,
1020    ) -> Result<TrimmableExtents<'_>, Error> {
1021        let _guard = self.allocation_mutex.lock().await;
1022
1023        let (mut result, listener) = TrimmableExtents::new(self);
1024        let mut bytes = 0;
1025
1026        // We can't just use self.strategy here because it doesn't necessarily hold all
1027        // free extent metadata in RAM.
1028        let mut layer_set = self.tree.empty_layer_set();
1029        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
1030        self.tree.add_all_layers_to_layer_set(&mut layer_set);
1031        let mut merger = layer_set.merger();
1032        let mut iter = self
1033            .filter(
1034                merger.query(Query::FullRange(&AllocatorKey { device_range: offset..0 })).await?,
1035                false,
1036            )
1037            .await?;
1038        let mut last_offset = offset;
1039        'outer: while last_offset < self.device_size {
1040            let mut range = match iter.get() {
1041                None => {
1042                    assert!(last_offset <= self.device_size);
1043                    let range = last_offset..self.device_size;
1044                    last_offset = self.device_size;
1045                    iter.advance().await?;
1046                    range
1047                }
1048                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
1049                    if device_range.end <= last_offset {
1050                        iter.advance().await?;
1051                        continue;
1052                    }
1053                    if device_range.start <= last_offset {
1054                        last_offset = device_range.end;
1055                        iter.advance().await?;
1056                        continue;
1057                    }
1058                    let range = last_offset..device_range.start;
1059                    last_offset = device_range.end;
1060                    iter.advance().await?;
1061                    range
1062                }
1063            };
1064            if range.start < offset {
1065                continue;
1066            }
1067
1068            // 'range' is based on the on-disk LSM tree. We need to check against any uncommitted
1069            // allocations and temporarily allocate the range for ourselves.
1070            let mut inner = self.inner.lock();
1071
1072            while range.start < range.end {
1073                let prefix =
1074                    range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
1075                range = prefix.end..range.end;
1076                bytes += prefix.length()?;
1077                // We can assume the following are safe because we've checked both LSM tree and
1078                // temporary_allocations while holding the lock.
1079                inner.strategy.remove(prefix.clone());
1080                self.temporary_allocations.insert(AllocatorItem {
1081                    key: AllocatorKey { device_range: prefix.clone() },
1082                    value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
1083                    sequence: 0,
1084                })?;
1085                result.add_extent(prefix);
1086                if result.extents.len() == extents_per_batch {
1087                    break 'outer;
1088                }
1089            }
1090            if result.extents.len() == extents_per_batch {
1091                break 'outer;
1092            }
1093        }
1094        {
1095            let mut inner = self.inner.lock();
1096
1097            assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
1098            inner.trim_listener = Some(listener);
1099            inner.trim_reserved_bytes = bytes;
1100            debug_assert!(
1101                (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
1102                    <= self.device_size
1103            );
1104        }
1105        Ok(result)
1106    }
1107
1108    /// Returns all objects that exist in the parent store that pertain to this allocator.
1109    pub fn parent_objects(&self) -> Vec<u64> {
1110        // The allocator tree needs to store a file for each of the layers in the tree, so we return
1111        // those, since nothing else references them.
1112        self.inner.lock().info.layers.clone()
1113    }
1114
1115    /// Returns all the current owner byte limits (in pairs of `(owner_object_id, bytes)`).
1116    pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
1117        self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
1118    }
1119
1120    /// Returns (allocated_bytes, byte_limit) for the given owner.
1121    pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
1122        let inner = self.inner.lock();
1123        (
1124            inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
1125            inner.info.limit_bytes.get(&owner_object_id).copied(),
1126        )
1127    }
1128
1129    /// Returns owner bytes debug information.
1130    pub fn owner_bytes_debug(&self) -> String {
1131        format!("{:?}", self.inner.lock().owner_bytes)
1132    }
1133
1134    fn needs_sync(&self) -> bool {
1135        // TODO(https://fxbug.dev/42178048): This will only trigger if *all* free space is taken up with
1136        // committed deallocated bytes, but we might want to trigger a sync if we're low and there
1137        // happens to be a lot of deallocated bytes as that might mean we can fully satisfy
1138        // allocation requests.
1139        let inner = self.inner.lock();
1140        inner.unavailable_bytes().0 >= self.device_size
1141    }
1142
1143    fn is_system_store(&self, owner_object_id: u64) -> bool {
1144        let fs = self.filesystem.upgrade().unwrap();
1145        owner_object_id == fs.object_manager().root_store_object_id()
1146            || owner_object_id == fs.object_manager().root_parent_store_object_id()
1147    }
1148
1149    /// Updates the accounting to track that a byte reservation has been moved out of an owner to
1150    /// the unattributed pool.
1151    pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
1152        if old_owner_object_id.is_none() || amount == 0 {
1153            return;
1154        }
1155        // These 2 mutations should behave as though they're a single atomic mutation.
1156        let mut inner = self.inner.lock();
1157        inner.remove_reservation(old_owner_object_id, amount);
1158        inner.add_reservation(None, amount);
1159    }
1160
1161    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
1162    /// allocator when queried.
1163    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1164        let this = Arc::downgrade(self);
1165        parent.record_lazy_child(name, move || {
1166            let this_clone = this.clone();
1167            async move {
1168                let inspector = fuchsia_inspect::Inspector::default();
1169                if let Some(this) = this_clone.upgrade() {
1170                    let counters = this.counters.lock();
1171                    let root = inspector.root();
1172                    root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
1173                    root.record_uint("bytes_total", this.device_size);
1174                    let (allocated, reserved, used, unavailable) = {
1175                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1176                        let inner = this.inner.lock();
1177                        (
1178                            inner.allocated_bytes().0,
1179                            inner.reserved_bytes(),
1180                            inner.used_bytes().0,
1181                            inner.unavailable_bytes().0,
1182                        )
1183                    };
1184                    root.record_uint("bytes_allocated", allocated);
1185                    root.record_uint("bytes_reserved", reserved);
1186                    root.record_uint("bytes_used", used);
1187                    root.record_uint("bytes_unavailable", unavailable);
1188
1189                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing
1190                    // metrics.
1191                    if let Some(x) = round_div(100 * allocated, this.device_size) {
1192                        root.record_uint("bytes_allocated_percent", x);
1193                    }
1194                    if let Some(x) = round_div(100 * reserved, this.device_size) {
1195                        root.record_uint("bytes_reserved_percent", x);
1196                    }
1197                    if let Some(x) = round_div(100 * used, this.device_size) {
1198                        root.record_uint("bytes_used_percent", x);
1199                    }
1200                    if let Some(x) = round_div(100 * unavailable, this.device_size) {
1201                        root.record_uint("bytes_unavailable_percent", x);
1202                    }
1203
1204                    root.record_uint("num_flushes", counters.num_flushes);
1205                    if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
1206                        root.record_uint(
1207                            "last_flush_time_ms",
1208                            last_flush_time
1209                                .duration_since(std::time::UNIX_EPOCH)
1210                                .unwrap_or(std::time::Duration::ZERO)
1211                                .as_millis()
1212                                .try_into()
1213                                .unwrap_or(0u64),
1214                        );
1215                    }
1216
1217                    let data = this.allocation_size_histogram();
1218                    let alloc_sizes = root.create_uint_linear_histogram(
1219                        "allocation_size_histogram",
1220                        fuchsia_inspect::LinearHistogramParams {
1221                            floor: 1,
1222                            step_size: 1,
1223                            buckets: 64,
1224                        },
1225                    );
1226                    for (i, count) in data.iter().enumerate() {
1227                        if i != 0 {
1228                            alloc_sizes.insert_multiple(i as u64, *count as usize);
1229                        }
1230                    }
1231                    root.record(alloc_sizes);
1232
1233                    let data = this.inner.lock().rebuild_strategy_trigger_histogram;
1234                    let triggers = root.create_uint_linear_histogram(
1235                        "rebuild_strategy_triggers",
1236                        fuchsia_inspect::LinearHistogramParams {
1237                            floor: 1,
1238                            step_size: 1,
1239                            buckets: 64,
1240                        },
1241                    );
1242                    for (i, count) in data.iter().enumerate() {
1243                        if i != 0 {
1244                            triggers.insert_multiple(i as u64, *count as usize);
1245                        }
1246                    }
1247                    root.record(triggers);
1248                }
1249                Ok(inspector)
1250            }
1251            .boxed()
1252        });
1253    }
1254
1255    /// Returns the offset of the first byte which has not been used by the allocator since its
1256    /// creation.
1257    /// NB: This does *not* take into account existing allocations.  This is only reliable when the
1258    /// allocator was created from scratch, without any pre-existing allocations.
1259    pub fn maximum_offset(&self) -> u64 {
1260        self.maximum_offset.load(Ordering::Relaxed)
1261    }
1262}
1263
1264impl Drop for Allocator {
1265    fn drop(&mut self) {
1266        let inner = self.inner.lock();
1267        // Uncommitted and reserved should be released back using RAII, so they should be zero.
1268        assert_eq!(inner.uncommitted_allocated_bytes(), 0);
1269        assert_eq!(inner.reserved_bytes(), 0);
1270    }
1271}
1272
1273#[fxfs_trace::trace]
1274impl Allocator {
1275    /// Returns the object ID for the allocator.
1276    pub fn object_id(&self) -> u64 {
1277        self.object_id
1278    }
1279
1280    /// Returns information about the allocator such as the layer files storing persisted
1281    /// allocations.
1282    pub fn info(&self) -> AllocatorInfo {
1283        self.inner.lock().info.clone()
1284    }
1285
1286    /// Tries to allocate |len| bytes for the specified owner and returns the device range
1287    /// allocated.
1288    /// The allocated range may be short (e.g. due to fragmentation), in which case the caller can
1289    /// simply call allocate again until they have enough blocks.
1290    ///
1291    /// We also store the object store ID of the store that the allocation should be assigned to so
1292    /// that we have a means to delete encrypted stores without needing the encryption key.
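    /// A minimal caller-side sketch (hypothetical names such as `allocator`, `transaction` and
    /// `owner_object_id`; not compiled here) of looping until enough blocks have been obtained:
    ///
    /// ```ignore
    /// let mut ranges = Vec::new();
    /// let mut remaining = len;
    /// while remaining > 0 {
    ///     let range = allocator.allocate(&mut transaction, owner_object_id, remaining).await?;
    ///     remaining -= range.length().unwrap();
    ///     ranges.push(range);
    /// }
    /// ```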
1293    #[trace]
1294    pub async fn allocate(
1295        self: &Arc<Self>,
1296        transaction: &mut Transaction<'_>,
1297        owner_object_id: u64,
1298        mut len: u64,
1299    ) -> Result<Range<u64>, Error> {
1300        ensure!(self.allocations_allowed.load(Ordering::SeqCst), FxfsError::Unavailable);
1301        assert_eq!(len % self.block_size, 0);
1302        len = std::cmp::min(len, self.max_extent_size_bytes);
1303        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1304
1305        // Make sure we have space reserved before we try and find the space.
1306        let reservation = if let Some(reservation) = transaction.allocator_reservation {
1307            match reservation.owner_object_id {
1308                // If there is no owner, this must be a system store that we're allocating for.
1309                None => assert!(self.is_system_store(owner_object_id)),
1310                // If it has an owner, it must not differ from the allocating owner.
1311                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
1312            };
1313            // Reservation limits aren't necessarily a multiple of the block size.
1314            let r = reservation
1315                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
1316            len = r.amount();
1317            Left(r)
1318        } else {
1319            let mut inner = self.inner.lock();
1320            assert!(inner.opened);
1321            // Do not exceed the limit for the owner or the device.
1322            let device_used = inner.used_bytes();
1323            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1324            // We must take care not to use up space that might be reserved.
1325            let limit =
1326                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
1327            len = round_down(std::cmp::min(len, limit), self.block_size);
1328            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1329            owner_entry.reserved_bytes += len;
1330            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
1331        };
1332
1333        ensure!(len > 0, FxfsError::NoSpace);
1334
1335        // If volumes have been deleted, flush the device so that we can use any of the freed space.
1336        let volumes_deleted = {
1337            let inner = self.inner.lock();
1338            (!inner.volumes_deleted_pending_sync.is_empty())
1339                .then(|| inner.volumes_deleted_pending_sync.clone())
1340        };
1341
1342        if let Some(volumes_deleted) = volumes_deleted {
1343            // No locks are held here, so in theory, there could be unnecessary syncs, but it
1344            // should be sufficiently rare that it won't matter.
1345            self.filesystem
1346                .upgrade()
1347                .unwrap()
1348                .sync(SyncOptions {
1349                    flush_device: true,
1350                    precondition: Some(Box::new(|| {
1351                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
1352                    })),
1353                    ..Default::default()
1354                })
1355                .await?;
1356
1357            {
1358                let mut inner = self.inner.lock();
1359                for owner_id in volumes_deleted {
1360                    inner.volumes_deleted_pending_sync.remove(&owner_id);
1361                    inner.marked_for_deletion.insert(owner_id);
1362                }
1363            }
1364
1365            let _guard = self.allocation_mutex.lock().await;
1366            self.rebuild_strategy().await?;
1367        }
1368
1369        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
1370        let _guard = 'sync: loop {
1371            // Cap number of sync attempts before giving up on finding free space.
1372            for _ in 0..10 {
1373                {
1374                    let guard = self.allocation_mutex.lock().await;
1375
1376                    if !self.needs_sync() {
1377                        break 'sync guard;
1378                    }
1379                }
1380
1381                // All the free space is currently tied up with deallocations, so we need to sync
1382                // and flush the device to free that up.
1383                //
1384                // We can't hold the allocation lock whilst we sync here: the allocation lock is
1385                // also taken in apply_mutations, which is called while journal locks are held, and
1386                // sync takes those same locks, so holding the allocation lock across the sync could
1387                // deadlock.  Sync holds its own lock to guard against multiple syncs occurring at
1388                // the same time, and we can supply a precondition that is evaluated under that lock
1389                // to ensure we don't sync twice if we don't need to.
1390                self.filesystem
1391                    .upgrade()
1392                    .unwrap()
1393                    .sync(SyncOptions {
1394                        flush_device: true,
1395                        precondition: Some(Box::new(|| self.needs_sync())),
1396                        ..Default::default()
1397                    })
1398                    .await?;
1399            }
1400            bail!(
1401                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
1402            );
1403        };
1404
1405        let mut trim_listener = None;
1406        {
1407            let mut inner = self.inner.lock();
1408            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;
1409
1410            // If trimming would be the reason that this allocation gets cut short, wait for
1411            // trimming to complete before proceeding.
1412            let avail = self
1413                .device_size
1414                .checked_sub(inner.unavailable_bytes().0)
1415                .ok_or(FxfsError::Inconsistent)?;
1416            let free_and_not_being_trimmed =
1417                inner.bytes_available_not_being_trimmed(self.device_size)?;
1418            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
1419                debug_assert!(inner.trim_reserved_bytes > 0);
1420                trim_listener = std::mem::take(&mut inner.trim_listener);
1421            }
1422        }
1423
1424        if let Some(listener) = trim_listener {
1425            listener.await;
1426        }
1427
1428        let result = loop {
1429            {
1430                let mut inner = self.inner.lock();
1431
1432                // While we know rebuild_strategy and take_for_trim are not running (we hold the
1433                // guard), apply any deferred removals from `temporary_allocations`.
1434                for device_range in inner.dropped_temporary_allocations.drain(..) {
1435                    self.temporary_allocations.erase(&AllocatorKey { device_range });
1436                }
1437
1438                match inner.strategy.allocate(len) {
1439                    Err(FxfsError::NotFound) => {
1440                        // Overflow. Fall through and rebuild
1441                        inner.rebuild_strategy_trigger_histogram
1442                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
1443                    }
1444                    Err(err) => {
1445                        error!(err:%; "Likely filesystem corruption.");
1446                        return Err(err.into());
1447                    }
1448                    Ok(x) => {
1449                        break x;
1450                    }
1451                }
1452            }
1453            // We've run out of extents of the requested length in RAM, but more of this size may
1454            // exist on the device. Rescan the device and circle back.
1455            // We already hold the allocation_mutex, so exclusive access is guaranteed.
1456            if !self.rebuild_strategy().await? {
1457                error!("Cannot find additional free space. Corruption?");
1458                return Err(FxfsError::Inconsistent.into());
1459            }
1460        };
1461
1462        debug!(device_range:? = result; "allocate");
1463
1464        let len = result.length().unwrap();
1465        let reservation_owner = reservation.either(
1466            // Left means we got an outside reservation.
1467            |l| {
1468                l.forget_some(len);
1469                l.owner_object_id()
1470            },
1471            |r| {
1472                r.forget_some(len);
1473                r.owner_object_id()
1474            },
1475        );
1476
1477        {
1478            let mut inner = self.inner.lock();
1479            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1480            owner_entry.uncommitted_allocated_bytes += len;
1481            // If the reservation has an owner, ensure they are the same.
1482            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
1483            inner.remove_reservation(reservation_owner, len);
1484            self.temporary_allocations.insert(AllocatorItem {
1485                key: AllocatorKey { device_range: result.clone() },
1486                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1487                sequence: 0,
1488            })?;
1489        }
1490
1491        let mutation =
1492            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
1493        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());
1494
1495        Ok(result)
1496    }
1497
1498    /// Marks the given device range as allocated.  The main use case for this is the
1499    /// super-block, which needs to be at a fixed location on the device.
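    /// A usage sketch (hypothetical names and range; not compiled here):
    ///
    /// ```ignore
    /// // Pin a fixed-location extent, e.g. a superblock region, for the root store.
    /// allocator.mark_allocated(&mut transaction, root_store_object_id, superblock_range.clone())?;
    /// ```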
1500    #[trace]
1501    pub fn mark_allocated(
1502        &self,
1503        transaction: &mut Transaction<'_>,
1504        owner_object_id: u64,
1505        device_range: Range<u64>,
1506    ) -> Result<(), Error> {
1507        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1508        {
1509            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;
1510
1511            let mut inner = self.inner.lock();
1512            let device_used = inner.used_bytes();
1513            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1514            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1515            ensure!(
1516                device_range.end <= self.device_size
1517                    && (Saturating(self.device_size) - device_used).0 >= len
1518                    && owner_id_bytes_left >= len,
1519                FxfsError::NoSpace
1520            );
1521            if let Some(reservation) = &mut transaction.allocator_reservation {
1522                // The transaction takes ownership of this hold.
1523                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
1524            }
1525            owner_entry.uncommitted_allocated_bytes += len;
1526            inner.strategy.remove(device_range.clone());
1527            self.temporary_allocations.insert(AllocatorItem {
1528                key: AllocatorKey { device_range: device_range.clone() },
1529                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1530                sequence: 0,
1531            })?;
1532        }
1533        let mutation =
1534            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
1535        transaction.add(self.object_id(), Mutation::Allocator(mutation));
1536        Ok(())
1537    }
1538
1539    /// Sets the maximum number of bytes that an owner object may use (its byte limit).
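    /// A usage sketch (hypothetical identifiers; not compiled here):
    ///
    /// ```ignore
    /// // Cap the volume owned by `owner_object_id` at 8 GiB.
    /// allocator.set_bytes_limit(&mut transaction, owner_object_id, 8 * 1024 * 1024 * 1024)?;
    /// ```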
1540    pub fn set_bytes_limit(
1541        &self,
1542        transaction: &mut Transaction<'_>,
1543        owner_object_id: u64,
1544        bytes: u64,
1545    ) -> Result<(), Error> {
1546        // System stores cannot be given limits.
1547        assert!(!self.is_system_store(owner_object_id));
1548        transaction.add(
1549            self.object_id(),
1550            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
1551        );
1552        Ok(())
1553    }
1554
1555    /// Gets the bytes limit for an owner object.
1556    pub fn get_owner_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
1557        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
1558    }
1559
1560    /// Gets the number of bytes currently used by allocations, reservations or uncommitted
1561    /// allocations.
1562    pub fn get_owner_bytes_used(&self, owner_object_id: u64) -> u64 {
1563        self.inner.lock().owner_bytes.get(&owner_object_id).map_or(0, |info| info.used_bytes().0)
1564    }
1565
1566    /// Deallocates the given device range for the specified object.
1567    #[trace]
1568    pub async fn deallocate(
1569        &self,
1570        transaction: &mut Transaction<'_>,
1571        owner_object_id: u64,
1572        dealloc_range: Range<u64>,
1573    ) -> Result<u64, Error> {
1574        debug!(device_range:? = dealloc_range; "deallocate");
1575        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
1576        // We don't currently support sharing of allocations (value.count always equals 1), so as
1577        // long as we can assume the deallocated range is actually allocated, we can avoid device
1578        // access.
1579        let deallocated = dealloc_range.end - dealloc_range.start;
1580        let mutation = AllocatorMutation::Deallocate {
1581            device_range: dealloc_range.clone().into(),
1582            owner_object_id,
1583        };
1584        transaction.add(self.object_id(), Mutation::Allocator(mutation));
1585
1586        let _guard = self.allocation_mutex.lock().await;
1587
1588        // We use `dropped_temporary_allocations` to defer removals from `temporary_allocations` in
1589        // places where we can't execute async code or take locks.
1590        //
1591        // It's important we don't ever remove entries from `temporary_allocations` without
1592        // holding the `allocation_mutex` lock or else we may end up with an inconsistent view of
1593        // available disk space when we combine temporary_allocations with the LSMTree.
1594        // This is normally done in `allocate()` but we also need to apply these here because
1595        // `temporary_allocations` is also used to track deallocated space until it has been
1596        // flushed (see comment below). A user may allocate and then deallocate space before calling
1597        // allocate() a second time, so if we do not clean up here, we may end up with the same
1598        // range in temporary_allocations twice (once for allocate, once for deallocate).
1599        let mut inner = self.inner.lock();
1600        for device_range in inner.dropped_temporary_allocations.drain(..) {
1601            self.temporary_allocations.erase(&AllocatorKey { device_range });
1602        }
1603
1604        // We can't reuse deallocated space immediately because failure to successfully flush will
1605        // mean that on next mount, we may find this space is still assigned to the deallocated
1606        // region. To avoid immediate reuse, we hold these regions in 'temporary_allocations' until
1607        // after a successful flush so we know the region is safe to reuse.
1608        self.temporary_allocations
1609            .insert(AllocatorItem {
1610                key: AllocatorKey { device_range: dealloc_range.clone() },
1611                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1612                sequence: 0,
1613            })
1614            .context("tracking deallocated")?;
1615
1616        Ok(deallocated)
1617    }
1618
1619    /// Marks allocations associated with a given |owner_object_id| for deletion.
1620    ///
1621    /// This is used as part of deleting encrypted volumes (ObjectStore) without having the keys.
1622    ///
1623    /// MarkForDeletion mutations eventually manipulate allocator metadata (AllocatorInfo) instead
1624    /// of the mutable layer, but we must be careful not to do this too early and risk premature
1625    /// reuse of extents.
1626    ///
1627    /// Replay is not guaranteed until the *device* gets flushed, so we cannot reuse the deleted
1628    /// extents until we've flushed the device.
1629    ///
1630    /// TODO(b/316827348): Consider removing the use of mark_for_deletion in AllocatorInfo and
1631    /// just compacting?
1632    ///
1633    /// After an allocator.flush() (i.e. a major compaction), we know that there is no data left
1634    /// in the layer files for this owner_object_id and we are able to clear `marked_for_deletion`.
1635    pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
1636        // Note that because the actual time of deletion (the next major compaction) is undefined,
1637        // |owner_object_id| should not be reused after this call.
1638        transaction.add(
1639            self.object_id(),
1640            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
1641        );
1642    }
1643
1644    /// Called when the device has been flushed and indicates what the journal log offset was when
1645    /// that happened.
1646    pub fn did_flush_device(&self, flush_log_offset: u64) {
1647        // First take out the deallocations that we now know to be flushed.  The list is maintained
1648        // in order, so we can stop on the first entry that we find that should not be unreserved
1649        // yet.
1650        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
1651        let deallocs = 'deallocs_outer: loop {
1652            let mut inner = self.inner.lock();
1653            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
1654                if dealloc.log_file_offset >= flush_log_offset {
1655                    let mut deallocs = inner.committed_deallocated.split_off(index);
1656                    // Swap because we want the opposite of what split_off does.
1657                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
1658                    break 'deallocs_outer deallocs;
1659                }
1660            }
1661            break std::mem::take(&mut inner.committed_deallocated);
1662        };
1663
1664        // Now we can free those elements.
1665        let mut inner = self.inner.lock();
1666        let mut totals = BTreeMap::<u64, u64>::new();
1667        for dealloc in deallocs {
1668            *(totals.entry(dealloc.owner_object_id).or_default()) +=
1669                dealloc.range.length().unwrap();
1670            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
1671            self.temporary_allocations.erase(&AllocatorKey { device_range: dealloc.range.clone() });
1672        }
1673
1674        // This *must* come after we've removed the records from reserved reservations because the
1675        // allocator uses this value to decide whether or not a device-flush is required and it must
1676        // be possible to find free space if it thinks no device-flush is required.
1677        for (owner_object_id, total) in totals {
1678            match inner.owner_bytes.get_mut(&owner_object_id) {
1679                Some(counters) => counters.committed_deallocated_bytes -= total,
1680                None => panic!("Failed to decrement for unknown owner: {}", owner_object_id),
1681            }
1682        }
1683    }
1684
1685    /// Returns a reservation that can be used later, or None if there is insufficient space. The
1686    /// |owner_object_id| indicates which object in the root object store the reservation is for.
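    /// A caller-side sketch (hypothetical names; not compiled here):
    ///
    /// ```ignore
    /// let reservation =
    ///     allocator.clone().reserve(Some(owner_object_id), amount).ok_or(FxfsError::NoSpace)?;
    /// // The reserved bytes are released back to the allocator when `reservation` is dropped.
    /// ```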
1687    pub fn reserve(
1688        self: Arc<Self>,
1689        owner_object_id: Option<u64>,
1690        amount: u64,
1691    ) -> Option<Reservation> {
1692        {
1693            let mut inner = self.inner.lock();
1694
1695            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1696
1697            let limit = match owner_object_id {
1698                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1699                None => device_free,
1700            };
1701            if limit < amount {
1702                return None;
1703            }
1704            inner.add_reservation(owner_object_id, amount);
1705        }
1706        Some(Reservation::new(self, owner_object_id, amount))
1707    }
1708
1709    /// Like reserve, but takes a callback which is passed the |limit| and should return the
1710    /// amount to reserve, which can be zero.
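    /// A caller-side sketch (hypothetical names; not compiled here):
    ///
    /// ```ignore
    /// // Reserve as much as possible, up to `wanted` bytes; the callback is handed the current
    /// // limit and returns the amount to reserve.
    /// let reservation = allocator
    ///     .clone()
    ///     .reserve_with(Some(owner_object_id), |limit| std::cmp::min(limit, wanted));
    /// ```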
1711    pub fn reserve_with(
1712        self: Arc<Self>,
1713        owner_object_id: Option<u64>,
1714        amount: impl FnOnce(u64) -> u64,
1715    ) -> Reservation {
1716        let amount = {
1717            let mut inner = self.inner.lock();
1718
1719            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1720
1721            let amount = amount(match owner_object_id {
1722                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1723                None => device_free,
1724            });
1725
1726            inner.add_reservation(owner_object_id, amount);
1727
1728            amount
1729        };
1730
1731        Reservation::new(self, owner_object_id, amount)
1732    }
1733
1734    /// Returns the total number of allocated bytes.
1735    pub fn get_allocated_bytes(&self) -> u64 {
1736        self.inner.lock().allocated_bytes().0
1737    }
1738
1739    /// Returns the size of the device, in bytes (i.e. the total space managed by the allocator).
1740    pub fn get_disk_bytes(&self) -> u64 {
1741        self.device_size
1742    }
1743
1744    /// Returns the total number of allocated bytes per owner_object_id.
1745    /// Note that this is quite an expensive operation as it copies the collection.
1746    /// This is intended for use in fsck() and friends, not general use code.
1747    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
1748        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
1749    }
1750
1751    /// Returns the number of allocated and reserved bytes.
1752    pub fn get_used_bytes(&self) -> Saturating<u64> {
1753        let inner = self.inner.lock();
1754        inner.used_bytes()
1755    }
1756}
1757
1758impl ReservationOwner for Allocator {
1759    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
1760        self.inner.lock().remove_reservation(owner_object_id, amount);
1761    }
1762}
1763
1764#[async_trait]
1765impl JournalingObject for Allocator {
1766    fn apply_mutation(
1767        &self,
1768        mutation: Mutation,
1769        context: &ApplyContext<'_, '_>,
1770        _assoc_obj: AssocObj<'_>,
1771    ) -> Result<(), Error> {
1772        match mutation {
1773            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
1774                let mut inner = self.inner.lock();
1775                inner.owner_bytes.remove(&owner_object_id);
1776
1777                // We use `info.marked_for_deletion` to track the committed state and
1778                // `inner.marked_for_deletion` to track volumes marked for deletion *after* we have
1779                // flushed the device.  It is not safe to use extents belonging to deleted volumes
1780                // until after we have flushed the device.
1781                inner.info.marked_for_deletion.insert(owner_object_id);
1782                inner.volumes_deleted_pending_sync.insert(owner_object_id);
1783
1784                inner.info.limit_bytes.remove(&owner_object_id);
1785            }
1786            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1787                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
1788                let item = AllocatorItem {
1789                    key: AllocatorKey { device_range: device_range.clone().into() },
1790                    value: AllocatorValue::Abs { count: 1, owner_object_id },
1791                    sequence: context.checkpoint.file_offset,
1792                };
1793                let len = item.key.device_range.length().unwrap();
1794                let lower_bound = item.key.lower_bound_for_merge_into();
1795                self.tree.merge_into(item, &lower_bound);
1796                let mut inner = self.inner.lock();
1797                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1798                entry.allocated_bytes += len;
1799                if let ApplyMode::Live(transaction) = context.mode {
1800                    entry.uncommitted_allocated_bytes -= len;
1801                    // Note that we cannot drop entries from temporary_allocations without holding
1802                    // the allocation_mutex as it may introduce races. We instead add the range to
1803                    // a Vec that can be applied later when we hold the lock (See comment on
1804                    // `dropped_temporary_allocations` above).
1805                    inner.dropped_temporary_allocations.push(device_range.into());
1806                    if let Some(reservation) = transaction.allocator_reservation {
1807                        reservation.commit(len);
1808                    }
1809                }
1810            }
1811            Mutation::Allocator(AllocatorMutation::Deallocate {
1812                device_range,
1813                owner_object_id,
1814            }) => {
1815                let item = AllocatorItem {
1816                    key: AllocatorKey { device_range: device_range.into() },
1817                    value: AllocatorValue::None,
1818                    sequence: context.checkpoint.file_offset,
1819                };
1820                let len = item.key.device_range.length().unwrap();
1821
1822                {
1823                    let mut inner = self.inner.lock();
1824                    {
1825                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1826                        entry.allocated_bytes -= len;
1827                        if context.mode.is_live() {
1828                            entry.committed_deallocated_bytes += len;
1829                        }
1830                    }
1831                    if context.mode.is_live() {
1832                        inner.committed_deallocated.push_back(CommittedDeallocation {
1833                            log_file_offset: context.checkpoint.file_offset,
1834                            range: item.key.device_range.clone(),
1835                            owner_object_id,
1836                        });
1837                    }
1838                    if let ApplyMode::Live(Transaction {
1839                        allocator_reservation: Some(reservation),
1840                        ..
1841                    }) = context.mode
1842                    {
1843                        inner.add_reservation(reservation.owner_object_id(), len);
1844                        reservation.add(len);
1845                    }
1846                }
1847                let lower_bound = item.key.lower_bound_for_merge_into();
1848                self.tree.merge_into(item, &lower_bound);
1849            }
1850            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
1851                // Journal replay is ordered and each of these calls is idempotent, so the last one
1852                // wins; it doesn't matter if the value is already set or gets changed multiple times
1853                // during replay. When the allocator is opened, the limit is merged in with the
1854                // snapshot.
1855                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
1856            }
1857            Mutation::BeginFlush => {
1858                self.tree.seal();
1859                // Transfer our running count for allocated_bytes so that it gets written to the new
1860                // info file when flush completes.
1861                let mut inner = self.inner.lock();
1862                let allocated_bytes =
1863                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
1864                inner.info.allocated_bytes = allocated_bytes;
1865            }
1866            Mutation::EndFlush => {}
1867            _ => bail!("unexpected mutation: {:?}", mutation),
1868        }
1869        Ok(())
1870    }
1871
1872    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
1873        match mutation {
1874            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1875                let len = device_range.length().unwrap();
1876                let mut inner = self.inner.lock();
1877                inner
1878                    .owner_bytes
1879                    .entry(owner_object_id)
1880                    .or_default()
1881                    .uncommitted_allocated_bytes -= len;
1882                if let Some(reservation) = transaction.allocator_reservation {
1883                    let res_owner = reservation.owner_object_id();
1884                    inner.add_reservation(res_owner, len);
1885                    reservation.release_reservation(res_owner, len);
1886                }
1887                inner.strategy.free(device_range.clone().into()).expect("drop mutation");
1888                self.temporary_allocations
1889                    .erase(&AllocatorKey { device_range: device_range.into() });
1890            }
1891            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
1892                self.temporary_allocations
1893                    .erase(&AllocatorKey { device_range: device_range.into() });
1894            }
1895            _ => {}
1896        }
1897    }
1898
1899    async fn flush(&self) -> Result<Version, Error> {
1900        let filesystem = self.filesystem.upgrade().unwrap();
1901        let object_manager = filesystem.object_manager();
1902        let earliest_version = self.tree.get_earliest_version();
1903        if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
1904            // Early exit, but still return the earliest version used by a struct in the tree
1905            return Ok(earliest_version);
1906        }
1907
1908        let fs = self.filesystem.upgrade().unwrap();
1909        let mut flusher = Flusher::new(self, &fs).await;
1910        let (new_layer_file, info) = flusher.start().await?;
1911        flusher.finish(new_layer_file, info).await
1912    }
1913}
1914
1915// The merger is unable to merge extents that exist like the following:
1916//
1917//     |----- +1 -----|
1918//                    |----- -1 -----|
1919//                    |----- +2 -----|
1920//
1921// It cannot coalesce them because it has to emit the +1 record so that it can move on and merge the
1922// -1 and +2 records. To address this, we add another stage that applies after merging which
1923// coalesces records after they have been emitted.  This is a bit simpler than merging because the
1924// records cannot overlap, so it's just a question of merging adjacent records if they happen to
1925// have the same delta and object_id.
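//
// For example, if merging emits two touching records 0..100 and 100..200 with identical
// AllocatorValues, this iterator emits a single 0..200 record (see `test_coalescing_iterator`
// in the tests below).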
1926
1927pub struct CoalescingIterator<I> {
1928    iter: I,
1929    item: Option<AllocatorItem>,
1930}
1931
1932impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
1933    pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
1934        let mut iter = Self { iter, item: None };
1935        iter.advance().await?;
1936        Ok(iter)
1937    }
1938}
1939
1940#[async_trait]
1941impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
1942    for CoalescingIterator<I>
1943{
1944    async fn advance(&mut self) -> Result<(), Error> {
1945        self.item = self.iter.get().map(|x| x.cloned());
1946        if self.item.is_none() {
1947            return Ok(());
1948        }
1949        let left = self.item.as_mut().unwrap();
1950        loop {
1951            self.iter.advance().await?;
1952            match self.iter.get() {
1953                None => return Ok(()),
1954                Some(right) => {
1955                    // The two records cannot overlap.
1956                    ensure!(
1957                        left.key.device_range.end <= right.key.device_range.start,
1958                        FxfsError::Inconsistent
1959                    );
1960                    // We can only coalesce records if they are touching and have the same value.
1961                    if left.key.device_range.end < right.key.device_range.start
1962                        || left.value != *right.value
1963                    {
1964                        return Ok(());
1965                    }
1966                    left.key.device_range.end = right.key.device_range.end;
1967                }
1968            }
1969        }
1970    }
1971
1972    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
1973        self.item.as_ref().map(|x| x.as_item_ref())
1974    }
1975}
1976
1977struct Flusher<'a> {
1978    allocator: &'a Allocator,
1979    fs: &'a Arc<FxFilesystem>,
1980    _guard: WriteGuard<'a>,
1981}
1982
1983impl<'a> Flusher<'a> {
1984    async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
1985        let keys = lock_keys![LockKey::flush(allocator.object_id())];
1986        Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
1987    }
1988
1989    fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
1990        Options {
1991            skip_journal_checks: true,
1992            borrow_metadata_space: true,
1993            allocator_reservation: Some(allocator_reservation),
1994            ..Default::default()
1995        }
1996    }
1997
1998    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
1999        let object_manager = self.fs.object_manager();
2000        let mut transaction = self
2001            .fs
2002            .clone()
2003            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
2004            .await?;
2005
2006        let root_store = self.fs.root_store();
2007        let layer_object_handle = ObjectStore::create_object(
2008            &root_store,
2009            &mut transaction,
2010            HandleOptions { skip_journal_checks: true, ..Default::default() },
2011            None,
2012        )
2013        .await?;
2014        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
2015        // It's important that this transaction does not include any allocations because we use
2016        // BeginFlush as a snapshot point for mutations to the tree: other allocator mutations
2017        // within this transaction might get applied before seal (which would be OK), but they could
2018        // equally get applied afterwards (since Transaction makes no guarantees about the order in
2019        // which mutations are applied whilst committing), in which case they'd get lost on replay
2020        // because the journal will only send mutations that follow this transaction.
2021        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
2022        let info = transaction
2023            .commit_with_callback(|_| {
2024                // We must capture `info` as it is when we start flushing. Subsequent transactions
2025                // can end up modifying `info` and we shouldn't capture those here.
2026                self.allocator.inner.lock().info.clone()
2027            })
2028            .await?;
2029        Ok((layer_object_handle, info))
2030    }
2031
2032    async fn finish(
2033        self,
2034        layer_object_handle: DataObjectHandle<ObjectStore>,
2035        mut info: AllocatorInfo,
2036    ) -> Result<Version, Error> {
2037        let object_manager = self.fs.object_manager();
2038        let txn_options = Self::txn_options(object_manager.metadata_reservation());
2039
2040        let layer_set = self.allocator.tree.immutable_layer_set();
2041        let total_len = layer_set.sum_len();
2042        {
2043            let mut merger = layer_set.merger();
2044            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
2045            let iter = CoalescingIterator::new(iter).await?;
2046            self.allocator
2047                .tree
2048                .compact_with_iterator(
2049                    iter,
2050                    total_len,
2051                    DirectWriter::new(&layer_object_handle, txn_options).await,
2052                    layer_object_handle.block_size(),
2053                )
2054                .await?;
2055        }
2056
2057        let root_store = self.fs.root_store();
2058
2059        // Both of these forward-declared variables need to outlive the transaction.
2060        let object_handle;
2061        let reservation_update;
2062        let mut transaction = self
2063            .fs
2064            .clone()
2065            .new_transaction(
2066                lock_keys![LockKey::object(
2067                    root_store.store_object_id(),
2068                    self.allocator.object_id()
2069                )],
2070                txn_options,
2071            )
2072            .await?;
2073        let mut serialized_info = Vec::new();
2074
2075        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
2076        object_handle = ObjectStore::open_object(
2077            &root_store,
2078            self.allocator.object_id(),
2079            HandleOptions::default(),
2080            None,
2081        )
2082        .await?;
2083
2084        // Move all the existing layers to the graveyard.
2085        for object_id in &info.layers {
2086            root_store.add_to_graveyard(&mut transaction, *object_id);
2087        }
2088
2089        // Write out updated info.
2090
2091        // After successfully flushing, all the stores that were marked for deletion at the time of
2092        // the BeginFlush transaction no longer need to be marked for deletion.  There can be
2093        // stores that have been deleted since the BeginFlush transaction, but they will be covered
2094        // by a MarkForDeletion mutation.
2095        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);
2096
2097        info.layers = vec![layer_object_handle.object_id()];
2098
2099        info.serialize_with_version(&mut serialized_info)?;
2100
2101        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
2102        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
2103        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;
2104
2105        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
2106            layer_object_handle.get_size(),
2107        ));
2108
2109        // It's important that EndFlush is in the same transaction that we write AllocatorInfo,
2110        // because we use EndFlush to make the required adjustments to allocated_bytes.
2111        transaction.add_with_object(
2112            self.allocator.object_id(),
2113            Mutation::EndFlush,
2114            AssocObj::Borrowed(&reservation_update),
2115        );
2116        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());
2117
2118        let layers = layers_from_handles([layer_object_handle]).await?;
2119        transaction
2120            .commit_with_callback(|_| {
2121                self.allocator.tree.set_layers(layers);
2122
2123                // At this point we've committed the new layers to disk so we can start using them.
2124                // This means we can also switch to the new AllocatorInfo which clears
2125                // marked_for_deletion.
2126                let mut inner = self.allocator.inner.lock();
2127                inner.info.layers = info.layers;
2128                for owner_id in marked_for_deletion {
2129                    inner.marked_for_deletion.remove(&owner_id);
2130                    inner.info.marked_for_deletion.remove(&owner_id);
2131                }
2132            })
2133            .await?;
2134
2135        // Now close the layers and purge them.
2136        for layer in layer_set.layers {
2137            let object_id = layer.handle().map(|h| h.object_id());
2138            layer.close_layer().await;
2139            if let Some(object_id) = object_id {
2140                root_store.tombstone_object(object_id, txn_options).await?;
2141            }
2142        }
2143
2144        let mut counters = self.allocator.counters.lock();
2145        counters.num_flushes += 1;
2146        counters.last_flush_time = Some(std::time::SystemTime::now());
2147        // Return the earliest version used by a struct in the tree
2148        Ok(self.allocator.tree.get_earliest_version())
2149    }
2150}
2151
2152#[cfg(test)]
2153mod tests {
2154    use crate::filesystem::{
2155        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2156    };
2157    use crate::fsck::fsck;
2158    use crate::lsm_tree::cache::NullCache;
2159    use crate::lsm_tree::skip_list_layer::SkipListLayer;
2160    use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
2161    use crate::lsm_tree::{LSMTree, Query};
2162    use crate::object_handle::ObjectHandle;
2163    use crate::object_store::allocator::merge::merge;
2164    use crate::object_store::allocator::{
2165        Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2166    };
2167    use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
2168    use crate::object_store::volume::root_volume;
2169    use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
2170    use crate::range::RangeExt;
2171    use crate::round::round_up;
2172    use crate::testing;
2173    use fuchsia_async as fasync;
2174    use fuchsia_sync::Mutex;
2175    use std::cmp::{max, min};
2176    use std::ops::{Bound, Range};
2177    use std::sync::Arc;
2178    use storage_device::DeviceHolder;
2179    use storage_device::fake_device::FakeDevice;
2180
2181    #[test]
2182    fn test_allocator_key_is_range_based() {
2183        // Make sure we disallow using allocator keys with point queries.
2184        assert!(AllocatorKey { device_range: 0..100 }.is_range_key());
2185    }
2186
2187    #[fuchsia::test]
2188    async fn test_coalescing_iterator() {
2189        let skip_list = SkipListLayer::new(100);
2190        let items = [
2191            Item::new(
2192                AllocatorKey { device_range: 0..100 },
2193                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2194            ),
2195            Item::new(
2196                AllocatorKey { device_range: 100..200 },
2197                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2198            ),
2199        ];
2200        skip_list.insert(items[1].clone()).expect("insert error");
2201        skip_list.insert(items[0].clone()).expect("insert error");
2202        let mut iter =
2203            CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
2204                .await
2205                .expect("new failed");
2206        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2207        assert_eq!(
2208            (key, value),
2209            (
2210                &AllocatorKey { device_range: 0..200 },
2211                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2212            )
2213        );
2214        iter.advance().await.expect("advance failed");
2215        assert!(iter.get().is_none());
2216    }
2217
2218    #[fuchsia::test]
2219    async fn test_merge_and_coalesce_across_three_layers() {
2220        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2221        lsm_tree
2222            .insert(Item::new(
2223                AllocatorKey { device_range: 100..200 },
2224                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2225            ))
2226            .expect("insert error");
2227        lsm_tree.seal();
2228        lsm_tree
2229            .insert(Item::new(
2230                AllocatorKey { device_range: 0..100 },
2231                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2232            ))
2233            .expect("insert error");
2234
2235        let layer_set = lsm_tree.layer_set();
2236        let mut merger = layer_set.merger();
2237        let mut iter =
2238            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2239                .await
2240                .expect("new failed");
2241        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2242        assert_eq!(
2243            (key, value),
2244            (
2245                &AllocatorKey { device_range: 0..200 },
2246                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2247            )
2248        );
2249        iter.advance().await.expect("advance failed");
2250        assert!(iter.get().is_none());
2251    }
2252
2253    #[fuchsia::test]
2254    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
2255        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2256        lsm_tree
2257            .insert(Item::new(
2258                AllocatorKey { device_range: 100..200 },
2259                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2260            ))
2261            .expect("insert error");
2262        lsm_tree.seal();
2263        lsm_tree
2264            .insert(Item::new(
2265                AllocatorKey { device_range: 0..100 },
2266                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2267            ))
2268            .expect("insert error");
2269
2270        let layer_set = lsm_tree.layer_set();
2271        let mut merger = layer_set.merger();
2272        let mut iter =
2273            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2274                .await
2275                .expect("new failed");
2276        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2277        assert_eq!(
2278            (key, value),
2279            (
2280                &AllocatorKey { device_range: 0..100 },
2281                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2282            )
2283        );
2284        iter.advance().await.expect("advance failed");
2285        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2286        assert_eq!(
2287            (key, value),
2288            (
2289                &AllocatorKey { device_range: 100..200 },
2290                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2291            )
2292        );
2293        iter.advance().await.expect("advance failed");
2294        assert!(iter.get().is_none());
2295    }
2296
2297    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
2298        if a.end > b.start && a.start < b.end {
2299            min(a.end, b.end) - max(a.start, b.start)
2300        } else {
2301            0
2302        }
2303    }
2304
2305    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
2306        let layer_set = allocator.tree.layer_set();
2307        let mut merger = layer_set.merger();
2308        let mut iter = allocator
2309            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2310            .await
2311            .expect("build iterator");
2312        let mut allocations: Vec<Range<u64>> = Vec::new();
2313        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2314            if let Some(r) = allocations.last() {
2315                assert!(device_range.start >= r.end);
2316            }
2317            allocations.push(device_range.clone());
2318            iter.advance().await.expect("advance failed");
2319        }
2320        allocations
2321    }
2322
2323    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
2324        let layer_set = allocator.tree.layer_set();
2325        let mut merger = layer_set.merger();
2326        let mut iter = allocator
2327            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2328            .await
2329            .expect("build iterator");
2330        let mut found = 0;
2331        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2332            let mut l = device_range.length().expect("Invalid range");
2333            found += l;
2334            // Make sure that the entire range we have found is covered by the allocations we
2335            // expect to find.
2336            for range in expected_allocations {
2337                l -= overlap(range, device_range);
2338                if l == 0 {
2339                    break;
2340                }
2341            }
2342            assert_eq!(l, 0, "range {device_range:?} not covered by expectations");
2343            iter.advance().await.expect("advance failed");
2344        }
2345        // Make sure the total we found adds up to what we expect.
2346        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
2347    }
2348
2349    async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
2350        let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
2351        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2352        let allocator = fs.allocator();
2353        (fs, allocator)
2354    }
2355
2356    #[fuchsia::test]
2357    async fn test_allocations() {
2358        const STORE_OBJECT_ID: u64 = 99;
2359        let (fs, allocator) = test_fs().await;
2360        let mut transaction =
2361            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2362        let mut device_ranges = collect_allocations(&allocator).await;
2363
2364        // Expected extents:
2365        let expected = vec![
2366            0..4096,        // Superblock A (4k)
2367            4096..139264,   // root_store layer files, StoreInfo.. (33x4k blocks)
2368            139264..204800, // Superblock A extension (64k)
2369            204800..335872, // Initial Journal (128k)
2370            335872..401408, // Superblock B extension (64k)
2371            524288..528384, // Superblock B (4k)
2372        ];
2373        assert_eq!(device_ranges, expected);
2374        device_ranges.push(
2375            allocator
2376                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2377                .await
2378                .expect("allocate failed"),
2379        );
2380        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2381        device_ranges.push(
2382            allocator
2383                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2384                .await
2385                .expect("allocate failed"),
2386        );
2387        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2388        assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
2389        transaction.commit().await.expect("commit failed");
2390        let mut transaction =
2391            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2392        device_ranges.push(
2393            allocator
2394                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2395                .await
2396                .expect("allocate failed"),
2397        );
2398        assert_eq!(device_ranges[7].length().expect("Invalid range"), fs.block_size());
2399        assert_eq!(overlap(&device_ranges[5], &device_ranges[7]), 0);
2400        assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
2401        transaction.commit().await.expect("commit failed");
2402
2403        check_allocations(&allocator, &device_ranges).await;
2404    }
2405
2406    #[fuchsia::test]
2407    async fn test_allocate_more_than_max_size() {
2408        const STORE_OBJECT_ID: u64 = 99;
2409        let (fs, allocator) = test_fs().await;
2410        let mut transaction =
2411            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2412        let mut device_ranges = collect_allocations(&allocator).await;
2413        device_ranges.push(
2414            allocator
2415                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
2416                .await
2417                .expect("allocate failed"),
2418        );
2419        assert_eq!(
2420            device_ranges.last().unwrap().length().expect("Invalid range"),
2421            allocator.max_extent_size_bytes
2422        );
2423        transaction.commit().await.expect("commit failed");
2424
2425        check_allocations(&allocator, &device_ranges).await;
2426    }
2427
2428    #[fuchsia::test]
2429    async fn test_deallocations() {
2430        const STORE_OBJECT_ID: u64 = 99;
2431        let (fs, allocator) = test_fs().await;
2432        let initial_allocations = collect_allocations(&allocator).await;
2433
2434        let mut transaction =
2435            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2436        let device_range1 = allocator
2437            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2438            .await
2439            .expect("allocate failed");
2440        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
2441        transaction.commit().await.expect("commit failed");
2442
2443        let mut transaction =
2444            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2445        allocator
2446            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
2447            .await
2448            .expect("deallocate failed");
2449        transaction.commit().await.expect("commit failed");
2450
2451        check_allocations(&allocator, &initial_allocations).await;
2452    }
2453
2454    #[fuchsia::test]
2455    async fn test_mark_allocated() {
2456        const STORE_OBJECT_ID: u64 = 99;
2457        let (fs, allocator) = test_fs().await;
2458        let mut device_ranges = collect_allocations(&allocator).await;
2459        let range = {
2460            let mut transaction = fs
2461                .clone()
2462                .new_transaction(lock_keys![], Options::default())
2463                .await
2464                .expect("new failed");
2465            // First, allocate 2 blocks.
2466            allocator
2467                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
2468                .await
2469                .expect("allocate failed")
2470            // Let the transaction drop.
2471        };
2472
2473        let mut transaction =
2474            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2475
2476        // If we allocate 1 block, the two blocks that were allocated earlier should be available,
2477        // and this should return the first of them.
2478        device_ranges.push(
2479            allocator
2480                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2481                .await
2482                .expect("allocate failed"),
2483        );
2484
2485        assert_eq!(device_ranges.last().unwrap().start, range.start);
2486
2487        // Mark the second block as allocated.
2488        let mut range2 = range.clone();
2489        range2.start += fs.block_size();
2490        allocator
2491            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
2492            .expect("mark_allocated failed");
2493        device_ranges.push(range2);
2494
2495        // This should avoid the range we marked as allocated.
2496        device_ranges.push(
2497            allocator
2498                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2499                .await
2500                .expect("allocate failed"),
2501        );
2502        let last_range = device_ranges.last().unwrap();
2503        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
2504        assert_eq!(overlap(last_range, &range), 0);
2505        transaction.commit().await.expect("commit failed");
2506
2507        check_allocations(&allocator, &device_ranges).await;
2508    }
2509
2510    #[fuchsia::test]
2511    async fn test_mark_for_deletion() {
2512        const STORE_OBJECT_ID: u64 = 99;
2513        let (fs, allocator) = test_fs().await;
2514
2515        // Allocate some stuff.
2516        let initial_allocated_bytes = allocator.get_allocated_bytes();
2517        let mut device_ranges = collect_allocations(&allocator).await;
2518        let mut transaction =
2519            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2520        // Note that we have a cap on individual allocation length, so we allocate over multiple mutations.
2521        for _ in 0..15 {
2522            device_ranges.push(
2523                allocator
2524                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2525                    .await
2526                    .expect("allocate failed"),
2527            );
2528            device_ranges.push(
2529                allocator
2530                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2531                    .await
2532                    .expect("allocate2 failed"),
2533            );
2534        }
2535        transaction.commit().await.expect("commit failed");
2536        check_allocations(&allocator, &device_ranges).await;
2537
2538        assert_eq!(
2539            allocator.get_allocated_bytes(),
2540            initial_allocated_bytes + fs.block_size() * 3000
2541        );
2542
2543        // Mark for deletion.
2544        let mut transaction =
2545            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2546        allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
2547        transaction.commit().await.expect("commit failed");
2548
2549        // Expect that the allocated byte count is updated immediately but the device ranges are still allocated.
2550        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
2551        check_allocations(&allocator, &device_ranges).await;
2552
2553        // Allocate more space than is free without reclaiming the mark_for_deletion space.
2554        // This should force a flush on allocate(). (1500 new + 3000 marked for deletion > test_fs size of 4096 blocks).
2555        device_ranges.clear();
2556
2557        let mut transaction =
2558            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2559        let target_bytes = 1500 * fs.block_size();
2560        while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
2561            let len = std::cmp::min(
2562                target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
2563                100 * fs.block_size(),
2564            );
2565            device_ranges.push(
2566                allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
2567            );
2568        }
2569        transaction.commit().await.expect("commit failed");
2570
2571        // Have the deleted ranges cleaned up.
2572        allocator.flush().await.expect("flush failed");
2573
2574        // The flush above seems to trigger an allocation for the allocator itself.
2575        // We will just check that we have the right size for the owner we care about.
2576
2577        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2578        assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes);
2579    }
2580
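    /// Creates a child file named "foo {size}" in the root directory of `store` and writes `size`
    /// bytes of data to it.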
2581    async fn create_file(store: &Arc<ObjectStore>, size: usize) {
2582        let root_directory =
2583            Directory::open(store, store.root_directory_object_id()).await.expect("open failed");
2584
2585        let mut transaction = store
2586            .filesystem()
2587            .new_transaction(
2588                lock_keys![LockKey::object(
2589                    store.store_object_id(),
2590                    store.root_directory_object_id()
2591                )],
2592                Options::default(),
2593            )
2594            .await
2595            .expect("new_transaction failed");
2596        let file = root_directory
2597            .create_child_file(&mut transaction, &format!("foo {}", size))
2598            .await
2599            .expect("create_child_file failed");
2600        transaction.commit().await.expect("commit failed");
2601
2602        let buffer = file.allocate_buffer(size).await;
2603
2604        // Append some data to it.
2605        let mut transaction = file
2606            .new_transaction_with_options(Options {
2607                borrow_metadata_space: true,
2608                ..Default::default()
2609            })
2610            .await
2611            .expect("new_transaction_with_options failed");
2612        file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2613        transaction.commit().await.expect("commit failed");
2614    }
2615
2616    #[fuchsia::test]
2617    async fn test_replay_with_deleted_store_and_compaction() {
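        // Deleting a store whose mutations have already been compacted away, then replaying the
        // journal, should leave the filesystem consistent (verified by fsck at the end).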
2618        let (fs, _) = test_fs().await;
2619
2620        const FILE_SIZE: usize = 10_000_000;
2621
2622        let mut store_id = {
2623            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2624            let store = root_vol
2625                .new_volume("vol", NewChildStoreOptions::default())
2626                .await
2627                .expect("new_volume failed");
2628
2629            create_file(&store, FILE_SIZE).await;
2630            store.store_object_id()
2631        };
2632
2633        fs.close().await.expect("close failed");
2634        let device = fs.take_device().await;
2635        device.reopen(false);
2636
2637        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2638
2639        // Compact so that, when we replay, the transaction that deletes the store won't find any
2640        // mutations.
2641        fs.journal().compact().await.expect("compact failed");
2642
2643        for _ in 0..2 {
2644            {
2645                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2646
2647                let transaction = fs
2648                    .clone()
2649                    .new_transaction(
2650                        lock_keys![
2651                            LockKey::object(
2652                                root_vol.volume_directory().store().store_object_id(),
2653                                root_vol.volume_directory().object_id(),
2654                            ),
2655                            LockKey::flush(store_id)
2656                        ],
2657                        Options { borrow_metadata_space: true, ..Default::default() },
2658                    )
2659                    .await
2660                    .expect("new_transaction failed");
2661                root_vol
2662                    .delete_volume("vol", transaction, || {})
2663                    .await
2664                    .expect("delete_volume failed");
2665
2666                let store = root_vol
2667                    .new_volume("vol", NewChildStoreOptions::default())
2668                    .await
2669                    .expect("new_volume failed");
2670                create_file(&store, FILE_SIZE).await;
2671                store_id = store.store_object_id();
2672            }
2673
2674            fs.close().await.expect("close failed");
2675            let device = fs.take_device().await;
2676            device.reopen(false);
2677
2678            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2679        }
2680
2681        fsck(fs.clone()).await.expect("fsck failed");
2682        fs.close().await.expect("close failed");
2683    }
2684
2685    #[fuchsia::test(threads = 4)]
2686    async fn test_compaction_delete_race() {
2687        let (fs, _allocator) = test_fs().await;
2688
2689        {
2690            const FILE_SIZE: usize = 10_000_000;
2691
2692            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2693            let store = root_vol
2694                .new_volume("vol", NewChildStoreOptions::default())
2695                .await
2696                .expect("new_volume failed");
2697
2698            create_file(&store, FILE_SIZE).await;
2699
2700            // Race compaction with deleting a store.
2701            let fs_clone = fs.clone();
2702
2703            // Even though the executor has 4 threads, it's hard to get it to actually run work
2704            // on multiple threads, so force some extra executor threads to spin up.
2705            let executor_tasks = testing::force_executor_threads_to_run(4).await;
2706
2707            let task = fasync::Task::spawn(async move {
2708                fs_clone.journal().compact().await.expect("compact failed");
2709            });
2710
2711            // We don't need the executor tasks any more.
2712            drop(executor_tasks);
2713
2714            // This range was chosen because, for the bug this test was introduced to catch, it
2715            // caused the test to fail after quite a low number of iterations.
2716            let sleep = rand::random_range(3000..6000);
2717            std::thread::sleep(std::time::Duration::from_micros(sleep));
2718            log::info!("sleep {sleep}us");
2719
2720            let transaction = fs
2721                .clone()
2722                .new_transaction(
2723                    lock_keys![
2724                        LockKey::object(
2725                            root_vol.volume_directory().store().store_object_id(),
2726                            root_vol.volume_directory().object_id(),
2727                        ),
2728                        LockKey::flush(store.store_object_id())
2729                    ],
2730                    Options { borrow_metadata_space: true, ..Default::default() },
2731                )
2732                .await
2733                .expect("new_transaction failed");
2734            root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");
2735
2736            task.await;
2737        }
2738
2739        fs.journal().compact().await.expect("compact failed");
2740        fs.close().await.expect("close failed");
2741
2742        let device = fs.take_device().await;
2743        device.reopen(false);
2744
2745        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2746        fsck(fs.clone()).await.expect("fsck failed");
2747        fs.close().await.expect("close failed");
2748    }
2749
2750    #[fuchsia::test]
2751    async fn test_delete_multiple_volumes() {
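        // Repeatedly create a volume, write a file into it, delete the volume, flush the allocator,
        // and remount; the deleted volumes' space should be reclaimed each time (verified by fsck
        // at the end).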
2752        let (mut fs, _) = test_fs().await;
2753
2754        for _ in 0..50 {
2755            {
2756                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2757                let store = root_vol
2758                    .new_volume("vol", NewChildStoreOptions::default())
2759                    .await
2760                    .expect("new_volume failed");
2761
2762                create_file(&store, 1_000_000).await;
2763
2764                let transaction = fs
2765                    .clone()
2766                    .new_transaction(
2767                        lock_keys![
2768                            LockKey::object(
2769                                root_vol.volume_directory().store().store_object_id(),
2770                                root_vol.volume_directory().object_id(),
2771                            ),
2772                            LockKey::flush(store.store_object_id())
2773                        ],
2774                        Options { borrow_metadata_space: true, ..Default::default() },
2775                    )
2776                    .await
2777                    .expect("new_transaction failed");
2778                root_vol
2779                    .delete_volume("vol", transaction, || {})
2780                    .await
2781                    .expect("delete_volume failed");
2782
2783                fs.allocator().flush().await.expect("flush failed");
2784            }
2785
2786            fs.close().await.expect("close failed");
2787            let device = fs.take_device().await;
2788            device.reopen(false);
2789
2790            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2791        }
2792
2793        fsck(fs.clone()).await.expect("fsck failed");
2794        fs.close().await.expect("close failed");
2795    }
2796
2797    #[fuchsia::test]
2798    async fn test_allocate_free_reallocate() {
2799        const STORE_OBJECT_ID: u64 = 99;
2800        let (fs, allocator) = test_fs().await;
2801
2802        // Allocate some stuff.
2803        let mut device_ranges = Vec::new();
2804        let mut transaction =
2805            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2806        for _ in 0..30 {
2807            device_ranges.push(
2808                allocator
2809                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2810                    .await
2811                    .expect("allocate failed"),
2812            );
2813        }
2814        transaction.commit().await.expect("commit failed");
2815
2816        assert_eq!(
2817            fs.block_size() * 3000,
2818            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2819        );
2820
2821        // Delete it all.
2822        let mut transaction =
2823            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2824        for range in std::mem::replace(&mut device_ranges, Vec::new()) {
2825            allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
2826        }
2827        transaction.commit().await.expect("commit failed");
2828
2829        assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());
2830
2831        // Allocate some more. Due to storage pressure, this requires a flush before the space
2832        // deallocated above can be reused.
2833        let mut transaction =
2834            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2835        let target_len = 1500 * fs.block_size();
2836        while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
2837            let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
2838            device_ranges.push(
2839                allocator
2840                    .allocate(&mut transaction, STORE_OBJECT_ID, len)
2841                    .await
2842                    .expect("allocate failed"),
2843            );
2844        }
2845        transaction.commit().await.expect("commit failed");
2846
2847        assert_eq!(
2848            fs.block_size() * 1500,
2849            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2850        );
2851    }
2852
2853    #[fuchsia::test]
2854    async fn test_flush() {
2855        const STORE_OBJECT_ID: u64 = 99;
2856
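        // Allocate a few ranges, flush the allocator, then remount and verify the ranges are still
        // allocated and that new allocations don't overlap them.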
2857        let mut device_ranges = Vec::new();
2858        let device = {
2859            let (fs, allocator) = test_fs().await;
2860            let mut transaction = fs
2861                .clone()
2862                .new_transaction(lock_keys![], Options::default())
2863                .await
2864                .expect("new failed");
2865            device_ranges.push(
2866                allocator
2867                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2868                    .await
2869                    .expect("allocate failed"),
2870            );
2871            device_ranges.push(
2872                allocator
2873                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2874                    .await
2875                    .expect("allocate failed"),
2876            );
2877            device_ranges.push(
2878                allocator
2879                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2880                    .await
2881                    .expect("allocate failed"),
2882            );
2883            transaction.commit().await.expect("commit failed");
2884
2885            allocator.flush().await.expect("flush failed");
2886
2887            fs.close().await.expect("close failed");
2888            fs.take_device().await
2889        };
2890
2891        device.reopen(false);
2892        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2893        let allocator = fs.allocator();
2894
2895        let allocated = collect_allocations(&allocator).await;
2896
2897        // Make sure the ranges we allocated earlier are still allocated.
2898        for i in &device_ranges {
2899            let mut overlapping = 0;
2900            for j in &allocated {
2901                overlapping += overlap(i, j);
2902            }
2903            assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
2904        }
2905
2906        let mut transaction =
2907            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2908        let range = allocator
2909            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2910            .await
2911            .expect("allocate failed");
2912
2913        // Make sure the range just allocated doesn't overlap any other allocated ranges.
2914        for r in &allocated {
2915            assert_eq!(overlap(r, &range), 0);
2916        }
2917        transaction.commit().await.expect("commit failed");
2918    }
2919
2920    #[fuchsia::test]
2921    async fn test_dropped_transaction() {
2922        const STORE_OBJECT_ID: u64 = 99;
2923        let (fs, allocator) = test_fs().await;
2924        let allocated_range = {
2925            let mut transaction = fs
2926                .clone()
2927                .new_transaction(lock_keys![], Options::default())
2928                .await
2929                .expect("new_transaction failed");
2930            allocator
2931                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2932                .await
2933                .expect("allocate failed")
2934        };
2935        // After dropping the transaction and attempting to allocate again, we should end up with
2936        // the same range because the reservation should have been released.
2937        let mut transaction = fs
2938            .clone()
2939            .new_transaction(lock_keys![], Options::default())
2940            .await
2941            .expect("new_transaction failed");
2942        assert_eq!(
2943            allocator
2944                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2945                .await
2946                .expect("allocate failed"),
2947            allocated_range
2948        );
2949    }
2950
2951    #[fuchsia::test]
2952    async fn test_cleanup_removed_owner() {
2953        const STORE_OBJECT_ID: u64 = 99;
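        // Allocating for an owner and then marking it for deletion should remove the owner's
        // per-owner byte accounting, both immediately and after a remount.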
2954        let device = {
2955            let (fs, allocator) = test_fs().await;
2956
2957            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2958            {
2959                let mut transaction =
2960                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
2961                allocator
2962                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2963                    .await
2964                    .expect("Allocating");
2965                transaction.commit().await.expect("Committing.");
2966            }
2967            allocator.flush().await.expect("Flushing");
2968            assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2969            {
2970                let mut transaction =
2971                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
2972                allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
2973                transaction.commit().await.expect("Committing.");
2974            }
2975            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2976            fs.close().await.expect("Closing");
2977            fs.take_device().await
2978        };
2979
2980        device.reopen(false);
2981        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2982        let allocator = fs.allocator();
2983        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2984    }
2985
2986    #[fuchsia::test]
2987    async fn test_allocated_bytes() {
2988        const STORE_OBJECT_ID: u64 = 99;
2989        let (fs, allocator) = test_fs().await;
2990
2991        let initial_allocated_bytes = allocator.get_allocated_bytes();
2992
2993        // Verify allocated_bytes reflects allocation changes.
2994        let allocated_bytes = initial_allocated_bytes + fs.block_size();
2995        let allocated_range = {
2996            let mut transaction = fs
2997                .clone()
2998                .new_transaction(lock_keys![], Options::default())
2999                .await
3000                .expect("new_transaction failed");
3001            let range = allocator
3002                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3003                .await
3004                .expect("allocate failed");
3005            transaction.commit().await.expect("commit failed");
3006            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3007            range
3008        };
3009
3010        {
3011            let mut transaction = fs
3012                .clone()
3013                .new_transaction(lock_keys![], Options::default())
3014                .await
3015                .expect("new_transaction failed");
3016            allocator
3017                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3018                .await
3019                .expect("allocate failed");
3020
3021            // Prior to committing, the count of allocated bytes shouldn't change.
3022            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3023        }
3024
3025        // After dropping the prior transaction, the allocated bytes still shouldn't have changed.
3026        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3027
3028        // Verify allocated_bytes reflects deallocations.
3029        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
3030        let mut transaction =
3031            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3032        allocator
3033            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
3034            .await
3035            .expect("deallocate failed");
3036
3037        // Before committing, there should be no change.
3038        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3039
3040        transaction.commit().await.expect("commit failed");
3041
3042        // After committing, only 40 bytes of the block allocated above should remain allocated.
3043        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
3044    }
3045
3046    #[fuchsia::test]
3047    async fn test_persist_bytes_limit() {
3048        const LIMIT: u64 = 12345;
3049        const OWNER_ID: u64 = 12;
3050
3051        let (fs, allocator) = test_fs().await;
3052        {
3053            let mut transaction = fs
3054                .clone()
3055                .new_transaction(lock_keys![], Options::default())
3056                .await
3057                .expect("new_transaction failed");
3058            allocator
3059                .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
3060                .expect("Failed to set limit.");
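            // The new limit should only become visible in the allocator's info once the transaction
            // commits.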
3061            assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
3062            transaction.commit().await.expect("Failed to commit transaction");
3063            let bytes: u64 = *allocator
3064                .inner
3065                .lock()
3066                .info
3067                .limit_bytes
3068                .get(&OWNER_ID)
3069                .expect("Failed to find limit");
3070            assert_eq!(LIMIT, bytes);
3071        }
3072    }
3073
3074    /// Given a sorted list of non-overlapping ranges, this will coalesce adjacent ranges.
3075    /// This allows comparison of sets of ranges that are equivalent but may be split differently
3076    /// depending on the allocator strategy.
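    /// For example, `[0..10, 10..20, 30..40]` coalesces to `[0..20, 30..40]`.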
3077    fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
3078        let mut coalesced = Vec::new();
3079        let mut prev: Option<Range<u64>> = None;
3080        for range in ranges {
3081            if let Some(prev_range) = &mut prev {
3082                if range.start == prev_range.end {
3083                    prev_range.end = range.end;
3084                } else {
3085                    coalesced.push(prev_range.clone());
3086                    prev = Some(range);
3087                }
3088            } else {
3089                prev = Some(range);
3090            }
3091        }
3092        if let Some(prev_range) = prev {
3093            coalesced.push(prev_range);
3094        }
3095        coalesced
3096    }
3097
3098    #[fuchsia::test]
3099    async fn test_take_for_trimming() {
3100        const STORE_OBJECT_ID: u64 = 99;
3101
3102        // Allocate a large chunk, then free a few bits of it, so we have free chunks interleaved
3103        // with allocated chunks.
3104        let allocated_range;
3105        let expected_free_ranges;
3106        let device = {
3107            let (fs, allocator) = test_fs().await;
3108            let bs = fs.block_size();
3109            let mut transaction = fs
3110                .clone()
3111                .new_transaction(lock_keys![], Options::default())
3112                .await
3113                .expect("new failed");
3114            allocated_range = allocator
3115                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
3116                .await
3117                .expect("allocate failed");
3118            transaction.commit().await.expect("commit failed");
3119
3120            let mut transaction = fs
3121                .clone()
3122                .new_transaction(lock_keys![], Options::default())
3123                .await
3124                .expect("new failed");
3125            let base = allocated_range.start;
3126            expected_free_ranges = vec![
3127                base..(base + (bs * 1)),
3128                (base + (bs * 2))..(base + (bs * 3)),
3129                // Note that the next three ranges are adjacent and will be treated as one free
3130                // range once applied.  We separate them here to exercise the handling of "large"
3131                // free ranges.
3132                (base + (bs * 4))..(base + (bs * 8)),
3133                (base + (bs * 8))..(base + (bs * 12)),
3134                (base + (bs * 12))..(base + (bs * 13)),
3135                (base + (bs * 29))..(base + (bs * 30)),
3136            ];
3137            for range in &expected_free_ranges {
3138                allocator
3139                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
3140                    .await
3141                    .expect("deallocate failed");
3142            }
3143            transaction.commit().await.expect("commit failed");
3144
3145            allocator.flush().await.expect("flush failed");
3146
3147            fs.close().await.expect("close failed");
3148            fs.take_device().await
3149        };
3150
3151        device.reopen(false);
3152        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
3153        let allocator = fs.allocator();
3154
3155        // These values were picked so that each of them is, at some point, the reason that
3156        // collect_free_extents stops, and so that we return after partially processing one of
3157        // the free extents.
3158        let max_extent_size = fs.block_size() as usize * 4;
3159        const EXTENTS_PER_BATCH: usize = 2;
3160        let mut free_ranges = vec![];
3161        let mut offset = allocated_range.start;
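        // Walk the allocated range, collecting the free (trimmable) extents in batches.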
3162        while offset < allocated_range.end {
3163            let free = allocator
3164                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
3165                .await
3166                .expect("take_for_trimming failed");
3167            free_ranges.extend(
3168                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
3169            );
3170            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
3171        }
3172        // Coalesce adjacent free ranges because the allocator may return smaller aligned chunks,
3173        // but the overall set of ranges should always be equivalent.
3174        let coalesced_free_ranges = coalesce_ranges(free_ranges);
3175        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);
3176
3177        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
3178    }
3179
3180    #[fuchsia::test]
3181    async fn test_allocations_wait_for_free_extents() {
3182        const STORE_OBJECT_ID: u64 = 99;
3183        let (fs, allocator) = test_fs().await;
3184        let allocator_clone = allocator.clone();
3185
3186        let mut transaction =
3187            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3188
3189        // Tie up all of the free extents on the device, and make sure allocations block.
3190        let max_extent_size = fs.device().size() as usize;
3191        const EXTENTS_PER_BATCH: usize = usize::MAX;
3192
3193        // HACK: Treat `trimmable_extents` as being locked by `trim_done` (i.e. it should only be
3194        // accessed whilst `trim_done` is locked). We can't combine them into the same mutex,
3195        // because the inner type would be "poisoned" by the lifetime parameter of
3196        // `trimmable_extents` (which is tied to the lifetime of `allocator`), and then we couldn't
3197        // move it into `alloc_task`, which requires a `'static` lifetime.
3198        let trim_done = Arc::new(Mutex::new(false));
3199        let trimmable_extents = allocator
3200            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
3201            .await
3202            .expect("take_for_trimming failed");
3203
3204        let trim_done_clone = trim_done.clone();
3205        let bs = fs.block_size();
3206        let alloc_task = fasync::Task::spawn(async move {
3207            allocator_clone
3208                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
3209                .await
3210                .expect("allocate failed");
3211            {
3212                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
3213            }
3214            transaction.commit().await.expect("commit failed");
3215        });
3216
3217        // Add a small delay to simulate the trim taking some nonzero amount of time.  Otherwise,
3218        // this will almost certainly always beat the allocation attempt.
3219        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
3220
3221        // Once the free extents are released, the task should unblock.
3222        {
3223            let mut trim_done = trim_done.lock();
3224            std::mem::drop(trimmable_extents);
3225            *trim_done = true;
3226        }
3227
3228        alloc_task.await;
3229    }
3230
3231    #[fuchsia::test]
3232    async fn test_allocation_with_reservation_not_multiple_of_block_size() {
3233        const STORE_OBJECT_ID: u64 = 99;
3234        let (fs, allocator) = test_fs().await;
3235
3236        // Reserve an amount that isn't a multiple of the block size.
3237        const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
3238        let reservation =
3239            allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();
3240
3241        let mut transaction = fs
3242            .clone()
3243            .new_transaction(
3244                lock_keys![],
3245                Options { allocator_reservation: Some(&reservation), ..Options::default() },
3246            )
3247            .await
3248            .expect("new failed");
3249
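        // Allocating the reservation rounded up to a block multiple should succeed and return a
        // block-aligned range.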
3250        let range = allocator
3251            .allocate(
3252                &mut transaction,
3253                STORE_OBJECT_ID,
3254                round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
3255            )
3256            .await
3257            .expect("allocate failed");
3258        assert_eq!((range.end - range.start) % fs.block_size(), 0);
3259
3260        println!("{}", range.end - range.start);
3261    }
3262}