tokio/runtime/scheduler/
mod.rs

1cfg_rt! {
2    pub(crate) mod current_thread;
3    pub(crate) use current_thread::CurrentThread;
4
5    mod defer;
6    use defer::Defer;
7
8    pub(crate) mod inject;
9    pub(crate) use inject::Inject;
10}
11
12cfg_rt_multi_thread! {
13    mod block_in_place;
14    pub(crate) use block_in_place::block_in_place;
15
16    mod lock;
17    use lock::Lock;
18
19    pub(crate) mod multi_thread;
20    pub(crate) use multi_thread::MultiThread;
21
22    cfg_unstable! {
23        pub(crate) mod multi_thread_alt;
24        pub(crate) use multi_thread_alt::MultiThread as MultiThreadAlt;
25    }
26}
27
28use crate::runtime::driver;
29
/// Handle to a scheduler, with one variant per scheduler flavor that is
/// compiled in.
///
/// Every runtime-backed variant wraps an `Arc` around that flavor's own
/// handle type.
#[derive(Clone, Debug)]
pub(crate) enum Handle {
    /// Handle to the current-thread scheduler.
    #[cfg(feature = "rt")]
    CurrentThread(Arc<current_thread::Handle>),

    /// Handle to the multi-thread scheduler.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(Arc<multi_thread::Handle>),

    /// Handle to the alternate multi-thread scheduler (requires
    /// `tokio_unstable`).
    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    MultiThreadAlt(Arc<multi_thread_alt::Handle>),

    // TODO: This variant exists only to avoid triggering "dead code" warnings
    // in many other places in the codebase. Remove it during a later cleanup.
    #[allow(dead_code)]
    #[cfg(not(feature = "rt"))]
    Disabled,
}
47
/// Scheduler context, with one variant per scheduler flavor that is compiled
/// in.
///
/// The whole type only exists when the `rt` feature is enabled.
#[cfg(feature = "rt")]
pub(super) enum Context {
    /// Context of the current-thread scheduler.
    CurrentThread(current_thread::Context),

    /// Context of the multi-thread scheduler.
    #[cfg(feature = "rt-multi-thread")]
    MultiThread(multi_thread::Context),

    /// Context of the alternate multi-thread scheduler (requires
    /// `tokio_unstable`).
    #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
    MultiThreadAlt(multi_thread_alt::Context),
}
58
59impl Handle {
60    #[cfg_attr(not(feature = "full"), allow(dead_code))]
61    pub(crate) fn driver(&self) -> &driver::Handle {
62        match *self {
63            #[cfg(feature = "rt")]
64            Handle::CurrentThread(ref h) => &h.driver,
65
66            #[cfg(feature = "rt-multi-thread")]
67            Handle::MultiThread(ref h) => &h.driver,
68
69            #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
70            Handle::MultiThreadAlt(ref h) => &h.driver,
71
72            #[cfg(not(feature = "rt"))]
73            Handle::Disabled => unreachable!(),
74        }
75    }
76}
77
78cfg_rt! {
79    use crate::future::Future;
80    use crate::loom::sync::Arc;
81    use crate::runtime::{blocking, task::Id};
82    use crate::runtime::context;
83    use crate::task::JoinHandle;
84    use crate::util::RngSeedGenerator;
85    use std::task::Waker;
86
87    macro_rules! match_flavor {
88        ($self:expr, $ty:ident($h:ident) => $e:expr) => {
89            match $self {
90                $ty::CurrentThread($h) => $e,
91
92                #[cfg(feature = "rt-multi-thread")]
93                $ty::MultiThread($h) => $e,
94
95                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
96                $ty::MultiThreadAlt($h) => $e,
97            }
98        }
99    }
100
101    impl Handle {
102        #[track_caller]
103        pub(crate) fn current() -> Handle {
104            match context::with_current(Clone::clone) {
105                Ok(handle) => handle,
106                Err(e) => panic!("{}", e),
107            }
108        }
109
110        pub(crate) fn blocking_spawner(&self) -> &blocking::Spawner {
111            match_flavor!(self, Handle(h) => &h.blocking_spawner)
112        }
113
114        pub(crate) fn spawn<F>(&self, future: F, id: Id) -> JoinHandle<F::Output>
115        where
116            F: Future + Send + 'static,
117            F::Output: Send + 'static,
118        {
119            match self {
120                Handle::CurrentThread(h) => current_thread::Handle::spawn(h, future, id),
121
122                #[cfg(feature = "rt-multi-thread")]
123                Handle::MultiThread(h) => multi_thread::Handle::spawn(h, future, id),
124
125                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
126                Handle::MultiThreadAlt(h) => multi_thread_alt::Handle::spawn(h, future, id),
127            }
128        }
129
130        pub(crate) fn shutdown(&self) {
131            match *self {
132                Handle::CurrentThread(_) => {},
133
134                #[cfg(feature = "rt-multi-thread")]
135                Handle::MultiThread(ref h) => h.shutdown(),
136
137                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
138                Handle::MultiThreadAlt(ref h) => h.shutdown(),
139            }
140        }
141
142        pub(crate) fn seed_generator(&self) -> &RngSeedGenerator {
143            match_flavor!(self, Handle(h) => &h.seed_generator)
144        }
145
146        pub(crate) fn as_current_thread(&self) -> &Arc<current_thread::Handle> {
147            match self {
148                Handle::CurrentThread(handle) => handle,
149                #[cfg(feature = "rt-multi-thread")]
150                _ => panic!("not a CurrentThread handle"),
151            }
152        }
153
154        cfg_rt_multi_thread! {
155            cfg_unstable! {
156                pub(crate) fn expect_multi_thread_alt(&self) -> &Arc<multi_thread_alt::Handle> {
157                    match self {
158                        Handle::MultiThreadAlt(handle) => handle,
159                        _ => panic!("not a `MultiThreadAlt` handle"),
160                    }
161                }
162            }
163        }
164    }
165
166    impl Handle {
167        pub(crate) fn num_workers(&self) -> usize {
168            match self {
169                Handle::CurrentThread(_) => 1,
170                #[cfg(feature = "rt-multi-thread")]
171                Handle::MultiThread(handle) => handle.num_workers(),
172                #[cfg(all(tokio_unstable, feature = "rt-multi-thread"))]
173                Handle::MultiThreadAlt(handle) => handle.num_workers(),
174            }
175        }
176    }
177
178    cfg_unstable_metrics! {
179        use crate::runtime::{SchedulerMetrics, WorkerMetrics};
180
181        impl Handle {
182            pub(crate) fn num_blocking_threads(&self) -> usize {
183                match_flavor!(self, Handle(handle) => handle.num_blocking_threads())
184            }
185
186            pub(crate) fn num_idle_blocking_threads(&self) -> usize {
187                match_flavor!(self, Handle(handle) => handle.num_idle_blocking_threads())
188            }
189
190            pub(crate) fn active_tasks_count(&self) -> usize {
191                match_flavor!(self, Handle(handle) => handle.active_tasks_count())
192            }
193
194            pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
195                match_flavor!(self, Handle(handle) => handle.scheduler_metrics())
196            }
197
198            pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
199                match_flavor!(self, Handle(handle) => handle.worker_metrics(worker))
200            }
201
202            pub(crate) fn injection_queue_depth(&self) -> usize {
203                match_flavor!(self, Handle(handle) => handle.injection_queue_depth())
204            }
205
206            pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
207                match_flavor!(self, Handle(handle) => handle.worker_local_queue_depth(worker))
208            }
209
210            pub(crate) fn blocking_queue_depth(&self) -> usize {
211                match_flavor!(self, Handle(handle) => handle.blocking_queue_depth())
212            }
213        }
214    }
215
216    impl Context {
217        #[track_caller]
218        pub(crate) fn expect_current_thread(&self) -> &current_thread::Context {
219            match self {
220                Context::CurrentThread(context) => context,
221                #[cfg(feature = "rt-multi-thread")]
222                _ => panic!("expected `CurrentThread::Context`")
223            }
224        }
225
226        pub(crate) fn defer(&self, waker: &Waker) {
227            match_flavor!(self, Context(context) => context.defer(waker));
228        }
229
230        cfg_rt_multi_thread! {
231            #[track_caller]
232            pub(crate) fn expect_multi_thread(&self) -> &multi_thread::Context {
233                match self {
234                    Context::MultiThread(context) => context,
235                    _ => panic!("expected `MultiThread::Context`")
236                }
237            }
238
239            cfg_unstable! {
240                #[track_caller]
241                pub(crate) fn expect_multi_thread_alt(&self) -> &multi_thread_alt::Context {
242                    match self {
243                        Context::MultiThreadAlt(context) => context,
244                        _ => panic!("expected `MultiThreadAlt::Context`")
245                    }
246                }
247            }
248        }
249    }
250}
251
252cfg_not_rt! {
253    #[cfg(any(
254        feature = "net",
255        all(unix, feature = "process"),
256        all(unix, feature = "signal"),
257        feature = "time",
258    ))]
259    impl Handle {
260        #[track_caller]
261        pub(crate) fn current() -> Handle {
262            panic!("{}", crate::util::error::CONTEXT_MISSING_ERROR)
263        }
264    }
265}