1use crate::lsm_tree::merge::ItemOp::{Discard, Keep, Replace};
6use crate::lsm_tree::merge::{MergeLayerIterator, MergeResult};
7use crate::lsm_tree::types::{Item, LayerIterator};
8use crate::object_store::allocator::{AllocatorKey, AllocatorValue};
9use anyhow::Error;
10use std::collections::HashSet;
11
12pub fn merge(
13 left: &MergeLayerIterator<'_, AllocatorKey, AllocatorValue>,
14 right: &MergeLayerIterator<'_, AllocatorKey, AllocatorValue>,
15) -> MergeResult<AllocatorKey, AllocatorValue> {
16 if left.key().device_range.end < right.key().device_range.start {
26 return MergeResult::EmitLeft;
27 }
28
29 if left.key().device_range.end == right.key().device_range.start {
34 if *left.value() == *right.value() {
36 return MergeResult::Other {
37 emit: None,
38 left: Discard,
39 right: Replace(
40 Item::new(
41 AllocatorKey {
42 device_range: left.key().device_range.start
43 ..right.key().device_range.end,
44 },
45 left.value().clone(),
46 )
47 .boxed(),
48 ),
49 };
50 } else {
51 return MergeResult::EmitLeft;
52 }
53 }
54 if left.key().device_range.start == right.key().device_range.start {
55 if left.key().device_range.end < right.key().device_range.end {
60 if left.layer_index < right.layer_index {
62 return MergeResult::Other {
63 emit: None,
64 left: Keep,
65 right: if left.key().device_range.end == right.key().device_range.end {
66 Discard
67 } else {
68 Replace(
69 Item::new(
70 AllocatorKey {
71 device_range: left.key().device_range.end
72 ..right.key().device_range.end,
73 },
74 right.value().clone(),
75 )
76 .boxed(),
77 )
78 },
79 };
80 } else {
81 return MergeResult::Other { emit: None, left: Discard, right: Keep };
83 }
84
85 } else {
90 if right.layer_index < left.layer_index {
92 return MergeResult::Other {
93 emit: None,
94 left: if right.key().device_range.end == left.key().device_range.end {
95 Discard
96 } else {
97 Replace(
98 Item::new(
99 AllocatorKey {
100 device_range: right.key().device_range.end
101 ..left.key().device_range.end,
102 },
103 left.value().clone(),
104 )
105 .boxed(),
106 )
107 },
108 right: Keep,
109 };
110 } else {
111 return MergeResult::Other { emit: None, left: Keep, right: Discard };
113 }
114 }
115 }
116 debug_assert!(left.key().device_range.end >= right.key().device_range.start);
121 MergeResult::Other {
122 emit: Some(
123 Item::new(
124 AllocatorKey {
125 device_range: left.key().device_range.start..right.key().device_range.start,
126 },
127 left.value().clone(),
128 )
129 .boxed(),
130 ),
131 left: Replace(
132 Item::new(
133 AllocatorKey {
134 device_range: right.key().device_range.start..left.key().device_range.end,
135 },
136 left.value().clone(),
137 )
138 .boxed(),
139 ),
140 right: Keep,
141 }
142}
143
144pub async fn filter_tombstones(
145 iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
146) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
147 Ok(iter.filter(|i| *i.value != AllocatorValue::None).await?)
148}
149
150pub async fn filter_marked_for_deletion(
151 iter: impl LayerIterator<AllocatorKey, AllocatorValue>,
152 marked_for_deletion: HashSet<u64>,
153) -> Result<impl LayerIterator<AllocatorKey, AllocatorValue>, Error> {
154 Ok(iter
155 .filter(move |i| {
156 if let AllocatorValue::Abs { owner_object_id, .. } = i.value {
157 !marked_for_deletion.contains(owner_object_id)
158 } else {
159 true
160 }
161 })
162 .await?)
163}
164
#[cfg(test)]
mod tests {
    use crate::lsm_tree::cache::NullCache;
    use crate::lsm_tree::types::{Item, ItemRef, LayerIterator};
    use crate::lsm_tree::{LSMTree, Query};
    use crate::object_store::allocator::merge::{filter_tombstones, merge};
    use crate::object_store::allocator::{AllocatorKey, AllocatorValue};
    use std::ops::Range;

    /// Inserts `right`, seals the tree, then inserts `left` (so `left` lands in the layer
    /// written after the seal). Runs a full-scan merge with tombstones filtered out and
    /// checks that the result is exactly `expected`, in order.
    async fn test_merge(
        left: (Range<u64>, AllocatorValue),
        right: (Range<u64>, AllocatorValue),
        expected: &[(Range<u64>, AllocatorValue)],
    ) {
        let (left_range, left_value) = left;
        let (right_range, right_value) = right;

        let tree = LSMTree::new(merge, Box::new(NullCache {}));
        let first = Item::new(AllocatorKey { device_range: right_range }, right_value);
        tree.insert(first).expect("insert error");
        tree.seal();
        let second = Item::new(AllocatorKey { device_range: left_range }, left_value);
        tree.insert(second).expect("insert error");

        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let merged = merger.query(Query::FullScan).await.expect("seek failed");
        let mut iter = filter_tombstones(merged).await.expect("filter failed");
        for (range, expected_value) in expected {
            let ItemRef { key, value, .. } = iter.get().expect("get failed");
            let expected_key = AllocatorKey { device_range: range.clone() };
            assert_eq!((key, value), (&expected_key, expected_value));
            iter.advance().await.expect("advance failed");
        }
        // Nothing may remain once every expected item has been seen.
        assert!(iter.get().is_none());
    }

    #[fuchsia::test]
    async fn test_no_overlap() {
        let value = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        test_merge(
            (0..100, value.clone()),
            (200..300, value.clone()),
            &[(0..100, value.clone()), (200..300, value.clone())],
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_touching() {
        let value = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        // Adjacent ranges with equal values are coalesced into one.
        test_merge(
            (0..100, value.clone()),
            (100..200, value.clone()),
            &[(0..200, value.clone())],
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_identical() {
        let one = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        let two = AllocatorValue::Abs { count: 2, owner_object_id: 1 };
        test_merge((0..100, two.clone()), (0..100, one.clone()), &[(0..100, two.clone())]).await;
        // A tombstone over the identical range leaves nothing after filtering.
        test_merge((0..100, AllocatorValue::None), (0..100, one.clone()), &[]).await;
    }

    #[fuchsia::test]
    async fn test_left_smaller_than_right_with_same_start() {
        let one = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        let two = AllocatorValue::Abs { count: 2, owner_object_id: 1 };
        test_merge(
            (0..100, two.clone()),
            (0..200, one.clone()),
            &[(0..100, two.clone()), (100..200, one.clone())],
        )
        .await;
        test_merge(
            (0..100, AllocatorValue::None),
            (0..200, one.clone()),
            &[(100..200, one.clone())],
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_left_starts_before_right_with_overlap() {
        let one = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        let two = AllocatorValue::Abs { count: 2, owner_object_id: 1 };
        test_merge(
            (0..200, two.clone()),
            (100..150, one.clone()),
            &[(0..100, two.clone()), (100..200, two.clone())],
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_different_object_id() {
        let owner1 = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        let owner2 = AllocatorValue::Abs { count: 1, owner_object_id: 2 };

        // Disjoint ranges.
        test_merge(
            (0..100, owner1.clone()),
            (200..300, owner2.clone()),
            &[(0..100, owner1.clone()), (200..300, owner2.clone())],
        )
        .await;
        // Touching ranges with different values are not coalesced.
        test_merge(
            (0..100, owner1.clone()),
            (100..200, owner2.clone()),
            &[(0..100, owner1.clone()), (100..200, owner2.clone())],
        )
        .await;
        // Identical ranges.
        test_merge(
            (0..100, owner1.clone()),
            (0..100, owner2.clone()),
            &[(0..100, owner1.clone())],
        )
        .await;
        // Same start, left shorter.
        test_merge(
            (0..100, owner1.clone()),
            (0..200, owner2.clone()),
            &[(0..100, owner1.clone()), (100..200, owner2.clone())],
        )
        .await;
        // Same start, left longer.
        test_merge(
            (0..200, owner1.clone()),
            (0..100, owner2.clone()),
            &[(0..200, owner1.clone())],
        )
        .await;
        // Partial overlap, left starting first.
        test_merge(
            (0..100, owner1.clone()),
            (50..150, owner2.clone()),
            &[(0..50, owner1.clone()), (50..100, owner1.clone()), (100..150, owner2.clone())],
        )
        .await;
    }

    #[fuchsia::test]
    async fn test_tombstones() {
        let key = AllocatorKey { device_range: 0..100 };
        let lower_bound = AllocatorKey::lower_bound_for_merge_into(&key);
        let tree = LSMTree::new(merge, Box::new(NullCache {}));
        // Every write in this test targets the same key; only the value varies.
        let put = |value: AllocatorValue| {
            tree.merge_into(Item::new(key.clone(), value), &lower_bound);
        };

        put(AllocatorValue::Abs { count: 1, owner_object_id: 1 });
        tree.seal();
        put(AllocatorValue::Abs { count: 2, owner_object_id: 1 });
        tree.seal();
        put(AllocatorValue::None);
        put(AllocatorValue::Abs { count: 1, owner_object_id: 2 });
        tree.seal();
        put(AllocatorValue::None);
        put(AllocatorValue::Abs { count: 1, owner_object_id: 1 });

        let layer_set = tree.layer_set();
        let mut merger = layer_set.merger();
        let mut iter = merger.query(Query::FullScan).await.expect("seek failed");
        let ItemRef { key: k, value, .. } = iter.get().expect("get failed");
        let last_written = AllocatorValue::Abs { count: 1, owner_object_id: 1 };
        assert_eq!((k, value), (&key, &last_written));
        iter.advance().await.expect("advance failed");
        assert!(iter.get().is_none());
    }
}