fuchsia_async/
test_support.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::TimeoutExt;
6use futures::prelude::*;
7use std::pin::Pin;
8use std::sync::atomic::{AtomicUsize, Ordering};
9#[cfg(target_os = "fuchsia")]
10use std::task::Poll;
11use std::time::Duration;
12
// Wraps a test factory so that every future it produces honours the timeout held by the config.
// A macro is used instead of a method such as Config::with_timeout because the same code has to
// serve both Send and !Send futures, and writing it twice is best avoided.
macro_rules! apply_timeout {
    ($config:expr, $test:expr) => {{
        let limit = $config.timeout;
        let make_test = $test;
        move |run| {
            let pending = make_test(run);
            async move {
                match limit {
                    // A run that exceeds the limit panics, naming the offending run.
                    Some(limit) => {
                        pending.on_timeout(limit, || panic!("timeout on run {}", run)).await
                    }
                    // No limit configured: await the test future unchanged.
                    None => pending.await,
                }
            }
        }
    }};
}
32
33/// Defines how to compose multiple test runs for a kind of test result.
34pub trait TestResult: Sized {
35    /// How to repeatedly run a test with this result in a single threaded executor.
36    fn run_singlethreaded(
37        test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
38        cfg: Config,
39    ) -> Self;
40
41    /// Similarly, but use run_until_stalled
42    #[cfg(target_os = "fuchsia")]
43    fn run_until_stalled<
44        F: 'static + Sync + Fn(usize) -> Fut,
45        Fut: 'static + Future<Output = Self>,
46    >(
47        fake_time: bool,
48        test: F,
49        cfg: Config,
50    ) -> Self;
51
52    /// Whether the result is successful.
53    fn is_ok(&self) -> bool;
54}
55
56/// Defines how to compose multiple test runs for a kind of test result in a multithreaded executor.
57pub trait MultithreadedTestResult: Sized {
58    /// How to repeatedly run a test with this result in a multi threaded executor.
59    fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
60        test: F,
61        threads: u8,
62        cfg: Config,
63    ) -> Self;
64
65    /// Whether the result is successful.
66    fn is_ok(&self) -> bool;
67}
68
69impl<E: Send + 'static + std::fmt::Debug> TestResult for Result<(), E> {
70    fn run_singlethreaded(
71        test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
72        cfg: Config,
73    ) -> Self {
74        cfg.run(1, |run| crate::LocalExecutor::new().run_singlethreaded(test(run)))
75    }
76
77    #[cfg(target_os = "fuchsia")]
78    fn run_until_stalled<
79        F: 'static + Sync + Fn(usize) -> Fut,
80        Fut: 'static + Future<Output = Self>,
81    >(
82        fake_time: bool,
83        test: F,
84        cfg: Config,
85    ) -> Self {
86        let test = apply_timeout!(cfg, |run| test(run));
87        cfg.run(1, |run| {
88            let mut executor = if fake_time {
89                crate::TestExecutor::new_with_fake_time()
90            } else {
91                crate::TestExecutor::new()
92            };
93            match executor.run_until_stalled(&mut std::pin::pin!(test(run))) {
94                Poll::Ready(result) => result,
95                Poll::Pending => panic!(
96                    "Stalled without completing. Consider using \"run_singlethreaded\", or check \
97                     for a deadlock."
98                ),
99            }
100        })
101    }
102
103    fn is_ok(&self) -> bool {
104        Result::is_ok(self)
105    }
106}
107
108impl<E: 'static + Send> MultithreadedTestResult for Result<(), E> {
109    fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
110        test: F,
111        threads: u8,
112        cfg: Config,
113    ) -> Self {
114        let test = apply_timeout!(cfg, |run| test(run));
115        // Fuchsia's SendExecutor actually uses an extra thread, but it doesn't do anything, so we
116        // don't count it.
117        cfg.run(threads, |run| crate::SendExecutor::new(threads).run(test(run)))
118    }
119
120    fn is_ok(&self) -> bool {
121        Result::is_ok(self)
122    }
123}
124
125impl TestResult for () {
126    fn run_singlethreaded(
127        test: &(dyn Sync + Fn(usize) -> Pin<Box<dyn Future<Output = Self>>>),
128        cfg: Config,
129    ) -> Self {
130        let _ = cfg.run(1, |run| {
131            crate::LocalExecutor::new().run_singlethreaded(test(run));
132            Ok::<(), ()>(())
133        });
134    }
135
136    #[cfg(target_os = "fuchsia")]
137    fn run_until_stalled<
138        F: Sync + 'static + Fn(usize) -> Fut,
139        Fut: 'static + Future<Output = Self>,
140    >(
141        fake_time: bool,
142        test: F,
143        cfg: Config,
144    ) -> Self {
145        let _ = TestResult::run_until_stalled(
146            fake_time,
147            move |run| {
148                let test = test(run);
149                async move {
150                    test.await;
151                    Ok::<(), ()>(())
152                }
153            },
154            cfg,
155        );
156    }
157
158    fn is_ok(&self) -> bool {
159        true
160    }
161}
162
163impl MultithreadedTestResult for () {
164    fn run<F: 'static + Sync + Fn(usize) -> Fut, Fut: 'static + Send + Future<Output = Self>>(
165        test: F,
166        threads: u8,
167        cfg: Config,
168    ) -> Self {
169        // Fuchsia's SendExecutor actually uses an extra thread, but it doesn't do anything, so we
170        // don't count it.
171        let _ = cfg.run(threads, |run| {
172            crate::SendExecutor::new(threads).run(test(run));
173            Ok::<(), ()>(())
174        });
175    }
176
177    fn is_ok(&self) -> bool {
178        true
179    }
180}
181
/// Configuration variables for a single test run.
#[derive(Clone)]
pub struct Config {
    // How many times the test is run.
    repeat_count: usize,
    // Maximum number of runs of the same test executing in parallel.
    max_concurrency: usize,
    // Maximum number of threads across all parallel runs; 0 means no limit is applied.
    max_threads: u8,
    // Timeout applied to each run; `None` means no timeout.
    timeout: Option<Duration>,
}

/// Reads the environment variable `name` and parses it as a `T`.
///
/// An unset or non-Unicode variable is treated as the empty string; whenever parsing fails,
/// `default` is returned instead.
fn env_var<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name).unwrap_or_default().parse().unwrap_or(default)
}

impl Config {
    /// Builds the configuration from the `FASYNC_TEST_REPEAT_COUNT`,
    /// `FASYNC_TEST_MAX_CONCURRENCY`, `FASYNC_TEST_TIMEOUT_SECONDS` and `FASYNC_TEST_MAX_THREADS`
    /// environment variables.
    ///
    /// The repeat count is at least 1; a timeout of 0 seconds means "no timeout".
    fn get() -> Self {
        let repeat_count = std::cmp::max(1, env_var("FASYNC_TEST_REPEAT_COUNT", 1));
        let max_concurrency = env_var("FASYNC_TEST_MAX_CONCURRENCY", 0);
        let timeout_seconds = env_var("FASYNC_TEST_TIMEOUT_SECONDS", 0);
        let max_threads = env_var("FASYNC_TEST_MAX_THREADS", 0);
        let timeout =
            if timeout_seconds == 0 { None } else { Some(Duration::from_secs(timeout_seconds)) };
        Self { repeat_count, max_concurrency, max_threads, timeout }
    }

    /// Calls `f` on `threads` threads in total (the calling thread plus `threads - 1` spawned
    /// ones) and returns the first error observed, or `Ok(())` when every call succeeded.
    ///
    /// With `threads` of 0 or 1 nothing is spawned and `f` runs once on the calling thread.
    ///
    /// # Panics
    ///
    /// A panic raised by `f` on any thread is propagated to the caller.
    fn in_parallel<E: Send>(
        &self,
        threads: u8,
        f: impl Fn() -> Result<(), E> + Sync,
    ) -> Result<(), E> {
        std::thread::scope(|s| {
            let mut join_handles = Vec::new();
            // The calling thread counts as one of the `threads`, hence the range starting at 1.
            for _ in 1..threads {
                join_handles.push(s.spawn(&f));
            }
            let result = f();
            if result.is_err() {
                // Handles still outstanding are joined by the scope itself.
                return result;
            }
            for handle in join_handles {
                match handle.join() {
                    Ok(Ok(())) => {}
                    Ok(failure @ Err(_)) => return failure,
                    // Joining a scoped thread by hand marks its panic as handled, so the scope
                    // will not re-raise it. Re-raise it here; dropping the payload would let a
                    // panicking run pass as a success.
                    Err(payload) => std::panic::resume_unwind(payload),
                }
            }
            Ok(())
        })
    }

    /// Calls `f(run)` for every `run` in `0..repeat_count`, spreading the runs over as many
    /// threads as the configuration allows, and returns the first error (if any).
    ///
    /// `test_threads` is the number of threads a single run of the test uses; it is only used to
    /// derive how many runs fit under `max_threads`. Once a run fails no further runs are started.
    fn run<E: Send>(
        &self,
        test_threads: u8,
        f: impl Fn(usize) -> Result<(), E> + Sync,
    ) -> Result<(), E> {
        // max_concurrency is the maximum number of runs of the same test to run in parallel, but
        // each test can run multiple threads.  max_threads is the maximum number of threads.
        let mut threads = std::cmp::min(std::cmp::max(self.repeat_count, 1), self.max_concurrency);
        if self.max_threads != 0 {
            // Treat a test that claims zero threads as using one, so the division cannot panic.
            let per_test = std::cmp::max(test_threads, 1);
            threads = std::cmp::min(threads, std::cmp::max(self.max_threads / per_test, 1) as usize);
        }
        let threads = u8::try_from(threads).unwrap_or(u8::MAX);
        // Shared counter from which every thread claims the index of its next run.
        let run = AtomicUsize::new(0);
        self.in_parallel(threads, || {
            loop {
                let this_run = run.fetch_add(1, Ordering::Relaxed);
                if this_run >= self.repeat_count {
                    return Ok(());
                }
                let result = f(this_run);
                if result.is_err() {
                    // Prevent any more runs from starting.
                    run.store(self.repeat_count, Ordering::Relaxed);
                    return result;
                }
            }
        })
    }
}
260
261/// Runs a test in an executor, potentially repeatedly and concurrently
262pub fn run_singlethreaded_test<F, Fut, R>(test: F) -> R
263where
264    F: 'static + Sync + Fn(usize) -> Fut,
265    Fut: 'static + Future<Output = R>,
266    R: TestResult,
267{
268    TestResult::run_singlethreaded(&|run| test(run).boxed_local(), Config::get())
269}
270
/// Runs `test` in an executor until it stalls, using the configuration returned by
/// `Config::get`. `fake_time` is forwarded unchanged to `TestResult::run_until_stalled`.
#[cfg(target_os = "fuchsia")]
pub fn run_until_stalled_test<F, Fut, R>(fake_time: bool, test: F) -> R
where
    F: 'static + Sync + Fn(usize) -> Fut,
    Fut: 'static + Future<Output = R>,
    R: TestResult,
{
    let cfg = Config::get();
    R::run_until_stalled(fake_time, test, cfg)
}
281
282/// Runs a test in an executor, potentially repeatedly and concurrently
283pub fn run_test<F, Fut, R>(test: F, threads: u8) -> R
284where
285    F: 'static + Sync + Fn(usize) -> Fut,
286    Fut: 'static + Send + Future<Output = R>,
287    R: MultithreadedTestResult,
288{
289    MultithreadedTestResult::run(test, threads, Config::get())
290}
291
#[cfg(test)]
mod tests {
    use super::{Config, MultithreadedTestResult, TestResult};
    use futures::lock::Mutex;
    use futures::prelude::*;
    use std::collections::HashSet;
    use std::sync::Arc;
    use std::time::Duration;

    /// Set of run indices that no test run has claimed yet.
    type Outstanding = Arc<Mutex<HashSet<usize>>>;

    /// Returns a set holding every run index in `0..count`.
    fn outstanding_runs(count: usize) -> Outstanding {
        Arc::new(Mutex::new((0..count).collect()))
    }

    /// Panics unless every run index has been removed from `outstanding`.
    fn assert_all_claimed(outstanding: &Outstanding) {
        assert!(outstanding.try_lock().unwrap().is_empty());
    }

    /// Config for a single run with a one millisecond timeout.
    fn single_run_with_short_timeout() -> Config {
        Config {
            repeat_count: 1,
            max_concurrency: 0,
            max_threads: 0,
            timeout: Some(Duration::from_millis(1)),
        }
    }

    // Every run index must be handed to the test exactly once.
    #[test]
    fn run_singlethreaded() {
        const REPEAT_COUNT: usize = 1000;
        const MAX_THREADS: u8 = 10;
        let outstanding = outstanding_runs(REPEAT_COUNT);
        let cfg = Config {
            repeat_count: REPEAT_COUNT,
            max_concurrency: 0,
            max_threads: MAX_THREADS,
            timeout: None,
        };
        let shared = Arc::clone(&outstanding);
        TestResult::run_singlethreaded(
            &move |run| {
                let claim_from = Arc::clone(&shared);
                async move {
                    assert!(claim_from.lock().await.remove(&run));
                }
                .boxed_local()
            },
            cfg,
        );
        assert_all_claimed(&outstanding);
    }

    // TODO(https://fxbug.dev/42138715): should_panic tests trigger LSAN
    #[ignore]
    #[test]
    #[should_panic]
    fn run_singlethreaded_with_timeout() {
        TestResult::run_singlethreaded(
            &move |_| {
                async move {
                    futures::future::pending::<()>().await;
                }
                .boxed_local()
            },
            single_run_with_short_timeout(),
        );
    }

    // Every run index must be handed to the test exactly once.
    #[test]
    #[cfg(target_os = "fuchsia")]
    fn run_until_stalled() {
        const REPEAT_COUNT: usize = 1000;
        let outstanding = outstanding_runs(REPEAT_COUNT);
        let cfg = Config {
            repeat_count: REPEAT_COUNT,
            max_concurrency: 1,
            max_threads: 1,
            timeout: None,
        };
        let shared = Arc::clone(&outstanding);
        TestResult::run_until_stalled(
            false,
            move |run| {
                let claim_from = Arc::clone(&shared);
                async move {
                    assert!(claim_from.lock().await.remove(&run));
                }
            },
            cfg,
        );
        assert_all_claimed(&outstanding);
    }

    // Every run index must be handed to the test exactly once.
    #[test]
    fn run() {
        const REPEAT_COUNT: usize = 1000;
        const THREADS: u8 = 4;
        let outstanding = outstanding_runs(REPEAT_COUNT);
        let cfg = Config {
            repeat_count: REPEAT_COUNT,
            max_concurrency: 0,
            max_threads: THREADS,
            timeout: None,
        };
        let shared = Arc::clone(&outstanding);
        MultithreadedTestResult::run(
            move |run| {
                let claim_from = Arc::clone(&shared);
                async move {
                    assert!(claim_from.lock().await.remove(&run));
                }
            },
            THREADS,
            cfg,
        );
        assert_all_claimed(&outstanding);
    }

    // TODO(https://fxbug.dev/42138715): should_panic tests trigger LSAN
    #[ignore]
    #[test]
    #[should_panic]
    fn run_with_timeout() {
        const THREADS: u8 = 4;
        MultithreadedTestResult::run(
            move |_| async move {
                futures::future::pending::<()>().await;
            },
            THREADS,
            single_run_with_short_timeout(),
        );
    }
}
414        );
415    }
416}