crossbeam_channel/flavors/
array.rs

1//! Bounded channel based on a preallocated array.
2//!
3//! This flavor has a fixed, positive capacity.
4//!
5//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
6//!
7//! Source:
8//!   - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
9//!   - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
10
11use std::cell::UnsafeCell;
12use std::marker::PhantomData;
13use std::mem::{self, MaybeUninit};
14use std::ptr;
15use std::sync::atomic::{self, AtomicUsize, Ordering};
16use std::time::Instant;
17
18use crossbeam_utils::{Backoff, CachePadded};
19
20use crate::context::Context;
21use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
22use crate::select::{Operation, SelectHandle, Selected, Token};
23use crate::waker::SyncWaker;
24
25/// A slot in a channel.
26struct Slot<T> {
27    /// The current stamp.
28    stamp: AtomicUsize,
29
30    /// The message in this slot.
31    msg: UnsafeCell<MaybeUninit<T>>,
32}
33
34/// The token type for the array flavor.
35#[derive(Debug)]
36pub struct ArrayToken {
37    /// Slot to read from or write to.
38    slot: *const u8,
39
40    /// Stamp to store into the slot after reading or writing.
41    stamp: usize,
42}
43
44impl Default for ArrayToken {
45    #[inline]
46    fn default() -> Self {
47        ArrayToken {
48            slot: ptr::null(),
49            stamp: 0,
50        }
51    }
52}
53
54/// Bounded channel based on a preallocated array.
55pub(crate) struct Channel<T> {
56    /// The head of the channel.
57    ///
58    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
59    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
60    /// represent the lap. The mark bit in the head is always zero.
61    ///
62    /// Messages are popped from the head of the channel.
63    head: CachePadded<AtomicUsize>,
64
65    /// The tail of the channel.
66    ///
67    /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
68    /// packed into a single `usize`. The lower bits represent the index, while the upper bits
69    /// represent the lap. The mark bit indicates that the channel is disconnected.
70    ///
71    /// Messages are pushed into the tail of the channel.
72    tail: CachePadded<AtomicUsize>,
73
74    /// The buffer holding slots.
75    buffer: *mut Slot<T>,
76
77    /// The channel capacity.
78    cap: usize,
79
80    /// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
81    one_lap: usize,
82
83    /// If this bit is set in the tail, that means the channel is disconnected.
84    mark_bit: usize,
85
86    /// Senders waiting while the channel is full.
87    senders: SyncWaker,
88
89    /// Receivers waiting while the channel is empty and not disconnected.
90    receivers: SyncWaker,
91
92    /// Indicates that dropping a `Channel<T>` may drop values of type `T`.
93    _marker: PhantomData<T>,
94}
95
96impl<T> Channel<T> {
97    /// Creates a bounded channel of capacity `cap`.
98    pub(crate) fn with_capacity(cap: usize) -> Self {
99        assert!(cap > 0, "capacity must be positive");
100
101        // Compute constants `mark_bit` and `one_lap`.
102        let mark_bit = (cap + 1).next_power_of_two();
103        let one_lap = mark_bit * 2;
104
105        // Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
106        let head = 0;
107        // Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
108        let tail = 0;
109
110        // Allocate a buffer of `cap` slots initialized
111        // with stamps.
112        let buffer = {
113            let mut boxed: Box<[Slot<T>]> = (0..cap)
114                .map(|i| {
115                    // Set the stamp to `{ lap: 0, mark: 0, index: i }`.
116                    Slot {
117                        stamp: AtomicUsize::new(i),
118                        msg: UnsafeCell::new(MaybeUninit::uninit()),
119                    }
120                })
121                .collect();
122            let ptr = boxed.as_mut_ptr();
123            mem::forget(boxed);
124            ptr
125        };
126
127        Channel {
128            buffer,
129            cap,
130            one_lap,
131            mark_bit,
132            head: CachePadded::new(AtomicUsize::new(head)),
133            tail: CachePadded::new(AtomicUsize::new(tail)),
134            senders: SyncWaker::new(),
135            receivers: SyncWaker::new(),
136            _marker: PhantomData,
137        }
138    }
139
140    /// Returns a receiver handle to the channel.
141    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
142        Receiver(self)
143    }
144
145    /// Returns a sender handle to the channel.
146    pub(crate) fn sender(&self) -> Sender<'_, T> {
147        Sender(self)
148    }
149
150    /// Attempts to reserve a slot for sending a message.
151    fn start_send(&self, token: &mut Token) -> bool {
152        let backoff = Backoff::new();
153        let mut tail = self.tail.load(Ordering::Relaxed);
154
155        loop {
156            // Check if the channel is disconnected.
157            if tail & self.mark_bit != 0 {
158                token.array.slot = ptr::null();
159                token.array.stamp = 0;
160                return true;
161            }
162
163            // Deconstruct the tail.
164            let index = tail & (self.mark_bit - 1);
165            let lap = tail & !(self.one_lap - 1);
166
167            // Inspect the corresponding slot.
168            let slot = unsafe { &*self.buffer.add(index) };
169            let stamp = slot.stamp.load(Ordering::Acquire);
170
171            // If the tail and the stamp match, we may attempt to push.
172            if tail == stamp {
173                let new_tail = if index + 1 < self.cap {
174                    // Same lap, incremented index.
175                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
176                    tail + 1
177                } else {
178                    // One lap forward, index wraps around to zero.
179                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
180                    lap.wrapping_add(self.one_lap)
181                };
182
183                // Try moving the tail.
184                match self.tail.compare_exchange_weak(
185                    tail,
186                    new_tail,
187                    Ordering::SeqCst,
188                    Ordering::Relaxed,
189                ) {
190                    Ok(_) => {
191                        // Prepare the token for the follow-up call to `write`.
192                        token.array.slot = slot as *const Slot<T> as *const u8;
193                        token.array.stamp = tail + 1;
194                        return true;
195                    }
196                    Err(t) => {
197                        tail = t;
198                        backoff.spin();
199                    }
200                }
201            } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
202                atomic::fence(Ordering::SeqCst);
203                let head = self.head.load(Ordering::Relaxed);
204
205                // If the head lags one lap behind the tail as well...
206                if head.wrapping_add(self.one_lap) == tail {
207                    // ...then the channel is full.
208                    return false;
209                }
210
211                backoff.spin();
212                tail = self.tail.load(Ordering::Relaxed);
213            } else {
214                // Snooze because we need to wait for the stamp to get updated.
215                backoff.snooze();
216                tail = self.tail.load(Ordering::Relaxed);
217            }
218        }
219    }
220
221    /// Writes a message into the channel.
222    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
223        // If there is no slot, the channel is disconnected.
224        if token.array.slot.is_null() {
225            return Err(msg);
226        }
227
228        let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
229
230        // Write the message into the slot and update the stamp.
231        slot.msg.get().write(MaybeUninit::new(msg));
232        slot.stamp.store(token.array.stamp, Ordering::Release);
233
234        // Wake a sleeping receiver.
235        self.receivers.notify();
236        Ok(())
237    }
238
239    /// Attempts to reserve a slot for receiving a message.
240    fn start_recv(&self, token: &mut Token) -> bool {
241        let backoff = Backoff::new();
242        let mut head = self.head.load(Ordering::Relaxed);
243
244        loop {
245            // Deconstruct the head.
246            let index = head & (self.mark_bit - 1);
247            let lap = head & !(self.one_lap - 1);
248
249            // Inspect the corresponding slot.
250            let slot = unsafe { &*self.buffer.add(index) };
251            let stamp = slot.stamp.load(Ordering::Acquire);
252
253            // If the the stamp is ahead of the head by 1, we may attempt to pop.
254            if head + 1 == stamp {
255                let new = if index + 1 < self.cap {
256                    // Same lap, incremented index.
257                    // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
258                    head + 1
259                } else {
260                    // One lap forward, index wraps around to zero.
261                    // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
262                    lap.wrapping_add(self.one_lap)
263                };
264
265                // Try moving the head.
266                match self.head.compare_exchange_weak(
267                    head,
268                    new,
269                    Ordering::SeqCst,
270                    Ordering::Relaxed,
271                ) {
272                    Ok(_) => {
273                        // Prepare the token for the follow-up call to `read`.
274                        token.array.slot = slot as *const Slot<T> as *const u8;
275                        token.array.stamp = head.wrapping_add(self.one_lap);
276                        return true;
277                    }
278                    Err(h) => {
279                        head = h;
280                        backoff.spin();
281                    }
282                }
283            } else if stamp == head {
284                atomic::fence(Ordering::SeqCst);
285                let tail = self.tail.load(Ordering::Relaxed);
286
287                // If the tail equals the head, that means the channel is empty.
288                if (tail & !self.mark_bit) == head {
289                    // If the channel is disconnected...
290                    if tail & self.mark_bit != 0 {
291                        // ...then receive an error.
292                        token.array.slot = ptr::null();
293                        token.array.stamp = 0;
294                        return true;
295                    } else {
296                        // Otherwise, the receive operation is not ready.
297                        return false;
298                    }
299                }
300
301                backoff.spin();
302                head = self.head.load(Ordering::Relaxed);
303            } else {
304                // Snooze because we need to wait for the stamp to get updated.
305                backoff.snooze();
306                head = self.head.load(Ordering::Relaxed);
307            }
308        }
309    }
310
311    /// Reads a message from the channel.
312    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
313        if token.array.slot.is_null() {
314            // The channel is disconnected.
315            return Err(());
316        }
317
318        let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
319
320        // Read the message from the slot and update the stamp.
321        let msg = slot.msg.get().read().assume_init();
322        slot.stamp.store(token.array.stamp, Ordering::Release);
323
324        // Wake a sleeping sender.
325        self.senders.notify();
326        Ok(msg)
327    }
328
329    /// Attempts to send a message into the channel.
330    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
331        let token = &mut Token::default();
332        if self.start_send(token) {
333            unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
334        } else {
335            Err(TrySendError::Full(msg))
336        }
337    }
338
339    /// Sends a message into the channel.
340    pub(crate) fn send(
341        &self,
342        msg: T,
343        deadline: Option<Instant>,
344    ) -> Result<(), SendTimeoutError<T>> {
345        let token = &mut Token::default();
346        loop {
347            // Try sending a message several times.
348            let backoff = Backoff::new();
349            loop {
350                if self.start_send(token) {
351                    let res = unsafe { self.write(token, msg) };
352                    return res.map_err(SendTimeoutError::Disconnected);
353                }
354
355                if backoff.is_completed() {
356                    break;
357                } else {
358                    backoff.snooze();
359                }
360            }
361
362            if let Some(d) = deadline {
363                if Instant::now() >= d {
364                    return Err(SendTimeoutError::Timeout(msg));
365                }
366            }
367
368            Context::with(|cx| {
369                // Prepare for blocking until a receiver wakes us up.
370                let oper = Operation::hook(token);
371                self.senders.register(oper, cx);
372
373                // Has the channel become ready just now?
374                if !self.is_full() || self.is_disconnected() {
375                    let _ = cx.try_select(Selected::Aborted);
376                }
377
378                // Block the current thread.
379                let sel = cx.wait_until(deadline);
380
381                match sel {
382                    Selected::Waiting => unreachable!(),
383                    Selected::Aborted | Selected::Disconnected => {
384                        self.senders.unregister(oper).unwrap();
385                    }
386                    Selected::Operation(_) => {}
387                }
388            });
389        }
390    }
391
392    /// Attempts to receive a message without blocking.
393    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
394        let token = &mut Token::default();
395
396        if self.start_recv(token) {
397            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
398        } else {
399            Err(TryRecvError::Empty)
400        }
401    }
402
403    /// Receives a message from the channel.
404    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
405        let token = &mut Token::default();
406        loop {
407            // Try receiving a message several times.
408            let backoff = Backoff::new();
409            loop {
410                if self.start_recv(token) {
411                    let res = unsafe { self.read(token) };
412                    return res.map_err(|_| RecvTimeoutError::Disconnected);
413                }
414
415                if backoff.is_completed() {
416                    break;
417                } else {
418                    backoff.snooze();
419                }
420            }
421
422            if let Some(d) = deadline {
423                if Instant::now() >= d {
424                    return Err(RecvTimeoutError::Timeout);
425                }
426            }
427
428            Context::with(|cx| {
429                // Prepare for blocking until a sender wakes us up.
430                let oper = Operation::hook(token);
431                self.receivers.register(oper, cx);
432
433                // Has the channel become ready just now?
434                if !self.is_empty() || self.is_disconnected() {
435                    let _ = cx.try_select(Selected::Aborted);
436                }
437
438                // Block the current thread.
439                let sel = cx.wait_until(deadline);
440
441                match sel {
442                    Selected::Waiting => unreachable!(),
443                    Selected::Aborted | Selected::Disconnected => {
444                        self.receivers.unregister(oper).unwrap();
445                        // If the channel was disconnected, we still have to check for remaining
446                        // messages.
447                    }
448                    Selected::Operation(_) => {}
449                }
450            });
451        }
452    }
453
454    /// Returns the current number of messages inside the channel.
455    pub(crate) fn len(&self) -> usize {
456        loop {
457            // Load the tail, then load the head.
458            let tail = self.tail.load(Ordering::SeqCst);
459            let head = self.head.load(Ordering::SeqCst);
460
461            // If the tail didn't change, we've got consistent values to work with.
462            if self.tail.load(Ordering::SeqCst) == tail {
463                let hix = head & (self.mark_bit - 1);
464                let tix = tail & (self.mark_bit - 1);
465
466                return if hix < tix {
467                    tix - hix
468                } else if hix > tix {
469                    self.cap - hix + tix
470                } else if (tail & !self.mark_bit) == head {
471                    0
472                } else {
473                    self.cap
474                };
475            }
476        }
477    }
478
479    /// Returns the capacity of the channel.
480    #[allow(clippy::unnecessary_wraps)] // This is intentional.
481    pub(crate) fn capacity(&self) -> Option<usize> {
482        Some(self.cap)
483    }
484
485    /// Disconnects the channel and wakes up all blocked senders and receivers.
486    ///
487    /// Returns `true` if this call disconnected the channel.
488    pub(crate) fn disconnect(&self) -> bool {
489        let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
490
491        if tail & self.mark_bit == 0 {
492            self.senders.disconnect();
493            self.receivers.disconnect();
494            true
495        } else {
496            false
497        }
498    }
499
500    /// Returns `true` if the channel is disconnected.
501    pub(crate) fn is_disconnected(&self) -> bool {
502        self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
503    }
504
505    /// Returns `true` if the channel is empty.
506    pub(crate) fn is_empty(&self) -> bool {
507        let head = self.head.load(Ordering::SeqCst);
508        let tail = self.tail.load(Ordering::SeqCst);
509
510        // Is the tail equal to the head?
511        //
512        // Note: If the head changes just before we load the tail, that means there was a moment
513        // when the channel was not empty, so it is safe to just return `false`.
514        (tail & !self.mark_bit) == head
515    }
516
517    /// Returns `true` if the channel is full.
518    pub(crate) fn is_full(&self) -> bool {
519        let tail = self.tail.load(Ordering::SeqCst);
520        let head = self.head.load(Ordering::SeqCst);
521
522        // Is the head lagging one lap behind tail?
523        //
524        // Note: If the tail changes just before we load the head, that means there was a moment
525        // when the channel was not full, so it is safe to just return `false`.
526        head.wrapping_add(self.one_lap) == tail & !self.mark_bit
527    }
528}
529
530impl<T> Drop for Channel<T> {
531    fn drop(&mut self) {
532        // Get the index of the head.
533        let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
534
535        // Loop over all slots that hold a message and drop them.
536        for i in 0..self.len() {
537            // Compute the index of the next slot holding a message.
538            let index = if hix + i < self.cap {
539                hix + i
540            } else {
541                hix + i - self.cap
542            };
543
544            unsafe {
545                let p = {
546                    let slot = &mut *self.buffer.add(index);
547                    let msg = &mut *slot.msg.get();
548                    msg.as_mut_ptr()
549                };
550                p.drop_in_place();
551            }
552        }
553
554        // Finally, deallocate the buffer, but don't run any destructors.
555        unsafe {
556            // Create a slice from the buffer to make
557            // a fat pointer. Then, use Box::from_raw
558            // to deallocate it.
559            let ptr = std::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>];
560            Box::from_raw(ptr);
561        }
562    }
563}
564
565/// Receiver handle to a channel.
566pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
567
568/// Sender handle to a channel.
569pub(crate) struct Sender<'a, T>(&'a Channel<T>);
570
571impl<T> SelectHandle for Receiver<'_, T> {
572    fn try_select(&self, token: &mut Token) -> bool {
573        self.0.start_recv(token)
574    }
575
576    fn deadline(&self) -> Option<Instant> {
577        None
578    }
579
580    fn register(&self, oper: Operation, cx: &Context) -> bool {
581        self.0.receivers.register(oper, cx);
582        self.is_ready()
583    }
584
585    fn unregister(&self, oper: Operation) {
586        self.0.receivers.unregister(oper);
587    }
588
589    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
590        self.try_select(token)
591    }
592
593    fn is_ready(&self) -> bool {
594        !self.0.is_empty() || self.0.is_disconnected()
595    }
596
597    fn watch(&self, oper: Operation, cx: &Context) -> bool {
598        self.0.receivers.watch(oper, cx);
599        self.is_ready()
600    }
601
602    fn unwatch(&self, oper: Operation) {
603        self.0.receivers.unwatch(oper);
604    }
605}
606
607impl<T> SelectHandle for Sender<'_, T> {
608    fn try_select(&self, token: &mut Token) -> bool {
609        self.0.start_send(token)
610    }
611
612    fn deadline(&self) -> Option<Instant> {
613        None
614    }
615
616    fn register(&self, oper: Operation, cx: &Context) -> bool {
617        self.0.senders.register(oper, cx);
618        self.is_ready()
619    }
620
621    fn unregister(&self, oper: Operation) {
622        self.0.senders.unregister(oper);
623    }
624
625    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
626        self.try_select(token)
627    }
628
629    fn is_ready(&self) -> bool {
630        !self.0.is_full() || self.0.is_disconnected()
631    }
632
633    fn watch(&self, oper: Operation, cx: &Context) -> bool {
634        self.0.senders.watch(oper, cx);
635        self.is_ready()
636    }
637
638    fn unwatch(&self, oper: Operation) {
639        self.0.senders.unwatch(oper);
640    }
641}