Skip to main content

fuchsia_async/runtime/fuchsia/executor/
common.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::timer::Timers;
6use super::atomic_future::hooks::HooksMap;
7use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
8use super::packets::{
9    PacketReceiver, PacketReceiverMap, RawReceiverRegistration, ReceiverRegistration,
10};
11use super::scope::ScopeHandle;
12use super::time::{BootInstant, MonotonicInstant};
13use crate::runtime::instrument::TaskInstrument;
14use crossbeam::queue::SegQueue;
15use fuchsia_sync::Mutex;
16use zx::BootDuration;
17
18use std::any::Any;
19use std::cell::{Cell, RefCell};
20use std::fmt;
21use std::future::Future;
22use std::pin::Pin;
23use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, Ordering};
24use std::sync::{Arc, OnceLock};
25use std::task::Context;
26use std::thread::ThreadId;
27
// Port-packet key used by `notify_id` to wake sleeping worker threads when
// tasks become ready; `worker_lifecycle` recognizes packets with this key as
// wake-up notifications rather than receiver packets.
pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;
29
thread_local!(
    // The root scope of the executor associated with this thread, if any.
    // Set by `Executor::set_local`, cleared by `EHandle::rm_local`, and read
    // by `EHandle::local`.
    static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
);
33
/// The time source used by an executor: either the real system clocks or
/// fake readings that tests can control.
pub enum ExecutorTime {
    /// Readings come from the real zircon monotonic and boot clocks.
    RealTime,
    /// Fake readings used in tests.
    FakeTime {
        /// The fake monotonic clock reading, in nanoseconds.
        mono_reading_ns: AtomicI64,
        /// An offset to add to `mono_reading_ns` to get the reading of the
        /// boot clock, disregarding the difference in timelines.
        ///
        /// We disregard the fact that the reading and offset can not be
        /// read atomically; this is usually not relevant in tests.
        mono_to_boot_offset_ns: AtomicI64,
    },
}
48
/// Result of a call to `Executor::poll_ready_tasks`.
enum PollReadyTasksResult {
    /// `ready_tasks` was drained; the calling worker may go to sleep.
    NoneReady,
    /// Tasks remain ready but all threads are awake; the caller should
    /// return to its main loop rather than keep polling (avoids starvation).
    MoreReady,
    /// The main task finished; the worker should shut down.
    MainTaskCompleted,
}
54
///  24           16           8            0
///  +------------+------------+------------+
///  |  foreign   |  notified  |  sleeping  |
///  +------------+------------+------------+
///
///  sleeping : the number of threads sleeping
///  notified : the number of notifications posted to wake sleeping threads
///  foreign  : the number of foreign threads processing tasks
#[derive(Clone, Copy, Eq, PartialEq)]
struct ThreadsState(u32);

impl ThreadsState {
    /// Bit offset of the `notified` byte.
    const NOTIFIED_SHIFT: u32 = 8;
    /// Bit offset of the `foreign` byte.
    const FOREIGN_SHIFT: u32 = 16;
    /// Mask covering a single field.
    const BYTE: u32 = 0xff;

    /// The number of threads currently sleeping.
    const fn sleeping(&self) -> u8 {
        (self.0 & Self::BYTE) as u8
    }

    /// The number of wake-up notifications posted for sleeping threads.
    const fn notified(&self) -> u8 {
        ((self.0 >> Self::NOTIFIED_SHIFT) & Self::BYTE) as u8
    }

    /// Returns a copy with the sleeping count replaced by `sleeping`.
    const fn with_sleeping(self, sleeping: u8) -> Self {
        Self((self.0 & !Self::BYTE) | sleeping as u32)
    }

    /// Returns a copy with the notified count replaced by `notified`.
    const fn with_notified(self, notified: u8) -> Self {
        let cleared = self.0 & !(Self::BYTE << Self::NOTIFIED_SHIFT);
        Self(cleared | ((notified as u32) << Self::NOTIFIED_SHIFT))
    }

    /// Returns a copy with the foreign-thread count replaced by `foreign`.
    const fn with_foreign(self, foreign: u8) -> Self {
        let cleared = self.0 & !(Self::BYTE << Self::FOREIGN_SHIFT);
        Self(cleared | ((foreign as u32) << Self::FOREIGN_SHIFT))
    }
}
87
// Number of live `Executor`s: incremented in `new_with_port`, decremented in
// `Drop`.  Lets tests assert that executors are actually torn down.
#[cfg(test)]
static ACTIVE_EXECUTORS: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
90
/// Shared state of an executor: the port, timers, the ready-task queue, and
/// the bookkeeping used to coordinate worker threads.
pub(crate) struct Executor {
    // The port that worker threads wait on; wake-up notifications and
    // receiver packets are queued here.
    pub(super) port: zx::Port,
    // Timers driven by the monotonic clock.
    monotonic_timers: Arc<Timers<MonotonicInstant>>,
    // Timers driven by the boot clock.
    boot_timers: Arc<Timers<BootInstant>>,
    // Set by `mark_done` when the executor is shutting down.
    pub(super) done: AtomicBool,
    // True for single-threaded (local) executors.
    is_local: bool,
    pub(crate) receivers: PacketReceiverMap,

    // Queue of tasks that are ready to be polled.
    pub(super) ready_tasks: SegQueue<TaskHandle>,
    // Real or fake time source.
    time: ExecutorTime,
    // Packed `ThreadsState`: the low byte is the number of threads currently
    // sleeping, the next byte is the number of wake-up notifications pending,
    // and the byte above that counts foreign threads polling tasks.
    pub(super) threads_state: AtomicU32,
    pub(super) num_threads: u8,
    // Count of tasks polled; bumped in `poll_ready_tasks` and `poll_tasks`.
    pub(super) polled: AtomicU64,
    // Data that belongs to the user that can be accessed via EHandle::local(). See
    // `TestExecutor::poll_until_stalled`.
    pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
    // Optional instrumentation hook for task lifecycle events.
    pub(super) instrument: Option<Arc<dyn TaskInstrument>>,
    pub(super) hooks_map: HooksMap,
    // The id of the first thread to call `set_local`; `worker_lifecycle`
    // asserts that local executors always run on this thread.
    pub(super) first_thread_id: OnceLock<ThreadId>,

    // The time, in nanoseconds, that the executor was last executing a normal priority task.
    last_active: AtomicI64,
}
116
117impl Executor {
    /// Creates a new executor with a freshly created port.
    ///
    /// See `new_with_port` for the meaning of the parameters.
    pub fn new(
        time: ExecutorTime,
        is_local: bool,
        num_threads: u8,
        instrument: Option<Arc<dyn TaskInstrument>>,
    ) -> Self {
        Self::new_with_port(time, is_local, num_threads, zx::Port::create(), instrument)
    }
126
127    pub fn new_with_port(
128        time: ExecutorTime,
129        is_local: bool,
130        num_threads: u8,
131        port: zx::Port,
132        instrument: Option<Arc<dyn TaskInstrument>>,
133    ) -> Self {
134        #[cfg(test)]
135        ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);
136
137        // Is this a fake-time executor?
138        let is_fake = matches!(
139            time,
140            ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
141        );
142
143        Executor {
144            port,
145            monotonic_timers: Arc::new(Timers::<MonotonicInstant>::new(is_fake)),
146            boot_timers: Arc::new(Timers::<BootInstant>::new(is_fake)),
147            done: AtomicBool::new(false),
148            is_local,
149            receivers: PacketReceiverMap::new(),
150            ready_tasks: SegQueue::new(),
151            time,
152            threads_state: AtomicU32::new(0),
153            num_threads,
154            polled: AtomicU64::new(0),
155            owner_data: Mutex::new(None),
156            instrument,
157            hooks_map: HooksMap::default(),
158            first_thread_id: OnceLock::new(),
159            last_active: Default::default(),
160        }
161    }
162
163    pub fn set_local(root_scope: ScopeHandle) {
164        root_scope.executor().first_thread_id.get_or_init(|| std::thread::current().id());
165        EXECUTOR.with(|e| {
166            let mut e = e.borrow_mut();
167            assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
168            *e = Some(root_scope);
169        });
170    }
171
    /// Polls tasks from `ready_tasks` until none remain, the main task
    /// completes, or it's time to give other threads a chance to run.
    ///
    /// Polls in batches of 16; between batches, if tasks remain and a thread
    /// is sleeping without a pending notification, one notification is posted
    /// so the backlog is shared.
    fn poll_ready_tasks(&self, main_task: Option<&TaskHandle>) -> PollReadyTasksResult {
        loop {
            for _ in 0..16 {
                let Some(task) = self.ready_tasks.pop() else {
                    return PollReadyTasksResult::NoneReady;
                };
                // Compare before polling: `try_poll` consumes the handle.
                let is_main = Some(&task) == main_task;
                let complete = self.try_poll(task);
                if complete && is_main {
                    return PollReadyTasksResult::MainTaskCompleted;
                }
                self.polled.fetch_add(1, Ordering::Relaxed);
            }
            // We didn't finish all the ready tasks. If there are sleeping threads, post a
            // notification to wake one up.
            let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
            loop {
                if threads_state.sleeping() == 0 {
                    // All threads are awake now. Prevent starvation.
                    return PollReadyTasksResult::MoreReady;
                }
                if threads_state.notified() >= threads_state.sleeping() {
                    // All sleeping threads have been notified. Keep going and poll more tasks.
                    break;
                }
                match self.try_notify(threads_state) {
                    Ok(()) => break,
                    // Raced with another update; retry with the fresh state.
                    Err(s) => threads_state = s,
                }
            }
        }
    }
204
    /// Returns true if this is a single-threaded (local) executor.
    pub fn is_local(&self) -> bool {
        self.is_local
    }
208
    /// Posts a `TASK_READY_WAKEUP_ID` packet if all worker threads are
    /// sleeping with no notification already pending.
    pub fn notify_task_ready(&self) {
        // Only post if there's no thread running (or soon to be running). If we happen to be
        // running on a thread for this executor, then threads_state won't be equal to num_threads,
        // which means notifications only get fired if this is from a non-async thread, or a thread
        // that belongs to a different executor. We use SeqCst ordering here to make sure this load
        // happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));

        // We only want to notify if there are no pending notifications and there are no other
        // threads running.
        while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
            match self.try_notify(threads_state) {
                Ok(()) => break,
                // The state changed (or the CAS failed spuriously); re-check
                // against the freshly observed state.
                Err(s) => threads_state = s,
            }
        }
    }
226
    /// Tries to notify a thread to wake up. Returns threads_state if it fails.
    ///
    /// Uses a weak compare-exchange, which may fail spuriously; callers loop,
    /// feeding the returned (freshly observed) state back in.
    fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
        self.threads_state
            .compare_exchange_weak(
                old_threads_state.0,
                // Adding the packed one-notification value increments the
                // notified byte.
                old_threads_state.0 + ThreadsState(0).with_notified(1).0,
                Ordering::Relaxed,
                Ordering::Relaxed,
            )
            .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
            .map_err(ThreadsState)
    }
239
    /// Ensures one sleeping thread will be woken, if any thread is sleeping.
    ///
    /// Gives up early if a notification is already pending or if the observed
    /// number of sleeping threads drops below what was first seen (i.e. a
    /// thread woke up by other means).
    pub fn wake_one_thread(&self) {
        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
        let current_sleeping = threads_state.sleeping();
        if current_sleeping == 0 {
            return;
        }
        while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
            match self.try_notify(threads_state) {
                Ok(()) => break,
                Err(s) => threads_state = s,
            }
        }
    }
253
    /// Queues a user packet with key `id` on the executor's port.
    ///
    /// Used with `TASK_READY_WAKEUP_ID` to wake sleeping worker threads; a
    /// failure to queue is logged and otherwise ignored.
    pub fn notify_id(&self, id: u64) {
        let up = zx::UserPacket::from_u8_array([0; 32]);
        let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
        if let Err(e) = self.port.queue(&packet) {
            // TODO: logging
            eprintln!("Failed to queue notify in port: {e:?}");
        }
    }
262
263    /// Returns the current reading of the monotonic clock.
264    ///
265    /// For test executors running in fake time, returns the reading of the
266    /// fake monotonic clock.
267    pub fn now(&self) -> MonotonicInstant {
268        match &self.time {
269            ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
270            ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
271                MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
272            }
273        }
274    }
275
276    /// Returns the current reading of the boot clock.
277    ///
278    /// For test executors running in fake time, returns the reading of the
279    /// fake boot clock.
280    pub fn boot_now(&self) -> BootInstant {
281        match &self.time {
282            ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),
283
284            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
285                // The two atomic values are loaded one after the other. This should
286                // not normally be an issue in tests.
287                let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
288                let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
289                BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
290            }
291        }
292    }
293
    /// Sets the reading of the fake monotonic clock.
    ///
    /// Notifies both the monotonic and boot timers of the new readings.
    ///
    /// # Panics
    ///
    /// If called on an executor that runs in real time.
    pub fn set_fake_time(&self, new: MonotonicInstant) {
        let boot_offset_ns = match &self.time {
            ExecutorTime::RealTime => {
                panic!("Error: called `set_fake_time` on an executor using actual time.")
            }
            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
                // Store the new reading before notifying timers below.
                t.store(new.into_nanos(), Ordering::Relaxed);
                mono_to_boot_offset_ns.load(Ordering::Relaxed)
            }
        };
        self.monotonic_timers.maybe_notify(new);

        // Changing fake time also affects boot time.  Notify boot clocks as well.
        let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
        self.boot_timers.maybe_notify(new_boot_time);
    }
315
    /// Sets a new offset between boot and monotonic time.
    ///
    /// Only works for executors operating in fake time.
    /// The change in the fake offset will wake expired boot timers.
    ///
    /// # Panics
    ///
    /// If called on an executor that runs in real time.
    pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
        let mono_now_ns = match &self.time {
            ExecutorTime::RealTime => {
                panic!(
                    "Error: called `set_fake_boot_to_mono_offset` on an executor using actual time."
                )
            }
            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
                // We ignore the non-atomic update between b and t, it is likely
                // not relevant in tests.
                b.store(offset.into_nanos(), Ordering::Relaxed);
                t.load(Ordering::Relaxed)
            }
        };
        let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
        self.boot_timers.maybe_notify(new_boot_now);
    }
337
    /// Returns `true` if this executor is running in real time.  Returns
    /// `false` if this executor is running in fake time.
    pub fn is_real_time(&self) -> bool {
        matches!(self.time, ExecutorTime::RealTime)
    }
343
    /// Must be called before `on_parent_drop`.
    ///
    /// Done flag must be set before dropping packet receivers
    /// so that future receivers that attempt to deregister themselves
    /// know that it's okay if their entries are already missing.
    pub fn mark_done(&self) {
        self.done.store(true, Ordering::SeqCst);

        // Make sure there's at least one notification outstanding per thread to wake up all
        // workers. This might be more notifications than required, but this way we don't have to
        // worry about races where tasks are just about to sleep; when a task receives the
        // notification, it will check done and terminate.
        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
        let num_threads = self.num_threads;
        loop {
            let notified = threads_state.notified();
            if notified >= num_threads {
                break;
            }
            match self.threads_state.compare_exchange_weak(
                threads_state.0,
                threads_state.with_notified(num_threads).0,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // Post one wake-up packet for each notification added.
                    for _ in notified..num_threads {
                        self.notify_id(TASK_READY_WAKEUP_ID);
                    }
                    return;
                }
                // Raced with a concurrent update (or spurious CAS failure);
                // retry against the freshly observed state.
                Err(old) => threads_state = ThreadsState(old),
            }
        }
    }
379
    /// Notes about the lifecycle of an Executor.
    ///
    /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
    /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
    /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
    /// the port is dropped alongside the Executor as it should.
    /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
    ///
    /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
    /// "current" executor being set, and that's unset when the executor is dropped.
    ///
    /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
    /// and point (b) is related to "what happens when I try to create a new receiver when there
    /// is no executor".
    ///
    /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
    /// storage [1]. And the reactor discourages usage of strong references to it by vending weak
    /// references to it [2] instead of strong.
    ///
    /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
    /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
    /// upgrading pointers. For (b) the difference mostly stand for "when is it safe to use IO
    /// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
    /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
    /// in that thread.
    ///
    /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
    /// unregistered themselves when the executor drops. The choice to panic was made based on
    /// patterns in fuchsia-async that may come to change:
    ///
    /// - Executor vends strong references to itself and those references are *stored* by most
    ///   receiver implementations (as opposed to reached out on TLS every time).
    /// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
    ///   easy to understand error to return when polling on an extinct executor.
    /// - All receivers are implemented in this crate and well-known.
    ///
    /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
    /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
    /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
    /// opaque non-clone-copy-or-send guard would be stronger than this. See:
    /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
    pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
        // Drop all tasks.
        // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
        // as part of waking, there's an upgrade to a strong reference, so for a small amount of
        // time `fasync::unblock` can hold a strong reference to a task which in turn holds the
        // future for the task which in turn could hold references to receivers, which, if we did
        // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
        // task futures here.
        root_scope.drop_all_tasks();

        // Drop all of the uncompleted tasks
        while self.ready_tasks.pop().is_some() {}

        // Deregister the timer receivers so that we can perform the check below.
        self.monotonic_timers.deregister();
        self.boot_timers.deregister();

        // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
        // happen. See discussion above.
        //
        // If you're here because you hit this panic check your code for:
        //
        // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
        // position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
        //
        // - A function scope that contains a fuchsia_async::Executor NOT in the first position
        // (first position in function scope gets dropped last:
        // https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
        //
        // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
        // contains a temporary (temporaries are dropped after the function scope:
        // https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
        // looks like a `match` statement at the end of the function without a semicolon.
        //
        // - Storing channel and FIDL objects in static variables.
        //
        // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
        assert!(self.receivers.is_empty(), "receivers must not outlive their executor");

        // Remove the thread-local executor set in `new`.
        EHandle::rm_local();
    }
463
    // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
    // the debugger needs to be updated.
    // LINT.IfChange
    /// Runs one worker thread's loop: polls ready tasks, sleeps on the port
    /// when idle, and dispatches received packets — until the executor is
    /// marked done, the main task completes, or (with `UNTIL_STALLED`) no
    /// further progress can be made without waiting.
    pub fn worker_lifecycle<const UNTIL_STALLED: bool>(
        self: &Arc<Executor>,
        // The main task should be specified for a single-threaded executor, but it isn't necessary
        // for a multi-threaded executor since it has an independent mechanism for tracking when
        // the main task terminates.
        // TODO(https://fxbug.dev/481030722) remove this parameter
        main_task: Option<&TaskHandle>,
    ) {
        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)

        // A local executor must only ever run on the thread that created it.
        assert!(
            !self.is_local() || self.first_thread_id.get() == Some(&std::thread::current().id())
        );

        self.monotonic_timers.register(self);
        self.boot_timers.register(self);

        loop {
            // Keep track of whether we are considered asleep.
            let mut sleeping = false;

            match self.poll_ready_tasks(main_task) {
                PollReadyTasksResult::NoneReady => {
                    // No more tasks, indicate we are sleeping. We use SeqCst ordering because we
                    // want this change here to happen *before* we check ready_tasks below. This
                    // synchronizes with notify_task_ready which is called *after* a task is added
                    // to ready_tasks.
                    const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
                    self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
                    // Check ready tasks again. If a task got posted, wake up. This has to be done
                    // because a notification won't get sent if there is at least one active thread
                    // so there's a window between the preceding two lines where a task could be
                    // made ready and a notification is not sent because it looks like there is at
                    // least one thread running.
                    if self.ready_tasks.is_empty() {
                        sleeping = true;
                    } else {
                        // We lost a race, we're no longer sleeping.
                        self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
                    }
                }
                PollReadyTasksResult::MoreReady => {}
                PollReadyTasksResult::MainTaskCompleted => return,
            }

            // Check done here after updating threads_state to avoid shutdown races.
            if self.done.load(Ordering::SeqCst) {
                return;
            }

            // What this iteration's port wait produced.
            enum Work {
                None,
                Packet(zx::Packet),
                Stalled,
            }

            let mut notified = false;
            let work = {
                // If we're considered awake choose INFINITE_PAST which will make the wait call
                // return immediately.  Otherwise, wait until a packet arrives.
                let deadline = if !sleeping || UNTIL_STALLED {
                    zx::Instant::INFINITE_PAST
                } else {
                    zx::Instant::INFINITE
                };

                match self.port.wait(deadline) {
                    Ok(packet) => {
                        if packet.key() == TASK_READY_WAKEUP_ID {
                            // Consumed a wake-up notification; accounted for
                            // below when adjusting threads_state.
                            notified = true;
                            Work::None
                        } else {
                            Work::Packet(packet)
                        }
                    }
                    Err(zx::Status::TIMED_OUT) => {
                        if !UNTIL_STALLED || !sleeping {
                            Work::None
                        } else {
                            Work::Stalled
                        }
                    }
                    Err(status) => {
                        panic!("Error calling port wait: {status:?}");
                    }
                }
            };

            // We are no longer sleeping, and any notification we received has
            // been consumed; subtract both from threads_state in one step.
            let threads_state_sub =
                ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
            if threads_state_sub.0 > 0 {
                self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
            }

            match work {
                Work::Packet(packet) => self.receivers.receive_packet(packet.key(), packet),
                Work::None => {}
                Work::Stalled => return,
            }
        }
    }
568
    /// Drops the main task.
    ///
    /// # Safety
    ///
    /// The caller must guarantee that the executor isn't running.
    pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle, task: &TaskHandle) {
        // SAFETY: forwarded guarantee — the caller promises the executor is
        // not running, per this function's contract.
        unsafe { root_scope.drop_task_unchecked(task) };
    }
577
    /// Polls a single task, returning `true` if the task finished (completed
    /// or was aborted).  A task that yields is pushed back onto
    /// `ready_tasks` to be polled again.
    fn try_poll(&self, task: TaskHandle) -> bool {
        let task_waker = task.waker();
        let poll_result = TaskHandle::set_current_with(&task, || {
            task.try_poll(&mut Context::from_waker(&task_waker))
        });
        // Record when a normal-priority task last ran; see `last_active`.
        if !task.is_low_priority() {
            self.last_active.store(self.now().into_nanos(), Ordering::Relaxed);
        }
        match poll_result {
            AttemptPollResult::Yield => {
                self.ready_tasks.push(task);
                false
            }
            AttemptPollResult::IFinished | AttemptPollResult::Aborted => {
                task.scope().task_did_finish(&task);
                true
            }
            _ => false,
        }
    }
598
    /// Returns the monotonic timers.
    pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
        &self.monotonic_timers
    }
603
    /// Returns the boot timers.
    pub fn boot_timers(&self) -> &Timers<BootInstant> {
        &self.boot_timers
    }
608
    /// Runs `callback` with this thread counted as a foreign thread, then
    /// polls up to 16 ready tasks.  Backs `EHandle::poll_tasks`; must not be
    /// used on a local (single-threaded) executor.
    fn poll_tasks(&self, callback: impl FnOnce(), main_task: Option<&TaskHandle>) {
        assert!(!self.is_local);

        // Increment the count of foreign threads.
        const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
        self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);

        callback();

        // Poll up to 16 tasks.
        for _ in 0..16 {
            let Some(task) = self.ready_tasks.pop() else {
                break;
            };
            // Compare before polling: `try_poll` consumes the handle.
            let is_main = Some(&task) == main_task;
            if self.try_poll(task) && is_main {
                break;
            }
            self.polled.fetch_add(1, Ordering::Relaxed);
        }

        // Drop the foreign count and capture the resulting state in one step.
        let mut threads_state = ThreadsState(
            self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
        );

        if !self.ready_tasks.is_empty() {
            // There are tasks still ready to run, so wake up a thread if all the other threads are
            // sleeping.
            while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
                match self.try_notify(threads_state) {
                    Ok(()) => break,
                    Err(s) => threads_state = s,
                }
            }
        }
    }
645
    /// Queues `task` to be polled and wakes a worker thread if needed.
    pub fn task_is_ready(&self, task: TaskHandle) {
        self.ready_tasks.push(task);
        self.notify_task_ready();
    }
650
    /// Returns the time at which the executor last polled a normal-priority
    /// task (recorded in `try_poll`).
    pub(crate) fn last_active(&self) -> MonotonicInstant {
        MonotonicInstant::from_nanos(self.last_active.load(Ordering::Relaxed))
    }
654}
655
#[cfg(test)]
impl Drop for Executor {
    fn drop(&mut self) {
        // Pairs with the increment in `new_with_port`.
        ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
    }
}
662
/// A handle to an executor.
///
/// Wraps the executor's root scope; the underlying `Executor` is reached
/// through it (see `inner`).
#[derive(Clone)]
pub struct EHandle {
    // LINT.IfChange
    pub(super) root_scope: ScopeHandle,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
}
670
impl fmt::Debug for EHandle {
    // Debug output identifies the executor by its port.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("EHandle").field("port", &self.inner().port).finish()
    }
}
676
677impl EHandle {
678    /// Returns the thread-local executor.
679    ///
680    /// # Panics
681    ///
682    /// If called outside the context of an active async executor.
683    pub fn local() -> Self {
684        let root_scope = EXECUTOR
685            .with(|e| e.borrow().as_ref().map(|x| x.clone()))
686            .expect("Fuchsia Executor must be created first");
687
688        EHandle { root_scope }
689    }
690
    /// Set the fake time to a given value.
    ///
    /// Delegates to `Executor::set_fake_time`, which also notifies timers of
    /// the new reading.
    ///
    /// # Panics
    ///
    /// If the executor was not created with fake time.
    pub fn set_fake_time(&self, t: MonotonicInstant) {
        self.inner().set_fake_time(t)
    }
699
    /// Clears the thread-local executor set by `Executor::set_local`.
    pub(super) fn rm_local() {
        EXECUTOR.with(|e| *e.borrow_mut() = None);
    }
703
    /// The root scope of the executor.
    ///
    /// This can be used to spawn tasks that live as long as the executor, and
    /// to create shorter-lived child scopes.
    ///
    /// Most users should create an owned scope with
    /// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
    pub fn global_scope(&self) -> &ScopeHandle {
        &self.root_scope
    }
714
    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
    pub fn port(&self) -> &zx::Port {
        &self.inner().port
    }
719
    /// Registers a `PacketReceiver` with the executor and returns a registration.
    /// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
    pub fn register_receiver<T: PacketReceiver>(&self, receiver: T) -> ReceiverRegistration<T> {
        self.inner().receivers.register(self.inner().clone(), receiver)
    }
725
    /// Registers a pinned `RawPacketReceiver` with the executor.
    ///
    /// The registration will be deregistered when dropped.
    ///
    /// NOTE: Unlike with `register_receiver`, `receive_packet` will be called whilst a lock is
    /// held, so it is not safe to register or unregister receivers at that time.
    pub fn register_pinned<T: PacketReceiver>(
        &self,
        raw_registration: Pin<&mut RawReceiverRegistration<T>>,
    ) {
        self.inner().receivers.register_pinned(self.clone(), raw_registration);
    }
738
    /// Returns the `Executor` backing this handle (via the root scope).
    #[inline(always)]
    pub(crate) fn inner(&self) -> &Arc<Executor> {
        self.root_scope.executor()
    }
743
    /// Spawn a new task to be run on this executor.
    ///
    /// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
    /// may be run on either a single-threaded or multi-threaded executor.
    pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
        self.global_scope().spawn(future);
    }
751
752    /// Spawn a new task to be run on this executor.
753    ///
754    /// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
755    /// have to be threads-safe (implement the `Send` trait). In return, this method requires that
756    /// this executor is a LocalExecutor.
757    pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
758        self.global_scope().spawn_local(future);
759    }
760
    /// Returns the executor's monotonic-clock timer queue.
    pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
        &self.inner().monotonic_timers
    }
764
    /// Returns the executor's boot-clock timer queue.
    pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
        &self.inner().boot_timers
    }
768
769    /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
770    /// that are ready to run.  If tasks remain ready and no other threads are running, a thread
771    /// will be woken.  This can end up being a performance win in the case that the queue can be
772    /// cleared without needing to wake any other thread.
773    ///
774    /// # Panics
775    ///
776    /// If called on a single-threaded executor or if this thread is a thread managed by the
777    /// executor.
778    pub fn poll_tasks(&self, callback: impl FnOnce()) {
779        EXECUTOR.with(|e| {
780            assert!(
781                e.borrow_mut().replace(self.root_scope.clone()).is_none(),
782                "This thread is already associated with an executor"
783            );
784        });
785
786        // `poll_tasks` should only ever be used on a multi-threaded executor, it's safe to pass
787        // None.
788        self.inner().poll_tasks(callback, None);
789
790        EXECUTOR.with(|e| *e.borrow_mut() = None);
791    }
792}
793
/// The task-handle type used throughout the executor.
///
/// `AtomicFutureHandle` can have a lifetime (for local executors we allow the
/// main task to have a non-static lifetime).  The executor doesn't handle this
/// though; the executor just assumes all tasks have the 'static lifetime.
/// It's up to the local executor to extend the lifetime and make it safe.
pub type TaskHandle = AtomicFutureHandle<'static>;
799
thread_local! {
    // Pointer to the task currently being polled on this thread; null when no
    // task is current.  Within this file it is accessed only via
    // `TaskHandle::with_current` and `TaskHandle::set_current_with` below.
    static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
}
803
804impl TaskHandle {
805    pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
806        CURRENT_TASK.with(|cur| {
807            let cur = cur.get();
808            let cur = unsafe { cur.as_ref() };
809            f(cur)
810        })
811    }
812
813    fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
814        CURRENT_TASK.with(|cur| {
815            cur.set(task);
816            let result = f();
817            cur.set(std::ptr::null());
818            result
819        })
820    }
821}
822
#[cfg(test)]
mod tests {
    use super::{ACTIVE_EXECUTORS, EHandle};
    use crate::{LocalExecutorBuilder, SendExecutorBuilder};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    // Running an executor to completion on another thread must leave the
    // global active-executor count at zero, i.e. no executors leak.
    #[test]
    fn test_no_leaks() {
        std::thread::spawn(|| SendExecutorBuilder::new().num_threads(1).build().run(async {}))
            .join()
            .unwrap();

        assert_eq!(ACTIVE_EXECUTORS.load(Ordering::Relaxed), 0);
    }

    // `EHandle::poll_tasks` lets a foreign (non-executor) thread run ready
    // tasks itself.
    #[test]
    fn poll_tasks() {
        SendExecutorBuilder::new().num_threads(1).build().run(async {
            let ehandle = EHandle::local();

            // This will tie up the executor's only running thread which ensures that the task
            // we spawn below can only run on the foreign thread.
            std::thread::spawn(move || {
                let ran = Arc::new(AtomicU64::new(0));
                ehandle.poll_tasks(|| {
                    let ran = ran.clone();
                    ehandle.spawn_detached(async move {
                        ran.fetch_add(1, Ordering::Relaxed);
                    });
                });

                // The spawned task should have run in this thread.
                assert_eq!(ran.load(Ordering::Relaxed), 1);
            })
            .join()
            .unwrap();
        });
    }

    // `spawn_local_detached` must panic when called from a thread other than
    // the local executor's own thread.
    #[test]
    #[should_panic]
    fn spawn_local_from_different_thread() {
        let _executor = LocalExecutorBuilder::new().build();
        let ehandle = EHandle::local();
        let _ = std::thread::spawn(move || {
            ehandle.spawn_local_detached(async {});
        })
        .join();
    }
}