run_test_suite_lib/
running_suite.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::cancel::{Cancelled, NamedFutureExt, OrCancel};
6use crate::outcome::{Lifecycle, Outcome, RunTestSuiteError, UnexpectedEventError};
7use crate::output::{self, ArtifactType, CaseId, SuiteReporter, Timestamp};
8use crate::stream_util::StreamUtil;
9use crate::trace::duration;
10use crate::{artifacts, diagnostics};
11use diagnostics_data::Severity;
12use fidl_fuchsia_test_manager::{
13    self as ftest_manager, LaunchError, SuiteArtifactGeneratedEventDetails,
14    SuiteStoppedEventDetails, TestCaseArtifactGeneratedEventDetails, TestCaseFinishedEventDetails,
15    TestCaseFoundEventDetails, TestCaseStartedEventDetails, TestCaseStoppedEventDetails,
16};
17use fuchsia_async as fasync;
18use futures::StreamExt;
19use futures::future::Either;
20use futures::prelude::*;
21use futures::stream::FuturesUnordered;
22use log::{error, info, warn};
23use std::collections::HashMap;
24use std::io::Write;
25use std::sync::Arc;
26use std::sync::atomic::{AtomicBool, Ordering};
27
28/// Struct used by |run_suite_and_collect_logs| to track the state of test cases and suites.
29struct CollectedEntityState<R> {
30    reporter: R,
31    name: String,
32    lifecycle: Lifecycle,
33    artifact_tasks:
34        Vec<fasync::Task<Result<Option<diagnostics::LogCollectionOutcome>, anyhow::Error>>>,
35}
36
37/// Collects results and artifacts for a single suite.
38// TODO(satsukiu): There's two ways to return an error here:
39// * Err(RunTestSuiteError)
40// * Ok(Outcome::Error(RunTestSuiteError))
41// We should consider how to consolidate these.
42pub(crate) async fn run_suite_and_collect_logs<F: Future<Output = ()> + Unpin>(
43    running_suite: RunningSuite,
44    suite_reporter: &SuiteReporter<'_>,
45    log_display: diagnostics::LogDisplayConfiguration,
46    cancel_fut: F,
47) -> Result<Outcome, RunTestSuiteError> {
48    duration!("collect_suite");
49
50    let RunningSuite {
51        mut event_stream,
52        timeout,
53        timeout_grace,
54        max_severity_logs,
55        no_cases_equals_success,
56        ..
57    } = running_suite;
58
59    let log_opts =
60        diagnostics::LogCollectionOptions { format: log_display, max_severity: max_severity_logs };
61
62    let mut test_cases: HashMap<u32, CollectedEntityState<_>> = HashMap::new();
63    let mut suite_state = CollectedEntityState {
64        reporter: suite_reporter,
65        name: "".to_string(),
66        lifecycle: Lifecycle::Found,
67        artifact_tasks: vec![],
68    };
69    let mut suite_finish_timestamp = Timestamp::Unknown;
70    let mut outcome = Outcome::Passed;
71
72    let collect_results_fut = async {
73        while let Some(event_result) = event_stream.next().named("next_event").await {
74            match event_result {
75                Err(e) => match (e, no_cases_equals_success) {
76                    (RunTestSuiteError::Launch(LaunchError::NoMatchingCases), Some(true)) => {
77                        suite_state.lifecycle = Lifecycle::Stopped;
78                    }
79                    (e, _) => {
80                        suite_state
81                            .reporter
82                            .stopped(&output::ReportedOutcome::Error, Timestamp::Unknown)?;
83                        return Err(e);
84                    }
85                },
86                Ok(event) => {
87                    let timestamp = Timestamp::from_nanos(event.timestamp);
88                    match event.details.expect("event cannot be None") {
89                        ftest_manager::EventDetails::TestCaseFound(TestCaseFoundEventDetails {
90                            test_case_name: Some(test_case_name),
91                            test_case_id: Some(test_case_id),
92                            ..
93                        }) => {
94                            if test_cases.contains_key(&test_case_id) {
95                                return Err(UnexpectedEventError::InvalidCaseEvent {
96                                    last_state: Lifecycle::Found,
97                                    next_state: Lifecycle::Found,
98                                    test_case_name,
99                                    test_case_id,
100                                }
101                                .into());
102                            }
103                            test_cases.insert(
104                                test_case_id,
105                                CollectedEntityState {
106                                    reporter: suite_reporter
107                                        .new_case(&test_case_name, &CaseId(test_case_id))?,
108                                    name: test_case_name,
109                                    lifecycle: Lifecycle::Found,
110                                    artifact_tasks: vec![],
111                                },
112                            );
113                        }
114                        ftest_manager::EventDetails::TestCaseStarted(
115                            TestCaseStartedEventDetails {
116                                test_case_id: Some(test_case_id), ..
117                            },
118                        ) => {
119                            let entry = test_cases.get_mut(&test_case_id).ok_or(
120                                UnexpectedEventError::CaseEventButNotFound {
121                                    next_state: Lifecycle::Started,
122                                    test_case_id,
123                                },
124                            )?;
125                            match &entry.lifecycle {
126                                Lifecycle::Found => {
127                                    // TODO(https://fxbug.dev/42159975): Record per-case runtime once we have an
128                                    // accurate way to measure it.
129                                    entry.reporter.started(Timestamp::Unknown)?;
130                                    entry.lifecycle = Lifecycle::Started;
131                                }
132                                other => {
133                                    return Err(UnexpectedEventError::InvalidCaseEvent {
134                                        last_state: *other,
135                                        next_state: Lifecycle::Started,
136                                        test_case_name: entry.name.clone(),
137                                        test_case_id,
138                                    }
139                                    .into());
140                                }
141                            }
142                        }
143                        ftest_manager::EventDetails::TestCaseArtifactGenerated(
144                            TestCaseArtifactGeneratedEventDetails {
145                                test_case_id: Some(test_case_id),
146                                artifact: Some(artifact),
147                                ..
148                            },
149                        ) => {
150                            let entry = test_cases.get_mut(&test_case_id).ok_or(
151                                UnexpectedEventError::CaseArtifactButNotFound { test_case_id },
152                            )?;
153                            if matches!(entry.lifecycle, Lifecycle::Finished) {
154                                return Err(UnexpectedEventError::CaseArtifactButFinished {
155                                    test_case_id,
156                                }
157                                .into());
158                            }
159                            let artifact_fut = artifacts::drain_artifact(
160                                &entry.reporter,
161                                artifact,
162                                log_opts.clone(),
163                            )
164                            .await?;
165                            entry.artifact_tasks.push(fasync::Task::spawn(artifact_fut));
166                        }
167                        ftest_manager::EventDetails::TestCaseStopped(
168                            TestCaseStoppedEventDetails {
169                                test_case_id: Some(test_case_id),
170                                result: Some(result),
171                                ..
172                            },
173                        ) => {
174                            let entry = test_cases.get_mut(&test_case_id).ok_or(
175                                UnexpectedEventError::CaseEventButNotFound {
176                                    next_state: Lifecycle::Stopped,
177                                    test_case_id,
178                                },
179                            )?;
180                            match &entry.lifecycle {
181                                Lifecycle::Started => {
182                                    // TODO(https://fxbug.dev/42159975): Record per-case runtime once we have an
183                                    // accurate way to measure it.
184                                    entry.reporter.stopped(&result.into(), Timestamp::Unknown)?;
185                                    entry.lifecycle = Lifecycle::Stopped;
186                                }
187                                other => {
188                                    return Err(UnexpectedEventError::InvalidCaseEvent {
189                                        last_state: *other,
190                                        next_state: Lifecycle::Stopped,
191                                        test_case_name: entry.name.clone(),
192                                        test_case_id,
193                                    }
194                                    .into());
195                                }
196                            }
197                        }
198                        ftest_manager::EventDetails::TestCaseFinished(
199                            TestCaseFinishedEventDetails {
200                                test_case_id: Some(test_case_id), ..
201                            },
202                        ) => {
203                            let entry = test_cases.get_mut(&test_case_id).ok_or(
204                                UnexpectedEventError::CaseEventButNotFound {
205                                    next_state: Lifecycle::Finished,
206                                    test_case_id,
207                                },
208                            )?;
209                            match &entry.lifecycle {
210                                Lifecycle::Stopped => {
211                                    // don't mark reporter finished yet, we want to finish draining
212                                    // artifacts separately.
213                                    entry.lifecycle = Lifecycle::Finished;
214                                }
215                                other => {
216                                    return Err(UnexpectedEventError::InvalidCaseEvent {
217                                        last_state: *other,
218                                        next_state: Lifecycle::Finished,
219                                        test_case_name: entry.name.clone(),
220                                        test_case_id,
221                                    }
222                                    .into());
223                                }
224                            }
225                        }
226                        ftest_manager::EventDetails::SuiteArtifactGenerated(
227                            SuiteArtifactGeneratedEventDetails { artifact: Some(artifact), .. },
228                        ) => {
229                            let artifact_fut = artifacts::drain_artifact(
230                                suite_reporter,
231                                artifact,
232                                log_opts.clone(),
233                            )
234                            .await?;
235                            suite_state.artifact_tasks.push(fasync::Task::spawn(artifact_fut));
236                        }
237                        ftest_manager::EventDetails::SuiteStarted(_) => {
238                            match &suite_state.lifecycle {
239                                Lifecycle::Found => {
240                                    suite_state.reporter.started(timestamp)?;
241                                    suite_state.lifecycle = Lifecycle::Started;
242                                }
243                                other => {
244                                    return Err(UnexpectedEventError::InvalidEvent {
245                                        last_state: *other,
246                                        next_state: Lifecycle::Started,
247                                    }
248                                    .into());
249                                }
250                            }
251                        }
252                        ftest_manager::EventDetails::SuiteStopped(SuiteStoppedEventDetails {
253                            result: Some(result),
254                            ..
255                        }) => match &suite_state.lifecycle {
256                            Lifecycle::Started => {
257                                suite_state.lifecycle = Lifecycle::Stopped;
258                                suite_finish_timestamp = timestamp;
259                                outcome = match result {
260                                    ftest_manager::SuiteResult::Finished => Outcome::Passed,
261                                    ftest_manager::SuiteResult::Failed => Outcome::Failed,
262                                    ftest_manager::SuiteResult::DidNotFinish => {
263                                        Outcome::Inconclusive
264                                    }
265                                    ftest_manager::SuiteResult::TimedOut
266                                    | ftest_manager::SuiteResult::Stopped => Outcome::Timedout,
267                                    ftest_manager::SuiteResult::InternalError => Outcome::error(
268                                        UnexpectedEventError::InternalErrorSuiteResult,
269                                    ),
270                                    r => {
271                                        return Err(
272                                            UnexpectedEventError::UnrecognizedSuiteResult {
273                                                result: r,
274                                            }
275                                            .into(),
276                                        );
277                                    }
278                                };
279                            }
280                            other => {
281                                return Err(UnexpectedEventError::InvalidEvent {
282                                    last_state: *other,
283                                    next_state: Lifecycle::Stopped,
284                                }
285                                .into());
286                            }
287                        },
288                        ftest_manager::EventDetailsUnknown!() => {
289                            warn!("Encountered unrecognized suite event");
290                        }
291                    }
292                }
293            }
294        }
295        drop(event_stream); // Explicit drop here to force ownership move.
296        Ok(())
297    }
298    .boxed_local();
299
300    let start_time = std::time::Instant::now();
301    let kill_timeout_future = match timeout {
302        None => futures::future::pending::<()>().boxed(),
303        Some(duration) => fasync::Timer::new(start_time + duration + timeout_grace).boxed(),
304    };
305
306    // If kill timeout or cancel occur, we want to stop polling events.
307    // kill_fut resolves to the outcome to which results should be overwritten
308    // if it resolves.
309    let kill_fut = async move {
310        match futures::future::select(cancel_fut, kill_timeout_future).await {
311            Either::Left(_) => Outcome::Cancelled,
312            Either::Right(_) => Outcome::Timedout,
313        }
314    }
315    .shared();
316
317    let early_termination_outcome =
318        match collect_results_fut.boxed_local().or_cancelled(kill_fut.clone()).await {
319            Ok(Ok(())) => None,
320            Ok(Err(e)) => return Err(e),
321            Err(Cancelled(outcome)) => Some(outcome),
322        };
323
324    // Finish collecting artifacts and report errors.
325    info!("Awaiting case artifacts");
326    let mut unfinished_test_case_names = vec![];
327    for (_, test_case) in test_cases.into_iter() {
328        let CollectedEntityState { reporter, name, lifecycle, artifact_tasks } = test_case;
329        match (lifecycle, early_termination_outcome.clone()) {
330            (Lifecycle::Started | Lifecycle::Found, Some(early)) => {
331                reporter.stopped(&early.into(), Timestamp::Unknown)?;
332            }
333            (Lifecycle::Found, None) => {
334                unfinished_test_case_names.push(name.clone());
335                reporter.stopped(&Outcome::Inconclusive.into(), Timestamp::Unknown)?;
336            }
337            (Lifecycle::Started, None) => {
338                unfinished_test_case_names.push(name.clone());
339                reporter.stopped(&Outcome::DidNotFinish.into(), Timestamp::Unknown)?;
340            }
341            (Lifecycle::Stopped | Lifecycle::Finished, _) => (),
342        }
343
344        let finish_artifacts_fut = FuturesUnordered::from_iter(artifact_tasks)
345            .map(|result| match result {
346                Err(e) => {
347                    error!("Failed to collect artifact for {}: {:?}", name, e);
348                }
349                Ok(Some(_log_result)) => warn!("Unexpectedly got log results for a test case"),
350                Ok(None) => (),
351            })
352            .collect::<()>();
353        if let Err(Cancelled(_)) = finish_artifacts_fut.or_cancelled(kill_fut.clone()).await {
354            warn!("Stopped polling artifacts for {} due to timeout", name);
355        }
356
357        reporter.finished()?;
358    }
359    if !unfinished_test_case_names.is_empty() {
360        outcome = Outcome::error(UnexpectedEventError::CasesDidNotFinish {
361            cases: unfinished_test_case_names,
362        });
363    }
364
365    match (suite_state.lifecycle, early_termination_outcome) {
366        (Lifecycle::Found | Lifecycle::Started, Some(early)) => {
367            if matches!(&outcome, Outcome::Passed | Outcome::Failed) {
368                outcome = early;
369            }
370        }
371        (Lifecycle::Found | Lifecycle::Started, None) => {
372            outcome = Outcome::error(UnexpectedEventError::SuiteDidNotReportStop);
373        }
374        // If the suite successfully reported a result, don't alter it.
375        (Lifecycle::Stopped, _) => (),
376        // Finished doesn't happen since there's no SuiteFinished event.
377        (Lifecycle::Finished, _) => unreachable!(),
378    }
379
380    let restricted_logs_present = AtomicBool::new(false);
381    let finish_artifacts_fut = FuturesUnordered::from_iter(suite_state.artifact_tasks)
382        .then(|result| async {
383            match result {
384                Err(e) => {
385                    error!("Failed to collect artifact for suite: {:?}", e);
386                }
387                Ok(Some(log_result)) => match log_result {
388                    diagnostics::LogCollectionOutcome::Error { restricted_logs } => {
389                        restricted_logs_present.store(true, Ordering::Relaxed);
390                        let mut log_artifact = match suite_reporter
391                            .new_artifact(&ArtifactType::RestrictedLog)
392                        {
393                            Ok(artifact) => artifact,
394                            Err(e) => {
395                                warn!("Error creating artifact to report restricted logs: {:?}", e);
396                                return;
397                            }
398                        };
399                        for log in restricted_logs.iter() {
400                            if let Err(e) = writeln!(log_artifact, "{}", log) {
401                                warn!("Error recording restricted logs: {:?}", e);
402                                return;
403                            }
404                        }
405                    }
406                    diagnostics::LogCollectionOutcome::Passed => (),
407                },
408                Ok(None) => (),
409            }
410        })
411        .collect::<()>();
412    if let Err(Cancelled(_)) = finish_artifacts_fut.or_cancelled(kill_fut).await {
413        warn!("Stopped polling artifacts due to timeout");
414    }
415    if restricted_logs_present.into_inner() && matches!(outcome, Outcome::Passed) {
416        outcome = Outcome::Failed;
417    }
418
419    suite_reporter.stopped(&outcome.clone().into(), suite_finish_timestamp)?;
420
421    Ok(outcome)
422}
423
/// Pinned, boxed, `Send` stream of suite events. Each item is either a single
/// `ftest_manager::Event` or the `RunTestSuiteError` produced while retrieving events.
type EventStream =
    std::pin::Pin<Box<dyn Stream<Item = Result<ftest_manager::Event, RunTestSuiteError>> + Send>>;
426
/// A test suite that is known to have started execution. A suite is considered started once
/// any event is produced for the suite.
///
/// Instances are created with [`RunningSuite::wait_for_start`].
pub(crate) struct RunningSuite {
    /// Events (or errors) reported for the suite, one item per event.
    event_stream: EventStream,
    /// Passed as `max_severity` in the log collection options used when draining artifacts.
    max_severity_logs: Option<Severity>,
    /// Suite timeout. When `None`, no kill timer is armed while collecting results.
    timeout: Option<std::time::Duration>,
    /// Extra time added on top of `timeout` before event polling is abandoned.
    timeout_grace: std::time::Duration,
    /// When `Some(true)`, a `LaunchError::NoMatchingCases` error from the event stream marks
    /// the suite as stopped instead of being returned as an error.
    no_cases_equals_success: Option<bool>,
}
436
/// Arguments for [`RunningSuite::wait_for_start`].
pub(crate) struct WaitForStartArgs {
    /// Suite controller connection on which `WatchEvents` is called.
    pub(crate) proxy: ftest_manager::SuiteControllerProxy,
    /// Stored unchanged in the resulting `RunningSuite`.
    pub(crate) max_severity_logs: Option<Severity>,
    /// Stored unchanged in the resulting `RunningSuite`.
    pub(crate) timeout: Option<std::time::Duration>,
    /// Stored unchanged in the resulting `RunningSuite`.
    pub(crate) timeout_grace: std::time::Duration,
    /// Number of concurrently active `WatchEvents` requests. `None` selects
    /// `RunningSuite::DEFAULT_PIPELINED_REQUESTS`.
    pub(crate) max_pipelined: Option<usize>,
    /// Stored unchanged in the resulting `RunningSuite`.
    pub(crate) no_cases_equals_success: Option<bool>,
}
445
446impl RunningSuite {
447    /// Number of concurrently active WatchEvents requests. Chosen by testing powers of 2 when
448    /// running a set of tests using ffx test against an emulator, and taking the value at
449    /// which improvement stops.
450    const DEFAULT_PIPELINED_REQUESTS: usize = 8;
451    pub(crate) async fn wait_for_start(args: WaitForStartArgs) -> Self {
452        let WaitForStartArgs {
453            proxy,
454            max_severity_logs,
455            timeout,
456            timeout_grace,
457            max_pipelined,
458            no_cases_equals_success,
459        } = args;
460
461        let proxy = Arc::new(proxy);
462        // Stream of fidl responses, with multiple concurrently active requests.
463        let unprocessed_event_stream = futures::stream::repeat_with(move || {
464            proxy.watch_events().inspect(|events_result| match events_result {
465                Ok(Ok(events)) => info!("Latest suite event: {:?}", events.last()),
466                _ => (),
467            })
468        })
469        .buffered(max_pipelined.unwrap_or(Self::DEFAULT_PIPELINED_REQUESTS));
470        // Terminate the stream after we get an error or empty list of events.
471        let terminated_event_stream =
472            unprocessed_event_stream.take_until_stop_after(|result| match &result {
473                Ok(Ok(events)) => events.is_empty(),
474                Err(_) | Ok(Err(_)) => true,
475            });
476        // Flatten the stream of vecs into a stream of single events.
477        let mut event_stream = terminated_event_stream
478            .map(Self::convert_to_result_vec)
479            .map(futures::stream::iter)
480            .flatten()
481            .peekable();
482        // Wait for the first event to be ready, which signals the suite has started.
483        std::pin::Pin::new(&mut event_stream).peek().await;
484
485        Self {
486            event_stream: event_stream.boxed(),
487            timeout,
488            timeout_grace,
489            max_severity_logs,
490            no_cases_equals_success,
491        }
492    }
493
494    fn convert_to_result_vec(
495        vec: Result<Result<Vec<ftest_manager::Event>, ftest_manager::LaunchError>, fidl::Error>,
496    ) -> Vec<Result<ftest_manager::Event, RunTestSuiteError>> {
497        match vec {
498            Ok(Ok(events)) => events.into_iter().map(Ok).collect(),
499            Ok(Err(e)) => vec![Err(e.into())],
500            Err(e) => vec![Err(e.into())],
501        }
502    }
503}
504
505#[cfg(test)]
506mod test {
507    use super::*;
508    use crate::output::EntityId;
509    use assert_matches::assert_matches;
510    use fidl::endpoints::create_proxy_and_stream;
511    use maplit::hashmap;
512
513    async fn respond_to_watch_events(
514        request_stream: &mut ftest_manager::SuiteControllerRequestStream,
515        events: Vec<ftest_manager::Event>,
516    ) {
517        let request = request_stream
518            .next()
519            .await
520            .expect("did not get next request")
521            .expect("error getting next request");
522        let responder = match request {
523            ftest_manager::SuiteControllerRequest::WatchEvents { responder } => responder,
524            r => panic!("Expected WatchEvents request but got {:?}", r),
525        };
526
527        responder.send(Ok(events)).expect("send events");
528    }
529
530    /// Serves all events to completion.
531    async fn serve_all_events(
532        mut request_stream: ftest_manager::SuiteControllerRequestStream,
533        events: Vec<ftest_manager::Event>,
534    ) {
535        const BATCH_SIZE: usize = 5;
536        let mut event_iter = events.into_iter();
537        while event_iter.len() > 0 {
538            respond_to_watch_events(
539                &mut request_stream,
540                event_iter.by_ref().take(BATCH_SIZE).collect(),
541            )
542            .await;
543        }
544        respond_to_watch_events(&mut request_stream, vec![]).await;
545    }
546
547    /// Serves all events to completion, then wait for the channel to close.
548    async fn serve_all_events_then_hang(
549        mut request_stream: ftest_manager::SuiteControllerRequestStream,
550        events: Vec<ftest_manager::Event>,
551    ) {
552        const BATCH_SIZE: usize = 5;
553        let mut event_iter = events.into_iter();
554        while event_iter.len() > 0 {
555            respond_to_watch_events(
556                &mut request_stream,
557                event_iter.by_ref().take(BATCH_SIZE).collect(),
558            )
559            .await;
560        }
561        let _requests = request_stream.collect::<Vec<_>>().await;
562    }
563
564    /// Creates an Event which is unpopulated, except for timestamp.
565    /// This isn't representative of an actual event from test framework, but is sufficient
566    /// to assert events are routed correctly.
567    fn create_empty_event(timestamp: i64) -> ftest_manager::Event {
568        ftest_manager::Event { timestamp: Some(timestamp), ..Default::default() }
569    }
570
571    macro_rules! assert_empty_events_eq {
572        ($t1:expr, $t2:expr) => {
573            assert_eq!($t1.timestamp, $t2.timestamp, "Got incorrect event.")
574        };
575    }
576
577    #[fuchsia::test]
578    async fn running_events_simple() {
579        let (suite_proxy, mut suite_request_stream) =
580            create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
581        let suite_server_task = fasync::Task::spawn(async move {
582            respond_to_watch_events(&mut suite_request_stream, vec![create_empty_event(0)]).await;
583            respond_to_watch_events(&mut suite_request_stream, vec![]).await;
584            drop(suite_request_stream);
585        });
586
587        let mut running_suite = RunningSuite::wait_for_start(WaitForStartArgs {
588            proxy: suite_proxy,
589            max_severity_logs: None,
590            timeout: None,
591            timeout_grace: std::time::Duration::ZERO,
592            max_pipelined: None,
593            no_cases_equals_success: None,
594        })
595        .await;
596        assert_empty_events_eq!(
597            running_suite.event_stream.next().await.unwrap().unwrap(),
598            create_empty_event(0)
599        );
600        assert!(running_suite.event_stream.next().await.is_none());
601        // polling again should still give none.
602        assert!(running_suite.event_stream.next().await.is_none());
603        suite_server_task.await;
604    }
605
606    #[fuchsia::test]
607    async fn running_events_multiple_events() {
608        let (suite_proxy, mut suite_request_stream) =
609            create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
610        let suite_server_task = fasync::Task::spawn(async move {
611            respond_to_watch_events(
612                &mut suite_request_stream,
613                vec![create_empty_event(0), create_empty_event(1)],
614            )
615            .await;
616            respond_to_watch_events(
617                &mut suite_request_stream,
618                vec![create_empty_event(2), create_empty_event(3)],
619            )
620            .await;
621            respond_to_watch_events(&mut suite_request_stream, vec![]).await;
622            drop(suite_request_stream);
623        });
624
625        let mut running_suite = RunningSuite::wait_for_start(WaitForStartArgs {
626            proxy: suite_proxy,
627            max_severity_logs: None,
628            timeout: None,
629            timeout_grace: std::time::Duration::ZERO,
630            max_pipelined: None,
631            no_cases_equals_success: None,
632        })
633        .await;
634
635        for num in 0..4 {
636            assert_empty_events_eq!(
637                running_suite.event_stream.next().await.unwrap().unwrap(),
638                create_empty_event(num)
639            );
640        }
641        assert!(running_suite.event_stream.next().await.is_none());
642        suite_server_task.await;
643    }
644
645    #[fuchsia::test]
646    async fn running_events_peer_closed() {
647        let (suite_proxy, mut suite_request_stream) =
648            create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
649        let suite_server_task = fasync::Task::spawn(async move {
650            respond_to_watch_events(&mut suite_request_stream, vec![create_empty_event(1)]).await;
651            drop(suite_request_stream);
652        });
653
654        let mut running_suite = RunningSuite::wait_for_start(WaitForStartArgs {
655            proxy: suite_proxy,
656            max_severity_logs: None,
657            timeout: None,
658            timeout_grace: std::time::Duration::ZERO,
659            max_pipelined: None,
660            no_cases_equals_success: None,
661        })
662        .await;
663        assert_empty_events_eq!(
664            running_suite.event_stream.next().await.unwrap().unwrap(),
665            create_empty_event(1)
666        );
667        assert_matches!(
668            running_suite.event_stream.next().await,
669            Some(Err(RunTestSuiteError::Fidl(fidl::Error::ClientChannelClosed { .. })))
670        );
671        suite_server_task.await;
672    }
673
674    fn event_from_details(
675        timestamp: i64,
676        details: ftest_manager::EventDetails,
677    ) -> ftest_manager::Event {
678        let mut event = create_empty_event(timestamp);
679        event.details = Some(details);
680        event
681    }
682
683    fn case_found_event(timestamp: i64, test_case_id: u32, name: &str) -> ftest_manager::Event {
684        event_from_details(
685            timestamp,
686            ftest_manager::EventDetails::TestCaseFound(ftest_manager::TestCaseFoundEventDetails {
687                test_case_name: Some(name.into()),
688                test_case_id: Some(test_case_id),
689                ..Default::default()
690            }),
691        )
692    }
693
694    fn case_started_event(timestamp: i64, test_case_id: u32) -> ftest_manager::Event {
695        event_from_details(
696            timestamp,
697            ftest_manager::EventDetails::TestCaseStarted(
698                ftest_manager::TestCaseStartedEventDetails {
699                    test_case_id: Some(test_case_id),
700                    ..Default::default()
701                },
702            ),
703        )
704    }
705
706    fn case_stopped_event(
707        timestamp: i64,
708        test_case_id: u32,
709        result: ftest_manager::TestCaseResult,
710    ) -> ftest_manager::Event {
711        event_from_details(
712            timestamp,
713            ftest_manager::EventDetails::TestCaseStopped(
714                ftest_manager::TestCaseStoppedEventDetails {
715                    test_case_id: Some(test_case_id),
716                    result: Some(result),
717                    ..Default::default()
718                },
719            ),
720        )
721    }
722
723    fn case_finished_event(timestamp: i64, test_case_id: u32) -> ftest_manager::Event {
724        event_from_details(
725            timestamp,
726            ftest_manager::EventDetails::TestCaseFinished(
727                ftest_manager::TestCaseFinishedEventDetails {
728                    test_case_id: Some(test_case_id),
729                    ..Default::default()
730                },
731            ),
732        )
733    }
734
735    fn case_stdout_event(
736        timestamp: i64,
737        test_case_id: u32,
738        stdout: fidl::Socket,
739    ) -> ftest_manager::Event {
740        event_from_details(
741            timestamp,
742            ftest_manager::EventDetails::TestCaseArtifactGenerated(
743                ftest_manager::TestCaseArtifactGeneratedEventDetails {
744                    test_case_id: Some(test_case_id),
745                    artifact: Some(ftest_manager::Artifact::Stdout(stdout)),
746                    ..Default::default()
747                },
748            ),
749        )
750    }
751
752    fn case_stderr_event(
753        timestamp: i64,
754        test_case_id: u32,
755        stderr: fidl::Socket,
756    ) -> ftest_manager::Event {
757        event_from_details(
758            timestamp,
759            ftest_manager::EventDetails::TestCaseArtifactGenerated(
760                ftest_manager::TestCaseArtifactGeneratedEventDetails {
761                    test_case_id: Some(test_case_id),
762                    artifact: Some(ftest_manager::Artifact::Stderr(stderr)),
763                    ..Default::default()
764                },
765            ),
766        )
767    }
768
769    fn suite_started_event(timestamp: i64) -> ftest_manager::Event {
770        event_from_details(
771            timestamp,
772            ftest_manager::EventDetails::SuiteStarted(ftest_manager::SuiteStartedEventDetails {
773                ..Default::default()
774            }),
775        )
776    }
777
778    fn suite_stopped_event(
779        timestamp: i64,
780        result: ftest_manager::SuiteResult,
781    ) -> ftest_manager::Event {
782        event_from_details(
783            timestamp,
784            ftest_manager::EventDetails::SuiteStopped(ftest_manager::SuiteStoppedEventDetails {
785                result: Some(result),
786                ..Default::default()
787            }),
788        )
789    }
790
791    #[fuchsia::test]
792    async fn collect_events_simple() {
793        let all_events = vec![
794            suite_started_event(0),
795            case_found_event(100, 0, "my_test_case"),
796            case_started_event(200, 0),
797            case_stopped_event(300, 0, ftest_manager::TestCaseResult::Passed),
798            case_finished_event(400, 0),
799            suite_stopped_event(500, ftest_manager::SuiteResult::Finished),
800        ];
801
802        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
803        let test_fut = async move {
804            let reporter = output::InMemoryReporter::new();
805            let run_reporter = output::RunReporter::new(reporter.clone());
806            let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
807
808            let suite = RunningSuite::wait_for_start(WaitForStartArgs {
809                proxy,
810                max_severity_logs: None,
811                timeout: None,
812                timeout_grace: std::time::Duration::ZERO,
813                max_pipelined: None,
814                no_cases_equals_success: None,
815            })
816            .await;
817            assert_eq!(
818                run_suite_and_collect_logs(
819                    suite,
820                    &suite_reporter,
821                    diagnostics::LogDisplayConfiguration::default(),
822                    futures::future::pending()
823                )
824                .await
825                .expect("collect results"),
826                Outcome::Passed
827            );
828            suite_reporter.finished().expect("Reporter finished");
829
830            let reports = reporter.get_reports();
831            let case = reports
832                .iter()
833                .find(|report| report.id == EntityId::Case { case: CaseId(0) })
834                .unwrap();
835            assert_eq!(case.report.name, "my_test_case");
836            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
837            assert!(case.report.is_finished);
838            assert!(case.report.artifacts.is_empty());
839            assert!(case.report.directories.is_empty());
840            let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
841            assert_eq!(suite.report.name, "test-url");
842            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
843            assert!(suite.report.is_finished);
844            assert!(suite.report.artifacts.is_empty());
845            assert!(suite.report.directories.is_empty());
846        };
847
848        futures::future::join(serve_all_events(stream, all_events), test_fut).await;
849    }
850
851    #[fuchsia::test]
852    async fn collect_events_with_case_artifacts() {
853        const STDOUT_CONTENT: &str = "stdout from my_test_case";
854        const STDERR_CONTENT: &str = "stderr from my_test_case";
855
856        let (stdout_write, stdout_read) = fidl::Socket::create_stream();
857        let (stderr_write, stderr_read) = fidl::Socket::create_stream();
858        let all_events = vec![
859            suite_started_event(0),
860            case_found_event(100, 0, "my_test_case"),
861            case_started_event(200, 0),
862            case_stdout_event(300, 0, stdout_read),
863            case_stderr_event(300, 0, stderr_read),
864            case_stopped_event(300, 0, ftest_manager::TestCaseResult::Passed),
865            case_finished_event(400, 0),
866            suite_stopped_event(500, ftest_manager::SuiteResult::Finished),
867        ];
868
869        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
870        let stdio_write_fut = async move {
871            let mut async_stdout = fasync::Socket::from_socket(stdout_write);
872            async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
873            let mut async_stderr = fasync::Socket::from_socket(stderr_write);
874            async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
875        };
876        let test_fut = async move {
877            let reporter = output::InMemoryReporter::new();
878            let run_reporter = output::RunReporter::new(reporter.clone());
879            let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
880
881            let suite = RunningSuite::wait_for_start(WaitForStartArgs {
882                proxy,
883                max_severity_logs: None,
884                timeout: None,
885                timeout_grace: std::time::Duration::ZERO,
886                max_pipelined: None,
887                no_cases_equals_success: None,
888            })
889            .await;
890            assert_eq!(
891                run_suite_and_collect_logs(
892                    suite,
893                    &suite_reporter,
894                    diagnostics::LogDisplayConfiguration::default(),
895                    futures::future::pending()
896                )
897                .await
898                .expect("collect results"),
899                Outcome::Passed
900            );
901            suite_reporter.finished().expect("Reporter finished");
902
903            let reports = reporter.get_reports();
904            let case = reports
905                .iter()
906                .find(|report| report.id == EntityId::Case { case: CaseId(0) })
907                .unwrap();
908            assert_eq!(case.report.name, "my_test_case");
909            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
910            assert!(case.report.is_finished);
911            assert_eq!(case.report.artifacts.len(), 2);
912            assert_eq!(
913                case.report
914                    .artifacts
915                    .iter()
916                    .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
917                    .collect::<HashMap<_, _>>(),
918                hashmap! {
919                    output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
920                    output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
921                }
922            );
923            assert!(case.report.directories.is_empty());
924
925            let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
926            assert_eq!(suite.report.name, "test-url");
927            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
928            assert!(suite.report.is_finished);
929            assert!(suite.report.artifacts.is_empty());
930            assert!(suite.report.directories.is_empty());
931        };
932
933        futures::future::join3(serve_all_events(stream, all_events), stdio_write_fut, test_fut)
934            .await;
935    }
936
937    #[fuchsia::test]
938    async fn collect_events_case_artifacts_complete_after_suite() {
939        const STDOUT_CONTENT: &str = "stdout from my_test_case";
940        const STDERR_CONTENT: &str = "stderr from my_test_case";
941
942        let (stdout_write, stdout_read) = fidl::Socket::create_stream();
943        let (stderr_write, stderr_read) = fidl::Socket::create_stream();
944        let all_events = vec![
945            suite_started_event(0),
946            case_found_event(100, 0, "my_test_case"),
947            case_started_event(200, 0),
948            case_stdout_event(300, 0, stdout_read),
949            case_stderr_event(300, 0, stderr_read),
950            case_stopped_event(300, 0, ftest_manager::TestCaseResult::Passed),
951            case_finished_event(400, 0),
952            suite_stopped_event(500, ftest_manager::SuiteResult::Finished),
953        ];
954
955        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
956        let serve_fut = async move {
957            // server side will send all events, then write to (and close) sockets.
958            serve_all_events(stream, all_events).await;
959            let mut async_stdout = fasync::Socket::from_socket(stdout_write);
960            async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
961            let mut async_stderr = fasync::Socket::from_socket(stderr_write);
962            async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
963        };
964        let test_fut = async move {
965            let reporter = output::InMemoryReporter::new();
966            let run_reporter = output::RunReporter::new(reporter.clone());
967            let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
968
969            let suite = RunningSuite::wait_for_start(WaitForStartArgs {
970                proxy,
971                max_severity_logs: None,
972                timeout: None,
973                timeout_grace: std::time::Duration::ZERO,
974                max_pipelined: Some(1),
975                no_cases_equals_success: None,
976            })
977            .await;
978            assert_eq!(
979                run_suite_and_collect_logs(
980                    suite,
981                    &suite_reporter,
982                    diagnostics::LogDisplayConfiguration::default(),
983                    futures::future::pending()
984                )
985                .await
986                .expect("collect results"),
987                Outcome::Passed
988            );
989            suite_reporter.finished().expect("Reporter finished");
990
991            let reports = reporter.get_reports();
992            let case = reports
993                .iter()
994                .find(|report| report.id == EntityId::Case { case: CaseId(0) })
995                .unwrap();
996            assert_eq!(case.report.name, "my_test_case");
997            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
998            assert!(case.report.is_finished);
999            assert_eq!(case.report.artifacts.len(), 2);
1000            assert_eq!(
1001                case.report
1002                    .artifacts
1003                    .iter()
1004                    .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
1005                    .collect::<HashMap<_, _>>(),
1006                hashmap! {
1007                    output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
1008                    output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
1009                }
1010            );
1011            assert!(case.report.directories.is_empty());
1012
1013            let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
1014            assert_eq!(suite.report.name, "test-url");
1015            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
1016            assert!(suite.report.is_finished);
1017            assert!(suite.report.artifacts.is_empty());
1018            assert!(suite.report.directories.is_empty());
1019        };
1020
1021        futures::future::join(serve_fut, test_fut).await;
1022    }
1023
1024    #[fuchsia::test]
1025    async fn collect_events_with_case_artifacts_sent_after_case_stopped() {
1026        const STDOUT_CONTENT: &str = "stdout from my_test_case";
1027        const STDERR_CONTENT: &str = "stderr from my_test_case";
1028
1029        let (stdout_write, stdout_read) = fidl::Socket::create_stream();
1030        let (stderr_write, stderr_read) = fidl::Socket::create_stream();
1031        let all_events = vec![
1032            suite_started_event(0),
1033            case_found_event(100, 0, "my_test_case"),
1034            case_started_event(200, 0),
1035            case_stopped_event(300, 0, ftest_manager::TestCaseResult::Passed),
1036            case_stdout_event(300, 0, stdout_read),
1037            case_stderr_event(300, 0, stderr_read),
1038            case_finished_event(400, 0),
1039            suite_stopped_event(500, ftest_manager::SuiteResult::Finished),
1040        ];
1041
1042        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
1043        let stdio_write_fut = async move {
1044            let mut async_stdout = fasync::Socket::from_socket(stdout_write);
1045            async_stdout.write_all(STDOUT_CONTENT.as_bytes()).await.expect("write to socket");
1046            let mut async_stderr = fasync::Socket::from_socket(stderr_write);
1047            async_stderr.write_all(STDERR_CONTENT.as_bytes()).await.expect("write to socket");
1048        };
1049        let test_fut = async move {
1050            let reporter = output::InMemoryReporter::new();
1051            let run_reporter = output::RunReporter::new(reporter.clone());
1052            let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
1053
1054            let suite = RunningSuite::wait_for_start(WaitForStartArgs {
1055                proxy,
1056                max_severity_logs: None,
1057                timeout: None,
1058                timeout_grace: std::time::Duration::ZERO,
1059                max_pipelined: None,
1060                no_cases_equals_success: None,
1061            })
1062            .await;
1063            assert_eq!(
1064                run_suite_and_collect_logs(
1065                    suite,
1066                    &suite_reporter,
1067                    diagnostics::LogDisplayConfiguration::default(),
1068                    futures::future::pending()
1069                )
1070                .await
1071                .expect("collect results"),
1072                Outcome::Passed
1073            );
1074            suite_reporter.finished().expect("Reporter finished");
1075
1076            let reports = reporter.get_reports();
1077            let case = reports
1078                .iter()
1079                .find(|report| report.id == EntityId::Case { case: CaseId(0) })
1080                .unwrap();
1081            assert_eq!(case.report.name, "my_test_case");
1082            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Passed));
1083            assert!(case.report.is_finished);
1084            assert_eq!(case.report.artifacts.len(), 2);
1085            assert_eq!(
1086                case.report
1087                    .artifacts
1088                    .iter()
1089                    .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
1090                    .collect::<HashMap<_, _>>(),
1091                hashmap! {
1092                    output::ArtifactType::Stdout => STDOUT_CONTENT.as_bytes().to_vec(),
1093                    output::ArtifactType::Stderr => STDERR_CONTENT.as_bytes().to_vec()
1094                }
1095            );
1096            assert!(case.report.directories.is_empty());
1097
1098            let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
1099            assert_eq!(suite.report.name, "test-url");
1100            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Passed));
1101            assert!(suite.report.is_finished);
1102            assert!(suite.report.artifacts.is_empty());
1103            assert!(suite.report.directories.is_empty());
1104        };
1105
1106        futures::future::join3(serve_all_events(stream, all_events), stdio_write_fut, test_fut)
1107            .await;
1108    }
1109
1110    #[fuchsia::test]
1111    async fn collect_events_timed_out_case_with_hanging_artifacts() {
1112        // create sockets and leave the server end open to simulate a hang.
1113        let (_stdout_write, stdout_read) = fidl::Socket::create_stream();
1114        let (_stderr_write, stderr_read) = fidl::Socket::create_stream();
1115        let all_events = vec![
1116            suite_started_event(0),
1117            case_found_event(100, 0, "my_test_case"),
1118            case_started_event(200, 0),
1119            case_stdout_event(300, 0, stdout_read),
1120            case_stderr_event(300, 0, stderr_read),
1121        ];
1122
1123        let (proxy, stream) = create_proxy_and_stream::<ftest_manager::SuiteControllerMarker>();
1124        let test_fut = async move {
1125            let reporter = output::InMemoryReporter::new();
1126            let run_reporter = output::RunReporter::new(reporter.clone());
1127            let suite_reporter = run_reporter.new_suite("test-url").expect("create new suite");
1128
1129            let suite = RunningSuite::wait_for_start(WaitForStartArgs {
1130                proxy,
1131                max_severity_logs: None,
1132                timeout: Some(std::time::Duration::from_secs(2)),
1133                timeout_grace: std::time::Duration::ZERO,
1134                max_pipelined: None,
1135                no_cases_equals_success: None,
1136            })
1137            .await;
1138            assert_eq!(
1139                run_suite_and_collect_logs(
1140                    suite,
1141                    &suite_reporter,
1142                    diagnostics::LogDisplayConfiguration::default(),
1143                    futures::future::pending()
1144                )
1145                .await
1146                .expect("collect results"),
1147                Outcome::Timedout
1148            );
1149            suite_reporter.finished().expect("Reporter finished");
1150
1151            let reports = reporter.get_reports();
1152            let case = reports
1153                .iter()
1154                .find(|report| report.id == EntityId::Case { case: CaseId(0) })
1155                .unwrap();
1156            assert_eq!(case.report.name, "my_test_case");
1157            assert_eq!(case.report.outcome, Some(output::ReportedOutcome::Timedout));
1158            assert!(case.report.is_finished);
1159            assert_eq!(case.report.artifacts.len(), 2);
1160            assert_eq!(
1161                case.report
1162                    .artifacts
1163                    .iter()
1164                    .map(|(artifact_type, artifact)| (*artifact_type, artifact.get_contents()))
1165                    .collect::<HashMap<_, _>>(),
1166                hashmap! {
1167                    output::ArtifactType::Stdout => vec![],
1168                    output::ArtifactType::Stderr => vec![]
1169                }
1170            );
1171            assert!(case.report.directories.is_empty());
1172
1173            let suite = reports.iter().find(|report| report.id == EntityId::Suite).unwrap();
1174            assert_eq!(suite.report.name, "test-url");
1175            assert_eq!(suite.report.outcome, Some(output::ReportedOutcome::Timedout));
1176            assert!(suite.report.is_finished);
1177            assert!(suite.report.artifacts.is_empty());
1178            assert!(suite.report.directories.is_empty());
1179        };
1180
1181        futures::future::join(serve_all_events_then_hang(stream, all_events), test_fut).await;
1182    }
1183}