tokio/runtime/
context.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
use crate::loom::thread::AccessError;
use crate::runtime::coop;

use std::cell::Cell;

#[cfg(any(feature = "rt", feature = "macros"))]
use crate::util::rand::FastRand;

cfg_rt! {
    mod blocking;
    pub(crate) use blocking::{disallow_block_in_place, try_enter_blocking_region, BlockingRegionGuard};

    mod current;
    pub(crate) use current::{with_current, try_set_current, SetCurrentGuard};

    mod runtime;
    pub(crate) use runtime::{EnterRuntime, enter_runtime};

    mod scoped;
    use scoped::Scoped;

    use crate::runtime::{scheduler, task::Id};

    use std::task::Waker;

    cfg_taskdump! {
        use crate::runtime::task::trace;
    }
}

cfg_rt_multi_thread! {
    mod runtime_mt;
    pub(crate) use runtime_mt::{current_enter_context, exit_runtime};
}

/// Per-thread runtime state. A single instance lives in the `CONTEXT`
/// thread-local declared in this module; the free functions in this module
/// read and update its fields.
struct Context {
    /// Uniquely identifies the current thread.
    ///
    /// Starts out as `None`; `thread_id()` assigns an id on first use.
    #[cfg(feature = "rt")]
    thread_id: Cell<Option<ThreadId>>,

    /// Handle to the runtime scheduler running on the current thread.
    #[cfg(feature = "rt")]
    current: current::HandleCell,

    /// Handle to the scheduler's internal "context".
    #[cfg(feature = "rt")]
    scheduler: Scoped<scheduler::Context>,

    /// Task id most recently stored through `set_current_task_id`, if any.
    /// NOTE(review): presumably the id of the task being polled on this
    /// thread -- confirm against the callers of `set_current_task_id`.
    #[cfg(feature = "rt")]
    current_task_id: Cell<Option<Id>>,

    /// Tracks if the current thread is currently driving a runtime.
    /// Note that if this is set to "entered", the current scheduler
    /// handle may not reference the runtime currently executing. This
    /// is because other runtime handles may be set to current from
    /// within a runtime.
    #[cfg(feature = "rt")]
    runtime: Cell<EnterRuntime>,

    /// Per-thread random number generator used by `thread_rng_n`.
    ///
    /// Stays `None` until `thread_rng_n` is first called on this thread.
    #[cfg(any(feature = "rt", feature = "macros"))]
    rng: Cell<Option<FastRand>>,

    /// Tracks the amount of "work" a task may still do before yielding back to
    /// the scheduler.
    budget: Cell<coop::Budget>,

    /// Task-dump tracing state, exposed through `with_trace`. Only compiled in
    /// for the unstable taskdump configuration on the targets listed below.
    #[cfg(all(
        tokio_unstable,
        tokio_taskdump,
        feature = "rt",
        target_os = "linux",
        any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
    ))]
    trace: trace::Context,
}

// The per-thread `Context` instance. It is const-initialized, so every field
// below starts from a constant "empty" value; fields that need real data
// (thread id, rng) are filled in lazily by the accessor functions in this
// module.
tokio_thread_local! {
    static CONTEXT: Context = const {
        Context {
            // Unset until `thread_id()` is first called on this thread.
            #[cfg(feature = "rt")]
            thread_id: Cell::new(None),

            // Tracks the current runtime handle to use when spawning,
            // accessing drivers, etc...
            #[cfg(feature = "rt")]
            current: current::HandleCell::new(),

            // Tracks the current scheduler internal context
            #[cfg(feature = "rt")]
            scheduler: Scoped::new(),

            // No task id is recorded until `set_current_task_id` stores one.
            #[cfg(feature = "rt")]
            current_task_id: Cell::new(None),

            // Tracks if the current thread is currently driving a runtime.
            // Note, that if this is set to "entered", the current scheduler
            // handle may not reference the runtime currently executing. This
            // is because other runtime handles may be set to current from
            // within a runtime.
            #[cfg(feature = "rt")]
            runtime: Cell::new(EnterRuntime::NotEntered),

            // Created on demand by `thread_rng_n`.
            #[cfg(any(feature = "rt", feature = "macros"))]
            rng: Cell::new(None),

            // Initial budget is whatever `Budget::unconstrained()` yields.
            budget: Cell::new(coop::Budget::unconstrained()),

            // Must use the same cfg predicate as the `trace` field on
            // `Context`, otherwise the struct literal will not match.
            #[cfg(all(
                tokio_unstable,
                tokio_taskdump,
                feature = "rt",
                target_os = "linux",
                any(
                    target_arch = "aarch64",
                    target_arch = "x86",
                    target_arch = "x86_64"
                )
            ))]
            trace: trace::Context::new(),
        }
    }
}

/// Draws a value from this thread's `FastRand` by calling `fastrand_n(n)`.
///
/// The generator is created the first time this is called on a thread and
/// its advanced state is written back to the thread-local afterwards.
/// NOTE(review): the result is presumably in `0..n` -- confirm against
/// `FastRand::fastrand_n`.
#[cfg(any(feature = "macros", all(feature = "sync", feature = "rt")))]
pub(crate) fn thread_rng_n(n: u32) -> u32 {
    CONTEXT.with(|ctx| {
        // `Cell` hands out a copy, so work on the copy and store it back.
        let mut generator = match ctx.rng.get() {
            Some(existing) => existing,
            None => FastRand::new(),
        };
        let value = generator.fastrand_n(n);
        ctx.rng.set(Some(generator));
        value
    })
}

/// Runs `f` with a reference to this thread's cooperative budget cell.
///
/// Returns `Err(AccessError)` instead of panicking when the thread-local
/// context cannot be accessed.
pub(super) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, AccessError> {
    CONTEXT.try_with(|ctx| {
        let cell = &ctx.budget;
        f(cell)
    })
}

cfg_rt! {
    use crate::runtime::ThreadId;

    /// Returns the id associated with the current thread, allocating one with
    /// `ThreadId::next()` and caching it on first use.
    ///
    /// Returns `Err(AccessError)` when the thread-local context cannot be
    /// accessed.
    pub(crate) fn thread_id() -> Result<ThreadId, AccessError> {
        CONTEXT.try_with(|ctx| {
            ctx.thread_id.get().unwrap_or_else(|| {
                // First request on this thread: mint an id and remember it.
                let fresh = ThreadId::next();
                ctx.thread_id.set(Some(fresh));
                fresh
            })
        })
    }

    /// Stores `id` as the current task id for this thread and returns the
    /// value that was stored before.
    ///
    /// If the thread-local context cannot be accessed, nothing is stored and
    /// `None` is returned.
    pub(crate) fn set_current_task_id(id: Option<Id>) -> Option<Id> {
        match CONTEXT.try_with(|ctx| ctx.current_task_id.replace(id)) {
            Ok(previous) => previous,
            Err(_) => None,
        }
    }

    /// Returns the task id currently recorded for this thread, or `None` if
    /// none is recorded or the thread-local context cannot be accessed.
    pub(crate) fn current_task_id() -> Option<Id> {
        match CONTEXT.try_with(|ctx| ctx.current_task_id.get()) {
            Ok(recorded) => recorded,
            Err(_) => None,
        }
    }

    /// Hands `waker` to the scheduler context active on this thread so it can
    /// defer the wake-up; when no scheduler context is available the waker is
    /// woken right away.
    #[track_caller]
    pub(crate) fn defer(waker: &Waker) {
        with_scheduler(|active| match active {
            Some(cx) => {
                cx.defer(waker);
            }
            None => {
                // Called from outside of the runtime, immediately wake the
                // task.
                waker.wake_by_ref();
            }
        });
    }

    /// Runs `f` through `Scoped::set` on this thread's scheduler slot, with
    /// `v` as the scheduler context, and returns the result.
    ///
    /// Panics if the thread-local context cannot be accessed, since it goes
    /// through `LocalKey::with`.
    pub(super) fn set_scheduler<R>(v: &scheduler::Context, f: impl FnOnce() -> R) -> R {
        CONTEXT.with(|ctx| {
            let slot = &ctx.scheduler;
            slot.set(v, f)
        })
    }

    /// Runs `f` with the scheduler context stored in this thread's scheduler
    /// slot, as provided by `Scoped::with`.
    ///
    /// If the thread-local context can no longer be accessed (for example
    /// while the thread's thread-locals are being destroyed), `f` is called
    /// with `None` instead of panicking. This matches the other accessors in
    /// this module, which use `try_with`, and lets callers such as `defer`
    /// fall back to their "outside of the runtime" path.
    #[track_caller]
    pub(super) fn with_scheduler<R>(f: impl FnOnce(Option<&scheduler::Context>) -> R) -> R {
        // `f` is `FnOnce` but must be reachable from two mutually exclusive
        // closures, so park it in an `Option` and let whichever one runs take it.
        let mut f = Some(f);
        CONTEXT
            .try_with(|c| {
                let f = f.take().expect("`f` is taken at most once");
                c.scheduler.with(f)
            })
            .unwrap_or_else(|_| {
                // `try_with` failed without running its closure, so `f` is
                // still available here.
                let f = f.take().expect("`f` is taken at most once");
                f(None)
            })
    }

    cfg_taskdump! {
        /// Runs `f` with this thread's trace context, returning `None` when
        /// the thread-local context cannot be accessed.
        ///
        /// # Safety
        ///
        /// Callers of this function must ensure that trace frames always
        /// form a valid linked list.
        pub(crate) unsafe fn with_trace<R>(f: impl FnOnce(&trace::Context) -> R) -> Option<R> {
            match CONTEXT.try_with(|ctx| f(&ctx.trace)) {
                Ok(output) => Some(output),
                Err(_) => None,
            }
        }
    }
}