1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45use anyhow::{format_err, Result};
6use fidl::endpoints::create_request_stream;
7use fidl_fuchsia_stresstest::{
8 Action as FidlAction, ActionIteratorMarker, ActionIteratorRequest, ActorRequest,
9 ActorRequestStream, Error,
10};
11use fuchsia_component::server::ServiceFs;
12use futures::future::BoxFuture;
13use futures::{StreamExt, TryStreamExt};
14use rand::rngs::SmallRng;
15use rand::SeedableRng;
16use rust_measure_tape_for_action::Measurable;
17use zx::sys::ZX_CHANNEL_MAX_MSG_BYTES;
1819enum OutgoingProtocols {
20 Actor(ActorRequestStream),
21}
2223/// This structure represents a single action that can be run by an actor.
24/// These actions are exposed to the stress test runner over FIDL using the Actor protocol.
25pub struct Action<D> {
26/// The name of this action, as it will appear to the stress test runner
27pub name: &'static str,
2829/// The function that will be invoked when this action is asked to execute.
30 /// This function returns a boxed future that will be awaited on to allow
31 /// async work to be done.
32pub run: for<'a> fn(&'a mut D, SmallRng) -> BoxFuture<'a, Result<()>>,
33}
3435impl<D> Action<D> {
36/// Converts the rust object into its FIDL equivalent to be sent across the wire.
37fn to_fidl(&self) -> FidlAction {
38 FidlAction { name: Some(self.name.to_string()), ..Default::default() }
39 }
40}
4142/// This is an indefinite loop that is run inside a actor component. Clients use the Actor
43/// protocol to run test actions.
44///
45/// This method will serve the Actor protocol over its outgoing directory and wait for
46/// exactly one client to connect.
47///
48/// NOTE: This method takes and serves the process' outgoing directory handle.
49/// This handle should not be taken before this method is invoked.
50///
51/// NOTE: The actor library expects exactly one connection to the Actor protocol in this method.
52pub async fn actor_loop<D>(mut data: D, actions: Vec<Action<D>>) -> Result<()> {
53let mut service_fs = ServiceFs::new();
54 service_fs.dir("svc").add_fidl_service(OutgoingProtocols::Actor);
55 service_fs.take_and_serve_directory_handle()?;
5657// Wait for a client to connect
58let OutgoingProtocols::Actor(mut stream) = service_fs
59 .next()
60 .await
61.ok_or_else(|| format_err!("Could not get next connection to Actor protocol"))?;
6263while let Some(request) = stream
64 .try_next()
65 .await
66.map_err(|e| format_err!("FIDL error in call to Actor protocol: {}", e))?
67{
68match request {
69 ActorRequest::GetActions { responder } => {
70let (client_end, mut stream) = create_request_stream::<ActionIteratorMarker>();
71 responder.send(client_end)?;
7273let actions: Vec<_> = actions.iter().map(|c| c.to_fidl()).collect();
74let mut remaining_actions = &actions[..];
7576while let Some(ActionIteratorRequest::GetNext { responder }) =
77 stream.try_next().await?
78{
79let mut bytes_used: usize = 32;
80let mut action_count = 0;
8182// Determine how many actions can be sent in a single FIDL message,
83 // accounting for the header and `Action` size.
84for action in remaining_actions {
85 bytes_used += action.measure().num_bytes;
86if bytes_used > ZX_CHANNEL_MAX_MSG_BYTES as usize {
87break;
88 }
89 action_count += 1;
90 }
91 responder.send(&remaining_actions[..action_count])?;
92 remaining_actions = &remaining_actions[action_count..];
9394// There are no more actions left to return. The client has gotten
95 // an empty response, so they also know that there are no more actions.
96 // Close this channel.
97if action_count == 0 {
98break;
99 }
100 }
101 }
102 ActorRequest::Run { action_name, seed, responder } => {
103if let Some(action) = actions.iter().find(|action| action.name == action_name) {
104let rng = SmallRng::seed_from_u64(seed);
105if let Err(e) = (action.run)(&mut data, rng).await {
106// The action failed. Return the error chain as an unstructured
107 // error message.
108let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
109let chain = chain.join(" -> ");
110 responder.send(Some(&Error::ErrorString(chain)))?;
111 } else {
112// The action succeeded
113responder.send(None)?;
114 }
115 } else {
116 responder.send(Some(&Error::ErrorString("Invalid action name".to_string())))?;
117 }
118 }
119 }
120 }
121122Ok(())
123}