crossbeam_epoch/
collector.rs

1/// Epoch-based garbage collector.
2///
3/// # Examples
4///
5/// ```
6/// use crossbeam_epoch::Collector;
7///
8/// let collector = Collector::new();
9///
10/// let handle = collector.register();
11/// drop(collector); // `handle` still works after dropping `collector`
12///
13/// handle.pin().flush();
14/// ```
15use core::fmt;
16
17use crate::guard::Guard;
18use crate::internal::{Global, Local};
19use crate::primitive::sync::Arc;
20
21/// An epoch-based garbage collector.
22pub struct Collector {
23    pub(crate) global: Arc<Global>,
24}
25
26unsafe impl Send for Collector {}
27unsafe impl Sync for Collector {}
28
29impl Default for Collector {
30    fn default() -> Self {
31        Self {
32            global: Arc::new(Global::new()),
33        }
34    }
35}
36
37impl Collector {
38    /// Creates a new collector.
39    pub fn new() -> Self {
40        Self::default()
41    }
42
43    /// Registers a new handle for the collector.
44    pub fn register(&self) -> LocalHandle {
45        Local::register(self)
46    }
47}
48
49impl Clone for Collector {
50    /// Creates another reference to the same garbage collector.
51    fn clone(&self) -> Self {
52        Collector {
53            global: self.global.clone(),
54        }
55    }
56}
57
58impl fmt::Debug for Collector {
59    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
60        f.pad("Collector { .. }")
61    }
62}
63
64impl PartialEq for Collector {
65    /// Checks if both handles point to the same collector.
66    fn eq(&self, rhs: &Collector) -> bool {
67        Arc::ptr_eq(&self.global, &rhs.global)
68    }
69}
70impl Eq for Collector {}
71
72/// A handle to a garbage collector.
73pub struct LocalHandle {
74    pub(crate) local: *const Local,
75}
76
77impl LocalHandle {
78    /// Pins the handle.
79    #[inline]
80    pub fn pin(&self) -> Guard {
81        unsafe { (*self.local).pin() }
82    }
83
84    /// Returns `true` if the handle is pinned.
85    #[inline]
86    pub fn is_pinned(&self) -> bool {
87        unsafe { (*self.local).is_pinned() }
88    }
89
90    /// Returns the `Collector` associated with this handle.
91    #[inline]
92    pub fn collector(&self) -> &Collector {
93        unsafe { (*self.local).collector() }
94    }
95}
96
97impl Drop for LocalHandle {
98    #[inline]
99    fn drop(&mut self) {
100        unsafe {
101            Local::release_handle(&*self.local);
102        }
103    }
104}
105
106impl fmt::Debug for LocalHandle {
107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108        f.pad("LocalHandle { .. }")
109    }
110}
111
112#[cfg(all(test, not(crossbeam_loom)))]
113mod tests {
114    use std::mem;
115    use std::sync::atomic::{AtomicUsize, Ordering};
116
117    use crossbeam_utils::thread;
118
119    use crate::{Collector, Owned};
120
121    const NUM_THREADS: usize = 8;
122
123    #[test]
124    fn pin_reentrant() {
125        let collector = Collector::new();
126        let handle = collector.register();
127        drop(collector);
128
129        assert!(!handle.is_pinned());
130        {
131            let _guard = &handle.pin();
132            assert!(handle.is_pinned());
133            {
134                let _guard = &handle.pin();
135                assert!(handle.is_pinned());
136            }
137            assert!(handle.is_pinned());
138        }
139        assert!(!handle.is_pinned());
140    }
141
142    #[test]
143    fn flush_local_bag() {
144        let collector = Collector::new();
145        let handle = collector.register();
146        drop(collector);
147
148        for _ in 0..100 {
149            let guard = &handle.pin();
150            unsafe {
151                let a = Owned::new(7).into_shared(guard);
152                guard.defer_destroy(a);
153
154                assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
155
156                while !(*guard.local).bag.with(|b| (*b).is_empty()) {
157                    guard.flush();
158                }
159            }
160        }
161    }
162
163    #[test]
164    fn garbage_buffering() {
165        let collector = Collector::new();
166        let handle = collector.register();
167        drop(collector);
168
169        let guard = &handle.pin();
170        unsafe {
171            for _ in 0..10 {
172                let a = Owned::new(7).into_shared(guard);
173                guard.defer_destroy(a);
174            }
175            assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
176        }
177    }
178
179    #[test]
180    fn pin_holds_advance() {
181        let collector = Collector::new();
182
183        thread::scope(|scope| {
184            for _ in 0..NUM_THREADS {
185                scope.spawn(|_| {
186                    let handle = collector.register();
187                    for _ in 0..500_000 {
188                        let guard = &handle.pin();
189
190                        let before = collector.global.epoch.load(Ordering::Relaxed);
191                        collector.global.collect(guard);
192                        let after = collector.global.epoch.load(Ordering::Relaxed);
193
194                        assert!(after.wrapping_sub(before) <= 2);
195                    }
196                });
197            }
198        })
199        .unwrap();
200    }
201
202    #[cfg(not(crossbeam_sanitize))] // TODO: assertions failed due to `cfg(crossbeam_sanitize)` reduce `internal::MAX_OBJECTS`
203    #[test]
204    fn incremental() {
205        const COUNT: usize = 100_000;
206        static DESTROYS: AtomicUsize = AtomicUsize::new(0);
207
208        let collector = Collector::new();
209        let handle = collector.register();
210
211        unsafe {
212            let guard = &handle.pin();
213            for _ in 0..COUNT {
214                let a = Owned::new(7i32).into_shared(guard);
215                guard.defer_unchecked(move || {
216                    drop(a.into_owned());
217                    DESTROYS.fetch_add(1, Ordering::Relaxed);
218                });
219            }
220            guard.flush();
221        }
222
223        let mut last = 0;
224
225        while last < COUNT {
226            let curr = DESTROYS.load(Ordering::Relaxed);
227            assert!(curr - last <= 1024);
228            last = curr;
229
230            let guard = &handle.pin();
231            collector.global.collect(guard);
232        }
233        assert!(DESTROYS.load(Ordering::Relaxed) == 100_000);
234    }
235
236    #[test]
237    fn buffering() {
238        const COUNT: usize = 10;
239        static DESTROYS: AtomicUsize = AtomicUsize::new(0);
240
241        let collector = Collector::new();
242        let handle = collector.register();
243
244        unsafe {
245            let guard = &handle.pin();
246            for _ in 0..COUNT {
247                let a = Owned::new(7i32).into_shared(guard);
248                guard.defer_unchecked(move || {
249                    drop(a.into_owned());
250                    DESTROYS.fetch_add(1, Ordering::Relaxed);
251                });
252            }
253        }
254
255        for _ in 0..100_000 {
256            collector.global.collect(&handle.pin());
257        }
258        assert!(DESTROYS.load(Ordering::Relaxed) < COUNT);
259
260        handle.pin().flush();
261
262        while DESTROYS.load(Ordering::Relaxed) < COUNT {
263            let guard = &handle.pin();
264            collector.global.collect(guard);
265        }
266        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
267    }
268
269    #[test]
270    fn count_drops() {
271        const COUNT: usize = 100_000;
272        static DROPS: AtomicUsize = AtomicUsize::new(0);
273
274        struct Elem(i32);
275
276        impl Drop for Elem {
277            fn drop(&mut self) {
278                DROPS.fetch_add(1, Ordering::Relaxed);
279            }
280        }
281
282        let collector = Collector::new();
283        let handle = collector.register();
284
285        unsafe {
286            let guard = &handle.pin();
287
288            for _ in 0..COUNT {
289                let a = Owned::new(Elem(7i32)).into_shared(guard);
290                guard.defer_destroy(a);
291            }
292            guard.flush();
293        }
294
295        while DROPS.load(Ordering::Relaxed) < COUNT {
296            let guard = &handle.pin();
297            collector.global.collect(guard);
298        }
299        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
300    }
301
302    #[test]
303    fn count_destroy() {
304        const COUNT: usize = 100_000;
305        static DESTROYS: AtomicUsize = AtomicUsize::new(0);
306
307        let collector = Collector::new();
308        let handle = collector.register();
309
310        unsafe {
311            let guard = &handle.pin();
312
313            for _ in 0..COUNT {
314                let a = Owned::new(7i32).into_shared(guard);
315                guard.defer_unchecked(move || {
316                    drop(a.into_owned());
317                    DESTROYS.fetch_add(1, Ordering::Relaxed);
318                });
319            }
320            guard.flush();
321        }
322
323        while DESTROYS.load(Ordering::Relaxed) < COUNT {
324            let guard = &handle.pin();
325            collector.global.collect(guard);
326        }
327        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
328    }
329
330    #[test]
331    fn drop_array() {
332        const COUNT: usize = 700;
333        static DROPS: AtomicUsize = AtomicUsize::new(0);
334
335        struct Elem(i32);
336
337        impl Drop for Elem {
338            fn drop(&mut self) {
339                DROPS.fetch_add(1, Ordering::Relaxed);
340            }
341        }
342
343        let collector = Collector::new();
344        let handle = collector.register();
345
346        let mut guard = handle.pin();
347
348        let mut v = Vec::with_capacity(COUNT);
349        for i in 0..COUNT {
350            v.push(Elem(i as i32));
351        }
352
353        {
354            let a = Owned::new(v).into_shared(&guard);
355            unsafe {
356                guard.defer_destroy(a);
357            }
358            guard.flush();
359        }
360
361        while DROPS.load(Ordering::Relaxed) < COUNT {
362            guard.repin();
363            collector.global.collect(&guard);
364        }
365        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT);
366    }
367
368    #[test]
369    fn destroy_array() {
370        const COUNT: usize = 100_000;
371        static DESTROYS: AtomicUsize = AtomicUsize::new(0);
372
373        let collector = Collector::new();
374        let handle = collector.register();
375
376        unsafe {
377            let guard = &handle.pin();
378
379            let mut v = Vec::with_capacity(COUNT);
380            for i in 0..COUNT {
381                v.push(i as i32);
382            }
383
384            let ptr = v.as_mut_ptr() as usize;
385            let len = v.len();
386            guard.defer_unchecked(move || {
387                drop(Vec::from_raw_parts(ptr as *const i32 as *mut i32, len, len));
388                DESTROYS.fetch_add(len, Ordering::Relaxed);
389            });
390            guard.flush();
391
392            mem::forget(v);
393        }
394
395        while DESTROYS.load(Ordering::Relaxed) < COUNT {
396            let guard = &handle.pin();
397            collector.global.collect(guard);
398        }
399        assert_eq!(DESTROYS.load(Ordering::Relaxed), COUNT);
400    }
401
402    #[test]
403    fn stress() {
404        const THREADS: usize = 8;
405        const COUNT: usize = 100_000;
406        static DROPS: AtomicUsize = AtomicUsize::new(0);
407
408        struct Elem(i32);
409
410        impl Drop for Elem {
411            fn drop(&mut self) {
412                DROPS.fetch_add(1, Ordering::Relaxed);
413            }
414        }
415
416        let collector = Collector::new();
417
418        thread::scope(|scope| {
419            for _ in 0..THREADS {
420                scope.spawn(|_| {
421                    let handle = collector.register();
422                    for _ in 0..COUNT {
423                        let guard = &handle.pin();
424                        unsafe {
425                            let a = Owned::new(Elem(7i32)).into_shared(guard);
426                            guard.defer_destroy(a);
427                        }
428                    }
429                });
430            }
431        })
432        .unwrap();
433
434        let handle = collector.register();
435        while DROPS.load(Ordering::Relaxed) < COUNT * THREADS {
436            let guard = &handle.pin();
437            collector.global.collect(guard);
438        }
439        assert_eq!(DROPS.load(Ordering::Relaxed), COUNT * THREADS);
440    }
441}