async_task/raw.rs

use alloc::alloc::Layout;
use core::cell::UnsafeCell;
use core::future::Future;
use core::mem::{self, ManuallyDrop};
use core::pin::Pin;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

use crate::header::Header;
use crate::state::*;
use crate::utils::{abort, abort_on_panic, extend};
use crate::Runnable;
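
// The `state` field in `Header` packs several flags together with the reference
// count; the flags used in this file (`SCHEDULED`, `RUNNING`, `COMPLETED`,
// `CLOSED`, `TASK`, `AWAITER`) and the `REFERENCE` counter increment all come
// from `crate::state`. Informally, as used below: `SCHEDULED` means the task is
// queued and awaiting `run`, `RUNNING` means its future is being polled right
// now, `COMPLETED` means the output has been written, `CLOSED` means the future
// (and possibly the output) has been or is being dropped, `TASK` means the
// `Task` handle is still alive, and `AWAITER` means a waker is parked in the
// header waiting for completion.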

/// The vtable for a task.
pub(crate) struct TaskVTable {
    /// Schedules the task.
    pub(crate) schedule: unsafe fn(*const ()),

    /// Drops the future inside the task.
    pub(crate) drop_future: unsafe fn(*const ()),

    /// Returns a pointer to the output stored after completion.
    pub(crate) get_output: unsafe fn(*const ()) -> *const (),

    /// Drops the task reference (`Runnable` or `Waker`).
    pub(crate) drop_ref: unsafe fn(ptr: *const ()),

    /// Destroys the task.
    pub(crate) destroy: unsafe fn(*const ()),

    /// Runs the task.
    pub(crate) run: unsafe fn(*const ()) -> bool,

    /// Creates a new waker associated with the task.
    pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker,
}

/// Memory layout of a task.
///
/// This struct contains the following information:
///
/// 1. How to allocate and deallocate the task.
/// 2. How to access the fields inside the task.
#[derive(Clone, Copy)]
pub(crate) struct TaskLayout {
    /// Memory layout of the whole task.
    pub(crate) layout: Layout,

    /// Offset into the task at which the schedule function is stored.
    pub(crate) offset_s: usize,

    /// Offset into the task at which the future is stored.
    pub(crate) offset_f: usize,

    /// Offset into the task at which the output is stored.
    pub(crate) offset_r: usize,
}
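
// The overall allocation, as computed by `task_layout` below (the future and
// the output can share one slot because the future is always dropped before
// the output is written):
//
//     +--------+----------+------------------+
//     | Header | schedule | union { F, T }   |
//     +--------+----------+------------------+
//     0        offset_s   offset_f == offset_r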

/// Raw pointers to the fields inside a task.
pub(crate) struct RawTask<F, T, S> {
    /// The task header.
    pub(crate) header: *const Header,

    /// The schedule function.
    pub(crate) schedule: *const S,

    /// The future.
    pub(crate) future: *mut F,

    /// The output of the future.
    pub(crate) output: *mut T,
}

impl<F, T, S> Copy for RawTask<F, T, S> {}

impl<F, T, S> Clone for RawTask<F, T, S> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<F, T, S> RawTask<F, T, S>
where
    F: Future<Output = T>,
    S: Fn(Runnable),
{
    const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
        Self::clone_waker,
        Self::wake,
        Self::wake_by_ref,
        Self::drop_waker,
    );

    /// Allocates a task with the given `future` and `schedule` function.
    ///
    /// It is assumed that initially only the `Runnable` and the `Task` exist.
    pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> {
        // Compute the layout of the task for allocation. Abort if the computation fails.
        let task_layout = abort_on_panic(|| Self::task_layout());

        unsafe {
            // Allocate enough space for the entire task.
            let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
                None => abort(),
                Some(p) => p,
            };

            let raw = Self::from_ptr(ptr.as_ptr());

            // Write the header as the first field of the task.
            (raw.header as *mut Header).write(Header {
                state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
                awaiter: UnsafeCell::new(None),
                vtable: &TaskVTable {
                    schedule: Self::schedule,
                    drop_future: Self::drop_future,
                    get_output: Self::get_output,
                    drop_ref: Self::drop_ref,
                    destroy: Self::destroy,
                    run: Self::run,
                    clone_waker: Self::clone_waker,
                },
            });

            // Write the schedule function as the second field of the task.
            (raw.schedule as *mut S).write(schedule);

            // Write the future as the third field of the task.
            raw.future.write(future);

            ptr
        }
    }
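
    // A hedged sketch of how the crate's spawn glue uses `allocate` (the exact
    // wrapper construction lives outside this module): the returned pointer is
    // shared by the `Runnable` and the `Task` handle, which is why the initial
    // state written above holds one reference (`REFERENCE`) for the `Runnable`
    // and sets the `TASK` flag for the `Task`:
    //
    //     let ptr = RawTask::<F, T, S>::allocate(future, schedule);
    //     // ...`ptr` is then wrapped into a `Runnable` and a `Task<T>`...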

    /// Creates a `RawTask` from a raw task pointer.
    #[inline]
    pub(crate) fn from_ptr(ptr: *const ()) -> Self {
        let task_layout = Self::task_layout();
        let p = ptr as *const u8;

        unsafe {
            Self {
                header: p as *const Header,
                schedule: p.add(task_layout.offset_s) as *const S,
                future: p.add(task_layout.offset_f) as *mut F,
                output: p.add(task_layout.offset_r) as *mut T,
            }
        }
    }

    /// Returns the memory layout for a task.
    #[inline]
    fn task_layout() -> TaskLayout {
        // Compute the layouts for `Header`, `S`, `F`, and `T`.
        let layout_header = Layout::new::<Header>();
        let layout_s = Layout::new::<S>();
        let layout_f = Layout::new::<F>();
        let layout_r = Layout::new::<T>();

        // Compute the layout for `union { F, T }`.
        let size_union = layout_f.size().max(layout_r.size());
        let align_union = layout_f.align().max(layout_r.align());
        let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };

        // Compute the layout for `Header` followed by `S` and `union { F, T }`.
        let layout = layout_header;
        let (layout, offset_s) = extend(layout, layout_s);
        let (layout, offset_union) = extend(layout, layout_union);
        let offset_f = offset_union;
        let offset_r = offset_union;

        TaskLayout {
            layout,
            offset_s,
            offset_f,
            offset_r,
        }
    }
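
    // A worked example with hypothetical sizes on a 64-bit target: suppose
    // `Header` occupies H bytes with alignment 8, `S` is a 16-byte closure with
    // alignment 8, `F` is 40/8, and `T` is 8/8. Then `extend` yields
    // offset_s = H and offset_f = offset_r = H + 16, the union occupies
    // max(40, 8) = 40 bytes, and the whole task is H + 56 bytes.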

    /// Wakes a waker.
    unsafe fn wake(ptr: *const ()) {
        // This is just an optimization. If the schedule function has captured variables, then
        // we'll do less reference counting if we wake the waker by reference and then drop it.
        if mem::size_of::<S>() > 0 {
            Self::wake_by_ref(ptr);
            Self::drop_waker(ptr);
            return;
        }

        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                // Drop the waker.
                Self::drop_waker(ptr);
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Drop the waker.
                        Self::drop_waker(ptr);
                        break;
                    }
                    Err(s) => state = s,
                }
            } else {
                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state | SCHEDULED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not yet scheduled and isn't currently running, now is the
                        // time to schedule it.
                        if state & RUNNING == 0 {
                            // Schedule the task.
                            Self::schedule(ptr);
                        } else {
                            // Drop the waker.
                            Self::drop_waker(ptr);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Wakes a waker by reference.
    unsafe fn wake_by_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    Err(s) => state = s,
                }
            } else {
                // If the task is not running, we can schedule right away. In that case we also
                // take a new reference, because a fresh `Runnable` will be handed to the executor
                // while this waker stays alive.
                let new = if state & RUNNING == 0 {
                    (state | SCHEDULED) + REFERENCE
                } else {
                    state | SCHEDULED
                };

                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not running, now is the time to schedule.
                        if state & RUNNING == 0 {
                            // If the reference count overflowed, abort.
                            if state > isize::max_value() as usize {
                                abort();
                            }

                            // Schedule the task. There is no need to call `Self::schedule(ptr)`
                            // because the schedule function cannot be destroyed while the waker is
                            // still alive.
                            let task = Runnable {
                                ptr: NonNull::new_unchecked(ptr as *mut ()),
                            };
                            (*raw.schedule)(task);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Clones a waker.
    unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
        let raw = Self::from_ptr(ptr);

        // Increment the reference count. With any kind of reference-counted data structure,
        // relaxed ordering is appropriate when incrementing the counter.
        let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);

        // If the reference count overflowed, abort.
        if state > isize::max_value() as usize {
            abort();
        }

        RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
    }

    /// Drops a waker.
    ///
    /// This function decrements the reference count. If the count drops to zero, the associated
    /// `Task` has been dropped too, and the task has not been completed, then the task gets
    /// scheduled one more time so that its future gets dropped by the executor.
    #[inline]
    unsafe fn drop_waker(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `Task` has been dropped too,
        // then we need to decide how to destroy the task.
        if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
            if new & (COMPLETED | CLOSED) == 0 {
                // If the task was not completed nor closed, close it and schedule one more time so
                // that its future gets dropped by the executor.
                (*raw.header)
                    .state
                    .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
                Self::schedule(ptr);
            } else {
                // Otherwise, destroy the task right away.
                Self::destroy(ptr);
            }
        }
    }
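
    // Example of the transition above: if the last `Waker` goes away while the
    // task sits idle (neither completed nor closed) and the `Task` handle is
    // already gone, the state is reset to `SCHEDULED | CLOSED | REFERENCE`,
    // where the single reference belongs to the new `Runnable`. When the
    // executor later calls `run`, the `CLOSED` branch drops the future and the
    // final `drop_ref` destroys the task.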

    /// Drops a task reference (`Runnable` or `Waker`).
    ///
    /// This function will decrement the reference count. If it drops down to zero and the
    /// associated `Task` handle has been dropped too, then the task gets destroyed.
    #[inline]
    unsafe fn drop_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `Task` has been dropped too,
        // then destroy the task.
        if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
            Self::destroy(ptr);
        }
    }

    /// Schedules a task for running.
    ///
    /// This function doesn't modify the state of the task. It only passes the task reference to
    /// its schedule function.
    unsafe fn schedule(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // If the schedule function has captured variables, create a temporary waker that prevents
        // the task from getting deallocated while the function is being invoked.
        let _waker;
        if mem::size_of::<S>() > 0 {
            _waker = Waker::from_raw(Self::clone_waker(ptr));
        }

        let task = Runnable {
            ptr: NonNull::new_unchecked(ptr as *mut ()),
        };
        (*raw.schedule)(task);
    }
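
    // A typical schedule function just pushes the `Runnable` onto a queue that
    // some worker later drains. A hedged, self-contained sketch (the channel
    // and worker here are illustrative, not part of this crate):
    //
    //     let (sender, receiver) = std::sync::mpsc::channel::<Runnable>();
    //     let schedule = move |runnable: Runnable| sender.send(runnable).unwrap();
    //     // A worker thread pops runnables and polls each task once:
    //     //     for runnable in receiver { runnable.run(); }  // lands in `run` below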

    /// Drops the future inside a task.
    #[inline]
    unsafe fn drop_future(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // We need a safeguard against panics because the destructor can panic.
        abort_on_panic(|| {
            raw.future.drop_in_place();
        })
    }

    /// Returns a pointer to the output inside a task.
    unsafe fn get_output(ptr: *const ()) -> *const () {
        let raw = Self::from_ptr(ptr);
        raw.output as *const ()
    }

    /// Cleans up the task's resources and deallocates it.
    ///
    /// The schedule function will be dropped, and the task will then get deallocated.
    /// The task must be closed before this function is called.
    #[inline]
    unsafe fn destroy(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);
        let task_layout = Self::task_layout();

        // We need a safeguard against panics because destructors can panic.
        abort_on_panic(|| {
            // Drop the schedule function.
            (raw.schedule as *mut S).drop_in_place();
        });

        // Finally, deallocate the memory reserved by the task.
        alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
    }

    /// Runs a task.
    ///
    /// If polling its future panics, the task will be closed and the panic will be propagated into
    /// the caller.
    unsafe fn run(ptr: *const ()) -> bool {
        let raw = Self::from_ptr(ptr);

        // Create a context from the raw task pointer and the vtable inside its header. The waker
        // is wrapped in `ManuallyDrop` because it merely borrows the task reference held by the
        // current `Runnable` rather than owning one, so its destructor must not run.
        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
        let cx = &mut Context::from_waker(&waker);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        // Update the task's state before polling its future.
        loop {
            // If the task has already been closed, drop its future, notify the awaiter, drop the
            // task reference, and return.
            if state & CLOSED != 0 {
                // Drop the future.
                Self::drop_future(ptr);

                // Mark the task as unscheduled.
                let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);

                // Take the awaiter out.
                let mut awaiter = None;
                if state & AWAITER != 0 {
                    awaiter = (*raw.header).take(None);
                }

                // Drop the task reference.
                Self::drop_ref(ptr);

                // Notify the awaiter that the future has been dropped.
                if let Some(w) = awaiter {
                    abort_on_panic(|| w.wake());
                }
                return false;
            }

            // Mark the task as unscheduled and running.
            match (*raw.header).state.compare_exchange_weak(
                state,
                (state & !SCHEDULED) | RUNNING,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // Update the state because we're continuing with polling the future.
                    state = (state & !SCHEDULED) | RUNNING;
                    break;
                }
                Err(s) => state = s,
            }
        }

        // Poll the inner future, but surround it with a guard that closes the task in case polling
        // panics.
        let guard = Guard(raw);
        let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
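        // Polling did not panic, so defuse the guard without running its destructor.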
        mem::forget(guard);

        match poll {
            Poll::Ready(out) => {
                // Replace the future with its output.
                Self::drop_future(ptr);
                raw.output.write(out);

                // The task is now completed.
                loop {
                    // If the `Task` is dropped, we'll need to close it and drop the output.
                    let new = if state & TASK == 0 {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
                    } else {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED
                    };

                    // Mark the task as not running and completed.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            // If the `Task` is dropped or if the task was closed while running,
                            // now it's time to drop the output.
                            if state & TASK == 0 || state & CLOSED != 0 {
                                // Drop the output.
                                abort_on_panic(|| raw.output.drop_in_place());
                            }

                            // Take the awaiter out.
                            let mut awaiter = None;
                            if state & AWAITER != 0 {
                                awaiter = (*raw.header).take(None);
                            }

                            // Drop the task reference.
                            Self::drop_ref(ptr);

                            // Notify the awaiter that the task has been completed.
                            if let Some(w) = awaiter {
                                abort_on_panic(|| w.wake());
                            }
                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
            Poll::Pending => {
                let mut future_dropped = false;

                // The task is still not completed.
                loop {
                    // If the task was closed while running, we'll need to unschedule in case it
                    // was woken up and then destroy it.
                    let new = if state & CLOSED != 0 {
                        state & !RUNNING & !SCHEDULED
                    } else {
                        state & !RUNNING
                    };

                    if state & CLOSED != 0 && !future_dropped {
                        // The thread that closed the task didn't drop the future because it was
                        // running, so now it's our responsibility to do so.
                        Self::drop_future(ptr);
                        future_dropped = true;
                    }

                    // Mark the task as not running.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(state) => {
                            // If the task was closed while running, we need to notify the awaiter.
                            // If the task was woken up while running, we need to schedule it.
                            // Otherwise, we just drop the task reference.
                            if state & CLOSED != 0 {
                                // Take the awaiter out.
                                let mut awaiter = None;
                                if state & AWAITER != 0 {
                                    awaiter = (*raw.header).take(None);
                                }

                                // Drop the task reference.
                                Self::drop_ref(ptr);

                                // Notify the awaiter that the future has been dropped.
                                if let Some(w) = awaiter {
                                    abort_on_panic(|| w.wake());
                                }
                            } else if state & SCHEDULED != 0 {
                                // The thread that woke the task up didn't reschedule it because
                                // it was running, so now it's our responsibility to do so.
                                Self::schedule(ptr);
                                return true;
                            } else {
                                // Drop the task reference.
                                Self::drop_ref(ptr);
                            }
                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
        }

        return false;

        /// A guard that closes the task if polling its future panics.
        struct Guard<F, T, S>(RawTask<F, T, S>)
        where
            F: Future<Output = T>,
            S: Fn(Runnable);

        impl<F, T, S> Drop for Guard<F, T, S>
        where
            F: Future<Output = T>,
            S: Fn(Runnable),
        {
            fn drop(&mut self) {
                let raw = self.0;
                let ptr = raw.header as *const ();

                unsafe {
                    let mut state = (*raw.header).state.load(Ordering::Acquire);

                    loop {
                        // If the task was closed while running, then unschedule it, drop its
                        // future, and drop the task reference.
                        if state & CLOSED != 0 {
                            // The thread that closed the task didn't drop the future because it
                            // was running, so now it's our responsibility to do so.
                            RawTask::<F, T, S>::drop_future(ptr);

                            // Mark the task as not running and not scheduled.
                            (*raw.header)
                                .state
                                .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);

                            // Take the awaiter out.
                            let mut awaiter = None;
                            if state & AWAITER != 0 {
                                awaiter = (*raw.header).take(None);
                            }

                            // Drop the task reference.
                            RawTask::<F, T, S>::drop_ref(ptr);

                            // Notify the awaiter that the future has been dropped.
                            if let Some(w) = awaiter {
                                abort_on_panic(|| w.wake());
                            }
                            break;
                        }

                        // Mark the task as not running, not scheduled, and closed.
                        match (*raw.header).state.compare_exchange_weak(
                            state,
                            (state & !RUNNING & !SCHEDULED) | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(state) => {
                                // Drop the future because the task is now closed.
                                RawTask::<F, T, S>::drop_future(ptr);

                                // Take the awaiter out.
                                let mut awaiter = None;
                                if state & AWAITER != 0 {
                                    awaiter = (*raw.header).take(None);
                                }

                                // Drop the task reference.
                                RawTask::<F, T, S>::drop_ref(ptr);

                                // Notify the awaiter that the future has been dropped.
                                if let Some(w) = awaiter {
                                    abort_on_panic(|| w.wake());
                                }
                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }
        }
    }
}
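
// How the public API maps onto this vtable, judging from the calls in this
// file: `Runnable::run` dispatches to `run`, `Runnable::schedule` to
// `schedule`, waker clones to `clone_waker`, waker and `Runnable` drops to
// `drop_waker`/`drop_ref`, polling the `Task` handle reads the result via
// `get_output`, and dropping the final reference ends in `destroy`.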