test_runners_lib/elf/
server.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Shared traits and methods to be used in test suite servers for tests that are executed as ELF
//! components.

use crate::cases::TestCaseInfo;
use crate::elf::Component;
use crate::errors::{EnumerationError, RunTestError};
use async_trait::async_trait;
use futures::future::{AbortHandle, Shared};
use futures::lock::Mutex;
use futures::prelude::*;
use rust_measure_tape_for_case::Measurable as _;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use thiserror::Error;
use tracing::{error, warn};
use zx::sys::ZX_CHANNEL_MAX_MSG_BYTES;
use {fidl_fuchsia_test as ftest, fuchsia_async as fasync};

/// A pinned, boxed, `Send` future whose output is `Result<T, E>`.
pub type PinnedFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send>>;
/// [`Shared`] wrapper around `PinnedFuture<T, E>`. Can be cloned.
type SharedFuture<T, E> = Shared<PinnedFuture<T, E>>;
/// A reference-counted (`Arc`), lock-protected (`Mutex`) slot for an optional
/// `SharedFuture<T, E>`. The `Option` allows the slot to be filled in when the stored future is
/// first created.
pub type MemoizedFutureContainer<T, E> = Arc<Mutex<Option<SharedFuture<T, E>>>>;
/// Ordered list of `TestCaseInfo`s, shared behind an `Arc`.
pub type EnumeratedTestCases = Arc<Vec<TestCaseInfo>>;

/// Describes a test suite server for tests that are executed as ELF components.
#[async_trait]
pub trait SuiteServer: Sized + Sync + Send {
    /// Run this server.
    ///
    /// * `component`: Test component instance.
    /// * `test_url`: URL of test component.
    /// * `stream`: Stream to serve Suite protocol on.
    ///
    /// Returns abortable handle for suite server future.
    fn run(
        self,
        component: Weak<Component>,
        test_url: &str,
        stream: ftest::SuiteRequestStream,
    ) -> AbortHandle;

    /// Retrieves test information from the test binary.
    ///
    /// A cached list of test cases should be returned by cloning a
    /// `SharedFuture<EnumeratedTestCases, EnumerationError>` that is stored in the suite server
    /// struct.
    async fn enumerate_tests(
        &self,
        test_component: Arc<Component>,
    ) -> Result<EnumeratedTestCases, EnumerationError>;

    /// Runs requested tests and sends test events to the given listener.
    async fn run_tests(
        &self,
        invocations: Vec<ftest::Invocation>,
        run_options: ftest::RunOptions,
        test_component: Arc<Component>,
        run_listener: &ftest::RunListenerProxy,
    ) -> Result<(), RunTestError>;

    fn get_parallel_count(parallel: u16) -> usize {
        if parallel == 0 {
            warn!("Client passed number of concurrent tests as 0, setting it to 1.");
            return 1;
        }
        parallel.into()
    }

    /// Implements `fuchsia.test.Suite` service and runs test.
    async fn serve_test_suite(
        mut self,
        mut stream: ftest::SuiteRequestStream,
        component: Weak<Component>,
    ) -> Result<(), SuiteServerError> {
        while let Some(event) = stream.try_next().await.map_err(SuiteServerError::Stream)? {
            match event {
                ftest::SuiteRequest::GetTests { iterator, control_handle: _ } => {
                    let component = component.upgrade();
                    if component.is_none() {
                        // no component object, return, test has ended.
                        break;
                    }
                    let mut stream = iterator.into_stream();
                    let tests = self.enumerate_tests(component.unwrap()).await?;

                    fasync::Task::spawn(
                        async move {
                            let cases: Vec<_> = tests
                                .iter()
                                .map(|TestCaseInfo { name, enabled }| ftest::Case {
                                    name: Some(name.clone()),
                                    enabled: Some(*enabled),
                                    ..Default::default()
                                })
                                .collect();
                            let mut remaining_cases = &cases[..];
                            while let Some(ftest::CaseIteratorRequest::GetNext { responder }) =
                                stream.try_next().await?
                            {
                                // Paginate cases
                                // Page overhead of message header + vector
                                let mut bytes_used: usize = 32;
                                let mut case_count = 0;
                                for case in remaining_cases {
                                    bytes_used += case.measure().num_bytes;
                                    if bytes_used > ZX_CHANNEL_MAX_MSG_BYTES as usize {
                                        break;
                                    }
                                    case_count += 1;
                                }
                                responder
                                    .send(&remaining_cases[..case_count])
                                    .map_err(SuiteServerError::Response)?;
                                remaining_cases = &remaining_cases[case_count..];
                            }
                            Ok(())
                        }
                        .unwrap_or_else(|e: anyhow::Error| error!("error serving tests: {:?}", e)),
                    )
                    .detach();
                }
                ftest::SuiteRequest::Run { tests, options, listener, .. } => {
                    let component = component.upgrade();
                    if component.is_none() {
                        // no component object, return, test has ended.
                        break;
                    }

                    let listener = listener.into_proxy();

                    self.run_tests(tests, options, component.unwrap(), &listener).await?;
                    listener.on_finished().map_err(RunTestError::SendFinishAllTests).unwrap();
                }
            }
        }
        Ok(())
    }
}

/// Error encountered while running suite server.
#[derive(Debug, Error)]
pub enum SuiteServerError {
    /// Test case enumeration failed; carries the underlying `EnumerationError`.
    #[error("test enumeration failed: {:?}", _0)]
    Enumeration(crate::errors::EnumerationError),

    /// Running the tests failed; carries the underlying `RunTestError`.
    #[error("error running test: {:?}", _0)]
    RunTest(crate::errors::RunTestError),

    /// Reading the next request from the suite request stream failed.
    #[error("stream failed: {:?}", _0)]
    Stream(fidl::Error),

    /// Sending a FIDL response back to the client failed.
    #[error("Cannot send fidl response: {:?}", _0)]
    Response(fidl::Error),
}

impl From<EnumerationError> for SuiteServerError {
    fn from(error: crate::errors::EnumerationError) -> Self {
        SuiteServerError::Enumeration(error)
    }
}

impl From<RunTestError> for SuiteServerError {
    fn from(error: crate::errors::RunTestError) -> Self {
        SuiteServerError::RunTest(error)
    }
}

/// Error encountered while working with the FIDL library.
#[derive(Debug, Error)]
pub enum FidlError {
    /// A proxy could not be converted to a channel. Carries no underlying error.
    #[error("cannot convert proxy to channel")]
    ProxyToChannel,

    /// A client end could not be converted to a proxy; carries the FIDL error.
    #[error("cannot convert client end to proxy: {:?}", _0)]
    ClientEndToProxy(fidl::Error),

    /// A FIDL proxy could not be created; carries the FIDL error.
    #[error("cannot create fidl proxy: {:?}", _0)]
    CreateProxy(fidl::Error),
}

/// Error encountered while working with kernel object.
///
/// Every variant carries the `zx::Status` reported for the failed operation.
#[derive(Debug, Error)]
pub enum KernelError {
    /// Creating a job failed.
    #[error("job creation failed: {:?}", _0)]
    CreateJob(zx::Status),

    /// Waiting for the test process to exit failed.
    #[error("error waiting for test process to exit: {:?}", _0)]
    ProcessExit(zx::Status),

    /// Getting info from a process failed.
    #[error("error getting info from process: {:?}", _0)]
    ProcessInfo(zx::Status),

    /// Creating a socket failed.
    #[error("error creating socket: {:?}", _0)]
    CreateSocket(zx::Status),

    /// Converting a zircon socket to an async socket failed.
    #[error("cannot convert zircon socket to async socket: {:?}", _0)]
    SocketToAsync(zx::Status),
}