fuchsia_async/runtime/fuchsia/executor/common.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::timer::Timers;
6use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
7use super::packets::{PacketReceiver, PacketReceiverMap, ReceiverRegistration};
8use super::scope::ScopeHandle;
9use super::time::{BootInstant, MonotonicInstant};
10use crossbeam::queue::SegQueue;
11use fuchsia_sync::Mutex;
12use zx::BootDuration;
13
14use std::any::Any;
15use std::cell::{Cell, RefCell};
16use std::future::Future;
17use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::task::Context;
20use std::{fmt, u64, usize};
21
/// Port packet key reserved for task-ready wake-up notifications.
///
/// `try_notify` and `mark_done` queue user packets with this key via `notify_id`, and
/// `worker_lifecycle` treats a packet carrying it as a notification rather than handing it to
/// a registered receiver.
pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;

/// The id of the main task, which is a virtual task that lives from construction
/// to destruction of the executor. The main task may correspond to multiple
/// main futures, in cases where the executor runs multiple times during its lifetime.
pub(crate) const MAIN_TASK_ID: usize = 0;

thread_local!(
    // Root scope of the executor associated with the current thread, if any. Written by
    // `Executor::set_local` and `EHandle::poll_tasks`; cleared by `EHandle::rm_local` and at
    // the end of `EHandle::poll_tasks`.
    static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
);
32
/// Selects where an `Executor` takes its clock readings from.
pub enum ExecutorTime {
    /// Readings come from the real clocks (`zx::MonotonicInstant::get()` and
    /// `zx::BootInstant::get()`).
    RealTime,
    /// Fake readings used in tests.
    FakeTime {
        // The fake monotonic clock reading.
        mono_reading_ns: AtomicI64,
        // An offset to add to mono_reading_ns to get the reading of the boot
        // clock, disregarding the difference in timelines.
        //
        // We disregard the fact that the reading and offset can not be
        // read atomically, this is usually not relevant in tests.
        mono_to_boot_offset_ns: AtomicI64,
    },
}
47
/// Outcome of `Executor::poll_ready_tasks`.
enum PollReadyTasksResult {
    /// The ready queue was found empty.
    NoneReady,
    /// A full batch of tasks was polled and no thread is sleeping, so the caller should give
    /// the port a turn before polling more tasks.
    MoreReady,
    /// The task with `MAIN_TASK_ID` finished or was cancelled.
    MainTaskCompleted,
}
53
/// Three one-byte counters packed into a `u32`:
///
/// ```text
///  24           16           8            0
///  +------------+------------+------------+
///  |  foreign   |  notified  |  sleeping  |
///  +------------+------------+------------+
/// ```
///
///  sleeping : the number of threads sleeping
///  notified : the number of notifications posted to wake sleeping threads
///  foreign  : the number of foreign threads processing tasks
#[derive(Clone, Copy, Eq, PartialEq)]
struct ThreadsState(u32);

impl ThreadsState {
    /// Mask selecting a single (unshifted) counter.
    const BYTE: u32 = 0xff;
    /// Bit offset of the `notified` counter.
    const NOTIFIED_SHIFT: u32 = 8;
    /// Bit offset of the `foreign` counter.
    const FOREIGN_SHIFT: u32 = 16;

    /// Returns the number of sleeping threads.
    const fn sleeping(&self) -> u8 {
        (self.0 & Self::BYTE) as u8
    }

    /// Returns the number of posted wake-up notifications.
    const fn notified(&self) -> u8 {
        ((self.0 >> Self::NOTIFIED_SHIFT) & Self::BYTE) as u8
    }

    /// Returns a copy with the `sleeping` counter replaced.
    const fn with_sleeping(self, sleeping: u8) -> Self {
        self.replace_byte(0, sleeping)
    }

    /// Returns a copy with the `notified` counter replaced.
    const fn with_notified(self, notified: u8) -> Self {
        self.replace_byte(Self::NOTIFIED_SHIFT, notified)
    }

    /// Returns a copy with the `foreign` counter replaced.
    const fn with_foreign(self, foreign: u8) -> Self {
        self.replace_byte(Self::FOREIGN_SHIFT, foreign)
    }

    /// Overwrites the byte starting at bit `shift` with `value`, leaving all other bits as-is.
    const fn replace_byte(self, shift: u32, value: u8) -> Self {
        let cleared = self.0 & !(Self::BYTE << shift);
        Self(cleared | ((value as u32) << shift))
    }
}
86
/// Number of `Executor` values currently alive: incremented in `Executor::new` and decremented
/// in `Executor`'s `Drop` impl. Only present in test builds.
#[cfg(test)]
static ACTIVE_EXECUTORS: AtomicUsize = AtomicUsize::new(0);
89
/// State shared by all threads that drive an executor: the port, the ready queue, the packet
/// receivers, the timers and the bookkeeping used to put threads to sleep and wake them.
pub(crate) struct Executor {
    // Port that `worker_lifecycle` waits on and that `notify_id` queues wake-up packets to.
    pub(super) port: zx::Port,
    // Timers for the monotonic and boot timelines. Both are also registered in `receivers`
    // (see `new`) and unregistered again in `on_parent_drop`.
    monotonic_timers: Arc<Timers<MonotonicInstant>>,
    boot_timers: Arc<Timers<BootInstant>>,
    // Set by `mark_done`; `worker_lifecycle` returns once it observes this.
    pub(super) done: AtomicBool,
    // Fixed at construction. `poll_tasks` asserts that it is false.
    is_local: bool,
    // Packet receivers, looked up by port key in `deliver_packet`.
    receivers: Mutex<PacketReceiverMap<Arc<dyn PacketReceiver>>>,
    // Next task id to hand out (see `next_task_id`); starts just past `MAIN_TASK_ID`.
    task_count: AtomicUsize,
    // Tasks that are ready to be polled.
    pub(super) ready_tasks: SegQueue<TaskHandle>,
    // Clock source: real time, or fake time for tests.
    time: ExecutorTime,
    // A packed `ThreadsState`: the number of sleeping threads, the number of wake-up
    // notifications posted, and the number of foreign threads processing tasks, one byte each.
    pub(super) threads_state: AtomicU32,
    // Compared against the sleeping count to decide whether every thread is asleep.
    // NOTE(review): presumably the number of worker threads owned by the executor — confirm
    // against the callers of `new`.
    pub(super) num_threads: u8,
    // Incremented once per task poll in `poll_ready_tasks` and `poll_tasks`, except for the
    // poll that completes the main task.
    pub(super) polled: AtomicU64,
    // Data that belongs to the user that can be accessed via EHandle::local(). See
    // `TestExecutor::poll_until_stalled`.
    pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
}
109
110impl Executor {
111 pub fn new(time: ExecutorTime, is_local: bool, num_threads: u8) -> Self {
112 #[cfg(test)]
113 ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);
114
115 let mut receivers: PacketReceiverMap<Arc<dyn PacketReceiver>> = PacketReceiverMap::new();
116
117 // Is this a fake-time executor?
118 let is_fake = matches!(
119 time,
120 ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
121 );
122 let monotonic_timers = receivers.insert(|key| {
123 let timers = Arc::new(Timers::<MonotonicInstant>::new(key, is_fake));
124 (timers.clone(), timers)
125 });
126 let boot_timers = receivers.insert(|key| {
127 let timers = Arc::new(Timers::<BootInstant>::new(key, is_fake));
128 (timers.clone(), timers)
129 });
130
131 Executor {
132 port: zx::Port::create(),
133 monotonic_timers,
134 boot_timers,
135 done: AtomicBool::new(false),
136 is_local,
137 receivers: Mutex::new(receivers),
138 task_count: AtomicUsize::new(MAIN_TASK_ID + 1),
139 ready_tasks: SegQueue::new(),
140 time,
141 threads_state: AtomicU32::new(0),
142 num_threads,
143 polled: AtomicU64::new(0),
144 owner_data: Mutex::new(None),
145 }
146 }
147
148 pub fn set_local(root_scope: ScopeHandle) {
149 EXECUTOR.with(|e| {
150 let mut e = e.borrow_mut();
151 assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
152 *e = Some(root_scope);
153 });
154 }
155
/// Polls tasks from `ready_tasks` in batches of up to 16.
///
/// Returns:
/// - `NoneReady` as soon as the queue is found empty;
/// - `MainTaskCompleted` when the task with `MAIN_TASK_ID` finishes or is cancelled;
/// - `MoreReady` after a full batch if no thread is sleeping.
///
/// After a full batch with sleeping threads, one more wake-up notification is posted (unless
/// every sleeping thread already has one pending) and polling continues with the next batch.
fn poll_ready_tasks(&self) -> PollReadyTasksResult {
    loop {
        for _ in 0..16 {
            let Some(task) = self.ready_tasks.pop() else {
                return PollReadyTasksResult::NoneReady;
            };
            // Read the id first: `try_poll` takes ownership of the task handle.
            let task_id = task.id();
            let complete = self.try_poll(task);
            if complete && task_id == MAIN_TASK_ID {
                // Note that this final poll is not counted in `polled`.
                return PollReadyTasksResult::MainTaskCompleted;
            }
            self.polled.fetch_add(1, Ordering::Relaxed);
        }
        // We didn't finish all the ready tasks. If there are sleeping threads, post a
        // notification to wake one up.
        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
        loop {
            if threads_state.sleeping() == 0 {
                // All threads are awake now. Prevent starvation.
                return PollReadyTasksResult::MoreReady;
            }
            if threads_state.notified() >= threads_state.sleeping() {
                // All sleeping threads have been notified. Keep going and poll more tasks.
                break;
            }
            // `try_notify` fails if `threads_state` changed underneath us (or spuriously);
            // retry the checks above against the value it reports.
            match self.try_notify(threads_state) {
                Ok(()) => break,
                Err(s) => threads_state = s,
            }
        }
    }
}
188
/// Returns the `is_local` flag this executor was constructed with.
pub fn is_local(&self) -> bool {
    self.is_local
}
192
/// Allocates a fresh task id.
///
/// Ids come from a counter that starts at `MAIN_TASK_ID + 1`, so (short of the counter
/// wrapping around) the returned id is never `MAIN_TASK_ID`.
pub fn next_task_id(&self) -> usize {
    // `fetch_add` returns the value before the increment.
    self.task_count.fetch_add(1, Ordering::Relaxed)
}
196
197 pub fn notify_task_ready(&self) {
198 // Only post if there's no thread running (or soon to be running). If we happen to be
199 // running on a thread for this executor, then threads_state won't be equal to num_threads,
200 // which means notifications only get fired if this is from a non-async thread, or a thread
201 // that belongs to a different executor. We use SeqCst ordering here to make sure this load
202 // happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
203 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));
204
205 // We only want to notify if there are no pending notifications and there are no other
206 // threads running.
207 while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
208 match self.try_notify(threads_state) {
209 Ok(()) => break,
210 Err(s) => threads_state = s,
211 }
212 }
213 }
214
215 /// Tries to notify a thread to wake up. Returns threads_state if it fails.
216 fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
217 self.threads_state
218 .compare_exchange_weak(
219 old_threads_state.0,
220 old_threads_state.0 + ThreadsState(0).with_notified(1).0,
221 Ordering::Relaxed,
222 Ordering::Relaxed,
223 )
224 .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
225 .map_err(ThreadsState)
226 }
227
228 pub fn wake_one_thread(&self) {
229 let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
230 let current_sleeping = threads_state.sleeping();
231 if current_sleeping == 0 {
232 return;
233 }
234 while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
235 match self.try_notify(threads_state) {
236 Ok(()) => break,
237 Err(s) => threads_state = s,
238 }
239 }
240 }
241
242 pub fn notify_id(&self, id: u64) {
243 let up = zx::UserPacket::from_u8_array([0; 32]);
244 let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
245 if let Err(e) = self.port.queue(&packet) {
246 // TODO: logging
247 eprintln!("Failed to queue notify in port: {:?}", e);
248 }
249 }
250
251 pub fn deliver_packet(&self, key: u64, packet: zx::Packet) {
252 let receiver = match self.receivers.lock().get(key) {
253 // Clone the `Arc` so that we don't hold the lock
254 // any longer than absolutely necessary.
255 // The `receive_packet` impl may be arbitrarily complex.
256 Some(receiver) => receiver.clone(),
257 None => return,
258 };
259 receiver.receive_packet(packet);
260 }
261
262 /// Returns the current reading of the monotonic clock.
263 ///
264 /// For test executors running in fake time, returns the reading of the
265 /// fake monotonic clock.
266 pub fn now(&self) -> MonotonicInstant {
267 match &self.time {
268 ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
269 ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
270 MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
271 }
272 }
273 }
274
275 /// Returns the current reading of the boot clock.
276 ///
277 /// For test executors running in fake time, returns the reading of the
278 /// fake boot clock.
279 pub fn boot_now(&self) -> BootInstant {
280 match &self.time {
281 ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),
282
283 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
284 // The two atomic values are loaded one after the other. This should
285 // not normally be an issue in tests.
286 let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
287 let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
288 BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
289 }
290 }
291 }
292
293 /// Sets the reading of the fake monotonic clock.
294 ///
295 /// # Panics
296 ///
297 /// If called on an executor that runs in real time.
298 pub fn set_fake_time(&self, new: MonotonicInstant) {
299 let boot_offset_ns = match &self.time {
300 ExecutorTime::RealTime => {
301 panic!("Error: called `set_fake_time` on an executor using actual time.")
302 }
303 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
304 t.store(new.into_nanos(), Ordering::Relaxed);
305 mono_to_boot_offset_ns.load(Ordering::Relaxed)
306 }
307 };
308 self.monotonic_timers.maybe_notify(new);
309
310 // Changing fake time also affects boot time. Notify boot clocks as well.
311 let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
312 self.boot_timers.maybe_notify(new_boot_time);
313 }
314
315 // Sets a new offset between boot and monotonic time.
316 //
317 // Only works for executors operating in fake time.
318 // The change in the fake offset will wake expired boot timers.
319 pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
320 let mono_now_ns = match &self.time {
321 ExecutorTime::RealTime => {
322 panic!("Error: called `set_fake_boot_to_mono_offset` on an executor using actual time.")
323 }
324 ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
325 // We ignore the non-atomic update between b and t, it is likely
326 // not relevant in tests.
327 b.store(offset.into_nanos(), Ordering::Relaxed);
328 t.load(Ordering::Relaxed)
329 }
330 };
331 let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
332 self.boot_timers.maybe_notify(new_boot_now);
333 }
334
335 /// Returns `true` if this executor is running in real time. Returns
336 /// `false` if this executor si running in fake time.
337 pub fn is_real_time(&self) -> bool {
338 matches!(self.time, ExecutorTime::RealTime)
339 }
340
/// Marks the executor as done and wakes all worker threads so that they observe it.
///
/// Must be called before `on_parent_drop`.
///
/// Done flag must be set before dropping packet receivers
/// so that future receivers that attempt to deregister themselves
/// know that it's okay if their entries are already missing.
pub fn mark_done(&self) {
    self.done.store(true, Ordering::SeqCst);

    // Make sure there's at least one notification outstanding per thread to wake up all
    // workers. This might be more notifications than required, but this way we don't have to
    // worry about races where threads are just about to sleep; when a thread receives the
    // notification, it will check done and terminate.
    let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
    let num_threads = self.num_threads;
    loop {
        let notified = threads_state.notified();
        if notified >= num_threads {
            // Enough notifications are already outstanding.
            break;
        }
        // Raise the notified count to `num_threads` in one step...
        match self.threads_state.compare_exchange_weak(
            threads_state.0,
            threads_state.with_notified(num_threads).0,
            Ordering::Relaxed,
            Ordering::Relaxed,
        ) {
            Ok(_) => {
                // ...and queue one packet for each notification that was just added.
                for _ in notified..num_threads {
                    self.notify_id(TASK_READY_WAKEUP_ID);
                }
                return;
            }
            // The state changed underneath us (or the weak exchange failed spuriously); retry
            // with the value actually stored.
            Err(old) => threads_state = ThreadsState(old),
        }
    }
}
376
377 /// Notes about the lifecycle of an Executor.
378 ///
379 /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
380 /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
381 /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
382 /// the port is dropped alongside the Executor as it should.
383 /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
384 ///
385 /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
386 /// "current" executor being set, and that's unset when the executor is dropped.
387 ///
388 /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
389 /// and point (b) is related to "what happens when I try to create a new receiver when there
390 /// is no executor".
391 ///
392 /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
393 /// storage [1]. And the reactor discourages usage of strong references to it by vending weak
394 /// references to it [2] instead of strong.
395 ///
396 /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
397 /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
398 /// upgrading pointers. For (b) the difference mostly stand for "when is it safe to use IO
399 /// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
400 /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
401 /// in that thread.
402 ///
403 /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
404 /// unregistered themselves when the executor drops. The choice to panic was made based on
405 /// patterns in fuchsia-async that may come to change:
406 ///
407 /// - Executor vends strong references to itself and those references are *stored* by most
408 /// receiver implementations (as opposed to reached out on TLS every time).
409 /// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
410 /// easy to understand error to return when polling on an extinct executor.
411 /// - All receivers are implemented in this crate and well-known.
412 ///
413 /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
414 /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
415 /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
416 /// opaque non-clone-copy-or-send guard would be stronger than this. See:
417 /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
418 pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
419 // Drop all tasks.
420 // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
421 // as part of waking, there's an upgrade to a strong reference, so for a small amount of
422 // time `fasync::unblock` can hold a strong reference to a task which in turn holds the
423 // future for the task which in turn could hold references to receivers, which, if we did
424 // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
425 // task futures here.
426 root_scope.drop_all_tasks();
427
428 // Drop all of the uncompleted tasks
429 while let Some(_) = self.ready_tasks.pop() {}
430
431 // Unregister the timer receivers so that we can perform the check below.
432 self.receivers.lock().remove(self.monotonic_timers.port_key());
433 self.receivers.lock().remove(self.boot_timers.port_key());
434
435 // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
436 // happen. See discussion above.
437 //
438 // If you're here because you hit this panic check your code for:
439 //
440 // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
441 // position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
442 //
443 // - A function scope that contains a fuchsia_async::Executor NOT in the first position
444 // (first position in function scope gets dropped last:
445 // https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
446 //
447 // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
448 // contains a temporary (temporaries are dropped after the function scope:
449 // https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
450 // looks like a `match` statement at the end of the function without a semicolon.
451 //
452 // - Storing channel and FIDL objects in static variables.
453 //
454 // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
455 assert!(
456 self.receivers.lock().mapping.is_empty(),
457 "receivers must not outlive their executor"
458 );
459
460 // Remove the thread-local executor set in `new`.
461 EHandle::rm_local();
462 }
463
/// Runs the worker loop on the calling thread: poll ready tasks, then wait on the port and
/// deliver whatever packet arrives, over and over.
///
/// Returns when the main task completes, when `done` is observed to be set, or — only if
/// `UNTIL_STALLED` is true — when there are no ready tasks and a non-blocking wait on the
/// port finds no packet either.
// The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
// the debugger needs to be updated.
// LINT.IfChange
pub fn worker_lifecycle<const UNTIL_STALLED: bool>(self: &Arc<Executor>) {
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
    loop {
        // Keep track of whether we are considered asleep.
        let mut sleeping = false;

        match self.poll_ready_tasks() {
            PollReadyTasksResult::NoneReady => {
                // No more tasks, indicate we are sleeping. We use SeqCst ordering because we
                // want this change here to happen *before* we check ready_tasks below. This
                // synchronizes with notify_task_ready which is called *after* a task is added
                // to ready_tasks.
                const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
                self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
                // Check ready tasks again. If a task got posted, wake up. This has to be done
                // because a notification won't get sent if there is at least one active thread
                // so there's a window between the preceding two lines where a task could be
                // made ready and a notification is not sent because it looks like there is at
                // least one thread running.
                if self.ready_tasks.is_empty() {
                    sleeping = true;
                } else {
                    // We lost a race, we're no longer sleeping.
                    self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
                }
            }
            PollReadyTasksResult::MoreReady => {}
            PollReadyTasksResult::MainTaskCompleted => return,
        }

        // Check done here after updating threads_state to avoid shutdown races.
        if self.done.load(Ordering::SeqCst) {
            return;
        }

        // What the port wait below produced.
        enum Work {
            // Nothing to deliver: a wake-up notification, or a timed-out non-blocking wait.
            None,
            // A packet for a registered receiver.
            Packet(zx::Packet),
            // Only with UNTIL_STALLED: no ready tasks and nothing on the port.
            Stalled,
        }

        // Whether the packet we consumed was a wake-up notification, in which case the
        // notified count has to be decremented below.
        let mut notified = false;
        let work = {
            // If we're considered awake choose INFINITE_PAST which will make the wait call
            // return immediately. Otherwise, wait until a packet arrives.
            let deadline = if !sleeping || UNTIL_STALLED {
                zx::Instant::INFINITE_PAST
            } else {
                zx::Instant::INFINITE
            };

            match self.port.wait(deadline) {
                Ok(packet) => {
                    if packet.key() == TASK_READY_WAKEUP_ID {
                        notified = true;
                        Work::None
                    } else {
                        Work::Packet(packet)
                    }
                }
                Err(zx::Status::TIMED_OUT) => {
                    if !UNTIL_STALLED || !sleeping {
                        Work::None
                    } else {
                        Work::Stalled
                    }
                }
                Err(status) => {
                    panic!("Error calling port wait: {:?}", status);
                }
            }
        };

        // Undo, in a single atomic subtraction, the sleeping count we added above (if any) and
        // the notification we consumed (if any).
        let threads_state_sub =
            ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
        if threads_state_sub.0 > 0 {
            self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
        }

        match work {
            Work::Packet(packet) => {
                self.deliver_packet(packet.key(), packet);
            }
            Work::None => {}
            Work::Stalled => return,
        }
    }
}
555
/// Drops the main task (the task with `MAIN_TASK_ID`) from `root_scope`.
///
/// # Safety
///
/// The caller must guarantee that the executor isn't running.
pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle) {
    // NOTE(review): the exact requirements of `drop_task_unchecked` are not visible from this
    // file; presumably they are what the safety contract above is forwarding — confirm.
    root_scope.drop_task_unchecked(MAIN_TASK_ID);
}
564
565 fn try_poll(&self, task: TaskHandle) -> bool {
566 let task_waker = task.waker();
567 let poll_result = TaskHandle::set_current_with(&task, || {
568 task.try_poll(&mut Context::from_waker(&task_waker))
569 });
570 match poll_result {
571 AttemptPollResult::Yield => {
572 self.ready_tasks.push(task);
573 false
574 }
575 AttemptPollResult::IFinished | AttemptPollResult::Cancelled => {
576 task.scope().task_did_finish(task.id());
577 true
578 }
579 _ => false,
580 }
581 }
582
/// Returns the timers that run on the monotonic timeline.
pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
    // Deref coercion turns the `&Arc<Timers<_>>` into `&Timers<_>`.
    &self.monotonic_timers
}
587
/// Returns the timers that run on the boot timeline.
pub fn boot_timers(&self) -> &Timers<BootInstant> {
    // Deref coercion turns the `&Arc<Timers<_>>` into `&Timers<_>`.
    &self.boot_timers
}
592
/// Runs `callback` and then polls up to 16 ready tasks on the calling thread, which is
/// counted as a "foreign" thread in `threads_state` while it does so.
///
/// If tasks are still ready afterwards and every executor thread is asleep with no
/// notification pending and no other foreign thread active, one thread is woken.
///
/// # Panics
///
/// If the executor is local (`is_local` is true).
fn poll_tasks(&self, callback: impl FnOnce()) {
    assert!(!self.is_local);

    // Increment the count of foreign threads.
    const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
    self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);

    callback();

    // Poll up to 16 tasks.
    for _ in 0..16 {
        let Some(task) = self.ready_tasks.pop() else {
            break;
        };
        let task_id = task.id();
        // Stop early if the main task just completed.
        if self.try_poll(task) && task_id == MAIN_TASK_ID {
            break;
        }
        self.polled.fetch_add(1, Ordering::Relaxed);
    }

    // `fetch_sub` returns the previous value, so subtract again to get the state as it is
    // after this thread stopped counting as foreign.
    let mut threads_state = ThreadsState(
        self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
    );

    if !self.ready_tasks.is_empty() {
        // There are tasks still ready to run, so wake up a thread if all the other threads are
        // sleeping.
        while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
            match self.try_notify(threads_state) {
                Ok(()) => break,
                Err(s) => threads_state = s,
            }
        }
    }
}
629
/// Queues `task` for polling and wakes a thread if all of them are asleep.
pub fn task_is_ready(&self, task: TaskHandle) {
    // The push must come first: `notify_task_ready` relies on the task already being in
    // `ready_tasks` when it loads `threads_state`.
    self.ready_tasks.push(task);
    self.notify_task_ready();
}
634}
635
// Test-only bookkeeping: keeps `ACTIVE_EXECUTORS` in step with the number of live executors
// (the matching increment is in `Executor::new`).
#[cfg(test)]
impl Drop for Executor {
    fn drop(&mut self) {
        ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
    }
}
642
/// A handle to an executor.
///
/// The handle wraps the executor's root scope; the `Executor` itself is reached through that
/// scope (see `inner`).
#[derive(Clone)]
pub struct EHandle {
    // LINT.IfChange
    pub(super) root_scope: ScopeHandle,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
}
650
651impl fmt::Debug for EHandle {
652 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
653 f.debug_struct("EHandle").field("port", &self.inner().port).finish()
654 }
655}
656
657impl EHandle {
658 /// Returns the thread-local executor.
659 ///
660 /// # Panics
661 ///
662 /// If called outside the context of an active async executor.
663 pub fn local() -> Self {
664 let root_scope = EXECUTOR
665 .with(|e| e.borrow().as_ref().map(|x| x.clone()))
666 .expect("Fuchsia Executor must be created first");
667
668 EHandle { root_scope }
669 }
670
/// Clears the executor associated with the current thread, if there is one.
pub(super) fn rm_local() {
    EXECUTOR.with(|e| *e.borrow_mut() = None);
}
674
/// The root scope of the executor.
///
/// This can be used to spawn tasks that live as long as the executor, and
/// to create shorter-lived child scopes.
///
/// Most users should create an owned scope with
/// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
///
/// The returned reference borrows from this handle.
pub fn global_scope(&self) -> &ScopeHandle {
    &self.root_scope
}
685
/// Returns a reference to the Fuchsia `zx::Port` that the executor uses to listen for events.
pub fn port(&self) -> &zx::Port {
    &self.inner().port
}
690
/// Registers a `PacketReceiver` with the executor and returns a registration.
/// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
///
/// The registration holds a clone of this handle, the key the receiver was registered under,
/// and the receiver itself.
pub fn register_receiver<T>(&self, receiver: Arc<T>) -> ReceiverRegistration<T>
where
    T: PacketReceiver,
{
    // The map hands us the key it picked; we give back the receiver to store (one clone of
    // the `Arc`) together with the value to return to the caller.
    self.inner().receivers.lock().insert(|key| {
        (receiver.clone(), ReceiverRegistration { ehandle: self.clone(), key, receiver })
    })
}
701
/// Returns the executor this handle refers to, as obtained from the root scope.
#[inline(always)]
pub(crate) fn inner(&self) -> &Arc<Executor> {
    &self.root_scope.executor()
}
706
707 pub(crate) fn deregister_receiver(&self, key: u64) {
708 let mut lock = self.inner().receivers.lock();
709 if lock.contains(key) {
710 lock.remove(key);
711 } else {
712 // The executor is shutting down and already removed the entry.
713 assert!(self.inner().done.load(Ordering::SeqCst), "Missing receiver to deregister");
714 }
715 }
716
717 /// Spawn a new task to be run on this executor.
718 ///
719 /// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
720 /// may be run on either a singlethreaded or multithreaded executor.
721 pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
722 self.global_scope().spawn(future);
723 }
724
725 /// Spawn a new task to be run on this executor.
726 ///
727 /// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
728 /// have to be threads-safe (implement the `Send` trait). In return, this method requires that
729 /// this executor is a LocalExecutor.
730 pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
731 self.global_scope().spawn_local(future);
732 }
733
/// Returns the executor's shared handle to its monotonic-timeline timers.
pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
    &self.inner().monotonic_timers
}
737
/// Returns the executor's shared handle to its boot-timeline timers.
pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
    &self.inner().boot_timers
}
741
742 /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
743 /// that are ready to run. If tasks remain ready and no other threads are running, a thread
744 /// will be woken. This can end up being a performance win in the case that the queue can be
745 /// cleared without needing to wake any other thread.
746 ///
747 /// # Panics
748 ///
749 /// If called on a single-threaded executor or if this thread is a thread managed by the
750 /// executor.
751 pub fn poll_tasks(&self, callback: impl FnOnce()) {
752 EXECUTOR.with(|e| {
753 assert!(
754 e.borrow_mut().replace(self.root_scope.clone()).is_none(),
755 "This thread is already associated with an executor"
756 );
757 });
758
759 self.inner().poll_tasks(callback);
760
761 EXECUTOR.with(|e| *e.borrow_mut() = None);
762 }
763}
764
// AtomicFutureHandle can have a lifetime (for local executors we allow the main task to have a
// non-static lifetime). The executor doesn't handle this though; the executor just assumes all
// tasks have the 'static lifetime. It's up to the local executor to extend the lifetime and make
// it safe.
pub type TaskHandle = AtomicFutureHandle<'static>;

thread_local! {
    // Pointer to the task currently being polled on this thread, or null when no poll is in
    // progress. Written only by `TaskHandle::set_current_with`; read by
    // `TaskHandle::with_current`.
    static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
}
774
775impl TaskHandle {
776 pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
777 CURRENT_TASK.with(|cur| {
778 let cur = cur.get();
779 let cur = unsafe { cur.as_ref() };
780 f(cur)
781 })
782 }
783
784 fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
785 CURRENT_TASK.with(|cur| {
786 cur.set(task);
787 let result = f();
788 cur.set(std::ptr::null());
789 result
790 })
791 }
792}
793
#[cfg(test)]
mod tests {
    use super::{EHandle, ACTIVE_EXECUTORS};
    use crate::SendExecutor;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    // Runs an executor to completion on its own thread and checks that no `Executor` is left
    // alive afterwards.
    // NOTE(review): `ACTIVE_EXECUTORS` is a single process-wide counter, so this presumably
    // relies on no other test in the same process holding a live executor at the moment of the
    // assertion — confirm how these tests are run.
    #[test]
    fn test_no_leaks() {
        std::thread::spawn(|| SendExecutor::new(1).run(async {})).join().unwrap();

        assert_eq!(ACTIVE_EXECUTORS.load(Ordering::Relaxed), 0);
    }

    // Checks that a task spawned from within `EHandle::poll_tasks` is polled by the foreign
    // thread that called `poll_tasks`.
    #[test]
    fn poll_tasks() {
        SendExecutor::new(1).run(async {
            let ehandle = EHandle::local();

            // This will tie up the executor's only running thread (it blocks in `join` below)
            // which ensures that the task we spawn below can only run on the foreign thread.
            std::thread::spawn(move || {
                let ran = Arc::new(AtomicU64::new(0));
                ehandle.poll_tasks(|| {
                    let ran = ran.clone();
                    ehandle.spawn_detached(async move {
                        ran.fetch_add(1, Ordering::Relaxed);
                    });
                });

                // The spawned task should have run in this thread.
                assert_eq!(ran.load(Ordering::Relaxed), 1);
            })
            .join()
            .unwrap();
        });
    }
}