fuchsia_async/runtime/fuchsia/executor/
common.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::timer::Timers;
6use super::atomic_future::hooks::HooksMap;
7use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
8use super::packets::{
9    PacketReceiver, PacketReceiverMap, RawReceiverRegistration, ReceiverRegistration,
10};
11use super::scope::ScopeHandle;
12use super::time::{BootInstant, MonotonicInstant};
13use crate::runtime::instrument::TaskInstrument;
14use crossbeam::queue::SegQueue;
15use fuchsia_sync::Mutex;
16use zx::BootDuration;
17
18use std::any::Any;
19use std::cell::{Cell, RefCell};
20use std::fmt;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use std::task::Context;
26use std::thread::ThreadId;
27
/// Port-packet key used to wake a sleeping worker thread when a task becomes
/// ready (posted by `try_notify` / `mark_done`, recognized in
/// `worker_lifecycle`).
pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;
29
// The root scope of the executor installed on the current thread, if any.
// Set by `Executor::set_local` and cleared by `EHandle::rm_local`.
thread_local!(
    static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
);
33
/// Source of clock readings for an executor: the real Zircon clocks, or
/// test-controlled fake readings.
pub enum ExecutorTime {
    /// Read the actual Zircon monotonic/boot clocks.
    RealTime,
    /// Fake readings used in tests.
    FakeTime {
        // The fake monotonic clock reading.
        mono_reading_ns: AtomicI64,
        // An offset to add to mono_reading_ns to get the reading of the boot
        // clock, disregarding the difference in timelines.
        //
        // We disregard the fact that the reading and offset can not be
        // read atomically, this is usually not relevant in tests.
        mono_to_boot_offset_ns: AtomicI64,
    },
}
48
/// Outcome of `Executor::poll_ready_tasks`.
enum PollReadyTasksResult {
    /// `ready_tasks` was drained; the calling worker may go to sleep.
    NoneReady,
    /// Stopped early with tasks still queued and every thread awake.
    MoreReady,
    /// The main task ran to completion; the worker should exit.
    MainTaskCompleted,
}
54
55///  24           16           8            0
56///  +------------+------------+------------+
57///  |  foreign   |  notified  |  sleeping  |
58///  +------------+------------+------------+
59///
60///  sleeping : the number of threads sleeping
61///  notified : the number of notifications posted to wake sleeping threads
62///  foreign  : the number of foreign threads processing tasks
63#[derive(Clone, Copy, Eq, PartialEq)]
64struct ThreadsState(u32);
65
66impl ThreadsState {
67    const fn sleeping(&self) -> u8 {
68        self.0 as u8
69    }
70
71    const fn notified(&self) -> u8 {
72        (self.0 >> 8) as u8
73    }
74
75    const fn with_sleeping(self, sleeping: u8) -> Self {
76        Self((self.0 & !0xff) | sleeping as u32)
77    }
78
79    const fn with_notified(self, notified: u8) -> Self {
80        Self(self.0 & !0xff00 | (notified as u32) << 8)
81    }
82
83    const fn with_foreign(self, foreign: u8) -> Self {
84        Self(self.0 & !0xff0000 | (foreign as u32) << 16)
85    }
86}
87
// Number of live `Executor` instances (test builds only): incremented in
// `new_with_port`, decremented in the test-only `Drop` impl.
#[cfg(test)]
static ACTIVE_EXECUTORS: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
90
/// The shared state of an executor: the port it waits on, its timers, its
/// run queue and the packed per-thread bookkeeping.
pub(crate) struct Executor {
    // The port all worker threads wait on; wake-ups and receiver packets are
    // both delivered through it.
    pub(super) port: zx::Port,
    monotonic_timers: Arc<Timers<MonotonicInstant>>,
    boot_timers: Arc<Timers<BootInstant>>,
    // Set by `mark_done`; checked by workers before sleeping.
    pub(super) done: AtomicBool,
    is_local: bool,
    pub(crate) receivers: PacketReceiverMap,

    // Queue of tasks ready to be polled.
    pub(super) ready_tasks: SegQueue<TaskHandle>,
    time: ExecutorTime,
    // Packed `ThreadsState` (sleeping / notified / foreign counts — see the
    // layout diagram on `ThreadsState`).
    pub(super) threads_state: AtomicU32,
    pub(super) num_threads: u8,
    // Count of task polls performed (see `poll_ready_tasks`/`poll_tasks`).
    pub(super) polled: AtomicU64,
    // Data that belongs to the user that can be accessed via EHandle::local(). See
    // `TestExecutor::poll_until_stalled`.
    pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
    // Optional task-lifecycle instrumentation hook.
    pub(super) instrument: Option<Arc<dyn TaskInstrument>>,
    pub(super) hooks_map: HooksMap,
    // The first thread to install this executor locally; local executors may
    // only run from this thread (asserted in `worker_lifecycle`).
    pub(super) first_thread_id: OnceLock<ThreadId>,
}
113
114impl Executor {
    /// Creates an executor with a freshly created port.  See `new_with_port`
    /// for the meaning of the parameters.
    pub fn new(
        time: ExecutorTime,
        is_local: bool,
        num_threads: u8,
        instrument: Option<Arc<dyn TaskInstrument>>,
    ) -> Self {
        Self::new_with_port(time, is_local, num_threads, zx::Port::create(), instrument)
    }
123
124    pub fn new_with_port(
125        time: ExecutorTime,
126        is_local: bool,
127        num_threads: u8,
128        port: zx::Port,
129        instrument: Option<Arc<dyn TaskInstrument>>,
130    ) -> Self {
131        #[cfg(test)]
132        ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);
133
134        // Is this a fake-time executor?
135        let is_fake = matches!(
136            time,
137            ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
138        );
139
140        Executor {
141            port,
142            monotonic_timers: Arc::new(Timers::<MonotonicInstant>::new(is_fake)),
143            boot_timers: Arc::new(Timers::<BootInstant>::new(is_fake)),
144            done: AtomicBool::new(false),
145            is_local,
146            receivers: PacketReceiverMap::new(),
147            ready_tasks: SegQueue::new(),
148            time,
149            threads_state: AtomicU32::new(0),
150            num_threads,
151            polled: AtomicU64::new(0),
152            owner_data: Mutex::new(None),
153            instrument,
154            hooks_map: HooksMap::default(),
155            first_thread_id: OnceLock::new(),
156        }
157    }
158
    /// Installs `root_scope` as this thread's executor (see `EHandle::local`),
    /// recording the first thread to do so; `worker_lifecycle` uses that to
    /// assert a local executor only ever runs on its original thread.
    ///
    /// # Panics
    ///
    /// If this thread already has an executor installed.
    pub fn set_local(root_scope: ScopeHandle) {
        root_scope.executor().first_thread_id.get_or_init(|| std::thread::current().id());
        EXECUTOR.with(|e| {
            let mut e = e.borrow_mut();
            assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
            *e = Some(root_scope);
        });
    }
167
    /// Polls tasks from `ready_tasks` until the queue is drained, the main
    /// task completes, or work remains with every worker already awake.
    ///
    /// Tasks are polled in batches of 16; between batches, if work remains
    /// and a sleeping thread lacks a pending notification, one is notified
    /// so it can help drain the queue.
    fn poll_ready_tasks(&self, main_task: Option<&TaskHandle>) -> PollReadyTasksResult {
        loop {
            for _ in 0..16 {
                let Some(task) = self.ready_tasks.pop() else {
                    return PollReadyTasksResult::NoneReady;
                };
                let is_main = Some(&task) == main_task;
                let complete = self.try_poll(task);
                if complete && is_main {
                    return PollReadyTasksResult::MainTaskCompleted;
                }
                self.polled.fetch_add(1, Ordering::Relaxed);
            }
            // We didn't finish all the ready tasks. If there are sleeping threads, post a
            // notification to wake one up.
            let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
            loop {
                if threads_state.sleeping() == 0 {
                    // All threads are awake now. Prevent starvation.
                    return PollReadyTasksResult::MoreReady;
                }
                if threads_state.notified() >= threads_state.sleeping() {
                    // All sleeping threads have been notified. Keep going and poll more tasks.
                    break;
                }
                match self.try_notify(threads_state) {
                    Ok(()) => break,
                    Err(s) => threads_state = s,
                }
            }
        }
    }
200
    /// Returns `true` if this is a single-threaded (local) executor.
    pub fn is_local(&self) -> bool {
        self.is_local
    }
204
    /// Posts a wake-up to the port if — and only if — every worker thread is
    /// asleep with no notification already pending.
    pub fn notify_task_ready(&self) {
        // Only post if there's no thread running (or soon to be running). If we happen to be
        // running on a thread for this executor, then threads_state won't be equal to num_threads,
        // which means notifications only get fired if this is from a non-async thread, or a thread
        // that belongs to a different executor. We use SeqCst ordering here to make sure this load
        // happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));

        // We only want to notify if there are no pending notifications and there are no other
        // threads running.
        while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
            match self.try_notify(threads_state) {
                Ok(()) => break,
                Err(s) => threads_state = s,
            }
        }
    }
222
    /// Tries to notify a thread to wake up. Returns threads_state if it fails.
    ///
    /// On success the notified count has been incremented and a wake-up
    /// packet posted.  Failure may be spurious (`compare_exchange_weak`);
    /// callers loop, retrying with the returned current state.
    fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
        self.threads_state
            .compare_exchange_weak(
                old_threads_state.0,
                old_threads_state.0 + ThreadsState(0).with_notified(1).0,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
            .map_err(ThreadsState)
    }
235
    /// Wakes at most one sleeping thread, giving up if a notification is
    /// already pending or the number of sleepers has dropped in the meantime
    /// (i.e. some thread already woke).
    pub fn wake_one_thread(&self) {
        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
        let current_sleeping = threads_state.sleeping();
        if current_sleeping == 0 {
            return;
        }
        while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
            match self.try_notify(threads_state) {
                Ok(()) => break,
                Err(s) => threads_state = s,
            }
        }
    }
249
    /// Queues a user packet carrying `id` as its key on the executor's port.
    /// A queue failure is logged and otherwise ignored.
    pub fn notify_id(&self, id: u64) {
        let up = zx::UserPacket::from_u8_array([0; 32]);
        let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
        if let Err(e) = self.port.queue(&packet) {
            // TODO: logging
            eprintln!("Failed to queue notify in port: {e:?}");
        }
    }
258
    /// Returns the current reading of the monotonic clock.
    ///
    /// For test executors running in fake time, returns the reading of the
    /// fake monotonic clock.
    pub fn now(&self) -> MonotonicInstant {
        match &self.time {
            ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
            ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
                // Relaxed: racy reads of the fake clock are tolerated in tests
                // (see the comments on `ExecutorTime::FakeTime`).
                MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
            }
        }
    }

    /// Returns the current reading of the boot clock.
    ///
    /// For test executors running in fake time, returns the reading of the
    /// fake boot clock.
    pub fn boot_now(&self) -> BootInstant {
        match &self.time {
            ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),

            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
                // The two atomic values are loaded one after the other. This should
                // not normally be an issue in tests.
                let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
                let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
                BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
            }
        }
    }
289
    /// Sets the reading of the fake monotonic clock and notifies both timer
    /// queues of the new readings (the boot reading is derived by adding the
    /// current fake boot offset).
    ///
    /// # Panics
    ///
    /// If called on an executor that runs in real time.
    pub fn set_fake_time(&self, new: MonotonicInstant) {
        let boot_offset_ns = match &self.time {
            ExecutorTime::RealTime => {
                panic!("Error: called `set_fake_time` on an executor using actual time.")
            }
            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
                t.store(new.into_nanos(), Ordering::Relaxed);
                mono_to_boot_offset_ns.load(Ordering::Relaxed)
            }
        };
        self.monotonic_timers.maybe_notify(new);

        // Changing fake time also affects boot time.  Notify boot clocks as well.
        let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
        self.boot_timers.maybe_notify(new_boot_time);
    }
311
    /// Sets a new offset between boot and monotonic time.
    ///
    /// Only works for executors operating in fake time.
    /// The change in the fake offset will wake expired boot timers.
    ///
    /// # Panics
    ///
    /// If called on an executor that runs in real time.
    pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
        let mono_now_ns = match &self.time {
            ExecutorTime::RealTime => {
                panic!(
                    "Error: called `set_fake_boot_to_mono_offset` on an executor using actual time."
                )
            }
            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
                // We ignore the non-atomic update between b and t, it is likely
                // not relevant in tests.
                b.store(offset.into_nanos(), Ordering::Relaxed);
                t.load(Ordering::Relaxed)
            }
        };
        let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
        self.boot_timers.maybe_notify(new_boot_now);
    }
333
    /// Returns `true` if this executor is running in real time.  Returns
    /// `false` if this executor is running in fake time.
    pub fn is_real_time(&self) -> bool {
        matches!(self.time, ExecutorTime::RealTime)
    }
339
    /// Must be called before `on_parent_drop`.
    ///
    /// Done flag must be set before dropping packet receivers
    /// so that future receivers that attempt to deregister themselves
    /// know that it's okay if their entries are already missing.
    pub fn mark_done(&self) {
        self.done.store(true, Ordering::SeqCst);

        // Make sure there's at least one notification outstanding per thread to wake up all
        // workers. This might be more notifications than required, but this way we don't have to
        // worry about races where tasks are just about to sleep; when a task receives the
        // notification, it will check done and terminate.
        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
        let num_threads = self.num_threads;
        // CAS loop: raise the notified count to `num_threads`, then post the
        // wake-up packets the old state was missing.
        loop {
            let notified = threads_state.notified();
            if notified >= num_threads {
                break;
            }
            match self.threads_state.compare_exchange_weak(
                threads_state.0,
                threads_state.with_notified(num_threads).0,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    for _ in notified..num_threads {
                        self.notify_id(TASK_READY_WAKEUP_ID);
                    }
                    return;
                }
                Err(old) => threads_state = ThreadsState(old),
            }
        }
    }
375
    /// Notes about the lifecycle of an Executor.
    ///
    /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
    /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
    /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
    /// the port is dropped alongside the Executor as it should.
    /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
    ///
    /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
    /// "current" executor being set, and that's unset when the executor is dropped.
    ///
    /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
    /// and point (b) is related to "what happens when I try to create a new receiver when there
    /// is no executor".
    ///
    /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
    /// storage [1]. And the reactor discourages usage of strong references to it by vending weak
    /// references to it [2] instead of strong.
    ///
    /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
    /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
    /// upgrading pointers. For (b) the difference mostly stand for "when is it safe to use IO
    /// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
    /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
    /// in that thread.
    ///
    /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
    /// unregistered themselves when the executor drops. The choice to panic was made based on
    /// patterns in fuchsia-async that may come to change:
    ///
    /// - Executor vends strong references to itself and those references are *stored* by most
    ///   receiver implementations (as opposed to reached out on TLS every time).
    /// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
    ///   easy to understand error to return when polling on an extinct executor.
    /// - All receivers are implemented in this crate and well-known.
    ///
    /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
    /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
    /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
    /// opaque non-clone-copy-or-send guard would be stronger than this. See:
    /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
    ///
    /// # Panics
    ///
    /// If any packet receiver other than the timers is still registered once
    /// all tasks have been dropped (see the comment above the assertion).
    pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
        // Drop all tasks.
        // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
        // as part of waking, there's an upgrade to a strong reference, so for a small amount of
        // time `fasync::unblock` can hold a strong reference to a task which in turn holds the
        // future for the task which in turn could hold references to receivers, which, if we did
        // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
        // task futures here.
        root_scope.drop_all_tasks();

        // Drop all of the uncompleted tasks
        while self.ready_tasks.pop().is_some() {}

        // Deregister the timer receivers so that we can perform the check below.
        self.monotonic_timers.deregister();
        self.boot_timers.deregister();

        // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
        // happen. See discussion above.
        //
        // If you're here because you hit this panic check your code for:
        //
        // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
        // position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
        //
        // - A function scope that contains a fuchsia_async::Executor NOT in the first position
        // (first position in function scope gets dropped last:
        // https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
        //
        // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
        // contains a temporary (temporaries are dropped after the function scope:
        // https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
        // looks like a `match` statement at the end of the function without a semicolon.
        //
        // - Storing channel and FIDL objects in static variables.
        //
        // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
        assert!(self.receivers.is_empty(), "receivers must not outlive their executor");

        // Remove the thread-local executor set in `new`.
        EHandle::rm_local();
    }
459
    // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
    // the debugger needs to be updated.
    // LINT.IfChange
    /// Runs a worker thread's main loop on the current thread: poll ready
    /// tasks, then wait on the port for packets or wake-ups.
    ///
    /// Returns when the executor is marked done, when the main task
    /// completes, or — for `UNTIL_STALLED` — when no further progress can be
    /// made without waiting.
    pub fn worker_lifecycle<const UNTIL_STALLED: bool>(
        self: &Arc<Executor>,
        // The main task should be specified for a single-threaded executor, but it isn't necessary
        // for a multi-threaded executor since it has an independent mechanism for tracking when
        // the main task terminates.
        // TODO(https://fxbug.dev/481030722) remove this parameter
        main_task: Option<&TaskHandle>,
    ) {
        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)

        // A local executor must only ever run on the thread that installed it
        // (see `set_local`).
        assert!(
            !self.is_local() || self.first_thread_id.get() == Some(&std::thread::current().id())
        );

        self.monotonic_timers.register(self);
        self.boot_timers.register(self);

        loop {
            // Keep track of whether we are considered asleep.
            let mut sleeping = false;

            match self.poll_ready_tasks(main_task) {
                PollReadyTasksResult::NoneReady => {
                    // No more tasks, indicate we are sleeping. We use SeqCst ordering because we
                    // want this change here to happen *before* we check ready_tasks below. This
                    // synchronizes with notify_task_ready which is called *after* a task is added
                    // to ready_tasks.
                    const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
                    self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
                    // Check ready tasks again. If a task got posted, wake up. This has to be done
                    // because a notification won't get sent if there is at least one active thread
                    // so there's a window between the preceding two lines where a task could be
                    // made ready and a notification is not sent because it looks like there is at
                    // least one thread running.
                    if self.ready_tasks.is_empty() {
                        sleeping = true;
                    } else {
                        // We lost a race, we're no longer sleeping.
                        self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
                    }
                }
                PollReadyTasksResult::MoreReady => {}
                PollReadyTasksResult::MainTaskCompleted => return,
            }

            // Check done here after updating threads_state to avoid shutdown races.
            if self.done.load(Ordering::SeqCst) {
                return;
            }

            // What the port wait produced for this iteration.
            enum Work {
                None,
                Packet(zx::Packet),
                Stalled,
            }

            let mut notified = false;
            let work = {
                // If we're considered awake choose INFINITE_PAST which will make the wait call
                // return immediately.  Otherwise, wait until a packet arrives.
                let deadline = if !sleeping || UNTIL_STALLED {
                    zx::Instant::INFINITE_PAST
                } else {
                    zx::Instant::INFINITE
                };

                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == TASK_READY_WAKEUP_ID {
                            notified = true;
                            Work::None
                        } else {
                            Work::Packet(packet)
                        }
                    }
                    Err(zx::Status::TIMED_OUT) => {
                        if !UNTIL_STALLED || !sleeping {
                            Work::None
                        } else {
                            Work::Stalled
                        }
                    }
                    Err(status) => {
                        panic!("Error calling port wait: {status:?}");
                    }
                }
            };

            // Undo the sleeping count (if we slept) and consume a pending
            // notification (if we took one) in a single atomic subtraction.
            let threads_state_sub =
                ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
            if threads_state_sub.0 > 0 {
                self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
            }

            match work {
                Work::Packet(packet) => self.receivers.receive_packet(packet.key(), packet),
                Work::None => {}
                Work::Stalled => return,
            }
        }
    }
564
    /// Drops the main task.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the executor isn't running.
    pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle, task: &TaskHandle) {
        // SAFETY: per this function's contract the executor isn't running, so
        // no thread can be polling `task` concurrently.
        unsafe { root_scope.drop_task_unchecked(task) };
    }
573
    /// Polls `task` once, with `task` installed as the thread's "current"
    /// task.  Returns `true` iff the task finished (completed or aborted).
    fn try_poll(&self, task: TaskHandle) -> bool {
        let task_waker = task.waker();
        let poll_result = TaskHandle::set_current_with(&task, || {
            task.try_poll(&mut Context::from_waker(&task_waker))
        });
        match poll_result {
            AttemptPollResult::Yield => {
                // The task yielded cooperatively; it's still runnable, so put
                // it straight back on the queue.
                self.ready_tasks.push(task);
                false
            }
            AttemptPollResult::IFinished | AttemptPollResult::Aborted => {
                task.scope().task_did_finish(&task);
                true
            }
            // Other poll results (e.g. pending) require no action here.
            _ => false,
        }
    }
591
    /// Returns the monotonic timer queue (registered with the port in
    /// `worker_lifecycle`, notified directly in fake time).
    pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
        &self.monotonic_timers
    }

    /// Returns the boot timer queue (registered with the port in
    /// `worker_lifecycle`, notified directly in fake time).
    pub fn boot_timers(&self) -> &Timers<BootInstant> {
        &self.boot_timers
    }
601
    /// Runs `callback` with this thread temporarily counted as a "foreign"
    /// worker, then polls up to 16 ready tasks on it.  See
    /// `EHandle::poll_tasks` for the public entry point and rationale.
    fn poll_tasks(&self, callback: impl FnOnce(), main_task: Option<&TaskHandle>) {
        assert!(!self.is_local);

        // Increment the count of foreign threads.
        const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
        self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);

        callback();

        // Poll up to 16 tasks.
        for _ in 0..16 {
            let Some(task) = self.ready_tasks.pop() else {
                break;
            };
            let is_main = Some(&task) == main_task;
            if self.try_poll(task) && is_main {
                break;
            }
            self.polled.fetch_add(1, Ordering::Relaxed);
        }

        // NOTE(review): SeqCst here presumably orders the decrement before
        // the `ready_tasks` check below, mirroring `notify_task_ready` —
        // confirm.
        let mut threads_state = ThreadsState(
            self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
        );

        if !self.ready_tasks.is_empty() {
            // There are tasks still ready to run, so wake up a thread if all the other threads are
            // sleeping.
            while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
                match self.try_notify(threads_state) {
                    Ok(()) => break,
                    Err(s) => threads_state = s,
                }
            }
        }
    }
638
    /// Enqueues `task` on the run queue, waking a worker if necessary.
    pub fn task_is_ready(&self, task: TaskHandle) {
        self.ready_tasks.push(task);
        self.notify_task_ready();
    }
643}
644
#[cfg(test)]
impl Drop for Executor {
    fn drop(&mut self) {
        // Balances the increment in `new_with_port` (test builds only).
        ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
    }
}
651
/// A handle to an executor.
///
/// Holds the executor's root scope; the underlying `Executor` is reached
/// through it (see `EHandle::inner`).
#[derive(Clone)]
pub struct EHandle {
    // LINT.IfChange
    pub(super) root_scope: ScopeHandle,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
}
659
impl fmt::Debug for EHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Identify the executor by its underlying port.
        f.debug_struct("EHandle").field("port", &self.inner().port).finish()
    }
}
665
666impl EHandle {
667    /// Returns the thread-local executor.
668    ///
669    /// # Panics
670    ///
671    /// If called outside the context of an active async executor.
672    pub fn local() -> Self {
673        let root_scope = EXECUTOR
674            .with(|e| e.borrow().as_ref().map(|x| x.clone()))
675            .expect("Fuchsia Executor must be created first");
676
677        EHandle { root_scope }
678    }
679
    /// Set the fake time to a given value.
    ///
    /// # Panics
    ///
    /// If the executor was not created with fake time.
    pub fn set_fake_time(&self, t: MonotonicInstant) {
        self.inner().set_fake_time(t)
    }

    /// Clears this thread's executor reference; the inverse of
    /// `Executor::set_local`.
    pub(super) fn rm_local() {
        EXECUTOR.with(|e| *e.borrow_mut() = None);
    }

    /// The root scope of the executor.
    ///
    /// This can be used to spawn tasks that live as long as the executor, and
    /// to create shorter-lived child scopes.
    ///
    /// Most users should create an owned scope with
    /// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
    pub fn global_scope(&self) -> &ScopeHandle {
        &self.root_scope
    }

    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
    pub fn port(&self) -> &zx::Port {
        &self.inner().port
    }

    /// Registers a `PacketReceiver` with the executor and returns a registration.
    /// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
    pub fn register_receiver<T: PacketReceiver>(&self, receiver: T) -> ReceiverRegistration<T> {
        self.inner().receivers.register(self.inner().clone(), receiver)
    }

    /// Registers a pinned `RawPacketReceiver` with the executor.
    ///
    /// The registration will be deregistered when dropped.
    ///
    /// NOTE: Unlike with `register_receiver`, `receive_packet` will be called whilst a lock is
    /// held, so it is not safe to register or unregister receivers at that time.
    pub fn register_pinned<T: PacketReceiver>(
        &self,
        raw_registration: Pin<&mut RawReceiverRegistration<T>>,
    ) {
        self.inner().receivers.register_pinned(self.clone(), raw_registration);
    }

    /// Returns the underlying `Executor`, reached via the root scope.
    #[inline(always)]
    pub(crate) fn inner(&self) -> &Arc<Executor> {
        self.root_scope.executor()
    }
732
    /// Spawn a new task to be run on this executor.
    ///
    /// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
    /// may be run on either a singlethreaded or multithreaded executor.
    pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
        self.global_scope().spawn(future);
    }

    /// Spawn a new task to be run on this executor.
    ///
    /// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
    /// have to be thread-safe (implement the `Send` trait). In return, this method requires that
    /// this executor is a LocalExecutor.
    pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
        self.global_scope().spawn_local(future);
    }

    /// Returns the executor's monotonic timer queue.
    pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
        &self.inner().monotonic_timers
    }

    /// Returns the executor's boot timer queue.
    pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
        &self.inner().boot_timers
    }
757
758    /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
759    /// that are ready to run.  If tasks remain ready and no other threads are running, a thread
760    /// will be woken.  This can end up being a performance win in the case that the queue can be
761    /// cleared without needing to wake any other thread.
762    ///
763    /// # Panics
764    ///
765    /// If called on a single-threaded executor or if this thread is a thread managed by the
766    /// executor.
767    pub fn poll_tasks(&self, callback: impl FnOnce()) {
768        EXECUTOR.with(|e| {
769            assert!(
770                e.borrow_mut().replace(self.root_scope.clone()).is_none(),
771                "This thread is already associated with an executor"
772            );
773        });
774
775        // `poll_tasks` should only ever be used on a multi-threaded executor, it's safe to pass
776        // None.
777        self.inner().poll_tasks(callback, None);
778
779        EXECUTOR.with(|e| *e.borrow_mut() = None);
780    }
781}
782
// AtomicFutureHandle can have a lifetime (for local executors we allow the main task to have a
// non-static lifetime).  The executor doesn't handle this though; the executor just assumes all
// tasks have the 'static lifetime.  It's up to the local executor to extend the lifetime and make
// it safe.
/// Handle to a task as stored by the executor; treated as `'static` (see note above).
pub type TaskHandle = AtomicFutureHandle<'static>;
788
thread_local! {
    // Raw pointer to the task currently being polled on this thread, or null when no
    // task is being polled.  Set and cleared by `TaskHandle::set_current_with`.
    static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
}
792
793impl TaskHandle {
794    pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
795        CURRENT_TASK.with(|cur| {
796            let cur = cur.get();
797            let cur = unsafe { cur.as_ref() };
798            f(cur)
799        })
800    }
801
802    fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
803        CURRENT_TASK.with(|cur| {
804            cur.set(task);
805            let result = f();
806            cur.set(std::ptr::null());
807            result
808        })
809    }
810}
811
#[cfg(test)]
mod tests {
    use super::{ACTIVE_EXECUTORS, EHandle};
    use crate::{LocalExecutorBuilder, SendExecutorBuilder};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Running an executor to completion on another thread must leave the global
    // ACTIVE_EXECUTORS count at zero (no leaked executor registrations).
    #[test]
    fn test_no_leaks() {
        std::thread::spawn(|| SendExecutorBuilder::new().num_threads(1).build().run(async {}))
            .join()
            .unwrap();

        assert_eq!(ACTIVE_EXECUTORS.load(Ordering::Relaxed), 0);
    }

    // A foreign (non-executor) thread calling `EHandle::poll_tasks` should run ready
    // tasks itself when the executor's own threads are all busy.
    #[test]
    fn poll_tasks() {
        SendExecutorBuilder::new().num_threads(1).build().run(async {
            let ehandle = EHandle::local();

            // This will tie up the executor's only running thread which ensures that the task
            // we spawn below can only run on the foreign thread.
            std::thread::spawn(move || {
                let ran = Arc::new(AtomicU64::new(0));
                ehandle.poll_tasks(|| {
                    let ran = ran.clone();
                    ehandle.spawn_detached(async move {
                        ran.fetch_add(1, Ordering::Relaxed);
                    });
                });

                // The spawned task should have run in this thread.
                assert_eq!(ran.load(Ordering::Relaxed), 1);
            })
            .join()
            .unwrap();
        });
    }

    // Spawning a non-Send task from a thread other than the LocalExecutor's own
    // thread is a programming error and must panic.
    #[test]
    #[should_panic]
    fn spawn_local_from_different_thread() {
        let _executor = LocalExecutorBuilder::new().build();
        let ehandle = EHandle::local();
        let _ = std::thread::spawn(move || {
            ehandle.spawn_local_detached(async {});
        })
        .join();
    }
}