fuchsia_async/runtime/fuchsia/executor/common.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use super::super::timer::Timers;
use super::atomic_future::hooks::HooksMap;
use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
use super::packets::{
    PacketReceiver, PacketReceiverMap, RawReceiverRegistration, ReceiverRegistration,
};
use super::scope::ScopeHandle;
use super::time::{BootInstant, MonotonicInstant};
use crate::runtime::instrument::TaskInstrument;
use crossbeam::queue::SegQueue;
use fuchsia_sync::Mutex;
use zx::BootDuration;

use std::any::Any;
use std::cell::{Cell, RefCell};
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::task::Context;

pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;

/// The id of the main task, which is a virtual task that lives from construction
/// to destruction of the executor. The main task may correspond to multiple
/// main futures, in cases where the executor runs multiple times during its lifetime.
pub(crate) const MAIN_TASK_ID: usize = 0;

thread_local!(
    static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
);

pub enum ExecutorTime {
    /// Real time, read from the system's monotonic and boot clocks.
    RealTime,
    /// Fake readings used in tests.
    FakeTime {
        // The fake monotonic clock reading.
        mono_reading_ns: AtomicI64,
        // An offset to add to mono_reading_ns to get the reading of the boot
        // clock, disregarding the difference in timelines.
        //
        // We disregard the fact that the reading and offset cannot be read
        // atomically; this is usually not relevant in tests.
        mono_to_boot_offset_ns: AtomicI64,
    },
}

enum PollReadyTasksResult {
    NoneReady,
    MoreReady,
    MainTaskCompleted,
}

59///  24           16           8            0
60///  +------------+------------+------------+
61///  |  foreign   |  notified  |  sleeping  |
62///  +------------+------------+------------+
63///
64///  sleeping : the number of threads sleeping
65///  notified : the number of notifications posted to wake sleeping threads
66///  foreign  : the number of foreign threads processing tasks
67#[derive(Clone, Copy, Eq, PartialEq)]
68struct ThreadsState(u32);
69
70impl ThreadsState {
71    const fn sleeping(&self) -> u8 {
72        self.0 as u8
73    }
74
75    const fn notified(&self) -> u8 {
76        (self.0 >> 8) as u8
77    }
78
79    const fn with_sleeping(self, sleeping: u8) -> Self {
80        Self((self.0 & !0xff) | sleeping as u32)
81    }
82
83    const fn with_notified(self, notified: u8) -> Self {
        Self((self.0 & !0xff00) | ((notified as u32) << 8))
85    }
86
87    const fn with_foreign(self, foreign: u8) -> Self {
        Self((self.0 & !0xff0000) | ((foreign as u32) << 16))
89    }
90}
91
92#[cfg(test)]
93static ACTIVE_EXECUTORS: AtomicUsize = AtomicUsize::new(0);
94
95pub(crate) struct Executor {
96    pub(super) port: zx::Port,
97    monotonic_timers: Arc<Timers<MonotonicInstant>>,
98    boot_timers: Arc<Timers<BootInstant>>,
99    pub(super) done: AtomicBool,
100    is_local: bool,
101    pub(crate) receivers: PacketReceiverMap,
102    task_count: AtomicUsize,
103    pub(super) ready_tasks: SegQueue<TaskHandle>,
104    time: ExecutorTime,
    // The low byte is the number of threads currently sleeping, the next byte is the number of
    // wake-up notifications pending, and the byte above that is the number of foreign threads
    // currently processing tasks (see `ThreadsState`).
107    pub(super) threads_state: AtomicU32,
108    pub(super) num_threads: u8,
109    pub(super) polled: AtomicU64,
    // User-owned data that can be accessed via `EHandle::local()`. See
    // `TestExecutor::poll_until_stalled`.
112    pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
113    pub(super) instrument: Option<Arc<dyn TaskInstrument>>,
114    pub(super) hooks_map: HooksMap,
115}
116
117impl Executor {
118    pub fn new(
119        time: ExecutorTime,
120        is_local: bool,
121        num_threads: u8,
122        instrument: Option<Arc<dyn TaskInstrument>>,
123    ) -> Self {
124        Self::new_with_port(time, is_local, num_threads, zx::Port::create(), instrument)
125    }
126
127    pub fn new_with_port(
128        time: ExecutorTime,
129        is_local: bool,
130        num_threads: u8,
131        port: zx::Port,
132        instrument: Option<Arc<dyn TaskInstrument>>,
133    ) -> Self {
134        #[cfg(test)]
135        ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);
136
137        // Is this a fake-time executor?
138        let is_fake = matches!(
139            time,
140            ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
141        );
142
143        Executor {
144            port,
145            monotonic_timers: Arc::new(Timers::<MonotonicInstant>::new(is_fake)),
146            boot_timers: Arc::new(Timers::<BootInstant>::new(is_fake)),
147            done: AtomicBool::new(false),
148            is_local,
149            receivers: PacketReceiverMap::new(),
150            task_count: AtomicUsize::new(MAIN_TASK_ID + 1),
151            ready_tasks: SegQueue::new(),
152            time,
153            threads_state: AtomicU32::new(0),
154            num_threads,
155            polled: AtomicU64::new(0),
156            owner_data: Mutex::new(None),
157            instrument,
158            hooks_map: HooksMap::default(),
159        }
160    }
161
162    pub fn set_local(root_scope: ScopeHandle) {
163        EXECUTOR.with(|e| {
164            let mut e = e.borrow_mut();
165            assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
166            *e = Some(root_scope);
167        });
168    }
169
170    fn poll_ready_tasks(&self) -> PollReadyTasksResult {
171        loop {
172            for _ in 0..16 {
173                let Some(task) = self.ready_tasks.pop() else {
174                    return PollReadyTasksResult::NoneReady;
175                };
176                let task_id = task.id();
177                let complete = self.try_poll(task);
178                if complete && task_id == MAIN_TASK_ID {
179                    return PollReadyTasksResult::MainTaskCompleted;
180                }
181                self.polled.fetch_add(1, Ordering::Relaxed);
182            }
183            // We didn't finish all the ready tasks. If there are sleeping threads, post a
184            // notification to wake one up.
185            let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
186            loop {
187                if threads_state.sleeping() == 0 {
188                    // All threads are awake now. Prevent starvation.
189                    return PollReadyTasksResult::MoreReady;
190                }
191                if threads_state.notified() >= threads_state.sleeping() {
192                    // All sleeping threads have been notified. Keep going and poll more tasks.
193                    break;
194                }
195                match self.try_notify(threads_state) {
196                    Ok(()) => break,
197                    Err(s) => threads_state = s,
198                }
199            }
200        }
201    }
202
203    pub fn is_local(&self) -> bool {
204        self.is_local
205    }
206
207    pub fn next_task_id(&self) -> usize {
208        self.task_count.fetch_add(1, Ordering::Relaxed)
209    }
210
211    pub fn notify_task_ready(&self) {
212        // Only post if there's no thread running (or soon to be running). If we happen to be
        // running on a thread for this executor, then the sleeping count won't equal num_threads,
214        // which means notifications only get fired if this is from a non-async thread, or a thread
215        // that belongs to a different executor. We use SeqCst ordering here to make sure this load
216        // happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
217        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));
218
219        // We only want to notify if there are no pending notifications and there are no other
220        // threads running.
221        while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
222            match self.try_notify(threads_state) {
223                Ok(()) => break,
224                Err(s) => threads_state = s,
225            }
226        }
227    }
228
229    /// Tries to notify a thread to wake up. Returns threads_state if it fails.
230    fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
231        self.threads_state
232            .compare_exchange_weak(
233                old_threads_state.0,
234                old_threads_state.0 + ThreadsState(0).with_notified(1).0,
235                Ordering::Relaxed,
236                Ordering::Relaxed,
237            )
238            .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
239            .map_err(ThreadsState)
240    }
241
242    pub fn wake_one_thread(&self) {
243        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
244        let current_sleeping = threads_state.sleeping();
245        if current_sleeping == 0 {
246            return;
247        }
248        while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
249            match self.try_notify(threads_state) {
250                Ok(()) => break,
251                Err(s) => threads_state = s,
252            }
253        }
254    }
255
256    pub fn notify_id(&self, id: u64) {
257        let up = zx::UserPacket::from_u8_array([0; 32]);
258        let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
259        if let Err(e) = self.port.queue(&packet) {
260            // TODO: logging
261            eprintln!("Failed to queue notify in port: {e:?}");
262        }
263    }
264
265    /// Returns the current reading of the monotonic clock.
266    ///
267    /// For test executors running in fake time, returns the reading of the
268    /// fake monotonic clock.
269    pub fn now(&self) -> MonotonicInstant {
270        match &self.time {
271            ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
272            ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
273                MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
274            }
275        }
276    }
277
278    /// Returns the current reading of the boot clock.
279    ///
280    /// For test executors running in fake time, returns the reading of the
281    /// fake boot clock.
282    pub fn boot_now(&self) -> BootInstant {
283        match &self.time {
284            ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),
285
286            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
287                // The two atomic values are loaded one after the other. This should
288                // not normally be an issue in tests.
289                let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
290                let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
291                BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
292            }
293        }
294    }
295
296    /// Sets the reading of the fake monotonic clock.
297    ///
298    /// # Panics
299    ///
300    /// If called on an executor that runs in real time.
301    pub fn set_fake_time(&self, new: MonotonicInstant) {
302        let boot_offset_ns = match &self.time {
303            ExecutorTime::RealTime => {
304                panic!("Error: called `set_fake_time` on an executor using actual time.")
305            }
306            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
307                t.store(new.into_nanos(), Ordering::Relaxed);
308                mono_to_boot_offset_ns.load(Ordering::Relaxed)
309            }
310        };
311        self.monotonic_timers.maybe_notify(new);
312
313        // Changing fake time also affects boot time.  Notify boot clocks as well.
314        let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
315        self.boot_timers.maybe_notify(new_boot_time);
316    }
317
    /// Sets a new offset between boot and monotonic time.
    ///
    /// Only works for executors operating in fake time.
    /// The change in the fake offset will wake expired boot timers.
    ///
    /// # Panics
    ///
    /// If called on an executor that runs in real time.
322    pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
323        let mono_now_ns = match &self.time {
324            ExecutorTime::RealTime => {
325                panic!(
326                    "Error: called `set_fake_boot_to_mono_offset` on an executor using actual time."
327                )
328            }
329            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
                // We ignore the non-atomic update between b and t; it is likely
                // not relevant in tests.
332                b.store(offset.into_nanos(), Ordering::Relaxed);
333                t.load(Ordering::Relaxed)
334            }
335        };
336        let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
337        self.boot_timers.maybe_notify(new_boot_now);
338    }
339
340    /// Returns `true` if this executor is running in real time.  Returns
    /// `false` if this executor is running in fake time.
342    pub fn is_real_time(&self) -> bool {
343        matches!(self.time, ExecutorTime::RealTime)
344    }
345
346    /// Must be called before `on_parent_drop`.
347    ///
348    /// Done flag must be set before dropping packet receivers
349    /// so that future receivers that attempt to deregister themselves
350    /// know that it's okay if their entries are already missing.
351    pub fn mark_done(&self) {
352        self.done.store(true, Ordering::SeqCst);
353
354        // Make sure there's at least one notification outstanding per thread to wake up all
355        // workers. This might be more notifications than required, but this way we don't have to
        // worry about races where threads are just about to sleep; when a thread receives the
357        // notification, it will check done and terminate.
358        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
359        let num_threads = self.num_threads;
360        loop {
361            let notified = threads_state.notified();
362            if notified >= num_threads {
363                break;
364            }
365            match self.threads_state.compare_exchange_weak(
366                threads_state.0,
367                threads_state.with_notified(num_threads).0,
368                Ordering::Relaxed,
369                Ordering::Relaxed,
370            ) {
371                Ok(_) => {
372                    for _ in notified..num_threads {
373                        self.notify_id(TASK_READY_WAKEUP_ID);
374                    }
375                    return;
376                }
377                Err(old) => threads_state = ThreadsState(old),
378            }
379        }
380    }
381
382    /// Notes about the lifecycle of an Executor.
383    ///
384    /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
385    /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
386    /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
387    /// the port is dropped alongside the Executor as it should.
388    /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
389    ///
390    /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
391    /// "current" executor being set, and that's unset when the executor is dropped.
392    ///
393    /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
394    /// and point (b) is related to "what happens when I try to create a new receiver when there
395    /// is no executor".
396    ///
    /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
    /// storage [1], and the reactor discourages strong references to it by vending weak
    /// references [2] instead of strong ones.
    ///
    /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
    /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
    /// upgrading pointers. For (b), the difference mostly comes down to "when is it safe to use
    /// IO objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
    /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
    /// on that thread.
407    ///
408    /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
409    /// unregistered themselves when the executor drops. The choice to panic was made based on
410    /// patterns in fuchsia-async that may come to change:
411    ///
412    /// - Executor vends strong references to itself and those references are *stored* by most
413    ///   receiver implementations (as opposed to reached out on TLS every time).
    /// - Fuchsia-async objects return zx::Status on wait calls; there isn't an appropriate and
    ///   easy-to-understand error to return when polling on an extinct executor.
416    /// - All receivers are implemented in this crate and well-known.
417    ///
418    /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
419    /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
    /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long"; an
    /// opaque non-clone-copy-or-send guard would be stronger than this. See:
422    /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
423    pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
424        // Drop all tasks.
425        // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
426        // as part of waking, there's an upgrade to a strong reference, so for a small amount of
        // time `fasync::unblock` can hold a strong reference to a task, which in turn holds the
        // future for the task, which in turn could hold references to receivers, which, if we did
429        // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
430        // task futures here.
431        root_scope.drop_all_tasks();
432
433        // Drop all of the uncompleted tasks
434        while self.ready_tasks.pop().is_some() {}
435
436        // Deregister the timer receivers so that we can perform the check below.
437        self.monotonic_timers.deregister();
438        self.boot_timers.deregister();
439
440        // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
441        // happen. See discussion above.
442        //
443        // If you're here because you hit this panic check your code for:
444        //
445        // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
446        // position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
447        //
448        // - A function scope that contains a fuchsia_async::Executor NOT in the first position
449        // (first position in function scope gets dropped last:
450        // https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
451        //
452        // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
453        // contains a temporary (temporaries are dropped after the function scope:
454        // https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
455        // looks like a `match` statement at the end of the function without a semicolon.
456        //
457        // - Storing channel and FIDL objects in static variables.
458        //
459        // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
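        //
        // For example, for the first point above (an illustrative sketch; the type names here are
        // made up), prefer declaring the executor as the last field:
        //
        //     struct Good {
        //         server: MyServer,                // dropped first
        //         executor: fasync::LocalExecutor, // dropped last
        //     }
        //
        // rather than the reverse order, because struct fields are dropped in declaration order,
        // so the executor must come last if the other fields register packet receivers.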
460        assert!(self.receivers.is_empty(), "receivers must not outlive their executor");
461
462        // Remove the thread-local executor set in `new`.
463        EHandle::rm_local();
464    }
465
466    // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
467    // the debugger needs to be updated.
468    // LINT.IfChange
469    pub fn worker_lifecycle<const UNTIL_STALLED: bool>(self: &Arc<Executor>) {
470        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
471
472        self.monotonic_timers.register(self);
473        self.boot_timers.register(self);
474
475        loop {
476            // Keep track of whether we are considered asleep.
477            let mut sleeping = false;
478
479            match self.poll_ready_tasks() {
480                PollReadyTasksResult::NoneReady => {
481                    // No more tasks, indicate we are sleeping. We use SeqCst ordering because we
482                    // want this change here to happen *before* we check ready_tasks below. This
483                    // synchronizes with notify_task_ready which is called *after* a task is added
484                    // to ready_tasks.
485                    const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
486                    self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
487                    // Check ready tasks again. If a task got posted, wake up. This has to be done
488                    // because a notification won't get sent if there is at least one active thread
489                    // so there's a window between the preceding two lines where a task could be
490                    // made ready and a notification is not sent because it looks like there is at
491                    // least one thread running.
492                    if self.ready_tasks.is_empty() {
493                        sleeping = true;
494                    } else {
495                        // We lost a race, we're no longer sleeping.
496                        self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
497                    }
498                }
499                PollReadyTasksResult::MoreReady => {}
500                PollReadyTasksResult::MainTaskCompleted => return,
501            }
502
503            // Check done here after updating threads_state to avoid shutdown races.
504            if self.done.load(Ordering::SeqCst) {
505                return;
506            }
507
508            enum Work {
509                None,
510                Packet(zx::Packet),
511                Stalled,
512            }
513
514            let mut notified = false;
515            let work = {
                // If we're considered awake (or we're only running until stalled), choose
                // INFINITE_PAST, which makes the wait call return immediately.  Otherwise, wait
                // until a packet arrives.
518                let deadline = if !sleeping || UNTIL_STALLED {
519                    zx::Instant::INFINITE_PAST
520                } else {
521                    zx::Instant::INFINITE
522                };
523
524                match self.port.wait(deadline) {
525                    Ok(packet) => {
526                        if packet.key() == TASK_READY_WAKEUP_ID {
527                            notified = true;
528                            Work::None
529                        } else {
530                            Work::Packet(packet)
531                        }
532                    }
533                    Err(zx::Status::TIMED_OUT) => {
534                        if !UNTIL_STALLED || !sleeping {
535                            Work::None
536                        } else {
537                            Work::Stalled
538                        }
539                    }
540                    Err(status) => {
541                        panic!("Error calling port wait: {status:?}");
542                    }
543                }
544            };
545
546            let threads_state_sub =
547                ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
548            if threads_state_sub.0 > 0 {
549                self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
550            }
551
552            match work {
553                Work::Packet(packet) => self.receivers.receive_packet(packet.key(), packet),
554                Work::None => {}
555                Work::Stalled => return,
556            }
557        }
558    }
559
560    /// Drops the main task.
561    ///
562    /// # Safety
563    ///
564    /// The caller must guarantee that the executor isn't running.
565    pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle) {
566        root_scope.drop_task_unchecked(MAIN_TASK_ID);
567    }
568
569    fn try_poll(&self, task: TaskHandle) -> bool {
570        let task_waker = task.waker();
571        let poll_result = TaskHandle::set_current_with(&task, || {
572            task.try_poll(&mut Context::from_waker(&task_waker))
573        });
574        match poll_result {
575            AttemptPollResult::Yield => {
576                self.ready_tasks.push(task);
577                false
578            }
579            AttemptPollResult::IFinished | AttemptPollResult::Aborted => {
580                task.scope().task_did_finish(task.id());
581                true
582            }
583            _ => false,
584        }
585    }
586
587    /// Returns the monotonic timers.
588    pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
589        &self.monotonic_timers
590    }
591
592    /// Returns the boot timers.
593    pub fn boot_timers(&self) -> &Timers<BootInstant> {
594        &self.boot_timers
595    }
596
597    fn poll_tasks(&self, callback: impl FnOnce()) {
598        assert!(!self.is_local);
599
600        // Increment the count of foreign threads.
601        const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
602        self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);
603
604        callback();
605
606        // Poll up to 16 tasks.
607        for _ in 0..16 {
608            let Some(task) = self.ready_tasks.pop() else {
609                break;
610            };
611            let task_id = task.id();
612            if self.try_poll(task) && task_id == MAIN_TASK_ID {
613                break;
614            }
615            self.polled.fetch_add(1, Ordering::Relaxed);
616        }
617
618        let mut threads_state = ThreadsState(
619            self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
620        );
621
622        if !self.ready_tasks.is_empty() {
623            // There are tasks still ready to run, so wake up a thread if all the other threads are
624            // sleeping.
625            while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
626                match self.try_notify(threads_state) {
627                    Ok(()) => break,
628                    Err(s) => threads_state = s,
629                }
630            }
631        }
632    }
633
634    pub fn task_is_ready(&self, task: TaskHandle) {
635        self.ready_tasks.push(task);
636        self.notify_task_ready();
637    }
638}
639
640#[cfg(test)]
641impl Drop for Executor {
642    fn drop(&mut self) {
643        ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
644    }
645}
646
647/// A handle to an executor.
648#[derive(Clone)]
649pub struct EHandle {
650    // LINT.IfChange
651    pub(super) root_scope: ScopeHandle,
652    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
653}
654
655impl fmt::Debug for EHandle {
656    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
657        f.debug_struct("EHandle").field("port", &self.inner().port).finish()
658    }
659}
660
661impl EHandle {
662    /// Returns the thread-local executor.
663    ///
664    /// # Panics
665    ///
666    /// If called outside the context of an active async executor.
667    pub fn local() -> Self {
668        let root_scope = EXECUTOR
669            .with(|e| e.borrow().as_ref().map(|x| x.clone()))
670            .expect("Fuchsia Executor must be created first");
671
672        EHandle { root_scope }
673    }
674
675    /// Set the fake time to a given value.
676    ///
677    /// # Panics
678    ///
679    /// If the executor was not created with fake time.
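    ///
    /// A usage sketch (illustrative; assumes the executor was created with fake time):
    ///
    /// ```ignore
    /// let ehandle = EHandle::local();
    /// ehandle.set_fake_time(MonotonicInstant::from_nanos(1_000_000));
    /// ```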
680    pub fn set_fake_time(&self, t: MonotonicInstant) {
681        self.inner().set_fake_time(t)
682    }
683
684    pub(super) fn rm_local() {
685        EXECUTOR.with(|e| *e.borrow_mut() = None);
686    }
687
688    /// The root scope of the executor.
689    ///
690    /// This can be used to spawn tasks that live as long as the executor, and
691    /// to create shorter-lived child scopes.
692    ///
693    /// Most users should create an owned scope with
694    /// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
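    ///
    /// A usage sketch (illustrative; assumes an executor is active on this thread):
    ///
    /// ```ignore
    /// let ehandle = EHandle::local();
    /// ehandle.global_scope().spawn(async {
    ///     // Lives until completion or until the executor is dropped.
    /// });
    /// ```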
695    pub fn global_scope(&self) -> &ScopeHandle {
696        &self.root_scope
697    }
698
699    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
700    pub fn port(&self) -> &zx::Port {
701        &self.inner().port
702    }
703
    /// Registers a `PacketReceiver` with the executor and returns a registration.
    /// The `PacketReceiver` will be deregistered when the returned `ReceiverRegistration` is
    /// dropped.
706    pub fn register_receiver<T: PacketReceiver>(&self, receiver: T) -> ReceiverRegistration<T> {
707        self.inner().receivers.register(self.inner().clone(), receiver)
708    }
709
    /// Registers a `PacketReceiver` with the executor via a pinned `RawReceiverRegistration`.
711    ///
712    /// The registration will be deregistered when dropped.
713    ///
714    /// NOTE: Unlike with `register_receiver`, `receive_packet` will be called whilst a lock is
715    /// held, so it is not safe to register or unregister receivers at that time.
716    pub fn register_pinned<T: PacketReceiver>(
717        &self,
718        raw_registration: Pin<&mut RawReceiverRegistration<T>>,
719    ) {
720        self.inner().receivers.register_pinned(self.clone(), raw_registration);
721    }
722
723    #[inline(always)]
724    pub(crate) fn inner(&self) -> &Arc<Executor> {
725        self.root_scope.executor()
726    }
727
728    /// Spawn a new task to be run on this executor.
729    ///
730    /// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
    /// may be run on either a single-threaded or multi-threaded executor.
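    ///
    /// A usage sketch (illustrative; assumes an executor is active on this thread):
    ///
    /// ```ignore
    /// let ehandle = EHandle::local();
    /// ehandle.spawn_detached(async {
    ///     // Runs to completion on the executor; the output is discarded.
    /// });
    /// ```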
732    pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
733        self.global_scope().spawn(future);
734    }
735
736    /// Spawn a new task to be run on this executor.
737    ///
738    /// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
    /// have to be thread-safe (implement the `Send` trait). In return, this method requires that
    /// this executor is a `LocalExecutor`.
741    pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
742        self.global_scope().spawn_local(future);
743    }
744
745    pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
746        &self.inner().monotonic_timers
747    }
748
749    pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
750        &self.inner().boot_timers
751    }
752
753    /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
754    /// that are ready to run.  If tasks remain ready and no other threads are running, a thread
755    /// will be woken.  This can end up being a performance win in the case that the queue can be
756    /// cleared without needing to wake any other thread.
757    ///
758    /// # Panics
759    ///
760    /// If called on a single-threaded executor or if this thread is a thread managed by the
761    /// executor.
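    ///
    /// A usage sketch (illustrative, mirroring the `poll_tasks` test at the bottom of this file;
    /// assumes a multi-threaded executor is running):
    ///
    /// ```ignore
    /// let ehandle = EHandle::local();
    /// std::thread::spawn(move || {
    ///     ehandle.poll_tasks(|| {
    ///         // Tasks spawned inside the callback may be polled on this foreign thread.
    ///         ehandle.spawn_detached(async {});
    ///     });
    /// })
    /// .join()
    /// .unwrap();
    /// ```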
762    pub fn poll_tasks(&self, callback: impl FnOnce()) {
763        EXECUTOR.with(|e| {
764            assert!(
765                e.borrow_mut().replace(self.root_scope.clone()).is_none(),
766                "This thread is already associated with an executor"
767            );
768        });
769
770        self.inner().poll_tasks(callback);
771
772        EXECUTOR.with(|e| *e.borrow_mut() = None);
773    }
774}
775
776// AtomicFutureHandle can have a lifetime (for local executors we allow the main task to have a
777// non-static lifetime).  The executor doesn't handle this though; the executor just assumes all
778// tasks have the 'static lifetime.  It's up to the local executor to extend the lifetime and make
779// it safe.
780pub type TaskHandle = AtomicFutureHandle<'static>;
781
782thread_local! {
783    static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
784}
785
786impl TaskHandle {
787    pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
788        CURRENT_TASK.with(|cur| {
789            let cur = cur.get();
790            let cur = unsafe { cur.as_ref() };
791            f(cur)
792        })
793    }
794
795    fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
796        CURRENT_TASK.with(|cur| {
797            cur.set(task);
798            let result = f();
799            cur.set(std::ptr::null());
800            result
801        })
802    }
803}
804
805#[cfg(test)]
806mod tests {
807    use super::{ACTIVE_EXECUTORS, EHandle};
808    use crate::SendExecutorBuilder;
809    use std::sync::Arc;
810    use std::sync::atomic::{AtomicU64, Ordering};
811
812    #[test]
813    fn test_no_leaks() {
814        std::thread::spawn(|| SendExecutorBuilder::new().num_threads(1).build().run(async {}))
815            .join()
816            .unwrap();
817
818        assert_eq!(ACTIVE_EXECUTORS.load(Ordering::Relaxed), 0);
819    }
820
821    #[test]
822    fn poll_tasks() {
823        SendExecutorBuilder::new().num_threads(1).build().run(async {
824            let ehandle = EHandle::local();
825
            // This will tie up the executor's only running thread, which ensures that the task
827            // we spawn below can only run on the foreign thread.
828            std::thread::spawn(move || {
829                let ran = Arc::new(AtomicU64::new(0));
830                ehandle.poll_tasks(|| {
831                    let ran = ran.clone();
832                    ehandle.spawn_detached(async move {
833                        ran.fetch_add(1, Ordering::Relaxed);
834                    });
835                });
836
837                // The spawned task should have run in this thread.
838                assert_eq!(ran.load(Ordering::Relaxed), 1);
839            })
840            .join()
841            .unwrap();
842        });
843    }
844}