// fxfs/object_store/graveyard.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::Query;
8use crate::lsm_tree::merge::{Merger, MergerIterator};
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::object_store::ObjectStore;
11use crate::object_store::object_manager::ObjectManager;
12use crate::object_store::object_record::{
13    ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, Timestamp,
14};
15use crate::object_store::transaction::{Mutation, Options, Transaction};
16use anyhow::{Context, Error, anyhow, bail};
17use fuchsia_async::{self as fasync};
18use fuchsia_sync::Mutex;
19use futures::StreamExt;
20use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded};
21use futures::channel::oneshot;
22use fxfs_trace::{TraceFutureExt, trace_future_args};
23use std::sync::Arc;
24use std::sync::atomic::Ordering;
25
/// State of the background reaper task.
enum ReaperTask {
    // No reaper: either it was never configured, or it has been taken by `wait_for_reap`.
    None,
    // The receiving end of the request channel is parked here until `reap_async` spawns the task.
    Pending(UnboundedReceiver<Message>),
    // The reaper task has been spawned and is processing requests.
    Running(fasync::Task<()>),
}
31
/// A graveyard exists as a place to park objects that should be deleted when they are no longer in
/// use.  How objects enter and leave the graveyard is up to the caller to decide.  The intention is
/// that at mount time, any objects in the graveyard will get removed.  Each object store has a
/// directory like object that contains a list of the objects within that store that are part of the
/// graveyard.  A single instance of this Graveyard struct manages *all* stores.
pub struct Graveyard {
    // Used to resolve store IDs to stores when processing reap requests.
    object_manager: Arc<ObjectManager>,
    // State of the background reaper; see `ReaperTask`.
    reaper_task: Mutex<ReaperTask>,
    // Sending half of the request channel consumed by the reaper task.
    channel: UnboundedSender<Message>,
}
42
/// Requests sent to the reaper task.
enum Message {
    // Tombstone the object identified by <store-id>, <object-id>, Option<attribute-id>. If
    // <attribute-id> is Some, tombstone just the attribute instead of the entire object.
    Tombstone(u64, u64, Option<u64>),

    // Trims the object identified by <store-id>, <object-id>.
    Trim(u64, u64),

    // When the flush message is processed, notifies sender.  This allows the receiver to know
    // that all preceding tombstone messages have been processed.
    Flush(oneshot::Sender<()>),
}
55
56#[fxfs_trace::trace]
57impl Graveyard {
58    /// Creates a new instance of the graveyard manager.
59    pub fn new(object_manager: Arc<ObjectManager>) -> Arc<Self> {
60        let (sender, receiver) = unbounded();
61        Arc::new(Graveyard {
62            object_manager,
63            reaper_task: Mutex::new(ReaperTask::Pending(receiver)),
64            channel: sender,
65        })
66    }
67
68    /// Creates a graveyard object in `store`.  Returns the object ID for the graveyard object.
69    pub async fn create(
70        transaction: &mut Transaction<'_>,
71        store: &ObjectStore,
72    ) -> Result<u64, Error> {
73        let reserved_object_id = store.get_next_object_id(transaction.txn_guard()).await?;
74        let object_id = reserved_object_id.get();
75        let now = Timestamp::now();
76        transaction.add(
77            store.store_object_id,
78            Mutation::insert_object(
79                ObjectKey::object(reserved_object_id.release()),
80                ObjectValue::Object {
81                    kind: ObjectKind::Graveyard,
82                    attributes: ObjectAttributes {
83                        creation_time: now.clone(),
84                        modification_time: now,
85                        project_id: 0,
86                        ..Default::default()
87                    },
88                },
89            ),
90        );
91        Ok(object_id)
92    }
93
94    /// Starts an asynchronous task to reap the graveyard for all entries older than
95    /// |journal_offset| (exclusive).
96    /// If a task is already started, this has no effect, even if that task was targeting an older
97    /// |journal_offset|.
98    pub fn reap_async(self: Arc<Self>) {
99        let mut reaper_task = self.reaper_task.lock();
100        if let ReaperTask::Pending(_) = &*reaper_task {
101            if let ReaperTask::Pending(receiver) =
102                std::mem::replace(&mut *reaper_task, ReaperTask::None)
103            {
104                *reaper_task = ReaperTask::Running(fasync::Task::spawn(
105                    self.clone()
106                        .reap_task(receiver)
107                        .trace(trace_future_args!("Graveyard::reap_task")),
108                ));
109            } else {
110                unreachable!();
111            }
112        }
113    }
114
115    /// Returns a future which completes when the ongoing reap task (if it exists) completes.
116    pub async fn wait_for_reap(&self) {
117        self.channel.close_channel();
118        let task = std::mem::replace(&mut *self.reaper_task.lock(), ReaperTask::None);
119        if let ReaperTask::Running(task) = task {
120            task.await;
121        }
122    }
123
124    async fn reap_task(self: Arc<Self>, mut receiver: UnboundedReceiver<Message>) {
125        // Wait and process reap requests.
126        while let Some(message) = receiver.next().await {
127            match message {
128                Message::Tombstone(store_id, object_id, attribute_id) => {
129                    let res = if let Some(attribute_id) = attribute_id {
130                        self.tombstone_attribute(store_id, object_id, attribute_id).await
131                    } else {
132                        self.tombstone_object(store_id, object_id).await
133                    };
134                    if let Err(e) = res {
135                        debug_assert!(
136                            false,
137                            "Tombstone error: {e:?}, store_id: {store_id}, oid: {object_id}, \
138                             attribute_id: {attribute_id:?}"
139                        );
140                        error!(
141                            error:? = e,
142                            store_id,
143                            oid = object_id,
144                            attribute_id;
145                            "Tombstone error"
146                        );
147                    }
148                }
149                Message::Trim(store_id, object_id) => {
150                    if let Err(e) = self.trim(store_id, object_id).await {
151                        debug_assert!(
152                            false,
153                            "Tombstone error: {e:?}, store_id: {store_id}, oid: {object_id}"
154                        );
155                        error!(error:? = e, store_id, oid = object_id; "Tombstone error");
156                    }
157                }
158                Message::Flush(sender) => {
159                    let _ = sender.send(());
160                }
161            }
162        }
163    }
164
165    /// Performs the initial mount-time reap for the given store.  This will queue all items in the
166    /// graveyard.  Concurrently adding more entries to the graveyard will lead to undefined
167    /// behaviour: the entries might or might not be immediately tombstoned, so callers should wait
168    /// for this to return before changing to a state where more entries can be added.  Once this
169    /// has returned, entries will be tombstoned in the background.
170    #[trace]
171    pub async fn initial_reap(self: &Arc<Self>, store: &ObjectStore) -> Result<usize, Error> {
172        if store.filesystem().options().skip_initial_reap {
173            return Ok(0);
174        }
175        let mut count = 0;
176        let layer_set = store.tree().layer_set();
177        let mut merger = layer_set.merger();
178        let graveyard_object_id = store.graveyard_directory_object_id();
179        let mut iter = Self::iter(graveyard_object_id, &mut merger).await?;
180        let store_id = store.store_object_id();
181        // TODO(https://fxbug.dev/355246066): This iteration can sometimes attempt to reap
182        // attributes after they've been reaped as part of the parent object.
183        while let Some(GraveyardEntryInfo { object_id, attribute_id, sequence: _, value }) =
184            iter.get()
185        {
186            store.graveyard_entries.fetch_add(1, Ordering::Relaxed);
187            match value {
188                ObjectValue::Some => {
189                    if let Some(attribute_id) = attribute_id {
190                        self.queue_tombstone_attribute(store_id, object_id, attribute_id)
191                    } else {
192                        self.queue_tombstone_object(store_id, object_id)
193                    }
194                }
195                ObjectValue::Trim => {
196                    if attribute_id.is_some() {
197                        return Err(anyhow!(
198                            "Trim is not currently supported for a single attribute"
199                        ));
200                    }
201                    self.queue_trim(store_id, object_id)
202                }
203                _ => bail!(anyhow!(FxfsError::Inconsistent).context("Bad graveyard value")),
204            }
205            count += 1;
206            iter.advance().await?;
207        }
208        Ok(count)
209    }
210    /// Queues an object for tombstoning.
211    pub fn queue_tombstone_object(&self, store_id: u64, object_id: u64) {
212        let _ = self.channel.unbounded_send(Message::Tombstone(store_id, object_id, None));
213    }
214
215    /// Queues an object's attribute for tombstoning.
216    pub fn queue_tombstone_attribute(&self, store_id: u64, object_id: u64, attribute_id: u64) {
217        let _ = self.channel.unbounded_send(Message::Tombstone(
218            store_id,
219            object_id,
220            Some(attribute_id),
221        ));
222    }
223
224    fn queue_trim(&self, store_id: u64, object_id: u64) {
225        let _ = self.channel.unbounded_send(Message::Trim(store_id, object_id));
226    }
227
228    /// Waits for all preceding queued tombstones to finish.
229    pub async fn flush(&self) {
230        let (sender, receiver) = oneshot::channel::<()>();
231        self.channel.unbounded_send(Message::Flush(sender)).unwrap();
232        receiver.await.unwrap();
233    }
234
235    /// Immediately tombstones (discards) an object in the graveyard.
236    /// NB: Code should generally use |queue_tombstone| instead.
237    pub async fn tombstone_object(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
238        let store = self
239            .object_manager
240            .store(store_id)
241            .with_context(|| format!("Failed to get store {}", store_id))?;
242        // For now, it's safe to assume that all objects in the root parent and root store should
243        // return space to the metadata reservation, but we might have to revisit that if we end up
244        // with objects that are in other stores.
245        let options = if store_id == self.object_manager.root_parent_store_object_id()
246            || store_id == self.object_manager.root_store_object_id()
247        {
248            Options {
249                skip_journal_checks: true,
250                borrow_metadata_space: true,
251                allocator_reservation: Some(self.object_manager.metadata_reservation()),
252                ..Default::default()
253            }
254        } else {
255            Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
256        };
257        store.tombstone_object(object_id, options).await
258    }
259
260    /// Immediately tombstones (discards) and attribute in the graveyard.
261    /// NB: Code should generally use |queue_tombstone| instead.
262    pub async fn tombstone_attribute(
263        &self,
264        store_id: u64,
265        object_id: u64,
266        attribute_id: u64,
267    ) -> Result<(), Error> {
268        let store = self
269            .object_manager
270            .store(store_id)
271            .with_context(|| format!("Failed to get store {}", store_id))?;
272        // For now, it's safe to assume that all objects in the root parent and root store should
273        // return space to the metadata reservation, but we might have to revisit that if we end up
274        // with objects that are in other stores.
275        let options = if store_id == self.object_manager.root_parent_store_object_id()
276            || store_id == self.object_manager.root_store_object_id()
277        {
278            Options {
279                skip_journal_checks: true,
280                borrow_metadata_space: true,
281                allocator_reservation: Some(self.object_manager.metadata_reservation()),
282                ..Default::default()
283            }
284        } else {
285            Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
286        };
287        store.tombstone_attribute(object_id, attribute_id, options).await
288    }
289
290    async fn trim(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
291        let store = self
292            .object_manager
293            .store(store_id)
294            .with_context(|| format!("Failed to get store {}", store_id))?;
295        let fs = store.filesystem();
296        let truncate_guard = fs.truncate_guard(store_id, object_id).await;
297        store.trim(object_id, &truncate_guard).await.context("Failed to trim object")
298    }
299
300    /// Returns an iterator that will return graveyard entries skipping deleted ones.  Example
301    /// usage:
302    ///
303    ///   let layer_set = graveyard.store().tree().layer_set();
304    ///   let mut merger = layer_set.merger();
305    ///   let mut iter = graveyard.iter(&mut merger).await?;
306    ///
307    pub async fn iter<'a, 'b>(
308        graveyard_object_id: u64,
309        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
310    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
311        Self::iter_from(merger, graveyard_object_id, 0).await
312    }
313
314    /// Like "iter", but seeks from a specific (store-id, object-id) tuple.  Example usage:
315    ///
316    ///   let layer_set = graveyard.store().tree().layer_set();
317    ///   let mut merger = layer_set.merger();
318    ///   let mut iter = graveyard.iter_from(&mut merger, (2, 3)).await?;
319    ///
320    async fn iter_from<'a, 'b>(
321        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
322        graveyard_object_id: u64,
323        from: u64,
324    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
325        GraveyardIterator::new(
326            graveyard_object_id,
327            merger
328                .query(Query::FullRange(&ObjectKey::graveyard_entry(graveyard_object_id, from)))
329                .await?,
330        )
331        .await
332    }
333}
334
/// Iterates over the entries of a single graveyard object, skipping deleted entries.
pub struct GraveyardIterator<'a, 'b> {
    // Object ID of the graveyard object whose entries are being iterated.
    object_id: u64,
    iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
}
339
/// Contains information about a graveyard entry associated with a particular object or
/// attribute.
#[derive(Debug, PartialEq)]
pub struct GraveyardEntryInfo {
    // ID of the object the entry refers to.
    object_id: u64,
    // Set when the entry refers to a single attribute rather than the whole object.
    attribute_id: Option<u64>,
    // Sequence number of the record, taken from the underlying merger iterator.
    sequence: u64,
    // The entry's value (e.g. ObjectValue::Some or ObjectValue::Trim).
    value: ObjectValue,
}
349
impl GraveyardEntryInfo {
    /// Returns the ID of the object the entry refers to.
    pub fn object_id(&self) -> u64 {
        self.object_id
    }

    /// Returns the attribute ID if the entry refers to a single attribute, or `None` if it refers
    /// to the whole object.
    pub fn attribute_id(&self) -> Option<u64> {
        self.attribute_id
    }

    /// Returns the value of the graveyard entry.
    pub fn value(&self) -> &ObjectValue {
        &self.value
    }
}
363
impl<'a, 'b> GraveyardIterator<'a, 'b> {
    /// Wraps `iter` and immediately skips past any deleted entries so that `get` starts at a live
    /// entry (or None).
    async fn new(
        object_id: u64,
        iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
        let mut iter = GraveyardIterator { object_id, iter };
        iter.skip_deleted_entries().await?;
        Ok(iter)
    }

    /// Advances past records belonging to this graveyard whose value is `ObjectValue::None`
    /// (i.e. deleted entries).
    async fn skip_deleted_entries(&mut self) -> Result<(), Error> {
        loop {
            match self.iter.get() {
                Some(ItemRef {
                    key: ObjectKey { object_id, .. },
                    value: ObjectValue::None,
                    ..
                }) if *object_id == self.object_id => {}
                // Anything else — a live entry, a record for a different object, or the end of the
                // iterator — terminates the skip.
                _ => return Ok(()),
            }
            self.iter.advance().await?;
        }
    }

    /// Returns information about the current graveyard entry, or `None` once iteration has moved
    /// past the last record belonging to this graveyard.
    pub fn get(&self) -> Option<GraveyardEntryInfo> {
        match self.iter.get() {
            // Entry for a whole object.
            Some(ItemRef {
                key: ObjectKey { object_id: oid, data: ObjectKeyData::GraveyardEntry { object_id } },
                value,
                sequence,
                ..
            }) if *oid == self.object_id => Some(GraveyardEntryInfo {
                object_id: *object_id,
                attribute_id: None,
                sequence,
                value: value.clone(),
            }),
            // Entry for a single attribute of an object.
            Some(ItemRef {
                key:
                    ObjectKey {
                        object_id: oid,
                        data: ObjectKeyData::GraveyardAttributeEntry { object_id, attribute_id },
                    },
                value,
                sequence,
            }) if *oid == self.object_id => Some(GraveyardEntryInfo {
                object_id: *object_id,
                attribute_id: Some(*attribute_id),
                sequence,
                value: value.clone(),
            }),
            _ => None,
        }
    }

    /// Moves to the next non-deleted graveyard entry.
    pub async fn advance(&mut self) -> Result<(), Error> {
        self.iter.advance().await?;
        self.skip_deleted_entries().await
    }
}
424
425#[cfg(test)]
426mod tests {
427    use super::{Graveyard, GraveyardEntryInfo, ObjectStore};
428    use crate::errors::FxfsError;
429    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder};
430    use crate::fsck::fsck;
431    use crate::object_handle::ObjectHandle;
432    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
433    use crate::object_store::object_record::ObjectValue;
434    use crate::object_store::transaction::{Options, lock_keys};
435    use crate::object_store::{FSVERITY_MERKLE_ATTRIBUTE_ID, HandleOptions, Mutation, ObjectKey};
436    use assert_matches::assert_matches;
437    use storage_device::DeviceHolder;
438    use storage_device::fake_device::FakeDevice;
439
440    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
441
    /// Verifies that objects added to the graveyard appear during iteration, and that a removed
    /// entry no longer appears.
    #[fuchsia::test]
    async fn test_graveyard() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();

        // A fresh filesystem has an empty graveyard.
        assert_eq!(root_store.graveyard_count(), 0);

        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        let handle1 = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("create_object failed");
        let handle2 = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("create_object failed");
        transaction.commit().await.expect("commit failed");
        let id1 = handle1.object_id();
        let id2 = handle2.object_id();

        // Create and add two objects to the graveyard.
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        root_store.add_to_graveyard(&mut transaction, id1);
        root_store.add_to_graveyard(&mut transaction, id2);
        transaction.commit().await.expect("commit failed");

        assert_eq!(root_store.graveyard_count(), 2);

        // Check that we see the objects we added.
        {
            let layer_set = root_store.tree().layer_set();
            let mut merger = layer_set.merger();
            let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
                .await
                .expect("iter failed");
            assert_matches!(
                iter.get().expect("missing entry"),
                GraveyardEntryInfo { object_id, attribute_id: None, value: ObjectValue::Some, .. }
                if object_id == id1
            );
            iter.advance().await.expect("advance failed");
            assert_matches!(
                iter.get().expect("missing entry"),
                GraveyardEntryInfo { object_id, attribute_id: None, value: ObjectValue::Some, .. }
                if object_id == id2
            );
            iter.advance().await.expect("advance failed");
            assert_eq!(iter.get(), None);
        }

        // Remove one of the objects.
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");
        root_store.remove_from_graveyard(&mut transaction, id2);
        transaction.commit().await.expect("commit failed");

        assert_eq!(root_store.graveyard_count(), 1);

        // Check that the graveyard has been updated as expected: only id1 remains.
        let layer_set = root_store.tree().layer_set();
        let mut merger = layer_set.merger();
        let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
            .await
            .expect("iter failed");
        assert_matches!(
            iter.get().expect("missing entry"),
            GraveyardEntryInfo { object_id, attribute_id: None, value: ObjectValue::Some, .. }
            if object_id == id1
        );
        iter.advance().await.expect("advance failed");
        assert_eq!(iter.get(), None);
    }
535
    /// Verifies that the graveyard-entry counter is correctly rebuilt across remounts: zero on a
    /// read-only mount (no initial reap), restored by a manual `initial_reap`, and drained to zero
    /// once the reaper runs on a normal mount.
    #[fuchsia::test]
    async fn test_graveyard_count_replay() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let (device, _object_ids) = {
            // Format with initial reap disabled so graveyard entries persist across the remount.
            let fs = FxFilesystemBuilder::new()
                .skip_initial_reap(true)
                .format(true)
                .open(device)
                .await
                .expect("open failed");
            let root_store = fs.root_store();

            let mut object_ids = Vec::new();
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");
            let handle1 = ObjectStore::create_object(
                &root_store,
                &mut transaction,
                HandleOptions::default(),
                None,
            )
            .await
            .expect("create_object failed");
            let handle2 = ObjectStore::create_object(
                &root_store,
                &mut transaction,
                HandleOptions::default(),
                None,
            )
            .await
            .expect("create_object failed");
            transaction.commit().await.expect("commit failed");
            object_ids.push(handle1.object_id());
            object_ids.push(handle2.object_id());

            // Create and add two objects to the graveyard.
            let mut transaction = fs
                .clone()
                .new_transaction(lock_keys![], Options::default())
                .await
                .expect("new_transaction failed");

            root_store.add_to_graveyard(&mut transaction, object_ids[0]);
            root_store.add_to_graveyard(&mut transaction, object_ids[1]);
            transaction.commit().await.expect("commit failed");

            assert_eq!(root_store.graveyard_count(), 2);
            fs.close().await.expect("close failed");
            (fs.take_device().await, object_ids)
        };
        device.reopen(false);
        let device = {
            let fs =
                FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
            let root_store = fs.root_store();
            // Counter is 0 because initial_reap is not called for read-only mounts.
            assert_eq!(root_store.graveyard_count(), 0);

            // Now manually run it. This will count and queue (but the reaper isn't running).
            let count =
                fs.graveyard().initial_reap(&root_store).await.expect("initial_reap failed");
            let actual_count = root_store.graveyard_count();
            assert_eq!(count, 2, "initial_reap found wrong number of items (count={})", count);
            assert_eq!(
                actual_count, 2,
                "graveyard_count returned {} but initial_reap found {}",
                actual_count, count
            );

            fs.close().await.expect("close failed");
            fs.take_device().await
        };
        device.reopen(false);
        {
            // Now test the full flow where they are automatically reaped.
            let fs = FxFilesystem::open(device).await.expect("open failed");
            let root_store = fs.root_store();

            // They might or might not have been reaped yet.
            // Wait for the reaper to finish.
            fs.graveyard().wait_for_reap().await;

            // Now the count MUST be 0.
            assert_eq!(root_store.graveyard_count(), 0);
            fs.close().await.expect("close failed");
        }
    }
626
    /// Verifies that a graveyard attribute entry causes just that attribute to be reaped at mount
    /// time, leaving the owning object intact and the filesystem consistent.
    #[fuchsia::test]
    async fn test_tombstone_attribute() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        // Write an attribute, then add a graveyard entry targeting just that attribute.
        handle
            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
            .await
            .expect("failed to write merkle attribute");
        let object_id = handle.object_id();
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    root_store.graveyard_directory_object_id(),
                    object_id,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                ),
                ObjectValue::Some,
            ),
        );

        transaction.commit().await.expect("commit failed");

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // A read-only mount leaves the graveyard untouched; fsck should still pass.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        // `wait_for_reap` ensures that the Message::Tombstone is actually processed.
        fs.graveyard().wait_for_reap().await;
        let root_store = fs.root_store();

        // The object must still exist; only the attribute should have been discarded.
        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("failed to open object");

        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            None
        );
        fsck(fs.clone()).await.expect("fsck failed");
    }
696
    /// Verifies that when both an attribute entry and a whole-object entry exist in the graveyard
    /// for the same object, the mount-time reap removes the object entirely and fsck stays clean.
    #[fuchsia::test]
    async fn test_tombstone_attribute_and_object() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        handle
            .write_attr(FSVERITY_MERKLE_ATTRIBUTE_ID, &[0; 8192])
            .await
            .expect("failed to write merkle attribute");
        let object_id = handle.object_id();
        // Add a graveyard entry for the attribute...
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_attribute_entry(
                    root_store.graveyard_directory_object_id(),
                    object_id,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                ),
                ObjectValue::Some,
            ),
        );
        transaction.commit().await.expect("commit failed");
        // ...and another for the whole object.
        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
        transaction.add(
            root_store.store_object_id(),
            Mutation::replace_or_insert_object(
                ObjectKey::graveyard_entry(root_store.graveyard_directory_object_id(), object_id),
                ObjectValue::Some,
            ),
        );
        transaction.commit().await.expect("commit failed");

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // A read-only mount leaves the graveyard untouched; fsck should still pass.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        // `wait_for_reap` ensures that the two tombstone messages are processed.
        fs.graveyard().wait_for_reap().await;

        // The whole object should now be gone.
        let root_store = fs.root_store();
        if let Err(e) =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None).await
        {
            assert!(FxfsError::NotFound.matches(&e));
        } else {
            panic!("open_object succeeded");
        };
        fsck(fs.clone()).await.expect("fsck failed");
    }
772
    /// Verifies that a partially written multi-transaction attribute (simulating an interrupted
    /// write) is cleaned up by the mount-time reap via its graveyard attribute entry.
    #[fuchsia::test]
    async fn test_tombstone_large_attribute() {
        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
        let root_store = fs.root_store();
        let mut transaction = fs
            .clone()
            .new_transaction(lock_keys![], Options::default())
            .await
            .expect("new_transaction failed");

        let handle = ObjectStore::create_object(
            &root_store,
            &mut transaction,
            HandleOptions::default(),
            None,
        )
        .await
        .expect("failed to create object");
        transaction.commit().await.expect("commit failed");

        let object_id = {
            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
            // Insert the graveyard entry first so the attribute gets reaped if the write below is
            // interrupted before completion.
            transaction.add(
                root_store.store_object_id(),
                Mutation::replace_or_insert_object(
                    ObjectKey::graveyard_attribute_entry(
                        root_store.graveyard_directory_object_id(),
                        handle.object_id(),
                        FSVERITY_MERKLE_ATTRIBUTE_ID,
                    ),
                    ObjectValue::Some,
                ),
            );

            // This write should span three transactions. This test mimics the behavior when the
            // last transaction gets interrupted by a filesystem.close().
            handle
                .write_new_attr_in_batches(
                    &mut transaction,
                    FSVERITY_MERKLE_ATTRIBUTE_ID,
                    &vec![0; 3 * WRITE_ATTR_BATCH_SIZE],
                    WRITE_ATTR_BATCH_SIZE,
                )
                .await
                .expect("failed to write merkle attribute");

            handle.object_id()
            // Drop the transaction to simulate interrupting the merkle tree creation as well as to
            // release the transaction locks.
        };

        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // A read-only mount leaves the graveyard untouched; fsck should still pass.
        let fs =
            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
        fsck(fs.clone()).await.expect("fsck failed");
        fs.close().await.expect("failed to close filesystem");
        let device = fs.take_device().await;
        device.reopen(false);

        // On open, the filesystem will call initial_reap which will call queue_tombstone().
        let fs = FxFilesystem::open(device).await.expect("open failed");
        // `wait_for_reap` ensures that the two tombstone messages are processed.
        fs.graveyard().wait_for_reap().await;

        let root_store = fs.root_store();

        // The object survives; only the partially written attribute is discarded.
        let handle =
            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
                .await
                .expect("failed to open object");

        assert_eq!(
            handle.read_attr(FSVERITY_MERKLE_ATTRIBUTE_ID).await.expect("read_attr failed"),
            None
        );
        fsck(fs.clone()).await.expect("fsck failed");
    }
854}