crossbeam_channel/
waker.rs

1//! Waking mechanism for threads blocked on channel operations.
2
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::thread::{self, ThreadId};
5
6use crate::context::Context;
7use crate::select::{Operation, Selected};
8use crate::utils::Spinlock;
9
10/// Represents a thread blocked on a specific channel operation.
11pub(crate) struct Entry {
12    /// The operation.
13    pub(crate) oper: Operation,
14
15    /// Optional packet.
16    pub(crate) packet: usize,
17
18    /// Context associated with the thread owning this operation.
19    pub(crate) cx: Context,
20}
21
22/// A queue of threads blocked on channel operations.
23///
24/// This data structure is used by threads to register blocking operations and get woken up once
25/// an operation becomes ready.
26pub(crate) struct Waker {
27    /// A list of select operations.
28    selectors: Vec<Entry>,
29
30    /// A list of operations waiting to be ready.
31    observers: Vec<Entry>,
32}
33
34impl Waker {
35    /// Creates a new `Waker`.
36    #[inline]
37    pub(crate) fn new() -> Self {
38        Waker {
39            selectors: Vec::new(),
40            observers: Vec::new(),
41        }
42    }
43
44    /// Registers a select operation.
45    #[inline]
46    pub(crate) fn register(&mut self, oper: Operation, cx: &Context) {
47        self.register_with_packet(oper, 0, cx);
48    }
49
50    /// Registers a select operation and a packet.
51    #[inline]
52    pub(crate) fn register_with_packet(&mut self, oper: Operation, packet: usize, cx: &Context) {
53        self.selectors.push(Entry {
54            oper,
55            packet,
56            cx: cx.clone(),
57        });
58    }
59
60    /// Unregisters a select operation.
61    #[inline]
62    pub(crate) fn unregister(&mut self, oper: Operation) -> Option<Entry> {
63        if let Some((i, _)) = self
64            .selectors
65            .iter()
66            .enumerate()
67            .find(|&(_, entry)| entry.oper == oper)
68        {
69            let entry = self.selectors.remove(i);
70            Some(entry)
71        } else {
72            None
73        }
74    }
75
76    /// Attempts to find another thread's entry, select the operation, and wake it up.
77    #[inline]
78    pub(crate) fn try_select(&mut self) -> Option<Entry> {
79        let mut entry = None;
80
81        if !self.selectors.is_empty() {
82            let thread_id = current_thread_id();
83
84            for i in 0..self.selectors.len() {
85                // Does the entry belong to a different thread?
86                if self.selectors[i].cx.thread_id() != thread_id {
87                    // Try selecting this operation.
88                    let sel = Selected::Operation(self.selectors[i].oper);
89                    let res = self.selectors[i].cx.try_select(sel);
90
91                    if res.is_ok() {
92                        // Provide the packet.
93                        self.selectors[i].cx.store_packet(self.selectors[i].packet);
94                        // Wake the thread up.
95                        self.selectors[i].cx.unpark();
96
97                        // Remove the entry from the queue to keep it clean and improve
98                        // performance.
99                        entry = Some(self.selectors.remove(i));
100                        break;
101                    }
102                }
103            }
104        }
105
106        entry
107    }
108
109    /// Returns `true` if there is an entry which can be selected by the current thread.
110    #[inline]
111    pub(crate) fn can_select(&self) -> bool {
112        if self.selectors.is_empty() {
113            false
114        } else {
115            let thread_id = current_thread_id();
116
117            self.selectors.iter().any(|entry| {
118                entry.cx.thread_id() != thread_id && entry.cx.selected() == Selected::Waiting
119            })
120        }
121    }
122
123    /// Registers an operation waiting to be ready.
124    #[inline]
125    pub(crate) fn watch(&mut self, oper: Operation, cx: &Context) {
126        self.observers.push(Entry {
127            oper,
128            packet: 0,
129            cx: cx.clone(),
130        });
131    }
132
133    /// Unregisters an operation waiting to be ready.
134    #[inline]
135    pub(crate) fn unwatch(&mut self, oper: Operation) {
136        self.observers.retain(|e| e.oper != oper);
137    }
138
139    /// Notifies all operations waiting to be ready.
140    #[inline]
141    pub(crate) fn notify(&mut self) {
142        for entry in self.observers.drain(..) {
143            if entry.cx.try_select(Selected::Operation(entry.oper)).is_ok() {
144                entry.cx.unpark();
145            }
146        }
147    }
148
149    /// Notifies all registered operations that the channel is disconnected.
150    #[inline]
151    pub(crate) fn disconnect(&mut self) {
152        for entry in self.selectors.iter() {
153            if entry.cx.try_select(Selected::Disconnected).is_ok() {
154                // Wake the thread up.
155                //
156                // Here we don't remove the entry from the queue. Registered threads must
157                // unregister from the waker by themselves. They might also want to recover the
158                // packet value and destroy it, if necessary.
159                entry.cx.unpark();
160            }
161        }
162
163        self.notify();
164    }
165}
166
167impl Drop for Waker {
168    #[inline]
169    fn drop(&mut self) {
170        debug_assert_eq!(self.selectors.len(), 0);
171        debug_assert_eq!(self.observers.len(), 0);
172    }
173}
174
175/// A waker that can be shared among threads without locking.
176///
177/// This is a simple wrapper around `Waker` that internally uses a mutex for synchronization.
178pub(crate) struct SyncWaker {
179    /// The inner `Waker`.
180    inner: Spinlock<Waker>,
181
182    /// `true` if the waker is empty.
183    is_empty: AtomicBool,
184}
185
186impl SyncWaker {
187    /// Creates a new `SyncWaker`.
188    #[inline]
189    pub(crate) fn new() -> Self {
190        SyncWaker {
191            inner: Spinlock::new(Waker::new()),
192            is_empty: AtomicBool::new(true),
193        }
194    }
195
196    /// Registers the current thread with an operation.
197    #[inline]
198    pub(crate) fn register(&self, oper: Operation, cx: &Context) {
199        let mut inner = self.inner.lock();
200        inner.register(oper, cx);
201        self.is_empty.store(
202            inner.selectors.is_empty() && inner.observers.is_empty(),
203            Ordering::SeqCst,
204        );
205    }
206
207    /// Unregisters an operation previously registered by the current thread.
208    #[inline]
209    pub(crate) fn unregister(&self, oper: Operation) -> Option<Entry> {
210        let mut inner = self.inner.lock();
211        let entry = inner.unregister(oper);
212        self.is_empty.store(
213            inner.selectors.is_empty() && inner.observers.is_empty(),
214            Ordering::SeqCst,
215        );
216        entry
217    }
218
219    /// Attempts to find one thread (not the current one), select its operation, and wake it up.
220    #[inline]
221    pub(crate) fn notify(&self) {
222        if !self.is_empty.load(Ordering::SeqCst) {
223            let mut inner = self.inner.lock();
224            if !self.is_empty.load(Ordering::SeqCst) {
225                inner.try_select();
226                inner.notify();
227                self.is_empty.store(
228                    inner.selectors.is_empty() && inner.observers.is_empty(),
229                    Ordering::SeqCst,
230                );
231            }
232        }
233    }
234
235    /// Registers an operation waiting to be ready.
236    #[inline]
237    pub(crate) fn watch(&self, oper: Operation, cx: &Context) {
238        let mut inner = self.inner.lock();
239        inner.watch(oper, cx);
240        self.is_empty.store(
241            inner.selectors.is_empty() && inner.observers.is_empty(),
242            Ordering::SeqCst,
243        );
244    }
245
246    /// Unregisters an operation waiting to be ready.
247    #[inline]
248    pub(crate) fn unwatch(&self, oper: Operation) {
249        let mut inner = self.inner.lock();
250        inner.unwatch(oper);
251        self.is_empty.store(
252            inner.selectors.is_empty() && inner.observers.is_empty(),
253            Ordering::SeqCst,
254        );
255    }
256
257    /// Notifies all threads that the channel is disconnected.
258    #[inline]
259    pub(crate) fn disconnect(&self) {
260        let mut inner = self.inner.lock();
261        inner.disconnect();
262        self.is_empty.store(
263            inner.selectors.is_empty() && inner.observers.is_empty(),
264            Ordering::SeqCst,
265        );
266    }
267}
268
269impl Drop for SyncWaker {
270    #[inline]
271    fn drop(&mut self) {
272        debug_assert_eq!(self.is_empty.load(Ordering::SeqCst), true);
273    }
274}
275
/// Returns the id of the current thread.
///
/// The id is cached in a thread-local so that repeated calls avoid going through
/// `thread::current()` each time.
#[inline]
fn current_thread_id() -> ThreadId {
    thread_local! {
        /// Cached thread-local id.
        static THREAD_ID: ThreadId = thread::current().id();
    }

    match THREAD_ID.try_with(|cached| *cached) {
        Ok(id) => id,
        // The thread-local is not accessible (`try_with` reports this instead of panicking),
        // so query the id directly.
        Err(_) => thread::current().id(),
    }
}