1pub mod actor;
6pub mod environment;
7
8mod actor_runner;
9mod counter;
10
11use crate::counter::start_counter;
12use crate::environment::Environment;
13use fuchsia_async::{MonotonicInstant, Timer};
14use futures::future::{select, Aborted, Either};
15use futures::stream::FuturesUnordered;
16use futures::StreamExt;
17use log::{error, info};
18use rand::rngs::SmallRng;
19use rand::{Rng, SeedableRng};
20use std::pin::pin;
21use std::time::Duration;
22
23pub fn random_seed() -> u64 {
25 let mut temp_rng = SmallRng::from_entropy();
26 temp_rng.gen()
27}
28
29pub async fn run_test<E: 'static + Environment>(mut env: E) {
31 let env_string = format!("{:#?}", env);
32
33 info!("--------------------- stressor is starting -----------------------");
34 info!("{}", env_string);
35 info!("------------------------------------------------------------------");
36
37 {
38 let default_panic_hook = std::panic::take_hook();
40 let custom_panic_hook = env.panic_hook();
41 std::panic::set_hook(Box::new(move |panic_info| {
42 error!("");
43 error!("--------------------- stressor has crashed -----------------------");
44 error!("{}", env_string);
45 error!("------------------------------------------------------------------");
46 error!("");
47 if let Some(hook) = &custom_panic_hook {
48 hook();
49 }
50 default_panic_hook(panic_info);
51 }));
52 }
53
54 let target_operations = env.target_operations().unwrap_or(u64::MAX);
59 let timeout_secs = Duration::from_secs(env.timeout_seconds().unwrap_or(24 * 60 * 60));
60
61 let (counter_task, counter_tx) = start_counter(target_operations);
66
67 let timeout = pin!(Timer::new(MonotonicInstant::after(timeout_secs.into())));
69 let mut test_end = pin!(select(counter_task, timeout));
70
71 let mut generation: u64 = 0;
74
75 let (mut runner_tasks, mut runner_abort): (FuturesUnordered<_>, Vec<_>) = env
77 .actor_runners()
78 .await
79 .into_iter()
80 .map(|r| r.run(counter_tx.clone(), generation))
81 .unzip();
82
83 loop {
84 let either = select(test_end, runner_tasks.next()).await;
86 match either {
87 Either::Left((test_end_either, _next)) => {
88 let reason = match test_end_either {
89 Either::Left(..) => "operation count",
90 Either::Right(..) => "timeout",
91 };
92
93 info!(reason:%; "Stress test has completed!");
97 for abort in runner_abort {
98 abort.abort();
99 }
100 let () = runner_tasks.map(|_: Result<_, Aborted>| ()).collect().await;
106 break;
107 }
108 Either::Right((None, _counter_task)) => {
109 info!("No runners to operate");
110 break;
111 }
112 Either::Right((Some(result), task)) => {
113 let (runner, runner_generation) = result.expect("no tasks have been aborted");
114 test_end = task;
120
121 if runner_generation == generation {
123 info!("Resetting environment");
125 env.reset().await;
126
127 generation += 1;
129 }
130
131 let (task, abort) = runner.run(counter_tx.clone(), generation);
133 runner_tasks.push(task);
134 runner_abort.push(abort);
135 }
136 }
137 }
138}