fuchsia_async/runtime/fuchsia/executor/common.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::timer::Timers;
6use super::atomic_future::hooks::HooksMap;
7use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
8use super::packets::{
9 PacketReceiver, PacketReceiverMap, RawReceiverRegistration, ReceiverRegistration,
10};
11use super::scope::ScopeHandle;
12use super::time::{BootInstant, MonotonicInstant};
13use crate::runtime::instrument::TaskInstrument;
14use crossbeam::queue::SegQueue;
15use fuchsia_sync::Mutex;
16use zx::BootDuration;
17
18use std::any::Any;
19use std::cell::{Cell, RefCell};
20use std::fmt;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use std::task::Context;
26use std::thread::ThreadId;
27
/// Port packet key reserved for wake-up notifications.
///
/// `Executor::try_notify` and `Executor::mark_done` queue user packets with this key, and
/// `Executor::worker_lifecycle` treats a packet carrying it as a notification instead of
/// dispatching it to a registered receiver.
pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;
29
30thread_local!(
31 static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
32);
33
/// Selects the source of the clock readings returned by an executor.
pub enum ExecutorTime {
    /// Readings come from the real `zx` clocks.
    RealTime,
    /// Fake readings used in tests.
    FakeTime {
        /// Current reading of the fake monotonic clock, in nanoseconds.
        mono_reading_ns: AtomicI64,
        /// Offset, in nanoseconds, added to `mono_reading_ns` to obtain the fake boot clock
        /// reading, disregarding the difference in timelines.
        ///
        /// The reading and the offset cannot be read together atomically; this is usually not
        /// relevant in tests.
        mono_to_boot_offset_ns: AtomicI64,
    },
}
48
/// Outcome of one call to `Executor::poll_ready_tasks`.
enum PollReadyTasksResult {
    /// The ready queue was found empty.
    NoneReady,
    /// A full batch was polled while no thread was sleeping; the queue may hold more tasks.
    MoreReady,
    /// The main task was polled and reported completion.
    MainTaskCompleted,
}
54
/// Packed per-executor thread counters, one byte per counter:
///
/// ```text
///  24           16            8            0
///   +------------+------------+------------+
///   |  foreign   |  notified  |  sleeping  |
///   +------------+------------+------------+
/// ```
///
///  sleeping : the number of threads sleeping
///  notified : the number of notifications posted to wake sleeping threads
///  foreign  : the number of foreign threads processing tasks
#[derive(Clone, Copy, Eq, PartialEq)]
struct ThreadsState(u32);

impl ThreadsState {
    /// Mask covering a single one-byte counter before shifting.
    const FIELD_MASK: u32 = 0xff;
    /// Bit offset of the `notified` counter.
    const NOTIFIED_SHIFT: u32 = 8;
    /// Bit offset of the `foreign` counter.
    const FOREIGN_SHIFT: u32 = 16;

    /// Returns the `sleeping` counter (bits 0..8).
    const fn sleeping(&self) -> u8 {
        (self.0 & Self::FIELD_MASK) as u8
    }

    /// Returns the `notified` counter (bits 8..16).
    const fn notified(&self) -> u8 {
        ((self.0 >> Self::NOTIFIED_SHIFT) & Self::FIELD_MASK) as u8
    }

    /// Returns a copy with the `sleeping` counter replaced.
    const fn with_sleeping(self, sleeping: u8) -> Self {
        let cleared = self.0 & !Self::FIELD_MASK;
        Self(cleared | (sleeping as u32))
    }

    /// Returns a copy with the `notified` counter replaced.
    const fn with_notified(self, notified: u8) -> Self {
        let cleared = self.0 & !(Self::FIELD_MASK << Self::NOTIFIED_SHIFT);
        Self(cleared | ((notified as u32) << Self::NOTIFIED_SHIFT))
    }

    /// Returns a copy with the `foreign` counter replaced.
    const fn with_foreign(self, foreign: u8) -> Self {
        let cleared = self.0 & !(Self::FIELD_MASK << Self::FOREIGN_SHIFT);
        Self(cleared | ((foreign as u32) << Self::FOREIGN_SHIFT))
    }
}
87
/// Count of live `Executor` values: incremented in `Executor::new_with_port` and decremented
/// when an `Executor` is dropped. Only compiled for tests.
#[cfg(test)]
static ACTIVE_EXECUTORS: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
90
91pub(crate) struct Executor {
92 pub(super) port: zx::Port,
93 monotonic_timers: Arc<Timers<MonotonicInstant>>,
94 boot_timers: Arc<Timers<BootInstant>>,
95 pub(super) done: AtomicBool,
96 is_local: bool,
97 pub(crate) receivers: PacketReceiverMap,
98
99 pub(super) ready_tasks: SegQueue<TaskHandle>,
100 time: ExecutorTime,
101 // The low byte is the number of threads currently sleeping. The high byte is the number of
102 // of wake-up notifications pending.
103 pub(super) threads_state: AtomicU32,
104 pub(super) num_threads: u8,
105 pub(super) polled: AtomicU64,
106 // Data that belongs to the user that can be accessed via EHandle::local(). See
107 // `TestExecutor::poll_until_stalled`.
108 pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
109 pub(super) instrument: Option<Arc<dyn TaskInstrument>>,
110 pub(super) hooks_map: HooksMap,
111 pub(super) first_thread_id: OnceLock<ThreadId>,
112}
113
114impl Executor {
115 pub fn new(
116 time: ExecutorTime,
117 is_local: bool,
118 num_threads: u8,
119 instrument: Option<Arc<dyn TaskInstrument>>,
120 ) -> Self {
121 Self::new_with_port(time, is_local, num_threads, zx::Port::create(), instrument)
122 }
123
124 pub fn new_with_port(
125 time: ExecutorTime,
126 is_local: bool,
127 num_threads: u8,
128 port: zx::Port,
129 instrument: Option<Arc<dyn TaskInstrument>>,
130 ) -> Self {
131 #[cfg(test)]
132 ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);
133
134 // Is this a fake-time executor?
135 let is_fake = matches!(
136 time,
137 ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
138 );
139
140 Executor {
141 port,
142 monotonic_timers: Arc::new(Timers::<MonotonicInstant>::new(is_fake)),
143 boot_timers: Arc::new(Timers::<BootInstant>::new(is_fake)),
144 done: AtomicBool::new(false),
145 is_local,
146 receivers: PacketReceiverMap::new(),
147 ready_tasks: SegQueue::new(),
148 time,
149 threads_state: AtomicU32::new(0),
150 num_threads,
151 polled: AtomicU64::new(0),
152 owner_data: Mutex::new(None),
153 instrument,
154 hooks_map: HooksMap::default(),
155 first_thread_id: OnceLock::new(),
156 }
157 }
158
159 pub fn set_local(root_scope: ScopeHandle) {
160 root_scope.executor().first_thread_id.get_or_init(|| std::thread::current().id());
161 EXECUTOR.with(|e| {
162 let mut e = e.borrow_mut();
163 assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
164 *e = Some(root_scope);
165 });
166 }
167
168 fn poll_ready_tasks(&self, main_task: Option<&TaskHandle>) -> PollReadyTasksResult {
169 loop {
170 for _ in 0..16 {
171 let Some(task) = self.ready_tasks.pop() else {
172 return PollReadyTasksResult::NoneReady;
173 };
174 let is_main = Some(&task) == main_task;
175 let complete = self.try_poll(task);
176 if complete && is_main {
177 return PollReadyTasksResult::MainTaskCompleted;
178 }
179 self.polled.fetch_add(1, Ordering::Relaxed);
180 }
181 // We didn't finish all the ready tasks. If there are sleeping threads, post a
182 // notification to wake one up.
183 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
184 loop {
185 if threads_state.sleeping() == 0 {
186 // All threads are awake now. Prevent starvation.
187 return PollReadyTasksResult::MoreReady;
188 }
189 if threads_state.notified() >= threads_state.sleeping() {
190 // All sleeping threads have been notified. Keep going and poll more tasks.
191 break;
192 }
193 match self.try_notify(threads_state) {
194 Ok(()) => break,
195 Err(s) => threads_state = s,
196 }
197 }
198 }
199 }
200
201 pub fn is_local(&self) -> bool {
202 self.is_local
203 }
204
205 pub fn notify_task_ready(&self) {
206 // Only post if there's no thread running (or soon to be running). If we happen to be
207 // running on a thread for this executor, then threads_state won't be equal to num_threads,
208 // which means notifications only get fired if this is from a non-async thread, or a thread
209 // that belongs to a different executor. We use SeqCst ordering here to make sure this load
210 // happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
211 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));
212
213 // We only want to notify if there are no pending notifications and there are no other
214 // threads running.
215 while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
216 match self.try_notify(threads_state) {
217 Ok(()) => break,
218 Err(s) => threads_state = s,
219 }
220 }
221 }
222
223 /// Tries to notify a thread to wake up. Returns threads_state if it fails.
224 fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
225 self.threads_state
226 .compare_exchange_weak(
227 old_threads_state.0,
228 old_threads_state.0 + ThreadsState(0).with_notified(1).0,
229 Ordering::Relaxed,
230 Ordering::Relaxed,
231 )
232 .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
233 .map_err(ThreadsState)
234 }
235
236 pub fn wake_one_thread(&self) {
237 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
238 let current_sleeping = threads_state.sleeping();
239 if current_sleeping == 0 {
240 return;
241 }
242 while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
243 match self.try_notify(threads_state) {
244 Ok(()) => break,
245 Err(s) => threads_state = s,
246 }
247 }
248 }
249
250 pub fn notify_id(&self, id: u64) {
251 let up = zx::UserPacket::from_u8_array([0; 32]);
252 let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
253 if let Err(e) = self.port.queue(&packet) {
254 // TODO: logging
255 eprintln!("Failed to queue notify in port: {e:?}");
256 }
257 }
258
259 /// Returns the current reading of the monotonic clock.
260 ///
261 /// For test executors running in fake time, returns the reading of the
262 /// fake monotonic clock.
263 pub fn now(&self) -> MonotonicInstant {
264 match &self.time {
265 ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
266 ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
267 MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
268 }
269 }
270 }
271
272 /// Returns the current reading of the boot clock.
273 ///
274 /// For test executors running in fake time, returns the reading of the
275 /// fake boot clock.
276 pub fn boot_now(&self) -> BootInstant {
277 match &self.time {
278 ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),
279
280 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
281 // The two atomic values are loaded one after the other. This should
282 // not normally be an issue in tests.
283 let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
284 let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
285 BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
286 }
287 }
288 }
289
290 /// Sets the reading of the fake monotonic clock.
291 ///
292 /// # Panics
293 ///
294 /// If called on an executor that runs in real time.
295 pub fn set_fake_time(&self, new: MonotonicInstant) {
296 let boot_offset_ns = match &self.time {
297 ExecutorTime::RealTime => {
298 panic!("Error: called `set_fake_time` on an executor using actual time.")
299 }
300 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
301 t.store(new.into_nanos(), Ordering::Relaxed);
302 mono_to_boot_offset_ns.load(Ordering::Relaxed)
303 }
304 };
305 self.monotonic_timers.maybe_notify(new);
306
307 // Changing fake time also affects boot time. Notify boot clocks as well.
308 let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
309 self.boot_timers.maybe_notify(new_boot_time);
310 }
311
312 // Sets a new offset between boot and monotonic time.
313 //
314 // Only works for executors operating in fake time.
315 // The change in the fake offset will wake expired boot timers.
316 pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
317 let mono_now_ns = match &self.time {
318 ExecutorTime::RealTime => {
319 panic!(
320 "Error: called `set_fake_boot_to_mono_offset` on an executor using actual time."
321 )
322 }
323 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
324 // We ignore the non-atomic update between b and t, it is likely
325 // not relevant in tests.
326 b.store(offset.into_nanos(), Ordering::Relaxed);
327 t.load(Ordering::Relaxed)
328 }
329 };
330 let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
331 self.boot_timers.maybe_notify(new_boot_now);
332 }
333
334 /// Returns `true` if this executor is running in real time. Returns
335 /// `false` if this executor si running in fake time.
336 pub fn is_real_time(&self) -> bool {
337 matches!(self.time, ExecutorTime::RealTime)
338 }
339
340 /// Must be called before `on_parent_drop`.
341 ///
342 /// Done flag must be set before dropping packet receivers
343 /// so that future receivers that attempt to deregister themselves
344 /// know that it's okay if their entries are already missing.
345 pub fn mark_done(&self) {
346 self.done.store(true, Ordering::SeqCst);
347
348 // Make sure there's at least one notification outstanding per thread to wake up all
349 // workers. This might be more notifications than required, but this way we don't have to
350 // worry about races where tasks are just about to sleep; when a task receives the
351 // notification, it will check done and terminate.
352 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
353 let num_threads = self.num_threads;
354 loop {
355 let notified = threads_state.notified();
356 if notified >= num_threads {
357 break;
358 }
359 match self.threads_state.compare_exchange_weak(
360 threads_state.0,
361 threads_state.with_notified(num_threads).0,
362 Ordering::Relaxed,
363 Ordering::Relaxed,
364 ) {
365 Ok(_) => {
366 for _ in notified..num_threads {
367 self.notify_id(TASK_READY_WAKEUP_ID);
368 }
369 return;
370 }
371 Err(old) => threads_state = ThreadsState(old),
372 }
373 }
374 }
375
376 /// Notes about the lifecycle of an Executor.
377 ///
378 /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
379 /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
380 /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
381 /// the port is dropped alongside the Executor as it should.
382 /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
383 ///
384 /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
385 /// "current" executor being set, and that's unset when the executor is dropped.
386 ///
387 /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
388 /// and point (b) is related to "what happens when I try to create a new receiver when there
389 /// is no executor".
390 ///
391 /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
392 /// storage [1]. And the reactor discourages usage of strong references to it by vending weak
393 /// references to it [2] instead of strong.
394 ///
395 /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
396 /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
397 /// upgrading pointers. For (b) the difference mostly stand for "when is it safe to use IO
398 /// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
399 /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
400 /// in that thread.
401 ///
402 /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
403 /// unregistered themselves when the executor drops. The choice to panic was made based on
404 /// patterns in fuchsia-async that may come to change:
405 ///
406 /// - Executor vends strong references to itself and those references are *stored* by most
407 /// receiver implementations (as opposed to reached out on TLS every time).
408 /// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
409 /// easy to understand error to return when polling on an extinct executor.
410 /// - All receivers are implemented in this crate and well-known.
411 ///
412 /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
413 /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
414 /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
415 /// opaque non-clone-copy-or-send guard would be stronger than this. See:
416 /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
417 pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
418 // Drop all tasks.
419 // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
420 // as part of waking, there's an upgrade to a strong reference, so for a small amount of
421 // time `fasync::unblock` can hold a strong reference to a task which in turn holds the
422 // future for the task which in turn could hold references to receivers, which, if we did
423 // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
424 // task futures here.
425 root_scope.drop_all_tasks();
426
427 // Drop all of the uncompleted tasks
428 while self.ready_tasks.pop().is_some() {}
429
430 // Deregister the timer receivers so that we can perform the check below.
431 self.monotonic_timers.deregister();
432 self.boot_timers.deregister();
433
434 // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
435 // happen. See discussion above.
436 //
437 // If you're here because you hit this panic check your code for:
438 //
439 // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
440 // position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
441 //
442 // - A function scope that contains a fuchsia_async::Executor NOT in the first position
443 // (first position in function scope gets dropped last:
444 // https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
445 //
446 // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
447 // contains a temporary (temporaries are dropped after the function scope:
448 // https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
449 // looks like a `match` statement at the end of the function without a semicolon.
450 //
451 // - Storing channel and FIDL objects in static variables.
452 //
453 // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
454 assert!(self.receivers.is_empty(), "receivers must not outlive their executor");
455
456 // Remove the thread-local executor set in `new`.
457 EHandle::rm_local();
458 }
459
460 // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
461 // the debugger needs to be updated.
462 // LINT.IfChange
463 pub fn worker_lifecycle<const UNTIL_STALLED: bool>(
464 self: &Arc<Executor>,
465 // The main task should be specified for a single-threaded executor, but it isn't necessary
466 // for a multi-threaded executor since it has an independent mechanism for tracking when
467 // the main task terminates.
468 // TODO(https://fxbug.dev/481030722) remove this parameter
469 main_task: Option<&TaskHandle>,
470 ) {
471 // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
472
473 assert!(
474 !self.is_local() || self.first_thread_id.get() == Some(&std::thread::current().id())
475 );
476
477 self.monotonic_timers.register(self);
478 self.boot_timers.register(self);
479
480 loop {
481 // Keep track of whether we are considered asleep.
482 let mut sleeping = false;
483
484 match self.poll_ready_tasks(main_task) {
485 PollReadyTasksResult::NoneReady => {
486 // No more tasks, indicate we are sleeping. We use SeqCst ordering because we
487 // want this change here to happen *before* we check ready_tasks below. This
488 // synchronizes with notify_task_ready which is called *after* a task is added
489 // to ready_tasks.
490 const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
491 self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
492 // Check ready tasks again. If a task got posted, wake up. This has to be done
493 // because a notification won't get sent if there is at least one active thread
494 // so there's a window between the preceding two lines where a task could be
495 // made ready and a notification is not sent because it looks like there is at
496 // least one thread running.
497 if self.ready_tasks.is_empty() {
498 sleeping = true;
499 } else {
500 // We lost a race, we're no longer sleeping.
501 self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
502 }
503 }
504 PollReadyTasksResult::MoreReady => {}
505 PollReadyTasksResult::MainTaskCompleted => return,
506 }
507
508 // Check done here after updating threads_state to avoid shutdown races.
509 if self.done.load(Ordering::SeqCst) {
510 return;
511 }
512
513 enum Work {
514 None,
515 Packet(zx::Packet),
516 Stalled,
517 }
518
519 let mut notified = false;
520 let work = {
521 // If we're considered awake choose INFINITE_PAST which will make the wait call
522 // return immediately. Otherwise, wait until a packet arrives.
523 let deadline = if !sleeping || UNTIL_STALLED {
524 zx::Instant::INFINITE_PAST
525 } else {
526 zx::Instant::INFINITE
527 };
528
529 match self.port.wait(deadline) {
530 Ok(packet) => {
531 if packet.key() == TASK_READY_WAKEUP_ID {
532 notified = true;
533 Work::None
534 } else {
535 Work::Packet(packet)
536 }
537 }
538 Err(zx::Status::TIMED_OUT) => {
539 if !UNTIL_STALLED || !sleeping {
540 Work::None
541 } else {
542 Work::Stalled
543 }
544 }
545 Err(status) => {
546 panic!("Error calling port wait: {status:?}");
547 }
548 }
549 };
550
551 let threads_state_sub =
552 ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
553 if threads_state_sub.0 > 0 {
554 self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
555 }
556
557 match work {
558 Work::Packet(packet) => self.receivers.receive_packet(packet.key(), packet),
559 Work::None => {}
560 Work::Stalled => return,
561 }
562 }
563 }
564
565 /// Drops the main task.
566 ///
567 /// # Safety
568 ///
569 /// The caller must guarantee that the executor isn't running.
570 pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle, task: &TaskHandle) {
571 unsafe { root_scope.drop_task_unchecked(task) };
572 }
573
574 fn try_poll(&self, task: TaskHandle) -> bool {
575 let task_waker = task.waker();
576 let poll_result = TaskHandle::set_current_with(&task, || {
577 task.try_poll(&mut Context::from_waker(&task_waker))
578 });
579 match poll_result {
580 AttemptPollResult::Yield => {
581 self.ready_tasks.push(task);
582 false
583 }
584 AttemptPollResult::IFinished | AttemptPollResult::Aborted => {
585 task.scope().task_did_finish(&task);
586 true
587 }
588 _ => false,
589 }
590 }
591
592 /// Returns the monotonic timers.
593 pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
594 &self.monotonic_timers
595 }
596
597 /// Returns the boot timers.
598 pub fn boot_timers(&self) -> &Timers<BootInstant> {
599 &self.boot_timers
600 }
601
602 fn poll_tasks(&self, callback: impl FnOnce(), main_task: Option<&TaskHandle>) {
603 assert!(!self.is_local);
604
605 // Increment the count of foreign threads.
606 const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
607 self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);
608
609 callback();
610
611 // Poll up to 16 tasks.
612 for _ in 0..16 {
613 let Some(task) = self.ready_tasks.pop() else {
614 break;
615 };
616 let is_main = Some(&task) == main_task;
617 if self.try_poll(task) && is_main {
618 break;
619 }
620 self.polled.fetch_add(1, Ordering::Relaxed);
621 }
622
623 let mut threads_state = ThreadsState(
624 self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
625 );
626
627 if !self.ready_tasks.is_empty() {
628 // There are tasks still ready to run, so wake up a thread if all the other threads are
629 // sleeping.
630 while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
631 match self.try_notify(threads_state) {
632 Ok(()) => break,
633 Err(s) => threads_state = s,
634 }
635 }
636 }
637 }
638
639 pub fn task_is_ready(&self, task: TaskHandle) {
640 self.ready_tasks.push(task);
641 self.notify_task_ready();
642 }
643}
644
/// Test-only bookkeeping: keeps `ACTIVE_EXECUTORS` in step with the number of live executors.
#[cfg(test)]
impl Drop for Executor {
    fn drop(&mut self) {
        let _previous = ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
    }
}
651
652/// A handle to an executor.
653#[derive(Clone)]
654pub struct EHandle {
655 // LINT.IfChange
656 pub(super) root_scope: ScopeHandle,
657 // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
658}
659
660impl fmt::Debug for EHandle {
661 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
662 f.debug_struct("EHandle").field("port", &self.inner().port).finish()
663 }
664}
665
666impl EHandle {
667 /// Returns the thread-local executor.
668 ///
669 /// # Panics
670 ///
671 /// If called outside the context of an active async executor.
672 pub fn local() -> Self {
673 let root_scope = EXECUTOR
674 .with(|e| e.borrow().as_ref().map(|x| x.clone()))
675 .expect("Fuchsia Executor must be created first");
676
677 EHandle { root_scope }
678 }
679
680 /// Set the fake time to a given value.
681 ///
682 /// # Panics
683 ///
684 /// If the executor was not created with fake time.
685 pub fn set_fake_time(&self, t: MonotonicInstant) {
686 self.inner().set_fake_time(t)
687 }
688
689 pub(super) fn rm_local() {
690 EXECUTOR.with(|e| *e.borrow_mut() = None);
691 }
692
693 /// The root scope of the executor.
694 ///
695 /// This can be used to spawn tasks that live as long as the executor, and
696 /// to create shorter-lived child scopes.
697 ///
698 /// Most users should create an owned scope with
699 /// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
700 pub fn global_scope(&self) -> &ScopeHandle {
701 &self.root_scope
702 }
703
704 /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
705 pub fn port(&self) -> &zx::Port {
706 &self.inner().port
707 }
708
709 /// Registers a `PacketReceiver` with the executor and returns a registration.
710 /// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
711 pub fn register_receiver<T: PacketReceiver>(&self, receiver: T) -> ReceiverRegistration<T> {
712 self.inner().receivers.register(self.inner().clone(), receiver)
713 }
714
715 /// Registers a pinned `RawPacketReceiver` with the executor.
716 ///
717 /// The registration will be deregistered when dropped.
718 ///
719 /// NOTE: Unlike with `register_receiver`, `receive_packet` will be called whilst a lock is
720 /// held, so it is not safe to register or unregister receivers at that time.
721 pub fn register_pinned<T: PacketReceiver>(
722 &self,
723 raw_registration: Pin<&mut RawReceiverRegistration<T>>,
724 ) {
725 self.inner().receivers.register_pinned(self.clone(), raw_registration);
726 }
727
728 #[inline(always)]
729 pub(crate) fn inner(&self) -> &Arc<Executor> {
730 self.root_scope.executor()
731 }
732
733 /// Spawn a new task to be run on this executor.
734 ///
735 /// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
736 /// may be run on either a singlethreaded or multithreaded executor.
737 pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
738 self.global_scope().spawn(future);
739 }
740
741 /// Spawn a new task to be run on this executor.
742 ///
743 /// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
744 /// have to be threads-safe (implement the `Send` trait). In return, this method requires that
745 /// this executor is a LocalExecutor.
746 pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
747 self.global_scope().spawn_local(future);
748 }
749
750 pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
751 &self.inner().monotonic_timers
752 }
753
754 pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
755 &self.inner().boot_timers
756 }
757
758 /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
759 /// that are ready to run. If tasks remain ready and no other threads are running, a thread
760 /// will be woken. This can end up being a performance win in the case that the queue can be
761 /// cleared without needing to wake any other thread.
762 ///
763 /// # Panics
764 ///
765 /// If called on a single-threaded executor or if this thread is a thread managed by the
766 /// executor.
767 pub fn poll_tasks(&self, callback: impl FnOnce()) {
768 EXECUTOR.with(|e| {
769 assert!(
770 e.borrow_mut().replace(self.root_scope.clone()).is_none(),
771 "This thread is already associated with an executor"
772 );
773 });
774
775 // `poll_tasks` should only ever be used on a multi-threaded executor, it's safe to pass
776 // None.
777 self.inner().poll_tasks(callback, None);
778
779 EXECUTOR.with(|e| *e.borrow_mut() = None);
780 }
781}
782
783// AtomicFutureHandle can have a lifetime (for local executors we allow the main task to have a
784// non-static lifetime). The executor doesn't handle this though; the executor just assumes all
785// tasks have the 'static lifetime. It's up to the local executor to extend the lifetime and make
786// it safe.
787pub type TaskHandle = AtomicFutureHandle<'static>;
788
789thread_local! {
790 static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
791}
792
793impl TaskHandle {
794 pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
795 CURRENT_TASK.with(|cur| {
796 let cur = cur.get();
797 let cur = unsafe { cur.as_ref() };
798 f(cur)
799 })
800 }
801
802 fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
803 CURRENT_TASK.with(|cur| {
804 cur.set(task);
805 let result = f();
806 cur.set(std::ptr::null());
807 result
808 })
809 }
810}
811
#[cfg(test)]
mod tests {
    use super::{ACTIVE_EXECUTORS, EHandle};
    use crate::{LocalExecutorBuilder, SendExecutorBuilder};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Running an executor to completion on another thread must leave no executor alive.
    #[test]
    fn test_no_leaks() {
        let runner = std::thread::spawn(|| {
            SendExecutorBuilder::new().num_threads(1).build().run(async {})
        });
        runner.join().unwrap();

        assert_eq!(ACTIVE_EXECUTORS.load(Ordering::Relaxed), 0);
    }

    /// A task spawned from inside `EHandle::poll_tasks` runs on the foreign thread.
    #[test]
    fn poll_tasks() {
        let executor = SendExecutorBuilder::new().num_threads(1).build();
        executor.run(async {
            let ehandle = EHandle::local();

            // Joining below ties up the executor's only running thread, which ensures that the
            // task spawned inside the callback can only run on the foreign thread.
            let foreign = std::thread::spawn(move || {
                let run_count = Arc::new(AtomicU64::new(0));
                ehandle.poll_tasks(|| {
                    let run_count = Arc::clone(&run_count);
                    ehandle.spawn_detached(async move {
                        run_count.fetch_add(1, Ordering::Relaxed);
                    });
                });

                // The spawned task should have run in this thread.
                assert_eq!(run_count.load(Ordering::Relaxed), 1);
            });
            foreign.join().unwrap();
        });
    }

    /// Spawning a local task through a handle moved to another thread is expected to panic.
    #[test]
    #[should_panic]
    fn spawn_local_from_different_thread() {
        let _executor = LocalExecutorBuilder::new().build();
        let ehandle = EHandle::local();
        let other = std::thread::spawn(move || {
            ehandle.spawn_local_detached(async {});
        });
        let _ = other.join();
    }
}