// test_runners_elf_lib/test_server.rs
use crate::launcher::ComponentLauncher;
use async_trait::async_trait;
use fidl_fuchsia_test::{
self as ftest, Invocation, Result_ as TestResult, RunListenerProxy, Status,
};
use fuchsia_async as fasync;
use futures::future::{abortable, AbortHandle};
use futures::prelude::*;
use futures::TryStreamExt;
use log::{debug, error};
use std::sync::{Arc, Weak};
use test_runners_lib::cases::TestCaseInfo;
use test_runners_lib::elf::{Component, EnumeratedTestCases, KernelError, SuiteServer};
use test_runners_lib::errors::*;
use test_runners_lib::logs::SocketLogWriter;
/// Test suite server that runs tests through the launcher it owns.
#[derive(Default)]
pub struct TestServer<T>
where
    T: ComponentLauncher,
{
    /// Launcher used by `run_test` to start the test process.
    pub launcher: T,
}
/// Value handed to `get_parallel_count` by `run_tests` when the run options carry no
/// `parallel` setting.
static PARALLEL_DEFAULT: u16 = 1;
#[async_trait]
impl<T: 'static> SuiteServer for TestServer<T>
where
T: ComponentLauncher,
{
async fn enumerate_tests(
&self,
_test_component: Arc<Component>,
) -> Result<EnumeratedTestCases, EnumerationError> {
Ok(Arc::new(vec![TestCaseInfo { name: "main".to_string(), enabled: true }]))
}
async fn run_tests(
&self,
invocations: Vec<Invocation>,
run_options: ftest::RunOptions,
component: Arc<Component>,
run_listener: &RunListenerProxy,
) -> Result<(), RunTestError> {
let num_parallel =
Self::get_parallel_count(run_options.parallel.unwrap_or(PARALLEL_DEFAULT));
let invocations = stream::iter(invocations);
invocations
.map(Ok)
.try_for_each_concurrent(num_parallel, |invocation| {
self.run_test(invocation, &run_options, component.clone(), run_listener)
})
.await
}
fn run(
self,
weak_component: Weak<Component>,
test_url: &str,
stream: ftest::SuiteRequestStream,
) -> AbortHandle {
let test_url = test_url.to_owned();
let (fut, test_suite_abortable_handle) =
abortable(self.serve_test_suite(stream, weak_component));
fasync::Task::local(async move {
match fut.await {
Ok(result) => {
if let Err(e) = result {
error!("server failed for test {}: {:?}", test_url, e);
}
}
Err(e) => debug!("server aborted for test {}: {:?}", test_url, e),
}
debug!("Done running server for {}.", test_url);
})
.detach();
test_suite_abortable_handle
}
}
impl<T: 'static> TestServer<T>
where
T: ComponentLauncher,
{
pub fn new(launcher_: T) -> Self {
Self { launcher: launcher_ }
}
pub fn validate_args(_args: &Vec<String>) -> Result<(), ArgumentError> {
Ok(())
}
async fn run_test<'a>(
&'a self,
invocation: Invocation,
run_options: &ftest::RunOptions,
component: Arc<Component>,
run_listener: &RunListenerProxy,
) -> Result<(), RunTestError> {
if "main" != *invocation.name.as_ref().ok_or(RunTestError::TestCaseName)? {
return Ok(());
}
let (test_stdout, stdout_client) = zx::Socket::create_stream();
let (test_stderr, stderr_client) = zx::Socket::create_stream();
let (case_listener_proxy, listener) =
fidl::endpoints::create_proxy::<fidl_fuchsia_test::CaseListenerMarker>();
run_listener
.on_test_case_started(
&invocation,
ftest::StdHandles {
out: Some(stdout_client),
err: Some(stderr_client),
..Default::default()
},
listener,
)
.map_err(RunTestError::SendStart)?;
let mut test_stdout = SocketLogWriter::new(fasync::Socket::from_socket(test_stdout));
let mut test_stderr = SocketLogWriter::new(fasync::Socket::from_socket(test_stderr));
let mut args = component.args.clone();
if let Some(user_args) = &run_options.arguments {
args.extend(user_args.clone());
}
let (process, _job, stdout_logger, stderr_logger) =
self.launcher.launch_process(&component, args).await?;
let stdout_fut = stdout_logger.buffer_and_drain(&mut test_stdout);
let stderr_fut = stderr_logger.buffer_and_drain(&mut test_stderr);
let (ret1, ret2) = futures::future::join(stdout_fut, stderr_fut).await;
ret1?;
ret2?;
fasync::OnSignals::new(&process, zx::Signals::PROCESS_TERMINATED)
.await
.map_err(KernelError::ProcessExit)
.unwrap();
let process_info = process.info().map_err(RunTestError::ProcessInfo)?;
let status = match process_info.return_code {
0 => Status::Passed,
_ => Status::Failed,
};
case_listener_proxy
.finished(&TestResult { status: Some(status), ..Default::default() })
.map_err(RunTestError::SendFinish)?;
Ok(())
}
}