crossbeam_channel/
context.rs

1//! Thread-local context used in select.
2
3use std::cell::Cell;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::sync::Arc;
6use std::thread::{self, Thread, ThreadId};
7use std::time::Instant;
8
9use crossbeam_utils::Backoff;
10
11use crate::select::Selected;
12
13/// Thread-local context used in select.
14#[derive(Debug, Clone)]
15pub struct Context {
16    inner: Arc<Inner>,
17}
18
/// Inner representation of `Context`.
///
/// Both atomics hold plain `usize` values so that they can be read and written through a
/// shared reference.
#[derive(Debug)]
struct Inner {
    /// Selected operation, stored as the `usize` encoding of `Selected`.
    select: AtomicUsize,

    /// A slot into which another thread may store a pointer to its `Packet`.
    ///
    /// The value `0` means that no packet has been provided yet.
    packet: AtomicUsize,

    /// Thread handle, used to unpark the thread.
    thread: Thread,

    /// Thread id.
    thread_id: ThreadId,
}
34
35impl Context {
36    /// Creates a new context for the duration of the closure.
37    #[inline]
38    pub fn with<F, R>(f: F) -> R
39    where
40        F: FnOnce(&Context) -> R,
41    {
42        thread_local! {
43            /// Cached thread-local context.
44            static CONTEXT: Cell<Option<Context>> = Cell::new(Some(Context::new()));
45        }
46
47        let mut f = Some(f);
48        let mut f = move |cx: &Context| -> R {
49            let f = f.take().unwrap();
50            f(cx)
51        };
52
53        CONTEXT
54            .try_with(|cell| match cell.take() {
55                None => f(&Context::new()),
56                Some(cx) => {
57                    cx.reset();
58                    let res = f(&cx);
59                    cell.set(Some(cx));
60                    res
61                }
62            })
63            .unwrap_or_else(|_| f(&Context::new()))
64    }
65
66    /// Creates a new `Context`.
67    #[cold]
68    fn new() -> Context {
69        Context {
70            inner: Arc::new(Inner {
71                select: AtomicUsize::new(Selected::Waiting.into()),
72                packet: AtomicUsize::new(0),
73                thread: thread::current(),
74                thread_id: thread::current().id(),
75            }),
76        }
77    }
78
79    /// Resets `select` and `packet`.
80    #[inline]
81    fn reset(&self) {
82        self.inner
83            .select
84            .store(Selected::Waiting.into(), Ordering::Release);
85        self.inner.packet.store(0, Ordering::Release);
86    }
87
88    /// Attempts to select an operation.
89    ///
90    /// On failure, the previously selected operation is returned.
91    #[inline]
92    pub fn try_select(&self, select: Selected) -> Result<(), Selected> {
93        self.inner
94            .select
95            .compare_exchange(
96                Selected::Waiting.into(),
97                select.into(),
98                Ordering::AcqRel,
99                Ordering::Acquire,
100            )
101            .map(|_| ())
102            .map_err(|e| e.into())
103    }
104
105    /// Returns the selected operation.
106    #[inline]
107    pub fn selected(&self) -> Selected {
108        Selected::from(self.inner.select.load(Ordering::Acquire))
109    }
110
111    /// Stores a packet.
112    ///
113    /// This method must be called after `try_select` succeeds and there is a packet to provide.
114    #[inline]
115    pub fn store_packet(&self, packet: usize) {
116        if packet != 0 {
117            self.inner.packet.store(packet, Ordering::Release);
118        }
119    }
120
121    /// Waits until a packet is provided and returns it.
122    #[inline]
123    pub fn wait_packet(&self) -> usize {
124        let backoff = Backoff::new();
125        loop {
126            let packet = self.inner.packet.load(Ordering::Acquire);
127            if packet != 0 {
128                return packet;
129            }
130            backoff.snooze();
131        }
132    }
133
134    /// Waits until an operation is selected and returns it.
135    ///
136    /// If the deadline is reached, `Selected::Aborted` will be selected.
137    #[inline]
138    pub fn wait_until(&self, deadline: Option<Instant>) -> Selected {
139        // Spin for a short time, waiting until an operation is selected.
140        let backoff = Backoff::new();
141        loop {
142            let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
143            if sel != Selected::Waiting {
144                return sel;
145            }
146
147            if backoff.is_completed() {
148                break;
149            } else {
150                backoff.snooze();
151            }
152        }
153
154        loop {
155            // Check whether an operation has been selected.
156            let sel = Selected::from(self.inner.select.load(Ordering::Acquire));
157            if sel != Selected::Waiting {
158                return sel;
159            }
160
161            // If there's a deadline, park the current thread until the deadline is reached.
162            if let Some(end) = deadline {
163                let now = Instant::now();
164
165                if now < end {
166                    thread::park_timeout(end - now);
167                } else {
168                    // The deadline has been reached. Try aborting select.
169                    return match self.try_select(Selected::Aborted) {
170                        Ok(()) => Selected::Aborted,
171                        Err(s) => s,
172                    };
173                }
174            } else {
175                thread::park();
176            }
177        }
178    }
179
180    /// Unparks the thread this context belongs to.
181    #[inline]
182    pub fn unpark(&self) {
183        self.inner.thread.unpark();
184    }
185
186    /// Returns the id of the thread this context belongs to.
187    #[inline]
188    pub fn thread_id(&self) -> ThreadId {
189        self.inner.thread_id
190    }
191}