//! Unbounded channel implemented as a linked list.
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;

use crossbeam_utils::{Backoff, CachePadded};

use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;
// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
// following changes by @kleimkuhler:
//
// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101
// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;

// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold (one index per lap is
// sacrificed as the "end of block" marker).
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
/// A slot in a block.
struct Slot<T> {
    /// The message; uninitialized until the `WRITE` bit is set in `state`.
    msg: UnsafeCell<MaybeUninit<T>>,

    /// The state of the slot (a combination of `WRITE`, `READ`, and `DESTROY` bits).
    state: AtomicUsize,
}
5051impl<T> Slot<T> {
52/// Waits until a message is written into the slot.
53fn wait_write(&self) {
54let backoff = Backoff::new();
55while self.state.load(Ordering::Acquire) & WRITE == 0 {
56 backoff.snooze();
57 }
58 }
59}
6061/// A block in a linked list.
62///
63/// Each block in the list can hold up to `BLOCK_CAP` messages.
64struct Block<T> {
65/// The next block in the linked list.
66next: AtomicPtr<Block<T>>,
6768/// Slots for messages.
69slots: [Slot<T>; BLOCK_CAP],
70}
7172impl<T> Block<T> {
73/// Creates an empty block.
74fn new() -> Block<T> {
75// SAFETY: This is safe because:
76 // [1] `Block::next` (AtomicPtr) may be safely zero initialized.
77 // [2] `Block::slots` (Array) may be safely zero initialized because of [3, 4].
78 // [3] `Slot::msg` (UnsafeCell) may be safely zero initialized because it
79 // holds a MaybeUninit.
80 // [4] `Slot::state` (AtomicUsize) may be safely zero initialized.
81unsafe { MaybeUninit::zeroed().assume_init() }
82 }
8384/// Waits until the next pointer is set.
85fn wait_next(&self) -> *mut Block<T> {
86let backoff = Backoff::new();
87loop {
88let next = self.next.load(Ordering::Acquire);
89if !next.is_null() {
90return next;
91 }
92 backoff.snooze();
93 }
94 }
9596/// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
97unsafe fn destroy(this: *mut Block<T>, start: usize) {
98// It is not necessary to set the `DESTROY` bit in the last slot because that slot has
99 // begun destruction of the block.
100for i in start..BLOCK_CAP - 1 {
101let slot = (*this).slots.get_unchecked(i);
102103// Mark the `DESTROY` bit if a thread is still using the slot.
104if slot.state.load(Ordering::Acquire) & READ == 0
105&& slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
106{
107// If a thread is still using the slot, it will continue destruction of the block.
108return;
109 }
110 }
111112// No thread is using the block, now it is safe to destroy it.
113drop(Box::from_raw(this));
114 }
115}
116117/// A position in a channel.
118#[derive(Debug)]
119struct Position<T> {
120/// The index in the channel.
121index: AtomicUsize,
122123/// The block in the linked list.
124block: AtomicPtr<Block<T>>,
125}
/// The token type for the list flavor.
#[derive(Debug)]
pub struct ListToken {
    /// The block of slots; null signals a disconnected channel.
    block: *const u8,

    /// The offset into the block.
    offset: usize,
}
136137impl Default for ListToken {
138#[inline]
139fn default() -> Self {
140 ListToken {
141 block: ptr::null(),
142 offset: 0,
143 }
144 }
145}
146147/// Unbounded channel implemented as a linked list.
148///
149/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
150/// represented as numbers of type `usize` and wrap on overflow.
151///
152/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
153/// improve cache efficiency.
154pub(crate) struct Channel<T> {
155/// The head of the channel.
156head: CachePadded<Position<T>>,
157158/// The tail of the channel.
159tail: CachePadded<Position<T>>,
160161/// Receivers waiting while the channel is empty and not disconnected.
162receivers: SyncWaker,
163164/// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
165_marker: PhantomData<T>,
166}
167168impl<T> Channel<T> {
169/// Creates a new unbounded channel.
170pub(crate) fn new() -> Self {
171 Channel {
172 head: CachePadded::new(Position {
173 block: AtomicPtr::new(ptr::null_mut()),
174 index: AtomicUsize::new(0),
175 }),
176 tail: CachePadded::new(Position {
177 block: AtomicPtr::new(ptr::null_mut()),
178 index: AtomicUsize::new(0),
179 }),
180 receivers: SyncWaker::new(),
181 _marker: PhantomData,
182 }
183 }
184185/// Returns a receiver handle to the channel.
186pub(crate) fn receiver(&self) -> Receiver<'_, T> {
187 Receiver(self)
188 }
189190/// Returns a sender handle to the channel.
191pub(crate) fn sender(&self) -> Sender<'_, T> {
192 Sender(self)
193 }
194195/// Attempts to reserve a slot for sending a message.
196fn start_send(&self, token: &mut Token) -> bool {
197let backoff = Backoff::new();
198let mut tail = self.tail.index.load(Ordering::Acquire);
199let mut block = self.tail.block.load(Ordering::Acquire);
200let mut next_block = None;
201202loop {
203// Check if the channel is disconnected.
204if tail & MARK_BIT != 0 {
205 token.list.block = ptr::null();
206return true;
207 }
208209// Calculate the offset of the index into the block.
210let offset = (tail >> SHIFT) % LAP;
211212// If we reached the end of the block, wait until the next one is installed.
213if offset == BLOCK_CAP {
214 backoff.snooze();
215 tail = self.tail.index.load(Ordering::Acquire);
216 block = self.tail.block.load(Ordering::Acquire);
217continue;
218 }
219220// If we're going to have to install the next block, allocate it in advance in order to
221 // make the wait for other threads as short as possible.
222if offset + 1 == BLOCK_CAP && next_block.is_none() {
223 next_block = Some(Box::new(Block::<T>::new()));
224 }
225226// If this is the first message to be sent into the channel, we need to allocate the
227 // first block and install it.
228if block.is_null() {
229let new = Box::into_raw(Box::new(Block::<T>::new()));
230231if self
232.tail
233 .block
234 .compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
235 .is_ok()
236 {
237self.head.block.store(new, Ordering::Release);
238 block = new;
239 } else {
240 next_block = unsafe { Some(Box::from_raw(new)) };
241 tail = self.tail.index.load(Ordering::Acquire);
242 block = self.tail.block.load(Ordering::Acquire);
243continue;
244 }
245 }
246247let new_tail = tail + (1 << SHIFT);
248249// Try advancing the tail forward.
250match self.tail.index.compare_exchange_weak(
251 tail,
252 new_tail,
253 Ordering::SeqCst,
254 Ordering::Acquire,
255 ) {
256Ok(_) => unsafe {
257// If we've reached the end of the block, install the next one.
258if offset + 1 == BLOCK_CAP {
259let next_block = Box::into_raw(next_block.unwrap());
260self.tail.block.store(next_block, Ordering::Release);
261self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
262 (*block).next.store(next_block, Ordering::Release);
263 }
264265 token.list.block = block as *const u8;
266 token.list.offset = offset;
267return true;
268 },
269Err(t) => {
270 tail = t;
271 block = self.tail.block.load(Ordering::Acquire);
272 backoff.spin();
273 }
274 }
275 }
276 }
277278/// Writes a message into the channel.
279pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
280// If there is no slot, the channel is disconnected.
281if token.list.block.is_null() {
282return Err(msg);
283 }
284285// Write the message into the slot.
286let block = token.list.block as *mut Block<T>;
287let offset = token.list.offset;
288let slot = (*block).slots.get_unchecked(offset);
289 slot.msg.get().write(MaybeUninit::new(msg));
290 slot.state.fetch_or(WRITE, Ordering::Release);
291292// Wake a sleeping receiver.
293self.receivers.notify();
294Ok(())
295 }
296297/// Attempts to reserve a slot for receiving a message.
298fn start_recv(&self, token: &mut Token) -> bool {
299let backoff = Backoff::new();
300let mut head = self.head.index.load(Ordering::Acquire);
301let mut block = self.head.block.load(Ordering::Acquire);
302303loop {
304// Calculate the offset of the index into the block.
305let offset = (head >> SHIFT) % LAP;
306307// If we reached the end of the block, wait until the next one is installed.
308if offset == BLOCK_CAP {
309 backoff.snooze();
310 head = self.head.index.load(Ordering::Acquire);
311 block = self.head.block.load(Ordering::Acquire);
312continue;
313 }
314315let mut new_head = head + (1 << SHIFT);
316317if new_head & MARK_BIT == 0 {
318 atomic::fence(Ordering::SeqCst);
319let tail = self.tail.index.load(Ordering::Relaxed);
320321// If the tail equals the head, that means the channel is empty.
322if head >> SHIFT == tail >> SHIFT {
323// If the channel is disconnected...
324if tail & MARK_BIT != 0 {
325// ...then receive an error.
326token.list.block = ptr::null();
327return true;
328 } else {
329// Otherwise, the receive operation is not ready.
330return false;
331 }
332 }
333334// If head and tail are not in the same block, set `MARK_BIT` in head.
335if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
336 new_head |= MARK_BIT;
337 }
338 }
339340// The block can be null here only if the first message is being sent into the channel.
341 // In that case, just wait until it gets initialized.
342if block.is_null() {
343 backoff.snooze();
344 head = self.head.index.load(Ordering::Acquire);
345 block = self.head.block.load(Ordering::Acquire);
346continue;
347 }
348349// Try moving the head index forward.
350match self.head.index.compare_exchange_weak(
351 head,
352 new_head,
353 Ordering::SeqCst,
354 Ordering::Acquire,
355 ) {
356Ok(_) => unsafe {
357// If we've reached the end of the block, move to the next one.
358if offset + 1 == BLOCK_CAP {
359let next = (*block).wait_next();
360let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
361if !(*next).next.load(Ordering::Relaxed).is_null() {
362 next_index |= MARK_BIT;
363 }
364365self.head.block.store(next, Ordering::Release);
366self.head.index.store(next_index, Ordering::Release);
367 }
368369 token.list.block = block as *const u8;
370 token.list.offset = offset;
371return true;
372 },
373Err(h) => {
374 head = h;
375 block = self.head.block.load(Ordering::Acquire);
376 backoff.spin();
377 }
378 }
379 }
380 }
381382/// Reads a message from the channel.
383pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
384if token.list.block.is_null() {
385// The channel is disconnected.
386return Err(());
387 }
388389// Read the message.
390let block = token.list.block as *mut Block<T>;
391let offset = token.list.offset;
392let slot = (*block).slots.get_unchecked(offset);
393 slot.wait_write();
394let msg = slot.msg.get().read().assume_init();
395396// Destroy the block if we've reached the end, or if another thread wanted to destroy but
397 // couldn't because we were busy reading from the slot.
398if offset + 1 == BLOCK_CAP {
399 Block::destroy(block, 0);
400 } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
401 Block::destroy(block, offset + 1);
402 }
403404Ok(msg)
405 }
406407/// Attempts to send a message into the channel.
408pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
409self.send(msg, None).map_err(|err| match err {
410 SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
411 SendTimeoutError::Timeout(_) => unreachable!(),
412 })
413 }
414415/// Sends a message into the channel.
416pub(crate) fn send(
417&self,
418 msg: T,
419 _deadline: Option<Instant>,
420 ) -> Result<(), SendTimeoutError<T>> {
421let token = &mut Token::default();
422assert!(self.start_send(token));
423unsafe {
424self.write(token, msg)
425 .map_err(SendTimeoutError::Disconnected)
426 }
427 }
428429/// Attempts to receive a message without blocking.
430pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
431let token = &mut Token::default();
432433if self.start_recv(token) {
434unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
435 } else {
436Err(TryRecvError::Empty)
437 }
438 }
439440/// Receives a message from the channel.
441pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
442let token = &mut Token::default();
443loop {
444// Try receiving a message several times.
445let backoff = Backoff::new();
446loop {
447if self.start_recv(token) {
448unsafe {
449return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
450 }
451 }
452453if backoff.is_completed() {
454break;
455 } else {
456 backoff.snooze();
457 }
458 }
459460if let Some(d) = deadline {
461if Instant::now() >= d {
462return Err(RecvTimeoutError::Timeout);
463 }
464 }
465466// Prepare for blocking until a sender wakes us up.
467Context::with(|cx| {
468let oper = Operation::hook(token);
469self.receivers.register(oper, cx);
470471// Has the channel become ready just now?
472if !self.is_empty() || self.is_disconnected() {
473let _ = cx.try_select(Selected::Aborted);
474 }
475476// Block the current thread.
477let sel = cx.wait_until(deadline);
478479match sel {
480 Selected::Waiting => unreachable!(),
481 Selected::Aborted | Selected::Disconnected => {
482self.receivers.unregister(oper).unwrap();
483// If the channel was disconnected, we still have to check for remaining
484 // messages.
485}
486 Selected::Operation(_) => {}
487 }
488 });
489 }
490 }
491492/// Returns the current number of messages inside the channel.
493pub(crate) fn len(&self) -> usize {
494loop {
495// Load the tail index, then load the head index.
496let mut tail = self.tail.index.load(Ordering::SeqCst);
497let mut head = self.head.index.load(Ordering::SeqCst);
498499// If the tail index didn't change, we've got consistent indices to work with.
500if self.tail.index.load(Ordering::SeqCst) == tail {
501// Erase the lower bits.
502tail &= !((1 << SHIFT) - 1);
503 head &= !((1 << SHIFT) - 1);
504505// Fix up indices if they fall onto block ends.
506if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
507 tail = tail.wrapping_add(1 << SHIFT);
508 }
509if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
510 head = head.wrapping_add(1 << SHIFT);
511 }
512513// Rotate indices so that head falls into the first block.
514let lap = (head >> SHIFT) / LAP;
515 tail = tail.wrapping_sub((lap * LAP) << SHIFT);
516 head = head.wrapping_sub((lap * LAP) << SHIFT);
517518// Remove the lower bits.
519tail >>= SHIFT;
520 head >>= SHIFT;
521522// Return the difference minus the number of blocks between tail and head.
523return tail - head - tail / LAP;
524 }
525 }
526 }
527528/// Returns the capacity of the channel.
529pub(crate) fn capacity(&self) -> Option<usize> {
530None
531}
532533/// Disconnects senders and wakes up all blocked receivers.
534 ///
535 /// Returns `true` if this call disconnected the channel.
536pub(crate) fn disconnect_senders(&self) -> bool {
537let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
538539if tail & MARK_BIT == 0 {
540self.receivers.disconnect();
541true
542} else {
543false
544}
545 }
546547/// Disconnects receivers.
548 ///
549 /// Returns `true` if this call disconnected the channel.
550pub(crate) fn disconnect_receivers(&self) -> bool {
551let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
552553if tail & MARK_BIT == 0 {
554// If receivers are dropped first, discard all messages to free
555 // memory eagerly.
556self.discard_all_messages();
557true
558} else {
559false
560}
561 }
562563/// Discards all messages.
564 ///
565 /// This method should only be called when all receivers are dropped.
566fn discard_all_messages(&self) {
567let backoff = Backoff::new();
568let mut tail = self.tail.index.load(Ordering::Acquire);
569loop {
570let offset = (tail >> SHIFT) % LAP;
571if offset != BLOCK_CAP {
572break;
573 }
574575// New updates to tail will be rejected by MARK_BIT and aborted unless it's
576 // at boundary. We need to wait for the updates take affect otherwise there
577 // can be memory leaks.
578backoff.snooze();
579 tail = self.tail.index.load(Ordering::Acquire);
580 }
581582let mut head = self.head.index.load(Ordering::Acquire);
583let mut block = self.head.block.load(Ordering::Acquire);
584585unsafe {
586// Drop all messages between head and tail and deallocate the heap-allocated blocks.
587while head >> SHIFT != tail >> SHIFT {
588let offset = (head >> SHIFT) % LAP;
589590if offset < BLOCK_CAP {
591// Drop the message in the slot.
592let slot = (*block).slots.get_unchecked(offset);
593 slot.wait_write();
594let p = &mut *slot.msg.get();
595 p.as_mut_ptr().drop_in_place();
596 } else {
597 (*block).wait_next();
598// Deallocate the block and move to the next one.
599let next = (*block).next.load(Ordering::Acquire);
600 drop(Box::from_raw(block));
601 block = next;
602 }
603604 head = head.wrapping_add(1 << SHIFT);
605 }
606607// Deallocate the last remaining block.
608if !block.is_null() {
609 drop(Box::from_raw(block));
610 }
611 }
612 head &= !MARK_BIT;
613self.head.block.store(ptr::null_mut(), Ordering::Release);
614self.head.index.store(head, Ordering::Release);
615 }
616617/// Returns `true` if the channel is disconnected.
618pub(crate) fn is_disconnected(&self) -> bool {
619self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
620}
621622/// Returns `true` if the channel is empty.
623pub(crate) fn is_empty(&self) -> bool {
624let head = self.head.index.load(Ordering::SeqCst);
625let tail = self.tail.index.load(Ordering::SeqCst);
626 head >> SHIFT == tail >> SHIFT
627 }
628629/// Returns `true` if the channel is full.
630pub(crate) fn is_full(&self) -> bool {
631false
632}
633}
634635impl<T> Drop for Channel<T> {
636fn drop(&mut self) {
637let mut head = self.head.index.load(Ordering::Relaxed);
638let mut tail = self.tail.index.load(Ordering::Relaxed);
639let mut block = self.head.block.load(Ordering::Relaxed);
640641// Erase the lower bits.
642head &= !((1 << SHIFT) - 1);
643 tail &= !((1 << SHIFT) - 1);
644645unsafe {
646// Drop all messages between head and tail and deallocate the heap-allocated blocks.
647while head != tail {
648let offset = (head >> SHIFT) % LAP;
649650if offset < BLOCK_CAP {
651// Drop the message in the slot.
652let slot = (*block).slots.get_unchecked(offset);
653let p = &mut *slot.msg.get();
654 p.as_mut_ptr().drop_in_place();
655 } else {
656// Deallocate the block and move to the next one.
657let next = (*block).next.load(Ordering::Relaxed);
658 drop(Box::from_raw(block));
659 block = next;
660 }
661662 head = head.wrapping_add(1 << SHIFT);
663 }
664665// Deallocate the last remaining block.
666if !block.is_null() {
667 drop(Box::from_raw(block));
668 }
669 }
670 }
671}
672673/// Receiver handle to a channel.
674pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
675676/// Sender handle to a channel.
677pub(crate) struct Sender<'a, T>(&'a Channel<T>);
678679impl<T> SelectHandle for Receiver<'_, T> {
680fn try_select(&self, token: &mut Token) -> bool {
681self.0.start_recv(token)
682 }
683684fn deadline(&self) -> Option<Instant> {
685None
686}
687688fn register(&self, oper: Operation, cx: &Context) -> bool {
689self.0.receivers.register(oper, cx);
690self.is_ready()
691 }
692693fn unregister(&self, oper: Operation) {
694self.0.receivers.unregister(oper);
695 }
696697fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
698self.try_select(token)
699 }
700701fn is_ready(&self) -> bool {
702 !self.0.is_empty() || self.0.is_disconnected()
703 }
704705fn watch(&self, oper: Operation, cx: &Context) -> bool {
706self.0.receivers.watch(oper, cx);
707self.is_ready()
708 }
709710fn unwatch(&self, oper: Operation) {
711self.0.receivers.unwatch(oper);
712 }
713}
714715impl<T> SelectHandle for Sender<'_, T> {
716fn try_select(&self, token: &mut Token) -> bool {
717self.0.start_send(token)
718 }
719720fn deadline(&self) -> Option<Instant> {
721None
722}
723724fn register(&self, _oper: Operation, _cx: &Context) -> bool {
725self.is_ready()
726 }
727728fn unregister(&self, _oper: Operation) {}
729730fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
731self.try_select(token)
732 }
733734fn is_ready(&self) -> bool {
735true
736}
737738fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
739self.is_ready()
740 }
741742fn unwatch(&self, _oper: Operation) {}
743}