stress_test_actor/actor.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use anyhow::{format_err, Result};
use fidl::endpoints::create_request_stream;
use fidl_fuchsia_stresstest::{
Action as FidlAction, ActionIteratorMarker, ActionIteratorRequest, ActorRequest,
ActorRequestStream, Error,
};
use fuchsia_component::server::ServiceFs;
use futures::future::BoxFuture;
use futures::{StreamExt, TryStreamExt};
use rand::rngs::SmallRng;
use rand::SeedableRng;
use rust_measure_tape_for_action::Measurable;
use zx::sys::ZX_CHANNEL_MAX_MSG_BYTES;
enum OutgoingProtocols {
Actor(ActorRequestStream),
}
/// This structure represents a single action that can be run by an actor.
/// These actions are exposed to the stress test runner over FIDL using the Actor protocol.
pub struct Action<D> {
/// The name of this action, as it will appear to the stress test runner
pub name: &'static str,
/// The function that will be invoked when this action is asked to execute.
/// This function returns a boxed future that will be awaited on to allow
/// async work to be done.
pub run: for<'a> fn(&'a mut D, SmallRng) -> BoxFuture<'a, Result<()>>,
}
impl<D> Action<D> {
/// Converts the rust object into its FIDL equivalent to be sent across the wire.
fn to_fidl(&self) -> FidlAction {
FidlAction { name: Some(self.name.to_string()), ..Default::default() }
}
}
/// This is an indefinite loop that is run inside a actor component. Clients use the Actor
/// protocol to run test actions.
///
/// This method will serve the Actor protocol over its outgoing directory and wait for
/// exactly one client to connect.
///
/// NOTE: This method takes and serves the process' outgoing directory handle.
/// This handle should not be taken before this method is invoked.
///
/// NOTE: The actor library expects exactly one connection to the Actor protocol in this method.
pub async fn actor_loop<D>(mut data: D, actions: Vec<Action<D>>) -> Result<()> {
let mut service_fs = ServiceFs::new();
service_fs.dir("svc").add_fidl_service(OutgoingProtocols::Actor);
service_fs.take_and_serve_directory_handle()?;
// Wait for a client to connect
let OutgoingProtocols::Actor(mut stream) = service_fs
.next()
.await
.ok_or_else(|| format_err!("Could not get next connection to Actor protocol"))?;
while let Some(request) = stream
.try_next()
.await
.map_err(|e| format_err!("FIDL error in call to Actor protocol: {}", e))?
{
match request {
ActorRequest::GetActions { responder } => {
let (client_end, mut stream) = create_request_stream::<ActionIteratorMarker>();
responder.send(client_end)?;
let actions: Vec<_> = actions.iter().map(|c| c.to_fidl()).collect();
let mut remaining_actions = &actions[..];
while let Some(ActionIteratorRequest::GetNext { responder }) =
stream.try_next().await?
{
let mut bytes_used: usize = 32;
let mut action_count = 0;
// Determine how many actions can be sent in a single FIDL message,
// accounting for the header and `Action` size.
for action in remaining_actions {
bytes_used += action.measure().num_bytes;
if bytes_used > ZX_CHANNEL_MAX_MSG_BYTES as usize {
break;
}
action_count += 1;
}
responder.send(&remaining_actions[..action_count])?;
remaining_actions = &remaining_actions[action_count..];
// There are no more actions left to return. The client has gotten
// an empty response, so they also know that there are no more actions.
// Close this channel.
if action_count == 0 {
break;
}
}
}
ActorRequest::Run { action_name, seed, responder } => {
if let Some(action) = actions.iter().find(|action| action.name == action_name) {
let rng = SmallRng::seed_from_u64(seed);
if let Err(e) = (action.run)(&mut data, rng).await {
// The action failed. Return the error chain as an unstructured
// error message.
let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
let chain = chain.join(" -> ");
responder.send(Some(&Error::ErrorString(chain)))?;
} else {
// The action succeeded
responder.send(None)?;
}
} else {
responder.send(Some(&Error::ErrorString("Invalid action name".to_string())))?;
}
}
}
}
Ok(())
}