async_task/raw.rs
1use alloc::alloc::Layout;
2use core::cell::UnsafeCell;
3use core::future::Future;
4use core::mem::{self, ManuallyDrop};
5use core::pin::Pin;
6use core::ptr::NonNull;
7use core::sync::atomic::{AtomicUsize, Ordering};
8use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
9
10use crate::header::Header;
11use crate::state::*;
12use crate::utils::{abort, abort_on_panic, extend};
13use crate::Runnable;
14
/// Table of function pointers through which a task is driven.
///
/// Every entry receives the raw task pointer as a `*const ()`.
pub(crate) struct TaskVTable {
    /// Passes the task to its schedule function.
    pub(crate) schedule: unsafe fn(*const ()),

    /// Drops the future stored inside the task.
    pub(crate) drop_future: unsafe fn(*const ()),

    /// Returns a pointer to the slot where the output is stored after completion.
    pub(crate) get_output: unsafe fn(*const ()) -> *const (),

    /// Releases one task reference (`Runnable` or `Waker`).
    pub(crate) drop_ref: unsafe fn(*const ()),

    /// Drops the schedule function and deallocates the task.
    pub(crate) destroy: unsafe fn(*const ()),

    /// Runs the task; yields `true` when the task was scheduled again as part of the run.
    pub(crate) run: unsafe fn(*const ()) -> bool,

    /// Creates a new waker associated with the task.
    pub(crate) clone_waker: unsafe fn(*const ()) -> RawWaker,
}
38
/// Describes how a task is laid out in memory.
///
/// It carries two kinds of information:
///
/// 1. The layout used to allocate and deallocate the task.
/// 2. The byte offsets at which the task's fields live.
#[derive(Copy, Clone)]
pub(crate) struct TaskLayout {
    /// Layout of the entire task allocation.
    pub(crate) layout: Layout,

    /// Byte offset of the schedule function within the task.
    pub(crate) offset_s: usize,

    /// Byte offset of the future within the task.
    pub(crate) offset_f: usize,

    /// Byte offset of the output within the task.
    pub(crate) offset_r: usize,
}
59
60/// Raw pointers to the fields inside a task.
61pub(crate) struct RawTask<F, T, S> {
62 /// The task header.
63 pub(crate) header: *const Header,
64
65 /// The schedule function.
66 pub(crate) schedule: *const S,
67
68 /// The future.
69 pub(crate) future: *mut F,
70
71 /// The output of the future.
72 pub(crate) output: *mut T,
73}
74
75impl<F, T, S> Copy for RawTask<F, T, S> {}
76
77impl<F, T, S> Clone for RawTask<F, T, S> {
78 fn clone(&self) -> Self {
79 *self
80 }
81}
82
83impl<F, T, S> RawTask<F, T, S>
84where
85 F: Future<Output = T>,
86 S: Fn(Runnable),
87{
88 const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
89 Self::clone_waker,
90 Self::wake,
91 Self::wake_by_ref,
92 Self::drop_waker,
93 );
94
95 /// Allocates a task with the given `future` and `schedule` function.
96 ///
97 /// It is assumed that initially only the `Runnable` and the `Task` exist.
98 pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> {
99 // Compute the layout of the task for allocation. Abort if the computation fails.
100 let task_layout = abort_on_panic(|| Self::task_layout());
101
102 unsafe {
103 // Allocate enough space for the entire task.
104 let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
105 None => abort(),
106 Some(p) => p,
107 };
108
109 let raw = Self::from_ptr(ptr.as_ptr());
110
111 // Write the header as the first field of the task.
112 (raw.header as *mut Header).write(Header {
113 state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
114 awaiter: UnsafeCell::new(None),
115 vtable: &TaskVTable {
116 schedule: Self::schedule,
117 drop_future: Self::drop_future,
118 get_output: Self::get_output,
119 drop_ref: Self::drop_ref,
120 destroy: Self::destroy,
121 run: Self::run,
122 clone_waker: Self::clone_waker,
123 },
124 });
125
126 // Write the schedule function as the third field of the task.
127 (raw.schedule as *mut S).write(schedule);
128
129 // Write the future as the fourth field of the task.
130 raw.future.write(future);
131
132 ptr
133 }
134 }
135
136 /// Creates a `RawTask` from a raw task pointer.
137 #[inline]
138 pub(crate) fn from_ptr(ptr: *const ()) -> Self {
139 let task_layout = Self::task_layout();
140 let p = ptr as *const u8;
141
142 unsafe {
143 Self {
144 header: p as *const Header,
145 schedule: p.add(task_layout.offset_s) as *const S,
146 future: p.add(task_layout.offset_f) as *mut F,
147 output: p.add(task_layout.offset_r) as *mut T,
148 }
149 }
150 }
151
152 /// Returns the memory layout for a task.
153 #[inline]
154 fn task_layout() -> TaskLayout {
155 // Compute the layouts for `Header`, `S`, `F`, and `T`.
156 let layout_header = Layout::new::<Header>();
157 let layout_s = Layout::new::<S>();
158 let layout_f = Layout::new::<F>();
159 let layout_r = Layout::new::<T>();
160
161 // Compute the layout for `union { F, T }`.
162 let size_union = layout_f.size().max(layout_r.size());
163 let align_union = layout_f.align().max(layout_r.align());
164 let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };
165
166 // Compute the layout for `Header` followed `S` and `union { F, T }`.
167 let layout = layout_header;
168 let (layout, offset_s) = extend(layout, layout_s);
169 let (layout, offset_union) = extend(layout, layout_union);
170 let offset_f = offset_union;
171 let offset_r = offset_union;
172
173 TaskLayout {
174 layout,
175 offset_s,
176 offset_f,
177 offset_r,
178 }
179 }
180
181 /// Wakes a waker.
182 unsafe fn wake(ptr: *const ()) {
183 // This is just an optimization. If the schedule function has captured variables, then
184 // we'll do less reference counting if we wake the waker by reference and then drop it.
185 if mem::size_of::<S>() > 0 {
186 Self::wake_by_ref(ptr);
187 Self::drop_waker(ptr);
188 return;
189 }
190
191 let raw = Self::from_ptr(ptr);
192
193 let mut state = (*raw.header).state.load(Ordering::Acquire);
194
195 loop {
196 // If the task is completed or closed, it can't be woken up.
197 if state & (COMPLETED | CLOSED) != 0 {
198 // Drop the waker.
199 Self::drop_waker(ptr);
200 break;
201 }
202
203 // If the task is already scheduled, we just need to synchronize with the thread that
204 // will run the task by "publishing" our current view of the memory.
205 if state & SCHEDULED != 0 {
206 // Update the state without actually modifying it.
207 match (*raw.header).state.compare_exchange_weak(
208 state,
209 state,
210 Ordering::AcqRel,
211 Ordering::Acquire,
212 ) {
213 Ok(_) => {
214 // Drop the waker.
215 Self::drop_waker(ptr);
216 break;
217 }
218 Err(s) => state = s,
219 }
220 } else {
221 // Mark the task as scheduled.
222 match (*raw.header).state.compare_exchange_weak(
223 state,
224 state | SCHEDULED,
225 Ordering::AcqRel,
226 Ordering::Acquire,
227 ) {
228 Ok(_) => {
229 // If the task is not yet scheduled and isn't currently running, now is the
230 // time to schedule it.
231 if state & RUNNING == 0 {
232 // Schedule the task.
233 Self::schedule(ptr);
234 } else {
235 // Drop the waker.
236 Self::drop_waker(ptr);
237 }
238
239 break;
240 }
241 Err(s) => state = s,
242 }
243 }
244 }
245 }
246
247 /// Wakes a waker by reference.
248 unsafe fn wake_by_ref(ptr: *const ()) {
249 let raw = Self::from_ptr(ptr);
250
251 let mut state = (*raw.header).state.load(Ordering::Acquire);
252
253 loop {
254 // If the task is completed or closed, it can't be woken up.
255 if state & (COMPLETED | CLOSED) != 0 {
256 break;
257 }
258
259 // If the task is already scheduled, we just need to synchronize with the thread that
260 // will run the task by "publishing" our current view of the memory.
261 if state & SCHEDULED != 0 {
262 // Update the state without actually modifying it.
263 match (*raw.header).state.compare_exchange_weak(
264 state,
265 state,
266 Ordering::AcqRel,
267 Ordering::Acquire,
268 ) {
269 Ok(_) => break,
270 Err(s) => state = s,
271 }
272 } else {
273 // If the task is not running, we can schedule right away.
274 let new = if state & RUNNING == 0 {
275 (state | SCHEDULED) + REFERENCE
276 } else {
277 state | SCHEDULED
278 };
279
280 // Mark the task as scheduled.
281 match (*raw.header).state.compare_exchange_weak(
282 state,
283 new,
284 Ordering::AcqRel,
285 Ordering::Acquire,
286 ) {
287 Ok(_) => {
288 // If the task is not running, now is the time to schedule.
289 if state & RUNNING == 0 {
290 // If the reference count overflowed, abort.
291 if state > isize::max_value() as usize {
292 abort();
293 }
294
295 // Schedule the task. There is no need to call `Self::schedule(ptr)`
296 // because the schedule function cannot be destroyed while the waker is
297 // still alive.
298 let task = Runnable {
299 ptr: NonNull::new_unchecked(ptr as *mut ()),
300 };
301 (*raw.schedule)(task);
302 }
303
304 break;
305 }
306 Err(s) => state = s,
307 }
308 }
309 }
310 }
311
312 /// Clones a waker.
313 unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
314 let raw = Self::from_ptr(ptr);
315
316 // Increment the reference count. With any kind of reference-counted data structure,
317 // relaxed ordering is appropriate when incrementing the counter.
318 let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);
319
320 // If the reference count overflowed, abort.
321 if state > isize::max_value() as usize {
322 abort();
323 }
324
325 RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
326 }
327
328 /// Drops a waker.
329 ///
330 /// This function will decrement the reference count. If it drops down to zero, the associated
331 /// `Task` has been dropped too, and the task has not been completed, then it will get
332 /// scheduled one more time so that its future gets dropped by the executor.
333 #[inline]
334 unsafe fn drop_waker(ptr: *const ()) {
335 let raw = Self::from_ptr(ptr);
336
337 // Decrement the reference count.
338 let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
339
340 // If this was the last reference to the task and the `Task` has been dropped too,
341 // then we need to decide how to destroy the task.
342 if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
343 if new & (COMPLETED | CLOSED) == 0 {
344 // If the task was not completed nor closed, close it and schedule one more time so
345 // that its future gets dropped by the executor.
346 (*raw.header)
347 .state
348 .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
349 Self::schedule(ptr);
350 } else {
351 // Otherwise, destroy the task right away.
352 Self::destroy(ptr);
353 }
354 }
355 }
356
357 /// Drops a task reference (`Runnable` or `Waker`).
358 ///
359 /// This function will decrement the reference count. If it drops down to zero and the
360 /// associated `Task` handle has been dropped too, then the task gets destroyed.
361 #[inline]
362 unsafe fn drop_ref(ptr: *const ()) {
363 let raw = Self::from_ptr(ptr);
364
365 // Decrement the reference count.
366 let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
367
368 // If this was the last reference to the task and the `Task` has been dropped too,
369 // then destroy the task.
370 if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
371 Self::destroy(ptr);
372 }
373 }
374
375 /// Schedules a task for running.
376 ///
377 /// This function doesn't modify the state of the task. It only passes the task reference to
378 /// its schedule function.
379 unsafe fn schedule(ptr: *const ()) {
380 let raw = Self::from_ptr(ptr);
381
382 // If the schedule function has captured variables, create a temporary waker that prevents
383 // the task from getting deallocated while the function is being invoked.
384 let _waker;
385 if mem::size_of::<S>() > 0 {
386 _waker = Waker::from_raw(Self::clone_waker(ptr));
387 }
388
389 let task = Runnable {
390 ptr: NonNull::new_unchecked(ptr as *mut ()),
391 };
392 (*raw.schedule)(task);
393 }
394
395 /// Drops the future inside a task.
396 #[inline]
397 unsafe fn drop_future(ptr: *const ()) {
398 let raw = Self::from_ptr(ptr);
399
400 // We need a safeguard against panics because the destructor can panic.
401 abort_on_panic(|| {
402 raw.future.drop_in_place();
403 })
404 }
405
406 /// Returns a pointer to the output inside a task.
407 unsafe fn get_output(ptr: *const ()) -> *const () {
408 let raw = Self::from_ptr(ptr);
409 raw.output as *const ()
410 }
411
412 /// Cleans up task's resources and deallocates it.
413 ///
414 /// The schedule function will be dropped, and the task will then get deallocated.
415 /// The task must be closed before this function is called.
416 #[inline]
417 unsafe fn destroy(ptr: *const ()) {
418 let raw = Self::from_ptr(ptr);
419 let task_layout = Self::task_layout();
420
421 // We need a safeguard against panics because destructors can panic.
422 abort_on_panic(|| {
423 // Drop the schedule function.
424 (raw.schedule as *mut S).drop_in_place();
425 });
426
427 // Finally, deallocate the memory reserved by the task.
428 alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
429 }
430
431 /// Runs a task.
432 ///
433 /// If polling its future panics, the task will be closed and the panic will be propagated into
434 /// the caller.
435 unsafe fn run(ptr: *const ()) -> bool {
436 let raw = Self::from_ptr(ptr);
437
438 // Create a context from the raw task pointer and the vtable inside the its header.
439 let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
440 let cx = &mut Context::from_waker(&waker);
441
442 let mut state = (*raw.header).state.load(Ordering::Acquire);
443
444 // Update the task's state before polling its future.
445 loop {
446 // If the task has already been closed, drop the task reference and return.
447 if state & CLOSED != 0 {
448 // Drop the future.
449 Self::drop_future(ptr);
450
451 // Mark the task as unscheduled.
452 let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
453
454 // Take the awaiter out.
455 let mut awaiter = None;
456 if state & AWAITER != 0 {
457 awaiter = (*raw.header).take(None);
458 }
459
460 // Drop the task reference.
461 Self::drop_ref(ptr);
462
463 // Notify the awaiter that the future has been dropped.
464 if let Some(w) = awaiter {
465 abort_on_panic(|| w.wake());
466 }
467 return false;
468 }
469
470 // Mark the task as unscheduled and running.
471 match (*raw.header).state.compare_exchange_weak(
472 state,
473 (state & !SCHEDULED) | RUNNING,
474 Ordering::AcqRel,
475 Ordering::Acquire,
476 ) {
477 Ok(_) => {
478 // Update the state because we're continuing with polling the future.
479 state = (state & !SCHEDULED) | RUNNING;
480 break;
481 }
482 Err(s) => state = s,
483 }
484 }
485
486 // Poll the inner future, but surround it with a guard that closes the task in case polling
487 // panics.
488 let guard = Guard(raw);
489 let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
490 mem::forget(guard);
491
492 match poll {
493 Poll::Ready(out) => {
494 // Replace the future with its output.
495 Self::drop_future(ptr);
496 raw.output.write(out);
497
498 // The task is now completed.
499 loop {
500 // If the `Task` is dropped, we'll need to close it and drop the output.
501 let new = if state & TASK == 0 {
502 (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
503 } else {
504 (state & !RUNNING & !SCHEDULED) | COMPLETED
505 };
506
507 // Mark the task as not running and completed.
508 match (*raw.header).state.compare_exchange_weak(
509 state,
510 new,
511 Ordering::AcqRel,
512 Ordering::Acquire,
513 ) {
514 Ok(_) => {
515 // If the `Task` is dropped or if the task was closed while running,
516 // now it's time to drop the output.
517 if state & TASK == 0 || state & CLOSED != 0 {
518 // Drop the output.
519 abort_on_panic(|| raw.output.drop_in_place());
520 }
521
522 // Take the awaiter out.
523 let mut awaiter = None;
524 if state & AWAITER != 0 {
525 awaiter = (*raw.header).take(None);
526 }
527
528 // Drop the task reference.
529 Self::drop_ref(ptr);
530
531 // Notify the awaiter that the future has been dropped.
532 if let Some(w) = awaiter {
533 abort_on_panic(|| w.wake());
534 }
535 break;
536 }
537 Err(s) => state = s,
538 }
539 }
540 }
541 Poll::Pending => {
542 let mut future_dropped = false;
543
544 // The task is still not completed.
545 loop {
546 // If the task was closed while running, we'll need to unschedule in case it
547 // was woken up and then destroy it.
548 let new = if state & CLOSED != 0 {
549 state & !RUNNING & !SCHEDULED
550 } else {
551 state & !RUNNING
552 };
553
554 if state & CLOSED != 0 && !future_dropped {
555 // The thread that closed the task didn't drop the future because it was
556 // running so now it's our responsibility to do so.
557 Self::drop_future(ptr);
558 future_dropped = true;
559 }
560
561 // Mark the task as not running.
562 match (*raw.header).state.compare_exchange_weak(
563 state,
564 new,
565 Ordering::AcqRel,
566 Ordering::Acquire,
567 ) {
568 Ok(state) => {
569 // If the task was closed while running, we need to notify the awaiter.
570 // If the task was woken up while running, we need to schedule it.
571 // Otherwise, we just drop the task reference.
572 if state & CLOSED != 0 {
573 // Take the awaiter out.
574 let mut awaiter = None;
575 if state & AWAITER != 0 {
576 awaiter = (*raw.header).take(None);
577 }
578
579 // Drop the task reference.
580 Self::drop_ref(ptr);
581
582 // Notify the awaiter that the future has been dropped.
583 if let Some(w) = awaiter {
584 abort_on_panic(|| w.wake());
585 }
586 } else if state & SCHEDULED != 0 {
587 // The thread that woke the task up didn't reschedule it because
588 // it was running so now it's our responsibility to do so.
589 Self::schedule(ptr);
590 return true;
591 } else {
592 // Drop the task reference.
593 Self::drop_ref(ptr);
594 }
595 break;
596 }
597 Err(s) => state = s,
598 }
599 }
600 }
601 }
602
603 return false;
604
605 /// A guard that closes the task if polling its future panics.
606 struct Guard<F, T, S>(RawTask<F, T, S>)
607 where
608 F: Future<Output = T>,
609 S: Fn(Runnable);
610
611 impl<F, T, S> Drop for Guard<F, T, S>
612 where
613 F: Future<Output = T>,
614 S: Fn(Runnable),
615 {
616 fn drop(&mut self) {
617 let raw = self.0;
618 let ptr = raw.header as *const ();
619
620 unsafe {
621 let mut state = (*raw.header).state.load(Ordering::Acquire);
622
623 loop {
624 // If the task was closed while running, then unschedule it, drop its
625 // future, and drop the task reference.
626 if state & CLOSED != 0 {
627 // The thread that closed the task didn't drop the future because it
628 // was running so now it's our responsibility to do so.
629 RawTask::<F, T, S>::drop_future(ptr);
630
631 // Mark the task as not running and not scheduled.
632 (*raw.header)
633 .state
634 .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);
635
636 // Take the awaiter out.
637 let mut awaiter = None;
638 if state & AWAITER != 0 {
639 awaiter = (*raw.header).take(None);
640 }
641
642 // Drop the task reference.
643 RawTask::<F, T, S>::drop_ref(ptr);
644
645 // Notify the awaiter that the future has been dropped.
646 if let Some(w) = awaiter {
647 abort_on_panic(|| w.wake());
648 }
649 break;
650 }
651
652 // Mark the task as not running, not scheduled, and closed.
653 match (*raw.header).state.compare_exchange_weak(
654 state,
655 (state & !RUNNING & !SCHEDULED) | CLOSED,
656 Ordering::AcqRel,
657 Ordering::Acquire,
658 ) {
659 Ok(state) => {
660 // Drop the future because the task is now closed.
661 RawTask::<F, T, S>::drop_future(ptr);
662
663 // Take the awaiter out.
664 let mut awaiter = None;
665 if state & AWAITER != 0 {
666 awaiter = (*raw.header).take(None);
667 }
668
669 // Drop the task reference.
670 RawTask::<F, T, S>::drop_ref(ptr);
671
672 // Notify the awaiter that the future has been dropped.
673 if let Some(w) = awaiter {
674 abort_on_panic(|| w.wake());
675 }
676 break;
677 }
678 Err(s) => state = s,
679 }
680 }
681 }
682 }
683 }
684 }
685}