stress_test/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod actor;
6pub mod environment;
7
8mod actor_runner;
9mod counter;
10
11use crate::counter::start_counter;
12use crate::environment::Environment;
13use fuchsia_async::{MonotonicInstant, Timer};
14use futures::future::{select, Aborted, Either};
15use futures::stream::FuturesUnordered;
16use futures::StreamExt;
17use log::{error, info};
18use rand::rngs::SmallRng;
19use rand::{Rng, SeedableRng};
20use std::pin::pin;
21use std::time::Duration;
22
23/// Use entropy to generate a random seed
24pub fn random_seed() -> u64 {
25    let mut temp_rng = SmallRng::from_entropy();
26    temp_rng.gen()
27}
28
29/// Runs the test loop for the given environment to completion.
30pub async fn run_test<E: 'static + Environment>(mut env: E) {
31    let env_string = format!("{:#?}", env);
32
33    info!("--------------------- stressor is starting -----------------------");
34    info!("{}", env_string);
35    info!("------------------------------------------------------------------");
36
37    {
38        // Setup a panic handler that prints out details of this invocation on crash
39        let default_panic_hook = std::panic::take_hook();
40        let custom_panic_hook = env.panic_hook();
41        std::panic::set_hook(Box::new(move |panic_info| {
42            error!("");
43            error!("--------------------- stressor has crashed -----------------------");
44            error!("{}", env_string);
45            error!("------------------------------------------------------------------");
46            error!("");
47            if let Some(hook) = &custom_panic_hook {
48                hook();
49            }
50            default_panic_hook(panic_info);
51        }));
52    }
53
54    // Extract the data from the environment
55    // Defaults:
56    // - target_operations: 2^64
57    // - timeout_secs: 24 hours
58    let target_operations = env.target_operations().unwrap_or(u64::MAX);
59    let timeout_secs = Duration::from_secs(env.timeout_seconds().unwrap_or(24 * 60 * 60));
60
61    // Start the counter thread
62    // The counter thread keeps track of the global operation count.
63    // Each actor will send a message to the counter thread when an operation is completed.
64    // When the target operation count is hit, the counter task exits.
65    let (counter_task, counter_tx) = start_counter(target_operations);
66
67    // Create a timeout task
68    let timeout = pin!(Timer::new(MonotonicInstant::after(timeout_secs.into())));
69    let mut test_end = pin!(select(counter_task, timeout));
70
71    // A monotonically increasing counter representing the current generation.
72    // On every environment reset, the generation is incremented.
73    let mut generation: u64 = 0;
74
75    // Start all the runners
76    let (mut runner_tasks, mut runner_abort): (FuturesUnordered<_>, Vec<_>) = env
77        .actor_runners()
78        .await
79        .into_iter()
80        .map(|r| r.run(counter_tx.clone(), generation))
81        .unzip();
82
83    loop {
84        // Wait for one of the runners, counter task or timer to return
85        let either = select(test_end, runner_tasks.next()).await;
86        match either {
87            Either::Left((test_end_either, _next)) => {
88                let reason = match test_end_either {
89                    Either::Left(..) => "operation count",
90                    Either::Right(..) => "timeout",
91                };
92
93                // The counter/timer task returned.
94                // The target operation count was hit or the timer expired.
95                // The test has completed.
96                info!(reason:%; "Stress test has completed!");
97                for abort in runner_abort {
98                    abort.abort();
99                }
100                // We don't care if tasks finished or were aborted, but we want them not running
101                // anymore before we return.
102                //
103                // Runaway threads can cause problems if they're using objects from the main
104                // executor, and it's generally a good idea to clean up after ourselves here.
105                let () = runner_tasks.map(|_: Result<_, Aborted>| ()).collect().await;
106                break;
107            }
108            Either::Right((None, _counter_task)) => {
109                info!("No runners to operate");
110                break;
111            }
112            Either::Right((Some(result), task)) => {
113                let (runner, runner_generation) = result.expect("no tasks have been aborted");
114                // Normally, actor runners run indefinitely.
115                // However, one of the actor runners has returned.
116                // This is because an actor has requested an environment reset.
117
118                // Move the counter/timer back
119                test_end = task;
120
121                // Did the runner request a reset at the current generation?
122                if runner_generation == generation {
123                    // Reset the environment
124                    info!("Resetting environment");
125                    env.reset().await;
126
127                    // Advance the generation
128                    generation += 1;
129                }
130
131                // Restart this runner with the current generation
132                let (task, abort) = runner.run(counter_tx.clone(), generation);
133                runner_tasks.push(task);
134                runner_abort.push(abort);
135            }
136        }
137    }
138}