fuchsia_async/runtime/fuchsia/executor/common.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::timer::Timers;
6use super::atomic_future::hooks::HooksMap;
7use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
8use super::packets::{
9 PacketReceiver, PacketReceiverMap, RawReceiverRegistration, ReceiverRegistration,
10};
11use super::scope::ScopeHandle;
12use super::time::{BootInstant, MonotonicInstant};
13use crate::runtime::instrument::TaskInstrument;
14use crossbeam::queue::SegQueue;
15use fuchsia_sync::Mutex;
16use zx::BootDuration;
17
18use std::any::Any;
19use std::cell::{Cell, RefCell};
20use std::fmt;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, AtomicUsize, Ordering};
24use std::sync::{Arc, OnceLock};
25use std::task::Context;
26use std::thread::ThreadId;
27
/// Key of the user packet that `Executor::notify_id` queues on the port to wake a worker.
///
/// `Executor::worker_lifecycle` treats a packet with this key as a pure wake-up notification
/// and does not dispatch it to a registered receiver.
pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;

/// The id of the main task, which is a virtual task that lives from construction
/// to destruction of the executor. The main task may correspond to multiple
/// main futures, in cases where the executor runs multiple times during its lifetime.
pub(crate) const MAIN_TASK_ID: usize = 0;

// The root scope of the executor associated with the current thread, if any. It is set by
// `Executor::set_local` and `EHandle::poll_tasks`, and cleared by `EHandle::rm_local`.
thread_local!(
    static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
);
38
/// The source of the clock readings returned by `Executor::now` and `Executor::boot_now`.
pub enum ExecutorTime {
    /// Readings are taken from `zx::MonotonicInstant::get()` and `zx::BootInstant::get()`.
    RealTime,
    /// Fake readings used in tests.
    FakeTime {
        // The fake monotonic clock reading, in nanoseconds.
        mono_reading_ns: AtomicI64,
        // An offset to add to mono_reading_ns to get the reading of the boot
        // clock, disregarding the difference in timelines.
        //
        // We disregard the fact that the reading and offset can not be
        // read atomically, this is usually not relevant in tests.
        mono_to_boot_offset_ns: AtomicI64,
    },
}
53
/// Outcome of one call to `Executor::poll_ready_tasks`.
enum PollReadyTasksResult {
    /// The ready queue was found empty.
    NoneReady,
    /// A full batch of tasks was polled and no thread is asleep; the caller should service the
    /// port before polling more tasks.
    MoreReady,
    /// The task with id `MAIN_TASK_ID` finished.
    MainTaskCompleted,
}
59
/// Thread bookkeeping packed into a single `u32`, one byte per counter:
///
/// ```text
///   24           16            8            0
///    +------------+------------+------------+
///    |  foreign   |  notified  |  sleeping  |
///    +------------+------------+------------+
/// ```
///
/// * `sleeping`: the number of threads sleeping
/// * `notified`: the number of notifications posted to wake sleeping threads
/// * `foreign`: the number of foreign threads processing tasks
#[derive(Clone, Copy, Eq, PartialEq)]
struct ThreadsState(u32);

impl ThreadsState {
    /// Bit offset of the `sleeping` byte.
    const SLEEPING_SHIFT: u32 = 0;
    /// Bit offset of the `notified` byte.
    const NOTIFIED_SHIFT: u32 = 8;
    /// Bit offset of the `foreign` byte.
    const FOREIGN_SHIFT: u32 = 16;

    /// Extracts the byte stored at bit offset `shift`.
    const fn byte_at(&self, shift: u32) -> u8 {
        ((self.0 >> shift) & 0xff) as u8
    }

    /// Returns a copy in which the byte at bit offset `shift` is `value`; all other bits are
    /// left untouched.
    const fn replace_byte(self, shift: u32, value: u8) -> Self {
        let cleared = self.0 & !(0xff_u32 << shift);
        Self(cleared | ((value as u32) << shift))
    }

    /// The number of sleeping threads.
    const fn sleeping(&self) -> u8 {
        self.byte_at(Self::SLEEPING_SHIFT)
    }

    /// The number of pending wake-up notifications.
    const fn notified(&self) -> u8 {
        self.byte_at(Self::NOTIFIED_SHIFT)
    }

    /// Returns this state with the sleeping count replaced.
    const fn with_sleeping(self, sleeping: u8) -> Self {
        self.replace_byte(Self::SLEEPING_SHIFT, sleeping)
    }

    /// Returns this state with the notified count replaced.
    const fn with_notified(self, notified: u8) -> Self {
        self.replace_byte(Self::NOTIFIED_SHIFT, notified)
    }

    /// Returns this state with the foreign-thread count replaced.
    const fn with_foreign(self, foreign: u8) -> Self {
        self.replace_byte(Self::FOREIGN_SHIFT, foreign)
    }
}
92
// Test-only count of live `Executor` values: incremented in `Executor::new_with_port` and
// decremented when an `Executor` is dropped.
#[cfg(test)]
static ACTIVE_EXECUTORS: AtomicUsize = AtomicUsize::new(0);
95
/// State shared by all threads that drive a single executor.
pub(crate) struct Executor {
    // The port that worker threads wait on and that wake-up notifications are queued to.
    pub(super) port: zx::Port,
    monotonic_timers: Arc<Timers<MonotonicInstant>>,
    boot_timers: Arc<Timers<BootInstant>>,
    // Set by `mark_done`; worker threads return from `worker_lifecycle` once they observe it.
    pub(super) done: AtomicBool,
    is_local: bool,
    // Receivers that packets read from `port` are dispatched to.
    pub(crate) receivers: PacketReceiverMap,
    // Source of task ids (see `next_task_id`); starts just past `MAIN_TASK_ID`.
    task_count: AtomicUsize,
    // Tasks that are ready to be polled.
    pub(super) ready_tasks: SegQueue<TaskHandle>,
    time: ExecutorTime,
    // Packed counters, decoded with `ThreadsState`: the number of threads currently sleeping
    // (bits 0-7), the number of wake-up notifications pending (bits 8-15) and the number of
    // foreign threads processing tasks (bits 16-23).
    pub(super) threads_state: AtomicU32,
    // Compared against the sleeping count to decide whether every thread is asleep.
    pub(super) num_threads: u8,
    // Incremented each time a task is polled, except for the poll that completes the main task.
    pub(super) polled: AtomicU64,
    // Data that belongs to the user that can be accessed via EHandle::local(). See
    // `TestExecutor::poll_until_stalled`.
    pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
    pub(super) instrument: Option<Arc<dyn TaskInstrument>>,
    pub(super) hooks_map: HooksMap,
    // Recorded the first time `set_local` runs for this executor. For local executors,
    // `worker_lifecycle` asserts that it is called on this thread.
    pub(super) first_thread_id: OnceLock<ThreadId>,
}
118
119impl Executor {
120 pub fn new(
121 time: ExecutorTime,
122 is_local: bool,
123 num_threads: u8,
124 instrument: Option<Arc<dyn TaskInstrument>>,
125 ) -> Self {
126 Self::new_with_port(time, is_local, num_threads, zx::Port::create(), instrument)
127 }
128
129 pub fn new_with_port(
130 time: ExecutorTime,
131 is_local: bool,
132 num_threads: u8,
133 port: zx::Port,
134 instrument: Option<Arc<dyn TaskInstrument>>,
135 ) -> Self {
136 #[cfg(test)]
137 ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);
138
139 // Is this a fake-time executor?
140 let is_fake = matches!(
141 time,
142 ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
143 );
144
145 Executor {
146 port,
147 monotonic_timers: Arc::new(Timers::<MonotonicInstant>::new(is_fake)),
148 boot_timers: Arc::new(Timers::<BootInstant>::new(is_fake)),
149 done: AtomicBool::new(false),
150 is_local,
151 receivers: PacketReceiverMap::new(),
152 task_count: AtomicUsize::new(MAIN_TASK_ID + 1),
153 ready_tasks: SegQueue::new(),
154 time,
155 threads_state: AtomicU32::new(0),
156 num_threads,
157 polled: AtomicU64::new(0),
158 owner_data: Mutex::new(None),
159 instrument,
160 hooks_map: HooksMap::default(),
161 first_thread_id: OnceLock::new(),
162 }
163 }
164
165 pub fn set_local(root_scope: ScopeHandle) {
166 root_scope.executor().first_thread_id.get_or_init(|| std::thread::current().id());
167 EXECUTOR.with(|e| {
168 let mut e = e.borrow_mut();
169 assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
170 *e = Some(root_scope);
171 });
172 }
173
/// Polls tasks from `ready_tasks` in batches of up to 16.
///
/// Returns `NoneReady` as soon as the queue is found empty, `MainTaskCompleted` as soon as the
/// main task finishes, and `MoreReady` when a full batch has been polled and no thread is
/// asleep. While some threads are asleep, a full batch is followed by (at most) one extra
/// wake-up notification and then by another batch.
fn poll_ready_tasks(&self) -> PollReadyTasksResult {
    loop {
        for _ in 0..16 {
            let Some(task) = self.ready_tasks.pop() else {
                return PollReadyTasksResult::NoneReady;
            };
            // Capture the id first: `try_poll` takes ownership of the task.
            let task_id = task.id();
            let complete = self.try_poll(task);
            if complete && task_id == MAIN_TASK_ID {
                return PollReadyTasksResult::MainTaskCompleted;
            }
            self.polled.fetch_add(1, Ordering::Relaxed);
        }
        // We polled a full batch and there may be more ready tasks. If there are sleeping
        // threads, post a notification to wake one up.
        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
        loop {
            if threads_state.sleeping() == 0 {
                // All threads are awake now. Prevent starvation.
                return PollReadyTasksResult::MoreReady;
            }
            if threads_state.notified() >= threads_state.sleeping() {
                // All sleeping threads have been notified. Keep going and poll more tasks.
                break;
            }
            // On failure `try_notify` hands back the freshly observed state; re-evaluate it.
            match self.try_notify(threads_state) {
                Ok(()) => break,
                Err(s) => threads_state = s,
            }
        }
    }
}
206
/// Returns the `is_local` flag this executor was constructed with.
pub fn is_local(&self) -> bool {
    self.is_local
}
210
/// Allocates a new task id by post-incrementing the task counter.
///
/// The counter starts at `MAIN_TASK_ID + 1`, so `MAIN_TASK_ID` itself is never returned
/// (barring wrap-around of the counter).
pub fn next_task_id(&self) -> usize {
    self.task_count.fetch_add(1, Ordering::Relaxed)
}
214
/// Posts a wake-up notification if, and only if, every thread is asleep, no notification is
/// pending and no foreign thread is processing tasks.
pub fn notify_task_ready(&self) {
    // Only post if there's no thread running (or soon to be running). If we happen to be
    // running on a thread for this executor, then the sleeping count in threads_state won't be
    // equal to num_threads, which means notifications only get fired if this is from a
    // non-async thread, or a thread that belongs to a different executor. We use SeqCst
    // ordering here to make sure this load happens *after* the change to ready_tasks and to
    // synchronize with worker_lifecycle.
    let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));

    // We only want to notify if there are no pending notifications and there are no other
    // threads running. The loop retries only while a failed `try_notify` still reports that
    // exact state (e.g. after a spurious compare-exchange failure).
    while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
        match self.try_notify(threads_state) {
            Ok(()) => break,
            Err(s) => threads_state = s,
        }
    }
}
232
/// Tries to notify a thread to wake up. Returns threads_state if it fails.
///
/// The notified count is incremented with a weak compare-exchange against
/// `old_threads_state`; only if that succeeds is a `TASK_READY_WAKEUP_ID` packet queued on the
/// port. On failure the currently stored state is returned so the caller can decide whether to
/// retry. The weak compare-exchange may fail spuriously, so callers are expected to loop.
fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
    self.threads_state
        .compare_exchange_weak(
            old_threads_state.0,
            old_threads_state.0 + ThreadsState(0).with_notified(1).0,
            Ordering::Relaxed,
            Ordering::Relaxed,
        )
        .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
        .map_err(ThreadsState)
}
245
/// Posts one wake-up notification if some thread is asleep and none is already pending.
///
/// Does nothing if no thread is asleep at the time of the initial load. Retrying stops as soon
/// as a notification is observed to be pending or the sleeping count drops below the value
/// first observed.
pub fn wake_one_thread(&self) {
    let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
    let current_sleeping = threads_state.sleeping();
    if current_sleeping == 0 {
        return;
    }
    while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
        match self.try_notify(threads_state) {
            Ok(()) => break,
            Err(s) => threads_state = s,
        }
    }
}
259
260 pub fn notify_id(&self, id: u64) {
261 let up = zx::UserPacket::from_u8_array([0; 32]);
262 let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
263 if let Err(e) = self.port.queue(&packet) {
264 // TODO: logging
265 eprintln!("Failed to queue notify in port: {e:?}");
266 }
267 }
268
269 /// Returns the current reading of the monotonic clock.
270 ///
271 /// For test executors running in fake time, returns the reading of the
272 /// fake monotonic clock.
273 pub fn now(&self) -> MonotonicInstant {
274 match &self.time {
275 ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
276 ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
277 MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
278 }
279 }
280 }
281
282 /// Returns the current reading of the boot clock.
283 ///
284 /// For test executors running in fake time, returns the reading of the
285 /// fake boot clock.
286 pub fn boot_now(&self) -> BootInstant {
287 match &self.time {
288 ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),
289
290 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
291 // The two atomic values are loaded one after the other. This should
292 // not normally be an issue in tests.
293 let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
294 let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
295 BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
296 }
297 }
298 }
299
300 /// Sets the reading of the fake monotonic clock.
301 ///
302 /// # Panics
303 ///
304 /// If called on an executor that runs in real time.
305 pub fn set_fake_time(&self, new: MonotonicInstant) {
306 let boot_offset_ns = match &self.time {
307 ExecutorTime::RealTime => {
308 panic!("Error: called `set_fake_time` on an executor using actual time.")
309 }
310 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
311 t.store(new.into_nanos(), Ordering::Relaxed);
312 mono_to_boot_offset_ns.load(Ordering::Relaxed)
313 }
314 };
315 self.monotonic_timers.maybe_notify(new);
316
317 // Changing fake time also affects boot time. Notify boot clocks as well.
318 let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
319 self.boot_timers.maybe_notify(new_boot_time);
320 }
321
322 // Sets a new offset between boot and monotonic time.
323 //
324 // Only works for executors operating in fake time.
325 // The change in the fake offset will wake expired boot timers.
326 pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
327 let mono_now_ns = match &self.time {
328 ExecutorTime::RealTime => {
329 panic!(
330 "Error: called `set_fake_boot_to_mono_offset` on an executor using actual time."
331 )
332 }
333 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
334 // We ignore the non-atomic update between b and t, it is likely
335 // not relevant in tests.
336 b.store(offset.into_nanos(), Ordering::Relaxed);
337 t.load(Ordering::Relaxed)
338 }
339 };
340 let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
341 self.boot_timers.maybe_notify(new_boot_now);
342 }
343
/// Returns `true` if this executor is running in real time. Returns
/// `false` if this executor is running in fake time.
pub fn is_real_time(&self) -> bool {
    matches!(self.time, ExecutorTime::RealTime)
}
349
/// Marks the executor as done and wakes all worker threads.
///
/// Must be called before `on_parent_drop`.
///
/// Done flag must be set before dropping packet receivers
/// so that future receivers that attempt to deregister themselves
/// know that it's okay if their entries are already missing.
pub fn mark_done(&self) {
    self.done.store(true, Ordering::SeqCst);

    // Make sure there's at least one notification outstanding per thread to wake up all
    // workers. This might be more notifications than required, but this way we don't have to
    // worry about races where threads are just about to sleep; when a worker receives the
    // notification, it will check done and terminate.
    let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
    let num_threads = self.num_threads;
    loop {
        let notified = threads_state.notified();
        if notified >= num_threads {
            // Enough notifications are already outstanding.
            break;
        }
        // Raise the notified count to `num_threads` in one step...
        match self.threads_state.compare_exchange_weak(
            threads_state.0,
            threads_state.with_notified(num_threads).0,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(_) => {
                // ...and queue one packet for each notification that was added.
                for _ in notified..num_threads {
                    self.notify_id(TASK_READY_WAKEUP_ID);
                }
                return;
            }
            Err(old) => threads_state = ThreadsState(old),
        }
    }
}
385
386 /// Notes about the lifecycle of an Executor.
387 ///
388 /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
389 /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
390 /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
391 /// the port is dropped alongside the Executor as it should.
392 /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
393 ///
394 /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
395 /// "current" executor being set, and that's unset when the executor is dropped.
396 ///
397 /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
398 /// and point (b) is related to "what happens when I try to create a new receiver when there
399 /// is no executor".
400 ///
401 /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
402 /// storage [1]. And the reactor discourages usage of strong references to it by vending weak
403 /// references to it [2] instead of strong.
404 ///
405 /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
406 /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
/// upgrading pointers. For (b), the difference mostly comes down to "when is it safe to use IO
408 /// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
409 /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
410 /// in that thread.
411 ///
412 /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
413 /// unregistered themselves when the executor drops. The choice to panic was made based on
414 /// patterns in fuchsia-async that may come to change:
415 ///
416 /// - Executor vends strong references to itself and those references are *stored* by most
417 /// receiver implementations (as opposed to reached out on TLS every time).
418 /// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
419 /// easy to understand error to return when polling on an extinct executor.
420 /// - All receivers are implemented in this crate and well-known.
421 ///
422 /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
423 /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
424 /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
425 /// opaque non-clone-copy-or-send guard would be stronger than this. See:
426 /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
    // Drop all tasks.
    // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
    // as part of waking, there's an upgrade to a strong reference, so for a small amount of
    // time `fasync::unblock` can hold a strong reference to a task which in turn holds the
    // future for the task which in turn could hold references to receivers, which, if we did
    // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
    // task futures here.
    root_scope.drop_all_tasks();

    // Drain the ready queue, dropping every task handle that is still queued.
    while self.ready_tasks.pop().is_some() {}

    // Deregister the timer receivers so that we can perform the check below.
    self.monotonic_timers.deregister();
    self.boot_timers.deregister();

    // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
    // happen. See discussion above.
    //
    // If you're here because you hit this panic check your code for:
    //
    // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
    //   position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
    //
    // - A function scope that contains a fuchsia_async::Executor NOT in the first position
    //   (first position in function scope gets dropped last:
    //   https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
    //
    // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
    //   contains a temporary (temporaries are dropped after the function scope:
    //   https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
    //   looks like a `match` statement at the end of the function without a semicolon.
    //
    // - Storing channel and FIDL objects in static variables.
    //
    // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
    assert!(self.receivers.is_empty(), "receivers must not outlive their executor");

    // Remove the thread-local executor association (the counterpart of `set_local`).
    EHandle::rm_local();
}
469
/// Runs the worker loop on the calling thread.
///
/// Each iteration polls ready tasks, checks the done flag, waits on the port and dispatches
/// any packet received to the matching receiver. The function returns when the main task
/// completes or when `done` is observed. With `UNTIL_STALLED` set, the port wait never blocks
/// and the function also returns when there are no ready tasks and no packet is available.
///
/// # Panics
///
/// If the executor is local and this is not the thread recorded in `first_thread_id`, or if
/// waiting on the port fails with anything other than `TIMED_OUT`.
// The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
// the debugger needs to be updated.
// LINT.IfChange
pub fn worker_lifecycle<const UNTIL_STALLED: bool>(self: &Arc<Executor>) {
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)

    assert!(
        !self.is_local() || self.first_thread_id.get() == Some(&std::thread::current().id())
    );

    self.monotonic_timers.register(self);
    self.boot_timers.register(self);

    loop {
        // Keep track of whether we are considered asleep.
        let mut sleeping = false;

        match self.poll_ready_tasks() {
            PollReadyTasksResult::NoneReady => {
                // No more tasks, indicate we are sleeping. We use SeqCst ordering because we
                // want this change here to happen *before* we check ready_tasks below. This
                // synchronizes with notify_task_ready which is called *after* a task is added
                // to ready_tasks.
                const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
                self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
                // Check ready tasks again. If a task got posted, wake up. This has to be done
                // because a notification won't get sent if there is at least one active thread
                // so there's a window between the preceding two lines where a task could be
                // made ready and a notification is not sent because it looks like there is at
                // least one thread running.
                if self.ready_tasks.is_empty() {
                    sleeping = true;
                } else {
                    // We lost a race, we're no longer sleeping.
                    self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
                }
            }
            PollReadyTasksResult::MoreReady => {}
            PollReadyTasksResult::MainTaskCompleted => return,
        }

        // Check done here after updating threads_state to avoid shutdown races.
        if self.done.load(Ordering::SeqCst) {
            return;
        }

        // What the port wait produced for this iteration.
        enum Work {
            // Nothing to dispatch: a wake-up notification, or a non-blocking wait timed out.
            None,
            // A packet that must be handed to its receiver.
            Packet(zx::Packet),
            // `UNTIL_STALLED` only: we were asleep and the port had nothing for us.
            Stalled,
        }

        // Whether the packet we received was a wake-up notification, in which case the
        // notified count has to be decremented below.
        let mut notified = false;
        let work = {
            // If we're considered awake choose INFINITE_PAST which will make the wait call
            // return immediately. Otherwise, wait until a packet arrives.
            let deadline = if !sleeping || UNTIL_STALLED {
                zx::Instant::INFINITE_PAST
            } else {
                zx::Instant::INFINITE
            };

            match self.port.wait(deadline) {
                Ok(packet) => {
                    if packet.key() == TASK_READY_WAKEUP_ID {
                        notified = true;
                        Work::None
                    } else {
                        Work::Packet(packet)
                    }
                }
                Err(zx::Status::TIMED_OUT) => {
                    if !UNTIL_STALLED || !sleeping {
                        Work::None
                    } else {
                        Work::Stalled
                    }
                }
                Err(status) => {
                    panic!("Error calling port wait: {status:?}");
                }
            }
        };

        // Undo, in a single atomic subtraction, the sleeping increment made above (if any)
        // and the notification we consumed (if any).
        let threads_state_sub =
            ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
        if threads_state_sub.0 > 0 {
            self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
        }

        match work {
            Work::Packet(packet) => self.receivers.receive_packet(packet.key(), packet),
            Work::None => {}
            Work::Stalled => return,
        }
    }
}
567
/// Drops the main task.
///
/// Delegates to `drop_task_unchecked` on `root_scope` for the task with id `MAIN_TASK_ID`.
///
/// # Safety
///
/// The caller must guarantee that the executor isn't running.
pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle) {
    root_scope.drop_task_unchecked(MAIN_TASK_ID);
}
576
577 fn try_poll(&self, task: TaskHandle) -> bool {
578 let task_waker = task.waker();
579 let poll_result = TaskHandle::set_current_with(&task, || {
580 task.try_poll(&mut Context::from_waker(&task_waker))
581 });
582 match poll_result {
583 AttemptPollResult::Yield => {
584 self.ready_tasks.push(task);
585 false
586 }
587 AttemptPollResult::IFinished | AttemptPollResult::Aborted => {
588 task.scope().task_did_finish(task.id());
589 true
590 }
591 _ => false,
592 }
593 }
594
/// Returns the timers for the monotonic timeline, borrowed from the executor.
pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
    &self.monotonic_timers
}
599
/// Returns the timers for the boot timeline, borrowed from the executor.
pub fn boot_timers(&self) -> &Timers<BootInstant> {
    &self.boot_timers
}
604
/// Runs `callback` and then polls up to 16 ready tasks on the calling thread, which is
/// counted as a foreign thread while it does so.
///
/// If tasks are still ready afterwards and every executor thread is asleep with no pending
/// notification and no other foreign thread active, one thread is woken.
///
/// # Panics
///
/// If the executor is local.
fn poll_tasks(&self, callback: impl FnOnce()) {
    assert!(!self.is_local);

    // Increment the count of foreign threads.
    const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
    self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);

    callback();

    // Poll up to 16 tasks.
    for _ in 0..16 {
        let Some(task) = self.ready_tasks.pop() else {
            break;
        };
        let task_id = task.id();
        // Stop as soon as the main task completes.
        if self.try_poll(task) && task_id == MAIN_TASK_ID {
            break;
        }
        self.polled.fetch_add(1, Ordering::Relaxed);
    }

    // `fetch_sub` returns the previous value, so subtract again to get the state as it is
    // after this thread stopped being counted as foreign.
    let mut threads_state = ThreadsState(
        self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
    );

    if !self.ready_tasks.is_empty() {
        // There are tasks still ready to run, so wake up a thread if all the other threads are
        // sleeping.
        while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
            match self.try_notify(threads_state) {
                Ok(()) => break,
                Err(s) => threads_state = s,
            }
        }
    }
}
641
/// Queues `task` for polling and wakes a thread if all of them are asleep.
pub fn task_is_ready(&self, task: TaskHandle) {
    // The push must come first: `notify_task_ready` relies on the task already being in
    // `ready_tasks` when it loads `threads_state`.
    self.ready_tasks.push(task);
    self.notify_task_ready();
}
646}
647
// Test-only: keeps `ACTIVE_EXECUTORS` in step with the number of live executors so that tests
// can detect leaked executors.
#[cfg(test)]
impl Drop for Executor {
    fn drop(&mut self) {
        ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
    }
}
654
/// A handle to an executor.
///
/// The handle wraps the executor's root scope; the executor itself is reached through that
/// scope. Cloning the handle clones the scope handle.
#[derive(Clone)]
pub struct EHandle {
    // LINT.IfChange
    pub(super) root_scope: ScopeHandle,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
}
662
663impl fmt::Debug for EHandle {
664 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
665 f.debug_struct("EHandle").field("port", &self.inner().port).finish()
666 }
667}
668
669impl EHandle {
670 /// Returns the thread-local executor.
671 ///
672 /// # Panics
673 ///
674 /// If called outside the context of an active async executor.
675 pub fn local() -> Self {
676 let root_scope = EXECUTOR
677 .with(|e| e.borrow().as_ref().map(|x| x.clone()))
678 .expect("Fuchsia Executor must be created first");
679
680 EHandle { root_scope }
681 }
682
/// Set the fake time to a given value.
///
/// Forwards to `Executor::set_fake_time` on the underlying executor.
///
/// # Panics
///
/// If the executor was not created with fake time.
pub fn set_fake_time(&self, t: MonotonicInstant) {
    self.inner().set_fake_time(t)
}
691
/// Clears the executor associated with the current thread, if any.
pub(super) fn rm_local() {
    EXECUTOR.with(|e| *e.borrow_mut() = None);
}
695
/// The root scope of the executor.
///
/// This can be used to spawn tasks that live as long as the executor, and
/// to create shorter-lived child scopes.
///
/// Most users should create an owned scope with
/// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
///
/// The returned reference borrows from this handle.
pub fn global_scope(&self) -> &ScopeHandle {
    &self.root_scope
}
706
/// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
///
/// This is the port owned by the underlying executor.
pub fn port(&self) -> &zx::Port {
    &self.inner().port
}
711
712 /// Registers a `PacketReceiver` with the executor and returns a registration.
713 /// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
714 pub fn register_receiver<T: PacketReceiver>(&self, receiver: T) -> ReceiverRegistration<T> {
715 self.inner().receivers.register(self.inner().clone(), receiver)
716 }
717
/// Registers a pinned `RawPacketReceiver` with the executor.
///
/// The registration will be deregistered when dropped.
///
/// A clone of this handle is passed to the receiver map together with the registration.
///
/// NOTE: Unlike with `register_receiver`, `receive_packet` will be called whilst a lock is
/// held, so it is not safe to register or unregister receivers at that time.
pub fn register_pinned<T: PacketReceiver>(
    &self,
    raw_registration: Pin<&mut RawReceiverRegistration<T>>,
) {
    self.inner().receivers.register_pinned(self.clone(), raw_registration);
}
730
/// Returns the executor behind this handle, obtained from the root scope.
#[inline(always)]
pub(crate) fn inner(&self) -> &Arc<Executor> {
    self.root_scope.executor()
}
735
/// Spawn a new task to be run on this executor.
///
/// The task is spawned on the executor's root scope, and whatever `spawn` returns is
/// discarded.
///
/// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
/// may be run on either a singlethreaded or multithreaded executor.
pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
    self.global_scope().spawn(future);
}
743
/// Spawn a new task to be run on this executor.
///
/// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
/// have to be thread-safe (implement the `Send` trait). In return, this method requires that
/// this executor is a LocalExecutor.
///
/// The task is spawned on the executor's root scope, and whatever `spawn_local` returns is
/// discarded.
pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
    self.global_scope().spawn_local(future);
}
752
/// Returns the executor's shared timers for the monotonic timeline.
pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
    &self.inner().monotonic_timers
}
756
/// Returns the executor's shared timers for the boot timeline.
pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
    &self.inner().boot_timers
}
760
761 /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
762 /// that are ready to run. If tasks remain ready and no other threads are running, a thread
763 /// will be woken. This can end up being a performance win in the case that the queue can be
764 /// cleared without needing to wake any other thread.
765 ///
766 /// # Panics
767 ///
768 /// If called on a single-threaded executor or if this thread is a thread managed by the
769 /// executor.
770 pub fn poll_tasks(&self, callback: impl FnOnce()) {
771 EXECUTOR.with(|e| {
772 assert!(
773 e.borrow_mut().replace(self.root_scope.clone()).is_none(),
774 "This thread is already associated with an executor"
775 );
776 });
777
778 self.inner().poll_tasks(callback);
779
780 EXECUTOR.with(|e| *e.borrow_mut() = None);
781 }
782}
783
// AtomicFutureHandle can have a lifetime (for local executors we allow the main task to have a
// non-static lifetime). The executor doesn't handle this though; the executor just assumes all
// tasks have the 'static lifetime. It's up to the local executor to extend the lifetime and make
// it safe.
pub type TaskHandle = AtomicFutureHandle<'static>;

// Pointer to the task that is being polled on this thread, or null when no task is being
// polled. Written by `TaskHandle::set_current_with` and read by `TaskHandle::with_current`.
thread_local! {
    static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
}
793
794impl TaskHandle {
795 pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
796 CURRENT_TASK.with(|cur| {
797 let cur = cur.get();
798 let cur = unsafe { cur.as_ref() };
799 f(cur)
800 })
801 }
802
803 fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
804 CURRENT_TASK.with(|cur| {
805 cur.set(task);
806 let result = f();
807 cur.set(std::ptr::null());
808 result
809 })
810 }
811}
812
#[cfg(test)]
mod tests {
    use super::{ACTIVE_EXECUTORS, EHandle};
    use crate::{LocalExecutorBuilder, SendExecutorBuilder};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Builds and runs an executor on a separate thread, then checks that no `Executor` is left
    // alive once that thread has finished.
    #[test]
    fn test_no_leaks() {
        std::thread::spawn(|| SendExecutorBuilder::new().num_threads(1).build().run(async {}))
            .join()
            .unwrap();

        assert_eq!(ACTIVE_EXECUTORS.load(Ordering::Relaxed), 0);
    }

    // Checks that a task spawned from within `EHandle::poll_tasks` is polled by the foreign
    // thread before `poll_tasks` returns.
    #[test]
    fn poll_tasks() {
        SendExecutorBuilder::new().num_threads(1).build().run(async {
            let ehandle = EHandle::local();

            // This will tie up the executor's only running thread which ensures that the task
            // we spawn below can only run on the foreign thread.
            std::thread::spawn(move || {
                let ran = Arc::new(AtomicU64::new(0));
                ehandle.poll_tasks(|| {
                    let ran = ran.clone();
                    ehandle.spawn_detached(async move {
                        ran.fetch_add(1, Ordering::Relaxed);
                    });
                });

                // The spawned task should have run in this thread.
                assert_eq!(ran.load(Ordering::Relaxed), 1);
            })
            .join()
            .unwrap();
        });
    }

    // The test is expected to panic; the only operation it exercises is spawning a local task
    // from a thread other than the one that built the local executor.
    #[test]
    #[should_panic]
    fn spawn_local_from_different_thread() {
        let _executor = LocalExecutorBuilder::new().build();
        let ehandle = EHandle::local();
        let _ = std::thread::spawn(move || {
            ehandle.spawn_local_detached(async {});
        })
        .join();
    }
}