fuchsia_async/runtime/fuchsia/executor/common.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::timer::Timers;
6use super::atomic_future::hooks::HooksMap;
7use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
8use super::packets::{
9 PacketReceiver, PacketReceiverMap, RawReceiverRegistration, ReceiverRegistration,
10};
11use super::scope::ScopeHandle;
12use super::time::{BootInstant, MonotonicInstant};
13use crate::runtime::instrument::TaskInstrument;
14use crossbeam::queue::SegQueue;
15use fuchsia_sync::Mutex;
16use zx::BootDuration;
17
18use std::any::Any;
19use std::cell::{Cell, RefCell};
20use std::fmt;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, AtomicUsize, Ordering};
25use std::task::Context;
26
27pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;
28
29/// The id of the main task, which is a virtual task that lives from construction
30/// to destruction of the executor. The main task may correspond to multiple
31/// main futures, in cases where the executor runs multiple times during its lifetime.
32pub(crate) const MAIN_TASK_ID: usize = 0;
33
34thread_local!(
35 static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
36);
37
38pub enum ExecutorTime {
39 RealTime,
40 /// Fake readings used in tests.
41 FakeTime {
42 // The fake monotonic clock reading.
43 mono_reading_ns: AtomicI64,
44 // An offset to add to mono_reading_ns to get the reading of the boot
45 // clock, disregarding the difference in timelines.
46 //
47 // We disregard the fact that the reading and offset can not be
48 // read atomically, this is usually not relevant in tests.
49 mono_to_boot_offset_ns: AtomicI64,
50 },
51}
52
53enum PollReadyTasksResult {
54 NoneReady,
55 MoreReady,
56 MainTaskCompleted,
57}
58
59/// 24 16 8 0
60/// +------------+------------+------------+
61/// | foreign | notified | sleeping |
62/// +------------+------------+------------+
63///
64/// sleeping : the number of threads sleeping
65/// notified : the number of notifications posted to wake sleeping threads
66/// foreign : the number of foreign threads processing tasks
67#[derive(Clone, Copy, Eq, PartialEq)]
68struct ThreadsState(u32);
69
70impl ThreadsState {
71 const fn sleeping(&self) -> u8 {
72 self.0 as u8
73 }
74
75 const fn notified(&self) -> u8 {
76 (self.0 >> 8) as u8
77 }
78
79 const fn with_sleeping(self, sleeping: u8) -> Self {
80 Self((self.0 & !0xff) | sleeping as u32)
81 }
82
83 const fn with_notified(self, notified: u8) -> Self {
84 Self(self.0 & !0xff00 | (notified as u32) << 8)
85 }
86
87 const fn with_foreign(self, foreign: u8) -> Self {
88 Self(self.0 & !0xff0000 | (foreign as u32) << 16)
89 }
90}
91
92#[cfg(test)]
93static ACTIVE_EXECUTORS: AtomicUsize = AtomicUsize::new(0);
94
95pub(crate) struct Executor {
96 pub(super) port: zx::Port,
97 monotonic_timers: Arc<Timers<MonotonicInstant>>,
98 boot_timers: Arc<Timers<BootInstant>>,
99 pub(super) done: AtomicBool,
100 is_local: bool,
101 pub(crate) receivers: PacketReceiverMap,
102 task_count: AtomicUsize,
103 pub(super) ready_tasks: SegQueue<TaskHandle>,
104 time: ExecutorTime,
105 // The low byte is the number of threads currently sleeping. The high byte is the number of
106 // of wake-up notifications pending.
107 pub(super) threads_state: AtomicU32,
108 pub(super) num_threads: u8,
109 pub(super) polled: AtomicU64,
110 // Data that belongs to the user that can be accessed via EHandle::local(). See
111 // `TestExecutor::poll_until_stalled`.
112 pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
113 pub(super) instrument: Option<Arc<dyn TaskInstrument>>,
114 pub(super) hooks_map: HooksMap,
115}
116
117impl Executor {
118 pub fn new(
119 time: ExecutorTime,
120 is_local: bool,
121 num_threads: u8,
122 instrument: Option<Arc<dyn TaskInstrument>>,
123 ) -> Self {
124 Self::new_with_port(time, is_local, num_threads, zx::Port::create(), instrument)
125 }
126
127 pub fn new_with_port(
128 time: ExecutorTime,
129 is_local: bool,
130 num_threads: u8,
131 port: zx::Port,
132 instrument: Option<Arc<dyn TaskInstrument>>,
133 ) -> Self {
134 #[cfg(test)]
135 ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);
136
137 // Is this a fake-time executor?
138 let is_fake = matches!(
139 time,
140 ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
141 );
142
143 Executor {
144 port,
145 monotonic_timers: Arc::new(Timers::<MonotonicInstant>::new(is_fake)),
146 boot_timers: Arc::new(Timers::<BootInstant>::new(is_fake)),
147 done: AtomicBool::new(false),
148 is_local,
149 receivers: PacketReceiverMap::new(),
150 task_count: AtomicUsize::new(MAIN_TASK_ID + 1),
151 ready_tasks: SegQueue::new(),
152 time,
153 threads_state: AtomicU32::new(0),
154 num_threads,
155 polled: AtomicU64::new(0),
156 owner_data: Mutex::new(None),
157 instrument,
158 hooks_map: HooksMap::default(),
159 }
160 }
161
162 pub fn set_local(root_scope: ScopeHandle) {
163 EXECUTOR.with(|e| {
164 let mut e = e.borrow_mut();
165 assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
166 *e = Some(root_scope);
167 });
168 }
169
170 fn poll_ready_tasks(&self) -> PollReadyTasksResult {
171 loop {
172 for _ in 0..16 {
173 let Some(task) = self.ready_tasks.pop() else {
174 return PollReadyTasksResult::NoneReady;
175 };
176 let task_id = task.id();
177 let complete = self.try_poll(task);
178 if complete && task_id == MAIN_TASK_ID {
179 return PollReadyTasksResult::MainTaskCompleted;
180 }
181 self.polled.fetch_add(1, Ordering::Relaxed);
182 }
183 // We didn't finish all the ready tasks. If there are sleeping threads, post a
184 // notification to wake one up.
185 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
186 loop {
187 if threads_state.sleeping() == 0 {
188 // All threads are awake now. Prevent starvation.
189 return PollReadyTasksResult::MoreReady;
190 }
191 if threads_state.notified() >= threads_state.sleeping() {
192 // All sleeping threads have been notified. Keep going and poll more tasks.
193 break;
194 }
195 match self.try_notify(threads_state) {
196 Ok(()) => break,
197 Err(s) => threads_state = s,
198 }
199 }
200 }
201 }
202
203 pub fn is_local(&self) -> bool {
204 self.is_local
205 }
206
207 pub fn next_task_id(&self) -> usize {
208 self.task_count.fetch_add(1, Ordering::Relaxed)
209 }
210
211 pub fn notify_task_ready(&self) {
212 // Only post if there's no thread running (or soon to be running). If we happen to be
213 // running on a thread for this executor, then threads_state won't be equal to num_threads,
214 // which means notifications only get fired if this is from a non-async thread, or a thread
215 // that belongs to a different executor. We use SeqCst ordering here to make sure this load
216 // happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
217 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));
218
219 // We only want to notify if there are no pending notifications and there are no other
220 // threads running.
221 while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
222 match self.try_notify(threads_state) {
223 Ok(()) => break,
224 Err(s) => threads_state = s,
225 }
226 }
227 }
228
229 /// Tries to notify a thread to wake up. Returns threads_state if it fails.
230 fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
231 self.threads_state
232 .compare_exchange_weak(
233 old_threads_state.0,
234 old_threads_state.0 + ThreadsState(0).with_notified(1).0,
235 Ordering::Relaxed,
236 Ordering::Relaxed,
237 )
238 .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
239 .map_err(ThreadsState)
240 }
241
242 pub fn wake_one_thread(&self) {
243 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
244 let current_sleeping = threads_state.sleeping();
245 if current_sleeping == 0 {
246 return;
247 }
248 while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
249 match self.try_notify(threads_state) {
250 Ok(()) => break,
251 Err(s) => threads_state = s,
252 }
253 }
254 }
255
256 pub fn notify_id(&self, id: u64) {
257 let up = zx::UserPacket::from_u8_array([0; 32]);
258 let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
259 if let Err(e) = self.port.queue(&packet) {
260 // TODO: logging
261 eprintln!("Failed to queue notify in port: {e:?}");
262 }
263 }
264
265 /// Returns the current reading of the monotonic clock.
266 ///
267 /// For test executors running in fake time, returns the reading of the
268 /// fake monotonic clock.
269 pub fn now(&self) -> MonotonicInstant {
270 match &self.time {
271 ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
272 ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
273 MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
274 }
275 }
276 }
277
278 /// Returns the current reading of the boot clock.
279 ///
280 /// For test executors running in fake time, returns the reading of the
281 /// fake boot clock.
282 pub fn boot_now(&self) -> BootInstant {
283 match &self.time {
284 ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),
285
286 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
287 // The two atomic values are loaded one after the other. This should
288 // not normally be an issue in tests.
289 let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
290 let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
291 BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
292 }
293 }
294 }
295
296 /// Sets the reading of the fake monotonic clock.
297 ///
298 /// # Panics
299 ///
300 /// If called on an executor that runs in real time.
301 pub fn set_fake_time(&self, new: MonotonicInstant) {
302 let boot_offset_ns = match &self.time {
303 ExecutorTime::RealTime => {
304 panic!("Error: called `set_fake_time` on an executor using actual time.")
305 }
306 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
307 t.store(new.into_nanos(), Ordering::Relaxed);
308 mono_to_boot_offset_ns.load(Ordering::Relaxed)
309 }
310 };
311 self.monotonic_timers.maybe_notify(new);
312
313 // Changing fake time also affects boot time. Notify boot clocks as well.
314 let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
315 self.boot_timers.maybe_notify(new_boot_time);
316 }
317
318 // Sets a new offset between boot and monotonic time.
319 //
320 // Only works for executors operating in fake time.
321 // The change in the fake offset will wake expired boot timers.
322 pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
323 let mono_now_ns = match &self.time {
324 ExecutorTime::RealTime => {
325 panic!(
326 "Error: called `set_fake_boot_to_mono_offset` on an executor using actual time."
327 )
328 }
329 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
330 // We ignore the non-atomic update between b and t, it is likely
331 // not relevant in tests.
332 b.store(offset.into_nanos(), Ordering::Relaxed);
333 t.load(Ordering::Relaxed)
334 }
335 };
336 let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
337 self.boot_timers.maybe_notify(new_boot_now);
338 }
339
340 /// Returns `true` if this executor is running in real time. Returns
341 /// `false` if this executor si running in fake time.
342 pub fn is_real_time(&self) -> bool {
343 matches!(self.time, ExecutorTime::RealTime)
344 }
345
346 /// Must be called before `on_parent_drop`.
347 ///
348 /// Done flag must be set before dropping packet receivers
349 /// so that future receivers that attempt to deregister themselves
350 /// know that it's okay if their entries are already missing.
351 pub fn mark_done(&self) {
352 self.done.store(true, Ordering::SeqCst);
353
354 // Make sure there's at least one notification outstanding per thread to wake up all
355 // workers. This might be more notifications than required, but this way we don't have to
356 // worry about races where tasks are just about to sleep; when a task receives the
357 // notification, it will check done and terminate.
358 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
359 let num_threads = self.num_threads;
360 loop {
361 let notified = threads_state.notified();
362 if notified >= num_threads {
363 break;
364 }
365 match self.threads_state.compare_exchange_weak(
366 threads_state.0,
367 threads_state.with_notified(num_threads).0,
368 Ordering::Relaxed,
369 Ordering::Relaxed,
370 ) {
371 Ok(_) => {
372 for _ in notified..num_threads {
373 self.notify_id(TASK_READY_WAKEUP_ID);
374 }
375 return;
376 }
377 Err(old) => threads_state = ThreadsState(old),
378 }
379 }
380 }
381
382 /// Notes about the lifecycle of an Executor.
383 ///
384 /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
385 /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
386 /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
387 /// the port is dropped alongside the Executor as it should.
388 /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
389 ///
390 /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
391 /// "current" executor being set, and that's unset when the executor is dropped.
392 ///
393 /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
394 /// and point (b) is related to "what happens when I try to create a new receiver when there
395 /// is no executor".
396 ///
397 /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
398 /// storage [1]. And the reactor discourages usage of strong references to it by vending weak
399 /// references to it [2] instead of strong.
400 ///
401 /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
402 /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
403 /// upgrading pointers. For (b) the difference mostly stand for "when is it safe to use IO
404 /// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
405 /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
406 /// in that thread.
407 ///
408 /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
409 /// unregistered themselves when the executor drops. The choice to panic was made based on
410 /// patterns in fuchsia-async that may come to change:
411 ///
412 /// - Executor vends strong references to itself and those references are *stored* by most
413 /// receiver implementations (as opposed to reached out on TLS every time).
414 /// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
415 /// easy to understand error to return when polling on an extinct executor.
416 /// - All receivers are implemented in this crate and well-known.
417 ///
418 /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
419 /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
420 /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
421 /// opaque non-clone-copy-or-send guard would be stronger than this. See:
422 /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
423 pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
424 // Drop all tasks.
425 // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
426 // as part of waking, there's an upgrade to a strong reference, so for a small amount of
427 // time `fasync::unblock` can hold a strong reference to a task which in turn holds the
428 // future for the task which in turn could hold references to receivers, which, if we did
429 // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
430 // task futures here.
431 root_scope.drop_all_tasks();
432
433 // Drop all of the uncompleted tasks
434 while self.ready_tasks.pop().is_some() {}
435
436 // Deregister the timer receivers so that we can perform the check below.
437 self.monotonic_timers.deregister();
438 self.boot_timers.deregister();
439
440 // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
441 // happen. See discussion above.
442 //
443 // If you're here because you hit this panic check your code for:
444 //
445 // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
446 // position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
447 //
448 // - A function scope that contains a fuchsia_async::Executor NOT in the first position
449 // (first position in function scope gets dropped last:
450 // https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
451 //
452 // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
453 // contains a temporary (temporaries are dropped after the function scope:
454 // https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
455 // looks like a `match` statement at the end of the function without a semicolon.
456 //
457 // - Storing channel and FIDL objects in static variables.
458 //
459 // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
460 assert!(self.receivers.is_empty(), "receivers must not outlive their executor");
461
462 // Remove the thread-local executor set in `new`.
463 EHandle::rm_local();
464 }
465
466 // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
467 // the debugger needs to be updated.
468 // LINT.IfChange
469 pub fn worker_lifecycle<const UNTIL_STALLED: bool>(self: &Arc<Executor>) {
470 // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
471
472 self.monotonic_timers.register(self);
473 self.boot_timers.register(self);
474
475 loop {
476 // Keep track of whether we are considered asleep.
477 let mut sleeping = false;
478
479 match self.poll_ready_tasks() {
480 PollReadyTasksResult::NoneReady => {
481 // No more tasks, indicate we are sleeping. We use SeqCst ordering because we
482 // want this change here to happen *before* we check ready_tasks below. This
483 // synchronizes with notify_task_ready which is called *after* a task is added
484 // to ready_tasks.
485 const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
486 self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
487 // Check ready tasks again. If a task got posted, wake up. This has to be done
488 // because a notification won't get sent if there is at least one active thread
489 // so there's a window between the preceding two lines where a task could be
490 // made ready and a notification is not sent because it looks like there is at
491 // least one thread running.
492 if self.ready_tasks.is_empty() {
493 sleeping = true;
494 } else {
495 // We lost a race, we're no longer sleeping.
496 self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
497 }
498 }
499 PollReadyTasksResult::MoreReady => {}
500 PollReadyTasksResult::MainTaskCompleted => return,
501 }
502
503 // Check done here after updating threads_state to avoid shutdown races.
504 if self.done.load(Ordering::SeqCst) {
505 return;
506 }
507
508 enum Work {
509 None,
510 Packet(zx::Packet),
511 Stalled,
512 }
513
514 let mut notified = false;
515 let work = {
516 // If we're considered awake choose INFINITE_PAST which will make the wait call
517 // return immediately. Otherwise, wait until a packet arrives.
518 let deadline = if !sleeping || UNTIL_STALLED {
519 zx::Instant::INFINITE_PAST
520 } else {
521 zx::Instant::INFINITE
522 };
523
524 match self.port.wait(deadline) {
525 Ok(packet) => {
526 if packet.key() == TASK_READY_WAKEUP_ID {
527 notified = true;
528 Work::None
529 } else {
530 Work::Packet(packet)
531 }
532 }
533 Err(zx::Status::TIMED_OUT) => {
534 if !UNTIL_STALLED || !sleeping {
535 Work::None
536 } else {
537 Work::Stalled
538 }
539 }
540 Err(status) => {
541 panic!("Error calling port wait: {status:?}");
542 }
543 }
544 };
545
546 let threads_state_sub =
547 ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
548 if threads_state_sub.0 > 0 {
549 self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
550 }
551
552 match work {
553 Work::Packet(packet) => self.receivers.receive_packet(packet.key(), packet),
554 Work::None => {}
555 Work::Stalled => return,
556 }
557 }
558 }
559
560 /// Drops the main task.
561 ///
562 /// # Safety
563 ///
564 /// The caller must guarantee that the executor isn't running.
565 pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle) {
566 root_scope.drop_task_unchecked(MAIN_TASK_ID);
567 }
568
569 fn try_poll(&self, task: TaskHandle) -> bool {
570 let task_waker = task.waker();
571 let poll_result = TaskHandle::set_current_with(&task, || {
572 task.try_poll(&mut Context::from_waker(&task_waker))
573 });
574 match poll_result {
575 AttemptPollResult::Yield => {
576 self.ready_tasks.push(task);
577 false
578 }
579 AttemptPollResult::IFinished | AttemptPollResult::Aborted => {
580 task.scope().task_did_finish(task.id());
581 true
582 }
583 _ => false,
584 }
585 }
586
587 /// Returns the monotonic timers.
588 pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
589 &self.monotonic_timers
590 }
591
592 /// Returns the boot timers.
593 pub fn boot_timers(&self) -> &Timers<BootInstant> {
594 &self.boot_timers
595 }
596
597 fn poll_tasks(&self, callback: impl FnOnce()) {
598 assert!(!self.is_local);
599
600 // Increment the count of foreign threads.
601 const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
602 self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);
603
604 callback();
605
606 // Poll up to 16 tasks.
607 for _ in 0..16 {
608 let Some(task) = self.ready_tasks.pop() else {
609 break;
610 };
611 let task_id = task.id();
612 if self.try_poll(task) && task_id == MAIN_TASK_ID {
613 break;
614 }
615 self.polled.fetch_add(1, Ordering::Relaxed);
616 }
617
618 let mut threads_state = ThreadsState(
619 self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
620 );
621
622 if !self.ready_tasks.is_empty() {
623 // There are tasks still ready to run, so wake up a thread if all the other threads are
624 // sleeping.
625 while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
626 match self.try_notify(threads_state) {
627 Ok(()) => break,
628 Err(s) => threads_state = s,
629 }
630 }
631 }
632 }
633
634 pub fn task_is_ready(&self, task: TaskHandle) {
635 self.ready_tasks.push(task);
636 self.notify_task_ready();
637 }
638}
639
640#[cfg(test)]
641impl Drop for Executor {
642 fn drop(&mut self) {
643 ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
644 }
645}
646
647/// A handle to an executor.
648#[derive(Clone)]
649pub struct EHandle {
650 // LINT.IfChange
651 pub(super) root_scope: ScopeHandle,
652 // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
653}
654
655impl fmt::Debug for EHandle {
656 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657 f.debug_struct("EHandle").field("port", &self.inner().port).finish()
658 }
659}
660
661impl EHandle {
662 /// Returns the thread-local executor.
663 ///
664 /// # Panics
665 ///
666 /// If called outside the context of an active async executor.
667 pub fn local() -> Self {
668 let root_scope = EXECUTOR
669 .with(|e| e.borrow().as_ref().map(|x| x.clone()))
670 .expect("Fuchsia Executor must be created first");
671
672 EHandle { root_scope }
673 }
674
675 /// Set the fake time to a given value.
676 ///
677 /// # Panics
678 ///
679 /// If the executor was not created with fake time.
680 pub fn set_fake_time(&self, t: MonotonicInstant) {
681 self.inner().set_fake_time(t)
682 }
683
684 pub(super) fn rm_local() {
685 EXECUTOR.with(|e| *e.borrow_mut() = None);
686 }
687
688 /// The root scope of the executor.
689 ///
690 /// This can be used to spawn tasks that live as long as the executor, and
691 /// to create shorter-lived child scopes.
692 ///
693 /// Most users should create an owned scope with
694 /// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
695 pub fn global_scope(&self) -> &ScopeHandle {
696 &self.root_scope
697 }
698
699 /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
700 pub fn port(&self) -> &zx::Port {
701 &self.inner().port
702 }
703
704 /// Registers a `PacketReceiver` with the executor and returns a registration.
705 /// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
706 pub fn register_receiver<T: PacketReceiver>(&self, receiver: T) -> ReceiverRegistration<T> {
707 self.inner().receivers.register(self.inner().clone(), receiver)
708 }
709
710 /// Registers a pinned `RawPacketReceiver` with the executor.
711 ///
712 /// The registration will be deregistered when dropped.
713 ///
714 /// NOTE: Unlike with `register_receiver`, `receive_packet` will be called whilst a lock is
715 /// held, so it is not safe to register or unregister receivers at that time.
716 pub fn register_pinned<T: PacketReceiver>(
717 &self,
718 raw_registration: Pin<&mut RawReceiverRegistration<T>>,
719 ) {
720 self.inner().receivers.register_pinned(self.clone(), raw_registration);
721 }
722
723 #[inline(always)]
724 pub(crate) fn inner(&self) -> &Arc<Executor> {
725 self.root_scope.executor()
726 }
727
728 /// Spawn a new task to be run on this executor.
729 ///
730 /// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
731 /// may be run on either a singlethreaded or multithreaded executor.
732 pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
733 self.global_scope().spawn(future);
734 }
735
736 /// Spawn a new task to be run on this executor.
737 ///
738 /// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
739 /// have to be threads-safe (implement the `Send` trait). In return, this method requires that
740 /// this executor is a LocalExecutor.
741 pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
742 self.global_scope().spawn_local(future);
743 }
744
745 pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
746 &self.inner().monotonic_timers
747 }
748
749 pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
750 &self.inner().boot_timers
751 }
752
753 /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
754 /// that are ready to run. If tasks remain ready and no other threads are running, a thread
755 /// will be woken. This can end up being a performance win in the case that the queue can be
756 /// cleared without needing to wake any other thread.
757 ///
758 /// # Panics
759 ///
760 /// If called on a single-threaded executor or if this thread is a thread managed by the
761 /// executor.
762 pub fn poll_tasks(&self, callback: impl FnOnce()) {
763 EXECUTOR.with(|e| {
764 assert!(
765 e.borrow_mut().replace(self.root_scope.clone()).is_none(),
766 "This thread is already associated with an executor"
767 );
768 });
769
770 self.inner().poll_tasks(callback);
771
772 EXECUTOR.with(|e| *e.borrow_mut() = None);
773 }
774}
775
776// AtomicFutureHandle can have a lifetime (for local executors we allow the main task to have a
777// non-static lifetime). The executor doesn't handle this though; the executor just assumes all
778// tasks have the 'static lifetime. It's up to the local executor to extend the lifetime and make
779// it safe.
780pub type TaskHandle = AtomicFutureHandle<'static>;
781
782thread_local! {
783 static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
784}
785
786impl TaskHandle {
787 pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
788 CURRENT_TASK.with(|cur| {
789 let cur = cur.get();
790 let cur = unsafe { cur.as_ref() };
791 f(cur)
792 })
793 }
794
795 fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
796 CURRENT_TASK.with(|cur| {
797 cur.set(task);
798 let result = f();
799 cur.set(std::ptr::null());
800 result
801 })
802 }
803}
804
805#[cfg(test)]
806mod tests {
807 use super::{ACTIVE_EXECUTORS, EHandle};
808 use crate::SendExecutorBuilder;
809 use std::sync::Arc;
810 use std::sync::atomic::{AtomicU64, Ordering};
811
812 #[test]
813 fn test_no_leaks() {
814 std::thread::spawn(|| SendExecutorBuilder::new().num_threads(1).build().run(async {}))
815 .join()
816 .unwrap();
817
818 assert_eq!(ACTIVE_EXECUTORS.load(Ordering::Relaxed), 0);
819 }
820
821 #[test]
822 fn poll_tasks() {
823 SendExecutorBuilder::new().num_threads(1).build().run(async {
824 let ehandle = EHandle::local();
825
826 // This will tie up the executor's only running thread which ensures that the task
827 // we spawn below can only run on the foreign thread.
828 std::thread::spawn(move || {
829 let ran = Arc::new(AtomicU64::new(0));
830 ehandle.poll_tasks(|| {
831 let ran = ran.clone();
832 ehandle.spawn_detached(async move {
833 ran.fetch_add(1, Ordering::Relaxed);
834 });
835 });
836
837 // The spawned task should have run in this thread.
838 assert_eq!(ran.load(Ordering::Relaxed), 1);
839 })
840 .join()
841 .unwrap();
842 });
843 }
844}