// crossbeam_channel/waker.rs
1use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread::{self, ThreadId};
5
6use crate::context::Context;
7use crate::select::{Operation, Selected};
8use crate::utils::Spinlock;
9
10pub(crate) struct Entry {
12 pub(crate) oper: Operation,
14
15 pub(crate) packet: usize,
17
18 pub(crate) cx: Context,
20}
21
22pub(crate) struct Waker {
27 selectors: Vec<Entry>,
29
30 observers: Vec<Entry>,
32}
33
34impl Waker {
35 #[inline]
37 pub(crate) fn new() -> Self {
38 Waker {
39 selectors: Vec::new(),
40 observers: Vec::new(),
41 }
42 }
43
44 #[inline]
46 pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
47 self.register_with_packet(oper, 0, cx);
48 }
49
50 #[inline]
52 pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
53 self.selectors.push(Entry {
54 oper,
55 packet,
56 cx: cx.clone(),
57 });
58 }
59
60 #[inline]
62 pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
63 if let Some((i, _)) = self
64 .selectors
65 .iter()
66 .enumerate()
67 .find(|&(_, entry)| entry.oper == oper)
68 {
69 let entry = self.selectors.remove(i);
70 Some(entry)
71 } else {
72 None
73 }
74 }
75
76 #[inline]
78 pub(crate) fn try_select(&mut self) -> Option<Entry> {
79 let mut entry = None;
80
81 if !self.selectors.is_empty() {
82 let thread_id = current_thread_id();
83
84 for i in 0..self.selectors.len() {
85 if self.selectors[i].cx.thread_id() != thread_id {
87 let sel = Selected::Operation(self.selectors[i].oper);
89 let res = self.selectors[i].cx.try_select(sel);
90
91 if res.is_ok() {
92 self.selectors[i].cx.store_packet(self.selectors[i].packet);
94 self.selectors[i].cx.unpark();
96
97 entry = Some(self.selectors.remove(i));
100 break;
101 }
102 }
103 }
104 }
105
106 entry
107 }
108
109 #[inline]
111 pub(crate) fn can_select(&self) -> bool {
112 if self.selectors.is_empty() {
113 false
114 } else {
115 let thread_id = current_thread_id();
116
117 self.selectors.iter().any(|entry| {
118 entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
119 })
120 }
121 }
122
123 #[inline]
125 pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
126 self.observers.push(Entry {
127 oper,
128 packet: 0,
129 cx: cx.clone(),
130 });
131 }
132
133 #[inline]
135 pub(crate) fn unwatch(&mut self, oper: Operation) {
136 self.observers.retain(|e| e.oper != oper);
137 }
138
139 #[inline]
141 pub(crate) fn notify(&mut self) {
142 for entry in self.observers.drain(..) {
143 if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144 entry.cx.unpark();
145 }
146 }
147 }
148
149 #[inline]
151 pub(crate) fn disconnect(&mut self) {
152 for entry in self.selectors.iter() {
153 if entry.cx.try_select(Selected::Disconnected).is_ok() {
154 entry.cx.unpark();
160 }
161 }
162
163 self.notify();
164 }
165}
166
167impl Drop for Waker {
168 #[inline]
169 fn drop(&mut self) {
170 debug_assert_eq!(self.selectors.len(), 0);
171 debug_assert_eq!(self.observers.len(), 0);
172 }
173}
174
175pub(crate) struct SyncWaker {
179 inner: Spinlock<Waker>,
181
182 is_empty: AtomicBool,
184}
185
186impl SyncWaker {
187 #[inline]
189 pub(crate) fn new() -> Self {
190 SyncWaker {
191 inner: Spinlock::new(Waker::new()),
192 is_empty: AtomicBool::new(true),
193 }
194 }
195
196 #[inline]
198 pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199 let mut inner = self.inner.lock();
200 inner.register(oper, cx);
201 self.is_empty.store(
202 inner.selectors.is_empty() && inner.observers.is_empty(),
203 Ordering::SeqCst,
204 );
205 }
206
207 #[inline]
209 pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210 let mut inner = self.inner.lock();
211 let entry = inner.unregister(oper);
212 self.is_empty.store(
213 inner.selectors.is_empty() && inner.observers.is_empty(),
214 Ordering::SeqCst,
215 );
216 entry
217 }
218
219 #[inline]
221 pub(crate) fn notify(&self) {
222 if !self.is_empty.load(Ordering::SeqCst) {
223 let mut inner = self.inner.lock();
224 if !self.is_empty.load(Ordering::SeqCst) {
225 inner.try_select();
226 inner.notify();
227 self.is_empty.store(
228 inner.selectors.is_empty() && inner.observers.is_empty(),
229 Ordering::SeqCst,
230 );
231 }
232 }
233 }
234
235 #[inline]
237 pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
238 let mut inner = self.inner.lock();
239 inner.watch(oper, cx);
240 self.is_empty.store(
241 inner.selectors.is_empty() && inner.observers.is_empty(),
242 Ordering::SeqCst,
243 );
244 }
245
246 #[inline]
248 pub(crate) fn unwatch(&self, oper: Operation) {
249 let mut inner = self.inner.lock();
250 inner.unwatch(oper);
251 self.is_empty.store(
252 inner.selectors.is_empty() && inner.observers.is_empty(),
253 Ordering::SeqCst,
254 );
255 }
256
257 #[inline]
259 pub(crate) fn disconnect(&self) {
260 let mut inner = self.inner.lock();
261 inner.disconnect();
262 self.is_empty.store(
263 inner.selectors.is_empty() && inner.observers.is_empty(),
264 Ordering::SeqCst,
265 );
266 }
267}
268
269impl Drop for SyncWaker {
270 #[inline]
271 fn drop(&mut self) {
272 debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273 }
274}
275
/// Returns the `ThreadId` of the calling thread.
///
/// The id is kept in a thread-local so that repeated calls read the stored value instead of
/// going through `thread::current()` each time.
#[inline]
fn current_thread_id() -> ThreadId {
    thread_local! {
        /// Id of the owning thread, computed on first access.
        static CACHED_ID: ThreadId = thread::current().id();
    }

    match CACHED_ID.try_with(|cached| *cached) {
        Ok(id) => id,
        // The thread-local could not be accessed; ask the runtime directly instead.
        Err(_) => thread::current().id(),
    }
}