1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
pub mod actor;
pub mod environment;
mod actor_runner;
mod counter;
use {
crate::{counter::start_counter, environment::Environment},
fuchsia_async::{Time, Timer},
futures::{
future::{select, Aborted, Either},
stream::FuturesUnordered,
StreamExt,
},
rand::{rngs::SmallRng, Rng, SeedableRng},
std::time::Duration,
tracing::{error, info},
};
pub fn random_seed() -> u64 {
let mut temp_rng = SmallRng::from_entropy();
temp_rng.gen()
}
pub async fn run_test<E: 'static + Environment>(mut env: E) {
let env_string = format!("{:#?}", env);
info!("--------------------- stressor is starting -----------------------");
info!("{}", env_string);
info!("------------------------------------------------------------------");
{
let default_panic_hook = std::panic::take_hook();
let custom_panic_hook = env.panic_hook();
std::panic::set_hook(Box::new(move |panic_info| {
error!("");
error!("--------------------- stressor has crashed -----------------------");
error!("{}", env_string);
error!("------------------------------------------------------------------");
error!("");
if let Some(hook) = &custom_panic_hook {
hook();
}
default_panic_hook(panic_info);
}));
}
let target_operations = env.target_operations().unwrap_or(u64::MAX);
let timeout_secs = Duration::from_secs(env.timeout_seconds().unwrap_or(24 * 60 * 60));
let (counter_task, counter_tx) = start_counter(target_operations);
let timeout = Timer::new(Time::after(timeout_secs.into()));
let mut test_end = select(counter_task, timeout);
let mut generation: u64 = 0;
let (mut runner_tasks, mut runner_abort): (FuturesUnordered<_>, Vec<_>) = env
.actor_runners()
.await
.into_iter()
.map(|r| r.run(counter_tx.clone(), generation))
.unzip();
loop {
let either = select(test_end, runner_tasks.next()).await;
match either {
Either::Left((test_end_either, _next)) => {
let reason = match test_end_either {
Either::Left(..) => "operation count",
Either::Right(..) => "timeout",
};
info!(%reason, "Stress test has completed!");
for abort in runner_abort {
abort.abort();
}
let () = runner_tasks.map(|_: Result<_, Aborted>| ()).collect().await;
break;
}
Either::Right((None, _counter_task)) => {
info!("No runners to operate");
break;
}
Either::Right((Some(result), task)) => {
let (runner, runner_generation) = result.expect("no tasks have been aborted");
test_end = task;
if runner_generation == generation {
info!("Resetting environment");
env.reset().await;
generation += 1;
}
let (task, abort) = runner.run(counter_tx.clone(), generation);
runner_tasks.push(task);
runner_abort.push(abort);
}
}
}
}