// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
pub mod actor;
pub mod environment;
mod actor_runner;
mod counter;
use crate::counter::start_counter;
use crate::environment::Environment;
use fuchsia_async::{MonotonicInstant, Timer};
use futures::future::{select, Aborted, Either};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use rand::rngs::SmallRng;
use rand::{Rng, SeedableRng};
use std::pin::pin;
use std::time::Duration;
use tracing::{error, info};
/// Use entropy to generate a random seed
pub fn random_seed() -> u64 {
    let mut temp_rng = SmallRng::from_entropy();
    temp_rng.gen()
}
/// Runs the test loop for the given environment to completion.
pub async fn run_test<E: 'static + Environment>(mut env: E) {
    let env_string = format!("{:#?}", env);
    info!("--------------------- stressor is starting -----------------------");
    info!("{}", env_string);
    info!("------------------------------------------------------------------");
    {
        // Set up a panic handler that prints out details of this invocation on crash
        let default_panic_hook = std::panic::take_hook();
        let custom_panic_hook = env.panic_hook();
        std::panic::set_hook(Box::new(move |panic_info| {
            error!("");
            error!("--------------------- stressor has crashed -----------------------");
            error!("{}", env_string);
            error!("------------------------------------------------------------------");
            error!("");
            if let Some(hook) = &custom_panic_hook {
                hook();
            }
            default_panic_hook(panic_info);
        }));
    }
    // Extract the data from the environment
    // Defaults:
    // - target_operations: u64::MAX (2^64 - 1)
    // - timeout_secs: 24 hours
    let target_operations = env.target_operations().unwrap_or(u64::MAX);
    let timeout_secs = Duration::from_secs(env.timeout_seconds().unwrap_or(24 * 60 * 60));
    // Start the counter thread
    // The counter thread keeps track of the global operation count.
    // Each actor will send a message to the counter thread when an operation is completed.
    // When the target operation count is hit, the counter task exits.
    let (counter_task, counter_tx) = start_counter(target_operations);
    // Create a timer that fires once the timeout has elapsed
    let timeout = pin!(Timer::new(MonotonicInstant::after(timeout_secs.into())));
    let mut test_end = pin!(select(counter_task, timeout));
    // A monotonically increasing counter representing the current generation.
    // On every environment reset, the generation is incremented.
    let mut generation: u64 = 0;
    // Start all the runners
    let (mut runner_tasks, mut runner_abort): (FuturesUnordered<_>, Vec<_>) = env
        .actor_runners()
        .await
        .into_iter()
        .map(|r| r.run(counter_tx.clone(), generation))
        .unzip();
    loop {
        // Wait for one of the runners, counter task or timer to return
        let either = select(test_end, runner_tasks.next()).await;
        match either {
            Either::Left((test_end_either, _next)) => {
                let reason = match test_end_either {
                    Either::Left(..) => "operation count",
                    Either::Right(..) => "timeout",
                };
                // The counter/timer task returned.
                // The target operation count was hit or the timer expired.
                // The test has completed.
                info!(%reason, "Stress test has completed!");
                for abort in runner_abort {
                    abort.abort();
                }
                // We don't care if tasks finished or were aborted, but we want them not running
                // anymore before we return.
                //
                // Runaway threads can cause problems if they're using objects from the main
                // executor, and it's generally a good idea to clean up after ourselves here.
                let () = runner_tasks.map(|_: Result<_, Aborted>| ()).collect().await;
                break;
            }
            Either::Right((None, _counter_task)) => {
                info!("No runners to operate");
                break;
            }
            Either::Right((Some(result), task)) => {
                let (runner, runner_generation) = result.expect("no tasks have been aborted");
                // Normally, actor runners run indefinitely.
                // However, one of the actor runners has returned.
                // This is because an actor has requested an environment reset.
                // Move the counter/timer back
                test_end = task;
                // Did the runner request a reset at the current generation?
                if runner_generation == generation {
                    // Reset the environment
                    info!("Resetting environment");
                    env.reset().await;
                    // Advance the generation
                    generation += 1;
                }
                // Restart this runner with the current generation
                let (task, abort) = runner.run(counter_tx.clone(), generation);
                runner_tasks.push(task);
                runner_abort.push(abort);
            }
        }
    }
}