// stress_test/actor_runner.rs
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::actor::{Actor, ActorError};
use crate::counter::CounterTx;
use fuchsia_async::{MonotonicInstant, Task, Timer};
use futures::future::{AbortHandle, Abortable, Aborted};
use futures::lock::Mutex;
use log::debug;
use std::sync::Arc;
use std::time::Duration;
/// Drives a single actor: `ActorRunner::run` spawns a task that performs the
/// actor's operation in a loop until the actor requests an environment reset
/// or the task is aborted.
///
/// A clone of a runner refers to the same underlying actor, because the actor
/// is held behind an `Arc`.
#[derive(Clone)]
pub struct ActorRunner {
/// The name of this actor. It is attached to the runner's log lines and is
/// sent on the counter channel after each successful operation.
pub name: String,
/// The duration to wait before each actor operation. With `None` the runner
/// does not sleep between operations.
pub delay: Option<Duration>,
/// A shared, lockable handle to the actor for this configuration.
/// The runner will lock on the actor when it is performing an operation.
/// The environment can lock on the actor during reset.
pub actor: Arc<Mutex<dyn Actor>>,
}
impl ActorRunner {
    /// Builds a runner labelled `name` for `actor`, waiting `delay` (when one is
    /// given) ahead of every operation.
    ///
    /// The concrete actor type `A` is erased to `dyn Actor` when it is stored.
    pub fn new<A: Actor>(
        name: impl ToString,
        delay: Option<Duration>,
        actor: Arc<Mutex<A>>,
    ) -> Self {
        // The annotated binding performs the unsizing coercion to the trait object.
        let actor: Arc<Mutex<dyn Actor>> = actor;
        Self { name: name.to_string(), delay, actor }
    }

    /// Spawns a task that keeps performing the actor's operation for the given
    /// generation, and returns that task together with a handle that aborts it.
    ///
    /// The task resolves to `Ok((runner, generation))` once the actor requests
    /// an environment reset, or to `Err(Aborted)` if it is aborted through the
    /// returned handle before that happens.
    /// The amount of parallelism is determined by the caller's executor.
    // TODO(https://fxbug.dev/42158958): Find a different way to set parallelism.
    pub fn run(
        self,
        counter_tx: CounterTx,
        generation: u64,
    ) -> (Task<Result<(ActorRunner, u64), Aborted>>, AbortHandle) {
        let (stop_handle, stop_registration) = AbortHandle::new_pair();

        let work = async move {
            // Number of loop iterations completed so far; used for logging only.
            let mut local_count: u64 = 0;

            loop {
                // Optional pause ahead of every operation, including the first.
                if let Some(pause) = self.delay {
                    debug!(
                        generation:%,
                        name:% = self.name,
                        local_count:%,
                        sleep_duration:? = pause;
                        "Sleeping"
                    );
                    let wake_at = MonotonicInstant::after(pause.into());
                    Timer::new(wake_at).await;
                }

                debug!(generation:%, name:% = self.name, local_count:%; "Performing...");

                // The lock guard is a temporary that lives until the end of this
                // statement, so the environment cannot modify the actor while the
                // operation is in flight, and the lock is released right after.
                let outcome = self.actor.lock().await.perform().await;

                match outcome {
                    Err(ActorError::ResetEnvironment) => {
                        // The actor needs a fresh environment: stop looping and
                        // hand the runner back along with its generation.
                        debug!(
                            generation:%, name:% = self.name, local_count:%;
                            "Reset Environment!"
                        );
                        break (self, generation);
                    }
                    Err(ActorError::DoNotCount) => {
                        // This iteration is not reported to the global count.
                    }
                    Ok(()) => {
                        // Report this iteration to the global count. A failed send
                        // is ignored on purpose.
                        let _ = counter_tx.unbounded_send(self.name.clone());
                        debug!(generation:%, name:% = self.name, local_count:%; "Done!");
                    }
                }

                local_count += 1;
            }
        };

        let task = Task::spawn(Abortable::new(work, stop_registration));
        (task, stop_handle)
    }
}