fuchsia_async/runtime/fuchsia/executor/common.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::timer::Timers;
6use super::atomic_future::hooks::HooksMap;
7use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
8use super::packets::{
9 PacketReceiver, PacketReceiverMap, RawReceiverRegistration, ReceiverRegistration,
10};
11use super::scope::ScopeHandle;
12use super::time::{BootInstant, MonotonicInstant};
13use crate::runtime::instrument::TaskInstrument;
14use crossbeam::queue::SegQueue;
15use fuchsia_sync::Mutex;
16use zx::BootDuration;
17
18use std::any::Any;
19use std::cell::{Cell, RefCell};
20use std::fmt;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use std::task::Context;
26use std::thread::ThreadId;
27
/// Key of the user packet that is queued on the executor's port to wake a worker thread because
/// a task became ready (as opposed to a packet destined for a registered receiver).
pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;
29
30thread_local!(
31 static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
32);
33
/// Selects where an executor takes its clock readings from.
pub enum ExecutorTime {
    /// Readings come from the real clocks.
    RealTime,
    /// Fake readings used in tests.
    FakeTime {
        /// The fake monotonic clock reading, in nanoseconds.
        mono_reading_ns: AtomicI64,
        /// An offset, in nanoseconds, to add to `mono_reading_ns` to get the reading of the boot
        /// clock, disregarding the difference in timelines.
        ///
        /// The reading and the offset cannot be read atomically as a pair; this is usually not
        /// relevant in tests, so it is deliberately ignored.
        mono_to_boot_offset_ns: AtomicI64,
    },
}
48
/// Outcome of one call to `Executor::poll_ready_tasks`.
enum PollReadyTasksResult {
    /// The ready queue was found empty.
    NoneReady,
    /// A full batch was polled and polling stopped so that other work gets a turn.
    MoreReady,
    /// The main task was polled and reported completion.
    MainTaskCompleted,
}
54
/// Three 8-bit counters packed into the low 24 bits of a `u32`:
///
/// ```text
/// 24           16           8            0
/// +------------+------------+------------+
/// |  foreign   |  notified  |  sleeping  |
/// +------------+------------+------------+
/// ```
///
/// * sleeping : the number of threads sleeping
/// * notified : the number of notifications posted to wake sleeping threads
/// * foreign  : the number of foreign threads processing tasks
#[derive(Clone, Copy, Eq, PartialEq)]
struct ThreadsState(u32);

impl ThreadsState {
    const SLEEPING_MASK: u32 = 0xff;
    const NOTIFIED_SHIFT: u32 = 8;
    const NOTIFIED_MASK: u32 = 0xff << Self::NOTIFIED_SHIFT;
    const FOREIGN_SHIFT: u32 = 16;
    const FOREIGN_MASK: u32 = 0xff << Self::FOREIGN_SHIFT;

    /// Returns the `sleeping` counter (bits 0..8).
    const fn sleeping(&self) -> u8 {
        (self.0 & Self::SLEEPING_MASK) as u8
    }

    /// Returns the `notified` counter (bits 8..16).
    const fn notified(&self) -> u8 {
        ((self.0 & Self::NOTIFIED_MASK) >> Self::NOTIFIED_SHIFT) as u8
    }

    /// Returns a copy with the `sleeping` counter replaced.
    const fn with_sleeping(self, sleeping: u8) -> Self {
        Self((self.0 & !Self::SLEEPING_MASK) | (sleeping as u32))
    }

    /// Returns a copy with the `notified` counter replaced.
    const fn with_notified(self, notified: u8) -> Self {
        Self((self.0 & !Self::NOTIFIED_MASK) | ((notified as u32) << Self::NOTIFIED_SHIFT))
    }

    /// Returns a copy with the `foreign` counter replaced.
    const fn with_foreign(self, foreign: u8) -> Self {
        Self((self.0 & !Self::FOREIGN_MASK) | ((foreign as u32) << Self::FOREIGN_SHIFT))
    }
}
87
/// Number of `Executor` values currently alive; only tracked in test builds so tests can check
/// that executors are not leaked.
#[cfg(test)]
static ACTIVE_EXECUTORS: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(0);
90
91pub(crate) struct Executor {
92 pub(super) port: zx::Port,
93 monotonic_timers: Arc<Timers<MonotonicInstant>>,
94 boot_timers: Arc<Timers<BootInstant>>,
95 pub(super) done: AtomicBool,
96 is_local: bool,
97 pub(crate) receivers: PacketReceiverMap,
98
99 pub(super) ready_tasks: SegQueue<TaskHandle>,
100 time: ExecutorTime,
101 // The low byte is the number of threads currently sleeping. The high byte is the number of
102 // of wake-up notifications pending.
103 pub(super) threads_state: AtomicU32,
104 pub(super) num_threads: u8,
105 pub(super) polled: AtomicU64,
106 // Data that belongs to the user that can be accessed via EHandle::local(). See
107 // `TestExecutor::poll_until_stalled`.
108 pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
109 pub(super) instrument: Option<Arc<dyn TaskInstrument>>,
110 pub(super) hooks_map: HooksMap,
111 pub(super) first_thread_id: OnceLock<ThreadId>,
112
113 // The time, in nanoseconds, that the executor was last executing a normal priority task.
114 last_active: AtomicI64,
115}
116
117impl Executor {
118 pub fn new(
119 time: ExecutorTime,
120 is_local: bool,
121 num_threads: u8,
122 instrument: Option<Arc<dyn TaskInstrument>>,
123 ) -> Self {
124 Self::new_with_port(time, is_local, num_threads, zx::Port::create(), instrument)
125 }
126
127 pub fn new_with_port(
128 time: ExecutorTime,
129 is_local: bool,
130 num_threads: u8,
131 port: zx::Port,
132 instrument: Option<Arc<dyn TaskInstrument>>,
133 ) -> Self {
134 #[cfg(test)]
135 ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);
136
137 // Is this a fake-time executor?
138 let is_fake = matches!(
139 time,
140 ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
141 );
142
143 Executor {
144 port,
145 monotonic_timers: Arc::new(Timers::<MonotonicInstant>::new(is_fake)),
146 boot_timers: Arc::new(Timers::<BootInstant>::new(is_fake)),
147 done: AtomicBool::new(false),
148 is_local,
149 receivers: PacketReceiverMap::new(),
150 ready_tasks: SegQueue::new(),
151 time,
152 threads_state: AtomicU32::new(0),
153 num_threads,
154 polled: AtomicU64::new(0),
155 owner_data: Mutex::new(None),
156 instrument,
157 hooks_map: HooksMap::default(),
158 first_thread_id: OnceLock::new(),
159 last_active: Default::default(),
160 }
161 }
162
163 pub fn set_local(root_scope: ScopeHandle) {
164 root_scope.executor().first_thread_id.get_or_init(|| std::thread::current().id());
165 EXECUTOR.with(|e| {
166 let mut e = e.borrow_mut();
167 assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
168 *e = Some(root_scope);
169 });
170 }
171
172 fn poll_ready_tasks(&self, main_task: Option<&TaskHandle>) -> PollReadyTasksResult {
173 loop {
174 for _ in 0..16 {
175 let Some(task) = self.ready_tasks.pop() else {
176 return PollReadyTasksResult::NoneReady;
177 };
178 let is_main = Some(&task) == main_task;
179 let complete = self.try_poll(task);
180 if complete && is_main {
181 return PollReadyTasksResult::MainTaskCompleted;
182 }
183 self.polled.fetch_add(1, Ordering::Relaxed);
184 }
185 // We didn't finish all the ready tasks. If there are sleeping threads, post a
186 // notification to wake one up.
187 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
188 loop {
189 if threads_state.sleeping() == 0 {
190 // All threads are awake now. Prevent starvation.
191 return PollReadyTasksResult::MoreReady;
192 }
193 if threads_state.notified() >= threads_state.sleeping() {
194 // All sleeping threads have been notified. Keep going and poll more tasks.
195 break;
196 }
197 match self.try_notify(threads_state) {
198 Ok(()) => break,
199 Err(s) => threads_state = s,
200 }
201 }
202 }
203 }
204
205 pub fn is_local(&self) -> bool {
206 self.is_local
207 }
208
209 pub fn notify_task_ready(&self) {
210 // Only post if there's no thread running (or soon to be running). If we happen to be
211 // running on a thread for this executor, then threads_state won't be equal to num_threads,
212 // which means notifications only get fired if this is from a non-async thread, or a thread
213 // that belongs to a different executor. We use SeqCst ordering here to make sure this load
214 // happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
215 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));
216
217 // We only want to notify if there are no pending notifications and there are no other
218 // threads running.
219 while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
220 match self.try_notify(threads_state) {
221 Ok(()) => break,
222 Err(s) => threads_state = s,
223 }
224 }
225 }
226
227 /// Tries to notify a thread to wake up. Returns threads_state if it fails.
228 fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
229 self.threads_state
230 .compare_exchange_weak(
231 old_threads_state.0,
232 old_threads_state.0 + ThreadsState(0).with_notified(1).0,
233 Ordering::Relaxed,
234 Ordering::Relaxed,
235 )
236 .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
237 .map_err(ThreadsState)
238 }
239
240 pub fn wake_one_thread(&self) {
241 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
242 let current_sleeping = threads_state.sleeping();
243 if current_sleeping == 0 {
244 return;
245 }
246 while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
247 match self.try_notify(threads_state) {
248 Ok(()) => break,
249 Err(s) => threads_state = s,
250 }
251 }
252 }
253
254 pub fn notify_id(&self, id: u64) {
255 let up = zx::UserPacket::from_u8_array([0; 32]);
256 let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
257 if let Err(e) = self.port.queue(&packet) {
258 // TODO: logging
259 eprintln!("Failed to queue notify in port: {e:?}");
260 }
261 }
262
263 /// Returns the current reading of the monotonic clock.
264 ///
265 /// For test executors running in fake time, returns the reading of the
266 /// fake monotonic clock.
267 pub fn now(&self) -> MonotonicInstant {
268 match &self.time {
269 ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
270 ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
271 MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
272 }
273 }
274 }
275
276 /// Returns the current reading of the boot clock.
277 ///
278 /// For test executors running in fake time, returns the reading of the
279 /// fake boot clock.
280 pub fn boot_now(&self) -> BootInstant {
281 match &self.time {
282 ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),
283
284 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
285 // The two atomic values are loaded one after the other. This should
286 // not normally be an issue in tests.
287 let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
288 let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
289 BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
290 }
291 }
292 }
293
294 /// Sets the reading of the fake monotonic clock.
295 ///
296 /// # Panics
297 ///
298 /// If called on an executor that runs in real time.
299 pub fn set_fake_time(&self, new: MonotonicInstant) {
300 let boot_offset_ns = match &self.time {
301 ExecutorTime::RealTime => {
302 panic!("Error: called `set_fake_time` on an executor using actual time.")
303 }
304 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
305 t.store(new.into_nanos(), Ordering::Relaxed);
306 mono_to_boot_offset_ns.load(Ordering::Relaxed)
307 }
308 };
309 self.monotonic_timers.maybe_notify(new);
310
311 // Changing fake time also affects boot time. Notify boot clocks as well.
312 let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
313 self.boot_timers.maybe_notify(new_boot_time);
314 }
315
316 // Sets a new offset between boot and monotonic time.
317 //
318 // Only works for executors operating in fake time.
319 // The change in the fake offset will wake expired boot timers.
320 pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
321 let mono_now_ns = match &self.time {
322 ExecutorTime::RealTime => {
323 panic!(
324 "Error: called `set_fake_boot_to_mono_offset` on an executor using actual time."
325 )
326 }
327 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
328 // We ignore the non-atomic update between b and t, it is likely
329 // not relevant in tests.
330 b.store(offset.into_nanos(), Ordering::Relaxed);
331 t.load(Ordering::Relaxed)
332 }
333 };
334 let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
335 self.boot_timers.maybe_notify(new_boot_now);
336 }
337
338 /// Returns `true` if this executor is running in real time. Returns
339 /// `false` if this executor si running in fake time.
340 pub fn is_real_time(&self) -> bool {
341 matches!(self.time, ExecutorTime::RealTime)
342 }
343
344 /// Must be called before `on_parent_drop`.
345 ///
346 /// Done flag must be set before dropping packet receivers
347 /// so that future receivers that attempt to deregister themselves
348 /// know that it's okay if their entries are already missing.
349 pub fn mark_done(&self) {
350 self.done.store(true, Ordering::SeqCst);
351
352 // Make sure there's at least one notification outstanding per thread to wake up all
353 // workers. This might be more notifications than required, but this way we don't have to
354 // worry about races where tasks are just about to sleep; when a task receives the
355 // notification, it will check done and terminate.
356 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
357 let num_threads = self.num_threads;
358 loop {
359 let notified = threads_state.notified();
360 if notified >= num_threads {
361 break;
362 }
363 match self.threads_state.compare_exchange_weak(
364 threads_state.0,
365 threads_state.with_notified(num_threads).0,
366 Ordering::Relaxed,
367 Ordering::Relaxed,
368 ) {
369 Ok(_) => {
370 for _ in notified..num_threads {
371 self.notify_id(TASK_READY_WAKEUP_ID);
372 }
373 return;
374 }
375 Err(old) => threads_state = ThreadsState(old),
376 }
377 }
378 }
379
380 /// Notes about the lifecycle of an Executor.
381 ///
382 /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
383 /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
384 /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
385 /// the port is dropped alongside the Executor as it should.
386 /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
387 ///
388 /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
389 /// "current" executor being set, and that's unset when the executor is dropped.
390 ///
391 /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
392 /// and point (b) is related to "what happens when I try to create a new receiver when there
393 /// is no executor".
394 ///
395 /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
396 /// storage [1]. And the reactor discourages usage of strong references to it by vending weak
397 /// references to it [2] instead of strong.
398 ///
399 /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
400 /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
401 /// upgrading pointers. For (b) the difference mostly stand for "when is it safe to use IO
402 /// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
403 /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
404 /// in that thread.
405 ///
406 /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
407 /// unregistered themselves when the executor drops. The choice to panic was made based on
408 /// patterns in fuchsia-async that may come to change:
409 ///
410 /// - Executor vends strong references to itself and those references are *stored* by most
411 /// receiver implementations (as opposed to reached out on TLS every time).
412 /// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
413 /// easy to understand error to return when polling on an extinct executor.
414 /// - All receivers are implemented in this crate and well-known.
415 ///
416 /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
417 /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
418 /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
419 /// opaque non-clone-copy-or-send guard would be stronger than this. See:
420 /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
421 pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
422 // Drop all tasks.
423 // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
424 // as part of waking, there's an upgrade to a strong reference, so for a small amount of
425 // time `fasync::unblock` can hold a strong reference to a task which in turn holds the
426 // future for the task which in turn could hold references to receivers, which, if we did
427 // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
428 // task futures here.
429 root_scope.drop_all_tasks();
430
431 // Drop all of the uncompleted tasks
432 while self.ready_tasks.pop().is_some() {}
433
434 // Deregister the timer receivers so that we can perform the check below.
435 self.monotonic_timers.deregister();
436 self.boot_timers.deregister();
437
438 // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
439 // happen. See discussion above.
440 //
441 // If you're here because you hit this panic check your code for:
442 //
443 // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
444 // position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
445 //
446 // - A function scope that contains a fuchsia_async::Executor NOT in the first position
447 // (first position in function scope gets dropped last:
448 // https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
449 //
450 // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
451 // contains a temporary (temporaries are dropped after the function scope:
452 // https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
453 // looks like a `match` statement at the end of the function without a semicolon.
454 //
455 // - Storing channel and FIDL objects in static variables.
456 //
457 // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
458 assert!(self.receivers.is_empty(), "receivers must not outlive their executor");
459
460 // Remove the thread-local executor set in `new`.
461 EHandle::rm_local();
462 }
463
464 // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
465 // the debugger needs to be updated.
466 // LINT.IfChange
467 pub fn worker_lifecycle<const UNTIL_STALLED: bool>(
468 self: &Arc<Executor>,
469 // The main task should be specified for a single-threaded executor, but it isn't necessary
470 // for a multi-threaded executor since it has an independent mechanism for tracking when
471 // the main task terminates.
472 // TODO(https://fxbug.dev/481030722) remove this parameter
473 main_task: Option<&TaskHandle>,
474 ) {
475 // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
476
477 assert!(
478 !self.is_local() || self.first_thread_id.get() == Some(&std::thread::current().id())
479 );
480
481 self.monotonic_timers.register(self);
482 self.boot_timers.register(self);
483
484 loop {
485 // Keep track of whether we are considered asleep.
486 let mut sleeping = false;
487
488 match self.poll_ready_tasks(main_task) {
489 PollReadyTasksResult::NoneReady => {
490 // No more tasks, indicate we are sleeping. We use SeqCst ordering because we
491 // want this change here to happen *before* we check ready_tasks below. This
492 // synchronizes with notify_task_ready which is called *after* a task is added
493 // to ready_tasks.
494 const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
495 self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
496 // Check ready tasks again. If a task got posted, wake up. This has to be done
497 // because a notification won't get sent if there is at least one active thread
498 // so there's a window between the preceding two lines where a task could be
499 // made ready and a notification is not sent because it looks like there is at
500 // least one thread running.
501 if self.ready_tasks.is_empty() {
502 sleeping = true;
503 } else {
504 // We lost a race, we're no longer sleeping.
505 self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
506 }
507 }
508 PollReadyTasksResult::MoreReady => {}
509 PollReadyTasksResult::MainTaskCompleted => return,
510 }
511
512 // Check done here after updating threads_state to avoid shutdown races.
513 if self.done.load(Ordering::SeqCst) {
514 return;
515 }
516
517 enum Work {
518 None,
519 Packet(zx::Packet),
520 Stalled,
521 }
522
523 let mut notified = false;
524 let work = {
525 // If we're considered awake choose INFINITE_PAST which will make the wait call
526 // return immediately. Otherwise, wait until a packet arrives.
527 let deadline = if !sleeping || UNTIL_STALLED {
528 zx::Instant::INFINITE_PAST
529 } else {
530 zx::Instant::INFINITE
531 };
532
533 match self.port.wait(deadline) {
534 Ok(packet) => {
535 if packet.key() == TASK_READY_WAKEUP_ID {
536 notified = true;
537 Work::None
538 } else {
539 Work::Packet(packet)
540 }
541 }
542 Err(zx::Status::TIMED_OUT) => {
543 if !UNTIL_STALLED || !sleeping {
544 Work::None
545 } else {
546 Work::Stalled
547 }
548 }
549 Err(status) => {
550 panic!("Error calling port wait: {status:?}");
551 }
552 }
553 };
554
555 let threads_state_sub =
556 ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
557 if threads_state_sub.0 > 0 {
558 self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
559 }
560
561 match work {
562 Work::Packet(packet) => self.receivers.receive_packet(packet.key(), packet),
563 Work::None => {}
564 Work::Stalled => return,
565 }
566 }
567 }
568
569 /// Drops the main task.
570 ///
571 /// # Safety
572 ///
573 /// The caller must guarantee that the executor isn't running.
574 pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle, task: &TaskHandle) {
575 unsafe { root_scope.drop_task_unchecked(task) };
576 }
577
578 fn try_poll(&self, task: TaskHandle) -> bool {
579 let task_waker = task.waker();
580 let poll_result = TaskHandle::set_current_with(&task, || {
581 task.try_poll(&mut Context::from_waker(&task_waker))
582 });
583 if !task.is_low_priority() {
584 self.last_active.store(self.now().into_nanos(), Ordering::Relaxed);
585 }
586 match poll_result {
587 AttemptPollResult::Yield => {
588 self.ready_tasks.push(task);
589 false
590 }
591 AttemptPollResult::IFinished | AttemptPollResult::Aborted => {
592 task.scope().task_did_finish(&task);
593 true
594 }
595 _ => false,
596 }
597 }
598
599 /// Returns the monotonic timers.
600 pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
601 &self.monotonic_timers
602 }
603
604 /// Returns the boot timers.
605 pub fn boot_timers(&self) -> &Timers<BootInstant> {
606 &self.boot_timers
607 }
608
609 fn poll_tasks(&self, callback: impl FnOnce(), main_task: Option<&TaskHandle>) {
610 assert!(!self.is_local);
611
612 // Increment the count of foreign threads.
613 const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
614 self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);
615
616 callback();
617
618 // Poll up to 16 tasks.
619 for _ in 0..16 {
620 let Some(task) = self.ready_tasks.pop() else {
621 break;
622 };
623 let is_main = Some(&task) == main_task;
624 if self.try_poll(task) && is_main {
625 break;
626 }
627 self.polled.fetch_add(1, Ordering::Relaxed);
628 }
629
630 let mut threads_state = ThreadsState(
631 self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
632 );
633
634 if !self.ready_tasks.is_empty() {
635 // There are tasks still ready to run, so wake up a thread if all the other threads are
636 // sleeping.
637 while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
638 match self.try_notify(threads_state) {
639 Ok(()) => break,
640 Err(s) => threads_state = s,
641 }
642 }
643 }
644 }
645
646 pub fn task_is_ready(&self, task: TaskHandle) {
647 self.ready_tasks.push(task);
648 self.notify_task_ready();
649 }
650
651 pub(crate) fn last_active(&self) -> MonotonicInstant {
652 MonotonicInstant::from_nanos(self.last_active.load(Ordering::Relaxed))
653 }
654}
655
/// Test-only bookkeeping: keeps `ACTIVE_EXECUTORS` in step with the number of live executors.
#[cfg(test)]
impl Drop for Executor {
    fn drop(&mut self) {
        let _previous = ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
    }
}
662
663/// A handle to an executor.
664#[derive(Clone)]
665pub struct EHandle {
666 // LINT.IfChange
667 pub(super) root_scope: ScopeHandle,
668 // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
669}
670
671impl fmt::Debug for EHandle {
672 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
673 f.debug_struct("EHandle").field("port", &self.inner().port).finish()
674 }
675}
676
677impl EHandle {
678 /// Returns the thread-local executor.
679 ///
680 /// # Panics
681 ///
682 /// If called outside the context of an active async executor.
683 pub fn local() -> Self {
684 let root_scope = EXECUTOR
685 .with(|e| e.borrow().as_ref().map(|x| x.clone()))
686 .expect("Fuchsia Executor must be created first");
687
688 EHandle { root_scope }
689 }
690
691 /// Set the fake time to a given value.
692 ///
693 /// # Panics
694 ///
695 /// If the executor was not created with fake time.
696 pub fn set_fake_time(&self, t: MonotonicInstant) {
697 self.inner().set_fake_time(t)
698 }
699
700 pub(super) fn rm_local() {
701 EXECUTOR.with(|e| *e.borrow_mut() = None);
702 }
703
704 /// The root scope of the executor.
705 ///
706 /// This can be used to spawn tasks that live as long as the executor, and
707 /// to create shorter-lived child scopes.
708 ///
709 /// Most users should create an owned scope with
710 /// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
711 pub fn global_scope(&self) -> &ScopeHandle {
712 &self.root_scope
713 }
714
715 /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
716 pub fn port(&self) -> &zx::Port {
717 &self.inner().port
718 }
719
720 /// Registers a `PacketReceiver` with the executor and returns a registration.
721 /// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
722 pub fn register_receiver<T: PacketReceiver>(&self, receiver: T) -> ReceiverRegistration<T> {
723 self.inner().receivers.register(self.inner().clone(), receiver)
724 }
725
726 /// Registers a pinned `RawPacketReceiver` with the executor.
727 ///
728 /// The registration will be deregistered when dropped.
729 ///
730 /// NOTE: Unlike with `register_receiver`, `receive_packet` will be called whilst a lock is
731 /// held, so it is not safe to register or unregister receivers at that time.
732 pub fn register_pinned<T: PacketReceiver>(
733 &self,
734 raw_registration: Pin<&mut RawReceiverRegistration<T>>,
735 ) {
736 self.inner().receivers.register_pinned(self.clone(), raw_registration);
737 }
738
739 #[inline(always)]
740 pub(crate) fn inner(&self) -> &Arc<Executor> {
741 self.root_scope.executor()
742 }
743
744 /// Spawn a new task to be run on this executor.
745 ///
746 /// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
747 /// may be run on either a singlethreaded or multithreaded executor.
748 pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
749 self.global_scope().spawn(future);
750 }
751
752 /// Spawn a new task to be run on this executor.
753 ///
754 /// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
755 /// have to be threads-safe (implement the `Send` trait). In return, this method requires that
756 /// this executor is a LocalExecutor.
757 pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
758 self.global_scope().spawn_local(future);
759 }
760
761 pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
762 &self.inner().monotonic_timers
763 }
764
765 pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
766 &self.inner().boot_timers
767 }
768
769 /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
770 /// that are ready to run. If tasks remain ready and no other threads are running, a thread
771 /// will be woken. This can end up being a performance win in the case that the queue can be
772 /// cleared without needing to wake any other thread.
773 ///
774 /// # Panics
775 ///
776 /// If called on a single-threaded executor or if this thread is a thread managed by the
777 /// executor.
778 pub fn poll_tasks(&self, callback: impl FnOnce()) {
779 EXECUTOR.with(|e| {
780 assert!(
781 e.borrow_mut().replace(self.root_scope.clone()).is_none(),
782 "This thread is already associated with an executor"
783 );
784 });
785
786 // `poll_tasks` should only ever be used on a multi-threaded executor, it's safe to pass
787 // None.
788 self.inner().poll_tasks(callback, None);
789
790 EXECUTOR.with(|e| *e.borrow_mut() = None);
791 }
792}
793
794// AtomicFutureHandle can have a lifetime (for local executors we allow the main task to have a
795// non-static lifetime). The executor doesn't handle this though; the executor just assumes all
796// tasks have the 'static lifetime. It's up to the local executor to extend the lifetime and make
797// it safe.
798pub type TaskHandle = AtomicFutureHandle<'static>;
799
800thread_local! {
801 static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
802}
803
804impl TaskHandle {
805 pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
806 CURRENT_TASK.with(|cur| {
807 let cur = cur.get();
808 let cur = unsafe { cur.as_ref() };
809 f(cur)
810 })
811 }
812
813 fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
814 CURRENT_TASK.with(|cur| {
815 cur.set(task);
816 let result = f();
817 cur.set(std::ptr::null());
818 result
819 })
820 }
821}
822
#[cfg(test)]
mod tests {
    use super::{ACTIVE_EXECUTORS, EHandle};
    use crate::{LocalExecutorBuilder, SendExecutorBuilder};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Running an executor to completion on another thread must leave no executor alive.
    #[test]
    fn test_no_leaks() {
        let runner = std::thread::spawn(|| {
            SendExecutorBuilder::new().num_threads(1).build().run(async {})
        });
        runner.join().unwrap();

        assert_eq!(ACTIVE_EXECUTORS.load(Ordering::Relaxed), 0);
    }

    /// A task spawned from within `poll_tasks` is run by the foreign thread itself.
    #[test]
    fn poll_tasks() {
        SendExecutorBuilder::new().num_threads(1).build().run(async {
            let ehandle = EHandle::local();

            let foreign = std::thread::spawn(move || {
                let run_count = Arc::new(AtomicU64::new(0));
                ehandle.poll_tasks(|| {
                    let run_count = Arc::clone(&run_count);
                    ehandle.spawn_detached(async move {
                        run_count.fetch_add(1, Ordering::Relaxed);
                    });
                });

                // The spawned task should have run in this thread.
                assert_eq!(run_count.load(Ordering::Relaxed), 1);
            });

            // Joining ties up the executor's only running thread, which ensures that the
            // task spawned above can only run on the foreign thread.
            foreign.join().unwrap();
        });
    }

    /// Spawning a local task from a thread other than the executor's is expected to panic.
    #[test]
    #[should_panic]
    fn spawn_local_from_different_thread() {
        let _executor = LocalExecutorBuilder::new().build();
        let ehandle = EHandle::local();
        let other_thread = std::thread::spawn(move || {
            ehandle.spawn_local_detached(async {});
        });
        let _ = other_thread.join();
    }
}