fuchsia_async/runtime/fuchsia/executor/
common.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::super::timer::Timers;
6use super::atomic_future::{AtomicFutureHandle, AttemptPollResult};
7use super::packets::{PacketReceiver, PacketReceiverMap, ReceiverRegistration};
8use super::scope::ScopeHandle;
9use super::time::{BootInstant, MonotonicInstant};
10use crossbeam::queue::SegQueue;
11use fuchsia_sync::Mutex;
12use zx::BootDuration;
13
14use std::any::Any;
15use std::cell::{Cell, RefCell};
16use std::future::Future;
17use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, AtomicUsize, Ordering};
18use std::sync::Arc;
19use std::task::Context;
20use std::{fmt, u64, usize};
21
22pub(crate) const TASK_READY_WAKEUP_ID: u64 = u64::MAX - 1;
23
24/// The id of the main task, which is a virtual task that lives from construction
25/// to destruction of the executor. The main task may correspond to multiple
26/// main futures, in cases where the executor runs multiple times during its lifetime.
27pub(crate) const MAIN_TASK_ID: usize = 0;
28
29thread_local!(
30    static EXECUTOR: RefCell<Option<ScopeHandle>> = const { RefCell::new(None) }
31);
32
33pub enum ExecutorTime {
34    RealTime,
35    /// Fake readings used in tests.
36    FakeTime {
37        // The fake monotonic clock reading.
38        mono_reading_ns: AtomicI64,
39        // An offset to add to mono_reading_ns to get the reading of the boot
40        // clock, disregarding the difference in timelines.
41        //
42        // We disregard the fact that the reading and offset can not be
43        // read atomically, this is usually not relevant in tests.
44        mono_to_boot_offset_ns: AtomicI64,
45    },
46}
47
48enum PollReadyTasksResult {
49    NoneReady,
50    MoreReady,
51    MainTaskCompleted,
52}
53
54///  24           16           8            0
55///  +------------+------------+------------+
56///  |  foreign   |  notified  |  sleeping  |
57///  +------------+------------+------------+
58///
59///  sleeping : the number of threads sleeping
60///  notified : the number of notifications posted to wake sleeping threads
61///  foreign  : the number of foreign threads processing tasks
62#[derive(Clone, Copy, Eq, PartialEq)]
63struct ThreadsState(u32);
64
65impl ThreadsState {
66    const fn sleeping(&self) -> u8 {
67        self.0 as u8
68    }
69
70    const fn notified(&self) -> u8 {
71        (self.0 >> 8) as u8
72    }
73
74    const fn with_sleeping(self, sleeping: u8) -> Self {
75        Self((self.0 & !0xff) | sleeping as u32)
76    }
77
78    const fn with_notified(self, notified: u8) -> Self {
79        Self(self.0 & !0xff00 | (notified as u32) << 8)
80    }
81
82    const fn with_foreign(self, foreign: u8) -> Self {
83        Self(self.0 & !0xff0000 | (foreign as u32) << 16)
84    }
85}
86
87#[cfg(test)]
88static ACTIVE_EXECUTORS: AtomicUsize = AtomicUsize::new(0);
89
90pub(crate) struct Executor {
91    pub(super) port: zx::Port,
92    monotonic_timers: Arc<Timers<MonotonicInstant>>,
93    boot_timers: Arc<Timers<BootInstant>>,
94    pub(super) done: AtomicBool,
95    is_local: bool,
96    receivers: Mutex<PacketReceiverMap<Arc<dyn PacketReceiver>>>,
97    task_count: AtomicUsize,
98    pub(super) ready_tasks: SegQueue<TaskHandle>,
99    time: ExecutorTime,
100    // The low byte is the number of threads currently sleeping. The high byte is the number of
101    // of wake-up notifications pending.
102    pub(super) threads_state: AtomicU32,
103    pub(super) num_threads: u8,
104    pub(super) polled: AtomicU64,
105    // Data that belongs to the user that can be accessed via EHandle::local(). See
106    // `TestExecutor::poll_until_stalled`.
107    pub(super) owner_data: Mutex<Option<Box<dyn Any + Send>>>,
108}
109
110impl Executor {
111    pub fn new(time: ExecutorTime, is_local: bool, num_threads: u8) -> Self {
112        #[cfg(test)]
113        ACTIVE_EXECUTORS.fetch_add(1, Ordering::Relaxed);
114
115        let mut receivers: PacketReceiverMap<Arc<dyn PacketReceiver>> = PacketReceiverMap::new();
116
117        // Is this a fake-time executor?
118        let is_fake = matches!(
119            time,
120            ExecutorTime::FakeTime { mono_reading_ns: _, mono_to_boot_offset_ns: _ }
121        );
122        let monotonic_timers = receivers.insert(|key| {
123            let timers = Arc::new(Timers::<MonotonicInstant>::new(key, is_fake));
124            (timers.clone(), timers)
125        });
126        let boot_timers = receivers.insert(|key| {
127            let timers = Arc::new(Timers::<BootInstant>::new(key, is_fake));
128            (timers.clone(), timers)
129        });
130
131        Executor {
132            port: zx::Port::create(),
133            monotonic_timers,
134            boot_timers,
135            done: AtomicBool::new(false),
136            is_local,
137            receivers: Mutex::new(receivers),
138            task_count: AtomicUsize::new(MAIN_TASK_ID + 1),
139            ready_tasks: SegQueue::new(),
140            time,
141            threads_state: AtomicU32::new(0),
142            num_threads,
143            polled: AtomicU64::new(0),
144            owner_data: Mutex::new(None),
145        }
146    }
147
148    pub fn set_local(root_scope: ScopeHandle) {
149        EXECUTOR.with(|e| {
150            let mut e = e.borrow_mut();
151            assert!(e.is_none(), "Cannot create multiple Fuchsia Executors");
152            *e = Some(root_scope);
153        });
154    }
155
156    fn poll_ready_tasks(&self) -> PollReadyTasksResult {
157        loop {
158            for _ in 0..16 {
159                let Some(task) = self.ready_tasks.pop() else {
160                    return PollReadyTasksResult::NoneReady;
161                };
162                let task_id = task.id();
163                let complete = self.try_poll(task);
164                if complete && task_id == MAIN_TASK_ID {
165                    return PollReadyTasksResult::MainTaskCompleted;
166                }
167                self.polled.fetch_add(1, Ordering::Relaxed);
168            }
169            // We didn't finish all the ready tasks. If there are sleeping threads, post a
170            // notification to wake one up.
171            let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
172            loop {
173                if threads_state.sleeping() == 0 {
174                    // All threads are awake now. Prevent starvation.
175                    return PollReadyTasksResult::MoreReady;
176                }
177                if threads_state.notified() >= threads_state.sleeping() {
178                    // All sleeping threads have been notified. Keep going and poll more tasks.
179                    break;
180                }
181                match self.try_notify(threads_state) {
182                    Ok(()) => break,
183                    Err(s) => threads_state = s,
184                }
185            }
186        }
187    }
188
189    pub fn is_local(&self) -> bool {
190        self.is_local
191    }
192
193    pub fn next_task_id(&self) -> usize {
194        self.task_count.fetch_add(1, Ordering::Relaxed)
195    }
196
197    pub fn notify_task_ready(&self) {
198        // Only post if there's no thread running (or soon to be running). If we happen to be
199        // running on a thread for this executor, then threads_state won't be equal to num_threads,
200        // which means notifications only get fired if this is from a non-async thread, or a thread
201        // that belongs to a different executor. We use SeqCst ordering here to make sure this load
202        // happens *after* the change to ready_tasks and to synchronize with worker_lifecycle.
203        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::SeqCst));
204
205        // We only want to notify if there are no pending notifications and there are no other
206        // threads running.
207        while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
208            match self.try_notify(threads_state) {
209                Ok(()) => break,
210                Err(s) => threads_state = s,
211            }
212        }
213    }
214
215    /// Tries to notify a thread to wake up. Returns threads_state if it fails.
216    fn try_notify(&self, old_threads_state: ThreadsState) -> Result<(), ThreadsState> {
217        self.threads_state
218            .compare_exchange_weak(
219                old_threads_state.0,
220                old_threads_state.0 + ThreadsState(0).with_notified(1).0,
221                Ordering::Relaxed,
222                Ordering::Relaxed,
223            )
224            .map(|_| self.notify_id(TASK_READY_WAKEUP_ID))
225            .map_err(ThreadsState)
226    }
227
228    pub fn wake_one_thread(&self) {
229        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
230        let current_sleeping = threads_state.sleeping();
231        if current_sleeping == 0 {
232            return;
233        }
234        while threads_state.notified() == 0 && threads_state.sleeping() >= current_sleeping {
235            match self.try_notify(threads_state) {
236                Ok(()) => break,
237                Err(s) => threads_state = s,
238            }
239        }
240    }
241
242    pub fn notify_id(&self, id: u64) {
243        let up = zx::UserPacket::from_u8_array([0; 32]);
244        let packet = zx::Packet::from_user_packet(id, 0 /* status??? */, up);
245        if let Err(e) = self.port.queue(&packet) {
246            // TODO: logging
247            eprintln!("Failed to queue notify in port: {:?}", e);
248        }
249    }
250
251    pub fn deliver_packet(&self, key: u64, packet: zx::Packet) {
252        let receiver = match self.receivers.lock().get(key) {
253            // Clone the `Arc` so that we don't hold the lock
254            // any longer than absolutely necessary.
255            // The `receive_packet` impl may be arbitrarily complex.
256            Some(receiver) => receiver.clone(),
257            None => return,
258        };
259        receiver.receive_packet(packet);
260    }
261
262    /// Returns the current reading of the monotonic clock.
263    ///
264    /// For test executors running in fake time, returns the reading of the
265    /// fake monotonic clock.
266    pub fn now(&self) -> MonotonicInstant {
267        match &self.time {
268            ExecutorTime::RealTime => MonotonicInstant::from_zx(zx::MonotonicInstant::get()),
269            ExecutorTime::FakeTime { mono_reading_ns: t, .. } => {
270                MonotonicInstant::from_nanos(t.load(Ordering::Relaxed))
271            }
272        }
273    }
274
275    /// Returns the current reading of the boot clock.
276    ///
277    /// For test executors running in fake time, returns the reading of the
278    /// fake boot clock.
279    pub fn boot_now(&self) -> BootInstant {
280        match &self.time {
281            ExecutorTime::RealTime => BootInstant::from_zx(zx::BootInstant::get()),
282
283            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
284                // The two atomic values are loaded one after the other. This should
285                // not normally be an issue in tests.
286                let fake_mono_now = MonotonicInstant::from_nanos(t.load(Ordering::Relaxed));
287                let boot_offset_ns = mono_to_boot_offset_ns.load(Ordering::Relaxed);
288                BootInstant::from_nanos(fake_mono_now.into_nanos() + boot_offset_ns)
289            }
290        }
291    }
292
293    /// Sets the reading of the fake monotonic clock.
294    ///
295    /// # Panics
296    ///
297    /// If called on an executor that runs in real time.
298    pub fn set_fake_time(&self, new: MonotonicInstant) {
299        let boot_offset_ns = match &self.time {
300            ExecutorTime::RealTime => {
301                panic!("Error: called `set_fake_time` on an executor using actual time.")
302            }
303            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns } => {
304                t.store(new.into_nanos(), Ordering::Relaxed);
305                mono_to_boot_offset_ns.load(Ordering::Relaxed)
306            }
307        };
308        self.monotonic_timers.maybe_notify(new);
309
310        // Changing fake time also affects boot time.  Notify boot clocks as well.
311        let new_boot_time = BootInstant::from_nanos(new.into_nanos() + boot_offset_ns);
312        self.boot_timers.maybe_notify(new_boot_time);
313    }
314
315    // Sets a new offset between boot and monotonic time.
316    //
317    // Only works for executors operating in fake time.
318    // The change in the fake offset will wake expired boot timers.
319    pub fn set_fake_boot_to_mono_offset(&self, offset: BootDuration) {
320        let mono_now_ns = match &self.time {
321            ExecutorTime::RealTime => {
322                panic!("Error: called `set_fake_boot_to_mono_offset` on an executor using actual time.")
323            }
324            ExecutorTime::FakeTime { mono_reading_ns: t, mono_to_boot_offset_ns: b } => {
325                // We ignore the non-atomic update between b and t, it is likely
326                // not relevant in tests.
327                b.store(offset.into_nanos(), Ordering::Relaxed);
328                t.load(Ordering::Relaxed)
329            }
330        };
331        let new_boot_now = BootInstant::from_nanos(mono_now_ns) + offset;
332        self.boot_timers.maybe_notify(new_boot_now);
333    }
334
335    /// Returns `true` if this executor is running in real time.  Returns
336    /// `false` if this executor si running in fake time.
337    pub fn is_real_time(&self) -> bool {
338        matches!(self.time, ExecutorTime::RealTime)
339    }
340
341    /// Must be called before `on_parent_drop`.
342    ///
343    /// Done flag must be set before dropping packet receivers
344    /// so that future receivers that attempt to deregister themselves
345    /// know that it's okay if their entries are already missing.
346    pub fn mark_done(&self) {
347        self.done.store(true, Ordering::SeqCst);
348
349        // Make sure there's at least one notification outstanding per thread to wake up all
350        // workers. This might be more notifications than required, but this way we don't have to
351        // worry about races where tasks are just about to sleep; when a task receives the
352        // notification, it will check done and terminate.
353        let mut threads_state = ThreadsState(self.threads_state.load(Ordering::Relaxed));
354        let num_threads = self.num_threads;
355        loop {
356            let notified = threads_state.notified();
357            if notified >= num_threads {
358                break;
359            }
360            match self.threads_state.compare_exchange_weak(
361                threads_state.0,
362                threads_state.with_notified(num_threads).0,
363                Ordering::Relaxed,
364                Ordering::Relaxed,
365            ) {
366                Ok(_) => {
367                    for _ in notified..num_threads {
368                        self.notify_id(TASK_READY_WAKEUP_ID);
369                    }
370                    return;
371                }
372                Err(old) => threads_state = ThreadsState(old),
373            }
374        }
375    }
376
377    /// Notes about the lifecycle of an Executor.
378    ///
379    /// a) The Executor stands as the only way to run a reactor based on a Fuchsia port, but the
380    /// lifecycle of the port itself is not currently tied to it. Executor vends clones of its
381    /// inner Arc structure to all receivers, so we don't have a type-safe way of ensuring that
382    /// the port is dropped alongside the Executor as it should.
383    /// TODO(https://fxbug.dev/42154828): Ensure the port goes away with the executor.
384    ///
385    /// b) The Executor's lifetime is also tied to the thread-local variable pointing to the
386    /// "current" executor being set, and that's unset when the executor is dropped.
387    ///
388    /// Point (a) is related to "what happens if I use a receiver after the executor is dropped",
389    /// and point (b) is related to "what happens when I try to create a new receiver when there
390    /// is no executor".
391    ///
392    /// Tokio, for example, encodes the lifetime of the reactor separately from the thread-local
393    /// storage [1]. And the reactor discourages usage of strong references to it by vending weak
394    /// references to it [2] instead of strong.
395    ///
396    /// There are pros and cons to both strategies. For (a), tokio encourages (but doesn't
397    /// enforce [3]) type-safety by vending weak pointers, but those add runtime overhead when
398    /// upgrading pointers. For (b) the difference mostly stand for "when is it safe to use IO
399    /// objects/receivers". Tokio says it's only safe to use them whenever a guard is in scope.
400    /// Fuchsia-async says it's safe to use them when a fuchsia_async::Executor is still in scope
401    /// in that thread.
402    ///
403    /// This acts as a prelude to the panic encoded in Executor::drop when receivers haven't
404    /// unregistered themselves when the executor drops. The choice to panic was made based on
405    /// patterns in fuchsia-async that may come to change:
406    ///
407    /// - Executor vends strong references to itself and those references are *stored* by most
408    /// receiver implementations (as opposed to reached out on TLS every time).
409    /// - Fuchsia-async objects return zx::Status on wait calls, there isn't an appropriate and
410    /// easy to understand error to return when polling on an extinct executor.
411    /// - All receivers are implemented in this crate and well-known.
412    ///
413    /// [1]: https://docs.rs/tokio/1.5.0/tokio/runtime/struct.Runtime.html#method.enter
414    /// [2]: https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/signal/unix/driver.rs#L35
415    /// [3]: by returning an upgraded Arc, tokio trusts callers to not "use it for too long", an
416    /// opaque non-clone-copy-or-send guard would be stronger than this. See:
417    /// https://github.com/tokio-rs/tokio/blob/b42f21ec3e212ace25331d0c13889a45769e6006/tokio/src/io/driver/mod.rs#L297
418    pub fn on_parent_drop(&self, root_scope: &ScopeHandle) {
419        // Drop all tasks.
420        // Any use of fasync::unblock can involve a waker. Wakers hold weak references to tasks, but
421        // as part of waking, there's an upgrade to a strong reference, so for a small amount of
422        // time `fasync::unblock` can hold a strong reference to a task which in turn holds the
423        // future for the task which in turn could hold references to receivers, which, if we did
424        // nothing about it, would trip the assertion below. For that reason, we forcibly drop the
425        // task futures here.
426        root_scope.drop_all_tasks();
427
428        // Drop all of the uncompleted tasks
429        while let Some(_) = self.ready_tasks.pop() {}
430
431        // Unregister the timer receivers so that we can perform the check below.
432        self.receivers.lock().remove(self.monotonic_timers.port_key());
433        self.receivers.lock().remove(self.boot_timers.port_key());
434
435        // Do not allow any receivers to outlive the executor. That's very likely a bug waiting to
436        // happen. See discussion above.
437        //
438        // If you're here because you hit this panic check your code for:
439        //
440        // - A struct that contains a fuchsia_async::Executor NOT in the last position (last
441        // position gets dropped last: https://doc.rust-lang.org/reference/destructors.html).
442        //
443        // - A function scope that contains a fuchsia_async::Executor NOT in the first position
444        // (first position in function scope gets dropped last:
445        // https://doc.rust-lang.org/reference/destructors.html?highlight=scope#drop-scopes).
446        //
447        // - A function that holds a `fuchsia_async::Executor` in scope and whose last statement
448        // contains a temporary (temporaries are dropped after the function scope:
449        // https://doc.rust-lang.org/reference/destructors.html#temporary-scopes). This usually
450        // looks like a `match` statement at the end of the function without a semicolon.
451        //
452        // - Storing channel and FIDL objects in static variables.
453        //
454        // - fuchsia_async::unblock calls that move channels or FIDL objects to another thread.
455        assert!(
456            self.receivers.lock().mapping.is_empty(),
457            "receivers must not outlive their executor"
458        );
459
460        // Remove the thread-local executor set in `new`.
461        EHandle::rm_local();
462    }
463
464    // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
465    // the debugger needs to be updated.
466    // LINT.IfChange
467    pub fn worker_lifecycle<const UNTIL_STALLED: bool>(self: &Arc<Executor>) {
468        // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
469        loop {
470            // Keep track of whether we are considered asleep.
471            let mut sleeping = false;
472
473            match self.poll_ready_tasks() {
474                PollReadyTasksResult::NoneReady => {
475                    // No more tasks, indicate we are sleeping. We use SeqCst ordering because we
476                    // want this change here to happen *before* we check ready_tasks below. This
477                    // synchronizes with notify_task_ready which is called *after* a task is added
478                    // to ready_tasks.
479                    const ONE_SLEEPING: ThreadsState = ThreadsState(0).with_sleeping(1);
480                    self.threads_state.fetch_add(ONE_SLEEPING.0, Ordering::SeqCst);
481                    // Check ready tasks again. If a task got posted, wake up. This has to be done
482                    // because a notification won't get sent if there is at least one active thread
483                    // so there's a window between the preceding two lines where a task could be
484                    // made ready and a notification is not sent because it looks like there is at
485                    // least one thread running.
486                    if self.ready_tasks.is_empty() {
487                        sleeping = true;
488                    } else {
489                        // We lost a race, we're no longer sleeping.
490                        self.threads_state.fetch_sub(ONE_SLEEPING.0, Ordering::Relaxed);
491                    }
492                }
493                PollReadyTasksResult::MoreReady => {}
494                PollReadyTasksResult::MainTaskCompleted => return,
495            }
496
497            // Check done here after updating threads_state to avoid shutdown races.
498            if self.done.load(Ordering::SeqCst) {
499                return;
500            }
501
502            enum Work {
503                None,
504                Packet(zx::Packet),
505                Stalled,
506            }
507
508            let mut notified = false;
509            let work = {
510                // If we're considered awake choose INFINITE_PAST which will make the wait call
511                // return immediately.  Otherwise, wait until a packet arrives.
512                let deadline = if !sleeping || UNTIL_STALLED {
513                    zx::Instant::INFINITE_PAST
514                } else {
515                    zx::Instant::INFINITE
516                };
517
518                match self.port.wait(deadline) {
519                    Ok(packet) => {
520                        if packet.key() == TASK_READY_WAKEUP_ID {
521                            notified = true;
522                            Work::None
523                        } else {
524                            Work::Packet(packet)
525                        }
526                    }
527                    Err(zx::Status::TIMED_OUT) => {
528                        if !UNTIL_STALLED || !sleeping {
529                            Work::None
530                        } else {
531                            Work::Stalled
532                        }
533                    }
534                    Err(status) => {
535                        panic!("Error calling port wait: {:?}", status);
536                    }
537                }
538            };
539
540            let threads_state_sub =
541                ThreadsState(0).with_sleeping(sleeping as u8).with_notified(notified as u8);
542            if threads_state_sub.0 > 0 {
543                self.threads_state.fetch_sub(threads_state_sub.0, Ordering::Relaxed);
544            }
545
546            match work {
547                Work::Packet(packet) => {
548                    self.deliver_packet(packet.key(), packet);
549                }
550                Work::None => {}
551                Work::Stalled => return,
552            }
553        }
554    }
555
556    /// Drops the main task.
557    ///
558    /// # Safety
559    ///
560    /// The caller must guarantee that the executor isn't running.
561    pub(super) unsafe fn drop_main_task(&self, root_scope: &ScopeHandle) {
562        root_scope.drop_task_unchecked(MAIN_TASK_ID);
563    }
564
565    fn try_poll(&self, task: TaskHandle) -> bool {
566        let task_waker = task.waker();
567        let poll_result = TaskHandle::set_current_with(&task, || {
568            task.try_poll(&mut Context::from_waker(&task_waker))
569        });
570        match poll_result {
571            AttemptPollResult::Yield => {
572                self.ready_tasks.push(task);
573                false
574            }
575            AttemptPollResult::IFinished | AttemptPollResult::Cancelled => {
576                task.scope().task_did_finish(task.id());
577                true
578            }
579            _ => false,
580        }
581    }
582
583    /// Returns the monotonic timers.
584    pub fn monotonic_timers(&self) -> &Timers<MonotonicInstant> {
585        &self.monotonic_timers
586    }
587
588    /// Returns the boot timers.
589    pub fn boot_timers(&self) -> &Timers<BootInstant> {
590        &self.boot_timers
591    }
592
593    fn poll_tasks(&self, callback: impl FnOnce()) {
594        assert!(!self.is_local);
595
596        // Increment the count of foreign threads.
597        const ONE_FOREIGN: ThreadsState = ThreadsState(0).with_foreign(1);
598        self.threads_state.fetch_add(ONE_FOREIGN.0, Ordering::Relaxed);
599
600        callback();
601
602        // Poll up to 16 tasks.
603        for _ in 0..16 {
604            let Some(task) = self.ready_tasks.pop() else {
605                break;
606            };
607            let task_id = task.id();
608            if self.try_poll(task) && task_id == MAIN_TASK_ID {
609                break;
610            }
611            self.polled.fetch_add(1, Ordering::Relaxed);
612        }
613
614        let mut threads_state = ThreadsState(
615            self.threads_state.fetch_sub(ONE_FOREIGN.0, Ordering::SeqCst) - ONE_FOREIGN.0,
616        );
617
618        if !self.ready_tasks.is_empty() {
619            // There are tasks still ready to run, so wake up a thread if all the other threads are
620            // sleeping.
621            while threads_state == ThreadsState(0).with_sleeping(self.num_threads) {
622                match self.try_notify(threads_state) {
623                    Ok(()) => break,
624                    Err(s) => threads_state = s,
625                }
626            }
627        }
628    }
629
630    pub fn task_is_ready(&self, task: TaskHandle) {
631        self.ready_tasks.push(task);
632        self.notify_task_ready();
633    }
634}
635
636#[cfg(test)]
637impl Drop for Executor {
638    fn drop(&mut self) {
639        ACTIVE_EXECUTORS.fetch_sub(1, Ordering::Relaxed);
640    }
641}
642
643/// A handle to an executor.
644#[derive(Clone)]
645pub struct EHandle {
646    // LINT.IfChange
647    pub(super) root_scope: ScopeHandle,
648    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
649}
650
651impl fmt::Debug for EHandle {
652    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
653        f.debug_struct("EHandle").field("port", &self.inner().port).finish()
654    }
655}
656
657impl EHandle {
658    /// Returns the thread-local executor.
659    ///
660    /// # Panics
661    ///
662    /// If called outside the context of an active async executor.
663    pub fn local() -> Self {
664        let root_scope = EXECUTOR
665            .with(|e| e.borrow().as_ref().map(|x| x.clone()))
666            .expect("Fuchsia Executor must be created first");
667
668        EHandle { root_scope }
669    }
670
671    pub(super) fn rm_local() {
672        EXECUTOR.with(|e| *e.borrow_mut() = None);
673    }
674
675    /// The root scope of the executor.
676    ///
677    /// This can be used to spawn tasks that live as long as the executor, and
678    /// to create shorter-lived child scopes.
679    ///
680    /// Most users should create an owned scope with
681    /// [`Scope::new_with_name`][crate::Scope::new_with_name] instead of using this method.
682    pub fn global_scope(&self) -> &ScopeHandle {
683        &self.root_scope
684    }
685
686    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
687    pub fn port(&self) -> &zx::Port {
688        &self.inner().port
689    }
690
691    /// Registers a `PacketReceiver` with the executor and returns a registration.
692    /// The `PacketReceiver` will be deregistered when the `Registration` is dropped.
693    pub fn register_receiver<T>(&self, receiver: Arc<T>) -> ReceiverRegistration<T>
694    where
695        T: PacketReceiver,
696    {
697        self.inner().receivers.lock().insert(|key| {
698            (receiver.clone(), ReceiverRegistration { ehandle: self.clone(), key, receiver })
699        })
700    }
701
702    #[inline(always)]
703    pub(crate) fn inner(&self) -> &Arc<Executor> {
704        &self.root_scope.executor()
705    }
706
707    pub(crate) fn deregister_receiver(&self, key: u64) {
708        let mut lock = self.inner().receivers.lock();
709        if lock.contains(key) {
710            lock.remove(key);
711        } else {
712            // The executor is shutting down and already removed the entry.
713            assert!(self.inner().done.load(Ordering::SeqCst), "Missing receiver to deregister");
714        }
715    }
716
717    /// Spawn a new task to be run on this executor.
718    ///
719    /// Tasks spawned using this method must be thread-safe (implement the `Send` trait), as they
720    /// may be run on either a singlethreaded or multithreaded executor.
721    pub fn spawn_detached(&self, future: impl Future<Output = ()> + Send + 'static) {
722        self.global_scope().spawn(future);
723    }
724
725    /// Spawn a new task to be run on this executor.
726    ///
727    /// This is similar to the `spawn_detached` method, but tasks spawned using this method do not
728    /// have to be threads-safe (implement the `Send` trait). In return, this method requires that
729    /// this executor is a LocalExecutor.
730    pub fn spawn_local_detached(&self, future: impl Future<Output = ()> + 'static) {
731        self.global_scope().spawn_local(future);
732    }
733
734    pub(crate) fn mono_timers(&self) -> &Arc<Timers<MonotonicInstant>> {
735        &self.inner().monotonic_timers
736    }
737
738    pub(crate) fn boot_timers(&self) -> &Arc<Timers<BootInstant>> {
739        &self.inner().boot_timers
740    }
741
742    /// Calls `callback` in the context of the executor and then polls (a limited number of) tasks
743    /// that are ready to run.  If tasks remain ready and no other threads are running, a thread
744    /// will be woken.  This can end up being a performance win in the case that the queue can be
745    /// cleared without needing to wake any other thread.
746    ///
747    /// # Panics
748    ///
749    /// If called on a single-threaded executor or if this thread is a thread managed by the
750    /// executor.
751    pub fn poll_tasks(&self, callback: impl FnOnce()) {
752        EXECUTOR.with(|e| {
753            assert!(
754                e.borrow_mut().replace(self.root_scope.clone()).is_none(),
755                "This thread is already associated with an executor"
756            );
757        });
758
759        self.inner().poll_tasks(callback);
760
761        EXECUTOR.with(|e| *e.borrow_mut() = None);
762    }
763}
764
765// AtomicFutureHandle can have a lifetime (for local executors we allow the main task to have a
766// non-static lifetime).  The executor doesn't handle this though; the executor just assumes all
767// tasks have the 'static lifetime.  It's up to the local executor to extend the lifetime and make
768// it safe.
769pub type TaskHandle = AtomicFutureHandle<'static>;
770
771thread_local! {
772    static CURRENT_TASK: Cell<*const TaskHandle> = const { Cell::new(std::ptr::null()) };
773}
774
775impl TaskHandle {
776    pub(crate) fn with_current<R>(f: impl FnOnce(Option<&TaskHandle>) -> R) -> R {
777        CURRENT_TASK.with(|cur| {
778            let cur = cur.get();
779            let cur = unsafe { cur.as_ref() };
780            f(cur)
781        })
782    }
783
784    fn set_current_with<R>(task: &TaskHandle, f: impl FnOnce() -> R) -> R {
785        CURRENT_TASK.with(|cur| {
786            cur.set(task);
787            let result = f();
788            cur.set(std::ptr::null());
789            result
790        })
791    }
792}
793
794#[cfg(test)]
795mod tests {
796    use super::{EHandle, ACTIVE_EXECUTORS};
797    use crate::SendExecutor;
798    use std::sync::atomic::{AtomicU64, Ordering};
799    use std::sync::Arc;
800
801    #[test]
802    fn test_no_leaks() {
803        std::thread::spawn(|| SendExecutor::new(1).run(async {})).join().unwrap();
804
805        assert_eq!(ACTIVE_EXECUTORS.load(Ordering::Relaxed), 0);
806    }
807
808    #[test]
809    fn poll_tasks() {
810        SendExecutor::new(1).run(async {
811            let ehandle = EHandle::local();
812
813            // This will tie up the executor's only running thread which ensures that the task
814            // we spawn below can only run on the foreign thread.
815            std::thread::spawn(move || {
816                let ran = Arc::new(AtomicU64::new(0));
817                ehandle.poll_tasks(|| {
818                    let ran = ran.clone();
819                    ehandle.spawn_detached(async move {
820                        ran.fetch_add(1, Ordering::Relaxed);
821                    });
822                });
823
824                // The spawned task should have run in this thread.
825                assert_eq!(ran.load(Ordering::Relaxed), 1);
826            })
827            .join()
828            .unwrap();
829        });
830    }
831}