fuchsia_async/runtime/fuchsia/executor/
common.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::timer::Timers;
6use super::atomic_future::hooks::HooksMap;
7use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
8use super::packets::{
9    PacketReceiver, PacketReceiverMap, RawReceiverRegistration, ReceiverRegistration,
10};
11use super::scope::ScopeHandle;
12use super::time::{BootInstant, MonotonicInstant};
13use crate::runtime::instrument::TaskInstrument;
14use crossbeam::queue::SegQueue;
15use fuchsia_sync::Mutex;
16use zx::BootDuration;
17
18use std::any::Any;
19use std::cell::{Cell, RefCell};
20use std::fmt;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, AtomicUsize, Ordering};
24use std::sync::{Arc, OnceLock};
25use std::task::Context;
26use std::thread::ThreadId;
27
28pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;
29
30/// The id of the main task, which is a virtual task that lives from construction
31/// to destruction of the executor. The main task may correspond to multiple
32/// main futures, in cases where the executor runs multiple times during its lifetime.
33pub(crate) const MAIN_TASK_ID: usize = 0;
34
35thread_local!(
36    static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
37);
38
39pub enum ExecutorTime {
40    RealTime,
41    /// Fake readings used in tests.
42    FakeTime {
43        // The fake monotonic clock reading.
44        mono_reading_ns: AtomicI64,
45        // An offset to add to mono_reading_ns to get the reading of the boot
46        // clock, disregarding the difference in timelines.
47        //
48        // We disregard the fact that the reading and offset can not be
49        // read atomically, this is usually not relevant in tests.
50        mono_to_boot_offset_ns: AtomicI64,
51    },
52}
53
54enum PollReadyTasksResult {
55    NoneReady,
56    MoreReady,
57    MainTaskCompleted,
58}
59
60///  24           16           8            0
61///  +------------+------------+------------+
62///  |  foreign   |  notified  |  sleeping  |
63///  +------------+------------+------------+
64///
65///  sleeping : the number of threads sleeping
66///  notified : the number of notifications posted to wake sleeping threads
67///  foreign  : the number of foreign threads processing tasks
68#[derive(Clone, Copy, Eq, PartialEq)]
69struct ThreadsState(u32);
70
71impl ThreadsState {
72    const fn sleeping(&self) -> u8 {
73        self.0 as u8
74    }
75
76    const fn notified(&self) -> u8 {
77        (self.0 >> 8) as u8
78    }
79
80    const fn with_sleeping(self, sleeping: u8) -> Self {
81        Self((self.0 & !0xff) | sleeping as u32)
82    }
83
84    const fn with_notified(self, notified: u8) -> Self {
85        Self(self.0 & !0xff00 | (notified as u32) << 8)
86    }
87
88    const fn with_foreign(self, foreign: u8) -> Self {
89        Self(self.0 & !0xff0000 | (foreign as u32) << 16)
90    }
91}
92
93#[cfg(test)]
94static ACTIVE_EXECUTORS: AtomicUsize = AtomicUsize::new(0);
95
96pub(crate) struct Executor {
97    pub(super) port: zx::Port,
98    monotonic_timers: Arc<Timers<MonotonicInstant>>,
99    boot_timers: Arc<Timers<BootInstant>>,
100    pub(super) done: AtomicBool,
101    is_local: bool,
102    pub(crate) receivers: PacketReceiverMap,
103    task_count: AtomicUsize,
104    pub(super) ready_tasks: SegQueue<TaskHandle>,
105    time: ExecutorTime,
106    // The low byte is the number of threads currently sleeping. The high byte is the number of
107    // of wake-up notifications pending.
108    pub(super) threads_state: AtomicU32,
109    pub(super) num_threads: u8,
110    pub(super) polled: AtomicU64,
111    // Data that belongs to the user that can be accessed via EHandle::local(). See
112    // `TestExecutor::poll_until_stalled`.
113    pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
114    pub(super) instrument: Option<Arc<dyn TaskInstrument>>,
115    pub(super) hooks_map: HooksMap,
116    pub(super) first_thread_id: OnceLock<ThreadId>,
117}
118
119impl Executor {
120    pub fn new(
121        time: ExecutorTime,
122        is_local: bool,
123        num_threads: u8,
124        instrument: Option<Arc<dyn TaskInstrument>>,
125    ) -> Self {
126        Self::new_with_port(time, is_local, num_threads, zx::Port::create(), instrument)
127    }
128
129    pub fn new_with_port(
130        time: ExecutorTime,
131        is_local: bool,
132        num_threads: u8,
133        port: zx::Port,
134        instrument: Option<Arc<dyn TaskInstrument>>,
135    ) -> Self {
136        #[cfg(test)]
137        ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);
138
139        // Is this a fake-time executor?
140        let is_fake = matches!(
141            time,
142            ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
143        );
144
145        Executor {
146            port,
147            monotonic_timers: Arc::new(Timers::<MonotonicInstant>::new(is_fake)),
148            boot_timers: Arc::new(Timers::<BootInstant>::new(is_fake)),
149            done: AtomicBool::new(false),
150            is_local,
151            receivers: PacketReceiverMap::new(),
152            task_count: AtomicUsize::new(MAIN_TASK_ID + 1),
153            ready_tasks: SegQueue::new(),
154            time,
155            threads_state: AtomicU32::new(0),
156            num_threads,
157            polled: AtomicU64::new(0),
158            owner_data: Mutex::new(None),
159            instrument,
160            hooks_map: HooksMap::default(),
161            first_thread_id: OnceLock::new(),
162        }
163    }
164
165    pub fn set_local(root_scope: ScopeHandle) {
166        root_scope.executor().first_thread_id.get_or_init(|| std::thread::current().id());
167        EXECUTOR.with(|e| {
168            let mut e = e.borrow_mut();
169            assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
170            *e = Some(root_scope);
171        });
172    }
173
174    fn poll_ready_tasks(&self) -> PollReadyTasksResult {
175        loop {
176            for _ in 0..16 {
177                let Some(task) = self.ready_tasks.pop() else {
178                    return PollReadyTasksResult::NoneReady;
179                };
180                let task_id = task.id();
181                let complete = self.try_poll(task);
182                if complete && task_id == MAIN_TASK_ID {
183                    return PollReadyTasksResult::MainTaskCompleted;
184                }
185                self.polled.fetch_add(1, Ordering::Relaxed);
186            }
187            // We didn't finish all the ready tasks. If there are sleeping threads, post a
188            // notification to wake one up.
189            let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
190            loop {
191                if threads_state.sleeping() == 0 {
192                    // All threads are awake now. Prevent starvation.
193                    return PollReadyTasksResult::MoreReady;
194                }
195                if threads_state.notified() >= threads_state.sleeping() {
196                    // All sleeping threads have been notified. Keep going and poll more tasks.
197                    break;
198                }
199                match self.try_notify(threads_state) {
200                    Ok(()) => break,
201                    Err(s) => threads_state = s,
202                }
203            }
204        }
205    }
206
207    pub fn is_local(&self) -> bool {
208        self.is_local
209    }
210
211    pub fn next_task_id(&self) -> usize {
212        self.task_count.fetch_add(1, Ordering::Relaxed)
213    }
214
215    pub fn notify_task_ready(&self) {
216        // Only post if there's no thread running (or soon to be running). If we happen to be
217        // running on a thread for this executor, then threads_state won't be equal to num_threads,
218        // which means notifications only get fired if this is from a non-async thread, or a thread
219        // that belongs to a different executor. We use SeqCst ordering here to make sure this load
220        // happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
221        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));
222
223        // We only want to notify if there are no pending notifications and there are no other
224        // threads running.
225        while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
226            match self.try_notify(threads_state) {
227                Ok(()) => break,
228                Err(s) => threads_state = s,
229            }
230        }
231    }
232
233    /// Tries to notify a thread to wake up. Returns threads_state if it fails.
234    fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
235        self.threads_state
236            .compare_exchange_weak(
237                old_threads_state.0,
238                old_threads_state.0 + ThreadsState(0).with_notified(1).0,
239                Ordering::Relaxed,
240                Ordering::Relaxed,
241            )
242            .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
243            .map_err(ThreadsState)
244    }
245
246    pub fn wake_one_thread(&self) {
247        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
248        let current_sleeping = threads_state.sleeping();
249        if current_sleeping == 0 {
250            return;
251        }
252        while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
253            match self.try_notify(threads_state) {
254                Ok(()) => break,
255                Err(s) => threads_state = s,
256            }
257        }
258    }
259
260    pub fn notify_id(&self, id: u64) {
261        let up = zx::UserPacket::from_u8_array([0; 32]);
262        let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
263        if let Err(e) = self.port.queue(&packet) {
264            // TODO: logging
265            eprintln!("Failed to queue notify in port: {e:?}");
266        }
267    }
268
269    /// Returns the current reading of the monotonic clock.
270    ///
271    /// For test executors running in fake time, returns the reading of the
272    /// fake monotonic clock.
273    pub fn now(&self) -> MonotonicInstant {
274        match &self.time {
275            ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
276            ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
277                MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
278            }
279        }
280    }
281
282    /// Returns the current reading of the boot clock.
283    ///
284    /// For test executors running in fake time, returns the reading of the
285    /// fake boot clock.
286    pub fn boot_now(&self) -> BootInstant {
287        match &self.time {
288            ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),
289
290            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
291                // The two atomic values are loaded one after the other. This should
292                // not normally be an issue in tests.
293                let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
294                let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
295                BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
296            }
297        }
298    }
299
300    /// Sets the reading of the fake monotonic clock.
301    ///
302    /// # Panics
303    ///
304    /// If called on an executor that runs in real time.
305    pub fn set_fake_time(&self, new: MonotonicInstant) {
306        let boot_offset_ns = match &self.time {
307            ExecutorTime::RealTime => {
308                panic!("Error: called `set_fake_time` on an executor using actual time.")
309            }
310            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
311                t.store(new.into_nanos(), Ordering::Relaxed);
312                mono_to_boot_offset_ns.load(Ordering::Relaxed)
313            }
314        };
315        self.monotonic_timers.maybe_notify(new);
316
317        // Changing fake time also affects boot time.  Notify boot clocks as well.
318        let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
319        self.boot_timers.maybe_notify(new_boot_time);
320    }
321
322    // Sets a new offset between boot and monotonic time.
323    //
324    // Only works for executors operating in fake time.
325    // The change in the fake offset will wake expired boot timers.
326    pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
327        let mono_now_ns = match &self.time {
328            ExecutorTime::RealTime => {
329                panic!(
330                    "Error: called `set_fake_boot_to_mono_offset` on an executor using actual time."
331                )
332            }
333            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
334                // We ignore the non-atomic update between b and t, it is likely
335                // not relevant in tests.
336                b.store(offset.into_nanos(), Ordering::Relaxed);
337                t.load(Ordering::Relaxed)
338            }
339        };
340        let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
341        self.boot_timers.maybe_notify(new_boot_now);
342    }
343
344    /// Returns `true` if this executor is running in real time.  Returns
345    /// `false` if this executor si running in fake time.
346    pub fn is_real_time(&self) -> bool {
347        matches!(self.time, ExecutorTime::RealTime)
348    }
349
350    /// Must be called before `on_parent_drop`.
351    ///
352    /// Done flag must be set before dropping packet receivers
353    /// so that future receivers that attempt to deregister themselves
354    /// know that it's okay if their entries are already missing.
355    pub fn mark_done(&self) {
356        self.done.store(true, Ordering::SeqCst);
357
358        // Make sure there's at least one notification outstanding per thread to wake up all
359        // workers. This might be more notifications than required, but this way we don't have to
360        // worry about races where tasks are just about to sleep; when a task receives the
361        // notification, it will check done and terminate.
362        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
363        let num_threads = self.num_threads;
364        loop {
365            let notified = threads_state.notified();
366            if notified >= num_threads {
367                break;
368            }
369            match self.threads_state.compare_exchange_weak(
370                threads_state.0,
371                threads_state.with_notified(num_threads).0,
372                Ordering::Relaxed,
373                Ordering::Relaxed,
374            ) {
375                Ok(_) => {
376                    for _ in notified..num_threads {
377                        self.notify_id(TASK_READY_WAKEUP_ID);
378                    }
379                    return;
380                }
381                Err(old) => threads_state = ThreadsState(old),
382            }
383        }
384    }
385
386    /// Notes about the lifecycle of an Executor.
387    ///
388    /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
389    /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
390    /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
391    /// the port is dropped alongside the Executor as it should.
392    /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
393    ///
394    /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
395    /// "current" executor being set, and that's unset when the executor is dropped.
396    ///
397    /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
398    /// and point (b) is related to "what happens when I try to create a new receiver when there
399    /// is no executor".
400    ///
401    /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
402    /// storage [1]. And the reactor discourages usage of strong references to it by vending weak
403    /// references to it [2] instead of strong.
404    ///
405    /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
406    /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
407    /// upgrading pointers. For (b) the difference mostly stand for "when is it safe to use IO
408    /// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
409    /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
410    /// in that thread.
411    ///
412    /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
413    /// unregistered themselves when the executor drops. The choice to panic was made based on
414    /// patterns in fuchsia-async that may come to change:
415    ///
416    /// - Executor vends strong references to itself and those references are *stored* by most
417    ///   receiver implementations (as opposed to reached out on TLS every time).
418    /// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
419    ///   easy to understand error to return when polling on an extinct executor.
420    /// - All receivers are implemented in this crate and well-known.
421    ///
422    /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
423    /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
424    /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
425    /// opaque non-clone-copy-or-send guard would be stronger than this. See:
426    /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
427    pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
428        // Drop all tasks.
429        // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
430        // as part of waking, there's an upgrade to a strong reference, so for a small amount of
431        // time `fasync::unblock` can hold a strong reference to a task which in turn holds the
432        // future for the task which in turn could hold references to receivers, which, if we did
433        // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
434        // task futures here.
435        root_scope.drop_all_tasks();
436
437        // Drop all of the uncompleted tasks
438        while self.ready_tasks.pop().is_some() {}
439
440        // Deregister the timer receivers so that we can perform the check below.
441        self.monotonic_timers.deregister();
442        self.boot_timers.deregister();
443
444        // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
445        // happen. See discussion above.
446        //
447        // If you're here because you hit this panic check your code for:
448        //
449        // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
450        // position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
451        //
452        // - A function scope that contains a fuchsia_async::Executor NOT in the first position
453        // (first position in function scope gets dropped last:
454        // https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
455        //
456        // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
457        // contains a temporary (temporaries are dropped after the function scope:
458        // https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
459        // looks like a `match` statement at the end of the function without a semicolon.
460        //
461        // - Storing channel and FIDL objects in static variables.
462        //
463        // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
464        assert!(self.receivers.is_empty(), "receivers must not outlive their executor");
465
466        // Remove the thread-local executor set in `new`.
467        EHandle::rm_local();
468    }
469
470    // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
471    // the debugger needs to be updated.
472    // LINT.IfChange
473    pub fn worker_lifecycle<const UNTIL_STALLED: bool>(self: &Arc<Executor>) {
474        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
475
476        assert!(
477            !self.is_local() || self.first_thread_id.get() == Some(&std::thread::current().id())
478        );
479
480        self.monotonic_timers.register(self);
481        self.boot_timers.register(self);
482
483        loop {
484            // Keep track of whether we are considered asleep.
485            let mut sleeping = false;
486
487            match self.poll_ready_tasks() {
488                PollReadyTasksResult::NoneReady => {
489                    // No more tasks, indicate we are sleeping. We use SeqCst ordering because we
490                    // want this change here to happen *before* we check ready_tasks below. This
491                    // synchronizes with notify_task_ready which is called *after* a task is added
492                    // to ready_tasks.
493                    const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
494                    self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
495                    // Check ready tasks again. If a task got posted, wake up. This has to be done
496                    // because a notification won't get sent if there is at least one active thread
497                    // so there's a window between the preceding two lines where a task could be
498                    // made ready and a notification is not sent because it looks like there is at
499                    // least one thread running.
500                    if self.ready_tasks.is_empty() {
501                        sleeping = true;
502                    } else {
503                        // We lost a race, we're no longer sleeping.
504                        self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
505                    }
506                }
507                PollReadyTasksResult::MoreReady => {}
508                PollReadyTasksResult::MainTaskCompleted => return,
509            }
510
511            // Check done here after updating threads_state to avoid shutdown races.
512            if self.done.load(Ordering::SeqCst) {
513                return;
514            }
515
516            enum Work {
517                None,
518                Packet(zx::Packet),
519                Stalled,
520            }
521
522            let mut notified = false;
523            let work = {
524                // If we're considered awake choose INFINITE_PAST which will make the wait call
525                // return immediately.  Otherwise, wait until a packet arrives.
526                let deadline = if !sleeping || UNTIL_STALLED {
527                    zx::Instant::INFINITE_PAST
528                } else {
529                    zx::Instant::INFINITE
530                };
531
532                match self.port.wait(deadline) {
533                    Ok(packet) => {
534                        if packet.key() == TASK_READY_WAKEUP_ID {
535                            notified = true;
536                            Work::None
537                        } else {
538                            Work::Packet(packet)
539                        }
540                    }
541                    Err(zx::Status::TIMED_OUT) => {
542                        if !UNTIL_STALLED || !sleeping {
543                            Work::None
544                        } else {
545                            Work::Stalled
546                        }
547                    }
548                    Err(status) => {
549                        panic!("Error calling port wait: {status:?}");
550                    }
551                }
552            };
553
554            let threads_state_sub =
555                ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
556            if threads_state_sub.0 > 0 {
557                self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
558            }
559
560            match work {
561                Work::Packet(packet) => self.receivers.receive_packet(packet.key(), packet),
562                Work::None => {}
563                Work::Stalled => return,
564            }
565        }
566    }
567
568    /// Drops the main task.
569    ///
570    /// # Safety
571    ///
572    /// The caller must guarantee that the executor isn't running.
573    pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle) {
574        root_scope.drop_task_unchecked(MAIN_TASK_ID);
575    }
576
577    fn try_poll(&self, task: TaskHandle) -> bool {
578        let task_waker = task.waker();
579        let poll_result = TaskHandle::set_current_with(&task, || {
580            task.try_poll(&mut Context::from_waker(&task_waker))
581        });
582        match poll_result {
583            AttemptPollResult::Yield => {
584                self.ready_tasks.push(task);
585                false
586            }
587            AttemptPollResult::IFinished | AttemptPollResult::Aborted => {
588                task.scope().task_did_finish(task.id());
589                true
590            }
591            _ => false,
592        }
593    }
594
595    /// Returns the monotonic timers.
596    pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
597        &self.monotonic_timers
598    }
599
600    /// Returns the boot timers.
601    pub fn boot_timers(&self) -> &Timers<BootInstant> {
602        &self.boot_timers
603    }
604
605    fn poll_tasks(&self, callback: impl FnOnce()) {
606        assert!(!self.is_local);
607
608        // Increment the count of foreign threads.
609        const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
610        self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);
611
612        callback();
613
614        // Poll up to 16 tasks.
615        for _ in 0..16 {
616            let Some(task) = self.ready_tasks.pop() else {
617                break;
618            };
619            let task_id = task.id();
620            if self.try_poll(task) && task_id == MAIN_TASK_ID {
621                break;
622            }
623            self.polled.fetch_add(1, Ordering::Relaxed);
624        }
625
626        let mut threads_state = ThreadsState(
627            self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
628        );
629
630        if !self.ready_tasks.is_empty() {
631            // There are tasks still ready to run, so wake up a thread if all the other threads are
632            // sleeping.
633            while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
634                match self.try_notify(threads_state) {
635                    Ok(()) => break,
636                    Err(s) => threads_state = s,
637                }
638            }
639        }
640    }
641
642    pub fn task_is_ready(&self, task: TaskHandle) {
643        self.ready_tasks.push(task);
644        self.notify_task_ready();
645    }
646}
647
648#[cfg(test)]
649impl Drop for Executor {
650    fn drop(&mut self) {
651        ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
652    }
653}
654
655/// A handle to an executor.
656#[derive(Clone)]
657pub struct EHandle {
658    // LINT.IfChange
659    pub(super) root_scope: ScopeHandle,
660    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
661}
662
663impl fmt::Debug for EHandle {
664    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
665        f.debug_struct("EHandle").field("port", &self.inner().port).finish()
666    }
667}
668
669impl EHandle {
670    /// Returns the thread-local executor.
671    ///
672    /// # Panics
673    ///
674    /// If called outside the context of an active async executor.
675    pub fn local() -> Self {
676        let root_scope = EXECUTOR
677            .with(|e| e.borrow().as_ref().map(|x| x.clone()))
678            .expect("Fuchsia Executor must be created first");
679
680        EHandle { root_scope }
681    }
682
683    /// Set the fake time to a given value.
684    ///
685    /// # Panics
686    ///
687    /// If the executor was not created with fake time.
688    pub fn set_fake_time(&self, t: MonotonicInstant) {
689        self.inner().set_fake_time(t)
690    }
691
692    pub(super) fn rm_local() {
693        EXECUTOR.with(|e| *e.borrow_mut() = None);
694    }
695
696    /// The root scope of the executor.
697    ///
698    /// This can be used to spawn tasks that live as long as the executor, and
699    /// to create shorter-lived child scopes.
700    ///
701    /// Most users should create an owned scope with
702    /// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
703    pub fn global_scope(&self) -> &ScopeHandle {
704        &self.root_scope
705    }
706
707    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
708    pub fn port(&self) -> &zx::Port {
709        &self.inner().port
710    }
711
712    /// Registers a `PacketReceiver` with the executor and returns a registration.
713    /// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
714    pub fn register_receiver<T: PacketReceiver>(&self, receiver: T) -> ReceiverRegistration<T> {
715        self.inner().receivers.register(self.inner().clone(), receiver)
716    }
717
718    /// Registers a pinned `RawPacketReceiver` with the executor.
719    ///
720    /// The registration will be deregistered when dropped.
721    ///
722    /// NOTE: Unlike with `register_receiver`, `receive_packet` will be called whilst a lock is
723    /// held, so it is not safe to register or unregister receivers at that time.
724    pub fn register_pinned<T: PacketReceiver>(
725        &self,
726        raw_registration: Pin<&mut RawReceiverRegistration<T>>,
727    ) {
728        self.inner().receivers.register_pinned(self.clone(), raw_registration);
729    }
730
731    #[inline(always)]
732    pub(crate) fn inner(&self) -> &Arc<Executor> {
733        self.root_scope.executor()
734    }
735
736    /// Spawn a new task to be run on this executor.
737    ///
738    /// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
739    /// may be run on either a singlethreaded or multithreaded executor.
740    pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
741        self.global_scope().spawn(future);
742    }
743
744    /// Spawn a new task to be run on this executor.
745    ///
746    /// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
747    /// have to be threads-safe (implement the `Send` trait). In return, this method requires that
748    /// this executor is a LocalExecutor.
749    pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
750        self.global_scope().spawn_local(future);
751    }
752
753    pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
754        &self.inner().monotonic_timers
755    }
756
757    pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
758        &self.inner().boot_timers
759    }
760
761    /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
762    /// that are ready to run.  If tasks remain ready and no other threads are running, a thread
763    /// will be woken.  This can end up being a performance win in the case that the queue can be
764    /// cleared without needing to wake any other thread.
765    ///
766    /// # Panics
767    ///
768    /// If called on a single-threaded executor or if this thread is a thread managed by the
769    /// executor.
770    pub fn poll_tasks(&self, callback: impl FnOnce()) {
771        EXECUTOR.with(|e| {
772            assert!(
773                e.borrow_mut().replace(self.root_scope.clone()).is_none(),
774                "This thread is already associated with an executor"
775            );
776        });
777
778        self.inner().poll_tasks(callback);
779
780        EXECUTOR.with(|e| *e.borrow_mut() = None);
781    }
782}
783
784// AtomicFutureHandle can have a lifetime (for local executors we allow the main task to have a
785// non-static lifetime).  The executor doesn't handle this though; the executor just assumes all
786// tasks have the 'static lifetime.  It's up to the local executor to extend the lifetime and make
787// it safe.
788pub type TaskHandle = AtomicFutureHandle<'static>;
789
790thread_local! {
791    static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
792}
793
794impl TaskHandle {
795    pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
796        CURRENT_TASK.with(|cur| {
797            let cur = cur.get();
798            let cur = unsafe { cur.as_ref() };
799            f(cur)
800        })
801    }
802
803    fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
804        CURRENT_TASK.with(|cur| {
805            cur.set(task);
806            let result = f();
807            cur.set(std::ptr::null());
808            result
809        })
810    }
811}
812
813#[cfg(test)]
814mod tests {
815    use super::{ACTIVE_EXECUTORS, EHandle};
816    use crate::{LocalExecutorBuilder, SendExecutorBuilder};
817    use std::sync::Arc;
818    use std::sync::atomic::{AtomicU64, Ordering};
819
820    #[test]
821    fn test_no_leaks() {
822        std::thread::spawn(|| SendExecutorBuilder::new().num_threads(1).build().run(async {}))
823            .join()
824            .unwrap();
825
826        assert_eq!(ACTIVE_EXECUTORS.load(Ordering::Relaxed), 0);
827    }
828
829    #[test]
830    fn poll_tasks() {
831        SendExecutorBuilder::new().num_threads(1).build().run(async {
832            let ehandle = EHandle::local();
833
834            // This will tie up the executor's only running thread which ensures that the task
835            // we spawn below can only run on the foreign thread.
836            std::thread::spawn(move || {
837                let ran = Arc::new(AtomicU64::new(0));
838                ehandle.poll_tasks(|| {
839                    let ran = ran.clone();
840                    ehandle.spawn_detached(async move {
841                        ran.fetch_add(1, Ordering::Relaxed);
842                    });
843                });
844
845                // The spawned task should have run in this thread.
846                assert_eq!(ran.load(Ordering::Relaxed), 1);
847            })
848            .join()
849            .unwrap();
850        });
851    }
852
853    #[test]
854    #[should_panic]
855    fn spawn_local_from_different_thread() {
856        let _executor = LocalExecutorBuilder::new().build();
857        let ehandle = EHandle::local();
858        let _ = std::thread::spawn(move || {
859            ehandle.spawn_local_detached(async {});
860        })
861        .join();
862    }
863}