fuchsia_async/runtime/fuchsia/executor/
send.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use super::common::{Executor, ExecutorTime, MAIN_TASK_ID};
6use super::scope::ScopeHandle;
7use fuchsia_sync::{Condvar, Mutex};
8
9use futures::FutureExt;
10use std::future::Future;
11use std::sync::atomic::Ordering;
12use std::sync::Arc;
13use std::time::Duration;
14use std::{fmt, thread};
15
/// A multi-threaded port-based executor for Fuchsia. Requires that tasks scheduled on it
/// implement `Send` so they can be load balanced between worker threads.
///
/// Having a `SendExecutor` in scope allows the creation and polling of zircon objects, such as
/// [`fuchsia_async::Channel`].
///
/// # Panics
///
/// `SendExecutor` will panic on drop if any zircon objects attached to it are still alive. In other
/// words, zircon objects backed by a `SendExecutor` must be dropped before it.
pub struct SendExecutor {
    /// The inner executor state.
    inner: Arc<Executor>,
    // LINT.IfChange
    /// The root scope.
    root_scope: ScopeHandle,
    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
    /// Handles of the worker threads spawned by `run`; drained and joined by `join_all`.
    threads: Vec<thread::JoinHandle<()>>,
    /// Optional callback that each worker thread invokes once, right after it starts and before
    /// it enters its worker lifecycle. `None` means workers perform no extra initialization.
    worker_init: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
}
37
38impl fmt::Debug for SendExecutor {
39    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
40        f.debug_struct("SendExecutor").field("port", &self.inner.port).finish()
41    }
42}
43
44impl SendExecutor {
45    /// Create a new multi-threaded executor.
46    #[allow(deprecated)]
47    pub fn new(num_threads: u8) -> Self {
48        Self::new_inner(num_threads, None)
49    }
50
51    /// Set a new worker initialization callback. Will be invoked once at the start of each worker
52    /// thread.
53    pub fn set_worker_init(&mut self, worker_init: impl Fn() + Send + Sync + 'static) {
54        self.worker_init = Some(Arc::new(worker_init) as Arc<dyn Fn() + Send + Sync + 'static>);
55    }
56
57    /// Apply the worker initialization callback to an owned executor, returning the executor.
58    ///
59    /// The initialization callback will be invoked once at the start of each worker thread.
60    pub fn with_worker_init(mut self, worker_init: fn()) -> Self {
61        self.set_worker_init(worker_init);
62        self
63    }
64
65    fn new_inner(
66        num_threads: u8,
67        worker_init: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
68    ) -> Self {
69        let inner =
70            Arc::new(Executor::new(ExecutorTime::RealTime, /* is_local */ false, num_threads));
71        let root_scope = ScopeHandle::root(inner.clone());
72        Executor::set_local(root_scope.clone());
73        Self { inner, root_scope, threads: Vec::default(), worker_init }
74    }
75
76    /// Get a reference to the Fuchsia `zx::Port` being used to listen for events.
77    pub fn port(&self) -> &zx::Port {
78        &self.inner.port
79    }
80
81    /// Run `future` to completion, using this thread and `num_threads` workers in a pool to
82    /// poll active tasks.
83    // The debugger looks for this function on the stack, so if its (fully-qualified) name changes,
84    // the debugger needs to be updated.
85    // LINT.IfChange
86    pub fn run<F>(&mut self, future: F) -> F::Output
87    // LINT.ThenChange(//src/developer/debug/zxdb/console/commands/verb_async_backtrace.cc)
88    where
89        F: Future + Send + 'static,
90        F::Output: Send + 'static,
91    {
92        assert!(self.inner.is_real_time(), "Error: called `run` on an executor using fake time");
93
94        let pair = Arc::new((Mutex::new(None), Condvar::new()));
95        let pair2 = pair.clone();
96
97        // Spawn a future which will set the result upon completion.
98        let task = self.root_scope.new_task(
99            Some(MAIN_TASK_ID),
100            future.map(move |fut_result| {
101                let (lock, cvar) = &*pair2;
102                let mut result = lock.lock();
103                *result = Some(fut_result);
104                cvar.notify_one();
105            }),
106        );
107        task.detach();
108        assert!(self.root_scope.insert_task(task, false));
109
110        // Start worker threads, handing off timers from the current thread.
111        self.inner.done.store(false, Ordering::SeqCst);
112        self.create_worker_threads();
113
114        // Wait until the signal the future has completed.
115        let (lock, cvar) = &*pair;
116        let mut result = lock.lock();
117        if result.is_none() {
118            let mut last_polled = 0;
119            let mut last_tasks_ready = false;
120            loop {
121                // This timeout is chosen to be quite high since it impacts all processes that have
122                // multi-threaded async executors, and it exists to workaround arguably misbehaving
123                // users (see the comment below).
124                cvar.wait_for(&mut result, Duration::from_millis(250));
125                if result.is_some() {
126                    break;
127                }
128                let polled = self.inner.polled.load(Ordering::Relaxed);
129                let tasks_ready = !self.inner.ready_tasks.is_empty();
130                if polled == last_polled && last_tasks_ready && tasks_ready {
131                    // If this log message is printed, it most likely means that a task has blocked
132                    // making a reentrant synchronous call that doesn't involve a port message being
133                    // processed by this same executor. This can arise even if you would expect
134                    // there to normally be other port messages involved. One example (that has
135                    // actually happened): spawn a task to service a fuchsia.io connection, then try
136                    // and synchronously connect to that service. If the task hasn't had a chance to
137                    // run, then the async channel might not be registered with the executor, and so
138                    // sending messages to the channel doesn't trigger a port message. Typically,
139                    // the way to solve these issues is to run the service in a different executor
140                    // (which could be the same or a different process).
141                    eprintln!("Tasks might be stalled!");
142                    self.inner.wake_one_thread();
143                }
144                last_polled = polled;
145                last_tasks_ready = tasks_ready;
146            }
147        }
148
149        // Spin down worker threads
150        self.join_all();
151
152        // Unwrap is fine because of the check to `is_none` above.
153        result.take().unwrap()
154    }
155
156    #[doc(hidden)]
157    /// Returns the root scope of the executor.
158    pub fn root_scope(&self) -> &ScopeHandle {
159        &self.root_scope
160    }
161
162    /// Add `self.num_threads` worker threads to the executor's thread pool.
163    /// `timers`: timers from the "main" thread which would otherwise be lost.
164    fn create_worker_threads(&mut self) {
165        for _ in 0..self.inner.num_threads {
166            let inner = self.inner.clone();
167            let root_scope = self.root_scope.clone();
168            let worker_init = self.worker_init.clone();
169            let thread = thread::Builder::new()
170                .name("executor_worker".to_string())
171                .spawn(move || {
172                    Executor::set_local(root_scope);
173                    if let Some(init) = worker_init.as_ref() {
174                        init();
175                    }
176                    inner.worker_lifecycle::</* UNTIL_STALLED: */ false>();
177                })
178                .expect("must be able to spawn threads");
179            self.threads.push(thread);
180        }
181    }
182
183    fn join_all(&mut self) {
184        self.inner.mark_done();
185
186        // Join the worker threads
187        for thread in self.threads.drain(..) {
188            thread.join().expect("Couldn't join worker thread.");
189        }
190    }
191}
192
193impl Drop for SendExecutor {
194    fn drop(&mut self) {
195        self.join_all();
196        self.inner.on_parent_drop(&self.root_scope);
197    }
198}
199
200// TODO(https://fxbug.dev/42156503) test SendExecutor with unit tests
201
#[cfg(test)]
mod tests {
    use super::SendExecutor;
    use crate::{Task, Timer};

    use futures::channel::oneshot;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, Condvar, Mutex};

    /// A task that blocks its worker thread on a synchronous condition variable must not wedge
    /// the executor: the task that releases it still has to get a chance to run.
    #[test]
    fn test_stalled_triggers_wake_up() {
        let mut executor = SendExecutor::new(2);
        executor.run(async {
            // The timer will only fire on one thread, so wait on one to get to a point where only
            // one thread is running.
            Timer::new(zx::MonotonicDuration::from_millis(10)).await;

            let (started_tx, started_rx) = oneshot::channel();
            let gate = Arc::new((Mutex::new(false), Condvar::new()));
            let gate_for_main = Arc::clone(&gate);

            let _blocker = Task::spawn(async move {
                // Send a notification to the other task...
                started_tx.send(()).unwrap();
                // ...then block the thread until the other task opens the gate.
                let (open, changed) = &*gate;
                let _open = changed.wait_while(open.lock().unwrap(), |open| !*open).unwrap();
            });

            started_rx.await.unwrap();
            let (open, changed) = &*gate_for_main;
            *open.lock().unwrap() = true;
            changed.notify_one();
        });
    }

    /// The worker init callback runs once per worker thread, and again for the workers of every
    /// subsequent `run`.
    #[test]
    fn worker_init_called_once_per_worker() {
        static NUM_INIT_CALLS: AtomicU64 = AtomicU64::new(0);
        fn initialize_test_worker() {
            NUM_INIT_CALLS.fetch_add(1, Ordering::SeqCst);
        }

        let mut exec = SendExecutor::new(2).with_worker_init(initialize_test_worker);
        // Two workers per run, so the running total grows by two each time.
        for expected_calls in [2, 4] {
            exec.run(async {});
            assert_eq!(NUM_INIT_CALLS.load(Ordering::SeqCst), expected_calls);
        }
    }
}
253}