stress_test_actor/
actor.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{format_err, Result};
6use fidl::endpoints::create_request_stream;
7use fidl_fuchsia_stresstest::{
8    Action as FidlAction, ActionIteratorMarker, ActionIteratorRequest, ActorRequest,
9    ActorRequestStream, Error,
10};
11use fuchsia_component::server::ServiceFs;
12use futures::future::BoxFuture;
13use futures::{StreamExt, TryStreamExt};
14use rand::rngs::SmallRng;
15use rand::SeedableRng;
16use rust_measure_tape_for_action::Measurable;
17use zx::sys::ZX_CHANNEL_MAX_MSG_BYTES;
18
19enum OutgoingProtocols {
20    Actor(ActorRequestStream),
21}
22
23/// This structure represents a single action that can be run by an actor.
24/// These actions are exposed to the stress test runner over FIDL using the Actor protocol.
25pub struct Action<D> {
26    /// The name of this action, as it will appear to the stress test runner
27    pub name: &'static str,
28
29    /// The function that will be invoked when this action is asked to execute.
30    /// This function returns a boxed future that will be awaited on to allow
31    /// async work to be done.
32    pub run: for<'a> fn(&'a mut D, SmallRng) -> BoxFuture<'a, Result<()>>,
33}
34
35impl<D> Action<D> {
36    /// Converts the rust object into its FIDL equivalent to be sent across the wire.
37    fn to_fidl(&self) -> FidlAction {
38        FidlAction { name: Some(self.name.to_string()), ..Default::default() }
39    }
40}
41
42/// This is an indefinite loop that is run inside a actor component. Clients use the Actor
43/// protocol to run test actions.
44///
45/// This method will serve the Actor protocol over its outgoing directory and wait for
46/// exactly one client to connect.
47///
48/// NOTE: This method takes and serves the process' outgoing directory handle.
49/// This handle should not be taken before this method is invoked.
50///
51/// NOTE: The actor library expects exactly one connection to the Actor protocol in this method.
52pub async fn actor_loop<D>(mut data: D, actions: Vec<Action<D>>) -> Result<()> {
53    let mut service_fs = ServiceFs::new();
54    service_fs.dir("svc").add_fidl_service(OutgoingProtocols::Actor);
55    service_fs.take_and_serve_directory_handle()?;
56
57    // Wait for a client to connect
58    let OutgoingProtocols::Actor(mut stream) = service_fs
59        .next()
60        .await
61        .ok_or_else(|| format_err!("Could not get next connection to Actor protocol"))?;
62
63    while let Some(request) = stream
64        .try_next()
65        .await
66        .map_err(|e| format_err!("FIDL error in call to Actor protocol: {}", e))?
67    {
68        match request {
69            ActorRequest::GetActions { responder } => {
70                let (client_end, mut stream) = create_request_stream::<ActionIteratorMarker>();
71                responder.send(client_end)?;
72
73                let actions: Vec<_> = actions.iter().map(|c| c.to_fidl()).collect();
74                let mut remaining_actions = &actions[..];
75
76                while let Some(ActionIteratorRequest::GetNext { responder }) =
77                    stream.try_next().await?
78                {
79                    let mut bytes_used: usize = 32;
80                    let mut action_count = 0;
81
82                    // Determine how many actions can be sent in a single FIDL message,
83                    // accounting for the header and `Action` size.
84                    for action in remaining_actions {
85                        bytes_used += action.measure().num_bytes;
86                        if bytes_used > ZX_CHANNEL_MAX_MSG_BYTES as usize {
87                            break;
88                        }
89                        action_count += 1;
90                    }
91                    responder.send(&remaining_actions[..action_count])?;
92                    remaining_actions = &remaining_actions[action_count..];
93
94                    // There are no more actions left to return. The client has gotten
95                    // an empty response, so they also know that there are no more actions.
96                    // Close this channel.
97                    if action_count == 0 {
98                        break;
99                    }
100                }
101            }
102            ActorRequest::Run { action_name, seed, responder } => {
103                if let Some(action) = actions.iter().find(|action| action.name == action_name) {
104                    let rng = SmallRng::seed_from_u64(seed);
105                    if let Err(e) = (action.run)(&mut data, rng).await {
106                        // The action failed. Return the error chain as an unstructured
107                        // error message.
108                        let chain: Vec<String> = e.chain().map(|c| c.to_string()).collect();
109                        let chain = chain.join(" -> ");
110                        responder.send(Some(&Error::ErrorString(chain)))?;
111                    } else {
112                        // The action succeeded
113                        responder.send(None)?;
114                    }
115                } else {
116                    responder.send(Some(&Error::ErrorString("Invalid action name".to_string())))?;
117                }
118            }
119        }
120    }
121
122    Ok(())
123}