fxfs/object_store/allocator/
merge.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::lsm_tree::merge::ItemOp::{Discard, Keep, Replace};
6use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
7use crate::lsm_tree::types::{Item, LayerIterator};
8use crate::object_store::allocator::{AllocatorKey, AllocatorValue};
9use anyhow::Error;
10use std::collections::HashSet;
11
12pub fn merge(
13    left: &MergeLayerIterator<'_, AllocatorKey, AllocatorValue>,
14    right: &MergeLayerIterator<'_, AllocatorKey, AllocatorValue>,
15) -> MergeResult<AllocatorKey, AllocatorValue> {
16    // Wherever Replace is used below, it must not extend the *end* of the range for whichever item
17    // is returned i.e. if replacing the left item, replacement.end <= left.end because otherwise we
18    // might not merge records that come after that end point because the merger won't merge records
19    // in the same layer
20
21    /*  Case 1: Disjoint
22     *    L:    |------------|
23     *    R:                      |-----------|
24     */
25    if left.key().device_range.end < right.key().device_range.start {
26        return MergeResult::EmitLeft;
27    }
28
29    /*  Case 2: Touching
30     *    L:    |------------|
31     *    R:                 |-----------|
32     */
33    if left.key().device_range.end == right.key().device_range.start {
34        // We can only merge the range if the values are an exact match.
35        if *left.value() == *right.value() {
36            return MergeResult::Other {
37                emit: None,
38                left: Discard,
39                right: Replace(
40                    Item {
41                        key: AllocatorKey {
42                            device_range: left.key().device_range.start
43                                ..right.key().device_range.end,
44                        },
45                        value: left.value().clone(),
46                        sequence: std::cmp::min(left.sequence(), right.sequence()),
47                    }
48                    .boxed(),
49                ),
50            };
51        } else {
52            return MergeResult::EmitLeft;
53        }
54    }
55    if left.key().device_range.start == right.key().device_range.start {
56        /*  Case 3: Overlap with same start
57         *    L:    |------------|
58         *    R:    |-----------------|
59         */
60        if left.key().device_range.end < right.key().device_range.end {
61            // The newer value eclipses the older.
62            if left.layer_index < right.layer_index {
63                return MergeResult::Other {
64                    emit: None,
65                    left: Keep,
66                    right: if left.key().device_range.end == right.key().device_range.end {
67                        Discard
68                    } else {
69                        Replace(
70                            Item {
71                                key: AllocatorKey {
72                                    device_range: left.key().device_range.end
73                                        ..right.key().device_range.end,
74                                },
75                                value: right.value().clone(),
76                                sequence: right.sequence(),
77                            }
78                            .boxed(),
79                        )
80                    },
81                };
82            } else {
83                // right is a newer Abs/None than left
84                return MergeResult::Other { emit: None, left: Discard, right: Keep };
85            }
86
87        /*  Case 4: Overlap with same start
88         *    L:    |-----------------|
89         *    R:    |------------|
90         */
91        } else {
92            // The newer value eclipses the older.
93            if right.layer_index < left.layer_index {
94                return MergeResult::Other {
95                    emit: None,
96                    left: if right.key().device_range.end == left.key().device_range.end {
97                        Discard
98                    } else {
99                        Replace(
100                            Item {
101                                key: AllocatorKey {
102                                    device_range: right.key().device_range.end
103                                        ..left.key().device_range.end,
104                                },
105                                value: left.value().clone(),
106                                sequence: left.sequence(),
107                            }
108                            .boxed(),
109                        )
110                    },
111                    right: Keep,
112                };
113            } else {
114                // right is a newer Abs/None than left
115                return MergeResult::Other { emit: None, left: Keep, right: Discard };
116            }
117        }
118    }
119    /*  Case 5: Split off left prefix
120     *    L:    |-----...
121     *    R:         |-----...
122     */
123    debug_assert!(left.key().device_range.end >= right.key().device_range.start);
124    MergeResult::Other {
125        emit: Some(
126            Item {
127                key: AllocatorKey {
128                    device_range: left.key().device_range.start..right.key().device_range.start,
129                },
130                value: left.value().clone(),
131                sequence: left.sequence(),
132            }
133            .boxed(),
134        ),
135        left: Replace(
136            Item {
137                key: AllocatorKey {
138                    device_range: right.key().device_range.start..left.key().device_range.end,
139                },
140                value: left.value().clone(),
141                sequence: left.sequence(),
142            }
143            .boxed(),
144        ),
145        right: Keep,
146    }
147}
148
149pub async fn filter_tombstones(
150    iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
151) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
152    Ok(iter.filter(|i| *i.value != AllocatorValue::None).await?)
153}
154
155pub async fn filter_marked_for_deletion(
156    iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
157    marked_for_deletion: HashSet<u64>,
158) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
159    Ok(iter
160        .filter(move |i| {
161            if let AllocatorValue::Abs { owner_object_id, .. } = i.value {
162                !marked_for_deletion.contains(owner_object_id)
163            } else {
164                true
165            }
166        })
167        .await?)
168}
169
#[cfg(test)]
mod tests {
    use crate::lsm_tree::cache::NullCache;
    use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
    use crate::lsm_tree::{LSMTree, Query};
    use crate::object_handle::INVALID_OBJECT_ID;
    use crate::object_store::allocator::merge::{filter_tombstones, merge};
    use crate::object_store::allocator::{AllocatorKey, AllocatorValue};
    use std::ops::Range;

    // Tests merge logic given a (device range, value) pair for left and for right, plus the
    // expected merged output.
    //
    // `right` is inserted first and the tree is sealed before `left` is inserted, so the two
    // records end up in different layers (presumably making `left` the newer one). The full scan
    // is passed through `filter_tombstones`, so `AllocatorValue::None` records never show up in
    // `expected`. The scan must yield exactly the records in `expected`, in order, and then end.
    async fn test_merge(
        left: (Range<u64>, AllocatorValue),
        right: (Range<u64>, AllocatorValue),
        expected: &[(Range<u64>, AllocatorValue)],
    ) {
        let tree = LSMTree::new(merge, Box::new(NullCache {}));
        tree.insert(Item::new(AllocatorKey { device_range: right.0 }, right.1))
            .expect("insert error");
        tree.seal();
        tree.insert(Item::new(AllocatorKey { device_range: left.0 }, left.1))
            .expect("insert error");
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = filter_tombstones(merger.query(Query::FullScan).await.expect("seek failed"))
            .await
            .expect("filter failed");
        for e in expected {
            let ItemRef { key, value, .. } = iter.get().expect("get failed");
            assert_eq!((key, value), (&AllocatorKey { device_range: e.0.clone() }, &e.1));
            iter.advance().await.expect("advance failed");
        }
        // Nothing may be left over once every expected record has been seen.
        assert!(iter.get().is_none());
    }

    // Disjoint ranges: both records come out unchanged.
    #[fuchsia::test]
    async fn test_no_overlap() {
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (200..300, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[
                (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (200..300, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            ],
        )
        .await;
    }

    // Touching ranges with equal values are coalesced into a single record.
    #[fuchsia::test]
    async fn test_touching() {
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (100..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[(0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
        )
        .await;
    }

    // Identical ranges: only left's value survives. When left is a tombstone (None) nothing is
    // left after tombstone filtering.
    #[fuchsia::test]
    async fn test_identical() {
        test_merge(
            (0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[(0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 })],
        )
        .await;
        test_merge(
            (0..100, AllocatorValue::None),
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[],
        )
        .await;
    }

    // Same start, left shorter: left's value covers the shared part and the tail of right
    // survives with right's value. A tombstone on the left removes only the shared part.
    #[fuchsia::test]
    async fn test_left_smaller_than_right_with_same_start() {
        test_merge(
            (0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
            (0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[
                (0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
                (100..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            ],
        )
        .await;
        test_merge(
            (0..100, AllocatorValue::None),
            (0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[(100..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
        )
        .await;
    }

    // Left starts first and fully covers right: left is split at right's start, right is
    // dropped, and the two pieces of left are not re-joined.
    #[fuchsia::test]
    async fn test_left_starts_before_right_with_overlap() {
        test_merge(
            (0..200, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
            (100..150, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[
                (0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
                (100..200, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
            ],
        )
        .await;
    }

    // Repeats the range layouts above with a different owner_object_id on each side. The
    // "Case N" labels below number the scenarios of this test; they are not the case numbers
    // used in the comments of `merge`.
    #[fuchsia::test]
    async fn test_different_object_id() {
        // Case 1: disjoint ranges; both records are kept.
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (200..300, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[
                (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (200..300, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;
        // Case 2: touching ranges; the values differ so they are not coalesced.
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (100..200, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[
                (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (100..200, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;
        // Case 3: identical ranges; only left's record survives.
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
        )
        .await;
        // Case 4: same start, left shorter; the tail of right survives.
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (0..200, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[
                (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (100..200, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;
        // Case 5: same start, left longer; right is eclipsed completely.
        test_merge(
            (0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[(0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
        )
        .await;
        // Case 6: left starts first and partially overlaps right; left is split at right's
        // start and only the tail of right past left's end survives.
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (50..150, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[
                (0..50, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (50..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (100..150, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_tombstones() {
        // We have to make sure we don't prematurely discard records. seal() may be called at
        // any time and the resulting layer tree must remain valid.
        // Here we test absolute allocation counts and reuse of allocated space.
        //
        //  1. Alloc object_id A, write layer file.
        //  2. Dealloc object_id A, Alloc object_id B, write layer file.
        //  3. Dealloc object_id B, Alloc object_id A.
        //
        // NOTE(review): the code below additionally seals a layer that raises A's count to 2
        // between steps 1 and 2, which the list above does not mention -- confirm the intent.
        //
        // Every record uses the same key, and the scan is *not* tombstone-filtered; it must
        // yield exactly one record carrying the value written last.
        let key = AllocatorKey { device_range: 0..100 };
        let lower_bound = AllocatorKey::lower_bound_for_merge_into(&key);
        let tree = LSMTree::new(merge, Box::new(NullCache {}));
        tree.merge_into(
            Item::new(key.clone(), AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &lower_bound,
        );
        tree.seal();
        tree.merge_into(
            Item::new(key.clone(), AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
            &lower_bound,
        );
        tree.seal();
        tree.merge_into(Item::new(key.clone(), AllocatorValue::None), &lower_bound);
        tree.merge_into(
            Item::new(key.clone(), AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &lower_bound,
        );
        tree.seal();
        tree.merge_into(Item::new(key.clone(), AllocatorValue::None), &lower_bound);
        tree.merge_into(
            Item::new(key.clone(), AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &lower_bound,
        );
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        let ItemRef { key: k, value, .. } = iter.get().expect("get failed");
        assert_eq!((k, value), (&key, &AllocatorValue::Abs { count: 1, owner_object_id: 1 }));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    // Checks that each record produced by splitting during a merge carries the sequence number
    // of the record its value came from.
    #[fuchsia::test]
    async fn test_merge_preserves_sequences() {
        let tree = LSMTree::new(merge, Box::new(NullCache {}));
        // |1-1-1-1|
        tree.insert(Item {
            key: AllocatorKey { device_range: 0..100 },
            value: AllocatorValue::Abs { count: 1, owner_object_id: INVALID_OBJECT_ID },
            sequence: 1u64,
        })
        .expect("insert error");
        tree.seal();
        // |1|0|1-1|
        tree.insert(Item {
            key: AllocatorKey { device_range: 25..50 },
            value: AllocatorValue::None,
            sequence: 2u64,
        })
        .expect("insert error");
        // |1|0|1|2|
        tree.insert(Item {
            key: AllocatorKey { device_range: 75..100 },
            value: AllocatorValue::Abs { count: 2, owner_object_id: INVALID_OBJECT_ID },
            sequence: 3u64,
        })
        .expect("insert error");
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = filter_tombstones(merger.query(Query::FullScan).await.expect("seek failed"))
            .await
            .expect("filter failed");
        // 0..25 is a piece of the original record, so it keeps sequence 1.
        assert_eq!(iter.get().unwrap().key, &AllocatorKey { device_range: 0..25 });
        assert_eq!(
            iter.get().unwrap().value,
            &AllocatorValue::Abs { count: 1, owner_object_id: INVALID_OBJECT_ID }
        );
        assert_eq!(iter.get().unwrap().sequence, 1u64);
        iter.advance().await.expect("advance failed");
        // 25..50 is the tombstone and is filtered out; 50..75 is again a piece of the original
        // record and keeps sequence 1.
        assert_eq!(iter.get().unwrap().key, &AllocatorKey { device_range: 50..75 });
        assert_eq!(
            iter.get().unwrap().value,
            &AllocatorValue::Abs { count: 1, owner_object_id: INVALID_OBJECT_ID }
        );
        assert_eq!(iter.get().unwrap().sequence, 1u64);
        iter.advance().await.expect("advance failed");
        // 75..100 comes from the record inserted last, with its sequence 3.
        assert_eq!(iter.get().unwrap().key, &AllocatorKey { device_range: 75..100 });
        assert_eq!(
            iter.get().unwrap().value,
            &AllocatorValue::Abs { count: 2, owner_object_id: INVALID_OBJECT_ID }
        );
        assert_eq!(iter.get().unwrap().sequence, 3u64);
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }
}