crossbeam_channel/
counter.rs

1//! Reference counter for channels.
2
3use std::isize;
4use std::ops;
5use std::process;
6use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
7
8/// Reference counter internals.
9struct Counter<C> {
10    /// The number of senders associated with the channel.
11    senders: AtomicUsize,
12
13    /// The number of receivers associated with the channel.
14    receivers: AtomicUsize,
15
16    /// Set to `true` if the last sender or the last receiver reference deallocates the channel.
17    destroy: AtomicBool,
18
19    /// The internal channel.
20    chan: C,
21}
22
23/// Wraps a channel into the reference counter.
24pub(crate) fn new<C>(chan: C) -> (Sender<C>, Receiver<C>) {
25    let counter = Box::into_raw(Box::new(Counter {
26        senders: AtomicUsize::new(1),
27        receivers: AtomicUsize::new(1),
28        destroy: AtomicBool::new(false),
29        chan,
30    }));
31    let s = Sender { counter };
32    let r = Receiver { counter };
33    (s, r)
34}
35
36/// The sending side.
37pub(crate) struct Sender<C> {
38    counter: *mut Counter<C>,
39}
40
41impl<C> Sender<C> {
42    /// Returns the internal `Counter`.
43    fn counter(&self) -> &Counter<C> {
44        unsafe { &*self.counter }
45    }
46
47    /// Acquires another sender reference.
48    pub(crate) fn acquire(&self) -> Sender<C> {
49        let count = self.counter().senders.fetch_add(1, Ordering::Relaxed);
50
51        // Cloning senders and calling `mem::forget` on the clones could potentially overflow the
52        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
53        // just abort when the count becomes very large.
54        if count > isize::MAX as usize {
55            process::abort();
56        }
57
58        Sender {
59            counter: self.counter,
60        }
61    }
62
63    /// Releases the sender reference.
64    ///
65    /// Function `disconnect` will be called if this is the last sender reference.
66    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
67        if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 {
68            disconnect(&self.counter().chan);
69
70            if self.counter().destroy.swap(true, Ordering::AcqRel) {
71                drop(Box::from_raw(self.counter));
72            }
73        }
74    }
75}
76
77impl<C> ops::Deref for Sender<C> {
78    type Target = C;
79
80    fn deref(&self) -> &C {
81        &self.counter().chan
82    }
83}
84
85impl<C> PartialEq for Sender<C> {
86    fn eq(&self, other: &Sender<C>) -> bool {
87        self.counter == other.counter
88    }
89}
90
91/// The receiving side.
92pub(crate) struct Receiver<C> {
93    counter: *mut Counter<C>,
94}
95
96impl<C> Receiver<C> {
97    /// Returns the internal `Counter`.
98    fn counter(&self) -> &Counter<C> {
99        unsafe { &*self.counter }
100    }
101
102    /// Acquires another receiver reference.
103    pub(crate) fn acquire(&self) -> Receiver<C> {
104        let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed);
105
106        // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the
107        // counter. It's very difficult to recover sensibly from such degenerate scenarios so we
108        // just abort when the count becomes very large.
109        if count > isize::MAX as usize {
110            process::abort();
111        }
112
113        Receiver {
114            counter: self.counter,
115        }
116    }
117
118    /// Releases the receiver reference.
119    ///
120    /// Function `disconnect` will be called if this is the last receiver reference.
121    pub(crate) unsafe fn release<F: FnOnce(&C) -> bool>(&self, disconnect: F) {
122        if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 {
123            disconnect(&self.counter().chan);
124
125            if self.counter().destroy.swap(true, Ordering::AcqRel) {
126                drop(Box::from_raw(self.counter));
127            }
128        }
129    }
130}
131
132impl<C> ops::Deref for Receiver<C> {
133    type Target = C;
134
135    fn deref(&self) -> &C {
136        &self.counter().chan
137    }
138}
139
140impl<C> PartialEq for Receiver<C> {
141    fn eq(&self, other: &Receiver<C>) -> bool {
142        self.counter == other.counter
143    }
144}