1use crate::lsm_tree::merge::ItemOp::{Discard, Keep, Replace};
6use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
7use crate::lsm_tree::types::{Item, LayerIterator};
8use crate::object_store::allocator::{AllocatorKey, AllocatorValue};
9use anyhow::Error;
10use std::collections::HashSet;
11
12pub fn merge(
13 left: &MergeLayerIterator<'_, AllocatorKey, AllocatorValue>,
14 right: &MergeLayerIterator<'_, AllocatorKey, AllocatorValue>,
15) -> MergeResult<AllocatorKey, AllocatorValue> {
16 if left.key().device_range.end < right.key().device_range.start {
26 return MergeResult::EmitLeft;
27 }
28
29 if left.key().device_range.end == right.key().device_range.start {
34 if *left.value() == *right.value() {
36 return MergeResult::Other {
37 emit: None,
38 left: Discard,
39 right: Replace(
40 Item {
41 key: AllocatorKey {
42 device_range: left.key().device_range.start
43 ..right.key().device_range.end,
44 },
45 value: left.value().clone(),
46 sequence: std::cmp::min(left.sequence(), right.sequence()),
47 }
48 .boxed(),
49 ),
50 };
51 } else {
52 return MergeResult::EmitLeft;
53 }
54 }
55 if left.key().device_range.start == right.key().device_range.start {
56 if left.key().device_range.end < right.key().device_range.end {
61 if left.layer_index < right.layer_index {
63 return MergeResult::Other {
64 emit: None,
65 left: Keep,
66 right: if left.key().device_range.end == right.key().device_range.end {
67 Discard
68 } else {
69 Replace(
70 Item {
71 key: AllocatorKey {
72 device_range: left.key().device_range.end
73 ..right.key().device_range.end,
74 },
75 value: right.value().clone(),
76 sequence: right.sequence(),
77 }
78 .boxed(),
79 )
80 },
81 };
82 } else {
83 return MergeResult::Other { emit: None, left: Discard, right: Keep };
85 }
86
87 } else {
92 if right.layer_index < left.layer_index {
94 return MergeResult::Other {
95 emit: None,
96 left: if right.key().device_range.end == left.key().device_range.end {
97 Discard
98 } else {
99 Replace(
100 Item {
101 key: AllocatorKey {
102 device_range: right.key().device_range.end
103 ..left.key().device_range.end,
104 },
105 value: left.value().clone(),
106 sequence: left.sequence(),
107 }
108 .boxed(),
109 )
110 },
111 right: Keep,
112 };
113 } else {
114 return MergeResult::Other { emit: None, left: Keep, right: Discard };
116 }
117 }
118 }
119 debug_assert!(left.key().device_range.end >= right.key().device_range.start);
124 MergeResult::Other {
125 emit: Some(
126 Item {
127 key: AllocatorKey {
128 device_range: left.key().device_range.start..right.key().device_range.start,
129 },
130 value: left.value().clone(),
131 sequence: left.sequence(),
132 }
133 .boxed(),
134 ),
135 left: Replace(
136 Item {
137 key: AllocatorKey {
138 device_range: right.key().device_range.start..left.key().device_range.end,
139 },
140 value: left.value().clone(),
141 sequence: left.sequence(),
142 }
143 .boxed(),
144 ),
145 right: Keep,
146 }
147}
148
149pub async fn filter_tombstones(
150 iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
151) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
152 Ok(iter.filter(|i| *i.value != AllocatorValue::None).await?)
153}
154
155pub async fn filter_marked_for_deletion(
156 iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
157 marked_for_deletion: HashSet<u64>,
158) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
159 Ok(iter
160 .filter(move |i| {
161 if let AllocatorValue::Abs { owner_object_id, .. } = i.value {
162 !marked_for_deletion.contains(owner_object_id)
163 } else {
164 true
165 }
166 })
167 .await?)
168}
169
#[cfg(test)]
mod tests {
    use crate::lsm_tree::cache::NullCache;
    use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
    use crate::lsm_tree::{LSMTree, Query};
    use crate::object_handle::INVALID_OBJECT_ID;
    use crate::object_store::allocator::merge::{filter_tombstones, merge};
    use crate::object_store::allocator::{AllocatorKey, AllocatorValue};
    use std::ops::Range;

    /// Inserts `right` into a fresh tree, calls `seal()`, inserts `left`, and then checks that a
    /// tombstone-filtered full scan of the merged layers yields exactly `expected`, in order.
    async fn test_merge(
        left: (Range<u64>, AllocatorValue),
        right: (Range<u64>, AllocatorValue),
        expected: &[(Range<u64>, AllocatorValue)],
    ) {
        let (left_range, left_value) = left;
        let (right_range, right_value) = right;

        let tree = LSMTree::new(merge, Box::new(NullCache {}));
        tree.insert(Item::new(AllocatorKey { device_range: right_range }, right_value))
            .expect("insert error");
        tree.seal();
        tree.insert(Item::new(AllocatorKey { device_range: left_range }, left_value))
            .expect("insert error");

        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let full_scan = merger.query(Query::FullScan).await.expect("seek failed");
        let mut iter = filter_tombstones(full_scan).await.expect("filter failed");

        for (expected_range, expected_value) in expected {
            let expected_key = AllocatorKey { device_range: expected_range.clone() };
            let ItemRef { key, value, .. } = iter.get().expect("get failed");
            assert_eq!((key, value), (&expected_key, expected_value));
            iter.advance().await.expect("advance failed");
        }
        // Nothing may remain once every expected record has been consumed.
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_no_overlap() {
        let value = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        test_merge(
            (0..100, value.clone()),
            (200..300, value.clone()),
            &[(0..100, value.clone()), (200..300, value.clone())],
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_touching() {
        let value = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        test_merge(
            (0..100, value.clone()),
            (100..200, value.clone()),
            &[(0..200, value.clone())],
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_identical() {
        let count_1 = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        let count_2 = AllocatorValue::Abs { count: 2, owner_object_id: 1 };

        test_merge((0..100, count_2.clone()), (0..100, count_1.clone()), &[(0..100, count_2.clone())])
            .await;
        test_merge((0..100, AllocatorValue::None), (0..100, count_1.clone()), &[]).await;
    }

    #[fuchsia::test]
    async fn test_left_smaller_than_right_with_same_start() {
        let count_1 = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        let count_2 = AllocatorValue::Abs { count: 2, owner_object_id: 1 };

        test_merge(
            (0..100, count_2.clone()),
            (0..200, count_1.clone()),
            &[(0..100, count_2.clone()), (100..200, count_1.clone())],
        )
        .await;
        test_merge(
            (0..100, AllocatorValue::None),
            (0..200, count_1.clone()),
            &[(100..200, count_1.clone())],
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_left_starts_before_right_with_overlap() {
        let count_1 = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        let count_2 = AllocatorValue::Abs { count: 2, owner_object_id: 1 };

        test_merge(
            (0..200, count_2.clone()),
            (100..150, count_1.clone()),
            &[(0..100, count_2.clone()), (100..200, count_2.clone())],
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_different_object_id() {
        let owner_1 = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        let owner_2 = AllocatorValue::Abs { count: 1, owner_object_id: 2 };

        // Disjoint ranges.
        test_merge(
            (0..100, owner_1.clone()),
            (200..300, owner_2.clone()),
            &[(0..100, owner_1.clone()), (200..300, owner_2.clone())],
        )
        .await;
        // Touching ranges with different values stay separate.
        test_merge(
            (0..100, owner_1.clone()),
            (100..200, owner_2.clone()),
            &[(0..100, owner_1.clone()), (100..200, owner_2.clone())],
        )
        .await;
        // Identical ranges.
        test_merge(
            (0..100, owner_1.clone()),
            (0..100, owner_2.clone()),
            &[(0..100, owner_1.clone())],
        )
        .await;
        // Same start, the first argument is shorter.
        test_merge(
            (0..100, owner_1.clone()),
            (0..200, owner_2.clone()),
            &[(0..100, owner_1.clone()), (100..200, owner_2.clone())],
        )
        .await;
        // Same start, the first argument is longer.
        test_merge(
            (0..200, owner_1.clone()),
            (0..100, owner_2.clone()),
            &[(0..200, owner_1.clone())],
        )
        .await;
        // Partial overlap.
        test_merge(
            (0..100, owner_1.clone()),
            (50..150, owner_2.clone()),
            &[(0..50, owner_1.clone()), (50..100, owner_1.clone()), (100..150, owner_2.clone())],
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_tombstones() {
        let key = AllocatorKey { device_range: 0..100 };
        let lower_bound = AllocatorKey::lower_bound_for_merge_into(&key);
        let tree = LSMTree::new(merge, Box::new(NullCache {}));

        // Every write in this test targets the same key; only the value varies.
        let merge_value = |value: AllocatorValue| {
            tree.merge_into(Item::new(key.clone(), value), &lower_bound);
        };

        merge_value(AllocatorValue::Abs { count: 1, owner_object_id: 1 });
        tree.seal();

        merge_value(AllocatorValue::Abs { count: 2, owner_object_id: 1 });
        tree.seal();

        merge_value(AllocatorValue::None);
        merge_value(AllocatorValue::Abs { count: 1, owner_object_id: 2 });
        tree.seal();

        merge_value(AllocatorValue::None);
        merge_value(AllocatorValue::Abs { count: 1, owner_object_id: 1 });

        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");

        // Only the last value written should be visible, as a single record.
        let expected_value = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        let ItemRef { key: k, value, .. } = iter.get().expect("get failed");
        assert_eq!((k, value), (&key, &expected_value));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_merge_preserves_sequences() {
        let tree = LSMTree::new(merge, Box::new(NullCache {}));
        tree.insert(Item {
            key: AllocatorKey { device_range: 0..100 },
            value: AllocatorValue::Abs { count: 1, owner_object_id: INVALID_OBJECT_ID },
            sequence: 1u64,
        })
        .expect("insert error");
        tree.seal();
        tree.insert(Item {
            key: AllocatorKey { device_range: 25..50 },
            value: AllocatorValue::None,
            sequence: 2u64,
        })
        .expect("insert error");
        tree.insert(Item {
            key: AllocatorKey { device_range: 75..100 },
            value: AllocatorValue::Abs { count: 2, owner_object_id: INVALID_OBJECT_ID },
            sequence: 3u64,
        })
        .expect("insert error");

        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let full_scan = merger.query(Query::FullScan).await.expect("seek failed");
        let mut iter = filter_tombstones(full_scan).await.expect("filter failed");

        // (range, value, sequence) for each record expected from the filtered scan, in order.
        let expected: [(Range<u64>, AllocatorValue, u64); 3] = [
            (0..25, AllocatorValue::Abs { count: 1, owner_object_id: INVALID_OBJECT_ID }, 1u64),
            (50..75, AllocatorValue::Abs { count: 1, owner_object_id: INVALID_OBJECT_ID }, 1u64),
            (75..100, AllocatorValue::Abs { count: 2, owner_object_id: INVALID_OBJECT_ID }, 3u64),
        ];
        for (expected_range, expected_value, expected_sequence) in &expected {
            let ItemRef { key, value, sequence } = iter.get().unwrap();
            assert_eq!(key, &AllocatorKey { device_range: expected_range.clone() });
            assert_eq!(value, expected_value);
            assert_eq!(sequence, *expected_sequence);
            iter.advance().await.expect("advance failed");
        }
        assert!(iter.get().is_none());
    }
}