1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use {
anyhow::{format_err, Result},
fidl::endpoints::create_request_stream,
fidl_fuchsia_stresstest::{
Action as FidlAction, ActionIteratorMarker, ActionIteratorRequest, ActorRequest,
ActorRequestStream, Error,
},
fuchsia_component::server::ServiceFs,
fuchsia_zircon::sys::ZX_CHANNEL_MAX_MSG_BYTES,
futures::{future::BoxFuture, StreamExt, TryStreamExt},
rand::{rngs::SmallRng, SeedableRng},
rust_measure_tape_for_action::Measurable,
};
/// Protocols this component publishes in its outgoing `svc` directory.
///
/// Each variant wraps the request stream of one incoming client connection.
enum OutgoingProtocols {
// A connection to the `Actor` protocol, carrying that client's requests.
Actor(ActorRequestStream),
}
/// A named operation that an actor can perform on its data.
///
/// `D` is the type of the actor-owned state that is lent mutably to the
/// action each time it runs.
pub struct Action<D> {
    /// Name of the action. It is published over FIDL and compared against
    /// the name supplied in a run request.
    pub name: &'static str,

    /// Function that performs the action.
    ///
    /// It receives mutable access to the actor's data together with a
    /// random number generator, and returns a boxed future that borrows the
    /// data for as long as the action is running. The future resolves to
    /// `Ok(())` on success or to an error describing the failure.
    pub run: for<'data> fn(&'data mut D, SmallRng) -> BoxFuture<'data, Result<()>>,
}
impl<D> Action<D> {
    /// Builds the FIDL table describing this action.
    ///
    /// Only the `name` field is populated; every other field keeps the value
    /// it has in `FidlAction::EMPTY`.
    fn to_fidl(&self) -> FidlAction {
        let name = Some(String::from(self.name));
        FidlAction { name, ..FidlAction::EMPTY }
    }
}
pub async fn actor_loop<D>(mut data: D, actions: Vec<Action<D>>) -> Result<()> {
let mut service_fs = ServiceFs::new();
service_fs.dir("svc").add_fidl_service(OutgoingProtocols::Actor);
service_fs.take_and_serve_directory_handle()?;
let OutgoingProtocols::Actor(mut stream) = service_fs
.next()
.await
.ok_or(format_err!("Could not get next connection to Actor protocol"))?;
while let Some(request) = stream
.try_next()
.await
.map_err(|e| format_err!("FIDL error in call to Actor protocol: {}", e))?
{
match request {
ActorRequest::GetActions { responder } => {
let (client_end, mut stream) = create_request_stream::<ActionIteratorMarker>()?;
responder.send(client_end)?;
let mut iter = actions.iter().map(|c| c.to_fidl());
while let Some(ActionIteratorRequest::GetNext { responder }) =
stream.try_next().await?
{
let mut bytes_used: usize = 32;
let mut action_count = 0;
for action in iter.clone() {
bytes_used += action.measure().num_bytes;
if bytes_used > ZX_CHANNEL_MAX_MSG_BYTES as usize {
break;
}
action_count += 1;
}
responder.send(&mut iter.by_ref().take(action_count))?;
if action_count == 0 {
break;
}
}
}
ActorRequest::Run { action_name, seed, responder } => {
if let Some(action) = actions.iter().find(|action| action.name == action_name) {
let rng = SmallRng::seed_from_u64(seed);
if let Err(e) = (action.run)(&mut data, rng).await {
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
let chain = chain.join(" -> ");
responder.send(Some(&mut Error::ErrorString(chain)))?;
} else {
responder.send(None)?;
}
} else {
responder
.send(Some(&mut Error::ErrorString("Invalid action name".to_string())))?;
}
}
}
}
Ok(())
}