1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45pub mod actor;
6pub mod environment;
78mod actor_runner;
9mod counter;
1011use crate::counter::start_counter;
12use crate::environment::Environment;
13use fuchsia_async::{MonotonicInstant, Timer};
14use futures::future::{select, Aborted, Either};
15use futures::stream::FuturesUnordered;
16use futures::StreamExt;
17use log::{error, info};
18use rand::rngs::SmallRng;
19use rand::{Rng, SeedableRng};
20use std::pin::pin;
21use std::time::Duration;
2223/// Use entropy to generate a random seed
24pub fn random_seed() -> u64 {
25let mut temp_rng = SmallRng::from_entropy();
26 temp_rng.gen()
27}
2829/// Runs the test loop for the given environment to completion.
30pub async fn run_test<E: 'static + Environment>(mut env: E) {
31let env_string = format!("{:#?}", env);
3233info!("--------------------- stressor is starting -----------------------");
34info!("{}", env_string);
35info!("------------------------------------------------------------------");
3637 {
38// Setup a panic handler that prints out details of this invocation on crash
39let default_panic_hook = std::panic::take_hook();
40let custom_panic_hook = env.panic_hook();
41 std::panic::set_hook(Box::new(move |panic_info| {
42error!("");
43error!("--------------------- stressor has crashed -----------------------");
44error!("{}", env_string);
45error!("------------------------------------------------------------------");
46error!("");
47if let Some(hook) = &custom_panic_hook {
48 hook();
49 }
50 default_panic_hook(panic_info);
51 }));
52 }
5354// Extract the data from the environment
55 // Defaults:
56 // - target_operations: 2^64
57 // - timeout_secs: 24 hours
58let target_operations = env.target_operations().unwrap_or(u64::MAX);
59let timeout_secs = Duration::from_secs(env.timeout_seconds().unwrap_or(24 * 60 * 60));
6061// Start the counter thread
62 // The counter thread keeps track of the global operation count.
63 // Each actor will send a message to the counter thread when an operation is completed.
64 // When the target operation count is hit, the counter task exits.
65let (counter_task, counter_tx) = start_counter(target_operations);
6667// Create a timeout task
68let timeout = pin!(Timer::new(MonotonicInstant::after(timeout_secs.into())));
69let mut test_end = pin!(select(counter_task, timeout));
7071// A monotonically increasing counter representing the current generation.
72 // On every environment reset, the generation is incremented.
73let mut generation: u64 = 0;
7475// Start all the runners
76let (mut runner_tasks, mut runner_abort): (FuturesUnordered<_>, Vec<_>) = env
77 .actor_runners()
78 .await
79.into_iter()
80 .map(|r| r.run(counter_tx.clone(), generation))
81 .unzip();
8283loop {
84// Wait for one of the runners, counter task or timer to return
85let either = select(test_end, runner_tasks.next()).await;
86match either {
87 Either::Left((test_end_either, _next)) => {
88let reason = match test_end_either {
89 Either::Left(..) => "operation count",
90 Either::Right(..) => "timeout",
91 };
9293// The counter/timer task returned.
94 // The target operation count was hit or the timer expired.
95 // The test has completed.
96info!(reason:%; "Stress test has completed!");
97for abort in runner_abort {
98 abort.abort();
99 }
100// We don't care if tasks finished or were aborted, but we want them not running
101 // anymore before we return.
102 //
103 // Runaway threads can cause problems if they're using objects from the main
104 // executor, and it's generally a good idea to clean up after ourselves here.
105let () = runner_tasks.map(|_: Result<_, Aborted>| ()).collect().await;
106break;
107 }
108 Either::Right((None, _counter_task)) => {
109info!("No runners to operate");
110break;
111 }
112 Either::Right((Some(result), task)) => {
113let (runner, runner_generation) = result.expect("no tasks have been aborted");
114// Normally, actor runners run indefinitely.
115 // However, one of the actor runners has returned.
116 // This is because an actor has requested an environment reset.
117118 // Move the counter/timer back
119test_end = task;
120121// Did the runner request a reset at the current generation?
122if runner_generation == generation {
123// Reset the environment
124info!("Resetting environment");
125 env.reset().await;
126127// Advance the generation
128generation += 1;
129 }
130131// Restart this runner with the current generation
132let (task, abort) = runner.run(counter_tx.clone(), generation);
133 runner_tasks.push(task);
134 runner_abort.push(abort);
135 }
136 }
137 }
138}