Skip to main content

run_test_suite_lib/
artifacts.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::artifacts;
6use crate::cancel::NamedFutureExt;
7use crate::diagnostics::{self, LogCollectionOutcome};
8use crate::outcome::{ConnectionError, RunTestSuiteError, UnexpectedEventError};
9use crate::output::{
10    ArtifactType, DirectoryArtifactType, DynDirectoryArtifact, DynReporter, EntityReporter,
11};
12use crate::stream_util::StreamUtil;
13use anyhow::{Context as _, anyhow};
14use flex_client::Peered;
15use futures::AsyncReadExt;
16use futures::future::{BoxFuture, FutureExt, TryFutureExt, join_all};
17use futures::stream::{FuturesUnordered, StreamExt, TryStreamExt};
18use log::{debug, warn};
19use std::borrow::Borrow;
20use std::collections::VecDeque;
21use std::io::Write;
22use std::path::PathBuf;
23
24#[cfg(feature = "fdomain")]
25use test_diagnostics_fdomain as test_diagnostics;
26
27use flex_fuchsia_io as fio;
28use flex_fuchsia_test_manager as ftest_manager;
29use fuchsia_async as fasync;
30use test_diagnostics::zstd_compress::Decoder;
31
32/// Given an |artifact| reported over fuchsia.test.manager, create the appropriate artifact in the
33/// reporter. Returns a Future, which when polled to completion, drains the results from |artifact|
34/// and saves them to the reporter.
35///
36/// This method is an async method returning a Future so that the lifetime of |reporter| is not
37/// tied to the lifetime of the Future.
38/// The returned Future resolves to LogCollectionOutcome when logs are processed.
39pub(crate) async fn drain_artifact<'a, E, T>(
40    reporter: &'a EntityReporter<E, T>,
41    artifact: ftest_manager::Artifact,
42    log_opts: diagnostics::LogCollectionOptions,
43) -> Result<
44    BoxFuture<'static, Result<Option<LogCollectionOutcome>, anyhow::Error>>,
45    RunTestSuiteError,
46>
47where
48    T: Borrow<DynReporter>,
49{
50    match artifact {
51        ftest_manager::Artifact::Stdout(socket) => {
52            let stdout = reporter.new_artifact(&ArtifactType::Stdout)?;
53            Ok(copy_socket_artifact(socket, stdout).map_ok(|_| None).named("stdout").boxed())
54        }
55        ftest_manager::Artifact::Stderr(socket) => {
56            let stderr = reporter.new_artifact(&ArtifactType::Stderr)?;
57            Ok(copy_socket_artifact(socket, stderr).map_ok(|_| None).named("stderr").boxed())
58        }
59        ftest_manager::Artifact::Log(syslog) => {
60            let syslog_artifact = reporter.new_artifact(&ArtifactType::Syslog)?;
61            Ok(diagnostics::collect_logs(
62                test_diagnostics::LogStream::from_syslog(syslog)
63                    .map_err(|e| RunTestSuiteError::Connection(ConnectionError(e.into())))?,
64                syslog_artifact,
65                log_opts,
66            )
67            .map_ok(Some)
68            .named("syslog")
69            .boxed())
70        }
71        ftest_manager::Artifact::Custom(ftest_manager::CustomArtifact {
72            directory_and_token,
73            component_moniker,
74            ..
75        }) => {
76            let ftest_manager::DirectoryAndToken { directory, token, .. } = directory_and_token
77                .ok_or(UnexpectedEventError::MissingRequiredField {
78                    containing_struct: "CustomArtifact",
79                    field: "directory_and_token",
80                })?;
81            let directory_artifact = reporter
82                .new_directory_artifact(&DirectoryArtifactType::Custom, component_moniker)?;
83            Ok(async move {
84                let directory = directory.into_proxy();
85                let result =
86                    artifacts::copy_custom_artifact_directory(directory, directory_artifact).await;
87                // TODO(https://fxbug.dev/42165719): Remove this signal once Overnet
88                // supports automatically signalling EVENTPAIR_CLOSED when the
89                // handle is closed.
90                let _ = token.signal_peer(fidl::Signals::empty(), fidl::Signals::USER_0);
91                result
92            }
93            .map_ok(|()| None)
94            .named("custom_artifacts")
95            .boxed())
96        }
97        ftest_manager::Artifact::DebugData(iterator) => {
98            let output_directory = reporter
99                .new_directory_artifact(&DirectoryArtifactType::Debug, None /* moniker */)?;
100            Ok(artifacts::copy_debug_data(iterator.into_proxy(), output_directory)
101                .map(|()| Ok(None))
102                .named("debug_data")
103                .boxed())
104        }
105        ftest_manager::ArtifactUnknown!() => {
106            warn!("Encountered an unknown artifact");
107            Ok(futures::future::ready(Ok(None)).boxed())
108        }
109    }
110}
111
112/// Copy an artifact reported over a socket.
113async fn copy_socket_artifact<W: Write>(
114    socket: flex_client::Socket,
115    mut artifact: W,
116) -> Result<usize, anyhow::Error> {
117    let mut async_socket = flex_client::socket_to_async(socket);
118    let mut len = 0;
119    loop {
120        let done =
121            test_diagnostics::SocketReadFut::new(&mut async_socket, |maybe_buf| match maybe_buf {
122                Some(buf) => {
123                    len += buf.len();
124                    artifact.write_all(buf)?;
125                    Ok(false)
126                }
127                None => Ok(true),
128            })
129            .await?;
130        if done {
131            artifact.flush()?;
132            return Ok(len);
133        }
134    }
135}
136
137/// Copy and decompress (zstd) the artifact reported over a socket.
138/// Returns (decompressed, compressed) sizes.
139async fn copy_socket_artifact_and_decompress<W: Write>(
140    socket: flex_client::Socket,
141    mut artifact: W,
142) -> Result<(usize, usize), anyhow::Error> {
143    let mut async_socket = flex_client::socket_to_async(socket);
144    let mut buf = vec![0u8; 1024 * 1024 * 2];
145
146    let (mut decoder, mut receiver) = Decoder::new();
147    let task: fasync::Task<Result<usize, anyhow::Error>> = fasync::Task::spawn(async move {
148        let mut len = 0;
149        loop {
150            let l = async_socket.read(&mut buf).await?;
151            match l {
152                0 => {
153                    decoder.finish().await?;
154                    break;
155                }
156                _ => {
157                    len += l;
158                    decoder.decompress(&buf[..l]).await?;
159                }
160            }
161        }
162        Ok(len)
163    });
164
165    let mut decompressed_len = 0;
166    while let Some(buf) = receiver.next().await {
167        decompressed_len += buf.len();
168        artifact.write_all(&buf)?;
169    }
170    artifact.flush()?;
171
172    let compressed_len = task.await?;
173    return Ok((decompressed_len, compressed_len));
174}
175
176/// Copy debug data reported over a debug data iterator to an output directory.
177pub async fn copy_debug_data(
178    iterator: ftest_manager::DebugDataIteratorProxy,
179    output_directory: Box<DynDirectoryArtifact>,
180) {
181    let start = std::time::Instant::now();
182    const PIPELINED_REQUESTS: usize = 4;
183    let unprocessed_data_stream =
184        futures::stream::repeat_with(move || iterator.get_next_compressed())
185            .buffered(PIPELINED_REQUESTS);
186    let terminated_event_stream =
187        unprocessed_data_stream.take_until_stop_after(|result| match &result {
188            Ok(events) => events.is_empty(),
189            _ => true,
190        });
191
192    let data_futs = terminated_event_stream
193        .map(|result| match result {
194            Ok(vals) => vals,
195            Err(e) => {
196                warn!("Request failure: {:?}", e);
197                vec![]
198            }
199        })
200        .map(futures::stream::iter)
201        .flatten()
202        .map(|debug_data| {
203            let output =
204                debug_data.name.as_ref().ok_or_else(|| anyhow!("Missing profile name")).and_then(
205                    |name| {
206                        output_directory.new_file(&PathBuf::from(name)).map_err(anyhow::Error::from)
207                    },
208                );
209            fasync::Task::spawn(async move {
210                let _ = &debug_data;
211                let mut output = output?;
212                let socket =
213                    debug_data.socket.ok_or_else(|| anyhow!("Missing profile socket handle"))?;
214                debug!("Reading run profile \"{:?}\"", debug_data.name);
215                let start = std::time::Instant::now();
216                let (decompressed_len, compressed_len) =
217                    copy_socket_artifact_and_decompress(socket, &mut output).await.map_err(
218                        |e| {
219                            warn!("Error copying artifact '{:?}': {:?}", debug_data.name, e);
220                            e
221                        },
222                    )?;
223
224                debug!(
225                    "Copied file {:?}: {}({} - compressed) bytes in {:?}",
226                    debug_data.name,
227                    decompressed_len,
228                    compressed_len,
229                    start.elapsed()
230                );
231                Ok::<(), anyhow::Error>(())
232            })
233        })
234        .collect::<Vec<_>>()
235        .await;
236    join_all(data_futs).await;
237    debug!("All profiles downloaded in {:?}", start.elapsed());
238}
239
240/// Copy a directory into a directory artifact.
241async fn copy_custom_artifact_directory(
242    directory: fio::DirectoryProxy,
243    out_dir: Box<DynDirectoryArtifact>,
244) -> Result<(), anyhow::Error> {
245    let mut paths = vec![];
246    let mut enumerate = fuchsia_fs::directory::readdir_recursive(&directory, None);
247    while let Ok(Some(file)) = enumerate.try_next().await {
248        if file.kind == fuchsia_fs::directory::DirentKind::File {
249            paths.push(file.name);
250        }
251    }
252
253    let futs = FuturesUnordered::new();
254    paths.iter().for_each(|path| {
255        let file =
256            fuchsia_fs::directory::open_file_async(&directory, path, fuchsia_fs::PERM_READABLE);
257        let output_file = out_dir.new_file(std::path::Path::new(path));
258        futs.push(async move {
259            let file = file.with_context(|| format!("with path {:?}", path))?;
260            let mut output_file = output_file?;
261
262            copy_file_to_writer(&file, &mut output_file).await.map(|_| ())
263        });
264    });
265
266    futs.for_each(|result| {
267        if let Err(e) = result {
268            warn!("Custom artifact failure: {}", e);
269        }
270        async move {}
271    })
272    .await;
273
274    Ok(())
275}
276
277async fn copy_file_to_writer<T: Write>(
278    file: &fio::FileProxy,
279    output: &mut T,
280) -> Result<usize, anyhow::Error> {
281    const READ_SIZE: u64 = fio::MAX_BUF;
282
283    let mut vector = VecDeque::new();
284    // Arbitrary number of reads to pipeline.
285    const PIPELINED_READ_COUNT: u64 = 4;
286    for _n in 0..PIPELINED_READ_COUNT {
287        vector.push_back(file.read(READ_SIZE));
288    }
289    let mut len = 0;
290    loop {
291        let buf = vector.pop_front().unwrap().await?.map_err(zx_status::Status::from_raw)?;
292        if buf.is_empty() {
293            break;
294        }
295        len += buf.len();
296        output.write_all(&buf)?;
297        vector.push_back(file.read(READ_SIZE));
298    }
299    Ok(len)
300}
301
#[cfg(test)]
mod socket_tests {
    use super::*;
    use futures::AsyncWriteExt;

    /// Writes payloads of several sizes (including an empty one) into one end of a socket
    /// and checks that `copy_socket_artifact` reproduces them from the other end.
    #[fuchsia::test]
    async fn copy_socket() {
        let payloads = vec![vec![], b"0123456789abcde".to_vec(), vec![0u8; 4096]];

        for payload in &payloads {
            let (client_socket, server_socket) = fidl::Socket::create_stream();
            let mut copied = vec![];
            // The writer takes ownership of the server end, so that end is dropped as soon
            // as the payload has been written.
            let writer = async move {
                let mut async_socket = fidl::AsyncSocket::from_socket(server_socket);
                async_socket.write_all(payload.as_slice()).await.expect("write bytes");
            };
            let reader = copy_socket_artifact(client_socket, &mut copied);

            let ((), copy_result) = futures::future::join(writer, reader).await;
            copy_result.expect("copy contents");
            assert_eq!(copied.as_slice(), payload.as_slice());
        }
    }
}
327
// These tests use vfs, which is only available on Fuchsia.
#[cfg(target_os = "fuchsia")]
#[cfg(test)]
mod file_tests {
    use super::*;
    use crate::output::InMemoryDirectoryWriter;
    use flex_fuchsia_io as fio;
    use fuchsia_async as fasync;
    use futures::prelude::*;
    use maplit::hashmap;
    use std::collections::HashMap;
    use std::sync::Arc;
    use vfs::directory::helper::DirectlyMutable;
    use vfs::directory::immutable::Simple;
    use vfs::file::vmo::read_only;
    use vfs::pseudo_directory;

    /// Writes all of `content` into `socket`, panicking if the write fails.
    async fn serve_content_over_socket(content: Vec<u8>, socket: zx::Socket) {
        let mut writer = fidl::AsyncSocket::from_socket(socket);
        writer.write_all(content.as_slice()).await.expect("Cannot serve content over socket");
    }

    /// Serves `expected_files` (zstd-compressed) through a fake debug data iterator and runs
    /// `copy_debug_data` against it, writing into `directory_writer`.
    async fn serve_and_copy_debug_data(
        expected_files: &HashMap<PathBuf, Vec<u8>>,
        directory_writer: InMemoryDirectoryWriter,
    ) {
        // Number of entries handed out per GetNextCompressed response.
        const ENTRIES_PER_RESPONSE: usize = 3;

        let served_files: Vec<_> = expected_files
            .iter()
            .map(|(path, content)| {
                let mut compressor = zstd::bulk::Compressor::new(0).unwrap();
                let compressed = compressor.compress(content).unwrap();
                let (client, server) = zx::Socket::create_stream();
                fasync::Task::spawn(serve_content_over_socket(compressed, server)).detach();
                ftest_manager::DebugData {
                    name: Some(path.display().to_string()),
                    socket: Some(client.into()),
                    ..Default::default()
                }
            })
            .collect();

        let (iterator_proxy, mut iterator_stream) =
            fidl::endpoints::create_proxy_and_stream::<ftest_manager::DebugDataIteratorMarker>();
        let serve_fut = async move {
            let mut remaining = served_files.into_iter();
            while let Ok(Some(request)) = iterator_stream.try_next().await {
                match request {
                    ftest_manager::DebugDataIteratorRequest::GetNext { .. } => {
                        panic!("Not Implemented");
                    }
                    ftest_manager::DebugDataIteratorRequest::GetNextCompressed { responder } => {
                        // Once `remaining` runs dry this sends an empty batch.
                        let batch: Vec<_> =
                            remaining.by_ref().take(ENTRIES_PER_RESPONSE).collect();
                        let _ = responder.send(batch);
                    }
                }
            }
        };
        let copy_fut = copy_debug_data(iterator_proxy, Box::new(directory_writer));
        futures::future::join(serve_fut, copy_fut).await;
    }

    /// Returns a copy of every file recorded by `writer`, keyed by path.
    fn written_files(writer: &InMemoryDirectoryWriter) -> HashMap<PathBuf, Vec<u8>> {
        let files = writer.files.lock();
        files.iter().map(|(path, artifact)| (path.clone(), artifact.get_contents())).collect()
    }

    /// Test fixtures as (case name, directory to serve, expected path -> contents).
    fn test_cases() -> Vec<(&'static str, Arc<Simple>, HashMap<PathBuf, Vec<u8>>)> {
        // Large enough that the file cannot be returned by a single read.
        let big_file_len = (fio::MAX_BUF as usize) * 2;

        vec![
            ("empty", pseudo_directory! {}, hashmap! {}),
            (
                "single file",
                pseudo_directory! {
                    "test_file.txt" => read_only("Hello, World!"),
                },
                hashmap! {
                    "test_file.txt".to_string().into() => b"Hello, World!".to_vec()
                },
            ),
            (
                "subdir",
                pseudo_directory! {
                    "sub" => pseudo_directory! {
                        "nested.txt" => read_only("Nested file!"),
                    }
                },
                hashmap! {
                    "sub/nested.txt".to_string().into() => b"Nested file!".to_vec()
                },
            ),
            (
                "empty file",
                pseudo_directory! {
                    "empty.txt" => read_only(""),
                },
                hashmap! {
                    "empty.txt".to_string().into() => b"".to_vec()
                },
            ),
            (
                "big file",
                pseudo_directory! {
                    "big.txt" => read_only(vec![b's'; big_file_len]),
                },
                hashmap! {
                    "big.txt".to_string().into() => vec![b's'; big_file_len]
                },
            ),
            (
                "100 files",
                {
                    let dir = pseudo_directory! {};
                    for i in 0..100 {
                        dir.add_entry(
                            format!("{:?}.txt", i),
                            read_only(format!("contents for {:?}", i)),
                        )
                        .expect("add file");
                    }
                    dir
                },
                (0..100)
                    .map(|i| {
                        (
                            format!("{:?}.txt", i).into(),
                            format!("contents for {:?}", i).into_bytes(),
                        )
                    })
                    .collect(),
            ),
        ]
    }

    #[fuchsia::test]
    async fn test_copy_dir() {
        for (name, fake_dir, expected_files) in test_cases() {
            let directory =
                vfs::directory::serve(fake_dir, fio::PERM_READABLE | fio::PERM_WRITABLE);
            let artifact = InMemoryDirectoryWriter::default();
            copy_custom_artifact_directory(directory, Box::new(artifact.clone()))
                .await
                .expect("reading custom directory");
            let actual_files = written_files(&artifact);
            assert_eq!(expected_files, actual_files, "{}", name);
        }
    }

    #[fuchsia::test]
    async fn test_copy_debug_data() {
        for (name, _fake_dir, expected_files) in test_cases() {
            let artifact = InMemoryDirectoryWriter::default();
            serve_and_copy_debug_data(&expected_files, artifact.clone()).await;
            let actual_files = written_files(&artifact);
            assert_eq!(expected_files, actual_files, "{}", name);
        }
    }
}