// fxfs/object_store/object_manager.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::filesystem::{ApplyContext, ApplyMode, JournalingObject};
7use crate::log::*;
8use crate::metrics;
9use crate::object_handle::INVALID_OBJECT_ID;
10use crate::object_store::allocator::{Allocator, Reservation};
11use crate::object_store::directory::Directory;
12use crate::object_store::journal::{self, JournalCheckpoint};
13use crate::object_store::transaction::{
14    AssocObj, AssociatedObject, MetadataReservation, Mutation, Transaction, TxnMutation,
15};
16use crate::object_store::tree_cache::TreeCache;
17use crate::object_store::volume::{VOLUMES_DIRECTORY, list_volumes};
18use crate::object_store::{ObjectDescriptor, ObjectStore};
19use crate::round::round_div;
20use crate::serialized_types::{LATEST_VERSION, Version};
21use anyhow::{Context, Error, anyhow, bail, ensure};
22use fuchsia_inspect::{Property as _, UintProperty};
23use fuchsia_sync::RwLock;
24use futures::FutureExt as _;
25use rustc_hash::FxHashMap as HashMap;
26use std::collections::hash_map::Entry;
27use std::num::Saturating;
28use std::sync::{Arc, OnceLock};
29
30// Data written to the journal eventually needs to be flushed somewhere (typically into layer
31// files).  Here we conservatively assume that could take up to four times as much space as it does
32// in the journal.  In the layer file, it'll take up at least as much, but we must reserve the same
33// again that so that there's enough space for compactions, and then we need some spare for
34// overheads.
35//
36// TODO(https://fxbug.dev/42178158): We should come up with a better way of determining what the multiplier
37// should be here.  2x was too low, as it didn't cover any space for metadata.  4x might be too
38// much.
/// Returns the amount of space to reserve for flushing `journal_usage` bytes of journaled
/// data (a conservative 4x multiplier; see the comment above for the rationale).
pub const fn reserved_space_from_journal_usage(journal_usage: u64) -> u64 {
    4 * journal_usage
}
42
/// ObjectManager is a global loading cache for object stores and other special objects.
pub struct ObjectManager {
    inner: RwLock<Inner>,
    // Reservation backing journaled metadata; set once via `init_metadata_reservation`.
    metadata_reservation: OnceLock<Reservation>,
    // The volumes directory; set once, either explicitly or during `on_replay_complete`.
    volume_directory: OnceLock<Directory<ObjectStore>>,
    // Optional callback invoked for every store that is registered with or opened by this
    // manager.
    on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>,
}
50
// Whilst we are flushing we need to keep track of the old checkpoint that we are hoping to flush,
// and a new one that should apply if we successfully finish the flush.
#[derive(Debug)]
enum Checkpoints {
    // No flush in progress; the single checkpoint the object currently depends on.
    Current(JournalCheckpoint),
    // A flush is in progress and no new mutations have arrived since it began.
    Old(JournalCheckpoint),
    // A flush is in progress and new mutations have arrived since it began.
    Both(/* old: */ JournalCheckpoint, /* current: */ JournalCheckpoint),
}
59
60impl Checkpoints {
61    // Returns the earliest checkpoint (which will always be the old one if present).
62    fn earliest(&self) -> &JournalCheckpoint {
63        match self {
64            Checkpoints::Old(x) | Checkpoints::Both(x, _) | Checkpoints::Current(x) => x,
65        }
66    }
67}
68
// We currently maintain strong references to all stores that have been opened, but there's
// currently no mechanism for releasing stores that aren't being used.
struct Inner {
    stores: HashMap<u64, Arc<ObjectStore>>,
    root_parent_store_object_id: u64,
    root_store_object_id: u64,
    // Object ID of the allocator; INVALID_OBJECT_ID until `set_allocator` has been called.
    allocator_object_id: u64,
    allocator: Option<Arc<Allocator>>,

    // Records dependencies on the journal for objects i.e. an entry for object ID 1, would mean it
    // has a dependency on journal records from that offset.
    journal_checkpoints: HashMap<u64, Checkpoints>,

    // Mappings from object-id to a target reservation amount.  The object IDs here are from the
    // root store namespace, so it can be associated with any object in the root store.  A
    // reservation will be made to cover the *maximum* in this map, since it is assumed that any
    // requirement is only temporary, for the duration of a compaction, and that once compaction has
    // finished for a particular object, the space will be recovered.
    reservations: HashMap<u64, u64>,

    // The last journal end offset for a transaction that has been applied.  This is not necessarily
    // the same as the start offset for the next transaction because of padding.
    last_end_offset: u64,

    // A running counter that tracks metadata space that has been borrowed on the understanding that
    // eventually it will be recovered (potentially after a full compaction).
    borrowed_metadata_space: u64,

    // The maximum transaction size that has been encountered so far.  The second element mirrors
    // the first into an inspect property.
    max_transaction_size: (u64, UintProperty),

    // Extra temporary space that might be tied up in the journal that hasn't yet been deallocated.
    reserved_space: u64,
}
103
104impl Inner {
105    fn earliest_journal_offset(&self) -> Option<u64> {
106        self.journal_checkpoints.values().map(|c| c.earliest().file_offset).min()
107    }
108
109    // Returns the required size of the metadata reservation assuming that no space has been
110    // borrowed.  The invariant is: reservation-size + borrowed-space = required.
111    fn required_reservation(&self) -> u64 {
112        // Start with the maximum amount of temporary space we might need during compactions.
113        self.reservations.values().max().unwrap_or(&0)
114
115        // Account for data that has been written to the journal that will need to be written
116        // to layer files when flushed.
117            + self.earliest_journal_offset()
118            .map(|min| reserved_space_from_journal_usage(self.last_end_offset - min))
119            .unwrap_or(0)
120
121        // Extra reserved space
122            + self.reserved_space
123    }
124
125    fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
126        if object_id == self.allocator_object_id {
127            Some(self.allocator.clone().unwrap() as Arc<dyn JournalingObject>)
128        } else {
129            self.stores.get(&object_id).map(|x| x.clone() as Arc<dyn JournalingObject>)
130        }
131    }
132}
133
134impl ObjectManager {
    /// Creates a new ObjectManager with no stores or allocator registered.  `on_new_store`, if
    /// provided, is invoked for every store later registered with or opened by this manager.
    pub fn new(on_new_store: Option<Box<dyn Fn(&ObjectStore) + Send + Sync>>) -> ObjectManager {
        ObjectManager {
            inner: RwLock::new(Inner {
                stores: HashMap::default(),
                root_parent_store_object_id: INVALID_OBJECT_ID,
                root_store_object_id: INVALID_OBJECT_ID,
                allocator_object_id: INVALID_OBJECT_ID,
                allocator: None,
                journal_checkpoints: HashMap::default(),
                reservations: HashMap::default(),
                last_end_offset: 0,
                borrowed_metadata_space: 0,
                max_transaction_size: (0, metrics::detail().create_uint("max_transaction_size", 0)),
                reserved_space: journal::RESERVED_SPACE,
            }),
            metadata_reservation: OnceLock::new(),
            volume_directory: OnceLock::new(),
            on_new_store,
        }
    }
155
    /// Returns the currently required metadata reservation size (see `Inner::required_reservation`).
    pub fn required_reservation(&self) -> u64 {
        self.inner.read().required_reservation()
    }

    /// Returns the object ID of the root parent store.
    pub fn root_parent_store_object_id(&self) -> u64 {
        self.inner.read().root_parent_store_object_id
    }

    /// Returns the root parent store.  Panics if it has not been set.
    pub fn root_parent_store(&self) -> Arc<ObjectStore> {
        let inner = self.inner.read();
        inner.stores.get(&inner.root_parent_store_object_id).unwrap().clone()
    }

    /// Registers `store` as the root parent store.
    pub fn set_root_parent_store(&self, store: Arc<ObjectStore>) {
        if let Some(on_new_store) = &self.on_new_store {
            on_new_store(&store);
        }
        let mut inner = self.inner.write();
        let store_id = store.store_object_id();
        inner.stores.insert(store_id, store);
        inner.root_parent_store_object_id = store_id;
    }

    /// Returns the object ID of the root store.
    pub fn root_store_object_id(&self) -> u64 {
        self.inner.read().root_store_object_id
    }

    /// Returns the root store.  Panics if it has not been set.
    pub fn root_store(&self) -> Arc<ObjectStore> {
        let inner = self.inner.read();
        inner.stores.get(&inner.root_store_object_id).unwrap().clone()
    }

    /// Registers `store` as the root store.
    pub fn set_root_store(&self, store: Arc<ObjectStore>) {
        if let Some(on_new_store) = &self.on_new_store {
            on_new_store(&store);
        }
        let mut inner = self.inner.write();
        let store_id = store.store_object_id();
        inner.stores.insert(store_id, store);
        inner.root_store_object_id = store_id;
    }

    /// Returns true if `store_id` is one of the two system stores (root or root parent).
    pub fn is_system_store(&self, store_id: u64) -> bool {
        let inner = self.inner.read();
        store_id == inner.root_store_object_id || store_id == inner.root_parent_store_object_id
    }

    /// Returns the store which might or might not be locked.
    pub fn store(&self, store_object_id: u64) -> Option<Arc<ObjectStore>> {
        self.inner.read().stores.get(&store_object_id).cloned()
    }
207
208    /// Returns the total bytes written to the LSM trees in the allocator and all object stores
209    /// during compaction operations.
210    pub fn compaction_bytes_written(&self) -> u64 {
211        let inner = self.inner.read();
212        let mut total = 0;
213        if let Some(allocator) = &inner.allocator {
214            total += allocator.tree().compaction_bytes_written();
215        }
216        for store in inner.stores.values() {
217            total += store.tree().compaction_bytes_written();
218        }
219        total
220    }
221
    /// This is not thread-safe: it assumes that a store won't be forgotten whilst the loop is
    /// running.  This is to be used after replaying the journal.
    pub async fn on_replay_complete(&self) -> Result<(), Error> {
        let root_store = self.root_store();

        let root_directory = Directory::open(&root_store, root_store.root_directory_object_id())
            .await
            .context("Unable to open root volume directory")?;

        // Locate and cache the volumes directory.  Anything other than a directory entry at
        // VOLUMES_DIRECTORY means the filesystem is inconsistent.
        match root_directory.lookup(VOLUMES_DIRECTORY).await? {
            None => bail!("Root directory not found"),
            Some((object_id, ObjectDescriptor::Directory, _)) => {
                let volume_directory = Directory::open(&root_store, object_id)
                    .await
                    .context("Unable to open volumes directory")?;
                self.volume_directory.set(volume_directory).unwrap();
            }
            _ => {
                bail!(
                    anyhow!(FxfsError::Inconsistent)
                        .context("Unexpected type for volumes directory")
                )
            }
        }

        // Open every listed volume up-front so its store is registered with the manager.
        let object_ids = list_volumes(self.volume_directory.get().unwrap())
            .await
            .context("Failed to list volumes")?;

        for store_id in object_ids {
            self.open_store(&root_store, store_id).await?;
        }

        // This can fail if a filesystem is created and truncated to a size
        // that doesn't leave enough free space for metadata reservations.
        self.init_metadata_reservation()
            .context("Insufficient free space for metadata reservation.")?;

        Ok(())
    }
262
    /// Returns the volumes directory.  Panics if it has not been set or opened yet.
    pub fn volume_directory(&self) -> &Directory<ObjectStore> {
        self.volume_directory.get().unwrap()
    }

    /// Sets the volumes directory.  Panics if it has already been set.
    pub fn set_volume_directory(&self, volume_directory: Directory<ObjectStore>) {
        self.volume_directory.set(volume_directory).unwrap();
    }
270
271    pub fn add_store(&self, store: Arc<ObjectStore>) {
272        if let Some(on_new_store) = &self.on_new_store {
273            on_new_store(&store);
274        }
275        let mut inner = self.inner.write();
276        let store_object_id = store.store_object_id();
277        assert_ne!(store_object_id, inner.root_parent_store_object_id);
278        assert_ne!(store_object_id, inner.root_store_object_id);
279        assert_ne!(store_object_id, inner.allocator_object_id);
280        inner.stores.insert(store_object_id, store);
281    }
282
    /// Removes the store (and any reservation target it had) from the manager.  Must not be the
    /// allocator's object ID.
    pub fn forget_store(&self, store_object_id: u64) {
        let mut inner = self.inner.write();
        assert_ne!(store_object_id, inner.allocator_object_id);
        inner.stores.remove(&store_object_id);
        inner.reservations.remove(&store_object_id);
    }

    /// Registers `allocator` with the manager.  Its object ID must not collide with any store.
    pub fn set_allocator(&self, allocator: Arc<Allocator>) {
        let mut inner = self.inner.write();
        assert!(!inner.stores.contains_key(&allocator.object_id()));
        inner.allocator_object_id = allocator.object_id();
        inner.allocator = Some(allocator);
    }

    /// Returns the allocator.  Panics if `set_allocator` has not been called.
    pub fn allocator(&self) -> Arc<Allocator> {
        self.inner.read().allocator.clone().unwrap()
    }
300
    /// Applies `mutation` to `object` with `context`.
    pub fn apply_mutation(
        &self,
        object_id: u64,
        mutation: Mutation,
        context: &ApplyContext<'_, '_>,
        associated_object: AssocObj<'_>,
    ) -> Result<(), Error> {
        debug!(oid = object_id, mutation:?; "applying mutation");
        let object = {
            let mut inner = self.inner.write();
            match mutation {
                Mutation::BeginFlush => {
                    // A flush is starting: demote the current checkpoint to Old so we remember
                    // the journal dependency of the data being flushed.
                    if let Some(entry) = inner.journal_checkpoints.get_mut(&object_id) {
                        match entry {
                            Checkpoints::Current(x) | Checkpoints::Both(x, _) => {
                                *entry = Checkpoints::Old(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::EndFlush => {
                    // The flush finished: drop the old checkpoint.  If new mutations arrived
                    // during the flush (Both), keep their checkpoint as Current.
                    if let Entry::Occupied(mut o) = inner.journal_checkpoints.entry(object_id) {
                        let entry = o.get_mut();
                        match entry {
                            Checkpoints::Old(_) => {
                                o.remove();
                            }
                            Checkpoints::Both(_, x) => {
                                *entry = Checkpoints::Current(x.clone());
                            }
                            _ => {}
                        }
                    }
                }
                Mutation::DeleteVolume => {
                    // Deleting a volume discards all in-memory state for the store; there is no
                    // object left to forward the mutation to, so return early.
                    if let Some(store) = inner.stores.remove(&object_id) {
                        store.mark_deleted();
                    }
                    inner.reservations.remove(&object_id);
                    inner.journal_checkpoints.remove(&object_id);
                    return Ok(());
                }
                _ => {
                    // Any other mutation records a journal dependency for the object.  The root
                    // parent store is excluded — no checkpoint is tracked for it here.
                    if object_id != inner.root_parent_store_object_id {
                        inner
                            .journal_checkpoints
                            .entry(object_id)
                            .and_modify(|entry| {
                                if let Checkpoints::Old(x) = entry {
                                    *entry =
                                        Checkpoints::Both(x.clone(), context.checkpoint.clone());
                                }
                            })
                            .or_insert_with(|| Checkpoints::Current(context.checkpoint.clone()));
                    }
                }
            }
            // Resolve the target object (allocator or store) while still holding the lock.
            if object_id == inner.allocator_object_id {
                inner.allocator.clone().unwrap() as Arc<dyn JournalingObject>
            } else {
                inner.stores.get(&object_id).unwrap().clone() as Arc<dyn JournalingObject>
            }
        };
        // Notify the associated object (if any) before applying the mutation itself.
        associated_object.map(|o| o.will_apply_mutation(&mutation, object_id, self));
        object.apply_mutation(mutation, context, associated_object)
    }
369
    /// Replays `mutations` for a single transaction.  `journal_offsets` contains the per-object
    /// starting offsets; if the current transaction offset precedes an offset, the mutations for
    /// that object are ignored.  `context` contains the location in the journal file for this
    /// transaction and `end_offset` is the ending journal offset for this transaction.
    pub async fn replay_mutations(
        &self,
        mutations: Vec<(u64, Mutation)>,
        journal_offsets: &HashMap<u64, u64>,
        context: &ApplyContext<'_, '_>,
        end_offset: u64,
    ) -> Result<(), Error> {
        debug!(checkpoint = context.checkpoint.file_offset; "REPLAY");
        // How much this transaction advanced the journal.  If the end offset does not advance,
        // txn_size is None and the borrowed-space adjustment below is skipped.
        let txn_size = {
            let mut inner = self.inner.write();
            if end_offset > inner.last_end_offset {
                Some(end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset))
            } else {
                None
            }
        };

        let allocator_object_id = self.inner.read().allocator_object_id;

        for (object_id, mutation) in mutations {
            if let Mutation::UpdateBorrowed(borrowed) = mutation {
                // Restore the borrowed-space counter, adding the space consumed by this
                // transaction itself; overflow indicates an inconsistent filesystem.
                if let Some(txn_size) = txn_size {
                    self.inner.write().borrowed_metadata_space = borrowed
                        .checked_add(reserved_space_from_journal_usage(txn_size))
                        .ok_or(FxfsError::Inconsistent)?;
                }
                continue;
            }

            // Don't replay mutations if the object doesn't want it.
            if let Some(&offset) = journal_offsets.get(&object_id) {
                if context.checkpoint.file_offset < offset {
                    continue;
                }
            }

            // If this is the first time we've encountered this store, we'll need to open it.
            if object_id != allocator_object_id {
                self.open_store(&self.root_store(), object_id).await?;
            }

            self.apply_mutation(object_id, mutation, context, AssocObj::None)?;
        }
        Ok(())
    }
419
    /// Opens the specified store if it isn't already.  This is *not* thread-safe.
    async fn open_store(&self, parent: &Arc<ObjectStore>, object_id: u64) -> Result<(), Error> {
        if self.inner.read().stores.contains_key(&object_id) {
            return Ok(());
        }
        let store = ObjectStore::open(parent, object_id, Box::new(TreeCache::new()))
            .await
            .with_context(|| format!("Failed to open store {object_id}"))?;
        if let Some(on_new_store) = &self.on_new_store {
            on_new_store(&store);
        }
        // The assert relies on the not-thread-safe contract above: no concurrent open of the
        // same store may have inserted it between the check and this insert.
        assert!(self.inner.write().stores.insert(object_id, store).is_none());
        Ok(())
    }
434
    /// Called by the journaling system to apply a transaction.  `checkpoint` indicates the location
    /// in the journal file for this transaction.  Returns an optional mutation to be written to be
    /// included with the transaction.
    pub fn apply_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        checkpoint: &JournalCheckpoint,
    ) -> Result<Option<Mutation>, Error> {
        // Record old values so we can see what changes as a result of this transaction.
        let old_amount = self.metadata_reservation().amount();
        let old_required = self.inner.read().required_reservation();

        debug!(checkpoint = checkpoint.file_offset; "BEGIN TXN");
        let mutations = transaction.take_mutations();
        let context =
            ApplyContext { mode: ApplyMode::Live(transaction), checkpoint: checkpoint.clone() };
        for TxnMutation { object_id, mutation, associated_object, .. } in mutations {
            self.apply_mutation(object_id, mutation, &context, associated_object)?;
        }
        debug!("END TXN");

        Ok(if let MetadataReservation::Borrowed = transaction.metadata_reservation {
            // If this transaction is borrowing metadata, figure out what has changed and return a
            // mutation with the updated value for borrowed.  The transaction might have allocated
            // or deallocated some data from the metadata reservation, or it might have made a
            // change that means we need to reserve more or less space (e.g. we compacted).
            let new_amount = self.metadata_reservation().amount();
            let mut inner = self.inner.write();
            let new_required = inner.required_reservation();
            // The net change is (new_required - old_required) + (old_amount - new_amount); the
            // terms are regrouped into two sums so neither subtraction can underflow.
            let add = old_amount + new_required;
            let sub = new_amount + old_required;
            if add >= sub {
                inner.borrowed_metadata_space += add - sub;
            } else {
                inner.borrowed_metadata_space =
                    inner.borrowed_metadata_space.saturating_sub(sub - add);
            }
            Some(Mutation::UpdateBorrowed(inner.borrowed_metadata_space))
        } else {
            // This transaction should have had no impact on the metadata reservation or the amount
            // we need to reserve.
            debug_assert_eq!(self.metadata_reservation().amount(), old_amount);
            debug_assert_eq!(self.inner.read().required_reservation(), old_required);
            None
        })
    }
481
    /// Called by the journaling system after a transaction has been written providing the end
    /// offset for the transaction so that we can adjust borrowed metadata space accordingly.
    pub fn did_commit_transaction(
        &self,
        transaction: &mut Transaction<'_>,
        _checkpoint: &JournalCheckpoint,
        end_offset: u64,
    ) {
        let reservation = self.metadata_reservation();
        let mut inner = self.inner.write();
        // Journal bytes consumed by this transaction (including any padding since the last one).
        let journal_usage = end_offset - std::mem::replace(&mut inner.last_end_offset, end_offset);

        // Track the largest transaction seen, both internally and in the inspect property.
        if journal_usage > inner.max_transaction_size.0 {
            inner.max_transaction_size.0 = journal_usage;
            inner.max_transaction_size.1.set(journal_usage);
        }

        let txn_space = reserved_space_from_journal_usage(journal_usage);
        match &mut transaction.metadata_reservation {
            MetadataReservation::None => unreachable!(),
            MetadataReservation::Borrowed => {
                // Account for the amount we need to borrow for the transaction itself now that we
                // know the transaction size.
                inner.borrowed_metadata_space += txn_space;

                // This transaction borrowed metadata space, but it might have returned space to the
                // transaction that we can now give back to the allocator.
                let to_give_back = (reservation.amount() + inner.borrowed_metadata_space)
                    .saturating_sub(inner.required_reservation());
                if to_give_back > 0 {
                    reservation.give_back(to_give_back);
                }
            }
            MetadataReservation::Hold(hold_amount) => {
                // Transfer reserved space into the metadata reservation.
                let txn_reservation = transaction.allocator_reservation.unwrap();
                assert_ne!(
                    txn_reservation as *const _, reservation as *const _,
                    "MetadataReservation::Borrowed should be used."
                );
                txn_reservation.commit(txn_space);
                if txn_reservation.owner_object_id() != reservation.owner_object_id() {
                    assert_eq!(
                        reservation.owner_object_id(),
                        None,
                        "Should not be mixing attributed owners."
                    );
                    inner
                        .allocator
                        .as_ref()
                        .unwrap()
                        .disown_reservation(txn_reservation.owner_object_id(), txn_space);
                }
                // The hold must cover the transaction; a shortfall is a programming error.
                if let Some(amount) = hold_amount.checked_sub(txn_space) {
                    *hold_amount = amount;
                } else {
                    panic!("Transaction was larger than metadata reservation");
                }
                reservation.add(txn_space);
            }
            MetadataReservation::Reservation(txn_reservation) => {
                // Transfer reserved space into the metadata reservation.
                txn_reservation.move_to(reservation, txn_space);
            }
        }
        // Check that our invariant holds true.
        debug_assert_eq!(
            reservation.amount() + inner.borrowed_metadata_space,
            inner.required_reservation(),
            "txn_space: {}, reservation_amount: {}, borrowed: {}, required: {}",
            txn_space,
            reservation.amount(),
            inner.borrowed_metadata_space,
            inner.required_reservation(),
        );
    }
558
559    /// Drops a transaction.  This is called automatically when a transaction is dropped.  If the
560    /// transaction has been committed, it should contain no mutations and so nothing will get rolled
561    /// back.  For each mutation, drop_mutation is called to allow for roll back (e.g. the allocator
562    /// will unreserve allocations).
563    pub fn drop_transaction(&self, transaction: &mut Transaction<'_>) {
564        for TxnMutation { object_id, mutation, .. } in transaction.take_mutations() {
565            self.object(object_id).map(|o| o.drop_mutation(mutation, transaction));
566        }
567    }
568
569    /// Returns the journal file offsets that each object depends on and the checkpoint for the
570    /// minimum offset.
571    pub fn journal_file_offsets(&self) -> (HashMap<u64, u64>, Option<JournalCheckpoint>) {
572        let inner = self.inner.read();
573        let mut min_checkpoint = None;
574        let mut offsets = HashMap::default();
575        for (&object_id, checkpoint) in &inner.journal_checkpoints {
576            let checkpoint = checkpoint.earliest();
577            match &mut min_checkpoint {
578                None => min_checkpoint = Some(checkpoint),
579                Some(min_checkpoint) => {
580                    if checkpoint.file_offset < min_checkpoint.file_offset {
581                        *min_checkpoint = checkpoint;
582                    }
583                }
584            }
585            offsets.insert(object_id, checkpoint.file_offset);
586        }
587        (offsets, min_checkpoint.cloned())
588    }
589
    /// Returns the checkpoint into the journal that the object depends on, or None if the object
    /// has no journaled updates.  The earliest tracked checkpoint is returned (see
    /// `Checkpoints::earliest`).
    pub fn journal_checkpoint(&self, object_id: u64) -> Option<JournalCheckpoint> {
        self.inner
            .read()
            .journal_checkpoints
            .get(&object_id)
            .map(|checkpoints| checkpoints.earliest().clone())
    }

    /// Returns true if the object identified by `object_id` is known to have updates recorded in
    /// the journal that the object depends upon.
    pub fn needs_flush(&self, object_id: u64) -> bool {
        self.inner.read().journal_checkpoints.contains_key(&object_id)
    }
605
606    /// Flushes all known objects.  This will then allow the journal space to be freed.
607    ///
608    /// Also returns the earliest known version of a struct on the filesystem.
609    pub async fn flush(&self) -> Result<Version, Error> {
610        let objects = {
611            let inner = self.inner.read();
612            let mut object_ids = inner.journal_checkpoints.keys().cloned().collect::<Vec<_>>();
613            // Process objects in reverse sorted order because that will mean we compact the root
614            // object store last which will ensure we include the metadata from the compactions of
615            // other objects.
616            object_ids.sort_unstable();
617            object_ids
618                .iter()
619                .rev()
620                .map(|oid| (*oid, inner.object(*oid).unwrap()))
621                .collect::<Vec<_>>()
622        };
623
624        // As we iterate, keep track of the earliest version used by structs in these objects
625        let mut earliest_version: Version = LATEST_VERSION;
626        for (object_id, object) in objects {
627            let object_earliest_version =
628                object.flush().await.with_context(|| format!("Failed to flush oid {object_id}"))?;
629            if object_earliest_version < earliest_version {
630                earliest_version = object_earliest_version;
631            }
632        }
633
634        Ok(earliest_version)
635    }
636
    // Looks up the journaling object (store or allocator) for `object_id`.
    fn object(&self, object_id: u64) -> Option<Arc<dyn JournalingObject>> {
        self.inner.read().object(object_id)
    }
640
641    pub fn init_metadata_reservation(&self) -> Result<(), Error> {
642        if self.root_parent_store().filesystem().options().read_only {
643            // We don't need metadata reservations in read-only mode.
644            return Ok(());
645        }
646        let inner = self.inner.read();
647        let required = inner.required_reservation();
648        ensure!(required >= inner.borrowed_metadata_space, FxfsError::Inconsistent);
649        let allocator = inner.allocator.as_ref().cloned().unwrap();
650        self.metadata_reservation
651            .set(
652                allocator
653                    .clone()
654                    .reserve(None, inner.required_reservation() - inner.borrowed_metadata_space)
655                    .with_context(|| {
656                        format!(
657                            "Failed to reserve {} - {} = {} bytes, free={}, \
658                             owner_bytes={}",
659                            inner.required_reservation(),
660                            inner.borrowed_metadata_space,
661                            inner.required_reservation() - inner.borrowed_metadata_space,
662                            Saturating(allocator.get_disk_bytes()) - allocator.get_used_bytes(),
663                            allocator.owner_bytes_debug(),
664                        )
665                    })?,
666            )
667            .unwrap();
668        Ok(())
669    }
670
    /// Returns the metadata reservation.  Panics if `init_metadata_reservation` has not run.
    pub fn metadata_reservation(&self) -> &Reservation {
        self.metadata_reservation.get().unwrap()
    }

    /// Sets the target reservation `amount` for `object_id` (see `Inner::reservations`).
    pub fn update_reservation(&self, object_id: u64, amount: u64) {
        self.inner.write().reservations.insert(object_id, amount);
    }

    /// Returns the target reservation amount for `object_id`, if one has been set.
    pub fn reservation(&self, object_id: u64) -> Option<u64> {
        self.inner.read().reservations.get(&object_id).cloned()
    }

    /// Sets the extra journal space to reserve (see `Inner::reserved_space`).
    pub fn set_reserved_space(&self, amount: u64) {
        self.inner.write().reserved_space = amount;
    }

    /// Returns the journal end offset of the last applied transaction.
    pub fn last_end_offset(&self) -> u64 {
        self.inner.read().last_end_offset
    }

    /// Overrides the journal end offset of the last applied transaction.
    pub fn set_last_end_offset(&self, v: u64) {
        self.inner.write().last_end_offset = v;
    }

    /// Returns the amount of borrowed metadata space (see `Inner::borrowed_metadata_space`).
    pub fn borrowed_metadata_space(&self) -> u64 {
        self.inner.read().borrowed_metadata_space
    }

    /// Overrides the amount of borrowed metadata space.
    pub fn set_borrowed_metadata_space(&self, v: u64) {
        self.inner.write().borrowed_metadata_space = v;
    }

    /// Serializes `mutation` via the object it targets.  Panics if the object is unknown.
    pub fn write_mutation(&self, object_id: u64, mutation: &Mutation, writer: journal::Writer<'_>) {
        self.object(object_id).unwrap().write_mutation(mutation, writer);
    }
706
707    pub fn unlocked_stores(&self) -> Vec<Arc<ObjectStore>> {
708        let inner = self.inner.read();
709        let mut stores = Vec::new();
710        for store in inner.stores.values() {
711            if !store.is_locked() {
712                stores.push(store.clone());
713            }
714        }
715        stores
716    }
717
    /// Creates a lazy inspect node named `str` under `parent` which will yield statistics for the
    /// object manager when queried.
    pub fn track_statistics(self: &Arc<Self>, parent: &fuchsia_inspect::Node, name: &str) {
        // Hold only a weak reference so inspect doesn't keep the manager alive.
        let this = Arc::downgrade(self);
        parent.record_lazy_child(name, move || {
            let this_clone = this.clone();
            async move {
                let inspector = fuchsia_inspect::Inspector::default();
                if let Some(this) = this_clone.upgrade() {
                    let (required, borrowed, earliest_checkpoint) = {
                        // TODO(https://fxbug.dev/42069513): Push-back or rate-limit to prevent DoS.
                        let inner = this.inner.read();
                        (
                            inner.required_reservation(),
                            inner.borrowed_metadata_space,
                            inner.earliest_journal_offset(),
                        )
                    };
                    let root = inspector.root();
                    if let Some(reservation) = this.metadata_reservation.get() {
                        root.record_uint("metadata_reservation", reservation.amount());
                    }
                    root.record_uint("required_reservation", required);
                    root.record_uint("borrowed_reservation", borrowed);
                    if let Some(earliest_checkpoint) = earliest_checkpoint {
                        root.record_uint("earliest_checkpoint", earliest_checkpoint);
                    }

                    // TODO(https://fxbug.dev/42068224): Post-compute rather than manually computing metrics.
                    if let Some(x) = round_div(100 * borrowed, required) {
                        root.record_uint("borrowed_to_required_reservation_percent", x);
                    }
                }
                Ok(inspector)
            }
            .boxed()
        });
    }
756
    /// Normally, we make new transactions pay for overheads incurred by the journal, such as
    /// checksums and padding, but if the journal has discarded a significant amount after a replay,
    /// we run the risk of there not being enough reserved.  To handle this, if the amount is
    /// significant, we force the journal to borrow the space (using a journal created transaction).
    pub fn needs_borrow_for_journal(&self, checkpoint: u64) -> bool {
        // The unwrap asserts the invariant that `checkpoint` never precedes the last applied
        // transaction's end offset.  256 bytes is the threshold for "significant" here.
        checkpoint.checked_sub(self.inner.read().last_end_offset).unwrap() > 256
    }
764}
765
/// ReservationUpdate is an associated object that sets the amount reserved for an object
/// (overwriting any previous amount). Updates must be applied as part of a transaction before
/// did_commit_transaction runs because it will reconcile the accounting for reserved metadata
/// space.
pub struct ReservationUpdate(u64);

impl ReservationUpdate {
    /// Creates a ReservationUpdate that will set the target reservation to `amount` bytes.
    pub fn new(amount: u64) -> Self {
        Self(amount)
    }
}

impl AssociatedObject for ReservationUpdate {
    fn will_apply_mutation(&self, _mutation: &Mutation, object_id: u64, manager: &ObjectManager) {
        // Record the new target amount; did_commit_transaction settles the accounting afterwards.
        manager.update_reservation(object_id, self.0);
    }
}
782}