fxfs/object_store/
object_manager.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::filesystem::{ApplyContext, ApplyMode, JournalingObject};
7use crate::log::*;
8use crate::metrics;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::allocator::{Allocator, Reservation};
11use crate::object_store::directory::Directory;
12use crate::object_store::journal::{self, JournalCheckpoint};
13use crate::object_store::transaction::{
14    AssocObj, AssociatedObject, MetadataReservation, Mutation, Transaction, TxnMutation,
15};
16use crate::object_store::tree_cache::TreeCache;
17use crate::object_store::volume::{VOLUMES_DIRECTORY, list_volumes};
18use crate::object_store::{ObjectDescriptor, ObjectStore};
19use crate::round::round_div;
20use crate::serialized_types::{LATEST_VERSION, Version};
21use anyhow::{Context, Error, anyhow, bail, ensure};
22use fuchsia_inspect::{Property as _, UintProperty};
23use fuchsia_sync::RwLock;
24use futures::FutureExt as _;
25use once_cell::sync::OnceCell;
26use rustc_hash::FxHashMap as HashMap;
27use std::collections::hash_map::Entry;
28use std::num::Saturating;
29use std::sync::Arc;
30
31// Data written to the journal eventually needs to be flushed somewhere (typically into layer
32// files).  Here we conservatively assume that could take up to four times as much space as it does
33// in the journal.  In the layer file, it'll take up at least as much, but we must reserve the same
// amount again so that there's enough space for compactions, and then we need some spare for
35// overheads.
36//
37// TODO(https://fxbug.dev/42178158): We should come up with a better way of determining what the multiplier
38// should be here.  2x was too low, as it didn't cover any space for metadata.  4x might be too
39// much.
pub const fn reserved_space_from_journal_usage(journal_usage: u64) -> u64 {
    // See the rationale in the comment above: journal data may require up to four times its
    // journal footprint once flushed into layer files and compacted.
    const MULTIPLIER: u64 = 4;
    journal_usage * MULTIPLIER
}
43
/// ObjectManager is a global loading cache for object stores and other special objects.
pub struct ObjectManager {
    // Mutable state shared across all operations, guarded by a read/write lock.
    inner: RwLock<Inner>,
    // Space reserved for flushing journaled metadata; set exactly once via
    // `init_metadata_reservation`.
    metadata_reservation: OnceCell<Reservation>,
    // The directory (in the root store) under which all volumes live; set exactly once.
    volume_directory: OnceCell<Directory<ObjectStore>>,
    // Optional callback invoked whenever a store is registered with this manager (root stores,
    // added stores and stores opened during replay).
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
}
51
// Whilst we are flushing we need to keep track of the old checkpoint that we are hoping to flush,
// and a new one that should apply if we successfully finish the flush.
#[derive(Debug)]
enum Checkpoints {
    // No flush in progress; this is the live checkpoint.
    Current(JournalCheckpoint),
    // A flush is in progress and no new mutations have arrived since it began.
    Old(JournalCheckpoint),
    // A flush is in progress and new mutations have arrived; both checkpoints are needed.
    Both(/* old: */ JournalCheckpoint, /* current: */ JournalCheckpoint),
}
60
61impl Checkpoints {
62    // Returns the earliest checkpoint (which will always be the old one if present).
63    fn earliest(&self) -> &JournalCheckpoint {
64        match self {
65            Checkpoints::Old(x) | Checkpoints::Both(x, _) | Checkpoints::Current(x) => x,
66        }
67    }
68}
69
// We currently maintain strong references to all stores that have been opened, but there's
// currently no mechanism for releasing stores that aren't being used.
struct Inner {
    // All opened object stores (including the root and root parent stores), keyed by store
    // object ID.
    stores: HashMap<u64, Arc<ObjectStore>>,
    root_parent_store_object_id: u64,
    root_store_object_id: u64,
    allocator_object_id: u64,
    allocator: Option<Arc<Allocator>>,

    // Records dependencies on the journal for objects i.e. an entry for object ID 1, would mean it
    // has a dependency on journal records from that offset.
    journal_checkpoints: HashMap<u64, Checkpoints>,

    // Mappings from object-id to a target reservation amount.  The object IDs here are from the
    // root store namespace, so it can be associated with any object in the root store.  A
    // reservation will be made to cover the *maximum* in this map, since it is assumed that any
    // requirement is only temporary, for the duration of a compaction, and that once compaction has
    // finished for a particular object, the space will be recovered.
    reservations: HashMap<u64, u64>,

    // The last journal end offset for a transaction that has been applied.  This is not necessarily
    // the same as the start offset for the next transaction because of padding.
    last_end_offset: u64,

    // A running counter that tracks metadata space that has been borrowed on the understanding that
    // eventually it will be recovered (potentially after a full compaction).
    borrowed_metadata_space: u64,

    // The maximum transaction size that has been encountered so far.  The second element mirrors
    // the first into an inspect property.
    max_transaction_size: (u64, UintProperty),

    // Extra temporary space that might be tied up in the journal that hasn't yet been deallocated.
    reserved_space: u64,
}
104
105impl Inner {
106    fn earliest_journal_offset(&self) -> Option<u64> {
107        self.journal_checkpoints.values().map(|c| c.earliest().file_offset).min()
108    }
109
110    // Returns the required size of the metadata reservation assuming that no space has been
111    // borrowed.  The invariant is: reservation-size + borrowed-space = required.
112    fn required_reservation(&self) -> u64 {
113        // Start with the maximum amount of temporary space we might need during compactions.
114        self.reservations.values().max().unwrap_or(&0)
115
116        // Account for data that has been written to the journal that will need to be written
117        // to layer files when flushed.
118            + self.earliest_journal_offset()
119            .map(|min| reserved_space_from_journal_usage(self.last_end_offset - min))
120            .unwrap_or(0)
121
122        // Extra reserved space
123            + self.reserved_space
124    }
125
126    fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
127        if object_id == self.allocator_object_id {
128            Some(self.allocator.clone().unwrap() as Arc<dyn JournalingObject>)
129        } else {
130            self.stores.get(&object_id).map(|x| x.clone() as Arc<dyn JournalingObject>)
131        }
132    }
133}
134
135impl ObjectManager {
    /// Creates a new, empty ObjectManager.  `on_new_store`, if provided, will be invoked for
    /// every store subsequently registered with the manager.
    pub fn new(on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>) -> ObjectManager {
        ObjectManager {
            inner: RwLock::new(Inner {
                stores: HashMap::default(),
                root_parent_store_object_id: INVALID_OBJECT_ID,
                root_store_object_id: INVALID_OBJECT_ID,
                allocator_object_id: INVALID_OBJECT_ID,
                allocator: None,
                journal_checkpoints: HashMap::default(),
                reservations: HashMap::default(),
                last_end_offset: 0,
                borrowed_metadata_space: 0,
                max_transaction_size: (0, metrics::detail().create_uint("max_transaction_size", 0)),
                // Journal headroom reserved up-front; can be changed via `set_reserved_space`.
                reserved_space: journal::RESERVED_SPACE,
            }),
            metadata_reservation: OnceCell::new(),
            volume_directory: OnceCell::new(),
            on_new_store,
        }
    }
156
    /// Returns the currently-required metadata reservation size (see
    /// `Inner::required_reservation`).
    pub fn required_reservation(&self) -> u64 {
        self.inner.read().required_reservation()
    }

    /// Returns the object ID of the root parent store.
    pub fn root_parent_store_object_id(&self) -> u64 {
        self.inner.read().root_parent_store_object_id
    }

    /// Returns the root parent store.  Panics if it has not been set.
    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
        let inner = self.inner.read();
        inner.stores.get(&inner.root_parent_store_object_id).unwrap().clone()
    }

    /// Registers `store` as the root parent store.
    pub fn set_root_parent_store(&self, store: Arc<ObjectStore>) {
        if let Some(on_new_store) = &self.on_new_store {
            on_new_store(&store);
        }
        let mut inner = self.inner.write();
        let store_id = store.store_object_id();
        inner.stores.insert(store_id, store);
        inner.root_parent_store_object_id = store_id;
    }

    /// Returns the object ID of the root store.
    pub fn root_store_object_id(&self) -> u64 {
        self.inner.read().root_store_object_id
    }

    /// Returns the root store.  Panics if it has not been set.
    pub fn root_store(&self) -> Arc<ObjectStore> {
        let inner = self.inner.read();
        inner.stores.get(&inner.root_store_object_id).unwrap().clone()
    }

    /// Registers `store` as the root store.
    pub fn set_root_store(&self, store: Arc<ObjectStore>) {
        if let Some(on_new_store) = &self.on_new_store {
            on_new_store(&store);
        }
        let mut inner = self.inner.write();
        let store_id = store.store_object_id();
        inner.stores.insert(store_id, store);
        inner.root_store_object_id = store_id;
    }

    /// Returns true if `store_id` is the root store or the root parent store.
    pub fn is_system_store(&self, store_id: u64) -> bool {
        let inner = self.inner.read();
        store_id == inner.root_store_object_id || store_id == inner.root_parent_store_object_id
    }

    /// Returns the store which might or might not be locked.
    pub fn store(&self, store_object_id: u64) -> Option<Arc<ObjectStore>> {
        self.inner.read().stores.get(&store_object_id).cloned()
    }
208
209    /// This is not thread-safe: it assumes that a store won't be forgotten whilst the loop is
210    /// running.  This is to be used after replaying the journal.
211    pub async fn on_replay_complete(&self) -> Result<(), Error> {
212        let root_store = self.root_store();
213
214        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
215            .await
216            .context("Unable to open root volume directory")?;
217
218        match root_directory.lookup(VOLUMES_DIRECTORY).await? {
219            None => bail!("Root directory not found"),
220            Some((object_id, ObjectDescriptor::Directory, _)) => {
221                let volume_directory = Directory::open(&root_store, object_id)
222                    .await
223                    .context("Unable to open volumes directory")?;
224                self.volume_directory.set(volume_directory).unwrap();
225            }
226            _ => {
227                bail!(
228                    anyhow!(FxfsError::Inconsistent)
229                        .context("Unexpected type for volumes directory")
230                )
231            }
232        }
233
234        let object_ids = list_volumes(self.volume_directory.get().unwrap())
235            .await
236            .context("Failed to list volumes")?;
237
238        for store_id in object_ids {
239            self.open_store(&root_store, store_id).await?;
240        }
241
242        // This can fail if a filesystem is created and truncated to a size
243        // that doesn't leave enough free space for metadata reservations.
244        self.init_metadata_reservation()
245            .context("Insufficient free space for metadata reservation.")?;
246
247        Ok(())
248    }
249
    /// Returns the volumes directory.  Panics if it has not been set.
    pub fn volume_directory(&self) -> &Directory<ObjectStore> {
        self.volume_directory.get().unwrap()
    }

    /// Sets the volumes directory.  Panics if it has already been set.
    pub fn set_volume_directory(&self, volume_directory: Directory<ObjectStore>) {
        self.volume_directory.set(volume_directory).unwrap();
    }
257
258    pub fn add_store(&self, store: Arc<ObjectStore>) {
259        if let Some(on_new_store) = &self.on_new_store {
260            on_new_store(&store);
261        }
262        let mut inner = self.inner.write();
263        let store_object_id = store.store_object_id();
264        assert_ne!(store_object_id, inner.root_parent_store_object_id);
265        assert_ne!(store_object_id, inner.root_store_object_id);
266        assert_ne!(store_object_id, inner.allocator_object_id);
267        inner.stores.insert(store_object_id, store);
268    }
269
    /// Removes `store_object_id` from the set of tracked stores, along with any reservation
    /// target recorded for it.  Must not be called with the allocator's object ID.
    pub fn forget_store(&self, store_object_id: u64) {
        let mut inner = self.inner.write();
        assert_ne!(store_object_id, inner.allocator_object_id);
        inner.stores.remove(&store_object_id);
        inner.reservations.remove(&store_object_id);
    }

    /// Registers the allocator.  The allocator's object ID must not collide with any registered
    /// store.
    pub fn set_allocator(&self, allocator: Arc<Allocator>) {
        let mut inner = self.inner.write();
        assert!(!inner.stores.contains_key(&allocator.object_id()));
        inner.allocator_object_id = allocator.object_id();
        inner.allocator = Some(allocator);
    }

    /// Returns the allocator.  Panics if it has not been set.
    pub fn allocator(&self) -> Arc<Allocator> {
        self.inner.read().allocator.clone().unwrap()
    }
287
    /// Applies `mutation` to `object` with `context`.
    pub fn apply_mutation(
        &self,
        object_id: u64,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        associated_object: AssocObj<'_>,
    ) -> Result<(), Error> {
        debug!(oid = object_id, mutation:?; "applying mutation");
        let object = {
            let mut inner = self.inner.write();
            match mutation {
                Mutation::BeginFlush => {
                    // A flush is starting: the live checkpoint becomes the "old" one, which must
                    // be retained until the flush completes (see `Checkpoints`).
                    if let Some(entry) = inner.journal_checkpoints.get_mut(&object_id) {
                        match entry {
                            Checkpoints::Current(x) | Checkpoints::Both(x, _) => {
                                *entry = Checkpoints::Old(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::EndFlush => {
                    // The flush finished: drop the old checkpoint.  If no new checkpoint was
                    // recorded whilst flushing, the object no longer depends on the journal.
                    if let Entry::Occupied(mut o) = inner.journal_checkpoints.entry(object_id) {
                        let entry = o.get_mut();
                        match entry {
                            Checkpoints::Old(_) => {
                                o.remove();
                            }
                            Checkpoints::Both(_, x) => {
                                *entry = Checkpoints::Current(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::DeleteVolume => {
                    // Deleting a volume removes all state the manager holds for the store; there
                    // is no target object left to forward the mutation to, so return early.
                    if let Some(store) = inner.stores.remove(&object_id) {
                        store.mark_deleted();
                    }
                    inner.reservations.remove(&object_id);
                    inner.journal_checkpoints.remove(&object_id);
                    return Ok(());
                }
                _ => {
                    // All other mutations introduce a journal dependency for the object.  The
                    // root parent store is deliberately excluded — presumably it doesn't need
                    // journal-dependency tracking; confirm against the journal code.
                    if object_id != inner.root_parent_store_object_id {
                        inner
                            .journal_checkpoints
                            .entry(object_id)
                            .and_modify(|entry| {
                                if let Checkpoints::Old(x) = entry {
                                    *entry =
                                        Checkpoints::Both(x.clone(), context.checkpoint.clone());
                                }
                            })
                            .or_insert_with(|| Checkpoints::Current(context.checkpoint.clone()));
                    }
                }
            }
            // Resolve the target object whilst the lock is still held; it must exist.
            if object_id == inner.allocator_object_id {
                inner.allocator.clone().unwrap() as Arc<dyn JournalingObject>
            } else {
                inner.stores.get(&object_id).unwrap().clone() as Arc<dyn JournalingObject>
            }
        };
        // Let any associated object observe the mutation before it is applied.
        associated_object.map(|o| o.will_apply_mutation(&mutation, object_id, self));
        object.apply_mutation(mutation, context, associated_object)
    }
356
    /// Replays `mutations` for a single transaction.  `journal_offsets` contains the per-object
    /// starting offsets; if the current transaction offset precedes an offset, the mutations for
    /// that object are ignored.  `context` contains the location in the journal file for this
    /// transaction and `end_offset` is the ending journal offset for this transaction.
    pub async fn replay_mutations(
        &self,
        mutations: Vec<(u64, Mutation)>,
        journal_offsets: &HashMap<u64, u64>,
        context: &ApplyContext<'_, '_>,
        end_offset: u64,
    ) -> Result<(), Error> {
        debug!(checkpoint = context.checkpoint.file_offset; "REPLAY");
        // Size of this transaction in the journal, advancing `last_end_offset` as a side effect.
        // `None` means the recorded offset did not advance.
        let txn_size = {
            let mut inner = self.inner.write();
            if end_offset > inner.last_end_offset {
                Some(end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset))
            } else {
                None
            }
        };

        let allocator_object_id = self.inner.read().allocator_object_id;

        for (object_id, mutation) in mutations {
            if let Mutation::UpdateBorrowed(borrowed) = mutation {
                // Restore the borrowed-space counter, adding the space consumed by this
                // transaction itself.  Overflow indicates an inconsistent journal record.
                if let Some(txn_size) = txn_size {
                    self.inner.write().borrowed_metadata_space = borrowed
                        .checked_add(reserved_space_from_journal_usage(txn_size))
                        .ok_or(FxfsError::Inconsistent)?;
                }
                continue;
            }

            // Don't replay mutations if the object doesn't want it.
            if let Some(&offset) = journal_offsets.get(&object_id) {
                if context.checkpoint.file_offset < offset {
                    continue;
                }
            }

            // If this is the first time we've encountered this store, we'll need to open it.
            if object_id != allocator_object_id {
                self.open_store(&self.root_store(), object_id).await?;
            }

            self.apply_mutation(object_id, mutation, context, AssocObj::None)?;
        }
        Ok(())
    }
406
407    /// Opens the specified store if it isn't already.  This is *not* thread-safe.
408    async fn open_store(&self, parent: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error> {
409        if self.inner.read().stores.contains_key(&object_id) {
410            return Ok(());
411        }
412        let store = ObjectStore::open(parent, object_id, Box::new(TreeCache::new()))
413            .await
414            .with_context(|| format!("Failed to open store {object_id}"))?;
415        if let Some(on_new_store) = &self.on_new_store {
416            on_new_store(&store);
417        }
418        assert!(self.inner.write().stores.insert(object_id, store).is_none());
419        Ok(())
420    }
421
    /// Called by the journaling system to apply a transaction.  `checkpoint` indicates the location
    /// in the journal file for this transaction.  Returns an optional mutation to be written to be
    /// included with the transaction.
    pub fn apply_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        checkpoint: &JournalCheckpoint,
    ) -> Result<Option<Mutation>, Error> {
        // Record old values so we can see what changes as a result of this transaction.
        let old_amount = self.metadata_reservation().amount();
        let old_required = self.inner.read().required_reservation();

        debug!(checkpoint = checkpoint.file_offset; "BEGIN TXN");
        let mutations = transaction.take_mutations();
        let context =
            ApplyContext { mode: ApplyMode::Live(transaction), checkpoint: checkpoint.clone() };
        for TxnMutation { object_id, mutation, associated_object, .. } in mutations {
            self.apply_mutation(object_id, mutation, &context, associated_object)?;
        }
        debug!("END TXN");

        Ok(if let MetadataReservation::Borrowed = transaction.metadata_reservation {
            // If this transaction is borrowing metadata, figure out what has changed and return a
            // mutation with the updated value for borrowed.  The transaction might have allocated
            // or deallocated some data from the metadata reservation, or it might have made a
            // change that means we need to reserve more or less space (e.g. we compacted).
            let new_amount = self.metadata_reservation().amount();
            let mut inner = self.inner.write();
            let new_required = inner.required_reservation();
            // The delta is (old_amount - new_amount) + (new_required - old_required), rearranged
            // so both sides of the comparison remain non-negative in u64 arithmetic.
            let add = old_amount + new_required;
            let sub = new_amount + old_required;
            if add >= sub {
                inner.borrowed_metadata_space += add - sub;
            } else {
                inner.borrowed_metadata_space =
                    inner.borrowed_metadata_space.saturating_sub(sub - add);
            }
            Some(Mutation::UpdateBorrowed(inner.borrowed_metadata_space))
        } else {
            // This transaction should have had no impact on the metadata reservation or the amount
            // we need to reserve.
            debug_assert_eq!(self.metadata_reservation().amount(), old_amount);
            debug_assert_eq!(self.inner.read().required_reservation(), old_required);
            None
        })
    }
468
    /// Called by the journaling system after a transaction has been written providing the end
    /// offset for the transaction so that we can adjust borrowed metadata space accordingly.
    ///
    /// Panics if a held metadata reservation is smaller than the transaction's journal footprint.
    pub fn did_commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        _checkpoint: &JournalCheckpoint,
        end_offset: u64,
    ) {
        let reservation = self.metadata_reservation();
        let mut inner = self.inner.write();
        // The journal space consumed by this transaction is how far the end offset advanced.
        let journal_usage = end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset);

        // Track the largest transaction seen so far, mirroring it into the inspect property.
        if journal_usage > inner.max_transaction_size.0 {
            inner.max_transaction_size.0 = journal_usage;
            inner.max_transaction_size.1.set(journal_usage);
        }

        let txn_space = reserved_space_from_journal_usage(journal_usage);
        match &mut transaction.metadata_reservation {
            MetadataReservation::None => unreachable!(),
            MetadataReservation::Borrowed => {
                // Account for the amount we need to borrow for the transaction itself now that we
                // know the transaction size.
                inner.borrowed_metadata_space += txn_space;

                // This transaction borrowed metadata space, but it might have returned space to the
                // transaction that we can now give back to the allocator.
                let to_give_back = (reservation.amount() + inner.borrowed_metadata_space)
                    .saturating_sub(inner.required_reservation());
                if to_give_back > 0 {
                    reservation.give_back(to_give_back);
                }
            }
            MetadataReservation::Hold(hold_amount) => {
                // Transfer reserved space into the metadata reservation.
                let txn_reservation = transaction.allocator_reservation.unwrap();
                assert_ne!(
                    txn_reservation as *const _, reservation as *const _,
                    "MetadataReservation::Borrowed should be used."
                );
                txn_reservation.commit(txn_space);
                // When the owners differ, re-attribute the committed space via the allocator —
                // presumably moving it to the unattributed pool; see Allocator::disown_reservation.
                if txn_reservation.owner_object_id() != reservation.owner_object_id() {
                    assert_eq!(
                        reservation.owner_object_id(),
                        None,
                        "Should not be mixing attributed owners."
                    );
                    inner
                        .allocator
                        .as_ref()
                        .unwrap()
                        .disown_reservation(txn_reservation.owner_object_id(), txn_space);
                }
                // Deduct the transaction's footprint from the held amount; it must fit.
                if let Some(amount) = hold_amount.checked_sub(txn_space) {
                    *hold_amount = amount;
                } else {
                    panic!("Transaction was larger than metadata reservation");
                }
                reservation.add(txn_space);
            }
            MetadataReservation::Reservation(txn_reservation) => {
                // Transfer reserved space into the metadata reservation.
                txn_reservation.move_to(reservation, txn_space);
            }
        }
        // Check that our invariant holds true.
        debug_assert_eq!(
            reservation.amount() + inner.borrowed_metadata_space,
            inner.required_reservation(),
            "txn_space: {}, reservation_amount: {}, borrowed: {}, required: {}",
            txn_space,
            reservation.amount(),
            inner.borrowed_metadata_space,
            inner.required_reservation(),
        );
    }
545
546    /// Drops a transaction.  This is called automatically when a transaction is dropped.  If the
547    /// transaction has been committed, it should contain no mutations and so nothing will get rolled
548    /// back.  For each mutation, drop_mutation is called to allow for roll back (e.g. the allocator
549    /// will unreserve allocations).
550    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
551        for TxnMutation { object_id, mutation, .. } in transaction.take_mutations() {
552            self.object(object_id).map(|o| o.drop_mutation(mutation, transaction));
553        }
554    }
555
556    /// Returns the journal file offsets that each object depends on and the checkpoint for the
557    /// minimum offset.
558    pub fn journal_file_offsets(&self) -> (HashMap<u64, u64>, Option<JournalCheckpoint>) {
559        let inner = self.inner.read();
560        let mut min_checkpoint = None;
561        let mut offsets = HashMap::default();
562        for (&object_id, checkpoint) in &inner.journal_checkpoints {
563            let checkpoint = checkpoint.earliest();
564            match &mut min_checkpoint {
565                None => min_checkpoint = Some(checkpoint),
566                Some(min_checkpoint) => {
567                    if checkpoint.file_offset < min_checkpoint.file_offset {
568                        *min_checkpoint = checkpoint;
569                    }
570                }
571            }
572            offsets.insert(object_id, checkpoint.file_offset);
573        }
574        (offsets, min_checkpoint.cloned())
575    }
576
    /// Returns the checkpoint into the journal that the object depends on, or None if the object
    /// has no journaled updates.
    pub fn journal_checkpoint(&self, object_id: u64) -> Option<JournalCheckpoint> {
        self.inner
            .read()
            .journal_checkpoints
            .get(&object_id)
            .map(|checkpoints| checkpoints.earliest().clone())
    }

    /// Returns true if the object identified by `object_id` is known to have updates recorded in
    /// the journal that the object depends upon.  The dependency is cleared when a flush of the
    /// object completes (see the `Mutation::EndFlush` handling in `apply_mutation`).
    pub fn needs_flush(&self, object_id: u64) -> bool {
        self.inner.read().journal_checkpoints.contains_key(&object_id)
    }
592
593    /// Flushes all known objects.  This will then allow the journal space to be freed.
594    ///
595    /// Also returns the earliest known version of a struct on the filesystem.
596    pub async fn flush(&self) -> Result<Version, Error> {
597        let objects = {
598            let inner = self.inner.read();
599            let mut object_ids = inner.journal_checkpoints.keys().cloned().collect::<Vec<_>>();
600            // Process objects in reverse sorted order because that will mean we compact the root
601            // object store last which will ensure we include the metadata from the compactions of
602            // other objects.
603            object_ids.sort_unstable();
604            object_ids
605                .iter()
606                .rev()
607                .map(|oid| (*oid, inner.object(*oid).unwrap()))
608                .collect::<Vec<_>>()
609        };
610
611        // As we iterate, keep track of the earliest version used by structs in these objects
612        let mut earliest_version: Version = LATEST_VERSION;
613        for (object_id, object) in objects {
614            let object_earliest_version =
615                object.flush().await.with_context(|| format!("Failed to flush oid {object_id}"))?;
616            if object_earliest_version < earliest_version {
617                earliest_version = object_earliest_version;
618            }
619        }
620
621        Ok(earliest_version)
622    }
623
    /// Returns the journaling object (allocator or open store) for `object_id`, if known.
    fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
        self.inner.read().object(object_id)
    }
627
    /// Creates the metadata reservation, sized as `required_reservation()` less any space already
    /// borrowed.  Fails with `FxfsError::Inconsistent` if borrowed space exceeds the requirement,
    /// or with context if the allocator cannot reserve enough space.  Panics if called twice.
    pub fn init_metadata_reservation(&self) -> Result<(), Error> {
        let inner = self.inner.read();
        let required = inner.required_reservation();
        // Borrowed space exceeding the requirement would indicate an inconsistent filesystem.
        ensure!(required >= inner.borrowed_metadata_space, FxfsError::Inconsistent);
        let allocator = inner.allocator.as_ref().cloned().unwrap();
        self.metadata_reservation
            .set(
                allocator
                    .clone()
                    .reserve(None, inner.required_reservation() - inner.borrowed_metadata_space)
                    .with_context(|| {
                        format!(
                            "Failed to reserve {} - {} = {} bytes, free={}, \
                             owner_bytes={}",
                            inner.required_reservation(),
                            inner.borrowed_metadata_space,
                            inner.required_reservation() - inner.borrowed_metadata_space,
                            Saturating(allocator.get_disk_bytes()) - allocator.get_used_bytes(),
                            allocator.owner_bytes_debug(),
                        )
                    })?,
            )
            .unwrap();
        Ok(())
    }
653
    /// Returns the metadata reservation.  Panics if `init_metadata_reservation` has not been
    /// called.
    pub fn metadata_reservation(&self) -> &Reservation {
        self.metadata_reservation.get().unwrap()
    }

    /// Sets the target reservation amount for `object_id` (see `Inner::reservations`).
    pub fn update_reservation(&self, object_id: u64, amount: u64) {
        self.inner.write().reservations.insert(object_id, amount);
    }

    /// Returns the target reservation amount for `object_id`, if one has been recorded.
    pub fn reservation(&self, object_id: u64) -> Option<u64> {
        self.inner.read().reservations.get(&object_id).cloned()
    }

    /// Sets the extra reserved space (see `Inner::reserved_space`).
    pub fn set_reserved_space(&self, amount: u64) {
        self.inner.write().reserved_space = amount;
    }

    /// Returns the journal end offset of the last applied transaction.
    pub fn last_end_offset(&self) -> u64 {
        self.inner.read().last_end_offset
    }

    /// Sets the journal end offset of the last applied transaction.
    pub fn set_last_end_offset(&self, v: u64) {
        self.inner.write().last_end_offset = v;
    }

    /// Returns the amount of borrowed metadata space.
    pub fn borrowed_metadata_space(&self) -> u64 {
        self.inner.read().borrowed_metadata_space
    }

    /// Sets the amount of borrowed metadata space.
    pub fn set_borrowed_metadata_space(&self, v: u64) {
        self.inner.write().borrowed_metadata_space = v;
    }

    /// Serializes `mutation` for `object_id` via the object that owns it.  Panics if the object
    /// is unknown.
    pub fn write_mutation(&self, object_id: u64, mutation: &Mutation, writer: journal::Writer<'_>) {
        self.object(object_id).unwrap().write_mutation(mutation, writer);
    }
689
690    pub fn unlocked_stores(&self) -> Vec<Arc<ObjectStore>> {
691        let inner = self.inner.read();
692        let mut stores = Vec::new();
693        for store in inner.stores.values() {
694            if !store.is_locked() {
695                stores.push(store.clone());
696            }
697        }
698        stores
699    }
700
    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
    /// object manager when queried.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        // Hold only a weak reference so that inspect does not keep the manager alive.
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    // Snapshot the values while the lock is held, then drop it before recording.
                    let (required, borrowed, earliest_checkpoint) = {
                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
                        let inner = this.inner.read();
                        (
                            inner.required_reservation(),
                            inner.borrowed_metadata_space,
                            inner.earliest_journal_offset(),
                        )
                    };
                    let root = inspector.root();
                    root.record_uint("metadata_reservation", this.metadata_reservation().amount());
                    root.record_uint("required_reservation", required);
                    root.record_uint("borrowed_reservation", borrowed);
                    if let Some(earliest_checkpoint) = earliest_checkpoint {
                        root.record_uint("earliest_checkpoint", earliest_checkpoint);
                    }

                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
                    if let Some(x) = round_div(100 * borrowed, required) {
                        root.record_uint("borrowed_to_required_reservation_percent", x);
                    }
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
737
    /// Normally, we make new transactions pay for overheads incurred by the journal, such as
    /// checksums and padding, but if the journal has discarded a significant amount after a replay,
    /// we run the risk of there not being enough reserved.  To handle this, if the amount is
    /// significant, we force the journal to borrow the space (using a journal created transaction).
    ///
    /// Panics if `checkpoint` precedes the last applied transaction's end offset.
    pub fn needs_borrow_for_journal(&self, checkpoint: u64) -> bool {
        // Amounts of 256 bytes or less are considered insignificant.
        checkpoint.checked_sub(self.inner.read().last_end_offset).unwrap() > 256
    }
745}
746
/// ReservationUpdate is an associated object that sets the amount reserved for an object
/// (overwriting any previous amount). Updates must be applied as part of a transaction before
/// did_commit_transaction runs because it will reconcile the accounting for reserved metadata
/// space.
pub struct ReservationUpdate(u64);

impl ReservationUpdate {
    /// Creates a ReservationUpdate that will set the target reservation to `amount` bytes.
    pub fn new(amount: u64) -> Self {
        Self(amount)
    }
}

impl AssociatedObject for ReservationUpdate {
    // Record the new reservation amount with the manager when the mutation is applied.
    fn will_apply_mutation(&self, _mutation: &Mutation, object_id: u64, manager: &ObjectManager) {
        manager.update_reservation(object_id, self.0);
    }
}
763}