1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use super::super::timer::{TimeWaker, TimerHandle, TimerHeap};
use super::{
instrumentation::{Collector, LocalCollector, WakeupReason},
packets::{PacketReceiver, PacketReceiverMap, ReceiverRegistration},
time::Time,
};
use crate::atomic_future::{AtomicFuture, AttemptPollResult};
use crossbeam::queue::SegQueue;
use fuchsia_sync::Mutex;
use fuchsia_zircon::{self as zx};
use rustc_hash::FxHashMap as HashMap;
use std::{
any::Any,
cell::RefCell,
fmt,
future::Future,
mem::ManuallyDrop,
panic::Location,
sync::atomic::{AtomicBool, AtomicI64, AtomicU16, AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Weak},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
u64, usize,
};
/// Key of the user packet that is queued on the executor's port to wake up a worker thread (see
/// `Inner::notify_id`). `Inner::worker_lifecycle` treats a packet with this key as a plain
/// wake-up notification rather than delivering it to a receiver.
pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;
/// The id of the main task, which is a virtual task that lives from construction
/// to destruction of the executor. The main task may correspond to multiple
/// main futures, in cases where the executor runs multiple times during its lifetime.
pub(crate) const MAIN_TASK_ID: usize = 0;
// The executor bound to the current thread, paired with that thread's timer heap. The slot is
// filled by `Inner::set_local`, cleared by `EHandle::rm_local`, and is `None` otherwise.
thread_local!(
static EXECUTOR: RefCell<Option<(Arc<Inner>, TimerHeap)>> = RefCell::new(None)
);
/// Runs `f` with mutable access to the timer heap stored in the current thread's `EXECUTOR`
/// slot and returns whatever `f` returns.
///
/// The thread-local `RefCell` stays mutably borrowed for the whole duration of `f`.
///
/// # Panics
///
/// Panics if no executor has been set for the current thread.
pub(crate) fn with_local_timer_heap<F, R>(f: F) -> R
where
    F: FnOnce(&mut TimerHeap) -> R,
{
    EXECUTOR.with(|cell| {
        let mut slot = cell.borrow_mut();
        let (_, timers) = slot
            .as_mut()
            .expect("can't get timer heap before fuchsia_async::Executor is initialized");
        f(timers)
    })
}
/// The source of time used by an executor (see `Inner::now`).
pub enum ExecutorTime {
/// Time is read from the monotonic clock.
RealTime,
/// Time is a stored value in nanoseconds that only changes through `Inner::set_fake_time`.
FakeTime(AtomicI64),
}
/// Outcome of `Inner::poll_ready_tasks`.
enum PollReadyTasksResult {
/// The ready queue was found empty.
NoneReady,
/// A batch of tasks was polled and no thread is sleeping; more tasks may still be ready.
MoreReady,
/// The main task was polled to completion.
MainTaskCompleted,
}
// -- Helpers for threads_state below --
/// Returns the number of sleeping threads, which is stored in the low byte of `state`.
fn threads_sleeping(state: u16) -> u8 {
    let [sleeping, _notified] = state.to_le_bytes();
    sleeping
}
/// Returns the number of pending wake-up notifications, which is stored in the high byte of
/// `state`.
fn threads_notified(state: u16) -> u8 {
    let [_sleeping, notified] = state.to_le_bytes();
    notified
}
/// Packs a sleeping-thread count (low byte) and a pending-notification count (high byte) into
/// a single `threads_state` value.
fn make_threads_state(sleeping: u8, notified: u8) -> u16 {
    u16::from_le_bytes([sleeping, notified])
}
/// State shared by every handle to, and every worker thread of, one executor.
pub(super) struct Inner {
// Port on which worker threads wait for packets and wake-up notifications.
pub(super) port: zx::Port,
// Set by `mark_done`; checked by workers, by `spawn`, and when deregistering receivers.
pub(super) done: AtomicBool,
// Whether `spawn_local` is permitted on this executor.
is_local: bool,
// Registered packet receivers, looked up by key when a packet is delivered.
receivers: Mutex<PacketReceiverMap<Arc<dyn PacketReceiver>>>,
// Source of ids for spawned tasks; starts just after `MAIN_TASK_ID`.
task_count: AtomicUsize,
task_state: Mutex<TaskState>,
// Queue of tasks waiting to be polled.
pub(super) ready_tasks: SegQueue<Arc<Task>>,
time: ExecutorTime,
pub(super) collector: Collector,
// Where the executor was created; only recorded when `trace_level_logging` is enabled.
pub(super) source: Option<&'static Location<'static>>,
// The low byte is the number of threads currently sleeping. The high byte is the number of
// wake-up notifications pending.
pub(super) threads_state: AtomicU16,
// Compared against the sleeping count in `threads_state` to tell whether all threads are asleep.
pub(super) num_threads: u8,
// Incremented once per task poll in `poll_ready_tasks`.
pub(super) polled: AtomicU64,
// Data that belongs to the user that can be accessed via EHandle::local(). See
// `TestExecutor::poll_until_stalled`.
pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
}
/// Task bookkeeping that is guarded by `Inner::task_state`.
struct TaskState {
// Tasks known to the executor, keyed by task id.
all_tasks: HashMap<usize, Arc<Task>>,
// Wakers registered by `poll_join_result`, keyed by the id of the task being joined. A waker
// is taken out and woken when its task finishes.
join_wakers: HashMap<usize, Waker>,
}
impl Inner {
#[cfg_attr(trace_level_logging, track_caller)]
pub fn new(time: ExecutorTime, is_local: bool, num_threads: u8) -> Self {
#[cfg(trace_level_logging)]
let source = Some(Location::caller());
#[cfg(not(trace_level_logging))]
let source = None;
let collector = Collector::new();
Inner {
port: zx::Port::create(),
done: AtomicBool::new(false),
is_local,
receivers: Mutex::new(PacketReceiverMap::new()),
task_count: AtomicUsize::new(MAIN_TASK_ID + 1),
task_state: Mutex::new(TaskState {
all_tasks: HashMap::default(),
join_wakers: HashMap::default(),
}),
ready_tasks: SegQueue::new(),
time,
collector,
source,
threads_state: AtomicU16::new(0),
num_threads,
polled: AtomicU64::new(0),
owner_data: Mutex::new(None),
}
}
/// Installs this executor, together with `timers`, as the current thread's executor.
///
/// # Panics
///
/// Panics if the current thread already has an executor installed.
pub fn set_local(self: Arc<Self>, timers: TimerHeap) {
    EXECUTOR.with(|cell| {
        let mut slot = cell.borrow_mut();
        assert!(slot.is_none(), "Cannot create multiple Fuchsia Executors");
        *slot = Some((self, timers));
    });
}
/// Polls tasks taken from `ready_tasks` in batches of up to 16.
///
/// Returns `NoneReady` as soon as the queue is found empty, `MainTaskCompleted` when the main
/// task completes, and `MoreReady` when a full batch has been polled and no thread is
/// sleeping. After a full batch, if some sleeping thread has not been notified yet, one
/// notification is posted and polling continues with the next batch.
fn poll_ready_tasks(&self, local_collector: &mut LocalCollector<'_>) -> PollReadyTasksResult {
loop {
// Poll a bounded batch before looking at what the other threads are doing.
for _ in 0..16 {
let Some(task) = self.ready_tasks.pop() else {
return PollReadyTasksResult::NoneReady;
};
let complete = self.try_poll(&task);
local_collector.task_polled(
task.id,
task.source(),
complete,
self.ready_tasks.len(),
);
if complete && task.id == MAIN_TASK_ID {
return PollReadyTasksResult::MainTaskCompleted;
}
// Not reached for the poll that completes the main task, because of the return above.
self.polled.fetch_add(1, Ordering::Relaxed);
}
// We didn't finish all the ready tasks. If there are sleeping threads, post a
// notification to wake one up.
let mut threads_state = self.threads_state.load(Ordering::Relaxed);
loop {
if threads_sleeping(threads_state) == 0 {
// All threads are awake now. Prevent starvation.
return PollReadyTasksResult::MoreReady;
}
if threads_notified(threads_state) >= threads_sleeping(threads_state) {
// All sleeping threads have been notified. Keep going and poll more tasks.
break;
}
// On failure `try_notify` hands back the state it observed; retry with that value.
match self.try_notify(threads_state) {
Ok(()) => break,
Err(s) => threads_state = s,
}
}
}
}
/// Registers `future` as a new task, marks it ready to run, and returns its task id.
///
/// Returns `usize::MAX` without spawning anything if the executor has already been marked
/// done.
#[cfg_attr(trace_level_logging, track_caller)]
pub fn spawn(self: &Arc<Self>, future: AtomicFuture<'static>) -> usize {
    // Prevent a deadlock in `.all_tasks` when a task is spawned from a custom
    // Drop impl while the executor is being torn down.
    let shutting_down = self.done.load(Ordering::SeqCst);
    if shutting_down {
        return usize::MAX;
    }
    let id = self.task_count.fetch_add(1, Ordering::Relaxed);
    let task = Task::new(id, future, Arc::clone(self));
    self.collector.task_created(id, task.source());
    {
        // Hold the lock only for the insertion itself.
        let mut state = self.task_state.lock();
        state.all_tasks.insert(id, Arc::clone(&task));
    }
    task.wake();
    id
}
/// Spawns a future that does not have to be `Send` and returns its task id.
///
/// `detached` is forwarded to `AtomicFuture::new_local`.
///
/// # Panics
///
/// Panics if this executor was not created as a local executor.
#[cfg_attr(trace_level_logging, track_caller)]
pub fn spawn_local<F: Future<Output = R> + 'static, R: 'static>(
    self: &Arc<Self>,
    future: F,
    detached: bool,
) -> usize {
    assert!(
        self.is_local,
        "Error: called `spawn_local` on multithreaded executor. \
         Use `spawn` or a `LocalExecutor` instead."
    );
    // SAFETY: We've confirmed that the futures here will never be used across multiple threads,
    // so the Send requirements that `new_local` requires should be met.
    let local_future = unsafe { AtomicFuture::new_local(future, detached) };
    self.spawn(local_future)
}
/// Spawns `future` as the main task, under `MAIN_TASK_ID`, and marks it ready to run.
///
/// # Panics
///
/// Panics if a main task is already registered.
pub fn spawn_main(self: &Arc<Self>, future: AtomicFuture<'static>) {
    let main_task = Task::new(MAIN_TASK_ID, future, Arc::clone(self));
    self.collector.task_created(MAIN_TASK_ID, main_task.source());
    // The lock guard and any displaced entry are temporaries of this statement, so both are
    // gone before the assertion runs.
    let displaced_existing =
        self.task_state.lock().all_tasks.insert(MAIN_TASK_ID, main_task.clone()).is_some();
    assert!(!displaced_existing, "Existing main task");
    main_task.wake();
}
/// Posts a wake-up notification after a task has been made ready, but only when every thread
/// is asleep and no notification is already pending.
pub fn notify_task_ready(&self) {
// Only post if there's no thread running (or soon to be running). If we happen to be
// running on a thread for this executor, then threads_state won't be equal to num_threads,
// which means notifications only get fired if this is from a non-async thread, or a thread
// that belongs to a different executor. We use SeqCst ordering here to make sure this load
// happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
let mut threads_state = self.threads_state.load(Ordering::SeqCst);
// We compare threads_state directly against self.num_threads (which means that
// notifications must be zero) because we only want to notify if there are no pending
// notifications and *all* threads are currently asleep.
while threads_state == self.num_threads as u16 {
// A failed attempt returns the state it observed; the loop condition is then re-checked
// against that fresher value.
match self.try_notify(threads_state) {
Ok(()) => break,
Err(s) => threads_state = s,
}
}
}
/// Attempts to record one more pending notification and, on success, posts the wake-up
/// packet.
///
/// The attempt is a single weak compare-and-exchange against `old_threads_state`. If it does
/// not succeed, the state that was actually observed is returned in `Err` so the caller can
/// decide whether to retry.
fn try_notify(&self, old_threads_state: u16) -> Result<(), u16> {
    // Adding 0x100 bumps the notification count held in the high byte.
    let with_extra_notification = old_threads_state + 0x100;
    match self.threads_state.compare_exchange_weak(
        old_threads_state,
        with_extra_notification,
        Ordering::Relaxed,
        Ordering::Relaxed,
    ) {
        Ok(_) => {
            self.notify_id(TASK_READY_WAKEUP_ID);
            Ok(())
        }
        Err(observed) => Err(observed),
    }
}
/// Posts a single wake-up notification if at least one thread is sleeping.
///
/// Nothing is posted when no thread is sleeping, when a notification is already pending, or
/// when the number of sleeping threads has dropped below the count seen on entry.
pub fn wake_one_thread(&self) {
    let mut state = self.threads_state.load(Ordering::Relaxed);
    let initially_sleeping = threads_sleeping(state);
    if initially_sleeping == 0 {
        return;
    }
    loop {
        let notification_pending = threads_notified(state) != 0;
        let fewer_sleeping = threads_sleeping(state) < initially_sleeping;
        if notification_pending || fewer_sleeping {
            return;
        }
        match self.try_notify(state) {
            Ok(()) => return,
            // Lost the race (or failed spuriously); re-evaluate with the observed state.
            Err(observed) => state = observed,
        }
    }
}
/// Queues a user packet with key `id` and an all-zero payload on the executor's port.
///
/// A failure to queue the packet is reported on stderr and otherwise ignored.
pub fn notify_id(&self, id: u64) {
    let payload = zx::UserPacket::from_u8_array([0; 32]);
    let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, payload);
    match self.port.queue(&packet) {
        Ok(_) => {}
        Err(e) => {
            // TODO: logging
            eprintln!("Failed to queue notify in port: {:?}", e);
        }
    }
}
/// Hands `packet` to the receiver registered under `key`; does nothing if there is none.
pub fn deliver_packet(&self, key: usize, packet: zx::Packet) {
    // The receiver is cloned out of the map so the lock is released at the end of this
    // statement: `receive_packet` may be arbitrarily complex and must not run under the lock.
    let Some(receiver) = self.receivers.lock().get(key).map(|entry| entry.clone()) else {
        return;
    };
    receiver.receive_packet(packet);
}
/// Returns the current time as seen by this executor: the stored fake time when fake time is
/// in use, and the monotonic clock otherwise.
pub fn now(&self) -> Time {
    if let ExecutorTime::FakeTime(nanos) = &self.time {
        return Time::from_nanos(nanos.load(Ordering::Relaxed));
    }
    Time::from_zx(zx::Time::get_monotonic())
}
/// Sets the executor's fake time to `new`.
///
/// # Panics
///
/// Panics if the executor uses real time.
pub fn set_fake_time(&self, new: Time) {
    let ExecutorTime::FakeTime(nanos) = &self.time else {
        panic!("Error: called `set_fake_time` on an executor using actual time.")
    };
    nanos.store(new.into_nanos(), Ordering::Relaxed);
}
/// Returns `true` if this executor reads time from the real clock rather than a fake time.
pub fn is_real_time(&self) -> bool {
    match self.time {
        ExecutorTime::RealTime => true,
        ExecutorTime::FakeTime(_) => false,
    }
}
/// Sets the `done` flag and posts enough wake-up notifications for every worker thread.
///
/// Must be called before `on_parent_drop`.
///
/// Done flag must be set before dropping packet receivers
/// so that future receivers that attempt to deregister themselves
/// know that it's okay if their entries are already missing.
pub fn mark_done(&self) {
self.done.store(true, Ordering::SeqCst);
// Make sure there's at least one notification outstanding per thread to wake up all
// workers. This might be more notifications than required, but this way we don't have to
// worry about races where tasks are just about to sleep; when a task receives the
// notification, it will check done and terminate.
let mut threads_state = self.threads_state.load(Ordering::Relaxed);
let num_threads = self.num_threads;
loop {
let notified = threads_notified(threads_state);
// Enough notifications are already pending; nothing more to post.
if notified >= num_threads {
break;
}
// Raise the pending-notification count to `num_threads`, leaving the sleeping count as is.
match self.threads_state.compare_exchange_weak(
threads_state,
make_threads_state(threads_sleeping(threads_state), num_threads),
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => {
// Post one packet for each notification that was just added to the count.
for _ in notified..num_threads {
self.notify_id(TASK_READY_WAKEUP_ID);
}
return;
}
// The state changed underneath us (or the weak exchange failed spuriously); retry.
Err(old) => threads_state = old,
}
}
}
/// Tears down the executor's state: drops the futures of all known tasks, empties the ready
/// queue, asserts that no packet receivers remain registered, and clears the thread-local
/// executor.
///
/// Notes about the lifecycle of an Executor.
///
/// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
/// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
/// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
/// the port is dropped alongside the Executor as it should.
/// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
///
/// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
/// "current" executor being set, and that's unset when the executor is dropped.
///
/// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
/// and point (b) is related to "what happens when I try to create a new receiver when there
/// is no executor".
///
/// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
/// storage [1]. And the reactor discourages usage of strong references to it by vending weak
/// references to it [2] instead of strong.
///
/// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
/// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
/// upgrading pointers. For (b) the difference mostly stands for "when is it safe to use IO
/// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
/// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
/// in that thread.
///
/// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
/// unregistered themselves when the executor drops. The choice to panic was made based on
/// patterns in fuchsia-async that may come to change:
///
/// - Executor vends strong references to itself and those references are *stored* by most
/// receiver implementations (as opposed to reached out on TLS every time).
/// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
/// easy to understand error to return when polling on an extinct executor.
/// - All receivers are implemented in this crate and well-known.
///
/// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
/// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
/// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
/// opaque non-clone-copy-or-send guard would be stronger than this. See:
/// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
pub fn on_parent_drop(&self) {
// Drop all tasks
// (`take` empties the map under the lock; the lock is released at the end of the statement.)
let all_tasks = std::mem::take(&mut self.task_state.lock().all_tasks);
// Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
// as part of waking, there's an upgrade to a strong reference, so for a small amount of
// time `fasync::unblock` can hold a strong reference to a task which in turn holds the
// future for the task which in turn could hold references to receivers, which, if we did
// nothing about it, would trip the assertion below. For that reason, we forcibly drop the
// task futures here.
for (_, task) in all_tasks {
task.future.try_drop().expect("Failed to drop task");
}
// Drop all of the uncompleted tasks
while let Some(_) = self.ready_tasks.pop() {}
// Synthetic main task marked completed
self.collector.task_completed(MAIN_TASK_ID, self.source);
// Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
// happen. See discussion above.
//
// If you're here because you hit this panic check your code for:
//
// - A struct that contains a fuchsia_async::Executor NOT in the last position (last
// position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
//
// - A function scope that contains a fuchsia_async::Executor NOT in the first position
// (first position in function scope gets dropped last:
// https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
//
// - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
// contains a temporary (temporaries are dropped after the function scope:
// https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
// looks like a `match` statement at the end of the function without a semicolon.
//
// - Storing channel and FIDL objects in static variables.
//
// - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
assert!(
self.receivers.lock().mapping.is_empty(),
"receivers must not outlive their executor"
);
// Remove the thread-local executor set in `set_local`.
EHandle::rm_local();
}
/// Runs the executor loop on the calling thread.
///
/// Each iteration polls ready tasks, then waits on the port and handles whatever the wait
/// produced: a wake-up notification, a packet for a receiver, or an expired timer. The function
/// returns when the main task completes or when `done` is set. When `UNTIL_STALLED` is true
/// the wait never blocks, and the function also returns once no ready tasks, packets or due
/// fake-time timers remain.
///
/// Uses the current thread's timer heap, so an executor must be set on this thread.
// The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
// the debugger needs to be updated.
// LINT.IfChange
pub fn worker_lifecycle<const UNTIL_STALLED: bool>(self: &Arc<Inner>) {
// LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
let mut local_collector = self.collector.create_local_collector();
loop {
// Keep track of whether we are considered asleep.
let mut sleeping = false;
match self.poll_ready_tasks(&mut local_collector) {
PollReadyTasksResult::NoneReady => {
// No more tasks, indicate we are sleeping. We use SeqCst ordering because we
// want this change here to happen *before* we check ready_tasks below. This
// synchronizes with notify_task_ready which is called *after* a task is added
// to ready_tasks.
self.threads_state.fetch_add(1, Ordering::SeqCst);
// Check ready tasks again. If a task got posted, wake up. This has to be done
// because a notification won't get sent if there is at least one active thread
// so there's a window between the preceding two lines where a task could be
// made ready and a notification is not sent because it looks like there is at
// least one thread running.
if self.ready_tasks.is_empty() {
sleeping = true;
} else {
// We lost a race, we're no longer sleeping.
self.threads_state.fetch_sub(1, Ordering::Relaxed);
}
}
PollReadyTasksResult::MoreReady => {}
PollReadyTasksResult::MainTaskCompleted => return,
}
// Check done here after updating threads_state to avoid shutdown races.
if self.done.load(Ordering::SeqCst) {
return;
}
// What the port wait below produced for this iteration.
enum Work {
None,
Packet(zx::Packet),
Timer(TimeWaker),
Stalled,
}
// Set when the wait returns a wake-up notification, so that it can be subtracted from the
// pending-notification count below.
let mut notified = false;
let work = with_local_timer_heap(|timer_heap| {
// If we're considered awake choose INFINITE_PAST which will make the wait call
// return immediately. Otherwise choose a deadline from the timers.
let deadline = if !sleeping || UNTIL_STALLED {
Time::INFINITE_PAST
} else {
timer_heap.next_deadline().map(|t| t.time()).unwrap_or(Time::INFINITE)
};
local_collector.will_wait();
// into_zx: we are using real time, so the time is a monotonic time.
match self.port.wait(deadline.into_zx()) {
Ok(packet) => {
if packet.key() == TASK_READY_WAKEUP_ID {
local_collector.woke_up(WakeupReason::Notification);
notified = true;
Work::None
} else {
Work::Packet(packet)
}
}
Err(zx::Status::TIMED_OUT) => {
if !sleeping {
// The wait was only a non-blocking check for packets; go back to polling tasks.
Work::None
} else if UNTIL_STALLED {
// Fire timers if using fake time.
if !self.is_real_time() {
if let Some(deadline) = timer_heap.next_deadline().map(|t| t.time())
{
if deadline <= self.now() {
return Work::Timer(timer_heap.pop().unwrap());
}
}
}
Work::Stalled
} else {
// We were sleeping with a timer deadline and it has passed.
Work::Timer(timer_heap.pop().unwrap())
}
}
Err(status) => {
panic!("Error calling port wait: {:?}", status);
}
}
});
// Undo this thread's contribution to the sleeping count (low byte) and consume the
// notification it received (high byte) in a single subtraction.
let threads_state_sub = make_threads_state(sleeping as u8, notified as u8);
if threads_state_sub > 0 {
self.threads_state.fetch_sub(threads_state_sub, Ordering::Relaxed);
}
match work {
Work::Packet(packet) => {
local_collector.woke_up(WakeupReason::Io);
self.deliver_packet(packet.key() as usize, packet);
}
Work::Timer(timer) => {
local_collector.woke_up(WakeupReason::Deadline);
timer.wake();
}
Work::None => {}
Work::Stalled => return,
}
}
}
/// Drops the main task.
///
/// Removes the main task from `all_tasks`, if present, and drops its future.
///
/// # Safety
///
/// The caller must guarantee that the executor isn't running.
pub(super) unsafe fn drop_main_task(&self) {
if let Some(task) = self.task_state.lock().all_tasks.remove(&MAIN_TASK_ID) {
// Even though we've removed the task from active tasks, it could still be in
// `ready_tasks`, so we have to drop the future here. At time of writing, this is only
// used by the local executor and there could only be something in ready_tasks if
// there's a panic.
task.future.drop_future_unchecked();
}
}
/// Polls for the result of the task with id `task_id`.
///
/// If the task's result is available it is taken, the task and any join waker registered for
/// it are removed, and `Poll::Ready` is returned. If the task is known but has no result yet,
/// the waker from `cx` is stored as the task's join waker and `Poll::Pending` is returned. An
/// unknown task id yields `Poll::Pending` without storing a waker.
///
/// # Safety
///
/// The caller must guarantee that `R` is the correct type.
pub unsafe fn poll_join_result<R>(&self, task_id: usize, cx: &mut Context<'_>) -> Poll<R> {
    let mut state = self.task_state.lock();
    let finished = match state.all_tasks.get(&task_id) {
        Some(task) => task.future.take_result(),
        None => return Poll::Pending,
    };
    match finished {
        Some(output) => {
            state.join_wakers.remove(&task_id);
            state.all_tasks.remove(&task_id);
            Poll::Ready(output)
        }
        None => {
            state.join_wakers.insert(task_id, cx.waker().clone());
            Poll::Pending
        }
    }
}
/// Attempts to poll `task` once and reports whether the task is now complete.
///
/// Returns `true` when the poll attempt reports that the task finished or was cancelled, and
/// `false` for every other outcome. A task that yields is pushed back onto `ready_tasks`.
fn try_poll(&self, task: &Arc<Task>) -> bool {
// SAFETY: We meet the contract for RawWaker/RawWakerVtable.
// The waker is built from the task's own pointer and uses `BORROWED_VTABLE`, whose drop
// function is a no-op.
let task_waker = unsafe {
Waker::from_raw(RawWaker::new(Arc::as_ptr(task) as *const (), &BORROWED_VTABLE))
};
match task.future.try_poll(&mut Context::from_waker(&task_waker)) {
AttemptPollResult::Yield => {
self.ready_tasks.push(task.clone());
false
}
AttemptPollResult::IFinished => {
let mut waker = None;
{
let mut tasks = self.task_state.lock();
if !task.future.is_detached_or_cancelled() {
// The task stays in `all_tasks`; only the join waker, if any, is taken out here.
waker = tasks.join_wakers.remove(&task.id);
} else if task.id != MAIN_TASK_ID {
tasks.all_tasks.remove(&task.id);
}
}
// The join waker is woken only after the `task_state` lock has been released.
if let Some(waker) = waker {
waker.wake();
}
true
}
AttemptPollResult::Cancelled => {
self.task_state.lock().all_tasks.remove(&task.id);
true
}
_ => false,
}
}
}
/// A handle to an executor.
///
/// Cloning the handle clones the reference to the shared executor state.
#[derive(Clone)]
pub struct EHandle {
// Shared state of the executor this handle refers to.
pub(super) inner: Arc<Inner>,
}
/// Formats the handle as an `EHandle` struct that shows only the executor's port.
impl fmt::Debug for EHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("EHandle");
        builder.field("port", &self.inner.port);
        builder.finish()
    }
}
impl EHandle {
/// Returns a handle to the executor set for the current thread.
///
/// # Panics
///
/// If called outside the context of an active async executor.
pub fn local() -> Self {
    let current = EXECUTOR.with(|cell| {
        let slot = cell.borrow();
        slot.as_ref().map(|(inner, _timers)| inner.clone())
    });
    let inner = current.expect("Fuchsia Executor must be created first");
    EHandle { inner }
}
/// Clears the current thread's executor slot, dropping whatever it held.
pub(super) fn rm_local() {
    EXECUTOR.with(|cell| {
        // The previous contents are dropped while the slot is still mutably borrowed.
        drop(cell.borrow_mut().take());
    });
}
/// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
pub fn port(&self) -> &zx::Port {
&self.inner.port
}
/// Adds `receiver` to the executor's receiver map and returns a registration that carries
/// this handle, the key assigned by the map, and the receiver itself.
/// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
pub fn register_receiver<T>(&self, receiver: Arc<T>) -> ReceiverRegistration<T>
where
    T: PacketReceiver,
{
    let key = {
        let mut receivers = self.inner.receivers.lock();
        receivers.insert(receiver.clone()) as u64
    };
    let ehandle = self.clone();
    ReceiverRegistration { ehandle, key, receiver }
}
/// Removes the receiver registered under `key`.
///
/// # Panics
///
/// Panics if there is no such receiver and the executor has not been marked done.
pub(crate) fn deregister_receiver(&self, key: u64) {
    let index = key as usize;
    let mut receivers = self.inner.receivers.lock();
    if !receivers.contains(index) {
        // The executor is shutting down and already removed the entry.
        assert!(self.inner.done.load(Ordering::SeqCst), "Missing receiver to deregister");
        return;
    }
    receivers.remove(index);
}
pub(crate) fn register_timer(time: Time, handle: TimerHandle) {
with_local_timer_heap(|timer_heap| {
timer_heap.add_timer(time, handle);
});
}
/// Wraps `future` as a non-detached `AtomicFuture`, spawns it, and returns the task id.
/// See `Inner::spawn`.
#[cfg_attr(trace_level_logging, track_caller)]
pub(crate) fn spawn<R: Send + 'static>(
    &self,
    future: impl Future<Output = R> + Send + 'static,
) -> usize {
    let joinable = AtomicFuture::new(future, false);
    self.inner.spawn(joinable)
}
/// Spawn a new detached task to be run on this executor; the task id is discarded.
///
/// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
/// may be run on either a singlethreaded or multithreaded executor.
#[cfg_attr(trace_level_logging, track_caller)]
pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
    let detached = AtomicFuture::new(future, true);
    let _task_id = self.inner.spawn(detached);
}
/// Spawns a non-detached future that does not have to be `Send` and returns the task id.
/// See `Inner::spawn_local`.
#[cfg_attr(trace_level_logging, track_caller)]
pub(crate) fn spawn_local<R: 'static>(
    &self,
    future: impl Future<Output = R> + 'static,
) -> usize {
    let detached = false;
    self.inner.spawn_local(future, detached)
}
/// Spawn a new detached task to be run on this executor; the task id is discarded.
///
/// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
/// have to be thread-safe (implement the `Send` trait). In return, this method requires that
/// this executor is a LocalExecutor.
#[cfg_attr(trace_level_logging, track_caller)]
pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
    let detached = true;
    let _task_id = self.inner.spawn_local(future, detached);
}
/// Marks the task with id `task_id` as detached, if it is still known, and discards any join
/// waker registered for that id.
pub(crate) fn detach(&self, task_id: usize) {
    let mut state = self.inner.task_state.lock();
    match state.all_tasks.get(&task_id) {
        Some(task) => task.future.detach(),
        None => {}
    }
    state.join_wakers.remove(&task_id);
}
/// Cancels the task with id `task_id`.
///
/// Any join waker registered for the task is discarded. If the task is still known, its future
/// is asked to cancel; when `cancel()` returns true the task is pushed onto the ready queue.
/// The return value is whatever result could be taken from the task, or `None` if the task is
/// unknown.
///
/// # Safety
///
/// The caller must guarantee that `R` is the correct type.
pub(crate) unsafe fn cancel<R>(&self, task_id: usize) -> Option<R> {
    let mut state = self.inner.task_state.lock();
    state.join_wakers.remove(&task_id);
    let task = state.all_tasks.get(&task_id)?;
    if task.future.cancel() {
        self.inner.ready_tasks.push(task.clone());
    }
    task.future.take_result()
}
/// See `Inner::poll_join_result`.
pub(crate) unsafe fn poll_join_result<R>(
&self,
task_id: usize,
cx: &mut Context<'_>,
) -> Poll<R> {
self.inner.poll_join_result(task_id, cx)
}
}
/// A future spawned on the executor, together with the bookkeeping needed to poll and wake it.
pub(super) struct Task {
// Id under which the task is stored in `TaskState::all_tasks`.
id: usize,
// The future being driven.
future: AtomicFuture<'static>,
// The executor whose ready queue this task is pushed onto when woken.
executor: Arc<Inner>,
// Location recorded when the task was created (only with `trace_level_logging`).
#[cfg(trace_level_logging)]
source: &'static Location<'static>,
}
impl Task {
/// Creates a task for `future` with the given `id`, owned by `executor`.
///
/// One weak reference to the new task is intentionally leaked here; it is released again in
/// `Drop for Task`.
#[cfg_attr(trace_level_logging, track_caller)]
fn new(id: usize, future: AtomicFuture<'static>, executor: Arc<Inner>) -> Arc<Self> {
let this = Arc::new(Self {
id,
future,
executor,
#[cfg(trace_level_logging)]
source: Location::caller(),
});
// Take a weak reference now to be used as a waker.
let _ = Arc::downgrade(&this).into_raw();
this
}
/// Marks the future as ready and, if `mark_ready` returns true, pushes the task onto the
/// executor's ready queue and notifies the executor.
fn wake(self: &Arc<Self>) {
if self.future.mark_ready() {
self.executor.ready_tasks.push(self.clone());
self.executor.notify_task_ready();
}
}
/// Returns the location recorded at creation, or `None` when `trace_level_logging` is off.
fn source(&self) -> Option<&'static Location<'static>> {
#[cfg(trace_level_logging)]
{
Some(self.source)
}
#[cfg(not(trace_level_logging))]
None
}
}
/// Releases the weak reference that `Task::new` leaked via `into_raw`.
impl Drop for Task {
fn drop(&mut self) {
// SAFETY: This balances the `into_raw` in `new`.
unsafe {
// TODO(https://fxbug.dev/328126836): We might need to revisit this when pointer
// provenance lands.
// The reconstructed `Weak` is dropped immediately, which gives the reference back.
Weak::from_raw(self);
}
}
}
// This vtable is used for the waker that exists for the lifetime of the task, which gets dropped
// above, so these functions never drop.
// Both wake entries use `waker_wake_by_ref`, and the drop entry is `waker_noop`.
static BORROWED_VTABLE: RawWakerVTable =
RawWakerVTable::new(waker_clone, waker_wake_by_ref, waker_wake_by_ref, waker_noop);
// Vtable for the wakers produced by `waker_clone`. Each such waker carries its own weak
// reference, which `waker_wake` and `waker_drop` release.
static VTABLE: RawWakerVTable =
RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop);
/// Clone entry of both vtables: produces a new raw waker that carries its own weak reference
/// to the task and uses `VTABLE`.
fn waker_clone(weak_raw: *const ()) -> RawWaker {
// SAFETY: `weak_raw` comes from a previous call to `into_raw`.
// `ManuallyDrop` keeps the reference behind `weak_raw` from being released here.
let weak = ManuallyDrop::new(unsafe { Weak::from_raw(weak_raw as *const Task) });
RawWaker::new((*weak).clone().into_raw() as *const _, &VTABLE)
}
fn waker_wake(weak_raw: *const ()) {
// SAFETY: `weak_raw` comes from a previous call to `into_raw`.
if let Some(task) = unsafe { Weak::from_raw(weak_raw as *const Task) }.upgrade() {
task.wake();
}
}
/// Wake-by-ref entry: wakes the task if it is still alive, without releasing the weak
/// reference behind `weak_raw`.
fn waker_wake_by_ref(weak_raw: *const ()) {
    // SAFETY: `weak_raw` comes from a previous call to `into_raw`.
    // `ManuallyDrop` ensures the reference is only borrowed, never released, here.
    let borrowed = ManuallyDrop::new(unsafe { Weak::from_raw(weak_raw as *const Task) });
    match borrowed.upgrade() {
        Some(task) => task.wake(),
        None => {}
    }
}
/// Drop entry of `BORROWED_VTABLE`: intentionally does nothing with the pointer.
fn waker_noop(_weak_raw: *const ()) {}
/// Drop entry of `VTABLE`: releases the weak reference carried by the waker.
fn waker_drop(weak_raw: *const ()) {
    // SAFETY: `weak_raw` comes from a previous call to `into_raw`.
    let weak = unsafe { Weak::from_raw(weak_raw as *const Task) };
    drop(weak);
}
#[cfg(test)]
mod tests {
use {
crate::{LocalExecutor, Task},
std::{
future::poll_fn,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
task::Poll,
},
};
/// Returns `Pending` exactly once, after asking to be woken again, and completes on the next
/// poll.
async fn yield_to_executor() {
    let mut has_yielded = false;
    poll_fn(|cx| {
        // `replace` reports whether the yield already happened while marking it as done.
        if std::mem::replace(&mut has_yielded, true) {
            return Poll::Ready(());
        }
        cx.waker().wake_by_ref();
        Poll::Pending
    })
    .await;
}
/// Checks that a detached task keeps running until it has finished and that no join waker is
/// left behind afterwards.
#[test]
fn test_detach() {
let mut e = LocalExecutor::new();
e.run_singlethreaded(async {
let counter = Arc::new(AtomicU32::new(0));
{
let counter = counter.clone();
// The detached task bumps the counter five times, yielding before each increment.
Task::spawn(async move {
for _ in 0..5 {
yield_to_executor().await;
counter.fetch_add(1, Ordering::Relaxed);
}
})
.detach();
}
// Keep yielding until the detached task has performed all five increments.
while counter.load(Ordering::Relaxed) != 5 {
yield_to_executor().await;
}
});
assert!(e.ehandle.inner.task_state.lock().join_wakers.is_empty());
}
/// Exercises three ways a task can go away: dropping its handle, cancelling it while it is
/// still pending, and cancelling it after it has finished. In each case the test waits until
/// the task's clone of `ref_count` has been released, and at the end checks that no join
/// waker is left behind.
#[test]
fn test_cancel() {
let mut e = LocalExecutor::new();
e.run_singlethreaded(async {
// The strong count of this `Arc` shows whether a spawned future still holds its clone.
let ref_count = Arc::new(());
// First, just drop the task.
{
let ref_count = ref_count.clone();
let _ = Task::spawn(async move {
let _ref_count = ref_count;
let _: () = std::future::pending().await;
});
}
while Arc::strong_count(&ref_count) != 1 {
yield_to_executor().await;
}
// Now try explicitly cancelling.
let task = {
let ref_count = ref_count.clone();
Task::spawn(async move {
let _ref_count = ref_count;
let _: () = std::future::pending().await;
})
};
// The future never completes, so cancelling yields no result.
assert_eq!(task.cancel(), None);
while Arc::strong_count(&ref_count) != 1 {
yield_to_executor().await;
}
// Now cancel a task that has already finished.
let task = {
let ref_count = ref_count.clone();
Task::spawn(async move {
let _ref_count = ref_count;
})
};
// Wait for it to finish.
while Arc::strong_count(&ref_count) != 1 {
yield_to_executor().await;
}
// The task already produced its `()` output, which cancel hands back.
assert_eq!(task.cancel(), Some(()));
});
assert!(e.ehandle.inner.task_state.lock().join_wakers.is_empty());
}
}