crossbeam_channel/context.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
//! Thread-local context used in select.
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, Thread, ThreadId};
use std::time::Instant;
use crossbeam_utils::Backoff;
use crate::select::Selected;
/// Thread-local context used in select.
#[derive(Debug, Clone)]
pub struct Context {
inner: Arc<Inner>,
}
/// Inner representation of `Context`.
#[derive(Debug)]
struct Inner {
/// Selected operation.
select: AtomicUsize,
/// A slot into which another thread may store a pointer to its `Packet`.
packet: AtomicUsize,
/// Thread handle.
thread: Thread,
/// Thread id.
thread_id: ThreadId,
}
impl Context {
/// Creates a new context for the duration of the closure.
#[inline]
pub fn with<F, R>(f: F) -> R
where
F: FnOnce(&Context) -> R,
{
thread_local! {
/// Cached thread-local context.
static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
}
let mut f = Some(f);
let mut f = move |cx: &Context| -> R {
let f = f.take().unwrap();
f(cx)
};
CONTEXT
.try_with(|cell| match cell.take() {
None => f(&Context::new()),
Some(cx) => {
cx.reset();
let res = f(&cx);
cell.set(Some(cx));
res
}
})
.unwrap_or_else(|_| f(&Context::new()))
}
/// Creates a new `Context`.
#[cold]
fn new() -> Context {
Context {
inner: Arc::new(Inner {
select: AtomicUsize::new(Selected::Waiting.into()),
packet: AtomicUsize::new(0),
thread: thread::current(),
thread_id: thread::current().id(),
}),
}
}
/// Resets `select` and `packet`.
#[inline]
fn reset(&self) {
self.inner
.select
.store(Selected::Waiting.into(), Ordering::Release);
self.inner.packet.store(0, Ordering::Release);
}
/// Attempts to select an operation.
///
/// On failure, the previously selected operation is returned.
#[inline]
pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
self.inner
.select
.compare_exchange(
Selected::Waiting.into(),
select.into(),
Ordering::AcqRel,
Ordering::Acquire,
)
.map(|_| ())
.map_err(|e| e.into())
}
/// Returns the selected operation.
#[inline]
pub fn selected(&self) -> Selected {
Selected::from(self.inner.select.load(Ordering::Acquire))
}
/// Stores a packet.
///
/// This method must be called after `try_select` succeeds and there is a packet to provide.
#[inline]
pub fn store_packet(&self, packet: usize) {
if packet != 0 {
self.inner.packet.store(packet, Ordering::Release);
}
}
/// Waits until a packet is provided and returns it.
#[inline]
pub fn wait_packet(&self) -> usize {
let backoff = Backoff::new();
loop {
let packet = self.inner.packet.load(Ordering::Acquire);
if packet != 0 {
return packet;
}
backoff.snooze();
}
}
/// Waits until an operation is selected and returns it.
///
/// If the deadline is reached, `Selected::Aborted` will be selected.
#[inline]
pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
// Spin for a short time, waiting until an operation is selected.
let backoff = Backoff::new();
loop {
let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
if sel != Selected::Waiting {
return sel;
}
if backoff.is_completed() {
break;
} else {
backoff.snooze();
}
}
loop {
// Check whether an operation has been selected.
let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
if sel != Selected::Waiting {
return sel;
}
// If there's a deadline, park the current thread until the deadline is reached.
if let Some(end) = deadline {
let now = Instant::now();
if now < end {
thread::park_timeout(end - now);
} else {
// The deadline has been reached. Try aborting select.
return match self.try_select(Selected::Aborted) {
Ok(()) => Selected::Aborted,
Err(s) => s,
};
}
} else {
thread::park();
}
}
}
/// Unparks the thread this context belongs to.
#[inline]
pub fn unpark(&self) {
self.inner.thread.unpark();
}
/// Returns the id of the thread this context belongs to.
#[inline]
pub fn thread_id(&self) -> ThreadId {
self.inner.thread_id
}
}