fxfs/object_store/allocator.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! # The Allocator
6//!
7//! The allocator in Fxfs is a filesystem-wide entity responsible for managing the allocation of
8//! regions of the device to "owners" (which are `ObjectStore`s).
9//!
10//! Allocations are tracked in an LSMTree with coalescing used to merge neighboring allocations
11//! with the same properties (owner and reference count). As of writing, reference counting is not
12//! used. (Reference counts are intended for future use if/when snapshot support is implemented.)
13//!
14//! There are a number of important implementation features of the allocator that warrant further
15//! documentation.
16//!
17//! ## Byte limits
18//!
19//! Fxfs is a multi-volume filesystem. Fuchsia with fxblob currently uses two primary volumes -
20//! an unencrypted volume for blob storage and an encrypted volume for data storage.
21//! Byte limits ensure that no one volume can consume all available storage. This is important
22//! as software updates must always be possible (blobs) and conversely configuration data should
23//! always be writable (data).
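//!
//! For example (illustrative numbers only, not defaults): on a 16 GiB device the blob volume
//! might be limited to 14 GiB and the data volume to 12 GiB. The limits are simple per-owner
//! caps rather than reservations, so in a sketch like this they may sum to more than the device
//! size.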
24//!
25//! ## Reservation tracking
26//!
27//! Fxfs on Fuchsia leverages write-back caching which allows us to buffer writes in RAM until
28//! memory pressure, an explicit flush or a file close requires us to persist data to storage.
29//!
30//! To ensure that we do not over-commit in such cases (e.g. by writing many files to RAM only to
31//! find out that there is insufficient disk space to store them all), the allocator includes a simple
32//! reservation tracking mechanism.
33//!
34//! Reservation tracking is implemented hierarchically such that a reservation can portion out
35//! sub-reservations, each of which may be "reserved" or "forgotten" when it comes time to actually
36//! allocate space.
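//!
//! A minimal sketch using the `Reservation`/`Hold` types defined below (how the top-level
//! reservation is obtained from the allocator is elided here):
//!
//! ```ignore
//! // `reservation` is a `Reservation` accounted against some owner volume.
//! if let Some(hold) = reservation.reserve(8192) {
//!     // ... stage a buffered write against the held 8 KiB ...
//!     // Dropping `hold` returns the bytes to `reservation`; alternatively `hold.forget()`
//!     // leaves the accounting to the caller (see the method docs below), e.g. once the
//!     // bytes have actually been allocated.
//! }
//! ```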
37//!
38//! ## Fixed locations
39//!
40//! The only fixed-location files in Fxfs are the first 512 KiB extents of the two superblocks,
41//! which occupy the very start of the device (i.e. the first 1 MiB). The allocator manages these
42//! locations, but does so using a `mark_allocated` method, distinct from all other allocations,
43//! for which the location is left up to the allocator.
44//!
45//! ## Deallocated unusable regions
46//!
47//! It is not legal to reuse a deallocated disk region until after a flush. Transactions
48//! are not guaranteed to be persisted until after a successful flush so any reuse of
49//! a region before then followed by power loss may lead to corruption.
50//!
51//! e.g. Consider if we deallocate a file, reuse its extent for another file, then crash after
52//! writing to the new file but not yet flushing the journal. At next mount we will have lost the
53//! transaction despite overwriting the original file's data, leading to inconsistency errors (data
54//! corruption).
55//!
56//! These regions are currently tracked in RAM in the allocator.
57//!
58//! ## Allocated but uncommitted regions
59//!
60//! These are allocated regions that are not (yet) persisted to disk. They are regular
61//! file allocations, but are not stored on persistent storage until their transaction is committed
62//! and safely flushed to disk.
63//!
64//! ## TRIMed unusable regions
65//!
66//! We periodically TRIM unallocated regions to give the SSD controller insight into which
67//! parts of the device contain data. To avoid allocating from a region while it is being
68//! trimmed, we hold temporary allocations over these regions for the duration of the operation.
69//!
70//! ## Volume deletion
71//!
72//! We make use of an optimisation in the case where an entire volume is deleted. In such cases,
73//! rather than individually deallocate all disk regions associated with that volume, we make
74//! note of the deletion and perform a special merge operation on the next LSMTree compaction that
75//! filters out allocations for the deleted volume.
76//!
77//! This is designed to make dropping of volumes significantly cheaper, but it does add some
78//! additional complexity to any allocator implementation that maintains data structures to track
79//! free space (rather than just allocated space).
80//!
81//! ## Image generation
82//!
83//! The Fuchsia build process requires building an initial filesystem image. In the case of
84//! fxblob-based boards, this is an Fxfs filesystem containing a volume with the base set of
85//! blobs required to bootstrap the system. When we build such an image, we want it to be as compact
86//! as possible as we're potentially packaging it up for distribution. To that end, our allocation
87//! strategy (or at least the strategy used for image generation) should prefer to allocate from the
88//! start of the device wherever possible.
89pub mod merge;
90pub mod strategy;
91
92use crate::drop_event::DropEvent;
93use crate::errors::FxfsError;
94use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
95use crate::log::*;
96use crate::lsm_tree::cache::NullCache;
97use crate::lsm_tree::skip_list_layer::SkipListLayer;
98use crate::lsm_tree::types::{
99    FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
100    OrdUpperBound, RangeKey, SortByU64, Value,
101};
102use crate::lsm_tree::{LSMTree, Query, layers_from_handles};
103use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
104use crate::object_store::object_manager::ReservationUpdate;
105use crate::object_store::transaction::{
106    AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
107};
108use crate::object_store::{
109    DataObjectHandle, DirectWriter, HandleOptions, ObjectStore, ReservedId, tree,
110};
111use crate::range::RangeExt;
112use crate::round::{round_div, round_down, round_up};
113use crate::serialized_types::{
114    DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
115};
116use anyhow::{Context, Error, anyhow, bail, ensure};
117use async_trait::async_trait;
118use either::Either::{Left, Right};
119use event_listener::EventListener;
120use fprint::TypeFingerprint;
121use fuchsia_inspect::HistogramProperty;
122use fuchsia_sync::Mutex;
123use futures::FutureExt;
124use merge::{filter_marked_for_deletion, filter_tombstones, merge};
125use serde::{Deserialize, Serialize};
126use std::borrow::Borrow;
127use std::collections::{BTreeMap, HashSet, VecDeque};
128use std::hash::Hash;
129use std::marker::PhantomData;
130use std::num::{NonZero, Saturating};
131use std::ops::Range;
132use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
133use std::sync::{Arc, Weak};
134
135/// This trait is implemented by things that own reservations.
136pub trait ReservationOwner: Send + Sync {
137    /// Report that bytes are being released from the reservation back to the |ReservationOwner|
138    /// where |owner_object_id| is the owner under the root object store associated with the
139    /// reservation.
140    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
141}
142
143/// A reservation guarantees that when it comes time to actually allocate, it will not fail due to
144/// lack of space.  Sub-reservations (a.k.a. holds) are possible which effectively allows part of a
145/// reservation to be set aside until it's time to commit.  Reservations do offer some
146/// thread-safety, but some responsibility is borne by the caller: e.g. calling `forget` and
147/// `reserve` at the same time from different threads is unsafe. Reservations have an
148/// |owner_object_id| which associates them with an object under the root object store that the
149/// reservation is accounted against.
150pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
151    owner: T,
152    owner_object_id: Option<u64>,
153    inner: Mutex<ReservationInner>,
154    phantom: PhantomData<U>,
155}
156
157#[derive(Debug, Default)]
158struct ReservationInner {
159    // Amount currently held by this reservation.
160    amount: u64,
161
162    // Amount reserved by sub-reservations.
163    reserved: u64,
164}
165
166impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        self.inner.lock().fmt(f)
169    }
170}
171
172impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
173    pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
174        Self {
175            owner,
176            owner_object_id,
177            inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
178            phantom: PhantomData,
179        }
180    }
181
182    pub fn owner_object_id(&self) -> Option<u64> {
183        self.owner_object_id
184    }
185
186    /// Returns the total amount of the reservation, not accounting for anything that might be held.
187    pub fn amount(&self) -> u64 {
188        self.inner.lock().amount
189    }
190
191    /// Adds more to the reservation.
192    pub fn add(&self, amount: u64) {
193        self.inner.lock().amount += amount;
194    }
195
196    /// Returns the entire amount of the reservation.  The caller is responsible for maintaining
197    /// consistency, i.e. updating counters, etc, and there can be no sub-reservations (an assert
198    /// will fire otherwise).
199    pub fn forget(&self) -> u64 {
200        let mut inner = self.inner.lock();
201        assert_eq!(inner.reserved, 0);
202        std::mem::take(&mut inner.amount)
203    }
204
205    /// Takes some of the reservation.  The caller is responsible for maintaining consistency,
206    /// i.e. updating counters, etc.  This will assert that the amount being forgotten does not
207    /// exceed the available reservation amount; the caller should ensure that this is the case.
208    pub fn forget_some(&self, amount: u64) {
209        let mut inner = self.inner.lock();
210        inner.amount -= amount;
211        assert!(inner.reserved <= inner.amount);
212    }
213
214    /// Returns a partial amount of the reservation. The |amount| callback is passed the number
215    /// of bytes currently available and should return the amount to take, which can be zero.
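    ///
    /// For example (a sketch): `self.reserve_with(|avail| std::cmp::min(avail, wanted))` takes up
    /// to `wanted` bytes, or fewer if less is available.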
216    fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
217        let mut inner = self.inner.lock();
218        let taken = amount(inner.amount - inner.reserved);
219        inner.reserved += taken;
220        ReservationImpl::new(self, self.owner_object_id, taken)
221    }
222
223    /// Reserves *exactly* amount if possible.
224    pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
225        let mut inner = self.inner.lock();
226        if inner.amount - inner.reserved < amount {
227            None
228        } else {
229            inner.reserved += amount;
230            Some(ReservationImpl::new(self, self.owner_object_id, amount))
231        }
232    }
233
234    /// Commits a previously reserved amount from this reservation.  The caller is responsible for
235    /// ensuring the amount was reserved.
236    pub fn commit(&self, amount: u64) {
237        let mut inner = self.inner.lock();
238        inner.reserved -= amount;
239        inner.amount -= amount;
240    }
241
242    /// Returns some of the reservation.
243    pub fn give_back(&self, amount: u64) {
244        self.owner.borrow().release_reservation(self.owner_object_id, amount);
245        let mut inner = self.inner.lock();
246        inner.amount -= amount;
247        assert!(inner.reserved <= inner.amount);
248    }
249
250    /// Moves `amount` from this reservation to another reservation.
251    pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
252        &self,
253        other: &ReservationImpl<V, W>,
254        amount: u64,
255    ) {
256        assert_eq!(self.owner_object_id, other.owner_object_id());
257        let mut inner = self.inner.lock();
258        if let Some(amount) = inner.amount.checked_sub(amount) {
259            inner.amount = amount;
260        } else {
261            std::mem::drop(inner);
262            panic!("Insufficient reservation space");
263        }
264        other.add(amount);
265    }
266}
267
268impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
269    fn drop(&mut self) {
270        let inner = self.inner.get_mut();
271        assert_eq!(inner.reserved, 0);
272        let owner_object_id = self.owner_object_id;
273        if inner.amount > 0 {
274            self.owner
275                .borrow()
276                .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
277        }
278    }
279}
280
281impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
282    for ReservationImpl<T, U>
283{
284    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
285        // Sub-reservations should belong to the same owner (or lack thereof).
286        assert_eq!(owner_object_id, self.owner_object_id);
287        let mut inner = self.inner.lock();
288        assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
289        inner.reserved -= amount;
290    }
291}
292
293pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;
294
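/// A sub-reservation borrowed from a `Reservation`; dropping a `Hold` returns any remaining held
/// bytes to the parent reservation.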
295pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;
296
297/// Our allocator implementation tracks extents with a reference count.  At time of writing, these
298/// reference counts should never exceed 1, but that might change with snapshots and clones.
299pub type AllocatorKey = AllocatorKeyV32;
300
301#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, TypeFingerprint, Versioned)]
302#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
303pub struct AllocatorKeyV32 {
304    pub device_range: Range<u64>,
305}
306
307impl SortByU64 for AllocatorKey {
308    fn get_leading_u64(&self) -> u64 {
309        self.device_range.start
310    }
311}
312
313const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;
314
315pub struct AllocatorKeyPartitionIterator {
316    device_range: Range<u64>,
317}
318
319impl Iterator for AllocatorKeyPartitionIterator {
320    type Item = u64;
321
322    fn next(&mut self) -> Option<Self::Item> {
323        if self.device_range.start >= self.device_range.end {
324            None
325        } else {
326            let start = self.device_range.start;
327            self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
328            let end = std::cmp::min(self.device_range.start, self.device_range.end);
329            let key = AllocatorKey { device_range: start..end };
330            let hash = crate::stable_hash::stable_hash(key);
331            Some(hash)
332        }
333    }
334}
335
336impl FuzzyHash for AllocatorKey {
337    fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
338        AllocatorKeyPartitionIterator {
339            device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
340                ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
341        }
342    }
343
344    fn is_range_key(&self) -> bool {
345        true
346    }
347}
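
// A sketch of how the bucketing above behaves (values are illustrative): a key covering
// 1.5 MiB..3.5 MiB is rounded out to 1 MiB..4 MiB, so `fuzzy_hash` yields one hash per 1 MiB
// bucket it touches:
//
//   let key = AllocatorKey { device_range: 1_572_864..3_670_016 };
//   assert_eq!(key.fuzzy_hash().count(), 3);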
348
349impl AllocatorKey {
350    /// Returns a new key that is a lower bound suitable for use with merge_into.
351    pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
352        AllocatorKey { device_range: 0..self.device_range.start }
353    }
354}
355
356impl LayerKey for AllocatorKey {
357    fn merge_type(&self) -> MergeType {
358        MergeType::OptimizedMerge
359    }
360}
361
362impl OrdUpperBound for AllocatorKey {
363    fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
364        self.device_range.end.cmp(&other.device_range.end)
365    }
366}
367
368impl OrdLowerBound for AllocatorKey {
369    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
370        // The ordering over range.end is significant here as it is used in
371        // the heap ordering that feeds into our merge function and
372        // a total ordering over range lets us remove a symmetry case from
373        // the allocator merge function.
374        self.device_range
375            .start
376            .cmp(&other.device_range.start)
377            .then(self.device_range.end.cmp(&other.device_range.end))
378    }
379}
380
381impl Ord for AllocatorKey {
382    fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
383        self.device_range
384            .start
385            .cmp(&other.device_range.start)
386            .then(self.device_range.end.cmp(&other.device_range.end))
387    }
388}
389
390impl PartialOrd for AllocatorKey {
391    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
392        Some(self.cmp(other))
393    }
394}
395
396impl RangeKey for AllocatorKey {
397    fn overlaps(&self, other: &Self) -> bool {
398        self.device_range.start < other.device_range.end
399            && self.device_range.end > other.device_range.start
400    }
401}
402
403/// Allocations are "owned" by a single ObjectStore and are reference counted
404/// (for future snapshot/clone support).
405pub type AllocatorValue = AllocatorValueV32;
406impl Value for AllocatorValue {
407    const DELETED_MARKER: Self = Self::None;
408}
409
410#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
411#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
412pub enum AllocatorValueV32 {
413    // Tombstone variant indicating an extent is no longer allocated.
414    None,
415    // Used when we know there are no possible allocations below us in the stack.
416    // This is currently all the time. We used to have a related Delta type but
417    // it has been removed due to correctness issues (https://fxbug.dev/42179428).
418    Abs { count: u64, owner_object_id: u64 },
419}
420
421pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;
422
423/// Serialized information about the allocator.
424pub type AllocatorInfo = AllocatorInfoV32;
425
426#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
427pub struct AllocatorInfoV32 {
428    /// Holds the set of layer file object_ids for the LSM tree (newest first).
429    pub layers: Vec<u64>,
430    /// Maps from owner_object_id to bytes allocated.
431    pub allocated_bytes: BTreeMap<u64, u64>,
432    /// Set of owner_object_id that we should ignore if found in layer files.  For now, this should
433    /// always be empty on-disk because we always do full compactions.
434    pub marked_for_deletion: HashSet<u64>,
435    /// The limit on the number of allocated bytes per `owner_object_id`. If there is no limit
436    /// present here for an `owner_object_id`, assume it is u64::MAX.
437    pub limit_bytes: BTreeMap<u64, u64>,
438}
439
440const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;
441
442/// Computes the target maximum extent size based on the block size of the allocator.
443pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
444    // Each block in an extent contains an 8-byte checksum (which due to varint encoding is 9
445    // bytes), and a given extent record must be no larger than DEFAULT_MAX_SERIALIZED_RECORD_SIZE.  We
446    // also need to leave a bit of room (arbitrarily, 64 bytes) for the rest of the extent's
447    // metadata.
448    block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
449}
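
// Worked example (hypothetical numbers; the actual DEFAULT_MAX_SERIALIZED_RECORD_SIZE may differ):
// with a 4 KiB block size and a maximum serialized record size of 4096 bytes, this yields
// 4096 * (4096 - 64) / 9 = 1,835,008 bytes, i.e. extents of at most 448 blocks.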
450
451#[derive(Default)]
452struct AllocatorCounters {
453    num_flushes: u64,
454    last_flush_time: Option<std::time::SystemTime>,
455}
456
457pub struct Allocator {
458    filesystem: Weak<FxFilesystem>,
459    block_size: u64,
460    device_size: u64,
461    object_id: u64,
462    max_extent_size_bytes: u64,
463    tree: LSMTree<AllocatorKey, AllocatorValue>,
464    // A list of allocations which are temporary; i.e. they are not actually stored in the LSM tree,
465    // but we still don't want to be able to allocate over them.  This layer is merged into the
466    // allocator's LSM tree whilst reading it, so the allocations appear to exist in the LSM tree.
467    // This is used in a few places, for example to hold back allocations that have been added to a
468    // transaction but are not yet committed, or to prevent the allocation of a deleted extent
469    // until the device is flushed.
470    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
471    inner: Mutex<Inner>,
472    allocation_mutex: futures::lock::Mutex<()>,
473    counters: Mutex<AllocatorCounters>,
474    maximum_offset: AtomicU64,
475    allocations_allowed: AtomicBool,
476}
477
478/// Tracks the different stages of byte allocations for an individual owner.
479#[derive(Debug, Default, PartialEq)]
480struct ByteTracking {
481    /// This value is the up-to-date count of the number of allocated bytes per owner_object_id
482    /// whereas the value in `Info::allocated_bytes` is the value as it was when we last flushed.
483    /// This field should be regarded as *untrusted*; it can be invalid due to filesystem
484    /// inconsistencies, and this is why it is `Saturating<u64>` rather than just `u64`.  Any
485    /// computations that use this value should typically propagate `Saturating` so that it is clear
486    /// the value is *untrusted*.
487    allocated_bytes: Saturating<u64>,
488
489    /// This value is the number of bytes allocated to uncommitted allocations.
490    /// (Bytes allocated, but not yet persisted to disk)
491    uncommitted_allocated_bytes: u64,
492
493    /// This value is the number of bytes allocated to reservations.
494    reserved_bytes: u64,
495
496    /// The number of bytes in committed deallocations that we cannot reuse until they have been
497    /// flushed to the device.  (The individual deallocated ranges and the log offsets at which
498    /// they were committed are tracked in `Inner::committed_deallocated`.)
499    committed_deallocated_bytes: u64,
500}
501
502impl ByteTracking {
503    // Returns the total number of bytes that are taken either from reservations, allocations or
504    // uncommitted allocations.
505    fn used_bytes(&self) -> Saturating<u64> {
506        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
507    }
508
509    // Returns the amount that is not available to be allocated, which includes actually allocated
510    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
511    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
512    // reuse those bytes yet.
513    fn unavailable_bytes(&self) -> Saturating<u64> {
514        self.allocated_bytes
515            + Saturating(self.uncommitted_allocated_bytes)
516            + Saturating(self.committed_deallocated_bytes)
517    }
518
519    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
520    // require a device flush to be reused.
521    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
522        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
523    }
524}
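
// Worked example (illustrative numbers): with allocated_bytes = 100, uncommitted_allocated_bytes
// = 10, reserved_bytes = 5 and committed_deallocated_bytes = 20 (all in MiB):
//   used_bytes()                   = 100 + 10 + 5  = 115 MiB
//   unavailable_bytes()            = 100 + 10 + 20 = 130 MiB
//   unavailable_after_sync_bytes() = 100 + 10      = 110 MiB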
525
526#[derive(Debug)]
527struct CommittedDeallocation {
528    // The offset at which this deallocation was committed.
529    log_file_offset: u64,
530    // The device range being deallocated.
531    range: Range<u64>,
532    // The owning object id which originally allocated it.
533    owner_object_id: u64,
534}
535
536struct Inner {
537    info: AllocatorInfo,
538
539    /// The allocator can only be opened if there have been no allocations and it has not already
540    /// been opened or initialized.
541    opened: bool,
542
543    /// When we allocate a range from RAM, we add it to Allocator::temporary_allocations.
544    /// This layer is added to layer_set in rebuild_strategy and take_for_trimming so we don't assume
545    /// the range is available.
546    /// If we apply the mutation, we move it from `temporary_allocations` to the LSMTree.
547    /// If we drop the mutation, we just delete it from `temporary_allocations` and call `free`.
548    ///
549    /// We need to be very careful about races when we move the allocation into the LSMTree.
550    /// A layer_set is a snapshot in time of a set of layers. Say:
551    ///  1. `rebuild_strategy` takes a layer_set that includes the mutable layer.
552    ///  2. A compaction operation seals that mutable layer and creates a new mutable layer.
553    ///     Note that the layer_set in rebuild_strategy doesn't have this new mutable layer.
554    ///  3. An apply_mutation operation adds the allocation to the new mutable layer.
555    ///     It then removes the allocation from `temporary_allocations`.
556    ///  4. `rebuild_strategy` iterates the layer_set, missing both the temporary mutation AND
557    ///     the record in the new mutable layer, making the allocated range available for double
558    ///     allocation and future filesystem corruption.
559    ///
560    /// We don't have (or want) a means to lock compactions during rebuild_strategy operations, so
561    /// to avoid this scenario, we can't remove from temporary_allocations when we apply a mutation.
562    /// Instead we add them to dropped_temporary_allocations and make the actual removals from
563    /// `temporary_allocations` while holding the allocator lock. Because rebuild_strategy only
564    /// runs with this lock held, this guarantees that we don't remove entries from
565    /// `temporary_allocations` while also iterating over it.
566    ///
567    /// A related issue happens when we deallocate a range. We use temporary_allocations this time
568    /// to prevent reuse until after the deallocation has been successfully flushed.
569    /// In this scenario we don't want to call 'free()' until the range is ready to use again.
570    dropped_temporary_allocations: Vec<Range<u64>>,
571
572    /// The per-owner counters for bytes at various stages of the data life-cycle. From initial
573    /// reservation through allocation until the bytes are deallocated and eventually freed.
574    owner_bytes: BTreeMap<u64, ByteTracking>,
575
576    /// This value is the number of bytes allocated to reservations but not tracked as part of a
577    /// particular volume.
578    unattributed_reserved_bytes: u64,
579
580    /// Committed deallocations that we cannot use until they are flushed to the device.
581    committed_deallocated: VecDeque<CommittedDeallocation>,
582
583    /// Bytes which are currently being trimmed.  These can still be allocated from, but the
584    /// allocation will block until the current batch of trimming is finished.
585    trim_reserved_bytes: u64,
586
587    /// While a trim is being performed, this listener is set.  When the current batch of extents
588    /// being trimmed have been released (and trim_reserved_bytes is 0), this is signaled.
589    /// This should only be listened to while the allocation_mutex is held.
590    trim_listener: Option<EventListener>,
591
592    /// This controls how we allocate our free space to manage fragmentation.
593    strategy: strategy::BestFit,
594
595    /// Tracks the number of allocations of size 1,2,...63,>=64.
596    allocation_size_histogram: [u64; 64],
597    /// Tracks which size bucket triggers rebuild_strategy.
598    rebuild_strategy_trigger_histogram: [u64; 64],
599
600    /// This is distinct from the set contained in `info`.  New entries are inserted *after* a
601    /// device has been flushed (it is not safe to reuse the space taken by a deleted volume prior
602    /// to this) and are removed after a major compaction.
603    marked_for_deletion: HashSet<u64>,
604
605    /// The set of volumes deleted that are waiting for a sync.
606    volumes_deleted_pending_sync: HashSet<u64>,
607}
608
609impl Inner {
610    fn allocated_bytes(&self) -> Saturating<u64> {
611        let mut total = Saturating(0);
612        for (_, bytes) in &self.owner_bytes {
613            total += bytes.allocated_bytes;
614        }
615        total
616    }
617
618    fn uncommitted_allocated_bytes(&self) -> u64 {
619        self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
620    }
621
622    fn reserved_bytes(&self) -> u64 {
623        self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
624            + self.unattributed_reserved_bytes
625    }
626
627    fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
628        match self.info.limit_bytes.get(&owner_object_id) {
629            Some(v) => *v,
630            None => u64::MAX,
631        }
632    }
633
634    fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
635        let limit = self.owner_id_limit_bytes(owner_object_id);
636        let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
637        (Saturating(limit) - used).0
638    }
639
640    // Returns the amount that is not available to be allocated, which includes actually allocated
641    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
642    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
643    // reuse those bytes yet.
644    fn unavailable_bytes(&self) -> Saturating<u64> {
645        let mut total = Saturating(0);
646        for (_, bytes) in &self.owner_bytes {
647            total += bytes.unavailable_bytes();
648        }
649        total
650    }
651
652    // Returns the total number of bytes that are taken either from reservations, allocations or
653    // uncommitted allocations.
654    fn used_bytes(&self) -> Saturating<u64> {
655        let mut total = Saturating(0);
656        for (_, bytes) in &self.owner_bytes {
657            total += bytes.used_bytes();
658        }
659        total + Saturating(self.unattributed_reserved_bytes)
660    }
661
662    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
663    // require a device flush to be reused.
664    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
665        let mut total = Saturating(0);
666        for (_, bytes) in &self.owner_bytes {
667            total += bytes.unavailable_after_sync_bytes();
668        }
669        total
670    }
671
672    // Returns the number of bytes which will be available after the current batch of trimming
673    // completes.
674    fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
675        device_size
676            .checked_sub(
677                (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
678            )
679            .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
680    }
681
682    fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
683        match owner_object_id {
684            Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
685            None => self.unattributed_reserved_bytes += amount,
686        };
687    }
688
689    fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
690        match owner_object_id {
691            Some(owner) => {
692                let owner_entry = self.owner_bytes.entry(owner).or_default();
693                assert!(
694                    owner_entry.reserved_bytes >= amount,
695                    "{} >= {}",
696                    owner_entry.reserved_bytes,
697                    amount
698                );
699                owner_entry.reserved_bytes -= amount;
700            }
701            None => {
702                assert!(
703                    self.unattributed_reserved_bytes >= amount,
704                    "{} >= {}",
705                    self.unattributed_reserved_bytes,
706                    amount
707                );
708                self.unattributed_reserved_bytes -= amount
709            }
710        };
711    }
712}
713
714/// A container for a set of extents which are known to be free and can be trimmed.  Returned by
715/// `take_for_trimming`.
716pub struct TrimmableExtents<'a> {
717    allocator: &'a Allocator,
718    extents: Vec<Range<u64>>,
719    // The allocator can subscribe to this event to wait until these extents are dropped.  This way,
720    // we don't fail an allocation attempt if blocks are tied up for trimming; rather, we just wait
721    // until the batch is finished with and then proceed.
722    _drop_event: DropEvent,
723}
724
725impl<'a> TrimmableExtents<'a> {
726    pub fn extents(&self) -> &Vec<Range<u64>> {
727        &self.extents
728    }
729
730    // Also returns an EventListener which is signaled when this is dropped.
731    fn new(allocator: &'a Allocator) -> (Self, EventListener) {
732        let drop_event = DropEvent::new();
733        let listener = drop_event.listen();
734        (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
735    }
736
737    fn add_extent(&mut self, extent: Range<u64>) {
738        self.extents.push(extent);
739    }
740}
741
742impl<'a> Drop for TrimmableExtents<'a> {
743    fn drop(&mut self) {
744        let mut inner = self.allocator.inner.lock();
745        for device_range in std::mem::take(&mut self.extents) {
746            inner.strategy.free(device_range.clone()).expect("drop trim extent");
747            self.allocator
748                .temporary_allocations
749                .erase(&AllocatorKey { device_range: device_range.clone() });
750        }
751        inner.trim_reserved_bytes = 0;
752    }
753}
754
755impl Allocator {
756    pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
757        let block_size = filesystem.block_size();
758        // We expect device size to be a multiple of block size. Throw away any tail.
759        let device_size = round_down(filesystem.device().size(), block_size);
760        if device_size != filesystem.device().size() {
761            warn!("Device size is not block aligned. Rounding down.");
762        }
763        let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
764        let mut strategy = strategy::BestFit::default();
765        strategy.free(0..device_size).expect("new fs");
766        Allocator {
767            filesystem: Arc::downgrade(&filesystem),
768            block_size,
769            device_size,
770            object_id,
771            max_extent_size_bytes,
772            tree: LSMTree::new(merge, Box::new(NullCache {})),
773            temporary_allocations: SkipListLayer::new(1024),
774            inner: Mutex::new(Inner {
775                info: AllocatorInfo::default(),
776                opened: false,
777                dropped_temporary_allocations: Vec::new(),
778                owner_bytes: BTreeMap::new(),
779                unattributed_reserved_bytes: 0,
780                committed_deallocated: VecDeque::new(),
781                trim_reserved_bytes: 0,
782                trim_listener: None,
783                strategy,
784                allocation_size_histogram: [0; 64],
785                rebuild_strategy_trigger_histogram: [0; 64],
786                marked_for_deletion: HashSet::new(),
787                volumes_deleted_pending_sync: HashSet::new(),
788            }),
789            allocation_mutex: futures::lock::Mutex::new(()),
790            counters: Mutex::new(AllocatorCounters::default()),
791            maximum_offset: AtomicU64::new(0),
792            allocations_allowed: AtomicBool::new(filesystem.options().image_builder_mode.is_none()),
793        }
794    }
795
796    pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
797        &self.tree
798    }
799
800    /// Enables allocations.
801    /// This is only valid to call if the allocator was created in image builder mode.
802    pub fn enable_allocations(&self) {
803        self.allocations_allowed.store(true, Ordering::SeqCst);
804    }
805
806    /// Returns an iterator that yields all allocations, filtering out tombstones and any
807    /// owner_object_id that have been marked as deleted.  If `committed_marked_for_deletion` is
808    /// true, then filter using the committed volumes marked for deletion rather than the in-memory
809    /// copy, which excludes volumes that have been deleted but not yet synced.
810    pub async fn filter(
811        &self,
812        iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
813        committed_marked_for_deletion: bool,
814    ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
815        let marked_for_deletion = {
816            let inner = self.inner.lock();
817            if committed_marked_for_deletion {
818                &inner.info.marked_for_deletion
819            } else {
820                &inner.marked_for_deletion
821            }
822            .clone()
823        };
824        let iter =
825            filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
826        Ok(iter)
827    }
828
829    /// A histogram of allocation request sizes.
830    /// The index into the array is 'number of blocks'.
831    /// The last bucket is a catch-all for larger allocation requests.
832    pub fn allocation_size_histogram(&self) -> [u64; 64] {
833        self.inner.lock().allocation_size_histogram
834    }
835
836    /// Creates a new (empty) allocator.
837    pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
838        // Mark the allocator as opened before creating the file because creating a new
839        // transaction requires a reservation.
840        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
841
842        let filesystem = self.filesystem.upgrade().unwrap();
843        let root_store = filesystem.root_store();
844        ObjectStore::create_object_with_id(
845            &root_store,
846            transaction,
847            ReservedId::new(&root_store, NonZero::new(self.object_id()).unwrap()),
848            HandleOptions::default(),
849            None,
850        )?;
851        root_store.update_last_object_id(self.object_id());
852        Ok(())
853    }
854
855    // Opens the allocator.  This is not thread-safe; this should be called prior to
856    // replaying the allocator's mutations.
857    pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
858        let filesystem = self.filesystem.upgrade().unwrap();
859        let root_store = filesystem.root_store();
860
861        self.inner.lock().strategy = strategy::BestFit::default();
862
863        let handle =
864            ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
865                .await
866                .context("Failed to open allocator object")?;
867
868        if handle.get_size() > 0 {
869            let serialized_info = handle
870                .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
871                .await
872                .context("Failed to read AllocatorInfo")?;
873            let mut cursor = std::io::Cursor::new(serialized_info);
874            let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
875                .context("Failed to deserialize AllocatorInfo")?;
876
877            let mut handles = Vec::new();
878            let mut total_size = 0;
879            for object_id in &info.layers {
880                let handle = ObjectStore::open_object(
881                    &root_store,
882                    *object_id,
883                    HandleOptions::default(),
884                    None,
885                )
886                .await
887                .context("Failed to open allocator layer file")?;
888
889                let size = handle.get_size();
890                total_size += size;
891                handles.push(handle);
892            }
893
894            {
895                let mut inner = self.inner.lock();
896
897                // Check allocated_bytes fits within the device.
898                let mut device_bytes = self.device_size;
899                for (&owner_object_id, &bytes) in &info.allocated_bytes {
900                    ensure!(
901                        bytes <= device_bytes,
902                        anyhow!(FxfsError::Inconsistent).context(format!(
903                            "Allocated bytes exceeds device size: {:?}",
904                            info.allocated_bytes
905                        ))
906                    );
907                    device_bytes -= bytes;
908
909                    inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
910                        Saturating(bytes);
911                }
912
913                inner.info = info;
914            }
915
916            self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
917            self.filesystem.upgrade().unwrap().object_manager().update_reservation(
918                self.object_id,
919                tree::reservation_amount_from_layer_size(total_size),
920            );
921        }
922
923        Ok(())
924    }
925
926    pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
927        // We can assume the device has been flushed.
928        {
929            let mut inner = self.inner.lock();
930            inner.volumes_deleted_pending_sync.clear();
931            inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
932        }
933
934        // Build free extent structure from disk. For now, we expect disks to have *some* free
935        // space at all times. This may change if we ever support mounting of read-only
936        // redistributable filesystem images.
937        if !self.rebuild_strategy().await.context("Build free extents")? {
938            if self.filesystem.upgrade().unwrap().options().read_only {
939                info!("Device contains no free space (read-only mode).");
940            } else {
941                info!("Device contains no free space.");
942                return Err(FxfsError::Inconsistent)
943                    .context("Device appears to contain no free space");
944            }
945        }
946
947        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
948        Ok(())
949    }
950
951    /// Walk all allocations to generate the set of free regions between allocations.
952    /// It is safe to re-run this on live filesystems but it should not be called concurrently
953    /// with allocations, trims or other rebuild_strategy invocations -- use allocation_mutex.
954    /// Returns true if the rebuild made changes to either the free ranges or overflow markers.
955    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
956        let mut changed = false;
957        let mut layer_set = self.tree.empty_layer_set();
958        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
959        self.tree.add_all_layers_to_layer_set(&mut layer_set);
960
961        let overflow_markers = self.inner.lock().strategy.overflow_markers();
962        self.inner.lock().strategy.reset_overflow_markers();
963
964        let mut to_add = Vec::new();
965        let mut merger = layer_set.merger();
966        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
967        let mut last_offset = 0;
968        while last_offset < self.device_size {
969            let next_range = match iter.get() {
970                None => {
971                    assert!(last_offset <= self.device_size);
972                    let range = last_offset..self.device_size;
973                    last_offset = self.device_size;
974                    iter.advance().await?;
975                    range
976                }
977                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
978                    if device_range.end < last_offset {
979                        iter.advance().await?;
980                        continue;
981                    }
982                    if device_range.start <= last_offset {
983                        last_offset = device_range.end;
984                        iter.advance().await?;
985                        continue;
986                    }
987                    let range = last_offset..device_range.start;
988                    last_offset = device_range.end;
989                    iter.advance().await?;
990                    range
991                }
992            };
993            to_add.push(next_range);
994            // Avoid taking a lock on inner for every free range.
995            if to_add.len() > 100 {
996                let mut inner = self.inner.lock();
997                for range in to_add.drain(..) {
998                    changed |= inner.strategy.force_free(range)?;
999                }
1000            }
1001        }
1002        let mut inner = self.inner.lock();
1003        for range in to_add {
1004            changed |= inner.strategy.force_free(range)?;
1005        }
1006        if overflow_markers != inner.strategy.overflow_markers() {
1007            changed = true;
1008        }
1009        Ok(changed)
1010    }
1011
1012    /// Collects up to `extents_per_batch` free extents of size up to `max_extent_size` from
1013    /// `offset`.  The extents will be reserved.
1014    /// Note that only one `TrimmableExtents` can exist for the allocator at any time.
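    ///
    /// A usage sketch (the device trim call is hypothetical and not part of this module):
    ///
    /// ```ignore
    /// let batch = allocator.take_for_trimming(offset, max_extent_size, extents_per_batch).await?;
    /// for device_range in batch.extents() {
    ///     device.trim(device_range.clone()).await?; // hypothetical device call
    /// }
    /// drop(batch); // frees the extents again and unblocks any waiting allocations
    /// ```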
1015    pub async fn take_for_trimming(
1016        &self,
1017        offset: u64,
1018        max_extent_size: usize,
1019        extents_per_batch: usize,
1020    ) -> Result<TrimmableExtents<'_>, Error> {
1021        let _guard = self.allocation_mutex.lock().await;
1022
1023        let (mut result, listener) = TrimmableExtents::new(self);
1024        let mut bytes = 0;
1025
1026        // We can't just use self.strategy here because it doesn't necessarily hold all
1027        // free extent metadata in RAM.
1028        let mut layer_set = self.tree.empty_layer_set();
1029        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
1030        self.tree.add_all_layers_to_layer_set(&mut layer_set);
1031        let mut merger = layer_set.merger();
1032        let mut iter = self
1033            .filter(
1034                merger.query(Query::FullRange(&AllocatorKey { device_range: offset..0 })).await?,
1035                false,
1036            )
1037            .await?;
1038        let mut last_offset = offset;
1039        'outer: while last_offset < self.device_size {
1040            let mut range = match iter.get() {
1041                None => {
1042                    assert!(last_offset <= self.device_size);
1043                    let range = last_offset..self.device_size;
1044                    last_offset = self.device_size;
1045                    iter.advance().await?;
1046                    range
1047                }
1048                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
1049                    if device_range.end <= last_offset {
1050                        iter.advance().await?;
1051                        continue;
1052                    }
1053                    if device_range.start <= last_offset {
1054                        last_offset = device_range.end;
1055                        iter.advance().await?;
1056                        continue;
1057                    }
1058                    let range = last_offset..device_range.start;
1059                    last_offset = device_range.end;
1060                    iter.advance().await?;
1061                    range
1062                }
1063            };
1064            if range.start < offset {
1065                continue;
1066            }
1067
1068            // 'range' is based on the on-disk LSM tree. We need to check against any uncommitted
1069            // allocations, then temporarily allocate the range for ourselves.
1070            let mut inner = self.inner.lock();
1071
1072            while range.start < range.end {
1073                let prefix =
1074                    range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
1075                range = prefix.end..range.end;
1076                bytes += prefix.length()?;
1077                // We can assume the following are safe because we've checked both LSM tree and
1078                // temporary_allocations while holding the lock.
1079                inner.strategy.remove(prefix.clone());
1080                self.temporary_allocations.insert(AllocatorItem {
1081                    key: AllocatorKey { device_range: prefix.clone() },
1082                    value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
1083                    sequence: 0,
1084                })?;
1085                result.add_extent(prefix);
1086                if result.extents.len() == extents_per_batch {
1087                    break 'outer;
1088                }
1089            }
1090            if result.extents.len() == extents_per_batch {
1091                break 'outer;
1092            }
1093        }
1094        {
1095            let mut inner = self.inner.lock();
1096
1097            assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
1098            inner.trim_listener = Some(listener);
1099            inner.trim_reserved_bytes = bytes;
1100            debug_assert!(
1101                (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
1102                    <= self.device_size
1103            );
1104        }
1105        Ok(result)
1106    }
1107
1108    /// Returns all objects that exist in the parent store that pertain to this allocator.
1109    pub fn parent_objects(&self) -> Vec<u64> {
1110        // The allocator tree needs to store a file for each of the layers in the tree, so we return
1111        // those, since nothing else references them.
1112        self.inner.lock().info.layers.clone()
1113    }
1114
1115    /// Returns all the current owner byte limits (in pairs of `(owner_object_id, bytes)`).
1116    pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
1117        self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
1118    }
1119
1120    /// Returns (allocated_bytes, byte_limit) for the given owner.
1121    pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
1122        let inner = self.inner.lock();
1123        (
1124            inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
1125            inner.info.limit_bytes.get(&owner_object_id).copied(),
1126        )
1127    }
1128
1129    /// Returns owner bytes debug information.
1130    pub fn owner_bytes_debug(&self) -> String {
1131        format!("{:?}", self.inner.lock().owner_bytes)
1132    }
1133
1134    fn needs_sync(&self) -> bool {
1135        // TODO(https://fxbug.dev/42178048): This will only trigger if *all* free space is taken up with
1136        // committed deallocated bytes, but we might want to trigger a sync if we're low and there
1137        // happens to be a lot of deallocated bytes as that might mean we can fully satisfy
1138        // allocation requests.
1139        let inner = self.inner.lock();
1140        inner.unavailable_bytes().0 >= self.device_size
1141    }
1142
1143    fn is_system_store(&self, owner_object_id: u64) -> bool {
1144        let fs = self.filesystem.upgrade().unwrap();
1145        owner_object_id == fs.object_manager().root_store_object_id()
1146            || owner_object_id == fs.object_manager().root_parent_store_object_id()
1147    }
1148
1149    /// Updates the accounting to track that a byte reservation has been moved out of an owner to
1150    /// the unattributed pool.
1151    pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
1152        if old_owner_object_id.is_none() || amount == 0 {
1153            return;
1154        }
1155        // These 2 mutations should behave as though they're a single atomic mutation.
1156        let mut inner = self.inner.lock();
1157        inner.remove_reservation(old_owner_object_id, amount);
1158        inner.add_reservation(None, amount);
1159    }
1160
1161    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
1162    /// allocator when queried.
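    ///
    /// For example (sketch): `allocator.track_statistics(inspector_root_node, "allocator")`, where
    /// `inspector_root_node` is whatever `fuchsia_inspect::Node` the caller wants the stats under.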
1163    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1164        let this = Arc::downgrade(self);
1165        parent.record_lazy_child(name, move || {
1166            let this_clone = this.clone();
1167            async move {
1168                let inspector = fuchsia_inspect::Inspector::default();
1169                if let Some(this) = this_clone.upgrade() {
1170                    let counters = this.counters.lock();
1171                    let root = inspector.root();
1172                    root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
1173                    root.record_uint("bytes_total", this.device_size);
1174                    let (allocated, reserved, used, unavailable) = {
1175                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1176                        let inner = this.inner.lock();
1177                        (
1178                            inner.allocated_bytes().0,
1179                            inner.reserved_bytes(),
1180                            inner.used_bytes().0,
1181                            inner.unavailable_bytes().0,
1182                        )
1183                    };
1184                    root.record_uint("bytes_allocated", allocated);
1185                    root.record_uint("bytes_reserved", reserved);
1186                    root.record_uint("bytes_used", used);
1187                    root.record_uint("bytes_unavailable", unavailable);
1188
1189                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing
1190                    // metrics.
1191                    if let Some(x) = round_div(100 * allocated, this.device_size) {
1192                        root.record_uint("bytes_allocated_percent", x);
1193                    }
1194                    if let Some(x) = round_div(100 * reserved, this.device_size) {
1195                        root.record_uint("bytes_reserved_percent", x);
1196                    }
1197                    if let Some(x) = round_div(100 * used, this.device_size) {
1198                        root.record_uint("bytes_used_percent", x);
1199                    }
1200                    if let Some(x) = round_div(100 * unavailable, this.device_size) {
1201                        root.record_uint("bytes_unavailable_percent", x);
1202                    }
1203
1204                    root.record_uint("num_flushes", counters.num_flushes);
1205                    if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
1206                        root.record_uint(
1207                            "last_flush_time_ms",
1208                            last_flush_time
1209                                .duration_since(std::time::UNIX_EPOCH)
1210                                .unwrap_or(std::time::Duration::ZERO)
1211                                .as_millis()
1212                                .try_into()
1213                                .unwrap_or(0u64),
1214                        );
1215                    }
1216
1217                    let data = this.allocation_size_histogram();
1218                    let alloc_sizes = root.create_uint_linear_histogram(
1219                        "allocation_size_histogram",
1220                        fuchsia_inspect::LinearHistogramParams {
1221                            floor: 1,
1222                            step_size: 1,
1223                            buckets: 64,
1224                        },
1225                    );
1226                    for (i, count) in data.iter().enumerate() {
1227                        if i != 0 {
1228                            alloc_sizes.insert_multiple(i as u64, *count as usize);
1229                        }
1230                    }
1231                    root.record(alloc_sizes);
1232
1233                    let data = this.inner.lock().rebuild_strategy_trigger_histogram;
1234                    let triggers = root.create_uint_linear_histogram(
1235                        "rebuild_strategy_triggers",
1236                        fuchsia_inspect::LinearHistogramParams {
1237                            floor: 1,
1238                            step_size: 1,
1239                            buckets: 64,
1240                        },
1241                    );
1242                    for (i, count) in data.iter().enumerate() {
1243                        if i != 0 {
1244                            triggers.insert_multiple(i as u64, *count as usize);
1245                        }
1246                    }
1247                    root.record(triggers);
1248                }
1249                Ok(inspector)
1250            }
1251            .boxed()
1252        });
1253    }
1254
1255    /// Returns the offset of the first byte which has not been used by the allocator since its
1256    /// creation.
1257    /// NB: This does *not* take into account existing allocations.  This is only reliable when the
1258    /// allocator was created from scratch, without any pre-existing allocations.
1259    pub fn maximum_offset(&self) -> u64 {
1260        self.maximum_offset.load(Ordering::Relaxed)
1261    }
1262}
1263
1264impl Drop for Allocator {
1265    fn drop(&mut self) {
1266        let inner = self.inner.lock();
1267        // Uncommitted and reserved bytes are released via RAII, so both should be zero by now.
1268        assert_eq!(inner.uncommitted_allocated_bytes(), 0);
1269        assert_eq!(inner.reserved_bytes(), 0);
1270    }
1271}
1272
1273#[fxfs_trace::trace]
1274impl Allocator {
1275    /// Returns the object ID for the allocator.
1276    pub fn object_id(&self) -> u64 {
1277        self.object_id
1278    }
1279
1280    /// Returns information about the allocator such as the layer files storing persisted
1281    /// allocations.
1282    pub fn info(&self) -> AllocatorInfo {
1283        self.inner.lock().info.clone()
1284    }
1285
1286    /// Tries to allocate enough space for |object_range| in the specified object and returns the
1287    /// device range allocated.
1288    /// The allocated range may be short (e.g. due to fragmentation), in which case the caller can
1289    /// simply call allocate again until they have enough blocks.
1290    ///
1291    /// We also store the object store ID of the store that the allocation should be assigned to so
1292    /// that we have a means to delete encrypted stores without needing the encryption key.
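    ///
    /// A minimal usage sketch (hedged: `fs`, `allocator` and `owner_id` are illustrative names,
    /// following the same pattern as the tests at the bottom of this file):
    ///
    /// ```ignore
    /// let mut transaction =
    ///     fs.clone().new_transaction(lock_keys![], Options::default()).await?;
    /// // The returned range may be short; callers simply loop until they have enough.
    /// let range = allocator.allocate(&mut transaction, owner_id, fs.block_size()).await?;
    /// transaction.commit().await?;
    /// ```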
1293    #[trace]
1294    pub async fn allocate(
1295        self: &Arc<Self>,
1296        transaction: &mut Transaction<'_>,
1297        owner_object_id: u64,
1298        mut len: u64,
1299    ) -> Result<Range<u64>, Error> {
1300        ensure!(self.allocations_allowed.load(Ordering::SeqCst), FxfsError::Unavailable);
1301        assert_eq!(len % self.block_size, 0);
1302        len = std::cmp::min(len, self.max_extent_size_bytes);
1303        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1304
1305        // Make sure we have space reserved before we try and find the space.
1306        let reservation = if let Some(reservation) = transaction.allocator_reservation {
1307            match reservation.owner_object_id {
1308                // If there is no owner, this must be a system store that we're allocating for.
1309                None => assert!(self.is_system_store(owner_object_id)),
1310                // If it has an owner, it must match the allocating owner.
1311                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
1312            };
1313            // Reservation limits aren't necessarily a multiple of the block size.
1314            let r = reservation
1315                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
1316            len = r.amount();
1317            Left(r)
1318        } else {
1319            let mut inner = self.inner.lock();
1320            assert!(inner.opened);
1321            // Do not exceed the limit for the owner or the device.
1322            let device_used = inner.used_bytes();
1323            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1324            // We must take care not to use up space that might be reserved.
1325            let limit =
1326                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
1327            len = round_down(std::cmp::min(len, limit), self.block_size);
1328            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1329            owner_entry.reserved_bytes += len;
1330            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
1331        };
1332
1333        ensure!(len > 0, FxfsError::NoSpace);
1334
1335        // If volumes have been deleted, flush the device so that we can use any of the freed space.
1336        let volumes_deleted = {
1337            let inner = self.inner.lock();
1338            (!inner.volumes_deleted_pending_sync.is_empty())
1339                .then(|| inner.volumes_deleted_pending_sync.clone())
1340        };
1341
1342        if let Some(volumes_deleted) = volumes_deleted {
1343            // No locks are held here, so in theory there could be unnecessary syncs, but they
1344            // should be rare enough not to matter.
1345            self.filesystem
1346                .upgrade()
1347                .unwrap()
1348                .sync(SyncOptions {
1349                    flush_device: true,
1350                    precondition: Some(Box::new(|| {
1351                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
1352                    })),
1353                    ..Default::default()
1354                })
1355                .await?;
1356
1357            {
1358                let mut inner = self.inner.lock();
1359                for owner_id in volumes_deleted {
1360                    inner.volumes_deleted_pending_sync.remove(&owner_id);
1361                    inner.marked_for_deletion.insert(owner_id);
1362                }
1363            }
1364
1365            let _guard = self.allocation_mutex.lock().await;
1366            self.rebuild_strategy().await?;
1367        }
1368
1369        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
1370        let _guard = 'sync: loop {
1371            // Cap number of sync attempts before giving up on finding free space.
1372            for _ in 0..10 {
1373                {
1374                    let guard = self.allocation_mutex.lock().await;
1375
1376                    if !self.needs_sync() {
1377                        break 'sync guard;
1378                    }
1379                }
1380
1381                // All the free space is currently tied up with deallocations, so we need to sync
1382                // and flush the device to free that up.
1383                //
1384                // We can't hold the allocation lock whilst we sync here because the allocation lock
1385                // is also taken in apply_mutations, which is called when journal locks are held,
1386                // and we call sync here which takes those same locks, so it would have the
1387                // potential to result in a deadlock.  Sync holds its own lock to guard against
1388                // multiple syncs occurring at the same time, and we can supply a precondition that
1389                // is evaluated under that lock to ensure we don't sync twice if we don't need to.
1390                self.filesystem
1391                    .upgrade()
1392                    .unwrap()
1393                    .sync(SyncOptions {
1394                        flush_device: true,
1395                        precondition: Some(Box::new(|| self.needs_sync())),
1396                        ..Default::default()
1397                    })
1398                    .await?;
1399            }
1400            bail!(
1401                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
1402            );
1403        };
1404
1405        let mut trim_listener = None;
1406        {
1407            let mut inner = self.inner.lock();
1408            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;
1409
1410            // If trimming would be the reason that this allocation gets cut short, wait for
1411            // trimming to complete before proceeding.
1412            let avail = self
1413                .device_size
1414                .checked_sub(inner.unavailable_bytes().0)
1415                .ok_or(FxfsError::Inconsistent)?;
1416            let free_and_not_being_trimmed =
1417                inner.bytes_available_not_being_trimmed(self.device_size)?;
1418            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
1419                debug_assert!(inner.trim_reserved_bytes > 0);
1420                trim_listener = std::mem::take(&mut inner.trim_listener);
1421            }
1422        }
1423
1424        if let Some(listener) = trim_listener {
1425            listener.await;
1426        }
1427
1428        let result = loop {
1429            {
1430                let mut inner = self.inner.lock();
1431
1432                // While we know rebuild_strategy and take_for_trim are not running (we hold
1433                // the guard), apply temporary_allocation removals.
1434                for device_range in inner.dropped_temporary_allocations.drain(..) {
1435                    self.temporary_allocations.erase(&AllocatorKey { device_range });
1436                }
1437
1438                match inner.strategy.allocate(len) {
1439                    Err(FxfsError::NotFound) => {
1440                        // Overflow. Fall through and rebuild the strategy.
1441                        inner.rebuild_strategy_trigger_histogram
1442                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
1443                    }
1444                    Err(err) => {
1445                        error!(err:%; "Likely filesystem corruption.");
1446                        return Err(err.into());
1447                    }
1448                    Ok(x) => {
1449                        break x;
1450                    }
1451                }
1452            }
1453            // We've run out of extents of the requested length in RAM, but more of this size
1454            // may exist on the device. Rescan the device and circle back.
1455            // We already hold the allocation_mutex, so exclusive access is guaranteed.
1456            if !self.rebuild_strategy().await? {
1457                error!("Cannot find additional free space. Corruption?");
1458                return Err(FxfsError::Inconsistent.into());
1459            }
1460        };
1461
1462        debug!(device_range:? = result; "allocate");
1463
1464        let len = result.length().unwrap();
1465        let reservation_owner = reservation.either(
1466            // Left means we got an outside reservation.
1467            |l| {
1468                l.forget_some(len);
1469                l.owner_object_id()
1470            },
1471            |r| {
1472                r.forget_some(len);
1473                r.owner_object_id()
1474            },
1475        );
1476
1477        {
1478            let mut inner = self.inner.lock();
1479            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1480            owner_entry.uncommitted_allocated_bytes += len;
1481            // If the reservation has an owner, ensure they are the same.
1482            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
1483            inner.remove_reservation(reservation_owner, len);
1484            self.temporary_allocations.insert(AllocatorItem {
1485                key: AllocatorKey { device_range: result.clone() },
1486                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1487                sequence: 0,
1488            })?;
1489        }
1490
1491        let mutation =
1492            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
1493        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());
1494
1495        Ok(result)
1496    }
1497
1498    /// Marks the given device range as allocated.  The main use case for this at present is the
1499    /// super-block, which needs to be at a fixed location on the device.
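    ///
    /// Sketch of typical use (hedged: `transaction`, `owner_id`, `offset` and `fs` are
    /// illustrative, and the range must not currently be allocated):
    ///
    /// ```ignore
    /// // Claim a specific on-device range rather than letting the allocator choose one.
    /// let fixed_range = offset..offset + fs.block_size();
    /// allocator.mark_allocated(&mut transaction, owner_id, fixed_range)?;
    /// ```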
1500    #[trace]
1501    pub fn mark_allocated(
1502        &self,
1503        transaction: &mut Transaction<'_>,
1504        owner_object_id: u64,
1505        device_range: Range<u64>,
1506    ) -> Result<(), Error> {
1507        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1508        {
1509            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;
1510
1511            let mut inner = self.inner.lock();
1512            let device_used = inner.used_bytes();
1513            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1514            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1515            ensure!(
1516                device_range.end <= self.device_size
1517                    && (Saturating(self.device_size) - device_used).0 >= len
1518                    && owner_id_bytes_left >= len,
1519                FxfsError::NoSpace
1520            );
1521            if let Some(reservation) = &mut transaction.allocator_reservation {
1522                // The transaction takes ownership of this hold.
1523                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
1524            }
1525            owner_entry.uncommitted_allocated_bytes += len;
1526            inner.strategy.remove(device_range.clone());
1527            self.temporary_allocations.insert(AllocatorItem {
1528                key: AllocatorKey { device_range: device_range.clone() },
1529                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1530                sequence: 0,
1531            })?;
1532        }
1533        let mutation =
1534            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
1535        transaction.add(self.object_id(), Mutation::Allocator(mutation));
1536        Ok(())
1537    }
1538
1539    /// Sets the byte limit for an owner object, capping how much space that owner may use.
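    ///
    /// Sketch (hedged: `transaction` and `volume_store_id` are assumed to exist and the 1 GiB
    /// figure is purely illustrative):
    ///
    /// ```ignore
    /// // The limit is journalled as an AllocatorMutation::SetLimit and applied on replay.
    /// allocator.set_bytes_limit(&mut transaction, volume_store_id, 1 << 30)?;
    /// ```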
1540    pub fn set_bytes_limit(
1541        &self,
1542        transaction: &mut Transaction<'_>,
1543        owner_object_id: u64,
1544        bytes: u64,
1545    ) -> Result<(), Error> {
1546        // System stores cannot be given limits.
1547        assert!(!self.is_system_store(owner_object_id));
1548        transaction.add(
1549            self.object_id(),
1550            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
1551        );
1552        Ok(())
1553    }
1554
1555    /// Gets the bytes limit for an owner object.
1556    pub fn get_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
1557        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
1558    }
1559
1560    /// Deallocates the given device range for the specified object.
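    ///
    /// Sketch of usage (hedged: names mirror `test_deallocations` below and are illustrative):
    ///
    /// ```ignore
    /// // The freed range is parked in `temporary_allocations` and only becomes reusable once
    /// // the device has been flushed.
    /// allocator.deallocate(&mut transaction, owner_id, device_range).await?;
    /// transaction.commit().await?;
    /// ```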
1561    #[trace]
1562    pub async fn deallocate(
1563        &self,
1564        transaction: &mut Transaction<'_>,
1565        owner_object_id: u64,
1566        dealloc_range: Range<u64>,
1567    ) -> Result<u64, Error> {
1568        debug!(device_range:? = dealloc_range; "deallocate");
1569        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
1570        // We don't currently support sharing of allocations (value.count always equals 1), so as
1571        // long as we can assume the deallocated range is actually allocated, we can avoid device
1572        // access.
1573        let deallocated = dealloc_range.end - dealloc_range.start;
1574        let mutation = AllocatorMutation::Deallocate {
1575            device_range: dealloc_range.clone().into(),
1576            owner_object_id,
1577        };
1578        transaction.add(self.object_id(), Mutation::Allocator(mutation));
1579
1580        let _guard = self.allocation_mutex.lock().await;
1581
1582        // We use `dropped_temporary_allocations` to defer removals from `temporary_allocations` in
1583        // places where we can't execute async code or take locks.
1584        //
1585        // It's important we don't ever remove entries from `temporary_allocations` without
1586        // holding the `allocation_mutex` lock or else we may end up with an inconsistent view of
1587        // available disk space when we combine temporary_allocations with the LSMTree.
1588        // This is normally done in `allocate()` but we also need to apply these here because
1589        // `temporary_allocations` is also used to track deallocated space until it has been
1590        // flushed (see comment below). A user may allocate and then deallocate space before calling
1591        // allocate() a second time, so if we do not clean up here, we may end up with the same
1592        // range in temporary_allocations twice (once for allocate, once for deallocate).
1593        let mut inner = self.inner.lock();
1594        for device_range in inner.dropped_temporary_allocations.drain(..) {
1595            self.temporary_allocations.erase(&AllocatorKey { device_range });
1596        }
1597
1598        // We can't reuse deallocated space immediately because failure to successfully flush will
1599        // mean that on next mount, we may find this space is still assigned to the deallocated
1600        // region. To avoid immediate reuse, we hold these regions in 'temporary_allocations' until
1601        // after a successful flush so we know the region is safe to reuse.
1602        self.temporary_allocations
1603            .insert(AllocatorItem {
1604                key: AllocatorKey { device_range: dealloc_range.clone() },
1605                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1606                sequence: 0,
1607            })
1608            .context("tracking deallocated")?;
1609
1610        Ok(deallocated)
1611    }
1612
1613    /// Marks allocations associated with a given |owner_object_id| for deletion.
1614    ///
1615    /// This is used as part of deleting encrypted volumes (ObjectStore) without having the keys.
1616    ///
1617    /// MarkForDeletion mutations eventually manipulate allocator metadata (AllocatorInfo) instead
1618    /// of the mutable layer but we must be careful not to do this too early and risk premature
1619    /// reuse of extents.
1620    ///
1621    /// Replay is not guaranteed until the *device* gets flushed, so we cannot reuse the deleted
1622    /// extents until we've flushed the device.
1623    ///
1624    /// TODO(b/316827348): Consider removing the use of mark_for_deletion in AllocatorInfo and
1625    /// just compacting?
1626    ///
1627    /// After an allocator.flush() (i.e. a major compaction), we know that there is no data left
1628    /// in the layer files for this owner_object_id and we are able to clear `marked_for_deletion`.
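    ///
    /// Sketch (hedged: `volume_store_id` is illustrative and must not be reused after this call):
    ///
    /// ```ignore
    /// // Queue every extent owned by the volume for deletion; space is reclaimed lazily, after
    /// // the device has been flushed and a major compaction has run.
    /// allocator.mark_for_deletion(&mut transaction, volume_store_id);
    /// transaction.commit().await?;
    /// ```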
1629    pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
1630        // Note that because the actual time of deletion (the next major compaction) is undefined,
1631        // |owner_object_id| should not be reused after this call.
1632        transaction.add(
1633            self.object_id(),
1634            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
1635        );
1636    }
1637
1638    /// Called when the device has been flushed and indicates what the journal log offset was when
1639    /// that happened.
1640    pub fn did_flush_device(&self, flush_log_offset: u64) {
1641        // First take out the deallocations that we now know to be flushed.  The list is maintained
1642        // in order, so we can stop on the first entry that we find that should not be unreserved
1643        // yet.
1644        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
1645        let deallocs = 'deallocs_outer: loop {
1646            let mut inner = self.inner.lock();
1647            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
1648                if dealloc.log_file_offset >= flush_log_offset {
1649                    let mut deallocs = inner.committed_deallocated.split_off(index);
1650                    // Swap because we want the opposite of what split_off does.
1651                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
1652                    break 'deallocs_outer deallocs;
1653                }
1654            }
1655            break std::mem::take(&mut inner.committed_deallocated);
1656        };
1657
1658        // Now we can free those elements.
1659        let mut inner = self.inner.lock();
1660        let mut totals = BTreeMap::<u64, u64>::new();
1661        for dealloc in deallocs {
1662            *(totals.entry(dealloc.owner_object_id).or_default()) +=
1663                dealloc.range.length().unwrap();
1664            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
1665            self.temporary_allocations.erase(&AllocatorKey { device_range: dealloc.range.clone() });
1666        }
1667
1668        // This *must* come after we've removed the records from reserved reservations because the
1669        // allocator uses this value to decide whether or not a device-flush is required and it must
1670        // be possible to find free space if it thinks no device-flush is required.
1671        for (owner_object_id, total) in totals {
1672            match inner.owner_bytes.get_mut(&owner_object_id) {
1673                Some(counters) => counters.committed_deallocated_bytes -= total,
1674                None => panic!("Failed to decrement for unknown owner: {}", owner_object_id),
1675            }
1676        }
1677    }
1678
1679    /// Returns a reservation that can be used later, or None if there is insufficient space. The
1680    /// |owner_object_id| indicates which object in the root object store the reservation is for.
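    ///
    /// Sketch (hedged: `allocator` is an `Arc<Allocator>` and the 1 MiB amount is illustrative):
    ///
    /// ```ignore
    /// // Hold back 1 MiB for `owner_id`; the reservation is returned to the pool when dropped.
    /// let reservation =
    ///     allocator.clone().reserve(Some(owner_id), 1 << 20).ok_or(FxfsError::NoSpace)?;
    /// ```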
1681    pub fn reserve(
1682        self: Arc<Self>,
1683        owner_object_id: Option<u64>,
1684        amount: u64,
1685    ) -> Option<Reservation> {
1686        {
1687            let mut inner = self.inner.lock();
1688
1689            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1690
1691            let limit = match owner_object_id {
1692                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1693                None => device_free,
1694            };
1695            if limit < amount {
1696                return None;
1697            }
1698            inner.add_reservation(owner_object_id, amount);
1699        }
1700        Some(Reservation::new(self, owner_object_id, amount))
1701    }
1702
1703    /// Like reserve, but takes a callback that is passed the |limit| and should return the amount,
1704    /// which can be zero.
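    ///
    /// Sketch (hedged: reserves whatever is available, capped at an illustrative 1 MiB):
    ///
    /// ```ignore
    /// let reservation = allocator
    ///     .clone()
    ///     .reserve_with(Some(owner_id), |limit| std::cmp::min(limit, 1 << 20));
    /// ```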
1705    pub fn reserve_with(
1706        self: Arc<Self>,
1707        owner_object_id: Option<u64>,
1708        amount: impl FnOnce(u64) -> u64,
1709    ) -> Reservation {
1710        let amount = {
1711            let mut inner = self.inner.lock();
1712
1713            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1714
1715            let amount = amount(match owner_object_id {
1716                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1717                None => device_free,
1718            });
1719
1720            inner.add_reservation(owner_object_id, amount);
1721
1722            amount
1723        };
1724
1725        Reservation::new(self, owner_object_id, amount)
1726    }
1727
1728    /// Returns the total number of allocated bytes.
1729    pub fn get_allocated_bytes(&self) -> u64 {
1730        self.inner.lock().allocated_bytes().0
1731    }
1732
1733    /// Returns the size, in bytes, of the device managed by the allocator.
1734    pub fn get_disk_bytes(&self) -> u64 {
1735        self.device_size
1736    }
1737
1738    /// Returns the total number of allocated bytes per owner_object_id.
1739    /// Note that this is quite an expensive operation as it copies the collection.
1740    /// This is intended for use in fsck() and friends, not general use code.
1741    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
1742        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
1743    }
1744
1745    /// Returns the number of allocated and reserved bytes.
1746    pub fn get_used_bytes(&self) -> Saturating<u64> {
1747        let inner = self.inner.lock();
1748        inner.used_bytes()
1749    }
1750}
1751
1752impl ReservationOwner for Allocator {
1753    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
1754        self.inner.lock().remove_reservation(owner_object_id, amount);
1755    }
1756}
1757
1758#[async_trait]
1759impl JournalingObject for Allocator {
1760    fn apply_mutation(
1761        &self,
1762        mutation: Mutation,
1763        context: &ApplyContext<'_, '_>,
1764        _assoc_obj: AssocObj<'_>,
1765    ) -> Result<(), Error> {
1766        match mutation {
1767            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
1768                let mut inner = self.inner.lock();
1769                inner.owner_bytes.remove(&owner_object_id);
1770
1771                // We use `info.marked_for_deletion` to track the committed state and
1772                // `inner.marked_for_deletion` to track volumes marked for deletion *after* we have
1773                // flushed the device.  It is not safe to use extents belonging to deleted volumes
1774                // until after we have flushed the device.
1775                inner.info.marked_for_deletion.insert(owner_object_id);
1776                inner.volumes_deleted_pending_sync.insert(owner_object_id);
1777
1778                inner.info.limit_bytes.remove(&owner_object_id);
1779            }
1780            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1781                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
1782                let item = AllocatorItem {
1783                    key: AllocatorKey { device_range: device_range.clone().into() },
1784                    value: AllocatorValue::Abs { count: 1, owner_object_id },
1785                    sequence: context.checkpoint.file_offset,
1786                };
1787                let len = item.key.device_range.length().unwrap();
1788                let lower_bound = item.key.lower_bound_for_merge_into();
1789                self.tree.merge_into(item, &lower_bound);
1790                let mut inner = self.inner.lock();
1791                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1792                entry.allocated_bytes += len;
1793                if let ApplyMode::Live(transaction) = context.mode {
1794                    entry.uncommitted_allocated_bytes -= len;
1795                    // Note that we cannot drop entries from temporary_allocations without holding
1796                    // the allocation_mutex as it may introduce races. We instead add the range to
1797                    // a Vec that can be applied later when we hold the lock (See comment on
1798                    // `dropped_temporary_allocations` above).
1799                    inner.dropped_temporary_allocations.push(device_range.into());
1800                    if let Some(reservation) = transaction.allocator_reservation {
1801                        reservation.commit(len);
1802                    }
1803                }
1804            }
1805            Mutation::Allocator(AllocatorMutation::Deallocate {
1806                device_range,
1807                owner_object_id,
1808            }) => {
1809                let item = AllocatorItem {
1810                    key: AllocatorKey { device_range: device_range.into() },
1811                    value: AllocatorValue::None,
1812                    sequence: context.checkpoint.file_offset,
1813                };
1814                let len = item.key.device_range.length().unwrap();
1815
1816                {
1817                    let mut inner = self.inner.lock();
1818                    {
1819                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1820                        entry.allocated_bytes -= len;
1821                        if context.mode.is_live() {
1822                            entry.committed_deallocated_bytes += len;
1823                        }
1824                    }
1825                    if context.mode.is_live() {
1826                        inner.committed_deallocated.push_back(CommittedDeallocation {
1827                            log_file_offset: context.checkpoint.file_offset,
1828                            range: item.key.device_range.clone(),
1829                            owner_object_id,
1830                        });
1831                    }
1832                    if let ApplyMode::Live(Transaction {
1833                        allocator_reservation: Some(reservation),
1834                        ..
1835                    }) = context.mode
1836                    {
1837                        inner.add_reservation(reservation.owner_object_id(), len);
1838                        reservation.add(len);
1839                    }
1840                }
1841                let lower_bound = item.key.lower_bound_for_merge_into();
1842                self.tree.merge_into(item, &lower_bound);
1843            }
1844            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
1845                // Journal replay is ordered and each of these calls is idempotent, so the last
1846                // one wins; it doesn't matter if the value is already set or gets changed
1847                // multiple times during replay. When the allocator is opened, the limit is
1848                // merged in with the snapshot.
1849                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
1850            }
1851            Mutation::BeginFlush => {
1852                self.tree.seal();
1853                // Transfer our running count for allocated_bytes so that it gets written to the new
1854                // info file when flush completes.
1855                let mut inner = self.inner.lock();
1856                let allocated_bytes =
1857                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
1858                inner.info.allocated_bytes = allocated_bytes;
1859            }
1860            Mutation::EndFlush => {}
1861            _ => bail!("unexpected mutation: {:?}", mutation),
1862        }
1863        Ok(())
1864    }
1865
1866    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
1867        match mutation {
1868            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1869                let len = device_range.length().unwrap();
1870                let mut inner = self.inner.lock();
1871                inner
1872                    .owner_bytes
1873                    .entry(owner_object_id)
1874                    .or_default()
1875                    .uncommitted_allocated_bytes -= len;
1876                if let Some(reservation) = transaction.allocator_reservation {
1877                    let res_owner = reservation.owner_object_id();
1878                    inner.add_reservation(res_owner, len);
1879                    reservation.release_reservation(res_owner, len);
1880                }
1881                inner.strategy.free(device_range.clone().into()).expect("drop mutation");
1882                self.temporary_allocations
1883                    .erase(&AllocatorKey { device_range: device_range.into() });
1884            }
1885            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
1886                self.temporary_allocations
1887                    .erase(&AllocatorKey { device_range: device_range.into() });
1888            }
1889            _ => {}
1890        }
1891    }
1892
1893    async fn flush(&self) -> Result<Version, Error> {
1894        let filesystem = self.filesystem.upgrade().unwrap();
1895        let object_manager = filesystem.object_manager();
1896        let earliest_version = self.tree.get_earliest_version();
1897        if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
1898            // Early exit, but still return the earliest version used by a struct in the tree
1899            return Ok(earliest_version);
1900        }
1901
1902        let fs = self.filesystem.upgrade().unwrap();
1903        let mut flusher = Flusher::new(self, &fs).await;
1904        let (new_layer_file, info) = flusher.start().await?;
1905        flusher.finish(new_layer_file, info).await
1906    }
1907}
1908
1909// The merger is unable to merge extents that exist like the following:
1910//
1911//     |----- +1 -----|
1912//                    |----- -1 -----|
1913//                    |----- +2 -----|
1914//
1915// It cannot coalesce them because it has to emit the +1 record so that it can move on and merge the
1916// -1 and +2 records. To address this, we add another stage that applies after merging which
1917// coalesces records after they have been emitted.  This is a bit simpler than merging because the
1918// records cannot overlap, so it's just a question of merging adjacent records if they happen to
1919// have the same delta and object_id.
1920
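/// A sketch of the net effect (hedged; compare `test_coalescing_iterator` below): two emitted
/// records `0..100` and `100..200` with identical `AllocatorValue`s come out of this iterator as
/// a single `0..200` record.
///
/// ```ignore
/// let mut iter = CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await?).await?;
/// // iter.get() now yields one merged record covering 0..200.
/// ```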
1921pub struct CoalescingIterator<I> {
1922    iter: I,
1923    item: Option<AllocatorItem>,
1924}
1925
1926impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
1927    pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
1928        let mut iter = Self { iter, item: None };
1929        iter.advance().await?;
1930        Ok(iter)
1931    }
1932}
1933
1934#[async_trait]
1935impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
1936    for CoalescingIterator<I>
1937{
1938    async fn advance(&mut self) -> Result<(), Error> {
1939        self.item = self.iter.get().map(|x| x.cloned());
1940        if self.item.is_none() {
1941            return Ok(());
1942        }
1943        let left = self.item.as_mut().unwrap();
1944        loop {
1945            self.iter.advance().await?;
1946            match self.iter.get() {
1947                None => return Ok(()),
1948                Some(right) => {
1949                    // The two records cannot overlap.
1950                    ensure!(
1951                        left.key.device_range.end <= right.key.device_range.start,
1952                        FxfsError::Inconsistent
1953                    );
1954                    // We can only coalesce records if they are touching and have the same value.
1955                    if left.key.device_range.end < right.key.device_range.start
1956                        || left.value != *right.value
1957                    {
1958                        return Ok(());
1959                    }
1960                    left.key.device_range.end = right.key.device_range.end;
1961                }
1962            }
1963        }
1964    }
1965
1966    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
1967        self.item.as_ref().map(|x| x.as_item_ref())
1968    }
1969}
1970
1971struct Flusher<'a> {
1972    allocator: &'a Allocator,
1973    fs: &'a Arc<FxFilesystem>,
1974    _guard: WriteGuard<'a>,
1975}
1976
1977impl<'a> Flusher<'a> {
1978    async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
1979        let keys = lock_keys![LockKey::flush(allocator.object_id())];
1980        Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
1981    }
1982
1983    fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
1984        Options {
1985            skip_journal_checks: true,
1986            borrow_metadata_space: true,
1987            allocator_reservation: Some(allocator_reservation),
1988            ..Default::default()
1989        }
1990    }
1991
1992    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
1993        let object_manager = self.fs.object_manager();
1994        let mut transaction = self
1995            .fs
1996            .clone()
1997            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
1998            .await?;
1999
2000        let root_store = self.fs.root_store();
2001        let layer_object_handle = ObjectStore::create_object(
2002            &root_store,
2003            &mut transaction,
2004            HandleOptions { skip_journal_checks: true, ..Default::default() },
2005            None,
2006        )
2007        .await?;
2008        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
2009        // It's important that this transaction does not include any allocations because we use
2010        // BeginFlush as a snapshot point for mutations to the tree: other allocator mutations
2011        // within this transaction might get applied before seal (which would be OK), but they could
2012        // equally get applied afterwards (since Transaction makes no guarantees about the order in
2013        // which mutations are applied whilst committing), in which case they'd get lost on replay
2014        // because the journal will only send mutations that follow this transaction.
2015        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
2016        let info = transaction
2017            .commit_with_callback(|_| {
2018                // We must capture `info` as it is when we start flushing. Subsequent transactions
2019                // can end up modifying `info` and we shouldn't capture those here.
2020                self.allocator.inner.lock().info.clone()
2021            })
2022            .await?;
2023        Ok((layer_object_handle, info))
2024    }
2025
2026    async fn finish(
2027        self,
2028        layer_object_handle: DataObjectHandle<ObjectStore>,
2029        mut info: AllocatorInfo,
2030    ) -> Result<Version, Error> {
2031        let object_manager = self.fs.object_manager();
2032        let txn_options = Self::txn_options(object_manager.metadata_reservation());
2033
2034        let layer_set = self.allocator.tree.immutable_layer_set();
2035        let total_len = layer_set.sum_len();
2036        {
2037            let mut merger = layer_set.merger();
2038            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
2039            let iter = CoalescingIterator::new(iter).await?;
2040            self.allocator
2041                .tree
2042                .compact_with_iterator(
2043                    iter,
2044                    total_len,
2045                    DirectWriter::new(&layer_object_handle, txn_options).await,
2046                    layer_object_handle.block_size(),
2047                )
2048                .await?;
2049        }
2050
2051        let root_store = self.fs.root_store();
2052
2053        // Both of these forward-declared variables need to outlive the transaction.
2054        let object_handle;
2055        let reservation_update;
2056        let mut transaction = self
2057            .fs
2058            .clone()
2059            .new_transaction(
2060                lock_keys![LockKey::object(
2061                    root_store.store_object_id(),
2062                    self.allocator.object_id()
2063                )],
2064                txn_options,
2065            )
2066            .await?;
2067        let mut serialized_info = Vec::new();
2068
2069        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
2070        object_handle = ObjectStore::open_object(
2071            &root_store,
2072            self.allocator.object_id(),
2073            HandleOptions::default(),
2074            None,
2075        )
2076        .await?;
2077
2078        // Move all the existing layers to the graveyard.
2079        for object_id in &info.layers {
2080            root_store.add_to_graveyard(&mut transaction, *object_id);
2081        }
2082
2083        // Write out updated info.
2084
2085        // After successfully flushing, all the stores that were marked for deletion at the time of
2086        // the BeginFlush transaction no longer need to be marked for deletion.  There can be
2087        // stores that have been deleted since the BeginFlush transaction, but they will be covered
2088        // by a MarkForDeletion mutation.
2089        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);
2090
2091        info.layers = vec![layer_object_handle.object_id()];
2092
2093        info.serialize_with_version(&mut serialized_info)?;
2094
2095        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
2096        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
2097        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;
2098
2099        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
2100            layer_object_handle.get_size(),
2101        ));
2102
2103        // It's important that EndFlush is in the same transaction that we write AllocatorInfo,
2104        // because we use EndFlush to make the required adjustments to allocated_bytes.
2105        transaction.add_with_object(
2106            self.allocator.object_id(),
2107            Mutation::EndFlush,
2108            AssocObj::Borrowed(&reservation_update),
2109        );
2110        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());
2111
2112        let layers = layers_from_handles([layer_object_handle]).await?;
2113        transaction
2114            .commit_with_callback(|_| {
2115                self.allocator.tree.set_layers(layers);
2116
2117                // At this point we've committed the new layers to disk so we can start using them.
2118                // This means we can also switch to the new AllocatorInfo which clears
2119                // marked_for_deletion.
2120                let mut inner = self.allocator.inner.lock();
2121                inner.info.layers = info.layers;
2122                for owner_id in marked_for_deletion {
2123                    inner.marked_for_deletion.remove(&owner_id);
2124                    inner.info.marked_for_deletion.remove(&owner_id);
2125                }
2126            })
2127            .await?;
2128
2129        // Now close the layers and purge them.
2130        for layer in layer_set.layers {
2131            let object_id = layer.handle().map(|h| h.object_id());
2132            layer.close_layer().await;
2133            if let Some(object_id) = object_id {
2134                root_store.tombstone_object(object_id, txn_options).await?;
2135            }
2136        }
2137
2138        let mut counters = self.allocator.counters.lock();
2139        counters.num_flushes += 1;
2140        counters.last_flush_time = Some(std::time::SystemTime::now());
2141        // Return the earliest version used by a struct in the tree
2142        Ok(self.allocator.tree.get_earliest_version())
2143    }
2144}
2145
2146#[cfg(test)]
2147mod tests {
2148    use crate::filesystem::{
2149        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2150    };
2151    use crate::fsck::fsck;
2152    use crate::lsm_tree::cache::NullCache;
2153    use crate::lsm_tree::skip_list_layer::SkipListLayer;
2154    use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
2155    use crate::lsm_tree::{LSMTree, Query};
2156    use crate::object_handle::ObjectHandle;
2157    use crate::object_store::allocator::merge::merge;
2158    use crate::object_store::allocator::{
2159        Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2160    };
2161    use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
2162    use crate::object_store::volume::root_volume;
2163    use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
2164    use crate::range::RangeExt;
2165    use crate::round::round_up;
2166    use crate::testing;
2167    use fuchsia_async as fasync;
2168    use fuchsia_sync::Mutex;
2169    use std::cmp::{max, min};
2170    use std::ops::{Bound, Range};
2171    use std::sync::Arc;
2172    use storage_device::DeviceHolder;
2173    use storage_device::fake_device::FakeDevice;
2174
2175    #[test]
2176    fn test_allocator_key_is_range_based() {
2177        // Make sure we disallow using allocator keys with point queries.
2178        assert!(AllocatorKey { device_range: 0..100 }.is_range_key());
2179    }
2180
2181    #[fuchsia::test]
2182    async fn test_coalescing_iterator() {
2183        let skip_list = SkipListLayer::new(100);
2184        let items = [
2185            Item::new(
2186                AllocatorKey { device_range: 0..100 },
2187                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2188            ),
2189            Item::new(
2190                AllocatorKey { device_range: 100..200 },
2191                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2192            ),
2193        ];
2194        skip_list.insert(items[1].clone()).expect("insert error");
2195        skip_list.insert(items[0].clone()).expect("insert error");
2196        let mut iter =
2197            CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
2198                .await
2199                .expect("new failed");
2200        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2201        assert_eq!(
2202            (key, value),
2203            (
2204                &AllocatorKey { device_range: 0..200 },
2205                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2206            )
2207        );
2208        iter.advance().await.expect("advance failed");
2209        assert!(iter.get().is_none());
2210    }
2211
2212    #[fuchsia::test]
2213    async fn test_merge_and_coalesce_across_three_layers() {
2214        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2215        lsm_tree
2216            .insert(Item::new(
2217                AllocatorKey { device_range: 100..200 },
2218                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2219            ))
2220            .expect("insert error");
2221        lsm_tree.seal();
2222        lsm_tree
2223            .insert(Item::new(
2224                AllocatorKey { device_range: 0..100 },
2225                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2226            ))
2227            .expect("insert error");
2228
2229        let layer_set = lsm_tree.layer_set();
2230        let mut merger = layer_set.merger();
2231        let mut iter =
2232            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2233                .await
2234                .expect("new failed");
2235        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2236        assert_eq!(
2237            (key, value),
2238            (
2239                &AllocatorKey { device_range: 0..200 },
2240                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2241            )
2242        );
2243        iter.advance().await.expect("advance failed");
2244        assert!(iter.get().is_none());
2245    }
2246
2247    #[fuchsia::test]
2248    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
2249        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2250        lsm_tree
2251            .insert(Item::new(
2252                AllocatorKey { device_range: 100..200 },
2253                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2254            ))
2255            .expect("insert error");
2256        lsm_tree.seal();
2257        lsm_tree
2258            .insert(Item::new(
2259                AllocatorKey { device_range: 0..100 },
2260                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2261            ))
2262            .expect("insert error");
2263
2264        let layer_set = lsm_tree.layer_set();
2265        let mut merger = layer_set.merger();
2266        let mut iter =
2267            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2268                .await
2269                .expect("new failed");
2270        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2271        assert_eq!(
2272            (key, value),
2273            (
2274                &AllocatorKey { device_range: 0..100 },
2275                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2276            )
2277        );
2278        iter.advance().await.expect("advance failed");
2279        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2280        assert_eq!(
2281            (key, value),
2282            (
2283                &AllocatorKey { device_range: 100..200 },
2284                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2285            )
2286        );
2287        iter.advance().await.expect("advance failed");
2288        assert!(iter.get().is_none());
2289    }
2290
2291    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
2292        if a.end > b.start && a.start < b.end {
2293            min(a.end, b.end) - max(a.start, b.start)
2294        } else {
2295            0
2296        }
2297    }
2298
2299    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
2300        let layer_set = allocator.tree.layer_set();
2301        let mut merger = layer_set.merger();
2302        let mut iter = allocator
2303            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2304            .await
2305            .expect("build iterator");
2306        let mut allocations: Vec<Range<u64>> = Vec::new();
2307        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2308            if let Some(r) = allocations.last() {
2309                assert!(device_range.start >= r.end);
2310            }
2311            allocations.push(device_range.clone());
2312            iter.advance().await.expect("advance failed");
2313        }
2314        allocations
2315    }
2316
2317    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
2318        let layer_set = allocator.tree.layer_set();
2319        let mut merger = layer_set.merger();
2320        let mut iter = allocator
2321            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2322            .await
2323            .expect("build iterator");
2324        let mut found = 0;
2325        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2326            let mut l = device_range.length().expect("Invalid range");
2327            found += l;
2328            // Make sure that the entire range we have found completely overlaps with all the
2329            // allocations we expect to find.
2330            for range in expected_allocations {
2331                l -= overlap(range, device_range);
2332                if l == 0 {
2333                    break;
2334                }
2335            }
2336            assert_eq!(l, 0, "range {device_range:?} not covered by expectations");
2337            iter.advance().await.expect("advance failed");
2338        }
2339        // Make sure the total we found adds up to what we expect.
2340        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
2341    }
2342
2343    async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
2344        let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
2345        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2346        let allocator = fs.allocator();
2347        (fs, allocator)
2348    }
2349
2350    #[fuchsia::test]
2351    async fn test_allocations() {
2352        const STORE_OBJECT_ID: u64 = 99;
2353        let (fs, allocator) = test_fs().await;
2354        let mut transaction =
2355            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2356        let mut device_ranges = collect_allocations(&allocator).await;
2357
2358        // Expected extents:
2359        let expected = vec![
2360            0..4096,        // Superblock A (4k)
2361            4096..139264,   // root_store layer files, StoreInfo.. (33x4k blocks)
2362            139264..204800, // Superblock A extension (64k)
2363            204800..335872, // Initial Journal (128k)
2364            335872..401408, // Superblock B extension (64k)
2365            524288..528384, // Superblock B (4k)
2366        ];
2367        assert_eq!(device_ranges, expected);
2368        device_ranges.push(
2369            allocator
2370                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2371                .await
2372                .expect("allocate failed"),
2373        );
2374        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2375        device_ranges.push(
2376            allocator
2377                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2378                .await
2379                .expect("allocate failed"),
2380        );
2381        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2382        assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
2383        transaction.commit().await.expect("commit failed");
2384        let mut transaction =
2385            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2386        device_ranges.push(
2387            allocator
2388                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2389                .await
2390                .expect("allocate failed"),
2391        );
2392        assert_eq!(device_ranges[7].length().expect("Invalid range"), fs.block_size());
2393        assert_eq!(overlap(&device_ranges[5], &device_ranges[7]), 0);
2394        assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
2395        transaction.commit().await.expect("commit failed");
2396
2397        check_allocations(&allocator, &device_ranges).await;
2398    }
2399
2400    #[fuchsia::test]
2401    async fn test_allocate_more_than_max_size() {
2402        const STORE_OBJECT_ID: u64 = 99;
2403        let (fs, allocator) = test_fs().await;
2404        let mut transaction =
2405            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2406        let mut device_ranges = collect_allocations(&allocator).await;
2407        device_ranges.push(
2408            allocator
2409                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
2410                .await
2411                .expect("allocate failed"),
2412        );
2413        assert_eq!(
2414            device_ranges.last().unwrap().length().expect("Invalid range"),
2415            allocator.max_extent_size_bytes
2416        );
2417        transaction.commit().await.expect("commit failed");
2418
2419        check_allocations(&allocator, &device_ranges).await;
2420    }
2421
2422    #[fuchsia::test]
2423    async fn test_deallocations() {
2424        const STORE_OBJECT_ID: u64 = 99;
2425        let (fs, allocator) = test_fs().await;
2426        let initial_allocations = collect_allocations(&allocator).await;
2427
2428        let mut transaction =
2429            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2430        let device_range1 = allocator
2431            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2432            .await
2433            .expect("allocate failed");
2434        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
2435        transaction.commit().await.expect("commit failed");
2436
2437        let mut transaction =
2438            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2439        allocator
2440            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
2441            .await
2442            .expect("deallocate failed");
2443        transaction.commit().await.expect("commit failed");
2444
2445        check_allocations(&allocator, &initial_allocations).await;
2446    }
2447
2448    #[fuchsia::test]
2449    async fn test_mark_allocated() {
2450        const STORE_OBJECT_ID: u64 = 99;
2451        let (fs, allocator) = test_fs().await;
2452        let mut device_ranges = collect_allocations(&allocator).await;
2453        let range = {
2454            let mut transaction = fs
2455                .clone()
2456                .new_transaction(lock_keys![], Options::default())
2457                .await
2458                .expect("new failed");
2459            // First, allocate 2 blocks.
2460            allocator
2461                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
2462                .await
2463                .expect("allocate failed")
2464            // Let the transaction drop.
2465        };
2466
2467        let mut transaction =
2468            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2469
2470        // If we allocate 1 block, the two blocks that were allocated earlier should be available,
2471        // and this should return the first of them.
2472        device_ranges.push(
2473            allocator
2474                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2475                .await
2476                .expect("allocate failed"),
2477        );
2478
2479        assert_eq!(device_ranges.last().unwrap().start, range.start);
2480
2481        // Mark the second block as allocated.
2482        let mut range2 = range.clone();
2483        range2.start += fs.block_size();
2484        allocator
2485            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
2486            .expect("mark_allocated failed");
2487        device_ranges.push(range2);
2488
2489        // This should avoid the range we marked as allocated.
2490        device_ranges.push(
2491            allocator
2492                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2493                .await
2494                .expect("allocate failed"),
2495        );
2496        let last_range = device_ranges.last().unwrap();
2497        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
2498        assert_eq!(overlap(last_range, &range), 0);
2499        transaction.commit().await.expect("commit failed");
2500
2501        check_allocations(&allocator, &device_ranges).await;
2502    }
2503
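    // Checks that mark_for_deletion zeroes the owner's allocated-byte accounting immediately,
    // while the underlying device ranges remain allocated until the allocator flushes and the
    // deleted ranges are reclaimed.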
2504    #[fuchsia::test]
2505    async fn test_mark_for_deletion() {
2506        const STORE_OBJECT_ID: u64 = 99;
2507        let (fs, allocator) = test_fs().await;
2508
2509        // Allocate some stuff.
2510        let initial_allocated_bytes = allocator.get_allocated_bytes();
2511        let mut device_ranges = collect_allocations(&allocator).await;
2512        let mut transaction =
2513            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2514        // Note: we have a cap on individual allocation length, so we allocate over multiple mutations.
2515        for _ in 0..15 {
2516            device_ranges.push(
2517                allocator
2518                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2519                    .await
2520                    .expect("allocate failed"),
2521            );
2522            device_ranges.push(
2523                allocator
2524                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2525                    .await
2526                    .expect("allocate2 failed"),
2527            );
2528        }
2529        transaction.commit().await.expect("commit failed");
2530        check_allocations(&allocator, &device_ranges).await;
2531
2532        assert_eq!(
2533            allocator.get_allocated_bytes(),
2534            initial_allocated_bytes + fs.block_size() * 3000
2535        );
2536
2537        // Mark for deletion.
2538        let mut transaction =
2539            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2540        allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
2541        transaction.commit().await.expect("commit failed");
2542
2543        // Expect the allocated byte count to be updated immediately, but the device ranges to remain allocated.
2544        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
2545        check_allocations(&allocator, &device_ranges).await;
2546
2547        // Allocate more space than is available until the mark_for_deletion space is reclaimed.
2548        // This should force a flush on allocate(). (1500 new + 3000 marked-for-deletion blocks > test_fs size of 4096 blocks).
2549        device_ranges.clear();
2550
2551        let mut transaction =
2552            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2553        let target_bytes = 1500 * fs.block_size();
2554        while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
2555            let len = std::cmp::min(
2556                target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
2557                100 * fs.block_size(),
2558            );
2559            device_ranges.push(
2560                allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
2561            );
2562        }
2563        transaction.commit().await.expect("commit failed");
2564
2565        // Have the deleted ranges cleaned up.
2566        allocator.flush().await.expect("flush failed");
2567
2568        // The flush above may trigger an allocation for the allocator itself, so we only check
2569        // that the owner we care about has the right size.
2570
2571        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2572        assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes);
2573    }
2574
2575    async fn create_file(store: &Arc<ObjectStore>, size: usize) {
2576        let root_directory =
2577            Directory::open(store, store.root_directory_object_id()).await.expect("open failed");
2578
2579        let mut transaction = store
2580            .filesystem()
2581            .new_transaction(
2582                lock_keys![LockKey::object(
2583                    store.store_object_id(),
2584                    store.root_directory_object_id()
2585                )],
2586                Options::default(),
2587            )
2588            .await
2589            .expect("new_transaction failed");
2590        let file = root_directory
2591            .create_child_file(&mut transaction, &format!("foo {}", size))
2592            .await
2593            .expect("create_child_file failed");
2594        transaction.commit().await.expect("commit failed");
2595
2596        let buffer = file.allocate_buffer(size).await;
2597
2598        // Append some data to it.
2599        let mut transaction = file
2600            .new_transaction_with_options(Options {
2601                borrow_metadata_space: true,
2602                ..Default::default()
2603            })
2604            .await
2605            .expect("new_transaction_with_options failed");
2606        file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2607        transaction.commit().await.expect("commit failed");
2608    }
2609
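    // Replays a journal in which a store was deleted after its mutations had been compacted away,
    // then verifies consistency with fsck.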
2610    #[fuchsia::test]
2611    async fn test_replay_with_deleted_store_and_compaction() {
2612        let (fs, _) = test_fs().await;
2613
2614        const FILE_SIZE: usize = 10_000_000;
2615
2616        let mut store_id = {
2617            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2618            let store = root_vol
2619                .new_volume("vol", NewChildStoreOptions::default())
2620                .await
2621                .expect("new_volume failed");
2622
2623            create_file(&store, FILE_SIZE).await;
2624            store.store_object_id()
2625        };
2626
2627        fs.close().await.expect("close failed");
2628        let device = fs.take_device().await;
2629        device.reopen(false);
2630
2631        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2632
2633        // Compact so that, on replay, the transaction that deletes the store won't find any
2634        // mutations.
2635        fs.journal().compact().await.expect("compact failed");
2636
2637        for _ in 0..2 {
2638            {
2639                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2640
2641                let transaction = fs
2642                    .clone()
2643                    .new_transaction(
2644                        lock_keys![
2645                            LockKey::object(
2646                                root_vol.volume_directory().store().store_object_id(),
2647                                root_vol.volume_directory().object_id(),
2648                            ),
2649                            LockKey::flush(store_id)
2650                        ],
2651                        Options { borrow_metadata_space: true, ..Default::default() },
2652                    )
2653                    .await
2654                    .expect("new_transaction failed");
2655                root_vol
2656                    .delete_volume("vol", transaction, || {})
2657                    .await
2658                    .expect("delete_volume failed");
2659
2660                let store = root_vol
2661                    .new_volume("vol", NewChildStoreOptions::default())
2662                    .await
2663                    .expect("new_volume failed");
2664                create_file(&store, FILE_SIZE).await;
2665                store_id = store.store_object_id();
2666            }
2667
2668            fs.close().await.expect("close failed");
2669            let device = fs.take_device().await;
2670            device.reopen(false);
2671
2672            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2673        }
2674
2675        fsck(fs.clone()).await.expect("fsck failed");
2676        fs.close().await.expect("close failed");
2677    }
2678
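    // Races journal compaction against deleting a volume, then verifies the filesystem is still
    // consistent (via fsck) after remounting.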
2679    #[fuchsia::test(threads = 4)]
2680    async fn test_compaction_delete_race() {
2681        let (fs, _allocator) = test_fs().await;
2682
2683        {
2684            const FILE_SIZE: usize = 10_000_000;
2685
2686            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2687            let store = root_vol
2688                .new_volume("vol", NewChildStoreOptions::default())
2689                .await
2690                .expect("new_volume failed");
2691
2692            create_file(&store, FILE_SIZE).await;
2693
2694            // Race compaction with deleting a store.
2695            let fs_clone = fs.clone();
2696
2697            // Even though the executor has 4 threads, it's hard to get it to actually run work
2698            // on multiple threads, so force the extra threads to run.
2699            let executor_tasks = testing::force_executor_threads_to_run(4).await;
2700
2701            let task = fasync::Task::spawn(async move {
2702                fs_clone.journal().compact().await.expect("compact failed");
2703            });
2704
2705            // We don't need the executor tasks any more.
2706            drop(executor_tasks);
2707
2708            // This range was chosen so that, for the bug this test was introduced to catch, the
2709            // test failed after a fairly small number of iterations.
2710            let sleep = rand::random_range(3000..6000);
2711            std::thread::sleep(std::time::Duration::from_micros(sleep));
2712            log::info!("sleep {sleep}us");
2713
2714            let transaction = fs
2715                .clone()
2716                .new_transaction(
2717                    lock_keys![
2718                        LockKey::object(
2719                            root_vol.volume_directory().store().store_object_id(),
2720                            root_vol.volume_directory().object_id(),
2721                        ),
2722                        LockKey::flush(store.store_object_id())
2723                    ],
2724                    Options { borrow_metadata_space: true, ..Default::default() },
2725                )
2726                .await
2727                .expect("new_transaction failed");
2728            root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");
2729
2730            task.await;
2731        }
2732
2733        fs.journal().compact().await.expect("compact failed");
2734        fs.close().await.expect("close failed");
2735
2736        let device = fs.take_device().await;
2737        device.reopen(false);
2738
2739        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2740        fsck(fs.clone()).await.expect("fsck failed");
2741        fs.close().await.expect("close failed");
2742    }
2743
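    // Repeatedly creates and deletes a volume, flushing the allocator and remounting each time,
    // then verifies consistency with fsck.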
2744    #[fuchsia::test]
2745    async fn test_delete_multiple_volumes() {
2746        let (mut fs, _) = test_fs().await;
2747
2748        for _ in 0..50 {
2749            {
2750                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2751                let store = root_vol
2752                    .new_volume("vol", NewChildStoreOptions::default())
2753                    .await
2754                    .expect("new_volume failed");
2755
2756                create_file(&store, 1_000_000).await;
2757
2758                let transaction = fs
2759                    .clone()
2760                    .new_transaction(
2761                        lock_keys![
2762                            LockKey::object(
2763                                root_vol.volume_directory().store().store_object_id(),
2764                                root_vol.volume_directory().object_id(),
2765                            ),
2766                            LockKey::flush(store.store_object_id())
2767                        ],
2768                        Options { borrow_metadata_space: true, ..Default::default() },
2769                    )
2770                    .await
2771                    .expect("new_transaction failed");
2772                root_vol
2773                    .delete_volume("vol", transaction, || {})
2774                    .await
2775                    .expect("delete_volume failed");
2776
2777                fs.allocator().flush().await.expect("flush failed");
2778            }
2779
2780            fs.close().await.expect("close failed");
2781            let device = fs.take_device().await;
2782            device.reopen(false);
2783
2784            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2785        }
2786
2787        fsck(fs.clone()).await.expect("fsck failed");
2788        fs.close().await.expect("close failed");
2789    }
2790
2791    #[fuchsia::test]
2792    async fn test_allocate_free_reallocate() {
2793        const STORE_OBJECT_ID: u64 = 99;
2794        let (fs, allocator) = test_fs().await;
2795
2796        // Allocate some stuff.
2797        let mut device_ranges = Vec::new();
2798        let mut transaction =
2799            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2800        for _ in 0..30 {
2801            device_ranges.push(
2802                allocator
2803                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2804                    .await
2805                    .expect("allocate failed"),
2806            );
2807        }
2808        transaction.commit().await.expect("commit failed");
2809
2810        assert_eq!(
2811            fs.block_size() * 3000,
2812            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2813        );
2814
2815        // Delete it all.
2816        let mut transaction =
2817            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2818        for range in std::mem::replace(&mut device_ranges, Vec::new()) {
2819            allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
2820        }
2821        transaction.commit().await.expect("commit failed");
2822
2823        assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());
2824
2825        // Allocate some more stuff. Due to storage pressure, this requires us to flush the
2826        // device before reusing the space freed above.
2827        let mut transaction =
2828            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2829        let target_len = 1500 * fs.block_size();
2830        while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
2831            let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
2832            device_ranges.push(
2833                allocator
2834                    .allocate(&mut transaction, STORE_OBJECT_ID, len)
2835                    .await
2836                    .expect("allocate failed"),
2837            );
2838        }
2839        transaction.commit().await.expect("commit failed");
2840
2841        assert_eq!(
2842            fs.block_size() * 1500,
2843            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2844        );
2845    }
2846
2847    #[fuchsia::test]
2848    async fn test_flush() {
2849        const STORE_OBJECT_ID: u64 = 99;
2850
2851        let mut device_ranges = Vec::new();
2852        let device = {
2853            let (fs, allocator) = test_fs().await;
2854            let mut transaction = fs
2855                .clone()
2856                .new_transaction(lock_keys![], Options::default())
2857                .await
2858                .expect("new failed");
2859            device_ranges.push(
2860                allocator
2861                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2862                    .await
2863                    .expect("allocate failed"),
2864            );
2865            device_ranges.push(
2866                allocator
2867                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2868                    .await
2869                    .expect("allocate failed"),
2870            );
2871            device_ranges.push(
2872                allocator
2873                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2874                    .await
2875                    .expect("allocate failed"),
2876            );
2877            transaction.commit().await.expect("commit failed");
2878
2879            allocator.flush().await.expect("flush failed");
2880
2881            fs.close().await.expect("close failed");
2882            fs.take_device().await
2883        };
2884
2885        device.reopen(false);
2886        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2887        let allocator = fs.allocator();
2888
2889        let allocated = collect_allocations(&allocator).await;
2890
2891        // Make sure the ranges we allocated earlier are still allocated.
2892        for i in &device_ranges {
2893            let mut overlapping = 0;
2894            for j in &allocated {
2895                overlapping += overlap(i, j);
2896            }
2897            assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
2898        }
2899
2900        let mut transaction =
2901            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2902        let range = allocator
2903            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2904            .await
2905            .expect("allocate failed");
2906
2907        // Make sure the range just allocated doesn't overlap any other allocated ranges.
2908        for r in &allocated {
2909            assert_eq!(overlap(r, &range), 0);
2910        }
2911        transaction.commit().await.expect("commit failed");
2912    }
2913
2914    #[fuchsia::test]
2915    async fn test_dropped_transaction() {
2916        const STORE_OBJECT_ID: u64 = 99;
2917        let (fs, allocator) = test_fs().await;
2918        let allocated_range = {
2919            let mut transaction = fs
2920                .clone()
2921                .new_transaction(lock_keys![], Options::default())
2922                .await
2923                .expect("new_transaction failed");
2924            allocator
2925                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2926                .await
2927                .expect("allocate failed")
2928        };
2929        // After dropping the transaction and attempting to allocate again, we should end up with
2930        // the same range because the reservation should have been released.
2931        let mut transaction = fs
2932            .clone()
2933            .new_transaction(lock_keys![], Options::default())
2934            .await
2935            .expect("new_transaction failed");
2936        assert_eq!(
2937            allocator
2938                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2939                .await
2940                .expect("allocate failed"),
2941            allocated_range
2942        );
2943    }
2944
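    // After an owner is marked for deletion and the allocator is flushed, its entry should be
    // removed from the per-owner accounting and stay absent across a remount.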
2945    #[fuchsia::test]
2946    async fn test_cleanup_removed_owner() {
2947        const STORE_OBJECT_ID: u64 = 99;
2948        let device = {
2949            let (fs, allocator) = test_fs().await;
2950
2951            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2952            {
2953                let mut transaction =
2954                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
2955                allocator
2956                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2957                    .await
2958                    .expect("Allocating");
2959                transaction.commit().await.expect("Committing.");
2960            }
2961            allocator.flush().await.expect("Flushing");
2962            assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2963            {
2964                let mut transaction =
2965                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
2966                allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
2967                transaction.commit().await.expect("Committing.");
2968            }
2969            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2970            fs.close().await.expect("Closing");
2971            fs.take_device().await
2972        };
2973
2974        device.reopen(false);
2975        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2976        let allocator = fs.allocator();
2977        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2978    }
2979
2980    #[fuchsia::test]
2981    async fn test_allocated_bytes() {
2982        const STORE_OBJECT_ID: u64 = 99;
2983        let (fs, allocator) = test_fs().await;
2984
2985        let initial_allocated_bytes = allocator.get_allocated_bytes();
2986
2987        // Verify allocated_bytes reflects allocation changes.
2988        let allocated_bytes = initial_allocated_bytes + fs.block_size();
2989        let allocated_range = {
2990            let mut transaction = fs
2991                .clone()
2992                .new_transaction(lock_keys![], Options::default())
2993                .await
2994                .expect("new_transaction failed");
2995            let range = allocator
2996                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2997                .await
2998                .expect("allocate failed");
2999            transaction.commit().await.expect("commit failed");
3000            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3001            range
3002        };
3003
3004        {
3005            let mut transaction = fs
3006                .clone()
3007                .new_transaction(lock_keys![], Options::default())
3008                .await
3009                .expect("new_transaction failed");
3010            allocator
3011                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3012                .await
3013                .expect("allocate failed");
3014
3015            // Prior to committing, the count of allocated bytes shouldn't change.
3016            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3017        }
3018
3019        // After dropping the prior transaction, the allocated bytes still shouldn't have changed.
3020        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3021
3022        // Verify allocated_bytes reflects deallocations.
3023        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
3024        let mut transaction =
3025            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3026        allocator
3027            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
3028            .await
3029            .expect("deallocate failed");
3030
3031        // Before committing, there should be no change.
3032        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3033
3034        transaction.commit().await.expect("commit failed");
3035
3036        // After committing, all but 40 bytes should remain allocated.
3037        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
3038    }
3039
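    // A byte limit set via set_bytes_limit should only appear in the allocator's persisted info
    // once the transaction commits.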
3040    #[fuchsia::test]
3041    async fn test_persist_bytes_limit() {
3042        const LIMIT: u64 = 12345;
3043        const OWNER_ID: u64 = 12;
3044
3045        let (fs, allocator) = test_fs().await;
3046        {
3047            let mut transaction = fs
3048                .clone()
3049                .new_transaction(lock_keys![], Options::default())
3050                .await
3051                .expect("new_transaction failed");
3052            allocator
3053                .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
3054                .expect("Failed to set limit.");
3055            assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
3056            transaction.commit().await.expect("Failed to commit transaction");
3057            let bytes: u64 = *allocator
3058                .inner
3059                .lock()
3060                .info
3061                .limit_bytes
3062                .get(&OWNER_ID)
3063                .expect("Failed to find limit");
3064            assert_eq!(LIMIT, bytes);
3065        }
3066    }
3067
3068    /// Given a sorted list of non-overlapping ranges, this will coalesce adjacent ranges.
3069    /// This allows comparison of equivalent sets of ranges which may occur due to differences
3070    /// across allocator strategies.
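    /// e.g. `coalesce_ranges(vec![0..10, 10..20, 30..40])` yields `[0..20, 30..40]`.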
3071    fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
3072        let mut coalesced = Vec::new();
3073        let mut prev: Option<Range<u64>> = None;
3074        for range in ranges {
3075            if let Some(prev_range) = &mut prev {
3076                if range.start == prev_range.end {
3077                    prev_range.end = range.end;
3078                } else {
3079                    coalesced.push(prev_range.clone());
3080                    prev = Some(range);
3081                }
3082            } else {
3083                prev = Some(range);
3084            }
3085        }
3086        if let Some(prev_range) = prev {
3087            coalesced.push(prev_range);
3088        }
3089        coalesced
3090    }
3091
3092    #[fuchsia::test]
3093    async fn test_take_for_trimming() {
3094        const STORE_OBJECT_ID: u64 = 99;
3095
3096        // Allocate a large chunk, then free a few bits of it, so we have free chunks interleaved
3097        // with allocated chunks.
3098        let allocated_range;
3099        let expected_free_ranges;
3100        let device = {
3101            let (fs, allocator) = test_fs().await;
3102            let bs = fs.block_size();
3103            let mut transaction = fs
3104                .clone()
3105                .new_transaction(lock_keys![], Options::default())
3106                .await
3107                .expect("new failed");
3108            allocated_range = allocator
3109                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
3110                .await
3111                .expect("allocate failed");
3112            transaction.commit().await.expect("commit failed");
3113
3114            let mut transaction = fs
3115                .clone()
3116                .new_transaction(lock_keys![], Options::default())
3117                .await
3118                .expect("new failed");
3119            let base = allocated_range.start;
3120            expected_free_ranges = vec![
3121                base..(base + (bs * 1)),
3122                (base + (bs * 2))..(base + (bs * 3)),
3123                // Note that the next three ranges are adjacent and will be treated as one free
3124                // range once applied.  We separate them here to exercise the handling of "large"
3125                // free ranges.
3126                (base + (bs * 4))..(base + (bs * 8)),
3127                (base + (bs * 8))..(base + (bs * 12)),
3128                (base + (bs * 12))..(base + (bs * 13)),
3129                (base + (bs * 29))..(base + (bs * 30)),
3130            ];
3131            for range in &expected_free_ranges {
3132                allocator
3133                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
3134                    .await
3135                    .expect("deallocate failed");
3136            }
3137            transaction.commit().await.expect("commit failed");
3138
3139            allocator.flush().await.expect("flush failed");
3140
3141            fs.close().await.expect("close failed");
3142            fs.take_device().await
3143        };
3144
3145        device.reopen(false);
3146        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
3147        let allocator = fs.allocator();
3148
3149        // These values were picked so that each of them is, at some point, the reason why
3150        // collect_free_extents finishes, and so that we return after partially processing one of
3151        // the free extents.
3152        let max_extent_size = fs.block_size() as usize * 4;
3153        const EXTENTS_PER_BATCH: usize = 2;
3154        let mut free_ranges = vec![];
3155        let mut offset = allocated_range.start;
3156        while offset < allocated_range.end {
3157            let free = allocator
3158                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
3159                .await
3160                .expect("take_for_trimming failed");
3161            free_ranges.extend(
3162                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
3163            );
3164            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
3165        }
3166        // Coalesce adjacent free ranges because the allocator may return smaller aligned chunks,
3167        // but the overall coverage should be equivalent.
3168        let coalesced_free_ranges = coalesce_ranges(free_ranges);
3169        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);
3170
3171        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
3172    }
3173
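    // Checks that allocations block while free extents are held for trimming and only proceed
    // once the trimmable extents are released.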
3174    #[fuchsia::test]
3175    async fn test_allocations_wait_for_free_extents() {
3176        const STORE_OBJECT_ID: u64 = 99;
3177        let (fs, allocator) = test_fs().await;
3178        let allocator_clone = allocator.clone();
3179
3180        let mut transaction =
3181            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3182
3183        // Tie up all of the free extents on the device, and make sure allocations block.
3184        let max_extent_size = fs.device().size() as usize;
3185        const EXTENTS_PER_BATCH: usize = usize::MAX;
3186
3187        // HACK: Treat `trimmable_extents` as being guarded by `trim_done` (i.e. it should only be
3188        // accessed whilst `trim_done` is locked). We can't combine them into the same mutex,
3189        // because the inner type would be "poisoned" by the lifetime parameter of
3190        // `trimmable_extents` (which is tied to the lifetime of `allocator`), and we couldn't then
3191        // move the mutex into `alloc_task`, which requires a `'static` lifetime.
3192        let trim_done = Arc::new(Mutex::new(false));
3193        let trimmable_extents = allocator
3194            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
3195            .await
3196            .expect("take_for_trimming failed");
3197
3198        let trim_done_clone = trim_done.clone();
3199        let bs = fs.block_size();
3200        let alloc_task = fasync::Task::spawn(async move {
3201            allocator_clone
3202                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
3203                .await
3204                .expect("allocate failed");
3205            {
3206                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
3207            }
3208            transaction.commit().await.expect("commit failed");
3209        });
3210
3211        // Add a small delay to simulate the trim taking some nonzero amount of time.  Otherwise,
3212        // this will almost certainly always beat the allocation attempt.
3213        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
3214
3215        // Once the free extents are released, the task should unblock.
3216        {
3217            let mut trim_done = trim_done.lock();
3218            std::mem::drop(trimmable_extents);
3219            *trim_done = true;
3220        }
3221
3222        alloc_task.await;
3223    }
3224
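    // Allocating against a reservation that isn't block-aligned should still yield a range whose
    // length is a multiple of the block size.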
3225    #[fuchsia::test]
3226    async fn test_allocation_with_reservation_not_multiple_of_block_size() {
3227        const STORE_OBJECT_ID: u64 = 99;
3228        let (fs, allocator) = test_fs().await;
3229
3230        // Reserve an amount that isn't a multiple of the block size.
3231        const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
3232        let reservation =
3233            allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();
3234
3235        let mut transaction = fs
3236            .clone()
3237            .new_transaction(
3238                lock_keys![],
3239                Options { allocator_reservation: Some(&reservation), ..Options::default() },
3240            )
3241            .await
3242            .expect("new failed");
3243
3244        let range = allocator
3245            .allocate(
3246                &mut transaction,
3247                STORE_OBJECT_ID,
3248                round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
3249            )
3250            .await
3251            .expect("allocate failed");
3252        assert_eq!((range.end - range.start) % fs.block_size(), 0);
3253
3254        println!("{}", range.end - range.start);
3255    }
3256}