async_task/
runnable.rs

1use core::fmt;
2use core::future::Future;
3use core::marker::PhantomData;
4use core::mem;
5use core::ptr::NonNull;
6use core::sync::atomic::Ordering;
7use core::task::Waker;
8
9use crate::header::Header;
10use crate::raw::RawTask;
11use crate::state::*;
12use crate::Task;
13
14/// Creates a new task.
15///
16/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
17/// output.
18///
19/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
20/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
21/// again.
22///
23/// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
24/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
25/// should push it into a task queue so that it can be processed later.
26///
27/// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider
28/// using [`spawn_local()`] or [`spawn_unchecked()`] instead.
29///
30/// # Examples
31///
32/// ```
33/// // The future inside the task.
34/// let future = async {
35///     println!("Hello, world!");
36/// };
37///
38/// // A function that schedules the task when it gets woken up.
39/// let (s, r) = flume::unbounded();
40/// let schedule = move |runnable| s.send(runnable).unwrap();
41///
42/// // Create a task with the future and the schedule function.
43/// let (runnable, task) = async_task::spawn(future, schedule);
44/// ```
45pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
46where
47    F: Future + Send + 'static,
48    F::Output: Send + 'static,
49    S: Fn(Runnable) + Send + Sync + 'static,
50{
51    unsafe { spawn_unchecked(future, schedule) }
52}
53
/// Spawns a new task that is bound to the thread creating it.
///
/// This function is the same as [`spawn()`], except that `future` does not need to be [`Send`].
/// In exchange, the [`Runnable`] must be used and dropped on the spawning thread: if either
/// happens on another thread, a panic will occur.
///
/// This function is only available when the `std` feature for this crate is enabled.
///
/// # Examples
///
/// ```
/// use async_task::Runnable;
/// use flume::{Receiver, Sender};
/// use std::rc::Rc;
///
/// thread_local! {
///     // A per-thread queue of scheduled tasks.
///     static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded();
/// }
///
/// // `Rc` makes this future non-Send.
/// let msg: Rc<str> = "Hello, world!".into();
/// let future = async move {
///     println!("{}", msg);
/// };
///
/// // Whenever the task is woken, its `Runnable` is sent into the queue.
/// let s = QUEUE.with(|(s, _)| s.clone());
/// let schedule = move |runnable| s.send(runnable).unwrap();
///
/// // Build the task out of the future and the schedule function.
/// let (runnable, task) = async_task::spawn_local(future, schedule);
/// ```
#[cfg(feature = "std")]
pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
where
    F: Future + 'static,
    F::Output: 'static,
    S: Fn(Runnable) + Send + Sync + 'static,
{
    use std::mem::ManuallyDrop;
    use std::pin::Pin;
    use std::task::{Context, Poll};
    use std::thread::{self, ThreadId};

    /// Returns the id of the calling thread, reading it from a thread-local cache when possible.
    #[inline]
    fn current_thread_id() -> ThreadId {
        thread_local! {
            static CACHED_ID: ThreadId = thread::current().id();
        }
        // `try_with` fails when the thread-local is no longer accessible (e.g. during thread
        // teardown); in that case the id is queried directly instead.
        match CACHED_ID.try_with(|id| *id) {
            Ok(id) => id,
            Err(_) => thread::current().id(),
        }
    }

    /// Wraps a future together with the id of the thread that spawned it, and asserts on every
    /// poll and on drop that the current thread is still that thread.
    struct ThreadBound<F> {
        /// Id of the thread that created the wrapper.
        owner: ThreadId,
        /// The wrapped future; dropped manually so that the thread check runs first.
        future: ManuallyDrop<F>,
    }

    impl<F: Future> Future for ThreadBound<F> {
        type Output = F::Output;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            assert!(
                self.owner == current_thread_id(),
                "local task polled by a thread that didn't spawn it"
            );
            // Project the pin from the wrapper onto the inner future, then poll it.
            let inner = unsafe { self.map_unchecked_mut(|this| &mut *this.future) };
            inner.poll(cx)
        }
    }

    impl<F> Drop for ThreadBound<F> {
        fn drop(&mut self) {
            // The check comes first: if it fails, the inner future's destructor never runs.
            assert!(
                self.owner == current_thread_id(),
                "local task dropped by a thread that didn't spawn it"
            );
            unsafe {
                ManuallyDrop::drop(&mut self.future);
            }
        }
    }

    // Record the spawning thread and wrap the future so that it verifies its thread from now on.
    let owner = current_thread_id();
    let future = ManuallyDrop::new(future);
    let guarded = ThreadBound { owner, future };

    unsafe { spawn_unchecked(guarded, schedule) }
}
144
145/// Creates a new task without [`Send`], [`Sync`], and `'static` bounds.
146///
147/// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and
148/// `'static` on `future` and `schedule`.
149///
150/// Safety requirements:
151///
152/// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on the original
153///   thread.
154/// - If `future` is not `'static`, borrowed variables must outlive its [`Runnable`].
155/// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on
156///   the original thread.
157/// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`].
158///
159/// # Examples
160///
161/// ```
162/// // The future inside the task.
163/// let future = async {
164///     println!("Hello, world!");
165/// };
166///
167/// // If the task gets woken up, it will be sent into this channel.
168/// let (s, r) = flume::unbounded();
169/// let schedule = move |runnable| s.send(runnable).unwrap();
170///
171/// // Create a task with the future and the schedule function.
172/// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
173/// ```
174pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
175where
176    F: Future,
177    S: Fn(Runnable),
178{
179    // Allocate large futures on the heap.
180    let ptr = if mem::size_of::<F>() >= 2048 {
181        let future = alloc::boxed::Box::pin(future);
182        RawTask::<_, F::Output, S>::allocate(future, schedule)
183    } else {
184        RawTask::<F, F::Output, S>::allocate(future, schedule)
185    };
186
187    let runnable = Runnable { ptr };
188    let task = Task {
189        ptr,
190        _marker: PhantomData,
191    };
192    (runnable, task)
193}
194
/// The handle through which a task is run.
///
/// A spawned task has exactly one [`Runnable`] handle, and that handle only exists while the
/// task is scheduled for running.
///
/// Calling [`run()`][`Runnable::run()`] polls the task's future once and consumes the
/// [`Runnable`]. It only comes back when the task's [`Waker`] wakes the task, which schedules it
/// to be run again.
///
/// Dropping a [`Runnable`] cancels the task: its future won't be polled again, and awaiting the
/// [`Task`] afterwards will result in a panic.
///
/// # Examples
///
/// ```
/// use async_task::Runnable;
/// use once_cell::sync::Lazy;
/// use std::{panic, thread};
///
/// // A minimal executor: one thread that runs whatever arrives on the channel.
/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
///     let (sender, receiver) = flume::unbounded::<Runnable>();
///     thread::spawn(|| {
///         for runnable in receiver {
///             let _ignore_panic = panic::catch_unwind(|| runnable.run());
///         }
///     });
///     sender
/// });
///
/// // Spawn a task around a simple future.
/// let schedule = |runnable| QUEUE.send(runnable).unwrap();
/// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
///
/// // Schedule the task, then wait for its output.
/// runnable.schedule();
/// assert_eq!(smol::future::block_on(task), 3);
/// ```
pub struct Runnable {
    /// Type-erased pointer to the heap-allocated task.
    pub(crate) ptr: NonNull<()>,
}

// `NonNull<()>` is neither `Send` nor `Sync`, so the thread-safety markers are asserted by hand.
// NOTE(review): soundness depends on the task internals behind `ptr` — confirm against the task
// implementation when changing either of these.
unsafe impl Sync for Runnable {}
unsafe impl Send for Runnable {}

// The unwind-safety markers need `std::panic`, hence the feature gate.
#[cfg(feature = "std")]
impl std::panic::RefUnwindSafe for Runnable {}
#[cfg(feature = "std")]
impl std::panic::UnwindSafe for Runnable {}
245
246impl Runnable {
247    /// Schedules the task.
248    ///
249    /// This is a convenience method that passes the [`Runnable`] to the schedule function.
250    ///
251    /// # Examples
252    ///
253    /// ```
254    /// // A function that schedules the task when it gets woken up.
255    /// let (s, r) = flume::unbounded();
256    /// let schedule = move |runnable| s.send(runnable).unwrap();
257    ///
258    /// // Create a task with a simple future and the schedule function.
259    /// let (runnable, task) = async_task::spawn(async {}, schedule);
260    ///
261    /// // Schedule the task.
262    /// assert_eq!(r.len(), 0);
263    /// runnable.schedule();
264    /// assert_eq!(r.len(), 1);
265    /// ```
266    pub fn schedule(self) {
267        let ptr = self.ptr.as_ptr();
268        let header = ptr as *const Header;
269        mem::forget(self);
270
271        unsafe {
272            ((*header).vtable.schedule)(ptr);
273        }
274    }
275
276    /// Runs the task by polling its future.
277    ///
278    /// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets
279    /// rescheduled at the end of this method invocation. Otherwise, returns `false` and the
280    /// [`Runnable`] vanishes until the task is woken.
281    /// The return value is just a hint: `true` usually indicates that the task has yielded, i.e.
282    /// it woke itself and then gave the control back to the executor.
283    ///
284    /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then
285    /// this method simply destroys the task.
286    ///
287    /// If the polled future panics, this method propagates the panic, and awaiting the [`Task`]
288    /// after that will also result in a panic.
289    ///
290    /// # Examples
291    ///
292    /// ```
293    /// // A function that schedules the task when it gets woken up.
294    /// let (s, r) = flume::unbounded();
295    /// let schedule = move |runnable| s.send(runnable).unwrap();
296    ///
297    /// // Create a task with a simple future and the schedule function.
298    /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
299    ///
300    /// // Run the task and check its output.
301    /// runnable.run();
302    /// assert_eq!(smol::future::block_on(task), 3);
303    /// ```
304    pub fn run(self) -> bool {
305        let ptr = self.ptr.as_ptr();
306        let header = ptr as *const Header;
307        mem::forget(self);
308
309        unsafe { ((*header).vtable.run)(ptr) }
310    }
311
312    /// Returns a waker associated with this task.
313    ///
314    /// # Examples
315    ///
316    /// ```
317    /// use smol::future;
318    ///
319    /// // A function that schedules the task when it gets woken up.
320    /// let (s, r) = flume::unbounded();
321    /// let schedule = move |runnable| s.send(runnable).unwrap();
322    ///
323    /// // Create a task with a simple future and the schedule function.
324    /// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule);
325    ///
326    /// // Take a waker and run the task.
327    /// let waker = runnable.waker();
328    /// runnable.run();
329    ///
330    /// // Reschedule the task by waking it.
331    /// assert_eq!(r.len(), 0);
332    /// waker.wake();
333    /// assert_eq!(r.len(), 1);
334    /// ```
335    pub fn waker(&self) -> Waker {
336        let ptr = self.ptr.as_ptr();
337        let header = ptr as *const Header;
338
339        unsafe {
340            let raw_waker = ((*header).vtable.clone_waker)(ptr);
341            Waker::from_raw(raw_waker)
342        }
343    }
344}
345
346impl Drop for Runnable {
347    fn drop(&mut self) {
348        let ptr = self.ptr.as_ptr();
349        let header = ptr as *const Header;
350
351        unsafe {
352            let mut state = (*header).state.load(Ordering::Acquire);
353
354            loop {
355                // If the task has been completed or closed, it can't be canceled.
356                if state & (COMPLETED | CLOSED) != 0 {
357                    break;
358                }
359
360                // Mark the task as closed.
361                match (*header).state.compare_exchange_weak(
362                    state,
363                    state | CLOSED,
364                    Ordering::AcqRel,
365                    Ordering::Acquire,
366                ) {
367                    Ok(_) => break,
368                    Err(s) => state = s,
369                }
370            }
371
372            // Drop the future.
373            ((*header).vtable.drop_future)(ptr);
374
375            // Mark the task as unscheduled.
376            let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
377
378            // Notify the awaiter that the future has been dropped.
379            if state & AWAITER != 0 {
380                (*header).notify(None);
381            }
382
383            // Drop the task reference.
384            ((*header).vtable.drop_ref)(ptr);
385        }
386    }
387}
388
389impl fmt::Debug for Runnable {
390    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
391        let ptr = self.ptr.as_ptr();
392        let header = ptr as *const Header;
393
394        f.debug_struct("Runnable")
395            .field("header", unsafe { &(*header) })
396            .finish()
397    }
398}