fxfs/object_store/
merge.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::extent_record::{ExtentKey, ExtentValue};
6use super::object_record::{AttributeKey, ObjectKey, ObjectKeyData, ObjectValue, ProjectProperty};
7use crate::log::*;
8use crate::lsm_tree::merge::ItemOp::{Discard, Keep, Replace};
9use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
10use crate::lsm_tree::types::Item;
11
12fn merge_extents(
13    object_id: u64,
14    attribute_id: u64,
15    left: &MergeLayerIterator<'_, ObjectKey, ObjectValue>,
16    right: &MergeLayerIterator<'_, ObjectKey, ObjectValue>,
17    left_key: &ExtentKey,
18    right_key: &ExtentKey,
19    left_value: &ExtentValue,
20    right_value: &ExtentValue,
21) -> MergeResult<ObjectKey, ObjectValue> {
22    // For now, we don't support/expect two extents with the same key in one layer.
23    // One reason you can't merge deleted extents in non-adjacent layers is because you
24    // can't decide which layer the merge should end up in.
25    //
26    // Consider this scenario:
27    //   |X-X-X|
28    //      |a-a-a|
29    //         |X-X-X|
30    // If you merge the deleted extents here, you might end up with:
31    //   |X-X-X-X-X-X|
32    //      |a-a-a|
33    // which is clearly incorrect.
34    debug_assert!(right.layer_index != left.layer_index);
35
36    if let (ExtentValue::None, ExtentValue::None) = (left_value, right_value) {
37        if (left.layer_index as i32 - right.layer_index as i32).abs() == 1 {
38            // Two deletions in adjacent layers can be merged.
39            return merge_deleted_extents(
40                object_id,
41                attribute_id,
42                left_key,
43                right_key,
44                std::cmp::min(left.sequence(), right.sequence()),
45            );
46        }
47    }
48
49    if left_key.range.end <= right_key.range.start {
50        // Extents don't overlap.
51        return MergeResult::EmitLeft;
52    }
53
54    // The start of the left extent is <= the start of the right extent, due to merge key ordering.
55    //
56    // One of the extents has to win. The way we break this tie is by picking the extent from the
57    // newest layer (i.e. the layer with the lowest index).
58    //
59    // Generally, we'll be doing the following:
60    //
61    //  Old  |----------|
62    //  New          |----------|
63    //
64    // Turns into
65    //
66    //  Emit  |------|
67    //  Old          |--|
68    //  New          |----------|
69
70    if right.layer_index < left.layer_index {
71        // Right layer is newer.
72        debug_assert!(left_key.range.start < right_key.range.start);
73        return MergeResult::Other {
74            emit: Some(
75                Item::new_with_sequence(
76                    ObjectKey::extent(
77                        object_id,
78                        attribute_id,
79                        left_key.range.start..right_key.range.start,
80                    ),
81                    ObjectValue::Extent(left_value.shrunk(
82                        left_key.range.end - left_key.range.start,
83                        right_key.range.start - left_key.range.start,
84                    )),
85                    std::cmp::min(left.sequence(), right.sequence()),
86                )
87                .boxed(),
88            ),
89            left: Replace(
90                Item::new_with_sequence(
91                    ObjectKey::extent(
92                        object_id,
93                        attribute_id,
94                        right_key.range.start..left_key.range.end,
95                    ),
96                    ObjectValue::Extent(left_value.offset_by(
97                        right_key.range.start - left_key.range.start,
98                        left_key.range.end - left_key.range.start,
99                    )),
100                    std::cmp::min(left.sequence(), right.sequence()),
101                )
102                .boxed(),
103            ),
104            right: Keep,
105        };
106    }
107    // Left layer is newer.
108    if left_key.range.end >= right_key.range.end {
109        // The left key entirely contains the right key.
110        return MergeResult::Other { emit: None, left: Keep, right: Discard };
111    }
112    MergeResult::Other {
113        emit: None,
114        left: Keep,
115        right: Replace(
116            Item::new_with_sequence(
117                ObjectKey::extent(object_id, attribute_id, left_key.range.end..right_key.range.end),
118                ObjectValue::Extent(right_value.offset_by(
119                    left_key.range.end - right_key.range.start,
120                    right_key.range.end - right_key.range.start,
121                )),
122                std::cmp::min(left.sequence(), right.sequence()),
123            )
124            .boxed(),
125        ),
126    }
127}
128
129// Assumes that the two extents to be merged are on adjacent layers (i.e. layers N, N+1).
130fn merge_deleted_extents(
131    object_id: u64,
132    attribute_id: u64,
133    left_key: &ExtentKey,
134    right_key: &ExtentKey,
135    sequence: u64,
136) -> MergeResult<ObjectKey, ObjectValue> {
137    if left_key.range.end < right_key.range.start {
138        // The extents are not adjacent or overlapping.
139        return MergeResult::EmitLeft;
140    }
141    // Both of these are deleted extents which are either adjacent or overlapping, which means
142    // we can coalesce the records.
143    if left_key.range.end >= right_key.range.end {
144        // The left deletion eclipses the right, so just keep the left.
145        return MergeResult::Other { emit: None, left: Keep, right: Discard };
146    }
147    MergeResult::Other {
148        emit: None,
149        left: Discard,
150        right: Replace(Box::new(Item::new_with_sequence(
151            ObjectKey::extent(object_id, attribute_id, left_key.range.start..right_key.range.end),
152            ObjectValue::deleted_extent(),
153            sequence,
154        ))),
155    }
156}
157
158/// Merge function for items in the object store.
159///
160/// The most interesting behaviour in this merge function is how extents are handled. Since extents
161/// can overlap and replace one another, the merge function generally builds up the most
162/// recent view of the extents in the tree, so that the output of a full merge contains no
163/// overlapping extents. You can imagine looking down at the extents from the top-most layer.
164///
165/// A brief example:
166///
167/// Layer 0   |a-a-a-a|     |b-b-b|
168/// Layer 1   |c-c-c-c-c|
169/// Layer 2                     |d-d-d-d|
170///
171/// Merged    |a-a-a-a|c|   |b-b-b|d-d-d|
172///
173/// Adjacent or overlapping extent deletions in two adjacent layers can be merged into single
174/// records (since they do not have a physical offset, so there's no need to keep the physical
175/// extents contiguous). We can't merge deletions from non-adjacent layers, since that would
176/// cause issues in situations like this:
177///
178/// Layer 0         |X-X-X|
179/// Layer 1   |a-a-a-a-a-a|
180/// Layer 2   |X-X-X|
181///
182/// Merging the two deletions in layers 0 and 2 would either result in the middle extent being
183/// fully occluded or not at all (depending on whether we replaced on the left or right layer).
184pub fn merge(
185    left: &MergeLayerIterator<'_, ObjectKey, ObjectValue>,
186    right: &MergeLayerIterator<'_, ObjectKey, ObjectValue>,
187) -> MergeResult<ObjectKey, ObjectValue> {
188    if left.key().object_id != right.key().object_id {
189        return MergeResult::EmitLeft;
190    }
191    match (left.key(), right.key(), left.value(), right.value()) {
192        (
193            ObjectKey {
194                object_id,
195                data: ObjectKeyData::Attribute(left_attr_id, AttributeKey::Extent(left_extent_key)),
196            },
197            ObjectKey {
198                object_id: _,
199                data:
200                    ObjectKeyData::Attribute(right_attr_id, AttributeKey::Extent(right_extent_key)),
201            },
202            ObjectValue::Extent(left_extent),
203            ObjectValue::Extent(right_extent),
204        ) if left_attr_id == right_attr_id => {
205            return merge_extents(
206                *object_id,
207                *left_attr_id,
208                left,
209                right,
210                left_extent_key,
211                right_extent_key,
212                left_extent,
213                right_extent,
214            );
215        }
216        (
217            ObjectKey {
218                object_id: _,
219                data:
220                    ObjectKeyData::Project {
221                        project_id: left_project_id,
222                        property: ProjectProperty::Usage,
223                    },
224            },
225            ObjectKey {
226                object_id: _,
227                data:
228                    ObjectKeyData::Project {
229                        project_id: right_project_id,
230                        property: ProjectProperty::Usage,
231                    },
232            },
233            ObjectValue::BytesAndNodes { bytes: left_bytes, nodes: left_nodes },
234            ObjectValue::BytesAndNodes { bytes: right_bytes, nodes: right_nodes },
235        ) if left_project_id == right_project_id => {
236            let bytes = left_bytes + right_bytes;
237            let nodes = left_nodes + right_nodes;
238            // Tombstone the tracking when it goes to zero.
239            match (bytes, nodes) {
240                (0, 0) => MergeResult::Other { emit: None, left: Discard, right: Discard },
241                _ => MergeResult::Other {
242                    emit: None,
243                    left: Discard,
244                    right: Replace(
245                        Item {
246                            key: right.key().clone(),
247                            value: ObjectValue::BytesAndNodes { bytes, nodes },
248                            sequence: right.sequence(),
249                        }
250                        .boxed(),
251                    ),
252                },
253            }
254        }
255        // Tombstones (ObjectKeyData::Object) compare before others, so always appear on left.
256        (ObjectKey { data: ObjectKeyData::Object, .. }, _, ObjectValue::None, _) => {
257            if left.layer_index > right.layer_index {
258                warn!("Detected inconsistency: record has been inserted after tombstone");
259                MergeResult::EmitLeft
260            } else {
261                MergeResult::Other { emit: None, left: Keep, right: Discard }
262            }
263        }
264        // Note that identical keys are sorted by layer_index, so left is always newer.
265        (left_key, right_key, _, _) if left_key == right_key => {
266            debug_assert!(left.layer_index < right.layer_index);
267            MergeResult::Other { emit: None, left: Keep, right: Discard }
268        }
269        _ => MergeResult::EmitLeft,
270    }
271}
272
273#[cfg(test)]
274mod tests {
275    use super::merge;
276    use crate::checksum::Checksums;
277    use crate::lsm_tree::cache::NullCache;
278    use crate::lsm_tree::types::{Item, LayerIterator, MergeableKey, Value};
279    use crate::lsm_tree::{LSMTree, Query};
280    use crate::object_store::extent_record::ExtentValue;
281    use crate::object_store::object_record::{AttributeKey, ObjectKey, ObjectValue, Timestamp};
282    use crate::object_store::VOLUME_DATA_KEY_ID;
283    use anyhow::Error;
284
285    async fn test_merge<K: MergeableKey, V: Value + PartialEq>(
286        tree: &LSMTree<K, V>,
287        layer0: &[Item<K, V>],
288        layer1: &[Item<K, V>],
289        expected: &[Item<K, V>],
290    ) {
291        for item in layer1 {
292            tree.insert(item.clone()).expect("insert error");
293        }
294        tree.seal();
295        for item in layer0 {
296            tree.insert(item.clone()).expect("insert error");
297        }
298        let layer_set = tree.layer_set();
299        let mut merger = layer_set.merger();
300        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
301        for e in expected {
302            assert_eq!(iter.get().expect("get failed"), e.as_item_ref());
303            iter.advance().await.expect("advance failed");
304        }
305        assert!(iter.get().is_none());
306    }
307
308    #[fuchsia::test]
309    async fn test_merge_extents_non_overlapping() -> Result<(), Error> {
310        let object_id = 0;
311        let attr_id = 0;
312        let tree = LSMTree::new(merge, Box::new(NullCache {}));
313
314        tree.insert(Item::new(
315            ObjectKey::extent(object_id, attr_id, 0..512),
316            ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID)),
317        ))
318        .expect("insert error");
319        tree.seal();
320
321        tree.insert(Item::new(
322            ObjectKey::extent(object_id, attr_id, 512..1024),
323            ObjectValue::Extent(ExtentValue::new_raw(16384, VOLUME_DATA_KEY_ID)),
324        ))
325        .expect("insert error");
326
327        let layer_set = tree.layer_set();
328        let mut merger = layer_set.merger();
329        let mut iter = merger.query(Query::FullScan).await?;
330        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
331        iter.advance().await?;
332        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1024));
333        iter.advance().await?;
334        assert!(iter.get().is_none());
335        Ok(())
336    }
337
338    #[fuchsia::test]
339    async fn test_merge_extents_rewrite_right() -> Result<(), Error> {
340        let object_id = 0;
341        let attr_id = 0;
342        let tree = LSMTree::new(merge, Box::new(NullCache {}));
343
344        tree.insert(Item::new(
345            ObjectKey::extent(object_id, attr_id, 0..1024),
346            ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID)),
347        ))
348        .expect("insert error");
349        tree.seal();
350
351        tree.insert(Item::new(
352            ObjectKey::extent(object_id, attr_id, 512..1024),
353            ObjectValue::Extent(ExtentValue::new_raw(16384, VOLUME_DATA_KEY_ID)),
354        ))
355        .expect("insert error");
356
357        let layer_set = tree.layer_set();
358        let mut merger = layer_set.merger();
359        let mut iter = merger.query(Query::FullScan).await?;
360        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
361        assert_eq!(
362            iter.get().unwrap().value,
363            &ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID))
364        );
365        iter.advance().await?;
366        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1024));
367        assert_eq!(
368            iter.get().unwrap().value,
369            &ObjectValue::Extent(ExtentValue::new_raw(16384, VOLUME_DATA_KEY_ID))
370        );
371        iter.advance().await?;
372        assert!(iter.get().is_none());
373        Ok(())
374    }
375
376    #[fuchsia::test]
377    async fn test_merge_extents_rewrite_left() -> Result<(), Error> {
378        let object_id = 0;
379        let attr_id = 0;
380        let tree = LSMTree::new(merge, Box::new(NullCache {}));
381
382        tree.insert(Item::new(
383            ObjectKey::extent(object_id, attr_id, 0..1024),
384            ObjectValue::Extent(ExtentValue::with_checksum(
385                0,
386                Checksums::fletcher(vec![1, 2]),
387                VOLUME_DATA_KEY_ID,
388            )),
389        ))
390        .expect("insert error");
391        tree.seal();
392
393        tree.insert(Item::new(
394            ObjectKey::extent(object_id, attr_id, 0..512),
395            ObjectValue::Extent(ExtentValue::with_checksum(
396                16384,
397                Checksums::fletcher(vec![3]),
398                VOLUME_DATA_KEY_ID,
399            )),
400        ))
401        .expect("insert error");
402
403        let layer_set = tree.layer_set();
404        let mut merger = layer_set.merger();
405        let mut iter = merger.query(Query::FullScan).await?;
406        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
407        assert_eq!(
408            iter.get().unwrap().value,
409            &ObjectValue::Extent(ExtentValue::with_checksum(
410                16384,
411                Checksums::fletcher(vec![3]),
412                VOLUME_DATA_KEY_ID
413            ))
414        );
415        iter.advance().await?;
416        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1024));
417        assert_eq!(
418            iter.get().unwrap().value,
419            &ObjectValue::Extent(ExtentValue::with_checksum(
420                512,
421                Checksums::fletcher(vec![2]),
422                VOLUME_DATA_KEY_ID
423            ))
424        );
425        iter.advance().await?;
426        assert!(iter.get().is_none());
427        Ok(())
428    }
429
430    #[fuchsia::test]
431    async fn test_merge_extents_rewrite_middle() -> Result<(), Error> {
432        let object_id = 0;
433        let attr_id = 0;
434        let tree = LSMTree::new(merge, Box::new(NullCache {}));
435
436        tree.insert(Item::new(
437            ObjectKey::extent(object_id, attr_id, 0..2048),
438            ObjectValue::Extent(ExtentValue::with_checksum(
439                0,
440                Checksums::fletcher(vec![1, 2, 3, 4]),
441                VOLUME_DATA_KEY_ID,
442            )),
443        ))
444        .expect("insert error");
445        tree.seal();
446
447        tree.insert(Item::new(
448            ObjectKey::extent(object_id, attr_id, 1024..1536),
449            ObjectValue::Extent(ExtentValue::with_checksum(
450                16384,
451                Checksums::fletcher(vec![5]),
452                VOLUME_DATA_KEY_ID,
453            )),
454        ))
455        .expect("insert error");
456
457        let layer_set = tree.layer_set();
458        let mut merger = layer_set.merger();
459        let mut iter = merger.query(Query::FullScan).await?;
460        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1024));
461        assert_eq!(
462            iter.get().unwrap().value,
463            &ObjectValue::Extent(ExtentValue::with_checksum(
464                0,
465                Checksums::fletcher(vec![1, 2]),
466                VOLUME_DATA_KEY_ID
467            ))
468        );
469        iter.advance().await?;
470        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 1024..1536));
471        assert_eq!(
472            iter.get().unwrap().value,
473            &ObjectValue::Extent(ExtentValue::with_checksum(
474                16384,
475                Checksums::fletcher(vec![5]),
476                VOLUME_DATA_KEY_ID
477            ))
478        );
479        iter.advance().await?;
480        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 1536..2048));
481        assert_eq!(
482            iter.get().unwrap().value,
483            &ObjectValue::Extent(ExtentValue::with_checksum(
484                1536,
485                Checksums::fletcher(vec![4]),
486                VOLUME_DATA_KEY_ID
487            ))
488        );
489        iter.advance().await?;
490        assert!(iter.get().is_none());
491        Ok(())
492    }
493
494    #[fuchsia::test]
495    async fn test_merge_extents_rewrite_eclipses() -> Result<(), Error> {
496        let object_id = 0;
497        let attr_id = 0;
498        let tree = LSMTree::new(merge, Box::new(NullCache {}));
499
500        tree.insert(Item::new(
501            ObjectKey::extent(object_id, attr_id, 1024..1536),
502            ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID)),
503        ))
504        .expect("insert error");
505        tree.seal();
506
507        tree.insert(Item::new(
508            ObjectKey::extent(object_id, attr_id, 0..2048),
509            ObjectValue::Extent(ExtentValue::new_raw(16384, VOLUME_DATA_KEY_ID)),
510        ))
511        .expect("insert error");
512
513        let layer_set = tree.layer_set();
514        let mut merger = layer_set.merger();
515        let mut iter = merger.query(Query::FullScan).await?;
516        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..2048));
517        assert_eq!(
518            iter.get().unwrap().value,
519            &ObjectValue::Extent(ExtentValue::new_raw(16384, VOLUME_DATA_KEY_ID))
520        );
521        iter.advance().await?;
522        assert!(iter.get().is_none());
523        Ok(())
524    }
525
526    #[fuchsia::test]
527    async fn test_merge_extents_delete_left() -> Result<(), Error> {
528        let object_id = 0;
529        let attr_id = 0;
530        let tree = LSMTree::new(merge, Box::new(NullCache {}));
531
532        tree.insert(Item::new(
533            ObjectKey::extent(object_id, attr_id, 0..1024),
534            ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID)),
535        ))
536        .expect("insert error");
537        tree.seal();
538
539        tree.insert(Item::new(
540            ObjectKey::extent(object_id, attr_id, 0..512),
541            ObjectValue::deleted_extent(),
542        ))
543        .expect("insert error");
544
545        let layer_set = tree.layer_set();
546        let mut merger = layer_set.merger();
547        let mut iter = merger.query(Query::FullScan).await?;
548        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
549        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
550        iter.advance().await?;
551        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1024));
552        assert_eq!(
553            iter.get().unwrap().value,
554            &ObjectValue::Extent(ExtentValue::new_raw(512, VOLUME_DATA_KEY_ID))
555        );
556        iter.advance().await?;
557        assert!(iter.get().is_none());
558        Ok(())
559    }
560
561    #[fuchsia::test]
562    async fn test_merge_extents_delete_right() -> Result<(), Error> {
563        let object_id = 0;
564        let attr_id = 0;
565        let tree = LSMTree::new(merge, Box::new(NullCache {}));
566
567        tree.insert(Item::new(
568            ObjectKey::extent(object_id, attr_id, 0..1024),
569            ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID)),
570        ))
571        .expect("insert error");
572        tree.seal();
573
574        tree.insert(Item::new(
575            ObjectKey::extent(object_id, attr_id, 512..1024),
576            ObjectValue::deleted_extent(),
577        ))
578        .expect("insert error");
579
580        let layer_set = tree.layer_set();
581        let mut merger = layer_set.merger();
582        let mut iter = merger.query(Query::FullScan).await?;
583        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
584        assert_eq!(
585            iter.get().unwrap().value,
586            &ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID))
587        );
588        iter.advance().await?;
589        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1024));
590        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
591        iter.advance().await?;
592        assert!(iter.get().is_none());
593        Ok(())
594    }
595
596    #[fuchsia::test]
597    async fn test_merge_extents_delete_middle() -> Result<(), Error> {
598        let object_id = 0;
599        let attr_id = 0;
600        let tree = LSMTree::new(merge, Box::new(NullCache {}));
601
602        tree.insert(Item::new(
603            ObjectKey::extent(object_id, attr_id, 0..2048),
604            ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID)),
605        ))
606        .expect("insert error");
607        tree.seal();
608
609        tree.insert(Item::new(
610            ObjectKey::extent(object_id, attr_id, 1024..1536),
611            ObjectValue::deleted_extent(),
612        ))
613        .expect("insert error");
614
615        let layer_set = tree.layer_set();
616        let mut merger = layer_set.merger();
617        let mut iter = merger.query(Query::FullScan).await?;
618        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1024));
619        assert_eq!(
620            iter.get().unwrap().value,
621            &ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID))
622        );
623        iter.advance().await?;
624        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 1024..1536));
625        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
626        iter.advance().await?;
627        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 1536..2048));
628        assert_eq!(
629            iter.get().unwrap().value,
630            &ObjectValue::Extent(ExtentValue::new_raw(1536, VOLUME_DATA_KEY_ID))
631        );
632        iter.advance().await?;
633        assert!(iter.get().is_none());
634        Ok(())
635    }
636
637    #[fuchsia::test]
638    async fn test_merge_extents_delete_eclipses() -> Result<(), Error> {
639        let object_id = 0;
640        let attr_id = 0;
641        let tree = LSMTree::new(merge, Box::new(NullCache {}));
642
643        tree.insert(Item::new(
644            ObjectKey::extent(object_id, attr_id, 1024..1536),
645            ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID)),
646        ))
647        .expect("insert error");
648        tree.seal();
649
650        tree.insert(Item::new(
651            ObjectKey::extent(object_id, attr_id, 0..2048),
652            ObjectValue::deleted_extent(),
653        ))
654        .expect("insert error");
655
656        let layer_set = tree.layer_set();
657        let mut merger = layer_set.merger();
658        let mut iter = merger.query(Query::FullScan).await?;
659        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..2048));
660        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
661        iter.advance().await?;
662        assert!(iter.get().is_none());
663        Ok(())
664    }
665
666    #[fuchsia::test]
667    async fn test_merge_deleted_extents_new_layer_joins_two_deletions() -> Result<(), Error> {
668        // Old layer:  [----]    [----]
669        // New layer:       [----]
670        // Merged:     [--------------]
671        let object_id = 0;
672        let attr_id = 0;
673        let tree = LSMTree::new(merge, Box::new(NullCache {}));
674
675        tree.insert(Item::new(
676            ObjectKey::extent(object_id, attr_id, 0..512),
677            ObjectValue::deleted_extent(),
678        ))
679        .expect("insert error");
680        tree.insert(Item::new(
681            ObjectKey::extent(object_id, attr_id, 1024..1536),
682            ObjectValue::deleted_extent(),
683        ))
684        .expect("insert error");
685        tree.seal();
686
687        tree.insert(Item::new(
688            ObjectKey::extent(object_id, attr_id, 512..1024),
689            ObjectValue::deleted_extent(),
690        ))
691        .expect("insert error");
692        tree.seal();
693
694        let layer_set = tree.layer_set();
695        let mut merger = layer_set.merger();
696        let mut iter = merger.query(Query::FullScan).await?;
697        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1536));
698        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
699        iter.advance().await?;
700        assert!(iter.get().is_none());
701        Ok(())
702    }
703
704    #[fuchsia::test]
705    async fn test_merge_deleted_extents_new_layer_joined_by_old_deletion() -> Result<(), Error> {
706        // Old layer:       [----]
707        // New layer:  [----]    [----]
708        // Merged:     [--------------]
709        let object_id = 0;
710        let attr_id = 0;
711        let tree = LSMTree::new(merge, Box::new(NullCache {}));
712
713        tree.insert(Item::new(
714            ObjectKey::extent(object_id, attr_id, 512..1024),
715            ObjectValue::deleted_extent(),
716        ))
717        .expect("insert error");
718        tree.seal();
719
720        tree.insert(Item::new(
721            ObjectKey::extent(object_id, attr_id, 0..512),
722            ObjectValue::deleted_extent(),
723        ))
724        .expect("insert error");
725        tree.insert(Item::new(
726            ObjectKey::extent(object_id, attr_id, 1024..1536),
727            ObjectValue::deleted_extent(),
728        ))
729        .expect("insert error");
730        tree.seal();
731
732        let layer_set = tree.layer_set();
733        let mut merger = layer_set.merger();
734        let mut iter = merger.query(Query::FullScan).await?;
735        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1536));
736        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
737        iter.advance().await?;
738        assert!(iter.get().is_none());
739        Ok(())
740    }
741
742    #[fuchsia::test]
743    async fn test_merge_deleted_extents_overlapping_newest_on_right() -> Result<(), Error> {
744        let object_id = 0;
745        let attr_id = 0;
746        let tree = LSMTree::new(merge, Box::new(NullCache {}));
747
748        tree.insert(Item::new(
749            ObjectKey::extent(object_id, attr_id, 0..1024),
750            ObjectValue::deleted_extent(),
751        ))
752        .expect("insert error");
753        tree.seal();
754
755        tree.insert(Item::new(
756            ObjectKey::extent(object_id, attr_id, 512..1536),
757            ObjectValue::deleted_extent(),
758        ))
759        .expect("insert error");
760
761        let layer_set = tree.layer_set();
762        let mut merger = layer_set.merger();
763        let mut iter = merger.query(Query::FullScan).await?;
764        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1536));
765        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
766        iter.advance().await?;
767        assert!(iter.get().is_none());
768        Ok(())
769    }
770
771    #[fuchsia::test]
772    async fn test_merge_deleted_extents_overlapping_newest_on_left() -> Result<(), Error> {
773        let object_id = 0;
774        let attr_id = 0;
775        let tree = LSMTree::new(merge, Box::new(NullCache {}));
776
777        tree.insert(Item::new(
778            ObjectKey::extent(object_id, attr_id, 512..1536),
779            ObjectValue::deleted_extent(),
780        ))
781        .expect("insert error");
782        tree.seal();
783        tree.insert(Item::new(
784            ObjectKey::extent(object_id, attr_id, 0..1024),
785            ObjectValue::deleted_extent(),
786        ))
787        .expect("insert error");
788
789        let layer_set = tree.layer_set();
790        let mut merger = layer_set.merger();
791        let mut iter = merger.query(Query::FullScan).await?;
792        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1536));
793        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
794        iter.advance().await?;
795        assert!(iter.get().is_none());
796        Ok(())
797    }
798
799    #[fuchsia::test]
800    async fn test_merge_deleted_extents_new_layer_contained_in_old() -> Result<(), Error> {
801        // Old layer:  [--------------]
802        // New layer:       [----]
803        // Merged:     [--------------]
804        let object_id = 0;
805        let attr_id = 0;
806        let tree = LSMTree::new(merge, Box::new(NullCache {}));
807
808        tree.insert(Item::new(
809            ObjectKey::extent(object_id, attr_id, 0..1536),
810            ObjectValue::deleted_extent(),
811        ))
812        .expect("insert error");
813        tree.seal();
814
815        tree.insert(Item::new(
816            ObjectKey::extent(object_id, attr_id, 512..1024),
817            ObjectValue::deleted_extent(),
818        ))
819        .expect("insert error");
820        tree.seal();
821
822        let layer_set = tree.layer_set();
823        let mut merger = layer_set.merger();
824        let mut iter = merger.query(Query::FullScan).await?;
825        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1536));
826        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
827        iter.advance().await?;
828        assert!(iter.get().is_none());
829        Ok(())
830    }
831
832    #[fuchsia::test]
833    async fn test_merge_deleted_extents_new_layer_eclipses_old() -> Result<(), Error> {
834        // Old layer:       [----]
835        // New layer:  [--------------]
836        // Merged:     [--------------]
837        let object_id = 0;
838        let attr_id = 0;
839        let tree = LSMTree::new(merge, Box::new(NullCache {}));
840
841        tree.insert(Item::new(
842            ObjectKey::extent(object_id, attr_id, 512..1024),
843            ObjectValue::deleted_extent(),
844        ))
845        .expect("insert error");
846        tree.seal();
847
848        tree.insert(Item::new(
849            ObjectKey::extent(object_id, attr_id, 0..1536),
850            ObjectValue::deleted_extent(),
851        ))
852        .expect("insert error");
853        tree.seal();
854
855        let layer_set = tree.layer_set();
856        let mut merger = layer_set.merger();
857        let mut iter = merger.query(Query::FullScan).await?;
858        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..1536));
859        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
860        iter.advance().await?;
861        assert!(iter.get().is_none());
862        Ok(())
863    }
864
865    #[fuchsia::test]
866    async fn test_merge_deleted_extents_does_not_coalesce_if_not_adjacent_layers(
867    ) -> Result<(), Error> {
868        // Layer 0:  [XXXXX]
869        // Layer 1:  [--------------]
870        // Layer 2:        [XXXXXXXX]
871        //  Merged:  [XXXXX|--------]
872        let object_id = 0;
873        let attr_id = 0;
874        let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge, Box::new(NullCache {}));
875
876        tree.insert(Item::new(
877            ObjectKey::extent(object_id, attr_id, 512..1024),
878            ObjectValue::deleted_extent(),
879        ))
880        .expect("insert error");
881        tree.seal();
882
883        tree.insert(Item::new(
884            ObjectKey::extent(object_id, attr_id, 0..1024),
885            ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID)),
886        ))
887        .expect("insert error");
888        tree.seal();
889
890        tree.insert(Item::new(
891            ObjectKey::extent(object_id, attr_id, 0..512),
892            ObjectValue::deleted_extent(),
893        ))
894        .expect("insert error");
895
896        let layer_set = tree.layer_set();
897        let mut merger = layer_set.merger();
898        let mut iter = merger.query(Query::FullScan).await?;
899        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
900        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
901        iter.advance().await?;
902        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1024));
903        assert_eq!(
904            iter.get().unwrap().value,
905            &ObjectValue::Extent(ExtentValue::new_raw(512, VOLUME_DATA_KEY_ID))
906        );
907        iter.advance().await?;
908        assert!(iter.get().is_none());
909        Ok(())
910    }
911
912    #[fuchsia::test]
913    async fn test_merge_deleted_extents_does_not_coalesce_if_not_adjacent_deletions(
914    ) -> Result<(), Error> {
915        // Layer 0:  [XXXXX|--------]
916        // Layer 1:           [XXXXX]
917        //  Merged:  [XXXXX|--------]
918        let object_id = 0;
919        let attr_id = 0;
920        let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge, Box::new(NullCache {}));
921
922        tree.insert(Item::new(
923            ObjectKey::extent(object_id, attr_id, 1024..1536),
924            ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID)),
925        ))
926        .expect("insert error");
927        tree.seal();
928
929        tree.insert(Item::new(
930            ObjectKey::extent(object_id, attr_id, 0..512),
931            ObjectValue::deleted_extent(),
932        ))
933        .expect("insert error");
934        tree.insert(Item::new(
935            ObjectKey::extent(object_id, attr_id, 512..1536),
936            ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID)),
937        ))
938        .expect("insert error");
939
940        let layer_set = tree.layer_set();
941        let mut merger = layer_set.merger();
942        let mut iter = merger.query(Query::FullScan).await?;
943        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
944        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
945        iter.advance().await?;
946        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1536));
947        assert_eq!(
948            iter.get().unwrap().value,
949            &ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID))
950        );
951        iter.advance().await?;
952        assert!(iter.get().is_none());
953        Ok(())
954    }
955
956    #[fuchsia::test]
957    async fn test_merge_deleted_extent_into_overwrites_extents() -> Result<(), Error> {
958        let object_id = 0;
959        let attr_id = 0;
960        let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge, Box::new(NullCache {}));
961
962        tree.insert(Item::new(
963            ObjectKey::extent(object_id, attr_id, 0..1024),
964            ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID)),
965        ))
966        .expect("insert error");
967        tree.insert(Item::new(
968            ObjectKey::extent(object_id, attr_id, 1024..2048),
969            ObjectValue::Extent(ExtentValue::new_raw(16384, VOLUME_DATA_KEY_ID)),
970        ))
971        .expect("insert error");
972        let key = ObjectKey::extent(object_id, attr_id, 512..1536);
973        tree.merge_into(
974            Item::new(key.clone(), ObjectValue::deleted_extent()),
975            &key.key_for_merge_into(),
976        );
977
978        let layer_set = tree.layer_set();
979        let mut merger = layer_set.merger();
980        let mut iter = merger.query(Query::FullScan).await?;
981        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
982        assert_eq!(
983            iter.get().unwrap().value,
984            &ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID))
985        );
986        iter.advance().await?;
987        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..1536));
988        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
989        iter.advance().await?;
990        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 1536..2048));
991        assert_eq!(
992            iter.get().unwrap().value,
993            &ObjectValue::Extent(ExtentValue::new_raw(16896, VOLUME_DATA_KEY_ID))
994        );
995        iter.advance().await?;
996        assert!(iter.get().is_none());
997        Ok(())
998    }
999
1000    #[fuchsia::test]
1001    async fn test_merge_deleted_extent_into_merges_with_other_deletions() -> Result<(), Error> {
1002        let object_id = 0;
1003        let attr_id = 0;
1004        let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge, Box::new(NullCache {}));
1005
1006        tree.insert(Item::new(
1007            ObjectKey::extent(object_id, attr_id, 0..1024),
1008            ObjectValue::deleted_extent(),
1009        ))
1010        .expect("insert error");
1011        tree.insert(Item::new(
1012            ObjectKey::extent(object_id, attr_id, 1024..2048),
1013            ObjectValue::deleted_extent(),
1014        ))
1015        .expect("insert error");
1016
1017        let key = ObjectKey::extent(object_id, attr_id, 512..1536);
1018        tree.merge_into(
1019            Item::new(key.clone(), ObjectValue::deleted_extent()),
1020            &key.key_for_merge_into(),
1021        );
1022
1023        let layer_set = tree.layer_set();
1024        let mut merger = layer_set.merger();
1025        let mut iter = merger.query(Query::FullScan).await?;
1026        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..2048));
1027        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
1028        iter.advance().await?;
1029        assert!(iter.get().is_none());
1030        Ok(())
1031    }
1032
1033    #[fuchsia::test]
1034    async fn test_merge_size_records() {
1035        let left = &[Item::new(
1036            ObjectKey::attribute(1, 0, AttributeKey::Attribute),
1037            ObjectValue::attribute(5, false),
1038        )];
1039        let right = &[Item::new(
1040            ObjectKey::attribute(1, 0, AttributeKey::Attribute),
1041            ObjectValue::attribute(10, false),
1042        )];
1043        let tree = LSMTree::new(merge, Box::new(NullCache {}));
1044        test_merge(&tree, left, right, left).await;
1045    }
1046
1047    #[fuchsia::test]
1048    async fn test_different_attributes_not_merged() {
1049        let left = Item::new(
1050            ObjectKey::attribute(1, 0, AttributeKey::Attribute),
1051            ObjectValue::attribute(5, false),
1052        );
1053        let right = Item::new(
1054            ObjectKey::attribute(1, 1, AttributeKey::Attribute),
1055            ObjectValue::attribute(10, false),
1056        );
1057        let tree = LSMTree::new(merge, Box::new(NullCache {}));
1058        test_merge(&tree, &[left.clone()], &[right.clone()], &[left, right]).await;
1059
1060        let left = Item::new(
1061            ObjectKey::extent(1, 0, 0..100),
1062            ObjectValue::Extent(ExtentValue::new_raw(0, VOLUME_DATA_KEY_ID)),
1063        );
1064        let right = Item::new(
1065            ObjectKey::extent(1, 1, 0..100),
1066            ObjectValue::Extent(ExtentValue::new_raw(1, VOLUME_DATA_KEY_ID)),
1067        );
1068        let tree = LSMTree::new(merge, Box::new(NullCache {}));
1069        test_merge(&tree, &[left.clone()], &[right.clone()], &[left, right]).await;
1070    }
1071
1072    #[fuchsia::test]
1073    async fn test_tombstone_discards_all_other_records() {
1074        let tombstone = Item::new(ObjectKey::object(1), ObjectValue::None);
1075        let other_object = Item::new(
1076            ObjectKey::object(2),
1077            ObjectValue::file(
1078                1,
1079                0,
1080                Timestamp::default(),
1081                Timestamp::default(),
1082                Timestamp::default(),
1083                Timestamp::default(),
1084                0,
1085                None,
1086            ),
1087        );
1088        let tree = LSMTree::new(merge, Box::new(NullCache {}));
1089        test_merge(
1090            &tree,
1091            &[tombstone.clone()],
1092            &[
1093                Item::new(
1094                    ObjectKey::object(1),
1095                    ObjectValue::file(
1096                        1,
1097                        100,
1098                        Timestamp::default(),
1099                        Timestamp::default(),
1100                        Timestamp::default(),
1101                        Timestamp::default(),
1102                        0,
1103                        None,
1104                    ),
1105                ),
1106                Item::new(
1107                    ObjectKey::attribute(1, 0, AttributeKey::Attribute),
1108                    ObjectValue::attribute(100, false),
1109                ),
1110                other_object.clone(),
1111            ],
1112            &[tombstone, other_object],
1113        )
1114        .await;
1115    }
1116
1117    #[fuchsia::test]
1118    async fn test_merge_preserves_sequences() -> Result<(), Error> {
1119        let object_id = 0;
1120        let attr_id = 0;
1121        let tree = LSMTree::<ObjectKey, ObjectValue>::new(merge, Box::new(NullCache {}));
1122
1123        tree.insert(Item {
1124            key: ObjectKey::extent(object_id, attr_id, 0..1024),
1125            value: ObjectValue::Extent(ExtentValue::new_raw(0u64, VOLUME_DATA_KEY_ID)),
1126            sequence: 1u64,
1127        })
1128        .expect("insert error");
1129        tree.seal();
1130        tree.insert(Item {
1131            key: ObjectKey::extent(object_id, attr_id, 0..512),
1132            value: ObjectValue::deleted_extent(),
1133            sequence: 2u64,
1134        })
1135        .expect("insert error");
1136        tree.insert(Item {
1137            key: ObjectKey::extent(object_id, attr_id, 1536..2048),
1138            value: ObjectValue::Extent(ExtentValue::new_raw(1536, VOLUME_DATA_KEY_ID)),
1139            sequence: 3u64,
1140        })
1141        .expect("insert error");
1142        tree.insert(Item {
1143            key: ObjectKey::extent(object_id, attr_id, 768..1024),
1144            value: ObjectValue::Extent(ExtentValue::new_raw(12345, VOLUME_DATA_KEY_ID)),
1145            sequence: 4u64,
1146        })
1147        .expect("insert error");
1148
1149        let layer_set = tree.layer_set();
1150        let mut merger = layer_set.merger();
1151        let mut iter = merger.query(Query::FullScan).await?;
1152        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 0..512));
1153        assert_eq!(iter.get().unwrap().value, &ObjectValue::deleted_extent());
1154        assert_eq!(iter.get().unwrap().sequence, 2u64);
1155        iter.advance().await?;
1156        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 512..768));
1157        assert_eq!(
1158            iter.get().unwrap().value,
1159            &ObjectValue::Extent(ExtentValue::new_raw(512, VOLUME_DATA_KEY_ID))
1160        );
1161        assert_eq!(iter.get().unwrap().sequence, 1u64);
1162        iter.advance().await?;
1163        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 768..1024));
1164        assert_eq!(
1165            iter.get().unwrap().value,
1166            &ObjectValue::Extent(ExtentValue::new_raw(12345, VOLUME_DATA_KEY_ID))
1167        );
1168        assert_eq!(iter.get().unwrap().sequence, 4u64);
1169        iter.advance().await?;
1170        assert_eq!(iter.get().unwrap().key, &ObjectKey::extent(object_id, attr_id, 1536..2048));
1171        assert_eq!(
1172            iter.get().unwrap().value,
1173            &ObjectValue::Extent(ExtentValue::new_raw(1536, VOLUME_DATA_KEY_ID))
1174        );
1175        assert_eq!(iter.get().unwrap().sequence, 3u64);
1176        iter.advance().await?;
1177        assert!(iter.get().is_none());
1178        Ok(())
1179    }
1180
1181    #[fuchsia::test]
1182    async fn test_merge_project_usage() {
1183        let tree = LSMTree::new(merge, Box::new(NullCache {}));
1184        let key = ObjectKey::project_usage(5, 6);
1185
1186        tree.insert(Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: 100, nodes: 1000 }))
1187            .expect("insert error");
1188        tree.merge_into(
1189            Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: 4, nodes: 8 }),
1190            &key,
1191        );
1192        tree.seal();
1193
1194        tree.merge_into(
1195            Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: -1, nodes: -2 }),
1196            &key,
1197        );
1198        tree.seal();
1199
1200        tree.merge_into(
1201            Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: 16, nodes: 32 }),
1202            &key,
1203        );
1204
1205        let layer_set = tree.layer_set();
1206        let mut merger = layer_set.merger();
1207        let mut iter = merger.query(Query::FullScan).await.unwrap();
1208        assert_eq!(iter.get().unwrap().key, &key);
1209        assert_eq!(
1210            iter.get().unwrap().value,
1211            &ObjectValue::BytesAndNodes { bytes: 119, nodes: 1038 }
1212        );
1213        iter.advance().await.unwrap();
1214        assert!(iter.get().is_none());
1215    }
1216
1217    #[fuchsia::test]
1218    async fn test_merge_project_usage_gap_layer() {
1219        let tree = LSMTree::new(merge, Box::new(NullCache {}));
1220        let key = ObjectKey::project_usage(5, 6);
1221        let key2 = ObjectKey::project_usage(5, 7);
1222
1223        tree.insert(Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: 100, nodes: 1000 }))
1224            .expect("insert error");
1225        tree.merge_into(
1226            Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: 4, nodes: 8 }),
1227            &key,
1228        );
1229        tree.seal();
1230
1231        assert_eq!(
1232            tree.find(&key).await.expect("Find").unwrap().value,
1233            ObjectValue::BytesAndNodes { bytes: 104, nodes: 1008 }
1234        );
1235
1236        tree.merge_into(
1237            Item::new(key2.clone(), ObjectValue::BytesAndNodes { bytes: 13, nodes: 17 }),
1238            &key,
1239        );
1240        tree.seal();
1241
1242        assert_eq!(
1243            tree.find(&key).await.expect("Find").unwrap().value,
1244            ObjectValue::BytesAndNodes { bytes: 104, nodes: 1008 }
1245        );
1246
1247        tree.merge_into(
1248            Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: 16, nodes: 32 }),
1249            &key,
1250        );
1251
1252        let layer_set = tree.layer_set();
1253        let mut merger = layer_set.merger();
1254        let mut iter = merger.query(Query::FullScan).await.unwrap();
1255        assert_eq!(iter.get().unwrap().key, &key);
1256        assert_eq!(
1257            iter.get().unwrap().value,
1258            &ObjectValue::BytesAndNodes { bytes: 120, nodes: 1040 }
1259        );
1260        iter.advance().await.unwrap();
1261        assert_eq!(iter.get().unwrap().key, &key2);
1262        assert_eq!(iter.get().unwrap().value, &ObjectValue::BytesAndNodes { bytes: 13, nodes: 17 });
1263        iter.advance().await.unwrap();
1264        assert!(iter.get().is_none());
1265
1266        assert_eq!(
1267            tree.find(&key).await.expect("Find").unwrap().value,
1268            ObjectValue::BytesAndNodes { bytes: 120, nodes: 1040 }
1269        );
1270    }
1271
1272    #[fuchsia::test]
1273    async fn test_merge_project_usage_to_zero() {
1274        let tree = LSMTree::new(merge, Box::new(NullCache {}));
1275        let key = ObjectKey::project_usage(5, 6);
1276
1277        tree.insert(Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: 4, nodes: 8 }))
1278            .expect("insert error");
1279        tree.seal();
1280
1281        tree.merge_into(
1282            Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: -4, nodes: -8 }),
1283            &key,
1284        );
1285        tree.seal();
1286
1287        let layer_set = tree.layer_set();
1288        let mut merger = layer_set.merger();
1289        let iter = merger.query(Query::FullScan).await.unwrap();
1290        assert!(iter.get().is_none());
1291    }
1292
1293    #[fuchsia::test]
1294    async fn test_merge_project_usage_recover_from_zero() {
1295        let tree = LSMTree::new(merge, Box::new(NullCache {}));
1296        let key = ObjectKey::project_usage(5, 6);
1297
1298        tree.insert(Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: 4, nodes: 8 }))
1299            .expect("insert error");
1300        tree.seal();
1301
1302        tree.merge_into(
1303            Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: -4, nodes: -8 }),
1304            &key,
1305        );
1306        tree.seal();
1307
1308        tree.merge_into(
1309            Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: 20, nodes: 40 }),
1310            &key,
1311        );
1312        tree.seal();
1313
1314        let layer_set = tree.layer_set();
1315        let mut merger = layer_set.merger();
1316        let mut iter = merger.query(Query::FullScan).await.unwrap();
1317        assert_eq!(iter.get().unwrap().key, &key);
1318        assert_eq!(iter.get().unwrap().value, &ObjectValue::BytesAndNodes { bytes: 20, nodes: 40 });
1319        iter.advance().await.unwrap();
1320        assert!(iter.get().is_none());
1321    }
1322
1323    #[fuchsia::test]
1324    async fn test_merge_project_usage_layer_merge_to_negative() {
1325        let tree = LSMTree::new(merge, Box::new(NullCache {}));
1326        let key = ObjectKey::project_usage(5, 6);
1327
1328        tree.insert(Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: 4, nodes: 8 }))
1329            .expect("insert error");
1330        tree.seal();
1331
1332        tree.merge_into(
1333            Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: 16, nodes: 32 }),
1334            &key,
1335        );
1336        tree.seal();
1337
1338        // As we merge from the newest layer down this will drop bytes below zero and nodes to
1339        // exactly zero during the merge process.
1340        tree.merge_into(
1341            Item::new(key.clone(), ObjectValue::BytesAndNodes { bytes: -18, nodes: -32 }),
1342            &key,
1343        );
1344        tree.seal();
1345
1346        let layer_set = tree.layer_set();
1347        let mut merger = layer_set.merger();
1348        let mut iter = merger.query(Query::FullScan).await.unwrap();
1349        assert_eq!(iter.get().unwrap().key, &key);
1350        assert_eq!(iter.get().unwrap().value, &ObjectValue::BytesAndNodes { bytes: 2, nodes: 8 });
1351        iter.advance().await.unwrap();
1352        assert!(iter.get().is_none());
1353    }
1354}