crossbeam_channel/flavors/list.rs

//! Unbounded channel implemented as a linked list.

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;

// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
// following changes by @kleimkuhler:
//
// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101

// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;

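// For illustration: with `SHIFT == 1`, an index `i` decodes as
//   mark flag:             i & MARK_BIT
//   sequence number:       i >> SHIFT
//   offset within a block: (i >> SHIFT) % LAP
// For example, a tail index of 65 (0b100_0001) has the mark bit set (the
// channel is disconnected) and sequence number 32, i.e. offset 0 of the
// second lap.
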
/// A slot in a block.
struct Slot<T> {
    /// The message.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot.
    state: AtomicUsize,
}

impl<T> Slot<T> {
    /// Waits until a message is written into the slot.
    fn wait_write(&self) {
        let backoff = Backoff::new();
        while self.state.load(Ordering::Acquire) & WRITE == 0 {
            backoff.snooze();
        }
    }
}

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Slots for messages.
    slots: [Slot<T>; BLOCK_CAP],
}

impl<T> Block<T> {
    /// Creates an empty block.
    fn new() -> Block<T> {
        // SAFETY: This is safe because:
        //  [1] `Block::next` (AtomicPtr) may be safely zero initialized.
        //  [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
        //  [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
        //       holds a MaybeUninit.
        //  [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
        unsafe { MaybeUninit::zeroed().assume_init() }
    }

    /// Waits until the next pointer is set.
    fn wait_next(&self) -> *mut Block<T> {
        let backoff = Backoff::new();
        loop {
            let next = self.next.load(Ordering::Acquire);
            if !next.is_null() {
                return next;
            }
            backoff.snooze();
        }
    }

    /// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
    unsafe fn destroy(this: *mut Block<T>, start: usize) {
        // It is not necessary to set the `DESTROY` bit in the last slot because that slot has
        // begun destruction of the block.
        for i in start..BLOCK_CAP - 1 {
            let slot = (*this).slots.get_unchecked(i);

            // Mark the `DESTROY` bit if a thread is still using the slot.
            if slot.state.load(Ordering::Acquire) & READ == 0
                && slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
            {
                // If a thread is still using the slot, it will continue destruction of the block.
                return;
            }
        }

        // No thread is using the block, now it is safe to destroy it.
        drop(Box::from_raw(this));
    }
}

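// For example, how the `destroy` handoff plays out: suppose reader A finishes
// slot `BLOCK_CAP - 1` and calls `destroy(block, 0)` while reader B is still
// inside `read` on slot 3. A walks slots 0, 1 and 2 (all have `READ` set),
// then sets `DESTROY` on slot 3 and returns without freeing anything. When B
// later observes `DESTROY` in its `fetch_or(READ, ..)`, it calls
// `destroy(block, 4)` and completes the deallocation.
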
/// A position in a channel.
#[derive(Debug)]
struct Position<T> {
    /// The index in the channel.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<Block<T>>,
}

/// The token type for the list flavor.
#[derive(Debug)]
pub struct ListToken {
    /// The block of slots.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}

impl Default for ListToken {
    #[inline]
    fn default() -> Self {
        ListToken {
            block: ptr::null(),
            offset: 0,
        }
    }
}

/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub(crate) struct Channel<T> {
    /// The head of the channel.
    head: CachePadded<Position<T>>,

    /// The tail of the channel.
    tail: CachePadded<Position<T>>,

    /// Receivers waiting while the channel is empty and not disconnected.
    receivers: SyncWaker,

    /// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
    _marker: PhantomData<T>,
}

impl<T> Channel<T> {
    /// Creates a new unbounded channel.
    pub(crate) fn new() -> Self {
        Channel {
            head: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            tail: CachePadded::new(Position {
                block: AtomicPtr::new(ptr::null_mut()),
                index: AtomicUsize::new(0),
            }),
            receivers: SyncWaker::new(),
            _marker: PhantomData,
        }
    }

    /// Returns a receiver handle to the channel.
    pub(crate) fn receiver(&self) -> Receiver<'_, T> {
        Receiver(self)
    }

    /// Returns a sender handle to the channel.
    pub(crate) fn sender(&self) -> Sender<'_, T> {
        Sender(self)
    }

    /// Attempts to reserve a slot for sending a message.
    fn start_send(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        let mut block = self.tail.block.load(Ordering::Acquire);
        let mut next_block = None;

        loop {
            // Check if the channel is disconnected.
            if tail & MARK_BIT != 0 {
                token.list.block = ptr::null();
                return true;
            }

            // Calculate the offset of the index into the block.
            let offset = (tail >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                tail = self.tail.index.load(Ordering::Acquire);
                block = self.tail.block.load(Ordering::Acquire);
                continue;
            }

            // If we're going to have to install the next block, allocate it in advance in order to
            // make the wait for other threads as short as possible.
            if offset + 1 == BLOCK_CAP && next_block.is_none() {
                next_block = Some(Box::new(Block::<T>::new()));
            }

            // If this is the first message to be sent into the channel, we need to allocate the
            // first block and install it.
            if block.is_null() {
                let new = Box::into_raw(Box::new(Block::<T>::new()));

                if self
                    .tail
                    .block
                    .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
                    .is_ok()
                {
                    self.head.block.store(new, Ordering::Release);
                    block = new;
                } else {
                    next_block = unsafe { Some(Box::from_raw(new)) };
                    tail = self.tail.index.load(Ordering::Acquire);
                    block = self.tail.block.load(Ordering::Acquire);
                    continue;
                }
            }

            let new_tail = tail + (1 << SHIFT);

            // Try advancing the tail forward.
            match self.tail.index.compare_exchange_weak(
                tail,
                new_tail,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, install the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next_block = Box::into_raw(next_block.unwrap());
                        self.tail.block.store(next_block, Ordering::Release);
                        self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
                        (*block).next.store(next_block, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(t) => {
                    tail = t;
                    block = self.tail.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

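    // A concrete trace of `start_send`: the very first send finds `block` null,
    // installs a fresh block in both `tail.block` and `head.block`, then CASes
    // `tail.index` from 0 to `1 << SHIFT` and claims offset 0. The send that
    // claims offset `BLOCK_CAP - 1` pre-allocates the next block, installs it,
    // and bumps `tail.index` once more so the tail skips the lap's unused last
    // offset.
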
    /// Writes a message into the channel.
    pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
        // If there is no slot, the channel is disconnected.
        if token.list.block.is_null() {
            return Err(msg);
        }

        // Write the message into the slot.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.msg.get().write(MaybeUninit::new(msg));
        slot.state.fetch_or(WRITE, Ordering::Release);

        // Wake a sleeping receiver.
        self.receivers.notify();
        Ok(())
    }

    /// Attempts to reserve a slot for receiving a message.
    fn start_recv(&self, token: &mut Token) -> bool {
        let backoff = Backoff::new();
        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        loop {
            // Calculate the offset of the index into the block.
            let offset = (head >> SHIFT) % LAP;

            // If we reached the end of the block, wait until the next one is installed.
            if offset == BLOCK_CAP {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            let mut new_head = head + (1 << SHIFT);

            if new_head & MARK_BIT == 0 {
                atomic::fence(Ordering::SeqCst);
                let tail = self.tail.index.load(Ordering::Relaxed);

                // If the tail equals the head, that means the channel is empty.
                if head >> SHIFT == tail >> SHIFT {
                    // If the channel is disconnected...
                    if tail & MARK_BIT != 0 {
                        // ...then receive an error.
                        token.list.block = ptr::null();
                        return true;
                    } else {
                        // Otherwise, the receive operation is not ready.
                        return false;
                    }
                }

                // If head and tail are not in the same block, set `MARK_BIT` in head.
                if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
                    new_head |= MARK_BIT;
                }
            }

            // The block can be null here only if the first message is being sent into the channel.
            // In that case, just wait until it gets initialized.
            if block.is_null() {
                backoff.snooze();
                head = self.head.index.load(Ordering::Acquire);
                block = self.head.block.load(Ordering::Acquire);
                continue;
            }

            // Try moving the head index forward.
            match self.head.index.compare_exchange_weak(
                head,
                new_head,
                Ordering::SeqCst,
                Ordering::Acquire,
            ) {
                Ok(_) => unsafe {
                    // If we've reached the end of the block, move to the next one.
                    if offset + 1 == BLOCK_CAP {
                        let next = (*block).wait_next();
                        let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
                        if !(*next).next.load(Ordering::Relaxed).is_null() {
                            next_index |= MARK_BIT;
                        }

                        self.head.block.store(next, Ordering::Release);
                        self.head.index.store(next_index, Ordering::Release);
                    }

                    token.list.block = block as *const u8;
                    token.list.offset = offset;
                    return true;
                },
                Err(h) => {
                    head = h;
                    block = self.head.block.load(Ordering::Acquire);
                    backoff.spin();
                }
            }
        }
    }

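    // Note on the head mark bit: `MARK_BIT` set in `head` means the head block
    // is not the last one, so the channel cannot be empty at this index and the
    // receiver skips the `tail` load and the fence above. The bit is first set
    // once `head` and `tail` land in different laps, and is recomputed from the
    // next block's `next` pointer whenever the head crosses a block boundary.
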
    /// Reads a message from the channel.
    pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
        if token.list.block.is_null() {
            // The channel is disconnected.
            return Err(());
        }

        // Read the message.
        let block = token.list.block as *mut Block<T>;
        let offset = token.list.offset;
        let slot = (*block).slots.get_unchecked(offset);
        slot.wait_write();
        let msg = slot.msg.get().read().assume_init();

        // Destroy the block if we've reached the end, or if another thread wanted to destroy but
        // couldn't because we were busy reading from the slot.
        if offset + 1 == BLOCK_CAP {
            Block::destroy(block, 0);
        } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
            Block::destroy(block, offset + 1);
        }

        Ok(msg)
    }

    /// Attempts to send a message into the channel.
    pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
        self.send(msg, None).map_err(|err| match err {
            SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
            SendTimeoutError::Timeout(_) => unreachable!(),
        })
    }

    /// Sends a message into the channel.
    pub(crate) fn send(
        &self,
        msg: T,
        _deadline: Option<Instant>,
    ) -> Result<(), SendTimeoutError<T>> {
        let token = &mut Token::default();
        assert!(self.start_send(token));
        unsafe {
            self.write(token, msg)
                .map_err(SendTimeoutError::Disconnected)
        }
    }

    /// Attempts to receive a message without blocking.
    pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
        let token = &mut Token::default();

        if self.start_recv(token) {
            unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
        } else {
            Err(TryRecvError::Empty)
        }
    }

    /// Receives a message from the channel.
    pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
        let token = &mut Token::default();
        loop {
            // Try receiving a message several times.
            let backoff = Backoff::new();
            loop {
                if self.start_recv(token) {
                    unsafe {
                        return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
                    }
                }

                if backoff.is_completed() {
                    break;
                } else {
                    backoff.snooze();
                }
            }

            if let Some(d) = deadline {
                if Instant::now() >= d {
                    return Err(RecvTimeoutError::Timeout);
                }
            }

            // Prepare for blocking until a sender wakes us up.
            Context::with(|cx| {
                let oper = Operation::hook(token);
                self.receivers.register(oper, cx);

                // Has the channel become ready just now?
                if !self.is_empty() || self.is_disconnected() {
                    let _ = cx.try_select(Selected::Aborted);
                }

                // Block the current thread.
                let sel = cx.wait_until(deadline);

                match sel {
                    Selected::Waiting => unreachable!(),
                    Selected::Aborted | Selected::Disconnected => {
                        self.receivers.unregister(oper).unwrap();
                        // If the channel was disconnected, we still have to check for remaining
                        // messages.
                    }
                    Selected::Operation(_) => {}
                }
            });
        }
    }

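    // The register -> recheck -> wait sequence in `recv` avoids a lost wakeup:
    // a sender's `notify` can only be missed if it fires before `register`, in
    // which case the `!is_empty() || is_disconnected()` recheck observes the
    // message and aborts the wait instead of blocking.
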
    /// Returns the current number of messages inside the channel.
    pub(crate) fn len(&self) -> usize {
        loop {
            // Load the tail index, then load the head index.
            let mut tail = self.tail.index.load(Ordering::SeqCst);
            let mut head = self.head.index.load(Ordering::SeqCst);

            // If the tail index didn't change, we've got consistent indices to work with.
            if self.tail.index.load(Ordering::SeqCst) == tail {
                // Erase the lower bits.
                tail &= !((1 << SHIFT) - 1);
                head &= !((1 << SHIFT) - 1);

                // Fix up indices if they fall onto block ends.
                if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
                    tail = tail.wrapping_add(1 << SHIFT);
                }
                if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
                    head = head.wrapping_add(1 << SHIFT);
                }

                // Rotate indices so that head falls into the first block.
                let lap = (head >> SHIFT) / LAP;
                tail = tail.wrapping_sub((lap * LAP) << SHIFT);
                head = head.wrapping_sub((lap * LAP) << SHIFT);

                // Remove the lower bits.
                tail >>= SHIFT;
                head >>= SHIFT;

                // Return the difference minus the number of blocks between tail and head.
                return tail - head - tail / LAP;
            }
        }
    }

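    // Worked example for `len` (LAP = 32, BLOCK_CAP = 31, SHIFT = 1): after 40
    // sends and 5 receives, the sequence numbers are tail = 41 (offset 31 of
    // the first lap is skipped) and head = 5, so `len` returns
    // 41 - 5 - 41 / 32 = 35, exactly the number of queued messages. The
    // `tail / LAP` term subtracts the slot skipped at the end of each lap.
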
    /// Returns the capacity of the channel.
    pub(crate) fn capacity(&self) -> Option<usize> {
        None
    }

    /// Disconnects senders and wakes up all blocked receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_senders(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            self.receivers.disconnect();
            true
        } else {
            false
        }
    }

    /// Disconnects receivers.
    ///
    /// Returns `true` if this call disconnected the channel.
    pub(crate) fn disconnect_receivers(&self) -> bool {
        let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);

        if tail & MARK_BIT == 0 {
            // If receivers are dropped first, discard all messages to free
            // memory eagerly.
            self.discard_all_messages();
            true
        } else {
            false
        }
    }

    /// Discards all messages.
    ///
    /// This method should only be called when all receivers are dropped.
    fn discard_all_messages(&self) {
        let backoff = Backoff::new();
        let mut tail = self.tail.index.load(Ordering::Acquire);
        loop {
            let offset = (tail >> SHIFT) % LAP;
            if offset != BLOCK_CAP {
                break;
            }

            // `MARK_BIT` makes new sends abort, but a sender that has already claimed the
            // last slot of a block may still be installing the next one, which leaves the
            // tail index at the block boundary. Wait for that installation to take effect;
            // otherwise memory can leak.
            backoff.snooze();
            tail = self.tail.index.load(Ordering::Acquire);
        }

        let mut head = self.head.index.load(Ordering::Acquire);
        let mut block = self.head.block.load(Ordering::Acquire);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head >> SHIFT != tail >> SHIFT {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    slot.wait_write();
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    (*block).wait_next();
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Acquire);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
        head &= !MARK_BIT;
        self.head.block.store(ptr::null_mut(), Ordering::Release);
        self.head.index.store(head, Ordering::Release);
    }

    /// Returns `true` if the channel is disconnected.
    pub(crate) fn is_disconnected(&self) -> bool {
        self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
    }

    /// Returns `true` if the channel is empty.
    pub(crate) fn is_empty(&self) -> bool {
        let head = self.head.index.load(Ordering::SeqCst);
        let tail = self.tail.index.load(Ordering::SeqCst);
        head >> SHIFT == tail >> SHIFT
    }

    /// Returns `true` if the channel is full.
    pub(crate) fn is_full(&self) -> bool {
        false
    }
}

impl<T> Drop for Channel<T> {
    fn drop(&mut self) {
        let mut head = self.head.index.load(Ordering::Relaxed);
        let mut tail = self.tail.index.load(Ordering::Relaxed);
        let mut block = self.head.block.load(Ordering::Relaxed);

        // Erase the lower bits.
        head &= !((1 << SHIFT) - 1);
        tail &= !((1 << SHIFT) - 1);

        unsafe {
            // Drop all messages between head and tail and deallocate the heap-allocated blocks.
            while head != tail {
                let offset = (head >> SHIFT) % LAP;

                if offset < BLOCK_CAP {
                    // Drop the message in the slot.
                    let slot = (*block).slots.get_unchecked(offset);
                    let p = &mut *slot.msg.get();
                    p.as_mut_ptr().drop_in_place();
                } else {
                    // Deallocate the block and move to the next one.
                    let next = (*block).next.load(Ordering::Relaxed);
                    drop(Box::from_raw(block));
                    block = next;
                }

                head = head.wrapping_add(1 << SHIFT);
            }

            // Deallocate the last remaining block.
            if !block.is_null() {
                drop(Box::from_raw(block));
            }
        }
    }
}

/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);

/// Sender handle to a channel.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);

impl<T> SelectHandle for Receiver<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_recv(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.register(oper, cx);
        self.is_ready()
    }

    fn unregister(&self, oper: Operation) {
        self.0.receivers.unregister(oper);
    }

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        !self.0.is_empty() || self.0.is_disconnected()
    }

    fn watch(&self, oper: Operation, cx: &Context) -> bool {
        self.0.receivers.watch(oper, cx);
        self.is_ready()
    }

    fn unwatch(&self, oper: Operation) {
        self.0.receivers.unwatch(oper);
    }
}

impl<T> SelectHandle for Sender<'_, T> {
    fn try_select(&self, token: &mut Token) -> bool {
        self.0.start_send(token)
    }

    fn deadline(&self) -> Option<Instant> {
        None
    }

    fn register(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unregister(&self, _oper: Operation) {}

    fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
        self.try_select(token)
    }

    fn is_ready(&self) -> bool {
        true
    }

    fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
        self.is_ready()
    }

    fn unwatch(&self, _oper: Operation) {}
}