// fxfs/object_store/object_manager.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::filesystem::{ApplyContext, ApplyMode, JournalingObject};
7use crate::log::*;
8use crate::metrics;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::allocator::{Allocator, Reservation};
11use crate::object_store::directory::Directory;
12use crate::object_store::journal::{self, JournalCheckpoint};
13use crate::object_store::transaction::{
14    AssocObj, AssociatedObject, MetadataReservation, Mutation, Transaction, TxnMutation,
15};
16use crate::object_store::tree_cache::TreeCache;
17use crate::object_store::volume::{list_volumes, VOLUMES_DIRECTORY};
18use crate::object_store::{ObjectDescriptor, ObjectStore};
19use crate::round::round_div;
20use crate::serialized_types::{Version, LATEST_VERSION};
21use anyhow::{anyhow, bail, ensure, Context, Error};
22use fuchsia_inspect::{Property as _, UintProperty};
23use fuchsia_sync::RwLock;
24use futures::FutureExt as _;
25use once_cell::sync::OnceCell;
26use rustc_hash::FxHashMap as HashMap;
27use std::collections::hash_map::Entry;
28use std::num::Saturating;
29use std::sync::Arc;
30
31// Data written to the journal eventually needs to be flushed somewhere (typically into layer
32// files).  Here we conservatively assume that could take up to four times as much space as it does
// in the journal.  In the layer file, it'll take up at least as much, but we must reserve the
// same amount again so that there's enough space for compactions, and then we need some spare
// for overheads.
36//
37// TODO(https://fxbug.dev/42178158): We should come up with a better way of determining what the multiplier
38// should be here.  2x was too low, as it didn't cover any space for metadata.  4x might be too
39// much.
pub const fn reserved_space_from_journal_usage(journal_usage: u64) -> u64 {
    // Conservative multiplier mapping journal bytes to the space that must be reserved for
    // eventually flushing them into layer files (see the discussion above).
    const JOURNAL_USAGE_MULTIPLIER: u64 = 4;
    journal_usage * JOURNAL_USAGE_MULTIPLIER
}
43
/// ObjectManager is a global loading cache for object stores and other special objects.
pub struct ObjectManager {
    // All mutable state, guarded by a single reader/writer lock.
    inner: RwLock<Inner>,
    // Space reserved with the allocator for metadata overheads; set exactly once via
    // `init_metadata_reservation`.
    metadata_reservation: OnceCell<Reservation>,
    // The root store's volumes directory; set exactly once (during replay or explicitly via
    // `set_volume_directory`).
    volume_directory: OnceCell<Directory<ObjectStore>>,
    // Optional callback invoked each time a store is first registered with the manager.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
}
51
// Whilst we are flushing we need to keep track of the old checkpoint that we are hoping to flush,
// and a new one that should apply if we successfully finish the flush.
#[derive(Debug)]
enum Checkpoints {
    // No flush in progress: the single checkpoint the object depends on.
    Current(JournalCheckpoint),
    // A flush is in progress and no new mutations have been applied since it began.
    Old(JournalCheckpoint),
    // A flush is in progress and new mutations have been applied since it began.
    Both(/* old: */ JournalCheckpoint, /* current: */ JournalCheckpoint),
}
60
61impl Checkpoints {
62    // Returns the earliest checkpoint (which will always be the old one if present).
63    fn earliest(&self) -> &JournalCheckpoint {
64        match self {
65            Checkpoints::Old(x) | Checkpoints::Both(x, _) | Checkpoints::Current(x) => x,
66        }
67    }
68}
69
// We currently maintain strong references to all stores that have been opened, but there's
// currently no mechanism for releasing stores that aren't being used.
struct Inner {
    // All opened stores, keyed by store object ID.
    stores: HashMap<u64, Arc<ObjectStore>>,
    // Object IDs of the special stores/objects; INVALID_OBJECT_ID until set.
    root_parent_store_object_id: u64,
    root_store_object_id: u64,
    allocator_object_id: u64,
    allocator: Option<Arc<Allocator>>,

    // Records dependencies on the journal for objects i.e. an entry for object ID 1, would mean it
    // has a dependency on journal records from that offset.
    journal_checkpoints: HashMap<u64, Checkpoints>,

    // Mappings from object-id to a target reservation amount.  The object IDs here are from the
    // root store namespace, so it can be associated with any object in the root store.  A
    // reservation will be made to cover the *maximum* in this map, since it is assumed that any
    // requirement is only temporary, for the duration of a compaction, and that once compaction has
    // finished for a particular object, the space will be recovered.
    reservations: HashMap<u64, u64>,

    // The last journal end offset for a transaction that has been applied.  This is not necessarily
    // the same as the start offset for the next transaction because of padding.
    last_end_offset: u64,

    // A running counter that tracks metadata space that has been borrowed on the understanding that
    // eventually it will be recovered (potentially after a full compaction).
    borrowed_metadata_space: u64,

    // The maximum transaction size that has been encountered so far.  The second element mirrors
    // the value into an inspect metric.
    max_transaction_size: (u64, UintProperty),

    // Extra temporary space that might be tied up in the journal that hasn't yet been deallocated.
    reserved_space: u64,
}
104
105impl Inner {
106    fn earliest_journal_offset(&self) -> Option<u64> {
107        self.journal_checkpoints.values().map(|c| c.earliest().file_offset).min()
108    }
109
110    // Returns the required size of the metadata reservation assuming that no space has been
111    // borrowed.  The invariant is: reservation-size + borrowed-space = required.
112    fn required_reservation(&self) -> u64 {
113        // Start with the maximum amount of temporary space we might need during compactions.
114        self.reservations.values().max().unwrap_or(&0)
115
116        // Account for data that has been written to the journal that will need to be written
117        // to layer files when flushed.
118            + self.earliest_journal_offset()
119            .map(|min| reserved_space_from_journal_usage(self.last_end_offset - min))
120            .unwrap_or(0)
121
122        // Extra reserved space
123            + self.reserved_space
124    }
125
126    fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
127        if object_id == self.allocator_object_id {
128            Some(self.allocator.clone().unwrap() as Arc<dyn JournalingObject>)
129        } else {
130            self.stores.get(&object_id).map(|x| x.clone() as Arc<dyn JournalingObject>)
131        }
132    }
133}
134
135impl ObjectManager {
    /// Creates a new, empty ObjectManager.  `on_new_store`, if provided, is invoked for every
    /// store subsequently registered with the manager (root stores, added stores and stores
    /// opened during replay).
    pub fn new(on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>) -> ObjectManager {
        ObjectManager {
            inner: RwLock::new(Inner {
                stores: HashMap::default(),
                root_parent_store_object_id: INVALID_OBJECT_ID,
                root_store_object_id: INVALID_OBJECT_ID,
                allocator_object_id: INVALID_OBJECT_ID,
                allocator: None,
                journal_checkpoints: HashMap::default(),
                reservations: HashMap::default(),
                last_end_offset: 0,
                borrowed_metadata_space: 0,
                // The inspect property mirrors the running maximum.
                max_transaction_size: (0, metrics::detail().create_uint("max_transaction_size", 0)),
                // Starts at the journal's fixed reservation; adjustable via `set_reserved_space`.
                reserved_space: journal::RESERVED_SPACE,
            }),
            metadata_reservation: OnceCell::new(),
            volume_directory: OnceCell::new(),
            on_new_store,
        }
    }
156
157    pub fn required_reservation(&self) -> u64 {
158        self.inner.read().required_reservation()
159    }
160
161    pub fn root_parent_store_object_id(&self) -> u64 {
162        self.inner.read().root_parent_store_object_id
163    }
164
165    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
166        let inner = self.inner.read();
167        inner.stores.get(&inner.root_parent_store_object_id).unwrap().clone()
168    }
169
170    pub fn set_root_parent_store(&self, store: Arc<ObjectStore>) {
171        if let Some(on_new_store) = &self.on_new_store {
172            on_new_store(&store);
173        }
174        let mut inner = self.inner.write();
175        let store_id = store.store_object_id();
176        inner.stores.insert(store_id, store);
177        inner.root_parent_store_object_id = store_id;
178    }
179
180    pub fn root_store_object_id(&self) -> u64 {
181        self.inner.read().root_store_object_id
182    }
183
184    pub fn root_store(&self) -> Arc<ObjectStore> {
185        let inner = self.inner.read();
186        inner.stores.get(&inner.root_store_object_id).unwrap().clone()
187    }
188
189    pub fn set_root_store(&self, store: Arc<ObjectStore>) {
190        if let Some(on_new_store) = &self.on_new_store {
191            on_new_store(&store);
192        }
193        let mut inner = self.inner.write();
194        let store_id = store.store_object_id();
195        inner.stores.insert(store_id, store);
196        inner.root_store_object_id = store_id;
197    }
198
199    pub fn is_system_store(&self, store_id: u64) -> bool {
200        let inner = self.inner.read();
201        store_id == inner.root_store_object_id || store_id == inner.root_parent_store_object_id
202    }
203
204    /// Returns the store which might or might not be locked.
205    pub fn store(&self, store_object_id: u64) -> Option<Arc<ObjectStore>> {
206        self.inner.read().stores.get(&store_object_id).cloned()
207    }
208
    /// This is not thread-safe: it assumes that a store won't be forgotten whilst the loop is
    /// running.  This is to be used after replaying the journal.
    ///
    /// Locates the volumes directory within the root store, opens every listed volume's store,
    /// and then establishes the metadata reservation.
    pub async fn on_replay_complete(&self) -> Result<(), Error> {
        let root_store = self.root_store();

        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .context("Unable to open root volume directory")?;

        match root_directory.lookup(VOLUMES_DIRECTORY).await? {
            None => bail!("Root directory not found"),
            Some((object_id, ObjectDescriptor::Directory)) => {
                let volume_directory = Directory::open(&root_store, object_id)
                    .await
                    .context("Unable to open volumes directory")?;
                self.volume_directory.set(volume_directory).unwrap();
            }
            // A volumes entry that isn't a directory means the image is corrupt.
            _ => {
                bail!(anyhow!(FxfsError::Inconsistent)
                    .context("Unexpected type for volumes directory"))
            }
        }

        let object_ids = list_volumes(self.volume_directory.get().unwrap())
            .await
            .context("Failed to list volumes")?;

        // Ensure every volume's store is open (and hence registered) before normal operation.
        for store_id in object_ids {
            self.open_store(&root_store, store_id).await?;
        }

        // This can fail if a filesystem is created and truncated to a size
        // that doesn't leave enough free space for metadata reservations.
        self.init_metadata_reservation()
            .context("Insufficient free space for metadata reservation.")?;

        Ok(())
    }
247
248    pub fn volume_directory(&self) -> &Directory<ObjectStore> {
249        self.volume_directory.get().unwrap()
250    }
251
252    pub fn set_volume_directory(&self, volume_directory: Directory<ObjectStore>) {
253        self.volume_directory.set(volume_directory).unwrap();
254    }
255
256    pub fn add_store(&self, store: Arc<ObjectStore>) {
257        if let Some(on_new_store) = &self.on_new_store {
258            on_new_store(&store);
259        }
260        let mut inner = self.inner.write();
261        let store_object_id = store.store_object_id();
262        assert_ne!(store_object_id, inner.root_parent_store_object_id);
263        assert_ne!(store_object_id, inner.root_store_object_id);
264        assert_ne!(store_object_id, inner.allocator_object_id);
265        inner.stores.insert(store_object_id, store);
266    }
267
268    pub fn forget_store(&self, store_object_id: u64) {
269        let mut inner = self.inner.write();
270        assert_ne!(store_object_id, inner.allocator_object_id);
271        inner.stores.remove(&store_object_id);
272        inner.reservations.remove(&store_object_id);
273    }
274
275    pub fn set_allocator(&self, allocator: Arc<Allocator>) {
276        let mut inner = self.inner.write();
277        assert!(!inner.stores.contains_key(&allocator.object_id()));
278        inner.allocator_object_id = allocator.object_id();
279        inner.allocator = Some(allocator);
280    }
281
282    pub fn allocator(&self) -> Arc<Allocator> {
283        self.inner.read().allocator.clone().unwrap()
284    }
285
    /// Applies `mutation` to `object` with `context`.
    ///
    /// This also maintains the per-object journal-checkpoint tracking that determines how much of
    /// the journal must be retained: `BeginFlush`/`EndFlush` rotate the object's `Checkpoints`
    /// state, `DeleteVolume` drops all tracking for the object, and any other mutation records
    /// (or extends) the object's dependency on the journal at `context.checkpoint`.
    pub fn apply_mutation(
        &self,
        object_id: u64,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        associated_object: AssocObj<'_>,
    ) -> Result<(), Error> {
        debug!(oid = object_id, mutation:?; "applying mutation");
        let object = {
            let mut inner = self.inner.write();
            match mutation {
                Mutation::BeginFlush => {
                    if let Some(entry) = inner.journal_checkpoints.get_mut(&object_id) {
                        match entry {
                            // A flush has begun: the current checkpoint becomes the "old" one
                            // that the flush is attempting to retire.
                            Checkpoints::Current(x) | Checkpoints::Both(x, _) => {
                                *entry = Checkpoints::Old(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::EndFlush => {
                    if let Entry::Occupied(mut o) = inner.journal_checkpoints.entry(object_id) {
                        let entry = o.get_mut();
                        match entry {
                            // Nothing new arrived during the flush, so the object no longer
                            // depends on the journal at all.
                            Checkpoints::Old(_) => {
                                o.remove();
                            }
                            // Mutations arrived during the flush; the dependency advances to the
                            // first checkpoint recorded after the flush began.
                            Checkpoints::Both(_, x) => {
                                *entry = Checkpoints::Current(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::DeleteVolume => {
                    // The volume is gone: drop the store, its reservation and its journal
                    // dependency.  There's no object left to forward the mutation to.
                    inner.stores.remove(&object_id);
                    inner.reservations.remove(&object_id);
                    inner.journal_checkpoints.remove(&object_id);
                    return Ok(());
                }
                _ => {
                    // NOTE(review): the root parent store is deliberately excluded from
                    // checkpoint tracking — presumably it doesn't require journal-backed
                    // flushing; confirm against the journal/replay code.
                    if object_id != inner.root_parent_store_object_id {
                        inner
                            .journal_checkpoints
                            .entry(object_id)
                            .and_modify(|entry| {
                                if let Checkpoints::Old(x) = entry {
                                    *entry =
                                        Checkpoints::Both(x.clone(), context.checkpoint.clone());
                                }
                            })
                            .or_insert_with(|| Checkpoints::Current(context.checkpoint.clone()));
                    }
                }
            }
            // Resolve the target object while still holding the lock; panics if the object
            // isn't registered.
            if object_id == inner.allocator_object_id {
                inner.allocator.clone().unwrap() as Arc<dyn JournalingObject>
            } else {
                inner.stores.get(&object_id).unwrap().clone() as Arc<dyn JournalingObject>
            }
        };
        // Give the associated object a chance to react before the mutation is applied.
        associated_object.map(|o| o.will_apply_mutation(&mutation, object_id, self));
        object.apply_mutation(mutation, context, associated_object)
    }
352
    /// Replays `mutations` for a single transaction.  `journal_offsets` contains the per-object
    /// starting offsets; if the current transaction offset precedes an offset, the mutations for
    /// that object are ignored.  `context` contains the location in the journal file for this
    /// transaction and `end_offset` is the ending journal offset for this transaction.
    pub async fn replay_mutations(
        &self,
        mutations: Vec<(u64, Mutation)>,
        journal_offsets: &HashMap<u64, u64>,
        context: &ApplyContext<'_, '_>,
        end_offset: u64,
    ) -> Result<(), Error> {
        debug!(checkpoint = context.checkpoint.file_offset; "REPLAY");
        // Advance last_end_offset, remembering how much journal space this transaction consumed.
        // None means the offset didn't move forward, in which case the borrowed-space update
        // below is skipped too.
        let txn_size = {
            let mut inner = self.inner.write();
            if end_offset > inner.last_end_offset {
                Some(end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset))
            } else {
                None
            }
        };

        let allocator_object_id = self.inner.read().allocator_object_id;

        for (object_id, mutation) in mutations {
            // UpdateBorrowed mutations are consumed here rather than forwarded to an object:
            // restore the recorded borrowed amount plus this transaction's own share, treating
            // overflow as filesystem inconsistency.
            if let Mutation::UpdateBorrowed(borrowed) = mutation {
                if let Some(txn_size) = txn_size {
                    self.inner.write().borrowed_metadata_space = borrowed
                        .checked_add(reserved_space_from_journal_usage(txn_size))
                        .ok_or(FxfsError::Inconsistent)?;
                }
                continue;
            }

            // Don't replay mutations if the object doesn't want it.
            if let Some(&offset) = journal_offsets.get(&object_id) {
                if context.checkpoint.file_offset < offset {
                    continue;
                }
            }

            // If this is the first time we've encountered this store, we'll need to open it.
            if object_id != allocator_object_id {
                self.open_store(&self.root_store(), object_id).await?;
            }

            self.apply_mutation(object_id, mutation, context, AssocObj::None)?;
        }
        Ok(())
    }
402
403    /// Opens the specified store if it isn't already.  This is *not* thread-safe.
404    async fn open_store(&self, parent: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error> {
405        if self.inner.read().stores.contains_key(&object_id) {
406            return Ok(());
407        }
408        let store = ObjectStore::open(parent, object_id, Box::new(TreeCache::new()))
409            .await
410            .with_context(|| format!("Failed to open store {object_id}"))?;
411        if let Some(on_new_store) = &self.on_new_store {
412            on_new_store(&store);
413        }
414        assert!(self.inner.write().stores.insert(object_id, store).is_none());
415        Ok(())
416    }
417
    /// Called by the journaling system to apply a transaction.  `checkpoint` indicates the location
    /// in the journal file for this transaction.  Returns an optional mutation to be written to be
    /// included with the transaction.
    pub fn apply_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        checkpoint: &JournalCheckpoint,
    ) -> Result<Option<Mutation>, Error> {
        // Record old values so we can see what changes as a result of this transaction.
        let old_amount = self.metadata_reservation().amount();
        let old_required = self.inner.read().required_reservation();

        debug!(checkpoint = checkpoint.file_offset; "BEGIN TXN");
        let mutations = transaction.take_mutations();
        let context =
            ApplyContext { mode: ApplyMode::Live(transaction), checkpoint: checkpoint.clone() };
        for TxnMutation { object_id, mutation, associated_object, .. } in mutations {
            self.apply_mutation(object_id, mutation, &context, associated_object)?;
        }
        debug!("END TXN");

        Ok(if let MetadataReservation::Borrowed = transaction.metadata_reservation {
            // If this transaction is borrowing metadata, figure out what has changed and return a
            // mutation with the updated value for borrowed.  The transaction might have allocated
            // or deallocated some data from the metadata reservation, or it might have made a
            // change that means we need to reserve more or less space (e.g. we compacted).
            let new_amount = self.metadata_reservation().amount();
            let mut inner = self.inner.write();
            let new_required = inner.required_reservation();
            // The delta is (old_amount - new_amount) + (new_required - old_required); the
            // positive terms are grouped on each side so the unsigned arithmetic can't underflow
            // part-way through.
            let add = old_amount + new_required;
            let sub = new_amount + old_required;
            if add >= sub {
                inner.borrowed_metadata_space += add - sub;
            } else {
                inner.borrowed_metadata_space =
                    inner.borrowed_metadata_space.saturating_sub(sub - add);
            }
            Some(Mutation::UpdateBorrowed(inner.borrowed_metadata_space))
        } else {
            // This transaction should have had no impact on the metadata reservation or the amount
            // we need to reserve.
            debug_assert_eq!(self.metadata_reservation().amount(), old_amount);
            debug_assert_eq!(self.inner.read().required_reservation(), old_required);
            None
        })
    }
464
    /// Called by the journaling system after a transaction has been written providing the end
    /// offset for the transaction so that we can adjust borrowed metadata space accordingly.
    ///
    /// Panics if the transaction's metadata reservation is `None`, or (for `Hold`) if the
    /// transaction consumed more than the held amount.
    pub fn did_commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        _checkpoint: &JournalCheckpoint,
        end_offset: u64,
    ) {
        let reservation = self.metadata_reservation();
        let mut inner = self.inner.write();
        // Journal space consumed since the previous transaction's end (includes padding).
        let journal_usage = end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset);

        // Track the largest transaction seen so far, mirrored into the inspect property.
        if journal_usage > inner.max_transaction_size.0 {
            inner.max_transaction_size.0 = journal_usage;
            inner.max_transaction_size.1.set(journal_usage);
        }

        let txn_space = reserved_space_from_journal_usage(journal_usage);
        match &mut transaction.metadata_reservation {
            MetadataReservation::None => unreachable!(),
            MetadataReservation::Borrowed => {
                // Account for the amount we need to borrow for the transaction itself now that we
                // know the transaction size.
                inner.borrowed_metadata_space += txn_space;

                // This transaction borrowed metadata space, but it might have returned space to the
                // transaction that we can now give back to the allocator.
                let to_give_back = (reservation.amount() + inner.borrowed_metadata_space)
                    .saturating_sub(inner.required_reservation());
                if to_give_back > 0 {
                    reservation.give_back(to_give_back);
                }
            }
            MetadataReservation::Hold(hold_amount) => {
                // Transfer reserved space into the metadata reservation.
                let txn_reservation = transaction.allocator_reservation.unwrap();
                assert_ne!(
                    txn_reservation as *const _, reservation as *const _,
                    "MetadataReservation::Borrowed should be used."
                );
                txn_reservation.commit(txn_space);
                // If the transaction's reservation is attributed to a different owner than the
                // (unattributed) metadata reservation, the space must be disowned first.
                if txn_reservation.owner_object_id() != reservation.owner_object_id() {
                    assert_eq!(
                        reservation.owner_object_id(),
                        None,
                        "Should not be mixing attributed owners."
                    );
                    inner
                        .allocator
                        .as_ref()
                        .unwrap()
                        .disown_reservation(txn_reservation.owner_object_id(), txn_space);
                }
                if let Some(amount) = hold_amount.checked_sub(txn_space) {
                    *hold_amount = amount;
                } else {
                    panic!("Transaction was larger than metadata reservation");
                }
                reservation.add(txn_space);
            }
            MetadataReservation::Reservation(txn_reservation) => {
                // Transfer reserved space into the metadata reservation.
                txn_reservation.move_to(reservation, txn_space);
            }
        }
        // Check that our invariant holds true.
        debug_assert_eq!(
            reservation.amount() + inner.borrowed_metadata_space,
            inner.required_reservation(),
            "txn_space: {}, reservation_amount: {}, borrowed: {}, required: {}",
            txn_space,
            reservation.amount(),
            inner.borrowed_metadata_space,
            inner.required_reservation(),
        );
    }
541
542    /// Drops a transaction.  This is called automatically when a transaction is dropped.  If the
543    /// transaction has been committed, it should contain no mutations and so nothing will get rolled
544    /// back.  For each mutation, drop_mutation is called to allow for roll back (e.g. the allocator
545    /// will unreserve allocations).
546    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
547        for TxnMutation { object_id, mutation, .. } in transaction.take_mutations() {
548            self.object(object_id).map(|o| o.drop_mutation(mutation, transaction));
549        }
550    }
551
552    /// Returns the journal file offsets that each object depends on and the checkpoint for the
553    /// minimum offset.
554    pub fn journal_file_offsets(&self) -> (HashMap<u64, u64>, Option<JournalCheckpoint>) {
555        let inner = self.inner.read();
556        let mut min_checkpoint = None;
557        let mut offsets = HashMap::default();
558        for (&object_id, checkpoint) in &inner.journal_checkpoints {
559            let checkpoint = checkpoint.earliest();
560            match &mut min_checkpoint {
561                None => min_checkpoint = Some(checkpoint),
562                Some(ref mut min_checkpoint) => {
563                    if checkpoint.file_offset < min_checkpoint.file_offset {
564                        *min_checkpoint = checkpoint;
565                    }
566                }
567            }
568            offsets.insert(object_id, checkpoint.file_offset);
569        }
570        (offsets, min_checkpoint.cloned())
571    }
572
573    /// Returns the checkpoint into the journal that the object depends on, or None if the object
574    /// has no journaled updates.
575    pub fn journal_checkpoint(&self, object_id: u64) -> Option<JournalCheckpoint> {
576        self.inner
577            .read()
578            .journal_checkpoints
579            .get(&object_id)
580            .map(|checkpoints| checkpoints.earliest().clone())
581    }
582
583    /// Returns true if the object identified by `object_id` is known to have updates recorded in
584    /// the journal that the object depends upon.
585    pub fn needs_flush(&self, object_id: u64) -> bool {
586        self.inner.read().journal_checkpoints.contains_key(&object_id)
587    }
588
589    /// Flushes all known objects.  This will then allow the journal space to be freed.
590    ///
591    /// Also returns the earliest known version of a struct on the filesystem.
592    pub async fn flush(&self) -> Result<Version, Error> {
593        let objects = {
594            let inner = self.inner.read();
595            let mut object_ids = inner.journal_checkpoints.keys().cloned().collect::<Vec<_>>();
596            // Process objects in reverse sorted order because that will mean we compact the root
597            // object store last which will ensure we include the metadata from the compactions of
598            // other objects.
599            object_ids.sort_unstable();
600            object_ids
601                .iter()
602                .rev()
603                .map(|oid| (*oid, inner.object(*oid).unwrap()))
604                .collect::<Vec<_>>()
605        };
606
607        // As we iterate, keep track of the earliest version used by structs in these objects
608        let mut earliest_version: Version = LATEST_VERSION;
609        for (object_id, object) in objects {
610            let object_earliest_version =
611                object.flush().await.with_context(|| format!("Failed to flush oid {object_id}"))?;
612            if object_earliest_version < earliest_version {
613                earliest_version = object_earliest_version;
614            }
615        }
616
617        Ok(earliest_version)
618    }
619
620    fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
621        self.inner.read().object(object_id)
622    }
623
    /// Establishes the metadata reservation (exactly once), sized to cover
    /// `required_reservation() - borrowed_metadata_space`.
    ///
    /// Fails with `FxfsError::Inconsistent` if more space is recorded as borrowed than is
    /// required, or with an allocator error if the space cannot be reserved (e.g. the filesystem
    /// was created/truncated too small).
    pub fn init_metadata_reservation(&self) -> Result<(), Error> {
        let inner = self.inner.read();
        let required = inner.required_reservation();
        ensure!(required >= inner.borrowed_metadata_space, FxfsError::Inconsistent);
        let allocator = inner.allocator.as_ref().cloned().unwrap();
        self.metadata_reservation
            .set(
                allocator
                    .clone()
                    // The reservation is unattributed (no owner object id).
                    .reserve(None, inner.required_reservation() - inner.borrowed_metadata_space)
                    .with_context(|| {
                        format!(
                            "Failed to reserve {} - {} = {} bytes, free={}, \
                             owner_bytes={}",
                            inner.required_reservation(),
                            inner.borrowed_metadata_space,
                            inner.required_reservation() - inner.borrowed_metadata_space,
                            Saturating(allocator.get_disk_bytes()) - allocator.get_used_bytes(),
                            allocator.owner_bytes_debug(),
                        )
                    })?,
            )
            .unwrap();
        Ok(())
    }
649
650    pub fn metadata_reservation(&self) -> &Reservation {
651        self.metadata_reservation.get().unwrap()
652    }
653
654    pub fn update_reservation(&self, object_id: u64, amount: u64) {
655        self.inner.write().reservations.insert(object_id, amount);
656    }
657
658    pub fn reservation(&self, object_id: u64) -> Option<u64> {
659        self.inner.read().reservations.get(&object_id).cloned()
660    }
661
662    pub fn set_reserved_space(&self, amount: u64) {
663        self.inner.write().reserved_space = amount;
664    }
665
666    pub fn last_end_offset(&self) -> u64 {
667        self.inner.read().last_end_offset
668    }
669
670    pub fn set_last_end_offset(&self, v: u64) {
671        self.inner.write().last_end_offset = v;
672    }
673
674    pub fn borrowed_metadata_space(&self) -> u64 {
675        self.inner.read().borrowed_metadata_space
676    }
677
678    pub fn set_borrowed_metadata_space(&self, v: u64) {
679        self.inner.write().borrowed_metadata_space = v;
680    }
681
682    pub fn write_mutation(&self, object_id: u64, mutation: &Mutation, writer: journal::Writer<'_>) {
683        self.object(object_id).unwrap().write_mutation(mutation, writer);
684    }
685
686    pub fn unlocked_stores(&self) -> Vec<Arc<ObjectStore>> {
687        let inner = self.inner.read();
688        let mut stores = Vec::new();
689        for store in inner.stores.values() {
690            if !store.is_locked() {
691                stores.push(store.clone());
692            }
693        }
694        stores
695    }
696
    /// Creates a lazy inspect node named `str` under `parent` which will yield statistics for the
    /// object manager when queried.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        // Only a weak reference is captured so the inspect node doesn't keep the manager alive.
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    // Snapshot everything needed in one short-lived read lock.
                    let (required, borrowed, earliest_checkpoint) = {
                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
                        let inner = this.inner.read();
                        (
                            inner.required_reservation(),
                            inner.borrowed_metadata_space,
                            inner.earliest_journal_offset(),
                        )
                    };
                    let root = inspector.root();
                    root.record_uint("metadata_reservation", this.metadata_reservation().amount());
                    root.record_uint("required_reservation", required);
                    root.record_uint("borrowed_reservation", borrowed);
                    if let Some(earliest_checkpoint) = earliest_checkpoint {
                        root.record_uint("earliest_checkpoint", earliest_checkpoint);
                    }

                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
                    if let Some(x) = round_div(100 * borrowed, required) {
                        root.record_uint("borrowed_to_required_reservation_percent", x);
                    }
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
733
734    /// Normally, we make new transactions pay for overheads incurred by the journal, such as
735    /// checksums and padding, but if the journal has discarded a significant amount after a replay,
736    /// we run the risk of there not being enough reserved.  To handle this, if the amount is
737    /// significant, we force the journal to borrow the space (using a journal created transaction).
738    pub fn needs_borrow_for_journal(&self, checkpoint: u64) -> bool {
739        checkpoint.checked_sub(self.inner.read().last_end_offset).unwrap() > 256
740    }
741}
742
/// ReservationUpdate is an associated object that sets the amount reserved for an object
/// (overwriting any previous amount). Updates must be applied as part of a transaction before
/// did_commit_transaction runs because it will reconcile the accounting for reserved metadata
/// space.
pub struct ReservationUpdate(/* amount, in bytes: */ u64);
748
749impl ReservationUpdate {
750    pub fn new(amount: u64) -> Self {
751        Self(amount)
752    }
753}
754
impl AssociatedObject for ReservationUpdate {
    // Just before the associated mutation is applied, record the new target reservation amount
    // for the object; `did_commit_transaction` subsequently reconciles the metadata accounting
    // against it.
    fn will_apply_mutation(&self, _mutation: &Mutation, object_id: u64, manager: &ObjectManager) {
        manager.update_reservation(object_id, self.0);
    }
}