stress_test/
actor_runner.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::actor::{Actor, ActorError};
6use crate::counter::CounterTx;
7use fuchsia_async::{MonotonicInstant, Task, Timer};
8use futures::future::{AbortHandle, Abortable, Aborted};
9use futures::lock::Mutex;
10use log::debug;
11use std::sync::Arc;
12use std::time::Duration;
13
14/// The thread that runs an actor indefinitely
15#[derive(Clone)]
16pub struct ActorRunner {
17    // The name of this actor
18    pub name: String,
19
20    // The duration to wait between actor operations
21    pub delay: Option<Duration>,
22
23    // A mutable reference to the actor for this configuration.
24    // The runner will lock on the actor when it is performing an operation.
25    // The environment can lock on the actor during reset.
26    pub actor: Arc<Mutex<dyn Actor>>,
27}
28
29impl ActorRunner {
30    pub fn new<A: Actor>(
31        name: impl ToString,
32        delay: Option<Duration>,
33        actor: Arc<Mutex<A>>,
34    ) -> Self {
35        Self { name: name.to_string(), delay, actor: actor as Arc<Mutex<dyn Actor>> }
36    }
37
38    /// Run the actor in a new task indefinitely for the given generation.
39    /// The runner will stop if the actor requests an environment reset.
40    /// The amount of parallelism is determined by the caller's executor.
41    // TODO(https://fxbug.dev/42158958): Find a different way to set parallelism.
42    pub fn run(
43        self,
44        counter_tx: CounterTx,
45        generation: u64,
46    ) -> (Task<Result<(ActorRunner, u64), Aborted>>, AbortHandle) {
47        let (abort_handle, abort_registration) = AbortHandle::new_pair();
48        let fut = async move {
49            let mut local_count: u64 = 0;
50            loop {
51                if let Some(delay) = self.delay {
52                    debug!(
53                        generation:%,
54                        name:% = self.name,
55                        local_count:%,
56                        sleep_duration:? = delay;
57                        "Sleeping"
58                    );
59                    Timer::new(MonotonicInstant::after(delay.into())).await;
60                }
61
62                debug!(generation:%, name:% = self.name, local_count:%; "Performing...");
63
64                // Lock on the actor and perform. This prevents the environment from
65                // modifying the actor until the operation is complete.
66                let result = {
67                    let mut actor = self.actor.lock().await;
68                    actor.perform().await
69                };
70
71                match result {
72                    Ok(()) => {
73                        // Count this iteration towards the global count
74                        let _ = counter_tx.unbounded_send(self.name.clone());
75                        debug!(generation:%, name:% = self.name, local_count:%; "Done!");
76                    }
77                    Err(ActorError::DoNotCount) => {
78                        // Do not count this iteration towards global count
79                    }
80                    Err(ActorError::ResetEnvironment) => {
81                        // Actor needs environment to be reset. Stop the runner
82                        debug!(
83                            generation:%, name:% = self.name, local_count:%;
84                            "Reset Environment!"
85                        );
86                        return (self, generation);
87                    }
88                }
89
90                local_count += 1;
91            }
92        };
93        (Task::spawn(Abortable::new(fut, abort_registration)), abort_handle)
94    }
95}