1use crate::TimeoutExt;
6use futures::prelude::*;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9#[cfg(target_os = "fuchsia")]
10use std::task::Poll;
11use std::time::Duration;
12
// Wraps a test factory so that every future it produces honours the timeout
// stored in `$config.timeout`.
//
// `$test` is any callable taking the run index and returning a future. The
// macro expands to a `move` closure with the same shape. When a timeout is
// configured, the produced future panics with "timeout on run N" once the
// timeout fires; otherwise the original future is awaited unchanged.
//
// This is a macro rather than a function so the same code can be reused for
// both `Send` and non-`Send` futures.
macro_rules! apply_timeout {
    ($config:expr, $test:expr) => {{
        let limit = $config.timeout;
        let make_future = $test;
        move |run| {
            // Build the future eagerly, outside the async block, exactly once per run.
            let fut = make_future(run);
            async move {
                match limit {
                    Some(limit) => {
                        fut.on_timeout(limit, || panic!("timeout on run {}", run)).await
                    }
                    None => fut.await,
                }
            }
        }
    }};
}
32
33pub trait TestResult: Sized {
35 fn run_singlethreaded(
37 test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
38 cfg: Config,
39 ) -> Self;
40
41 #[cfg(target_os = "fuchsia")]
43 fn run_until_stalled<
44 F: 'static + Sync + Fn(usize) -> Fut,
45 Fut: 'static + Future<Output = Self>,
46 >(
47 fake_time: bool,
48 test: F,
49 cfg: Config,
50 ) -> Self;
51
52 fn is_ok(&self) -> bool;
54}
55
56pub trait MultithreadedTestResult: Sized {
58 fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
60 test: F,
61 threads: u8,
62 cfg: Config,
63 ) -> Self;
64
65 fn is_ok(&self) -> bool;
67}
68
69impl<E: Send + 'static + std::fmt::Debug> TestResult for Result<(), E> {
70 fn run_singlethreaded(
71 test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
72 cfg: Config,
73 ) -> Self {
74 cfg.run(1, |run| crate::LocalExecutor::new().run_singlethreaded(test(run)))
75 }
76
77 #[cfg(target_os = "fuchsia")]
78 fn run_until_stalled<
79 F: 'static + Sync + Fn(usize) -> Fut,
80 Fut: 'static + Future<Output = Self>,
81 >(
82 fake_time: bool,
83 test: F,
84 cfg: Config,
85 ) -> Self {
86 let test = apply_timeout!(cfg, |run| test(run));
87 cfg.run(1, |run| {
88 let mut executor = if fake_time {
89 crate::TestExecutor::new_with_fake_time()
90 } else {
91 crate::TestExecutor::new()
92 };
93 match executor.run_until_stalled(&mut std::pin::pin!(test(run))) {
94 Poll::Ready(result) => result,
95 Poll::Pending => panic!(
96 "Stalled without completing. Consider using \"run_singlethreaded\", or check \
97 for a deadlock."
98 ),
99 }
100 })
101 }
102
103 fn is_ok(&self) -> bool {
104 Result::is_ok(self)
105 }
106}
107
108impl<E: 'static + Send> MultithreadedTestResult for Result<(), E> {
109 fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
110 test: F,
111 threads: u8,
112 cfg: Config,
113 ) -> Self {
114 let test = apply_timeout!(cfg, |run| test(run));
115 cfg.run(threads, |run| crate::SendExecutor::new(threads).run(test(run)))
118 }
119
120 fn is_ok(&self) -> bool {
121 Result::is_ok(self)
122 }
123}
124
125impl TestResult for () {
126 fn run_singlethreaded(
127 test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
128 cfg: Config,
129 ) -> Self {
130 let _ = cfg.run(1, |run| {
131 crate::LocalExecutor::new().run_singlethreaded(test(run));
132 Ok::<(), ()>(())
133 });
134 }
135
136 #[cfg(target_os = "fuchsia")]
137 fn run_until_stalled<
138 F: Sync + 'static + Fn(usize) -> Fut,
139 Fut: 'static + Future<Output = Self>,
140 >(
141 fake_time: bool,
142 test: F,
143 cfg: Config,
144 ) -> Self {
145 let _ = TestResult::run_until_stalled(
146 fake_time,
147 move |run| {
148 let test = test(run);
149 async move {
150 test.await;
151 Ok::<(), ()>(())
152 }
153 },
154 cfg,
155 );
156 }
157
158 fn is_ok(&self) -> bool {
159 true
160 }
161}
162
163impl MultithreadedTestResult for () {
164 fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
165 test: F,
166 threads: u8,
167 cfg: Config,
168 ) -> Self {
169 let _ = cfg.run(threads, |run| {
172 crate::SendExecutor::new(threads).run(test(run));
173 Ok::<(), ()>(())
174 });
175 }
176
177 fn is_ok(&self) -> bool {
178 true
179 }
180}
181
/// Settings that control how a test is repeated and parallelised.
#[derive(Clone)]
pub struct Config {
    /// Total number of runs of the test; run indices are `0..repeat_count`.
    repeat_count: usize,
    /// Upper bound on how many runs may execute in parallel. Values of 0
    /// and 1 both result in runs executing one after another.
    max_concurrency: usize,
    /// Upper bound on the total number of threads; 0 disables this limit.
    max_threads: u8,
    /// Per-run timeout; `None` disables it.
    timeout: Option<Duration>,
}
190
/// Reads the environment variable `name` and parses it as `T`.
///
/// An unset (or non-Unicode) variable is treated as the empty string before
/// parsing. `default` is returned whenever parsing fails.
fn env_var<T: std::str::FromStr>(name: &str, default: T) -> T {
    let raw = match std::env::var(name) {
        Ok(value) => value,
        Err(_) => String::new(),
    };
    match raw.parse::<T>() {
        Ok(parsed) => parsed,
        Err(_) => default,
    }
}
194
195impl Config {
196 fn get() -> Self {
197 let repeat_count = std::cmp::max(1, env_var("FASYNC_TEST_REPEAT_COUNT", 1));
198 let max_concurrency = env_var("FASYNC_TEST_MAX_CONCURRENCY", 0);
199 let timeout_seconds = env_var("FASYNC_TEST_TIMEOUT_SECONDS", 0);
200 let max_threads = env_var("FASYNC_TEST_MAX_THREADS", 0);
201 let timeout =
202 if timeout_seconds == 0 { None } else { Some(Duration::from_secs(timeout_seconds)) };
203 Self { repeat_count, max_concurrency, max_threads, timeout }
204 }
205
206 fn in_parallel<E: Send>(
207 &self,
208 threads: u8,
209 f: impl Fn() -> Result<(), E> + Sync,
210 ) -> Result<(), E> {
211 std::thread::scope(|s| {
212 let mut join_handles = Vec::new();
213 for _ in 1..threads {
214 join_handles.push(s.spawn(&f));
215 }
216 let result = f();
217 if result.is_err() {
218 return result;
219 }
220 for h in join_handles {
221 match h.join() {
222 Ok(result @ Err(_)) => return result,
223 _ => {}
224 }
225 }
226 Ok(())
227 })
228 }
229
230 fn run<E: Send>(
231 &self,
232 test_threads: u8,
233 f: impl Fn(usize) -> Result<(), E> + Sync,
234 ) -> Result<(), E> {
235 let mut threads = std::cmp::min(std::cmp::max(self.repeat_count, 1), self.max_concurrency);
238 if self.max_threads != 0 {
239 threads =
240 std::cmp::min(threads, std::cmp::max(self.max_threads / test_threads, 1) as usize);
241 }
242 let threads = u8::try_from(threads).unwrap_or(u8::MAX);
243 let run = AtomicUsize::new(0);
244 self.in_parallel(threads, || {
245 loop {
246 let this_run = run.fetch_add(1, Ordering::Relaxed);
247 if this_run >= self.repeat_count {
248 return Ok(());
249 }
250 let result = f(this_run);
251 if result.is_err() {
252 run.store(self.repeat_count, Ordering::Relaxed);
254 return result;
255 }
256 }
257 })
258 }
259}
260
261pub fn run_singlethreaded_test<F, Fut, R>(test: F) -> R
263where
264 F: 'static + Sync + Fn(usize) -> Fut,
265 Fut: 'static + Future<Output = R>,
266 R: TestResult,
267{
268 TestResult::run_singlethreaded(&|run| test(run).boxed_local(), Config::get())
269}
270
/// Runs `test` via `TestResult::run_until_stalled`, using the configuration
/// read from the environment by `Config::get`. Only available on Fuchsia
/// targets.
#[cfg(target_os = "fuchsia")]
pub fn run_until_stalled_test<F, Fut, R>(fake_time: bool, test: F) -> R
where
    F: 'static + Sync + Fn(usize) -> Fut,
    Fut: 'static + Future<Output = R>,
    R: TestResult,
{
    let cfg = Config::get();
    R::run_until_stalled(fake_time, test, cfg)
}
281
282pub fn run_test<F, Fut, R>(test: F, threads: u8) -> R
284where
285 F: 'static + Sync + Fn(usize) -> Fut,
286 Fut: 'static + Send + Future<Output = R>,
287 R: MultithreadedTestResult,
288{
289 MultithreadedTestResult::run(test, threads, Config::get())
290}
291
#[cfg(test)]
mod tests {
    use super::{Config, MultithreadedTestResult, TestResult};
    use futures::lock::Mutex;
    use futures::prelude::*;
    use std::collections::HashSet;
    use std::sync::Arc;
    use std::time::Duration;

    /// Shared set of run indices that have not been observed yet.
    type PendingRuns = Arc<Mutex<HashSet<usize>>>;

    /// Builds a set holding every run index in `0..count`.
    fn pending_runs(count: usize) -> PendingRuns {
        Arc::new(Mutex::new((0..count).collect()))
    }

    /// Asserts that every run index has been removed from `pending`.
    fn assert_all_ran(pending: &PendingRuns) {
        assert!(pending.try_lock().unwrap().is_empty());
    }

    /// A single run that must give up after one millisecond.
    fn one_millisecond_timeout() -> Config {
        Config {
            repeat_count: 1,
            max_concurrency: 0,
            max_threads: 0,
            timeout: Some(Duration::from_millis(1)),
        }
    }

    // Every run index must be delivered to the test exactly once.
    #[test]
    fn run_singlethreaded() {
        const REPEAT_COUNT: usize = 1000;
        const MAX_THREADS: u8 = 10;
        let pending = pending_runs(REPEAT_COUNT);
        let shared = Arc::clone(&pending);
        let cfg = Config {
            repeat_count: REPEAT_COUNT,
            max_concurrency: 0,
            max_threads: MAX_THREADS,
            timeout: None,
        };
        <() as TestResult>::run_singlethreaded(
            &move |run| {
                let shared = Arc::clone(&shared);
                async move {
                    assert!(shared.lock().await.remove(&run));
                }
                .boxed_local()
            },
            cfg,
        );
        assert_all_ran(&pending);
    }

    // A test that never completes is expected to panic once the timeout fires.
    #[ignore]
    #[test]
    #[should_panic]
    fn run_singlethreaded_with_timeout() {
        <() as TestResult>::run_singlethreaded(
            &move |_| {
                async move {
                    futures::future::pending::<()>().await;
                }
                .boxed_local()
            },
            one_millisecond_timeout(),
        );
    }

    // Every run index must be delivered to the test exactly once.
    #[test]
    #[cfg(target_os = "fuchsia")]
    fn run_until_stalled() {
        const REPEAT_COUNT: usize = 1000;
        let pending = pending_runs(REPEAT_COUNT);
        let shared = Arc::clone(&pending);
        let cfg = Config {
            repeat_count: REPEAT_COUNT,
            max_concurrency: 1,
            max_threads: 1,
            timeout: None,
        };
        <() as TestResult>::run_until_stalled(
            false,
            move |run| {
                let shared = Arc::clone(&shared);
                async move {
                    assert!(shared.lock().await.remove(&run));
                }
            },
            cfg,
        );
        assert_all_ran(&pending);
    }

    // Every run index must be delivered to the test exactly once.
    #[test]
    fn run() {
        const REPEAT_COUNT: usize = 1000;
        const THREADS: u8 = 4;
        let pending = pending_runs(REPEAT_COUNT);
        let shared = Arc::clone(&pending);
        let cfg = Config {
            repeat_count: REPEAT_COUNT,
            max_concurrency: 0,
            max_threads: THREADS,
            timeout: None,
        };
        <() as MultithreadedTestResult>::run(
            move |run| {
                let shared = Arc::clone(&shared);
                async move {
                    assert!(shared.lock().await.remove(&run));
                }
            },
            THREADS,
            cfg,
        );
        assert_all_ran(&pending);
    }

    // A test that never completes is expected to panic once the timeout fires.
    #[ignore]
    #[test]
    #[should_panic]
    fn run_with_timeout() {
        const THREADS: u8 = 4;
        <() as MultithreadedTestResult>::run(
            move |_| async move {
                futures::future::pending::<()>().await;
            },
            THREADS,
            one_millisecond_timeout(),
        );
    }
}