async_task/task.rs
1use core::fmt;
2use core::future::Future;
3use core::marker::{PhantomData, Unpin};
4use core::mem;
5use core::pin::Pin;
6use core::ptr::NonNull;
7use core::sync::atomic::Ordering;
8use core::task::{Context, Poll};
9
10use crate::header::Header;
11use crate::state::*;
12
13/// A spawned task.
14///
15/// A [`Task`] can be awaited to retrieve the output of its future.
16///
17/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the
18/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a
19/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()]
20/// method.
21///
22/// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor
23/// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking
24/// [`run()`][`super::Runnable::run()`].
25///
26/// # Examples
27///
28/// ```
29/// use smol::{future, Executor};
30/// use std::thread;
31///
32/// let ex = Executor::new();
33///
34/// // Spawn a future onto the executor.
35/// let task = ex.spawn(async {
36/// println!("Hello from a task!");
37/// 1 + 2
38/// });
39///
40/// // Run an executor thread.
41/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
42///
43/// // Wait for the task's output.
44/// assert_eq!(future::block_on(task), 3);
45/// ```
46#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
47pub struct Task<T> {
48 /// A raw task pointer.
49 pub(crate) ptr: NonNull<()>,
50
51 /// A marker capturing generic type `T`.
52 pub(crate) _marker: PhantomData<T>,
53}
54
55unsafe impl<T: Send> Send for Task<T> {}
56unsafe impl<T> Sync for Task<T> {}
57
58impl<T> Unpin for Task<T> {}
59
60#[cfg(feature = "std")]
61impl<T> std::panic::UnwindSafe for Task<T> {}
62#[cfg(feature = "std")]
63impl<T> std::panic::RefUnwindSafe for Task<T> {}
64
65impl<T> Task<T> {
66 /// Detaches the task to let it keep running in the background.
67 ///
68 /// # Examples
69 ///
70 /// ```
71 /// use smol::{Executor, Timer};
72 /// use std::time::Duration;
73 ///
74 /// let ex = Executor::new();
75 ///
76 /// // Spawn a deamon future.
77 /// ex.spawn(async {
78 /// loop {
79 /// println!("I'm a daemon task looping forever.");
80 /// Timer::after(Duration::from_secs(1)).await;
81 /// }
82 /// })
83 /// .detach();
84 /// ```
85 pub fn detach(self) {
86 let mut this = self;
87 let _out = this.set_detached();
88 mem::forget(this);
89 }
90
91 /// Cancels the task and waits for it to stop running.
92 ///
93 /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
94 /// it didn't complete.
95 ///
96 /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
97 /// canceling because it also waits for the task to stop running.
98 ///
99 /// # Examples
100 ///
101 /// ```
102 /// use smol::{future, Executor, Timer};
103 /// use std::thread;
104 /// use std::time::Duration;
105 ///
106 /// let ex = Executor::new();
107 ///
108 /// // Spawn a deamon future.
109 /// let task = ex.spawn(async {
110 /// loop {
111 /// println!("Even though I'm in an infinite loop, you can still cancel me!");
112 /// Timer::after(Duration::from_secs(1)).await;
113 /// }
114 /// });
115 ///
116 /// // Run an executor thread.
117 /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
118 ///
119 /// future::block_on(async {
120 /// Timer::after(Duration::from_secs(3)).await;
121 /// task.cancel().await;
122 /// });
123 /// ```
124 pub async fn cancel(self) -> Option<T> {
125 let mut this = self;
126 this.set_canceled();
127
128 struct Fut<T>(Task<T>);
129
130 impl<T> Future for Fut<T> {
131 type Output = Option<T>;
132
133 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
134 self.0.poll_task(cx)
135 }
136 }
137
138 Fut(this).await
139 }
140
141 /// Puts the task in canceled state.
142 fn set_canceled(&mut self) {
143 let ptr = self.ptr.as_ptr();
144 let header = ptr as *const Header;
145
146 unsafe {
147 let mut state = (*header).state.load(Ordering::Acquire);
148
149 loop {
150 // If the task has been completed or closed, it can't be canceled.
151 if state & (COMPLETED | CLOSED) != 0 {
152 break;
153 }
154
155 // If the task is not scheduled nor running, we'll need to schedule it.
156 let new = if state & (SCHEDULED | RUNNING) == 0 {
157 (state | SCHEDULED | CLOSED) + REFERENCE
158 } else {
159 state | CLOSED
160 };
161
162 // Mark the task as closed.
163 match (*header).state.compare_exchange_weak(
164 state,
165 new,
166 Ordering::AcqRel,
167 Ordering::Acquire,
168 ) {
169 Ok(_) => {
170 // If the task is not scheduled nor running, schedule it one more time so
171 // that its future gets dropped by the executor.
172 if state & (SCHEDULED | RUNNING) == 0 {
173 ((*header).vtable.schedule)(ptr);
174 }
175
176 // Notify the awaiter that the task has been closed.
177 if state & AWAITER != 0 {
178 (*header).notify(None);
179 }
180
181 break;
182 }
183 Err(s) => state = s,
184 }
185 }
186 }
187 }
188
189 /// Puts the task in detached state.
190 fn set_detached(&mut self) -> Option<T> {
191 let ptr = self.ptr.as_ptr();
192 let header = ptr as *const Header;
193
194 unsafe {
195 // A place where the output will be stored in case it needs to be dropped.
196 let mut output = None;
197
198 // Optimistically assume the `Task` is being detached just after creating the task.
199 // This is a common case so if the `Task` is datached, the overhead of it is only one
200 // compare-exchange operation.
201 if let Err(mut state) = (*header).state.compare_exchange_weak(
202 SCHEDULED | TASK | REFERENCE,
203 SCHEDULED | REFERENCE,
204 Ordering::AcqRel,
205 Ordering::Acquire,
206 ) {
207 loop {
208 // If the task has been completed but not yet closed, that means its output
209 // must be dropped.
210 if state & COMPLETED != 0 && state & CLOSED == 0 {
211 // Mark the task as closed in order to grab its output.
212 match (*header).state.compare_exchange_weak(
213 state,
214 state | CLOSED,
215 Ordering::AcqRel,
216 Ordering::Acquire,
217 ) {
218 Ok(_) => {
219 // Read the output.
220 output =
221 Some((((*header).vtable.get_output)(ptr) as *mut T).read());
222
223 // Update the state variable because we're continuing the loop.
224 state |= CLOSED;
225 }
226 Err(s) => state = s,
227 }
228 } else {
229 // If this is the last reference to the task and it's not closed, then
230 // close it and schedule one more time so that its future gets dropped by
231 // the executor.
232 let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
233 SCHEDULED | CLOSED | REFERENCE
234 } else {
235 state & !TASK
236 };
237
238 // Unset the `TASK` flag.
239 match (*header).state.compare_exchange_weak(
240 state,
241 new,
242 Ordering::AcqRel,
243 Ordering::Acquire,
244 ) {
245 Ok(_) => {
246 // If this is the last reference to the task, we need to either
247 // schedule dropping its future or destroy it.
248 if state & !(REFERENCE - 1) == 0 {
249 if state & CLOSED == 0 {
250 ((*header).vtable.schedule)(ptr);
251 } else {
252 ((*header).vtable.destroy)(ptr);
253 }
254 }
255
256 break;
257 }
258 Err(s) => state = s,
259 }
260 }
261 }
262 }
263
264 output
265 }
266 }
267
268 /// Polls the task to retrieve its output.
269 ///
270 /// Returns `Some` if the task has completed or `None` if it was closed.
271 ///
272 /// A task becomes closed in the following cases:
273 ///
274 /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
275 /// 2. Its output gets awaited by the `Task`.
276 /// 3. It panics while polling the future.
277 /// 4. It is completed and the `Task` gets dropped.
278 fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
279 let ptr = self.ptr.as_ptr();
280 let header = ptr as *const Header;
281
282 unsafe {
283 let mut state = (*header).state.load(Ordering::Acquire);
284
285 loop {
286 // If the task has been closed, notify the awaiter and return `None`.
287 if state & CLOSED != 0 {
288 // If the task is scheduled or running, we need to wait until its future is
289 // dropped.
290 if state & (SCHEDULED | RUNNING) != 0 {
291 // Replace the waker with one associated with the current task.
292 (*header).register(cx.waker());
293
294 // Reload the state after registering. It is possible changes occurred just
295 // before registration so we need to check for that.
296 state = (*header).state.load(Ordering::Acquire);
297
298 // If the task is still scheduled or running, we need to wait because its
299 // future is not dropped yet.
300 if state & (SCHEDULED | RUNNING) != 0 {
301 return Poll::Pending;
302 }
303 }
304
305 // Even though the awaiter is most likely the current task, it could also be
306 // another task.
307 (*header).notify(Some(cx.waker()));
308 return Poll::Ready(None);
309 }
310
311 // If the task is not completed, register the current task.
312 if state & COMPLETED == 0 {
313 // Replace the waker with one associated with the current task.
314 (*header).register(cx.waker());
315
316 // Reload the state after registering. It is possible that the task became
317 // completed or closed just before registration so we need to check for that.
318 state = (*header).state.load(Ordering::Acquire);
319
320 // If the task has been closed, restart.
321 if state & CLOSED != 0 {
322 continue;
323 }
324
325 // If the task is still not completed, we're blocked on it.
326 if state & COMPLETED == 0 {
327 return Poll::Pending;
328 }
329 }
330
331 // Since the task is now completed, mark it as closed in order to grab its output.
332 match (*header).state.compare_exchange(
333 state,
334 state | CLOSED,
335 Ordering::AcqRel,
336 Ordering::Acquire,
337 ) {
338 Ok(_) => {
339 // Notify the awaiter. Even though the awaiter is most likely the current
340 // task, it could also be another task.
341 if state & AWAITER != 0 {
342 (*header).notify(Some(cx.waker()));
343 }
344
345 // Take the output from the task.
346 let output = ((*header).vtable.get_output)(ptr) as *mut T;
347 return Poll::Ready(Some(output.read()));
348 }
349 Err(s) => state = s,
350 }
351 }
352 }
353 }
354}
355
356impl<T> Drop for Task<T> {
357 fn drop(&mut self) {
358 self.set_canceled();
359 self.set_detached();
360 }
361}
362
363impl<T> Future for Task<T> {
364 type Output = T;
365
366 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
367 match self.poll_task(cx) {
368 Poll::Ready(t) => Poll::Ready(t.expect("task has failed")),
369 Poll::Pending => Poll::Pending,
370 }
371 }
372}
373
374impl<T> fmt::Debug for Task<T> {
375 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376 let ptr = self.ptr.as_ptr();
377 let header = ptr as *const Header;
378
379 f.debug_struct("Task")
380 .field("header", unsafe { &(*header) })
381 .finish()
382 }
383}