fuchsia_async/runtime/fuchsia/executor/
send.rs1use super::common::{Executor, ExecutorTime};
6use super::scope::ScopeHandle;
7use fuchsia_sync::{Condvar, Mutex};
8
9use crate::runtime::instrument::TaskInstrument;
10use futures::FutureExt;
11use std::future::Future;
12use std::sync::Arc;
13use std::sync::atomic::Ordering;
14use std::time::Duration;
15use std::{fmt, thread};
16
17pub struct SendExecutor {
28 inner: Arc<Executor>,
30 root_scope: ScopeHandle,
33 threads: Vec<thread::JoinHandle<()>>,
36 worker_init: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
37}
38
39impl fmt::Debug for SendExecutor {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 f.debug_struct("SendExecutor").field("port", &self.inner.port).finish()
42 }
43}
44
45impl SendExecutor {
46 fn new_inner(
47 num_threads: u8,
48 worker_init: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
49 instrument: Option<Arc<dyn TaskInstrument>>,
50 ) -> Self {
51 let inner = Arc::new(Executor::new(
52 ExecutorTime::RealTime,
53 false,
54 num_threads,
55 instrument,
56 ));
57 let root_scope = ScopeHandle::root(inner.clone());
58 Executor::set_local(root_scope.clone());
59 Self { inner, root_scope, threads: Vec::default(), worker_init }
60 }
61
62 pub fn port(&self) -> &zx::Port {
64 &self.inner.port
65 }
66
67 pub fn run<F>(&mut self, future: F) -> F::Output
73 where
75 F: Future + Send + 'static,
76 F::Output: Send + 'static,
77 {
78 assert!(self.inner.is_real_time(), "Error: called `run` on an executor using fake time");
79
80 let pair = Arc::new((Mutex::new(None), Condvar::new()));
81 let pair2 = pair.clone();
82
83 let task = self.root_scope.new_task(future.map(move |fut_result| {
85 let (lock, cvar) = &*pair2;
86 let mut result = lock.lock();
87 *result = Some(fut_result);
88 cvar.notify_one();
89 }));
90 task.detach();
91 assert!(self.root_scope.insert_task(task, false));
92
93 self.inner.done.store(false, Ordering::SeqCst);
95 self.create_worker_threads();
96
97 let (lock, cvar) = &*pair;
99 let mut result = lock.lock();
100 if result.is_none() {
101 let mut last_polled = 0;
102 let mut last_tasks_ready = false;
103 loop {
104 cvar.wait_for(&mut result, Duration::from_millis(250));
108 if result.is_some() {
109 break;
110 }
111 let polled = self.inner.polled.load(Ordering::Relaxed);
112 let tasks_ready = !self.inner.ready_tasks.is_empty();
113 if polled == last_polled && last_tasks_ready && tasks_ready {
114 eprintln!("Tasks might be stalled!");
125 self.inner.wake_one_thread();
126 }
127 last_polled = polled;
128 last_tasks_ready = tasks_ready;
129 }
130 }
131
132 self.join_all();
134
135 result.take().unwrap()
137 }
138
139 #[doc(hidden)]
140 pub fn root_scope(&self) -> &ScopeHandle {
142 &self.root_scope
143 }
144
145 fn create_worker_threads(&mut self) {
148 for _ in 0..self.inner.num_threads {
149 let inner = self.inner.clone();
150 let root_scope = self.root_scope.clone();
151 let worker_init = self.worker_init.clone();
152 let thread = thread::Builder::new()
153 .name("executor_worker".to_string())
154 .spawn(move || {
155 Executor::set_local(root_scope);
156 if let Some(init) = worker_init.as_ref() {
157 init();
158 }
159 inner.worker_lifecycle::<false>(None);
160 })
161 .expect("must be able to spawn threads");
162 self.threads.push(thread);
163 }
164 }
165
166 fn join_all(&mut self) {
167 self.inner.mark_done();
168
169 for thread in self.threads.drain(..) {
171 thread.join().expect("Couldn't join worker thread.");
172 }
173 }
174}
175
176impl Drop for SendExecutor {
177 fn drop(&mut self) {
178 self.join_all();
179 self.inner.on_parent_drop(&self.root_scope);
180 }
181}
182
183#[derive(Default)]
185pub struct SendExecutorBuilder {
186 num_threads: Option<u8>,
187 worker_init: Option<Arc<dyn Fn() + Send + Sync + 'static>>,
188 instrument: Option<Arc<dyn TaskInstrument>>,
189}
190
191impl SendExecutorBuilder {
192 pub fn new() -> Self {
194 Self::default()
195 }
196
197 pub fn num_threads(mut self, num_threads: u8) -> Self {
199 self.num_threads = Some(num_threads);
200 self
201 }
202
203 pub fn worker_init(mut self, worker_init: impl Fn() + Send + Sync + 'static) -> Self {
205 self.worker_init = Some(Arc::new(worker_init));
206 self
207 }
208
209 pub fn instrument(mut self, instrument: Option<Arc<dyn TaskInstrument>>) -> Self {
211 self.instrument = instrument;
212 self
213 }
214
215 pub fn build(self) -> SendExecutor {
217 SendExecutor::new_inner(self.num_threads.unwrap_or(1), self.worker_init, self.instrument)
218 }
219}
220
#[cfg(test)]
mod tests {
    use super::SendExecutorBuilder;
    use crate::{Task, Timer};

    use fuchsia_sync::{Condvar, Mutex};
    use futures::channel::oneshot;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// A spawned task blocks its thread on a condvar until the main future releases it, so the
    /// test can only finish if the main future is polled again after the task has started.
    // NOTE(review): presumably this relies on the stall check in `SendExecutor::run` waking
    // another thread -- confirm.
    #[test]
    fn test_stalled_triggers_wake_up() {
        let mut executor = SendExecutorBuilder::new().num_threads(2).build();
        executor.run(async {
            Timer::new(zx::MonotonicDuration::from_millis(10)).await;

            let (started_tx, started_rx) = oneshot::channel();
            let gate = Arc::new((Mutex::new(false), Condvar::new()));
            let task_gate = Arc::clone(&gate);

            let _task = Task::spawn(async move {
                // Report that the task is running, then block synchronously on the gate.
                started_tx.send(()).unwrap();
                let (flag, signal) = &*task_gate;
                let mut released = flag.lock();
                while !*released {
                    signal.wait(&mut released);
                }
            });

            // Once the task has started, open the gate so it can finish.
            started_rx.await.unwrap();
            let (flag, signal) = &*gate;
            *flag.lock() = true;
            signal.notify_one();
        });
    }

    /// Each call to `run` spawns fresh workers, so with two threads the init callback count
    /// grows by two per run.
    #[test]
    fn worker_init_called_once_per_worker() {
        static NUM_INIT_CALLS: AtomicU64 = AtomicU64::new(0);
        fn initialize_test_worker() {
            NUM_INIT_CALLS.fetch_add(1, Ordering::SeqCst);
        }

        let mut exec = SendExecutorBuilder::new()
            .num_threads(2)
            .worker_init(initialize_test_worker)
            .build();
        for expected_calls in [2, 4] {
            exec.run(async {});
            assert_eq!(NUM_INIT_CALLS.load(Ordering::SeqCst), expected_calls);
        }
    }
}