crossbeam_channel/flavors/
zero.rs

1//! Zero-capacity channel.
2//!
3//! This kind of channel is also known as *rendezvous* channel.
4
5use std::cell::UnsafeCell;
6use std::marker::PhantomData;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::time::Instant;
9
10use crossbeam_utils::Backoff;
11
12use crate::context::Context;
13use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
14use crate::select::{Operation, SelectHandle, Selected, Token};
15use crate::utils::Spinlock;
16use crate::waker::Waker;
17
18/// A pointer to a packet.
19pub(crate) type ZeroToken = usize;
20
21/// A slot for passing one message from a sender to a receiver.
22struct Packet<T> {
23    /// Equals `true` if the packet is allocated on the stack.
24    on_stack: bool,
25
26    /// Equals `true` once the packet is ready for reading or writing.
27    ready: AtomicBool,
28
29    /// The message.
30    msg: UnsafeCell<Option<T>>,
31}
32
33impl<T> Packet<T> {
34    /// Creates an empty packet on the stack.
35    fn empty_on_stack() -> Packet<T> {
36        Packet {
37            on_stack: true,
38            ready: AtomicBool::new(false),
39            msg: UnsafeCell::new(None),
40        }
41    }
42
43    /// Creates an empty packet on the heap.
44    fn empty_on_heap() -> Box<Packet<T>> {
45        Box::new(Packet {
46            on_stack: false,
47            ready: AtomicBool::new(false),
48            msg: UnsafeCell::new(None),
49        })
50    }
51
52    /// Creates a packet on the stack, containing a message.
53    fn message_on_stack(msg: T) -> Packet<T> {
54        Packet {
55            on_stack: true,
56            ready: AtomicBool::new(false),
57            msg: UnsafeCell::new(Some(msg)),
58        }
59    }
60
61    /// Waits until the packet becomes ready for reading or writing.
62    fn wait_ready(&self) {
63        let backoff = Backoff::new();
64        while !self.ready.load(Ordering::Acquire) {
65            backoff.snooze();
66        }
67    }
68}
69
70/// Inner representation of a zero-capacity channel.
71struct Inner {
72    /// Senders waiting to pair up with a receive operation.
73    senders: Waker,
74
75    /// Receivers waiting to pair up with a send operation.
76    receivers: Waker,
77
78    /// Equals `true` when the channel is disconnected.
79    is_disconnected: bool,
80}
81
82/// Zero-capacity channel.
83pub(crate) struct Channel<T> {
84    /// Inner representation of the channel.
85    inner: Spinlock<Inner>,
86
87    /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
88    _marker: PhantomData<T>,
89}
90
91impl<T> Channel<T> {
92    /// Constructs a new zero-capacity channel.
93    pub(crate) fn new() -> Self {
94        Channel {
95            inner: Spinlock::new(Inner {
96                senders: Waker::new(),
97                receivers: Waker::new(),
98                is_disconnected: false,
99            }),
100            _marker: PhantomData,
101        }
102    }
103
104    /// Returns a receiver handle to the channel.
105    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
106        Receiver(self)
107    }
108
109    /// Returns a sender handle to the channel.
110    pub(crate) fn sender(&self) -> Sender<'_, T> {
111        Sender(self)
112    }
113
114    /// Attempts to reserve a slot for sending a message.
115    fn start_send(&self, token: &mut Token) -> bool {
116        let mut inner = self.inner.lock();
117
118        // If there's a waiting receiver, pair up with it.
119        if let Some(operation) = inner.receivers.try_select() {
120            token.zero = operation.packet;
121            true
122        } else if inner.is_disconnected {
123            token.zero = 0;
124            true
125        } else {
126            false
127        }
128    }
129
130    /// Writes a message into the packet.
131    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
132        // If there is no packet, the channel is disconnected.
133        if token.zero == 0 {
134            return Err(msg);
135        }
136
137        let packet = &*(token.zero as *const Packet<T>);
138        packet.msg.get().write(Some(msg));
139        packet.ready.store(true, Ordering::Release);
140        Ok(())
141    }
142
143    /// Attempts to pair up with a sender.
144    fn start_recv(&self, token: &mut Token) -> bool {
145        let mut inner = self.inner.lock();
146
147        // If there's a waiting sender, pair up with it.
148        if let Some(operation) = inner.senders.try_select() {
149            token.zero = operation.packet;
150            true
151        } else if inner.is_disconnected {
152            token.zero = 0;
153            true
154        } else {
155            false
156        }
157    }
158
159    /// Reads a message from the packet.
160    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
161        // If there is no packet, the channel is disconnected.
162        if token.zero == 0 {
163            return Err(());
164        }
165
166        let packet = &*(token.zero as *const Packet<T>);
167
168        if packet.on_stack {
169            // The message has been in the packet from the beginning, so there is no need to wait
170            // for it. However, after reading the message, we need to set `ready` to `true` in
171            // order to signal that the packet can be destroyed.
172            let msg = packet.msg.get().replace(None).unwrap();
173            packet.ready.store(true, Ordering::Release);
174            Ok(msg)
175        } else {
176            // Wait until the message becomes available, then read it and destroy the
177            // heap-allocated packet.
178            packet.wait_ready();
179            let msg = packet.msg.get().replace(None).unwrap();
180            drop(Box::from_raw(packet as *const Packet<T> as *mut Packet<T>));
181            Ok(msg)
182        }
183    }
184
185    /// Attempts to send a message into the channel.
186    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
187        let token = &mut Token::default();
188        let mut inner = self.inner.lock();
189
190        // If there's a waiting receiver, pair up with it.
191        if let Some(operation) = inner.receivers.try_select() {
192            token.zero = operation.packet;
193            drop(inner);
194            unsafe {
195                self.write(token, msg).ok().unwrap();
196            }
197            Ok(())
198        } else if inner.is_disconnected {
199            Err(TrySendError::Disconnected(msg))
200        } else {
201            Err(TrySendError::Full(msg))
202        }
203    }
204
205    /// Sends a message into the channel.
206    pub(crate) fn send(
207        &self,
208        msg: T,
209        deadline: Option<Instant>,
210    ) -> Result<(), SendTimeoutError<T>> {
211        let token = &mut Token::default();
212        let mut inner = self.inner.lock();
213
214        // If there's a waiting receiver, pair up with it.
215        if let Some(operation) = inner.receivers.try_select() {
216            token.zero = operation.packet;
217            drop(inner);
218            unsafe {
219                self.write(token, msg).ok().unwrap();
220            }
221            return Ok(());
222        }
223
224        if inner.is_disconnected {
225            return Err(SendTimeoutError::Disconnected(msg));
226        }
227
228        Context::with(|cx| {
229            // Prepare for blocking until a receiver wakes us up.
230            let oper = Operation::hook(token);
231            let packet = Packet::<T>::message_on_stack(msg);
232            inner
233                .senders
234                .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
235            inner.receivers.notify();
236            drop(inner);
237
238            // Block the current thread.
239            let sel = cx.wait_until(deadline);
240
241            match sel {
242                Selected::Waiting => unreachable!(),
243                Selected::Aborted => {
244                    self.inner.lock().senders.unregister(oper).unwrap();
245                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
246                    Err(SendTimeoutError::Timeout(msg))
247                }
248                Selected::Disconnected => {
249                    self.inner.lock().senders.unregister(oper).unwrap();
250                    let msg = unsafe { packet.msg.get().replace(None).unwrap() };
251                    Err(SendTimeoutError::Disconnected(msg))
252                }
253                Selected::Operation(_) => {
254                    // Wait until the message is read, then drop the packet.
255                    packet.wait_ready();
256                    Ok(())
257                }
258            }
259        })
260    }
261
262    /// Attempts to receive a message without blocking.
263    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
264        let token = &mut Token::default();
265        let mut inner = self.inner.lock();
266
267        // If there's a waiting sender, pair up with it.
268        if let Some(operation) = inner.senders.try_select() {
269            token.zero = operation.packet;
270            drop(inner);
271            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
272        } else if inner.is_disconnected {
273            Err(TryRecvError::Disconnected)
274        } else {
275            Err(TryRecvError::Empty)
276        }
277    }
278
279    /// Receives a message from the channel.
280    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
281        let token = &mut Token::default();
282        let mut inner = self.inner.lock();
283
284        // If there's a waiting sender, pair up with it.
285        if let Some(operation) = inner.senders.try_select() {
286            token.zero = operation.packet;
287            drop(inner);
288            unsafe {
289                return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
290            }
291        }
292
293        if inner.is_disconnected {
294            return Err(RecvTimeoutError::Disconnected);
295        }
296
297        Context::with(|cx| {
298            // Prepare for blocking until a sender wakes us up.
299            let oper = Operation::hook(token);
300            let packet = Packet::<T>::empty_on_stack();
301            inner
302                .receivers
303                .register_with_packet(oper, &packet as *const Packet<T> as usize, cx);
304            inner.senders.notify();
305            drop(inner);
306
307            // Block the current thread.
308            let sel = cx.wait_until(deadline);
309
310            match sel {
311                Selected::Waiting => unreachable!(),
312                Selected::Aborted => {
313                    self.inner.lock().receivers.unregister(oper).unwrap();
314                    Err(RecvTimeoutError::Timeout)
315                }
316                Selected::Disconnected => {
317                    self.inner.lock().receivers.unregister(oper).unwrap();
318                    Err(RecvTimeoutError::Disconnected)
319                }
320                Selected::Operation(_) => {
321                    // Wait until the message is provided, then read it.
322                    packet.wait_ready();
323                    unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
324                }
325            }
326        })
327    }
328
329    /// Disconnects the channel and wakes up all blocked senders and receivers.
330    ///
331    /// Returns `true` if this call disconnected the channel.
332    pub(crate) fn disconnect(&self) -> bool {
333        let mut inner = self.inner.lock();
334
335        if !inner.is_disconnected {
336            inner.is_disconnected = true;
337            inner.senders.disconnect();
338            inner.receivers.disconnect();
339            true
340        } else {
341            false
342        }
343    }
344
345    /// Returns the current number of messages inside the channel.
346    pub(crate) fn len(&self) -> usize {
347        0
348    }
349
350    /// Returns the capacity of the channel.
351    #[allow(clippy::unnecessary_wraps)] // This is intentional.
352    pub(crate) fn capacity(&self) -> Option<usize> {
353        Some(0)
354    }
355
356    /// Returns `true` if the channel is empty.
357    pub(crate) fn is_empty(&self) -> bool {
358        true
359    }
360
361    /// Returns `true` if the channel is full.
362    pub(crate) fn is_full(&self) -> bool {
363        true
364    }
365}
366
367/// Receiver handle to a channel.
368pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
369
370/// Sender handle to a channel.
371pub(crate) struct Sender<'a, T>(&'a Channel<T>);
372
373impl<T> SelectHandle for Receiver<'_, T> {
374    fn try_select(&self, token: &mut Token) -> bool {
375        self.0.start_recv(token)
376    }
377
378    fn deadline(&self) -> Option<Instant> {
379        None
380    }
381
382    fn register(&self, oper: Operation, cx: &Context) -> bool {
383        let packet = Box::into_raw(Packet::<T>::empty_on_heap());
384
385        let mut inner = self.0.inner.lock();
386        inner
387            .receivers
388            .register_with_packet(oper, packet as usize, cx);
389        inner.senders.notify();
390        inner.senders.can_select() || inner.is_disconnected
391    }
392
393    fn unregister(&self, oper: Operation) {
394        if let Some(operation) = self.0.inner.lock().receivers.unregister(oper) {
395            unsafe {
396                drop(Box::from_raw(operation.packet as *mut Packet<T>));
397            }
398        }
399    }
400
401    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
402        token.zero = cx.wait_packet();
403        true
404    }
405
406    fn is_ready(&self) -> bool {
407        let inner = self.0.inner.lock();
408        inner.senders.can_select() || inner.is_disconnected
409    }
410
411    fn watch(&self, oper: Operation, cx: &Context) -> bool {
412        let mut inner = self.0.inner.lock();
413        inner.receivers.watch(oper, cx);
414        inner.senders.can_select() || inner.is_disconnected
415    }
416
417    fn unwatch(&self, oper: Operation) {
418        let mut inner = self.0.inner.lock();
419        inner.receivers.unwatch(oper);
420    }
421}
422
423impl<T> SelectHandle for Sender<'_, T> {
424    fn try_select(&self, token: &mut Token) -> bool {
425        self.0.start_send(token)
426    }
427
428    fn deadline(&self) -> Option<Instant> {
429        None
430    }
431
432    fn register(&self, oper: Operation, cx: &Context) -> bool {
433        let packet = Box::into_raw(Packet::<T>::empty_on_heap());
434
435        let mut inner = self.0.inner.lock();
436        inner
437            .senders
438            .register_with_packet(oper, packet as usize, cx);
439        inner.receivers.notify();
440        inner.receivers.can_select() || inner.is_disconnected
441    }
442
443    fn unregister(&self, oper: Operation) {
444        if let Some(operation) = self.0.inner.lock().senders.unregister(oper) {
445            unsafe {
446                drop(Box::from_raw(operation.packet as *mut Packet<T>));
447            }
448        }
449    }
450
451    fn accept(&self, token: &mut Token, cx: &Context) -> bool {
452        token.zero = cx.wait_packet();
453        true
454    }
455
456    fn is_ready(&self) -> bool {
457        let inner = self.0.inner.lock();
458        inner.receivers.can_select() || inner.is_disconnected
459    }
460
461    fn watch(&self, oper: Operation, cx: &Context) -> bool {
462        let mut inner = self.0.inner.lock();
463        inner.senders.watch(oper, cx);
464        inner.receivers.can_select() || inner.is_disconnected
465    }
466
467    fn unwatch(&self, oper: Operation) {
468        let mut inner = self.0.inner.lock();
469        inner.senders.unwatch(oper);
470    }
471}