fxfs/object_store/
object_manager.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::filesystem::{ApplyContext, ApplyMode, JournalingObject};
7use crate::log::*;
8use crate::metrics;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::allocator::{Allocator, Reservation};
11use crate::object_store::directory::Directory;
12use crate::object_store::journal::{self, JournalCheckpoint};
13use crate::object_store::transaction::{
14    AssocObj, AssociatedObject, MetadataReservation, Mutation, Transaction, TxnMutation,
15};
16use crate::object_store::tree_cache::TreeCache;
17use crate::object_store::volume::{VOLUMES_DIRECTORY, list_volumes};
18use crate::object_store::{ObjectDescriptor, ObjectStore};
19use crate::round::round_div;
20use crate::serialized_types::{LATEST_VERSION, Version};
21use anyhow::{Context, Error, anyhow, bail, ensure};
22use fuchsia_inspect::{Property as _, UintProperty};
23use fuchsia_sync::RwLock;
24use futures::FutureExt as _;
25use rustc_hash::FxHashMap as HashMap;
26use std::collections::hash_map::Entry;
27use std::num::Saturating;
28use std::sync::{Arc, OnceLock};
29
30// Data written to the journal eventually needs to be flushed somewhere (typically into layer
31// files).  Here we conservatively assume that could take up to four times as much space as it does
32// in the journal.  In the layer file, it'll take up at least as much, but we must reserve the same
// again so that there's enough space for compactions, and then we need some spare for
34// overheads.
35//
36// TODO(https://fxbug.dev/42178158): We should come up with a better way of determining what the multiplier
37// should be here.  2x was too low, as it didn't cover any space for metadata.  4x might be too
38// much.
/// Computes how much space must be reserved to guarantee that `journal_usage` bytes of journal
/// data can eventually be flushed into layer files (including compaction headroom and other
/// overheads).
pub const fn reserved_space_from_journal_usage(journal_usage: u64) -> u64 {
    // Conservative multiplier; see the commentary above for why 4x was chosen.
    const JOURNAL_USAGE_MULTIPLIER: u64 = 4;
    journal_usage * JOURNAL_USAGE_MULTIPLIER
}
42
/// ObjectManager is a global loading cache for object stores and other special objects.
pub struct ObjectManager {
    // All mutable state (stores, allocator, journal/reservation bookkeeping), guarded by a lock.
    inner: RwLock<Inner>,
    // Space reserved with the allocator to cover flushing of journaled metadata; set exactly once
    // via `init_metadata_reservation`.
    metadata_reservation: OnceLock<Reservation>,
    // The volumes directory; set exactly once, either during `on_replay_complete` or via
    // `set_volume_directory`.
    volume_directory: OnceLock<Directory<ObjectStore>>,
    // Optional callback invoked whenever a new store is registered with this manager.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
}
50
// Whilst we are flushing we need to keep track of the old checkpoint that we are hoping to flush,
// and a new one that should apply if we successfully finish the flush.
#[derive(Debug)]
enum Checkpoints {
    // No flush in progress: the checkpoint of the object's earliest journaled mutation.
    Current(JournalCheckpoint),
    // A flush is in progress (BeginFlush seen) and no new mutations have arrived since it began.
    Old(JournalCheckpoint),
    // A flush is in progress and new mutations have arrived since it began; on EndFlush only the
    // current checkpoint is kept.
    Both(/* old: */ JournalCheckpoint, /* current: */ JournalCheckpoint),
}
59
60impl Checkpoints {
61    // Returns the earliest checkpoint (which will always be the old one if present).
62    fn earliest(&self) -> &JournalCheckpoint {
63        match self {
64            Checkpoints::Old(x) | Checkpoints::Both(x, _) | Checkpoints::Current(x) => x,
65        }
66    }
67}
68
// We currently maintain strong references to all stores that have been opened, but there's
// currently no mechanism for releasing stores that aren't being used.
struct Inner {
    // All opened stores, keyed by store object ID.
    stores: HashMap<u64, Arc<ObjectStore>>,
    root_parent_store_object_id: u64,
    root_store_object_id: u64,
    allocator_object_id: u64,
    // The allocator, if one has been set (see `ObjectManager::set_allocator`).
    allocator: Option<Arc<Allocator>>,

    // Records dependencies on the journal for objects i.e. an entry for object ID 1, would mean it
    // has a dependency on journal records from that offset.
    journal_checkpoints: HashMap<u64, Checkpoints>,

    // Mappings from object-id to a target reservation amount.  The object IDs here are from the
    // root store namespace, so it can be associated with any object in the root store.  A
    // reservation will be made to cover the *maximum* in this map, since it is assumed that any
    // requirement is only temporary, for the duration of a compaction, and that once compaction has
    // finished for a particular object, the space will be recovered.
    reservations: HashMap<u64, u64>,

    // The last journal end offset for a transaction that has been applied.  This is not necessarily
    // the same as the start offset for the next transaction because of padding.
    last_end_offset: u64,

    // A running counter that tracks metadata space that has been borrowed on the understanding that
    // eventually it will be recovered (potentially after a full compaction).
    borrowed_metadata_space: u64,

    // The maximum transaction size that has been encountered so far.  The second element mirrors
    // the value into an inspect property.
    max_transaction_size: (u64, UintProperty),

    // Extra temporary space that might be tied up in the journal that hasn't yet been deallocated.
    reserved_space: u64,
}
103
104impl Inner {
105    fn earliest_journal_offset(&self) -> Option<u64> {
106        self.journal_checkpoints.values().map(|c| c.earliest().file_offset).min()
107    }
108
109    // Returns the required size of the metadata reservation assuming that no space has been
110    // borrowed.  The invariant is: reservation-size + borrowed-space = required.
111    fn required_reservation(&self) -> u64 {
112        // Start with the maximum amount of temporary space we might need during compactions.
113        self.reservations.values().max().unwrap_or(&0)
114
115        // Account for data that has been written to the journal that will need to be written
116        // to layer files when flushed.
117            + self.earliest_journal_offset()
118            .map(|min| reserved_space_from_journal_usage(self.last_end_offset - min))
119            .unwrap_or(0)
120
121        // Extra reserved space
122            + self.reserved_space
123    }
124
125    fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
126        if object_id == self.allocator_object_id {
127            Some(self.allocator.clone().unwrap() as Arc<dyn JournalingObject>)
128        } else {
129            self.stores.get(&object_id).map(|x| x.clone() as Arc<dyn JournalingObject>)
130        }
131    }
132}
133
134impl ObjectManager {
135    pub fn new(on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>) -> ObjectManager {
136        ObjectManager {
137            inner: RwLock::new(Inner {
138                stores: HashMap::default(),
139                root_parent_store_object_id: INVALID_OBJECT_ID,
140                root_store_object_id: INVALID_OBJECT_ID,
141                allocator_object_id: INVALID_OBJECT_ID,
142                allocator: None,
143                journal_checkpoints: HashMap::default(),
144                reservations: HashMap::default(),
145                last_end_offset: 0,
146                borrowed_metadata_space: 0,
147                max_transaction_size: (0, metrics::detail().create_uint("max_transaction_size", 0)),
148                reserved_space: journal::RESERVED_SPACE,
149            }),
150            metadata_reservation: OnceLock::new(),
151            volume_directory: OnceLock::new(),
152            on_new_store,
153        }
154    }
155
156    pub fn required_reservation(&self) -> u64 {
157        self.inner.read().required_reservation()
158    }
159
160    pub fn root_parent_store_object_id(&self) -> u64 {
161        self.inner.read().root_parent_store_object_id
162    }
163
164    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
165        let inner = self.inner.read();
166        inner.stores.get(&inner.root_parent_store_object_id).unwrap().clone()
167    }
168
169    pub fn set_root_parent_store(&self, store: Arc<ObjectStore>) {
170        if let Some(on_new_store) = &self.on_new_store {
171            on_new_store(&store);
172        }
173        let mut inner = self.inner.write();
174        let store_id = store.store_object_id();
175        inner.stores.insert(store_id, store);
176        inner.root_parent_store_object_id = store_id;
177    }
178
179    pub fn root_store_object_id(&self) -> u64 {
180        self.inner.read().root_store_object_id
181    }
182
183    pub fn root_store(&self) -> Arc<ObjectStore> {
184        let inner = self.inner.read();
185        inner.stores.get(&inner.root_store_object_id).unwrap().clone()
186    }
187
188    pub fn set_root_store(&self, store: Arc<ObjectStore>) {
189        if let Some(on_new_store) = &self.on_new_store {
190            on_new_store(&store);
191        }
192        let mut inner = self.inner.write();
193        let store_id = store.store_object_id();
194        inner.stores.insert(store_id, store);
195        inner.root_store_object_id = store_id;
196    }
197
198    pub fn is_system_store(&self, store_id: u64) -> bool {
199        let inner = self.inner.read();
200        store_id == inner.root_store_object_id || store_id == inner.root_parent_store_object_id
201    }
202
203    /// Returns the store which might or might not be locked.
204    pub fn store(&self, store_object_id: u64) -> Option<Arc<ObjectStore>> {
205        self.inner.read().stores.get(&store_object_id).cloned()
206    }
207
    /// Opens the volumes directory and all volume stores, then creates the metadata reservation.
    ///
    /// This is not thread-safe: it assumes that a store won't be forgotten whilst the loop is
    /// running.  This is to be used after replaying the journal.
    pub async fn on_replay_complete(&self) -> Result<(), Error> {
        let root_store = self.root_store();

        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .context("Unable to open root volume directory")?;

        // The volumes directory lives inside the root directory; anything other than a directory
        // entry there means the filesystem is inconsistent.
        match root_directory.lookup(VOLUMES_DIRECTORY).await? {
            None => bail!("Root directory not found"),
            Some((object_id, ObjectDescriptor::Directory, _)) => {
                let volume_directory = Directory::open(&root_store, object_id)
                    .await
                    .context("Unable to open volumes directory")?;
                self.volume_directory.set(volume_directory).unwrap();
            }
            _ => {
                bail!(
                    anyhow!(FxfsError::Inconsistent)
                        .context("Unexpected type for volumes directory")
                )
            }
        }

        // Open every volume's store so that it is registered with the manager.
        let object_ids = list_volumes(self.volume_directory.get().unwrap())
            .await
            .context("Failed to list volumes")?;

        for store_id in object_ids {
            self.open_store(&root_store, store_id).await?;
        }

        // This can fail if a filesystem is created and truncated to a size
        // that doesn't leave enough free space for metadata reservations.
        self.init_metadata_reservation()
            .context("Insufficient free space for metadata reservation.")?;

        Ok(())
    }
248
    /// Returns the volumes directory.  Panics if it has not been set (see `set_volume_directory`
    /// and `on_replay_complete`).
    pub fn volume_directory(&self) -> &Directory<ObjectStore> {
        self.volume_directory.get().unwrap()
    }

    /// Sets the volumes directory.  Panics if it has already been set.
    pub fn set_volume_directory(&self, volume_directory: Directory<ObjectStore>) {
        self.volume_directory.set(volume_directory).unwrap();
    }
256
257    pub fn add_store(&self, store: Arc<ObjectStore>) {
258        if let Some(on_new_store) = &self.on_new_store {
259            on_new_store(&store);
260        }
261        let mut inner = self.inner.write();
262        let store_object_id = store.store_object_id();
263        assert_ne!(store_object_id, inner.root_parent_store_object_id);
264        assert_ne!(store_object_id, inner.root_store_object_id);
265        assert_ne!(store_object_id, inner.allocator_object_id);
266        inner.stores.insert(store_object_id, store);
267    }
268
269    pub fn forget_store(&self, store_object_id: u64) {
270        let mut inner = self.inner.write();
271        assert_ne!(store_object_id, inner.allocator_object_id);
272        inner.stores.remove(&store_object_id);
273        inner.reservations.remove(&store_object_id);
274    }
275
276    pub fn set_allocator(&self, allocator: Arc<Allocator>) {
277        let mut inner = self.inner.write();
278        assert!(!inner.stores.contains_key(&allocator.object_id()));
279        inner.allocator_object_id = allocator.object_id();
280        inner.allocator = Some(allocator);
281    }
282
283    pub fn allocator(&self) -> Arc<Allocator> {
284        self.inner.read().allocator.clone().unwrap()
285    }
286
    /// Applies `mutation` to `object` with `context`.
    ///
    /// As a side effect this maintains the per-object journal checkpoints (see `Checkpoints`)
    /// which record the journal offsets each object still depends on.
    pub fn apply_mutation(
        &self,
        object_id: u64,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        associated_object: AssocObj<'_>,
    ) -> Result<(), Error> {
        debug!(oid = object_id, mutation:?; "applying mutation");
        let object = {
            let mut inner = self.inner.write();
            match mutation {
                Mutation::BeginFlush => {
                    // A flush is starting: demote the current checkpoint to "old" so that
                    // mutations arriving during the flush are tracked separately.
                    if let Some(entry) = inner.journal_checkpoints.get_mut(&object_id) {
                        match entry {
                            Checkpoints::Current(x) | Checkpoints::Both(x, _) => {
                                *entry = Checkpoints::Old(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::EndFlush => {
                    // The flush finished: drop the old checkpoint.  If mutations arrived during
                    // the flush (Both), keep only the current checkpoint.
                    if let Entry::Occupied(mut o) = inner.journal_checkpoints.entry(object_id) {
                        let entry = o.get_mut();
                        match entry {
                            Checkpoints::Old(_) => {
                                o.remove();
                            }
                            Checkpoints::Both(_, x) => {
                                *entry = Checkpoints::Current(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::DeleteVolume => {
                    // Deleting a volume drops all of the manager's state for the store; there is
                    // no object left to forward the mutation to, so return early.
                    if let Some(store) = inner.stores.remove(&object_id) {
                        store.mark_deleted();
                    }
                    inner.reservations.remove(&object_id);
                    inner.journal_checkpoints.remove(&object_id);
                    return Ok(());
                }
                _ => {
                    // Any other mutation introduces a dependency on the journal from this
                    // checkpoint onwards.  The root parent store is excluded here;
                    // NOTE(review): presumably because it is reconstructed from the journal on
                    // replay — confirm.
                    if object_id != inner.root_parent_store_object_id {
                        inner
                            .journal_checkpoints
                            .entry(object_id)
                            .and_modify(|entry| {
                                if let Checkpoints::Old(x) = entry {
                                    *entry =
                                        Checkpoints::Both(x.clone(), context.checkpoint.clone());
                                }
                            })
                            .or_insert_with(|| Checkpoints::Current(context.checkpoint.clone()));
                    }
                }
            }
            // Resolve the target object (the allocator or a store) whilst still holding the lock.
            if object_id == inner.allocator_object_id {
                inner.allocator.clone().unwrap() as Arc<dyn JournalingObject>
            } else {
                inner.stores.get(&object_id).unwrap().clone() as Arc<dyn JournalingObject>
            }
        };
        // Give the associated object a chance to observe the mutation before it is applied.
        associated_object.map(|o| o.will_apply_mutation(&mutation, object_id, self));
        object.apply_mutation(mutation, context, associated_object)
    }
355
    /// Replays `mutations` for a single transaction.  `journal_offsets` contains the per-object
    /// starting offsets; if the current transaction offset precedes an offset, the mutations for
    /// that object are ignored.  `context` contains the location in the journal file for this
    /// transaction and `end_offset` is the ending journal offset for this transaction.
    pub async fn replay_mutations(
        &self,
        mutations: Vec<(u64, Mutation)>,
        journal_offsets: &HashMap<u64, u64>,
        context: &ApplyContext<'_, '_>,
        end_offset: u64,
    ) -> Result<(), Error> {
        debug!(checkpoint = context.checkpoint.file_offset; "REPLAY");
        // Advance last_end_offset, remembering how much journal space this transaction consumed
        // (None when the offset did not advance).
        let txn_size = {
            let mut inner = self.inner.write();
            if end_offset > inner.last_end_offset {
                Some(end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset))
            } else {
                None
            }
        };

        let allocator_object_id = self.inner.read().allocator_object_id;

        for (object_id, mutation) in mutations {
            // UpdateBorrowed mutations are consumed here rather than forwarded: reconstruct
            // borrowed_metadata_space, adding the space this transaction's own journal usage
            // requires.  The checked_add guards against a corrupt journal value.
            if let Mutation::UpdateBorrowed(borrowed) = mutation {
                if let Some(txn_size) = txn_size {
                    self.inner.write().borrowed_metadata_space = borrowed
                        .checked_add(reserved_space_from_journal_usage(txn_size))
                        .ok_or(FxfsError::Inconsistent)?;
                }
                continue;
            }

            // Don't replay mutations if the object doesn't want it.
            if let Some(&offset) = journal_offsets.get(&object_id) {
                if context.checkpoint.file_offset < offset {
                    continue;
                }
            }

            // If this is the first time we've encountered this store, we'll need to open it.
            if object_id != allocator_object_id {
                self.open_store(&self.root_store(), object_id).await?;
            }

            self.apply_mutation(object_id, mutation, context, AssocObj::None)?;
        }
        Ok(())
    }
405
406    /// Opens the specified store if it isn't already.  This is *not* thread-safe.
407    async fn open_store(&self, parent: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error> {
408        if self.inner.read().stores.contains_key(&object_id) {
409            return Ok(());
410        }
411        let store = ObjectStore::open(parent, object_id, Box::new(TreeCache::new()))
412            .await
413            .with_context(|| format!("Failed to open store {object_id}"))?;
414        if let Some(on_new_store) = &self.on_new_store {
415            on_new_store(&store);
416        }
417        assert!(self.inner.write().stores.insert(object_id, store).is_none());
418        Ok(())
419    }
420
    /// Called by the journaling system to apply a transaction.  `checkpoint` indicates the location
    /// in the journal file for this transaction.  Returns an optional mutation to be written to be
    /// included with the transaction.
    pub fn apply_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        checkpoint: &JournalCheckpoint,
    ) -> Result<Option<Mutation>, Error> {
        // Record old values so we can see what changes as a result of this transaction.
        let old_amount = self.metadata_reservation().amount();
        let old_required = self.inner.read().required_reservation();

        debug!(checkpoint = checkpoint.file_offset; "BEGIN TXN");
        let mutations = transaction.take_mutations();
        let context =
            ApplyContext { mode: ApplyMode::Live(transaction), checkpoint: checkpoint.clone() };
        for TxnMutation { object_id, mutation, associated_object, .. } in mutations {
            self.apply_mutation(object_id, mutation, &context, associated_object)?;
        }
        debug!("END TXN");

        Ok(if let MetadataReservation::Borrowed = transaction.metadata_reservation {
            // If this transaction is borrowing metadata, figure out what has changed and return a
            // mutation with the updated value for borrowed.  The transaction might have allocated
            // or deallocated some data from the metadata reservation, or it might have made a
            // change that means we need to reserve more or less space (e.g. we compacted).
            let new_amount = self.metadata_reservation().amount();
            let mut inner = self.inner.write();
            let new_required = inner.required_reservation();
            // borrowed += (old_amount - new_amount) + (new_required - old_required), computed as a
            // single delta so intermediate terms cannot underflow.
            let add = old_amount + new_required;
            let sub = new_amount + old_required;
            if add >= sub {
                inner.borrowed_metadata_space += add - sub;
            } else {
                inner.borrowed_metadata_space =
                    inner.borrowed_metadata_space.saturating_sub(sub - add);
            }
            Some(Mutation::UpdateBorrowed(inner.borrowed_metadata_space))
        } else {
            // This transaction should have had no impact on the metadata reservation or the amount
            // we need to reserve.
            debug_assert_eq!(self.metadata_reservation().amount(), old_amount);
            debug_assert_eq!(self.inner.read().required_reservation(), old_required);
            None
        })
    }
467
    /// Called by the journaling system after a transaction has been written providing the end
    /// offset for the transaction so that we can adjust borrowed metadata space accordingly.
    pub fn did_commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        _checkpoint: &JournalCheckpoint,
        end_offset: u64,
    ) {
        let reservation = self.metadata_reservation();
        let mut inner = self.inner.write();
        // The journal space consumed by this transaction (including any padding since the previous
        // end offset).
        let journal_usage = end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset);

        // Track the largest transaction seen so far, mirroring it into the inspect property.
        if journal_usage > inner.max_transaction_size.0 {
            inner.max_transaction_size.0 = journal_usage;
            inner.max_transaction_size.1.set(journal_usage);
        }

        let txn_space = reserved_space_from_journal_usage(journal_usage);
        match &mut transaction.metadata_reservation {
            MetadataReservation::None => unreachable!(),
            MetadataReservation::Borrowed => {
                // Account for the amount we need to borrow for the transaction itself now that we
                // know the transaction size.
                inner.borrowed_metadata_space += txn_space;

                // This transaction borrowed metadata space, but it might have returned space to the
                // transaction that we can now give back to the allocator.
                let to_give_back = (reservation.amount() + inner.borrowed_metadata_space)
                    .saturating_sub(inner.required_reservation());
                if to_give_back > 0 {
                    reservation.give_back(to_give_back);
                }
            }
            MetadataReservation::Hold(hold_amount) => {
                // Transfer reserved space into the metadata reservation.
                let txn_reservation = transaction.allocator_reservation.unwrap();
                assert_ne!(
                    txn_reservation as *const _, reservation as *const _,
                    "MetadataReservation::Borrowed should be used."
                );
                txn_reservation.commit(txn_space);
                // If the transaction's reservation is attributed to an owner but the metadata
                // reservation is not, the allocator's accounting must be moved across.
                if txn_reservation.owner_object_id() != reservation.owner_object_id() {
                    assert_eq!(
                        reservation.owner_object_id(),
                        None,
                        "Should not be mixing attributed owners."
                    );
                    inner
                        .allocator
                        .as_ref()
                        .unwrap()
                        .disown_reservation(txn_reservation.owner_object_id(), txn_space);
                }
                // Deduct the transaction's size from the hold; it must fit.
                if let Some(amount) = hold_amount.checked_sub(txn_space) {
                    *hold_amount = amount;
                } else {
                    panic!("Transaction was larger than metadata reservation");
                }
                reservation.add(txn_space);
            }
            MetadataReservation::Reservation(txn_reservation) => {
                // Transfer reserved space into the metadata reservation.
                txn_reservation.move_to(reservation, txn_space);
            }
        }
        // Check that our invariant holds true.
        debug_assert_eq!(
            reservation.amount() + inner.borrowed_metadata_space,
            inner.required_reservation(),
            "txn_space: {}, reservation_amount: {}, borrowed: {}, required: {}",
            txn_space,
            reservation.amount(),
            inner.borrowed_metadata_space,
            inner.required_reservation(),
        );
    }
544
545    /// Drops a transaction.  This is called automatically when a transaction is dropped.  If the
546    /// transaction has been committed, it should contain no mutations and so nothing will get rolled
547    /// back.  For each mutation, drop_mutation is called to allow for roll back (e.g. the allocator
548    /// will unreserve allocations).
549    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
550        for TxnMutation { object_id, mutation, .. } in transaction.take_mutations() {
551            self.object(object_id).map(|o| o.drop_mutation(mutation, transaction));
552        }
553    }
554
555    /// Returns the journal file offsets that each object depends on and the checkpoint for the
556    /// minimum offset.
557    pub fn journal_file_offsets(&self) -> (HashMap<u64, u64>, Option<JournalCheckpoint>) {
558        let inner = self.inner.read();
559        let mut min_checkpoint = None;
560        let mut offsets = HashMap::default();
561        for (&object_id, checkpoint) in &inner.journal_checkpoints {
562            let checkpoint = checkpoint.earliest();
563            match &mut min_checkpoint {
564                None => min_checkpoint = Some(checkpoint),
565                Some(min_checkpoint) => {
566                    if checkpoint.file_offset < min_checkpoint.file_offset {
567                        *min_checkpoint = checkpoint;
568                    }
569                }
570            }
571            offsets.insert(object_id, checkpoint.file_offset);
572        }
573        (offsets, min_checkpoint.cloned())
574    }
575
576    /// Returns the checkpoint into the journal that the object depends on, or None if the object
577    /// has no journaled updates.
578    pub fn journal_checkpoint(&self, object_id: u64) -> Option<JournalCheckpoint> {
579        self.inner
580            .read()
581            .journal_checkpoints
582            .get(&object_id)
583            .map(|checkpoints| checkpoints.earliest().clone())
584    }
585
586    /// Returns true if the object identified by `object_id` is known to have updates recorded in
587    /// the journal that the object depends upon.
588    pub fn needs_flush(&self, object_id: u64) -> bool {
589        self.inner.read().journal_checkpoints.contains_key(&object_id)
590    }
591
592    /// Flushes all known objects.  This will then allow the journal space to be freed.
593    ///
594    /// Also returns the earliest known version of a struct on the filesystem.
595    pub async fn flush(&self) -> Result<Version, Error> {
596        let objects = {
597            let inner = self.inner.read();
598            let mut object_ids = inner.journal_checkpoints.keys().cloned().collect::<Vec<_>>();
599            // Process objects in reverse sorted order because that will mean we compact the root
600            // object store last which will ensure we include the metadata from the compactions of
601            // other objects.
602            object_ids.sort_unstable();
603            object_ids
604                .iter()
605                .rev()
606                .map(|oid| (*oid, inner.object(*oid).unwrap()))
607                .collect::<Vec<_>>()
608        };
609
610        // As we iterate, keep track of the earliest version used by structs in these objects
611        let mut earliest_version: Version = LATEST_VERSION;
612        for (object_id, object) in objects {
613            let object_earliest_version =
614                object.flush().await.with_context(|| format!("Failed to flush oid {object_id}"))?;
615            if object_earliest_version < earliest_version {
616                earliest_version = object_earliest_version;
617            }
618        }
619
620        Ok(earliest_version)
621    }
622
623    fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
624        self.inner.read().object(object_id)
625    }
626
627    pub fn init_metadata_reservation(&self) -> Result<(), Error> {
628        let inner = self.inner.read();
629        let required = inner.required_reservation();
630        ensure!(required >= inner.borrowed_metadata_space, FxfsError::Inconsistent);
631        let allocator = inner.allocator.as_ref().cloned().unwrap();
632        self.metadata_reservation
633            .set(
634                allocator
635                    .clone()
636                    .reserve(None, inner.required_reservation() - inner.borrowed_metadata_space)
637                    .with_context(|| {
638                        format!(
639                            "Failed to reserve {} - {} = {} bytes, free={}, \
640                             owner_bytes={}",
641                            inner.required_reservation(),
642                            inner.borrowed_metadata_space,
643                            inner.required_reservation() - inner.borrowed_metadata_space,
644                            Saturating(allocator.get_disk_bytes()) - allocator.get_used_bytes(),
645                            allocator.owner_bytes_debug(),
646                        )
647                    })?,
648            )
649            .unwrap();
650        Ok(())
651    }
652
653    pub fn metadata_reservation(&self) -> &Reservation {
654        self.metadata_reservation.get().unwrap()
655    }
656
657    pub fn update_reservation(&self, object_id: u64, amount: u64) {
658        self.inner.write().reservations.insert(object_id, amount);
659    }
660
661    pub fn reservation(&self, object_id: u64) -> Option<u64> {
662        self.inner.read().reservations.get(&object_id).cloned()
663    }
664
665    pub fn set_reserved_space(&self, amount: u64) {
666        self.inner.write().reserved_space = amount;
667    }
668
669    pub fn last_end_offset(&self) -> u64 {
670        self.inner.read().last_end_offset
671    }
672
673    pub fn set_last_end_offset(&self, v: u64) {
674        self.inner.write().last_end_offset = v;
675    }
676
677    pub fn borrowed_metadata_space(&self) -> u64 {
678        self.inner.read().borrowed_metadata_space
679    }
680
681    pub fn set_borrowed_metadata_space(&self, v: u64) {
682        self.inner.write().borrowed_metadata_space = v;
683    }
684
685    pub fn write_mutation(&self, object_id: u64, mutation: &Mutation, writer: journal::Writer<'_>) {
686        self.object(object_id).unwrap().write_mutation(mutation, writer);
687    }
688
689    pub fn unlocked_stores(&self) -> Vec<Arc<ObjectStore>> {
690        let inner = self.inner.read();
691        let mut stores = Vec::new();
692        for store in inner.stores.values() {
693            if !store.is_locked() {
694                stores.push(store.clone());
695            }
696        }
697        stores
698    }
699
    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
    /// object manager when queried.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        // Hold a weak reference so that the inspect node does not keep the manager alive.
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    // Gather everything needed under a single read of the lock.
                    let (required, borrowed, earliest_checkpoint) = {
                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
                        let inner = this.inner.read();
                        (
                            inner.required_reservation(),
                            inner.borrowed_metadata_space,
                            inner.earliest_journal_offset(),
                        )
                    };
                    let root = inspector.root();
                    root.record_uint("metadata_reservation", this.metadata_reservation().amount());
                    root.record_uint("required_reservation", required);
                    root.record_uint("borrowed_reservation", borrowed);
                    if let Some(earliest_checkpoint) = earliest_checkpoint {
                        root.record_uint("earliest_checkpoint", earliest_checkpoint);
                    }

                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
                    if let Some(x) = round_div(100 * borrowed, required) {
                        root.record_uint("borrowed_to_required_reservation_percent", x);
                    }
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
736
737    /// Normally, we make new transactions pay for overheads incurred by the journal, such as
738    /// checksums and padding, but if the journal has discarded a significant amount after a replay,
739    /// we run the risk of there not being enough reserved.  To handle this, if the amount is
740    /// significant, we force the journal to borrow the space (using a journal created transaction).
741    pub fn needs_borrow_for_journal(&self, checkpoint: u64) -> bool {
742        checkpoint.checked_sub(self.inner.read().last_end_offset).unwrap() > 256
743    }
744}
745
/// ReservationUpdate is an associated object that sets the amount reserved for an object
/// (overwriting any previous amount). Updates must be applied as part of a transaction before
/// did_commit_transaction runs because it will reconcile the accounting for reserved metadata
/// space.
pub struct ReservationUpdate(u64);

impl ReservationUpdate {
    /// Creates a new update that will set the reservation to `amount` bytes.  `const` so it can
    /// also be used in constant contexts.
    pub const fn new(amount: u64) -> Self {
        Self(amount)
    }
}
757
impl AssociatedObject for ReservationUpdate {
    // Invoked as the associated mutation is applied (see ObjectManager::apply_mutation); records
    // the new reservation amount for the object via ObjectManager::update_reservation.
    fn will_apply_mutation(&self, _mutation: &Mutation, object_id: u64, manager: &ObjectManager) {
        manager.update_reservation(object_id, self.0);
    }
}