Skip to main content

fxfs/object_store/
graveyard.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::Query;
8use crate::lsm_tree::merge::{Merger, MergerIterator};
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::object_store::ObjectStore;
11use crate::object_store::object_manager::ObjectManager;
12use crate::object_store::object_record::{
13    ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, Timestamp,
14};
15use crate::object_store::transaction::{Mutation, Options, Transaction};
16use anyhow::{Context, Error, anyhow, bail};
17use fuchsia_async::{self as fasync};
18use fuchsia_sync::Mutex;
19use futures::StreamExt;
20use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded};
21use futures::channel::oneshot;
22use fxfs_trace::{TraceFutureExt, trace_future_args};
23use std::collections::BTreeSet;
24use std::sync::Arc;
25use std::sync::atomic::Ordering;
26
27enum ReaperTask {
28    None,
29    Pending(UnboundedReceiver<Message>),
30    Running(fasync::Task<()>),
31}
32
33/// A graveyard exists as a place to park objects that should be deleted when they are no longer in
34/// use.  How objects enter and leave the graveyard is up to the caller to decide.  The intention is
35/// that at mount time, any objects in the graveyard will get removed.  Each object store has a
36/// directory like object that contains a list of the objects within that store that are part of the
37/// graveyard.  A single instance of this Graveyard struct manages *all* stores.
38pub struct Graveyard {
39    object_manager: Arc<ObjectManager>,
40    reaper_task: Mutex<ReaperTask>,
41    channel: UnboundedSender<Message>,
42}
43
44enum Message {
45    // Tombstone the object identified by <store-id>, <object-id>, Option<attribute-id>. If
46    // <attribute-id> is Some, tombstone just the attribute instead of the entire object.
47    Tombstone(u64, u64, Option<u64>),
48
49    // Trims the identified object.
50    Trim(u64, u64),
51
52    // When the flush message is processed, notifies sender.  This allows the receiver to know
53    // that all preceding tombstone messages have been processed.
54    Flush(oneshot::Sender<()>),
55}
56
57#[fxfs_trace::trace]
58impl Graveyard {
59    /// Creates a new instance of the graveyard manager.
60    pub fn new(object_manager: Arc<ObjectManager>) -> Arc<Self> {
61        let (sender, receiver) = unbounded();
62        Arc::new(Graveyard {
63            object_manager,
64            reaper_task: Mutex::new(ReaperTask::Pending(receiver)),
65            channel: sender,
66        })
67    }
68
69    /// Creates a graveyard object in `store`.  Returns the object ID for the graveyard object.
70    pub async fn create(
71        transaction: &mut Transaction<'_>,
72        store: &ObjectStore,
73    ) -> Result<u64, Error> {
74        let reserved_object_id = store.get_next_object_id(transaction.txn_guard()).await?;
75        let object_id = reserved_object_id.get();
76        let now = Timestamp::now();
77        transaction.add(
78            store.store_object_id,
79            Mutation::insert_object(
80                ObjectKey::object(reserved_object_id.release()),
81                ObjectValue::Object {
82                    kind: ObjectKind::Graveyard,
83                    attributes: ObjectAttributes {
84                        creation_time: now.clone(),
85                        modification_time: now,
86                        project_id: 0,
87                        ..Default::default()
88                    },
89                },
90            ),
91        );
92        Ok(object_id)
93    }
94
95    /// Starts an asynchronous task to reap the graveyard for all entries older than
96    /// |journal_offset| (exclusive).
97    /// If a task is already started, this has no effect, even if that task was targeting an older
98    /// |journal_offset|.
99    pub fn reap_async(self: Arc<Self>) {
100        let mut reaper_task = self.reaper_task.lock();
101        if let ReaperTask::Pending(_) = &*reaper_task {
102            if let ReaperTask::Pending(receiver) =
103                std::mem::replace(&mut *reaper_task, ReaperTask::None)
104            {
105                *reaper_task = ReaperTask::Running(fasync::Task::spawn(
106                    self.clone()
107                        .reap_task(receiver)
108                        .trace(trace_future_args!("Graveyard::reap_task")),
109                ));
110            } else {
111                unreachable!();
112            }
113        }
114    }
115
116    /// Returns a future which completes when the ongoing reap task (if it exists) completes.
117    pub async fn wait_for_reap(&self) {
118        self.channel.close_channel();
119        let task = std::mem::replace(&mut *self.reaper_task.lock(), ReaperTask::None);
120        if let ReaperTask::Running(task) = task {
121            task.await;
122        }
123    }
124
125    async fn reap_task(self: Arc<Self>, mut receiver: UnboundedReceiver<Message>) {
126        // Wait and process reap requests.
127        while let Some(message) = receiver.next().await {
128            match message {
129                Message::Tombstone(store_id, object_id, attribute_id) => {
130                    let res = if let Some(attribute_id) = attribute_id {
131                        self.tombstone_attribute(store_id, object_id, attribute_id).await
132                    } else {
133                        self.tombstone_object(store_id, object_id).await
134                    };
135                    if let Err(e) = res {
136                        error!(
137                            error:? = e,
138                            store_id,
139                            oid = object_id,
140                            attribute_id;
141                            "Tombstone error"
142                        );
143                    }
144                }
145                Message::Trim(store_id, object_id) => {
146                    if let Err(e) = self.trim(store_id, object_id).await {
147                        error!(error:? = e, store_id, oid = object_id; "Tombstone error");
148                    }
149                }
150                Message::Flush(sender) => {
151                    let _ = sender.send(());
152                }
153            }
154        }
155    }
156
157    /// Performs the initial mount-time reap for the given store.  This will queue all items in the
158    /// graveyard.  Concurrently adding more entries to the graveyard will lead to undefined
159    /// behaviour: the entries might or might not be immediately tombstoned, so callers should wait
160    /// for this to return before changing to a state where more entries can be added.  Once this
161    /// has returned, entries will be tombstoned in the background.
162    #[trace]
163    pub async fn initial_reap(self: &Arc<Self>, store: &ObjectStore) -> Result<usize, Error> {
164        if store.filesystem().options().skip_initial_reap {
165            return Ok(0);
166        }
167        let mut count = 0;
168        let layer_set = store.tree().layer_set();
169        let mut merger = layer_set.merger();
170        let graveyard_object_id = store.graveyard_directory_object_id();
171        let mut iter = Self::iter(graveyard_object_id, &mut merger).await?;
172        let store_id = store.store_object_id();
173        let mut queued_objects = BTreeSet::new();
174        while let Some(GraveyardEntryInfo { object_id, attribute_id, value }) = iter.get() {
175            store.graveyard_entries.fetch_add(1, Ordering::Relaxed);
176            match value {
177                ObjectValue::Some => {
178                    if let Some(attribute_id) = attribute_id {
179                        // If the object is already queued for tombstone, don't queue any attributes
180                        // under it as well. The object tombstone will clean up any attributes as
181                        // well as their graveyard entries.
182                        if !queued_objects.contains(&(store_id, object_id)) {
183                            self.queue_tombstone_attribute(store_id, object_id, attribute_id)
184                        }
185                    } else {
186                        queued_objects.insert((store_id, object_id));
187                        self.queue_tombstone_object(store_id, object_id)
188                    }
189                }
190                ObjectValue::Trim => {
191                    if attribute_id.is_some() {
192                        return Err(anyhow!(
193                            "Trim is not currently supported for a single attribute"
194                        ));
195                    }
196                    self.queue_trim(store_id, object_id)
197                }
198                _ => bail!(anyhow!(FxfsError::Inconsistent).context("Bad graveyard value")),
199            }
200            count += 1;
201            iter.advance().await?;
202        }
203        Ok(count)
204    }
205    /// Queues an object for tombstoning.
206    pub fn queue_tombstone_object(&self, store_id: u64, object_id: u64) {
207        let _ = self.channel.unbounded_send(Message::Tombstone(store_id, object_id, None));
208    }
209
210    /// Queues an object's attribute for tombstoning.
211    pub fn queue_tombstone_attribute(&self, store_id: u64, object_id: u64, attribute_id: u64) {
212        let _ = self.channel.unbounded_send(Message::Tombstone(
213            store_id,
214            object_id,
215            Some(attribute_id),
216        ));
217    }
218
219    fn queue_trim(&self, store_id: u64, object_id: u64) {
220        let _ = self.channel.unbounded_send(Message::Trim(store_id, object_id));
221    }
222
223    /// Waits for all preceding queued tombstones to finish.
224    pub async fn flush(&self) {
225        let (sender, receiver) = oneshot::channel::<()>();
226        self.channel.unbounded_send(Message::Flush(sender)).unwrap();
227        receiver.await.unwrap();
228    }
229
230    /// Immediately tombstones (discards) an object in the graveyard.
231    /// NB: Code should generally use |queue_tombstone| instead.
232    pub async fn tombstone_object(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
233        let store = self
234            .object_manager
235            .store(store_id)
236            .with_context(|| format!("Failed to get store {}", store_id))?;
237        // For now, it's safe to assume that all objects in the root parent and root store should
238        // return space to the metadata reservation, but we might have to revisit that if we end up
239        // with objects that are in other stores.
240        let options = if store_id == self.object_manager.root_parent_store_object_id()
241            || store_id == self.object_manager.root_store_object_id()
242        {
243            Options {
244                skip_journal_checks: true,
245                borrow_metadata_space: true,
246                allocator_reservation: Some(self.object_manager.metadata_reservation()),
247                ..Default::default()
248            }
249        } else {
250            Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
251        };
252        store.tombstone_object(object_id, options).await
253    }
254
255    /// Immediately tombstones (discards) and attribute in the graveyard.
256    /// NB: Code should generally use |queue_tombstone| instead.
257    pub async fn tombstone_attribute(
258        &self,
259        store_id: u64,
260        object_id: u64,
261        attribute_id: u64,
262    ) -> Result<(), Error> {
263        let store = self
264            .object_manager
265            .store(store_id)
266            .with_context(|| format!("Failed to get store {}", store_id))?;
267        // For now, it's safe to assume that all objects in the root parent and root store should
268        // return space to the metadata reservation, but we might have to revisit that if we end up
269        // with objects that are in other stores.
270        let options = if store_id == self.object_manager.root_parent_store_object_id()
271            || store_id == self.object_manager.root_store_object_id()
272        {
273            Options {
274                skip_journal_checks: true,
275                borrow_metadata_space: true,
276                allocator_reservation: Some(self.object_manager.metadata_reservation()),
277                ..Default::default()
278            }
279        } else {
280            Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
281        };
282        store.tombstone_attribute(object_id, attribute_id, options).await
283    }
284
285    async fn trim(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
286        let store = self
287            .object_manager
288            .store(store_id)
289            .with_context(|| format!("Failed to get store {}", store_id))?;
290        let fs = store.filesystem();
291        let truncate_guard = fs.truncate_guard(store_id, object_id).await;
292        store.trim(object_id, &truncate_guard).await.context("Failed to trim object")
293    }
294
295    /// Returns an iterator that will return graveyard entries skipping deleted ones.  Example
296    /// usage:
297    ///
298    ///   let layer_set = graveyard.store().tree().layer_set();
299    ///   let mut merger = layer_set.merger();
300    ///   let mut iter = graveyard.iter(&mut merger).await?;
301    ///
302    pub async fn iter<'a, 'b>(
303        graveyard_object_id: u64,
304        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
305    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
306        Self::iter_from(merger, graveyard_object_id, 0).await
307    }
308
309    /// Like "iter", but seeks from a specific (store-id, object-id) tuple.  Example usage:
310    ///
311    ///   let layer_set = graveyard.store().tree().layer_set();
312    ///   let mut merger = layer_set.merger();
313    ///   let mut iter = graveyard.iter_from(&mut merger, (2, 3)).await?;
314    ///
315    async fn iter_from<'a, 'b>(
316        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
317        graveyard_object_id: u64,
318        from: u64,
319    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
320        GraveyardIterator::new(
321            graveyard_object_id,
322            merger
323                .query(Query::FullRange(&ObjectKey::graveyard_entry(graveyard_object_id, from)))
324                .await?,
325        )
326        .await
327    }
328}
329
330pub struct GraveyardIterator<'a, 'b> {
331    object_id: u64,
332    iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
333}
334
335/// Contains information about a graveyard entry associated with a particular object or
336/// attribute.
337#[derive(Debug, PartialEq)]
338pub struct GraveyardEntryInfo {
339    object_id: u64,
340    attribute_id: Option<u64>,
341    value: ObjectValue,
342}
343
344impl GraveyardEntryInfo {
345    pub fn object_id(&self) -> u64 {
346        self.object_id
347    }
348
349    pub fn attribute_id(&self) -> Option<u64> {
350        self.attribute_id
351    }
352
353    pub fn value(&self) -> &ObjectValue {
354        &self.value
355    }
356}
357
358impl<'a, 'b> GraveyardIterator<'a, 'b> {
359    async fn new(
360        object_id: u64,
361        iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
362    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
363        let mut iter = GraveyardIterator { object_id, iter };
364        iter.skip_deleted_entries().await?;
365        Ok(iter)
366    }
367
368    async fn skip_deleted_entries(&mut self) -> Result<(), Error> {
369        loop {
370            match self.iter.get() {
371                Some(ItemRef {
372                    key: ObjectKey { object_id, .. },
373                    value: ObjectValue::None,
374                    ..
375                }) if *object_id == self.object_id => {}
376                _ => return Ok(()),
377            }
378            self.iter.advance().await?;
379        }
380    }
381
382    pub fn get(&self) -> Option<GraveyardEntryInfo> {
383        match self.iter.get() {
384            Some(ItemRef {
385                key: ObjectKey { object_id: oid, data: ObjectKeyData::GraveyardEntry { object_id } },
386                value,
387                ..
388            }) if *oid == self.object_id => Some(GraveyardEntryInfo {
389                object_id: *object_id,
390                attribute_id: None,
391                value: value.clone(),
392            }),
393            Some(ItemRef {
394                key:
395                    ObjectKey {
396                        object_id: oid,
397                        data: ObjectKeyData::GraveyardAttributeEntry { object_id, attribute_id },
398                    },
399                value,
400                ..
401            }) if *oid == self.object_id => Some(GraveyardEntryInfo {
402                object_id: *object_id,
403                attribute_id: Some(*attribute_id),
404                value: value.clone(),
405            }),
406            _ => None,
407        }
408    }
409
410    pub async fn advance(&mut self) -> Result<(), Error> {
411        self.iter.advance().await?;
412        self.skip_deleted_entries().await
413    }
414}
415
416#[cfg(test)]
417mod tests {
418    use super::{Graveyard, GraveyardEntryInfo, ObjectStore};
419    use crate::errors::FxfsError;
420    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder};
421    use crate::fsck::fsck;
422    use crate::object_handle::ObjectHandle;
423    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
424    use crate::object_store::object_record::ObjectValue;
425    use crate::object_store::transaction::{Options, lock_keys};
426    use crate::object_store::{FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, Mutation, ObjectKey};
427    use assert_matches::assert_matches;
428    use storage_device::DeviceHolder;
429    use storage_device::fake_device::FakeDevice;
430
431    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
432
433    #[fuchsia::test]
434    async fn test_graveyard() {
435        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
436        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
437        let root_store = fs.root_store();
438
439        assert_eq!(root_store.graveyard_count(), 0);
440
441        let mut transaction = fs
442            .clone()
443            .new_transaction(lock_keys![], Options::default())
444            .await
445            .expect("new_transaction failed");
446        let handle1 = ObjectStore::create_object(
447            &root_store,
448            &mut transaction,
449            HandleOptions::default(),
450            None,
451        )
452        .await
453        .expect("create_object failed");
454        let handle2 = ObjectStore::create_object(
455            &root_store,
456            &mut transaction,
457            HandleOptions::default(),
458            None,
459        )
460        .await
461        .expect("create_object failed");
462        transaction.commit().await.expect("commit failed");
463        let id1 = handle1.object_id();
464        let id2 = handle2.object_id();
465
466        // Create and add two objects to the graveyard.
467        let mut transaction = fs
468            .clone()
469            .new_transaction(lock_keys![], Options::default())
470            .await
471            .expect("new_transaction failed");
472
473        root_store.add_to_graveyard(&mut transaction, id1);
474        root_store.add_to_graveyard(&mut transaction, id2);
475        transaction.commit().await.expect("commit failed");
476
477        assert_eq!(root_store.graveyard_count(), 2);
478
479        // Check that we see the objects we added.
480        {
481            let layer_set = root_store.tree().layer_set();
482            let mut merger = layer_set.merger();
483            let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
484                .await
485                .expect("iter failed");
486            assert_matches!(
487                iter.get().expect("missing entry"),
488                GraveyardEntryInfo { object_id, attribute_id: None, value: ObjectValue::Some }
489                if object_id == id1
490            );
491            iter.advance().await.expect("advance failed");
492            assert_matches!(
493                iter.get().expect("missing entry"),
494                GraveyardEntryInfo { object_id, attribute_id: None, value: ObjectValue::Some }
495                if object_id == id2
496            );
497            iter.advance().await.expect("advance failed");
498            assert_eq!(iter.get(), None);
499        }
500
501        // Remove one of the objects.
502        let mut transaction = fs
503            .clone()
504            .new_transaction(lock_keys![], Options::default())
505            .await
506            .expect("new_transaction failed");
507        root_store.remove_from_graveyard(&mut transaction, id2);
508        transaction.commit().await.expect("commit failed");
509
510        assert_eq!(root_store.graveyard_count(), 1);
511
512        // Check that the graveyard has been updated as expected.
513        let layer_set = root_store.tree().layer_set();
514        let mut merger = layer_set.merger();
515        let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
516            .await
517            .expect("iter failed");
518        assert_matches!(
519            iter.get().expect("missing entry"),
520            GraveyardEntryInfo { object_id, attribute_id: None, value: ObjectValue::Some }
521            if object_id == id1
522        );
523        iter.advance().await.expect("advance failed");
524        assert_eq!(iter.get(), None);
525    }
526
527    #[fuchsia::test]
528    async fn test_graveyard_count_replay() {
529        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
530        let (device, _object_ids) = {
531            let fs = FxFilesystemBuilder::new()
532                .skip_initial_reap(true)
533                .format(true)
534                .open(device)
535                .await
536                .expect("open failed");
537            let root_store = fs.root_store();
538
539            let mut object_ids = Vec::new();
540            let mut transaction = fs
541                .clone()
542                .new_transaction(lock_keys![], Options::default())
543                .await
544                .expect("new_transaction failed");
545            let handle1 = ObjectStore::create_object(
546                &root_store,
547                &mut transaction,
548                HandleOptions::default(),
549                None,
550            )
551            .await
552            .expect("create_object failed");
553            let handle2 = ObjectStore::create_object(
554                &root_store,
555                &mut transaction,
556                HandleOptions::default(),
557                None,
558            )
559            .await
560            .expect("create_object failed");
561            transaction.commit().await.expect("commit failed");
562            object_ids.push(handle1.object_id());
563            object_ids.push(handle2.object_id());
564
565            // Create and add two objects to the graveyard.
566            let mut transaction = fs
567                .clone()
568                .new_transaction(lock_keys![], Options::default())
569                .await
570                .expect("new_transaction failed");
571
572            root_store.add_to_graveyard(&mut transaction, object_ids[0]);
573            root_store.add_to_graveyard(&mut transaction, object_ids[1]);
574            transaction.commit().await.expect("commit failed");
575
576            assert_eq!(root_store.graveyard_count(), 2);
577            fs.close().await.expect("close failed");
578            (fs.take_device().await, object_ids)
579        };
580        device.reopen(false);
581        let device = {
582            let fs =
583                FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
584            let root_store = fs.root_store();
585            // Counter is 0 because initial_reap is not called for read-only mounts.
586            assert_eq!(root_store.graveyard_count(), 0);
587
588            // Now manually run it. This will count and queue (but the reaper isn't running).
589            let count =
590                fs.graveyard().initial_reap(&root_store).await.expect("initial_reap failed");
591            let actual_count = root_store.graveyard_count();
592            assert_eq!(count, 2, "initial_reap found wrong number of items (count={})", count);
593            assert_eq!(
594                actual_count, 2,
595                "graveyard_count returned {} but initial_reap found {}",
596                actual_count, count
597            );
598
599            fs.close().await.expect("close failed");
600            fs.take_device().await
601        };
602        device.reopen(false);
603        {
604            // Now test the full flow where they are automatically reaped.
605            let fs = FxFilesystem::open(device).await.expect("open failed");
606            let root_store = fs.root_store();
607
608            // They might or might not have been reaped yet.
609            // Wait for the reaper to finish.
610            fs.graveyard().wait_for_reap().await;
611
612            // Now the count MUST be 0.
613            assert_eq!(root_store.graveyard_count(), 0);
614            fs.close().await.expect("close failed");
615        }
616    }
617
618    #[fuchsia::test]
619    async fn test_tombstone_attribute() {
620        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
621        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
622        let root_store = fs.root_store();
623        let mut transaction = fs
624            .clone()
625            .new_transaction(lock_keys![], Options::default())
626            .await
627            .expect("new_transaction failed");
628
629        let handle = ObjectStore::create_object(
630            &root_store,
631            &mut transaction,
632            HandleOptions::default(),
633            None,
634        )
635        .await
636        .expect("failed to create object");
637        transaction.commit().await.expect("commit failed");
638
639        handle
640            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
641            .await
642            .expect("failed to write merkle attribute");
643        let object_id = handle.object_id();
644        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
645        transaction.add(
646            root_store.store_object_id(),
647            Mutation::replace_or_insert_object(
648                ObjectKey::graveyard_attribute_entry(
649                    root_store.graveyard_directory_object_id(),
650                    object_id,
651                    FSVERITY_MERKLE_ATTRIBUTE_ID,
652                ),
653                ObjectValue::Some,
654            ),
655        );
656
657        transaction.commit().await.expect("commit failed");
658
659        fs.close().await.expect("failed to close filesystem");
660        let device = fs.take_device().await;
661        device.reopen(false);
662
663        let fs =
664            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
665        fsck(fs.clone()).await.expect("fsck failed");
666        fs.close().await.expect("failed to close filesystem");
667        let device = fs.take_device().await;
668        device.reopen(false);
669
670        // On open, the filesystem will call initial_reap which will call queue_tombstone().
671        let fs = FxFilesystem::open(device).await.expect("open failed");
672        // `wait_for_reap` ensures that the Message::Tombstone is actually processed.
673        fs.graveyard().wait_for_reap().await;
674        let root_store = fs.root_store();
675
676        let handle =
677            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
678                .await
679                .expect("failed to open object");
680
681        assert_eq!(
682            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
683            None
684        );
685        fsck(fs.clone()).await.expect("fsck failed");
686    }
687
688    #[fuchsia::test]
689    async fn test_tombstone_attribute_and_object() {
690        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
691        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
692        let root_store = fs.root_store();
693        let mut transaction = fs
694            .clone()
695            .new_transaction(lock_keys![], Options::default())
696            .await
697            .expect("new_transaction failed");
698
699        let handle = ObjectStore::create_object(
700            &root_store,
701            &mut transaction,
702            HandleOptions::default(),
703            None,
704        )
705        .await
706        .expect("failed to create object");
707        transaction.commit().await.expect("commit failed");
708
709        // With both of these it will test that both their graveyard entries got cleaned up in
710        // trim_or_tombstone() via two different paths.
711        handle
712            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
713            .await
714            .expect("failed to write merkle attribute");
715        handle
716            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID + 1, &[0; 8192])
717            .await
718            .expect("failed to write merkle attribute");
719        let object_id = handle.object_id();
720        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
721        transaction.add(
722            root_store.store_object_id(),
723            Mutation::replace_or_insert_object(
724                ObjectKey::graveyard_attribute_entry(
725                    root_store.graveyard_directory_object_id(),
726                    object_id,
727                    FSVERITY_MERKLE_ATTRIBUTE_ID,
728                ),
729                ObjectValue::Some,
730            ),
731        );
732        transaction.add(
733            root_store.store_object_id(),
734            Mutation::replace_or_insert_object(
735                ObjectKey::graveyard_attribute_entry(
736                    root_store.graveyard_directory_object_id(),
737                    object_id,
738                    FSVERITY_MERKLE_ATTRIBUTE_ID + 1,
739                ),
740                ObjectValue::Some,
741            ),
742        );
743        transaction.commit().await.expect("commit failed");
744        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
745        transaction.add(
746            root_store.store_object_id(),
747            Mutation::replace_or_insert_object(
748                ObjectKey::graveyard_entry(root_store.graveyard_directory_object_id(), object_id),
749                ObjectValue::Some,
750            ),
751        );
752        transaction.commit().await.expect("commit failed");
753
754        fs.close().await.expect("failed to close filesystem");
755        let device = fs.take_device().await;
756        device.reopen(false);
757
758        let fs =
759            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
760        fsck(fs.clone()).await.expect("fsck failed");
761        fs.close().await.expect("failed to close filesystem");
762        let device = fs.take_device().await;
763        device.reopen(false);
764
765        // On open, the filesystem will call initial_reap which will call queue_tombstone().
766        let fs = FxFilesystem::open(device).await.expect("open failed");
767        // `wait_for_reap` ensures that the two tombstone messages are processed.
768        fs.graveyard().wait_for_reap().await;
769
770        let root_store = fs.root_store();
771        if let Err(e) =
772            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None).await
773        {
774            assert!(FxfsError::NotFound.matches(&e));
775        } else {
776            panic!("open_object succeeded");
777        };
778        fsck(fs.clone()).await.expect("fsck failed");
779    }
780
781    #[fuchsia::test]
    // Exercises reaping of an attribute whose batched write was abandoned part-way: the attribute
    // is listed in the graveyard, written via `write_new_attr_in_batches`, and the transaction is
    // then dropped without being committed here. After remounting, the object must still open
    // but reading the attribute must yield `None`.
782    async fn test_tombstone_large_attribute() {
783        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
784        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
785        let root_store = fs.root_store();
786        let mut transaction = fs
787            .clone()
788            .new_transaction(lock_keys![], Options::default())
789            .await
790            .expect("new_transaction failed");
791
        // Create the object that will own the attribute and commit it on its own, so the object
        // exists independently of the attribute write below.
792        let handle = ObjectStore::create_object(
793            &root_store,
794            &mut transaction,
795            HandleOptions::default(),
796            None,
797        )
798        .await
799        .expect("failed to create object");
800        transaction.commit().await.expect("commit failed");
801
        // The block scopes the second transaction; it evaluates to the object id so the id can be
        // used after the filesystem is reopened.
802        let object_id = {
803            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            // Record the attribute in the root store's graveyard as part of the same transaction
            // that is handed to the batched write below.
804            transaction.add(
805                root_store.store_object_id(),
806                Mutation::replace_or_insert_object(
807                    ObjectKey::graveyard_attribute_entry(
808                        root_store.graveyard_directory_object_id(),
809                        handle.object_id(),
810                        FSVERITY_MERKLE_ATTRIBUTE_ID,
811                    ),
812                    ObjectValue::Some,
813                ),
814            );
815
816            // This write should span three transactions. This test mimics the behavior when the
817            // last transaction gets interrupted by a filesystem.close().
818            handle
819                .write_new_attr_in_batches(
820                    &mut transaction,
821                    FSVERITY_MERKLE_ATTRIBUTE_ID,
822                    &vec![0; 3 * WRITE_ATTR_BATCH_SIZE],
823                    WRITE_ATTR_BATCH_SIZE,
824                )
825                .await
826                .expect("failed to write merkle attribute");
827
828            handle.object_id()
829            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
830            // release the transaction locks.
831        };
832
833        fs.close().await.expect("failed to close filesystem");
834        let device = fs.take_device().await;
835        device.reopen(false);
836
        // Remount read-only and run fsck against the state left behind by the interrupted write.
        // NOTE(review): presumably a read-only mount does not reap the graveyard, so this checks
        // the pre-reap state — confirm.
837        let fs =
838            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
839        fsck(fs.clone()).await.expect("fsck failed");
840        fs.close().await.expect("failed to close filesystem");
841        let device = fs.take_device().await;
842        device.reopen(false);
843
844        // On open, the filesystem will call initial_reap which will call queue_tombstone().
845        let fs = FxFilesystem::open(device).await.expect("open failed");
846        // `wait_for_reap` is awaited so that queued tombstone work is processed before the checks.
        // NOTE(review): this test adds only one graveyard attribute entry, so presumably a single
        // tombstone message is expected here — confirm.
847        fs.graveyard().wait_for_reap().await;
848
849        let root_store = fs.root_store();
850
        // The object itself is expected to survive; only the graveyard-listed attribute should be
        // gone.
851        let handle =
852            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
853                .await
854                .expect("failed to open object");
855
856        assert_eq!(
857            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
858            None
859        );
        // Final consistency check after the reap.
860        fsck(fs.clone()).await.expect("fsck failed");
861    }
862}