1use crate::lsm_tree::merge::ItemOp::{Discard, Keep, Replace};
6use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
7use crate::lsm_tree::types::{Item, LayerIterator};
8use crate::object_store::ExtentKey;
9use crate::object_store::allocator::{AllocatorKey, AllocatorValue};
10use anyhow::Error;
11use std::collections::HashSet;
12
13pub fn merge(
14 left: &MergeLayerIterator<'_, AllocatorKey, AllocatorValue>,
15 right: &MergeLayerIterator<'_, AllocatorKey, AllocatorValue>,
16) -> MergeResult<AllocatorKey, AllocatorValue> {
17 if left.key().device_range.end < right.key().device_range.start {
27 return MergeResult::EmitLeft;
28 }
29
30 if left.key().device_range.end == right.key().device_range.start {
35 if *left.value() == *right.value() {
37 return MergeResult::Other {
38 emit: None,
39 left: Discard,
40 right: Replace(
41 Item::new(
42 AllocatorKey {
43 device_range: ExtentKey::new(
44 left.key().device_range.start..right.key().device_range.end,
45 ),
46 },
47 left.value().clone(),
48 )
49 .boxed(),
50 ),
51 };
52 } else {
53 return MergeResult::EmitLeft;
54 }
55 }
56 if left.key().device_range.start == right.key().device_range.start {
57 if left.key().device_range.end < right.key().device_range.end {
62 if left.layer_index < right.layer_index {
64 return MergeResult::Other {
65 emit: None,
66 left: Keep,
67 right: if left.key().device_range.end == right.key().device_range.end {
68 Discard
69 } else {
70 Replace(
71 Item::new(
72 AllocatorKey {
73 device_range: ExtentKey::new(
74 left.key().device_range.end..right.key().device_range.end,
75 ),
76 },
77 right.value().clone(),
78 )
79 .boxed(),
80 )
81 },
82 };
83 } else {
84 return MergeResult::Other { emit: None, left: Discard, right: Keep };
86 }
87
88 } else {
93 if right.layer_index < left.layer_index {
95 return MergeResult::Other {
96 emit: None,
97 left: if right.key().device_range.end == left.key().device_range.end {
98 Discard
99 } else {
100 Replace(
101 Item::new(
102 AllocatorKey {
103 device_range: ExtentKey::new(
104 right.key().device_range.end..left.key().device_range.end,
105 ),
106 },
107 left.value().clone(),
108 )
109 .boxed(),
110 )
111 },
112 right: Keep,
113 };
114 } else {
115 return MergeResult::Other { emit: None, left: Keep, right: Discard };
117 }
118 }
119 }
120 debug_assert!(left.key().device_range.end >= right.key().device_range.start);
125 MergeResult::Other {
126 emit: Some(
127 Item::new(
128 AllocatorKey {
129 device_range: ExtentKey::new(
130 left.key().device_range.start..right.key().device_range.start,
131 ),
132 },
133 left.value().clone(),
134 )
135 .boxed(),
136 ),
137 left: Replace(
138 Item::new(
139 AllocatorKey {
140 device_range: ExtentKey::new(
141 right.key().device_range.start..left.key().device_range.end,
142 ),
143 },
144 left.value().clone(),
145 )
146 .boxed(),
147 ),
148 right: Keep,
149 }
150}
151
152pub async fn filter_tombstones(
153 iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
154) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
155 Ok(iter.filter(|i| *i.value != AllocatorValue::None).await?)
156}
157
158pub async fn filter_marked_for_deletion(
159 iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
160 marked_for_deletion: HashSet<u64>,
161) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
162 Ok(iter
163 .filter(move |i| {
164 if let AllocatorValue::Abs { owner_object_id, .. } = i.value {
165 !marked_for_deletion.contains(owner_object_id)
166 } else {
167 true
168 }
169 })
170 .await?)
171}
172
#[cfg(test)]
mod tests {
    use crate::lsm_tree::cache::NullCache;
    use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
    use crate::lsm_tree::{LSMTree, Query};
    use crate::object_store::allocator::merge::{
        filter_marked_for_deletion, filter_tombstones, merge,
    };
    use crate::object_store::allocator::{AllocatorKey, AllocatorValue};
    use std::collections::HashSet;
    use std::ops::Range;

    /// Builds a two-layer tree and checks the merged result.
    ///
    /// `right` goes into a layer that is then sealed, and `left` goes into the layer above it,
    /// so `left` is the newer record. The test checks that a full scan, with tombstones
    /// filtered out, yields exactly `expected` in order.
    async fn test_merge(
        left: (Range<u64>, AllocatorValue),
        right: (Range<u64>, AllocatorValue),
        expected: &[(Range<u64>, AllocatorValue)],
    ) {
        let tree = LSMTree::new(merge, Box::new(NullCache {}));
        tree.insert(Item::new(AllocatorKey { device_range: right.0.into() }, right.1))
            .expect("insert error");
        tree.seal();
        tree.insert(Item::new(AllocatorKey { device_range: left.0.into() }, left.1))
            .expect("insert error");
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = filter_tombstones(merger.query(Query::FullScan).await.expect("seek failed"))
            .await
            .expect("filter failed");
        for e in expected {
            let ItemRef { key, value, .. } = iter.get().expect("get failed");
            assert_eq!((key, value), (&AllocatorKey { device_range: e.0.clone().into() }, &e.1));
            iter.advance().await.expect("advance failed");
        }
        assert!(iter.get().is_none());
    }

    // Disjoint ranges pass through unchanged.
    #[fuchsia::test]
    async fn test_no_overlap() {
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (200..300, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[
                (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (200..300, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            ],
        )
        .await;
    }

    // Adjacent ranges with equal values are coalesced into one record.
    #[fuchsia::test]
    async fn test_touching() {
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (100..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[(0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
        )
        .await;
    }

    // For identical ranges the newer record wins. A newer tombstone therefore removes the
    // range once tombstones are filtered out.
    #[fuchsia::test]
    async fn test_identical() {
        test_merge(
            (0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[(0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 })],
        )
        .await;
        test_merge(
            (0..100, AllocatorValue::None),
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[],
        )
        .await;
    }

    // The newer record is a prefix of the older one: the older record keeps only its tail.
    #[fuchsia::test]
    async fn test_left_smaller_than_right_with_same_start() {
        test_merge(
            (0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
            (0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[
                (0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
                (100..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            ],
        )
        .await;
        test_merge(
            (0..100, AllocatorValue::None),
            (0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[(100..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
        )
        .await;
    }

    // The newer record fully contains the older one, starting before it. The newer record is
    // split where the older one starts, and the older one is dropped.
    #[fuchsia::test]
    async fn test_left_starts_before_right_with_overlap() {
        test_merge(
            (0..200, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
            (100..150, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &[
                (0..100, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
                (100..200, AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
            ],
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_different_object_id() {
        // Disjoint ranges.
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (200..300, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[
                (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (200..300, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;
        // Adjacent ranges with different owners are not coalesced.
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (100..200, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[
                (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (100..200, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;
        // Identical ranges: the newer owner wins.
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[(0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
        )
        .await;
        // The newer record is a prefix of the older one.
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (0..200, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[
                (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (100..200, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;
        // The newer record covers the older one.
        test_merge(
            (0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[(0..200, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
        )
        .await;
        // Partial overlap: the newer record wins the overlapping bytes.
        test_merge(
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (50..150, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[
                (0..50, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (50..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (100..150, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;
    }

    // Each pair of calls below moves one or both boundaries of a range by one byte relative to
    // `base`. The first call of each pair makes `base` the newer record; the second makes the
    // other range the newer record.
    #[fuchsia::test]
    async fn test_overlapping_boundaries() {
        let base = (50..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 });

        let test_val = AllocatorValue::Abs { count: 1, owner_object_id: 2 };

        // Starts one byte earlier, same end.
        test_merge(
            base.clone(),
            (49..100, test_val.clone()),
            &[
                (49..50, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
                (50..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            ],
        )
        .await;
        test_merge(
            (49..100, test_val.clone()),
            base.clone(),
            &[
                (49..50, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
                (50..100, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;

        // Starts one byte later, same end.
        test_merge(
            base.clone(),
            (51..100, test_val.clone()),
            &[
                (50..51, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (51..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            ],
        )
        .await;
        test_merge(
            (51..100, test_val.clone()),
            base.clone(),
            &[
                (50..51, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (51..100, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;

        // Same start, ends one byte earlier.
        test_merge(
            base.clone(),
            (50..99, test_val.clone()),
            &[(50..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
        )
        .await;
        test_merge(
            (50..99, test_val.clone()),
            base.clone(),
            &[
                (50..99, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
                (99..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            ],
        )
        .await;

        // Same start, ends one byte later.
        test_merge(
            base.clone(),
            (50..101, test_val.clone()),
            &[
                (50..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (100..101, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;
        test_merge(
            (50..101, test_val.clone()),
            base.clone(),
            &[(50..101, AllocatorValue::Abs { count: 1, owner_object_id: 2 })],
        )
        .await;

        // Starts and ends one byte earlier.
        test_merge(
            base.clone(),
            (49..99, test_val.clone()),
            &[
                (49..50, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
                (50..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            ],
        )
        .await;
        test_merge(
            (49..99, test_val.clone()),
            base.clone(),
            &[
                (49..50, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
                (50..99, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
                (99..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            ],
        )
        .await;

        // Starts and ends one byte later.
        test_merge(
            base.clone(),
            (51..101, test_val.clone()),
            &[
                (50..51, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (51..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (100..101, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;
        test_merge(
            (51..101, test_val.clone()),
            base.clone(),
            &[
                (50..51, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (51..101, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;
    }

    /// Builds several layers of records for the same key using `merge_into`. Checks that an
    /// unfiltered full scan yields only the value merged last.
    #[fuchsia::test]
    async fn test_tombstones() {
        let key = AllocatorKey { device_range: (0..100 * 512).into() };
        let lower_bound = AllocatorKey::lower_bound_for_merge_into(&key);
        let tree = LSMTree::new(merge, Box::new(NullCache {}));
        tree.merge_into(
            Item::new(key.clone(), AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &lower_bound,
        );
        tree.seal();
        tree.merge_into(
            Item::new(key.clone(), AllocatorValue::Abs { count: 2, owner_object_id: 1 }),
            &lower_bound,
        );
        tree.seal();
        tree.merge_into(Item::new(key.clone(), AllocatorValue::None), &lower_bound);
        tree.merge_into(
            Item::new(key.clone(), AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &lower_bound,
        );
        tree.seal();
        tree.merge_into(Item::new(key.clone(), AllocatorValue::None), &lower_bound);
        tree.merge_into(
            Item::new(key.clone(), AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            &lower_bound,
        );
        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        let ItemRef { key: k, value, .. } = iter.get().expect("get failed");
        assert_eq!((k, value), (&key, &AllocatorValue::Abs { count: 1, owner_object_id: 1 }));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_length_1_ranges() {
        // Adjacent single-byte ranges with different owners stay separate.
        test_merge(
            (10..11, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (11..12, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[
                (10..11, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
                (11..12, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            ],
        )
        .await;

        // Identical single-byte ranges: the newer owner wins.
        test_merge(
            (10..11, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (10..11, AllocatorValue::Abs { count: 1, owner_object_id: 2 }),
            &[(10..11, AllocatorValue::Abs { count: 1, owner_object_id: 1 })],
        )
        .await;
    }

    /// Checks three things about `filter_marked_for_deletion`: `Abs` records owned by a
    /// marked object are dropped, records owned by unmarked objects are kept, and non-`Abs`
    /// records always pass through.
    #[fuchsia::test]
    async fn test_filter_marked_for_deletion() {
        let tree = LSMTree::new(merge, Box::new(NullCache {}));
        tree.insert(Item::new(
            AllocatorKey { device_range: (0..100).into() },
            AllocatorValue::Abs { count: 1, owner_object_id: 1 },
        ))
        .expect("insert error");
        tree.insert(Item::new(
            AllocatorKey { device_range: (200..300).into() },
            AllocatorValue::Abs { count: 1, owner_object_id: 2 },
        ))
        .expect("insert error");
        tree.insert(Item::new(
            AllocatorKey { device_range: (400..500).into() },
            AllocatorValue::None,
        ))
        .expect("insert error");

        let mut marked_for_deletion = HashSet::new();
        marked_for_deletion.insert(2);

        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = filter_marked_for_deletion(
            merger.query(Query::FullScan).await.expect("seek failed"),
            marked_for_deletion,
        )
        .await
        .expect("filter failed");

        let expected: &[(Range<u64>, AllocatorValue)] = &[
            (0..100, AllocatorValue::Abs { count: 1, owner_object_id: 1 }),
            (400..500, AllocatorValue::None),
        ];
        for (range, expected_value) in expected {
            let ItemRef { key, value, .. } = iter.get().expect("get failed");
            assert_eq!(
                (key, value),
                (&AllocatorKey { device_range: range.clone().into() }, expected_value)
            );
            iter.advance().await.expect("advance failed");
        }
        assert!(iter.get().is_none());
    }
}