1//! Bounded channel based on a preallocated array.
2//!
3//! This flavor has a fixed, positive capacity.
4//!
5//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
6//!
7//! Source:
8//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
9//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
1011use std::cell::UnsafeCell;
12use std::marker::PhantomData;
13use std::mem::{self, MaybeUninit};
14use std::ptr;
15use std::sync::atomic::{self, AtomicUsize, Ordering};
16use std::time::Instant;
1718use crossbeam_utils::{Backoff, CachePadded};
1920use crate::context::Context;
21use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
22use crate::select::{Operation, SelectHandle, Selected, Token};
23use crate::waker::SyncWaker;
2425/// A slot in a channel.
26struct Slot<T> {
27/// The current stamp.
28stamp: AtomicUsize,
2930/// The message in this slot.
31msg: UnsafeCell<MaybeUninit<T>>,
32}
3334/// The token type for the array flavor.
35#[derive(Debug)]
36pub struct ArrayToken {
37/// Slot to read from or write to.
38slot: *const u8,
3940/// Stamp to store into the slot after reading or writing.
41stamp: usize,
42}
4344impl Default for ArrayToken {
45#[inline]
46fn default() -> Self {
47 ArrayToken {
48 slot: ptr::null(),
49 stamp: 0,
50 }
51 }
52}
5354/// Bounded channel based on a preallocated array.
55pub(crate) struct Channel<T> {
56/// The head of the channel.
57 ///
58 /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
59 /// packed into a single `usize`. The lower bits represent the index, while the upper bits
60 /// represent the lap. The mark bit in the head is always zero.
61 ///
62 /// Messages are popped from the head of the channel.
63head: CachePadded<AtomicUsize>,
6465/// The tail of the channel.
66 ///
67 /// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
68 /// packed into a single `usize`. The lower bits represent the index, while the upper bits
69 /// represent the lap. The mark bit indicates that the channel is disconnected.
70 ///
71 /// Messages are pushed into the tail of the channel.
72tail: CachePadded<AtomicUsize>,
7374/// The buffer holding slots.
75buffer: *mut Slot<T>,
7677/// The channel capacity.
78cap: usize,
7980/// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
81one_lap: usize,
8283/// If this bit is set in the tail, that means the channel is disconnected.
84mark_bit: usize,
8586/// Senders waiting while the channel is full.
87senders: SyncWaker,
8889/// Receivers waiting while the channel is empty and not disconnected.
90receivers: SyncWaker,
9192/// Indicates that dropping a `Channel<T>` may drop values of type `T`.
93_marker: PhantomData<T>,
94}
9596impl<T> Channel<T> {
97/// Creates a bounded channel of capacity `cap`.
98pub(crate) fn with_capacity(cap: usize) -> Self {
99assert!(cap > 0, "capacity must be positive");
100101// Compute constants `mark_bit` and `one_lap`.
102let mark_bit = (cap + 1).next_power_of_two();
103let one_lap = mark_bit * 2;
104105// Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
106let head = 0;
107// Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
108let tail = 0;
109110// Allocate a buffer of `cap` slots initialized
111 // with stamps.
112let buffer = {
113let mut boxed: Box<[Slot<T>]> = (0..cap)
114 .map(|i| {
115// Set the stamp to `{ lap: 0, mark: 0, index: i }`.
116Slot {
117 stamp: AtomicUsize::new(i),
118 msg: UnsafeCell::new(MaybeUninit::uninit()),
119 }
120 })
121 .collect();
122let ptr = boxed.as_mut_ptr();
123 mem::forget(boxed);
124 ptr
125 };
126127 Channel {
128 buffer,
129 cap,
130 one_lap,
131 mark_bit,
132 head: CachePadded::new(AtomicUsize::new(head)),
133 tail: CachePadded::new(AtomicUsize::new(tail)),
134 senders: SyncWaker::new(),
135 receivers: SyncWaker::new(),
136 _marker: PhantomData,
137 }
138 }
139140/// Returns a receiver handle to the channel.
141pub(crate) fn receiver(&self) -> Receiver<'_, T> {
142 Receiver(self)
143 }
144145/// Returns a sender handle to the channel.
146pub(crate) fn sender(&self) -> Sender<'_, T> {
147 Sender(self)
148 }
149150/// Attempts to reserve a slot for sending a message.
151fn start_send(&self, token: &mut Token) -> bool {
152let backoff = Backoff::new();
153let mut tail = self.tail.load(Ordering::Relaxed);
154155loop {
156// Check if the channel is disconnected.
157if tail & self.mark_bit != 0 {
158 token.array.slot = ptr::null();
159 token.array.stamp = 0;
160return true;
161 }
162163// Deconstruct the tail.
164let index = tail & (self.mark_bit - 1);
165let lap = tail & !(self.one_lap - 1);
166167// Inspect the corresponding slot.
168let slot = unsafe { &*self.buffer.add(index) };
169let stamp = slot.stamp.load(Ordering::Acquire);
170171// If the tail and the stamp match, we may attempt to push.
172if tail == stamp {
173let new_tail = if index + 1 < self.cap {
174// Same lap, incremented index.
175 // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
176tail + 1
177} else {
178// One lap forward, index wraps around to zero.
179 // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
180lap.wrapping_add(self.one_lap)
181 };
182183// Try moving the tail.
184match self.tail.compare_exchange_weak(
185 tail,
186 new_tail,
187 Ordering::SeqCst,
188 Ordering::Relaxed,
189 ) {
190Ok(_) => {
191// Prepare the token for the follow-up call to `write`.
192token.array.slot = slot as *const Slot<T> as *const u8;
193 token.array.stamp = tail + 1;
194return true;
195 }
196Err(t) => {
197 tail = t;
198 backoff.spin();
199 }
200 }
201 } else if stamp.wrapping_add(self.one_lap) == tail + 1 {
202 atomic::fence(Ordering::SeqCst);
203let head = self.head.load(Ordering::Relaxed);
204205// If the head lags one lap behind the tail as well...
206if head.wrapping_add(self.one_lap) == tail {
207// ...then the channel is full.
208return false;
209 }
210211 backoff.spin();
212 tail = self.tail.load(Ordering::Relaxed);
213 } else {
214// Snooze because we need to wait for the stamp to get updated.
215backoff.snooze();
216 tail = self.tail.load(Ordering::Relaxed);
217 }
218 }
219 }
220221/// Writes a message into the channel.
222pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
223// If there is no slot, the channel is disconnected.
224if token.array.slot.is_null() {
225return Err(msg);
226 }
227228let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
229230// Write the message into the slot and update the stamp.
231slot.msg.get().write(MaybeUninit::new(msg));
232 slot.stamp.store(token.array.stamp, Ordering::Release);
233234// Wake a sleeping receiver.
235self.receivers.notify();
236Ok(())
237 }
238239/// Attempts to reserve a slot for receiving a message.
240fn start_recv(&self, token: &mut Token) -> bool {
241let backoff = Backoff::new();
242let mut head = self.head.load(Ordering::Relaxed);
243244loop {
245// Deconstruct the head.
246let index = head & (self.mark_bit - 1);
247let lap = head & !(self.one_lap - 1);
248249// Inspect the corresponding slot.
250let slot = unsafe { &*self.buffer.add(index) };
251let stamp = slot.stamp.load(Ordering::Acquire);
252253// If the the stamp is ahead of the head by 1, we may attempt to pop.
254if head + 1 == stamp {
255let new = if index + 1 < self.cap {
256// Same lap, incremented index.
257 // Set to `{ lap: lap, mark: 0, index: index + 1 }`.
258head + 1
259} else {
260// One lap forward, index wraps around to zero.
261 // Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
262lap.wrapping_add(self.one_lap)
263 };
264265// Try moving the head.
266match self.head.compare_exchange_weak(
267 head,
268 new,
269 Ordering::SeqCst,
270 Ordering::Relaxed,
271 ) {
272Ok(_) => {
273// Prepare the token for the follow-up call to `read`.
274token.array.slot = slot as *const Slot<T> as *const u8;
275 token.array.stamp = head.wrapping_add(self.one_lap);
276return true;
277 }
278Err(h) => {
279 head = h;
280 backoff.spin();
281 }
282 }
283 } else if stamp == head {
284 atomic::fence(Ordering::SeqCst);
285let tail = self.tail.load(Ordering::Relaxed);
286287// If the tail equals the head, that means the channel is empty.
288if (tail & !self.mark_bit) == head {
289// If the channel is disconnected...
290if tail & self.mark_bit != 0 {
291// ...then receive an error.
292token.array.slot = ptr::null();
293 token.array.stamp = 0;
294return true;
295 } else {
296// Otherwise, the receive operation is not ready.
297return false;
298 }
299 }
300301 backoff.spin();
302 head = self.head.load(Ordering::Relaxed);
303 } else {
304// Snooze because we need to wait for the stamp to get updated.
305backoff.snooze();
306 head = self.head.load(Ordering::Relaxed);
307 }
308 }
309 }
310311/// Reads a message from the channel.
312pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
313if token.array.slot.is_null() {
314// The channel is disconnected.
315return Err(());
316 }
317318let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
319320// Read the message from the slot and update the stamp.
321let msg = slot.msg.get().read().assume_init();
322 slot.stamp.store(token.array.stamp, Ordering::Release);
323324// Wake a sleeping sender.
325self.senders.notify();
326Ok(msg)
327 }
328329/// Attempts to send a message into the channel.
330pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
331let token = &mut Token::default();
332if self.start_send(token) {
333unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
334 } else {
335Err(TrySendError::Full(msg))
336 }
337 }
338339/// Sends a message into the channel.
340pub(crate) fn send(
341&self,
342 msg: T,
343 deadline: Option<Instant>,
344 ) -> Result<(), SendTimeoutError<T>> {
345let token = &mut Token::default();
346loop {
347// Try sending a message several times.
348let backoff = Backoff::new();
349loop {
350if self.start_send(token) {
351let res = unsafe { self.write(token, msg) };
352return res.map_err(SendTimeoutError::Disconnected);
353 }
354355if backoff.is_completed() {
356break;
357 } else {
358 backoff.snooze();
359 }
360 }
361362if let Some(d) = deadline {
363if Instant::now() >= d {
364return Err(SendTimeoutError::Timeout(msg));
365 }
366 }
367368 Context::with(|cx| {
369// Prepare for blocking until a receiver wakes us up.
370let oper = Operation::hook(token);
371self.senders.register(oper, cx);
372373// Has the channel become ready just now?
374if !self.is_full() || self.is_disconnected() {
375let _ = cx.try_select(Selected::Aborted);
376 }
377378// Block the current thread.
379let sel = cx.wait_until(deadline);
380381match sel {
382 Selected::Waiting => unreachable!(),
383 Selected::Aborted | Selected::Disconnected => {
384self.senders.unregister(oper).unwrap();
385 }
386 Selected::Operation(_) => {}
387 }
388 });
389 }
390 }
391392/// Attempts to receive a message without blocking.
393pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
394let token = &mut Token::default();
395396if self.start_recv(token) {
397unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
398 } else {
399Err(TryRecvError::Empty)
400 }
401 }
402403/// Receives a message from the channel.
404pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
405let token = &mut Token::default();
406loop {
407// Try receiving a message several times.
408let backoff = Backoff::new();
409loop {
410if self.start_recv(token) {
411let res = unsafe { self.read(token) };
412return res.map_err(|_| RecvTimeoutError::Disconnected);
413 }
414415if backoff.is_completed() {
416break;
417 } else {
418 backoff.snooze();
419 }
420 }
421422if let Some(d) = deadline {
423if Instant::now() >= d {
424return Err(RecvTimeoutError::Timeout);
425 }
426 }
427428 Context::with(|cx| {
429// Prepare for blocking until a sender wakes us up.
430let oper = Operation::hook(token);
431self.receivers.register(oper, cx);
432433// Has the channel become ready just now?
434if !self.is_empty() || self.is_disconnected() {
435let _ = cx.try_select(Selected::Aborted);
436 }
437438// Block the current thread.
439let sel = cx.wait_until(deadline);
440441match sel {
442 Selected::Waiting => unreachable!(),
443 Selected::Aborted | Selected::Disconnected => {
444self.receivers.unregister(oper).unwrap();
445// If the channel was disconnected, we still have to check for remaining
446 // messages.
447}
448 Selected::Operation(_) => {}
449 }
450 });
451 }
452 }
453454/// Returns the current number of messages inside the channel.
455pub(crate) fn len(&self) -> usize {
456loop {
457// Load the tail, then load the head.
458let tail = self.tail.load(Ordering::SeqCst);
459let head = self.head.load(Ordering::SeqCst);
460461// If the tail didn't change, we've got consistent values to work with.
462if self.tail.load(Ordering::SeqCst) == tail {
463let hix = head & (self.mark_bit - 1);
464let tix = tail & (self.mark_bit - 1);
465466return if hix < tix {
467 tix - hix
468 } else if hix > tix {
469self.cap - hix + tix
470 } else if (tail & !self.mark_bit) == head {
4710
472} else {
473self.cap
474 };
475 }
476 }
477 }
478479/// Returns the capacity of the channel.
480#[allow(clippy::unnecessary_wraps)] // This is intentional.
481pub(crate) fn capacity(&self) -> Option<usize> {
482Some(self.cap)
483 }
484485/// Disconnects the channel and wakes up all blocked senders and receivers.
486 ///
487 /// Returns `true` if this call disconnected the channel.
488pub(crate) fn disconnect(&self) -> bool {
489let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
490491if tail & self.mark_bit == 0 {
492self.senders.disconnect();
493self.receivers.disconnect();
494true
495} else {
496false
497}
498 }
499500/// Returns `true` if the channel is disconnected.
501pub(crate) fn is_disconnected(&self) -> bool {
502self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
503}
504505/// Returns `true` if the channel is empty.
506pub(crate) fn is_empty(&self) -> bool {
507let head = self.head.load(Ordering::SeqCst);
508let tail = self.tail.load(Ordering::SeqCst);
509510// Is the tail equal to the head?
511 //
512 // Note: If the head changes just before we load the tail, that means there was a moment
513 // when the channel was not empty, so it is safe to just return `false`.
514(tail & !self.mark_bit) == head
515 }
516517/// Returns `true` if the channel is full.
518pub(crate) fn is_full(&self) -> bool {
519let tail = self.tail.load(Ordering::SeqCst);
520let head = self.head.load(Ordering::SeqCst);
521522// Is the head lagging one lap behind tail?
523 //
524 // Note: If the tail changes just before we load the head, that means there was a moment
525 // when the channel was not full, so it is safe to just return `false`.
526head.wrapping_add(self.one_lap) == tail & !self.mark_bit
527 }
528}
529530impl<T> Drop for Channel<T> {
531fn drop(&mut self) {
532// Get the index of the head.
533let hix = self.head.load(Ordering::Relaxed) & (self.mark_bit - 1);
534535// Loop over all slots that hold a message and drop them.
536for i in 0..self.len() {
537// Compute the index of the next slot holding a message.
538let index = if hix + i < self.cap {
539 hix + i
540 } else {
541 hix + i - self.cap
542 };
543544unsafe {
545let p = {
546let slot = &mut *self.buffer.add(index);
547let msg = &mut *slot.msg.get();
548 msg.as_mut_ptr()
549 };
550 p.drop_in_place();
551 }
552 }
553554// Finally, deallocate the buffer, but don't run any destructors.
555unsafe {
556// Create a slice from the buffer to make
557 // a fat pointer. Then, use Box::from_raw
558 // to deallocate it.
559let ptr = std::slice::from_raw_parts_mut(self.buffer, self.cap) as *mut [Slot<T>];
560 Box::from_raw(ptr);
561 }
562 }
563}
564565/// Receiver handle to a channel.
566pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
567568/// Sender handle to a channel.
569pub(crate) struct Sender<'a, T>(&'a Channel<T>);
570571impl<T> SelectHandle for Receiver<'_, T> {
572fn try_select(&self, token: &mut Token) -> bool {
573self.0.start_recv(token)
574 }
575576fn deadline(&self) -> Option<Instant> {
577None
578}
579580fn register(&self, oper: Operation, cx: &Context) -> bool {
581self.0.receivers.register(oper, cx);
582self.is_ready()
583 }
584585fn unregister(&self, oper: Operation) {
586self.0.receivers.unregister(oper);
587 }
588589fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
590self.try_select(token)
591 }
592593fn is_ready(&self) -> bool {
594 !self.0.is_empty() || self.0.is_disconnected()
595 }
596597fn watch(&self, oper: Operation, cx: &Context) -> bool {
598self.0.receivers.watch(oper, cx);
599self.is_ready()
600 }
601602fn unwatch(&self, oper: Operation) {
603self.0.receivers.unwatch(oper);
604 }
605}
606607impl<T> SelectHandle for Sender<'_, T> {
608fn try_select(&self, token: &mut Token) -> bool {
609self.0.start_send(token)
610 }
611612fn deadline(&self) -> Option<Instant> {
613None
614}
615616fn register(&self, oper: Operation, cx: &Context) -> bool {
617self.0.senders.register(oper, cx);
618self.is_ready()
619 }
620621fn unregister(&self, oper: Operation) {
622self.0.senders.unregister(oper);
623 }
624625fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
626self.try_select(token)
627 }
628629fn is_ready(&self) -> bool {
630 !self.0.is_full() || self.0.is_disconnected()
631 }
632633fn watch(&self, oper: Operation, cx: &Context) -> bool {
634self.0.senders.watch(oper, cx);
635self.is_ready()
636 }
637638fn unwatch(&self, oper: Operation) {
639self.0.senders.unwatch(oper);
640 }
641}