stress_test/actor_runner.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::actor::{Actor, ActorError};
6use crate::counter::CounterTx;
7use fuchsia_async::{MonotonicInstant, Task, Timer};
8use futures::future::{AbortHandle, Abortable, Aborted};
9use futures::lock::Mutex;
10use log::debug;
11use std::sync::Arc;
12use std::time::Duration;
13
14/// The thread that runs an actor indefinitely
15#[derive(Clone)]
16pub struct ActorRunner {
17 // The name of this actor
18 pub name: String,
19
20 // The duration to wait between actor operations
21 pub delay: Option<Duration>,
22
23 // A mutable reference to the actor for this configuration.
24 // The runner will lock on the actor when it is performing an operation.
25 // The environment can lock on the actor during reset.
26 pub actor: Arc<Mutex<dyn Actor>>,
27}
28
29impl ActorRunner {
30 pub fn new<A: Actor>(
31 name: impl ToString,
32 delay: Option<Duration>,
33 actor: Arc<Mutex<A>>,
34 ) -> Self {
35 Self { name: name.to_string(), delay, actor: actor as Arc<Mutex<dyn Actor>> }
36 }
37
38 /// Run the actor in a new task indefinitely for the given generation.
39 /// The runner will stop if the actor requests an environment reset.
40 /// The amount of parallelism is determined by the caller's executor.
41 // TODO(https://fxbug.dev/42158958): Find a different way to set parallelism.
42 pub fn run(
43 self,
44 counter_tx: CounterTx,
45 generation: u64,
46 ) -> (Task<Result<(ActorRunner, u64), Aborted>>, AbortHandle) {
47 let (abort_handle, abort_registration) = AbortHandle::new_pair();
48 let fut = async move {
49 let mut local_count: u64 = 0;
50 loop {
51 if let Some(delay) = self.delay {
52 debug!(
53 generation:%,
54 name:% = self.name,
55 local_count:%,
56 sleep_duration:? = delay;
57 "Sleeping"
58 );
59 Timer::new(MonotonicInstant::after(delay.into())).await;
60 }
61
62 debug!(generation:%, name:% = self.name, local_count:%; "Performing...");
63
64 // Lock on the actor and perform. This prevents the environment from
65 // modifying the actor until the operation is complete.
66 let result = {
67 let mut actor = self.actor.lock().await;
68 actor.perform().await
69 };
70
71 match result {
72 Ok(()) => {
73 // Count this iteration towards the global count
74 let _ = counter_tx.unbounded_send(self.name.clone());
75 debug!(generation:%, name:% = self.name, local_count:%; "Done!");
76 }
77 Err(ActorError::DoNotCount) => {
78 // Do not count this iteration towards global count
79 }
80 Err(ActorError::ResetEnvironment) => {
81 // Actor needs environment to be reset. Stop the runner
82 debug!(
83 generation:%, name:% = self.name, local_count:%;
84 "Reset Environment!"
85 );
86 return (self, generation);
87 }
88 }
89
90 local_count += 1;
91 }
92 };
93 (Task::spawn(Abortable::new(fut, abort_registration)), abort_handle)
94 }
95}