// crossbeam_epoch/collector.rs

use core::fmt;

use crate::guard::Guard;
use crate::internal::{Global, Local};
use crate::primitive::sync::Arc;

/// An epoch-based garbage collector.
pub struct Collector {
    pub(crate) global: Arc<Global>,
}

unsafe impl Send for Collector {}
unsafe impl Sync for Collector {}

impl Default for Collector {
    fn default() -> Self {
        Self {
            global: Arc::new(Global::new()),
        }
    }
}

impl Collector {
    /// Creates a new collector.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers a new handle for the current thread with this collector.
    pub fn register(&self) -> LocalHandle {
        Local::register(self)
    }
}
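
// Usage sketch (illustrative, not taken from the upstream sources): a single
// `Collector` is shared or cloned across threads, each thread registers a
// `LocalHandle`, and `pin()` yields a `Guard` used to defer destruction of
// retired objects. Only items defined in this crate are assumed here.
//
//     let collector = Collector::new();
//     let handle = collector.register();
//     {
//         let guard = handle.pin();
//         // read lock-free data and call `guard.defer_destroy(..)` as needed
//     }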

impl Clone for Collector {
    fn clone(&self) -> Self {
        Collector {
            global: self.global.clone(),
        }
    }
}

impl fmt::Debug for Collector {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Collector { .. }")
    }
}

impl PartialEq for Collector {
    /// Two collectors are equal if they share the same global epoch state.
    fn eq(&self, rhs: &Collector) -> bool {
        Arc::ptr_eq(&self.global, &rhs.global)
    }
}

impl Eq for Collector {}

/// A handle to a garbage collector.
pub struct LocalHandle {
    pub(crate) local: *const Local,
}

impl LocalHandle {
    /// Pins the handle, returning a `Guard` that keeps the thread pinned to the current epoch.
    #[inline]
    pub fn pin(&self) -> Guard {
        unsafe { (*self.local).pin() }
    }

    /// Returns `true` if the handle is currently pinned.
    #[inline]
    pub fn is_pinned(&self) -> bool {
        unsafe { (*self.local).is_pinned() }
    }

    /// Returns the `Collector` associated with this handle.
    #[inline]
    pub fn collector(&self) -> &Collector {
        unsafe { (*self.local).collector() }
    }
}
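
// Sketch of the reentrant pinning behavior exercised by `pin_reentrant` below
// (an illustration, assuming pinning is tracked by a per-thread guard counter
// in `Local`): nested `pin()` calls on the same handle are cheap, and the
// handle stays pinned until the outermost `Guard` is dropped.
//
//     let outer = handle.pin();
//     let inner = handle.pin();
//     drop(inner);
//     assert!(handle.is_pinned()); // still pinned by `outer`
//     drop(outer);
//     assert!(!handle.is_pinned());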

impl Drop for LocalHandle {
    #[inline]
    fn drop(&mut self) {
        unsafe {
            Local::release_handle(&*self.local);
        }
    }
}

impl fmt::Debug for LocalHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("LocalHandle { .. }")
    }
}

#[cfg(all(test, not(crossbeam_loom)))]
mod tests {
    use std::mem;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crossbeam_utils::thread;

    use crate::{Collector, Owned};

    const NUM_THREADS: usize = 8;

    #[test]
    fn pin_reentrant() {
        let collector = Collector::new();
        let handle = collector.register();
        drop(collector);

        assert!(!handle.is_pinned());
        {
            let _guard = &handle.pin();
            assert!(handle.is_pinned());
            {
                let _guard = &handle.pin();
                assert!(handle.is_pinned());
            }
            assert!(handle.is_pinned());
        }
        assert!(!handle.is_pinned());
    }

    #[test]
    fn flush_local_bag() {
        let collector = Collector::new();
        let handle = collector.register();
        drop(collector);

        for _ in 0..100 {
            let guard = &handle.pin();
            unsafe {
                let a = Owned::new(7).into_shared(guard);
                guard.defer_destroy(a);

                assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));

                while !(*guard.local).bag.with(|b| (*b).is_empty()) {
                    guard.flush();
                }
            }
        }
    }

    #[test]
    fn garbage_buffering() {
        let collector = Collector::new();
        let handle = collector.register();
        drop(collector);

        let guard = &handle.pin();
        unsafe {
            for _ in 0..10 {
                let a = Owned::new(7).into_shared(guard);
                guard.defer_destroy(a);
            }
            assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
        }
    }

    #[test]
    fn pin_holds_advance() {
        let collector = Collector::new();

        thread::scope(|scope| {
            for _ in 0..NUM_THREADS {
                scope.spawn(|_| {
                    let handle = collector.register();
                    for _ in 0..500_000 {
                        let guard = &handle.pin();

                        let before = collector.global.epoch.load(Ordering::Relaxed);
                        collector.global.collect(guard);
                        let after = collector.global.epoch.load(Ordering::Relaxed);

                        assert!(after.wrapping_sub(before) <= 2);
                    }
                });
            }
        })
        .unwrap();
    }

    #[cfg(not(crossbeam_sanitize))]
    #[test]
    fn incremental() {
        const COUNT: usize = 100_000;
        static DESTROYS: AtomicUsize = AtomicUsize::new(0);

        let collector = Collector::new();
        let handle = collector.register();

        unsafe {
            let guard = &handle.pin();
            for _ in 0..COUNT {
                let a = Owned::new(7i32).into_shared(guard);
                guard.defer_unchecked(move || {
                    drop(a.into_owned());
                    DESTROYS.fetch_add(1, Ordering::Relaxed);
                });
            }
            guard.flush();
        }

        let mut last = 0;

        while last < COUNT {
            let curr = DESTROYS.load(Ordering::Relaxed);
            assert!(curr - last <= 1024);
            last = curr;

            let guard = &handle.pin();
            collector.global.collect(guard);
        }
        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
    }

    #[test]
    fn buffering() {
        const COUNT: usize = 10;
        static DESTROYS: AtomicUsize = AtomicUsize::new(0);

        let collector = Collector::new();
        let handle = collector.register();

        unsafe {
            let guard = &handle.pin();
            for _ in 0..COUNT {
                let a = Owned::new(7i32).into_shared(guard);
                guard.defer_unchecked(move || {
                    drop(a.into_owned());
                    DESTROYS.fetch_add(1, Ordering::Relaxed);
                });
            }
        }

        for _ in 0..100_000 {
            collector.global.collect(&handle.pin());
        }
        assert!(DESTROYS.load(Ordering::Relaxed) < COUNT);

        handle.pin().flush();

        while DESTROYS.load(Ordering::Relaxed) < COUNT {
            let guard = &handle.pin();
            collector.global.collect(guard);
        }
        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
    }

    #[test]
    fn count_drops() {
        const COUNT: usize = 100_000;
        static DROPS: AtomicUsize = AtomicUsize::new(0);

        struct Elem(i32);

        impl Drop for Elem {
            fn drop(&mut self) {
                DROPS.fetch_add(1, Ordering::Relaxed);
            }
        }

        let collector = Collector::new();
        let handle = collector.register();

        unsafe {
            let guard = &handle.pin();

            for _ in 0..COUNT {
                let a = Owned::new(Elem(7i32)).into_shared(guard);
                guard.defer_destroy(a);
            }
            guard.flush();
        }

        while DROPS.load(Ordering::Relaxed) < COUNT {
            let guard = &handle.pin();
            collector.global.collect(guard);
        }
        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
    }

    #[test]
    fn count_destroy() {
        const COUNT: usize = 100_000;
        static DESTROYS: AtomicUsize = AtomicUsize::new(0);

        let collector = Collector::new();
        let handle = collector.register();

        unsafe {
            let guard = &handle.pin();

            for _ in 0..COUNT {
                let a = Owned::new(7i32).into_shared(guard);
                guard.defer_unchecked(move || {
                    drop(a.into_owned());
                    DESTROYS.fetch_add(1, Ordering::Relaxed);
                });
            }
            guard.flush();
        }

        while DESTROYS.load(Ordering::Relaxed) < COUNT {
            let guard = &handle.pin();
            collector.global.collect(guard);
        }
        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
    }

    #[test]
    fn drop_array() {
        const COUNT: usize = 700;
        static DROPS: AtomicUsize = AtomicUsize::new(0);

        struct Elem(i32);

        impl Drop for Elem {
            fn drop(&mut self) {
                DROPS.fetch_add(1, Ordering::Relaxed);
            }
        }

        let collector = Collector::new();
        let handle = collector.register();

        let mut guard = handle.pin();

        let mut v = Vec::with_capacity(COUNT);
        for i in 0..COUNT {
            v.push(Elem(i as i32));
        }

        {
            let a = Owned::new(v).into_shared(&guard);
            unsafe {
                guard.defer_destroy(a);
            }
            guard.flush();
        }

        while DROPS.load(Ordering::Relaxed) < COUNT {
            guard.repin();
            collector.global.collect(&guard);
        }
        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
    }

    #[test]
    fn destroy_array() {
        const COUNT: usize = 100_000;
        static DESTROYS: AtomicUsize = AtomicUsize::new(0);

        let collector = Collector::new();
        let handle = collector.register();

        unsafe {
            let guard = &handle.pin();

            let mut v = Vec::with_capacity(COUNT);
            for i in 0..COUNT {
                v.push(i as i32);
            }

            let ptr = v.as_mut_ptr() as usize;
            let len = v.len();
            guard.defer_unchecked(move || {
                drop(Vec::from_raw_parts(ptr as *const i32 as *mut i32, len, len));
                DESTROYS.fetch_add(len, Ordering::Relaxed);
            });
            guard.flush();

            mem::forget(v);
        }

        while DESTROYS.load(Ordering::Relaxed) < COUNT {
            let guard = &handle.pin();
            collector.global.collect(guard);
        }
        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
    }

    #[test]
    fn stress() {
        const THREADS: usize = 8;
        const COUNT: usize = 100_000;
        static DROPS: AtomicUsize = AtomicUsize::new(0);

        struct Elem(i32);

        impl Drop for Elem {
            fn drop(&mut self) {
                DROPS.fetch_add(1, Ordering::Relaxed);
            }
        }

        let collector = Collector::new();

        thread::scope(|scope| {
            for _ in 0..THREADS {
                scope.spawn(|_| {
                    let handle = collector.register();
                    for _ in 0..COUNT {
                        let guard = &handle.pin();
                        unsafe {
                            let a = Owned::new(Elem(7i32)).into_shared(guard);
                            guard.defer_destroy(a);
                        }
                    }
                });
            }
        })
        .unwrap();

        let handle = collector.register();
        while DROPS.load(Ordering::Relaxed) < COUNT * THREADS {
            let guard = &handle.pin();
            collector.global.collect(guard);
        }
        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS);
    }
}