Skip to main content

fxfs/object_store/
graveyard.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::lsm_tree::Query;
8use crate::lsm_tree::merge::{Merger, MergerIterator};
9use crate::lsm_tree::types::{ItemRef, LayerIterator};
10use crate::object_store::object_manager::ObjectManager;
11use crate::object_store::object_record::{
12    ObjectAttributes, ObjectKey, ObjectKeyData, ObjectKind, ObjectValue, Timestamp,
13};
14use crate::object_store::transaction::{Mutation, Options, Transaction};
15use crate::object_store::{AttributeId, ObjectStore};
16use anyhow::{Context, Error, anyhow, bail};
17use fuchsia_async::{self as fasync};
18use fuchsia_sync::Mutex;
19use futures::StreamExt;
20use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender, unbounded};
21use futures::channel::oneshot;
22use fxfs_trace::{TraceFutureExt, trace_future_args};
23use std::collections::BTreeSet;
24use std::sync::Arc;
25use std::sync::atomic::Ordering;
26
/// Tracks where the background reaper is in its lifecycle.
enum ReaperTask {
    /// Neither a receiver nor a task is held.  `wait_for_reap` leaves the state here after taking
    /// whatever was stored, and `reap_async` uses it as a placeholder while spawning.
    None,
    /// The reaper has not been started; holds the receiving end of the message channel created in
    /// `Graveyard::new` until `reap_async` hands it to the task.
    Pending(UnboundedReceiver<Message>),
    /// The reaper has been spawned; holds the task so `wait_for_reap` can await it.
    Running(fasync::Task<()>),
}
32
/// A graveyard exists as a place to park objects that should be deleted when they are no longer in
/// use.  How objects enter and leave the graveyard is up to the caller to decide.  The intention is
/// that at mount time, any objects in the graveyard will get removed.  Each object store has a
/// directory like object that contains a list of the objects within that store that are part of the
/// graveyard.  A single instance of this Graveyard struct manages *all* stores.
pub struct Graveyard {
    /// Used to look up stores by ID and to find the metadata reservation when tombstoning.
    object_manager: Arc<ObjectManager>,
    /// Current state of the background reaper (not started, running, or taken).
    reaper_task: Mutex<ReaperTask>,
    /// Sending half of the channel that feeds `Message`s to the reaper task.
    channel: UnboundedSender<Message>,
}
43
/// Requests handled, in order, by the background reaper task.
enum Message {
    /// Tombstone the object identified by <store-id>, <object-id>, Option<attribute-id>. If
    /// <attribute-id> is Some, tombstone just the attribute instead of the entire object.
    Tombstone(u64, u64, Option<AttributeId>),

    /// Trims the object identified by <store-id>, <object-id>.
    Trim(u64, u64),

    /// When the flush message is processed, notifies sender.  This allows the receiver to know
    /// that all preceding tombstone messages have been processed.
    Flush(oneshot::Sender<()>),
}
56
57#[fxfs_trace::trace]
58impl Graveyard {
59    /// Creates a new instance of the graveyard manager.
60    pub fn new(object_manager: Arc<ObjectManager>) -> Arc<Self> {
61        let (sender, receiver) = unbounded();
62        Arc::new(Graveyard {
63            object_manager,
64            reaper_task: Mutex::new(ReaperTask::Pending(receiver)),
65            channel: sender,
66        })
67    }
68
69    /// Creates a graveyard object in `store`.  Returns the object ID for the graveyard object.
70    pub async fn create(
71        transaction: &mut Transaction<'_>,
72        store: &ObjectStore,
73    ) -> Result<u64, Error> {
74        let reserved_object_id = store.get_next_object_id(transaction.txn_guard()).await?;
75        let object_id = reserved_object_id.get();
76        let now = Timestamp::now();
77        transaction.add(
78            store.store_object_id,
79            Mutation::insert_object(
80                ObjectKey::object(reserved_object_id.release()),
81                ObjectValue::Object {
82                    kind: ObjectKind::Graveyard,
83                    attributes: ObjectAttributes {
84                        creation_time: now.clone(),
85                        modification_time: now,
86                        ..Default::default()
87                    },
88                },
89            ),
90        );
91        Ok(object_id)
92    }
93
94    /// Starts an asynchronous task to reap the graveyard for all entries older than
95    /// |journal_offset| (exclusive).
96    /// If a task is already started, this has no effect, even if that task was targeting an older
97    /// |journal_offset|.
98    pub fn reap_async(self: Arc<Self>) {
99        let mut reaper_task = self.reaper_task.lock();
100        if let ReaperTask::Pending(_) = &*reaper_task {
101            if let ReaperTask::Pending(receiver) =
102                std::mem::replace(&mut *reaper_task, ReaperTask::None)
103            {
104                *reaper_task = ReaperTask::Running(fasync::Task::spawn(
105                    self.clone()
106                        .reap_task(receiver)
107                        .trace(trace_future_args!("Graveyard::reap_task")),
108                ));
109            } else {
110                unreachable!();
111            }
112        }
113    }
114
115    /// Returns a future which completes when the ongoing reap task (if it exists) completes.
116    pub async fn wait_for_reap(&self) {
117        self.channel.close_channel();
118        let task = std::mem::replace(&mut *self.reaper_task.lock(), ReaperTask::None);
119        if let ReaperTask::Running(task) = task {
120            task.await;
121        }
122    }
123
124    async fn reap_task(self: Arc<Self>, mut receiver: UnboundedReceiver<Message>) {
125        // Wait and process reap requests.
126        while let Some(message) = receiver.next().await {
127            match message {
128                Message::Tombstone(store_id, object_id, attribute_id) => {
129                    let res = if let Some(attribute_id) = attribute_id {
130                        self.tombstone_attribute(store_id, object_id, attribute_id).await
131                    } else {
132                        self.tombstone_object(store_id, object_id).await
133                    };
134                    if let Err(e) = res {
135                        error!(
136                            error:? = e,
137                            store_id,
138                            oid = object_id,
139                            attribute_id;
140                            "Tombstone error"
141                        );
142                    }
143                }
144                Message::Trim(store_id, object_id) => {
145                    if let Err(e) = self.trim(store_id, object_id).await {
146                        error!(error:? = e, store_id, oid = object_id; "Tombstone error");
147                    }
148                }
149                Message::Flush(sender) => {
150                    let _ = sender.send(());
151                }
152            }
153        }
154    }
155
156    /// Performs the initial mount-time reap for the given store.  This will queue all items in the
157    /// graveyard.  Concurrently adding more entries to the graveyard will lead to undefined
158    /// behaviour: the entries might or might not be immediately tombstoned, so callers should wait
159    /// for this to return before changing to a state where more entries can be added.  Once this
160    /// has returned, entries will be tombstoned in the background.
161    #[trace]
162    pub async fn initial_reap(self: &Arc<Self>, store: &ObjectStore) -> Result<usize, Error> {
163        if store.filesystem().options().skip_initial_reap {
164            return Ok(0);
165        }
166        let mut count = 0;
167        let layer_set = store.tree().layer_set();
168        let mut merger = layer_set.merger();
169        let graveyard_object_id = store.graveyard_directory_object_id();
170        let mut iter = Self::iter(graveyard_object_id, &mut merger).await?;
171        let store_id = store.store_object_id();
172        let mut queued_objects = BTreeSet::new();
173        while let Some(GraveyardEntryInfo { object_id, attribute_id, value }) = iter.get() {
174            store.graveyard_entries.fetch_add(1, Ordering::Relaxed);
175            match value {
176                ObjectValue::Some => {
177                    if let Some(attribute_id) = attribute_id {
178                        // If the object is already queued for tombstone, don't queue any attributes
179                        // under it as well. The object tombstone will clean up any attributes as
180                        // well as their graveyard entries.
181                        if !queued_objects.contains(&(store_id, object_id)) {
182                            self.queue_tombstone_attribute(store_id, object_id, attribute_id)
183                        }
184                    } else {
185                        queued_objects.insert((store_id, object_id));
186                        self.queue_tombstone_object(store_id, object_id)
187                    }
188                }
189                ObjectValue::Trim => {
190                    if attribute_id.is_some() {
191                        return Err(anyhow!(
192                            "Trim is not currently supported for a single attribute"
193                        ));
194                    }
195                    self.queue_trim(store_id, object_id)
196                }
197                _ => bail!(anyhow!(FxfsError::Inconsistent).context("Bad graveyard value")),
198            }
199            count += 1;
200            iter.advance().await?;
201        }
202        Ok(count)
203    }
204    /// Queues an object for tombstoning.
205    pub fn queue_tombstone_object(&self, store_id: u64, object_id: u64) {
206        let _ = self.channel.unbounded_send(Message::Tombstone(store_id, object_id, None));
207    }
208
209    /// Queues an object's attribute for tombstoning.
210    pub fn queue_tombstone_attribute(
211        &self,
212        store_id: u64,
213        object_id: u64,
214        attribute_id: AttributeId,
215    ) {
216        let _ = self.channel.unbounded_send(Message::Tombstone(
217            store_id,
218            object_id,
219            Some(attribute_id),
220        ));
221    }
222
223    fn queue_trim(&self, store_id: u64, object_id: u64) {
224        let _ = self.channel.unbounded_send(Message::Trim(store_id, object_id));
225    }
226
227    /// Waits for all preceding queued tombstones to finish.
228    pub async fn flush(&self) {
229        let (sender, receiver) = oneshot::channel::<()>();
230        self.channel.unbounded_send(Message::Flush(sender)).unwrap();
231        receiver.await.unwrap();
232    }
233
234    /// Immediately tombstones (discards) an object in the graveyard.
235    /// NB: Code should generally use |queue_tombstone| instead.
236    pub async fn tombstone_object(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
237        let store = self
238            .object_manager
239            .store(store_id)
240            .with_context(|| format!("Failed to get store {}", store_id))?;
241        // For now, it's safe to assume that all objects in the root parent and root store should
242        // return space to the metadata reservation, but we might have to revisit that if we end up
243        // with objects that are in other stores.
244        let options = if store_id == self.object_manager.root_parent_store_object_id()
245            || store_id == self.object_manager.root_store_object_id()
246        {
247            Options {
248                skip_journal_checks: true,
249                borrow_metadata_space: true,
250                allocator_reservation: Some(self.object_manager.metadata_reservation()),
251                ..Default::default()
252            }
253        } else {
254            Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
255        };
256        store.tombstone_object(object_id, options).await
257    }
258
259    /// Immediately tombstones (discards) and attribute in the graveyard.
260    /// NB: Code should generally use |queue_tombstone| instead.
261    pub async fn tombstone_attribute(
262        &self,
263        store_id: u64,
264        object_id: u64,
265        attribute_id: AttributeId,
266    ) -> Result<(), Error> {
267        let store = self
268            .object_manager
269            .store(store_id)
270            .with_context(|| format!("Failed to get store {}", store_id))?;
271        // For now, it's safe to assume that all objects in the root parent and root store should
272        // return space to the metadata reservation, but we might have to revisit that if we end up
273        // with objects that are in other stores.
274        let options = if store_id == self.object_manager.root_parent_store_object_id()
275            || store_id == self.object_manager.root_store_object_id()
276        {
277            Options {
278                skip_journal_checks: true,
279                borrow_metadata_space: true,
280                allocator_reservation: Some(self.object_manager.metadata_reservation()),
281                ..Default::default()
282            }
283        } else {
284            Options { skip_journal_checks: true, borrow_metadata_space: true, ..Default::default() }
285        };
286        store.tombstone_attribute(object_id, attribute_id, options).await
287    }
288
289    async fn trim(&self, store_id: u64, object_id: u64) -> Result<(), Error> {
290        let store = self
291            .object_manager
292            .store(store_id)
293            .with_context(|| format!("Failed to get store {}", store_id))?;
294        let fs = store.filesystem();
295        let truncate_guard = fs.truncate_guard(store_id, object_id).await;
296        store.trim(object_id, &truncate_guard).await.context("Failed to trim object")
297    }
298
299    /// Returns an iterator that will return graveyard entries skipping deleted ones.  Example
300    /// usage:
301    ///
302    ///   let layer_set = graveyard.store().tree().layer_set();
303    ///   let mut merger = layer_set.merger();
304    ///   let mut iter = graveyard.iter(&mut merger).await?;
305    ///
306    pub async fn iter<'a, 'b>(
307        graveyard_object_id: u64,
308        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
309    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
310        Self::iter_from(merger, graveyard_object_id, 0).await
311    }
312
313    /// Like "iter", but seeks from a specific (store-id, object-id) tuple.  Example usage:
314    ///
315    ///   let layer_set = graveyard.store().tree().layer_set();
316    ///   let mut merger = layer_set.merger();
317    ///   let mut iter = graveyard.iter_from(&mut merger, (2, 3)).await?;
318    ///
319    async fn iter_from<'a, 'b>(
320        merger: &'a mut Merger<'b, ObjectKey, ObjectValue>,
321        graveyard_object_id: u64,
322        from: u64,
323    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
324        GraveyardIterator::new(
325            graveyard_object_id,
326            merger
327                .query(Query::FullRange(&ObjectKey::graveyard_entry(graveyard_object_id, from)))
328                .await?,
329        )
330        .await
331    }
332}
333
/// Iterates over the entries of one graveyard, skipping entries whose value is
/// `ObjectValue::None`.
pub struct GraveyardIterator<'a, 'b> {
    /// Object ID of the graveyard being iterated; entries under any other object ID end the
    /// iteration.
    object_id: u64,
    /// Underlying merged iterator over the store's object records.
    iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
}
338
/// Contains information about a graveyard entry associated with a particular object or
/// attribute.
#[derive(Debug, PartialEq)]
pub struct GraveyardEntryInfo {
    /// ID of the object the entry refers to.
    object_id: u64,
    /// `Some` when the entry refers to a single attribute of the object, `None` when it refers to
    /// the whole object.
    attribute_id: Option<AttributeId>,
    /// The value stored for the entry (a copy of the record's value).
    value: ObjectValue,
}
347
impl GraveyardEntryInfo {
    /// Returns the ID of the object this entry refers to.
    pub fn object_id(&self) -> u64 {
        self.object_id
    }

    /// Returns the attribute this entry refers to, or `None` for a whole-object entry.
    pub fn attribute_id(&self) -> Option<AttributeId> {
        self.attribute_id
    }

    /// Returns the value stored for this entry.
    pub fn value(&self) -> &ObjectValue {
        &self.value
    }
}
361
362impl<'a, 'b> GraveyardIterator<'a, 'b> {
363    async fn new(
364        object_id: u64,
365        iter: MergerIterator<'a, 'b, ObjectKey, ObjectValue>,
366    ) -> Result<GraveyardIterator<'a, 'b>, Error> {
367        let mut iter = GraveyardIterator { object_id, iter };
368        iter.skip_deleted_entries().await?;
369        Ok(iter)
370    }
371
372    async fn skip_deleted_entries(&mut self) -> Result<(), Error> {
373        loop {
374            match self.iter.get() {
375                Some(ItemRef {
376                    key: ObjectKey { object_id, .. },
377                    value: ObjectValue::None,
378                    ..
379                }) if *object_id == self.object_id => {}
380                _ => return Ok(()),
381            }
382            self.iter.advance().await?;
383        }
384    }
385
386    pub fn get(&self) -> Option<GraveyardEntryInfo> {
387        match self.iter.get() {
388            Some(ItemRef {
389                key: ObjectKey { object_id: oid, data: ObjectKeyData::GraveyardEntry { object_id } },
390                value,
391                ..
392            }) if *oid == self.object_id => Some(GraveyardEntryInfo {
393                object_id: *object_id,
394                attribute_id: None,
395                value: value.clone(),
396            }),
397            Some(ItemRef {
398                key:
399                    ObjectKey {
400                        object_id: oid,
401                        data: ObjectKeyData::GraveyardAttributeEntry { object_id, attribute_id },
402                    },
403                value,
404                ..
405            }) if *oid == self.object_id => Some(GraveyardEntryInfo {
406                object_id: *object_id,
407                attribute_id: Some(*attribute_id),
408                value: value.clone(),
409            }),
410            _ => None,
411        }
412    }
413
414    pub async fn advance(&mut self) -> Result<(), Error> {
415        self.iter.advance().await?;
416        self.skip_deleted_entries().await
417    }
418}
419
420#[cfg(test)]
421mod tests {
422    use super::{Graveyard, GraveyardEntryInfo, ObjectStore};
423    use crate::errors::FxfsError;
424    use crate::filesystem::{FxFilesystem, FxFilesystemBuilder};
425    use crate::fsck::fsck;
426    use crate::object_handle::ObjectHandle;
427    use crate::object_store::data_object_handle::WRITE_ATTR_BATCH_SIZE;
428    use crate::object_store::object_record::ObjectValue;
429    use crate::object_store::transaction::{Options, lock_keys};
430    use crate::object_store::{AttributeId, HandleOptions, Mutation, ObjectKey};
431    use assert_matches::assert_matches;
432    use storage_device::DeviceHolder;
433    use storage_device::fake_device::FakeDevice;
434
435    const TEST_DEVICE_BLOCK_SIZE: u32 = 512;
436
437    #[fuchsia::test]
438    async fn test_graveyard() {
439        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
440        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
441        let root_store = fs.root_store();
442
443        assert_eq!(root_store.graveyard_count(), 0);
444
445        let mut transaction = fs
446            .clone()
447            .new_transaction(lock_keys![], Options::default())
448            .await
449            .expect("new_transaction failed");
450        let handle1 = ObjectStore::create_object(
451            &root_store,
452            &mut transaction,
453            HandleOptions::default(),
454            None,
455        )
456        .await
457        .expect("create_object failed");
458        let handle2 = ObjectStore::create_object(
459            &root_store,
460            &mut transaction,
461            HandleOptions::default(),
462            None,
463        )
464        .await
465        .expect("create_object failed");
466        transaction.commit().await.expect("commit failed");
467        let id1 = handle1.object_id();
468        let id2 = handle2.object_id();
469
470        // Create and add two objects to the graveyard.
471        let mut transaction = fs
472            .clone()
473            .new_transaction(lock_keys![], Options::default())
474            .await
475            .expect("new_transaction failed");
476
477        root_store.add_to_graveyard(&mut transaction, id1);
478        root_store.add_to_graveyard(&mut transaction, id2);
479        transaction.commit().await.expect("commit failed");
480
481        assert_eq!(root_store.graveyard_count(), 2);
482
483        // Check that we see the objects we added.
484        {
485            let layer_set = root_store.tree().layer_set();
486            let mut merger = layer_set.merger();
487            let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
488                .await
489                .expect("iter failed");
490            assert_matches!(
491                iter.get().expect("missing entry"),
492                GraveyardEntryInfo { object_id, attribute_id: None, value: ObjectValue::Some }
493                if object_id == id1
494            );
495            iter.advance().await.expect("advance failed");
496            assert_matches!(
497                iter.get().expect("missing entry"),
498                GraveyardEntryInfo { object_id, attribute_id: None, value: ObjectValue::Some }
499                if object_id == id2
500            );
501            iter.advance().await.expect("advance failed");
502            assert_eq!(iter.get(), None);
503        }
504
505        // Remove one of the objects.
506        let mut transaction = fs
507            .clone()
508            .new_transaction(lock_keys![], Options::default())
509            .await
510            .expect("new_transaction failed");
511        root_store.remove_from_graveyard(&mut transaction, id2);
512        transaction.commit().await.expect("commit failed");
513
514        assert_eq!(root_store.graveyard_count(), 1);
515
516        // Check that the graveyard has been updated as expected.
517        let layer_set = root_store.tree().layer_set();
518        let mut merger = layer_set.merger();
519        let mut iter = Graveyard::iter(root_store.graveyard_directory_object_id(), &mut merger)
520            .await
521            .expect("iter failed");
522        assert_matches!(
523            iter.get().expect("missing entry"),
524            GraveyardEntryInfo { object_id, attribute_id: None, value: ObjectValue::Some }
525            if object_id == id1
526        );
527        iter.advance().await.expect("advance failed");
528        assert_eq!(iter.get(), None);
529    }
530
531    #[fuchsia::test]
532    async fn test_graveyard_count_replay() {
533        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
534        let (device, _object_ids) = {
535            let fs = FxFilesystemBuilder::new()
536                .skip_initial_reap(true)
537                .format(true)
538                .open(device)
539                .await
540                .expect("open failed");
541            let root_store = fs.root_store();
542
543            let mut object_ids = Vec::new();
544            let mut transaction = fs
545                .clone()
546                .new_transaction(lock_keys![], Options::default())
547                .await
548                .expect("new_transaction failed");
549            let handle1 = ObjectStore::create_object(
550                &root_store,
551                &mut transaction,
552                HandleOptions::default(),
553                None,
554            )
555            .await
556            .expect("create_object failed");
557            let handle2 = ObjectStore::create_object(
558                &root_store,
559                &mut transaction,
560                HandleOptions::default(),
561                None,
562            )
563            .await
564            .expect("create_object failed");
565            transaction.commit().await.expect("commit failed");
566            object_ids.push(handle1.object_id());
567            object_ids.push(handle2.object_id());
568
569            // Create and add two objects to the graveyard.
570            let mut transaction = fs
571                .clone()
572                .new_transaction(lock_keys![], Options::default())
573                .await
574                .expect("new_transaction failed");
575
576            root_store.add_to_graveyard(&mut transaction, object_ids[0]);
577            root_store.add_to_graveyard(&mut transaction, object_ids[1]);
578            transaction.commit().await.expect("commit failed");
579
580            assert_eq!(root_store.graveyard_count(), 2);
581            fs.close().await.expect("close failed");
582            (fs.take_device().await, object_ids)
583        };
584        device.reopen(false);
585        let device = {
586            let fs =
587                FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
588            let root_store = fs.root_store();
589            // Counter is 0 because initial_reap is not called for read-only mounts.
590            assert_eq!(root_store.graveyard_count(), 0);
591
592            // Now manually run it. This will count and queue (but the reaper isn't running).
593            let count =
594                fs.graveyard().initial_reap(&root_store).await.expect("initial_reap failed");
595            let actual_count = root_store.graveyard_count();
596            assert_eq!(count, 2, "initial_reap found wrong number of items (count={})", count);
597            assert_eq!(
598                actual_count, 2,
599                "graveyard_count returned {} but initial_reap found {}",
600                actual_count, count
601            );
602
603            fs.close().await.expect("close failed");
604            fs.take_device().await
605        };
606        device.reopen(false);
607        {
608            // Now test the full flow where they are automatically reaped.
609            let fs = FxFilesystem::open(device).await.expect("open failed");
610            let root_store = fs.root_store();
611
612            // They might or might not have been reaped yet.
613            // Wait for the reaper to finish.
614            fs.graveyard().wait_for_reap().await;
615
616            // Now the count MUST be 0.
617            assert_eq!(root_store.graveyard_count(), 0);
618            fs.close().await.expect("close failed");
619        }
620    }
621
622    #[fuchsia::test]
623    async fn test_tombstone_attribute() {
624        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
625        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
626        let root_store = fs.root_store();
627        let mut transaction = fs
628            .clone()
629            .new_transaction(lock_keys![], Options::default())
630            .await
631            .expect("new_transaction failed");
632
633        let handle = ObjectStore::create_object(
634            &root_store,
635            &mut transaction,
636            HandleOptions::default(),
637            None,
638        )
639        .await
640        .expect("failed to create object");
641        transaction.commit().await.expect("commit failed");
642
643        handle
644            .write_attr(AttributeId::TEST_ID, &[0; 8192])
645            .await
646            .expect("failed to write attribute");
647        let object_id = handle.object_id();
648        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
649        transaction.add(
650            root_store.store_object_id(),
651            Mutation::replace_or_insert_object(
652                ObjectKey::graveyard_attribute_entry(
653                    root_store.graveyard_directory_object_id(),
654                    object_id,
655                    AttributeId::TEST_ID,
656                ),
657                ObjectValue::Some,
658            ),
659        );
660
661        transaction.commit().await.expect("commit failed");
662
663        fs.close().await.expect("failed to close filesystem");
664        let device = fs.take_device().await;
665        device.reopen(false);
666
667        let fs =
668            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
669        fsck(fs.clone()).await.expect("fsck failed");
670        fs.close().await.expect("failed to close filesystem");
671        let device = fs.take_device().await;
672        device.reopen(false);
673
674        // On open, the filesystem will call initial_reap which will call queue_tombstone().
675        let fs = FxFilesystem::open(device).await.expect("open failed");
676        // `wait_for_reap` ensures that the Message::Tombstone is actually processed.
677        fs.graveyard().wait_for_reap().await;
678        let root_store = fs.root_store();
679
680        let handle =
681            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
682                .await
683                .expect("failed to open object");
684
685        assert_eq!(handle.read_attr(AttributeId::TEST_ID).await.expect("read_attr failed"), None);
686        fsck(fs.clone()).await.expect("fsck failed");
687    }
688
689    #[fuchsia::test]
690    async fn test_tombstone_attribute_and_object() {
691        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
692        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
693        let root_store = fs.root_store();
694        let mut transaction = fs
695            .clone()
696            .new_transaction(lock_keys![], Options::default())
697            .await
698            .expect("new_transaction failed");
699
700        let handle = ObjectStore::create_object(
701            &root_store,
702            &mut transaction,
703            HandleOptions::default(),
704            None,
705        )
706        .await
707        .expect("failed to create object");
708        transaction.commit().await.expect("commit failed");
709
710        const ATTR_1: AttributeId = AttributeId::TEST_ID;
711        const ATTR_2: AttributeId = AttributeId::TEST_ID.next();
712        // With both of these it will test that both their graveyard entries got cleaned up in
713        // trim_or_tombstone() via two different paths.
714        handle.write_attr(ATTR_1, &[0; 8192]).await.expect("failed to write attribute");
715        handle.write_attr(ATTR_2, &[0; 8192]).await.expect("failed to write attribute");
716        let object_id = handle.object_id();
717        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
718        transaction.add(
719            root_store.store_object_id(),
720            Mutation::replace_or_insert_object(
721                ObjectKey::graveyard_attribute_entry(
722                    root_store.graveyard_directory_object_id(),
723                    object_id,
724                    ATTR_1,
725                ),
726                ObjectValue::Some,
727            ),
728        );
729        transaction.add(
730            root_store.store_object_id(),
731            Mutation::replace_or_insert_object(
732                ObjectKey::graveyard_attribute_entry(
733                    root_store.graveyard_directory_object_id(),
734                    object_id,
735                    ATTR_2,
736                ),
737                ObjectValue::Some,
738            ),
739        );
740        transaction.commit().await.expect("commit failed");
741        let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
742        transaction.add(
743            root_store.store_object_id(),
744            Mutation::replace_or_insert_object(
745                ObjectKey::graveyard_entry(root_store.graveyard_directory_object_id(), object_id),
746                ObjectValue::Some,
747            ),
748        );
749        transaction.commit().await.expect("commit failed");
750
751        fs.close().await.expect("failed to close filesystem");
752        let device = fs.take_device().await;
753        device.reopen(false);
754
755        let fs =
756            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
757        fsck(fs.clone()).await.expect("fsck failed");
758        fs.close().await.expect("failed to close filesystem");
759        let device = fs.take_device().await;
760        device.reopen(false);
761
762        // On open, the filesystem will call initial_reap which will call queue_tombstone().
763        let fs = FxFilesystem::open(device).await.expect("open failed");
764        // `wait_for_reap` ensures that the two tombstone messages are processed.
765        fs.graveyard().wait_for_reap().await;
766
767        let root_store = fs.root_store();
768        if let Err(e) =
769            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None).await
770        {
771            assert!(FxfsError::NotFound.matches(&e));
772        } else {
773            panic!("open_object succeeded");
774        };
775        fsck(fs.clone()).await.expect("fsck failed");
776    }
777
778    #[fuchsia::test]
779    async fn test_tombstone_large_attribute() {
780        let device = DeviceHolder::new(FakeDevice::new(8192, TEST_DEVICE_BLOCK_SIZE));
781        let fs = FxFilesystem::new_empty(device).await.expect("new_empty failed");
782        let root_store = fs.root_store();
783        let mut transaction = fs
784            .clone()
785            .new_transaction(lock_keys![], Options::default())
786            .await
787            .expect("new_transaction failed");
788
789        let handle = ObjectStore::create_object(
790            &root_store,
791            &mut transaction,
792            HandleOptions::default(),
793            None,
794        )
795        .await
796        .expect("failed to create object");
797        transaction.commit().await.expect("commit failed");
798
799        let object_id = {
800            let mut transaction = handle.new_transaction().await.expect("new_transaction failed");
801            transaction.add(
802                root_store.store_object_id(),
803                Mutation::replace_or_insert_object(
804                    ObjectKey::graveyard_attribute_entry(
805                        root_store.graveyard_directory_object_id(),
806                        handle.object_id(),
807                        AttributeId::TEST_ID,
808                    ),
809                    ObjectValue::Some,
810                ),
811            );
812
813            // This write should span three transactions. This test mimics the behavior when the
814            // last transaction gets interrupted by a filesystem.close().
815            handle
816                .write_new_attr_in_batches(
817                    &mut transaction,
818                    AttributeId::TEST_ID,
819                    &vec![0; 3 * WRITE_ATTR_BATCH_SIZE],
820                    WRITE_ATTR_BATCH_SIZE,
821                )
822                .await
823                .expect("failed to write attribute");
824
825            handle.object_id()
826            // Drop the transaction to simulate interrupting the attribute creation as well as to
827            // release the transaction locks.
828        };
829
830        fs.close().await.expect("failed to close filesystem");
831        let device = fs.take_device().await;
832        device.reopen(false);
833
834        let fs =
835            FxFilesystemBuilder::new().read_only(true).open(device).await.expect("open failed");
836        fsck(fs.clone()).await.expect("fsck failed");
837        fs.close().await.expect("failed to close filesystem");
838        let device = fs.take_device().await;
839        device.reopen(false);
840
841        // On open, the filesystem will call initial_reap which will call queue_tombstone().
842        let fs = FxFilesystem::open(device).await.expect("open failed");
843        // `wait_for_reap` ensures that the two tombstone messages are processed.
844        fs.graveyard().wait_for_reap().await;
845
846        let root_store = fs.root_store();
847
848        let handle =
849            ObjectStore::open_object(&root_store, object_id, HandleOptions::default(), None)
850                .await
851                .expect("failed to open object");
852
853        assert_eq!(handle.read_attr(AttributeId::TEST_ID).await.expect("read_attr failed"), None);
854        fsck(fs.clone()).await.expect("fsck failed");
855    }
856}