fxfs/object_store/allocator.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! # The Allocator
6//!
//! The allocator in Fxfs is a filesystem-wide entity responsible for managing the allocation of
//! regions of the device to "owners" (which are `ObjectStore`s).
9//!
10//! Allocations are tracked in an LSMTree with coalescing used to merge neighboring allocations
11//! with the same properties (owner and reference count). As of writing, reference counting is not
12//! used. (Reference counts are intended for future use if/when snapshot support is implemented.)
13//!
14//! There are a number of important implementation features of the allocator that warrant further
15//! documentation.
16//!
17//! ## Byte limits
18//!
19//! Fxfs is a multi-volume filesystem. Fuchsia with fxblob currently uses two primary volumes -
20//! an unencrypted volume for blob storage and an encrypted volume for data storage.
21//! Byte limits ensure that no one volume can consume all available storage. This is important
22//! as software updates must always be possible (blobs) and conversely configuration data should
23//! always be writable (data).
24//!
25//! ## Reservation tracking
26//!
27//! Fxfs on Fuchsia leverages write-back caching which allows us to buffer writes in RAM until
28//! memory pressure, an explicit flush or a file close requires us to persist data to storage.
29//!
//! To ensure that we do not over-commit in such cases (e.g. by writing many files to RAM only to
//! find out that there is insufficient disk space to store them all), the allocator includes a
//! simple reservation tracking mechanism.
33//!
34//! Reservation tracking is implemented hierarchically such that a reservation can portion out
35//! sub-reservations, each of which may be "reserved" or "forgotten" when it comes time to actually
36//! allocate space.
37//!
38//! ## Fixed locations
39//!
//! The only fixed-location files in Fxfs are the first 512kiB extents of the two superblocks,
//! which sit at the very start of the device (i.e. the first 1MiB). The allocator manages these
//! locations, but does so via a `mark_allocated` method that is distinct from all other
//! allocations, for which the location is chosen by the allocator.
44//!
45//! ## Deallocated unusable regions
46//!
47//! It is not legal to reuse a deallocated disk region until after a flush. Transactions
48//! are not guaranteed to be persisted until after a successful flush so any reuse of
49//! a region before then followed by power loss may lead to corruption.
50//!
//! e.g. Consider if we deallocate a file, reuse its extent for another file, then crash after
//! writing to the new file but before flushing the journal. At the next mount we will have lost the
53//! transaction despite overwriting the original file's data, leading to inconsistency errors (data
54//! corruption).
55//!
56//! These regions are currently tracked in RAM in the allocator.
57//!
58//! ## Allocated but uncommitted regions
59//!
60//! These are allocated regions that are not (yet) persisted to disk. They are regular
61//! file allocations, but are not stored on persistent storage until their transaction is committed
62//! and safely flushed to disk.
63//!
64//! ## TRIMed unusable regions
65//!
//! We periodically TRIM unallocated regions to give the SSD controller insight into which
//! parts of the device contain data. While these operations are in flight, the affected regions
//! are held as temporary allocations, and we must avoid allocating from them until the TRIM
//! completes.
69//!
70//! ## Volume deletion
71//!
72//! We make use of an optimisation in the case where an entire volume is deleted. In such cases,
73//! rather than individually deallocate all disk regions associated with that volume, we make
//! note of the deletion and perform a special merge operation during the next LSMTree compaction that
75//! filters out allocations for the deleted volume.
76//!
//! This is designed to make dropping volumes significantly cheaper, but it does add some
//! additional complexity to any allocator implementation that maintains data structures tracking
//! free space (rather than just allocated space).
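//!
//! A minimal sketch of the idea (illustrative; `iter` and `marked_for_deletion` are assumed to be
//! in scope, and the call mirrors how this file composes its merge filters):
//!
//! ```ignore
//! // Records belonging to deleted volumes are dropped while iterating, so a major compaction
//! // simply never writes them back out.
//! let iter =
//!     filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
//! ```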
80//!
81//! ## Image generation
82//!
83//! The Fuchsia build process requires building an initial filesystem image. In the case of
84//! fxblob-based boards, this is an Fxfs filesystem containing a volume with the base set of
85//! blobs required to bootstrap the system. When we build such an image, we want it to be as compact
86//! as possible as we're potentially packaging it up for distribution. To that end, our allocation
87//! strategy (or at least the strategy used for image generation) should prefer to allocate from the
88//! start of the device wherever possible.
89pub mod merge;
90pub mod strategy;
91
92use crate::drop_event::DropEvent;
93use crate::errors::FxfsError;
94use crate::filesystem::{ApplyContext, ApplyMode, FxFilesystem, JournalingObject, SyncOptions};
95use crate::log::*;
96use crate::lsm_tree::cache::NullCache;
97use crate::lsm_tree::skip_list_layer::SkipListLayer;
98use crate::lsm_tree::types::{
99    FuzzyHash, Item, ItemRef, Layer, LayerIterator, LayerKey, MergeType, OrdLowerBound,
100    OrdUpperBound, RangeKey, SortByU64, Value,
101};
102use crate::lsm_tree::{LSMTree, Query, layers_from_handles};
103use crate::object_handle::{INVALID_OBJECT_ID, ObjectHandle, ReadObjectHandle};
104use crate::object_store::object_manager::ReservationUpdate;
105use crate::object_store::transaction::{
106    AllocatorMutation, AssocObj, LockKey, Mutation, Options, Transaction, WriteGuard, lock_keys,
107};
108use crate::object_store::{DataObjectHandle, DirectWriter, HandleOptions, ObjectStore, tree};
109use crate::range::RangeExt;
110use crate::round::{round_div, round_down, round_up};
111use crate::serialized_types::{
112    DEFAULT_MAX_SERIALIZED_RECORD_SIZE, LATEST_VERSION, Version, Versioned, VersionedLatest,
113};
114use anyhow::{Context, Error, anyhow, bail, ensure};
115use async_trait::async_trait;
116use either::Either::{Left, Right};
117use event_listener::EventListener;
118use fprint::TypeFingerprint;
119use fuchsia_inspect::HistogramProperty;
120use fuchsia_sync::Mutex;
121use futures::FutureExt;
122use merge::{filter_marked_for_deletion, filter_tombstones, merge};
123use serde::{Deserialize, Serialize};
124use std::borrow::Borrow;
125use std::collections::{BTreeMap, HashSet, VecDeque};
126use std::hash::Hash;
127use std::marker::PhantomData;
128use std::num::Saturating;
129use std::ops::Range;
130use std::sync::atomic::{AtomicU64, Ordering};
131use std::sync::{Arc, Weak};
132
133/// This trait is implemented by things that own reservations.
134pub trait ReservationOwner: Send + Sync {
    /// Report that bytes are being released from the reservation back to the |ReservationOwner|
136    /// where |owner_object_id| is the owner under the root object store associated with the
137    /// reservation.
138    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64);
139}
140
141/// A reservation guarantees that when it comes time to actually allocate, it will not fail due to
142/// lack of space.  Sub-reservations (a.k.a. holds) are possible which effectively allows part of a
143/// reservation to be set aside until it's time to commit.  Reservations do offer some
/// thread-safety, but some responsibility is borne by the caller: e.g. calling `forget` and
/// `reserve` at the same time from different threads is unsafe. Reservations have an
/// |owner_object_id| which associates them with an object under the root object store that the
/// reservation is accounted against.
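///
/// A minimal usage sketch (illustrative only, not a doctest; `owner` is assumed to be an
/// `Arc<dyn ReservationOwner>` such as the allocator, and `owner_object_id` a volume's store id):
///
/// ```ignore
/// // Reserve 1MiB up front, attributed to `owner_object_id`.
/// let reservation = Reservation::new(owner, Some(owner_object_id), 1 << 20);
/// // Set aside 64kiB of that as a hold; `reserve` returns None if not enough remains.
/// let hold: Hold<'_> = reservation.reserve(64 << 10).expect("insufficient reservation");
/// // Dropping the hold returns its bytes to `reservation`; dropping `reservation` returns
/// // whatever remains to `owner`.
/// drop(hold);
/// drop(reservation);
/// ```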
148pub struct ReservationImpl<T: Borrow<U>, U: ReservationOwner + ?Sized> {
149    owner: T,
150    owner_object_id: Option<u64>,
151    inner: Mutex<ReservationInner>,
152    phantom: PhantomData<U>,
153}
154
155#[derive(Debug, Default)]
156struct ReservationInner {
157    // Amount currently held by this reservation.
158    amount: u64,
159
160    // Amount reserved by sub-reservations.
161    reserved: u64,
162}
163
164impl<T: Borrow<U>, U: ReservationOwner + ?Sized> std::fmt::Debug for ReservationImpl<T, U> {
165    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166        self.inner.lock().fmt(f)
167    }
168}
169
170impl<T: Borrow<U> + Clone + Send + Sync, U: ReservationOwner + ?Sized> ReservationImpl<T, U> {
171    pub fn new(owner: T, owner_object_id: Option<u64>, amount: u64) -> Self {
172        Self {
173            owner,
174            owner_object_id,
175            inner: Mutex::new(ReservationInner { amount, reserved: 0 }),
176            phantom: PhantomData,
177        }
178    }
179
180    pub fn owner_object_id(&self) -> Option<u64> {
181        self.owner_object_id
182    }
183
184    /// Returns the total amount of the reservation, not accounting for anything that might be held.
185    pub fn amount(&self) -> u64 {
186        self.inner.lock().amount
187    }
188
189    /// Adds more to the reservation.
190    pub fn add(&self, amount: u64) {
191        self.inner.lock().amount += amount;
192    }
193
194    /// Returns the entire amount of the reservation.  The caller is responsible for maintaining
195    /// consistency, i.e. updating counters, etc, and there can be no sub-reservations (an assert
196    /// will fire otherwise).
197    pub fn forget(&self) -> u64 {
198        let mut inner = self.inner.lock();
199        assert_eq!(inner.reserved, 0);
200        std::mem::take(&mut inner.amount)
201    }
202
203    /// Takes some of the reservation.  The caller is responsible for maintaining consistency,
204    /// i.e. updating counters, etc.  This will assert that the amount being forgotten does not
205    /// exceed the available reservation amount; the caller should ensure that this is the case.
206    pub fn forget_some(&self, amount: u64) {
207        let mut inner = self.inner.lock();
208        inner.amount -= amount;
209        assert!(inner.reserved <= inner.amount);
210    }
211
    /// Returns a partial amount of the reservation. The |amount| callback is passed the currently
    /// available amount (the limit) and should return the amount to take, which can be zero.
214    fn reserve_with(&self, amount: impl FnOnce(u64) -> u64) -> ReservationImpl<&Self, Self> {
215        let mut inner = self.inner.lock();
216        let taken = amount(inner.amount - inner.reserved);
217        inner.reserved += taken;
218        ReservationImpl::new(self, self.owner_object_id, taken)
219    }
220
221    /// Reserves *exactly* amount if possible.
222    pub fn reserve(&self, amount: u64) -> Option<ReservationImpl<&Self, Self>> {
223        let mut inner = self.inner.lock();
224        if inner.amount - inner.reserved < amount {
225            None
226        } else {
227            inner.reserved += amount;
228            Some(ReservationImpl::new(self, self.owner_object_id, amount))
229        }
230    }
231
232    /// Commits a previously reserved amount from this reservation.  The caller is responsible for
233    /// ensuring the amount was reserved.
234    pub fn commit(&self, amount: u64) {
235        let mut inner = self.inner.lock();
236        inner.reserved -= amount;
237        inner.amount -= amount;
238    }
239
240    /// Returns some of the reservation.
241    pub fn give_back(&self, amount: u64) {
242        self.owner.borrow().release_reservation(self.owner_object_id, amount);
243        let mut inner = self.inner.lock();
244        inner.amount -= amount;
245        assert!(inner.reserved <= inner.amount);
246    }
247
248    /// Moves `amount` from this reservation to another reservation.
249    pub fn move_to<V: Borrow<W> + Clone + Send + Sync, W: ReservationOwner + ?Sized>(
250        &self,
251        other: &ReservationImpl<V, W>,
252        amount: u64,
253    ) {
254        assert_eq!(self.owner_object_id, other.owner_object_id());
255        let mut inner = self.inner.lock();
256        if let Some(amount) = inner.amount.checked_sub(amount) {
257            inner.amount = amount;
258        } else {
259            std::mem::drop(inner);
260            panic!("Insufficient reservation space");
261        }
262        other.add(amount);
263    }
264}
265
266impl<T: Borrow<U>, U: ReservationOwner + ?Sized> Drop for ReservationImpl<T, U> {
267    fn drop(&mut self) {
268        let inner = self.inner.get_mut();
269        assert_eq!(inner.reserved, 0);
270        let owner_object_id = self.owner_object_id;
271        if inner.amount > 0 {
272            self.owner
273                .borrow()
274                .release_reservation(owner_object_id, std::mem::take(&mut inner.amount));
275        }
276    }
277}
278
279impl<T: Borrow<U> + Send + Sync, U: ReservationOwner + ?Sized> ReservationOwner
280    for ReservationImpl<T, U>
281{
282    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
283        // Sub-reservations should belong to the same owner (or lack thereof).
284        assert_eq!(owner_object_id, self.owner_object_id);
285        let mut inner = self.inner.lock();
286        assert!(inner.reserved >= amount, "{} >= {}", inner.reserved, amount);
287        inner.reserved -= amount;
288    }
289}
290
291pub type Reservation = ReservationImpl<Arc<dyn ReservationOwner>, dyn ReservationOwner>;
292
293pub type Hold<'a> = ReservationImpl<&'a Reservation, Reservation>;
294
295/// Our allocator implementation tracks extents with a reference count.  At time of writing, these
296/// reference counts should never exceed 1, but that might change with snapshots and clones.
297pub type AllocatorKey = AllocatorKeyV32;
298
299#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize, TypeFingerprint, Versioned)]
300#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
301pub struct AllocatorKeyV32 {
302    pub device_range: Range<u64>,
303}
304
305impl SortByU64 for AllocatorKey {
306    fn get_leading_u64(&self) -> u64 {
307        self.device_range.start
308    }
309}
310
311const EXTENT_HASH_BUCKET_SIZE: u64 = 1 * 1024 * 1024;
312
313pub struct AllocatorKeyPartitionIterator {
314    device_range: Range<u64>,
315}
316
317impl Iterator for AllocatorKeyPartitionIterator {
318    type Item = u64;
319
320    fn next(&mut self) -> Option<Self::Item> {
321        if self.device_range.start >= self.device_range.end {
322            None
323        } else {
324            let start = self.device_range.start;
325            self.device_range.start = start.saturating_add(EXTENT_HASH_BUCKET_SIZE);
326            let end = std::cmp::min(self.device_range.start, self.device_range.end);
327            let key = AllocatorKey { device_range: start..end };
328            let hash = crate::stable_hash::stable_hash(key);
329            Some(hash)
330        }
331    }
332}
333
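// The fuzzy hash partitions an allocation's device range into EXTENT_HASH_BUCKET_SIZE buckets so
// that overlapping ranges always share at least one bucket hash.  For example (illustrative
// values): a key covering 1.5MiB..3.2MiB is rounded out to 1MiB..4MiB and therefore yields three
// bucket hashes, one per 1MiB partition.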
334impl FuzzyHash for AllocatorKey {
335    fn fuzzy_hash(&self) -> impl Iterator<Item = u64> {
336        AllocatorKeyPartitionIterator {
337            device_range: round_down(self.device_range.start, EXTENT_HASH_BUCKET_SIZE)
338                ..round_up(self.device_range.end, EXTENT_HASH_BUCKET_SIZE).unwrap_or(u64::MAX),
339        }
340    }
341
342    fn is_range_key(&self) -> bool {
343        true
344    }
345}
346
347impl AllocatorKey {
348    /// Returns a new key that is a lower bound suitable for use with merge_into.
349    pub fn lower_bound_for_merge_into(self: &AllocatorKey) -> AllocatorKey {
350        AllocatorKey { device_range: 0..self.device_range.start }
351    }
352}
353
354impl LayerKey for AllocatorKey {
355    fn merge_type(&self) -> MergeType {
356        MergeType::OptimizedMerge
357    }
358}
359
360impl OrdUpperBound for AllocatorKey {
361    fn cmp_upper_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
362        self.device_range.end.cmp(&other.device_range.end)
363    }
364}
365
366impl OrdLowerBound for AllocatorKey {
367    fn cmp_lower_bound(&self, other: &AllocatorKey) -> std::cmp::Ordering {
368        // The ordering over range.end is significant here as it is used in
369        // the heap ordering that feeds into our merge function and
370        // a total ordering over range lets us remove a symmetry case from
371        // the allocator merge function.
372        self.device_range
373            .start
374            .cmp(&other.device_range.start)
375            .then(self.device_range.end.cmp(&other.device_range.end))
376    }
377}
378
379impl Ord for AllocatorKey {
380    fn cmp(&self, other: &AllocatorKey) -> std::cmp::Ordering {
381        self.device_range
382            .start
383            .cmp(&other.device_range.start)
384            .then(self.device_range.end.cmp(&other.device_range.end))
385    }
386}
387
388impl PartialOrd for AllocatorKey {
389    fn partial_cmp(&self, other: &AllocatorKey) -> Option<std::cmp::Ordering> {
390        Some(self.cmp(other))
391    }
392}
393
394impl RangeKey for AllocatorKey {
395    fn overlaps(&self, other: &Self) -> bool {
396        self.device_range.start < other.device_range.end
397            && self.device_range.end > other.device_range.start
398    }
399}
400
401/// Allocations are "owned" by a single ObjectStore and are reference counted
402/// (for future snapshot/clone support).
403pub type AllocatorValue = AllocatorValueV32;
404impl Value for AllocatorValue {
405    const DELETED_MARKER: Self = Self::None;
406}
407
408#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, TypeFingerprint, Versioned)]
409#[cfg_attr(fuzz, derive(arbitrary::Arbitrary))]
410pub enum AllocatorValueV32 {
411    // Tombstone variant indicating an extent is no longer allocated.
412    None,
413    // Used when we know there are no possible allocations below us in the stack.
414    // This is currently all the time. We used to have a related Delta type but
415    // it has been removed due to correctness issues (https://fxbug.dev/42179428).
416    Abs { count: u64, owner_object_id: u64 },
417}
418
419pub type AllocatorItem = Item<AllocatorKey, AllocatorValue>;
420
421/// Serialized information about the allocator.
422pub type AllocatorInfo = AllocatorInfoV32;
423
424#[derive(Debug, Default, Clone, Deserialize, Serialize, TypeFingerprint, Versioned)]
425pub struct AllocatorInfoV32 {
    /// Holds the set of layer file object_ids for the LSM tree (newest first).
427    pub layers: Vec<u64>,
428    /// Maps from owner_object_id to bytes allocated.
429    pub allocated_bytes: BTreeMap<u64, u64>,
430    /// Set of owner_object_id that we should ignore if found in layer files.  For now, this should
431    /// always be empty on-disk because we always do full compactions.
432    pub marked_for_deletion: HashSet<u64>,
    /// The limit on the number of allocated bytes for each `owner_object_id`. If there is no
    /// limit present here for an `owner_object_id`, assume it is u64::MAX.
435    pub limit_bytes: BTreeMap<u64, u64>,
436}
437
438const MAX_ALLOCATOR_INFO_SERIALIZED_SIZE: usize = 131_072;
439
440/// Computes the target maximum extent size based on the block size of the allocator.
441pub fn max_extent_size_for_block_size(block_size: u64) -> u64 {
    // Each block in an extent contains an 8-byte checksum (which due to varint encoding is 9
    // bytes), and a given extent record must be no larger than
    // DEFAULT_MAX_SERIALIZED_RECORD_SIZE.  We also need to leave a bit of room (arbitrarily,
    // 64 bytes) for the rest of the extent's metadata.
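    // As an illustrative example (not a normative value): if DEFAULT_MAX_SERIALIZED_RECORD_SIZE
    // were 4096 bytes, a 4kiB block size would give 4096 * (4096 - 64) / 9, i.e. roughly 1.75MiB
    // per extent record.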
446    block_size * (DEFAULT_MAX_SERIALIZED_RECORD_SIZE - 64) / 9
447}
448
449#[derive(Default)]
450struct AllocatorCounters {
451    num_flushes: u64,
452    last_flush_time: Option<std::time::SystemTime>,
453}
454
455pub struct Allocator {
456    filesystem: Weak<FxFilesystem>,
457    block_size: u64,
458    device_size: u64,
459    object_id: u64,
460    max_extent_size_bytes: u64,
461    tree: LSMTree<AllocatorKey, AllocatorValue>,
462    // A list of allocations which are temporary; i.e. they are not actually stored in the LSM tree,
463    // but we still don't want to be able to allocate over them.  This layer is merged into the
464    // allocator's LSM tree whilst reading it, so the allocations appear to exist in the LSM tree.
465    // This is used in a few places, for example to hold back allocations that have been added to a
    // transaction but not yet committed, or to prevent the allocation of a deleted extent
467    // until the device is flushed.
468    temporary_allocations: Arc<SkipListLayer<AllocatorKey, AllocatorValue>>,
469    inner: Mutex<Inner>,
470    allocation_mutex: futures::lock::Mutex<()>,
471    counters: Mutex<AllocatorCounters>,
472    maximum_offset: AtomicU64,
473}
474
475/// Tracks the different stages of byte allocations for an individual owner.
476#[derive(Debug, Default, PartialEq)]
477struct ByteTracking {
478    /// This value is the up-to-date count of the number of allocated bytes per owner_object_id
479    /// whereas the value in `Info::allocated_bytes` is the value as it was when we last flushed.
480    /// This field should be regarded as *untrusted*; it can be invalid due to filesystem
481    /// inconsistencies, and this is why it is `Saturating<u64>` rather than just `u64`.  Any
482    /// computations that use this value should typically propagate `Saturating` so that it is clear
483    /// the value is *untrusted*.
484    allocated_bytes: Saturating<u64>,
485
486    /// This value is the number of bytes allocated to uncommitted allocations.
487    /// (Bytes allocated, but not yet persisted to disk)
488    uncommitted_allocated_bytes: u64,
489
490    /// This value is the number of bytes allocated to reservations.
491    reserved_bytes: u64,
492
    /// This value is the number of bytes in committed deallocations that cannot be reused until
    /// they are flushed to the device.
496    committed_deallocated_bytes: u64,
497}
498
499impl ByteTracking {
500    // Returns the total number of bytes that are taken either from reservations, allocations or
501    // uncommitted allocations.
502    fn used_bytes(&self) -> Saturating<u64> {
503        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes + self.reserved_bytes)
504    }
505
506    // Returns the amount that is not available to be allocated, which includes actually allocated
507    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
508    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
509    // reuse those bytes yet.
510    fn unavailable_bytes(&self) -> Saturating<u64> {
511        self.allocated_bytes
512            + Saturating(self.uncommitted_allocated_bytes)
513            + Saturating(self.committed_deallocated_bytes)
514    }
515
516    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
517    // require a device flush to be reused.
518    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
519        self.allocated_bytes + Saturating(self.uncommitted_allocated_bytes)
520    }
521}
522
523#[derive(Debug)]
524struct CommittedDeallocation {
525    // The offset at which this deallocation was committed.
526    log_file_offset: u64,
527    // The device range being deallocated.
528    range: Range<u64>,
529    // The owning object id which originally allocated it.
530    owner_object_id: u64,
531}
532
533struct Inner {
534    info: AllocatorInfo,
535
536    /// The allocator can only be opened if there have been no allocations and it has not already
537    /// been opened or initialized.
538    opened: bool,
539
    /// When we allocate a range (initially only in RAM), we add it to
    /// Allocator::temporary_allocations.
    /// This layer is added to the layer_set in rebuild_strategy and take_for_trimming so we
    /// don't assume the range is available.
543    /// If we apply the mutation, we move it from `temporary_allocations` to the LSMTree.
544    /// If we drop the mutation, we just delete it from `temporary_allocations` and call `free`.
545    ///
546    /// We need to be very careful about races when we move the allocation into the LSMTree.
547    /// A layer_set is a snapshot in time of a set of layers. Say:
548    ///  1. `rebuild_strategy` takes a layer_set that includes the mutable layer.
549    ///  2. A compaction operation seals that mutable layer and creates a new mutable layer.
550    ///     Note that the layer_set in rebuild_strategy doesn't have this new mutable layer.
551    ///  3. An apply_mutation operation adds the allocation to the new mutable layer.
552    ///     It then removes the allocation from `temporary_allocations`.
553    ///  4. `rebuild_strategy` iterates the layer_set, missing both the temporary mutation AND
554    ///     the record in the new mutable layer, making the allocated range available for double
555    ///     allocation and future filesystem corruption.
556    ///
    /// We don't have (or want) a means to lock compactions during rebuild_strategy operations, so
    /// to avoid this scenario, we can't remove from temporary_allocations when we apply a mutation.
    /// Instead we add them to dropped_temporary_allocations and make the actual removals from
    /// `temporary_allocations` while holding the allocator lock. Because rebuild_strategy only
    /// runs with this lock held, this guarantees that we don't remove entries from
    /// `temporary_allocations` while also iterating over it.
563    ///
564    /// A related issue happens when we deallocate a range. We use temporary_allocations this time
565    /// to prevent reuse until after the deallocation has been successfully flushed.
566    /// In this scenario we don't want to call 'free()' until the range is ready to use again.
567    dropped_temporary_allocations: Vec<Range<u64>>,
568
    /// The per-owner counters for bytes at various stages of the data life-cycle, from initial
    /// reservation through allocation until the bytes are eventually deallocated and freed.
571    owner_bytes: BTreeMap<u64, ByteTracking>,
572
573    /// This value is the number of bytes allocated to reservations but not tracked as part of a
574    /// particular volume.
575    unattributed_reserved_bytes: u64,
576
577    /// Committed deallocations that we cannot use until they are flushed to the device.
578    committed_deallocated: VecDeque<CommittedDeallocation>,
579
580    /// Bytes which are currently being trimmed.  These can still be allocated from, but the
581    /// allocation will block until the current batch of trimming is finished.
582    trim_reserved_bytes: u64,
583
584    /// While a trim is being performed, this listener is set.  When the current batch of extents
585    /// being trimmed have been released (and trim_reserved_bytes is 0), this is signaled.
586    /// This should only be listened to while the allocation_mutex is held.
587    trim_listener: Option<EventListener>,
588
589    /// This controls how we allocate our free space to manage fragmentation.
590    strategy: strategy::BestFit,
591
592    /// Tracks the number of allocations of size 1,2,...63,>=64.
593    allocation_size_histogram: [u64; 64],
594    /// Tracks which size bucket triggers rebuild_strategy.
595    rebuild_strategy_trigger_histogram: [u64; 64],
596
597    /// This is distinct from the set contained in `info`.  New entries are inserted *after* a
598    /// device has been flushed (it is not safe to reuse the space taken by a deleted volume prior
599    /// to this) and are removed after a major compaction.
600    marked_for_deletion: HashSet<u64>,
601
602    /// The set of volumes deleted that are waiting for a sync.
603    volumes_deleted_pending_sync: HashSet<u64>,
604}
605
606impl Inner {
607    fn allocated_bytes(&self) -> Saturating<u64> {
608        let mut total = Saturating(0);
609        for (_, bytes) in &self.owner_bytes {
610            total += bytes.allocated_bytes;
611        }
612        total
613    }
614
615    fn uncommitted_allocated_bytes(&self) -> u64 {
616        self.owner_bytes.values().map(|x| &x.uncommitted_allocated_bytes).sum()
617    }
618
619    fn reserved_bytes(&self) -> u64 {
620        self.owner_bytes.values().map(|x| &x.reserved_bytes).sum::<u64>()
621            + self.unattributed_reserved_bytes
622    }
623
624    fn owner_id_limit_bytes(&self, owner_object_id: u64) -> u64 {
625        match self.info.limit_bytes.get(&owner_object_id) {
626            Some(v) => *v,
627            None => u64::MAX,
628        }
629    }
630
631    fn owner_id_bytes_left(&self, owner_object_id: u64) -> u64 {
632        let limit = self.owner_id_limit_bytes(owner_object_id);
633        let used = self.owner_bytes.get(&owner_object_id).map_or(Saturating(0), |b| b.used_bytes());
634        (Saturating(limit) - used).0
635    }
636
637    // Returns the amount that is not available to be allocated, which includes actually allocated
638    // bytes, bytes that have been allocated for a transaction but the transaction hasn't committed
639    // yet, and bytes that have been deallocated, but the device hasn't been flushed yet so we can't
640    // reuse those bytes yet.
641    fn unavailable_bytes(&self) -> Saturating<u64> {
642        let mut total = Saturating(0);
643        for (_, bytes) in &self.owner_bytes {
644            total += bytes.unavailable_bytes();
645        }
646        total
647    }
648
649    // Returns the total number of bytes that are taken either from reservations, allocations or
650    // uncommitted allocations.
651    fn used_bytes(&self) -> Saturating<u64> {
652        let mut total = Saturating(0);
653        for (_, bytes) in &self.owner_bytes {
654            total += bytes.used_bytes();
655        }
656        total + Saturating(self.unattributed_reserved_bytes)
657    }
658
659    // Like `unavailable_bytes`, but treats as available the bytes which have been deallocated and
660    // require a device flush to be reused.
661    fn unavailable_after_sync_bytes(&self) -> Saturating<u64> {
662        let mut total = Saturating(0);
663        for (_, bytes) in &self.owner_bytes {
664            total += bytes.unavailable_after_sync_bytes();
665        }
666        total
667    }
668
669    // Returns the number of bytes which will be available after the current batch of trimming
670    // completes.
671    fn bytes_available_not_being_trimmed(&self, device_size: u64) -> Result<u64, Error> {
672        device_size
673            .checked_sub(
674                (self.unavailable_after_sync_bytes() + Saturating(self.trim_reserved_bytes)).0,
675            )
676            .ok_or_else(|| anyhow!(FxfsError::Inconsistent))
677    }
678
679    fn add_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
680        match owner_object_id {
681            Some(owner) => self.owner_bytes.entry(owner).or_default().reserved_bytes += amount,
682            None => self.unattributed_reserved_bytes += amount,
683        };
684    }
685
686    fn remove_reservation(&mut self, owner_object_id: Option<u64>, amount: u64) {
687        match owner_object_id {
688            Some(owner) => {
689                let owner_entry = self.owner_bytes.entry(owner).or_default();
690                assert!(
691                    owner_entry.reserved_bytes >= amount,
692                    "{} >= {}",
693                    owner_entry.reserved_bytes,
694                    amount
695                );
696                owner_entry.reserved_bytes -= amount;
697            }
698            None => {
699                assert!(
700                    self.unattributed_reserved_bytes >= amount,
701                    "{} >= {}",
702                    self.unattributed_reserved_bytes,
703                    amount
704                );
705                self.unattributed_reserved_bytes -= amount
706            }
707        };
708    }
709}
710
711/// A container for a set of extents which are known to be free and can be trimmed.  Returned by
712/// `take_for_trimming`.
713pub struct TrimmableExtents<'a> {
714    allocator: &'a Allocator,
715    extents: Vec<Range<u64>>,
716    // The allocator can subscribe to this event to wait until these extents are dropped.  This way,
717    // we don't fail an allocation attempt if blocks are tied up for trimming; rather, we just wait
718    // until the batch is finished with and then proceed.
719    _drop_event: DropEvent,
720}
721
722impl<'a> TrimmableExtents<'a> {
723    pub fn extents(&self) -> &Vec<Range<u64>> {
724        &self.extents
725    }
726
727    // Also returns an EventListener which is signaled when this is dropped.
728    fn new(allocator: &'a Allocator) -> (Self, EventListener) {
729        let drop_event = DropEvent::new();
730        let listener = drop_event.listen();
731        (Self { allocator, extents: vec![], _drop_event: drop_event }, listener)
732    }
733
734    fn add_extent(&mut self, extent: Range<u64>) {
735        self.extents.push(extent);
736    }
737}
738
739impl<'a> Drop for TrimmableExtents<'a> {
740    fn drop(&mut self) {
741        let mut inner = self.allocator.inner.lock();
742        for device_range in std::mem::take(&mut self.extents) {
743            inner.strategy.free(device_range.clone()).expect("drop trim extent");
744            self.allocator
745                .temporary_allocations
746                .erase(&AllocatorKey { device_range: device_range.clone() });
747        }
748        inner.trim_reserved_bytes = 0;
749    }
750}
751
752impl Allocator {
753    pub fn new(filesystem: Arc<FxFilesystem>, object_id: u64) -> Allocator {
754        let block_size = filesystem.block_size();
755        // We expect device size to be a multiple of block size. Throw away any tail.
756        let device_size = round_down(filesystem.device().size(), block_size);
757        if device_size != filesystem.device().size() {
758            warn!("Device size is not block aligned. Rounding down.");
759        }
760        let max_extent_size_bytes = max_extent_size_for_block_size(filesystem.block_size());
761        let mut strategy = strategy::BestFit::default();
762        strategy.free(0..device_size).expect("new fs");
763        Allocator {
764            filesystem: Arc::downgrade(&filesystem),
765            block_size,
766            device_size,
767            object_id,
768            max_extent_size_bytes,
769            tree: LSMTree::new(merge, Box::new(NullCache {})),
770            temporary_allocations: SkipListLayer::new(1024),
771            inner: Mutex::new(Inner {
772                info: AllocatorInfo::default(),
773                opened: false,
774                dropped_temporary_allocations: Vec::new(),
775                owner_bytes: BTreeMap::new(),
776                unattributed_reserved_bytes: 0,
777                committed_deallocated: VecDeque::new(),
778                trim_reserved_bytes: 0,
779                trim_listener: None,
780                strategy,
781                allocation_size_histogram: [0; 64],
782                rebuild_strategy_trigger_histogram: [0; 64],
783                marked_for_deletion: HashSet::new(),
784                volumes_deleted_pending_sync: HashSet::new(),
785            }),
786            allocation_mutex: futures::lock::Mutex::new(()),
787            counters: Mutex::new(AllocatorCounters::default()),
788            maximum_offset: AtomicU64::new(0),
789        }
790    }
791
792    pub fn tree(&self) -> &LSMTree<AllocatorKey, AllocatorValue> {
793        &self.tree
794    }
795
796    /// Returns an iterator that yields all allocations, filtering out tombstones and any
797    /// owner_object_id that have been marked as deleted.  If `committed_marked_for_deletion` is
    /// true, then filter using the committed volumes marked for deletion rather than the in-memory
    /// copy, which excludes volumes that have been deleted but not yet synced.
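    ///
    /// Illustrative sketch of scanning live allocations (assumes an `allocator: Arc<Allocator>`;
    /// the layer-set calls mirror those used elsewhere in this file):
    ///
    /// ```ignore
    /// let mut layer_set = allocator.tree().empty_layer_set();
    /// allocator.tree().add_all_layers_to_layer_set(&mut layer_set);
    /// let mut merger = layer_set.merger();
    /// let mut iter = allocator.filter(merger.query(Query::FullScan).await?, false).await?;
    /// while let Some(ItemRef { key, value, .. }) = iter.get() {
    ///     // `key.device_range` and `value` describe one live allocation.
    ///     iter.advance().await?;
    /// }
    /// ```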
800    pub async fn filter(
801        &self,
802        iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
803        committed_marked_for_deletion: bool,
804    ) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
805        let marked_for_deletion = {
806            let inner = self.inner.lock();
807            if committed_marked_for_deletion {
808                &inner.info.marked_for_deletion
809            } else {
810                &inner.marked_for_deletion
811            }
812            .clone()
813        };
814        let iter =
815            filter_marked_for_deletion(filter_tombstones(iter).await?, marked_for_deletion).await?;
816        Ok(iter)
817    }
818
819    /// A histogram of allocation request sizes.
820    /// The index into the array is 'number of blocks'.
821    /// The last bucket is a catch-all for larger allocation requests.
822    pub fn allocation_size_histogram(&self) -> [u64; 64] {
823        self.inner.lock().allocation_size_histogram
824    }
825
826    /// Creates a new (empty) allocator.
827    pub async fn create(&self, transaction: &mut Transaction<'_>) -> Result<(), Error> {
828        // Mark the allocator as opened before creating the file because creating a new
829        // transaction requires a reservation.
830        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
831
832        let filesystem = self.filesystem.upgrade().unwrap();
833        let root_store = filesystem.root_store();
834        ObjectStore::create_object_with_id(
835            &root_store,
836            transaction,
837            self.object_id(),
838            HandleOptions::default(),
839            None,
840        )?;
841        root_store.update_last_object_id(self.object_id());
842        Ok(())
843    }
844
845    // Opens the allocator.  This is not thread-safe; this should be called prior to
846    // replaying the allocator's mutations.
847    pub async fn open(self: &Arc<Self>) -> Result<(), Error> {
848        let filesystem = self.filesystem.upgrade().unwrap();
849        let root_store = filesystem.root_store();
850
851        self.inner.lock().strategy = strategy::BestFit::default();
852
853        let handle =
854            ObjectStore::open_object(&root_store, self.object_id, HandleOptions::default(), None)
855                .await
856                .context("Failed to open allocator object")?;
857
858        if handle.get_size() > 0 {
859            let serialized_info = handle
860                .contents(MAX_ALLOCATOR_INFO_SERIALIZED_SIZE)
861                .await
862                .context("Failed to read AllocatorInfo")?;
863            let mut cursor = std::io::Cursor::new(serialized_info);
864            let (info, _version) = AllocatorInfo::deserialize_with_version(&mut cursor)
865                .context("Failed to deserialize AllocatorInfo")?;
866
867            let mut handles = Vec::new();
868            let mut total_size = 0;
869            for object_id in &info.layers {
870                let handle = ObjectStore::open_object(
871                    &root_store,
872                    *object_id,
873                    HandleOptions::default(),
874                    None,
875                )
876                .await
877                .context("Failed to open allocator layer file")?;
878
879                let size = handle.get_size();
880                total_size += size;
881                handles.push(handle);
882            }
883
884            {
885                let mut inner = self.inner.lock();
886
887                // Check allocated_bytes fits within the device.
888                let mut device_bytes = self.device_size;
889                for (&owner_object_id, &bytes) in &info.allocated_bytes {
890                    ensure!(
891                        bytes <= device_bytes,
892                        anyhow!(FxfsError::Inconsistent).context(format!(
893                            "Allocated bytes exceeds device size: {:?}",
894                            info.allocated_bytes
895                        ))
896                    );
897                    device_bytes -= bytes;
898
899                    inner.owner_bytes.entry(owner_object_id).or_default().allocated_bytes =
900                        Saturating(bytes);
901                }
902
903                inner.info = info;
904            }
905
906            self.tree.append_layers(handles).await.context("Failed to append allocator layers")?;
907            self.filesystem.upgrade().unwrap().object_manager().update_reservation(
908                self.object_id,
909                tree::reservation_amount_from_layer_size(total_size),
910            );
911        }
912
913        Ok(())
914    }
915
916    pub async fn on_replay_complete(self: &Arc<Self>) -> Result<(), Error> {
917        // We can assume the device has been flushed.
918        {
919            let mut inner = self.inner.lock();
920            inner.volumes_deleted_pending_sync.clear();
921            inner.marked_for_deletion = inner.info.marked_for_deletion.clone();
922        }
923
924        // Build free extent structure from disk. For now, we expect disks to have *some* free
925        // space at all times. This may change if we ever support mounting of read-only
926        // redistributable filesystem images.
927        if !self.rebuild_strategy().await.context("Build free extents")? {
928            error!("Device contains no free space.");
929            return Err(FxfsError::Inconsistent).context("Device appears to contain no free space");
930        }
931
932        assert_eq!(std::mem::replace(&mut self.inner.lock().opened, true), false);
933        Ok(())
934    }
935
936    /// Walk all allocations to generate the set of free regions between allocations.
937    /// It is safe to re-run this on live filesystems but it should not be called concurrently
938    /// with allocations, trims or other rebuild_strategy invocations -- use allocation_mutex.
939    /// Returns true if the rebuild made changes to either the free ranges or overflow markers.
940    async fn rebuild_strategy(self: &Arc<Self>) -> Result<bool, Error> {
941        let mut changed = false;
942        let mut layer_set = self.tree.empty_layer_set();
943        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
944        self.tree.add_all_layers_to_layer_set(&mut layer_set);
945
946        let overflow_markers = self.inner.lock().strategy.overflow_markers();
947        self.inner.lock().strategy.reset_overflow_markers();
948
949        let mut to_add = Vec::new();
950        let mut merger = layer_set.merger();
951        let mut iter = self.filter(merger.query(Query::FullScan).await?, false).await?;
952        let mut last_offset = 0;
953        while last_offset < self.device_size {
954            let next_range = match iter.get() {
955                None => {
956                    assert!(last_offset <= self.device_size);
957                    let range = last_offset..self.device_size;
958                    last_offset = self.device_size;
959                    iter.advance().await?;
960                    range
961                }
962                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
963                    if device_range.end < last_offset {
964                        iter.advance().await?;
965                        continue;
966                    }
967                    if device_range.start <= last_offset {
968                        last_offset = device_range.end;
969                        iter.advance().await?;
970                        continue;
971                    }
972                    let range = last_offset..device_range.start;
973                    last_offset = device_range.end;
974                    iter.advance().await?;
975                    range
976                }
977            };
978            to_add.push(next_range);
979            // Avoid taking a lock on inner for every free range.
980            if to_add.len() > 100 {
981                let mut inner = self.inner.lock();
982                for range in to_add.drain(..) {
983                    changed |= inner.strategy.force_free(range)?;
984                }
985            }
986        }
987        let mut inner = self.inner.lock();
988        for range in to_add {
989            changed |= inner.strategy.force_free(range)?;
990        }
991        if overflow_markers != inner.strategy.overflow_markers() {
992            changed = true;
993        }
994        Ok(changed)
995    }
996
997    /// Collects up to `extents_per_batch` free extents of size up to `max_extent_size` from
998    /// `offset`.  The extents will be reserved.
    /// Note that only one `TrimmableExtents` can exist for the allocator at any time.
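    ///
    /// Hypothetical usage sketch (the trim call itself lives outside this module; `device.trim`
    /// is an assumed name here):
    ///
    /// ```ignore
    /// let batch = allocator.take_for_trimming(offset, max_extent_size, extents_per_batch).await?;
    /// for range in batch.extents() {
    ///     device.trim(range.clone()).await?;  // hypothetical device call
    /// }
    /// // Dropping `batch` releases the held extents back to the allocator.
    /// ```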
1000    pub async fn take_for_trimming(
1001        &self,
1002        offset: u64,
1003        max_extent_size: usize,
1004        extents_per_batch: usize,
1005    ) -> Result<TrimmableExtents<'_>, Error> {
1006        let _guard = self.allocation_mutex.lock().await;
1007
1008        let (mut result, listener) = TrimmableExtents::new(self);
1009        let mut bytes = 0;
1010
1011        // We can't just use self.strategy here because it doesn't necessarily hold all
1012        // free extent metadata in RAM.
1013        let mut layer_set = self.tree.empty_layer_set();
1014        layer_set.layers.push((self.temporary_allocations.clone() as Arc<dyn Layer<_, _>>).into());
1015        self.tree.add_all_layers_to_layer_set(&mut layer_set);
1016        let mut merger = layer_set.merger();
1017        let mut iter = self
1018            .filter(
1019                merger.query(Query::FullRange(&AllocatorKey { device_range: offset..0 })).await?,
1020                false,
1021            )
1022            .await?;
1023        let mut last_offset = offset;
1024        'outer: while last_offset < self.device_size {
1025            let mut range = match iter.get() {
1026                None => {
1027                    assert!(last_offset <= self.device_size);
1028                    let range = last_offset..self.device_size;
1029                    last_offset = self.device_size;
1030                    iter.advance().await?;
1031                    range
1032                }
1033                Some(ItemRef { key: AllocatorKey { device_range, .. }, .. }) => {
1034                    if device_range.end <= last_offset {
1035                        iter.advance().await?;
1036                        continue;
1037                    }
1038                    if device_range.start <= last_offset {
1039                        last_offset = device_range.end;
1040                        iter.advance().await?;
1041                        continue;
1042                    }
1043                    let range = last_offset..device_range.start;
1044                    last_offset = device_range.end;
1045                    iter.advance().await?;
1046                    range
1047                }
1048            };
1049            if range.start < offset {
1050                continue;
1051            }
1052
            // 'range' is based on the on-disk LSM tree. We need to check against any uncommitted
            // allocations and temporarily allocate the range for ourselves.
1055            let mut inner = self.inner.lock();
1056
1057            while range.start < range.end {
1058                let prefix =
1059                    range.start..std::cmp::min(range.start + max_extent_size as u64, range.end);
1060                range = prefix.end..range.end;
1061                bytes += prefix.length()?;
1062                // We can assume the following are safe because we've checked both LSM tree and
1063                // temporary_allocations while holding the lock.
1064                inner.strategy.remove(prefix.clone());
1065                self.temporary_allocations.insert(AllocatorItem {
1066                    key: AllocatorKey { device_range: prefix.clone() },
1067                    value: AllocatorValue::Abs { owner_object_id: INVALID_OBJECT_ID, count: 1 },
1068                    sequence: 0,
1069                })?;
1070                result.add_extent(prefix);
1071                if result.extents.len() == extents_per_batch {
1072                    break 'outer;
1073                }
1074            }
1075            if result.extents.len() == extents_per_batch {
1076                break 'outer;
1077            }
1078        }
1079        {
1080            let mut inner = self.inner.lock();
1081
1082            assert!(inner.trim_reserved_bytes == 0, "Multiple trims ongoing");
1083            inner.trim_listener = Some(listener);
1084            inner.trim_reserved_bytes = bytes;
1085            debug_assert!(
1086                (Saturating(inner.trim_reserved_bytes) + inner.unavailable_bytes()).0
1087                    <= self.device_size
1088            );
1089        }
1090        Ok(result)
1091    }
1092
1093    /// Returns all objects that exist in the parent store that pertain to this allocator.
1094    pub fn parent_objects(&self) -> Vec<u64> {
1095        // The allocator tree needs to store a file for each of the layers in the tree, so we return
1096        // those, since nothing else references them.
1097        self.inner.lock().info.layers.clone()
1098    }
1099
1100    /// Returns all the current owner byte limits (in pairs of `(owner_object_id, bytes)`).
1101    pub fn owner_byte_limits(&self) -> Vec<(u64, u64)> {
1102        self.inner.lock().info.limit_bytes.iter().map(|(k, v)| (*k, *v)).collect()
1103    }
1104
1105    /// Returns (allocated_bytes, byte_limit) for the given owner.
1106    pub fn owner_allocation_info(&self, owner_object_id: u64) -> (u64, Option<u64>) {
1107        let inner = self.inner.lock();
1108        (
1109            inner.owner_bytes.get(&owner_object_id).map(|b| b.used_bytes().0).unwrap_or(0u64),
1110            inner.info.limit_bytes.get(&owner_object_id).copied(),
1111        )
1112    }
1113
1114    /// Returns owner bytes debug information.
1115    pub fn owner_bytes_debug(&self) -> String {
1116        format!("{:?}", self.inner.lock().owner_bytes)
1117    }
1118
1119    fn needs_sync(&self) -> bool {
1120        // TODO(https://fxbug.dev/42178048): This will only trigger if *all* free space is taken up with
1121        // committed deallocated bytes, but we might want to trigger a sync if we're low and there
1122        // happens to be a lot of deallocated bytes as that might mean we can fully satisfy
1123        // allocation requests.
1124        let inner = self.inner.lock();
1125        inner.unavailable_bytes().0 >= self.device_size
1126    }
1127
1128    fn is_system_store(&self, owner_object_id: u64) -> bool {
1129        let fs = self.filesystem.upgrade().unwrap();
1130        owner_object_id == fs.object_manager().root_store_object_id()
1131            || owner_object_id == fs.object_manager().root_parent_store_object_id()
1132    }
1133
1134    /// Updates the accounting to track that a byte reservation has been moved out of an owner to
1135    /// the unattributed pool.
1136    pub fn disown_reservation(&self, old_owner_object_id: Option<u64>, amount: u64) {
1137        if old_owner_object_id.is_none() || amount == 0 {
1138            return;
1139        }
1140        // These 2 mutations should behave as though they're a single atomic mutation.
1141        let mut inner = self.inner.lock();
1142        inner.remove_reservation(old_owner_object_id, amount);
1143        inner.add_reservation(None, amount);
1144    }
1145
    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
1147    /// allocator when queried.
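    ///
    /// For example (illustrative; assumes a `fuchsia_inspect::Node` named `node` is available):
    ///
    /// ```ignore
    /// allocator.track_statistics(&node, "allocator");
    /// ```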
1148    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
1149        let this = Arc::downgrade(self);
1150        parent.record_lazy_child(name, move || {
1151            let this_clone = this.clone();
1152            async move {
1153                let inspector = fuchsia_inspect::Inspector::default();
1154                if let Some(this) = this_clone.upgrade() {
1155                    let counters = this.counters.lock();
1156                    let root = inspector.root();
1157                    root.record_uint("max_extent_size_bytes", this.max_extent_size_bytes);
1158                    root.record_uint("bytes_total", this.device_size);
1159                    let (allocated, reserved, used, unavailable) = {
1160                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
1161                        let inner = this.inner.lock();
1162                        (
1163                            inner.allocated_bytes().0,
1164                            inner.reserved_bytes(),
1165                            inner.used_bytes().0,
1166                            inner.unavailable_bytes().0,
1167                        )
1168                    };
1169                    root.record_uint("bytes_allocated", allocated);
1170                    root.record_uint("bytes_reserved", reserved);
1171                    root.record_uint("bytes_used", used);
1172                    root.record_uint("bytes_unavailable", unavailable);
1173
1174                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing
1175                    // metrics.
1176                    if let Some(x) = round_div(100 * allocated, this.device_size) {
1177                        root.record_uint("bytes_allocated_percent", x);
1178                    }
1179                    if let Some(x) = round_div(100 * reserved, this.device_size) {
1180                        root.record_uint("bytes_reserved_percent", x);
1181                    }
1182                    if let Some(x) = round_div(100 * used, this.device_size) {
1183                        root.record_uint("bytes_used_percent", x);
1184                    }
1185                    if let Some(x) = round_div(100 * unavailable, this.device_size) {
1186                        root.record_uint("bytes_unavailable_percent", x);
1187                    }
1188
1189                    root.record_uint("num_flushes", counters.num_flushes);
1190                    if let Some(last_flush_time) = counters.last_flush_time.as_ref() {
1191                        root.record_uint(
1192                            "last_flush_time_ms",
1193                            last_flush_time
1194                                .duration_since(std::time::UNIX_EPOCH)
1195                                .unwrap_or(std::time::Duration::ZERO)
1196                                .as_millis()
1197                                .try_into()
1198                                .unwrap_or(0u64),
1199                        );
1200                    }
1201
1202                    let data = this.allocation_size_histogram();
1203                    let alloc_sizes = root.create_uint_linear_histogram(
1204                        "allocation_size_histogram",
1205                        fuchsia_inspect::LinearHistogramParams {
1206                            floor: 1,
1207                            step_size: 1,
1208                            buckets: 64,
1209                        },
1210                    );
1211                    for (i, count) in data.iter().enumerate() {
1212                        if i != 0 {
1213                            alloc_sizes.insert_multiple(i as u64, *count as usize);
1214                        }
1215                    }
1216                    root.record(alloc_sizes);
1217
1218                    let data = this.inner.lock().rebuild_strategy_trigger_histogram;
1219                    let triggers = root.create_uint_linear_histogram(
1220                        "rebuild_strategy_triggers",
1221                        fuchsia_inspect::LinearHistogramParams {
1222                            floor: 1,
1223                            step_size: 1,
1224                            buckets: 64,
1225                        },
1226                    );
1227                    for (i, count) in data.iter().enumerate() {
1228                        if i != 0 {
1229                            triggers.insert_multiple(i as u64, *count as usize);
1230                        }
1231                    }
1232                    root.record(triggers);
1233                }
1234                Ok(inspector)
1235            }
1236            .boxed()
1237        });
1238    }
1239
1240    /// Returns the offset of the first byte which has not been used by the allocator since its
1241    /// creation.
1242    /// NB: This does *not* take into account existing allocations.  This is only reliable when the
1243    /// allocator was created from scratch, without any pre-existing allocations.
1244    pub fn maximum_offset(&self) -> u64 {
1245        self.maximum_offset.load(Ordering::Relaxed)
1246    }
1247}
1248
1249impl Drop for Allocator {
1250    fn drop(&mut self) {
1251        let inner = self.inner.lock();
1252        // Uncommitted and reserved should be released back using RAII, so they should be zero.
1253        assert_eq!(inner.uncommitted_allocated_bytes(), 0);
1254        assert_eq!(inner.reserved_bytes(), 0);
1255    }
1256}
1257
1258#[fxfs_trace::trace]
1259impl Allocator {
1260    /// Returns the object ID for the allocator.
1261    pub fn object_id(&self) -> u64 {
1262        self.object_id
1263    }
1264
1265    /// Returns information about the allocator such as the layer files storing persisted
1266    /// allocations.
1267    pub fn info(&self) -> AllocatorInfo {
1268        self.inner.lock().info.clone()
1269    }
1270
1271    /// Tries to allocate enough space for |len| bytes on behalf of the store identified by
1272    /// |owner_object_id| and returns the device range allocated.
1273    /// The allocated range may be short (e.g. due to fragmentation), in which case the caller can
1274    /// simply call allocate again until they have enough blocks.
1275    ///
1276    /// We also store the object store ID of the store that the allocation should be assigned to so
1277    /// that we have a means to delete encrypted stores without needing the encryption key.
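    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doctest); `fs`, `transaction` and `OWNER_ID` are
    /// assumed to exist in the caller's context and are not defined by this module:
    ///
    /// ```ignore
    /// // Keep allocating until we have the full amount; each call may return a short range.
    /// let mut remaining = 16 * fs.block_size();
    /// let mut ranges = Vec::new();
    /// while remaining > 0 {
    ///     let range = allocator.allocate(&mut transaction, OWNER_ID, remaining).await?;
    ///     remaining -= range.length()?;
    ///     ranges.push(range);
    /// }
    /// ```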
1278    #[trace]
1279    pub async fn allocate(
1280        self: &Arc<Self>,
1281        transaction: &mut Transaction<'_>,
1282        owner_object_id: u64,
1283        mut len: u64,
1284    ) -> Result<Range<u64>, Error> {
1285        assert_eq!(len % self.block_size, 0);
1286        len = std::cmp::min(len, self.max_extent_size_bytes);
1287        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1288
1289        // Make sure we have space reserved before we try and find the space.
1290        let reservation = if let Some(reservation) = transaction.allocator_reservation {
1291            match reservation.owner_object_id {
1292                // If there is no owner, this must be a system store that we're allocating for.
1293                None => assert!(self.is_system_store(owner_object_id)),
1294                // If it has an owner, it should not be different than the allocating owner.
1295                Some(res_owner_object_id) => assert_eq!(owner_object_id, res_owner_object_id),
1296            };
1297            // Reservation limits aren't necessarily a multiple of the block size.
1298            let r = reservation
1299                .reserve_with(|limit| std::cmp::min(len, round_down(limit, self.block_size)));
1300            len = r.amount();
1301            Left(r)
1302        } else {
1303            let mut inner = self.inner.lock();
1304            assert!(inner.opened);
1305            // Do not exceed the limit for the owner or the device.
1306            let device_used = inner.used_bytes();
1307            let owner_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1308            // We must take care not to use up space that might be reserved.
1309            let limit =
1310                std::cmp::min(owner_bytes_left, (Saturating(self.device_size) - device_used).0);
1311            len = round_down(std::cmp::min(len, limit), self.block_size);
1312            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1313            owner_entry.reserved_bytes += len;
1314            Right(ReservationImpl::<_, Self>::new(&**self, Some(owner_object_id), len))
1315        };
1316
1317        ensure!(len > 0, FxfsError::NoSpace);
1318
1319        // If volumes have been deleted, flush the device so that we can use any of the freed space.
1320        let volumes_deleted = {
1321            let inner = self.inner.lock();
1322            (!inner.volumes_deleted_pending_sync.is_empty())
1323                .then(|| inner.volumes_deleted_pending_sync.clone())
1324        };
1325
1326        if let Some(volumes_deleted) = volumes_deleted {
1327            // No locks are held here, so in theory, there could be unnecessary syncs, but it
1328            // should be sufficiently rare that it won't matter.
1329            self.filesystem
1330                .upgrade()
1331                .unwrap()
1332                .sync(SyncOptions {
1333                    flush_device: true,
1334                    precondition: Some(Box::new(|| {
1335                        !self.inner.lock().volumes_deleted_pending_sync.is_empty()
1336                    })),
1337                    ..Default::default()
1338                })
1339                .await?;
1340
1341            {
1342                let mut inner = self.inner.lock();
1343                for owner_id in volumes_deleted {
1344                    inner.volumes_deleted_pending_sync.remove(&owner_id);
1345                    inner.marked_for_deletion.insert(owner_id);
1346                }
1347            }
1348
1349            let _guard = self.allocation_mutex.lock().await;
1350            self.rebuild_strategy().await?;
1351        }
1352
1353        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
1354        let _guard = 'sync: loop {
1355            // Cap number of sync attempts before giving up on finding free space.
1356            for _ in 0..10 {
1357                {
1358                    let guard = self.allocation_mutex.lock().await;
1359
1360                    if !self.needs_sync() {
1361                        break 'sync guard;
1362                    }
1363                }
1364
1365                // All the free space is currently tied up with deallocations, so we need to sync
1366                // and flush the device to free that up.
1367                //
1368                // We can't hold the allocation lock whilst we sync here because the allocation lock
1369                // is also taken in apply_mutations, which is called when journal locks are held,
1370                // and we call sync here which takes those same locks, so it would have the
1371                // potential to result in a deadlock.  Sync holds its own lock to guard against
1372                // multiple syncs occurring at the same time, and we can supply a precondition that
1373                // is evaluated under that lock to ensure we don't sync twice if we don't need to.
1374                self.filesystem
1375                    .upgrade()
1376                    .unwrap()
1377                    .sync(SyncOptions {
1378                        flush_device: true,
1379                        precondition: Some(Box::new(|| self.needs_sync())),
1380                        ..Default::default()
1381                    })
1382                    .await?;
1383            }
1384            bail!(
1385                anyhow!(FxfsError::NoSpace).context("Sync failed to yield sufficient free space.")
1386            );
1387        };
1388
1389        let mut trim_listener = None;
1390        {
1391            let mut inner = self.inner.lock();
1392            inner.allocation_size_histogram[std::cmp::min(63, len / self.block_size) as usize] += 1;
1393
1394            // If trimming would be the reason that this allocation gets cut short, wait for
1395            // trimming to complete before proceeding.
1396            let avail = self
1397                .device_size
1398                .checked_sub(inner.unavailable_bytes().0)
1399                .ok_or(FxfsError::Inconsistent)?;
1400            let free_and_not_being_trimmed =
1401                inner.bytes_available_not_being_trimmed(self.device_size)?;
1402            if free_and_not_being_trimmed < std::cmp::min(len, avail) {
1403                debug_assert!(inner.trim_reserved_bytes > 0);
1404                trim_listener = std::mem::take(&mut inner.trim_listener);
1405            }
1406        }
1407
1408        if let Some(listener) = trim_listener {
1409            listener.await;
1410        }
1411
1412        let result = loop {
1413            {
1414                let mut inner = self.inner.lock();
1415
1416                // While we know rebuild_strategy and take_for_trim are not running (we hold guard),
1417                // apply any deferred temporary_allocations removals.
1418                for device_range in inner.dropped_temporary_allocations.drain(..) {
1419                    self.temporary_allocations.erase(&AllocatorKey { device_range });
1420                }
1421
1422                match inner.strategy.allocate(len) {
1423                    Err(FxfsError::NotFound) => {
1424                        // Overflow. Fall through and rebuild
1425                        inner.rebuild_strategy_trigger_histogram
1426                            [std::cmp::min(63, (len / self.block_size) as usize)] += 1;
1427                    }
1428                    Err(err) => {
1429                        error!(err:%; "Likely filesystem corruption.");
1430                        return Err(err.into());
1431                    }
1432                    Ok(x) => {
1433                        break x;
1434                    }
1435                }
1436            }
1437            // We've run out of extents of the requested length in RAM but there
1438            // exists more of this size on device. Rescan device and circle back.
1439            // We already hold the allocation_mutex, so exclusive access is guaranteed.
1440            if !self.rebuild_strategy().await? {
1441                error!("Cannot find additional free space. Corruption?");
1442                return Err(FxfsError::Inconsistent.into());
1443            }
1444        };
1445
1446        debug!(device_range:? = result; "allocate");
1447
1448        let len = result.length().unwrap();
1449        let reservation_owner = reservation.either(
1450            // Left means we got an outside reservation.
1451            |l| {
1452                l.forget_some(len);
1453                l.owner_object_id()
1454            },
1455            |r| {
1456                r.forget_some(len);
1457                r.owner_object_id()
1458            },
1459        );
1460
1461        {
1462            let mut inner = self.inner.lock();
1463            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1464            owner_entry.uncommitted_allocated_bytes += len;
1465            // If the reservation has an owner, ensure they are the same.
1466            assert_eq!(owner_object_id, reservation_owner.unwrap_or(owner_object_id));
1467            inner.remove_reservation(reservation_owner, len);
1468            self.temporary_allocations.insert(AllocatorItem {
1469                key: AllocatorKey { device_range: result.clone() },
1470                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1471                sequence: 0,
1472            })?;
1473        }
1474
1475        let mutation =
1476            AllocatorMutation::Allocate { device_range: result.clone().into(), owner_object_id };
1477        assert!(transaction.add(self.object_id(), Mutation::Allocator(mutation)).is_none());
1478
1479        Ok(result)
1480    }
1481
1482    /// Marks the given device range as allocated.  The main use case for this, at present, is the
1483    /// super-block, which needs to be at a fixed location on the device.
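    /// # Example
    ///
    /// A sketch of claiming a fixed region; `SUPER_BLOCK_LEN` is a hypothetical constant, not
    /// something defined here:
    ///
    /// ```ignore
    /// // Claim the fixed region at the start of the device rather than letting the allocator
    /// // choose a location.
    /// allocator.mark_allocated(&mut transaction, OWNER_ID, 0..SUPER_BLOCK_LEN)?;
    /// ```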
1484    #[trace]
1485    pub fn mark_allocated(
1486        &self,
1487        transaction: &mut Transaction<'_>,
1488        owner_object_id: u64,
1489        device_range: Range<u64>,
1490    ) -> Result<(), Error> {
1491        debug_assert_ne!(owner_object_id, INVALID_OBJECT_ID);
1492        {
1493            let len = device_range.length().map_err(|_| FxfsError::InvalidArgs)?;
1494
1495            let mut inner = self.inner.lock();
1496            let device_used = inner.used_bytes();
1497            let owner_id_bytes_left = inner.owner_id_bytes_left(owner_object_id);
1498            let owner_entry = inner.owner_bytes.entry(owner_object_id).or_default();
1499            ensure!(
1500                device_range.end <= self.device_size
1501                    && (Saturating(self.device_size) - device_used).0 >= len
1502                    && owner_id_bytes_left >= len,
1503                FxfsError::NoSpace
1504            );
1505            if let Some(reservation) = &mut transaction.allocator_reservation {
1506                // The transaction takes ownership of this hold.
1507                reservation.reserve(len).ok_or(FxfsError::NoSpace)?.forget();
1508            }
1509            owner_entry.uncommitted_allocated_bytes += len;
1510            inner.strategy.remove(device_range.clone());
1511            self.temporary_allocations.insert(AllocatorItem {
1512                key: AllocatorKey { device_range: device_range.clone() },
1513                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1514                sequence: 0,
1515            })?;
1516        }
1517        let mutation =
1518            AllocatorMutation::Allocate { device_range: device_range.into(), owner_object_id };
1519        transaction.add(self.object_id(), Mutation::Allocator(mutation));
1520        Ok(())
1521    }
1522
1523    /// Sets the byte limit for an owner object, i.e. the maximum amount of space it may consume.
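    /// # Example
    ///
    /// A sketch of capping a volume's usage; `data_store_id` is a hypothetical owner id:
    ///
    /// ```ignore
    /// // Limit the owner to 8 GiB of space.
    /// allocator.set_bytes_limit(&mut transaction, data_store_id, 8 * 1024 * 1024 * 1024)?;
    /// ```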
1524    pub fn set_bytes_limit(
1525        &self,
1526        transaction: &mut Transaction<'_>,
1527        owner_object_id: u64,
1528        bytes: u64,
1529    ) -> Result<(), Error> {
1530        // System stores cannot be given limits.
1531        assert!(!self.is_system_store(owner_object_id));
1532        transaction.add(
1533            self.object_id(),
1534            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }),
1535        );
1536        Ok(())
1537    }
1538
1539    /// Gets the bytes limit for an owner object.
1540    pub fn get_bytes_limit(&self, owner_object_id: u64) -> Option<u64> {
1541        self.inner.lock().info.limit_bytes.get(&owner_object_id).copied()
1542    }
1543
1544    /// Deallocates the given device range for the specified object.
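    /// # Example
    ///
    /// A sketch assuming `device_range` came from an earlier `allocate` call for the same owner:
    ///
    /// ```ignore
    /// let freed = allocator.deallocate(&mut transaction, OWNER_ID, device_range).await?;
    /// // Note that the freed space cannot be reused until after the device has been flushed.
    /// ```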
1545    #[trace]
1546    pub async fn deallocate(
1547        &self,
1548        transaction: &mut Transaction<'_>,
1549        owner_object_id: u64,
1550        dealloc_range: Range<u64>,
1551    ) -> Result<u64, Error> {
1552        debug!(device_range:? = dealloc_range; "deallocate");
1553        ensure!(dealloc_range.is_valid(), FxfsError::InvalidArgs);
1554        // We don't currently support sharing of allocations (value.count always equals 1), so as
1555        // long as we can assume the deallocated range is actually allocated, we can avoid device
1556        // access.
1557        let deallocated = dealloc_range.end - dealloc_range.start;
1558        let mutation = AllocatorMutation::Deallocate {
1559            device_range: dealloc_range.clone().into(),
1560            owner_object_id,
1561        };
1562        transaction.add(self.object_id(), Mutation::Allocator(mutation));
1563
1564        let _guard = self.allocation_mutex.lock().await;
1565
1566        // We use `dropped_temporary_allocations` to defer removals from `temporary_allocations` in
1567        // places where we can't execute async code or take locks.
1568        //
1569        // It's important we don't ever remove entries from `temporary_allocations` without
1570        // holding the `allocation_mutex` lock or else we may end up with an inconsistent view of
1571        // available disk space when we combine temporary_allocations with the LSMTree.
1572        // This is normally done in `allocate()` but we also need to apply these here because
1573        // `temporary_allocations` is also used to track deallocated space until it has been
1574        // flushed (see comment below). A user may allocate and then deallocate space before calling
1575        // allocate() a second time, so if we do not clean up here, we may end up with the same
1576        // range in temporary_allocations twice (once for allocate, once for deallocate).
1577        let mut inner = self.inner.lock();
1578        for device_range in inner.dropped_temporary_allocations.drain(..) {
1579            self.temporary_allocations.erase(&AllocatorKey { device_range });
1580        }
1581
1582        // We can't reuse deallocated space immediately because failure to successfully flush will
1583        // mean that on next mount, we may find this space is still assigned to the deallocated
1584        // region. To avoid immediate reuse, we hold these regions in 'temporary_allocations' until
1585        // after a successful flush so we know the region is safe to reuse.
1586        self.temporary_allocations
1587            .insert(AllocatorItem {
1588                key: AllocatorKey { device_range: dealloc_range.clone() },
1589                value: AllocatorValue::Abs { owner_object_id, count: 1 },
1590                sequence: 0,
1591            })
1592            .context("tracking deallocated")?;
1593
1594        Ok(deallocated)
1595    }
1596
1597    /// Marks allocations associated with a given |owner_object_id| for deletion.
1598    ///
1599    /// This is used as part of deleting encrypted volumes (ObjectStore) without having the keys.
1600    ///
1601    /// MarkForDeletion mutations eventually manipulate allocator metadata (AllocatorInfo) instead
1602    /// of the mutable layer, but we must be careful not to do this too early and risk premature
1603    /// reuse of extents.
1604    ///
1605    /// Replay is not guaranteed until the *device* gets flushed, so we cannot reuse the deleted
1606    /// extents until we've flushed the device.
1607    ///
1608    /// TODO(b/316827348): Consider removing the use of mark_for_deletion in AllocatorInfo and
1609    /// just compacting?
1610    ///
1611    /// After an allocator.flush() (i.e. a major compaction), we know that there is no data left
1612    /// in the layer files for this owner_object_id and we are able to clear `marked_for_deletion`.
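    /// # Example
    ///
    /// A sketch of deleting an encrypted volume's extents without its keys; `owner_object_id` is
    /// the store id of the volume being removed:
    ///
    /// ```ignore
    /// allocator.mark_for_deletion(&mut transaction, owner_object_id);
    /// transaction.commit().await?;
    /// // The extents only become reusable after the next major compaction and device flush.
    /// ```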
1613    pub fn mark_for_deletion(&self, transaction: &mut Transaction<'_>, owner_object_id: u64) {
1614        // Note that because the actual time of deletion (the next major compaction) is undefined,
1615        // |owner_object_id| should not be reused after this call.
1616        transaction.add(
1617            self.object_id(),
1618            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)),
1619        );
1620    }
1621
1622    /// Called when the device has been flushed; |flush_log_offset| indicates what the journal log
1623    /// offset was when that happened.
1624    pub fn did_flush_device(&self, flush_log_offset: u64) {
1625        // First take out the deallocations that we now know to be flushed.  The list is maintained
1626        // in order, so we can stop on the first entry that we find that should not be unreserved
1627        // yet.
1628        #[allow(clippy::never_loop)] // Loop used as a for {} else {}.
1629        let deallocs = 'deallocs_outer: loop {
1630            let mut inner = self.inner.lock();
1631            for (index, dealloc) in inner.committed_deallocated.iter().enumerate() {
1632                if dealloc.log_file_offset >= flush_log_offset {
1633                    let mut deallocs = inner.committed_deallocated.split_off(index);
1634                    // Swap because we want the opposite of what split_off does.
1635                    std::mem::swap(&mut inner.committed_deallocated, &mut deallocs);
1636                    break 'deallocs_outer deallocs;
1637                }
1638            }
1639            break std::mem::take(&mut inner.committed_deallocated);
1640        };
1641
1642        // Now we can free those elements.
1643        let mut inner = self.inner.lock();
1644        let mut totals = BTreeMap::<u64, u64>::new();
1645        for dealloc in deallocs {
1646            *(totals.entry(dealloc.owner_object_id).or_default()) +=
1647                dealloc.range.length().unwrap();
1648            inner.strategy.free(dealloc.range.clone()).expect("dealloced ranges");
1649            self.temporary_allocations.erase(&AllocatorKey { device_range: dealloc.range.clone() });
1650        }
1651
1652        // This *must* come after we've removed the records from reserved reservations because the
1653        // allocator uses this value to decide whether or not a device-flush is required and it must
1654        // be possible to find free space if it thinks no device-flush is required.
1655        for (owner_object_id, total) in totals {
1656            match inner.owner_bytes.get_mut(&owner_object_id) {
1657                Some(counters) => counters.committed_deallocated_bytes -= total,
1658                None => panic!("Failed to decrement for unknown owner: {}", owner_object_id),
1659            }
1660        }
1661    }
1662
1663    /// Returns a reservation that can be used later, or None if there is insufficient space. The
1664    /// |owner_object_id| indicates which object in the root object store the reservation is for.
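    /// # Example
    ///
    /// A sketch assuming `allocator` is an `Arc<Allocator>` and `OWNER_ID` is a store id:
    ///
    /// ```ignore
    /// match allocator.clone().reserve(Some(OWNER_ID), 1024 * 1024) {
    ///     Some(reservation) => {
    ///         // Use the reservation; unused bytes are returned to the allocator when it drops.
    ///     }
    ///     None => {
    ///         // Insufficient space to reserve the requested amount.
    ///     }
    /// }
    /// ```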
1665    pub fn reserve(
1666        self: Arc<Self>,
1667        owner_object_id: Option<u64>,
1668        amount: u64,
1669    ) -> Option<Reservation> {
1670        {
1671            let mut inner = self.inner.lock();
1672
1673            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1674
1675            let limit = match owner_object_id {
1676                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1677                None => device_free,
1678            };
1679            if limit < amount {
1680                return None;
1681            }
1682            inner.add_reservation(owner_object_id, amount);
1683        }
1684        Some(Reservation::new(self, owner_object_id, amount))
1685    }
1686
1687    /// Like reserve, but takes a callback which is passed the |limit| and should return the
1688    /// amount to reserve, which can be zero.
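    /// # Example
    ///
    /// A sketch that reserves up to 1 MiB, accepting whatever is left (possibly zero) if less is
    /// available; `allocator` is assumed to be an `Arc<Allocator>`:
    ///
    /// ```ignore
    /// let reservation = allocator
    ///     .clone()
    ///     .reserve_with(Some(OWNER_ID), |limit| std::cmp::min(limit, 1024 * 1024));
    /// ```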
1689    pub fn reserve_with(
1690        self: Arc<Self>,
1691        owner_object_id: Option<u64>,
1692        amount: impl FnOnce(u64) -> u64,
1693    ) -> Reservation {
1694        let amount = {
1695            let mut inner = self.inner.lock();
1696
1697            let device_free = (Saturating(self.device_size) - inner.used_bytes()).0;
1698
1699            let amount = amount(match owner_object_id {
1700                Some(id) => std::cmp::min(inner.owner_id_bytes_left(id), device_free),
1701                None => device_free,
1702            });
1703
1704            inner.add_reservation(owner_object_id, amount);
1705
1706            amount
1707        };
1708
1709        Reservation::new(self, owner_object_id, amount)
1710    }
1711
1712    /// Returns the total number of allocated bytes.
1713    pub fn get_allocated_bytes(&self) -> u64 {
1714        self.inner.lock().allocated_bytes().0
1715    }
1716
1717    /// Returns the size, in bytes, of the device managed by this allocator.
1718    pub fn get_disk_bytes(&self) -> u64 {
1719        self.device_size
1720    }
1721
1722    /// Returns the total number of allocated bytes per owner_object_id.
1723    /// Note that this is quite an expensive operation as it copies the collection.
1724    /// This is intended for use in fsck() and friends, not general use code.
1725    pub fn get_owner_allocated_bytes(&self) -> BTreeMap<u64, u64> {
1726        self.inner.lock().owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect()
1727    }
1728
1729    /// Returns the number of allocated and reserved bytes.
1730    pub fn get_used_bytes(&self) -> Saturating<u64> {
1731        let inner = self.inner.lock();
1732        inner.used_bytes()
1733    }
1734}
1735
1736impl ReservationOwner for Allocator {
1737    fn release_reservation(&self, owner_object_id: Option<u64>, amount: u64) {
1738        self.inner.lock().remove_reservation(owner_object_id, amount);
1739    }
1740}
1741
1742#[async_trait]
1743impl JournalingObject for Allocator {
1744    fn apply_mutation(
1745        &self,
1746        mutation: Mutation,
1747        context: &ApplyContext<'_, '_>,
1748        _assoc_obj: AssocObj<'_>,
1749    ) -> Result<(), Error> {
1750        match mutation {
1751            Mutation::Allocator(AllocatorMutation::MarkForDeletion(owner_object_id)) => {
1752                let mut inner = self.inner.lock();
1753                inner.owner_bytes.remove(&owner_object_id);
1754
1755                // We use `info.marked_for_deletion` to track the committed state and
1756                // `inner.marked_for_deletion` to track volumes marked for deletion *after* we have
1757                // flushed the device.  It is not safe to use extents belonging to deleted volumes
1758                // until after we have flushed the device.
1759                inner.info.marked_for_deletion.insert(owner_object_id);
1760                inner.volumes_deleted_pending_sync.insert(owner_object_id);
1761
1762                inner.info.limit_bytes.remove(&owner_object_id);
1763            }
1764            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1765                self.maximum_offset.fetch_max(device_range.end, Ordering::Relaxed);
1766                let item = AllocatorItem {
1767                    key: AllocatorKey { device_range: device_range.clone().into() },
1768                    value: AllocatorValue::Abs { count: 1, owner_object_id },
1769                    sequence: context.checkpoint.file_offset,
1770                };
1771                let len = item.key.device_range.length().unwrap();
1772                let lower_bound = item.key.lower_bound_for_merge_into();
1773                self.tree.merge_into(item, &lower_bound);
1774                let mut inner = self.inner.lock();
1775                let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1776                entry.allocated_bytes += len;
1777                if let ApplyMode::Live(transaction) = context.mode {
1778                    entry.uncommitted_allocated_bytes -= len;
1779                    // Note that we cannot drop entries from temporary_allocations without holding
1780                    // the allocation_mutex as it may introduce races. We instead add the range to
1781                    // a Vec that can be applied later when we hold the lock (See comment on
1782                    // `dropped_temporary_allocations` above).
1783                    inner.dropped_temporary_allocations.push(device_range.into());
1784                    if let Some(reservation) = transaction.allocator_reservation {
1785                        reservation.commit(len);
1786                    }
1787                }
1788            }
1789            Mutation::Allocator(AllocatorMutation::Deallocate {
1790                device_range,
1791                owner_object_id,
1792            }) => {
1793                let item = AllocatorItem {
1794                    key: AllocatorKey { device_range: device_range.into() },
1795                    value: AllocatorValue::None,
1796                    sequence: context.checkpoint.file_offset,
1797                };
1798                let len = item.key.device_range.length().unwrap();
1799
1800                {
1801                    let mut inner = self.inner.lock();
1802                    {
1803                        let entry = inner.owner_bytes.entry(owner_object_id).or_default();
1804                        entry.allocated_bytes -= len;
1805                        if context.mode.is_live() {
1806                            entry.committed_deallocated_bytes += len;
1807                        }
1808                    }
1809                    if context.mode.is_live() {
1810                        inner.committed_deallocated.push_back(CommittedDeallocation {
1811                            log_file_offset: context.checkpoint.file_offset,
1812                            range: item.key.device_range.clone(),
1813                            owner_object_id,
1814                        });
1815                    }
1816                    if let ApplyMode::Live(Transaction {
1817                        allocator_reservation: Some(reservation),
1818                        ..
1819                    }) = context.mode
1820                    {
1821                        inner.add_reservation(reservation.owner_object_id(), len);
1822                        reservation.add(len);
1823                    }
1824                }
1825                let lower_bound = item.key.lower_bound_for_merge_into();
1826                self.tree.merge_into(item, &lower_bound);
1827            }
1828            Mutation::Allocator(AllocatorMutation::SetLimit { owner_object_id, bytes }) => {
1829                // Journal replay is ordered and each of these calls is idempotent, so the last one
1830                // wins; it doesn't matter if the value is already set or gets changed multiple
1831                // times during replay. When the allocator is opened, the limit is merged in with
1832                // the snapshot.
1833                self.inner.lock().info.limit_bytes.insert(owner_object_id, bytes);
1834            }
1835            Mutation::BeginFlush => {
1836                self.tree.seal();
1837                // Transfer our running count for allocated_bytes so that it gets written to the new
1838                // info file when flush completes.
1839                let mut inner = self.inner.lock();
1840                let allocated_bytes =
1841                    inner.owner_bytes.iter().map(|(k, v)| (*k, v.allocated_bytes.0)).collect();
1842                inner.info.allocated_bytes = allocated_bytes;
1843            }
1844            Mutation::EndFlush => {}
1845            _ => bail!("unexpected mutation: {:?}", mutation),
1846        }
1847        Ok(())
1848    }
1849
1850    fn drop_mutation(&self, mutation: Mutation, transaction: &Transaction<'_>) {
1851        match mutation {
1852            Mutation::Allocator(AllocatorMutation::Allocate { device_range, owner_object_id }) => {
1853                let len = device_range.length().unwrap();
1854                let mut inner = self.inner.lock();
1855                inner
1856                    .owner_bytes
1857                    .entry(owner_object_id)
1858                    .or_default()
1859                    .uncommitted_allocated_bytes -= len;
1860                if let Some(reservation) = transaction.allocator_reservation {
1861                    let res_owner = reservation.owner_object_id();
1862                    inner.add_reservation(res_owner, len);
1863                    reservation.release_reservation(res_owner, len);
1864                }
1865                inner.strategy.free(device_range.clone().into()).expect("drop mutation");
1866                self.temporary_allocations
1867                    .erase(&AllocatorKey { device_range: device_range.into() });
1868            }
1869            Mutation::Allocator(AllocatorMutation::Deallocate { device_range, .. }) => {
1870                self.temporary_allocations
1871                    .erase(&AllocatorKey { device_range: device_range.into() });
1872            }
1873            _ => {}
1874        }
1875    }
1876
1877    async fn flush(&self) -> Result<Version, Error> {
1878        let filesystem = self.filesystem.upgrade().unwrap();
1879        let object_manager = filesystem.object_manager();
1880        let earliest_version = self.tree.get_earliest_version();
1881        if !object_manager.needs_flush(self.object_id()) && earliest_version == LATEST_VERSION {
1882            // Early exit, but still return the earliest version used by a struct in the tree
1883            return Ok(earliest_version);
1884        }
1885
1886        let fs = self.filesystem.upgrade().unwrap();
1887        let mut flusher = Flusher::new(self, &fs).await;
1888        let (new_layer_file, info) = flusher.start().await?;
1889        flusher.finish(new_layer_file, info).await
1890    }
1891}
1892
1893// The merger is unable to merge extents that exist like the following:
1894//
1895//     |----- +1 -----|
1896//                    |----- -1 -----|
1897//                    |----- +2 -----|
1898//
1899// It cannot coalesce them because it has to emit the +1 record so that it can move on and merge the
1900// -1 and +2 records. To address this, we add another stage that applies after merging which
1901// coalesces records after they have been emitted.  This is a bit simpler than merging because the
1902// records cannot overlap, so it's just a question of merging adjacent records if they happen to
1903// have the same delta and object_id.
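//
// For example, a sketch of wrapping a merged iterator over the allocator's layers (mirroring the
// tests below; `merger` is assumed to already exist):
//
//     let iter = CoalescingIterator::new(merger.query(Query::FullScan).await?).await?;
//     // `iter` now yields non-overlapping records with adjacent, same-valued ranges coalesced.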
1904
1905pub struct CoalescingIterator<I> {
1906    iter: I,
1907    item: Option<AllocatorItem>,
1908}
1909
1910impl<I: LayerIterator<AllocatorKey, AllocatorValue>> CoalescingIterator<I> {
1911    pub async fn new(iter: I) -> Result<CoalescingIterator<I>, Error> {
1912        let mut iter = Self { iter, item: None };
1913        iter.advance().await?;
1914        Ok(iter)
1915    }
1916}
1917
1918#[async_trait]
1919impl<I: LayerIterator<AllocatorKey, AllocatorValue>> LayerIterator<AllocatorKey, AllocatorValue>
1920    for CoalescingIterator<I>
1921{
1922    async fn advance(&mut self) -> Result<(), Error> {
1923        self.item = self.iter.get().map(|x| x.cloned());
1924        if self.item.is_none() {
1925            return Ok(());
1926        }
1927        let left = self.item.as_mut().unwrap();
1928        loop {
1929            self.iter.advance().await?;
1930            match self.iter.get() {
1931                None => return Ok(()),
1932                Some(right) => {
1933                    // The two records cannot overlap.
1934                    ensure!(
1935                        left.key.device_range.end <= right.key.device_range.start,
1936                        FxfsError::Inconsistent
1937                    );
1938                    // We can only coalesce records if they are touching and have the same value.
1939                    if left.key.device_range.end < right.key.device_range.start
1940                        || left.value != *right.value
1941                    {
1942                        return Ok(());
1943                    }
1944                    left.key.device_range.end = right.key.device_range.end;
1945                }
1946            }
1947        }
1948    }
1949
1950    fn get(&self) -> Option<ItemRef<'_, AllocatorKey, AllocatorValue>> {
1951        self.item.as_ref().map(|x| x.as_item_ref())
1952    }
1953}
1954
1955struct Flusher<'a> {
1956    allocator: &'a Allocator,
1957    fs: &'a Arc<FxFilesystem>,
1958    _guard: WriteGuard<'a>,
1959}
1960
1961impl<'a> Flusher<'a> {
1962    async fn new(allocator: &'a Allocator, fs: &'a Arc<FxFilesystem>) -> Self {
1963        let keys = lock_keys![LockKey::flush(allocator.object_id())];
1964        Self { allocator, fs, _guard: fs.lock_manager().write_lock(keys).await }
1965    }
1966
1967    fn txn_options(allocator_reservation: &Reservation) -> Options<'_> {
1968        Options {
1969            skip_journal_checks: true,
1970            borrow_metadata_space: true,
1971            allocator_reservation: Some(allocator_reservation),
1972            ..Default::default()
1973        }
1974    }
1975
1976    async fn start(&mut self) -> Result<(DataObjectHandle<ObjectStore>, AllocatorInfo), Error> {
1977        let object_manager = self.fs.object_manager();
1978        let mut transaction = self
1979            .fs
1980            .clone()
1981            .new_transaction(lock_keys![], Self::txn_options(object_manager.metadata_reservation()))
1982            .await?;
1983
1984        let root_store = self.fs.root_store();
1985        let layer_object_handle = ObjectStore::create_object(
1986            &root_store,
1987            &mut transaction,
1988            HandleOptions { skip_journal_checks: true, ..Default::default() },
1989            None,
1990        )
1991        .await?;
1992        root_store.add_to_graveyard(&mut transaction, layer_object_handle.object_id());
1993        // It's important that this transaction does not include any allocations because we use
1994        // BeginFlush as a snapshot point for mutations to the tree: other allocator mutations
1995        // within this transaction might get applied before seal (which would be OK), but they could
1996        // equally get applied afterwards (since Transaction makes no guarantees about the order in
1997        // which mutations are applied whilst committing), in which case they'd get lost on replay
1998        // because the journal will only send mutations that follow this transaction.
1999        transaction.add(self.allocator.object_id(), Mutation::BeginFlush);
2000        let info = transaction
2001            .commit_with_callback(|_| {
2002                // We must capture `info` as it is when we start flushing. Subsequent transactions
2003                // can end up modifying `info` and we shouldn't capture those here.
2004                self.allocator.inner.lock().info.clone()
2005            })
2006            .await?;
2007        Ok((layer_object_handle, info))
2008    }
2009
2010    async fn finish(
2011        self,
2012        layer_object_handle: DataObjectHandle<ObjectStore>,
2013        mut info: AllocatorInfo,
2014    ) -> Result<Version, Error> {
2015        let object_manager = self.fs.object_manager();
2016        let txn_options = Self::txn_options(object_manager.metadata_reservation());
2017
2018        let layer_set = self.allocator.tree.immutable_layer_set();
2019        let total_len = layer_set.sum_len();
2020        {
2021            let mut merger = layer_set.merger();
2022            let iter = self.allocator.filter(merger.query(Query::FullScan).await?, true).await?;
2023            let iter = CoalescingIterator::new(iter).await?;
2024            self.allocator
2025                .tree
2026                .compact_with_iterator(
2027                    iter,
2028                    total_len,
2029                    DirectWriter::new(&layer_object_handle, txn_options).await,
2030                    layer_object_handle.block_size(),
2031                )
2032                .await?;
2033        }
2034
2035        let root_store = self.fs.root_store();
2036
2037        // Both of these forward-declared variables need to outlive the transaction.
2038        let object_handle;
2039        let reservation_update;
2040        let mut transaction = self
2041            .fs
2042            .clone()
2043            .new_transaction(
2044                lock_keys![LockKey::object(
2045                    root_store.store_object_id(),
2046                    self.allocator.object_id()
2047                )],
2048                txn_options,
2049            )
2050            .await?;
2051        let mut serialized_info = Vec::new();
2052
2053        debug!(oid = layer_object_handle.object_id(); "new allocator layer file");
2054        object_handle = ObjectStore::open_object(
2055            &root_store,
2056            self.allocator.object_id(),
2057            HandleOptions::default(),
2058            None,
2059        )
2060        .await?;
2061
2062        // Move all the existing layers to the graveyard.
2063        for object_id in &info.layers {
2064            root_store.add_to_graveyard(&mut transaction, *object_id);
2065        }
2066
2067        // Write out updated info.
2068
2069        // After successfully flushing, all the stores that were marked for deletion at the time of
2070        // the BeginFlush transaction no longer need to be marked for deletion.  There can be
2071        // stores that have been deleted since the BeginFlush transaction, but they will be covered
2072        // by a MarkForDeletion mutation.
2073        let marked_for_deletion = std::mem::take(&mut info.marked_for_deletion);
2074
2075        info.layers = vec![layer_object_handle.object_id()];
2076
2077        info.serialize_with_version(&mut serialized_info)?;
2078
2079        let mut buf = object_handle.allocate_buffer(serialized_info.len()).await;
2080        buf.as_mut_slice()[..serialized_info.len()].copy_from_slice(&serialized_info[..]);
2081        object_handle.txn_write(&mut transaction, 0u64, buf.as_ref()).await?;
2082
2083        reservation_update = ReservationUpdate::new(tree::reservation_amount_from_layer_size(
2084            layer_object_handle.get_size(),
2085        ));
2086
2087        // It's important that EndFlush is in the same transaction that we write AllocatorInfo,
2088        // because we use EndFlush to make the required adjustments to allocated_bytes.
2089        transaction.add_with_object(
2090            self.allocator.object_id(),
2091            Mutation::EndFlush,
2092            AssocObj::Borrowed(&reservation_update),
2093        );
2094        root_store.remove_from_graveyard(&mut transaction, layer_object_handle.object_id());
2095
2096        let layers = layers_from_handles([layer_object_handle]).await?;
2097        transaction
2098            .commit_with_callback(|_| {
2099                self.allocator.tree.set_layers(layers);
2100
2101                // At this point we've committed the new layers to disk so we can start using them.
2102                // This means we can also switch to the new AllocatorInfo which clears
2103                // marked_for_deletion.
2104                let mut inner = self.allocator.inner.lock();
2105                inner.info.layers = info.layers;
2106                for owner_id in marked_for_deletion {
2107                    inner.marked_for_deletion.remove(&owner_id);
2108                    inner.info.marked_for_deletion.remove(&owner_id);
2109                }
2110            })
2111            .await?;
2112
2113        // Now close the layers and purge them.
2114        for layer in layer_set.layers {
2115            let object_id = layer.handle().map(|h| h.object_id());
2116            layer.close_layer().await;
2117            if let Some(object_id) = object_id {
2118                root_store.tombstone_object(object_id, txn_options).await?;
2119            }
2120        }
2121
2122        let mut counters = self.allocator.counters.lock();
2123        counters.num_flushes += 1;
2124        counters.last_flush_time = Some(std::time::SystemTime::now());
2125        // Return the earliest version used by a struct in the tree
2126        Ok(self.allocator.tree.get_earliest_version())
2127    }
2128}
2129
2130#[cfg(test)]
2131mod tests {
2132    use crate::filesystem::{
2133        FxFilesystem, FxFilesystemBuilder, JournalingObject, OpenFxFilesystem,
2134    };
2135    use crate::fsck::fsck;
2136    use crate::lsm_tree::cache::NullCache;
2137    use crate::lsm_tree::skip_list_layer::SkipListLayer;
2138    use crate::lsm_tree::types::{FuzzyHash as _, Item, ItemRef, Layer, LayerIterator};
2139    use crate::lsm_tree::{LSMTree, Query};
2140    use crate::object_handle::ObjectHandle;
2141    use crate::object_store::allocator::merge::merge;
2142    use crate::object_store::allocator::{
2143        Allocator, AllocatorKey, AllocatorValue, CoalescingIterator,
2144    };
2145    use crate::object_store::transaction::{Options, TRANSACTION_METADATA_MAX_AMOUNT, lock_keys};
2146    use crate::object_store::volume::root_volume;
2147    use crate::object_store::{Directory, LockKey, NewChildStoreOptions, ObjectStore};
2148    use crate::range::RangeExt;
2149    use crate::round::round_up;
2150    use fuchsia_async as fasync;
2151    use fuchsia_sync::Mutex;
2152    use std::cmp::{max, min};
2153    use std::ops::{Bound, Range};
2154    use std::sync::Arc;
2155    use storage_device::DeviceHolder;
2156    use storage_device::fake_device::FakeDevice;
2157
2158    #[test]
2159    fn test_allocator_key_is_range_based() {
2160        // Make sure we disallow using allocator keys with point queries.
2161        assert!(AllocatorKey { device_range: 0..100 }.is_range_key());
2162    }
2163
2164    #[fuchsia::test]
2165    async fn test_coalescing_iterator() {
2166        let skip_list = SkipListLayer::new(100);
2167        let items = [
2168            Item::new(
2169                AllocatorKey { device_range: 0..100 },
2170                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2171            ),
2172            Item::new(
2173                AllocatorKey { device_range: 100..200 },
2174                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2175            ),
2176        ];
2177        skip_list.insert(items[1].clone()).expect("insert error");
2178        skip_list.insert(items[0].clone()).expect("insert error");
2179        let mut iter =
2180            CoalescingIterator::new(skip_list.seek(Bound::Unbounded).await.expect("seek failed"))
2181                .await
2182                .expect("new failed");
2183        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2184        assert_eq!(
2185            (key, value),
2186            (
2187                &AllocatorKey { device_range: 0..200 },
2188                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2189            )
2190        );
2191        iter.advance().await.expect("advance failed");
2192        assert!(iter.get().is_none());
2193    }
2194
2195    #[fuchsia::test]
2196    async fn test_merge_and_coalesce_across_three_layers() {
2197        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2198        lsm_tree
2199            .insert(Item::new(
2200                AllocatorKey { device_range: 100..200 },
2201                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2202            ))
2203            .expect("insert error");
2204        lsm_tree.seal();
2205        lsm_tree
2206            .insert(Item::new(
2207                AllocatorKey { device_range: 0..100 },
2208                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2209            ))
2210            .expect("insert error");
2211
2212        let layer_set = lsm_tree.layer_set();
2213        let mut merger = layer_set.merger();
2214        let mut iter =
2215            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2216                .await
2217                .expect("new failed");
2218        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2219        assert_eq!(
2220            (key, value),
2221            (
2222                &AllocatorKey { device_range: 0..200 },
2223                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2224            )
2225        );
2226        iter.advance().await.expect("advance failed");
2227        assert!(iter.get().is_none());
2228    }
2229
2230    #[fuchsia::test]
2231    async fn test_merge_and_coalesce_wont_merge_across_object_id() {
2232        let lsm_tree = LSMTree::new(merge, Box::new(NullCache {}));
2233        lsm_tree
2234            .insert(Item::new(
2235                AllocatorKey { device_range: 100..200 },
2236                AllocatorValue::Abs { count: 1, owner_object_id: 99 },
2237            ))
2238            .expect("insert error");
2239        lsm_tree.seal();
2240        lsm_tree
2241            .insert(Item::new(
2242                AllocatorKey { device_range: 0..100 },
2243                AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2244            ))
2245            .expect("insert error");
2246
2247        let layer_set = lsm_tree.layer_set();
2248        let mut merger = layer_set.merger();
2249        let mut iter =
2250            CoalescingIterator::new(merger.query(Query::FullScan).await.expect("seek failed"))
2251                .await
2252                .expect("new failed");
2253        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2254        assert_eq!(
2255            (key, value),
2256            (
2257                &AllocatorKey { device_range: 0..100 },
2258                &AllocatorValue::Abs { count: 1, owner_object_id: 98 },
2259            )
2260        );
2261        iter.advance().await.expect("advance failed");
2262        let ItemRef { key, value, .. } = iter.get().expect("get failed");
2263        assert_eq!(
2264            (key, value),
2265            (
2266                &AllocatorKey { device_range: 100..200 },
2267                &AllocatorValue::Abs { count: 1, owner_object_id: 99 }
2268            )
2269        );
2270        iter.advance().await.expect("advance failed");
2271        assert!(iter.get().is_none());
2272    }
2273
2274    fn overlap(a: &Range<u64>, b: &Range<u64>) -> u64 {
2275        if a.end > b.start && a.start < b.end {
2276            min(a.end, b.end) - max(a.start, b.start)
2277        } else {
2278            0
2279        }
2280    }
2281
2282    async fn collect_allocations(allocator: &Allocator) -> Vec<Range<u64>> {
2283        let layer_set = allocator.tree.layer_set();
2284        let mut merger = layer_set.merger();
2285        let mut iter = allocator
2286            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2287            .await
2288            .expect("build iterator");
2289        let mut allocations: Vec<Range<u64>> = Vec::new();
2290        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2291            if let Some(r) = allocations.last() {
2292                assert!(device_range.start >= r.end);
2293            }
2294            allocations.push(device_range.clone());
2295            iter.advance().await.expect("advance failed");
2296        }
2297        allocations
2298    }
2299
2300    async fn check_allocations(allocator: &Allocator, expected_allocations: &[Range<u64>]) {
2301        let layer_set = allocator.tree.layer_set();
2302        let mut merger = layer_set.merger();
2303        let mut iter = allocator
2304            .filter(merger.query(Query::FullScan).await.expect("seek failed"), false)
2305            .await
2306            .expect("build iterator");
2307        let mut found = 0;
2308        while let Some(ItemRef { key: AllocatorKey { device_range }, .. }) = iter.get() {
2309            let mut l = device_range.length().expect("Invalid range");
2310            found += l;
2311            // Make sure that the entire range we have found is completely covered by the
2312            // allocations we expect to find.
2313            for range in expected_allocations {
2314                l -= overlap(range, device_range);
2315                if l == 0 {
2316                    break;
2317                }
2318            }
2319            assert_eq!(l, 0, "range {device_range:?} not covered by expectations");
2320            iter.advance().await.expect("advance failed");
2321        }
2322        // Make sure the total we found adds up to what we expect.
2323        assert_eq!(found, expected_allocations.iter().map(|r| r.length().unwrap()).sum::<u64>());
2324    }
2325
2326    async fn test_fs() -> (OpenFxFilesystem, Arc<Allocator>) {
2327        let device = DeviceHolder::new(FakeDevice::new(4096, 4096));
2328        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
2329        let allocator = fs.allocator();
2330        (fs, allocator)
2331    }
2332
2333    #[fuchsia::test]
2334    async fn test_allocations() {
2335        const STORE_OBJECT_ID: u64 = 99;
2336        let (fs, allocator) = test_fs().await;
2337        let mut transaction =
2338            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2339        let mut device_ranges = collect_allocations(&allocator).await;
2340
2341        // Expected extents:
2342        let expected = vec![
2343            0..4096,        // Superblock A (4k)
2344            4096..139264,   // root_store layer files, StoreInfo.. (33x4k blocks)
2345            139264..204800, // Superblock A extension (64k)
2346            204800..335872, // Initial Journal (128k)
2347            335872..401408, // Superblock B extension (64k)
2348            524288..528384, // Superblock B (4k)
2349        ];
2350        assert_eq!(device_ranges, expected);
2351        device_ranges.push(
2352            allocator
2353                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2354                .await
2355                .expect("allocate failed"),
2356        );
2357        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2358        device_ranges.push(
2359            allocator
2360                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2361                .await
2362                .expect("allocate failed"),
2363        );
2364        assert_eq!(device_ranges.last().unwrap().length().expect("Invalid range"), fs.block_size());
2365        assert_eq!(overlap(&device_ranges[0], &device_ranges[1]), 0);
2366        transaction.commit().await.expect("commit failed");
2367        let mut transaction =
2368            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2369        device_ranges.push(
2370            allocator
2371                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2372                .await
2373                .expect("allocate failed"),
2374        );
2375        assert_eq!(device_ranges[7].length().expect("Invalid range"), fs.block_size());
2376        assert_eq!(overlap(&device_ranges[5], &device_ranges[7]), 0);
2377        assert_eq!(overlap(&device_ranges[6], &device_ranges[7]), 0);
2378        transaction.commit().await.expect("commit failed");
2379
2380        check_allocations(&allocator, &device_ranges).await;
2381    }
2382
2383    #[fuchsia::test]
2384    async fn test_allocate_more_than_max_size() {
2385        const STORE_OBJECT_ID: u64 = 99;
2386        let (fs, allocator) = test_fs().await;
2387        let mut transaction =
2388            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2389        let mut device_ranges = collect_allocations(&allocator).await;
2390        device_ranges.push(
2391            allocator
2392                .allocate(&mut transaction, STORE_OBJECT_ID, fs.device().size())
2393                .await
2394                .expect("allocate failed"),
2395        );
2396        assert_eq!(
2397            device_ranges.last().unwrap().length().expect("Invalid range"),
2398            allocator.max_extent_size_bytes
2399        );
2400        transaction.commit().await.expect("commit failed");
2401
2402        check_allocations(&allocator, &device_ranges).await;
2403    }
2404
2405    #[fuchsia::test]
2406    async fn test_deallocations() {
2407        const STORE_OBJECT_ID: u64 = 99;
2408        let (fs, allocator) = test_fs().await;
2409        let initial_allocations = collect_allocations(&allocator).await;
2410
2411        let mut transaction =
2412            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2413        let device_range1 = allocator
2414            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2415            .await
2416            .expect("allocate failed");
2417        assert_eq!(device_range1.length().expect("Invalid range"), fs.block_size());
2418        transaction.commit().await.expect("commit failed");
2419
2420        let mut transaction =
2421            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2422        allocator
2423            .deallocate(&mut transaction, STORE_OBJECT_ID, device_range1)
2424            .await
2425            .expect("deallocate failed");
2426        transaction.commit().await.expect("commit failed");
2427
2428        check_allocations(&allocator, &initial_allocations).await;
2429    }
2430
2431    #[fuchsia::test]
2432    async fn test_mark_allocated() {
2433        const STORE_OBJECT_ID: u64 = 99;
2434        let (fs, allocator) = test_fs().await;
2435        let mut device_ranges = collect_allocations(&allocator).await;
2436        let range = {
2437            let mut transaction = fs
2438                .clone()
2439                .new_transaction(lock_keys![], Options::default())
2440                .await
2441                .expect("new failed");
2442            // First, allocate 2 blocks.
2443            allocator
2444                .allocate(&mut transaction, STORE_OBJECT_ID, 2 * fs.block_size())
2445                .await
2446                .expect("allocate failed")
2447            // Let the transaction drop.
2448        };
2449
2450        let mut transaction =
2451            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2452
2453        // If we allocate 1 block, the two blocks that were allocated earlier should be available,
2454        // and this should return the first of them.
2455        device_ranges.push(
2456            allocator
2457                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2458                .await
2459                .expect("allocate failed"),
2460        );
2461
2462        assert_eq!(device_ranges.last().unwrap().start, range.start);
2463
2464        // Mark the second block as allocated.
2465        let mut range2 = range.clone();
2466        range2.start += fs.block_size();
2467        allocator
2468            .mark_allocated(&mut transaction, STORE_OBJECT_ID, range2.clone())
2469            .expect("mark_allocated failed");
2470        device_ranges.push(range2);
2471
2472        // This should avoid the range we marked as allocated.
2473        device_ranges.push(
2474            allocator
2475                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2476                .await
2477                .expect("allocate failed"),
2478        );
2479        let last_range = device_ranges.last().unwrap();
2480        assert_eq!(last_range.length().expect("Invalid range"), fs.block_size());
2481        assert_eq!(overlap(last_range, &range), 0);
2482        transaction.commit().await.expect("commit failed");
2483
2484        check_allocations(&allocator, &device_ranges).await;
2485    }
2486
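    // mark_for_deletion drops an owner's byte count immediately; the underlying ranges are only
    // reclaimed once the allocator flushes.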
2487    #[fuchsia::test]
2488    async fn test_mark_for_deletion() {
2489        const STORE_OBJECT_ID: u64 = 99;
2490        let (fs, allocator) = test_fs().await;
2491
2492        // Allocate some stuff.
2493        let initial_allocated_bytes = allocator.get_allocated_bytes();
2494        let mut device_ranges = collect_allocations(&allocator).await;
2495        let mut transaction =
2496            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2497        // Note that there is a cap on individual allocation length, so we allocate over multiple mutations.
2498        for _ in 0..15 {
2499            device_ranges.push(
2500                allocator
2501                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2502                    .await
2503                    .expect("allocate failed"),
2504            );
2505            device_ranges.push(
2506                allocator
2507                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2508                    .await
2509                    .expect("allocate2 failed"),
2510            );
2511        }
2512        transaction.commit().await.expect("commit failed");
2513        check_allocations(&allocator, &device_ranges).await;
2514
2515        assert_eq!(
2516            allocator.get_allocated_bytes(),
2517            initial_allocated_bytes + fs.block_size() * 3000
2518        );
2519
2520        // Mark for deletion.
2521        let mut transaction =
2522            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2523        allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
2524        transaction.commit().await.expect("commit failed");
2525
2526        // Expect that the allocated byte count is updated immediately, but the device ranges are still allocated.
2527        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes);
2528        check_allocations(&allocator, &device_ranges).await;
2529
2530        // Allocate more space than will be available until the marked-for-deletion space is freed.
2531        // This should force a flush on allocate(). (3000 blocks still marked for deletion plus 1500 new > test_fs size of 4096 blocks.)
2532        device_ranges.clear();
2533
2534        let mut transaction =
2535            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2536        let target_bytes = 1500 * fs.block_size();
2537        while device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>() != target_bytes {
2538            let len = std::cmp::min(
2539                target_bytes - device_ranges.iter().map(|x| x.length().unwrap()).sum::<u64>(),
2540                100 * fs.block_size(),
2541            );
2542            device_ranges.push(
2543                allocator.allocate(&mut transaction, 100, len).await.expect("allocate failed"),
2544            );
2545        }
2546        transaction.commit().await.expect("commit failed");
2547
2548        // Have the deleted ranges cleaned up.
2549        allocator.flush().await.expect("flush failed");
2550
2551        // The flush above seems to trigger an allocation for the allocator itself.
2552        // We will just check that we have the right size for the owner we care about.
2553
2554        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2555        assert_eq!(*allocator.get_owner_allocated_bytes().get(&100).unwrap(), target_bytes);
2556    }
2557
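    /// Creates a file in `store`'s root directory and writes `size` bytes to it.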
2558    async fn create_file(store: &Arc<ObjectStore>, size: usize) {
2559        let root_directory =
2560            Directory::open(store, store.root_directory_object_id()).await.expect("open failed");
2561
2562        let mut transaction = store
2563            .filesystem()
2564            .new_transaction(
2565                lock_keys![LockKey::object(
2566                    store.store_object_id(),
2567                    store.root_directory_object_id()
2568                )],
2569                Options::default(),
2570            )
2571            .await
2572            .expect("new_transaction failed");
2573        let file = root_directory
2574            .create_child_file(&mut transaction, &format!("foo {}", size))
2575            .await
2576            .expect("create_child_file failed");
2577        transaction.commit().await.expect("commit failed");
2578
2579        let buffer = file.allocate_buffer(size).await;
2580
2581        // Append some data to it.
2582        let mut transaction = file
2583            .new_transaction_with_options(Options {
2584                borrow_metadata_space: true,
2585                ..Default::default()
2586            })
2587            .await
2588            .expect("new_transaction_with_options failed");
2589        file.txn_write(&mut transaction, 0, buffer.as_ref()).await.expect("txn_write failed");
2590        transaction.commit().await.expect("commit failed");
2591    }
2592
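    // Deleting and recreating a volume across journal compaction and remounts should leave the
    // filesystem consistent (verified by fsck).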
2593    #[fuchsia::test]
2594    async fn test_replay_with_deleted_store_and_compaction() {
2595        let (fs, _) = test_fs().await;
2596
2597        const FILE_SIZE: usize = 10_000_000;
2598
2599        let mut store_id = {
2600            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2601            let store = root_vol
2602                .new_volume("vol", NewChildStoreOptions::default())
2603                .await
2604                .expect("new_volume failed");
2605
2606            create_file(&store, FILE_SIZE).await;
2607            store.store_object_id()
2608        };
2609
2610        fs.close().await.expect("close failed");
2611        let device = fs.take_device().await;
2612        device.reopen(false);
2613
2614        let mut fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2615
2616        // Compact so that, when we replay, the transaction that deletes the store won't find
2617        // any mutations.
2618        fs.journal().compact().await.expect("compact failed");
2619
2620        for _ in 0..2 {
2621            {
2622                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2623
2624                let transaction = fs
2625                    .clone()
2626                    .new_transaction(
2627                        lock_keys![
2628                            LockKey::object(
2629                                root_vol.volume_directory().store().store_object_id(),
2630                                root_vol.volume_directory().object_id(),
2631                            ),
2632                            LockKey::flush(store_id)
2633                        ],
2634                        Options { borrow_metadata_space: true, ..Default::default() },
2635                    )
2636                    .await
2637                    .expect("new_transaction failed");
2638                root_vol
2639                    .delete_volume("vol", transaction, || {})
2640                    .await
2641                    .expect("delete_volume failed");
2642
2643                let store = root_vol
2644                    .new_volume("vol", NewChildStoreOptions::default())
2645                    .await
2646                    .expect("new_volume failed");
2647                create_file(&store, FILE_SIZE).await;
2648                store_id = store.store_object_id();
2649            }
2650
2651            fs.close().await.expect("close failed");
2652            let device = fs.take_device().await;
2653            device.reopen(false);
2654
2655            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2656        }
2657
2658        fsck(fs.clone()).await.expect("fsck failed");
2659        fs.close().await.expect("close failed");
2660    }
2661
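    // Regression test: race journal compaction against deleting a volume and verify the
    // filesystem remains consistent.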
2662    #[fuchsia::test(threads = 4)]
2663    async fn test_compaction_delete_race() {
2664        let (fs, _allocator) = test_fs().await;
2665
2666        {
2667            const FILE_SIZE: usize = 10_000_000;
2668
2669            let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2670            let store = root_vol
2671                .new_volume("vol", NewChildStoreOptions::default())
2672                .await
2673                .expect("new_volume failed");
2674
2675            create_file(&store, FILE_SIZE).await;
2676
2677            // Race compaction with deleting a store.
2678            let fs_clone = fs.clone();
2679
2680            // Even though the executor has 4 threads, it's hard to get it to run with
2681            // multiple threads. To make this happen, spin up a task that is continually yielding.
2682            let yield_task = fasync::Task::spawn(async {
2683                loop {
2684                    fuchsia_async::yield_now().await;
2685                }
2686            });
2687
2688            // Loop for a while to guarantee at least two threads start running.  We still have to
2689            // do the above, because otherwise another thread could start but then immediately stop
2690            // if there's nothing to do.
2691            for _ in 0..100 {
2692                fuchsia_async::yield_now().await;
2693            }
2694
2695            let task = fasync::Task::spawn(async move {
2696                fs_clone.journal().compact().await.expect("compact failed");
2697            });
2698
2699            // Now we can kill the yield_task.
2700            let _ = yield_task.abort();
2701
2702            // This range was chosen because it caused this test to fail after relatively few
2703            // iterations for the bug that this test was introduced to catch.
2704            let sleep = rand::random_range(3000..6000);
2705            std::thread::sleep(std::time::Duration::from_micros(sleep));
2706            log::info!("sleep {sleep}us");
2707
2708            let transaction = fs
2709                .clone()
2710                .new_transaction(
2711                    lock_keys![
2712                        LockKey::object(
2713                            root_vol.volume_directory().store().store_object_id(),
2714                            root_vol.volume_directory().object_id(),
2715                        ),
2716                        LockKey::flush(store.store_object_id())
2717                    ],
2718                    Options { borrow_metadata_space: true, ..Default::default() },
2719                )
2720                .await
2721                .expect("new_transaction failed");
2722            root_vol.delete_volume("vol", transaction, || {}).await.expect("delete_volume failed");
2723
2724            task.await;
2725        }
2726
2727        fs.journal().compact().await.expect("compact failed");
2728        fs.close().await.expect("close failed");
2729
2730        let device = fs.take_device().await;
2731        device.reopen(false);
2732
2733        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2734        fsck(fs.clone()).await.expect("fsck failed");
2735        fs.close().await.expect("close failed");
2736    }
2737
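    // Repeatedly create and delete volumes, flushing the allocator and remounting each time;
    // fsck should still pass at the end.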
2738    #[fuchsia::test]
2739    async fn test_delete_multiple_volumes() {
2740        let (mut fs, _) = test_fs().await;
2741
2742        for _ in 0..50 {
2743            {
2744                let root_vol = root_volume(fs.clone()).await.expect("root_volume failed");
2745                let store = root_vol
2746                    .new_volume("vol", NewChildStoreOptions::default())
2747                    .await
2748                    .expect("new_volume failed");
2749
2750                create_file(&store, 1_000_000).await;
2751
2752                let transaction = fs
2753                    .clone()
2754                    .new_transaction(
2755                        lock_keys![
2756                            LockKey::object(
2757                                root_vol.volume_directory().store().store_object_id(),
2758                                root_vol.volume_directory().object_id(),
2759                            ),
2760                            LockKey::flush(store.store_object_id())
2761                        ],
2762                        Options { borrow_metadata_space: true, ..Default::default() },
2763                    )
2764                    .await
2765                    .expect("new_transaction failed");
2766                root_vol
2767                    .delete_volume("vol", transaction, || {})
2768                    .await
2769                    .expect("delete_volume failed");
2770
2771                fs.allocator().flush().await.expect("flush failed");
2772            }
2773
2774            fs.close().await.expect("close failed");
2775            let device = fs.take_device().await;
2776            device.reopen(false);
2777
2778            fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2779        }
2780
2781        fsck(fs.clone()).await.expect("fsck failed");
2782        fs.close().await.expect("close failed");
2783    }
2784
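    // Allocate, deallocate everything, then allocate again; owner byte counts should track each
    // step, and the freed space should become reusable.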
2785    #[fuchsia::test]
2786    async fn test_allocate_free_reallocate() {
2787        const STORE_OBJECT_ID: u64 = 99;
2788        let (fs, allocator) = test_fs().await;
2789
2790        // Allocate some stuff.
2791        let mut device_ranges = Vec::new();
2792        let mut transaction =
2793            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2794        for _ in 0..30 {
2795            device_ranges.push(
2796                allocator
2797                    .allocate(&mut transaction, STORE_OBJECT_ID, 100 * fs.block_size())
2798                    .await
2799                    .expect("allocate failed"),
2800            );
2801        }
2802        transaction.commit().await.expect("commit failed");
2803
2804        assert_eq!(
2805            fs.block_size() * 3000,
2806            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2807        );
2808
2809        // Delete it all.
2810        let mut transaction =
2811            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2812        for range in std::mem::replace(&mut device_ranges, Vec::new()) {
2813            allocator.deallocate(&mut transaction, STORE_OBJECT_ID, range).await.expect("dealloc");
2814        }
2815        transaction.commit().await.expect("commit failed");
2816
2817        assert_eq!(0, *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default());
2818
2819        // Allocate some more. Due to storage pressure, this requires the allocator to flush the
2820        // device before reusing the space freed above.
2821        let mut transaction =
2822            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2823        let target_len = 1500 * fs.block_size();
2824        while device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>() != target_len {
2825            let len = target_len - device_ranges.iter().map(|i| i.length().unwrap()).sum::<u64>();
2826            device_ranges.push(
2827                allocator
2828                    .allocate(&mut transaction, STORE_OBJECT_ID, len)
2829                    .await
2830                    .expect("allocate failed"),
2831            );
2832        }
2833        transaction.commit().await.expect("commit failed");
2834
2835        assert_eq!(
2836            fs.block_size() * 1500,
2837            *allocator.get_owner_allocated_bytes().entry(STORE_OBJECT_ID).or_default()
2838        );
2839    }
2840
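    // Allocations should survive a flush and remount, and new allocations should not overlap
    // them.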
2841    #[fuchsia::test]
2842    async fn test_flush() {
2843        const STORE_OBJECT_ID: u64 = 99;
2844
2845        let mut device_ranges = Vec::new();
2846        let device = {
2847            let (fs, allocator) = test_fs().await;
2848            let mut transaction = fs
2849                .clone()
2850                .new_transaction(lock_keys![], Options::default())
2851                .await
2852                .expect("new failed");
2853            device_ranges.push(
2854                allocator
2855                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2856                    .await
2857                    .expect("allocate failed"),
2858            );
2859            device_ranges.push(
2860                allocator
2861                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2862                    .await
2863                    .expect("allocate failed"),
2864            );
2865            device_ranges.push(
2866                allocator
2867                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2868                    .await
2869                    .expect("allocate failed"),
2870            );
2871            transaction.commit().await.expect("commit failed");
2872
2873            allocator.flush().await.expect("flush failed");
2874
2875            fs.close().await.expect("close failed");
2876            fs.take_device().await
2877        };
2878
2879        device.reopen(false);
2880        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2881        let allocator = fs.allocator();
2882
2883        let allocated = collect_allocations(&allocator).await;
2884
2885        // Make sure the ranges we allocated earlier are still allocated.
2886        for i in &device_ranges {
2887            let mut overlapping = 0;
2888            for j in &allocated {
2889                overlapping += overlap(i, j);
2890            }
2891            assert_eq!(overlapping, i.length().unwrap(), "Range {i:?} not allocated");
2892        }
2893
2894        let mut transaction =
2895            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
2896        let range = allocator
2897            .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2898            .await
2899            .expect("allocate failed");
2900
2901        // Make sure the range just allocated doesn't overlap any other allocated ranges.
2902        for r in &allocated {
2903            assert_eq!(overlap(r, &range), 0);
2904        }
2905        transaction.commit().await.expect("commit failed");
2906    }
2907
2908    #[fuchsia::test]
2909    async fn test_dropped_transaction() {
2910        const STORE_OBJECT_ID: u64 = 99;
2911        let (fs, allocator) = test_fs().await;
2912        let allocated_range = {
2913            let mut transaction = fs
2914                .clone()
2915                .new_transaction(lock_keys![], Options::default())
2916                .await
2917                .expect("new_transaction failed");
2918            allocator
2919                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2920                .await
2921                .expect("allocate failed")
2922        };
2923        // After dropping the transaction and attempting to allocate again, we should end up with
2924        // the same range because the reservation should have been released.
2925        let mut transaction = fs
2926            .clone()
2927            .new_transaction(lock_keys![], Options::default())
2928            .await
2929            .expect("new_transaction failed");
2930        assert_eq!(
2931            allocator
2932                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2933                .await
2934                .expect("allocate failed"),
2935            allocated_range
2936        );
2937    }
2938
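    // Once an owner is marked for deletion, its byte accounting should be removed and should stay
    // removed after a remount.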
2939    #[fuchsia::test]
2940    async fn test_cleanup_removed_owner() {
2941        const STORE_OBJECT_ID: u64 = 99;
2942        let device = {
2943            let (fs, allocator) = test_fs().await;
2944
2945            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2946            {
2947                let mut transaction =
2948                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
2949                allocator
2950                    .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2951                    .await
2952                    .expect("Allocating");
2953                transaction.commit().await.expect("Committing.");
2954            }
2955            allocator.flush().await.expect("Flushing");
2956            assert!(allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2957            {
2958                let mut transaction =
2959                    fs.clone().new_transaction(lock_keys![], Options::default()).await.unwrap();
2960                allocator.mark_for_deletion(&mut transaction, STORE_OBJECT_ID);
2961                transaction.commit().await.expect("Committing.");
2962            }
2963            assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2964            fs.close().await.expect("Closing");
2965            fs.take_device().await
2966        };
2967
2968        device.reopen(false);
2969        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
2970        let allocator = fs.allocator();
2971        assert!(!allocator.get_owner_allocated_bytes().contains_key(&STORE_OBJECT_ID));
2972    }
2973
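    // get_allocated_bytes should only reflect committed allocations and deallocations.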
2974    #[fuchsia::test]
2975    async fn test_allocated_bytes() {
2976        const STORE_OBJECT_ID: u64 = 99;
2977        let (fs, allocator) = test_fs().await;
2978
2979        let initial_allocated_bytes = allocator.get_allocated_bytes();
2980
2981        // Verify allocated_bytes reflects allocation changes.
2982        let allocated_bytes = initial_allocated_bytes + fs.block_size();
2983        let allocated_range = {
2984            let mut transaction = fs
2985                .clone()
2986                .new_transaction(lock_keys![], Options::default())
2987                .await
2988                .expect("new_transaction failed");
2989            let range = allocator
2990                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
2991                .await
2992                .expect("allocate failed");
2993            transaction.commit().await.expect("commit failed");
2994            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
2995            range
2996        };
2997
2998        {
2999            let mut transaction = fs
3000                .clone()
3001                .new_transaction(lock_keys![], Options::default())
3002                .await
3003                .expect("new_transaction failed");
3004            allocator
3005                .allocate(&mut transaction, STORE_OBJECT_ID, fs.block_size())
3006                .await
3007                .expect("allocate failed");
3008
3009            // Prior to committing, the count of allocated bytes shouldn't change.
3010            assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3011        }
3012
3013        // After dropping the prior transaction, the allocated bytes still shouldn't have changed.
3014        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3015
3016        // Verify allocated_bytes reflects deallocations.
3017        let deallocate_range = allocated_range.start + 20..allocated_range.end - 20;
3018        let mut transaction =
3019            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3020        allocator
3021            .deallocate(&mut transaction, STORE_OBJECT_ID, deallocate_range)
3022            .await
3023            .expect("deallocate failed");
3024
3025        // Before committing, there should be no change.
3026        assert_eq!(allocator.get_allocated_bytes(), allocated_bytes);
3027
3028        transaction.commit().await.expect("commit failed");
3029
3030        // After committing, only 40 bytes of the original allocation should remain allocated.
3031        assert_eq!(allocator.get_allocated_bytes(), initial_allocated_bytes + 40);
3032    }
3033
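    // Byte limits set via set_bytes_limit should only appear in the allocator's info once the
    // transaction commits.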
3034    #[fuchsia::test]
3035    async fn test_persist_bytes_limit() {
3036        const LIMIT: u64 = 12345;
3037        const OWNER_ID: u64 = 12;
3038
3039        let (fs, allocator) = test_fs().await;
3040        {
3041            let mut transaction = fs
3042                .clone()
3043                .new_transaction(lock_keys![], Options::default())
3044                .await
3045                .expect("new_transaction failed");
3046            allocator
3047                .set_bytes_limit(&mut transaction, OWNER_ID, LIMIT)
3048                .expect("Failed to set limit.");
3049            assert!(allocator.inner.lock().info.limit_bytes.get(&OWNER_ID).is_none());
3050            transaction.commit().await.expect("Failed to commit transaction");
3051            let bytes: u64 = *allocator
3052                .inner
3053                .lock()
3054                .info
3055                .limit_bytes
3056                .get(&OWNER_ID)
3057                .expect("Failed to find limit");
3058            assert_eq!(LIMIT, bytes);
3059        }
3060    }
3061
3062    /// Given a sorted list of non-overlapping ranges, this will coalesce adjacent ranges.
3063    /// This allows comparison of equivalent sets of ranges which may occur due to differences
3064    /// across allocator strategies.
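    /// For example, `[0..10, 10..20, 30..40]` coalesces to `[0..20, 30..40]`.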
3065    fn coalesce_ranges(ranges: Vec<Range<u64>>) -> Vec<Range<u64>> {
3066        let mut coalesced = Vec::new();
3067        let mut prev: Option<Range<u64>> = None;
3068        for range in ranges {
3069            if let Some(prev_range) = &mut prev {
3070                if range.start == prev_range.end {
3071                    prev_range.end = range.end;
3072                } else {
3073                    coalesced.push(prev_range.clone());
3074                    prev = Some(range);
3075                }
3076            } else {
3077                prev = Some(range);
3078            }
3079        }
3080        if let Some(prev_range) = prev {
3081            coalesced.push(prev_range);
3082        }
3083        coalesced
3084    }
3085
3086    #[fuchsia::test]
3087    async fn test_take_for_trimming() {
3088        const STORE_OBJECT_ID: u64 = 99;
3089
3090        // Allocate a large chunk, then free a few bits of it, so we have free chunks interleaved
3091        // with allocated chunks.
3092        let allocated_range;
3093        let expected_free_ranges;
3094        let device = {
3095            let (fs, allocator) = test_fs().await;
3096            let bs = fs.block_size();
3097            let mut transaction = fs
3098                .clone()
3099                .new_transaction(lock_keys![], Options::default())
3100                .await
3101                .expect("new failed");
3102            allocated_range = allocator
3103                .allocate(&mut transaction, STORE_OBJECT_ID, 32 * bs)
3104                .await
3105                .expect("allocate failed");
3106            transaction.commit().await.expect("commit failed");
3107
3108            let mut transaction = fs
3109                .clone()
3110                .new_transaction(lock_keys![], Options::default())
3111                .await
3112                .expect("new failed");
3113            let base = allocated_range.start;
3114            expected_free_ranges = vec![
3115                base..(base + (bs * 1)),
3116                (base + (bs * 2))..(base + (bs * 3)),
3117                // Note that the next three ranges are adjacent and will be treated as one free
3118                // range once applied.  We separate them here to exercise the handling of "large"
3119                // free ranges.
3120                (base + (bs * 4))..(base + (bs * 8)),
3121                (base + (bs * 8))..(base + (bs * 12)),
3122                (base + (bs * 12))..(base + (bs * 13)),
3123                (base + (bs * 29))..(base + (bs * 30)),
3124            ];
3125            for range in &expected_free_ranges {
3126                allocator
3127                    .deallocate(&mut transaction, STORE_OBJECT_ID, range.clone())
3128                    .await
3129                    .expect("deallocate failed");
3130            }
3131            transaction.commit().await.expect("commit failed");
3132
3133            allocator.flush().await.expect("flush failed");
3134
3135            fs.close().await.expect("close failed");
3136            fs.take_device().await
3137        };
3138
3139        device.reopen(false);
3140        let fs = FxFilesystemBuilder::new().open(device).await.expect("open failed");
3141        let allocator = fs.allocator();
3142
3143        // These values were picked so that each of them would be the reason why
3144        // collect_free_extents finished, and so we would return after partially processing one of
3145        // the free extents.
3146        let max_extent_size = fs.block_size() as usize * 4;
3147        const EXTENTS_PER_BATCH: usize = 2;
3148        let mut free_ranges = vec![];
3149        let mut offset = allocated_range.start;
3150        while offset < allocated_range.end {
3151            let free = allocator
3152                .take_for_trimming(offset, max_extent_size, EXTENTS_PER_BATCH)
3153                .await
3154                .expect("take_for_trimming failed");
3155            free_ranges.extend(
3156                free.extents().iter().filter(|range| range.end <= allocated_range.end).cloned(),
3157            );
3158            offset = free.extents().last().expect("Unexpectedly hit the end of free extents").end;
3159        }
3160        // Coalesce adjacent free ranges because the allocator may return smaller aligned chunks
3161        // but the overall range should always be equivalent.
3162        let coalesced_free_ranges = coalesce_ranges(free_ranges);
3163        let coalesced_expected_free_ranges = coalesce_ranges(expected_free_ranges);
3164
3165        assert_eq!(coalesced_free_ranges, coalesced_expected_free_ranges);
3166    }
3167
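    // Allocations should block while free extents are held for trimming and proceed once they
    // are released.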
3168    #[fuchsia::test]
3169    async fn test_allocations_wait_for_free_extents() {
3170        const STORE_OBJECT_ID: u64 = 99;
3171        let (fs, allocator) = test_fs().await;
3172        let allocator_clone = allocator.clone();
3173
3174        let mut transaction =
3175            fs.clone().new_transaction(lock_keys![], Options::default()).await.expect("new failed");
3176
3177        // Tie up all of the free extents on the device, and make sure allocations block.
3178        let max_extent_size = fs.device().size() as usize;
3179        const EXTENTS_PER_BATCH: usize = usize::MAX;
3180
3181        // HACK: Treat `trimmable_extents` as being locked by `trim_done` (i.e. it should only be
3182        // accessed whilst `trim_done` is locked). We can't combine them into the same mutex,
3183        // because the inner type would be "poisoned" by the lifetime parameter of
3184        // `trimmable_extents` (which is tied to the lifetime of `allocator`), and then we couldn't
3185        // move it into `alloc_task`, which requires a `'static` lifetime.
3186        let trim_done = Arc::new(Mutex::new(false));
3187        let trimmable_extents = allocator
3188            .take_for_trimming(0, max_extent_size, EXTENTS_PER_BATCH)
3189            .await
3190            .expect("take_for_trimming failed");
3191
3192        let trim_done_clone = trim_done.clone();
3193        let bs = fs.block_size();
3194        let alloc_task = fasync::Task::spawn(async move {
3195            allocator_clone
3196                .allocate(&mut transaction, STORE_OBJECT_ID, bs)
3197                .await
3198                .expect("allocate failed");
3199            {
3200                assert!(*trim_done_clone.lock(), "Allocation finished before trim completed");
3201            }
3202            transaction.commit().await.expect("commit failed");
3203        });
3204
3205        // Add a small delay to simulate the trim taking some nonzero amount of time.  Otherwise,
3206        // releasing the extents will almost certainly beat the allocation attempt.
3207        fasync::Timer::new(std::time::Duration::from_millis(100)).await;
3208
3209        // Once the free extents are released, the task should unblock.
3210        {
3211            let mut trim_done = trim_done.lock();
3212            std::mem::drop(trimmable_extents);
3213            *trim_done = true;
3214        }
3215
3216        alloc_task.await;
3217    }
3218
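    // Allocating against a reservation that isn't a multiple of the block size should still
    // return a block-aligned range.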
3219    #[fuchsia::test]
3220    async fn test_allocation_with_reservation_not_multiple_of_block_size() {
3221        const STORE_OBJECT_ID: u64 = 99;
3222        let (fs, allocator) = test_fs().await;
3223
3224        // Reserve an amount that isn't a multiple of the block size.
3225        const RESERVATION_AMOUNT: u64 = TRANSACTION_METADATA_MAX_AMOUNT + 5000;
3226        let reservation =
3227            allocator.clone().reserve(Some(STORE_OBJECT_ID), RESERVATION_AMOUNT).unwrap();
3228
3229        let mut transaction = fs
3230            .clone()
3231            .new_transaction(
3232                lock_keys![],
3233                Options { allocator_reservation: Some(&reservation), ..Options::default() },
3234            )
3235            .await
3236            .expect("new failed");
3237
3238        let range = allocator
3239            .allocate(
3240                &mut transaction,
3241                STORE_OBJECT_ID,
3242                round_up(RESERVATION_AMOUNT, fs.block_size()).unwrap(),
3243            )
3244            .await
3245            .expect("allocate failed");
3246        assert_eq!((range.end - range.start) % fs.block_size(), 0);
3247
3248        println!("{}", range.end - range.start);
3249    }
3250}