//! Source path: `crossbeam_channel/flavors/zero.rs`.
1use std::cell::UnsafeCell;
6use std::marker::PhantomData;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::time::Instant;
9
10use crossbeam_utils::Backoff;
11
12use crate::context::Context;
13use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
14use crate::select::{Operation, SelectHandle, Selected, Token};
15use crate::utils::Spinlock;
16use crate::waker::Waker;
17
/// Token payload used by the zero-capacity flavor.
///
/// Holds the address of a `Packet<T>` as an integer. The value `0` means that no packet is
/// attached, which `Channel::read` and `Channel::write` report as an error.
pub(crate) type ZeroToken = usize;
20
/// A slot through which exactly one message is handed from a sender to a receiver.
struct Packet<T> {
    /// `true` for packets built by `empty_on_stack` / `message_on_stack`, `false` for packets
    /// built by `empty_on_heap` (those are boxed and must be freed explicitly).
    on_stack: bool,

    /// Flag used to signal the other side of the exchange; polled by `wait_ready`.
    ready: AtomicBool,

    /// The message being transferred, or `None` when the slot is empty.
    msg: UnsafeCell<Option<T>>,
}
32
33impl<T> Packet<T> {
34 fn empty_on_stack() -> Packet<T> {
36 Packet {
37 on_stack: true,
38 ready: AtomicBool::new(false),
39 msg: UnsafeCell::new(None),
40 }
41 }
42
43 fn empty_on_heap() -> Box<Packet<T>> {
45 Box::new(Packet {
46 on_stack: false,
47 ready: AtomicBool::new(false),
48 msg: UnsafeCell::new(None),
49 })
50 }
51
52 fn message_on_stack(msg: T) -> Packet<T> {
54 Packet {
55 on_stack: true,
56 ready: AtomicBool::new(false),
57 msg: UnsafeCell::new(Some(msg)),
58 }
59 }
60
61 fn wait_ready(&self) {
63 let backoff = Backoff::new();
64 while !self.ready.load(Ordering::Acquire) {
65 backoff.snooze();
66 }
67 }
68}
69
70struct Inner {
72 senders: Waker,
74
75 receivers: Waker,
77
78 is_disconnected: bool,
80}
81
82pub(crate) struct Channel<T> {
84 inner: Spinlock<Inner>,
86
87 _marker: PhantomData<T>,
89}
90
91impl<T> Channel<T> {
92 pub(crate) fn new() -> Self {
94 Channel {
95 inner: Spinlock::new(Inner {
96 senders: Waker::new(),
97 receivers: Waker::new(),
98 is_disconnected: false,
99 }),
100 _marker: PhantomData,
101 }
102 }
103
104 pub(crate) fn receiver(&self) -> Receiver<'_, T> {
106 Receiver(self)
107 }
108
109 pub(crate) fn sender(&self) -> Sender<'_, T> {
111 Sender(self)
112 }
113
114 fn start_send(&self, token: &mut Token) -> bool {
116 let mut inner = self.inner.lock();
117
118 if let Some(operation) = inner.receivers.try_select() {
120 token.zero = operation.packet;
121 true
122 } else if inner.is_disconnected {
123 token.zero = 0;
124 true
125 } else {
126 false
127 }
128 }
129
130 pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
132 if token.zero == 0 {
134 return Err(msg);
135 }
136
137 let packet = &*(token.zero as *const Packet<T>);
138 packet.msg.get().write(Some(msg));
139 packet.ready.store(true, Ordering::Release);
140 Ok(())
141 }
142
143 fn start_recv(&self, token: &mut Token) -> bool {
145 let mut inner = self.inner.lock();
146
147 if let Some(operation) = inner.senders.try_select() {
149 token.zero = operation.packet;
150 true
151 } else if inner.is_disconnected {
152 token.zero = 0;
153 true
154 } else {
155 false
156 }
157 }
158
159 pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
161 if token.zero == 0 {
163 return Err(());
164 }
165
166 let packet = &*(token.zero as *const Packet<T>);
167
168 if packet.on_stack {
169 let msg = packet.msg.get().replace(None).unwrap();
173 packet.ready.store(true, Ordering::Release);
174 Ok(msg)
175 } else {
176 packet.wait_ready();
179 let msg = packet.msg.get().replace(None).unwrap();
180 drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>));
181 Ok(msg)
182 }
183 }
184
185 pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
187 let token = &mut Token::default();
188 let mut inner = self.inner.lock();
189
190 if let Some(operation) = inner.receivers.try_select() {
192 token.zero = operation.packet;
193 drop(inner);
194 unsafe {
195 self.write(token, msg).ok().unwrap();
196 }
197 Ok(())
198 } else if inner.is_disconnected {
199 Err(TrySendError::Disconnected(msg))
200 } else {
201 Err(TrySendError::Full(msg))
202 }
203 }
204
205 pub(crate) fn send(
207 &self,
208 msg: T,
209 deadline: Option<Instant>,
210 ) -> Result<(), SendTimeoutError<T>> {
211 let token = &mut Token::default();
212 let mut inner = self.inner.lock();
213
214 if let Some(operation) = inner.receivers.try_select() {
216 token.zero = operation.packet;
217 drop(inner);
218 unsafe {
219 self.write(token, msg).ok().unwrap();
220 }
221 return Ok(());
222 }
223
224 if inner.is_disconnected {
225 return Err(SendTimeoutError::Disconnected(msg));
226 }
227
228 Context::with(|cx| {
229 let oper = Operation::hook(token);
231 let packet = Packet::<T>::message_on_stack(msg);
232 inner
233 .senders
234 .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
235 inner.receivers.notify();
236 drop(inner);
237
238 let sel = cx.wait_until(deadline);
240
241 match sel {
242 Selected::Waiting => unreachable!(),
243 Selected::Aborted => {
244 self.inner.lock().senders.unregister(oper).unwrap();
245 let msg = unsafe { packet.msg.get().replace(None).unwrap() };
246 Err(SendTimeoutError::Timeout(msg))
247 }
248 Selected::Disconnected => {
249 self.inner.lock().senders.unregister(oper).unwrap();
250 let msg = unsafe { packet.msg.get().replace(None).unwrap() };
251 Err(SendTimeoutError::Disconnected(msg))
252 }
253 Selected::Operation(_) => {
254 packet.wait_ready();
256 Ok(())
257 }
258 }
259 })
260 }
261
262 pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
264 let token = &mut Token::default();
265 let mut inner = self.inner.lock();
266
267 if let Some(operation) = inner.senders.try_select() {
269 token.zero = operation.packet;
270 drop(inner);
271 unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
272 } else if inner.is_disconnected {
273 Err(TryRecvError::Disconnected)
274 } else {
275 Err(TryRecvError::Empty)
276 }
277 }
278
279 pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
281 let token = &mut Token::default();
282 let mut inner = self.inner.lock();
283
284 if let Some(operation) = inner.senders.try_select() {
286 token.zero = operation.packet;
287 drop(inner);
288 unsafe {
289 return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
290 }
291 }
292
293 if inner.is_disconnected {
294 return Err(RecvTimeoutError::Disconnected);
295 }
296
297 Context::with(|cx| {
298 let oper = Operation::hook(token);
300 let packet = Packet::<T>::empty_on_stack();
301 inner
302 .receivers
303 .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
304 inner.senders.notify();
305 drop(inner);
306
307 let sel = cx.wait_until(deadline);
309
310 match sel {
311 Selected::Waiting => unreachable!(),
312 Selected::Aborted => {
313 self.inner.lock().receivers.unregister(oper).unwrap();
314 Err(RecvTimeoutError::Timeout)
315 }
316 Selected::Disconnected => {
317 self.inner.lock().receivers.unregister(oper).unwrap();
318 Err(RecvTimeoutError::Disconnected)
319 }
320 Selected::Operation(_) => {
321 packet.wait_ready();
323 unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
324 }
325 }
326 })
327 }
328
329 pub(crate) fn disconnect(&self) -> bool {
333 let mut inner = self.inner.lock();
334
335 if !inner.is_disconnected {
336 inner.is_disconnected = true;
337 inner.senders.disconnect();
338 inner.receivers.disconnect();
339 true
340 } else {
341 false
342 }
343 }
344
345 pub(crate) fn len(&self) -> usize {
347 0
348 }
349
350 #[allow(clippy::unnecessary_wraps)] pub(crate) fn capacity(&self) -> Option<usize> {
353 Some(0)
354 }
355
356 pub(crate) fn is_empty(&self) -> bool {
358 true
359 }
360
361 pub(crate) fn is_full(&self) -> bool {
363 true
364 }
365}
366
367pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
369
370pub(crate) struct Sender<'a, T>(&'a Channel<T>);
372
373impl<T> SelectHandle for Receiver<'_, T> {
374 fn try_select(&self, token: &mut Token) -> bool {
375 self.0.start_recv(token)
376 }
377
378 fn deadline(&self) -> Option<Instant> {
379 None
380 }
381
382 fn register(&self, oper: Operation, cx: &Context) -> bool {
383 let packet = Box::into_raw(Packet::<T>::empty_on_heap());
384
385 let mut inner = self.0.inner.lock();
386 inner
387 .receivers
388 .register_with_packet(oper, packet as usize, cx);
389 inner.senders.notify();
390 inner.senders.can_select() || inner.is_disconnected
391 }
392
393 fn unregister(&self, oper: Operation) {
394 if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) {
395 unsafe {
396 drop(Box::from_raw(operation.packet as *mut Packet<T>));
397 }
398 }
399 }
400
401 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
402 token.zero = cx.wait_packet();
403 true
404 }
405
406 fn is_ready(&self) -> bool {
407 let inner = self.0.inner.lock();
408 inner.senders.can_select() || inner.is_disconnected
409 }
410
411 fn watch(&self, oper: Operation, cx: &Context) -> bool {
412 let mut inner = self.0.inner.lock();
413 inner.receivers.watch(oper, cx);
414 inner.senders.can_select() || inner.is_disconnected
415 }
416
417 fn unwatch(&self, oper: Operation) {
418 let mut inner = self.0.inner.lock();
419 inner.receivers.unwatch(oper);
420 }
421}
422
423impl<T> SelectHandle for Sender<'_, T> {
424 fn try_select(&self, token: &mut Token) -> bool {
425 self.0.start_send(token)
426 }
427
428 fn deadline(&self) -> Option<Instant> {
429 None
430 }
431
432 fn register(&self, oper: Operation, cx: &Context) -> bool {
433 let packet = Box::into_raw(Packet::<T>::empty_on_heap());
434
435 let mut inner = self.0.inner.lock();
436 inner
437 .senders
438 .register_with_packet(oper, packet as usize, cx);
439 inner.receivers.notify();
440 inner.receivers.can_select() || inner.is_disconnected
441 }
442
443 fn unregister(&self, oper: Operation) {
444 if let Some(operation) = self.0.inner.lock().senders.unregister(oper) {
445 unsafe {
446 drop(Box::from_raw(operation.packet as *mut Packet<T>));
447 }
448 }
449 }
450
451 fn accept(&self, token: &mut Token, cx: &Context) -> bool {
452 token.zero = cx.wait_packet();
453 true
454 }
455
456 fn is_ready(&self) -> bool {
457 let inner = self.0.inner.lock();
458 inner.receivers.can_select() || inner.is_disconnected
459 }
460
461 fn watch(&self, oper: Operation, cx: &Context) -> bool {
462 let mut inner = self.0.inner.lock();
463 inner.senders.watch(oper, cx);
464 inner.receivers.can_select() || inner.is_disconnected
465 }
466
467 fn unwatch(&self, oper: Operation) {
468 let mut inner = self.0.inner.lock();
469 inner.senders.unwatch(oper);
470 }
471}