Skip to main content

fxfs/object_store/
object_manager.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::filesystem::{ApplyContext, ApplyMode, JournalingObject};
7use crate::log::*;
8use crate::metrics;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::allocator::{Allocator, Reservation};
11use crate::object_store::directory::Directory;
12use crate::object_store::journal::{self, JournalCheckpoint};
13use crate::object_store::transaction::{
14    AssocObj, AssociatedObject, MetadataReservation, Mutation, Transaction, TxnMutation,
15};
16use crate::object_store::tree_cache::TreeCache;
17use crate::object_store::volume::{VOLUMES_DIRECTORY, list_volumes};
18use crate::object_store::{ObjectDescriptor, ObjectStore};
19use crate::round::round_div;
20use crate::serialized_types::{LATEST_VERSION, Version};
21use anyhow::{Context, Error, anyhow, bail, ensure};
22use fuchsia_inspect::{Property as _, UintProperty};
23use fuchsia_sync::RwLock;
24use futures::FutureExt as _;
25use rustc_hash::FxHashMap as HashMap;
26use std::collections::hash_map::Entry;
27use std::num::Saturating;
28use std::sync::{Arc, OnceLock};
29
/// Returns how much space must be reserved to cover `journal_usage` bytes written to the journal.
///
/// Data written to the journal eventually needs to be flushed somewhere (typically into layer
/// files).  Here we conservatively assume that could take up to four times as much space as it
/// does in the journal.  In the layer file, it'll take up at least as much, but we must reserve
/// the same again so that there's enough space for compactions, and then we need some spare for
/// overheads.
///
/// TODO(https://fxbug.dev/42178158): We should come up with a better way of determining what the
/// multiplier should be here.  2x was too low, as it didn't cover any space for metadata.  4x
/// might be too much.
pub const fn reserved_space_from_journal_usage(journal_usage: u64) -> u64 {
    // Journal bytes -> bytes that must be held back for eventual flushing/compaction.
    const JOURNAL_TO_RESERVED_MULTIPLIER: u64 = 4;
    JOURNAL_TO_RESERVED_MULTIPLIER * journal_usage
}
42
/// ObjectManager is a global loading cache for object stores and other special objects.
///
/// It holds strong references to every registered object store and to the allocator, tracks the
/// journal offsets each object still depends on, and keeps the accounting for the
/// filesystem-wide metadata reservation.
pub struct ObjectManager {
    // Mutable bookkeeping state, guarded by a reader/writer lock.
    inner: RwLock<Inner>,
    // The metadata reservation.  It is set once by `init_metadata_reservation` and is left unset
    // in read-only mode.
    metadata_reservation: OnceLock<Reservation>,
    // The root store's volumes directory.  It is set once, either by `on_replay_complete` or by
    // `set_volume_directory`.
    volume_directory: OnceLock<Directory<ObjectStore>>,
    // Optional callback invoked with each object store as it is registered with the manager.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
}
50
/// The journal checkpoint(s) that an object currently depends on.
///
/// Whilst we are flushing we need to keep track of the old checkpoint that we are hoping to
/// flush, and a new one that should apply if we successfully finish the flush.
#[derive(Debug)]
enum Checkpoints {
    /// No flush is being tracked; the object depends on journal records from this checkpoint on.
    Current(JournalCheckpoint),
    /// A flush has begun and no further mutations have been applied to the object since.
    Old(JournalCheckpoint),
    /// A flush has begun and further mutations have been applied since.  If the flush completes,
    /// only the second (current) checkpoint remains as a dependency.
    Both(/* old: */ JournalCheckpoint, /* current: */ JournalCheckpoint),
}
59
60impl Checkpoints {
61    // Returns the earliest checkpoint (which will always be the old one if present).
62    fn earliest(&self) -> &JournalCheckpoint {
63        match self {
64            Checkpoints::Old(x) | Checkpoints::Both(x, _) | Checkpoints::Current(x) => x,
65        }
66    }
67}
68
// We currently maintain strong references to all stores that have been opened, but there's
// currently no mechanism for automatically releasing stores that aren't being used.  Stores are
// only removed explicitly, via `forget_store` or a `DeleteVolume` mutation.
struct Inner {
    // Every registered object store, keyed by store object ID.  This includes the root parent
    // store and the root store.
    stores: HashMap<u64, Arc<ObjectStore>>,
    // Object IDs of the root parent store, the root store and the allocator.  Each stays
    // `INVALID_OBJECT_ID` until the corresponding setter is called.
    root_parent_store_object_id: u64,
    root_store_object_id: u64,
    allocator_object_id: u64,
    // The allocator.  It is `None` until `set_allocator` is called.
    allocator: Option<Arc<Allocator>>,

    // Records dependencies on the journal for objects.  For example, an entry for object ID 1
    // means that object depends on journal records from the entry's earliest checkpoint onwards.
    journal_checkpoints: HashMap<u64, Checkpoints>,

    // Mappings from object-id to a target reservation amount.  The object IDs here are from the
    // root store namespace, so it can be associated with any object in the root store.  A
    // reservation will be made to cover the *maximum* in this map, since it is assumed that any
    // requirement is only temporary, for the duration of a compaction, and that once compaction has
    // finished for a particular object, the space will be recovered.
    reservations: HashMap<u64, u64>,

    // The last journal end offset for a transaction that has been applied.  This is not necessarily
    // the same as the start offset for the next transaction because of padding.
    last_end_offset: u64,

    // A running counter that tracks metadata space that has been borrowed on the understanding that
    // eventually it will be recovered (potentially after a full compaction).
    borrowed_metadata_space: u64,

    // The maximum transaction size (journal bytes) that has been encountered so far, paired with
    // the inspect property that mirrors it.
    max_transaction_size: (u64, UintProperty),

    // Extra temporary space that might be tied up in the journal that hasn't yet been deallocated.
    // It is initialised to `journal::RESERVED_SPACE` and can be changed via `set_reserved_space`.
    reserved_space: u64,
}
103
104impl Inner {
105    fn earliest_journal_offset(&self) -> Option<u64> {
106        self.journal_checkpoints.values().map(|c| c.earliest().file_offset).min()
107    }
108
109    // Returns the required size of the metadata reservation assuming that no space has been
110    // borrowed.  The invariant is: reservation-size + borrowed-space = required.
111    fn required_reservation(&self) -> u64 {
112        // Start with the maximum amount of temporary space we might need during compactions.
113        self.reservations.values().max().unwrap_or(&0)
114
115        // Account for data that has been written to the journal that will need to be written
116        // to layer files when flushed.
117            + self.earliest_journal_offset()
118            .map(|min| reserved_space_from_journal_usage(self.last_end_offset - min))
119            .unwrap_or(0)
120
121        // Extra reserved space
122            + self.reserved_space
123    }
124
125    fn journaling_object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
126        if object_id == self.allocator_object_id {
127            Some(self.allocator.clone().unwrap() as Arc<dyn JournalingObject>)
128        } else {
129            self.stores.get(&object_id).map(|x| x.clone() as Arc<dyn JournalingObject>)
130        }
131    }
132}
133
134impl ObjectManager {
135    pub fn new(on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>) -> ObjectManager {
136        ObjectManager {
137            inner: RwLock::new(Inner {
138                stores: HashMap::default(),
139                root_parent_store_object_id: INVALID_OBJECT_ID,
140                root_store_object_id: INVALID_OBJECT_ID,
141                allocator_object_id: INVALID_OBJECT_ID,
142                allocator: None,
143                journal_checkpoints: HashMap::default(),
144                reservations: HashMap::default(),
145                last_end_offset: 0,
146                borrowed_metadata_space: 0,
147                max_transaction_size: (0, metrics::detail().create_uint("max_transaction_size", 0)),
148                reserved_space: journal::RESERVED_SPACE,
149            }),
150            metadata_reservation: OnceLock::new(),
151            volume_directory: OnceLock::new(),
152            on_new_store,
153        }
154    }
155
156    pub fn required_reservation(&self) -> u64 {
157        self.inner.read().required_reservation()
158    }
159
160    pub fn root_parent_store_object_id(&self) -> u64 {
161        self.inner.read().root_parent_store_object_id
162    }
163
164    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
165        let inner = self.inner.read();
166        inner.stores.get(&inner.root_parent_store_object_id).unwrap().clone()
167    }
168
169    pub fn set_root_parent_store(&self, store: Arc<ObjectStore>) {
170        if let Some(on_new_store) = &self.on_new_store {
171            on_new_store(&store);
172        }
173        let mut inner = self.inner.write();
174        let store_id = store.store_object_id();
175        inner.stores.insert(store_id, store);
176        inner.root_parent_store_object_id = store_id;
177    }
178
179    pub fn root_store_object_id(&self) -> u64 {
180        self.inner.read().root_store_object_id
181    }
182
183    pub fn root_store(&self) -> Arc<ObjectStore> {
184        let inner = self.inner.read();
185        inner.stores.get(&inner.root_store_object_id).unwrap().clone()
186    }
187
188    pub fn set_root_store(&self, store: Arc<ObjectStore>) {
189        if let Some(on_new_store) = &self.on_new_store {
190            on_new_store(&store);
191        }
192        let mut inner = self.inner.write();
193        let store_id = store.store_object_id();
194        inner.stores.insert(store_id, store);
195        inner.root_store_object_id = store_id;
196    }
197
198    pub fn is_system_store(&self, store_id: u64) -> bool {
199        let inner = self.inner.read();
200        store_id == inner.root_store_object_id || store_id == inner.root_parent_store_object_id
201    }
202
203    /// Returns the store which might or might not be locked.
204    pub fn store(&self, store_object_id: u64) -> Option<Arc<ObjectStore>> {
205        self.inner.read().stores.get(&store_object_id).cloned()
206    }
207
208    /// Returns the total bytes written to the LSM trees in the allocator and all object stores
209    /// during compaction operations.
210    pub fn compaction_bytes_written(&self) -> u64 {
211        let inner = self.inner.read();
212        let mut total = 0;
213        if let Some(allocator) = &inner.allocator {
214            total += allocator.tree().compaction_bytes_written();
215        }
216        for store in inner.stores.values() {
217            total += store.tree().compaction_bytes_written();
218        }
219        total
220    }
221
222    /// This is not thread-safe: it assumes that a store won't be forgotten whilst the loop is
223    /// running.  This is to be used after replaying the journal.
224    pub async fn on_replay_complete(&self) -> Result<(), Error> {
225        let root_store = self.root_store();
226
227        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
228            .await
229            .context("Unable to open root volume directory")?;
230
231        match root_directory.lookup(VOLUMES_DIRECTORY).await? {
232            None => bail!("Root directory not found"),
233            Some((object_id, ObjectDescriptor::Directory, _)) => {
234                let volume_directory = Directory::open(&root_store, object_id)
235                    .await
236                    .context("Unable to open volumes directory")?;
237                self.volume_directory.set(volume_directory).unwrap();
238            }
239            _ => {
240                bail!(
241                    anyhow!(FxfsError::Inconsistent)
242                        .context("Unexpected type for volumes directory")
243                )
244            }
245        }
246
247        let object_ids = list_volumes(self.volume_directory.get().unwrap())
248            .await
249            .context("Failed to list volumes")?;
250
251        for store_id in object_ids {
252            self.open_store(&root_store, store_id).await?;
253        }
254
255        // This can fail if a filesystem is created and truncated to a size
256        // that doesn't leave enough free space for metadata reservations.
257        self.init_metadata_reservation()
258            .context("Insufficient free space for metadata reservation.")?;
259
260        Ok(())
261    }
262
263    pub fn volume_directory(&self) -> &Directory<ObjectStore> {
264        self.volume_directory.get().unwrap()
265    }
266
267    pub fn set_volume_directory(&self, volume_directory: Directory<ObjectStore>) {
268        self.volume_directory.set(volume_directory).unwrap();
269    }
270
271    pub fn add_store(&self, store: Arc<ObjectStore>) {
272        if let Some(on_new_store) = &self.on_new_store {
273            on_new_store(&store);
274        }
275        let mut inner = self.inner.write();
276        let store_object_id = store.store_object_id();
277        assert_ne!(store_object_id, inner.root_parent_store_object_id);
278        assert_ne!(store_object_id, inner.root_store_object_id);
279        assert_ne!(store_object_id, inner.allocator_object_id);
280        inner.stores.insert(store_object_id, store);
281    }
282
283    pub fn forget_store(&self, store_object_id: u64) {
284        let mut inner = self.inner.write();
285        assert_ne!(store_object_id, inner.allocator_object_id);
286        inner.stores.remove(&store_object_id);
287        inner.reservations.remove(&store_object_id);
288    }
289
290    pub fn set_allocator(&self, allocator: Arc<Allocator>) {
291        let mut inner = self.inner.write();
292        assert!(!inner.stores.contains_key(&allocator.object_id()));
293        inner.allocator_object_id = allocator.object_id();
294        inner.allocator = Some(allocator);
295    }
296
297    pub fn allocator(&self) -> Arc<Allocator> {
298        self.inner.read().allocator.clone().unwrap()
299    }
300
    /// Applies `mutation` to the journaling object identified by `object_id`, with `context`.
    ///
    /// Before the mutation is forwarded to the object, the object's journal-checkpoint
    /// bookkeeping is updated:
    ///
    /// * `BeginFlush`:
    ///   - A `Current` or `Both` entry becomes `Old`, keeping its first (earliest) checkpoint,
    ///     because the flush that is starting must still cover it.
    /// * `EndFlush`:
    ///   - An `Old` entry is removed entirely.
    ///   - A `Both` entry becomes `Current`, so only what was applied after the flush began
    ///     remains a dependency.
    /// * `DeleteVolume`:
    ///   - The store is removed and marked deleted, together with its reservation and its
    ///     checkpoints.
    ///   - The function then returns without forwarding the mutation to any object.
    /// * Any other mutation records `context.checkpoint` as a dependency, except for the root
    ///   parent store, which is never tracked:
    ///   - A missing entry becomes `Current`.
    ///   - An `Old` entry becomes `Both`.
    ///   - Existing `Current`/`Both` entries are left unchanged.
    ///
    /// If an associated object is supplied, its `will_apply_mutation` hook runs before the
    /// object applies the mutation.
    ///
    /// # Errors
    ///
    /// Returns any error produced by the object's own `apply_mutation`.
    ///
    /// # Panics
    ///
    /// Panics if `object_id` does not resolve to a registered journaling object.  This does not
    /// apply to `DeleteVolume`, which returns before the lookup.
    pub fn apply_mutation(
        &self,
        object_id: u64,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        associated_object: AssocObj<'_>,
    ) -> Result<(), Error> {
        debug!(oid = object_id, mutation:?; "applying mutation");
        // Update the checkpoint bookkeeping and resolve the target object under the write lock.
        // The lock is released at the end of this block, before the object applies the mutation.
        let object = {
            let mut inner = self.inner.write();
            match mutation {
                Mutation::BeginFlush => {
                    if let Some(entry) = inner.journal_checkpoints.get_mut(&object_id) {
                        match entry {
                            Checkpoints::Current(x) | Checkpoints::Both(x, _) => {
                                *entry = Checkpoints::Old(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::EndFlush => {
                    if let Entry::Occupied(mut o) = inner.journal_checkpoints.entry(object_id) {
                        let entry = o.get_mut();
                        match entry {
                            Checkpoints::Old(_) => {
                                o.remove();
                            }
                            Checkpoints::Both(_, x) => {
                                *entry = Checkpoints::Current(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::DeleteVolume => {
                    if let Some(store) = inner.stores.remove(&object_id) {
                        store.mark_deleted();
                    }
                    inner.reservations.remove(&object_id);
                    inner.journal_checkpoints.remove(&object_id);
                    return Ok(());
                }
                _ => {
                    // The root parent store's mutations are never recorded as journal
                    // dependencies.
                    if object_id != inner.root_parent_store_object_id {
                        inner
                            .journal_checkpoints
                            .entry(object_id)
                            .and_modify(|entry| {
                                if let Checkpoints::Old(x) = entry {
                                    *entry =
                                        Checkpoints::Both(x.clone(), context.checkpoint.clone());
                                }
                            })
                            .or_insert_with(|| Checkpoints::Current(context.checkpoint.clone()));
                    }
                }
            }
            inner.journaling_object(object_id).unwrap()
        };
        associated_object.map(|o| o.will_apply_mutation(&mutation, object_id, self));
        object.apply_mutation(mutation, context, associated_object)
    }
365
    /// Replays `mutations` for a single transaction.  `journal_offsets` contains the per-object
    /// starting offsets; if the current transaction offset precedes an offset, the mutations for
    /// that object are ignored.  `context` contains the location in the journal file for this
    /// transaction and `end_offset` is the ending journal offset for this transaction.
    ///
    /// `UpdateBorrowed` mutations are not applied to any object.  Instead they restore the
    /// borrowed-metadata-space counter, adding the reservation for this transaction's own journal
    /// usage.  Stores referenced by the transaction are opened on demand.
    ///
    /// # Errors
    ///
    /// Returns:
    /// - `FxfsError::Inconsistent` if the restored borrowed amount overflows;
    /// - any error from opening a store or applying a mutation.
    pub async fn replay_mutations(
        &self,
        mutations: Vec<(u64, Mutation)>,
        journal_offsets: &HashMap<u64, u64>,
        context: &ApplyContext<'_, '_>,
        end_offset: u64,
    ) -> Result<(), Error> {
        debug!(checkpoint = context.checkpoint.file_offset; "REPLAY");
        // Advance `last_end_offset`.  The transaction's size is only known (and only counted)
        // when it ends beyond everything replayed so far.
        let txn_size = {
            let mut inner = self.inner.write();
            if end_offset > inner.last_end_offset {
                Some(end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset))
            } else {
                None
            }
        };

        let allocator_object_id = self.inner.read().allocator_object_id;

        for (object_id, mutation) in mutations {
            if let Mutation::UpdateBorrowed(borrowed) = mutation {
                // The recorded value was produced before the transaction was written, so it does
                // not yet include the space for this transaction's own journal usage (live
                // operation adds that in `did_commit_transaction`).
                if let Some(txn_size) = txn_size {
                    self.inner.write().borrowed_metadata_space = borrowed
                        .checked_add(reserved_space_from_journal_usage(txn_size))
                        .ok_or(FxfsError::Inconsistent)?;
                }
                continue;
            }

            // Don't replay mutations if the object doesn't want it.
            if let Some(&offset) = journal_offsets.get(&object_id) {
                if context.checkpoint.file_offset < offset {
                    continue;
                }
            }

            // If this is the first time we've encountered this store, we'll need to open it.
            if object_id != allocator_object_id {
                self.open_store(&self.root_store(), object_id).await?;
            }

            self.apply_mutation(object_id, mutation, context, AssocObj::None)?;
        }
        Ok(())
    }
415
416    /// Opens the specified store if it isn't already.  This is *not* thread-safe.
417    async fn open_store(&self, parent: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error> {
418        if self.inner.read().stores.contains_key(&object_id) {
419            return Ok(());
420        }
421        let store = ObjectStore::open(parent, object_id, Box::new(TreeCache::new()))
422            .await
423            .with_context(|| format!("Failed to open store {object_id}"))?;
424        if let Some(on_new_store) = &self.on_new_store {
425            on_new_store(&store);
426        }
427        assert!(self.inner.write().stores.insert(object_id, store).is_none());
428        Ok(())
429    }
430
    /// Called by the journaling system to apply a transaction.  `checkpoint` indicates the location
    /// in the journal file for this transaction.  Returns an optional mutation to be included with
    /// the transaction.
    ///
    /// What is returned depends on the transaction's metadata reservation:
    /// - `MetadataReservation::Borrowed`: an `UpdateBorrowed` mutation carrying the new
    ///   borrowed-metadata-space total.
    /// - anything else: `None`.
    ///
    /// # Errors
    ///
    /// Propagates the first error from applying an individual mutation.  Mutations after the
    /// failing one are not applied.
    ///
    /// # Panics
    ///
    /// Panics if the metadata reservation has not been initialised.
    pub fn apply_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        checkpoint: &JournalCheckpoint,
    ) -> Result<Option<Mutation>, Error> {
        // Record old values so we can see what changes as a result of this transaction.
        let old_amount = self.metadata_reservation().amount();
        let old_required = self.inner.read().required_reservation();

        debug!(checkpoint = checkpoint.file_offset; "BEGIN TXN");
        let mutations = transaction.take_mutations();
        let context =
            ApplyContext { mode: ApplyMode::Live(transaction), checkpoint: checkpoint.clone() };
        for TxnMutation { object_id, mutation, associated_object, .. } in mutations {
            self.apply_mutation(object_id, mutation, &context, associated_object)?;
        }
        debug!("END TXN");

        Ok(if let MetadataReservation::Borrowed = transaction.metadata_reservation {
            // If this transaction is borrowing metadata, figure out what has changed and return a
            // mutation with the updated value for borrowed.  The transaction might have allocated
            // or deallocated some data from the metadata reservation, or it might have made a
            // change that means we need to reserve more or less space (e.g. we compacted).
            //
            // The change in borrowed space is
            //   (new_required - old_required) - (new_amount - old_amount),
            // which equals add - sub below.  Both sides are kept as sums so that no intermediate
            // unsigned value goes negative.
            let new_amount = self.metadata_reservation().amount();
            let mut inner = self.inner.write();
            let new_required = inner.required_reservation();
            let add = old_amount + new_required;
            let sub = new_amount + old_required;
            if add >= sub {
                inner.borrowed_metadata_space += add - sub;
            } else {
                // The borrowed total never drops below zero.
                inner.borrowed_metadata_space =
                    inner.borrowed_metadata_space.saturating_sub(sub - add);
            }
            Some(Mutation::UpdateBorrowed(inner.borrowed_metadata_space))
        } else {
            // This transaction should have had no impact on the metadata reservation or the amount
            // we need to reserve.
            debug_assert_eq!(self.metadata_reservation().amount(), old_amount);
            debug_assert_eq!(self.inner.read().required_reservation(), old_required);
            None
        })
    }
477
    /// Called by the journaling system after a transaction has been written providing the end
    /// offset for the transaction so that we can adjust borrowed metadata space accordingly.
    ///
    /// The transaction's journal usage is `end_offset` minus the previous end offset.  That usage
    /// is converted into the space that must eventually be reserved for it.  That space is then
    /// accounted for according to the transaction's `metadata_reservation`:
    ///
    /// * `Borrowed`:
    ///   - The space is added to the borrowed total.
    ///   - Anything the metadata reservation now holds beyond what is required is given back to
    ///     the allocator.
    /// * `Hold(amount)`:
    ///   - The space is committed from the transaction's allocator reservation.
    ///   - It is deducted from the held amount and added to the metadata reservation.
    ///   - If the two reservations have different owners, the allocator's `disown_reservation` is
    ///     called for the transferred amount.
    /// * `Reservation(r)`: the space is moved from `r` into the metadata reservation.
    ///
    /// Also records the largest transaction size seen so far in the `max_transaction_size`
    /// metric.
    ///
    /// # Panics
    ///
    /// Panics in any of these cases:
    /// - the metadata reservation is not initialised;
    /// - the transaction has `MetadataReservation::None`;
    /// - for `Hold`, the transaction has no allocator reservation;
    /// - for `Hold`, that reservation is the metadata reservation itself;
    /// - for `Hold`, the owners are mixed in an unsupported way;
    /// - for `Hold`, the held amount is smaller than the transaction's space.
    pub fn did_commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        _checkpoint: &JournalCheckpoint,
        end_offset: u64,
    ) {
        let reservation = self.metadata_reservation();
        let mut inner = self.inner.write();
        // NOTE(review): assumes `end_offset >= last_end_offset`; otherwise this subtraction
        // underflows.
        let journal_usage = end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset);

        if journal_usage > inner.max_transaction_size.0 {
            inner.max_transaction_size.0 = journal_usage;
            inner.max_transaction_size.1.set(journal_usage);
        }

        let txn_space = reserved_space_from_journal_usage(journal_usage);
        match &mut transaction.metadata_reservation {
            MetadataReservation::None => unreachable!(),
            MetadataReservation::Borrowed => {
                // Account for the amount we need to borrow for the transaction itself now that we
                // know the transaction size.
                inner.borrowed_metadata_space += txn_space;

                // This transaction borrowed metadata space, but it may have freed space back into
                // the metadata reservation.  Whatever exceeds the required amount can now be
                // given back to the allocator.
                let to_give_back = (reservation.amount() + inner.borrowed_metadata_space)
                    .saturating_sub(inner.required_reservation());
                if to_give_back > 0 {
                    reservation.give_back(to_give_back);
                }
            }
            MetadataReservation::Hold(hold_amount) => {
                // Transfer reserved space into the metadata reservation.
                let txn_reservation = transaction.allocator_reservation.unwrap();
                assert_ne!(
                    txn_reservation as *const _, reservation as *const _,
                    "MetadataReservation::Borrowed should be used."
                );
                txn_reservation.commit(txn_space);
                if txn_reservation.owner_object_id() != reservation.owner_object_id() {
                    assert_eq!(
                        reservation.owner_object_id(),
                        None,
                        "Should not be mixing attributed owners."
                    );
                    inner
                        .allocator
                        .as_ref()
                        .unwrap()
                        .disown_reservation(txn_reservation.owner_object_id(), txn_space);
                }
                if let Some(amount) = hold_amount.checked_sub(txn_space) {
                    *hold_amount = amount;
                } else {
                    panic!("Transaction was larger than metadata reservation");
                }
                reservation.add(txn_space);
            }
            MetadataReservation::Reservation(txn_reservation) => {
                // Transfer reserved space into the metadata reservation.
                txn_reservation.move_to(reservation, txn_space);
            }
        }
        // Check that our invariant holds true: reservation + borrowed == required.
        debug_assert_eq!(
            reservation.amount() + inner.borrowed_metadata_space,
            inner.required_reservation(),
            "txn_space: {}, reservation_amount: {}, borrowed: {}, required: {}",
            txn_space,
            reservation.amount(),
            inner.borrowed_metadata_space,
            inner.required_reservation(),
        );
    }
554
555    /// Drops a transaction.  This is called automatically when a transaction is dropped.  If the
556    /// transaction has been committed, it should contain no mutations and so nothing will get rolled
557    /// back.  For each mutation, drop_mutation is called to allow for roll back (e.g. the allocator
558    /// will unreserve allocations).
559    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
560        for TxnMutation { object_id, mutation, .. } in transaction.take_mutations() {
561            self.journaling_object(object_id).map(|o| o.drop_mutation(mutation, transaction));
562        }
563    }
564
565    /// Returns the journal file offsets that each object depends on and the checkpoint for the
566    /// minimum offset.
567    pub fn journal_file_offsets(&self) -> (HashMap<u64, u64>, Option<JournalCheckpoint>) {
568        let inner = self.inner.read();
569        let mut min_checkpoint = None;
570        let mut offsets = HashMap::default();
571        for (&object_id, checkpoint) in &inner.journal_checkpoints {
572            let checkpoint = checkpoint.earliest();
573            match &mut min_checkpoint {
574                None => min_checkpoint = Some(checkpoint),
575                Some(min_checkpoint) => {
576                    if checkpoint.file_offset < min_checkpoint.file_offset {
577                        *min_checkpoint = checkpoint;
578                    }
579                }
580            }
581            offsets.insert(object_id, checkpoint.file_offset);
582        }
583        (offsets, min_checkpoint.cloned())
584    }
585
586    /// Returns the checkpoint into the journal that the object depends on, or None if the object
587    /// has no journaled updates.
588    pub fn journal_checkpoint(&self, object_id: u64) -> Option<JournalCheckpoint> {
589        self.inner
590            .read()
591            .journal_checkpoints
592            .get(&object_id)
593            .map(|checkpoints| checkpoints.earliest().clone())
594    }
595
596    /// Returns true if the object identified by `object_id` is known to have updates recorded in
597    /// the journal that the object depends upon.
598    pub fn needs_flush(&self, object_id: u64) -> bool {
599        self.inner.read().journal_checkpoints.contains_key(&object_id)
600    }
601
602    /// Flushes all known objects.  This will then allow the journal space to be freed.
603    ///
604    /// Also returns the earliest known version of a struct on the filesystem.
605    pub async fn flush(&self) -> Result<Version, Error> {
606        let objects = {
607            let inner = self.inner.read();
608            let mut object_ids = inner.journal_checkpoints.keys().cloned().collect::<Vec<_>>();
609            // Process objects in reverse sorted order because that will mean we compact the root
610            // object store last which will ensure we include the metadata from the compactions of
611            // other objects.
612            object_ids.sort_unstable();
613            object_ids
614                .iter()
615                .rev()
616                .map(|oid| (*oid, inner.journaling_object(*oid).unwrap()))
617                .collect::<Vec<_>>()
618        };
619
620        // As we iterate, keep track of the earliest version used by structs in these objects
621        let mut earliest_version: Version = LATEST_VERSION;
622        for (object_id, object) in objects {
623            let object_earliest_version =
624                object.flush().await.with_context(|| format!("Failed to flush oid {object_id}"))?;
625            if object_earliest_version < earliest_version {
626                earliest_version = object_earliest_version;
627            }
628        }
629
630        Ok(earliest_version)
631    }
632
633    pub fn journaling_object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
634        self.inner.read().journaling_object(object_id)
635    }
636
637    pub fn init_metadata_reservation(&self) -> Result<(), Error> {
638        if self.root_parent_store().filesystem().options().read_only {
639            // We don't need metadata reservations in read-only mode.
640            return Ok(());
641        }
642        let inner = self.inner.read();
643        let required = inner.required_reservation();
644        ensure!(required >= inner.borrowed_metadata_space, FxfsError::Inconsistent);
645        let allocator = inner.allocator.as_ref().cloned().unwrap();
646        self.metadata_reservation
647            .set(
648                allocator
649                    .clone()
650                    .reserve(None, inner.required_reservation() - inner.borrowed_metadata_space)
651                    .with_context(|| {
652                        format!(
653                            "Failed to reserve {} - {} = {} bytes, free={}, \
654                             owner_bytes={}",
655                            inner.required_reservation(),
656                            inner.borrowed_metadata_space,
657                            inner.required_reservation() - inner.borrowed_metadata_space,
658                            Saturating(allocator.get_disk_bytes()) - allocator.get_used_bytes(),
659                            allocator.owner_bytes_debug(),
660                        )
661                    })?,
662            )
663            .unwrap();
664        Ok(())
665    }
666
667    pub fn metadata_reservation(&self) -> &Reservation {
668        self.metadata_reservation.get().unwrap()
669    }
670
671    pub fn update_reservation(&self, object_id: u64, amount: u64) {
672        self.inner.write().reservations.insert(object_id, amount);
673    }
674
675    pub fn reservation(&self, object_id: u64) -> Option<u64> {
676        self.inner.read().reservations.get(&object_id).cloned()
677    }
678
679    pub fn set_reserved_space(&self, amount: u64) {
680        self.inner.write().reserved_space = amount;
681    }
682
683    pub fn last_end_offset(&self) -> u64 {
684        self.inner.read().last_end_offset
685    }
686
687    pub fn set_last_end_offset(&self, v: u64) {
688        self.inner.write().last_end_offset = v;
689    }
690
691    pub fn borrowed_metadata_space(&self) -> u64 {
692        self.inner.read().borrowed_metadata_space
693    }
694
695    pub fn set_borrowed_metadata_space(&self, v: u64) {
696        self.inner.write().borrowed_metadata_space = v;
697    }
698
699    pub fn write_mutation(&self, object_id: u64, mutation: &Mutation, writer: journal::Writer<'_>) {
700        self.journaling_object(object_id).unwrap().write_mutation(mutation, writer);
701    }
702
703    pub fn unlocked_stores(&self) -> Vec<Arc<ObjectStore>> {
704        let inner = self.inner.read();
705        let mut stores = Vec::new();
706        for store in inner.stores.values() {
707            if !store.is_locked() {
708                stores.push(store.clone());
709            }
710        }
711        stores
712    }
713
    /// Creates a lazy inspect node named `name` under `parent` which will yield statistics for the
    /// object manager when queried.
    ///
    /// The node holds only a weak reference to the manager.  Once the manager has been dropped,
    /// queries yield an empty node.  Otherwise the node records:
    /// - `required_reservation` and `borrowed_reservation`;
    /// - `metadata_reservation`, once the reservation has been initialised;
    /// - `earliest_checkpoint`, when some object has journal dependencies;
    /// - `borrowed_to_required_reservation_percent`, when `round_div` yields a value (presumably
    ///   not when `required` is 0; confirm against `round_div`).
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    // Take a consistent snapshot under the read lock, then release it before
                    // recording.
                    let (required, borrowed, earliest_checkpoint) = {
                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
                        let inner = this.inner.read();
                        (
                            inner.required_reservation(),
                            inner.borrowed_metadata_space,
                            inner.earliest_journal_offset(),
                        )
                    };
                    let root = inspector.root();
                    if let Some(reservation) = this.metadata_reservation.get() {
                        root.record_uint("metadata_reservation", reservation.amount());
                    }
                    root.record_uint("required_reservation", required);
                    root.record_uint("borrowed_reservation", borrowed);
                    if let Some(earliest_checkpoint) = earliest_checkpoint {
                        root.record_uint("earliest_checkpoint", earliest_checkpoint);
                    }

                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
                    if let Some(x) = round_div(100 * borrowed, required) {
                        root.record_uint("borrowed_to_required_reservation_percent", x);
                    }
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
752
753    /// Normally, we make new transactions pay for overheads incurred by the journal, such as
754    /// checksums and padding, but if the journal has discarded a significant amount after a replay,
755    /// we run the risk of there not being enough reserved.  To handle this, if the amount is
756    /// significant, we force the journal to borrow the space (using a journal created transaction).
757    pub fn needs_borrow_for_journal(&self, checkpoint: u64) -> bool {
758        checkpoint.checked_sub(self.inner.read().last_end_offset).unwrap() > 256
759    }
760}
761
762/// ReservationUpdate is an associated object that sets the amount reserved for an object
763/// (overwriting any previous amount). Updates must be applied as part of a transaction before
764/// did_commit_transaction runs because it will reconcile the accounting for reserved metadata
765/// space.
766pub struct ReservationUpdate(u64);
767
768impl ReservationUpdate {
769    pub fn new(amount: u64) -> Self {
770        Self(amount)
771    }
772}
773
774impl AssociatedObject for ReservationUpdate {
775    fn will_apply_mutation(&self, _mutation: &Mutation, object_id: u64, manager: &ObjectManager) {
776        manager.update_reservation(object_id, self.0);
777    }
778}