async_task/
task.rs

1use core::fmt;
2use core::future::Future;
3use core::marker::{PhantomData, Unpin};
4use core::mem;
5use core::pin::Pin;
6use core::ptr::NonNull;
7use core::sync::atomic::Ordering;
8use core::task::{Context, Poll};
9
10use crate::header::Header;
11use crate::state::*;
12
/// A spawned task.
///
/// A [`Task`] can be awaited to retrieve the output of its future.
///
/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the
/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a
/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()]
/// method.
///
/// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor
/// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking
/// [`run()`][`super::Runnable::run()`].
///
/// # Examples
///
/// ```
/// use smol::{future, Executor};
/// use std::thread;
///
/// let ex = Executor::new();
///
/// // Spawn a future onto the executor.
/// let task = ex.spawn(async {
///     println!("Hello from a task!");
///     1 + 2
/// });
///
/// // Run an executor thread.
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
///
/// // Wait for the task's output.
/// assert_eq!(future::block_on(task), 3);
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
pub struct Task<T> {
    /// A raw, type-erased task pointer.
    ///
    /// The methods on `Task` reinterpret this pointer as `*const Header` to reach the task's
    /// state word and vtable.
    pub(crate) ptr: NonNull<()>,

    /// A marker capturing generic type `T` (the task's output type); it stores no data.
    pub(crate) _marker: PhantomData<T>,
}
54
55unsafe impl<T: Send> Send for Task<T> {}
56unsafe impl<T> Sync for Task<T> {}
57
58impl<T> Unpin for Task<T> {}
59
// With the `std` feature enabled, mark the handle as unwind-safe for every `T`, independent of
// whether `T` itself is unwind-safe.
// NOTE(review): presumably justified because a panic inside the task is surfaced through the
// task's state rather than through broken invariants observable via the handle — confirm.
#[cfg(feature = "std")]
impl<T> std::panic::UnwindSafe for Task<T> {}
#[cfg(feature = "std")]
impl<T> std::panic::RefUnwindSafe for Task<T> {}
64
65impl<T> Task<T> {
66    /// Detaches the task to let it keep running in the background.
67    ///
68    /// # Examples
69    ///
70    /// ```
71    /// use smol::{Executor, Timer};
72    /// use std::time::Duration;
73    ///
74    /// let ex = Executor::new();
75    ///
76    /// // Spawn a deamon future.
77    /// ex.spawn(async {
78    ///     loop {
79    ///         println!("I'm a daemon task looping forever.");
80    ///         Timer::after(Duration::from_secs(1)).await;
81    ///     }
82    /// })
83    /// .detach();
84    /// ```
85    pub fn detach(self) {
86        let mut this = self;
87        let _out = this.set_detached();
88        mem::forget(this);
89    }
90
91    /// Cancels the task and waits for it to stop running.
92    ///
93    /// Returns the task's output if it was completed just before it got canceled, or [`None`] if
94    /// it didn't complete.
95    ///
96    /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
97    /// canceling because it also waits for the task to stop running.
98    ///
99    /// # Examples
100    ///
101    /// ```
102    /// use smol::{future, Executor, Timer};
103    /// use std::thread;
104    /// use std::time::Duration;
105    ///
106    /// let ex = Executor::new();
107    ///
108    /// // Spawn a deamon future.
109    /// let task = ex.spawn(async {
110    ///     loop {
111    ///         println!("Even though I'm in an infinite loop, you can still cancel me!");
112    ///         Timer::after(Duration::from_secs(1)).await;
113    ///     }
114    /// });
115    ///
116    /// // Run an executor thread.
117    /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
118    ///
119    /// future::block_on(async {
120    ///     Timer::after(Duration::from_secs(3)).await;
121    ///     task.cancel().await;
122    /// });
123    /// ```
124    pub async fn cancel(self) -> Option<T> {
125        let mut this = self;
126        this.set_canceled();
127
128        struct Fut<T>(Task<T>);
129
130        impl<T> Future for Fut<T> {
131            type Output = Option<T>;
132
133            fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
134                self.0.poll_task(cx)
135            }
136        }
137
138        Fut(this).await
139    }
140
141    /// Puts the task in canceled state.
142    fn set_canceled(&mut self) {
143        let ptr = self.ptr.as_ptr();
144        let header = ptr as *const Header;
145
146        unsafe {
147            let mut state = (*header).state.load(Ordering::Acquire);
148
149            loop {
150                // If the task has been completed or closed, it can't be canceled.
151                if state & (COMPLETED | CLOSED) != 0 {
152                    break;
153                }
154
155                // If the task is not scheduled nor running, we'll need to schedule it.
156                let new = if state & (SCHEDULED | RUNNING) == 0 {
157                    (state | SCHEDULED | CLOSED) + REFERENCE
158                } else {
159                    state | CLOSED
160                };
161
162                // Mark the task as closed.
163                match (*header).state.compare_exchange_weak(
164                    state,
165                    new,
166                    Ordering::AcqRel,
167                    Ordering::Acquire,
168                ) {
169                    Ok(_) => {
170                        // If the task is not scheduled nor running, schedule it one more time so
171                        // that its future gets dropped by the executor.
172                        if state & (SCHEDULED | RUNNING) == 0 {
173                            ((*header).vtable.schedule)(ptr);
174                        }
175
176                        // Notify the awaiter that the task has been closed.
177                        if state & AWAITER != 0 {
178                            (*header).notify(None);
179                        }
180
181                        break;
182                    }
183                    Err(s) => state = s,
184                }
185            }
186        }
187    }
188
189    /// Puts the task in detached state.
190    fn set_detached(&mut self) -> Option<T> {
191        let ptr = self.ptr.as_ptr();
192        let header = ptr as *const Header;
193
194        unsafe {
195            // A place where the output will be stored in case it needs to be dropped.
196            let mut output = None;
197
198            // Optimistically assume the `Task` is being detached just after creating the task.
199            // This is a common case so if the `Task` is datached, the overhead of it is only one
200            // compare-exchange operation.
201            if let Err(mut state) = (*header).state.compare_exchange_weak(
202                SCHEDULED | TASK | REFERENCE,
203                SCHEDULED | REFERENCE,
204                Ordering::AcqRel,
205                Ordering::Acquire,
206            ) {
207                loop {
208                    // If the task has been completed but not yet closed, that means its output
209                    // must be dropped.
210                    if state & COMPLETED != 0 && state & CLOSED == 0 {
211                        // Mark the task as closed in order to grab its output.
212                        match (*header).state.compare_exchange_weak(
213                            state,
214                            state | CLOSED,
215                            Ordering::AcqRel,
216                            Ordering::Acquire,
217                        ) {
218                            Ok(_) => {
219                                // Read the output.
220                                output =
221                                    Some((((*header).vtable.get_output)(ptr) as *mut T).read());
222
223                                // Update the state variable because we're continuing the loop.
224                                state |= CLOSED;
225                            }
226                            Err(s) => state = s,
227                        }
228                    } else {
229                        // If this is the last reference to the task and it's not closed, then
230                        // close it and schedule one more time so that its future gets dropped by
231                        // the executor.
232                        let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
233                            SCHEDULED | CLOSED | REFERENCE
234                        } else {
235                            state & !TASK
236                        };
237
238                        // Unset the `TASK` flag.
239                        match (*header).state.compare_exchange_weak(
240                            state,
241                            new,
242                            Ordering::AcqRel,
243                            Ordering::Acquire,
244                        ) {
245                            Ok(_) => {
246                                // If this is the last reference to the task, we need to either
247                                // schedule dropping its future or destroy it.
248                                if state & !(REFERENCE - 1) == 0 {
249                                    if state & CLOSED == 0 {
250                                        ((*header).vtable.schedule)(ptr);
251                                    } else {
252                                        ((*header).vtable.destroy)(ptr);
253                                    }
254                                }
255
256                                break;
257                            }
258                            Err(s) => state = s,
259                        }
260                    }
261                }
262            }
263
264            output
265        }
266    }
267
268    /// Polls the task to retrieve its output.
269    ///
270    /// Returns `Some` if the task has completed or `None` if it was closed.
271    ///
272    /// A task becomes closed in the following cases:
273    ///
274    /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
275    /// 2. Its output gets awaited by the `Task`.
276    /// 3. It panics while polling the future.
277    /// 4. It is completed and the `Task` gets dropped.
278    fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
279        let ptr = self.ptr.as_ptr();
280        let header = ptr as *const Header;
281
282        unsafe {
283            let mut state = (*header).state.load(Ordering::Acquire);
284
285            loop {
286                // If the task has been closed, notify the awaiter and return `None`.
287                if state & CLOSED != 0 {
288                    // If the task is scheduled or running, we need to wait until its future is
289                    // dropped.
290                    if state & (SCHEDULED | RUNNING) != 0 {
291                        // Replace the waker with one associated with the current task.
292                        (*header).register(cx.waker());
293
294                        // Reload the state after registering. It is possible changes occurred just
295                        // before registration so we need to check for that.
296                        state = (*header).state.load(Ordering::Acquire);
297
298                        // If the task is still scheduled or running, we need to wait because its
299                        // future is not dropped yet.
300                        if state & (SCHEDULED | RUNNING) != 0 {
301                            return Poll::Pending;
302                        }
303                    }
304
305                    // Even though the awaiter is most likely the current task, it could also be
306                    // another task.
307                    (*header).notify(Some(cx.waker()));
308                    return Poll::Ready(None);
309                }
310
311                // If the task is not completed, register the current task.
312                if state & COMPLETED == 0 {
313                    // Replace the waker with one associated with the current task.
314                    (*header).register(cx.waker());
315
316                    // Reload the state after registering. It is possible that the task became
317                    // completed or closed just before registration so we need to check for that.
318                    state = (*header).state.load(Ordering::Acquire);
319
320                    // If the task has been closed, restart.
321                    if state & CLOSED != 0 {
322                        continue;
323                    }
324
325                    // If the task is still not completed, we're blocked on it.
326                    if state & COMPLETED == 0 {
327                        return Poll::Pending;
328                    }
329                }
330
331                // Since the task is now completed, mark it as closed in order to grab its output.
332                match (*header).state.compare_exchange(
333                    state,
334                    state | CLOSED,
335                    Ordering::AcqRel,
336                    Ordering::Acquire,
337                ) {
338                    Ok(_) => {
339                        // Notify the awaiter. Even though the awaiter is most likely the current
340                        // task, it could also be another task.
341                        if state & AWAITER != 0 {
342                            (*header).notify(Some(cx.waker()));
343                        }
344
345                        // Take the output from the task.
346                        let output = ((*header).vtable.get_output)(ptr) as *mut T;
347                        return Poll::Ready(Some(output.read()));
348                    }
349                    Err(s) => state = s,
350                }
351            }
352        }
353    }
354}
355
356impl<T> Drop for Task<T> {
357    fn drop(&mut self) {
358        self.set_canceled();
359        self.set_detached();
360    }
361}
362
363impl<T> Future for Task<T> {
364    type Output = T;
365
366    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
367        match self.poll_task(cx) {
368            Poll::Ready(t) => Poll::Ready(t.expect("task has failed")),
369            Poll::Pending => Poll::Pending,
370        }
371    }
372}
373
374impl<T> fmt::Debug for Task<T> {
375    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
376        let ptr = self.ptr.as_ptr();
377        let header = ptr as *const Header;
378
379        f.debug_struct("Task")
380            .field("header", unsafe { &(*header) })
381            .finish()
382    }
383}