// test_runners_lib/elf/server.rs
use crate::cases::TestCaseInfo;
use crate::elf::Component;
use crate::errors::{EnumerationError, RunTestError};
use async_trait::async_trait;
use futures::future::{AbortHandle, Shared};
use futures::lock::Mutex;
use futures::prelude::*;
use rust_measure_tape_for_case::Measurable as _;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use thiserror::Error;
use tracing::{error, warn};
use zx::sys::ZX_CHANNEL_MAX_MSG_BYTES;
use {fidl_fuchsia_test as ftest, fuchsia_async as fasync};
/// A heap-allocated, pinned, `Send` future that resolves to `Result<T, E>`.
pub type PinnedFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send>>;
/// A [`PinnedFuture`] wrapped in [`Shared`]. Private to this module.
type SharedFuture<T, E> = Shared<PinnedFuture<T, E>>;
/// A reference-counted, async-mutex-guarded slot that optionally holds a
/// shared future resolving to `Result<T, E>`.
// NOTE(review): the name suggests this caches an in-flight/finished
// computation so it runs only once — confirm against the users of this type.
pub type MemoizedFutureContainer<T, E> = Arc<Mutex<Option<SharedFuture<T, E>>>>;
/// A reference-counted list of the test cases reported by
/// `SuiteServer::enumerate_tests`.
pub type EnumeratedTestCases = Arc<Vec<TestCaseInfo>>;
/// Server side of the `fuchsia.test` Suite protocol for a test component.
///
/// Implementors provide test enumeration and execution; the provided
/// [`SuiteServer::serve_test_suite`] method drives a `SuiteRequestStream` and
/// dispatches each request to those hooks.
#[async_trait]
pub trait SuiteServer: Sized + Sync + Send {
    /// Starts serving `stream` for the test identified by `test_url`.
    ///
    /// `component` is held weakly; the returned [`AbortHandle`] is produced by
    /// the implementor.
    // NOTE(review): presumably the handle aborts the serving task — confirm
    // against the implementations of this method.
    fn run(
        self,
        component: Weak<Component>,
        test_url: &str,
        stream: ftest::SuiteRequestStream,
    ) -> AbortHandle;

    /// Enumerates the test cases exposed by `test_component`.
    ///
    /// # Errors
    ///
    /// Returns an [`EnumerationError`] when the implementor cannot produce the
    /// list of cases.
    async fn enumerate_tests(
        &self,
        test_component: Arc<Component>,
    ) -> Result<EnumeratedTestCases, EnumerationError>;

    /// Runs `invocations` against `test_component` with `run_options`,
    /// reporting through `run_listener`.
    ///
    /// # Errors
    ///
    /// Returns a [`RunTestError`] when the implementor fails to run the tests.
    async fn run_tests(
        &self,
        invocations: Vec<ftest::Invocation>,
        run_options: ftest::RunOptions,
        test_component: Arc<Component>,
        run_listener: &ftest::RunListenerProxy,
    ) -> Result<(), RunTestError>;

    /// Converts the client-supplied parallelism into a usable count.
    ///
    /// A value of `0` is replaced by `1` (and a warning is logged); any other
    /// value is returned unchanged as a `usize`.
    fn get_parallel_count(parallel: u16) -> usize {
        if parallel == 0 {
            warn!("Client passed number of concurrent tests as 0, setting it to 1.");
            1
        } else {
            parallel.into()
        }
    }

    /// Serves Suite requests from `stream` until the stream ends, an error
    /// occurs, or `component` can no longer be upgraded.
    ///
    /// * `GetTests` enumerates the cases, then spawns a detached task that
    ///   pages them out through the case iterator.
    /// * `Run` executes the requested invocations and then notifies the
    ///   listener that the run has finished.
    ///
    /// # Errors
    ///
    /// Returns a [`SuiteServerError`] when reading the request stream fails,
    /// when enumeration or running tests fails, or when the final
    /// `on_finished` notification cannot be sent to the listener.
    async fn serve_test_suite(
        mut self,
        mut stream: ftest::SuiteRequestStream,
        component: Weak<Component>,
    ) -> Result<(), SuiteServerError> {
        while let Some(event) = stream.try_next().await.map_err(SuiteServerError::Stream)? {
            match event {
                ftest::SuiteRequest::GetTests { iterator, control_handle: _ } => {
                    // The component is gone: stop serving.
                    let test_component = match component.upgrade() {
                        Some(test_component) => test_component,
                        None => break,
                    };
                    let mut stream = iterator.into_stream();
                    let tests = self.enumerate_tests(test_component).await?;

                    // Paging is driven by the client, so serve it from a
                    // detached task instead of blocking the request loop.
                    fasync::Task::spawn(
                        async move {
                            let cases: Vec<_> = tests
                                .iter()
                                .map(|TestCaseInfo { name, enabled }| ftest::Case {
                                    name: Some(name.clone()),
                                    enabled: Some(*enabled),
                                    ..Default::default()
                                })
                                .collect();
                            let mut remaining_cases = &cases[..];
                            while let Some(ftest::CaseIteratorRequest::GetNext { responder }) =
                                stream.try_next().await?
                            {
                                // Starting budget of 32 bytes — presumably the FIDL
                                // message header plus the vector header; TODO confirm.
                                let mut bytes_used: usize = 32;
                                let mut case_count = 0;
                                // Take as many cases as fit in one channel message.
                                // NOTE(review): a case that alone exceeds the limit
                                // yields an empty page and is never advanced past.
                                for case in remaining_cases {
                                    bytes_used += case.measure().num_bytes;
                                    if bytes_used > ZX_CHANNEL_MAX_MSG_BYTES as usize {
                                        break;
                                    }
                                    case_count += 1;
                                }
                                responder
                                    .send(&remaining_cases[..case_count])
                                    .map_err(SuiteServerError::Response)?;
                                remaining_cases = &remaining_cases[case_count..];
                            }
                            Ok(())
                        }
                        .unwrap_or_else(|e: anyhow::Error| error!("error serving tests: {:?}", e)),
                    )
                    .detach();
                }
                ftest::SuiteRequest::Run { tests, options, listener, .. } => {
                    // The component is gone: stop serving.
                    let test_component = match component.upgrade() {
                        Some(test_component) => test_component,
                        None => break,
                    };
                    let listener = listener.into_proxy();
                    self.run_tests(tests, options, test_component, &listener).await?;
                    // A failed send (e.g. the client closed the listener) is
                    // reported as an error rather than panicking the runner.
                    listener.on_finished().map_err(RunTestError::SendFinishAllTests)?;
                }
            }
        }
        Ok(())
    }
}
#[derive(Debug, Error)]
pub enum SuiteServerError {
#[error("test enumeration failed: {:?}", _0)]
Enumeration(crate::errors::EnumerationError),
#[error("error running test: {:?}", _0)]
RunTest(crate::errors::RunTestError),
#[error("stream failed: {:?}", _0)]
Stream(fidl::Error),
#[error("Cannot send fidl response: {:?}", _0)]
Response(fidl::Error),
}
impl From<EnumerationError> for SuiteServerError {
fn from(error: crate::errors::EnumerationError) -> Self {
SuiteServerError::Enumeration(error)
}
}
impl From<RunTestError> for SuiteServerError {
fn from(error: crate::errors::RunTestError) -> Self {
SuiteServerError::RunTest(error)
}
}
/// Errors produced when converting between FIDL proxies, client ends and
/// channels, or when creating a proxy.
#[derive(Debug, Error)]
pub enum FidlError {
/// A proxy could not be converted into a channel.
#[error("cannot convert proxy to channel")]
ProxyToChannel,
/// A client end could not be converted into a proxy; carries the FIDL error.
#[error("cannot convert client end to proxy: {:?}", _0)]
ClientEndToProxy(fidl::Error),
/// A FIDL proxy could not be created; carries the FIDL error.
#[error("cannot create fidl proxy: {:?}", _0)]
CreateProxy(fidl::Error),
}
/// Errors from kernel-object operations; every variant carries the
/// `zx::Status` of the failed operation.
#[derive(Debug, Error)]
pub enum KernelError {
/// Creating a job failed.
#[error("job creation failed: {:?}", _0)]
CreateJob(zx::Status),
/// Waiting for the test process to exit failed.
#[error("error waiting for test process to exit: {:?}", _0)]
ProcessExit(zx::Status),
/// Querying information from the process failed.
#[error("error getting info from process: {:?}", _0)]
ProcessInfo(zx::Status),
/// Creating a socket failed.
#[error("error creating socket: {:?}", _0)]
CreateSocket(zx::Status),
/// Converting a zircon socket into an async socket failed.
#[error("cannot convert zircon socket to async socket: {:?}", _0)]
SocketToAsync(zx::Status),
}