crossbeam_channel/
context.rs
1use std::cell::Cell;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6use std::thread::{self, Thread, ThreadId};
7use std::time::Instant;
8
9use crossbeam_utils::Backoff;
10
11use crate::select::Selected;
12
13#[derive(Debug, Clone)]
15pub struct Context {
16 inner: Arc<Inner>,
17}
18
/// Shared state behind a `Context` handle.
#[derive(Debug)]
struct Inner {
    /// The currently selected operation, stored as the `usize` encoding of a
    /// `Selected` value. It starts out as (and is reset to) `Selected::Waiting`.
    select: AtomicUsize,

    /// Packet slot. `0` means "no packet stored"; any other value is the packet
    /// that was handed over.
    // NOTE(review): presumably a pointer-sized handle to a packet - confirm at call sites.
    packet: AtomicUsize,

    /// Handle of the thread that created this context; used to unpark it.
    thread: Thread,

    /// Id of the thread that created this context.
    thread_id: ThreadId,
}
34
35impl Context {
36 #[inline]
38 pub fn with<F, R>(f: F) -> R
39 where
40 F: FnOnce(&Context) -> R,
41 {
42 thread_local! {
43 static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
45 }
46
47 let mut f = Some(f);
48 let mut f = move |cx: &Context| -> R {
49 let f = f.take().unwrap();
50 f(cx)
51 };
52
53 CONTEXT
54 .try_with(|cell| match cell.take() {
55 None => f(&Context::new()),
56 Some(cx) => {
57 cx.reset();
58 let res = f(&cx);
59 cell.set(Some(cx));
60 res
61 }
62 })
63 .unwrap_or_else(|_| f(&Context::new()))
64 }
65
66 #[cold]
68 fn new() -> Context {
69 Context {
70 inner: Arc::new(Inner {
71 select: AtomicUsize::new(Selected::Waiting.into()),
72 packet: AtomicUsize::new(0),
73 thread: thread::current(),
74 thread_id: thread::current().id(),
75 }),
76 }
77 }
78
79 #[inline]
81 fn reset(&self) {
82 self.inner
83 .select
84 .store(Selected::Waiting.into(), Ordering::Release);
85 self.inner.packet.store(0, Ordering::Release);
86 }
87
88 #[inline]
92 pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
93 self.inner
94 .select
95 .compare_exchange(
96 Selected::Waiting.into(),
97 select.into(),
98 Ordering::AcqRel,
99 Ordering::Acquire,
100 )
101 .map(|_| ())
102 .map_err(|e| e.into())
103 }
104
105 #[inline]
107 pub fn selected(&self) -> Selected {
108 Selected::from(self.inner.select.load(Ordering::Acquire))
109 }
110
111 #[inline]
115 pub fn store_packet(&self, packet: usize) {
116 if packet != 0 {
117 self.inner.packet.store(packet, Ordering::Release);
118 }
119 }
120
121 #[inline]
123 pub fn wait_packet(&self) -> usize {
124 let backoff = Backoff::new();
125 loop {
126 let packet = self.inner.packet.load(Ordering::Acquire);
127 if packet != 0 {
128 return packet;
129 }
130 backoff.snooze();
131 }
132 }
133
134 #[inline]
138 pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
139 let backoff = Backoff::new();
141 loop {
142 let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
143 if sel != Selected::Waiting {
144 return sel;
145 }
146
147 if backoff.is_completed() {
148 break;
149 } else {
150 backoff.snooze();
151 }
152 }
153
154 loop {
155 let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
157 if sel != Selected::Waiting {
158 return sel;
159 }
160
161 if let Some(end) = deadline {
163 let now = Instant::now();
164
165 if now < end {
166 thread::park_timeout(end - now);
167 } else {
168 return match self.try_select(Selected::Aborted) {
170 Ok(()) => Selected::Aborted,
171 Err(s) => s,
172 };
173 }
174 } else {
175 thread::park();
176 }
177 }
178 }
179
180 #[inline]
182 pub fn unpark(&self) {
183 self.inner.thread.unpark();
184 }
185
186 #[inline]
188 pub fn thread_id(&self) -> ThreadId {
189 self.inner.thread_id
190 }
191}